m/n/core/cluster: migrate to events and etcd namespaced client
This moves the status of the cluster manager to use a local event
variable. Watchers (like the node startup code) can now use this to get
updates on the state of the node and its cluster membership in a way
that's more abstracted from a sequential startup. This will permit us to
move a lot of the startup code into code common across different node
lifecycle paths.
Test Plan: Refactor, exercised by e2e.
X-Origin-Diff: phab/D757
GitOrigin-RevId: 31a3600ad2aab90a1e7f84d741e7ea40a0422724
diff --git a/metropolis/node/core/consensus/BUILD.bazel b/metropolis/node/core/consensus/BUILD.bazel
index 669985a..dd3de3c 100644
--- a/metropolis/node/core/consensus/BUILD.bazel
+++ b/metropolis/node/core/consensus/BUILD.bazel
@@ -8,10 +8,10 @@
deps = [
"//metropolis/node:go_default_library",
"//metropolis/node/core/consensus/ca:go_default_library",
+ "//metropolis/node/core/consensus/client:go_default_library",
"//metropolis/node/core/localstorage:go_default_library",
"//metropolis/pkg/supervisor:go_default_library",
"@io_etcd_go_etcd//clientv3:go_default_library",
- "@io_etcd_go_etcd//clientv3/namespace:go_default_library",
"@io_etcd_go_etcd//embed:go_default_library",
"@org_uber_go_atomic//:go_default_library",
],
diff --git a/metropolis/node/core/consensus/consensus.go b/metropolis/node/core/consensus/consensus.go
index 33a352a..683db19 100644
--- a/metropolis/node/core/consensus/consensus.go
+++ b/metropolis/node/core/consensus/consensus.go
@@ -39,12 +39,12 @@
"time"
"go.etcd.io/etcd/clientv3"
- "go.etcd.io/etcd/clientv3/namespace"
"go.etcd.io/etcd/embed"
"go.uber.org/atomic"
node "source.monogon.dev/metropolis/node"
"source.monogon.dev/metropolis/node/core/consensus/ca"
+ "source.monogon.dev/metropolis/node/core/consensus/client"
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/pkg/supervisor"
)
@@ -389,17 +389,20 @@
}
}
-// KV returns and etcd KV client interface to the etcd member/cluster.
-func (s *Service) KV(module, space string) clientv3.KV {
+func (s *Service) Client() client.Namespaced {
s.stateMu.Lock()
defer s.stateMu.Unlock()
- return namespace.NewKV(s.state.cl.KV, fmt.Sprintf("%s:%s", module, space))
-}
-
-func (s *Service) KVRoot() clientv3.KV {
- s.stateMu.Lock()
- defer s.stateMu.Unlock()
- return s.state.cl.KV
+ // 'namespaced' is the root of all namespaced clients within the etcd K/V
+ // store, with further paths in a colon-separated format, eg.:
+ // namespaced:example/
+ // namespaced:foo:bar:baz/
+ client, err := client.NewLocal(s.state.cl).Sub("namespaced")
+ if err != nil {
+ // This error can only happen due to a malformed path, which is
+ // constant. Thus, this is a programming error and we panic.
+ panic(fmt.Errorf("Could not get consensus etcd client: %v", err))
+ }
+ return client
}
func (s *Service) Cluster() clientv3.Cluster {
diff --git a/metropolis/node/core/consensus/consensus_test.go b/metropolis/node/core/consensus/consensus_test.go
index 8e26535..2671432 100644
--- a/metropolis/node/core/consensus/consensus_test.go
+++ b/metropolis/node/core/consensus/consensus_test.go
@@ -96,7 +96,7 @@
supervisor.New(b.ctx, etcd.Run)
waitEtcd(t, etcd)
- kv := etcd.KV("foo", "bar")
+ kv := etcd.Client()
if _, err := kv.Put(b.ctx, "/foo", "bar"); err != nil {
t.Fatalf("test key creation failed: %v", err)
}
@@ -162,7 +162,7 @@
ctx, ctxC := context.WithCancel(b.ctx)
supervisor.New(ctx, etcd.Run)
waitEtcd(t, etcd)
- kv := etcd.KV("foo", "bar")
+ kv := etcd.Client()
if new {
if _, err := kv.Put(b.ctx, "/foo", "bar"); err != nil {
t.Fatalf("test key creation failed: %v", err)