metropolis: move curator client watches to curator/watcher

This replaces all the ad-hoc code that watches Curator nodes with calls
to the new curator/watcher library.

Change-Id: Ie2a82b330e4108b9b725515cb10595916c38b323
Reviewed-on: https://review.monogon.dev/c/monogon/+/2263
Tested-by: Jenkins CI
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/node/kubernetes/apiproxy.go b/metropolis/node/kubernetes/apiproxy.go
index 9f9c851..d937824 100644
--- a/metropolis/node/kubernetes/apiproxy.go
+++ b/metropolis/node/kubernetes/apiproxy.go
@@ -2,12 +2,12 @@
 
 import (
 	"context"
-	"fmt"
 	"net"
 
 	"source.monogon.dev/go/net/tinylb"
 	"source.monogon.dev/metropolis/node"
 	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+	"source.monogon.dev/metropolis/node/core/curator/watcher"
 	"source.monogon.dev/metropolis/pkg/event/memory"
 )
 
@@ -15,38 +15,36 @@
 // the currently known nodes running a Kubernetes apiserver as retrieved from the
 // given curator client.
 func updateLoadbalancerAPIServers(ctx context.Context, val *memory.Value[tinylb.BackendSet], cur ipb.CuratorClient) error {
-	w, err := cur.Watch(ctx, &ipb.WatchRequest{
-		Kind: &ipb.WatchRequest_NodesInCluster_{
-			NodesInCluster: &ipb.WatchRequest_NodesInCluster{},
-		},
-	})
-	if err != nil {
-		return fmt.Errorf("watch failed: %w", err)
-	}
-	defer w.CloseSend()
-
 	set := &tinylb.BackendSet{}
 	val.Set(set.Clone())
-	for {
-		ev, err := w.Recv()
-		if err != nil {
-			return fmt.Errorf("receive failed: %w", err)
-		}
 
-		for _, n := range ev.Nodes {
-			if n.Status == nil || n.Status.ExternalAddress == "" {
-				set.Delete(n.Id)
-				continue
+	return watcher.WatchNodes(ctx, cur, watcher.SimpleFollower{
+		FilterFn: func(a *ipb.Node) bool {
+			if a.Status == nil {
+				return false
 			}
-			if n.Roles.KubernetesController == nil {
-				set.Delete(n.Id)
-				continue
+			if a.Status.ExternalAddress == "" {
+				return false
 			}
-			set.Insert(n.Id, &tinylb.SimpleTCPBackend{Remote: net.JoinHostPort(n.Status.ExternalAddress, node.KubernetesAPIPort.PortString())})
-		}
-		for _, t := range ev.NodeTombstones {
-			set.Delete(t.NodeId)
-		}
-		val.Set(set.Clone())
-	}
+			if a.Roles.KubernetesController == nil {
+				return false
+			}
+			return true
+		},
+		EqualsFn: func(a *ipb.Node, b *ipb.Node) bool {
+			return a.Status.ExternalAddress == b.Status.ExternalAddress
+		},
+		OnNewUpdated: func(new *ipb.Node) error {
+			set.Insert(new.Id, &tinylb.SimpleTCPBackend{
+				Remote: net.JoinHostPort(new.Status.ExternalAddress, node.KubernetesAPIPort.PortString()),
+			})
+			val.Set(set.Clone())
+			return nil
+		},
+		OnDeleted: func(prev *ipb.Node) error {
+			set.Delete(prev.Id)
+			val.Set(set.Clone())
+			return nil
+		},
+	})
 }