m/n/c/cluster: implement register flow
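
Implement Manager.register, the node-side flow for joining an existing
cluster: validate the NodeParameters.Register data, mount new node
storage, generate a node keypair, then call Curator.RegisterNode with
the register ticket and retry Curator.CommitNode until the node is
approved and receives its certificates. Registered nodes do not run
local consensus yet, so main.go skips curator/Kubernetes/roleserver
startup on them and the debug service tolerates a missing roleserver.
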
Change-Id: I197cbfa96d34c9912c7fc19710db25276e7440fc
Reviewed-on: https://review.monogon.dev/c/monogon/+/454
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/node/core/cluster/BUILD.bazel b/metropolis/node/core/cluster/BUILD.bazel
index 1d55e8d..45c1164 100644
--- a/metropolis/node/core/cluster/BUILD.bazel
+++ b/metropolis/node/core/cluster/BUILD.bazel
@@ -5,6 +5,7 @@
srcs = [
"cluster.go",
"cluster_bootstrap.go",
+ "cluster_register.go",
"platform.go",
"status.go",
"watcher.go",
@@ -12,13 +13,16 @@
importpath = "source.monogon.dev/metropolis/node/core/cluster",
visibility = ["//metropolis/node/core:__subpackages__"],
deps = [
+ "//metropolis/node:go_default_library",
"//metropolis/node/core/consensus:go_default_library",
"//metropolis/node/core/consensus/client:go_default_library",
"//metropolis/node/core/curator:go_default_library",
+ "//metropolis/node/core/curator/proto/api:go_default_library",
"//metropolis/node/core/identity:go_default_library",
"//metropolis/node/core/localstorage:go_default_library",
"//metropolis/node/core/network:go_default_library",
"//metropolis/node/core/network/hostsfile:go_default_library",
+ "//metropolis/node/core/rpc:go_default_library",
"//metropolis/pkg/event:go_default_library",
"//metropolis/pkg/event/memory:go_default_library",
"//metropolis/pkg/supervisor:go_default_library",
diff --git a/metropolis/node/core/cluster/cluster.go b/metropolis/node/core/cluster/cluster.go
index 3ff1ad4..144a47b 100644
--- a/metropolis/node/core/cluster/cluster.go
+++ b/metropolis/node/core/cluster/cluster.go
@@ -128,10 +128,6 @@
}
}
-func (m *Manager) register(ctx context.Context, bootstrap *apb.NodeParameters_ClusterRegister) error {
- return fmt.Errorf("unimplemented")
-}
-
func (m *Manager) nodeParamsFWCFG(ctx context.Context) (*apb.NodeParameters, error) {
bytes, err := os.ReadFile("/sys/firmware/qemu_fw_cfg/by_name/dev.monogon.metropolis/parameters.pb/raw")
if err != nil {
diff --git a/metropolis/node/core/cluster/cluster_bootstrap.go b/metropolis/node/core/cluster/cluster_bootstrap.go
index 4e1c05b..b9b7ef2 100644
--- a/metropolis/node/core/cluster/cluster_bootstrap.go
+++ b/metropolis/node/core/cluster/cluster_bootstrap.go
@@ -99,7 +99,7 @@
status := Status{
State: cpb.ClusterState_CLUSTER_STATE_HOME,
- hasLocalConsensus: true,
+ HasLocalConsensus: true,
consensusClient: metropolisKV,
// Credentials are set further down once created through a curator
// short-circuit bootstrap function.
diff --git a/metropolis/node/core/cluster/cluster_register.go b/metropolis/node/core/cluster/cluster_register.go
new file mode 100644
index 0000000..81db8c9
--- /dev/null
+++ b/metropolis/node/core/cluster/cluster_register.go
@@ -0,0 +1,144 @@
+package cluster
+
+import (
+ "context"
+ "crypto/ed25519"
+ "crypto/rand"
+ "crypto/x509"
+ "encoding/hex"
+ "fmt"
+ "net"
+ "strconv"
+ "strings"
+ "time"
+
+ "source.monogon.dev/metropolis/node"
+ ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+ "source.monogon.dev/metropolis/node/core/identity"
+ "source.monogon.dev/metropolis/node/core/rpc"
+ "source.monogon.dev/metropolis/pkg/supervisor"
+ apb "source.monogon.dev/metropolis/proto/api"
+ cpb "source.monogon.dev/metropolis/proto/common"
+ ppb "source.monogon.dev/metropolis/proto/private"
+)
+
+// register performs the registration flow of the node in the cluster, i.e.
+// makes this node part of an existing cluster.
+//
+// This is a temporary implementation that's not well hardened against
+// transient failures, but is good enough to get us off the ground and able to
+// test multiple nodes in a cluster. Notably, it runs neither etcd/consensus
+// nor the curator, which also prevents Kubernetes from running.
+func (m *Manager) register(ctx context.Context, register *apb.NodeParameters_ClusterRegister) error {
+ // Do a validation pass on the provided NodeParameters.Register data and fail
+ // early if it looks invalid.
+ ca, err := x509.ParseCertificate(register.CaCertificate)
+ if err != nil {
+ return fmt.Errorf("NodeParameters.Register invalid: CaCertificate could not parsed: %w", err)
+ }
+ if err := identity.VerifyCAInsecure(ca); err != nil {
+ return fmt.Errorf("NodeParameters.Register invalid: CaCertificate invalid: %w", err)
+ }
+ if len(register.RegisterTicket) == 0 {
+ return fmt.Errorf("NodeParameters.Register invalid: RegisterTicket not set")
+ }
+ if register.ClusterDirectory == nil || len(register.ClusterDirectory.Nodes) == 0 {
+ return fmt.Errorf("NodeParameters.ClusterDirectory invalid: must contain at least one node")
+ }
+ for i, node := range register.ClusterDirectory.Nodes {
+ if len(node.Addresses) == 0 {
+ return fmt.Errorf("NodeParameters.ClusterDirectory.Nodes[%d] invalid: must have at least one address", i)
+ }
+ for j, add := range node.Addresses {
+ if add.Host == "" || net.ParseIP(add.Host) == nil {
+ return fmt.Errorf("NodeParameters.ClusterDirectory.Nodes[%d].Addresses[%d] invalid: not a parseable address", i, j)
+ }
+ }
+ if len(node.PublicKey) != ed25519.PublicKeySize {
+ return fmt.Errorf("NodeParameters.ClusterDirectory.Nodes[%d] invalid: PublicKey invalid length or not set", i)
+ }
+ }
+
+ // Validation passed. Take the state lock and start registering this node
+ // into the cluster.
+
+ state, unlock := m.lock()
+ defer unlock()
+
+ // Tell the user what we're doing.
+ supervisor.Logger(ctx).Infof("Registering into existing cluster.")
+ supervisor.Logger(ctx).Infof(" Cluster CA public key: %s", hex.EncodeToString(ca.PublicKey.(ed25519.PublicKey)))
+ supervisor.Logger(ctx).Infof(" Register Ticket: %s", hex.EncodeToString(register.RegisterTicket))
+ supervisor.Logger(ctx).Infof(" Directory:")
+ for _, node := range register.ClusterDirectory.Nodes {
+ id := identity.NodeID(node.PublicKey)
+ var addresses []string
+ for _, add := range node.Addresses {
+ addresses = append(addresses, add.Host)
+ }
+ supervisor.Logger(ctx).Infof(" Node ID: %s, Addresses: %s", id, strings.Join(addresses, ","))
+ }
+
+ // Mount new storage with a generated CUK, and save the NUK into the sealed config proto.
+ state.configuration = &ppb.SealedConfiguration{}
+ supervisor.Logger(ctx).Infof("Registering: mounting new storage...")
+ cuk, err := m.storageRoot.Data.MountNew(state.configuration)
+ if err != nil {
+ return fmt.Errorf("could not make and mount data partition: %w", err)
+ }
+
+ pub, priv, err := ed25519.GenerateKey(rand.Reader)
+ if err != nil {
+ return fmt.Errorf("could not generate node keypair: %w", err)
+ }
+ supervisor.Logger(ctx).Infof("Registering: node public key: %s", hex.EncodeToString([]byte(pub)))
+
+ // Attempt to connect to the first node in the cluster directory and call RegisterNode.
+ //
+ // MVP: this should be properly client-side load-balanced.
+ remote := register.ClusterDirectory.Nodes[0].Addresses[0].Host
+ remote = net.JoinHostPort(remote, strconv.Itoa(int(node.CuratorServicePort)))
+ eph, err := rpc.NewEphemeralClient(remote, priv, ca)
+ if err != nil {
+ return fmt.Errorf("could not create ephemeral client to %q: %w", remote, err)
+ }
+ cur := ipb.NewCuratorClient(eph)
+ _, err = cur.RegisterNode(ctx, &ipb.RegisterNodeRequest{
+ RegisterTicket: register.RegisterTicket,
+ })
+ if err != nil {
+ return fmt.Errorf("register call failed: %w", err)
+ }
+
+ // Attempt to commit in a loop. This will succeed once the node is approved.
+ supervisor.Logger(ctx).Infof("Registering: success, attempting to commit...")
+ var certBytes, caCertBytes []byte
+ for {
+ resC, err := cur.CommitNode(ctx, &ipb.CommitNodeRequest{
+ ClusterUnlockKey: cuk,
+ })
+ if err == nil {
+ supervisor.Logger(ctx).Infof("Registering: Commit succesfull, received certificate")
+ certBytes = resC.NodeCertificate
+ caCertBytes = resC.CaCertificate
+ break
+ }
+ supervisor.Logger(ctx).Infof("Registering: Commit failed, retrying: %v", err)
+ time.Sleep(time.Second)
+ }
+
+ // The node is now UP. Build its credentials and report the new status to downstream code.
+ creds, err := identity.NewNodeCredentials(priv, certBytes, caCertBytes)
+ if err != nil {
+ return fmt.Errorf("NewNodeCredentials failed after receiving certificate from cluster: %w", err)
+ }
+ status := Status{
+ State: cpb.ClusterState_CLUSTER_STATE_HOME,
+ HasLocalConsensus: false,
+ Credentials: creds,
+ }
+ m.status.Set(status)
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ supervisor.Signal(ctx, supervisor.SignalDone)
+ return nil
+}
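The doc comment at the top of this file notes that the flow is not yet hardened against transient failures. As one illustration, here is a minimal sketch of a context-aware variant of the CommitNode retry loop above; the helper name commitWithRetry and the ipb.CommitNodeResponse type name are assumptions based on the generated Curator client, not code from this change.

package cluster

import (
	"context"
	"time"

	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
)

// commitWithRetry keeps calling Curator.CommitNode until the node has been
// approved, but stops retrying as soon as the surrounding context is
// cancelled instead of spinning forever.
func commitWithRetry(ctx context.Context, cur ipb.CuratorClient, cuk []byte) (*ipb.CommitNodeResponse, error) {
	for {
		res, err := cur.CommitNode(ctx, &ipb.CommitNodeRequest{
			ClusterUnlockKey: cuk,
		})
		if err == nil {
			return res, nil
		}
		select {
		case <-ctx.Done():
			// The caller gave up; propagate the cancellation.
			return nil, ctx.Err()
		case <-time.After(time.Second):
			// Back off for a second before the next attempt.
		}
	}
}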
diff --git a/metropolis/node/core/cluster/status.go b/metropolis/node/core/cluster/status.go
index 3dbfb56..481c818 100644
--- a/metropolis/node/core/cluster/status.go
+++ b/metropolis/node/core/cluster/status.go
@@ -20,7 +20,7 @@
// hasLocalConsensus is true if the local node is running a local consensus
// (etcd) server.
- hasLocalConsensus bool
+ HasLocalConsensus bool
// consensusClient is an etcd client to the local consensus server if the node
// has such a server and the cluster state is HOME or SPLIT.
consensusClient client.Namespaced
@@ -43,7 +43,7 @@
// ConsensusClient returns an etcd/consensus client for a given ConsensusUser.
// The node must be running a local consensus/etcd server.
func (s *Status) ConsensusClient(user ConsensusUser) (client.Namespaced, error) {
- if !s.hasLocalConsensus {
+ if !s.HasLocalConsensus {
return nil, ErrNoLocalConsensus
}
diff --git a/metropolis/node/core/debug_service.go b/metropolis/node/core/debug_service.go
index 48a4c9b..f253d0e 100644
--- a/metropolis/node/core/debug_service.go
+++ b/metropolis/node/core/debug_service.go
@@ -53,6 +53,9 @@
}
func (s *debugService) GetDebugKubeconfig(ctx context.Context, req *apb.GetDebugKubeconfigRequest) (*apb.GetDebugKubeconfigResponse, error) {
+ if s.roleserve == nil {
+ return nil, status.Errorf(codes.Unavailable, "node does not run roleserver/kubernetes")
+ }
w := s.roleserve.Watch()
defer w.Close()
for {
diff --git a/metropolis/node/core/identity/certificates.go b/metropolis/node/core/identity/certificates.go
index 95b7e0d..c15f913 100644
--- a/metropolis/node/core/identity/certificates.go
+++ b/metropolis/node/core/identity/certificates.go
@@ -61,15 +61,39 @@
}
}
+// VerifyCAInsecure ensures that the given certificate is a valid certificate
+// that is allowed to act as a CA and is emitted for an Ed25519 keypair.
+//
+// It does _not_ ensure that the certificate is the local node's CA, and should
+// not be used for security checks, just for data validation checks.
+func VerifyCAInsecure(ca *x509.Certificate) error {
+ // Ensure ca certificate uses ED25519 keypair.
+ if _, ok := ca.PublicKey.(ed25519.PublicKey); !ok {
+ return fmt.Errorf("not issued for ed25519 keypair")
+ }
+ // Ensure the CA certificate has the X.509 basic constraints extension.
+ // Everything else is legacy; we might as well weed that out early.
+ if !ca.BasicConstraintsValid {
+ return fmt.Errorf("does not have basic constraints")
+ }
+ // Ensure CA certificate can act as CA per BasicConstraints.
+ if !ca.IsCA {
+ return fmt.Errorf("not permitted to act as CA")
+ }
+ if ca.KeyUsage != 0 && ca.KeyUsage&x509.KeyUsageCertSign == 0 {
+ return fmt.Errorf("not permitted to sign certificates")
+ }
+ return nil
+}
+
// VerifyInCluster ensures that the given certificate has been signed by a CA
// certificate and are both certificates emitted for ed25519 keypairs.
//
// The subject certificate's public key is returned if verification is
// successful, and error is returned otherwise.
func VerifyInCluster(cert, ca *x509.Certificate) (ed25519.PublicKey, error) {
- // Ensure ca certificate uses ED25519 keypair.
- if _, ok := ca.PublicKey.(ed25519.PublicKey); !ok {
- return nil, fmt.Errorf("ca certificate not issued for ed25519 keypair")
+ if err := VerifyCAInsecure(ca); err != nil {
+ return nil, fmt.Errorf("ca certificate invalid: %w", err)
}
// Ensure subject cert is signed by ca.
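For reference, a minimal sketch of a certificate that VerifyCAInsecure (added above) accepts: a self-signed Ed25519 CA with valid basic constraints, IsCA set, and cert-sign key usage. This is illustrative test-style code, not part of the change; the certificate fields shown are assumptions.

package main

import (
	"crypto/ed25519"
	"crypto/rand"
	"crypto/x509"
	"crypto/x509/pkix"
	"log"
	"math/big"
	"time"

	"source.monogon.dev/metropolis/node/core/identity"
)

func main() {
	// Generate an Ed25519 keypair and self-sign a CA certificate for it.
	pub, priv, err := ed25519.GenerateKey(rand.Reader)
	if err != nil {
		log.Fatal(err)
	}
	tmpl := &x509.Certificate{
		SerialNumber:          big.NewInt(1),
		Subject:               pkix.Name{CommonName: "example-ca"},
		NotBefore:             time.Now(),
		NotAfter:              time.Now().Add(time.Hour),
		IsCA:                  true,
		BasicConstraintsValid: true,
		KeyUsage:              x509.KeyUsageCertSign,
	}
	der, err := x509.CreateCertificate(rand.Reader, tmpl, tmpl, pub, priv)
	if err != nil {
		log.Fatal(err)
	}
	ca, err := x509.ParseCertificate(der)
	if err != nil {
		log.Fatal(err)
	}
	// Ed25519 key, valid basic constraints, IsCA and cert-sign usage: passes.
	if err := identity.VerifyCAInsecure(ca); err != nil {
		log.Fatalf("unexpected: %v", err)
	}
}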
diff --git a/metropolis/node/core/main.go b/metropolis/node/core/main.go
index 5d495fa..0a63d48 100644
--- a/metropolis/node/core/main.go
+++ b/metropolis/node/core/main.go
@@ -144,71 +144,73 @@
return fmt.Errorf("new couldn't find home in new cluster, aborting: %w", err)
}
- // Here starts some hairy stopgap code. In the future, not all nodes will have
- // direct access to etcd (ie. the ability to retrieve an etcd client via
- // status.ConsensusClient).
- // However, we are not ready to implement this yet, as that would require
- // moving more logic into the curator (eg. some of the Kubernetes PKI logic).
+ // Currently, only the first node of the cluster runs etcd. That means only that
+ // node can run the curator, Kubernetes, and the roleserver.
//
- // For now, we keep Kubernetes PKI initialization logic here, and just assume
- // that every node will have direct access to etcd.
+ // This is a temporary stopgap until we land a roleserver rewrite which fixes
+ // this.
- // Retrieve namespaced etcd KV clients for the two main direct etcd users:
- // - Curator
- // - Kubernetes PKI
- ckv, err := status.ConsensusClient(cluster.ConsensusUserCurator)
- if err != nil {
- close(trapdoor)
- return fmt.Errorf("failed to retrieve consensus curator client: %w", err)
- }
- kkv, err := status.ConsensusClient(cluster.ConsensusUserKubernetesPKI)
- if err != nil {
- close(trapdoor)
- return fmt.Errorf("failed to retrieve consensus kubernetes PKI client: %w", err)
- }
+ var rs *roleserve.Service
+ if status.HasLocalConsensus {
+ // Retrieve namespaced etcd KV clients for the two main direct etcd users:
+ // - Curator
+ // - Kubernetes PKI
+ ckv, err := status.ConsensusClient(cluster.ConsensusUserCurator)
+ if err != nil {
+ close(trapdoor)
+ return fmt.Errorf("failed to retrieve consensus curator client: %w", err)
+ }
+ kkv, err := status.ConsensusClient(cluster.ConsensusUserKubernetesPKI)
+ if err != nil {
+ close(trapdoor)
+ return fmt.Errorf("failed to retrieve consensus kubernetes PKI client: %w", err)
+ }
- // TODO(q3k): restart curator on credentials change?
+ // TODO(q3k): restart curator on credentials change?
- // Start cluster curator. The cluster curator is responsible for lifecycle
- // management of the cluster.
- // In the future, this will only be started on nodes that run etcd.
- c := curator.New(curator.Config{
- Etcd: ckv,
- NodeCredentials: status.Credentials,
- // TODO(q3k): make this configurable?
- LeaderTTL: time.Second * 5,
- Directory: &root.Ephemeral.Curator,
- })
- if err := supervisor.Run(ctx, "curator", c.Run); err != nil {
- close(trapdoor)
- return fmt.Errorf("when starting curator: %w", err)
- }
+ // Start cluster curator. The cluster curator is responsible for lifecycle
+ // management of the cluster.
+ // In the future, this will only be started on nodes that run etcd.
+ c := curator.New(curator.Config{
+ Etcd: ckv,
+ NodeCredentials: status.Credentials,
+ // TODO(q3k): make this configurable?
+ LeaderTTL: time.Second * 5,
+ Directory: &root.Ephemeral.Curator,
+ })
+ if err := supervisor.Run(ctx, "curator", c.Run); err != nil {
+ close(trapdoor)
+ return fmt.Errorf("when starting curator: %w", err)
+ }
- // We are now in a cluster. We can thus access our 'node' object and
- // start all services that we should be running.
- logger.Info("Enrolment success, continuing startup.")
+ // We are now in a cluster. We can thus access our 'node' object and
+ // start all services that we should be running.
+ logger.Info("Enrolment success, continuing startup.")
- // Ensure Kubernetes PKI objects exist in etcd. In the future, this logic will
- // be implemented in the curator.
- kpki := pki.New(lt.MustLeveledFor("pki.kubernetes"), kkv)
- if err := kpki.EnsureAll(ctx); err != nil {
- close(trapdoor)
- return fmt.Errorf("failed to ensure kubernetes PKI present: %w", err)
- }
+ // Ensure Kubernetes PKI objects exist in etcd. In the future, this logic will
+ // be implemented in the curator.
+ kpki := pki.New(lt.MustLeveledFor("pki.kubernetes"), kkv)
+ if err := kpki.EnsureAll(ctx); err != nil {
+ close(trapdoor)
+ return fmt.Errorf("failed to ensure kubernetes PKI present: %w", err)
+ }
- // Start the role service. The role service connects to the curator and runs
- // all node-specific role code (eg. Kubernetes services).
- // supervisor.Logger(ctx).Infof("Starting role service...")
- rs := roleserve.New(roleserve.Config{
- CuratorDial: c.DialCluster,
- StorageRoot: root,
- Network: networkSvc,
- KPKI: kpki,
- NodeID: status.Credentials.ID(),
- })
- if err := supervisor.Run(ctx, "role", rs.Run); err != nil {
- close(trapdoor)
- return fmt.Errorf("failed to start role service: %w", err)
+ // Start the role service. The role service connects to the curator and runs
+ // all node-specific role code (eg. Kubernetes services).
+ // supervisor.Logger(ctx).Infof("Starting role service...")
+ rs = roleserve.New(roleserve.Config{
+ CuratorDial: c.DialCluster,
+ StorageRoot: root,
+ Network: networkSvc,
+ KPKI: kpki,
+ NodeID: status.Credentials.ID(),
+ })
+ if err := supervisor.Run(ctx, "role", rs.Run); err != nil {
+ close(trapdoor)
+ return fmt.Errorf("failed to start role service: %w", err)
+ }
+ } else {
+ logger.Warningf("TODO(q3k): Dummy node - will not run Kubernetes/Curator/Roleserver...")
}
// Start the node debug service.