blob: a5cde2028c6ce77311c97fe21e48ed70fb9fc66b [file] [log] [blame]
Tim Windelschmidt6d33a432025-02-04 14:34:25 +01001// Copyright The Monogon Project Authors.
2// SPDX-License-Identifier: Apache-2.0
3
Serge Bazanskic89df2f2021-04-27 15:51:37 +02004package etcd
5
6import (
7 "context"
8 "errors"
Lorenz Brund13c1c62022-03-30 19:58:58 +02009 "flag"
Serge Bazanskic89df2f2021-04-27 15:51:37 +020010 "fmt"
11 "log"
12 "os"
13 "strconv"
14 "sync"
15 "testing"
16 "time"
17
Lorenz Brund13c1c62022-03-30 19:58:58 +020018 "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
19 "go.etcd.io/etcd/client/pkg/v3/testutil"
20 clientv3 "go.etcd.io/etcd/client/v3"
21 "go.etcd.io/etcd/tests/v3/integration"
Serge Bazanski98e05e12023-04-05 12:44:14 +020022 "go.uber.org/zap"
Serge Bazanskic89df2f2021-04-27 15:51:37 +020023 "google.golang.org/grpc/codes"
Serge Bazanski98e05e12023-04-05 12:44:14 +020024 "google.golang.org/grpc/grpclog"
Serge Bazanskic89df2f2021-04-27 15:51:37 +020025
Tim Windelschmidt9f21f532024-05-07 15:14:20 +020026 "source.monogon.dev/osbase/event"
27 "source.monogon.dev/osbase/logtree"
Serge Bazanskic89df2f2021-04-27 15:51:37 +020028)
29
30var (
31 cluster *integration.ClusterV3
32 endpoints []string
33)
34
35// TestMain brings up a 3 node etcd cluster for tests to use.
36func TestMain(m *testing.M) {
Serge Bazanski98e05e12023-04-05 12:44:14 +020037 // This logtree's data is not output anywhere.
38 lt := logtree.New()
39
Serge Bazanskic89df2f2021-04-27 15:51:37 +020040 cfg := integration.ClusterConfig{
41 Size: 3,
42 GRPCKeepAliveMinTime: time.Millisecond,
Serge Bazanski98e05e12023-04-05 12:44:14 +020043 LoggerBuilder: func(memberName string) *zap.Logger {
44 dn := logtree.DN("etcd." + memberName)
45 return logtree.Zapify(lt.MustLeveledFor(dn), zap.WarnLevel)
46 },
Serge Bazanskic89df2f2021-04-27 15:51:37 +020047 }
Lorenz Brund13c1c62022-03-30 19:58:58 +020048 tb, cancel := testutil.NewTestingTBProthesis("curator")
49 defer cancel()
50 flag.Parse()
Serge Bazanskidefff522022-05-16 17:28:16 +020051 integration.BeforeTestExternal(tb)
Serge Bazanski98e05e12023-04-05 12:44:14 +020052 grpclog.SetLoggerV2(logtree.GRPCify(lt.MustLeveledFor("grpc")))
Lorenz Brund13c1c62022-03-30 19:58:58 +020053 cluster = integration.NewClusterV3(tb, &cfg)
Serge Bazanskic89df2f2021-04-27 15:51:37 +020054 endpoints = make([]string, 3)
55 for i := range endpoints {
56 endpoints[i] = cluster.Client(i).Endpoints()[0]
57 }
58
59 v := m.Run()
Lorenz Brund13c1c62022-03-30 19:58:58 +020060 cluster.Terminate(tb)
Serge Bazanskic89df2f2021-04-27 15:51:37 +020061 os.Exit(v)
62}
63
64// setRaceWg creates a new WaitGroup and sets the given watcher to wait on this
65// WG after it performs the initial retrieval of a value from etcd, but before
66// it starts the watcher. This is used to test potential race conditions
67// present between these two steps.
Serge Bazanski37110c32023-03-01 13:57:27 +000068func setRaceWg[T any](w event.Watcher[T]) *sync.WaitGroup {
Tim Windelschmidt3b5a9172024-05-23 13:33:52 +020069 var wg sync.WaitGroup
Serge Bazanski37110c32023-03-01 13:57:27 +000070 w.(*watcher[T]).testRaceWG = &wg
Serge Bazanskic89df2f2021-04-27 15:51:37 +020071 return &wg
72}
73
74// setSetupWg creates a new WaitGroup and sets the given watcher to wait on
75// thie WG after an etcd watch channel is created. This is used in tests to
76// ensure that the watcher is fully created before it is tested.
Serge Bazanski37110c32023-03-01 13:57:27 +000077func setSetupWg[T any](w event.Watcher[T]) *sync.WaitGroup {
Tim Windelschmidt3b5a9172024-05-23 13:33:52 +020078 var wg sync.WaitGroup
Serge Bazanski37110c32023-03-01 13:57:27 +000079 w.(*watcher[T]).testSetupWG = &wg
Serge Bazanskic89df2f2021-04-27 15:51:37 +020080 return &wg
81}
82
83// testClient is an etcd connection to the test cluster.
84type testClient struct {
Tim Windelschmidt99e15112025-02-05 17:38:16 +010085 client *clientv3.Client
Serge Bazanskic89df2f2021-04-27 15:51:37 +020086}
87
88func newTestClient(t *testing.T) *testClient {
89 t.Helper()
90 cli, err := clientv3.New(clientv3.Config{
91 Endpoints: endpoints,
92 DialTimeout: 1 * time.Second,
93 DialKeepAliveTime: 1 * time.Second,
94 DialKeepAliveTimeout: 1 * time.Second,
95 })
96 if err != nil {
97 t.Fatalf("clientv3.New: %v", err)
98 }
99
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200100 return &testClient{
Tim Windelschmidt99e15112025-02-05 17:38:16 +0100101 client: cli,
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200102 }
103}
104
105func (d *testClient) close() {
106 d.client.Close()
107}
108
109// setEndpoints configures which endpoints (from {0,1,2}) the testClient is
110// connected to.
111func (d *testClient) setEndpoints(nums ...uint) {
112 var eps []string
113 for _, num := range nums {
114 eps = append(eps, endpoints[num])
115 }
116 d.client.SetEndpoints(eps...)
117}
118
119// put uses the testClient to store key with a given string value in etcd. It
120// contains retry logic that will block until the put is successful.
121func (d *testClient) put(t *testing.T, key, value string) {
122 t.Helper()
123 ctx, ctxC := context.WithCancel(context.Background())
124 defer ctxC()
125
126 for {
127 ctxT, ctxC := context.WithTimeout(ctx, 100*time.Millisecond)
Tim Windelschmidtda1c9502024-05-08 01:24:29 +0200128 _, err := d.client.Put(ctxT, key, value)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200129 ctxC()
130 if err == nil {
131 return
132 }
Tim Windelschmidtd5f851b2024-04-23 14:59:37 +0200133 if errors.Is(err, ctxT.Err()) {
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200134 log.Printf("Retrying after %v", err)
135 continue
136 }
137 // Retry on etcd unavailability - this will happen in this code as the
138 // etcd cluster repeatedly loses quorum.
139 var eerr rpctypes.EtcdError
140 if errors.As(err, &eerr) && eerr.Code() == codes.Unavailable {
141 log.Printf("Retrying after %v", err)
142 continue
143 }
144 t.Fatalf("Put: %v", err)
145 }
146
147}
148
149// remove uses the testClient to remove the given key from etcd. It contains
150// retry logic that will block until the removal is successful.
151func (d *testClient) remove(t *testing.T, key string) {
152 t.Helper()
153 ctx, ctxC := context.WithCancel(context.Background())
154 defer ctxC()
155
Tim Windelschmidtda1c9502024-05-08 01:24:29 +0200156 _, err := d.client.Delete(ctx, key)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200157 if err == nil {
158 return
159 }
160 t.Fatalf("Delete: %v", err)
161}
162
163// expect runs a Get on the given Watcher, ensuring the returned value is a
164// given string.
Serge Bazanski37110c32023-03-01 13:57:27 +0000165func expect(t *testing.T, w event.Watcher[StringAt], value string) {
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200166 t.Helper()
167 ctx, ctxC := context.WithCancel(context.Background())
168 defer ctxC()
169
170 got, err := w.Get(ctx)
171 if err != nil {
172 t.Fatalf("Get: %v", err)
173 }
174
Serge Bazanski37110c32023-03-01 13:57:27 +0000175 if got, want := got.Value, value; got != want {
Serge Bazanskibbb873d2022-06-24 14:22:39 +0200176 t.Errorf("Wanted value %q, got %q", want, got)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200177 }
178}
179
180// expectTimeout ensures that the given watcher blocks on a Get call for at
181// least 100 milliseconds. This is used by tests to attempt to verify that the
182// watcher Get is fully blocked, but can cause false positives (eg. when Get
183// blocks for 101 milliseconds). Thus, this function should be used sparingly
184// and in tests that perform other baseline behaviour checks alongside this
185// test.
Serge Bazanski37110c32023-03-01 13:57:27 +0000186func expectTimeout[T any](t *testing.T, w event.Watcher[T]) {
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200187 t.Helper()
188 ctx, ctxC := context.WithTimeout(context.Background(), 100*time.Millisecond)
189 got, err := w.Get(ctx)
190 ctxC()
191
192 if !errors.Is(err, ctx.Err()) {
193 t.Fatalf("Expected timeout error, got %v, %v", got, err)
194 }
195}
196
197// wait wraps a watcher into a channel of strings, ensuring that the watcher
198// never errors on Get calls and always returns strings.
Serge Bazanski37110c32023-03-01 13:57:27 +0000199func wait(t *testing.T, w event.Watcher[StringAt]) (chan string, func()) {
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200200 t.Helper()
201 ctx, ctxC := context.WithCancel(context.Background())
202
203 c := make(chan string)
204
205 go func() {
206 for {
207 got, err := w.Get(ctx)
208 if err != nil && errors.Is(err, ctx.Err()) {
209 return
210 }
211 if err != nil {
Tim Windelschmidt88049722024-04-11 23:09:23 +0200212 t.Errorf("Get: %v", err)
213 close(c)
214 return
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200215 }
Serge Bazanski37110c32023-03-01 13:57:27 +0000216 c <- got.Value
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200217 }
218 }()
219
220 return c, ctxC
221}
222
223// TestSimple exercises the simplest possible interaction with a watched value.
224func TestSimple(t *testing.T) {
225 tc := newTestClient(t)
226 defer tc.close()
227
228 k := "test-simple"
Tim Windelschmidtda1c9502024-05-08 01:24:29 +0200229 value := NewValue(tc.client, k, DecoderStringAt)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200230 tc.put(t, k, "one")
231
232 watcher := value.Watch()
233 defer watcher.Close()
234 expect(t, watcher, "one")
235
236 tc.put(t, k, "two")
237 expect(t, watcher, "two")
238
239 tc.put(t, k, "three")
240 tc.put(t, k, "four")
241 tc.put(t, k, "five")
242 tc.put(t, k, "six")
243
244 q, cancel := wait(t, watcher)
245 // Test will hang here if the above value does not receive the set "six".
Serge Bazanski37110c32023-03-01 13:57:27 +0000246 log.Printf("a")
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200247 for el := range q {
Serge Bazanski37110c32023-03-01 13:57:27 +0000248 log.Printf("%q", el)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200249 if el == "six" {
250 break
251 }
252 }
Serge Bazanski37110c32023-03-01 13:57:27 +0000253 log.Printf("b")
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200254 cancel()
255}
256
Serge Bazanski8d45a052021-10-18 17:24:24 +0200257// stringAtGet performs a Get from a Watcher, expecting a stringAt and updating
258// the given map with the retrieved value.
Serge Bazanski37110c32023-03-01 13:57:27 +0000259func stringAtGet(ctx context.Context, t *testing.T, w event.Watcher[StringAt], m map[string]string) {
Serge Bazanski8d45a052021-10-18 17:24:24 +0200260 t.Helper()
261
262 vr, err := w.Get(ctx)
263 if err != nil {
264 t.Fatalf("Get: %v", err)
265 }
Serge Bazanski37110c32023-03-01 13:57:27 +0000266 m[vr.Key] = vr.Value
Serge Bazanski8d45a052021-10-18 17:24:24 +0200267}
268
269// TestSimpleRange exercises the simplest behaviour of a ranged watcher,
270// retrieving updaates via Get in a fully blocking fashion.
271func TestSimpleRange(t *testing.T) {
272 tc := newTestClient(t)
273 defer tc.close()
274
275 ks := "test-simple-range/"
276 ke := "test-simple-range0"
Tim Windelschmidtda1c9502024-05-08 01:24:29 +0200277 value := NewValue(tc.client, ks, DecoderStringAt, Range(ke))
Serge Bazanski8d45a052021-10-18 17:24:24 +0200278 tc.put(t, ks+"a", "one")
279 tc.put(t, ks+"b", "two")
280 tc.put(t, ks+"c", "three")
281 tc.put(t, ks+"b", "four")
282
283 w := value.Watch()
284 defer w.Close()
285
286 ctx, ctxC := context.WithCancel(context.Background())
287 defer ctxC()
288
289 res := make(map[string]string)
290 stringAtGet(ctx, t, w, res)
291 stringAtGet(ctx, t, w, res)
292 stringAtGet(ctx, t, w, res)
293
294 tc.put(t, ks+"a", "five")
295 tc.put(t, ks+"e", "six")
296
297 stringAtGet(ctx, t, w, res)
298 stringAtGet(ctx, t, w, res)
299
300 for _, te := range []struct {
301 k, w string
302 }{
303 {ks + "a", "five"},
304 {ks + "b", "four"},
305 {ks + "c", "three"},
306 {ks + "e", "six"},
307 } {
308 if want, got := te.w, res[te.k]; want != got {
309 t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got)
310 }
311 }
312}
313
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200314// TestCancel ensures that watchers can resume after being canceled.
315func TestCancel(t *testing.T) {
316 tc := newTestClient(t)
317 defer tc.close()
318
319 k := "test-cancel"
Tim Windelschmidtda1c9502024-05-08 01:24:29 +0200320 value := NewValue(tc.client, k, DecoderStringAt)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200321 tc.put(t, k, "one")
322
323 watcher := value.Watch()
324 defer watcher.Close()
325 expect(t, watcher, "one")
326
327 ctx, ctxC := context.WithCancel(context.Background())
328 errs := make(chan error, 1)
329 go func() {
330 _, err := watcher.Get(ctx)
331 errs <- err
332 }()
333 ctxC()
334 if want, got := ctx.Err(), <-errs; !errors.Is(got, want) {
335 t.Fatalf("Wanted err %v, got %v", want, got)
336 }
337
338 // Successfully canceled watch, resuming should continue to work.
339 q, cancel := wait(t, watcher)
340 defer cancel()
341
342 tc.put(t, k, "two")
343 if want, got := "two", <-q; want != got {
344 t.Fatalf("Wanted val %q, got %q", want, got)
345 }
346}
347
348// TestCancelOnGet ensures that a context cancellation on an initial Get (which
349// translates to an etcd Get in a backoff loop) doesn't block.
350func TestCancelOnGet(t *testing.T) {
351 tc := newTestClient(t)
352 defer tc.close()
353
354 k := "test-cancel-on-get"
Tim Windelschmidtda1c9502024-05-08 01:24:29 +0200355 value := NewValue(tc.client, k, DecoderStringAt)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200356 watcher := value.Watch()
357 tc.put(t, k, "one")
358
359 // Cause partition between client endpoint and rest of cluster. Any read/write
360 // operations will now hang.
361 tc.setEndpoints(0)
362 cluster.Members[0].InjectPartition(t, cluster.Members[1], cluster.Members[2])
Serge Bazanskiad10ece2022-05-17 11:20:17 +0200363 // Let raft timeouts expire so that the leader is aware a partition has occurred
364 // and stops serving data if it is not part of a quorum anymore.
365 //
366 // Otherwise, if Member[0] was the leader, there will be a window of opportunity
367 // during which it will continue to serve read data even though it has been
368 // partitioned off. This is an effect of how etcd handles linearizable reads:
369 // they go through the leader, but do not go through raft.
370 //
371 // The value is the default etcd leader timeout (1s) + some wiggle room.
372 time.Sleep(time.Second + time.Millisecond*100)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200373
374 // Perform the initial Get(), which should attempt to retrieve a KV entry from
375 // the etcd service. This should hang. Unfortunately, there's no easy way to do
376 // this without an arbitrary sleep hoping that the client actually gets to the
377 // underlying etcd.Get call. This can cause false positives (eg. false 'pass'
378 // results) in this test.
379 ctx, ctxC := context.WithCancel(context.Background())
380 errs := make(chan error, 1)
381 go func() {
382 _, err := watcher.Get(ctx)
383 errs <- err
384 }()
385 time.Sleep(time.Second)
386
387 // Now that the etcd.Get is hanging, cancel the context.
388 ctxC()
389 // And now unpartition the cluster, resuming reads.
390 cluster.Members[0].RecoverPartition(t, cluster.Members[1], cluster.Members[2])
391
392 // The etcd.Get() call should've returned with a context cancellation.
393 err := <-errs
394 switch {
395 case err == nil:
396 t.Errorf("watcher.Get() returned no error, wanted context error")
397 case errors.Is(err, ctx.Err()):
398 // Okay.
399 default:
400 t.Errorf("watcher.Get() returned %v, wanted context error", err)
401 }
402}
403
404// TestClientReconnect forces a 'reconnection' of an active watcher from a
405// running member to another member, by stopping the original member and
406// explicitly reconnecting the client to other available members.
407//
408// This doe not reflect a situation expected during Metropolis runtime, as we
409// do not expect splits between an etcd client and its connected member
410// (instead, all etcd clients only connect to their local member). However, it
411// is still an important safety test to perform, and it also exercies the
412// equivalent behaviour of an etcd client re-connecting for any other reason.
413func TestClientReconnect(t *testing.T) {
414 tc := newTestClient(t)
415 defer tc.close()
416 tc.setEndpoints(0)
417
418 k := "test-client-reconnect"
Tim Windelschmidtda1c9502024-05-08 01:24:29 +0200419 value := NewValue(tc.client, k, DecoderStringAt)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200420 tc.put(t, k, "one")
421
422 watcher := value.Watch()
423 defer watcher.Close()
424 expect(t, watcher, "one")
425
426 q, cancel := wait(t, watcher)
427 defer cancel()
428
429 cluster.Members[0].Stop(t)
430 defer cluster.Members[0].Restart(t)
431 cluster.WaitLeader(t)
432
433 tc.setEndpoints(1, 2)
434 tc.put(t, k, "two")
435
436 if want, got := "two", <-q; want != got {
437 t.Fatalf("Watcher received incorrect data after client restart, wanted %q, got %q", want, got)
438 }
439}
440
441// TestClientPartition forces a temporary partition of the etcd member while a
442// watcher is running, updates the value from across the partition, and undoes
443// the partition.
444// The partition is expected to be entirely transparent to the watcher.
445func TestClientPartition(t *testing.T) {
446 tcOne := newTestClient(t)
447 defer tcOne.close()
448 tcOne.setEndpoints(0)
449
450 tcRest := newTestClient(t)
451 defer tcRest.close()
452 tcRest.setEndpoints(1, 2)
453
454 k := "test-client-partition"
Tim Windelschmidtda1c9502024-05-08 01:24:29 +0200455 valueOne := NewValue(tcOne.client, k, DecoderStringAt)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200456 watcherOne := valueOne.Watch()
457 defer watcherOne.Close()
Tim Windelschmidtda1c9502024-05-08 01:24:29 +0200458 valueRest := NewValue(tcRest.client, k, DecoderStringAt)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200459 watcherRest := valueRest.Watch()
460 defer watcherRest.Close()
461
462 tcRest.put(t, k, "a")
463 expect(t, watcherOne, "a")
464 expect(t, watcherRest, "a")
465
466 cluster.Members[0].InjectPartition(t, cluster.Members[1], cluster.Members[2])
467
468 tcRest.put(t, k, "b")
469 expect(t, watcherRest, "b")
470 expectTimeout(t, watcherOne)
471
472 cluster.Members[0].RecoverPartition(t, cluster.Members[1], cluster.Members[2])
473
474 expect(t, watcherOne, "b")
475 tcRest.put(t, k, "c")
476 expect(t, watcherOne, "c")
477 expect(t, watcherRest, "c")
478
479}
480
481// TestEarlyUse exercises the correct behaviour of the value watcher on a value
482// that is not yet set.
483func TestEarlyUse(t *testing.T) {
484 tc := newTestClient(t)
485 defer tc.close()
486
487 k := "test-early-use"
488
Tim Windelschmidtda1c9502024-05-08 01:24:29 +0200489 value := NewValue(tc.client, k, DecoderStringAt)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200490 watcher := value.Watch()
491 defer watcher.Close()
492
493 wg := setSetupWg(watcher)
494 wg.Add(1)
495 q, cancel := wait(t, watcher)
496 defer cancel()
497
498 wg.Done()
499
500 tc.put(t, k, "one")
501
502 if want, got := "one", <-q; want != got {
503 t.Fatalf("Expected %q, got %q", want, got)
504 }
505}
506
507// TestRemove exercises the basic functionality of handling deleted values.
508func TestRemove(t *testing.T) {
509 tc := newTestClient(t)
510 defer tc.close()
511
512 k := "test-remove"
513 tc.put(t, k, "one")
514
Tim Windelschmidtda1c9502024-05-08 01:24:29 +0200515 value := NewValue(tc.client, k, DecoderStringAt)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200516 watcher := value.Watch()
517 defer watcher.Close()
518
519 expect(t, watcher, "one")
520 tc.remove(t, k)
521 expect(t, watcher, "")
522}
523
Serge Bazanski8d45a052021-10-18 17:24:24 +0200524// TestRemoveRange exercises the behaviour of a Get on a ranged watcher when a
525// value is removed.
526func TestRemoveRange(t *testing.T) {
527 tc := newTestClient(t)
528 defer tc.close()
529
530 ks := "test-remove-range/"
531 ke := "test-remove-range0"
Tim Windelschmidtda1c9502024-05-08 01:24:29 +0200532 value := NewValue(tc.client, ks, DecoderStringAt, Range(ke))
Serge Bazanski8d45a052021-10-18 17:24:24 +0200533 tc.put(t, ks+"a", "one")
534 tc.put(t, ks+"b", "two")
535 tc.put(t, ks+"c", "three")
536 tc.put(t, ks+"b", "four")
537 tc.remove(t, ks+"c")
538
539 w := value.Watch()
540 defer w.Close()
541
542 ctx, ctxC := context.WithCancel(context.Background())
543 defer ctxC()
544
545 res := make(map[string]string)
546 stringAtGet(ctx, t, w, res)
547 stringAtGet(ctx, t, w, res)
548
549 for _, te := range []struct {
550 k, w string
551 }{
552 {ks + "a", "one"},
553 {ks + "b", "four"},
554 {ks + "c", ""},
555 } {
556 if want, got := te.w, res[te.k]; want != got {
557 t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got)
558 }
559 }
560}
561
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200562// TestEmptyRace forces the watcher to retrieve an empty value from the K/V
563// store at first, and establishing the watch channel after a new value has
564// been stored in the same place.
565func TestEmptyRace(t *testing.T) {
566 tc := newTestClient(t)
567 defer tc.close()
568
569 k := "test-remove-race"
570 tc.put(t, k, "one")
571 tc.remove(t, k)
572
Tim Windelschmidtda1c9502024-05-08 01:24:29 +0200573 value := NewValue(tc.client, k, DecoderStringAt)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200574 watcher := value.Watch()
575 defer watcher.Close()
576
577 wg := setRaceWg(watcher)
578 wg.Add(1)
579 q, cancel := wait(t, watcher)
580 defer cancel()
581
582 tc.put(t, k, "two")
583 wg.Done()
584
585 if want, got := "two", <-q; want != got {
586 t.Fatalf("Watcher received incorrect data after client restart, wanted %q, got %q", want, got)
587 }
588}
589
// errOrInt carries the outcome of a single Get in TestDecoder: either a
// decoded value or the error Get returned.
type errOrInt struct {
	// val is the decoded value; left at zero when err is set.
	val int64
	// err is the error returned by Get, if any.
	err error
}
594
595// TestDecoder exercises the BytesDecoder functionality of the watcher, by
596// creating a value with a decoder that only accepts string-encoded integers
597// that are divisible by three. The test then proceeds to put a handful of
598// values into etcd, ensuring that undecodable values correctly return an error
599// on Get, but that the watcher continues to work after the error has been
600// returned.
601func TestDecoder(t *testing.T) {
Serge Bazanski37110c32023-03-01 13:57:27 +0000602 decoderDivisibleByThree := func(_, value []byte) (int64, error) {
603 num, err := strconv.ParseInt(string(value), 10, 64)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200604 if err != nil {
Serge Bazanski37110c32023-03-01 13:57:27 +0000605 return 0, fmt.Errorf("not a valid number")
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200606 }
607 if (num % 3) != 0 {
Serge Bazanski37110c32023-03-01 13:57:27 +0000608 return 0, fmt.Errorf("not divisible by 3")
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200609 }
610 return num, nil
611 }
612
613 tc := newTestClient(t)
614 defer tc.close()
615
616 ctx, ctxC := context.WithCancel(context.Background())
617 defer ctxC()
618
619 k := "test-decoder"
Tim Windelschmidtda1c9502024-05-08 01:24:29 +0200620 value := NewValue(tc.client, k, decoderDivisibleByThree)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200621 watcher := value.Watch()
622 defer watcher.Close()
623 tc.put(t, k, "3")
624 _, err := watcher.Get(ctx)
625 if err != nil {
626 t.Fatalf("Initial Get: %v", err)
627 }
628
629 // Stream updates into arbitrarily-bounded test channel.
630 queue := make(chan errOrInt, 100)
631 go func() {
632 for {
633 res, err := watcher.Get(ctx)
634 if err != nil && errors.Is(err, ctx.Err()) {
635 return
636 }
637 if err != nil {
638 queue <- errOrInt{
639 err: err,
640 }
641 } else {
642 queue <- errOrInt{
Serge Bazanski37110c32023-03-01 13:57:27 +0000643 val: res,
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200644 }
645 }
646 }
647 }()
648
649 var wantList []*int64
650 wantError := func(val string) {
651 wantList = append(wantList, nil)
652 tc.put(t, k, val)
653 }
654 wantValue := func(val string, decoded int64) {
655 wantList = append(wantList, &decoded)
656 tc.put(t, k, val)
657 }
658
659 wantError("")
660 wantValue("9", 9)
661 wantError("foo")
662 wantValue("18", 18)
663 wantError("10")
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200664 wantValue("27", 27)
665 wantValue("36", 36)
666
667 for i, want := range wantList {
668 q := <-queue
669 if want == nil && q.err == nil {
Serge Bazanski832bc772022-04-07 12:33:01 +0200670 t.Fatalf("Case %d: wanted error, got no error and value %d", i, q.val)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200671 }
672 if want != nil && (*want) != q.val {
Serge Bazanski832bc772022-04-07 12:33:01 +0200673 t.Fatalf("Case %d: wanted value %d, got error %v and value %d", i, *want, q.err, q.val)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200674 }
675 }
676}
677
678// TestBacklog ensures that the watcher can handle a large backlog of changes
679// in etcd that the client didnt' keep up with, and that whatever final state
680// is available to the client when it actually gets to calling Get().
681func TestBacklog(t *testing.T) {
682 tc := newTestClient(t)
683 defer tc.close()
684
685 k := "test-backlog"
Tim Windelschmidtda1c9502024-05-08 01:24:29 +0200686 value := NewValue(tc.client, k, DecoderStringAt)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200687 watcher := value.Watch()
688 defer watcher.Close()
689
690 tc.put(t, k, "initial")
691 expect(t, watcher, "initial")
692
693 for i := 0; i < 1000; i++ {
694 tc.put(t, k, fmt.Sprintf("val-%d", i))
695 }
696
697 ctx, ctxC := context.WithTimeout(context.Background(), time.Second)
698 defer ctxC()
699 for {
700 valB, err := watcher.Get(ctx)
701 if err != nil {
702 t.Fatalf("Get() returned error before expected final value: %v", err)
703 }
Serge Bazanski37110c32023-03-01 13:57:27 +0000704 if valB.Value == "val-999" {
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200705 break
706 }
707 }
708}
Serge Bazanski8d45a052021-10-18 17:24:24 +0200709
710// TestBacklogRange ensures that the ranged etcd watcher can handle a large
711// backlog of changes in etcd that the client didn't keep up with.
712func TestBacklogRange(t *testing.T) {
713 tc := newTestClient(t)
714 defer tc.close()
715
716 ks := "test-backlog-range/"
717 ke := "test-backlog-range0"
Tim Windelschmidtda1c9502024-05-08 01:24:29 +0200718 value := NewValue(tc.client, ks, DecoderStringAt, Range(ke))
Serge Bazanski8d45a052021-10-18 17:24:24 +0200719 w := value.Watch()
720 defer w.Close()
721
722 for i := 0; i < 100; i++ {
723 if i%2 == 0 {
724 tc.put(t, ks+"a", fmt.Sprintf("val-%d", i))
725 } else {
726 tc.put(t, ks+"b", fmt.Sprintf("val-%d", i))
727 }
728 }
729
730 ctx, ctxC := context.WithCancel(context.Background())
731 defer ctxC()
732
733 res := make(map[string]string)
734 stringAtGet(ctx, t, w, res)
735 stringAtGet(ctx, t, w, res)
736
737 for _, te := range []struct {
738 k, w string
739 }{
740 {ks + "a", "val-98"},
741 {ks + "b", "val-99"},
742 } {
743 if want, got := te.w, res[te.k]; want != got {
744 t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got)
745 }
746 }
747}
748
749// TestBacklogOnly exercises the BacklogOnly option for non-ranged watchers,
750// which effectively makes any Get operation non-blocking (but also showcases
751// that unless a Get without BacklogOnly is issues, no new data will appear by
752// itself in the watcher - which is an undocumented implementation detail of the
753// option).
754func TestBacklogOnly(t *testing.T) {
755 tc := newTestClient(t)
756 defer tc.close()
757 ctx, ctxC := context.WithCancel(context.Background())
758 defer ctxC()
759
760 k := "test-backlog-only"
761 tc.put(t, k, "initial")
762
Tim Windelschmidtda1c9502024-05-08 01:24:29 +0200763 value := NewValue(tc.client, k, DecoderStringAt)
Serge Bazanski8d45a052021-10-18 17:24:24 +0200764 watcher := value.Watch()
765 defer watcher.Close()
766
Serge Bazanski37110c32023-03-01 13:57:27 +0000767 d, err := watcher.Get(ctx, event.BacklogOnly[StringAt]())
Serge Bazanski8d45a052021-10-18 17:24:24 +0200768 if err != nil {
769 t.Fatalf("First Get failed: %v", err)
770 }
Serge Bazanski37110c32023-03-01 13:57:27 +0000771 if want, got := "initial", d.Value; want != got {
Serge Bazanski8d45a052021-10-18 17:24:24 +0200772 t.Fatalf("First Get: wanted value %q, got %q", want, got)
773 }
774
775 // As expected, next call to Get with BacklogOnly fails - there truly is no new
776 // updates to emit.
Serge Bazanski37110c32023-03-01 13:57:27 +0000777 _, err = watcher.Get(ctx, event.BacklogOnly[StringAt]())
Tim Windelschmidt513df182024-04-18 23:44:50 +0200778 if want, got := event.ErrBacklogDone, err; !errors.Is(got, want) {
Serge Bazanski8d45a052021-10-18 17:24:24 +0200779 t.Fatalf("Second Get: wanted %v, got %v", want, got)
780 }
781
782 // Implementation detail: even though there is a new value ('second'),
Tim Windelschmidt513df182024-04-18 23:44:50 +0200783 // BacklogOnly will still return ErrBacklogDone.
Serge Bazanski8d45a052021-10-18 17:24:24 +0200784 tc.put(t, k, "second")
Serge Bazanski37110c32023-03-01 13:57:27 +0000785 _, err = watcher.Get(ctx, event.BacklogOnly[StringAt]())
Tim Windelschmidt513df182024-04-18 23:44:50 +0200786 if want, got := event.ErrBacklogDone, err; !errors.Is(got, want) {
Serge Bazanski8d45a052021-10-18 17:24:24 +0200787 t.Fatalf("Third Get: wanted %v, got %v", want, got)
788 }
789
790 // ... However, a Get without BacklogOnly will return the new value.
791 d, err = watcher.Get(ctx)
792 if err != nil {
793 t.Fatalf("Fourth Get failed: %v", err)
794 }
Serge Bazanski37110c32023-03-01 13:57:27 +0000795 if want, got := "second", d.Value; want != got {
Serge Bazanski8d45a052021-10-18 17:24:24 +0200796 t.Fatalf("Fourth Get: wanted value %q, got %q", want, got)
797 }
798}
799
800// TestBacklogOnlyRange exercises the BacklogOnly option for ranged watchers,
801// showcasing how it expected to be used for keeping up with the external state
802// of a range by synchronizing to a local map.
803func TestBacklogOnlyRange(t *testing.T) {
804 tc := newTestClient(t)
805 defer tc.close()
806 ctx, ctxC := context.WithCancel(context.Background())
807 defer ctxC()
808
809 ks := "test-backlog-only-range/"
810 ke := "test-backlog-only-range0"
811
812 for i := 0; i < 100; i++ {
813 if i%2 == 0 {
814 tc.put(t, ks+"a", fmt.Sprintf("val-%d", i))
815 } else {
816 tc.put(t, ks+"b", fmt.Sprintf("val-%d", i))
817 }
818 }
819
Tim Windelschmidtda1c9502024-05-08 01:24:29 +0200820 value := NewValue(tc.client, ks, DecoderStringAt, Range(ke))
Serge Bazanski8d45a052021-10-18 17:24:24 +0200821 w := value.Watch()
822 defer w.Close()
823
824 // Collect results into a map from key to value.
825 res := make(map[string]string)
826
827 // Run first Get - this is the barrier defining what's part of the backlog.
Serge Bazanski37110c32023-03-01 13:57:27 +0000828 g, err := w.Get(ctx, event.BacklogOnly[StringAt]())
Serge Bazanski8d45a052021-10-18 17:24:24 +0200829 if err != nil {
830 t.Fatalf("Get: %v", err)
831 }
Serge Bazanski37110c32023-03-01 13:57:27 +0000832 res[g.Key] = g.Value
Serge Bazanski8d45a052021-10-18 17:24:24 +0200833
834 // These won't be part of the backlog.
Tim Windelschmidt92316fd2024-04-18 23:06:40 +0200835 tc.put(t, ks+"a", "val-100")
836 tc.put(t, ks+"b", "val-101")
Serge Bazanski8d45a052021-10-18 17:24:24 +0200837
Tim Windelschmidt513df182024-04-18 23:44:50 +0200838 // Retrieve the rest of the backlog until ErrBacklogDone is returned.
Serge Bazanski8d45a052021-10-18 17:24:24 +0200839 nUpdates := 1
840 for {
Serge Bazanski37110c32023-03-01 13:57:27 +0000841 g, err := w.Get(ctx, event.BacklogOnly[StringAt]())
Tim Windelschmidt513df182024-04-18 23:44:50 +0200842 if errors.Is(err, event.ErrBacklogDone) {
Serge Bazanski8d45a052021-10-18 17:24:24 +0200843 break
844 }
845 if err != nil {
846 t.Fatalf("Get: %v", err)
847 }
848 nUpdates += 1
Serge Bazanski37110c32023-03-01 13:57:27 +0000849 res[g.Key] = g.Value
Serge Bazanski8d45a052021-10-18 17:24:24 +0200850 }
851
852 // The backlog should've been compacted to just two entries at their newest
853 // state.
854 if want, got := 2, nUpdates; want != got {
855 t.Fatalf("wanted backlog in %d updates, got it in %d", want, got)
856 }
857
858 for _, te := range []struct {
859 k, w string
860 }{
861 {ks + "a", "val-98"},
862 {ks + "b", "val-99"},
863 } {
864 if want, got := te.w, res[te.k]; want != got {
865 t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got)
866 }
867 }
868}