m/pkg/event/memory: simplify and fix implementation
The old implementation is very complicated and has multiple concurrency
bugs. One of these bugs is already described in a block comment. Another
bug is that if a watcher is closed while being updated, `update` will
try to send on the closed channel `deadletterSubmitC`, which panics.
This change replaces it with a much simpler implementation.
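For reference, a send on a closed channel always panics in Go:

    ch := make(chan int, 1)
    close(ch)
    ch <- 1 // panic: send on closed channel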
It could be further simplified by dropping the Sync flag entirely,
which is currently unused. Or we could use the buffered channel for
Sync values as well, and just not drop previous values. This would
still ensure that all values are delivered, but changes the semantics
slightly: Set would no longer block until Get is called if there is
space in the channel buffer.
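A minimal sketch of that variant, reusing the new watcher's fields
(hypothetical, not part of this change):

    // Route Sync values through bufferedC as well, without dropping
    // the previous value; Set then blocks only while the buffer is
    // full, rather than until Get is called.
    func (m *watcher[T]) update(sync bool, val T) {
        if !sync {
            // Drop the previous, undelivered value, if any.
            select {
            case <-m.bufferedC:
            default:
            }
        }
        select {
        case m.bufferedC <- val:
        case <-m.close:
        }
    }

With a buffer of size 1 this still delivers every Sync value, since a
pending value blocks the next Set until Get drains it.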
Additionally, there is no longer a memory leak when a watcher is
repeatedly closed and re-added but Set is never called. I added a test
for this.
Fixes #127
Change-Id: I2775a36cf2d097c5961a09a387428774a068e1f5
Reviewed-on: https://review.monogon.dev/c/monogon/+/2875
Reviewed-by: Serge Bazanski <serge@monogon.tech>
Vouch-Run-CI: Serge Bazanski <serge@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/pkg/event/event.go b/metropolis/pkg/event/event.go
index 0898eb5..cb8b9b5 100644
--- a/metropolis/pkg/event/event.go
+++ b/metropolis/pkg/event/event.go
@@ -163,7 +163,7 @@
Get(context.Context, ...GetOption[T]) (T, error)
// Close must be called if the Watcher is not going to be used anymore -
- // otherwise, a goroutine will leak.
+ // otherwise, it will not be garbage collected.
Close() error
}
diff --git a/metropolis/pkg/event/memory/BUILD.bazel b/metropolis/pkg/event/memory/BUILD.bazel
index c8a8bae..da07dc3 100644
--- a/metropolis/pkg/event/memory/BUILD.bazel
+++ b/metropolis/pkg/event/memory/BUILD.bazel
@@ -15,6 +15,5 @@
"memory_test.go",
],
embed = [":memory"],
- # TODO: https://github.com/monogon-dev/monogon/issues/127
- flaky = True,
+ deps = ["//metropolis/pkg/event"],
)
diff --git a/metropolis/pkg/event/memory/memory.go b/metropolis/pkg/event/memory/memory.go
index f0a2ab9..c684c18 100644
--- a/metropolis/pkg/event/memory/memory.go
+++ b/metropolis/pkg/event/memory/memory.go
@@ -76,29 +76,29 @@
// Go through all watchers, updating them on the new value and filtering out
// all closed watchers.
- newWatchers := make([]*watcher[T], 0, len(m.watchers))
+ newWatchers := m.watchers[:0]
for _, w := range m.watchers {
- w := w
if w.closed() {
continue
}
w.update(m.Sync, val)
newWatchers = append(newWatchers, w)
}
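+ // The filtered slice reuses the old backing array. If most of its
+ // capacity is now unused, reallocate a smaller array so that memory
+ // from a past burst of watchers can be reclaimed.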
+ if cap(newWatchers) > len(newWatchers)*3 {
+ reallocated := make([]*watcher[T], 0, len(newWatchers)*2)
+ newWatchers = append(reallocated, newWatchers...)
+ }
m.watchers = newWatchers
}
// watcher implements the event.Watcher interface for watchers returned by
// Value.
type watcher[T any] struct {
- // activeReqC is a channel used to request an active submission channel
- // from a pending Get function, if any.
- activeReqC chan chan T
- // deadletterSubmitC is a channel used to communicate a value that
- // attempted to be submitted via activeReqC. This will be received by the
- // deadletter worker of this watcher and passed on to the next .Get call
- // that occurs.
- deadletterSubmitC chan T
+ // bufferedC is a buffered channel of size 1 for submitting values to the
+ // watcher.
+ bufferedC chan T
+ // unbufferedC is an unbuffered channel, which is used when Sync is enabled.
+ unbufferedC chan T
// getSem is a channel-based semaphore (which is of size 1, and thus in
// fact a mutex) that is used to ensure that only a single .Get() call is
@@ -115,77 +115,39 @@
// For more information about guarantees, see event.Value.Watch.
func (m *Value[T]) Watch() event.Watcher[T] {
waiter := &watcher[T]{
- activeReqC: make(chan chan T),
- deadletterSubmitC: make(chan T),
- close: make(chan struct{}),
- getSem: make(chan struct{}, 1),
+ bufferedC: make(chan T, 1),
+ unbufferedC: make(chan T),
+ close: make(chan struct{}),
+ getSem: make(chan struct{}, 1),
}
- // Start the deadletter worker as a goroutine. It will be stopped when the
- // watcher is Closed() (as signaled by the close channel).
- go waiter.deadletterWorker()
- // Append this watcher to the Value.
m.mu.Lock()
+ // If the watchers slice is at capacity, drop closed watchers, and
+ // reallocate at 2x the remaining length if the capacity is not
+ // between 1.5x and 3x of it.
+ if len(m.watchers) == cap(m.watchers) {
+ newWatchers := m.watchers[:0]
+ for _, w := range m.watchers {
+ if !w.closed() {
+ newWatchers = append(newWatchers, w)
+ }
+ }
+ if cap(newWatchers)*2 < len(newWatchers)*3 || cap(newWatchers) > len(newWatchers)*3 {
+ reallocated := make([]*watcher[T], 0, len(newWatchers)*2)
+ newWatchers = append(reallocated, newWatchers...)
+ }
+ m.watchers = newWatchers
+ }
+ // Append this watcher to the Value.
m.watchers = append(m.watchers, waiter)
- // If the Value already has some value set, communicate that to the
- // first Get call by going through the deadletter worker.
+ // If the Value already has some value set, put it in the buffered channel.
if m.innerSet {
- waiter.deadletterSubmitC <- m.inner
+ waiter.bufferedC <- m.inner
}
m.mu.Unlock()
return waiter
}
-// deadletterWorker runs the 'deadletter worker', as goroutine that contains
-// any data that has been Set on the Value that is being watched that was
-// unable to be delivered directly to a pending .Get call.
-//
-// It watches the deadletterSubmitC channel for updated data, and overrides
-// previously received data. Then, when a .Get() begins to pend (and respond to
-// activeReqC receives), the deadletter worker will deliver that value.
-func (m *watcher[T]) deadletterWorker() {
- // Current value, and flag to mark it as set (vs. nil).
- var cur T
- var set bool
-
- for {
- if !set {
- // If no value is yet available, only attempt to receive one from the
- // submit channel, as there's nothing to submit to pending .Get() calls
- // yet.
- val, ok := <-m.deadletterSubmitC
- if !ok {
- // If the channel has been closed (by Close()), exit.
- return
- }
- cur = val
- set = true
- } else {
- // If a value is available, update the inner state. Otherwise, if a
- // .Get() is pending, submit our current state and unset it.
- select {
- case val, ok := <-m.deadletterSubmitC:
- if !ok {
- // If the channel has been closed (by Close()), exit.
- return
- }
- cur = val
- case c := <-m.activeReqC:
- // Potential race: a .Get() might've been active, but might've
- // quit by the time we're here (and will not receive on the
- // responded channel). Handle this gracefully by just returning
- // to the main loop if that's the case.
- select {
- case c <- cur:
- set = false
- default:
- }
- }
- }
- }
-}
-
// closed returns whether this watcher has been closed.
func (m *watcher[T]) closed() bool {
select {
@@ -200,33 +162,27 @@
// update is the high level update-this-watcher function called by Value.
func (m *watcher[T]) update(sync bool, val T) {
- // If synchronous delivery was requested, block until a watcher .Gets it.
+ // If synchronous delivery was requested, block until a pending .Get
+ // receives the value, or the watcher is closed.
if sync {
- c := <-m.activeReqC
- c <- val
+ select {
+ case m.unbufferedC <- val:
+ case <-m.close:
+ }
return
}
- // Otherwise, deliver asynchronously. This means either delivering directly
- // to a pending .Get if one exists, or submitting to the deadletter worker
- // otherwise.
+ // Otherwise, deliver asynchronously. If there is already a value in the
+ // buffered channel that was not retrieved, drop it.
select {
- case c := <-m.activeReqC:
- // Potential race: a .Get() might've been active, but might've quit by
- // the time we're here (and will not receive on the responded channel).
- // Handle this gracefully by falling back to the deadletter worker.
- select {
- case c <- val:
- default:
- m.deadletterSubmitC <- val
- }
+ case <-m.bufferedC:
default:
- m.deadletterSubmitC <- val
}
+ // The channel is now empty, and updates are serialized by the
+ // Value's mutex, so this send cannot block.
+ m.bufferedC <- val
}
func (m *watcher[T]) Close() error {
- close(m.deadletterSubmitC)
close(m.close)
return nil
}
@@ -255,43 +211,23 @@
}
}
- c := make(chan T)
-
- // Start responding on activeReqC. This signals to .update and to the
- // deadletter worker that we're ready to accept data updates.
-
- // There is a potential for a race condition here that hasn't been observed
- // in tests but might happen:
- // 1) Value.Watch returns a Watcher 'w'.
- // 2) w.Set(0) is called, no .Get() is pending, so 0 is submitted to the
- // deadletter worker.
- // 3) w.Get() is called, and activeReqC begins to be served.
- // 4) Simultaneously:
- // a) w.Set(1) is called, attempting to submit via activeReqC
- // b) the deadletter worker attempts to submit via activeReqC
- //
- // This could theoretically cause .Get() to first return 1, and then 0, if
- // the Set activeReqC read and subsequent channel write is served before
- // the deadletter workers' read/write is.
- // As noted, however, this has not been observed in practice, even though
- // TestConcurrency explicitly attempts to trigger this condition. More
- // research needs to be done to attempt to trigger this (or to lawyer the
- // Go channel spec to see if this has some guarantees that resolve this
- // either way), or a preemptive fix can be attempted by adding monotonic
- // counters associated with each .Set() value, ensuring an older value does
- // not replace a newer value.
- //
- // TODO(q3k): investigate this.
for {
+ var val T
+ // Prefer the buffered channel, so that a value already waiting
+ // there (such as the initial value) is delivered first.
select {
- case <-ctx.Done():
- return empty, ctx.Err()
- case m.activeReqC <- c:
- case val := <-c:
- if predicate != nil && !predicate(val) {
- continue
+ case val = <-m.bufferedC:
+ default:
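+ // No value is immediately available; block until one arrives on
+ // either channel, or the context is cancelled.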
+ select {
+ case <-ctx.Done():
+ return empty, ctx.Err()
+ case val = <-m.bufferedC:
+ case val = <-m.unbufferedC:
}
- return val, nil
}
+ if predicate != nil && !predicate(val) {
+ continue
+ }
+ return val, nil
}
}
diff --git a/metropolis/pkg/event/memory/memory_test.go b/metropolis/pkg/event/memory/memory_test.go
index 41f121e..98a1501 100644
--- a/metropolis/pkg/event/memory/memory_test.go
+++ b/metropolis/pkg/event/memory/memory_test.go
@@ -23,6 +23,8 @@
"sync/atomic"
"testing"
"time"
+
+ "source.monogon.dev/metropolis/pkg/event"
)
// TestAsync exercises the high-level behaviour of a Value, in which a
@@ -330,3 +332,39 @@
t.Errorf("Get should've returned %v, got %v", want, got)
}
}
+
+// TestWatchersList ensures that the list of watchers is managed correctly,
+// i.e. there is no memory leak and closed watchers are removed while
+// keeping all non-closed watchers.
+func TestWatchersList(t *testing.T) {
+ ctx := context.Background()
+ p := Value[int]{}
+
+ var watchers []event.Watcher[int]
+ for i := 0; i < 100; i++ {
+ watchers = append(watchers, p.Watch())
+ }
+ for i := 0; i < 10000; i++ {
+ watchers[10].Close()
+ watchers[10] = p.Watch()
+ }
+
+ if want, got := 1000, cap(p.watchers); want <= got {
+ t.Fatalf("Got capacity %d, wanted less than %d", got, want)
+ }
+
+ p.Set(1)
+ if want, got := 100, len(p.watchers); want != got {
+ t.Fatalf("Got %d watchers, wanted %d", got, want)
+ }
+
+ for _, watcher := range watchers {
+ data, err := watcher.Get(ctx)
+ if err != nil {
+ t.Fatalf("Get: %v", err)
+ }
+ if want, got := 1, data; want != got {
+ t.Errorf("Get should've returned %v, got %v", want, got)
+ }
+ }
+}