blob: f1eb5ed3425fd3d9c4dde9bd57b10a9a3c464431 [file] [log] [blame]
Serge Bazanskic89df2f2021-04-27 15:51:37 +02001package etcd
2
3import (
4 "context"
5 "errors"
Lorenz Brund13c1c62022-03-30 19:58:58 +02006 "flag"
Serge Bazanskic89df2f2021-04-27 15:51:37 +02007 "fmt"
8 "log"
9 "os"
10 "strconv"
11 "sync"
12 "testing"
13 "time"
14
Lorenz Brund13c1c62022-03-30 19:58:58 +020015 "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
16 "go.etcd.io/etcd/client/pkg/v3/testutil"
17 clientv3 "go.etcd.io/etcd/client/v3"
18 "go.etcd.io/etcd/tests/v3/integration"
Serge Bazanski98e05e12023-04-05 12:44:14 +020019 "go.uber.org/zap"
Serge Bazanskic89df2f2021-04-27 15:51:37 +020020 "google.golang.org/grpc/codes"
Serge Bazanski98e05e12023-04-05 12:44:14 +020021 "google.golang.org/grpc/grpclog"
Serge Bazanskic89df2f2021-04-27 15:51:37 +020022
23 "source.monogon.dev/metropolis/node/core/consensus/client"
24 "source.monogon.dev/metropolis/pkg/event"
Serge Bazanski98e05e12023-04-05 12:44:14 +020025 "source.monogon.dev/metropolis/pkg/logtree"
Serge Bazanskic89df2f2021-04-27 15:51:37 +020026)
27
28var (
29 cluster *integration.ClusterV3
30 endpoints []string
31)
32
33// TestMain brings up a 3 node etcd cluster for tests to use.
34func TestMain(m *testing.M) {
Serge Bazanski98e05e12023-04-05 12:44:14 +020035 // This logtree's data is not output anywhere.
36 lt := logtree.New()
37
Serge Bazanskic89df2f2021-04-27 15:51:37 +020038 cfg := integration.ClusterConfig{
39 Size: 3,
40 GRPCKeepAliveMinTime: time.Millisecond,
Serge Bazanski98e05e12023-04-05 12:44:14 +020041 LoggerBuilder: func(memberName string) *zap.Logger {
42 dn := logtree.DN("etcd." + memberName)
43 return logtree.Zapify(lt.MustLeveledFor(dn), zap.WarnLevel)
44 },
Serge Bazanskic89df2f2021-04-27 15:51:37 +020045 }
Lorenz Brund13c1c62022-03-30 19:58:58 +020046 tb, cancel := testutil.NewTestingTBProthesis("curator")
47 defer cancel()
48 flag.Parse()
Serge Bazanskidefff522022-05-16 17:28:16 +020049 integration.BeforeTestExternal(tb)
Serge Bazanski98e05e12023-04-05 12:44:14 +020050 grpclog.SetLoggerV2(logtree.GRPCify(lt.MustLeveledFor("grpc")))
Lorenz Brund13c1c62022-03-30 19:58:58 +020051 cluster = integration.NewClusterV3(tb, &cfg)
Serge Bazanskic89df2f2021-04-27 15:51:37 +020052 endpoints = make([]string, 3)
53 for i := range endpoints {
54 endpoints[i] = cluster.Client(i).Endpoints()[0]
55 }
56
57 v := m.Run()
Lorenz Brund13c1c62022-03-30 19:58:58 +020058 cluster.Terminate(tb)
Serge Bazanskic89df2f2021-04-27 15:51:37 +020059 os.Exit(v)
60}
61
62// setRaceWg creates a new WaitGroup and sets the given watcher to wait on this
63// WG after it performs the initial retrieval of a value from etcd, but before
64// it starts the watcher. This is used to test potential race conditions
65// present between these two steps.
Serge Bazanski37110c32023-03-01 13:57:27 +000066func setRaceWg[T any](w event.Watcher[T]) *sync.WaitGroup {
Serge Bazanskic89df2f2021-04-27 15:51:37 +020067 wg := sync.WaitGroup{}
Serge Bazanski37110c32023-03-01 13:57:27 +000068 w.(*watcher[T]).testRaceWG = &wg
Serge Bazanskic89df2f2021-04-27 15:51:37 +020069 return &wg
70}
71
72// setSetupWg creates a new WaitGroup and sets the given watcher to wait on
73// thie WG after an etcd watch channel is created. This is used in tests to
74// ensure that the watcher is fully created before it is tested.
Serge Bazanski37110c32023-03-01 13:57:27 +000075func setSetupWg[T any](w event.Watcher[T]) *sync.WaitGroup {
Serge Bazanskic89df2f2021-04-27 15:51:37 +020076 wg := sync.WaitGroup{}
Serge Bazanski37110c32023-03-01 13:57:27 +000077 w.(*watcher[T]).testSetupWG = &wg
Serge Bazanskic89df2f2021-04-27 15:51:37 +020078 return &wg
79}
80
81// testClient is an etcd connection to the test cluster.
82type testClient struct {
83 client *clientv3.Client
84 namespaced client.Namespaced
85}
86
// newTestClient connects a new etcd client to all members of the test cluster
// and wraps it into a testClient. The test fails immediately if the client
// cannot be created.
func newTestClient(t *testing.T) *testClient {
	t.Helper()

	cfg := clientv3.Config{
		Endpoints:            endpoints,
		DialTimeout:          time.Second,
		DialKeepAliveTime:    time.Second,
		DialKeepAliveTimeout: time.Second,
	}
	raw, err := clientv3.New(cfg)
	if err != nil {
		t.Fatalf("clientv3.New: %v", err)
	}

	return &testClient{
		client:     raw,
		namespaced: client.NewLocal(raw),
	}
}
105
106func (d *testClient) close() {
107 d.client.Close()
108}
109
110// setEndpoints configures which endpoints (from {0,1,2}) the testClient is
111// connected to.
112func (d *testClient) setEndpoints(nums ...uint) {
113 var eps []string
114 for _, num := range nums {
115 eps = append(eps, endpoints[num])
116 }
117 d.client.SetEndpoints(eps...)
118}
119
// put uses the testClient to store key with a given string value in etcd. It
// contains retry logic that will block until the put is successful.
//
// Each attempt is bounded by a 100ms timeout. Attempts that time out, or that
// fail because etcd is unavailable, are retried. Any other error fails the
// test.
func (d *testClient) put(t *testing.T, key, value string) {
	t.Helper()

	for {
		ctx, ctxC := context.WithTimeout(context.Background(), 100*time.Millisecond)
		_, err := d.namespaced.Put(ctx, key, value)
		ctxC()
		if err == nil {
			return
		}
		// The per-attempt deadline expired (eg. the member we're connected to
		// is partitioned off): just try again.
		if errors.Is(err, context.DeadlineExceeded) {
			t.Logf("Retrying after %v", err)
			continue
		}
		// Retry on etcd unavailability - this will happen in this code as the
		// etcd cluster repeatedly loses quorum. Such errors can be returned
		// immediately, so back off briefly instead of spinning.
		var eerr rpctypes.EtcdError
		if errors.As(err, &eerr) && eerr.Code() == codes.Unavailable {
			t.Logf("Retrying after %v", err)
			time.Sleep(10 * time.Millisecond)
			continue
		}
		t.Fatalf("Put: %v", err)
	}
}
149
150// remove uses the testClient to remove the given key from etcd. It contains
151// retry logic that will block until the removal is successful.
152func (d *testClient) remove(t *testing.T, key string) {
153 t.Helper()
154 ctx, ctxC := context.WithCancel(context.Background())
155 defer ctxC()
156
157 _, err := d.namespaced.Delete(ctx, key)
158 if err == nil {
159 return
160 }
161 t.Fatalf("Delete: %v", err)
162}
163
164// expect runs a Get on the given Watcher, ensuring the returned value is a
165// given string.
Serge Bazanski37110c32023-03-01 13:57:27 +0000166func expect(t *testing.T, w event.Watcher[StringAt], value string) {
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200167 t.Helper()
168 ctx, ctxC := context.WithCancel(context.Background())
169 defer ctxC()
170
171 got, err := w.Get(ctx)
172 if err != nil {
173 t.Fatalf("Get: %v", err)
174 }
175
Serge Bazanski37110c32023-03-01 13:57:27 +0000176 if got, want := got.Value, value; got != want {
Serge Bazanskibbb873d2022-06-24 14:22:39 +0200177 t.Errorf("Wanted value %q, got %q", want, got)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200178 }
179}
180
181// expectTimeout ensures that the given watcher blocks on a Get call for at
182// least 100 milliseconds. This is used by tests to attempt to verify that the
183// watcher Get is fully blocked, but can cause false positives (eg. when Get
184// blocks for 101 milliseconds). Thus, this function should be used sparingly
185// and in tests that perform other baseline behaviour checks alongside this
186// test.
Serge Bazanski37110c32023-03-01 13:57:27 +0000187func expectTimeout[T any](t *testing.T, w event.Watcher[T]) {
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200188 t.Helper()
189 ctx, ctxC := context.WithTimeout(context.Background(), 100*time.Millisecond)
190 got, err := w.Get(ctx)
191 ctxC()
192
193 if !errors.Is(err, ctx.Err()) {
194 t.Fatalf("Expected timeout error, got %v, %v", got, err)
195 }
196}
197
198// wait wraps a watcher into a channel of strings, ensuring that the watcher
199// never errors on Get calls and always returns strings.
Serge Bazanski37110c32023-03-01 13:57:27 +0000200func wait(t *testing.T, w event.Watcher[StringAt]) (chan string, func()) {
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200201 t.Helper()
202 ctx, ctxC := context.WithCancel(context.Background())
203
204 c := make(chan string)
205
206 go func() {
207 for {
208 got, err := w.Get(ctx)
209 if err != nil && errors.Is(err, ctx.Err()) {
210 return
211 }
212 if err != nil {
Tim Windelschmidt88049722024-04-11 23:09:23 +0200213 t.Errorf("Get: %v", err)
214 close(c)
215 return
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200216 }
Serge Bazanski37110c32023-03-01 13:57:27 +0000217 c <- got.Value
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200218 }
219 }()
220
221 return c, ctxC
222}
223
224// TestSimple exercises the simplest possible interaction with a watched value.
225func TestSimple(t *testing.T) {
226 tc := newTestClient(t)
227 defer tc.close()
228
229 k := "test-simple"
Serge Bazanski37110c32023-03-01 13:57:27 +0000230 value := NewValue(tc.namespaced, k, DecoderStringAt)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200231 tc.put(t, k, "one")
232
233 watcher := value.Watch()
234 defer watcher.Close()
235 expect(t, watcher, "one")
236
237 tc.put(t, k, "two")
238 expect(t, watcher, "two")
239
240 tc.put(t, k, "three")
241 tc.put(t, k, "four")
242 tc.put(t, k, "five")
243 tc.put(t, k, "six")
244
245 q, cancel := wait(t, watcher)
246 // Test will hang here if the above value does not receive the set "six".
Serge Bazanski37110c32023-03-01 13:57:27 +0000247 log.Printf("a")
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200248 for el := range q {
Serge Bazanski37110c32023-03-01 13:57:27 +0000249 log.Printf("%q", el)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200250 if el == "six" {
251 break
252 }
253 }
Serge Bazanski37110c32023-03-01 13:57:27 +0000254 log.Printf("b")
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200255 cancel()
256}
257
Serge Bazanski8d45a052021-10-18 17:24:24 +0200258// stringAtGet performs a Get from a Watcher, expecting a stringAt and updating
259// the given map with the retrieved value.
Serge Bazanski37110c32023-03-01 13:57:27 +0000260func stringAtGet(ctx context.Context, t *testing.T, w event.Watcher[StringAt], m map[string]string) {
Serge Bazanski8d45a052021-10-18 17:24:24 +0200261 t.Helper()
262
263 vr, err := w.Get(ctx)
264 if err != nil {
265 t.Fatalf("Get: %v", err)
266 }
Serge Bazanski37110c32023-03-01 13:57:27 +0000267 m[vr.Key] = vr.Value
Serge Bazanski8d45a052021-10-18 17:24:24 +0200268}
269
// TestSimpleRange exercises the simplest behaviour of a ranged watcher,
// retrieving updates via Get in a fully blocking fashion.
func TestSimpleRange(t *testing.T) {
	tc := newTestClient(t)
	defer tc.close()

	const (
		start = "test-simple-range/"
		end   = "test-simple-range0"
	)
	value := NewValue(tc.namespaced, start, DecoderStringAt, Range(end))
	tc.put(t, start+"a", "one")
	tc.put(t, start+"b", "two")
	tc.put(t, start+"c", "three")
	tc.put(t, start+"b", "four")

	w := value.Watch()
	defer w.Close()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	got := make(map[string]string)
	// The initial state contains three distinct keys: a, b and c.
	for i := 0; i < 3; i++ {
		stringAtGet(ctx, t, w, got)
	}

	tc.put(t, start+"a", "five")
	tc.put(t, start+"e", "six")
	// Two more updates: a is changed and e is created.
	for i := 0; i < 2; i++ {
		stringAtGet(ctx, t, w, got)
	}

	expected := []struct{ key, value string }{
		{start + "a", "five"},
		{start + "b", "four"},
		{start + "c", "three"},
		{start + "e", "six"},
	}
	for _, e := range expected {
		if actual := got[e.key]; actual != e.value {
			t.Errorf("res[%q]: wanted %q, got %q", e.key, e.value, actual)
		}
	}
}
314
// TestCancel ensures that watchers can resume after being canceled: a Get
// whose context is cancelled returns the context's error, and subsequent Gets
// on the same watcher still observe new values.
func TestCancel(t *testing.T) {
	tc := newTestClient(t)
	defer tc.close()

	k := "test-cancel"
	value := NewValue(tc.namespaced, k, DecoderStringAt)
	tc.put(t, k, "one")

	watcher := value.Watch()
	defer watcher.Close()
	expect(t, watcher, "one")

	// Start a Get while no new value has been written, then cancel it.
	getCtx, getCancel := context.WithCancel(context.Background())
	result := make(chan error, 1)
	go func() {
		_, err := watcher.Get(getCtx)
		result <- err
	}()
	getCancel()

	wantErr := getCtx.Err()
	if err := <-result; !errors.Is(err, wantErr) {
		t.Fatalf("Wanted err %v, got %v", wantErr, err)
	}

	// Successfully canceled watch, resuming should continue to work.
	updates, stop := wait(t, watcher)
	defer stop()

	tc.put(t, k, "two")
	if got := <-updates; got != "two" {
		t.Fatalf("Wanted val %q, got %q", "two", got)
	}
}
348
// TestCancelOnGet ensures that a context cancellation on an initial Get (which
// translates to an etcd Get in a backoff loop) doesn't block.
func TestCancelOnGet(t *testing.T) {
	tc := newTestClient(t)
	defer tc.close()

	k := "test-cancel-on-get"
	value := NewValue(tc.namespaced, k, DecoderStringAt)
	watcher := value.Watch()
	// Release the watcher at the end of the test, like every other test does.
	defer watcher.Close()
	tc.put(t, k, "one")

	// Cause partition between client endpoint and rest of cluster. Any read/write
	// operations will now hang.
	tc.setEndpoints(0)
	cluster.Members[0].InjectPartition(t, cluster.Members[1], cluster.Members[2])
	// Let raft timeouts expire so that the leader is aware a partition has occurred
	// and stops serving data if it is not part of a quorum anymore.
	//
	// Otherwise, if Member[0] was the leader, there will be a window of opportunity
	// during which it will continue to serve read data even though it has been
	// partitioned off. This is an effect of how etcd handles linearizable reads:
	// they go through the leader, but do not go through raft.
	//
	// The value is the default etcd leader timeout (1s) + some wiggle room.
	time.Sleep(time.Second + time.Millisecond*100)

	// Perform the initial Get(), which should attempt to retrieve a KV entry from
	// the etcd service. This should hang. Unfortunately, there's no easy way to do
	// this without an arbitrary sleep hoping that the client actually gets to the
	// underlying etcd.Get call. This can cause false positives (eg. false 'pass'
	// results) in this test.
	ctx, ctxC := context.WithCancel(context.Background())
	errs := make(chan error, 1)
	go func() {
		_, err := watcher.Get(ctx)
		errs <- err
	}()
	time.Sleep(time.Second)

	// Now that the etcd.Get is hanging, cancel the context.
	ctxC()
	// And now unpartition the cluster, resuming reads.
	cluster.Members[0].RecoverPartition(t, cluster.Members[1], cluster.Members[2])

	// The etcd.Get() call should've returned with a context cancellation.
	err := <-errs
	switch {
	case err == nil:
		t.Errorf("watcher.Get() returned no error, wanted context error")
	case errors.Is(err, ctx.Err()):
		// Okay.
	default:
		t.Errorf("watcher.Get() returned %v, wanted context error", err)
	}
}
404
// TestClientReconnect forces a 'reconnection' of an active watcher from a
// running member to another member, by stopping the original member and
// explicitly reconnecting the client to other available members.
//
// This does not reflect a situation expected during Metropolis runtime, as we
// do not expect splits between an etcd client and its connected member
// (instead, all etcd clients only connect to their local member). However, it
// is still an important safety test to perform, and it also exercises the
// equivalent behaviour of an etcd client re-connecting for any other reason.
func TestClientReconnect(t *testing.T) {
	tc := newTestClient(t)
	defer tc.close()
	tc.setEndpoints(0)

	k := "test-client-reconnect"
	value := NewValue(tc.namespaced, k, DecoderStringAt)
	tc.put(t, k, "one")

	watcher := value.Watch()
	defer watcher.Close()
	expect(t, watcher, "one")

	q, cancel := wait(t, watcher)
	defer cancel()

	cluster.Members[0].Stop(t)
	// Bring member 0 back for the tests that follow, which share this cluster.
	// A failed restart is reported here instead of surfacing as confusing
	// failures in unrelated tests.
	defer func() {
		if err := cluster.Members[0].Restart(t); err != nil {
			t.Errorf("Restarting member 0: %v", err)
		}
	}()
	cluster.WaitLeader(t)

	tc.setEndpoints(1, 2)
	tc.put(t, k, "two")

	if want, got := "two", <-q; want != got {
		t.Fatalf("Watcher received incorrect data after client restart, wanted %q, got %q", want, got)
	}
}
441
// TestClientPartition forces a temporary partition of the etcd member while a
// watcher is running, updates the value from across the partition, and undoes
// the partition.
// The partition is expected to be entirely transparent to the watcher.
func TestClientPartition(t *testing.T) {
	// tcOne only talks to member 0, which is the member that gets partitioned
	// off below.
	tcOne := newTestClient(t)
	defer tcOne.close()
	tcOne.setEndpoints(0)

	// tcRest only talks to members 1 and 2, which stay connected to each other
	// (2 of the 3 members).
	tcRest := newTestClient(t)
	defer tcRest.close()
	tcRest.setEndpoints(1, 2)

	// One watcher on the same key per side of the partition.
	k := "test-client-partition"
	valueOne := NewValue(tcOne.namespaced, k, DecoderStringAt)
	watcherOne := valueOne.Watch()
	defer watcherOne.Close()
	valueRest := NewValue(tcRest.namespaced, k, DecoderStringAt)
	watcherRest := valueRest.Watch()
	defer watcherRest.Close()

	// Baseline: both watchers see a write while the cluster is whole.
	tcRest.put(t, k, "a")
	expect(t, watcherOne, "a")
	expect(t, watcherRest, "a")

	cluster.Members[0].InjectPartition(t, cluster.Members[1], cluster.Members[2])

	// While partitioned, the write is seen by the watcher on members 1/2,
	// while a Get on member 0's watcher is expected to block (see
	// expectTimeout for the caveats of this check).
	tcRest.put(t, k, "b")
	expect(t, watcherRest, "b")
	expectTimeout(t, watcherOne)

	cluster.Members[0].RecoverPartition(t, cluster.Members[1], cluster.Members[2])

	// After healing, member 0's watcher catches up on the missed value and
	// then keeps following new writes.
	expect(t, watcherOne, "b")
	tcRest.put(t, k, "c")
	expect(t, watcherOne, "c")
	expect(t, watcherRest, "c")

}
481
// TestEarlyUse exercises the correct behaviour of the value watcher on a value
// that is not yet set.
func TestEarlyUse(t *testing.T) {
	tc := newTestClient(t)
	defer tc.close()

	// Nothing is written to this key before the watcher starts.
	k := "test-early-use"

	value := NewValue(tc.namespaced, k, DecoderStringAt)
	watcher := value.Watch()
	defer watcher.Close()

	// Install a setup WaitGroup with one outstanding count. Per setSetupWg,
	// the watcher waits on it after creating its etcd watch channel.
	wg := setSetupWg(watcher)
	wg.Add(1)
	q, cancel := wait(t, watcher)
	defer cancel()

	// Release the watcher.
	// NOTE(review): Done is called without waiting for the watcher to reach
	// its setup point, so this alone does not guarantee that the watch exists
	// before the put below.
	wg.Done()

	tc.put(t, k, "one")

	// The first value ever delivered must be the one written after the
	// watcher was started.
	if want, got := "one", <-q; want != got {
		t.Fatalf("Expected %q, got %q", want, got)
	}
}
507
508// TestRemove exercises the basic functionality of handling deleted values.
509func TestRemove(t *testing.T) {
510 tc := newTestClient(t)
511 defer tc.close()
512
513 k := "test-remove"
514 tc.put(t, k, "one")
515
Serge Bazanski37110c32023-03-01 13:57:27 +0000516 value := NewValue(tc.namespaced, k, DecoderStringAt)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200517 watcher := value.Watch()
518 defer watcher.Close()
519
520 expect(t, watcher, "one")
521 tc.remove(t, k)
522 expect(t, watcher, "")
523}
524
Serge Bazanski8d45a052021-10-18 17:24:24 +0200525// TestRemoveRange exercises the behaviour of a Get on a ranged watcher when a
526// value is removed.
527func TestRemoveRange(t *testing.T) {
528 tc := newTestClient(t)
529 defer tc.close()
530
531 ks := "test-remove-range/"
532 ke := "test-remove-range0"
Serge Bazanski37110c32023-03-01 13:57:27 +0000533 value := NewValue(tc.namespaced, ks, DecoderStringAt, Range(ke))
Serge Bazanski8d45a052021-10-18 17:24:24 +0200534 tc.put(t, ks+"a", "one")
535 tc.put(t, ks+"b", "two")
536 tc.put(t, ks+"c", "three")
537 tc.put(t, ks+"b", "four")
538 tc.remove(t, ks+"c")
539
540 w := value.Watch()
541 defer w.Close()
542
543 ctx, ctxC := context.WithCancel(context.Background())
544 defer ctxC()
545
546 res := make(map[string]string)
547 stringAtGet(ctx, t, w, res)
548 stringAtGet(ctx, t, w, res)
549
550 for _, te := range []struct {
551 k, w string
552 }{
553 {ks + "a", "one"},
554 {ks + "b", "four"},
555 {ks + "c", ""},
556 } {
557 if want, got := te.w, res[te.k]; want != got {
558 t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got)
559 }
560 }
561}
562
// TestEmptyRace forces the watcher to retrieve an empty value from the K/V
// store at first, and to establish its watch channel only after a new value
// has been stored in the same place.
func TestEmptyRace(t *testing.T) {
	tc := newTestClient(t)
	defer tc.close()

	// Leave the key deleted, so that the watcher's initial retrieval finds no
	// value.
	k := "test-remove-race"
	tc.put(t, k, "one")
	tc.remove(t, k)

	value := NewValue(tc.namespaced, k, DecoderStringAt)
	watcher := value.Watch()
	defer watcher.Close()

	// Per setRaceWg, the watcher waits on this WaitGroup between its initial
	// retrieval and starting its etcd watch. Holding it there gives the put
	// below a chance to land in between those two steps.
	wg := setRaceWg(watcher)
	wg.Add(1)
	q, cancel := wait(t, watcher)
	defer cancel()

	tc.put(t, k, "two")
	wg.Done()

	if want, got := "two", <-q; want != got {
		t.Fatalf("Watcher received incorrect data after racing put, wanted %q, got %q", want, got)
	}
}
590
// errOrInt is the outcome of a single Get on the int64 watcher used by
// TestDecoder: either a decoded value or the error Get returned.
type errOrInt struct {
	// val is the decoded value. It is left at zero when err is set.
	val int64
	// err is the error returned by Get, or nil if decoding succeeded.
	err error
}
595
// TestDecoder exercises the BytesDecoder functionality of the watcher, by
// creating a value with a decoder that only accepts string-encoded integers
// that are divisible by three. The test then proceeds to put a handful of
// values into etcd, ensuring that undecodable values correctly return an error
// on Get, but that the watcher continues to work after the error has been
// returned.
func TestDecoder(t *testing.T) {
	decoderDivisibleByThree := func(_, value []byte) (int64, error) {
		num, err := strconv.ParseInt(string(value), 10, 64)
		if err != nil {
			return 0, errors.New("not a valid number")
		}
		if (num % 3) != 0 {
			return 0, errors.New("not divisible by 3")
		}
		return num, nil
	}

	tc := newTestClient(t)
	defer tc.close()

	k := "test-decoder"
	value := NewValue(tc.namespaced, k, decoderDivisibleByThree)
	watcher := value.Watch()
	defer watcher.Close()

	// ctx's cancellation is deferred after the watcher's Close. Defers run
	// LIFO, so the streaming goroutine below is cancelled before the watcher
	// is closed underneath it.
	ctx, ctxC := context.WithCancel(context.Background())
	defer ctxC()

	tc.put(t, k, "3")
	if _, err := watcher.Get(ctx); err != nil {
		t.Fatalf("Initial Get: %v", err)
	}

	// Stream updates into arbitrarily-bounded test channel.
	queue := make(chan errOrInt, 100)
	go func() {
		for {
			res, err := watcher.Get(ctx)
			if ctx.Err() != nil {
				return
			}
			var item errOrInt
			if err != nil {
				item.err = err
			} else {
				item.val = res
			}
			// Don't block forever on a full queue once the test is over.
			select {
			case queue <- item:
			case <-ctx.Done():
				return
			}
		}
	}()

	// wantList holds one entry per put: nil if an error is expected, the
	// decoded value otherwise.
	var wantList []*int64
	wantError := func(val string) {
		wantList = append(wantList, nil)
		tc.put(t, k, val)
	}
	wantValue := func(val string, decoded int64) {
		wantList = append(wantList, &decoded)
		tc.put(t, k, val)
	}

	wantError("")
	wantValue("9", 9)
	wantError("foo")
	wantValue("18", 18)
	wantError("10")
	wantValue("27", 27)
	wantValue("36", 36)

	for i, want := range wantList {
		q := <-queue
		switch {
		case want == nil && q.err == nil:
			t.Fatalf("Case %d: wanted error, got no error and value %d", i, q.val)
		case want != nil && q.err != nil:
			t.Fatalf("Case %d: wanted value %d, got error %v", i, *want, q.err)
		case want != nil && *want != q.val:
			t.Fatalf("Case %d: wanted value %d, got value %d", i, *want, q.val)
		}
	}
}
678
// TestBacklog ensures that the watcher can handle a large backlog of changes
// in etcd that the client didn't keep up with, and that it eventually returns
// the final state once the client actually gets to calling Get().
func TestBacklog(t *testing.T) {
	tc := newTestClient(t)
	defer tc.close()

	const key = "test-backlog"
	v := NewValue(tc.namespaced, key, DecoderStringAt)
	w := v.Watch()
	defer w.Close()

	tc.put(t, key, "initial")
	expect(t, w, "initial")

	const updates = 1000
	for i := 0; i < updates; i++ {
		tc.put(t, key, fmt.Sprintf("val-%d", i))
	}
	final := fmt.Sprintf("val-%d", updates-1)

	// Keep calling Get until the final value shows up, within one second in
	// total.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	for got := ""; got != final; {
		res, err := w.Get(ctx)
		if err != nil {
			t.Fatalf("Get() returned error before expected final value: %v", err)
		}
		got = res.Value
	}
}
Serge Bazanski8d45a052021-10-18 17:24:24 +0200710
// TestBacklogRange ensures that the ranged etcd watcher can handle a large
// backlog of changes in etcd that the client didn't keep up with: two Gets are
// enough to observe the newest value of both written keys.
func TestBacklogRange(t *testing.T) {
	tc := newTestClient(t)
	defer tc.close()

	const (
		start = "test-backlog-range/"
		end   = "test-backlog-range0"
	)
	value := NewValue(tc.namespaced, start, DecoderStringAt, Range(end))
	w := value.Watch()
	defer w.Close()

	// Even iterations write key a, odd iterations write key b.
	for i := 0; i < 100; i++ {
		key := start + "b"
		if i%2 == 0 {
			key = start + "a"
		}
		tc.put(t, key, fmt.Sprintf("val-%d", i))
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	got := make(map[string]string)
	for i := 0; i < 2; i++ {
		stringAtGet(ctx, t, w, got)
	}

	expected := []struct{ key, value string }{
		{start + "a", "val-98"},
		{start + "b", "val-99"},
	}
	for _, e := range expected {
		if actual := got[e.key]; actual != e.value {
			t.Errorf("res[%q]: wanted %q, got %q", e.key, e.value, actual)
		}
	}
}
749
// TestBacklogOnly exercises the BacklogOnly option for non-ranged watchers,
// which effectively makes any Get operation non-blocking. It also showcases
// that, unless a Get without BacklogOnly is issued, no new data will appear in
// the watcher by itself. That is an undocumented implementation detail of the
// option.
func TestBacklogOnly(t *testing.T) {
	tc := newTestClient(t)
	defer tc.close()
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	const key = "test-backlog-only"
	tc.put(t, key, "initial")

	v := NewValue(tc.namespaced, key, DecoderStringAt)
	w := v.Watch()
	defer w.Close()

	first, err := w.Get(ctx, event.BacklogOnly[StringAt]())
	if err != nil {
		t.Fatalf("First Get failed: %v", err)
	}
	if got := first.Value; got != "initial" {
		t.Fatalf("First Get: wanted value %q, got %q", "initial", got)
	}

	// The backlog has been consumed, so another BacklogOnly Get has no new
	// updates to emit and reports ErrBacklogDone.
	if _, err := w.Get(ctx, event.BacklogOnly[StringAt]()); !errors.Is(err, event.ErrBacklogDone) {
		t.Fatalf("Second Get: wanted %v, got %v", event.ErrBacklogDone, err)
	}

	// Implementation detail: even though there is a new value ('second'),
	// BacklogOnly will still return ErrBacklogDone.
	tc.put(t, key, "second")
	if _, err := w.Get(ctx, event.BacklogOnly[StringAt]()); !errors.Is(err, event.ErrBacklogDone) {
		t.Fatalf("Third Get: wanted %v, got %v", event.ErrBacklogDone, err)
	}

	// ... However, a Get without BacklogOnly will return the new value.
	fourth, err := w.Get(ctx)
	if err != nil {
		t.Fatalf("Fourth Get failed: %v", err)
	}
	if got := fourth.Value; got != "second" {
		t.Fatalf("Fourth Get: wanted value %q, got %q", "second", got)
	}
}
800
// TestBacklogOnlyRange exercises the BacklogOnly option for ranged watchers,
// showcasing how it is expected to be used for keeping up with the external
// state of a range by synchronizing it into a local map.
func TestBacklogOnlyRange(t *testing.T) {
	tc := newTestClient(t)
	defer tc.close()
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	const (
		start = "test-backlog-only-range/"
		end   = "test-backlog-only-range0"
	)

	// Even iterations write key a, odd iterations write key b.
	for i := 0; i < 100; i++ {
		key := start + "b"
		if i%2 == 0 {
			key = start + "a"
		}
		tc.put(t, key, fmt.Sprintf("val-%d", i))
	}

	v := NewValue(tc.namespaced, start, DecoderStringAt, Range(end))
	w := v.Watch()
	defer w.Close()

	// Local mirror of the range, from key to value.
	state := make(map[string]string)

	// The first Get is the barrier defining what's part of the backlog.
	first, err := w.Get(ctx, event.BacklogOnly[StringAt]())
	if err != nil {
		t.Fatalf("Get: %v", err)
	}
	state[first.Key] = first.Value
	updates := 1

	// These won't be part of the backlog.
	tc.put(t, start+"a", "val-100")
	tc.put(t, start+"b", "val-101")

	// Drain the rest of the backlog until ErrBacklogDone is returned.
	for {
		next, err := w.Get(ctx, event.BacklogOnly[StringAt]())
		if errors.Is(err, event.ErrBacklogDone) {
			break
		}
		if err != nil {
			t.Fatalf("Get: %v", err)
		}
		updates++
		state[next.Key] = next.Value
	}

	// The backlog should've been compacted to just two entries at their newest
	// state.
	if updates != 2 {
		t.Fatalf("wanted backlog in %d updates, got it in %d", 2, updates)
	}

	expected := []struct{ key, value string }{
		{start + "a", "val-98"},
		{start + "b", "val-99"},
	}
	for _, e := range expected {
		if actual := state[e.key]; actual != e.value {
			t.Errorf("res[%q]: wanted %q, got %q", e.key, e.value, actual)
		}
	}
}