m/pkg/event: implement Pipe helper

This simplifies some of the roleserver code, and is possible now that
event values are strongly typed.

Change-Id: I0a22ff97fe4304d35cc9cee105e98bc224d1433b
Reviewed-on: https://review.monogon.dev/c/monogon/+/1323
Reviewed-by: Leopold Schabel <leo@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/node/core/roleserve/worker_controlplane.go b/metropolis/node/core/roleserve/worker_controlplane.go
index 354e67e..e5c6efb 100644
--- a/metropolis/node/core/roleserve/worker_controlplane.go
+++ b/metropolis/node/core/roleserve/worker_controlplane.go
@@ -16,6 +16,7 @@
 	"source.monogon.dev/metropolis/node/core/identity"
 	"source.monogon.dev/metropolis/node/core/localstorage"
 	"source.monogon.dev/metropolis/node/core/rpc/resolver"
+	"source.monogon.dev/metropolis/pkg/event"
 	"source.monogon.dev/metropolis/pkg/event/memory"
 	"source.monogon.dev/metropolis/pkg/pki"
 	"source.monogon.dev/metropolis/pkg/supervisor"
@@ -109,43 +110,11 @@
 
 	supervisor.RunGroup(ctx, map[string]supervisor.Runnable{
 		// Plain conversion from Event Value to channel.
-		"map-bootstrap-data": func(ctx context.Context) error {
-			w := s.bootstrapData.Watch()
-			defer w.Close()
-			for {
-				v, err := w.Get(ctx)
-				if err != nil {
-					return err
-				}
-				bootstrapDataC <- v
-			}
-		},
+		"map-bootstrap-data": event.Pipe[*bootstrapData](s.bootstrapData, bootstrapDataC),
 		// Plain conversion from Event Value to channel.
-		"map-cluster-membership": func(ctx context.Context) error {
-			supervisor.Signal(ctx, supervisor.SignalHealthy)
-			w := s.clusterMembership.Watch()
-			defer w.Close()
-			for {
-				v, err := w.Get(ctx, FilterHome())
-				if err != nil {
-					return err
-				}
-				clusterMembershipC <- v
-			}
-		},
+		"map-cluster-membership": event.Pipe[*ClusterMembership](s.clusterMembership, clusterMembershipC, FilterHome()),
 		// Plain conversion from Event Value to channel.
-		"map-roles": func(ctx context.Context) error {
-			supervisor.Signal(ctx, supervisor.SignalHealthy)
-			w := s.localRoles.Watch()
-			defer w.Close()
-			for {
-				v, err := w.Get(ctx)
-				if err != nil {
-					return err
-				}
-				rolesC <- v
-			}
-		},
+		"map-roles": event.Pipe[*cpb.NodeRoles](s.localRoles, rolesC),
 		// Provide config from clusterMembership and roles.
 		"reduce-config": func(ctx context.Context) error {
 			supervisor.Signal(ctx, supervisor.SignalHealthy)
diff --git a/metropolis/node/core/roleserve/worker_kubernetes.go b/metropolis/node/core/roleserve/worker_kubernetes.go
index e31cf59..8e1030c 100644
--- a/metropolis/node/core/roleserve/worker_kubernetes.go
+++ b/metropolis/node/core/roleserve/worker_kubernetes.go
@@ -11,6 +11,7 @@
 	"source.monogon.dev/metropolis/node/kubernetes"
 	"source.monogon.dev/metropolis/node/kubernetes/containerd"
 	kpki "source.monogon.dev/metropolis/node/kubernetes/pki"
+	"source.monogon.dev/metropolis/pkg/event"
 	"source.monogon.dev/metropolis/pkg/event/memory"
 	"source.monogon.dev/metropolis/pkg/supervisor"
 	cpb "source.monogon.dev/metropolis/proto/common"
@@ -72,31 +73,9 @@
 
 	supervisor.RunGroup(ctx, map[string]supervisor.Runnable{
 		// Plain conversion from Event Value to channel.
-		"map-cluster-membership": func(ctx context.Context) error {
-			supervisor.Signal(ctx, supervisor.SignalHealthy)
-			w := s.clusterMembership.Watch()
-			defer w.Close()
-			for {
-				v, err := w.Get(ctx, FilterHome())
-				if err != nil {
-					return err
-				}
-				clusterMembershipC <- v
-			}
-		},
+		"map-cluster-membership": event.Pipe[*ClusterMembership](s.clusterMembership, clusterMembershipC, FilterHome()),
 		// Plain conversion from Event Value to channel.
-		"map-roles": func(ctx context.Context) error {
-			supervisor.Signal(ctx, supervisor.SignalHealthy)
-			w := s.localRoles.Watch()
-			defer w.Close()
-			for {
-				v, err := w.Get(ctx)
-				if err != nil {
-					return err
-				}
-				rolesC <- v
-			}
-		},
+		"map-roles": event.Pipe[*cpb.NodeRoles](s.localRoles, rolesC),
 		// Provide config from clusterMembership and roles.
 		"reduce-config": func(ctx context.Context) error {
 			supervisor.Signal(ctx, supervisor.SignalHealthy)
diff --git a/metropolis/pkg/event/BUILD.bazel b/metropolis/pkg/event/BUILD.bazel
index 92405ff..e6506b5 100644
--- a/metropolis/pkg/event/BUILD.bazel
+++ b/metropolis/pkg/event/BUILD.bazel
@@ -5,4 +5,5 @@
     srcs = ["event.go"],
     importpath = "source.monogon.dev/metropolis/pkg/event",
     visibility = ["//visibility:public"],
+    deps = ["//metropolis/pkg/supervisor"],
 )
diff --git a/metropolis/pkg/event/event.go b/metropolis/pkg/event/event.go
index ba9190c..0898eb5 100644
--- a/metropolis/pkg/event/event.go
+++ b/metropolis/pkg/event/event.go
@@ -75,6 +75,8 @@
 import (
 	"context"
 	"errors"
+
+	"source.monogon.dev/metropolis/pkg/supervisor"
 )
 
 // A Value is an 'Event Value', some piece of data that can be updated ('Set')
@@ -206,3 +208,26 @@
 	// of the requested key has been retrieved.
 	BacklogDone = errors.New("no more backlogged data")
 )
+
+// Pipe a Value's initial state and subsequent updates to an already existing
+// channel in a supervisor.Runnable. This is mostly useful when wanting to select
+// {} on many Values.
+//
+// The given channel will NOT be closed when the runnable exits. The process
+// receiving from the channel should be running in a group with the pipe
+// runnable, so that both restart if either does. This ensures that there is always
+// at least one value in the channel when the receiver starts.
+func Pipe[T any](value Value[T], c chan<- T, opts ...GetOption[T]) supervisor.Runnable {
+	return func(ctx context.Context) error {
+		supervisor.Signal(ctx, supervisor.SignalHealthy)
+		w := value.Watch()
+		defer w.Close()
+		for {
+			v, err := w.Get(ctx, opts...)
+			if err != nil {
+				return err
+			}
+			c <- v
+		}
+	}
+}