blob: e7e12276140853818b0e5c1b5730083d97130f0e [file] [log] [blame]
Serge Bazanskic89df2f2021-04-27 15:51:37 +02001package etcd
2
3import (
4 "context"
5 "errors"
Lorenz Brund13c1c62022-03-30 19:58:58 +02006 "flag"
Serge Bazanskic89df2f2021-04-27 15:51:37 +02007 "fmt"
8 "log"
9 "os"
10 "strconv"
11 "sync"
12 "testing"
13 "time"
14
Lorenz Brund13c1c62022-03-30 19:58:58 +020015 "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
16 "go.etcd.io/etcd/client/pkg/v3/testutil"
17 clientv3 "go.etcd.io/etcd/client/v3"
18 "go.etcd.io/etcd/tests/v3/integration"
Serge Bazanski98e05e12023-04-05 12:44:14 +020019 "go.uber.org/zap"
Serge Bazanskic89df2f2021-04-27 15:51:37 +020020 "google.golang.org/grpc/codes"
Serge Bazanski98e05e12023-04-05 12:44:14 +020021 "google.golang.org/grpc/grpclog"
Serge Bazanskic89df2f2021-04-27 15:51:37 +020022
23 "source.monogon.dev/metropolis/node/core/consensus/client"
24 "source.monogon.dev/metropolis/pkg/event"
Serge Bazanski98e05e12023-04-05 12:44:14 +020025 "source.monogon.dev/metropolis/pkg/logtree"
Serge Bazanskic89df2f2021-04-27 15:51:37 +020026)
27
28var (
29 cluster *integration.ClusterV3
30 endpoints []string
31)
32
33// TestMain brings up a 3 node etcd cluster for tests to use.
34func TestMain(m *testing.M) {
Serge Bazanski98e05e12023-04-05 12:44:14 +020035 // This logtree's data is not output anywhere.
36 lt := logtree.New()
37
Serge Bazanskic89df2f2021-04-27 15:51:37 +020038 cfg := integration.ClusterConfig{
39 Size: 3,
40 GRPCKeepAliveMinTime: time.Millisecond,
Serge Bazanski98e05e12023-04-05 12:44:14 +020041 LoggerBuilder: func(memberName string) *zap.Logger {
42 dn := logtree.DN("etcd." + memberName)
43 return logtree.Zapify(lt.MustLeveledFor(dn), zap.WarnLevel)
44 },
Serge Bazanskic89df2f2021-04-27 15:51:37 +020045 }
Lorenz Brund13c1c62022-03-30 19:58:58 +020046 tb, cancel := testutil.NewTestingTBProthesis("curator")
47 defer cancel()
48 flag.Parse()
Serge Bazanskidefff522022-05-16 17:28:16 +020049 integration.BeforeTestExternal(tb)
Serge Bazanski98e05e12023-04-05 12:44:14 +020050 grpclog.SetLoggerV2(logtree.GRPCify(lt.MustLeveledFor("grpc")))
Lorenz Brund13c1c62022-03-30 19:58:58 +020051 cluster = integration.NewClusterV3(tb, &cfg)
Serge Bazanskic89df2f2021-04-27 15:51:37 +020052 endpoints = make([]string, 3)
53 for i := range endpoints {
54 endpoints[i] = cluster.Client(i).Endpoints()[0]
55 }
56
57 v := m.Run()
Lorenz Brund13c1c62022-03-30 19:58:58 +020058 cluster.Terminate(tb)
Serge Bazanskic89df2f2021-04-27 15:51:37 +020059 os.Exit(v)
60}
61
62// setRaceWg creates a new WaitGroup and sets the given watcher to wait on this
63// WG after it performs the initial retrieval of a value from etcd, but before
64// it starts the watcher. This is used to test potential race conditions
65// present between these two steps.
Serge Bazanski37110c32023-03-01 13:57:27 +000066func setRaceWg[T any](w event.Watcher[T]) *sync.WaitGroup {
Serge Bazanskic89df2f2021-04-27 15:51:37 +020067 wg := sync.WaitGroup{}
Serge Bazanski37110c32023-03-01 13:57:27 +000068 w.(*watcher[T]).testRaceWG = &wg
Serge Bazanskic89df2f2021-04-27 15:51:37 +020069 return &wg
70}
71
72// setSetupWg creates a new WaitGroup and sets the given watcher to wait on
73// thie WG after an etcd watch channel is created. This is used in tests to
74// ensure that the watcher is fully created before it is tested.
Serge Bazanski37110c32023-03-01 13:57:27 +000075func setSetupWg[T any](w event.Watcher[T]) *sync.WaitGroup {
Serge Bazanskic89df2f2021-04-27 15:51:37 +020076 wg := sync.WaitGroup{}
Serge Bazanski37110c32023-03-01 13:57:27 +000077 w.(*watcher[T]).testSetupWG = &wg
Serge Bazanskic89df2f2021-04-27 15:51:37 +020078 return &wg
79}
80
81// testClient is an etcd connection to the test cluster.
82type testClient struct {
83 client *clientv3.Client
84 namespaced client.Namespaced
85}
86
87func newTestClient(t *testing.T) *testClient {
88 t.Helper()
89 cli, err := clientv3.New(clientv3.Config{
90 Endpoints: endpoints,
91 DialTimeout: 1 * time.Second,
92 DialKeepAliveTime: 1 * time.Second,
93 DialKeepAliveTimeout: 1 * time.Second,
94 })
95 if err != nil {
96 t.Fatalf("clientv3.New: %v", err)
97 }
98
99 namespaced := client.NewLocal(cli)
100 return &testClient{
101 client: cli,
102 namespaced: namespaced,
103 }
104}
105
106func (d *testClient) close() {
107 d.client.Close()
108}
109
110// setEndpoints configures which endpoints (from {0,1,2}) the testClient is
111// connected to.
112func (d *testClient) setEndpoints(nums ...uint) {
113 var eps []string
114 for _, num := range nums {
115 eps = append(eps, endpoints[num])
116 }
117 d.client.SetEndpoints(eps...)
118}
119
120// put uses the testClient to store key with a given string value in etcd. It
121// contains retry logic that will block until the put is successful.
122func (d *testClient) put(t *testing.T, key, value string) {
123 t.Helper()
124 ctx, ctxC := context.WithCancel(context.Background())
125 defer ctxC()
126
127 for {
128 ctxT, ctxC := context.WithTimeout(ctx, 100*time.Millisecond)
129 _, err := d.namespaced.Put(ctxT, key, value)
130 ctxC()
131 if err == nil {
132 return
133 }
134 if err == ctxT.Err() {
135 log.Printf("Retrying after %v", err)
136 continue
137 }
138 // Retry on etcd unavailability - this will happen in this code as the
139 // etcd cluster repeatedly loses quorum.
140 var eerr rpctypes.EtcdError
141 if errors.As(err, &eerr) && eerr.Code() == codes.Unavailable {
142 log.Printf("Retrying after %v", err)
143 continue
144 }
145 t.Fatalf("Put: %v", err)
146 }
147
148}
149
150// remove uses the testClient to remove the given key from etcd. It contains
151// retry logic that will block until the removal is successful.
152func (d *testClient) remove(t *testing.T, key string) {
153 t.Helper()
154 ctx, ctxC := context.WithCancel(context.Background())
155 defer ctxC()
156
157 _, err := d.namespaced.Delete(ctx, key)
158 if err == nil {
159 return
160 }
161 t.Fatalf("Delete: %v", err)
162}
163
164// expect runs a Get on the given Watcher, ensuring the returned value is a
165// given string.
Serge Bazanski37110c32023-03-01 13:57:27 +0000166func expect(t *testing.T, w event.Watcher[StringAt], value string) {
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200167 t.Helper()
168 ctx, ctxC := context.WithCancel(context.Background())
169 defer ctxC()
170
171 got, err := w.Get(ctx)
172 if err != nil {
173 t.Fatalf("Get: %v", err)
174 }
175
Serge Bazanski37110c32023-03-01 13:57:27 +0000176 if got, want := got.Value, value; got != want {
Serge Bazanskibbb873d2022-06-24 14:22:39 +0200177 t.Errorf("Wanted value %q, got %q", want, got)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200178 }
179}
180
181// expectTimeout ensures that the given watcher blocks on a Get call for at
182// least 100 milliseconds. This is used by tests to attempt to verify that the
183// watcher Get is fully blocked, but can cause false positives (eg. when Get
184// blocks for 101 milliseconds). Thus, this function should be used sparingly
185// and in tests that perform other baseline behaviour checks alongside this
186// test.
Serge Bazanski37110c32023-03-01 13:57:27 +0000187func expectTimeout[T any](t *testing.T, w event.Watcher[T]) {
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200188 t.Helper()
189 ctx, ctxC := context.WithTimeout(context.Background(), 100*time.Millisecond)
190 got, err := w.Get(ctx)
191 ctxC()
192
193 if !errors.Is(err, ctx.Err()) {
194 t.Fatalf("Expected timeout error, got %v, %v", got, err)
195 }
196}
197
198// wait wraps a watcher into a channel of strings, ensuring that the watcher
199// never errors on Get calls and always returns strings.
Serge Bazanski37110c32023-03-01 13:57:27 +0000200func wait(t *testing.T, w event.Watcher[StringAt]) (chan string, func()) {
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200201 t.Helper()
202 ctx, ctxC := context.WithCancel(context.Background())
203
204 c := make(chan string)
205
206 go func() {
207 for {
208 got, err := w.Get(ctx)
209 if err != nil && errors.Is(err, ctx.Err()) {
210 return
211 }
212 if err != nil {
213 t.Fatalf("Get: %v", err)
214 }
Serge Bazanski37110c32023-03-01 13:57:27 +0000215 c <- got.Value
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200216 }
217 }()
218
219 return c, ctxC
220}
221
222// TestSimple exercises the simplest possible interaction with a watched value.
223func TestSimple(t *testing.T) {
224 tc := newTestClient(t)
225 defer tc.close()
226
227 k := "test-simple"
Serge Bazanski37110c32023-03-01 13:57:27 +0000228 value := NewValue(tc.namespaced, k, DecoderStringAt)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200229 tc.put(t, k, "one")
230
231 watcher := value.Watch()
232 defer watcher.Close()
233 expect(t, watcher, "one")
234
235 tc.put(t, k, "two")
236 expect(t, watcher, "two")
237
238 tc.put(t, k, "three")
239 tc.put(t, k, "four")
240 tc.put(t, k, "five")
241 tc.put(t, k, "six")
242
243 q, cancel := wait(t, watcher)
244 // Test will hang here if the above value does not receive the set "six".
Serge Bazanski37110c32023-03-01 13:57:27 +0000245 log.Printf("a")
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200246 for el := range q {
Serge Bazanski37110c32023-03-01 13:57:27 +0000247 log.Printf("%q", el)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200248 if el == "six" {
249 break
250 }
251 }
Serge Bazanski37110c32023-03-01 13:57:27 +0000252 log.Printf("b")
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200253 cancel()
254}
255
Serge Bazanski8d45a052021-10-18 17:24:24 +0200256// stringAtGet performs a Get from a Watcher, expecting a stringAt and updating
257// the given map with the retrieved value.
Serge Bazanski37110c32023-03-01 13:57:27 +0000258func stringAtGet(ctx context.Context, t *testing.T, w event.Watcher[StringAt], m map[string]string) {
Serge Bazanski8d45a052021-10-18 17:24:24 +0200259 t.Helper()
260
261 vr, err := w.Get(ctx)
262 if err != nil {
263 t.Fatalf("Get: %v", err)
264 }
Serge Bazanski37110c32023-03-01 13:57:27 +0000265 m[vr.Key] = vr.Value
Serge Bazanski8d45a052021-10-18 17:24:24 +0200266}
267
268// TestSimpleRange exercises the simplest behaviour of a ranged watcher,
269// retrieving updaates via Get in a fully blocking fashion.
270func TestSimpleRange(t *testing.T) {
271 tc := newTestClient(t)
272 defer tc.close()
273
274 ks := "test-simple-range/"
275 ke := "test-simple-range0"
Serge Bazanski37110c32023-03-01 13:57:27 +0000276 value := NewValue(tc.namespaced, ks, DecoderStringAt, Range(ke))
Serge Bazanski8d45a052021-10-18 17:24:24 +0200277 tc.put(t, ks+"a", "one")
278 tc.put(t, ks+"b", "two")
279 tc.put(t, ks+"c", "three")
280 tc.put(t, ks+"b", "four")
281
282 w := value.Watch()
283 defer w.Close()
284
285 ctx, ctxC := context.WithCancel(context.Background())
286 defer ctxC()
287
288 res := make(map[string]string)
289 stringAtGet(ctx, t, w, res)
290 stringAtGet(ctx, t, w, res)
291 stringAtGet(ctx, t, w, res)
292
293 tc.put(t, ks+"a", "five")
294 tc.put(t, ks+"e", "six")
295
296 stringAtGet(ctx, t, w, res)
297 stringAtGet(ctx, t, w, res)
298
299 for _, te := range []struct {
300 k, w string
301 }{
302 {ks + "a", "five"},
303 {ks + "b", "four"},
304 {ks + "c", "three"},
305 {ks + "e", "six"},
306 } {
307 if want, got := te.w, res[te.k]; want != got {
308 t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got)
309 }
310 }
311}
312
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200313// TestCancel ensures that watchers can resume after being canceled.
314func TestCancel(t *testing.T) {
315 tc := newTestClient(t)
316 defer tc.close()
317
318 k := "test-cancel"
Serge Bazanski37110c32023-03-01 13:57:27 +0000319 value := NewValue(tc.namespaced, k, DecoderStringAt)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200320 tc.put(t, k, "one")
321
322 watcher := value.Watch()
323 defer watcher.Close()
324 expect(t, watcher, "one")
325
326 ctx, ctxC := context.WithCancel(context.Background())
327 errs := make(chan error, 1)
328 go func() {
329 _, err := watcher.Get(ctx)
330 errs <- err
331 }()
332 ctxC()
333 if want, got := ctx.Err(), <-errs; !errors.Is(got, want) {
334 t.Fatalf("Wanted err %v, got %v", want, got)
335 }
336
337 // Successfully canceled watch, resuming should continue to work.
338 q, cancel := wait(t, watcher)
339 defer cancel()
340
341 tc.put(t, k, "two")
342 if want, got := "two", <-q; want != got {
343 t.Fatalf("Wanted val %q, got %q", want, got)
344 }
345}
346
347// TestCancelOnGet ensures that a context cancellation on an initial Get (which
348// translates to an etcd Get in a backoff loop) doesn't block.
349func TestCancelOnGet(t *testing.T) {
350 tc := newTestClient(t)
351 defer tc.close()
352
353 k := "test-cancel-on-get"
Serge Bazanski37110c32023-03-01 13:57:27 +0000354 value := NewValue(tc.namespaced, k, DecoderStringAt)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200355 watcher := value.Watch()
356 tc.put(t, k, "one")
357
358 // Cause partition between client endpoint and rest of cluster. Any read/write
359 // operations will now hang.
360 tc.setEndpoints(0)
361 cluster.Members[0].InjectPartition(t, cluster.Members[1], cluster.Members[2])
Serge Bazanskiad10ece2022-05-17 11:20:17 +0200362 // Let raft timeouts expire so that the leader is aware a partition has occurred
363 // and stops serving data if it is not part of a quorum anymore.
364 //
365 // Otherwise, if Member[0] was the leader, there will be a window of opportunity
366 // during which it will continue to serve read data even though it has been
367 // partitioned off. This is an effect of how etcd handles linearizable reads:
368 // they go through the leader, but do not go through raft.
369 //
370 // The value is the default etcd leader timeout (1s) + some wiggle room.
371 time.Sleep(time.Second + time.Millisecond*100)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200372
373 // Perform the initial Get(), which should attempt to retrieve a KV entry from
374 // the etcd service. This should hang. Unfortunately, there's no easy way to do
375 // this without an arbitrary sleep hoping that the client actually gets to the
376 // underlying etcd.Get call. This can cause false positives (eg. false 'pass'
377 // results) in this test.
378 ctx, ctxC := context.WithCancel(context.Background())
379 errs := make(chan error, 1)
380 go func() {
381 _, err := watcher.Get(ctx)
382 errs <- err
383 }()
384 time.Sleep(time.Second)
385
386 // Now that the etcd.Get is hanging, cancel the context.
387 ctxC()
388 // And now unpartition the cluster, resuming reads.
389 cluster.Members[0].RecoverPartition(t, cluster.Members[1], cluster.Members[2])
390
391 // The etcd.Get() call should've returned with a context cancellation.
392 err := <-errs
393 switch {
394 case err == nil:
395 t.Errorf("watcher.Get() returned no error, wanted context error")
396 case errors.Is(err, ctx.Err()):
397 // Okay.
398 default:
399 t.Errorf("watcher.Get() returned %v, wanted context error", err)
400 }
401}
402
403// TestClientReconnect forces a 'reconnection' of an active watcher from a
404// running member to another member, by stopping the original member and
405// explicitly reconnecting the client to other available members.
406//
407// This doe not reflect a situation expected during Metropolis runtime, as we
408// do not expect splits between an etcd client and its connected member
409// (instead, all etcd clients only connect to their local member). However, it
410// is still an important safety test to perform, and it also exercies the
411// equivalent behaviour of an etcd client re-connecting for any other reason.
412func TestClientReconnect(t *testing.T) {
413 tc := newTestClient(t)
414 defer tc.close()
415 tc.setEndpoints(0)
416
417 k := "test-client-reconnect"
Serge Bazanski37110c32023-03-01 13:57:27 +0000418 value := NewValue(tc.namespaced, k, DecoderStringAt)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200419 tc.put(t, k, "one")
420
421 watcher := value.Watch()
422 defer watcher.Close()
423 expect(t, watcher, "one")
424
425 q, cancel := wait(t, watcher)
426 defer cancel()
427
428 cluster.Members[0].Stop(t)
429 defer cluster.Members[0].Restart(t)
430 cluster.WaitLeader(t)
431
432 tc.setEndpoints(1, 2)
433 tc.put(t, k, "two")
434
435 if want, got := "two", <-q; want != got {
436 t.Fatalf("Watcher received incorrect data after client restart, wanted %q, got %q", want, got)
437 }
438}
439
440// TestClientPartition forces a temporary partition of the etcd member while a
441// watcher is running, updates the value from across the partition, and undoes
442// the partition.
443// The partition is expected to be entirely transparent to the watcher.
444func TestClientPartition(t *testing.T) {
445 tcOne := newTestClient(t)
446 defer tcOne.close()
447 tcOne.setEndpoints(0)
448
449 tcRest := newTestClient(t)
450 defer tcRest.close()
451 tcRest.setEndpoints(1, 2)
452
453 k := "test-client-partition"
Serge Bazanski37110c32023-03-01 13:57:27 +0000454 valueOne := NewValue(tcOne.namespaced, k, DecoderStringAt)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200455 watcherOne := valueOne.Watch()
456 defer watcherOne.Close()
Serge Bazanski37110c32023-03-01 13:57:27 +0000457 valueRest := NewValue(tcRest.namespaced, k, DecoderStringAt)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200458 watcherRest := valueRest.Watch()
459 defer watcherRest.Close()
460
461 tcRest.put(t, k, "a")
462 expect(t, watcherOne, "a")
463 expect(t, watcherRest, "a")
464
465 cluster.Members[0].InjectPartition(t, cluster.Members[1], cluster.Members[2])
466
467 tcRest.put(t, k, "b")
468 expect(t, watcherRest, "b")
469 expectTimeout(t, watcherOne)
470
471 cluster.Members[0].RecoverPartition(t, cluster.Members[1], cluster.Members[2])
472
473 expect(t, watcherOne, "b")
474 tcRest.put(t, k, "c")
475 expect(t, watcherOne, "c")
476 expect(t, watcherRest, "c")
477
478}
479
480// TestEarlyUse exercises the correct behaviour of the value watcher on a value
481// that is not yet set.
482func TestEarlyUse(t *testing.T) {
483 tc := newTestClient(t)
484 defer tc.close()
485
486 k := "test-early-use"
487
Serge Bazanski37110c32023-03-01 13:57:27 +0000488 value := NewValue(tc.namespaced, k, DecoderStringAt)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200489 watcher := value.Watch()
490 defer watcher.Close()
491
492 wg := setSetupWg(watcher)
493 wg.Add(1)
494 q, cancel := wait(t, watcher)
495 defer cancel()
496
497 wg.Done()
498
499 tc.put(t, k, "one")
500
501 if want, got := "one", <-q; want != got {
502 t.Fatalf("Expected %q, got %q", want, got)
503 }
504}
505
506// TestRemove exercises the basic functionality of handling deleted values.
507func TestRemove(t *testing.T) {
508 tc := newTestClient(t)
509 defer tc.close()
510
511 k := "test-remove"
512 tc.put(t, k, "one")
513
Serge Bazanski37110c32023-03-01 13:57:27 +0000514 value := NewValue(tc.namespaced, k, DecoderStringAt)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200515 watcher := value.Watch()
516 defer watcher.Close()
517
518 expect(t, watcher, "one")
519 tc.remove(t, k)
520 expect(t, watcher, "")
521}
522
Serge Bazanski8d45a052021-10-18 17:24:24 +0200523// TestRemoveRange exercises the behaviour of a Get on a ranged watcher when a
524// value is removed.
525func TestRemoveRange(t *testing.T) {
526 tc := newTestClient(t)
527 defer tc.close()
528
529 ks := "test-remove-range/"
530 ke := "test-remove-range0"
Serge Bazanski37110c32023-03-01 13:57:27 +0000531 value := NewValue(tc.namespaced, ks, DecoderStringAt, Range(ke))
Serge Bazanski8d45a052021-10-18 17:24:24 +0200532 tc.put(t, ks+"a", "one")
533 tc.put(t, ks+"b", "two")
534 tc.put(t, ks+"c", "three")
535 tc.put(t, ks+"b", "four")
536 tc.remove(t, ks+"c")
537
538 w := value.Watch()
539 defer w.Close()
540
541 ctx, ctxC := context.WithCancel(context.Background())
542 defer ctxC()
543
544 res := make(map[string]string)
545 stringAtGet(ctx, t, w, res)
546 stringAtGet(ctx, t, w, res)
547
548 for _, te := range []struct {
549 k, w string
550 }{
551 {ks + "a", "one"},
552 {ks + "b", "four"},
553 {ks + "c", ""},
554 } {
555 if want, got := te.w, res[te.k]; want != got {
556 t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got)
557 }
558 }
559}
560
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200561// TestEmptyRace forces the watcher to retrieve an empty value from the K/V
562// store at first, and establishing the watch channel after a new value has
563// been stored in the same place.
564func TestEmptyRace(t *testing.T) {
565 tc := newTestClient(t)
566 defer tc.close()
567
568 k := "test-remove-race"
569 tc.put(t, k, "one")
570 tc.remove(t, k)
571
Serge Bazanski37110c32023-03-01 13:57:27 +0000572 value := NewValue(tc.namespaced, k, DecoderStringAt)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200573 watcher := value.Watch()
574 defer watcher.Close()
575
576 wg := setRaceWg(watcher)
577 wg.Add(1)
578 q, cancel := wait(t, watcher)
579 defer cancel()
580
581 tc.put(t, k, "two")
582 wg.Done()
583
584 if want, got := "two", <-q; want != got {
585 t.Fatalf("Watcher received incorrect data after client restart, wanted %q, got %q", want, got)
586 }
587}
588
// errOrInt is the result of a single watcher Get as streamed by TestDecoder:
// either a decoded integer or the error returned by Get.
type errOrInt struct {
	// val is the decoded value; it is only set when the Get succeeded.
	val int64
	// err is the error returned by the Get, if any.
	err error
}
593
594// TestDecoder exercises the BytesDecoder functionality of the watcher, by
595// creating a value with a decoder that only accepts string-encoded integers
596// that are divisible by three. The test then proceeds to put a handful of
597// values into etcd, ensuring that undecodable values correctly return an error
598// on Get, but that the watcher continues to work after the error has been
599// returned.
600func TestDecoder(t *testing.T) {
Serge Bazanski37110c32023-03-01 13:57:27 +0000601 decoderDivisibleByThree := func(_, value []byte) (int64, error) {
602 num, err := strconv.ParseInt(string(value), 10, 64)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200603 if err != nil {
Serge Bazanski37110c32023-03-01 13:57:27 +0000604 return 0, fmt.Errorf("not a valid number")
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200605 }
606 if (num % 3) != 0 {
Serge Bazanski37110c32023-03-01 13:57:27 +0000607 return 0, fmt.Errorf("not divisible by 3")
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200608 }
609 return num, nil
610 }
611
612 tc := newTestClient(t)
613 defer tc.close()
614
615 ctx, ctxC := context.WithCancel(context.Background())
616 defer ctxC()
617
618 k := "test-decoder"
Serge Bazanski37110c32023-03-01 13:57:27 +0000619 value := NewValue(tc.namespaced, k, decoderDivisibleByThree)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200620 watcher := value.Watch()
621 defer watcher.Close()
622 tc.put(t, k, "3")
623 _, err := watcher.Get(ctx)
624 if err != nil {
625 t.Fatalf("Initial Get: %v", err)
626 }
627
628 // Stream updates into arbitrarily-bounded test channel.
629 queue := make(chan errOrInt, 100)
630 go func() {
631 for {
632 res, err := watcher.Get(ctx)
633 if err != nil && errors.Is(err, ctx.Err()) {
634 return
635 }
636 if err != nil {
637 queue <- errOrInt{
638 err: err,
639 }
640 } else {
641 queue <- errOrInt{
Serge Bazanski37110c32023-03-01 13:57:27 +0000642 val: res,
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200643 }
644 }
645 }
646 }()
647
648 var wantList []*int64
649 wantError := func(val string) {
650 wantList = append(wantList, nil)
651 tc.put(t, k, val)
652 }
653 wantValue := func(val string, decoded int64) {
654 wantList = append(wantList, &decoded)
655 tc.put(t, k, val)
656 }
657
658 wantError("")
659 wantValue("9", 9)
660 wantError("foo")
661 wantValue("18", 18)
662 wantError("10")
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200663 wantValue("27", 27)
664 wantValue("36", 36)
665
666 for i, want := range wantList {
667 q := <-queue
668 if want == nil && q.err == nil {
Serge Bazanski832bc772022-04-07 12:33:01 +0200669 t.Fatalf("Case %d: wanted error, got no error and value %d", i, q.val)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200670 }
671 if want != nil && (*want) != q.val {
Serge Bazanski832bc772022-04-07 12:33:01 +0200672 t.Fatalf("Case %d: wanted value %d, got error %v and value %d", i, *want, q.err, q.val)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200673 }
674 }
675}
676
677// TestBacklog ensures that the watcher can handle a large backlog of changes
678// in etcd that the client didnt' keep up with, and that whatever final state
679// is available to the client when it actually gets to calling Get().
680func TestBacklog(t *testing.T) {
681 tc := newTestClient(t)
682 defer tc.close()
683
684 k := "test-backlog"
Serge Bazanski37110c32023-03-01 13:57:27 +0000685 value := NewValue(tc.namespaced, k, DecoderStringAt)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200686 watcher := value.Watch()
687 defer watcher.Close()
688
689 tc.put(t, k, "initial")
690 expect(t, watcher, "initial")
691
692 for i := 0; i < 1000; i++ {
693 tc.put(t, k, fmt.Sprintf("val-%d", i))
694 }
695
696 ctx, ctxC := context.WithTimeout(context.Background(), time.Second)
697 defer ctxC()
698 for {
699 valB, err := watcher.Get(ctx)
700 if err != nil {
701 t.Fatalf("Get() returned error before expected final value: %v", err)
702 }
Serge Bazanski37110c32023-03-01 13:57:27 +0000703 if valB.Value == "val-999" {
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200704 break
705 }
706 }
707}
Serge Bazanski8d45a052021-10-18 17:24:24 +0200708
709// TestBacklogRange ensures that the ranged etcd watcher can handle a large
710// backlog of changes in etcd that the client didn't keep up with.
711func TestBacklogRange(t *testing.T) {
712 tc := newTestClient(t)
713 defer tc.close()
714
715 ks := "test-backlog-range/"
716 ke := "test-backlog-range0"
Serge Bazanski37110c32023-03-01 13:57:27 +0000717 value := NewValue(tc.namespaced, ks, DecoderStringAt, Range(ke))
Serge Bazanski8d45a052021-10-18 17:24:24 +0200718 w := value.Watch()
719 defer w.Close()
720
721 for i := 0; i < 100; i++ {
722 if i%2 == 0 {
723 tc.put(t, ks+"a", fmt.Sprintf("val-%d", i))
724 } else {
725 tc.put(t, ks+"b", fmt.Sprintf("val-%d", i))
726 }
727 }
728
729 ctx, ctxC := context.WithCancel(context.Background())
730 defer ctxC()
731
732 res := make(map[string]string)
733 stringAtGet(ctx, t, w, res)
734 stringAtGet(ctx, t, w, res)
735
736 for _, te := range []struct {
737 k, w string
738 }{
739 {ks + "a", "val-98"},
740 {ks + "b", "val-99"},
741 } {
742 if want, got := te.w, res[te.k]; want != got {
743 t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got)
744 }
745 }
746}
747
748// TestBacklogOnly exercises the BacklogOnly option for non-ranged watchers,
749// which effectively makes any Get operation non-blocking (but also showcases
750// that unless a Get without BacklogOnly is issues, no new data will appear by
751// itself in the watcher - which is an undocumented implementation detail of the
752// option).
753func TestBacklogOnly(t *testing.T) {
754 tc := newTestClient(t)
755 defer tc.close()
756 ctx, ctxC := context.WithCancel(context.Background())
757 defer ctxC()
758
759 k := "test-backlog-only"
760 tc.put(t, k, "initial")
761
Serge Bazanski37110c32023-03-01 13:57:27 +0000762 value := NewValue(tc.namespaced, k, DecoderStringAt)
Serge Bazanski8d45a052021-10-18 17:24:24 +0200763 watcher := value.Watch()
764 defer watcher.Close()
765
Serge Bazanski37110c32023-03-01 13:57:27 +0000766 d, err := watcher.Get(ctx, event.BacklogOnly[StringAt]())
Serge Bazanski8d45a052021-10-18 17:24:24 +0200767 if err != nil {
768 t.Fatalf("First Get failed: %v", err)
769 }
Serge Bazanski37110c32023-03-01 13:57:27 +0000770 if want, got := "initial", d.Value; want != got {
Serge Bazanski8d45a052021-10-18 17:24:24 +0200771 t.Fatalf("First Get: wanted value %q, got %q", want, got)
772 }
773
774 // As expected, next call to Get with BacklogOnly fails - there truly is no new
775 // updates to emit.
Serge Bazanski37110c32023-03-01 13:57:27 +0000776 _, err = watcher.Get(ctx, event.BacklogOnly[StringAt]())
777 if want, got := event.BacklogDone, err; want != got {
Serge Bazanski8d45a052021-10-18 17:24:24 +0200778 t.Fatalf("Second Get: wanted %v, got %v", want, got)
779 }
780
781 // Implementation detail: even though there is a new value ('second'),
782 // BacklogOnly will still return BacklogDone.
783 tc.put(t, k, "second")
Serge Bazanski37110c32023-03-01 13:57:27 +0000784 _, err = watcher.Get(ctx, event.BacklogOnly[StringAt]())
785 if want, got := event.BacklogDone, err; want != got {
Serge Bazanski8d45a052021-10-18 17:24:24 +0200786 t.Fatalf("Third Get: wanted %v, got %v", want, got)
787 }
788
789 // ... However, a Get without BacklogOnly will return the new value.
790 d, err = watcher.Get(ctx)
791 if err != nil {
792 t.Fatalf("Fourth Get failed: %v", err)
793 }
Serge Bazanski37110c32023-03-01 13:57:27 +0000794 if want, got := "second", d.Value; want != got {
Serge Bazanski8d45a052021-10-18 17:24:24 +0200795 t.Fatalf("Fourth Get: wanted value %q, got %q", want, got)
796 }
797}
798
799// TestBacklogOnlyRange exercises the BacklogOnly option for ranged watchers,
800// showcasing how it expected to be used for keeping up with the external state
801// of a range by synchronizing to a local map.
802func TestBacklogOnlyRange(t *testing.T) {
803 tc := newTestClient(t)
804 defer tc.close()
805 ctx, ctxC := context.WithCancel(context.Background())
806 defer ctxC()
807
808 ks := "test-backlog-only-range/"
809 ke := "test-backlog-only-range0"
810
811 for i := 0; i < 100; i++ {
812 if i%2 == 0 {
813 tc.put(t, ks+"a", fmt.Sprintf("val-%d", i))
814 } else {
815 tc.put(t, ks+"b", fmt.Sprintf("val-%d", i))
816 }
817 }
818
Serge Bazanski37110c32023-03-01 13:57:27 +0000819 value := NewValue(tc.namespaced, ks, DecoderStringAt, Range(ke))
Serge Bazanski8d45a052021-10-18 17:24:24 +0200820 w := value.Watch()
821 defer w.Close()
822
823 // Collect results into a map from key to value.
824 res := make(map[string]string)
825
826 // Run first Get - this is the barrier defining what's part of the backlog.
Serge Bazanski37110c32023-03-01 13:57:27 +0000827 g, err := w.Get(ctx, event.BacklogOnly[StringAt]())
Serge Bazanski8d45a052021-10-18 17:24:24 +0200828 if err != nil {
829 t.Fatalf("Get: %v", err)
830 }
Serge Bazanski37110c32023-03-01 13:57:27 +0000831 res[g.Key] = g.Value
Serge Bazanski8d45a052021-10-18 17:24:24 +0200832
833 // These won't be part of the backlog.
834 tc.put(t, ks+"a", fmt.Sprintf("val-100"))
835 tc.put(t, ks+"b", fmt.Sprintf("val-101"))
836
837 // Retrieve the rest of the backlog until BacklogDone is returned.
838 nUpdates := 1
839 for {
Serge Bazanski37110c32023-03-01 13:57:27 +0000840 g, err := w.Get(ctx, event.BacklogOnly[StringAt]())
841 if err == event.BacklogDone {
Serge Bazanski8d45a052021-10-18 17:24:24 +0200842 break
843 }
844 if err != nil {
845 t.Fatalf("Get: %v", err)
846 }
847 nUpdates += 1
Serge Bazanski37110c32023-03-01 13:57:27 +0000848 res[g.Key] = g.Value
Serge Bazanski8d45a052021-10-18 17:24:24 +0200849 }
850
851 // The backlog should've been compacted to just two entries at their newest
852 // state.
853 if want, got := 2, nUpdates; want != got {
854 t.Fatalf("wanted backlog in %d updates, got it in %d", want, got)
855 }
856
857 for _, te := range []struct {
858 k, w string
859 }{
860 {ks + "a", "val-98"},
861 {ks + "b", "val-99"},
862 } {
863 if want, got := te.w, res[te.k]; want != got {
864 t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got)
865 }
866 }
867}