m/pkg/event: move MemoryValue to subpackage
This keeps metropolis/pkg/event as a pure interface package, and
moves the memory-backed implementation to a subpackage.
Test Plan: Refactor, coevered by tests.
X-Origin-Diff: phab/D764
GitOrigin-RevId: 1337bf55a7752293791b3efe8648bbf5f6e6e9e1
diff --git a/metropolis/pkg/event/BUILD.bazel b/metropolis/pkg/event/BUILD.bazel
index 4e3b33c..0217c8a 100644
--- a/metropolis/pkg/event/BUILD.bazel
+++ b/metropolis/pkg/event/BUILD.bazel
@@ -1,20 +1,8 @@
-load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
- srcs = [
- "event.go",
- "memory_value.go",
- ],
+ srcs = ["event.go"],
importpath = "source.monogon.dev/metropolis/pkg/event",
visibility = ["//visibility:public"],
)
-
-go_test(
- name = "go_default_test",
- srcs = [
- "example_test.go",
- "memory_value_test.go",
- ],
- embed = [":go_default_library"],
-)
diff --git a/metropolis/pkg/event/memory/BUILD.bazel b/metropolis/pkg/event/memory/BUILD.bazel
new file mode 100644
index 0000000..4ba79d2
--- /dev/null
+++ b/metropolis/pkg/event/memory/BUILD.bazel
@@ -0,0 +1,19 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(
+ name = "go_default_library",
+ srcs = ["memory.go"],
+ importpath = "source.monogon.dev/metropolis/pkg/event/memory",
+ visibility = ["//visibility:public"],
+ deps = ["//metropolis/pkg/event:go_default_library"],
+)
+
+go_test(
+ name = "go_default_test",
+ srcs = [
+ "example_test.go",
+ "memory_test.go",
+ ],
+ embed = [":go_default_library"],
+ deps = ["//metropolis/pkg/event:go_default_library"],
+)
diff --git a/metropolis/pkg/event/example_test.go b/metropolis/pkg/event/memory/example_test.go
similarity index 91%
rename from metropolis/pkg/event/example_test.go
rename to metropolis/pkg/event/memory/example_test.go
index 3cdc661..a119666 100644
--- a/metropolis/pkg/event/example_test.go
+++ b/metropolis/pkg/event/memory/example_test.go
@@ -14,24 +14,26 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package event
+package memory
import (
"context"
"fmt"
"net"
"time"
+
+ "source.monogon.dev/metropolis/pkg/event"
)
// NetworkStatus is example data that will be stored in a Value.
type NetworkStatus struct {
ExternalAddress net.IP
- DefaultGateway net.IP
+ DefaultGateway net.IP
}
// NetworkStatusWatcher is a typesafe wrapper around a Watcher.
type NetworkStatusWatcher struct {
- watcher Watcher
+ watcher event.Watcher
}
// Get wraps Watcher.Get and performs type assertion.
@@ -48,7 +50,7 @@
// communicating the newest information about a machine's network configuration
// to consumers/watchers.
type NetworkService struct {
- Provider MemoryValue
+ Provider Value
}
// Watch is a thin wrapper around Value.Watch that returns the typesafe
@@ -66,23 +68,23 @@
func (s *NetworkService) Run(ctx context.Context) {
s.Provider.Set(NetworkStatus{
ExternalAddress: nil,
- DefaultGateway: nil,
+ DefaultGateway: nil,
})
select {
- case <-time.After(100*time.Millisecond):
+ case <-time.After(100 * time.Millisecond):
case <-ctx.Done():
- return
+ return
}
fmt.Printf("NS: Got DHCP Lease\n")
s.Provider.Set(NetworkStatus{
ExternalAddress: net.ParseIP("203.0.113.24"),
- DefaultGateway: net.ParseIP("203.0.113.1"),
+ DefaultGateway: net.ParseIP("203.0.113.1"),
})
select {
- case <-time.After(100*time.Millisecond):
+ case <-time.After(100 * time.Millisecond):
case <-ctx.Done():
return
}
@@ -90,7 +92,7 @@
fmt.Printf("NS: DHCP Address changed\n")
s.Provider.Set(NetworkStatus{
ExternalAddress: net.ParseIP("203.0.113.103"),
- DefaultGateway: net.ParseIP("203.0.113.1"),
+ DefaultGateway: net.ParseIP("203.0.113.1"),
})
time.Sleep(100 * time.Millisecond)
@@ -136,4 +138,3 @@
// NS: DHCP Address changed
// /etc/hosts: foo.example.com is now 203.0.113.103
}
-
diff --git a/metropolis/pkg/event/memory_value.go b/metropolis/pkg/event/memory/memory.go
similarity index 78%
rename from metropolis/pkg/event/memory_value.go
rename to metropolis/pkg/event/memory/memory.go
index adf412d..1e94a93 100644
--- a/metropolis/pkg/event/memory_value.go
+++ b/metropolis/pkg/event/memory/memory.go
@@ -14,46 +14,49 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package event
+package memory
import (
"context"
"fmt"
"sync"
+
+ "source.monogon.dev/metropolis/pkg/event"
)
var (
- // Type assert that *MemoryValue implements Value. We do this artificially, as
+ // Type assert that *Value implements Value. We do this artificially, as
// there currently is no code path that needs this to be strictly true. However,
// users of this library might want to rely on the Value type instead of
// particular Value implementations.
- _ Value = &MemoryValue{}
+ _ event.Value = &Value{}
)
-// MemoryValue implements a Value stored in memory. It is safe to construct an
-// empty object of this type. However, this must not be copied.
-type MemoryValue struct {
+// Value is a 'memory value', which implements a event.Value stored in memory.
+// It is safe to construct an empty object of this type. However, this must not
+// be copied.
+type Value struct {
// mu guards the inner, innerSet and watchers fields.
mu sync.RWMutex
- // inner is the latest data Set on the MemoryValue. It is used to provide
- // the newest version of the Set data to new Watchers.
+ // inner is the latest data Set on the Value. It is used to provide the
+ // newest version of the Set data to new watchers.
inner interface{}
// innerSet is true when inner has been Set at least once. It is used to
// differentiate between a nil and unset value.
innerSet bool
// watchers is the list of watchers that should be updated when new data is
- // Set. It will grow on every .Watch() and shrink any time a Watcher is
+ // Set. It will grow on every .Watch() and shrink any time a watcher is
// determined to have been closed.
- watchers []*MemoryWatcher
+ watchers []*watcher
- // Sync, if set to true, blocks all .Set() calls on the MemoryValue until
- // all Watchers derived from it actively .Get() the new value. This can be
- // used to ensure Watchers always receive a full log of all Set() calls.
+ // Sync, if set to true, blocks all .Set() calls on the Value until all
+ // Watchers derived from it actively .Get() the new value. This can be used
+ // to ensure Watchers always receive a full log of all Set() calls.
//
// This must not be changed after the first .Set/.Watch call.
//
// This is an experimental API and subject to change. It might be migrated
- // to per-Watcher settings defined within the main Value/Watcher
+ // to per-Watcher settings defined within the main event.Value/Watcher
// interfaces.
Sync bool
}
@@ -61,18 +64,18 @@
// Set updates the Value to the given data. It is safe to call this from
// multiple goroutines, including concurrently.
//
-// For more information about guarantees, see Value.Set.
-func (m *MemoryValue) Set(val interface{}) {
+// For more information about guarantees, see event.Value.Set.
+func (m *Value) Set(val interface{}) {
m.mu.Lock()
defer m.mu.Unlock()
- // Update the data that is provided on first Get() to Watchers.
+ // Update the data that is provided on first Get() to watchers.
m.inner = val
m.innerSet = true
// Go through all watchers, updating them on the new value and filtering out
// all closed watchers.
- newWatchers := make([]*MemoryWatcher, 0, len(m.watchers))
+ newWatchers := make([]*watcher, 0, len(m.watchers))
for _, w := range m.watchers {
w := w
if w.closed() {
@@ -84,15 +87,15 @@
m.watchers = newWatchers
}
-// MemoryWatcher implements the Watcher interface for watchers returned by
-// MemoryValue.
-type MemoryWatcher struct {
+// watcher implements the event.Watcher interface for watchers returned by
+// Value.
+type watcher struct {
// activeReqC is a channel used to request an active submission channel
// from a pending Get function, if any.
activeReqC chan chan interface{}
// deadletterSubmitC is a channel used to communicate a value that
// attempted to be submitted via activeReqC. This will be received by the
- // deadletter worker of this Watcher and passed on to the next .Get call
+ // deadletter worker of this watcher and passed on to the next .Get call
// that occurs.
deadletterSubmitC chan interface{}
@@ -101,29 +104,29 @@
// active. It is implemented as a channel to permit concurrent .Get() calls
// to error out instead of blocking.
getSem chan struct{}
- // close is a channel that is closed when this Watcher is itself Closed.
+ // close is a channel that is closed when this watcher is itself Closed.
close chan struct{}
}
// Watch retrieves a Watcher that keeps track on the version of the data
// contained within the Value that was last seen by a consumer.
//
-// For more information about guarantees, see Value.Watch.
-func (m *MemoryValue) Watch() Watcher {
- waiter := &MemoryWatcher{
+// For more information about guarantees, see event.Value.Watch.
+func (m *Value) Watch() event.Watcher {
+ waiter := &watcher{
activeReqC: make(chan chan interface{}),
deadletterSubmitC: make(chan interface{}),
close: make(chan struct{}),
getSem: make(chan struct{}, 1),
}
// Start the deadletter worker as a goroutine. It will be stopped when the
- // Watcher is Closed() (as signaled by the close channel).
+ // watcher is Closed() (as signaled by the close channel).
go waiter.deadletterWorker()
- // Append this watcher to the MemoryValue.
+ // Append this watcher to the Value.
m.mu.Lock()
m.watchers = append(m.watchers, waiter)
- // If the MemoryValue already has some value set, communicate that to the
+ // If the Value already has some value set, communicate that to the
// first Get call by going through the deadletter worker.
if m.innerSet {
waiter.deadletterSubmitC <- m.inner
@@ -140,7 +143,7 @@
// It watches the deadletterSubmitC channel for updated data, and overrides
// previously received data. Then, when a .Get() begins to pend (and respond to
// activeReqC receives), the deadletter worker will deliver that value.
-func (m *MemoryWatcher) deadletterWorker() {
+func (m *watcher) deadletterWorker() {
// Current value, and flag to mark it as set (vs. nil).
var cur interface{}
var set bool
@@ -183,7 +186,7 @@
}
// closed returns whether this watcher has been closed.
-func (m *MemoryWatcher) closed() bool {
+func (m *watcher) closed() bool {
select {
case _, ok := <-m.close:
if !ok {
@@ -194,8 +197,8 @@
return false
}
-// update is the high level update-this-watcher function called by MemoryValue.
-func (m *MemoryWatcher) update(sync bool, val interface{}) {
+// update is the high level update-this-watcher function called by Value.
+func (m *watcher) update(sync bool, val interface{}) {
// If synchronous delivery was requested, block until a watcher .Gets it.
if sync {
c := <-m.activeReqC
@@ -221,15 +224,15 @@
}
}
-func (m *MemoryWatcher) Close() error {
+func (m *watcher) Close() error {
close(m.deadletterSubmitC)
close(m.close)
return nil
}
-// Get blocks until a Value's data is available. See Watcher.Get for guarantees
-// and more information.
-func (m *MemoryWatcher) Get(ctx context.Context) (interface{}, error) {
+// Get blocks until a Value's data is available. See event.Watcher.Get for
+// guarantees and more information.
+func (m *watcher) Get(ctx context.Context) (interface{}, error) {
// Make sure we're the only active .Get call.
select {
case m.getSem <- struct{}{}:
@@ -277,4 +280,3 @@
}
}
}
-
diff --git a/metropolis/pkg/event/memory_value_test.go b/metropolis/pkg/event/memory/memory_test.go
similarity index 94%
rename from metropolis/pkg/event/memory_value_test.go
rename to metropolis/pkg/event/memory/memory_test.go
index 4b0487c..f4feb33 100644
--- a/metropolis/pkg/event/memory_value_test.go
+++ b/metropolis/pkg/event/memory/memory_test.go
@@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package event
+package memory
import (
"context"
@@ -25,10 +25,10 @@
"time"
)
-// TestAsync exercises the high-level behaviour of a MemoryValue, in which a
+// TestAsync exercises the high-level behaviour of a Value, in which a
// watcher is able to catch up to the newest Set value.
func TestAsync(t *testing.T) {
- p := MemoryValue{}
+ p := Value{}
p.Set(0)
ctx := context.Background()
@@ -59,12 +59,12 @@
}
}
-// TestSyncBlocks exercises the MemoryValue's 'Sync' field, which makes all
+// TestSyncBlocks exercises the Value's 'Sync' field, which makes all
// Set() calls block until all respective watchers .Get() the updated data.
// This particular test ensures that .Set() calls to a Watcher result in a
// prefect log of updates being transmitted to a watcher.
func TestSync(t *testing.T) {
- p := MemoryValue{
+ p := Value{
Sync: true,
}
values := make(chan int, 100)
@@ -104,12 +104,12 @@
}
}
-// TestSyncBlocks exercises the MemoryValue's 'Sync' field, which makes all
+// TestSyncBlocks exercises the Value's 'Sync' field, which makes all
// Set() calls block until all respective watchers .Get() the updated data.
// This particular test ensures that .Set() calls actually block when a watcher
// is unattended.
func TestSyncBlocks(t *testing.T) {
- p := MemoryValue{
+ p := Value{
Sync: true,
}
ctx := context.Background()
@@ -175,7 +175,7 @@
// TestMultipleGets verifies that calling .Get() on a single watcher from two
// goroutines is prevented by returning an error in exactly one of them.
func TestMultipleGets(t *testing.T) {
- p := MemoryValue{}
+ p := Value{}
ctx := context.Background()
w := p.Watch()
@@ -198,13 +198,13 @@
}
}
-// TestConcurrency attempts to stress the MemoryValue/MemoryWatcher
+// TestConcurrency attempts to stress the Value/Watcher
// implementation to design limits (a hundred simultaneous watchers), ensuring
// that the watchers all settle to the final set value.
func TestConcurrency(t *testing.T) {
ctx := context.Background()
- p := MemoryValue{}
+ p := Value{}
p.Set(0)
// Number of watchers to create.
@@ -274,7 +274,7 @@
// aborts that particular Get call, but also allows subsequent use of the same
// watcher.
func TestCanceling(t *testing.T) {
- p := MemoryValue{
+ p := Value{
Sync: true,
}
@@ -316,7 +316,7 @@
func TestSetAfterWatch(t *testing.T) {
ctx := context.Background()
- p := MemoryValue{}
+ p := Value{}
p.Set(0)
watcher := p.Watch()