package curator

import (
	"context"
	"crypto/ed25519"
	"crypto/subtle"
	"fmt"
	"io"
	"net"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/proto"
	tpb "google.golang.org/protobuf/types/known/timestamppb"

	common "source.monogon.dev/metropolis/node"
	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
	"source.monogon.dev/metropolis/node/core/identity"
	"source.monogon.dev/metropolis/node/core/rpc"
	"source.monogon.dev/metropolis/pkg/event"
	"source.monogon.dev/metropolis/pkg/event/etcd"
	"source.monogon.dev/metropolis/pkg/pki"
	cpb "source.monogon.dev/metropolis/proto/common"
)

// leaderCurator implements the Curator gRPC API (ipb.Curator) as a curator
// leader.
type leaderCurator struct {
	*leadership
}

// Watch returns a stream of updates concerning some part of the cluster
// managed by the curator.
//
// See metropolis.node.core.curator.proto.api.Curator for more information about
// the RPC semantics.
//
// TODO(q3k): Currently the watch RPCs are individually backed by etcd cluster
// watches (via individual etcd event values), which might be problematic in
// case of a significant amount of parallel Watches being issued to the Curator.
// It might make sense to combine all pending Watch requests into a single watch
// issued to the cluster, with an intermediary caching stage within the curator
// instance. However, that is effectively implementing etcd learner/relay logic,
// which has to be carefully considered, especially with regards to serving
// stale data.
func (l *leaderCurator) Watch(req *ipb.WatchRequest, srv ipb.Curator_WatchServer) error {
	switch x := req.Kind.(type) {
	case *ipb.WatchRequest_NodeInCluster_:
		return l.watchNodeInCluster(x.NodeInCluster, srv)
	case *ipb.WatchRequest_NodesInCluster_:
		return l.watchNodesInCluster(x.NodesInCluster, srv)
	default:
		return status.Error(codes.Unimplemented, "unsupported watch kind")
	}
}

// watchNodeInCluster implements the Watch API when dealing with a single
// node-in-cluster request. Effectively, it pipes an etcd value watcher into the
// Watch API.
func (l *leaderCurator) watchNodeInCluster(nic *ipb.WatchRequest_NodeInCluster, srv ipb.Curator_WatchServer) error {
	ctx := srv.Context()

	// Constructing an arbitrary etcd path: this is okay, as we only have node
	// objects underneath the NodeEtcdPrefix. The worst an attacker can do is
	// request a node that doesn't exist, and that will just hang. All access is
	// privileged, so there's also no need to filter anything.
	nodePath, err := nodeEtcdPrefix.Key(nic.NodeId)
	if err != nil {
		return status.Errorf(codes.InvalidArgument, "invalid node name: %v", err)
	}
	value := etcd.NewValue(l.etcd, nodePath, nodeValueConverter)

	w := value.Watch()
	defer w.Close()

	for {
		nodeKV, err := w.Get(ctx)
		if err != nil {
			if rpcErr, ok := rpcError(err); ok {
				return rpcErr
			}
			rpc.Trace(ctx).Printf("etcd watch failed: %v", err)
			return status.Error(codes.Unavailable, "internal error")
		}

		ev := &ipb.WatchEvent{}
		nodeKV.appendToEvent(ev)
		if err := srv.Send(ev); err != nil {
			return err
		}
	}
}
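
// For illustration, a minimal client-side sketch of consuming a single-node
// watch. This is not part of the curator implementation; "cur" stands in for
// an ipb.CuratorClient obtained over an authenticated cluster connection, and
// the node ID is hypothetical.
//
//	srv, err := cur.Watch(ctx, &ipb.WatchRequest{
//		Kind: &ipb.WatchRequest_NodeInCluster_{
//			NodeInCluster: &ipb.WatchRequest_NodeInCluster{
//				NodeId: "metropolis-0123456789abcdef",
//			},
//		},
//	})
//	if err != nil {
//		return err
//	}
//	for {
//		ev, err := srv.Recv()
//		if err != nil {
//			return err
//		}
//		for _, n := range ev.Nodes {
//			log.Printf("node %s updated, status: %v", n.Id, n.Status)
//		}
//	}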

// watchNodesInCluster implements the Watch API when dealing with an
// all-nodes-in-cluster request. Effectively, it pipes a ranged etcd value
// watcher into the Watch API.
func (l *leaderCurator) watchNodesInCluster(_ *ipb.WatchRequest_NodesInCluster, srv ipb.Curator_WatchServer) error {
	ctx := srv.Context()

	start, end := nodeEtcdPrefix.KeyRange()
	value := etcd.NewValue[*nodeAtID](l.etcd, start, nodeValueConverter, etcd.Range(end))

	w := value.Watch()
	defer w.Close()

	// Perform initial fetch from etcd.
	nodes := make(map[string]*Node)
	for {
		nodeKV, err := w.Get(ctx, event.BacklogOnly[*nodeAtID]())
		if err == event.BacklogDone {
			break
		}
		if err != nil {
			rpc.Trace(ctx).Printf("etcd watch failed (initial fetch): %v", err)
			return status.Error(codes.Unavailable, "internal error during initial fetch")
		}
		if nodeKV.value != nil {
			nodes[nodeKV.id] = nodeKV.value
		}
	}

	// Initial send, chunked to not go over 2MiB (half of the default gRPC message
	// size limit).
	//
	// TODO(q3k): formalize message limits, set const somewhere.
	we := &ipb.WatchEvent{}
	for _, n := range nodes {
		we.Nodes = append(we.Nodes, &ipb.Node{
			Id:     n.ID(),
			Roles:  n.proto().Roles,
			Status: n.status,
		})
		if proto.Size(we) > (2 << 20) {
			if err := srv.Send(we); err != nil {
				return err
			}
			we = &ipb.WatchEvent{}
		}
	}
	// Send the last update message. It might be empty, but we need to send the
	// LAST_BACKLOGGED marker.
	we.Progress = ipb.WatchEvent_PROGRESS_LAST_BACKLOGGED
	if err := srv.Send(we); err != nil {
		return err
	}

	// Send updates as they arrive from the etcd watcher.
	for {
		nodeKV, err := w.Get(ctx)
		if err != nil {
			rpc.Trace(ctx).Printf("etcd watch failed (update): %v", err)
			return status.Errorf(codes.Unavailable, "internal error during update")
		}
		we := &ipb.WatchEvent{}
		nodeKV.appendToEvent(we)
		if err := srv.Send(we); err != nil {
			return err
		}
	}
}
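
// For illustration, a sketch of a client maintaining a local replica of the
// node map from this ranged watch. Again, "cur" is an assumed
// ipb.CuratorClient; the PROGRESS_LAST_BACKLOGGED marker separates the initial
// backlog from live updates.
//
//	srv, err := cur.Watch(ctx, &ipb.WatchRequest{
//		Kind: &ipb.WatchRequest_NodesInCluster_{
//			NodesInCluster: &ipb.WatchRequest_NodesInCluster{},
//		},
//	})
//	if err != nil {
//		return err
//	}
//	nodes := make(map[string]*ipb.Node)
//	for {
//		ev, err := srv.Recv()
//		if err != nil {
//			return err
//		}
//		for _, n := range ev.Nodes {
//			nodes[n.Id] = n
//		}
//		for _, t := range ev.NodeTombstones {
//			delete(nodes, t.NodeId)
//		}
//		if ev.Progress == ipb.WatchEvent_PROGRESS_LAST_BACKLOGGED {
//			log.Printf("initial sync done, %d nodes", len(nodes))
//		}
//	}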

// nodeAtID is a key/value container for a node update received from an etcd
// watcher. The value will be nil if this update represents a node being
// deleted.
type nodeAtID struct {
	id    string
	value *Node
}

// nodeValueConverter is called by etcd node value watchers to convert updates
// from the cluster into nodeAtID, ensuring data integrity and checking
// invariants.
func nodeValueConverter(key, value []byte) (*nodeAtID, error) {
	res := nodeAtID{
		id: nodeEtcdPrefix.ExtractID(string(key)),
	}
	if len(value) > 0 {
		node, err := nodeUnmarshal(value)
		if err != nil {
			return nil, err
		}
		res.value = node
		if res.id != res.value.ID() {
			return nil, fmt.Errorf("node ID mismatch (etcd key: %q, value: %q)", res.id, res.value.ID())
		}
	}
	if res.id == "" {
		// This shouldn't happen, to the point where it might be better handled by a
		// panic.
		return nil, fmt.Errorf("invalid node key %q", key)
	}
	return &res, nil
}
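
// To illustrate the converter's contract (a sketch; the exact key layout is
// whatever nodeEtcdPrefix defines, and the variable names are hypothetical):
//
//	// A put of a serialized node under its key yields a populated value:
//	kv, _ := nodeValueConverter(key, serializedNode) // kv.value is a *Node, kv.id its ID
//	// A delete arrives as an empty value and yields a tombstone marker:
//	kv, _ = nodeValueConverter(key, nil) // kv.value == nil, kv.id still set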

// appendToEvent records a node update represented by nodeAtID into a Curator
// WatchEvent, either as a Node or as a NodeTombstone.
func (kv nodeAtID) appendToEvent(ev *ipb.WatchEvent) {
	if node := kv.value; node != nil {
		np := node.proto()
		ev.Nodes = append(ev.Nodes, &ipb.Node{
			Id:         node.ID(),
			Roles:      np.Roles,
			Status:     np.Status,
			Clusternet: np.Clusternet,
		})
	} else {
		ev.NodeTombstones = append(ev.NodeTombstones, &ipb.WatchEvent_NodeTombstone{
			NodeId: kv.id,
		})
	}
}

// UpdateNodeStatus is called by nodes in the cluster to report their own
// status. This status is recorded by the curator and can be retrieved via
// Watch.
func (l *leaderCurator) UpdateNodeStatus(ctx context.Context, req *ipb.UpdateNodeStatusRequest) (*ipb.UpdateNodeStatusResponse, error) {
	// Ensure that the given node_id matches the calling node. We currently
	// only allow for direct self-reporting of status by nodes.
	pi := rpc.GetPeerInfo(ctx)
	if pi == nil || pi.Node == nil {
		return nil, status.Error(codes.PermissionDenied, "only nodes can update node status")
	}
	id := identity.NodeID(pi.Node.PublicKey)
	if id != req.NodeId {
		return nil, status.Errorf(codes.PermissionDenied, "node %q cannot update the status of node %q", id, req.NodeId)
	}

	// Verify the sent status. Currently we assume the entire status must be set
	// at once, and cannot be unset.
	if req.Status == nil || req.Status.ExternalAddress == "" {
		return nil, status.Errorf(codes.InvalidArgument, "Status and Status.ExternalAddress must be set")
	}

	if net.ParseIP(req.Status.ExternalAddress) == nil {
		return nil, status.Errorf(codes.InvalidArgument, "Status.ExternalAddress must be a valid IP address")
	}

	// As we're performing a node update with two etcd transactions below (one
	// to retrieve, one to save and update the node), take a local lock to ensure
	// that we don't have a race between either two UpdateNodeStatus calls or
	// an UpdateNodeStatus call and some other mutation to the node store.
	l.muNodes.Lock()
	defer l.muNodes.Unlock()

	// Retrieve node ...
	node, err := nodeLoad(ctx, l.leadership, id)
	if err != nil {
		return nil, err
	}
	// ... update its status ...
	node.status = req.Status
	node.status.Timestamp = tpb.Now()
	// ... and save it to etcd.
	if err := nodeSave(ctx, l.leadership, node); err != nil {
		return nil, err
	}

	return &ipb.UpdateNodeStatusResponse{}, nil
}
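
// For illustration, how a node might report its status (a sketch; "cur" is an
// assumed ipb.CuratorClient authenticated with the node's credentials, the
// address is an example value, and the status message type is assumed to live
// in cpb):
//
//	_, err := cur.UpdateNodeStatus(ctx, &ipb.UpdateNodeStatusRequest{
//		NodeId: identity.NodeID(pubkey),
//		Status: &cpb.NodeStatus{
//			ExternalAddress: "203.0.113.7",
//		},
//	})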

func (l *leaderCurator) Heartbeat(stream ipb.Curator_HeartbeatServer) error {
	// Identify the calling node. We currently only allow nodes to send
	// heartbeats on their own behalf.
	ctx := stream.Context()
	pi := rpc.GetPeerInfo(ctx)
	if pi == nil || pi.Node == nil {
		return status.Error(codes.PermissionDenied, "only nodes can send heartbeats")
	}
	id := identity.NodeID(pi.Node.PublicKey)

	for {
		_, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}

		// Update the node's timestamp within the local Curator state.
		l.ls.heartbeatTimestamps.Store(id, time.Now())

		rsp := &ipb.HeartbeatUpdateResponse{}
		if err := stream.Send(rsp); err != nil {
			return err
		}
	}
}
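
// For illustration, the node-side counterpart (a sketch; "cur" is an assumed
// ipb.CuratorClient, the interval is arbitrary, and the request message name
// is inferred from the HeartbeatUpdateResponse used above):
//
//	hb, err := cur.Heartbeat(ctx)
//	if err != nil {
//		return err
//	}
//	t := time.NewTicker(5 * time.Second)
//	defer t.Stop()
//	for range t.C {
//		if err := hb.Send(&ipb.HeartbeatUpdateRequest{}); err != nil {
//			return err
//		}
//		if _, err := hb.Recv(); err != nil {
//			return err
//		}
//	}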

func (l *leaderCurator) RegisterNode(ctx context.Context, req *ipb.RegisterNodeRequest) (*ipb.RegisterNodeResponse, error) {
	// The call is unauthenticated - verify that the other side has connected with
	// an ephemeral, self-signed certificate. That certificate's pubkey will
	// become the node's pubkey.
	pi := rpc.GetPeerInfo(ctx)
	if pi == nil || pi.Unauthenticated == nil || pi.Unauthenticated.SelfSignedPublicKey == nil {
		return nil, status.Error(codes.Unauthenticated, "connection must be established with a self-signed ephemeral certificate")
	}
	pubkey := pi.Unauthenticated.SelfSignedPublicKey

	// Check the Join Key size.
	if want, got := ed25519.PublicKeySize, len(req.JoinKey); want != got {
		return nil, status.Errorf(codes.InvalidArgument, "join_key must be set and be %d bytes long", want)
	}

	// Verify that the call contains a RegisterTicket and that this RegisterTicket
	// is valid.
	wantTicket, err := l.ensureRegisterTicket(ctx)
	if err != nil {
		rpc.Trace(ctx).Printf("could not ensure register ticket: %v", err)
		return nil, status.Error(codes.Unavailable, "could not retrieve register ticket")
	}
	gotTicket := req.RegisterTicket
	if subtle.ConstantTimeCompare(wantTicket, gotTicket) != 1 {
		return nil, status.Error(codes.PermissionDenied, "register ticket invalid")
	}

	// Doing a read-then-write operation below, take lock.
	//
	// MVP: This can lock up the cluster if too many RegisterNode calls get issued,
	// we should either ratelimit these or remove the need to lock.
	l.muNodes.Lock()
	defer l.muNodes.Unlock()

	cl, err := clusterLoad(ctx, l.leadership)
	if err != nil {
		return nil, err
	}
	nodeStorageSecurity, err := cl.NodeStorageSecurity()
	if err != nil {
		rpc.Trace(ctx).Printf("NodeStorageSecurity: %v", err)
		return nil, status.Error(codes.InvalidArgument, "cannot generate recommended node storage security")
	}

	// Figure out whether the node should be using a TPM.
	tpmUsage, err := cl.NodeTPMUsage(req.HaveLocalTpm)
	if err != nil {
		return nil, status.Errorf(codes.PermissionDenied, "%s", err)
	}

	// Check if there already is a node with this pubkey in the cluster.
	id := identity.NodeID(pubkey)
	node, err := nodeLoad(ctx, l.leadership, id)
	if err == nil {
		// If the existing node is in the NEW state already, there's nothing to do,
		// return no error. This can happen in case of spurious retries from the calling
		// node.
		if node.state == cpb.NodeState_NODE_STATE_NEW {
			return &ipb.RegisterNodeResponse{}, nil
		}
		// We can return a bit more information to the calling node here, as if it's in
		// possession of the private key corresponding to an existing node in the
		// cluster, it should have access to the status of the node without danger of
		// leaking data about other nodes.
		rpc.Trace(ctx).Printf("node %s already exists in cluster, failing", id)
		return nil, status.Errorf(codes.FailedPrecondition, "node already exists in cluster, state %s", node.state.String())
	}
	if err != errNodeNotFound {
		return nil, err
	}

	// No node exists, create one.
	node = &Node{
		pubkey:   pubkey,
		jkey:     req.JoinKey,
		state:    cpb.NodeState_NODE_STATE_NEW,
		tpmUsage: tpmUsage,
	}
	if err := nodeSave(ctx, l.leadership, node); err != nil {
		return nil, err
	}

	// Eat the error, as we just deserialized this from a proto.
	clusterConfig, _ := cl.proto()
	return &ipb.RegisterNodeResponse{
		ClusterConfiguration:           clusterConfig,
		TpmUsage:                       tpmUsage,
		RecommendedNodeStorageSecurity: nodeStorageSecurity,
	}, nil
}
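
// For illustration, the registering node's side of this call (a sketch; "cur"
// is an assumed ipb.CuratorClient over an ephemeral self-signed connection,
// and ticket/jpub/tpmPresent are obtained elsewhere):
//
//	res, err := cur.RegisterNode(ctx, &ipb.RegisterNodeRequest{
//		RegisterTicket: ticket,     // fetched out of band from the cluster owner
//		JoinKey:        jpub,       // ed25519.PublicKey, 32 bytes
//		HaveLocalTpm:   tpmPresent,
//	})
//	// On success, res carries the ClusterConfiguration, the decided TpmUsage
//	// and the RecommendedNodeStorageSecurity for the node to apply locally.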

func (l *leaderCurator) CommitNode(ctx context.Context, req *ipb.CommitNodeRequest) (*ipb.CommitNodeResponse, error) {
	// The call is unauthenticated - verify that the other side has connected with
	// an ephemeral, self-signed certificate. That certificate's pubkey identifies
	// the registering node.
	pi := rpc.GetPeerInfo(ctx)
	if pi == nil || pi.Unauthenticated == nil || pi.Unauthenticated.SelfSignedPublicKey == nil {
		return nil, status.Error(codes.Unauthenticated, "connection must be established with a self-signed ephemeral certificate")
	}
	pubkey := pi.Unauthenticated.SelfSignedPublicKey

	// First-pass check of the requested node storage security, before loading the
	// cluster data and taking a lock on it.
	switch req.StorageSecurity {
	case cpb.NodeStorageSecurity_NODE_STORAGE_SECURITY_INSECURE:
	case cpb.NodeStorageSecurity_NODE_STORAGE_SECURITY_ENCRYPTED:
	case cpb.NodeStorageSecurity_NODE_STORAGE_SECURITY_AUTHENTICATED_ENCRYPTED:
	default:
		return nil, status.Error(codes.InvalidArgument, "invalid storage_security (is it set?)")
	}

	// Doing a read-then-write operation below, take lock.
	//
	// MVP: This can lock up the cluster if too many CommitNode calls get issued,
	// we should either ratelimit these or remove the need to lock.
	l.muNodes.Lock()
	defer l.muNodes.Unlock()

	cl, err := clusterLoad(ctx, l.leadership)
	if err != nil {
		return nil, err
	}
	if err := cl.ValidateNodeStorage(req.StorageSecurity); err != nil {
		return nil, err
	}

	// Retrieve the node and act on its current state, either returning early or
	// mutating it and continuing with the rest of the Commit logic.
	id := identity.NodeID(pubkey)
	node, err := nodeLoad(ctx, l.leadership, id)
	if err != nil {
		return nil, err
	}

	switch node.state {
	case cpb.NodeState_NODE_STATE_NEW:
		return nil, status.Error(codes.PermissionDenied, "node is NEW, wait for attestation/approval")
	case cpb.NodeState_NODE_STATE_DISOWNED:
		// This node has since been disowned by the cluster for some reason, the
		// register flow should be aborted.
		return nil, status.Error(codes.FailedPrecondition, "node is DISOWNED, abort register flow")
	case cpb.NodeState_NODE_STATE_UP:
		// This can happen due to a network failure when we already handled a
		// CommitNode, but weren't able to respond to the user. CommitNode is
		// non-idempotent, so just abort: the node should retry from scratch, and this
		// node should be manually disowned/deleted by system owners.
		return nil, status.Error(codes.FailedPrecondition, "node is already UP, abort register flow")
	case cpb.NodeState_NODE_STATE_STANDBY:
		// This is what we want.
	default:
		return nil, status.Errorf(codes.Internal, "node is in unknown state: %v", node.state)
	}

	// Check that the given CUK is valid.
	// TODO(q3k): unify length with localstorage/crypt keySize.
	if req.StorageSecurity != cpb.NodeStorageSecurity_NODE_STORAGE_SECURITY_INSECURE {
		if want, got := 32, len(req.ClusterUnlockKey); want != got {
			return nil, status.Errorf(codes.InvalidArgument, "invalid ClusterUnlockKey length, wanted %d bytes, got %d", want, got)
		}
	}

	// Generate a certificate for the node, save the new node state, return.

	// If this fails we are safe to let the client retry, as the PKI code is
	// idempotent.
	caCertBytes, err := pkiCA.Ensure(ctx, l.etcd)
	if err != nil {
		return nil, status.Errorf(codes.Unavailable, "could not get CA certificate: %v", err)
	}
	nodeCert := &pki.Certificate{
		Namespace: &pkiNamespace,
		Issuer:    pkiCA,
		Template:  identity.NodeCertificate(node.pubkey),
		Mode:      pki.CertificateExternal,
		PublicKey: node.pubkey,
		Name:      fmt.Sprintf("node-%s", identity.NodeID(pubkey)),
	}
	nodeCertBytes, err := nodeCert.Ensure(ctx, l.etcd)
	if err != nil {
		return nil, status.Errorf(codes.Unavailable, "could not emit node credentials: %v", err)
	}

	node.state = cpb.NodeState_NODE_STATE_UP
	node.clusterUnlockKey = req.ClusterUnlockKey
	if err := nodeSave(ctx, l.leadership, node); err != nil {
		return nil, err
	}

	// From this point on, any failure (in the server, or in the network, ...) dooms
	// the node from making progress in registering, as Commit is non-idempotent.

	return &ipb.CommitNodeResponse{
		CaCertificate:   caCertBytes,
		NodeCertificate: nodeCertBytes,
	}, nil
}
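
// For illustration, the node's side of the commit (a sketch; "cur" is an
// assumed ipb.CuratorClient on the same ephemeral connection used to
// register, and cuk is a locally generated key):
//
//	res, err := cur.CommitNode(ctx, &ipb.CommitNodeRequest{
//		ClusterUnlockKey: cuk, // 32 bytes, unless storage security is INSECURE
//		StorageSecurity:  cpb.NodeStorageSecurity_NODE_STORAGE_SECURITY_AUTHENTICATED_ENCRYPTED,
//	})
//	// res.CaCertificate and res.NodeCertificate are the node's new credentials;
//	// they should be persisted immediately, as Commit is non-idempotent.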

func (l *leaderCurator) JoinNode(ctx context.Context, req *ipb.JoinNodeRequest) (*ipb.JoinNodeResponse, error) {
	// Gather peer information.
	pi := rpc.GetPeerInfo(ctx)
	if pi == nil || pi.Unauthenticated == nil || pi.Unauthenticated.SelfSignedPublicKey == nil {
		return nil, status.Error(codes.PermissionDenied, "connection must be established with a self-signed ephemeral certificate")
	}
	// The node will attempt to connect using its Join Key. jkey will contain
	// its public part.
	jkey := pi.Unauthenticated.SelfSignedPublicKey

	// Take the lock to prevent data races during the next step.
	l.muNodes.Lock()
	defer l.muNodes.Unlock()

	// Resolve the Node ID using the Join Key, then use the ID to load node
	// information from etcd.
	id, err := nodeIdByJoinKey(ctx, l.leadership, jkey)
	if err != nil {
		return nil, err
	}
	node, err := nodeLoad(ctx, l.leadership, id)
	if err != nil {
		return nil, err
	}

	cl, err := clusterLoad(ctx, l.leadership)
	if err != nil {
		return nil, err
	}

	switch cl.TPMMode {
	case cpb.ClusterConfiguration_TPM_MODE_REQUIRED:
		if !req.UsingSealedConfiguration {
			return nil, status.Errorf(codes.PermissionDenied, "cannot join this cluster with an unsealed configuration")
		}
	case cpb.ClusterConfiguration_TPM_MODE_DISABLED:
		if req.UsingSealedConfiguration {
			return nil, status.Errorf(codes.PermissionDenied, "cannot join this cluster with a sealed configuration")
		}
	}

	if node.tpmUsage == cpb.NodeTPMUsage_NODE_TPM_PRESENT_AND_USED && !req.UsingSealedConfiguration {
		return nil, status.Errorf(codes.PermissionDenied, "node registered with TPM, cannot join without one")
	}
	if node.tpmUsage != cpb.NodeTPMUsage_NODE_TPM_PRESENT_AND_USED && req.UsingSealedConfiguration {
		return nil, status.Errorf(codes.PermissionDenied, "node registered without TPM, cannot join with one")
	}

	// Don't progress further unless the node is already UP.
	if node.state != cpb.NodeState_NODE_STATE_UP {
		return nil, status.Errorf(codes.FailedPrecondition, "node isn't UP, cannot join")
	}

	// Return the node's CUK, completing the Join flow.
	return &ipb.JoinNodeResponse{
		ClusterUnlockKey: node.clusterUnlockKey,
	}, nil
}
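
// For illustration, the joining node's side (a sketch; "cur" is an assumed
// ipb.CuratorClient on a connection whose self-signed certificate is backed by
// the node's Join Key):
//
//	res, err := cur.JoinNode(ctx, &ipb.JoinNodeRequest{
//		UsingSealedConfiguration: sealed, // whether the node kept a TPM-sealed configuration
//	})
//	// res.ClusterUnlockKey is then used by the node to unlock its storage.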

func (l *leaderCurator) GetCurrentLeader(_ *ipb.GetCurrentLeaderRequest, srv ipb.CuratorLocal_GetCurrentLeaderServer) error {
	ctx := srv.Context()

	// We're the leader.
	node, err := nodeLoad(ctx, l.leadership, l.leaderID)
	if err != nil {
		rpc.Trace(ctx).Printf("nodeLoad(%q) failed: %v", l.leaderID, err)
		return status.Errorf(codes.Unavailable, "failed to load leader node")
	}
	host := ""
	if node.status != nil && node.status.ExternalAddress != "" {
		host = node.status.ExternalAddress
	}

	err = srv.Send(&ipb.GetCurrentLeaderResponse{
		LeaderNodeId: l.leaderID,
		LeaderHost:   host,
		LeaderPort:   int32(common.CuratorServicePort),
		ThisNodeId:   l.leaderID,
	})
	if err != nil {
		return err
	}

	<-ctx.Done()
	rpc.Trace(ctx).Printf("Interrupting due to context cancellation")
	return nil
}
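
// For illustration, how a client might discover the current leader (a sketch;
// "curLocal" is an assumed ipb.CuratorLocalClient on any reachable node):
//
//	srv, err := curLocal.GetCurrentLeader(ctx, &ipb.GetCurrentLeaderRequest{})
//	if err != nil {
//		return err
//	}
//	res, err := srv.Recv()
//	if err != nil {
//		return err
//	}
//	addr := net.JoinHostPort(res.LeaderHost, strconv.Itoa(int(res.LeaderPort)))
//	// The stream stays open after the first response; this leader
//	// implementation only terminates it when the RPC context is cancelled.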