m/pkg/event: make type-safe

This is a fairly large change which makes use of Go type parameters
(“generics”) to make the event library (and its memory/etcd
implementations) type safe.

Now that the event.Value interface is strongly typed, we also move
options which were previously implementation-specific (like BacklogOnly)
into that interface, instead of having each implementation type-assert
its own option type. Using an option that a particular implementation
does not handle is a runtime error; expressing this in the type system
is probably not worth the effort.

We also implement Filter, which allows some of the functionality
previously implemented in type-assertion wrappers to be offloaded into
the library itself.

In the end, this removes a bunch of type assertion code, at the cost of
a fairly sweeping change. Unfortunately, some of the churn is due to
IntelliJ suddenly deciding to reformat comments.

Change-Id: I1ca6d93db1b5c4055a21af3fb9e5e3d425c0d86e
Reviewed-on: https://review.monogon.dev/c/monogon/+/1322
Tested-by: Jenkins CI
Reviewed-by: Leopold Schabel <leo@monogon.tech>
diff --git a/metropolis/pkg/event/etcd/etcd.go b/metropolis/pkg/event/etcd/etcd.go
index d49a592..14dfd99 100644
--- a/metropolis/pkg/event/etcd/etcd.go
+++ b/metropolis/pkg/event/etcd/etcd.go
@@ -19,17 +19,17 @@
 	// artificially, as there currently is no code path that needs this to be
 	// strictly true.  However, users of this library might want to rely on the
 	// Value type instead of particular Value implementations.
-	_ event.ValueWatch = &Value{}
+	_ event.ValueWatch[StringAt] = &Value[StringAt]{}
 )
 
 // Value is an 'Event Value' backed in an etcd cluster, accessed over an
 // etcd client. This is a stateless handle and can be copied and shared across
 // goroutines.
-type Value struct {
+type Value[T any] struct {
+	decoder func(key, value []byte) (T, error)
 	etcd    client.Namespaced
 	key     string
 	keyEnd  string
-	decoder BytesDecoder
 }
 
 type Option struct {
@@ -67,12 +67,12 @@
 // NewValue creates a new Value for a given key(s) in an etcd client. The
 // given decoder will be used to convert bytes retrieved from etcd into the
 // interface{} value retrieved by Get by this value's watcher.
-func NewValue(etcd client.Namespaced, key string, decoder BytesDecoder, options ...*Option) *Value {
-	res := &Value{
+func NewValue[T any](etcd client.Namespaced, key string, decoder func(key, value []byte) (T, error), options ...*Option) *Value[T] {
+	res := &Value[T]{
+		decoder: decoder,
 		etcd:    etcd,
 		key:     key,
 		keyEnd:  key,
-		decoder: decoder,
 	}
 
 	for _, opt := range options {
@@ -84,26 +84,25 @@
 	return res
 }
 
-// BytesDecoder is a function that converts bytes retrieved from etcd into an
-// end-user facing value. Additionally, a key is available so that returned
-// values can be augmented with the location they were retrieved from. This is
-// especially useful when returning values resulting from an etcd range.
-//
-// If an error is returned, the Get call performed on a watcher configured with
-// this decoder will fail, swallowing that particular update, but the watcher
-// will continue to work. Any provided BytesDecoder implementations must be safe
-// to copy.
-type BytesDecoder = func(key []byte, data []byte) (interface{}, error)
-
-// NoDecoder is a no-op decoder which passes through the retrieved bytes as a
-// []byte type to the user.
-func NoDecoder(key []byte, data []byte) (interface{}, error) {
-	return data, nil
+func DecoderNoop(_, value []byte) ([]byte, error) {
+	return value, nil
 }
 
-func (e *Value) Watch() event.Watcher {
+func DecoderStringAt(key, value []byte) (StringAt, error) {
+	return StringAt{
+		Key:   string(key),
+		Value: string(value),
+	}, nil
+}
+
+type StringAt struct {
+	Key   string
+	Value string
+}
+
+func (e *Value[T]) Watch() event.Watcher[T] {
 	ctx, ctxC := context.WithCancel(context.Background())
-	return &watcher{
+	return &watcher[T]{
 		Value: *e,
 
 		ctx:  ctx,
@@ -115,9 +114,9 @@
 	}
 }
 
-type watcher struct {
+type watcher[T any] struct {
 	// Value copy, used to configure the behaviour of this watcher.
-	Value
+	Value[T]
 
 	// ctx is the context that expresses the liveness of this watcher. It is
 	// canceled when the watcher is closed, and the etcd Watch hangs off of it.
@@ -163,7 +162,7 @@
 
 // setup initiates wc (the watch channel from etcd) after retrieving the initial
 // value(s) with a get operation.
-func (w *watcher) setup(ctx context.Context) error {
+func (w *watcher[T]) setup(ctx context.Context) error {
 	if w.wc != nil {
 		return nil
 	}
@@ -231,7 +230,7 @@
 
 // backfill blocks until a backlog of items is available. An error is returned
 // if the context is canceled.
-func (w *watcher) backfill(ctx context.Context) error {
+func (w *watcher[T]) backfill(ctx context.Context) error {
 	// Keep watching for watch events.
 	for {
 		var resp *clientv3.WatchResponse
@@ -327,44 +326,16 @@
 	backlogOnly bool
 }
 
-var (
-	// BacklogOnly will prevent Get from blocking on waiting for more updates from
-	// etcd, by instead returning BacklogDone whenever no more data is currently
-	// locally available. This is different however, from establishing that there
-	// are no more pending updates from the etcd cluster - the only way to ensure
-	// the local client is up to date is by performing Get calls without this option
-	// set.
-	//
-	// This mode of retrieval should only be used for the retrieval of the existing
-	// data in the etcd cluster on the initial creation of the Watcher (by
-	// repeatedly calling Get until BacklogDone isreturned), and shouldn't be set
-	// for any subsequent call. Any use of this option after that initial fetch is
-	// undefined behaviour that exposes the internals of the Get implementation, and
-	// must not be relied on. However, in the future, this behaviour might be
-	// formalized.
-	//
-	// This mode is particularly useful for ranged watchers. Non-ranged watchers can
-	// still use this option to distinguish between blocking because of the
-	// nonexistence of an object vs. blocking because of networking issues. However,
-	// non-ranged retrieval semantics generally will rarely need to make this
-	// distinction.
-	BacklogOnly = GetOption{backlogOnly: true}
-
-	// BacklogDone is returned by Get when BacklogOnly is set and there is no more
-	// event data stored in the Watcher client, ie. when the initial cluster state
-	// of the requested key has been retrieved.
-	BacklogDone = errors.New("no more backlogged data")
-)
-
 // Get implements the Get method of the Watcher interface.
 // It can return an error in three cases:
-//  - the given context is canceled (in which case, the given error will wrap
-//    the context error)
-//  - the watcher's BytesDecoder returned an error (in which case the error
-//    returned by the BytesDecoder will be returned verbatim)
-//  - it has been called with BacklogOnly and the Watcher has no more local
-//    event data to return (see BacklogOnly for more information on the
-//    semantics of this mode of operation)
+//   - the given context is canceled (in which case, the given error will wrap
+//     the context error)
+//   - the watcher's decoder returned an error (in which case the error
+//     returned by the decoder will be returned verbatim)
+//   - it has been called with BacklogOnly and the Watcher has no more local
+//     event data to return (see BacklogOnly for more information on the
+//     semantics of this mode of operation)
+//
 // Note that transient and permanent etcd errors are never returned, and the
 // Get call will attempt to recover from these errors as much as possible. This
 // also means that the user of the Watcher will not be notified if the
@@ -377,51 +348,72 @@
 // TODO(q3k): implement internal, limited buffering for backlogged data not yet
 // consumed by client, as etcd client library seems to use an unbound buffer in
 // case this happens ( see: watcherStream.buf in clientv3).
-func (w *watcher) Get(ctx context.Context, opts ...event.GetOption) (interface{}, error) {
+func (w *watcher[T]) Get(ctx context.Context, opts ...event.GetOption[T]) (T, error) {
+	var empty T
 	select {
 	case w.getSem <- struct{}{}:
 	default:
-		return nil, fmt.Errorf("cannot Get() concurrently on a single waiter")
+		return empty, fmt.Errorf("cannot Get() concurrently on a single waiter")
 	}
 	defer func() {
 		<-w.getSem
 	}()
 
 	backlogOnly := false
-	for _, optI := range opts {
-		opt, ok := optI.(GetOption)
-		if !ok {
-			return nil, fmt.Errorf("get options must be of type etcd.GetOption")
+	var predicate func(t T) bool
+	for _, opt := range opts {
+		if opt.Predicate != nil {
+			predicate = opt.Predicate
 		}
-		if opt.backlogOnly {
+		if opt.BacklogOnly {
 			backlogOnly = true
 		}
 	}
 
+	ranged := w.key != w.keyEnd
+	if ranged && predicate != nil {
+		return empty, errors.New("filtering unimplemented for ranged etcd values")
+	}
+	if backlogOnly && predicate != nil {
+		return empty, errors.New("filtering unimplemented for backlog-only requests")
+	}
+
+	for {
+		v, err := w.getUnlocked(ctx, ranged, backlogOnly)
+		if err != nil {
+			return empty, err
+		}
+		if predicate == nil || predicate(v) {
+			return v, nil
+		}
+	}
+}
+
+func (w *watcher[T]) getUnlocked(ctx context.Context, ranged, backlogOnly bool) (T, error) {
+	var empty T
 	// Early check for context cancelations, preventing spurious contact with etcd
 	// if there's no need to.
 	if w.ctx.Err() != nil {
-		return nil, w.ctx.Err()
+		return empty, w.ctx.Err()
 	}
 
 	if err := w.setup(ctx); err != nil {
-		return nil, fmt.Errorf("when setting up watcher: %w", err)
+		return empty, fmt.Errorf("when setting up watcher: %w", err)
 	}
 
 	if backlogOnly && len(w.backlogged) == 0 {
-		return nil, BacklogDone
+		return empty, event.BacklogDone
 	}
 
 	// Update backlog from etcd if needed.
 	if len(w.backlogged) < 1 {
 		err := w.backfill(ctx)
 		if err != nil {
-			return nil, fmt.Errorf("when watching for new value: %w", err)
+			return empty, fmt.Errorf("when watching for new value: %w", err)
 		}
 	}
 	// Backlog is now guaranteed to contain at least one element.
 
-	ranged := w.key != w.keyEnd
 	if !ranged {
 		// For non-ranged queries, drain backlog fully.
 		if len(w.backlogged) != 1 {
@@ -440,7 +432,7 @@
 	}
 }
 
-func (w *watcher) Close() error {
+func (w *watcher[T]) Close() error {
 	w.ctxC()
 	return nil
 }
diff --git a/metropolis/pkg/event/etcd/etcd_test.go b/metropolis/pkg/event/etcd/etcd_test.go
index 13f6ea8..81aee51 100644
--- a/metropolis/pkg/event/etcd/etcd_test.go
+++ b/metropolis/pkg/event/etcd/etcd_test.go
@@ -52,18 +52,18 @@
 // WG after it performs the initial retrieval of a value from etcd, but before
 // it starts the watcher. This is used to test potential race conditions
 // present between these two steps.
-func setRaceWg(w event.Watcher) *sync.WaitGroup {
+func setRaceWg[T any](w event.Watcher[T]) *sync.WaitGroup {
 	wg := sync.WaitGroup{}
-	w.(*watcher).testRaceWG = &wg
+	w.(*watcher[T]).testRaceWG = &wg
 	return &wg
 }
 
 // setSetupWg creates a new WaitGroup and sets the given watcher to wait on
 // thie WG after an etcd watch channel is created. This is used in tests to
 // ensure that the watcher is fully created before it is tested.
-func setSetupWg(w event.Watcher) *sync.WaitGroup {
+func setSetupWg[T any](w event.Watcher[T]) *sync.WaitGroup {
 	wg := sync.WaitGroup{}
-	w.(*watcher).testSetupWG = &wg
+	w.(*watcher[T]).testSetupWG = &wg
 	return &wg
 }
 
@@ -152,7 +152,7 @@
 
 // expect runs a Get on the given Watcher, ensuring the returned value is a
 // given string.
-func expect(t *testing.T, w event.Watcher, value string) {
+func expect(t *testing.T, w event.Watcher[StringAt], value string) {
 	t.Helper()
 	ctx, ctxC := context.WithCancel(context.Background())
 	defer ctxC()
@@ -162,7 +162,7 @@
 		t.Fatalf("Get: %v", err)
 	}
 
-	if got, want := string(got.([]byte)), value; got != want {
+	if got, want := got.Value, value; got != want {
 		t.Errorf("Wanted value %q, got %q", want, got)
 	}
 }
@@ -173,7 +173,7 @@
 // blocks for 101 milliseconds). Thus, this function should be used sparingly
 // and in tests that perform other baseline behaviour checks alongside this
 // test.
-func expectTimeout(t *testing.T, w event.Watcher) {
+func expectTimeout[T any](t *testing.T, w event.Watcher[T]) {
 	t.Helper()
 	ctx, ctxC := context.WithTimeout(context.Background(), 100*time.Millisecond)
 	got, err := w.Get(ctx)
@@ -186,7 +186,7 @@
 
 // wait wraps a watcher into a channel of strings, ensuring that the watcher
 // never errors on Get calls and always returns strings.
-func wait(t *testing.T, w event.Watcher) (chan string, func()) {
+func wait(t *testing.T, w event.Watcher[StringAt]) (chan string, func()) {
 	t.Helper()
 	ctx, ctxC := context.WithCancel(context.Background())
 
@@ -201,7 +201,7 @@
 			if err != nil {
 				t.Fatalf("Get: %v", err)
 			}
-			c <- string(got.([]byte))
+			c <- got.Value
 		}
 	}()
 
@@ -214,7 +214,7 @@
 	defer tc.close()
 
 	k := "test-simple"
-	value := NewValue(tc.namespaced, k, NoDecoder)
+	value := NewValue(tc.namespaced, k, DecoderStringAt)
 	tc.put(t, k, "one")
 
 	watcher := value.Watch()
@@ -231,42 +231,27 @@
 
 	q, cancel := wait(t, watcher)
 	// Test will hang here if the above value does not receive the set "six".
+	log.Printf("a")
 	for el := range q {
+		log.Printf("%q", el)
 		if el == "six" {
 			break
 		}
 	}
+	log.Printf("b")
 	cancel()
 }
 
-// stringAt is a helper type for testing ranged watchers. It's returned by a
-// watcher whose decoder is set to stringDecoder.
-type stringAt struct {
-	key, value string
-}
-
-func stringAtDecoder(key, value []byte) (interface{}, error) {
-	valueS := ""
-	if value != nil {
-		valueS = string(value)
-	}
-	return stringAt{
-		key:   string(key),
-		value: valueS,
-	}, nil
-}
-
 // stringAtGet performs a Get from a Watcher, expecting a stringAt and updating
 // the given map with the retrieved value.
-func stringAtGet(ctx context.Context, t *testing.T, w event.Watcher, m map[string]string) {
+func stringAtGet(ctx context.Context, t *testing.T, w event.Watcher[StringAt], m map[string]string) {
 	t.Helper()
 
 	vr, err := w.Get(ctx)
 	if err != nil {
 		t.Fatalf("Get: %v", err)
 	}
-	v := vr.(stringAt)
-	m[v.key] = v.value
+	m[vr.Key] = vr.Value
 }
 
 // TestSimpleRange exercises the simplest behaviour of a ranged watcher,
@@ -277,7 +262,7 @@
 
 	ks := "test-simple-range/"
 	ke := "test-simple-range0"
-	value := NewValue(tc.namespaced, ks, stringAtDecoder, Range(ke))
+	value := NewValue(tc.namespaced, ks, DecoderStringAt, Range(ke))
 	tc.put(t, ks+"a", "one")
 	tc.put(t, ks+"b", "two")
 	tc.put(t, ks+"c", "three")
@@ -320,7 +305,7 @@
 	defer tc.close()
 
 	k := "test-cancel"
-	value := NewValue(tc.namespaced, k, NoDecoder)
+	value := NewValue(tc.namespaced, k, DecoderStringAt)
 	tc.put(t, k, "one")
 
 	watcher := value.Watch()
@@ -355,7 +340,7 @@
 	defer tc.close()
 
 	k := "test-cancel-on-get"
-	value := NewValue(tc.namespaced, k, NoDecoder)
+	value := NewValue(tc.namespaced, k, DecoderStringAt)
 	watcher := value.Watch()
 	tc.put(t, k, "one")
 
@@ -419,7 +404,7 @@
 	tc.setEndpoints(0)
 
 	k := "test-client-reconnect"
-	value := NewValue(tc.namespaced, k, NoDecoder)
+	value := NewValue(tc.namespaced, k, DecoderStringAt)
 	tc.put(t, k, "one")
 
 	watcher := value.Watch()
@@ -455,10 +440,10 @@
 	tcRest.setEndpoints(1, 2)
 
 	k := "test-client-partition"
-	valueOne := NewValue(tcOne.namespaced, k, NoDecoder)
+	valueOne := NewValue(tcOne.namespaced, k, DecoderStringAt)
 	watcherOne := valueOne.Watch()
 	defer watcherOne.Close()
-	valueRest := NewValue(tcRest.namespaced, k, NoDecoder)
+	valueRest := NewValue(tcRest.namespaced, k, DecoderStringAt)
 	watcherRest := valueRest.Watch()
 	defer watcherRest.Close()
 
@@ -489,7 +474,7 @@
 
 	k := "test-early-use"
 
-	value := NewValue(tc.namespaced, k, NoDecoder)
+	value := NewValue(tc.namespaced, k, DecoderStringAt)
 	watcher := value.Watch()
 	defer watcher.Close()
 
@@ -515,7 +500,7 @@
 	k := "test-remove"
 	tc.put(t, k, "one")
 
-	value := NewValue(tc.namespaced, k, NoDecoder)
+	value := NewValue(tc.namespaced, k, DecoderStringAt)
 	watcher := value.Watch()
 	defer watcher.Close()
 
@@ -532,7 +517,7 @@
 
 	ks := "test-remove-range/"
 	ke := "test-remove-range0"
-	value := NewValue(tc.namespaced, ks, stringAtDecoder, Range(ke))
+	value := NewValue(tc.namespaced, ks, DecoderStringAt, Range(ke))
 	tc.put(t, ks+"a", "one")
 	tc.put(t, ks+"b", "two")
 	tc.put(t, ks+"c", "three")
@@ -573,7 +558,7 @@
 	tc.put(t, k, "one")
 	tc.remove(t, k)
 
-	value := NewValue(tc.namespaced, k, NoDecoder)
+	value := NewValue(tc.namespaced, k, DecoderStringAt)
 	watcher := value.Watch()
 	defer watcher.Close()
 
@@ -602,13 +587,13 @@
 // on Get, but that the watcher continues to work after the error has been
 // returned.
 func TestDecoder(t *testing.T) {
-	decodeStringifiedNumbersDivisibleBy3 := func(_, data []byte) (interface{}, error) {
-		num, err := strconv.ParseInt(string(data), 10, 64)
+	decoderDivisibleByThree := func(_, value []byte) (int64, error) {
+		num, err := strconv.ParseInt(string(value), 10, 64)
 		if err != nil {
-			return nil, fmt.Errorf("not a valid number")
+			return 0, fmt.Errorf("not a valid number")
 		}
 		if (num % 3) != 0 {
-			return nil, fmt.Errorf("not divisible by 3")
+			return 0, fmt.Errorf("not divisible by 3")
 		}
 		return num, nil
 	}
@@ -620,7 +605,7 @@
 	defer ctxC()
 
 	k := "test-decoder"
-	value := NewValue(tc.namespaced, k, decodeStringifiedNumbersDivisibleBy3)
+	value := NewValue(tc.namespaced, k, decoderDivisibleByThree)
 	watcher := value.Watch()
 	defer watcher.Close()
 	tc.put(t, k, "3")
@@ -643,7 +628,7 @@
 				}
 			} else {
 				queue <- errOrInt{
-					val: res.(int64),
+					val: res,
 				}
 			}
 		}
@@ -686,7 +671,7 @@
 	defer tc.close()
 
 	k := "test-backlog"
-	value := NewValue(tc.namespaced, k, NoDecoder)
+	value := NewValue(tc.namespaced, k, DecoderStringAt)
 	watcher := value.Watch()
 	defer watcher.Close()
 
@@ -704,8 +689,7 @@
 		if err != nil {
 			t.Fatalf("Get() returned error before expected final value: %v", err)
 		}
-		val := string(valB.([]byte))
-		if val == "val-999" {
+		if valB.Value == "val-999" {
 			break
 		}
 	}
@@ -719,7 +703,7 @@
 
 	ks := "test-backlog-range/"
 	ke := "test-backlog-range0"
-	value := NewValue(tc.namespaced, ks, stringAtDecoder, Range(ke))
+	value := NewValue(tc.namespaced, ks, DecoderStringAt, Range(ke))
 	w := value.Watch()
 	defer w.Close()
 
@@ -764,30 +748,30 @@
 	k := "test-backlog-only"
 	tc.put(t, k, "initial")
 
-	value := NewValue(tc.namespaced, k, NoDecoder)
+	value := NewValue(tc.namespaced, k, DecoderStringAt)
 	watcher := value.Watch()
 	defer watcher.Close()
 
-	d, err := watcher.Get(ctx, BacklogOnly)
+	d, err := watcher.Get(ctx, event.BacklogOnly[StringAt]())
 	if err != nil {
 		t.Fatalf("First Get failed: %v", err)
 	}
-	if want, got := "initial", string(d.([]byte)); want != got {
+	if want, got := "initial", d.Value; want != got {
 		t.Fatalf("First Get: wanted value %q, got %q", want, got)
 	}
 
 	// As expected, next call to Get with BacklogOnly fails - there truly is no new
 	// updates to emit.
-	_, err = watcher.Get(ctx, BacklogOnly)
-	if want, got := BacklogDone, err; want != got {
+	_, err = watcher.Get(ctx, event.BacklogOnly[StringAt]())
+	if want, got := event.BacklogDone, err; want != got {
 		t.Fatalf("Second Get: wanted %v, got %v", want, got)
 	}
 
 	// Implementation detail: even though there is a new value ('second'),
 	// BacklogOnly will still return BacklogDone.
 	tc.put(t, k, "second")
-	_, err = watcher.Get(ctx, BacklogOnly)
-	if want, got := BacklogDone, err; want != got {
+	_, err = watcher.Get(ctx, event.BacklogOnly[StringAt]())
+	if want, got := event.BacklogDone, err; want != got {
 		t.Fatalf("Third Get: wanted %v, got %v", want, got)
 	}
 
@@ -796,7 +780,7 @@
 	if err != nil {
 		t.Fatalf("Fourth Get failed: %v", err)
 	}
-	if want, got := "second", string(d.([]byte)); want != got {
+	if want, got := "second", d.Value; want != got {
 		t.Fatalf("Fourth Get: wanted value %q, got %q", want, got)
 	}
 }
@@ -821,7 +805,7 @@
 		}
 	}
 
-	value := NewValue(tc.namespaced, ks, stringAtDecoder, Range(ke))
+	value := NewValue(tc.namespaced, ks, DecoderStringAt, Range(ke))
 	w := value.Watch()
 	defer w.Close()
 
@@ -829,12 +813,11 @@
 	res := make(map[string]string)
 
 	// Run first Get - this is the barrier defining what's part of the backlog.
-	g, err := w.Get(ctx, BacklogOnly)
+	g, err := w.Get(ctx, event.BacklogOnly[StringAt]())
 	if err != nil {
 		t.Fatalf("Get: %v", err)
 	}
-	kv := g.(stringAt)
-	res[kv.key] = kv.value
+	res[g.Key] = g.Value
 
 	// These won't be part of the backlog.
 	tc.put(t, ks+"a", fmt.Sprintf("val-100"))
@@ -843,16 +826,15 @@
 	// Retrieve the rest of the backlog until BacklogDone is returned.
 	nUpdates := 1
 	for {
-		g, err := w.Get(ctx, BacklogOnly)
-		if err == BacklogDone {
+		g, err := w.Get(ctx, event.BacklogOnly[StringAt]())
+		if err == event.BacklogDone {
 			break
 		}
 		if err != nil {
 			t.Fatalf("Get: %v", err)
 		}
 		nUpdates += 1
-		kv := g.(stringAt)
-		res[kv.key] = kv.value
+		res[g.Key] = g.Value
 	}
 
 	// The backlog should've been compacted to just two entries at their newest