metropolis: move curator client watches to curator/watcher
This replaces all the ad-hoc code for watching nodes through the Curator
with calls into the new curator/watcher library.
Change-Id: Ie2a82b330e4108b9b725515cb10595916c38b323
Reviewed-on: https://review.monogon.dev/c/monogon/+/2263
Tested-by: Jenkins CI
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
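
For reviewers unfamiliar with the new library: the per-node watch collapses
into an iterator-style loop. A minimal sketch, using only the calls exercised
by the diff below (watcher.WatchNode, Next, Node, Error, Close); the package
and function names here are illustrative, not part of this change:

    // Sketch only: how a node watch reads when written against
    // curator/watcher, mirroring worker_rolefetch.go below.
    package rolefetchsketch

    import (
    	"context"

    	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
    	"source.monogon.dev/metropolis/node/core/curator/watcher"
    )

    func watchLocalNode(ctx context.Context, cur ipb.CuratorClient, nodeID string) error {
    	// WatchNode subscribes to updates for a single node; Close tears
    	// down the underlying stream when the function returns.
    	w := watcher.WatchNode(ctx, cur, nodeID)
    	defer w.Close()
    	for w.Next() {
    		// Node returns the latest state of the watched node; the
    		// roleserver reacts by updating and persisting its roles.
    		n := w.Node()
    		_ = n
    	}
    	// Next returns false once the watch terminates; Error carries the
    	// terminal error for the caller to handle.
    	return w.Error()
    }

Compare with the removed Recv loop in worker_rolefetch.go, which had to
filter out spuriously sent other nodes by hand.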
diff --git a/metropolis/node/core/roleserve/BUILD.bazel b/metropolis/node/core/roleserve/BUILD.bazel
index ce0b5cc..dd224d5 100644
--- a/metropolis/node/core/roleserve/BUILD.bazel
+++ b/metropolis/node/core/roleserve/BUILD.bazel
@@ -23,6 +23,7 @@
"//metropolis/node/core/consensus",
"//metropolis/node/core/curator",
"//metropolis/node/core/curator/proto/api",
+ "//metropolis/node/core/curator/watcher",
"//metropolis/node/core/identity",
"//metropolis/node/core/localstorage",
"//metropolis/node/core/metrics",
diff --git a/metropolis/node/core/roleserve/roleserve.go b/metropolis/node/core/roleserve/roleserve.go
index 3140ea8..718c394 100644
--- a/metropolis/node/core/roleserve/roleserve.go
+++ b/metropolis/node/core/roleserve/roleserve.go
@@ -50,7 +50,6 @@
"source.monogon.dev/metropolis/pkg/event/memory"
"source.monogon.dev/metropolis/pkg/logtree"
"source.monogon.dev/metropolis/pkg/supervisor"
-
cpb "source.monogon.dev/metropolis/proto/common"
)
diff --git a/metropolis/node/core/roleserve/worker_rolefetch.go b/metropolis/node/core/roleserve/worker_rolefetch.go
index 40d2ffd..aaac076 100644
--- a/metropolis/node/core/roleserve/worker_rolefetch.go
+++ b/metropolis/node/core/roleserve/worker_rolefetch.go
@@ -2,10 +2,10 @@
import (
"context"
- "fmt"
"google.golang.org/protobuf/proto"
+ "source.monogon.dev/metropolis/node/core/curator/watcher"
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/pkg/event/memory"
"source.monogon.dev/metropolis/pkg/supervisor"
@@ -70,65 +70,48 @@
// Run networked part in a sub-runnable so that network errors don't cause us to
// retry the above and make us possibly trigger spurious restarts.
supervisor.Run(ctx, "watcher", func(ctx context.Context) error {
- w := s.curatorConnection.Watch()
- defer w.Close()
- cc, err := w.Get(ctx)
+ cw := s.curatorConnection.Watch()
+ defer cw.Close()
+ cc, err := cw.Get(ctx)
if err != nil {
return err
}
nodeID := cc.nodeID()
cur := ipb.NewCuratorClient(cc.conn)
-
- // Start watch for current node, update localRoles whenever we get something
- // new.
- srv, err := cur.Watch(ctx, &ipb.WatchRequest{Kind: &ipb.WatchRequest_NodeInCluster_{
- NodeInCluster: &ipb.WatchRequest_NodeInCluster{
- NodeId: nodeID,
- },
- }})
- if err != nil {
- return fmt.Errorf("watch failed: %w", err)
- }
- defer srv.CloseSend()
+ w := watcher.WatchNode(ctx, cur, nodeID)
+ defer w.Close()
supervisor.Signal(ctx, supervisor.SignalHealthy)
- for {
- ev, err := srv.Recv()
- if err != nil {
- return fmt.Errorf("watch event receive failed: %w", err)
- }
- for _, n := range ev.Nodes {
- n := n
- // Skip spuriously sent other nodes.
- if n.Id != nodeID {
- continue
- }
- supervisor.Logger(ctx).Infof("Got new node data. Roles:")
- if n.Roles.ConsensusMember != nil {
- supervisor.Logger(ctx).Infof(" - control plane member, existing peers: %+v", n.Roles.ConsensusMember.Peers)
- }
- if n.Roles.KubernetesController != nil {
- supervisor.Logger(ctx).Infof(" - kubernetes controller")
- }
- if n.Roles.KubernetesWorker != nil {
- supervisor.Logger(ctx).Infof(" - kubernetes worker")
- }
- s.localRoles.Set(n.Roles)
- // Persist role data to disk.
- bytes, err := proto.Marshal(n.Roles)
+ // Run watch for current node, update localRoles whenever we get something
+ // new.
+ for w.Next() {
+ n := w.Node()
+ supervisor.Logger(ctx).Infof("Got new node data. Roles:")
+ if n.Roles.ConsensusMember != nil {
+ supervisor.Logger(ctx).Infof(" - control plane member, existing peers: %+v", n.Roles.ConsensusMember.Peers)
+ }
+ if n.Roles.KubernetesController != nil {
+ supervisor.Logger(ctx).Infof(" - kubernetes controller")
+ }
+ if n.Roles.KubernetesWorker != nil {
+ supervisor.Logger(ctx).Infof(" - kubernetes worker")
+ }
+ s.localRoles.Set(n.Roles)
+
+ // Persist role data to disk.
+ bytes, err := proto.Marshal(n.Roles)
+ if err != nil {
+ supervisor.Logger(ctx).Errorf("Failed to marshal node roles: %w", err)
+ } else {
+ err = s.storageRoot.Data.Node.PersistedRoles.Write(bytes, 0400)
if err != nil {
- supervisor.Logger(ctx).Errorf("Failed to marshal node roles: %w", err)
- } else {
- err = s.storageRoot.Data.Node.PersistedRoles.Write(bytes, 0400)
- if err != nil {
- supervisor.Logger(ctx).Errorf("Failed to write node roles: %w", err)
- }
+ supervisor.Logger(ctx).Errorf("Failed to write node roles: %w", err)
}
- break
}
}
+ return w.Error()
})
supervisor.Signal(ctx, supervisor.SignalHealthy)
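
Design note: the Next/Node/Error shape moves the gRPC Recv loop, the
other-node filtering, and the error bookkeeping behind the watcher, so the
caller returns w.Error() once instead of wrapping every Recv failure. A
hypothetical, self-contained toy of the same iterator idiom (none of these
names come from the actual library):

    package main

    import (
    	"errors"
    	"fmt"
    )

    // toyWatcher illustrates the Next/Value/Error idiom: the consumer loops
    // on Next, reads the latest value, and collects the terminal error
    // exactly once at the end.
    type toyWatcher struct {
    	updates chan int
    	cur     int
    	err     error
    }

    func (w *toyWatcher) Next() bool {
    	v, ok := <-w.updates
    	if !ok {
    		w.err = errors.New("stream closed")
    		return false
    	}
    	w.cur = v
    	return true
    }

    func (w *toyWatcher) Value() int { return w.cur }

    // Error reports why Next returned false.
    func (w *toyWatcher) Error() error { return w.err }

    func main() {
    	w := &toyWatcher{updates: make(chan int, 3)}
    	w.updates <- 1
    	w.updates <- 2
    	close(w.updates)

    	for w.Next() {
    		fmt.Println("got update:", w.Value())
    	}
    	fmt.Println("watch ended:", w.Error())
    }

In the roleserver, the supervisor then treats the returned error as a
runnable failure and restarts the watch, as before.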