m/n/core/{curator,cluster}: refactor against new Consensus API

This updates the Curator and the Cluster Manager to use the new
Consensus API, notably to use JoinParameters and ServiceHandle.Watch.

Using JoinParameters end-to-end requires piping them through a node's
roles. For this we create a new ConsensusMember role and replicate all
the data from JoinParameters there.

We also move a whole bunch of logic out of the Cluster Manager's Status
object. The Consensus ServiceHandle is now exposed directly to
downstream users, providing the same functionality.

Change-Id: I8cfa247011554553836019f60ea172dd6069f49c
Reviewed-on: https://review.monogon.dev/c/monogon/+/522
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/node/core/cluster/cluster.go b/metropolis/node/core/cluster/cluster.go
index fe29abd..ea3bfee 100644
--- a/metropolis/node/core/cluster/cluster.go
+++ b/metropolis/node/core/cluster/cluster.go
@@ -120,12 +120,17 @@
 
 	switch inner := params.Cluster.(type) {
 	case *apb.NodeParameters_ClusterBootstrap_:
-		return m.bootstrap(ctx, inner.ClusterBootstrap)
+		err = m.bootstrap(ctx, inner.ClusterBootstrap)
 	case *apb.NodeParameters_ClusterRegister_:
-		return m.register(ctx, inner.ClusterRegister)
+		err = m.register(ctx, inner.ClusterRegister)
 	default:
-		return fmt.Errorf("node parameters misconfigured: neither cluster_bootstrap nor cluster_register set")
+		err = fmt.Errorf("node parameters misconfigured: neither cluster_bootstrap nor cluster_register set")
 	}
+
+	if err == nil {
+		supervisor.Logger(ctx).Info("Cluster enrolment done.")
+	}
+	return err
 }
 
 func (m *Manager) nodeParamsFWCFG(ctx context.Context) (*apb.NodeParameters, error) {
diff --git a/metropolis/node/core/cluster/cluster_bootstrap.go b/metropolis/node/core/cluster/cluster_bootstrap.go
index d83913d..c5167a5 100644
--- a/metropolis/node/core/cluster/cluster_bootstrap.go
+++ b/metropolis/node/core/cluster/cluster_bootstrap.go
@@ -86,7 +86,7 @@
 		return fmt.Errorf("when starting consensus: %w", err)
 	}
 
-	var metropolisKV client.Namespaced
+	var ckv client.Namespaced
 	cw := m.consensus.Watch()
 	for {
 		st, err := cw.Get(ctx)
@@ -96,42 +96,20 @@
 		if !st.Running() {
 			continue
 		}
-		metropolisKV, err = st.MetropolisClient()
+		ckv, err = st.CuratorClient()
 		if err != nil {
 			return fmt.Errorf("when retrieving curator client")
 		}
 		break
 	}
 
-	status := Status{
-		State:             cpb.ClusterState_CLUSTER_STATE_HOME,
-		HasLocalConsensus: true,
-		consensusClient:   metropolisKV,
-		// Credentials are set further down once created through a curator
-		// short-circuit bootstrap function.
-		Credentials: nil,
-	}
-
-	// Short circuit curator into storing the new node.
-	ckv, err := status.ConsensusClient(ConsensusUserCurator)
+	node.EnableKubernetesWorker()
+	caCertBytes, nodeCertBytes, err := curator.BootstrapNodeFinish(ctx, ckv, &node, ownerKey)
 	if err != nil {
-		return fmt.Errorf("when retrieving consensus user for curator: %w", err)
-	}
-
-	if err := curator.BootstrapFinish(ctx, ckv, &node, ownerKey); err != nil {
 		return fmt.Errorf("failed to finish bootstrap: %w", err)
 	}
 
-	// And short-circuit creating the curator CA and node certificate.
-	caCert, nodeCert, err := curator.BootstrapNodeCredentials(ctx, ckv, pub)
-	if err != nil {
-		return fmt.Errorf("failed to bootstrap node credentials: %w", err)
-	}
-
-	// Using the short-circuited credentials from the curator, build our
-	// NodeCredentials. That, and the public part of the credentials
-	// (NodeCertificate) are the primary output of the cluster manager.
-	creds, err := identity.NewNodeCredentials(priv, nodeCert, caCert)
+	creds, err := identity.NewNodeCredentials(priv, nodeCertBytes, caCertBytes)
 	if err != nil {
 		return fmt.Errorf("failed to use newly bootstrapped node credentials: %w", err)
 	}
@@ -146,8 +124,11 @@
 		return fmt.Errorf("failed to write node credentials: %w", err)
 	}
 
-	status.Credentials = creds
-	m.status.Set(status)
+	m.status.Set(Status{
+		State:       cpb.ClusterState_CLUSTER_STATE_HOME,
+		Consensus:   m.consensus,
+		Credentials: creds,
+	})
 
 	supervisor.Signal(ctx, supervisor.SignalHealthy)
 	supervisor.Signal(ctx, supervisor.SignalDone)
diff --git a/metropolis/node/core/cluster/cluster_register.go b/metropolis/node/core/cluster/cluster_register.go
index 81db8c9..1a413c6 100644
--- a/metropolis/node/core/cluster/cluster_register.go
+++ b/metropolis/node/core/cluster/cluster_register.go
@@ -133,9 +133,8 @@
 		return fmt.Errorf("NewNodeCredentials failed after receiving certificate from cluster: %w", err)
 	}
 	status := Status{
-		State:             cpb.ClusterState_CLUSTER_STATE_HOME,
-		HasLocalConsensus: false,
-		Credentials:       creds,
+		State:       cpb.ClusterState_CLUSTER_STATE_HOME,
+		Credentials: creds,
 	}
 	m.status.Set(status)
 	supervisor.Signal(ctx, supervisor.SignalHealthy)
diff --git a/metropolis/node/core/cluster/status.go b/metropolis/node/core/cluster/status.go
index 481c818..da898bf 100644
--- a/metropolis/node/core/cluster/status.go
+++ b/metropolis/node/core/cluster/status.go
@@ -2,9 +2,8 @@
 
 import (
 	"errors"
-	"fmt"
 
-	"source.monogon.dev/metropolis/node/core/consensus/client"
+	"source.monogon.dev/metropolis/node/core/consensus"
 	"source.monogon.dev/metropolis/node/core/identity"
 	cpb "source.monogon.dev/metropolis/proto/common"
 )
@@ -18,60 +17,11 @@
 	// State is the current state of the cluster, as seen by the node.
 	State cpb.ClusterState
 
-	// hasLocalConsensus is true if the local node is running a local consensus
-	// (etcd) server.
-	HasLocalConsensus bool
-	// consensusClient is an etcd client to the local consensus server if the node
-	// has such a server and the cluster state is HOME or SPLIT.
-	consensusClient client.Namespaced
+	// Consensus is a handle to a running Consensus service, or nil if this node
+	// does not run a Consensus instance.
+	Consensus consensus.ServiceHandle
 
 	// Credentials used for the node to authenticate to the Curator and other
 	// cluster services.
 	Credentials *identity.NodeCredentials
 }
-
-// ConsensusUser is the to-level user of an etcd client in Metropolis node
-// code. These need to be defined ahead of time in an Go 'enum', and different
-// ConsensusUsers should not be shared by different codepaths.
-type ConsensusUser string
-
-const (
-	ConsensusUserKubernetesPKI ConsensusUser = "kubernetes-pki"
-	ConsensusUserCurator       ConsensusUser = "curator"
-)
-
-// ConsensusClient returns an etcd/consensus client for a given ConsensusUser.
-// The node must be running a local consensus/etcd server.
-func (s *Status) ConsensusClient(user ConsensusUser) (client.Namespaced, error) {
-	if !s.HasLocalConsensus {
-		return nil, ErrNoLocalConsensus
-	}
-
-	// Ensure that we already are connected to etcd and are in a state in which we
-	// should be handing out cluster connectivity.
-	if s.consensusClient == nil {
-		return nil, fmt.Errorf("not connected")
-	}
-	switch s.State {
-	case cpb.ClusterState_CLUSTER_STATE_HOME:
-	case cpb.ClusterState_CLUSTER_STATE_SPLIT:
-		// The consensus client is resistant to being split off, and will serve
-		// as soon as the split is resolved.
-	default:
-		return nil, fmt.Errorf("refusing connection with cluster state %v", s.State)
-	}
-
-	// Ensure only defined 'applications' are used to prevent programmer error and
-	// casting to ConsensusUser from an arbitrary string.
-	switch user {
-	case ConsensusUserKubernetesPKI:
-	case ConsensusUserCurator:
-	default:
-		return nil, fmt.Errorf("unknown ConsensusUser %q", user)
-	}
-	client, err := s.consensusClient.Sub(string(user))
-	if err != nil {
-		return nil, fmt.Errorf("retrieving subclient failed: %w", err)
-	}
-	return client, nil
-}