m/n/c/{cluster,roleserve}: implement Join Flow
This implements Join Flow for:
- Registered nodes attempting to re-join the cluster.
- Nodes bootstrapping the cluster.
See: Cluster Lifecycle and Integrity design document
Change-Id: I74ab98fdec650c4f6aa59e34a16c0f95745dc0e9
Reviewed-on: https://review.monogon.dev/c/monogon/+/556
Reviewed-by: Sergiusz Bazanski <serge@monogon.tech>
diff --git a/metropolis/node/core/cluster/BUILD.bazel b/metropolis/node/core/cluster/BUILD.bazel
index 78078bb..e8c7ff2 100644
--- a/metropolis/node/core/cluster/BUILD.bazel
+++ b/metropolis/node/core/cluster/BUILD.bazel
@@ -5,6 +5,7 @@
srcs = [
"cluster.go",
"cluster_bootstrap.go",
+ "cluster_join.go",
"cluster_register.go",
"platform.go",
],
@@ -22,9 +23,11 @@
"//metropolis/pkg/event/memory",
"//metropolis/pkg/supervisor",
"//metropolis/proto/api",
+ "//metropolis/proto/common",
"//metropolis/proto/private",
"@com_github_cenkalti_backoff_v4//:backoff",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_protobuf//proto",
+ "@org_golang_x_sys//unix",
],
)
diff --git a/metropolis/node/core/cluster/cluster.go b/metropolis/node/core/cluster/cluster.go
index d51d7d9..1de24f3 100644
--- a/metropolis/node/core/cluster/cluster.go
+++ b/metropolis/node/core/cluster/cluster.go
@@ -31,20 +31,25 @@
"errors"
"fmt"
"io"
+ "net"
"net/http"
"os"
+ "strings"
"sync"
"github.com/cenkalti/backoff/v4"
"google.golang.org/protobuf/proto"
+ "source.monogon.dev/metropolis/node"
"source.monogon.dev/metropolis/node/core/consensus"
+ "source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/node/core/network"
"source.monogon.dev/metropolis/node/core/roleserve"
"source.monogon.dev/metropolis/pkg/event/memory"
"source.monogon.dev/metropolis/pkg/supervisor"
apb "source.monogon.dev/metropolis/proto/api"
+ cpb "source.monogon.dev/metropolis/proto/common"
ppb "source.monogon.dev/metropolis/proto/private"
)
@@ -107,7 +112,15 @@
configuration, err := m.storageRoot.ESP.Metropolis.SealedConfiguration.Unseal()
if err == nil {
supervisor.Logger(ctx).Info("Sealed configuration present. attempting to join cluster")
- return m.join(ctx, configuration)
+
+ // Read Cluster Directory and unmarshal it. Since the node is already
+ // registered with the cluster, the directory won't be bootstrapped from
+ // Node Parameters.
+ cd, err := m.storageRoot.ESP.Metropolis.ClusterDirectory.Unmarshal()
+ if err != nil {
+ return fmt.Errorf("while reading cluster directory: %w", err)
+ }
+ return m.join(ctx, configuration, cd)
}
if !errors.Is(err, localstorage.ErrNoSealed) {
@@ -241,6 +254,30 @@
}
}
-func (m *Manager) join(ctx context.Context, cfg *ppb.SealedConfiguration) error {
- return fmt.Errorf("unimplemented")
+// logClusterDirectory verbosely logs the whole Cluster Directory passed to it.
+func logClusterDirectory(ctx context.Context, cd *cpb.ClusterDirectory) {
+ for _, node := range cd.Nodes {
+ id := identity.NodeID(node.PublicKey)
+ var addresses []string
+ for _, add := range node.Addresses {
+ addresses = append(addresses, add.Host)
+ }
+ supervisor.Logger(ctx).Infof(" Node ID: %s, Addresses: %s", id, strings.Join(addresses, ","))
+ }
+}
+
+// curatorRemote returns a host:port pair pointing at the Curator service of
+// the first node in the Cluster Directory that has an address with a non-empty
+// host. Nodes without a usable address are skipped rather than treated as
+// fatal. It returns an empty string and an error if the directory is nil or
+// empty, or if no node carries a usable address.
+// TODO(issues/117): use dynamic cluster client instead
+func curatorRemote(cd *cpb.ClusterDirectory) (string, error) {
+	// Getters are nil-safe, so a nil directory is reported as empty instead of
+	// panicking.
+	nodes := cd.GetNodes()
+	if len(nodes) == 0 {
+		return "", errors.New("the Cluster Directory is empty")
+	}
+	for _, n := range nodes {
+		for _, a := range n.GetAddresses() {
+			// An empty host would make JoinHostPort yield ":port", which dials
+			// the local machine instead of a cluster member.
+			if h := a.GetHost(); h != "" {
+				return net.JoinHostPort(h, node.CuratorServicePort.PortString()), nil
+			}
+		}
+	}
+	return "", errors.New("no node in the Cluster Directory has an associated address")
}
diff --git a/metropolis/node/core/cluster/cluster_bootstrap.go b/metropolis/node/core/cluster/cluster_bootstrap.go
index d40f179..9de5339 100644
--- a/metropolis/node/core/cluster/cluster_bootstrap.go
+++ b/metropolis/node/core/cluster/cluster_bootstrap.go
@@ -43,6 +43,7 @@
if err != nil {
return fmt.Errorf("could not make and mount data partition: %w", err)
}
+ nuk := state.configuration.NodeUnlockKey
pub, priv, err := ed25519.GenerateKey(rand.Reader)
if err != nil {
@@ -50,7 +51,13 @@
}
supervisor.Logger(ctx).Infof("Bootstrapping: node public key: %s", hex.EncodeToString([]byte(pub)))
- m.roleServer.ProvideBootstrapData(priv, ownerKey, cuk)
+ jpub, jpriv, err := ed25519.GenerateKey(rand.Reader)
+ if err != nil {
+ return fmt.Errorf("could not generate join keypair: %w", err)
+ }
+ supervisor.Logger(ctx).Infof("Bootstrapping: node public join key: %s", hex.EncodeToString([]byte(jpub)))
+
+ m.roleServer.ProvideBootstrapData(priv, ownerKey, cuk, nuk, jpriv)
supervisor.Signal(ctx, supervisor.SignalHealthy)
supervisor.Signal(ctx, supervisor.SignalDone)
diff --git a/metropolis/node/core/cluster/cluster_join.go b/metropolis/node/core/cluster/cluster_join.go
new file mode 100644
index 0000000..0cb68bb
--- /dev/null
+++ b/metropolis/node/core/cluster/cluster_join.go
@@ -0,0 +1,75 @@
+package cluster
+
+import (
+ "context"
+ "crypto/ed25519"
+ "crypto/x509"
+ "encoding/hex"
+ "fmt"
+
+ "google.golang.org/grpc"
+
+ ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+ "source.monogon.dev/metropolis/node/core/identity"
+ "source.monogon.dev/metropolis/node/core/rpc"
+ "source.monogon.dev/metropolis/pkg/supervisor"
+ cpb "source.monogon.dev/metropolis/proto/common"
+ ppb "source.monogon.dev/metropolis/proto/private"
+)
+
+// join implements the Join Flow of an already registered node. The steps are:
+//   - connect to a Curator picked from the Cluster Directory, using ephemeral
+//     credentials built from the Join Key and Cluster CA in Sealed
+//     Configuration;
+//   - call JoinNode to obtain the Cluster Unlock Key;
+//   - mount the existing data partition with that key;
+//   - hand the node credentials stored there, together with the directory, to
+//     the role server.
+// It returns an error if any of these steps fails.
+func (m *Manager) join(ctx context.Context, sc *ppb.SealedConfiguration, cd *cpb.ClusterDirectory) error {
+	// Recover the ED25519 Join Key from Sealed Configuration. Registration
+	// stores the full private key; a bare seed is accepted as well. Any other
+	// length (e.g. a missing or truncated key) is rejected here, since
+	// jpriv.Public() would otherwise panic on it.
+	var jpriv ed25519.PrivateKey
+	switch len(sc.JoinKey) {
+	case ed25519.PrivateKeySize:
+		jpriv = ed25519.PrivateKey(sc.JoinKey)
+	case ed25519.SeedSize:
+		jpriv = ed25519.NewKeyFromSeed(sc.JoinKey)
+	default:
+		return fmt.Errorf("join key in Sealed Configuration has invalid length %d", len(sc.JoinKey))
+	}
+
+	// Get Cluster CA from Sealed Configuration.
+	ca, err := x509.ParseCertificate(sc.ClusterCa)
+	if err != nil {
+		return fmt.Errorf("could not parse Cluster CA certificate from Sealed Configuration: %w", err)
+	}
+
+	// Tell the user what we're doing.
+	hpkey := hex.EncodeToString(jpriv.Public().(ed25519.PublicKey))
+	supervisor.Logger(ctx).Infof("Joining an existing cluster.")
+	supervisor.Logger(ctx).Infof(" Node Join public key: %s", hpkey)
+	supervisor.Logger(ctx).Infof(" Directory:")
+	logClusterDirectory(ctx, cd)
+
+	// Attempt to connect to a Curator endpoint taken from the cluster directory.
+	r, err := curatorRemote(cd)
+	if err != nil {
+		return fmt.Errorf("while picking a Curator endpoint: %w", err)
+	}
+	ephCreds, err := rpc.NewEphemeralCredentials(jpriv, ca)
+	if err != nil {
+		return fmt.Errorf("could not create ephemeral credentials: %w", err)
+	}
+	eph, err := grpc.Dial(r, grpc.WithTransportCredentials(ephCreds))
+	if err != nil {
+		return fmt.Errorf("could not create ephemeral client to %q: %w", r, err)
+	}
+	// The ephemeral connection is only needed for the JoinNode call below.
+	// Release it when join returns, on success and on every error path. A
+	// Close error is not actionable here.
+	defer eph.Close()
+	cur := ipb.NewCuratorClient(eph)
+
+	// Join the cluster and use the newly obtained CUK to mount the data
+	// partition.
+	jr, err := cur.JoinNode(ctx, &ipb.JoinNodeRequest{})
+	if err != nil {
+		return fmt.Errorf("join call failed: %w", err)
+	}
+	if err := m.storageRoot.Data.MountExisting(sc, jr.ClusterUnlockKey); err != nil {
+		return fmt.Errorf("while mounting Data: %w", err)
+	}
+
+	// Use the node credentials found in the data partition.
+	var creds identity.NodeCredentials
+	if err := creds.Read(&m.storageRoot.Data.Node.Credentials); err != nil {
+		return fmt.Errorf("while reading node credentials: %w", err)
+	}
+	m.roleServer.ProvideJoinData(creds, cd)
+
+	supervisor.Logger(ctx).Infof("Joined the cluster.")
+	supervisor.Signal(ctx, supervisor.SignalHealthy)
+	supervisor.Signal(ctx, supervisor.SignalDone)
+	return nil
+}
diff --git a/metropolis/node/core/cluster/cluster_register.go b/metropolis/node/core/cluster/cluster_register.go
index 3acb7d7..c348c32 100644
--- a/metropolis/node/core/cluster/cluster_register.go
+++ b/metropolis/node/core/cluster/cluster_register.go
@@ -8,13 +8,12 @@
"encoding/hex"
"fmt"
"net"
- "strconv"
- "strings"
"time"
+ "golang.org/x/sys/unix"
"google.golang.org/grpc"
+ "google.golang.org/protobuf/proto"
- "source.monogon.dev/metropolis/node"
ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/rpc"
@@ -71,14 +70,7 @@
supervisor.Logger(ctx).Infof(" Cluster CA public key: %s", hex.EncodeToString(ca.PublicKey.(ed25519.PublicKey)))
supervisor.Logger(ctx).Infof(" Register Ticket: %s", hex.EncodeToString(register.RegisterTicket))
supervisor.Logger(ctx).Infof(" Directory:")
- for _, node := range register.ClusterDirectory.Nodes {
- id := identity.NodeID(node.PublicKey)
- var addresses []string
- for _, add := range node.Addresses {
- addresses = append(addresses, add.Host)
- }
- supervisor.Logger(ctx).Infof(" Node ID: %s, Addresses: %s", id, strings.Join(addresses, ","))
- }
+ logClusterDirectory(ctx, register.ClusterDirectory)
// Mount new storage with generated CUK, MountNew will save NUK into sc, to be
// saved into the ESP after successful registration.
@@ -97,20 +89,29 @@
supervisor.Logger(ctx).Infof("Registering: node public key: %s", hex.EncodeToString([]byte(pub)))
// Attempt to connect to first node in cluster directory and to call Register.
- //
- // MVP: this should be properly client-side loadbalanced.
- remote := register.ClusterDirectory.Nodes[0].Addresses[0].Host
- remote = net.JoinHostPort(remote, strconv.Itoa(int(node.CuratorServicePort)))
+ r, err := curatorRemote(register.ClusterDirectory)
+ if err != nil {
+ return fmt.Errorf("while picking a Curator endpoint: %w", err)
+ }
ephCreds, err := rpc.NewEphemeralCredentials(priv, ca)
if err != nil {
return fmt.Errorf("could not create ephemeral credentials: %w", err)
}
- eph, err := grpc.Dial(remote, grpc.WithTransportCredentials(ephCreds))
+ eph, err := grpc.Dial(r, grpc.WithTransportCredentials(ephCreds))
if err != nil {
- return fmt.Errorf("could not create ephemeral client to %q: %w", remote, err)
+ return fmt.Errorf("could not create ephemeral client to %q: %w", r, err)
}
cur := ipb.NewCuratorClient(eph)
+ // Generate Join Credentials. The private key will be stored in
+ // SealedConfiguration only if RegisterNode succeeds.
+ jpub, jpriv, err := ed25519.GenerateKey(rand.Reader)
+ if err != nil {
+ return fmt.Errorf("could not generate join keypair: %w", err)
+ }
+ sc.JoinKey = jpriv
+ supervisor.Logger(ctx).Infof("Registering: join public key: %s", hex.EncodeToString([]byte(jpub)))
+
// Register this node.
//
// MVP: From this point on forward, we have very little resiliency to failure,
@@ -120,6 +121,7 @@
// code should let us do this quite easily.
_, err = cur.RegisterNode(ctx, &ipb.RegisterNodeRequest{
RegisterTicket: register.RegisterTicket,
+ JoinKey: jpub,
})
if err != nil {
return fmt.Errorf("register call failed: %w", err)
@@ -149,14 +151,25 @@
}
m.roleServer.ProvideRegisterData(*creds, register.ClusterDirectory)
- // Save NUK
- if err = m.storageRoot.ESP.Metropolis.SealedConfiguration.SealSecureBoot(&sc); err != nil {
- return fmt.Errorf("failed to seal and write configuration: %w", err)
- }
// Save Node Credentials
- if err = m.storageRoot.Data.Node.Credentials.WriteAll(certBytes, priv, caCertBytes); err != nil {
- return fmt.Errorf("while writing node credentials: %w", err)
+ if err = creds.Save(&m.storageRoot.Data.Node.Credentials); err != nil {
+ return fmt.Errorf("while saving node credentials: %w", err)
}
+ // Save the Cluster Directory into the ESP.
+ cdirRaw, err := proto.Marshal(register.ClusterDirectory)
+ if err != nil {
+ return fmt.Errorf("couldn't marshal ClusterDirectory: %w", err)
+ }
+ if err = m.storageRoot.ESP.Metropolis.ClusterDirectory.Write(cdirRaw, 0644); err != nil {
+ return err
+ }
+ // Include the Cluster CA in Sealed Configuration.
+ sc.ClusterCa = register.CaCertificate
+ // Save Cluster CA, NUK and Join Credentials into Sealed Configuration.
+ if err = m.storageRoot.ESP.Metropolis.SealedConfiguration.SealSecureBoot(&sc); err != nil {
+ return err
+ }
+ unix.Sync()
supervisor.Signal(ctx, supervisor.SignalHealthy)
supervisor.Signal(ctx, supervisor.SignalDone)