m/n/c/curator: implement cluster networking storage
This is just the Curator side, the client implementation will come in a
subsequent change.
Change-Id: I4a9b5ad5c77662e11122d0a1cea22d80ecfe4299
Reviewed-on: https://review.monogon.dev/c/monogon/+/1413
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/node/core/curator/BUILD.bazel b/metropolis/node/core/curator/BUILD.bazel
index bb30bbe..24c2d97 100644
--- a/metropolis/node/core/curator/BUILD.bazel
+++ b/metropolis/node/core/curator/BUILD.bazel
@@ -10,6 +10,7 @@
"impl_leader.go",
"impl_leader_aaa.go",
"impl_leader_certificates.go",
+ "impl_leader_cluster_networking.go",
"impl_leader_curator.go",
"impl_leader_management.go",
"listener.go",
@@ -39,6 +40,7 @@
"@com_github_google_cel_go//cel:go_default_library",
"@com_github_google_cel_go//checker/decls:go_default_library",
"@com_github_google_cel_go//common/types:go_default_library",
+ "@com_zx2c4_golang_wireguard_wgctrl//wgtypes",
"@go_googleapis//google/api/expr/v1alpha1:expr_go_proto",
"@io_etcd_go_etcd_client_v3//:client",
"@io_etcd_go_etcd_client_v3//concurrency",
diff --git a/metropolis/node/core/curator/impl_leader_cluster_networking.go b/metropolis/node/core/curator/impl_leader_cluster_networking.go
new file mode 100644
index 0000000..51d813a
--- /dev/null
+++ b/metropolis/node/core/curator/impl_leader_cluster_networking.go
@@ -0,0 +1,100 @@
+package curator
+
+import (
+ "context"
+ "net/netip"
+
+ "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+
+ "source.monogon.dev/metropolis/node/core/identity"
+ "source.monogon.dev/metropolis/node/core/rpc"
+
+ ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+)
+
+func (l *leaderCurator) UpdateNodeClusterNetworking(ctx context.Context, req *ipb.UpdateNodeClusterNetworkingRequest) (*ipb.UpdateNodeClusterNetworkingResponse, error) {
+ // Only nodes may call this. The node to update is always the caller itself,
+ // identified by the public key from its peer info.
+ pi := rpc.GetPeerInfo(ctx)
+ if pi == nil || pi.Node == nil {
+ return nil, status.Error(codes.PermissionDenied, "only nodes can update node cluster networking")
+ }
+ id := identity.NodeID(pi.Node.PublicKey)
+
+ if req.Clusternet == nil {
+ return nil, status.Error(codes.InvalidArgument, "clusternet must be set")
+ }
+ cn := req.Clusternet
+ if cn.WireguardPubkey == "" {
+ return nil, status.Error(codes.InvalidArgument, "clusternet.wireguard_pubkey must be set")
+ }
+ _, err := wgtypes.ParseKey(cn.WireguardPubkey)
+ if err != nil {
+ return nil, status.Error(codes.InvalidArgument, "clusternet.wireguard_pubkey must be a valid wireguard public key")
+ }
+
+ // TODO(q3k): unhardcode this and synchronize with Kubernetes code.
+ clusterNet := netip.MustParsePrefix("10.0.0.0/16")
+
+ // Update node with new cluster networking data. This is a load/modify/store,
+ // so hold muNodes until we return.
+ l.muNodes.Lock()
+ defer l.muNodes.Unlock()
+
+ // Retrieve node ...
+ node, err := nodeLoad(ctx, l.leadership, id)
+ if err != nil {
+ return nil, err
+ }
+
+ if node.status == nil {
+ return nil, status.Error(codes.FailedPrecondition, "node needs to submit at least one status update")
+ }
+ externalIP := node.status.ExternalAddress
+
+ // Parse/validate given prefixes. Any invalid prefix rejects the whole request.
+ var prefixes []netip.Prefix
+ for i, prefix := range cn.Prefixes {
+ // Parse them.
+ p, err := netip.ParsePrefix(prefix.Cidr)
+ if err != nil {
+ return nil, status.Errorf(codes.InvalidArgument, "clusternet.prefixes[%d].cidr invalid: %v", i, err)
+ }
+
+ // Make sure they're in canonical form.
+ masked := p.Masked()
+ if masked.String() != p.String() {
+ return nil, status.Errorf(codes.InvalidArgument, "clusternet.prefixes[%d].cidr (%s) must be in canonical format (ie. all address bits within the subnet must be zero)", i, p.String())
+ }
+
+ // Accept p if it is fully contained within clusterNet (p is canonical, so its
+ // address and length decide that) or if it is exactly this node's externalIP.
+
+ okay := false
+ if clusterNet.Contains(p.Addr()) && p.Bits() >= clusterNet.Bits() {
+ okay = true
+ }
+ if p.IsSingleIP() && p.Addr().String() == externalIP {
+ okay = true
+ }
+
+ if !okay {
+ return nil, status.Errorf(codes.InvalidArgument, "clusternet.prefixes[%d].cidr (%s) must be fully contained within cluster network (%s) or be the node's external IP (%s)", i, p.String(), clusterNet.String(), externalIP)
+ }
+
+ prefixes = append(prefixes, p)
+
+ }
+
+ // ... update its cluster networking bits ...
+ node.wireguardKey = cn.WireguardPubkey
+ node.networkPrefixes = prefixes
+ // ... and save it to etcd.
+ if err := nodeSave(ctx, l.leadership, node); err != nil {
+ return nil, err
+ }
+
+ return &ipb.UpdateNodeClusterNetworkingResponse{}, nil
+}
diff --git a/metropolis/node/core/curator/impl_leader_curator.go b/metropolis/node/core/curator/impl_leader_curator.go
index 869cfd9..e5b6f57 100644
--- a/metropolis/node/core/curator/impl_leader_curator.go
+++ b/metropolis/node/core/curator/impl_leader_curator.go
@@ -198,10 +198,12 @@
// WatchEvent, either a Node or NodeTombstone.
func (kv nodeAtID) appendToEvent(ev *ipb.WatchEvent) {
if node := kv.value; node != nil {
+ np := node.proto()
ev.Nodes = append(ev.Nodes, &ipb.Node{
- Id: node.ID(),
- Roles: node.proto().Roles,
- Status: node.status,
+ Id: node.ID(),
+ Roles: np.Roles,
+ Status: np.Status,
+ Clusternet: np.Clusternet,
})
} else {
ev.NodeTombstones = append(ev.NodeTombstones, &ipb.WatchEvent_NodeTombstone{
diff --git a/metropolis/node/core/curator/impl_leader_test.go b/metropolis/node/core/curator/impl_leader_test.go
index cb87024..fcb3e1d 100644
--- a/metropolis/node/core/curator/impl_leader_test.go
+++ b/metropolis/node/core/curator/impl_leader_test.go
@@ -10,6 +10,7 @@
"encoding/hex"
"io"
"net"
+ "strings"
"testing"
"time"
@@ -1293,3 +1294,132 @@
t.Errorf("Certificate has been issued again for a different pubkey")
}
}
+
+// TestUpdateNodeClusterNetworking exercises the validation and mutation
+// functionality of the UpdateNodeClusterNetworking implementation in the
+// curator leader.
+func TestUpdateNodeClusterNetworking(t *testing.T) {
+ cl := fakeLeader(t)
+ ctx, ctxC := context.WithCancel(context.Background())
+ defer ctxC()
+
+ cur := ipb.NewCuratorClient(cl.localNodeConn)
+ // Update the node's external address as it's used in tests.
+ _, err := cur.UpdateNodeStatus(ctx, &ipb.UpdateNodeStatusRequest{
+ NodeId: cl.localNodeID,
+ Status: &cpb.NodeStatus{
+ ExternalAddress: "203.0.113.43",
+ },
+ })
+ if err != nil {
+ t.Fatalf("Failed to update node external address: %v", err)
+ }
+
+ // Run a few table tests, but end up with an update that went through.
+ for i, te := range []struct {
+ req *ipb.UpdateNodeClusterNetworkingRequest
+ wantErr string
+ }{
+ {&ipb.UpdateNodeClusterNetworkingRequest{}, "clusternet must be set"},
+ {&ipb.UpdateNodeClusterNetworkingRequest{
+ Clusternet: &cpb.NodeClusterNetworking{},
+ }, "wireguard_pubkey must be set"},
+ {&ipb.UpdateNodeClusterNetworkingRequest{
+ Clusternet: &cpb.NodeClusterNetworking{
+ WireguardPubkey: "beep boop i'm a key",
+ },
+ }, "must be a valid wireguard"},
+ {&ipb.UpdateNodeClusterNetworkingRequest{
+ Clusternet: &cpb.NodeClusterNetworking{
+ WireguardPubkey: "w9RbFvF14pytyraq16IEuMov032XXrPBOQUr59kcxHg=",
+ },
+ }, ""},
+ {&ipb.UpdateNodeClusterNetworkingRequest{
+ Clusternet: &cpb.NodeClusterNetworking{
+ WireguardPubkey: "w9RbFvF14pytyraq16IEuMov032XXrPBOQUr59kcxHg=",
+ Prefixes: []*cpb.NodeClusterNetworking_Prefix{
+ {Cidr: ""},
+ },
+ },
+ }, "no '/'"},
+ {&ipb.UpdateNodeClusterNetworkingRequest{
+ Clusternet: &cpb.NodeClusterNetworking{
+ WireguardPubkey: "w9RbFvF14pytyraq16IEuMov032XXrPBOQUr59kcxHg=",
+ Prefixes: []*cpb.NodeClusterNetworking_Prefix{
+ {Cidr: "10.0.0.128/16"},
+ },
+ },
+ }, "must be in canonical format"},
+ {&ipb.UpdateNodeClusterNetworkingRequest{
+ Clusternet: &cpb.NodeClusterNetworking{
+ WireguardPubkey: "w9RbFvF14pytyraq16IEuMov032XXrPBOQUr59kcxHg=",
+ Prefixes: []*cpb.NodeClusterNetworking_Prefix{
+ // Prefix outside of cluster net should not be allowed.
+ {Cidr: "10.0.0.0/15"},
+ },
+ },
+ }, "must be fully contained"},
+ {&ipb.UpdateNodeClusterNetworkingRequest{
+ Clusternet: &cpb.NodeClusterNetworking{
+ WireguardPubkey: "w9RbFvF14pytyraq16IEuMov032XXrPBOQUr59kcxHg=",
+ Prefixes: []*cpb.NodeClusterNetworking_Prefix{
+ // Random /32 should not be allowed.
+ {Cidr: "8.8.8.8/32"},
+ },
+ },
+ }, "must be fully contained"},
+ {&ipb.UpdateNodeClusterNetworkingRequest{
+ Clusternet: &cpb.NodeClusterNetworking{
+ WireguardPubkey: "GaNXuc/yl8IaXduX6PQ+ZxIG4HtBACubHrRI7rqfA20=",
+ Prefixes: []*cpb.NodeClusterNetworking_Prefix{
+ {Cidr: "10.0.0.0/24"},
+ // Yes, this is allowed.
+ {Cidr: "10.0.0.0/16"},
+ {Cidr: "10.0.12.23/32"},
+ // External address should be allowed.
+ {Cidr: "203.0.113.43/32"},
+ },
+ },
+ }, ""},
+ } {
+ _, err := cur.UpdateNodeClusterNetworking(ctx, te.req)
+ switch {
+ case te.wantErr == "" && err != nil:
+ t.Errorf("case %d: should've passed, got: %v", i, err)
+ case te.wantErr != "" && (err == nil || !strings.Contains(err.Error(), te.wantErr)):
+ t.Errorf("case %d: error should've contained %q, got %v", i, te.wantErr, err)
+ }
+ }
+
+ // Make sure the last request actually ended up in a node mutation.
+ w, err := cur.Watch(ctx, &ipb.WatchRequest{
+ Kind: &ipb.WatchRequest_NodeInCluster_{
+ NodeInCluster: &ipb.WatchRequest_NodeInCluster{
+ NodeId: cl.localNodeID,
+ },
+ },
+ })
+ if err != nil {
+ t.Fatalf("Watch failed: %v", err)
+ }
+ ev, err := w.Recv()
+ if err != nil {
+ t.Fatalf("Watch.Recv failed: %v", err)
+ }
+ if len(ev.Nodes) < 1 {
+ t.Fatal("Watch event contained no nodes")
+ }
+ cn := ev.Nodes[0].Clusternet
+ if want, got := "GaNXuc/yl8IaXduX6PQ+ZxIG4HtBACubHrRI7rqfA20=", cn.GetWireguardPubkey(); want != got {
+ t.Errorf("Wrong wireguard key: wanted %q, got %q", want, got)
+ }
+ if want, got := 4, len(cn.GetPrefixes()); want != got {
+ t.Errorf("Wanted %d prefixes, got %d", want, got)
+ } else {
+ for i, want := range []string{"10.0.0.0/24", "10.0.0.0/16", "10.0.12.23/32", "203.0.113.43/32"} {
+ if got := cn.Prefixes[i].Cidr; want != got {
+ t.Errorf("Prefix %d should be %q, got %q", i, want, got)
+ }
+ }
+ }
+}
diff --git a/metropolis/node/core/curator/proto/api/api.proto b/metropolis/node/core/curator/proto/api/api.proto
index 32fb2d1..a035b9d 100644
--- a/metropolis/node/core/curator/proto/api/api.proto
+++ b/metropolis/node/core/curator/proto/api/api.proto
@@ -130,6 +130,29 @@
option (metropolis.proto.ext.authorization) = {
};
}
+
+ // UpdateNodeClusterNetworking is called by nodes when their local Cluster
+ // Networking state changes, ie. they are using a new WireGuard key or have
+ // new prefixes to announce. All per-node cluster networking information is
+ // then available for other nodes to consume. This information is then used
+ // to build up the WireGuard-based cluster networking mesh.
+ //
+ // The Curator performs basic validation on the submitted prefixes. It makes
+ // sure the submitted prefixes are either:
+ // 1. fully contained within the cluster's Kubernetes pod network, or
+ // 2. a /32 equal to the calling node's own external IP address, which allows
+ // communicating from node IPs to pods.
+ //
+ // These prefixes are always added to allowedIPs on WireGuard peers, but not
+ // automatically added as routes on the nodes. Instead, only the Kubernetes
+ // cluster network is programmed into the routing table, which catches all
+ // the prefixes of type 1. Prefixes of type 2. are only used to allow
+ // incoming traffic from nodes into pods.
+ rpc UpdateNodeClusterNetworking(UpdateNodeClusterNetworkingRequest) returns (UpdateNodeClusterNetworkingResponse) {
+ option (metropolis.proto.ext.authorization) = {
+ need: PERMISSION_UPDATE_NODE_SELF
+ };
+ }
}
// Node is the state and configuration of a node in the cluster.
@@ -140,6 +163,8 @@
metropolis.proto.common.NodeRoles roles = 2;
// Last reported status of the node, if available.
metropolis.proto.common.NodeStatus status = 3;
+ // Cluster networking configuration/status of node, if available.
+ metropolis.proto.common.NodeClusterNetworking clusternet = 4;
};
// WatchRequest specifies what data the caller is interested in. This influences
@@ -345,4 +370,14 @@
oneof kind {
KubernetesWorker kubernetes_worker = 1;
};
+}
+
+message UpdateNodeClusterNetworkingRequest {
+ // Details of the clusternet configuration/state of the node. Whatever is
+ // currently configured in the Curator will be fully replaced by data
+ // contained within this field, after validation.
+ metropolis.proto.common.NodeClusterNetworking clusternet = 1;
+}
+
+message UpdateNodeClusterNetworkingResponse {
}
\ No newline at end of file
diff --git a/metropolis/node/core/curator/proto/private/storage.proto b/metropolis/node/core/curator/proto/private/storage.proto
index c4a876c..8344041 100644
--- a/metropolis/node/core/curator/proto/private/storage.proto
+++ b/metropolis/node/core/curator/proto/private/storage.proto
@@ -30,6 +30,11 @@
// join_key is an ED25519 public key used to authenticate the join
// operation. It's generated by the node during the registration process.
bytes join_key = 6;
+
+ // clusternet, if set, is a node's Cluster Networking configuration. See
+ // metropolis.node.core.curator.api.Curator.UpdateNodeClusterNetworking for
+ // more details.
+ metropolis.proto.common.NodeClusterNetworking clusternet = 7;
}
// Information about the cluster owner, currently the only Metropolis management
diff --git a/metropolis/node/core/curator/state_node.go b/metropolis/node/core/curator/state_node.go
index f2ee5c3..2250ce1 100644
--- a/metropolis/node/core/curator/state_node.go
+++ b/metropolis/node/core/curator/state_node.go
@@ -21,6 +21,7 @@
"crypto/x509"
"encoding/hex"
"fmt"
+ "net/netip"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc/codes"
@@ -87,6 +88,15 @@
// kubernetesWorker is set if this node is a Kubernetes worker, ie. running the
// Kubernetes dataplane which runs user workloads.
kubernetesWorker *NodeRoleKubernetesWorker
+
+ // wireguardKey, if set, is the Wireguard key of the node's cluster networking
+ // setup.
+ wireguardKey string
+ // networkPrefixes are all the network routes exported by the node into the
+ // cluster networking mesh. All of them will be programmed as allowedIPs into a
+ // wireguard peer, but only the pod network will have a single large route
+ // installed into the host routing table.
+ networkPrefixes []netip.Prefix
}
// NewNodeForBootstrap creates a brand new node without regard for any other
@@ -255,6 +265,18 @@
Peers: peers,
}
}
+ if n.wireguardKey != "" {
+ var prefixes []*cpb.NodeClusterNetworking_Prefix
+ for _, prefix := range n.networkPrefixes {
+ prefixes = append(prefixes, &cpb.NodeClusterNetworking_Prefix{
+ Cidr: prefix.String(),
+ })
+ }
+ msg.Clusternet = &cpb.NodeClusterNetworking{
+ WireguardPubkey: n.wireguardKey,
+ Prefixes: prefixes,
+ }
+ }
return msg
}
@@ -306,6 +328,21 @@
Peers: peers,
}
}
+ if cn := msg.Clusternet; cn != nil {
+ n.wireguardKey = cn.WireguardPubkey
+ for _, prefix := range cn.Prefixes {
+ if prefix.Cidr == "" {
+ continue
+ }
+ nip, err := netip.ParsePrefix(prefix.Cidr)
+ if err != nil {
+ // Eat error. When we serialize this back into a node, the invalid record will
+ // just be removed.
+ continue
+ }
+ n.networkPrefixes = append(n.networkPrefixes, nip)
+ }
+ }
return n, nil
}
diff --git a/metropolis/proto/common/common.proto b/metropolis/proto/common/common.proto
index 1c01834..183f790 100644
--- a/metropolis/proto/common/common.proto
+++ b/metropolis/proto/common/common.proto
@@ -175,3 +175,17 @@
};
repeated Node nodes = 1;
}
+
+
+// NodeClusterNetworking carries information about the cluster networking (ie.
+// WireGuard mesh) connectivity of a node.
+message NodeClusterNetworking {
+ message Prefix {
+ string cidr = 1; // Prefix in CIDR notation, eg. "10.0.0.0/24".
+ }
+ // wireguard_pubkey is the base64-encoded public key used by the node.
+ string wireguard_pubkey = 1;
+ // prefixes are networking routes exported by the node to the cluster networking
+ // mesh, and are programmed by other nodes into their wireguard peer config.
+ repeated Prefix prefixes = 2;
+}