m/n/core/cluster: migrate to events and etcd namespaced client

This moves the status of the cluster manager to use a local event
variable. Watchers (like the node startup code) can now use this to get
updates on the state of the node and its cluster membership in a way
that's more abstracted from a sequential startup. This will permit us to
move a lot of the startup code into code common across different node
lifecycle paths.

Test Plan: Refactor, exercised by e2e.

X-Origin-Diff: phab/D757
GitOrigin-RevId: 31a3600ad2aab90a1e7f84d741e7ea40a0422724
diff --git a/metropolis/node/core/consensus/consensus.go b/metropolis/node/core/consensus/consensus.go
index 33a352a..683db19 100644
--- a/metropolis/node/core/consensus/consensus.go
+++ b/metropolis/node/core/consensus/consensus.go
@@ -39,12 +39,12 @@
 	"time"
 
 	"go.etcd.io/etcd/clientv3"
-	"go.etcd.io/etcd/clientv3/namespace"
 	"go.etcd.io/etcd/embed"
 	"go.uber.org/atomic"
 
 	node "source.monogon.dev/metropolis/node"
 	"source.monogon.dev/metropolis/node/core/consensus/ca"
+	"source.monogon.dev/metropolis/node/core/consensus/client"
 	"source.monogon.dev/metropolis/node/core/localstorage"
 	"source.monogon.dev/metropolis/pkg/supervisor"
 )
@@ -389,17 +389,20 @@
 	}
 }
 
-// KV returns and etcd KV client interface to the etcd member/cluster.
-func (s *Service) KV(module, space string) clientv3.KV {
+func (s *Service) Client() client.Namespaced {
 	s.stateMu.Lock()
 	defer s.stateMu.Unlock()
-	return namespace.NewKV(s.state.cl.KV, fmt.Sprintf("%s:%s", module, space))
-}
-
-func (s *Service) KVRoot() clientv3.KV {
-	s.stateMu.Lock()
-	defer s.stateMu.Unlock()
-	return s.state.cl.KV
+	// 'namespaced' is the root of all namespaced clients within the etcd K/V
+	// store, with further paths in a colon-separated format, eg.:
+	//   namespaced:example/
+	//   namespaced:foo:bar:baz/
+	client, err := client.NewLocal(s.state.cl).Sub("namespaced")
+	if err != nil {
+		// This error can only happen due to a malformed path, which is
+		// constant. Thus, this is a programming error and we panic.
+		panic(fmt.Errorf("Could not get consensus etcd client: %v", err))
+	}
+	return client
 }
 
 func (s *Service) Cluster() clientv3.Cluster {