Serge Bazanski | c00318e | 2021-03-03 12:39:24 +0100 | [diff] [blame] | 1 | // Copyright 2020 The Monogon Project Authors. |
| 2 | // |
| 3 | // SPDX-License-Identifier: Apache-2.0 |
| 4 | // |
| 5 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 6 | // you may not use this file except in compliance with the License. |
| 7 | // You may obtain a copy of the License at |
| 8 | // |
| 9 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | // |
| 11 | // Unless required by applicable law or agreed to in writing, software |
| 12 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 13 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 14 | // See the License for the specific language governing permissions and |
| 15 | // limitations under the License. |
| 16 | |
| 17 | // Package event defines and implements Event Values, a mechanism in which |
| 18 | // multiple consumers can watch a value for updates in a reliable way. |
| 19 | // |
| 20 | // Values currently are kept in memory (see: MemoryValue), but a future |
| 21 | // implementation might exist for other storage backends, eg. etcd. |
| 22 | // |
Serge Bazanski | 37110c3 | 2023-03-01 13:57:27 +0000 | [diff] [blame] | 23 | // # Background and intended use |
Serge Bazanski | c00318e | 2021-03-03 12:39:24 +0100 | [diff] [blame] | 24 | // |
| 25 | // The Event Value library is intended to be used within Metropolis' |
| 26 | // supervisor-based runnables to communicate state changes to other runnables, |
| 27 | // while permitting both sides to restart if needed. It grew out of multiple |
| 28 | // codebases reimplementing an ad-hoc observer pattern, and from the |
| 29 | // realization that implementing all possible edge cases of such patterns is |
| 30 | // non-trivial and subject to programming errors. As such, it was turned into a |
| 31 | // self-standing library. |
| 32 | // |
| 33 | // Why not just channels? |
| 34 | // |
| 35 | // Plain channels have multiple deficiencies for this use case: |
Serge Bazanski | 37110c3 | 2023-03-01 13:57:27 +0000 | [diff] [blame] | 36 | // - Strict FIFO behaviour: all values sent to a channel must be received, and |
| 37 | // historic and newest data must be treated in the same way. This means that |
| 38 | // a consumer of state changes must process all updates to the value as if |
| 39 | // they are the newest, and is unable to skip rapid updates when a system is |
| 40 | // slowly settling due to a cascading state change. |
| 41 | // - Implementation overhead: implementing an observer |
| 42 | // registration/unregistration pattern is prone to programming bugs, |
| 43 | // especially for features like always first sending the current state to a |
| 44 | // new observer. |
| 45 | // - Strict buffer size: due to their FIFO nature and the possibility of |
| 46 | // consumers not receiving actively, channels would have to buffer all |
| 47 | // existing updates, requiring some arbitrary best-guess channel buffer |
| 48 | // sizing that would still not prevent blocking writes or data loss in a |
| 49 | // worst case scenario. |
Serge Bazanski | c00318e | 2021-03-03 12:39:24 +0100 | [diff] [blame] | 50 | // |
| 51 | // Or, in other words: Go channels are a synchronization primitive, not a |
| 52 | // ready-made solution to this problem. The Event Value implementation in fact |
| 53 | // extensively uses Go channels within its implementation as a building block. |
| 54 | // |
| 55 | // Why not just condition variables (sync.Cond)? |
| 56 | // |
| 57 | // Go's condition variable implementation doesn't fully address our needs |
| 58 | // either: |
Serge Bazanski | 37110c3 | 2023-03-01 13:57:27 +0000 | [diff] [blame] | 59 | // - No context/canceling support: once a condition is being Wait()ed on, |
| 60 | // this cannot be interrupted. This is especially painful and unwieldy when |
| 61 | // dealing with context-heavy code, such as Metropolis. |
| 62 | // - Spartan API: expecting users to plainly use sync.Cond is risky, as the API |
| 63 | // is fairly low-level. |
| 64 | // - No solution for late consumers: late consumers (ones that missed the value |
| 65 | // being set by a producer) would still have to implement logic in order to |
| 66 | // find out such a value, as sync.Cond only supports what amounts to |
| 67 | // edge-level triggers as part of its Broadcast/Signal system. |
Serge Bazanski | c00318e | 2021-03-03 12:39:24 +0100 | [diff] [blame] | 68 | // |
| 69 | // It would be possible to implement MemoryValue using a sync.Cond internally, |
| 70 | // but such an implementation would likely be more complex than the current |
| 71 | // implementation based on channels and mutexes, as it would have to work |
| 72 | // around issues like lack of canceling, etc. |
Serge Bazanski | c00318e | 2021-03-03 12:39:24 +0100 | [diff] [blame] | 73 | package event |
| 74 | |
| 75 | import ( |
| 76 | "context" |
Serge Bazanski | 37110c3 | 2023-03-01 13:57:27 +0000 | [diff] [blame] | 77 | "errors" |
Serge Bazanski | 9dd92d9 | 2023-03-01 14:29:07 +0000 | [diff] [blame] | 78 | |
| 79 | "source.monogon.dev/metropolis/pkg/supervisor" |
Serge Bazanski | c00318e | 2021-03-03 12:39:24 +0100 | [diff] [blame] | 80 | ) |
| 81 | |
// A Value is an 'Event Value', some piece of data that can be updated ('Set')
// by Producers and retrieved by Consumers.
//
// It is the combination of a write side (Set) and a read side (the embedded
// ValueWatch interface).
type Value[T any] interface {
	// Set updates the Value to the given data. It is safe to call this from
	// multiple goroutines, including concurrently.
	//
	// Any time Set is called, any consumers performing a Watch on this Value
	// will be notified with the new data - even if the Set data is the same as
	// the one that was already stored.
	//
	// A Value will initially have no data set. This 'no data' state is seen by
	// consumers as the first .Get() call on the Watcher blocking until data is
	// Set.
	//
	// All updates will be serialized in an arbitrary order - if multiple
	// producers wish to perform concurrent actions to update the Value
	// partially, this should be negotiated and serialized externally by the
	// producers.
	Set(val T)

	// ValueWatch implements the Watch method. It is split out into another
	// interface to allow some 'Event Values' to implement only the watch/read
	// part, with the write side being implicit or defined by a more complex
	// interface than a simple Set().
	ValueWatch[T]
}
| 106 | |
// ValueWatch is the read side of an 'Event Value', which can be retrieved by
// Consumers by performing a Watch operation on it.
type ValueWatch[T any] interface {
	// Watch retrieves a Watcher that keeps track of the version of the data
	// contained within the Value that was last seen by a consumer. Once a
	// Watcher is retrieved, it can be used to then get the actual data stored
	// within the Value, and to reliably retrieve updates to it without having
	// to poll for changes.
	Watch() Watcher[T]
}
| 117 | |
// A Watcher keeps track of the last version of data seen by a consumer for a
// given Value. Each consumer should use its own Watcher instance, and it is
// not safe to use this type concurrently. However, it is safe to move/copy it
// across different goroutines, as long as no two goroutines access it
// simultaneously.
type Watcher[T any] interface {
	// Get blocks until a Value's data is available:
	//  - On first use of a Watcher, Get will return the data contained in the
	//    value at the time of calling .Watch(), or block if no data has been
	//    .Set() on it yet. If a value has been Set() since the initial
	//    creation of the Watch() but before Get() is called for the first
	//    time, the first Get() call will immediately return the new value.
	//  - On subsequent uses of a Watcher, Get will block until the given Value
	//    has been Set with new data. This does not necessarily mean that the
	//    new data is different - consumers should always perform their own
	//    checks on whether the update is relevant to them (ie., the data has
	//    changed in a significant way), unless specified otherwise by a Value
	//    publisher.
	//
	// Get() will always return the current newest data that has been Set() on
	// the Value, and not a full log of historical events. This is geared
	// towards event values where consumers only care about changes to data
	// since last retrieval, not every value that has been Set along the way.
	// Thus, consumers need not make sure that they actively .Get() on a
	// watcher at all times.
	//
	// If the context is canceled before data is available to be returned, the
	// context's error will be returned. However, the Watcher will still need
	// to be Closed, as it is still fully functional after the context has been
	// canceled.
	//
	// Concurrent requests to Get result in an error. The reasoning to return
	// an error instead of attempting to serialize the requests is that any
	// concurrent access from multiple goroutines would cause a desync in the
	// next usage of the Watcher. For example:
	//  1. w.Get() (in G0) and w.Get() (in G1) start. They both block waiting
	//     for an initial value.
	//  2. v.Set(0)
	//  3. w.Get() in G0 returns 0,
	//  4. v.Set(1)
	//  5. w.Get() in G1 returns 1,
	// This would cause G0 and G1 to become desynchronized between each other
	// (both have different value data) and subsequent updates will also
	// continue skipping some updates.
	// If multiple goroutines need to access the Value, they should each use
	// their own Watcher.
	Get(context.Context, ...GetOption[T]) (T, error)

	// Close must be called if the Watcher is not going to be used anymore -
	// otherwise, a goroutine will leak.
	Close() error
}
Serge Bazanski | 8d45a05 | 2021-10-18 17:24:24 +0200 | [diff] [blame] | 169 | |
// GetOption is an optional argument to Watcher.Get, which accepts any number
// of them. Values of this type are built by the Filter and BacklogOnly
// functions in this package.
type GetOption[T any] struct {
	// Predicate is the function given to Filter.
	// NOTE(review): presumably Get only returns values for which Predicate
	// returns true - confirm against the Watcher implementations.
	Predicate func(t T) bool

	// BacklogOnly is set to true by the BacklogOnly function; see that
	// function's documentation for the retrieval mode it requests.
	BacklogOnly bool
}
| 174 | |
| 175 | func Filter[T any](pred func(T) bool) GetOption[T] { |
| 176 | return GetOption[T]{ |
| 177 | Predicate: pred, |
| 178 | } |
| 179 | } |
| 180 | |
| 181 | // BacklogOnly will prevent Get from blocking on waiting for more updates from |
| 182 | // etcd, by instead returning BacklogDone whenever no more data is currently |
| 183 | // locally available. This is different however, from establishing that there |
| 184 | // are no more pending updates from the etcd cluster - the only way to ensure |
| 185 | // the local client is up to date is by performing Get calls without this option |
| 186 | // set. |
| 187 | // |
| 188 | // This mode of retrieval should only be used for the retrieval of the existing |
| 189 | // data in the etcd cluster on the initial creation of the Watcher (by |
| 190 | // repeatedly calling Get until BacklogDone is returned), and shouldn't be set |
| 191 | // for any subsequent call. Any use of this option after that initial fetch is |
| 192 | // undefined behaviour that exposes the internals of the Get implementation, and |
| 193 | // must not be relied on. However, in the future, this behaviour might be |
| 194 | // formalized. |
| 195 | // |
| 196 | // This mode is particularly useful for ranged watchers. Non-ranged watchers can |
| 197 | // still use this option to distinguish between blocking because of the |
| 198 | // nonexistence of an object vs. blocking because of networking issues. However, |
| 199 | // non-ranged retrieval semantics generally will rarely need to make this |
| 200 | // distinction. |
| 201 | func BacklogOnly[T any]() GetOption[T] { |
| 202 | return GetOption[T]{BacklogOnly: true} |
| 203 | } |
| 204 | |
// BacklogDone is the sentinel error returned by Get when the BacklogOnly
// option is set and the Watcher client holds no more event data, ie. once the
// initial cluster state of the requested key has been retrieved.
var BacklogDone = errors.New("no more backlogged data")
Serge Bazanski | 9dd92d9 | 2023-03-01 14:29:07 +0000 | [diff] [blame] | 211 | |
| 212 | // Pipe a Value's initial state and subsequent updates to an already existing |
| 213 | // channel in a supervisor.Runnable. This is mostly useful when wanting to select |
| 214 | // {} on many Values. |
| 215 | // |
| 216 | // The given channel will NOT be closed when the runnable exits. The process |
| 217 | // receiving from the channel should be running in a group with the pipe |
| 218 | // runnable, so that both restart if either does. This ensures that there is always |
| 219 | // at least one value in the channel when the receiver starts. |
| 220 | func Pipe[T any](value Value[T], c chan<- T, opts ...GetOption[T]) supervisor.Runnable { |
| 221 | return func(ctx context.Context) error { |
| 222 | supervisor.Signal(ctx, supervisor.SignalHealthy) |
| 223 | w := value.Watch() |
| 224 | defer w.Close() |
| 225 | for { |
| 226 | v, err := w.Get(ctx, opts...) |
| 227 | if err != nil { |
| 228 | return err |
| 229 | } |
| 230 | c <- v |
| 231 | } |
| 232 | } |
| 233 | } |