m/n/c/curator: implement Join Flow

This implements Join Flow in Curator, as described in the Cluster
Lifecycle and Integrity design document.

Change-Id: Idabb471575e1d22a7eb7cce2ad29d18f1f94760a
Reviewed-on: https://review.monogon.dev/c/monogon/+/667
Reviewed-by: Sergiusz Bazanski <serge@monogon.tech>
diff --git a/metropolis/node/core/curator/bootstrap.go b/metropolis/node/core/curator/bootstrap.go
index 12b24fd..215bfb7 100644
--- a/metropolis/node/core/curator/bootstrap.go
+++ b/metropolis/node/core/curator/bootstrap.go
@@ -91,7 +91,7 @@
 		InitialCRL:      crl,
 	})
 
-	nodePath, err := node.etcdPath()
+	nodePath, err := node.etcdNodePath()
 	if err != nil {
 		return nil, nil, fmt.Errorf("failed to get node key: %w", err)
 	}
diff --git a/metropolis/node/core/curator/impl_leader_curator.go b/metropolis/node/core/curator/impl_leader_curator.go
index 26f3004..1c7221f 100644
--- a/metropolis/node/core/curator/impl_leader_curator.go
+++ b/metropolis/node/core/curator/impl_leader_curator.go
@@ -5,6 +5,7 @@
 	"crypto/subtle"
 	"fmt"
 	"net"
+	"time"
 
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
@@ -245,6 +246,7 @@
 	}
 	// ... update its' status ...
 	node.status = req.Status
+	node.status.Timestamp = time.Now().UnixNano()
 	// ... and save it to etcd.
 	if err := nodeSave(ctx, l.leadership, node); err != nil {
 		return nil, err
@@ -263,6 +265,9 @@
 	}
 	pubkey := pi.Unauthenticated.SelfSignedPublicKey
 
+	// TODO(mateusz@monogon.tech): check req.JoinKey length once Join Flow is
+	// implemented on the client side.
+
 	// Verify that call contains a RegisterTicket and that this RegisterTicket is
 	// valid.
 	wantTicket, err := l.ensureRegisterTicket(ctx)
@@ -307,6 +312,7 @@
 	// No node exists, create one.
 	node = &Node{
 		pubkey: pubkey,
+		jkey:   req.JoinKey,
 		state:  cpb.NodeState_NODE_STATE_NEW,
 	}
 	if err := nodeSave(ctx, l.leadership, node); err != nil {
@@ -414,3 +420,43 @@
 		NodeCertificate: nodeCertBytes,
 	}, nil
 }
+
+// JoinNode implements the server side of Join Flow. The caller must connect
+// with a self-signed certificate for the Join Key it submitted in RegisterNode.
+// If that key maps to a node in the UP state, the node's Cluster Unlock Key is
+// returned.
+func (l *leaderCurator) JoinNode(ctx context.Context, req *ipb.JoinNodeRequest) (*ipb.JoinNodeResponse, error) {
+	// Gather peer information.
+	pi := rpc.GetPeerInfo(ctx)
+	if pi == nil || pi.Unauthenticated == nil {
+		return nil, status.Error(codes.PermissionDenied, "connection must be established with a self-signed ephemeral certificate")
+	}
+	// The node will attempt to connect using its Join Key. jkey will contain
+	// its public part.
+	jkey := pi.Unauthenticated.SelfSignedPublicKey
+
+	// Take the lock to prevent data races during the next step.
+	l.muNodes.Lock()
+	defer l.muNodes.Unlock()
+
+	// Resolve the Node ID using Join Key, then use the ID to load node
+	// information from etcd.
+	id, err := nodeIdByJoinKey(ctx, l.leadership, jkey)
+	if err != nil {
+		return nil, err
+	}
+	node, err := nodeLoad(ctx, l.leadership, id)
+	if err != nil {
+		return nil, err
+	}
+
+	// Don't progress further unless the node is already UP.
+	if node.state != cpb.NodeState_NODE_STATE_UP {
+		return nil, status.Error(codes.FailedPrecondition, "node isn't UP, cannot join")
+	}
+
+	// Return the Node's CUK, completing the Join Flow.
+	return &ipb.JoinNodeResponse{
+		ClusterUnlockKey: node.clusterUnlockKey,
+	}, nil
+}
diff --git a/metropolis/node/core/curator/impl_leader_test.go b/metropolis/node/core/curator/impl_leader_test.go
index 7254fab..13564c4 100644
--- a/metropolis/node/core/curator/impl_leader_test.go
+++ b/metropolis/node/core/curator/impl_leader_test.go
@@ -119,12 +119,13 @@
 
 	// Build a curator leader object. This implements methods that will be
 	// exercised by tests.
-	leader := newCuratorLeader(&leadership{
+	leadership := &leadership{
 		lockKey:   lockKey,
 		lockRev:   lockRev,
 		etcd:      curEtcd,
 		consensus: consensus.TestServiceHandle(t, cluster.Client(0)),
-	}, &nodeCredentials.Node)
+	}
+	leader := newCuratorLeader(leadership, &nodeCredentials.Node)
 
 	// Create a curator gRPC server which performs authentication as per the created
 	// ServerSecurity and is backed by the created leader.
@@ -179,6 +180,8 @@
 	}()
 
 	return fakeLeaderData{
+		l:             leadership,
+		curatorLis:    externalLis,
 		mgmtConn:      mcl,
 		localNodeConn: lcl,
 		localNodeID:   nodeCredentials.ID(),
@@ -194,6 +197,11 @@
 // fakeLeaderData is returned by fakeLeader and contains information about the
 // newly created leader and connections to its gRPC listeners.
 type fakeLeaderData struct {
+	// l is a type internal to Curator, representing its ability to perform
+	// actions as a leader.
+	l *leadership
+	// curatorLis is a listener intended for Node connections.
+	curatorLis *bufconn.Listener
 	// mgmtConn is a gRPC connection to the leader's public gRPC interface,
 	// authenticated as a cluster manager.
 	mgmtConn grpc.ClientConnInterface
@@ -514,10 +522,17 @@
 		t.Errorf("Unexpected ticket change between calls")
 	}
 
+	// Generate the node's public join key to be used in the bootstrap process.
+	nodeJoinPub, _, err := ed25519.GenerateKey(rand.Reader)
+	if err != nil {
+		t.Fatalf("could not generate node join keypair: %v", err)
+	}
+
 	// Register 'other node' into cluster.
 	cur := ipb.NewCuratorClient(cl.otherNodeConn)
 	_, err = cur.RegisterNode(ctx, &ipb.RegisterNodeRequest{
 		RegisterTicket: res1.Ticket,
+		JoinKey:        nodeJoinPub,
 	})
 	if err != nil {
 		t.Fatalf("RegisterNode failed: %v", err)
@@ -579,6 +594,61 @@
 	expectOtherNode(cpb.NodeState_NODE_STATE_UP)
 }
 
+// TestJoin exercises Join Flow, as described in the "Cluster Lifecycle"
+// design document, assuming the node has already completed Register Flow.
+func TestJoin(t *testing.T) {
+	cl := fakeLeader(t)
+	defer cl.cancel()
+
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+
+	// Build the test node and save it into etcd.
+	npub, _, err := ed25519.GenerateKey(rand.Reader)
+	if err != nil {
+		t.Fatalf("could not generate node keypair: %v", err)
+	}
+	jpub, jpriv, err := ed25519.GenerateKey(rand.Reader)
+	if err != nil {
+		t.Fatalf("could not generate join keypair: %v", err)
+	}
+	cuk := []byte("fakefakefakefakefakefakefakefake")
+	node := Node{
+		clusterUnlockKey: cuk,
+		pubkey:           npub,
+		jkey:             jpub,
+		state:            cpb.NodeState_NODE_STATE_UP,
+	}
+	if err := nodeSave(ctx, cl.l, &node); err != nil {
+		t.Fatalf("nodeSave failed: %v", err)
+	}
+
+	// Connect to Curator using the node's Join Credentials, as opposed to the
+	// node keypair, then join the cluster.
+	withLocalDialer := grpc.WithContextDialer(func(_ context.Context, _ string) (net.Conn, error) {
+		return cl.curatorLis.Dial()
+	})
+	ephCreds, err := rpc.NewEphemeralCredentials(jpriv, cl.ca)
+	if err != nil {
+		t.Fatalf("NewEphemeralCredentials: %v", err)
+	}
+	eph, err := grpc.Dial("local", withLocalDialer, grpc.WithTransportCredentials(ephCreds))
+	if err != nil {
+		t.Fatalf("Dialing external GRPC failed: %v", err)
+	}
+	defer eph.Close()
+	cur := ipb.NewCuratorClient(eph)
+	jr, err := cur.JoinNode(ctx, &ipb.JoinNodeRequest{})
+	if err != nil {
+		t.Fatalf("JoinNode failed: %v", err)
+	}
+
+	// Compare the received CUK with the one we started out with.
+	if !bytes.Equal(cuk, jr.ClusterUnlockKey) {
+		t.Fatalf("JoinNode returned CUK %x, want %x", jr.ClusterUnlockKey, cuk)
+	}
+}
+
 // TestClusterUpdateNodeStatus exercises the Curator.UpdateNodeStatus RPC by
 // sending node updates and making sure they are reflected in subsequent Watch
 // events.
diff --git a/metropolis/node/core/curator/listener.go b/metropolis/node/core/curator/listener.go
index 705512d..9715c91 100644
--- a/metropolis/node/core/curator/listener.go
+++ b/metropolis/node/core/curator/listener.go
@@ -387,6 +387,15 @@
 	return
 }
 
+func (l *listener) JoinNode(ctx context.Context, req *cpb.JoinNodeRequest) (res *cpb.JoinNodeResponse, err error) {
+	err = l.callImpl(ctx, func(ctx context.Context, impl rpc.ClusterServices) error {
+		resp, implErr := impl.JoinNode(ctx, req)
+		res = resp
+		return implErr
+	})
+	return res, err
+}
+
 type managementGetNodesServer struct {
 	grpc.ServerStream
 	ctx context.Context
diff --git a/metropolis/node/core/curator/proto/api/api.proto b/metropolis/node/core/curator/proto/api/api.proto
index 9a03a15..425051f 100644
--- a/metropolis/node/core/curator/proto/api/api.proto
+++ b/metropolis/node/core/curator/proto/api/api.proto
@@ -99,6 +99,22 @@
             allow_unauthenticated: true
         };
     }
+
+    // JoinNode is called by nodes (re)joining the cluster. Register Flow must
+    // be completed beforehand (see: CommitNode), and this call fails unless
+    // the calling node is in the UP state. The call is idempotent and can
+    // safely be retried.
+    //
+    // JoinNode is authenticated at the transport layer: the caller presents a
+    // self-signed certificate for the Join Key it sent in RegisterNode.
+    //
+    // On success, the node receives its Cluster Unlock Key, which it combines
+    // with its Node Unlock Key to mount its encrypted storage.
+    rpc JoinNode(JoinNodeRequest) returns (JoinNodeResponse) {
+        option (metropolis.proto.ext.authorization) = {
+            allow_unauthenticated: true
+        };
+    }
 }
 
 // Node is the state and configuration of a node in the cluster.
@@ -190,6 +206,9 @@
     // node by a cluster operator in NodeParameters, and it retrieved by an
     // operator from a running cluster via Management.GetRegisterTicket.
     bytes register_ticket = 1;
+    // join_key is an ED25519 public key generated during registration. It's
+    // shared with Curator to authenticate the join procedure later on.
+    bytes join_key = 2;
 }
 
 message RegisterNodeResponse {
@@ -212,4 +231,14 @@
     // node was connecting with. This certificate should be used by the node for
     // communication with the cluster from this point onward.
     bytes node_certificate = 2;
-}
\ No newline at end of file
+}
+
+message JoinNodeRequest {
+}
+
+message JoinNodeResponse {
+    // cluster_unlock_key (CUK) is the key the node submitted earlier through
+    // CommitNodeRequest. It is returned only after the caller authenticates
+    // with the node's Join Key and the node is in the UP state.
+    bytes cluster_unlock_key = 1;
+}
diff --git a/metropolis/node/core/curator/proto/private/storage.proto b/metropolis/node/core/curator/proto/private/storage.proto
index c8b6b79..c4a876c 100644
--- a/metropolis/node/core/curator/proto/private/storage.proto
+++ b/metropolis/node/core/curator/proto/private/storage.proto
@@ -26,6 +26,10 @@
     metropolis.proto.common.NodeRoles roles = 4;
 
     metropolis.proto.common.NodeStatus status = 5;
+
+    // join_key is an ED25519 public key used to authenticate the join
+    // operation. It's generated by the node during the registration process.
+    bytes join_key = 6;
 }
 
 // Information about the cluster owner, currently the only Metropolis management
diff --git a/metropolis/node/core/curator/state_node.go b/metropolis/node/core/curator/state_node.go
index ce87723..82f9859 100644
--- a/metropolis/node/core/curator/state_node.go
+++ b/metropolis/node/core/curator/state_node.go
@@ -19,6 +19,7 @@
 import (
 	"context"
 	"crypto/x509"
+	"encoding/hex"
 	"fmt"
 
 	clientv3 "go.etcd.io/etcd/client/v3"
@@ -62,6 +63,11 @@
 	// The public key is used to generate the Node's canonical ID.
 	pubkey []byte
 
+	// jkey is the node's ED25519 public Join Key. The private part of the key
+	// never leaves the node. The key is generated by the node and passed to
+	// Curator during the registration process.
+	jkey []byte
+
 	// state is the state of this node as seen from the point of view of the
 	// cluster. See //metropolis/proto:common.proto for more information.
 	state cpb.NodeState
@@ -167,21 +173,31 @@
 }
 
 var (
+	// nodeEtcdPrefix prefixes node IDs; values are serialized ppb.Node messages.
 	nodeEtcdPrefix = mustNewEtcdPrefix("/nodes/")
+	// joinCredPrefix prefixes hex-encoded node Join Keys; values are node IDs.
+	joinCredPrefix = mustNewEtcdPrefix("/join_keys/")
 )
 
-// etcdPath builds the etcd path in which this node's protobuf-serialized state
-// is stored in etcd.
-func (n *Node) etcdPath() (string, error) {
+// etcdNodePath builds the etcd path in which this node's protobuf-serialized
+// state is stored in etcd.
+func (n *Node) etcdNodePath() (string, error) {
 	return nodeEtcdPrefix.Key(n.ID())
 }
 
+// etcdJoinKeyPath builds the etcd path of this node's entry in the Join Key
+// index, which maps the hex-encoded Join Key to the node's ID.
+func (n *Node) etcdJoinKeyPath() (string, error) {
+	return joinCredPrefix.Key(hex.EncodeToString(n.jkey))
+}
+
 // proto serializes the Node object into protobuf, to be used for saving to
 // etcd.
 func (n *Node) proto() *ppb.Node {
 	msg := &ppb.Node{
 		ClusterUnlockKey: n.clusterUnlockKey,
 		PublicKey:        n.pubkey,
+		JoinKey:          n.jkey,
 		FsmState:         n.state,
 		Roles:            &cpb.NodeRoles{},
 		Status:           n.status,
@@ -215,6 +231,7 @@
 	n := &Node{
 		clusterUnlockKey: msg.ClusterUnlockKey,
 		pubkey:           msg.PublicKey,
+		jkey:             msg.JoinKey,
 		state:            msg.FsmState,
 		status:           msg.Status,
 	}
@@ -266,7 +283,7 @@
 	rpc.Trace(ctx).Printf("loadNode(%s)...", id)
 	key, err := nodeEtcdPrefix.Key(id)
 	if err != nil {
-		// TODO(issues/85): log err
+		rpc.Trace(ctx).Printf("invalid node id: %v", err)
 		return nil, status.Errorf(codes.InvalidArgument, "invalid node id")
 	}
 	res, err := l.txnAsLeader(ctx, clientv3.OpGet(key))
@@ -274,7 +291,7 @@
 		if rpcErr, ok := rpcError(err); ok {
 			return nil, rpcErr
 		}
-		// TODO(issues/85): log this
+		rpc.Trace(ctx).Printf("could not retrieve node %s: %v", id, err)
 		return nil, status.Errorf(codes.Unavailable, "could not retrieve node %s: %v", id, err)
 	}
 	kvs := res.Responses[0].GetResponseRange().Kvs
@@ -284,7 +301,7 @@
 	}
 	node, err := nodeUnmarshal(kvs[0].Value)
 	if err != nil {
-		// TODO(issues/85): log this
+		rpc.Trace(ctx).Printf("could not unmarshal node: %v", err)
 		return nil, status.Errorf(codes.Unavailable, "could not unmarshal node")
 	}
 	rpc.Trace(ctx).Printf("loadNode(%s): unmarshal ok", id)
@@ -294,26 +311,80 @@
 // nodeSave attempts to save a node into etcd, within a given active leadership.
 // All returned errors are gRPC statuses that safe to return to untrusted callers.
 func nodeSave(ctx context.Context, l *leadership, n *Node) error {
+	// Build an etcd operation to save the node with a key based on its ID.
 	id := n.ID()
 	rpc.Trace(ctx).Printf("nodeSave(%s)...", id)
-	key, err := nodeEtcdPrefix.Key(id)
+	nkey, err := nodeEtcdPrefix.Key(id)
 	if err != nil {
-		// TODO(issues/85): log err
+		rpc.Trace(ctx).Printf("invalid node id: %v", err)
 		return status.Errorf(codes.InvalidArgument, "invalid node id")
 	}
 	nodeBytes, err := proto.Marshal(n.proto())
 	if err != nil {
-		// TODO(issues/85): log this
+		rpc.Trace(ctx).Printf("could not marshal updated node: %v", err)
 		return status.Errorf(codes.Unavailable, "could not marshal updated node")
 	}
-	_, err = l.txnAsLeader(ctx, clientv3.OpPut(key, string(nodeBytes)))
+	ons := clientv3.OpPut(nkey, string(nodeBytes))
+	ops := []clientv3.Op{ons}
+
+	// Build an etcd operation to map the node's Join Key into its ID for use in
+	// Join Flow, if jkey is non-empty. Once Join Flow is implemented on the
+	// client side, this operation will become mandatory.
+	if len(n.jkey) != 0 {
+		jpath, err := n.etcdJoinKeyPath()
+		if err != nil {
+			// This should never happen.
+			rpc.Trace(ctx).Printf("invalid join key representation: %v", err)
+			return status.Errorf(codes.InvalidArgument, "invalid join key representation")
+		}
+		// TODO(mateusz@monogon.tech): ensure that if the join key index already
+		// exists, it points to the node we're saving. Refuse to save/update the
+		// node if it doesn't.
+		oks := clientv3.OpPut(jpath, id)
+		ops = append(ops, oks)
+	}
+
+	// Execute one or both operations atomically.
+	_, err = l.txnAsLeader(ctx, ops...)
 	if err != nil {
 		if rpcErr, ok := rpcError(err); ok {
 			return rpcErr
 		}
-		// TODO(issues/85): log this
+		rpc.Trace(ctx).Printf("could not save updated node: %v", err)
 		return status.Error(codes.Unavailable, "could not save updated node")
 	}
 	rpc.Trace(ctx).Printf("nodeSave(%s): write ok", id)
 	return nil
 }
+
+// nodeIdByJoinKey looks up the ID of the node owning the given Join Key in the
+// etcd Join Key index, within a given active leadership. All returned errors
+// are gRPC statuses that are safe to return to untrusted callers. If the given
+// Join Key is not found, errNodeNotFound will be returned along with an empty
+// string.
+func nodeIdByJoinKey(ctx context.Context, l *leadership, jkey []byte) (string, error) {
+	if len(jkey) == 0 {
+		return "", status.Errorf(codes.InvalidArgument, "join key is empty")
+	}
+
+	cred := hex.EncodeToString(jkey)
+	key, err := joinCredPrefix.Key(cred)
+	if err != nil {
+		// This should never happen.
+		rpc.Trace(ctx).Printf("invalid join key representation: %v", err)
+		return "", status.Errorf(codes.InvalidArgument, "invalid join key representation")
+	}
+	res, err := l.txnAsLeader(ctx, clientv3.OpGet(key))
+	if err != nil {
+		if rpcErr, ok := rpcError(err); ok {
+			return "", rpcErr
+		}
+		rpc.Trace(ctx).Printf("could not retrieve node id matching join key %s: %v", cred, err)
+		return "", status.Errorf(codes.Unavailable, "could not retrieve node id matching join key %s: %v", cred, err)
+	}
+	kvs := res.Responses[0].GetResponseRange().Kvs
+	if len(kvs) != 1 {
+		return "", errNodeNotFound
+	}
+	return string(kvs[0].Value), nil
+}
diff --git a/metropolis/proto/common/common.proto b/metropolis/proto/common/common.proto
index 07ff460..46bd08f 100644
--- a/metropolis/proto/common/common.proto
+++ b/metropolis/proto/common/common.proto
@@ -132,6 +132,9 @@
     // listening for gRPC, and role-specific services like etcd and
     // Kubernetes).
     string external_address = 1;
+    // timestamp is the time at which the last status update was recorded, in
+    // nanoseconds since the Unix epoch.
+    int64 timestamp = 2;
 }
 
 // The Cluster Directory is information about the network addressing of nodes