m/n/roleserve: reactive service management

Bottom line up first: this starts etcd, the curator and Kubernetes on
nodes that register into the cluster. Effectively, this is multi-node
support.

This significantly refactors the node roleserver to start both the
control plane and Kubernetes on demand, based on roles assigned by the
cluster (or due to bootstrapping a new cluster). Most importantly, we
remove pretty much all cluster-bootstrapping code from the node startup
process, so the first node and any subsequent nodes no longer go
through different codepaths.
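
To give a feel for the shape of the resulting workers: each one is a
supervisor runnable that blocks on an Event Value describing the roles
assigned to the node, and only launches its payload once the relevant
role shows up. The sketch below is illustrative only; nodeRoles,
localRoles and ConsensusMember are made-up names, not the actual
roleserve API.

  // Hedged sketch of a role-driven worker. nodeRoles, localRoles and
  // ConsensusMember are illustrative stand-ins, not the actual roleserve
  // internals.
  package roleserve

  import (
      "context"
      "fmt"

      "source.monogon.dev/metropolis/pkg/event/memory"
      "source.monogon.dev/metropolis/pkg/supervisor"
  )

  // nodeRoles is a hypothetical snapshot of the roles assigned to this
  // node by the cluster (or by bootstrapping a new one).
  type nodeRoles struct {
      ConsensusMember bool
  }

  // Service stands in for the real roleserve.Service.
  type Service struct {
      localRoles memory.Value
      consensus  interface{ Run(context.Context) error }
  }

  // runControlPlaneWorker blocks until the cluster assigns this node the
  // control plane role, then launches consensus under the supervisor.
  func (s *Service) runControlPlaneWorker(ctx context.Context) error {
      w := s.localRoles.Watch()
      for {
          v, err := w.Get(ctx)
          if err != nil {
              return fmt.Errorf("when watching local roles: %w", err)
          }
          if !v.(nodeRoles).ConsensusMember {
              // Role not (yet) assigned, keep waiting.
              continue
          }
          supervisor.Logger(ctx).Infof("Control plane role assigned, starting consensus...")
          if err := supervisor.Run(ctx, "consensus", s.consensus.Run); err != nil {
              return fmt.Errorf("when starting consensus: %w", err)
          }
          supervisor.Signal(ctx, supervisor.SignalHealthy)
          <-ctx.Done()
          return ctx.Err()
      }
  }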

In addition, access to the cluster Curators is now also mediated via
the roleserver, which is the component that knows whether node code
should connect to the local curator (if the control plane is running) or
to remote curators (if the control plane is not [yet] running).
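
Concretely, node code no longer dials the curator itself; it asks the
roleserver for a client. A rough sketch of what such an accessor could
look like (CuratorConnection and curatorConn are hypothetical names,
not the actual API):

  // Hedged sketch: obtaining a Curator client through the roleserver.
  // CuratorConnection and curatorConn are illustrative names; the real
  // roleserver decides whether Conn points at the local or a remote
  // curator.
  package roleserve

  import (
      "context"

      "google.golang.org/grpc"

      ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
      "source.monogon.dev/metropolis/pkg/event/memory"
  )

  // CuratorConnection is a hypothetical value published by the roleserver
  // whenever the best available curator endpoint changes.
  type CuratorConnection struct {
      Conn *grpc.ClientConn
  }

  // Service stands in for the real roleserve.Service.
  type Service struct {
      curatorConn memory.Value
  }

  // Curator blocks until a curator connection is available and returns a
  // client for it. Callers must be prepared for the connection to go
  // stale (see the caveats further down).
  func (s *Service) Curator(ctx context.Context) (ipb.CuratorClient, error) {
      v, err := s.curatorConn.Watch().Get(ctx)
      if err != nil {
          return nil, err
      }
      return ipb.NewCuratorClient(v.(*CuratorConnection).Conn), nil
  }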

This implementation is a bit verbose, as we make heavy use of untyped
Event Values and add quite a bit of repeated code to combine data from
different values into something that a goroutine can wait on. Once Go
1.18 (and its generics) lands, we should be able to make this code much
nicer.
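
For reference, the pattern in question is the one from the (now
deleted) cluster/watcher.go, reproduced below: each payload type needs
its own wrapper around event.Watcher plus a type assertion. The
trailing comment sketches what a hypothetical generic event.Value[T]
could look like once Go 1.18 is available.

  // The boilerplate untyped Event Values currently require, reproduced
  // from the (now deleted) cluster/watcher.go: a wrapper that re-types
  // event.Watcher.Get for one specific payload (Status).
  package cluster

  import (
      "context"

      "source.monogon.dev/metropolis/pkg/event"
  )

  // Status is the payload carried by the value (fields elided here; see
  // the removed status.go further down in this change).
  type Status struct{}

  type Watcher struct {
      event.Watcher
  }

  func (w *Watcher) Get(ctx context.Context) (*Status, error) {
      val, err := w.Watcher.Get(ctx)
      if err != nil {
          return nil, err
      }
      status := val.(Status)
      return &status, nil
  }

  // With Go 1.18 type parameters, a hypothetical typed event package
  // could drop this per-type wrapper entirely, e.g.:
  //
  //   var status event.Value[Status]
  //   s, err := status.Watch().Get(ctx)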

There are still a few things that need to be implemented for all flows to
be working fully (notably, we can end up with stale curator clients,
curator clients are not load balanced across multiple curators, and
cluster directories for connecting to the curator do not get updated
after startup). However, these are all features that we should be able
to easily implement once this lands.

Currently this is only covered by the e2e test. The individual workers
within the roleserver can be tested independently, and I plan to do
that very soon in a follow-up change on top while this one is being
reviewed.

With time, the two large startup components (the cluster "enrolment"
manager and the roleserver) have drifted somewhat from their original
purpose, and their names aren't exactly fitting anymore. I might rename
them in an upcoming change; if anyone has good naming ideas, I'm all
ears :).

Change-Id: Iaf0fc9f6fdd2122e6aae19607be1648382063e66
Reviewed-on: https://review.monogon.dev/c/monogon/+/532
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/node/core/cluster/BUILD.bazel b/metropolis/node/core/cluster/BUILD.bazel
index 45c1164..2002571 100644
--- a/metropolis/node/core/cluster/BUILD.bazel
+++ b/metropolis/node/core/cluster/BUILD.bazel
@@ -7,27 +7,21 @@
         "cluster_bootstrap.go",
         "cluster_register.go",
         "platform.go",
-        "status.go",
-        "watcher.go",
     ],
     importpath = "source.monogon.dev/metropolis/node/core/cluster",
     visibility = ["//metropolis/node/core:__subpackages__"],
     deps = [
         "//metropolis/node:go_default_library",
         "//metropolis/node/core/consensus:go_default_library",
-        "//metropolis/node/core/consensus/client:go_default_library",
-        "//metropolis/node/core/curator:go_default_library",
         "//metropolis/node/core/curator/proto/api:go_default_library",
         "//metropolis/node/core/identity:go_default_library",
         "//metropolis/node/core/localstorage:go_default_library",
         "//metropolis/node/core/network:go_default_library",
-        "//metropolis/node/core/network/hostsfile:go_default_library",
+        "//metropolis/node/core/roleserve:go_default_library",
         "//metropolis/node/core/rpc:go_default_library",
-        "//metropolis/pkg/event:go_default_library",
         "//metropolis/pkg/event/memory:go_default_library",
         "//metropolis/pkg/supervisor:go_default_library",
         "//metropolis/proto/api:go_default_library",
-        "//metropolis/proto/common:go_default_library",
         "//metropolis/proto/private:go_default_library",
         "@com_github_cenkalti_backoff_v4//:go_default_library",
         "@org_golang_google_protobuf//proto:go_default_library",
diff --git a/metropolis/node/core/cluster/cluster.go b/metropolis/node/core/cluster/cluster.go
index ea3bfee..d51d7d9 100644
--- a/metropolis/node/core/cluster/cluster.go
+++ b/metropolis/node/core/cluster/cluster.go
@@ -41,6 +41,7 @@
 	"source.monogon.dev/metropolis/node/core/consensus"
 	"source.monogon.dev/metropolis/node/core/localstorage"
 	"source.monogon.dev/metropolis/node/core/network"
+	"source.monogon.dev/metropolis/node/core/roleserve"
 	"source.monogon.dev/metropolis/pkg/event/memory"
 	"source.monogon.dev/metropolis/pkg/supervisor"
 	apb "source.monogon.dev/metropolis/proto/api"
@@ -58,6 +59,7 @@
 type Manager struct {
 	storageRoot    *localstorage.Root
 	networkService *network.Service
+	roleServer     *roleserve.Service
 	status         memory.Value
 
 	state
@@ -70,10 +72,11 @@
 // NewManager creates a new cluster Manager. The given localstorage Root must
 // be placed, but not yet started (and will be started as the Manager makes
 // progress). The given network Service must already be running.
-func NewManager(storageRoot *localstorage.Root, networkService *network.Service) *Manager {
+func NewManager(storageRoot *localstorage.Root, networkService *network.Service, rs *roleserve.Service) *Manager {
 	return &Manager{
 		storageRoot:    storageRoot,
 		networkService: networkService,
+		roleServer:     rs,
 
 		state: state{},
 	}
diff --git a/metropolis/node/core/cluster/cluster_bootstrap.go b/metropolis/node/core/cluster/cluster_bootstrap.go
index c5167a5..d40f179 100644
--- a/metropolis/node/core/cluster/cluster_bootstrap.go
+++ b/metropolis/node/core/cluster/cluster_bootstrap.go
@@ -20,28 +20,21 @@
 	"context"
 	"crypto/ed25519"
 	"crypto/rand"
-	"crypto/subtle"
 	"encoding/hex"
 	"fmt"
 
-	"source.monogon.dev/metropolis/node/core/consensus"
-	"source.monogon.dev/metropolis/node/core/consensus/client"
-	"source.monogon.dev/metropolis/node/core/curator"
-	"source.monogon.dev/metropolis/node/core/identity"
-	"source.monogon.dev/metropolis/node/core/network/hostsfile"
 	"source.monogon.dev/metropolis/pkg/supervisor"
 	apb "source.monogon.dev/metropolis/proto/api"
-	cpb "source.monogon.dev/metropolis/proto/common"
 	ppb "source.monogon.dev/metropolis/proto/private"
 )
 
 func (m *Manager) bootstrap(ctx context.Context, bootstrap *apb.NodeParameters_ClusterBootstrap) error {
 	supervisor.Logger(ctx).Infof("Bootstrapping new cluster, owner public key: %s", hex.EncodeToString(bootstrap.OwnerPublicKey))
+
 	state, unlock := m.lock()
 	defer unlock()
 
 	ownerKey := bootstrap.OwnerPublicKey
-
 	state.configuration = &ppb.SealedConfiguration{}
 
 	// Mount new storage with generated CUK, and save LUK into sealed config proto.
@@ -57,78 +50,7 @@
 	}
 	supervisor.Logger(ctx).Infof("Bootstrapping: node public key: %s", hex.EncodeToString([]byte(pub)))
 
-	node := curator.NewNodeForBootstrap(cuk, pub)
-
-	// Run worker to keep updating /ephemeral/hosts (and thus, /etc/hosts) with
-	// our own IP address. This ensures that the node's ID always resolves to
-	// its current external IP address.
-	// TODO(q3k): move this out into roleserver.
-	hostsfileSvc := hostsfile.Service{
-		Config: hostsfile.Config{
-			NodePublicKey: pub,
-			Network:       m.networkService,
-			Ephemeral:     &m.storageRoot.Ephemeral,
-		},
-	}
-	if err := supervisor.Run(ctx, "hostsfile", hostsfileSvc.Run); err != nil {
-		return err
-	}
-
-	// Bring up consensus with this node as the only member.
-	m.consensus = consensus.New(consensus.Config{
-		Data:           &m.storageRoot.Data.Etcd,
-		Ephemeral:      &m.storageRoot.Ephemeral.Consensus,
-		NodePrivateKey: priv,
-	})
-
-	supervisor.Logger(ctx).Infof("Bootstrapping: starting consensus...")
-	if err := supervisor.Run(ctx, "consensus", m.consensus.Run); err != nil {
-		return fmt.Errorf("when starting consensus: %w", err)
-	}
-
-	var ckv client.Namespaced
-	cw := m.consensus.Watch()
-	for {
-		st, err := cw.Get(ctx)
-		if err != nil {
-			return fmt.Errorf("when waiting for consensus status: %w", err)
-		}
-		if !st.Running() {
-			continue
-		}
-		ckv, err = st.CuratorClient()
-		if err != nil {
-			return fmt.Errorf("when retrieving curator client")
-		}
-		break
-	}
-
-	node.EnableKubernetesWorker()
-	caCertBytes, nodeCertBytes, err := curator.BootstrapNodeFinish(ctx, ckv, &node, ownerKey)
-	if err != nil {
-		return fmt.Errorf("failed to finish bootstrap: %w", err)
-	}
-
-	creds, err := identity.NewNodeCredentials(priv, nodeCertBytes, caCertBytes)
-	if err != nil {
-		return fmt.Errorf("failed to use newly bootstrapped node credentials: %w", err)
-	}
-
-	// Overly cautious check: ensure that the credentials are for the public key
-	// we've generated.
-	if want, got := pub, []byte(creds.PublicKey()); subtle.ConstantTimeCompare(want, got) != 1 {
-		return fmt.Errorf("newly bootstrapped node credentials emitted for wrong public key")
-	}
-
-	if err := creds.Save(&m.storageRoot.Data.Node.Credentials); err != nil {
-		return fmt.Errorf("failed to write node credentials: %w", err)
-	}
-
-	m.status.Set(Status{
-		State:       cpb.ClusterState_CLUSTER_STATE_HOME,
-		Consensus:   m.consensus,
-		Credentials: creds,
-	})
+	m.roleServer.ProvideBootstrapData(priv, ownerKey, cuk)
 
 	supervisor.Signal(ctx, supervisor.SignalHealthy)
 	supervisor.Signal(ctx, supervisor.SignalDone)
diff --git a/metropolis/node/core/cluster/cluster_register.go b/metropolis/node/core/cluster/cluster_register.go
index 1a413c6..76458b8 100644
--- a/metropolis/node/core/cluster/cluster_register.go
+++ b/metropolis/node/core/cluster/cluster_register.go
@@ -18,7 +18,6 @@
 	"source.monogon.dev/metropolis/node/core/rpc"
 	"source.monogon.dev/metropolis/pkg/supervisor"
 	apb "source.monogon.dev/metropolis/proto/api"
-	cpb "source.monogon.dev/metropolis/proto/common"
 	ppb "source.monogon.dev/metropolis/proto/private"
 )
 
@@ -30,7 +29,7 @@
 // test multiple nodes in a cluster. It does notably not run either
 // etcd/consensus or the curator, which also prevents Kubernetes from running.
 func (m *Manager) register(ctx context.Context, register *apb.NodeParameters_ClusterRegister) error {
 	// Do a validation pass on the provided NodeParameters.Register data, fail early
 	// if it looks invalid.
 	ca, err := x509.ParseCertificate(register.CaCertificate)
 	if err != nil {
@@ -51,7 +50,7 @@
 		}
 		for j, add := range node.Addresses {
 			if add.Host == "" || net.ParseIP(add.Host) == nil {
-				return fmt.Errorf("NodeParameters.ClusterDirectory.Nodes[%d].Addresses[%d] invalid: not a parseable address", i, j)
+				return fmt.Errorf("NodeParameters.ClusterDirectory.Nodes[%d].Addresses[%d] (%q) invalid: not a parseable address", i, j, add.Host)
 			}
 		}
 		if len(node.PublicKey) != ed25519.PublicKeySize {
@@ -103,6 +102,14 @@
 		return fmt.Errorf("could not create ephemeral client to %q: %w", remote, err)
 	}
 	cur := ipb.NewCuratorClient(eph)
+
+	// Register this node.
+	//
+	// MVP: From this point onward we have very little resiliency to failure;
+	// we barely scrape by with retry loops. We should break down all the
+	// logic into some sort of state machine where we can atomically make progress
+	// on each of the stages and get rid of the retry loops. The cluster enrolment
+	// code should let us do this quite easily.
 	_, err = cur.RegisterNode(ctx, &ipb.RegisterNodeRequest{
 		RegisterTicket: register.RegisterTicket,
 	})
@@ -132,11 +139,10 @@
 	if err != nil {
 		return fmt.Errorf("NewNodeCredentials failed after receiving certificate from cluster: %w", err)
 	}
-	status := Status{
-		State:       cpb.ClusterState_CLUSTER_STATE_HOME,
-		Credentials: creds,
-	}
-	m.status.Set(status)
+
+	m.roleServer.ProvideRegisterData(*creds, register.ClusterDirectory)
+	// TODO(q3k): save keypair to localstorage
+
 	supervisor.Signal(ctx, supervisor.SignalHealthy)
 	supervisor.Signal(ctx, supervisor.SignalDone)
 	return nil
diff --git a/metropolis/node/core/cluster/status.go b/metropolis/node/core/cluster/status.go
deleted file mode 100644
index da898bf..0000000
--- a/metropolis/node/core/cluster/status.go
+++ /dev/null
@@ -1,27 +0,0 @@
-package cluster
-
-import (
-	"errors"
-
-	"source.monogon.dev/metropolis/node/core/consensus"
-	"source.monogon.dev/metropolis/node/core/identity"
-	cpb "source.monogon.dev/metropolis/proto/common"
-)
-
-var (
-	ErrNoLocalConsensus = errors.New("this node does not have direct access to etcd")
-)
-
-// Status is returned to Cluster clients (ie., node code) on Manager.Watch/.Get.
-type Status struct {
-	// State is the current state of the cluster, as seen by the node.
-	State cpb.ClusterState
-
-	// Consensus is a handle to a running Consensus service, or nil if this node
-	// does not run a Consensus instance.
-	Consensus consensus.ServiceHandle
-
-	// Credentials used for the node to authenticate to the Curator and other
-	// cluster services.
-	Credentials *identity.NodeCredentials
-}
diff --git a/metropolis/node/core/cluster/watcher.go b/metropolis/node/core/cluster/watcher.go
deleted file mode 100644
index 1251f2d..0000000
--- a/metropolis/node/core/cluster/watcher.go
+++ /dev/null
@@ -1,46 +0,0 @@
-package cluster
-
-import (
-	"context"
-	"fmt"
-
-	"source.monogon.dev/metropolis/pkg/event"
-	cpb "source.monogon.dev/metropolis/proto/common"
-)
-
-type Watcher struct {
-	event.Watcher
-}
-
-func (w *Watcher) Get(ctx context.Context) (*Status, error) {
-	val, err := w.Watcher.Get(ctx)
-	if err != nil {
-		return nil, err
-	}
-	status := val.(Status)
-	return &status, err
-}
-
-// GetHome waits until the cluster, from the point of view of this node, is in
-// the ClusterHome state. This can be used to wait for the cluster manager to
-// 'settle', before clients start more node services.
-func (w *Watcher) GetHome(ctx context.Context) (*Status, error) {
-	for {
-		status, err := w.Get(ctx)
-		if err != nil {
-			return nil, err
-		}
-		switch status.State {
-		case cpb.ClusterState_CLUSTER_STATE_HOME:
-			return status, nil
-		case cpb.ClusterState_CLUSTER_STATE_DISOWNING:
-			return nil, fmt.Errorf("the cluster has disowned this node")
-		}
-	}
-}
-
-func (m *Manager) Watch() Watcher {
-	return Watcher{
-		Watcher: m.status.Watch(),
-	}
-}