m/pkg/event: make type-safe

This is a fairly large change that uses Go type parameters
(“generics”) to make the event library (and its memory/etcd
implementations) type safe.
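
As an illustration of the resulting API, roughly how callers now
interact with a typed value (a sketch based on the usage visible in the
diff below; the actual interface definitions live in m/pkg/event):

    func exampleTypedValue(ctx context.Context) (*ClusterMembership, error) {
        // memory.Value[T] is a typed, in-memory Event Value.
        var v memory.Value[*ClusterMembership]

        // Set takes a *ClusterMembership rather than an interface{}.
        v.Set(&ClusterMembership{})

        // Watch returns a typed watcher (an event.Watcher[*ClusterMembership]).
        w := v.Watch()
        defer w.Close()

        // Get returns *ClusterMembership directly; the old
        // v.(*ClusterMembership) type assertion is gone.
        return w.Get(ctx)
    }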

Now that the event.Value interface is strongly typed, we also move
implementation-specific options (like BacklogOnly) into that interface,
instead of leaving them on the concrete implementations that callers
previously had to type-assert to. Using an option that a particular
implementation does not handle is a runtime error; expressing this in
the type system is probably not worth the effort.
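
For illustration only — a hypothetical sketch, since the exact shape of
the option constructors isn't shown in this diff, and *Node is just a
stand-in value type — requesting such an option now looks roughly like:

    // Hypothetical: assumes BacklogOnly is exposed as a generic option
    // constructor on the shared interface; exact name/shape may differ.
    func backlogOnlyExample(ctx context.Context, w event.Watcher[*Node]) error {
        n, err := w.Get(ctx, event.BacklogOnly[*Node]())
        if err != nil {
            // A watcher whose implementation does not handle this option
            // (e.g. an in-memory one) returns an error here at runtime,
            // rather than the call failing to compile.
            return err
        }
        _ = n
        return nil
    }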

We also implement Filter, allowing some of the functionality previously
implemented in type assertion wrappers to be offloaded into the library
itself.
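
Concretely — see FilterHome in the diff below — a wrapper like
ClusterMembershipWatcher.GetHome, which used to loop over Get and
type-assert until a suitable value appeared, becomes a predicate handed
to Get:

    // FilterHome only lets through ClusterMembership values that carry
    // credentials and remote curators, i.e. the node is HOME.
    func FilterHome() event.GetOption[*ClusterMembership] {
        return event.Filter(func(cm *ClusterMembership) bool {
            return cm.credentials != nil && cm.remoteCurators != nil
        })
    }

    // Callers:
    //   cm, err := w.Get(ctx, FilterHome())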

In the end, this removes a bunch of type assertion code, at the cost of
a fairly sweeping change. Unfortunately, some of the churn is due to
IntelliJ suddenly deciding to reformat comments.

Change-Id: I1ca6d93db1b5c4055a21af3fb9e5e3d425c0d86e
Reviewed-on: https://review.monogon.dev/c/monogon/+/1322
Tested-by: Jenkins CI
Reviewed-by: Leopold Schabel <leo@monogon.tech>
diff --git a/metropolis/node/core/roleserve/roleserve.go b/metropolis/node/core/roleserve/roleserve.go
index 5a0e2f6..0d0997d 100644
--- a/metropolis/node/core/roleserve/roleserve.go
+++ b/metropolis/node/core/roleserve/roleserve.go
@@ -6,19 +6,18 @@
 // cluster's curator, updates the status of the node within the curator, and
 // spawns on-demand services.
 //
-//
-//  .-----------.          .--------.  Watches  .------------.
-//  | Cluster   |--------->| Role   |<----------| Node Roles |
-//  | Enrolment | Provides | Server |  Updates  '------------'
-//  '-----------'   Data   |        |----.      .-------------.
-//                         '--------'    '----->| Node Status |
-//                    Spawns |    | Spawns      '-------------'
-//                     .-----'    '-----.
-//                     V                V
-//                 .-----------. .------------.
-//                 | Consensus | | Kubernetes |
-//                 | & Curator | |            |
-//                 '-----------' '------------'
+//	.-----------.          .--------.  Watches  .------------.
+//	| Cluster   |--------->| Role   |<----------| Node Roles |
+//	| Enrolment | Provides | Server |  Updates  '------------'
+//	'-----------'   Data   |        |----.      .-------------.
+//	                       '--------'    '----->| Node Status |
+//	                  Spawns |    | Spawns      '-------------'
+//	                   .-----'    '-----.
+//	                   V                V
+//	               .-----------. .------------.
+//	               | Consensus | | Kubernetes |
+//	               | & Curator | |            |
+//	               '-----------' '------------'
 //
 // The internal state of the Role Server (eg. status of services, input from
 // Cluster Enrolment, current node roles as retrieved from the cluster) is
@@ -38,7 +37,6 @@
 // or remote). It is updated both by external processes (ie. data from the
 // Cluster Enrolment) as well as logic responsible for spawning the control
 // plane.
-//
 package roleserve
 
 import (
@@ -50,6 +48,7 @@
 	"source.monogon.dev/metropolis/node/core/localstorage"
 	"source.monogon.dev/metropolis/node/core/network"
 	"source.monogon.dev/metropolis/node/core/rpc/resolver"
+	"source.monogon.dev/metropolis/pkg/event/memory"
 	"source.monogon.dev/metropolis/pkg/supervisor"
 	cpb "source.monogon.dev/metropolis/proto/common"
 )
@@ -76,10 +75,10 @@
 type Service struct {
 	Config
 
-	ClusterMembership ClusterMembershipValue
-	KubernetesStatus  KubernetesStatusValue
-	bootstrapData     bootstrapDataValue
-	localRoles        localRolesValue
+	ClusterMembership memory.Value[*ClusterMembership]
+	KubernetesStatus  memory.Value[*KubernetesStatus]
+	bootstrapData     memory.Value[*bootstrapData]
+	localRoles        memory.Value[*cpb.NodeRoles]
 
 	controlPlane *workerControlPlane
 	statusPush   *workerStatusPush
@@ -141,11 +140,11 @@
 	// available on the loopback interface.
 	s.Resolver.AddOverride(nid, resolver.NodeByHostPort("127.0.0.1", uint16(common.CuratorServicePort)))
 
-	s.ClusterMembership.set(&ClusterMembership{
+	s.ClusterMembership.Set(&ClusterMembership{
 		pubkey:   pubkey,
 		resolver: s.Resolver,
 	})
-	s.bootstrapData.set(&bootstrapData{
+	s.bootstrapData.Set(&bootstrapData{
 		nodePrivateKey:     privkey,
 		initialOwnerKey:    iok,
 		clusterUnlockKey:   cuk,
@@ -159,7 +158,7 @@
 	// available on the loopback interface.
 	s.Resolver.AddOverride(credentials.ID(), resolver.NodeByHostPort("127.0.0.1", uint16(common.CuratorServicePort)))
 
-	s.ClusterMembership.set(&ClusterMembership{
+	s.ClusterMembership.Set(&ClusterMembership{
 		remoteCurators: directory,
 		credentials:    &credentials,
 		pubkey:         credentials.PublicKey(),
@@ -172,7 +171,7 @@
 	// available on the loopback interface.
 	s.Resolver.AddOverride(credentials.ID(), resolver.NodeByHostPort("127.0.0.1", uint16(common.CuratorServicePort)))
 
-	s.ClusterMembership.set(&ClusterMembership{
+	s.ClusterMembership.Set(&ClusterMembership{
 		remoteCurators: directory,
 		credentials:    &credentials,
 		pubkey:         credentials.PublicKey(),
diff --git a/metropolis/node/core/roleserve/value_bootstrapdata.go b/metropolis/node/core/roleserve/value_bootstrapdata.go
index 85618bc..29a6ae2 100644
--- a/metropolis/node/core/roleserve/value_bootstrapdata.go
+++ b/metropolis/node/core/roleserve/value_bootstrapdata.go
@@ -1,11 +1,7 @@
 package roleserve
 
 import (
-	"context"
 	"crypto/ed25519"
-
-	"source.monogon.dev/metropolis/pkg/event"
-	"source.monogon.dev/metropolis/pkg/event/memory"
 )
 
 // bootstrapData is an internal EventValue structure which is populated by the
@@ -19,29 +15,3 @@
 	initialOwnerKey    []byte
 	nodePrivateJoinKey ed25519.PrivateKey
 }
-
-type bootstrapDataValue struct {
-	value memory.Value
-}
-
-func (c *bootstrapDataValue) Watch() *bootstrapDataWatcher {
-	return &bootstrapDataWatcher{
-		Watcher: c.value.Watch(),
-	}
-}
-
-func (c *bootstrapDataValue) set(v *bootstrapData) {
-	c.value.Set(v)
-}
-
-type bootstrapDataWatcher struct {
-	event.Watcher
-}
-
-func (c *bootstrapDataWatcher) Get(ctx context.Context) (*bootstrapData, error) {
-	v, err := c.Watcher.Get(ctx)
-	if err != nil {
-		return nil, err
-	}
-	return v.(*bootstrapData), nil
-}
diff --git a/metropolis/node/core/roleserve/value_clustermembership.go b/metropolis/node/core/roleserve/value_clustermembership.go
index f14e2c4..e956d10 100644
--- a/metropolis/node/core/roleserve/value_clustermembership.go
+++ b/metropolis/node/core/roleserve/value_clustermembership.go
@@ -13,7 +13,6 @@
 	"source.monogon.dev/metropolis/node/core/rpc"
 	"source.monogon.dev/metropolis/node/core/rpc/resolver"
 	"source.monogon.dev/metropolis/pkg/event"
-	"source.monogon.dev/metropolis/pkg/event/memory"
 	cpb "source.monogon.dev/metropolis/proto/common"
 )
 
@@ -53,44 +52,14 @@
 	resolver *resolver.Resolver
 }
 
-type ClusterMembershipValue struct {
-	value memory.Value
-}
-
-func (c *ClusterMembershipValue) Watch() *ClusterMembershipWatcher {
-	return &ClusterMembershipWatcher{
-		w: c.value.Watch(),
-	}
-}
-
-func (c *ClusterMembershipValue) set(v *ClusterMembership) {
-	c.value.Set(v)
-}
-
-type ClusterMembershipWatcher struct {
-	w event.Watcher
-}
-
-func (c *ClusterMembershipWatcher) Close() error {
-	return c.w.Close()
-}
-
-func (c *ClusterMembershipWatcher) getAny(ctx context.Context) (*ClusterMembership, error) {
-	v, err := c.w.Get(ctx)
-	if err != nil {
-		return nil, err
-	}
-	return v.(*ClusterMembership), nil
-}
-
 // GetNodeID returns the Node ID of the locally running node whenever available.
 // NodeIDs are available early on in the node startup process and are guaranteed
 // to never change at runtime. The watcher will then block all further Get calls
 // until new information is available. This method should only be used if
 // GetNodeID is the only method ran on the watcher.
-func (c *ClusterMembershipWatcher) GetNodeID(ctx context.Context) (string, error) {
+func GetNodeID(ctx context.Context, watcher event.Watcher[*ClusterMembership]) (string, error) {
 	for {
-		cm, err := c.getAny(ctx)
+		cm, err := watcher.Get(ctx)
 		if err != nil {
 			return "", err
 		}
@@ -105,20 +74,16 @@
 // the cluster's Curator). See proto.common.ClusterState for more information
 // about cluster states. The watcher will then block all futher Get calls until
 // new information is available.
-func (c *ClusterMembershipWatcher) GetHome(ctx context.Context) (*ClusterMembership, error) {
-	for {
-		cm, err := c.getAny(ctx)
-		if err != nil {
-			return nil, err
-		}
+func FilterHome() event.GetOption[*ClusterMembership] {
+	return event.Filter(func(cm *ClusterMembership) bool {
 		if cm.credentials == nil {
-			continue
+			return false
 		}
 		if cm.remoteCurators == nil {
-			continue
+			return false
 		}
-		return cm, nil
-	}
+		return true
+	})
 }
 
 // DialCurator returns an authenticated gRPC client connection to the Curator
diff --git a/metropolis/node/core/roleserve/value_kubernetes.go b/metropolis/node/core/roleserve/value_kubernetes.go
index 4334dc3..88bdfeb 100644
--- a/metropolis/node/core/roleserve/value_kubernetes.go
+++ b/metropolis/node/core/roleserve/value_kubernetes.go
@@ -1,11 +1,7 @@
 package roleserve
 
 import (
-	"context"
-
 	"source.monogon.dev/metropolis/node/kubernetes"
-	"source.monogon.dev/metropolis/pkg/event"
-	"source.monogon.dev/metropolis/pkg/event/memory"
 )
 
 // KubernetesStatus is an Event Value structure populated by a running
@@ -14,33 +10,3 @@
 type KubernetesStatus struct {
 	Svc *kubernetes.Service
 }
-
-type KubernetesStatusValue struct {
-	value memory.Value
-}
-
-func (k *KubernetesStatusValue) Watch() *KubernetesStatusWatcher {
-	return &KubernetesStatusWatcher{
-		Watcher: k.value.Watch(),
-	}
-}
-
-func (k *KubernetesStatusValue) set(v *KubernetesStatus) {
-	k.value.Set(v)
-}
-
-type KubernetesStatusWatcher struct {
-	event.Watcher
-}
-
-// Get waits until the Kubernetes services is available. The returned
-// KubernetesStatus is guaranteed to contain a kubernetes.Service that was
-// running at the time of this call returning (but which might have since been
-// stopped).
-func (k *KubernetesStatusWatcher) Get(ctx context.Context) (*KubernetesStatus, error) {
-	v, err := k.Watcher.Get(ctx)
-	if err != nil {
-		return nil, err
-	}
-	return v.(*KubernetesStatus), nil
-}
diff --git a/metropolis/node/core/roleserve/value_node.go b/metropolis/node/core/roleserve/value_node.go
index f095f2f..26050ee 100644
--- a/metropolis/node/core/roleserve/value_node.go
+++ b/metropolis/node/core/roleserve/value_node.go
@@ -1,36 +1 @@
 package roleserve
-
-import (
-	"context"
-
-	"source.monogon.dev/metropolis/pkg/event"
-	"source.monogon.dev/metropolis/pkg/event/memory"
-	cpb "source.monogon.dev/metropolis/proto/common"
-)
-
-type localRolesValue struct {
-	value memory.Value
-}
-
-func (c *localRolesValue) Watch() *localRolesWatcher {
-	return &localRolesWatcher{
-		Watcher: c.value.Watch(),
-	}
-}
-
-func (c *localRolesValue) set(v *cpb.NodeRoles) {
-	c.value.Set(v)
-}
-
-type localRolesWatcher struct {
-	event.Watcher
-}
-
-// Get retrieves the roles assigned to the local node by the cluster.
-func (c *localRolesWatcher) Get(ctx context.Context) (*cpb.NodeRoles, error) {
-	v, err := c.Watcher.Get(ctx)
-	if err != nil {
-		return nil, err
-	}
-	return v.(*cpb.NodeRoles), nil
-}
diff --git a/metropolis/node/core/roleserve/worker_controlplane.go b/metropolis/node/core/roleserve/worker_controlplane.go
index 5c8c4b2..354e67e 100644
--- a/metropolis/node/core/roleserve/worker_controlplane.go
+++ b/metropolis/node/core/roleserve/worker_controlplane.go
@@ -44,11 +44,11 @@
 	storageRoot *localstorage.Root
 
 	// bootstrapData will be read.
-	bootstrapData *bootstrapDataValue
+	bootstrapData *memory.Value[*bootstrapData]
 	// clusterMembership will be read and written.
-	clusterMembership *ClusterMembershipValue
+	clusterMembership *memory.Value[*ClusterMembership]
 	// localRoles will be read.
-	localRoles *localRolesValue
+	localRoles *memory.Value[*cpb.NodeRoles]
 	// resolver will be read and used to populate ClusterMembership.
 	resolver *resolver.Resolver
 }
@@ -98,7 +98,7 @@
 	//                                             |
 	//         NodeRoles -M-> rolesC --------------'
 	//
-	var startupV memory.Value
+	var startupV memory.Value[*controlPlaneStartup]
 
 	// Channels are used as intermediaries between map stages and the final reduce,
 	// which is okay as long as the entire tree restarts simultaneously (which we
@@ -126,7 +126,7 @@
 			w := s.clusterMembership.Watch()
 			defer w.Close()
 			for {
-				v, err := w.GetHome(ctx)
+				v, err := w.Get(ctx, FilterHome())
 				if err != nil {
 					return err
 				}
@@ -264,11 +264,10 @@
 		// Read config from startupV.
 		w := startupV.Watch()
 		defer w.Close()
-		startupI, err := w.Get(ctx)
+		startup, err := w.Get(ctx)
 		if err != nil {
 			return err
 		}
-		startup := startupI.(*controlPlaneStartup)
 
 		// Start Control Plane if we have a config.
 		if startup.consensusConfig == nil {
@@ -424,7 +423,7 @@
 
 			// We now have a locally running ControlPlane. Reflect that in a new
 			// ClusterMembership.
-			s.clusterMembership.set(&ClusterMembership{
+			s.clusterMembership.Set(&ClusterMembership{
 				localConsensus: con,
 				localCurator:   cur,
 				credentials:    creds,
@@ -440,11 +439,10 @@
 		// Not restarting on every single change prevents us from going in a
 		// ClusterMembership -> ClusterDirectory -> ClusterMembership thrashing loop.
 		for {
-			ncI, err := w.Get(ctx)
+			nc, err := w.Get(ctx)
 			if err != nil {
 				return err
 			}
-			nc := ncI.(*controlPlaneStartup)
 			if nc.changed(startup) {
 				supervisor.Logger(ctx).Infof("Configuration changed, restarting...")
 				return fmt.Errorf("config changed, restarting")
diff --git a/metropolis/node/core/roleserve/worker_heartbeat.go b/metropolis/node/core/roleserve/worker_heartbeat.go
index db06845..fdaa9be 100644
--- a/metropolis/node/core/roleserve/worker_heartbeat.go
+++ b/metropolis/node/core/roleserve/worker_heartbeat.go
@@ -9,6 +9,7 @@
 	"source.monogon.dev/metropolis/node/core/curator"
 	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
 	"source.monogon.dev/metropolis/node/core/network"
+	"source.monogon.dev/metropolis/pkg/event/memory"
 	"source.monogon.dev/metropolis/pkg/supervisor"
 )
 
@@ -18,7 +19,7 @@
 	network *network.Service
 
 	// clusterMembership will be read.
-	clusterMembership *ClusterMembershipValue
+	clusterMembership *memory.Value[*ClusterMembership]
 }
 
 func (s *workerHeartbeat) run(ctx context.Context) error {
@@ -28,7 +29,7 @@
 	w := s.clusterMembership.Watch()
 	defer w.Close()
 	supervisor.Logger(ctx).Infof("Waiting for cluster membership...")
-	cm, err := w.GetHome(ctx)
+	cm, err := w.Get(ctx, FilterHome())
 	if err != nil {
 		return err
 	}
diff --git a/metropolis/node/core/roleserve/worker_kubernetes.go b/metropolis/node/core/roleserve/worker_kubernetes.go
index 3be3c41..e31cf59 100644
--- a/metropolis/node/core/roleserve/worker_kubernetes.go
+++ b/metropolis/node/core/roleserve/worker_kubernetes.go
@@ -5,6 +5,7 @@
 	"fmt"
 	"net"
 
+	"source.monogon.dev/metropolis/node/core/consensus"
 	"source.monogon.dev/metropolis/node/core/localstorage"
 	"source.monogon.dev/metropolis/node/core/network"
 	"source.monogon.dev/metropolis/node/kubernetes"
@@ -26,9 +27,9 @@
 	network     *network.Service
 	storageRoot *localstorage.Root
 
-	localRoles        *localRolesValue
-	clusterMembership *ClusterMembershipValue
-	kubernetesStatus  *KubernetesStatusValue
+	localRoles        *memory.Value[*cpb.NodeRoles]
+	clusterMembership *memory.Value[*ClusterMembership]
+	kubernetesStatus  *memory.Value[*KubernetesStatus]
 }
 
 // kubernetesStartup is used internally to provide a reduced (as in MapReduce
@@ -64,7 +65,7 @@
 	//                                             |
 	//         NodeRoles -M-> rolesC --------------'
 	//
-	var startupV memory.Value
+	var startupV memory.Value[*kubernetesStartup]
 
 	clusterMembershipC := make(chan *ClusterMembership)
 	rolesC := make(chan *cpb.NodeRoles)
@@ -76,7 +77,7 @@
 			w := s.clusterMembership.Watch()
 			defer w.Close()
 			for {
-				v, err := w.GetHome(ctx)
+				v, err := w.Get(ctx, FilterHome())
 				if err != nil {
 					return err
 				}
@@ -127,11 +128,11 @@
 		// KubernetesController local role.
 		var d *kubernetesStartup
 		for {
-			dV, err := w.Get(ctx)
+			var err error
+			d, err = w.Get(ctx)
 			if err != nil {
 				return err
 			}
-			d = dV.(*kubernetesStartup)
 			supervisor.Logger(ctx).Infof("Got new startup data.")
 			if d.roles.KubernetesController == nil {
 				supervisor.Logger(ctx).Infof("No Kubernetes role, not starting.")
@@ -144,10 +145,10 @@
 
 			break
 		}
 		supervisor.Logger(ctx).Infof("Waiting for local consensus...")
 		cstW := d.membership.localConsensus.Watch()
 		defer cstW.Close()
-		cst, err := cstW.GetRunning(ctx)
+		cst, err := cstW.Get(ctx, consensus.FilterRunning)
 		if err != nil {
 			return fmt.Errorf("waiting for local consensus: %w", err)
 		}
@@ -197,7 +198,7 @@
 		}
 
 		// Let downstream know that Kubernetes is running.
-		s.kubernetesStatus.set(&KubernetesStatus{
+		s.kubernetesStatus.Set(&KubernetesStatus{
 			Svc: kubeSvc,
 		})
 
@@ -206,11 +207,10 @@
 		// Restart everything if we get a significantly different config (ie., a config
 		// whose change would/should either turn up or tear down Kubernetes).
 		for {
-			ncI, err := w.Get(ctx)
+			nc, err := w.Get(ctx)
 			if err != nil {
 				return err
 			}
-			nc := ncI.(*kubernetesStartup)
 			if nc.changed(d) {
 				supervisor.Logger(ctx).Errorf("watcher got new config, restarting")
 				return fmt.Errorf("restarting")
diff --git a/metropolis/node/core/roleserve/worker_rolefetch.go b/metropolis/node/core/roleserve/worker_rolefetch.go
index 3856f62..5f64676 100644
--- a/metropolis/node/core/roleserve/worker_rolefetch.go
+++ b/metropolis/node/core/roleserve/worker_rolefetch.go
@@ -5,24 +5,26 @@
 	"fmt"
 
 	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+	"source.monogon.dev/metropolis/pkg/event/memory"
 	"source.monogon.dev/metropolis/pkg/supervisor"
+	cpb "source.monogon.dev/metropolis/proto/common"
 )
 
 // workerRoleFetch is the Role Fetcher, an internal bookkeeping service
 // responsible for populating localRoles based on a clusterMembership whenever
 // the node is HOME and cluster credentials / curator access is available.
 type workerRoleFetch struct {
-	clusterMembership *ClusterMembershipValue
+	clusterMembership *memory.Value[*ClusterMembership]
 
 	// localRoles will be written.
-	localRoles *localRolesValue
+	localRoles *memory.Value[*cpb.NodeRoles]
 }
 
 func (s *workerRoleFetch) run(ctx context.Context) error {
 	w := s.clusterMembership.Watch()
 	defer w.Close()
 	supervisor.Logger(ctx).Infof("Waiting for cluster membership...")
-	cm, err := w.GetHome(ctx)
+	cm, err := w.Get(ctx, FilterHome())
 	if err != nil {
 		return err
 	}
@@ -70,7 +72,7 @@
 			if n.Roles.KubernetesWorker != nil {
 				supervisor.Logger(ctx).Infof(" - kubernetes worker")
 			}
-			s.localRoles.set(n.Roles)
+			s.localRoles.Set(n.Roles)
 			break
 		}
 	}
diff --git a/metropolis/node/core/roleserve/worker_statuspush.go b/metropolis/node/core/roleserve/worker_statuspush.go
index 732508a..ed806a0 100644
--- a/metropolis/node/core/roleserve/worker_statuspush.go
+++ b/metropolis/node/core/roleserve/worker_statuspush.go
@@ -10,6 +10,7 @@
 	common "source.monogon.dev/metropolis/node"
 	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
 	"source.monogon.dev/metropolis/node/core/network"
+	"source.monogon.dev/metropolis/pkg/event/memory"
 	"source.monogon.dev/metropolis/pkg/supervisor"
 	cpb "source.monogon.dev/metropolis/proto/common"
 )
@@ -20,7 +21,7 @@
 	network *network.Service
 
 	// clusterMembership will be read.
-	clusterMembership *ClusterMembershipValue
+	clusterMembership *memory.Value[*ClusterMembership]
 }
 
 // workerStatusPushChannels contain all the channels between the status pusher's
@@ -142,7 +143,7 @@
 		defer w.Close()
 		supervisor.Logger(ctx).Infof("Waiting for cluster membership...")
 		for {
-			cm, err := w.GetHome(ctx)
+			cm, err := w.Get(ctx, FilterHome())
 			if err != nil {
 				return fmt.Errorf("getting cluster membership status failed: %w", err)
 			}