m/n/c/curator: implement cluster networking storage

This is just the Curator side, the client implementation will come in a
subsequent change.

Change-Id: I4a9b5ad5c77662e11122d0a1cea22d80ecfe4299
Reviewed-on: https://review.monogon.dev/c/monogon/+/1413
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/node/core/curator/BUILD.bazel b/metropolis/node/core/curator/BUILD.bazel
index bb30bbe..24c2d97 100644
--- a/metropolis/node/core/curator/BUILD.bazel
+++ b/metropolis/node/core/curator/BUILD.bazel
@@ -10,6 +10,7 @@
         "impl_leader.go",
         "impl_leader_aaa.go",
         "impl_leader_certificates.go",
+        "impl_leader_cluster_networking.go",
         "impl_leader_curator.go",
         "impl_leader_management.go",
         "listener.go",
@@ -39,6 +40,7 @@
         "@com_github_google_cel_go//cel:go_default_library",
         "@com_github_google_cel_go//checker/decls:go_default_library",
         "@com_github_google_cel_go//common/types:go_default_library",
+        "@com_zx2c4_golang_wireguard_wgctrl//wgtypes",
         "@go_googleapis//google/api/expr/v1alpha1:expr_go_proto",
         "@io_etcd_go_etcd_client_v3//:client",
         "@io_etcd_go_etcd_client_v3//concurrency",
diff --git a/metropolis/node/core/curator/impl_leader_cluster_networking.go b/metropolis/node/core/curator/impl_leader_cluster_networking.go
new file mode 100644
index 0000000..51d813a
--- /dev/null
+++ b/metropolis/node/core/curator/impl_leader_cluster_networking.go
@@ -0,0 +1,100 @@
+package curator
+
+import (
+	"context"
+	"net/netip"
+
+	"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+
+	"source.monogon.dev/metropolis/node/core/identity"
+	"source.monogon.dev/metropolis/node/core/rpc"
+
+	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+)
+
+// UpdateNodeClusterNetworking replaces the calling node's cluster networking
+// data (WireGuard public key and exported prefixes) after validating it. Each
+// prefix must be canonical and either lie within the cluster network or be the
+// single-address prefix of the node's own last reported external address.
+func (l *leaderCurator) UpdateNodeClusterNetworking(ctx context.Context, req *ipb.UpdateNodeClusterNetworkingRequest) (*ipb.UpdateNodeClusterNetworkingResponse, error) {
+	// The updated node is always the calling node. We currently only allow for
+	// direct self-reporting of cluster networking data by nodes.
+	pi := rpc.GetPeerInfo(ctx)
+	if pi == nil || pi.Node == nil {
+		return nil, status.Error(codes.PermissionDenied, "only nodes can update node cluster networking")
+	}
+	id := identity.NodeID(pi.Node.PublicKey)
+
+	// Validate everything that does not depend on stored node state first.
+	if req.Clusternet == nil {
+		return nil, status.Error(codes.InvalidArgument, "clusternet must be set")
+	}
+	cn := req.Clusternet
+	if cn.WireguardPubkey == "" {
+		return nil, status.Error(codes.InvalidArgument, "clusternet.wireguard_pubkey must be set")
+	}
+	if _, err := wgtypes.ParseKey(cn.WireguardPubkey); err != nil {
+		return nil, status.Error(codes.InvalidArgument, "clusternet.wireguard_pubkey must be a valid wireguard public key")
+	}
+
+	// TODO(q3k): unhardcode this and synchronize with Kubernetes code.
+	clusterNet := netip.MustParsePrefix("10.0.0.0/16")
+
+	// Update node with new clusternetworking data. We're doing a load/modify/store,
+	// so lock here.
+	l.muNodes.Lock()
+	defer l.muNodes.Unlock()
+
+	// Retrieve node ...
+	node, err := nodeLoad(ctx, l.leadership, id)
+	if err != nil {
+		return nil, err
+	}
+
+	if node.status == nil {
+		return nil, status.Error(codes.FailedPrecondition, "node needs to submit at least one status update")
+	}
+	externalIP := node.status.ExternalAddress
+	// Compare the external address by value rather than by its textual form, so
+	// that an address stored in a non-canonical spelling still matches. If it does
+	// not parse, no prefix can match it, which is also what comparing against the
+	// canonical string form of a prefix address would yield.
+	externalAddr, externalErr := netip.ParseAddr(externalIP)
+
+	// Parse/validate given prefixes.
+	var prefixes []netip.Prefix
+	for i, prefix := range cn.Prefixes {
+		// Parse them.
+		p, err := netip.ParsePrefix(prefix.Cidr)
+		if err != nil {
+			return nil, status.Errorf(codes.InvalidArgument, "clusternet.prefixes[%d].cidr invalid: %v", i, err)
+		}
+
+		// Make sure they're in canonical form.
+		if p != p.Masked() {
+			return nil, status.Errorf(codes.InvalidArgument, "clusternet.prefixes[%d].cidr (%s) must be in canonical format (ie. all address bits within the subnet must be zero)", i, p.String())
+		}
+
+		// Make sure they're fully contained within clusterNet or are the
+		// single-address prefix (eg. /32) of the node's externalIP.
+		inClusterNet := clusterNet.Contains(p.Addr()) && p.Bits() >= clusterNet.Bits()
+		isExternal := externalErr == nil && p.IsSingleIP() && p.Addr() == externalAddr
+		if !inClusterNet && !isExternal {
+			return nil, status.Errorf(codes.InvalidArgument, "clusternet.prefixes[%d].cidr (%s) must be fully contained within cluster network (%s) or be the node's external IP (%s)", i, p.String(), clusterNet.String(), externalIP)
+		}
+
+		prefixes = append(prefixes, p)
+	}
+
+	// ... update its clusternetworking bits ...
+	node.wireguardKey = cn.WireguardPubkey
+	node.networkPrefixes = prefixes
+	// ... and save it to etcd.
+	if err := nodeSave(ctx, l.leadership, node); err != nil {
+		return nil, err
+	}
+
+	return &ipb.UpdateNodeClusterNetworkingResponse{}, nil
+}
diff --git a/metropolis/node/core/curator/impl_leader_curator.go b/metropolis/node/core/curator/impl_leader_curator.go
index 869cfd9..e5b6f57 100644
--- a/metropolis/node/core/curator/impl_leader_curator.go
+++ b/metropolis/node/core/curator/impl_leader_curator.go
@@ -198,10 +198,12 @@
 // WatchEvent, either a Node or NodeTombstone.
 func (kv nodeAtID) appendToEvent(ev *ipb.WatchEvent) {
 	if node := kv.value; node != nil {
+		np := node.proto()
 		ev.Nodes = append(ev.Nodes, &ipb.Node{
-			Id:     node.ID(),
-			Roles:  node.proto().Roles,
-			Status: node.status,
+			Id:         node.ID(),
+			Roles:      np.Roles,
+			Status:     np.Status,
+			Clusternet: np.Clusternet,
 		})
 	} else {
 		ev.NodeTombstones = append(ev.NodeTombstones, &ipb.WatchEvent_NodeTombstone{
diff --git a/metropolis/node/core/curator/impl_leader_test.go b/metropolis/node/core/curator/impl_leader_test.go
index cb87024..fcb3e1d 100644
--- a/metropolis/node/core/curator/impl_leader_test.go
+++ b/metropolis/node/core/curator/impl_leader_test.go
@@ -10,6 +10,7 @@
 	"encoding/hex"
 	"io"
 	"net"
+	"strings"
 	"testing"
 	"time"
 
@@ -1293,3 +1294,132 @@
 		t.Errorf("Certificate has been issued again for a different pubkey")
 	}
 }
+
+// TestUpdateNodeClusterNetworking exercises the validation and mutation
+// functionality of the UpdateNodeClusterNetworking implementation in the
+// curator leader.
+func TestUpdateNodeClusterNetworking(t *testing.T) {
+	cl := fakeLeader(t)
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+
+	cur := ipb.NewCuratorClient(cl.localNodeConn)
+	// Update the node's external address as it's used in tests.
+	_, err := cur.UpdateNodeStatus(ctx, &ipb.UpdateNodeStatusRequest{
+		NodeId: cl.localNodeID,
+		Status: &cpb.NodeStatus{
+			ExternalAddress: "203.0.113.43",
+		},
+	})
+	if err != nil {
+		t.Fatalf("Failed to update node external address: %v", err)
+	}
+
+	// Run a few table tests, but end up with an update that went through.
+	for i, te := range []struct {
+		req     *ipb.UpdateNodeClusterNetworkingRequest
+		wantErr string
+	}{
+		{&ipb.UpdateNodeClusterNetworkingRequest{}, "clusternet must be set"},
+		{&ipb.UpdateNodeClusterNetworkingRequest{
+			Clusternet: &cpb.NodeClusterNetworking{},
+		}, "wireguard_pubkey must be set"},
+		{&ipb.UpdateNodeClusterNetworkingRequest{
+			Clusternet: &cpb.NodeClusterNetworking{
+				WireguardPubkey: "beep boop i'm a key",
+			},
+		}, "must be a valid wireguard"},
+		{&ipb.UpdateNodeClusterNetworkingRequest{
+			Clusternet: &cpb.NodeClusterNetworking{
+				WireguardPubkey: "w9RbFvF14pytyraq16IEuMov032XXrPBOQUr59kcxHg=",
+			},
+		}, ""},
+		{&ipb.UpdateNodeClusterNetworkingRequest{
+			Clusternet: &cpb.NodeClusterNetworking{
+				WireguardPubkey: "w9RbFvF14pytyraq16IEuMov032XXrPBOQUr59kcxHg=",
+				Prefixes: []*cpb.NodeClusterNetworking_Prefix{
+					{Cidr: ""},
+				},
+			},
+		}, "no '/'"},
+		{&ipb.UpdateNodeClusterNetworkingRequest{
+			Clusternet: &cpb.NodeClusterNetworking{
+				WireguardPubkey: "w9RbFvF14pytyraq16IEuMov032XXrPBOQUr59kcxHg=",
+				Prefixes: []*cpb.NodeClusterNetworking_Prefix{
+					{Cidr: "10.0.0.128/16"},
+				},
+			},
+		}, "must be in canonical format"},
+		{&ipb.UpdateNodeClusterNetworkingRequest{
+			Clusternet: &cpb.NodeClusterNetworking{
+				WireguardPubkey: "w9RbFvF14pytyraq16IEuMov032XXrPBOQUr59kcxHg=",
+				Prefixes: []*cpb.NodeClusterNetworking_Prefix{
+					// Prefix outside of cluster net should not be allowed.
+					{Cidr: "10.0.0.0/15"},
+				},
+			},
+		}, "must be fully contained"},
+		{&ipb.UpdateNodeClusterNetworkingRequest{
+			Clusternet: &cpb.NodeClusterNetworking{
+				WireguardPubkey: "w9RbFvF14pytyraq16IEuMov032XXrPBOQUr59kcxHg=",
+				Prefixes: []*cpb.NodeClusterNetworking_Prefix{
+					// Random /32 should not be allowed.
+					{Cidr: "8.8.8.8/32"},
+				},
+			},
+		}, "must be fully contained"},
+		{&ipb.UpdateNodeClusterNetworkingRequest{
+			Clusternet: &cpb.NodeClusterNetworking{
+				WireguardPubkey: "GaNXuc/yl8IaXduX6PQ+ZxIG4HtBACubHrRI7rqfA20=",
+				Prefixes: []*cpb.NodeClusterNetworking_Prefix{
+					{Cidr: "10.0.0.0/24"},
+					// Yes, this is allowed.
+					{Cidr: "10.0.0.0/16"},
+					{Cidr: "10.0.12.23/32"},
+					// External address should be allowed.
+					{Cidr: "203.0.113.43/32"},
+				},
+			},
+		}, ""},
+	} {
+		_, err := cur.UpdateNodeClusterNetworking(ctx, te.req)
+		switch {
+		case te.wantErr == "" && err != nil:
+			t.Errorf("case %d: should've passed, got: %v", i, err)
+		case te.wantErr != "" && err == nil:
+			t.Errorf("case %d: should've failed with error containing %q, but passed", i, te.wantErr)
+		case te.wantErr != "" && !strings.Contains(err.Error(), te.wantErr):
+			t.Errorf("case %d: error should've contained %q, got %q", i, te.wantErr, err.Error())
+		}
+	}
+
+	// Make sure the last request actually ended up in a node mutation.
+	w, err := cur.Watch(ctx, &ipb.WatchRequest{
+		Kind: &ipb.WatchRequest_NodeInCluster_{
+			NodeInCluster: &ipb.WatchRequest_NodeInCluster{NodeId: cl.localNodeID},
+		},
+	})
+	if err != nil {
+		t.Fatalf("Watch failed: %v", err)
+	}
+	ev, err := w.Recv()
+	if err != nil {
+		t.Fatalf("Recv failed: %v", err)
+	}
+	if len(ev.Nodes) < 1 || ev.Nodes[0].Clusternet == nil {
+		t.Fatalf("Wanted a node with clusternet data in first watch event, got %v", ev.Nodes)
+	}
+	cn := ev.Nodes[0].Clusternet
+	if want, got := "GaNXuc/yl8IaXduX6PQ+ZxIG4HtBACubHrRI7rqfA20=", cn.WireguardPubkey; want != got {
+		t.Errorf("Wrong wireguard key: wanted %q, got %q", want, got)
+	}
+	if want, got := 4, len(cn.Prefixes); want != got {
+		t.Errorf("Wanted %d prefixes, got %d", want, got)
+	} else {
+		for i, want := range []string{"10.0.0.0/24", "10.0.0.0/16", "10.0.12.23/32", "203.0.113.43/32"} {
+			if got := cn.Prefixes[i].Cidr; want != got {
+				t.Errorf("Prefix %d should be %q, got %q", i, want, got)
+			}
+		}
+	}
+}
diff --git a/metropolis/node/core/curator/proto/api/api.proto b/metropolis/node/core/curator/proto/api/api.proto
index 32fb2d1..a035b9d 100644
--- a/metropolis/node/core/curator/proto/api/api.proto
+++ b/metropolis/node/core/curator/proto/api/api.proto
@@ -130,6 +130,29 @@
         option (metropolis.proto.ext.authorization) = {
         };
     }
+
+    // UpdateNodeClusterNetworking is called by nodes when their local Cluster
+    // Networking state changes, ie. they are using a new WireGuard key or have
+    // new prefixes to announce. All per-node cluster networking information is
+    // then available for other nodes to consume. This information is then used
+    // to build up the WireGuard-based cluster networking mesh.
+    //
+    // The Curator performs basic validation on the submitted prefixes. It makes
+    // sure the submitted prefixes are either:
+    // 1. fully contained within the cluster's Kubernetes pod network, or
+    // 2. a /32 equal to a node's external IP address, which allows communicating
+    //    from node IPs to pods.
+    //
+    // These prefixes are always added to allowedIPs on WireGuard peers, but not
+    // automatically added as routes on the nodes. Instead, only the Kubernetes
+    // cluster network is programmed into the routing table, which catches all
+    // the prefixes of type 1. Prefixes of type 2. are only used to allow
+    // incoming traffic from nodes into pods.
+    rpc UpdateNodeClusterNetworking(UpdateNodeClusterNetworkingRequest) returns (UpdateNodeClusterNetworkingResponse) {
+        option (metropolis.proto.ext.authorization) = {
+            need: PERMISSION_UPDATE_NODE_SELF
+        };
+    }
 }
 
 // Node is the state and configuration of a node in the cluster.
@@ -140,6 +163,8 @@
     metropolis.proto.common.NodeRoles roles = 2;
     // Last reported status of the node, if available.
     metropolis.proto.common.NodeStatus status = 3;
+    // Cluster networking configuration/status of node, if available.
+    metropolis.proto.common.NodeClusterNetworking clusternet = 4;
 };
 
 // WatchRequest specifies what data the caller is interested in. This influences
@@ -345,4 +370,14 @@
     oneof kind {
         KubernetesWorker kubernetes_worker = 1;
     };
+}
+
+message UpdateNodeClusterNetworkingRequest {
+    // Details of the clusternet configuration/state of the node. Whatever is
+    // currently configured in the Curator will be fully replaced by data
+    // contained within this field, after validation.
+    metropolis.proto.common.NodeClusterNetworking clusternet = 1;
+}
+
+message UpdateNodeClusterNetworkingResponse {
 }
\ No newline at end of file
diff --git a/metropolis/node/core/curator/proto/private/storage.proto b/metropolis/node/core/curator/proto/private/storage.proto
index c4a876c..8344041 100644
--- a/metropolis/node/core/curator/proto/private/storage.proto
+++ b/metropolis/node/core/curator/proto/private/storage.proto
@@ -30,6 +30,11 @@
     // join_key is an ED25519 public key used to authenticate the join
     // operation. It's generated by the node during the registration process.
     bytes join_key = 6;
+
+    // clusternet, if set, is a node's Cluster Networking configuration. See
+    // metropolis.node.core.curator.api.Curator.UpdateNodeClusterNetworking for
+    // more details.
+    metropolis.proto.common.NodeClusterNetworking clusternet = 7;
 }
 
 // Information about the cluster owner, currently the only Metropolis management
diff --git a/metropolis/node/core/curator/state_node.go b/metropolis/node/core/curator/state_node.go
index f2ee5c3..2250ce1 100644
--- a/metropolis/node/core/curator/state_node.go
+++ b/metropolis/node/core/curator/state_node.go
@@ -21,6 +21,7 @@
 	"crypto/x509"
 	"encoding/hex"
 	"fmt"
+	"net/netip"
 
 	clientv3 "go.etcd.io/etcd/client/v3"
 	"google.golang.org/grpc/codes"
@@ -87,6 +88,15 @@
 	// kubernetesWorker is set if this node is a Kubernetes worker, ie. running the
 	// Kubernetes dataplane which runs user workloads.
 	kubernetesWorker *NodeRoleKubernetesWorker
+
+	// wireguardKey, if set, is the Wireguard key of the node's cluster networking
+	// setup.
+	wireguardKey string
+	// networkPrefixes are all the network routes exported by the node into the
+	// cluster networking mesh. All of them will be programmed as allowedIPs into a
+	// wireguard peer, but only the pod network will have a single large route
+	// installed into the host routing table.
+	networkPrefixes []netip.Prefix
 }
 
 // NewNodeForBootstrap creates a brand new node without regard for any other
@@ -255,6 +265,18 @@
 			Peers:           peers,
 		}
 	}
+	if n.wireguardKey != "" {
+		var prefixes []*cpb.NodeClusterNetworking_Prefix
+		for _, prefix := range n.networkPrefixes {
+			prefixes = append(prefixes, &cpb.NodeClusterNetworking_Prefix{
+				Cidr: prefix.String(),
+			})
+		}
+		msg.Clusternet = &cpb.NodeClusterNetworking{
+			WireguardPubkey: n.wireguardKey,
+			Prefixes:        prefixes,
+		}
+	}
 	return msg
 }
 
@@ -306,6 +328,21 @@
 			Peers: peers,
 		}
 	}
+	if cn := msg.Clusternet; cn != nil {
+		n.wireguardKey = cn.WireguardPubkey
+		for _, prefix := range cn.Prefixes {
+			if prefix.Cidr == "" {
+				continue
+			}
+			nip, err := netip.ParsePrefix(prefix.Cidr)
+			if err != nil {
+				// Eat error. When we serialize this back into a node, the invalid record will
+				// just be removed.
+				continue
+			}
+			n.networkPrefixes = append(n.networkPrefixes, nip)
+		}
+	}
 	return n, nil
 }
 
diff --git a/metropolis/proto/common/common.proto b/metropolis/proto/common/common.proto
index 1c01834..183f790 100644
--- a/metropolis/proto/common/common.proto
+++ b/metropolis/proto/common/common.proto
@@ -175,3 +175,17 @@
     };
     repeated Node nodes = 1;
 }
+
+
+// NodeClusterNetworking carries information about the cluster networking (ie.
+// WireGuard mesh) connectivity of a node.
+message NodeClusterNetworking {
+    message Prefix {
+        string cidr = 1;
+    }
+    // wireguard_pubkey is the base64-encoded WireGuard public key used by the node.
+    string wireguard_pubkey = 1;
+    // prefixes are networking routes exported by the node to the cluster networking
+    // mesh, and are programmed by other nodes into their WireGuard peer config.
+    repeated Prefix prefixes = 2;
+}