metropolis: move curator client watches to curator/watcher

This replaces all the ad-hoc code for watching nodes via the Curator
with calls through the new curator/watcher library.
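
The node watches below all share roughly the following shape. This is an
illustrative sketch only: watchExternalAddresses is a made-up consumer,
ipb/watcher are the import aliases used in the diff, and the callback
semantics are described as they are exercised by the call sites in this
change.

    func watchExternalAddresses(ctx context.Context, cur ipb.CuratorClient) error {
        return watcher.WatchNodes(ctx, cur, watcher.SimpleFollower{
            // Ignore nodes that don't carry the data this consumer needs.
            FilterFn: func(n *ipb.Node) bool {
                return n.Status != nil && n.Status.ExternalAddress != ""
            },
            // Report whether a node changed in a way this consumer cares about.
            EqualsFn: func(a *ipb.Node, b *ipb.Node) bool {
                return a.Status.ExternalAddress == b.Status.ExternalAddress
            },
            // Called for nodes that are new or for which EqualsFn reported a change.
            OnNewUpdated: func(new *ipb.Node) error {
                return nil
            },
            // Called when a previously seen node disappears from the cluster.
            OnDeleted: func(prev *ipb.Node) error {
                return nil
            },
            // Called once after every processed watch event/batch.
            OnBatchDone: func() error {
                return nil
            },
        })
    }

Single-node watches (see worker_rolefetch.go) use
watcher.WatchNode(ctx, cur, nodeID) instead, iterating with
Next()/Node() and returning Error() once the loop ends.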

Change-Id: Ie2a82b330e4108b9b725515cb10595916c38b323
Reviewed-on: https://review.monogon.dev/c/monogon/+/2263
Tested-by: Jenkins CI
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/node/core/clusternet/BUILD.bazel b/metropolis/node/core/clusternet/BUILD.bazel
index a5fc41a..1ccce66 100644
--- a/metropolis/node/core/clusternet/BUILD.bazel
+++ b/metropolis/node/core/clusternet/BUILD.bazel
@@ -13,6 +13,7 @@
     deps = [
         "//metropolis/node",
         "//metropolis/node/core/curator/proto/api",
+        "//metropolis/node/core/curator/watcher",
         "//metropolis/node/core/localstorage",
         "//metropolis/node/core/network",
         "//metropolis/pkg/event",
@@ -27,10 +28,7 @@
 
 go_test(
     name = "clusternet_test",
-    srcs = [
-        "clusternet_test.go",
-        "types_test.go",
-    ],
+    srcs = ["clusternet_test.go"],
     embed = [":clusternet"],
     deps = [
         "//metropolis/node",
diff --git a/metropolis/node/core/clusternet/clusternet.go b/metropolis/node/core/clusternet/clusternet.go
index 2e01c77..85ac63f 100644
--- a/metropolis/node/core/clusternet/clusternet.go
+++ b/metropolis/node/core/clusternet/clusternet.go
@@ -26,9 +26,11 @@
 	"fmt"
 	"net"
 	"net/netip"
+	"slices"
 
 	"github.com/cenkalti/backoff/v4"
 
+	"source.monogon.dev/metropolis/node/core/curator/watcher"
 	"source.monogon.dev/metropolis/node/core/localstorage"
 	"source.monogon.dev/metropolis/node/core/network"
 	"source.monogon.dev/metropolis/pkg/event"
@@ -176,58 +178,45 @@
 func (s *Service) pull(ctx context.Context) error {
 	supervisor.Signal(ctx, supervisor.SignalHealthy)
 
-	srv, err := s.Curator.Watch(ctx, &apb.WatchRequest{
-		Kind: &apb.WatchRequest_NodesInCluster_{
-			NodesInCluster: &apb.WatchRequest_NodesInCluster{},
+	var batch []*apb.Node
+	return watcher.WatchNodes(ctx, s.Curator, watcher.SimpleFollower{
+		FilterFn: func(a *apb.Node) bool {
+			if a.Status == nil || a.Status.ExternalAddress == "" {
+				return false
+			}
+			if a.Clusternet == nil {
+				return false
+			}
+			if a.Clusternet.WireguardPubkey == "" {
+				return false
+			}
+			return true
+		},
+		EqualsFn: func(a *apb.Node, b *apb.Node) bool {
+			if a.Status.ExternalAddress != b.Status.ExternalAddress {
+				return false
+			}
+			if a.Clusternet.WireguardPubkey != b.Clusternet.WireguardPubkey {
+				return false
+			}
+			// Compare prefixes by CIDR value rather than by message pointer.
+			var aPrefixes, bPrefixes []string
+			for _, p := range a.Clusternet.Prefixes {
+				aPrefixes = append(aPrefixes, p.Cidr)
+			}
+			for _, p := range b.Clusternet.Prefixes {
+				bPrefixes = append(bPrefixes, p.Cidr)
+			}
+			if !slices.Equal(aPrefixes, bPrefixes) {
+				return false
+			}
+			return true
+		},
+		OnNewUpdated: func(new *apb.Node) error {
+			batch = append(batch, new)
+			return nil
+		},
+		OnBatchDone: func() error {
+			if err := s.wg.configurePeers(batch); err != nil {
+				supervisor.Logger(ctx).Errorf("nodes couldn't be configured: %v", err)
+			}
+			batch = nil
+			return nil
+		},
+		OnDeleted: func(prev *apb.Node) error {
+			if err := s.wg.unconfigurePeer(prev); err != nil {
+				supervisor.Logger(ctx).Errorf("Node %s couldn't be unconfigured: %v", prev.Id, err)
+			}
+			return nil
 		},
 	})
-	if err != nil {
-		return fmt.Errorf("curator watch failed: %w", err)
-	}
-	defer srv.CloseSend()
-
-	nodes := newNodemap()
-	for {
-		ev, err := srv.Recv()
-		if err != nil {
-			return fmt.Errorf("curator watch recv failed: %w", err)
-		}
-
-		updated, removed := nodes.update(ctx, ev)
-
-		for _, n := range removed {
-			supervisor.Logger(ctx).Infof("Node %s removed, unconfiguring", n.id)
-			if err := s.wg.unconfigurePeer(n.copy()); err != nil {
-				// Do nothing and hope whatever caused this will go away at some point.
-				supervisor.Logger(ctx).Errorf("Node %s couldn't be unconfigured: %v", n.id, err)
-			}
-		}
-		var newNodes []*node
-		for _, n := range updated {
-			newNodes = append(newNodes, n.copy())
-			supervisor.Logger(ctx).Infof("Node %s updated: pk %s, address %s, prefixes %v", n.id, n.pubkey, n.address, n.prefixes)
-		}
-		succeeded := 0
-		if err := s.wg.configurePeers(newNodes); err != nil {
-			// If configuring all nodes at once failed, go node-by-node to make sure we've
-			// done as much as possible.
-			supervisor.Logger(ctx).Warningf("Bulk node update call failed, trying node-by-node..: %v", err)
-			for _, n := range newNodes {
-				if err := s.wg.configurePeers([]*node{n}); err != nil {
-					supervisor.Logger(ctx).Errorf("Node %s failed: %v", n.id, err)
-				} else {
-					succeeded += 1
-				}
-			}
-		} else {
-			succeeded = len(newNodes)
-		}
-
-		if len(newNodes) != 0 {
-			supervisor.Logger(ctx).Infof("Successfully updated %d out of %d nodes", succeeded, len(newNodes))
-
-			numNodes, numPrefixes := nodes.stats()
-			supervisor.Logger(ctx).Infof("Total: %d nodes, %d prefixes.", numNodes, numPrefixes)
-		}
-	}
 }
diff --git a/metropolis/node/core/clusternet/clusternet_test.go b/metropolis/node/core/clusternet/clusternet_test.go
index bda1459..b92cfb2 100644
--- a/metropolis/node/core/clusternet/clusternet_test.go
+++ b/metropolis/node/core/clusternet/clusternet_test.go
@@ -4,7 +4,8 @@
 	"fmt"
 	"net"
 	"os"
-	"strings"
+	"slices"
+	"sort"
 	"sync"
 	"testing"
 	"time"
@@ -21,6 +22,7 @@
 	"source.monogon.dev/metropolis/test/util"
 
 	apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+	cpb "source.monogon.dev/metropolis/proto/common"
 )
 
 // fakeWireguard implements wireguard while keeping peer information internally.
@@ -28,7 +30,7 @@
 	k wgtypes.Key
 
 	muNodes        sync.Mutex
-	nodes          map[string]*node
+	nodes          map[string]*apb.Node
 	failNextUpdate bool
 }
 
@@ -40,27 +42,29 @@
 func (f *fakeWireguard) setup(clusterNet *net.IPNet) error {
 	f.muNodes.Lock()
 	defer f.muNodes.Unlock()
-	f.nodes = make(map[string]*node)
+	f.nodes = make(map[string]*apb.Node)
 	return nil
 }
 
-func (f *fakeWireguard) configurePeers(nodes []*node) error {
+func (f *fakeWireguard) configurePeers(nodes []*apb.Node) error {
 	f.muNodes.Lock()
 	defer f.muNodes.Unlock()
+
 	if f.failNextUpdate {
 		f.failNextUpdate = false
 		return fmt.Errorf("synthetic test failure")
 	}
+
 	for _, n := range nodes {
-		f.nodes[n.id] = n
+		f.nodes[n.Id] = n
 	}
 	return nil
 }
 
-func (f *fakeWireguard) unconfigurePeer(n *node) error {
+func (f *fakeWireguard) unconfigurePeer(node *apb.Node) error {
 	f.muNodes.Lock()
 	defer f.muNodes.Unlock()
-	delete(f.nodes, n.id)
+	delete(f.nodes, node.Id)
 	return nil
 }
 
@@ -105,7 +109,7 @@
 	}
 	supervisor.TestHarness(t, svc.Run)
 
-	checkState := func(nodes map[string]*node) error {
+	checkState := func(nodes map[string]*apb.Node) error {
 		t.Helper()
 		wg.muNodes.Lock()
 		defer wg.muNodes.Unlock()
@@ -114,16 +118,23 @@
 			if !ok {
 				return fmt.Errorf("node %q missing in programmed peers", nid)
 			}
-			if n2.pubkey != n.pubkey {
-				return fmt.Errorf("node %q pubkey mismatch: %q in programmed peers, %q wanted", nid, n2.pubkey, n.pubkey)
+			if got, want := n2.Clusternet.WireguardPubkey, n.Clusternet.WireguardPubkey; got != want {
+				return fmt.Errorf("node %q pubkey mismatch: %q in programmed peers, %q wanted", nid, got, want)
 			}
-			if n2.address != n.address {
-				return fmt.Errorf("node %q address mismatch: %q in programmed peers, %q wanted", nid, n2.address, n.address)
+			if got, want := n2.Status.ExternalAddress, n.Status.ExternalAddress; got != want {
+				return fmt.Errorf("node %q address mismatch: %q in programmed peers, %q wanted", nid, got, want)
 			}
-			p := strings.Join(n.prefixes, ",")
-			p2 := strings.Join(n2.prefixes, ",")
-			if p != p2 {
-				return fmt.Errorf("node %q prefixes mismatch: %v in programmed peers, %v wanted", nid, n2.prefixes, n.prefixes)
+			var p, p2 []string
+			for _, prefix := range n.Clusternet.Prefixes {
+				p = append(p, prefix.Cidr)
+			}
+			for _, prefix := range n2.Clusternet.Prefixes {
+				p2 = append(p2, prefix.Cidr)
+			}
+			sort.Strings(p)
+			sort.Strings(p2)
+			if !slices.Equal(p, p2) {
+				return fmt.Errorf("node %q prefixes mismatch: %v in programmed peers, %v wanted", nid, p2, p)
 			}
 		}
 		for nid, _ := range wg.nodes {
@@ -134,7 +145,7 @@
 		return nil
 	}
 
-	assertStateEventual := func(nodes map[string]*node) {
+	assertStateEventual := func(nodes map[string]*apb.Node) {
 		t.Helper()
 		deadline := time.Now().Add(5 * time.Second)
 		for {
@@ -152,34 +163,46 @@
 
 	// Start with a single node.
 	cur.NodeWithPrefixes(key1, "metropolis-fake-1", "1.2.3.4")
-	assertStateEventual(map[string]*node{
+	assertStateEventual(map[string]*apb.Node{
 		"metropolis-fake-1": {
-			pubkey:   key1.PublicKey().String(),
-			address:  "1.2.3.4",
-			prefixes: nil,
+			Status: &cpb.NodeStatus{
+				ExternalAddress: "1.2.3.4",
+			},
+			Clusternet: &cpb.NodeClusterNetworking{
+				WireguardPubkey: key1.PublicKey().String(),
+			},
 		},
 	})
 	// Change the node's peer address.
 	cur.NodeWithPrefixes(key1, "metropolis-fake-1", "1.2.3.5")
-	assertStateEventual(map[string]*node{
+	assertStateEventual(map[string]*apb.Node{
 		"metropolis-fake-1": {
-			pubkey:   key1.PublicKey().String(),
-			address:  "1.2.3.5",
-			prefixes: nil,
+			Status: &cpb.NodeStatus{
+				ExternalAddress: "1.2.3.5",
+			},
+			Clusternet: &cpb.NodeClusterNetworking{
+				WireguardPubkey: key1.PublicKey().String(),
+			},
 		},
 	})
 	// Add another node.
 	cur.NodeWithPrefixes(key2, "metropolis-fake-2", "1.2.3.6")
-	assertStateEventual(map[string]*node{
+	assertStateEventual(map[string]*apb.Node{
 		"metropolis-fake-1": {
-			pubkey:   key1.PublicKey().String(),
-			address:  "1.2.3.5",
-			prefixes: nil,
+			Status: &cpb.NodeStatus{
+				ExternalAddress: "1.2.3.5",
+			},
+			Clusternet: &cpb.NodeClusterNetworking{
+				WireguardPubkey: key1.PublicKey().String(),
+			},
 		},
 		"metropolis-fake-2": {
-			pubkey:   key2.PublicKey().String(),
-			address:  "1.2.3.6",
-			prefixes: nil,
+			Status: &cpb.NodeStatus{
+				ExternalAddress: "1.2.3.6",
+			},
+			Clusternet: &cpb.NodeClusterNetworking{
+				WireguardPubkey: key2.PublicKey().String(),
+			},
 		},
 	})
 	// Add some prefixes to both nodes, but fail the next configurePeers call.
@@ -188,30 +211,42 @@
 	wg.muNodes.Unlock()
 	cur.NodeWithPrefixes(key1, "metropolis-fake-1", "1.2.3.5", "10.100.10.0/24", "10.100.20.0/24")
 	cur.NodeWithPrefixes(key2, "metropolis-fake-2", "1.2.3.6", "10.100.30.0/24", "10.100.40.0/24")
-	assertStateEventual(map[string]*node{
+	assertStateEventual(map[string]*apb.Node{
 		"metropolis-fake-1": {
-			pubkey:  key1.PublicKey().String(),
-			address: "1.2.3.5",
-			prefixes: []string{
-				"10.100.10.0/24", "10.100.20.0/24",
+			Status: &cpb.NodeStatus{
+				ExternalAddress: "1.2.3.5",
+			},
+			Clusternet: &cpb.NodeClusterNetworking{
+				WireguardPubkey: key1.PublicKey().String(),
+				// No prefixes as the call failed.
 			},
 		},
 		"metropolis-fake-2": {
-			pubkey:  key2.PublicKey().String(),
-			address: "1.2.3.6",
-			prefixes: []string{
-				"10.100.30.0/24", "10.100.40.0/24",
+			Status: &cpb.NodeStatus{
+				ExternalAddress: "1.2.3.6",
+			},
+			Clusternet: &cpb.NodeClusterNetworking{
+				WireguardPubkey: key2.PublicKey().String(),
+				Prefixes: []*cpb.NodeClusterNetworking_Prefix{
+					{Cidr: "10.100.30.0/24"},
+					{Cidr: "10.100.40.0/24"},
+				},
 			},
 		},
 	})
 	// Delete one of the nodes.
 	cur.DeleteNode("metropolis-fake-1")
-	assertStateEventual(map[string]*node{
+	assertStateEventual(map[string]*apb.Node{
 		"metropolis-fake-2": {
-			pubkey:  key2.PublicKey().String(),
-			address: "1.2.3.6",
-			prefixes: []string{
-				"10.100.30.0/24", "10.100.40.0/24",
+			Status: &cpb.NodeStatus{
+				ExternalAddress: "1.2.3.6",
+			},
+			Clusternet: &cpb.NodeClusterNetworking{
+				WireguardPubkey: key2.PublicKey().String(),
+				Prefixes: []*cpb.NodeClusterNetworking_Prefix{
+					{Cidr: "10.100.30.0/24"},
+					{Cidr: "10.100.40.0/24"},
+				},
 			},
 		},
 	})
@@ -289,21 +324,31 @@
 	if err != nil {
 		t.Fatalf("Failed to generate private key: %v", err)
 	}
-	err = wg.configurePeers([]*node{
+	err = wg.configurePeers([]*apb.Node{
 		{
-			pubkey:  pkeys[0].PublicKey().String(),
-			address: "10.100.0.1",
-			prefixes: []string{
-				"10.0.0.0/24",
-				"10.0.1.0/24",
+			Id: "test-0",
+			Status: &cpb.NodeStatus{
+				ExternalAddress: "10.100.0.1",
+			},
+			Clusternet: &cpb.NodeClusterNetworking{
+				WireguardPubkey: pkeys[0].PublicKey().String(),
+				Prefixes: []*cpb.NodeClusterNetworking_Prefix{
+					{Cidr: "10.0.0.0/24"},
+					{Cidr: "10.0.1.0/24"},
+				},
 			},
 		},
 		{
-			pubkey:  pkeys[1].PublicKey().String(),
-			address: "10.100.1.1",
-			prefixes: []string{
-				"10.1.0.0/24",
-				"10.1.1.0/24",
+			Id: "test-1",
+			Status: &cpb.NodeStatus{
+				ExternalAddress: "10.100.1.1",
+			},
+			Clusternet: &cpb.NodeClusterNetworking{
+				WireguardPubkey: pkeys[1].PublicKey().String(),
+				Prefixes: []*cpb.NodeClusterNetworking_Prefix{
+					{Cidr: "10.1.0.0/24"},
+					{Cidr: "10.1.1.0/24"},
+				},
 			},
 		},
 	})
@@ -338,17 +383,22 @@
 	}
 
 	// Update one of the peers and check that things got applied.
-	err = wg.configurePeers([]*node{
+	err = wg.configurePeers([]*apb.Node{
 		{
-			pubkey:  pkeys[0].PublicKey().String(),
-			address: "10.100.0.3",
-			prefixes: []string{
-				"10.0.0.0/24",
+			Id: "test-0",
+			Status: &cpb.NodeStatus{
+				ExternalAddress: "10.100.0.3",
+			},
+			Clusternet: &cpb.NodeClusterNetworking{
+				WireguardPubkey: pkeys[0].PublicKey().String(),
+				Prefixes: []*cpb.NodeClusterNetworking_Prefix{
+					{Cidr: "10.0.0.0/24"},
+				},
 			},
 		},
 	})
 	if err != nil {
-		t.Fatalf("Failed to connect to netlink's WireGuard config endpoint: %v", err)
+		t.Fatalf("Failed to update peer: %v", err)
 	}
 	wgDev, err = wgClient.Device(clusterNetDeviceName)
 	if err != nil {
@@ -373,14 +423,18 @@
 	}
 
 	// Remove one of the peers and make sure it's gone.
-	err = wg.unconfigurePeer(&node{
-		pubkey: pkeys[0].PublicKey().String(),
+	err = wg.unconfigurePeer(&apb.Node{
+		Clusternet: &cpb.NodeClusterNetworking{
+			WireguardPubkey: pkeys[0].PublicKey().String(),
+		},
 	})
 	if err != nil {
 		t.Fatalf("Failed to unconfigure peer: %v", err)
 	}
-	err = wg.unconfigurePeer(&node{
-		pubkey: pkeys[0].PublicKey().String(),
+	err = wg.unconfigurePeer(&apb.Node{
+		Clusternet: &cpb.NodeClusterNetworking{
+			WireguardPubkey: pkeys[0].PublicKey().String(),
+		},
 	})
 	if err != nil {
 		t.Fatalf("Failed to unconfigure peer a second time: %v", err)
diff --git a/metropolis/node/core/clusternet/types.go b/metropolis/node/core/clusternet/types.go
index 088cf8e..67e401a 100644
--- a/metropolis/node/core/clusternet/types.go
+++ b/metropolis/node/core/clusternet/types.go
@@ -1,13 +1,10 @@
 package clusternet
 
 import (
-	"context"
 	"net/netip"
 	"sort"
 	"strings"
 
-	apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
-	"source.monogon.dev/metropolis/pkg/supervisor"
 	cpb "source.monogon.dev/metropolis/proto/common"
 )
 
@@ -59,131 +56,3 @@
 func (p *Prefixes) Equal(o *Prefixes) bool {
 	return p.String() == o.String()
 }
-
-// node is used for internal statekeeping in the cluster networking service.
-type node struct {
-	id       string
-	pubkey   string
-	address  string
-	prefixes []string
-}
-
-func (n *node) copy() *node {
-	n2 := *n
-	return &n2
-}
-
-// update mutates this node to whatever data is held in the given proto Node, and
-// returns true if any data changed.
-func (n *node) update(p *apb.Node) (changed bool) {
-	if n.address != p.Status.ExternalAddress {
-		n.address = p.Status.ExternalAddress
-		changed = true
-	}
-	if n.pubkey != p.Clusternet.WireguardPubkey {
-		n.pubkey = p.Clusternet.WireguardPubkey
-		changed = true
-	}
-
-	var newPrefixes []string
-	for _, prefix := range p.Clusternet.Prefixes {
-		if prefix.Cidr == "" {
-			continue
-		}
-		newPrefixes = append(newPrefixes, prefix.Cidr)
-	}
-	oldPrefixes := make([]string, len(n.prefixes))
-	copy(oldPrefixes[:], n.prefixes)
-
-	sort.Strings(newPrefixes)
-	sort.Strings(oldPrefixes)
-	if want, got := strings.Join(newPrefixes, ","), strings.Join(oldPrefixes, ","); want != got {
-		n.prefixes = newPrefixes
-		changed = true
-	}
-
-	return
-}
-
-// nodeMap is the main internal statekeeping structure of the pull sub-runnable.
-type nodeMap struct {
-	nodes map[string]*node
-}
-
-func newNodemap() *nodeMap {
-	return &nodeMap{
-		nodes: make(map[string]*node),
-	}
-}
-
-func (n *nodeMap) stats() (nodes int, prefixes int) {
-	nodes = len(n.nodes)
-
-	for _, node := range n.nodes {
-		prefixes += len(node.prefixes)
-	}
-
-	return
-}
-
-// update updates the nodeMap from the given Curator WatchEvent, interpreting
-// both node changes and deletions. Two nodeMaps are returned: the first one
-// contains only nodes that have been added/changed by the given event, the other
-// contains only nodes that have been deleted by the given event.
-func (m *nodeMap) update(ctx context.Context, ev *apb.WatchEvent) (changed, removed map[string]*node) {
-	changed = make(map[string]*node)
-	removed = make(map[string]*node)
-
-	// Make sure we're not getting multiple nodes with the same public key. This is
-	// not expected to happen in practice as the Curator should prevent this from
-	// happening, but we at least want to make sure we're not blowing up routing if
-	// other defenses fail.
-
-	// pkeys maps from public key to node ID.
-	pkeys := make(map[string]string)
-	for _, n := range m.nodes {
-		// We don't have to check whether we have any collisions already in m.nodes, as
-		// the check below prevents them from happening in the first place.
-		pkeys[n.pubkey] = n.id
-	}
-
-	for _, n := range ev.Nodes {
-		// Only care about nodes that have all required configuration set.
-		if n.Status == nil || n.Status.ExternalAddress == "" || n.Clusternet == nil || n.Clusternet.WireguardPubkey == "" {
-			// We could attempt to delete any matching node currently in this nodemap at this
-			// point, but this is likely transient and we don't want to just kill routing for
-			// no reason.
-			continue
-		}
-
-		key := n.Clusternet.WireguardPubkey
-		if id, ok := pkeys[key]; ok && id != n.Id {
-			supervisor.Logger(ctx).Warningf("Nodes %q and %q share wireguard key %q. That should not have happened.", n.Id, id, key)
-			continue
-		}
-
-		if _, ok := m.nodes[n.Id]; !ok {
-			m.nodes[n.Id] = &node{
-				id: n.Id,
-			}
-		}
-		diff := m.nodes[n.Id].update(n)
-		if diff {
-			changed[n.Id] = m.nodes[n.Id]
-		}
-	}
-
-	for _, t := range ev.NodeTombstones {
-		n, ok := m.nodes[t.NodeId]
-		if !ok {
-			// This is an indication of us losing data somehow. If this happens, it likely
-			// means a Curator bug.
-			supervisor.Logger(ctx).Warningf("Node %s: tombstone for unknown node", t.NodeId)
-			continue
-		}
-		removed[n.id] = n
-		delete(m.nodes, n.id)
-	}
-
-	return
-}
diff --git a/metropolis/node/core/clusternet/types_test.go b/metropolis/node/core/clusternet/types_test.go
deleted file mode 100644
index 8f58b4e..0000000
--- a/metropolis/node/core/clusternet/types_test.go
+++ /dev/null
@@ -1,104 +0,0 @@
-package clusternet
-
-import (
-	"testing"
-
-	apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
-	cpb "source.monogon.dev/metropolis/proto/common"
-)
-
-func TestNodeUpdate(t *testing.T) {
-	var n node
-
-	for i, te := range []struct {
-		p    *apb.Node
-		want bool
-	}{
-		// Case 0: empty incoming node, no change.
-		{
-			p:    &apb.Node{Status: &cpb.NodeStatus{}, Clusternet: &cpb.NodeClusterNetworking{}},
-			want: false,
-		},
-		// Case 1: wireguard key set.
-		{
-			p:    &apb.Node{Status: &cpb.NodeStatus{}, Clusternet: &cpb.NodeClusterNetworking{WireguardPubkey: "fake"}},
-			want: true,
-		},
-		// Case 2: wireguard key updated.
-		{
-			p:    &apb.Node{Status: &cpb.NodeStatus{}, Clusternet: &cpb.NodeClusterNetworking{WireguardPubkey: "fake2"}},
-			want: true,
-		},
-		// Case 3: external address added.
-		{
-			p:    &apb.Node{Status: &cpb.NodeStatus{ExternalAddress: "1.2.3.4"}, Clusternet: &cpb.NodeClusterNetworking{WireguardPubkey: "fake2"}},
-			want: true,
-		},
-		// Case 4: external address changed.
-		{
-			p:    &apb.Node{Status: &cpb.NodeStatus{ExternalAddress: "1.2.3.5"}, Clusternet: &cpb.NodeClusterNetworking{WireguardPubkey: "fake2"}},
-			want: true,
-		},
-		// Case 5: prefixes added
-		{
-			p: &apb.Node{
-				Status: &cpb.NodeStatus{ExternalAddress: "1.2.3.5"},
-				Clusternet: &cpb.NodeClusterNetworking{
-					WireguardPubkey: "fake2",
-					Prefixes: []*cpb.NodeClusterNetworking_Prefix{
-						{Cidr: "10.0.2.0/24"},
-						{Cidr: "10.0.1.0/24"},
-					},
-				},
-			},
-			want: true,
-		},
-		// Case 6: prefixes changed
-		{
-			p: &apb.Node{
-				Status: &cpb.NodeStatus{ExternalAddress: "1.2.3.5"},
-				Clusternet: &cpb.NodeClusterNetworking{
-					WireguardPubkey: "fake2",
-					Prefixes: []*cpb.NodeClusterNetworking_Prefix{
-						{Cidr: "10.0.3.0/24"},
-						{Cidr: "10.0.1.0/24"},
-					},
-				},
-			},
-			want: true,
-		},
-		// Case 6: prefixes reordered (no change expected)
-		{
-			p: &apb.Node{
-				Status: &cpb.NodeStatus{ExternalAddress: "1.2.3.5"},
-				Clusternet: &cpb.NodeClusterNetworking{
-					WireguardPubkey: "fake2",
-					Prefixes: []*cpb.NodeClusterNetworking_Prefix{
-						{Cidr: "10.0.3.0/24"},
-						{Cidr: "10.0.1.0/24"},
-					},
-				},
-			},
-			want: false,
-		},
-		// Case 6: prefixes removed
-		{
-			p: &apb.Node{
-				Status: &cpb.NodeStatus{ExternalAddress: "1.2.3.5"},
-				Clusternet: &cpb.NodeClusterNetworking{
-					WireguardPubkey: "fake2",
-					Prefixes:        []*cpb.NodeClusterNetworking_Prefix{},
-				},
-			},
-			want: true,
-		},
-	} {
-		got := n.update(te.p)
-		if te.want && !got {
-			t.Fatalf("Case %d: expected change, got no change", i)
-		}
-		if !te.want && got {
-			t.Fatalf("Case %d: expected no change, got change", i)
-		}
-	}
-}
diff --git a/metropolis/node/core/clusternet/wireguard.go b/metropolis/node/core/clusternet/wireguard.go
index 9ce6d49..4c38c79 100644
--- a/metropolis/node/core/clusternet/wireguard.go
+++ b/metropolis/node/core/clusternet/wireguard.go
@@ -10,6 +10,7 @@
 	"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
 
 	common "source.monogon.dev/metropolis/node"
+	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
 	"source.monogon.dev/metropolis/node/core/localstorage"
 )
 
@@ -28,8 +29,8 @@
 type wireguard interface {
 	ensureOnDiskKey(dir *localstorage.DataKubernetesClusterNetworkingDirectory) error
 	setup(clusterNet *net.IPNet) error
-	configurePeers(n []*node) error
-	unconfigurePeer(n *node) error
+	configurePeers(nodes []*ipb.Node) error
+	unconfigurePeer(n *ipb.Node) error
 	key() wgtypes.Key
 	close()
 }
@@ -114,31 +115,26 @@
 	return nil
 }
 
-// configurePeers creates or updates a peers on the local wireguard interface
+// configurePeers creates or updates peers on the local wireguard interface
 // based on the given nodes.
-//
-// If any node is somehow invalid and causes a parse/reconfiguration error, the
-// function will return an error. The caller should retry with a different set of
-// nodes, performing search/bisection on its own.
-func (s *localWireguard) configurePeers(nodes []*node) error {
+func (s *localWireguard) configurePeers(nodes []*ipb.Node) error {
 	var configs []wgtypes.PeerConfig
-
 	for i, n := range nodes {
-		if s.privKey.PublicKey().String() == n.pubkey {
+		if s.privKey.PublicKey().String() == n.Clusternet.WireguardPubkey {
 			// Node doesn't need to connect to itself
 			continue
 		}
-		pubkeyParsed, err := wgtypes.ParseKey(n.pubkey)
+		pubkeyParsed, err := wgtypes.ParseKey(n.Clusternet.WireguardPubkey)
 		if err != nil {
-			return fmt.Errorf("node %d: failed to parse public-key %q: %w", i, n.pubkey, err)
+			return fmt.Errorf("node %d: failed to parse public-key %q: %w", i, n.Clusternet.WireguardPubkey, err)
 		}
-		addressParsed := net.ParseIP(n.address)
+		addressParsed := net.ParseIP(n.Status.ExternalAddress)
 		if addressParsed == nil {
-			return fmt.Errorf("node %d: failed to parse address %q: %w", i, n.address, err)
+			return fmt.Errorf("node %d: failed to parse address %q: %w", i, n.Status.ExternalAddress, err)
 		}
 		var allowedIPs []net.IPNet
-		for _, prefix := range n.prefixes {
-			_, podNet, err := net.ParseCIDR(prefix)
+		for _, prefix := range n.Clusternet.Prefixes {
+			_, podNet, err := net.ParseCIDR(prefix.Cidr)
 			if err != nil {
 				// Just eat the parse error. Not much we can do here. We have enough validation
 				// in the rest of the system that we shouldn't ever reach this.
@@ -167,10 +163,10 @@
 // unconfigurePeer removes the peer from the local WireGuard interface based on
 // the given node. If no peer existed matching the given node, this operation is
 // a no-op.
-func (s *localWireguard) unconfigurePeer(n *node) error {
-	pubkeyParsed, err := wgtypes.ParseKey(n.pubkey)
+func (s *localWireguard) unconfigurePeer(n *ipb.Node) error {
+	pubkeyParsed, err := wgtypes.ParseKey(n.Clusternet.WireguardPubkey)
 	if err != nil {
-		return fmt.Errorf("failed to parse public-key %q: %w", n.pubkey, err)
+		return fmt.Errorf("failed to parse public-key %q: %w", n.Clusternet.WireguardPubkey, err)
 	}
 
 	err = s.wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
diff --git a/metropolis/node/core/metrics/BUILD.bazel b/metropolis/node/core/metrics/BUILD.bazel
index 88cdc1b..6483377 100644
--- a/metropolis/node/core/metrics/BUILD.bazel
+++ b/metropolis/node/core/metrics/BUILD.bazel
@@ -10,8 +10,10 @@
     importpath = "source.monogon.dev/metropolis/node/core/metrics",
     visibility = ["//visibility:public"],
     deps = [
+        "//go/types/mapsets",
         "//metropolis/node",
         "//metropolis/node/core/curator/proto/api",
+        "//metropolis/node/core/curator/watcher",
         "//metropolis/node/core/identity",
         "//metropolis/pkg/supervisor",
     ],
diff --git a/metropolis/node/core/metrics/discovery.go b/metropolis/node/core/metrics/discovery.go
index 037d3b0..d5a992b 100644
--- a/metropolis/node/core/metrics/discovery.go
+++ b/metropolis/node/core/metrics/discovery.go
@@ -7,23 +7,22 @@
 	"net/http"
 	"sync"
 
-	apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
-	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
-
+	"source.monogon.dev/go/types/mapsets"
+	"source.monogon.dev/metropolis/node/core/curator/watcher"
 	"source.monogon.dev/metropolis/pkg/supervisor"
+
+	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
 )
 
 type Discovery struct {
 	Curator ipb.CuratorClient
 
 	// sdResp will contain the cached sdResponse
-	sdResp sdResponse
+	sdResp mapsets.OrderedMap[string, sdTarget]
 	// sdRespMtx is the mutex for sdResp to allow usage inside the http handler.
 	sdRespMtx sync.RWMutex
 }
 
-type sdResponse []sdTarget
-
 type sdTarget struct {
 	Targets []string          `json:"targets"`
 	Labels  map[string]string `json:"labels"`
@@ -33,73 +32,63 @@
 func (s *Discovery) Run(ctx context.Context) error {
 	supervisor.Signal(ctx, supervisor.SignalHealthy)
 
-	srv, err := s.Curator.Watch(ctx, &apb.WatchRequest{
-		Kind: &apb.WatchRequest_NodesInCluster_{
-			NodesInCluster: &apb.WatchRequest_NodesInCluster{},
-		},
-	})
-	if err != nil {
-		return fmt.Errorf("curator watch failed: %w", err)
-	}
-	defer srv.CloseSend()
-
 	defer func() {
 		s.sdRespMtx.Lock()
 		// disable the metrics endpoint until the new routine takes over
-		s.sdResp = nil
+		s.sdResp.Clear()
 		s.sdRespMtx.Unlock()
 	}()
 
-	nodes := make(map[string]*apb.Node)
-	for {
-		ev, err := srv.Recv()
-		if err != nil {
-			// The watcher wont return a properly wrapped error which confuses
-			// our testing harness. Lets just return the context error directly
-			// if it exists.
-			if ctx.Err() != nil {
-				return ctx.Err()
+	return watcher.WatchNodes(ctx, s.Curator, watcher.SimpleFollower{
+		FilterFn: func(a *ipb.Node) bool {
+			if a.Status == nil {
+				return false
 			}
-
-			return fmt.Errorf("curator watch recv failed: %w", err)
-		}
-
-		for _, n := range ev.Nodes {
-			nodes[n.Id] = n
-		}
-
-		for _, t := range ev.NodeTombstones {
-			n, ok := nodes[t.NodeId]
-			if !ok {
-				// This is an indication of us losing data somehow. If this happens, it likely
-				// means a Curator bug.
-				supervisor.Logger(ctx).Warningf("Node %s: tombstone for unknown node", t.NodeId)
-				continue
+			if a.Status.ExternalAddress == "" {
+				return false
 			}
-			delete(nodes, n.Id)
-		}
-
-		s.sdRespMtx.Lock()
-
-		s.sdResp = nil
-		for _, n := range nodes {
-			// Only care about nodes that have all required configuration set.
-			if n.Status == nil || n.Status.ExternalAddress == "" || n.Roles == nil {
-				continue
+			if a.Roles == nil {
+				return false
 			}
+			return true
+		},
+		EqualsFn: func(a *ipb.Node, b *ipb.Node) bool {
+			if (a.Roles.ConsensusMember == nil) != (b.Roles.ConsensusMember == nil) {
+				return false
+			}
+			if (a.Roles.KubernetesController == nil) != (b.Roles.KubernetesController == nil) {
+				return false
+			}
+			if (a.Roles.KubernetesWorker == nil) != (b.Roles.KubernetesWorker == nil) {
+				return false
+			}
+			if a.Status.ExternalAddress != b.Status.ExternalAddress {
+				return false
+			}
+			return true
+		},
+		OnNewUpdated: func(new *ipb.Node) error {
+			s.sdRespMtx.Lock()
+			defer s.sdRespMtx.Unlock()
 
-			s.sdResp = append(s.sdResp, sdTarget{
-				Targets: []string{n.Status.ExternalAddress},
+			s.sdResp.Insert(new.Id, sdTarget{
+				Targets: []string{new.Status.ExternalAddress},
 				Labels: map[string]string{
-					"__meta_metropolis_role_kubernetes_worker":     fmt.Sprintf("%t", n.Roles.KubernetesWorker != nil),
-					"__meta_metropolis_role_kubernetes_controller": fmt.Sprintf("%t", n.Roles.KubernetesController != nil),
-					"__meta_metropolis_role_consensus_member":      fmt.Sprintf("%t", n.Roles.ConsensusMember != nil),
+					"__meta_metropolis_role_kubernetes_worker":     fmt.Sprintf("%t", new.Roles.KubernetesWorker != nil),
+					"__meta_metropolis_role_kubernetes_controller": fmt.Sprintf("%t", new.Roles.KubernetesController != nil),
+					"__meta_metropolis_role_consensus_member":      fmt.Sprintf("%t", new.Roles.ConsensusMember != nil),
 				},
 			})
-		}
+			return nil
+		},
+		OnDeleted: func(prev *ipb.Node) error {
+			s.sdRespMtx.Lock()
+			defer s.sdRespMtx.Unlock()
 
-		s.sdRespMtx.Unlock()
-	}
+			s.sdResp.Delete(prev.Id)
+			return nil
+		},
+	})
 }
 
 func (s *Discovery) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@@ -111,9 +100,10 @@
 	s.sdRespMtx.RLock()
 	defer s.sdRespMtx.RUnlock()
 
-	// If sdResp is nil, which only happens if we are not a master node
-	// or we are still booting, we respond with NotImplemented.
-	if s.sdResp == nil {
+	// If sdResp is empty, respond with Service Unavailable. This will only happen
+	// early enough in the lifecycle of a control plane node that it doesn't know
+	// about itself, or if this is not a control plane node:
+	if s.sdResp.Count() == 0 {
 		w.WriteHeader(http.StatusServiceUnavailable)
 		return
 	}
@@ -121,7 +111,12 @@
 	w.Header().Set("Content-Type", "application/json")
 	w.WriteHeader(http.StatusOK)
 
-	if err := json.NewEncoder(w).Encode(s.sdResp); err != nil {
+	// Turn into a plain array as expected by the service discovery API.
+	var res []sdTarget
+	for _, v := range s.sdResp.Values() {
+		res = append(res, v.Value)
+	}
+	if err := json.NewEncoder(w).Encode(res); err != nil {
 		// If the encoder fails its mostly because of closed connections
 		// so lets just ignore these errors.
 		return
diff --git a/metropolis/node/core/network/hostsfile/BUILD.bazel b/metropolis/node/core/network/hostsfile/BUILD.bazel
index e86850e..51b7f4f 100644
--- a/metropolis/node/core/network/hostsfile/BUILD.bazel
+++ b/metropolis/node/core/network/hostsfile/BUILD.bazel
@@ -7,6 +7,7 @@
     visibility = ["//visibility:public"],
     deps = [
         "//metropolis/node/core/curator/proto/api",
+        "//metropolis/node/core/curator/watcher",
         "//metropolis/node/core/localstorage",
         "//metropolis/node/core/network",
         "//metropolis/pkg/event",
diff --git a/metropolis/node/core/network/hostsfile/hostsfile.go b/metropolis/node/core/network/hostsfile/hostsfile.go
index 4407169..660cf3b 100644
--- a/metropolis/node/core/network/hostsfile/hostsfile.go
+++ b/metropolis/node/core/network/hostsfile/hostsfile.go
@@ -27,6 +27,7 @@
 	"google.golang.org/grpc"
 	"google.golang.org/protobuf/proto"
 
+	"source.monogon.dev/metropolis/node/core/curator/watcher"
 	"source.monogon.dev/metropolis/node/core/localstorage"
 	"source.monogon.dev/metropolis/node/core/network"
 	"source.monogon.dev/metropolis/pkg/event"
@@ -270,41 +271,38 @@
 // reflects the up-to-date view of the cluster returned from the Curator Watch
 // call, including any node deletions.
 func (s *Service) runCluster(ctx context.Context) error {
-	w, err := s.Curator.Watch(ctx, &ipb.WatchRequest{
-		Kind: &ipb.WatchRequest_NodesInCluster_{
-			NodesInCluster: &ipb.WatchRequest_NodesInCluster{},
+	nodes := make(nodeMap)
+	return watcher.WatchNodes(ctx, s.Curator, watcher.SimpleFollower{
+		FilterFn: func(a *ipb.Node) bool {
+			if a.Status == nil || a.Status.ExternalAddress == "" {
+				return false
+			}
+			return true
+		},
+		EqualsFn: func(a *ipb.Node, b *ipb.Node) bool {
+			return a.Status.ExternalAddress == b.Status.ExternalAddress
+		},
+		OnNewUpdated: func(new *ipb.Node) error {
+			nodes[new.Id] = nodeInfo{
+				address:      new.Status.ExternalAddress,
+				local:        false,
+				controlPlane: new.Roles.ConsensusMember != nil,
+			}
+			return nil
+		},
+		OnDeleted: func(prev *ipb.Node) error {
+			delete(nodes, prev.Id)
+			return nil
+		},
+		OnBatchDone: func() error {
+			// Send copy over channel, as we need to decouple the gRPC receiver from the main
+			// processing loop of the worker.
+			nodesCopy := make(nodeMap)
+			for k, v := range nodes {
+				nodesCopy[k] = v
+			}
+			s.clusterC <- nodesCopy
+			return nil
 		},
 	})
-	if err != nil {
-		return fmt.Errorf("curator watch failed: %w", err)
-	}
-
-	nodes := make(nodeMap)
-	for {
-		ev, err := w.Recv()
-		if err != nil {
-			return fmt.Errorf("receive failed: %w", err)
-		}
-		for _, n := range ev.Nodes {
-			if n.Status == nil || n.Status.ExternalAddress == "" {
-				continue
-			}
-			nodes[n.Id] = nodeInfo{
-				address:      n.Status.ExternalAddress,
-				local:        false,
-				controlPlane: n.Roles.ConsensusMember != nil,
-			}
-		}
-		for _, t := range ev.NodeTombstones {
-			delete(nodes, t.NodeId)
-		}
-
-		// Copy nodemap before passing it over to the main goroutine. The values don't
-		// need to be deep copied as they're not ever changed (only inserted).
-		nodesCopy := make(nodeMap)
-		for k, v := range nodes {
-			nodesCopy[k] = v
-		}
-		s.clusterC <- nodesCopy
-	}
 }
diff --git a/metropolis/node/core/roleserve/BUILD.bazel b/metropolis/node/core/roleserve/BUILD.bazel
index ce0b5cc..dd224d5 100644
--- a/metropolis/node/core/roleserve/BUILD.bazel
+++ b/metropolis/node/core/roleserve/BUILD.bazel
@@ -23,6 +23,7 @@
         "//metropolis/node/core/consensus",
         "//metropolis/node/core/curator",
         "//metropolis/node/core/curator/proto/api",
+        "//metropolis/node/core/curator/watcher",
         "//metropolis/node/core/identity",
         "//metropolis/node/core/localstorage",
         "//metropolis/node/core/metrics",
diff --git a/metropolis/node/core/roleserve/roleserve.go b/metropolis/node/core/roleserve/roleserve.go
index 3140ea8..718c394 100644
--- a/metropolis/node/core/roleserve/roleserve.go
+++ b/metropolis/node/core/roleserve/roleserve.go
@@ -50,7 +50,6 @@
 	"source.monogon.dev/metropolis/pkg/event/memory"
 	"source.monogon.dev/metropolis/pkg/logtree"
 	"source.monogon.dev/metropolis/pkg/supervisor"
-
 	cpb "source.monogon.dev/metropolis/proto/common"
 )
 
diff --git a/metropolis/node/core/roleserve/worker_rolefetch.go b/metropolis/node/core/roleserve/worker_rolefetch.go
index 40d2ffd..aaac076 100644
--- a/metropolis/node/core/roleserve/worker_rolefetch.go
+++ b/metropolis/node/core/roleserve/worker_rolefetch.go
@@ -2,10 +2,10 @@
 
 import (
 	"context"
-	"fmt"
 
 	"google.golang.org/protobuf/proto"
 
+	"source.monogon.dev/metropolis/node/core/curator/watcher"
 	"source.monogon.dev/metropolis/node/core/localstorage"
 	"source.monogon.dev/metropolis/pkg/event/memory"
 	"source.monogon.dev/metropolis/pkg/supervisor"
@@ -70,65 +70,48 @@
 	// Run networked part in a sub-runnable so that network errors don't cause us to
 	// retry the above and make us possibly trigger spurious restarts.
 	supervisor.Run(ctx, "watcher", func(ctx context.Context) error {
-		w := s.curatorConnection.Watch()
-		defer w.Close()
-		cc, err := w.Get(ctx)
+		cw := s.curatorConnection.Watch()
+		defer cw.Close()
+		cc, err := cw.Get(ctx)
 		if err != nil {
 			return err
 		}
 
 		nodeID := cc.nodeID()
 		cur := ipb.NewCuratorClient(cc.conn)
-
-		// Start watch for current node, update localRoles whenever we get something
-		// new.
-		srv, err := cur.Watch(ctx, &ipb.WatchRequest{Kind: &ipb.WatchRequest_NodeInCluster_{
-			NodeInCluster: &ipb.WatchRequest_NodeInCluster{
-				NodeId: nodeID,
-			},
-		}})
-		if err != nil {
-			return fmt.Errorf("watch failed: %w", err)
-		}
-		defer srv.CloseSend()
+		w := watcher.WatchNode(ctx, cur, nodeID)
+		defer w.Close()
 
 		supervisor.Signal(ctx, supervisor.SignalHealthy)
-		for {
-			ev, err := srv.Recv()
-			if err != nil {
-				return fmt.Errorf("watch event receive failed: %w", err)
-			}
-			for _, n := range ev.Nodes {
-				n := n
-				// Skip spuriously sent other nodes.
-				if n.Id != nodeID {
-					continue
-				}
-				supervisor.Logger(ctx).Infof("Got new node data. Roles:")
-				if n.Roles.ConsensusMember != nil {
-					supervisor.Logger(ctx).Infof(" - control plane member, existing peers: %+v", n.Roles.ConsensusMember.Peers)
-				}
-				if n.Roles.KubernetesController != nil {
-					supervisor.Logger(ctx).Infof(" - kubernetes controller")
-				}
-				if n.Roles.KubernetesWorker != nil {
-					supervisor.Logger(ctx).Infof(" - kubernetes worker")
-				}
-				s.localRoles.Set(n.Roles)
 
-				// Persist role data to disk.
-				bytes, err := proto.Marshal(n.Roles)
+		// Run watch for current node, update localRoles whenever we get something
+		// new.
+		for w.Next() {
+			n := w.Node()
+			supervisor.Logger(ctx).Infof("Got new node data. Roles:")
+			if n.Roles.ConsensusMember != nil {
+				supervisor.Logger(ctx).Infof(" - control plane member, existing peers: %+v", n.Roles.ConsensusMember.Peers)
+			}
+			if n.Roles.KubernetesController != nil {
+				supervisor.Logger(ctx).Infof(" - kubernetes controller")
+			}
+			if n.Roles.KubernetesWorker != nil {
+				supervisor.Logger(ctx).Infof(" - kubernetes worker")
+			}
+			s.localRoles.Set(n.Roles)
+
+			// Persist role data to disk.
+			bytes, err := proto.Marshal(n.Roles)
+			if err != nil {
+				supervisor.Logger(ctx).Errorf("Failed to marshal node roles: %w", err)
+			} else {
+				err = s.storageRoot.Data.Node.PersistedRoles.Write(bytes, 0400)
 				if err != nil {
-					supervisor.Logger(ctx).Errorf("Failed to marshal node roles: %w", err)
-				} else {
-					err = s.storageRoot.Data.Node.PersistedRoles.Write(bytes, 0400)
-					if err != nil {
-						supervisor.Logger(ctx).Errorf("Failed to write node roles: %w", err)
-					}
+					supervisor.Logger(ctx).Errorf("Failed to write node roles: %w", err)
 				}
-				break
 			}
 		}
+		return w.Error()
 	})
 
 	supervisor.Signal(ctx, supervisor.SignalHealthy)
diff --git a/metropolis/node/core/rpc/resolver/BUILD.bazel b/metropolis/node/core/rpc/resolver/BUILD.bazel
index 3a2e6cd..4acf31e 100644
--- a/metropolis/node/core/rpc/resolver/BUILD.bazel
+++ b/metropolis/node/core/rpc/resolver/BUILD.bazel
@@ -12,6 +12,7 @@
     deps = [
         "//metropolis/node",
         "//metropolis/node/core/curator/proto/api",
+        "//metropolis/node/core/curator/watcher",
         "//metropolis/proto/common",
         "@com_github_cenkalti_backoff_v4//:backoff",
         "@org_golang_google_grpc//:go_default_library",
diff --git a/metropolis/node/core/rpc/resolver/processor.go b/metropolis/node/core/rpc/resolver/processor.go
index 975b2df..40174bf 100644
--- a/metropolis/node/core/rpc/resolver/processor.go
+++ b/metropolis/node/core/rpc/resolver/processor.go
@@ -34,47 +34,11 @@
 	resC chan *curatorMap
 }
 
-type nodeStatusMap map[string]*cpb.NodeStatus
-
-func (n nodeStatusMap) equals(o nodeStatusMap) bool {
-	// Check that we have the same keys on both maps.
-	for k, _ := range n {
-		_, ok := o[k]
-		if !ok {
-			return false
-		}
-	}
-	for k, _ := range o {
-		_, ok := n[k]
-		if !ok {
-			return false
-		}
-	}
-	// Keys are equal, compare values.
-	for k, v1 := range n {
-		v2 := o[k]
-
-		cur1 := v1.RunningCurator != nil
-		cur2 := v2.RunningCurator != nil
-		if cur1 != cur2 {
-			return false
-		}
-		if v1.ExternalAddress != v2.ExternalAddress {
-			return false
-		}
-
-		if cur1 && cur2 && v1.RunningCurator.Port != v2.RunningCurator.Port {
-			return false
-		}
-	}
-	return true
-}
-
 // requestNodesUpdate is received from the curator updater, and carries
 // information about the current curators as seen by the cluster control plane.
 type requestNodesUpdate struct {
 	// nodes is a map from node ID to received status
-	nodes nodeStatusMap
+	nodes map[string]*cpb.NodeStatus
 }
 
 // requestSeedAdd is received from AddEndpoint calls. It updates the processor's
diff --git a/metropolis/node/core/rpc/resolver/resolver.go b/metropolis/node/core/rpc/resolver/resolver.go
index 36ad180..5b89f73 100644
--- a/metropolis/node/core/rpc/resolver/resolver.go
+++ b/metropolis/node/core/rpc/resolver/resolver.go
@@ -13,6 +13,8 @@
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/keepalive"
 
+	"source.monogon.dev/metropolis/node/core/curator/watcher"
+
 	common "source.monogon.dev/metropolis/node"
 	apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
 	cpb "source.monogon.dev/metropolis/proto/common"
@@ -223,66 +225,54 @@
 	defer cl.Close()
 
 	cur := apb.NewCuratorClient(cl)
-	prevCurators := make(nodeStatusMap)
 
 	return backoff.RetryNotify(func() error {
-		w, err := cur.Watch(ctx, &apb.WatchRequest{
-			Kind: &apb.WatchRequest_NodesInCluster_{
-				NodesInCluster: &apb.WatchRequest_NodesInCluster{},
-			},
-		})
-		if err != nil {
-			return fmt.Errorf("could not watch nodes: %v", err)
-		}
-
 		// Map from node ID to status.
 		nodes := make(map[string]*cpb.NodeStatus)
 
-		// Keep updating map from watcher.
-		for {
-			ev, err := w.Recv()
-			if err != nil {
-				return fmt.Errorf("when receiving node: %w", err)
-			}
-			bo.Reset()
-
-			// Update internal map but only care about curators.
-			for _, n := range ev.Nodes {
-				if n.Status == nil || n.Status.RunningCurator == nil {
-					delete(nodes, n.Id)
-					continue
+		return watcher.WatchNodes(ctx, cur, &watcher.SimpleFollower{
+			FilterFn: func(a *apb.Node) bool {
+				if a.Status == nil {
+					return false
 				}
-				nodes[n.Id] = n.Status
-			}
-			for _, n := range ev.NodeTombstones {
-				delete(nodes, n.NodeId)
-			}
-
-			// Make a copy to submit to client.
-			curators := make(nodeStatusMap)
-			var curatorNames []string
-			for k, v := range nodes {
-				if v == nil {
-					continue
+				if a.Status.ExternalAddress == "" {
+					return false
 				}
-				curators[k] = v
-				curatorNames = append(curatorNames, k)
-			}
-
-			// Only submit an update (and log) if the effective curator map actually changed.
-			if prevCurators.equals(curators) {
-				continue
-			}
-			prevCurators = curators
-
-			r.logger("CURUPDATE: got new curators: %s", strings.Join(curatorNames, ", "))
-
-			select {
-			case r.reqC <- &request{nu: &requestNodesUpdate{nodes: curators}}:
-			case <-ctx.Done():
-				return ctx.Err()
-			}
-		}
+				if a.Status.RunningCurator == nil {
+					return false
+				}
+				return true
+			},
+			EqualsFn: func(a *apb.Node, b *apb.Node) bool {
+				if a.Status.ExternalAddress != b.Status.ExternalAddress {
+					return false
+				}
+				if (a.Status.RunningCurator == nil) != (b.Status.RunningCurator == nil) {
+					return false
+				}
+				if a.Status.RunningCurator != nil && a.Status.RunningCurator.Port != b.Status.RunningCurator.Port {
+					return false
+				}
+				return true
+			},
+			OnNewUpdated: func(new *apb.Node) error {
+				nodes[new.Id] = new.Status
+				return nil
+			},
+			OnDeleted: func(prev *apb.Node) error {
+				delete(nodes, prev.Id)
+				return nil
+			},
+			OnBatchDone: func() error {
+				nodesClone := make(map[string]*cpb.NodeStatus)
+				for k, v := range nodes {
+					nodesClone[k] = v
+				}
+				select {
+				case r.reqC <- &request{nu: &requestNodesUpdate{nodes: nodesClone}}:
+				case <-ctx.Done():
+					return ctx.Err()
+				}
+				return nil
+			},
+		})
 	}, backoff.WithContext(bo, ctx), func(err error, t time.Duration) {
 		c := make(chan *responseDebug)
 		r.reqC <- &request{dbg: &requestDebug{resC: c}}
diff --git a/metropolis/node/kubernetes/BUILD.bazel b/metropolis/node/kubernetes/BUILD.bazel
index a88b4b3..8e68973 100644
--- a/metropolis/node/kubernetes/BUILD.bazel
+++ b/metropolis/node/kubernetes/BUILD.bazel
@@ -20,6 +20,7 @@
         "//metropolis/node",
         "//metropolis/node/core/clusternet",
         "//metropolis/node/core/curator/proto/api",
+        "//metropolis/node/core/curator/watcher",
         "//metropolis/node/core/identity",
         "//metropolis/node/core/localstorage",
         "//metropolis/node/core/network",
diff --git a/metropolis/node/kubernetes/apiproxy.go b/metropolis/node/kubernetes/apiproxy.go
index 9f9c851..d937824 100644
--- a/metropolis/node/kubernetes/apiproxy.go
+++ b/metropolis/node/kubernetes/apiproxy.go
@@ -2,12 +2,12 @@
 
 import (
 	"context"
-	"fmt"
 	"net"
 
 	"source.monogon.dev/go/net/tinylb"
 	"source.monogon.dev/metropolis/node"
 	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+	"source.monogon.dev/metropolis/node/core/curator/watcher"
 	"source.monogon.dev/metropolis/pkg/event/memory"
 )
 
@@ -15,38 +15,36 @@
 // the currently known nodes running a Kubernetes apiserver as retrieved from the
 // given curator client.
 func updateLoadbalancerAPIServers(ctx context.Context, val *memory.Value[tinylb.BackendSet], cur ipb.CuratorClient) error {
-	w, err := cur.Watch(ctx, &ipb.WatchRequest{
-		Kind: &ipb.WatchRequest_NodesInCluster_{
-			NodesInCluster: &ipb.WatchRequest_NodesInCluster{},
-		},
-	})
-	if err != nil {
-		return fmt.Errorf("watch failed: %w", err)
-	}
-	defer w.CloseSend()
-
 	set := &tinylb.BackendSet{}
 	val.Set(set.Clone())
-	for {
-		ev, err := w.Recv()
-		if err != nil {
-			return fmt.Errorf("receive failed: %w", err)
-		}
 
-		for _, n := range ev.Nodes {
-			if n.Status == nil || n.Status.ExternalAddress == "" {
-				set.Delete(n.Id)
-				continue
+	return watcher.WatchNodes(ctx, cur, watcher.SimpleFollower{
+		FilterFn: func(a *ipb.Node) bool {
+			if a.Status == nil {
+				return false
 			}
-			if n.Roles.KubernetesController == nil {
-				set.Delete(n.Id)
-				continue
+			if a.Status.ExternalAddress == "" {
+				return false
 			}
-			set.Insert(n.Id, &tinylb.SimpleTCPBackend{Remote: net.JoinHostPort(n.Status.ExternalAddress, node.KubernetesAPIPort.PortString())})
-		}
-		for _, t := range ev.NodeTombstones {
-			set.Delete(t.NodeId)
-		}
-		val.Set(set.Clone())
-	}
+			if a.Roles.KubernetesController == nil {
+				return false
+			}
+			return true
+		},
+		EqualsFn: func(a *ipb.Node, b *ipb.Node) bool {
+			return a.Status.ExternalAddress == b.Status.ExternalAddress
+		},
+		OnNewUpdated: func(new *ipb.Node) error {
+			set.Insert(new.Id, &tinylb.SimpleTCPBackend{
+				Remote: net.JoinHostPort(new.Status.ExternalAddress, node.KubernetesAPIPort.PortString()),
+			})
+			val.Set(set.Clone())
+			return nil
+		},
+		OnDeleted: func(prev *ipb.Node) error {
+			set.Delete(prev.Id)
+			val.Set(set.Clone())
+			return nil
+		},
+	})
 }