blob: 8d2c077c3bf1fbe636b37e728b15d5089de4efcb [file] [log] [blame]
Tim Windelschmidt6d33a432025-02-04 14:34:25 +01001// Copyright The Monogon Project Authors.
Serge Bazanskic00318e2021-03-03 12:39:24 +01002// SPDX-License-Identifier: Apache-2.0
Serge Bazanskic00318e2021-03-03 12:39:24 +01003
4// Package event defines and implements Event Values, a mechanism in which
5// multiple consumers can watch a value for updates in a reliable way.
6//
7// Values currently are kept in memory (see: MemoryValue), but a future
8// implementation might exist for other storage backends, eg. etcd.
9//
Serge Bazanski37110c32023-03-01 13:57:27 +000010// # Background and intended use
Serge Bazanskic00318e2021-03-03 12:39:24 +010011//
12// The Event Value library is intended to be used within Metropolis'
13// supervisor-based runnables to communicate state changes to other runnables,
14// while permitting both sides to restart if needed. It grew out of multiple
15// codebases reimplementing an ad-hoc observer pattern, and from the
16// realization that implementing all possible edge cases of such patterns is
17// non-trivial and subject to programming errors. As such, it was turned into a
18// self-standing library.
19//
20// Why not just channels?
21//
// Plain channels have multiple deficiencies for this use case:
//   - Strict FIFO behaviour: all values sent to a channel must be received, and
//     historic and newest data must be treated in the same way. This means that
//     a consumer of state changes must process all updates to the value as if
//     they are the newest, and is unable to skip rapid updates when a system is
//     slowly settling due to a cascading state change.
//   - Implementation overhead: implementing an observer
//     registration/unregistration pattern is prone to programming bugs,
//     especially for features like always first sending the current state to a
//     new observer.
//   - Strict buffer size: due to their FIFO nature and the possibility of
//     consumers not receiving actively, channels would have to buffer all
//     existing updates, requiring some arbitrary best-guess channel buffer
//     sizing that would still not prevent blocking writes or data loss in a
//     worst case scenario.
Serge Bazanskic00318e2021-03-03 12:39:24 +010037//
38// Or, in other words: Go channels are a synchronization primitive, not a
39// ready-made solution to this problem. The Event Value implementation in fact
40// extensively uses Go channels within its implementation as a building block.
41//
42// Why not just condition variables (sync.Cond)?
43//
// Go's condition variable implementation doesn't fully address our needs
// either:
//   - No context/canceling support: once a condition is being Wait()ed on,
//     this cannot be interrupted. This is especially painful and unwieldy when
//     dealing with context-heavy code, such as Metropolis.
//   - Spartan API: expecting users to plainly use sync.Cond is risky, as the API
//     is fairly low-level.
//   - No solution for late consumers: late consumers (ones that missed the value
//     being set by a producer) would still have to implement logic in order to
//     find out such a value, as sync.Cond only supports what amounts to
//     edge-triggered notifications as part of its Broadcast/Signal system.
Serge Bazanskic00318e2021-03-03 12:39:24 +010055//
56// It would be possible to implement MemoryValue using a sync.Cond internally,
57// but such an implementation would likely be more complex than the current
58// implementation based on channels and mutexes, as it would have to work
59// around issues like lack of canceling, etc.
Serge Bazanskic00318e2021-03-03 12:39:24 +010060package event
61
62import (
63 "context"
Serge Bazanski37110c32023-03-01 13:57:27 +000064 "errors"
Serge Bazanski9dd92d92023-03-01 14:29:07 +000065
Tim Windelschmidt9f21f532024-05-07 15:14:20 +020066 "source.monogon.dev/osbase/supervisor"
Serge Bazanskic00318e2021-03-03 12:39:24 +010067)
68
69// A Value is an 'Event Value', some piece of data that can be updated ('Set')
70// by Producers and retrieved by Consumers.
Serge Bazanski37110c32023-03-01 13:57:27 +000071type Value[T any] interface {
Serge Bazanskic00318e2021-03-03 12:39:24 +010072 // Set updates the Value to the given data. It is safe to call this from
73 // multiple goroutines, including concurrently.
74 //
75 // Any time Set is called, any consumers performing a Watch on this Value
76 // will be notified with the new data - even if the Set data is the same as
77 // the one that was already stored.
78 //
79 // A Value will initially have no data set. This 'no data' state is seen by
80 // consumers by the first .Get() call on the Watcher blocking until data is Set.
81 //
82 // All updates will be serialized in an arbitrary order - if multiple
83 // producers wish to perform concurrent actions to update the Value partially,
84 // this should be negotiated and serialized externally by the producers.
Serge Bazanski37110c32023-03-01 13:57:27 +000085 Set(val T)
Serge Bazanskic00318e2021-03-03 12:39:24 +010086
Serge Bazanskifac8b2e2021-05-04 12:23:26 +020087 // ValueWatch implements the Watch method. It is split out into another
88 // interface to allow some 'Event Values' to implement only the watch/read
89 // part, with the write side being implicit or defined by a more complex
Serge Bazanski37110c32023-03-01 13:57:27 +000090 // interface than a simple Set().
91 ValueWatch[T]
Serge Bazanskifac8b2e2021-05-04 12:23:26 +020092}
93
94// ValueWatch is the read side of an 'Event Value', witch can by retrieved by
95// Consumers by performing a Watch operation on it.
Serge Bazanski37110c32023-03-01 13:57:27 +000096type ValueWatch[T any] interface {
Serge Bazanskic00318e2021-03-03 12:39:24 +010097 // Watch retrieves a Watcher that keeps track on the version of the data
98 // contained within the Value that was last seen by a consumer. Once a
99 // Watcher is retrieved, it can be used to then get the actual data stored
100 // within the Value, and to reliably retrieve updates to it without having
101 // to poll for changes.
Serge Bazanski37110c32023-03-01 13:57:27 +0000102 Watch() Watcher[T]
Serge Bazanskic00318e2021-03-03 12:39:24 +0100103}
104
105// A Watcher keeps track of the last version of data seen by a consumer for a
106// given Value. Each consumer should use an own Watcher instance, and it is not
107// safe to use this type concurrently. However, it is safe to move/copy it
108// across different goroutines, as long as no two goroutines access it
109// simultaneously.
Serge Bazanski37110c32023-03-01 13:57:27 +0000110type Watcher[T any] interface {
Serge Bazanskic00318e2021-03-03 12:39:24 +0100111 // Get blocks until a Value's data is available:
112 // - On first use of a Watcher, Get will return the data contained in the
113 // value at the time of calling .Watch(), or block if no data has been
Serge Bazanski37110c32023-03-01 13:57:27 +0000114 // .Set() on it yet. If a value has been Set() since the initial
Serge Bazanskic00318e2021-03-03 12:39:24 +0100115 // creation of the Watch() but before Get() is called for the first
116 // time, the first Get() call will immediately return the new value.
117 // - On subsequent uses of a Watcher, Get will block until the given Value
118 // has been Set with new data. This does not necessarily mean that the
119 // new data is different - consumers should always perform their own
120 // checks on whether the update is relevant to them (ie., the data has
121 // changed in a significant way), unless specified otherwise by a Value
122 // publisher.
123 //
124 // Get() will always return the current newest data that has been Set() on
125 // the Value, and not a full log of historical events. This is geared
126 // towards event values where consumers only care about changes to data
127 // since last retrieval, not every value that has been Set along the way.
128 // Thus, consumers need not make sure that they actively .Get() on a
129 // watcher all the times.
130 //
131 // If the context is canceled before data is available to be returned, the
132 // context's error will be returned. However, the Watcher will still need to be
133 // Closed, as it is still fully functional after the context has been canceled.
134 //
135 // Concurrent requests to Get result in an error. The reasoning to return
136 // an error instead of attempting to serialize the requests is that any
137 // concurrent access from multiple goroutines would cause a desync in the
138 // next usage of the Watcher. For example:
139 // 1) w.Get() (in G0) and w.Get(G1) start. They both block waiting for an
140 // initial value.
141 // 2) v.Set(0)
142 // 3) w.Get() in G0 returns 0,
143 // 4) v.Set(1)
144 // 4) w.Get() in G1 returns 1,
145 // This would cause G0 and G1 to become desynchronized between eachother
146 // (both have different value data) and subsequent updates will also
147 // continue skipping some updates.
148 // If multiple goroutines need to access the Value, they should each use
149 // their own Watcher.
Serge Bazanski37110c32023-03-01 13:57:27 +0000150 Get(context.Context, ...GetOption[T]) (T, error)
Serge Bazanskic00318e2021-03-03 12:39:24 +0100151
152 // Close must be called if the Watcher is not going to be used anymore -
Jan Schär36bde9c2024-03-19 15:05:33 +0100153 // otherwise, it will not be garbage collected.
Serge Bazanskic00318e2021-03-03 12:39:24 +0100154 Close() error
155}
Serge Bazanski8d45a052021-10-18 17:24:24 +0200156
// GetOption carries optional settings for a single Watcher.Get call. Values
// are normally built with the Filter and BacklogOnly constructors rather than
// by filling in the fields directly.
type GetOption[T any] struct {
	// Predicate is the function supplied through Filter. Presumably Get only
	// returns values for which Predicate reports true - the filtering itself
	// is done by the Watcher implementation, so confirm against the one in
	// use.
	Predicate func(t T) bool
	// BacklogOnly is set by the BacklogOnly constructor; see its
	// documentation for the retrieval mode it requests.
	BacklogOnly bool
}
161
162func Filter[T any](pred func(T) bool) GetOption[T] {
163 return GetOption[T]{
164 Predicate: pred,
165 }
166}
167
168// BacklogOnly will prevent Get from blocking on waiting for more updates from
Tim Windelschmidt513df182024-04-18 23:44:50 +0200169// etcd, by instead returning ErrBacklogDone whenever no more data is currently
Serge Bazanski37110c32023-03-01 13:57:27 +0000170// locally available. This is different however, from establishing that there
171// are no more pending updates from the etcd cluster - the only way to ensure
172// the local client is up to date is by performing Get calls without this option
173// set.
174//
175// This mode of retrieval should only be used for the retrieval of the existing
176// data in the etcd cluster on the initial creation of the Watcher (by
Tim Windelschmidt513df182024-04-18 23:44:50 +0200177// repeatedly calling Get until ErrBacklogDone is returned), and shouldn't be set
Serge Bazanski37110c32023-03-01 13:57:27 +0000178// for any subsequent call. Any use of this option after that initial fetch is
179// undefined behaviour that exposes the internals of the Get implementation, and
180// must not be relied on. However, in the future, this behaviour might be
181// formalized.
182//
183// This mode is particularly useful for ranged watchers. Non-ranged watchers can
184// still use this option to distinguish between blocking because of the
185// nonexistence of an object vs. blocking because of networking issues. However,
186// non-ranged retrieval semantics generally will rarely need to make this
187// distinction.
188func BacklogOnly[T any]() GetOption[T] {
189 return GetOption[T]{BacklogOnly: true}
190}
191
var (
	// ErrBacklogDone is returned by Get when BacklogOnly is set and there is no
	// more event data stored in the Watcher client, i.e. when the initial
	// cluster state of the requested key has been retrieved.
	ErrBacklogDone = errors.New("no more backlogged data")
)
Serge Bazanski9dd92d92023-03-01 14:29:07 +0000198
199// Pipe a Value's initial state and subsequent updates to an already existing
200// channel in a supervisor.Runnable. This is mostly useful when wanting to select
201// {} on many Values.
202//
203// The given channel will NOT be closed when the runnable exits. The process
204// receiving from the channel should be running in a group with the pipe
205// runnable, so that both restart if either does. This ensures that there is always
206// at least one value in the channel when the receiver starts.
207func Pipe[T any](value Value[T], c chan<- T, opts ...GetOption[T]) supervisor.Runnable {
208 return func(ctx context.Context) error {
209 supervisor.Signal(ctx, supervisor.SignalHealthy)
210 w := value.Watch()
211 defer w.Close()
212 for {
213 v, err := w.Get(ctx, opts...)
214 if err != nil {
215 return err
216 }
Serge Bazanski8d377ce2024-03-28 14:20:03 +0100217 select {
218 case c <- v:
219 case <-ctx.Done():
220 return ctx.Err()
221 }
Serge Bazanski9dd92d92023-03-01 14:29:07 +0000222 }
223 }
224}