m/pkg/event: make type-safe
This is a fairly large change which makes use of Go type parameters
(“generics”) to make the event library (and its memory/etcd
implementations) type safe.
Since we now have the event.Value interface strongly typed, we also move
options which were implementation-specific (like BacklogOnly)
to be part of that interface, instead of the previously type-asserted
specific implementations. Use of options that are not handled by a
particular implementation is a runtime error. Expressing this in the
type system is probably not worth the effort.
We also implement Filter to allow offloading some of the functionality previously implemented in type assertion wrappers into the library itself.
In the end, this ends up removing a bunch of type assertion code, at
the cost of a fairly sweeping change. Unfortunately, some of this is due
to IntelliJ suddenly deciding to reformat comments.
Change-Id: I1ca6d93db1b5c4055a21af3fb9e5e3d425c0d86e
Reviewed-on: https://review.monogon.dev/c/monogon/+/1322
Tested-by: Jenkins CI
Reviewed-by: Leopold Schabel <leo@monogon.tech>
diff --git a/metropolis/pkg/event/event.go b/metropolis/pkg/event/event.go
index bc46525..ba9190c 100644
--- a/metropolis/pkg/event/event.go
+++ b/metropolis/pkg/event/event.go
@@ -20,7 +20,7 @@
// Values currently are kept in memory (see: MemoryValue), but a future
// implementation might exist for other storage backends, eg. etcd.
//
-// Background and intended use
+// # Background and intended use
//
// The Event Value library is intended to be used within Metropolis'
// supervisor-based runnables to communicate state changes to other runnables,
@@ -33,20 +33,20 @@
// Why not just channels?
//
// Plain channels have multiple deficiencies for this usecase:
-// - Strict FIFO behaviour: all values sent to a channel must be received, and
-// historic and newest data must be treated in the same way. This means that
-// a consumer of state changes must process all updates to the value as if
-// they are the newest, and unable to skip rapid updates when a system is
-// slowly settling due to a cascading state change.
-// - Implementation overhead: implementing an observer
-// registration/unregistration pattern is prone to programming bugs,
-// especially for features like always first sending the current state to a
-// new observer.
-// - Strict buffer size: due to their FIFO nature and the possibility of
-// consumers not receiving actively, channels would have to buffer all
-// existing updates, requiring some arbitrary best-guess channel buffer
-// sizing that would still not prevent blocking writes or data loss in a
-// worst case scenario.
+// - Strict FIFO behaviour: all values sent to a channel must be received, and
+// historic and newest data must be treated in the same way. This means that
+// a consumer of state changes must process all updates to the value as if
+// they are the newest, and unable to skip rapid updates when a system is
+// slowly settling due to a cascading state change.
+// - Implementation overhead: implementing an observer
+// registration/unregistration pattern is prone to programming bugs,
+// especially for features like always first sending the current state to a
+// new observer.
+// - Strict buffer size: due to their FIFO nature and the possibility of
+// consumers not receiving actively, channels would have to buffer all
+// existing updates, requiring some arbitrary best-guess channel buffer
+// sizing that would still not prevent blocking writes or data loss in a
+// worst case scenario.
//
// Or, in other words: Go channels are a synchronization primitive, not a
// ready-made solution to this problem. The Event Value implementation in fact
@@ -56,40 +56,30 @@
//
// Go's condition variable implementation doesn't fully address our needs
// either:
-// - No context/canceling support: once a condition is being Wait()ed on,
-// this cannot be interrupted. This is especially painful and unwieldy when
-// dealing with context-heavy code, such as Metropolis.
-// - Spartan API: expecting users to plainly use sync.Cond is risky, as the API
-// is fairly low-level.
-// - No solution for late consumers: late consumers (ones that missed the value
-// being set by a producer) would still have to implement logic in order to
-// find out such a value, as sync.Cond only supports what amounts to
-// edge-level triggers as part of its Broadcast/Signal system.
+// - No context/canceling support: once a condition is being Wait()ed on,
+// this cannot be interrupted. This is especially painful and unwieldy when
+// dealing with context-heavy code, such as Metropolis.
+// - Spartan API: expecting users to plainly use sync.Cond is risky, as the API
+// is fairly low-level.
+// - No solution for late consumers: late consumers (ones that missed the value
+// being set by a producer) would still have to implement logic in order to
+// find out such a value, as sync.Cond only supports what amounts to
+// edge-level triggers as part of its Broadcast/Signal system.
//
// It would be possible to implement MemoryValue using a sync.Cond internally,
// but such an implementation would likely be more complex than the current
// implementation based on channels and mutexes, as it would have to work
// around issues like lack of canceling, etc.
-//
-// Type safety
-//
-// The Value/Watcher interfaces are, unfortunately, implemented using
-// interface{}. There was an attempt to use Go's existing generic types facility
-// (interfaces) to solve this problem. However, with Type Parameters likely soon
-// appearing in mainline Go, this was not a priority, as that will fully solve
-// this problem without requiring mental gymnastics. For now, users of this
-// library will have to write some boilerplate code to allow consumers/watchers
-// to access the data in a a typesafe manner without assertions. See
-// ExampleValue_full for one possible approach to this.
package event
import (
"context"
+ "errors"
)
// A Value is an 'Event Value', some piece of data that can be updated ('Set')
// by Producers and retrieved by Consumers.
-type Value interface {
+type Value[T any] interface {
// Set updates the Value to the given data. It is safe to call this from
// multiple goroutines, including concurrently.
//
@@ -103,24 +93,24 @@
// All updates will be serialized in an arbitrary order - if multiple
// producers wish to perform concurrent actions to update the Value partially,
// this should be negotiated and serialized externally by the producers.
- Set(val interface{})
+ Set(val T)
// ValueWatch implements the Watch method. It is split out into another
// interface to allow some 'Event Values' to implement only the watch/read
// part, with the write side being implicit or defined by a more complex
- // interface then a simple Set().
- ValueWatch
+ // interface than a simple Set().
+ ValueWatch[T]
}
// ValueWatch is the read side of an 'Event Value', witch can by retrieved by
// Consumers by performing a Watch operation on it.
-type ValueWatch interface {
+type ValueWatch[T any] interface {
// Watch retrieves a Watcher that keeps track on the version of the data
// contained within the Value that was last seen by a consumer. Once a
// Watcher is retrieved, it can be used to then get the actual data stored
// within the Value, and to reliably retrieve updates to it without having
// to poll for changes.
- Watch() Watcher
+ Watch() Watcher[T]
}
// A Watcher keeps track of the last version of data seen by a consumer for a
@@ -128,11 +118,11 @@
// safe to use this type concurrently. However, it is safe to move/copy it
// across different goroutines, as long as no two goroutines access it
// simultaneously.
-type Watcher interface {
+type Watcher[T any] interface {
// Get blocks until a Value's data is available:
// - On first use of a Watcher, Get will return the data contained in the
// value at the time of calling .Watch(), or block if no data has been
- // .Set() on it yet. If a value has been Set() since the the initial
+ // .Set() on it yet. If a value has been Set() since the initial
// creation of the Watch() but before Get() is called for the first
// time, the first Get() call will immediately return the new value.
// - On subsequent uses of a Watcher, Get will block until the given Value
@@ -168,11 +158,51 @@
// continue skipping some updates.
// If multiple goroutines need to access the Value, they should each use
// their own Watcher.
- Get(context.Context, ...GetOption) (interface{}, error)
+ Get(context.Context, ...GetOption[T]) (T, error)
// Close must be called if the Watcher is not going to be used anymore -
// otherwise, a goroutine will leak.
Close() error
}
-type GetOption interface{}
+type GetOption[T any] struct {
+ Predicate func(t T) bool
+ BacklogOnly bool
+}
+
+func Filter[T any](pred func(T) bool) GetOption[T] {
+ return GetOption[T]{
+ Predicate: pred,
+ }
+}
+
+// BacklogOnly will prevent Get from blocking on waiting for more updates from
+// etcd, by instead returning BacklogDone whenever no more data is currently
+// locally available. This is different however, from establishing that there
+// are no more pending updates from the etcd cluster - the only way to ensure
+// the local client is up to date is by performing Get calls without this option
+// set.
+//
+// This mode of retrieval should only be used for the retrieval of the existing
+// data in the etcd cluster on the initial creation of the Watcher (by
+// repeatedly calling Get until BacklogDone is returned), and shouldn't be set
+// for any subsequent call. Any use of this option after that initial fetch is
+// undefined behaviour that exposes the internals of the Get implementation, and
+// must not be relied on. However, in the future, this behaviour might be
+// formalized.
+//
+// This mode is particularly useful for ranged watchers. Non-ranged watchers can
+// still use this option to distinguish between blocking because of the
+// nonexistence of an object vs. blocking because of networking issues. However,
+// non-ranged retrieval semantics generally will rarely need to make this
+// distinction.
+func BacklogOnly[T any]() GetOption[T] {
+ return GetOption[T]{BacklogOnly: true}
+}
+
+var (
+ // BacklogDone is returned by Get when BacklogOnly is set and there is no more
+ // event data stored in the Watcher client, ie. when the initial cluster state
+ // of the requested key has been retrieved.
+ BacklogDone = errors.New("no more backlogged data")
+)