m/node/core: run hostsfile from roleserver, provide feedback on cluster directory
Not providing a heartbeat and status until we save a cluster directory
to the ESP is a quick and dirty way to make sure we don't mark a node
as HEALTHY until it has performed the bare minimum of setup required to
be rebootable.
This is important for reducing flakiness in our E2E tests.
In the future we should expose a dedicated node status field or a
general 'sync' API, but this will do for now.
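The gating boils down to a shared event value: the hostsfile service
flips it to true once the ClusterDirectory has been written to the ESP,
and the status pusher blocks on it before starting its heartbeat loop.
A minimal sketch of that pattern, using the memory.Value / event.Filter
API as it appears in this change (the markDirectorySaved / waitForSave
helpers are illustrative only, not part of this change):

    package example

    import (
        "context"
        "fmt"

        "source.monogon.dev/metropolis/pkg/event"
        "source.monogon.dev/metropolis/pkg/event/memory"
    )

    // clusterDirectorySaved is flipped to true once the ClusterDirectory
    // has been persisted to the ESP at least once.
    var clusterDirectorySaved memory.Value[bool]

    // markDirectorySaved stands in for the hostsfile service's
    // write-to-ESP step.
    func markDirectorySaved() {
        // ... write ClusterDirectory to the ESP, then:
        clusterDirectorySaved.Set(true)
    }

    // waitForSave stands in for the status pusher: it refuses to start
    // heartbeating until the directory has been saved at least once.
    func waitForSave(ctx context.Context) error {
        w := clusterDirectorySaved.Watch()
        defer w.Close()
        // Block until the value becomes true; event.Filter skips the
        // initial false set by the hostsfile service.
        _, err := w.Get(ctx, event.Filter(func(saved bool) bool { return saved }))
        if err != nil {
            return fmt.Errorf("waiting for cluster directory: %w", err)
        }
        fmt.Println("cluster directory saved, starting heartbeats")
        // ... heartbeat / status push loop ...
        return nil
    }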
Change-Id: Ibad9e91f01abeacdfe4400ef7cb36ca17f68ba0a
Reviewed-on: https://review.monogon.dev/c/monogon/+/1498
Tested-by: Jenkins CI
Reviewed-by: Leopold Schabel <leo@monogon.tech>
diff --git a/metropolis/node/core/BUILD.bazel b/metropolis/node/core/BUILD.bazel
index 07f4f6e..142f090 100644
--- a/metropolis/node/core/BUILD.bazel
+++ b/metropolis/node/core/BUILD.bazel
@@ -27,7 +27,6 @@
"//metropolis/node/core/localstorage/declarative",
"//metropolis/node/core/mgmt",
"//metropolis/node/core/network",
- "//metropolis/node/core/network/hostsfile",
"//metropolis/node/core/roleserve",
"//metropolis/node/core/rpc/resolver",
"//metropolis/node/core/time",
diff --git a/metropolis/node/core/main.go b/metropolis/node/core/main.go
index 0538478..721482b 100644
--- a/metropolis/node/core/main.go
+++ b/metropolis/node/core/main.go
@@ -30,7 +30,6 @@
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/node/core/localstorage/declarative"
"source.monogon.dev/metropolis/node/core/network"
- "source.monogon.dev/metropolis/node/core/network/hostsfile"
"source.monogon.dev/metropolis/node/core/roleserve"
"source.monogon.dev/metropolis/node/core/rpc/resolver"
timesvc "source.monogon.dev/metropolis/node/core/time"
@@ -160,19 +159,6 @@
return fmt.Errorf("failed to start role service: %w", err)
}
- // Start the hostsfile service.
- hostsfileSvc := hostsfile.Service{
- Config: hostsfile.Config{
- Roleserver: rs,
- Network: networkSvc,
- Ephemeral: &root.Ephemeral,
- ESP: &root.ESP,
- },
- }
- if err := supervisor.Run(ctx, "hostsfile", hostsfileSvc.Run); err != nil {
- return fmt.Errorf("failed to start hostsfile service: %w", err)
- }
-
if err := runDebugService(ctx, rs, lt, root); err != nil {
return fmt.Errorf("when starting debug service: %w", err)
}
diff --git a/metropolis/node/core/network/hostsfile/BUILD.bazel b/metropolis/node/core/network/hostsfile/BUILD.bazel
index 14b5701..e86850e 100644
--- a/metropolis/node/core/network/hostsfile/BUILD.bazel
+++ b/metropolis/node/core/network/hostsfile/BUILD.bazel
@@ -9,7 +9,7 @@
"//metropolis/node/core/curator/proto/api",
"//metropolis/node/core/localstorage",
"//metropolis/node/core/network",
- "//metropolis/node/core/roleserve",
+ "//metropolis/pkg/event",
"//metropolis/pkg/supervisor",
"//metropolis/proto/common",
"@org_golang_google_grpc//:go_default_library",
diff --git a/metropolis/node/core/network/hostsfile/hostsfile.go b/metropolis/node/core/network/hostsfile/hostsfile.go
index a4455ec..ea0a357 100644
--- a/metropolis/node/core/network/hostsfile/hostsfile.go
+++ b/metropolis/node/core/network/hostsfile/hostsfile.go
@@ -1,4 +1,4 @@
-// hostsfile implements a service which owns and writes all node-local
+// Package hostsfile implements a service which owns and writes all node-local
// files/interfaces used by the system to resolve the local node's name and the
// names of other nodes in the cluster:
//
@@ -27,11 +27,12 @@
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
- ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/node/core/network"
- "source.monogon.dev/metropolis/node/core/roleserve"
+ "source.monogon.dev/metropolis/pkg/event"
"source.monogon.dev/metropolis/pkg/supervisor"
+
+ ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
cpb "source.monogon.dev/metropolis/proto/common"
)
@@ -44,10 +45,13 @@
Ephemeral *localstorage.EphemeralDirectory
// ESP is the root of the node's EFI System Partition.
ESP *localstorage.ESPDirectory
-
- // Roleserver is an instance of the roleserver service which will be queried for
- // ClusterMembership and a Curator client.
- Roleserver *roleserve.Service
+ // NodeID of the node the service is running on.
+ NodeID string
+ // Curator gRPC client authenticated as local node.
+ Curator ipb.CuratorClient
+ // ClusterDirectorySaved will be written with a boolean indicating whether the
+ // ClusterDirectory has been successfully persisted to the ESP.
+ ClusterDirectorySaved event.Value[bool]
}
// Service is the hostsfile service instance. See package-level documentation
@@ -55,10 +59,6 @@
type Service struct {
Config
- // localC is a channel populated by the local sub-runnable with the newest
- // available information about the local node's address. It is automatically
- // created and closed by Run.
- localC chan string
// clusterC is a channel populated by the cluster sub-runnable with the newest
// available information about the cluster nodes. It is automatically created and
// closed by Run.
@@ -129,21 +129,12 @@
}
func (s *Service) Run(ctx context.Context) error {
- s.localC = make(chan string)
- defer close(s.localC)
+ s.ClusterDirectorySaved.Set(false)
+
+ localC := make(chan *network.Status)
s.clusterC = make(chan nodeMap)
- defer close(s.clusterC)
- cmw := s.Roleserver.ClusterMembership.Watch()
- defer cmw.Close()
- supervisor.Logger(ctx).Infof("Waiting for node ID...")
- nodeID, err := roleserve.GetNodeID(ctx, cmw)
- if err != nil {
- return err
- }
- supervisor.Logger(ctx).Infof("Got node ID, starting...")
-
- if err := supervisor.Run(ctx, "local", s.runLocal); err != nil {
+ if err := supervisor.Run(ctx, "local", event.Pipe(s.Network.Value(), localC)); err != nil {
return err
}
if err := supervisor.Run(ctx, "cluster", s.runCluster); err != nil {
@@ -152,10 +143,10 @@
// Immediately update machine-id and hostname, we don't need network addresses
// for that.
- if err := s.Ephemeral.MachineID.Write([]byte(nodeID), 0644); err != nil {
+ if err := s.Ephemeral.MachineID.Write([]byte(s.NodeID), 0644); err != nil {
return fmt.Errorf("failed to write /ephemeral/machine-id: %w", err)
}
- if err := unix.Sethostname([]byte(nodeID)); err != nil {
+ if err := unix.Sethostname([]byte(s.NodeID)); err != nil {
return fmt.Errorf("failed to set runtime hostname: %w", err)
}
// Immediately write an /etc/hosts just containing localhost, even if we don't
@@ -172,13 +163,17 @@
select {
case <-ctx.Done():
return ctx.Err()
- case u := <-s.localC:
+ case st := <-localC:
// Ignore spurious updates.
- if nodes[nodeID].address == u {
- break
+ if st.ExternalAddress == nil {
+ continue
+ }
+ u := st.ExternalAddress.String()
+ if nodes[s.NodeID].address == u {
+ continue
}
supervisor.Logger(ctx).Infof("Got new local address: %s", u)
- nodes[nodeID] = nodeInfo{
+ nodes[s.NodeID] = nodeInfo{
address: u,
local: true,
}
@@ -196,7 +191,7 @@
// We're not interested in what the cluster thinks about our local node, as that
// might be outdated (eg. when we haven't yet reported a new local address to
// the cluster).
- if id == nodeID {
+ if id == s.NodeID {
continue
}
if nodes[id].address == info.address {
@@ -218,8 +213,8 @@
}
// Check that we are self-resolvable.
- if _, err := net.ResolveIPAddr("ip", nodeID); err != nil {
- supervisor.Logger(ctx).Errorf("Failed to self-resolve %q: %v", nodeID, err)
+ if _, err := net.ResolveIPAddr("ip", s.NodeID); err != nil {
+ supervisor.Logger(ctx).Errorf("Failed to self-resolve %q: %v", s.NodeID, err)
}
// Update this node's ClusterDirectory.
@@ -233,22 +228,7 @@
return err
}
unix.Sync()
- }
-}
-
-// runLocal updates s.localC with the IP address of the local node, as retrieved
-// from the network service.
-func (s *Service) runLocal(ctx context.Context) error {
- nw := s.Network.Watch()
- for {
- ns, err := nw.Get(ctx)
- if err != nil {
- return err
- }
- addr := ns.ExternalAddress.String()
- if addr != "" {
- s.localC <- addr
- }
+ s.ClusterDirectorySaved.Set(true)
}
}
@@ -257,24 +237,7 @@
// reflects the up-to-date view of the cluster returned from the Curator Watch
// call, including any node deletions.
func (s *Service) runCluster(ctx context.Context) error {
- cmw := s.Roleserver.ClusterMembership.Watch()
- defer cmw.Close()
-
- supervisor.Logger(ctx).Infof("Waiting for cluster membership...")
- cm, err := cmw.Get(ctx, roleserve.FilterHome())
- if err != nil {
- return err
- }
- supervisor.Logger(ctx).Infof("Got cluster membership, starting...")
-
- con, err := cm.DialCurator()
- if err != nil {
- return err
- }
- defer con.Close()
- cur := ipb.NewCuratorClient(con)
-
- w, err := cur.Watch(ctx, &ipb.WatchRequest{
+ w, err := s.Curator.Watch(ctx, &ipb.WatchRequest{
Kind: &ipb.WatchRequest_NodesInCluster_{
NodesInCluster: &ipb.WatchRequest_NodesInCluster{},
},
diff --git a/metropolis/node/core/network/main.go b/metropolis/node/core/network/main.go
index 8740d5a..6fb01eb 100644
--- a/metropolis/node/core/network/main.go
+++ b/metropolis/node/core/network/main.go
@@ -84,6 +84,13 @@
return s.status.Watch()
}
+// Value returns the underlying event.Value for the network service status.
+//
+// TODO(q3k): just expose s.status directly and remove the Watch and Event methods.
+func (s *Service) Value() event.Value[*Status] {
+ return &s.status
+}
+
// ConfigureDNS sets a DNS ExtraDirective on the built-in DNS server of the
// network Service. See //metropolis/node/core/network/dns for more
// information.
diff --git a/metropolis/node/core/roleserve/BUILD.bazel b/metropolis/node/core/roleserve/BUILD.bazel
index 1818bf4..4a79349 100644
--- a/metropolis/node/core/roleserve/BUILD.bazel
+++ b/metropolis/node/core/roleserve/BUILD.bazel
@@ -11,6 +11,7 @@
"worker_clusternet.go",
"worker_controlplane.go",
"worker_heartbeat.go",
+ "worker_hostsfile.go",
"worker_kubernetes.go",
"worker_nodemgmt.go",
"worker_rolefetch.go",
@@ -28,6 +29,7 @@
"//metropolis/node/core/localstorage",
"//metropolis/node/core/mgmt",
"//metropolis/node/core/network",
+ "//metropolis/node/core/network/hostsfile",
"//metropolis/node/core/rpc",
"//metropolis/node/core/rpc/resolver",
"//metropolis/node/kubernetes",
diff --git a/metropolis/node/core/roleserve/roleserve.go b/metropolis/node/core/roleserve/roleserve.go
index 86b5b5a..a665044 100644
--- a/metropolis/node/core/roleserve/roleserve.go
+++ b/metropolis/node/core/roleserve/roleserve.go
@@ -80,11 +80,12 @@
type Service struct {
Config
- ClusterMembership memory.Value[*ClusterMembership]
- KubernetesStatus memory.Value[*KubernetesStatus]
- bootstrapData memory.Value[*bootstrapData]
- localRoles memory.Value[*cpb.NodeRoles]
- podNetwork memory.Value[*clusternet.Prefixes]
+ ClusterMembership memory.Value[*ClusterMembership]
+ KubernetesStatus memory.Value[*KubernetesStatus]
+ bootstrapData memory.Value[*bootstrapData]
+ localRoles memory.Value[*cpb.NodeRoles]
+ podNetwork memory.Value[*clusternet.Prefixes]
+ clusterDirectorySaved memory.Value[bool]
controlPlane *workerControlPlane
statusPush *workerStatusPush
@@ -93,6 +94,7 @@
rolefetch *workerRoleFetch
nodeMgmt *workerNodeMgmt
clusternet *workerClusternet
+ hostsfile *workerHostsfile
}
// New creates a Role Server services from a Config.
@@ -112,7 +114,8 @@
s.statusPush = &workerStatusPush{
network: s.Network,
- clusterMembership: &s.ClusterMembership,
+ clusterMembership: &s.ClusterMembership,
+ clusterDirectorySaved: &s.clusterDirectorySaved,
}
s.heartbeat = &workerHeartbeat{
@@ -142,6 +145,7 @@
clusterMembership: &s.ClusterMembership,
logTree: s.LogTree,
}
+
s.clusternet = &workerClusternet{
storageRoot: s.StorageRoot,
@@ -149,6 +153,13 @@
podNetwork: &s.podNetwork,
}
+ s.hostsfile = &workerHostsfile{
+ storageRoot: s.StorageRoot,
+ network: s.Network,
+ clusterMembership: &s.ClusterMembership,
+ clusterDirectorySaved: &s.clusterDirectorySaved,
+ }
+
return s
}
@@ -197,6 +208,7 @@
pubkey: credentials.PublicKey(),
resolver: s.Resolver,
})
+ s.clusterDirectorySaved.Set(true)
}
// Run the Role Server service, which uses intermediary workload launchers to
@@ -209,6 +221,7 @@
supervisor.Run(ctx, "rolefetch", s.rolefetch.run)
supervisor.Run(ctx, "nodemgmt", s.nodeMgmt.run)
supervisor.Run(ctx, "clusternet", s.clusternet.run)
+ supervisor.Run(ctx, "hostsfile", s.hostsfile.run)
supervisor.Signal(ctx, supervisor.SignalHealthy)
<-ctx.Done()
diff --git a/metropolis/node/core/roleserve/worker_hostsfile.go b/metropolis/node/core/roleserve/worker_hostsfile.go
new file mode 100644
index 0000000..6cc35cc
--- /dev/null
+++ b/metropolis/node/core/roleserve/worker_hostsfile.go
@@ -0,0 +1,62 @@
+package roleserve
+
+import (
+ "context"
+
+ "source.monogon.dev/metropolis/node/core/localstorage"
+ "source.monogon.dev/metropolis/node/core/network"
+ "source.monogon.dev/metropolis/node/core/network/hostsfile"
+ "source.monogon.dev/metropolis/pkg/event/memory"
+ "source.monogon.dev/metropolis/pkg/supervisor"
+
+ ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+)
+
+// workerHostsfile runs the //metropolis/node/core/network/hostsfile service,
+// which in turn populates /etc/hosts, /etc/machine-id and updates the ESP-stored
+// ClusterDirectory (used to Join the cluster after a machine reboots).
+type workerHostsfile struct {
+ storageRoot *localstorage.Root
+
+ // network will be read. It provides data about the local node's address.
+ network *network.Service
+ // clusterMembership will be read. It provides data about the node identity but
+ // also provides access to the rest of the cluster's data via the Curator API.
+ clusterMembership *memory.Value[*ClusterMembership]
+
+ // clusterDirectorySaved will be written. A value of true indicates that the
+ // cluster directory has been successfully written at least once to the ESP.
+ clusterDirectorySaved *memory.Value[bool]
+}
+
+func (s *workerHostsfile) run(ctx context.Context) error {
+ w := s.clusterMembership.Watch()
+ defer w.Close()
+ supervisor.Logger(ctx).Infof("Waiting for cluster membership...")
+ cm, err := w.Get(ctx, FilterHome())
+ if err != nil {
+ return err
+ }
+ supervisor.Logger(ctx).Infof("Got cluster membership, starting...")
+
+ // TODO(issues/193): stop dialing the curator everywhere.
+ conn, err := cm.DialCurator()
+ if err != nil {
+ return err
+ }
+ defer conn.Close()
+ cur := ipb.NewCuratorClient(conn)
+
+ svc := hostsfile.Service{
+ Config: hostsfile.Config{
+ Network: s.network,
+ Ephemeral: &s.storageRoot.Ephemeral,
+ ESP: &s.storageRoot.ESP,
+ NodeID: cm.NodeID(),
+ Curator: cur,
+ ClusterDirectorySaved: s.clusterDirectorySaved,
+ },
+ }
+
+ return svc.Run(ctx)
+}
diff --git a/metropolis/node/core/roleserve/worker_statuspush.go b/metropolis/node/core/roleserve/worker_statuspush.go
index ed806a0..4cea7c9 100644
--- a/metropolis/node/core/roleserve/worker_statuspush.go
+++ b/metropolis/node/core/roleserve/worker_statuspush.go
@@ -8,10 +8,12 @@
"google.golang.org/protobuf/encoding/prototext"
common "source.monogon.dev/metropolis/node"
- ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
"source.monogon.dev/metropolis/node/core/network"
+ "source.monogon.dev/metropolis/pkg/event"
"source.monogon.dev/metropolis/pkg/event/memory"
"source.monogon.dev/metropolis/pkg/supervisor"
+
+ ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
cpb "source.monogon.dev/metropolis/proto/common"
)
@@ -22,6 +24,8 @@
// clusterMembership will be read.
clusterMembership *memory.Value[*ClusterMembership]
+ // clusterDirectorySaved will be read.
+ clusterDirectorySaved *memory.Value[bool]
}
// workerStatusPushChannels contain all the channels between the status pusher's
@@ -132,6 +136,16 @@
supervisor.Run(ctx, "map-cluster-membership", func(ctx context.Context) error {
supervisor.Signal(ctx, supervisor.SignalHealthy)
+ // Do not submit heartbeats until the cluster directory has been saved.
+ //
+ // TODO(q3k): add a node status field for this instead.
+ supervisor.Logger(ctx).Infof("Waiting for cluster directory to be saved...")
+ cdw := s.clusterDirectorySaved.Watch()
+ _, err := cdw.Get(ctx, event.Filter(func(t bool) bool { return t }))
+ if err != nil {
+ return fmt.Errorf("getting cluster directory state failed: %w", err)
+ }
+
var conn *grpc.ClientConn
defer func() {
if conn != nil {
@@ -141,7 +155,7 @@
w := s.clusterMembership.Watch()
defer w.Close()
- supervisor.Logger(ctx).Infof("Waiting for cluster membership...")
+ supervisor.Logger(ctx).Infof("Cluster directory saved, waiting for cluster membership...")
for {
cm, err := w.Get(ctx, FilterHome())
if err != nil {