m/n/core/clusternet: init

This implements the new cluster networking daemon. For now this is just
the daemon itself with some tests; nothing uses it yet.
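
A rough sketch of how a future consumer is expected to wire the service
up (the curator connection, cluster network and localstorage root below
are placeholders, not part of this change):

    var podNetwork memory.Value[*clusternet.Prefixes]
    svc := clusternet.Service{
        Curator:                   apb.NewCuratorClient(curatorConn),
        ClusterNet:                clusterNet,
        DataDirectory:             &root.Data.Kubernetes.ClusterNetworking,
        LocalKubernetesPodNetwork: &podNetwork,
    }
    // The Kubernetes service Sets podNetwork once the local IPAM range is known.
    if err := supervisor.Run(ctx, "clusternet", svc.Run); err != nil {
        return err
    }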

Change-Id: Ida34b647db0d075fcaaf2d57c9a8a14701713552
Reviewed-on: https://review.monogon.dev/c/monogon/+/1416
Tested-by: Jenkins CI
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/node/BUILD.bazel b/metropolis/node/BUILD.bazel
index 0904ec1..b03b107 100644
--- a/metropolis/node/BUILD.bazel
+++ b/metropolis/node/BUILD.bazel
@@ -9,10 +9,12 @@
     name = "node",
     srcs = [
         "ids.go",
+        "net_protocols.go",
         "ports.go",
     ],
     importpath = "source.monogon.dev/metropolis/node",
     visibility = ["//metropolis:__subpackages__"],
+    deps = ["@com_github_vishvananda_netlink//:netlink"],
 )
 
 # debug_build checks if we're building in debug mode and enables various debug features for the image.
diff --git a/metropolis/node/core/clusternet/BUILD.bazel b/metropolis/node/core/clusternet/BUILD.bazel
new file mode 100644
index 0000000..c577d96
--- /dev/null
+++ b/metropolis/node/core/clusternet/BUILD.bazel
@@ -0,0 +1,53 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+load("//metropolis/test/ktest:ktest.bzl", "ktest")
+
+go_library(
+    name = "clusternet",
+    srcs = [
+        "clusternet.go",
+        "types.go",
+        "wireguard.go",
+    ],
+    importpath = "source.monogon.dev/metropolis/node/core/clusternet",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//metropolis/node",
+        "//metropolis/node/core/curator/proto/api",
+        "//metropolis/node/core/localstorage",
+        "//metropolis/pkg/event",
+        "//metropolis/pkg/supervisor",
+        "//metropolis/proto/common",
+        "@com_github_cenkalti_backoff_v4//:backoff",
+        "@com_github_vishvananda_netlink//:netlink",
+        "@com_zx2c4_golang_wireguard_wgctrl//:wgctrl",
+        "@com_zx2c4_golang_wireguard_wgctrl//wgtypes",
+    ],
+)
+
+go_test(
+    name = "clusternet_test",
+    srcs = [
+        "clusternet_test.go",
+        "types_test.go",
+    ],
+    embed = [":clusternet"],
+    deps = [
+        "//metropolis/node",
+        "//metropolis/node/core/curator/proto/api",
+        "//metropolis/node/core/localstorage",
+        "//metropolis/node/core/localstorage/declarative",
+        "//metropolis/pkg/event/memory",
+        "//metropolis/pkg/supervisor",
+        "//metropolis/proto/common",
+        "@com_zx2c4_golang_wireguard_wgctrl//:wgctrl",
+        "@com_zx2c4_golang_wireguard_wgctrl//wgtypes",
+        "@org_golang_google_grpc//:go_default_library",
+        "@org_golang_google_grpc//credentials/insecure",
+        "@org_golang_google_grpc//test/bufconn",
+    ],
+)
+
+ktest(
+    cmdline = "ramdisk_size=128",
+    tester = ":clusternet_test",
+)
diff --git a/metropolis/node/core/clusternet/clusternet.go b/metropolis/node/core/clusternet/clusternet.go
new file mode 100644
index 0000000..c4b4a8e
--- /dev/null
+++ b/metropolis/node/core/clusternet/clusternet.go
@@ -0,0 +1,174 @@
+// Package clusternet implements a Cluster Networking mesh service running on all
+// Metropolis nodes.
+//
+// The mesh is based on wireguard and a centralized configuration store in the
+// cluster Curator (in etcd).
+//
+// While the implementation is nearly generic, it currently assumes that it is
+// used only for Kubernetes pod networking. That has a few implications:
+//
+// First, we only have a single real route on the host into the wireguard
+// networking mesh / interface, and that is configured ahead of time in the
+// Service as ClusterNet. All destination addresses that should be carried by the
+// mesh must thus be part of this single route. Otherwise, traffic will be able
+// to flow into the node from other nodes, but will exit through another
+// interface. This is used in practice to allow other host nodes (whose external
+// addresses are outside the cluster network) to access the cluster network.
+//
+// Second, we only have a single source/owner of prefixes per node: the
+// Kubernetes service. This is reflected as the LocalKubernetesPodNetwork event
+// Value in Service.
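+//
+// For example (the prefix below is purely illustrative): with ClusterNet set to
+// 10.192.0.0/11, the service installs a single route for 10.192.0.0/11 via the
+// clusternet WireGuard interface, and the prefixes announced by each node become
+// that peer's WireGuard allowed-ips.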
+package clusternet
+
+import (
+	"context"
+	"fmt"
+	"net"
+
+	"github.com/cenkalti/backoff/v4"
+
+	apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+	"source.monogon.dev/metropolis/node/core/localstorage"
+	"source.monogon.dev/metropolis/pkg/event"
+	"source.monogon.dev/metropolis/pkg/supervisor"
+	cpb "source.monogon.dev/metropolis/proto/common"
+)
+
+// Service implements the Cluster Networking Mesh. See package-level docs for
+// more details.
+type Service struct {
+	// Curator is the gRPC client that the service will use to reach the cluster's
+	// Curator, for pushing locally announced prefixes and pulling information about
+	// other nodes.
+	Curator apb.CuratorClient
+	// ClusterNet is the prefix that will be programmed to exit through the wireguard
+	// mesh.
+	ClusterNet net.IPNet
+	// DataDirectory is where the WireGuard key of this node will be stored.
+	DataDirectory *localstorage.DataKubernetesClusterNetworkingDirectory
+	// LocalKubernetesPodNetwork is an event.Value watched for prefixes that should
+	// be announced into the mesh. This is to be Set by the Kubernetes service once
+	// it knows about the local node's IPAM address assignment.
+	LocalKubernetesPodNetwork event.Value[*Prefixes]
+
+	// wg is the interface to all the low-level interactions with WireGuard (and
+	// kernel routing). If not set, this defaults to a production implementation.
+	// This can be overridden by tests with a test implementation instead.
+	wg wireguard
+}
+
+// Run runs the Service. It must be used as a supervisor Runnable.
+func (s *Service) Run(ctx context.Context) error {
+	if s.wg == nil {
+		s.wg = &localWireguard{}
+	}
+	if err := s.wg.ensureOnDiskKey(s.DataDirectory); err != nil {
+		return fmt.Errorf("could not ensure wireguard key: %w", err)
+	}
+	if err := s.wg.setup(&s.ClusterNet); err != nil {
+		return fmt.Errorf("could not setup wireguard: %w", err)
+	}
+
+	supervisor.Logger(ctx).Infof("Wireguard setup complete, starting updaters...")
+
+	if err := supervisor.Run(ctx, "pusher", s.push); err != nil {
+		return err
+	}
+	if err := supervisor.Run(ctx, "puller", s.pull); err != nil {
+		return err
+	}
+	supervisor.Signal(ctx, supervisor.SignalHealthy)
+	<-ctx.Done()
+	return ctx.Err()
+}
+
+// push is the sub-runnable responsible for letting the Curator know which
+// prefixes are originated by this node.
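+//
+// Whenever the locally announced prefixes change, the full NodeClusterNetworking
+// message (wireguard public key plus prefixes) is resubmitted, retrying with
+// exponential backoff until the Curator accepts it.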
+func (s *Service) push(ctx context.Context) error {
+	supervisor.Signal(ctx, supervisor.SignalHealthy)
+
+	w := s.LocalKubernetesPodNetwork.Watch()
+	defer w.Close()
+
+	for {
+		// We only submit our wireguard key and prefixes when we're actually ready to
+		// announce something.
+		k8sPrefixes, err := w.Get(ctx)
+		if err != nil {
+			return fmt.Errorf("couldn't get k8s prefixes: %w", err)
+		}
+
+		err = backoff.Retry(func() error {
+			_, err := s.Curator.UpdateNodeClusterNetworking(ctx, &apb.UpdateNodeClusterNetworkingRequest{
+				Clusternet: &cpb.NodeClusterNetworking{
+					WireguardPubkey: s.wg.key().PublicKey().String(),
+					Prefixes:        k8sPrefixes.proto(),
+				},
+			})
+			if err != nil {
+				supervisor.Logger(ctx).Warningf("Could not submit cluster networking update: %v", err)
+			}
+			return err
+		}, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
+		if err != nil {
+			return fmt.Errorf("couldn't update curator: %w", err)
+		}
+	}
+}
+
+// pull is the sub-runnable responsible for fetching information about the
+// cluster networking setup/status of other nodes, and programming it as
+// WireGuard peers.
+func (s *Service) pull(ctx context.Context) error {
+	supervisor.Signal(ctx, supervisor.SignalHealthy)
+
+	srv, err := s.Curator.Watch(ctx, &apb.WatchRequest{
+		Kind: &apb.WatchRequest_NodesInCluster_{
+			NodesInCluster: &apb.WatchRequest_NodesInCluster{},
+		},
+	})
+	if err != nil {
+		return fmt.Errorf("curator watch failed: %w", err)
+	}
+	defer srv.CloseSend()
+
+	nodes := newNodemap()
+	for {
+		ev, err := srv.Recv()
+		if err != nil {
+			return fmt.Errorf("curator watch recv failed: %w", err)
+		}
+
+		updated, removed := nodes.update(ctx, ev)
+
+		for _, n := range removed {
+			supervisor.Logger(ctx).Infof("Node %s removed, unconfiguring", n.id)
+			if err := s.wg.unconfigurePeer(n); err != nil {
+				// Do nothing and hope whatever caused this will go away at some point.
+				supervisor.Logger(ctx).Errorf("Node %s couldn't be unconfigured: %v", n.id, err)
+			}
+		}
+		var newNodes []*node
+		for _, n := range updated {
+			newNodes = append(newNodes, n)
+			supervisor.Logger(ctx).Infof("Node %s updated: pk %s, address %s, prefixes %v", n.id, n.pubkey, n.address, n.prefixes)
+		}
+		succeeded := 0
+		if err := s.wg.configurePeers(newNodes); err != nil {
+			// If configuring all nodes at once failed, go node-by-node to make sure we've
+			// done as much as possible.
+			supervisor.Logger(ctx).Warningf("Bulk node update call failed, trying node-by-node..: %v", err)
+			for _, n := range newNodes {
+				if err := s.wg.configurePeers([]*node{n}); err != nil {
+					supervisor.Logger(ctx).Errorf("Node %s failed: %v", n.id, err)
+				} else {
+					succeeded += 1
+				}
+			}
+		} else {
+			succeeded = len(newNodes)
+		}
+		supervisor.Logger(ctx).Infof("Successfully updated %d out of %d nodes", succeeded, len(newNodes))
+	}
+}
diff --git a/metropolis/node/core/clusternet/clusternet_test.go b/metropolis/node/core/clusternet/clusternet_test.go
new file mode 100644
index 0000000..0c70507
--- /dev/null
+++ b/metropolis/node/core/clusternet/clusternet_test.go
@@ -0,0 +1,485 @@
+package clusternet
+
+import (
+	"context"
+	"fmt"
+	"net"
+	"os"
+	"strings"
+	"sync"
+	"testing"
+	"time"
+
+	"golang.zx2c4.com/wireguard/wgctrl"
+	"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
+	"google.golang.org/grpc/test/bufconn"
+
+	common "source.monogon.dev/metropolis/node"
+	"source.monogon.dev/metropolis/node/core/localstorage"
+	"source.monogon.dev/metropolis/node/core/localstorage/declarative"
+	"source.monogon.dev/metropolis/pkg/event/memory"
+	"source.monogon.dev/metropolis/pkg/supervisor"
+
+	apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+	cpb "source.monogon.dev/metropolis/proto/common"
+)
+
+// testCurator is a shim Curator implementation that serves pending Watch
+// requests based on data submitted to a channel.
+type testCurator struct {
+	apb.UnimplementedCuratorServer
+
+	watchC    chan *apb.WatchEvent
+	updateReq memory.Value[*apb.UpdateNodeClusterNetworkingRequest]
+}
+
+// Watch implements a minimal Watch that streams events submitted to watchC.
+func (t *testCurator) Watch(_ *apb.WatchRequest, srv apb.Curator_WatchServer) error {
+	ctx := srv.Context()
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case ev := <-t.watchC:
+			if err := srv.Send(ev); err != nil {
+				return err
+			}
+		}
+	}
+}
+
+func (t *testCurator) UpdateNodeClusterNetworking(ctx context.Context, req *apb.UpdateNodeClusterNetworkingRequest) (*apb.UpdateNodeClusterNetworkingResponse, error) {
+	t.updateReq.Set(req)
+	return &apb.UpdateNodeClusterNetworkingResponse{}, nil
+}
+
+// nodeWithPrefixes submits a given node/key/address with prefixes to the Watch
+// event channel.
+func (t *testCurator) nodeWithPrefixes(key wgtypes.Key, id, address string, prefixes ...string) {
+	var p []*cpb.NodeClusterNetworking_Prefix
+	for _, prefix := range prefixes {
+		p = append(p, &cpb.NodeClusterNetworking_Prefix{Cidr: prefix})
+	}
+	n := &apb.Node{
+		Id: id,
+		Status: &cpb.NodeStatus{
+			ExternalAddress: address,
+		},
+		Clusternet: &cpb.NodeClusterNetworking{
+			WireguardPubkey: key.PublicKey().String(),
+			Prefixes:        p,
+		},
+	}
+	t.watchC <- &apb.WatchEvent{
+		Nodes: []*apb.Node{
+			n,
+		},
+	}
+}
+
+// deleteNode submits a given node for deletion to the Watch event channel.
+func (t *testCurator) deleteNode(id string) {
+	t.watchC <- &apb.WatchEvent{
+		NodeTombstones: []*apb.WatchEvent_NodeTombstone{
+			{
+				NodeId: id,
+			},
+		},
+	}
+}
+
+// makeTestCurator returns a working testCurator alongside a grpc connection to
+// it.
+func makeTestCurator(t *testing.T) (*testCurator, *grpc.ClientConn) {
+	cur := &testCurator{
+		watchC: make(chan *apb.WatchEvent),
+	}
+
+	srv := grpc.NewServer()
+	apb.RegisterCuratorServer(srv, cur)
+	externalLis := bufconn.Listen(1024 * 1024)
+	go func() {
+		if err := srv.Serve(externalLis); err != nil {
+			// Errorf, not Fatalf: Fatalf must not be called from a non-test goroutine.
+			t.Errorf("GRPC serve failed: %v", err)
+		}
+	}()
+	withLocalDialer := grpc.WithContextDialer(func(_ context.Context, _ string) (net.Conn, error) {
+		return externalLis.Dial()
+	})
+	cl, err := grpc.Dial("local", withLocalDialer, grpc.WithTransportCredentials(insecure.NewCredentials()))
+	if err != nil {
+		t.Fatalf("Dialing GRPC failed: %v", err)
+	}
+
+	return cur, cl
+}
+
+// fakeWireguard implements wireguard while keeping peer information internally.
+type fakeWireguard struct {
+	k wgtypes.Key
+
+	muNodes        sync.Mutex
+	nodes          map[string]*node
+	failNextUpdate bool
+}
+
+func (f *fakeWireguard) ensureOnDiskKey(_ *localstorage.DataKubernetesClusterNetworkingDirectory) error {
+	f.k, _ = wgtypes.GeneratePrivateKey()
+	return nil
+}
+
+func (f *fakeWireguard) setup(clusterNet *net.IPNet) error {
+	f.muNodes.Lock()
+	defer f.muNodes.Unlock()
+	f.nodes = make(map[string]*node)
+	return nil
+}
+
+func (f *fakeWireguard) configurePeers(nodes []*node) error {
+	f.muNodes.Lock()
+	defer f.muNodes.Unlock()
+	if f.failNextUpdate {
+		f.failNextUpdate = false
+		return fmt.Errorf("synthetic test failure")
+	}
+	for _, n := range nodes {
+		f.nodes[n.id] = n
+	}
+	return nil
+}
+
+func (f *fakeWireguard) unconfigurePeer(n *node) error {
+	f.muNodes.Lock()
+	defer f.muNodes.Unlock()
+	delete(f.nodes, n.id)
+	return nil
+}
+
+func (f *fakeWireguard) key() wgtypes.Key {
+	return f.k
+}
+
+func (f *fakeWireguard) close() {
+}
+
+// TestClusternetBasic exercises clusternet with a fake curator and fake
+// wireguard, trying to cover as many edge cases as possible.
+func TestClusternetBasic(t *testing.T) {
+	key1, err := wgtypes.GeneratePrivateKey()
+	if err != nil {
+		t.Fatalf("Failed to generate private key: %v", err)
+	}
+	key2, err := wgtypes.GeneratePrivateKey()
+	if err != nil {
+		t.Fatalf("Failed to generate private key: %v", err)
+	}
+
+	cur, cl := makeTestCurator(t)
+	defer cl.Close()
+	curator := apb.NewCuratorClient(cl)
+
+	var podNetwork memory.Value[*Prefixes]
+	wg := &fakeWireguard{}
+	svc := Service{
+		Curator: curator,
+		ClusterNet: net.IPNet{
+			IP:   net.IP([]byte{10, 10, 0, 0}),
+			Mask: net.IPv4Mask(255, 255, 0, 0),
+		},
+		DataDirectory:             nil,
+		LocalKubernetesPodNetwork: &podNetwork,
+
+		wg: wg,
+	}
+	supervisor.TestHarness(t, svc.Run)
+
+	checkState := func(nodes map[string]*node) error {
+		t.Helper()
+		wg.muNodes.Lock()
+		defer wg.muNodes.Unlock()
+		for nid, n := range nodes {
+			n2, ok := wg.nodes[nid]
+			if !ok {
+				return fmt.Errorf("node %q missing in programmed peers", nid)
+			}
+			if n2.pubkey != n.pubkey {
+				return fmt.Errorf("node %q pubkey mismatch: %q in programmed peers, %q wanted", nid, n2.pubkey, n.pubkey)
+			}
+			if n2.address != n.address {
+				return fmt.Errorf("node %q address mismatch: %q in programmed peers, %q wanted", nid, n2.address, n.address)
+			}
+			p := strings.Join(n.prefixes, ",")
+			p2 := strings.Join(n2.prefixes, ",")
+			if p != p2 {
+				return fmt.Errorf("node %q prefixes mismatch: %v in programmed peers, %v wanted", nid, n2.prefixes, n.prefixes)
+			}
+		}
+		for nid := range wg.nodes {
+			if _, ok := nodes[nid]; !ok {
+				return fmt.Errorf("node %q unexpectedly present in programmed peers", nid)
+			}
+		}
+		return nil
+	}
+
+	assertStateEventual := func(nodes map[string]*node) {
+		t.Helper()
+		deadline := time.Now().Add(5 * time.Second)
+		for {
+			err := checkState(nodes)
+			if err == nil {
+				break
+			}
+			if time.Now().After(deadline) {
+				t.Error(err)
+				return
+			}
+			// Don't spin hot while waiting for the state to converge.
+			time.Sleep(10 * time.Millisecond)
+		}
+	}
+
+	// Start with a single node.
+	cur.nodeWithPrefixes(key1, "metropolis-fake-1", "1.2.3.4")
+	assertStateEventual(map[string]*node{
+		"metropolis-fake-1": {
+			pubkey:   key1.PublicKey().String(),
+			address:  "1.2.3.4",
+			prefixes: nil,
+		},
+	})
+	// Change the node's peer address.
+	cur.nodeWithPrefixes(key1, "metropolis-fake-1", "1.2.3.5")
+	assertStateEventual(map[string]*node{
+		"metropolis-fake-1": {
+			pubkey:   key1.PublicKey().String(),
+			address:  "1.2.3.5",
+			prefixes: nil,
+		},
+	})
+	// Add another node.
+	cur.nodeWithPrefixes(key2, "metropolis-fake-2", "1.2.3.6")
+	assertStateEventual(map[string]*node{
+		"metropolis-fake-1": {
+			pubkey:   key1.PublicKey().String(),
+			address:  "1.2.3.5",
+			prefixes: nil,
+		},
+		"metropolis-fake-2": {
+			pubkey:   key2.PublicKey().String(),
+			address:  "1.2.3.6",
+			prefixes: nil,
+		},
+	})
+	// Add some prefixes to both nodes, but fail the next configurePeers call.
+	wg.muNodes.Lock()
+	wg.failNextUpdate = true
+	wg.muNodes.Unlock()
+	cur.nodeWithPrefixes(key1, "metropolis-fake-1", "1.2.3.5", "10.100.10.0/24", "10.100.20.0/24")
+	cur.nodeWithPrefixes(key2, "metropolis-fake-2", "1.2.3.6", "10.100.30.0/24", "10.100.40.0/24")
+	assertStateEventual(map[string]*node{
+		"metropolis-fake-1": {
+			pubkey:  key1.PublicKey().String(),
+			address: "1.2.3.5",
+			prefixes: []string{
+				"10.100.10.0/24", "10.100.20.0/24",
+			},
+		},
+		"metropolis-fake-2": {
+			pubkey:  key2.PublicKey().String(),
+			address: "1.2.3.6",
+			prefixes: []string{
+				"10.100.30.0/24", "10.100.40.0/24",
+			},
+		},
+	})
+	// Delete one of the nodes.
+	cur.deleteNode("metropolis-fake-1")
+	assertStateEventual(map[string]*node{
+		"metropolis-fake-2": {
+			pubkey:  key2.PublicKey().String(),
+			address: "1.2.3.6",
+			prefixes: []string{
+				"10.100.30.0/24", "10.100.40.0/24",
+			},
+		},
+	})
+}
+
+// TestWireguardIntegration makes sure localWireguard behaves as expected.
+func TestWireguardIntegration(t *testing.T) {
+	if os.Getenv("IN_KTEST") != "true" {
+		t.Skip("Not in ktest")
+	}
+
+	root := &localstorage.Root{}
+	tmp, err := os.MkdirTemp("", "clusternet")
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = declarative.PlaceFS(root, tmp)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if err := os.MkdirAll(root.Data.Kubernetes.ClusterNetworking.FullPath(), 0700); err != nil {
+		t.Fatal(err)
+	}
+	wg := &localWireguard{}
+
+	// Ensure key once and make note of it.
+	if err := wg.ensureOnDiskKey(&root.Data.Kubernetes.ClusterNetworking); err != nil {
+		t.Fatalf("Could not ensure wireguard key: %v", err)
+	}
+	key := wg.key().String()
+	// Do it again, and make sure the key hasn't changed.
+	wg = &localWireguard{}
+	if err := wg.ensureOnDiskKey(&root.Data.Kubernetes.ClusterNetworking); err != nil {
+		t.Fatalf("Could not ensure wireguard key second time: %v", err)
+	}
+	if want, got := key, wg.key().String(); want != got {
+		t.Fatalf("Key changed, was %q, became %q", want, got)
+	}
+
+	// Setup the interface.
+	cnet := net.IPNet{
+		IP:   net.IP([]byte{10, 10, 0, 0}),
+		Mask: net.IPv4Mask(255, 255, 0, 0),
+	}
+	if err := wg.setup(&cnet); err != nil {
+		t.Fatalf("Failed to setup interface: %v", err)
+	}
+	// Do it again.
+	wg.close()
+	if err := wg.setup(&cnet); err != nil {
+		t.Fatalf("Failed to setup interface second time: %v", err)
+	}
+
+	// Check that the key and listen port are configured correctly.
+	wgClient, err := wgctrl.New()
+	if err != nil {
+		t.Fatalf("Failed to create wireguard client: %v", err)
+	}
+	wgDev, err := wgClient.Device(clusterNetDeviceName)
+	if err != nil {
+		t.Fatalf("Failed to connect to netlink's WireGuard config endpoint: %v", err)
+	}
+	if want, got := key, wgDev.PrivateKey.String(); want != got {
+		t.Errorf("Wireguard key mismatch, wanted %q, got %q", want, got)
+	}
+	if want, got := int(common.WireGuardPort), wgDev.ListenPort; want != got {
+		t.Errorf("Wireguard port mismatch, wanted %d, got %d", want, got)
+	}
+
+	// Add some peers and check that we got them.
+	pkeys := make([]wgtypes.Key, 2)
+	pkeys[0], err = wgtypes.GeneratePrivateKey()
+	if err != nil {
+		t.Fatalf("Failed to generate private key: %v", err)
+	}
+	pkeys[1], err = wgtypes.GeneratePrivateKey()
+	if err != nil {
+		t.Fatalf("Failed to generate private key: %v", err)
+	}
+	err = wg.configurePeers([]*node{
+		{
+			pubkey:  pkeys[0].PublicKey().String(),
+			address: "10.100.0.1",
+			prefixes: []string{
+				"10.0.0.0/24",
+				"10.0.1.0/24",
+			},
+		},
+		{
+			pubkey:  pkeys[1].PublicKey().String(),
+			address: "10.100.1.1",
+			prefixes: []string{
+				"10.1.0.0/24",
+				"10.1.1.0/24",
+			},
+		},
+	})
+	if err != nil {
+		t.Fatalf("Configuring peers failed: %v", err)
+	}
+
+	wgDev, err = wgClient.Device(clusterNetDeviceName)
+	if err != nil {
+		t.Fatalf("Failed to connect to netlink's WireGuard config endpoint: %v", err)
+	}
+	if want, got := 2, len(wgDev.Peers); want != got {
+		t.Errorf("Wanted %d peers, got %d", want, got)
+	} else {
+		for i := 0; i < 2; i++ {
+			if want, got := pkeys[i].PublicKey().String(), wgDev.Peers[i].PublicKey.String(); want != got {
+				t.Errorf("Peer %d should have key %q, got %q", i, want, got)
+			}
+			if want, got := fmt.Sprintf("10.100.%d.1:%s", i, common.WireGuardPort.PortString()), wgDev.Peers[i].Endpoint.String(); want != got {
+				t.Errorf("Peer %d should have endpoint %q, got %q", i, want, got)
+			}
+			if want, got := 2, len(wgDev.Peers[i].AllowedIPs); want != got {
+				t.Errorf("Peer %d should have %d peers, got %d", i, want, got)
+			} else {
+				for j := 0; j < 2; j++ {
+					if want, got := fmt.Sprintf("10.%d.%d.0/24", i, j), wgDev.Peers[i].AllowedIPs[j].String(); want != got {
+						t.Errorf("Peer %d should have allowed ip %d %q, got %q", i, j, want, got)
+					}
+				}
+			}
+		}
+	}
+
+	// Update one of the peers and check that things got applied.
+	err = wg.configurePeers([]*node{
+		{
+			pubkey:  pkeys[0].PublicKey().String(),
+			address: "10.100.0.3",
+			prefixes: []string{
+				"10.0.0.0/24",
+			},
+		},
+	})
+	if err != nil {
+		t.Fatalf("Failed to connect to netlink's WireGuard config endpoint: %v", err)
+	}
+	wgDev, err = wgClient.Device(clusterNetDeviceName)
+	if err != nil {
+		t.Fatalf("Failed to connect to netlink's WireGuard config endpoint: %v", err)
+	}
+	if want, got := 2, len(wgDev.Peers); want != got {
+		t.Errorf("Wanted %d peers, got %d", want, got)
+	} else {
+		if want, got := pkeys[0].PublicKey().String(), wgDev.Peers[0].PublicKey.String(); want != got {
+			t.Errorf("Peer 0 should have key %q, got %q", want, got)
+		}
+		if want, got := fmt.Sprintf("10.100.0.3:%s", common.WireGuardPort.PortString()), wgDev.Peers[0].Endpoint.String(); want != got {
+			t.Errorf("Peer 0 should have endpoint %q, got %q", want, got)
+		}
+		if want, got := 1, len(wgDev.Peers[0].AllowedIPs); want != got {
+			t.Errorf("Peer 0 should have %d peers, got %d", want, got)
+		} else {
+			if want, got := "10.0.0.0/24", wgDev.Peers[0].AllowedIPs[0].String(); want != got {
+				t.Errorf("Peer 0 should have allowed ip 0 %q, got %q", want, got)
+			}
+		}
+	}
+
+	// Remove one of the peers and make sure it's gone.
+	err = wg.unconfigurePeer(&node{
+		pubkey: pkeys[0].PublicKey().String(),
+	})
+	if err != nil {
+		t.Fatalf("Failed to unconfigure peer: %v", err)
+	}
+	err = wg.unconfigurePeer(&node{
+		pubkey: pkeys[0].PublicKey().String(),
+	})
+	if err != nil {
+		t.Fatalf("Failed to unconfigure peer a second time: %v", err)
+	}
+	wgDev, err = wgClient.Device(clusterNetDeviceName)
+	if err != nil {
+		t.Fatalf("Failed to connect to netlink's WireGuard config endpoint: %v", err)
+	}
+	if want, got := 1, len(wgDev.Peers); want != got {
+		t.Errorf("Wanted %d peer, got %d", want, got)
+	}
+}
diff --git a/metropolis/node/core/clusternet/types.go b/metropolis/node/core/clusternet/types.go
new file mode 100644
index 0000000..9cd1052
--- /dev/null
+++ b/metropolis/node/core/clusternet/types.go
@@ -0,0 +1,138 @@
+package clusternet
+
+import (
+	"context"
+	"net/netip"
+	"sort"
+	"strings"
+
+	apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+	"source.monogon.dev/metropolis/pkg/supervisor"
+	cpb "source.monogon.dev/metropolis/proto/common"
+)
+
+// Prefixes are network prefixes that should be announced by a node to the
+// Cluster Networking mesh.
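+//
+// For example, the Kubernetes service is expected to Set a value like
+// &Prefixes{netip.MustParsePrefix("10.192.5.0/24")} (prefix illustrative) on
+// Service.LocalKubernetesPodNetwork once the local pod network is known.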
+type Prefixes []netip.Prefix
+
+func (p Prefixes) proto() (res []*cpb.NodeClusterNetworking_Prefix) {
+	for _, prefix := range p {
+		res = append(res, &cpb.NodeClusterNetworking_Prefix{
+			Cidr: prefix.String(),
+		})
+	}
+	return
+}
+
+// node is used for internal statekeeping in the cluster networking service.
+type node struct {
+	id       string
+	pubkey   string
+	address  string
+	prefixes []string
+}
+
+// update mutates this node to match the data held in the given proto Node, and
+// returns true if any data changed.
+func (n *node) update(p *apb.Node) (changed bool) {
+	if n.address != p.Status.ExternalAddress {
+		n.address = p.Status.ExternalAddress
+		changed = true
+	}
+	if n.pubkey != p.Clusternet.WireguardPubkey {
+		n.pubkey = p.Clusternet.WireguardPubkey
+		changed = true
+	}
+
+	var newPrefixes []string
+	for _, prefix := range p.Clusternet.Prefixes {
+		if prefix.Cidr == "" {
+			continue
+		}
+		newPrefixes = append(newPrefixes, prefix.Cidr)
+	}
+	oldPrefixes := make([]string, len(n.prefixes))
+	copy(oldPrefixes, n.prefixes)
+
+	sort.Strings(newPrefixes)
+	sort.Strings(oldPrefixes)
+	if want, got := strings.Join(newPrefixes, ","), strings.Join(oldPrefixes, ","); want != got {
+		n.prefixes = newPrefixes
+		changed = true
+	}
+
+	return
+}
+
+// nodeMap is the main internal statekeeping structure of the pull sub-runnable.
+type nodeMap struct {
+	nodes map[string]*node
+}
+
+func newNodemap() *nodeMap {
+	return &nodeMap{
+		nodes: make(map[string]*node),
+	}
+}
+
+// update updates the nodeMap from the given Curator WatchEvent, interpreting
+// both node changes and deletions. Two maps are returned: the first contains
+// only nodes that have been added/changed by the given event, the other
+// contains only nodes that have been deleted by the given event.
+func (m *nodeMap) update(ctx context.Context, ev *apb.WatchEvent) (changed, removed map[string]*node) {
+	changed = make(map[string]*node)
+	removed = make(map[string]*node)
+
+	// Make sure we're not getting multiple nodes with the same public key. This is
+	// not expected to happen in practice, as the Curator should prevent it, but we
+	// at least want to make sure we're not blowing up routing if other defenses
+	// fail.
+
+	// pkeys maps from public key to node ID.
+	pkeys := make(map[string]string)
+	for _, n := range m.nodes {
+		// We don't have to check whether we have any collisions already in m.nodes, as
+		// the check below prevents them from happening in the first place.
+		pkeys[n.pubkey] = n.id
+	}
+
+	for _, n := range ev.Nodes {
+		// Only care about nodes that have all required configuration set.
+		if n.Status == nil || n.Status.ExternalAddress == "" || n.Clusternet == nil || n.Clusternet.WireguardPubkey == "" {
+			// We could attempt to delete any matching node currently in this nodemap at this
+			// point, but this is likely transient and we don't want to just kill routing for
+			// no reason.
+			continue
+		}
+
+		key := n.Clusternet.WireguardPubkey
+		if id, ok := pkeys[key]; ok && id != n.Id {
+			supervisor.Logger(ctx).Warningf("Nodes %q and %q share wireguard key %q. That should not have happened.", n.Id, id, key)
+			continue
+		}
+
+		if _, ok := m.nodes[n.Id]; !ok {
+			m.nodes[n.Id] = &node{
+				id: n.Id,
+			}
+		}
+		diff := m.nodes[n.Id].update(n)
+		if diff {
+			changed[n.Id] = m.nodes[n.Id]
+		}
+	}
+
+	for _, t := range ev.NodeTombstones {
+		n, ok := m.nodes[t.NodeId]
+		if !ok {
+			// This indicates we lost data somewhere, which likely means a Curator bug.
+			supervisor.Logger(ctx).Warningf("Node %s: tombstone for unknown node", t.NodeId)
+			continue
+		}
+		removed[n.id] = n
+		delete(m.nodes, n.id)
+	}
+
+	return
+}
diff --git a/metropolis/node/core/clusternet/types_test.go b/metropolis/node/core/clusternet/types_test.go
new file mode 100644
index 0000000..8f58b4e
--- /dev/null
+++ b/metropolis/node/core/clusternet/types_test.go
@@ -0,0 +1,104 @@
+package clusternet
+
+import (
+	"testing"
+
+	apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+	cpb "source.monogon.dev/metropolis/proto/common"
+)
+
+func TestNodeUpdate(t *testing.T) {
+	var n node
+
+	for i, te := range []struct {
+		p    *apb.Node
+		want bool
+	}{
+		// Case 0: empty incoming node, no change.
+		{
+			p:    &apb.Node{Status: &cpb.NodeStatus{}, Clusternet: &cpb.NodeClusterNetworking{}},
+			want: false,
+		},
+		// Case 1: wireguard key set.
+		{
+			p:    &apb.Node{Status: &cpb.NodeStatus{}, Clusternet: &cpb.NodeClusterNetworking{WireguardPubkey: "fake"}},
+			want: true,
+		},
+		// Case 2: wireguard key updated.
+		{
+			p:    &apb.Node{Status: &cpb.NodeStatus{}, Clusternet: &cpb.NodeClusterNetworking{WireguardPubkey: "fake2"}},
+			want: true,
+		},
+		// Case 3: external address added.
+		{
+			p:    &apb.Node{Status: &cpb.NodeStatus{ExternalAddress: "1.2.3.4"}, Clusternet: &cpb.NodeClusterNetworking{WireguardPubkey: "fake2"}},
+			want: true,
+		},
+		// Case 4: external address changed.
+		{
+			p:    &apb.Node{Status: &cpb.NodeStatus{ExternalAddress: "1.2.3.5"}, Clusternet: &cpb.NodeClusterNetworking{WireguardPubkey: "fake2"}},
+			want: true,
+		},
+		// Case 5: prefixes added
+		{
+			p: &apb.Node{
+				Status: &cpb.NodeStatus{ExternalAddress: "1.2.3.5"},
+				Clusternet: &cpb.NodeClusterNetworking{
+					WireguardPubkey: "fake2",
+					Prefixes: []*cpb.NodeClusterNetworking_Prefix{
+						{Cidr: "10.0.2.0/24"},
+						{Cidr: "10.0.1.0/24"},
+					},
+				},
+			},
+			want: true,
+		},
+		// Case 6: prefixes changed
+		{
+			p: &apb.Node{
+				Status: &cpb.NodeStatus{ExternalAddress: "1.2.3.5"},
+				Clusternet: &cpb.NodeClusterNetworking{
+					WireguardPubkey: "fake2",
+					Prefixes: []*cpb.NodeClusterNetworking_Prefix{
+						{Cidr: "10.0.3.0/24"},
+						{Cidr: "10.0.1.0/24"},
+					},
+				},
+			},
+			want: true,
+		},
+		// Case 7: prefixes reordered (no change expected)
+		{
+			p: &apb.Node{
+				Status: &cpb.NodeStatus{ExternalAddress: "1.2.3.5"},
+				Clusternet: &cpb.NodeClusterNetworking{
+					WireguardPubkey: "fake2",
+					Prefixes: []*cpb.NodeClusterNetworking_Prefix{
+						{Cidr: "10.0.3.0/24"},
+						{Cidr: "10.0.1.0/24"},
+					},
+				},
+			},
+			want: false,
+		},
+		// Case 8: prefixes removed
+		{
+			p: &apb.Node{
+				Status: &cpb.NodeStatus{ExternalAddress: "1.2.3.5"},
+				Clusternet: &cpb.NodeClusterNetworking{
+					WireguardPubkey: "fake2",
+					Prefixes:        []*cpb.NodeClusterNetworking_Prefix{},
+				},
+			},
+			want: true,
+		},
+	} {
+		got := n.update(te.p)
+		if te.want && !got {
+			t.Fatalf("Case %d: expected change, got no change", i)
+		}
+		if !te.want && got {
+			t.Fatalf("Case %d: expected no change, got change", i)
+		}
+	}
+}
diff --git a/metropolis/node/core/clusternet/wireguard.go b/metropolis/node/core/clusternet/wireguard.go
new file mode 100644
index 0000000..9ce6d49
--- /dev/null
+++ b/metropolis/node/core/clusternet/wireguard.go
@@ -0,0 +1,197 @@
+package clusternet
+
+import (
+	"fmt"
+	"net"
+	"os"
+
+	"github.com/vishvananda/netlink"
+	"golang.zx2c4.com/wireguard/wgctrl"
+	"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
+
+	common "source.monogon.dev/metropolis/node"
+	"source.monogon.dev/metropolis/node/core/localstorage"
+)
+
+const (
+	// clusterNetDeviceName is the name of the WireGuard interface that will be
+	// created in the host network namespace.
+	clusterNetDeviceName = "clusternet"
+)
+
+// wireguard decouples the cluster networking service from actual mutations
+// performed in the local Linux networking namespace. This is mostly done to help
+// in testing the cluster networking service.
+//
+// Because it's effectively just a mockable interface, see the actual
+// localWireguard method implementations for documentation.
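+//
+// localWireguard below is the production implementation; tests substitute a
+// fake one (see fakeWireguard in clusternet_test.go) via the Service.wg field.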
+type wireguard interface {
+	ensureOnDiskKey(dir *localstorage.DataKubernetesClusterNetworkingDirectory) error
+	setup(clusterNet *net.IPNet) error
+	configurePeers(n []*node) error
+	unconfigurePeer(n *node) error
+	key() wgtypes.Key
+	close()
+}
+
+type localWireguard struct {
+	wgClient *wgctrl.Client
+	privKey  wgtypes.Key
+}
+
+// ensureOnDiskKey loads the private key from disk or (if none exists) generates
+// one and persists it. The resulting key is then saved into the localWireguard
+// instance.
+func (s *localWireguard) ensureOnDiskKey(dir *localstorage.DataKubernetesClusterNetworkingDirectory) error {
+	keyRaw, err := dir.Key.Read()
+	if os.IsNotExist(err) {
+		key, err := wgtypes.GeneratePrivateKey()
+		if err != nil {
+			return fmt.Errorf("when generating key: %w", err)
+		}
+		if err := dir.Key.Write([]byte(key.String()), 0600); err != nil {
+			return fmt.Errorf("save failed: %w", err)
+		}
+		s.privKey = key
+		return nil
+	} else if err != nil {
+		return fmt.Errorf("load failed: %w", err)
+	}
+
+	key, err := wgtypes.ParseKey(string(keyRaw))
+	if err != nil {
+		return fmt.Errorf("invalid private key in file: %w", err)
+	}
+	s.privKey = key
+	return nil
+}
+
+// setup the local network namespace by creating a WireGuard interface and adding
+// a clusterNet route to it. If a matching WireGuard interface already exists in
+// the system, it is first deleted.
+//
+// ensureOnDiskKey must be called before calling this function.
+func (s *localWireguard) setup(clusterNet *net.IPNet) error {
+	links, err := netlink.LinkList()
+	if err != nil {
+		return fmt.Errorf("could not list links: %w", err)
+	}
+	for _, link := range links {
+		if link.Attrs().Name != clusterNetDeviceName {
+			continue
+		}
+		if err := netlink.LinkDel(link); err != nil {
+			return fmt.Errorf("could not remove existing clusternet link: %w", err)
+		}
+	}
+
+	wgInterface := &netlink.Wireguard{LinkAttrs: netlink.LinkAttrs{Name: clusterNetDeviceName, Flags: net.FlagUp}}
+	if err := netlink.LinkAdd(wgInterface); err != nil {
+		return fmt.Errorf("when adding network interface: %w", err)
+	}
+
+	wgClient, err := wgctrl.New()
+	if err != nil {
+		return fmt.Errorf("when creating wireguard client: %w", err)
+	}
+	s.wgClient = wgClient
+
+	listenPort := int(common.WireGuardPort)
+	if err := s.wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
+		PrivateKey: &s.privKey,
+		ListenPort: &listenPort,
+	}); err != nil {
+		return fmt.Errorf("when setting up device: %w", err)
+	}
+
+	if err := netlink.RouteAdd(&netlink.Route{
+		Dst:       clusterNet,
+		LinkIndex: wgInterface.Index,
+		Protocol:  common.ProtocolClusternet,
+	}); err != nil && !os.IsExist(err) {
+		return fmt.Errorf("when creating cluster route: %w", err)
+	}
+	return nil
+}
+
+// configurePeers creates or updates peers on the local wireguard interface
+// based on the given nodes.
+//
+// If any node is somehow invalid and causes a parse/reconfiguration error, the
+// function will return an error. The caller should retry with a different set of
+// nodes, performing search/bisection on its own.
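+// The pull sub-runnable in clusternet.go does this by falling back to
+// configuring nodes one at a time when a bulk update fails.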
+func (s *localWireguard) configurePeers(nodes []*node) error {
+	var configs []wgtypes.PeerConfig
+
+	for i, n := range nodes {
+		if s.privKey.PublicKey().String() == n.pubkey {
+			// Node doesn't need to connect to itself
+			continue
+		}
+		pubkeyParsed, err := wgtypes.ParseKey(n.pubkey)
+		if err != nil {
+			return fmt.Errorf("node %d: failed to parse public-key %q: %w", i, n.pubkey, err)
+		}
+		addressParsed := net.ParseIP(n.address)
+		if addressParsed == nil {
+			return fmt.Errorf("node %d: failed to parse address %q: %w", i, n.address, err)
+		}
+		var allowedIPs []net.IPNet
+		for _, prefix := range n.prefixes {
+			_, podNet, err := net.ParseCIDR(prefix)
+			if err != nil {
+				// Just eat the parse error. Not much we can do here. We have enough validation
+				// in the rest of the system that we shouldn't ever reach this.
+				continue
+			}
+			allowedIPs = append(allowedIPs, *podNet)
+		}
+		endpoint := net.UDPAddr{Port: int(common.WireGuardPort), IP: addressParsed}
+		configs = append(configs, wgtypes.PeerConfig{
+			PublicKey:         pubkeyParsed,
+			Endpoint:          &endpoint,
+			ReplaceAllowedIPs: true,
+			AllowedIPs:        allowedIPs,
+		})
+	}
+
+	err := s.wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
+		Peers: configs,
+	})
+	if err != nil {
+		return fmt.Errorf("failed to configure WireGuard peers: %w", err)
+	}
+	return nil
+}
+
+// unconfigurePeer removes the peer from the local WireGuard interface based on
+// the given node. If no peer existed matching the given node, this operation is
+// a no-op.
+func (s *localWireguard) unconfigurePeer(n *node) error {
+	pubkeyParsed, err := wgtypes.ParseKey(n.pubkey)
+	if err != nil {
+		return fmt.Errorf("failed to parse public-key %q: %w", n.pubkey, err)
+	}
+
+	err = s.wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
+		Peers: []wgtypes.PeerConfig{{
+			PublicKey: pubkeyParsed,
+			Remove:    true,
+		}},
+	})
+	if err != nil {
+		return fmt.Errorf("failed to delete WireGuard peer: %w", err)
+	}
+	return nil
+}
+
+func (s *localWireguard) key() wgtypes.Key {
+	return s.privKey
+}
+
+// close cleans up after the wireguard client, but does _not_ remove the
+// interface or peers.
+func (s *localWireguard) close() {
+	s.wgClient.Close()
+	s.wgClient = nil
+}
diff --git a/metropolis/node/net_protocols.go b/metropolis/node/net_protocols.go
new file mode 100644
index 0000000..ac43cbf
--- /dev/null
+++ b/metropolis/node/net_protocols.go
@@ -0,0 +1,11 @@
+package node
+
+import "github.com/vishvananda/netlink"
+
+// These are netlink protocol numbers used internally to identify the
+// owners/managers of various netlink resources (e.g. routes).
+const (
+	// ProtocolClusternet is used by //metropolis/node/core/clusternet when
+	// creating/removing routes pointing to the clusternet interface.
+	ProtocolClusternet netlink.RouteProtocol = 129
+)