blob: 79d9875c784b8a531154f25f41272abcb54df0f7 [file] [log] [blame]
Serge Bazanskic89df2f2021-04-27 15:51:37 +02001package etcd
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "log"
8 "os"
9 "strconv"
10 "sync"
11 "testing"
12 "time"
13
14 "go.etcd.io/etcd/clientv3"
15 "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
16 "go.etcd.io/etcd/integration"
17 "google.golang.org/grpc/codes"
18
19 "source.monogon.dev/metropolis/node/core/consensus/client"
20 "source.monogon.dev/metropolis/pkg/event"
21)
22
var (
	// cluster is the etcd integration test cluster. It is created by TestMain
	// and shared by every test in this file.
	cluster *integration.ClusterV3
	// endpoints holds one client endpoint per cluster member, indexed by
	// member number. It is populated by TestMain.
	endpoints []string
)
27
28// TestMain brings up a 3 node etcd cluster for tests to use.
29func TestMain(m *testing.M) {
30 cfg := integration.ClusterConfig{
31 Size: 3,
32 GRPCKeepAliveMinTime: time.Millisecond,
33 }
34 cluster = integration.NewClusterV3(nil, &cfg)
35 endpoints = make([]string, 3)
36 for i := range endpoints {
37 endpoints[i] = cluster.Client(i).Endpoints()[0]
38 }
39
40 v := m.Run()
41 cluster.Terminate(nil)
42 os.Exit(v)
43}
44
45// setRaceWg creates a new WaitGroup and sets the given watcher to wait on this
46// WG after it performs the initial retrieval of a value from etcd, but before
47// it starts the watcher. This is used to test potential race conditions
48// present between these two steps.
49func setRaceWg(w event.Watcher) *sync.WaitGroup {
50 wg := sync.WaitGroup{}
51 w.(*watcher).testRaceWG = &wg
52 return &wg
53}
54
55// setSetupWg creates a new WaitGroup and sets the given watcher to wait on
56// thie WG after an etcd watch channel is created. This is used in tests to
57// ensure that the watcher is fully created before it is tested.
58func setSetupWg(w event.Watcher) *sync.WaitGroup {
59 wg := sync.WaitGroup{}
60 w.(*watcher).testSetupWG = &wg
61 return &wg
62}
63
// testClient is an etcd connection to the test cluster.
type testClient struct {
	// client is the underlying etcd v3 client. close and setEndpoints operate
	// on it directly.
	client *clientv3.Client
	// namespaced is the client.Namespaced built from client by newTestClient.
	// It is what put/remove use and what tests pass to NewValue.
	namespaced client.Namespaced
}
69
70func newTestClient(t *testing.T) *testClient {
71 t.Helper()
72 cli, err := clientv3.New(clientv3.Config{
73 Endpoints: endpoints,
74 DialTimeout: 1 * time.Second,
75 DialKeepAliveTime: 1 * time.Second,
76 DialKeepAliveTimeout: 1 * time.Second,
77 })
78 if err != nil {
79 t.Fatalf("clientv3.New: %v", err)
80 }
81
82 namespaced := client.NewLocal(cli)
83 return &testClient{
84 client: cli,
85 namespaced: namespaced,
86 }
87}
88
89func (d *testClient) close() {
90 d.client.Close()
91}
92
93// setEndpoints configures which endpoints (from {0,1,2}) the testClient is
94// connected to.
95func (d *testClient) setEndpoints(nums ...uint) {
96 var eps []string
97 for _, num := range nums {
98 eps = append(eps, endpoints[num])
99 }
100 d.client.SetEndpoints(eps...)
101}
102
103// put uses the testClient to store key with a given string value in etcd. It
104// contains retry logic that will block until the put is successful.
105func (d *testClient) put(t *testing.T, key, value string) {
106 t.Helper()
107 ctx, ctxC := context.WithCancel(context.Background())
108 defer ctxC()
109
110 for {
111 ctxT, ctxC := context.WithTimeout(ctx, 100*time.Millisecond)
112 _, err := d.namespaced.Put(ctxT, key, value)
113 ctxC()
114 if err == nil {
115 return
116 }
117 if err == ctxT.Err() {
118 log.Printf("Retrying after %v", err)
119 continue
120 }
121 // Retry on etcd unavailability - this will happen in this code as the
122 // etcd cluster repeatedly loses quorum.
123 var eerr rpctypes.EtcdError
124 if errors.As(err, &eerr) && eerr.Code() == codes.Unavailable {
125 log.Printf("Retrying after %v", err)
126 continue
127 }
128 t.Fatalf("Put: %v", err)
129 }
130
131}
132
133// remove uses the testClient to remove the given key from etcd. It contains
134// retry logic that will block until the removal is successful.
135func (d *testClient) remove(t *testing.T, key string) {
136 t.Helper()
137 ctx, ctxC := context.WithCancel(context.Background())
138 defer ctxC()
139
140 _, err := d.namespaced.Delete(ctx, key)
141 if err == nil {
142 return
143 }
144 t.Fatalf("Delete: %v", err)
145}
146
147// expect runs a Get on the given Watcher, ensuring the returned value is a
148// given string.
149func expect(t *testing.T, w event.Watcher, value string) {
150 t.Helper()
151 ctx, ctxC := context.WithCancel(context.Background())
152 defer ctxC()
153
154 got, err := w.Get(ctx)
155 if err != nil {
156 t.Fatalf("Get: %v", err)
157 }
158
159 if got, want := string(got.([]byte)), value; got != want {
160 t.Errorf("Got value %q, wanted %q", want, got)
161 }
162}
163
164// expectTimeout ensures that the given watcher blocks on a Get call for at
165// least 100 milliseconds. This is used by tests to attempt to verify that the
166// watcher Get is fully blocked, but can cause false positives (eg. when Get
167// blocks for 101 milliseconds). Thus, this function should be used sparingly
168// and in tests that perform other baseline behaviour checks alongside this
169// test.
170func expectTimeout(t *testing.T, w event.Watcher) {
171 t.Helper()
172 ctx, ctxC := context.WithTimeout(context.Background(), 100*time.Millisecond)
173 got, err := w.Get(ctx)
174 ctxC()
175
176 if !errors.Is(err, ctx.Err()) {
177 t.Fatalf("Expected timeout error, got %v, %v", got, err)
178 }
179}
180
181// wait wraps a watcher into a channel of strings, ensuring that the watcher
182// never errors on Get calls and always returns strings.
183func wait(t *testing.T, w event.Watcher) (chan string, func()) {
184 t.Helper()
185 ctx, ctxC := context.WithCancel(context.Background())
186
187 c := make(chan string)
188
189 go func() {
190 for {
191 got, err := w.Get(ctx)
192 if err != nil && errors.Is(err, ctx.Err()) {
193 return
194 }
195 if err != nil {
196 t.Fatalf("Get: %v", err)
197 }
198 c <- string(got.([]byte))
199 }
200 }()
201
202 return c, ctxC
203}
204
205// TestSimple exercises the simplest possible interaction with a watched value.
206func TestSimple(t *testing.T) {
207 tc := newTestClient(t)
208 defer tc.close()
209
210 k := "test-simple"
211 value := NewValue(tc.namespaced, k, NoDecoder)
212 tc.put(t, k, "one")
213
214 watcher := value.Watch()
215 defer watcher.Close()
216 expect(t, watcher, "one")
217
218 tc.put(t, k, "two")
219 expect(t, watcher, "two")
220
221 tc.put(t, k, "three")
222 tc.put(t, k, "four")
223 tc.put(t, k, "five")
224 tc.put(t, k, "six")
225
226 q, cancel := wait(t, watcher)
227 // Test will hang here if the above value does not receive the set "six".
228 for el := range q {
229 if el == "six" {
230 break
231 }
232 }
233 cancel()
234}
235
236// TestCancel ensures that watchers can resume after being canceled.
237func TestCancel(t *testing.T) {
238 tc := newTestClient(t)
239 defer tc.close()
240
241 k := "test-cancel"
242 value := NewValue(tc.namespaced, k, NoDecoder)
243 tc.put(t, k, "one")
244
245 watcher := value.Watch()
246 defer watcher.Close()
247 expect(t, watcher, "one")
248
249 ctx, ctxC := context.WithCancel(context.Background())
250 errs := make(chan error, 1)
251 go func() {
252 _, err := watcher.Get(ctx)
253 errs <- err
254 }()
255 ctxC()
256 if want, got := ctx.Err(), <-errs; !errors.Is(got, want) {
257 t.Fatalf("Wanted err %v, got %v", want, got)
258 }
259
260 // Successfully canceled watch, resuming should continue to work.
261 q, cancel := wait(t, watcher)
262 defer cancel()
263
264 tc.put(t, k, "two")
265 if want, got := "two", <-q; want != got {
266 t.Fatalf("Wanted val %q, got %q", want, got)
267 }
268}
269
270// TestCancelOnGet ensures that a context cancellation on an initial Get (which
271// translates to an etcd Get in a backoff loop) doesn't block.
272func TestCancelOnGet(t *testing.T) {
273 tc := newTestClient(t)
274 defer tc.close()
275
276 k := "test-cancel-on-get"
277 value := NewValue(tc.namespaced, k, NoDecoder)
278 watcher := value.Watch()
279 tc.put(t, k, "one")
280
281 // Cause partition between client endpoint and rest of cluster. Any read/write
282 // operations will now hang.
283 tc.setEndpoints(0)
284 cluster.Members[0].InjectPartition(t, cluster.Members[1], cluster.Members[2])
285
286 // Perform the initial Get(), which should attempt to retrieve a KV entry from
287 // the etcd service. This should hang. Unfortunately, there's no easy way to do
288 // this without an arbitrary sleep hoping that the client actually gets to the
289 // underlying etcd.Get call. This can cause false positives (eg. false 'pass'
290 // results) in this test.
291 ctx, ctxC := context.WithCancel(context.Background())
292 errs := make(chan error, 1)
293 go func() {
294 _, err := watcher.Get(ctx)
295 errs <- err
296 }()
297 time.Sleep(time.Second)
298
299 // Now that the etcd.Get is hanging, cancel the context.
300 ctxC()
301 // And now unpartition the cluster, resuming reads.
302 cluster.Members[0].RecoverPartition(t, cluster.Members[1], cluster.Members[2])
303
304 // The etcd.Get() call should've returned with a context cancellation.
305 err := <-errs
306 switch {
307 case err == nil:
308 t.Errorf("watcher.Get() returned no error, wanted context error")
309 case errors.Is(err, ctx.Err()):
310 // Okay.
311 default:
312 t.Errorf("watcher.Get() returned %v, wanted context error", err)
313 }
314}
315
316// TestClientReconnect forces a 'reconnection' of an active watcher from a
317// running member to another member, by stopping the original member and
318// explicitly reconnecting the client to other available members.
319//
320// This doe not reflect a situation expected during Metropolis runtime, as we
321// do not expect splits between an etcd client and its connected member
322// (instead, all etcd clients only connect to their local member). However, it
323// is still an important safety test to perform, and it also exercies the
324// equivalent behaviour of an etcd client re-connecting for any other reason.
325func TestClientReconnect(t *testing.T) {
326 tc := newTestClient(t)
327 defer tc.close()
328 tc.setEndpoints(0)
329
330 k := "test-client-reconnect"
331 value := NewValue(tc.namespaced, k, NoDecoder)
332 tc.put(t, k, "one")
333
334 watcher := value.Watch()
335 defer watcher.Close()
336 expect(t, watcher, "one")
337
338 q, cancel := wait(t, watcher)
339 defer cancel()
340
341 cluster.Members[0].Stop(t)
342 defer cluster.Members[0].Restart(t)
343 cluster.WaitLeader(t)
344
345 tc.setEndpoints(1, 2)
346 tc.put(t, k, "two")
347
348 if want, got := "two", <-q; want != got {
349 t.Fatalf("Watcher received incorrect data after client restart, wanted %q, got %q", want, got)
350 }
351}
352
353// TestClientPartition forces a temporary partition of the etcd member while a
354// watcher is running, updates the value from across the partition, and undoes
355// the partition.
356// The partition is expected to be entirely transparent to the watcher.
357func TestClientPartition(t *testing.T) {
358 tcOne := newTestClient(t)
359 defer tcOne.close()
360 tcOne.setEndpoints(0)
361
362 tcRest := newTestClient(t)
363 defer tcRest.close()
364 tcRest.setEndpoints(1, 2)
365
366 k := "test-client-partition"
367 valueOne := NewValue(tcOne.namespaced, k, NoDecoder)
368 watcherOne := valueOne.Watch()
369 defer watcherOne.Close()
370 valueRest := NewValue(tcRest.namespaced, k, NoDecoder)
371 watcherRest := valueRest.Watch()
372 defer watcherRest.Close()
373
374 tcRest.put(t, k, "a")
375 expect(t, watcherOne, "a")
376 expect(t, watcherRest, "a")
377
378 cluster.Members[0].InjectPartition(t, cluster.Members[1], cluster.Members[2])
379
380 tcRest.put(t, k, "b")
381 expect(t, watcherRest, "b")
382 expectTimeout(t, watcherOne)
383
384 cluster.Members[0].RecoverPartition(t, cluster.Members[1], cluster.Members[2])
385
386 expect(t, watcherOne, "b")
387 tcRest.put(t, k, "c")
388 expect(t, watcherOne, "c")
389 expect(t, watcherRest, "c")
390
391}
392
393// TestEarlyUse exercises the correct behaviour of the value watcher on a value
394// that is not yet set.
395func TestEarlyUse(t *testing.T) {
396 tc := newTestClient(t)
397 defer tc.close()
398
399 k := "test-early-use"
400
401 value := NewValue(tc.namespaced, k, NoDecoder)
402 watcher := value.Watch()
403 defer watcher.Close()
404
405 wg := setSetupWg(watcher)
406 wg.Add(1)
407 q, cancel := wait(t, watcher)
408 defer cancel()
409
410 wg.Done()
411
412 tc.put(t, k, "one")
413
414 if want, got := "one", <-q; want != got {
415 t.Fatalf("Expected %q, got %q", want, got)
416 }
417}
418
419// TestRemove exercises the basic functionality of handling deleted values.
420func TestRemove(t *testing.T) {
421 tc := newTestClient(t)
422 defer tc.close()
423
424 k := "test-remove"
425 tc.put(t, k, "one")
426
427 value := NewValue(tc.namespaced, k, NoDecoder)
428 watcher := value.Watch()
429 defer watcher.Close()
430
431 expect(t, watcher, "one")
432 tc.remove(t, k)
433 expect(t, watcher, "")
434}
435
436// TestEmptyRace forces the watcher to retrieve an empty value from the K/V
437// store at first, and establishing the watch channel after a new value has
438// been stored in the same place.
439func TestEmptyRace(t *testing.T) {
440 tc := newTestClient(t)
441 defer tc.close()
442
443 k := "test-remove-race"
444 tc.put(t, k, "one")
445 tc.remove(t, k)
446
447 value := NewValue(tc.namespaced, k, NoDecoder)
448 watcher := value.Watch()
449 defer watcher.Close()
450
451 wg := setRaceWg(watcher)
452 wg.Add(1)
453 q, cancel := wait(t, watcher)
454 defer cancel()
455
456 tc.put(t, k, "two")
457 wg.Done()
458
459 if want, got := "two", <-q; want != got {
460 t.Fatalf("Watcher received incorrect data after client restart, wanted %q, got %q", want, got)
461 }
462}
463
// errOrInt is the result of a single watcher Get in TestDecoder: either a
// decoded integer value or the error returned by Get.
type errOrInt struct {
	// val is the decoded value; it is left at zero when err is set.
	val int64
	// err is the error returned by Get; nil when a value was decoded.
	err error
}
468
469// TestDecoder exercises the BytesDecoder functionality of the watcher, by
470// creating a value with a decoder that only accepts string-encoded integers
471// that are divisible by three. The test then proceeds to put a handful of
472// values into etcd, ensuring that undecodable values correctly return an error
473// on Get, but that the watcher continues to work after the error has been
474// returned.
475func TestDecoder(t *testing.T) {
476 decodeStringifiedNumbersDivisibleBy3 := func(data []byte) (interface{}, error) {
477 num, err := strconv.ParseInt(string(data), 10, 64)
478 if err != nil {
479 return nil, fmt.Errorf("not a valid number")
480 }
481 if (num % 3) != 0 {
482 return nil, fmt.Errorf("not divisible by 3")
483 }
484 return num, nil
485 }
486
487 tc := newTestClient(t)
488 defer tc.close()
489
490 ctx, ctxC := context.WithCancel(context.Background())
491 defer ctxC()
492
493 k := "test-decoder"
494 value := NewValue(tc.namespaced, k, decodeStringifiedNumbersDivisibleBy3)
495 watcher := value.Watch()
496 defer watcher.Close()
497 tc.put(t, k, "3")
498 _, err := watcher.Get(ctx)
499 if err != nil {
500 t.Fatalf("Initial Get: %v", err)
501 }
502
503 // Stream updates into arbitrarily-bounded test channel.
504 queue := make(chan errOrInt, 100)
505 go func() {
506 for {
507 res, err := watcher.Get(ctx)
508 if err != nil && errors.Is(err, ctx.Err()) {
509 return
510 }
511 if err != nil {
512 queue <- errOrInt{
513 err: err,
514 }
515 } else {
516 queue <- errOrInt{
517 val: res.(int64),
518 }
519 }
520 }
521 }()
522
523 var wantList []*int64
524 wantError := func(val string) {
525 wantList = append(wantList, nil)
526 tc.put(t, k, val)
527 }
528 wantValue := func(val string, decoded int64) {
529 wantList = append(wantList, &decoded)
530 tc.put(t, k, val)
531 }
532
533 wantError("")
534 wantValue("9", 9)
535 wantError("foo")
536 wantValue("18", 18)
537 wantError("10")
538 wantError("10")
539 wantValue("27", 27)
540 wantValue("36", 36)
541
542 for i, want := range wantList {
543 q := <-queue
544 if want == nil && q.err == nil {
545 t.Errorf("Case %d: wanted error, got no error and value %d", i, q.val)
546 }
547 if want != nil && (*want) != q.val {
548 t.Errorf("Case %d: wanted value %d, got error %v and value %d", i, *want, q.err, q.val)
549 }
550 }
551}
552
553// TestBacklog ensures that the watcher can handle a large backlog of changes
554// in etcd that the client didnt' keep up with, and that whatever final state
555// is available to the client when it actually gets to calling Get().
556func TestBacklog(t *testing.T) {
557 tc := newTestClient(t)
558 defer tc.close()
559
560 k := "test-backlog"
561 value := NewValue(tc.namespaced, k, NoDecoder)
562 watcher := value.Watch()
563 defer watcher.Close()
564
565 tc.put(t, k, "initial")
566 expect(t, watcher, "initial")
567
568 for i := 0; i < 1000; i++ {
569 tc.put(t, k, fmt.Sprintf("val-%d", i))
570 }
571
572 ctx, ctxC := context.WithTimeout(context.Background(), time.Second)
573 defer ctxC()
574 for {
575 valB, err := watcher.Get(ctx)
576 if err != nil {
577 t.Fatalf("Get() returned error before expected final value: %v", err)
578 }
579 val := string(valB.([]byte))
580 if val == "val-999" {
581 break
582 }
583 }
584}