package curator

import (
	"context"
	"crypto/ed25519"
	"crypto/subtle"
	"fmt"
	"io"
	"net"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/proto"
	tpb "google.golang.org/protobuf/types/known/timestamppb"

	common "source.monogon.dev/metropolis/node"
	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
	"source.monogon.dev/metropolis/node/core/identity"
	"source.monogon.dev/metropolis/node/core/rpc"
	"source.monogon.dev/metropolis/pkg/event"
	"source.monogon.dev/metropolis/pkg/event/etcd"
	"source.monogon.dev/metropolis/pkg/pki"
	cpb "source.monogon.dev/metropolis/proto/common"
)

// leaderCurator implements the Curator gRPC API (ipb.Curator) as a curator
// leader.
type leaderCurator struct {
	*leadership
}

// Watch returns a stream of updates concerning some part of the cluster
// managed by the curator.
//
// See metropolis.node.core.curator.proto.api.Curator for more information about
// the RPC semantics.
//
// TODO(q3k): Currently the watch RPCs are individually backed by etcd cluster
// watches (via individual etcd event values), which might be problematic in
// case of a significant amount of parallel Watches being issued to the Curator.
// It might make sense to combine all pending Watch requests into a single watch
// issued to the cluster, with an intermediary caching stage within the curator
// instance. However, that is effectively implementing etcd learner/relay logic,
// which has to be carefully considered, especially with regards to serving
// stale data.
func (l *leaderCurator) Watch(req *ipb.WatchRequest, srv ipb.Curator_WatchServer) error {
	switch x := req.Kind.(type) {
	case *ipb.WatchRequest_NodeInCluster_:
		return l.watchNodeInCluster(x.NodeInCluster, srv)
	case *ipb.WatchRequest_NodesInCluster_:
		return l.watchNodesInCluster(x.NodesInCluster, srv)
	default:
		return status.Error(codes.Unimplemented, "unsupported watch kind")
	}
}

// watchNodeInCluster implements the Watch API when dealing with a single
// node-in-cluster request. Effectively, it pipes an etcd value watcher into the
// Watch API.
func (l *leaderCurator) watchNodeInCluster(nic *ipb.WatchRequest_NodeInCluster, srv ipb.Curator_WatchServer) error {
	ctx := srv.Context()

	// Constructing an arbitrary etcd path: this is okay, as we only have node
	// objects underneath the NodeEtcdPrefix. The worst an attacker can do is
	// request a node that doesn't exist, and that will just hang. All access is
	// privileged, so there's also no need to filter anything.
	nodePath, err := nodeEtcdPrefix.Key(nic.NodeId)
	if err != nil {
		return status.Errorf(codes.InvalidArgument, "invalid node name: %v", err)
	}
	value := etcd.NewValue(l.etcd, nodePath, nodeValueConverter)

	w := value.Watch()
	defer w.Close()

	for {
		nodeKV, err := w.Get(ctx)
		if err != nil {
			if rpcErr, ok := rpcError(err); ok {
				return rpcErr
			}
			rpc.Trace(ctx).Printf("etcd watch failed: %v", err)
			return status.Error(codes.Unavailable, "internal error")
		}

		ev := &ipb.WatchEvent{}
		nodeKV.appendToEvent(ev)
		if err := srv.Send(ev); err != nil {
			return err
		}
	}
}

// watchNodesInCluster implements the Watch API when dealing with an
// all-nodes-in-cluster request. Effectively, it pipes a ranged etcd value
// watcher into the Watch API.
func (l *leaderCurator) watchNodesInCluster(_ *ipb.WatchRequest_NodesInCluster, srv ipb.Curator_WatchServer) error {
	ctx := srv.Context()

	start, end := nodeEtcdPrefix.KeyRange()
	value := etcd.NewValue[*nodeAtID](l.etcd, start, nodeValueConverter, etcd.Range(end))

	w := value.Watch()
	defer w.Close()

	// Perform initial fetch from etcd.
	nodes := make(map[string]*Node)
	for {
		nodeKV, err := w.Get(ctx, event.BacklogOnly[*nodeAtID]())
		if err == event.BacklogDone {
			break
		}
		if err != nil {
			rpc.Trace(ctx).Printf("etcd watch failed (initial fetch): %v", err)
			return status.Error(codes.Unavailable, "internal error during initial fetch")
		}
		if nodeKV.value != nil {
			nodes[nodeKV.id] = nodeKV.value
		}
	}

	// Initial send, chunked to not go over 2MiB (half of the default gRPC message
	// size limit).
	//
	// TODO(q3k): formalize message limits, set const somewhere.
	we := &ipb.WatchEvent{}
	for _, n := range nodes {
		we.Nodes = append(we.Nodes, &ipb.Node{
			Id:     n.ID(),
			Roles:  n.proto().Roles,
			Status: n.status,
		})
		if proto.Size(we) > (2 << 20) {
			if err := srv.Send(we); err != nil {
				return err
			}
			we = &ipb.WatchEvent{}
		}
	}
	// Send last update message. This might be empty, but we need to send the
	// LAST_BACKLOGGED marker.
	we.Progress = ipb.WatchEvent_PROGRESS_LAST_BACKLOGGED
	if err := srv.Send(we); err != nil {
		return err
	}

	// Send updates as they arrive from the etcd watcher.
	for {
		nodeKV, err := w.Get(ctx)
		if err != nil {
			rpc.Trace(ctx).Printf("etcd watch failed (update): %v", err)
			return status.Errorf(codes.Unavailable, "internal error during update")
		}
		we := &ipb.WatchEvent{}
		nodeKV.appendToEvent(we)
		if err := srv.Send(we); err != nil {
			return err
		}
	}
}

// nodeAtID is a key/value pair container for a node update received from an
// etcd watcher. The value will be nil if this update represents a node being
// deleted.
type nodeAtID struct {
	id    string
	value *Node
}

// nodeValueConverter is called by etcd node value watchers to convert updates
// from the cluster into nodeAtID, ensuring data integrity and checking
// invariants.
func nodeValueConverter(key, value []byte) (*nodeAtID, error) {
	res := nodeAtID{
		id: nodeEtcdPrefix.ExtractID(string(key)),
	}
	if len(value) > 0 {
		node, err := nodeUnmarshal(value)
		if err != nil {
			return nil, err
		}
		res.value = node
		if res.id != res.value.ID() {
			return nil, fmt.Errorf("node ID mismatch (etcd key: %q, value: %q)", res.id, res.value.ID())
		}
	}
	if res.id == "" {
		// This shouldn't happen, to the point where this might be better handled by a
		// panic.
		return nil, fmt.Errorf("invalid node key %q", key)
	}
	return &res, nil
}

// appendToEvent records a node update represented by nodeAtID into a Curator
// WatchEvent, as either a Node or a NodeTombstone.
func (kv nodeAtID) appendToEvent(ev *ipb.WatchEvent) {
	if node := kv.value; node != nil {
		np := node.proto()
		ev.Nodes = append(ev.Nodes, &ipb.Node{
			Id:         node.ID(),
			Roles:      np.Roles,
			Status:     np.Status,
			Clusternet: np.Clusternet,
		})
	} else {
		ev.NodeTombstones = append(ev.NodeTombstones, &ipb.WatchEvent_NodeTombstone{
			NodeId: kv.id,
		})
	}
}

// UpdateNodeStatus is called by nodes in the cluster to report their own
// status. This status is recorded by the curator and can be retrieved via
// Watch.
func (l *leaderCurator) UpdateNodeStatus(ctx context.Context, req *ipb.UpdateNodeStatusRequest) (*ipb.UpdateNodeStatusResponse, error) {
	// Ensure that the given node_id matches the calling node. We currently
	// only allow for direct self-reporting of status by nodes.
	pi := rpc.GetPeerInfo(ctx)
	if pi == nil || pi.Node == nil {
		return nil, status.Error(codes.PermissionDenied, "only nodes can update node status")
	}
	id := identity.NodeID(pi.Node.PublicKey)
	if id != req.NodeId {
		return nil, status.Errorf(codes.PermissionDenied, "node %q cannot update the status of node %q", id, req.NodeId)
	}

	// Verify sent status. Currently we assume the entire status must be set at
	// once, and cannot be unset.
	if req.Status == nil || req.Status.ExternalAddress == "" {
		return nil, status.Errorf(codes.InvalidArgument, "Status and Status.ExternalAddress must be set")
	}

	if net.ParseIP(req.Status.ExternalAddress) == nil {
		return nil, status.Errorf(codes.InvalidArgument, "Status.ExternalAddress must be a valid IP address")
	}

	// As we're performing a node update with two etcd transactions below (one
	// to retrieve the node, one to save the updated node), take a local lock to
	// ensure that we don't have a race between either two UpdateNodeStatus calls
	// or an UpdateNodeStatus call and some other mutation to the node store.
	l.muNodes.Lock()
	defer l.muNodes.Unlock()

	// Retrieve node ...
	node, err := nodeLoad(ctx, l.leadership, id)
	if err != nil {
		return nil, err
	}
	// ... update its status ...
	node.status = req.Status
	node.status.Timestamp = tpb.Now()
	// ... and save it to etcd.
	if err := nodeSave(ctx, l.leadership, node); err != nil {
		return nil, err
	}

	return &ipb.UpdateNodeStatusResponse{}, nil
}

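// Heartbeat implements the node heartbeat stream. For every message received
// from the calling node, the leader records the time of receipt in its local
// heartbeat timestamp map and acknowledges with an empty
// HeartbeatUpdateResponse.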
func (l *leaderCurator) Heartbeat(stream ipb.Curator_HeartbeatServer) error {
	// Ensure the caller is a node, deriving its ID from its peer certificate.
	// We currently only allow nodes to self-report heartbeats.
	ctx := stream.Context()
	pi := rpc.GetPeerInfo(ctx)
	if pi == nil || pi.Node == nil {
		return status.Error(codes.PermissionDenied, "only nodes can send heartbeats")
	}
	id := identity.NodeID(pi.Node.PublicKey)

	for {
		_, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}

		// Update the node's timestamp within the local Curator state.
		l.ls.heartbeatTimestamps.Store(id, time.Now())

		rsp := &ipb.HeartbeatUpdateResponse{}
		if err := stream.Send(rsp); err != nil {
			return err
		}
	}
}

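// RegisterNode is called by prospective nodes over an unauthenticated
// (ephemeral, self-signed certificate) connection to begin the Register Flow.
// The caller must present a valid RegisterTicket and an ed25519 Join Key. On
// success, a new node is recorded in the NEW state, keyed by the public key of
// the ephemeral certificate, and the response carries the cluster
// configuration, the decided TPM usage and the recommended node storage
// security.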
func (l *leaderCurator) RegisterNode(ctx context.Context, req *ipb.RegisterNodeRequest) (*ipb.RegisterNodeResponse, error) {
	// Call is unauthenticated - verify the other side has connected with an
	// ephemeral certificate. That certificate's pubkey will become the node's
	// pubkey.
	pi := rpc.GetPeerInfo(ctx)
	if pi == nil || pi.Unauthenticated == nil || pi.Unauthenticated.SelfSignedPublicKey == nil {
		return nil, status.Error(codes.Unauthenticated, "connection must be established with a self-signed ephemeral certificate")
	}
	pubkey := pi.Unauthenticated.SelfSignedPublicKey

	// Check the Join Key size.
	if want, got := ed25519.PublicKeySize, len(req.JoinKey); want != got {
		return nil, status.Errorf(codes.InvalidArgument, "join_key must be set and be %d bytes long", want)
	}

	// Verify that the call contains a RegisterTicket and that this
	// RegisterTicket is valid.
	wantTicket, err := l.ensureRegisterTicket(ctx)
	if err != nil {
		rpc.Trace(ctx).Printf("could not ensure register ticket: %v", err)
		return nil, status.Error(codes.Unavailable, "could not retrieve register ticket")
	}
	gotTicket := req.RegisterTicket
	if subtle.ConstantTimeCompare(wantTicket, gotTicket) != 1 {
		return nil, status.Error(codes.PermissionDenied, "registerticket invalid")
	}

	// Doing a read-then-write operation below, take lock.
	//
	// MVP: This can lock up the cluster if too many RegisterNode calls get issued,
	// we should either ratelimit these or remove the need to lock.
	l.muNodes.Lock()
	defer l.muNodes.Unlock()

	cl, err := clusterLoad(ctx, l.leadership)
	if err != nil {
		return nil, err
	}
	nodeStorageSecurity, err := cl.NodeStorageSecurity()
	if err != nil {
		rpc.Trace(ctx).Printf("NodeStorageSecurity: %v", err)
		return nil, status.Error(codes.InvalidArgument, "cannot generate recommended node storage security")
	}

	// Figure out whether the node should be using a TPM.
	tpmUsage, err := cl.NodeTPMUsage(req.HaveLocalTpm)
	if err != nil {
		return nil, status.Errorf(codes.PermissionDenied, "%s", err)
	}

	// Check if there already is a node with this pubkey in the cluster.
	id := identity.NodeID(pubkey)
	node, err := nodeLoad(ctx, l.leadership, id)
	if err == nil {
		// If the existing node is in the NEW state already, there's nothing to do,
		// return no error. This can happen in case of spurious retries from the calling
		// node.
		if node.state == cpb.NodeState_NODE_STATE_NEW {
			return &ipb.RegisterNodeResponse{}, nil
		}
		// We can return a bit more information to the calling node here, as if it's in
		// possession of the private key corresponding to an existing node in the
		// cluster, it should have access to the status of the node without danger of
		// leaking data about other nodes.
		rpc.Trace(ctx).Printf("node %s already exists in cluster, failing", id)
		return nil, status.Errorf(codes.FailedPrecondition, "node already exists in cluster, state %s", node.state.String())
	}
	if err != errNodeNotFound {
		return nil, err
	}

	// No node exists, create one.
	node = &Node{
		pubkey:   pubkey,
		jkey:     req.JoinKey,
		state:    cpb.NodeState_NODE_STATE_NEW,
		tpmUsage: tpmUsage,
	}
	if err := nodeSave(ctx, l.leadership, node); err != nil {
		return nil, err
	}

	// Eat error, as we just deserialized this from a proto.
	clusterConfig, _ := cl.proto()
	return &ipb.RegisterNodeResponse{
		ClusterConfiguration:           clusterConfig,
		TpmUsage:                       tpmUsage,
		RecommendedNodeStorageSecurity: nodeStorageSecurity,
	}, nil
}

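// CommitNode is called by a registering node, still over an unauthenticated
// (ephemeral certificate) connection, to finish the Register Flow. The node
// must already be in the STANDBY state. The leader validates the requested
// storage security and (unless storage is INSECURE) the provided Cluster
// Unlock Key, emits a node certificate from the cluster CA, marks the node as
// UP and returns the CA and node certificates. The call is not idempotent:
// once the node has been marked UP, retries will fail.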
func (l *leaderCurator) CommitNode(ctx context.Context, req *ipb.CommitNodeRequest) (*ipb.CommitNodeResponse, error) {
	// Call is unauthenticated - verify the other side has connected with an
	// ephemeral certificate. That certificate's pubkey will become the node's
	// pubkey.
	pi := rpc.GetPeerInfo(ctx)
	if pi == nil || pi.Unauthenticated == nil || pi.Unauthenticated.SelfSignedPublicKey == nil {
		return nil, status.Error(codes.Unauthenticated, "connection must be established with a self-signed ephemeral certificate")
	}
	pubkey := pi.Unauthenticated.SelfSignedPublicKey

	// First-pass check of the requested node storage security, before loading
	// the cluster data and taking a lock on it.
	switch req.StorageSecurity {
	case cpb.NodeStorageSecurity_NODE_STORAGE_SECURITY_INSECURE:
	case cpb.NodeStorageSecurity_NODE_STORAGE_SECURITY_ENCRYPTED:
	case cpb.NodeStorageSecurity_NODE_STORAGE_SECURITY_AUTHENTICATED_ENCRYPTED:
	default:
		return nil, status.Error(codes.InvalidArgument, "invalid storage_security (is it set?)")
	}

	// Doing a read-then-write operation below, take lock.
	//
	// MVP: This can lock up the cluster if too many CommitNode calls get issued,
	// we should either ratelimit these or remove the need to lock.
	l.muNodes.Lock()
	defer l.muNodes.Unlock()

	cl, err := clusterLoad(ctx, l.leadership)
	if err != nil {
		return nil, err
	}
	if err := cl.ValidateNodeStorage(req.StorageSecurity); err != nil {
		return nil, err
	}

	// Retrieve the node and act on its current state, either returning early or
	// mutating it and continuing with the rest of the Commit logic.
	id := identity.NodeID(pubkey)
	node, err := nodeLoad(ctx, l.leadership, id)
	if err != nil {
		return nil, err
	}

	switch node.state {
	case cpb.NodeState_NODE_STATE_NEW:
		return nil, status.Error(codes.PermissionDenied, "node is NEW, wait for attestation/approval")
	case cpb.NodeState_NODE_STATE_DISOWNED:
		// This node has since been disowned by the cluster for some reason, the
		// register flow should be aborted.
		return nil, status.Error(codes.FailedPrecondition, "node is DISOWNED, abort register flow")
	case cpb.NodeState_NODE_STATE_UP:
		// This can happen due to a network failure when we already handled a
		// CommitNode, but we weren't able to respond to the user. CommitNode is
		// non-idempotent, so just abort, the node should retry from scratch and this
		// node should be manually disowned/deleted by system owners.
		return nil, status.Error(codes.FailedPrecondition, "node is already UP, abort register flow")
	case cpb.NodeState_NODE_STATE_STANDBY:
		// This is what we want.
	default:
		return nil, status.Errorf(codes.Internal, "node is in unknown state: %v", node.state)
	}

	// Check the given CUK is valid.
	// TODO(q3k): unify length with localstorage/crypt keySize.
	if req.StorageSecurity != cpb.NodeStorageSecurity_NODE_STORAGE_SECURITY_INSECURE {
		if want, got := 32, len(req.ClusterUnlockKey); want != got {
			return nil, status.Errorf(codes.InvalidArgument, "invalid ClusterUnlockKey length, wanted %d bytes, got %d", want, got)
		}
	}

	// Generate certificate for node, save new node state, return.

	// If this fails we are safe to let the client retry, as the PKI code is
	// idempotent.
	caCertBytes, err := pkiCA.Ensure(ctx, l.etcd)
	if err != nil {
		return nil, status.Errorf(codes.Unavailable, "could not get CA certificate: %v", err)
	}
	nodeCert := &pki.Certificate{
		Namespace: &pkiNamespace,
		Issuer:    pkiCA,
		Template:  identity.NodeCertificate(node.pubkey),
		Mode:      pki.CertificateExternal,
		PublicKey: node.pubkey,
		Name:      fmt.Sprintf("node-%s", identity.NodeID(pubkey)),
	}
	nodeCertBytes, err := nodeCert.Ensure(ctx, l.etcd)
	if err != nil {
		return nil, status.Errorf(codes.Unavailable, "could not emit node credentials: %v", err)
	}

	node.state = cpb.NodeState_NODE_STATE_UP
	node.clusterUnlockKey = req.ClusterUnlockKey
	if err := nodeSave(ctx, l.leadership, node); err != nil {
		return nil, err
	}

	// From this point on, any failure (in the server, in the network, ...)
	// prevents the node from making further progress in registering, as Commit
	// is non-idempotent.

	return &ipb.CommitNodeResponse{
		CaCertificate:   caCertBytes,
		NodeCertificate: nodeCertBytes,
	}, nil
}

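// JoinNode is called by a node that has already registered and been committed
// (is UP) when it needs to rejoin the cluster. The caller connects with an
// ephemeral certificate whose public key is the node's Join Key; the leader
// uses it to look the node up, cross-checks the cluster TPM mode and the
// node's recorded TPM usage against the sealed/unsealed configuration the node
// reports, and returns the node's Cluster Unlock Key, completing the Join Flow.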
func (l *leaderCurator) JoinNode(ctx context.Context, req *ipb.JoinNodeRequest) (*ipb.JoinNodeResponse, error) {
	// Gather peer information.
	pi := rpc.GetPeerInfo(ctx)
	if pi == nil || pi.Unauthenticated == nil || pi.Unauthenticated.SelfSignedPublicKey == nil {
		return nil, status.Error(codes.PermissionDenied, "connection must be established with a self-signed ephemeral certificate")
	}
	// The node will attempt to connect using its Join Key. jkey will contain
	// its public part.
	jkey := pi.Unauthenticated.SelfSignedPublicKey

	// Take the lock to prevent data races during the next step.
	l.muNodes.Lock()
	defer l.muNodes.Unlock()

	// Resolve the Node ID using the Join Key, then use the ID to load node
	// information from etcd.
	id, err := nodeIdByJoinKey(ctx, l.leadership, jkey)
	if err != nil {
		return nil, err
	}
	node, err := nodeLoad(ctx, l.leadership, id)
	if err != nil {
		return nil, err
	}

	cl, err := clusterLoad(ctx, l.leadership)
	if err != nil {
		return nil, err
	}

	switch cl.TPMMode {
	case cpb.ClusterConfiguration_TPM_MODE_REQUIRED:
		if !req.UsingSealedConfiguration {
			return nil, status.Errorf(codes.PermissionDenied, "cannot join this cluster with an unsealed configuration")
		}
	case cpb.ClusterConfiguration_TPM_MODE_DISABLED:
		if req.UsingSealedConfiguration {
			return nil, status.Errorf(codes.PermissionDenied, "cannot join this cluster with a sealed configuration")
		}
	}

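	// Beyond the cluster-wide TPM mode, also make sure the way this node joins
	// (sealed vs. unsealed configuration) matches how it was registered.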
	if node.tpmUsage == cpb.NodeTPMUsage_NODE_TPM_PRESENT_AND_USED && !req.UsingSealedConfiguration {
		return nil, status.Errorf(codes.PermissionDenied, "node registered with TPM, cannot join without one")
	}
	if node.tpmUsage != cpb.NodeTPMUsage_NODE_TPM_PRESENT_AND_USED && req.UsingSealedConfiguration {
		return nil, status.Errorf(codes.PermissionDenied, "node registered without TPM, cannot join with one")
	}

	// Don't progress further unless the node is already UP.
	if node.state != cpb.NodeState_NODE_STATE_UP {
		return nil, status.Errorf(codes.FailedPrecondition, "node isn't UP, cannot join")
	}

	// Return the Node's CUK, completing the Join Flow.
	return &ipb.JoinNodeResponse{
		ClusterUnlockKey: node.clusterUnlockKey,
	}, nil
}

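// GetCurrentLeader implements the CuratorLocal.GetCurrentLeader RPC as served
// by the leader itself: it sends a single response pointing at this node (its
// ID, its external address if one has been reported, and the curator service
// port), then keeps the stream open until the client goes away.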
func (l *leaderCurator) GetCurrentLeader(_ *ipb.GetCurrentLeaderRequest, srv ipb.CuratorLocal_GetCurrentLeaderServer) error {
	ctx := srv.Context()

	// We're the leader.
	node, err := nodeLoad(ctx, l.leadership, l.leaderID)
	if err != nil {
		rpc.Trace(ctx).Printf("nodeLoad(%q) failed: %v", l.leaderID, err)
		return status.Errorf(codes.Unavailable, "failed to load leader node")
	}
	host := ""
	if node.status != nil && node.status.ExternalAddress != "" {
		host = node.status.ExternalAddress
	}

	err = srv.Send(&ipb.GetCurrentLeaderResponse{
		LeaderNodeId: l.leaderID,
		LeaderHost:   host,
		LeaderPort:   int32(common.CuratorServicePort),
		ThisNodeId:   l.leaderID,
	})
	if err != nil {
		return err
	}

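	// Keep the stream open until the client disconnects or the call is
	// otherwise cancelled.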
	<-ctx.Done()
	rpc.Trace(ctx).Printf("Interrupting due to context cancellation")
	return nil
}