osbase/event/memory: remove Sync feature
The Sync feature was unused, and I'm not sure using it anywhere is a
good idea. If you really do need reliable delivery of events to multiple
consumers, then you probably also would need some mechansim for a
consumer to restart at the point in the history it has last seen before
it failed. Removing this feature simplifies the implementation and
allows new features to be implemented.
Change-Id: I94731355b7208f91256bc4610cb31a2720ab6c68
Reviewed-on: https://review.monogon.dev/c/monogon/+/3717
Tested-by: Jenkins CI
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/osbase/event/memory/memory.go b/osbase/event/memory/memory.go
index 16818a0..09b120f 100644
--- a/osbase/event/memory/memory.go
+++ b/osbase/event/memory/memory.go
@@ -49,17 +49,6 @@
// Set. It will grow on every .Watch() and shrink any time a watcher is
// determined to have been closed.
watchers []*watcher[T]
-
- // Sync, if set to true, blocks all .Set() calls on the Value until all
- // Watchers derived from it actively .Get() the new value. This can be used
- // to ensure Watchers always receive a full log of all Set() calls.
- //
- // This must not be changed after the first .Set/.Watch call.
- //
- // This is an experimental API and subject to change. It might be migrated
- // to per-Watcher settings defined within the main event.Value/Watcher
- // interfaces.
- Sync bool
}
// Set updates the Value to the given data. It is safe to call this from
@@ -81,7 +70,7 @@
if w.closed() {
continue
}
- w.update(m.Sync, val)
+ w.update(val)
newWatchers = append(newWatchers, w)
}
if cap(newWatchers) > len(newWatchers)*3 {
@@ -94,11 +83,9 @@
// watcher implements the event.Watcher interface for watchers returned by
// Value.
type watcher[T any] struct {
- // bufferedC is a buffered channel of size 1 for submitting values to the
+ // valueC is a buffered channel of size 1 for submitting values to the
// watcher.
- bufferedC chan T
- // unbufferedC is an unbuffered channel, which is used when Sync is enabled.
- unbufferedC chan T
+ valueC chan T
// getSem is a channel-based semaphore (which is of size 1, and thus in
// fact a mutex) that is used to ensure that only a single .Get() call is
@@ -115,10 +102,9 @@
// For more information about guarantees, see event.Value.Watch.
func (m *Value[T]) Watch() event.Watcher[T] {
waiter := &watcher[T]{
- bufferedC: make(chan T, 1),
- unbufferedC: make(chan T),
- close: make(chan struct{}),
- getSem: make(chan struct{}, 1),
+ valueC: make(chan T, 1),
+ close: make(chan struct{}),
+ getSem: make(chan struct{}, 1),
}
m.mu.Lock()
@@ -139,9 +125,9 @@
}
// Append this watcher to the Value.
m.watchers = append(m.watchers, waiter)
- // If the Value already has some value set, put it in the buffered channel.
+ // If the Value already has some value set, put it in the channel.
if m.innerSet {
- waiter.bufferedC <- m.inner
+ waiter.valueC <- m.inner
}
m.mu.Unlock()
@@ -161,25 +147,14 @@
}
// update is the high level update-this-watcher function called by Value.
-func (m *watcher[T]) update(sync bool, val T) {
- // If synchronous delivery was requested, block until a watcher .Gets it,
- // or is closed.
- if sync {
- select {
- case m.unbufferedC <- val:
- case <-m.close:
- }
- return
- }
-
- // Otherwise, deliver asynchronously. If there is already a value in the
- // buffered channel that was not retrieved, drop it.
+func (m *watcher[T]) update(val T) {
+ // If there is already a value in the channel that was not retrieved, drop it.
select {
- case <-m.bufferedC:
+ case <-m.valueC:
default:
}
// The channel is now empty, so sending to it cannot block.
- m.bufferedC <- val
+ m.valueC <- val
}
func (m *watcher[T]) Close() error {
@@ -212,22 +187,14 @@
}
for {
- var val T
- // For Sync values, ensure the initial value in the buffered
- // channel is delivered first.
select {
- case val = <-m.bufferedC:
- default:
- select {
- case <-ctx.Done():
- return empty, ctx.Err()
- case val = <-m.bufferedC:
- case val = <-m.unbufferedC:
+ case <-ctx.Done():
+ return empty, ctx.Err()
+ case val := <-m.valueC:
+ if predicate != nil && !predicate(val) {
+ continue
}
+ return val, nil
}
- if predicate != nil && !predicate(val) {
- continue
- }
- return val, nil
}
}