metropolis/pkg/(event|pki): remove dependency on consensus/client
We are currently depending on the consensus client, even when we just
need a plain etcd client. This change removes the dependency by adding
a ThinClient interface that wraps KV and Watcher from clientv3.
Change-Id: I33994ab35ebb3a63c9dda4b588b7f529535e3083
Reviewed-on: https://review.monogon.dev/c/monogon/+/3082
Tested-by: Jenkins CI
Reviewed-by: Serge Bazanski <serge@monogon.tech>
diff --git a/metropolis/pkg/event/etcd/BUILD.bazel b/metropolis/pkg/event/etcd/BUILD.bazel
index 37df147..22766ff 100644
--- a/metropolis/pkg/event/etcd/BUILD.bazel
+++ b/metropolis/pkg/event/etcd/BUILD.bazel
@@ -6,7 +6,6 @@
importpath = "source.monogon.dev/metropolis/pkg/event/etcd",
visibility = ["//visibility:public"],
deps = [
- "//metropolis/node/core/consensus/client",
"//metropolis/pkg/event",
"@com_github_cenkalti_backoff_v4//:backoff",
"@io_etcd_go_etcd_client_v3//:client",
@@ -18,7 +17,6 @@
srcs = ["etcd_test.go"],
embed = [":etcd"],
deps = [
- "//metropolis/node/core/consensus/client",
"//metropolis/pkg/event",
"//metropolis/pkg/logtree",
"@io_etcd_go_etcd_api_v3//v3rpc/rpctypes",
diff --git a/metropolis/pkg/event/etcd/etcd.go b/metropolis/pkg/event/etcd/etcd.go
index 71e8056..afcea35 100644
--- a/metropolis/pkg/event/etcd/etcd.go
+++ b/metropolis/pkg/event/etcd/etcd.go
@@ -10,7 +10,6 @@
"github.com/cenkalti/backoff/v4"
clientv3 "go.etcd.io/etcd/client/v3"
- "source.monogon.dev/metropolis/node/core/consensus/client"
"source.monogon.dev/metropolis/pkg/event"
)
@@ -22,12 +21,19 @@
_ event.ValueWatch[StringAt] = &Value[StringAt]{}
)
+// ThinClient is a small wrapper interface to combine
+// clientv3.KV and clientv3.Watcher.
+type ThinClient interface {
+ clientv3.KV
+ clientv3.Watcher
+}
+
// Value is an 'Event Value' backed in an etcd cluster, accessed over an
// etcd client. This is a stateless handle and can be copied and shared across
// goroutines.
type Value[T any] struct {
decoder func(key, value []byte) (T, error)
- etcd client.Namespaced
+ etcd ThinClient
key string
keyEnd string
}
@@ -67,7 +73,7 @@
// NewValue creates a new Value for a given key(s) in an etcd client. The
// given decoder will be used to convert bytes retrieved from etcd into the
// interface{} value retrieved by Get by this value's watcher.
-func NewValue[T any](etcd client.Namespaced, key string, decoder func(key, value []byte) (T, error), options ...*Option) *Value[T] {
+func NewValue[T any](etcd ThinClient, key string, decoder func(key, value []byte) (T, error), options ...*Option) *Value[T] {
res := &Value[T]{
decoder: decoder,
etcd: etcd,
diff --git a/metropolis/pkg/event/etcd/etcd_test.go b/metropolis/pkg/event/etcd/etcd_test.go
index f1eb5ed..c6d50b1 100644
--- a/metropolis/pkg/event/etcd/etcd_test.go
+++ b/metropolis/pkg/event/etcd/etcd_test.go
@@ -20,7 +20,6 @@
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
- "source.monogon.dev/metropolis/node/core/consensus/client"
"source.monogon.dev/metropolis/pkg/event"
"source.monogon.dev/metropolis/pkg/logtree"
)
@@ -81,7 +80,6 @@
// testClient is an etcd connection to the test cluster.
type testClient struct {
client *clientv3.Client
- namespaced client.Namespaced
}
func newTestClient(t *testing.T) *testClient {
@@ -96,10 +94,8 @@
t.Fatalf("clientv3.New: %v", err)
}
- namespaced := client.NewLocal(cli)
return &testClient{
client: cli,
- namespaced: namespaced,
}
}
@@ -126,7 +122,7 @@
for {
ctxT, ctxC := context.WithTimeout(ctx, 100*time.Millisecond)
- _, err := d.namespaced.Put(ctxT, key, value)
+ _, err := d.client.Put(ctxT, key, value)
ctxC()
if err == nil {
return
@@ -154,7 +150,7 @@
ctx, ctxC := context.WithCancel(context.Background())
defer ctxC()
- _, err := d.namespaced.Delete(ctx, key)
+ _, err := d.client.Delete(ctx, key)
if err == nil {
return
}
@@ -227,7 +223,7 @@
defer tc.close()
k := "test-simple"
- value := NewValue(tc.namespaced, k, DecoderStringAt)
+ value := NewValue(tc.client, k, DecoderStringAt)
tc.put(t, k, "one")
watcher := value.Watch()
@@ -275,7 +271,7 @@
ks := "test-simple-range/"
ke := "test-simple-range0"
- value := NewValue(tc.namespaced, ks, DecoderStringAt, Range(ke))
+ value := NewValue(tc.client, ks, DecoderStringAt, Range(ke))
tc.put(t, ks+"a", "one")
tc.put(t, ks+"b", "two")
tc.put(t, ks+"c", "three")
@@ -318,7 +314,7 @@
defer tc.close()
k := "test-cancel"
- value := NewValue(tc.namespaced, k, DecoderStringAt)
+ value := NewValue(tc.client, k, DecoderStringAt)
tc.put(t, k, "one")
watcher := value.Watch()
@@ -353,7 +349,7 @@
defer tc.close()
k := "test-cancel-on-get"
- value := NewValue(tc.namespaced, k, DecoderStringAt)
+ value := NewValue(tc.client, k, DecoderStringAt)
watcher := value.Watch()
tc.put(t, k, "one")
@@ -417,7 +413,7 @@
tc.setEndpoints(0)
k := "test-client-reconnect"
- value := NewValue(tc.namespaced, k, DecoderStringAt)
+ value := NewValue(tc.client, k, DecoderStringAt)
tc.put(t, k, "one")
watcher := value.Watch()
@@ -453,10 +449,10 @@
tcRest.setEndpoints(1, 2)
k := "test-client-partition"
- valueOne := NewValue(tcOne.namespaced, k, DecoderStringAt)
+ valueOne := NewValue(tcOne.client, k, DecoderStringAt)
watcherOne := valueOne.Watch()
defer watcherOne.Close()
- valueRest := NewValue(tcRest.namespaced, k, DecoderStringAt)
+ valueRest := NewValue(tcRest.client, k, DecoderStringAt)
watcherRest := valueRest.Watch()
defer watcherRest.Close()
@@ -487,7 +483,7 @@
k := "test-early-use"
- value := NewValue(tc.namespaced, k, DecoderStringAt)
+ value := NewValue(tc.client, k, DecoderStringAt)
watcher := value.Watch()
defer watcher.Close()
@@ -513,7 +509,7 @@
k := "test-remove"
tc.put(t, k, "one")
- value := NewValue(tc.namespaced, k, DecoderStringAt)
+ value := NewValue(tc.client, k, DecoderStringAt)
watcher := value.Watch()
defer watcher.Close()
@@ -530,7 +526,7 @@
ks := "test-remove-range/"
ke := "test-remove-range0"
- value := NewValue(tc.namespaced, ks, DecoderStringAt, Range(ke))
+ value := NewValue(tc.client, ks, DecoderStringAt, Range(ke))
tc.put(t, ks+"a", "one")
tc.put(t, ks+"b", "two")
tc.put(t, ks+"c", "three")
@@ -571,7 +567,7 @@
tc.put(t, k, "one")
tc.remove(t, k)
- value := NewValue(tc.namespaced, k, DecoderStringAt)
+ value := NewValue(tc.client, k, DecoderStringAt)
watcher := value.Watch()
defer watcher.Close()
@@ -618,7 +614,7 @@
defer ctxC()
k := "test-decoder"
- value := NewValue(tc.namespaced, k, decoderDivisibleByThree)
+ value := NewValue(tc.client, k, decoderDivisibleByThree)
watcher := value.Watch()
defer watcher.Close()
tc.put(t, k, "3")
@@ -684,7 +680,7 @@
defer tc.close()
k := "test-backlog"
- value := NewValue(tc.namespaced, k, DecoderStringAt)
+ value := NewValue(tc.client, k, DecoderStringAt)
watcher := value.Watch()
defer watcher.Close()
@@ -716,7 +712,7 @@
ks := "test-backlog-range/"
ke := "test-backlog-range0"
- value := NewValue(tc.namespaced, ks, DecoderStringAt, Range(ke))
+ value := NewValue(tc.client, ks, DecoderStringAt, Range(ke))
w := value.Watch()
defer w.Close()
@@ -761,7 +757,7 @@
k := "test-backlog-only"
tc.put(t, k, "initial")
- value := NewValue(tc.namespaced, k, DecoderStringAt)
+ value := NewValue(tc.client, k, DecoderStringAt)
watcher := value.Watch()
defer watcher.Close()
@@ -818,7 +814,7 @@
}
}
- value := NewValue(tc.namespaced, ks, DecoderStringAt, Range(ke))
+ value := NewValue(tc.client, ks, DecoderStringAt, Range(ke))
w := value.Watch()
defer w.Close()
diff --git a/metropolis/pkg/pki/BUILD.bazel b/metropolis/pkg/pki/BUILD.bazel
index 97e7fb1..d122ef4 100644
--- a/metropolis/pkg/pki/BUILD.bazel
+++ b/metropolis/pkg/pki/BUILD.bazel
@@ -11,7 +11,6 @@
importpath = "source.monogon.dev/metropolis/pkg/pki",
visibility = ["//visibility:public"],
deps = [
- "//metropolis/node/core/consensus/client",
"//metropolis/pkg/event",
"//metropolis/pkg/event/etcd",
"//metropolis/pkg/fileargs",
@@ -27,7 +26,6 @@
],
embed = [":pki"],
deps = [
- "//metropolis/node/core/consensus/client",
"//metropolis/pkg/logtree",
"@io_etcd_go_etcd_client_pkg_v3//testutil",
"@io_etcd_go_etcd_tests_v3//integration",
diff --git a/metropolis/pkg/pki/crl.go b/metropolis/pkg/pki/crl.go
index 23838a1..f72fb2d 100644
--- a/metropolis/pkg/pki/crl.go
+++ b/metropolis/pkg/pki/crl.go
@@ -11,7 +11,6 @@
clientv3 "go.etcd.io/etcd/client/v3"
- "source.monogon.dev/metropolis/node/core/consensus/client"
"source.monogon.dev/metropolis/pkg/event"
"source.monogon.dev/metropolis/pkg/event/etcd"
)
@@ -145,7 +144,7 @@
// WatchCRL returns and Event Value compatible CRLWatcher which can be used to
// retrieve and watch for the newest CRL available from this CA certificate.
-func (c *Certificate) WatchCRL(cl client.Namespaced) event.Watcher[*CRL] {
+func (c *Certificate) WatchCRL(cl etcd.ThinClient) event.Watcher[*CRL] {
value := etcd.NewValue(cl, c.crlPath(), func(_, data []byte) (*CRL, error) {
crl, err := x509.ParseCRL(data)
if err != nil {
diff --git a/metropolis/pkg/pki/crl_test.go b/metropolis/pkg/pki/crl_test.go
index 8f9cf5d..e47eab9 100644
--- a/metropolis/pkg/pki/crl_test.go
+++ b/metropolis/pkg/pki/crl_test.go
@@ -7,8 +7,6 @@
"go.etcd.io/etcd/client/pkg/v3/testutil"
"go.etcd.io/etcd/tests/v3/integration"
-
- "source.monogon.dev/metropolis/node/core/consensus/client"
)
// TestRevoke exercises the CRL revocation and watching functionality of a CA
@@ -19,7 +17,7 @@
cluster := integration.NewClusterV3(tb, &integration.ClusterConfig{
Size: 1,
})
- cl := client.NewLocal(cluster.Client(0))
+ cl := cluster.Client(0)
defer cluster.Terminate(tb)
ctx, ctxC := context.WithCancel(context.Background())
defer ctxC()