m/pkg/event: make type-safe
This is a fairly large change that uses Go type parameters
(“generics”) to make the event library (and its memory/etcd
implementations) type-safe.
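For illustration, these are the generic shapes involved, as
reconstructed from the call sites in this change (a sketch, not the
authoritative definitions in metropolis/pkg/event):

    // A Value can be watched for updates of type T.
    type Value[T any] interface {
        Watch() Watcher[T]
    }

    // A Watcher yields T directly; no type assertion needed.
    type Watcher[T any] interface {
        Get(ctx context.Context, opts ...GetOption[T]) (T, error)
        Close() error
    }

memory.Value[T] implements this and additionally provides Set(T);
etcd.NewValue[T] builds a Value from an etcd client, a key (range)
and a typed converter func(key, value []byte) (T, error).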
Now that the event.Value interface is strongly typed, we also move
implementation-specific options (like BacklogOnly) into that
interface, instead of leaving them on the specific implementations
that callers previously had to type-assert to. Using an option that a
given implementation does not handle is a runtime error; expressing
this constraint in the type system is probably not worth the effort.
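As the etcd watcher in this diff shows, BacklogOnly is now a generic
option constructor and BacklogDone lives in the event package:

    for {
        nodeKV, err := w.Get(ctx, event.BacklogOnly[*nodeAtID]())
        if err == event.BacklogDone {
            break // initial backlog fully consumed
        }
        if err != nil {
            return err
        }
        // ... accumulate nodeKV into the initial snapshot ...
    }

Passing BacklogOnly to a watcher whose implementation does not handle
it surfaces as an error from Get rather than a compile error.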
We also implement Filter, which moves some of the functionality
previously implemented in type-assertion wrappers into the library
itself.
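For example, consensus's hand-written GetRunning wrapper becomes a
declarative filter that Get applies before returning:

    var FilterRunning = event.Filter(func(st *Status) bool {
        return st.Running()
    })

    // At call sites:
    st, err := w.Get(ctx, consensus.FilterRunning)

Values failing the predicate are skipped, so Get blocks until a
running Status is observed.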
In the end, this removes a bunch of type-assertion code, at the cost
of a fairly sweeping change. Unfortunately, some of the churn is due
to IntelliJ suddenly deciding to reformat comments.
Change-Id: I1ca6d93db1b5c4055a21af3fb9e5e3d425c0d86e
Reviewed-on: https://review.monogon.dev/c/monogon/+/1322
Tested-by: Jenkins CI
Reviewed-by: Leopold Schabel <leo@monogon.tech>
diff --git a/metropolis/node/core/cluster/BUILD.bazel b/metropolis/node/core/cluster/BUILD.bazel
index 4f0ab36..246fe1f 100644
--- a/metropolis/node/core/cluster/BUILD.bazel
+++ b/metropolis/node/core/cluster/BUILD.bazel
@@ -20,7 +20,6 @@
"//metropolis/node/core/roleserve",
"//metropolis/node/core/rpc",
"//metropolis/node/core/rpc/resolver",
- "//metropolis/pkg/event/memory",
"//metropolis/pkg/supervisor",
"//metropolis/proto/api",
"//metropolis/proto/common",
diff --git a/metropolis/node/core/cluster/cluster.go b/metropolis/node/core/cluster/cluster.go
index d5eee03..6254669 100644
--- a/metropolis/node/core/cluster/cluster.go
+++ b/metropolis/node/core/cluster/cluster.go
@@ -14,15 +14,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-// cluster implements low-level clustering logic, especially logic regarding to
-// bootstrapping, registering into and joining a cluster. Its goal is to provide
-// the rest of the node code with the following:
-// - A mounted plaintext storage.
-// - Node credentials/identity.
-// - A locally running etcd server if the node is supposed to run one, and a
-// client connection to that etcd cluster if so.
-// - The state of the cluster as seen by the node, to enable code to respond to
-// node lifecycle changes.
+// Package cluster implements low-level clustering logic, especially logic
+// regarding bootstrapping, registering into and joining a cluster. Its goal
+// is to provide the rest of the node code with the following:
+// - A mounted plaintext storage.
+// - Node credentials/identity.
+// - A locally running etcd server if the node is supposed to run one, and a
+// client connection to that etcd cluster if so.
+// - The state of the cluster as seen by the node, to enable code to respond to
+// node lifecycle changes.
package cluster
import (
@@ -43,7 +43,6 @@
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/node/core/network"
"source.monogon.dev/metropolis/node/core/roleserve"
- "source.monogon.dev/metropolis/pkg/event/memory"
"source.monogon.dev/metropolis/pkg/supervisor"
apb "source.monogon.dev/metropolis/proto/api"
cpb "source.monogon.dev/metropolis/proto/common"
@@ -62,7 +61,6 @@
storageRoot *localstorage.Root
networkService *network.Service
roleServer *roleserve.Service
- status memory.Value
state
diff --git a/metropolis/node/core/consensus/consensus.go b/metropolis/node/core/consensus/consensus.go
index b570c69..5daa02f 100644
--- a/metropolis/node/core/consensus/consensus.go
+++ b/metropolis/node/core/consensus/consensus.go
@@ -99,6 +99,7 @@
"source.monogon.dev/metropolis/node/core/consensus/client"
"source.monogon.dev/metropolis/node/core/identity"
+ "source.monogon.dev/metropolis/pkg/event"
"source.monogon.dev/metropolis/pkg/event/memory"
"source.monogon.dev/metropolis/pkg/logtree/unraw"
"source.monogon.dev/metropolis/pkg/pki"
@@ -144,7 +145,7 @@
type Service struct {
config *Config
- value memory.Value
+ value memory.Value[*Status]
ca *pki.Certificate
}
@@ -454,7 +455,7 @@
}
}
- w := s.Watch()
+ w := s.value.Watch()
for {
st, err := w.Get(ctx)
if err != nil {
@@ -473,6 +474,10 @@
}
}
+func (s *Service) Watch() event.Watcher[*Status] {
+ return s.value.Watch()
+}
+
// selfupdater is a runnable that performs a one-shot (once per Service Run,
// thus once for each configuration) update of the node's Peer URL in etcd. This
// is currently only really needed because the first node in the cluster
@@ -482,7 +487,7 @@
// more robust.
func (s *Service) selfupdater(ctx context.Context) error {
supervisor.Signal(ctx, supervisor.SignalHealthy)
- w := s.Watch()
+ w := s.value.Watch()
for {
st, err := w.Get(ctx)
if err != nil {
diff --git a/metropolis/node/core/consensus/status.go b/metropolis/node/core/consensus/status.go
index 992d0ac..17df8f9 100644
--- a/metropolis/node/core/consensus/status.go
+++ b/metropolis/node/core/consensus/status.go
@@ -23,38 +23,12 @@
type ServiceHandle interface {
// Watch returns a Event Value compatible Watcher for accessing the State of the
// consensus Service in a safe manner.
- Watch() Watcher
+ Watch() event.Watcher[*Status]
}
-// Watch returns a Event Value compatible Watcher for accessing the State of the
-// consensus Service in a safe manner.
-func (s *Service) Watch() Watcher {
- return Watcher{s.value.Watch()}
-}
-
-type Watcher struct {
- event.Watcher
-}
-
-func (w *Watcher) Get(ctx context.Context, opts ...event.GetOption) (*Status, error) {
- v, err := w.Watcher.Get(ctx, opts...)
- if err != nil {
- return nil, err
- }
- return v.(*Status), nil
-}
-
-func (w *Watcher) GetRunning(ctx context.Context) (*Status, error) {
- for {
- st, err := w.Get(ctx)
- if err != nil {
- return nil, err
- }
- if st.Running() {
- return st, nil
- }
- }
-}
+var FilterRunning = event.Filter(func(st *Status) bool {
+ return st.Running()
+})
// Status of the consensus service. It represents either a running consensus
// service to which a client can connect and on which management can be
diff --git a/metropolis/node/core/consensus/testhelpers.go b/metropolis/node/core/consensus/testhelpers.go
index f69f73e..8b0b213 100644
--- a/metropolis/node/core/consensus/testhelpers.go
+++ b/metropolis/node/core/consensus/testhelpers.go
@@ -10,7 +10,7 @@
)
type testServiceHandle struct {
- s memory.Value
+ s memory.Value[*Status]
}
// TestServiceHandle builds a somewhat functioning ServiceHandle from a bare
@@ -41,9 +41,5 @@
t.Fatalf("failed to ensure PKI CA: %v", err)
}
tsh.s.Set(st)
- return &tsh
-}
-
-func (h *testServiceHandle) Watch() Watcher {
- return Watcher{h.s.Watch()}
+ return &tsh.s
}
diff --git a/metropolis/node/core/curator/BUILD.bazel b/metropolis/node/core/curator/BUILD.bazel
index be23b55..2283b34 100644
--- a/metropolis/node/core/curator/BUILD.bazel
+++ b/metropolis/node/core/curator/BUILD.bazel
@@ -66,6 +66,7 @@
"//metropolis/node/core/curator/proto/private",
"//metropolis/node/core/identity",
"//metropolis/node/core/rpc",
+ "//metropolis/pkg/event",
"//metropolis/pkg/logtree",
"//metropolis/pkg/pki",
"//metropolis/pkg/supervisor",
diff --git a/metropolis/node/core/curator/curator.go b/metropolis/node/core/curator/curator.go
index 400f08c..336b7d6 100644
--- a/metropolis/node/core/curator/curator.go
+++ b/metropolis/node/core/curator/curator.go
@@ -25,7 +25,6 @@
"source.monogon.dev/metropolis/node/core/consensus"
ppb "source.monogon.dev/metropolis/node/core/curator/proto/private"
"source.monogon.dev/metropolis/node/core/identity"
- "source.monogon.dev/metropolis/pkg/event"
"source.monogon.dev/metropolis/pkg/event/memory"
"source.monogon.dev/metropolis/pkg/supervisor"
)
@@ -57,7 +56,7 @@
// status is a memory Event Value for keeping the electionStatus of this
// instance. It is not exposed to users of the Curator.
- status memory.Value
+ status memory.Value[*electionStatus]
}
// New creates a new curator Service.
@@ -94,28 +93,6 @@
lock *ppb.LeaderElectionValue
}
-func (s *Service) electionWatch() electionWatcher {
- return electionWatcher{
- Watcher: s.status.Watch(),
- }
-}
-
-// electionWatcher is a type-safe wrapper around event.Watcher which provides
-// electionStatus values.
-type electionWatcher struct {
- event.Watcher
-}
-
-// get retrieves an electionStatus from the electionWatcher.
-func (w *electionWatcher) get(ctx context.Context) (*electionStatus, error) {
- val, err := w.Watcher.Get(ctx)
- if err != nil {
- return nil, err
- }
- status := val.(electionStatus)
- return &status, err
-}
-
// buildLockValue returns a serialized etcd value that will be set by the
// instance when it becomes a leader. This value is a serialized
// LeaderElectionValue from private/storage.proto.
@@ -206,7 +183,7 @@
if err := proto.Unmarshal(o.Kvs[0].Value, &lock); err != nil {
return fmt.Errorf("parsing existing lock value failed: %w", err)
}
- s.status.Set(electionStatus{
+ s.status.Set(&electionStatus{
follower: &electionStatusFollower{
lock: &lock,
},
@@ -223,7 +200,7 @@
supervisor.Logger(ctx).Info("Curator became leader.")
// Update status, watchers will now know that this curator is the leader.
- s.status.Set(electionStatus{
+ s.status.Set(&electionStatus{
leader: &electionStatusLeader{
lockKey: election.Key(),
lockRev: election.Rev(),
@@ -246,9 +223,9 @@
// Start local election watcher. This logs what this curator knows about its own
// leadership.
go func() {
- w := s.electionWatch()
+ w := s.status.Watch()
for {
- s, err := w.get(ctx)
+ s, err := w.Get(ctx)
if err != nil {
supervisor.Logger(ctx).Warningf("Election watcher exiting: get(): %v", err)
return
@@ -264,7 +241,7 @@
supervisor.Logger(ctx).Infof("Waiting for consensus...")
w := s.config.Consensus.Watch()
defer w.Close()
- st, err := w.GetRunning(ctx)
+ st, err := w.Get(ctx, consensus.FilterRunning)
if err != nil {
return fmt.Errorf("while waiting for consensus: %w", err)
}
@@ -279,7 +256,7 @@
// or forwarding to a remotely running leader.
lis := listener{
node: s.config.NodeCredentials,
- electionWatch: s.electionWatch,
+ electionWatch: s.status.Watch,
consensus: s.config.Consensus,
etcd: etcd,
}
@@ -297,9 +274,9 @@
supervisor.Signal(ctx, supervisor.SignalHealthy)
for {
- s.status.Set(electionStatus{})
+ s.status.Set(&electionStatus{})
err := s.elect(ctx)
- s.status.Set(electionStatus{})
+ s.status.Set(&electionStatus{})
if err != nil && errors.Is(err, ctx.Err()) {
return fmt.Errorf("election round failed due to context cancelation, not attempting to re-elect: %w", err)
diff --git a/metropolis/node/core/curator/curator_test.go b/metropolis/node/core/curator/curator_test.go
index b1a9671..e9674f5 100644
--- a/metropolis/node/core/curator/curator_test.go
+++ b/metropolis/node/core/curator/curator_test.go
@@ -13,6 +13,7 @@
"source.monogon.dev/metropolis/node/core/consensus"
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/rpc"
+ "source.monogon.dev/metropolis/pkg/event"
"source.monogon.dev/metropolis/pkg/supervisor"
)
@@ -131,11 +132,11 @@
// Run a watcher for each dut which sends that dut's newest available
// electionStatus (or error) to updC.
for e, d := range s {
- w := d.instance.electionWatch()
- go func(e string, w electionWatcher) {
+ w := d.instance.status.Watch()
+ go func(e string, w event.Watcher[*electionStatus]) {
defer w.Close()
for {
- s, err := w.get(ctx2)
+ s, err := w.Get(ctx2)
if err != nil {
updC <- dutUpdate{
endpoint: e,
diff --git a/metropolis/node/core/curator/impl_leader_curator.go b/metropolis/node/core/curator/impl_leader_curator.go
index a06f8cd..869cfd9 100644
--- a/metropolis/node/core/curator/impl_leader_curator.go
+++ b/metropolis/node/core/curator/impl_leader_curator.go
@@ -15,9 +15,11 @@
tpb "google.golang.org/protobuf/types/known/timestamppb"
common "source.monogon.dev/metropolis/node"
+ "source.monogon.dev/metropolis/node/core/consensus"
ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/rpc"
+ "source.monogon.dev/metropolis/pkg/event"
"source.monogon.dev/metropolis/pkg/event/etcd"
"source.monogon.dev/metropolis/pkg/pki"
cpb "source.monogon.dev/metropolis/proto/common"
@@ -74,7 +76,7 @@
defer w.Close()
for {
- v, err := w.Get(ctx)
+ nodeKV, err := w.Get(ctx)
if err != nil {
if rpcErr, ok := rpcError(err); ok {
return rpcErr
@@ -84,7 +86,6 @@
}
ev := &ipb.WatchEvent{}
- nodeKV := v.(nodeAtID)
nodeKV.appendToEvent(ev)
if err := srv.Send(ev); err != nil {
return err
@@ -99,7 +100,7 @@
ctx := srv.Context()
start, end := nodeEtcdPrefix.KeyRange()
- value := etcd.NewValue(l.etcd, start, nodeValueConverter, etcd.Range(end))
+ value := etcd.NewValue[*nodeAtID](l.etcd, start, nodeValueConverter, etcd.Range(end))
w := value.Watch()
defer w.Close()
@@ -107,15 +108,14 @@
// Perform initial fetch from etcd.
nodes := make(map[string]*Node)
for {
- v, err := w.Get(ctx, etcd.BacklogOnly)
- if err == etcd.BacklogDone {
+ nodeKV, err := w.Get(ctx, event.BacklogOnly[*nodeAtID]())
+ if err == event.BacklogDone {
break
}
if err != nil {
rpc.Trace(ctx).Printf("etcd watch failed (initial fetch): %v", err)
return status.Error(codes.Unavailable, "internal error during initial fetch")
}
- nodeKV := v.(nodeAtID)
if nodeKV.value != nil {
nodes[nodeKV.id] = nodeKV.value
}
@@ -148,13 +148,12 @@
// Send updates as they arrive from etcd watcher.
for {
- v, err := w.Get(ctx)
+ nodeKV, err := w.Get(ctx)
if err != nil {
rpc.Trace(ctx).Printf("etcd watch failed (update): %v", err)
return status.Errorf(codes.Unavailable, "internal error during update")
}
we := &ipb.WatchEvent{}
- nodeKV := v.(nodeAtID)
nodeKV.appendToEvent(we)
if err := srv.Send(we); err != nil {
return err
@@ -173,7 +172,7 @@
// nodeValueConverter is called by etcd node value watchers to convert updates
// from the cluster into nodeAtID, ensuring data integrity and checking
// invariants.
-func nodeValueConverter(key, value []byte) (interface{}, error) {
+func nodeValueConverter(key, value []byte) (*nodeAtID, error) {
res := nodeAtID{
id: nodeEtcdPrefix.ExtractID(string(key)),
}
@@ -192,7 +191,7 @@
// panic.
return nil, fmt.Errorf("invalid node key %q", key)
}
- return res, nil
+ return &res, nil
}
// appendToId records a node update represented by nodeAtID into a Curator
@@ -429,7 +428,7 @@
w := l.consensus.Watch()
defer w.Close()
- st, err := w.GetRunning(ctx)
+ st, err := w.Get(ctx, consensus.FilterRunning)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "could not get running consensus: %v", err)
}
diff --git a/metropolis/node/core/curator/impl_leader_management.go b/metropolis/node/core/curator/impl_leader_management.go
index b725759..69c1445 100644
--- a/metropolis/node/core/curator/impl_leader_management.go
+++ b/metropolis/node/core/curator/impl_leader_management.go
@@ -11,6 +11,7 @@
"google.golang.org/grpc/status"
dpb "google.golang.org/protobuf/types/known/durationpb"
+ "source.monogon.dev/metropolis/node/core/consensus"
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/rpc"
apb "source.monogon.dev/metropolis/proto/api"
@@ -325,7 +326,7 @@
w := l.consensus.Watch()
defer w.Close()
- st, err := w.GetRunning(ctx)
+ st, err := w.Get(ctx, consensus.FilterRunning)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "could not get running consensus: %v", err)
}
diff --git a/metropolis/node/core/curator/listener.go b/metropolis/node/core/curator/listener.go
index e093d31..2290ab9 100644
--- a/metropolis/node/core/curator/listener.go
+++ b/metropolis/node/core/curator/listener.go
@@ -15,6 +15,7 @@
cpb "source.monogon.dev/metropolis/node/core/curator/proto/api"
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/rpc"
+ "source.monogon.dev/metropolis/pkg/event"
"source.monogon.dev/metropolis/pkg/supervisor"
apb "source.monogon.dev/metropolis/proto/api"
)
@@ -45,7 +46,7 @@
// listener to use when determining local leadership. As the listener may
// restart on error, this factory-function is used instead of an electionWatcher
// directly.
- electionWatch func() electionWatcher
+ electionWatch func() event.Watcher[*electionStatus]
consensus consensus.ServiceHandle
}
@@ -57,7 +58,7 @@
// waiting for a result.
w := l.electionWatch()
supervisor.Logger(ctx).Infof("Waiting for election status...")
- st, err := w.get(ctx)
+ st, err := w.Get(ctx)
if err != nil {
return fmt.Errorf("could not get election status: %w", err)
}
@@ -141,7 +142,7 @@
case st.leader != nil:
supervisor.Logger(ctx).Infof("Leader running until leadership lost.")
for {
- nst, err := w.get(ctx)
+ nst, err := w.Get(ctx)
if err != nil {
return fmt.Errorf("getting election status after starting listener failed, bailing just in case: %w", err)
}
@@ -152,7 +153,7 @@
case st.follower != nil:
supervisor.Logger(ctx).Infof("Follower running until leadership change.")
for {
- nst, err := w.get(ctx)
+ nst, err := w.Get(ctx)
if err != nil {
return fmt.Errorf("getting election status after starting listener failed, bailing just in case: %w", err)
}
diff --git a/metropolis/node/core/network/hostsfile/hostsfile.go b/metropolis/node/core/network/hostsfile/hostsfile.go
index 8b5eb9a..a4455ec 100644
--- a/metropolis/node/core/network/hostsfile/hostsfile.go
+++ b/metropolis/node/core/network/hostsfile/hostsfile.go
@@ -2,12 +2,12 @@
// files/interfaces used by the system to resolve the local node's name and the
// names of other nodes in the cluster:
//
-// 1. All cluster node names are written into /etc/hosts for DNS resolution.
-// 2. The local node's name is written into /etc/machine-id.
-// 3. The local node's name is set as the UNIX hostname of the machine (via the
-// sethostname call).
-// 4. The local node's ClusterDirectory is updated with the same set of
-// addresses as the one used in /etc/hosts.
+// 1. All cluster node names are written into /etc/hosts for DNS resolution.
+// 2. The local node's name is written into /etc/machine-id.
+// 3. The local node's name is set as the UNIX hostname of the machine (via the
+// sethostname call).
+// 4. The local node's ClusterDirectory is updated with the same set of
+// addresses as the one used in /etc/hosts.
//
// The hostsfile Service can start up in two modes: with cluster connectivity
// and without cluster connectivity. Without cluster connectivity, only
@@ -137,7 +137,7 @@
cmw := s.Roleserver.ClusterMembership.Watch()
defer cmw.Close()
supervisor.Logger(ctx).Infof("Waiting for node ID...")
- nodeID, err := cmw.GetNodeID(ctx)
+ nodeID, err := roleserve.GetNodeID(ctx, cmw)
if err != nil {
return err
}
@@ -261,7 +261,7 @@
defer cmw.Close()
supervisor.Logger(ctx).Infof("Waiting for cluster membership...")
- cm, err := cmw.GetHome(ctx)
+ cm, err := cmw.Get(ctx, roleserve.FilterHome())
if err != nil {
return err
}
diff --git a/metropolis/node/core/network/main.go b/metropolis/node/core/network/main.go
index 45de6b2..8740d5a 100644
--- a/metropolis/node/core/network/main.go
+++ b/metropolis/node/core/network/main.go
@@ -55,7 +55,7 @@
natTable *nftables.Table
natPostroutingChain *nftables.Chain
- status memory.Value
+ status memory.Value[*Status]
}
func New() *Service {
@@ -76,35 +76,12 @@
DNSServers dhcp4c.DNSServers
}
-// Watcher allows network Service consumers to watch for updates of the current
-// Status.
-type Watcher struct {
- watcher event.Watcher
-}
-
-// Get returns the newest network Status from a Watcher. It will block until a
-// new Status is available.
-func (w *Watcher) Get(ctx context.Context) (*Status, error) {
- val, err := w.watcher.Get(ctx)
- if err != nil {
- return nil, err
- }
- status := val.(Status)
- return &status, err
-}
-
-func (w *Watcher) Close() error {
- return w.watcher.Close()
-}
-
// Watch returns a Watcher, which can be used by consumers of the network
// Service to retrieve the current network status.
// Close must be called on the Watcher when it is not used anymore in order to
// prevent goroutine leaks.
-func (s *Service) Watch() Watcher {
- return Watcher{
- watcher: s.status.Watch(),
- }
+func (s *Service) Watch() event.Watcher[*Status] {
+ return s.status.Watch()
}
// ConfigureDNS sets a DNS ExtraDirective on the built-in DNS server of the
@@ -134,7 +111,7 @@
s.ConfigureDNS(dns.NewUpstreamDirective(newServers))
}
// Notify status waiters.
- s.status.Set(Status{
+ s.status.Set(&Status{
ExternalAddress: new.AssignedIP,
DNSServers: new.DNSServers(),
})
diff --git a/metropolis/node/core/roleserve/roleserve.go b/metropolis/node/core/roleserve/roleserve.go
index 5a0e2f6..0d0997d 100644
--- a/metropolis/node/core/roleserve/roleserve.go
+++ b/metropolis/node/core/roleserve/roleserve.go
@@ -6,19 +6,18 @@
// cluster's curator, updates the status of the node within the curator, and
// spawns on-demand services.
//
-//
-// .-----------. .--------. Watches .------------.
-// | Cluster |--------->| Role |<----------| Node Roles |
-// | Enrolment | Provides | Server | Updates '------------'
-// '-----------' Data | |----. .-------------.
-// '--------' '----->| Node Status |
-// Spawns | | Spawns '-------------'
-// .-----' '-----.
-// V V
-// .-----------. .------------.
-// | Consensus | | Kubernetes |
-// | & Curator | | |
-// '-----------' '------------'
+// .-----------. .--------. Watches .------------.
+// | Cluster |--------->| Role |<----------| Node Roles |
+// | Enrolment | Provides | Server | Updates '------------'
+// '-----------' Data | |----. .-------------.
+// '--------' '----->| Node Status |
+// Spawns | | Spawns '-------------'
+// .-----' '-----.
+// V V
+// .-----------. .------------.
+// | Consensus | | Kubernetes |
+// | & Curator | | |
+// '-----------' '------------'
//
// The internal state of the Role Server (eg. status of services, input from
// Cluster Enrolment, current node roles as retrieved from the cluster) is
@@ -38,7 +37,6 @@
// or remote). It is updated both by external processes (ie. data from the
// Cluster Enrolment) as well as logic responsible for spawning the control
// plane.
-//
package roleserve
import (
@@ -50,6 +48,7 @@
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/node/core/network"
"source.monogon.dev/metropolis/node/core/rpc/resolver"
+ "source.monogon.dev/metropolis/pkg/event/memory"
"source.monogon.dev/metropolis/pkg/supervisor"
cpb "source.monogon.dev/metropolis/proto/common"
)
@@ -76,10 +75,10 @@
type Service struct {
Config
- ClusterMembership ClusterMembershipValue
- KubernetesStatus KubernetesStatusValue
- bootstrapData bootstrapDataValue
- localRoles localRolesValue
+ ClusterMembership memory.Value[*ClusterMembership]
+ KubernetesStatus memory.Value[*KubernetesStatus]
+ bootstrapData memory.Value[*bootstrapData]
+ localRoles memory.Value[*cpb.NodeRoles]
controlPlane *workerControlPlane
statusPush *workerStatusPush
@@ -141,11 +140,11 @@
// available on the loopback interface.
s.Resolver.AddOverride(nid, resolver.NodeByHostPort("127.0.0.1", uint16(common.CuratorServicePort)))
- s.ClusterMembership.set(&ClusterMembership{
+ s.ClusterMembership.Set(&ClusterMembership{
pubkey: pubkey,
resolver: s.Resolver,
})
- s.bootstrapData.set(&bootstrapData{
+ s.bootstrapData.Set(&bootstrapData{
nodePrivateKey: privkey,
initialOwnerKey: iok,
clusterUnlockKey: cuk,
@@ -159,7 +158,7 @@
// available on the loopback interface.
s.Resolver.AddOverride(credentials.ID(), resolver.NodeByHostPort("127.0.0.1", uint16(common.CuratorServicePort)))
- s.ClusterMembership.set(&ClusterMembership{
+ s.ClusterMembership.Set(&ClusterMembership{
remoteCurators: directory,
credentials: &credentials,
pubkey: credentials.PublicKey(),
@@ -172,7 +171,7 @@
// available on the loopback interface.
s.Resolver.AddOverride(credentials.ID(), resolver.NodeByHostPort("127.0.0.1", uint16(common.CuratorServicePort)))
- s.ClusterMembership.set(&ClusterMembership{
+ s.ClusterMembership.Set(&ClusterMembership{
remoteCurators: directory,
credentials: &credentials,
pubkey: credentials.PublicKey(),
diff --git a/metropolis/node/core/roleserve/value_bootstrapdata.go b/metropolis/node/core/roleserve/value_bootstrapdata.go
index 85618bc..29a6ae2 100644
--- a/metropolis/node/core/roleserve/value_bootstrapdata.go
+++ b/metropolis/node/core/roleserve/value_bootstrapdata.go
@@ -1,11 +1,7 @@
package roleserve
import (
- "context"
"crypto/ed25519"
-
- "source.monogon.dev/metropolis/pkg/event"
- "source.monogon.dev/metropolis/pkg/event/memory"
)
// bootstrapData is an internal EventValue structure which is populated by the
@@ -19,29 +15,3 @@
initialOwnerKey []byte
nodePrivateJoinKey ed25519.PrivateKey
}
-
-type bootstrapDataValue struct {
- value memory.Value
-}
-
-func (c *bootstrapDataValue) Watch() *bootstrapDataWatcher {
- return &bootstrapDataWatcher{
- Watcher: c.value.Watch(),
- }
-}
-
-func (c *bootstrapDataValue) set(v *bootstrapData) {
- c.value.Set(v)
-}
-
-type bootstrapDataWatcher struct {
- event.Watcher
-}
-
-func (c *bootstrapDataWatcher) Get(ctx context.Context) (*bootstrapData, error) {
- v, err := c.Watcher.Get(ctx)
- if err != nil {
- return nil, err
- }
- return v.(*bootstrapData), nil
-}
diff --git a/metropolis/node/core/roleserve/value_clustermembership.go b/metropolis/node/core/roleserve/value_clustermembership.go
index f14e2c4..e956d10 100644
--- a/metropolis/node/core/roleserve/value_clustermembership.go
+++ b/metropolis/node/core/roleserve/value_clustermembership.go
@@ -13,7 +13,6 @@
"source.monogon.dev/metropolis/node/core/rpc"
"source.monogon.dev/metropolis/node/core/rpc/resolver"
"source.monogon.dev/metropolis/pkg/event"
- "source.monogon.dev/metropolis/pkg/event/memory"
cpb "source.monogon.dev/metropolis/proto/common"
)
@@ -53,44 +52,14 @@
resolver *resolver.Resolver
}
-type ClusterMembershipValue struct {
- value memory.Value
-}
-
-func (c *ClusterMembershipValue) Watch() *ClusterMembershipWatcher {
- return &ClusterMembershipWatcher{
- w: c.value.Watch(),
- }
-}
-
-func (c *ClusterMembershipValue) set(v *ClusterMembership) {
- c.value.Set(v)
-}
-
-type ClusterMembershipWatcher struct {
- w event.Watcher
-}
-
-func (c *ClusterMembershipWatcher) Close() error {
- return c.w.Close()
-}
-
-func (c *ClusterMembershipWatcher) getAny(ctx context.Context) (*ClusterMembership, error) {
- v, err := c.w.Get(ctx)
- if err != nil {
- return nil, err
- }
- return v.(*ClusterMembership), nil
-}
-
// GetNodeID returns the Node ID of the locally running node whenever available.
// NodeIDs are available early on in the node startup process and are guaranteed
// to never change at runtime. The watcher will then block all further Get calls
// until new information is available. This method should only be used if
// GetNodeID is the only method ran on the watcher.
-func (c *ClusterMembershipWatcher) GetNodeID(ctx context.Context) (string, error) {
+func GetNodeID(ctx context.Context, watcher event.Watcher[*ClusterMembership]) (string, error) {
for {
- cm, err := c.getAny(ctx)
+ cm, err := watcher.Get(ctx)
if err != nil {
return "", err
}
@@ -105,20 +74,16 @@
// the cluster's Curator). See proto.common.ClusterState for more information
// about cluster states. The watcher will then block all futher Get calls until
// new information is available.
-func (c *ClusterMembershipWatcher) GetHome(ctx context.Context) (*ClusterMembership, error) {
- for {
- cm, err := c.getAny(ctx)
- if err != nil {
- return nil, err
- }
+func FilterHome() event.GetOption[*ClusterMembership] {
+ return event.Filter(func(cm *ClusterMembership) bool {
if cm.credentials == nil {
- continue
+ return false
}
if cm.remoteCurators == nil {
- continue
+ return false
}
- return cm, nil
- }
+ return true
+ })
}
// DialCurator returns an authenticated gRPC client connection to the Curator
diff --git a/metropolis/node/core/roleserve/value_kubernetes.go b/metropolis/node/core/roleserve/value_kubernetes.go
index 4334dc3..88bdfeb 100644
--- a/metropolis/node/core/roleserve/value_kubernetes.go
+++ b/metropolis/node/core/roleserve/value_kubernetes.go
@@ -1,11 +1,7 @@
package roleserve
import (
- "context"
-
"source.monogon.dev/metropolis/node/kubernetes"
- "source.monogon.dev/metropolis/pkg/event"
- "source.monogon.dev/metropolis/pkg/event/memory"
)
// KubernetesStatus is an Event Value structure populated by a running
@@ -14,33 +10,3 @@
type KubernetesStatus struct {
Svc *kubernetes.Service
}
-
-type KubernetesStatusValue struct {
- value memory.Value
-}
-
-func (k *KubernetesStatusValue) Watch() *KubernetesStatusWatcher {
- return &KubernetesStatusWatcher{
- Watcher: k.value.Watch(),
- }
-}
-
-func (k *KubernetesStatusValue) set(v *KubernetesStatus) {
- k.value.Set(v)
-}
-
-type KubernetesStatusWatcher struct {
- event.Watcher
-}
-
-// Get waits until the Kubernetes services is available. The returned
-// KubernetesStatus is guaranteed to contain a kubernetes.Service that was
-// running at the time of this call returning (but which might have since been
-// stopped).
-func (k *KubernetesStatusWatcher) Get(ctx context.Context) (*KubernetesStatus, error) {
- v, err := k.Watcher.Get(ctx)
- if err != nil {
- return nil, err
- }
- return v.(*KubernetesStatus), nil
-}
diff --git a/metropolis/node/core/roleserve/value_node.go b/metropolis/node/core/roleserve/value_node.go
index f095f2f..26050ee 100644
--- a/metropolis/node/core/roleserve/value_node.go
+++ b/metropolis/node/core/roleserve/value_node.go
@@ -1,36 +1 @@
package roleserve
-
-import (
- "context"
-
- "source.monogon.dev/metropolis/pkg/event"
- "source.monogon.dev/metropolis/pkg/event/memory"
- cpb "source.monogon.dev/metropolis/proto/common"
-)
-
-type localRolesValue struct {
- value memory.Value
-}
-
-func (c *localRolesValue) Watch() *localRolesWatcher {
- return &localRolesWatcher{
- Watcher: c.value.Watch(),
- }
-}
-
-func (c *localRolesValue) set(v *cpb.NodeRoles) {
- c.value.Set(v)
-}
-
-type localRolesWatcher struct {
- event.Watcher
-}
-
-// Get retrieves the roles assigned to the local node by the cluster.
-func (c *localRolesWatcher) Get(ctx context.Context) (*cpb.NodeRoles, error) {
- v, err := c.Watcher.Get(ctx)
- if err != nil {
- return nil, err
- }
- return v.(*cpb.NodeRoles), nil
-}
diff --git a/metropolis/node/core/roleserve/worker_controlplane.go b/metropolis/node/core/roleserve/worker_controlplane.go
index 5c8c4b2..354e67e 100644
--- a/metropolis/node/core/roleserve/worker_controlplane.go
+++ b/metropolis/node/core/roleserve/worker_controlplane.go
@@ -44,11 +44,11 @@
storageRoot *localstorage.Root
// bootstrapData will be read.
- bootstrapData *bootstrapDataValue
+ bootstrapData *memory.Value[*bootstrapData]
// clusterMembership will be read and written.
- clusterMembership *ClusterMembershipValue
+ clusterMembership *memory.Value[*ClusterMembership]
// localRoles will be read.
- localRoles *localRolesValue
+ localRoles *memory.Value[*cpb.NodeRoles]
// resolver will be read and used to populate ClusterMembership.
resolver *resolver.Resolver
}
@@ -98,7 +98,7 @@
// |
// NodeRoles -M-> rolesC --------------'
//
- var startupV memory.Value
+ var startupV memory.Value[*controlPlaneStartup]
// Channels are used as intermediaries between map stages and the final reduce,
// which is okay as long as the entire tree restarts simultaneously (which we
@@ -126,7 +126,7 @@
w := s.clusterMembership.Watch()
defer w.Close()
for {
- v, err := w.GetHome(ctx)
+ v, err := w.Get(ctx, FilterHome())
if err != nil {
return err
}
@@ -264,11 +264,10 @@
// Read config from startupV.
w := startupV.Watch()
defer w.Close()
- startupI, err := w.Get(ctx)
+ startup, err := w.Get(ctx)
if err != nil {
return err
}
- startup := startupI.(*controlPlaneStartup)
// Start Control Plane if we have a config.
if startup.consensusConfig == nil {
@@ -424,7 +423,7 @@
// We now have a locally running ControlPlane. Reflect that in a new
// ClusterMembership.
- s.clusterMembership.set(&ClusterMembership{
+ s.clusterMembership.Set(&ClusterMembership{
localConsensus: con,
localCurator: cur,
credentials: creds,
@@ -440,11 +439,10 @@
// Not restarting on every single change prevents us from going in a
// ClusterMembership -> ClusterDirectory -> ClusterMembership thrashing loop.
for {
- ncI, err := w.Get(ctx)
+ nc, err := w.Get(ctx)
if err != nil {
return err
}
- nc := ncI.(*controlPlaneStartup)
if nc.changed(startup) {
supervisor.Logger(ctx).Infof("Configuration changed, restarting...")
return fmt.Errorf("config changed, restarting")
diff --git a/metropolis/node/core/roleserve/worker_heartbeat.go b/metropolis/node/core/roleserve/worker_heartbeat.go
index db06845..fdaa9be 100644
--- a/metropolis/node/core/roleserve/worker_heartbeat.go
+++ b/metropolis/node/core/roleserve/worker_heartbeat.go
@@ -9,6 +9,7 @@
"source.monogon.dev/metropolis/node/core/curator"
ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
"source.monogon.dev/metropolis/node/core/network"
+ "source.monogon.dev/metropolis/pkg/event/memory"
"source.monogon.dev/metropolis/pkg/supervisor"
)
@@ -18,7 +19,7 @@
network *network.Service
// clusterMembership will be read.
- clusterMembership *ClusterMembershipValue
+ clusterMembership *memory.Value[*ClusterMembership]
}
func (s *workerHeartbeat) run(ctx context.Context) error {
@@ -28,7 +29,7 @@
w := s.clusterMembership.Watch()
defer w.Close()
supervisor.Logger(ctx).Infof("Waiting for cluster membership...")
- cm, err := w.GetHome(ctx)
+ cm, err := w.Get(ctx, FilterHome())
if err != nil {
return err
}
diff --git a/metropolis/node/core/roleserve/worker_kubernetes.go b/metropolis/node/core/roleserve/worker_kubernetes.go
index 3be3c41..e31cf59 100644
--- a/metropolis/node/core/roleserve/worker_kubernetes.go
+++ b/metropolis/node/core/roleserve/worker_kubernetes.go
@@ -5,6 +5,7 @@
"fmt"
"net"
+ "source.monogon.dev/metropolis/node/core/consensus"
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/node/core/network"
"source.monogon.dev/metropolis/node/kubernetes"
@@ -26,9 +27,9 @@
network *network.Service
storageRoot *localstorage.Root
- localRoles *localRolesValue
- clusterMembership *ClusterMembershipValue
- kubernetesStatus *KubernetesStatusValue
+ localRoles *memory.Value[*cpb.NodeRoles]
+ clusterMembership *memory.Value[*ClusterMembership]
+ kubernetesStatus *memory.Value[*KubernetesStatus]
}
// kubernetesStartup is used internally to provide a reduced (as in MapReduce
@@ -64,7 +65,7 @@
// |
// NodeRoles -M-> rolesC --------------'
//
- var startupV memory.Value
+ var startupV memory.Value[*kubernetesStartup]
clusterMembershipC := make(chan *ClusterMembership)
rolesC := make(chan *cpb.NodeRoles)
@@ -76,7 +77,7 @@
w := s.clusterMembership.Watch()
defer w.Close()
for {
- v, err := w.GetHome(ctx)
+ v, err := w.Get(ctx, FilterHome())
if err != nil {
return err
}
@@ -127,11 +128,11 @@
// KubernetesController local role.
var d *kubernetesStartup
for {
- dV, err := w.Get(ctx)
+ var err error
+ d, err = w.Get(ctx)
if err != nil {
return err
}
- d = dV.(*kubernetesStartup)
supervisor.Logger(ctx).Infof("Got new startup data.")
if d.roles.KubernetesController == nil {
supervisor.Logger(ctx).Infof("No Kubernetes role, not starting.")
@@ -144,10 +145,10 @@
break
}
supervisor.Logger(ctx).Infof("Waiting for local consensus...")
cstW := d.membership.localConsensus.Watch()
defer cstW.Close()
- cst, err := cstW.GetRunning(ctx)
+ cst, err := cstW.Get(ctx, consensus.FilterRunning)
if err != nil {
return fmt.Errorf("waiting for local consensus: %w", err)
}
@@ -197,7 +198,7 @@
}
// Let downstream know that Kubernetes is running.
- s.kubernetesStatus.set(&KubernetesStatus{
+ s.kubernetesStatus.Set(&KubernetesStatus{
Svc: kubeSvc,
})
@@ -206,11 +207,10 @@
// Restart everything if we get a significantly different config (ie., a config
// whose change would/should either turn up or tear down Kubernetes).
for {
- ncI, err := w.Get(ctx)
+ nc, err := w.Get(ctx)
if err != nil {
return err
}
- nc := ncI.(*kubernetesStartup)
if nc.changed(d) {
supervisor.Logger(ctx).Errorf("watcher got new config, restarting")
return fmt.Errorf("restarting")
diff --git a/metropolis/node/core/roleserve/worker_rolefetch.go b/metropolis/node/core/roleserve/worker_rolefetch.go
index 3856f62..5f64676 100644
--- a/metropolis/node/core/roleserve/worker_rolefetch.go
+++ b/metropolis/node/core/roleserve/worker_rolefetch.go
@@ -5,24 +5,26 @@
"fmt"
ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+ "source.monogon.dev/metropolis/pkg/event/memory"
"source.monogon.dev/metropolis/pkg/supervisor"
+ cpb "source.monogon.dev/metropolis/proto/common"
)
// workerRoleFetch is the Role Fetcher, an internal bookkeeping service
// responsible for populating localRoles based on a clusterMembership whenever
// the node is HOME and cluster credentials / curator access is available.
type workerRoleFetch struct {
- clusterMembership *ClusterMembershipValue
+ clusterMembership *memory.Value[*ClusterMembership]
// localRoles will be written.
- localRoles *localRolesValue
+ localRoles *memory.Value[*cpb.NodeRoles]
}
func (s *workerRoleFetch) run(ctx context.Context) error {
w := s.clusterMembership.Watch()
defer w.Close()
supervisor.Logger(ctx).Infof("Waiting for cluster membership...")
- cm, err := w.GetHome(ctx)
+ cm, err := w.Get(ctx, FilterHome())
if err != nil {
return err
}
@@ -70,7 +72,7 @@
if n.Roles.KubernetesWorker != nil {
supervisor.Logger(ctx).Infof(" - kubernetes worker")
}
- s.localRoles.set(n.Roles)
+ s.localRoles.Set(n.Roles)
break
}
}
diff --git a/metropolis/node/core/roleserve/worker_statuspush.go b/metropolis/node/core/roleserve/worker_statuspush.go
index 732508a..ed806a0 100644
--- a/metropolis/node/core/roleserve/worker_statuspush.go
+++ b/metropolis/node/core/roleserve/worker_statuspush.go
@@ -10,6 +10,7 @@
common "source.monogon.dev/metropolis/node"
ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
"source.monogon.dev/metropolis/node/core/network"
+ "source.monogon.dev/metropolis/pkg/event/memory"
"source.monogon.dev/metropolis/pkg/supervisor"
cpb "source.monogon.dev/metropolis/proto/common"
)
@@ -20,7 +21,7 @@
network *network.Service
// clusterMembership will be read.
- clusterMembership *ClusterMembershipValue
+ clusterMembership *memory.Value[*ClusterMembership]
}
// workerStatusPushChannels contain all the channels between the status pusher's
@@ -142,7 +143,7 @@
defer w.Close()
supervisor.Logger(ctx).Infof("Waiting for cluster membership...")
for {
- cm, err := w.GetHome(ctx)
+ cm, err := w.Get(ctx, FilterHome())
if err != nil {
return fmt.Errorf("getting cluster membership status failed: %w", err)
}