m/n/roleserve: reactive service management

Bottom line up first: this starts etcd, the curator and Kubernetes on
nodes that register into the cluster. Effectively, this is multi-node
support.

This significantly refactors the node roleserver to start both the
control plane and Kubernetes on demand, based on roles assigned by the
cluster (or on bootstrapping a new cluster). Most importantly, we
remove pretty much all cluster-bootstrapping code from the node startup
process, so that the first node and any subsequent nodes no longer go
through different codepaths.
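
For illustration, each role-driven launcher roughly follows the sketch
below. This lives inside the roleserve package; the role field and the
typed Watch/Get accessors are simplified assumptions, not the exact
worker code:

    // Sketch of a role-driven launcher: wait for the roles assigned by
    // the cluster, start the matching workload as a sub-runnable, and
    // exit (to be restarted by the supervisor) on the next roles update.
    func (s *Service) runExampleLauncher(ctx context.Context) error {
        w := s.localRoles.Watch()
        roles, err := w.Get(ctx)
        if err != nil {
            return err
        }
        if roles.KubernetesWorker != nil {
            // Role assigned: start the workload on demand.
            err := supervisor.Run(ctx, "kubernetes", s.kubernetes.run)
            if err != nil {
                return err
            }
        }
        supervisor.Signal(ctx, supervisor.SignalHealthy)
        // Block until the next roles update, then exit with an error so
        // that the supervisor restarts this launcher and the new roles
        // get re-evaluated.
        if _, err := w.Get(ctx); err != nil {
            return err
        }
        return errors.New("roles updated, restarting launcher")
    }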

In addition, access to the cluster Curators is now also mediated via
the roleserver, which is the component that knows whether node code
should connect to the local curator (if the control plane is running)
or to remote curators (if the control plane is not [yet] running).
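
For example, a consumer could obtain a curator connection roughly like
this (a sketch only, assuming typed Watch/Get accessors on
ClusterMembershipValue; ConnectCurator is an illustrative name for
whatever accessor ClusterMembership ends up exposing):

    // useCurator waits for the roleserver to resolve how to reach a
    // curator, then opens a connection to it. Whether that curator is
    // local or remote is hidden from the caller.
    func useCurator(ctx context.Context, v *ClusterMembershipValue) error {
        cm, err := v.Watch().Get(ctx)
        if err != nil {
            return err
        }
        conn, err := cm.ConnectCurator(ctx) // illustrative method name
        if err != nil {
            return err
        }
        defer conn.Close()
        // ... use the curator gRPC client over conn ...
        return nil
    }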

This implementation is a bit verbose, as we make heavy use of untyped
Event Values and add quite a few repeated lines of code to combine data
from different values into something that a goroutine can wait on. Once
Go 1.18 (with generics) lands, we should be able to make this code much
nicer.
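
The repeated pattern looks roughly like the sketch below (illustrative
only, assuming typed Watch/Get accessors on the Event Value wrappers):

    // update carries the latest data from either watched value.
    type update struct {
        membership *ClusterMembership
        roles      *cpb.NodeRoles
    }

    // combinedUpdates funnels updates from two Event Values into a
    // single channel that a worker goroutine can select on, using one
    // goroutine per watched value. Generics should remove most of this
    // repetition.
    func (s *Service) combinedUpdates(ctx context.Context) <-chan update {
        c := make(chan update)
        go func() {
            w := s.ClusterMembership.Watch()
            for {
                v, err := w.Get(ctx)
                if err != nil {
                    return
                }
                c <- update{membership: v}
            }
        }()
        go func() {
            w := s.localRoles.Watch()
            for {
                v, err := w.Get(ctx)
                if err != nil {
                    return
                }
                c <- update{roles: v}
            }
        }()
        return c
    }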

There are still a few things that need to be implemented for all flows
to work fully (notably, we can end up with stale curator clients,
curator clients are not load balanced across multiple curators, and
cluster directories for connecting to the curator do not get updated
after startup). However, these are all features that we should be able
to implement easily once this lands.

Currently this is only covered by the e2e test. The individual workers
within the roleserver can be tested independently, and that is
something I plan to do very soon as another change on top, while this
one is being reviewed.

With time, the two large startup components (the cluster "enrolment"
manager and the roleserver) have drifted somewhat from their original
purpose, and their names aren't exactly fitting anymore. I might rename
them in an upcoming change; if anyone has good naming ideas, I'm all
ears :).

Change-Id: Iaf0fc9f6fdd2122e6aae19607be1648382063e66
Reviewed-on: https://review.monogon.dev/c/monogon/+/532
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/node/core/roleserve/roleserve.go b/metropolis/node/core/roleserve/roleserve.go
index 557ed68..2c2e885 100644
--- a/metropolis/node/core/roleserve/roleserve.go
+++ b/metropolis/node/core/roleserve/roleserve.go
@@ -1,45 +1,59 @@
 // package roleserve implements the roleserver/“Role Server”.
 //
-// The Role Server is responsible for watching the cluster/node state and
-// starting/stopping any 'application' code (also called workload code) based on
-// the Node's roles.
+// The Role Server runs on every node and is responsible for running all of the
+// node's role-dependent services, like the control plane (Consensus/etcd and
+// Curator) and Kubernetes. It watches the node roles as assigned by the
+// cluster's curator, updates the status of the node within the curator, and
+// spawns on-demand services.
 //
-// Each workload code (which would usually be a supervisor runnable) is started
-// by a dedicated 'launcher'. These launchers wait for node roles to be
-// available from the curator, and either start the related workload sub-runners
-// or do nothing ; then they declare themselves as healthy to the supervisor. If
-// at any point the role of the node changes (meaning that the node should now
-// start or stop the workloads) the launcher just exits and the supervisor
-// will restart it.
 //
-// Currently, this is used to start up the Kubernetes worker code.
+//  .-----------.          .--------.  Watches  .------------.
+//  | Cluster   |--------->| Role   |<----------| Node Roles |
+//  | Enrolment | Provides | Server |  Updates  '------------'
+//  '-----------'   Data   |        |----.      .-------------.
+//                         '--------'    '----->| Node Status |
+//                    Spawns |    | Spawns      '-------------'
+//                     .-----'    '-----.
+//                     V                V
+//                 .-----------. .------------.
+//                 | Consensus | | Kubernetes |
+//                 | & Curator | |            |
+//                 '-----------' '------------'
+//
+// The internal state of the Role Server (e.g. status of services, input from
+// Cluster Enrolment, current node roles as retrieved from the cluster) is
+// stored as in-memory Event Value variables, with some of them being exposed
+// externally for other services to consume (i.e. ones that wish to depend on
+// some information managed by the Role Server but which do not need to be
+// spawned on demand by the Role Server). These Event Values and code which acts
+// upon them form a reactive/dataflow-driven model which drives the Role Server
+// logic forward.
+//
+// The Role Server also has to handle the complex bootstrap problem involved in
+// simultaneously accessing the control plane (for node roles and other cluster
+// data) while maintaining a control plane instance (possibly the only one in
+// the cluster). The state of resolution of this bootstrap problem is maintained
+// within ClusterMembership, which contains critical information about the
+// control plane, like the information required to connect to a Curator (local
+// or remote). It is updated both by external processes (i.e. data from the
+// Cluster Enrolment) and by the logic responsible for spawning the control
+// plane.
+//
 package roleserve
 
 import (
 	"context"
-	"fmt"
+	"crypto/ed25519"
 
-	"google.golang.org/grpc"
-
-	cpb "source.monogon.dev/metropolis/node/core/curator/proto/api"
 	"source.monogon.dev/metropolis/node/core/identity"
 	"source.monogon.dev/metropolis/node/core/localstorage"
 	"source.monogon.dev/metropolis/node/core/network"
-	"source.monogon.dev/metropolis/node/kubernetes"
-	"source.monogon.dev/metropolis/node/kubernetes/pki"
-	"source.monogon.dev/metropolis/pkg/event"
-	"source.monogon.dev/metropolis/pkg/event/memory"
 	"source.monogon.dev/metropolis/pkg/supervisor"
+	cpb "source.monogon.dev/metropolis/proto/common"
 )
 
 // Config is the configuration of the role server.
 type Config struct {
-	// CuratorDial is a function that the roleserver will use to dial the curator.
-	// As both the curator listener and roleserver might restart, this dial function
-	// is needed to possibly re-establish connectivity after a full restart of
-	// either.
-	CuratorDial func() (*grpc.ClientConn, error)
-
 	// StorageRoot is a handle to access all of the Node's storage. This is needed
 	// as the roleserver spawns complex workloads like Kubernetes which need access
 	// to a broad range of storage.
@@ -47,13 +61,6 @@
 
 	// Network is a handle to the network service, used by workloads.
 	Network *network.Service
-
-	// KPKI is a handle to initialized Kubernetes PKI stored on etcd. In the future
-	// this will probably be provisioned by the Kubernetes workload itself.
-	KPKI *pki.PKI
-
-	// Node is the node identity on which the roleserver is running.
-	Node *identity.Node
 }
 
 // Service is the roleserver/“Role Server” service. See the package-level
@@ -61,125 +68,84 @@
 type Service struct {
 	Config
 
-	value memory.Value
+	ClusterMembership ClusterMembershipValue
+	KubernetesStatus  KubernetesStatusValue
+	bootstrapData     bootstrapDataValue
+	localRoles        localRolesValue
 
-	// kwC is a channel populated with updates to the local Node object from the
-	// curator, passed over to the Kubernetes Worker launcher.
-	kwC chan *cpb.Node
-	// kwSvcC is a channel populated by the Kubernetes Worker launcher when the
-	// service is started (which then contains the value of spawned Kubernetes
-	// workload service) or stopped (which is then nil).
-	kwSvcC chan *kubernetes.Service
-
-	// gRPC channel to curator, acquired via Config.CuratorDial, active for the
-	// lifecycle of the Service runnable. It's used by the updater
-	// sub-runnable.
-	curator cpb.CuratorClient
-}
-
-// Status is updated by the role service any time one of the subordinate
-// workload services is started or stopped.
-type Status struct {
-	// Kubernetes is set to the Kubernetes workload Service if started/restarted or
-	// nil if stopped.
-	Kubernetes *kubernetes.Service
+	controlPlane *workerControlPlane
+	statusPush   *workerStatusPush
+	kubernetes   *workerKubernetes
+	rolefetch    *workerRoleFetch
 }
 
 // New creates a Role Server services from a Config.
 func New(c Config) *Service {
-	return &Service{
+	s := &Service{
 		Config: c,
-		kwC:    make(chan *cpb.Node),
-		kwSvcC: make(chan *kubernetes.Service),
 	}
+
+	s.controlPlane = &workerControlPlane{
+		storageRoot: s.StorageRoot,
+
+		bootstrapData:     &s.bootstrapData,
+		clusterMembership: &s.ClusterMembership,
+		localRoles:        &s.localRoles,
+	}
+
+	s.statusPush = &workerStatusPush{
+		network: s.Network,
+
+		clusterMembership: &s.ClusterMembership,
+	}
+
+	s.kubernetes = &workerKubernetes{
+		network:     s.Network,
+		storageRoot: s.StorageRoot,
+
+		localRoles:        &s.localRoles,
+		clusterMembership: &s.ClusterMembership,
+
+		kubernetesStatus: &s.KubernetesStatus,
+	}
+
+	s.rolefetch = &workerRoleFetch{
+		clusterMembership: &s.ClusterMembership,
+
+		localRoles: &s.localRoles,
+	}
+
+	return s
 }
 
-type Watcher struct {
-	event.Watcher
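+// ProvideBootstrapData is called by the cluster enrolment logic when this node
+// bootstraps a new cluster, seeding the roleserver with the node's private
+// key, the initial owner key and the cluster unlock key.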
+func (s *Service) ProvideBootstrapData(privkey ed25519.PrivateKey, iok, cuk []byte) {
+	s.ClusterMembership.set(&ClusterMembership{
+		pubkey: privkey.Public().(ed25519.PublicKey),
+	})
+	s.bootstrapData.set(&bootstrapData{
+		nodePrivateKey:   privkey,
+		initialOwnerKey:  iok,
+		clusterUnlockKey: cuk,
+	})
 }
 
-func (s *Service) Watch() Watcher {
-	return Watcher{
-		Watcher: s.value.Watch(),
-	}
-}
-
-func (w *Watcher) Get(ctx context.Context) (*Status, error) {
-	v, err := w.Watcher.Get(ctx)
-	if err != nil {
-		return nil, err
-	}
-	st := v.(Status)
-	return &st, nil
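+// ProvideRegisterData is called by the cluster enrolment logic when this node
+// has registered into an existing cluster, providing the node's credentials
+// and a directory of remote curators to connect to.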
+func (s *Service) ProvideRegisterData(credentials identity.NodeCredentials, directory *cpb.ClusterDirectory) {
+	s.ClusterMembership.set(&ClusterMembership{
+		remoteCurators: directory,
+		credentials:    &credentials,
+		pubkey:         credentials.PublicKey(),
+	})
 }
 
 // Run the Role Server service, which uses intermediary workload launchers to
 // start/stop subordinate services as the Node's roles change.
 func (s *Service) Run(ctx context.Context) error {
-	supervisor.Logger(ctx).Info("Dialing curator...")
-	conn, err := s.CuratorDial()
-	if err != nil {
-		return fmt.Errorf("could not dial cluster curator: %w", err)
-	}
-	defer conn.Close()
-	s.curator = cpb.NewCuratorClient(conn)
-
-	if err := supervisor.Run(ctx, "updater", s.runUpdater); err != nil {
-		return fmt.Errorf("failed to launch updater: %w", err)
-	}
-
-	if err := supervisor.Run(ctx, "cluster-agent", s.runClusterAgent); err != nil {
-		return fmt.Errorf("failed to launch cluster agent: %w", err)
-	}
-
-	if err := supervisor.Run(ctx, "kubernetes-worker", s.runKubernetesWorkerLauncher); err != nil {
-		return fmt.Errorf("failed to start kubernetes-worker launcher: %w", err)
-	}
-
+	supervisor.Run(ctx, "controlplane", s.controlPlane.run)
+	supervisor.Run(ctx, "kubernetes", s.kubernetes.run)
+	supervisor.Run(ctx, "statuspush", s.statusPush.run)
+	supervisor.Run(ctx, "rolefetch", s.rolefetch.run)
 	supervisor.Signal(ctx, supervisor.SignalHealthy)
 
-	status := Status{}
-	for {
-		select {
-		case <-ctx.Done():
-			return ctx.Err()
-		case svc := <-s.kwSvcC:
-			status.Kubernetes = svc
-			s.value.Set(status)
-		}
-	}
-}
-
-// runUpdater runs the updater, a runnable which watchers the cluster via
-// curator for any pertinent node changes and distributes them to respective
-// workload launchers.
-//
-// TODO(q3k): this should probably be implemented somewhere as a curator client
-// library, maybe one that implements the Event Value interface.
-func (s *Service) runUpdater(ctx context.Context) error {
-	srv, err := s.curator.Watch(ctx, &cpb.WatchRequest{Kind: &cpb.WatchRequest_NodeInCluster_{
-		NodeInCluster: &cpb.WatchRequest_NodeInCluster{
-			NodeId: s.Node.ID(),
-		},
-	}})
-	if err != nil {
-		return fmt.Errorf("watch failed: %w", err)
-	}
-	defer srv.CloseSend()
-
-	supervisor.Signal(ctx, supervisor.SignalHealthy)
-	for {
-		ev, err := srv.Recv()
-		if err != nil {
-			return fmt.Errorf("watch event receive failed: %w", err)
-		}
-		supervisor.Logger(ctx).Infof("Received node event: %+v", ev)
-		for _, node := range ev.Nodes {
-			if node.Id != s.Node.ID() {
-				continue
-			}
-			s.kwC <- node
-		}
-	}
-
+	<-ctx.Done()
+	return ctx.Err()
 }