m/n/c/{cluster,roleserve}: implement Join Flow

This implements the Join Flow for:
- Registered nodes attempting to re-join the cluster.
- Nodes bootstrapping the cluster.

See: Cluster Lifecycle and Integrity design document
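
A condensed sketch of the re-join sequence this change implements (error
handling elided, package aliases and field names as in cluster_join.go;
curatorAddress stands in for the endpoint picked from the Cluster Directory):

    jpriv := ed25519.PrivateKey(sc.JoinKey)
    ca, _ := x509.ParseCertificate(sc.ClusterCa)
    creds, _ := rpc.NewEphemeralCredentials(jpriv, ca)
    conn, _ := grpc.Dial(curatorAddress, grpc.WithTransportCredentials(creds))
    res, _ := ipb.NewCuratorClient(conn).JoinNode(ctx, &ipb.JoinNodeRequest{})
    // res.ClusterUnlockKey, together with the sealed NUK, unlocks the node's
    // existing data partition.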

Change-Id: I74ab98fdec650c4f6aa59e34a16c0f95745dc0e9
Reviewed-on: https://review.monogon.dev/c/monogon/+/556
Reviewed-by: Sergiusz Bazanski <serge@monogon.tech>
diff --git a/metropolis/node/core/cluster/BUILD.bazel b/metropolis/node/core/cluster/BUILD.bazel
index 78078bb..e8c7ff2 100644
--- a/metropolis/node/core/cluster/BUILD.bazel
+++ b/metropolis/node/core/cluster/BUILD.bazel
@@ -5,6 +5,7 @@
     srcs = [
         "cluster.go",
         "cluster_bootstrap.go",
+        "cluster_join.go",
         "cluster_register.go",
         "platform.go",
     ],
@@ -22,9 +23,11 @@
         "//metropolis/pkg/event/memory",
         "//metropolis/pkg/supervisor",
         "//metropolis/proto/api",
+        "//metropolis/proto/common",
         "//metropolis/proto/private",
         "@com_github_cenkalti_backoff_v4//:backoff",
         "@org_golang_google_grpc//:go_default_library",
         "@org_golang_google_protobuf//proto",
+        "@org_golang_x_sys//unix",
     ],
 )
diff --git a/metropolis/node/core/cluster/cluster.go b/metropolis/node/core/cluster/cluster.go
index d51d7d9..1de24f3 100644
--- a/metropolis/node/core/cluster/cluster.go
+++ b/metropolis/node/core/cluster/cluster.go
@@ -31,20 +31,25 @@
 	"errors"
 	"fmt"
 	"io"
+	"net"
 	"net/http"
 	"os"
+	"strings"
 	"sync"
 
 	"github.com/cenkalti/backoff/v4"
 	"google.golang.org/protobuf/proto"
 
+	"source.monogon.dev/metropolis/node"
 	"source.monogon.dev/metropolis/node/core/consensus"
+	"source.monogon.dev/metropolis/node/core/identity"
 	"source.monogon.dev/metropolis/node/core/localstorage"
 	"source.monogon.dev/metropolis/node/core/network"
 	"source.monogon.dev/metropolis/node/core/roleserve"
 	"source.monogon.dev/metropolis/pkg/event/memory"
 	"source.monogon.dev/metropolis/pkg/supervisor"
 	apb "source.monogon.dev/metropolis/proto/api"
+	cpb "source.monogon.dev/metropolis/proto/common"
 	ppb "source.monogon.dev/metropolis/proto/private"
 )
 
@@ -107,7 +112,15 @@
 	configuration, err := m.storageRoot.ESP.Metropolis.SealedConfiguration.Unseal()
 	if err == nil {
 		supervisor.Logger(ctx).Info("Sealed configuration present. attempting to join cluster")
-		return m.join(ctx, configuration)
+
+		// Read Cluster Directory and unmarshal it. Since the node is already
+		// registered with the cluster, the directory won't be bootstrapped from
+		// Node Parameters.
+		cd, err := m.storageRoot.ESP.Metropolis.ClusterDirectory.Unmarshal()
+		if err != nil {
+			return fmt.Errorf("while reading cluster directory: %w", err)
+		}
+		return m.join(ctx, configuration, cd)
 	}
 
 	if !errors.Is(err, localstorage.ErrNoSealed) {
@@ -241,6 +254,30 @@
 	}
 }
 
-func (m *Manager) join(ctx context.Context, cfg *ppb.SealedConfiguration) error {
-	return fmt.Errorf("unimplemented")
+// logClusterDirectory verbosely logs the whole Cluster Directory passed to it.
+func logClusterDirectory(ctx context.Context, cd *cpb.ClusterDirectory) {
+	for _, n := range cd.Nodes {
+		id := identity.NodeID(n.PublicKey)
+		var addresses []string
+		for _, a := range n.Addresses {
+			addresses = append(addresses, a.Host)
+		}
+		supervisor.Logger(ctx).Infof("    Node ID: %s, Addresses: %s", id, strings.Join(addresses, ","))
+	}
+}
+
+// curatorRemote returns a host:port pair pointing at one of the cluster's
+// available Curator endpoints. It returns an empty string and an error if the
+// Cluster Directory is empty or its first node has no addresses.
+// TODO(issues/117): use dynamic cluster client instead
+func curatorRemote(cd *cpb.ClusterDirectory) (string, error) {
+	if len(cd.Nodes) == 0 {
+		return "", fmt.Errorf("the Cluster Directory is empty")
+	}
+	n := cd.Nodes[0]
+	if len(n.Addresses) == 0 {
+		return "", fmt.Errorf("the first node in the Cluster Directory has no addresses")
+	}
+	r := n.Addresses[0].Host
+	return net.JoinHostPort(r, node.CuratorServicePort.PortString()), nil
 }
diff --git a/metropolis/node/core/cluster/cluster_bootstrap.go b/metropolis/node/core/cluster/cluster_bootstrap.go
index d40f179..9de5339 100644
--- a/metropolis/node/core/cluster/cluster_bootstrap.go
+++ b/metropolis/node/core/cluster/cluster_bootstrap.go
@@ -43,6 +43,7 @@
 	if err != nil {
 		return fmt.Errorf("could not make and mount data partition: %w", err)
 	}
+	nuk := state.configuration.NodeUnlockKey
 
 	pub, priv, err := ed25519.GenerateKey(rand.Reader)
 	if err != nil {
@@ -50,7 +51,13 @@
 	}
 	supervisor.Logger(ctx).Infof("Bootstrapping: node public key: %s", hex.EncodeToString([]byte(pub)))
 
-	m.roleServer.ProvideBootstrapData(priv, ownerKey, cuk)
+	jpub, jpriv, err := ed25519.GenerateKey(rand.Reader)
+	if err != nil {
+		return fmt.Errorf("could not generate join keypair: %w", err)
+	}
+	supervisor.Logger(ctx).Infof("Bootstrapping: node public join key: %s", hex.EncodeToString([]byte(jpub)))
+
+	m.roleServer.ProvideBootstrapData(priv, ownerKey, cuk, nuk, jpriv)
 
 	supervisor.Signal(ctx, supervisor.SignalHealthy)
 	supervisor.Signal(ctx, supervisor.SignalDone)
diff --git a/metropolis/node/core/cluster/cluster_join.go b/metropolis/node/core/cluster/cluster_join.go
new file mode 100644
index 0000000..0cb68bb
--- /dev/null
+++ b/metropolis/node/core/cluster/cluster_join.go
@@ -0,0 +1,75 @@
+package cluster
+
+import (
+	"context"
+	"crypto/ed25519"
+	"crypto/x509"
+	"encoding/hex"
+	"fmt"
+
+	"google.golang.org/grpc"
+
+	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+	"source.monogon.dev/metropolis/node/core/identity"
+	"source.monogon.dev/metropolis/node/core/rpc"
+	"source.monogon.dev/metropolis/pkg/supervisor"
+	cpb "source.monogon.dev/metropolis/proto/common"
+	ppb "source.monogon.dev/metropolis/proto/private"
+)
+
+// join implements the Join Flow for an already registered node.
+func (m *Manager) join(ctx context.Context, sc *ppb.SealedConfiguration, cd *cpb.ClusterDirectory) error {
+	// Retrieve the node's ED25519 Join Key, previously stored in Sealed
+	// Configuration.
+	var jpriv ed25519.PrivateKey = sc.JoinKey
+
+	// Get Cluster CA from Sealed Configuration.
+	ca, err := x509.ParseCertificate(sc.ClusterCa)
+	if err != nil {
+		return fmt.Errorf("Cluster CA certificate present in Sealed Configuration could not be parsed: %w", err)
+	}
+
+	// Tell the user what we're doing.
+	hpkey := hex.EncodeToString(jpriv.Public().(ed25519.PublicKey))
+	supervisor.Logger(ctx).Infof("Joining an existing cluster.")
+	supervisor.Logger(ctx).Infof("  Node Join public key: %s", hpkey)
+	supervisor.Logger(ctx).Infof("  Directory:")
+	logClusterDirectory(ctx, cd)
+
+	// Attempt to connect to the first node in the cluster directory.
+	r, err := curatorRemote(cd)
+	if err != nil {
+		return fmt.Errorf("while picking a Curator endpoint: %w", err)
+	}
+	ephCreds, err := rpc.NewEphemeralCredentials(jpriv, ca)
+	if err != nil {
+		return fmt.Errorf("could not create ephemeral credentials: %w", err)
+	}
+	eph, err := grpc.Dial(r, grpc.WithTransportCredentials(ephCreds))
+	if err != nil {
+		return fmt.Errorf("could not create ephemeral client to %q: %w", r, err)
+	}
+	cur := ipb.NewCuratorClient(eph)
+
+	// Join the cluster and use the newly obtained CUK to mount the data
+	// partition.
+	jr, err := cur.JoinNode(ctx, &ipb.JoinNodeRequest{})
+	if err != nil {
+		return fmt.Errorf("join call failed: %w", err)
+	}
+	if err := m.storageRoot.Data.MountExisting(sc, jr.ClusterUnlockKey); err != nil {
+		return fmt.Errorf("while mounting Data: %w", err)
+	}
+
+	// Use the node credentials found in the data partition.
+	var creds identity.NodeCredentials
+	if err := creds.Read(&m.storageRoot.Data.Node.Credentials); err != nil {
+		return fmt.Errorf("while reading node credentials: %w", err)
+	}
+	m.roleServer.ProvideJoinData(creds, cd)
+
+	supervisor.Logger(ctx).Infof("Joined the cluster.")
+	supervisor.Signal(ctx, supervisor.SignalHealthy)
+	supervisor.Signal(ctx, supervisor.SignalDone)
+	return nil
+}
diff --git a/metropolis/node/core/cluster/cluster_register.go b/metropolis/node/core/cluster/cluster_register.go
index 3acb7d7..c348c32 100644
--- a/metropolis/node/core/cluster/cluster_register.go
+++ b/metropolis/node/core/cluster/cluster_register.go
@@ -8,13 +8,12 @@
 	"encoding/hex"
 	"fmt"
 	"net"
-	"strconv"
-	"strings"
 	"time"
 
+	"golang.org/x/sys/unix"
 	"google.golang.org/grpc"
+	"google.golang.org/protobuf/proto"
 
-	"source.monogon.dev/metropolis/node"
 	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
 	"source.monogon.dev/metropolis/node/core/identity"
 	"source.monogon.dev/metropolis/node/core/rpc"
@@ -71,14 +70,7 @@
 	supervisor.Logger(ctx).Infof("  Cluster CA public key: %s", hex.EncodeToString(ca.PublicKey.(ed25519.PublicKey)))
 	supervisor.Logger(ctx).Infof("  Register Ticket: %s", hex.EncodeToString(register.RegisterTicket))
 	supervisor.Logger(ctx).Infof("  Directory:")
-	for _, node := range register.ClusterDirectory.Nodes {
-		id := identity.NodeID(node.PublicKey)
-		var addresses []string
-		for _, add := range node.Addresses {
-			addresses = append(addresses, add.Host)
-		}
-		supervisor.Logger(ctx).Infof("    Node ID: %s, Addresses: %s", id, strings.Join(addresses, ","))
-	}
+	logClusterDirectory(ctx, register.ClusterDirectory)
 
 	// Mount new storage with generated CUK, MountNew will save NUK into sc, to be
 	// saved into the ESP after successful registration.
@@ -97,20 +89,29 @@
 	supervisor.Logger(ctx).Infof("Registering: node public key: %s", hex.EncodeToString([]byte(pub)))
 
 	// Attempt to connect to first node in cluster directory and to call Register.
-	//
-	// MVP: this should be properly client-side loadbalanced.
-	remote := register.ClusterDirectory.Nodes[0].Addresses[0].Host
-	remote = net.JoinHostPort(remote, strconv.Itoa(int(node.CuratorServicePort)))
+	r, err := curatorRemote(register.ClusterDirectory)
+	if err != nil {
+		return fmt.Errorf("while picking a Curator endpoint: %w", err)
+	}
 	ephCreds, err := rpc.NewEphemeralCredentials(priv, ca)
 	if err != nil {
 		return fmt.Errorf("could not create ephemeral credentials: %w", err)
 	}
-	eph, err := grpc.Dial(remote, grpc.WithTransportCredentials(ephCreds))
+	eph, err := grpc.Dial(r, grpc.WithTransportCredentials(ephCreds))
 	if err != nil {
-		return fmt.Errorf("could not create ephemeral client to %q: %w", remote, err)
+		return fmt.Errorf("could not create ephemeral client to %q: %w", r, err)
 	}
 	cur := ipb.NewCuratorClient(eph)
 
+	// Generate Join Credentials. The private key is kept in the in-memory
+	// SealedConfiguration and only sealed to the ESP if RegisterNode succeeds.
+	jpub, jpriv, err := ed25519.GenerateKey(rand.Reader)
+	if err != nil {
+		return fmt.Errorf("could not generate join keypair: %w", err)
+	}
+	sc.JoinKey = jpriv
+	supervisor.Logger(ctx).Infof("Registering: join public key: %s", hex.EncodeToString([]byte(jpub)))
+
 	// Register this node.
 	//
 	// MVP: From this point on forward, we have very little resiliency to failure,
@@ -120,6 +121,7 @@
 	// code should let us do this quite easily.
 	_, err = cur.RegisterNode(ctx, &ipb.RegisterNodeRequest{
 		RegisterTicket: register.RegisterTicket,
+		JoinKey:        jpub,
 	})
 	if err != nil {
 		return fmt.Errorf("register call failed: %w", err)
@@ -149,14 +151,25 @@
 	}
 	m.roleServer.ProvideRegisterData(*creds, register.ClusterDirectory)
 
-	// Save NUK
-	if err = m.storageRoot.ESP.Metropolis.SealedConfiguration.SealSecureBoot(&sc); err != nil {
-		return fmt.Errorf("failed to seal and write configuration: %w", err)
-	}
 	// Save Node Credentials
-	if err = m.storageRoot.Data.Node.Credentials.WriteAll(certBytes, priv, caCertBytes); err != nil {
-		return fmt.Errorf("while writing node credentials: %w", err)
+	if err = creds.Save(&m.storageRoot.Data.Node.Credentials); err != nil {
+		return fmt.Errorf("while saving node credentials: %w", err)
 	}
+	// Save the Cluster Directory into the ESP.
+	cdirRaw, err := proto.Marshal(register.ClusterDirectory)
+	if err != nil {
+		return fmt.Errorf("couldn't marshal ClusterDirectory: %w", err)
+	}
+	if err = m.storageRoot.ESP.Metropolis.ClusterDirectory.Write(cdirRaw, 0644); err != nil {
+		return fmt.Errorf("couldn't write ClusterDirectory: %w", err)
+	}
+	// Include the Cluster CA in Sealed Configuration.
+	sc.ClusterCa = register.CaCertificate
+	// Save Cluster CA, NUK and Join Credentials into Sealed Configuration.
+	if err = m.storageRoot.ESP.Metropolis.SealedConfiguration.SealSecureBoot(&sc); err != nil {
+		return fmt.Errorf("failed to seal and write configuration: %w", err)
+	}
+	unix.Sync()
 
 	supervisor.Signal(ctx, supervisor.SignalHealthy)
 	supervisor.Signal(ctx, supervisor.SignalDone)