| package etcd |
| |
| import ( |
| "context" |
| "errors" |
| "flag" |
| "fmt" |
| "log" |
| "os" |
| "strconv" |
| "sync" |
| "testing" |
| "time" |
| |
| "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" |
| "go.etcd.io/etcd/client/pkg/v3/testutil" |
| clientv3 "go.etcd.io/etcd/client/v3" |
| "go.etcd.io/etcd/tests/v3/integration" |
| "google.golang.org/grpc/codes" |
| |
| "source.monogon.dev/metropolis/node/core/consensus/client" |
| "source.monogon.dev/metropolis/pkg/event" |
| ) |
| |
| var ( |
| cluster *integration.ClusterV3 |
| endpoints []string |
| ) |
| |
| // TestMain brings up a 3 node etcd cluster for tests to use. |
| func TestMain(m *testing.M) { |
| cfg := integration.ClusterConfig{ |
| Size: 3, |
| GRPCKeepAliveMinTime: time.Millisecond, |
| } |
| tb, cancel := testutil.NewTestingTBProthesis("curator") |
| defer cancel() |
| flag.Parse() |
| integration.BeforeTestExternal(tb) |
| cluster = integration.NewClusterV3(tb, &cfg) |
| endpoints = make([]string, 3) |
| for i := range endpoints { |
| endpoints[i] = cluster.Client(i).Endpoints()[0] |
| } |
| |
| v := m.Run() |
| cluster.Terminate(tb) |
| os.Exit(v) |
| } |
| |
| // setRaceWg creates a new WaitGroup and sets the given watcher to wait on this |
| // WG after it performs the initial retrieval of a value from etcd, but before |
| // it starts the watcher. This is used to test potential race conditions |
| // present between these two steps. |
| func setRaceWg(w event.Watcher) *sync.WaitGroup { |
| wg := sync.WaitGroup{} |
| w.(*watcher).testRaceWG = &wg |
| return &wg |
| } |
| |
| // setSetupWg creates a new WaitGroup and sets the given watcher to wait on |
| // thie WG after an etcd watch channel is created. This is used in tests to |
| // ensure that the watcher is fully created before it is tested. |
| func setSetupWg(w event.Watcher) *sync.WaitGroup { |
| wg := sync.WaitGroup{} |
| w.(*watcher).testSetupWG = &wg |
| return &wg |
| } |
| |
| // testClient is an etcd connection to the test cluster. |
| type testClient struct { |
| client *clientv3.Client |
| namespaced client.Namespaced |
| } |
| |
| func newTestClient(t *testing.T) *testClient { |
| t.Helper() |
| cli, err := clientv3.New(clientv3.Config{ |
| Endpoints: endpoints, |
| DialTimeout: 1 * time.Second, |
| DialKeepAliveTime: 1 * time.Second, |
| DialKeepAliveTimeout: 1 * time.Second, |
| }) |
| if err != nil { |
| t.Fatalf("clientv3.New: %v", err) |
| } |
| |
| namespaced := client.NewLocal(cli) |
| return &testClient{ |
| client: cli, |
| namespaced: namespaced, |
| } |
| } |
| |
| func (d *testClient) close() { |
| d.client.Close() |
| } |
| |
| // setEndpoints configures which endpoints (from {0,1,2}) the testClient is |
| // connected to. |
| func (d *testClient) setEndpoints(nums ...uint) { |
| var eps []string |
| for _, num := range nums { |
| eps = append(eps, endpoints[num]) |
| } |
| d.client.SetEndpoints(eps...) |
| } |
| |
| // put uses the testClient to store key with a given string value in etcd. It |
| // contains retry logic that will block until the put is successful. |
| func (d *testClient) put(t *testing.T, key, value string) { |
| t.Helper() |
| ctx, ctxC := context.WithCancel(context.Background()) |
| defer ctxC() |
| |
| for { |
| ctxT, ctxC := context.WithTimeout(ctx, 100*time.Millisecond) |
| _, err := d.namespaced.Put(ctxT, key, value) |
| ctxC() |
| if err == nil { |
| return |
| } |
| if err == ctxT.Err() { |
| log.Printf("Retrying after %v", err) |
| continue |
| } |
| // Retry on etcd unavailability - this will happen in this code as the |
| // etcd cluster repeatedly loses quorum. |
| var eerr rpctypes.EtcdError |
| if errors.As(err, &eerr) && eerr.Code() == codes.Unavailable { |
| log.Printf("Retrying after %v", err) |
| continue |
| } |
| t.Fatalf("Put: %v", err) |
| } |
| |
| } |
| |
| // remove uses the testClient to remove the given key from etcd. It contains |
| // retry logic that will block until the removal is successful. |
| func (d *testClient) remove(t *testing.T, key string) { |
| t.Helper() |
| ctx, ctxC := context.WithCancel(context.Background()) |
| defer ctxC() |
| |
| _, err := d.namespaced.Delete(ctx, key) |
| if err == nil { |
| return |
| } |
| t.Fatalf("Delete: %v", err) |
| } |
| |
| // expect runs a Get on the given Watcher, ensuring the returned value is a |
| // given string. |
| func expect(t *testing.T, w event.Watcher, value string) { |
| t.Helper() |
| ctx, ctxC := context.WithCancel(context.Background()) |
| defer ctxC() |
| |
| got, err := w.Get(ctx) |
| if err != nil { |
| t.Fatalf("Get: %v", err) |
| } |
| |
| if got, want := string(got.([]byte)), value; got != want { |
| t.Errorf("Wanted value %q, got %q", want, got) |
| } |
| } |
| |
| // expectTimeout ensures that the given watcher blocks on a Get call for at |
| // least 100 milliseconds. This is used by tests to attempt to verify that the |
| // watcher Get is fully blocked, but can cause false positives (eg. when Get |
| // blocks for 101 milliseconds). Thus, this function should be used sparingly |
| // and in tests that perform other baseline behaviour checks alongside this |
| // test. |
| func expectTimeout(t *testing.T, w event.Watcher) { |
| t.Helper() |
| ctx, ctxC := context.WithTimeout(context.Background(), 100*time.Millisecond) |
| got, err := w.Get(ctx) |
| ctxC() |
| |
| if !errors.Is(err, ctx.Err()) { |
| t.Fatalf("Expected timeout error, got %v, %v", got, err) |
| } |
| } |
| |
| // wait wraps a watcher into a channel of strings, ensuring that the watcher |
| // never errors on Get calls and always returns strings. |
| func wait(t *testing.T, w event.Watcher) (chan string, func()) { |
| t.Helper() |
| ctx, ctxC := context.WithCancel(context.Background()) |
| |
| c := make(chan string) |
| |
| go func() { |
| for { |
| got, err := w.Get(ctx) |
| if err != nil && errors.Is(err, ctx.Err()) { |
| return |
| } |
| if err != nil { |
| t.Fatalf("Get: %v", err) |
| } |
| c <- string(got.([]byte)) |
| } |
| }() |
| |
| return c, ctxC |
| } |
| |
| // TestSimple exercises the simplest possible interaction with a watched value. |
| func TestSimple(t *testing.T) { |
| tc := newTestClient(t) |
| defer tc.close() |
| |
| k := "test-simple" |
| value := NewValue(tc.namespaced, k, NoDecoder) |
| tc.put(t, k, "one") |
| |
| watcher := value.Watch() |
| defer watcher.Close() |
| expect(t, watcher, "one") |
| |
| tc.put(t, k, "two") |
| expect(t, watcher, "two") |
| |
| tc.put(t, k, "three") |
| tc.put(t, k, "four") |
| tc.put(t, k, "five") |
| tc.put(t, k, "six") |
| |
| q, cancel := wait(t, watcher) |
| // Test will hang here if the above value does not receive the set "six". |
| for el := range q { |
| if el == "six" { |
| break |
| } |
| } |
| cancel() |
| } |
| |
| // stringAt is a helper type for testing ranged watchers. It's returned by a |
| // watcher whose decoder is set to stringDecoder. |
| type stringAt struct { |
| key, value string |
| } |
| |
| func stringAtDecoder(key, value []byte) (interface{}, error) { |
| valueS := "" |
| if value != nil { |
| valueS = string(value) |
| } |
| return stringAt{ |
| key: string(key), |
| value: valueS, |
| }, nil |
| } |
| |
| // stringAtGet performs a Get from a Watcher, expecting a stringAt and updating |
| // the given map with the retrieved value. |
| func stringAtGet(ctx context.Context, t *testing.T, w event.Watcher, m map[string]string) { |
| t.Helper() |
| |
| vr, err := w.Get(ctx) |
| if err != nil { |
| t.Fatalf("Get: %v", err) |
| } |
| v := vr.(stringAt) |
| m[v.key] = v.value |
| } |
| |
| // TestSimpleRange exercises the simplest behaviour of a ranged watcher, |
| // retrieving updaates via Get in a fully blocking fashion. |
| func TestSimpleRange(t *testing.T) { |
| tc := newTestClient(t) |
| defer tc.close() |
| |
| ks := "test-simple-range/" |
| ke := "test-simple-range0" |
| value := NewValue(tc.namespaced, ks, stringAtDecoder, Range(ke)) |
| tc.put(t, ks+"a", "one") |
| tc.put(t, ks+"b", "two") |
| tc.put(t, ks+"c", "three") |
| tc.put(t, ks+"b", "four") |
| |
| w := value.Watch() |
| defer w.Close() |
| |
| ctx, ctxC := context.WithCancel(context.Background()) |
| defer ctxC() |
| |
| res := make(map[string]string) |
| stringAtGet(ctx, t, w, res) |
| stringAtGet(ctx, t, w, res) |
| stringAtGet(ctx, t, w, res) |
| |
| tc.put(t, ks+"a", "five") |
| tc.put(t, ks+"e", "six") |
| |
| stringAtGet(ctx, t, w, res) |
| stringAtGet(ctx, t, w, res) |
| |
| for _, te := range []struct { |
| k, w string |
| }{ |
| {ks + "a", "five"}, |
| {ks + "b", "four"}, |
| {ks + "c", "three"}, |
| {ks + "e", "six"}, |
| } { |
| if want, got := te.w, res[te.k]; want != got { |
| t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got) |
| } |
| } |
| } |
| |
| // TestCancel ensures that watchers can resume after being canceled. |
| func TestCancel(t *testing.T) { |
| tc := newTestClient(t) |
| defer tc.close() |
| |
| k := "test-cancel" |
| value := NewValue(tc.namespaced, k, NoDecoder) |
| tc.put(t, k, "one") |
| |
| watcher := value.Watch() |
| defer watcher.Close() |
| expect(t, watcher, "one") |
| |
| ctx, ctxC := context.WithCancel(context.Background()) |
| errs := make(chan error, 1) |
| go func() { |
| _, err := watcher.Get(ctx) |
| errs <- err |
| }() |
| ctxC() |
| if want, got := ctx.Err(), <-errs; !errors.Is(got, want) { |
| t.Fatalf("Wanted err %v, got %v", want, got) |
| } |
| |
| // Successfully canceled watch, resuming should continue to work. |
| q, cancel := wait(t, watcher) |
| defer cancel() |
| |
| tc.put(t, k, "two") |
| if want, got := "two", <-q; want != got { |
| t.Fatalf("Wanted val %q, got %q", want, got) |
| } |
| } |
| |
| // TestCancelOnGet ensures that a context cancellation on an initial Get (which |
| // translates to an etcd Get in a backoff loop) doesn't block. |
| func TestCancelOnGet(t *testing.T) { |
| tc := newTestClient(t) |
| defer tc.close() |
| |
| k := "test-cancel-on-get" |
| value := NewValue(tc.namespaced, k, NoDecoder) |
| watcher := value.Watch() |
| tc.put(t, k, "one") |
| |
| // Cause partition between client endpoint and rest of cluster. Any read/write |
| // operations will now hang. |
| tc.setEndpoints(0) |
| cluster.Members[0].InjectPartition(t, cluster.Members[1], cluster.Members[2]) |
| // Let raft timeouts expire so that the leader is aware a partition has occurred |
| // and stops serving data if it is not part of a quorum anymore. |
| // |
| // Otherwise, if Member[0] was the leader, there will be a window of opportunity |
| // during which it will continue to serve read data even though it has been |
| // partitioned off. This is an effect of how etcd handles linearizable reads: |
| // they go through the leader, but do not go through raft. |
| // |
| // The value is the default etcd leader timeout (1s) + some wiggle room. |
| time.Sleep(time.Second + time.Millisecond*100) |
| |
| // Perform the initial Get(), which should attempt to retrieve a KV entry from |
| // the etcd service. This should hang. Unfortunately, there's no easy way to do |
| // this without an arbitrary sleep hoping that the client actually gets to the |
| // underlying etcd.Get call. This can cause false positives (eg. false 'pass' |
| // results) in this test. |
| ctx, ctxC := context.WithCancel(context.Background()) |
| errs := make(chan error, 1) |
| go func() { |
| _, err := watcher.Get(ctx) |
| errs <- err |
| }() |
| time.Sleep(time.Second) |
| |
| // Now that the etcd.Get is hanging, cancel the context. |
| ctxC() |
| // And now unpartition the cluster, resuming reads. |
| cluster.Members[0].RecoverPartition(t, cluster.Members[1], cluster.Members[2]) |
| |
| // The etcd.Get() call should've returned with a context cancellation. |
| err := <-errs |
| switch { |
| case err == nil: |
| t.Errorf("watcher.Get() returned no error, wanted context error") |
| case errors.Is(err, ctx.Err()): |
| // Okay. |
| default: |
| t.Errorf("watcher.Get() returned %v, wanted context error", err) |
| } |
| } |
| |
| // TestClientReconnect forces a 'reconnection' of an active watcher from a |
| // running member to another member, by stopping the original member and |
| // explicitly reconnecting the client to other available members. |
| // |
| // This doe not reflect a situation expected during Metropolis runtime, as we |
| // do not expect splits between an etcd client and its connected member |
| // (instead, all etcd clients only connect to their local member). However, it |
| // is still an important safety test to perform, and it also exercies the |
| // equivalent behaviour of an etcd client re-connecting for any other reason. |
| func TestClientReconnect(t *testing.T) { |
| tc := newTestClient(t) |
| defer tc.close() |
| tc.setEndpoints(0) |
| |
| k := "test-client-reconnect" |
| value := NewValue(tc.namespaced, k, NoDecoder) |
| tc.put(t, k, "one") |
| |
| watcher := value.Watch() |
| defer watcher.Close() |
| expect(t, watcher, "one") |
| |
| q, cancel := wait(t, watcher) |
| defer cancel() |
| |
| cluster.Members[0].Stop(t) |
| defer cluster.Members[0].Restart(t) |
| cluster.WaitLeader(t) |
| |
| tc.setEndpoints(1, 2) |
| tc.put(t, k, "two") |
| |
| if want, got := "two", <-q; want != got { |
| t.Fatalf("Watcher received incorrect data after client restart, wanted %q, got %q", want, got) |
| } |
| } |
| |
| // TestClientPartition forces a temporary partition of the etcd member while a |
| // watcher is running, updates the value from across the partition, and undoes |
| // the partition. |
| // The partition is expected to be entirely transparent to the watcher. |
| func TestClientPartition(t *testing.T) { |
| tcOne := newTestClient(t) |
| defer tcOne.close() |
| tcOne.setEndpoints(0) |
| |
| tcRest := newTestClient(t) |
| defer tcRest.close() |
| tcRest.setEndpoints(1, 2) |
| |
| k := "test-client-partition" |
| valueOne := NewValue(tcOne.namespaced, k, NoDecoder) |
| watcherOne := valueOne.Watch() |
| defer watcherOne.Close() |
| valueRest := NewValue(tcRest.namespaced, k, NoDecoder) |
| watcherRest := valueRest.Watch() |
| defer watcherRest.Close() |
| |
| tcRest.put(t, k, "a") |
| expect(t, watcherOne, "a") |
| expect(t, watcherRest, "a") |
| |
| cluster.Members[0].InjectPartition(t, cluster.Members[1], cluster.Members[2]) |
| |
| tcRest.put(t, k, "b") |
| expect(t, watcherRest, "b") |
| expectTimeout(t, watcherOne) |
| |
| cluster.Members[0].RecoverPartition(t, cluster.Members[1], cluster.Members[2]) |
| |
| expect(t, watcherOne, "b") |
| tcRest.put(t, k, "c") |
| expect(t, watcherOne, "c") |
| expect(t, watcherRest, "c") |
| |
| } |
| |
| // TestEarlyUse exercises the correct behaviour of the value watcher on a value |
| // that is not yet set. |
| func TestEarlyUse(t *testing.T) { |
| tc := newTestClient(t) |
| defer tc.close() |
| |
| k := "test-early-use" |
| |
| value := NewValue(tc.namespaced, k, NoDecoder) |
| watcher := value.Watch() |
| defer watcher.Close() |
| |
| wg := setSetupWg(watcher) |
| wg.Add(1) |
| q, cancel := wait(t, watcher) |
| defer cancel() |
| |
| wg.Done() |
| |
| tc.put(t, k, "one") |
| |
| if want, got := "one", <-q; want != got { |
| t.Fatalf("Expected %q, got %q", want, got) |
| } |
| } |
| |
| // TestRemove exercises the basic functionality of handling deleted values. |
| func TestRemove(t *testing.T) { |
| tc := newTestClient(t) |
| defer tc.close() |
| |
| k := "test-remove" |
| tc.put(t, k, "one") |
| |
| value := NewValue(tc.namespaced, k, NoDecoder) |
| watcher := value.Watch() |
| defer watcher.Close() |
| |
| expect(t, watcher, "one") |
| tc.remove(t, k) |
| expect(t, watcher, "") |
| } |
| |
| // TestRemoveRange exercises the behaviour of a Get on a ranged watcher when a |
| // value is removed. |
| func TestRemoveRange(t *testing.T) { |
| tc := newTestClient(t) |
| defer tc.close() |
| |
| ks := "test-remove-range/" |
| ke := "test-remove-range0" |
| value := NewValue(tc.namespaced, ks, stringAtDecoder, Range(ke)) |
| tc.put(t, ks+"a", "one") |
| tc.put(t, ks+"b", "two") |
| tc.put(t, ks+"c", "three") |
| tc.put(t, ks+"b", "four") |
| tc.remove(t, ks+"c") |
| |
| w := value.Watch() |
| defer w.Close() |
| |
| ctx, ctxC := context.WithCancel(context.Background()) |
| defer ctxC() |
| |
| res := make(map[string]string) |
| stringAtGet(ctx, t, w, res) |
| stringAtGet(ctx, t, w, res) |
| |
| for _, te := range []struct { |
| k, w string |
| }{ |
| {ks + "a", "one"}, |
| {ks + "b", "four"}, |
| {ks + "c", ""}, |
| } { |
| if want, got := te.w, res[te.k]; want != got { |
| t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got) |
| } |
| } |
| } |
| |
| // TestEmptyRace forces the watcher to retrieve an empty value from the K/V |
| // store at first, and establishing the watch channel after a new value has |
| // been stored in the same place. |
| func TestEmptyRace(t *testing.T) { |
| tc := newTestClient(t) |
| defer tc.close() |
| |
| k := "test-remove-race" |
| tc.put(t, k, "one") |
| tc.remove(t, k) |
| |
| value := NewValue(tc.namespaced, k, NoDecoder) |
| watcher := value.Watch() |
| defer watcher.Close() |
| |
| wg := setRaceWg(watcher) |
| wg.Add(1) |
| q, cancel := wait(t, watcher) |
| defer cancel() |
| |
| tc.put(t, k, "two") |
| wg.Done() |
| |
| if want, got := "two", <-q; want != got { |
| t.Fatalf("Watcher received incorrect data after client restart, wanted %q, got %q", want, got) |
| } |
| } |
| |
| type errOrInt struct { |
| val int64 |
| err error |
| } |
| |
| // TestDecoder exercises the BytesDecoder functionality of the watcher, by |
| // creating a value with a decoder that only accepts string-encoded integers |
| // that are divisible by three. The test then proceeds to put a handful of |
| // values into etcd, ensuring that undecodable values correctly return an error |
| // on Get, but that the watcher continues to work after the error has been |
| // returned. |
| func TestDecoder(t *testing.T) { |
| decodeStringifiedNumbersDivisibleBy3 := func(_, data []byte) (interface{}, error) { |
| num, err := strconv.ParseInt(string(data), 10, 64) |
| if err != nil { |
| return nil, fmt.Errorf("not a valid number") |
| } |
| if (num % 3) != 0 { |
| return nil, fmt.Errorf("not divisible by 3") |
| } |
| return num, nil |
| } |
| |
| tc := newTestClient(t) |
| defer tc.close() |
| |
| ctx, ctxC := context.WithCancel(context.Background()) |
| defer ctxC() |
| |
| k := "test-decoder" |
| value := NewValue(tc.namespaced, k, decodeStringifiedNumbersDivisibleBy3) |
| watcher := value.Watch() |
| defer watcher.Close() |
| tc.put(t, k, "3") |
| _, err := watcher.Get(ctx) |
| if err != nil { |
| t.Fatalf("Initial Get: %v", err) |
| } |
| |
| // Stream updates into arbitrarily-bounded test channel. |
| queue := make(chan errOrInt, 100) |
| go func() { |
| for { |
| res, err := watcher.Get(ctx) |
| if err != nil && errors.Is(err, ctx.Err()) { |
| return |
| } |
| if err != nil { |
| queue <- errOrInt{ |
| err: err, |
| } |
| } else { |
| queue <- errOrInt{ |
| val: res.(int64), |
| } |
| } |
| } |
| }() |
| |
| var wantList []*int64 |
| wantError := func(val string) { |
| wantList = append(wantList, nil) |
| tc.put(t, k, val) |
| } |
| wantValue := func(val string, decoded int64) { |
| wantList = append(wantList, &decoded) |
| tc.put(t, k, val) |
| } |
| |
| wantError("") |
| wantValue("9", 9) |
| wantError("foo") |
| wantValue("18", 18) |
| wantError("10") |
| wantValue("27", 27) |
| wantValue("36", 36) |
| |
| for i, want := range wantList { |
| q := <-queue |
| if want == nil && q.err == nil { |
| t.Fatalf("Case %d: wanted error, got no error and value %d", i, q.val) |
| } |
| if want != nil && (*want) != q.val { |
| t.Fatalf("Case %d: wanted value %d, got error %v and value %d", i, *want, q.err, q.val) |
| } |
| } |
| } |
| |
| // TestBacklog ensures that the watcher can handle a large backlog of changes |
| // in etcd that the client didnt' keep up with, and that whatever final state |
| // is available to the client when it actually gets to calling Get(). |
| func TestBacklog(t *testing.T) { |
| tc := newTestClient(t) |
| defer tc.close() |
| |
| k := "test-backlog" |
| value := NewValue(tc.namespaced, k, NoDecoder) |
| watcher := value.Watch() |
| defer watcher.Close() |
| |
| tc.put(t, k, "initial") |
| expect(t, watcher, "initial") |
| |
| for i := 0; i < 1000; i++ { |
| tc.put(t, k, fmt.Sprintf("val-%d", i)) |
| } |
| |
| ctx, ctxC := context.WithTimeout(context.Background(), time.Second) |
| defer ctxC() |
| for { |
| valB, err := watcher.Get(ctx) |
| if err != nil { |
| t.Fatalf("Get() returned error before expected final value: %v", err) |
| } |
| val := string(valB.([]byte)) |
| if val == "val-999" { |
| break |
| } |
| } |
| } |
| |
| // TestBacklogRange ensures that the ranged etcd watcher can handle a large |
| // backlog of changes in etcd that the client didn't keep up with. |
| func TestBacklogRange(t *testing.T) { |
| tc := newTestClient(t) |
| defer tc.close() |
| |
| ks := "test-backlog-range/" |
| ke := "test-backlog-range0" |
| value := NewValue(tc.namespaced, ks, stringAtDecoder, Range(ke)) |
| w := value.Watch() |
| defer w.Close() |
| |
| for i := 0; i < 100; i++ { |
| if i%2 == 0 { |
| tc.put(t, ks+"a", fmt.Sprintf("val-%d", i)) |
| } else { |
| tc.put(t, ks+"b", fmt.Sprintf("val-%d", i)) |
| } |
| } |
| |
| ctx, ctxC := context.WithCancel(context.Background()) |
| defer ctxC() |
| |
| res := make(map[string]string) |
| stringAtGet(ctx, t, w, res) |
| stringAtGet(ctx, t, w, res) |
| |
| for _, te := range []struct { |
| k, w string |
| }{ |
| {ks + "a", "val-98"}, |
| {ks + "b", "val-99"}, |
| } { |
| if want, got := te.w, res[te.k]; want != got { |
| t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got) |
| } |
| } |
| } |
| |
| // TestBacklogOnly exercises the BacklogOnly option for non-ranged watchers, |
| // which effectively makes any Get operation non-blocking (but also showcases |
| // that unless a Get without BacklogOnly is issues, no new data will appear by |
| // itself in the watcher - which is an undocumented implementation detail of the |
| // option). |
| func TestBacklogOnly(t *testing.T) { |
| tc := newTestClient(t) |
| defer tc.close() |
| ctx, ctxC := context.WithCancel(context.Background()) |
| defer ctxC() |
| |
| k := "test-backlog-only" |
| tc.put(t, k, "initial") |
| |
| value := NewValue(tc.namespaced, k, NoDecoder) |
| watcher := value.Watch() |
| defer watcher.Close() |
| |
| d, err := watcher.Get(ctx, BacklogOnly) |
| if err != nil { |
| t.Fatalf("First Get failed: %v", err) |
| } |
| if want, got := "initial", string(d.([]byte)); want != got { |
| t.Fatalf("First Get: wanted value %q, got %q", want, got) |
| } |
| |
| // As expected, next call to Get with BacklogOnly fails - there truly is no new |
| // updates to emit. |
| _, err = watcher.Get(ctx, BacklogOnly) |
| if want, got := BacklogDone, err; want != got { |
| t.Fatalf("Second Get: wanted %v, got %v", want, got) |
| } |
| |
| // Implementation detail: even though there is a new value ('second'), |
| // BacklogOnly will still return BacklogDone. |
| tc.put(t, k, "second") |
| _, err = watcher.Get(ctx, BacklogOnly) |
| if want, got := BacklogDone, err; want != got { |
| t.Fatalf("Third Get: wanted %v, got %v", want, got) |
| } |
| |
| // ... However, a Get without BacklogOnly will return the new value. |
| d, err = watcher.Get(ctx) |
| if err != nil { |
| t.Fatalf("Fourth Get failed: %v", err) |
| } |
| if want, got := "second", string(d.([]byte)); want != got { |
| t.Fatalf("Fourth Get: wanted value %q, got %q", want, got) |
| } |
| } |
| |
| // TestBacklogOnlyRange exercises the BacklogOnly option for ranged watchers, |
| // showcasing how it expected to be used for keeping up with the external state |
| // of a range by synchronizing to a local map. |
| func TestBacklogOnlyRange(t *testing.T) { |
| tc := newTestClient(t) |
| defer tc.close() |
| ctx, ctxC := context.WithCancel(context.Background()) |
| defer ctxC() |
| |
| ks := "test-backlog-only-range/" |
| ke := "test-backlog-only-range0" |
| |
| for i := 0; i < 100; i++ { |
| if i%2 == 0 { |
| tc.put(t, ks+"a", fmt.Sprintf("val-%d", i)) |
| } else { |
| tc.put(t, ks+"b", fmt.Sprintf("val-%d", i)) |
| } |
| } |
| |
| value := NewValue(tc.namespaced, ks, stringAtDecoder, Range(ke)) |
| w := value.Watch() |
| defer w.Close() |
| |
| // Collect results into a map from key to value. |
| res := make(map[string]string) |
| |
| // Run first Get - this is the barrier defining what's part of the backlog. |
| g, err := w.Get(ctx, BacklogOnly) |
| if err != nil { |
| t.Fatalf("Get: %v", err) |
| } |
| kv := g.(stringAt) |
| res[kv.key] = kv.value |
| |
| // These won't be part of the backlog. |
| tc.put(t, ks+"a", fmt.Sprintf("val-100")) |
| tc.put(t, ks+"b", fmt.Sprintf("val-101")) |
| |
| // Retrieve the rest of the backlog until BacklogDone is returned. |
| nUpdates := 1 |
| for { |
| g, err := w.Get(ctx, BacklogOnly) |
| if err == BacklogDone { |
| break |
| } |
| if err != nil { |
| t.Fatalf("Get: %v", err) |
| } |
| nUpdates += 1 |
| kv := g.(stringAt) |
| res[kv.key] = kv.value |
| } |
| |
| // The backlog should've been compacted to just two entries at their newest |
| // state. |
| if want, got := 2, nUpdates; want != got { |
| t.Fatalf("wanted backlog in %d updates, got it in %d", want, got) |
| } |
| |
| for _, te := range []struct { |
| k, w string |
| }{ |
| {ks + "a", "val-98"}, |
| {ks + "b", "val-99"}, |
| } { |
| if want, got := te.w, res[te.k]; want != got { |
| t.Errorf("res[%q]: wanted %q, got %q", te.k, want, got) |
| } |
| } |
| } |