blob: 4b620c04766f542020525da7ac61bde06638e412 [file] [log] [blame]
Serge Bazanskic89df2f2021-04-27 15:51:37 +02001package etcd
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "log"
8 "os"
9 "strconv"
10 "sync"
11 "testing"
12 "time"
13
14 "go.etcd.io/etcd/clientv3"
15 "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
16 "go.etcd.io/etcd/integration"
17 "google.golang.org/grpc/codes"
18
19 "source.monogon.dev/metropolis/node/core/consensus/client"
20 "source.monogon.dev/metropolis/pkg/event"
21)
22
23var (
24 cluster *integration.ClusterV3
25 endpoints []string
26)
27
28// TestMain brings up a 3 node etcd cluster for tests to use.
29func TestMain(m *testing.M) {
30 cfg := integration.ClusterConfig{
31 Size: 3,
32 GRPCKeepAliveMinTime: time.Millisecond,
33 }
34 cluster = integration.NewClusterV3(nil, &cfg)
35 endpoints = make([]string, 3)
36 for i := range endpoints {
37 endpoints[i] = cluster.Client(i).Endpoints()[0]
38 }
39
40 v := m.Run()
41 cluster.Terminate(nil)
42 os.Exit(v)
43}
44
45// setRaceWg creates a new WaitGroup and sets the given watcher to wait on this
46// WG after it performs the initial retrieval of a value from etcd, but before
47// it starts the watcher. This is used to test potential race conditions
48// present between these two steps.
49func setRaceWg(w event.Watcher) *sync.WaitGroup {
50 wg := sync.WaitGroup{}
51 w.(*watcher).testRaceWG = &wg
52 return &wg
53}
54
55// setSetupWg creates a new WaitGroup and sets the given watcher to wait on
56// thie WG after an etcd watch channel is created. This is used in tests to
57// ensure that the watcher is fully created before it is tested.
58func setSetupWg(w event.Watcher) *sync.WaitGroup {
59 wg := sync.WaitGroup{}
60 w.(*watcher).testSetupWG = &wg
61 return &wg
62}
63
64// testClient is an etcd connection to the test cluster.
65type testClient struct {
66 client *clientv3.Client
67 namespaced client.Namespaced
68}
69
70func newTestClient(t *testing.T) *testClient {
71 t.Helper()
72 cli, err := clientv3.New(clientv3.Config{
73 Endpoints: endpoints,
74 DialTimeout: 1 * time.Second,
75 DialKeepAliveTime: 1 * time.Second,
76 DialKeepAliveTimeout: 1 * time.Second,
77 })
78 if err != nil {
79 t.Fatalf("clientv3.New: %v", err)
80 }
81
82 namespaced := client.NewLocal(cli)
83 return &testClient{
84 client: cli,
85 namespaced: namespaced,
86 }
87}
88
89func (d *testClient) close() {
90 d.client.Close()
91}
92
93// setEndpoints configures which endpoints (from {0,1,2}) the testClient is
94// connected to.
95func (d *testClient) setEndpoints(nums ...uint) {
96 var eps []string
97 for _, num := range nums {
98 eps = append(eps, endpoints[num])
99 }
100 d.client.SetEndpoints(eps...)
101}
102
103// put uses the testClient to store key with a given string value in etcd. It
104// contains retry logic that will block until the put is successful.
105func (d *testClient) put(t *testing.T, key, value string) {
106 t.Helper()
107 ctx, ctxC := context.WithCancel(context.Background())
108 defer ctxC()
109
110 for {
111 ctxT, ctxC := context.WithTimeout(ctx, 100*time.Millisecond)
112 _, err := d.namespaced.Put(ctxT, key, value)
113 ctxC()
114 if err == nil {
115 return
116 }
117 if err == ctxT.Err() {
118 log.Printf("Retrying after %v", err)
119 continue
120 }
121 // Retry on etcd unavailability - this will happen in this code as the
122 // etcd cluster repeatedly loses quorum.
123 var eerr rpctypes.EtcdError
124 if errors.As(err, &eerr) && eerr.Code() == codes.Unavailable {
125 log.Printf("Retrying after %v", err)
126 continue
127 }
128 t.Fatalf("Put: %v", err)
129 }
130
131}
132
133// remove uses the testClient to remove the given key from etcd. It contains
134// retry logic that will block until the removal is successful.
135func (d *testClient) remove(t *testing.T, key string) {
136 t.Helper()
137 ctx, ctxC := context.WithCancel(context.Background())
138 defer ctxC()
139
140 _, err := d.namespaced.Delete(ctx, key)
141 if err == nil {
142 return
143 }
144 t.Fatalf("Delete: %v", err)
145}
146
147// expect runs a Get on the given Watcher, ensuring the returned value is a
148// given string.
149func expect(t *testing.T, w event.Watcher, value string) {
150 t.Helper()
151 ctx, ctxC := context.WithCancel(context.Background())
152 defer ctxC()
153
154 got, err := w.Get(ctx)
155 if err != nil {
156 t.Fatalf("Get: %v", err)
157 }
158
159 if got, want := string(got.([]byte)), value; got != want {
160 t.Errorf("Got value %q, wanted %q", want, got)
161 }
162}
163
164// expectTimeout ensures that the given watcher blocks on a Get call for at
165// least 100 milliseconds. This is used by tests to attempt to verify that the
166// watcher Get is fully blocked, but can cause false positives (eg. when Get
167// blocks for 101 milliseconds). Thus, this function should be used sparingly
168// and in tests that perform other baseline behaviour checks alongside this
169// test.
170func expectTimeout(t *testing.T, w event.Watcher) {
171 t.Helper()
172 ctx, ctxC := context.WithTimeout(context.Background(), 100*time.Millisecond)
173 got, err := w.Get(ctx)
174 ctxC()
175
176 if !errors.Is(err, ctx.Err()) {
177 t.Fatalf("Expected timeout error, got %v, %v", got, err)
178 }
179}
180
181// wait wraps a watcher into a channel of strings, ensuring that the watcher
182// never errors on Get calls and always returns strings.
183func wait(t *testing.T, w event.Watcher) (chan string, func()) {
184 t.Helper()
185 ctx, ctxC := context.WithCancel(context.Background())
186
187 c := make(chan string)
188
189 go func() {
190 for {
191 got, err := w.Get(ctx)
192 if err != nil && errors.Is(err, ctx.Err()) {
193 return
194 }
195 if err != nil {
196 t.Fatalf("Get: %v", err)
197 }
198 c <- string(got.([]byte))
199 }
200 }()
201
202 return c, ctxC
203}
204
205// TestSimple exercises the simplest possible interaction with a watched value.
206func TestSimple(t *testing.T) {
207 tc := newTestClient(t)
208 defer tc.close()
209
210 k := "test-simple"
211 value := NewValue(tc.namespaced, k, NoDecoder)
212 tc.put(t, k, "one")
213
214 watcher := value.Watch()
215 defer watcher.Close()
216 expect(t, watcher, "one")
217
218 tc.put(t, k, "two")
219 expect(t, watcher, "two")
220
221 tc.put(t, k, "three")
222 tc.put(t, k, "four")
223 tc.put(t, k, "five")
224 tc.put(t, k, "six")
225
226 q, cancel := wait(t, watcher)
227 // Test will hang here if the above value does not receive the set "six".
228 for el := range q {
229 if el == "six" {
230 break
231 }
232 }
233 cancel()
234}
235
// stringAt is a helper type for testing ranged watchers. It is what a watcher
// returns when its decoder is set to stringAtDecoder.
type stringAt struct {
	key   string
	value string
}

// stringAtDecoder converts a raw key/value pair into a stringAt holding both
// as strings. A nil value decodes to an empty string. It never returns an
// error.
func stringAtDecoder(key, value []byte) (interface{}, error) {
	// Converting a nil byte slice already yields "", so nil needs no special
	// handling.
	decoded := stringAt{
		key:   string(key),
		value: string(value),
	}
	return decoded, nil
}
252
253// stringAtGet performs a Get from a Watcher, expecting a stringAt and updating
254// the given map with the retrieved value.
255func stringAtGet(ctx context.Context, t *testing.T, w event.Watcher, m map[string]string) {
256 t.Helper()
257
258 vr, err := w.Get(ctx)
259 if err != nil {
260 t.Fatalf("Get: %v", err)
261 }
262 v := vr.(stringAt)
263 m[v.key] = v.value
264}
265
266// TestSimpleRange exercises the simplest behaviour of a ranged watcher,
267// retrieving updaates via Get in a fully blocking fashion.
268func TestSimpleRange(t *testing.T) {
269 tc := newTestClient(t)
270 defer tc.close()
271
272 ks := "test-simple-range/"
273 ke := "test-simple-range0"
274 value := NewValue(tc.namespaced, ks, stringAtDecoder, Range(ke))
275 tc.put(t, ks+"a", "one")
276 tc.put(t, ks+"b", "two")
277 tc.put(t, ks+"c", "three")
278 tc.put(t, ks+"b", "four")
279
280 w := value.Watch()
281 defer w.Close()
282
283 ctx, ctxC := context.WithCancel(context.Background())
284 defer ctxC()
285
286 res := make(map[string]string)
287 stringAtGet(ctx, t, w, res)
288 stringAtGet(ctx, t, w, res)
289 stringAtGet(ctx, t, w, res)
290
291 tc.put(t, ks+"a", "five")
292 tc.put(t, ks+"e", "six")
293
294 stringAtGet(ctx, t, w, res)
295 stringAtGet(ctx, t, w, res)
296
297 for _, te := range []struct {
298 k, w string
299 }{
300 {ks + "a", "five"},
301 {ks + "b", "four"},
302 {ks + "c", "three"},
303 {ks + "e", "six"},
304 } {
305 if want, got := te.w, res[te.k]; want != got {
306 t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got)
307 }
308 }
309}
310
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200311// TestCancel ensures that watchers can resume after being canceled.
312func TestCancel(t *testing.T) {
313 tc := newTestClient(t)
314 defer tc.close()
315
316 k := "test-cancel"
317 value := NewValue(tc.namespaced, k, NoDecoder)
318 tc.put(t, k, "one")
319
320 watcher := value.Watch()
321 defer watcher.Close()
322 expect(t, watcher, "one")
323
324 ctx, ctxC := context.WithCancel(context.Background())
325 errs := make(chan error, 1)
326 go func() {
327 _, err := watcher.Get(ctx)
328 errs <- err
329 }()
330 ctxC()
331 if want, got := ctx.Err(), <-errs; !errors.Is(got, want) {
332 t.Fatalf("Wanted err %v, got %v", want, got)
333 }
334
335 // Successfully canceled watch, resuming should continue to work.
336 q, cancel := wait(t, watcher)
337 defer cancel()
338
339 tc.put(t, k, "two")
340 if want, got := "two", <-q; want != got {
341 t.Fatalf("Wanted val %q, got %q", want, got)
342 }
343}
344
345// TestCancelOnGet ensures that a context cancellation on an initial Get (which
346// translates to an etcd Get in a backoff loop) doesn't block.
347func TestCancelOnGet(t *testing.T) {
348 tc := newTestClient(t)
349 defer tc.close()
350
351 k := "test-cancel-on-get"
352 value := NewValue(tc.namespaced, k, NoDecoder)
353 watcher := value.Watch()
354 tc.put(t, k, "one")
355
356 // Cause partition between client endpoint and rest of cluster. Any read/write
357 // operations will now hang.
358 tc.setEndpoints(0)
359 cluster.Members[0].InjectPartition(t, cluster.Members[1], cluster.Members[2])
360
361 // Perform the initial Get(), which should attempt to retrieve a KV entry from
362 // the etcd service. This should hang. Unfortunately, there's no easy way to do
363 // this without an arbitrary sleep hoping that the client actually gets to the
364 // underlying etcd.Get call. This can cause false positives (eg. false 'pass'
365 // results) in this test.
366 ctx, ctxC := context.WithCancel(context.Background())
367 errs := make(chan error, 1)
368 go func() {
369 _, err := watcher.Get(ctx)
370 errs <- err
371 }()
372 time.Sleep(time.Second)
373
374 // Now that the etcd.Get is hanging, cancel the context.
375 ctxC()
376 // And now unpartition the cluster, resuming reads.
377 cluster.Members[0].RecoverPartition(t, cluster.Members[1], cluster.Members[2])
378
379 // The etcd.Get() call should've returned with a context cancellation.
380 err := <-errs
381 switch {
382 case err == nil:
383 t.Errorf("watcher.Get() returned no error, wanted context error")
384 case errors.Is(err, ctx.Err()):
385 // Okay.
386 default:
387 t.Errorf("watcher.Get() returned %v, wanted context error", err)
388 }
389}
390
391// TestClientReconnect forces a 'reconnection' of an active watcher from a
392// running member to another member, by stopping the original member and
393// explicitly reconnecting the client to other available members.
394//
395// This doe not reflect a situation expected during Metropolis runtime, as we
396// do not expect splits between an etcd client and its connected member
397// (instead, all etcd clients only connect to their local member). However, it
398// is still an important safety test to perform, and it also exercies the
399// equivalent behaviour of an etcd client re-connecting for any other reason.
400func TestClientReconnect(t *testing.T) {
401 tc := newTestClient(t)
402 defer tc.close()
403 tc.setEndpoints(0)
404
405 k := "test-client-reconnect"
406 value := NewValue(tc.namespaced, k, NoDecoder)
407 tc.put(t, k, "one")
408
409 watcher := value.Watch()
410 defer watcher.Close()
411 expect(t, watcher, "one")
412
413 q, cancel := wait(t, watcher)
414 defer cancel()
415
416 cluster.Members[0].Stop(t)
417 defer cluster.Members[0].Restart(t)
418 cluster.WaitLeader(t)
419
420 tc.setEndpoints(1, 2)
421 tc.put(t, k, "two")
422
423 if want, got := "two", <-q; want != got {
424 t.Fatalf("Watcher received incorrect data after client restart, wanted %q, got %q", want, got)
425 }
426}
427
428// TestClientPartition forces a temporary partition of the etcd member while a
429// watcher is running, updates the value from across the partition, and undoes
430// the partition.
431// The partition is expected to be entirely transparent to the watcher.
432func TestClientPartition(t *testing.T) {
433 tcOne := newTestClient(t)
434 defer tcOne.close()
435 tcOne.setEndpoints(0)
436
437 tcRest := newTestClient(t)
438 defer tcRest.close()
439 tcRest.setEndpoints(1, 2)
440
441 k := "test-client-partition"
442 valueOne := NewValue(tcOne.namespaced, k, NoDecoder)
443 watcherOne := valueOne.Watch()
444 defer watcherOne.Close()
445 valueRest := NewValue(tcRest.namespaced, k, NoDecoder)
446 watcherRest := valueRest.Watch()
447 defer watcherRest.Close()
448
449 tcRest.put(t, k, "a")
450 expect(t, watcherOne, "a")
451 expect(t, watcherRest, "a")
452
453 cluster.Members[0].InjectPartition(t, cluster.Members[1], cluster.Members[2])
454
455 tcRest.put(t, k, "b")
456 expect(t, watcherRest, "b")
457 expectTimeout(t, watcherOne)
458
459 cluster.Members[0].RecoverPartition(t, cluster.Members[1], cluster.Members[2])
460
461 expect(t, watcherOne, "b")
462 tcRest.put(t, k, "c")
463 expect(t, watcherOne, "c")
464 expect(t, watcherRest, "c")
465
466}
467
468// TestEarlyUse exercises the correct behaviour of the value watcher on a value
469// that is not yet set.
470func TestEarlyUse(t *testing.T) {
471 tc := newTestClient(t)
472 defer tc.close()
473
474 k := "test-early-use"
475
476 value := NewValue(tc.namespaced, k, NoDecoder)
477 watcher := value.Watch()
478 defer watcher.Close()
479
480 wg := setSetupWg(watcher)
481 wg.Add(1)
482 q, cancel := wait(t, watcher)
483 defer cancel()
484
485 wg.Done()
486
487 tc.put(t, k, "one")
488
489 if want, got := "one", <-q; want != got {
490 t.Fatalf("Expected %q, got %q", want, got)
491 }
492}
493
494// TestRemove exercises the basic functionality of handling deleted values.
495func TestRemove(t *testing.T) {
496 tc := newTestClient(t)
497 defer tc.close()
498
499 k := "test-remove"
500 tc.put(t, k, "one")
501
502 value := NewValue(tc.namespaced, k, NoDecoder)
503 watcher := value.Watch()
504 defer watcher.Close()
505
506 expect(t, watcher, "one")
507 tc.remove(t, k)
508 expect(t, watcher, "")
509}
510
Serge Bazanski8d45a052021-10-18 17:24:24 +0200511// TestRemoveRange exercises the behaviour of a Get on a ranged watcher when a
512// value is removed.
513func TestRemoveRange(t *testing.T) {
514 tc := newTestClient(t)
515 defer tc.close()
516
517 ks := "test-remove-range/"
518 ke := "test-remove-range0"
519 value := NewValue(tc.namespaced, ks, stringAtDecoder, Range(ke))
520 tc.put(t, ks+"a", "one")
521 tc.put(t, ks+"b", "two")
522 tc.put(t, ks+"c", "three")
523 tc.put(t, ks+"b", "four")
524 tc.remove(t, ks+"c")
525
526 w := value.Watch()
527 defer w.Close()
528
529 ctx, ctxC := context.WithCancel(context.Background())
530 defer ctxC()
531
532 res := make(map[string]string)
533 stringAtGet(ctx, t, w, res)
534 stringAtGet(ctx, t, w, res)
535
536 for _, te := range []struct {
537 k, w string
538 }{
539 {ks + "a", "one"},
540 {ks + "b", "four"},
541 {ks + "c", ""},
542 } {
543 if want, got := te.w, res[te.k]; want != got {
544 t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got)
545 }
546 }
547}
548
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200549// TestEmptyRace forces the watcher to retrieve an empty value from the K/V
550// store at first, and establishing the watch channel after a new value has
551// been stored in the same place.
552func TestEmptyRace(t *testing.T) {
553 tc := newTestClient(t)
554 defer tc.close()
555
556 k := "test-remove-race"
557 tc.put(t, k, "one")
558 tc.remove(t, k)
559
560 value := NewValue(tc.namespaced, k, NoDecoder)
561 watcher := value.Watch()
562 defer watcher.Close()
563
564 wg := setRaceWg(watcher)
565 wg.Add(1)
566 q, cancel := wait(t, watcher)
567 defer cancel()
568
569 tc.put(t, k, "two")
570 wg.Done()
571
572 if want, got := "two", <-q; want != got {
573 t.Fatalf("Watcher received incorrect data after client restart, wanted %q, got %q", want, got)
574 }
575}
576
// errOrInt carries the outcome of a single watcher Get in TestDecoder: either
// a decoded integer or the error that Get returned.
type errOrInt struct {
	// val is the decoded value; it is left at zero when err is set.
	val int64
	// err is the error returned by Get, or nil if a value was decoded.
	err error
}
581
582// TestDecoder exercises the BytesDecoder functionality of the watcher, by
583// creating a value with a decoder that only accepts string-encoded integers
584// that are divisible by three. The test then proceeds to put a handful of
585// values into etcd, ensuring that undecodable values correctly return an error
586// on Get, but that the watcher continues to work after the error has been
587// returned.
588func TestDecoder(t *testing.T) {
Serge Bazanski8d45a052021-10-18 17:24:24 +0200589 decodeStringifiedNumbersDivisibleBy3 := func(_, data []byte) (interface{}, error) {
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200590 num, err := strconv.ParseInt(string(data), 10, 64)
591 if err != nil {
592 return nil, fmt.Errorf("not a valid number")
593 }
594 if (num % 3) != 0 {
595 return nil, fmt.Errorf("not divisible by 3")
596 }
597 return num, nil
598 }
599
600 tc := newTestClient(t)
601 defer tc.close()
602
603 ctx, ctxC := context.WithCancel(context.Background())
604 defer ctxC()
605
606 k := "test-decoder"
607 value := NewValue(tc.namespaced, k, decodeStringifiedNumbersDivisibleBy3)
608 watcher := value.Watch()
609 defer watcher.Close()
610 tc.put(t, k, "3")
611 _, err := watcher.Get(ctx)
612 if err != nil {
613 t.Fatalf("Initial Get: %v", err)
614 }
615
616 // Stream updates into arbitrarily-bounded test channel.
617 queue := make(chan errOrInt, 100)
618 go func() {
619 for {
620 res, err := watcher.Get(ctx)
621 if err != nil && errors.Is(err, ctx.Err()) {
622 return
623 }
624 if err != nil {
625 queue <- errOrInt{
626 err: err,
627 }
628 } else {
629 queue <- errOrInt{
630 val: res.(int64),
631 }
632 }
633 }
634 }()
635
636 var wantList []*int64
637 wantError := func(val string) {
638 wantList = append(wantList, nil)
639 tc.put(t, k, val)
640 }
641 wantValue := func(val string, decoded int64) {
642 wantList = append(wantList, &decoded)
643 tc.put(t, k, val)
644 }
645
646 wantError("")
647 wantValue("9", 9)
648 wantError("foo")
649 wantValue("18", 18)
650 wantError("10")
651 wantError("10")
652 wantValue("27", 27)
653 wantValue("36", 36)
654
655 for i, want := range wantList {
656 q := <-queue
657 if want == nil && q.err == nil {
658 t.Errorf("Case %d: wanted error, got no error and value %d", i, q.val)
659 }
660 if want != nil && (*want) != q.val {
661 t.Errorf("Case %d: wanted value %d, got error %v and value %d", i, *want, q.err, q.val)
662 }
663 }
664}
665
666// TestBacklog ensures that the watcher can handle a large backlog of changes
667// in etcd that the client didnt' keep up with, and that whatever final state
668// is available to the client when it actually gets to calling Get().
669func TestBacklog(t *testing.T) {
670 tc := newTestClient(t)
671 defer tc.close()
672
673 k := "test-backlog"
674 value := NewValue(tc.namespaced, k, NoDecoder)
675 watcher := value.Watch()
676 defer watcher.Close()
677
678 tc.put(t, k, "initial")
679 expect(t, watcher, "initial")
680
681 for i := 0; i < 1000; i++ {
682 tc.put(t, k, fmt.Sprintf("val-%d", i))
683 }
684
685 ctx, ctxC := context.WithTimeout(context.Background(), time.Second)
686 defer ctxC()
687 for {
688 valB, err := watcher.Get(ctx)
689 if err != nil {
690 t.Fatalf("Get() returned error before expected final value: %v", err)
691 }
692 val := string(valB.([]byte))
693 if val == "val-999" {
694 break
695 }
696 }
697}
Serge Bazanski8d45a052021-10-18 17:24:24 +0200698
699// TestBacklogRange ensures that the ranged etcd watcher can handle a large
700// backlog of changes in etcd that the client didn't keep up with.
701func TestBacklogRange(t *testing.T) {
702 tc := newTestClient(t)
703 defer tc.close()
704
705 ks := "test-backlog-range/"
706 ke := "test-backlog-range0"
707 value := NewValue(tc.namespaced, ks, stringAtDecoder, Range(ke))
708 w := value.Watch()
709 defer w.Close()
710
711 for i := 0; i < 100; i++ {
712 if i%2 == 0 {
713 tc.put(t, ks+"a", fmt.Sprintf("val-%d", i))
714 } else {
715 tc.put(t, ks+"b", fmt.Sprintf("val-%d", i))
716 }
717 }
718
719 ctx, ctxC := context.WithCancel(context.Background())
720 defer ctxC()
721
722 res := make(map[string]string)
723 stringAtGet(ctx, t, w, res)
724 stringAtGet(ctx, t, w, res)
725
726 for _, te := range []struct {
727 k, w string
728 }{
729 {ks + "a", "val-98"},
730 {ks + "b", "val-99"},
731 } {
732 if want, got := te.w, res[te.k]; want != got {
733 t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got)
734 }
735 }
736}
737
738// TestBacklogOnly exercises the BacklogOnly option for non-ranged watchers,
739// which effectively makes any Get operation non-blocking (but also showcases
740// that unless a Get without BacklogOnly is issues, no new data will appear by
741// itself in the watcher - which is an undocumented implementation detail of the
742// option).
743func TestBacklogOnly(t *testing.T) {
744 tc := newTestClient(t)
745 defer tc.close()
746 ctx, ctxC := context.WithCancel(context.Background())
747 defer ctxC()
748
749 k := "test-backlog-only"
750 tc.put(t, k, "initial")
751
752 value := NewValue(tc.namespaced, k, NoDecoder)
753 watcher := value.Watch()
754 defer watcher.Close()
755
756 d, err := watcher.Get(ctx, BacklogOnly)
757 if err != nil {
758 t.Fatalf("First Get failed: %v", err)
759 }
760 if want, got := "initial", string(d.([]byte)); want != got {
761 t.Fatalf("First Get: wanted value %q, got %q", want, got)
762 }
763
764 // As expected, next call to Get with BacklogOnly fails - there truly is no new
765 // updates to emit.
766 _, err = watcher.Get(ctx, BacklogOnly)
767 if want, got := BacklogDone, err; want != got {
768 t.Fatalf("Second Get: wanted %v, got %v", want, got)
769 }
770
771 // Implementation detail: even though there is a new value ('second'),
772 // BacklogOnly will still return BacklogDone.
773 tc.put(t, k, "second")
774 _, err = watcher.Get(ctx, BacklogOnly)
775 if want, got := BacklogDone, err; want != got {
776 t.Fatalf("Third Get: wanted %v, got %v", want, got)
777 }
778
779 // ... However, a Get without BacklogOnly will return the new value.
780 d, err = watcher.Get(ctx)
781 if err != nil {
782 t.Fatalf("Fourth Get failed: %v", err)
783 }
784 if want, got := "second", string(d.([]byte)); want != got {
785 t.Fatalf("Fourth Get: wanted value %q, got %q", want, got)
786 }
787}
788
789// TestBacklogOnlyRange exercises the BacklogOnly option for ranged watchers,
790// showcasing how it expected to be used for keeping up with the external state
791// of a range by synchronizing to a local map.
792func TestBacklogOnlyRange(t *testing.T) {
793 tc := newTestClient(t)
794 defer tc.close()
795 ctx, ctxC := context.WithCancel(context.Background())
796 defer ctxC()
797
798 ks := "test-backlog-only-range/"
799 ke := "test-backlog-only-range0"
800
801 for i := 0; i < 100; i++ {
802 if i%2 == 0 {
803 tc.put(t, ks+"a", fmt.Sprintf("val-%d", i))
804 } else {
805 tc.put(t, ks+"b", fmt.Sprintf("val-%d", i))
806 }
807 }
808
809 value := NewValue(tc.namespaced, ks, stringAtDecoder, Range(ke))
810 w := value.Watch()
811 defer w.Close()
812
813 // Collect results into a map from key to value.
814 res := make(map[string]string)
815
816 // Run first Get - this is the barrier defining what's part of the backlog.
817 g, err := w.Get(ctx, BacklogOnly)
818 if err != nil {
819 t.Fatalf("Get: %v", err)
820 }
821 kv := g.(stringAt)
822 res[kv.key] = kv.value
823
824 // These won't be part of the backlog.
825 tc.put(t, ks+"a", fmt.Sprintf("val-100"))
826 tc.put(t, ks+"b", fmt.Sprintf("val-101"))
827
828 // Retrieve the rest of the backlog until BacklogDone is returned.
829 nUpdates := 1
830 for {
831 g, err := w.Get(ctx, BacklogOnly)
832 if err == BacklogDone {
833 break
834 }
835 if err != nil {
836 t.Fatalf("Get: %v", err)
837 }
838 nUpdates += 1
839 kv := g.(stringAt)
840 res[kv.key] = kv.value
841 }
842
843 // The backlog should've been compacted to just two entries at their newest
844 // state.
845 if want, got := 2, nUpdates; want != got {
846 t.Fatalf("wanted backlog in %d updates, got it in %d", want, got)
847 }
848
849 for _, te := range []struct {
850 k, w string
851 }{
852 {ks + "a", "val-98"},
853 {ks + "b", "val-99"},
854 } {
855 if want, got := te.w, res[te.k]; want != got {
856 t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got)
857 }
858 }
859}