m/pkg/event: implement
This specifies event.{Value,Watcher}, an interface for data that might
be updated by its producer, and which is watched for such updates by
multiple consumers.
It also implements MemoryValue, a Value that is stored in memory.
Test Plan: adds unit tests.
X-Origin-Diff: phab/D706
GitOrigin-RevId: 271fd4e88969817b66318d3e03d50b70cf2819b8
diff --git a/metropolis/pkg/event/memory_value_test.go b/metropolis/pkg/event/memory_value_test.go
new file mode 100644
index 0000000..4b0487c
--- /dev/null
+++ b/metropolis/pkg/event/memory_value_test.go
@@ -0,0 +1,332 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package event
+
+import (
+ "context"
+ "fmt"
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+)
+
+// TestAsync exercises the high-level behaviour of a MemoryValue, in which a
+// watcher is able to catch up to the newest Set value.
+func TestAsync(t *testing.T) {
+ p := MemoryValue{}
+ p.Set(0)
+
+ ctx := context.Background()
+
+ // The 0 from Set() should be available via .Get().
+ watcher := p.Watch()
+ val, err := watcher.Get(ctx)
+ if err != nil {
+ t.Fatalf("Get: %v", err)
+ }
+ if want, got := 0, val.(int); want != got {
+ t.Fatalf("Value: got %d, wanted %d", got, want)
+ }
+
+ // Send a large amount of updates that the watcher does not actively .Get().
+ for i := 1; i <= 100; i++ {
+ p.Set(i)
+ }
+
+ // The watcher should still end up with the newest .Set() value on the next
+ // .Get() call.
+ val, err = watcher.Get(ctx)
+ if err != nil {
+ t.Fatalf("Get: %v", err)
+ }
+ if want, got := 100, val.(int); want != got {
+ t.Fatalf("Value: got %d, wanted %d", got, want)
+ }
+}
+
+// TestSyncBlocks exercises the MemoryValue's 'Sync' field, which makes all
+// Set() calls block until all respective watchers .Get() the updated data.
+// This particular test ensures that .Set() calls to a Watcher result in a
+// prefect log of updates being transmitted to a watcher.
+func TestSync(t *testing.T) {
+ p := MemoryValue{
+ Sync: true,
+ }
+ values := make(chan int, 100)
+ wg := sync.WaitGroup{}
+ wg.Add(1)
+ go func() {
+ ctx := context.Background()
+ watcher := p.Watch()
+ wg.Done()
+ for {
+ value, err := watcher.Get(ctx)
+ if err != nil {
+ panic(err)
+ }
+ values <- value.(int)
+ }
+ }()
+
+ p.Set(0)
+ wg.Wait()
+
+ want := []int{1, 2, 3, 4}
+ for _, w := range want {
+ p.Set(w)
+ }
+
+ timeout := time.After(time.Second)
+ for i, w := range append([]int{0}, want...) {
+ select {
+ case <-timeout:
+ t.Fatalf("timed out on value %d (%d)", i, w)
+ case val := <-values:
+ if w != val {
+ t.Errorf("value %d was %d, wanted %d", i, val, w)
+ }
+ }
+ }
+}
+
+// TestSyncBlocks exercises the MemoryValue's 'Sync' field, which makes all
+// Set() calls block until all respective watchers .Get() the updated data.
+// This particular test ensures that .Set() calls actually block when a watcher
+// is unattended.
+func TestSyncBlocks(t *testing.T) {
+ p := MemoryValue{
+ Sync: true,
+ }
+ ctx := context.Background()
+
+ // Shouldn't block, as there's no declared watchers.
+ p.Set(0)
+
+ watcher := p.Watch()
+
+ // Should retrieve the zero, more requests will pend.
+ value, err := watcher.Get(ctx)
+ if err != nil {
+ t.Fatalf("Get: %v", err)
+ }
+ if want, got := 0, value.(int); want != got {
+ t.Fatalf("Got initial value %d, wanted %d", got, want)
+ }
+
+ // .Set() Should block, as watcher is unattended.
+ //
+ // Whether something blocks in Go is untestable in a robust way (see: halting
+ // problem). We work around this this by introducing a 'stage' int64, which is
+ // put on the 'c' channel after the needs-to-block function returns. We then
+ // perform an action that should unblock this function right after updating
+ // 'stage' to a different value.
+ // Then, we observe what was put on the channel: If it's the initial value, it
+ // means the function didn't block when expected. Otherwise, it means the
+ // function unblocked when expected.
+ stage := int64(0)
+ c := make(chan int64, 1)
+ go func() {
+ p.Set(1)
+ c <- atomic.LoadInt64(&stage)
+ }()
+
+ // Getting should unblock the provider. Mark via 'stage' variable that
+ // unblocking now is expected.
+ atomic.StoreInt64(&stage, int64(1))
+ // Potential race: .Set() unblocks here due to some bug, before .Get() is
+ // called, and we record a false positive.
+ value, err = watcher.Get(ctx)
+ if err != nil {
+ t.Fatalf("Get: %v", err)
+ }
+
+ res := <-c
+ if res != int64(1) {
+ t.Fatalf("Set() returned before Get()")
+ }
+
+ if want, got := 1, value.(int); want != got {
+ t.Fatalf("Wanted value %d, got %d", want, got)
+ }
+
+ // Closing the watcher and setting should not block anymore.
+ if err := watcher.Close(); err != nil {
+ t.Fatalf("Close: %v", err)
+ }
+ // Last step, if this blocks we will get a deadlock error and the test will panic.
+ p.Set(2)
+}
+
+// TestMultipleGets verifies that calling .Get() on a single watcher from two
+// goroutines is prevented by returning an error in exactly one of them.
+func TestMultipleGets(t *testing.T) {
+ p := MemoryValue{}
+ ctx := context.Background()
+
+ w := p.Watch()
+
+ tryError := func(errs chan error) {
+ _, err := w.Get(ctx)
+ errs <- err
+ }
+ errs := make(chan error, 2)
+ go tryError(errs)
+ go tryError(errs)
+
+ for err := range errs {
+ if err == nil {
+ t.Fatalf("A Get call succeeded, while it should have blocked or returned an error")
+ } else {
+ // Found the error, test succeeded.
+ break
+ }
+ }
+}
+
+// TestConcurrency attempts to stress the MemoryValue/MemoryWatcher
+// implementation to design limits (a hundred simultaneous watchers), ensuring
+// that the watchers all settle to the final set value.
+func TestConcurrency(t *testing.T) {
+ ctx := context.Background()
+
+ p := MemoryValue{}
+ p.Set(0)
+
+ // Number of watchers to create.
+ watcherN := 100
+ // Expected final value to be Set().
+ final := 100
+ // Result channel per watcher.
+ resC := make([]chan error, watcherN)
+
+ // Spawn watcherN watchers.
+ for i := 0; i < watcherN; i++ {
+ resC[i] = make(chan error, 1)
+ go func(id int) {
+ // done is a helper function that will put an error on the
+ // respective watcher's resC.
+ done := func(err error) {
+ resC[id] <- err
+ close(resC[id])
+ }
+
+ watcher := p.Watch()
+ // prev is used to ensure the values received are monotonic.
+ prev := -1
+ for {
+ val, err := watcher.Get(ctx)
+ if err != nil {
+ done(err)
+ return
+ }
+
+ // Ensure monotonicity of received data.
+ if val.(int) <= prev {
+ done(fmt.Errorf("received out of order data: %d after %d", val, prev))
+ }
+ prev = val.(int)
+
+ // Quit when the final value is received.
+ if val == final {
+ done(nil)
+ return
+ }
+
+ // Sleep a bit, depending on the watcher. This makes each
+ // watcher behave slightly differently, and attempts to
+ // exercise races dependent on sleep time between subsequent
+ // Get calls.
+ time.Sleep(time.Millisecond * time.Duration(id))
+ }
+ }(i)
+ }
+
+ // Set 1..final on the value.
+ for i := 1; i <= final; i++ {
+ p.Set(i)
+ }
+
+ // Ensure all watchers exit with no error.
+ for i, c := range resC {
+ err := <-c
+ if err != nil {
+ t.Errorf("Watcher %d returned %v", i, err)
+ }
+ }
+}
+
+// TestCanceling exercises whether a context canceling in a .Get() gracefully
+// aborts that particular Get call, but also allows subsequent use of the same
+// watcher.
+func TestCanceling(t *testing.T) {
+ p := MemoryValue{
+ Sync: true,
+ }
+
+ ctx, ctxC := context.WithCancel(context.Background())
+
+ watcher := p.Watch()
+
+ // errs will contain the error returned by Get.
+ errs := make(chan error, 1)
+ go func() {
+ // This Get will block, as no initial data has been Set on the value.
+ _, err := watcher.Get(ctx)
+ errs <- err
+ }()
+
+ // Cancel the context, and expect that context error to propagate to the .Get().
+ ctxC()
+ if want, got := ctx.Err(), <-errs; want != got {
+ t.Fatalf("Get should've returned %v, got %v", want, got)
+ }
+
+ // Do another .Get() on the same watcher with a new context. Even though the
+ // call was aborted via a context cancel, the watcher should continue working.
+ ctx = context.Background()
+ go func() {
+ _, err := watcher.Get(ctx)
+ errs <- err
+ }()
+
+ // Unblock the .Get now.
+ p.Set(1)
+ if want, got := error(nil), <-errs; want != got {
+ t.Fatalf("Get should've returned %v, got %v", want, got)
+ }
+}
+
+// TestSetAfterWatch ensures that if a value is updated between a Watch and the
+// initial Get, only the newest Set value is returns.
+func TestSetAfterWatch(t *testing.T) {
+ ctx := context.Background()
+
+ p := MemoryValue{}
+ p.Set(0)
+
+ watcher := p.Watch()
+ p.Set(1)
+
+ data, err := watcher.Get(ctx)
+ if err != nil {
+ t.Fatalf("Get: %v", err)
+ }
+ if want, got := 1, data.(int); want != got {
+ t.Errorf("Get should've returned %v, got %v", want, got)
+ }
+}