m/n/core: implement node heartbeats

This change introduces cluster member node health monitoring by
implementing a bidirectional RPC stream through which nodes periodically
send their heartbeat updates. The Management.GetNodes call was
modified to include the new node health information.

The relevant data available through the management API is non-persistent
and is stored within the current Curator leader's local state. As such, it
will become briefly unavailable in the event of a leader re-election. The
information returned, however, is guaranteed to be correct.

Change-Id: I916ac48f496941a7decc09d672ecf72a914b0d88
Reviewed-on: https://review.monogon.dev/c/monogon/+/694
Reviewed-by: Sergiusz Bazanski <serge@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/node/core/roleserve/BUILD.bazel b/metropolis/node/core/roleserve/BUILD.bazel
index 413d9a0..3ac818d 100644
--- a/metropolis/node/core/roleserve/BUILD.bazel
+++ b/metropolis/node/core/roleserve/BUILD.bazel
@@ -9,6 +9,7 @@
         "value_kubernetes.go",
         "value_node.go",
         "worker_controlplane.go",
+        "worker_heartbeat.go",
         "worker_kubernetes.go",
         "worker_rolefetch.go",
         "worker_statuspush.go",
diff --git a/metropolis/node/core/roleserve/roleserve.go b/metropolis/node/core/roleserve/roleserve.go
index 667564d..82e7cca 100644
--- a/metropolis/node/core/roleserve/roleserve.go
+++ b/metropolis/node/core/roleserve/roleserve.go
@@ -75,6 +75,7 @@
 
 	controlPlane *workerControlPlane
 	statusPush   *workerStatusPush
+	heartbeat    *workerHeartbeat
 	kubernetes   *workerKubernetes
 	rolefetch    *workerRoleFetch
 }
@@ -99,6 +100,12 @@
 		clusterMembership: &s.ClusterMembership,
 	}
 
+	s.heartbeat = &workerHeartbeat{
+		network: s.Network,
+
+		clusterMembership: &s.ClusterMembership,
+	}
+
 	s.kubernetes = &workerKubernetes{
 		network:     s.Network,
 		storageRoot: s.StorageRoot,
@@ -153,6 +160,7 @@
 	supervisor.Run(ctx, "controlplane", s.controlPlane.run)
 	supervisor.Run(ctx, "kubernetes", s.kubernetes.run)
 	supervisor.Run(ctx, "statuspush", s.statusPush.run)
+	supervisor.Run(ctx, "heartbeat", s.heartbeat.run)
 	supervisor.Run(ctx, "rolefetch", s.rolefetch.run)
 	supervisor.Signal(ctx, supervisor.SignalHealthy)
 
diff --git a/metropolis/node/core/roleserve/worker_heartbeat.go b/metropolis/node/core/roleserve/worker_heartbeat.go
new file mode 100644
index 0000000..db06845
--- /dev/null
+++ b/metropolis/node/core/roleserve/worker_heartbeat.go
@@ -0,0 +1,65 @@
+package roleserve
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"time"
+
+	"source.monogon.dev/metropolis/node/core/curator"
+	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+	"source.monogon.dev/metropolis/node/core/network"
+	"source.monogon.dev/metropolis/pkg/supervisor"
+)
+
+// workerHeartbeat is a service that periodically updates node's heartbeat
+// timestamps within the cluster.
+type workerHeartbeat struct {
+	network *network.Service
+
+	// clusterMembership will be read.
+	clusterMembership *ClusterMembershipValue
+}
+
+func (s *workerHeartbeat) run(ctx context.Context) error {
+	nw := s.network.Watch()
+	defer nw.Close()
+
+	w := s.clusterMembership.Watch()
+	defer w.Close()
+	supervisor.Logger(ctx).Infof("Waiting for cluster membership...")
+	cm, err := w.GetHome(ctx)
+	if err != nil {
+		return err
+	}
+	supervisor.Logger(ctx).Infof("Got cluster membership, starting...")
+
+	conn, err := cm.DialCurator()
+	if err != nil {
+		return err
+	}
+	defer conn.Close()
+	cur := ipb.NewCuratorClient(conn)
+
+	stream, err := cur.Heartbeat(ctx)
+	if err != nil {
+		return err
+	}
+
+	for {
+		if err := stream.Send(&ipb.HeartbeatUpdateRequest{}); err != nil {
+			return fmt.Errorf("while sending a heartbeat: %v", err)
+		}
+		next := time.Now().Add(curator.HeartbeatTimeout)
+
+		_, err := stream.Recv()
+		if err == io.EOF {
+			return fmt.Errorf("stream closed by the server. Restarting worker...")
+		}
+		if err != nil {
+			return fmt.Errorf("while receiving a heartbeat reply: %v", err)
+		}
+
+		time.Sleep(time.Until(next))
+	}
+}