m/pkg/event: make type-safe

This is a fairly large change which makes use of Go type parameters
(“generics”) to make the event library (and its memory/etcd
implementations) type safe.

Since we now have the event.Value interface strongly typed, we also move
options which were implementation-specific (like BacklogOnly)
to be part of that interface, instead of the previously type-asserted
specific implementations. Use of options that are not handled by a
particular implementation is a runtime error. Expressing this in the
type system is probably not worth the effort.

We also implement Filter to allow offloading some of the functionality previously implemented in type assertion wrappers into the library itself.

In the end, this ends up removing a bunch of type assertion code, at
the cost of a fairly sweeping change. Unfortunately, some of this is due
to IntelliJ suddenly deciding to reformat comments.

Change-Id: I1ca6d93db1b5c4055a21af3fb9e5e3d425c0d86e
Reviewed-on: https://review.monogon.dev/c/monogon/+/1322
Tested-by: Jenkins CI
Reviewed-by: Leopold Schabel <leo@monogon.tech>
diff --git a/metropolis/pkg/event/etcd/etcd.go b/metropolis/pkg/event/etcd/etcd.go
index d49a592..14dfd99 100644
--- a/metropolis/pkg/event/etcd/etcd.go
+++ b/metropolis/pkg/event/etcd/etcd.go
@@ -19,17 +19,17 @@
 	// artificially, as there currently is no code path that needs this to be
 	// strictly true.  However, users of this library might want to rely on the
 	// Value type instead of particular Value implementations.
-	_ event.ValueWatch = &Value{}
+	_ event.ValueWatch[StringAt] = &Value[StringAt]{}
 )
 
 // Value is an 'Event Value' backed in an etcd cluster, accessed over an
 // etcd client. This is a stateless handle and can be copied and shared across
 // goroutines.
-type Value struct {
+type Value[T any] struct {
+	decoder func(key, value []byte) (T, error)
 	etcd    client.Namespaced
 	key     string
 	keyEnd  string
-	decoder BytesDecoder
 }
 
 type Option struct {
@@ -67,12 +67,12 @@
 // NewValue creates a new Value for a given key(s) in an etcd client. The
 // given decoder will be used to convert bytes retrieved from etcd into the
 // interface{} value retrieved by Get by this value's watcher.
-func NewValue(etcd client.Namespaced, key string, decoder BytesDecoder, options ...*Option) *Value {
-	res := &Value{
+func NewValue[T any](etcd client.Namespaced, key string, decoder func(key, value []byte) (T, error), options ...*Option) *Value[T] {
+	res := &Value[T]{
+		decoder: decoder,
 		etcd:    etcd,
 		key:     key,
 		keyEnd:  key,
-		decoder: decoder,
 	}
 
 	for _, opt := range options {
@@ -84,26 +84,25 @@
 	return res
 }
 
-// BytesDecoder is a function that converts bytes retrieved from etcd into an
-// end-user facing value. Additionally, a key is available so that returned
-// values can be augmented with the location they were retrieved from. This is
-// especially useful when returning values resulting from an etcd range.
-//
-// If an error is returned, the Get call performed on a watcher configured with
-// this decoder will fail, swallowing that particular update, but the watcher
-// will continue to work. Any provided BytesDecoder implementations must be safe
-// to copy.
-type BytesDecoder = func(key []byte, data []byte) (interface{}, error)
-
-// NoDecoder is a no-op decoder which passes through the retrieved bytes as a
-// []byte type to the user.
-func NoDecoder(key []byte, data []byte) (interface{}, error) {
-	return data, nil
+func DecoderNoop(_, value []byte) ([]byte, error) {
+	return value, nil
 }
 
-func (e *Value) Watch() event.Watcher {
+func DecoderStringAt(key, value []byte) (StringAt, error) {
+	return StringAt{
+		Key:   string(key),
+		Value: string(value),
+	}, nil
+}
+
+type StringAt struct {
+	Key   string
+	Value string
+}
+
+func (e *Value[T]) Watch() event.Watcher[T] {
 	ctx, ctxC := context.WithCancel(context.Background())
-	return &watcher{
+	return &watcher[T]{
 		Value: *e,
 
 		ctx:  ctx,
@@ -115,9 +114,9 @@
 	}
 }
 
-type watcher struct {
+type watcher[T any] struct {
 	// Value copy, used to configure the behaviour of this watcher.
-	Value
+	Value[T]
 
 	// ctx is the context that expresses the liveness of this watcher. It is
 	// canceled when the watcher is closed, and the etcd Watch hangs off of it.
@@ -163,7 +162,7 @@
 
 // setup initiates wc (the watch channel from etcd) after retrieving the initial
 // value(s) with a get operation.
-func (w *watcher) setup(ctx context.Context) error {
+func (w *watcher[T]) setup(ctx context.Context) error {
 	if w.wc != nil {
 		return nil
 	}
@@ -231,7 +230,7 @@
 
 // backfill blocks until a backlog of items is available. An error is returned
 // if the context is canceled.
-func (w *watcher) backfill(ctx context.Context) error {
+func (w *watcher[T]) backfill(ctx context.Context) error {
 	// Keep watching for watch events.
 	for {
 		var resp *clientv3.WatchResponse
@@ -327,44 +326,16 @@
 	backlogOnly bool
 }
 
-var (
-	// BacklogOnly will prevent Get from blocking on waiting for more updates from
-	// etcd, by instead returning BacklogDone whenever no more data is currently
-	// locally available. This is different however, from establishing that there
-	// are no more pending updates from the etcd cluster - the only way to ensure
-	// the local client is up to date is by performing Get calls without this option
-	// set.
-	//
-	// This mode of retrieval should only be used for the retrieval of the existing
-	// data in the etcd cluster on the initial creation of the Watcher (by
-	// repeatedly calling Get until BacklogDone isreturned), and shouldn't be set
-	// for any subsequent call. Any use of this option after that initial fetch is
-	// undefined behaviour that exposes the internals of the Get implementation, and
-	// must not be relied on. However, in the future, this behaviour might be
-	// formalized.
-	//
-	// This mode is particularly useful for ranged watchers. Non-ranged watchers can
-	// still use this option to distinguish between blocking because of the
-	// nonexistence of an object vs. blocking because of networking issues. However,
-	// non-ranged retrieval semantics generally will rarely need to make this
-	// distinction.
-	BacklogOnly = GetOption{backlogOnly: true}
-
-	// BacklogDone is returned by Get when BacklogOnly is set and there is no more
-	// event data stored in the Watcher client, ie. when the initial cluster state
-	// of the requested key has been retrieved.
-	BacklogDone = errors.New("no more backlogged data")
-)
-
 // Get implements the Get method of the Watcher interface.
 // It can return an error in three cases:
-//  - the given context is canceled (in which case, the given error will wrap
-//    the context error)
-//  - the watcher's BytesDecoder returned an error (in which case the error
-//    returned by the BytesDecoder will be returned verbatim)
-//  - it has been called with BacklogOnly and the Watcher has no more local
-//    event data to return (see BacklogOnly for more information on the
-//    semantics of this mode of operation)
+//   - the given context is canceled (in which case, the given error will wrap
+//     the context error)
+//   - the watcher's BytesDecoder returned an error (in which case the error
+//     returned by the BytesDecoder will be returned verbatim)
+//   - it has been called with BacklogOnly and the Watcher has no more local
+//     event data to return (see BacklogOnly for more information on the
+//     semantics of this mode of operation)
+//
 // Note that transient and permanent etcd errors are never returned, and the
 // Get call will attempt to recover from these errors as much as possible. This
 // also means that the user of the Watcher will not be notified if the
@@ -377,51 +348,72 @@
 // TODO(q3k): implement internal, limited buffering for backlogged data not yet
 // consumed by client, as etcd client library seems to use an unbound buffer in
 // case this happens ( see: watcherStream.buf in clientv3).
-func (w *watcher) Get(ctx context.Context, opts ...event.GetOption) (interface{}, error) {
+func (w *watcher[T]) Get(ctx context.Context, opts ...event.GetOption[T]) (T, error) {
+	var empty T
 	select {
 	case w.getSem <- struct{}{}:
 	default:
-		return nil, fmt.Errorf("cannot Get() concurrently on a single waiter")
+		return empty, fmt.Errorf("cannot Get() concurrently on a single waiter")
 	}
 	defer func() {
 		<-w.getSem
 	}()
 
 	backlogOnly := false
-	for _, optI := range opts {
-		opt, ok := optI.(GetOption)
-		if !ok {
-			return nil, fmt.Errorf("get options must be of type etcd.GetOption")
+	var predicate func(t T) bool
+	for _, opt := range opts {
+		if opt.Predicate != nil {
+			predicate = opt.Predicate
 		}
-		if opt.backlogOnly {
+		if opt.BacklogOnly {
 			backlogOnly = true
 		}
 	}
 
+	ranged := w.key != w.keyEnd
+	if ranged && predicate != nil {
+		return empty, errors.New("filtering unimplemented for ranged etcd values")
+	}
+	if backlogOnly && predicate != nil {
+		return empty, errors.New("filtering unimplemented for backlog-only requests")
+	}
+
+	for {
+		v, err := w.getUnlocked(ctx, ranged, backlogOnly)
+		if err != nil {
+			return empty, err
+		}
+		if predicate == nil || predicate(v) {
+			return v, nil
+		}
+	}
+}
+
+func (w *watcher[T]) getUnlocked(ctx context.Context, ranged, backlogOnly bool) (T, error) {
+	var empty T
 	// Early check for context cancelations, preventing spurious contact with etcd
 	// if there's no need to.
 	if w.ctx.Err() != nil {
-		return nil, w.ctx.Err()
+		return empty, w.ctx.Err()
 	}
 
 	if err := w.setup(ctx); err != nil {
-		return nil, fmt.Errorf("when setting up watcher: %w", err)
+		return empty, fmt.Errorf("when setting up watcher: %w", err)
 	}
 
 	if backlogOnly && len(w.backlogged) == 0 {
-		return nil, BacklogDone
+		return empty, event.BacklogDone
 	}
 
 	// Update backlog from etcd if needed.
 	if len(w.backlogged) < 1 {
 		err := w.backfill(ctx)
 		if err != nil {
-			return nil, fmt.Errorf("when watching for new value: %w", err)
+			return empty, fmt.Errorf("when watching for new value: %w", err)
 		}
 	}
 	// Backlog is now guaranteed to contain at least one element.
 
-	ranged := w.key != w.keyEnd
 	if !ranged {
 		// For non-ranged queries, drain backlog fully.
 		if len(w.backlogged) != 1 {
@@ -440,7 +432,7 @@
 	}
 }
 
-func (w *watcher) Close() error {
+func (w *watcher[T]) Close() error {
 	w.ctxC()
 	return nil
 }