m/n/core: implement node heartbeats
This change introduces cluster member node health monitoring by
implementing a bidirectional RPC stream the nodes will periodically
send their heartbeat updates through. The Management.GetNodes call was
modified to include the new node health information.
Relevant data available through the management API is non-persistent,
and stored within current Curator leader's local state. As such, it
will become briefly unavailable in the event of a leader re-election. The
information returned, however, is guaranteed to be correct.
Change-Id: I916ac48f496941a7decc09d672ecf72a914b0d88
Reviewed-on: https://review.monogon.dev/c/monogon/+/694
Reviewed-by: Sergiusz Bazanski <serge@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/node/core/curator/impl_leader.go b/metropolis/node/core/curator/impl_leader.go
index d6894a2..095ee89 100644
--- a/metropolis/node/core/curator/impl_leader.go
+++ b/metropolis/node/core/curator/impl_leader.go
@@ -6,6 +6,7 @@
"fmt"
"strings"
"sync"
+ "time"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc/codes"
@@ -17,6 +18,19 @@
"source.monogon.dev/metropolis/node/core/rpc"
)
+// leaderState is the transient state of the Curator leader. All the
+// information kept inside is lost whenever another leader is elected.
+type leaderState struct {
+ // heartbeatTimestamps maps node IDs to monotonic clock timestamps matching
+ // the last corresponding node heartbeats received by the current Curator
+ // leader.
+ //
+ // Keys are node ID strings and values are time.Time. Entries are written
+ // by the Heartbeat RPC handler and read by nodeHeartbeatTimestamp.
+ heartbeatTimestamps sync.Map
+
+ // startTs is a local monotonic clock timestamp associated with this node's
+ // assumption of Curator leadership.
+ //
+ // It is set in newCuratorLeader and consulted by nodeHealth to tell apart
+ // a node that has not yet had a chance to send a heartbeat from one that
+ // is timing out.
+ startTs time.Time
+}
+
// leadership represents the curator leader's ability to perform actions as a
// leader. It is available to all services implemented by the leader.
type leadership struct {
@@ -44,6 +58,9 @@
// muRegisterTicket guards changes to the register ticket. Its usage semantics
// are the same as for muNodes, as described above.
muRegisterTicket sync.Mutex
+
+ // ls contains the current leader's non-persistent local state.
+ ls leaderState
}
var (
@@ -113,6 +130,9 @@
}
func newCuratorLeader(l *leadership, node *identity.Node) *curatorLeader {
+ // Mark the start of this leader's tenure.
+ l.ls.startTs = time.Now()
+
return &curatorLeader{
leaderCurator{leadership: l},
leaderAAA{leadership: l},
diff --git a/metropolis/node/core/curator/impl_leader_curator.go b/metropolis/node/core/curator/impl_leader_curator.go
index 6f48d45..296e856 100644
--- a/metropolis/node/core/curator/impl_leader_curator.go
+++ b/metropolis/node/core/curator/impl_leader_curator.go
@@ -5,6 +5,7 @@
"crypto/ed25519"
"crypto/subtle"
"fmt"
+ "io"
"net"
"time"
@@ -256,6 +257,35 @@
return &ipb.UpdateNodeStatusResponse{}, nil
}
+// Heartbeat implements Curator.Heartbeat, a bidirectional stream over which a
+// node reports that it is alive. Every request received on the stream
+// refreshes the calling node's entry in the leader's local heartbeat
+// timestamps and is acknowledged with an empty response.
+//
+// The call returns nil once the client closes its sending side. Any other
+// receive or send failure is returned as-is.
+func (l *leaderCurator) Heartbeat(stream ipb.Curator_HeartbeatServer) error {
+	// Heartbeat requests carry no node identifier: the node being reported on
+	// is always the caller, as described by its peer information. Callers
+	// that are not nodes are rejected.
+	peer := rpc.GetPeerInfo(stream.Context())
+	if peer == nil || peer.Node == nil {
+		return status.Error(codes.PermissionDenied, "only nodes can send heartbeats")
+	}
+	nodeID := identity.NodeID(peer.Node.PublicKey)
+
+	for {
+		switch _, err := stream.Recv(); {
+		case err == io.EOF:
+			return nil
+		case err != nil:
+			return err
+		}
+
+		// Record the arrival time of this heartbeat within the local Curator
+		// leader state.
+		l.ls.heartbeatTimestamps.Store(nodeID, time.Now())
+
+		if err := stream.Send(&ipb.HeartbeatUpdateResponse{}); err != nil {
+			return err
+		}
+	}
+}
+
func (l *leaderCurator) RegisterNode(ctx context.Context, req *ipb.RegisterNodeRequest) (*ipb.RegisterNodeResponse, error) {
// Call is unauthenticated - verify the other side has connected with an
// ephemeral certificate. That certificate's pubkey will become the node's
diff --git a/metropolis/node/core/curator/impl_leader_management.go b/metropolis/node/core/curator/impl_leader_management.go
index c24a53d..86a1af6 100644
--- a/metropolis/node/core/curator/impl_leader_management.go
+++ b/metropolis/node/core/curator/impl_leader_management.go
@@ -5,6 +5,7 @@
"context"
"crypto/ed25519"
"sort"
+ "time"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -37,6 +38,14 @@
// to be manually copied by humans, so the relatively overkill size also doesn't
// impact usability.
registerTicketSize = 32
+
+ // HeartbeatInterval is the duration between consecutive heartbeat update
+ // messages sent by the node.
+ HeartbeatInterval = time.Second * 5
+
+ // HeartbeatTimeout is the duration after which a node is considered to be
+ // timing out, given no recent heartbeat updates were received by the leader.
+ HeartbeatTimeout = HeartbeatInterval * 2
)
const (
@@ -111,6 +120,50 @@
}, nil
}
+// nodeHeartbeatTimestamp looks up when the Curator leader last received a
+// heartbeat from the node identified by nid. The zero time.Time is returned
+// for nodes the leader has no heartbeat recorded for.
+func (l *leaderManagement) nodeHeartbeatTimestamp(nid string) time.Time {
+	if ts, found := l.ls.heartbeatTimestamps.Load(nid); found {
+		return ts.(time.Time)
+	}
+	return time.Time{}
+}
+
+// nodeHealth assesses a node's health from the Curator leader's point of view,
+// given a current timestamp. It returns the health along with the duration
+// elapsed since the node's last heartbeat was received. The returned duration
+// is zero if the leader has not recorded any heartbeat from the node.
+//
+// Health is determined as follows:
+//   - nodes that are not UP are always UNKNOWN,
+//   - UP nodes without any heartbeat are UNKNOWN while the leader's tenure is
+//     shorter than HeartbeatTimeout, and HEARTBEAT_TIMEOUT afterwards,
+//   - UP nodes whose last heartbeat is older than HeartbeatTimeout are
+//     HEARTBEAT_TIMEOUT,
+//   - all other UP nodes are HEALTHY.
+func (l *leaderManagement) nodeHealth(node *Node, now time.Time) (apb.Node_Health, time.Duration) {
+	// Get the last received node heartbeat's timestamp. It is the zero time if
+	// this leader has not recorded a heartbeat from the node.
+	nts := l.nodeHeartbeatTimestamp(identity.NodeID(node.pubkey))
+	seen := !nts.IsZero()
+
+	// lhb is the duration since the last heartbeat was received. It has to
+	// stay zero when there was no heartbeat: subtracting the zero time from
+	// now would saturate to the maximum time.Duration instead.
+	var lhb time.Duration
+	if seen {
+		lhb = now.Sub(nts)
+	}
+
+	// Since non-UP nodes can't access the heartbeat RPC, their health is
+	// unknown.
+	if node.state != cpb.NodeState_NODE_STATE_UP {
+		return apb.Node_UNKNOWN, lhb
+	}
+
+	switch {
+	// If no heartbeats were received, but the leadership has only just
+	// started, the node's health is unknown.
+	case !seen && now.Sub(l.ls.startTs) < HeartbeatTimeout:
+		return apb.Node_UNKNOWN, lhb
+	// If the node had enough time to send a heartbeat but did not, or if its
+	// last heartbeat is stale, the node is timing out.
+	case !seen, lhb > HeartbeatTimeout:
+		return apb.Node_HEARTBEAT_TIMEOUT, lhb
+	// Otherwise, the node can be declared healthy.
+	default:
+		return apb.Node_HEALTHY, lhb
+	}
+}
+
// GetNodes implements Management.GetNodes, which returns a list of nodes from
// the point of view of the cluster.
func (l *leaderManagement) GetNodes(_ *apb.GetNodesRequest, srv apb.Management_GetNodesServer) error {
@@ -125,6 +178,10 @@
return status.Errorf(codes.Unavailable, "could not retrieve list of nodes: %v", err)
}
+ // Get a singular monotonic timestamp to reference node heartbeat timestamps
+ // against.
+ now := time.Now()
+
// Convert etcd data into proto nodes, send one streaming response for each
// node.
kvs := res.Responses[0].GetResponseRange().Kvs
@@ -141,11 +198,16 @@
roles.KubernetesWorker = &cpb.NodeRoles_KubernetesWorker{}
}
+ // Assess the node's health.
+ health, lhb := l.nodeHealth(node, now)
+
if err := srv.Send(&apb.Node{
- Pubkey: node.pubkey,
- State: node.state,
- Status: node.status,
- Roles: roles,
+ Pubkey: node.pubkey,
+ State: node.state,
+ Status: node.status,
+ Roles: roles,
+ HeartbeatTimestamp: lhb.Nanoseconds(),
+ Health: health,
}); err != nil {
return err
}
diff --git a/metropolis/node/core/curator/impl_leader_test.go b/metropolis/node/core/curator/impl_leader_test.go
index fffe69c..9fa6bb4 100644
--- a/metropolis/node/core/curator/impl_leader_test.go
+++ b/metropolis/node/core/curator/impl_leader_test.go
@@ -10,6 +10,7 @@
"encoding/hex"
"net"
"testing"
+ "time"
"go.etcd.io/etcd/tests/v3/integration"
"google.golang.org/grpc"
@@ -719,6 +720,106 @@
}
}
+// TestClusterHeartbeat exercises curator.Heartbeat and mgmt.GetNodes RPCs by
+// verifying proper node health transitions as affected by leadership changes
+// and timely arrival of node heartbeat updates.
+//
+// The passage of time is simulated by rewinding the leader's local timestamps
+// rather than by sleeping.
+func TestClusterHeartbeat(t *testing.T) {
+	cl := fakeLeader(t)
+
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+
+	curator := ipb.NewCuratorClient(cl.localNodeConn)
+	mgmt := apb.NewManagementClient(cl.mgmtConn)
+
+	// expectNode is a helper function that fails if health of the node, as
+	// returned by mgmt.GetNodes call, does not match its argument. It also
+	// fails if the stream ends before the node is found.
+	expectNode := func(id string, health apb.Node_Health) {
+		t.Helper()
+		res, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
+		if err != nil {
+			t.Fatalf("GetNodes failed: %v", err)
+		}
+
+		for {
+			node, err := res.Recv()
+			if err != nil {
+				// This includes the end of the stream, which means the
+				// node was not returned at all.
+				t.Fatalf("Recv failed before node %s was found: %v", id, err)
+			}
+			if id != identity.NodeID(node.Pubkey) {
+				continue
+			}
+			if node.Health != health {
+				t.Fatalf("Expected node to be %s, got %s.", health, node.Health)
+			}
+			return
+		}
+	}
+
+	// Test case: the node is UP, and the curator leader has just been elected,
+	// with no recorded node heartbeats. In this case the node's health is
+	// UNKNOWN, since it wasn't given enough time to submit a single heartbeat.
+	cl.l.ls.startTs = time.Now()
+	expectNode(cl.localNodeID, apb.Node_UNKNOWN)
+
+	// Let's turn the clock forward a bit. In this case the node is still UP,
+	// but the current leadership has been assumed more than HeartbeatTimeout
+	// ago. If no heartbeats arrived during this period, the node is timing out.
+	cl.l.ls.startTs = cl.l.ls.startTs.Add(-HeartbeatTimeout)
+	expectNode(cl.localNodeID, apb.Node_HEARTBEAT_TIMEOUT)
+
+	// Now we'll simulate the node sending a couple of heartbeats. The node is
+	// expected to be HEALTHY after the first of them arrives.
+	stream, err := curator.Heartbeat(ctx)
+	if err != nil {
+		t.Fatalf("While initializing heartbeat stream: %v", err)
+	}
+	for i := 0; i < 3; i++ {
+		if err := stream.Send(&ipb.HeartbeatUpdateRequest{}); err != nil {
+			t.Fatalf("While sending a heartbeat: %v", err)
+		}
+
+		// Wait for the reply, so that the leader is known to have processed
+		// the heartbeat before the node's health is checked.
+		_, err := stream.Recv()
+		if err != nil {
+			t.Fatalf("While receiving a heartbeat reply: %v", err)
+		}
+
+		expectNode(cl.localNodeID, apb.Node_HEALTHY)
+	}
+
+	// This case tests timing out from a healthy state. The passage of time is
+	// simulated by an adjustment of curator leader's timestamp entry
+	// corresponding to the tested node's ID. Fail explicitly, instead of
+	// panicking, if the entry is missing or malformed.
+	smv, ok := cl.l.ls.heartbeatTimestamps.Load(cl.localNodeID)
+	if !ok {
+		t.Fatalf("No heartbeat timestamp recorded for node %s", cl.localNodeID)
+	}
+	lts, ok := smv.(time.Time)
+	if !ok {
+		t.Fatalf("Heartbeat timestamp has unexpected type %T", smv)
+	}
+	lts = lts.Add(-HeartbeatTimeout)
+	cl.l.ls.heartbeatTimestamps.Store(cl.localNodeID, lts)
+	expectNode(cl.localNodeID, apb.Node_HEARTBEAT_TIMEOUT)
+
+	// This case verifies that health of non-UP nodes is assessed to be UNKNOWN,
+	// regardless of leadership tenure, since only UP nodes are capable of
+	// sending heartbeats.
+	npub, _, err := ed25519.GenerateKey(rand.Reader)
+	if err != nil {
+		t.Fatalf("could not generate node keypair: %v", err)
+	}
+	jpub, _, err := ed25519.GenerateKey(rand.Reader)
+	if err != nil {
+		t.Fatalf("could not generate join keypair: %v", err)
+	}
+	cuk := []byte("fakefakefakefakefakefakefakefake")
+	node := Node{
+		clusterUnlockKey: cuk,
+		pubkey:           npub,
+		jkey:             jpub,
+		state:            cpb.NodeState_NODE_STATE_NEW,
+	}
+	if err := nodeSave(ctx, cl.l, &node); err != nil {
+		t.Fatalf("nodeSave failed: %v", err)
+	}
+	expectNode(identity.NodeID(npub), apb.Node_UNKNOWN)
+}
+
// TestManagementClusterInfo exercises GetClusterInfo after setting a status.
func TestManagementClusterInfo(t *testing.T) {
cl := fakeLeader(t)
diff --git a/metropolis/node/core/curator/proto/api/api.proto b/metropolis/node/core/curator/proto/api/api.proto
index 425051f..01998f9 100644
--- a/metropolis/node/core/curator/proto/api/api.proto
+++ b/metropolis/node/core/curator/proto/api/api.proto
@@ -52,6 +52,14 @@
};
}
+ // Heartbeat is used by nodes to periodically update their heartbeat
+ // timestamps within the current Curator leader. Both directions of the
+ // call are streams: the leader's implementation answers every
+ // HeartbeatUpdateRequest with a HeartbeatUpdateResponse. The timestamps
+ // are kept only in the leader's local, non-persistent state.
+ rpc Heartbeat(stream HeartbeatUpdateRequest) returns (stream HeartbeatUpdateResponse) {
+ option (metropolis.proto.ext.authorization) = {
+ need: PERMISSION_UPDATE_NODE_SELF
+ };
+ }
+
// RegisterNode is called by nodes that wish to begin registering into the
// cluster. This will created a 'New' node in the cluster state.
rpc RegisterNode(RegisterNodeRequest) returns (RegisterNodeResponse) {
@@ -200,6 +208,12 @@
message UpdateNodeStatusResponse {
}
+// HeartbeatUpdateRequest is sent by a node over the Heartbeat stream each time
+// it reports being alive. It carries no fields: the leader's Heartbeat
+// implementation identifies the node from the caller's peer information.
+message HeartbeatUpdateRequest {
+}
+
+// HeartbeatUpdateResponse is the Curator leader's acknowledgement of a single
+// HeartbeatUpdateRequest. It carries no fields.
+message HeartbeatUpdateResponse {
+}
+
message RegisterNodeRequest {
// register_ticket is the opaque Register Ticket required from a node to
// begin registering it into a cluster. It's provided to the registering
diff --git a/metropolis/node/core/roleserve/BUILD.bazel b/metropolis/node/core/roleserve/BUILD.bazel
index 413d9a0..3ac818d 100644
--- a/metropolis/node/core/roleserve/BUILD.bazel
+++ b/metropolis/node/core/roleserve/BUILD.bazel
@@ -9,6 +9,7 @@
"value_kubernetes.go",
"value_node.go",
"worker_controlplane.go",
+ "worker_heartbeat.go",
"worker_kubernetes.go",
"worker_rolefetch.go",
"worker_statuspush.go",
diff --git a/metropolis/node/core/roleserve/roleserve.go b/metropolis/node/core/roleserve/roleserve.go
index 667564d..82e7cca 100644
--- a/metropolis/node/core/roleserve/roleserve.go
+++ b/metropolis/node/core/roleserve/roleserve.go
@@ -75,6 +75,7 @@
controlPlane *workerControlPlane
statusPush *workerStatusPush
+ heartbeat *workerHeartbeat
kubernetes *workerKubernetes
rolefetch *workerRoleFetch
}
@@ -99,6 +100,12 @@
clusterMembership: &s.ClusterMembership,
}
+ s.heartbeat = &workerHeartbeat{
+ network: s.Network,
+
+ clusterMembership: &s.ClusterMembership,
+ }
+
s.kubernetes = &workerKubernetes{
network: s.Network,
storageRoot: s.StorageRoot,
@@ -153,6 +160,7 @@
supervisor.Run(ctx, "controlplane", s.controlPlane.run)
supervisor.Run(ctx, "kubernetes", s.kubernetes.run)
supervisor.Run(ctx, "statuspush", s.statusPush.run)
+ supervisor.Run(ctx, "heartbeat", s.heartbeat.run)
supervisor.Run(ctx, "rolefetch", s.rolefetch.run)
supervisor.Signal(ctx, supervisor.SignalHealthy)
diff --git a/metropolis/node/core/roleserve/worker_heartbeat.go b/metropolis/node/core/roleserve/worker_heartbeat.go
new file mode 100644
index 0000000..db06845
--- /dev/null
+++ b/metropolis/node/core/roleserve/worker_heartbeat.go
@@ -0,0 +1,65 @@
+package roleserve
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "time"
+
+ "source.monogon.dev/metropolis/node/core/curator"
+ ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+ "source.monogon.dev/metropolis/node/core/network"
+ "source.monogon.dev/metropolis/pkg/supervisor"
+)
+
+// workerHeartbeat is a service that periodically updates node's heartbeat
+// timestamps within the cluster.
+type workerHeartbeat struct {
+ // network is the node's network service. run opens a watcher on it and
+ // closes it on exit.
+ // NOTE(review): nothing is read from that watcher in run - confirm whether
+ // this dependency is still needed.
+ network *network.Service
+
+ // clusterMembership will be read.
+ clusterMembership *ClusterMembershipValue
+}
+
+// run is the heartbeat worker's runnable. It waits for cluster membership,
+// dials the Curator and opens a Heartbeat stream, over which it then sends one
+// heartbeat every curator.HeartbeatInterval, waiting for the Curator's reply
+// to each of them.
+//
+// It only ever returns with an error: when the stream fails or is closed by
+// the server, or when ctx is canceled.
+func (s *workerHeartbeat) run(ctx context.Context) error {
+	nw := s.network.Watch()
+	defer nw.Close()
+
+	w := s.clusterMembership.Watch()
+	defer w.Close()
+	supervisor.Logger(ctx).Infof("Waiting for cluster membership...")
+	cm, err := w.GetHome(ctx)
+	if err != nil {
+		return err
+	}
+	supervisor.Logger(ctx).Infof("Got cluster membership, starting...")
+
+	conn, err := cm.DialCurator()
+	if err != nil {
+		return err
+	}
+	defer conn.Close()
+	cur := ipb.NewCuratorClient(conn)
+
+	stream, err := cur.Heartbeat(ctx)
+	if err != nil {
+		return err
+	}
+
+	for {
+		if err := stream.Send(&ipb.HeartbeatUpdateRequest{}); err != nil {
+			return fmt.Errorf("while sending a heartbeat: %w", err)
+		}
+		// Schedule the next heartbeat relative to the time of sending, so that
+		// the reply's round trip time doesn't add up to the period. The period
+		// must be HeartbeatInterval, not HeartbeatTimeout: heartbeats sent
+		// only once per timeout would reach the leader just as it starts to
+		// consider the node as timing out.
+		next := time.Now().Add(curator.HeartbeatInterval)
+
+		_, err := stream.Recv()
+		if err == io.EOF {
+			return fmt.Errorf("stream closed by the server. Restarting worker...")
+		}
+		if err != nil {
+			return fmt.Errorf("while receiving a heartbeat reply: %w", err)
+		}
+
+		// Wait until the next heartbeat is due, but don't outlive ctx.
+		select {
+		case <-time.After(time.Until(next)):
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+	}
+}
diff --git a/metropolis/proto/api/management.proto b/metropolis/proto/api/management.proto
index ba50849..88cab77 100644
--- a/metropolis/proto/api/management.proto
+++ b/metropolis/proto/api/management.proto
@@ -106,8 +106,28 @@
metropolis.proto.common.NodeStatus status = 3;
// Roles assigned by the cluster. This is always set.
metropolis.proto.common.NodeRoles roles = 4;
-}
+ // Health describes node's health as seen from the cluster perspective.
+ enum Health {
+ INVALID = 0;
+ // UNKNOWN is used whenever there were no heartbeats received from a
+ // given node AND too little time has passed since last Curator leader
+ // election to know whether the node is actually timing out. UNKNOWN
+ // is also returned for nodes whose NodeState does not equal
+ // NODE_STATE_UP.
+ UNKNOWN = 1;
+ // HEALTHY describes nodes that have sent a heartbeat recently.
+ HEALTHY = 2;
+ // HEARTBEAT_TIMEOUT describes nodes that have not sent a heartbeat in
+ // the interval specified by curator.HeartbeatTimeout.
+ HEARTBEAT_TIMEOUT = 3;
+ }
+ Health health = 5;
+ // heartbeat_timestamp is the duration since the last of the node's
+ // heartbeats was received, expressed in nanoseconds. It equals zero if no
+ // heartbeats were received.
+ int64 heartbeat_timestamp = 6;
+}
message ApproveNodeRequest {
// Raw public key of the node being approved, has to correspond to a node
@@ -116,4 +136,4 @@
}
message ApproveNodeResponse {
-}
\ No newline at end of file
+}
diff --git a/metropolis/test/e2e/main_test.go b/metropolis/test/e2e/main_test.go
index c0af1fd..89df286 100644
--- a/metropolis/test/e2e/main_test.go
+++ b/metropolis/test/e2e/main_test.go
@@ -20,6 +20,7 @@
"context"
"errors"
"fmt"
+ "io"
"log"
"net"
"net/http"
@@ -149,6 +150,40 @@
}
return nil
})
+		testEventual(t, "Heartbeat test successful", ctx, 60*time.Second, func(ctx context.Context) error {
+			// Ensure all cluster nodes are capable of sending heartbeat updates.
+			// This test assumes the expected count of nodes is already present in
+			// the cluster.
+			//
+			// A single pass over the nodes is made per attempt, and an error is
+			// returned if any of them isn't healthy yet.
+			// NOTE(review): this relies on testEventual retrying the function
+			// until it succeeds or the timeout expires - confirm against its
+			// implementation.
+			srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
+			if err != nil {
+				return fmt.Errorf("GetNodes: %w", err)
+			}
+
+			// Count the unhealthy nodes.
+			var unhealthy int
+			for {
+				node, err := srvN.Recv()
+				if err == io.EOF {
+					break
+				}
+				if err != nil {
+					return fmt.Errorf("GetNodes.Recv: %w", err)
+				}
+
+				if node.Health != apb.Node_HEALTHY {
+					unhealthy++
+				}
+			}
+
+			// If all nodes tested in this attempt are healthy, the test has
+			// been passed.
+			if unhealthy != 0 {
+				return fmt.Errorf("%d node(s) are not healthy yet", unhealthy)
+			}
+			return nil
+		})
})
t.Run("Kubernetes", func(t *testing.T) {
t.Parallel()