m/n/c/roleserve: implement ClusterAgent

The ClusterAgent is a runnable that is scheduled to run on all cluster
nodes. It's currently used to report the current node status to the
Cluster, and in the future can be used to implement heartbeat detection
for nodes.

Change-Id: Iff394e2cc37064d1e42fd27e40884dda83d88418
Reviewed-on: https://review.monogon.dev/c/monogon/+/341
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/node/core/roleserve/BUILD.bazel b/metropolis/node/core/roleserve/BUILD.bazel
index 405efb3..273cbd5 100644
--- a/metropolis/node/core/roleserve/BUILD.bazel
+++ b/metropolis/node/core/roleserve/BUILD.bazel
@@ -3,6 +3,7 @@
 go_library(
     name = "go_default_library",
     srcs = [
+        "cluster_agent.go",
         "kubernetes_worker.go",
         "roleserve.go",
     ],
@@ -18,6 +19,7 @@
         "//metropolis/pkg/event:go_default_library",
         "//metropolis/pkg/event/memory:go_default_library",
         "//metropolis/pkg/supervisor:go_default_library",
+        "//metropolis/proto/common:go_default_library",
         "@org_golang_google_grpc//:go_default_library",
     ],
 )
diff --git a/metropolis/node/core/roleserve/cluster_agent.go b/metropolis/node/core/roleserve/cluster_agent.go
new file mode 100644
index 0000000..5e9c45d
--- /dev/null
+++ b/metropolis/node/core/roleserve/cluster_agent.go
@@ -0,0 +1,48 @@
+package roleserve
+
+import (
+	"context"
+	"fmt"
+	"net"
+
+	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+	"source.monogon.dev/metropolis/pkg/supervisor"
+	cpb "source.monogon.dev/metropolis/proto/common"
+)
+
+// runClusterAgent runs the ClusterAgent, a runnable responsible for reporting
+// the status of the local node to the cluster.
+//
+// It submits the node's external address to the curator whenever the watched
+// network status carries a new, non-empty one, and returns on the first error.
+func (s *Service) runClusterAgent(ctx context.Context) error {
+	w := s.Network.Watch()
+	defer w.Close()
+
+	// external is the address most recently submitted to the cluster.
+	var external net.IP
+	for {
+		st, err := w.Get(ctx)
+		if err != nil {
+			return fmt.Errorf("getting network status failed: %w", err)
+		}
+
+		// An empty net.IP stringifies to "<nil>", which must never be reported
+		// as the node's address; wait for a usable, changed address instead.
+		if len(st.ExternalAddress) == 0 || external.Equal(st.ExternalAddress) {
+			continue
+		}
+		external = st.ExternalAddress
+		supervisor.Logger(ctx).Infof("New external address (%s), submitting update to cluster...", external.String())
+		_, err = s.curator.UpdateNodeStatus(ctx, &ipb.UpdateNodeStatusRequest{
+			NodeId: s.NodeID,
+			Status: &cpb.NodeStatus{
+				ExternalAddress: external.String(),
+			},
+		})
+		if err != nil {
+			return fmt.Errorf("UpdateNodeStatus failed: %w", err)
+		}
+		supervisor.Logger(ctx).Infof("Updated.")
+	}
+}
diff --git a/metropolis/node/core/roleserve/roleserve.go b/metropolis/node/core/roleserve/roleserve.go
index 7107674..d7e3d2f 100644
--- a/metropolis/node/core/roleserve/roleserve.go
+++ b/metropolis/node/core/roleserve/roleserve.go
@@ -127,6 +127,10 @@
 		return fmt.Errorf("failed to launch updater: %w", err)
 	}
 
+	if err := supervisor.Run(ctx, "cluster-agent", s.runClusterAgent); err != nil {
+		return fmt.Errorf("failed to launch cluster agent: %w", err)
+	}
+
 	if err := supervisor.Run(ctx, "kubernetes-worker", s.runKubernetesWorkerLauncher); err != nil {
 		return fmt.Errorf("failed to start kubernetes-worker launcher: %w", err)
 	}
diff --git a/metropolis/test/e2e/main_test.go b/metropolis/test/e2e/main_test.go
index d1102b1..2838af3 100644
--- a/metropolis/test/e2e/main_test.go
+++ b/metropolis/test/e2e/main_test.go
@@ -132,6 +132,33 @@
 					t.Fatalf("Received certificate for wrong private key")
 				}
 
+				// Connect to management endpoint and retrieve cluster directory.
+				authClient, err := rpc.NewAuthenticatedClient(remote, *cert, nil)
+				if err != nil {
+					return fmt.Errorf("NewAuthenticatedClient: %w", err)
+				}
+				mgmt := apb.NewManagementClient(authClient)
+				res, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
+				if err != nil {
+					return fmt.Errorf("GetClusterInfo: %w", err)
+				}
+
+				// Ensure the node is there with its address.
+				nodes := res.ClusterDirectory.Nodes
+				if want, got := 1, len(nodes); want != got {
+					return fmt.Errorf("wanted %d nodes in cluster directory, got %d", want, got)
+				}
+				node := nodes[0]
+				if want, got := ed25519.PublicKeySize, len(node.PublicKey); want != got {
+					return fmt.Errorf("wanted %d bytes long public key, got %d", want, got)
+				}
+				if want, got := 1, len(node.Addresses); want != got {
+					return fmt.Errorf("wanted %d node address, got %d", want, got)
+				}
+				if want, got := "10.42.0.10", node.Addresses[0].Host; want != got {
+					return fmt.Errorf("wanted status address %q, got %q", want, got)
+				}
+
 				return nil
 			})
 		})