blob: 81aee5175882970bd68b3b40f48830df78befadc [file] [log] [blame]
Serge Bazanskic89df2f2021-04-27 15:51:37 +02001package etcd
2
3import (
4 "context"
5 "errors"
Lorenz Brund13c1c62022-03-30 19:58:58 +02006 "flag"
Serge Bazanskic89df2f2021-04-27 15:51:37 +02007 "fmt"
8 "log"
9 "os"
10 "strconv"
11 "sync"
12 "testing"
13 "time"
14
Lorenz Brund13c1c62022-03-30 19:58:58 +020015 "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
16 "go.etcd.io/etcd/client/pkg/v3/testutil"
17 clientv3 "go.etcd.io/etcd/client/v3"
18 "go.etcd.io/etcd/tests/v3/integration"
Serge Bazanskic89df2f2021-04-27 15:51:37 +020019 "google.golang.org/grpc/codes"
20
21 "source.monogon.dev/metropolis/node/core/consensus/client"
22 "source.monogon.dev/metropolis/pkg/event"
23)
24
25var (
26 cluster *integration.ClusterV3
27 endpoints []string
28)
29
30// TestMain brings up a 3 node etcd cluster for tests to use.
31func TestMain(m *testing.M) {
32 cfg := integration.ClusterConfig{
33 Size: 3,
34 GRPCKeepAliveMinTime: time.Millisecond,
35 }
Lorenz Brund13c1c62022-03-30 19:58:58 +020036 tb, cancel := testutil.NewTestingTBProthesis("curator")
37 defer cancel()
38 flag.Parse()
Serge Bazanskidefff522022-05-16 17:28:16 +020039 integration.BeforeTestExternal(tb)
Lorenz Brund13c1c62022-03-30 19:58:58 +020040 cluster = integration.NewClusterV3(tb, &cfg)
Serge Bazanskic89df2f2021-04-27 15:51:37 +020041 endpoints = make([]string, 3)
42 for i := range endpoints {
43 endpoints[i] = cluster.Client(i).Endpoints()[0]
44 }
45
46 v := m.Run()
Lorenz Brund13c1c62022-03-30 19:58:58 +020047 cluster.Terminate(tb)
Serge Bazanskic89df2f2021-04-27 15:51:37 +020048 os.Exit(v)
49}
50
51// setRaceWg creates a new WaitGroup and sets the given watcher to wait on this
52// WG after it performs the initial retrieval of a value from etcd, but before
53// it starts the watcher. This is used to test potential race conditions
54// present between these two steps.
Serge Bazanski37110c32023-03-01 13:57:27 +000055func setRaceWg[T any](w event.Watcher[T]) *sync.WaitGroup {
Serge Bazanskic89df2f2021-04-27 15:51:37 +020056 wg := sync.WaitGroup{}
Serge Bazanski37110c32023-03-01 13:57:27 +000057 w.(*watcher[T]).testRaceWG = &wg
Serge Bazanskic89df2f2021-04-27 15:51:37 +020058 return &wg
59}
60
61// setSetupWg creates a new WaitGroup and sets the given watcher to wait on
62// thie WG after an etcd watch channel is created. This is used in tests to
63// ensure that the watcher is fully created before it is tested.
Serge Bazanski37110c32023-03-01 13:57:27 +000064func setSetupWg[T any](w event.Watcher[T]) *sync.WaitGroup {
Serge Bazanskic89df2f2021-04-27 15:51:37 +020065 wg := sync.WaitGroup{}
Serge Bazanski37110c32023-03-01 13:57:27 +000066 w.(*watcher[T]).testSetupWG = &wg
Serge Bazanskic89df2f2021-04-27 15:51:37 +020067 return &wg
68}
69
70// testClient is an etcd connection to the test cluster.
71type testClient struct {
72 client *clientv3.Client
73 namespaced client.Namespaced
74}
75
76func newTestClient(t *testing.T) *testClient {
77 t.Helper()
78 cli, err := clientv3.New(clientv3.Config{
79 Endpoints: endpoints,
80 DialTimeout: 1 * time.Second,
81 DialKeepAliveTime: 1 * time.Second,
82 DialKeepAliveTimeout: 1 * time.Second,
83 })
84 if err != nil {
85 t.Fatalf("clientv3.New: %v", err)
86 }
87
88 namespaced := client.NewLocal(cli)
89 return &testClient{
90 client: cli,
91 namespaced: namespaced,
92 }
93}
94
95func (d *testClient) close() {
96 d.client.Close()
97}
98
99// setEndpoints configures which endpoints (from {0,1,2}) the testClient is
100// connected to.
101func (d *testClient) setEndpoints(nums ...uint) {
102 var eps []string
103 for _, num := range nums {
104 eps = append(eps, endpoints[num])
105 }
106 d.client.SetEndpoints(eps...)
107}
108
109// put uses the testClient to store key with a given string value in etcd. It
110// contains retry logic that will block until the put is successful.
111func (d *testClient) put(t *testing.T, key, value string) {
112 t.Helper()
113 ctx, ctxC := context.WithCancel(context.Background())
114 defer ctxC()
115
116 for {
117 ctxT, ctxC := context.WithTimeout(ctx, 100*time.Millisecond)
118 _, err := d.namespaced.Put(ctxT, key, value)
119 ctxC()
120 if err == nil {
121 return
122 }
123 if err == ctxT.Err() {
124 log.Printf("Retrying after %v", err)
125 continue
126 }
127 // Retry on etcd unavailability - this will happen in this code as the
128 // etcd cluster repeatedly loses quorum.
129 var eerr rpctypes.EtcdError
130 if errors.As(err, &eerr) && eerr.Code() == codes.Unavailable {
131 log.Printf("Retrying after %v", err)
132 continue
133 }
134 t.Fatalf("Put: %v", err)
135 }
136
137}
138
139// remove uses the testClient to remove the given key from etcd. It contains
140// retry logic that will block until the removal is successful.
141func (d *testClient) remove(t *testing.T, key string) {
142 t.Helper()
143 ctx, ctxC := context.WithCancel(context.Background())
144 defer ctxC()
145
146 _, err := d.namespaced.Delete(ctx, key)
147 if err == nil {
148 return
149 }
150 t.Fatalf("Delete: %v", err)
151}
152
153// expect runs a Get on the given Watcher, ensuring the returned value is a
154// given string.
Serge Bazanski37110c32023-03-01 13:57:27 +0000155func expect(t *testing.T, w event.Watcher[StringAt], value string) {
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200156 t.Helper()
157 ctx, ctxC := context.WithCancel(context.Background())
158 defer ctxC()
159
160 got, err := w.Get(ctx)
161 if err != nil {
162 t.Fatalf("Get: %v", err)
163 }
164
Serge Bazanski37110c32023-03-01 13:57:27 +0000165 if got, want := got.Value, value; got != want {
Serge Bazanskibbb873d2022-06-24 14:22:39 +0200166 t.Errorf("Wanted value %q, got %q", want, got)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200167 }
168}
169
170// expectTimeout ensures that the given watcher blocks on a Get call for at
171// least 100 milliseconds. This is used by tests to attempt to verify that the
172// watcher Get is fully blocked, but can cause false positives (eg. when Get
173// blocks for 101 milliseconds). Thus, this function should be used sparingly
174// and in tests that perform other baseline behaviour checks alongside this
175// test.
Serge Bazanski37110c32023-03-01 13:57:27 +0000176func expectTimeout[T any](t *testing.T, w event.Watcher[T]) {
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200177 t.Helper()
178 ctx, ctxC := context.WithTimeout(context.Background(), 100*time.Millisecond)
179 got, err := w.Get(ctx)
180 ctxC()
181
182 if !errors.Is(err, ctx.Err()) {
183 t.Fatalf("Expected timeout error, got %v, %v", got, err)
184 }
185}
186
187// wait wraps a watcher into a channel of strings, ensuring that the watcher
188// never errors on Get calls and always returns strings.
Serge Bazanski37110c32023-03-01 13:57:27 +0000189func wait(t *testing.T, w event.Watcher[StringAt]) (chan string, func()) {
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200190 t.Helper()
191 ctx, ctxC := context.WithCancel(context.Background())
192
193 c := make(chan string)
194
195 go func() {
196 for {
197 got, err := w.Get(ctx)
198 if err != nil && errors.Is(err, ctx.Err()) {
199 return
200 }
201 if err != nil {
202 t.Fatalf("Get: %v", err)
203 }
Serge Bazanski37110c32023-03-01 13:57:27 +0000204 c <- got.Value
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200205 }
206 }()
207
208 return c, ctxC
209}
210
211// TestSimple exercises the simplest possible interaction with a watched value.
212func TestSimple(t *testing.T) {
213 tc := newTestClient(t)
214 defer tc.close()
215
216 k := "test-simple"
Serge Bazanski37110c32023-03-01 13:57:27 +0000217 value := NewValue(tc.namespaced, k, DecoderStringAt)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200218 tc.put(t, k, "one")
219
220 watcher := value.Watch()
221 defer watcher.Close()
222 expect(t, watcher, "one")
223
224 tc.put(t, k, "two")
225 expect(t, watcher, "two")
226
227 tc.put(t, k, "three")
228 tc.put(t, k, "four")
229 tc.put(t, k, "five")
230 tc.put(t, k, "six")
231
232 q, cancel := wait(t, watcher)
233 // Test will hang here if the above value does not receive the set "six".
Serge Bazanski37110c32023-03-01 13:57:27 +0000234 log.Printf("a")
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200235 for el := range q {
Serge Bazanski37110c32023-03-01 13:57:27 +0000236 log.Printf("%q", el)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200237 if el == "six" {
238 break
239 }
240 }
Serge Bazanski37110c32023-03-01 13:57:27 +0000241 log.Printf("b")
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200242 cancel()
243}
244
Serge Bazanski8d45a052021-10-18 17:24:24 +0200245// stringAtGet performs a Get from a Watcher, expecting a stringAt and updating
246// the given map with the retrieved value.
Serge Bazanski37110c32023-03-01 13:57:27 +0000247func stringAtGet(ctx context.Context, t *testing.T, w event.Watcher[StringAt], m map[string]string) {
Serge Bazanski8d45a052021-10-18 17:24:24 +0200248 t.Helper()
249
250 vr, err := w.Get(ctx)
251 if err != nil {
252 t.Fatalf("Get: %v", err)
253 }
Serge Bazanski37110c32023-03-01 13:57:27 +0000254 m[vr.Key] = vr.Value
Serge Bazanski8d45a052021-10-18 17:24:24 +0200255}
256
257// TestSimpleRange exercises the simplest behaviour of a ranged watcher,
258// retrieving updaates via Get in a fully blocking fashion.
259func TestSimpleRange(t *testing.T) {
260 tc := newTestClient(t)
261 defer tc.close()
262
263 ks := "test-simple-range/"
264 ke := "test-simple-range0"
Serge Bazanski37110c32023-03-01 13:57:27 +0000265 value := NewValue(tc.namespaced, ks, DecoderStringAt, Range(ke))
Serge Bazanski8d45a052021-10-18 17:24:24 +0200266 tc.put(t, ks+"a", "one")
267 tc.put(t, ks+"b", "two")
268 tc.put(t, ks+"c", "three")
269 tc.put(t, ks+"b", "four")
270
271 w := value.Watch()
272 defer w.Close()
273
274 ctx, ctxC := context.WithCancel(context.Background())
275 defer ctxC()
276
277 res := make(map[string]string)
278 stringAtGet(ctx, t, w, res)
279 stringAtGet(ctx, t, w, res)
280 stringAtGet(ctx, t, w, res)
281
282 tc.put(t, ks+"a", "five")
283 tc.put(t, ks+"e", "six")
284
285 stringAtGet(ctx, t, w, res)
286 stringAtGet(ctx, t, w, res)
287
288 for _, te := range []struct {
289 k, w string
290 }{
291 {ks + "a", "five"},
292 {ks + "b", "four"},
293 {ks + "c", "three"},
294 {ks + "e", "six"},
295 } {
296 if want, got := te.w, res[te.k]; want != got {
297 t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got)
298 }
299 }
300}
301
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200302// TestCancel ensures that watchers can resume after being canceled.
303func TestCancel(t *testing.T) {
304 tc := newTestClient(t)
305 defer tc.close()
306
307 k := "test-cancel"
Serge Bazanski37110c32023-03-01 13:57:27 +0000308 value := NewValue(tc.namespaced, k, DecoderStringAt)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200309 tc.put(t, k, "one")
310
311 watcher := value.Watch()
312 defer watcher.Close()
313 expect(t, watcher, "one")
314
315 ctx, ctxC := context.WithCancel(context.Background())
316 errs := make(chan error, 1)
317 go func() {
318 _, err := watcher.Get(ctx)
319 errs <- err
320 }()
321 ctxC()
322 if want, got := ctx.Err(), <-errs; !errors.Is(got, want) {
323 t.Fatalf("Wanted err %v, got %v", want, got)
324 }
325
326 // Successfully canceled watch, resuming should continue to work.
327 q, cancel := wait(t, watcher)
328 defer cancel()
329
330 tc.put(t, k, "two")
331 if want, got := "two", <-q; want != got {
332 t.Fatalf("Wanted val %q, got %q", want, got)
333 }
334}
335
336// TestCancelOnGet ensures that a context cancellation on an initial Get (which
337// translates to an etcd Get in a backoff loop) doesn't block.
338func TestCancelOnGet(t *testing.T) {
339 tc := newTestClient(t)
340 defer tc.close()
341
342 k := "test-cancel-on-get"
Serge Bazanski37110c32023-03-01 13:57:27 +0000343 value := NewValue(tc.namespaced, k, DecoderStringAt)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200344 watcher := value.Watch()
345 tc.put(t, k, "one")
346
347 // Cause partition between client endpoint and rest of cluster. Any read/write
348 // operations will now hang.
349 tc.setEndpoints(0)
350 cluster.Members[0].InjectPartition(t, cluster.Members[1], cluster.Members[2])
Serge Bazanskiad10ece2022-05-17 11:20:17 +0200351 // Let raft timeouts expire so that the leader is aware a partition has occurred
352 // and stops serving data if it is not part of a quorum anymore.
353 //
354 // Otherwise, if Member[0] was the leader, there will be a window of opportunity
355 // during which it will continue to serve read data even though it has been
356 // partitioned off. This is an effect of how etcd handles linearizable reads:
357 // they go through the leader, but do not go through raft.
358 //
359 // The value is the default etcd leader timeout (1s) + some wiggle room.
360 time.Sleep(time.Second + time.Millisecond*100)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200361
362 // Perform the initial Get(), which should attempt to retrieve a KV entry from
363 // the etcd service. This should hang. Unfortunately, there's no easy way to do
364 // this without an arbitrary sleep hoping that the client actually gets to the
365 // underlying etcd.Get call. This can cause false positives (eg. false 'pass'
366 // results) in this test.
367 ctx, ctxC := context.WithCancel(context.Background())
368 errs := make(chan error, 1)
369 go func() {
370 _, err := watcher.Get(ctx)
371 errs <- err
372 }()
373 time.Sleep(time.Second)
374
375 // Now that the etcd.Get is hanging, cancel the context.
376 ctxC()
377 // And now unpartition the cluster, resuming reads.
378 cluster.Members[0].RecoverPartition(t, cluster.Members[1], cluster.Members[2])
379
380 // The etcd.Get() call should've returned with a context cancellation.
381 err := <-errs
382 switch {
383 case err == nil:
384 t.Errorf("watcher.Get() returned no error, wanted context error")
385 case errors.Is(err, ctx.Err()):
386 // Okay.
387 default:
388 t.Errorf("watcher.Get() returned %v, wanted context error", err)
389 }
390}
391
392// TestClientReconnect forces a 'reconnection' of an active watcher from a
393// running member to another member, by stopping the original member and
394// explicitly reconnecting the client to other available members.
395//
396// This doe not reflect a situation expected during Metropolis runtime, as we
397// do not expect splits between an etcd client and its connected member
398// (instead, all etcd clients only connect to their local member). However, it
399// is still an important safety test to perform, and it also exercies the
400// equivalent behaviour of an etcd client re-connecting for any other reason.
401func TestClientReconnect(t *testing.T) {
402 tc := newTestClient(t)
403 defer tc.close()
404 tc.setEndpoints(0)
405
406 k := "test-client-reconnect"
Serge Bazanski37110c32023-03-01 13:57:27 +0000407 value := NewValue(tc.namespaced, k, DecoderStringAt)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200408 tc.put(t, k, "one")
409
410 watcher := value.Watch()
411 defer watcher.Close()
412 expect(t, watcher, "one")
413
414 q, cancel := wait(t, watcher)
415 defer cancel()
416
417 cluster.Members[0].Stop(t)
418 defer cluster.Members[0].Restart(t)
419 cluster.WaitLeader(t)
420
421 tc.setEndpoints(1, 2)
422 tc.put(t, k, "two")
423
424 if want, got := "two", <-q; want != got {
425 t.Fatalf("Watcher received incorrect data after client restart, wanted %q, got %q", want, got)
426 }
427}
428
429// TestClientPartition forces a temporary partition of the etcd member while a
430// watcher is running, updates the value from across the partition, and undoes
431// the partition.
432// The partition is expected to be entirely transparent to the watcher.
433func TestClientPartition(t *testing.T) {
434 tcOne := newTestClient(t)
435 defer tcOne.close()
436 tcOne.setEndpoints(0)
437
438 tcRest := newTestClient(t)
439 defer tcRest.close()
440 tcRest.setEndpoints(1, 2)
441
442 k := "test-client-partition"
Serge Bazanski37110c32023-03-01 13:57:27 +0000443 valueOne := NewValue(tcOne.namespaced, k, DecoderStringAt)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200444 watcherOne := valueOne.Watch()
445 defer watcherOne.Close()
Serge Bazanski37110c32023-03-01 13:57:27 +0000446 valueRest := NewValue(tcRest.namespaced, k, DecoderStringAt)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200447 watcherRest := valueRest.Watch()
448 defer watcherRest.Close()
449
450 tcRest.put(t, k, "a")
451 expect(t, watcherOne, "a")
452 expect(t, watcherRest, "a")
453
454 cluster.Members[0].InjectPartition(t, cluster.Members[1], cluster.Members[2])
455
456 tcRest.put(t, k, "b")
457 expect(t, watcherRest, "b")
458 expectTimeout(t, watcherOne)
459
460 cluster.Members[0].RecoverPartition(t, cluster.Members[1], cluster.Members[2])
461
462 expect(t, watcherOne, "b")
463 tcRest.put(t, k, "c")
464 expect(t, watcherOne, "c")
465 expect(t, watcherRest, "c")
466
467}
468
469// TestEarlyUse exercises the correct behaviour of the value watcher on a value
470// that is not yet set.
471func TestEarlyUse(t *testing.T) {
472 tc := newTestClient(t)
473 defer tc.close()
474
475 k := "test-early-use"
476
Serge Bazanski37110c32023-03-01 13:57:27 +0000477 value := NewValue(tc.namespaced, k, DecoderStringAt)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200478 watcher := value.Watch()
479 defer watcher.Close()
480
481 wg := setSetupWg(watcher)
482 wg.Add(1)
483 q, cancel := wait(t, watcher)
484 defer cancel()
485
486 wg.Done()
487
488 tc.put(t, k, "one")
489
490 if want, got := "one", <-q; want != got {
491 t.Fatalf("Expected %q, got %q", want, got)
492 }
493}
494
495// TestRemove exercises the basic functionality of handling deleted values.
496func TestRemove(t *testing.T) {
497 tc := newTestClient(t)
498 defer tc.close()
499
500 k := "test-remove"
501 tc.put(t, k, "one")
502
Serge Bazanski37110c32023-03-01 13:57:27 +0000503 value := NewValue(tc.namespaced, k, DecoderStringAt)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200504 watcher := value.Watch()
505 defer watcher.Close()
506
507 expect(t, watcher, "one")
508 tc.remove(t, k)
509 expect(t, watcher, "")
510}
511
Serge Bazanski8d45a052021-10-18 17:24:24 +0200512// TestRemoveRange exercises the behaviour of a Get on a ranged watcher when a
513// value is removed.
514func TestRemoveRange(t *testing.T) {
515 tc := newTestClient(t)
516 defer tc.close()
517
518 ks := "test-remove-range/"
519 ke := "test-remove-range0"
Serge Bazanski37110c32023-03-01 13:57:27 +0000520 value := NewValue(tc.namespaced, ks, DecoderStringAt, Range(ke))
Serge Bazanski8d45a052021-10-18 17:24:24 +0200521 tc.put(t, ks+"a", "one")
522 tc.put(t, ks+"b", "two")
523 tc.put(t, ks+"c", "three")
524 tc.put(t, ks+"b", "four")
525 tc.remove(t, ks+"c")
526
527 w := value.Watch()
528 defer w.Close()
529
530 ctx, ctxC := context.WithCancel(context.Background())
531 defer ctxC()
532
533 res := make(map[string]string)
534 stringAtGet(ctx, t, w, res)
535 stringAtGet(ctx, t, w, res)
536
537 for _, te := range []struct {
538 k, w string
539 }{
540 {ks + "a", "one"},
541 {ks + "b", "four"},
542 {ks + "c", ""},
543 } {
544 if want, got := te.w, res[te.k]; want != got {
545 t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got)
546 }
547 }
548}
549
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200550// TestEmptyRace forces the watcher to retrieve an empty value from the K/V
551// store at first, and establishing the watch channel after a new value has
552// been stored in the same place.
553func TestEmptyRace(t *testing.T) {
554 tc := newTestClient(t)
555 defer tc.close()
556
557 k := "test-remove-race"
558 tc.put(t, k, "one")
559 tc.remove(t, k)
560
Serge Bazanski37110c32023-03-01 13:57:27 +0000561 value := NewValue(tc.namespaced, k, DecoderStringAt)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200562 watcher := value.Watch()
563 defer watcher.Close()
564
565 wg := setRaceWg(watcher)
566 wg.Add(1)
567 q, cancel := wait(t, watcher)
568 defer cancel()
569
570 tc.put(t, k, "two")
571 wg.Done()
572
573 if want, got := "two", <-q; want != got {
574 t.Fatalf("Watcher received incorrect data after client restart, wanted %q, got %q", want, got)
575 }
576}
577
// errOrInt is one result of a Get on a decoding watcher, as collected by
// TestDecoder: either a decoded value or the error that Get returned.
type errOrInt struct {
	// val is the decoded value. TestDecoder leaves it zero when err is set.
	val int64
	// err is the error returned by Get, if any.
	err error
}
582
583// TestDecoder exercises the BytesDecoder functionality of the watcher, by
584// creating a value with a decoder that only accepts string-encoded integers
585// that are divisible by three. The test then proceeds to put a handful of
586// values into etcd, ensuring that undecodable values correctly return an error
587// on Get, but that the watcher continues to work after the error has been
588// returned.
589func TestDecoder(t *testing.T) {
Serge Bazanski37110c32023-03-01 13:57:27 +0000590 decoderDivisibleByThree := func(_, value []byte) (int64, error) {
591 num, err := strconv.ParseInt(string(value), 10, 64)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200592 if err != nil {
Serge Bazanski37110c32023-03-01 13:57:27 +0000593 return 0, fmt.Errorf("not a valid number")
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200594 }
595 if (num % 3) != 0 {
Serge Bazanski37110c32023-03-01 13:57:27 +0000596 return 0, fmt.Errorf("not divisible by 3")
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200597 }
598 return num, nil
599 }
600
601 tc := newTestClient(t)
602 defer tc.close()
603
604 ctx, ctxC := context.WithCancel(context.Background())
605 defer ctxC()
606
607 k := "test-decoder"
Serge Bazanski37110c32023-03-01 13:57:27 +0000608 value := NewValue(tc.namespaced, k, decoderDivisibleByThree)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200609 watcher := value.Watch()
610 defer watcher.Close()
611 tc.put(t, k, "3")
612 _, err := watcher.Get(ctx)
613 if err != nil {
614 t.Fatalf("Initial Get: %v", err)
615 }
616
617 // Stream updates into arbitrarily-bounded test channel.
618 queue := make(chan errOrInt, 100)
619 go func() {
620 for {
621 res, err := watcher.Get(ctx)
622 if err != nil && errors.Is(err, ctx.Err()) {
623 return
624 }
625 if err != nil {
626 queue <- errOrInt{
627 err: err,
628 }
629 } else {
630 queue <- errOrInt{
Serge Bazanski37110c32023-03-01 13:57:27 +0000631 val: res,
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200632 }
633 }
634 }
635 }()
636
637 var wantList []*int64
638 wantError := func(val string) {
639 wantList = append(wantList, nil)
640 tc.put(t, k, val)
641 }
642 wantValue := func(val string, decoded int64) {
643 wantList = append(wantList, &decoded)
644 tc.put(t, k, val)
645 }
646
647 wantError("")
648 wantValue("9", 9)
649 wantError("foo")
650 wantValue("18", 18)
651 wantError("10")
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200652 wantValue("27", 27)
653 wantValue("36", 36)
654
655 for i, want := range wantList {
656 q := <-queue
657 if want == nil && q.err == nil {
Serge Bazanski832bc772022-04-07 12:33:01 +0200658 t.Fatalf("Case %d: wanted error, got no error and value %d", i, q.val)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200659 }
660 if want != nil && (*want) != q.val {
Serge Bazanski832bc772022-04-07 12:33:01 +0200661 t.Fatalf("Case %d: wanted value %d, got error %v and value %d", i, *want, q.err, q.val)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200662 }
663 }
664}
665
666// TestBacklog ensures that the watcher can handle a large backlog of changes
667// in etcd that the client didnt' keep up with, and that whatever final state
668// is available to the client when it actually gets to calling Get().
669func TestBacklog(t *testing.T) {
670 tc := newTestClient(t)
671 defer tc.close()
672
673 k := "test-backlog"
Serge Bazanski37110c32023-03-01 13:57:27 +0000674 value := NewValue(tc.namespaced, k, DecoderStringAt)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200675 watcher := value.Watch()
676 defer watcher.Close()
677
678 tc.put(t, k, "initial")
679 expect(t, watcher, "initial")
680
681 for i := 0; i < 1000; i++ {
682 tc.put(t, k, fmt.Sprintf("val-%d", i))
683 }
684
685 ctx, ctxC := context.WithTimeout(context.Background(), time.Second)
686 defer ctxC()
687 for {
688 valB, err := watcher.Get(ctx)
689 if err != nil {
690 t.Fatalf("Get() returned error before expected final value: %v", err)
691 }
Serge Bazanski37110c32023-03-01 13:57:27 +0000692 if valB.Value == "val-999" {
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200693 break
694 }
695 }
696}
Serge Bazanski8d45a052021-10-18 17:24:24 +0200697
698// TestBacklogRange ensures that the ranged etcd watcher can handle a large
699// backlog of changes in etcd that the client didn't keep up with.
700func TestBacklogRange(t *testing.T) {
701 tc := newTestClient(t)
702 defer tc.close()
703
704 ks := "test-backlog-range/"
705 ke := "test-backlog-range0"
Serge Bazanski37110c32023-03-01 13:57:27 +0000706 value := NewValue(tc.namespaced, ks, DecoderStringAt, Range(ke))
Serge Bazanski8d45a052021-10-18 17:24:24 +0200707 w := value.Watch()
708 defer w.Close()
709
710 for i := 0; i < 100; i++ {
711 if i%2 == 0 {
712 tc.put(t, ks+"a", fmt.Sprintf("val-%d", i))
713 } else {
714 tc.put(t, ks+"b", fmt.Sprintf("val-%d", i))
715 }
716 }
717
718 ctx, ctxC := context.WithCancel(context.Background())
719 defer ctxC()
720
721 res := make(map[string]string)
722 stringAtGet(ctx, t, w, res)
723 stringAtGet(ctx, t, w, res)
724
725 for _, te := range []struct {
726 k, w string
727 }{
728 {ks + "a", "val-98"},
729 {ks + "b", "val-99"},
730 } {
731 if want, got := te.w, res[te.k]; want != got {
732 t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got)
733 }
734 }
735}
736
737// TestBacklogOnly exercises the BacklogOnly option for non-ranged watchers,
738// which effectively makes any Get operation non-blocking (but also showcases
739// that unless a Get without BacklogOnly is issues, no new data will appear by
740// itself in the watcher - which is an undocumented implementation detail of the
741// option).
742func TestBacklogOnly(t *testing.T) {
743 tc := newTestClient(t)
744 defer tc.close()
745 ctx, ctxC := context.WithCancel(context.Background())
746 defer ctxC()
747
748 k := "test-backlog-only"
749 tc.put(t, k, "initial")
750
Serge Bazanski37110c32023-03-01 13:57:27 +0000751 value := NewValue(tc.namespaced, k, DecoderStringAt)
Serge Bazanski8d45a052021-10-18 17:24:24 +0200752 watcher := value.Watch()
753 defer watcher.Close()
754
Serge Bazanski37110c32023-03-01 13:57:27 +0000755 d, err := watcher.Get(ctx, event.BacklogOnly[StringAt]())
Serge Bazanski8d45a052021-10-18 17:24:24 +0200756 if err != nil {
757 t.Fatalf("First Get failed: %v", err)
758 }
Serge Bazanski37110c32023-03-01 13:57:27 +0000759 if want, got := "initial", d.Value; want != got {
Serge Bazanski8d45a052021-10-18 17:24:24 +0200760 t.Fatalf("First Get: wanted value %q, got %q", want, got)
761 }
762
763 // As expected, next call to Get with BacklogOnly fails - there truly is no new
764 // updates to emit.
Serge Bazanski37110c32023-03-01 13:57:27 +0000765 _, err = watcher.Get(ctx, event.BacklogOnly[StringAt]())
766 if want, got := event.BacklogDone, err; want != got {
Serge Bazanski8d45a052021-10-18 17:24:24 +0200767 t.Fatalf("Second Get: wanted %v, got %v", want, got)
768 }
769
770 // Implementation detail: even though there is a new value ('second'),
771 // BacklogOnly will still return BacklogDone.
772 tc.put(t, k, "second")
Serge Bazanski37110c32023-03-01 13:57:27 +0000773 _, err = watcher.Get(ctx, event.BacklogOnly[StringAt]())
774 if want, got := event.BacklogDone, err; want != got {
Serge Bazanski8d45a052021-10-18 17:24:24 +0200775 t.Fatalf("Third Get: wanted %v, got %v", want, got)
776 }
777
778 // ... However, a Get without BacklogOnly will return the new value.
779 d, err = watcher.Get(ctx)
780 if err != nil {
781 t.Fatalf("Fourth Get failed: %v", err)
782 }
Serge Bazanski37110c32023-03-01 13:57:27 +0000783 if want, got := "second", d.Value; want != got {
Serge Bazanski8d45a052021-10-18 17:24:24 +0200784 t.Fatalf("Fourth Get: wanted value %q, got %q", want, got)
785 }
786}
787
788// TestBacklogOnlyRange exercises the BacklogOnly option for ranged watchers,
789// showcasing how it expected to be used for keeping up with the external state
790// of a range by synchronizing to a local map.
791func TestBacklogOnlyRange(t *testing.T) {
792 tc := newTestClient(t)
793 defer tc.close()
794 ctx, ctxC := context.WithCancel(context.Background())
795 defer ctxC()
796
797 ks := "test-backlog-only-range/"
798 ke := "test-backlog-only-range0"
799
800 for i := 0; i < 100; i++ {
801 if i%2 == 0 {
802 tc.put(t, ks+"a", fmt.Sprintf("val-%d", i))
803 } else {
804 tc.put(t, ks+"b", fmt.Sprintf("val-%d", i))
805 }
806 }
807
Serge Bazanski37110c32023-03-01 13:57:27 +0000808 value := NewValue(tc.namespaced, ks, DecoderStringAt, Range(ke))
Serge Bazanski8d45a052021-10-18 17:24:24 +0200809 w := value.Watch()
810 defer w.Close()
811
812 // Collect results into a map from key to value.
813 res := make(map[string]string)
814
815 // Run first Get - this is the barrier defining what's part of the backlog.
Serge Bazanski37110c32023-03-01 13:57:27 +0000816 g, err := w.Get(ctx, event.BacklogOnly[StringAt]())
Serge Bazanski8d45a052021-10-18 17:24:24 +0200817 if err != nil {
818 t.Fatalf("Get: %v", err)
819 }
Serge Bazanski37110c32023-03-01 13:57:27 +0000820 res[g.Key] = g.Value
Serge Bazanski8d45a052021-10-18 17:24:24 +0200821
822 // These won't be part of the backlog.
823 tc.put(t, ks+"a", fmt.Sprintf("val-100"))
824 tc.put(t, ks+"b", fmt.Sprintf("val-101"))
825
826 // Retrieve the rest of the backlog until BacklogDone is returned.
827 nUpdates := 1
828 for {
Serge Bazanski37110c32023-03-01 13:57:27 +0000829 g, err := w.Get(ctx, event.BacklogOnly[StringAt]())
830 if err == event.BacklogDone {
Serge Bazanski8d45a052021-10-18 17:24:24 +0200831 break
832 }
833 if err != nil {
834 t.Fatalf("Get: %v", err)
835 }
836 nUpdates += 1
Serge Bazanski37110c32023-03-01 13:57:27 +0000837 res[g.Key] = g.Value
Serge Bazanski8d45a052021-10-18 17:24:24 +0200838 }
839
840 // The backlog should've been compacted to just two entries at their newest
841 // state.
842 if want, got := 2, nUpdates; want != got {
843 t.Fatalf("wanted backlog in %d updates, got it in %d", want, got)
844 }
845
846 for _, te := range []struct {
847 k, w string
848 }{
849 {ks + "a", "val-98"},
850 {ks + "b", "val-99"},
851 } {
852 if want, got := te.w, res[te.k]; want != got {
853 t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got)
854 }
855 }
856}