m/n/c/curator: implement Join Flow
This implements Join Flow in Curator, as described in Cluster Lifecycle
and Integrity design document.
Change-Id: Idabb471575e1d22a7eb7cce2ad29d18f1f94760a
Reviewed-on: https://review.monogon.dev/c/monogon/+/667
Reviewed-by: Sergiusz Bazanski <serge@monogon.tech>
diff --git a/metropolis/node/core/curator/bootstrap.go b/metropolis/node/core/curator/bootstrap.go
index 12b24fd..215bfb7 100644
--- a/metropolis/node/core/curator/bootstrap.go
+++ b/metropolis/node/core/curator/bootstrap.go
@@ -91,7 +91,7 @@
InitialCRL: crl,
})
- nodePath, err := node.etcdPath()
+ nodePath, err := node.etcdNodePath()
if err != nil {
return nil, nil, fmt.Errorf("failed to get node key: %w", err)
}
diff --git a/metropolis/node/core/curator/impl_leader_curator.go b/metropolis/node/core/curator/impl_leader_curator.go
index 26f3004..1c7221f 100644
--- a/metropolis/node/core/curator/impl_leader_curator.go
+++ b/metropolis/node/core/curator/impl_leader_curator.go
@@ -5,6 +5,7 @@
"crypto/subtle"
"fmt"
"net"
+ "time"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -245,6 +246,7 @@
}
// ... update its' status ...
node.status = req.Status
+ node.status.Timestamp = time.Now().UnixNano()
// ... and save it to etcd.
if err := nodeSave(ctx, l.leadership, node); err != nil {
return nil, err
@@ -263,6 +265,9 @@
}
pubkey := pi.Unauthenticated.SelfSignedPublicKey
+ // TODO(mateusz@monogon.tech): check req.JoinKey length once Join Flow is
+ // implemented on the client side.
+
// Verify that call contains a RegisterTicket and that this RegisterTicket is
// valid.
wantTicket, err := l.ensureRegisterTicket(ctx)
@@ -307,6 +312,7 @@
// No node exists, create one.
node = &Node{
pubkey: pubkey,
+ jkey: req.JoinKey,
state: cpb.NodeState_NODE_STATE_NEW,
}
if err := nodeSave(ctx, l.leadership, node); err != nil {
@@ -414,3 +420,39 @@
NodeCertificate: nodeCertBytes,
}, nil
}
+
+// JoinNode implements the Join Flow RPC. The caller is identified solely by
+// the public key of the self-signed certificate it connected with, which is
+// treated as the public part of its Join Key. That key is resolved to a Node
+// ID, the node is loaded from etcd, and, if the node is in the UP state, its
+// Cluster Unlock Key is returned.
+//
+// It returns PermissionDenied if the connection carries no unauthenticated
+// peer information, FailedPrecondition if the node is not UP, and otherwise
+// whatever error nodeIdByJoinKey or nodeLoad returned.
+func (l *leaderCurator) JoinNode(ctx context.Context, req *ipb.JoinNodeRequest) (*ipb.JoinNodeResponse, error) {
+ // Gather peer information.
+ pi := rpc.GetPeerInfo(ctx)
+ if pi == nil || pi.Unauthenticated == nil {
+ return nil, status.Error(codes.PermissionDenied, "connection must be established with a self-signed ephemeral certificate")
+ }
+ // The node will attempt to connect using its Join Key. jkey will contain
+ // its public part.
+ jkey := pi.Unauthenticated.SelfSignedPublicKey
+
+ // Take the lock to prevent data races during the next step.
+ l.muNodes.Lock()
+ defer l.muNodes.Unlock()
+
+ // Resolve the Node ID using Join Key, then use the ID to load node
+ // information from etcd.
+ id, err := nodeIdByJoinKey(ctx, l.leadership, jkey)
+ if err != nil {
+ return nil, err
+ }
+ node, err := nodeLoad(ctx, l.leadership, id)
+ if err != nil {
+ return nil, err
+ }
+
+ // Don't progress further unless the node is already UP.
+ if node.state != cpb.NodeState_NODE_STATE_UP {
+ return nil, status.Error(codes.FailedPrecondition, "node isn't UP, cannot join")
+ }
+
+ // Return the Node's CUK, completing the Join Flow.
+ return &ipb.JoinNodeResponse{
+ ClusterUnlockKey: node.clusterUnlockKey,
+ }, nil
+}
diff --git a/metropolis/node/core/curator/impl_leader_test.go b/metropolis/node/core/curator/impl_leader_test.go
index 7254fab..13564c4 100644
--- a/metropolis/node/core/curator/impl_leader_test.go
+++ b/metropolis/node/core/curator/impl_leader_test.go
@@ -119,12 +119,13 @@
// Build a curator leader object. This implements methods that will be
// exercised by tests.
- leader := newCuratorLeader(&leadership{
+ leadership := &leadership{
lockKey: lockKey,
lockRev: lockRev,
etcd: curEtcd,
consensus: consensus.TestServiceHandle(t, cluster.Client(0)),
- }, &nodeCredentials.Node)
+ }
+ leader := newCuratorLeader(leadership, &nodeCredentials.Node)
// Create a curator gRPC server which performs authentication as per the created
// ServerSecurity and is backed by the created leader.
@@ -179,6 +180,8 @@
}()
return fakeLeaderData{
+ l: leadership,
+ curatorLis: externalLis,
mgmtConn: mcl,
localNodeConn: lcl,
localNodeID: nodeCredentials.ID(),
@@ -194,6 +197,11 @@
// fakeLeaderData is returned by fakeLeader and contains information about the
// newly created leader and connections to its gRPC listeners.
type fakeLeaderData struct {
+ // l is a type internal to Curator, representing its ability to perform
+ // actions as a leader.
+ l *leadership
+ // curatorLis is a listener intended for Node connections.
+ curatorLis *bufconn.Listener
// mgmtConn is a gRPC connection to the leader's public gRPC interface,
// authenticated as a cluster manager.
mgmtConn grpc.ClientConnInterface
@@ -514,10 +522,17 @@
t.Errorf("Unexpected ticket change between calls")
}
+ // Generate the node's public join key to be used in the bootstrap process.
+ nodeJoinPub, _, err := ed25519.GenerateKey(rand.Reader)
+ if err != nil {
+ t.Fatalf("could not generate node join keypair: %v", err)
+ }
+
// Register 'other node' into cluster.
cur := ipb.NewCuratorClient(cl.otherNodeConn)
_, err = cur.RegisterNode(ctx, &ipb.RegisterNodeRequest{
RegisterTicket: res1.Ticket,
+ JoinKey: nodeJoinPub,
})
if err != nil {
t.Fatalf("RegisterNode failed: %v", err)
@@ -579,6 +594,60 @@
expectOtherNode(cpb.NodeState_NODE_STATE_UP)
}
+// TestJoin exercises Join Flow, as described in "Cluster Lifecycle" design
+// document, assuming the node has already completed Register Flow. Instead
+// of running Register Flow, it saves a node in the UP state directly into
+// etcd, connects to the Curator listener with ephemeral credentials built
+// from the node's Join Key, and checks that JoinNode returns the stored CUK.
+func TestJoin(t *testing.T) {
+ cl := fakeLeader(t)
+ defer cl.cancel()
+
+ ctx, ctxC := context.WithCancel(context.Background())
+ defer ctxC()
+
+ // Build the test node and save it into etcd.
+ npub, _, err := ed25519.GenerateKey(rand.Reader)
+ if err != nil {
+ t.Fatalf("could not generate node keypair: %v", err)
+ }
+ jpub, jpriv, err := ed25519.GenerateKey(rand.Reader)
+ if err != nil {
+ t.Fatalf("could not generate join keypair: %v", err)
+ }
+ cuk := []byte("fakefakefakefakefakefakefakefake")
+ node := Node{
+ clusterUnlockKey: cuk,
+ pubkey: npub,
+ jkey: jpub,
+ state: cpb.NodeState_NODE_STATE_UP,
+ }
+ if err := nodeSave(ctx, cl.l, &node); err != nil {
+ t.Fatalf("nodeSave failed: %v", err)
+ }
+
+ // Connect to Curator using the node's Join Credentials, as opposed to the
+ // node keypair, then join the cluster.
+ withLocalDialer := grpc.WithContextDialer(func(_ context.Context, _ string) (net.Conn, error) {
+ return cl.curatorLis.Dial()
+ })
+ ephCreds, err := rpc.NewEphemeralCredentials(jpriv, cl.ca)
+ if err != nil {
+ t.Fatalf("NewEphemeralCredentials: %v", err)
+ }
+ eph, err := grpc.Dial("local", withLocalDialer, grpc.WithTransportCredentials(ephCreds))
+ if err != nil {
+ t.Fatalf("Dialing external GRPC failed: %v", err)
+ }
+ // Release the client connection once the test is done with it.
+ defer eph.Close()
+ cur := ipb.NewCuratorClient(eph)
+ jr, err := cur.JoinNode(ctx, &ipb.JoinNodeRequest{})
+ if err != nil {
+ t.Fatalf("JoinNode failed: %v", err)
+ }
+
+ // Compare the received CUK with the one we started out with.
+ if !bytes.Equal(cuk, jr.ClusterUnlockKey) {
+ t.Fatal("JoinNode returned an invalid CUK.")
+ }
+}
+
// TestClusterUpdateNodeStatus exercises the Curator.UpdateNodeStatus RPC by
// sending node updates and making sure they are reflected in subsequent Watch
// events.
diff --git a/metropolis/node/core/curator/listener.go b/metropolis/node/core/curator/listener.go
index 705512d..9715c91 100644
--- a/metropolis/node/core/curator/listener.go
+++ b/metropolis/node/core/curator/listener.go
@@ -387,6 +387,15 @@
return
}
+// JoinNode forwards the request to the JoinNode method of the implementation
+// that callImpl passes to the callback. The response is captured through the
+// named result res; the returned error is whatever callImpl reports.
+func (l *listener) JoinNode(ctx context.Context, req *cpb.JoinNodeRequest) (res *cpb.JoinNodeResponse, err error) {
+ err = l.callImpl(ctx, func(ctx context.Context, impl rpc.ClusterServices) error {
+ var err2 error
+ res, err2 = impl.JoinNode(ctx, req)
+ return err2
+ })
+ return
+}
+
type managementGetNodesServer struct {
grpc.ServerStream
ctx context.Context
diff --git a/metropolis/node/core/curator/proto/api/api.proto b/metropolis/node/core/curator/proto/api/api.proto
index 9a03a15..425051f 100644
--- a/metropolis/node/core/curator/proto/api/api.proto
+++ b/metropolis/node/core/curator/proto/api/api.proto
@@ -99,6 +99,22 @@
allow_unauthenticated: true
};
}
+
+ // JoinNode is called by nodes (re)joining the cluster. Register Flow must
+ // be completed beforehand (see: CommitNode). This call will fail if the
+ // calling node is not in the UP state. This call is idempotent, and as
+ // such it can be retried.
+ //
+ // JoinNode is authenticated in the transport layer with a Join Key passed
+ // to Curator in an earlier RegisterNode call. The node connects with a
+ // self-signed certificate rather than a cluster-issued one, which is why
+ // the call is marked allow_unauthenticated below.
+ //
+ // Upon success, the node will receive its Cluster Unlock Key, enabling it
+ // to mount encrypted storage after combining it with its Node Unlock Key.
+ rpc JoinNode(JoinNodeRequest) returns (JoinNodeResponse) {
+ option (metropolis.proto.ext.authorization) = {
+ allow_unauthenticated: true
+ };
+ }
}
// Node is the state and configuration of a node in the cluster.
@@ -190,6 +206,9 @@
// node by a cluster operator in NodeParameters, and it retrieved by an
// operator from a running cluster via Management.GetRegisterTicket.
bytes register_ticket = 1;
+ // join_key is an ED25519 public key generated by the node during registration.
+ // It's shared with Curator so that later JoinNode calls can be authenticated.
+ bytes join_key = 2;
}
message RegisterNodeResponse {
@@ -212,4 +231,14 @@
// node was connecting with. This certificate should be used by the node for
// communication with the cluster from this point onward.
bytes node_certificate = 2;
-}
\ No newline at end of file
+}
+
+// JoinNodeRequest is the request of Curator.JoinNode. It carries no fields:
+// the calling node is identified by the Join Key it used to authenticate the
+// connection in the transport layer.
+message JoinNodeRequest {
+}
+
+// JoinNodeResponse is returned by Curator.JoinNode upon success.
+message JoinNodeResponse {
+ // cluster_unlock_key (CUK) is the key submitted by the node through
+ // CommitNodeRequest, and returned in this message after authenticating
+ // with Join Credentials.
+ bytes cluster_unlock_key = 1;
+}
diff --git a/metropolis/node/core/curator/proto/private/storage.proto b/metropolis/node/core/curator/proto/private/storage.proto
index c8b6b79..c4a876c 100644
--- a/metropolis/node/core/curator/proto/private/storage.proto
+++ b/metropolis/node/core/curator/proto/private/storage.proto
@@ -26,6 +26,10 @@
metropolis.proto.common.NodeRoles roles = 4;
metropolis.proto.common.NodeStatus status = 5;
+
+ // join_key is an ED25519 public key used to authenticate the join
+ // operation. It's generated by the node during the registration process.
+ bytes join_key = 6;
}
// Information about the cluster owner, currently the only Metropolis management
diff --git a/metropolis/node/core/curator/state_node.go b/metropolis/node/core/curator/state_node.go
index ce87723..82f9859 100644
--- a/metropolis/node/core/curator/state_node.go
+++ b/metropolis/node/core/curator/state_node.go
@@ -19,6 +19,7 @@
import (
"context"
"crypto/x509"
+ "encoding/hex"
"fmt"
clientv3 "go.etcd.io/etcd/client/v3"
@@ -62,6 +63,11 @@
// The public key is used to generate the Node's canonical ID.
pubkey []byte
+ // jkey is the node's ED25519 public Join Key. The private part of the key
+ // never leaves the node. The key is generated by the node and passed to
+ // Curator during the registration process.
+ jkey []byte
+
// state is the state of this node as seen from the point of view of the
// cluster. See //metropolis/proto:common.proto for more information.
state cpb.NodeState
@@ -167,21 +173,31 @@
}
var (
+ // nodeEtcdPrefix is an etcd key prefix preceding cluster member node IDs,
+ // mapping to ppb.Node values.
nodeEtcdPrefix = mustNewEtcdPrefix("/nodes/")
+ // joinCredPrefix is an etcd key prefix preceding hex-encoded cluster member
+ // node join keys, mapping to node IDs.
+ joinCredPrefix = mustNewEtcdPrefix("/join_keys/")
)
-// etcdPath builds the etcd path in which this node's protobuf-serialized state
-// is stored in etcd.
-func (n *Node) etcdPath() (string, error) {
+// etcdNodePath builds the etcd path in which this node's protobuf-serialized
+// state is stored in etcd.
+func (n *Node) etcdNodePath() (string, error) {
return nodeEtcdPrefix.Key(n.ID())
}
+// etcdJoinKeyPath builds the etcd path of this node's Join Key index entry,
+// derived from joinCredPrefix and the hex-encoded Join Key. nodeSave stores
+// the node's ID under this path.
+func (n *Node) etcdJoinKeyPath() (string, error) {
+ return joinCredPrefix.Key(hex.EncodeToString(n.jkey))
+}
+
// proto serializes the Node object into protobuf, to be used for saving to
// etcd.
func (n *Node) proto() *ppb.Node {
msg := &ppb.Node{
ClusterUnlockKey: n.clusterUnlockKey,
PublicKey: n.pubkey,
+ JoinKey: n.jkey,
FsmState: n.state,
Roles: &cpb.NodeRoles{},
Status: n.status,
@@ -215,6 +231,7 @@
n := &Node{
clusterUnlockKey: msg.ClusterUnlockKey,
pubkey: msg.PublicKey,
+ jkey: msg.JoinKey,
state: msg.FsmState,
status: msg.Status,
}
@@ -266,7 +283,7 @@
rpc.Trace(ctx).Printf("loadNode(%s)...", id)
key, err := nodeEtcdPrefix.Key(id)
if err != nil {
- // TODO(issues/85): log err
+ rpc.Trace(ctx).Printf("invalid node id: %v", err)
return nil, status.Errorf(codes.InvalidArgument, "invalid node id")
}
res, err := l.txnAsLeader(ctx, clientv3.OpGet(key))
@@ -274,7 +291,7 @@
if rpcErr, ok := rpcError(err); ok {
return nil, rpcErr
}
- // TODO(issues/85): log this
+ rpc.Trace(ctx).Printf("could not retrieve node %s: %v", id, err)
return nil, status.Errorf(codes.Unavailable, "could not retrieve node %s: %v", id, err)
}
kvs := res.Responses[0].GetResponseRange().Kvs
@@ -284,7 +301,7 @@
}
node, err := nodeUnmarshal(kvs[0].Value)
if err != nil {
- // TODO(issues/85): log this
+ rpc.Trace(ctx).Printf("could not unmarshal node: %v", err)
return nil, status.Errorf(codes.Unavailable, "could not unmarshal node")
}
rpc.Trace(ctx).Printf("loadNode(%s): unmarshal ok", id)
@@ -294,26 +311,80 @@
// nodeSave attempts to save a node into etcd, within a given active leadership.
// All returned errors are gRPC statuses that safe to return to untrusted callers.
func nodeSave(ctx context.Context, l *leadership, n *Node) error {
+ // Build an etcd operation to save the node with a key based on its ID.
id := n.ID()
rpc.Trace(ctx).Printf("nodeSave(%s)...", id)
- key, err := nodeEtcdPrefix.Key(id)
+ nkey, err := nodeEtcdPrefix.Key(id)
if err != nil {
- // TODO(issues/85): log err
+ rpc.Trace(ctx).Printf("invalid node id: %v", err)
return status.Errorf(codes.InvalidArgument, "invalid node id")
}
nodeBytes, err := proto.Marshal(n.proto())
if err != nil {
- // TODO(issues/85): log this
+ rpc.Trace(ctx).Printf("could not marshal updated node: %v", err)
return status.Errorf(codes.Unavailable, "could not marshal updated node")
}
- _, err = l.txnAsLeader(ctx, clientv3.OpPut(key, string(nodeBytes)))
+ ons := clientv3.OpPut(nkey, string(nodeBytes))
+ ops := []clientv3.Op{ons}
+
+ // Build an etcd operation to map the node's Join Key into its ID for use in
+ // Join Flow, if jkey is set. An empty (but non-nil) jkey counts as unset,
+ // matching nodeIdByJoinKey, which rejects empty Join Keys. Once Join Flow is
+ // implemented on the client side, this operation will become mandatory.
+ if len(n.jkey) != 0 {
+ jkey, err := n.etcdJoinKeyPath()
+ if err != nil {
+ // This should never happen.
+ rpc.Trace(ctx).Printf("invalid join key representation: %v", err)
+ return status.Errorf(codes.InvalidArgument, "invalid join key representation")
+ }
+ // TODO(mateusz@monogon.tech): ensure that if the join key index already
+ // exists, it points to the node we're saving. Refuse to save/update the
+ // node if it doesn't.
+ oks := clientv3.OpPut(jkey, id)
+ ops = append(ops, oks)
+ }
+
+ // Execute one or both operations atomically.
+ _, err = l.txnAsLeader(ctx, ops...)
if err != nil {
if rpcErr, ok := rpcError(err); ok {
return rpcErr
}
- // TODO(issues/85): log this
+ rpc.Trace(ctx).Printf("could not save updated node: %v", err)
return status.Error(codes.Unavailable, "could not save updated node")
}
rpc.Trace(ctx).Printf("nodeSave(%s): write ok", id)
return nil
}
+
+// nodeIdByJoinKey attempts to fetch a Node ID corresponding to the given Join
+// Key from etcd, within a given active leadership. All returned errors are
+// gRPC statuses that are safe to return to untrusted callers. If the given
+// Join Key is empty, an InvalidArgument status is returned. If the given
+// Join Key is not found, errNodeNotFound will be returned along with an empty
+// string.
+func nodeIdByJoinKey(ctx context.Context, l *leadership, jkey []byte) (string, error) {
+ if len(jkey) == 0 {
+ return "", status.Error(codes.InvalidArgument, "join key is empty")
+ }
+
+ // Join Keys are indexed in etcd by their hex encoding.
+ cred := hex.EncodeToString(jkey)
+ key, err := joinCredPrefix.Key(cred)
+ if err != nil {
+ // This should never happen.
+ rpc.Trace(ctx).Printf("invalid join key representation: %v", err)
+ return "", status.Error(codes.InvalidArgument, "invalid join key representation")
+ }
+ res, err := l.txnAsLeader(ctx, clientv3.OpGet(key))
+ if err != nil {
+ if rpcErr, ok := rpcError(err); ok {
+ return "", rpcErr
+ }
+ rpc.Trace(ctx).Printf("could not retrieve node id matching join key %s: %v", cred, err)
+ return "", status.Errorf(codes.Unavailable, "could not retrieve node id matching join key %s: %v", cred, err)
+ }
+ kvs := res.Responses[0].GetResponseRange().Kvs
+ if len(kvs) != 1 {
+ return "", errNodeNotFound
+ }
+ // The value stored under the Join Key index entry is the Node ID.
+ return string(kvs[0].Value), nil
+}
diff --git a/metropolis/proto/common/common.proto b/metropolis/proto/common/common.proto
index 07ff460..46bd08f 100644
--- a/metropolis/proto/common/common.proto
+++ b/metropolis/proto/common/common.proto
@@ -132,6 +132,9 @@
// listening for gRPC, and role-specific services like etcd and
// Kubernetes).
string external_address = 1;
+ // timestamp is the time of the last status update, expressed as the number
+ // of nanoseconds elapsed since the Unix epoch.
+ int64 timestamp = 2;
}
// The Cluster Directory is information about the network addressing of nodes