m/n/c/roleserve: implement ClusterAgent
The ClusterAgent is a runnable that is scheduled to run on all cluster
nodes. It's currently used to report the current node status to the
Cluster, and in the future can be used to implement heartbeat detection
for nodes.
Change-Id: Iff394e2cc37064d1e42fd27e40884dda83d88418
Reviewed-on: https://review.monogon.dev/c/monogon/+/341
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/node/core/roleserve/BUILD.bazel b/metropolis/node/core/roleserve/BUILD.bazel
index 405efb3..273cbd5 100644
--- a/metropolis/node/core/roleserve/BUILD.bazel
+++ b/metropolis/node/core/roleserve/BUILD.bazel
@@ -3,6 +3,7 @@
go_library(
name = "go_default_library",
srcs = [
+ "cluster_agent.go",
"kubernetes_worker.go",
"roleserve.go",
],
@@ -18,6 +19,7 @@
"//metropolis/pkg/event:go_default_library",
"//metropolis/pkg/event/memory:go_default_library",
"//metropolis/pkg/supervisor:go_default_library",
+ "//metropolis/proto/common:go_default_library",
"@org_golang_google_grpc//:go_default_library",
],
)
diff --git a/metropolis/node/core/roleserve/cluster_agent.go b/metropolis/node/core/roleserve/cluster_agent.go
new file mode 100644
index 0000000..5e9c45d
--- /dev/null
+++ b/metropolis/node/core/roleserve/cluster_agent.go
@@ -0,0 +1,48 @@
+package roleserve
+
+import (
+ "context"
+ "fmt"
+ "net"
+
+ ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+ "source.monogon.dev/metropolis/pkg/supervisor"
+ cpb "source.monogon.dev/metropolis/proto/common"
+)
+
+// runClusterAgent runs the ClusterAgent, a runnable responsible for reporting
+// the status of the local node to the cluster. It currently only reports the
+// node's external address, and only when it changes to a new non-nil value.
+// It returns an error if reading the network status or the UpdateNodeStatus
+// call fails.
+func (s *Service) runClusterAgent(ctx context.Context) error {
+ w := s.Network.Watch()
+ defer w.Close()
+ // Last address submitted to the cluster (nil until the first submission).
+ var external net.IP
+
+ for {
+ st, err := w.Get(ctx)
+ if err != nil {
+ return fmt.Errorf("getting network status failed: %w", err)
+ }
+ // Never report a nil address: net.IP(nil).String() yields "<nil>", not an IP.
+ if st.ExternalAddress == nil || external.Equal(st.ExternalAddress) {
+ continue
+ }
+
+ external = st.ExternalAddress
+ supervisor.Logger(ctx).Infof("New external address (%s), submitting update to cluster...", external.String())
+
+ _, err = s.curator.UpdateNodeStatus(ctx, &ipb.UpdateNodeStatusRequest{
+ NodeId: s.NodeID,
+ Status: &cpb.NodeStatus{
+ ExternalAddress: external.String(),
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("UpdateNodeStatus failed: %w", err)
+ }
+ supervisor.Logger(ctx).Infof("Updated.")
+ }
+}
diff --git a/metropolis/node/core/roleserve/roleserve.go b/metropolis/node/core/roleserve/roleserve.go
index 7107674..d7e3d2f 100644
--- a/metropolis/node/core/roleserve/roleserve.go
+++ b/metropolis/node/core/roleserve/roleserve.go
@@ -127,6 +127,10 @@
return fmt.Errorf("failed to launch updater: %w", err)
}
+ if err := supervisor.Run(ctx, "cluster-agent", s.runClusterAgent); err != nil {
+ return fmt.Errorf("failed to launch cluster agent: %w", err)
+ }
+
if err := supervisor.Run(ctx, "kubernetes-worker", s.runKubernetesWorkerLauncher); err != nil {
return fmt.Errorf("failed to start kubernetes-worker launcher: %w", err)
}