package curator
import (
"bytes"
"context"
"crypto/ed25519"
"sort"
"time"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
dpb "google.golang.org/protobuf/types/known/durationpb"
"source.monogon.dev/metropolis/node/core/consensus"
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/rpc"
apb "source.monogon.dev/metropolis/proto/api"
cpb "source.monogon.dev/metropolis/proto/common"
)
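// leaderManagement implements the Management gRPC service as served by the
// curator leader.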
type leaderManagement struct {
*leadership
// node identity on which leaderManagement runs. It's used by
// GetClusterInfo, which needs access to the cluster CA certificate.
// Alternatively this could be stored in etcd, instead of being dependency
// injected here.
node *identity.Node
}
const (
// registerTicketSize is the size, in bytes, of the RegisterTicket used to
// perform early perimeter checks for nodes which wish to register into the
// cluster.
//
// The size was picked to offer resistance against on-line bruteforcing attacks
// in even the worst case scenario (no ratelimiting, no monitoring, zero latency
// between attacker and cluster). 256 bits of entropy require 3.6e68 requests
// per second to bruteforce the ticket within 10 years. The ticket doesn't need
// to be manually copied by humans, so the relatively overkill size also doesn't
// impact usability.
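//
// (For scale: 2^256 possible tickets divided by the ~3.2e8 seconds in 10
// years works out to roughly 3.6e68 guesses per second.)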
registerTicketSize = 32
// HeartbeatInterval is the duration between consecutive heartbeat update
// messages sent by the node.
HeartbeatInterval = time.Second * 5
// HeartbeatTimeout is the duration after which a node is considered to be
// timing out, given that the leader has received no recent heartbeat updates
// from it.
HeartbeatTimeout = HeartbeatInterval * 2
)
const (
// registerTicketEtcdPath is the etcd key under which private.RegisterTicket is
// stored.
registerTicketEtcdPath = "/global/register_ticket"
)
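// GetRegisterTicket implements Management.GetRegisterTicket, which returns
// the cluster's RegisterTicket, used to perform early perimeter checks for
// nodes which wish to register into the cluster.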
func (l *leaderManagement) GetRegisterTicket(ctx context.Context, req *apb.GetRegisterTicketRequest) (*apb.GetRegisterTicketResponse, error) {
ticket, err := l.ensureRegisterTicket(ctx)
if err != nil {
return nil, err
}
return &apb.GetRegisterTicketResponse{
Ticket: ticket,
}, nil
}
// GetClusterInfo implements Management.GetClusterInfo, which returns summary
// information about the Metropolis cluster.
func (l *leaderManagement) GetClusterInfo(ctx context.Context, req *apb.GetClusterInfoRequest) (*apb.GetClusterInfoResponse, error) {
res, err := l.txnAsLeader(ctx, nodeEtcdPrefix.Range())
if err != nil {
return nil, status.Errorf(codes.Unavailable, "could not retrieve list of nodes: %v", err)
}
// Sort nodes by public key, keep only nodes that are UP, and use the first 15
// in the cluster directory (an arbitrary limit so that callers aren't
// overloaded with unnecessary information).
//
// MVP: this should be formalized and possibly re-designed/engineered.
kvs := res.Responses[0].GetResponseRange().Kvs
var nodes []*Node
for _, kv := range kvs {
node, err := nodeUnmarshal(kv.Value)
if err != nil {
rpc.Trace(ctx).Printf("Unmarshalling node %q failed: %v", kv.Value, err)
continue
}
if node.state != cpb.NodeState_NODE_STATE_UP {
continue
}
nodes = append(nodes, node)
}
sort.Slice(nodes, func(i, j int) bool {
return bytes.Compare(nodes[i].pubkey, nodes[j].pubkey) < 0
})
if len(nodes) > 15 {
nodes = nodes[:15]
}
// Build cluster directory.
directory := &cpb.ClusterDirectory{
Nodes: make([]*cpb.ClusterDirectory_Node, len(nodes)),
}
for i, node := range nodes {
var addresses []*cpb.ClusterDirectory_Node_Address
if node.status != nil && node.status.ExternalAddress != "" {
addresses = append(addresses, &cpb.ClusterDirectory_Node_Address{
Host: node.status.ExternalAddress,
})
}
directory.Nodes[i] = &cpb.ClusterDirectory_Node{
PublicKey: node.pubkey,
Addresses: addresses,
}
}
resp := &apb.GetClusterInfoResponse{
ClusterDirectory: directory,
CaCertificate: l.node.ClusterCA().Raw,
}
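// Attach the cluster configuration if it can be loaded; failures are treated
// as non-fatal here.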
cl, err := clusterLoad(ctx, l.leadership)
if err == nil {
resp.ClusterConfiguration, _ = cl.proto()
}
return resp, nil
}
// nodeHeartbeatTimestamp returns the node nid's last heartbeat timestamp, as
// seen from the Curator leader's perspective. If no heartbeats were received
// from the node, a zero time.Time value is returned.
func (l *leaderManagement) nodeHeartbeatTimestamp(nid string) time.Time {
smv, ok := l.ls.heartbeatTimestamps.Load(nid)
if ok {
return smv.(time.Time)
}
return time.Time{}
}
// nodeHealth returns the node's health, along with the duration since the last
// heartbeat was received, given a current timestamp.
func (l *leaderManagement) nodeHealth(node *Node, now time.Time) (apb.Node_Health, time.Duration) {
// Get the last received node heartbeat's timestamp.
nid := identity.NodeID(node.pubkey)
nts := l.nodeHeartbeatTimestamp(nid)
// lhb is the duration since the last heartbeat was received.
lhb := now.Sub(nts)
// Determine the node's health based on the heartbeat timestamp.
var nh apb.Node_Health
if node.state == cpb.NodeState_NODE_STATE_UP {
// Only UP nodes can send heartbeats.
switch {
// If no heartbeats were received, but the leadership has only just
// started, the node's health is unknown.
case nts.IsZero() && (now.Sub(l.ls.startTs) < HeartbeatTimeout):
nh = apb.Node_UNKNOWN
// If the leader has received heartbeats from the node, but the last
// heartbeat is stale, the node is timing out.
case lhb > HeartbeatTimeout:
nh = apb.Node_HEARTBEAT_TIMEOUT
// Otherwise, the node can be declared healthy.
default:
nh = apb.Node_HEALTHY
}
} else {
// Since the node isn't UP, its health is unknown. Non-UP nodes can't access
// the heartbeat RPC.
nh = apb.Node_UNKNOWN
}
return nh, lhb
}
// GetNodes implements Management.GetNodes, which returns a list of nodes from
// the point of view of the cluster.
func (l *leaderManagement) GetNodes(req *apb.GetNodesRequest, srv apb.Management_GetNodesServer) error {
ctx := srv.Context()
l.muNodes.Lock()
defer l.muNodes.Unlock()
// Retrieve all nodes from etcd in a single Get call.
res, err := l.txnAsLeader(ctx, nodeEtcdPrefix.Range())
if err != nil {
return status.Errorf(codes.Unavailable, "could not retrieve list of nodes: %v", err)
}
// Create a CEL filter program, to be used in the reply loop below.
filter, err := buildNodeFilter(ctx, req.Filter)
if err != nil {
return err
}
// Get a single monotonic timestamp to reference node heartbeat timestamps
// against.
now := time.Now()
// Convert etcd data into proto nodes, send one streaming response for each
// node.
kvs := res.Responses[0].GetResponseRange().Kvs
for _, kv := range kvs {
node, err := nodeUnmarshal(kv.Value)
if err != nil {
rpc.Trace(ctx).Printf("Unmarshalling node %q failed: %v", kv.Value, err)
continue
}
// Convert node roles.
roles := &cpb.NodeRoles{}
if node.kubernetesController != nil {
roles.KubernetesController = &cpb.NodeRoles_KubernetesController{}
}
if node.kubernetesWorker != nil {
roles.KubernetesWorker = &cpb.NodeRoles_KubernetesWorker{}
}
if node.consensusMember != nil {
roles.ConsensusMember = &cpb.NodeRoles_ConsensusMember{}
}
// Assess the node's health.
health, lhb := l.nodeHealth(node, now)
entry := apb.Node{
Pubkey: node.pubkey,
Id: identity.NodeID(node.pubkey),
State: node.state,
Status: node.status,
Roles: roles,
TimeSinceHeartbeat: dpb.New(lhb),
Health: health,
TpmUsage: node.tpmUsage,
}
// Evaluate the filter expression for this node. Send the node, if it's
// kept by the filter.
keep, err := filter(ctx, &entry)
if err != nil {
return err
}
if !keep {
continue
}
if err := srv.Send(&entry); err != nil {
return err
}
}
return nil
}
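// ApproveNode implements Management.ApproveNode, which approves a NEW node
// into the cluster by moving it into the STANDBY state. The call is a no-op
// (and thus idempotent) for nodes which are already STANDBY or UP.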
func (l *leaderManagement) ApproveNode(ctx context.Context, req *apb.ApproveNodeRequest) (*apb.ApproveNodeResponse, error) {
// MVP: check if policy allows for this node to be approved for this cluster.
// This should happen automatically, if possible, via hardware attestation
// against policy, not manually.
// Ensure the given key resembles a public key before using it to generate
// a node ID. This key is then used to craft an arbitrary etcd path, so
// let's do an early check in case the user set something that's obviously
// not a public key.
if len(req.Pubkey) != ed25519.PublicKeySize {
return nil, status.Errorf(codes.InvalidArgument, "pubkey must be %d bytes long", ed25519.PublicKeySize)
}
l.muNodes.Lock()
defer l.muNodes.Unlock()
// Find node for this pubkey.
id := identity.NodeID(req.Pubkey)
node, err := nodeLoad(ctx, l.leadership, id)
if err != nil {
return nil, err
}
// Ensure node is either UP/STANDBY (no-op) or NEW (set to STANDBY).
switch node.state {
case cpb.NodeState_NODE_STATE_UP, cpb.NodeState_NODE_STATE_STANDBY:
// No-op for idempotency.
return &apb.ApproveNodeResponse{}, nil
case cpb.NodeState_NODE_STATE_NEW:
// What we can act on.
default:
return nil, status.Errorf(codes.FailedPrecondition, "node in state %s cannot be approved", node.state)
}
// Set node to be STANDBY.
node.state = cpb.NodeState_NODE_STATE_STANDBY
if err := nodeSave(ctx, l.leadership, node); err != nil {
return nil, err
}
return &apb.ApproveNodeResponse{}, nil
}
// UpdateNodeRoles implements Management.UpdateNodeRoles, which in addition to
// adjusting the affected node's representation within the cluster, can also
// trigger the addition of a new etcd learner node.
func (l *leaderManagement) UpdateNodeRoles(ctx context.Context, req *apb.UpdateNodeRolesRequest) (*apb.UpdateNodeRolesResponse, error) {
// Nodes are identifiable by either their public key or their (string) node ID.
// If a public key was provided, convert it to the corresponding node ID here.
var id string
switch rid := req.Node.(type) {
case *apb.UpdateNodeRolesRequest_Pubkey:
if len(rid.Pubkey) != ed25519.PublicKeySize {
return nil, status.Errorf(codes.InvalidArgument, "pubkey must be %d bytes long", ed25519.PublicKeySize)
}
// Convert the pubkey into node ID.
id = identity.NodeID(rid.Pubkey)
case *apb.UpdateNodeRolesRequest_Id:
id = rid.Id
default:
return nil, status.Errorf(codes.InvalidArgument, "exactly one of pubkey or id must be set")
}
// Take l.muNodes before modifying the node.
l.muNodes.Lock()
defer l.muNodes.Unlock()
// Find the node matching the resolved node ID.
node, err := nodeLoad(ctx, l.leadership, id)
if err == errNodeNotFound {
return nil, status.Errorf(codes.NotFound, "node %s not found", id)
}
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "while loading node %s: %v", id, err)
}
// Adjust each role if a corresponding value is set within the request. Do
// nothing if the role already matches the requested value.
if req.ConsensusMember != nil {
if *req.ConsensusMember {
// Add a new etcd learner node.
w := l.consensus.Watch()
defer w.Close()
st, err := w.Get(ctx, consensus.FilterRunning)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "could not get running consensus: %v", err)
}
join, err := st.AddNode(ctx, node.pubkey)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "could not add node: %v", err)
}
// Modify the node's state to reflect the change.
node.EnableConsensusMember(join)
} else {
if node.kubernetesController != nil {
return nil, status.Errorf(codes.FailedPrecondition, "could not remove consensus member role while node is a kubernetes controller")
}
node.DisableConsensusMember()
}
}
if req.KubernetesController != nil {
if *req.KubernetesController {
if node.consensusMember == nil {
return nil, status.Errorf(codes.FailedPrecondition, "could not set role: Kubernetes controller nodes must also be consensus members")
}
node.EnableKubernetesController()
} else {
node.DisableKubernetesController()
}
}
if req.KubernetesWorker != nil {
if *req.KubernetesWorker {
node.EnableKubernetesWorker()
} else {
node.DisableKubernetesWorker()
}
}
if err := nodeSave(ctx, l.leadership, node); err != nil {
return nil, err
}
return &apb.UpdateNodeRolesResponse{}, nil
}