m/pkg/event/etcd: implement ranged watchers
This adds a new mode of operation to etcd Values/Watchers in which a
range of etcd keys is watched for updates instead of a single key.
This allows the implementation of watching a collection of objects
stored in etcd for updates, eg. the node state in the Curator.
This has been implemented within the existing API of Event Values, which
is likely the biggest contention point of this change. An alternative
would be to design a separate API for multi-value use, but this should
allow us to more easily integrate with the existing code. We make use of
Go's options-as-varargs paradigm to not break any existing use of this
codebase.
Some behaviour of the Get() operation in ranged context is left
underdefined, but none of the anticipated users of this codebase are
expected to depend on this. Once the dust settles a bit, we can attempt
to formalize this more strongly.
Change-Id: I8f84d74332765e52b9bbec04b626d00f05c23071
Reviewed-on: https://review.monogon.dev/c/monogon/+/419
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/pkg/event/etcd/etcd.go b/metropolis/pkg/event/etcd/etcd.go
index ff40bcd..7b914f7 100644
--- a/metropolis/pkg/event/etcd/etcd.go
+++ b/metropolis/pkg/event/etcd/etcd.go
@@ -27,30 +27,76 @@
type Value struct {
etcd client.Namespaced
key string
+ keyEnd string
decoder BytesDecoder
}
-// NewValue creates a new Value for a given key in an etcd client. The
-// given decoder will be used to convert bytes retrieved from etcd into the
-// interface{} value retrieved by Get by this value's watcher.
-func NewValue(etcd client.Namespaced, key string, decoder BytesDecoder) *Value {
- return &Value{
- etcd: etcd,
- key: key,
- decoder: decoder,
+type Option struct {
+ rangeEnd string
+}
+
+// Range creates a Value that is backed by a range of etcd key/value pairs from
+// 'key' passed to NewValue to 'end' passed to Range.
+//
+// The key range semantics (ie. lexicographic ordering) are the same as in etcd
+// ranges, so for example to retrieve all keys prefixed by `foo/` key should be
+// `foo/` and end should be `foo0`.
+//
+// For any update in the given range, the decoder will be called and its result
+// will trigger the return of a Get() call. The decoder should return a type
+// that lets the user distinguish which of the multiple objects in the range got
+// updated, as the Get() call returns no additional information about the
+// location of the retrieved object by itself.
+//
+// The order of values retrieved by Get() is currently fully arbitrary and must
+// not be relied on. It's possible that in the future the order of updates and
+// the blocking behaviour of Get will be formalized, but this is not yet the
+// case. Instead, the data returned should be treated as eventually consistent
+// with the etcd state.
+//
+// For some uses, it might be necessary to first retrieve all the objects
+// contained within the range before starting to block on updates - in this
+// case, the BacklogOnly option should be used when calling Get.
+func Range(end string) *Option {
+ return &Option{
+ rangeEnd: end,
}
}
+// NewValue creates a new Value for a given key(s) in an etcd client. The
+// given decoder will be used to convert bytes retrieved from etcd into the
+// interface{} value retrieved by Get by this value's watcher.
+func NewValue(etcd client.Namespaced, key string, decoder BytesDecoder, options ...*Option) *Value {
+ res := &Value{
+ etcd: etcd,
+ key: key,
+ keyEnd: key,
+ decoder: decoder,
+ }
+
+ for _, opt := range options {
+ if end := opt.rangeEnd; end != "" {
+ res.keyEnd = end
+ }
+ }
+
+ return res
+}
+
// BytesDecoder is a function that converts bytes retrieved from etcd into an
-// end-user facing value. If an error is returned, the Get call performed on a
-// watcher configured with this decoder will fail, swallowing that particular
-// update, but the watcher will continue to work.
-// Any provided BytesDecoder implementations must be safe to copy.
-type BytesDecoder = func(data []byte) (interface{}, error)
+// end-user facing value. Additionally, a key is available so that returned
+// values can be augmented with the location they were retrieved from. This is
+// especially useful when returning values resulting from an etcd range.
+//
+// If an error is returned, the Get call performed on a watcher configured with
+// this decoder will fail, swallowing that particular update, but the watcher
+// will continue to work. Any provided BytesDecoder implementations must be safe
+// to copy.
+type BytesDecoder = func(key []byte, data []byte) (interface{}, error)
// NoDecoder is a no-op decoder which passes through the retrieved bytes as a
// []byte type to the user.
-func NoDecoder(data []byte) (interface{}, error) {
+func NoDecoder(key []byte, data []byte) (interface{}, error) {
return data, nil
}
@@ -66,6 +112,13 @@
}
}
+// keyValue is an intermediate type used to keep etcd values in the watcher's
+// backlog.
+type keyValue struct {
+ key []byte
+ value []byte
+}
+
type watcher struct {
// Value copy, used to configure the behaviour of this watcher.
Value
@@ -79,11 +132,15 @@
// error if concurrent access is attempted.
getSem chan struct{}
- // backlogged is the value retrieved by an initial KV Get from etcd that
- // should be returned at the next opportunity, or nil if there isn't any.
- backlogged *[]byte
- // prev is the revision of a previously retrieved value within this
- // watcher, or nil if there hasn't been any.
+ // backlogged is a list of (key, value) pairs retrieved from etcd but not yet
+ // returned via Get. These items are not a replay of all the updates from etcd,
+ // but are already compacted to deduplicate updates to the same object (ie., if
+ // the update stream from etcd is for keys A, B, and A, the backlogged list will
+ // only contain one update for A and B each, with the first update for A being
+ // discarded upon arrival of the second update).
+ backlogged []*keyValue
+ // prev is the etcd store revision of a previously completed etcd Get/Watch
+ // call, used to resume a Watch call in case of failures.
prev *int64
// wc is the etcd watch channel, or nil if no channel is yet open.
wc clientv3.WatchChan
@@ -98,34 +155,49 @@
testSetupWG *sync.WaitGroup
}
+// setup initiates wc (the watch channel from etcd) after retrieving the initial
+// value(s) with a get operation.
func (w *watcher) setup(ctx context.Context) error {
if w.wc != nil {
return nil
}
+ ranged := w.key != w.keyEnd
- // First, check if some data under this key already exists.
+ // First, check if some data under this key/range already exists.
+
// We use an exponential backoff and retry here as the initial Get can fail
// if the cluster is unstable (eg. failing over). We only fail the retry if
// the context expires.
bo := backoff.NewExponentialBackOff()
bo.MaxElapsedTime = 0
+
err := backoff.Retry(func() error {
- get, err := w.etcd.Get(ctx, w.key)
+
+ var getOpts []clientv3.OpOption
+ if ranged {
+ getOpts = append(getOpts, clientv3.WithRange(w.keyEnd))
+ }
+ get, err := w.etcd.Get(ctx, w.key, getOpts...)
if err != nil {
return fmt.Errorf("when retrieving initial value: %w", err)
}
+
+ // Assert that the etcd API is behaving as expected.
+ if !ranged && len(get.Kvs) > 1 {
+ panic("More than one key returned in unary GET response")
+ }
+
+ // After a successful Get, save the revision to watch from and re-build the
+ // backlog from scratch based on what was available in the etcd store at that
+ // time.
w.prev = &get.Header.Revision
- if len(get.Kvs) != 0 {
- // Assert that the etcd API is behaving as expected.
- if len(get.Kvs) > 1 {
- panic("More than one key returned in unary GET response")
- }
- // If an existing value is present, backlog it and set the prev value
- // accordingly.
- kv := get.Kvs[0]
- w.backlogged = &kv.Value
- } else {
- w.backlogged = nil
+
+ w.backlogged = nil
+ for _, kv := range get.Kvs {
+ w.backlogged = append(w.backlogged, &keyValue{
+ key: kv.Key,
+ value: kv.Value,
+ })
}
return nil
@@ -138,10 +210,11 @@
return err
}
- var watchOpts []clientv3.OpOption
- if w.prev != nil {
- watchOpts = append(watchOpts, clientv3.WithRev(*w.prev+1))
- } else {
+ watchOpts := []clientv3.OpOption{
+ clientv3.WithRev(*w.prev + 1),
+ }
+ if ranged {
+ watchOpts = append(watchOpts, clientv3.WithRange(w.keyEnd))
}
w.wc = w.etcd.Watch(w.ctx, w.key, watchOpts...)
@@ -151,23 +224,17 @@
return nil
}
-func (w *watcher) get(ctx context.Context) ([]byte, error) {
- // Return backlogged value, if present.
- if w.backlogged != nil {
- value := *w.backlogged
- w.backlogged = nil
- return value, nil
- }
-
- // Keep watching for a watch event.
- var event *clientv3.Event
+// backfill blocks until a backlog of items is available. An error is returned
+// if the context is canceled.
+func (w *watcher) backfill(ctx context.Context) error {
+ // Keep watching for watch events.
for {
var resp *clientv3.WatchResponse
select {
case r := <-w.wc:
resp = &r
case <-ctx.Done():
- return nil, ctx.Err()
+ return ctx.Err()
}
if resp.Canceled {
@@ -175,52 +242,128 @@
// cancellations. Any other error is something we need to handle,
// eg. a client close or compaction error.
if errors.Is(resp.Err(), ctx.Err()) {
- return nil, fmt.Errorf("watch canceled: %w", resp.Err())
+ return fmt.Errorf("watch canceled: %w", resp.Err())
}
// Attempt to reconnect.
- w.wc = nil
- w.setup(ctx)
+ if w.wc != nil {
+ // If a wc already exists, close it. This forces a reconnection
+ // by the next setup call.
+ w.ctxC()
+ w.ctx, w.ctxC = context.WithCancel(context.Background())
+ w.wc = nil
+ }
+ if err := w.setup(ctx); err != nil {
+ return fmt.Errorf("failed to setup watcher: %w", err)
+ }
continue
}
- if len(resp.Events) < 1 {
+ w.prev = &resp.Header.Revision
+ // Spurious watch event with no update? Keep trying.
+ if len(resp.Events) == 0 {
continue
}
- event = resp.Events[len(resp.Events)-1]
- break
- }
+ // Process updates into compacted list, transforming deletions into value: nil
+ // keyValues. lastUpdate maps an etcd key into a pointer in the already existing
+ // backlog list. It will then be used to compact all updates into the smallest
+ // backlog possible (by overriding previously backlogged items for a key if this
+ // key is encountered again).
+ //
+ // TODO(q3k): this could be stored in the watcher state to not waste time on
+ // each update, but it's good enough for now.
+ lastUpdate := make(map[string]*keyValue)
+ for _, kv := range w.backlogged {
+ kv := kv
+ lastUpdate[string(kv.key)] = kv
+ }
+ for _, ev := range resp.Events {
+ var value []byte
+ switch ev.Type {
+ case clientv3.EventTypeDelete:
+ case clientv3.EventTypePut:
+ value = ev.Kv.Value
+ default:
+ return fmt.Errorf("invalid event type %v", ev.Type)
+ }
- w.prev = &event.Kv.ModRevision
- // Return deletions as nil, and new values as their content.
- switch event.Type {
- case clientv3.EventTypeDelete:
- return nil, nil
- case clientv3.EventTypePut:
- return event.Kv.Value, nil
- default:
- return nil, fmt.Errorf("invalid event type %v", event.Type)
+ keyS := string(ev.Kv.Key)
+ prev := lastUpdate[keyS]
+ if prev == nil {
+ kv := &keyValue{
+ key: ev.Kv.Key,
+ }
+ w.backlogged = append(w.backlogged, kv)
+ prev = kv
+ }
+ prev.value = value
+ }
+
+ // Still nothing in backlog? Keep trying.
+ if len(w.backlogged) == 0 {
+ continue
+ }
+
+ return nil
}
}
+type GetOption struct {
+ backlogOnly bool
+}
+
+var (
+ // BacklogOnly will prevent Get from blocking on waiting for more updates from
+ // etcd, by instead returning BacklogDone whenever no more data is currently
+ // locally available. This is different however, from establishing that there
+ // are no more pending updates from the etcd cluster - the only way to ensure
+ // the local client is up to date is by performing Get calls without this option
+ // set.
+ //
+ // This mode of retrieval should only be used for the retrieval of the existing
+ // data in the etcd cluster on the initial creation of the Watcher (by
+ // repeatedly calling Get until BacklogDone is returned), and shouldn't be set
+ // for any subsequent call. Any use of this option after that initial fetch is
+ // undefined behaviour that exposes the internals of the Get implementation, and
+ // must not be relied on. However, in the future, this behaviour might be
+ // formalized.
+ //
+ // This mode is particularly useful for ranged watchers. Non-ranged watchers can
+ // still use this option to distinguish between blocking because of the
+ // nonexistence of an object vs. blocking because of networking issues. However,
+ // non-ranged retrieval semantics generally will rarely need to make this
+ // distinction.
+ BacklogOnly = GetOption{backlogOnly: true}
+
+ // BacklogDone is returned by Get when BacklogOnly is set and there is no more
+ // event data stored in the Watcher client, ie. when the initial cluster state
+ // of the requested key has been retrieved.
+ BacklogDone = errors.New("no more backlogged data")
+)
+
// Get implements the Get method of the Watcher interface.
-// It can return an error in two cases:
+// It can return an error in three cases:
// - the given context is canceled (in which case, the given error will wrap
// the context error)
// - the watcher's BytesDecoder returned an error (in which case the error
// returned by the BytesDecoder will be returned verbatim)
+// - it has been called with BacklogOnly and the Watcher has no more local
+// event data to return (see BacklogOnly for more information on the
+// semantics of this mode of operation)
// Note that transient and permanent etcd errors are never returned, and the
// Get call will attempt to recover from these errors as much as possible. This
// also means that the user of the Watcher will not be notified if the
// underlying etcd client disconnects from the cluster, or if the cluster loses
// quorum.
+//
// TODO(q3k): implement leases to allow clients to be notified when there are
// transient cluster/quorum/partition errors, if needed.
+//
// TODO(q3k): implement internal, limited buffering for backlogged data not yet
// consumed by client, as etcd client library seems to use an unbound buffer in
// case this happens ( see: watcherStream.buf in clientv3).
-func (w *watcher) Get(ctx context.Context) (interface{}, error) {
+func (w *watcher) Get(ctx context.Context, opts ...event.GetOption) (interface{}, error) {
select {
case w.getSem <- struct{}{}:
default:
@@ -230,6 +373,17 @@
<-w.getSem
}()
+ backlogOnly := false
+ for _, optI := range opts {
+ opt, ok := optI.(GetOption)
+ if !ok {
+ return nil, fmt.Errorf("get options must be of type etcd.GetOption")
+ }
+ if opt.backlogOnly {
+ backlogOnly = true
+ }
+ }
+
// Early check for context cancelations, preventing spurious contact with etcd
// if there's no need to.
if w.ctx.Err() != nil {
@@ -240,11 +394,34 @@
return nil, fmt.Errorf("when setting up watcher: %w", err)
}
- value, err := w.get(ctx)
- if err != nil {
- return nil, fmt.Errorf("when watching for new value: %w", err)
+ if backlogOnly && len(w.backlogged) == 0 {
+ return nil, BacklogDone
}
- return w.decoder(value)
+
+ // Update backlog from etcd if needed.
+ if len(w.backlogged) < 1 {
+ err := w.backfill(ctx)
+ if err != nil {
+ return nil, fmt.Errorf("when watching for new value: %w", err)
+ }
+ }
+ // Backlog is now guaranteed to contain at least one element.
+
+ ranged := w.key != w.keyEnd
+ if !ranged {
+ // For non-ranged queries, drain backlog fully.
+ if len(w.backlogged) != 1 {
+ panic("multiple keys in nonranged value")
+ }
+ kv := w.backlogged[0]
+ w.backlogged = nil
+ return w.decoder(kv.key, kv.value)
+ } else {
+ // For ranged queries, pop one key/value pair off the backlog.
+ kv := w.backlogged[0]
+ w.backlogged = w.backlogged[1:]
+ return w.decoder(kv.key, kv.value)
+ }
}
func (w *watcher) Close() error {
diff --git a/metropolis/pkg/event/etcd/etcd_test.go b/metropolis/pkg/event/etcd/etcd_test.go
index 79d9875..4b620c0 100644
--- a/metropolis/pkg/event/etcd/etcd_test.go
+++ b/metropolis/pkg/event/etcd/etcd_test.go
@@ -233,6 +233,81 @@
cancel()
}
+// stringAt is a helper type for testing ranged watchers. It's returned by a
+// watcher whose decoder is set to stringAtDecoder.
+type stringAt struct {
+ key, value string
+}
+
+func stringAtDecoder(key, value []byte) (interface{}, error) {
+ valueS := ""
+ if value != nil {
+ valueS = string(value)
+ }
+ return stringAt{
+ key: string(key),
+ value: valueS,
+ }, nil
+}
+
+// stringAtGet performs a Get from a Watcher, expecting a stringAt and updating
+// the given map with the retrieved value.
+func stringAtGet(ctx context.Context, t *testing.T, w event.Watcher, m map[string]string) {
+ t.Helper()
+
+ vr, err := w.Get(ctx)
+ if err != nil {
+ t.Fatalf("Get: %v", err)
+ }
+ v := vr.(stringAt)
+ m[v.key] = v.value
+}
+
+// TestSimpleRange exercises the simplest behaviour of a ranged watcher,
+// retrieving updates via Get in a fully blocking fashion.
+func TestSimpleRange(t *testing.T) {
+ tc := newTestClient(t)
+ defer tc.close()
+
+ ks := "test-simple-range/"
+ ke := "test-simple-range0"
+ value := NewValue(tc.namespaced, ks, stringAtDecoder, Range(ke))
+ tc.put(t, ks+"a", "one")
+ tc.put(t, ks+"b", "two")
+ tc.put(t, ks+"c", "three")
+ tc.put(t, ks+"b", "four")
+
+ w := value.Watch()
+ defer w.Close()
+
+ ctx, ctxC := context.WithCancel(context.Background())
+ defer ctxC()
+
+ res := make(map[string]string)
+ stringAtGet(ctx, t, w, res)
+ stringAtGet(ctx, t, w, res)
+ stringAtGet(ctx, t, w, res)
+
+ tc.put(t, ks+"a", "five")
+ tc.put(t, ks+"e", "six")
+
+ stringAtGet(ctx, t, w, res)
+ stringAtGet(ctx, t, w, res)
+
+ for _, te := range []struct {
+ k, w string
+ }{
+ {ks + "a", "five"},
+ {ks + "b", "four"},
+ {ks + "c", "three"},
+ {ks + "e", "six"},
+ } {
+ if want, got := te.w, res[te.k]; want != got {
+ t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got)
+ }
+ }
+}
+
// TestCancel ensures that watchers can resume after being canceled.
func TestCancel(t *testing.T) {
tc := newTestClient(t)
@@ -433,6 +508,44 @@
expect(t, watcher, "")
}
+// TestRemoveRange exercises the behaviour of a Get on a ranged watcher when a
+// value is removed.
+func TestRemoveRange(t *testing.T) {
+ tc := newTestClient(t)
+ defer tc.close()
+
+ ks := "test-remove-range/"
+ ke := "test-remove-range0"
+ value := NewValue(tc.namespaced, ks, stringAtDecoder, Range(ke))
+ tc.put(t, ks+"a", "one")
+ tc.put(t, ks+"b", "two")
+ tc.put(t, ks+"c", "three")
+ tc.put(t, ks+"b", "four")
+ tc.remove(t, ks+"c")
+
+ w := value.Watch()
+ defer w.Close()
+
+ ctx, ctxC := context.WithCancel(context.Background())
+ defer ctxC()
+
+ res := make(map[string]string)
+ stringAtGet(ctx, t, w, res)
+ stringAtGet(ctx, t, w, res)
+
+ for _, te := range []struct {
+ k, w string
+ }{
+ {ks + "a", "one"},
+ {ks + "b", "four"},
+ {ks + "c", ""},
+ } {
+ if want, got := te.w, res[te.k]; want != got {
+ t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got)
+ }
+ }
+}
+
// TestEmptyRace forces the watcher to retrieve an empty value from the K/V
// store at first, and establishing the watch channel after a new value has
// been stored in the same place.
@@ -473,7 +586,7 @@
// on Get, but that the watcher continues to work after the error has been
// returned.
func TestDecoder(t *testing.T) {
- decodeStringifiedNumbersDivisibleBy3 := func(data []byte) (interface{}, error) {
+ decodeStringifiedNumbersDivisibleBy3 := func(_, data []byte) (interface{}, error) {
num, err := strconv.ParseInt(string(data), 10, 64)
if err != nil {
return nil, fmt.Errorf("not a valid number")
@@ -582,3 +695,165 @@
}
}
}
+
+// TestBacklogRange ensures that the ranged etcd watcher can handle a large
+// backlog of changes in etcd that the client didn't keep up with.
+func TestBacklogRange(t *testing.T) {
+ tc := newTestClient(t)
+ defer tc.close()
+
+ ks := "test-backlog-range/"
+ ke := "test-backlog-range0"
+ value := NewValue(tc.namespaced, ks, stringAtDecoder, Range(ke))
+ w := value.Watch()
+ defer w.Close()
+
+ for i := 0; i < 100; i++ {
+ if i%2 == 0 {
+ tc.put(t, ks+"a", fmt.Sprintf("val-%d", i))
+ } else {
+ tc.put(t, ks+"b", fmt.Sprintf("val-%d", i))
+ }
+ }
+
+ ctx, ctxC := context.WithCancel(context.Background())
+ defer ctxC()
+
+ res := make(map[string]string)
+ stringAtGet(ctx, t, w, res)
+ stringAtGet(ctx, t, w, res)
+
+ for _, te := range []struct {
+ k, w string
+ }{
+ {ks + "a", "val-98"},
+ {ks + "b", "val-99"},
+ } {
+ if want, got := te.w, res[te.k]; want != got {
+ t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got)
+ }
+ }
+}
+
+// TestBacklogOnly exercises the BacklogOnly option for non-ranged watchers,
+// which effectively makes any Get operation non-blocking (but also showcases
+// that unless a Get without BacklogOnly is issued, no new data will appear by
+// itself in the watcher - which is an undocumented implementation detail of the
+// option).
+func TestBacklogOnly(t *testing.T) {
+ tc := newTestClient(t)
+ defer tc.close()
+ ctx, ctxC := context.WithCancel(context.Background())
+ defer ctxC()
+
+ k := "test-backlog-only"
+ tc.put(t, k, "initial")
+
+ value := NewValue(tc.namespaced, k, NoDecoder)
+ watcher := value.Watch()
+ defer watcher.Close()
+
+ d, err := watcher.Get(ctx, BacklogOnly)
+ if err != nil {
+ t.Fatalf("First Get failed: %v", err)
+ }
+ if want, got := "initial", string(d.([]byte)); want != got {
+ t.Fatalf("First Get: wanted value %q, got %q", want, got)
+ }
+
+ // As expected, next call to Get with BacklogOnly fails - there truly are no new
+ // updates to emit.
+ _, err = watcher.Get(ctx, BacklogOnly)
+ if want, got := BacklogDone, err; want != got {
+ t.Fatalf("Second Get: wanted %v, got %v", want, got)
+ }
+
+ // Implementation detail: even though there is a new value ('second'),
+ // BacklogOnly will still return BacklogDone.
+ tc.put(t, k, "second")
+ _, err = watcher.Get(ctx, BacklogOnly)
+ if want, got := BacklogDone, err; want != got {
+ t.Fatalf("Third Get: wanted %v, got %v", want, got)
+ }
+
+ // ... However, a Get without BacklogOnly will return the new value.
+ d, err = watcher.Get(ctx)
+ if err != nil {
+ t.Fatalf("Fourth Get failed: %v", err)
+ }
+ if want, got := "second", string(d.([]byte)); want != got {
+ t.Fatalf("Fourth Get: wanted value %q, got %q", want, got)
+ }
+}
+
+// TestBacklogOnlyRange exercises the BacklogOnly option for ranged watchers,
+// showcasing how it is expected to be used for keeping up with the external state
+// of a range by synchronizing to a local map.
+func TestBacklogOnlyRange(t *testing.T) {
+ tc := newTestClient(t)
+ defer tc.close()
+ ctx, ctxC := context.WithCancel(context.Background())
+ defer ctxC()
+
+ ks := "test-backlog-only-range/"
+ ke := "test-backlog-only-range0"
+
+ for i := 0; i < 100; i++ {
+ if i%2 == 0 {
+ tc.put(t, ks+"a", fmt.Sprintf("val-%d", i))
+ } else {
+ tc.put(t, ks+"b", fmt.Sprintf("val-%d", i))
+ }
+ }
+
+ value := NewValue(tc.namespaced, ks, stringAtDecoder, Range(ke))
+ w := value.Watch()
+ defer w.Close()
+
+ // Collect results into a map from key to value.
+ res := make(map[string]string)
+
+ // Run first Get - this is the barrier defining what's part of the backlog.
+ g, err := w.Get(ctx, BacklogOnly)
+ if err != nil {
+ t.Fatalf("Get: %v", err)
+ }
+ kv := g.(stringAt)
+ res[kv.key] = kv.value
+
+ // These won't be part of the backlog.
+ tc.put(t, ks+"a", fmt.Sprintf("val-100"))
+ tc.put(t, ks+"b", fmt.Sprintf("val-101"))
+
+ // Retrieve the rest of the backlog until BacklogDone is returned.
+ nUpdates := 1
+ for {
+ g, err := w.Get(ctx, BacklogOnly)
+ if err == BacklogDone {
+ break
+ }
+ if err != nil {
+ t.Fatalf("Get: %v", err)
+ }
+ nUpdates += 1
+ kv := g.(stringAt)
+ res[kv.key] = kv.value
+ }
+
+ // The backlog should've been compacted to just two entries at their newest
+ // state.
+ if want, got := 2, nUpdates; want != got {
+ t.Fatalf("wanted backlog in %d updates, got it in %d", want, got)
+ }
+
+ for _, te := range []struct {
+ k, w string
+ }{
+ {ks + "a", "val-98"},
+ {ks + "b", "val-99"},
+ } {
+ if want, got := te.w, res[te.k]; want != got {
+ t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got)
+ }
+ }
+}