m/pkg/event: make type-safe
This is a fairly large change which makes use of Go type parameters
(“generics”) to make the event library (and its memory/etcd
implementations) type safe.
Since we now have the event.Value interface strongly typed, we also move
options which were implementation-specific (like BacklogOnly)
to be part of that interface, instead of the previously type-asserted
specific implementations. Use of options that are not handled by a
particular implementation is a runtime error. Expressing this in the
type system is probably not worth the effort.
We also implement Filter to allow offloading some of the functionality previously implemented in type assertion wrappers into the library itself.
In the end, this ends up removing a bunch of type assertion code, at
the cost of a fairly sweeping change. Unfortunately, some of this is due
to IntelliJ suddenly deciding to reformat comments.
Change-Id: I1ca6d93db1b5c4055a21af3fb9e5e3d425c0d86e
Reviewed-on: https://review.monogon.dev/c/monogon/+/1322
Tested-by: Jenkins CI
Reviewed-by: Leopold Schabel <leo@monogon.tech>
diff --git a/metropolis/pkg/event/etcd/etcd_test.go b/metropolis/pkg/event/etcd/etcd_test.go
index 13f6ea8..81aee51 100644
--- a/metropolis/pkg/event/etcd/etcd_test.go
+++ b/metropolis/pkg/event/etcd/etcd_test.go
@@ -52,18 +52,18 @@
// WG after it performs the initial retrieval of a value from etcd, but before
// it starts the watcher. This is used to test potential race conditions
// present between these two steps.
-func setRaceWg(w event.Watcher) *sync.WaitGroup {
+func setRaceWg[T any](w event.Watcher[T]) *sync.WaitGroup {
wg := sync.WaitGroup{}
- w.(*watcher).testRaceWG = &wg
+ w.(*watcher[T]).testRaceWG = &wg
return &wg
}
// setSetupWg creates a new WaitGroup and sets the given watcher to wait on
// thie WG after an etcd watch channel is created. This is used in tests to
// ensure that the watcher is fully created before it is tested.
-func setSetupWg(w event.Watcher) *sync.WaitGroup {
+func setSetupWg[T any](w event.Watcher[T]) *sync.WaitGroup {
wg := sync.WaitGroup{}
- w.(*watcher).testSetupWG = &wg
+ w.(*watcher[T]).testSetupWG = &wg
return &wg
}
@@ -152,7 +152,7 @@
// expect runs a Get on the given Watcher, ensuring the returned value is a
// given string.
-func expect(t *testing.T, w event.Watcher, value string) {
+func expect(t *testing.T, w event.Watcher[StringAt], value string) {
t.Helper()
ctx, ctxC := context.WithCancel(context.Background())
defer ctxC()
@@ -162,7 +162,7 @@
t.Fatalf("Get: %v", err)
}
- if got, want := string(got.([]byte)), value; got != want {
+ if got, want := got.Value, value; got != want {
t.Errorf("Wanted value %q, got %q", want, got)
}
}
@@ -173,7 +173,7 @@
// blocks for 101 milliseconds). Thus, this function should be used sparingly
// and in tests that perform other baseline behaviour checks alongside this
// test.
-func expectTimeout(t *testing.T, w event.Watcher) {
+func expectTimeout[T any](t *testing.T, w event.Watcher[T]) {
t.Helper()
ctx, ctxC := context.WithTimeout(context.Background(), 100*time.Millisecond)
got, err := w.Get(ctx)
@@ -186,7 +186,7 @@
// wait wraps a watcher into a channel of strings, ensuring that the watcher
// never errors on Get calls and always returns strings.
-func wait(t *testing.T, w event.Watcher) (chan string, func()) {
+func wait(t *testing.T, w event.Watcher[StringAt]) (chan string, func()) {
t.Helper()
ctx, ctxC := context.WithCancel(context.Background())
@@ -201,7 +201,7 @@
if err != nil {
t.Fatalf("Get: %v", err)
}
- c <- string(got.([]byte))
+ c <- got.Value
}
}()
@@ -214,7 +214,7 @@
defer tc.close()
k := "test-simple"
- value := NewValue(tc.namespaced, k, NoDecoder)
+ value := NewValue(tc.namespaced, k, DecoderStringAt)
tc.put(t, k, "one")
watcher := value.Watch()
@@ -231,42 +231,27 @@
q, cancel := wait(t, watcher)
// Test will hang here if the above value does not receive the set "six".
+ log.Printf("a")
for el := range q {
+ log.Printf("%q", el)
if el == "six" {
break
}
}
+ log.Printf("b")
cancel()
}
-// stringAt is a helper type for testing ranged watchers. It's returned by a
-// watcher whose decoder is set to stringDecoder.
-type stringAt struct {
- key, value string
-}
-
-func stringAtDecoder(key, value []byte) (interface{}, error) {
- valueS := ""
- if value != nil {
- valueS = string(value)
- }
- return stringAt{
- key: string(key),
- value: valueS,
- }, nil
-}
-
// stringAtGet performs a Get from a Watcher, expecting a stringAt and updating
// the given map with the retrieved value.
-func stringAtGet(ctx context.Context, t *testing.T, w event.Watcher, m map[string]string) {
+func stringAtGet(ctx context.Context, t *testing.T, w event.Watcher[StringAt], m map[string]string) {
t.Helper()
vr, err := w.Get(ctx)
if err != nil {
t.Fatalf("Get: %v", err)
}
- v := vr.(stringAt)
- m[v.key] = v.value
+ m[vr.Key] = vr.Value
}
// TestSimpleRange exercises the simplest behaviour of a ranged watcher,
@@ -277,7 +262,7 @@
ks := "test-simple-range/"
ke := "test-simple-range0"
- value := NewValue(tc.namespaced, ks, stringAtDecoder, Range(ke))
+ value := NewValue(tc.namespaced, ks, DecoderStringAt, Range(ke))
tc.put(t, ks+"a", "one")
tc.put(t, ks+"b", "two")
tc.put(t, ks+"c", "three")
@@ -320,7 +305,7 @@
defer tc.close()
k := "test-cancel"
- value := NewValue(tc.namespaced, k, NoDecoder)
+ value := NewValue(tc.namespaced, k, DecoderStringAt)
tc.put(t, k, "one")
watcher := value.Watch()
@@ -355,7 +340,7 @@
defer tc.close()
k := "test-cancel-on-get"
- value := NewValue(tc.namespaced, k, NoDecoder)
+ value := NewValue(tc.namespaced, k, DecoderStringAt)
watcher := value.Watch()
tc.put(t, k, "one")
@@ -419,7 +404,7 @@
tc.setEndpoints(0)
k := "test-client-reconnect"
- value := NewValue(tc.namespaced, k, NoDecoder)
+ value := NewValue(tc.namespaced, k, DecoderStringAt)
tc.put(t, k, "one")
watcher := value.Watch()
@@ -455,10 +440,10 @@
tcRest.setEndpoints(1, 2)
k := "test-client-partition"
- valueOne := NewValue(tcOne.namespaced, k, NoDecoder)
+ valueOne := NewValue(tcOne.namespaced, k, DecoderStringAt)
watcherOne := valueOne.Watch()
defer watcherOne.Close()
- valueRest := NewValue(tcRest.namespaced, k, NoDecoder)
+ valueRest := NewValue(tcRest.namespaced, k, DecoderStringAt)
watcherRest := valueRest.Watch()
defer watcherRest.Close()
@@ -489,7 +474,7 @@
k := "test-early-use"
- value := NewValue(tc.namespaced, k, NoDecoder)
+ value := NewValue(tc.namespaced, k, DecoderStringAt)
watcher := value.Watch()
defer watcher.Close()
@@ -515,7 +500,7 @@
k := "test-remove"
tc.put(t, k, "one")
- value := NewValue(tc.namespaced, k, NoDecoder)
+ value := NewValue(tc.namespaced, k, DecoderStringAt)
watcher := value.Watch()
defer watcher.Close()
@@ -532,7 +517,7 @@
ks := "test-remove-range/"
ke := "test-remove-range0"
- value := NewValue(tc.namespaced, ks, stringAtDecoder, Range(ke))
+ value := NewValue(tc.namespaced, ks, DecoderStringAt, Range(ke))
tc.put(t, ks+"a", "one")
tc.put(t, ks+"b", "two")
tc.put(t, ks+"c", "three")
@@ -573,7 +558,7 @@
tc.put(t, k, "one")
tc.remove(t, k)
- value := NewValue(tc.namespaced, k, NoDecoder)
+ value := NewValue(tc.namespaced, k, DecoderStringAt)
watcher := value.Watch()
defer watcher.Close()
@@ -602,13 +587,13 @@
// on Get, but that the watcher continues to work after the error has been
// returned.
func TestDecoder(t *testing.T) {
- decodeStringifiedNumbersDivisibleBy3 := func(_, data []byte) (interface{}, error) {
- num, err := strconv.ParseInt(string(data), 10, 64)
+ decoderDivisibleByThree := func(_, value []byte) (int64, error) {
+ num, err := strconv.ParseInt(string(value), 10, 64)
if err != nil {
- return nil, fmt.Errorf("not a valid number")
+ return 0, fmt.Errorf("not a valid number")
}
if (num % 3) != 0 {
- return nil, fmt.Errorf("not divisible by 3")
+ return 0, fmt.Errorf("not divisible by 3")
}
return num, nil
}
@@ -620,7 +605,7 @@
defer ctxC()
k := "test-decoder"
- value := NewValue(tc.namespaced, k, decodeStringifiedNumbersDivisibleBy3)
+ value := NewValue(tc.namespaced, k, decoderDivisibleByThree)
watcher := value.Watch()
defer watcher.Close()
tc.put(t, k, "3")
@@ -643,7 +628,7 @@
}
} else {
queue <- errOrInt{
- val: res.(int64),
+ val: res,
}
}
}
@@ -686,7 +671,7 @@
defer tc.close()
k := "test-backlog"
- value := NewValue(tc.namespaced, k, NoDecoder)
+ value := NewValue(tc.namespaced, k, DecoderStringAt)
watcher := value.Watch()
defer watcher.Close()
@@ -704,8 +689,7 @@
if err != nil {
t.Fatalf("Get() returned error before expected final value: %v", err)
}
- val := string(valB.([]byte))
- if val == "val-999" {
+ if valB.Value == "val-999" {
break
}
}
@@ -719,7 +703,7 @@
ks := "test-backlog-range/"
ke := "test-backlog-range0"
- value := NewValue(tc.namespaced, ks, stringAtDecoder, Range(ke))
+ value := NewValue(tc.namespaced, ks, DecoderStringAt, Range(ke))
w := value.Watch()
defer w.Close()
@@ -764,30 +748,30 @@
k := "test-backlog-only"
tc.put(t, k, "initial")
- value := NewValue(tc.namespaced, k, NoDecoder)
+ value := NewValue(tc.namespaced, k, DecoderStringAt)
watcher := value.Watch()
defer watcher.Close()
- d, err := watcher.Get(ctx, BacklogOnly)
+ d, err := watcher.Get(ctx, event.BacklogOnly[StringAt]())
if err != nil {
t.Fatalf("First Get failed: %v", err)
}
- if want, got := "initial", string(d.([]byte)); want != got {
+ if want, got := "initial", d.Value; want != got {
t.Fatalf("First Get: wanted value %q, got %q", want, got)
}
// As expected, next call to Get with BacklogOnly fails - there truly is no new
// updates to emit.
- _, err = watcher.Get(ctx, BacklogOnly)
- if want, got := BacklogDone, err; want != got {
+ _, err = watcher.Get(ctx, event.BacklogOnly[StringAt]())
+ if want, got := event.BacklogDone, err; want != got {
t.Fatalf("Second Get: wanted %v, got %v", want, got)
}
// Implementation detail: even though there is a new value ('second'),
// BacklogOnly will still return BacklogDone.
tc.put(t, k, "second")
- _, err = watcher.Get(ctx, BacklogOnly)
- if want, got := BacklogDone, err; want != got {
+ _, err = watcher.Get(ctx, event.BacklogOnly[StringAt]())
+ if want, got := event.BacklogDone, err; want != got {
t.Fatalf("Third Get: wanted %v, got %v", want, got)
}
@@ -796,7 +780,7 @@
if err != nil {
t.Fatalf("Fourth Get failed: %v", err)
}
- if want, got := "second", string(d.([]byte)); want != got {
+ if want, got := "second", d.Value; want != got {
t.Fatalf("Fourth Get: wanted value %q, got %q", want, got)
}
}
@@ -821,7 +805,7 @@
}
}
- value := NewValue(tc.namespaced, ks, stringAtDecoder, Range(ke))
+ value := NewValue(tc.namespaced, ks, DecoderStringAt, Range(ke))
w := value.Watch()
defer w.Close()
@@ -829,12 +813,11 @@
res := make(map[string]string)
// Run first Get - this is the barrier defining what's part of the backlog.
- g, err := w.Get(ctx, BacklogOnly)
+ g, err := w.Get(ctx, event.BacklogOnly[StringAt]())
if err != nil {
t.Fatalf("Get: %v", err)
}
- kv := g.(stringAt)
- res[kv.key] = kv.value
+ res[g.Key] = g.Value
// These won't be part of the backlog.
tc.put(t, ks+"a", fmt.Sprintf("val-100"))
@@ -843,16 +826,15 @@
// Retrieve the rest of the backlog until BacklogDone is returned.
nUpdates := 1
for {
- g, err := w.Get(ctx, BacklogOnly)
- if err == BacklogDone {
+ g, err := w.Get(ctx, event.BacklogOnly[StringAt]())
+ if err == event.BacklogDone {
break
}
if err != nil {
t.Fatalf("Get: %v", err)
}
nUpdates += 1
- kv := g.(stringAt)
- res[kv.key] = kv.value
+ res[g.Key] = g.Value
}
// The backlog should've been compacted to just two entries at their newest