m/pkg/event: implement Pipe helper
This simplifies some of the roleserver code, and is possible now that
event values are strongly typed.
Change-Id: I0a22ff97fe4304d35cc9cee105e98bc224d1433b
Reviewed-on: https://review.monogon.dev/c/monogon/+/1323
Reviewed-by: Leopold Schabel <leo@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/pkg/event/BUILD.bazel b/metropolis/pkg/event/BUILD.bazel
index 92405ff..e6506b5 100644
--- a/metropolis/pkg/event/BUILD.bazel
+++ b/metropolis/pkg/event/BUILD.bazel
@@ -5,4 +5,5 @@
srcs = ["event.go"],
importpath = "source.monogon.dev/metropolis/pkg/event",
visibility = ["//visibility:public"],
+ deps = ["//metropolis/pkg/supervisor"],
)
diff --git a/metropolis/pkg/event/event.go b/metropolis/pkg/event/event.go
index ba9190c..0898eb5 100644
--- a/metropolis/pkg/event/event.go
+++ b/metropolis/pkg/event/event.go
@@ -75,6 +75,8 @@
import (
"context"
"errors"
+
+ "source.monogon.dev/metropolis/pkg/supervisor"
)
// A Value is an 'Event Value', some piece of data that can be updated ('Set')
@@ -206,3 +208,26 @@
// of the requested key has been retrieved.
BacklogDone = errors.New("no more backlogged data")
)
+
+// Pipe a Value's initial state and subsequent updates to an already existing
+// channel in a supervisor.Runnable. This is mostly useful when wanting to select
+// {} on many Values.
+//
+// The given channel will NOT be closed when the runnable exits. The process
+// receiving from the channel should be running in a group with the pipe
+// runnable, so that both restart if either does. This ensures that there is always
+// at least one value in the channel when the receiver starts.
+func Pipe[T any](value Value[T], c chan<- T, opts ...GetOption[T]) supervisor.Runnable {
+ return func(ctx context.Context) error {
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ w := value.Watch()
+ defer w.Close()
+ for {
+ v, err := w.Get(ctx, opts...)
+ if err != nil {
+ return err
+ }
+ c <- v
+ }
+ }
+}