|  | package etcd | 
|  |  | 
|  | import ( | 
|  | "context" | 
|  | "errors" | 
|  | "flag" | 
|  | "fmt" | 
|  | "log" | 
|  | "os" | 
|  | "strconv" | 
|  | "sync" | 
|  | "testing" | 
|  | "time" | 
|  |  | 
|  | "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" | 
|  | "go.etcd.io/etcd/client/pkg/v3/testutil" | 
|  | clientv3 "go.etcd.io/etcd/client/v3" | 
|  | "go.etcd.io/etcd/tests/v3/integration" | 
|  | "google.golang.org/grpc/codes" | 
|  |  | 
|  | "source.monogon.dev/metropolis/node/core/consensus/client" | 
|  | "source.monogon.dev/metropolis/pkg/event" | 
|  | ) | 
|  |  | 
|  | var ( | 
|  | cluster   *integration.ClusterV3 | 
|  | endpoints []string | 
|  | ) | 
|  |  | 
|  | // TestMain brings up a 3 node etcd cluster for tests to use. | 
|  | func TestMain(m *testing.M) { | 
|  | cfg := integration.ClusterConfig{ | 
|  | Size:                 3, | 
|  | GRPCKeepAliveMinTime: time.Millisecond, | 
|  | } | 
|  | tb, cancel := testutil.NewTestingTBProthesis("curator") | 
|  | defer cancel() | 
|  | flag.Parse() | 
|  | integration.BeforeTestExternal(tb) | 
|  | cluster = integration.NewClusterV3(tb, &cfg) | 
|  | endpoints = make([]string, 3) | 
|  | for i := range endpoints { | 
|  | endpoints[i] = cluster.Client(i).Endpoints()[0] | 
|  | } | 
|  |  | 
|  | v := m.Run() | 
|  | cluster.Terminate(tb) | 
|  | os.Exit(v) | 
|  | } | 
|  |  | 
|  | // setRaceWg creates a new WaitGroup and sets the given watcher to wait on this | 
|  | // WG after it performs the initial retrieval of a value from etcd, but before | 
|  | // it starts the watcher. This is used to test potential race conditions | 
|  | // present between these two steps. | 
|  | func setRaceWg[T any](w event.Watcher[T]) *sync.WaitGroup { | 
|  | wg := sync.WaitGroup{} | 
|  | w.(*watcher[T]).testRaceWG = &wg | 
|  | return &wg | 
|  | } | 
|  |  | 
|  | // setSetupWg creates a new WaitGroup and sets the given watcher to wait on | 
|  | // thie WG after an etcd watch channel is created. This is used in tests to | 
|  | // ensure that the watcher is fully created before it is tested. | 
|  | func setSetupWg[T any](w event.Watcher[T]) *sync.WaitGroup { | 
|  | wg := sync.WaitGroup{} | 
|  | w.(*watcher[T]).testSetupWG = &wg | 
|  | return &wg | 
|  | } | 
|  |  | 
|  | // testClient is an etcd connection to the test cluster. | 
|  | type testClient struct { | 
|  | client     *clientv3.Client | 
|  | namespaced client.Namespaced | 
|  | } | 
|  |  | 
|  | func newTestClient(t *testing.T) *testClient { | 
|  | t.Helper() | 
|  | cli, err := clientv3.New(clientv3.Config{ | 
|  | Endpoints:            endpoints, | 
|  | DialTimeout:          1 * time.Second, | 
|  | DialKeepAliveTime:    1 * time.Second, | 
|  | DialKeepAliveTimeout: 1 * time.Second, | 
|  | }) | 
|  | if err != nil { | 
|  | t.Fatalf("clientv3.New: %v", err) | 
|  | } | 
|  |  | 
|  | namespaced := client.NewLocal(cli) | 
|  | return &testClient{ | 
|  | client:     cli, | 
|  | namespaced: namespaced, | 
|  | } | 
|  | } | 
|  |  | 
|  | func (d *testClient) close() { | 
|  | d.client.Close() | 
|  | } | 
|  |  | 
|  | // setEndpoints configures which endpoints (from {0,1,2}) the testClient is | 
|  | // connected to. | 
|  | func (d *testClient) setEndpoints(nums ...uint) { | 
|  | var eps []string | 
|  | for _, num := range nums { | 
|  | eps = append(eps, endpoints[num]) | 
|  | } | 
|  | d.client.SetEndpoints(eps...) | 
|  | } | 
|  |  | 
|  | // put uses the testClient to store key with a given string value in etcd. It | 
|  | // contains retry logic that will block until the put is successful. | 
|  | func (d *testClient) put(t *testing.T, key, value string) { | 
|  | t.Helper() | 
|  | ctx, ctxC := context.WithCancel(context.Background()) | 
|  | defer ctxC() | 
|  |  | 
|  | for { | 
|  | ctxT, ctxC := context.WithTimeout(ctx, 100*time.Millisecond) | 
|  | _, err := d.namespaced.Put(ctxT, key, value) | 
|  | ctxC() | 
|  | if err == nil { | 
|  | return | 
|  | } | 
|  | if err == ctxT.Err() { | 
|  | log.Printf("Retrying after %v", err) | 
|  | continue | 
|  | } | 
|  | // Retry on etcd unavailability - this will happen in this code as the | 
|  | // etcd cluster repeatedly loses quorum. | 
|  | var eerr rpctypes.EtcdError | 
|  | if errors.As(err, &eerr) && eerr.Code() == codes.Unavailable { | 
|  | log.Printf("Retrying after %v", err) | 
|  | continue | 
|  | } | 
|  | t.Fatalf("Put: %v", err) | 
|  | } | 
|  |  | 
|  | } | 
|  |  | 
|  | // remove uses the testClient to remove the given key from etcd. It contains | 
|  | // retry logic that will block until the removal is successful. | 
|  | func (d *testClient) remove(t *testing.T, key string) { | 
|  | t.Helper() | 
|  | ctx, ctxC := context.WithCancel(context.Background()) | 
|  | defer ctxC() | 
|  |  | 
|  | _, err := d.namespaced.Delete(ctx, key) | 
|  | if err == nil { | 
|  | return | 
|  | } | 
|  | t.Fatalf("Delete: %v", err) | 
|  | } | 
|  |  | 
|  | // expect runs a Get on the given Watcher, ensuring the returned value is a | 
|  | // given string. | 
|  | func expect(t *testing.T, w event.Watcher[StringAt], value string) { | 
|  | t.Helper() | 
|  | ctx, ctxC := context.WithCancel(context.Background()) | 
|  | defer ctxC() | 
|  |  | 
|  | got, err := w.Get(ctx) | 
|  | if err != nil { | 
|  | t.Fatalf("Get: %v", err) | 
|  | } | 
|  |  | 
|  | if got, want := got.Value, value; got != want { | 
|  | t.Errorf("Wanted value %q, got %q", want, got) | 
|  | } | 
|  | } | 
|  |  | 
|  | // expectTimeout ensures that the given watcher blocks on a Get call for at | 
|  | // least 100 milliseconds. This is used by tests to attempt to verify that the | 
|  | // watcher Get is fully blocked, but can cause false positives (eg. when Get | 
|  | // blocks for 101 milliseconds). Thus, this function should be used sparingly | 
|  | // and in tests that perform other baseline behaviour checks alongside this | 
|  | // test. | 
|  | func expectTimeout[T any](t *testing.T, w event.Watcher[T]) { | 
|  | t.Helper() | 
|  | ctx, ctxC := context.WithTimeout(context.Background(), 100*time.Millisecond) | 
|  | got, err := w.Get(ctx) | 
|  | ctxC() | 
|  |  | 
|  | if !errors.Is(err, ctx.Err()) { | 
|  | t.Fatalf("Expected timeout error, got %v, %v", got, err) | 
|  | } | 
|  | } | 
|  |  | 
|  | // wait wraps a watcher into a channel of strings, ensuring that the watcher | 
|  | // never errors on Get calls and always returns strings. | 
|  | func wait(t *testing.T, w event.Watcher[StringAt]) (chan string, func()) { | 
|  | t.Helper() | 
|  | ctx, ctxC := context.WithCancel(context.Background()) | 
|  |  | 
|  | c := make(chan string) | 
|  |  | 
|  | go func() { | 
|  | for { | 
|  | got, err := w.Get(ctx) | 
|  | if err != nil && errors.Is(err, ctx.Err()) { | 
|  | return | 
|  | } | 
|  | if err != nil { | 
|  | t.Fatalf("Get: %v", err) | 
|  | } | 
|  | c <- got.Value | 
|  | } | 
|  | }() | 
|  |  | 
|  | return c, ctxC | 
|  | } | 
|  |  | 
|  | // TestSimple exercises the simplest possible interaction with a watched value. | 
|  | func TestSimple(t *testing.T) { | 
|  | tc := newTestClient(t) | 
|  | defer tc.close() | 
|  |  | 
|  | k := "test-simple" | 
|  | value := NewValue(tc.namespaced, k, DecoderStringAt) | 
|  | tc.put(t, k, "one") | 
|  |  | 
|  | watcher := value.Watch() | 
|  | defer watcher.Close() | 
|  | expect(t, watcher, "one") | 
|  |  | 
|  | tc.put(t, k, "two") | 
|  | expect(t, watcher, "two") | 
|  |  | 
|  | tc.put(t, k, "three") | 
|  | tc.put(t, k, "four") | 
|  | tc.put(t, k, "five") | 
|  | tc.put(t, k, "six") | 
|  |  | 
|  | q, cancel := wait(t, watcher) | 
|  | // Test will hang here if the above value does not receive the set "six". | 
|  | log.Printf("a") | 
|  | for el := range q { | 
|  | log.Printf("%q", el) | 
|  | if el == "six" { | 
|  | break | 
|  | } | 
|  | } | 
|  | log.Printf("b") | 
|  | cancel() | 
|  | } | 
|  |  | 
|  | // stringAtGet performs a Get from a Watcher, expecting a stringAt and updating | 
|  | // the given map with the retrieved value. | 
|  | func stringAtGet(ctx context.Context, t *testing.T, w event.Watcher[StringAt], m map[string]string) { | 
|  | t.Helper() | 
|  |  | 
|  | vr, err := w.Get(ctx) | 
|  | if err != nil { | 
|  | t.Fatalf("Get: %v", err) | 
|  | } | 
|  | m[vr.Key] = vr.Value | 
|  | } | 
|  |  | 
|  | // TestSimpleRange exercises the simplest behaviour of a ranged watcher, | 
|  | // retrieving updaates via Get in a fully blocking fashion. | 
|  | func TestSimpleRange(t *testing.T) { | 
|  | tc := newTestClient(t) | 
|  | defer tc.close() | 
|  |  | 
|  | ks := "test-simple-range/" | 
|  | ke := "test-simple-range0" | 
|  | value := NewValue(tc.namespaced, ks, DecoderStringAt, Range(ke)) | 
|  | tc.put(t, ks+"a", "one") | 
|  | tc.put(t, ks+"b", "two") | 
|  | tc.put(t, ks+"c", "three") | 
|  | tc.put(t, ks+"b", "four") | 
|  |  | 
|  | w := value.Watch() | 
|  | defer w.Close() | 
|  |  | 
|  | ctx, ctxC := context.WithCancel(context.Background()) | 
|  | defer ctxC() | 
|  |  | 
|  | res := make(map[string]string) | 
|  | stringAtGet(ctx, t, w, res) | 
|  | stringAtGet(ctx, t, w, res) | 
|  | stringAtGet(ctx, t, w, res) | 
|  |  | 
|  | tc.put(t, ks+"a", "five") | 
|  | tc.put(t, ks+"e", "six") | 
|  |  | 
|  | stringAtGet(ctx, t, w, res) | 
|  | stringAtGet(ctx, t, w, res) | 
|  |  | 
|  | for _, te := range []struct { | 
|  | k, w string | 
|  | }{ | 
|  | {ks + "a", "five"}, | 
|  | {ks + "b", "four"}, | 
|  | {ks + "c", "three"}, | 
|  | {ks + "e", "six"}, | 
|  | } { | 
|  | if want, got := te.w, res[te.k]; want != got { | 
|  | t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got) | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | // TestCancel ensures that watchers can resume after being canceled. | 
|  | func TestCancel(t *testing.T) { | 
|  | tc := newTestClient(t) | 
|  | defer tc.close() | 
|  |  | 
|  | k := "test-cancel" | 
|  | value := NewValue(tc.namespaced, k, DecoderStringAt) | 
|  | tc.put(t, k, "one") | 
|  |  | 
|  | watcher := value.Watch() | 
|  | defer watcher.Close() | 
|  | expect(t, watcher, "one") | 
|  |  | 
|  | ctx, ctxC := context.WithCancel(context.Background()) | 
|  | errs := make(chan error, 1) | 
|  | go func() { | 
|  | _, err := watcher.Get(ctx) | 
|  | errs <- err | 
|  | }() | 
|  | ctxC() | 
|  | if want, got := ctx.Err(), <-errs; !errors.Is(got, want) { | 
|  | t.Fatalf("Wanted err %v, got %v", want, got) | 
|  | } | 
|  |  | 
|  | // Successfully canceled watch, resuming should continue to work. | 
|  | q, cancel := wait(t, watcher) | 
|  | defer cancel() | 
|  |  | 
|  | tc.put(t, k, "two") | 
|  | if want, got := "two", <-q; want != got { | 
|  | t.Fatalf("Wanted val %q, got %q", want, got) | 
|  | } | 
|  | } | 
|  |  | 
|  | // TestCancelOnGet ensures that a context cancellation on an initial Get (which | 
|  | // translates to an etcd Get in a backoff loop) doesn't block. | 
|  | func TestCancelOnGet(t *testing.T) { | 
|  | tc := newTestClient(t) | 
|  | defer tc.close() | 
|  |  | 
|  | k := "test-cancel-on-get" | 
|  | value := NewValue(tc.namespaced, k, DecoderStringAt) | 
|  | watcher := value.Watch() | 
|  | tc.put(t, k, "one") | 
|  |  | 
|  | // Cause partition between client endpoint and rest of cluster. Any read/write | 
|  | // operations will now hang. | 
|  | tc.setEndpoints(0) | 
|  | cluster.Members[0].InjectPartition(t, cluster.Members[1], cluster.Members[2]) | 
|  | // Let raft timeouts expire so that the leader is aware a partition has occurred | 
|  | // and stops serving data if it is not part of a quorum anymore. | 
|  | // | 
|  | // Otherwise, if Member[0] was the leader, there will be a window of opportunity | 
|  | // during which it will continue to serve read data even though it has been | 
|  | // partitioned off. This is an effect of how etcd handles linearizable reads: | 
|  | // they go through the leader, but do not go through raft. | 
|  | // | 
|  | // The value is the default etcd leader timeout (1s) + some wiggle room. | 
|  | time.Sleep(time.Second + time.Millisecond*100) | 
|  |  | 
|  | // Perform the initial Get(), which should attempt to retrieve a KV entry from | 
|  | // the etcd service. This should hang. Unfortunately, there's no easy way to do | 
|  | // this without an arbitrary sleep hoping that the client actually gets to the | 
|  | // underlying etcd.Get call. This can cause false positives (eg. false 'pass' | 
|  | // results) in this test. | 
|  | ctx, ctxC := context.WithCancel(context.Background()) | 
|  | errs := make(chan error, 1) | 
|  | go func() { | 
|  | _, err := watcher.Get(ctx) | 
|  | errs <- err | 
|  | }() | 
|  | time.Sleep(time.Second) | 
|  |  | 
|  | // Now that the etcd.Get is hanging, cancel the context. | 
|  | ctxC() | 
|  | // And now unpartition the cluster, resuming reads. | 
|  | cluster.Members[0].RecoverPartition(t, cluster.Members[1], cluster.Members[2]) | 
|  |  | 
|  | // The etcd.Get() call should've returned with a context cancellation. | 
|  | err := <-errs | 
|  | switch { | 
|  | case err == nil: | 
|  | t.Errorf("watcher.Get() returned no error, wanted context error") | 
|  | case errors.Is(err, ctx.Err()): | 
|  | // Okay. | 
|  | default: | 
|  | t.Errorf("watcher.Get() returned %v, wanted context error", err) | 
|  | } | 
|  | } | 
|  |  | 
|  | // TestClientReconnect forces a 'reconnection' of an active watcher from a | 
|  | // running member to another member, by stopping the original member and | 
|  | // explicitly reconnecting the client to other available members. | 
|  | // | 
|  | // This doe not reflect a situation expected during Metropolis runtime, as we | 
|  | // do not expect splits between an etcd client and its connected member | 
|  | // (instead, all etcd clients only connect to their local member). However, it | 
|  | // is still an important safety test to perform, and it also exercies the | 
|  | // equivalent behaviour of an etcd client re-connecting for any other reason. | 
|  | func TestClientReconnect(t *testing.T) { | 
|  | tc := newTestClient(t) | 
|  | defer tc.close() | 
|  | tc.setEndpoints(0) | 
|  |  | 
|  | k := "test-client-reconnect" | 
|  | value := NewValue(tc.namespaced, k, DecoderStringAt) | 
|  | tc.put(t, k, "one") | 
|  |  | 
|  | watcher := value.Watch() | 
|  | defer watcher.Close() | 
|  | expect(t, watcher, "one") | 
|  |  | 
|  | q, cancel := wait(t, watcher) | 
|  | defer cancel() | 
|  |  | 
|  | cluster.Members[0].Stop(t) | 
|  | defer cluster.Members[0].Restart(t) | 
|  | cluster.WaitLeader(t) | 
|  |  | 
|  | tc.setEndpoints(1, 2) | 
|  | tc.put(t, k, "two") | 
|  |  | 
|  | if want, got := "two", <-q; want != got { | 
|  | t.Fatalf("Watcher received incorrect data after client restart, wanted %q, got %q", want, got) | 
|  | } | 
|  | } | 
|  |  | 
|  | // TestClientPartition forces a temporary partition of the etcd member while a | 
|  | // watcher is running, updates the value from across the partition, and undoes | 
|  | // the partition. | 
|  | // The partition is expected to be entirely transparent to the watcher. | 
|  | func TestClientPartition(t *testing.T) { | 
|  | tcOne := newTestClient(t) | 
|  | defer tcOne.close() | 
|  | tcOne.setEndpoints(0) | 
|  |  | 
|  | tcRest := newTestClient(t) | 
|  | defer tcRest.close() | 
|  | tcRest.setEndpoints(1, 2) | 
|  |  | 
|  | k := "test-client-partition" | 
|  | valueOne := NewValue(tcOne.namespaced, k, DecoderStringAt) | 
|  | watcherOne := valueOne.Watch() | 
|  | defer watcherOne.Close() | 
|  | valueRest := NewValue(tcRest.namespaced, k, DecoderStringAt) | 
|  | watcherRest := valueRest.Watch() | 
|  | defer watcherRest.Close() | 
|  |  | 
|  | tcRest.put(t, k, "a") | 
|  | expect(t, watcherOne, "a") | 
|  | expect(t, watcherRest, "a") | 
|  |  | 
|  | cluster.Members[0].InjectPartition(t, cluster.Members[1], cluster.Members[2]) | 
|  |  | 
|  | tcRest.put(t, k, "b") | 
|  | expect(t, watcherRest, "b") | 
|  | expectTimeout(t, watcherOne) | 
|  |  | 
|  | cluster.Members[0].RecoverPartition(t, cluster.Members[1], cluster.Members[2]) | 
|  |  | 
|  | expect(t, watcherOne, "b") | 
|  | tcRest.put(t, k, "c") | 
|  | expect(t, watcherOne, "c") | 
|  | expect(t, watcherRest, "c") | 
|  |  | 
|  | } | 
|  |  | 
|  | // TestEarlyUse exercises the correct behaviour of the value watcher on a value | 
|  | // that is not yet set. | 
|  | func TestEarlyUse(t *testing.T) { | 
|  | tc := newTestClient(t) | 
|  | defer tc.close() | 
|  |  | 
|  | k := "test-early-use" | 
|  |  | 
|  | value := NewValue(tc.namespaced, k, DecoderStringAt) | 
|  | watcher := value.Watch() | 
|  | defer watcher.Close() | 
|  |  | 
|  | wg := setSetupWg(watcher) | 
|  | wg.Add(1) | 
|  | q, cancel := wait(t, watcher) | 
|  | defer cancel() | 
|  |  | 
|  | wg.Done() | 
|  |  | 
|  | tc.put(t, k, "one") | 
|  |  | 
|  | if want, got := "one", <-q; want != got { | 
|  | t.Fatalf("Expected %q, got %q", want, got) | 
|  | } | 
|  | } | 
|  |  | 
|  | // TestRemove exercises the basic functionality of handling deleted values. | 
|  | func TestRemove(t *testing.T) { | 
|  | tc := newTestClient(t) | 
|  | defer tc.close() | 
|  |  | 
|  | k := "test-remove" | 
|  | tc.put(t, k, "one") | 
|  |  | 
|  | value := NewValue(tc.namespaced, k, DecoderStringAt) | 
|  | watcher := value.Watch() | 
|  | defer watcher.Close() | 
|  |  | 
|  | expect(t, watcher, "one") | 
|  | tc.remove(t, k) | 
|  | expect(t, watcher, "") | 
|  | } | 
|  |  | 
|  | // TestRemoveRange exercises the behaviour of a Get on a ranged watcher when a | 
|  | // value is removed. | 
|  | func TestRemoveRange(t *testing.T) { | 
|  | tc := newTestClient(t) | 
|  | defer tc.close() | 
|  |  | 
|  | ks := "test-remove-range/" | 
|  | ke := "test-remove-range0" | 
|  | value := NewValue(tc.namespaced, ks, DecoderStringAt, Range(ke)) | 
|  | tc.put(t, ks+"a", "one") | 
|  | tc.put(t, ks+"b", "two") | 
|  | tc.put(t, ks+"c", "three") | 
|  | tc.put(t, ks+"b", "four") | 
|  | tc.remove(t, ks+"c") | 
|  |  | 
|  | w := value.Watch() | 
|  | defer w.Close() | 
|  |  | 
|  | ctx, ctxC := context.WithCancel(context.Background()) | 
|  | defer ctxC() | 
|  |  | 
|  | res := make(map[string]string) | 
|  | stringAtGet(ctx, t, w, res) | 
|  | stringAtGet(ctx, t, w, res) | 
|  |  | 
|  | for _, te := range []struct { | 
|  | k, w string | 
|  | }{ | 
|  | {ks + "a", "one"}, | 
|  | {ks + "b", "four"}, | 
|  | {ks + "c", ""}, | 
|  | } { | 
|  | if want, got := te.w, res[te.k]; want != got { | 
|  | t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got) | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | // TestEmptyRace forces the watcher to retrieve an empty value from the K/V | 
|  | // store at first, and establishing the watch channel after a new value has | 
|  | // been stored in the same place. | 
|  | func TestEmptyRace(t *testing.T) { | 
|  | tc := newTestClient(t) | 
|  | defer tc.close() | 
|  |  | 
|  | k := "test-remove-race" | 
|  | tc.put(t, k, "one") | 
|  | tc.remove(t, k) | 
|  |  | 
|  | value := NewValue(tc.namespaced, k, DecoderStringAt) | 
|  | watcher := value.Watch() | 
|  | defer watcher.Close() | 
|  |  | 
|  | wg := setRaceWg(watcher) | 
|  | wg.Add(1) | 
|  | q, cancel := wait(t, watcher) | 
|  | defer cancel() | 
|  |  | 
|  | tc.put(t, k, "two") | 
|  | wg.Done() | 
|  |  | 
|  | if want, got := "two", <-q; want != got { | 
|  | t.Fatalf("Watcher received incorrect data after client restart, wanted %q, got %q", want, got) | 
|  | } | 
|  | } | 
|  |  | 
// errOrInt carries the outcome of a single watcher Get in TestDecoder: either
// a decoded integer or the error returned instead.
type errOrInt struct {
	// val is the decoded value; left at zero when err is set.
	val int64
	// err is the error returned by Get, if any.
	err error
}
|  |  | 
|  | // TestDecoder exercises the BytesDecoder functionality of the watcher, by | 
|  | // creating a value with a decoder that only accepts string-encoded integers | 
|  | // that are divisible by three. The test then proceeds to put a handful of | 
|  | // values into etcd, ensuring that undecodable values correctly return an error | 
|  | // on Get, but that the watcher continues to work after the error has been | 
|  | // returned. | 
|  | func TestDecoder(t *testing.T) { | 
|  | decoderDivisibleByThree := func(_, value []byte) (int64, error) { | 
|  | num, err := strconv.ParseInt(string(value), 10, 64) | 
|  | if err != nil { | 
|  | return 0, fmt.Errorf("not a valid number") | 
|  | } | 
|  | if (num % 3) != 0 { | 
|  | return 0, fmt.Errorf("not divisible by 3") | 
|  | } | 
|  | return num, nil | 
|  | } | 
|  |  | 
|  | tc := newTestClient(t) | 
|  | defer tc.close() | 
|  |  | 
|  | ctx, ctxC := context.WithCancel(context.Background()) | 
|  | defer ctxC() | 
|  |  | 
|  | k := "test-decoder" | 
|  | value := NewValue(tc.namespaced, k, decoderDivisibleByThree) | 
|  | watcher := value.Watch() | 
|  | defer watcher.Close() | 
|  | tc.put(t, k, "3") | 
|  | _, err := watcher.Get(ctx) | 
|  | if err != nil { | 
|  | t.Fatalf("Initial Get: %v", err) | 
|  | } | 
|  |  | 
|  | // Stream updates into arbitrarily-bounded test channel. | 
|  | queue := make(chan errOrInt, 100) | 
|  | go func() { | 
|  | for { | 
|  | res, err := watcher.Get(ctx) | 
|  | if err != nil && errors.Is(err, ctx.Err()) { | 
|  | return | 
|  | } | 
|  | if err != nil { | 
|  | queue <- errOrInt{ | 
|  | err: err, | 
|  | } | 
|  | } else { | 
|  | queue <- errOrInt{ | 
|  | val: res, | 
|  | } | 
|  | } | 
|  | } | 
|  | }() | 
|  |  | 
|  | var wantList []*int64 | 
|  | wantError := func(val string) { | 
|  | wantList = append(wantList, nil) | 
|  | tc.put(t, k, val) | 
|  | } | 
|  | wantValue := func(val string, decoded int64) { | 
|  | wantList = append(wantList, &decoded) | 
|  | tc.put(t, k, val) | 
|  | } | 
|  |  | 
|  | wantError("") | 
|  | wantValue("9", 9) | 
|  | wantError("foo") | 
|  | wantValue("18", 18) | 
|  | wantError("10") | 
|  | wantValue("27", 27) | 
|  | wantValue("36", 36) | 
|  |  | 
|  | for i, want := range wantList { | 
|  | q := <-queue | 
|  | if want == nil && q.err == nil { | 
|  | t.Fatalf("Case %d: wanted error, got no error and value %d", i, q.val) | 
|  | } | 
|  | if want != nil && (*want) != q.val { | 
|  | t.Fatalf("Case %d: wanted value %d, got error %v and value %d", i, *want, q.err, q.val) | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | // TestBacklog ensures that the watcher can handle a large backlog of changes | 
|  | // in etcd that the client didnt' keep up with, and that whatever final state | 
|  | // is available to the client when it actually gets to calling Get(). | 
|  | func TestBacklog(t *testing.T) { | 
|  | tc := newTestClient(t) | 
|  | defer tc.close() | 
|  |  | 
|  | k := "test-backlog" | 
|  | value := NewValue(tc.namespaced, k, DecoderStringAt) | 
|  | watcher := value.Watch() | 
|  | defer watcher.Close() | 
|  |  | 
|  | tc.put(t, k, "initial") | 
|  | expect(t, watcher, "initial") | 
|  |  | 
|  | for i := 0; i < 1000; i++ { | 
|  | tc.put(t, k, fmt.Sprintf("val-%d", i)) | 
|  | } | 
|  |  | 
|  | ctx, ctxC := context.WithTimeout(context.Background(), time.Second) | 
|  | defer ctxC() | 
|  | for { | 
|  | valB, err := watcher.Get(ctx) | 
|  | if err != nil { | 
|  | t.Fatalf("Get() returned error before expected final value: %v", err) | 
|  | } | 
|  | if valB.Value == "val-999" { | 
|  | break | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | // TestBacklogRange ensures that the ranged etcd watcher can handle a large | 
|  | // backlog of changes in etcd that the client didn't keep up with. | 
|  | func TestBacklogRange(t *testing.T) { | 
|  | tc := newTestClient(t) | 
|  | defer tc.close() | 
|  |  | 
|  | ks := "test-backlog-range/" | 
|  | ke := "test-backlog-range0" | 
|  | value := NewValue(tc.namespaced, ks, DecoderStringAt, Range(ke)) | 
|  | w := value.Watch() | 
|  | defer w.Close() | 
|  |  | 
|  | for i := 0; i < 100; i++ { | 
|  | if i%2 == 0 { | 
|  | tc.put(t, ks+"a", fmt.Sprintf("val-%d", i)) | 
|  | } else { | 
|  | tc.put(t, ks+"b", fmt.Sprintf("val-%d", i)) | 
|  | } | 
|  | } | 
|  |  | 
|  | ctx, ctxC := context.WithCancel(context.Background()) | 
|  | defer ctxC() | 
|  |  | 
|  | res := make(map[string]string) | 
|  | stringAtGet(ctx, t, w, res) | 
|  | stringAtGet(ctx, t, w, res) | 
|  |  | 
|  | for _, te := range []struct { | 
|  | k, w string | 
|  | }{ | 
|  | {ks + "a", "val-98"}, | 
|  | {ks + "b", "val-99"}, | 
|  | } { | 
|  | if want, got := te.w, res[te.k]; want != got { | 
|  | t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got) | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | // TestBacklogOnly exercises the BacklogOnly option for non-ranged watchers, | 
|  | // which effectively makes any Get operation non-blocking (but also showcases | 
|  | // that unless a Get without BacklogOnly is issues, no new data will appear by | 
|  | // itself in the watcher - which is an undocumented implementation detail of the | 
|  | // option). | 
|  | func TestBacklogOnly(t *testing.T) { | 
|  | tc := newTestClient(t) | 
|  | defer tc.close() | 
|  | ctx, ctxC := context.WithCancel(context.Background()) | 
|  | defer ctxC() | 
|  |  | 
|  | k := "test-backlog-only" | 
|  | tc.put(t, k, "initial") | 
|  |  | 
|  | value := NewValue(tc.namespaced, k, DecoderStringAt) | 
|  | watcher := value.Watch() | 
|  | defer watcher.Close() | 
|  |  | 
|  | d, err := watcher.Get(ctx, event.BacklogOnly[StringAt]()) | 
|  | if err != nil { | 
|  | t.Fatalf("First Get failed: %v", err) | 
|  | } | 
|  | if want, got := "initial", d.Value; want != got { | 
|  | t.Fatalf("First Get: wanted value %q, got %q", want, got) | 
|  | } | 
|  |  | 
|  | // As expected, next call to Get with BacklogOnly fails - there truly is no new | 
|  | // updates to emit. | 
|  | _, err = watcher.Get(ctx, event.BacklogOnly[StringAt]()) | 
|  | if want, got := event.BacklogDone, err; want != got { | 
|  | t.Fatalf("Second Get: wanted %v, got %v", want, got) | 
|  | } | 
|  |  | 
|  | // Implementation detail: even though there is a new value ('second'), | 
|  | // BacklogOnly will still return BacklogDone. | 
|  | tc.put(t, k, "second") | 
|  | _, err = watcher.Get(ctx, event.BacklogOnly[StringAt]()) | 
|  | if want, got := event.BacklogDone, err; want != got { | 
|  | t.Fatalf("Third Get: wanted %v, got %v", want, got) | 
|  | } | 
|  |  | 
|  | // ... However, a Get  without BacklogOnly will return the new value. | 
|  | d, err = watcher.Get(ctx) | 
|  | if err != nil { | 
|  | t.Fatalf("Fourth Get failed: %v", err) | 
|  | } | 
|  | if want, got := "second", d.Value; want != got { | 
|  | t.Fatalf("Fourth Get: wanted value %q, got %q", want, got) | 
|  | } | 
|  | } | 
|  |  | 
|  | // TestBacklogOnlyRange exercises the BacklogOnly option for ranged watchers, | 
|  | // showcasing how it expected to be used for keeping up with the external state | 
|  | // of a range by synchronizing to a local map. | 
|  | func TestBacklogOnlyRange(t *testing.T) { | 
|  | tc := newTestClient(t) | 
|  | defer tc.close() | 
|  | ctx, ctxC := context.WithCancel(context.Background()) | 
|  | defer ctxC() | 
|  |  | 
|  | ks := "test-backlog-only-range/" | 
|  | ke := "test-backlog-only-range0" | 
|  |  | 
|  | for i := 0; i < 100; i++ { | 
|  | if i%2 == 0 { | 
|  | tc.put(t, ks+"a", fmt.Sprintf("val-%d", i)) | 
|  | } else { | 
|  | tc.put(t, ks+"b", fmt.Sprintf("val-%d", i)) | 
|  | } | 
|  | } | 
|  |  | 
|  | value := NewValue(tc.namespaced, ks, DecoderStringAt, Range(ke)) | 
|  | w := value.Watch() | 
|  | defer w.Close() | 
|  |  | 
|  | // Collect results into a map from key to value. | 
|  | res := make(map[string]string) | 
|  |  | 
|  | // Run first Get - this is the barrier defining what's part of the backlog. | 
|  | g, err := w.Get(ctx, event.BacklogOnly[StringAt]()) | 
|  | if err != nil { | 
|  | t.Fatalf("Get: %v", err) | 
|  | } | 
|  | res[g.Key] = g.Value | 
|  |  | 
|  | // These won't be part of the backlog. | 
|  | tc.put(t, ks+"a", fmt.Sprintf("val-100")) | 
|  | tc.put(t, ks+"b", fmt.Sprintf("val-101")) | 
|  |  | 
|  | // Retrieve the rest of the backlog until BacklogDone is returned. | 
|  | nUpdates := 1 | 
|  | for { | 
|  | g, err := w.Get(ctx, event.BacklogOnly[StringAt]()) | 
|  | if err == event.BacklogDone { | 
|  | break | 
|  | } | 
|  | if err != nil { | 
|  | t.Fatalf("Get: %v", err) | 
|  | } | 
|  | nUpdates += 1 | 
|  | res[g.Key] = g.Value | 
|  | } | 
|  |  | 
|  | // The backlog should've been compacted to just two entries at their newest | 
|  | // state. | 
|  | if want, got := 2, nUpdates; want != got { | 
|  | t.Fatalf("wanted backlog in %d updates, got it in %d", want, got) | 
|  | } | 
|  |  | 
|  | for _, te := range []struct { | 
|  | k, w string | 
|  | }{ | 
|  | {ks + "a", "val-98"}, | 
|  | {ks + "b", "val-99"}, | 
|  | } { | 
|  | if want, got := te.w, res[te.k]; want != got { | 
|  | t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got) | 
|  | } | 
|  | } | 
|  | } |