blob: 0db25244b4975141c5068d6b7f5bc81db94c7be1 [file] [log] [blame]
// Copyright 2020 The Monogon Project Authors.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
16
Serge Bazanski68ca5ee2021-04-27 16:09:16 +020017package memory
Serge Bazanskic00318e2021-03-03 12:39:24 +010018
19import (
20 "context"
21 "fmt"
22 "sync"
Serge Bazanski68ca5ee2021-04-27 16:09:16 +020023
24 "source.monogon.dev/metropolis/pkg/event"
Serge Bazanskic00318e2021-03-03 12:39:24 +010025)
26
27var (
Serge Bazanski68ca5ee2021-04-27 16:09:16 +020028 // Type assert that *Value implements Value. We do this artificially, as
Serge Bazanskic00318e2021-03-03 12:39:24 +010029 // there currently is no code path that needs this to be strictly true. However,
30 // users of this library might want to rely on the Value type instead of
31 // particular Value implementations.
Serge Bazanski68ca5ee2021-04-27 16:09:16 +020032 _ event.Value = &Value{}
Serge Bazanskic00318e2021-03-03 12:39:24 +010033)
34
Serge Bazanski68ca5ee2021-04-27 16:09:16 +020035// Value is a 'memory value', which implements a event.Value stored in memory.
36// It is safe to construct an empty object of this type. However, this must not
37// be copied.
38type Value struct {
Serge Bazanskic00318e2021-03-03 12:39:24 +010039 // mu guards the inner, innerSet and watchers fields.
40 mu sync.RWMutex
Serge Bazanski68ca5ee2021-04-27 16:09:16 +020041 // inner is the latest data Set on the Value. It is used to provide the
42 // newest version of the Set data to new watchers.
Serge Bazanskic00318e2021-03-03 12:39:24 +010043 inner interface{}
44 // innerSet is true when inner has been Set at least once. It is used to
45 // differentiate between a nil and unset value.
46 innerSet bool
47 // watchers is the list of watchers that should be updated when new data is
Serge Bazanski68ca5ee2021-04-27 16:09:16 +020048 // Set. It will grow on every .Watch() and shrink any time a watcher is
Serge Bazanskic00318e2021-03-03 12:39:24 +010049 // determined to have been closed.
Serge Bazanski68ca5ee2021-04-27 16:09:16 +020050 watchers []*watcher
Serge Bazanskic00318e2021-03-03 12:39:24 +010051
Serge Bazanski68ca5ee2021-04-27 16:09:16 +020052 // Sync, if set to true, blocks all .Set() calls on the Value until all
53 // Watchers derived from it actively .Get() the new value. This can be used
54 // to ensure Watchers always receive a full log of all Set() calls.
Serge Bazanskic00318e2021-03-03 12:39:24 +010055 //
56 // This must not be changed after the first .Set/.Watch call.
57 //
58 // This is an experimental API and subject to change. It might be migrated
Serge Bazanski68ca5ee2021-04-27 16:09:16 +020059 // to per-Watcher settings defined within the main event.Value/Watcher
Serge Bazanskic00318e2021-03-03 12:39:24 +010060 // interfaces.
61 Sync bool
62}
63
64// Set updates the Value to the given data. It is safe to call this from
65// multiple goroutines, including concurrently.
66//
Serge Bazanski68ca5ee2021-04-27 16:09:16 +020067// For more information about guarantees, see event.Value.Set.
68func (m *Value) Set(val interface{}) {
Serge Bazanskic00318e2021-03-03 12:39:24 +010069 m.mu.Lock()
70 defer m.mu.Unlock()
71
Serge Bazanski68ca5ee2021-04-27 16:09:16 +020072 // Update the data that is provided on first Get() to watchers.
Serge Bazanskic00318e2021-03-03 12:39:24 +010073 m.inner = val
74 m.innerSet = true
75
76 // Go through all watchers, updating them on the new value and filtering out
77 // all closed watchers.
Serge Bazanski68ca5ee2021-04-27 16:09:16 +020078 newWatchers := make([]*watcher, 0, len(m.watchers))
Serge Bazanskic00318e2021-03-03 12:39:24 +010079 for _, w := range m.watchers {
80 w := w
81 if w.closed() {
82 continue
83 }
84 w.update(m.Sync, val)
85 newWatchers = append(newWatchers, w)
86 }
87 m.watchers = newWatchers
88}
89
// watcher implements the event.Watcher interface for watchers returned by
// Value.
type watcher struct {
	// close is closed when this watcher is itself Closed.
	close chan struct{}
	// getSem is a channel-based semaphore of size 1 (so effectively a mutex)
	// ensuring that only a single .Get() call is active at a time. A channel
	// is used so that concurrent .Get() calls can error out instead of
	// blocking.
	getSem chan struct{}

	// activeReqC is used to request an active submission channel from a
	// pending Get call, if there is one.
	activeReqC chan chan interface{}
	// deadletterSubmitC carries values that could not be handed to a pending
	// Get through activeReqC. The watcher's deadletter worker receives them
	// and passes the latest one on to the next .Get call that occurs.
	deadletterSubmitC chan interface{}
}
110
111// Watch retrieves a Watcher that keeps track on the version of the data
112// contained within the Value that was last seen by a consumer.
113//
Serge Bazanski68ca5ee2021-04-27 16:09:16 +0200114// For more information about guarantees, see event.Value.Watch.
115func (m *Value) Watch() event.Watcher {
116 waiter := &watcher{
Serge Bazanskic00318e2021-03-03 12:39:24 +0100117 activeReqC: make(chan chan interface{}),
118 deadletterSubmitC: make(chan interface{}),
119 close: make(chan struct{}),
120 getSem: make(chan struct{}, 1),
121 }
122 // Start the deadletter worker as a goroutine. It will be stopped when the
Serge Bazanski68ca5ee2021-04-27 16:09:16 +0200123 // watcher is Closed() (as signaled by the close channel).
Serge Bazanskic00318e2021-03-03 12:39:24 +0100124 go waiter.deadletterWorker()
125
Serge Bazanski68ca5ee2021-04-27 16:09:16 +0200126 // Append this watcher to the Value.
Serge Bazanskic00318e2021-03-03 12:39:24 +0100127 m.mu.Lock()
128 m.watchers = append(m.watchers, waiter)
Serge Bazanski68ca5ee2021-04-27 16:09:16 +0200129 // If the Value already has some value set, communicate that to the
Serge Bazanskic00318e2021-03-03 12:39:24 +0100130 // first Get call by going through the deadletter worker.
131 if m.innerSet {
132 waiter.deadletterSubmitC <- m.inner
133 }
134 m.mu.Unlock()
135
136 return waiter
137}
138
139// deadletterWorker runs the 'deadletter worker', as goroutine that contains
140// any data that has been Set on the Value that is being watched that was
141// unable to be delivered directly to a pending .Get call.
142//
143// It watches the deadletterSubmitC channel for updated data, and overrides
144// previously received data. Then, when a .Get() begins to pend (and respond to
145// activeReqC receives), the deadletter worker will deliver that value.
Serge Bazanski68ca5ee2021-04-27 16:09:16 +0200146func (m *watcher) deadletterWorker() {
Serge Bazanskic00318e2021-03-03 12:39:24 +0100147 // Current value, and flag to mark it as set (vs. nil).
148 var cur interface{}
149 var set bool
150
151 for {
152 if !set {
153 // If no value is yet available, only attempt to receive one from the
154 // submit channel, as there's nothing to submit to pending .Get() calls
155 // yet.
156 val, ok := <-m.deadletterSubmitC
157 if !ok {
158 // If the channel has been closed (by Close()), exit.
159 return
160 }
161 cur = val
162 set = true
163 } else {
164 // If a value is available, update the inner state. Otherwise, if a
165 // .Get() is pending, submit our current state and unset it.
166 select {
167 case val, ok := <-m.deadletterSubmitC:
168 if !ok {
169 // If the channel has been closed (by Close()), exit.
170 return
171 }
172 cur = val
173 case c := <-m.activeReqC:
174 // Potential race: a .Get() might've been active, but might've
175 // quit by the time we're here (and will not receive on the
176 // responded channel). Handle this gracefully by just returning
177 // to the main loop if that's the case.
178 select {
179 case c <- cur:
180 set = false
181 default:
182 }
183 }
184 }
185 }
186}
187
188// closed returns whether this watcher has been closed.
Serge Bazanski68ca5ee2021-04-27 16:09:16 +0200189func (m *watcher) closed() bool {
Serge Bazanskic00318e2021-03-03 12:39:24 +0100190 select {
191 case _, ok := <-m.close:
192 if !ok {
193 return true
194 }
195 default:
196 }
197 return false
198}
199
Serge Bazanski68ca5ee2021-04-27 16:09:16 +0200200// update is the high level update-this-watcher function called by Value.
201func (m *watcher) update(sync bool, val interface{}) {
Serge Bazanskic00318e2021-03-03 12:39:24 +0100202 // If synchronous delivery was requested, block until a watcher .Gets it.
203 if sync {
204 c := <-m.activeReqC
205 c <- val
206 return
207 }
208
209 // Otherwise, deliver asynchronously. This means either delivering directly
210 // to a pending .Get if one exists, or submitting to the deadletter worker
211 // otherwise.
212 select {
213 case c := <-m.activeReqC:
214 // Potential race: a .Get() might've been active, but might've quit by
215 // the time we're here (and will not receive on the responded channel).
216 // Handle this gracefully by falling back to the deadletter worker.
217 select {
218 case c <- val:
219 default:
220 m.deadletterSubmitC <- val
221 }
222 default:
223 m.deadletterSubmitC <- val
224 }
225}
226
Serge Bazanski68ca5ee2021-04-27 16:09:16 +0200227func (m *watcher) Close() error {
Serge Bazanskic00318e2021-03-03 12:39:24 +0100228 close(m.deadletterSubmitC)
229 close(m.close)
230 return nil
231}
232
// GetOption is the option type accepted by Get on watchers of a memory Value.
// No options are implemented yet, so it carries no fields.
type GetOption struct{}
237
Serge Bazanski68ca5ee2021-04-27 16:09:16 +0200238// Get blocks until a Value's data is available. See event.Watcher.Get for
239// guarantees and more information.
Serge Bazanski8d45a052021-10-18 17:24:24 +0200240func (m *watcher) Get(ctx context.Context, opts ...event.GetOption) (interface{}, error) {
Serge Bazanskic00318e2021-03-03 12:39:24 +0100241 // Make sure we're the only active .Get call.
242 select {
243 case m.getSem <- struct{}{}:
244 default:
245 return nil, fmt.Errorf("cannot Get() concurrently on a single waiter")
246 }
247 defer func() {
248 <-m.getSem
249 }()
250
Serge Bazanski8d45a052021-10-18 17:24:24 +0200251 for _, optI := range opts {
252 _, ok := optI.(GetOption)
253 if !ok {
254 return nil, fmt.Errorf("get options must be of type memory.GetOption")
255 }
256 }
257
Serge Bazanskic00318e2021-03-03 12:39:24 +0100258 c := make(chan interface{})
259
260 // Start responding on activeReqC. This signals to .update and to the
261 // deadletter worker that we're ready to accept data updates.
262
263 // There is a potential for a race condition here that hasn't been observed
264 // in tests but might happen:
265 // 1) Value.Watch returns a Watcher 'w'.
266 // 2) w.Set(0) is called, no .Get() is pending, so 0 is submitted to the
267 // deadletter worker.
268 // 3) w.Get() is called, and activeReqC begins to be served.
269 // 4) Simultaneously:
270 // a) w.Set(1) is called, attempting to submit via activeReqC
271 // b) the deadletter worker attempts to submit via activeReqC
272 //
273 // This could theoretically cause .Get() to first return 1, and then 0, if
274 // the Set activeReqC read and subsequent channel write is served before
275 // the deadletter workers' read/write is.
276 // As noted, however, this has not been observed in practice, even though
277 // TestConcurrency explicitly attempts to trigger this condition. More
278 // research needs to be done to attempt to trigger this (or to lawyer the
279 // Go channel spec to see if this has some guarantees that resolve this
280 // either way), or a preemptive fix can be attempted by adding monotonic
281 // counters associated with each .Set() value, ensuring an older value does
282 // not replace a newer value.
283 //
284 // TODO(q3k): investigate this.
285 for {
286 select {
287 case <-ctx.Done():
288 return nil, ctx.Err()
289 case m.activeReqC <- c:
290 case val := <-c:
291 return val, nil
292 }
293 }
294}