package curator

import (
	"context"
	"crypto/ed25519"
	"crypto/subtle"
	"fmt"
	"io"
	"net"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/proto"
	tpb "google.golang.org/protobuf/types/known/timestamppb"

	common "source.monogon.dev/metropolis/node"
	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
	"source.monogon.dev/metropolis/node/core/identity"
	"source.monogon.dev/metropolis/node/core/rpc"
	"source.monogon.dev/metropolis/pkg/event"
	"source.monogon.dev/metropolis/pkg/event/etcd"
	"source.monogon.dev/metropolis/pkg/pki"
	cpb "source.monogon.dev/metropolis/proto/common"
)

// leaderCurator implements the Curator gRPC API (ipb.Curator) as a curator
// leader.
type leaderCurator struct {
	*leadership
}

// Watch returns a stream of updates concerning some part of the cluster
// managed by the curator.
//
// See metropolis.node.core.curator.proto.api.Curator for more information
// about the RPC semantics.
//
// TODO(q3k): Currently the watch RPCs are individually backed by etcd cluster
// watches (via individual etcd event values), which might be problematic in
// case of a significant number of parallel Watches being issued to the Curator.
// It might make sense to combine all pending Watch requests into a single watch
// issued to the cluster, with an intermediary caching stage within the curator
// instance. However, that is effectively implementing etcd learner/relay logic,
// which has to be carefully considered, especially with regards to serving
// stale data.
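//
// A rough sketch of how a client might consume this stream (assuming the
// generated ipb.CuratorClient stub; variable names are illustrative):
//
//	w, err := cur.Watch(ctx, &ipb.WatchRequest{
//		Kind: &ipb.WatchRequest_NodesInCluster_{
//			NodesInCluster: &ipb.WatchRequest_NodesInCluster{},
//		},
//	})
//	if err != nil {
//		return err
//	}
//	for {
//		ev, err := w.Recv()
//		if err != nil {
//			return err
//		}
//		// ev.Nodes carries new/updated nodes, ev.NodeTombstones carries
//		// deleted ones.
//	}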
func (l *leaderCurator) Watch(req *ipb.WatchRequest, srv ipb.Curator_WatchServer) error {
	switch x := req.Kind.(type) {
	case *ipb.WatchRequest_NodeInCluster_:
		return l.watchNodeInCluster(x.NodeInCluster, srv)
	case *ipb.WatchRequest_NodesInCluster_:
		return l.watchNodesInCluster(x.NodesInCluster, srv)
	default:
		return status.Error(codes.Unimplemented, "unsupported watch kind")
	}
}

// watchNodeInCluster implements the Watch API when dealing with a single
// node-in-cluster request. Effectively, it pipes an etcd value watcher into the
// Watch API.
func (l *leaderCurator) watchNodeInCluster(nic *ipb.WatchRequest_NodeInCluster, srv ipb.Curator_WatchServer) error {
	ctx := srv.Context()

	// Constructing an arbitrary etcd path: this is okay, as we only have node
	// objects underneath the NodeEtcdPrefix. The worst an attacker can do is
	// request a node that doesn't exist, and that will just hang. All access is
	// privileged, so there's also no need to filter anything.
	nodePath, err := nodeEtcdPrefix.Key(nic.NodeId)
	if err != nil {
		return status.Errorf(codes.InvalidArgument, "invalid node name: %v", err)
	}
	value := etcd.NewValue(l.etcd, nodePath, nodeValueConverter)

	w := value.Watch()
	defer w.Close()

	for {
		nodeKV, err := w.Get(ctx)
		if err != nil {
			if rpcErr, ok := rpcError(err); ok {
				return rpcErr
			}
			rpc.Trace(ctx).Printf("etcd watch failed: %v", err)
			return status.Error(codes.Unavailable, "internal error")
		}

		ev := &ipb.WatchEvent{}
		nodeKV.appendToEvent(ev)
		if err := srv.Send(ev); err != nil {
			return err
		}
	}
}

// watchNodesInCluster implements the Watch API when dealing with an
// all-nodes-in-cluster request. Effectively, it pipes a ranged etcd value
// watcher into the Watch API.
func (l *leaderCurator) watchNodesInCluster(_ *ipb.WatchRequest_NodesInCluster, srv ipb.Curator_WatchServer) error {
	ctx := srv.Context()

	start, end := nodeEtcdPrefix.KeyRange()
	value := etcd.NewValue[*nodeAtID](l.etcd, start, nodeValueConverter, etcd.Range(end))

	w := value.Watch()
	defer w.Close()

	// Perform initial fetch from etcd.
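	// The BacklogOnly option limits Get to data already buffered by the watcher;
	// once that backlog is drained, Get returns event.BacklogDone and we switch
	// to streaming live updates further below.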
	nodes := make(map[string]*Node)
	for {
		nodeKV, err := w.Get(ctx, event.BacklogOnly[*nodeAtID]())
		if err == event.BacklogDone {
			break
		}
		if err != nil {
			rpc.Trace(ctx).Printf("etcd watch failed (initial fetch): %v", err)
			return status.Error(codes.Unavailable, "internal error during initial fetch")
		}
		if nodeKV.value != nil {
			nodes[nodeKV.id] = nodeKV.value
		}
	}

	// Initial send, chunked to not go over 2MiB (half of the default gRPC message
	// size limit).
	//
	// TODO(q3k): formalize message limits, set const somewhere.
	we := &ipb.WatchEvent{}
	for _, n := range nodes {
		n.appendToEvent(we)
		if proto.Size(we) > (2 << 20) {
			if err := srv.Send(we); err != nil {
				return err
			}
			we = &ipb.WatchEvent{}
		}
	}
	// Send last update message. This might be empty, but we need to send the
	// LAST_BACKLOGGED marker.
	we.Progress = ipb.WatchEvent_PROGRESS_LAST_BACKLOGGED
	if err := srv.Send(we); err != nil {
		return err
	}

	// Send updates as they arrive from etcd watcher.
	for {
		nodeKV, err := w.Get(ctx)
		if err != nil {
			rpc.Trace(ctx).Printf("etcd watch failed (update): %v", err)
			return status.Errorf(codes.Unavailable, "internal error during update")
		}
		we := &ipb.WatchEvent{}
		nodeKV.appendToEvent(we)
		if err := srv.Send(we); err != nil {
			return err
		}
	}
}

// nodeAtID is a key/value pair container for a node update received from an
// etcd watcher. The value will be nil if this update represents a node being
// deleted.
type nodeAtID struct {
	id    string
	value *Node
}

// nodeValueConverter is called by etcd node value watchers to convert updates
// from the cluster into nodeAtID, ensuring data integrity and checking
// invariants.
func nodeValueConverter(key, value []byte) (*nodeAtID, error) {
	res := nodeAtID{
		id: nodeEtcdPrefix.ExtractID(string(key)),
	}
	if len(value) > 0 {
		node, err := nodeUnmarshal(value)
		if err != nil {
			return nil, err
		}
		res.value = node
		if res.id != res.value.ID() {
			return nil, fmt.Errorf("node ID mismatch (etcd key: %q, value: %q)", res.id, res.value.ID())
		}
	}
	if res.id == "" {
		// This shouldn't happen, to the point where this might be better handled by a
		// panic.
		return nil, fmt.Errorf("invalid node key %q", key)
	}
	return &res, nil
}

// appendToEvent records the node state represented by a Node into a Curator
// WatchEvent.
func (n *Node) appendToEvent(ev *ipb.WatchEvent) {
	np := n.proto()
	ev.Nodes = append(ev.Nodes, &ipb.Node{
		Id:         n.ID(),
		Roles:      np.Roles,
		Status:     np.Status,
		Clusternet: np.Clusternet,
	})
}

// appendToEvent records a node update represented by nodeAtID into a Curator
// WatchEvent, as either a Node or a NodeTombstone.
func (kv nodeAtID) appendToEvent(ev *ipb.WatchEvent) {
	if node := kv.value; node != nil {
		node.appendToEvent(ev)
	} else {
		ev.NodeTombstones = append(ev.NodeTombstones, &ipb.WatchEvent_NodeTombstone{
			NodeId: kv.id,
		})
	}
}

// UpdateNodeStatus is called by nodes in the cluster to report their own
// status. This status is recorded by the curator and can be retrieved via
// Watch.
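//
// Only a node may report its own status: the peer identity derived from the
// node's client certificate must match the node_id given in the request.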
func (l *leaderCurator) UpdateNodeStatus(ctx context.Context, req *ipb.UpdateNodeStatusRequest) (*ipb.UpdateNodeStatusResponse, error) {
	// Ensure that the given node_id matches the calling node. We currently
	// only allow for direct self-reporting of status by nodes.
	pi := rpc.GetPeerInfo(ctx)
	if pi == nil || pi.Node == nil {
		return nil, status.Error(codes.PermissionDenied, "only nodes can update node status")
	}
	id := identity.NodeID(pi.Node.PublicKey)
	if id != req.NodeId {
		return nil, status.Errorf(codes.PermissionDenied, "node %q cannot update the status of node %q", id, req.NodeId)
	}

	// Verify sent status. Currently we assume the entire status must be set at
	// once, and cannot be unset.
	if req.Status == nil || req.Status.ExternalAddress == "" {
		return nil, status.Errorf(codes.InvalidArgument, "Status and Status.ExternalAddress must be set")
	}

	if net.ParseIP(req.Status.ExternalAddress) == nil {
		return nil, status.Errorf(codes.InvalidArgument, "Status.ExternalAddress must be a valid IP address")
	}

	// As we're performing a node update with two etcd transactions below (one
	// to retrieve, one to save and update the node), take a local lock to ensure
	// that we don't have a race between either two UpdateNodeStatus calls or
	// an UpdateNodeStatus call and some other mutation to the node store.
	l.muNodes.Lock()
	defer l.muNodes.Unlock()

	// Retrieve node ...
	node, err := nodeLoad(ctx, l.leadership, id)
	if err != nil {
		return nil, err
	}
	// ... update its status ...
	node.status = req.Status
	node.status.Timestamp = tpb.Now()
	// ... and save it to etcd.
	if err := nodeSave(ctx, l.leadership, node); err != nil {
		return nil, err
	}

	return &ipb.UpdateNodeStatusResponse{}, nil
}

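// Heartbeat implements a bidirectional stream over which nodes periodically
// report their liveness. For every message received from a node, the leader
// records the current time in its local heartbeatTimestamps map and replies
// with an (empty) HeartbeatUpdateResponse. The stream ends cleanly when the
// node closes its side (io.EOF).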
func (l *leaderCurator) Heartbeat(stream ipb.Curator_HeartbeatServer) error {
	// Ensure the caller is a node. We currently only allow nodes to report
	// their own liveness directly.
	ctx := stream.Context()
	pi := rpc.GetPeerInfo(ctx)
	if pi == nil || pi.Node == nil {
		return status.Error(codes.PermissionDenied, "only nodes can send heartbeats")
	}
	id := identity.NodeID(pi.Node.PublicKey)

	for {
		_, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}

		// Update the node's timestamp within the local Curator state.
		l.ls.heartbeatTimestamps.Store(id, time.Now())

		rsp := &ipb.HeartbeatUpdateResponse{}
		if err := stream.Send(rsp); err != nil {
			return err
		}
	}
}

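// RegisterNode is called by prospective nodes over an unauthenticated
// (self-signed ephemeral certificate) connection to start the register flow.
// The caller must present a valid RegisterTicket and a Join Key; on success, a
// node object keyed by the public key of the presented certificate is created
// in the NEW state, and the cluster configuration is returned.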
func (l *leaderCurator) RegisterNode(ctx context.Context, req *ipb.RegisterNodeRequest) (*ipb.RegisterNodeResponse, error) {
	// Call is unauthenticated - verify the other side has connected with an
	// ephemeral certificate. That certificate's pubkey will become the node's
	// pubkey.
	pi := rpc.GetPeerInfo(ctx)
	if pi == nil || pi.Unauthenticated == nil || pi.Unauthenticated.SelfSignedPublicKey == nil {
		return nil, status.Error(codes.Unauthenticated, "connection must be established with a self-signed ephemeral certificate")
	}
	pubkey := pi.Unauthenticated.SelfSignedPublicKey

	// Check the Join Key size.
	if want, got := ed25519.PublicKeySize, len(req.JoinKey); want != got {
		return nil, status.Errorf(codes.InvalidArgument, "join_key must be set and be %d bytes long", want)
	}

	// Verify that the call contains a RegisterTicket and that this RegisterTicket
	// is valid.
	wantTicket, err := l.ensureRegisterTicket(ctx)
	if err != nil {
		rpc.Trace(ctx).Printf("could not ensure register ticket: %v", err)
		return nil, status.Error(codes.Unavailable, "could not retrieve register ticket")
	}
	gotTicket := req.RegisterTicket
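	// Compare in constant time so that the comparison does not leak information
	// about the expected ticket through timing.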
	if subtle.ConstantTimeCompare(wantTicket, gotTicket) != 1 {
		return nil, status.Error(codes.PermissionDenied, "register ticket invalid")
	}

	// Doing a read-then-write operation below, take lock.
	//
	// MVP: This can lock up the cluster if too many RegisterNode calls get issued,
	// we should either ratelimit these or remove the need to lock.
	l.muNodes.Lock()
	defer l.muNodes.Unlock()

	cl, err := clusterLoad(ctx, l.leadership)
	if err != nil {
		return nil, err
	}
	nodeStorageSecurity, err := cl.NodeStorageSecurity()
	if err != nil {
		rpc.Trace(ctx).Printf("NodeStorageSecurity: %v", err)
		return nil, status.Error(codes.InvalidArgument, "cannot generate recommended node storage security")
	}

	// Figure out if the node should be using a TPM.
	tpmUsage, err := cl.NodeTPMUsage(req.HaveLocalTpm)
	if err != nil {
		return nil, status.Errorf(codes.PermissionDenied, "%s", err)
	}

	// Check if there already is a node with this pubkey in the cluster.
	id := identity.NodeID(pubkey)
	node, err := nodeLoad(ctx, l.leadership, id)
	if err == nil {
		// If the existing node is in the NEW state already, there's nothing to do,
		// return no error. This can happen in case of spurious retries from the calling
		// node.
		if node.state == cpb.NodeState_NODE_STATE_NEW {
			return &ipb.RegisterNodeResponse{}, nil
		}
		// We can return a bit more information to the calling node here, as if it's in
		// possession of the private key corresponding to an existing node in the
		// cluster, it should have access to the status of the node without danger of
		// leaking data about other nodes.
		rpc.Trace(ctx).Printf("node %s already exists in cluster, failing", id)
		return nil, status.Errorf(codes.FailedPrecondition, "node already exists in cluster, state %s", node.state.String())
	}
	if err != errNodeNotFound {
		return nil, err
	}

	// No node exists, create one.
	node = &Node{
		pubkey:   pubkey,
		jkey:     req.JoinKey,
		state:    cpb.NodeState_NODE_STATE_NEW,
		tpmUsage: tpmUsage,
	}
	if err := nodeSave(ctx, l.leadership, node); err != nil {
		return nil, err
	}

	// Eat error, as we just deserialized this from a proto.
	clusterConfig, _ := cl.proto()
	return &ipb.RegisterNodeResponse{
		ClusterConfiguration:           clusterConfig,
		TpmUsage:                       tpmUsage,
		RecommendedNodeStorageSecurity: nodeStorageSecurity,
	}, nil
}

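// CommitNode is the second half of the register flow. It is called by a node
// that has been approved into the STANDBY state and, given an acceptable
// storage security choice and Cluster Unlock Key, moves the node to UP and
// returns its newly emitted CA and node certificates. The call is not
// idempotent: a failure after the node has been saved as UP requires manual
// intervention by the system owners.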
func (l *leaderCurator) CommitNode(ctx context.Context, req *ipb.CommitNodeRequest) (*ipb.CommitNodeResponse, error) {
	// Call is unauthenticated - verify the other side has connected with an
	// ephemeral certificate. That certificate's pubkey identifies the node
	// being committed.
	pi := rpc.GetPeerInfo(ctx)
	if pi == nil || pi.Unauthenticated == nil || pi.Unauthenticated.SelfSignedPublicKey == nil {
		return nil, status.Error(codes.Unauthenticated, "connection must be established with a self-signed ephemeral certificate")
	}
	pubkey := pi.Unauthenticated.SelfSignedPublicKey

	// First-pass check of node storage security, before loading the cluster data
	// and taking a lock on it.
	switch req.StorageSecurity {
	case cpb.NodeStorageSecurity_NODE_STORAGE_SECURITY_INSECURE:
	case cpb.NodeStorageSecurity_NODE_STORAGE_SECURITY_ENCRYPTED:
	case cpb.NodeStorageSecurity_NODE_STORAGE_SECURITY_AUTHENTICATED_ENCRYPTED:
	default:
		return nil, status.Error(codes.InvalidArgument, "invalid storage_security (is it set?)")
	}

	// Doing a read-then-write operation below, take lock.
	//
	// MVP: This can lock up the cluster if too many CommitNode calls get issued,
	// we should either ratelimit these or remove the need to lock.
	l.muNodes.Lock()
	defer l.muNodes.Unlock()

	cl, err := clusterLoad(ctx, l.leadership)
	if err != nil {
		return nil, err
	}
	if err := cl.ValidateNodeStorage(req.StorageSecurity); err != nil {
		return nil, err
	}

	// Retrieve the node and act on its current state, either returning early or
	// mutating it and continuing with the rest of the Commit logic.
	id := identity.NodeID(pubkey)
	node, err := nodeLoad(ctx, l.leadership, id)
	if err != nil {
		return nil, err
	}

	switch node.state {
	case cpb.NodeState_NODE_STATE_NEW:
		return nil, status.Error(codes.PermissionDenied, "node is NEW, wait for attestation/approval")
	case cpb.NodeState_NODE_STATE_DISOWNED:
		// This node has since been disowned by the cluster for some reason; the
		// register flow should be aborted.
		return nil, status.Error(codes.FailedPrecondition, "node is DISOWNED, abort register flow")
	case cpb.NodeState_NODE_STATE_UP:
		// This can happen due to a network failure when we already handled a
		// CommitNode, but we weren't able to respond to the user. CommitNode is
		// non-idempotent, so just abort: the node should retry from scratch and this
		// node should be manually disowned/deleted by system owners.
		return nil, status.Error(codes.FailedPrecondition, "node is already UP, abort register flow")
	case cpb.NodeState_NODE_STATE_STANDBY:
		// This is what we want.
	default:
		return nil, status.Errorf(codes.Internal, "node is in unknown state: %v", node.state)
	}

	// Check that the given CUK (Cluster Unlock Key) is valid.
	// TODO(q3k): unify length with localstorage/crypt keySize.
	if req.StorageSecurity != cpb.NodeStorageSecurity_NODE_STORAGE_SECURITY_INSECURE {
		if want, got := 32, len(req.ClusterUnlockKey); want != got {
			return nil, status.Errorf(codes.InvalidArgument, "invalid ClusterUnlockKey length, wanted %d bytes, got %d", want, got)
		}
	}

	// Generate certificate for node, save new node state, return.

	// If this fails we are safe to let the client retry, as the PKI code is
	// idempotent.
	caCertBytes, err := pkiCA.Ensure(ctx, l.etcd)
	if err != nil {
		return nil, status.Errorf(codes.Unavailable, "could not get CA certificate: %v", err)
	}
	nodeCert := &pki.Certificate{
		Namespace: &pkiNamespace,
		Issuer:    pkiCA,
		Template:  identity.NodeCertificate(node.pubkey),
		Mode:      pki.CertificateExternal,
		PublicKey: node.pubkey,
		Name:      fmt.Sprintf("node-%s", identity.NodeID(pubkey)),
	}
	nodeCertBytes, err := nodeCert.Ensure(ctx, l.etcd)
	if err != nil {
		return nil, status.Errorf(codes.Unavailable, "could not emit node credentials: %v", err)
	}

	node.state = cpb.NodeState_NODE_STATE_UP
	node.clusterUnlockKey = req.ClusterUnlockKey
	if err := nodeSave(ctx, l.leadership, node); err != nil {
		return nil, err
	}

	// From this point on, any failure (in the server, in the network, ...) prevents
	// the node from making further progress in registering, as Commit is
	// non-idempotent.

	return &ipb.CommitNodeResponse{
		CaCertificate:   caCertBytes,
		NodeCertificate: nodeCertBytes,
	}, nil
}

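// JoinNode is called by a node that is already UP and wants to rejoin the
// cluster, e.g. after a reboot. The node authenticates with its Join Key over
// an ephemeral connection; after the cluster's TPM policy is cross-checked
// against how the node originally registered, the node's Cluster Unlock Key is
// returned, letting it unlock its local storage.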
func (l *leaderCurator) JoinNode(ctx context.Context, req *ipb.JoinNodeRequest) (*ipb.JoinNodeResponse, error) {
	// Gather peer information.
	pi := rpc.GetPeerInfo(ctx)
	if pi == nil || pi.Unauthenticated == nil || pi.Unauthenticated.SelfSignedPublicKey == nil {
		return nil, status.Error(codes.PermissionDenied, "connection must be established with a self-signed ephemeral certificate")
	}
	// The node will attempt to connect using its Join Key. jkey will contain
	// its public part.
	jkey := pi.Unauthenticated.SelfSignedPublicKey

	// Take the lock to prevent data races during the next step.
	l.muNodes.Lock()
	defer l.muNodes.Unlock()

	// Resolve the Node ID using Join Key, then use the ID to load node
	// information from etcd.
	id, err := nodeIdByJoinKey(ctx, l.leadership, jkey)
	if err != nil {
		return nil, err
	}
	node, err := nodeLoad(ctx, l.leadership, id)
	if err != nil {
		return nil, err
	}

	cl, err := clusterLoad(ctx, l.leadership)
	if err != nil {
		return nil, err
	}

	switch cl.TPMMode {
	case cpb.ClusterConfiguration_TPM_MODE_REQUIRED:
		if !req.UsingSealedConfiguration {
			return nil, status.Errorf(codes.PermissionDenied, "cannot join this cluster with an unsealed configuration")
		}
	case cpb.ClusterConfiguration_TPM_MODE_DISABLED:
		if req.UsingSealedConfiguration {
			return nil, status.Errorf(codes.PermissionDenied, "cannot join this cluster with a sealed configuration")
		}
	}

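	// Independently of the cluster-wide policy, ensure the join attempt is
	// consistent with how this particular node registered.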
	if node.tpmUsage == cpb.NodeTPMUsage_NODE_TPM_PRESENT_AND_USED && !req.UsingSealedConfiguration {
		return nil, status.Errorf(codes.PermissionDenied, "node registered with TPM, cannot join without one")
	}
	if node.tpmUsage != cpb.NodeTPMUsage_NODE_TPM_PRESENT_AND_USED && req.UsingSealedConfiguration {
		return nil, status.Errorf(codes.PermissionDenied, "node registered without TPM, cannot join with one")
	}

	// Don't progress further unless the node is already UP.
	if node.state != cpb.NodeState_NODE_STATE_UP {
		return nil, status.Errorf(codes.FailedPrecondition, "node isn't UP, cannot join")
	}

	// Return the Node's CUK, completing the Join Flow.
	return &ipb.JoinNodeResponse{
		ClusterUnlockKey: node.clusterUnlockKey,
	}, nil
}

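// GetCurrentLeader implements the CuratorLocal service on the leader: it
// reports this node as the current leader, sends a single response describing
// how to reach it, then blocks until the client goes away, as the answer will
// not change for as long as this instance remains leader.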
func (l *leaderCurator) GetCurrentLeader(_ *ipb.GetCurrentLeaderRequest, srv ipb.CuratorLocal_GetCurrentLeaderServer) error {
	ctx := srv.Context()

	// We're the leader.
	node, err := nodeLoad(ctx, l.leadership, l.leaderID)
	if err != nil {
		rpc.Trace(ctx).Printf("nodeLoad(%q) failed: %v", l.leaderID, err)
		return status.Errorf(codes.Unavailable, "failed to load leader node")
	}
	host := ""
	if node.status != nil && node.status.ExternalAddress != "" {
		host = node.status.ExternalAddress
	}

	err = srv.Send(&ipb.GetCurrentLeaderResponse{
		LeaderNodeId: l.leaderID,
		LeaderHost:   host,
		LeaderPort:   int32(common.CuratorServicePort),
		ThisNodeId:   l.leaderID,
	})
	if err != nil {
		return err
	}

	<-ctx.Done()
	rpc.Trace(ctx).Printf("Interrupting due to context cancellation")
	return nil
}