// Copyright 2020 The Monogon Project Authors.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

17package event
18
19import (
20 "context"
21 "fmt"
22 "sync"
23)
24
25var (
26 // Type assert that *MemoryValue implements Value. We do this artificially, as
27 // there currently is no code path that needs this to be strictly true. However,
28 // users of this library might want to rely on the Value type instead of
29 // particular Value implementations.
30 _ Value = &MemoryValue{}
31)
32
33// MemoryValue implements a Value stored in memory. It is safe to construct an
34// empty object of this type. However, this must not be copied.
35type MemoryValue struct {
36 // mu guards the inner, innerSet and watchers fields.
37 mu sync.RWMutex
38 // inner is the latest data Set on the MemoryValue. It is used to provide
39 // the newest version of the Set data to new Watchers.
40 inner interface{}
41 // innerSet is true when inner has been Set at least once. It is used to
42 // differentiate between a nil and unset value.
43 innerSet bool
44 // watchers is the list of watchers that should be updated when new data is
45 // Set. It will grow on every .Watch() and shrink any time a Watcher is
46 // determined to have been closed.
47 watchers []*MemoryWatcher
48
49 // Sync, if set to true, blocks all .Set() calls on the MemoryValue until
50 // all Watchers derived from it actively .Get() the new value. This can be
51 // used to ensure Watchers always receive a full log of all Set() calls.
52 //
53 // This must not be changed after the first .Set/.Watch call.
54 //
55 // This is an experimental API and subject to change. It might be migrated
56 // to per-Watcher settings defined within the main Value/Watcher
57 // interfaces.
58 Sync bool
59}
60
61// Set updates the Value to the given data. It is safe to call this from
62// multiple goroutines, including concurrently.
63//
64// For more information about guarantees, see Value.Set.
65func (m *MemoryValue) Set(val interface{}) {
66 m.mu.Lock()
67 defer m.mu.Unlock()
68
69 // Update the data that is provided on first Get() to Watchers.
70 m.inner = val
71 m.innerSet = true
72
73 // Go through all watchers, updating them on the new value and filtering out
74 // all closed watchers.
75 newWatchers := make([]*MemoryWatcher, 0, len(m.watchers))
76 for _, w := range m.watchers {
77 w := w
78 if w.closed() {
79 continue
80 }
81 w.update(m.Sync, val)
82 newWatchers = append(newWatchers, w)
83 }
84 m.watchers = newWatchers
85}
86
// MemoryWatcher is the Watcher implementation handed out by
// MemoryValue.Watch.
type MemoryWatcher struct {
	// activeReqC carries submission channels offered by a pending Get call.
	// Whoever has a value to deliver receives such a channel from here and
	// then sends the value on it.
	activeReqC chan chan interface{}
	// deadletterSubmitC carries values that could not be delivered to a
	// pending Get call. The deadletter worker of this Watcher receives them
	// and hands the most recent one to the next Get call.
	deadletterSubmitC chan interface{}

	// getSem is a channel used as a semaphore of size 1 (that is, a mutex)
	// that allows only one Get call to be active at a time. A channel is used
	// so that a concurrent Get call can fail immediately instead of blocking.
	getSem chan struct{}
	// close is closed when this Watcher is Closed.
	close chan struct{}
}
107
108// Watch retrieves a Watcher that keeps track on the version of the data
109// contained within the Value that was last seen by a consumer.
110//
111// For more information about guarantees, see Value.Watch.
112func (m *MemoryValue) Watch() Watcher {
113 waiter := &MemoryWatcher{
114 activeReqC: make(chan chan interface{}),
115 deadletterSubmitC: make(chan interface{}),
116 close: make(chan struct{}),
117 getSem: make(chan struct{}, 1),
118 }
119 // Start the deadletter worker as a goroutine. It will be stopped when the
120 // Watcher is Closed() (as signaled by the close channel).
121 go waiter.deadletterWorker()
122
123 // Append this watcher to the MemoryValue.
124 m.mu.Lock()
125 m.watchers = append(m.watchers, waiter)
126 // If the MemoryValue already has some value set, communicate that to the
127 // first Get call by going through the deadletter worker.
128 if m.innerSet {
129 waiter.deadletterSubmitC <- m.inner
130 }
131 m.mu.Unlock()
132
133 return waiter
134}
135
136// deadletterWorker runs the 'deadletter worker', as goroutine that contains
137// any data that has been Set on the Value that is being watched that was
138// unable to be delivered directly to a pending .Get call.
139//
140// It watches the deadletterSubmitC channel for updated data, and overrides
141// previously received data. Then, when a .Get() begins to pend (and respond to
142// activeReqC receives), the deadletter worker will deliver that value.
143func (m *MemoryWatcher) deadletterWorker() {
144 // Current value, and flag to mark it as set (vs. nil).
145 var cur interface{}
146 var set bool
147
148 for {
149 if !set {
150 // If no value is yet available, only attempt to receive one from the
151 // submit channel, as there's nothing to submit to pending .Get() calls
152 // yet.
153 val, ok := <-m.deadletterSubmitC
154 if !ok {
155 // If the channel has been closed (by Close()), exit.
156 return
157 }
158 cur = val
159 set = true
160 } else {
161 // If a value is available, update the inner state. Otherwise, if a
162 // .Get() is pending, submit our current state and unset it.
163 select {
164 case val, ok := <-m.deadletterSubmitC:
165 if !ok {
166 // If the channel has been closed (by Close()), exit.
167 return
168 }
169 cur = val
170 case c := <-m.activeReqC:
171 // Potential race: a .Get() might've been active, but might've
172 // quit by the time we're here (and will not receive on the
173 // responded channel). Handle this gracefully by just returning
174 // to the main loop if that's the case.
175 select {
176 case c <- cur:
177 set = false
178 default:
179 }
180 }
181 }
182 }
183}
184
185// closed returns whether this watcher has been closed.
186func (m *MemoryWatcher) closed() bool {
187 select {
188 case _, ok := <-m.close:
189 if !ok {
190 return true
191 }
192 default:
193 }
194 return false
195}
196
197// update is the high level update-this-watcher function called by MemoryValue.
198func (m *MemoryWatcher) update(sync bool, val interface{}) {
199 // If synchronous delivery was requested, block until a watcher .Gets it.
200 if sync {
201 c := <-m.activeReqC
202 c <- val
203 return
204 }
205
206 // Otherwise, deliver asynchronously. This means either delivering directly
207 // to a pending .Get if one exists, or submitting to the deadletter worker
208 // otherwise.
209 select {
210 case c := <-m.activeReqC:
211 // Potential race: a .Get() might've been active, but might've quit by
212 // the time we're here (and will not receive on the responded channel).
213 // Handle this gracefully by falling back to the deadletter worker.
214 select {
215 case c <- val:
216 default:
217 m.deadletterSubmitC <- val
218 }
219 default:
220 m.deadletterSubmitC <- val
221 }
222}
223
224func (m *MemoryWatcher) Close() error {
225 close(m.deadletterSubmitC)
226 close(m.close)
227 return nil
228}
229
230// Get blocks until a Value's data is available. See Watcher.Get for guarantees
231// and more information.
232func (m *MemoryWatcher) Get(ctx context.Context) (interface{}, error) {
233 // Make sure we're the only active .Get call.
234 select {
235 case m.getSem <- struct{}{}:
236 default:
237 return nil, fmt.Errorf("cannot Get() concurrently on a single waiter")
238 }
239 defer func() {
240 <-m.getSem
241 }()
242
243 c := make(chan interface{})
244
245 // Start responding on activeReqC. This signals to .update and to the
246 // deadletter worker that we're ready to accept data updates.
247
248 // There is a potential for a race condition here that hasn't been observed
249 // in tests but might happen:
250 // 1) Value.Watch returns a Watcher 'w'.
251 // 2) w.Set(0) is called, no .Get() is pending, so 0 is submitted to the
252 // deadletter worker.
253 // 3) w.Get() is called, and activeReqC begins to be served.
254 // 4) Simultaneously:
255 // a) w.Set(1) is called, attempting to submit via activeReqC
256 // b) the deadletter worker attempts to submit via activeReqC
257 //
258 // This could theoretically cause .Get() to first return 1, and then 0, if
259 // the Set activeReqC read and subsequent channel write is served before
260 // the deadletter workers' read/write is.
261 // As noted, however, this has not been observed in practice, even though
262 // TestConcurrency explicitly attempts to trigger this condition. More
263 // research needs to be done to attempt to trigger this (or to lawyer the
264 // Go channel spec to see if this has some guarantees that resolve this
265 // either way), or a preemptive fix can be attempted by adding monotonic
266 // counters associated with each .Set() value, ensuring an older value does
267 // not replace a newer value.
268 //
269 // TODO(q3k): investigate this.
270 for {
271 select {
272 case <-ctx.Done():
273 return nil, ctx.Err()
274 case m.activeReqC <- c:
275 case val := <-c:
276 return val, nil
277 }
278 }
279}
280