package etcd

import (
	"context"
	"errors"
	"flag"
	"fmt"
	"log"
	"os"
	"strconv"
	"sync"
	"testing"
	"time"

	"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
	"go.etcd.io/etcd/client/pkg/v3/testutil"
	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/tests/v3/integration"
	"google.golang.org/grpc/codes"

	"source.monogon.dev/metropolis/node/core/consensus/client"
	"source.monogon.dev/metropolis/pkg/event"
)

var (
	cluster   *integration.ClusterV3
	endpoints []string
)

// TestMain brings up a 3-node etcd cluster for tests to use.
func TestMain(m *testing.M) {
	cfg := integration.ClusterConfig{
		Size:                 3,
		GRPCKeepAliveMinTime: time.Millisecond,
	}
	tb, cancel := testutil.NewTestingTBProthesis("curator")
	defer cancel()
	flag.Parse()
	integration.BeforeTest(tb)
	cluster = integration.NewClusterV3(tb, &cfg)
	endpoints = make([]string, 3)
	for i := range endpoints {
		endpoints[i] = cluster.Client(i).Endpoints()[0]
	}

	v := m.Run()
	cluster.Terminate(tb)
	os.Exit(v)
}

// setRaceWg creates a new WaitGroup and sets the given watcher to wait on this
// WG after it performs the initial retrieval of a value from etcd, but before
// it starts the watcher. This is used to test potential race conditions
// present between these two steps.
func setRaceWg(w event.Watcher) *sync.WaitGroup {
	wg := sync.WaitGroup{}
	w.(*watcher).testRaceWG = &wg
	return &wg
}

// setSetupWg creates a new WaitGroup and sets the given watcher to wait on
// this WG after an etcd watch channel is created. This is used in tests to
// ensure that the watcher is fully created before it is tested.
func setSetupWg(w event.Watcher) *sync.WaitGroup {
	wg := sync.WaitGroup{}
	w.(*watcher).testSetupWG = &wg
	return &wg
}

// testClient is an etcd connection to the test cluster.
type testClient struct {
	client     *clientv3.Client
	namespaced client.Namespaced
}

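// newTestClient dials the test cluster on all configured endpoints and wraps
// the resulting etcd client in a namespaced client, as used by the watchers
// under test. It fails the test if the connection cannot be established.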
func newTestClient(t *testing.T) *testClient {
	t.Helper()
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:            endpoints,
		DialTimeout:          1 * time.Second,
		DialKeepAliveTime:    1 * time.Second,
		DialKeepAliveTimeout: 1 * time.Second,
	})
	if err != nil {
		t.Fatalf("clientv3.New: %v", err)
	}

	namespaced := client.NewLocal(cli)
	return &testClient{
		client:     cli,
		namespaced: namespaced,
	}
}

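// close tears down the testClient's underlying etcd client connection.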
func (d *testClient) close() {
	d.client.Close()
}

// setEndpoints configures which endpoints (from {0,1,2}) the testClient is
// connected to.
func (d *testClient) setEndpoints(nums ...uint) {
	var eps []string
	for _, num := range nums {
		eps = append(eps, endpoints[num])
	}
	d.client.SetEndpoints(eps...)
}

// put uses the testClient to store key with a given string value in etcd. It
// contains retry logic that will block until the put is successful.
func (d *testClient) put(t *testing.T, key, value string) {
	t.Helper()
	ctx, ctxC := context.WithCancel(context.Background())
	defer ctxC()

	for {
		ctxT, ctxC := context.WithTimeout(ctx, 100*time.Millisecond)
		_, err := d.namespaced.Put(ctxT, key, value)
		ctxC()
		if err == nil {
			return
		}
		if err == ctxT.Err() {
			log.Printf("Retrying after %v", err)
			continue
		}
		// Retry on etcd unavailability - this will happen in this code as the
		// etcd cluster repeatedly loses quorum.
		var eerr rpctypes.EtcdError
		if errors.As(err, &eerr) && eerr.Code() == codes.Unavailable {
			log.Printf("Retrying after %v", err)
			continue
		}
		t.Fatalf("Put: %v", err)
	}
}

// remove uses the testClient to remove the given key from etcd. It fails the
// test if the removal is unsuccessful.
func (d *testClient) remove(t *testing.T, key string) {
	t.Helper()
	ctx, ctxC := context.WithCancel(context.Background())
	defer ctxC()

	_, err := d.namespaced.Delete(ctx, key)
	if err == nil {
		return
	}
	t.Fatalf("Delete: %v", err)
}

// expect runs a Get on the given Watcher, ensuring the returned value is a
// given string.
func expect(t *testing.T, w event.Watcher, value string) {
	t.Helper()
	ctx, ctxC := context.WithCancel(context.Background())
	defer ctxC()

	got, err := w.Get(ctx)
	if err != nil {
		t.Fatalf("Get: %v", err)
	}

	if got, want := string(got.([]byte)), value; got != want {
		t.Errorf("Got value %q, wanted %q", got, want)
	}
}

// expectTimeout ensures that the given watcher blocks on a Get call for at
// least 100 milliseconds. This is used by tests to attempt to verify that the
// watcher Get is fully blocked, but can cause false positives (eg. when Get
// blocks for 101 milliseconds). Thus, this function should be used sparingly
// and in tests that perform other baseline behaviour checks alongside this
// test.
func expectTimeout(t *testing.T, w event.Watcher) {
	t.Helper()
	ctx, ctxC := context.WithTimeout(context.Background(), 100*time.Millisecond)
	got, err := w.Get(ctx)
	ctxC()

	if !errors.Is(err, ctx.Err()) {
		t.Fatalf("Expected timeout error, got %v, %v", got, err)
	}
}

// wait wraps a watcher into a channel of strings, ensuring that the watcher
// never errors on Get calls and always returns strings.
func wait(t *testing.T, w event.Watcher) (chan string, func()) {
	t.Helper()
	ctx, ctxC := context.WithCancel(context.Background())

	c := make(chan string)

	go func() {
		for {
			got, err := w.Get(ctx)
			if err != nil && errors.Is(err, ctx.Err()) {
				return
			}
			if err != nil {
				t.Fatalf("Get: %v", err)
			}
			c <- string(got.([]byte))
		}
	}()

	return c, ctxC
}

// TestSimple exercises the simplest possible interaction with a watched value.
func TestSimple(t *testing.T) {
	tc := newTestClient(t)
	defer tc.close()

	k := "test-simple"
	value := NewValue(tc.namespaced, k, NoDecoder)
	tc.put(t, k, "one")

	watcher := value.Watch()
	defer watcher.Close()
	expect(t, watcher, "one")

	tc.put(t, k, "two")
	expect(t, watcher, "two")

	tc.put(t, k, "three")
	tc.put(t, k, "four")
	tc.put(t, k, "five")
	tc.put(t, k, "six")

	q, cancel := wait(t, watcher)
	// The test will hang here if the watcher never receives the final value
	// "six".
	for el := range q {
		if el == "six" {
			break
		}
	}
	cancel()
}

// stringAt is a helper type for testing ranged watchers. It's returned by a
// watcher whose decoder is set to stringAtDecoder.
type stringAt struct {
	key, value string
}

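// stringAtDecoder decodes a raw etcd key/value pair into a stringAt, mapping a
// deleted (nil) value to an empty string.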
func stringAtDecoder(key, value []byte) (interface{}, error) {
	valueS := ""
	if value != nil {
		valueS = string(value)
	}
	return stringAt{
		key:   string(key),
		value: valueS,
	}, nil
}

// stringAtGet performs a Get from a Watcher, expecting a stringAt and updating
// the given map with the retrieved value.
func stringAtGet(ctx context.Context, t *testing.T, w event.Watcher, m map[string]string) {
	t.Helper()

	vr, err := w.Get(ctx)
	if err != nil {
		t.Fatalf("Get: %v", err)
	}
	v := vr.(stringAt)
	m[v.key] = v.value
}

// TestSimpleRange exercises the simplest behaviour of a ranged watcher,
// retrieving updates via Get in a fully blocking fashion.
func TestSimpleRange(t *testing.T) {
	tc := newTestClient(t)
	defer tc.close()

	ks := "test-simple-range/"
	ke := "test-simple-range0"
	value := NewValue(tc.namespaced, ks, stringAtDecoder, Range(ke))
	tc.put(t, ks+"a", "one")
	tc.put(t, ks+"b", "two")
	tc.put(t, ks+"c", "three")
	tc.put(t, ks+"b", "four")

	w := value.Watch()
	defer w.Close()

	ctx, ctxC := context.WithCancel(context.Background())
	defer ctxC()

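	// Three Gets should cover the initial state of keys a, b and c, with each
	// key compacted to its latest value.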
	res := make(map[string]string)
	stringAtGet(ctx, t, w, res)
	stringAtGet(ctx, t, w, res)
	stringAtGet(ctx, t, w, res)

	tc.put(t, ks+"a", "five")
	tc.put(t, ks+"e", "six")

	stringAtGet(ctx, t, w, res)
	stringAtGet(ctx, t, w, res)

	for _, te := range []struct {
		k, w string
	}{
		{ks + "a", "five"},
		{ks + "b", "four"},
		{ks + "c", "three"},
		{ks + "e", "six"},
	} {
		if want, got := te.w, res[te.k]; want != got {
			t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got)
		}
	}
}

// TestCancel ensures that watchers can resume after being canceled.
func TestCancel(t *testing.T) {
	tc := newTestClient(t)
	defer tc.close()

	k := "test-cancel"
	value := NewValue(tc.namespaced, k, NoDecoder)
	tc.put(t, k, "one")

	watcher := value.Watch()
	defer watcher.Close()
	expect(t, watcher, "one")

	ctx, ctxC := context.WithCancel(context.Background())
	errs := make(chan error, 1)
	go func() {
		_, err := watcher.Get(ctx)
		errs <- err
	}()
	ctxC()
	if want, got := ctx.Err(), <-errs; !errors.Is(got, want) {
		t.Fatalf("Wanted err %v, got %v", want, got)
	}

	// Successfully canceled watch, resuming should continue to work.
	q, cancel := wait(t, watcher)
	defer cancel()

	tc.put(t, k, "two")
	if want, got := "two", <-q; want != got {
		t.Fatalf("Wanted val %q, got %q", want, got)
	}
}

// TestCancelOnGet ensures that a context cancellation on an initial Get (which
// translates to an etcd Get in a backoff loop) doesn't block.
func TestCancelOnGet(t *testing.T) {
	tc := newTestClient(t)
	defer tc.close()

	k := "test-cancel-on-get"
	value := NewValue(tc.namespaced, k, NoDecoder)
	watcher := value.Watch()
	tc.put(t, k, "one")

	// Cause partition between client endpoint and rest of cluster. Any read/write
	// operations will now hang.
	tc.setEndpoints(0)
	cluster.Members[0].InjectPartition(t, cluster.Members[1], cluster.Members[2])

	// Perform the initial Get(), which should attempt to retrieve a KV entry from
	// the etcd service. This should hang. Unfortunately, there's no easy way to do
	// this without an arbitrary sleep hoping that the client actually gets to the
	// underlying etcd.Get call. This can cause false positives (eg. false 'pass'
	// results) in this test.
	ctx, ctxC := context.WithCancel(context.Background())
	errs := make(chan error, 1)
	go func() {
		_, err := watcher.Get(ctx)
		errs <- err
	}()
	time.Sleep(time.Second)

	// Now that the etcd.Get is hanging, cancel the context.
	ctxC()
	// And now unpartition the cluster, resuming reads.
	cluster.Members[0].RecoverPartition(t, cluster.Members[1], cluster.Members[2])

	// The etcd.Get() call should've returned with a context cancellation.
	err := <-errs
	switch {
	case err == nil:
		t.Errorf("watcher.Get() returned no error, wanted context error")
	case errors.Is(err, ctx.Err()):
		// Okay.
	default:
		t.Errorf("watcher.Get() returned %v, wanted context error", err)
	}
}

// TestClientReconnect forces a 'reconnection' of an active watcher from a
// running member to another member, by stopping the original member and
// explicitly reconnecting the client to other available members.
//
// This does not reflect a situation expected during Metropolis runtime, as we
// do not expect splits between an etcd client and its connected member
// (instead, all etcd clients only connect to their local member). However, it
// is still an important safety test to perform, and it also exercises the
// equivalent behaviour of an etcd client re-connecting for any other reason.
func TestClientReconnect(t *testing.T) {
	tc := newTestClient(t)
	defer tc.close()
	tc.setEndpoints(0)

	k := "test-client-reconnect"
	value := NewValue(tc.namespaced, k, NoDecoder)
	tc.put(t, k, "one")

	watcher := value.Watch()
	defer watcher.Close()
	expect(t, watcher, "one")

	q, cancel := wait(t, watcher)
	defer cancel()

	cluster.Members[0].Stop(t)
	defer cluster.Members[0].Restart(t)
	cluster.WaitLeader(t)

	tc.setEndpoints(1, 2)
	tc.put(t, k, "two")

	if want, got := "two", <-q; want != got {
		t.Fatalf("Watcher received incorrect data after client restart, wanted %q, got %q", want, got)
	}
}

// TestClientPartition forces a temporary partition of the etcd member while a
// watcher is running, updates the value from across the partition, and undoes
// the partition.
// The partition is expected to be entirely transparent to the watcher.
func TestClientPartition(t *testing.T) {
	tcOne := newTestClient(t)
	defer tcOne.close()
	tcOne.setEndpoints(0)

	tcRest := newTestClient(t)
	defer tcRest.close()
	tcRest.setEndpoints(1, 2)

	k := "test-client-partition"
	valueOne := NewValue(tcOne.namespaced, k, NoDecoder)
	watcherOne := valueOne.Watch()
	defer watcherOne.Close()
	valueRest := NewValue(tcRest.namespaced, k, NoDecoder)
	watcherRest := valueRest.Watch()
	defer watcherRest.Close()

	tcRest.put(t, k, "a")
	expect(t, watcherOne, "a")
	expect(t, watcherRest, "a")

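	// Partition member 0 away from members 1 and 2. Updates made through the
	// healthy members should reach watcherRest but not watcherOne.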
	cluster.Members[0].InjectPartition(t, cluster.Members[1], cluster.Members[2])

	tcRest.put(t, k, "b")
	expect(t, watcherRest, "b")
	expectTimeout(t, watcherOne)

	cluster.Members[0].RecoverPartition(t, cluster.Members[1], cluster.Members[2])

	expect(t, watcherOne, "b")
	tcRest.put(t, k, "c")
	expect(t, watcherOne, "c")
	expect(t, watcherRest, "c")
}

// TestEarlyUse exercises the correct behaviour of the value watcher on a value
// that is not yet set.
func TestEarlyUse(t *testing.T) {
	tc := newTestClient(t)
	defer tc.close()

	k := "test-early-use"

	value := NewValue(tc.namespaced, k, NoDecoder)
	watcher := value.Watch()
	defer watcher.Close()

	wg := setSetupWg(watcher)
	wg.Add(1)
	q, cancel := wait(t, watcher)
	defer cancel()

	wg.Done()

	tc.put(t, k, "one")

	if want, got := "one", <-q; want != got {
		t.Fatalf("Expected %q, got %q", want, got)
	}
}

// TestRemove exercises the basic functionality of handling deleted values.
func TestRemove(t *testing.T) {
	tc := newTestClient(t)
	defer tc.close()

	k := "test-remove"
	tc.put(t, k, "one")

	value := NewValue(tc.namespaced, k, NoDecoder)
	watcher := value.Watch()
	defer watcher.Close()

	expect(t, watcher, "one")
	tc.remove(t, k)
	expect(t, watcher, "")
}

// TestRemoveRange exercises the behaviour of a Get on a ranged watcher when a
// value is removed.
func TestRemoveRange(t *testing.T) {
	tc := newTestClient(t)
	defer tc.close()

	ks := "test-remove-range/"
	ke := "test-remove-range0"
	value := NewValue(tc.namespaced, ks, stringAtDecoder, Range(ke))
	tc.put(t, ks+"a", "one")
	tc.put(t, ks+"b", "two")
	tc.put(t, ks+"c", "three")
	tc.put(t, ks+"b", "four")
	tc.remove(t, ks+"c")

	w := value.Watch()
	defer w.Close()

	ctx, ctxC := context.WithCancel(context.Background())
	defer ctxC()

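	// Only two updates are expected for the initial state: the latest values of
	// a and b. Key c was deleted before the watch started, so the lookup below
	// yields the empty string.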
	res := make(map[string]string)
	stringAtGet(ctx, t, w, res)
	stringAtGet(ctx, t, w, res)

	for _, te := range []struct {
		k, w string
	}{
		{ks + "a", "one"},
		{ks + "b", "four"},
		{ks + "c", ""},
	} {
		if want, got := te.w, res[te.k]; want != got {
			t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got)
		}
	}
}

// TestEmptyRace forces the watcher to retrieve an empty value from the K/V
// store at first, and to establish the watch channel only after a new value
// has been stored in the same place.
func TestEmptyRace(t *testing.T) {
	tc := newTestClient(t)
	defer tc.close()

	k := "test-remove-race"
	tc.put(t, k, "one")
	tc.remove(t, k)

	value := NewValue(tc.namespaced, k, NoDecoder)
	watcher := value.Watch()
	defer watcher.Close()

	wg := setRaceWg(watcher)
	wg.Add(1)
	q, cancel := wait(t, watcher)
	defer cancel()

	tc.put(t, k, "two")
	wg.Done()

	if want, got := "two", <-q; want != got {
		t.Fatalf("Watcher received incorrect data, wanted %q, got %q", want, got)
	}
}

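// errOrInt carries either a value decoded by TestDecoder's watcher or the
// error returned by its Get call.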
type errOrInt struct {
	val int64
	err error
}

// TestDecoder exercises the BytesDecoder functionality of the watcher, by
// creating a value with a decoder that only accepts string-encoded integers
// that are divisible by three. The test then proceeds to put a handful of
// values into etcd, ensuring that undecodable values correctly return an error
// on Get, but that the watcher continues to work after the error has been
// returned.
func TestDecoder(t *testing.T) {
	decodeStringifiedNumbersDivisibleBy3 := func(_, data []byte) (interface{}, error) {
		num, err := strconv.ParseInt(string(data), 10, 64)
		if err != nil {
			return nil, fmt.Errorf("not a valid number")
		}
		if (num % 3) != 0 {
			return nil, fmt.Errorf("not divisible by 3")
		}
		return num, nil
	}

	tc := newTestClient(t)
	defer tc.close()

	ctx, ctxC := context.WithCancel(context.Background())
	defer ctxC()

	k := "test-decoder"
	value := NewValue(tc.namespaced, k, decodeStringifiedNumbersDivisibleBy3)
	watcher := value.Watch()
	defer watcher.Close()
	tc.put(t, k, "3")
	_, err := watcher.Get(ctx)
	if err != nil {
		t.Fatalf("Initial Get: %v", err)
	}

	// Stream updates into arbitrarily-bounded test channel.
	queue := make(chan errOrInt, 100)
	go func() {
		for {
			res, err := watcher.Get(ctx)
			if err != nil && errors.Is(err, ctx.Err()) {
				return
			}
			if err != nil {
				queue <- errOrInt{
					err: err,
				}
			} else {
				queue <- errOrInt{
					val: res.(int64),
				}
			}
		}
	}()

	var wantList []*int64
	wantError := func(val string) {
		wantList = append(wantList, nil)
		tc.put(t, k, val)
	}
	wantValue := func(val string, decoded int64) {
		wantList = append(wantList, &decoded)
		tc.put(t, k, val)
	}

	wantError("")
	wantValue("9", 9)
	wantError("foo")
	wantValue("18", 18)
	wantError("10")
	wantError("10")
	wantValue("27", 27)
	wantValue("36", 36)

	for i, want := range wantList {
		q := <-queue
		if want == nil && q.err == nil {
			t.Errorf("Case %d: wanted error, got no error and value %d", i, q.val)
		}
		if want != nil && (*want) != q.val {
			t.Errorf("Case %d: wanted value %d, got error %v and value %d", i, *want, q.err, q.val)
		}
	}
}

// TestBacklog ensures that the watcher can handle a large backlog of changes
// in etcd that the client didn't keep up with, and that the final state is
// available to the client once it actually gets around to calling Get().
func TestBacklog(t *testing.T) {
	tc := newTestClient(t)
	defer tc.close()

	k := "test-backlog"
	value := NewValue(tc.namespaced, k, NoDecoder)
	watcher := value.Watch()
	defer watcher.Close()

	tc.put(t, k, "initial")
	expect(t, watcher, "initial")

	for i := 0; i < 1000; i++ {
		tc.put(t, k, fmt.Sprintf("val-%d", i))
	}

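	// Keep reading until the final value arrives; intermediate values may be
	// compacted away, but val-999 must eventually be returned.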
	ctx, ctxC := context.WithTimeout(context.Background(), time.Second)
	defer ctxC()
	for {
		valB, err := watcher.Get(ctx)
		if err != nil {
			t.Fatalf("Get() returned error before expected final value: %v", err)
		}
		val := string(valB.([]byte))
		if val == "val-999" {
			break
		}
	}
}

// TestBacklogRange ensures that the ranged etcd watcher can handle a large
// backlog of changes in etcd that the client didn't keep up with.
func TestBacklogRange(t *testing.T) {
	tc := newTestClient(t)
	defer tc.close()

	ks := "test-backlog-range/"
	ke := "test-backlog-range0"
	value := NewValue(tc.namespaced, ks, stringAtDecoder, Range(ke))
	w := value.Watch()
	defer w.Close()

	for i := 0; i < 100; i++ {
		if i%2 == 0 {
			tc.put(t, ks+"a", fmt.Sprintf("val-%d", i))
		} else {
			tc.put(t, ks+"b", fmt.Sprintf("val-%d", i))
		}
	}

	ctx, ctxC := context.WithCancel(context.Background())
	defer ctxC()

	res := make(map[string]string)
	stringAtGet(ctx, t, w, res)
	stringAtGet(ctx, t, w, res)

	for _, te := range []struct {
		k, w string
	}{
		{ks + "a", "val-98"},
		{ks + "b", "val-99"},
	} {
		if want, got := te.w, res[te.k]; want != got {
			t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got)
		}
	}
}

// TestBacklogOnly exercises the BacklogOnly option for non-ranged watchers,
// which effectively makes any Get operation non-blocking (but also showcases
// that unless a Get without BacklogOnly is issued, no new data will appear by
// itself in the watcher - which is an undocumented implementation detail of
// the option).
func TestBacklogOnly(t *testing.T) {
	tc := newTestClient(t)
	defer tc.close()
	ctx, ctxC := context.WithCancel(context.Background())
	defer ctxC()

	k := "test-backlog-only"
	tc.put(t, k, "initial")

	value := NewValue(tc.namespaced, k, NoDecoder)
	watcher := value.Watch()
	defer watcher.Close()

	d, err := watcher.Get(ctx, BacklogOnly)
	if err != nil {
		t.Fatalf("First Get failed: %v", err)
	}
	if want, got := "initial", string(d.([]byte)); want != got {
		t.Fatalf("First Get: wanted value %q, got %q", want, got)
	}

	// As expected, the next call to Get with BacklogOnly fails - there truly are
	// no new updates to emit.
	_, err = watcher.Get(ctx, BacklogOnly)
	if want, got := BacklogDone, err; want != got {
		t.Fatalf("Second Get: wanted %v, got %v", want, got)
	}

	// Implementation detail: even though there is a new value ('second'),
	// BacklogOnly will still return BacklogDone.
	tc.put(t, k, "second")
	_, err = watcher.Get(ctx, BacklogOnly)
	if want, got := BacklogDone, err; want != got {
		t.Fatalf("Third Get: wanted %v, got %v", want, got)
	}

	// ... However, a Get without BacklogOnly will return the new value.
	d, err = watcher.Get(ctx)
	if err != nil {
		t.Fatalf("Fourth Get failed: %v", err)
	}
	if want, got := "second", string(d.([]byte)); want != got {
		t.Fatalf("Fourth Get: wanted value %q, got %q", want, got)
	}
}

// TestBacklogOnlyRange exercises the BacklogOnly option for ranged watchers,
// showcasing how it is expected to be used for keeping up with the external
// state of a range by synchronizing it into a local map.
func TestBacklogOnlyRange(t *testing.T) {
	tc := newTestClient(t)
	defer tc.close()
	ctx, ctxC := context.WithCancel(context.Background())
	defer ctxC()

	ks := "test-backlog-only-range/"
	ke := "test-backlog-only-range0"

	for i := 0; i < 100; i++ {
		if i%2 == 0 {
			tc.put(t, ks+"a", fmt.Sprintf("val-%d", i))
		} else {
			tc.put(t, ks+"b", fmt.Sprintf("val-%d", i))
		}
	}

	value := NewValue(tc.namespaced, ks, stringAtDecoder, Range(ke))
	w := value.Watch()
	defer w.Close()

	// Collect results into a map from key to value.
	res := make(map[string]string)

	// Run first Get - this is the barrier defining what's part of the backlog.
	g, err := w.Get(ctx, BacklogOnly)
	if err != nil {
		t.Fatalf("Get: %v", err)
	}
	kv := g.(stringAt)
	res[kv.key] = kv.value

	// These won't be part of the backlog.
	tc.put(t, ks+"a", "val-100")
	tc.put(t, ks+"b", "val-101")

	// Retrieve the rest of the backlog until BacklogDone is returned.
	nUpdates := 1
	for {
		g, err := w.Get(ctx, BacklogOnly)
		if err == BacklogDone {
			break
		}
		if err != nil {
			t.Fatalf("Get: %v", err)
		}
		nUpdates += 1
		kv := g.(stringAt)
		res[kv.key] = kv.value
	}

	// The backlog should've been compacted to just two entries at their newest
	// state.
	if want, got := 2, nUpdates; want != got {
		t.Fatalf("wanted backlog in %d updates, got it in %d", want, got)
	}

	for _, te := range []struct {
		k, w string
	}{
		{ks + "a", "val-98"},
		{ks + "b", "val-99"},
	} {
		if want, got := te.w, res[te.k]; want != got {
			t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got)
		}
	}
}