m/n/c/curator: implement Management.ApproveNode
This takes a node from NEW to STANDBY. This is the second-to-last
step requires in a node's regsitration flow.
Change-Id: I88f9c7d2cd824c7d3182195b784a725ec9528d28
Reviewed-on: https://review.monogon.dev/c/monogon/+/442
Reviewed-by: Mateusz Zalega <mateusz@monogon.tech>
diff --git a/metropolis/node/core/curator/impl_leader_management.go b/metropolis/node/core/curator/impl_leader_management.go
index 361a251..80d0448 100644
--- a/metropolis/node/core/curator/impl_leader_management.go
+++ b/metropolis/node/core/curator/impl_leader_management.go
@@ -3,10 +3,13 @@
import (
"bytes"
"context"
+ "crypto/ed25519"
"sort"
+ "go.etcd.io/etcd/clientv3"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
+ "google.golang.org/protobuf/proto"
"source.monogon.dev/metropolis/node/core/identity"
apb "source.monogon.dev/metropolis/proto/api"
@@ -151,3 +154,60 @@
return nil
}
+
+func (l *leaderManagement) ApproveNode(ctx context.Context, req *apb.ApproveNodeRequest) (*apb.ApproveNodeResponse, error) {
+ // MVP: check if policy allows for this node to be approved for this cluster.
+ // This should happen automatically, if possible, via hardware attestation
+ // against policy, not manually.
+
+ if len(req.Pubkey) != ed25519.PublicKeySize {
+ return nil, status.Errorf(codes.InvalidArgument, "pubkey must be %d bytes long", ed25519.PublicKeySize)
+ }
+
+ l.muNodes.Lock()
+ defer l.muNodes.Unlock()
+
+ // Find node by pubkey/ID.
+ id := identity.NodeID(req.Pubkey)
+ key, err := nodeEtcdPrefix.Key(id)
+ if err != nil {
+ return nil, status.Errorf(codes.InvalidArgument, "pubkey invalid: %v", err)
+ }
+ res, err := l.txnAsLeader(ctx, clientv3.OpGet(key))
+ if err != nil {
+ return nil, status.Errorf(codes.Unavailable, "could not retrieve node: %v", err)
+ }
+ kvs := res.Responses[0].GetResponseRange().Kvs
+ if len(kvs) != 1 {
+ return nil, status.Errorf(codes.NotFound, "node not found")
+ }
+ node, err := nodeUnmarshal(kvs[0].Value)
+ if err != nil {
+ return nil, status.Errorf(codes.Internal, "could not deserialize node: %v", err)
+ }
+
+ // Ensure node is either UP/STANDBY (no-op) or NEW (set to STANDBY).
+ switch node.state {
+ case cpb.NodeState_NODE_STATE_UP, cpb.NodeState_NODE_STATE_STANDBY:
+ // No-op for idempotency.
+ return &apb.ApproveNodeResponse{}, nil
+ case cpb.NodeState_NODE_STATE_NEW:
+ // What we can act on.
+ default:
+ return nil, status.Errorf(codes.FailedPrecondition, "node in state %s cannot be approved", node.state)
+ }
+
+ node.state = cpb.NodeState_NODE_STATE_STANDBY
+ nodeBytes, err := proto.Marshal(node.proto())
+ if err != nil {
+ // TODO(issues/85): log this
+ return nil, status.Errorf(codes.Unavailable, "could not marshal updated node")
+ }
+ _, err = l.txnAsLeader(ctx, clientv3.OpPut(key, string(nodeBytes)))
+ if err != nil {
+ // TODO(issues/85): log this
+ return nil, status.Error(codes.Unavailable, "could not save updated node")
+ }
+
+ return &apb.ApproveNodeResponse{}, nil
+}
diff --git a/metropolis/node/core/curator/impl_leader_test.go b/metropolis/node/core/curator/impl_leader_test.go
index 2608d08..354812a 100644
--- a/metropolis/node/core/curator/impl_leader_test.go
+++ b/metropolis/node/core/curator/impl_leader_test.go
@@ -435,9 +435,15 @@
}
}
-// TestRegistration exercises the Management.GetRegisterTicket RPC and
-// Curator.RegisterNode. A node should be able to register itself into a running
-// cluster, and have a NEW state.
+// TestRegistration exercises the node 'Register' (a.k.a. Registration) flow,
+// which is described in the Cluster Lifecycle design document.
+//
+// It starts out with a node that's foreign to the cluster, and performs all
+// the steps required to make that node part of a cluster. It calls into the
+// Curator service as the registering node and the Management service as a
+// cluster manager. The node registered into the cluster is fully fake, ie. is
+// not an actual Metropolis node but instead is fully managed from within the
+// test as a set of credentials.
func TestRegistration(t *testing.T) {
cl := fakeLeader(t)
defer cl.cancel()
@@ -473,6 +479,59 @@
if err != nil {
t.Fatalf("RegisterNode failed: %v", err)
}
+
+ // Expect node to now be 'NEW'.
+ res3, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
+ if err != nil {
+ t.Fatalf("GetNodes failed: %v", err)
+ }
+
+ var otherNodePubkey []byte
+ for {
+ node, err := res3.Recv()
+ if err != nil {
+ t.Fatalf("Recv failed: %v", err)
+ }
+ if identity.NodeID(node.Pubkey) != cl.otherNodeID {
+ continue
+ }
+ if node.State != cpb.NodeState_NODE_STATE_NEW {
+ t.Fatalf("Expected node to be NEW, is %s", node.State)
+ }
+ otherNodePubkey = node.Pubkey
+ break
+ }
+
+ // Approve node.
+ _, err = mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{Pubkey: otherNodePubkey})
+ if err != nil {
+ t.Fatalf("ApproveNode failed: %v", err)
+ }
+
+ // Expect node to be 'STANDBY'.
+ res4, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
+ if err != nil {
+ t.Fatalf("GetNodes failed: %v", err)
+ }
+ for {
+ node, err := res4.Recv()
+ if err != nil {
+ t.Fatalf("Recv failed: %v", err)
+ }
+ if identity.NodeID(node.Pubkey) != cl.otherNodeID {
+ continue
+ }
+ if node.State != cpb.NodeState_NODE_STATE_STANDBY {
+ t.Fatalf("Expected node to be STANDBY, is %s", node.State)
+ }
+ break
+ }
+
+ // Approve call should be idempotent and not fail when called a second time.
+ _, err = mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{Pubkey: otherNodePubkey})
+ if err != nil {
+ t.Fatalf("ApproveNode failed: %v", err)
+ }
}
// TestClusterUpdateNodeStatus exercises the Curator.UpdateNodeStatus RPC by
@@ -597,51 +656,3 @@
t.Fatalf("CaPublicKey mismatch (wanted %s, got %s)", hex.EncodeToString(want), hex.EncodeToString(got))
}
}
-
-// TestRegisterNode is a smoke test for Curator.RegisterNode and
-// Management.GetNodes.
-func TestRegisterNode(t *testing.T) {
- cl := fakeLeader(t)
- defer cl.cancel()
-
- ctx, ctxC := context.WithCancel(context.Background())
- defer ctxC()
-
- // Get RegisterTicket.
- mgmt := apb.NewManagementClient(cl.mgmtConn)
- resT, err := mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{})
- if err != nil {
- t.Fatalf("GetRegisterTicket: %v", err)
- }
-
- // Register as other node.
- curOther := ipb.NewCuratorClient(cl.otherNodeConn)
- _, err = curOther.RegisterNode(ctx, &ipb.RegisterNodeRequest{RegisterTicket: resT.Ticket})
- if err != nil {
- t.Fatalf("RegisterNode: %v", err)
- }
-
- // Ensure that the cluster sees the node as NEW via GetNodes.
- resN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
- if err != nil {
- t.Fatalf("GetNodes: %v", err)
- }
- // Receive nodes from GetNodes until we find the node that we just registered.
- var otherNode *apb.Node
- for {
- node, err := resN.Recv()
- if err != nil {
- t.Fatalf("GetNodes.Recv: %v", err)
- }
-
- id := identity.NodeID(node.Pubkey)
- if id == cl.otherNodeID {
- otherNode = node
- break
- }
- }
- // Ensure that this node is marked as NEW.
- if want, got := cpb.NodeState_NODE_STATE_NEW, otherNode.State; want != got {
- t.Fatalf("Newly registerd should have state %s, got %s", want, got)
- }
-}
diff --git a/metropolis/node/core/curator/listener.go b/metropolis/node/core/curator/listener.go
index a562063..c067c02 100644
--- a/metropolis/node/core/curator/listener.go
+++ b/metropolis/node/core/curator/listener.go
@@ -415,3 +415,12 @@
}
return l.callImpl(srv.Context(), proxy)
}
+
+func (l *listener) ApproveNode(ctx context.Context, req *apb.ApproveNodeRequest) (res *apb.ApproveNodeResponse, err error) {
+ err = l.callImpl(ctx, func(ctx context.Context, impl rpc.ClusterExternalServices) error {
+ var err2 error
+ res, err2 = impl.ApproveNode(ctx, req)
+ return err2
+ })
+ return
+}
diff --git a/metropolis/proto/api/management.proto b/metropolis/proto/api/management.proto
index 63336c8..ba50849 100644
--- a/metropolis/proto/api/management.proto
+++ b/metropolis/proto/api/management.proto
@@ -38,6 +38,28 @@
need: PERMISSION_READ_CLUSTER_STATUS
};
}
+
+ // ApproveNode progresses a node's registration process by changing its state
+ // in the cluster from NEW to STANDBY, if not yet STANDBY. This is required
+ // for the node to fully become part of the cluster (ie. have an UP state),
+ // and is required to be called by a manager manually.
+ //
+ // Managers can find out what nodes require approval by performing
+ // a GetNodes call and filtering for nodes in the NEW state. This call is
+ // idempotent and can be executed multiple times, and is a no-op if the node
+ // is already in the STANDBY or even UP states.
+ //
+ // In the future, approval process will be governed by cluster policy, but
+ // currently any node can be approved by a manager, and the manager is
+ // responsible for performing an out-of-band attestation of the node being/
+ // approved (eg. by verifying that the node that is being approved has the
+ // same public key as what the registering node displays in its startup
+ // logs).
+ rpc ApproveNode(ApproveNodeRequest) returns (ApproveNodeResponse) {
+ option (metropolis.proto.ext.authorization) = {
+ need: PERMISSION_APPROVE_NODE
+ };
+ }
}
message GetRegisterTicketRequest {
@@ -85,3 +107,13 @@
// Roles assigned by the cluster. This is always set.
metropolis.proto.common.NodeRoles roles = 4;
}
+
+
+message ApproveNodeRequest {
+ // Raw public key of the node being approved, has to correspond to a node
+ // currently in the cluster.
+ bytes pubkey = 1;
+}
+
+message ApproveNodeResponse {
+}
\ No newline at end of file
diff --git a/metropolis/proto/ext/authorization.proto b/metropolis/proto/ext/authorization.proto
index 42beb88..60ad68a 100644
--- a/metropolis/proto/ext/authorization.proto
+++ b/metropolis/proto/ext/authorization.proto
@@ -22,6 +22,7 @@
PERMISSION_GET_REGISTER_TICKET = 1;
PERMISSION_READ_CLUSTER_STATUS = 2;
PERMISSION_UPDATE_NODE_SELF = 3;
+ PERMISSION_APPROVE_NODE = 4;
}
// Authorization policy for an RPC method. This message/API does not have the