m/n/core/roleserve: persist node roles across reboots

This allows nodes to attempt to bring up some services before they
get full connectivity to the cluster.

This is especially useful if a node cannot establish connectivity to the
cluster, e.g. because it's the only control plane node and it has just
started up.
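
For reference, the persisted state is just the wire-encoded
common.NodeRoles message written to a roles.pb file in the node's data
directory. Stripped of the supervisor and localstorage plumbing, the
round-trip added below amounts to roughly the following sketch. The
persistRoles/loadRoles helpers and the literal path are illustrative
only; the actual code goes through the declarative.File accessor:

  package roleserve

  import (
      "os"

      "google.golang.org/protobuf/proto"

      cpb "source.monogon.dev/metropolis/proto/common"
  )

  // persistedRolesPath is a stand-in for the Data.Node.PersistedRoles
  // file; the real location is resolved by localstorage.
  const persistedRolesPath = "/data/node/roles.pb"

  // persistRoles writes the roles received from the curator to disk.
  func persistRoles(roles *cpb.NodeRoles) error {
      b, err := proto.Marshal(roles)
      if err != nil {
          return err
      }
      return os.WriteFile(persistedRolesPath, b, 0400)
  }

  // loadRoles reads roles persisted by a previous boot, allowing role
  // workloads to start before the curator is reachable.
  func loadRoles() (*cpb.NodeRoles, error) {
      b, err := os.ReadFile(persistedRolesPath)
      if err != nil {
          return nil, err
      }
      var nr cpb.NodeRoles
      if err := proto.Unmarshal(b, &nr); err != nil {
          return nil, err
      }
      return &nr, nil
  }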

Fixes https://github.com/monogon-dev/monogon/issues/226

Change-Id: I030ccc02851e74ceb8dc043203083aa5b6854b55
Reviewed-on: https://review.monogon.dev/c/monogon/+/1842
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/node/core/localstorage/storage.go b/metropolis/node/core/localstorage/storage.go
index a37ce8d..eb56294 100644
--- a/metropolis/node/core/localstorage/storage.go
+++ b/metropolis/node/core/localstorage/storage.go
@@ -89,7 +89,8 @@
 
 type DataNodeDirectory struct {
 	declarative.Directory
-	Credentials PKIDirectory `dir:"credentials"`
+	Credentials    PKIDirectory     `dir:"credentials"`
+	PersistedRoles declarative.File `file:"roles.pb"`
 }
 
 type DataEtcdDirectory struct {
diff --git a/metropolis/node/core/roleserve/BUILD.bazel b/metropolis/node/core/roleserve/BUILD.bazel
index 3e68818..84fd9fd 100644
--- a/metropolis/node/core/roleserve/BUILD.bazel
+++ b/metropolis/node/core/roleserve/BUILD.bazel
@@ -42,6 +42,7 @@
         "//metropolis/proto/common",
         "@org_golang_google_grpc//:go_default_library",
         "@org_golang_google_protobuf//encoding/prototext",
+        "@org_golang_google_protobuf//proto",
     ],
 )
 
diff --git a/metropolis/node/core/roleserve/roleserve.go b/metropolis/node/core/roleserve/roleserve.go
index f0fa273..ddc5811 100644
--- a/metropolis/node/core/roleserve/roleserve.go
+++ b/metropolis/node/core/roleserve/roleserve.go
@@ -139,6 +139,7 @@
 	}
 
 	s.rolefetch = &workerRoleFetch{
+		storageRoot:       s.StorageRoot,
 		curatorConnection: &s.CuratorConnection,
 
 		localRoles: &s.localRoles,
diff --git a/metropolis/node/core/roleserve/worker_rolefetch.go b/metropolis/node/core/roleserve/worker_rolefetch.go
index fb1fb33..40d2ffd 100644
--- a/metropolis/node/core/roleserve/worker_rolefetch.go
+++ b/metropolis/node/core/roleserve/worker_rolefetch.go
@@ -4,9 +4,13 @@
 	"context"
 	"fmt"
 
-	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+	"google.golang.org/protobuf/proto"
+
+	"source.monogon.dev/metropolis/node/core/localstorage"
 	"source.monogon.dev/metropolis/pkg/event/memory"
 	"source.monogon.dev/metropolis/pkg/supervisor"
+
+	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
 	cpb "source.monogon.dev/metropolis/proto/common"
 )
 
@@ -14,6 +18,7 @@
 // responsible for populating localRoles based on a curatorConnection whenever
 // the node is HOME and cluster credentials / curator access is available.
 type workerRoleFetch struct {
+	storageRoot       *localstorage.Root
 	curatorConnection *memory.Value[*curatorConnection]
 
 	// localRoles will be written.
@@ -21,54 +26,113 @@
 }
 
 func (s *workerRoleFetch) run(ctx context.Context) error {
+	// Wait for a 'curator connection' first. This isn't a real connection but just a
+	// set of credentials and a potentially working resolver. Here, we just use its
+	// presence as a marker that the local disk has been mounted and thus we can
+	// attempt to read the persisted node roles, if any.
 	w := s.curatorConnection.Watch()
-	defer w.Close()
-	supervisor.Logger(ctx).Infof("Waiting for curator connection...")
-	cc, err := w.Get(ctx)
+	_, err := w.Get(ctx)
 	if err != nil {
+		w.Close()
 		return err
 	}
-	supervisor.Logger(ctx).Infof("Got curator connection, starting...")
+	w.Close()
 
-	nodeID := cc.nodeID()
-	cur := ipb.NewCuratorClient(cc.conn)
-
-	// Start watch for current node, update localRoles whenever we get something
-	// new.
-	srv, err := cur.Watch(ctx, &ipb.WatchRequest{Kind: &ipb.WatchRequest_NodeInCluster_{
-		NodeInCluster: &ipb.WatchRequest_NodeInCluster{
-			NodeId: nodeID,
-		},
-	}})
-	if err != nil {
-		return fmt.Errorf("watch failed: %w", err)
-	}
-	defer srv.CloseSend()
-
-	supervisor.Signal(ctx, supervisor.SignalHealthy)
-	for {
-		ev, err := srv.Recv()
+	// Read persisted roles if available.
+	exists, _ := s.storageRoot.Data.Node.PersistedRoles.Exists()
+	if exists {
+		supervisor.Logger(ctx).Infof("Attempting to read persisted node roles...")
+		data, err := s.storageRoot.Data.Node.PersistedRoles.Read()
 		if err != nil {
-			return fmt.Errorf("watch event receive failed: %w", err)
-		}
-		for _, n := range ev.Nodes {
-			n := n
-			// Skip spuriously sent other nodes.
-			if n.Id != nodeID {
-				continue
+			supervisor.Logger(ctx).Errorf("Failed to read persisted roles: %v", err)
+		} else {
+			var nr cpb.NodeRoles
+			if err := proto.Unmarshal(data, &nr); err != nil {
+				supervisor.Logger(ctx).Errorf("Failed to unmarshal persisted roles: %v", err)
+			} else {
+				supervisor.Logger(ctx).Infof("Got persisted role data from disk:")
 			}
-			supervisor.Logger(ctx).Infof("Got new node data. Roles:")
-			if n.Roles.ConsensusMember != nil {
-				supervisor.Logger(ctx).Infof(" - control plane member, existing peers: %+v", n.Roles.ConsensusMember.Peers)
+			if nr.ConsensusMember != nil {
+				supervisor.Logger(ctx).Infof(" - control plane member, existing peers: %+v", nr.ConsensusMember.Peers)
 			}
-			if n.Roles.KubernetesController != nil {
+			if nr.KubernetesController != nil {
 				supervisor.Logger(ctx).Infof(" - kubernetes controller")
 			}
-			if n.Roles.KubernetesWorker != nil {
+			if nr.KubernetesWorker != nil {
 				supervisor.Logger(ctx).Infof(" - kubernetes worker")
 			}
-			s.localRoles.Set(n.Roles)
-			break
+			s.localRoles.Set(&nr)
 		}
+	} else {
+		supervisor.Logger(ctx).Infof("No persisted node roles.")
 	}
+
+	// Run the networked part in a sub-runnable so that network errors don't cause
+	// us to re-run the logic above and possibly trigger spurious restarts.
+	supervisor.Run(ctx, "watcher", func(ctx context.Context) error {
+		w := s.curatorConnection.Watch()
+		defer w.Close()
+		cc, err := w.Get(ctx)
+		if err != nil {
+			return err
+		}
+
+		nodeID := cc.nodeID()
+		cur := ipb.NewCuratorClient(cc.conn)
+
+		// Start watch for current node, update localRoles whenever we get something
+		// new.
+		srv, err := cur.Watch(ctx, &ipb.WatchRequest{Kind: &ipb.WatchRequest_NodeInCluster_{
+			NodeInCluster: &ipb.WatchRequest_NodeInCluster{
+				NodeId: nodeID,
+			},
+		}})
+		if err != nil {
+			return fmt.Errorf("watch failed: %w", err)
+		}
+		defer srv.CloseSend()
+
+		supervisor.Signal(ctx, supervisor.SignalHealthy)
+		for {
+			ev, err := srv.Recv()
+			if err != nil {
+				return fmt.Errorf("watch event receive failed: %w", err)
+			}
+			for _, n := range ev.Nodes {
+				n := n
+				// Skip spuriously sent other nodes.
+				if n.Id != nodeID {
+					continue
+				}
+				supervisor.Logger(ctx).Infof("Got new node data. Roles:")
+				if n.Roles.ConsensusMember != nil {
+					supervisor.Logger(ctx).Infof(" - control plane member, existing peers: %+v", n.Roles.ConsensusMember.Peers)
+				}
+				if n.Roles.KubernetesController != nil {
+					supervisor.Logger(ctx).Infof(" - kubernetes controller")
+				}
+				if n.Roles.KubernetesWorker != nil {
+					supervisor.Logger(ctx).Infof(" - kubernetes worker")
+				}
+				s.localRoles.Set(n.Roles)
+
+				// Persist role data to disk.
+				bytes, err := proto.Marshal(n.Roles)
+				if err != nil {
+					supervisor.Logger(ctx).Errorf("Failed to marshal node roles: %v", err)
+				} else {
+					err = s.storageRoot.Data.Node.PersistedRoles.Write(bytes, 0400)
+					if err != nil {
+						supervisor.Logger(ctx).Errorf("Failed to write node roles: %v", err)
+					}
+				}
+				break
+			}
+		}
+	})
+
+	supervisor.Signal(ctx, supervisor.SignalHealthy)
+	<-ctx.Done()
+	return ctx.Err()
 }