blob: f0a2ab9e41c5bb45cc0c27da5a8dfc577722ddf0 [file] [log] [blame]
// Copyright 2020 The Monogon Project Authors.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
16
Serge Bazanski68ca5ee2021-04-27 16:09:16 +020017package memory
Serge Bazanskic00318e2021-03-03 12:39:24 +010018
19import (
20 "context"
Serge Bazanski37110c32023-03-01 13:57:27 +000021 "errors"
Serge Bazanskic00318e2021-03-03 12:39:24 +010022 "fmt"
23 "sync"
Serge Bazanski68ca5ee2021-04-27 16:09:16 +020024
25 "source.monogon.dev/metropolis/pkg/event"
Serge Bazanskic00318e2021-03-03 12:39:24 +010026)
27
28var (
Serge Bazanski68ca5ee2021-04-27 16:09:16 +020029 // Type assert that *Value implements Value. We do this artificially, as
Serge Bazanskic00318e2021-03-03 12:39:24 +010030 // there currently is no code path that needs this to be strictly true. However,
31 // users of this library might want to rely on the Value type instead of
32 // particular Value implementations.
Serge Bazanski37110c32023-03-01 13:57:27 +000033 _ event.Value[int] = &Value[int]{}
Serge Bazanskic00318e2021-03-03 12:39:24 +010034)
35
Serge Bazanski68ca5ee2021-04-27 16:09:16 +020036// Value is a 'memory value', which implements a event.Value stored in memory.
37// It is safe to construct an empty object of this type. However, this must not
38// be copied.
Serge Bazanski37110c32023-03-01 13:57:27 +000039type Value[T any] struct {
Serge Bazanskic00318e2021-03-03 12:39:24 +010040 // mu guards the inner, innerSet and watchers fields.
41 mu sync.RWMutex
Serge Bazanski68ca5ee2021-04-27 16:09:16 +020042 // inner is the latest data Set on the Value. It is used to provide the
43 // newest version of the Set data to new watchers.
Serge Bazanski37110c32023-03-01 13:57:27 +000044 inner T
Serge Bazanskic00318e2021-03-03 12:39:24 +010045 // innerSet is true when inner has been Set at least once. It is used to
46 // differentiate between a nil and unset value.
47 innerSet bool
48 // watchers is the list of watchers that should be updated when new data is
Serge Bazanski68ca5ee2021-04-27 16:09:16 +020049 // Set. It will grow on every .Watch() and shrink any time a watcher is
Serge Bazanskic00318e2021-03-03 12:39:24 +010050 // determined to have been closed.
Serge Bazanski37110c32023-03-01 13:57:27 +000051 watchers []*watcher[T]
Serge Bazanskic00318e2021-03-03 12:39:24 +010052
Serge Bazanski68ca5ee2021-04-27 16:09:16 +020053 // Sync, if set to true, blocks all .Set() calls on the Value until all
54 // Watchers derived from it actively .Get() the new value. This can be used
55 // to ensure Watchers always receive a full log of all Set() calls.
Serge Bazanskic00318e2021-03-03 12:39:24 +010056 //
57 // This must not be changed after the first .Set/.Watch call.
58 //
59 // This is an experimental API and subject to change. It might be migrated
Serge Bazanski68ca5ee2021-04-27 16:09:16 +020060 // to per-Watcher settings defined within the main event.Value/Watcher
Serge Bazanskic00318e2021-03-03 12:39:24 +010061 // interfaces.
62 Sync bool
63}
64
65// Set updates the Value to the given data. It is safe to call this from
66// multiple goroutines, including concurrently.
67//
Serge Bazanski68ca5ee2021-04-27 16:09:16 +020068// For more information about guarantees, see event.Value.Set.
Serge Bazanski37110c32023-03-01 13:57:27 +000069func (m *Value[T]) Set(val T) {
Serge Bazanskic00318e2021-03-03 12:39:24 +010070 m.mu.Lock()
71 defer m.mu.Unlock()
72
Serge Bazanski68ca5ee2021-04-27 16:09:16 +020073 // Update the data that is provided on first Get() to watchers.
Serge Bazanskic00318e2021-03-03 12:39:24 +010074 m.inner = val
75 m.innerSet = true
76
77 // Go through all watchers, updating them on the new value and filtering out
78 // all closed watchers.
Serge Bazanski37110c32023-03-01 13:57:27 +000079 newWatchers := make([]*watcher[T], 0, len(m.watchers))
Serge Bazanskic00318e2021-03-03 12:39:24 +010080 for _, w := range m.watchers {
81 w := w
82 if w.closed() {
83 continue
84 }
85 w.update(m.Sync, val)
86 newWatchers = append(newWatchers, w)
87 }
88 m.watchers = newWatchers
89}
90
// watcher is the event.Watcher implementation handed out by Value.Watch.
type watcher[T any] struct {
	// activeReqC carries, from a pending Get call (if any), the channel on
	// which that Get wants to receive its value.
	activeReqC chan chan T
	// deadletterSubmitC carries values that could not be handed directly to a
	// pending Get. The deadletter worker of this watcher receives them and
	// passes the latest one on to the next Get call.
	deadletterSubmitC chan T

	// getSem is a channel-based semaphore of size 1 (in effect a mutex) that
	// allows only one Get call to be active at a time. It is a channel rather
	// than a mutex so that a concurrent Get can fail immediately instead of
	// blocking.
	getSem chan struct{}
	// close is closed when this watcher itself is closed.
	close chan struct{}
}
111
112// Watch retrieves a Watcher that keeps track on the version of the data
113// contained within the Value that was last seen by a consumer.
114//
Serge Bazanski68ca5ee2021-04-27 16:09:16 +0200115// For more information about guarantees, see event.Value.Watch.
Serge Bazanski37110c32023-03-01 13:57:27 +0000116func (m *Value[T]) Watch() event.Watcher[T] {
117 waiter := &watcher[T]{
118 activeReqC: make(chan chan T),
119 deadletterSubmitC: make(chan T),
Serge Bazanskic00318e2021-03-03 12:39:24 +0100120 close: make(chan struct{}),
121 getSem: make(chan struct{}, 1),
122 }
123 // Start the deadletter worker as a goroutine. It will be stopped when the
Serge Bazanski68ca5ee2021-04-27 16:09:16 +0200124 // watcher is Closed() (as signaled by the close channel).
Serge Bazanskic00318e2021-03-03 12:39:24 +0100125 go waiter.deadletterWorker()
126
Serge Bazanski68ca5ee2021-04-27 16:09:16 +0200127 // Append this watcher to the Value.
Serge Bazanskic00318e2021-03-03 12:39:24 +0100128 m.mu.Lock()
129 m.watchers = append(m.watchers, waiter)
Serge Bazanski68ca5ee2021-04-27 16:09:16 +0200130 // If the Value already has some value set, communicate that to the
Serge Bazanskic00318e2021-03-03 12:39:24 +0100131 // first Get call by going through the deadletter worker.
132 if m.innerSet {
133 waiter.deadletterSubmitC <- m.inner
134 }
135 m.mu.Unlock()
136
137 return waiter
138}
139
140// deadletterWorker runs the 'deadletter worker', as goroutine that contains
141// any data that has been Set on the Value that is being watched that was
142// unable to be delivered directly to a pending .Get call.
143//
144// It watches the deadletterSubmitC channel for updated data, and overrides
145// previously received data. Then, when a .Get() begins to pend (and respond to
146// activeReqC receives), the deadletter worker will deliver that value.
Serge Bazanski37110c32023-03-01 13:57:27 +0000147func (m *watcher[T]) deadletterWorker() {
Serge Bazanskic00318e2021-03-03 12:39:24 +0100148 // Current value, and flag to mark it as set (vs. nil).
Serge Bazanski37110c32023-03-01 13:57:27 +0000149 var cur T
Serge Bazanskic00318e2021-03-03 12:39:24 +0100150 var set bool
151
152 for {
153 if !set {
154 // If no value is yet available, only attempt to receive one from the
155 // submit channel, as there's nothing to submit to pending .Get() calls
156 // yet.
157 val, ok := <-m.deadletterSubmitC
158 if !ok {
159 // If the channel has been closed (by Close()), exit.
160 return
161 }
162 cur = val
163 set = true
164 } else {
165 // If a value is available, update the inner state. Otherwise, if a
166 // .Get() is pending, submit our current state and unset it.
167 select {
168 case val, ok := <-m.deadletterSubmitC:
169 if !ok {
170 // If the channel has been closed (by Close()), exit.
171 return
172 }
173 cur = val
174 case c := <-m.activeReqC:
175 // Potential race: a .Get() might've been active, but might've
176 // quit by the time we're here (and will not receive on the
177 // responded channel). Handle this gracefully by just returning
178 // to the main loop if that's the case.
179 select {
180 case c <- cur:
181 set = false
182 default:
183 }
184 }
185 }
186 }
187}
188
189// closed returns whether this watcher has been closed.
Serge Bazanski37110c32023-03-01 13:57:27 +0000190func (m *watcher[T]) closed() bool {
Serge Bazanskic00318e2021-03-03 12:39:24 +0100191 select {
192 case _, ok := <-m.close:
193 if !ok {
194 return true
195 }
196 default:
197 }
198 return false
199}
200
Serge Bazanski68ca5ee2021-04-27 16:09:16 +0200201// update is the high level update-this-watcher function called by Value.
Serge Bazanski37110c32023-03-01 13:57:27 +0000202func (m *watcher[T]) update(sync bool, val T) {
Serge Bazanskic00318e2021-03-03 12:39:24 +0100203 // If synchronous delivery was requested, block until a watcher .Gets it.
204 if sync {
205 c := <-m.activeReqC
206 c <- val
207 return
208 }
209
210 // Otherwise, deliver asynchronously. This means either delivering directly
211 // to a pending .Get if one exists, or submitting to the deadletter worker
212 // otherwise.
213 select {
214 case c := <-m.activeReqC:
215 // Potential race: a .Get() might've been active, but might've quit by
216 // the time we're here (and will not receive on the responded channel).
217 // Handle this gracefully by falling back to the deadletter worker.
218 select {
219 case c <- val:
220 default:
221 m.deadletterSubmitC <- val
222 }
223 default:
224 m.deadletterSubmitC <- val
225 }
226}
227
Serge Bazanski37110c32023-03-01 13:57:27 +0000228func (m *watcher[T]) Close() error {
Serge Bazanskic00318e2021-03-03 12:39:24 +0100229 close(m.deadletterSubmitC)
230 close(m.close)
231 return nil
232}
233
Serge Bazanski68ca5ee2021-04-27 16:09:16 +0200234// Get blocks until a Value's data is available. See event.Watcher.Get for
235// guarantees and more information.
Serge Bazanski37110c32023-03-01 13:57:27 +0000236func (m *watcher[T]) Get(ctx context.Context, opts ...event.GetOption[T]) (T, error) {
Serge Bazanskic00318e2021-03-03 12:39:24 +0100237 // Make sure we're the only active .Get call.
Serge Bazanski37110c32023-03-01 13:57:27 +0000238 var empty T
Serge Bazanskic00318e2021-03-03 12:39:24 +0100239 select {
240 case m.getSem <- struct{}{}:
241 default:
Serge Bazanski37110c32023-03-01 13:57:27 +0000242 return empty, fmt.Errorf("cannot Get() concurrently on a single waiter")
Serge Bazanskic00318e2021-03-03 12:39:24 +0100243 }
244 defer func() {
245 <-m.getSem
246 }()
247
Serge Bazanski37110c32023-03-01 13:57:27 +0000248 var predicate func(t T) bool
249 for _, opt := range opts {
250 if opt.Predicate != nil {
251 predicate = opt.Predicate
252 }
253 if opt.BacklogOnly != false {
254 return empty, errors.New("BacklogOnly is not implemented for memory watchers")
Serge Bazanski8d45a052021-10-18 17:24:24 +0200255 }
256 }
257
Serge Bazanski37110c32023-03-01 13:57:27 +0000258 c := make(chan T)
Serge Bazanskic00318e2021-03-03 12:39:24 +0100259
260 // Start responding on activeReqC. This signals to .update and to the
261 // deadletter worker that we're ready to accept data updates.
262
263 // There is a potential for a race condition here that hasn't been observed
264 // in tests but might happen:
265 // 1) Value.Watch returns a Watcher 'w'.
266 // 2) w.Set(0) is called, no .Get() is pending, so 0 is submitted to the
267 // deadletter worker.
268 // 3) w.Get() is called, and activeReqC begins to be served.
269 // 4) Simultaneously:
270 // a) w.Set(1) is called, attempting to submit via activeReqC
271 // b) the deadletter worker attempts to submit via activeReqC
272 //
273 // This could theoretically cause .Get() to first return 1, and then 0, if
274 // the Set activeReqC read and subsequent channel write is served before
275 // the deadletter workers' read/write is.
276 // As noted, however, this has not been observed in practice, even though
277 // TestConcurrency explicitly attempts to trigger this condition. More
278 // research needs to be done to attempt to trigger this (or to lawyer the
279 // Go channel spec to see if this has some guarantees that resolve this
280 // either way), or a preemptive fix can be attempted by adding monotonic
281 // counters associated with each .Set() value, ensuring an older value does
282 // not replace a newer value.
283 //
284 // TODO(q3k): investigate this.
285 for {
286 select {
287 case <-ctx.Done():
Serge Bazanski37110c32023-03-01 13:57:27 +0000288 return empty, ctx.Err()
Serge Bazanskic00318e2021-03-03 12:39:24 +0100289 case m.activeReqC <- c:
290 case val := <-c:
Serge Bazanski37110c32023-03-01 13:57:27 +0000291 if predicate != nil && !predicate(val) {
292 continue
293 }
Serge Bazanskic00318e2021-03-03 12:39:24 +0100294 return val, nil
295 }
296 }
297}