// Copyright 2020 The Monogon Project Authors.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package event defines and implements Event Values, a mechanism in which
// multiple consumers can watch a value for updates in a reliable way.
//
// Values are currently kept in memory (see: MemoryValue), but a future
// implementation might exist for other storage backends, e.g. etcd.
//
// # Background and intended use
//
// The Event Value library is intended to be used within Metropolis'
// supervisor-based runnables to communicate state changes to other runnables,
// while permitting both sides to restart if needed. It grew out of multiple
// codebases reimplementing an ad-hoc observer pattern, and from the
// realization that implementing all possible edge cases of such patterns is
// non-trivial and subject to programming errors. As such, it was turned into a
// self-standing library.
//
// Why not just channels?
//
// Plain channels have multiple deficiencies for this use case:
//   - Strict FIFO behaviour: all values sent to a channel must be received, and
//     historic and newest data must be treated in the same way. This means that
//     a consumer of state changes must process every update to the value as if
//     it were the newest, and cannot skip rapid updates while a system is
//     slowly settling after a cascading state change.
//   - Implementation overhead: implementing an observer
//     registration/unregistration pattern is prone to programming bugs,
//     especially for features like always first sending the current state to a
//     new observer.
//   - Strict buffer size: due to their FIFO nature and the possibility of
//     consumers not receiving actively, channels would have to buffer all
//     existing updates, requiring some arbitrary best-guess channel buffer
//     sizing that would still not prevent blocking writes or data loss in a
//     worst case scenario.
//
// Or, in other words: Go channels are a synchronization primitive, not a
// ready-made solution to this problem. In fact, the Event Value implementation
// uses Go channels extensively as a building block.
//
// Why not just condition variables (sync.Cond)?
//
// Go's condition variable implementation doesn't fully address our needs
// either:
//   - No context/canceling support: once a condition is being Wait()ed on,
//     this cannot be interrupted. This is especially painful and unwieldy when
//     dealing with context-heavy code, such as Metropolis.
//   - Spartan API: expecting users to plainly use sync.Cond is risky, as the API
//     is fairly low-level.
//   - No solution for late consumers: late consumers (ones that missed the value
//     being set by a producer) would still have to implement logic in order to
//     find out such a value, as sync.Cond only supports what amounts to
//     edge-level triggers as part of its Broadcast/Signal system.
//
// It would be possible to implement MemoryValue using a sync.Cond internally,
// but such an implementation would likely be more complex than the current
// implementation based on channels and mutexes, as it would have to work
// around issues like lack of canceling, etc.
package event

import (
	"context"
	"errors"

	"source.monogon.dev/metropolis/pkg/supervisor"
)

// A Value is an 'Event Value', some piece of data that can be updated ('Set')
// by Producers and retrieved by Consumers.
type Value[T any] interface {
	// Set updates the Value to the given data. It is safe to call this from
	// multiple goroutines, including concurrently.
	//
	// Any time Set is called, any consumers performing a Watch on this Value
	// will be notified with the new data - even if the Set data is the same as
	// the one that was already stored.
	//
	// A Value will initially have no data set. This 'no data' state is seen by
	// consumers by the first .Get() call on the Watcher blocking until data is Set.
	//
	// All updates will be serialized in an arbitrary order - if multiple
	// producers wish to perform concurrent actions to update the Value partially,
	// this should be negotiated and serialized externally by the producers.
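	//
	// A minimal sketch of such external serialization, assuming a hypothetical
	// State struct with a Ready field and a mutex shared by all producers
	// (neither is part of this package):
	//
	//	var (
	//		mu  sync.Mutex // guards cur; shared by all producers
	//		cur State      // last state published through the Value
	//	)
	//
	//	func setReady(v Value[State], ready bool) {
	//		mu.Lock()
	//		defer mu.Unlock()
	//		cur.Ready = ready // partial update of the shared copy
	//		v.Set(cur)        // publish the whole, consistent state
	//	}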
	Set(val T)

	// ValueWatch implements the Watch method. It is split out into another
	// interface to allow some 'Event Values' to implement only the watch/read
	// part, with the write side being implicit or defined by a more complex
	// interface than a simple Set().
	ValueWatch[T]
}

// ValueWatch is the read side of an 'Event Value', which can be retrieved by
// Consumers by performing a Watch operation on it.
type ValueWatch[T any] interface {
	// Watch retrieves a Watcher that keeps track of the version of the data
	// contained within the Value that was last seen by a consumer. Once a
	// Watcher is retrieved, it can be used to then get the actual data stored
	// within the Value, and to reliably retrieve updates to it without having
	// to poll for changes.
	Watch() Watcher[T]
}

// A Watcher keeps track of the last version of data seen by a consumer for a
// given Value. Each consumer should use its own Watcher instance, and it is not
// safe to use this type concurrently. However, it is safe to move/copy it
// across different goroutines, as long as no two goroutines access it
// simultaneously.
type Watcher[T any] interface {
	// Get blocks until a Value's data is available:
	//  - On first use of a Watcher, Get will return the data contained in the
	//    value at the time of calling .Watch(), or block if no data has been
	//    .Set() on it yet. If a value has been Set() since the initial
	//    creation of the Watch() but before Get() is called for the first
	//    time, the first Get() call will immediately return the new value.
	//  - On subsequent uses of a Watcher, Get will block until the given Value
	//    has been Set with new data. This does not necessarily mean that the
	//    new data is different - consumers should always perform their own
	//    checks on whether the update is relevant to them (i.e., the data has
	//    changed in a significant way), unless specified otherwise by a Value
	//    publisher.
	//
	// Get() will always return the current newest data that has been Set() on
	// the Value, and not a full log of historical events. This is geared
	// towards event values where consumers only care about changes to data
	// since last retrieval, not every value that has been Set along the way.
	// Thus, consumers need not make sure that they actively .Get() on a
	// watcher at all times.
	//
	// If the context is canceled before data is available to be returned, the
	// context's error will be returned. However, the Watcher will still need to be
	// Closed, as it is still fully functional after the context has been canceled.
	//
	// Concurrent requests to Get result in an error. The reasoning to return
	// an error instead of attempting to serialize the requests is that any
	// concurrent access from multiple goroutines would cause a desync in the
	// next usage of the Watcher. For example:
	//  1. w.Get() (in G0) and w.Get() (in G1) start. They both block waiting
	//     for an initial value.
	//  2. v.Set(0)
	//  3. w.Get() in G0 returns 0,
	//  4. v.Set(1)
	//  5. w.Get() in G1 returns 1,
	//
	// This would cause G0 and G1 to become desynchronized between each other
	// (both have different value data), and subsequent Get calls would
	// continue to skip some updates.
	//
	// If multiple goroutines need to access the Value, they should each use
	// their own Watcher.
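	//
	// A minimal sketch of a consumer loop, assuming v is a Value[int] and
	// process is a hypothetical caller-supplied handler:
	//
	//	w := v.Watch()
	//	defer w.Close()
	//	last, first := 0, true
	//	for {
	//		cur, err := w.Get(ctx)
	//		if err != nil {
	//			// For example the context's error once ctx is canceled.
	//			return err
	//		}
	//		if !first && cur == last {
	//			// Set was called again with unchanged data; skip it.
	//			continue
	//		}
	//		last, first = cur, false
	//		process(cur)
	//	}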
	Get(context.Context, ...GetOption[T]) (T, error)

	// Close must be called if the Watcher is not going to be used anymore -
	// otherwise, a goroutine will leak.
	Close() error
}

// GetOption configures a single call to Watcher.Get. Options are built using
// Filter and BacklogOnly.
type GetOption[T any] struct {
	Predicate   func(t T) bool
	BacklogOnly bool
}

// Filter returns a GetOption carrying pred as its Predicate. It is meant to
// restrict Get to values for which pred returns true; the exact semantics are
// up to the Watcher implementation.
func Filter[T any](pred func(T) bool) GetOption[T] {
	return GetOption[T]{
		Predicate: pred,
	}
}

// BacklogOnly will prevent Get from blocking on waiting for more updates from
// etcd, by instead returning BacklogDone whenever no more data is currently
// locally available. This is different, however, from establishing that there
// are no more pending updates from the etcd cluster - the only way to ensure
// the local client is up to date is by performing Get calls without this option
// set.
//
// This mode of retrieval should only be used for the retrieval of the existing
// data in the etcd cluster on the initial creation of the Watcher (by
// repeatedly calling Get until BacklogDone is returned), and shouldn't be set
// for any subsequent call. Any use of this option after that initial fetch is
// undefined behaviour that exposes the internals of the Get implementation, and
// must not be relied on. However, in the future, this behaviour might be
// formalized.
//
// This mode is particularly useful for ranged watchers. Non-ranged watchers can
// still use this option to distinguish between blocking because of the
// nonexistence of an object vs. blocking because of networking issues. However,
// non-ranged retrieval semantics generally will rarely need to make this
// distinction.
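//
// A minimal sketch of the initial fetch described above, assuming w is a
// freshly created Watcher[T] and handle is a hypothetical caller-supplied
// function:
//
//	for {
//		v, err := w.Get(ctx, BacklogOnly[T]())
//		if errors.Is(err, BacklogDone) {
//			// All locally available data has been consumed.
//			break
//		}
//		if err != nil {
//			return err
//		}
//		handle(v)
//	}
//	// From here on, call w.Get(ctx) without BacklogOnly.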
func BacklogOnly[T any]() GetOption[T] {
	return GetOption[T]{BacklogOnly: true}
}

var (
	// BacklogDone is returned by Get when BacklogOnly is set and there is no more
	// event data stored in the Watcher client, i.e. when the initial cluster state
	// of the requested key has been retrieved.
	BacklogDone = errors.New("no more backlogged data")
)

// Pipe returns a supervisor.Runnable which sends a Value's initial state and
// subsequent updates to an already existing channel. This is mostly useful for
// selecting on many Values at once.
//
// The given channel will NOT be closed when the runnable exits. The process
// receiving from the channel should be running in a group with the pipe
// runnable, so that both restart if either does. This ensures that there is always
// at least one value in the channel when the receiver starts.
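//
// A minimal sketch of such a group, assuming hypothetical Values a (of type
// Value[int]) and b (of type Value[string]), hypothetical handlers handleA and
// handleB, and assuming that supervisor.RunGroup accepts a context and a map
// of named runnables:
//
//	ac, bc := make(chan int), make(chan string)
//	err := supervisor.RunGroup(ctx, map[string]supervisor.Runnable{
//		"pipe-a": Pipe(a, ac),
//		"pipe-b": Pipe(b, bc),
//		"receiver": func(ctx context.Context) error {
//			supervisor.Signal(ctx, supervisor.SignalHealthy)
//			for {
//				select {
//				case x := <-ac:
//					handleA(x)
//				case y := <-bc:
//					handleB(y)
//				case <-ctx.Done():
//					return ctx.Err()
//				}
//			}
//		},
//	})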
func Pipe[T any](value Value[T], c chan<- T, opts ...GetOption[T]) supervisor.Runnable {
	return func(ctx context.Context) error {
		supervisor.Signal(ctx, supervisor.SignalHealthy)
		w := value.Watch()
		defer w.Close()
		for {
			v, err := w.Get(ctx, opts...)
			if err != nil {
				return err
			}
			// Send the value, but give up if the context is canceled so that
			// the runnable can exit even when nobody receives from c.
			select {
			case c <- v:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	}
}