m/pkg/event: implement Pipe helper

This simplifies some of the roleserver code, and is possible now that
event values are strongly typed.

Change-Id: I0a22ff97fe4304d35cc9cee105e98bc224d1433b
Reviewed-on: https://review.monogon.dev/c/monogon/+/1323
Reviewed-by: Leopold Schabel <leo@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/pkg/event/BUILD.bazel b/metropolis/pkg/event/BUILD.bazel
index 92405ff..e6506b5 100644
--- a/metropolis/pkg/event/BUILD.bazel
+++ b/metropolis/pkg/event/BUILD.bazel
@@ -5,4 +5,5 @@
     srcs = ["event.go"],
     importpath = "source.monogon.dev/metropolis/pkg/event",
     visibility = ["//visibility:public"],
+    deps = ["//metropolis/pkg/supervisor"],
 )
diff --git a/metropolis/pkg/event/event.go b/metropolis/pkg/event/event.go
index ba9190c..0898eb5 100644
--- a/metropolis/pkg/event/event.go
+++ b/metropolis/pkg/event/event.go
@@ -75,6 +75,8 @@
 import (
 	"context"
 	"errors"
+
+	"source.monogon.dev/metropolis/pkg/supervisor"
 )
 
 // A Value is an 'Event Value', some piece of data that can be updated ('Set')
@@ -206,3 +208,38 @@
 	// of the requested key has been retrieved.
 	BacklogDone = errors.New("no more backlogged data")
 )
+
+// Pipe returns a supervisor.Runnable that forwards a Value's initial state and
+// subsequent updates to an already existing channel. This is mostly useful
+// when wanting to select {} on many Values.
+//
+// Each value is retrieved from a watch on the Value using the given opts and
+// then sent to c. The runnable signals itself healthy on startup, and exits
+// with an error when retrieving a value fails or when its context is canceled,
+// including while it is blocked sending to c.
+//
+// The given channel will NOT be closed when the runnable exits. The process
+// receiving from the channel should be running in a group with the pipe
+// runnable, so that both restart if either does. This ensures that there is
+// always at least one value in the channel when the receiver starts.
+func Pipe[T any](value Value[T], c chan<- T, opts ...GetOption[T]) supervisor.Runnable {
+	return func(ctx context.Context) error {
+		supervisor.Signal(ctx, supervisor.SignalHealthy)
+		w := value.Watch()
+		defer w.Close()
+		for {
+			v, err := w.Get(ctx, opts...)
+			if err != nil {
+				return err
+			}
+			// Never block unconditionally on the send: if the receiver has
+			// stopped reading, a bare send would ignore context cancelation
+			// and keep this runnable (and its watcher) alive forever.
+			select {
+			case c <- v:
+			case <-ctx.Done():
+				return ctx.Err()
+			}
+		}
+	}
+}