blob: c6d50b1fa03856667f9e5e30b54f596966fab23f [file] [log] [blame]
Serge Bazanskic89df2f2021-04-27 15:51:37 +02001package etcd
2
3import (
4 "context"
5 "errors"
Lorenz Brund13c1c62022-03-30 19:58:58 +02006 "flag"
Serge Bazanskic89df2f2021-04-27 15:51:37 +02007 "fmt"
8 "log"
9 "os"
10 "strconv"
11 "sync"
12 "testing"
13 "time"
14
Lorenz Brund13c1c62022-03-30 19:58:58 +020015 "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
16 "go.etcd.io/etcd/client/pkg/v3/testutil"
17 clientv3 "go.etcd.io/etcd/client/v3"
18 "go.etcd.io/etcd/tests/v3/integration"
Serge Bazanski98e05e12023-04-05 12:44:14 +020019 "go.uber.org/zap"
Serge Bazanskic89df2f2021-04-27 15:51:37 +020020 "google.golang.org/grpc/codes"
Serge Bazanski98e05e12023-04-05 12:44:14 +020021 "google.golang.org/grpc/grpclog"
Serge Bazanskic89df2f2021-04-27 15:51:37 +020022
Serge Bazanskic89df2f2021-04-27 15:51:37 +020023 "source.monogon.dev/metropolis/pkg/event"
Serge Bazanski98e05e12023-04-05 12:44:14 +020024 "source.monogon.dev/metropolis/pkg/logtree"
Serge Bazanskic89df2f2021-04-27 15:51:37 +020025)
26
27var (
28 cluster *integration.ClusterV3
29 endpoints []string
30)
31
32// TestMain brings up a 3 node etcd cluster for tests to use.
33func TestMain(m *testing.M) {
Serge Bazanski98e05e12023-04-05 12:44:14 +020034 // This logtree's data is not output anywhere.
35 lt := logtree.New()
36
Serge Bazanskic89df2f2021-04-27 15:51:37 +020037 cfg := integration.ClusterConfig{
38 Size: 3,
39 GRPCKeepAliveMinTime: time.Millisecond,
Serge Bazanski98e05e12023-04-05 12:44:14 +020040 LoggerBuilder: func(memberName string) *zap.Logger {
41 dn := logtree.DN("etcd." + memberName)
42 return logtree.Zapify(lt.MustLeveledFor(dn), zap.WarnLevel)
43 },
Serge Bazanskic89df2f2021-04-27 15:51:37 +020044 }
Lorenz Brund13c1c62022-03-30 19:58:58 +020045 tb, cancel := testutil.NewTestingTBProthesis("curator")
46 defer cancel()
47 flag.Parse()
Serge Bazanskidefff522022-05-16 17:28:16 +020048 integration.BeforeTestExternal(tb)
Serge Bazanski98e05e12023-04-05 12:44:14 +020049 grpclog.SetLoggerV2(logtree.GRPCify(lt.MustLeveledFor("grpc")))
Lorenz Brund13c1c62022-03-30 19:58:58 +020050 cluster = integration.NewClusterV3(tb, &cfg)
Serge Bazanskic89df2f2021-04-27 15:51:37 +020051 endpoints = make([]string, 3)
52 for i := range endpoints {
53 endpoints[i] = cluster.Client(i).Endpoints()[0]
54 }
55
56 v := m.Run()
Lorenz Brund13c1c62022-03-30 19:58:58 +020057 cluster.Terminate(tb)
Serge Bazanskic89df2f2021-04-27 15:51:37 +020058 os.Exit(v)
59}
60
61// setRaceWg creates a new WaitGroup and sets the given watcher to wait on this
62// WG after it performs the initial retrieval of a value from etcd, but before
63// it starts the watcher. This is used to test potential race conditions
64// present between these two steps.
Serge Bazanski37110c32023-03-01 13:57:27 +000065func setRaceWg[T any](w event.Watcher[T]) *sync.WaitGroup {
Serge Bazanskic89df2f2021-04-27 15:51:37 +020066 wg := sync.WaitGroup{}
Serge Bazanski37110c32023-03-01 13:57:27 +000067 w.(*watcher[T]).testRaceWG = &wg
Serge Bazanskic89df2f2021-04-27 15:51:37 +020068 return &wg
69}
70
71// setSetupWg creates a new WaitGroup and sets the given watcher to wait on
72// thie WG after an etcd watch channel is created. This is used in tests to
73// ensure that the watcher is fully created before it is tested.
Serge Bazanski37110c32023-03-01 13:57:27 +000074func setSetupWg[T any](w event.Watcher[T]) *sync.WaitGroup {
Serge Bazanskic89df2f2021-04-27 15:51:37 +020075 wg := sync.WaitGroup{}
Serge Bazanski37110c32023-03-01 13:57:27 +000076 w.(*watcher[T]).testSetupWG = &wg
Serge Bazanskic89df2f2021-04-27 15:51:37 +020077 return &wg
78}
79
80// testClient is an etcd connection to the test cluster.
81type testClient struct {
82 client *clientv3.Client
Serge Bazanskic89df2f2021-04-27 15:51:37 +020083}
84
85func newTestClient(t *testing.T) *testClient {
86 t.Helper()
87 cli, err := clientv3.New(clientv3.Config{
88 Endpoints: endpoints,
89 DialTimeout: 1 * time.Second,
90 DialKeepAliveTime: 1 * time.Second,
91 DialKeepAliveTimeout: 1 * time.Second,
92 })
93 if err != nil {
94 t.Fatalf("clientv3.New: %v", err)
95 }
96
Serge Bazanskic89df2f2021-04-27 15:51:37 +020097 return &testClient{
98 client: cli,
Serge Bazanskic89df2f2021-04-27 15:51:37 +020099 }
100}
101
102func (d *testClient) close() {
103 d.client.Close()
104}
105
106// setEndpoints configures which endpoints (from {0,1,2}) the testClient is
107// connected to.
108func (d *testClient) setEndpoints(nums ...uint) {
109 var eps []string
110 for _, num := range nums {
111 eps = append(eps, endpoints[num])
112 }
113 d.client.SetEndpoints(eps...)
114}
115
116// put uses the testClient to store key with a given string value in etcd. It
117// contains retry logic that will block until the put is successful.
118func (d *testClient) put(t *testing.T, key, value string) {
119 t.Helper()
120 ctx, ctxC := context.WithCancel(context.Background())
121 defer ctxC()
122
123 for {
124 ctxT, ctxC := context.WithTimeout(ctx, 100*time.Millisecond)
Tim Windelschmidtda1c9502024-05-08 01:24:29 +0200125 _, err := d.client.Put(ctxT, key, value)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200126 ctxC()
127 if err == nil {
128 return
129 }
Tim Windelschmidtd5f851b2024-04-23 14:59:37 +0200130 if errors.Is(err, ctxT.Err()) {
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200131 log.Printf("Retrying after %v", err)
132 continue
133 }
134 // Retry on etcd unavailability - this will happen in this code as the
135 // etcd cluster repeatedly loses quorum.
136 var eerr rpctypes.EtcdError
137 if errors.As(err, &eerr) && eerr.Code() == codes.Unavailable {
138 log.Printf("Retrying after %v", err)
139 continue
140 }
141 t.Fatalf("Put: %v", err)
142 }
143
144}
145
146// remove uses the testClient to remove the given key from etcd. It contains
147// retry logic that will block until the removal is successful.
148func (d *testClient) remove(t *testing.T, key string) {
149 t.Helper()
150 ctx, ctxC := context.WithCancel(context.Background())
151 defer ctxC()
152
Tim Windelschmidtda1c9502024-05-08 01:24:29 +0200153 _, err := d.client.Delete(ctx, key)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200154 if err == nil {
155 return
156 }
157 t.Fatalf("Delete: %v", err)
158}
159
160// expect runs a Get on the given Watcher, ensuring the returned value is a
161// given string.
Serge Bazanski37110c32023-03-01 13:57:27 +0000162func expect(t *testing.T, w event.Watcher[StringAt], value string) {
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200163 t.Helper()
164 ctx, ctxC := context.WithCancel(context.Background())
165 defer ctxC()
166
167 got, err := w.Get(ctx)
168 if err != nil {
169 t.Fatalf("Get: %v", err)
170 }
171
Serge Bazanski37110c32023-03-01 13:57:27 +0000172 if got, want := got.Value, value; got != want {
Serge Bazanskibbb873d2022-06-24 14:22:39 +0200173 t.Errorf("Wanted value %q, got %q", want, got)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200174 }
175}
176
177// expectTimeout ensures that the given watcher blocks on a Get call for at
178// least 100 milliseconds. This is used by tests to attempt to verify that the
179// watcher Get is fully blocked, but can cause false positives (eg. when Get
180// blocks for 101 milliseconds). Thus, this function should be used sparingly
181// and in tests that perform other baseline behaviour checks alongside this
182// test.
Serge Bazanski37110c32023-03-01 13:57:27 +0000183func expectTimeout[T any](t *testing.T, w event.Watcher[T]) {
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200184 t.Helper()
185 ctx, ctxC := context.WithTimeout(context.Background(), 100*time.Millisecond)
186 got, err := w.Get(ctx)
187 ctxC()
188
189 if !errors.Is(err, ctx.Err()) {
190 t.Fatalf("Expected timeout error, got %v, %v", got, err)
191 }
192}
193
194// wait wraps a watcher into a channel of strings, ensuring that the watcher
195// never errors on Get calls and always returns strings.
Serge Bazanski37110c32023-03-01 13:57:27 +0000196func wait(t *testing.T, w event.Watcher[StringAt]) (chan string, func()) {
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200197 t.Helper()
198 ctx, ctxC := context.WithCancel(context.Background())
199
200 c := make(chan string)
201
202 go func() {
203 for {
204 got, err := w.Get(ctx)
205 if err != nil && errors.Is(err, ctx.Err()) {
206 return
207 }
208 if err != nil {
Tim Windelschmidt88049722024-04-11 23:09:23 +0200209 t.Errorf("Get: %v", err)
210 close(c)
211 return
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200212 }
Serge Bazanski37110c32023-03-01 13:57:27 +0000213 c <- got.Value
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200214 }
215 }()
216
217 return c, ctxC
218}
219
220// TestSimple exercises the simplest possible interaction with a watched value.
221func TestSimple(t *testing.T) {
222 tc := newTestClient(t)
223 defer tc.close()
224
225 k := "test-simple"
Tim Windelschmidtda1c9502024-05-08 01:24:29 +0200226 value := NewValue(tc.client, k, DecoderStringAt)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200227 tc.put(t, k, "one")
228
229 watcher := value.Watch()
230 defer watcher.Close()
231 expect(t, watcher, "one")
232
233 tc.put(t, k, "two")
234 expect(t, watcher, "two")
235
236 tc.put(t, k, "three")
237 tc.put(t, k, "four")
238 tc.put(t, k, "five")
239 tc.put(t, k, "six")
240
241 q, cancel := wait(t, watcher)
242 // Test will hang here if the above value does not receive the set "six".
Serge Bazanski37110c32023-03-01 13:57:27 +0000243 log.Printf("a")
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200244 for el := range q {
Serge Bazanski37110c32023-03-01 13:57:27 +0000245 log.Printf("%q", el)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200246 if el == "six" {
247 break
248 }
249 }
Serge Bazanski37110c32023-03-01 13:57:27 +0000250 log.Printf("b")
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200251 cancel()
252}
253
Serge Bazanski8d45a052021-10-18 17:24:24 +0200254// stringAtGet performs a Get from a Watcher, expecting a stringAt and updating
255// the given map with the retrieved value.
Serge Bazanski37110c32023-03-01 13:57:27 +0000256func stringAtGet(ctx context.Context, t *testing.T, w event.Watcher[StringAt], m map[string]string) {
Serge Bazanski8d45a052021-10-18 17:24:24 +0200257 t.Helper()
258
259 vr, err := w.Get(ctx)
260 if err != nil {
261 t.Fatalf("Get: %v", err)
262 }
Serge Bazanski37110c32023-03-01 13:57:27 +0000263 m[vr.Key] = vr.Value
Serge Bazanski8d45a052021-10-18 17:24:24 +0200264}
265
266// TestSimpleRange exercises the simplest behaviour of a ranged watcher,
267// retrieving updaates via Get in a fully blocking fashion.
268func TestSimpleRange(t *testing.T) {
269 tc := newTestClient(t)
270 defer tc.close()
271
272 ks := "test-simple-range/"
273 ke := "test-simple-range0"
Tim Windelschmidtda1c9502024-05-08 01:24:29 +0200274 value := NewValue(tc.client, ks, DecoderStringAt, Range(ke))
Serge Bazanski8d45a052021-10-18 17:24:24 +0200275 tc.put(t, ks+"a", "one")
276 tc.put(t, ks+"b", "two")
277 tc.put(t, ks+"c", "three")
278 tc.put(t, ks+"b", "four")
279
280 w := value.Watch()
281 defer w.Close()
282
283 ctx, ctxC := context.WithCancel(context.Background())
284 defer ctxC()
285
286 res := make(map[string]string)
287 stringAtGet(ctx, t, w, res)
288 stringAtGet(ctx, t, w, res)
289 stringAtGet(ctx, t, w, res)
290
291 tc.put(t, ks+"a", "five")
292 tc.put(t, ks+"e", "six")
293
294 stringAtGet(ctx, t, w, res)
295 stringAtGet(ctx, t, w, res)
296
297 for _, te := range []struct {
298 k, w string
299 }{
300 {ks + "a", "five"},
301 {ks + "b", "four"},
302 {ks + "c", "three"},
303 {ks + "e", "six"},
304 } {
305 if want, got := te.w, res[te.k]; want != got {
306 t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got)
307 }
308 }
309}
310
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200311// TestCancel ensures that watchers can resume after being canceled.
312func TestCancel(t *testing.T) {
313 tc := newTestClient(t)
314 defer tc.close()
315
316 k := "test-cancel"
Tim Windelschmidtda1c9502024-05-08 01:24:29 +0200317 value := NewValue(tc.client, k, DecoderStringAt)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200318 tc.put(t, k, "one")
319
320 watcher := value.Watch()
321 defer watcher.Close()
322 expect(t, watcher, "one")
323
324 ctx, ctxC := context.WithCancel(context.Background())
325 errs := make(chan error, 1)
326 go func() {
327 _, err := watcher.Get(ctx)
328 errs <- err
329 }()
330 ctxC()
331 if want, got := ctx.Err(), <-errs; !errors.Is(got, want) {
332 t.Fatalf("Wanted err %v, got %v", want, got)
333 }
334
335 // Successfully canceled watch, resuming should continue to work.
336 q, cancel := wait(t, watcher)
337 defer cancel()
338
339 tc.put(t, k, "two")
340 if want, got := "two", <-q; want != got {
341 t.Fatalf("Wanted val %q, got %q", want, got)
342 }
343}
344
345// TestCancelOnGet ensures that a context cancellation on an initial Get (which
346// translates to an etcd Get in a backoff loop) doesn't block.
347func TestCancelOnGet(t *testing.T) {
348 tc := newTestClient(t)
349 defer tc.close()
350
351 k := "test-cancel-on-get"
Tim Windelschmidtda1c9502024-05-08 01:24:29 +0200352 value := NewValue(tc.client, k, DecoderStringAt)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200353 watcher := value.Watch()
354 tc.put(t, k, "one")
355
356 // Cause partition between client endpoint and rest of cluster. Any read/write
357 // operations will now hang.
358 tc.setEndpoints(0)
359 cluster.Members[0].InjectPartition(t, cluster.Members[1], cluster.Members[2])
Serge Bazanskiad10ece2022-05-17 11:20:17 +0200360 // Let raft timeouts expire so that the leader is aware a partition has occurred
361 // and stops serving data if it is not part of a quorum anymore.
362 //
363 // Otherwise, if Member[0] was the leader, there will be a window of opportunity
364 // during which it will continue to serve read data even though it has been
365 // partitioned off. This is an effect of how etcd handles linearizable reads:
366 // they go through the leader, but do not go through raft.
367 //
368 // The value is the default etcd leader timeout (1s) + some wiggle room.
369 time.Sleep(time.Second + time.Millisecond*100)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200370
371 // Perform the initial Get(), which should attempt to retrieve a KV entry from
372 // the etcd service. This should hang. Unfortunately, there's no easy way to do
373 // this without an arbitrary sleep hoping that the client actually gets to the
374 // underlying etcd.Get call. This can cause false positives (eg. false 'pass'
375 // results) in this test.
376 ctx, ctxC := context.WithCancel(context.Background())
377 errs := make(chan error, 1)
378 go func() {
379 _, err := watcher.Get(ctx)
380 errs <- err
381 }()
382 time.Sleep(time.Second)
383
384 // Now that the etcd.Get is hanging, cancel the context.
385 ctxC()
386 // And now unpartition the cluster, resuming reads.
387 cluster.Members[0].RecoverPartition(t, cluster.Members[1], cluster.Members[2])
388
389 // The etcd.Get() call should've returned with a context cancellation.
390 err := <-errs
391 switch {
392 case err == nil:
393 t.Errorf("watcher.Get() returned no error, wanted context error")
394 case errors.Is(err, ctx.Err()):
395 // Okay.
396 default:
397 t.Errorf("watcher.Get() returned %v, wanted context error", err)
398 }
399}
400
401// TestClientReconnect forces a 'reconnection' of an active watcher from a
402// running member to another member, by stopping the original member and
403// explicitly reconnecting the client to other available members.
404//
405// This doe not reflect a situation expected during Metropolis runtime, as we
406// do not expect splits between an etcd client and its connected member
407// (instead, all etcd clients only connect to their local member). However, it
408// is still an important safety test to perform, and it also exercies the
409// equivalent behaviour of an etcd client re-connecting for any other reason.
410func TestClientReconnect(t *testing.T) {
411 tc := newTestClient(t)
412 defer tc.close()
413 tc.setEndpoints(0)
414
415 k := "test-client-reconnect"
Tim Windelschmidtda1c9502024-05-08 01:24:29 +0200416 value := NewValue(tc.client, k, DecoderStringAt)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200417 tc.put(t, k, "one")
418
419 watcher := value.Watch()
420 defer watcher.Close()
421 expect(t, watcher, "one")
422
423 q, cancel := wait(t, watcher)
424 defer cancel()
425
426 cluster.Members[0].Stop(t)
427 defer cluster.Members[0].Restart(t)
428 cluster.WaitLeader(t)
429
430 tc.setEndpoints(1, 2)
431 tc.put(t, k, "two")
432
433 if want, got := "two", <-q; want != got {
434 t.Fatalf("Watcher received incorrect data after client restart, wanted %q, got %q", want, got)
435 }
436}
437
438// TestClientPartition forces a temporary partition of the etcd member while a
439// watcher is running, updates the value from across the partition, and undoes
440// the partition.
441// The partition is expected to be entirely transparent to the watcher.
442func TestClientPartition(t *testing.T) {
443 tcOne := newTestClient(t)
444 defer tcOne.close()
445 tcOne.setEndpoints(0)
446
447 tcRest := newTestClient(t)
448 defer tcRest.close()
449 tcRest.setEndpoints(1, 2)
450
451 k := "test-client-partition"
Tim Windelschmidtda1c9502024-05-08 01:24:29 +0200452 valueOne := NewValue(tcOne.client, k, DecoderStringAt)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200453 watcherOne := valueOne.Watch()
454 defer watcherOne.Close()
Tim Windelschmidtda1c9502024-05-08 01:24:29 +0200455 valueRest := NewValue(tcRest.client, k, DecoderStringAt)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200456 watcherRest := valueRest.Watch()
457 defer watcherRest.Close()
458
459 tcRest.put(t, k, "a")
460 expect(t, watcherOne, "a")
461 expect(t, watcherRest, "a")
462
463 cluster.Members[0].InjectPartition(t, cluster.Members[1], cluster.Members[2])
464
465 tcRest.put(t, k, "b")
466 expect(t, watcherRest, "b")
467 expectTimeout(t, watcherOne)
468
469 cluster.Members[0].RecoverPartition(t, cluster.Members[1], cluster.Members[2])
470
471 expect(t, watcherOne, "b")
472 tcRest.put(t, k, "c")
473 expect(t, watcherOne, "c")
474 expect(t, watcherRest, "c")
475
476}
477
478// TestEarlyUse exercises the correct behaviour of the value watcher on a value
479// that is not yet set.
480func TestEarlyUse(t *testing.T) {
481 tc := newTestClient(t)
482 defer tc.close()
483
484 k := "test-early-use"
485
Tim Windelschmidtda1c9502024-05-08 01:24:29 +0200486 value := NewValue(tc.client, k, DecoderStringAt)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200487 watcher := value.Watch()
488 defer watcher.Close()
489
490 wg := setSetupWg(watcher)
491 wg.Add(1)
492 q, cancel := wait(t, watcher)
493 defer cancel()
494
495 wg.Done()
496
497 tc.put(t, k, "one")
498
499 if want, got := "one", <-q; want != got {
500 t.Fatalf("Expected %q, got %q", want, got)
501 }
502}
503
504// TestRemove exercises the basic functionality of handling deleted values.
505func TestRemove(t *testing.T) {
506 tc := newTestClient(t)
507 defer tc.close()
508
509 k := "test-remove"
510 tc.put(t, k, "one")
511
Tim Windelschmidtda1c9502024-05-08 01:24:29 +0200512 value := NewValue(tc.client, k, DecoderStringAt)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200513 watcher := value.Watch()
514 defer watcher.Close()
515
516 expect(t, watcher, "one")
517 tc.remove(t, k)
518 expect(t, watcher, "")
519}
520
Serge Bazanski8d45a052021-10-18 17:24:24 +0200521// TestRemoveRange exercises the behaviour of a Get on a ranged watcher when a
522// value is removed.
523func TestRemoveRange(t *testing.T) {
524 tc := newTestClient(t)
525 defer tc.close()
526
527 ks := "test-remove-range/"
528 ke := "test-remove-range0"
Tim Windelschmidtda1c9502024-05-08 01:24:29 +0200529 value := NewValue(tc.client, ks, DecoderStringAt, Range(ke))
Serge Bazanski8d45a052021-10-18 17:24:24 +0200530 tc.put(t, ks+"a", "one")
531 tc.put(t, ks+"b", "two")
532 tc.put(t, ks+"c", "three")
533 tc.put(t, ks+"b", "four")
534 tc.remove(t, ks+"c")
535
536 w := value.Watch()
537 defer w.Close()
538
539 ctx, ctxC := context.WithCancel(context.Background())
540 defer ctxC()
541
542 res := make(map[string]string)
543 stringAtGet(ctx, t, w, res)
544 stringAtGet(ctx, t, w, res)
545
546 for _, te := range []struct {
547 k, w string
548 }{
549 {ks + "a", "one"},
550 {ks + "b", "four"},
551 {ks + "c", ""},
552 } {
553 if want, got := te.w, res[te.k]; want != got {
554 t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got)
555 }
556 }
557}
558
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200559// TestEmptyRace forces the watcher to retrieve an empty value from the K/V
560// store at first, and establishing the watch channel after a new value has
561// been stored in the same place.
562func TestEmptyRace(t *testing.T) {
563 tc := newTestClient(t)
564 defer tc.close()
565
566 k := "test-remove-race"
567 tc.put(t, k, "one")
568 tc.remove(t, k)
569
Tim Windelschmidtda1c9502024-05-08 01:24:29 +0200570 value := NewValue(tc.client, k, DecoderStringAt)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200571 watcher := value.Watch()
572 defer watcher.Close()
573
574 wg := setRaceWg(watcher)
575 wg.Add(1)
576 q, cancel := wait(t, watcher)
577 defer cancel()
578
579 tc.put(t, k, "two")
580 wg.Done()
581
582 if want, got := "two", <-q; want != got {
583 t.Fatalf("Watcher received incorrect data after client restart, wanted %q, got %q", want, got)
584 }
585}
586
// errOrInt is a single result of a watcher Get call in TestDecoder: either a
// decoded value or the error that was returned instead.
type errOrInt struct {
	// val is the decoded value. It is left at zero when err is set.
	val int64
	// err is the error returned by Get, if any.
	err error
}
591
592// TestDecoder exercises the BytesDecoder functionality of the watcher, by
593// creating a value with a decoder that only accepts string-encoded integers
594// that are divisible by three. The test then proceeds to put a handful of
595// values into etcd, ensuring that undecodable values correctly return an error
596// on Get, but that the watcher continues to work after the error has been
597// returned.
598func TestDecoder(t *testing.T) {
Serge Bazanski37110c32023-03-01 13:57:27 +0000599 decoderDivisibleByThree := func(_, value []byte) (int64, error) {
600 num, err := strconv.ParseInt(string(value), 10, 64)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200601 if err != nil {
Serge Bazanski37110c32023-03-01 13:57:27 +0000602 return 0, fmt.Errorf("not a valid number")
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200603 }
604 if (num % 3) != 0 {
Serge Bazanski37110c32023-03-01 13:57:27 +0000605 return 0, fmt.Errorf("not divisible by 3")
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200606 }
607 return num, nil
608 }
609
610 tc := newTestClient(t)
611 defer tc.close()
612
613 ctx, ctxC := context.WithCancel(context.Background())
614 defer ctxC()
615
616 k := "test-decoder"
Tim Windelschmidtda1c9502024-05-08 01:24:29 +0200617 value := NewValue(tc.client, k, decoderDivisibleByThree)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200618 watcher := value.Watch()
619 defer watcher.Close()
620 tc.put(t, k, "3")
621 _, err := watcher.Get(ctx)
622 if err != nil {
623 t.Fatalf("Initial Get: %v", err)
624 }
625
626 // Stream updates into arbitrarily-bounded test channel.
627 queue := make(chan errOrInt, 100)
628 go func() {
629 for {
630 res, err := watcher.Get(ctx)
631 if err != nil && errors.Is(err, ctx.Err()) {
632 return
633 }
634 if err != nil {
635 queue <- errOrInt{
636 err: err,
637 }
638 } else {
639 queue <- errOrInt{
Serge Bazanski37110c32023-03-01 13:57:27 +0000640 val: res,
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200641 }
642 }
643 }
644 }()
645
646 var wantList []*int64
647 wantError := func(val string) {
648 wantList = append(wantList, nil)
649 tc.put(t, k, val)
650 }
651 wantValue := func(val string, decoded int64) {
652 wantList = append(wantList, &decoded)
653 tc.put(t, k, val)
654 }
655
656 wantError("")
657 wantValue("9", 9)
658 wantError("foo")
659 wantValue("18", 18)
660 wantError("10")
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200661 wantValue("27", 27)
662 wantValue("36", 36)
663
664 for i, want := range wantList {
665 q := <-queue
666 if want == nil && q.err == nil {
Serge Bazanski832bc772022-04-07 12:33:01 +0200667 t.Fatalf("Case %d: wanted error, got no error and value %d", i, q.val)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200668 }
669 if want != nil && (*want) != q.val {
Serge Bazanski832bc772022-04-07 12:33:01 +0200670 t.Fatalf("Case %d: wanted value %d, got error %v and value %d", i, *want, q.err, q.val)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200671 }
672 }
673}
674
675// TestBacklog ensures that the watcher can handle a large backlog of changes
676// in etcd that the client didnt' keep up with, and that whatever final state
677// is available to the client when it actually gets to calling Get().
678func TestBacklog(t *testing.T) {
679 tc := newTestClient(t)
680 defer tc.close()
681
682 k := "test-backlog"
Tim Windelschmidtda1c9502024-05-08 01:24:29 +0200683 value := NewValue(tc.client, k, DecoderStringAt)
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200684 watcher := value.Watch()
685 defer watcher.Close()
686
687 tc.put(t, k, "initial")
688 expect(t, watcher, "initial")
689
690 for i := 0; i < 1000; i++ {
691 tc.put(t, k, fmt.Sprintf("val-%d", i))
692 }
693
694 ctx, ctxC := context.WithTimeout(context.Background(), time.Second)
695 defer ctxC()
696 for {
697 valB, err := watcher.Get(ctx)
698 if err != nil {
699 t.Fatalf("Get() returned error before expected final value: %v", err)
700 }
Serge Bazanski37110c32023-03-01 13:57:27 +0000701 if valB.Value == "val-999" {
Serge Bazanskic89df2f2021-04-27 15:51:37 +0200702 break
703 }
704 }
705}
Serge Bazanski8d45a052021-10-18 17:24:24 +0200706
707// TestBacklogRange ensures that the ranged etcd watcher can handle a large
708// backlog of changes in etcd that the client didn't keep up with.
709func TestBacklogRange(t *testing.T) {
710 tc := newTestClient(t)
711 defer tc.close()
712
713 ks := "test-backlog-range/"
714 ke := "test-backlog-range0"
Tim Windelschmidtda1c9502024-05-08 01:24:29 +0200715 value := NewValue(tc.client, ks, DecoderStringAt, Range(ke))
Serge Bazanski8d45a052021-10-18 17:24:24 +0200716 w := value.Watch()
717 defer w.Close()
718
719 for i := 0; i < 100; i++ {
720 if i%2 == 0 {
721 tc.put(t, ks+"a", fmt.Sprintf("val-%d", i))
722 } else {
723 tc.put(t, ks+"b", fmt.Sprintf("val-%d", i))
724 }
725 }
726
727 ctx, ctxC := context.WithCancel(context.Background())
728 defer ctxC()
729
730 res := make(map[string]string)
731 stringAtGet(ctx, t, w, res)
732 stringAtGet(ctx, t, w, res)
733
734 for _, te := range []struct {
735 k, w string
736 }{
737 {ks + "a", "val-98"},
738 {ks + "b", "val-99"},
739 } {
740 if want, got := te.w, res[te.k]; want != got {
741 t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got)
742 }
743 }
744}
745
746// TestBacklogOnly exercises the BacklogOnly option for non-ranged watchers,
747// which effectively makes any Get operation non-blocking (but also showcases
748// that unless a Get without BacklogOnly is issues, no new data will appear by
749// itself in the watcher - which is an undocumented implementation detail of the
750// option).
751func TestBacklogOnly(t *testing.T) {
752 tc := newTestClient(t)
753 defer tc.close()
754 ctx, ctxC := context.WithCancel(context.Background())
755 defer ctxC()
756
757 k := "test-backlog-only"
758 tc.put(t, k, "initial")
759
Tim Windelschmidtda1c9502024-05-08 01:24:29 +0200760 value := NewValue(tc.client, k, DecoderStringAt)
Serge Bazanski8d45a052021-10-18 17:24:24 +0200761 watcher := value.Watch()
762 defer watcher.Close()
763
Serge Bazanski37110c32023-03-01 13:57:27 +0000764 d, err := watcher.Get(ctx, event.BacklogOnly[StringAt]())
Serge Bazanski8d45a052021-10-18 17:24:24 +0200765 if err != nil {
766 t.Fatalf("First Get failed: %v", err)
767 }
Serge Bazanski37110c32023-03-01 13:57:27 +0000768 if want, got := "initial", d.Value; want != got {
Serge Bazanski8d45a052021-10-18 17:24:24 +0200769 t.Fatalf("First Get: wanted value %q, got %q", want, got)
770 }
771
772 // As expected, next call to Get with BacklogOnly fails - there truly is no new
773 // updates to emit.
Serge Bazanski37110c32023-03-01 13:57:27 +0000774 _, err = watcher.Get(ctx, event.BacklogOnly[StringAt]())
Tim Windelschmidt513df182024-04-18 23:44:50 +0200775 if want, got := event.ErrBacklogDone, err; !errors.Is(got, want) {
Serge Bazanski8d45a052021-10-18 17:24:24 +0200776 t.Fatalf("Second Get: wanted %v, got %v", want, got)
777 }
778
779 // Implementation detail: even though there is a new value ('second'),
Tim Windelschmidt513df182024-04-18 23:44:50 +0200780 // BacklogOnly will still return ErrBacklogDone.
Serge Bazanski8d45a052021-10-18 17:24:24 +0200781 tc.put(t, k, "second")
Serge Bazanski37110c32023-03-01 13:57:27 +0000782 _, err = watcher.Get(ctx, event.BacklogOnly[StringAt]())
Tim Windelschmidt513df182024-04-18 23:44:50 +0200783 if want, got := event.ErrBacklogDone, err; !errors.Is(got, want) {
Serge Bazanski8d45a052021-10-18 17:24:24 +0200784 t.Fatalf("Third Get: wanted %v, got %v", want, got)
785 }
786
787 // ... However, a Get without BacklogOnly will return the new value.
788 d, err = watcher.Get(ctx)
789 if err != nil {
790 t.Fatalf("Fourth Get failed: %v", err)
791 }
Serge Bazanski37110c32023-03-01 13:57:27 +0000792 if want, got := "second", d.Value; want != got {
Serge Bazanski8d45a052021-10-18 17:24:24 +0200793 t.Fatalf("Fourth Get: wanted value %q, got %q", want, got)
794 }
795}
796
797// TestBacklogOnlyRange exercises the BacklogOnly option for ranged watchers,
798// showcasing how it expected to be used for keeping up with the external state
799// of a range by synchronizing to a local map.
800func TestBacklogOnlyRange(t *testing.T) {
801 tc := newTestClient(t)
802 defer tc.close()
803 ctx, ctxC := context.WithCancel(context.Background())
804 defer ctxC()
805
806 ks := "test-backlog-only-range/"
807 ke := "test-backlog-only-range0"
808
809 for i := 0; i < 100; i++ {
810 if i%2 == 0 {
811 tc.put(t, ks+"a", fmt.Sprintf("val-%d", i))
812 } else {
813 tc.put(t, ks+"b", fmt.Sprintf("val-%d", i))
814 }
815 }
816
Tim Windelschmidtda1c9502024-05-08 01:24:29 +0200817 value := NewValue(tc.client, ks, DecoderStringAt, Range(ke))
Serge Bazanski8d45a052021-10-18 17:24:24 +0200818 w := value.Watch()
819 defer w.Close()
820
821 // Collect results into a map from key to value.
822 res := make(map[string]string)
823
824 // Run first Get - this is the barrier defining what's part of the backlog.
Serge Bazanski37110c32023-03-01 13:57:27 +0000825 g, err := w.Get(ctx, event.BacklogOnly[StringAt]())
Serge Bazanski8d45a052021-10-18 17:24:24 +0200826 if err != nil {
827 t.Fatalf("Get: %v", err)
828 }
Serge Bazanski37110c32023-03-01 13:57:27 +0000829 res[g.Key] = g.Value
Serge Bazanski8d45a052021-10-18 17:24:24 +0200830
831 // These won't be part of the backlog.
Tim Windelschmidt92316fd2024-04-18 23:06:40 +0200832 tc.put(t, ks+"a", "val-100")
833 tc.put(t, ks+"b", "val-101")
Serge Bazanski8d45a052021-10-18 17:24:24 +0200834
Tim Windelschmidt513df182024-04-18 23:44:50 +0200835 // Retrieve the rest of the backlog until ErrBacklogDone is returned.
Serge Bazanski8d45a052021-10-18 17:24:24 +0200836 nUpdates := 1
837 for {
Serge Bazanski37110c32023-03-01 13:57:27 +0000838 g, err := w.Get(ctx, event.BacklogOnly[StringAt]())
Tim Windelschmidt513df182024-04-18 23:44:50 +0200839 if errors.Is(err, event.ErrBacklogDone) {
Serge Bazanski8d45a052021-10-18 17:24:24 +0200840 break
841 }
842 if err != nil {
843 t.Fatalf("Get: %v", err)
844 }
845 nUpdates += 1
Serge Bazanski37110c32023-03-01 13:57:27 +0000846 res[g.Key] = g.Value
Serge Bazanski8d45a052021-10-18 17:24:24 +0200847 }
848
849 // The backlog should've been compacted to just two entries at their newest
850 // state.
851 if want, got := 2, nUpdates; want != got {
852 t.Fatalf("wanted backlog in %d updates, got it in %d", want, got)
853 }
854
855 for _, te := range []struct {
856 k, w string
857 }{
858 {ks + "a", "val-98"},
859 {ks + "b", "val-99"},
860 } {
861 if want, got := te.w, res[te.k]; want != got {
862 t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got)
863 }
864 }
865}