m/n/core/roleserve: persist node roles across reboots
This allows nodes to attempt to bring up some services before they
get full connectivity to the cluster.
This is especially useful if a node cannot establish connectivity to the
cluster, e.g. because it's the only control plane node that just started
up.
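As a rough sketch of the mechanism (not part of the patch itself): roles
received from the curator are marshalled into a protobuf file on disk, and
on the next boot that file is read back to seed the local roles before any
curator connection exists. The helper names and the plain file path below
are illustrative only; the actual change goes through the declarative
localstorage.File at data/node/roles.pb, as shown in the diff.

    package rolespersist

    import (
        "os"

        "google.golang.org/protobuf/proto"

        cpb "source.monogon.dev/metropolis/proto/common"
    )

    // persistRoles writes the given roles to path, mirroring what the role
    // fetcher does after each curator update. Sketch only; the real code
    // uses the localstorage PersistedRoles file.
    func persistRoles(path string, roles *cpb.NodeRoles) error {
        bytes, err := proto.Marshal(roles)
        if err != nil {
            return err
        }
        return os.WriteFile(path, bytes, 0400)
    }

    // loadRoles reads previously persisted roles, as the role fetcher does
    // on boot before a curator connection is available. A missing file
    // means no roles were persisted yet.
    func loadRoles(path string) (*cpb.NodeRoles, error) {
        data, err := os.ReadFile(path)
        if err != nil {
            return nil, err
        }
        var nr cpb.NodeRoles
        if err := proto.Unmarshal(data, &nr); err != nil {
            return nil, err
        }
        return &nr, nil
    }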
Fixes https://github.com/monogon-dev/monogon/issues/226
Change-Id: I030ccc02851e74ceb8dc043203083aa5b6854b55
Reviewed-on: https://review.monogon.dev/c/monogon/+/1842
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/node/core/localstorage/storage.go b/metropolis/node/core/localstorage/storage.go
index a37ce8d..eb56294 100644
--- a/metropolis/node/core/localstorage/storage.go
+++ b/metropolis/node/core/localstorage/storage.go
@@ -89,7 +89,8 @@
type DataNodeDirectory struct {
declarative.Directory
- Credentials PKIDirectory `dir:"credentials"`
+ Credentials PKIDirectory `dir:"credentials"`
+ PersistedRoles declarative.File `file:"roles.pb"`
}
type DataEtcdDirectory struct {
diff --git a/metropolis/node/core/roleserve/BUILD.bazel b/metropolis/node/core/roleserve/BUILD.bazel
index 3e68818..84fd9fd 100644
--- a/metropolis/node/core/roleserve/BUILD.bazel
+++ b/metropolis/node/core/roleserve/BUILD.bazel
@@ -42,6 +42,7 @@
"//metropolis/proto/common",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_protobuf//encoding/prototext",
+ "@org_golang_google_protobuf//proto",
],
)
diff --git a/metropolis/node/core/roleserve/roleserve.go b/metropolis/node/core/roleserve/roleserve.go
index f0fa273..ddc5811 100644
--- a/metropolis/node/core/roleserve/roleserve.go
+++ b/metropolis/node/core/roleserve/roleserve.go
@@ -139,6 +139,7 @@
}
s.rolefetch = &workerRoleFetch{
+ storageRoot: s.StorageRoot,
curatorConnection: &s.CuratorConnection,
localRoles: &s.localRoles,
diff --git a/metropolis/node/core/roleserve/worker_rolefetch.go b/metropolis/node/core/roleserve/worker_rolefetch.go
index fb1fb33..40d2ffd 100644
--- a/metropolis/node/core/roleserve/worker_rolefetch.go
+++ b/metropolis/node/core/roleserve/worker_rolefetch.go
@@ -4,9 +4,13 @@
"context"
"fmt"
- ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+ "google.golang.org/protobuf/proto"
+
+ "source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/pkg/event/memory"
"source.monogon.dev/metropolis/pkg/supervisor"
+
+ ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
cpb "source.monogon.dev/metropolis/proto/common"
)
@@ -14,6 +18,7 @@
// responsible for populating localRoles based on a curatorConnection whenever
// the node is HOME and cluster credentials / curator access is available.
type workerRoleFetch struct {
+ storageRoot *localstorage.Root
curatorConnection *memory.Value[*curatorConnection]
// localRoles will be written.
@@ -21,54 +26,113 @@
}
func (s *workerRoleFetch) run(ctx context.Context) error {
+ // Wait for a 'curator connection' first. This isn't a real connection but just a
+ // set of credentials and a potentially working resolver. Here, we just use its
+ // presence as a marking that the local disk has been mounted and thus we can
+ // attempt to read a persisted cluster directory.
w := s.curatorConnection.Watch()
- defer w.Close()
- supervisor.Logger(ctx).Infof("Waiting for curator connection...")
- cc, err := w.Get(ctx)
+ _, err := w.Get(ctx)
if err != nil {
+ w.Close()
return err
}
- supervisor.Logger(ctx).Infof("Got curator connection, starting...")
+ w.Close()
- nodeID := cc.nodeID()
- cur := ipb.NewCuratorClient(cc.conn)
-
- // Start watch for current node, update localRoles whenever we get something
- // new.
- srv, err := cur.Watch(ctx, &ipb.WatchRequest{Kind: &ipb.WatchRequest_NodeInCluster_{
- NodeInCluster: &ipb.WatchRequest_NodeInCluster{
- NodeId: nodeID,
- },
- }})
- if err != nil {
- return fmt.Errorf("watch failed: %w", err)
- }
- defer srv.CloseSend()
-
- supervisor.Signal(ctx, supervisor.SignalHealthy)
- for {
- ev, err := srv.Recv()
+ // Read persisted roles if available.
+ exists, _ := s.storageRoot.Data.Node.PersistedRoles.Exists()
+ if exists {
+ supervisor.Logger(ctx).Infof("Attempting to read persisted node roles...")
+ data, err := s.storageRoot.Data.Node.PersistedRoles.Read()
if err != nil {
- return fmt.Errorf("watch event receive failed: %w", err)
- }
- for _, n := range ev.Nodes {
- n := n
- // Skip spuriously sent other nodes.
- if n.Id != nodeID {
- continue
+ supervisor.Logger(ctx).Errorf("Failed to read persisted roles: %w", err)
+ } else {
+ var nr cpb.NodeRoles
+ if err := proto.Unmarshal(data, &nr); err != nil {
+ supervisor.Logger(ctx).Errorf("Failed to unmarshal persisted roles: %w", err)
+ } else {
+ supervisor.Logger(ctx).Infof("Got persisted role data from disk:")
}
- supervisor.Logger(ctx).Infof("Got new node data. Roles:")
- if n.Roles.ConsensusMember != nil {
- supervisor.Logger(ctx).Infof(" - control plane member, existing peers: %+v", n.Roles.ConsensusMember.Peers)
+ if nr.ConsensusMember != nil {
+ supervisor.Logger(ctx).Infof(" - control plane member, existing peers: %+v", nr.ConsensusMember.Peers)
}
- if n.Roles.KubernetesController != nil {
+ if nr.KubernetesController != nil {
supervisor.Logger(ctx).Infof(" - kubernetes controller")
}
- if n.Roles.KubernetesWorker != nil {
+ if nr.KubernetesWorker != nil {
supervisor.Logger(ctx).Infof(" - kubernetes worker")
}
- s.localRoles.Set(n.Roles)
- break
+ s.localRoles.Set(&nr)
}
+ } else {
+ supervisor.Logger(ctx).Infof("No persisted node roles.")
}
+
+ // Run networked part in a sub-runnable so that network errors don't cause us to
+ // retry the above and make us possibly trigger spurious restarts.
+ supervisor.Run(ctx, "watcher", func(ctx context.Context) error {
+ w := s.curatorConnection.Watch()
+ defer w.Close()
+ cc, err := w.Get(ctx)
+ if err != nil {
+ return err
+ }
+
+ nodeID := cc.nodeID()
+ cur := ipb.NewCuratorClient(cc.conn)
+
+ // Start watch for current node, update localRoles whenever we get something
+ // new.
+ srv, err := cur.Watch(ctx, &ipb.WatchRequest{Kind: &ipb.WatchRequest_NodeInCluster_{
+ NodeInCluster: &ipb.WatchRequest_NodeInCluster{
+ NodeId: nodeID,
+ },
+ }})
+ if err != nil {
+ return fmt.Errorf("watch failed: %w", err)
+ }
+ defer srv.CloseSend()
+
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ for {
+ ev, err := srv.Recv()
+ if err != nil {
+ return fmt.Errorf("watch event receive failed: %w", err)
+ }
+ for _, n := range ev.Nodes {
+ n := n
+ // Skip spuriously sent other nodes.
+ if n.Id != nodeID {
+ continue
+ }
+ supervisor.Logger(ctx).Infof("Got new node data. Roles:")
+ if n.Roles.ConsensusMember != nil {
+ supervisor.Logger(ctx).Infof(" - control plane member, existing peers: %+v", n.Roles.ConsensusMember.Peers)
+ }
+ if n.Roles.KubernetesController != nil {
+ supervisor.Logger(ctx).Infof(" - kubernetes controller")
+ }
+ if n.Roles.KubernetesWorker != nil {
+ supervisor.Logger(ctx).Infof(" - kubernetes worker")
+ }
+ s.localRoles.Set(n.Roles)
+
+ // Persist role data to disk.
+ bytes, err := proto.Marshal(n.Roles)
+ if err != nil {
+ supervisor.Logger(ctx).Errorf("Failed to marshal node roles: %w", err)
+ } else {
+ err = s.storageRoot.Data.Node.PersistedRoles.Write(bytes, 0400)
+ if err != nil {
+ supervisor.Logger(ctx).Errorf("Failed to write node roles: %w", err)
+ }
+ }
+ break
+ }
+ }
+ })
+
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ <-ctx.Done()
+ return ctx.Err()
+
}