m/n/core/cluster: migrate to events and etcd namespaced client

This moves the status of the cluster manager into a local event
variable. Watchers (like the node startup code) can now use it to get
updates on the state of the node and its cluster membership in a way
that's more abstracted from a sequential startup. This will permit us to
move a lot of the startup code into code common across different node
lifecycle paths.
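
An example of how node code can consume the new API (a sketch mirroring
the main.go changes below; assumes a cluster Manager `m` and a context
`ctx`, with error handling abridged):

    // Block until the cluster manager reports ClusterHome.
    watcher := m.Watch()
    status, err := watcher.GetHome(ctx)
    if err != nil {
        return fmt.Errorf("node couldn't find home in cluster: %w", err)
    }
    // Namespaced etcd access is handed out per defined ConsensusUser.
    kv, err := status.ConsensusClient(cluster.ConsensusUserKubernetesPKI)
    if err != nil {
        return fmt.Errorf("retrieving consensus client: %w", err)
    }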

Test Plan: Refactor, exercised by e2e.

X-Origin-Diff: phab/D757
GitOrigin-RevId: 31a3600ad2aab90a1e7f84d741e7ea40a0422724
diff --git a/metropolis/node/core/cluster/BUILD.bazel b/metropolis/node/core/cluster/BUILD.bazel
index c6e01e5..a3e2593 100644
--- a/metropolis/node/core/cluster/BUILD.bazel
+++ b/metropolis/node/core/cluster/BUILD.bazel
@@ -13,8 +13,10 @@
     visibility = ["//metropolis/node/core:__subpackages__"],
     deps = [
         "//metropolis/node/core/consensus:go_default_library",
+        "//metropolis/node/core/consensus/client:go_default_library",
         "//metropolis/node/core/localstorage:go_default_library",
         "//metropolis/node/core/network:go_default_library",
+        "//metropolis/pkg/event:go_default_library",
         "//metropolis/pkg/pki:go_default_library",
         "//metropolis/pkg/supervisor:go_default_library",
         "//metropolis/proto/api:go_default_library",
diff --git a/metropolis/node/core/cluster/cluster.go b/metropolis/node/core/cluster/cluster.go
index 1277c2a..b194f25 100644
--- a/metropolis/node/core/cluster/cluster.go
+++ b/metropolis/node/core/cluster/cluster.go
@@ -22,20 +22,44 @@
 	"source.monogon.dev/metropolis/pkg/pki"
 )
 
+// ClusterState is the state of the cluster from the point of view of the
+// current node. Clients within the node code can watch this state to change
+// their behaviour as needed.
 type ClusterState int
 
 const (
+	// ClusterUnknown means the node has not yet determined the existence
+	// of a cluster it should join or start. This is a transient, initial state
+	// that should only manifest during boot.
 	ClusterUnknown ClusterState = iota
+	// ClusterForeign means the node is attempting to register into an already
+	// existing cluster with which it managed to make preliminary contact, but
+	// which has not yet fully productionized the node (eg. the node is still
+	// being hardware attested, or the operator needs to confirm the
+	// registration of this node).
 	ClusterForeign
+	// ClusterTrusted means the node is attempting to register into an already
+	// existing cluster, and has been trusted by it. The node is now attempting
+	// to finally commit to registering into the cluster.
 	ClusterTrusted
+	// ClusterHome means the node is part of a cluster. This is the state in
+	// which the node will spend the bulk of its time.
 	ClusterHome
+	// ClusterDisowning means the node has been disowned (ie., removed) by the
+	// cluster, that it will never again be part of any cluster, and that it
+	// will be decommissioned by the operator.
 	ClusterDisowning
+	// ClusterSplit means that the node would usually be Home in a cluster, but
+	// has been split from the consensus of the cluster. This can happen for
+	// nodes running consensus when consensus is lost (eg. when there is no
+	// quorum or this node has been netsplit), and for other nodes if they have
+	// lost network connectivity to the consensus nodes. Clients should make
+	// their own decision about what action to perform in this state,
+	// depending on the level of consistency required and whether it makes
+	// sense for the node to fence its services off.
+	ClusterSplit
 )
 
-type Cluster struct {
-	State ClusterState
-}
-
 func (s ClusterState) String() string {
 	switch s {
 	case ClusterForeign:
@@ -46,17 +70,12 @@
 		return "ClusterHome"
 	case ClusterDisowning:
 		return "ClusterDisowning"
+	case ClusterSplit:
+		return "ClusterSplit"
 	}
 	return fmt.Sprintf("Invalid(%d)", s)
 }
 
-var clusterStateTransitions = map[ClusterState][]ClusterState{
-	ClusterUnknown: {ClusterForeign, ClusterHome, ClusterDisowning},
-	ClusterForeign: {ClusterTrusted},
-	ClusterTrusted: {ClusterHome},
-	ClusterHome:    {ClusterHome, ClusterDisowning},
-}
-
 var (
 	PKINamespace = pki.Namespaced("/cluster-pki/")
 	PKICA        = PKINamespace.New(pki.SelfSigned, "cluster-ca", pki.CA("Metropolis Cluster CA"))
diff --git a/metropolis/node/core/cluster/manager.go b/metropolis/node/core/cluster/manager.go
index 65af212..874d3ae 100644
--- a/metropolis/node/core/cluster/manager.go
+++ b/metropolis/node/core/cluster/manager.go
@@ -26,16 +26,60 @@
 	"google.golang.org/protobuf/proto"
 
 	"source.monogon.dev/metropolis/node/core/consensus"
+	"source.monogon.dev/metropolis/node/core/consensus/client"
 	"source.monogon.dev/metropolis/node/core/localstorage"
 	"source.monogon.dev/metropolis/node/core/network"
+	"source.monogon.dev/metropolis/pkg/event"
 	"source.monogon.dev/metropolis/pkg/supervisor"
 	apb "source.monogon.dev/metropolis/proto/api"
 	ppb "source.monogon.dev/metropolis/proto/private"
 )
 
-type managerResult struct {
-	node *Node
-	err  error
+// Status is returned to cluster manager clients (ie., node code) via
+// Manager.Watch and Watcher.Get.
+type Status struct {
+	// State is the current state of the cluster, as seen by the node.
+	State ClusterState
+	// Node is the configuration of this node in the cluster.
+	Node *Node
+
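+	// consensusClient is a namespaced etcd client for application use.
+	// Per-ConsensusUser subclients are derived from it in ConsensusClient.
+	// It is nil until the node is connected to the cluster's consensus.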
+	consensusClient client.Namespaced
+}
+
+// ConsensusUser is the top-level user of an etcd client in Metropolis node
+// code. These need to be defined ahead of time in a Go 'enum', and different
+// ConsensusUsers should not be shared by different codepaths.
+type ConsensusUser string
+
+const (
+	ConsensusUserKubernetesPKI ConsensusUser = "kubernetes-pki"
+)
+
+// ConsensusClient returns an etcd/consensus client for a given ConsensusUser.
+func (s *Status) ConsensusClient(user ConsensusUser) (client.Namespaced, error) {
+	// Ensure that we already are connected to etcd and are in a state in which we
+	// should be handing out cluster connectivity.
+	if s.consensusClient == nil {
+		return nil, fmt.Errorf("not connected")
+	}
+	switch s.State {
+	case ClusterHome:
+	case ClusterSplit:
+		return nil, fmt.Errorf("refusing connection with cluster state %v", s.State)
+	default:
+		return nil, fmt.Errorf("refusing connection with cluster state %v", s.State)
+	}
+
+	// Ensure only defined 'applications' are used to prevent programmer error and
+	// casting to ConsensusUser from an arbitrary string.
+	switch user {
+	case ConsensusUserKubernetesPKI:
+	default:
+		return nil, fmt.Errorf("unknown ConsensusUser %q", user)
+	}
+	client, err := s.consensusClient.Sub(string(user))
+	if err != nil {
+		return nil, fmt.Errorf("retrieving subclient failed: %w", err)
+	}
+	return client, nil
 }
 
 type state struct {
@@ -46,27 +90,49 @@
 	stateNode    ppb.Node_FSMState
 
 	configuration *ppb.SealedConfiguration
-
-	result  *managerResult
-	waiters []chan *managerResult
 }
 
-func (s *state) setResult(node *Node, err error) {
-	s.result = &managerResult{
-		node: node,
-		err:  err,
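+// Watcher wraps an event.Watcher and allows cluster manager clients to
+// follow changes to the manager's Status.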
+type Watcher struct {
+	event.Watcher
+}
+
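+// Get retrieves a Status update from the underlying event.Watcher, blocking
+// until one is available or ctx is canceled.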
+func (w *Watcher) Get(ctx context.Context) (*Status, error) {
+	val, err := w.Watcher.Get(ctx)
+	if err != nil {
+		return nil, err
 	}
-	for _, w := range s.waiters {
-		go func(c chan *managerResult) {
-			c <- s.result
-		}(w)
+	status := val.(Status)
+	return &status, err
+}
+
+// GetHome waits until the cluster, from the point of view of this node, is in
+// the ClusterHome state. This can be used to wait for the cluster manager to
+// 'settle', before clients start more node services.
+func (w *Watcher) GetHome(ctx context.Context) (*Status, error) {
+	for {
+		status, err := w.Get(ctx)
+		if err != nil {
+			return nil, err
+		}
+		switch status.State {
+		case ClusterHome:
+			return status, nil
+		case ClusterDisowning:
+			return nil, fmt.Errorf("the cluster has disowned this node")
+		}
 	}
-	s.waiters = nil
+}
+
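+// Watch returns a Watcher for the cluster manager's Status, which clients
+// can use to wait for the node to settle into a cluster (see GetHome).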
+func (m *Manager) Watch() Watcher {
+	return Watcher{
+		Watcher: m.status.Watch(),
+	}
 }
 
 type Manager struct {
 	storageRoot    *localstorage.Root
 	networkService *network.Service
+	status         event.MemoryValue
 
 	state
 
@@ -100,21 +166,6 @@
 	return &m.state, m.mu.RUnlock
 }
 
-func (m *Manager) Wait() (*Node, error) {
-	state, unlock := m.lock()
-
-	if state.result != nil {
-		unlock()
-		return state.result.node, state.result.err
-	}
-
-	c := make(chan *managerResult)
-	state.waiters = append(state.waiters, c)
-	unlock()
-	res := <-c
-	return res.node, res.err
-}
-
 // Run is the runnable of the Manager, to be started using the Supervisor. It
 // is one-shot, and should not be restarted.
 func (m *Manager) Run(ctx context.Context) error {
@@ -194,7 +245,7 @@
 		return nil, fmt.Errorf("could not find node parameters in ESP or qemu fwcfg")
 	}
 	if paramsFWCFG != nil && paramsESP != nil {
-		supervisor.Logger(ctx).Warningf("Node parameters found both inboth ESP and qemu fwcfg, using the latter")
+		supervisor.Logger(ctx).Warningf("Node parameters found both in both ESP and qemu fwcfg, using the latter")
 		return paramsFWCFG, nil
 	} else if paramsFWCFG != nil {
 		return paramsFWCFG, nil
diff --git a/metropolis/node/core/cluster/manager_bootstrap.go b/metropolis/node/core/cluster/manager_bootstrap.go
index e6f94a2..b80bed6 100644
--- a/metropolis/node/core/cluster/manager_bootstrap.go
+++ b/metropolis/node/core/cluster/manager_bootstrap.go
@@ -96,17 +96,27 @@
 	}
 	supervisor.Logger(ctx).Info("Bootstrapping: consensus ready.")
 
-	kv := m.consensus.KVRoot()
-	node.KV = kv
+	nodesKV, err := m.consensus.Client().Sub("nodes")
+	if err != nil {
+		return fmt.Errorf("when retrieving nodes etcd subclient: %w", err)
+	}
+	pkiKV, err := m.consensus.Client().Sub("pki")
+	if err != nil {
+		return fmt.Errorf("when retrieving pki etcd subclient: %w", err)
+	}
+	applicationKV, err := m.consensus.Client().Sub("application")
+	if err != nil {
+		return fmt.Errorf("when retrieving application etcd subclient: %w", err)
+	}
 
 	// Create Metropolis CA and this node's certificate.
-	caCertBytes, _, err := PKICA.Ensure(ctx, kv)
+	caCertBytes, _, err := PKICA.Ensure(ctx, pkiKV)
 	if err != nil {
 		return fmt.Errorf("failed to create cluster CA: %w", err)
 	}
 	nodeCert := PKINamespace.New(PKICA, "", pki.Server([]string{node.ID()}, nil))
 	nodeCert.UseExistingKey(priv)
-	nodeCertBytes, _, err := nodeCert.Ensure(ctx, kv)
+	nodeCertBytes, _, err := nodeCert.Ensure(ctx, pkiKV)
 	if err != nil {
 		return fmt.Errorf("failed to create node certificate: %w", err)
 	}
@@ -121,15 +131,18 @@
 		return fmt.Errorf("failed to write node private key: %w", err)
 	}
 
-	// Update our Node obejct in etcd.
-	if err := node.Store(ctx, kv); err != nil {
+	// Update our Node object in etcd.
+	if err := node.Store(ctx, nodesKV); err != nil {
 		return fmt.Errorf("failed to store new node in etcd: %w", err)
 	}
 
-	state.setResult(&node, nil)
+	m.status.Set(Status{
+		State:           ClusterHome,
+		Node:            &node,
+		consensusClient: applicationKV,
+	})
 
 	supervisor.Signal(ctx, supervisor.SignalHealthy)
 	supervisor.Signal(ctx, supervisor.SignalDone)
 	return nil
 }
-
diff --git a/metropolis/node/core/cluster/node.go b/metropolis/node/core/cluster/node.go
index a38da8b..1d6e73d 100644
--- a/metropolis/node/core/cluster/node.go
+++ b/metropolis/node/core/cluster/node.go
@@ -57,10 +57,6 @@
 	// representing the lack of a role.
 	consensusMember  *NodeRoleConsensusMember
 	kubernetesWorker *NodeRoleKubernetesWorker
-
-	// At runtime, this represents an etcd client to the consensus cluster. This
-	// is used by applications (like Kubernetes).
-	KV clientv3.KV
 }
 
 // NodeRoleConsensusMember defines that the Node is a consensus (etcd) cluster
diff --git a/metropolis/node/core/consensus/BUILD.bazel b/metropolis/node/core/consensus/BUILD.bazel
index 669985a..dd3de3c 100644
--- a/metropolis/node/core/consensus/BUILD.bazel
+++ b/metropolis/node/core/consensus/BUILD.bazel
@@ -8,10 +8,10 @@
     deps = [
         "//metropolis/node:go_default_library",
         "//metropolis/node/core/consensus/ca:go_default_library",
+        "//metropolis/node/core/consensus/client:go_default_library",
         "//metropolis/node/core/localstorage:go_default_library",
         "//metropolis/pkg/supervisor:go_default_library",
         "@io_etcd_go_etcd//clientv3:go_default_library",
-        "@io_etcd_go_etcd//clientv3/namespace:go_default_library",
         "@io_etcd_go_etcd//embed:go_default_library",
         "@org_uber_go_atomic//:go_default_library",
     ],
diff --git a/metropolis/node/core/consensus/consensus.go b/metropolis/node/core/consensus/consensus.go
index 33a352a..683db19 100644
--- a/metropolis/node/core/consensus/consensus.go
+++ b/metropolis/node/core/consensus/consensus.go
@@ -39,12 +39,12 @@
 	"time"
 
 	"go.etcd.io/etcd/clientv3"
-	"go.etcd.io/etcd/clientv3/namespace"
 	"go.etcd.io/etcd/embed"
 	"go.uber.org/atomic"
 
 	node "source.monogon.dev/metropolis/node"
 	"source.monogon.dev/metropolis/node/core/consensus/ca"
+	"source.monogon.dev/metropolis/node/core/consensus/client"
 	"source.monogon.dev/metropolis/node/core/localstorage"
 	"source.monogon.dev/metropolis/pkg/supervisor"
 )
@@ -389,17 +389,20 @@
 	}
 }
 
-// KV returns and etcd KV client interface to the etcd member/cluster.
-func (s *Service) KV(module, space string) clientv3.KV {
+func (s *Service) Client() client.Namespaced {
 	s.stateMu.Lock()
 	defer s.stateMu.Unlock()
-	return namespace.NewKV(s.state.cl.KV, fmt.Sprintf("%s:%s", module, space))
-}
-
-func (s *Service) KVRoot() clientv3.KV {
-	s.stateMu.Lock()
-	defer s.stateMu.Unlock()
-	return s.state.cl.KV
+	// 'namespaced' is the root of all namespaced clients within the etcd K/V
+	// store, with further paths in a colon-separated format, eg.:
+	//   namespaced:example/
+	//   namespaced:foo:bar:baz/
+	client, err := client.NewLocal(s.state.cl).Sub("namespaced")
+	if err != nil {
+		// This error can only happen due to a malformed path, which is
+		// constant. Thus, this is a programming error and we panic.
+		panic(fmt.Errorf("Could not get consensus etcd client: %v", err))
+	}
+	return client
 }
 
 func (s *Service) Cluster() clientv3.Cluster {
diff --git a/metropolis/node/core/consensus/consensus_test.go b/metropolis/node/core/consensus/consensus_test.go
index 8e26535..2671432 100644
--- a/metropolis/node/core/consensus/consensus_test.go
+++ b/metropolis/node/core/consensus/consensus_test.go
@@ -96,7 +96,7 @@
 	supervisor.New(b.ctx, etcd.Run)
 	waitEtcd(t, etcd)
 
-	kv := etcd.KV("foo", "bar")
+	kv := etcd.Client()
 	if _, err := kv.Put(b.ctx, "/foo", "bar"); err != nil {
 		t.Fatalf("test key creation failed: %v", err)
 	}
@@ -162,7 +162,7 @@
 		ctx, ctxC := context.WithCancel(b.ctx)
 		supervisor.New(ctx, etcd.Run)
 		waitEtcd(t, etcd)
-		kv := etcd.KV("foo", "bar")
+		kv := etcd.Client()
 		if new {
 			if _, err := kv.Put(b.ctx, "/foo", "bar"); err != nil {
 				t.Fatalf("test key creation failed: %v", err)
diff --git a/metropolis/node/core/main.go b/metropolis/node/core/main.go
index 442102f..4051663 100644
--- a/metropolis/node/core/main.go
+++ b/metropolis/node/core/main.go
@@ -150,38 +150,45 @@
 			return fmt.Errorf("when starting enrolment: %w", err)
 		}
 
-		// Wait until the cluster manager settles.
-		node, err := m.Wait()
+		// Wait until the node finds a home in the new cluster.
+		watcher := m.Watch()
+		status, err := watcher.GetHome(ctx)
 		if err != nil {
 			close(trapdoor)
-			return fmt.Errorf("enrolment failed, aborting: %w", err)
+			return fmt.Errorf("new couldn't find home in new cluster, aborting: %w", err)
 		}
 
 		// We are now in a cluster. We can thus access our 'node' object and start all services that
 		// we should be running.
 
 		logger.Info("Enrolment success, continuing startup.")
-		logger.Info(fmt.Sprintf("This node (%s) has roles:", node.String()))
-		if cm := node.ConsensusMember(); cm != nil {
+		logger.Info(fmt.Sprintf("This node (%s) has roles:", status.Node.String()))
+		if cm := status.Node.ConsensusMember(); cm != nil {
 			// There's no need to start anything for when we are a consensus member - the cluster
 			// manager does this for us if necessary (as creating/enrolling/joining a cluster is
 			// pretty tied into cluster lifecycle management).
 			logger.Info(fmt.Sprintf(" - etcd consensus member"))
 		}
-		if kw := node.KubernetesWorker(); kw != nil {
+		if kw := status.Node.KubernetesWorker(); kw != nil {
 			logger.Info(fmt.Sprintf(" - kubernetes worker"))
 		}
 
 		// If we're supposed to be a kubernetes worker, start kubernetes services and containerd.
 		// In the future, this might be split further into kubernetes control plane and data plane
 		// roles.
+		// TODO(q3k): watch cluster status updates to start/stop the kubernetes service.
 		var containerdSvc *containerd.Service
 		var kubeSvc *kubernetes.Service
-		if kw := node.KubernetesWorker(); kw != nil {
+		if kw := status.Node.KubernetesWorker(); kw != nil {
 			logger.Info("Starting Kubernetes worker services...")
 
+			kv, err := status.ConsensusClient(cluster.ConsensusUserKubernetesPKI)
+			if err != nil {
+				return fmt.Errorf("failed to retrieve consensus kubernetes PKI client: %w", err)
+			}
+
 			// Ensure Kubernetes PKI objects exist in etcd.
-			kpki := pki.New(lt.MustLeveledFor("pki.kubernetes"), node.KV)
+			kpki := pki.New(lt.MustLeveledFor("pki.kubernetes"), kv)
 			if err := kpki.EnsureAll(ctx); err != nil {
 				return fmt.Errorf("failed to ensure kubernetes PKI present: %w", err)
 			}