| Serge Bazanski | 6bd4159 | 2021-08-23 13:18:37 +0200 | [diff] [blame] | 1 | package curator | 
 | 2 |  | 
 | 3 | import ( | 
| Serge Bazanski | bc671d0 | 2021-10-05 17:53:32 +0200 | [diff] [blame] | 4 | 	"bytes" | 
| Serge Bazanski | 6bd4159 | 2021-08-23 13:18:37 +0200 | [diff] [blame] | 5 | 	"context" | 
| Serge Bazanski | 1612d4b | 2021-11-12 13:54:15 +0100 | [diff] [blame] | 6 | 	"crypto/ed25519" | 
| Serge Bazanski | bc671d0 | 2021-10-05 17:53:32 +0200 | [diff] [blame] | 7 | 	"sort" | 
| Mateusz Zalega | 32b1929 | 2022-05-17 13:26:55 +0200 | [diff] [blame] | 8 | 	"time" | 
| Serge Bazanski | 6bd4159 | 2021-08-23 13:18:37 +0200 | [diff] [blame] | 9 |  | 
| Serge Bazanski | 6bd4159 | 2021-08-23 13:18:37 +0200 | [diff] [blame] | 10 | 	"google.golang.org/grpc/codes" | 
 | 11 | 	"google.golang.org/grpc/status" | 
| Serge Bazanski | 6bd4159 | 2021-08-23 13:18:37 +0200 | [diff] [blame] | 12 |  | 
| Serge Bazanski | 2f58ac0 | 2021-10-05 11:47:20 +0200 | [diff] [blame] | 13 | 	"source.monogon.dev/metropolis/node/core/identity" | 
| Serge Bazanski | 5a637b0 | 2022-02-18 12:18:04 +0100 | [diff] [blame] | 14 | 	"source.monogon.dev/metropolis/node/core/rpc" | 
| Serge Bazanski | 6bd4159 | 2021-08-23 13:18:37 +0200 | [diff] [blame] | 15 | 	apb "source.monogon.dev/metropolis/proto/api" | 
| Serge Bazanski | bc671d0 | 2021-10-05 17:53:32 +0200 | [diff] [blame] | 16 | 	cpb "source.monogon.dev/metropolis/proto/common" | 
| Serge Bazanski | 6bd4159 | 2021-08-23 13:18:37 +0200 | [diff] [blame] | 17 | ) | 
 | 18 |  | 
// leaderManagement carries the state needed by the management RPC handlers
// defined in this file (GetRegisterTicket, GetClusterInfo, GetNodes and
// ApproveNode). It embeds *leadership, through which those handlers reach
// etcd transactions, the node mutex and the leader-local heartbeat state.
type leaderManagement struct {
	*leadership

	// node is the identity of the node on which leaderManagement runs. It's
	// used by GetClusterInfo, which returns the cluster CA certificate
	// obtained from it.
	// Alternatively this could be stored in etcd, instead of being dependency
	// injected here.
	node *identity.Node
}
 | 28 |  | 
const (
	// registerTicketSize is the size, in bytes, of the RegisterTicket used to
	// perform early perimeter checks for nodes which wish to register into the
	// cluster.
	//
	// The size was picked to offer resistance against on-line bruteforcing attacks
	// in even the worst case scenario (no ratelimiting, no monitoring, zero latency
	// between attacker and cluster). 256 bits of entropy require 3.6e68 requests
	// per second to bruteforce the ticket within 10 years. The ticket doesn't need
	// to be manually copied by humans, so the relatively overkill size also doesn't
	// impact usability.
	registerTicketSize = 32

	// HeartbeatInterval is the duration between consecutive heartbeat update
	// messages sent by the node.
	HeartbeatInterval = time.Second * 5

	// HeartbeatTimeout is the duration after which a node is considered to be
	// timing out, given no recent heartbeat updates were received by the leader.
	// It is defined as two heartbeat intervals.
	HeartbeatTimeout = HeartbeatInterval * 2
)
 | 50 |  | 
const (
	// registerTicketEtcdPath is the etcd key under which the cluster's
	// private.RegisterTicket is stored.
	registerTicketEtcdPath = "/global/register_ticket"
)
 | 56 |  | 
 | 57 | func (l *leaderManagement) GetRegisterTicket(ctx context.Context, req *apb.GetRegisterTicketRequest) (*apb.GetRegisterTicketResponse, error) { | 
| Serge Bazanski | 516d300 | 2021-10-01 00:05:41 +0200 | [diff] [blame] | 58 | 	ticket, err := l.ensureRegisterTicket(ctx) | 
| Serge Bazanski | 6bd4159 | 2021-08-23 13:18:37 +0200 | [diff] [blame] | 59 | 	if err != nil { | 
| Serge Bazanski | 516d300 | 2021-10-01 00:05:41 +0200 | [diff] [blame] | 60 | 		return nil, err | 
| Serge Bazanski | 6bd4159 | 2021-08-23 13:18:37 +0200 | [diff] [blame] | 61 | 	} | 
| Serge Bazanski | 6bd4159 | 2021-08-23 13:18:37 +0200 | [diff] [blame] | 62 | 	return &apb.GetRegisterTicketResponse{ | 
| Serge Bazanski | 516d300 | 2021-10-01 00:05:41 +0200 | [diff] [blame] | 63 | 		Ticket: ticket, | 
| Serge Bazanski | 6bd4159 | 2021-08-23 13:18:37 +0200 | [diff] [blame] | 64 | 	}, nil | 
 | 65 | } | 
| Serge Bazanski | bc671d0 | 2021-10-05 17:53:32 +0200 | [diff] [blame] | 66 |  | 
| Serge Bazanski | 5611447 | 2021-10-11 14:47:54 +0200 | [diff] [blame] | 67 | // GetClusterInfo implements Management.GetClusterInfo, which returns summary | 
| Serge Bazanski | bc671d0 | 2021-10-05 17:53:32 +0200 | [diff] [blame] | 68 | // information about the Metropolis cluster. | 
 | 69 | func (l *leaderManagement) GetClusterInfo(ctx context.Context, req *apb.GetClusterInfoRequest) (*apb.GetClusterInfoResponse, error) { | 
 | 70 | 	res, err := l.txnAsLeader(ctx, nodeEtcdPrefix.Range()) | 
 | 71 | 	if err != nil { | 
 | 72 | 		return nil, status.Errorf(codes.Unavailable, "could not retrieve list of nodes: %v", err) | 
 | 73 | 	} | 
 | 74 |  | 
 | 75 | 	// Sort nodes by public key, filter out Up, use top 15 in cluster directory | 
 | 76 | 	// (limited to an arbitrary amount that doesn't overload callers with | 
 | 77 | 	// unnecesssary information). | 
 | 78 | 	// | 
 | 79 | 	// MVP: this should be formalized and possibly re-designed/engineered. | 
 | 80 | 	kvs := res.Responses[0].GetResponseRange().Kvs | 
 | 81 | 	var nodes []*Node | 
 | 82 | 	for _, kv := range kvs { | 
 | 83 | 		node, err := nodeUnmarshal(kv.Value) | 
 | 84 | 		if err != nil { | 
| Serge Bazanski | 5a637b0 | 2022-02-18 12:18:04 +0100 | [diff] [blame] | 85 | 			rpc.Trace(ctx).Printf("Unmarshalling node %q failed: %v", kv.Value, err) | 
| Serge Bazanski | bc671d0 | 2021-10-05 17:53:32 +0200 | [diff] [blame] | 86 | 			continue | 
 | 87 | 		} | 
 | 88 | 		if node.state != cpb.NodeState_NODE_STATE_UP { | 
 | 89 | 			continue | 
 | 90 | 		} | 
 | 91 | 		nodes = append(nodes, node) | 
 | 92 | 	} | 
 | 93 | 	sort.Slice(nodes, func(i, j int) bool { | 
 | 94 | 		return bytes.Compare(nodes[i].pubkey, nodes[j].pubkey) < 0 | 
 | 95 | 	}) | 
 | 96 | 	if len(nodes) > 15 { | 
 | 97 | 		nodes = nodes[:15] | 
 | 98 | 	} | 
 | 99 |  | 
 | 100 | 	// Build cluster directory. | 
 | 101 | 	directory := &cpb.ClusterDirectory{ | 
 | 102 | 		Nodes: make([]*cpb.ClusterDirectory_Node, len(nodes)), | 
 | 103 | 	} | 
 | 104 | 	for i, node := range nodes { | 
 | 105 | 		var addresses []*cpb.ClusterDirectory_Node_Address | 
 | 106 | 		if node.status != nil && node.status.ExternalAddress != "" { | 
 | 107 | 			addresses = append(addresses, &cpb.ClusterDirectory_Node_Address{ | 
 | 108 | 				Host: node.status.ExternalAddress, | 
 | 109 | 			}) | 
 | 110 | 		} | 
 | 111 | 		directory.Nodes[i] = &cpb.ClusterDirectory_Node{ | 
 | 112 | 			PublicKey: node.pubkey, | 
 | 113 | 			Addresses: addresses, | 
 | 114 | 		} | 
 | 115 | 	} | 
 | 116 |  | 
 | 117 | 	return &apb.GetClusterInfoResponse{ | 
 | 118 | 		ClusterDirectory: directory, | 
| Serge Bazanski | fbd38e2 | 2021-10-08 14:41:16 +0200 | [diff] [blame] | 119 | 		CaCertificate:    l.node.ClusterCA().Raw, | 
| Serge Bazanski | bc671d0 | 2021-10-05 17:53:32 +0200 | [diff] [blame] | 120 | 	}, nil | 
 | 121 | } | 
| Serge Bazanski | 5611447 | 2021-10-11 14:47:54 +0200 | [diff] [blame] | 122 |  | 
| Mateusz Zalega | 32b1929 | 2022-05-17 13:26:55 +0200 | [diff] [blame] | 123 | // nodeHeartbeatTimestamp returns the node nid's last heartbeat timestamp, as | 
 | 124 | // seen from the Curator leader's perspective. If no heartbeats were received | 
 | 125 | // from the node, a zero time.Time value is returned. | 
 | 126 | func (l *leaderManagement) nodeHeartbeatTimestamp(nid string) time.Time { | 
 | 127 | 	smv, ok := l.ls.heartbeatTimestamps.Load(nid) | 
 | 128 | 	if ok { | 
 | 129 | 		return smv.(time.Time) | 
 | 130 | 	} | 
 | 131 | 	return time.Time{} | 
 | 132 | } | 
 | 133 |  | 
 | 134 | // nodeHealth returns the node's health, along with the duration since last | 
 | 135 | // heartbeat was received, given a current timestamp. | 
 | 136 | func (l *leaderManagement) nodeHealth(node *Node, now time.Time) (apb.Node_Health, time.Duration) { | 
 | 137 |   // Get the last received node heartbeat's timestamp. | 
 | 138 | 	nid := identity.NodeID(node.pubkey) | 
 | 139 | 	nts := l.nodeHeartbeatTimestamp(nid) | 
 | 140 | 	// lhb is the duration since the last heartbeat was received. | 
 | 141 | 	lhb := now.Sub(nts) | 
 | 142 | 	// Determine the node's health based on the heartbeat timestamp. | 
 | 143 | 	var nh apb.Node_Health | 
 | 144 | 	if node.state == cpb.NodeState_NODE_STATE_UP { | 
 | 145 | 		// Only UP nodes can send heartbeats. | 
 | 146 | 		switch { | 
 | 147 | 		// If no heartbeats were received, but the leadership has only just | 
 | 148 | 		// started, the node's health is unknown. | 
 | 149 | 		case nts.IsZero() && (now.Sub(l.ls.startTs) < HeartbeatTimeout): | 
 | 150 | 			nh = apb.Node_UNKNOWN | 
 | 151 | 		// If the leader had received heartbeats from the node, but the last | 
 | 152 | 		// heartbeat is stale, the node is timing out. | 
 | 153 | 		case lhb > HeartbeatTimeout: | 
 | 154 | 			nh = apb.Node_HEARTBEAT_TIMEOUT | 
 | 155 | 		// Otherwise, the node can be declared healthy. | 
 | 156 | 		default: | 
 | 157 | 			nh = apb.Node_HEALTHY | 
 | 158 | 		} | 
 | 159 | 	} else { | 
 | 160 | 		// Since node isn't UP, its health is unknown. Non-UP nodes can't access | 
 | 161 | 		// the heartbeat RPC. | 
 | 162 | 		nh = apb.Node_UNKNOWN | 
 | 163 | 	} | 
 | 164 | 	return nh, lhb | 
 | 165 | } | 
 | 166 |  | 
| Serge Bazanski | 5611447 | 2021-10-11 14:47:54 +0200 | [diff] [blame] | 167 | // GetNodes implements Management.GetNodes, which returns a list of nodes from | 
 | 168 | // the point of view of the cluster. | 
 | 169 | func (l *leaderManagement) GetNodes(_ *apb.GetNodesRequest, srv apb.Management_GetNodesServer) error { | 
 | 170 | 	ctx := srv.Context() | 
 | 171 |  | 
 | 172 | 	l.muNodes.Lock() | 
 | 173 | 	defer l.muNodes.Unlock() | 
 | 174 |  | 
 | 175 | 	// Retrieve all nodes from etcd in a single Get call. | 
 | 176 | 	res, err := l.txnAsLeader(ctx, nodeEtcdPrefix.Range()) | 
 | 177 | 	if err != nil { | 
 | 178 | 		return status.Errorf(codes.Unavailable, "could not retrieve list of nodes: %v", err) | 
 | 179 | 	} | 
 | 180 |  | 
| Mateusz Zalega | 32b1929 | 2022-05-17 13:26:55 +0200 | [diff] [blame] | 181 | 	// Get a singular monotonic timestamp to reference node heartbeat timestamps | 
 | 182 | 	// against. | 
 | 183 | 	now := time.Now() | 
 | 184 |  | 
| Serge Bazanski | 5611447 | 2021-10-11 14:47:54 +0200 | [diff] [blame] | 185 | 	// Convert etcd data into proto nodes, send one streaming response for each | 
 | 186 | 	// node. | 
 | 187 | 	kvs := res.Responses[0].GetResponseRange().Kvs | 
 | 188 | 	for _, kv := range kvs { | 
 | 189 | 		node, err := nodeUnmarshal(kv.Value) | 
 | 190 | 		if err != nil { | 
| Serge Bazanski | 5a637b0 | 2022-02-18 12:18:04 +0100 | [diff] [blame] | 191 | 			rpc.Trace(ctx).Printf("Unmarshalling node %q failed: %v", kv.Value, err) | 
| Serge Bazanski | 5611447 | 2021-10-11 14:47:54 +0200 | [diff] [blame] | 192 | 			continue | 
 | 193 | 		} | 
 | 194 |  | 
 | 195 | 		// Convert node roles. | 
 | 196 | 		roles := &cpb.NodeRoles{} | 
 | 197 | 		if node.kubernetesWorker != nil { | 
 | 198 | 			roles.KubernetesWorker = &cpb.NodeRoles_KubernetesWorker{} | 
 | 199 | 		} | 
 | 200 |  | 
| Mateusz Zalega | 32b1929 | 2022-05-17 13:26:55 +0200 | [diff] [blame] | 201 | 		// Assess the node's health. | 
 | 202 | 		health, lhb := l.nodeHealth(node, now) | 
 | 203 |  | 
| Serge Bazanski | 5611447 | 2021-10-11 14:47:54 +0200 | [diff] [blame] | 204 | 		if err := srv.Send(&apb.Node{ | 
| Mateusz Zalega | 32b1929 | 2022-05-17 13:26:55 +0200 | [diff] [blame] | 205 | 			Pubkey:             node.pubkey, | 
 | 206 | 			State:              node.state, | 
 | 207 | 			Status:             node.status, | 
 | 208 | 			Roles:              roles, | 
 | 209 | 			HeartbeatTimestamp: lhb.Nanoseconds(), | 
 | 210 | 			Health:             health, | 
| Serge Bazanski | 5611447 | 2021-10-11 14:47:54 +0200 | [diff] [blame] | 211 | 		}); err != nil { | 
 | 212 | 			return err | 
 | 213 | 		} | 
 | 214 | 	} | 
 | 215 |  | 
 | 216 | 	return nil | 
 | 217 | } | 
| Serge Bazanski | 1612d4b | 2021-11-12 13:54:15 +0100 | [diff] [blame] | 218 |  | 
 | 219 | func (l *leaderManagement) ApproveNode(ctx context.Context, req *apb.ApproveNodeRequest) (*apb.ApproveNodeResponse, error) { | 
 | 220 | 	// MVP: check if policy allows for this node to be approved for this cluster. | 
 | 221 | 	// This should happen automatically, if possible, via hardware attestation | 
 | 222 | 	// against policy, not manually. | 
 | 223 |  | 
| Serge Bazanski | cb1e4da | 2021-11-11 16:42:52 +0100 | [diff] [blame] | 224 | 	// Ensure the given key resembles a public key before using it to generate | 
 | 225 | 	// a node iD. This key is then used to craft an arbitrary etcd path, so | 
 | 226 | 	// let's do an early check in case the user set something that's obviously | 
 | 227 | 	// not a public key. | 
| Serge Bazanski | 1612d4b | 2021-11-12 13:54:15 +0100 | [diff] [blame] | 228 | 	if len(req.Pubkey) != ed25519.PublicKeySize { | 
 | 229 | 		return nil, status.Errorf(codes.InvalidArgument, "pubkey must be %d bytes long", ed25519.PublicKeySize) | 
 | 230 | 	} | 
 | 231 |  | 
 | 232 | 	l.muNodes.Lock() | 
 | 233 | 	defer l.muNodes.Unlock() | 
 | 234 |  | 
| Serge Bazanski | cb1e4da | 2021-11-11 16:42:52 +0100 | [diff] [blame] | 235 | 	// Find node for this pubkey. | 
| Serge Bazanski | 1612d4b | 2021-11-12 13:54:15 +0100 | [diff] [blame] | 236 | 	id := identity.NodeID(req.Pubkey) | 
| Serge Bazanski | 030a551 | 2021-11-18 16:39:39 +0100 | [diff] [blame] | 237 | 	node, err := nodeLoad(ctx, l.leadership, id) | 
| Serge Bazanski | 1612d4b | 2021-11-12 13:54:15 +0100 | [diff] [blame] | 238 | 	if err != nil { | 
| Serge Bazanski | 030a551 | 2021-11-18 16:39:39 +0100 | [diff] [blame] | 239 | 		return nil, err | 
| Serge Bazanski | 1612d4b | 2021-11-12 13:54:15 +0100 | [diff] [blame] | 240 | 	} | 
 | 241 |  | 
 | 242 | 	// Ensure node is either UP/STANDBY (no-op) or NEW (set to STANDBY). | 
 | 243 | 	switch node.state { | 
 | 244 | 	case cpb.NodeState_NODE_STATE_UP, cpb.NodeState_NODE_STATE_STANDBY: | 
 | 245 | 		// No-op for idempotency. | 
 | 246 | 		return &apb.ApproveNodeResponse{}, nil | 
 | 247 | 	case cpb.NodeState_NODE_STATE_NEW: | 
 | 248 | 		// What we can act on. | 
 | 249 | 	default: | 
 | 250 | 		return nil, status.Errorf(codes.FailedPrecondition, "node in state %s cannot be approved", node.state) | 
 | 251 | 	} | 
 | 252 |  | 
| Serge Bazanski | cb1e4da | 2021-11-11 16:42:52 +0100 | [diff] [blame] | 253 | 	// Set node to be STANDBY. | 
| Serge Bazanski | 1612d4b | 2021-11-12 13:54:15 +0100 | [diff] [blame] | 254 | 	node.state = cpb.NodeState_NODE_STATE_STANDBY | 
| Serge Bazanski | 030a551 | 2021-11-18 16:39:39 +0100 | [diff] [blame] | 255 | 	if err := nodeSave(ctx, l.leadership, node); err != nil { | 
 | 256 | 		return nil, err | 
| Serge Bazanski | 1612d4b | 2021-11-12 13:54:15 +0100 | [diff] [blame] | 257 | 	} | 
 | 258 |  | 
 | 259 | 	return &apb.ApproveNodeResponse{}, nil | 
 | 260 | } |