blob: 98a150114f220fa69bfe05fd8a56cf23326aab6d [file] [log] [blame]
Serge Bazanskic00318e2021-03-03 12:39:24 +01001// Copyright 2020 The Monogon Project Authors.
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
Serge Bazanski68ca5ee2021-04-27 16:09:16 +020017package memory
Serge Bazanskic00318e2021-03-03 12:39:24 +010018
19import (
20 "context"
21 "fmt"
22 "sync"
23 "sync/atomic"
24 "testing"
25 "time"
Jan Schär36bde9c2024-03-19 15:05:33 +010026
27 "source.monogon.dev/metropolis/pkg/event"
Serge Bazanskic00318e2021-03-03 12:39:24 +010028)
29
Serge Bazanski68ca5ee2021-04-27 16:09:16 +020030// TestAsync exercises the high-level behaviour of a Value, in which a
Serge Bazanskic00318e2021-03-03 12:39:24 +010031// watcher is able to catch up to the newest Set value.
32func TestAsync(t *testing.T) {
Serge Bazanski37110c32023-03-01 13:57:27 +000033 p := Value[int]{}
Serge Bazanskic00318e2021-03-03 12:39:24 +010034 p.Set(0)
35
36 ctx := context.Background()
37
38 // The 0 from Set() should be available via .Get().
39 watcher := p.Watch()
40 val, err := watcher.Get(ctx)
41 if err != nil {
42 t.Fatalf("Get: %v", err)
43 }
Serge Bazanski37110c32023-03-01 13:57:27 +000044 if want, got := 0, val; want != got {
Serge Bazanskic00318e2021-03-03 12:39:24 +010045 t.Fatalf("Value: got %d, wanted %d", got, want)
46 }
47
48 // Send a large amount of updates that the watcher does not actively .Get().
49 for i := 1; i <= 100; i++ {
50 p.Set(i)
51 }
52
53 // The watcher should still end up with the newest .Set() value on the next
54 // .Get() call.
55 val, err = watcher.Get(ctx)
56 if err != nil {
57 t.Fatalf("Get: %v", err)
58 }
Serge Bazanski37110c32023-03-01 13:57:27 +000059 if want, got := 100, val; want != got {
Serge Bazanskic00318e2021-03-03 12:39:24 +010060 t.Fatalf("Value: got %d, wanted %d", got, want)
61 }
62}
63
Serge Bazanski68ca5ee2021-04-27 16:09:16 +020064// TestSyncBlocks exercises the Value's 'Sync' field, which makes all
Serge Bazanskic00318e2021-03-03 12:39:24 +010065// Set() calls block until all respective watchers .Get() the updated data.
66// This particular test ensures that .Set() calls to a Watcher result in a
67// prefect log of updates being transmitted to a watcher.
68func TestSync(t *testing.T) {
Serge Bazanski37110c32023-03-01 13:57:27 +000069 p := Value[int]{
Serge Bazanskic00318e2021-03-03 12:39:24 +010070 Sync: true,
71 }
72 values := make(chan int, 100)
73 wg := sync.WaitGroup{}
74 wg.Add(1)
75 go func() {
76 ctx := context.Background()
77 watcher := p.Watch()
78 wg.Done()
79 for {
80 value, err := watcher.Get(ctx)
81 if err != nil {
82 panic(err)
83 }
Serge Bazanski37110c32023-03-01 13:57:27 +000084 values <- value
Serge Bazanskic00318e2021-03-03 12:39:24 +010085 }
86 }()
87
88 p.Set(0)
89 wg.Wait()
90
91 want := []int{1, 2, 3, 4}
92 for _, w := range want {
93 p.Set(w)
94 }
95
96 timeout := time.After(time.Second)
97 for i, w := range append([]int{0}, want...) {
98 select {
99 case <-timeout:
100 t.Fatalf("timed out on value %d (%d)", i, w)
101 case val := <-values:
102 if w != val {
103 t.Errorf("value %d was %d, wanted %d", i, val, w)
104 }
105 }
106 }
107}
108
Serge Bazanski68ca5ee2021-04-27 16:09:16 +0200109// TestSyncBlocks exercises the Value's 'Sync' field, which makes all
Serge Bazanskic00318e2021-03-03 12:39:24 +0100110// Set() calls block until all respective watchers .Get() the updated data.
111// This particular test ensures that .Set() calls actually block when a watcher
112// is unattended.
113func TestSyncBlocks(t *testing.T) {
Serge Bazanski37110c32023-03-01 13:57:27 +0000114 p := Value[int]{
Serge Bazanskic00318e2021-03-03 12:39:24 +0100115 Sync: true,
116 }
117 ctx := context.Background()
118
119 // Shouldn't block, as there's no declared watchers.
120 p.Set(0)
121
122 watcher := p.Watch()
123
124 // Should retrieve the zero, more requests will pend.
125 value, err := watcher.Get(ctx)
126 if err != nil {
127 t.Fatalf("Get: %v", err)
128 }
Serge Bazanski37110c32023-03-01 13:57:27 +0000129 if want, got := 0, value; want != got {
Serge Bazanskic00318e2021-03-03 12:39:24 +0100130 t.Fatalf("Got initial value %d, wanted %d", got, want)
131 }
132
133 // .Set() Should block, as watcher is unattended.
134 //
135 // Whether something blocks in Go is untestable in a robust way (see: halting
136 // problem). We work around this this by introducing a 'stage' int64, which is
137 // put on the 'c' channel after the needs-to-block function returns. We then
138 // perform an action that should unblock this function right after updating
139 // 'stage' to a different value.
140 // Then, we observe what was put on the channel: If it's the initial value, it
141 // means the function didn't block when expected. Otherwise, it means the
142 // function unblocked when expected.
143 stage := int64(0)
144 c := make(chan int64, 1)
145 go func() {
146 p.Set(1)
147 c <- atomic.LoadInt64(&stage)
148 }()
149
150 // Getting should unblock the provider. Mark via 'stage' variable that
151 // unblocking now is expected.
152 atomic.StoreInt64(&stage, int64(1))
153 // Potential race: .Set() unblocks here due to some bug, before .Get() is
154 // called, and we record a false positive.
155 value, err = watcher.Get(ctx)
156 if err != nil {
157 t.Fatalf("Get: %v", err)
158 }
159
160 res := <-c
161 if res != int64(1) {
162 t.Fatalf("Set() returned before Get()")
163 }
164
Serge Bazanski37110c32023-03-01 13:57:27 +0000165 if want, got := 1, value; want != got {
Serge Bazanskic00318e2021-03-03 12:39:24 +0100166 t.Fatalf("Wanted value %d, got %d", want, got)
167 }
168
169 // Closing the watcher and setting should not block anymore.
170 if err := watcher.Close(); err != nil {
171 t.Fatalf("Close: %v", err)
172 }
173 // Last step, if this blocks we will get a deadlock error and the test will panic.
174 p.Set(2)
175}
176
177// TestMultipleGets verifies that calling .Get() on a single watcher from two
178// goroutines is prevented by returning an error in exactly one of them.
179func TestMultipleGets(t *testing.T) {
Serge Bazanski37110c32023-03-01 13:57:27 +0000180 p := Value[int]{}
Serge Bazanskic00318e2021-03-03 12:39:24 +0100181 ctx := context.Background()
182
183 w := p.Watch()
184
185 tryError := func(errs chan error) {
186 _, err := w.Get(ctx)
187 errs <- err
188 }
189 errs := make(chan error, 2)
190 go tryError(errs)
191 go tryError(errs)
192
193 for err := range errs {
194 if err == nil {
195 t.Fatalf("A Get call succeeded, while it should have blocked or returned an error")
196 } else {
197 // Found the error, test succeeded.
198 break
199 }
200 }
201}
202
Serge Bazanski68ca5ee2021-04-27 16:09:16 +0200203// TestConcurrency attempts to stress the Value/Watcher
Serge Bazanskic00318e2021-03-03 12:39:24 +0100204// implementation to design limits (a hundred simultaneous watchers), ensuring
205// that the watchers all settle to the final set value.
206func TestConcurrency(t *testing.T) {
207 ctx := context.Background()
208
Serge Bazanski37110c32023-03-01 13:57:27 +0000209 p := Value[int]{}
Serge Bazanskic00318e2021-03-03 12:39:24 +0100210 p.Set(0)
211
212 // Number of watchers to create.
213 watcherN := 100
214 // Expected final value to be Set().
215 final := 100
216 // Result channel per watcher.
217 resC := make([]chan error, watcherN)
218
219 // Spawn watcherN watchers.
220 for i := 0; i < watcherN; i++ {
221 resC[i] = make(chan error, 1)
222 go func(id int) {
223 // done is a helper function that will put an error on the
224 // respective watcher's resC.
225 done := func(err error) {
226 resC[id] <- err
227 close(resC[id])
228 }
229
230 watcher := p.Watch()
231 // prev is used to ensure the values received are monotonic.
232 prev := -1
233 for {
234 val, err := watcher.Get(ctx)
235 if err != nil {
236 done(err)
237 return
238 }
239
240 // Ensure monotonicity of received data.
Serge Bazanski37110c32023-03-01 13:57:27 +0000241 if val <= prev {
Serge Bazanskic00318e2021-03-03 12:39:24 +0100242 done(fmt.Errorf("received out of order data: %d after %d", val, prev))
243 }
Serge Bazanski37110c32023-03-01 13:57:27 +0000244 prev = val
Serge Bazanskic00318e2021-03-03 12:39:24 +0100245
246 // Quit when the final value is received.
247 if val == final {
248 done(nil)
249 return
250 }
251
252 // Sleep a bit, depending on the watcher. This makes each
253 // watcher behave slightly differently, and attempts to
254 // exercise races dependent on sleep time between subsequent
255 // Get calls.
256 time.Sleep(time.Millisecond * time.Duration(id))
257 }
258 }(i)
259 }
260
261 // Set 1..final on the value.
262 for i := 1; i <= final; i++ {
263 p.Set(i)
264 }
265
266 // Ensure all watchers exit with no error.
267 for i, c := range resC {
268 err := <-c
269 if err != nil {
270 t.Errorf("Watcher %d returned %v", i, err)
271 }
272 }
273}
274
275// TestCanceling exercises whether a context canceling in a .Get() gracefully
276// aborts that particular Get call, but also allows subsequent use of the same
277// watcher.
278func TestCanceling(t *testing.T) {
Serge Bazanski37110c32023-03-01 13:57:27 +0000279 p := Value[int]{
Serge Bazanskic00318e2021-03-03 12:39:24 +0100280 Sync: true,
281 }
282
283 ctx, ctxC := context.WithCancel(context.Background())
284
285 watcher := p.Watch()
286
287 // errs will contain the error returned by Get.
288 errs := make(chan error, 1)
289 go func() {
290 // This Get will block, as no initial data has been Set on the value.
291 _, err := watcher.Get(ctx)
292 errs <- err
293 }()
294
295 // Cancel the context, and expect that context error to propagate to the .Get().
296 ctxC()
297 if want, got := ctx.Err(), <-errs; want != got {
298 t.Fatalf("Get should've returned %v, got %v", want, got)
299 }
300
301 // Do another .Get() on the same watcher with a new context. Even though the
302 // call was aborted via a context cancel, the watcher should continue working.
303 ctx = context.Background()
304 go func() {
305 _, err := watcher.Get(ctx)
306 errs <- err
307 }()
308
309 // Unblock the .Get now.
310 p.Set(1)
311 if want, got := error(nil), <-errs; want != got {
312 t.Fatalf("Get should've returned %v, got %v", want, got)
313 }
314}
315
316// TestSetAfterWatch ensures that if a value is updated between a Watch and the
317// initial Get, only the newest Set value is returns.
318func TestSetAfterWatch(t *testing.T) {
319 ctx := context.Background()
320
Serge Bazanski37110c32023-03-01 13:57:27 +0000321 p := Value[int]{}
Serge Bazanskic00318e2021-03-03 12:39:24 +0100322 p.Set(0)
323
324 watcher := p.Watch()
325 p.Set(1)
326
327 data, err := watcher.Get(ctx)
328 if err != nil {
329 t.Fatalf("Get: %v", err)
330 }
Serge Bazanski37110c32023-03-01 13:57:27 +0000331 if want, got := 1, data; want != got {
Serge Bazanskic00318e2021-03-03 12:39:24 +0100332 t.Errorf("Get should've returned %v, got %v", want, got)
333 }
334}
Jan Schär36bde9c2024-03-19 15:05:33 +0100335
336// TestWatchersList ensures that the list of watchers is managed correctly,
337// i.e. there is no memory leak and closed watchers are removed while
338// keeping all non-closed watchers.
339func TestWatchersList(t *testing.T) {
340 ctx := context.Background()
341 p := Value[int]{}
342
343 var watchers []event.Watcher[int]
344 for i := 0; i < 100; i++ {
345 watchers = append(watchers, p.Watch())
346 }
347 for i := 0; i < 10000; i++ {
348 watchers[10].Close()
349 watchers[10] = p.Watch()
350 }
351
352 if want, got := 1000, cap(p.watchers); want <= got {
353 t.Fatalf("Got capacity %d, wanted less than %d", got, want)
354 }
355
356 p.Set(1)
357 if want, got := 100, len(p.watchers); want != got {
358 t.Fatalf("Got %d watchers, wanted %d", got, want)
359 }
360
361 for _, watcher := range watchers {
362 data, err := watcher.Get(ctx)
363 if err != nil {
364 t.Fatalf("Get: %v", err)
365 }
366 if want, got := 1, data; want != got {
367 t.Errorf("Get should've returned %v, got %v", want, got)
368 }
369 }
370}