blob: eec0a375962d83785d41ed0665dfccdc6943c32d [file] [log] [blame]
Serge Bazanskic00318e2021-03-03 12:39:24 +01001// Copyright 2020 The Monogon Project Authors.
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
Serge Bazanski68ca5ee2021-04-27 16:09:16 +020017package memory
Serge Bazanskic00318e2021-03-03 12:39:24 +010018
19import (
20 "context"
Tim Windelschmidtd5f851b2024-04-23 14:59:37 +020021 "errors"
Serge Bazanskic00318e2021-03-03 12:39:24 +010022 "fmt"
23 "sync"
24 "sync/atomic"
25 "testing"
26 "time"
Jan Schär36bde9c2024-03-19 15:05:33 +010027
28 "source.monogon.dev/metropolis/pkg/event"
Serge Bazanskic00318e2021-03-03 12:39:24 +010029)
30
Serge Bazanski68ca5ee2021-04-27 16:09:16 +020031// TestAsync exercises the high-level behaviour of a Value, in which a
Serge Bazanskic00318e2021-03-03 12:39:24 +010032// watcher is able to catch up to the newest Set value.
33func TestAsync(t *testing.T) {
Serge Bazanski37110c32023-03-01 13:57:27 +000034 p := Value[int]{}
Serge Bazanskic00318e2021-03-03 12:39:24 +010035 p.Set(0)
36
37 ctx := context.Background()
38
39 // The 0 from Set() should be available via .Get().
40 watcher := p.Watch()
41 val, err := watcher.Get(ctx)
42 if err != nil {
43 t.Fatalf("Get: %v", err)
44 }
Serge Bazanski37110c32023-03-01 13:57:27 +000045 if want, got := 0, val; want != got {
Serge Bazanskic00318e2021-03-03 12:39:24 +010046 t.Fatalf("Value: got %d, wanted %d", got, want)
47 }
48
49 // Send a large amount of updates that the watcher does not actively .Get().
50 for i := 1; i <= 100; i++ {
51 p.Set(i)
52 }
53
54 // The watcher should still end up with the newest .Set() value on the next
55 // .Get() call.
56 val, err = watcher.Get(ctx)
57 if err != nil {
58 t.Fatalf("Get: %v", err)
59 }
Serge Bazanski37110c32023-03-01 13:57:27 +000060 if want, got := 100, val; want != got {
Serge Bazanskic00318e2021-03-03 12:39:24 +010061 t.Fatalf("Value: got %d, wanted %d", got, want)
62 }
63}
64
Serge Bazanski68ca5ee2021-04-27 16:09:16 +020065// TestSyncBlocks exercises the Value's 'Sync' field, which makes all
Serge Bazanskic00318e2021-03-03 12:39:24 +010066// Set() calls block until all respective watchers .Get() the updated data.
67// This particular test ensures that .Set() calls to a Watcher result in a
68// prefect log of updates being transmitted to a watcher.
69func TestSync(t *testing.T) {
Serge Bazanski37110c32023-03-01 13:57:27 +000070 p := Value[int]{
Serge Bazanskic00318e2021-03-03 12:39:24 +010071 Sync: true,
72 }
73 values := make(chan int, 100)
Tim Windelschmidt3b5a9172024-05-23 13:33:52 +020074 var wg sync.WaitGroup
Serge Bazanskic00318e2021-03-03 12:39:24 +010075 wg.Add(1)
76 go func() {
77 ctx := context.Background()
78 watcher := p.Watch()
79 wg.Done()
80 for {
81 value, err := watcher.Get(ctx)
82 if err != nil {
83 panic(err)
84 }
Serge Bazanski37110c32023-03-01 13:57:27 +000085 values <- value
Serge Bazanskic00318e2021-03-03 12:39:24 +010086 }
87 }()
88
89 p.Set(0)
90 wg.Wait()
91
92 want := []int{1, 2, 3, 4}
93 for _, w := range want {
94 p.Set(w)
95 }
96
97 timeout := time.After(time.Second)
98 for i, w := range append([]int{0}, want...) {
99 select {
100 case <-timeout:
101 t.Fatalf("timed out on value %d (%d)", i, w)
102 case val := <-values:
103 if w != val {
104 t.Errorf("value %d was %d, wanted %d", i, val, w)
105 }
106 }
107 }
108}
109
Serge Bazanski68ca5ee2021-04-27 16:09:16 +0200110// TestSyncBlocks exercises the Value's 'Sync' field, which makes all
Serge Bazanskic00318e2021-03-03 12:39:24 +0100111// Set() calls block until all respective watchers .Get() the updated data.
112// This particular test ensures that .Set() calls actually block when a watcher
113// is unattended.
114func TestSyncBlocks(t *testing.T) {
Serge Bazanski37110c32023-03-01 13:57:27 +0000115 p := Value[int]{
Serge Bazanskic00318e2021-03-03 12:39:24 +0100116 Sync: true,
117 }
118 ctx := context.Background()
119
120 // Shouldn't block, as there's no declared watchers.
121 p.Set(0)
122
123 watcher := p.Watch()
124
125 // Should retrieve the zero, more requests will pend.
126 value, err := watcher.Get(ctx)
127 if err != nil {
128 t.Fatalf("Get: %v", err)
129 }
Serge Bazanski37110c32023-03-01 13:57:27 +0000130 if want, got := 0, value; want != got {
Serge Bazanskic00318e2021-03-03 12:39:24 +0100131 t.Fatalf("Got initial value %d, wanted %d", got, want)
132 }
133
134 // .Set() Should block, as watcher is unattended.
135 //
136 // Whether something blocks in Go is untestable in a robust way (see: halting
137 // problem). We work around this this by introducing a 'stage' int64, which is
138 // put on the 'c' channel after the needs-to-block function returns. We then
139 // perform an action that should unblock this function right after updating
140 // 'stage' to a different value.
141 // Then, we observe what was put on the channel: If it's the initial value, it
142 // means the function didn't block when expected. Otherwise, it means the
143 // function unblocked when expected.
144 stage := int64(0)
145 c := make(chan int64, 1)
146 go func() {
147 p.Set(1)
148 c <- atomic.LoadInt64(&stage)
149 }()
150
151 // Getting should unblock the provider. Mark via 'stage' variable that
152 // unblocking now is expected.
153 atomic.StoreInt64(&stage, int64(1))
154 // Potential race: .Set() unblocks here due to some bug, before .Get() is
155 // called, and we record a false positive.
156 value, err = watcher.Get(ctx)
157 if err != nil {
158 t.Fatalf("Get: %v", err)
159 }
160
161 res := <-c
162 if res != int64(1) {
163 t.Fatalf("Set() returned before Get()")
164 }
165
Serge Bazanski37110c32023-03-01 13:57:27 +0000166 if want, got := 1, value; want != got {
Serge Bazanskic00318e2021-03-03 12:39:24 +0100167 t.Fatalf("Wanted value %d, got %d", want, got)
168 }
169
170 // Closing the watcher and setting should not block anymore.
171 if err := watcher.Close(); err != nil {
172 t.Fatalf("Close: %v", err)
173 }
174 // Last step, if this blocks we will get a deadlock error and the test will panic.
175 p.Set(2)
176}
177
178// TestMultipleGets verifies that calling .Get() on a single watcher from two
179// goroutines is prevented by returning an error in exactly one of them.
180func TestMultipleGets(t *testing.T) {
Serge Bazanski37110c32023-03-01 13:57:27 +0000181 p := Value[int]{}
Serge Bazanskic00318e2021-03-03 12:39:24 +0100182 ctx := context.Background()
183
184 w := p.Watch()
185
186 tryError := func(errs chan error) {
187 _, err := w.Get(ctx)
188 errs <- err
189 }
190 errs := make(chan error, 2)
191 go tryError(errs)
192 go tryError(errs)
193
194 for err := range errs {
195 if err == nil {
196 t.Fatalf("A Get call succeeded, while it should have blocked or returned an error")
197 } else {
198 // Found the error, test succeeded.
199 break
200 }
201 }
202}
203
Serge Bazanski68ca5ee2021-04-27 16:09:16 +0200204// TestConcurrency attempts to stress the Value/Watcher
Serge Bazanskic00318e2021-03-03 12:39:24 +0100205// implementation to design limits (a hundred simultaneous watchers), ensuring
206// that the watchers all settle to the final set value.
207func TestConcurrency(t *testing.T) {
208 ctx := context.Background()
209
Serge Bazanski37110c32023-03-01 13:57:27 +0000210 p := Value[int]{}
Serge Bazanskic00318e2021-03-03 12:39:24 +0100211 p.Set(0)
212
213 // Number of watchers to create.
214 watcherN := 100
215 // Expected final value to be Set().
216 final := 100
217 // Result channel per watcher.
218 resC := make([]chan error, watcherN)
219
220 // Spawn watcherN watchers.
221 for i := 0; i < watcherN; i++ {
222 resC[i] = make(chan error, 1)
223 go func(id int) {
224 // done is a helper function that will put an error on the
225 // respective watcher's resC.
226 done := func(err error) {
227 resC[id] <- err
228 close(resC[id])
229 }
230
231 watcher := p.Watch()
232 // prev is used to ensure the values received are monotonic.
233 prev := -1
234 for {
235 val, err := watcher.Get(ctx)
236 if err != nil {
237 done(err)
238 return
239 }
240
241 // Ensure monotonicity of received data.
Serge Bazanski37110c32023-03-01 13:57:27 +0000242 if val <= prev {
Serge Bazanskic00318e2021-03-03 12:39:24 +0100243 done(fmt.Errorf("received out of order data: %d after %d", val, prev))
244 }
Serge Bazanski37110c32023-03-01 13:57:27 +0000245 prev = val
Serge Bazanskic00318e2021-03-03 12:39:24 +0100246
247 // Quit when the final value is received.
248 if val == final {
249 done(nil)
250 return
251 }
252
253 // Sleep a bit, depending on the watcher. This makes each
254 // watcher behave slightly differently, and attempts to
255 // exercise races dependent on sleep time between subsequent
256 // Get calls.
257 time.Sleep(time.Millisecond * time.Duration(id))
258 }
259 }(i)
260 }
261
262 // Set 1..final on the value.
263 for i := 1; i <= final; i++ {
264 p.Set(i)
265 }
266
267 // Ensure all watchers exit with no error.
268 for i, c := range resC {
269 err := <-c
270 if err != nil {
271 t.Errorf("Watcher %d returned %v", i, err)
272 }
273 }
274}
275
276// TestCanceling exercises whether a context canceling in a .Get() gracefully
277// aborts that particular Get call, but also allows subsequent use of the same
278// watcher.
279func TestCanceling(t *testing.T) {
Serge Bazanski37110c32023-03-01 13:57:27 +0000280 p := Value[int]{
Serge Bazanskic00318e2021-03-03 12:39:24 +0100281 Sync: true,
282 }
283
284 ctx, ctxC := context.WithCancel(context.Background())
285
286 watcher := p.Watch()
287
288 // errs will contain the error returned by Get.
289 errs := make(chan error, 1)
290 go func() {
291 // This Get will block, as no initial data has been Set on the value.
292 _, err := watcher.Get(ctx)
293 errs <- err
294 }()
295
296 // Cancel the context, and expect that context error to propagate to the .Get().
297 ctxC()
Tim Windelschmidtd5f851b2024-04-23 14:59:37 +0200298 if want, got := ctx.Err(), <-errs; !errors.Is(got, want) {
Serge Bazanskic00318e2021-03-03 12:39:24 +0100299 t.Fatalf("Get should've returned %v, got %v", want, got)
300 }
301
302 // Do another .Get() on the same watcher with a new context. Even though the
303 // call was aborted via a context cancel, the watcher should continue working.
304 ctx = context.Background()
305 go func() {
306 _, err := watcher.Get(ctx)
307 errs <- err
308 }()
309
310 // Unblock the .Get now.
311 p.Set(1)
Tim Windelschmidtd5f851b2024-04-23 14:59:37 +0200312 if want, got := error(nil), <-errs; !errors.Is(got, want) {
Serge Bazanskic00318e2021-03-03 12:39:24 +0100313 t.Fatalf("Get should've returned %v, got %v", want, got)
314 }
315}
316
317// TestSetAfterWatch ensures that if a value is updated between a Watch and the
318// initial Get, only the newest Set value is returns.
319func TestSetAfterWatch(t *testing.T) {
320 ctx := context.Background()
321
Serge Bazanski37110c32023-03-01 13:57:27 +0000322 p := Value[int]{}
Serge Bazanskic00318e2021-03-03 12:39:24 +0100323 p.Set(0)
324
325 watcher := p.Watch()
326 p.Set(1)
327
328 data, err := watcher.Get(ctx)
329 if err != nil {
330 t.Fatalf("Get: %v", err)
331 }
Serge Bazanski37110c32023-03-01 13:57:27 +0000332 if want, got := 1, data; want != got {
Serge Bazanskic00318e2021-03-03 12:39:24 +0100333 t.Errorf("Get should've returned %v, got %v", want, got)
334 }
335}
Jan Schär36bde9c2024-03-19 15:05:33 +0100336
337// TestWatchersList ensures that the list of watchers is managed correctly,
338// i.e. there is no memory leak and closed watchers are removed while
339// keeping all non-closed watchers.
340func TestWatchersList(t *testing.T) {
341 ctx := context.Background()
342 p := Value[int]{}
343
344 var watchers []event.Watcher[int]
345 for i := 0; i < 100; i++ {
346 watchers = append(watchers, p.Watch())
347 }
348 for i := 0; i < 10000; i++ {
349 watchers[10].Close()
350 watchers[10] = p.Watch()
351 }
352
353 if want, got := 1000, cap(p.watchers); want <= got {
354 t.Fatalf("Got capacity %d, wanted less than %d", got, want)
355 }
356
357 p.Set(1)
358 if want, got := 100, len(p.watchers); want != got {
359 t.Fatalf("Got %d watchers, wanted %d", got, want)
360 }
361
362 for _, watcher := range watchers {
363 data, err := watcher.Get(ctx)
364 if err != nil {
365 t.Fatalf("Get: %v", err)
366 }
367 if want, got := 1, data; want != got {
368 t.Errorf("Get should've returned %v, got %v", want, got)
369 }
370 }
371}