blob: 86a1af6e38812861f384fb9bd23061f518eafe34 [file] [log] [blame]
Serge Bazanski6bd41592021-08-23 13:18:37 +02001package curator
2
3import (
Serge Bazanskibc671d02021-10-05 17:53:32 +02004 "bytes"
Serge Bazanski6bd41592021-08-23 13:18:37 +02005 "context"
Serge Bazanski1612d4b2021-11-12 13:54:15 +01006 "crypto/ed25519"
Serge Bazanskibc671d02021-10-05 17:53:32 +02007 "sort"
Mateusz Zalega32b19292022-05-17 13:26:55 +02008 "time"
Serge Bazanski6bd41592021-08-23 13:18:37 +02009
Serge Bazanski6bd41592021-08-23 13:18:37 +020010 "google.golang.org/grpc/codes"
11 "google.golang.org/grpc/status"
Serge Bazanski6bd41592021-08-23 13:18:37 +020012
Serge Bazanski2f58ac02021-10-05 11:47:20 +020013 "source.monogon.dev/metropolis/node/core/identity"
Serge Bazanski5a637b02022-02-18 12:18:04 +010014 "source.monogon.dev/metropolis/node/core/rpc"
Serge Bazanski6bd41592021-08-23 13:18:37 +020015 apb "source.monogon.dev/metropolis/proto/api"
Serge Bazanskibc671d02021-10-05 17:53:32 +020016 cpb "source.monogon.dev/metropolis/proto/common"
Serge Bazanski6bd41592021-08-23 13:18:37 +020017)
18
19type leaderManagement struct {
Serge Bazanski3be48322021-10-05 17:24:26 +020020 *leadership
Serge Bazanski2f58ac02021-10-05 11:47:20 +020021
22 // node certificate on which leaderManagement runs. It's used by
23 // GetClusterInformation which needs access to the CA pubkey.
24 // Alternatively this could be stored in etcd, instead of being dependency
25 // injected here.
26 node *identity.Node
Serge Bazanski6bd41592021-08-23 13:18:37 +020027}
28
const (
	// registerTicketSize is the size, in bytes, of the RegisterTicket used to
	// perform early perimeter checks for nodes which wish to register into the
	// cluster.
	//
	// The size was picked to offer resistance against on-line bruteforcing attacks
	// in even the worst case scenario (no ratelimiting, no monitoring, zero latency
	// between attacker and cluster). 256 bits of entropy require 3.6e68 requests
	// per second to bruteforce the ticket within 10 years. The ticket doesn't need
	// to be manually copied by humans, so the relatively overkill size also doesn't
	// impact usability.
	registerTicketSize = 32

	// HeartbeatInterval is the duration between consecutive heartbeat update
	// messages sent by the node.
	HeartbeatInterval = time.Second * 5

	// HeartbeatTimeout is the duration after which a node is considered to be
	// timing out, given no recent heartbeat updates were received by the leader.
	// It is defined as twice HeartbeatInterval.
	HeartbeatTimeout = HeartbeatInterval * 2
)
50
// registerTicketEtcdPath is the etcd key at which the cluster's
// private.RegisterTicket is persisted.
const registerTicketEtcdPath = "/global/register_ticket"
56
57func (l *leaderManagement) GetRegisterTicket(ctx context.Context, req *apb.GetRegisterTicketRequest) (*apb.GetRegisterTicketResponse, error) {
Serge Bazanski516d3002021-10-01 00:05:41 +020058 ticket, err := l.ensureRegisterTicket(ctx)
Serge Bazanski6bd41592021-08-23 13:18:37 +020059 if err != nil {
Serge Bazanski516d3002021-10-01 00:05:41 +020060 return nil, err
Serge Bazanski6bd41592021-08-23 13:18:37 +020061 }
Serge Bazanski6bd41592021-08-23 13:18:37 +020062 return &apb.GetRegisterTicketResponse{
Serge Bazanski516d3002021-10-01 00:05:41 +020063 Ticket: ticket,
Serge Bazanski6bd41592021-08-23 13:18:37 +020064 }, nil
65}
Serge Bazanskibc671d02021-10-05 17:53:32 +020066
Serge Bazanski56114472021-10-11 14:47:54 +020067// GetClusterInfo implements Management.GetClusterInfo, which returns summary
Serge Bazanskibc671d02021-10-05 17:53:32 +020068// information about the Metropolis cluster.
69func (l *leaderManagement) GetClusterInfo(ctx context.Context, req *apb.GetClusterInfoRequest) (*apb.GetClusterInfoResponse, error) {
70 res, err := l.txnAsLeader(ctx, nodeEtcdPrefix.Range())
71 if err != nil {
72 return nil, status.Errorf(codes.Unavailable, "could not retrieve list of nodes: %v", err)
73 }
74
75 // Sort nodes by public key, filter out Up, use top 15 in cluster directory
76 // (limited to an arbitrary amount that doesn't overload callers with
77 // unnecesssary information).
78 //
79 // MVP: this should be formalized and possibly re-designed/engineered.
80 kvs := res.Responses[0].GetResponseRange().Kvs
81 var nodes []*Node
82 for _, kv := range kvs {
83 node, err := nodeUnmarshal(kv.Value)
84 if err != nil {
Serge Bazanski5a637b02022-02-18 12:18:04 +010085 rpc.Trace(ctx).Printf("Unmarshalling node %q failed: %v", kv.Value, err)
Serge Bazanskibc671d02021-10-05 17:53:32 +020086 continue
87 }
88 if node.state != cpb.NodeState_NODE_STATE_UP {
89 continue
90 }
91 nodes = append(nodes, node)
92 }
93 sort.Slice(nodes, func(i, j int) bool {
94 return bytes.Compare(nodes[i].pubkey, nodes[j].pubkey) < 0
95 })
96 if len(nodes) > 15 {
97 nodes = nodes[:15]
98 }
99
100 // Build cluster directory.
101 directory := &cpb.ClusterDirectory{
102 Nodes: make([]*cpb.ClusterDirectory_Node, len(nodes)),
103 }
104 for i, node := range nodes {
105 var addresses []*cpb.ClusterDirectory_Node_Address
106 if node.status != nil && node.status.ExternalAddress != "" {
107 addresses = append(addresses, &cpb.ClusterDirectory_Node_Address{
108 Host: node.status.ExternalAddress,
109 })
110 }
111 directory.Nodes[i] = &cpb.ClusterDirectory_Node{
112 PublicKey: node.pubkey,
113 Addresses: addresses,
114 }
115 }
116
117 return &apb.GetClusterInfoResponse{
118 ClusterDirectory: directory,
Serge Bazanskifbd38e22021-10-08 14:41:16 +0200119 CaCertificate: l.node.ClusterCA().Raw,
Serge Bazanskibc671d02021-10-05 17:53:32 +0200120 }, nil
121}
Serge Bazanski56114472021-10-11 14:47:54 +0200122
Mateusz Zalega32b19292022-05-17 13:26:55 +0200123// nodeHeartbeatTimestamp returns the node nid's last heartbeat timestamp, as
124// seen from the Curator leader's perspective. If no heartbeats were received
125// from the node, a zero time.Time value is returned.
126func (l *leaderManagement) nodeHeartbeatTimestamp(nid string) time.Time {
127 smv, ok := l.ls.heartbeatTimestamps.Load(nid)
128 if ok {
129 return smv.(time.Time)
130 }
131 return time.Time{}
132}
133
134// nodeHealth returns the node's health, along with the duration since last
135// heartbeat was received, given a current timestamp.
136func (l *leaderManagement) nodeHealth(node *Node, now time.Time) (apb.Node_Health, time.Duration) {
137 // Get the last received node heartbeat's timestamp.
138 nid := identity.NodeID(node.pubkey)
139 nts := l.nodeHeartbeatTimestamp(nid)
140 // lhb is the duration since the last heartbeat was received.
141 lhb := now.Sub(nts)
142 // Determine the node's health based on the heartbeat timestamp.
143 var nh apb.Node_Health
144 if node.state == cpb.NodeState_NODE_STATE_UP {
145 // Only UP nodes can send heartbeats.
146 switch {
147 // If no heartbeats were received, but the leadership has only just
148 // started, the node's health is unknown.
149 case nts.IsZero() && (now.Sub(l.ls.startTs) < HeartbeatTimeout):
150 nh = apb.Node_UNKNOWN
151 // If the leader had received heartbeats from the node, but the last
152 // heartbeat is stale, the node is timing out.
153 case lhb > HeartbeatTimeout:
154 nh = apb.Node_HEARTBEAT_TIMEOUT
155 // Otherwise, the node can be declared healthy.
156 default:
157 nh = apb.Node_HEALTHY
158 }
159 } else {
160 // Since node isn't UP, its health is unknown. Non-UP nodes can't access
161 // the heartbeat RPC.
162 nh = apb.Node_UNKNOWN
163 }
164 return nh, lhb
165}
166
Serge Bazanski56114472021-10-11 14:47:54 +0200167// GetNodes implements Management.GetNodes, which returns a list of nodes from
168// the point of view of the cluster.
169func (l *leaderManagement) GetNodes(_ *apb.GetNodesRequest, srv apb.Management_GetNodesServer) error {
170 ctx := srv.Context()
171
172 l.muNodes.Lock()
173 defer l.muNodes.Unlock()
174
175 // Retrieve all nodes from etcd in a single Get call.
176 res, err := l.txnAsLeader(ctx, nodeEtcdPrefix.Range())
177 if err != nil {
178 return status.Errorf(codes.Unavailable, "could not retrieve list of nodes: %v", err)
179 }
180
Mateusz Zalega32b19292022-05-17 13:26:55 +0200181 // Get a singular monotonic timestamp to reference node heartbeat timestamps
182 // against.
183 now := time.Now()
184
Serge Bazanski56114472021-10-11 14:47:54 +0200185 // Convert etcd data into proto nodes, send one streaming response for each
186 // node.
187 kvs := res.Responses[0].GetResponseRange().Kvs
188 for _, kv := range kvs {
189 node, err := nodeUnmarshal(kv.Value)
190 if err != nil {
Serge Bazanski5a637b02022-02-18 12:18:04 +0100191 rpc.Trace(ctx).Printf("Unmarshalling node %q failed: %v", kv.Value, err)
Serge Bazanski56114472021-10-11 14:47:54 +0200192 continue
193 }
194
195 // Convert node roles.
196 roles := &cpb.NodeRoles{}
197 if node.kubernetesWorker != nil {
198 roles.KubernetesWorker = &cpb.NodeRoles_KubernetesWorker{}
199 }
200
Mateusz Zalega32b19292022-05-17 13:26:55 +0200201 // Assess the node's health.
202 health, lhb := l.nodeHealth(node, now)
203
Serge Bazanski56114472021-10-11 14:47:54 +0200204 if err := srv.Send(&apb.Node{
Mateusz Zalega32b19292022-05-17 13:26:55 +0200205 Pubkey: node.pubkey,
206 State: node.state,
207 Status: node.status,
208 Roles: roles,
209 HeartbeatTimestamp: lhb.Nanoseconds(),
210 Health: health,
Serge Bazanski56114472021-10-11 14:47:54 +0200211 }); err != nil {
212 return err
213 }
214 }
215
216 return nil
217}
Serge Bazanski1612d4b2021-11-12 13:54:15 +0100218
219func (l *leaderManagement) ApproveNode(ctx context.Context, req *apb.ApproveNodeRequest) (*apb.ApproveNodeResponse, error) {
220 // MVP: check if policy allows for this node to be approved for this cluster.
221 // This should happen automatically, if possible, via hardware attestation
222 // against policy, not manually.
223
Serge Bazanskicb1e4da2021-11-11 16:42:52 +0100224 // Ensure the given key resembles a public key before using it to generate
225 // a node iD. This key is then used to craft an arbitrary etcd path, so
226 // let's do an early check in case the user set something that's obviously
227 // not a public key.
Serge Bazanski1612d4b2021-11-12 13:54:15 +0100228 if len(req.Pubkey) != ed25519.PublicKeySize {
229 return nil, status.Errorf(codes.InvalidArgument, "pubkey must be %d bytes long", ed25519.PublicKeySize)
230 }
231
232 l.muNodes.Lock()
233 defer l.muNodes.Unlock()
234
Serge Bazanskicb1e4da2021-11-11 16:42:52 +0100235 // Find node for this pubkey.
Serge Bazanski1612d4b2021-11-12 13:54:15 +0100236 id := identity.NodeID(req.Pubkey)
Serge Bazanski030a5512021-11-18 16:39:39 +0100237 node, err := nodeLoad(ctx, l.leadership, id)
Serge Bazanski1612d4b2021-11-12 13:54:15 +0100238 if err != nil {
Serge Bazanski030a5512021-11-18 16:39:39 +0100239 return nil, err
Serge Bazanski1612d4b2021-11-12 13:54:15 +0100240 }
241
242 // Ensure node is either UP/STANDBY (no-op) or NEW (set to STANDBY).
243 switch node.state {
244 case cpb.NodeState_NODE_STATE_UP, cpb.NodeState_NODE_STATE_STANDBY:
245 // No-op for idempotency.
246 return &apb.ApproveNodeResponse{}, nil
247 case cpb.NodeState_NODE_STATE_NEW:
248 // What we can act on.
249 default:
250 return nil, status.Errorf(codes.FailedPrecondition, "node in state %s cannot be approved", node.state)
251 }
252
Serge Bazanskicb1e4da2021-11-11 16:42:52 +0100253 // Set node to be STANDBY.
Serge Bazanski1612d4b2021-11-12 13:54:15 +0100254 node.state = cpb.NodeState_NODE_STATE_STANDBY
Serge Bazanski030a5512021-11-18 16:39:39 +0100255 if err := nodeSave(ctx, l.leadership, node); err != nil {
256 return nil, err
Serge Bazanski1612d4b2021-11-12 13:54:15 +0100257 }
258
259 return &apb.ApproveNodeResponse{}, nil
260}