metropolis: move curator client watches to curator/watcher

This replaces the ad-hoc code that watched Curator nodes with calls
through the new curator/watcher library.
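
For reference, the general shape of a consumer under the new library
mirrors the clusternet change below. This is a minimal sketch: the
SimpleFollower field names and WatchNodes call are taken from the
change as-is, while the curatorClient variable and the comments on
callback semantics are illustrative assumptions.

    // Sketch: follow node updates through curator/watcher instead of
    // a hand-rolled Watch/Recv loop.
    return watcher.WatchNodes(ctx, curatorClient, watcher.SimpleFollower{
        FilterFn: func(n *apb.Node) bool {
            // Only follow nodes carrying the data this consumer needs.
            return n.Clusternet != nil && n.Clusternet.WireguardPubkey != ""
        },
        EqualsFn: func(a, b *apb.Node) bool {
            // Treat nodes as unchanged when the fields we care about
            // (here just the pubkey) are equal, skipping callbacks.
            return a.Clusternet.WireguardPubkey == b.Clusternet.WireguardPubkey
        },
        OnNewUpdated: func(n *apb.Node) error {
            // React to a new or changed node, e.g. collect it into a
            // batch as clusternet does.
            return nil
        },
        OnBatchDone: func() error {
            // Flush whatever OnNewUpdated collected.
            return nil
        },
        OnDeleted: func(prev *apb.Node) error {
            // Tear down state for a node that went away.
            return nil
        },
    })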

Change-Id: Ie2a82b330e4108b9b725515cb10595916c38b323
Reviewed-on: https://review.monogon.dev/c/monogon/+/2263
Tested-by: Jenkins CI
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/node/core/clusternet/clusternet.go b/metropolis/node/core/clusternet/clusternet.go
index 2e01c77..85ac63f 100644
--- a/metropolis/node/core/clusternet/clusternet.go
+++ b/metropolis/node/core/clusternet/clusternet.go
@@ -26,9 +26,11 @@
 	"fmt"
 	"net"
 	"net/netip"
+	"slices"
 
 	"github.com/cenkalti/backoff/v4"
 
+	"source.monogon.dev/metropolis/node/core/curator/watcher"
 	"source.monogon.dev/metropolis/node/core/localstorage"
 	"source.monogon.dev/metropolis/node/core/network"
 	"source.monogon.dev/metropolis/pkg/event"
@@ -176,58 +178,45 @@
 func (s *Service) pull(ctx context.Context) error {
 	supervisor.Signal(ctx, supervisor.SignalHealthy)
 
-	srv, err := s.Curator.Watch(ctx, &apb.WatchRequest{
-		Kind: &apb.WatchRequest_NodesInCluster_{
-			NodesInCluster: &apb.WatchRequest_NodesInCluster{},
+	var batch []*apb.Node
+	return watcher.WatchNodes(ctx, s.Curator, watcher.SimpleFollower{
+		FilterFn: func(a *apb.Node) bool {
+			if a.Clusternet == nil {
+				return false
+			}
+			if a.Clusternet.WireguardPubkey == "" {
+				return false
+			}
+			return true
+		},
+		EqualsFn: func(a *apb.Node, b *apb.Node) bool {
+			if a.Status.ExternalAddress != b.Status.ExternalAddress {
+				return false
+			}
+			if a.Clusternet.WireguardPubkey != b.Clusternet.WireguardPubkey {
+				return false
+			}
+			if !slices.Equal(a.Clusternet.Prefixes, b.Clusternet.Prefixes) {
+				return false
+			}
+			return true
+		},
+		OnNewUpdated: func(new *apb.Node) error {
+			batch = append(batch, new)
+			return nil
+		},
+		OnBatchDone: func() error {
+			if err := s.wg.configurePeers(batch); err != nil {
+				supervisor.Logger(ctx).Errorf("nodes couldn't be configured: %v", err)
+			}
+			batch = nil
+			return nil
+		},
+		OnDeleted: func(prev *apb.Node) error {
+			if err := s.wg.unconfigurePeer(prev); err != nil {
+				supervisor.Logger(ctx).Errorf("Node %s couldn't be unconfigured: %v", prev.Id, err)
+			}
+			return nil
 		},
 	})
-	if err != nil {
-		return fmt.Errorf("curator watch failed: %w", err)
-	}
-	defer srv.CloseSend()
-
-	nodes := newNodemap()
-	for {
-		ev, err := srv.Recv()
-		if err != nil {
-			return fmt.Errorf("curator watch recv failed: %w", err)
-		}
-
-		updated, removed := nodes.update(ctx, ev)
-
-		for _, n := range removed {
-			supervisor.Logger(ctx).Infof("Node %s removed, unconfiguring", n.id)
-			if err := s.wg.unconfigurePeer(n.copy()); err != nil {
-				// Do nothing and hope whatever caused this will go away at some point.
-				supervisor.Logger(ctx).Errorf("Node %s couldn't be unconfigured: %v", n.id, err)
-			}
-		}
-		var newNodes []*node
-		for _, n := range updated {
-			newNodes = append(newNodes, n.copy())
-			supervisor.Logger(ctx).Infof("Node %s updated: pk %s, address %s, prefixes %v", n.id, n.pubkey, n.address, n.prefixes)
-		}
-		succeeded := 0
-		if err := s.wg.configurePeers(newNodes); err != nil {
-			// If configuring all nodes at once failed, go node-by-node to make sure we've
-			// done as much as possible.
-			supervisor.Logger(ctx).Warningf("Bulk node update call failed, trying node-by-node..: %v", err)
-			for _, n := range newNodes {
-				if err := s.wg.configurePeers([]*node{n}); err != nil {
-					supervisor.Logger(ctx).Errorf("Node %s failed: %v", n.id, err)
-				} else {
-					succeeded += 1
-				}
-			}
-		} else {
-			succeeded = len(newNodes)
-		}
-
-		if len(newNodes) != 0 {
-			supervisor.Logger(ctx).Infof("Successfully updated %d out of %d nodes", succeeded, len(newNodes))
-
-			numNodes, numPrefixes := nodes.stats()
-			supervisor.Logger(ctx).Infof("Total: %d nodes, %d prefixes.", numNodes, numPrefixes)
-		}
-	}
 }