m/n/c/curator: implement Curator.RegisterNode

This is the 'Register' call from the cluster lifecycle design document.
We don't yet call it from node startup code, but we do exercise it in a
Curator test.

Change-Id: Ife617b148a25fc8aecb0ed15f78a758ca4538016
Reviewed-on: https://review.monogon.dev/c/monogon/+/423
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/node/core/curator/BUILD.bazel b/metropolis/node/core/curator/BUILD.bazel
index 0ce248c..3f39af3 100644
--- a/metropolis/node/core/curator/BUILD.bazel
+++ b/metropolis/node/core/curator/BUILD.bazel
@@ -14,6 +14,7 @@
         "state.go",
         "state_node.go",
         "state_pki.go",
+        "state_registerticket.go",
     ],
     importpath = "source.monogon.dev/metropolis/node/core/curator",
     visibility = ["//visibility:public"],
diff --git a/metropolis/node/core/curator/impl_leader.go b/metropolis/node/core/curator/impl_leader.go
index 9e25097..08772af 100644
--- a/metropolis/node/core/curator/impl_leader.go
+++ b/metropolis/node/core/curator/impl_leader.go
@@ -35,6 +35,10 @@
 	// This lock has to be taken any time such RMW operation takes place when not
 	// additionally guarded using etcd transactions.
 	muNodes sync.Mutex
+
+	// muRegisterTicket guards changes to the register ticket. Its usage semantics
+	// are the same as for muNodes, as described above.
+	muRegisterTicket sync.Mutex
 }
 
 var (
diff --git a/metropolis/node/core/curator/impl_leader_curator.go b/metropolis/node/core/curator/impl_leader_curator.go
index 3c2a87c..c566040 100644
--- a/metropolis/node/core/curator/impl_leader_curator.go
+++ b/metropolis/node/core/curator/impl_leader_curator.go
@@ -2,6 +2,7 @@
 
 import (
 	"context"
+	"crypto/subtle"
 	"fmt"
 
 	"go.etcd.io/etcd/clientv3"
@@ -13,6 +14,7 @@
 	"source.monogon.dev/metropolis/node/core/identity"
 	"source.monogon.dev/metropolis/node/core/rpc"
 	"source.monogon.dev/metropolis/pkg/event/etcd"
+	cpb "source.monogon.dev/metropolis/proto/common"
 )
 
 // leaderCurator implements the Curator gRPC API (ipb.Curator) as a curator
@@ -268,3 +270,86 @@
 
 	return &ipb.UpdateNodeStatusResponse{}, nil
 }
+
+func (l *leaderCurator) RegisterNode(ctx context.Context, req *ipb.RegisterNodeRequest) (*ipb.RegisterNodeResponse, error) {
+	// Call is unauthenticated - verify the other side has connected with an
+	// ephemeral certificate. That certificate's pubkey will become the node's
+	// pubkey.
+	pi := rpc.GetPeerInfo(ctx)
+	if pi == nil || pi.Unauthenticated == nil {
+		return nil, status.Error(codes.Unauthenticated, "connection must be established with a self-signed ephemeral certificate")
+	}
+	pubkey := pi.Unauthenticated.SelfSignedPublicKey
+
+	// Verify that call contains a RegisterTicket and that this RegisterTicket is
+	// valid.
+	wantTicket, err := l.ensureRegisterTicket(ctx)
+	if err != nil {
+		// TODO(q3k): log err
+		return nil, status.Error(codes.Unavailable, "could not retrieve register ticket")
+	}
+	gotTicket := req.RegisterTicket
+	if subtle.ConstantTimeCompare(wantTicket, gotTicket) != 1 {
+		return nil, status.Error(codes.PermissionDenied, "registerticket invalid")
+	}
+
+	// Doing a read-then-write operation below, take lock.
+	//
+	// MVP: This can lock up the cluster if too many RegisterNode calls get issued,
+	// we should either ratelimit these or remove the need to lock.
+	l.muNodes.Lock()
+	defer l.muNodes.Unlock()
+
+	// Check if there already is a node with this pubkey in the cluster.
+	id := identity.NodeID(pubkey)
+	key, err := nodeEtcdPrefix.Key(id)
+	if err != nil {
+		// TODO(q3k): log err
+		return nil, status.Errorf(codes.InvalidArgument, "invalid node id")
+	}
+	res, err := l.txnAsLeader(ctx, clientv3.OpGet(key))
+	if err != nil {
+		if rpcErr, ok := rpcError(err); ok {
+			return nil, rpcErr
+		}
+		// TODO(q3k): log this
+		return nil, status.Errorf(codes.Unavailable, "could not retrieve node %s: %v", id, err)
+	}
+	kvs := res.Responses[0].GetResponseRange().Kvs
+	if len(kvs) > 0 {
+		node, err := nodeUnmarshal(kvs[0].Value)
+		if err != nil {
+			// TODO(q3k): log this
+			return nil, status.Errorf(codes.Unavailable, "could not unmarshal node")
+		}
+		// If the existing node is in the NEW state already, there's nothing to do,
+		// return no error. This can happen in case of spurious retries from the calling
+		// node.
+		if node.state == cpb.NodeState_NODE_STATE_NEW {
+			return &ipb.RegisterNodeResponse{}, nil
+		}
+		// We can return a bit more information to the calling node here, as if it's in
+		// possession of the private key corresponding to an existing node in the
+		// cluster, it should have access to the status of the node without danger of
+		// leaking data about other nodes. TODO(q3k): log this
+		return nil, status.Errorf(codes.FailedPrecondition, "node already exists in cluster, state %s", node.state.String())
+	}
+
+	// No node exists, create one.
+	node := &Node{
+		pubkey: pubkey,
+		state:  cpb.NodeState_NODE_STATE_NEW,
+	}
+	nodeBytes, err := proto.Marshal(node.proto())
+	if err != nil {
+		// TODO(q3k): log this
+		return nil, status.Errorf(codes.Unavailable, "could not marshal new node")
+	}
+	_, err = l.txnAsLeader(ctx, clientv3.OpPut(key, string(nodeBytes)))
+	if err != nil {
+		// TODO(q3k): log this
+		return nil, status.Error(codes.Unavailable, "could not save new node")
+	}
+
+	return &ipb.RegisterNodeResponse{}, nil
+}
diff --git a/metropolis/node/core/curator/impl_leader_management.go b/metropolis/node/core/curator/impl_leader_management.go
index 849f9b8..676688d 100644
--- a/metropolis/node/core/curator/impl_leader_management.go
+++ b/metropolis/node/core/curator/impl_leader_management.go
@@ -4,15 +4,11 @@
 	"bytes"
 	"context"
 	"crypto/ed25519"
-	"crypto/rand"
 	"sort"
 
-	"go.etcd.io/etcd/clientv3"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
-	"google.golang.org/protobuf/proto"
 
-	ppb "source.monogon.dev/metropolis/node/core/curator/proto/private"
 	"source.monogon.dev/metropolis/node/core/identity"
 	apb "source.monogon.dev/metropolis/proto/api"
 	cpb "source.monogon.dev/metropolis/proto/common"
@@ -49,40 +45,12 @@
 )
 
 func (l *leaderManagement) GetRegisterTicket(ctx context.Context, req *apb.GetRegisterTicketRequest) (*apb.GetRegisterTicketResponse, error) {
-	// Retrieve existing ticket, if any.
-	res, err := l.txnAsLeader(ctx, clientv3.OpGet(registerTicketEtcdPath))
+	ticket, err := l.ensureRegisterTicket(ctx)
 	if err != nil {
-		return nil, status.Errorf(codes.Unavailable, "could not retrieve register ticket: %v", err)
+		return nil, err
 	}
-	kvs := res.Responses[0].GetResponseRange().Kvs
-	if len(kvs) > 0 {
-		// Ticket already generated, return.
-		return &apb.GetRegisterTicketResponse{
-			Ticket: kvs[0].Value,
-		}, nil
-	}
-
-	// No ticket, generate one.
-	ticket := &ppb.RegisterTicket{
-		Opaque: make([]byte, registerTicketSize),
-	}
-	_, err = rand.Read(ticket.Opaque)
-	if err != nil {
-		return nil, status.Errorf(codes.Unavailable, "could not generate new ticket: %v", err)
-	}
-	ticketBytes, err := proto.Marshal(ticket)
-	if err != nil {
-		return nil, status.Errorf(codes.Unavailable, "could not marshal new ticket: %v", err)
-	}
-
-	// Commit new ticket to etcd.
-	_, err = l.txnAsLeader(ctx, clientv3.OpPut(registerTicketEtcdPath, string(ticketBytes)))
-	if err != nil {
-		return nil, status.Errorf(codes.Unavailable, "could not save new ticket: %v", err)
-	}
-
 	return &apb.GetRegisterTicketResponse{
-		Ticket: ticketBytes,
+		Ticket: ticket,
 	}, nil
 }
 
diff --git a/metropolis/node/core/curator/impl_leader_test.go b/metropolis/node/core/curator/impl_leader_test.go
index 23d24ed..97ea58e 100644
--- a/metropolis/node/core/curator/impl_leader_test.go
+++ b/metropolis/node/core/curator/impl_leader_test.go
@@ -123,6 +123,13 @@
 		t.Fatalf("Dialing local GRPC failed: %v", err)
 	}
 
+	// Create an ephemeral node gRPC client for the 'other node'.
+	otherNode := ephemeral.Nodes[1]
+	ocl, err := rpc.NewEphemeralClientTest(externalLis, otherNode.TLSCredentials().PrivateKey.(ed25519.PrivateKey), ephemeral.CA)
+	if err != nil {
+		t.Fatalf("Dialing external GRPC failed: %v", err)
+	}
+
 	// Close the clients on context cancel.
 	go func() {
 		<-ctx.Done()
@@ -134,7 +141,8 @@
 		mgmtConn:      mcl,
 		localNodeConn: lcl,
 		localNodeID:   nodeCredentials.ID(),
-		otherNodeID:   ephemeral.Nodes[1].ID(),
+		otherNodeConn: ocl,
+		otherNodeID:   otherNode.ID(),
 		caPubKey:      ephemeral.CA.PublicKey.(ed25519.PublicKey),
 		cancel:        ctxC,
 		etcd:          cl,
@@ -153,6 +161,9 @@
 	localNodeConn grpc.ClientConnInterface
 	// localNodeID is the NodeID of the fake node that the leader is running on.
 	localNodeID string
+	// otherNodeConn is a connection from some other node (otherNodeID) into the
+	// cluster, authenticated using an ephemeral certificate.
+	otherNodeConn grpc.ClientConnInterface
 	// otherNodeID is the NodeID of some other node present in the curator
 	// state.
 	otherNodeID string
@@ -423,8 +434,10 @@
 	}
 }
 
-// TestManagementRegisterTicket exercises the Management.GetRegisterTicket RPC.
-func TestManagementRegisterTicket(t *testing.T) {
+// TestRegistration exercises the Management.GetRegisterTicket RPC and
+// Curator.RegisterNode. A node should be able to register itself into a running
+// cluster, and have a NEW state.
+func TestRegistration(t *testing.T) {
 	cl := fakeLeader(t)
 	defer cl.cancel()
 
@@ -450,6 +463,15 @@
 	if !bytes.Equal(res1.Ticket, res2.Ticket) {
 		t.Errorf("Unexpected ticket change between calls")
 	}
+
+	// Register 'other node' into cluster.
+	cur := ipb.NewCuratorClient(cl.otherNodeConn)
+	_, err = cur.RegisterNode(ctx, &ipb.RegisterNodeRequest{
+		RegisterTicket: res1.Ticket,
+	})
+	if err != nil {
+		t.Fatalf("RegisterNode failed: %v", err)
+	}
 }
 
 // TestClusterUpdateNodeStatus exercises the Curator.UpdateNodeStatus RPC by
diff --git a/metropolis/node/core/curator/listener.go b/metropolis/node/core/curator/listener.go
index 8a8dcd8..aea6f1d 100644
--- a/metropolis/node/core/curator/listener.go
+++ b/metropolis/node/core/curator/listener.go
@@ -383,3 +383,12 @@
 	})
 	return
 }
+
+func (l *listener) RegisterNode(ctx context.Context, req *cpb.RegisterNodeRequest) (res *cpb.RegisterNodeResponse, err error) {
+	err = l.callImpl(ctx, func(ctx context.Context, impl rpc.ClusterExternalServices) error {
+		var err2 error
+		res, err2 = impl.RegisterNode(ctx, req)
+		return err2
+	})
+	return
+}
diff --git a/metropolis/node/core/curator/proto/api/api.proto b/metropolis/node/core/curator/proto/api/api.proto
index 8bf35df..92d0b97 100644
--- a/metropolis/node/core/curator/proto/api/api.proto
+++ b/metropolis/node/core/curator/proto/api/api.proto
@@ -43,7 +43,7 @@
     // is eventually consistent with the state of the objects in the Curator.
     rpc Watch(WatchRequest) returns (stream WatchEvent) {
         option (metropolis.proto.ext.authorization) = {
-            need: PERMISSION_READ_CLUSTER_STATUS;
+            need: PERMISSION_READ_CLUSTER_STATUS
         };
     }
     // UpdateNodestatus is called by nodes in the cluster to report their own
@@ -51,7 +51,18 @@
     // Watch.
     rpc UpdateNodeStatus(UpdateNodeStatusRequest) returns (UpdateNodeStatusResponse) {
         option (metropolis.proto.ext.authorization) = {
-            need: PERMISSION_UPDATE_NODE_SELF;
+            need: PERMISSION_UPDATE_NODE_SELF
+        };
+    }
+
+    // RegisterNode is called by nodes that wish to begin registering into the
+    // cluster. This will create a 'New' node in the cluster state.
+    rpc RegisterNode(RegisterNodeRequest) returns (RegisterNodeResponse) {
+        option (metropolis.proto.ext.authorization) = {
+            // The node doesn't yet have any credentials and will provide a
+            // self-signed ephemeral certificate to prove ownership of an
+            // Ed25519 key.
+            allow_unauthenticated: true
         };
     }
 }
@@ -138,3 +149,14 @@
 
 message UpdateNodeStatusResponse {
 }
+
+message RegisterNodeRequest {
+    // register_ticket is the opaque Register Ticket required from a node to
+    // begin registering it into a cluster. It's provided to the registering
+    // node by a cluster operator in NodeParameters, and is retrieved by an
+    // operator from a running cluster via Management.GetRegisterTicket.
+    bytes register_ticket = 1;
+}
+
+message RegisterNodeResponse {
+}
\ No newline at end of file
diff --git a/metropolis/node/core/curator/state_registerticket.go b/metropolis/node/core/curator/state_registerticket.go
new file mode 100644
index 0000000..e516661
--- /dev/null
+++ b/metropolis/node/core/curator/state_registerticket.go
@@ -0,0 +1,52 @@
+package curator
+
+import (
+	"context"
+	"crypto/rand"
+
+	"go.etcd.io/etcd/clientv3"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+	"google.golang.org/protobuf/proto"
+
+	ppb "source.monogon.dev/metropolis/node/core/curator/proto/private"
+)
+
+// ensureRegisterTicket returns the cluster's current RegisterTicket, creating
+// one if not yet present in the cluster state.
+func (l *leadership) ensureRegisterTicket(ctx context.Context) ([]byte, error) {
+	l.muRegisterTicket.Lock()
+	defer l.muRegisterTicket.Unlock()
+
+	// Retrieve existing ticket, if any.
+	res, err := l.txnAsLeader(ctx, clientv3.OpGet(registerTicketEtcdPath))
+	if err != nil {
+		return nil, status.Errorf(codes.Unavailable, "could not retrieve register ticket: %v", err)
+	}
+	kvs := res.Responses[0].GetResponseRange().Kvs
+	if len(kvs) > 0 {
+		// Ticket already generated, return.
+		return kvs[0].Value, nil
+	}
+
+	// No ticket, generate one.
+	ticket := &ppb.RegisterTicket{
+		Opaque: make([]byte, registerTicketSize),
+	}
+	_, err = rand.Read(ticket.Opaque)
+	if err != nil {
+		return nil, status.Errorf(codes.Unavailable, "could not generate new ticket: %v", err)
+	}
+	ticketBytes, err := proto.Marshal(ticket)
+	if err != nil {
+		return nil, status.Errorf(codes.Unavailable, "could not marshal new ticket: %v", err)
+	}
+
+	// Commit new ticket to etcd.
+	_, err = l.txnAsLeader(ctx, clientv3.OpPut(registerTicketEtcdPath, string(ticketBytes)))
+	if err != nil {
+		return nil, status.Errorf(codes.Unavailable, "could not save new ticket: %v", err)
+	}
+
+	return ticketBytes, nil
+}
diff --git a/metropolis/proto/api/configuration.proto b/metropolis/proto/api/configuration.proto
index 41e57d0..62d5094 100644
--- a/metropolis/proto/api/configuration.proto
+++ b/metropolis/proto/api/configuration.proto
@@ -18,6 +18,8 @@
 package metropolis.proto.api;
 option go_package = "source.monogon.dev/metropolis/proto/api";
 
+import "metropolis/proto/common/common.proto";
+
 // NodeParameters is the data with which a Node is set booted. It contains the
 // configuration required for a node to either bootstrap a new cluster, or
 // register into an existing one.
@@ -25,10 +27,34 @@
 // implementation-specific way (currently: either on ESP partition or via qemu
 // fw_cfg).
 message NodeParameters {
+    // ClusterBootstrap configures the node to attempt to create a new cluster
+    // from scratch. Further nodes can become part of the cluster by being
+    // configured with ClusterRegister, which should contain data retrieved from
+    // the newly bootstrapped cluster by its operator.
     message ClusterBootstrap {
+        // owner_public_key is a raw Ed25519 public key whose corresponding
+        // private key can be used to prove ownership of the cluster and
+        // retrieve management credentials for the cluster via an AAA.Escrow call.
         bytes owner_public_key = 1;
     }
+    // ClusterRegister configures the node to attempt to register into an
+    // existing cluster, ie. contact an existing running cluster and become
+    // its member.
     message ClusterRegister {
+        // cluster_directory is a directory (mapping of names into IP addresses
+        // and public keys) of existing nodes in the cluster. It's used as the
+        // initial contact point of the already running cluster that the node
+        // should register into. It can be retrieved by an operator from
+        // a running cluster via Management.GetClusterInfo.
+        metropolis.proto.common.ClusterDirectory cluster_directory = 1;
+        // register_ticket is the opaque Register Ticket required from a node to
+        // begin registering it into a cluster. It can be retrieved by an
+        // operator from a running cluster via Management.GetRegisterTicket.
+        bytes register_ticket = 2;
+        // ca_public_key is the public key of the CA of the cluster that the
+        // node should expect when contacting nodes in cluster_directory and
+        // attempting to register into a cluster.
+        bytes ca_public_key = 3;
     }
     oneof cluster {
         ClusterBootstrap cluster_bootstrap = 1;