m/n/c/curator: implement Management.ApproveNode

This takes a node from NEW to STANDBY. This is the second-to-last
step required in a node's registration flow.

Change-Id: I88f9c7d2cd824c7d3182195b784a725ec9528d28
Reviewed-on: https://review.monogon.dev/c/monogon/+/442
Reviewed-by: Mateusz Zalega <mateusz@monogon.tech>
diff --git a/metropolis/node/core/curator/impl_leader_management.go b/metropolis/node/core/curator/impl_leader_management.go
index 361a251..80d0448 100644
--- a/metropolis/node/core/curator/impl_leader_management.go
+++ b/metropolis/node/core/curator/impl_leader_management.go
@@ -3,10 +3,13 @@
 import (
 	"bytes"
 	"context"
+	"crypto/ed25519"
 	"sort"
 
+	"go.etcd.io/etcd/clientv3"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
+	"google.golang.org/protobuf/proto"
 
 	"source.monogon.dev/metropolis/node/core/identity"
 	apb "source.monogon.dev/metropolis/proto/api"
@@ -151,3 +154,60 @@
 
 	return nil
 }
+
+func (l *leaderManagement) ApproveNode(ctx context.Context, req *apb.ApproveNodeRequest) (*apb.ApproveNodeResponse, error) {
+	// ApproveNode moves a node from NEW to STANDBY. No policy check is done yet
+	// (MVP): this should eventually happen automatically, via hardware
+	// attestation against cluster policy, rather than manually.
+
+	if len(req.Pubkey) != ed25519.PublicKeySize {
+		return nil, status.Errorf(codes.InvalidArgument, "pubkey must be %d bytes long", ed25519.PublicKeySize)
+	}
+
+	l.muNodes.Lock() // Held across the read-modify-write below.
+	defer l.muNodes.Unlock()
+
+	// Look up the node in etcd under the key derived from its pubkey-based ID.
+	id := identity.NodeID(req.Pubkey)
+	key, err := nodeEtcdPrefix.Key(id)
+	if err != nil {
+		return nil, status.Errorf(codes.InvalidArgument, "pubkey invalid: %v", err)
+	}
+	res, err := l.txnAsLeader(ctx, clientv3.OpGet(key))
+	if err != nil {
+		return nil, status.Errorf(codes.Unavailable, "could not retrieve node: %v", err)
+	}
+	kvs := res.Responses[0].GetResponseRange().Kvs
+	if len(kvs) != 1 {
+		return nil, status.Errorf(codes.NotFound, "node not found")
+	}
+	node, err := nodeUnmarshal(kvs[0].Value)
+	if err != nil {
+		return nil, status.Errorf(codes.Internal, "could not deserialize node: %v", err)
+	}
+
+	// Only a NEW node is transitioned; UP/STANDBY nodes are left untouched.
+	switch node.state {
+	case cpb.NodeState_NODE_STATE_UP, cpb.NodeState_NODE_STATE_STANDBY:
+		// Nothing to do; succeed without writing so the call stays idempotent.
+		return &apb.ApproveNodeResponse{}, nil
+	case cpb.NodeState_NODE_STATE_NEW:
+		// The only state that gets promoted; handled after the switch.
+	default:
+		return nil, status.Errorf(codes.FailedPrecondition, "node in state %s cannot be approved", node.state)
+	}
+
+	node.state = cpb.NodeState_NODE_STATE_STANDBY
+	nodeBytes, err := proto.Marshal(node.proto())
+	if err != nil {
+		// TODO(issues/85): log this; the underlying error is not returned to the caller.
+		return nil, status.Errorf(codes.Unavailable, "could not marshal updated node")
+	}
+	_, err = l.txnAsLeader(ctx, clientv3.OpPut(key, string(nodeBytes)))
+	if err != nil {
+		// TODO(issues/85): log this; the underlying error is not returned to the caller.
+		return nil, status.Error(codes.Unavailable, "could not save updated node")
+	}
+
+	return &apb.ApproveNodeResponse{}, nil
+}
diff --git a/metropolis/node/core/curator/impl_leader_test.go b/metropolis/node/core/curator/impl_leader_test.go
index 2608d08..354812a 100644
--- a/metropolis/node/core/curator/impl_leader_test.go
+++ b/metropolis/node/core/curator/impl_leader_test.go
@@ -435,9 +435,15 @@
 	}
 }
 
-// TestRegistration exercises the Management.GetRegisterTicket RPC and
-// Curator.RegisterNode. A node should be able to register itself into a running
-// cluster, and have a NEW state.
+// TestRegistration exercises the node 'Register' (a.k.a. Registration) flow,
+// which is described in the Cluster Lifecycle design document.
+//
+// It starts out with a node that's foreign to the cluster, and performs all
+// the steps required to make that node part of a cluster. It calls into the
+// Curator service as the registering node and the Management service as a
+// cluster manager. The node registered into the cluster is fully fake, ie. is
+// not an actual Metropolis node but instead is fully managed from within the
+// test as a set of credentials.
 func TestRegistration(t *testing.T) {
 	cl := fakeLeader(t)
 	defer cl.cancel()
@@ -473,6 +479,59 @@
 	if err != nil {
 		t.Fatalf("RegisterNode failed: %v", err)
 	}
+
+	// Expect node to now be 'NEW'.
+	res3, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
+	if err != nil {
+		t.Fatalf("GetNodes failed: %v", err)
+	}
+
+	var otherNodePubkey []byte
+	for {
+		node, err := res3.Recv()
+		if err != nil {
+			t.Fatalf("Recv failed: %v", err)
+		}
+		if identity.NodeID(node.Pubkey) != cl.otherNodeID {
+			continue
+		}
+		if node.State != cpb.NodeState_NODE_STATE_NEW {
+			t.Fatalf("Expected node to be NEW, is %s", node.State)
+		}
+		otherNodePubkey = node.Pubkey
+		break
+	}
+
+	// Approve node.
+	_, err = mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{Pubkey: otherNodePubkey})
+	if err != nil {
+		t.Fatalf("ApproveNode failed: %v", err)
+	}
+
+	// Expect node to be 'STANDBY'.
+	res4, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
+	if err != nil {
+		t.Fatalf("GetNodes failed: %v", err)
+	}
+	for {
+		node, err := res4.Recv()
+		if err != nil {
+			t.Fatalf("Recv failed: %v", err)
+		}
+		if identity.NodeID(node.Pubkey) != cl.otherNodeID {
+			continue
+		}
+		if node.State != cpb.NodeState_NODE_STATE_STANDBY {
+			t.Fatalf("Expected node to be STANDBY, is %s", node.State)
+		}
+		break
+	}
+
+	// Approve call should be idempotent and not fail when called a second time.
+	_, err = mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{Pubkey: otherNodePubkey})
+	if err != nil {
+		t.Fatalf("ApproveNode failed: %v", err)
+	}
 }
 
 // TestClusterUpdateNodeStatus exercises the Curator.UpdateNodeStatus RPC by
@@ -597,51 +656,3 @@
 		t.Fatalf("CaPublicKey mismatch (wanted %s, got %s)", hex.EncodeToString(want), hex.EncodeToString(got))
 	}
 }
-
-// TestRegisterNode is a smoke test for Curator.RegisterNode and
-// Management.GetNodes.
-func TestRegisterNode(t *testing.T) {
-	cl := fakeLeader(t)
-	defer cl.cancel()
-
-	ctx, ctxC := context.WithCancel(context.Background())
-	defer ctxC()
-
-	// Get RegisterTicket.
-	mgmt := apb.NewManagementClient(cl.mgmtConn)
-	resT, err := mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{})
-	if err != nil {
-		t.Fatalf("GetRegisterTicket: %v", err)
-	}
-
-	// Register as other node.
-	curOther := ipb.NewCuratorClient(cl.otherNodeConn)
-	_, err = curOther.RegisterNode(ctx, &ipb.RegisterNodeRequest{RegisterTicket: resT.Ticket})
-	if err != nil {
-		t.Fatalf("RegisterNode: %v", err)
-	}
-
-	// Ensure that the cluster sees the node as NEW via GetNodes.
-	resN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
-	if err != nil {
-		t.Fatalf("GetNodes: %v", err)
-	}
-	// Receive nodes from GetNodes until we find the node that we just registered.
-	var otherNode *apb.Node
-	for {
-		node, err := resN.Recv()
-		if err != nil {
-			t.Fatalf("GetNodes.Recv: %v", err)
-		}
-
-		id := identity.NodeID(node.Pubkey)
-		if id == cl.otherNodeID {
-			otherNode = node
-			break
-		}
-	}
-	// Ensure that this node is marked as NEW.
-	if want, got := cpb.NodeState_NODE_STATE_NEW, otherNode.State; want != got {
-		t.Fatalf("Newly registerd should have state %s, got %s", want, got)
-	}
-}
diff --git a/metropolis/node/core/curator/listener.go b/metropolis/node/core/curator/listener.go
index a562063..c067c02 100644
--- a/metropolis/node/core/curator/listener.go
+++ b/metropolis/node/core/curator/listener.go
@@ -415,3 +415,12 @@
 	}
 	return l.callImpl(srv.Context(), proxy)
 }
+
+func (l *listener) ApproveNode(ctx context.Context, req *apb.ApproveNodeRequest) (res *apb.ApproveNodeResponse, err error) {
+	err = l.callImpl(ctx, func(ctx context.Context, impl rpc.ClusterExternalServices) error {
+		var err2 error // Kept distinct from the named result err, which is set from callImpl.
+		res, err2 = impl.ApproveNode(ctx, req) // Stores the response in the named result res.
+		return err2
+	})
+	return // Naked return of the named results res and err.
+}
diff --git a/metropolis/proto/api/management.proto b/metropolis/proto/api/management.proto
index 63336c8..ba50849 100644
--- a/metropolis/proto/api/management.proto
+++ b/metropolis/proto/api/management.proto
@@ -38,6 +38,28 @@
             need: PERMISSION_READ_CLUSTER_STATUS
         };
     }
+
+    // ApproveNode progresses a node's registration process by changing its state
+    // in the cluster from NEW to STANDBY, if not yet STANDBY. This is required
+    // for the node to fully become part of the cluster (ie. have an UP state),
+    // and is required to be called by a manager manually.
+    //
+    // Managers can find out what nodes require approval by performing
+    // a GetNodes call and filtering for nodes in the NEW state. This call is
+    // idempotent and can be executed multiple times, and is a no-op if the node
+    // is already in the STANDBY or even UP states.
+    //
+    // In the future, the approval process will be governed by cluster policy,
+    // but currently any node can be approved by a manager, and the manager is
+    // responsible for performing an out-of-band attestation of the node being
+    // approved (eg. by verifying that the node that is being approved has the
+    // same public key as what the registering node displays in its startup
+    // logs).
+    rpc ApproveNode(ApproveNodeRequest) returns (ApproveNodeResponse) {
+        option (metropolis.proto.ext.authorization) = {
+            need: PERMISSION_APPROVE_NODE
+        };
+    }
 }
 
 message GetRegisterTicketRequest {
@@ -85,3 +107,13 @@
     // Roles assigned by the cluster. This is always set.
     metropolis.proto.common.NodeRoles roles = 4;
 }
+
+
+message ApproveNodeRequest {
+    // Raw ED25519 public key of the node being approved; it has to correspond
+    // to a node currently in the cluster.
+    bytes pubkey = 1;
+}
+
+message ApproveNodeResponse {
+}
\ No newline at end of file
diff --git a/metropolis/proto/ext/authorization.proto b/metropolis/proto/ext/authorization.proto
index 42beb88..60ad68a 100644
--- a/metropolis/proto/ext/authorization.proto
+++ b/metropolis/proto/ext/authorization.proto
@@ -22,6 +22,7 @@
     PERMISSION_GET_REGISTER_TICKET = 1;
     PERMISSION_READ_CLUSTER_STATUS = 2;
     PERMISSION_UPDATE_NODE_SELF = 3;
+    PERMISSION_APPROVE_NODE = 4;
 }
 
 // Authorization policy for an RPC method. This message/API does not have the