m/pkg/event: implement Pipe helper
This simplifies some of the roleserver code, and is possible now that
event values are strongly typed.
Change-Id: I0a22ff97fe4304d35cc9cee105e98bc224d1433b
Reviewed-on: https://review.monogon.dev/c/monogon/+/1323
Reviewed-by: Leopold Schabel <leo@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/node/core/roleserve/worker_controlplane.go b/metropolis/node/core/roleserve/worker_controlplane.go
index 354e67e..e5c6efb 100644
--- a/metropolis/node/core/roleserve/worker_controlplane.go
+++ b/metropolis/node/core/roleserve/worker_controlplane.go
@@ -16,6 +16,7 @@
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/node/core/rpc/resolver"
+ "source.monogon.dev/metropolis/pkg/event"
"source.monogon.dev/metropolis/pkg/event/memory"
"source.monogon.dev/metropolis/pkg/pki"
"source.monogon.dev/metropolis/pkg/supervisor"
@@ -109,43 +110,11 @@
supervisor.RunGroup(ctx, map[string]supervisor.Runnable{
// Plain conversion from Event Value to channel.
- "map-bootstrap-data": func(ctx context.Context) error {
- w := s.bootstrapData.Watch()
- defer w.Close()
- for {
- v, err := w.Get(ctx)
- if err != nil {
- return err
- }
- bootstrapDataC <- v
- }
- },
+ "map-bootstrap-data": event.Pipe[*bootstrapData](s.bootstrapData, bootstrapDataC),
// Plain conversion from Event Value to channel.
- "map-cluster-membership": func(ctx context.Context) error {
- supervisor.Signal(ctx, supervisor.SignalHealthy)
- w := s.clusterMembership.Watch()
- defer w.Close()
- for {
- v, err := w.Get(ctx, FilterHome())
- if err != nil {
- return err
- }
- clusterMembershipC <- v
- }
- },
+ "map-cluster-membership": event.Pipe[*ClusterMembership](s.clusterMembership, clusterMembershipC, FilterHome()),
// Plain conversion from Event Value to channel.
- "map-roles": func(ctx context.Context) error {
- supervisor.Signal(ctx, supervisor.SignalHealthy)
- w := s.localRoles.Watch()
- defer w.Close()
- for {
- v, err := w.Get(ctx)
- if err != nil {
- return err
- }
- rolesC <- v
- }
- },
+ "map-roles": event.Pipe[*cpb.NodeRoles](s.localRoles, rolesC),
// Provide config from clusterMembership and roles.
"reduce-config": func(ctx context.Context) error {
supervisor.Signal(ctx, supervisor.SignalHealthy)
diff --git a/metropolis/node/core/roleserve/worker_kubernetes.go b/metropolis/node/core/roleserve/worker_kubernetes.go
index e31cf59..8e1030c 100644
--- a/metropolis/node/core/roleserve/worker_kubernetes.go
+++ b/metropolis/node/core/roleserve/worker_kubernetes.go
@@ -11,6 +11,7 @@
"source.monogon.dev/metropolis/node/kubernetes"
"source.monogon.dev/metropolis/node/kubernetes/containerd"
kpki "source.monogon.dev/metropolis/node/kubernetes/pki"
+ "source.monogon.dev/metropolis/pkg/event"
"source.monogon.dev/metropolis/pkg/event/memory"
"source.monogon.dev/metropolis/pkg/supervisor"
cpb "source.monogon.dev/metropolis/proto/common"
@@ -72,31 +73,9 @@
supervisor.RunGroup(ctx, map[string]supervisor.Runnable{
// Plain conversion from Event Value to channel.
- "map-cluster-membership": func(ctx context.Context) error {
- supervisor.Signal(ctx, supervisor.SignalHealthy)
- w := s.clusterMembership.Watch()
- defer w.Close()
- for {
- v, err := w.Get(ctx, FilterHome())
- if err != nil {
- return err
- }
- clusterMembershipC <- v
- }
- },
+ "map-cluster-membership": event.Pipe[*ClusterMembership](s.clusterMembership, clusterMembershipC, FilterHome()),
// Plain conversion from Event Value to channel.
- "map-roles": func(ctx context.Context) error {
- supervisor.Signal(ctx, supervisor.SignalHealthy)
- w := s.localRoles.Watch()
- defer w.Close()
- for {
- v, err := w.Get(ctx)
- if err != nil {
- return err
- }
- rolesC <- v
- }
- },
+ "map-roles": event.Pipe[*cpb.NodeRoles](s.localRoles, rolesC),
// Provide config from clusterMembership and roles.
"reduce-config": func(ctx context.Context) error {
supervisor.Signal(ctx, supervisor.SignalHealthy)
diff --git a/metropolis/pkg/event/BUILD.bazel b/metropolis/pkg/event/BUILD.bazel
index 92405ff..e6506b5 100644
--- a/metropolis/pkg/event/BUILD.bazel
+++ b/metropolis/pkg/event/BUILD.bazel
@@ -5,4 +5,5 @@
srcs = ["event.go"],
importpath = "source.monogon.dev/metropolis/pkg/event",
visibility = ["//visibility:public"],
+ deps = ["//metropolis/pkg/supervisor"],
)
diff --git a/metropolis/pkg/event/event.go b/metropolis/pkg/event/event.go
index ba9190c..0898eb5 100644
--- a/metropolis/pkg/event/event.go
+++ b/metropolis/pkg/event/event.go
@@ -75,6 +75,8 @@
import (
"context"
"errors"
+
+ "source.monogon.dev/metropolis/pkg/supervisor"
)
// A Value is an 'Event Value', some piece of data that can be updated ('Set')
@@ -206,3 +208,26 @@
// of the requested key has been retrieved.
BacklogDone = errors.New("no more backlogged data")
)
+
+// Pipe a Value's initial state and subsequent updates to an already existing
+// channel in a supervisor.Runnable. This is mostly useful when wanting to select
+// {} on many Values.
+//
+// The given channel will NOT be closed when the runnable exits. The process
+// receiving from the channel should be running in a group with the pipe
+// runnable, so that both restart if either does. This ensures that there is always
+// at least one value in the channel when the receiver starts.
+func Pipe[T any](value Value[T], c chan<- T, opts ...GetOption[T]) supervisor.Runnable {
+ return func(ctx context.Context) error {
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ w := value.Watch()
+ defer w.Close()
+ for {
+ v, err := w.Get(ctx, opts...)
+ if err != nil {
+ return err
+ }
+ c <- v
+ }
+ }
+}