m/pkg/event/memory: simplify and fix implementation
The old implementation is very complicated and has multiple concurrency
bugs. One of these bugs is already described in a block comment. Another
bug is that if a watcher is closed while being updated, `update` will
try to send on the closed channel `deadletterSubmitC`, which panics.
This changes it to a much simpler implementation.
It could be further simplified it by dropping the Sync flag entirely,
which is currently unused. Or we could use the buffered channel also for
Sync values, and just skip dropping previous values. This still ensures
that all values are delivered, but changes the semantics slightly: Set
would no longer block until Get is called, if there is space in the
channel buffer.
Additionally, there is no longer a memory leak when a watcher is
repeatedly closed and readded, but Set never called. I added a test for
this.
Fixes #127
Change-Id: I2775a36cf2d097c5961a09a387428774a068e1f5
Reviewed-on: https://review.monogon.dev/c/monogon/+/2875
Reviewed-by: Serge Bazanski <serge@monogon.tech>
Vouch-Run-CI: Serge Bazanski <serge@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/pkg/event/memory/memory_test.go b/metropolis/pkg/event/memory/memory_test.go
index 41f121e..98a1501 100644
--- a/metropolis/pkg/event/memory/memory_test.go
+++ b/metropolis/pkg/event/memory/memory_test.go
@@ -23,6 +23,8 @@
"sync/atomic"
"testing"
"time"
+
+ "source.monogon.dev/metropolis/pkg/event"
)
// TestAsync exercises the high-level behaviour of a Value, in which a
@@ -330,3 +332,39 @@
t.Errorf("Get should've returned %v, got %v", want, got)
}
}
+
+// TestWatchersList ensures that the list of watchers is managed correctly,
+// i.e. there is no memory leak and closed watchers are removed while
+// keeping all non-closed watchers.
+func TestWatchersList(t *testing.T) {
+ ctx := context.Background()
+ p := Value[int]{}
+
+ var watchers []event.Watcher[int]
+ for i := 0; i < 100; i++ {
+ watchers = append(watchers, p.Watch())
+ }
+ for i := 0; i < 10000; i++ {
+ watchers[10].Close()
+ watchers[10] = p.Watch()
+ }
+
+ if want, got := 1000, cap(p.watchers); want <= got {
+ t.Fatalf("Got capacity %d, wanted less than %d", got, want)
+ }
+
+ p.Set(1)
+ if want, got := 100, len(p.watchers); want != got {
+ t.Fatalf("Got %d watchers, wanted %d", got, want)
+ }
+
+ for _, watcher := range watchers {
+ data, err := watcher.Get(ctx)
+ if err != nil {
+ t.Fatalf("Get: %v", err)
+ }
+ if want, got := 1, data; want != got {
+ t.Errorf("Get should've returned %v, got %v", want, got)
+ }
+ }
+}