blob: f95fa0b4926915adce2736189fe20bad123d5db4 [file] [log] [blame]
Serge Bazanskic89df2f2021-04-27 15:51:37 +02001package etcd
2
3import (
4 "context"
5 "errors"
Lorenz Brund13c1c62022-03-30 19:58:58 +02006 "flag"
Serge Bazanskic89df2f2021-04-27 15:51:37 +02007 "fmt"
8 "log"
9 "os"
10 "strconv"
11 "sync"
12 "testing"
13 "time"
14
Lorenz Brund13c1c62022-03-30 19:58:58 +020015 "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
16 "go.etcd.io/etcd/client/pkg/v3/testutil"
17 clientv3 "go.etcd.io/etcd/client/v3"
18 "go.etcd.io/etcd/tests/v3/integration"
Serge Bazanskic89df2f2021-04-27 15:51:37 +020019 "google.golang.org/grpc/codes"
20
21 "source.monogon.dev/metropolis/node/core/consensus/client"
22 "source.monogon.dev/metropolis/pkg/event"
23)
24
25var (
26 cluster *integration.ClusterV3
27 endpoints []string
28)
29
30// TestMain brings up a 3 node etcd cluster for tests to use.
31func TestMain(m *testing.M) {
32 cfg := integration.ClusterConfig{
33 Size: 3,
34 GRPCKeepAliveMinTime: time.Millisecond,
35 }
Lorenz Brund13c1c62022-03-30 19:58:58 +020036 tb, cancel := testutil.NewTestingTBProthesis("curator")
37 defer cancel()
38 flag.Parse()
Serge Bazanskidefff522022-05-16 17:28:16 +020039 integration.BeforeTestExternal(tb)
Lorenz Brund13c1c62022-03-30 19:58:58 +020040 cluster = integration.NewClusterV3(tb, &cfg)
Serge Bazanskic89df2f2021-04-27 15:51:37 +020041 endpoints = make([]string, 3)
42 for i := range endpoints {
43 endpoints[i] = cluster.Client(i).Endpoints()[0]
44 }
45
46 v := m.Run()
Lorenz Brund13c1c62022-03-30 19:58:58 +020047 cluster.Terminate(tb)
Serge Bazanskic89df2f2021-04-27 15:51:37 +020048 os.Exit(v)
49}
50
51// setRaceWg creates a new WaitGroup and sets the given watcher to wait on this
52// WG after it performs the initial retrieval of a value from etcd, but before
53// it starts the watcher. This is used to test potential race conditions
54// present between these two steps.
55func setRaceWg(w event.Watcher) *sync.WaitGroup {
56 wg := sync.WaitGroup{}
57 w.(*watcher).testRaceWG = &wg
58 return &wg
59}
60
61// setSetupWg creates a new WaitGroup and sets the given watcher to wait on
62// thie WG after an etcd watch channel is created. This is used in tests to
63// ensure that the watcher is fully created before it is tested.
64func setSetupWg(w event.Watcher) *sync.WaitGroup {
65 wg := sync.WaitGroup{}
66 w.(*watcher).testSetupWG = &wg
67 return &wg
68}
69
70// testClient is an etcd connection to the test cluster.
71type testClient struct {
72 client *clientv3.Client
73 namespaced client.Namespaced
74}
75
76func newTestClient(t *testing.T) *testClient {
77 t.Helper()
78 cli, err := clientv3.New(clientv3.Config{
79 Endpoints: endpoints,
80 DialTimeout: 1 * time.Second,
81 DialKeepAliveTime: 1 * time.Second,
82 DialKeepAliveTimeout: 1 * time.Second,
83 })
84 if err != nil {
85 t.Fatalf("clientv3.New: %v", err)
86 }
87
88 namespaced := client.NewLocal(cli)
89 return &testClient{
90 client: cli,
91 namespaced: namespaced,
92 }
93}
94
95func (d *testClient) close() {
96 d.client.Close()
97}
98
99// setEndpoints configures which endpoints (from {0,1,2}) the testClient is
100// connected to.
101func (d *testClient) setEndpoints(nums ...uint) {
102 var eps []string
103 for _, num := range nums {
104 eps = append(eps, endpoints[num])
105 }
106 d.client.SetEndpoints(eps...)
107}
108
109// put uses the testClient to store key with a given string value in etcd. It
110// contains retry logic that will block until the put is successful.
111func (d *testClient) put(t *testing.T, key, value string) {
112 t.Helper()
113 ctx, ctxC := context.WithCancel(context.Background())
114 defer ctxC()
115
116 for {
117 ctxT, ctxC := context.WithTimeout(ctx, 100*time.Millisecond)
118 _, err := d.namespaced.Put(ctxT, key, value)
119 ctxC()
120 if err == nil {
121 return
122 }
123 if err == ctxT.Err() {
124 log.Printf("Retrying after %v", err)
125 continue
126 }
127 // Retry on etcd unavailability - this will happen in this code as the
128 // etcd cluster repeatedly loses quorum.
129 var eerr rpctypes.EtcdError
130 if errors.As(err, &eerr) && eerr.Code() == codes.Unavailable {
131 log.Printf("Retrying after %v", err)
132 continue
133 }
134 t.Fatalf("Put: %v", err)
135 }
136
137}
138
139// remove uses the testClient to remove the given key from etcd. It contains
140// retry logic that will block until the removal is successful.
141func (d *testClient) remove(t *testing.T, key string) {
142 t.Helper()
143 ctx, ctxC := context.WithCancel(context.Background())
144 defer ctxC()
145
146 _, err := d.namespaced.Delete(ctx, key)
147 if err == nil {
148 return
149 }
150 t.Fatalf("Delete: %v", err)
151}
152
153// expect runs a Get on the given Watcher, ensuring the returned value is a
154// given string.
155func expect(t *testing.T, w event.Watcher, value string) {
156 t.Helper()
157 ctx, ctxC := context.WithCancel(context.Background())
158 defer ctxC()
159
160 got, err := w.Get(ctx)
161 if err != nil {
162 t.Fatalf("Get: %v", err)
163 }
164
165 if got, want := string(got.([]byte)), value; got != want {
166 t.Errorf("Got value %q, wanted %q", want, got)
167 }
168}
169
170// expectTimeout ensures that the given watcher blocks on a Get call for at
171// least 100 milliseconds. This is used by tests to attempt to verify that the
172// watcher Get is fully blocked, but can cause false positives (eg. when Get
173// blocks for 101 milliseconds). Thus, this function should be used sparingly
174// and in tests that perform other baseline behaviour checks alongside this
175// test.
176func expectTimeout(t *testing.T, w event.Watcher) {
177 t.Helper()
178 ctx, ctxC := context.WithTimeout(context.Background(), 100*time.Millisecond)
179 got, err := w.Get(ctx)
180 ctxC()
181
182 if !errors.Is(err, ctx.Err()) {
183 t.Fatalf("Expected timeout error, got %v, %v", got, err)
184 }
185}
186
187// wait wraps a watcher into a channel of strings, ensuring that the watcher
188// never errors on Get calls and always returns strings.
189func wait(t *testing.T, w event.Watcher) (chan string, func()) {
190 t.Helper()
191 ctx, ctxC := context.WithCancel(context.Background())
192
193 c := make(chan string)
194
195 go func() {
196 for {
197 got, err := w.Get(ctx)
198 if err != nil && errors.Is(err, ctx.Err()) {
199 return
200 }
201 if err != nil {
202 t.Fatalf("Get: %v", err)
203 }
204 c <- string(got.([]byte))
205 }
206 }()
207
208 return c, ctxC
209}
210
211// TestSimple exercises the simplest possible interaction with a watched value.
212func TestSimple(t *testing.T) {
213 tc := newTestClient(t)
214 defer tc.close()
215
216 k := "test-simple"
217 value := NewValue(tc.namespaced, k, NoDecoder)
218 tc.put(t, k, "one")
219
220 watcher := value.Watch()
221 defer watcher.Close()
222 expect(t, watcher, "one")
223
224 tc.put(t, k, "two")
225 expect(t, watcher, "two")
226
227 tc.put(t, k, "three")
228 tc.put(t, k, "four")
229 tc.put(t, k, "five")
230 tc.put(t, k, "six")
231
232 q, cancel := wait(t, watcher)
233 // Test will hang here if the above value does not receive the set "six".
234 for el := range q {
235 if el == "six" {
236 break
237 }
238 }
239 cancel()
240}
241
Serge Bazanski8d45a052021-10-18 17:24:24 +0200242// stringAt is a helper type for testing ranged watchers. It's returned by a
243// watcher whose decoder is set to stringDecoder.
244type stringAt struct {
245 key, value string
246}
247
248func stringAtDecoder(key, value []byte) (interface{}, error) {
249 valueS := ""
250 if value != nil {
251 valueS = string(value)
252 }
253 return stringAt{
254 key: string(key),
255 value: valueS,
256 }, nil
257}
258
259// stringAtGet performs a Get from a Watcher, expecting a stringAt and updating
260// the given map with the retrieved value.
261func stringAtGet(ctx context.Context, t *testing.T, w event.Watcher, m map[string]string) {
262 t.Helper()
263
264 vr, err := w.Get(ctx)
265 if err != nil {
266 t.Fatalf("Get: %v", err)
267 }
268 v := vr.(stringAt)
269 m[v.key] = v.value
270}
271
272// TestSimpleRange exercises the simplest behaviour of a ranged watcher,
273// retrieving updaates via Get in a fully blocking fashion.
274func TestSimpleRange(t *testing.T) {
275 tc := newTestClient(t)
276 defer tc.close()
277
278 ks := "test-simple-range/"
279 ke := "test-simple-range0"
280 value := NewValue(tc.namespaced, ks, stringAtDecoder, Range(ke))
281 tc.put(t, ks+"a", "one")
282 tc.put(t, ks+"b", "two")
283 tc.put(t, ks+"c", "three")
284 tc.put(t, ks+"b", "four")
285
286 w := value.Watch()
287 defer w.Close()
288
289 ctx, ctxC := context.WithCancel(context.Background())
290 defer ctxC()
291
292 res := make(map[string]string)
293 stringAtGet(ctx, t, w, res)
294 stringAtGet(ctx, t, w, res)
295 stringAtGet(ctx, t, w, res)
296
297 tc.put(t, ks+"a", "five")
298 tc.put(t, ks+"e", "six")
299
300 stringAtGet(ctx, t, w, res)
301 stringAtGet(ctx, t, w, res)
302
303 for _, te := range []struct {
304 k, w string
305 }{
306 {ks + "a", "five"},
307 {ks + "b", "four"},
308 {ks + "c", "three"},
309 {ks + "e", "six"},
310 } {
311 if want, got := te.w, res[te.k]; want != got {
312 t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got)
313 }
314 }
315}
316
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200317// TestCancel ensures that watchers can resume after being canceled.
318func TestCancel(t *testing.T) {
319 tc := newTestClient(t)
320 defer tc.close()
321
322 k := "test-cancel"
323 value := NewValue(tc.namespaced, k, NoDecoder)
324 tc.put(t, k, "one")
325
326 watcher := value.Watch()
327 defer watcher.Close()
328 expect(t, watcher, "one")
329
330 ctx, ctxC := context.WithCancel(context.Background())
331 errs := make(chan error, 1)
332 go func() {
333 _, err := watcher.Get(ctx)
334 errs <- err
335 }()
336 ctxC()
337 if want, got := ctx.Err(), <-errs; !errors.Is(got, want) {
338 t.Fatalf("Wanted err %v, got %v", want, got)
339 }
340
341 // Successfully canceled watch, resuming should continue to work.
342 q, cancel := wait(t, watcher)
343 defer cancel()
344
345 tc.put(t, k, "two")
346 if want, got := "two", <-q; want != got {
347 t.Fatalf("Wanted val %q, got %q", want, got)
348 }
349}
350
351// TestCancelOnGet ensures that a context cancellation on an initial Get (which
352// translates to an etcd Get in a backoff loop) doesn't block.
353func TestCancelOnGet(t *testing.T) {
354 tc := newTestClient(t)
355 defer tc.close()
356
357 k := "test-cancel-on-get"
358 value := NewValue(tc.namespaced, k, NoDecoder)
359 watcher := value.Watch()
360 tc.put(t, k, "one")
361
362 // Cause partition between client endpoint and rest of cluster. Any read/write
363 // operations will now hang.
364 tc.setEndpoints(0)
365 cluster.Members[0].InjectPartition(t, cluster.Members[1], cluster.Members[2])
366
367 // Perform the initial Get(), which should attempt to retrieve a KV entry from
368 // the etcd service. This should hang. Unfortunately, there's no easy way to do
369 // this without an arbitrary sleep hoping that the client actually gets to the
370 // underlying etcd.Get call. This can cause false positives (eg. false 'pass'
371 // results) in this test.
372 ctx, ctxC := context.WithCancel(context.Background())
373 errs := make(chan error, 1)
374 go func() {
375 _, err := watcher.Get(ctx)
376 errs <- err
377 }()
378 time.Sleep(time.Second)
379
380 // Now that the etcd.Get is hanging, cancel the context.
381 ctxC()
382 // And now unpartition the cluster, resuming reads.
383 cluster.Members[0].RecoverPartition(t, cluster.Members[1], cluster.Members[2])
384
385 // The etcd.Get() call should've returned with a context cancellation.
386 err := <-errs
387 switch {
388 case err == nil:
389 t.Errorf("watcher.Get() returned no error, wanted context error")
390 case errors.Is(err, ctx.Err()):
391 // Okay.
392 default:
393 t.Errorf("watcher.Get() returned %v, wanted context error", err)
394 }
395}
396
397// TestClientReconnect forces a 'reconnection' of an active watcher from a
398// running member to another member, by stopping the original member and
399// explicitly reconnecting the client to other available members.
400//
401// This doe not reflect a situation expected during Metropolis runtime, as we
402// do not expect splits between an etcd client and its connected member
403// (instead, all etcd clients only connect to their local member). However, it
404// is still an important safety test to perform, and it also exercies the
405// equivalent behaviour of an etcd client re-connecting for any other reason.
406func TestClientReconnect(t *testing.T) {
407 tc := newTestClient(t)
408 defer tc.close()
409 tc.setEndpoints(0)
410
411 k := "test-client-reconnect"
412 value := NewValue(tc.namespaced, k, NoDecoder)
413 tc.put(t, k, "one")
414
415 watcher := value.Watch()
416 defer watcher.Close()
417 expect(t, watcher, "one")
418
419 q, cancel := wait(t, watcher)
420 defer cancel()
421
422 cluster.Members[0].Stop(t)
423 defer cluster.Members[0].Restart(t)
424 cluster.WaitLeader(t)
425
426 tc.setEndpoints(1, 2)
427 tc.put(t, k, "two")
428
429 if want, got := "two", <-q; want != got {
430 t.Fatalf("Watcher received incorrect data after client restart, wanted %q, got %q", want, got)
431 }
432}
433
434// TestClientPartition forces a temporary partition of the etcd member while a
435// watcher is running, updates the value from across the partition, and undoes
436// the partition.
437// The partition is expected to be entirely transparent to the watcher.
438func TestClientPartition(t *testing.T) {
439 tcOne := newTestClient(t)
440 defer tcOne.close()
441 tcOne.setEndpoints(0)
442
443 tcRest := newTestClient(t)
444 defer tcRest.close()
445 tcRest.setEndpoints(1, 2)
446
447 k := "test-client-partition"
448 valueOne := NewValue(tcOne.namespaced, k, NoDecoder)
449 watcherOne := valueOne.Watch()
450 defer watcherOne.Close()
451 valueRest := NewValue(tcRest.namespaced, k, NoDecoder)
452 watcherRest := valueRest.Watch()
453 defer watcherRest.Close()
454
455 tcRest.put(t, k, "a")
456 expect(t, watcherOne, "a")
457 expect(t, watcherRest, "a")
458
459 cluster.Members[0].InjectPartition(t, cluster.Members[1], cluster.Members[2])
460
461 tcRest.put(t, k, "b")
462 expect(t, watcherRest, "b")
463 expectTimeout(t, watcherOne)
464
465 cluster.Members[0].RecoverPartition(t, cluster.Members[1], cluster.Members[2])
466
467 expect(t, watcherOne, "b")
468 tcRest.put(t, k, "c")
469 expect(t, watcherOne, "c")
470 expect(t, watcherRest, "c")
471
472}
473
474// TestEarlyUse exercises the correct behaviour of the value watcher on a value
475// that is not yet set.
476func TestEarlyUse(t *testing.T) {
477 tc := newTestClient(t)
478 defer tc.close()
479
480 k := "test-early-use"
481
482 value := NewValue(tc.namespaced, k, NoDecoder)
483 watcher := value.Watch()
484 defer watcher.Close()
485
486 wg := setSetupWg(watcher)
487 wg.Add(1)
488 q, cancel := wait(t, watcher)
489 defer cancel()
490
491 wg.Done()
492
493 tc.put(t, k, "one")
494
495 if want, got := "one", <-q; want != got {
496 t.Fatalf("Expected %q, got %q", want, got)
497 }
498}
499
500// TestRemove exercises the basic functionality of handling deleted values.
501func TestRemove(t *testing.T) {
502 tc := newTestClient(t)
503 defer tc.close()
504
505 k := "test-remove"
506 tc.put(t, k, "one")
507
508 value := NewValue(tc.namespaced, k, NoDecoder)
509 watcher := value.Watch()
510 defer watcher.Close()
511
512 expect(t, watcher, "one")
513 tc.remove(t, k)
514 expect(t, watcher, "")
515}
516
Serge Bazanski8d45a052021-10-18 17:24:24 +0200517// TestRemoveRange exercises the behaviour of a Get on a ranged watcher when a
518// value is removed.
519func TestRemoveRange(t *testing.T) {
520 tc := newTestClient(t)
521 defer tc.close()
522
523 ks := "test-remove-range/"
524 ke := "test-remove-range0"
525 value := NewValue(tc.namespaced, ks, stringAtDecoder, Range(ke))
526 tc.put(t, ks+"a", "one")
527 tc.put(t, ks+"b", "two")
528 tc.put(t, ks+"c", "three")
529 tc.put(t, ks+"b", "four")
530 tc.remove(t, ks+"c")
531
532 w := value.Watch()
533 defer w.Close()
534
535 ctx, ctxC := context.WithCancel(context.Background())
536 defer ctxC()
537
538 res := make(map[string]string)
539 stringAtGet(ctx, t, w, res)
540 stringAtGet(ctx, t, w, res)
541
542 for _, te := range []struct {
543 k, w string
544 }{
545 {ks + "a", "one"},
546 {ks + "b", "four"},
547 {ks + "c", ""},
548 } {
549 if want, got := te.w, res[te.k]; want != got {
550 t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got)
551 }
552 }
553}
554
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200555// TestEmptyRace forces the watcher to retrieve an empty value from the K/V
556// store at first, and establishing the watch channel after a new value has
557// been stored in the same place.
558func TestEmptyRace(t *testing.T) {
559 tc := newTestClient(t)
560 defer tc.close()
561
562 k := "test-remove-race"
563 tc.put(t, k, "one")
564 tc.remove(t, k)
565
566 value := NewValue(tc.namespaced, k, NoDecoder)
567 watcher := value.Watch()
568 defer watcher.Close()
569
570 wg := setRaceWg(watcher)
571 wg.Add(1)
572 q, cancel := wait(t, watcher)
573 defer cancel()
574
575 tc.put(t, k, "two")
576 wg.Done()
577
578 if want, got := "two", <-q; want != got {
579 t.Fatalf("Watcher received incorrect data after client restart, wanted %q, got %q", want, got)
580 }
581}
582
583type errOrInt struct {
584 val int64
585 err error
586}
587
588// TestDecoder exercises the BytesDecoder functionality of the watcher, by
589// creating a value with a decoder that only accepts string-encoded integers
590// that are divisible by three. The test then proceeds to put a handful of
591// values into etcd, ensuring that undecodable values correctly return an error
592// on Get, but that the watcher continues to work after the error has been
593// returned.
594func TestDecoder(t *testing.T) {
Serge Bazanski8d45a052021-10-18 17:24:24 +0200595 decodeStringifiedNumbersDivisibleBy3 := func(_, data []byte) (interface{}, error) {
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200596 num, err := strconv.ParseInt(string(data), 10, 64)
597 if err != nil {
598 return nil, fmt.Errorf("not a valid number")
599 }
600 if (num % 3) != 0 {
601 return nil, fmt.Errorf("not divisible by 3")
602 }
603 return num, nil
604 }
605
606 tc := newTestClient(t)
607 defer tc.close()
608
609 ctx, ctxC := context.WithCancel(context.Background())
610 defer ctxC()
611
612 k := "test-decoder"
613 value := NewValue(tc.namespaced, k, decodeStringifiedNumbersDivisibleBy3)
614 watcher := value.Watch()
615 defer watcher.Close()
616 tc.put(t, k, "3")
617 _, err := watcher.Get(ctx)
618 if err != nil {
619 t.Fatalf("Initial Get: %v", err)
620 }
621
622 // Stream updates into arbitrarily-bounded test channel.
623 queue := make(chan errOrInt, 100)
624 go func() {
625 for {
626 res, err := watcher.Get(ctx)
627 if err != nil && errors.Is(err, ctx.Err()) {
628 return
629 }
630 if err != nil {
631 queue <- errOrInt{
632 err: err,
633 }
634 } else {
635 queue <- errOrInt{
636 val: res.(int64),
637 }
638 }
639 }
640 }()
641
642 var wantList []*int64
643 wantError := func(val string) {
644 wantList = append(wantList, nil)
645 tc.put(t, k, val)
646 }
647 wantValue := func(val string, decoded int64) {
648 wantList = append(wantList, &decoded)
649 tc.put(t, k, val)
650 }
651
652 wantError("")
653 wantValue("9", 9)
654 wantError("foo")
655 wantValue("18", 18)
656 wantError("10")
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200657 wantValue("27", 27)
658 wantValue("36", 36)
659
660 for i, want := range wantList {
661 q := <-queue
662 if want == nil && q.err == nil {
Serge Bazanski832bc772022-04-07 12:33:01 +0200663 t.Fatalf("Case %d: wanted error, got no error and value %d", i, q.val)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200664 }
665 if want != nil && (*want) != q.val {
Serge Bazanski832bc772022-04-07 12:33:01 +0200666 t.Fatalf("Case %d: wanted value %d, got error %v and value %d", i, *want, q.err, q.val)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200667 }
668 }
669}
670
671// TestBacklog ensures that the watcher can handle a large backlog of changes
672// in etcd that the client didnt' keep up with, and that whatever final state
673// is available to the client when it actually gets to calling Get().
674func TestBacklog(t *testing.T) {
675 tc := newTestClient(t)
676 defer tc.close()
677
678 k := "test-backlog"
679 value := NewValue(tc.namespaced, k, NoDecoder)
680 watcher := value.Watch()
681 defer watcher.Close()
682
683 tc.put(t, k, "initial")
684 expect(t, watcher, "initial")
685
686 for i := 0; i < 1000; i++ {
687 tc.put(t, k, fmt.Sprintf("val-%d", i))
688 }
689
690 ctx, ctxC := context.WithTimeout(context.Background(), time.Second)
691 defer ctxC()
692 for {
693 valB, err := watcher.Get(ctx)
694 if err != nil {
695 t.Fatalf("Get() returned error before expected final value: %v", err)
696 }
697 val := string(valB.([]byte))
698 if val == "val-999" {
699 break
700 }
701 }
702}
Serge Bazanski8d45a052021-10-18 17:24:24 +0200703
704// TestBacklogRange ensures that the ranged etcd watcher can handle a large
705// backlog of changes in etcd that the client didn't keep up with.
706func TestBacklogRange(t *testing.T) {
707 tc := newTestClient(t)
708 defer tc.close()
709
710 ks := "test-backlog-range/"
711 ke := "test-backlog-range0"
712 value := NewValue(tc.namespaced, ks, stringAtDecoder, Range(ke))
713 w := value.Watch()
714 defer w.Close()
715
716 for i := 0; i < 100; i++ {
717 if i%2 == 0 {
718 tc.put(t, ks+"a", fmt.Sprintf("val-%d", i))
719 } else {
720 tc.put(t, ks+"b", fmt.Sprintf("val-%d", i))
721 }
722 }
723
724 ctx, ctxC := context.WithCancel(context.Background())
725 defer ctxC()
726
727 res := make(map[string]string)
728 stringAtGet(ctx, t, w, res)
729 stringAtGet(ctx, t, w, res)
730
731 for _, te := range []struct {
732 k, w string
733 }{
734 {ks + "a", "val-98"},
735 {ks + "b", "val-99"},
736 } {
737 if want, got := te.w, res[te.k]; want != got {
738 t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got)
739 }
740 }
741}
742
743// TestBacklogOnly exercises the BacklogOnly option for non-ranged watchers,
744// which effectively makes any Get operation non-blocking (but also showcases
745// that unless a Get without BacklogOnly is issues, no new data will appear by
746// itself in the watcher - which is an undocumented implementation detail of the
747// option).
748func TestBacklogOnly(t *testing.T) {
749 tc := newTestClient(t)
750 defer tc.close()
751 ctx, ctxC := context.WithCancel(context.Background())
752 defer ctxC()
753
754 k := "test-backlog-only"
755 tc.put(t, k, "initial")
756
757 value := NewValue(tc.namespaced, k, NoDecoder)
758 watcher := value.Watch()
759 defer watcher.Close()
760
761 d, err := watcher.Get(ctx, BacklogOnly)
762 if err != nil {
763 t.Fatalf("First Get failed: %v", err)
764 }
765 if want, got := "initial", string(d.([]byte)); want != got {
766 t.Fatalf("First Get: wanted value %q, got %q", want, got)
767 }
768
769 // As expected, next call to Get with BacklogOnly fails - there truly is no new
770 // updates to emit.
771 _, err = watcher.Get(ctx, BacklogOnly)
772 if want, got := BacklogDone, err; want != got {
773 t.Fatalf("Second Get: wanted %v, got %v", want, got)
774 }
775
776 // Implementation detail: even though there is a new value ('second'),
777 // BacklogOnly will still return BacklogDone.
778 tc.put(t, k, "second")
779 _, err = watcher.Get(ctx, BacklogOnly)
780 if want, got := BacklogDone, err; want != got {
781 t.Fatalf("Third Get: wanted %v, got %v", want, got)
782 }
783
784 // ... However, a Get without BacklogOnly will return the new value.
785 d, err = watcher.Get(ctx)
786 if err != nil {
787 t.Fatalf("Fourth Get failed: %v", err)
788 }
789 if want, got := "second", string(d.([]byte)); want != got {
790 t.Fatalf("Fourth Get: wanted value %q, got %q", want, got)
791 }
792}
793
794// TestBacklogOnlyRange exercises the BacklogOnly option for ranged watchers,
795// showcasing how it expected to be used for keeping up with the external state
796// of a range by synchronizing to a local map.
797func TestBacklogOnlyRange(t *testing.T) {
798 tc := newTestClient(t)
799 defer tc.close()
800 ctx, ctxC := context.WithCancel(context.Background())
801 defer ctxC()
802
803 ks := "test-backlog-only-range/"
804 ke := "test-backlog-only-range0"
805
806 for i := 0; i < 100; i++ {
807 if i%2 == 0 {
808 tc.put(t, ks+"a", fmt.Sprintf("val-%d", i))
809 } else {
810 tc.put(t, ks+"b", fmt.Sprintf("val-%d", i))
811 }
812 }
813
814 value := NewValue(tc.namespaced, ks, stringAtDecoder, Range(ke))
815 w := value.Watch()
816 defer w.Close()
817
818 // Collect results into a map from key to value.
819 res := make(map[string]string)
820
821 // Run first Get - this is the barrier defining what's part of the backlog.
822 g, err := w.Get(ctx, BacklogOnly)
823 if err != nil {
824 t.Fatalf("Get: %v", err)
825 }
826 kv := g.(stringAt)
827 res[kv.key] = kv.value
828
829 // These won't be part of the backlog.
830 tc.put(t, ks+"a", fmt.Sprintf("val-100"))
831 tc.put(t, ks+"b", fmt.Sprintf("val-101"))
832
833 // Retrieve the rest of the backlog until BacklogDone is returned.
834 nUpdates := 1
835 for {
836 g, err := w.Get(ctx, BacklogOnly)
837 if err == BacklogDone {
838 break
839 }
840 if err != nil {
841 t.Fatalf("Get: %v", err)
842 }
843 nUpdates += 1
844 kv := g.(stringAt)
845 res[kv.key] = kv.value
846 }
847
848 // The backlog should've been compacted to just two entries at their newest
849 // state.
850 if want, got := 2, nUpdates; want != got {
851 t.Fatalf("wanted backlog in %d updates, got it in %d", want, got)
852 }
853
854 for _, te := range []struct {
855 k, w string
856 }{
857 {ks + "a", "val-98"},
858 {ks + "b", "val-99"},
859 } {
860 if want, got := te.w, res[te.k]; want != got {
861 t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got)
862 }
863 }
864}