blob: 7f4fb2a8d70a7cf7f07b24e8c41a86f9c5dc78a8 [file] [log] [blame]
Tim Windelschmidt6d33a432025-02-04 14:34:25 +01001// Copyright The Monogon Project Authors.
Serge Bazanski9c09c4e2020-03-24 13:58:01 +01002// SPDX-License-Identifier: Apache-2.0
Serge Bazanski9c09c4e2020-03-24 13:58:01 +01003
4package supervisor
5
6import (
7 "context"
8 "fmt"
9 "testing"
10 "time"
Serge Bazanski26d52252022-02-07 15:57:54 +010011
Tim Windelschmidt9f21f532024-05-07 15:14:20 +020012 "source.monogon.dev/osbase/logtree"
Serge Bazanski9c09c4e2020-03-24 13:58:01 +010013)
14
Serge Bazanski35e43d12021-07-06 13:12:14 +020015// waitSettle waits until the supervisor reaches a 'settled' state - ie., one
16// where no actions have been performed for a number of GC cycles.
17// This is used in tests only.
18func (s *supervisor) waitSettle(ctx context.Context) error {
19 waiter := make(chan struct{})
20 s.pReq <- &processorRequest{
21 waitSettled: &processorRequestWaitSettled{
22 waiter: waiter,
23 },
24 }
25
26 select {
27 case <-ctx.Done():
28 return ctx.Err()
29 case <-waiter:
30 return nil
31 }
32}
33
34// waitSettleError wraps waitSettle to fail a test if an error occurs, eg. the
35// context is canceled.
36func (s *supervisor) waitSettleError(ctx context.Context, t *testing.T) {
37 err := s.waitSettle(ctx)
38 if err != nil {
39 t.Fatalf("waitSettle: %v", err)
40 }
41}
42
Serge Bazanski9c09c4e2020-03-24 13:58:01 +010043func runnableBecomesHealthy(healthy, done chan struct{}) Runnable {
44 return func(ctx context.Context) error {
45 Signal(ctx, SignalHealthy)
46
47 go func() {
48 if healthy != nil {
49 healthy <- struct{}{}
50 }
51 }()
52
53 <-ctx.Done()
54
Serge Bazanski579015a2021-11-18 13:20:20 +010055 if done != nil {
56 done <- struct{}{}
57 }
Serge Bazanski9c09c4e2020-03-24 13:58:01 +010058
59 return ctx.Err()
60 }
61}
62
63func runnableSpawnsMore(healthy, done chan struct{}, levels int) Runnable {
64 return func(ctx context.Context) error {
65 if levels > 0 {
66 err := RunGroup(ctx, map[string]Runnable{
67 "a": runnableSpawnsMore(nil, nil, levels-1),
68 "b": runnableSpawnsMore(nil, nil, levels-1),
69 })
70 if err != nil {
71 return err
72 }
73 }
74
75 Signal(ctx, SignalHealthy)
76
77 go func() {
78 if healthy != nil {
79 healthy <- struct{}{}
80 }
81 }()
82
83 <-ctx.Done()
84
Serge Bazanski579015a2021-11-18 13:20:20 +010085 if done != nil {
86 done <- struct{}{}
87 }
Serge Bazanski9c09c4e2020-03-24 13:58:01 +010088 return ctx.Err()
89 }
90}
91
// rc is a Remote Controlled runnable. It is a generic runnable used for
// testing the supervisor: a test sends commands over req, and the Runnable
// produced by rc.runnable carries them out.
type rc struct {
	req chan rcRunnableRequest
}

// rcRunnableRequest is a single command for a remote controlled runnable.
// stateC is only used with rcRunnableCommandState, to carry the current state
// back to the requester.
type rcRunnableRequest struct {
	cmd    rcRunnableCommand
	stateC chan rcRunnableState
}

// rcRunnableCommand is a command understood by a remote controlled runnable.
type rcRunnableCommand int

const (
	// rcRunnableCommandBecomeHealthy makes the runnable signal healthy.
	rcRunnableCommandBecomeHealthy rcRunnableCommand = iota
	// rcRunnableCommandBecomeDone makes the runnable signal done.
	rcRunnableCommandBecomeDone
	// rcRunnableCommandDie makes the runnable return an error.
	rcRunnableCommandDie
	// rcRunnableCommandPanic makes the runnable panic.
	rcRunnableCommandPanic
	// rcRunnableCommandState asks the runnable to report its state.
	rcRunnableCommandState
)

// rcRunnableState is the state reported by a remote controlled runnable.
type rcRunnableState int

const (
	// rcRunnableStateNew: running, but no signal has been sent yet.
	rcRunnableStateNew rcRunnableState = iota
	// rcRunnableStateHealthy: the runnable has signaled healthy.
	rcRunnableStateHealthy
	// rcRunnableStateDone: the runnable has signaled done.
	rcRunnableStateDone
)

// becomeHealthy asks the runnable to signal healthy.
func (r *rc) becomeHealthy() {
	cmd := rcRunnableRequest{cmd: rcRunnableCommandBecomeHealthy}
	r.req <- cmd
}

// becomeDone asks the runnable to signal done.
func (r *rc) becomeDone() {
	cmd := rcRunnableRequest{cmd: rcRunnableCommandBecomeDone}
	r.req <- cmd
}

// die asks the runnable to return an error.
func (r *rc) die() {
	cmd := rcRunnableRequest{cmd: rcRunnableCommandDie}
	r.req <- cmd
}

// panic asks the runnable to panic.
func (r *rc) panic() {
	cmd := rcRunnableRequest{cmd: rcRunnableCommandPanic}
	r.req <- cmd
}

// state asks the runnable for its current state and blocks until it answers.
func (r *rc) state() rcRunnableState {
	reply := make(chan rcRunnableState)
	r.req <- rcRunnableRequest{cmd: rcRunnableCommandState, stateC: reply}
	return <-reply
}

// waitState re-queries the runnable every 10ms until it reports state s.
// Polling keeps the remote controlled runnable itself simple.
func (r *rc) waitState(s rcRunnableState) {
	for r.state() != s {
		time.Sleep(10 * time.Millisecond)
	}
}

// newRC returns a remote controller with an unbuffered request channel, so
// every command blocks until a running instance of the runnable receives it.
func newRC() *rc {
	return &rc{req: make(chan rcRunnableRequest)}
}
162
163// Remote Controlled Runnable
164func (r *rc) runnable() Runnable {
165 return func(ctx context.Context) error {
166 state := rcRunnableStateNew
167
168 for {
169 select {
170 case <-ctx.Done():
171 return ctx.Err()
172 case r := <-r.req:
173 switch r.cmd {
174 case rcRunnableCommandBecomeHealthy:
175 Signal(ctx, SignalHealthy)
176 state = rcRunnableStateHealthy
177 case rcRunnableCommandBecomeDone:
178 Signal(ctx, SignalDone)
179 state = rcRunnableStateDone
180 case rcRunnableCommandDie:
181 return fmt.Errorf("died on request")
182 case rcRunnableCommandPanic:
183 panic("at the disco")
184 case rcRunnableCommandState:
185 r.stateC <- state
186 }
187 }
188 }
189 }
190}
191
192func TestSimple(t *testing.T) {
193 h1 := make(chan struct{})
194 d1 := make(chan struct{})
195 h2 := make(chan struct{})
196 d2 := make(chan struct{})
197
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100198 ctx, ctxC := context.WithCancel(context.Background())
199 defer ctxC()
Serge Bazanskic7359672020-10-30 16:38:57 +0100200 s := New(ctx, func(ctx context.Context) error {
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100201 err := RunGroup(ctx, map[string]Runnable{
202 "one": runnableBecomesHealthy(h1, d1),
203 "two": runnableBecomesHealthy(h2, d2),
204 })
205 if err != nil {
206 return err
207 }
208 Signal(ctx, SignalHealthy)
209 Signal(ctx, SignalDone)
210 return nil
Serge Bazanski19bb4122020-05-04 17:57:50 +0200211 }, WithPropagatePanic)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100212
213 // Expect both to start running.
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200214 s.waitSettleError(ctx, t)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100215 select {
216 case <-h1:
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200217 default:
218 t.Fatalf("runnable 'one' didn't start")
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100219 }
220 select {
221 case <-h2:
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200222 default:
223 t.Fatalf("runnable 'one' didn't start")
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100224 }
225}
226
227func TestSimpleFailure(t *testing.T) {
228 h1 := make(chan struct{})
229 d1 := make(chan struct{})
230 two := newRC()
231
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200232 ctx, ctxC := context.WithTimeout(context.Background(), 10*time.Second)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100233 defer ctxC()
Serge Bazanskic7359672020-10-30 16:38:57 +0100234 s := New(ctx, func(ctx context.Context) error {
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100235 err := RunGroup(ctx, map[string]Runnable{
236 "one": runnableBecomesHealthy(h1, d1),
237 "two": two.runnable(),
238 })
239 if err != nil {
240 return err
241 }
242 Signal(ctx, SignalHealthy)
243 Signal(ctx, SignalDone)
244 return nil
Serge Bazanski19bb4122020-05-04 17:57:50 +0200245 }, WithPropagatePanic)
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200246 s.waitSettleError(ctx, t)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100247
248 two.becomeHealthy()
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200249 s.waitSettleError(ctx, t)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100250 // Expect one to start running.
251 select {
252 case <-h1:
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200253 default:
254 t.Fatalf("runnable 'one' didn't start")
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100255 }
256
257 // Kill off two, one should restart.
258 two.die()
Serge Bazanski579015a2021-11-18 13:20:20 +0100259 <-d1
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100260
261 // And one should start running again.
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200262 s.waitSettleError(ctx, t)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100263 select {
264 case <-h1:
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200265 default:
266 t.Fatalf("runnable 'one' didn't restart")
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100267 }
268}
269
270func TestDeepFailure(t *testing.T) {
271 h1 := make(chan struct{})
272 d1 := make(chan struct{})
273 two := newRC()
274
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200275 ctx, ctxC := context.WithTimeout(context.Background(), 10*time.Second)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100276 defer ctxC()
Serge Bazanskic7359672020-10-30 16:38:57 +0100277 s := New(ctx, func(ctx context.Context) error {
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100278 err := RunGroup(ctx, map[string]Runnable{
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200279 "one": runnableSpawnsMore(h1, d1, 5),
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100280 "two": two.runnable(),
281 })
282 if err != nil {
283 return err
284 }
285 Signal(ctx, SignalHealthy)
286 Signal(ctx, SignalDone)
287 return nil
Serge Bazanski19bb4122020-05-04 17:57:50 +0200288 }, WithPropagatePanic)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100289
290 two.becomeHealthy()
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200291 s.waitSettleError(ctx, t)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100292 // Expect one to start running.
293 select {
294 case <-h1:
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200295 default:
296 t.Fatalf("runnable 'one' didn't start")
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100297 }
298
299 // Kill off two, one should restart.
300 two.die()
Serge Bazanski579015a2021-11-18 13:20:20 +0100301 <-d1
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100302
303 // And one should start running again.
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200304 s.waitSettleError(ctx, t)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100305 select {
306 case <-h1:
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200307 default:
308 t.Fatalf("runnable 'one' didn't restart")
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100309 }
310}
311
312func TestPanic(t *testing.T) {
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100313 h1 := make(chan struct{})
314 d1 := make(chan struct{})
315 two := newRC()
316
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100317 ctx, ctxC := context.WithCancel(context.Background())
318 defer ctxC()
Serge Bazanskic7359672020-10-30 16:38:57 +0100319 s := New(ctx, func(ctx context.Context) error {
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100320 err := RunGroup(ctx, map[string]Runnable{
321 "one": runnableBecomesHealthy(h1, d1),
322 "two": two.runnable(),
323 })
324 if err != nil {
325 return err
326 }
327 Signal(ctx, SignalHealthy)
328 Signal(ctx, SignalDone)
329 return nil
330 })
331
332 two.becomeHealthy()
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200333 s.waitSettleError(ctx, t)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100334 // Expect one to start running.
335 select {
336 case <-h1:
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200337 default:
338 t.Fatalf("runnable 'one' didn't start")
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100339 }
340
341 // Kill off two, one should restart.
342 two.panic()
Serge Bazanski579015a2021-11-18 13:20:20 +0100343 <-d1
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100344
345 // And one should start running again.
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200346 s.waitSettleError(ctx, t)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100347 select {
348 case <-h1:
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200349 default:
350 t.Fatalf("runnable 'one' didn't restart")
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100351 }
352}
353
354func TestMultipleLevelFailure(t *testing.T) {
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100355 ctx, ctxC := context.WithCancel(context.Background())
356 defer ctxC()
Serge Bazanskic7359672020-10-30 16:38:57 +0100357 New(ctx, func(ctx context.Context) error {
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100358 err := RunGroup(ctx, map[string]Runnable{
359 "one": runnableSpawnsMore(nil, nil, 4),
360 "two": runnableSpawnsMore(nil, nil, 4),
361 })
362 if err != nil {
363 return err
364 }
365 Signal(ctx, SignalHealthy)
366 Signal(ctx, SignalDone)
367 return nil
Serge Bazanski19bb4122020-05-04 17:57:50 +0200368 }, WithPropagatePanic)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100369}
370
371func TestBackoff(t *testing.T) {
372 one := newRC()
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200373
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200374 ctx, ctxC := context.WithTimeout(context.Background(), 20*time.Second)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100375 defer ctxC()
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200376
Serge Bazanskic7359672020-10-30 16:38:57 +0100377 s := New(ctx, func(ctx context.Context) error {
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100378 if err := Run(ctx, "one", one.runnable()); err != nil {
379 return err
380 }
381 Signal(ctx, SignalHealthy)
382 Signal(ctx, SignalDone)
383 return nil
Serge Bazanski19bb4122020-05-04 17:57:50 +0200384 }, WithPropagatePanic)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100385
386 one.becomeHealthy()
Serge Bazanski216fe7b2021-05-21 18:36:16 +0200387 // Die a bunch of times in a row, this brings up the next exponential
388 // backoff to over a second.
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100389 for i := 0; i < 4; i += 1 {
390 one.die()
391 one.waitState(rcRunnableStateNew)
392 }
Serge Bazanski216fe7b2021-05-21 18:36:16 +0200393 // Measure how long it takes for the runnable to respawn after a number of
394 // failures
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100395 start := time.Now()
396 one.die()
397 one.becomeHealthy()
398 one.waitState(rcRunnableStateHealthy)
399 taken := time.Since(start)
400 if taken < 1*time.Second {
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200401 t.Errorf("Runnable took %v to restart, wanted at least a second from backoff", taken)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100402 }
403
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200404 s.waitSettleError(ctx, t)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100405 // Now that we've become healthy, die again. Becoming healthy resets the backoff.
406 start = time.Now()
407 one.die()
408 one.becomeHealthy()
409 one.waitState(rcRunnableStateHealthy)
410 taken = time.Since(start)
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200411 if taken > 1*time.Second || taken < 100*time.Millisecond {
412 t.Errorf("Runnable took %v to restart, wanted at least 100ms from backoff and at most 1s from backoff reset", taken)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100413 }
414}
415
Jan Schär65602092024-12-19 10:37:34 +0100416// TestCancelRestart fails a runnable, but before its restart timeout expires,
417// also fails its parent. This should cause cancelation of the restart timeout.
418func TestCancelRestart(t *testing.T) {
419 startedOuter := make(chan struct{})
420 failInner := make(chan struct{})
421 failOuter := make(chan struct{})
422
423 ctx, ctxC := context.WithCancel(context.Background())
424 defer ctxC()
425
426 New(ctx, func(ctx context.Context) error {
427 <-startedOuter
428 err := Run(ctx, "inner", func(ctx context.Context) error {
429 <-failInner
430 return fmt.Errorf("failed")
431 })
432 if err != nil {
433 return err
434 }
435 <-failOuter
436 return fmt.Errorf("failed")
437 }, WithPropagatePanic)
438
439 startedOuter <- struct{}{}
440 failInner <- struct{}{}
441 time.Sleep(10 * time.Millisecond)
442 // Before the inner runnable has restarted, fail the outer runnable.
443 failOuter <- struct{}{}
444
445 start := time.Now()
446 startedOuter <- struct{}{}
447 taken := time.Since(start)
448 // With the default backoff parameters, the initial backoff time is
449 // 0.5s +- 0.25s because of randomization. If the inner restart timer is not
450 // canceled, then it takes twice as long.
451 if taken > 1*time.Second {
452 t.Errorf("Runnable took %v to restart, wanted at most 1s", taken)
453 }
454}
455
Jan Schär08c1c722024-12-19 12:03:17 +0100456// TestDoneDelay test that a node is only considered restartable once it has
457// returned, not already when it has signaled Done. Otherwise, we can get into
458// an inconsistent state and for example panic because the node no longer
459// exists once the runnable returns.
460func TestDoneDelay(t *testing.T) {
461 startedInner := make(chan struct{})
462 failOuter := make(chan struct{})
463
464 ctx, ctxC := context.WithCancel(context.Background())
465 defer ctxC()
466
467 New(ctx, func(ctx context.Context) error {
468 err := Run(ctx, "inner", func(ctx context.Context) error {
469 Signal(ctx, SignalHealthy)
470 Signal(ctx, SignalDone)
471 <-startedInner
472 time.Sleep(10 * time.Millisecond)
473 return nil
474 })
475 if err != nil {
476 return err
477 }
478 <-failOuter
479 return fmt.Errorf("failed")
480 }, WithPropagatePanic)
481
482 startedInner <- struct{}{}
483 failOuter <- struct{}{}
484 time.Sleep(20 * time.Millisecond)
485}
486
Jan Schärfce7c762024-12-19 14:07:24 +0100487// TestCancelDoneSibling tests that a node in state DONE is restarted if it is
488// canceled because a sibling has died.
489func TestCancelDoneSibling(t *testing.T) {
490 innerRunning := make(chan struct{})
491 innerExit := make(chan struct{})
492 failSibling := make(chan struct{})
493
494 ctx, ctxC := context.WithCancel(context.Background())
495 defer ctxC()
496
497 New(ctx, func(ctx context.Context) error {
498 err := RunGroup(ctx, map[string]Runnable{
499 "done": func(ctx context.Context) error {
500 err := Run(ctx, "inner", func(ctx context.Context) error {
501 <-innerRunning
502 <-ctx.Done()
503 <-innerExit
504 return ctx.Err()
505 })
506 if err != nil {
507 return err
508 }
509 Signal(ctx, SignalHealthy)
510 Signal(ctx, SignalDone)
511 return nil
512 },
513 "sibling": func(ctx context.Context) error {
514 <-failSibling
515 return fmt.Errorf("failed")
516 },
517 })
518 if err != nil {
519 return err
520 }
521 Signal(ctx, SignalHealthy)
522 Signal(ctx, SignalDone)
523 return nil
524 }, WithPropagatePanic)
525
526 innerRunning <- struct{}{}
527 failSibling <- struct{}{}
528 // The inner node should exit and start running again.
529 innerExit <- struct{}{}
530 innerRunning <- struct{}{}
531}
532
Serge Bazanski216fe7b2021-05-21 18:36:16 +0200533// TestResilience throws some curveballs at the supervisor - either programming
534// errors or high load. It then ensures that another runnable is running, and
535// that it restarts on its sibling failure.
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100536func TestResilience(t *testing.T) {
537 // request/response channel for testing liveness of the 'one' runnable
538 req := make(chan chan struct{})
539
540 // A runnable that responds on the 'req' channel.
541 one := func(ctx context.Context) error {
542 Signal(ctx, SignalHealthy)
543 for {
544 select {
545 case <-ctx.Done():
546 return ctx.Err()
547 case r := <-req:
548 r <- struct{}{}
549 }
550 }
551 }
552 oneSibling := newRC()
553
554 oneTest := func() {
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200555 timeout := time.NewTicker(1000 * time.Millisecond)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100556 ping := make(chan struct{})
557 req <- ping
558 select {
559 case <-ping:
560 case <-timeout.C:
561 t.Fatalf("one ping response timeout")
562 }
563 timeout.Stop()
564 }
565
Serge Bazanski216fe7b2021-05-21 18:36:16 +0200566 // A nasty runnable that calls Signal with the wrong context (this is a
567 // programming error)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100568 two := func(ctx context.Context) error {
569 Signal(context.TODO(), SignalHealthy)
570 return nil
571 }
572
573 // A nasty runnable that calls Signal wrong (this is a programming error).
574 three := func(ctx context.Context) error {
575 Signal(ctx, SignalDone)
576 return nil
577 }
578
579 // A nasty runnable that runs in a busy loop (this is a programming error).
580 four := func(ctx context.Context) error {
581 for {
582 time.Sleep(0)
583 }
584 }
585
586 // A nasty runnable that keeps creating more runnables.
587 five := func(ctx context.Context) error {
588 i := 1
589 for {
590 err := Run(ctx, fmt.Sprintf("r%d", i), runnableSpawnsMore(nil, nil, 2))
591 if err != nil {
592 return err
593 }
594
595 time.Sleep(100 * time.Millisecond)
596 i += 1
597 }
598 }
599
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100600 ctx, ctxC := context.WithCancel(context.Background())
601 defer ctxC()
Serge Bazanskic7359672020-10-30 16:38:57 +0100602 New(ctx, func(ctx context.Context) error {
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100603 RunGroup(ctx, map[string]Runnable{
604 "one": one,
605 "oneSibling": oneSibling.runnable(),
606 })
607 rs := map[string]Runnable{
608 "two": two, "three": three, "four": four, "five": five,
609 }
610 for k, v := range rs {
611 if err := Run(ctx, k, v); err != nil {
612 return err
613 }
614 }
615 Signal(ctx, SignalHealthy)
616 Signal(ctx, SignalDone)
617 return nil
618 })
619
620 // Five rounds of letting one run, then restarting it.
621 for i := 0; i < 5; i += 1 {
622 oneSibling.becomeHealthy()
623 oneSibling.waitState(rcRunnableStateHealthy)
624
625 // 'one' should work for at least a second.
626 deadline := time.Now().Add(1 * time.Second)
627 for {
628 if time.Now().After(deadline) {
629 break
630 }
631
632 oneTest()
633 }
634
635 // Killing 'oneSibling' should restart one.
636 oneSibling.panic()
637 }
638 // Make sure 'one' is still okay.
639 oneTest()
640}
641
Serge Bazanski26d52252022-02-07 15:57:54 +0100642// TestSubLoggers exercises the reserved/sub-logger functionality of runnable
643// nodes. It ensures a sub-logger and runnable cannot have colliding names, and
644// that logging actually works.
645func TestSubLoggers(t *testing.T) {
646 ctx, ctxC := context.WithCancel(context.Background())
647 defer ctxC()
648
649 errCA := make(chan error)
650 errCB := make(chan error)
651 lt := logtree.New()
652 New(ctx, func(ctx context.Context) error {
653 err := RunGroup(ctx, map[string]Runnable{
654 // foo will first create a sublogger, then attempt to create a
655 // colliding runnable.
656 "foo": func(ctx context.Context) error {
657 sl, err := SubLogger(ctx, "dut")
658 if err != nil {
659 errCA <- fmt.Errorf("creating sub-logger: %w", err)
660 return nil
661 }
662 sl.Infof("hello from foo.dut")
663 err = Run(ctx, "dut", runnableBecomesHealthy(nil, nil))
664 if err == nil {
665 errCA <- fmt.Errorf("creating colliding runnable should have failed")
666 return nil
667 }
668 Signal(ctx, SignalHealthy)
669 Signal(ctx, SignalDone)
670 errCA <- nil
671 return nil
672 },
673 })
674 if err != nil {
675 return err
676 }
677 _, err = SubLogger(ctx, "foo")
678 if err == nil {
679 errCB <- fmt.Errorf("creating collising sub-logger should have failed")
680 return nil
681 }
682 Signal(ctx, SignalHealthy)
683 Signal(ctx, SignalDone)
684 errCB <- nil
685 return nil
686 }, WithPropagatePanic, WithExistingLogtree(lt))
687
688 err := <-errCA
689 if err != nil {
690 t.Fatalf("from root.foo: %v", err)
691 }
692 err = <-errCB
693 if err != nil {
694 t.Fatalf("from root: %v", err)
695 }
696
697 // Now enure that the expected message appears in the logtree.
698 dn := logtree.DN("root.foo.dut")
699 r, err := lt.Read(dn, logtree.WithBacklog(logtree.BacklogAllAvailable))
700 if err != nil {
701 t.Fatalf("logtree read failed: %v", err)
702 }
703 defer r.Close()
704 found := false
705 for _, e := range r.Backlog {
706 if e.DN != dn {
707 continue
708 }
709 if e.Leveled == nil {
710 continue
711 }
712 if e.Leveled.MessagesJoined() != "hello from foo.dut" {
713 continue
714 }
715 found = true
716 break
717 }
718 if !found {
719 t.Fatalf("did not find expected logline in %s", dn)
720 }
721}
722
Serge Bazanskicf864da2024-07-31 11:23:34 +0000723func TestMetrics(t *testing.T) {
724 ctx, ctxC := context.WithCancel(context.Background())
725 defer ctxC()
726
727 // Build a supervision tree with 'wait'/step channels per runnable:
728 //
729 // root: wait, start one, wait, healthy
730 // one: wait, start two, crash, wait, start two, healthy, wait, done
731 // two: wait, healthy, run forever
732 //
733 // This tree allows us to exercise a few flows, like two getting canceled when
734 // one crashes, runnables returning done, runnables staying healthy, etc.
735
736 stepRoot := make(chan struct{})
737 stepOne := make(chan struct{})
738 stepTwo := make(chan struct{})
739 m := InMemoryMetrics{}
740
741 New(ctx, func(ctx context.Context) error {
742 <-stepRoot
743
744 attempts := 0
745 Run(ctx, "one", func(ctx context.Context) error {
746 <-stepOne
747 Run(ctx, "two", func(ctx context.Context) error {
748 <-stepTwo
749 Signal(ctx, SignalHealthy)
750 <-ctx.Done()
751 return ctx.Err()
752 })
753 if attempts == 0 {
754 attempts += 1
755 return fmt.Errorf("failed")
756 }
757 Signal(ctx, SignalHealthy)
758 <-stepOne
759 Signal(ctx, SignalDone)
760 return nil
761 })
762
763 <-stepRoot
764 Signal(ctx, SignalHealthy)
765 return nil
766 }, WithPropagatePanic, WithMetrics(&m))
767
768 // expectDN waits a second until a given DN is at a given state and fails the
769 // test otherwise.
770 expectDN := func(dn string, state NodeState) {
771 t.Helper()
772 start := time.Now()
773 for {
774 snap := m.DNs()
775 if _, ok := snap[dn]; !ok {
776 if time.Since(start) > time.Second {
777 t.Fatalf("No DN %q", dn)
778 } else {
779 time.Sleep(100 * time.Millisecond)
780 continue
781 }
782 }
783 if want, got := state, snap[dn].State; want != got {
784 if time.Since(start) > time.Second {
785 t.Fatalf("Expected %q to be %s, got %s", dn, want, got)
786 } else {
787 time.Sleep(100 * time.Millisecond)
788 continue
789 }
790 }
791 break
792 }
793 }
794
795 // Make progress thorugh the runnable tree and check expected states.
796
797 expectDN("root", NodeStateNew)
798
799 stepRoot <- struct{}{}
800 expectDN("root", NodeStateNew)
801 expectDN("root.one", NodeStateNew)
802
803 stepOne <- struct{}{}
804 stepTwo <- struct{}{}
805 expectDN("root", NodeStateNew)
806 expectDN("root.one", NodeStateDead)
807 expectDN("root.one.two", NodeStateCanceled)
808
809 stepOne <- struct{}{}
810 expectDN("root", NodeStateNew)
811 expectDN("root.one", NodeStateHealthy)
812 expectDN("root.one.two", NodeStateNew)
813
814 stepOne <- struct{}{}
815 expectDN("root", NodeStateNew)
816 expectDN("root.one", NodeStateDone)
817 expectDN("root.one.two", NodeStateNew)
818
819 stepTwo <- struct{}{}
820 expectDN("root", NodeStateNew)
821 expectDN("root.one", NodeStateDone)
822 expectDN("root.one.two", NodeStateHealthy)
823}
824
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100825func ExampleNew() {
826 // Minimal runnable that is immediately done.
827 childC := make(chan struct{})
828 child := func(ctx context.Context) error {
829 Signal(ctx, SignalHealthy)
830 close(childC)
831 Signal(ctx, SignalDone)
832 return nil
833 }
834
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100835 // Start a supervision tree with a root runnable.
836 ctx, ctxC := context.WithCancel(context.Background())
837 defer ctxC()
Serge Bazanskic7359672020-10-30 16:38:57 +0100838 New(ctx, func(ctx context.Context) error {
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100839 err := Run(ctx, "child", child)
840 if err != nil {
841 return fmt.Errorf("could not run 'child': %w", err)
842 }
843 Signal(ctx, SignalHealthy)
844
845 t := time.NewTicker(time.Second)
846 defer t.Stop()
847
848 // Do something in the background, and exit on context cancel.
849 for {
850 select {
851 case <-t.C:
852 fmt.Printf("tick!")
853 case <-ctx.Done():
854 return ctx.Err()
855 }
856 }
857 })
858
859 // root.child will close this channel.
860 <-childC
861}