metropolis: move curator client watches to curator/watcher

This replaces all the ad-hoc code to watch Curator node(s) with calls
through the new curator/watcher library.

Change-Id: Ie2a82b330e4108b9b725515cb10595916c38b323
Reviewed-on: https://review.monogon.dev/c/monogon/+/2263
Tested-by: Jenkins CI
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/node/core/clusternet/BUILD.bazel b/metropolis/node/core/clusternet/BUILD.bazel
index a5fc41a..1ccce66 100644
--- a/metropolis/node/core/clusternet/BUILD.bazel
+++ b/metropolis/node/core/clusternet/BUILD.bazel
@@ -13,6 +13,7 @@
     deps = [
         "//metropolis/node",
         "//metropolis/node/core/curator/proto/api",
+        "//metropolis/node/core/curator/watcher",
         "//metropolis/node/core/localstorage",
         "//metropolis/node/core/network",
         "//metropolis/pkg/event",
@@ -27,10 +28,7 @@
 
 go_test(
     name = "clusternet_test",
-    srcs = [
-        "clusternet_test.go",
-        "types_test.go",
-    ],
+    srcs = ["clusternet_test.go"],
     embed = [":clusternet"],
     deps = [
         "//metropolis/node",
diff --git a/metropolis/node/core/clusternet/clusternet.go b/metropolis/node/core/clusternet/clusternet.go
index 2e01c77..85ac63f 100644
--- a/metropolis/node/core/clusternet/clusternet.go
+++ b/metropolis/node/core/clusternet/clusternet.go
@@ -26,9 +26,11 @@
 	"fmt"
 	"net"
 	"net/netip"
+	"slices"
 
 	"github.com/cenkalti/backoff/v4"
 
+	"source.monogon.dev/metropolis/node/core/curator/watcher"
 	"source.monogon.dev/metropolis/node/core/localstorage"
 	"source.monogon.dev/metropolis/node/core/network"
 	"source.monogon.dev/metropolis/pkg/event"
@@ -176,58 +178,59 @@
 func (s *Service) pull(ctx context.Context) error {
 	supervisor.Signal(ctx, supervisor.SignalHealthy)
 
-	srv, err := s.Curator.Watch(ctx, &apb.WatchRequest{
-		Kind: &apb.WatchRequest_NodesInCluster_{
-			NodesInCluster: &apb.WatchRequest_NodesInCluster{},
+	var batch []*apb.Node
+	return watcher.WatchNodes(ctx, s.Curator, watcher.SimpleFollower{
+		FilterFn: func(a *apb.Node) bool {
+			// Only care about nodes that have all required configuration set. This
+			// also keeps EqualsFn and the wireguard code from dereferencing a nil
+			// Status or Clusternet.
+			if a.Status == nil || a.Status.ExternalAddress == "" {
+				return false
+			}
+			if a.Clusternet == nil || a.Clusternet.WireguardPubkey == "" {
+				return false
+			}
+			return true
+		},
+		EqualsFn: func(a *apb.Node, b *apb.Node) bool {
+			if a.Status.ExternalAddress != b.Status.ExternalAddress {
+				return false
+			}
+			if a.Clusternet.WireguardPubkey != b.Clusternet.WireguardPubkey {
+				return false
+			}
+			// Compare prefixes by CIDR value, ignoring order. Prefixes holds pointers
+			// to messages, so comparing the slice elements directly would compare
+			// pointer identity rather than contents.
+			cidrs := func(n *apb.Node) []string {
+				var res []string
+				for _, p := range n.Clusternet.Prefixes {
+					res = append(res, p.Cidr)
+				}
+				slices.Sort(res)
+				return res
+			}
+			if !slices.Equal(cidrs(a), cidrs(b)) {
+				return false
+			}
+			return true
+		},
+		OnNewUpdated: func(new *apb.Node) error {
+			batch = append(batch, new)
+			return nil
+		},
+		OnBatchDone: func() error {
+			if err := s.wg.configurePeers(batch); err != nil {
+				supervisor.Logger(ctx).Errorf("nodes couldn't be configured: %v", err)
+			}
+			batch = nil
+			return nil
+		},
+		OnDeleted: func(prev *apb.Node) error {
+			if err := s.wg.unconfigurePeer(prev); err != nil {
+				supervisor.Logger(ctx).Errorf("Node %s couldn't be unconfigured: %v", prev.Id, err)
+			}
+			return nil
 		},
 	})
-	if err != nil {
-		return fmt.Errorf("curator watch failed: %w", err)
-	}
-	defer srv.CloseSend()
-
-	nodes := newNodemap()
-	for {
-		ev, err := srv.Recv()
-		if err != nil {
-			return fmt.Errorf("curator watch recv failed: %w", err)
-		}
-
-		updated, removed := nodes.update(ctx, ev)
-
-		for _, n := range removed {
-			supervisor.Logger(ctx).Infof("Node %s removed, unconfiguring", n.id)
-			if err := s.wg.unconfigurePeer(n.copy()); err != nil {
-				// Do nothing and hope whatever caused this will go away at some point.
-				supervisor.Logger(ctx).Errorf("Node %s couldn't be unconfigured: %v", n.id, err)
-			}
-		}
-		var newNodes []*node
-		for _, n := range updated {
-			newNodes = append(newNodes, n.copy())
-			supervisor.Logger(ctx).Infof("Node %s updated: pk %s, address %s, prefixes %v", n.id, n.pubkey, n.address, n.prefixes)
-		}
-		succeeded := 0
-		if err := s.wg.configurePeers(newNodes); err != nil {
-			// If configuring all nodes at once failed, go node-by-node to make sure we've
-			// done as much as possible.
-			supervisor.Logger(ctx).Warningf("Bulk node update call failed, trying node-by-node..: %v", err)
-			for _, n := range newNodes {
-				if err := s.wg.configurePeers([]*node{n}); err != nil {
-					supervisor.Logger(ctx).Errorf("Node %s failed: %v", n.id, err)
-				} else {
-					succeeded += 1
-				}
-			}
-		} else {
-			succeeded = len(newNodes)
-		}
-
-		if len(newNodes) != 0 {
-			supervisor.Logger(ctx).Infof("Successfully updated %d out of %d nodes", succeeded, len(newNodes))
-
-			numNodes, numPrefixes := nodes.stats()
-			supervisor.Logger(ctx).Infof("Total: %d nodes, %d prefixes.", numNodes, numPrefixes)
-		}
-	}
 }
diff --git a/metropolis/node/core/clusternet/clusternet_test.go b/metropolis/node/core/clusternet/clusternet_test.go
index bda1459..b92cfb2 100644
--- a/metropolis/node/core/clusternet/clusternet_test.go
+++ b/metropolis/node/core/clusternet/clusternet_test.go
@@ -4,7 +4,8 @@
 	"fmt"
 	"net"
 	"os"
-	"strings"
+	"slices"
+	"sort"
 	"sync"
 	"testing"
 	"time"
@@ -21,6 +22,7 @@
 	"source.monogon.dev/metropolis/test/util"
 
 	apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+	cpb "source.monogon.dev/metropolis/proto/common"
 )
 
 // fakeWireguard implements wireguard while keeping peer information internally.
@@ -28,7 +30,7 @@
 	k wgtypes.Key
 
 	muNodes        sync.Mutex
-	nodes          map[string]*node
+	nodes          map[string]*apb.Node
 	failNextUpdate bool
 }
 
@@ -40,27 +42,29 @@
 func (f *fakeWireguard) setup(clusterNet *net.IPNet) error {
 	f.muNodes.Lock()
 	defer f.muNodes.Unlock()
-	f.nodes = make(map[string]*node)
+	f.nodes = make(map[string]*apb.Node)
 	return nil
 }
 
-func (f *fakeWireguard) configurePeers(nodes []*node) error {
+func (f *fakeWireguard) configurePeers(nodes []*apb.Node) error {
 	f.muNodes.Lock()
 	defer f.muNodes.Unlock()
+
 	if f.failNextUpdate {
 		f.failNextUpdate = false
 		return fmt.Errorf("synthetic test failure")
 	}
+
 	for _, n := range nodes {
-		f.nodes[n.id] = n
+		f.nodes[n.Id] = n
 	}
 	return nil
 }
 
-func (f *fakeWireguard) unconfigurePeer(n *node) error {
+func (f *fakeWireguard) unconfigurePeer(node *apb.Node) error {
 	f.muNodes.Lock()
 	defer f.muNodes.Unlock()
-	delete(f.nodes, n.id)
+	delete(f.nodes, node.Id)
 	return nil
 }
 
@@ -105,7 +109,7 @@
 	}
 	supervisor.TestHarness(t, svc.Run)
 
-	checkState := func(nodes map[string]*node) error {
+	checkState := func(nodes map[string]*apb.Node) error {
 		t.Helper()
 		wg.muNodes.Lock()
 		defer wg.muNodes.Unlock()
@@ -114,16 +118,23 @@
 			if !ok {
 				return fmt.Errorf("node %q missing in programmed peers", nid)
 			}
-			if n2.pubkey != n.pubkey {
-				return fmt.Errorf("node %q pubkey mismatch: %q in programmed peers, %q wanted", nid, n2.pubkey, n.pubkey)
+			if got, want := n2.Clusternet.WireguardPubkey, n.Clusternet.WireguardPubkey; got != want {
+				return fmt.Errorf("node %q pubkey mismatch: %q in programmed peers, %q wanted", nid, got, want)
 			}
-			if n2.address != n.address {
-				return fmt.Errorf("node %q address mismatch: %q in programmed peers, %q wanted", nid, n2.address, n.address)
+			if got, want := n2.Status.ExternalAddress, n.Status.ExternalAddress; got != want {
+				return fmt.Errorf("node %q address mismatch: %q in programmed peers, %q wanted", nid, got, want)
 			}
-			p := strings.Join(n.prefixes, ",")
-			p2 := strings.Join(n2.prefixes, ",")
-			if p != p2 {
-				return fmt.Errorf("node %q prefixes mismatch: %v in programmed peers, %v wanted", nid, n2.prefixes, n.prefixes)
+			var p, p2 []string
+			for _, prefix := range n.Clusternet.Prefixes {
+				p = append(p, prefix.Cidr)
+			}
+			for _, prefix := range n2.Clusternet.Prefixes {
+				p2 = append(p2, prefix.Cidr)
+			}
+			sort.Strings(p)
+			sort.Strings(p2)
+			if !slices.Equal(p, p2) {
+				return fmt.Errorf("node %q prefixes mismatch: %v in programmed peers, %v wanted", nid, p2, p)
 			}
 		}
 		for nid, _ := range wg.nodes {
@@ -134,7 +145,7 @@
 		return nil
 	}
 
-	assertStateEventual := func(nodes map[string]*node) {
+	assertStateEventual := func(nodes map[string]*apb.Node) {
 		t.Helper()
 		deadline := time.Now().Add(5 * time.Second)
 		for {
@@ -152,34 +163,46 @@
 
 	// Start with a single node.
 	cur.NodeWithPrefixes(key1, "metropolis-fake-1", "1.2.3.4")
-	assertStateEventual(map[string]*node{
+	assertStateEventual(map[string]*apb.Node{
 		"metropolis-fake-1": {
-			pubkey:   key1.PublicKey().String(),
-			address:  "1.2.3.4",
-			prefixes: nil,
+			Status: &cpb.NodeStatus{
+				ExternalAddress: "1.2.3.4",
+			},
+			Clusternet: &cpb.NodeClusterNetworking{
+				WireguardPubkey: key1.PublicKey().String(),
+			},
 		},
 	})
 	// Change the node's peer address.
 	cur.NodeWithPrefixes(key1, "metropolis-fake-1", "1.2.3.5")
-	assertStateEventual(map[string]*node{
+	assertStateEventual(map[string]*apb.Node{
 		"metropolis-fake-1": {
-			pubkey:   key1.PublicKey().String(),
-			address:  "1.2.3.5",
-			prefixes: nil,
+			Status: &cpb.NodeStatus{
+				ExternalAddress: "1.2.3.5",
+			},
+			Clusternet: &cpb.NodeClusterNetworking{
+				WireguardPubkey: key1.PublicKey().String(),
+			},
 		},
 	})
 	// Add another node.
 	cur.NodeWithPrefixes(key2, "metropolis-fake-2", "1.2.3.6")
-	assertStateEventual(map[string]*node{
+	assertStateEventual(map[string]*apb.Node{
 		"metropolis-fake-1": {
-			pubkey:   key1.PublicKey().String(),
-			address:  "1.2.3.5",
-			prefixes: nil,
+			Status: &cpb.NodeStatus{
+				ExternalAddress: "1.2.3.5",
+			},
+			Clusternet: &cpb.NodeClusterNetworking{
+				WireguardPubkey: key1.PublicKey().String(),
+			},
 		},
 		"metropolis-fake-2": {
-			pubkey:   key2.PublicKey().String(),
-			address:  "1.2.3.6",
-			prefixes: nil,
+			Status: &cpb.NodeStatus{
+				ExternalAddress: "1.2.3.6",
+			},
+			Clusternet: &cpb.NodeClusterNetworking{
+				WireguardPubkey: key2.PublicKey().String(),
+			},
 		},
 	})
 	// Add some prefixes to both nodes, but fail the next configurePeers call.
@@ -188,30 +211,42 @@
 	wg.muNodes.Unlock()
 	cur.NodeWithPrefixes(key1, "metropolis-fake-1", "1.2.3.5", "10.100.10.0/24", "10.100.20.0/24")
 	cur.NodeWithPrefixes(key2, "metropolis-fake-2", "1.2.3.6", "10.100.30.0/24", "10.100.40.0/24")
-	assertStateEventual(map[string]*node{
+	assertStateEventual(map[string]*apb.Node{
 		"metropolis-fake-1": {
-			pubkey:  key1.PublicKey().String(),
-			address: "1.2.3.5",
-			prefixes: []string{
-				"10.100.10.0/24", "10.100.20.0/24",
+			Status: &cpb.NodeStatus{
+				ExternalAddress: "1.2.3.5",
+			},
+			Clusternet: &cpb.NodeClusterNetworking{
+				WireguardPubkey: key1.PublicKey().String(),
+				// No prefixes as the call failed.
 			},
 		},
 		"metropolis-fake-2": {
-			pubkey:  key2.PublicKey().String(),
-			address: "1.2.3.6",
-			prefixes: []string{
-				"10.100.30.0/24", "10.100.40.0/24",
+			Status: &cpb.NodeStatus{
+				ExternalAddress: "1.2.3.6",
+			},
+			Clusternet: &cpb.NodeClusterNetworking{
+				WireguardPubkey: key2.PublicKey().String(),
+				Prefixes: []*cpb.NodeClusterNetworking_Prefix{
+					{Cidr: "10.100.30.0/24"},
+					{Cidr: "10.100.40.0/24"},
+				},
 			},
 		},
 	})
 	// Delete one of the nodes.
 	cur.DeleteNode("metropolis-fake-1")
-	assertStateEventual(map[string]*node{
+	assertStateEventual(map[string]*apb.Node{
 		"metropolis-fake-2": {
-			pubkey:  key2.PublicKey().String(),
-			address: "1.2.3.6",
-			prefixes: []string{
-				"10.100.30.0/24", "10.100.40.0/24",
+			Status: &cpb.NodeStatus{
+				ExternalAddress: "1.2.3.6",
+			},
+			Clusternet: &cpb.NodeClusterNetworking{
+				WireguardPubkey: key2.PublicKey().String(),
+				Prefixes: []*cpb.NodeClusterNetworking_Prefix{
+					{Cidr: "10.100.30.0/24"},
+					{Cidr: "10.100.40.0/24"},
+				},
 			},
 		},
 	})
@@ -289,21 +324,31 @@
 	if err != nil {
 		t.Fatalf("Failed to generate private key: %v", err)
 	}
-	err = wg.configurePeers([]*node{
+	err = wg.configurePeers([]*apb.Node{
 		{
-			pubkey:  pkeys[0].PublicKey().String(),
-			address: "10.100.0.1",
-			prefixes: []string{
-				"10.0.0.0/24",
-				"10.0.1.0/24",
+			Id: "test-0",
+			Status: &cpb.NodeStatus{
+				ExternalAddress: "10.100.0.1",
+			},
+			Clusternet: &cpb.NodeClusterNetworking{
+				WireguardPubkey: pkeys[0].PublicKey().String(),
+				Prefixes: []*cpb.NodeClusterNetworking_Prefix{
+					{Cidr: "10.0.0.0/24"},
+					{Cidr: "10.0.1.0/24"},
+				},
 			},
 		},
 		{
-			pubkey:  pkeys[1].PublicKey().String(),
-			address: "10.100.1.1",
-			prefixes: []string{
-				"10.1.0.0/24",
-				"10.1.1.0/24",
+			Id: "test-1",
+			Status: &cpb.NodeStatus{
+				ExternalAddress: "10.100.1.1",
+			},
+			Clusternet: &cpb.NodeClusterNetworking{
+				WireguardPubkey: pkeys[1].PublicKey().String(),
+				Prefixes: []*cpb.NodeClusterNetworking_Prefix{
+					{Cidr: "10.1.0.0/24"},
+					{Cidr: "10.1.1.0/24"},
+				},
 			},
 		},
 	})
@@ -338,17 +383,22 @@
 	}
 
 	// Update one of the peers and check that things got applied.
-	err = wg.configurePeers([]*node{
+	err = wg.configurePeers([]*apb.Node{
 		{
-			pubkey:  pkeys[0].PublicKey().String(),
-			address: "10.100.0.3",
-			prefixes: []string{
-				"10.0.0.0/24",
+			Id: "test-0",
+			Status: &cpb.NodeStatus{
+				ExternalAddress: "10.100.0.3",
+			},
+			Clusternet: &cpb.NodeClusterNetworking{
+				WireguardPubkey: pkeys[0].PublicKey().String(),
+				Prefixes: []*cpb.NodeClusterNetworking_Prefix{
+					{Cidr: "10.0.0.0/24"},
+				},
 			},
 		},
 	})
 	if err != nil {
-		t.Fatalf("Failed to connect to netlink's WireGuard config endpoint: %v", err)
+		t.Fatalf("Failed to update peer: %v", err)
 	}
 	wgDev, err = wgClient.Device(clusterNetDeviceName)
 	if err != nil {
@@ -373,14 +423,18 @@
 	}
 
 	// Remove one of the peers and make sure it's gone.
-	err = wg.unconfigurePeer(&node{
-		pubkey: pkeys[0].PublicKey().String(),
+	err = wg.unconfigurePeer(&apb.Node{
+		Clusternet: &cpb.NodeClusterNetworking{
+			WireguardPubkey: pkeys[0].PublicKey().String(),
+		},
 	})
 	if err != nil {
 		t.Fatalf("Failed to unconfigure peer: %v", err)
 	}
-	err = wg.unconfigurePeer(&node{
-		pubkey: pkeys[0].PublicKey().String(),
+	err = wg.unconfigurePeer(&apb.Node{
+		Clusternet: &cpb.NodeClusterNetworking{
+			WireguardPubkey: pkeys[0].PublicKey().String(),
+		},
 	})
 	if err != nil {
 		t.Fatalf("Failed to unconfigure peer a second time: %v", err)
diff --git a/metropolis/node/core/clusternet/types.go b/metropolis/node/core/clusternet/types.go
index 088cf8e..67e401a 100644
--- a/metropolis/node/core/clusternet/types.go
+++ b/metropolis/node/core/clusternet/types.go
@@ -1,13 +1,10 @@
 package clusternet
 
 import (
-	"context"
 	"net/netip"
 	"sort"
 	"strings"
 
-	apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
-	"source.monogon.dev/metropolis/pkg/supervisor"
 	cpb "source.monogon.dev/metropolis/proto/common"
 )
 
@@ -59,131 +56,3 @@
 func (p *Prefixes) Equal(o *Prefixes) bool {
 	return p.String() == o.String()
 }
-
-// node is used for internal statekeeping in the cluster networking service.
-type node struct {
-	id       string
-	pubkey   string
-	address  string
-	prefixes []string
-}
-
-func (n *node) copy() *node {
-	n2 := *n
-	return &n2
-}
-
-// update mutates this node to whatever data is held in the given proto Node, and
-// returns true if any data changed.
-func (n *node) update(p *apb.Node) (changed bool) {
-	if n.address != p.Status.ExternalAddress {
-		n.address = p.Status.ExternalAddress
-		changed = true
-	}
-	if n.pubkey != p.Clusternet.WireguardPubkey {
-		n.pubkey = p.Clusternet.WireguardPubkey
-		changed = true
-	}
-
-	var newPrefixes []string
-	for _, prefix := range p.Clusternet.Prefixes {
-		if prefix.Cidr == "" {
-			continue
-		}
-		newPrefixes = append(newPrefixes, prefix.Cidr)
-	}
-	oldPrefixes := make([]string, len(n.prefixes))
-	copy(oldPrefixes[:], n.prefixes)
-
-	sort.Strings(newPrefixes)
-	sort.Strings(oldPrefixes)
-	if want, got := strings.Join(newPrefixes, ","), strings.Join(oldPrefixes, ","); want != got {
-		n.prefixes = newPrefixes
-		changed = true
-	}
-
-	return
-}
-
-// nodeMap is the main internal statekeeping structure of the pull sub-runnable.
-type nodeMap struct {
-	nodes map[string]*node
-}
-
-func newNodemap() *nodeMap {
-	return &nodeMap{
-		nodes: make(map[string]*node),
-	}
-}
-
-func (n *nodeMap) stats() (nodes int, prefixes int) {
-	nodes = len(n.nodes)
-
-	for _, node := range n.nodes {
-		prefixes += len(node.prefixes)
-	}
-
-	return
-}
-
-// update updates the nodeMap from the given Curator WatchEvent, interpreting
-// both node changes and deletions. Two nodeMaps are returned: the first one
-// contains only nodes that have been added/changed by the given event, the other
-// contains only nodes that have been deleted by the given event.
-func (m *nodeMap) update(ctx context.Context, ev *apb.WatchEvent) (changed, removed map[string]*node) {
-	changed = make(map[string]*node)
-	removed = make(map[string]*node)
-
-	// Make sure we're not getting multiple nodes with the same public key. This is
-	// not expected to happen in practice as the Curator should prevent this from
-	// happening, but we at least want to make sure we're not blowing up routing if
-	// other defenses fail.
-
-	// pkeys maps from public key to node ID.
-	pkeys := make(map[string]string)
-	for _, n := range m.nodes {
-		// We don't have to check whether we have any collisions already in m.nodes, as
-		// the check below prevents them from happening in the first place.
-		pkeys[n.pubkey] = n.id
-	}
-
-	for _, n := range ev.Nodes {
-		// Only care about nodes that have all required configuration set.
-		if n.Status == nil || n.Status.ExternalAddress == "" || n.Clusternet == nil || n.Clusternet.WireguardPubkey == "" {
-			// We could attempt to delete any matching node currently in this nodemap at this
-			// point, but this is likely transient and we don't want to just kill routing for
-			// no reason.
-			continue
-		}
-
-		key := n.Clusternet.WireguardPubkey
-		if id, ok := pkeys[key]; ok && id != n.Id {
-			supervisor.Logger(ctx).Warningf("Nodes %q and %q share wireguard key %q. That should not have happened.", n.Id, id, key)
-			continue
-		}
-
-		if _, ok := m.nodes[n.Id]; !ok {
-			m.nodes[n.Id] = &node{
-				id: n.Id,
-			}
-		}
-		diff := m.nodes[n.Id].update(n)
-		if diff {
-			changed[n.Id] = m.nodes[n.Id]
-		}
-	}
-
-	for _, t := range ev.NodeTombstones {
-		n, ok := m.nodes[t.NodeId]
-		if !ok {
-			// This is an indication of us losing data somehow. If this happens, it likely
-			// means a Curator bug.
-			supervisor.Logger(ctx).Warningf("Node %s: tombstone for unknown node", t.NodeId)
-			continue
-		}
-		removed[n.id] = n
-		delete(m.nodes, n.id)
-	}
-
-	return
-}
diff --git a/metropolis/node/core/clusternet/types_test.go b/metropolis/node/core/clusternet/types_test.go
deleted file mode 100644
index 8f58b4e..0000000
--- a/metropolis/node/core/clusternet/types_test.go
+++ /dev/null
@@ -1,104 +0,0 @@
-package clusternet
-
-import (
-	"testing"
-
-	apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
-	cpb "source.monogon.dev/metropolis/proto/common"
-)
-
-func TestNodeUpdate(t *testing.T) {
-	var n node
-
-	for i, te := range []struct {
-		p    *apb.Node
-		want bool
-	}{
-		// Case 0: empty incoming node, no change.
-		{
-			p:    &apb.Node{Status: &cpb.NodeStatus{}, Clusternet: &cpb.NodeClusterNetworking{}},
-			want: false,
-		},
-		// Case 1: wireguard key set.
-		{
-			p:    &apb.Node{Status: &cpb.NodeStatus{}, Clusternet: &cpb.NodeClusterNetworking{WireguardPubkey: "fake"}},
-			want: true,
-		},
-		// Case 2: wireguard key updated.
-		{
-			p:    &apb.Node{Status: &cpb.NodeStatus{}, Clusternet: &cpb.NodeClusterNetworking{WireguardPubkey: "fake2"}},
-			want: true,
-		},
-		// Case 3: external address added.
-		{
-			p:    &apb.Node{Status: &cpb.NodeStatus{ExternalAddress: "1.2.3.4"}, Clusternet: &cpb.NodeClusterNetworking{WireguardPubkey: "fake2"}},
-			want: true,
-		},
-		// Case 4: external address changed.
-		{
-			p:    &apb.Node{Status: &cpb.NodeStatus{ExternalAddress: "1.2.3.5"}, Clusternet: &cpb.NodeClusterNetworking{WireguardPubkey: "fake2"}},
-			want: true,
-		},
-		// Case 5: prefixes added
-		{
-			p: &apb.Node{
-				Status: &cpb.NodeStatus{ExternalAddress: "1.2.3.5"},
-				Clusternet: &cpb.NodeClusterNetworking{
-					WireguardPubkey: "fake2",
-					Prefixes: []*cpb.NodeClusterNetworking_Prefix{
-						{Cidr: "10.0.2.0/24"},
-						{Cidr: "10.0.1.0/24"},
-					},
-				},
-			},
-			want: true,
-		},
-		// Case 6: prefixes changed
-		{
-			p: &apb.Node{
-				Status: &cpb.NodeStatus{ExternalAddress: "1.2.3.5"},
-				Clusternet: &cpb.NodeClusterNetworking{
-					WireguardPubkey: "fake2",
-					Prefixes: []*cpb.NodeClusterNetworking_Prefix{
-						{Cidr: "10.0.3.0/24"},
-						{Cidr: "10.0.1.0/24"},
-					},
-				},
-			},
-			want: true,
-		},
-		// Case 6: prefixes reordered (no change expected)
-		{
-			p: &apb.Node{
-				Status: &cpb.NodeStatus{ExternalAddress: "1.2.3.5"},
-				Clusternet: &cpb.NodeClusterNetworking{
-					WireguardPubkey: "fake2",
-					Prefixes: []*cpb.NodeClusterNetworking_Prefix{
-						{Cidr: "10.0.3.0/24"},
-						{Cidr: "10.0.1.0/24"},
-					},
-				},
-			},
-			want: false,
-		},
-		// Case 6: prefixes removed
-		{
-			p: &apb.Node{
-				Status: &cpb.NodeStatus{ExternalAddress: "1.2.3.5"},
-				Clusternet: &cpb.NodeClusterNetworking{
-					WireguardPubkey: "fake2",
-					Prefixes:        []*cpb.NodeClusterNetworking_Prefix{},
-				},
-			},
-			want: true,
-		},
-	} {
-		got := n.update(te.p)
-		if te.want && !got {
-			t.Fatalf("Case %d: expected change, got no change", i)
-		}
-		if !te.want && got {
-			t.Fatalf("Case %d: expected no change, got change", i)
-		}
-	}
-}
diff --git a/metropolis/node/core/clusternet/wireguard.go b/metropolis/node/core/clusternet/wireguard.go
index 9ce6d49..4c38c79 100644
--- a/metropolis/node/core/clusternet/wireguard.go
+++ b/metropolis/node/core/clusternet/wireguard.go
@@ -10,6 +10,7 @@
 	"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
 
 	common "source.monogon.dev/metropolis/node"
+	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
 	"source.monogon.dev/metropolis/node/core/localstorage"
 )
 
@@ -28,8 +29,8 @@
 type wireguard interface {
 	ensureOnDiskKey(dir *localstorage.DataKubernetesClusterNetworkingDirectory) error
 	setup(clusterNet *net.IPNet) error
-	configurePeers(n []*node) error
-	unconfigurePeer(n *node) error
+	configurePeers(nodes []*ipb.Node) error
+	unconfigurePeer(n *ipb.Node) error
 	key() wgtypes.Key
 	close()
 }
@@ -114,31 +115,26 @@
 	return nil
 }
 
-// configurePeers creates or updates a peers on the local wireguard interface
+// configurePeers creates or updates peers on the local wireguard interface
 // based on the given nodes.
-//
-// If any node is somehow invalid and causes a parse/reconfiguration error, the
-// function will return an error. The caller should retry with a different set of
-// nodes, performing search/bisection on its own.
-func (s *localWireguard) configurePeers(nodes []*node) error {
+func (s *localWireguard) configurePeers(nodes []*ipb.Node) error {
 	var configs []wgtypes.PeerConfig
-
 	for i, n := range nodes {
-		if s.privKey.PublicKey().String() == n.pubkey {
+		if s.privKey.PublicKey().String() == n.Clusternet.WireguardPubkey {
 			// Node doesn't need to connect to itself
 			continue
 		}
-		pubkeyParsed, err := wgtypes.ParseKey(n.pubkey)
+		pubkeyParsed, err := wgtypes.ParseKey(n.Clusternet.WireguardPubkey)
 		if err != nil {
-			return fmt.Errorf("node %d: failed to parse public-key %q: %w", i, n.pubkey, err)
+			return fmt.Errorf("node %d: failed to parse public-key %q: %w", i, n.Clusternet.WireguardPubkey, err)
 		}
-		addressParsed := net.ParseIP(n.address)
+		addressParsed := net.ParseIP(n.Status.ExternalAddress)
 		if addressParsed == nil {
-			return fmt.Errorf("node %d: failed to parse address %q: %w", i, n.address, err)
+			return fmt.Errorf("node %d: failed to parse address %q", i, n.Status.ExternalAddress)
 		}
 		var allowedIPs []net.IPNet
-		for _, prefix := range n.prefixes {
-			_, podNet, err := net.ParseCIDR(prefix)
+		for _, prefix := range n.Clusternet.Prefixes {
+			_, podNet, err := net.ParseCIDR(prefix.Cidr)
 			if err != nil {
 				// Just eat the parse error. Not much we can do here. We have enough validation
 				// in the rest of the system that we shouldn't ever reach this.
@@ -167,10 +163,10 @@
 // unconfigurePeer removes the peer from the local WireGuard interface based on
 // the given node. If no peer existed matching the given node, this operation is
 // a no-op.
-func (s *localWireguard) unconfigurePeer(n *node) error {
-	pubkeyParsed, err := wgtypes.ParseKey(n.pubkey)
+func (s *localWireguard) unconfigurePeer(n *ipb.Node) error {
+	pubkeyParsed, err := wgtypes.ParseKey(n.Clusternet.WireguardPubkey)
 	if err != nil {
-		return fmt.Errorf("failed to parse public-key %q: %w", n.pubkey, err)
+		return fmt.Errorf("failed to parse public-key %q: %w", n.Clusternet.WireguardPubkey, err)
 	}
 
 	err = s.wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{