m/n/c/curator: implement Curator.CommitNode

This takes a node from STANDBY to UP. This is the last step required in
a node's registration flow.
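
For reference, a rough sketch of how a registering node drives this flow
end to end (illustrative only: commitIntoCluster is a hypothetical
helper, RegisterNodeRequest contents are elided, and ipb.CuratorClient
is assumed to be the generated client interface for this service, ipb
being the alias the curator code uses for these bindings):

    // Assumes cur was dialed using the node's ephemeral self-signed
    // certificate and cuk is the freshly generated 32-byte Cluster
    // Unlock Key.
    func commitIntoCluster(ctx context.Context, cur ipb.CuratorClient, cuk []byte) (ca, cert []byte, err error) {
        // Register with the cluster; this leaves the node in the NEW state.
        if _, err := cur.RegisterNode(ctx, &ipb.RegisterNodeRequest{ /* fields elided */ }); err != nil {
            return nil, nil, err
        }
        // Out of band, a manager approves the node (Management.ApproveNode),
        // moving it from NEW to STANDBY.

        // Commit: submit the CUK and receive the cluster CA certificate and
        // the node's own certificate. CommitNode is not idempotent, so a
        // failure here needs administrative intervention, not a blind retry.
        res, err := cur.CommitNode(ctx, &ipb.CommitNodeRequest{ClusterUnlockKey: cuk})
        if err != nil {
            return nil, nil, err
        }
        return res.CaCertificate, res.NodeCertificate, nil
    }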

Change-Id: I6806e84abb862088335a76c42738db43aec75c62
Reviewed-on: https://review.monogon.dev/c/monogon/+/443
Reviewed-by: Mateusz Zalega <mateusz@monogon.tech>
diff --git a/metropolis/node/core/curator/impl_leader_curator.go b/metropolis/node/core/curator/impl_leader_curator.go
index 9ff7d91..c54dc11 100644
--- a/metropolis/node/core/curator/impl_leader_curator.go
+++ b/metropolis/node/core/curator/impl_leader_curator.go
@@ -355,3 +355,104 @@
 
 	return &ipb.RegisterNodeResponse{}, nil
 }
+
+func (l *leaderCurator) CommitNode(ctx context.Context, req *ipb.CommitNodeRequest) (*ipb.CommitNodeResponse, error) {
+	// Call is unauthenticated - verify the other side has connected with an
+	// ephemeral certificate. That certificate's pubkey identifies the node
+	// being committed.
+	pi := rpc.GetPeerInfo(ctx)
+	if pi == nil || pi.Unauthenticated == nil {
+		return nil, status.Error(codes.Unauthenticated, "connection must be established with a self-signed ephemeral certificate")
+	}
+	pubkey := pi.Unauthenticated.SelfSignedPublicKey
+
+	// Doing a read-then-write operation below, take lock.
+	//
+	// MVP: This can lock up the cluster if too many CommitNode calls get issued;
+	// we should either ratelimit these or remove the need to lock.
+	l.muNodes.Lock()
+	defer l.muNodes.Unlock()
+
+	// Check if there is a node with this pubkey in the cluster.
+	id := identity.NodeID(pubkey)
+	key, err := nodeEtcdPrefix.Key(id)
+	if err != nil {
+		// TODO(issues/85): log err
+		return nil, status.Errorf(codes.InvalidArgument, "invalid node id")
+	}
+
+	// Retrieve the node and act on its current state, either returning early or
+	// mutating it and continuing with the rest of the Commit logic.
+	res, err := l.txnAsLeader(ctx, clientv3.OpGet(key))
+	if err != nil {
+		if rpcErr, ok := rpcError(err); ok {
+			return nil, rpcErr
+		}
+		// TODO(issues/85): log this
+		return nil, status.Errorf(codes.Unavailable, "could not retrieve node %s: %v", id, err)
+	}
+	kvs := res.Responses[0].GetResponseRange().Kvs
+	if len(kvs) != 1 {
+		return nil, status.Errorf(codes.NotFound, "node %s not found", id)
+	}
+	node, err := nodeUnmarshal(kvs[0].Value)
+	if err != nil {
+		// TODO(issues/85): log this
+		return nil, status.Errorf(codes.Unavailable, "could not unmarshal node")
+	}
+	switch node.state {
+	case cpb.NodeState_NODE_STATE_NEW:
+		return nil, status.Error(codes.PermissionDenied, "node is NEW, wait for attestation/approval")
+	case cpb.NodeState_NODE_STATE_DISOWNED:
+		// This node has since been disowned by the cluster for some reason;
+		// the register flow should be aborted.
+		return nil, status.Error(codes.FailedPrecondition, "node is DISOWNED, abort register flow")
+	case cpb.NodeState_NODE_STATE_UP:
+		// This can happen due to a network failure when we already handled a
+		// CommitNode but weren't able to respond to the caller. CommitNode is
+		// non-idempotent, so just abort: the node should retry from scratch, and
+		// this node should be manually disowned/deleted by system owners.
+		return nil, status.Error(codes.FailedPrecondition, "node is already UP, abort register flow")
+	case cpb.NodeState_NODE_STATE_STANDBY:
+		// This is what we want.
+	default:
+		return nil, status.Errorf(codes.Internal, "node is in unknown state: %v", node.state)
+	}
+
+	// Check the given CUK is valid.
+	// TODO(q3k): unify length with localstorage/crypt keySize.
+	if want, got := 32, len(req.ClusterUnlockKey); want != got {
+		return nil, status.Errorf(codes.InvalidArgument, "invalid ClusterUnlockKey length, wanted %d bytes, got %d", want, got)
+	}
+
+	// Generate certificate for node, save new node state, return.
+
+	// If this fails we are safe to let the client retry, as the PKI code is
+	// idempotent.
+	caCertBytes, nodeCertBytes, err := BootstrapNodeCredentials(ctx, l.etcd, pubkey)
+	if err != nil {
+		return nil, status.Errorf(codes.Unavailable, "could not bootstrap node credentials: %v", err)
+	}
+
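+	// Mark the node as UP and persist its CUK; the cluster will hand the CUK
+	// back to the node when it later wants to rejoin (see CommitNodeRequest).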
+	node.state = cpb.NodeState_NODE_STATE_UP
+	node.clusterUnlockKey = req.ClusterUnlockKey
+
+	nodeBytes, err := proto.Marshal(node.proto())
+	if err != nil {
+		// TODO(issues/85): log this
+		return nil, status.Errorf(codes.Unavailable, "could not marshal updated node")
+	}
+	_, err = l.txnAsLeader(ctx, clientv3.OpPut(key, string(nodeBytes)))
+	if err != nil {
+		// TODO(issues/85): log this
+		return nil, status.Error(codes.Unavailable, "could not save updated node")
+	}
+
+	// From this point on, any failure (in the server, in the network, ...) prevents
+	// the node from making further progress in registering, as Commit is non-idempotent.
+
+	return &ipb.CommitNodeResponse{
+		CaCertificate:   caCertBytes,
+		NodeCertificate: nodeCertBytes,
+	}, nil
+}
diff --git a/metropolis/node/core/curator/impl_leader_management.go b/metropolis/node/core/curator/impl_leader_management.go
index 80d0448..b92229f 100644
--- a/metropolis/node/core/curator/impl_leader_management.go
+++ b/metropolis/node/core/curator/impl_leader_management.go
@@ -160,6 +160,10 @@
 	// This should happen automatically, if possible, via hardware attestation
 	// against policy, not manually.
 
+	// Ensure the given key resembles a public key before using it to generate
+	// a Node ID. That ID is then used to craft an etcd path, so let's do an
+	// early check in case the user sent something that's obviously not a
+	// public key.
 	if len(req.Pubkey) != ed25519.PublicKeySize {
 		return nil, status.Errorf(codes.InvalidArgument, "pubkey must be %d bytes long", ed25519.PublicKeySize)
 	}
@@ -167,7 +171,7 @@
 	l.muNodes.Lock()
 	defer l.muNodes.Unlock()
 
-	// Find node by pubkey/ID.
+	// Find node for this pubkey.
 	id := identity.NodeID(req.Pubkey)
 	key, err := nodeEtcdPrefix.Key(id)
 	if err != nil {
@@ -179,7 +183,7 @@
 	}
 	kvs := res.Responses[0].GetResponseRange().Kvs
 	if len(kvs) != 1 {
-		return nil, status.Errorf(codes.NotFound, "node not found")
+		return nil, status.Errorf(codes.NotFound, "node with given pubkey not found")
 	}
 	node, err := nodeUnmarshal(kvs[0].Value)
 	if err != nil {
@@ -197,6 +201,7 @@
 		return nil, status.Errorf(codes.FailedPrecondition, "node in state %s cannot be approved", node.state)
 	}
 
+	// Set node to be STANDBY.
 	node.state = cpb.NodeState_NODE_STATE_STANDBY
 	nodeBytes, err := proto.Marshal(node.proto())
 	if err != nil {
diff --git a/metropolis/node/core/curator/impl_leader_test.go b/metropolis/node/core/curator/impl_leader_test.go
index 354812a..b716e19 100644
--- a/metropolis/node/core/curator/impl_leader_test.go
+++ b/metropolis/node/core/curator/impl_leader_test.go
@@ -144,6 +144,7 @@
 		localNodeID:   nodeCredentials.ID(),
 		otherNodeConn: ocl,
 		otherNodeID:   otherNode.ID(),
+		otherNodePriv: otherNode.TLSCredentials().PrivateKey.(ed25519.PrivateKey),
 		caPubKey:      ephemeral.CA.PublicKey.(ed25519.PublicKey),
 		cancel:        ctxC,
 		etcd:          cl,
@@ -168,6 +169,9 @@
 	// otherNodeID is the NodeID of some other node present in the curator
 	// state.
 	otherNodeID string
+	// otherNodePriv is the private key of the other node present in the curator
+	// state, ie. the private key for otherNodeID.
+	otherNodePriv ed25519.PrivateKey
 	// caPubKey is the public key of the CA for this cluster.
 	caPubKey ed25519.PublicKey
 	// cancel shuts down the fake leader and all client connections.
@@ -480,58 +484,60 @@
 		t.Fatalf("RegisterNode failed: %v", err)
 	}
 
-	// Expect node to now be 'NEW'.
-	res3, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
-	if err != nil {
-		t.Fatalf("GetNodes failed: %v", err)
-	}
-
-	var otherNodePubkey []byte
-	for {
-		node, err := res3.Recv()
+	expectOtherNode := func(state cpb.NodeState) {
+		t.Helper()
+		res, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
 		if err != nil {
-			t.Fatalf("Recv failed: %v", err)
+			t.Fatalf("GetNodes failed: %v", err)
 		}
-		if identity.NodeID(node.Pubkey) != cl.otherNodeID {
-			continue
+
+		for {
+			node, err := res.Recv()
+			if err != nil {
+				t.Fatalf("Recv failed: %v", err)
+			}
+			if identity.NodeID(node.Pubkey) != cl.otherNodeID {
+				continue
+			}
+			if node.State != state {
+				t.Fatalf("Expected node to be %s, got %s", state, node.State)
+			}
+			return
 		}
-		if node.State != cpb.NodeState_NODE_STATE_NEW {
-			t.Fatalf("Expected node to be NEW, is %s", node.State)
-		}
-		otherNodePubkey = node.Pubkey
-		break
 	}
+	otherNodePub := cl.otherNodePriv.Public().(ed25519.PublicKey)
+
+	// Expect node to now be 'NEW'.
+	expectOtherNode(cpb.NodeState_NODE_STATE_NEW)
 
 	// Approve node.
-	_, err = mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{Pubkey: otherNodePubkey})
+	_, err = mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{Pubkey: otherNodePub})
 	if err != nil {
 		t.Fatalf("ApproveNode failed: %v", err)
 	}
 
 	// Expect node to be 'STANDBY'.
-	res4, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
-	if err != nil {
-		t.Fatalf("GetNodes failed: %v", err)
-	}
-	for {
-		node, err := res4.Recv()
-		if err != nil {
-			t.Fatalf("Recv failed: %v", err)
-		}
-		if identity.NodeID(node.Pubkey) != cl.otherNodeID {
-			continue
-		}
-		if node.State != cpb.NodeState_NODE_STATE_STANDBY {
-			t.Fatalf("Expected node to be STANDBY, is %s", node.State)
-		}
-		break
-	}
+	expectOtherNode(cpb.NodeState_NODE_STATE_STANDBY)
 
 	// Approve call should be idempotent and not fail when called a second time.
-	_, err = mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{Pubkey: otherNodePubkey})
+	_, err = mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{Pubkey: otherNodePub})
 	if err != nil {
 		t.Fatalf("ApproveNode failed: %v", err)
 	}
+
+	// Make 'other node' commit itself into the cluster.
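+	// The submitted CUK must be exactly 32 bytes long; "fake" repeated eight
+	// times satisfies the curator's length check.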
+	_, err = cur.CommitNode(ctx, &ipb.CommitNodeRequest{
+		ClusterUnlockKey: []byte("fakefakefakefakefakefakefakefake"),
+	})
+	if err != nil {
+		t.Fatalf("CommitNode failed: %v", err)
+	}
+
+	// TODO(q3k): once the test cluster's PKI is consistent with the curator's
+	// PKI, test the emitted credentials.
+
+	// Expect node to be 'UP'.
+	expectOtherNode(cpb.NodeState_NODE_STATE_UP)
 }
 
 // TestClusterUpdateNodeStatus exercises the Curator.UpdateNodeStatus RPC by
diff --git a/metropolis/node/core/curator/listener.go b/metropolis/node/core/curator/listener.go
index c067c02..ab013bb 100644
--- a/metropolis/node/core/curator/listener.go
+++ b/metropolis/node/core/curator/listener.go
@@ -393,6 +393,15 @@
 	return
 }
 
+func (l *listener) CommitNode(ctx context.Context, req *cpb.CommitNodeRequest) (res *cpb.CommitNodeResponse, err error) {
+	err = l.callImpl(ctx, func(ctx context.Context, impl rpc.ClusterExternalServices) error {
+		var err2 error
+		res, err2 = impl.CommitNode(ctx, req)
+		return err2
+	})
+	return
+}
+
 type managementGetNodesServer struct {
 	grpc.ServerStream
 	ctx context.Context
diff --git a/metropolis/node/core/curator/proto/api/api.proto b/metropolis/node/core/curator/proto/api/api.proto
index 92d0b97..da54d03 100644
--- a/metropolis/node/core/curator/proto/api/api.proto
+++ b/metropolis/node/core/curator/proto/api/api.proto
@@ -65,6 +65,43 @@
             allow_unauthenticated: true
         };
     }
+
+    // CommitNode is called by nodes that wish to finish registering into
+    // the cluster. They must first call RegisterNode, after which they also
+    // need to be approved by the cluster (currently, this is done by a manager
+    // calling ApproveNode). Then, after performing this call, the node is
+    // considered a fully functioning member of the cluster, and can begin
+    // assuming roles assigned to it, reporting its status, and performing
+    // other node functions.
+    //
+    // The node must perform the call using an ephemeral certificate, in the
+    // same way as the RegisterNode call was performed.
+    //
+    // This call will fail if the given node is not in the STANDBY state, ie.
+    // has not yet been approved to join the cluster. It is also _not_
+    // idempotent and cannot be retried. In case of a non-transient failure,
+    // the calling node should block forever and not retry the full registration
+    // process, and instead an administrative intervention (eg. node
+    // registration flow restart) should take care of these cases. This is a
+    // known limitation of the API, but it allows for tighter security.
+    //
+    // This is the point at which the node submits its Cluster Unlock Key, the
+    // cluster part of its full disk encryption key. This key will be given to
+    // the node whenever it wants to join the cluster again, and the node will
+    // combine it with its locally sealed Node Unlock Key to recover the full
+    // key.
+    //
+    // When the RPC is successful, the curator will return the cluster CA
+    // certificate and the node's newly minted node certificate, which can then
+    // be used by the node to perform RPCs acting as this node. This certificate
+    // is what the node will use for all further communication with the Curator
+    // (until a reboot, after which the join flow again requires initial
+    // communication using an ephemeral client).
+    rpc CommitNode(CommitNodeRequest) returns (CommitNodeResponse) {
+        option (metropolis.proto.ext.authorization) = {
+            allow_unauthenticated: true
+        };
+    }
 }
 
 // Node is the state and configuration of a node in the cluster.
@@ -159,4 +196,23 @@
 }
 
 message RegisterNodeResponse {
-}
\ No newline at end of file
+}
+
+message CommitNodeRequest {
+    // cluster_unlock_key (CUK) is the cluster part of the local storage full
+    // disk encryption key. The node submits it for safekeeping by the cluster,
+    // and keeps the local part (node unlock key, NUK) local, sealed by TPM.
+    bytes cluster_unlock_key = 1;
+}
+
+message CommitNodeResponse {
+    // ca_certificate is the x509 DER-encoded CA certificate for the cluster.
+    // The node should use this to verify the cluster identity when connecting
+    // to it from this point onward.
+    bytes ca_certificate = 1;
+    // node_certificate is the x509 DER-encoded certificate of the node, freshly
+    // minted by the cluster's curator, signed for the Ed25519 keypair that the
+    // node was connecting with. This certificate should be used by the node for
+    // communication with the cluster from this point onward.
+    bytes node_certificate = 2;
 }
\ No newline at end of file