m/node/core: run hostsfile from roleserver, provide feedback on cluster directory
Not providing a heartbeat and status until we save a cluster directory
to the ESP is a quick and dirty way to make sure we don't mark a node
as HEALTHY until it has performed the bare minimum of setup needed to
be rebootable.
This is important in our E2E tests to reduce flakiness.
In the future we should have some node status field or general 'sync'
API exposed, but this will do for now.
Change-Id: Ibad9e91f01abeacdfe4400ef7cb36ca17f68ba0a
Reviewed-on: https://review.monogon.dev/c/monogon/+/1498
Tested-by: Jenkins CI
Reviewed-by: Leopold Schabel <leo@monogon.tech>
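
For context, the gate works as follows: a shared memory.Value[bool] is
set to true once the ClusterDirectory has been written to the ESP, and
the status pusher blocks on that value (via event.Filter) before sending
its first heartbeat. A minimal sketch of the pattern, using only the
event/memory calls that appear in this change (everything else,
including the function names, is illustrative):

    package main

    import (
    	"context"

    	"source.monogon.dev/metropolis/pkg/event"
    	"source.monogon.dev/metropolis/pkg/event/memory"
    )

    // saved is the shared flag: written by whoever persists the cluster
    // directory, read by the status pusher before it heartbeats.
    var saved memory.Value[bool]

    // markDirectorySaved is the producer side: flip the flag once the
    // directory has been written.
    func markDirectorySaved() {
    	saved.Set(true)
    }

    // waitForDirectory is the consumer side: block until the flag is
    // true, then proceed to heartbeat.
    func waitForDirectory(ctx context.Context) error {
    	w := saved.Watch()
    	defer w.Close()
    	// event.Filter makes Get skip values until the predicate holds.
    	_, err := w.Get(ctx, event.Filter(func(t bool) bool { return t }))
    	return err
    }

    func main() {
    	go markDirectorySaved()
    	if err := waitForDirectory(context.Background()); err != nil {
    		panic(err)
    	}
    }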
diff --git a/metropolis/node/core/roleserve/BUILD.bazel b/metropolis/node/core/roleserve/BUILD.bazel
index 1818bf4..4a79349 100644
--- a/metropolis/node/core/roleserve/BUILD.bazel
+++ b/metropolis/node/core/roleserve/BUILD.bazel
@@ -11,6 +11,7 @@
"worker_clusternet.go",
"worker_controlplane.go",
"worker_heartbeat.go",
+ "worker_hostsfile.go",
"worker_kubernetes.go",
"worker_nodemgmt.go",
"worker_rolefetch.go",
@@ -28,6 +29,7 @@
"//metropolis/node/core/localstorage",
"//metropolis/node/core/mgmt",
"//metropolis/node/core/network",
+ "//metropolis/node/core/network/hostsfile",
"//metropolis/node/core/rpc",
"//metropolis/node/core/rpc/resolver",
"//metropolis/node/kubernetes",
diff --git a/metropolis/node/core/roleserve/roleserve.go b/metropolis/node/core/roleserve/roleserve.go
index 86b5b5a..a665044 100644
--- a/metropolis/node/core/roleserve/roleserve.go
+++ b/metropolis/node/core/roleserve/roleserve.go
@@ -80,11 +80,12 @@
type Service struct {
Config
- ClusterMembership memory.Value[*ClusterMembership]
- KubernetesStatus memory.Value[*KubernetesStatus]
- bootstrapData memory.Value[*bootstrapData]
- localRoles memory.Value[*cpb.NodeRoles]
- podNetwork memory.Value[*clusternet.Prefixes]
+ ClusterMembership memory.Value[*ClusterMembership]
+ KubernetesStatus memory.Value[*KubernetesStatus]
+ bootstrapData memory.Value[*bootstrapData]
+ localRoles memory.Value[*cpb.NodeRoles]
+ podNetwork memory.Value[*clusternet.Prefixes]
+ clusterDirectorySaved memory.Value[bool]
controlPlane *workerControlPlane
statusPush *workerStatusPush
@@ -93,6 +94,7 @@
rolefetch *workerRoleFetch
nodeMgmt *workerNodeMgmt
clusternet *workerClusternet
+ hostsfile *workerHostsfile
}
// New creates a Role Server service from a Config.
@@ -112,7 +114,8 @@
s.statusPush = &workerStatusPush{
network: s.Network,
- clusterMembership: &s.ClusterMembership,
+ clusterMembership: &s.ClusterMembership,
+ clusterDirectorySaved: &s.clusterDirectorySaved,
}
s.heartbeat = &workerHeartbeat{
@@ -142,6 +145,7 @@
clusterMembership: &s.ClusterMembership,
logTree: s.LogTree,
}
+
s.clusternet = &workerClusternet{
storageRoot: s.StorageRoot,
@@ -149,6 +153,13 @@
podNetwork: &s.podNetwork,
}
+ s.hostsfile = &workerHostsfile{
+ storageRoot: s.StorageRoot,
+ network: s.Network,
+ clusterMembership: &s.ClusterMembership,
+ clusterDirectorySaved: &s.clusterDirectorySaved,
+ }
+
return s
}
@@ -197,6 +208,7 @@
pubkey: credentials.PublicKey(),
resolver: s.Resolver,
})
+ s.clusterDirectorySaved.Set(true)
}
// Run the Role Server service, which uses intermediary workload launchers to
@@ -209,6 +221,7 @@
supervisor.Run(ctx, "rolefetch", s.rolefetch.run)
supervisor.Run(ctx, "nodemgmt", s.nodeMgmt.run)
supervisor.Run(ctx, "clusternet", s.clusternet.run)
+ supervisor.Run(ctx, "hostsfile", s.hostsfile.run)
supervisor.Signal(ctx, supervisor.SignalHealthy)
<-ctx.Done()
diff --git a/metropolis/node/core/roleserve/worker_hostsfile.go b/metropolis/node/core/roleserve/worker_hostsfile.go
new file mode 100644
index 0000000..6cc35cc
--- /dev/null
+++ b/metropolis/node/core/roleserve/worker_hostsfile.go
@@ -0,0 +1,62 @@
+package roleserve
+
+import (
+ "context"
+
+ "source.monogon.dev/metropolis/node/core/localstorage"
+ "source.monogon.dev/metropolis/node/core/network"
+ "source.monogon.dev/metropolis/node/core/network/hostsfile"
+ "source.monogon.dev/metropolis/pkg/event/memory"
+ "source.monogon.dev/metropolis/pkg/supervisor"
+
+ ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+)
+
+// workerHostsfile runs the //metropolis/node/core/network/hostsfile service,
+// which in turn populates /etc/hosts and /etc/machine-id, and updates the
+// ESP-stored ClusterDirectory (used to Join the cluster after a machine reboots).
+type workerHostsfile struct {
+ storageRoot *localstorage.Root
+
+ // network will be read. It provides data about the local node's address.
+ network *network.Service
+ // clusterMembership will be read. It provides data about the node identity but
+ // also provides access to the rest of the cluster's data via the Curator API.
+ clusterMembership *memory.Value[*ClusterMembership]
+
+ // clusterDirectorySaved will be written. A value of true indicates that the
+ // cluster directory has been successfully written at least once to the ESP.
+ clusterDirectorySaved *memory.Value[bool]
+}
+
+func (s *workerHostsfile) run(ctx context.Context) error {
+ w := s.clusterMembership.Watch()
+ defer w.Close()
+ supervisor.Logger(ctx).Infof("Waiting for cluster membership...")
+ cm, err := w.Get(ctx, FilterHome())
+ if err != nil {
+ return err
+ }
+ supervisor.Logger(ctx).Infof("Got cluster membership, starting...")
+
+ // TODO(issues/193): stop dialing the curator everywhere.
+ conn, err := cm.DialCurator()
+ if err != nil {
+ return err
+ }
+ defer conn.Close()
+ cur := ipb.NewCuratorClient(conn)
+
+ svc := hostsfile.Service{
+ Config: hostsfile.Config{
+ Network: s.network,
+ Ephemeral: &s.storageRoot.Ephemeral,
+ ESP: &s.storageRoot.ESP,
+ NodeID: cm.NodeID(),
+ Curator: cur,
+ ClusterDirectorySaved: s.clusterDirectorySaved,
+ },
+ }
+
+ return svc.Run(ctx)
+}
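The producer half of this contract lives in the hostsfile service
itself: after each successful write of the ClusterDirectory to the ESP
it is expected to set ClusterDirectorySaved to true. A hedged sketch of
that side, assuming a hypothetical write helper and directory type
(only the memory.Value API and the Config field above come from this
change):

    package sketch

    import (
    	"source.monogon.dev/metropolis/pkg/event/memory"
    )

    // directory stands in for the actual ClusterDirectory message.
    type directory struct{}

    // writeDirectoryToESP stands in for the real ESP persistence logic.
    func writeDirectoryToESP(d *directory) error {
    	return nil
    }

    // saveAndSignal persists the directory and, only on success, flips
    // the shared flag. Watchers (like the status pusher gated below)
    // treat true as "written at least once", per the field's doc
    // comment.
    func saveAndSignal(d *directory, saved *memory.Value[bool]) error {
    	if err := writeDirectoryToESP(d); err != nil {
    		return err
    	}
    	saved.Set(true)
    	return nil
    }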
diff --git a/metropolis/node/core/roleserve/worker_statuspush.go b/metropolis/node/core/roleserve/worker_statuspush.go
index ed806a0..4cea7c9 100644
--- a/metropolis/node/core/roleserve/worker_statuspush.go
+++ b/metropolis/node/core/roleserve/worker_statuspush.go
@@ -8,10 +8,12 @@
"google.golang.org/protobuf/encoding/prototext"
common "source.monogon.dev/metropolis/node"
- ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
"source.monogon.dev/metropolis/node/core/network"
+ "source.monogon.dev/metropolis/pkg/event"
"source.monogon.dev/metropolis/pkg/event/memory"
"source.monogon.dev/metropolis/pkg/supervisor"
+
+ ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
cpb "source.monogon.dev/metropolis/proto/common"
)
@@ -22,6 +24,8 @@
// clusterMembership will be read.
clusterMembership *memory.Value[*ClusterMembership]
+ // clusterDirectorySaved will be read.
+ clusterDirectorySaved *memory.Value[bool]
}
// workerStatusPushChannels contain all the channels between the status pusher's
@@ -132,6 +136,16 @@
supervisor.Run(ctx, "map-cluster-membership", func(ctx context.Context) error {
supervisor.Signal(ctx, supervisor.SignalHealthy)
+ // Do not submit a heartbeat until the cluster directory has been saved.
+ //
+ // TODO(q3k): add a node status field for this instead.
+ supervisor.Logger(ctx).Infof("Waiting for cluster directory to be saved...")
+ cdw := s.clusterDirectorySaved.Watch()
+ _, err := cdw.Get(ctx, event.Filter(func(t bool) bool { return t }))
+ if err != nil {
+ return fmt.Errorf("getting cluster directory state failed: %w", err)
+ }
+
var conn *grpc.ClientConn
defer func() {
if conn != nil {
@@ -141,7 +155,7 @@
w := s.clusterMembership.Watch()
defer w.Close()
- supervisor.Logger(ctx).Infof("Waiting for cluster membership...")
+ supervisor.Logger(ctx).Infof("Cluster directory saved, waiting for cluster membership...")
for {
cm, err := w.Get(ctx, FilterHome())
if err != nil {