m/n/core/cluster: migrate to events and etcd namespaced client

This moves the status of the cluster manager to use a local event
variable. Watchers (like the node startup code) can now use this to get
updates on the state of the node and its cluster membership in a way
that's more abstracted from a sequential startup. This will permit us to
move a lot of the startup code into code common across different node
lifecycle paths.
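
As a sketch, a consumer could now look roughly like this (names such as
startNodeServices are illustrative, not part of this change):

    package node // illustrative package

    import (
        "context"
        "fmt"

        "source.monogon.dev/metropolis/node/core/cluster"
    )

    func startNodeServices(ctx context.Context, m *cluster.Manager) error {
        // Block until this node considers the cluster to be in ClusterHome.
        w := m.Watch()
        status, err := w.GetHome(ctx)
        if err != nil {
            return fmt.Errorf("waiting for cluster: %w", err)
        }
        // Retrieve a namespaced etcd client for a predefined consensus user.
        ccl, err := status.ConsensusClient(cluster.ConsensusUserKubernetesPKI)
        if err != nil {
            return fmt.Errorf("getting consensus client: %w", err)
        }
        _ = ccl // handed to the relevant service in real startup code
        return nil
    }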

Test Plan: Refactor, exercised by e2e.

X-Origin-Diff: phab/D757
GitOrigin-RevId: 31a3600ad2aab90a1e7f84d741e7ea40a0422724
diff --git a/metropolis/node/core/cluster/manager.go b/metropolis/node/core/cluster/manager.go
index 65af212..874d3ae 100644
--- a/metropolis/node/core/cluster/manager.go
+++ b/metropolis/node/core/cluster/manager.go
@@ -26,16 +26,60 @@
 	"google.golang.org/protobuf/proto"
 
 	"source.monogon.dev/metropolis/node/core/consensus"
+	"source.monogon.dev/metropolis/node/core/consensus/client"
 	"source.monogon.dev/metropolis/node/core/localstorage"
 	"source.monogon.dev/metropolis/node/core/network"
+	"source.monogon.dev/metropolis/pkg/event"
 	"source.monogon.dev/metropolis/pkg/supervisor"
 	apb "source.monogon.dev/metropolis/proto/api"
 	ppb "source.monogon.dev/metropolis/proto/private"
 )
 
-type managerResult struct {
-	node *Node
-	err  error
+// Status is returned to Cluster clients (i.e., node code) by Manager.Watch/.Get.
+type Status struct {
+	// State is the current state of the cluster, as seen by the node.
+	State ClusterState
+	// Node is the configuration of this node in the cluster.
+	Node *Node
+
+	consensusClient client.Namespaced
+}
+
+// ConsensusUser is the top-level user of an etcd client in Metropolis node
+// code. These need to be defined ahead of time in a Go 'enum', and different
+// ConsensusUsers should not be shared by different codepaths.
+type ConsensusUser string
+
+const (
+	ConsensusUserKubernetesPKI ConsensusUser = "kubernetes-pki"
+)
+
+// ConsensusClient returns an etcd/consensus client for a given ConsensusUser.
+func (s *Status) ConsensusClient(user ConsensusUser) (client.Namespaced, error) {
+	// Ensure that we already are connected to etcd and are in a state in which we
+	// should be handing out cluster connectivity.
+	if s.consensusClient == nil {
+		return nil, fmt.Errorf("not connected")
+	}
+	switch s.State {
+	case ClusterHome:
+	case ClusterSplit:
+		return nil, fmt.Errorf("refusing connection with cluster state %v", s.State)
+	default:
+		return nil, fmt.Errorf("refusing connection with cluster state %v", s.State)
+	}
+
+	// Ensure only defined 'applications' are used, to prevent programmer error
+	// and accidental casting of arbitrary strings to ConsensusUser.
+	switch user {
+	case ConsensusUserKubernetesPKI:
+	default:
+		return nil, fmt.Errorf("unknown ConsensusUser %q", user)
+	}
+	client, err := s.consensusClient.Sub(string(user))
+	if err != nil {
+		return nil, fmt.Errorf("retrieving subclient failed: %w", err)
+	}
+	return client, nil
 }
 
 type state struct {
@@ -46,27 +90,49 @@
 	stateNode    ppb.Node_FSMState
 
 	configuration *ppb.SealedConfiguration
-
-	result  *managerResult
-	waiters []chan *managerResult
 }
 
-func (s *state) setResult(node *Node, err error) {
-	s.result = &managerResult{
-		node: node,
-		err:  err,
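+// Watcher allows node code to follow the Status of the Manager (and thereby
+// this node's view of the cluster) as it changes.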
+type Watcher struct {
+	event.Watcher
+}
+
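+// Get retrieves the newest Status from the Manager, with the blocking
+// semantics of the underlying event.Watcher.Get.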
+func (w *Watcher) Get(ctx context.Context) (*Status, error) {
+	val, err := w.Watcher.Get(ctx)
+	if err != nil {
+		return nil, err
 	}
-	for _, w := range s.waiters {
-		go func(c chan *managerResult) {
-			c <- s.result
-		}(w)
+	status := val.(Status)
+	return &status, err
+}
+
+// GetHome waits until the cluster, from the point of view of this node, is in
+// the ClusterHome state. This can be used to wait for the cluster manager to
+// 'settle' before clients start more node services.
+func (w *Watcher) GetHome(ctx context.Context) (*Status, error) {
+	for {
+		status, err := w.Get(ctx)
+		if err != nil {
+			return nil, err
+		}
+		switch status.State {
+		case ClusterHome:
+			return status, nil
+		case ClusterDisowning:
+			return nil, fmt.Errorf("the cluster has disowned this node")
+		}
 	}
-	s.waiters = nil
+}
+
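+// Watch returns a Watcher that node code can use to receive Status updates
+// from the Manager.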
+func (m *Manager) Watch() Watcher {
+	return Watcher{
+		Watcher: m.status.Watch(),
+	}
 }
 
 type Manager struct {
 	storageRoot    *localstorage.Root
 	networkService *network.Service
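+	// status is the Status published by the Manager to all Watchers returned
+	// by Watch.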
+	status         event.MemoryValue
 
 	state
 
@@ -100,21 +166,6 @@
 	return &m.state, m.mu.RUnlock
 }
 
-func (m *Manager) Wait() (*Node, error) {
-	state, unlock := m.lock()
-
-	if state.result != nil {
-		unlock()
-		return state.result.node, state.result.err
-	}
-
-	c := make(chan *managerResult)
-	state.waiters = append(state.waiters, c)
-	unlock()
-	res := <-c
-	return res.node, res.err
-}
-
 // Run is the runnable of the Manager, to be started using the Supervisor. It
 // is one-shot, and should not be restarted.
 func (m *Manager) Run(ctx context.Context) error {
@@ -194,7 +245,7 @@
 		return nil, fmt.Errorf("could not find node parameters in ESP or qemu fwcfg")
 	}
 	if paramsFWCFG != nil && paramsESP != nil {
-		supervisor.Logger(ctx).Warningf("Node parameters found both inboth ESP and qemu fwcfg, using the latter")
+		supervisor.Logger(ctx).Warningf("Node parameters found both in both ESP and qemu fwcfg, using the latter")
 		return paramsFWCFG, nil
 	} else if paramsFWCFG != nil {
 		return paramsFWCFG, nil