blob: 3b8129193f7f2749998f2b4e59ed514abb6cf795 [file] [log] [blame]
Serge Bazanski9c09c4e2020-03-24 13:58:01 +01001// Copyright 2020 The Monogon Project Authors.
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17package supervisor
18
19import (
20 "context"
21 "fmt"
22 "testing"
23 "time"
Serge Bazanski26d52252022-02-07 15:57:54 +010024
Tim Windelschmidt9f21f532024-05-07 15:14:20 +020025 "source.monogon.dev/osbase/logtree"
Serge Bazanski9c09c4e2020-03-24 13:58:01 +010026)
27
Serge Bazanski35e43d12021-07-06 13:12:14 +020028// waitSettle waits until the supervisor reaches a 'settled' state - ie., one
29// where no actions have been performed for a number of GC cycles.
30// This is used in tests only.
31func (s *supervisor) waitSettle(ctx context.Context) error {
32 waiter := make(chan struct{})
33 s.pReq <- &processorRequest{
34 waitSettled: &processorRequestWaitSettled{
35 waiter: waiter,
36 },
37 }
38
39 select {
40 case <-ctx.Done():
41 return ctx.Err()
42 case <-waiter:
43 return nil
44 }
45}
46
47// waitSettleError wraps waitSettle to fail a test if an error occurs, eg. the
48// context is canceled.
49func (s *supervisor) waitSettleError(ctx context.Context, t *testing.T) {
50 err := s.waitSettle(ctx)
51 if err != nil {
52 t.Fatalf("waitSettle: %v", err)
53 }
54}
55
Serge Bazanski9c09c4e2020-03-24 13:58:01 +010056func runnableBecomesHealthy(healthy, done chan struct{}) Runnable {
57 return func(ctx context.Context) error {
58 Signal(ctx, SignalHealthy)
59
60 go func() {
61 if healthy != nil {
62 healthy <- struct{}{}
63 }
64 }()
65
66 <-ctx.Done()
67
Serge Bazanski579015a2021-11-18 13:20:20 +010068 if done != nil {
69 done <- struct{}{}
70 }
Serge Bazanski9c09c4e2020-03-24 13:58:01 +010071
72 return ctx.Err()
73 }
74}
75
76func runnableSpawnsMore(healthy, done chan struct{}, levels int) Runnable {
77 return func(ctx context.Context) error {
78 if levels > 0 {
79 err := RunGroup(ctx, map[string]Runnable{
80 "a": runnableSpawnsMore(nil, nil, levels-1),
81 "b": runnableSpawnsMore(nil, nil, levels-1),
82 })
83 if err != nil {
84 return err
85 }
86 }
87
88 Signal(ctx, SignalHealthy)
89
90 go func() {
91 if healthy != nil {
92 healthy <- struct{}{}
93 }
94 }()
95
96 <-ctx.Done()
97
Serge Bazanski579015a2021-11-18 13:20:20 +010098 if done != nil {
99 done <- struct{}{}
100 }
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100101 return ctx.Err()
102 }
103}
104
// rc is a Remote Controlled runnable. It is a generic runnable used for
// testing the supervisor: tests send commands over req, and the Runnable
// returned by runnable() executes them.
type rc struct {
	req chan rcRunnableRequest
}

type (
	// rcRunnableRequest is a single command for a remote controlled runnable.
	// stateC is only used with rcRunnableCommandState, to carry the current
	// state back to the requester.
	rcRunnableRequest struct {
		cmd    rcRunnableCommand
		stateC chan rcRunnableState
	}

	// rcRunnableCommand is an action a remote controlled runnable can be
	// asked to perform.
	rcRunnableCommand int

	// rcRunnableState is the state a remote controlled runnable reports back.
	rcRunnableState int
)

const (
	rcRunnableCommandBecomeHealthy rcRunnableCommand = iota
	rcRunnableCommandBecomeDone
	rcRunnableCommandDie
	rcRunnableCommandPanic
	rcRunnableCommandState
)

const (
	rcRunnableStateNew rcRunnableState = iota
	rcRunnableStateHealthy
	rcRunnableStateDone
)

// sendCommand delivers a command that expects no reply. The channel is
// unbuffered, so this blocks until the runnable picks the command up.
func (r *rc) sendCommand(cmd rcRunnableCommand) {
	r.req <- rcRunnableRequest{cmd: cmd}
}

func (r *rc) becomeHealthy() {
	r.sendCommand(rcRunnableCommandBecomeHealthy)
}

func (r *rc) becomeDone() {
	r.sendCommand(rcRunnableCommandBecomeDone)
}

func (r *rc) die() {
	r.sendCommand(rcRunnableCommandDie)
}

func (r *rc) panic() {
	r.sendCommand(rcRunnableCommandPanic)
}

// state asks the runnable for its current state and waits for the answer.
func (r *rc) state() rcRunnableState {
	reply := make(chan rcRunnableState)
	r.req <- rcRunnableRequest{cmd: rcRunnableCommandState, stateC: reply}
	return <-reply
}

// waitState blocks until the runnable reports state s.
func (r *rc) waitState(s rcRunnableState) {
	// Polling keeps the runnable side of the protocol trivial; a push-based
	// notification would complicate it for little gain.
	for r.state() != s {
		time.Sleep(10 * time.Millisecond)
	}
}

// newRC returns a remote controlled runnable with an unbuffered command
// channel.
func newRC() *rc {
	r := &rc{}
	r.req = make(chan rcRunnableRequest)
	return r
}
175
176// Remote Controlled Runnable
177func (r *rc) runnable() Runnable {
178 return func(ctx context.Context) error {
179 state := rcRunnableStateNew
180
181 for {
182 select {
183 case <-ctx.Done():
184 return ctx.Err()
185 case r := <-r.req:
186 switch r.cmd {
187 case rcRunnableCommandBecomeHealthy:
188 Signal(ctx, SignalHealthy)
189 state = rcRunnableStateHealthy
190 case rcRunnableCommandBecomeDone:
191 Signal(ctx, SignalDone)
192 state = rcRunnableStateDone
193 case rcRunnableCommandDie:
194 return fmt.Errorf("died on request")
195 case rcRunnableCommandPanic:
196 panic("at the disco")
197 case rcRunnableCommandState:
198 r.stateC <- state
199 }
200 }
201 }
202 }
203}
204
205func TestSimple(t *testing.T) {
206 h1 := make(chan struct{})
207 d1 := make(chan struct{})
208 h2 := make(chan struct{})
209 d2 := make(chan struct{})
210
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100211 ctx, ctxC := context.WithCancel(context.Background())
212 defer ctxC()
Serge Bazanskic7359672020-10-30 16:38:57 +0100213 s := New(ctx, func(ctx context.Context) error {
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100214 err := RunGroup(ctx, map[string]Runnable{
215 "one": runnableBecomesHealthy(h1, d1),
216 "two": runnableBecomesHealthy(h2, d2),
217 })
218 if err != nil {
219 return err
220 }
221 Signal(ctx, SignalHealthy)
222 Signal(ctx, SignalDone)
223 return nil
Serge Bazanski19bb4122020-05-04 17:57:50 +0200224 }, WithPropagatePanic)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100225
226 // Expect both to start running.
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200227 s.waitSettleError(ctx, t)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100228 select {
229 case <-h1:
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200230 default:
231 t.Fatalf("runnable 'one' didn't start")
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100232 }
233 select {
234 case <-h2:
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200235 default:
236 t.Fatalf("runnable 'one' didn't start")
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100237 }
238}
239
240func TestSimpleFailure(t *testing.T) {
241 h1 := make(chan struct{})
242 d1 := make(chan struct{})
243 two := newRC()
244
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200245 ctx, ctxC := context.WithTimeout(context.Background(), 10*time.Second)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100246 defer ctxC()
Serge Bazanskic7359672020-10-30 16:38:57 +0100247 s := New(ctx, func(ctx context.Context) error {
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100248 err := RunGroup(ctx, map[string]Runnable{
249 "one": runnableBecomesHealthy(h1, d1),
250 "two": two.runnable(),
251 })
252 if err != nil {
253 return err
254 }
255 Signal(ctx, SignalHealthy)
256 Signal(ctx, SignalDone)
257 return nil
Serge Bazanski19bb4122020-05-04 17:57:50 +0200258 }, WithPropagatePanic)
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200259 s.waitSettleError(ctx, t)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100260
261 two.becomeHealthy()
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200262 s.waitSettleError(ctx, t)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100263 // Expect one to start running.
264 select {
265 case <-h1:
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200266 default:
267 t.Fatalf("runnable 'one' didn't start")
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100268 }
269
270 // Kill off two, one should restart.
271 two.die()
Serge Bazanski579015a2021-11-18 13:20:20 +0100272 <-d1
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100273
274 // And one should start running again.
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200275 s.waitSettleError(ctx, t)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100276 select {
277 case <-h1:
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200278 default:
279 t.Fatalf("runnable 'one' didn't restart")
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100280 }
281}
282
283func TestDeepFailure(t *testing.T) {
284 h1 := make(chan struct{})
285 d1 := make(chan struct{})
286 two := newRC()
287
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200288 ctx, ctxC := context.WithTimeout(context.Background(), 10*time.Second)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100289 defer ctxC()
Serge Bazanskic7359672020-10-30 16:38:57 +0100290 s := New(ctx, func(ctx context.Context) error {
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100291 err := RunGroup(ctx, map[string]Runnable{
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200292 "one": runnableSpawnsMore(h1, d1, 5),
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100293 "two": two.runnable(),
294 })
295 if err != nil {
296 return err
297 }
298 Signal(ctx, SignalHealthy)
299 Signal(ctx, SignalDone)
300 return nil
Serge Bazanski19bb4122020-05-04 17:57:50 +0200301 }, WithPropagatePanic)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100302
303 two.becomeHealthy()
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200304 s.waitSettleError(ctx, t)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100305 // Expect one to start running.
306 select {
307 case <-h1:
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200308 default:
309 t.Fatalf("runnable 'one' didn't start")
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100310 }
311
312 // Kill off two, one should restart.
313 two.die()
Serge Bazanski579015a2021-11-18 13:20:20 +0100314 <-d1
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100315
316 // And one should start running again.
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200317 s.waitSettleError(ctx, t)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100318 select {
319 case <-h1:
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200320 default:
321 t.Fatalf("runnable 'one' didn't restart")
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100322 }
323}
324
325func TestPanic(t *testing.T) {
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100326 h1 := make(chan struct{})
327 d1 := make(chan struct{})
328 two := newRC()
329
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100330 ctx, ctxC := context.WithCancel(context.Background())
331 defer ctxC()
Serge Bazanskic7359672020-10-30 16:38:57 +0100332 s := New(ctx, func(ctx context.Context) error {
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100333 err := RunGroup(ctx, map[string]Runnable{
334 "one": runnableBecomesHealthy(h1, d1),
335 "two": two.runnable(),
336 })
337 if err != nil {
338 return err
339 }
340 Signal(ctx, SignalHealthy)
341 Signal(ctx, SignalDone)
342 return nil
343 })
344
345 two.becomeHealthy()
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200346 s.waitSettleError(ctx, t)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100347 // Expect one to start running.
348 select {
349 case <-h1:
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200350 default:
351 t.Fatalf("runnable 'one' didn't start")
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100352 }
353
354 // Kill off two, one should restart.
355 two.panic()
Serge Bazanski579015a2021-11-18 13:20:20 +0100356 <-d1
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100357
358 // And one should start running again.
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200359 s.waitSettleError(ctx, t)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100360 select {
361 case <-h1:
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200362 default:
363 t.Fatalf("runnable 'one' didn't restart")
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100364 }
365}
366
367func TestMultipleLevelFailure(t *testing.T) {
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100368 ctx, ctxC := context.WithCancel(context.Background())
369 defer ctxC()
Serge Bazanskic7359672020-10-30 16:38:57 +0100370 New(ctx, func(ctx context.Context) error {
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100371 err := RunGroup(ctx, map[string]Runnable{
372 "one": runnableSpawnsMore(nil, nil, 4),
373 "two": runnableSpawnsMore(nil, nil, 4),
374 })
375 if err != nil {
376 return err
377 }
378 Signal(ctx, SignalHealthy)
379 Signal(ctx, SignalDone)
380 return nil
Serge Bazanski19bb4122020-05-04 17:57:50 +0200381 }, WithPropagatePanic)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100382}
383
384func TestBackoff(t *testing.T) {
385 one := newRC()
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200386
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200387 ctx, ctxC := context.WithTimeout(context.Background(), 20*time.Second)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100388 defer ctxC()
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200389
Serge Bazanskic7359672020-10-30 16:38:57 +0100390 s := New(ctx, func(ctx context.Context) error {
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100391 if err := Run(ctx, "one", one.runnable()); err != nil {
392 return err
393 }
394 Signal(ctx, SignalHealthy)
395 Signal(ctx, SignalDone)
396 return nil
Serge Bazanski19bb4122020-05-04 17:57:50 +0200397 }, WithPropagatePanic)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100398
399 one.becomeHealthy()
Serge Bazanski216fe7b2021-05-21 18:36:16 +0200400 // Die a bunch of times in a row, this brings up the next exponential
401 // backoff to over a second.
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100402 for i := 0; i < 4; i += 1 {
403 one.die()
404 one.waitState(rcRunnableStateNew)
405 }
Serge Bazanski216fe7b2021-05-21 18:36:16 +0200406 // Measure how long it takes for the runnable to respawn after a number of
407 // failures
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100408 start := time.Now()
409 one.die()
410 one.becomeHealthy()
411 one.waitState(rcRunnableStateHealthy)
412 taken := time.Since(start)
413 if taken < 1*time.Second {
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200414 t.Errorf("Runnable took %v to restart, wanted at least a second from backoff", taken)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100415 }
416
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200417 s.waitSettleError(ctx, t)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100418 // Now that we've become healthy, die again. Becoming healthy resets the backoff.
419 start = time.Now()
420 one.die()
421 one.becomeHealthy()
422 one.waitState(rcRunnableStateHealthy)
423 taken = time.Since(start)
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200424 if taken > 1*time.Second || taken < 100*time.Millisecond {
425 t.Errorf("Runnable took %v to restart, wanted at least 100ms from backoff and at most 1s from backoff reset", taken)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100426 }
427}
428
// TestCancelRestart fails a runnable, but before its restart timeout expires,
// also fails its parent. This should cause cancelation of the restart timeout.
func TestCancelRestart(t *testing.T) {
	// startedOuter gates every (re)start of the root runnable; failInner makes
	// the inner runnable fail, failOuter makes the root runnable fail.
	startedOuter := make(chan struct{})
	failInner := make(chan struct{})
	failOuter := make(chan struct{})

	ctx, ctxC := context.WithCancel(context.Background())
	defer ctxC()

	New(ctx, func(ctx context.Context) error {
		<-startedOuter
		err := Run(ctx, "inner", func(ctx context.Context) error {
			<-failInner
			return fmt.Errorf("failed")
		})
		if err != nil {
			return err
		}
		<-failOuter
		return fmt.Errorf("failed")
	}, WithPropagatePanic)

	startedOuter <- struct{}{}
	failInner <- struct{}{}
	// Give the supervisor a moment to observe the inner failure (and,
	// presumably, arm the inner restart timer).
	time.Sleep(10 * time.Millisecond)
	// Before the inner runnable has restarted, fail the outer runnable.
	failOuter <- struct{}{}

	// This send only completes once the root runnable has been started again
	// and is blocked on startedOuter, so it measures the root's restart time.
	start := time.Now()
	startedOuter <- struct{}{}
	taken := time.Since(start)
	// With the default backoff parameters, the initial backoff time is
	// 0.5s +- 0.25s because of randomization. If the inner restart timer is not
	// canceled, then it takes twice as long.
	if taken > 1*time.Second {
		t.Errorf("Runnable took %v to restart, wanted at most 1s", taken)
	}
}
468
Serge Bazanski216fe7b2021-05-21 18:36:16 +0200469// TestResilience throws some curveballs at the supervisor - either programming
470// errors or high load. It then ensures that another runnable is running, and
471// that it restarts on its sibling failure.
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100472func TestResilience(t *testing.T) {
473 // request/response channel for testing liveness of the 'one' runnable
474 req := make(chan chan struct{})
475
476 // A runnable that responds on the 'req' channel.
477 one := func(ctx context.Context) error {
478 Signal(ctx, SignalHealthy)
479 for {
480 select {
481 case <-ctx.Done():
482 return ctx.Err()
483 case r := <-req:
484 r <- struct{}{}
485 }
486 }
487 }
488 oneSibling := newRC()
489
490 oneTest := func() {
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200491 timeout := time.NewTicker(1000 * time.Millisecond)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100492 ping := make(chan struct{})
493 req <- ping
494 select {
495 case <-ping:
496 case <-timeout.C:
497 t.Fatalf("one ping response timeout")
498 }
499 timeout.Stop()
500 }
501
Serge Bazanski216fe7b2021-05-21 18:36:16 +0200502 // A nasty runnable that calls Signal with the wrong context (this is a
503 // programming error)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100504 two := func(ctx context.Context) error {
505 Signal(context.TODO(), SignalHealthy)
506 return nil
507 }
508
509 // A nasty runnable that calls Signal wrong (this is a programming error).
510 three := func(ctx context.Context) error {
511 Signal(ctx, SignalDone)
512 return nil
513 }
514
515 // A nasty runnable that runs in a busy loop (this is a programming error).
516 four := func(ctx context.Context) error {
517 for {
518 time.Sleep(0)
519 }
520 }
521
522 // A nasty runnable that keeps creating more runnables.
523 five := func(ctx context.Context) error {
524 i := 1
525 for {
526 err := Run(ctx, fmt.Sprintf("r%d", i), runnableSpawnsMore(nil, nil, 2))
527 if err != nil {
528 return err
529 }
530
531 time.Sleep(100 * time.Millisecond)
532 i += 1
533 }
534 }
535
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100536 ctx, ctxC := context.WithCancel(context.Background())
537 defer ctxC()
Serge Bazanskic7359672020-10-30 16:38:57 +0100538 New(ctx, func(ctx context.Context) error {
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100539 RunGroup(ctx, map[string]Runnable{
540 "one": one,
541 "oneSibling": oneSibling.runnable(),
542 })
543 rs := map[string]Runnable{
544 "two": two, "three": three, "four": four, "five": five,
545 }
546 for k, v := range rs {
547 if err := Run(ctx, k, v); err != nil {
548 return err
549 }
550 }
551 Signal(ctx, SignalHealthy)
552 Signal(ctx, SignalDone)
553 return nil
554 })
555
556 // Five rounds of letting one run, then restarting it.
557 for i := 0; i < 5; i += 1 {
558 oneSibling.becomeHealthy()
559 oneSibling.waitState(rcRunnableStateHealthy)
560
561 // 'one' should work for at least a second.
562 deadline := time.Now().Add(1 * time.Second)
563 for {
564 if time.Now().After(deadline) {
565 break
566 }
567
568 oneTest()
569 }
570
571 // Killing 'oneSibling' should restart one.
572 oneSibling.panic()
573 }
574 // Make sure 'one' is still okay.
575 oneTest()
576}
577
Serge Bazanski26d52252022-02-07 15:57:54 +0100578// TestSubLoggers exercises the reserved/sub-logger functionality of runnable
579// nodes. It ensures a sub-logger and runnable cannot have colliding names, and
580// that logging actually works.
581func TestSubLoggers(t *testing.T) {
582 ctx, ctxC := context.WithCancel(context.Background())
583 defer ctxC()
584
585 errCA := make(chan error)
586 errCB := make(chan error)
587 lt := logtree.New()
588 New(ctx, func(ctx context.Context) error {
589 err := RunGroup(ctx, map[string]Runnable{
590 // foo will first create a sublogger, then attempt to create a
591 // colliding runnable.
592 "foo": func(ctx context.Context) error {
593 sl, err := SubLogger(ctx, "dut")
594 if err != nil {
595 errCA <- fmt.Errorf("creating sub-logger: %w", err)
596 return nil
597 }
598 sl.Infof("hello from foo.dut")
599 err = Run(ctx, "dut", runnableBecomesHealthy(nil, nil))
600 if err == nil {
601 errCA <- fmt.Errorf("creating colliding runnable should have failed")
602 return nil
603 }
604 Signal(ctx, SignalHealthy)
605 Signal(ctx, SignalDone)
606 errCA <- nil
607 return nil
608 },
609 })
610 if err != nil {
611 return err
612 }
613 _, err = SubLogger(ctx, "foo")
614 if err == nil {
615 errCB <- fmt.Errorf("creating collising sub-logger should have failed")
616 return nil
617 }
618 Signal(ctx, SignalHealthy)
619 Signal(ctx, SignalDone)
620 errCB <- nil
621 return nil
622 }, WithPropagatePanic, WithExistingLogtree(lt))
623
624 err := <-errCA
625 if err != nil {
626 t.Fatalf("from root.foo: %v", err)
627 }
628 err = <-errCB
629 if err != nil {
630 t.Fatalf("from root: %v", err)
631 }
632
633 // Now enure that the expected message appears in the logtree.
634 dn := logtree.DN("root.foo.dut")
635 r, err := lt.Read(dn, logtree.WithBacklog(logtree.BacklogAllAvailable))
636 if err != nil {
637 t.Fatalf("logtree read failed: %v", err)
638 }
639 defer r.Close()
640 found := false
641 for _, e := range r.Backlog {
642 if e.DN != dn {
643 continue
644 }
645 if e.Leveled == nil {
646 continue
647 }
648 if e.Leveled.MessagesJoined() != "hello from foo.dut" {
649 continue
650 }
651 found = true
652 break
653 }
654 if !found {
655 t.Fatalf("did not find expected logline in %s", dn)
656 }
657}
658
Serge Bazanskicf864da2024-07-31 11:23:34 +0000659func TestMetrics(t *testing.T) {
660 ctx, ctxC := context.WithCancel(context.Background())
661 defer ctxC()
662
663 // Build a supervision tree with 'wait'/step channels per runnable:
664 //
665 // root: wait, start one, wait, healthy
666 // one: wait, start two, crash, wait, start two, healthy, wait, done
667 // two: wait, healthy, run forever
668 //
669 // This tree allows us to exercise a few flows, like two getting canceled when
670 // one crashes, runnables returning done, runnables staying healthy, etc.
671
672 stepRoot := make(chan struct{})
673 stepOne := make(chan struct{})
674 stepTwo := make(chan struct{})
675 m := InMemoryMetrics{}
676
677 New(ctx, func(ctx context.Context) error {
678 <-stepRoot
679
680 attempts := 0
681 Run(ctx, "one", func(ctx context.Context) error {
682 <-stepOne
683 Run(ctx, "two", func(ctx context.Context) error {
684 <-stepTwo
685 Signal(ctx, SignalHealthy)
686 <-ctx.Done()
687 return ctx.Err()
688 })
689 if attempts == 0 {
690 attempts += 1
691 return fmt.Errorf("failed")
692 }
693 Signal(ctx, SignalHealthy)
694 <-stepOne
695 Signal(ctx, SignalDone)
696 return nil
697 })
698
699 <-stepRoot
700 Signal(ctx, SignalHealthy)
701 return nil
702 }, WithPropagatePanic, WithMetrics(&m))
703
704 // expectDN waits a second until a given DN is at a given state and fails the
705 // test otherwise.
706 expectDN := func(dn string, state NodeState) {
707 t.Helper()
708 start := time.Now()
709 for {
710 snap := m.DNs()
711 if _, ok := snap[dn]; !ok {
712 if time.Since(start) > time.Second {
713 t.Fatalf("No DN %q", dn)
714 } else {
715 time.Sleep(100 * time.Millisecond)
716 continue
717 }
718 }
719 if want, got := state, snap[dn].State; want != got {
720 if time.Since(start) > time.Second {
721 t.Fatalf("Expected %q to be %s, got %s", dn, want, got)
722 } else {
723 time.Sleep(100 * time.Millisecond)
724 continue
725 }
726 }
727 break
728 }
729 }
730
731 // Make progress thorugh the runnable tree and check expected states.
732
733 expectDN("root", NodeStateNew)
734
735 stepRoot <- struct{}{}
736 expectDN("root", NodeStateNew)
737 expectDN("root.one", NodeStateNew)
738
739 stepOne <- struct{}{}
740 stepTwo <- struct{}{}
741 expectDN("root", NodeStateNew)
742 expectDN("root.one", NodeStateDead)
743 expectDN("root.one.two", NodeStateCanceled)
744
745 stepOne <- struct{}{}
746 expectDN("root", NodeStateNew)
747 expectDN("root.one", NodeStateHealthy)
748 expectDN("root.one.two", NodeStateNew)
749
750 stepOne <- struct{}{}
751 expectDN("root", NodeStateNew)
752 expectDN("root.one", NodeStateDone)
753 expectDN("root.one.two", NodeStateNew)
754
755 stepTwo <- struct{}{}
756 expectDN("root", NodeStateNew)
757 expectDN("root.one", NodeStateDone)
758 expectDN("root.one.two", NodeStateHealthy)
759}
760
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100761func ExampleNew() {
762 // Minimal runnable that is immediately done.
763 childC := make(chan struct{})
764 child := func(ctx context.Context) error {
765 Signal(ctx, SignalHealthy)
766 close(childC)
767 Signal(ctx, SignalDone)
768 return nil
769 }
770
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100771 // Start a supervision tree with a root runnable.
772 ctx, ctxC := context.WithCancel(context.Background())
773 defer ctxC()
Serge Bazanskic7359672020-10-30 16:38:57 +0100774 New(ctx, func(ctx context.Context) error {
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100775 err := Run(ctx, "child", child)
776 if err != nil {
777 return fmt.Errorf("could not run 'child': %w", err)
778 }
779 Signal(ctx, SignalHealthy)
780
781 t := time.NewTicker(time.Second)
782 defer t.Stop()
783
784 // Do something in the background, and exit on context cancel.
785 for {
786 select {
787 case <-t.C:
788 fmt.Printf("tick!")
789 case <-ctx.Done():
790 return ctx.Err()
791 }
792 }
793 })
794
795 // root.child will close this channel.
796 <-childC
797}