m/n/core/cluster: migrate to events and etcd namespaced client

This moves the cluster manager's status to a local event variable.
Watchers (like the node startup code) can now use it to get updates on
the state of the node and its cluster membership in a way that is more
abstracted from a sequential startup. This will permit us to move a lot
of the startup code into code common across different node lifecycle
paths.

Test Plan: Refactor, exercised by e2e.

X-Origin-Diff: phab/D757
GitOrigin-RevId: 31a3600ad2aab90a1e7f84d741e7ea40a0422724
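
For context, below is a minimal sketch of the event-value/watcher pattern
described above. Status, Value and Watcher here are simplified stand-ins,
not the actual cluster manager or event library types; the real
implementation differs in detail.

// Minimal sketch of an event value with blocking watchers, assuming
// simplified stand-in types rather than the real cluster manager API.
package main

import (
	"context"
	"fmt"
	"sync"
)

// Status is a stand-in for the state published by the cluster manager.
type Status struct {
	HasHome bool
	NodeID  string
}

// Value holds the latest Status and wakes watchers on every update.
type Value struct {
	mu      sync.Mutex
	status  Status
	updated chan struct{} // closed and replaced by each Set
}

func NewValue() *Value {
	return &Value{updated: make(chan struct{})}
}

// Set publishes a new status and notifies all pending watchers.
func (v *Value) Set(s Status) {
	v.mu.Lock()
	defer v.mu.Unlock()
	v.status = s
	close(v.updated)
	v.updated = make(chan struct{})
}

// Watcher lets a consumer block until the status satisfies a condition.
type Watcher struct{ v *Value }

func (v *Value) Watch() *Watcher { return &Watcher{v: v} }

// GetHome blocks until the node reports cluster membership, mirroring the
// watcher.GetHome(ctx) call in the diff below.
func (w *Watcher) GetHome(ctx context.Context) (Status, error) {
	for {
		w.v.mu.Lock()
		s, ch := w.v.status, w.v.updated
		w.v.mu.Unlock()
		if s.HasHome {
			return s, nil
		}
		select {
		case <-ctx.Done():
			return Status{}, ctx.Err()
		case <-ch:
			// Status changed, re-check.
		}
	}
}

func main() {
	v := NewValue()
	go v.Set(Status{HasHome: true, NodeID: "example-node"})
	status, err := v.Watch().GetHome(context.Background())
	fmt.Println(status, err)
}
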
diff --git a/metropolis/node/core/main.go b/metropolis/node/core/main.go
index 442102f..4051663 100644
--- a/metropolis/node/core/main.go
+++ b/metropolis/node/core/main.go
@@ -150,38 +150,45 @@
 			return fmt.Errorf("when starting enrolment: %w", err)
 		}
 
-		// Wait until the cluster manager settles.
-		node, err := m.Wait()
+		// Wait until the node finds a home in the new cluster.
+		watcher := m.Watch()
+		status, err := watcher.GetHome(ctx)
 		if err != nil {
 			close(trapdoor)
-			return fmt.Errorf("enrolment failed, aborting: %w", err)
+			return fmt.Errorf("new couldn't find home in new cluster, aborting: %w", err)
 		}
 
 		// We are now in a cluster. We can thus access our 'node' object and start all services that
 		// we should be running.
 
 		logger.Info("Enrolment success, continuing startup.")
-		logger.Info(fmt.Sprintf("This node (%s) has roles:", node.String()))
-		if cm := node.ConsensusMember(); cm != nil {
+		logger.Info(fmt.Sprintf("This node (%s) has roles:", status.Node.String()))
+		if cm := status.Node.ConsensusMember(); cm != nil {
 			// There's no need to start anything for when we are a consensus member - the cluster
 			// manager does this for us if necessary (as creating/enrolling/joining a cluster is
 			// pretty tied into cluster lifecycle management).
 			logger.Info(fmt.Sprintf(" - etcd consensus member"))
 		}
-		if kw := node.KubernetesWorker(); kw != nil {
+		if kw := status.Node.KubernetesWorker(); kw != nil {
 			logger.Info(fmt.Sprintf(" - kubernetes worker"))
 		}
 
 		// If we're supposed to be a kubernetes worker, start kubernetes services and containerd.
 		// In the future, this might be split further into kubernetes control plane and data plane
 		// roles.
+		// TODO(q3k): watch on cluster status updates to start/stop kubernetes service.
 		var containerdSvc *containerd.Service
 		var kubeSvc *kubernetes.Service
-		if kw := node.KubernetesWorker(); kw != nil {
+		if kw := status.Node.KubernetesWorker(); kw != nil {
 			logger.Info("Starting Kubernetes worker services...")
 
+			kv, err := status.ConsensusClient(cluster.ConsensusUserKubernetesPKI)
+			if err != nil {
+				return fmt.Errorf("failed to retrieve consensus kubernetes PKI client: %w", err)
+			}
+
 			// Ensure Kubernetes PKI objects exist in etcd.
-			kpki := pki.New(lt.MustLeveledFor("pki.kubernetes"), node.KV)
+			kpki := pki.New(lt.MustLeveledFor("pki.kubernetes"), kv)
 			if err := kpki.EnsureAll(ctx); err != nil {
 				return fmt.Errorf("failed to ensure kubernetes PKI present: %w", err)
 			}