m/pkg/event/etcd: implement ranged watchers

This adds a new mode of operation to etcd Values/Watchers in which a
range of etcd keys is watched for updates instead of a single key.

This allows the implementation of watching a collection of objects
stored in etcd for updates, e.g. the node state in the Curator.

This has been implemented within the existing API of Event Values, which
is likely the biggest point of contention of this change. An alternative
would be to design a separate API for multi-value use, but this should
allow us to more easily integrate with the existing code. We make use of
Go's options-as-varargs paradigm to not break any existing use of this
codebase.

Some behaviour of the Get() operation in a ranged context is left
underspecified, but none of the anticipated users of this codebase are
expected to depend on it. Once the dust settles a bit, we can attempt
to formalize this more strongly.

Change-Id: I8f84d74332765e52b9bbec04b626d00f05c23071
Reviewed-on: https://review.monogon.dev/c/monogon/+/419
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/pkg/event/etcd/etcd_test.go b/metropolis/pkg/event/etcd/etcd_test.go
index 79d9875..4b620c0 100644
--- a/metropolis/pkg/event/etcd/etcd_test.go
+++ b/metropolis/pkg/event/etcd/etcd_test.go
@@ -233,6 +233,81 @@
 	cancel()
 }
 
+// stringAt is a helper type for testing ranged watchers. It's returned by a
+// watcher whose decoder is set to stringAtDecoder.
+type stringAt struct {
+	key, value string
+}
+
+func stringAtDecoder(key, value []byte) (interface{}, error) {
+	valueS := ""
+	if value != nil {
+		valueS = string(value)
+	}
+	return stringAt{
+		key:   string(key),
+		value: valueS,
+	}, nil
+}
+
+// stringAtGet performs a Get from a Watcher, expecting a stringAt and updating
+// the given map with the retrieved value.
+func stringAtGet(ctx context.Context, t *testing.T, w event.Watcher, m map[string]string) {
+	t.Helper()
+
+	vr, err := w.Get(ctx)
+	if err != nil {
+		t.Fatalf("Get: %v", err)
+	}
+	v := vr.(stringAt)
+	m[v.key] = v.value
+}
+
+// TestSimpleRange exercises the simplest behaviour of a ranged watcher,
+// retrieving updates via Get in a fully blocking fashion.
+func TestSimpleRange(t *testing.T) {
+	tc := newTestClient(t)
+	defer tc.close()
+
+	ks := "test-simple-range/"
+	ke := "test-simple-range0"
+	value := NewValue(tc.namespaced, ks, stringAtDecoder, Range(ke))
+	tc.put(t, ks+"a", "one")
+	tc.put(t, ks+"b", "two")
+	tc.put(t, ks+"c", "three")
+	tc.put(t, ks+"b", "four")
+
+	w := value.Watch()
+	defer w.Close()
+
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+
+	res := make(map[string]string)
+	stringAtGet(ctx, t, w, res)
+	stringAtGet(ctx, t, w, res)
+	stringAtGet(ctx, t, w, res)
+
+	tc.put(t, ks+"a", "five")
+	tc.put(t, ks+"e", "six")
+
+	stringAtGet(ctx, t, w, res)
+	stringAtGet(ctx, t, w, res)
+
+	for _, te := range []struct {
+		k, w string
+	}{
+		{ks + "a", "five"},
+		{ks + "b", "four"},
+		{ks + "c", "three"},
+		{ks + "e", "six"},
+	} {
+		if want, got := te.w, res[te.k]; want != got {
+			t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got)
+		}
+	}
+}
+
 // TestCancel ensures that watchers can resume after being canceled.
 func TestCancel(t *testing.T) {
 	tc := newTestClient(t)
@@ -433,6 +508,44 @@
 	expect(t, watcher, "")
 }
 
+// TestRemoveRange exercises the behaviour of a Get on a ranged watcher when a
+// value is removed.
+func TestRemoveRange(t *testing.T) {
+	tc := newTestClient(t)
+	defer tc.close()
+
+	ks := "test-remove-range/"
+	ke := "test-remove-range0"
+	value := NewValue(tc.namespaced, ks, stringAtDecoder, Range(ke))
+	tc.put(t, ks+"a", "one")
+	tc.put(t, ks+"b", "two")
+	tc.put(t, ks+"c", "three")
+	tc.put(t, ks+"b", "four")
+	tc.remove(t, ks+"c")
+
+	w := value.Watch()
+	defer w.Close()
+
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+
+	res := make(map[string]string)
+	stringAtGet(ctx, t, w, res)
+	stringAtGet(ctx, t, w, res)
+
+	for _, te := range []struct {
+		k, w string
+	}{
+		{ks + "a", "one"},
+		{ks + "b", "four"},
+		{ks + "c", ""},
+	} {
+		if want, got := te.w, res[te.k]; want != got {
+			t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got)
+		}
+	}
+}
+
 // TestEmptyRace forces the watcher to retrieve an empty value from the K/V
 // store at first, and establishing the watch channel after a new value has
 // been stored in the same place.
@@ -473,7 +586,7 @@
 // on Get, but that the watcher continues to work after the error has been
 // returned.
 func TestDecoder(t *testing.T) {
-	decodeStringifiedNumbersDivisibleBy3 := func(data []byte) (interface{}, error) {
+	decodeStringifiedNumbersDivisibleBy3 := func(_, data []byte) (interface{}, error) {
 		num, err := strconv.ParseInt(string(data), 10, 64)
 		if err != nil {
 			return nil, fmt.Errorf("not a valid number")
@@ -582,3 +695,165 @@
 		}
 	}
 }
+
+// TestBacklogRange ensures that the ranged etcd watcher can handle a large
+// backlog of changes in etcd that the client didn't keep up with.
+func TestBacklogRange(t *testing.T) {
+	tc := newTestClient(t)
+	defer tc.close()
+
+	ks := "test-backlog-range/"
+	ke := "test-backlog-range0"
+	value := NewValue(tc.namespaced, ks, stringAtDecoder, Range(ke))
+	w := value.Watch()
+	defer w.Close()
+
+	for i := 0; i < 100; i++ {
+		if i%2 == 0 {
+			tc.put(t, ks+"a", fmt.Sprintf("val-%d", i))
+		} else {
+			tc.put(t, ks+"b", fmt.Sprintf("val-%d", i))
+		}
+	}
+
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+
+	res := make(map[string]string)
+	stringAtGet(ctx, t, w, res)
+	stringAtGet(ctx, t, w, res)
+
+	for _, te := range []struct {
+		k, w string
+	}{
+		{ks + "a", "val-98"},
+		{ks + "b", "val-99"},
+	} {
+		if want, got := te.w, res[te.k]; want != got {
+			t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got)
+		}
+	}
+}
+
+// TestBacklogOnly exercises the BacklogOnly option for non-ranged watchers,
+// which effectively makes any Get operation non-blocking (but also showcases
+// that unless a Get without BacklogOnly is issued, no new data will appear by
+// itself in the watcher - which is an undocumented implementation detail of the
+// option).
+func TestBacklogOnly(t *testing.T) {
+	tc := newTestClient(t)
+	defer tc.close()
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+
+	k := "test-backlog-only"
+	tc.put(t, k, "initial")
+
+	value := NewValue(tc.namespaced, k, NoDecoder)
+	watcher := value.Watch()
+	defer watcher.Close()
+
+	d, err := watcher.Get(ctx, BacklogOnly)
+	if err != nil {
+		t.Fatalf("First Get failed: %v", err)
+	}
+	if want, got := "initial", string(d.([]byte)); want != got {
+		t.Fatalf("First Get: wanted value %q, got %q", want, got)
+	}
+
+	// As expected, next call to Get with BacklogOnly fails - there truly are no
+	// new updates to emit.
+	_, err = watcher.Get(ctx, BacklogOnly)
+	if want, got := BacklogDone, err; want != got {
+		t.Fatalf("Second Get: wanted %v, got %v", want, got)
+	}
+
+	// Implementation detail: even though there is a new value ('second'),
+	// BacklogOnly will still return BacklogDone.
+	tc.put(t, k, "second")
+	_, err = watcher.Get(ctx, BacklogOnly)
+	if want, got := BacklogDone, err; want != got {
+		t.Fatalf("Third Get: wanted %v, got %v", want, got)
+	}
+
+	// ... However, a Get without BacklogOnly will return the new value.
+	d, err = watcher.Get(ctx)
+	if err != nil {
+		t.Fatalf("Fourth Get failed: %v", err)
+	}
+	if want, got := "second", string(d.([]byte)); want != got {
+		t.Fatalf("Fourth Get: wanted value %q, got %q", want, got)
+	}
+}
+
+// TestBacklogOnlyRange exercises the BacklogOnly option for ranged watchers,
+// showcasing how it is expected to be used for keeping up with the external
+// state of a range by synchronizing to a local map.
+func TestBacklogOnlyRange(t *testing.T) {
+	tc := newTestClient(t)
+	defer tc.close()
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+
+	ks := "test-backlog-only-range/"
+	ke := "test-backlog-only-range0"
+
+	for i := 0; i < 100; i++ {
+		if i%2 == 0 {
+			tc.put(t, ks+"a", fmt.Sprintf("val-%d", i))
+		} else {
+			tc.put(t, ks+"b", fmt.Sprintf("val-%d", i))
+		}
+	}
+
+	value := NewValue(tc.namespaced, ks, stringAtDecoder, Range(ke))
+	w := value.Watch()
+	defer w.Close()
+
+	// Collect results into a map from key to value.
+	res := make(map[string]string)
+
+	// Run first Get - this is the barrier defining what's part of the backlog.
+	g, err := w.Get(ctx, BacklogOnly)
+	if err != nil {
+		t.Fatalf("Get: %v", err)
+	}
+	kv := g.(stringAt)
+	res[kv.key] = kv.value
+
+	// These won't be part of the backlog.
+	tc.put(t, ks+"a", fmt.Sprintf("val-100"))
+	tc.put(t, ks+"b", fmt.Sprintf("val-101"))
+
+	// Retrieve the rest of the backlog until BacklogDone is returned.
+	nUpdates := 1
+	for {
+		g, err := w.Get(ctx, BacklogOnly)
+		if err == BacklogDone {
+			break
+		}
+		if err != nil {
+			t.Fatalf("Get: %v", err)
+		}
+		nUpdates += 1
+		kv := g.(stringAt)
+		res[kv.key] = kv.value
+	}
+
+	// The backlog should've been compacted to just two entries at their newest
+	// state.
+	if want, got := 2, nUpdates; want != got {
+		t.Fatalf("wanted backlog in %d updates, got it in %d", want, got)
+	}
+
+	for _, te := range []struct {
+		k, w string
+	}{
+		{ks + "a", "val-98"},
+		{ks + "b", "val-99"},
+	} {
+		if want, got := te.w, res[te.k]; want != got {
+			t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got)
+		}
+	}
+}