m/n/core/clusternet: init
This implements the new cluster networking daemon. It's just the daemon
itself with some tests; it's not yet used anywhere.
Change-Id: Ida34b647db0d075fcaaf2d57c9a8a14701713552
Reviewed-on: https://review.monogon.dev/c/monogon/+/1416
Tested-by: Jenkins CI
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/node/BUILD.bazel b/metropolis/node/BUILD.bazel
index 0904ec1..b03b107 100644
--- a/metropolis/node/BUILD.bazel
+++ b/metropolis/node/BUILD.bazel
@@ -9,10 +9,12 @@
name = "node",
srcs = [
"ids.go",
+ "net_protocols.go",
"ports.go",
],
importpath = "source.monogon.dev/metropolis/node",
visibility = ["//metropolis:__subpackages__"],
+ deps = ["@com_github_vishvananda_netlink//:netlink"],
)
# debug_build checks if we're building in debug mode and enables various debug features for the image.
diff --git a/metropolis/node/core/clusternet/BUILD.bazel b/metropolis/node/core/clusternet/BUILD.bazel
new file mode 100644
index 0000000..c577d96
--- /dev/null
+++ b/metropolis/node/core/clusternet/BUILD.bazel
@@ -0,0 +1,53 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+load("//metropolis/test/ktest:ktest.bzl", "ktest")
+
+go_library(
+ name = "clusternet",
+ srcs = [
+ "clusternet.go",
+ "types.go",
+ "wireguard.go",
+ ],
+ importpath = "source.monogon.dev/metropolis/node/core/clusternet",
+ visibility = ["//visibility:public"],
+ deps = [
+ "//metropolis/node",
+ "//metropolis/node/core/curator/proto/api",
+ "//metropolis/node/core/localstorage",
+ "//metropolis/pkg/event",
+ "//metropolis/pkg/supervisor",
+ "//metropolis/proto/common",
+ "@com_github_cenkalti_backoff_v4//:backoff",
+ "@com_github_vishvananda_netlink//:netlink",
+ "@com_zx2c4_golang_wireguard_wgctrl//:wgctrl",
+ "@com_zx2c4_golang_wireguard_wgctrl//wgtypes",
+ ],
+)
+
+go_test(
+ name = "clusternet_test",
+ srcs = [
+ "clusternet_test.go",
+ "types_test.go",
+ ],
+ embed = [":clusternet"],
+ deps = [
+ "//metropolis/node",
+ "//metropolis/node/core/curator/proto/api",
+ "//metropolis/node/core/localstorage",
+ "//metropolis/node/core/localstorage/declarative",
+ "//metropolis/pkg/event/memory",
+ "//metropolis/pkg/supervisor",
+ "//metropolis/proto/common",
+ "@com_zx2c4_golang_wireguard_wgctrl//:wgctrl",
+ "@com_zx2c4_golang_wireguard_wgctrl//wgtypes",
+ "@org_golang_google_grpc//:go_default_library",
+ "@org_golang_google_grpc//credentials/insecure",
+ "@org_golang_google_grpc//test/bufconn",
+ ],
+)
+
+ktest(
+ cmdline = "ramdisk_size=128",
+ tester = ":clusternet_test",
+)
diff --git a/metropolis/node/core/clusternet/clusternet.go b/metropolis/node/core/clusternet/clusternet.go
new file mode 100644
index 0000000..c4b4a8e
--- /dev/null
+++ b/metropolis/node/core/clusternet/clusternet.go
@@ -0,0 +1,174 @@
+// Package clusternet implements a Cluster Networking mesh service running on all
+// Metropolis nodes.
+//
+// The mesh is based on wireguard and a centralized configuration store in the
+// cluster Curator (in etcd).
+//
+// While the implementation is nearly generic, it currently makes an assumption
+// that it is used only for Kubernetes pod networking. That has a few
+// implications:
+//
+// First, we only have a single real route on the host into the wireguard
+// networking mesh / interface, and that is configured ahead of time in the
+// Service as ClusterNet. All destination addresses that should be carried by the
+// mesh must thus be part of this single route. Otherwise, traffic will be able
+// to flow into the node from other nodes, but will exit through another
+// interface. This is used in practice to allow other host nodes (whose external
+// addresses are outside the cluster network) to access the cluster network.
+//
+// Second, we only have a single source/owner of prefixes per node: the
+// Kubernetes service. This is reflected as the LocalKubernetesPodNetwork event
+// Value in Service.
+package clusternet
+
+import (
+ "context"
+ "fmt"
+ "net"
+
+ "github.com/cenkalti/backoff/v4"
+
+ apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+ "source.monogon.dev/metropolis/node/core/localstorage"
+ "source.monogon.dev/metropolis/pkg/event"
+ "source.monogon.dev/metropolis/pkg/supervisor"
+ cpb "source.monogon.dev/metropolis/proto/common"
+)
+
+// Service implements the Cluster Networking Mesh. See package-level docs for
+// more details.
+type Service struct {
+ // Curator is the gRPC client that the service will use to reach the cluster's
+ // Curator, for pushing locally announced prefixes and pulling information about
+ // other nodes.
+ Curator apb.CuratorClient
+ // ClusterNet is the prefix that will be programmed to exit through the wireguard
+ // mesh.
+ ClusterNet net.IPNet
+ // DataDirectory is where the WireGuard key of this node will be stored.
+ DataDirectory *localstorage.DataKubernetesClusterNetworkingDirectory
+ // LocalKubernetesPodNetwork is an event.Value watched for prefixes that should
+ // be announced into the mesh. This is to be Set by the Kubernetes service once
+ // it knows about the local node's IPAM address assignment.
+ LocalKubernetesPodNetwork event.Value[*Prefixes]
+
+ // wg is the interface to all the low-level interactions with WireGuard (and
+ // kernel routing). If not set, this defaults to a production implementation.
+ // This can be overridden by tests with a test implementation instead.
+ wg wireguard
+}
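+
+// A minimal usage sketch (illustrative only; the service is not wired up
+// anywhere yet, and curator, clusterNet, root and podNetwork are assumed to be
+// provided by the caller):
+//
+//	svc := &Service{
+//		Curator:                   curator,
+//		ClusterNet:                clusterNet,
+//		DataDirectory:             &root.Data.Kubernetes.ClusterNetworking,
+//		LocalKubernetesPodNetwork: &podNetwork,
+//	}
+//	if err := supervisor.Run(ctx, "clusternet", svc.Run); err != nil {
+//		return err
+//	}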
+
+// Run the Service. This must be run as a supervisor Runnable.
+func (s *Service) Run(ctx context.Context) error {
+ if s.wg == nil {
+ s.wg = &localWireguard{}
+ }
+ if err := s.wg.ensureOnDiskKey(s.DataDirectory); err != nil {
+ return fmt.Errorf("could not ensure wireguard key: %w", err)
+ }
+ if err := s.wg.setup(&s.ClusterNet); err != nil {
+ return fmt.Errorf("could not setup wireguard: %w", err)
+ }
+
+ supervisor.Logger(ctx).Infof("Wireguard setup complete, starting updaters...")
+
+ if err := supervisor.Run(ctx, "pusher", s.push); err != nil {
+ return err
+ }
+ if err := supervisor.Run(ctx, "puller", s.pull); err != nil {
+ return err
+ }
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ <-ctx.Done()
+ return ctx.Err()
+}
+
+// push is the sub-runnable responsible for letting the Curator know what
+// prefixes are originated by this node.
+func (s *Service) push(ctx context.Context) error {
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+
+ w := s.LocalKubernetesPodNetwork.Watch()
+ defer w.Close()
+
+ for {
+ // We only submit our wireguard key and prefixes when we're actually ready to
+ // announce something.
+ k8sPrefixes, err := w.Get(ctx)
+ if err != nil {
+ return fmt.Errorf("couldn't get k8s prefixes: %w", err)
+ }
+
+ err = backoff.Retry(func() error {
+ _, err := s.Curator.UpdateNodeClusterNetworking(ctx, &apb.UpdateNodeClusterNetworkingRequest{
+ Clusternet: &cpb.NodeClusterNetworking{
+ WireguardPubkey: s.wg.key().PublicKey().String(),
+ Prefixes: k8sPrefixes.proto(),
+ },
+ })
+ if err != nil {
+ supervisor.Logger(ctx).Warningf("Could not submit cluster networking update: %v", err)
+ }
+ return err
+ }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
+ if err != nil {
+ return fmt.Errorf("couldn't update curator: %w", err)
+ }
+ }
+}
+
+// pull is the sub-runnable responsible for fetching information about the
+// cluster networking setup/status of other nodes, and programming it as
+// WireGuard peers.
+func (s *Service) pull(ctx context.Context) error {
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+
+ srv, err := s.Curator.Watch(ctx, &apb.WatchRequest{
+ Kind: &apb.WatchRequest_NodesInCluster_{
+ NodesInCluster: &apb.WatchRequest_NodesInCluster{},
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("curator watch failed: %w", err)
+ }
+ defer srv.CloseSend()
+
+ nodes := newNodemap()
+ for {
+ ev, err := srv.Recv()
+ if err != nil {
+ return fmt.Errorf("curator watch recv failed: %w", err)
+ }
+
+ updated, removed := nodes.update(ctx, ev)
+
+ for _, n := range removed {
+ supervisor.Logger(ctx).Infof("Node %s removed, unconfiguring", n.id)
+ if err := s.wg.unconfigurePeer(n); err != nil {
+ // Do nothing and hope whatever caused this will go away at some point.
+ supervisor.Logger(ctx).Errorf("Node %s couldn't be unconfigured: %v", n.id, err)
+ }
+ }
+ var newNodes []*node
+ for _, n := range updated {
+ newNodes = append(newNodes, n)
+ supervisor.Logger(ctx).Infof("Node %s updated: pk %s, address %s, prefixes %v", n.id, n.pubkey, n.address, n.prefixes)
+ }
+ succeeded := 0
+ if err := s.wg.configurePeers(newNodes); err != nil {
+ // If configuring all nodes at once failed, go node-by-node to make sure we've
+ // done as much as possible.
+ supervisor.Logger(ctx).Warningf("Bulk node update call failed, trying node-by-node..: %v", err)
+ for _, n := range newNodes {
+ if err := s.wg.configurePeers([]*node{n}); err != nil {
+ supervisor.Logger(ctx).Errorf("Node %s failed: %v", n.id, err)
+ } else {
+ succeeded += 1
+ }
+ }
+ } else {
+ succeeded = len(newNodes)
+ }
+ supervisor.Logger(ctx).Infof("Successfully updated %d out of %d nodes", succeeded, len(newNodes))
+ }
+}
diff --git a/metropolis/node/core/clusternet/clusternet_test.go b/metropolis/node/core/clusternet/clusternet_test.go
new file mode 100644
index 0000000..0c70507
--- /dev/null
+++ b/metropolis/node/core/clusternet/clusternet_test.go
@@ -0,0 +1,485 @@
+package clusternet
+
+import (
+ "context"
+ "fmt"
+ "net"
+ "os"
+ "strings"
+ "sync"
+ "testing"
+ "time"
+
+ "golang.zx2c4.com/wireguard/wgctrl"
+ "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+ "google.golang.org/grpc/test/bufconn"
+
+ common "source.monogon.dev/metropolis/node"
+ "source.monogon.dev/metropolis/node/core/localstorage"
+ "source.monogon.dev/metropolis/node/core/localstorage/declarative"
+ "source.monogon.dev/metropolis/pkg/event/memory"
+ "source.monogon.dev/metropolis/pkg/supervisor"
+
+ apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+ cpb "source.monogon.dev/metropolis/proto/common"
+)
+
+// testCurator is a shim Curator implementation that serves pending Watch
+// requests based on data submitted to a channel.
+type testCurator struct {
+ apb.UnimplementedCuratorServer
+
+ watchC chan *apb.WatchEvent
+ updateReq memory.Value[*apb.UpdateNodeClusterNetworkingRequest]
+}
+
+// Watch implements a minimal Watch that forwards events submitted to watchC.
+func (t *testCurator) Watch(_ *apb.WatchRequest, srv apb.Curator_WatchServer) error {
+ ctx := srv.Context()
+ for {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case ev := <-t.watchC:
+ if err := srv.Send(ev); err != nil {
+ return err
+ }
+ }
+ }
+}
+
+func (t *testCurator) UpdateNodeClusterNetworking(ctx context.Context, req *apb.UpdateNodeClusterNetworkingRequest) (*apb.UpdateNodeClusterNetworkingResponse, error) {
+ t.updateReq.Set(req)
+ return &apb.UpdateNodeClusterNetworkingResponse{}, nil
+}
+
+// nodeWithPrefixes submits a given node/key/address with prefixes to the Watch
+// event channel.
+func (t *testCurator) nodeWithPrefixes(key wgtypes.Key, id, address string, prefixes ...string) {
+ var p []*cpb.NodeClusterNetworking_Prefix
+ for _, prefix := range prefixes {
+ p = append(p, &cpb.NodeClusterNetworking_Prefix{Cidr: prefix})
+ }
+ n := &apb.Node{
+ Id: id,
+ Status: &cpb.NodeStatus{
+ ExternalAddress: address,
+ },
+ Clusternet: &cpb.NodeClusterNetworking{
+ WireguardPubkey: key.PublicKey().String(),
+ Prefixes: p,
+ },
+ }
+ t.watchC <- &apb.WatchEvent{
+ Nodes: []*apb.Node{
+ n,
+ },
+ }
+}
+
+// deleteNode submits a given node for deletion to the Watch event channel.
+func (t *testCurator) deleteNode(id string) {
+ t.watchC <- &apb.WatchEvent{
+ NodeTombstones: []*apb.WatchEvent_NodeTombstone{
+ {
+ NodeId: id,
+ },
+ },
+ }
+}
+
+// makeTestCurator returns a working testCurator alongside a grpc connection to
+// it.
+func makeTestCurator(t *testing.T) (*testCurator, *grpc.ClientConn) {
+ cur := &testCurator{
+ watchC: make(chan *apb.WatchEvent),
+ }
+
+ srv := grpc.NewServer()
+ apb.RegisterCuratorServer(srv, cur)
+ externalLis := bufconn.Listen(1024 * 1024)
+ go func() {
+ if err := srv.Serve(externalLis); err != nil {
+ t.Fatalf("GRPC serve failed: %v", err)
+ }
+ }()
+ withLocalDialer := grpc.WithContextDialer(func(_ context.Context, _ string) (net.Conn, error) {
+ return externalLis.Dial()
+ })
+ cl, err := grpc.Dial("local", withLocalDialer, grpc.WithTransportCredentials(insecure.NewCredentials()))
+ if err != nil {
+ t.Fatalf("Dialing GRPC failed: %v", err)
+ }
+
+ return cur, cl
+}
+
+// fakeWireguard implements wireguard while keeping peer information internally.
+type fakeWireguard struct {
+ k wgtypes.Key
+
+ muNodes sync.Mutex
+ nodes map[string]*node
+ failNextUpdate bool
+}
+
+func (f *fakeWireguard) ensureOnDiskKey(_ *localstorage.DataKubernetesClusterNetworkingDirectory) error {
+ f.k, _ = wgtypes.GeneratePrivateKey()
+ return nil
+}
+
+func (f *fakeWireguard) setup(clusterNet *net.IPNet) error {
+ f.muNodes.Lock()
+ defer f.muNodes.Unlock()
+ f.nodes = make(map[string]*node)
+ return nil
+}
+
+func (f *fakeWireguard) configurePeers(nodes []*node) error {
+ f.muNodes.Lock()
+ defer f.muNodes.Unlock()
+ if f.failNextUpdate {
+ f.failNextUpdate = false
+ return fmt.Errorf("synthetic test failure")
+ }
+ for _, n := range nodes {
+ f.nodes[n.id] = n
+ }
+ return nil
+}
+
+func (f *fakeWireguard) unconfigurePeer(n *node) error {
+ f.muNodes.Lock()
+ defer f.muNodes.Unlock()
+ delete(f.nodes, n.id)
+ return nil
+}
+
+func (f *fakeWireguard) key() wgtypes.Key {
+ return f.k
+}
+
+func (f *fakeWireguard) close() {
+}
+
+// TestClusternetBasic exercises clusternet with a fake curator and fake
+// wireguard, trying to exercise as many edge cases as possible.
+func TestClusternetBasic(t *testing.T) {
+ key1, err := wgtypes.GeneratePrivateKey()
+ if err != nil {
+ t.Fatalf("Failed to generate private key: %v", err)
+ }
+ key2, err := wgtypes.GeneratePrivateKey()
+ if err != nil {
+ t.Fatalf("Failed to generate private key: %v", err)
+ }
+
+ cur, cl := makeTestCurator(t)
+ defer cl.Close()
+ curator := apb.NewCuratorClient(cl)
+
+ var podNetwork memory.Value[*Prefixes]
+ wg := &fakeWireguard{}
+ svc := Service{
+ Curator: curator,
+ ClusterNet: net.IPNet{
+ IP: net.IP([]byte{10, 10, 0, 0}),
+ Mask: net.IPv4Mask(255, 255, 0, 0),
+ },
+ DataDirectory: nil,
+ LocalKubernetesPodNetwork: &podNetwork,
+
+ wg: wg,
+ }
+ supervisor.TestHarness(t, svc.Run)
+
+ checkState := func(nodes map[string]*node) error {
+ t.Helper()
+ wg.muNodes.Lock()
+ defer wg.muNodes.Unlock()
+ for nid, n := range nodes {
+ n2, ok := wg.nodes[nid]
+ if !ok {
+ return fmt.Errorf("node %q missing in programmed peers", nid)
+ }
+ if n2.pubkey != n.pubkey {
+ return fmt.Errorf("node %q pubkey mismatch: %q in programmed peers, %q wanted", nid, n2.pubkey, n.pubkey)
+ }
+ if n2.address != n.address {
+ return fmt.Errorf("node %q address mismatch: %q in programmed peers, %q wanted", nid, n2.address, n.address)
+ }
+ p := strings.Join(n.prefixes, ",")
+ p2 := strings.Join(n2.prefixes, ",")
+ if p != p2 {
+ return fmt.Errorf("node %q prefixes mismatch: %v in programmed peers, %v wanted", nid, n2.prefixes, n.prefixes)
+ }
+ }
+ for nid := range wg.nodes {
+ if _, ok := nodes[nid]; !ok {
+ return fmt.Errorf("node %q present in programmed peers", nid)
+ }
+ }
+ return nil
+ }
+
+ assertStateEventual := func(nodes map[string]*node) {
+ t.Helper()
+ deadline := time.Now().Add(5 * time.Second)
+ for {
+ err := checkState(nodes)
+ if err == nil {
+ break
+ }
+ if time.Now().After(deadline) {
+ t.Error(err)
+ return
+ }
+ time.Sleep(10 * time.Millisecond)
+ }
+ }
+
+ // Start with a single node.
+ cur.nodeWithPrefixes(key1, "metropolis-fake-1", "1.2.3.4")
+ assertStateEventual(map[string]*node{
+ "metropolis-fake-1": {
+ pubkey: key1.PublicKey().String(),
+ address: "1.2.3.4",
+ prefixes: nil,
+ },
+ })
+ // Change the node's peer address.
+ cur.nodeWithPrefixes(key1, "metropolis-fake-1", "1.2.3.5")
+ assertStateEventual(map[string]*node{
+ "metropolis-fake-1": {
+ pubkey: key1.PublicKey().String(),
+ address: "1.2.3.5",
+ prefixes: nil,
+ },
+ })
+ // Add another node.
+ cur.nodeWithPrefixes(key2, "metropolis-fake-2", "1.2.3.6")
+ assertStateEventual(map[string]*node{
+ "metropolis-fake-1": {
+ pubkey: key1.PublicKey().String(),
+ address: "1.2.3.5",
+ prefixes: nil,
+ },
+ "metropolis-fake-2": {
+ pubkey: key2.PublicKey().String(),
+ address: "1.2.3.6",
+ prefixes: nil,
+ },
+ })
+ // Add some prefixes to both nodes, but fail the next configurePeers call.
+ wg.muNodes.Lock()
+ wg.failNextUpdate = true
+ wg.muNodes.Unlock()
+ cur.nodeWithPrefixes(key1, "metropolis-fake-1", "1.2.3.5", "10.100.10.0/24", "10.100.20.0/24")
+ cur.nodeWithPrefixes(key2, "metropolis-fake-2", "1.2.3.6", "10.100.30.0/24", "10.100.40.0/24")
+ assertStateEventual(map[string]*node{
+ "metropolis-fake-1": {
+ pubkey: key1.PublicKey().String(),
+ address: "1.2.3.5",
+ prefixes: []string{
+ "10.100.10.0/24", "10.100.20.0/24",
+ },
+ },
+ "metropolis-fake-2": {
+ pubkey: key2.PublicKey().String(),
+ address: "1.2.3.6",
+ prefixes: []string{
+ "10.100.30.0/24", "10.100.40.0/24",
+ },
+ },
+ })
+ // Delete one of the nodes.
+ cur.deleteNode("metropolis-fake-1")
+ assertStateEventual(map[string]*node{
+ "metropolis-fake-2": {
+ pubkey: key2.PublicKey().String(),
+ address: "1.2.3.6",
+ prefixes: []string{
+ "10.100.30.0/24", "10.100.40.0/24",
+ },
+ },
+ })
+}
+
+// TestWireguardIntegration makes sure localWireguard behaves as expected.
+func TestWireguardIntegration(t *testing.T) {
+ if os.Getenv("IN_KTEST") != "true" {
+ t.Skip("Not in ktest")
+ }
+
+ root := &localstorage.Root{}
+ tmp, err := os.MkdirTemp("", "clusternet")
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = declarative.PlaceFS(root, tmp)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if err := os.MkdirAll(root.Data.Kubernetes.ClusterNetworking.FullPath(), 0700); err != nil {
+ t.Fatal(err)
+ }
+ wg := &localWireguard{}
+
+ // Ensure key once and make note of it.
+ if err := wg.ensureOnDiskKey(&root.Data.Kubernetes.ClusterNetworking); err != nil {
+ t.Fatalf("Could not ensure wireguard key: %v", err)
+ }
+ key := wg.key().String()
+ // Do it again, and make sure the key hasn't changed.
+ wg = &localWireguard{}
+ if err := wg.ensureOnDiskKey(&root.Data.Kubernetes.ClusterNetworking); err != nil {
+ t.Fatalf("Could not ensure wireguard key second time: %v", err)
+ }
+ if want, got := key, wg.key().String(); want != got {
+ t.Fatalf("Key changed, was %q, became %q", want, got)
+ }
+
+ // Setup the interface.
+ cnet := net.IPNet{
+ IP: net.IP([]byte{10, 10, 0, 0}),
+ Mask: net.IPv4Mask(255, 255, 0, 0),
+ }
+ if err := wg.setup(&cnet); err != nil {
+ t.Fatalf("Failed to setup interface: %v", err)
+ }
+ // Do it again.
+ wg.close()
+ if err := wg.setup(&cnet); err != nil {
+ t.Fatalf("Failed to setup interface second time: %v", err)
+ }
+
+ // Check that the key and listen port are configured correctly.
+ wgClient, err := wgctrl.New()
+ if err != nil {
+ t.Fatalf("Failed to create wireguard client: %v", err)
+ }
+ wgDev, err := wgClient.Device(clusterNetDeviceName)
+ if err != nil {
+ t.Fatalf("Failed to connect to netlink's WireGuard config endpoint: %v", err)
+ }
+ if want, got := key, wgDev.PrivateKey.String(); want != got {
+ t.Errorf("Wireguard key mismatch, wanted %q, got %q", want, got)
+ }
+ if want, got := int(common.WireGuardPort), wgDev.ListenPort; want != got {
+ t.Errorf("Wireguard port mismatch, wanted %d, got %d", want, got)
+ }
+
+ // Add some peers and check that we got them.
+ pkeys := make([]wgtypes.Key, 2)
+ pkeys[0], err = wgtypes.GeneratePrivateKey()
+ if err != nil {
+ t.Fatalf("Failed to generate private key: %v", err)
+ }
+ pkeys[1], err = wgtypes.GeneratePrivateKey()
+ if err != nil {
+ t.Fatalf("Failed to generate private key: %v", err)
+ }
+ err = wg.configurePeers([]*node{
+ {
+ pubkey: pkeys[0].PublicKey().String(),
+ address: "10.100.0.1",
+ prefixes: []string{
+ "10.0.0.0/24",
+ "10.0.1.0/24",
+ },
+ },
+ {
+ pubkey: pkeys[1].PublicKey().String(),
+ address: "10.100.1.1",
+ prefixes: []string{
+ "10.1.0.0/24",
+ "10.1.1.0/24",
+ },
+ },
+ })
+ if err != nil {
+ t.Fatalf("Configuring peers failed: %v", err)
+ }
+
+ wgDev, err = wgClient.Device(clusterNetDeviceName)
+ if err != nil {
+ t.Fatalf("Failed to connect to netlink's WireGuard config endpoint: %v", err)
+ }
+ if want, got := 2, len(wgDev.Peers); want != got {
+ t.Errorf("Wanted %d peers, got %d", want, got)
+ } else {
+ for i := 0; i < 2; i++ {
+ if want, got := pkeys[i].PublicKey().String(), wgDev.Peers[i].PublicKey.String(); want != got {
+ t.Errorf("Peer %d should have key %q, got %q", i, want, got)
+ }
+ if want, got := fmt.Sprintf("10.100.%d.1:%s", i, common.WireGuardPort.PortString()), wgDev.Peers[i].Endpoint.String(); want != got {
+ t.Errorf("Peer %d should have endpoint %q, got %q", i, want, got)
+ }
+ if want, got := 2, len(wgDev.Peers[i].AllowedIPs); want != got {
+ t.Errorf("Peer %d should have %d peers, got %d", i, want, got)
+ } else {
+ for j := 0; j < 2; j++ {
+ if want, got := fmt.Sprintf("10.%d.%d.0/24", i, j), wgDev.Peers[i].AllowedIPs[j].String(); want != got {
+ t.Errorf("Peer %d should have allowed ip %d %q, got %q", i, j, want, got)
+ }
+ }
+ }
+ }
+ }
+
+ // Update one of the peers and check that things got applied.
+ err = wg.configurePeers([]*node{
+ {
+ pubkey: pkeys[0].PublicKey().String(),
+ address: "10.100.0.3",
+ prefixes: []string{
+ "10.0.0.0/24",
+ },
+ },
+ })
+ if err != nil {
+ t.Fatalf("Failed to connect to netlink's WireGuard config endpoint: %v", err)
+ }
+ wgDev, err = wgClient.Device(clusterNetDeviceName)
+ if err != nil {
+ t.Fatalf("Failed to connect to netlink's WireGuard config endpoint: %v", err)
+ }
+ if want, got := 2, len(wgDev.Peers); want != got {
+ t.Errorf("Wanted %d peers, got %d", want, got)
+ } else {
+ if want, got := pkeys[0].PublicKey().String(), wgDev.Peers[0].PublicKey.String(); want != got {
+ t.Errorf("Peer 0 should have key %q, got %q", want, got)
+ }
+ if want, got := fmt.Sprintf("10.100.0.3:%s", common.WireGuardPort.PortString()), wgDev.Peers[0].Endpoint.String(); want != got {
+ t.Errorf("Peer 0 should have endpoint %q, got %q", want, got)
+ }
+ if want, got := 1, len(wgDev.Peers[0].AllowedIPs); want != got {
+ t.Errorf("Peer 0 should have %d peers, got %d", want, got)
+ } else {
+ if want, got := "10.0.0.0/24", wgDev.Peers[0].AllowedIPs[0].String(); want != got {
+ t.Errorf("Peer 0 should have allowed ip 0 %q, got %q", want, got)
+ }
+ }
+ }
+
+ // Remove one of the peers and make sure it's gone.
+ err = wg.unconfigurePeer(&node{
+ pubkey: pkeys[0].PublicKey().String(),
+ })
+ if err != nil {
+ t.Fatalf("Failed to unconfigure peer: %v", err)
+ }
+ err = wg.unconfigurePeer(&node{
+ pubkey: pkeys[0].PublicKey().String(),
+ })
+ if err != nil {
+ t.Fatalf("Failed to unconfigure peer a second time: %v", err)
+ }
+ wgDev, err = wgClient.Device(clusterNetDeviceName)
+ if err != nil {
+ t.Fatalf("Failed to connect to netlink's WireGuard config endpoint: %v", err)
+ }
+ if want, got := 1, len(wgDev.Peers); want != got {
+ t.Errorf("Wanted %d peer, got %d", want, got)
+ }
+}
diff --git a/metropolis/node/core/clusternet/types.go b/metropolis/node/core/clusternet/types.go
new file mode 100644
index 0000000..9cd1052
--- /dev/null
+++ b/metropolis/node/core/clusternet/types.go
@@ -0,0 +1,138 @@
+package clusternet
+
+import (
+ "context"
+ "net/netip"
+ "sort"
+ "strings"
+
+ apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+ "source.monogon.dev/metropolis/pkg/supervisor"
+ cpb "source.monogon.dev/metropolis/proto/common"
+)
+
+// Prefixes are network prefixes that should be announced by a node to the
+// Cluster Networking mesh.
+type Prefixes []netip.Prefix
+
+func (p Prefixes) proto() (res []*cpb.NodeClusterNetworking_Prefix) {
+ for _, prefix := range p {
+ res = append(res, &cpb.NodeClusterNetworking_Prefix{
+ Cidr: prefix.String(),
+ })
+ }
+ return
+}
+
+// node is used for internal statekeeping in the cluster networking service.
+type node struct {
+ id string
+ pubkey string
+ address string
+ prefixes []string
+}
+
+// update mutates this node to whatever data is held in the given proto Node, and
+// returns true if any data changed.
+func (n *node) update(p *apb.Node) (changed bool) {
+ if n.address != p.Status.ExternalAddress {
+ n.address = p.Status.ExternalAddress
+ changed = true
+ }
+ if n.pubkey != p.Clusternet.WireguardPubkey {
+ n.pubkey = p.Clusternet.WireguardPubkey
+ changed = true
+ }
+
+ var newPrefixes []string
+ for _, prefix := range p.Clusternet.Prefixes {
+ if prefix.Cidr == "" {
+ continue
+ }
+ newPrefixes = append(newPrefixes, prefix.Cidr)
+ }
+ oldPrefixes := make([]string, len(n.prefixes))
+ copy(oldPrefixes, n.prefixes)
+
+ sort.Strings(newPrefixes)
+ sort.Strings(oldPrefixes)
+ if want, got := strings.Join(newPrefixes, ","), strings.Join(oldPrefixes, ","); want != got {
+ n.prefixes = newPrefixes
+ changed = true
+ }
+
+ return
+}
+
+// nodeMap is the main internal statekeeping structure of the pull sub-runnable.
+type nodeMap struct {
+ nodes map[string]*node
+}
+
+func newNodemap() *nodeMap {
+ return &nodeMap{
+ nodes: make(map[string]*node),
+ }
+}
+
+// update updates the nodeMap from the given Curator WatchEvent, interpreting
+// both node changes and deletions. Two maps are returned: the first contains
+// only nodes that have been added/changed by the given event, the second
+// contains only nodes that have been deleted by it.
+func (m *nodeMap) update(ctx context.Context, ev *apb.WatchEvent) (changed, removed map[string]*node) {
+ changed = make(map[string]*node)
+ removed = make(map[string]*node)
+
+ // Make sure we're not getting multiple nodes with the same public key. This is
+ // not expected to happen in practice as the Curator should prevent this from
+ // happening, but we at least want to make sure we're not blowing up routing if
+ // other defenses fail.
+
+ // pkeys maps from public key to node ID.
+ pkeys := make(map[string]string)
+ for _, n := range m.nodes {
+ // We don't have to check whether we have any collisions already in m.nodes, as
+ // the check below prevents them from happening in the first place.
+ pkeys[n.pubkey] = n.id
+ }
+
+ for _, n := range ev.Nodes {
+ // Only care about nodes that have all required configuration set.
+ if n.Status == nil || n.Status.ExternalAddress == "" || n.Clusternet == nil || n.Clusternet.WireguardPubkey == "" {
+ // We could attempt to delete any matching node currently in this nodemap at this
+ // point, but this is likely transient and we don't want to just kill routing for
+ // no reason.
+ continue
+ }
+
+ key := n.Clusternet.WireguardPubkey
+ if id, ok := pkeys[key]; ok && id != n.Id {
+ supervisor.Logger(ctx).Warningf("Nodes %q and %q share wireguard key %q. That should not have happened.", n.Id, id, key)
+ continue
+ }
+
+ if _, ok := m.nodes[n.Id]; !ok {
+ m.nodes[n.Id] = &node{
+ id: n.Id,
+ }
+ }
+ diff := m.nodes[n.Id].update(n)
+ if diff {
+ changed[n.Id] = m.nodes[n.Id]
+ }
+ }
+
+ for _, t := range ev.NodeTombstones {
+ n, ok := m.nodes[t.NodeId]
+ if !ok {
+ // This is an indication of us losing data somehow. If this happens, it likely
+ // means a Curator bug.
+ supervisor.Logger(ctx).Warningf("Node %s: tombstone for unknown node", t.NodeId)
+ continue
+ }
+ removed[n.id] = n
+ delete(m.nodes, n.id)
+ }
+
+ return
+}
diff --git a/metropolis/node/core/clusternet/types_test.go b/metropolis/node/core/clusternet/types_test.go
new file mode 100644
index 0000000..8f58b4e
--- /dev/null
+++ b/metropolis/node/core/clusternet/types_test.go
@@ -0,0 +1,104 @@
+package clusternet
+
+import (
+ "testing"
+
+ apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+ cpb "source.monogon.dev/metropolis/proto/common"
+)
+
+func TestNodeUpdate(t *testing.T) {
+ var n node
+
+ for i, te := range []struct {
+ p *apb.Node
+ want bool
+ }{
+ // Case 0: empty incoming node, no change.
+ {
+ p: &apb.Node{Status: &cpb.NodeStatus{}, Clusternet: &cpb.NodeClusterNetworking{}},
+ want: false,
+ },
+ // Case 1: wireguard key set.
+ {
+ p: &apb.Node{Status: &cpb.NodeStatus{}, Clusternet: &cpb.NodeClusterNetworking{WireguardPubkey: "fake"}},
+ want: true,
+ },
+ // Case 2: wireguard key updated.
+ {
+ p: &apb.Node{Status: &cpb.NodeStatus{}, Clusternet: &cpb.NodeClusterNetworking{WireguardPubkey: "fake2"}},
+ want: true,
+ },
+ // Case 3: external address added.
+ {
+ p: &apb.Node{Status: &cpb.NodeStatus{ExternalAddress: "1.2.3.4"}, Clusternet: &cpb.NodeClusterNetworking{WireguardPubkey: "fake2"}},
+ want: true,
+ },
+ // Case 4: external address changed.
+ {
+ p: &apb.Node{Status: &cpb.NodeStatus{ExternalAddress: "1.2.3.5"}, Clusternet: &cpb.NodeClusterNetworking{WireguardPubkey: "fake2"}},
+ want: true,
+ },
+ // Case 5: prefixes added
+ {
+ p: &apb.Node{
+ Status: &cpb.NodeStatus{ExternalAddress: "1.2.3.5"},
+ Clusternet: &cpb.NodeClusterNetworking{
+ WireguardPubkey: "fake2",
+ Prefixes: []*cpb.NodeClusterNetworking_Prefix{
+ {Cidr: "10.0.2.0/24"},
+ {Cidr: "10.0.1.0/24"},
+ },
+ },
+ },
+ want: true,
+ },
+ // Case 6: prefixes changed
+ {
+ p: &apb.Node{
+ Status: &cpb.NodeStatus{ExternalAddress: "1.2.3.5"},
+ Clusternet: &cpb.NodeClusterNetworking{
+ WireguardPubkey: "fake2",
+ Prefixes: []*cpb.NodeClusterNetworking_Prefix{
+ {Cidr: "10.0.3.0/24"},
+ {Cidr: "10.0.1.0/24"},
+ },
+ },
+ },
+ want: true,
+ },
+ // Case 7: prefixes reordered (no change expected)
+ {
+ p: &apb.Node{
+ Status: &cpb.NodeStatus{ExternalAddress: "1.2.3.5"},
+ Clusternet: &cpb.NodeClusterNetworking{
+ WireguardPubkey: "fake2",
+ Prefixes: []*cpb.NodeClusterNetworking_Prefix{
+ {Cidr: "10.0.3.0/24"},
+ {Cidr: "10.0.1.0/24"},
+ },
+ },
+ },
+ want: false,
+ },
+ // Case 8: prefixes removed
+ {
+ p: &apb.Node{
+ Status: &cpb.NodeStatus{ExternalAddress: "1.2.3.5"},
+ Clusternet: &cpb.NodeClusterNetworking{
+ WireguardPubkey: "fake2",
+ Prefixes: []*cpb.NodeClusterNetworking_Prefix{},
+ },
+ },
+ want: true,
+ },
+ } {
+ got := n.update(te.p)
+ if te.want && !got {
+ t.Fatalf("Case %d: expected change, got no change", i)
+ }
+ if !te.want && got {
+ t.Fatalf("Case %d: expected no change, got change", i)
+ }
+ }
+}
diff --git a/metropolis/node/core/clusternet/wireguard.go b/metropolis/node/core/clusternet/wireguard.go
new file mode 100644
index 0000000..9ce6d49
--- /dev/null
+++ b/metropolis/node/core/clusternet/wireguard.go
@@ -0,0 +1,197 @@
+package clusternet
+
+import (
+ "fmt"
+ "net"
+ "os"
+
+ "github.com/vishvananda/netlink"
+ "golang.zx2c4.com/wireguard/wgctrl"
+ "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
+
+ common "source.monogon.dev/metropolis/node"
+ "source.monogon.dev/metropolis/node/core/localstorage"
+)
+
+const (
+ // clusterNetDeviceName is the name of the WireGuard interface that will be
+ // created in the host network namespace.
+ clusterNetDeviceName = "clusternet"
+)
+
+// wireguard decouples the cluster networking service from actual mutations
+// performed in the local Linux networking namespace. This is mostly done to help
+// in testing the cluster networking service.
+//
+// Because it's effectively just a mockable interface, see the actual
+// localWireguard method implementations for documentation.
+type wireguard interface {
+ ensureOnDiskKey(dir *localstorage.DataKubernetesClusterNetworkingDirectory) error
+ setup(clusterNet *net.IPNet) error
+ configurePeers(n []*node) error
+ unconfigurePeer(n *node) error
+ key() wgtypes.Key
+ close()
+}
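+
+// As a sketch of how this gets swapped out in tests (see fakeWireguard in
+// clusternet_test.go), a test constructs the Service with its private wg field
+// pre-set:
+//
+//	svc := Service{
+//		// ... public fields ...
+//		wg: &fakeWireguard{},
+//	}
+//
+// while production code leaves wg nil and Run defaults it to &localWireguard{}.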
+
+type localWireguard struct {
+ wgClient *wgctrl.Client
+ privKey wgtypes.Key
+}
+
+// ensureOnDiskKey loads the private key from disk or (if none exists) generates
+// one and persists it. The resulting key is then saved into the localWireguard
+// instance.
+func (s *localWireguard) ensureOnDiskKey(dir *localstorage.DataKubernetesClusterNetworkingDirectory) error {
+ keyRaw, err := dir.Key.Read()
+ if os.IsNotExist(err) {
+ key, err := wgtypes.GeneratePrivateKey()
+ if err != nil {
+ return fmt.Errorf("when generating key: %w", err)
+ }
+ if err := dir.Key.Write([]byte(key.String()), 0600); err != nil {
+ return fmt.Errorf("save failed: %w", err)
+ }
+ s.privKey = key
+ return nil
+ } else if err != nil {
+ return fmt.Errorf("load failed: %w", err)
+ }
+
+ key, err := wgtypes.ParseKey(string(keyRaw))
+ if err != nil {
+ return fmt.Errorf("invalid private key in file: %w", err)
+ }
+ s.privKey = key
+ return nil
+}
+
+// setup the local network namespace by creating a WireGuard interface and adding
+// a clusterNet route to it. If a matching WireGuard interface already exists in
+// the system, it is first deleted.
+//
+// ensureOnDiskKey must be called before calling this function.
+func (s *localWireguard) setup(clusterNet *net.IPNet) error {
+ links, err := netlink.LinkList()
+ if err != nil {
+ return fmt.Errorf("could not list links: %w", err)
+ }
+ for _, link := range links {
+ if link.Attrs().Name != clusterNetDeviceName {
+ continue
+ }
+ if err := netlink.LinkDel(link); err != nil {
+ return fmt.Errorf("could not remove existing clusternet link: %w", err)
+ }
+ }
+
+ wgInterface := &netlink.Wireguard{LinkAttrs: netlink.LinkAttrs{Name: clusterNetDeviceName, Flags: net.FlagUp}}
+ if err := netlink.LinkAdd(wgInterface); err != nil {
+ return fmt.Errorf("when adding network interface: %w", err)
+ }
+
+ wgClient, err := wgctrl.New()
+ if err != nil {
+ return fmt.Errorf("when creating wireguard client: %w", err)
+ }
+ s.wgClient = wgClient
+
+ listenPort := int(common.WireGuardPort)
+ if err := s.wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
+ PrivateKey: &s.privKey,
+ ListenPort: &listenPort,
+ }); err != nil {
+ return fmt.Errorf("when setting up device: %w", err)
+ }
+
+ if err := netlink.RouteAdd(&netlink.Route{
+ Dst: clusterNet,
+ LinkIndex: wgInterface.Index,
+ Protocol: common.ProtocolClusternet,
+ }); err != nil && !os.IsExist(err) {
+ return fmt.Errorf("when creating cluster route: %w", err)
+ }
+ return nil
+}
+
+// configurePeers creates or updates peers on the local WireGuard interface
+// based on the given nodes.
+//
+// If any node is somehow invalid and causes a parse/reconfiguration error, the
+// function will return an error. The caller should retry with a different set of
+// nodes, performing search/bisection on its own.
+func (s *localWireguard) configurePeers(nodes []*node) error {
+ var configs []wgtypes.PeerConfig
+
+ for i, n := range nodes {
+ if s.privKey.PublicKey().String() == n.pubkey {
+ // Node doesn't need to connect to itself
+ continue
+ }
+ pubkeyParsed, err := wgtypes.ParseKey(n.pubkey)
+ if err != nil {
+ return fmt.Errorf("node %d: failed to parse public-key %q: %w", i, n.pubkey, err)
+ }
+ addressParsed := net.ParseIP(n.address)
+ if addressParsed == nil {
+ return fmt.Errorf("node %d: failed to parse address %q: %w", i, n.address, err)
+ }
+ var allowedIPs []net.IPNet
+ for _, prefix := range n.prefixes {
+ _, podNet, err := net.ParseCIDR(prefix)
+ if err != nil {
+ // Just eat the parse error. Not much we can do here. We have enough validation
+ // in the rest of the system that we shouldn't ever reach this.
+ continue
+ }
+ allowedIPs = append(allowedIPs, *podNet)
+ }
+ endpoint := net.UDPAddr{Port: int(common.WireGuardPort), IP: addressParsed}
+ configs = append(configs, wgtypes.PeerConfig{
+ PublicKey: pubkeyParsed,
+ Endpoint: &endpoint,
+ ReplaceAllowedIPs: true,
+ AllowedIPs: allowedIPs,
+ })
+ }
+
+ err := s.wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
+ Peers: configs,
+ })
+ if err != nil {
+ return fmt.Errorf("failed to configure WireGuard peers: %w", err)
+ }
+ return nil
+}
+
+// unconfigurePeer removes the peer from the local WireGuard interface based on
+// the given node. If no peer existed matching the given node, this operation is
+// a no-op.
+func (s *localWireguard) unconfigurePeer(n *node) error {
+ pubkeyParsed, err := wgtypes.ParseKey(n.pubkey)
+ if err != nil {
+ return fmt.Errorf("failed to parse public-key %q: %w", n.pubkey, err)
+ }
+
+ err = s.wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
+ Peers: []wgtypes.PeerConfig{{
+ PublicKey: pubkeyParsed,
+ Remove: true,
+ }},
+ })
+ if err != nil {
+ return fmt.Errorf("failed to delete WireGuard peer: %w", err)
+ }
+ return nil
+}
+
+func (s *localWireguard) key() wgtypes.Key {
+ return s.privKey
+}
+
+// close cleans up after the wireguard client, but does _not_ remove the
+// interface or peers.
+func (s *localWireguard) close() {
+ s.wgClient.Close()
+ s.wgClient = nil
+}
diff --git a/metropolis/node/net_protocols.go b/metropolis/node/net_protocols.go
new file mode 100644
index 0000000..ac43cbf
--- /dev/null
+++ b/metropolis/node/net_protocols.go
@@ -0,0 +1,11 @@
+package node
+
+import "github.com/vishvananda/netlink"
+
+// These are netlink protocol numbers used internally to identify the
+// owners/managers of various netlink resources (e.g. routes).
+const (
+ // ProtocolClusternet is used by //metropolis/node/core/clusternet when
+ // creating/removing routes pointing to the clusternet interface.
+ ProtocolClusternet netlink.RouteProtocol = 129
+)
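+
+// As an illustration, clusternet (which imports this package as "common") tags
+// the route it installs towards its WireGuard interface with this protocol
+// number:
+//
+//	netlink.RouteAdd(&netlink.Route{
+//		Dst:       clusterNet,
+//		LinkIndex: wgInterface.Index,
+//		Protocol:  common.ProtocolClusternet,
+//	})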