m/pkg/event: move MemoryValue to subpackage

This keeps metropolis/pkg/event as a pure interface package, and
moves the memory-backed implementation to a subpackage.

Test Plan: Refactor, covered by tests.

X-Origin-Diff: phab/D764
GitOrigin-RevId: 1337bf55a7752293791b3efe8648bbf5f6e6e9e1
diff --git a/metropolis/pkg/event/BUILD.bazel b/metropolis/pkg/event/BUILD.bazel
index 4e3b33c..0217c8a 100644
--- a/metropolis/pkg/event/BUILD.bazel
+++ b/metropolis/pkg/event/BUILD.bazel
@@ -1,20 +1,8 @@
-load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
 
 go_library(
     name = "go_default_library",
-    srcs = [
-        "event.go",
-        "memory_value.go",
-    ],
+    srcs = ["event.go"],
     importpath = "source.monogon.dev/metropolis/pkg/event",
     visibility = ["//visibility:public"],
 )
-
-go_test(
-    name = "go_default_test",
-    srcs = [
-        "example_test.go",
-        "memory_value_test.go",
-    ],
-    embed = [":go_default_library"],
-)
diff --git a/metropolis/pkg/event/memory/BUILD.bazel b/metropolis/pkg/event/memory/BUILD.bazel
new file mode 100644
index 0000000..4ba79d2
--- /dev/null
+++ b/metropolis/pkg/event/memory/BUILD.bazel
@@ -0,0 +1,19 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(
+    name = "go_default_library",
+    srcs = ["memory.go"],
+    importpath = "source.monogon.dev/metropolis/pkg/event/memory",
+    visibility = ["//visibility:public"],
+    deps = ["//metropolis/pkg/event:go_default_library"],
+)
+
+go_test(
+    name = "go_default_test",
+    srcs = [
+        "example_test.go",
+        "memory_test.go",
+    ],
+    embed = [":go_default_library"],
+    deps = ["//metropolis/pkg/event:go_default_library"],
+)
diff --git a/metropolis/pkg/event/example_test.go b/metropolis/pkg/event/memory/example_test.go
similarity index 91%
rename from metropolis/pkg/event/example_test.go
rename to metropolis/pkg/event/memory/example_test.go
index 3cdc661..a119666 100644
--- a/metropolis/pkg/event/example_test.go
+++ b/metropolis/pkg/event/memory/example_test.go
@@ -14,24 +14,26 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package event
+package memory
 
 import (
 	"context"
 	"fmt"
 	"net"
 	"time"
+
+	"source.monogon.dev/metropolis/pkg/event"
 )
 
 // NetworkStatus is example data that will be stored in a Value.
 type NetworkStatus struct {
 	ExternalAddress net.IP
-	DefaultGateway net.IP
+	DefaultGateway  net.IP
 }
 
 // NetworkStatusWatcher is a typesafe wrapper around a Watcher.
 type NetworkStatusWatcher struct {
-	watcher Watcher
+	watcher event.Watcher
 }
 
 // Get wraps Watcher.Get and performs type assertion.
@@ -48,7 +50,7 @@
 // communicating the newest information about a machine's network configuration
 // to consumers/watchers.
 type NetworkService struct {
-	Provider MemoryValue
+	Provider Value
 }
 
 // Watch is a thin wrapper around Value.Watch that returns the typesafe
@@ -66,23 +68,23 @@
 func (s *NetworkService) Run(ctx context.Context) {
 	s.Provider.Set(NetworkStatus{
 		ExternalAddress: nil,
-		DefaultGateway: nil,
+		DefaultGateway:  nil,
 	})
 
 	select {
-	case <-time.After(100*time.Millisecond):
+	case <-time.After(100 * time.Millisecond):
 	case <-ctx.Done():
-			return
+		return
 	}
 
 	fmt.Printf("NS: Got DHCP Lease\n")
 	s.Provider.Set(NetworkStatus{
 		ExternalAddress: net.ParseIP("203.0.113.24"),
-		DefaultGateway: net.ParseIP("203.0.113.1"),
+		DefaultGateway:  net.ParseIP("203.0.113.1"),
 	})
 
 	select {
-	case <-time.After(100*time.Millisecond):
+	case <-time.After(100 * time.Millisecond):
 	case <-ctx.Done():
 		return
 	}
@@ -90,7 +92,7 @@
 	fmt.Printf("NS: DHCP Address changed\n")
 	s.Provider.Set(NetworkStatus{
 		ExternalAddress: net.ParseIP("203.0.113.103"),
-		DefaultGateway: net.ParseIP("203.0.113.1"),
+		DefaultGateway:  net.ParseIP("203.0.113.1"),
 	})
 
 	time.Sleep(100 * time.Millisecond)
@@ -136,4 +138,3 @@
 	// NS: DHCP Address changed
 	// /etc/hosts: foo.example.com is now 203.0.113.103
 }
-
diff --git a/metropolis/pkg/event/memory_value.go b/metropolis/pkg/event/memory/memory.go
similarity index 78%
rename from metropolis/pkg/event/memory_value.go
rename to metropolis/pkg/event/memory/memory.go
index adf412d..1e94a93 100644
--- a/metropolis/pkg/event/memory_value.go
+++ b/metropolis/pkg/event/memory/memory.go
@@ -14,46 +14,49 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package event
+package memory
 
 import (
 	"context"
 	"fmt"
 	"sync"
+
+	"source.monogon.dev/metropolis/pkg/event"
 )
 
 var (
-	// Type assert that *MemoryValue implements Value. We do this artificially, as
+	// Type assert that *Value implements event.Value. We do this artificially, as
 	// there currently is no code path that needs this to be strictly true. However,
 	// users of this library might want to rely on the Value type instead of
 	// particular Value implementations.
-	_ Value = &MemoryValue{}
+	_ event.Value = &Value{}
 )
 
-// MemoryValue implements a Value stored in memory. It is safe to construct an
-// empty object of this type. However, this must not be copied.
-type MemoryValue struct {
+// Value is a 'memory value', which implements an event.Value stored in memory.
+// It is safe to construct an empty object of this type. However, this must not
+// be copied.
+type Value struct {
 	// mu guards the inner, innerSet and watchers fields.
 	mu sync.RWMutex
-	// inner is the latest data Set on the MemoryValue. It is used to provide
-	// the newest version of the Set data to new Watchers.
+	// inner is the latest data Set on the Value. It is used to provide the
+	// newest version of the Set data to new watchers.
 	inner interface{}
 	// innerSet is true when inner has been Set at least once. It is used to
 	// differentiate between a nil and unset value.
 	innerSet bool
 	// watchers is the list of watchers that should be updated when new data is
-	// Set. It will grow on every .Watch() and shrink any time a Watcher is
+	// Set. It will grow on every .Watch() and shrink any time a watcher is
 	// determined to have been closed.
-	watchers []*MemoryWatcher
+	watchers []*watcher
 
-	// Sync, if set to true, blocks all .Set() calls on the MemoryValue until
-	// all Watchers derived from it actively .Get() the new value. This can be
-	// used to ensure Watchers always receive a full log of all Set() calls.
+	// Sync, if set to true, blocks all .Set() calls on the Value until all
+	// Watchers derived from it actively .Get() the new value. This can be used
+	// to ensure Watchers always receive a full log of all Set() calls.
 	//
 	// This must not be changed after the first .Set/.Watch call.
 	//
 	// This is an experimental API and subject to change. It might be migrated
-	// to per-Watcher settings defined within the main Value/Watcher
+	// to per-Watcher settings defined within the main event.Value/Watcher
 	// interfaces.
 	Sync bool
 }
@@ -61,18 +64,18 @@
 // Set updates the Value to the given data. It is safe to call this from
 // multiple goroutines, including concurrently.
 //
-// For more information about guarantees, see Value.Set.
-func (m *MemoryValue) Set(val interface{}) {
+// For more information about guarantees, see event.Value.Set.
+func (m *Value) Set(val interface{}) {
 	m.mu.Lock()
 	defer m.mu.Unlock()
 
-	// Update the data that is provided on first Get() to Watchers.
+	// Update the data that is provided on first Get() to watchers.
 	m.inner = val
 	m.innerSet = true
 
 	// Go through all watchers, updating them on the new value and filtering out
 	// all closed watchers.
-	newWatchers := make([]*MemoryWatcher, 0, len(m.watchers))
+	newWatchers := make([]*watcher, 0, len(m.watchers))
 	for _, w := range m.watchers {
 		w := w
 		if w.closed() {
@@ -84,15 +87,15 @@
 	m.watchers = newWatchers
 }
 
-// MemoryWatcher implements the Watcher interface for watchers returned by
-// MemoryValue.
-type MemoryWatcher struct {
+// watcher implements the event.Watcher interface for watchers returned by
+// Value.
+type watcher struct {
 	// activeReqC is a channel used to request an active submission channel
 	// from a pending Get function, if any.
 	activeReqC chan chan interface{}
 	// deadletterSubmitC is a channel used to communicate a value that
 	// attempted to be submitted via activeReqC. This will be received by the
-	// deadletter worker of this Watcher and passed on to the next .Get call
+	// deadletter worker of this watcher and passed on to the next .Get call
 	// that occurs.
 	deadletterSubmitC chan interface{}
 
@@ -101,29 +104,29 @@
 	// active. It is implemented as a channel to permit concurrent .Get() calls
 	// to error out instead of blocking.
 	getSem chan struct{}
-	// close is a channel that is closed when this Watcher is itself Closed.
+	// close is a channel that is closed when this watcher is itself Closed.
 	close chan struct{}
 }
 
 // Watch retrieves a Watcher that keeps track on the version of the data
 // contained within the Value that was last seen by a consumer.
 //
-// For more information about guarantees, see Value.Watch.
-func (m *MemoryValue) Watch() Watcher {
-	waiter := &MemoryWatcher{
+// For more information about guarantees, see event.Value.Watch.
+func (m *Value) Watch() event.Watcher {
+	waiter := &watcher{
 		activeReqC:        make(chan chan interface{}),
 		deadletterSubmitC: make(chan interface{}),
 		close:             make(chan struct{}),
 		getSem:            make(chan struct{}, 1),
 	}
 	// Start the deadletter worker as a goroutine. It will be stopped when the
-	// Watcher is Closed() (as signaled by the close channel).
+	// watcher is Closed() (as signaled by the close channel).
 	go waiter.deadletterWorker()
 
-	// Append this watcher to the MemoryValue.
+	// Append this watcher to the Value.
 	m.mu.Lock()
 	m.watchers = append(m.watchers, waiter)
-	// If the MemoryValue already has some value set, communicate that to the
+	// If the Value already has some value set, communicate that to the
 	// first Get call by going through the deadletter worker.
 	if m.innerSet {
 		waiter.deadletterSubmitC <- m.inner
@@ -140,7 +143,7 @@
 // It watches the deadletterSubmitC channel for updated data, and overrides
 // previously received data. Then, when a .Get() begins to pend (and respond to
 // activeReqC receives), the deadletter worker will deliver that value.
-func (m *MemoryWatcher) deadletterWorker() {
+func (m *watcher) deadletterWorker() {
 	// Current value, and flag to mark it as set (vs. nil).
 	var cur interface{}
 	var set bool
@@ -183,7 +186,7 @@
 }
 
 // closed returns whether this watcher has been closed.
-func (m *MemoryWatcher) closed() bool {
+func (m *watcher) closed() bool {
 	select {
 	case _, ok := <-m.close:
 		if !ok {
@@ -194,8 +197,8 @@
 	return false
 }
 
-// update is the high level update-this-watcher function called by MemoryValue.
-func (m *MemoryWatcher) update(sync bool, val interface{}) {
+// update is the high level update-this-watcher function called by Value.
+func (m *watcher) update(sync bool, val interface{}) {
 	// If synchronous delivery was requested, block until a watcher .Gets it.
 	if sync {
 		c := <-m.activeReqC
@@ -221,15 +224,15 @@
 	}
 }
 
-func (m *MemoryWatcher) Close() error {
+func (m *watcher) Close() error {
 	close(m.deadletterSubmitC)
 	close(m.close)
 	return nil
 }
 
-// Get blocks until a Value's data is available. See Watcher.Get for guarantees
-// and more information.
-func (m *MemoryWatcher) Get(ctx context.Context) (interface{}, error) {
+// Get blocks until a Value's data is available. See event.Watcher.Get for
+// guarantees and more information.
+func (m *watcher) Get(ctx context.Context) (interface{}, error) {
 	// Make sure we're the only active .Get call.
 	select {
 	case m.getSem <- struct{}{}:
@@ -277,4 +280,3 @@
 		}
 	}
 }
-
diff --git a/metropolis/pkg/event/memory_value_test.go b/metropolis/pkg/event/memory/memory_test.go
similarity index 94%
rename from metropolis/pkg/event/memory_value_test.go
rename to metropolis/pkg/event/memory/memory_test.go
index 4b0487c..f4feb33 100644
--- a/metropolis/pkg/event/memory_value_test.go
+++ b/metropolis/pkg/event/memory/memory_test.go
@@ -14,7 +14,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package event
+package memory
 
 import (
 	"context"
@@ -25,10 +25,10 @@
 	"time"
 )
 
-// TestAsync exercises the high-level behaviour of a MemoryValue, in which a
+// TestAsync exercises the high-level behaviour of a Value, in which a
 // watcher is able to catch up to the newest Set value.
 func TestAsync(t *testing.T) {
-	p := MemoryValue{}
+	p := Value{}
 	p.Set(0)
 
 	ctx := context.Background()
@@ -59,12 +59,12 @@
 	}
 }
 
-// TestSyncBlocks exercises the MemoryValue's 'Sync' field, which makes all
+// TestSync exercises the Value's 'Sync' field, which makes all
 // Set() calls block until all respective watchers .Get() the updated data.
 // This particular test ensures that .Set() calls to a Watcher result in a
 // prefect log of updates being transmitted to a watcher.
 func TestSync(t *testing.T) {
-	p := MemoryValue{
+	p := Value{
 		Sync: true,
 	}
 	values := make(chan int, 100)
@@ -104,12 +104,12 @@
 	}
 }
 
-// TestSyncBlocks exercises the MemoryValue's 'Sync' field, which makes all
+// TestSyncBlocks exercises the Value's 'Sync' field, which makes all
 // Set() calls block until all respective watchers .Get() the updated data.
 // This particular test ensures that .Set() calls actually block when a watcher
 // is unattended.
 func TestSyncBlocks(t *testing.T) {
-	p := MemoryValue{
+	p := Value{
 		Sync: true,
 	}
 	ctx := context.Background()
@@ -175,7 +175,7 @@
 // TestMultipleGets verifies that calling .Get() on a single watcher from two
 // goroutines is prevented by returning an error in exactly one of them.
 func TestMultipleGets(t *testing.T) {
-	p := MemoryValue{}
+	p := Value{}
 	ctx := context.Background()
 
 	w := p.Watch()
@@ -198,13 +198,13 @@
 	}
 }
 
-// TestConcurrency attempts to stress the MemoryValue/MemoryWatcher
+// TestConcurrency attempts to stress the Value/Watcher
 // implementation to design limits (a hundred simultaneous watchers), ensuring
 // that the watchers all settle to the final set value.
 func TestConcurrency(t *testing.T) {
 	ctx := context.Background()
 
-	p := MemoryValue{}
+	p := Value{}
 	p.Set(0)
 
 	// Number of watchers to create.
@@ -274,7 +274,7 @@
 // aborts that particular Get call, but also allows subsequent use of the same
 // watcher.
 func TestCanceling(t *testing.T) {
-	p := MemoryValue{
+	p := Value{
 		Sync: true,
 	}
 
@@ -316,7 +316,7 @@
 func TestSetAfterWatch(t *testing.T) {
 	ctx := context.Background()
 
-	p := MemoryValue{}
+	p := Value{}
 	p.Set(0)
 
 	watcher := p.Watch()