m/n/c/curator: implement Curator.RegisterNode
This is the 'Register' call from the cluster lifecycle design document.
We don't yet call it from node startup code, but we do exercise it in a
Curator test.
Change-Id: Ife617b148a25fc8aecb0ed15f78a758ca4538016
Reviewed-on: https://review.monogon.dev/c/monogon/+/423
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/node/core/curator/BUILD.bazel b/metropolis/node/core/curator/BUILD.bazel
index 0ce248c..3f39af3 100644
--- a/metropolis/node/core/curator/BUILD.bazel
+++ b/metropolis/node/core/curator/BUILD.bazel
@@ -14,6 +14,7 @@
"state.go",
"state_node.go",
"state_pki.go",
+ "state_registerticket.go",
],
importpath = "source.monogon.dev/metropolis/node/core/curator",
visibility = ["//visibility:public"],
diff --git a/metropolis/node/core/curator/impl_leader.go b/metropolis/node/core/curator/impl_leader.go
index 9e25097..08772af 100644
--- a/metropolis/node/core/curator/impl_leader.go
+++ b/metropolis/node/core/curator/impl_leader.go
@@ -35,6 +35,10 @@
// This lock has to be taken any time such RMW operation takes place when not
// additionally guarded using etcd transactions.
muNodes sync.Mutex
+
+ // muRegisterTicket guards changes to the register ticket. Its usage semantics
+ // are the same as for muNodes, as described above.
+ muRegisterTicket sync.Mutex
}
var (
diff --git a/metropolis/node/core/curator/impl_leader_curator.go b/metropolis/node/core/curator/impl_leader_curator.go
index 3c2a87c..c566040 100644
--- a/metropolis/node/core/curator/impl_leader_curator.go
+++ b/metropolis/node/core/curator/impl_leader_curator.go
@@ -2,6 +2,7 @@
import (
"context"
+ "crypto/subtle"
"fmt"
"go.etcd.io/etcd/clientv3"
@@ -13,6 +14,7 @@
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/rpc"
"source.monogon.dev/metropolis/pkg/event/etcd"
+ cpb "source.monogon.dev/metropolis/proto/common"
)
// leaderCurator implements the Curator gRPC API (ipb.Curator) as a curator
@@ -268,3 +270,86 @@
return &ipb.UpdateNodeStatusResponse{}, nil
}
+
+func (l *leaderCurator) RegisterNode(ctx context.Context, req *ipb.RegisterNodeRequest) (*ipb.RegisterNodeResponse, error) {
+ // Call is unauthenticated - verify the other side has connected with an
+ // ephemeral certificate. That certificate's pubkey will become the node's
+ // pubkey.
+ pi := rpc.GetPeerInfo(ctx)
+ if pi == nil || pi.Unauthenticated == nil {
+ return nil, status.Error(codes.Unauthenticated, "connection must be established with a self-signed ephemeral certificate")
+ }
+ pubkey := pi.Unauthenticated.SelfSignedPublicKey
+
+ // Verify that call contains a RegisterTicket and that this RegisterTicket is
+ // valid.
+ wantTicket, err := l.ensureRegisterTicket(ctx)
+ if err != nil {
+ // TODO(q3k): log err
+ return nil, status.Error(codes.Unavailable, "could not retrieve register ticket")
+ }
+ gotTicket := req.RegisterTicket
+ if subtle.ConstantTimeCompare(wantTicket, gotTicket) != 1 {
+ return nil, status.Error(codes.PermissionDenied, "registerticket invalid")
+ }
+
+ // Doing a read-then-write operation below, take lock.
+ //
+ // MVP: This can lock up the cluster if too many RegisterNode calls get issued,
+ // we should either ratelimit these or remove the need to lock.
+ l.muNodes.Lock()
+ defer l.muNodes.Unlock()
+
+ // Check if there already is a node with this pubkey in the cluster.
+ id := identity.NodeID(pubkey)
+ key, err := nodeEtcdPrefix.Key(id)
+ if err != nil {
+ // TODO(q3k): log err
+ return nil, status.Errorf(codes.InvalidArgument, "invalid node id")
+ }
+ res, err := l.txnAsLeader(ctx, clientv3.OpGet(key))
+ if err != nil {
+ if rpcErr, ok := rpcError(err); ok {
+ return nil, rpcErr
+ }
+ // TODO(q3k): log this
+ return nil, status.Errorf(codes.Unavailable, "could not retrieve node %s: %v", id, err)
+ }
+ kvs := res.Responses[0].GetResponseRange().Kvs
+ if len(kvs) > 0 {
+ node, err := nodeUnmarshal(kvs[0].Value)
+ if err != nil {
+ // TODO(q3k): log this
+ return nil, status.Errorf(codes.Unavailable, "could not unmarshal node")
+ }
+ // If the existing node is in the NEW state already, there's nothing to do,
+ // return no error. This can happen in case of spurious retries from the calling
+ // node.
+ if node.state == cpb.NodeState_NODE_STATE_NEW {
+ return &ipb.RegisterNodeResponse{}, nil
+ }
+ // We can return a bit more information to the calling node here, as if it's in
+ // possession of the private key corresponding to an existing node in the
+ // cluster, it should have access to the status of the node without danger of
+ // leaking data about other nodes. TODO(q3k): log this
+ return nil, status.Errorf(codes.FailedPrecondition, "node already exists in cluster, state %s", node.state.String())
+ }
+
+ // No node exists, create one.
+ node := &Node{
+ pubkey: pubkey,
+ state: cpb.NodeState_NODE_STATE_NEW,
+ }
+ nodeBytes, err := proto.Marshal(node.proto())
+ if err != nil {
+ // TODO(q3k): log this
+ return nil, status.Errorf(codes.Unavailable, "could not marshal new node")
+ }
+ _, err = l.txnAsLeader(ctx, clientv3.OpPut(key, string(nodeBytes)))
+ if err != nil {
+ // TODO(q3k): log this
+ return nil, status.Error(codes.Unavailable, "could not save new node")
+ }
+
+ return &ipb.RegisterNodeResponse{}, nil
+}
diff --git a/metropolis/node/core/curator/impl_leader_management.go b/metropolis/node/core/curator/impl_leader_management.go
index 849f9b8..676688d 100644
--- a/metropolis/node/core/curator/impl_leader_management.go
+++ b/metropolis/node/core/curator/impl_leader_management.go
@@ -4,15 +4,11 @@
"bytes"
"context"
"crypto/ed25519"
- "crypto/rand"
"sort"
- "go.etcd.io/etcd/clientv3"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
- "google.golang.org/protobuf/proto"
- ppb "source.monogon.dev/metropolis/node/core/curator/proto/private"
"source.monogon.dev/metropolis/node/core/identity"
apb "source.monogon.dev/metropolis/proto/api"
cpb "source.monogon.dev/metropolis/proto/common"
@@ -49,40 +45,12 @@
)
func (l *leaderManagement) GetRegisterTicket(ctx context.Context, req *apb.GetRegisterTicketRequest) (*apb.GetRegisterTicketResponse, error) {
- // Retrieve existing ticket, if any.
- res, err := l.txnAsLeader(ctx, clientv3.OpGet(registerTicketEtcdPath))
+ ticket, err := l.ensureRegisterTicket(ctx)
if err != nil {
- return nil, status.Errorf(codes.Unavailable, "could not retrieve register ticket: %v", err)
+ return nil, err
}
- kvs := res.Responses[0].GetResponseRange().Kvs
- if len(kvs) > 0 {
- // Ticket already generated, return.
- return &apb.GetRegisterTicketResponse{
- Ticket: kvs[0].Value,
- }, nil
- }
-
- // No ticket, generate one.
- ticket := &ppb.RegisterTicket{
- Opaque: make([]byte, registerTicketSize),
- }
- _, err = rand.Read(ticket.Opaque)
- if err != nil {
- return nil, status.Errorf(codes.Unavailable, "could not generate new ticket: %v", err)
- }
- ticketBytes, err := proto.Marshal(ticket)
- if err != nil {
- return nil, status.Errorf(codes.Unavailable, "could not marshal new ticket: %v", err)
- }
-
- // Commit new ticket to etcd.
- _, err = l.txnAsLeader(ctx, clientv3.OpPut(registerTicketEtcdPath, string(ticketBytes)))
- if err != nil {
- return nil, status.Errorf(codes.Unavailable, "could not save new ticket: %v", err)
- }
-
return &apb.GetRegisterTicketResponse{
- Ticket: ticketBytes,
+ Ticket: ticket,
}, nil
}
diff --git a/metropolis/node/core/curator/impl_leader_test.go b/metropolis/node/core/curator/impl_leader_test.go
index 23d24ed..97ea58e 100644
--- a/metropolis/node/core/curator/impl_leader_test.go
+++ b/metropolis/node/core/curator/impl_leader_test.go
@@ -123,6 +123,13 @@
t.Fatalf("Dialing local GRPC failed: %v", err)
}
+ // Create an ephemeral node gRPC client for the 'other node'.
+ otherNode := ephemeral.Nodes[1]
+ ocl, err := rpc.NewEphemeralClientTest(externalLis, otherNode.TLSCredentials().PrivateKey.(ed25519.PrivateKey), ephemeral.CA)
+ if err != nil {
+ t.Fatalf("Dialing external GRPC failed: %v", err)
+ }
+
// Close the clients on context cancel.
go func() {
<-ctx.Done()
@@ -134,7 +141,8 @@
mgmtConn: mcl,
localNodeConn: lcl,
localNodeID: nodeCredentials.ID(),
- otherNodeID: ephemeral.Nodes[1].ID(),
+ otherNodeConn: ocl,
+ otherNodeID: otherNode.ID(),
caPubKey: ephemeral.CA.PublicKey.(ed25519.PublicKey),
cancel: ctxC,
etcd: cl,
@@ -153,6 +161,9 @@
localNodeConn grpc.ClientConnInterface
// localNodeID is the NodeID of the fake node that the leader is running on.
localNodeID string
+ // otherNodeConn is an connection from some other node (otherNodeID) into the
+ // cluster, authenticated using an ephemeral certificate.
+ otherNodeConn grpc.ClientConnInterface
// otherNodeID is the NodeID of some other node present in the curator
// state.
otherNodeID string
@@ -423,8 +434,10 @@
}
}
-// TestManagementRegisterTicket exercises the Management.GetRegisterTicket RPC.
-func TestManagementRegisterTicket(t *testing.T) {
+// TestRegistration exercises the Management.GetRegisterTicket RPC and
+// Curator.RegisterNode. A node should be able to register itself into a running
+// cluster, and have a NEW state.
+func TestRegistration(t *testing.T) {
cl := fakeLeader(t)
defer cl.cancel()
@@ -450,6 +463,15 @@
if !bytes.Equal(res1.Ticket, res2.Ticket) {
t.Errorf("Unexpected ticket change between calls")
}
+
+ // Register 'other node' into cluster.
+ cur := ipb.NewCuratorClient(cl.otherNodeConn)
+ _, err = cur.RegisterNode(ctx, &ipb.RegisterNodeRequest{
+ RegisterTicket: res1.Ticket,
+ })
+ if err != nil {
+ t.Fatalf("RegisterNode failed: %v", err)
+ }
}
// TestClusterUpdateNodeStatus exercises the Curator.UpdateNodeStatus RPC by
diff --git a/metropolis/node/core/curator/listener.go b/metropolis/node/core/curator/listener.go
index 8a8dcd8..aea6f1d 100644
--- a/metropolis/node/core/curator/listener.go
+++ b/metropolis/node/core/curator/listener.go
@@ -383,3 +383,12 @@
})
return
}
+
+func (l *listener) RegisterNode(ctx context.Context, req *cpb.RegisterNodeRequest) (res *cpb.RegisterNodeResponse, err error) {
+ err = l.callImpl(ctx, func(ctx context.Context, impl rpc.ClusterExternalServices) error {
+ var err2 error
+ res, err2 = impl.RegisterNode(ctx, req)
+ return err2
+ })
+ return
+}
diff --git a/metropolis/node/core/curator/proto/api/api.proto b/metropolis/node/core/curator/proto/api/api.proto
index 8bf35df..92d0b97 100644
--- a/metropolis/node/core/curator/proto/api/api.proto
+++ b/metropolis/node/core/curator/proto/api/api.proto
@@ -43,7 +43,7 @@
// is eventually consistent with the state of the objects in the Curator.
rpc Watch(WatchRequest) returns (stream WatchEvent) {
option (metropolis.proto.ext.authorization) = {
- need: PERMISSION_READ_CLUSTER_STATUS;
+ need: PERMISSION_READ_CLUSTER_STATUS
};
}
// UpdateNodestatus is called by nodes in the cluster to report their own
@@ -51,7 +51,18 @@
// Watch.
rpc UpdateNodeStatus(UpdateNodeStatusRequest) returns (UpdateNodeStatusResponse) {
option (metropolis.proto.ext.authorization) = {
- need: PERMISSION_UPDATE_NODE_SELF;
+ need: PERMISSION_UPDATE_NODE_SELF
+ };
+ }
+
+ // RegisterNode is called by nodes that wish to begin registering into the
+ // cluster. This will created a 'New' node in the cluster state.
+ rpc RegisterNode(RegisterNodeRequest) returns (RegisterNodeResponse) {
+ option (metropolis.proto.ext.authorization) = {
+ // The node doesn't yet have any credentials and will provide a
+ // self-signed ephemeral certificate to prove ownership of an
+ // Ed25519 key.
+ allow_unauthenticated: true
};
}
}
@@ -138,3 +149,14 @@
message UpdateNodeStatusResponse {
}
+
+message RegisterNodeRequest {
+ // register_ticket is the opaque Register Ticket required from a node to
+ // begin registering it into a cluster. It's provided to the registering
+ // node by a cluster operator in NodeParameters, and it retrieved by an
+ // operator from a running cluster via Management.GetRegisterTicket.
+ bytes register_ticket = 1;
+}
+
+message RegisterNodeResponse {
+}
\ No newline at end of file
diff --git a/metropolis/node/core/curator/state_registerticket.go b/metropolis/node/core/curator/state_registerticket.go
new file mode 100644
index 0000000..e516661
--- /dev/null
+++ b/metropolis/node/core/curator/state_registerticket.go
@@ -0,0 +1,52 @@
+package curator
+
+import (
+ "context"
+ "crypto/rand"
+
+ "go.etcd.io/etcd/clientv3"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "google.golang.org/protobuf/proto"
+
+ ppb "source.monogon.dev/metropolis/node/core/curator/proto/private"
+)
+
+// ensureRegisterTicket returns the cluster's current RegisterTicket, creating
+// one if not yet present in the cluster state.
+func (l *leadership) ensureRegisterTicket(ctx context.Context) ([]byte, error) {
+ l.muRegisterTicket.Lock()
+ defer l.muRegisterTicket.Unlock()
+
+ // Retrieve existing ticket, if any.
+ res, err := l.txnAsLeader(ctx, clientv3.OpGet(registerTicketEtcdPath))
+ if err != nil {
+ return nil, status.Errorf(codes.Unavailable, "could not retrieve register ticket: %v", err)
+ }
+ kvs := res.Responses[0].GetResponseRange().Kvs
+ if len(kvs) > 0 {
+ // Ticket already generated, return.
+ return kvs[0].Value, nil
+ }
+
+ // No ticket, generate one.
+ ticket := &ppb.RegisterTicket{
+ Opaque: make([]byte, registerTicketSize),
+ }
+ _, err = rand.Read(ticket.Opaque)
+ if err != nil {
+ return nil, status.Errorf(codes.Unavailable, "could not generate new ticket: %v", err)
+ }
+ ticketBytes, err := proto.Marshal(ticket)
+ if err != nil {
+ return nil, status.Errorf(codes.Unavailable, "could not marshal new ticket: %v", err)
+ }
+
+ // Commit new ticket to etcd.
+ _, err = l.txnAsLeader(ctx, clientv3.OpPut(registerTicketEtcdPath, string(ticketBytes)))
+ if err != nil {
+ return nil, status.Errorf(codes.Unavailable, "could not save new ticket: %v", err)
+ }
+
+ return ticketBytes, nil
+}
diff --git a/metropolis/proto/api/configuration.proto b/metropolis/proto/api/configuration.proto
index 41e57d0..62d5094 100644
--- a/metropolis/proto/api/configuration.proto
+++ b/metropolis/proto/api/configuration.proto
@@ -18,6 +18,8 @@
package metropolis.proto.api;
option go_package = "source.monogon.dev/metropolis/proto/api";
+import "metropolis/proto/common/common.proto";
+
// NodeParameters is the data with which a Node is set booted. It contains the
// configuration required for a node to either bootstrap a new cluster, or
// register into an existing one.
@@ -25,10 +27,34 @@
// implementation-specific way (currently: either on ESP partition or via qemu
// fw_cfg).
message NodeParameters {
+ // ClusterBootstrap configures the node to attempt to create a new cluster
+ // from scratch. Further nodes can become part of the cluster by being
+ // configured with ClusterRegister, which should contain data retrieved from
+ // the newly bootstrapped cluster by its operator.
message ClusterBootstrap {
+ // owner_public_key is a raw Ed25519 public whose corresponding private
+ // key can be used to prove ownership of the cluster and retrieve
+ // management credentials for the cluster via an AAA.Escrow call.
bytes owner_public_key = 1;
}
+ // ClusterRegister configures the node to attempt to register into an
+ // existing cluster, ie. contact an existing running cluster and become
+ // its member.
message ClusterRegister {
+ // cluster_directory is a directory (mapping of names into IP addresses
+ // and public keys) of existing nodes in the cluster. It's used as the
+ // initial contact point of the already running cluster that the node
+ // should register into. It can be retrieved by an operator from
+ // a running cluster via Management.GetClusterInfo.
+ metropolis.proto.common.ClusterDirectory cluster_directory = 1;
+ // register_ticket is the opaque Register Ticket required from a node to
+ // begin registering it into a cluster. It can be retrieved by an
+ // operator from a running cluster via Management.GetRegisterTicket.
+ bytes register_ticket = 2;
+ // ca_public_key is the public key of the CA of the cluster that the
+ // node should expect when contacting nodes in cluster_directory and
+ // attempting to register into a cluster.
+ bytes ca_public_key = 3;
}
oneof cluster {
ClusterBootstrap cluster_bootstrap = 1;