metropolis: move curator client watches to curator/watcher

This replaces all the ad-hoc code to watch Curator node(s) with calls
through the new curator/watcher library.

Change-Id: Ie2a82b330e4108b9b725515cb10595916c38b323
Reviewed-on: https://review.monogon.dev/c/monogon/+/2263
Tested-by: Jenkins CI
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/node/core/roleserve/BUILD.bazel b/metropolis/node/core/roleserve/BUILD.bazel
index ce0b5cc..dd224d5 100644
--- a/metropolis/node/core/roleserve/BUILD.bazel
+++ b/metropolis/node/core/roleserve/BUILD.bazel
@@ -23,6 +23,7 @@
         "//metropolis/node/core/consensus",
         "//metropolis/node/core/curator",
         "//metropolis/node/core/curator/proto/api",
+        "//metropolis/node/core/curator/watcher",
         "//metropolis/node/core/identity",
         "//metropolis/node/core/localstorage",
         "//metropolis/node/core/metrics",
diff --git a/metropolis/node/core/roleserve/roleserve.go b/metropolis/node/core/roleserve/roleserve.go
index 3140ea8..718c394 100644
--- a/metropolis/node/core/roleserve/roleserve.go
+++ b/metropolis/node/core/roleserve/roleserve.go
@@ -50,7 +50,6 @@
 	"source.monogon.dev/metropolis/pkg/event/memory"
 	"source.monogon.dev/metropolis/pkg/logtree"
 	"source.monogon.dev/metropolis/pkg/supervisor"
-
 	cpb "source.monogon.dev/metropolis/proto/common"
 )
 
diff --git a/metropolis/node/core/roleserve/worker_rolefetch.go b/metropolis/node/core/roleserve/worker_rolefetch.go
index 40d2ffd..aaac076 100644
--- a/metropolis/node/core/roleserve/worker_rolefetch.go
+++ b/metropolis/node/core/roleserve/worker_rolefetch.go
@@ -2,10 +2,10 @@
 
 import (
 	"context"
-	"fmt"
 
 	"google.golang.org/protobuf/proto"
 
+	"source.monogon.dev/metropolis/node/core/curator/watcher"
 	"source.monogon.dev/metropolis/node/core/localstorage"
 	"source.monogon.dev/metropolis/pkg/event/memory"
 	"source.monogon.dev/metropolis/pkg/supervisor"
@@ -70,65 +70,48 @@
 	// Run networked part in a sub-runnable so that network errors don't cause us to
 	// retry the above and make us possibly trigger spurious restarts.
 	supervisor.Run(ctx, "watcher", func(ctx context.Context) error {
-		w := s.curatorConnection.Watch()
-		defer w.Close()
-		cc, err := w.Get(ctx)
+		cw := s.curatorConnection.Watch()
+		defer cw.Close()
+		cc, err := cw.Get(ctx)
 		if err != nil {
 			return err
 		}
 
 		nodeID := cc.nodeID()
 		cur := ipb.NewCuratorClient(cc.conn)
-
-		// Start watch for current node, update localRoles whenever we get something
-		// new.
-		srv, err := cur.Watch(ctx, &ipb.WatchRequest{Kind: &ipb.WatchRequest_NodeInCluster_{
-			NodeInCluster: &ipb.WatchRequest_NodeInCluster{
-				NodeId: nodeID,
-			},
-		}})
-		if err != nil {
-			return fmt.Errorf("watch failed: %w", err)
-		}
-		defer srv.CloseSend()
+		w := watcher.WatchNode(ctx, cur, nodeID)
+		defer w.Close()
 
 		supervisor.Signal(ctx, supervisor.SignalHealthy)
-		for {
-			ev, err := srv.Recv()
-			if err != nil {
-				return fmt.Errorf("watch event receive failed: %w", err)
-			}
-			for _, n := range ev.Nodes {
-				n := n
-				// Skip spuriously sent other nodes.
-				if n.Id != nodeID {
-					continue
-				}
-				supervisor.Logger(ctx).Infof("Got new node data. Roles:")
-				if n.Roles.ConsensusMember != nil {
-					supervisor.Logger(ctx).Infof(" - control plane member, existing peers: %+v", n.Roles.ConsensusMember.Peers)
-				}
-				if n.Roles.KubernetesController != nil {
-					supervisor.Logger(ctx).Infof(" - kubernetes controller")
-				}
-				if n.Roles.KubernetesWorker != nil {
-					supervisor.Logger(ctx).Infof(" - kubernetes worker")
-				}
-				s.localRoles.Set(n.Roles)
 
-				// Persist role data to disk.
-				bytes, err := proto.Marshal(n.Roles)
+		// Run watch for current node, update localRoles whenever we get something
+		// new.
+		for w.Next() {
+			n := w.Node()
+			supervisor.Logger(ctx).Infof("Got new node data. Roles:")
+			if n.Roles.ConsensusMember != nil {
+				supervisor.Logger(ctx).Infof(" - control plane member, existing peers: %+v", n.Roles.ConsensusMember.Peers)
+			}
+			if n.Roles.KubernetesController != nil {
+				supervisor.Logger(ctx).Infof(" - kubernetes controller")
+			}
+			if n.Roles.KubernetesWorker != nil {
+				supervisor.Logger(ctx).Infof(" - kubernetes worker")
+			}
+			s.localRoles.Set(n.Roles)
+
+			// Persist role data to disk.
+			bytes, err := proto.Marshal(n.Roles)
+			if err != nil {
+				supervisor.Logger(ctx).Errorf("Failed to marshal node roles: %w", err)
+			} else {
+				err = s.storageRoot.Data.Node.PersistedRoles.Write(bytes, 0400)
 				if err != nil {
-					supervisor.Logger(ctx).Errorf("Failed to marshal node roles: %w", err)
-				} else {
-					err = s.storageRoot.Data.Node.PersistedRoles.Write(bytes, 0400)
-					if err != nil {
-						supervisor.Logger(ctx).Errorf("Failed to write node roles: %w", err)
-					}
+					supervisor.Logger(ctx).Errorf("Failed to write node roles: %w", err)
 				}
-				break
 			}
 		}
+		return w.Error()
 	})
 
 	supervisor.Signal(ctx, supervisor.SignalHealthy)