m/pkg/event: make type-safe

This is a fairly large change which makes use of Go type parameters
(“generics”) to make the event library (and its memory/etcd
implementations) type safe.

Since the event.Value interface is now strongly typed, we also move
options which were previously implementation-specific (like BacklogOnly)
to be part of that interface, instead of having each implementation
type-assert its own option type. Using an option that is not handled by
a particular implementation is a runtime error. Expressing this in the
type system is probably not worth the effort.

We also implement Filter to allow offloading some of the functionality
previously implemented in type assertion wrappers into the library
itself.

In the end, this removes a bunch of type assertion code, at the cost of
a fairly sweeping change. Unfortunately, some of the churn is due to
IntelliJ suddenly deciding to reformat comments.

Change-Id: I1ca6d93db1b5c4055a21af3fb9e5e3d425c0d86e
Reviewed-on: https://review.monogon.dev/c/monogon/+/1322
Tested-by: Jenkins CI
Reviewed-by: Leopold Schabel <leo@monogon.tech>
diff --git a/metropolis/pkg/event/etcd/etcd.go b/metropolis/pkg/event/etcd/etcd.go
index d49a592..14dfd99 100644
--- a/metropolis/pkg/event/etcd/etcd.go
+++ b/metropolis/pkg/event/etcd/etcd.go
@@ -19,17 +19,17 @@
 	// artificially, as there currently is no code path that needs this to be
 	// strictly true.  However, users of this library might want to rely on the
 	// Value type instead of particular Value implementations.
-	_ event.ValueWatch = &Value{}
+	_ event.ValueWatch[StringAt] = &Value[StringAt]{}
 )
 
 // Value is an 'Event Value' backed in an etcd cluster, accessed over an
 // etcd client. This is a stateless handle and can be copied and shared across
 // goroutines.
-type Value struct {
+type Value[T any] struct {
+	decoder func(key, value []byte) (T, error)
 	etcd    client.Namespaced
 	key     string
 	keyEnd  string
-	decoder BytesDecoder
 }
 
 type Option struct {
@@ -67,12 +67,12 @@
 // NewValue creates a new Value for a given key(s) in an etcd client. The
 // given decoder will be used to convert bytes retrieved from etcd into the
 // interface{} value retrieved by Get by this value's watcher.
-func NewValue(etcd client.Namespaced, key string, decoder BytesDecoder, options ...*Option) *Value {
-	res := &Value{
+func NewValue[T any](etcd client.Namespaced, key string, decoder func(key, value []byte) (T, error), options ...*Option) *Value[T] {
+	res := &Value[T]{
+		decoder: decoder,
 		etcd:    etcd,
 		key:     key,
 		keyEnd:  key,
-		decoder: decoder,
 	}
 
 	for _, opt := range options {
@@ -84,26 +84,25 @@
 	return res
 }
 
-// BytesDecoder is a function that converts bytes retrieved from etcd into an
-// end-user facing value. Additionally, a key is available so that returned
-// values can be augmented with the location they were retrieved from. This is
-// especially useful when returning values resulting from an etcd range.
-//
-// If an error is returned, the Get call performed on a watcher configured with
-// this decoder will fail, swallowing that particular update, but the watcher
-// will continue to work. Any provided BytesDecoder implementations must be safe
-// to copy.
-type BytesDecoder = func(key []byte, data []byte) (interface{}, error)
-
-// NoDecoder is a no-op decoder which passes through the retrieved bytes as a
-// []byte type to the user.
-func NoDecoder(key []byte, data []byte) (interface{}, error) {
-	return data, nil
+func DecoderNoop(_, value []byte) ([]byte, error) {
+	return value, nil
 }
 
-func (e *Value) Watch() event.Watcher {
+func DecoderStringAt(key, value []byte) (StringAt, error) {
+	return StringAt{
+		Key:   string(key),
+		Value: string(value),
+	}, nil
+}
+
+type StringAt struct {
+	Key   string
+	Value string
+}
+
+func (e *Value[T]) Watch() event.Watcher[T] {
 	ctx, ctxC := context.WithCancel(context.Background())
-	return &watcher{
+	return &watcher[T]{
 		Value: *e,
 
 		ctx:  ctx,
@@ -115,9 +114,9 @@
 	}
 }
 
-type watcher struct {
+type watcher[T any] struct {
 	// Value copy, used to configure the behaviour of this watcher.
-	Value
+	Value[T]
 
 	// ctx is the context that expresses the liveness of this watcher. It is
 	// canceled when the watcher is closed, and the etcd Watch hangs off of it.
@@ -163,7 +162,7 @@
 
 // setup initiates wc (the watch channel from etcd) after retrieving the initial
 // value(s) with a get operation.
-func (w *watcher) setup(ctx context.Context) error {
+func (w *watcher[T]) setup(ctx context.Context) error {
 	if w.wc != nil {
 		return nil
 	}
@@ -231,7 +230,7 @@
 
 // backfill blocks until a backlog of items is available. An error is returned
 // if the context is canceled.
-func (w *watcher) backfill(ctx context.Context) error {
+func (w *watcher[T]) backfill(ctx context.Context) error {
 	// Keep watching for watch events.
 	for {
 		var resp *clientv3.WatchResponse
@@ -327,44 +326,16 @@
 	backlogOnly bool
 }
 
-var (
-	// BacklogOnly will prevent Get from blocking on waiting for more updates from
-	// etcd, by instead returning BacklogDone whenever no more data is currently
-	// locally available. This is different however, from establishing that there
-	// are no more pending updates from the etcd cluster - the only way to ensure
-	// the local client is up to date is by performing Get calls without this option
-	// set.
-	//
-	// This mode of retrieval should only be used for the retrieval of the existing
-	// data in the etcd cluster on the initial creation of the Watcher (by
-	// repeatedly calling Get until BacklogDone isreturned), and shouldn't be set
-	// for any subsequent call. Any use of this option after that initial fetch is
-	// undefined behaviour that exposes the internals of the Get implementation, and
-	// must not be relied on. However, in the future, this behaviour might be
-	// formalized.
-	//
-	// This mode is particularly useful for ranged watchers. Non-ranged watchers can
-	// still use this option to distinguish between blocking because of the
-	// nonexistence of an object vs. blocking because of networking issues. However,
-	// non-ranged retrieval semantics generally will rarely need to make this
-	// distinction.
-	BacklogOnly = GetOption{backlogOnly: true}
-
-	// BacklogDone is returned by Get when BacklogOnly is set and there is no more
-	// event data stored in the Watcher client, ie. when the initial cluster state
-	// of the requested key has been retrieved.
-	BacklogDone = errors.New("no more backlogged data")
-)
-
 // Get implements the Get method of the Watcher interface.
 // It can return an error in three cases:
-//  - the given context is canceled (in which case, the given error will wrap
-//    the context error)
-//  - the watcher's BytesDecoder returned an error (in which case the error
-//    returned by the BytesDecoder will be returned verbatim)
-//  - it has been called with BacklogOnly and the Watcher has no more local
-//    event data to return (see BacklogOnly for more information on the
-//    semantics of this mode of operation)
+//   - the given context is canceled (in which case, the given error will wrap
+//     the context error)
+//   - the watcher's decoder returned an error (in which case the error
+//     returned by the decoder will be returned verbatim)
+//   - it has been called with BacklogOnly and the Watcher has no more local
+//     event data to return (see BacklogOnly for more information on the
+//     semantics of this mode of operation)
+//
 // Note that transient and permanent etcd errors are never returned, and the
 // Get call will attempt to recover from these errors as much as possible. This
 // also means that the user of the Watcher will not be notified if the
@@ -377,51 +348,72 @@
 // TODO(q3k): implement internal, limited buffering for backlogged data not yet
 // consumed by client, as etcd client library seems to use an unbound buffer in
 // case this happens ( see: watcherStream.buf in clientv3).
-func (w *watcher) Get(ctx context.Context, opts ...event.GetOption) (interface{}, error) {
+func (w *watcher[T]) Get(ctx context.Context, opts ...event.GetOption[T]) (T, error) {
+	var empty T
 	select {
 	case w.getSem <- struct{}{}:
 	default:
-		return nil, fmt.Errorf("cannot Get() concurrently on a single waiter")
+		return empty, fmt.Errorf("cannot Get() concurrently on a single waiter")
 	}
 	defer func() {
 		<-w.getSem
 	}()
 
 	backlogOnly := false
-	for _, optI := range opts {
-		opt, ok := optI.(GetOption)
-		if !ok {
-			return nil, fmt.Errorf("get options must be of type etcd.GetOption")
+	var predicate func(t T) bool
+	for _, opt := range opts {
+		if opt.Predicate != nil {
+			predicate = opt.Predicate
 		}
-		if opt.backlogOnly {
+		if opt.BacklogOnly {
 			backlogOnly = true
 		}
 	}
 
+	ranged := w.key != w.keyEnd
+	if ranged && predicate != nil {
+		return empty, errors.New("filtering unimplemented for ranged etcd values")
+	}
+	if backlogOnly && predicate != nil {
+		return empty, errors.New("filtering unimplemented for backlog-only requests")
+	}
+
+	for {
+		v, err := w.getUnlocked(ctx, ranged, backlogOnly)
+		if err != nil {
+			return empty, err
+		}
+		if predicate == nil || predicate(v) {
+			return v, nil
+		}
+	}
+}
+
+func (w *watcher[T]) getUnlocked(ctx context.Context, ranged, backlogOnly bool) (T, error) {
+	var empty T
 	// Early check for context cancelations, preventing spurious contact with etcd
 	// if there's no need to.
 	if w.ctx.Err() != nil {
-		return nil, w.ctx.Err()
+		return empty, w.ctx.Err()
 	}
 
 	if err := w.setup(ctx); err != nil {
-		return nil, fmt.Errorf("when setting up watcher: %w", err)
+		return empty, fmt.Errorf("when setting up watcher: %w", err)
 	}
 
 	if backlogOnly && len(w.backlogged) == 0 {
-		return nil, BacklogDone
+		return empty, event.BacklogDone
 	}
 
 	// Update backlog from etcd if needed.
 	if len(w.backlogged) < 1 {
 		err := w.backfill(ctx)
 		if err != nil {
-			return nil, fmt.Errorf("when watching for new value: %w", err)
+			return empty, fmt.Errorf("when watching for new value: %w", err)
 		}
 	}
 	// Backlog is now guaranteed to contain at least one element.
 
-	ranged := w.key != w.keyEnd
 	if !ranged {
 		// For non-ranged queries, drain backlog fully.
 		if len(w.backlogged) != 1 {
@@ -440,7 +432,7 @@
 	}
 }
 
-func (w *watcher) Close() error {
+func (w *watcher[T]) Close() error {
 	w.ctxC()
 	return nil
 }
diff --git a/metropolis/pkg/event/etcd/etcd_test.go b/metropolis/pkg/event/etcd/etcd_test.go
index 13f6ea8..81aee51 100644
--- a/metropolis/pkg/event/etcd/etcd_test.go
+++ b/metropolis/pkg/event/etcd/etcd_test.go
@@ -52,18 +52,18 @@
 // WG after it performs the initial retrieval of a value from etcd, but before
 // it starts the watcher. This is used to test potential race conditions
 // present between these two steps.
-func setRaceWg(w event.Watcher) *sync.WaitGroup {
+func setRaceWg[T any](w event.Watcher[T]) *sync.WaitGroup {
 	wg := sync.WaitGroup{}
-	w.(*watcher).testRaceWG = &wg
+	w.(*watcher[T]).testRaceWG = &wg
 	return &wg
 }
 
 // setSetupWg creates a new WaitGroup and sets the given watcher to wait on
 // thie WG after an etcd watch channel is created. This is used in tests to
 // ensure that the watcher is fully created before it is tested.
-func setSetupWg(w event.Watcher) *sync.WaitGroup {
+func setSetupWg[T any](w event.Watcher[T]) *sync.WaitGroup {
 	wg := sync.WaitGroup{}
-	w.(*watcher).testSetupWG = &wg
+	w.(*watcher[T]).testSetupWG = &wg
 	return &wg
 }
 
@@ -152,7 +152,7 @@
 
 // expect runs a Get on the given Watcher, ensuring the returned value is a
 // given string.
-func expect(t *testing.T, w event.Watcher, value string) {
+func expect(t *testing.T, w event.Watcher[StringAt], value string) {
 	t.Helper()
 	ctx, ctxC := context.WithCancel(context.Background())
 	defer ctxC()
@@ -162,7 +162,7 @@
 		t.Fatalf("Get: %v", err)
 	}
 
-	if got, want := string(got.([]byte)), value; got != want {
+	if got, want := got.Value, value; got != want {
 		t.Errorf("Wanted value %q, got %q", want, got)
 	}
 }
@@ -173,7 +173,7 @@
 // blocks for 101 milliseconds). Thus, this function should be used sparingly
 // and in tests that perform other baseline behaviour checks alongside this
 // test.
-func expectTimeout(t *testing.T, w event.Watcher) {
+func expectTimeout[T any](t *testing.T, w event.Watcher[T]) {
 	t.Helper()
 	ctx, ctxC := context.WithTimeout(context.Background(), 100*time.Millisecond)
 	got, err := w.Get(ctx)
@@ -186,7 +186,7 @@
 
 // wait wraps a watcher into a channel of strings, ensuring that the watcher
 // never errors on Get calls and always returns strings.
-func wait(t *testing.T, w event.Watcher) (chan string, func()) {
+func wait(t *testing.T, w event.Watcher[StringAt]) (chan string, func()) {
 	t.Helper()
 	ctx, ctxC := context.WithCancel(context.Background())
 
@@ -201,7 +201,7 @@
 			if err != nil {
 				t.Fatalf("Get: %v", err)
 			}
-			c <- string(got.([]byte))
+			c <- got.Value
 		}
 	}()
 
@@ -214,7 +214,7 @@
 	defer tc.close()
 
 	k := "test-simple"
-	value := NewValue(tc.namespaced, k, NoDecoder)
+	value := NewValue(tc.namespaced, k, DecoderStringAt)
 	tc.put(t, k, "one")
 
 	watcher := value.Watch()
@@ -231,42 +231,27 @@
 
 	q, cancel := wait(t, watcher)
 	// Test will hang here if the above value does not receive the set "six".
+	log.Printf("a")
 	for el := range q {
+		log.Printf("%q", el)
 		if el == "six" {
 			break
 		}
 	}
+	log.Printf("b")
 	cancel()
 }
 
-// stringAt is a helper type for testing ranged watchers. It's returned by a
-// watcher whose decoder is set to stringDecoder.
-type stringAt struct {
-	key, value string
-}
-
-func stringAtDecoder(key, value []byte) (interface{}, error) {
-	valueS := ""
-	if value != nil {
-		valueS = string(value)
-	}
-	return stringAt{
-		key:   string(key),
-		value: valueS,
-	}, nil
-}
-
 // stringAtGet performs a Get from a Watcher, expecting a stringAt and updating
 // the given map with the retrieved value.
-func stringAtGet(ctx context.Context, t *testing.T, w event.Watcher, m map[string]string) {
+func stringAtGet(ctx context.Context, t *testing.T, w event.Watcher[StringAt], m map[string]string) {
 	t.Helper()
 
 	vr, err := w.Get(ctx)
 	if err != nil {
 		t.Fatalf("Get: %v", err)
 	}
-	v := vr.(stringAt)
-	m[v.key] = v.value
+	m[vr.Key] = vr.Value
 }
 
 // TestSimpleRange exercises the simplest behaviour of a ranged watcher,
@@ -277,7 +262,7 @@
 
 	ks := "test-simple-range/"
 	ke := "test-simple-range0"
-	value := NewValue(tc.namespaced, ks, stringAtDecoder, Range(ke))
+	value := NewValue(tc.namespaced, ks, DecoderStringAt, Range(ke))
 	tc.put(t, ks+"a", "one")
 	tc.put(t, ks+"b", "two")
 	tc.put(t, ks+"c", "three")
@@ -320,7 +305,7 @@
 	defer tc.close()
 
 	k := "test-cancel"
-	value := NewValue(tc.namespaced, k, NoDecoder)
+	value := NewValue(tc.namespaced, k, DecoderStringAt)
 	tc.put(t, k, "one")
 
 	watcher := value.Watch()
@@ -355,7 +340,7 @@
 	defer tc.close()
 
 	k := "test-cancel-on-get"
-	value := NewValue(tc.namespaced, k, NoDecoder)
+	value := NewValue(tc.namespaced, k, DecoderStringAt)
 	watcher := value.Watch()
 	tc.put(t, k, "one")
 
@@ -419,7 +404,7 @@
 	tc.setEndpoints(0)
 
 	k := "test-client-reconnect"
-	value := NewValue(tc.namespaced, k, NoDecoder)
+	value := NewValue(tc.namespaced, k, DecoderStringAt)
 	tc.put(t, k, "one")
 
 	watcher := value.Watch()
@@ -455,10 +440,10 @@
 	tcRest.setEndpoints(1, 2)
 
 	k := "test-client-partition"
-	valueOne := NewValue(tcOne.namespaced, k, NoDecoder)
+	valueOne := NewValue(tcOne.namespaced, k, DecoderStringAt)
 	watcherOne := valueOne.Watch()
 	defer watcherOne.Close()
-	valueRest := NewValue(tcRest.namespaced, k, NoDecoder)
+	valueRest := NewValue(tcRest.namespaced, k, DecoderStringAt)
 	watcherRest := valueRest.Watch()
 	defer watcherRest.Close()
 
@@ -489,7 +474,7 @@
 
 	k := "test-early-use"
 
-	value := NewValue(tc.namespaced, k, NoDecoder)
+	value := NewValue(tc.namespaced, k, DecoderStringAt)
 	watcher := value.Watch()
 	defer watcher.Close()
 
@@ -515,7 +500,7 @@
 	k := "test-remove"
 	tc.put(t, k, "one")
 
-	value := NewValue(tc.namespaced, k, NoDecoder)
+	value := NewValue(tc.namespaced, k, DecoderStringAt)
 	watcher := value.Watch()
 	defer watcher.Close()
 
@@ -532,7 +517,7 @@
 
 	ks := "test-remove-range/"
 	ke := "test-remove-range0"
-	value := NewValue(tc.namespaced, ks, stringAtDecoder, Range(ke))
+	value := NewValue(tc.namespaced, ks, DecoderStringAt, Range(ke))
 	tc.put(t, ks+"a", "one")
 	tc.put(t, ks+"b", "two")
 	tc.put(t, ks+"c", "three")
@@ -573,7 +558,7 @@
 	tc.put(t, k, "one")
 	tc.remove(t, k)
 
-	value := NewValue(tc.namespaced, k, NoDecoder)
+	value := NewValue(tc.namespaced, k, DecoderStringAt)
 	watcher := value.Watch()
 	defer watcher.Close()
 
@@ -602,13 +587,13 @@
 // on Get, but that the watcher continues to work after the error has been
 // returned.
 func TestDecoder(t *testing.T) {
-	decodeStringifiedNumbersDivisibleBy3 := func(_, data []byte) (interface{}, error) {
-		num, err := strconv.ParseInt(string(data), 10, 64)
+	decoderDivisibleByThree := func(_, value []byte) (int64, error) {
+		num, err := strconv.ParseInt(string(value), 10, 64)
 		if err != nil {
-			return nil, fmt.Errorf("not a valid number")
+			return 0, fmt.Errorf("not a valid number")
 		}
 		if (num % 3) != 0 {
-			return nil, fmt.Errorf("not divisible by 3")
+			return 0, fmt.Errorf("not divisible by 3")
 		}
 		return num, nil
 	}
@@ -620,7 +605,7 @@
 	defer ctxC()
 
 	k := "test-decoder"
-	value := NewValue(tc.namespaced, k, decodeStringifiedNumbersDivisibleBy3)
+	value := NewValue(tc.namespaced, k, decoderDivisibleByThree)
 	watcher := value.Watch()
 	defer watcher.Close()
 	tc.put(t, k, "3")
@@ -643,7 +628,7 @@
 				}
 			} else {
 				queue <- errOrInt{
-					val: res.(int64),
+					val: res,
 				}
 			}
 		}
@@ -686,7 +671,7 @@
 	defer tc.close()
 
 	k := "test-backlog"
-	value := NewValue(tc.namespaced, k, NoDecoder)
+	value := NewValue(tc.namespaced, k, DecoderStringAt)
 	watcher := value.Watch()
 	defer watcher.Close()
 
@@ -704,8 +689,7 @@
 		if err != nil {
 			t.Fatalf("Get() returned error before expected final value: %v", err)
 		}
-		val := string(valB.([]byte))
-		if val == "val-999" {
+		if valB.Value == "val-999" {
 			break
 		}
 	}
@@ -719,7 +703,7 @@
 
 	ks := "test-backlog-range/"
 	ke := "test-backlog-range0"
-	value := NewValue(tc.namespaced, ks, stringAtDecoder, Range(ke))
+	value := NewValue(tc.namespaced, ks, DecoderStringAt, Range(ke))
 	w := value.Watch()
 	defer w.Close()
 
@@ -764,30 +748,30 @@
 	k := "test-backlog-only"
 	tc.put(t, k, "initial")
 
-	value := NewValue(tc.namespaced, k, NoDecoder)
+	value := NewValue(tc.namespaced, k, DecoderStringAt)
 	watcher := value.Watch()
 	defer watcher.Close()
 
-	d, err := watcher.Get(ctx, BacklogOnly)
+	d, err := watcher.Get(ctx, event.BacklogOnly[StringAt]())
 	if err != nil {
 		t.Fatalf("First Get failed: %v", err)
 	}
-	if want, got := "initial", string(d.([]byte)); want != got {
+	if want, got := "initial", d.Value; want != got {
 		t.Fatalf("First Get: wanted value %q, got %q", want, got)
 	}
 
 	// As expected, next call to Get with BacklogOnly fails - there truly is no new
 	// updates to emit.
-	_, err = watcher.Get(ctx, BacklogOnly)
-	if want, got := BacklogDone, err; want != got {
+	_, err = watcher.Get(ctx, event.BacklogOnly[StringAt]())
+	if want, got := event.BacklogDone, err; want != got {
 		t.Fatalf("Second Get: wanted %v, got %v", want, got)
 	}
 
 	// Implementation detail: even though there is a new value ('second'),
 	// BacklogOnly will still return BacklogDone.
 	tc.put(t, k, "second")
-	_, err = watcher.Get(ctx, BacklogOnly)
-	if want, got := BacklogDone, err; want != got {
+	_, err = watcher.Get(ctx, event.BacklogOnly[StringAt]())
+	if want, got := event.BacklogDone, err; want != got {
 		t.Fatalf("Third Get: wanted %v, got %v", want, got)
 	}
 
@@ -796,7 +780,7 @@
 	if err != nil {
 		t.Fatalf("Fourth Get failed: %v", err)
 	}
-	if want, got := "second", string(d.([]byte)); want != got {
+	if want, got := "second", d.Value; want != got {
 		t.Fatalf("Fourth Get: wanted value %q, got %q", want, got)
 	}
 }
@@ -821,7 +805,7 @@
 		}
 	}
 
-	value := NewValue(tc.namespaced, ks, stringAtDecoder, Range(ke))
+	value := NewValue(tc.namespaced, ks, DecoderStringAt, Range(ke))
 	w := value.Watch()
 	defer w.Close()
 
@@ -829,12 +813,11 @@
 	res := make(map[string]string)
 
 	// Run first Get - this is the barrier defining what's part of the backlog.
-	g, err := w.Get(ctx, BacklogOnly)
+	g, err := w.Get(ctx, event.BacklogOnly[StringAt]())
 	if err != nil {
 		t.Fatalf("Get: %v", err)
 	}
-	kv := g.(stringAt)
-	res[kv.key] = kv.value
+	res[g.Key] = g.Value
 
 	// These won't be part of the backlog.
 	tc.put(t, ks+"a", fmt.Sprintf("val-100"))
@@ -843,16 +826,15 @@
 	// Retrieve the rest of the backlog until BacklogDone is returned.
 	nUpdates := 1
 	for {
-		g, err := w.Get(ctx, BacklogOnly)
-		if err == BacklogDone {
+		g, err := w.Get(ctx, event.BacklogOnly[StringAt]())
+		if err == event.BacklogDone {
 			break
 		}
 		if err != nil {
 			t.Fatalf("Get: %v", err)
 		}
 		nUpdates += 1
-		kv := g.(stringAt)
-		res[kv.key] = kv.value
+		res[g.Key] = g.Value
 	}
 
 	// The backlog should've been compacted to just two entries at their newest
diff --git a/metropolis/pkg/event/event.go b/metropolis/pkg/event/event.go
index bc46525..ba9190c 100644
--- a/metropolis/pkg/event/event.go
+++ b/metropolis/pkg/event/event.go
@@ -20,7 +20,7 @@
 // Values currently are kept in memory (see: MemoryValue), but a future
 // implementation might exist for other storage backends, eg. etcd.
 //
-// Background and intended use
+// # Background and intended use
 //
 // The Event Value library is intended to be used within Metropolis'
 // supervisor-based runnables to communicate state changes to other runnables,
@@ -33,20 +33,20 @@
 // Why not just channels?
 //
 // Plain channels have multiple deficiencies for this usecase:
-//  - Strict FIFO behaviour: all values sent to a channel must be received, and
-//    historic and newest data must be treated in the same way. This means that
-//    a consumer of state changes must process all updates to the value as if
-//    they are the newest, and unable to skip rapid updates when a system is
-//    slowly settling due to a cascading state change.
-//  - Implementation overhead: implementing an observer
-//    registration/unregistration pattern is prone to programming bugs,
-//    especially for features like always first sending the current state to a
-//    new observer.
-//  - Strict buffer size: due to their FIFO nature and the possibility of
-//    consumers not receiving actively, channels would have to buffer all
-//    existing updates, requiring some arbitrary best-guess channel buffer
-//    sizing that would still not prevent blocking writes or data loss in a
-//    worst case scenario.
+//   - Strict FIFO behaviour: all values sent to a channel must be received, and
+//     historic and newest data must be treated in the same way. This means that
+//     a consumer of state changes must process all updates to the value as if
+//     they are the newest, and unable to skip rapid updates when a system is
+//     slowly settling due to a cascading state change.
+//   - Implementation overhead: implementing an observer
+//     registration/unregistration pattern is prone to programming bugs,
+//     especially for features like always first sending the current state to a
+//     new observer.
+//   - Strict buffer size: due to their FIFO nature and the possibility of
+//     consumers not receiving actively, channels would have to buffer all
+//     existing updates, requiring some arbitrary best-guess channel buffer
+//     sizing that would still not prevent blocking writes or data loss in a
+//     worst case scenario.
 //
 // Or, in other words: Go channels are a synchronization primitive, not a
 // ready-made solution to this problem. The Event Value implementation in fact
@@ -56,40 +56,30 @@
 //
 // Go's condition variable implementation doesn't fully address our needs
 // either:
-// - No context/canceling support: once a condition is being Wait()ed on,
-//   this cannot be interrupted. This is especially painful and unwieldy when
-//   dealing with context-heavy code, such as Metropolis.
-// - Spartan API: expecting users to plainly use sync.Cond is risky, as the API
-//   is fairly low-level.
-// - No solution for late consumers: late consumers (ones that missed the value
-//   being set by a producer) would still have to implement logic in order to
-//   find out such a value, as sync.Cond only supports what amounts to
-//   edge-level triggers as part of its Broadcast/Signal system.
+//   - No context/canceling support: once a condition is being Wait()ed on,
+//     this cannot be interrupted. This is especially painful and unwieldy when
+//     dealing with context-heavy code, such as Metropolis.
+//   - Spartan API: expecting users to plainly use sync.Cond is risky, as the API
+//     is fairly low-level.
+//   - No solution for late consumers: late consumers (ones that missed the value
+//     being set by a producer) would still have to implement logic in order to
+//     find out such a value, as sync.Cond only supports what amounts to
+//     edge-level triggers as part of its Broadcast/Signal system.
 //
 // It would be possible to implement MemoryValue using a sync.Cond internally,
 // but such an implementation would likely be more complex than the current
 // implementation based on channels and mutexes, as it would have to work
 // around issues like lack of canceling, etc.
-//
-// Type safety
-//
-// The Value/Watcher interfaces are, unfortunately, implemented using
-// interface{}. There was an attempt to use Go's existing generic types facility
-// (interfaces) to solve this problem. However, with Type Parameters likely soon
-// appearing in mainline Go, this was not a priority, as that will fully solve
-// this problem without requiring mental gymnastics. For now, users of this
-// library will have to write some boilerplate code to allow consumers/watchers
-// to access the data in a a typesafe manner without assertions. See
-// ExampleValue_full for one possible approach to this.
 package event
 
 import (
 	"context"
+	"errors"
 )
 
 // A Value is an 'Event Value', some piece of data that can be updated ('Set')
 // by Producers and retrieved by Consumers.
-type Value interface {
+type Value[T any] interface {
 	// Set updates the Value to the given data. It is safe to call this from
 	// multiple goroutines, including concurrently.
 	//
@@ -103,24 +93,24 @@
 	// All updates will be serialized in an arbitrary order - if multiple
 	// producers wish to perform concurrent actions to update the Value partially,
 	// this should be negotiated and serialized externally by the producers.
-	Set(val interface{})
+	Set(val T)
 
 	// ValueWatch implements the Watch method. It is split out into another
 	// interface to allow some 'Event Values' to implement only the watch/read
 	// part, with the write side being implicit or defined by a more complex
-	// interface then a simple Set().
-	ValueWatch
+	// interface than a simple Set().
+	ValueWatch[T]
 }
 
 // ValueWatch is the read side of an 'Event Value', witch can by retrieved by
 // Consumers by performing a Watch operation on it.
-type ValueWatch interface {
+type ValueWatch[T any] interface {
 	// Watch retrieves a Watcher that keeps track on the version of the data
 	// contained within the Value that was last seen by a consumer. Once a
 	// Watcher is retrieved, it can be used to then get the actual data stored
 	// within the Value, and to reliably retrieve updates to it without having
 	// to poll for changes.
-	Watch() Watcher
+	Watch() Watcher[T]
 }
 
 // A Watcher keeps track of the last version of data seen by a consumer for a
@@ -128,11 +118,11 @@
 // safe to use this type concurrently. However, it is safe to move/copy it
 // across different goroutines, as long as no two goroutines access it
 // simultaneously.
-type Watcher interface {
+type Watcher[T any] interface {
 	// Get blocks until a Value's data is available:
 	//  - On first use of a Watcher, Get will return the data contained in the
 	//    value at the time of calling .Watch(), or block if no data has been
-	//    .Set() on it yet. If a value has been Set() since the the initial
+	//    .Set() on it yet. If a value has been Set() since the initial
 	//    creation of the Watch() but before Get() is called for the first
 	//    time, the first Get() call will immediately return the new value.
 	//  - On subsequent uses of a Watcher, Get will block until the given Value
@@ -168,11 +158,51 @@
 	// continue skipping some updates.
 	// If multiple goroutines need to access the Value, they should each use
 	// their own Watcher.
-	Get(context.Context, ...GetOption) (interface{}, error)
+	Get(context.Context, ...GetOption[T]) (T, error)
 
 	// Close must be called if the Watcher is not going to be used anymore -
 	// otherwise, a goroutine will leak.
 	Close() error
 }
 
-type GetOption interface{}
+type GetOption[T any] struct {
+	Predicate   func(t T) bool
+	BacklogOnly bool
+}
+
+func Filter[T any](pred func(T) bool) GetOption[T] {
+	return GetOption[T]{
+		Predicate: pred,
+	}
+}
+
+// BacklogOnly will prevent Get from blocking on waiting for more updates from
+// etcd, by instead returning BacklogDone whenever no more data is currently
+// locally available. This is different however, from establishing that there
+// are no more pending updates from the etcd cluster - the only way to ensure
+// the local client is up to date is by performing Get calls without this option
+// set.
+//
+// This mode of retrieval should only be used for the retrieval of the existing
+// data in the etcd cluster on the initial creation of the Watcher (by
+// repeatedly calling Get until BacklogDone is returned), and shouldn't be set
+// for any subsequent call. Any use of this option after that initial fetch is
+// undefined behaviour that exposes the internals of the Get implementation, and
+// must not be relied on. However, in the future, this behaviour might be
+// formalized.
+//
+// This mode is particularly useful for ranged watchers. Non-ranged watchers can
+// still use this option to distinguish between blocking because of the
+// nonexistence of an object vs. blocking because of networking issues. However,
+// non-ranged retrieval semantics generally will rarely need to make this
+// distinction.
+func BacklogOnly[T any]() GetOption[T] {
+	return GetOption[T]{BacklogOnly: true}
+}
+
+var (
+	// BacklogDone is returned by Get when BacklogOnly is set and there is no more
+	// event data stored in the Watcher client, ie. when the initial cluster state
+	// of the requested key has been retrieved.
+	BacklogDone = errors.New("no more backlogged data")
+)
diff --git a/metropolis/pkg/event/memory/BUILD.bazel b/metropolis/pkg/event/memory/BUILD.bazel
index da07dc3..d8c1990 100644
--- a/metropolis/pkg/event/memory/BUILD.bazel
+++ b/metropolis/pkg/event/memory/BUILD.bazel
@@ -15,5 +15,4 @@
         "memory_test.go",
     ],
     embed = [":memory"],
-    deps = ["//metropolis/pkg/event"],
 )
diff --git a/metropolis/pkg/event/memory/example_test.go b/metropolis/pkg/event/memory/example_test.go
index a119666..583650c 100644
--- a/metropolis/pkg/event/memory/example_test.go
+++ b/metropolis/pkg/event/memory/example_test.go
@@ -21,8 +21,6 @@
 	"fmt"
 	"net"
 	"time"
-
-	"source.monogon.dev/metropolis/pkg/event"
 )
 
 // NetworkStatus is example data that will be stored in a Value.
@@ -31,35 +29,11 @@
 	DefaultGateway  net.IP
 }
 
-// NetworkStatusWatcher is a typesafe wrapper around a Watcher.
-type NetworkStatusWatcher struct {
-	watcher event.Watcher
-}
-
-// Get wraps Watcher.Get and performs type assertion.
-func (s *NetworkStatusWatcher) Get(ctx context.Context) (*NetworkStatus, error) {
-	val, err := s.watcher.Get(ctx)
-	if err != nil {
-		return nil, err
-	}
-	ns := val.(NetworkStatus)
-	return &ns, nil
-}
-
 // NetworkService is a fake/example network service that is responsible for
 // communicating the newest information about a machine's network configuration
 // to consumers/watchers.
 type NetworkService struct {
-	Provider Value
-}
-
-// Watch is a thin wrapper around Value.Watch that returns the typesafe
-// NetworkStatusWatcher wrapper.
-func (s *NetworkService) Watch() NetworkStatusWatcher {
-	watcher := s.Provider.Watch()
-	return NetworkStatusWatcher{
-		watcher: watcher,
-	}
+	Provider Value[NetworkStatus]
 }
 
 // Run pretends to execute the network service's main logic loop, in which it
@@ -113,7 +87,7 @@
 	// Run an /etc/hosts updater. It will watch for updates from the NetworkService
 	// about the current IP address of the node.
 	go func() {
-		w := ns.Watch()
+		w := ns.Provider.Watch()
 		for {
 			status, err := w.Get(ctx)
 			if err != nil {
diff --git a/metropolis/pkg/event/memory/memory.go b/metropolis/pkg/event/memory/memory.go
index 0db2524..f0a2ab9 100644
--- a/metropolis/pkg/event/memory/memory.go
+++ b/metropolis/pkg/event/memory/memory.go
@@ -18,6 +18,7 @@
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"sync"
 
@@ -29,25 +30,25 @@
 	// there currently is no code path that needs this to be strictly true. However,
 	// users of this library might want to rely on the Value type instead of
 	// particular Value implementations.
-	_ event.Value = &Value{}
+	_ event.Value[int] = &Value[int]{}
 )
 
 // Value is a 'memory value', which implements a event.Value stored in memory.
 // It is safe to construct an empty object of this type. However, this must not
 // be copied.
-type Value struct {
+type Value[T any] struct {
 	// mu guards the inner, innerSet and watchers fields.
 	mu sync.RWMutex
 	// inner is the latest data Set on the Value. It is used to provide the
 	// newest version of the Set data to new watchers.
-	inner interface{}
+	inner T
 	// innerSet is true when inner has been Set at least once. It is used to
 	// differentiate between a nil and unset value.
 	innerSet bool
 	// watchers is the list of watchers that should be updated when new data is
 	// Set. It will grow on every .Watch() and shrink any time a watcher is
 	// determined to have been closed.
-	watchers []*watcher
+	watchers []*watcher[T]
 
 	// Sync, if set to true, blocks all .Set() calls on the Value until all
 	// Watchers derived from it actively .Get() the new value. This can be used
@@ -65,7 +66,7 @@
 // multiple goroutines, including concurrently.
 //
 // For more information about guarantees, see event.Value.Set.
-func (m *Value) Set(val interface{}) {
+func (m *Value[T]) Set(val T) {
 	m.mu.Lock()
 	defer m.mu.Unlock()
 
@@ -75,7 +76,7 @@
 
 	// Go through all watchers, updating them on the new value and filtering out
 	// all closed watchers.
-	newWatchers := make([]*watcher, 0, len(m.watchers))
+	newWatchers := make([]*watcher[T], 0, len(m.watchers))
 	for _, w := range m.watchers {
 		w := w
 		if w.closed() {
@@ -89,15 +90,15 @@
 
 // watcher implements the event.Watcher interface for watchers returned by
 // Value.
-type watcher struct {
+type watcher[T any] struct {
 	// activeReqC is a channel used to request an active submission channel
 	// from a pending Get function, if any.
-	activeReqC chan chan interface{}
+	activeReqC chan chan T
 	// deadletterSubmitC is a channel used to communicate a value that
 	// attempted to be submitted via activeReqC. This will be received by the
 	// deadletter worker of this watcher and passed on to the next .Get call
 	// that occurs.
-	deadletterSubmitC chan interface{}
+	deadletterSubmitC chan T
 
 	// getSem is a channel-based semaphore (which is of size 1, and thus in
 	// fact a mutex) that is used to ensure that only a single .Get() call is
@@ -112,10 +113,10 @@
 // contained within the Value that was last seen by a consumer.
 //
 // For more information about guarantees, see event.Value.Watch.
-func (m *Value) Watch() event.Watcher {
-	waiter := &watcher{
-		activeReqC:        make(chan chan interface{}),
-		deadletterSubmitC: make(chan interface{}),
+func (m *Value[T]) Watch() event.Watcher[T] {
+	waiter := &watcher[T]{
+		activeReqC:        make(chan chan T),
+		deadletterSubmitC: make(chan T),
 		close:             make(chan struct{}),
 		getSem:            make(chan struct{}, 1),
 	}
@@ -143,9 +144,9 @@
 // It watches the deadletterSubmitC channel for updated data, and overrides
 // previously received data. Then, when a .Get() begins to pend (and respond to
 // activeReqC receives), the deadletter worker will deliver that value.
-func (m *watcher) deadletterWorker() {
+func (m *watcher[T]) deadletterWorker() {
 	// Current value, and flag to mark it as set (vs. nil).
-	var cur interface{}
+	var cur T
 	var set bool
 
 	for {
@@ -186,7 +187,7 @@
 }
 
 // closed returns whether this watcher has been closed.
-func (m *watcher) closed() bool {
+func (m *watcher[T]) closed() bool {
 	select {
 	case _, ok := <-m.close:
 		if !ok {
@@ -198,7 +199,7 @@
 }
 
 // update is the high level update-this-watcher function called by Value.
-func (m *watcher) update(sync bool, val interface{}) {
+func (m *watcher[T]) update(sync bool, val T) {
 	// If synchronous delivery was requested, block until a watcher .Gets it.
 	if sync {
 		c := <-m.activeReqC
@@ -224,38 +225,37 @@
 	}
 }
 
-func (m *watcher) Close() error {
+func (m *watcher[T]) Close() error {
 	close(m.deadletterSubmitC)
 	close(m.close)
 	return nil
 }
 
-// GetOption is a memory.Get-specific option passed to Get. Currently no options
-// are implemented.
-type GetOption struct {
-}
-
 // Get blocks until a Value's data is available. See event.Watcher.Get for
 // guarantees and more information.
-func (m *watcher) Get(ctx context.Context, opts ...event.GetOption) (interface{}, error) {
+func (m *watcher[T]) Get(ctx context.Context, opts ...event.GetOption[T]) (T, error) {
 	// Make sure we're the only active .Get call.
+	var empty T
 	select {
 	case m.getSem <- struct{}{}:
 	default:
-		return nil, fmt.Errorf("cannot Get() concurrently on a single waiter")
+		return empty, fmt.Errorf("cannot Get() concurrently on a single waiter")
 	}
 	defer func() {
 		<-m.getSem
 	}()
 
-	for _, optI := range opts {
-		_, ok := optI.(GetOption)
-		if !ok {
-			return nil, fmt.Errorf("get options must be of type memory.GetOption")
+	var predicate func(t T) bool
+	for _, opt := range opts {
+		if opt.Predicate != nil {
+			predicate = opt.Predicate
+		}
+		if opt.BacklogOnly != false {
+			return empty, errors.New("BacklogOnly is not implemented for memory watchers")
 		}
 	}
 
-	c := make(chan interface{})
+	c := make(chan T)
 
 	// Start responding on activeReqC. This signals to .update and to the
 	// deadletter worker that we're ready to accept data updates.
@@ -285,9 +285,12 @@
 	for {
 		select {
 		case <-ctx.Done():
-			return nil, ctx.Err()
+			return empty, ctx.Err()
 		case m.activeReqC <- c:
 		case val := <-c:
+			if predicate != nil && !predicate(val) {
+				continue
+			}
 			return val, nil
 		}
 	}
diff --git a/metropolis/pkg/event/memory/memory_test.go b/metropolis/pkg/event/memory/memory_test.go
index f4feb33..41f121e 100644
--- a/metropolis/pkg/event/memory/memory_test.go
+++ b/metropolis/pkg/event/memory/memory_test.go
@@ -28,7 +28,7 @@
 // TestAsync exercises the high-level behaviour of a Value, in which a
 // watcher is able to catch up to the newest Set value.
 func TestAsync(t *testing.T) {
-	p := Value{}
+	p := Value[int]{}
 	p.Set(0)
 
 	ctx := context.Background()
@@ -39,7 +39,7 @@
 	if err != nil {
 		t.Fatalf("Get: %v", err)
 	}
-	if want, got := 0, val.(int); want != got {
+	if want, got := 0, val; want != got {
 		t.Fatalf("Value: got %d, wanted %d", got, want)
 	}
 
@@ -54,7 +54,7 @@
 	if err != nil {
 		t.Fatalf("Get: %v", err)
 	}
-	if want, got := 100, val.(int); want != got {
+	if want, got := 100, val; want != got {
 		t.Fatalf("Value: got %d, wanted %d", got, want)
 	}
 }
@@ -64,7 +64,7 @@
 // This particular test ensures that .Set() calls to a Watcher result in a
 // prefect log of updates being transmitted to a watcher.
 func TestSync(t *testing.T) {
-	p := Value{
+	p := Value[int]{
 		Sync: true,
 	}
 	values := make(chan int, 100)
@@ -79,7 +79,7 @@
 			if err != nil {
 				panic(err)
 			}
-			values <- value.(int)
+			values <- value
 		}
 	}()
 
@@ -109,7 +109,7 @@
 // This particular test ensures that .Set() calls actually block when a watcher
 // is unattended.
 func TestSyncBlocks(t *testing.T) {
-	p := Value{
+	p := Value[int]{
 		Sync: true,
 	}
 	ctx := context.Background()
@@ -124,7 +124,7 @@
 	if err != nil {
 		t.Fatalf("Get: %v", err)
 	}
-	if want, got := 0, value.(int); want != got {
+	if want, got := 0, value; want != got {
 		t.Fatalf("Got initial value %d, wanted %d", got, want)
 	}
 
@@ -160,7 +160,7 @@
 		t.Fatalf("Set() returned before Get()")
 	}
 
-	if want, got := 1, value.(int); want != got {
+	if want, got := 1, value; want != got {
 		t.Fatalf("Wanted value %d, got %d", want, got)
 	}
 
@@ -175,7 +175,7 @@
 // TestMultipleGets verifies that calling .Get() on a single watcher from two
 // goroutines is prevented by returning an error in exactly one of them.
 func TestMultipleGets(t *testing.T) {
-	p := Value{}
+	p := Value[int]{}
 	ctx := context.Background()
 
 	w := p.Watch()
@@ -204,7 +204,7 @@
 func TestConcurrency(t *testing.T) {
 	ctx := context.Background()
 
-	p := Value{}
+	p := Value[int]{}
 	p.Set(0)
 
 	// Number of watchers to create.
@@ -236,10 +236,10 @@
 				}
 
 				// Ensure monotonicity of received data.
-				if val.(int) <= prev {
+				if val <= prev {
 					done(fmt.Errorf("received out of order data: %d after %d", val, prev))
 				}
-				prev = val.(int)
+				prev = val
 
 				// Quit when the final value is received.
 				if val == final {
@@ -274,7 +274,7 @@
 // aborts that particular Get call, but also allows subsequent use of the same
 // watcher.
 func TestCanceling(t *testing.T) {
-	p := Value{
+	p := Value[int]{
 		Sync: true,
 	}
 
@@ -316,7 +316,7 @@
 func TestSetAfterWatch(t *testing.T) {
 	ctx := context.Background()
 
-	p := Value{}
+	p := Value[int]{}
 	p.Set(0)
 
 	watcher := p.Watch()
@@ -326,7 +326,7 @@
 	if err != nil {
 		t.Fatalf("Get: %v", err)
 	}
-	if want, got := 1, data.(int); want != got {
+	if want, got := 1, data; want != got {
 		t.Errorf("Get should've returned %v, got %v", want, got)
 	}
 }
diff --git a/metropolis/pkg/pki/crl.go b/metropolis/pkg/pki/crl.go
index 8b886bf..23838a1 100644
--- a/metropolis/pkg/pki/crl.go
+++ b/metropolis/pkg/pki/crl.go
@@ -145,8 +145,8 @@
 
 // WatchCRL returns and Event Value compatible CRLWatcher which can be used to
 // retrieve and watch for the newest CRL available from this CA certificate.
-func (c *Certificate) WatchCRL(cl client.Namespaced) CRLWatcher {
-	value := etcd.NewValue(cl, c.crlPath(), func(_, data []byte) (interface{}, error) {
+func (c *Certificate) WatchCRL(cl client.Namespaced) event.Watcher[*CRL] {
+	value := etcd.NewValue(cl, c.crlPath(), func(_, data []byte) (*CRL, error) {
 		crl, err := x509.ParseCRL(data)
 		if err != nil {
 			return nil, fmt.Errorf("could not parse CRL from etcd: %w", err)
@@ -156,29 +156,10 @@
 			List: crl,
 		}, nil
 	})
-	return CRLWatcher{value.Watch()}
-}
-
-// CRLWatcher is a Event Value compatible Watcher which will be updated any time
-// a given CA certificate's CRL gets updated.
-type CRLWatcher struct {
-	event.Watcher
+	return value.Watch()
 }
 
 type CRL struct {
 	Raw  []byte
 	List *pkix.CertificateList
 }
-
-// Retrieve the newest available CRL from etcd, blocking until one is available
-// or updated.
-//
-// The first call will block until a CRL is available, which happens the first
-// time a given CA certificate is stored in etcd (eg. through an Ensure call).
-func (c *CRLWatcher) Get(ctx context.Context, opts ...event.GetOption) (*CRL, error) {
-	v, err := c.Watcher.Get(ctx, opts...)
-	if err != nil {
-		return nil, err
-	}
-	return v.(*CRL), nil
-}