| package curator |
| |
| import ( |
| "context" |
| "crypto/ed25519" |
| "crypto/subtle" |
| "fmt" |
| "io" |
| "net" |
| "time" |
| |
| "google.golang.org/grpc/codes" |
| "google.golang.org/grpc/status" |
| "google.golang.org/protobuf/proto" |
| tpb "google.golang.org/protobuf/types/known/timestamppb" |
| |
| common "source.monogon.dev/metropolis/node" |
| ipb "source.monogon.dev/metropolis/node/core/curator/proto/api" |
| "source.monogon.dev/metropolis/node/core/identity" |
| "source.monogon.dev/metropolis/node/core/rpc" |
| "source.monogon.dev/metropolis/pkg/event" |
| "source.monogon.dev/metropolis/pkg/event/etcd" |
| "source.monogon.dev/metropolis/pkg/pki" |
| cpb "source.monogon.dev/metropolis/proto/common" |
| ) |
| |
| // leaderCurator implements the Curator gRPC API (ipb.Curator) as a curator |
| // leader. |
| type leaderCurator struct { |
| *leadership |
| } |
| |
| // Watch returns a stream of updates concerning some part of the cluster |
| // managed by the curator. |
| // |
| // See metropolis.node.core.curator.proto.api.Curator for more information about |
| // the RPC semantics. |
| // |
| // TODO(q3k): Currently the watch RPCs are individually backed by etcd cluster |
| // watches (via individual etcd event values), which might be problematic in |
| // case of a significant amount of parallel Watches being issued to the Curator. |
| // It might make sense to combine all pending Watch requests into a single watch |
| // issued to the cluster, with an intermediary caching stage within the curator |
| // instance. However, that is effectively implementing etcd learner/relay logic, |
| which has to be carefully considered, especially with regard to serving
| // stale data. |
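| //
| // A minimal client-side sketch of consuming the all-nodes watch; cur is
| // assumed to be a connected ipb.CuratorClient and error handling is elided:
| //
| //   w, err := cur.Watch(ctx, &ipb.WatchRequest{
| //       Kind: &ipb.WatchRequest_NodesInCluster_{
| //           NodesInCluster: &ipb.WatchRequest_NodesInCluster{},
| //       },
| //   })
| //   for {
| //       ev, err := w.Recv()
| //       // Process ev.Nodes and ev.NodeTombstones; ev.Progress marks the end
| //       // of the initial backlog.
| //   }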
| func (l *leaderCurator) Watch(req *ipb.WatchRequest, srv ipb.Curator_WatchServer) error { |
| switch x := req.Kind.(type) { |
| case *ipb.WatchRequest_NodeInCluster_: |
| return l.watchNodeInCluster(x.NodeInCluster, srv) |
| case *ipb.WatchRequest_NodesInCluster_: |
| return l.watchNodesInCluster(x.NodesInCluster, srv) |
| default: |
| return status.Error(codes.Unimplemented, "unsupported watch kind") |
| } |
| } |
| |
| // watchNodeInCluster implements the Watch API when dealing with a single |
| // node-in-cluster request. Effectively, it pipes an etcd value watcher into the |
| // Watch API. |
| func (l *leaderCurator) watchNodeInCluster(nic *ipb.WatchRequest_NodeInCluster, srv ipb.Curator_WatchServer) error { |
| ctx := srv.Context() |
| |
| // Constructing an arbitrary etcd path: this is okay, as we only have node objects
| // underneath the NodeEtcdPrefix. The worst an attacker can do is request a node
| // that doesn't exist, and that request will just hang. All access is privileged,
| // so there's also no need to filter anything.
| nodePath, err := nodeEtcdPrefix.Key(nic.NodeId) |
| if err != nil { |
| return status.Errorf(codes.InvalidArgument, "invalid node name: %v", err) |
| } |
| value := etcd.NewValue(l.etcd, nodePath, nodeValueConverter) |
| |
| w := value.Watch() |
| defer w.Close() |
| |
| for { |
| nodeKV, err := w.Get(ctx) |
| if err != nil { |
| if rpcErr, ok := rpcError(err); ok { |
| return rpcErr |
| } |
| rpc.Trace(ctx).Printf("etcd watch failed: %v", err) |
| return status.Error(codes.Unavailable, "internal error") |
| } |
| |
| ev := &ipb.WatchEvent{} |
| nodeKV.appendToEvent(ev) |
| if err := srv.Send(ev); err != nil { |
| return err |
| } |
| } |
| } |
| |
| // watchNodesInCluster implements the Watch API when dealing with an
| // all-nodes-in-cluster request. Effectively, it pipes a ranged etcd value
| // watcher into the Watch API. |
| func (l *leaderCurator) watchNodesInCluster(_ *ipb.WatchRequest_NodesInCluster, srv ipb.Curator_WatchServer) error { |
| ctx := srv.Context() |
| |
| start, end := nodeEtcdPrefix.KeyRange() |
| value := etcd.NewValue[*nodeAtID](l.etcd, start, nodeValueConverter, etcd.Range(end)) |
| |
| w := value.Watch() |
| defer w.Close() |
| |
| // Perform initial fetch from etcd. |
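| // BacklogOnly limits Get to values already known when the watch was opened;
| // once those are drained, Get returns BacklogDone and we switch to streaming
| // live updates below.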
| nodes := make(map[string]*Node) |
| for { |
| nodeKV, err := w.Get(ctx, event.BacklogOnly[*nodeAtID]()) |
| if err == event.BacklogDone { |
| break |
| } |
| if err != nil { |
| rpc.Trace(ctx).Printf("etcd watch failed (initial fetch): %v", err) |
| return status.Error(codes.Unavailable, "internal error during initial fetch") |
| } |
| if nodeKV.value != nil { |
| nodes[nodeKV.id] = nodeKV.value |
| } |
| } |
| |
| // Initial send, chunked to not go over 2MiB (half of the default gRPC message |
| // size limit). |
| // |
| // TODO(q3k): formalize message limits, set const somewhere. |
| we := &ipb.WatchEvent{} |
| for _, n := range nodes { |
| n.appendToEvent(we) |
| if proto.Size(we) > (2 << 20) { |
| if err := srv.Send(we); err != nil { |
| return err |
| } |
| we = &ipb.WatchEvent{} |
| } |
| } |
| // Send last update message. This might be empty, but we need to send the |
| // LAST_BACKLOGGED marker. |
| we.Progress = ipb.WatchEvent_PROGRESS_LAST_BACKLOGGED |
| if err := srv.Send(we); err != nil { |
| return err |
| } |
| |
| // Send updates as they arrive from etcd watcher. |
| for { |
| nodeKV, err := w.Get(ctx) |
| if err != nil { |
| rpc.Trace(ctx).Printf("etcd watch failed (update): %v", err) |
| return status.Errorf(codes.Unavailable, "internal error during update") |
| } |
| we := &ipb.WatchEvent{} |
| nodeKV.appendToEvent(we) |
| if err := srv.Send(we); err != nil { |
| return err |
| } |
| } |
| } |
| |
| // nodeAtID is a key/value pair container for a node update received from an etcd
| // watcher. The value will be nil if this update represents a node being |
| // deleted. |
| type nodeAtID struct { |
| id string |
| value *Node |
| } |
| |
| // nodeValueConverter is called by etcd node value watchers to convert updates |
| // from the cluster into nodeAtID, ensuring data integrity and checking |
| // invariants. |
| func nodeValueConverter(key, value []byte) (*nodeAtID, error) { |
| res := nodeAtID{ |
| id: nodeEtcdPrefix.ExtractID(string(key)), |
| } |
| if len(value) > 0 { |
| node, err := nodeUnmarshal(value) |
| if err != nil { |
| return nil, err |
| } |
| res.value = node |
| if res.id != res.value.ID() { |
| return nil, fmt.Errorf("node ID mismatch (etcd key: %q, value: %q)", res.id, res.value.ID()) |
| } |
| } |
| if res.id == "" { |
| // This shouldn't happen, to the point where this might be better handled by a |
| // panic. |
| return nil, fmt.Errorf("invalid node key %q", key) |
| } |
| return &res, nil |
| } |
| |
| // appendToEvent records the node state represented by this Node into a Curator
| // WatchEvent. |
| func (n *Node) appendToEvent(ev *ipb.WatchEvent) { |
| np := n.proto() |
| ev.Nodes = append(ev.Nodes, &ipb.Node{ |
| Id: n.ID(), |
| Roles: np.Roles, |
| Status: np.Status, |
| Clusternet: np.Clusternet, |
| State: np.FsmState, |
| }) |
| } |
| |
| // appendToEvent records the node update represented by this nodeAtID into a
| // Curator WatchEvent, as either a Node or a NodeTombstone.
| func (kv nodeAtID) appendToEvent(ev *ipb.WatchEvent) { |
| if node := kv.value; node != nil { |
| node.appendToEvent(ev) |
| } else { |
| ev.NodeTombstones = append(ev.NodeTombstones, &ipb.WatchEvent_NodeTombstone{ |
| NodeId: kv.id, |
| }) |
| } |
| } |
| |
| // UpdateNodeStatus is called by nodes in the cluster to report their own |
| // status. This status is recorded by the curator and can be retrieved via
| // Watch. |
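| //
| // An illustrative self-report; cur is assumed to be a connected
| // ipb.CuratorClient and the status message type is an assumption of this
| // sketch:
| //
| //   _, err := cur.UpdateNodeStatus(ctx, &ipb.UpdateNodeStatusRequest{
| //       NodeId: nodeID,
| //       Status: &cpb.NodeStatus{ExternalAddress: "203.0.113.10"},
| //   })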
| func (l *leaderCurator) UpdateNodeStatus(ctx context.Context, req *ipb.UpdateNodeStatusRequest) (*ipb.UpdateNodeStatusResponse, error) { |
| // Ensure that the given node_id matches the calling node. We currently |
| // only allow for direct self-reporting of status by nodes. |
| pi := rpc.GetPeerInfo(ctx) |
| if pi == nil || pi.Node == nil { |
| return nil, status.Error(codes.PermissionDenied, "only nodes can update node status") |
| } |
| id := identity.NodeID(pi.Node.PublicKey) |
| if id != req.NodeId { |
| return nil, status.Errorf(codes.PermissionDenied, "node %q cannot update the status of node %q", id, req.NodeId) |
| } |
| |
| // Verify sent status. Currently we assume the entire status must be set at |
| // once, and cannot be unset. |
| if req.Status == nil || req.Status.ExternalAddress == "" { |
| return nil, status.Errorf(codes.InvalidArgument, "Status and Status.ExternalAddress must be set") |
| } |
| |
| if net.ParseIP(req.Status.ExternalAddress) == nil { |
| return nil, status.Errorf(codes.InvalidArgument, "Status.ExternalAddress must be a valid IP address") |
| } |
| |
| // As we're performing a node update with two etcd transactions below (one |
| // to retrieve, one to save and update the node), take a local lock to ensure
| // that we don't have a race between either two UpdateNodeStatus calls or |
| // an UpdateNodeStatus call and some other mutation to the node store. |
| l.muNodes.Lock() |
| defer l.muNodes.Unlock() |
| |
| // Retrieve node ... |
| node, err := nodeLoad(ctx, l.leadership, id) |
| if err != nil { |
| return nil, err |
| } |
| // ... update its status ...
| node.status = req.Status |
| node.status.Timestamp = tpb.Now() |
| // ... and save it to etcd. |
| if err := nodeSave(ctx, l.leadership, node); err != nil { |
| return nil, err |
| } |
| |
| return &ipb.UpdateNodeStatusResponse{}, nil |
| } |
| |
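| // Heartbeat implements the bidirectional heartbeat stream used by nodes to
| // signal liveness to the curator leader. Each received message updates the
| // sending node's timestamp in the leader's local heartbeat map and is
| // acknowledged with an empty response. The node identity is taken from the
| // connection, so nodes can only report on their own behalf.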
| func (l *leaderCurator) Heartbeat(stream ipb.Curator_HeartbeatServer) error { |
| // Determine the calling node's identity from its connection. Heartbeats are
| // always recorded against the node that sent them.
| ctx := stream.Context() |
| pi := rpc.GetPeerInfo(ctx) |
| if pi == nil || pi.Node == nil { |
| return status.Error(codes.PermissionDenied, "only nodes can send heartbeats") |
| } |
| id := identity.NodeID(pi.Node.PublicKey) |
| |
| for { |
| _, err := stream.Recv() |
| if err == io.EOF { |
| return nil |
| } |
| if err != nil { |
| return err |
| } |
| |
| // Update the node's timestamp within the local Curator state. |
| l.ls.heartbeatTimestamps.Store(id, time.Now()) |
| |
| rsp := &ipb.HeartbeatUpdateResponse{} |
| if err := stream.Send(rsp); err != nil { |
| return err |
| } |
| } |
| } |
| |
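| // RegisterNode handles the first step of the node registration flow. It is
| // called by a prospective node over an ephemeral (self-signed) connection and,
| // given a valid RegisterTicket and Join Key, records a NEW node in etcd keyed
| // by the connection's public key. The response carries the cluster
| // configuration, the resolved TPM usage and the recommended storage security
| // for the new node.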
| func (l *leaderCurator) RegisterNode(ctx context.Context, req *ipb.RegisterNodeRequest) (*ipb.RegisterNodeResponse, error) { |
| // Call is unauthenticated - verify the other side has connected with an |
| // ephemeral certificate. That certificate's pubkey will become the node's |
| // pubkey. |
| pi := rpc.GetPeerInfo(ctx) |
| if pi == nil || pi.Unauthenticated == nil || pi.Unauthenticated.SelfSignedPublicKey == nil { |
| return nil, status.Error(codes.Unauthenticated, "connection must be established with a self-signed ephemeral certificate") |
| } |
| pubkey := pi.Unauthenticated.SelfSignedPublicKey |
| |
| // Check the Join Key size. |
| if want, got := ed25519.PublicKeySize, len(req.JoinKey); want != got { |
| return nil, status.Errorf(codes.InvalidArgument, "join_key must be set and be %d bytes long", want) |
| } |
| |
| // Verify that the call contains a RegisterTicket and that this RegisterTicket is
| // valid. |
| wantTicket, err := l.ensureRegisterTicket(ctx) |
| if err != nil { |
| rpc.Trace(ctx).Printf("could not ensure register ticket: %v", err) |
| return nil, status.Error(codes.Unavailable, "could not retrieve register ticket") |
| } |
| gotTicket := req.RegisterTicket |
| if subtle.ConstantTimeCompare(wantTicket, gotTicket) != 1 { |
| return nil, status.Error(codes.PermissionDenied, "register ticket invalid")
| } |
| |
| // Doing a read-then-write operation below, take lock. |
| // |
| // MVP: This can lock up the cluster if too many RegisterNode calls get issued, |
| // we should either ratelimit these or remove the need to lock. |
| l.muNodes.Lock() |
| defer l.muNodes.Unlock() |
| |
| cl, err := clusterLoad(ctx, l.leadership) |
| if err != nil { |
| return nil, err |
| } |
| nodeStorageSecurity, err := cl.NodeStorageSecurity() |
| if err != nil { |
| rpc.Trace(ctx).Printf("NodeStorageSecurity: %v", err) |
| return nil, status.Error(codes.InvalidArgument, "cannot generate recommended node storage security") |
| } |
| |
| // Figure out whether the node should be using a TPM.
| tpmUsage, err := cl.NodeTPMUsage(req.HaveLocalTpm) |
| if err != nil { |
| return nil, status.Errorf(codes.PermissionDenied, "%s", err) |
| } |
| |
| // Check if there already is a node with this pubkey in the cluster. |
| id := identity.NodeID(pubkey) |
| node, err := nodeLoad(ctx, l.leadership, id) |
| if err == nil { |
| // If the existing node is in the NEW state already, there's nothing to do, |
| // return no error. This can happen in case of spurious retries from the calling |
| // node. |
| if node.state == cpb.NodeState_NODE_STATE_NEW { |
| return &ipb.RegisterNodeResponse{}, nil |
| } |
| // We can return a bit more information to the calling node here: since it is
| // in possession of the private key corresponding to an existing node in the
| // cluster, it may learn that node's state without any danger of leaking data
| // about other nodes.
| rpc.Trace(ctx).Printf("node %s already exists in cluster, failing", id) |
| return nil, status.Errorf(codes.FailedPrecondition, "node already exists in cluster, state %s", node.state.String()) |
| } |
| if err != errNodeNotFound { |
| return nil, err |
| } |
| |
| // No node exists, create one. |
| node = &Node{ |
| pubkey: pubkey, |
| jkey: req.JoinKey, |
| state: cpb.NodeState_NODE_STATE_NEW, |
| tpmUsage: tpmUsage, |
| } |
| if err := nodeSave(ctx, l.leadership, node); err != nil { |
| return nil, err |
| } |
| |
| // Ignore the error: cl was just deserialized from a proto, so converting it back cannot fail.
| clusterConfig, _ := cl.proto() |
| return &ipb.RegisterNodeResponse{ |
| ClusterConfiguration: clusterConfig, |
| TpmUsage: tpmUsage, |
| RecommendedNodeStorageSecurity: nodeStorageSecurity, |
| }, nil |
| } |
| |
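| // CommitNode finalizes the registration flow for a STANDBY node. It validates
| // the requested storage security and ClusterUnlockKey, issues the node's
| // cluster certificate, marks the node UP and persists it. The call is not
| // idempotent: once a node is UP, retries are rejected.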
| func (l *leaderCurator) CommitNode(ctx context.Context, req *ipb.CommitNodeRequest) (*ipb.CommitNodeResponse, error) { |
| // Call is unauthenticated - verify the other side has connected with an
| // ephemeral certificate. That certificate's pubkey must correspond to an
| // already registered node.
| pi := rpc.GetPeerInfo(ctx) |
| if pi == nil || pi.Unauthenticated == nil || pi.Unauthenticated.SelfSignedPublicKey == nil { |
| return nil, status.Error(codes.Unauthenticated, "connection must be established with a self-signed ephemeral certificate") |
| } |
| pubkey := pi.Unauthenticated.SelfSignedPublicKey |
| |
| // First pass check of node storage security, before loading the cluster data and |
| // taking a lock on it. |
| switch req.StorageSecurity { |
| case cpb.NodeStorageSecurity_NODE_STORAGE_SECURITY_INSECURE: |
| case cpb.NodeStorageSecurity_NODE_STORAGE_SECURITY_ENCRYPTED: |
| case cpb.NodeStorageSecurity_NODE_STORAGE_SECURITY_AUTHENTICATED_ENCRYPTED: |
| default: |
| return nil, status.Error(codes.InvalidArgument, "invalid storage_security (is it set?)") |
| } |
| |
| // Doing a read-then-write operation below, take lock. |
| // |
| // MVP: This can lock up the cluster if too many CommitNode calls get issued,
| // we should either ratelimit these or remove the need to lock. |
| l.muNodes.Lock() |
| defer l.muNodes.Unlock() |
| |
| cl, err := clusterLoad(ctx, l.leadership) |
| if err != nil { |
| return nil, err |
| } |
| if err := cl.ValidateNodeStorage(req.StorageSecurity); err != nil { |
| return nil, err |
| } |
| |
| // Retrieve the node and act on its current state, either returning early or |
| // mutating it and continuing with the rest of the Commit logic. |
| id := identity.NodeID(pubkey) |
| node, err := nodeLoad(ctx, l.leadership, id) |
| if err != nil { |
| return nil, err |
| } |
| |
| switch node.state { |
| case cpb.NodeState_NODE_STATE_NEW: |
| return nil, status.Error(codes.PermissionDenied, "node is NEW, wait for attestation/approval") |
| case cpb.NodeState_NODE_STATE_DECOMMISSIONED: |
| // This node has since been decommissioned by the cluster for some reason; the
| // register flow should be aborted.
| return nil, status.Error(codes.FailedPrecondition, "node is DECOMMISSIONED, abort register flow") |
| case cpb.NodeState_NODE_STATE_UP: |
| // This can happen due to a network failure when we already handled a
| // CommitNode but weren't able to respond to the caller. CommitNode is
| // non-idempotent, so just abort; the node should retry from scratch, and this
| // node should be manually disowned/deleted by the system owners.
| return nil, status.Error(codes.FailedPrecondition, "node is already UP, abort register flow") |
| case cpb.NodeState_NODE_STATE_STANDBY: |
| // This is what we want. |
| default: |
| return nil, status.Errorf(codes.Internal, "node is in unknown state: %v", node.state) |
| } |
| |
| // Check that the given CUK is valid.
| // TODO(q3k): unify length with localstorage/crypt keySize. |
| if req.StorageSecurity != cpb.NodeStorageSecurity_NODE_STORAGE_SECURITY_INSECURE { |
| if want, got := 32, len(req.ClusterUnlockKey); want != got { |
| return nil, status.Errorf(codes.InvalidArgument, "invalid ClusterUnlockKey length, wanted %d bytes, got %d", want, got) |
| } |
| } |
| |
| // Generate certificate for node, save new node state, return. |
| |
| // If this fails we are safe to let the client retry, as the PKI code is |
| // idempotent. |
| caCertBytes, err := pkiCA.Ensure(ctx, l.etcd) |
| if err != nil { |
| return nil, status.Errorf(codes.Unavailable, "could not get CA certificate: %v", err) |
| } |
| nodeCert := &pki.Certificate{ |
| Namespace: &pkiNamespace, |
| Issuer: pkiCA, |
| Template: identity.NodeCertificate(node.pubkey), |
| Mode: pki.CertificateExternal, |
| PublicKey: node.pubkey, |
| Name: fmt.Sprintf("node-%s", identity.NodeID(pubkey)), |
| } |
| nodeCertBytes, err := nodeCert.Ensure(ctx, l.etcd) |
| if err != nil { |
| return nil, status.Errorf(codes.Unavailable, "could not emit node credentials: %v", err) |
| } |
| |
| node.state = cpb.NodeState_NODE_STATE_UP |
| node.clusterUnlockKey = req.ClusterUnlockKey |
| if err := nodeSave(ctx, l.leadership, node); err != nil { |
| return nil, err |
| } |
| |
| // From this point on, any failure (in the server, the network, ...) prevents
| // the node from making further progress in registering, as Commit is
| // non-idempotent.
| |
| return &ipb.CommitNodeResponse{ |
| CaCertificate: caCertBytes, |
| NodeCertificate: nodeCertBytes, |
| }, nil |
| } |
| |
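| // JoinNode is called by an already registered node rejoining the cluster, e.g.
| // after a reboot. The caller authenticates by connecting with its Join Key;
| // after checking that the sealed/unsealed configuration matches the cluster's
| // TPM policy and the node's recorded TPM usage, the leader returns the node's
| // ClusterUnlockKey.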
| func (l *leaderCurator) JoinNode(ctx context.Context, req *ipb.JoinNodeRequest) (*ipb.JoinNodeResponse, error) { |
| // Gather peer information. |
| pi := rpc.GetPeerInfo(ctx) |
| if pi == nil || pi.Unauthenticated == nil || pi.Unauthenticated.SelfSignedPublicKey == nil { |
| return nil, status.Error(codes.PermissionDenied, "connection must be established with a self-signed ephemeral certificate") |
| } |
| // The node will attempt to connect using its Join Key. jkey will contain |
| // its public part. |
| jkey := pi.Unauthenticated.SelfSignedPublicKey |
| |
| // Take the lock to prevent data races during the next step. |
| l.muNodes.Lock() |
| defer l.muNodes.Unlock() |
| |
| // Resolve the Node ID using the Join Key, then use that ID to load the node
| // information from etcd.
| id, err := nodeIdByJoinKey(ctx, l.leadership, jkey) |
| if err != nil { |
| return nil, err |
| } |
| node, err := nodeLoad(ctx, l.leadership, id) |
| if err != nil { |
| return nil, err |
| } |
| |
| cl, err := clusterLoad(ctx, l.leadership) |
| if err != nil { |
| return nil, err |
| } |
| |
| switch cl.TPMMode { |
| case cpb.ClusterConfiguration_TPM_MODE_REQUIRED: |
| if !req.UsingSealedConfiguration { |
| return nil, status.Errorf(codes.PermissionDenied, "cannot join this cluster with an unsealed configuration") |
| } |
| case cpb.ClusterConfiguration_TPM_MODE_DISABLED: |
| if req.UsingSealedConfiguration { |
| return nil, status.Errorf(codes.PermissionDenied, "cannot join this cluster with a sealed configuration") |
| } |
| } |
| |
| if node.tpmUsage == cpb.NodeTPMUsage_NODE_TPM_PRESENT_AND_USED && !req.UsingSealedConfiguration { |
| return nil, status.Errorf(codes.PermissionDenied, "node registered with TPM, cannot join without one") |
| } |
| if node.tpmUsage != cpb.NodeTPMUsage_NODE_TPM_PRESENT_AND_USED && req.UsingSealedConfiguration { |
| return nil, status.Errorf(codes.PermissionDenied, "node registered without TPM, cannot join with one") |
| } |
| |
| // Don't progress further unless the node is already UP. |
| if node.state != cpb.NodeState_NODE_STATE_UP { |
| return nil, status.Errorf(codes.FailedPrecondition, "node isn't UP, cannot join") |
| } |
| |
| // Return the Node's CUK, completing the Join Flow. |
| return &ipb.JoinNodeResponse{ |
| ClusterUnlockKey: node.clusterUnlockKey, |
| }, nil |
| } |
| |
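| // GetCurrentLeader implements the CuratorLocal leader information stream. As
| // this instance is the leader, it reports its own node ID, external address
| // (if known) and curator port, then blocks until the client's context is
| // canceled.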
| func (l *leaderCurator) GetCurrentLeader(_ *ipb.GetCurrentLeaderRequest, srv ipb.CuratorLocal_GetCurrentLeaderServer) error { |
| ctx := srv.Context() |
| |
| // We're the leader. |
| node, err := nodeLoad(ctx, l.leadership, l.leaderID) |
| if err != nil { |
| rpc.Trace(ctx).Printf("nodeLoad(%q) failed: %v", l.leaderID, err) |
| return status.Errorf(codes.Unavailable, "failed to load leader node") |
| } |
| host := "" |
| if node.status != nil && node.status.ExternalAddress != "" { |
| host = node.status.ExternalAddress |
| } |
| |
| err = srv.Send(&ipb.GetCurrentLeaderResponse{ |
| LeaderNodeId: l.leaderID, |
| LeaderHost: host, |
| LeaderPort: int32(common.CuratorServicePort), |
| ThisNodeId: l.leaderID, |
| }) |
| if err != nil { |
| return err |
| } |
| |
| <-ctx.Done() |
| rpc.Trace(ctx).Printf("Interrupting due to context cancellation") |
| return nil |
| } |