blob: cf303e80c9df46c9aadf3910581313479f84e836 [file] [log] [blame]
// Copyright 2020 The Monogon Project Authors.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
16
17package supervisor
18
19import (
20 "context"
21 "fmt"
22 "testing"
23 "time"
Serge Bazanski9c09c4e2020-03-24 13:58:01 +010024)
25
Serge Bazanski35e43d12021-07-06 13:12:14 +020026// waitSettle waits until the supervisor reaches a 'settled' state - ie., one
27// where no actions have been performed for a number of GC cycles.
28// This is used in tests only.
29func (s *supervisor) waitSettle(ctx context.Context) error {
30 waiter := make(chan struct{})
31 s.pReq <- &processorRequest{
32 waitSettled: &processorRequestWaitSettled{
33 waiter: waiter,
34 },
35 }
36
37 select {
38 case <-ctx.Done():
39 return ctx.Err()
40 case <-waiter:
41 return nil
42 }
43}
44
45// waitSettleError wraps waitSettle to fail a test if an error occurs, eg. the
46// context is canceled.
47func (s *supervisor) waitSettleError(ctx context.Context, t *testing.T) {
48 err := s.waitSettle(ctx)
49 if err != nil {
50 t.Fatalf("waitSettle: %v", err)
51 }
52}
53
Serge Bazanski9c09c4e2020-03-24 13:58:01 +010054func runnableBecomesHealthy(healthy, done chan struct{}) Runnable {
55 return func(ctx context.Context) error {
56 Signal(ctx, SignalHealthy)
57
58 go func() {
59 if healthy != nil {
60 healthy <- struct{}{}
61 }
62 }()
63
64 <-ctx.Done()
65
66 go func() {
67 if done != nil {
68 done <- struct{}{}
69 }
70 }()
71
72 return ctx.Err()
73 }
74}
75
76func runnableSpawnsMore(healthy, done chan struct{}, levels int) Runnable {
77 return func(ctx context.Context) error {
78 if levels > 0 {
79 err := RunGroup(ctx, map[string]Runnable{
80 "a": runnableSpawnsMore(nil, nil, levels-1),
81 "b": runnableSpawnsMore(nil, nil, levels-1),
82 })
83 if err != nil {
84 return err
85 }
86 }
87
88 Signal(ctx, SignalHealthy)
89
90 go func() {
91 if healthy != nil {
92 healthy <- struct{}{}
93 }
94 }()
95
96 <-ctx.Done()
97
98 go func() {
99 if done != nil {
100 done <- struct{}{}
101 }
102 }()
103 return ctx.Err()
104 }
105}
106
// rc is a Remote Controlled runnable. It is a generic runnable used for
// testing the supervisor.
type rc struct {
	// req carries commands and state queries from the test to the currently
	// running instance of the runnable.
	req chan rcRunnableRequest
}
112
// rcRunnableRequest is a single message sent from a test to an rc runnable.
type rcRunnableRequest struct {
	// cmd selects what the runnable should do.
	cmd rcRunnableCommand
	// stateC is where the runnable sends its current state when cmd is
	// rcRunnableCommandState. The other commands do not use it.
	stateC chan rcRunnableState
}
117
// rcRunnableCommand enumerates the commands understood by an rc runnable.
type rcRunnableCommand int

const (
	// rcRunnableCommandBecomeHealthy makes the runnable signal SignalHealthy.
	rcRunnableCommandBecomeHealthy rcRunnableCommand = iota
	// rcRunnableCommandBecomeDone makes the runnable signal SignalDone.
	rcRunnableCommandBecomeDone
	// rcRunnableCommandDie makes the runnable return an error.
	rcRunnableCommandDie
	// rcRunnableCommandPanic makes the runnable panic.
	rcRunnableCommandPanic
	// rcRunnableCommandState makes the runnable report its current
	// rcRunnableState on the request's stateC channel.
	rcRunnableCommandState
)
127
// rcRunnableState is the state an rc runnable reports when queried.
type rcRunnableState int

const (
	// rcRunnableStateNew is the state every freshly started instance of the
	// runnable begins in.
	rcRunnableStateNew rcRunnableState = iota
	// rcRunnableStateHealthy is entered after a BecomeHealthy command.
	rcRunnableStateHealthy
	// rcRunnableStateDone is entered after a BecomeDone command.
	rcRunnableStateDone
)
135
136func (r *rc) becomeHealthy() {
137 r.req <- rcRunnableRequest{cmd: rcRunnableCommandBecomeHealthy}
138}
139
140func (r *rc) becomeDone() {
141 r.req <- rcRunnableRequest{cmd: rcRunnableCommandBecomeDone}
142}
143func (r *rc) die() {
144 r.req <- rcRunnableRequest{cmd: rcRunnableCommandDie}
145}
146
147func (r *rc) panic() {
148 r.req <- rcRunnableRequest{cmd: rcRunnableCommandPanic}
149}
150
151func (r *rc) state() rcRunnableState {
152 c := make(chan rcRunnableState)
153 r.req <- rcRunnableRequest{
154 cmd: rcRunnableCommandState,
155 stateC: c,
156 }
157 return <-c
158}
159
160func (r *rc) waitState(s rcRunnableState) {
Serge Bazanski216fe7b2021-05-21 18:36:16 +0200161 // This is poll based. Making it non-poll based would make the RC runnable
162 // logic a bit more complex for little gain.
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100163 for {
164 got := r.state()
165 if got == s {
166 return
167 }
168 time.Sleep(10 * time.Millisecond)
169 }
170}
171
172func newRC() *rc {
173 return &rc{
174 req: make(chan rcRunnableRequest),
175 }
176}
177
178// Remote Controlled Runnable
179func (r *rc) runnable() Runnable {
180 return func(ctx context.Context) error {
181 state := rcRunnableStateNew
182
183 for {
184 select {
185 case <-ctx.Done():
186 return ctx.Err()
187 case r := <-r.req:
188 switch r.cmd {
189 case rcRunnableCommandBecomeHealthy:
190 Signal(ctx, SignalHealthy)
191 state = rcRunnableStateHealthy
192 case rcRunnableCommandBecomeDone:
193 Signal(ctx, SignalDone)
194 state = rcRunnableStateDone
195 case rcRunnableCommandDie:
196 return fmt.Errorf("died on request")
197 case rcRunnableCommandPanic:
198 panic("at the disco")
199 case rcRunnableCommandState:
200 r.stateC <- state
201 }
202 }
203 }
204 }
205}
206
207func TestSimple(t *testing.T) {
208 h1 := make(chan struct{})
209 d1 := make(chan struct{})
210 h2 := make(chan struct{})
211 d2 := make(chan struct{})
212
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100213 ctx, ctxC := context.WithCancel(context.Background())
214 defer ctxC()
Serge Bazanskic7359672020-10-30 16:38:57 +0100215 s := New(ctx, func(ctx context.Context) error {
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100216 err := RunGroup(ctx, map[string]Runnable{
217 "one": runnableBecomesHealthy(h1, d1),
218 "two": runnableBecomesHealthy(h2, d2),
219 })
220 if err != nil {
221 return err
222 }
223 Signal(ctx, SignalHealthy)
224 Signal(ctx, SignalDone)
225 return nil
Serge Bazanski19bb4122020-05-04 17:57:50 +0200226 }, WithPropagatePanic)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100227
228 // Expect both to start running.
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200229 s.waitSettleError(ctx, t)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100230 select {
231 case <-h1:
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200232 default:
233 t.Fatalf("runnable 'one' didn't start")
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100234 }
235 select {
236 case <-h2:
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200237 default:
238 t.Fatalf("runnable 'one' didn't start")
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100239 }
240}
241
242func TestSimpleFailure(t *testing.T) {
243 h1 := make(chan struct{})
244 d1 := make(chan struct{})
245 two := newRC()
246
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200247 ctx, ctxC := context.WithTimeout(context.Background(), 10*time.Second)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100248 defer ctxC()
Serge Bazanskic7359672020-10-30 16:38:57 +0100249 s := New(ctx, func(ctx context.Context) error {
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100250 err := RunGroup(ctx, map[string]Runnable{
251 "one": runnableBecomesHealthy(h1, d1),
252 "two": two.runnable(),
253 })
254 if err != nil {
255 return err
256 }
257 Signal(ctx, SignalHealthy)
258 Signal(ctx, SignalDone)
259 return nil
Serge Bazanski19bb4122020-05-04 17:57:50 +0200260 }, WithPropagatePanic)
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200261 s.waitSettleError(ctx, t)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100262
263 two.becomeHealthy()
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200264 s.waitSettleError(ctx, t)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100265 // Expect one to start running.
266 select {
267 case <-h1:
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200268 default:
269 t.Fatalf("runnable 'one' didn't start")
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100270 }
271
272 // Kill off two, one should restart.
273 two.die()
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200274 s.waitSettleError(ctx, t)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100275 select {
276 case <-d1:
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200277 default:
278 t.Fatalf("runnable 'one' didn't acknowledge cancel")
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100279 }
280
281 // And one should start running again.
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200282 s.waitSettleError(ctx, t)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100283 select {
284 case <-h1:
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200285 default:
286 t.Fatalf("runnable 'one' didn't restart")
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100287 }
288}
289
290func TestDeepFailure(t *testing.T) {
291 h1 := make(chan struct{})
292 d1 := make(chan struct{})
293 two := newRC()
294
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200295 ctx, ctxC := context.WithTimeout(context.Background(), 10*time.Second)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100296 defer ctxC()
Serge Bazanskic7359672020-10-30 16:38:57 +0100297 s := New(ctx, func(ctx context.Context) error {
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100298 err := RunGroup(ctx, map[string]Runnable{
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200299 "one": runnableSpawnsMore(h1, d1, 5),
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100300 "two": two.runnable(),
301 })
302 if err != nil {
303 return err
304 }
305 Signal(ctx, SignalHealthy)
306 Signal(ctx, SignalDone)
307 return nil
Serge Bazanski19bb4122020-05-04 17:57:50 +0200308 }, WithPropagatePanic)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100309
310 two.becomeHealthy()
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200311 s.waitSettleError(ctx, t)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100312 // Expect one to start running.
313 select {
314 case <-h1:
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200315 default:
316 t.Fatalf("runnable 'one' didn't start")
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100317 }
318
319 // Kill off two, one should restart.
320 two.die()
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200321 s.waitSettleError(ctx, t)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100322 select {
323 case <-d1:
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200324 default:
325 t.Fatalf("runnable 'one' didn't acknowledge cancel")
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100326 }
327
328 // And one should start running again.
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200329 s.waitSettleError(ctx, t)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100330 select {
331 case <-h1:
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200332 default:
333 t.Fatalf("runnable 'one' didn't restart")
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100334 }
335}
336
337func TestPanic(t *testing.T) {
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100338 h1 := make(chan struct{})
339 d1 := make(chan struct{})
340 two := newRC()
341
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100342 ctx, ctxC := context.WithCancel(context.Background())
343 defer ctxC()
Serge Bazanskic7359672020-10-30 16:38:57 +0100344 s := New(ctx, func(ctx context.Context) error {
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100345 err := RunGroup(ctx, map[string]Runnable{
346 "one": runnableBecomesHealthy(h1, d1),
347 "two": two.runnable(),
348 })
349 if err != nil {
350 return err
351 }
352 Signal(ctx, SignalHealthy)
353 Signal(ctx, SignalDone)
354 return nil
355 })
356
357 two.becomeHealthy()
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200358 s.waitSettleError(ctx, t)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100359 // Expect one to start running.
360 select {
361 case <-h1:
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200362 default:
363 t.Fatalf("runnable 'one' didn't start")
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100364 }
365
366 // Kill off two, one should restart.
367 two.panic()
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200368 s.waitSettleError(ctx, t)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100369 select {
370 case <-d1:
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200371 default:
372 t.Fatalf("runnable 'one' didn't acknowledge cancel")
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100373 }
374
375 // And one should start running again.
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200376 s.waitSettleError(ctx, t)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100377 select {
378 case <-h1:
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200379 default:
380 t.Fatalf("runnable 'one' didn't restart")
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100381 }
382}
383
384func TestMultipleLevelFailure(t *testing.T) {
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100385 ctx, ctxC := context.WithCancel(context.Background())
386 defer ctxC()
Serge Bazanskic7359672020-10-30 16:38:57 +0100387 New(ctx, func(ctx context.Context) error {
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100388 err := RunGroup(ctx, map[string]Runnable{
389 "one": runnableSpawnsMore(nil, nil, 4),
390 "two": runnableSpawnsMore(nil, nil, 4),
391 })
392 if err != nil {
393 return err
394 }
395 Signal(ctx, SignalHealthy)
396 Signal(ctx, SignalDone)
397 return nil
Serge Bazanski19bb4122020-05-04 17:57:50 +0200398 }, WithPropagatePanic)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100399}
400
401func TestBackoff(t *testing.T) {
402 one := newRC()
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200403
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200404 ctx, ctxC := context.WithTimeout(context.Background(), 20*time.Second)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100405 defer ctxC()
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200406
Serge Bazanskic7359672020-10-30 16:38:57 +0100407 s := New(ctx, func(ctx context.Context) error {
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100408 if err := Run(ctx, "one", one.runnable()); err != nil {
409 return err
410 }
411 Signal(ctx, SignalHealthy)
412 Signal(ctx, SignalDone)
413 return nil
Serge Bazanski19bb4122020-05-04 17:57:50 +0200414 }, WithPropagatePanic)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100415
416 one.becomeHealthy()
Serge Bazanski216fe7b2021-05-21 18:36:16 +0200417 // Die a bunch of times in a row, this brings up the next exponential
418 // backoff to over a second.
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100419 for i := 0; i < 4; i += 1 {
420 one.die()
421 one.waitState(rcRunnableStateNew)
422 }
Serge Bazanski216fe7b2021-05-21 18:36:16 +0200423 // Measure how long it takes for the runnable to respawn after a number of
424 // failures
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100425 start := time.Now()
426 one.die()
427 one.becomeHealthy()
428 one.waitState(rcRunnableStateHealthy)
429 taken := time.Since(start)
430 if taken < 1*time.Second {
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200431 t.Errorf("Runnable took %v to restart, wanted at least a second from backoff", taken)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100432 }
433
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200434 s.waitSettleError(ctx, t)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100435 // Now that we've become healthy, die again. Becoming healthy resets the backoff.
436 start = time.Now()
437 one.die()
438 one.becomeHealthy()
439 one.waitState(rcRunnableStateHealthy)
440 taken = time.Since(start)
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200441 if taken > 1*time.Second || taken < 100*time.Millisecond {
442 t.Errorf("Runnable took %v to restart, wanted at least 100ms from backoff and at most 1s from backoff reset", taken)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100443 }
444}
445
Serge Bazanski216fe7b2021-05-21 18:36:16 +0200446// TestResilience throws some curveballs at the supervisor - either programming
447// errors or high load. It then ensures that another runnable is running, and
448// that it restarts on its sibling failure.
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100449func TestResilience(t *testing.T) {
450 // request/response channel for testing liveness of the 'one' runnable
451 req := make(chan chan struct{})
452
453 // A runnable that responds on the 'req' channel.
454 one := func(ctx context.Context) error {
455 Signal(ctx, SignalHealthy)
456 for {
457 select {
458 case <-ctx.Done():
459 return ctx.Err()
460 case r := <-req:
461 r <- struct{}{}
462 }
463 }
464 }
465 oneSibling := newRC()
466
467 oneTest := func() {
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200468 timeout := time.NewTicker(1000 * time.Millisecond)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100469 ping := make(chan struct{})
470 req <- ping
471 select {
472 case <-ping:
473 case <-timeout.C:
474 t.Fatalf("one ping response timeout")
475 }
476 timeout.Stop()
477 }
478
Serge Bazanski216fe7b2021-05-21 18:36:16 +0200479 // A nasty runnable that calls Signal with the wrong context (this is a
480 // programming error)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100481 two := func(ctx context.Context) error {
482 Signal(context.TODO(), SignalHealthy)
483 return nil
484 }
485
486 // A nasty runnable that calls Signal wrong (this is a programming error).
487 three := func(ctx context.Context) error {
488 Signal(ctx, SignalDone)
489 return nil
490 }
491
492 // A nasty runnable that runs in a busy loop (this is a programming error).
493 four := func(ctx context.Context) error {
494 for {
495 time.Sleep(0)
496 }
497 }
498
499 // A nasty runnable that keeps creating more runnables.
500 five := func(ctx context.Context) error {
501 i := 1
502 for {
503 err := Run(ctx, fmt.Sprintf("r%d", i), runnableSpawnsMore(nil, nil, 2))
504 if err != nil {
505 return err
506 }
507
508 time.Sleep(100 * time.Millisecond)
509 i += 1
510 }
511 }
512
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100513 ctx, ctxC := context.WithCancel(context.Background())
514 defer ctxC()
Serge Bazanskic7359672020-10-30 16:38:57 +0100515 New(ctx, func(ctx context.Context) error {
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100516 RunGroup(ctx, map[string]Runnable{
517 "one": one,
518 "oneSibling": oneSibling.runnable(),
519 })
520 rs := map[string]Runnable{
521 "two": two, "three": three, "four": four, "five": five,
522 }
523 for k, v := range rs {
524 if err := Run(ctx, k, v); err != nil {
525 return err
526 }
527 }
528 Signal(ctx, SignalHealthy)
529 Signal(ctx, SignalDone)
530 return nil
531 })
532
533 // Five rounds of letting one run, then restarting it.
534 for i := 0; i < 5; i += 1 {
535 oneSibling.becomeHealthy()
536 oneSibling.waitState(rcRunnableStateHealthy)
537
538 // 'one' should work for at least a second.
539 deadline := time.Now().Add(1 * time.Second)
540 for {
541 if time.Now().After(deadline) {
542 break
543 }
544
545 oneTest()
546 }
547
548 // Killing 'oneSibling' should restart one.
549 oneSibling.panic()
550 }
551 // Make sure 'one' is still okay.
552 oneTest()
553}
554
555func ExampleNew() {
556 // Minimal runnable that is immediately done.
557 childC := make(chan struct{})
558 child := func(ctx context.Context) error {
559 Signal(ctx, SignalHealthy)
560 close(childC)
561 Signal(ctx, SignalDone)
562 return nil
563 }
564
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100565 // Start a supervision tree with a root runnable.
566 ctx, ctxC := context.WithCancel(context.Background())
567 defer ctxC()
Serge Bazanskic7359672020-10-30 16:38:57 +0100568 New(ctx, func(ctx context.Context) error {
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100569 err := Run(ctx, "child", child)
570 if err != nil {
571 return fmt.Errorf("could not run 'child': %w", err)
572 }
573 Signal(ctx, SignalHealthy)
574
575 t := time.NewTicker(time.Second)
576 defer t.Stop()
577
578 // Do something in the background, and exit on context cancel.
579 for {
580 select {
581 case <-t.C:
582 fmt.Printf("tick!")
583 case <-ctx.Done():
584 return ctx.Err()
585 }
586 }
587 })
588
589 // root.child will close this channel.
590 <-childC
591}