metropolis: move curator client watches to curator/watcher

This replaces all the ad-hoc code that watches Curator node(s) with calls
through the new curator/watcher library.
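
A minimal sketch of the new consumer pattern, for reference (the
follow() helper below is hypothetical; the real call site is the
discovery.go change in this diff, and the imports mirror it):

    import (
        "context"

        "source.monogon.dev/metropolis/node/core/curator/watcher"

        ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
    )

    func follow(ctx context.Context, cur ipb.CuratorClient) error {
        return watcher.WatchNodes(ctx, cur, watcher.SimpleFollower{
            // Only act on nodes that carry the data we need.
            FilterFn: func(n *ipb.Node) bool {
                return n.Status != nil && n.Status.ExternalAddress != ""
            },
            // Skip updates that don't change anything we care about.
            EqualsFn: func(a, b *ipb.Node) bool {
                return a.Status.ExternalAddress == b.Status.ExternalAddress
            },
            // React to new/changed and deleted nodes.
            OnNewUpdated: func(n *ipb.Node) error { return nil },
            OnDeleted:    func(prev *ipb.Node) error { return nil },
        })
    }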

Change-Id: Ie2a82b330e4108b9b725515cb10595916c38b323
Reviewed-on: https://review.monogon.dev/c/monogon/+/2263
Tested-by: Jenkins CI
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/node/core/metrics/discovery.go b/metropolis/node/core/metrics/discovery.go
index 037d3b0..d5a992b 100644
--- a/metropolis/node/core/metrics/discovery.go
+++ b/metropolis/node/core/metrics/discovery.go
@@ -7,23 +7,22 @@
 	"net/http"
 	"sync"
 
-	apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
-	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
-
+	"source.monogon.dev/go/types/mapsets"
+	"source.monogon.dev/metropolis/node/core/curator/watcher"
 	"source.monogon.dev/metropolis/pkg/supervisor"
+
+	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
 )
 
 type Discovery struct {
 	Curator ipb.CuratorClient
 
 	// sdResp will contain the cached sdResponse
-	sdResp sdResponse
+	sdResp mapsets.OrderedMap[string, sdTarget]
 	// sdRespMtx is the mutex for sdResp to allow usage inside the http handler.
 	sdRespMtx sync.RWMutex
 }
 
-type sdResponse []sdTarget
-
 type sdTarget struct {
 	Targets []string          `json:"targets"`
 	Labels  map[string]string `json:"labels"`
@@ -33,73 +32,63 @@
 func (s *Discovery) Run(ctx context.Context) error {
 	supervisor.Signal(ctx, supervisor.SignalHealthy)
 
-	srv, err := s.Curator.Watch(ctx, &apb.WatchRequest{
-		Kind: &apb.WatchRequest_NodesInCluster_{
-			NodesInCluster: &apb.WatchRequest_NodesInCluster{},
-		},
-	})
-	if err != nil {
-		return fmt.Errorf("curator watch failed: %w", err)
-	}
-	defer srv.CloseSend()
-
 	defer func() {
 		s.sdRespMtx.Lock()
 		// disable the metrics endpoint until the new routine takes over
-		s.sdResp = nil
+		s.sdResp.Clear()
 		s.sdRespMtx.Unlock()
 	}()
 
-	nodes := make(map[string]*apb.Node)
-	for {
-		ev, err := srv.Recv()
-		if err != nil {
-			// The watcher wont return a properly wrapped error which confuses
-			// our testing harness. Lets just return the context error directly
-			// if it exists.
-			if ctx.Err() != nil {
-				return ctx.Err()
+	return watcher.WatchNodes(ctx, s.Curator, watcher.SimpleFollower{
+		FilterFn: func(a *ipb.Node) bool {
+			if a.Status == nil {
+				return false
 			}
-
-			return fmt.Errorf("curator watch recv failed: %w", err)
-		}
-
-		for _, n := range ev.Nodes {
-			nodes[n.Id] = n
-		}
-
-		for _, t := range ev.NodeTombstones {
-			n, ok := nodes[t.NodeId]
-			if !ok {
-				// This is an indication of us losing data somehow. If this happens, it likely
-				// means a Curator bug.
-				supervisor.Logger(ctx).Warningf("Node %s: tombstone for unknown node", t.NodeId)
-				continue
+			if a.Status.ExternalAddress == "" {
+				return false
 			}
-			delete(nodes, n.Id)
-		}
-
-		s.sdRespMtx.Lock()
-
-		s.sdResp = nil
-		for _, n := range nodes {
-			// Only care about nodes that have all required configuration set.
-			if n.Status == nil || n.Status.ExternalAddress == "" || n.Roles == nil {
-				continue
+			if a.Roles == nil {
+				return false
 			}
+			return true
+		},
+		EqualsFn: func(a *ipb.Node, b *ipb.Node) bool {
+			if (a.Roles.ConsensusMember == nil) != (b.Roles.ConsensusMember == nil) {
+				return false
+			}
+			if (a.Roles.KubernetesController == nil) != (b.Roles.KubernetesController == nil) {
+				return false
+			}
+			if (a.Roles.KubernetesWorker == nil) != (b.Roles.KubernetesWorker == nil) {
+				return false
+			}
+			if a.Status.ExternalAddress != b.Status.ExternalAddress {
+				return false
+			}
+			return true
+		},
+		OnNewUpdated: func(new *ipb.Node) error {
+			s.sdRespMtx.Lock()
+			defer s.sdRespMtx.Unlock()
 
-			s.sdResp = append(s.sdResp, sdTarget{
-				Targets: []string{n.Status.ExternalAddress},
+			s.sdResp.Insert(new.Id, sdTarget{
+				Targets: []string{new.Status.ExternalAddress},
 				Labels: map[string]string{
-					"__meta_metropolis_role_kubernetes_worker":     fmt.Sprintf("%t", n.Roles.KubernetesWorker != nil),
-					"__meta_metropolis_role_kubernetes_controller": fmt.Sprintf("%t", n.Roles.KubernetesController != nil),
-					"__meta_metropolis_role_consensus_member":      fmt.Sprintf("%t", n.Roles.ConsensusMember != nil),
+					"__meta_metropolis_role_kubernetes_worker":     fmt.Sprintf("%t", new.Roles.KubernetesWorker != nil),
+					"__meta_metropolis_role_kubernetes_controller": fmt.Sprintf("%t", new.Roles.KubernetesController != nil),
+					"__meta_metropolis_role_consensus_member":      fmt.Sprintf("%t", new.Roles.ConsensusMember != nil),
 				},
 			})
-		}
+			return nil
+		},
+		OnDeleted: func(prev *ipb.Node) error {
+			s.sdRespMtx.Lock()
+			defer s.sdRespMtx.Unlock()
 
-		s.sdRespMtx.Unlock()
-	}
+			s.sdResp.Delete(prev.Id)
+			return nil
+		},
+	})
 }
 
 func (s *Discovery) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@@ -111,9 +100,10 @@
 	s.sdRespMtx.RLock()
 	defer s.sdRespMtx.RUnlock()
 
-	// If sdResp is nil, which only happens if we are not a master node
-	// or we are still booting, we respond with NotImplemented.
-	if s.sdResp == nil {
+	// If sdResp is empty, respond with Service Unavailable. This only happens
+	// early in the lifecycle of a control plane node, before it has learned
+	// about itself, or if this is not a control plane node.
+	if s.sdResp.Count() == 0 {
 		w.WriteHeader(http.StatusServiceUnavailable)
 		return
 	}
@@ -121,7 +111,12 @@
 	w.Header().Set("Content-Type", "application/json")
 	w.WriteHeader(http.StatusOK)
 
-	if err := json.NewEncoder(w).Encode(s.sdResp); err != nil {
+	// Turn into a plain array as expected by the service discovery API.
+	var res []sdTarget
+	for _, v := range s.sdResp.Values() {
+		res = append(res, v.Value)
+	}
+	if err := json.NewEncoder(w).Encode(res); err != nil {
 		// If the encoder fails its mostly because of closed connections
 		// so lets just ignore these errors.
 		return