m/pkg/event: make type-safe

This is a fairly large change which makes use of Go type parameters
(“generics”) to make the event library (and its memory/etcd
implementations) type safe.

Since the event.Value interface is now strongly typed, we also move
options which were implementation-specific (like BacklogOnly) to be
part of that interface, instead of having each implementation
type-assert its own option type. Using an option that a particular
implementation does not handle is a runtime error. Expressing this in
the type system is probably not worth the effort.

We also implement Filter to allow offloading some of the functionality
previously implemented in type assertion wrappers into the library
itself.

In the end, this removes a bunch of type assertion code, at the cost of
a fairly sweeping change. Unfortunately, some of the churn in this
change is due to IntelliJ suddenly deciding to reformat comments.

Change-Id: I1ca6d93db1b5c4055a21af3fb9e5e3d425c0d86e
Reviewed-on: https://review.monogon.dev/c/monogon/+/1322
Tested-by: Jenkins CI
Reviewed-by: Leopold Schabel <leo@monogon.tech>
diff --git a/metropolis/pkg/event/memory/memory.go b/metropolis/pkg/event/memory/memory.go
index 0db2524..f0a2ab9 100644
--- a/metropolis/pkg/event/memory/memory.go
+++ b/metropolis/pkg/event/memory/memory.go
@@ -18,6 +18,7 @@
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"sync"
 
@@ -29,25 +30,25 @@
 	// there currently is no code path that needs this to be strictly true. However,
 	// users of this library might want to rely on the Value type instead of
 	// particular Value implementations.
-	_ event.Value = &Value{}
+	_ event.Value[int] = &Value[int]{}
 )
 
 // Value is a 'memory value', which implements a event.Value stored in memory.
 // It is safe to construct an empty object of this type. However, this must not
 // be copied.
-type Value struct {
+type Value[T any] struct {
 	// mu guards the inner, innerSet and watchers fields.
 	mu sync.RWMutex
 	// inner is the latest data Set on the Value. It is used to provide the
 	// newest version of the Set data to new watchers.
-	inner interface{}
+	inner T
 	// innerSet is true when inner has been Set at least once. It is used to
 	// differentiate between a nil and unset value.
 	innerSet bool
 	// watchers is the list of watchers that should be updated when new data is
 	// Set. It will grow on every .Watch() and shrink any time a watcher is
 	// determined to have been closed.
-	watchers []*watcher
+	watchers []*watcher[T]
 
 	// Sync, if set to true, blocks all .Set() calls on the Value until all
 	// Watchers derived from it actively .Get() the new value. This can be used
@@ -65,7 +66,7 @@
 // multiple goroutines, including concurrently.
 //
 // For more information about guarantees, see event.Value.Set.
-func (m *Value) Set(val interface{}) {
+func (m *Value[T]) Set(val T) {
 	m.mu.Lock()
 	defer m.mu.Unlock()
 
@@ -75,7 +76,7 @@
 
 	// Go through all watchers, updating them on the new value and filtering out
 	// all closed watchers.
-	newWatchers := make([]*watcher, 0, len(m.watchers))
+	newWatchers := make([]*watcher[T], 0, len(m.watchers))
 	for _, w := range m.watchers {
 		w := w
 		if w.closed() {
@@ -89,15 +90,15 @@
 
 // watcher implements the event.Watcher interface for watchers returned by
 // Value.
-type watcher struct {
+type watcher[T any] struct {
 	// activeReqC is a channel used to request an active submission channel
 	// from a pending Get function, if any.
-	activeReqC chan chan interface{}
+	activeReqC chan chan T
 	// deadletterSubmitC is a channel used to communicate a value that
 	// attempted to be submitted via activeReqC. This will be received by the
 	// deadletter worker of this watcher and passed on to the next .Get call
 	// that occurs.
-	deadletterSubmitC chan interface{}
+	deadletterSubmitC chan T
 
 	// getSem is a channel-based semaphore (which is of size 1, and thus in
 	// fact a mutex) that is used to ensure that only a single .Get() call is
@@ -112,10 +113,10 @@
 // contained within the Value that was last seen by a consumer.
 //
 // For more information about guarantees, see event.Value.Watch.
-func (m *Value) Watch() event.Watcher {
-	waiter := &watcher{
-		activeReqC:        make(chan chan interface{}),
-		deadletterSubmitC: make(chan interface{}),
+func (m *Value[T]) Watch() event.Watcher[T] {
+	waiter := &watcher[T]{
+		activeReqC:        make(chan chan T),
+		deadletterSubmitC: make(chan T),
 		close:             make(chan struct{}),
 		getSem:            make(chan struct{}, 1),
 	}
@@ -143,9 +144,9 @@
 // It watches the deadletterSubmitC channel for updated data, and overrides
 // previously received data. Then, when a .Get() begins to pend (and respond to
 // activeReqC receives), the deadletter worker will deliver that value.
-func (m *watcher) deadletterWorker() {
+func (m *watcher[T]) deadletterWorker() {
 	// Current value, and flag to mark it as set (vs. nil).
-	var cur interface{}
+	var cur T
 	var set bool
 
 	for {
@@ -186,7 +187,7 @@
 }
 
 // closed returns whether this watcher has been closed.
-func (m *watcher) closed() bool {
+func (m *watcher[T]) closed() bool {
 	select {
 	case _, ok := <-m.close:
 		if !ok {
@@ -198,7 +199,7 @@
 }
 
 // update is the high level update-this-watcher function called by Value.
-func (m *watcher) update(sync bool, val interface{}) {
+func (m *watcher[T]) update(sync bool, val T) {
 	// If synchronous delivery was requested, block until a watcher .Gets it.
 	if sync {
 		c := <-m.activeReqC
@@ -224,38 +225,37 @@
 	}
 }
 
-func (m *watcher) Close() error {
+func (m *watcher[T]) Close() error {
 	close(m.deadletterSubmitC)
 	close(m.close)
 	return nil
 }
 
-// GetOption is a memory.Get-specific option passed to Get. Currently no options
-// are implemented.
-type GetOption struct {
-}
-
 // Get blocks until a Value's data is available. See event.Watcher.Get for
 // guarantees and more information.
-func (m *watcher) Get(ctx context.Context, opts ...event.GetOption) (interface{}, error) {
+func (m *watcher[T]) Get(ctx context.Context, opts ...event.GetOption[T]) (T, error) {
 	// Make sure we're the only active .Get call.
+	var empty T
 	select {
 	case m.getSem <- struct{}{}:
 	default:
-		return nil, fmt.Errorf("cannot Get() concurrently on a single waiter")
+		return empty, fmt.Errorf("cannot Get() concurrently on a single waiter")
 	}
 	defer func() {
 		<-m.getSem
 	}()
 
-	for _, optI := range opts {
-		_, ok := optI.(GetOption)
-		if !ok {
-			return nil, fmt.Errorf("get options must be of type memory.GetOption")
+	var predicate func(t T) bool
+	for _, opt := range opts {
+		if opt.Predicate != nil {
+			predicate = opt.Predicate
+		}
+		if opt.BacklogOnly != false {
+			return empty, errors.New("BacklogOnly is not implemented for memory watchers")
 		}
 	}
 
-	c := make(chan interface{})
+	c := make(chan T)
 
 	// Start responding on activeReqC. This signals to .update and to the
 	// deadletter worker that we're ready to accept data updates.
@@ -285,9 +285,12 @@
 	for {
 		select {
 		case <-ctx.Done():
-			return nil, ctx.Err()
+			return empty, ctx.Err()
 		case m.activeReqC <- c:
 		case val := <-c:
+			if predicate != nil && !predicate(val) {
+				continue
+			}
 			return val, nil
 		}
 	}