m/n/core/cluster: migrate to events and etcd namespaced client
This moves the status of the cluster manager to use a local event
variable. Watchers (like the node startup code) can now use this to get
updates on the state of the node and its cluster membership in a way
that's more abstracted from a sequential startup. This will permit us to
move a lof othe startup code into code common across different node
lifecycle paths.
Test Plan: Refactor, exercised by e2e.
X-Origin-Diff: phab/D757
GitOrigin-RevId: 31a3600ad2aab90a1e7f84d741e7ea40a0422724
diff --git a/metropolis/node/core/consensus/consensus.go b/metropolis/node/core/consensus/consensus.go
index 33a352a..683db19 100644
--- a/metropolis/node/core/consensus/consensus.go
+++ b/metropolis/node/core/consensus/consensus.go
@@ -39,12 +39,12 @@
"time"
"go.etcd.io/etcd/clientv3"
- "go.etcd.io/etcd/clientv3/namespace"
"go.etcd.io/etcd/embed"
"go.uber.org/atomic"
node "source.monogon.dev/metropolis/node"
"source.monogon.dev/metropolis/node/core/consensus/ca"
+ "source.monogon.dev/metropolis/node/core/consensus/client"
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/pkg/supervisor"
)
@@ -389,17 +389,20 @@
}
}
-// KV returns and etcd KV client interface to the etcd member/cluster.
-func (s *Service) KV(module, space string) clientv3.KV {
+func (s *Service) Client() client.Namespaced {
s.stateMu.Lock()
defer s.stateMu.Unlock()
- return namespace.NewKV(s.state.cl.KV, fmt.Sprintf("%s:%s", module, space))
-}
-
-func (s *Service) KVRoot() clientv3.KV {
- s.stateMu.Lock()
- defer s.stateMu.Unlock()
- return s.state.cl.KV
+ // 'namespaced' is the root of all namespaced clients within the etcd K/V
+ // store, with further paths in a colon-separated format, eg.:
+ // namespaced:example/
+ // namespaced:foo:bar:baz/
+ client, err := client.NewLocal(s.state.cl).Sub("namespaced")
+ if err != nil {
+ // This error can only happen due to a malformed path, which is
+ // constant. Thus, this is a programming error and we panic.
+ panic(fmt.Errorf("Could not get consensus etcd client: %v", err))
+ }
+ return client
}
func (s *Service) Cluster() clientv3.Cluster {