blob: 2f5e283ac96e25a33059cc5ae79b34818e10d192 [file] [log] [blame]
Serge Bazanskic89df2f2021-04-27 15:51:37 +02001package etcd
2
3import (
4 "context"
5 "errors"
Lorenz Brund13c1c62022-03-30 19:58:58 +02006 "flag"
Serge Bazanskic89df2f2021-04-27 15:51:37 +02007 "fmt"
8 "log"
9 "os"
10 "strconv"
11 "sync"
12 "testing"
13 "time"
14
Lorenz Brund13c1c62022-03-30 19:58:58 +020015 "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
16 "go.etcd.io/etcd/client/pkg/v3/testutil"
17 clientv3 "go.etcd.io/etcd/client/v3"
18 "go.etcd.io/etcd/tests/v3/integration"
Serge Bazanskic89df2f2021-04-27 15:51:37 +020019 "google.golang.org/grpc/codes"
20
21 "source.monogon.dev/metropolis/node/core/consensus/client"
22 "source.monogon.dev/metropolis/pkg/event"
23)
24
25var (
26 cluster *integration.ClusterV3
27 endpoints []string
28)
29
30// TestMain brings up a 3 node etcd cluster for tests to use.
31func TestMain(m *testing.M) {
32 cfg := integration.ClusterConfig{
33 Size: 3,
34 GRPCKeepAliveMinTime: time.Millisecond,
35 }
Lorenz Brund13c1c62022-03-30 19:58:58 +020036 tb, cancel := testutil.NewTestingTBProthesis("curator")
37 defer cancel()
38 flag.Parse()
Serge Bazanskidefff522022-05-16 17:28:16 +020039 integration.BeforeTestExternal(tb)
Lorenz Brund13c1c62022-03-30 19:58:58 +020040 cluster = integration.NewClusterV3(tb, &cfg)
Serge Bazanskic89df2f2021-04-27 15:51:37 +020041 endpoints = make([]string, 3)
42 for i := range endpoints {
43 endpoints[i] = cluster.Client(i).Endpoints()[0]
44 }
45
46 v := m.Run()
Lorenz Brund13c1c62022-03-30 19:58:58 +020047 cluster.Terminate(tb)
Serge Bazanskic89df2f2021-04-27 15:51:37 +020048 os.Exit(v)
49}
50
51// setRaceWg creates a new WaitGroup and sets the given watcher to wait on this
52// WG after it performs the initial retrieval of a value from etcd, but before
53// it starts the watcher. This is used to test potential race conditions
54// present between these two steps.
55func setRaceWg(w event.Watcher) *sync.WaitGroup {
56 wg := sync.WaitGroup{}
57 w.(*watcher).testRaceWG = &wg
58 return &wg
59}
60
61// setSetupWg creates a new WaitGroup and sets the given watcher to wait on
62// thie WG after an etcd watch channel is created. This is used in tests to
63// ensure that the watcher is fully created before it is tested.
64func setSetupWg(w event.Watcher) *sync.WaitGroup {
65 wg := sync.WaitGroup{}
66 w.(*watcher).testSetupWG = &wg
67 return &wg
68}
69
70// testClient is an etcd connection to the test cluster.
71type testClient struct {
72 client *clientv3.Client
73 namespaced client.Namespaced
74}
75
76func newTestClient(t *testing.T) *testClient {
77 t.Helper()
78 cli, err := clientv3.New(clientv3.Config{
79 Endpoints: endpoints,
80 DialTimeout: 1 * time.Second,
81 DialKeepAliveTime: 1 * time.Second,
82 DialKeepAliveTimeout: 1 * time.Second,
83 })
84 if err != nil {
85 t.Fatalf("clientv3.New: %v", err)
86 }
87
88 namespaced := client.NewLocal(cli)
89 return &testClient{
90 client: cli,
91 namespaced: namespaced,
92 }
93}
94
95func (d *testClient) close() {
96 d.client.Close()
97}
98
99// setEndpoints configures which endpoints (from {0,1,2}) the testClient is
100// connected to.
101func (d *testClient) setEndpoints(nums ...uint) {
102 var eps []string
103 for _, num := range nums {
104 eps = append(eps, endpoints[num])
105 }
106 d.client.SetEndpoints(eps...)
107}
108
109// put uses the testClient to store key with a given string value in etcd. It
110// contains retry logic that will block until the put is successful.
111func (d *testClient) put(t *testing.T, key, value string) {
112 t.Helper()
113 ctx, ctxC := context.WithCancel(context.Background())
114 defer ctxC()
115
116 for {
117 ctxT, ctxC := context.WithTimeout(ctx, 100*time.Millisecond)
118 _, err := d.namespaced.Put(ctxT, key, value)
119 ctxC()
120 if err == nil {
121 return
122 }
123 if err == ctxT.Err() {
124 log.Printf("Retrying after %v", err)
125 continue
126 }
127 // Retry on etcd unavailability - this will happen in this code as the
128 // etcd cluster repeatedly loses quorum.
129 var eerr rpctypes.EtcdError
130 if errors.As(err, &eerr) && eerr.Code() == codes.Unavailable {
131 log.Printf("Retrying after %v", err)
132 continue
133 }
134 t.Fatalf("Put: %v", err)
135 }
136
137}
138
139// remove uses the testClient to remove the given key from etcd. It contains
140// retry logic that will block until the removal is successful.
141func (d *testClient) remove(t *testing.T, key string) {
142 t.Helper()
143 ctx, ctxC := context.WithCancel(context.Background())
144 defer ctxC()
145
146 _, err := d.namespaced.Delete(ctx, key)
147 if err == nil {
148 return
149 }
150 t.Fatalf("Delete: %v", err)
151}
152
153// expect runs a Get on the given Watcher, ensuring the returned value is a
154// given string.
155func expect(t *testing.T, w event.Watcher, value string) {
156 t.Helper()
157 ctx, ctxC := context.WithCancel(context.Background())
158 defer ctxC()
159
160 got, err := w.Get(ctx)
161 if err != nil {
162 t.Fatalf("Get: %v", err)
163 }
164
165 if got, want := string(got.([]byte)), value; got != want {
166 t.Errorf("Got value %q, wanted %q", want, got)
167 }
168}
169
170// expectTimeout ensures that the given watcher blocks on a Get call for at
171// least 100 milliseconds. This is used by tests to attempt to verify that the
172// watcher Get is fully blocked, but can cause false positives (eg. when Get
173// blocks for 101 milliseconds). Thus, this function should be used sparingly
174// and in tests that perform other baseline behaviour checks alongside this
175// test.
176func expectTimeout(t *testing.T, w event.Watcher) {
177 t.Helper()
178 ctx, ctxC := context.WithTimeout(context.Background(), 100*time.Millisecond)
179 got, err := w.Get(ctx)
180 ctxC()
181
182 if !errors.Is(err, ctx.Err()) {
183 t.Fatalf("Expected timeout error, got %v, %v", got, err)
184 }
185}
186
187// wait wraps a watcher into a channel of strings, ensuring that the watcher
188// never errors on Get calls and always returns strings.
189func wait(t *testing.T, w event.Watcher) (chan string, func()) {
190 t.Helper()
191 ctx, ctxC := context.WithCancel(context.Background())
192
193 c := make(chan string)
194
195 go func() {
196 for {
197 got, err := w.Get(ctx)
198 if err != nil && errors.Is(err, ctx.Err()) {
199 return
200 }
201 if err != nil {
202 t.Fatalf("Get: %v", err)
203 }
204 c <- string(got.([]byte))
205 }
206 }()
207
208 return c, ctxC
209}
210
211// TestSimple exercises the simplest possible interaction with a watched value.
212func TestSimple(t *testing.T) {
213 tc := newTestClient(t)
214 defer tc.close()
215
216 k := "test-simple"
217 value := NewValue(tc.namespaced, k, NoDecoder)
218 tc.put(t, k, "one")
219
220 watcher := value.Watch()
221 defer watcher.Close()
222 expect(t, watcher, "one")
223
224 tc.put(t, k, "two")
225 expect(t, watcher, "two")
226
227 tc.put(t, k, "three")
228 tc.put(t, k, "four")
229 tc.put(t, k, "five")
230 tc.put(t, k, "six")
231
232 q, cancel := wait(t, watcher)
233 // Test will hang here if the above value does not receive the set "six".
234 for el := range q {
235 if el == "six" {
236 break
237 }
238 }
239 cancel()
240}
241
// stringAt is a helper type for testing ranged watchers. It's returned by a
// watcher whose decoder is set to stringAtDecoder.
type stringAt struct {
	// key is the etcd key of the entry.
	key string
	// value is the entry's value as a string.
	value string
}
247
248func stringAtDecoder(key, value []byte) (interface{}, error) {
249 valueS := ""
250 if value != nil {
251 valueS = string(value)
252 }
253 return stringAt{
254 key: string(key),
255 value: valueS,
256 }, nil
257}
258
259// stringAtGet performs a Get from a Watcher, expecting a stringAt and updating
260// the given map with the retrieved value.
261func stringAtGet(ctx context.Context, t *testing.T, w event.Watcher, m map[string]string) {
262 t.Helper()
263
264 vr, err := w.Get(ctx)
265 if err != nil {
266 t.Fatalf("Get: %v", err)
267 }
268 v := vr.(stringAt)
269 m[v.key] = v.value
270}
271
272// TestSimpleRange exercises the simplest behaviour of a ranged watcher,
273// retrieving updaates via Get in a fully blocking fashion.
274func TestSimpleRange(t *testing.T) {
275 tc := newTestClient(t)
276 defer tc.close()
277
278 ks := "test-simple-range/"
279 ke := "test-simple-range0"
280 value := NewValue(tc.namespaced, ks, stringAtDecoder, Range(ke))
281 tc.put(t, ks+"a", "one")
282 tc.put(t, ks+"b", "two")
283 tc.put(t, ks+"c", "three")
284 tc.put(t, ks+"b", "four")
285
286 w := value.Watch()
287 defer w.Close()
288
289 ctx, ctxC := context.WithCancel(context.Background())
290 defer ctxC()
291
292 res := make(map[string]string)
293 stringAtGet(ctx, t, w, res)
294 stringAtGet(ctx, t, w, res)
295 stringAtGet(ctx, t, w, res)
296
297 tc.put(t, ks+"a", "five")
298 tc.put(t, ks+"e", "six")
299
300 stringAtGet(ctx, t, w, res)
301 stringAtGet(ctx, t, w, res)
302
303 for _, te := range []struct {
304 k, w string
305 }{
306 {ks + "a", "five"},
307 {ks + "b", "four"},
308 {ks + "c", "three"},
309 {ks + "e", "six"},
310 } {
311 if want, got := te.w, res[te.k]; want != got {
312 t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got)
313 }
314 }
315}
316
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200317// TestCancel ensures that watchers can resume after being canceled.
318func TestCancel(t *testing.T) {
319 tc := newTestClient(t)
320 defer tc.close()
321
322 k := "test-cancel"
323 value := NewValue(tc.namespaced, k, NoDecoder)
324 tc.put(t, k, "one")
325
326 watcher := value.Watch()
327 defer watcher.Close()
328 expect(t, watcher, "one")
329
330 ctx, ctxC := context.WithCancel(context.Background())
331 errs := make(chan error, 1)
332 go func() {
333 _, err := watcher.Get(ctx)
334 errs <- err
335 }()
336 ctxC()
337 if want, got := ctx.Err(), <-errs; !errors.Is(got, want) {
338 t.Fatalf("Wanted err %v, got %v", want, got)
339 }
340
341 // Successfully canceled watch, resuming should continue to work.
342 q, cancel := wait(t, watcher)
343 defer cancel()
344
345 tc.put(t, k, "two")
346 if want, got := "two", <-q; want != got {
347 t.Fatalf("Wanted val %q, got %q", want, got)
348 }
349}
350
351// TestCancelOnGet ensures that a context cancellation on an initial Get (which
352// translates to an etcd Get in a backoff loop) doesn't block.
353func TestCancelOnGet(t *testing.T) {
354 tc := newTestClient(t)
355 defer tc.close()
356
357 k := "test-cancel-on-get"
358 value := NewValue(tc.namespaced, k, NoDecoder)
359 watcher := value.Watch()
360 tc.put(t, k, "one")
361
362 // Cause partition between client endpoint and rest of cluster. Any read/write
363 // operations will now hang.
364 tc.setEndpoints(0)
365 cluster.Members[0].InjectPartition(t, cluster.Members[1], cluster.Members[2])
Serge Bazanskiad10ece2022-05-17 11:20:17 +0200366 // Let raft timeouts expire so that the leader is aware a partition has occurred
367 // and stops serving data if it is not part of a quorum anymore.
368 //
369 // Otherwise, if Member[0] was the leader, there will be a window of opportunity
370 // during which it will continue to serve read data even though it has been
371 // partitioned off. This is an effect of how etcd handles linearizable reads:
372 // they go through the leader, but do not go through raft.
373 //
374 // The value is the default etcd leader timeout (1s) + some wiggle room.
375 time.Sleep(time.Second + time.Millisecond*100)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200376
377 // Perform the initial Get(), which should attempt to retrieve a KV entry from
378 // the etcd service. This should hang. Unfortunately, there's no easy way to do
379 // this without an arbitrary sleep hoping that the client actually gets to the
380 // underlying etcd.Get call. This can cause false positives (eg. false 'pass'
381 // results) in this test.
382 ctx, ctxC := context.WithCancel(context.Background())
383 errs := make(chan error, 1)
384 go func() {
385 _, err := watcher.Get(ctx)
386 errs <- err
387 }()
388 time.Sleep(time.Second)
389
390 // Now that the etcd.Get is hanging, cancel the context.
391 ctxC()
392 // And now unpartition the cluster, resuming reads.
393 cluster.Members[0].RecoverPartition(t, cluster.Members[1], cluster.Members[2])
394
395 // The etcd.Get() call should've returned with a context cancellation.
396 err := <-errs
397 switch {
398 case err == nil:
399 t.Errorf("watcher.Get() returned no error, wanted context error")
400 case errors.Is(err, ctx.Err()):
401 // Okay.
402 default:
403 t.Errorf("watcher.Get() returned %v, wanted context error", err)
404 }
405}
406
407// TestClientReconnect forces a 'reconnection' of an active watcher from a
408// running member to another member, by stopping the original member and
409// explicitly reconnecting the client to other available members.
410//
411// This doe not reflect a situation expected during Metropolis runtime, as we
412// do not expect splits between an etcd client and its connected member
413// (instead, all etcd clients only connect to their local member). However, it
414// is still an important safety test to perform, and it also exercies the
415// equivalent behaviour of an etcd client re-connecting for any other reason.
416func TestClientReconnect(t *testing.T) {
417 tc := newTestClient(t)
418 defer tc.close()
419 tc.setEndpoints(0)
420
421 k := "test-client-reconnect"
422 value := NewValue(tc.namespaced, k, NoDecoder)
423 tc.put(t, k, "one")
424
425 watcher := value.Watch()
426 defer watcher.Close()
427 expect(t, watcher, "one")
428
429 q, cancel := wait(t, watcher)
430 defer cancel()
431
432 cluster.Members[0].Stop(t)
433 defer cluster.Members[0].Restart(t)
434 cluster.WaitLeader(t)
435
436 tc.setEndpoints(1, 2)
437 tc.put(t, k, "two")
438
439 if want, got := "two", <-q; want != got {
440 t.Fatalf("Watcher received incorrect data after client restart, wanted %q, got %q", want, got)
441 }
442}
443
444// TestClientPartition forces a temporary partition of the etcd member while a
445// watcher is running, updates the value from across the partition, and undoes
446// the partition.
447// The partition is expected to be entirely transparent to the watcher.
448func TestClientPartition(t *testing.T) {
449 tcOne := newTestClient(t)
450 defer tcOne.close()
451 tcOne.setEndpoints(0)
452
453 tcRest := newTestClient(t)
454 defer tcRest.close()
455 tcRest.setEndpoints(1, 2)
456
457 k := "test-client-partition"
458 valueOne := NewValue(tcOne.namespaced, k, NoDecoder)
459 watcherOne := valueOne.Watch()
460 defer watcherOne.Close()
461 valueRest := NewValue(tcRest.namespaced, k, NoDecoder)
462 watcherRest := valueRest.Watch()
463 defer watcherRest.Close()
464
465 tcRest.put(t, k, "a")
466 expect(t, watcherOne, "a")
467 expect(t, watcherRest, "a")
468
469 cluster.Members[0].InjectPartition(t, cluster.Members[1], cluster.Members[2])
470
471 tcRest.put(t, k, "b")
472 expect(t, watcherRest, "b")
473 expectTimeout(t, watcherOne)
474
475 cluster.Members[0].RecoverPartition(t, cluster.Members[1], cluster.Members[2])
476
477 expect(t, watcherOne, "b")
478 tcRest.put(t, k, "c")
479 expect(t, watcherOne, "c")
480 expect(t, watcherRest, "c")
481
482}
483
484// TestEarlyUse exercises the correct behaviour of the value watcher on a value
485// that is not yet set.
486func TestEarlyUse(t *testing.T) {
487 tc := newTestClient(t)
488 defer tc.close()
489
490 k := "test-early-use"
491
492 value := NewValue(tc.namespaced, k, NoDecoder)
493 watcher := value.Watch()
494 defer watcher.Close()
495
496 wg := setSetupWg(watcher)
497 wg.Add(1)
498 q, cancel := wait(t, watcher)
499 defer cancel()
500
501 wg.Done()
502
503 tc.put(t, k, "one")
504
505 if want, got := "one", <-q; want != got {
506 t.Fatalf("Expected %q, got %q", want, got)
507 }
508}
509
510// TestRemove exercises the basic functionality of handling deleted values.
511func TestRemove(t *testing.T) {
512 tc := newTestClient(t)
513 defer tc.close()
514
515 k := "test-remove"
516 tc.put(t, k, "one")
517
518 value := NewValue(tc.namespaced, k, NoDecoder)
519 watcher := value.Watch()
520 defer watcher.Close()
521
522 expect(t, watcher, "one")
523 tc.remove(t, k)
524 expect(t, watcher, "")
525}
526
Serge Bazanski8d45a052021-10-18 17:24:24 +0200527// TestRemoveRange exercises the behaviour of a Get on a ranged watcher when a
528// value is removed.
529func TestRemoveRange(t *testing.T) {
530 tc := newTestClient(t)
531 defer tc.close()
532
533 ks := "test-remove-range/"
534 ke := "test-remove-range0"
535 value := NewValue(tc.namespaced, ks, stringAtDecoder, Range(ke))
536 tc.put(t, ks+"a", "one")
537 tc.put(t, ks+"b", "two")
538 tc.put(t, ks+"c", "three")
539 tc.put(t, ks+"b", "four")
540 tc.remove(t, ks+"c")
541
542 w := value.Watch()
543 defer w.Close()
544
545 ctx, ctxC := context.WithCancel(context.Background())
546 defer ctxC()
547
548 res := make(map[string]string)
549 stringAtGet(ctx, t, w, res)
550 stringAtGet(ctx, t, w, res)
551
552 for _, te := range []struct {
553 k, w string
554 }{
555 {ks + "a", "one"},
556 {ks + "b", "four"},
557 {ks + "c", ""},
558 } {
559 if want, got := te.w, res[te.k]; want != got {
560 t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got)
561 }
562 }
563}
564
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200565// TestEmptyRace forces the watcher to retrieve an empty value from the K/V
566// store at first, and establishing the watch channel after a new value has
567// been stored in the same place.
568func TestEmptyRace(t *testing.T) {
569 tc := newTestClient(t)
570 defer tc.close()
571
572 k := "test-remove-race"
573 tc.put(t, k, "one")
574 tc.remove(t, k)
575
576 value := NewValue(tc.namespaced, k, NoDecoder)
577 watcher := value.Watch()
578 defer watcher.Close()
579
580 wg := setRaceWg(watcher)
581 wg.Add(1)
582 q, cancel := wait(t, watcher)
583 defer cancel()
584
585 tc.put(t, k, "two")
586 wg.Done()
587
588 if want, got := "two", <-q; want != got {
589 t.Fatalf("Watcher received incorrect data after client restart, wanted %q, got %q", want, got)
590 }
591}
592
// errOrInt is the result of a single watcher Get as streamed by TestDecoder:
// either a successfully decoded integer or the error returned by Get.
type errOrInt struct {
	// val is the decoded value. It is left at zero when err is set.
	val int64
	// err is the error returned by Get, or nil on success.
	err error
}
597
598// TestDecoder exercises the BytesDecoder functionality of the watcher, by
599// creating a value with a decoder that only accepts string-encoded integers
600// that are divisible by three. The test then proceeds to put a handful of
601// values into etcd, ensuring that undecodable values correctly return an error
602// on Get, but that the watcher continues to work after the error has been
603// returned.
604func TestDecoder(t *testing.T) {
Serge Bazanski8d45a052021-10-18 17:24:24 +0200605 decodeStringifiedNumbersDivisibleBy3 := func(_, data []byte) (interface{}, error) {
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200606 num, err := strconv.ParseInt(string(data), 10, 64)
607 if err != nil {
608 return nil, fmt.Errorf("not a valid number")
609 }
610 if (num % 3) != 0 {
611 return nil, fmt.Errorf("not divisible by 3")
612 }
613 return num, nil
614 }
615
616 tc := newTestClient(t)
617 defer tc.close()
618
619 ctx, ctxC := context.WithCancel(context.Background())
620 defer ctxC()
621
622 k := "test-decoder"
623 value := NewValue(tc.namespaced, k, decodeStringifiedNumbersDivisibleBy3)
624 watcher := value.Watch()
625 defer watcher.Close()
626 tc.put(t, k, "3")
627 _, err := watcher.Get(ctx)
628 if err != nil {
629 t.Fatalf("Initial Get: %v", err)
630 }
631
632 // Stream updates into arbitrarily-bounded test channel.
633 queue := make(chan errOrInt, 100)
634 go func() {
635 for {
636 res, err := watcher.Get(ctx)
637 if err != nil && errors.Is(err, ctx.Err()) {
638 return
639 }
640 if err != nil {
641 queue <- errOrInt{
642 err: err,
643 }
644 } else {
645 queue <- errOrInt{
646 val: res.(int64),
647 }
648 }
649 }
650 }()
651
652 var wantList []*int64
653 wantError := func(val string) {
654 wantList = append(wantList, nil)
655 tc.put(t, k, val)
656 }
657 wantValue := func(val string, decoded int64) {
658 wantList = append(wantList, &decoded)
659 tc.put(t, k, val)
660 }
661
662 wantError("")
663 wantValue("9", 9)
664 wantError("foo")
665 wantValue("18", 18)
666 wantError("10")
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200667 wantValue("27", 27)
668 wantValue("36", 36)
669
670 for i, want := range wantList {
671 q := <-queue
672 if want == nil && q.err == nil {
Serge Bazanski832bc772022-04-07 12:33:01 +0200673 t.Fatalf("Case %d: wanted error, got no error and value %d", i, q.val)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200674 }
675 if want != nil && (*want) != q.val {
Serge Bazanski832bc772022-04-07 12:33:01 +0200676 t.Fatalf("Case %d: wanted value %d, got error %v and value %d", i, *want, q.err, q.val)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200677 }
678 }
679}
680
681// TestBacklog ensures that the watcher can handle a large backlog of changes
682// in etcd that the client didnt' keep up with, and that whatever final state
683// is available to the client when it actually gets to calling Get().
684func TestBacklog(t *testing.T) {
685 tc := newTestClient(t)
686 defer tc.close()
687
688 k := "test-backlog"
689 value := NewValue(tc.namespaced, k, NoDecoder)
690 watcher := value.Watch()
691 defer watcher.Close()
692
693 tc.put(t, k, "initial")
694 expect(t, watcher, "initial")
695
696 for i := 0; i < 1000; i++ {
697 tc.put(t, k, fmt.Sprintf("val-%d", i))
698 }
699
700 ctx, ctxC := context.WithTimeout(context.Background(), time.Second)
701 defer ctxC()
702 for {
703 valB, err := watcher.Get(ctx)
704 if err != nil {
705 t.Fatalf("Get() returned error before expected final value: %v", err)
706 }
707 val := string(valB.([]byte))
708 if val == "val-999" {
709 break
710 }
711 }
712}
Serge Bazanski8d45a052021-10-18 17:24:24 +0200713
714// TestBacklogRange ensures that the ranged etcd watcher can handle a large
715// backlog of changes in etcd that the client didn't keep up with.
716func TestBacklogRange(t *testing.T) {
717 tc := newTestClient(t)
718 defer tc.close()
719
720 ks := "test-backlog-range/"
721 ke := "test-backlog-range0"
722 value := NewValue(tc.namespaced, ks, stringAtDecoder, Range(ke))
723 w := value.Watch()
724 defer w.Close()
725
726 for i := 0; i < 100; i++ {
727 if i%2 == 0 {
728 tc.put(t, ks+"a", fmt.Sprintf("val-%d", i))
729 } else {
730 tc.put(t, ks+"b", fmt.Sprintf("val-%d", i))
731 }
732 }
733
734 ctx, ctxC := context.WithCancel(context.Background())
735 defer ctxC()
736
737 res := make(map[string]string)
738 stringAtGet(ctx, t, w, res)
739 stringAtGet(ctx, t, w, res)
740
741 for _, te := range []struct {
742 k, w string
743 }{
744 {ks + "a", "val-98"},
745 {ks + "b", "val-99"},
746 } {
747 if want, got := te.w, res[te.k]; want != got {
748 t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got)
749 }
750 }
751}
752
753// TestBacklogOnly exercises the BacklogOnly option for non-ranged watchers,
754// which effectively makes any Get operation non-blocking (but also showcases
755// that unless a Get without BacklogOnly is issues, no new data will appear by
756// itself in the watcher - which is an undocumented implementation detail of the
757// option).
758func TestBacklogOnly(t *testing.T) {
759 tc := newTestClient(t)
760 defer tc.close()
761 ctx, ctxC := context.WithCancel(context.Background())
762 defer ctxC()
763
764 k := "test-backlog-only"
765 tc.put(t, k, "initial")
766
767 value := NewValue(tc.namespaced, k, NoDecoder)
768 watcher := value.Watch()
769 defer watcher.Close()
770
771 d, err := watcher.Get(ctx, BacklogOnly)
772 if err != nil {
773 t.Fatalf("First Get failed: %v", err)
774 }
775 if want, got := "initial", string(d.([]byte)); want != got {
776 t.Fatalf("First Get: wanted value %q, got %q", want, got)
777 }
778
779 // As expected, next call to Get with BacklogOnly fails - there truly is no new
780 // updates to emit.
781 _, err = watcher.Get(ctx, BacklogOnly)
782 if want, got := BacklogDone, err; want != got {
783 t.Fatalf("Second Get: wanted %v, got %v", want, got)
784 }
785
786 // Implementation detail: even though there is a new value ('second'),
787 // BacklogOnly will still return BacklogDone.
788 tc.put(t, k, "second")
789 _, err = watcher.Get(ctx, BacklogOnly)
790 if want, got := BacklogDone, err; want != got {
791 t.Fatalf("Third Get: wanted %v, got %v", want, got)
792 }
793
794 // ... However, a Get without BacklogOnly will return the new value.
795 d, err = watcher.Get(ctx)
796 if err != nil {
797 t.Fatalf("Fourth Get failed: %v", err)
798 }
799 if want, got := "second", string(d.([]byte)); want != got {
800 t.Fatalf("Fourth Get: wanted value %q, got %q", want, got)
801 }
802}
803
804// TestBacklogOnlyRange exercises the BacklogOnly option for ranged watchers,
805// showcasing how it expected to be used for keeping up with the external state
806// of a range by synchronizing to a local map.
807func TestBacklogOnlyRange(t *testing.T) {
808 tc := newTestClient(t)
809 defer tc.close()
810 ctx, ctxC := context.WithCancel(context.Background())
811 defer ctxC()
812
813 ks := "test-backlog-only-range/"
814 ke := "test-backlog-only-range0"
815
816 for i := 0; i < 100; i++ {
817 if i%2 == 0 {
818 tc.put(t, ks+"a", fmt.Sprintf("val-%d", i))
819 } else {
820 tc.put(t, ks+"b", fmt.Sprintf("val-%d", i))
821 }
822 }
823
824 value := NewValue(tc.namespaced, ks, stringAtDecoder, Range(ke))
825 w := value.Watch()
826 defer w.Close()
827
828 // Collect results into a map from key to value.
829 res := make(map[string]string)
830
831 // Run first Get - this is the barrier defining what's part of the backlog.
832 g, err := w.Get(ctx, BacklogOnly)
833 if err != nil {
834 t.Fatalf("Get: %v", err)
835 }
836 kv := g.(stringAt)
837 res[kv.key] = kv.value
838
839 // These won't be part of the backlog.
840 tc.put(t, ks+"a", fmt.Sprintf("val-100"))
841 tc.put(t, ks+"b", fmt.Sprintf("val-101"))
842
843 // Retrieve the rest of the backlog until BacklogDone is returned.
844 nUpdates := 1
845 for {
846 g, err := w.Get(ctx, BacklogOnly)
847 if err == BacklogDone {
848 break
849 }
850 if err != nil {
851 t.Fatalf("Get: %v", err)
852 }
853 nUpdates += 1
854 kv := g.(stringAt)
855 res[kv.key] = kv.value
856 }
857
858 // The backlog should've been compacted to just two entries at their newest
859 // state.
860 if want, got := 2, nUpdates; want != got {
861 t.Fatalf("wanted backlog in %d updates, got it in %d", want, got)
862 }
863
864 for _, te := range []struct {
865 k, w string
866 }{
867 {ks + "a", "val-98"},
868 {ks + "b", "val-99"},
869 } {
870 if want, got := te.w, res[te.k]; want != got {
871 t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got)
872 }
873 }
874}