osbase/event/memory: remove Sync feature

The Sync feature was unused, and I'm not sure using it anywhere is a
good idea. If you really do need reliable delivery of events to multiple
consumers, then you probably would also need some mechanism for a
consumer to restart at the point in the history it has last seen before
it failed. Removing this feature simplifies the implementation and
allows new features to be implemented.

Change-Id: I94731355b7208f91256bc4610cb31a2720ab6c68
Reviewed-on: https://review.monogon.dev/c/monogon/+/3717
Tested-by: Jenkins CI
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/osbase/event/memory/memory.go b/osbase/event/memory/memory.go
index 16818a0..09b120f 100644
--- a/osbase/event/memory/memory.go
+++ b/osbase/event/memory/memory.go
@@ -49,17 +49,6 @@
 	// Set. It will grow on every .Watch() and shrink any time a watcher is
 	// determined to have been closed.
 	watchers []*watcher[T]
-
-	// Sync, if set to true, blocks all .Set() calls on the Value until all
-	// Watchers derived from it actively .Get() the new value. This can be used
-	// to ensure Watchers always receive a full log of all Set() calls.
-	//
-	// This must not be changed after the first .Set/.Watch call.
-	//
-	// This is an experimental API and subject to change. It might be migrated
-	// to per-Watcher settings defined within the main event.Value/Watcher
-	// interfaces.
-	Sync bool
 }
 
 // Set updates the Value to the given data. It is safe to call this from
@@ -81,7 +70,7 @@
 		if w.closed() {
 			continue
 		}
-		w.update(m.Sync, val)
+		w.update(val)
 		newWatchers = append(newWatchers, w)
 	}
 	if cap(newWatchers) > len(newWatchers)*3 {
@@ -94,11 +83,9 @@
 // watcher implements the event.Watcher interface for watchers returned by
 // Value.
 type watcher[T any] struct {
-	// bufferedC is a buffered channel of size 1 for submitting values to the
+	// valueC is a buffered channel of size 1 for submitting values to the
 	// watcher.
-	bufferedC chan T
-	// unbufferedC is an unbuffered channel, which is used when Sync is enabled.
-	unbufferedC chan T
+	valueC chan T
 
 	// getSem is a channel-based semaphore (which is of size 1, and thus in
 	// fact a mutex) that is used to ensure that only a single .Get() call is
@@ -115,10 +102,9 @@
 // For more information about guarantees, see event.Value.Watch.
 func (m *Value[T]) Watch() event.Watcher[T] {
 	waiter := &watcher[T]{
-		bufferedC:   make(chan T, 1),
-		unbufferedC: make(chan T),
-		close:       make(chan struct{}),
-		getSem:      make(chan struct{}, 1),
+		valueC: make(chan T, 1),
+		close:  make(chan struct{}),
+		getSem: make(chan struct{}, 1),
 	}
 
 	m.mu.Lock()
@@ -139,9 +125,9 @@
 	}
 	// Append this watcher to the Value.
 	m.watchers = append(m.watchers, waiter)
-	// If the Value already has some value set, put it in the buffered channel.
+	// If the Value already has some value set, put it in the channel.
 	if m.innerSet {
-		waiter.bufferedC <- m.inner
+		waiter.valueC <- m.inner
 	}
 	m.mu.Unlock()
 
@@ -161,25 +147,14 @@
 }
 
 // update is the high level update-this-watcher function called by Value.
-func (m *watcher[T]) update(sync bool, val T) {
-	// If synchronous delivery was requested, block until a watcher .Gets it,
-	// or is closed.
-	if sync {
-		select {
-		case m.unbufferedC <- val:
-		case <-m.close:
-		}
-		return
-	}
-
-	// Otherwise, deliver asynchronously. If there is already a value in the
-	// buffered channel that was not retrieved, drop it.
+func (m *watcher[T]) update(val T) {
+	// If there is already a value in the channel that was not retrieved, drop it.
 	select {
-	case <-m.bufferedC:
+	case <-m.valueC:
 	default:
 	}
 	// The channel is now empty, so sending to it cannot block.
-	m.bufferedC <- val
+	m.valueC <- val
 }
 
 func (m *watcher[T]) Close() error {
@@ -212,22 +187,14 @@
 	}
 
 	for {
-		var val T
-		// For Sync values, ensure the initial value in the buffered
-		// channel is delivered first.
 		select {
-		case val = <-m.bufferedC:
-		default:
-			select {
-			case <-ctx.Done():
-				return empty, ctx.Err()
-			case val = <-m.bufferedC:
-			case val = <-m.unbufferedC:
+		case <-ctx.Done():
+			return empty, ctx.Err()
+		case val := <-m.valueC:
+			if predicate != nil && !predicate(val) {
+				continue
 			}
+			return val, nil
 		}
-		if predicate != nil && !predicate(val) {
-			continue
-		}
-		return val, nil
 	}
 }
diff --git a/osbase/event/memory/memory_test.go b/osbase/event/memory/memory_test.go
index b622565..b7d168b 100644
--- a/osbase/event/memory/memory_test.go
+++ b/osbase/event/memory/memory_test.go
@@ -20,8 +20,6 @@
 	"context"
 	"errors"
 	"fmt"
-	"sync"
-	"sync/atomic"
 	"testing"
 	"time"
 
@@ -62,119 +60,6 @@
 	}
 }
 
-// TestSyncBlocks exercises the Value's 'Sync' field, which makes all
-// Set() calls block until all respective watchers .Get() the updated data.
-// This particular test ensures that .Set() calls to a Watcher result in a
-// prefect log of updates being transmitted to a watcher.
-func TestSync(t *testing.T) {
-	p := Value[int]{
-		Sync: true,
-	}
-	values := make(chan int, 100)
-	var wg sync.WaitGroup
-	wg.Add(1)
-	go func() {
-		ctx := context.Background()
-		watcher := p.Watch()
-		wg.Done()
-		for {
-			value, err := watcher.Get(ctx)
-			if err != nil {
-				panic(err)
-			}
-			values <- value
-		}
-	}()
-
-	p.Set(0)
-	wg.Wait()
-
-	want := []int{1, 2, 3, 4}
-	for _, w := range want {
-		p.Set(w)
-	}
-
-	timeout := time.After(time.Second)
-	for i, w := range append([]int{0}, want...) {
-		select {
-		case <-timeout:
-			t.Fatalf("timed out on value %d (%d)", i, w)
-		case val := <-values:
-			if w != val {
-				t.Errorf("value %d was %d, wanted %d", i, val, w)
-			}
-		}
-	}
-}
-
-// TestSyncBlocks exercises the Value's 'Sync' field, which makes all
-// Set() calls block until all respective watchers .Get() the updated data.
-// This particular test ensures that .Set() calls actually block when a watcher
-// is unattended.
-func TestSyncBlocks(t *testing.T) {
-	p := Value[int]{
-		Sync: true,
-	}
-	ctx := context.Background()
-
-	// Shouldn't block, as there's no declared watchers.
-	p.Set(0)
-
-	watcher := p.Watch()
-
-	// Should retrieve the zero, more requests will pend.
-	value, err := watcher.Get(ctx)
-	if err != nil {
-		t.Fatalf("Get: %v", err)
-	}
-	if want, got := 0, value; want != got {
-		t.Fatalf("Got initial value %d, wanted %d", got, want)
-	}
-
-	// .Set() Should block, as watcher is unattended.
-	//
-	// Whether something blocks in Go is untestable in a robust way (see: halting
-	// problem). We work around this this by introducing a 'stage' int64, which is
-	// put on the 'c' channel after the needs-to-block function returns. We then
-	// perform an action that should unblock this function right after updating
-	// 'stage' to a different value.
-	// Then, we observe what was put on the channel: If it's the initial value, it
-	// means the function didn't block when expected. Otherwise, it means the
-	// function unblocked when expected.
-	stage := int64(0)
-	c := make(chan int64, 1)
-	go func() {
-		p.Set(1)
-		c <- atomic.LoadInt64(&stage)
-	}()
-
-	// Getting should unblock the provider. Mark via 'stage' variable that
-	// unblocking now is expected.
-	atomic.StoreInt64(&stage, int64(1))
-	// Potential race: .Set() unblocks here due to some bug, before .Get() is
-	// called, and we record a false positive.
-	value, err = watcher.Get(ctx)
-	if err != nil {
-		t.Fatalf("Get: %v", err)
-	}
-
-	res := <-c
-	if res != int64(1) {
-		t.Fatalf("Set() returned before Get()")
-	}
-
-	if want, got := 1, value; want != got {
-		t.Fatalf("Wanted value %d, got %d", want, got)
-	}
-
-	// Closing the watcher and setting should not block anymore.
-	if err := watcher.Close(); err != nil {
-		t.Fatalf("Close: %v", err)
-	}
-	// Last step, if this blocks we will get a deadlock error and the test will panic.
-	p.Set(2)
-}
-
 // TestMultipleGets verifies that calling .Get() on a single watcher from two
 // goroutines is prevented by returning an error in exactly one of them.
 func TestMultipleGets(t *testing.T) {
@@ -277,9 +162,7 @@
 // aborts that particular Get call, but also allows subsequent use of the same
 // watcher.
 func TestCanceling(t *testing.T) {
-	p := Value[int]{
-		Sync: true,
-	}
+	p := Value[int]{}
 
 	ctx, ctxC := context.WithCancel(context.Background())