| Tim Windelschmidt | 6d33a43 | 2025-02-04 14:34:25 +0100 | [diff] [blame] | 1 | // Copyright The Monogon Project Authors. |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 2 | // SPDX-License-Identifier: Apache-2.0 |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 3 | |
| 4 | package supervisor |
| 5 | |
| 6 | import ( |
| 7 | "context" |
| 8 | "fmt" |
| 9 | "testing" |
| 10 | "time" |
| Serge Bazanski | 26d5225 | 2022-02-07 15:57:54 +0100 | [diff] [blame] | 11 | |
| Tim Windelschmidt | 9f21f53 | 2024-05-07 15:14:20 +0200 | [diff] [blame] | 12 | "source.monogon.dev/osbase/logtree" |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 13 | ) |
| 14 | |
| Serge Bazanski | 35e43d1 | 2021-07-06 13:12:14 +0200 | [diff] [blame] | 15 | // waitSettle waits until the supervisor reaches a 'settled' state - ie., one |
| 16 | // where no actions have been performed for a number of GC cycles. |
| 17 | // This is used in tests only. |
| 18 | func (s *supervisor) waitSettle(ctx context.Context) error { |
| 19 | waiter := make(chan struct{}) |
| 20 | s.pReq <- &processorRequest{ |
| 21 | waitSettled: &processorRequestWaitSettled{ |
| 22 | waiter: waiter, |
| 23 | }, |
| 24 | } |
| 25 | |
| 26 | select { |
| 27 | case <-ctx.Done(): |
| 28 | return ctx.Err() |
| 29 | case <-waiter: |
| 30 | return nil |
| 31 | } |
| 32 | } |
| 33 | |
| 34 | // waitSettleError wraps waitSettle to fail a test if an error occurs, eg. the |
| 35 | // context is canceled. |
| 36 | func (s *supervisor) waitSettleError(ctx context.Context, t *testing.T) { |
| 37 | err := s.waitSettle(ctx) |
| 38 | if err != nil { |
| 39 | t.Fatalf("waitSettle: %v", err) |
| 40 | } |
| 41 | } |
| 42 | |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 43 | func runnableBecomesHealthy(healthy, done chan struct{}) Runnable { |
| 44 | return func(ctx context.Context) error { |
| 45 | Signal(ctx, SignalHealthy) |
| 46 | |
| 47 | go func() { |
| 48 | if healthy != nil { |
| 49 | healthy <- struct{}{} |
| 50 | } |
| 51 | }() |
| 52 | |
| 53 | <-ctx.Done() |
| 54 | |
| Serge Bazanski | 579015a | 2021-11-18 13:20:20 +0100 | [diff] [blame] | 55 | if done != nil { |
| 56 | done <- struct{}{} |
| 57 | } |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 58 | |
| 59 | return ctx.Err() |
| 60 | } |
| 61 | } |
| 62 | |
| 63 | func runnableSpawnsMore(healthy, done chan struct{}, levels int) Runnable { |
| 64 | return func(ctx context.Context) error { |
| 65 | if levels > 0 { |
| 66 | err := RunGroup(ctx, map[string]Runnable{ |
| 67 | "a": runnableSpawnsMore(nil, nil, levels-1), |
| 68 | "b": runnableSpawnsMore(nil, nil, levels-1), |
| 69 | }) |
| 70 | if err != nil { |
| 71 | return err |
| 72 | } |
| 73 | } |
| 74 | |
| 75 | Signal(ctx, SignalHealthy) |
| 76 | |
| 77 | go func() { |
| 78 | if healthy != nil { |
| 79 | healthy <- struct{}{} |
| 80 | } |
| 81 | }() |
| 82 | |
| 83 | <-ctx.Done() |
| 84 | |
| Serge Bazanski | 579015a | 2021-11-18 13:20:20 +0100 | [diff] [blame] | 85 | if done != nil { |
| 86 | done <- struct{}{} |
| 87 | } |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 88 | return ctx.Err() |
| 89 | } |
| 90 | } |
| 91 | |
// rc is a Remote Controlled runnable: a generic runnable used for testing the
// supervisor, whose behavior is driven by commands the test sends through the
// methods below.
type rc struct {
	// req carries commands to the running runnable. newRC creates it
	// unbuffered, so every command blocks until the runnable receives it.
	req chan rcRunnableRequest
}

// rcRunnableRequest is a single command sent to an rc runnable.
type rcRunnableRequest struct {
	cmd rcRunnableCommand
	// stateC receives the current state in reply to rcRunnableCommandState.
	stateC chan rcRunnableState
}

// rcRunnableCommand selects what an rc runnable should do next.
type rcRunnableCommand int

const (
	rcRunnableCommandBecomeHealthy rcRunnableCommand = iota
	rcRunnableCommandBecomeDone
	rcRunnableCommandDie
	rcRunnableCommandPanic
	rcRunnableCommandState
)

// rcRunnableState is the state an rc runnable reports back to the test.
type rcRunnableState int

const (
	rcRunnableStateNew rcRunnableState = iota
	rcRunnableStateHealthy
	rcRunnableStateDone
)

// newRC returns an rc whose request channel is unbuffered.
func newRC() *rc {
	return &rc{req: make(chan rcRunnableRequest)}
}

// sendCommand delivers a command that expects no reply, blocking until the
// runnable picks it up.
func (r *rc) sendCommand(cmd rcRunnableCommand) {
	r.req <- rcRunnableRequest{cmd: cmd}
}

func (r *rc) becomeHealthy() { r.sendCommand(rcRunnableCommandBecomeHealthy) }

func (r *rc) becomeDone() { r.sendCommand(rcRunnableCommandBecomeDone) }

func (r *rc) die() { r.sendCommand(rcRunnableCommandDie) }

func (r *rc) panic() { r.sendCommand(rcRunnableCommandPanic) }

// state asks the runnable for its current state and waits for the answer.
func (r *rc) state() rcRunnableState {
	reply := make(chan rcRunnableState)
	r.req <- rcRunnableRequest{stateC: reply, cmd: rcRunnableCommandState}
	return <-reply
}

// waitState blocks until the runnable reports state s.
//
// This is poll based. Making it non-poll based would make the RC runnable
// logic a bit more complex for little gain.
func (r *rc) waitState(s rcRunnableState) {
	for r.state() != s {
		time.Sleep(10 * time.Millisecond)
	}
}
| 162 | |
| 163 | // Remote Controlled Runnable |
| 164 | func (r *rc) runnable() Runnable { |
| 165 | return func(ctx context.Context) error { |
| 166 | state := rcRunnableStateNew |
| 167 | |
| 168 | for { |
| 169 | select { |
| 170 | case <-ctx.Done(): |
| 171 | return ctx.Err() |
| 172 | case r := <-r.req: |
| 173 | switch r.cmd { |
| 174 | case rcRunnableCommandBecomeHealthy: |
| 175 | Signal(ctx, SignalHealthy) |
| 176 | state = rcRunnableStateHealthy |
| 177 | case rcRunnableCommandBecomeDone: |
| 178 | Signal(ctx, SignalDone) |
| 179 | state = rcRunnableStateDone |
| 180 | case rcRunnableCommandDie: |
| 181 | return fmt.Errorf("died on request") |
| 182 | case rcRunnableCommandPanic: |
| 183 | panic("at the disco") |
| 184 | case rcRunnableCommandState: |
| 185 | r.stateC <- state |
| 186 | } |
| 187 | } |
| 188 | } |
| 189 | } |
| 190 | } |
| 191 | |
| 192 | func TestSimple(t *testing.T) { |
| 193 | h1 := make(chan struct{}) |
| 194 | d1 := make(chan struct{}) |
| 195 | h2 := make(chan struct{}) |
| 196 | d2 := make(chan struct{}) |
| 197 | |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 198 | ctx, ctxC := context.WithCancel(context.Background()) |
| 199 | defer ctxC() |
| Serge Bazanski | c735967 | 2020-10-30 16:38:57 +0100 | [diff] [blame] | 200 | s := New(ctx, func(ctx context.Context) error { |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 201 | err := RunGroup(ctx, map[string]Runnable{ |
| 202 | "one": runnableBecomesHealthy(h1, d1), |
| 203 | "two": runnableBecomesHealthy(h2, d2), |
| 204 | }) |
| 205 | if err != nil { |
| 206 | return err |
| 207 | } |
| 208 | Signal(ctx, SignalHealthy) |
| 209 | Signal(ctx, SignalDone) |
| 210 | return nil |
| Serge Bazanski | 19bb412 | 2020-05-04 17:57:50 +0200 | [diff] [blame] | 211 | }, WithPropagatePanic) |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 212 | |
| 213 | // Expect both to start running. |
| Serge Bazanski | ac6b644 | 2020-05-06 19:13:43 +0200 | [diff] [blame] | 214 | s.waitSettleError(ctx, t) |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 215 | select { |
| 216 | case <-h1: |
| Serge Bazanski | ac6b644 | 2020-05-06 19:13:43 +0200 | [diff] [blame] | 217 | default: |
| 218 | t.Fatalf("runnable 'one' didn't start") |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 219 | } |
| 220 | select { |
| 221 | case <-h2: |
| Serge Bazanski | ac6b644 | 2020-05-06 19:13:43 +0200 | [diff] [blame] | 222 | default: |
| 223 | t.Fatalf("runnable 'one' didn't start") |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 224 | } |
| 225 | } |
| 226 | |
| 227 | func TestSimpleFailure(t *testing.T) { |
| 228 | h1 := make(chan struct{}) |
| 229 | d1 := make(chan struct{}) |
| 230 | two := newRC() |
| 231 | |
| Serge Bazanski | ac6b644 | 2020-05-06 19:13:43 +0200 | [diff] [blame] | 232 | ctx, ctxC := context.WithTimeout(context.Background(), 10*time.Second) |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 233 | defer ctxC() |
| Serge Bazanski | c735967 | 2020-10-30 16:38:57 +0100 | [diff] [blame] | 234 | s := New(ctx, func(ctx context.Context) error { |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 235 | err := RunGroup(ctx, map[string]Runnable{ |
| 236 | "one": runnableBecomesHealthy(h1, d1), |
| 237 | "two": two.runnable(), |
| 238 | }) |
| 239 | if err != nil { |
| 240 | return err |
| 241 | } |
| 242 | Signal(ctx, SignalHealthy) |
| 243 | Signal(ctx, SignalDone) |
| 244 | return nil |
| Serge Bazanski | 19bb412 | 2020-05-04 17:57:50 +0200 | [diff] [blame] | 245 | }, WithPropagatePanic) |
| Serge Bazanski | ac6b644 | 2020-05-06 19:13:43 +0200 | [diff] [blame] | 246 | s.waitSettleError(ctx, t) |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 247 | |
| 248 | two.becomeHealthy() |
| Serge Bazanski | ac6b644 | 2020-05-06 19:13:43 +0200 | [diff] [blame] | 249 | s.waitSettleError(ctx, t) |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 250 | // Expect one to start running. |
| 251 | select { |
| 252 | case <-h1: |
| Serge Bazanski | ac6b644 | 2020-05-06 19:13:43 +0200 | [diff] [blame] | 253 | default: |
| 254 | t.Fatalf("runnable 'one' didn't start") |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 255 | } |
| 256 | |
| 257 | // Kill off two, one should restart. |
| 258 | two.die() |
| Serge Bazanski | 579015a | 2021-11-18 13:20:20 +0100 | [diff] [blame] | 259 | <-d1 |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 260 | |
| 261 | // And one should start running again. |
| Serge Bazanski | ac6b644 | 2020-05-06 19:13:43 +0200 | [diff] [blame] | 262 | s.waitSettleError(ctx, t) |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 263 | select { |
| 264 | case <-h1: |
| Serge Bazanski | ac6b644 | 2020-05-06 19:13:43 +0200 | [diff] [blame] | 265 | default: |
| 266 | t.Fatalf("runnable 'one' didn't restart") |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 267 | } |
| 268 | } |
| 269 | |
| 270 | func TestDeepFailure(t *testing.T) { |
| 271 | h1 := make(chan struct{}) |
| 272 | d1 := make(chan struct{}) |
| 273 | two := newRC() |
| 274 | |
| Serge Bazanski | ac6b644 | 2020-05-06 19:13:43 +0200 | [diff] [blame] | 275 | ctx, ctxC := context.WithTimeout(context.Background(), 10*time.Second) |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 276 | defer ctxC() |
| Serge Bazanski | c735967 | 2020-10-30 16:38:57 +0100 | [diff] [blame] | 277 | s := New(ctx, func(ctx context.Context) error { |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 278 | err := RunGroup(ctx, map[string]Runnable{ |
| Serge Bazanski | ac6b644 | 2020-05-06 19:13:43 +0200 | [diff] [blame] | 279 | "one": runnableSpawnsMore(h1, d1, 5), |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 280 | "two": two.runnable(), |
| 281 | }) |
| 282 | if err != nil { |
| 283 | return err |
| 284 | } |
| 285 | Signal(ctx, SignalHealthy) |
| 286 | Signal(ctx, SignalDone) |
| 287 | return nil |
| Serge Bazanski | 19bb412 | 2020-05-04 17:57:50 +0200 | [diff] [blame] | 288 | }, WithPropagatePanic) |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 289 | |
| 290 | two.becomeHealthy() |
| Serge Bazanski | ac6b644 | 2020-05-06 19:13:43 +0200 | [diff] [blame] | 291 | s.waitSettleError(ctx, t) |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 292 | // Expect one to start running. |
| 293 | select { |
| 294 | case <-h1: |
| Serge Bazanski | ac6b644 | 2020-05-06 19:13:43 +0200 | [diff] [blame] | 295 | default: |
| 296 | t.Fatalf("runnable 'one' didn't start") |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 297 | } |
| 298 | |
| 299 | // Kill off two, one should restart. |
| 300 | two.die() |
| Serge Bazanski | 579015a | 2021-11-18 13:20:20 +0100 | [diff] [blame] | 301 | <-d1 |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 302 | |
| 303 | // And one should start running again. |
| Serge Bazanski | ac6b644 | 2020-05-06 19:13:43 +0200 | [diff] [blame] | 304 | s.waitSettleError(ctx, t) |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 305 | select { |
| 306 | case <-h1: |
| Serge Bazanski | ac6b644 | 2020-05-06 19:13:43 +0200 | [diff] [blame] | 307 | default: |
| 308 | t.Fatalf("runnable 'one' didn't restart") |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 309 | } |
| 310 | } |
| 311 | |
| 312 | func TestPanic(t *testing.T) { |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 313 | h1 := make(chan struct{}) |
| 314 | d1 := make(chan struct{}) |
| 315 | two := newRC() |
| 316 | |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 317 | ctx, ctxC := context.WithCancel(context.Background()) |
| 318 | defer ctxC() |
| Serge Bazanski | c735967 | 2020-10-30 16:38:57 +0100 | [diff] [blame] | 319 | s := New(ctx, func(ctx context.Context) error { |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 320 | err := RunGroup(ctx, map[string]Runnable{ |
| 321 | "one": runnableBecomesHealthy(h1, d1), |
| 322 | "two": two.runnable(), |
| 323 | }) |
| 324 | if err != nil { |
| 325 | return err |
| 326 | } |
| 327 | Signal(ctx, SignalHealthy) |
| 328 | Signal(ctx, SignalDone) |
| 329 | return nil |
| 330 | }) |
| 331 | |
| 332 | two.becomeHealthy() |
| Serge Bazanski | ac6b644 | 2020-05-06 19:13:43 +0200 | [diff] [blame] | 333 | s.waitSettleError(ctx, t) |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 334 | // Expect one to start running. |
| 335 | select { |
| 336 | case <-h1: |
| Serge Bazanski | ac6b644 | 2020-05-06 19:13:43 +0200 | [diff] [blame] | 337 | default: |
| 338 | t.Fatalf("runnable 'one' didn't start") |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 339 | } |
| 340 | |
| 341 | // Kill off two, one should restart. |
| 342 | two.panic() |
| Serge Bazanski | 579015a | 2021-11-18 13:20:20 +0100 | [diff] [blame] | 343 | <-d1 |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 344 | |
| 345 | // And one should start running again. |
| Serge Bazanski | ac6b644 | 2020-05-06 19:13:43 +0200 | [diff] [blame] | 346 | s.waitSettleError(ctx, t) |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 347 | select { |
| 348 | case <-h1: |
| Serge Bazanski | ac6b644 | 2020-05-06 19:13:43 +0200 | [diff] [blame] | 349 | default: |
| 350 | t.Fatalf("runnable 'one' didn't restart") |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 351 | } |
| 352 | } |
| 353 | |
| 354 | func TestMultipleLevelFailure(t *testing.T) { |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 355 | ctx, ctxC := context.WithCancel(context.Background()) |
| 356 | defer ctxC() |
| Serge Bazanski | c735967 | 2020-10-30 16:38:57 +0100 | [diff] [blame] | 357 | New(ctx, func(ctx context.Context) error { |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 358 | err := RunGroup(ctx, map[string]Runnable{ |
| 359 | "one": runnableSpawnsMore(nil, nil, 4), |
| 360 | "two": runnableSpawnsMore(nil, nil, 4), |
| 361 | }) |
| 362 | if err != nil { |
| 363 | return err |
| 364 | } |
| 365 | Signal(ctx, SignalHealthy) |
| 366 | Signal(ctx, SignalDone) |
| 367 | return nil |
| Serge Bazanski | 19bb412 | 2020-05-04 17:57:50 +0200 | [diff] [blame] | 368 | }, WithPropagatePanic) |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 369 | } |
| 370 | |
| 371 | func TestBackoff(t *testing.T) { |
| 372 | one := newRC() |
| Serge Bazanski | ac6b644 | 2020-05-06 19:13:43 +0200 | [diff] [blame] | 373 | |
| Serge Bazanski | ac6b644 | 2020-05-06 19:13:43 +0200 | [diff] [blame] | 374 | ctx, ctxC := context.WithTimeout(context.Background(), 20*time.Second) |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 375 | defer ctxC() |
| Serge Bazanski | ac6b644 | 2020-05-06 19:13:43 +0200 | [diff] [blame] | 376 | |
| Serge Bazanski | c735967 | 2020-10-30 16:38:57 +0100 | [diff] [blame] | 377 | s := New(ctx, func(ctx context.Context) error { |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 378 | if err := Run(ctx, "one", one.runnable()); err != nil { |
| 379 | return err |
| 380 | } |
| 381 | Signal(ctx, SignalHealthy) |
| 382 | Signal(ctx, SignalDone) |
| 383 | return nil |
| Serge Bazanski | 19bb412 | 2020-05-04 17:57:50 +0200 | [diff] [blame] | 384 | }, WithPropagatePanic) |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 385 | |
| 386 | one.becomeHealthy() |
| Serge Bazanski | 216fe7b | 2021-05-21 18:36:16 +0200 | [diff] [blame] | 387 | // Die a bunch of times in a row, this brings up the next exponential |
| 388 | // backoff to over a second. |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 389 | for i := 0; i < 4; i += 1 { |
| 390 | one.die() |
| 391 | one.waitState(rcRunnableStateNew) |
| 392 | } |
| Serge Bazanski | 216fe7b | 2021-05-21 18:36:16 +0200 | [diff] [blame] | 393 | // Measure how long it takes for the runnable to respawn after a number of |
| 394 | // failures |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 395 | start := time.Now() |
| 396 | one.die() |
| 397 | one.becomeHealthy() |
| 398 | one.waitState(rcRunnableStateHealthy) |
| 399 | taken := time.Since(start) |
| 400 | if taken < 1*time.Second { |
| Serge Bazanski | ac6b644 | 2020-05-06 19:13:43 +0200 | [diff] [blame] | 401 | t.Errorf("Runnable took %v to restart, wanted at least a second from backoff", taken) |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 402 | } |
| 403 | |
| Serge Bazanski | ac6b644 | 2020-05-06 19:13:43 +0200 | [diff] [blame] | 404 | s.waitSettleError(ctx, t) |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 405 | // Now that we've become healthy, die again. Becoming healthy resets the backoff. |
| 406 | start = time.Now() |
| 407 | one.die() |
| 408 | one.becomeHealthy() |
| 409 | one.waitState(rcRunnableStateHealthy) |
| 410 | taken = time.Since(start) |
| Serge Bazanski | ac6b644 | 2020-05-06 19:13:43 +0200 | [diff] [blame] | 411 | if taken > 1*time.Second || taken < 100*time.Millisecond { |
| 412 | t.Errorf("Runnable took %v to restart, wanted at least 100ms from backoff and at most 1s from backoff reset", taken) |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 413 | } |
| 414 | } |
| 415 | |
| Jan Schär | 6560209 | 2024-12-19 10:37:34 +0100 | [diff] [blame] | 416 | // TestCancelRestart fails a runnable, but before its restart timeout expires, |
| 417 | // also fails its parent. This should cause cancelation of the restart timeout. |
| 418 | func TestCancelRestart(t *testing.T) { |
| 419 | startedOuter := make(chan struct{}) |
| 420 | failInner := make(chan struct{}) |
| 421 | failOuter := make(chan struct{}) |
| 422 | |
| 423 | ctx, ctxC := context.WithCancel(context.Background()) |
| 424 | defer ctxC() |
| 425 | |
| 426 | New(ctx, func(ctx context.Context) error { |
| 427 | <-startedOuter |
| 428 | err := Run(ctx, "inner", func(ctx context.Context) error { |
| 429 | <-failInner |
| 430 | return fmt.Errorf("failed") |
| 431 | }) |
| 432 | if err != nil { |
| 433 | return err |
| 434 | } |
| 435 | <-failOuter |
| 436 | return fmt.Errorf("failed") |
| 437 | }, WithPropagatePanic) |
| 438 | |
| 439 | startedOuter <- struct{}{} |
| 440 | failInner <- struct{}{} |
| 441 | time.Sleep(10 * time.Millisecond) |
| 442 | // Before the inner runnable has restarted, fail the outer runnable. |
| 443 | failOuter <- struct{}{} |
| 444 | |
| 445 | start := time.Now() |
| 446 | startedOuter <- struct{}{} |
| 447 | taken := time.Since(start) |
| 448 | // With the default backoff parameters, the initial backoff time is |
| 449 | // 0.5s +- 0.25s because of randomization. If the inner restart timer is not |
| 450 | // canceled, then it takes twice as long. |
| 451 | if taken > 1*time.Second { |
| 452 | t.Errorf("Runnable took %v to restart, wanted at most 1s", taken) |
| 453 | } |
| 454 | } |
| 455 | |
| Jan Schär | 08c1c72 | 2024-12-19 12:03:17 +0100 | [diff] [blame] | 456 | // TestDoneDelay test that a node is only considered restartable once it has |
| 457 | // returned, not already when it has signaled Done. Otherwise, we can get into |
| 458 | // an inconsistent state and for example panic because the node no longer |
| 459 | // exists once the runnable returns. |
| 460 | func TestDoneDelay(t *testing.T) { |
| 461 | startedInner := make(chan struct{}) |
| 462 | failOuter := make(chan struct{}) |
| 463 | |
| 464 | ctx, ctxC := context.WithCancel(context.Background()) |
| 465 | defer ctxC() |
| 466 | |
| 467 | New(ctx, func(ctx context.Context) error { |
| 468 | err := Run(ctx, "inner", func(ctx context.Context) error { |
| 469 | Signal(ctx, SignalHealthy) |
| 470 | Signal(ctx, SignalDone) |
| 471 | <-startedInner |
| 472 | time.Sleep(10 * time.Millisecond) |
| 473 | return nil |
| 474 | }) |
| 475 | if err != nil { |
| 476 | return err |
| 477 | } |
| 478 | <-failOuter |
| 479 | return fmt.Errorf("failed") |
| 480 | }, WithPropagatePanic) |
| 481 | |
| 482 | startedInner <- struct{}{} |
| 483 | failOuter <- struct{}{} |
| 484 | time.Sleep(20 * time.Millisecond) |
| 485 | } |
| 486 | |
| Jan Schär | fce7c76 | 2024-12-19 14:07:24 +0100 | [diff] [blame] | 487 | // TestCancelDoneSibling tests that a node in state DONE is restarted if it is |
| 488 | // canceled because a sibling has died. |
| 489 | func TestCancelDoneSibling(t *testing.T) { |
| 490 | innerRunning := make(chan struct{}) |
| 491 | innerExit := make(chan struct{}) |
| 492 | failSibling := make(chan struct{}) |
| 493 | |
| 494 | ctx, ctxC := context.WithCancel(context.Background()) |
| 495 | defer ctxC() |
| 496 | |
| 497 | New(ctx, func(ctx context.Context) error { |
| 498 | err := RunGroup(ctx, map[string]Runnable{ |
| 499 | "done": func(ctx context.Context) error { |
| 500 | err := Run(ctx, "inner", func(ctx context.Context) error { |
| 501 | <-innerRunning |
| 502 | <-ctx.Done() |
| 503 | <-innerExit |
| 504 | return ctx.Err() |
| 505 | }) |
| 506 | if err != nil { |
| 507 | return err |
| 508 | } |
| 509 | Signal(ctx, SignalHealthy) |
| 510 | Signal(ctx, SignalDone) |
| 511 | return nil |
| 512 | }, |
| 513 | "sibling": func(ctx context.Context) error { |
| 514 | <-failSibling |
| 515 | return fmt.Errorf("failed") |
| 516 | }, |
| 517 | }) |
| 518 | if err != nil { |
| 519 | return err |
| 520 | } |
| 521 | Signal(ctx, SignalHealthy) |
| 522 | Signal(ctx, SignalDone) |
| 523 | return nil |
| 524 | }, WithPropagatePanic) |
| 525 | |
| 526 | innerRunning <- struct{}{} |
| 527 | failSibling <- struct{}{} |
| 528 | // The inner node should exit and start running again. |
| 529 | innerExit <- struct{}{} |
| 530 | innerRunning <- struct{}{} |
| 531 | } |
| 532 | |
| Serge Bazanski | 216fe7b | 2021-05-21 18:36:16 +0200 | [diff] [blame] | 533 | // TestResilience throws some curveballs at the supervisor - either programming |
| 534 | // errors or high load. It then ensures that another runnable is running, and |
| 535 | // that it restarts on its sibling failure. |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 536 | func TestResilience(t *testing.T) { |
| 537 | // request/response channel for testing liveness of the 'one' runnable |
| 538 | req := make(chan chan struct{}) |
| 539 | |
| 540 | // A runnable that responds on the 'req' channel. |
| 541 | one := func(ctx context.Context) error { |
| 542 | Signal(ctx, SignalHealthy) |
| 543 | for { |
| 544 | select { |
| 545 | case <-ctx.Done(): |
| 546 | return ctx.Err() |
| 547 | case r := <-req: |
| 548 | r <- struct{}{} |
| 549 | } |
| 550 | } |
| 551 | } |
| 552 | oneSibling := newRC() |
| 553 | |
| 554 | oneTest := func() { |
| Serge Bazanski | ac6b644 | 2020-05-06 19:13:43 +0200 | [diff] [blame] | 555 | timeout := time.NewTicker(1000 * time.Millisecond) |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 556 | ping := make(chan struct{}) |
| 557 | req <- ping |
| 558 | select { |
| 559 | case <-ping: |
| 560 | case <-timeout.C: |
| 561 | t.Fatalf("one ping response timeout") |
| 562 | } |
| 563 | timeout.Stop() |
| 564 | } |
| 565 | |
| Serge Bazanski | 216fe7b | 2021-05-21 18:36:16 +0200 | [diff] [blame] | 566 | // A nasty runnable that calls Signal with the wrong context (this is a |
| 567 | // programming error) |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 568 | two := func(ctx context.Context) error { |
| 569 | Signal(context.TODO(), SignalHealthy) |
| 570 | return nil |
| 571 | } |
| 572 | |
| 573 | // A nasty runnable that calls Signal wrong (this is a programming error). |
| 574 | three := func(ctx context.Context) error { |
| 575 | Signal(ctx, SignalDone) |
| 576 | return nil |
| 577 | } |
| 578 | |
| 579 | // A nasty runnable that runs in a busy loop (this is a programming error). |
| 580 | four := func(ctx context.Context) error { |
| 581 | for { |
| 582 | time.Sleep(0) |
| 583 | } |
| 584 | } |
| 585 | |
| 586 | // A nasty runnable that keeps creating more runnables. |
| 587 | five := func(ctx context.Context) error { |
| 588 | i := 1 |
| 589 | for { |
| 590 | err := Run(ctx, fmt.Sprintf("r%d", i), runnableSpawnsMore(nil, nil, 2)) |
| 591 | if err != nil { |
| 592 | return err |
| 593 | } |
| 594 | |
| 595 | time.Sleep(100 * time.Millisecond) |
| 596 | i += 1 |
| 597 | } |
| 598 | } |
| 599 | |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 600 | ctx, ctxC := context.WithCancel(context.Background()) |
| 601 | defer ctxC() |
| Serge Bazanski | c735967 | 2020-10-30 16:38:57 +0100 | [diff] [blame] | 602 | New(ctx, func(ctx context.Context) error { |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 603 | RunGroup(ctx, map[string]Runnable{ |
| 604 | "one": one, |
| 605 | "oneSibling": oneSibling.runnable(), |
| 606 | }) |
| 607 | rs := map[string]Runnable{ |
| 608 | "two": two, "three": three, "four": four, "five": five, |
| 609 | } |
| 610 | for k, v := range rs { |
| 611 | if err := Run(ctx, k, v); err != nil { |
| 612 | return err |
| 613 | } |
| 614 | } |
| 615 | Signal(ctx, SignalHealthy) |
| 616 | Signal(ctx, SignalDone) |
| 617 | return nil |
| 618 | }) |
| 619 | |
| 620 | // Five rounds of letting one run, then restarting it. |
| 621 | for i := 0; i < 5; i += 1 { |
| 622 | oneSibling.becomeHealthy() |
| 623 | oneSibling.waitState(rcRunnableStateHealthy) |
| 624 | |
| 625 | // 'one' should work for at least a second. |
| 626 | deadline := time.Now().Add(1 * time.Second) |
| 627 | for { |
| 628 | if time.Now().After(deadline) { |
| 629 | break |
| 630 | } |
| 631 | |
| 632 | oneTest() |
| 633 | } |
| 634 | |
| 635 | // Killing 'oneSibling' should restart one. |
| 636 | oneSibling.panic() |
| 637 | } |
| 638 | // Make sure 'one' is still okay. |
| 639 | oneTest() |
| 640 | } |
| 641 | |
| Serge Bazanski | 26d5225 | 2022-02-07 15:57:54 +0100 | [diff] [blame] | 642 | // TestSubLoggers exercises the reserved/sub-logger functionality of runnable |
| 643 | // nodes. It ensures a sub-logger and runnable cannot have colliding names, and |
| 644 | // that logging actually works. |
| 645 | func TestSubLoggers(t *testing.T) { |
| 646 | ctx, ctxC := context.WithCancel(context.Background()) |
| 647 | defer ctxC() |
| 648 | |
| 649 | errCA := make(chan error) |
| 650 | errCB := make(chan error) |
| 651 | lt := logtree.New() |
| 652 | New(ctx, func(ctx context.Context) error { |
| 653 | err := RunGroup(ctx, map[string]Runnable{ |
| 654 | // foo will first create a sublogger, then attempt to create a |
| 655 | // colliding runnable. |
| 656 | "foo": func(ctx context.Context) error { |
| 657 | sl, err := SubLogger(ctx, "dut") |
| 658 | if err != nil { |
| 659 | errCA <- fmt.Errorf("creating sub-logger: %w", err) |
| 660 | return nil |
| 661 | } |
| 662 | sl.Infof("hello from foo.dut") |
| 663 | err = Run(ctx, "dut", runnableBecomesHealthy(nil, nil)) |
| 664 | if err == nil { |
| 665 | errCA <- fmt.Errorf("creating colliding runnable should have failed") |
| 666 | return nil |
| 667 | } |
| 668 | Signal(ctx, SignalHealthy) |
| 669 | Signal(ctx, SignalDone) |
| 670 | errCA <- nil |
| 671 | return nil |
| 672 | }, |
| 673 | }) |
| 674 | if err != nil { |
| 675 | return err |
| 676 | } |
| 677 | _, err = SubLogger(ctx, "foo") |
| 678 | if err == nil { |
| 679 | errCB <- fmt.Errorf("creating collising sub-logger should have failed") |
| 680 | return nil |
| 681 | } |
| 682 | Signal(ctx, SignalHealthy) |
| 683 | Signal(ctx, SignalDone) |
| 684 | errCB <- nil |
| 685 | return nil |
| 686 | }, WithPropagatePanic, WithExistingLogtree(lt)) |
| 687 | |
| 688 | err := <-errCA |
| 689 | if err != nil { |
| 690 | t.Fatalf("from root.foo: %v", err) |
| 691 | } |
| 692 | err = <-errCB |
| 693 | if err != nil { |
| 694 | t.Fatalf("from root: %v", err) |
| 695 | } |
| 696 | |
| 697 | // Now enure that the expected message appears in the logtree. |
| 698 | dn := logtree.DN("root.foo.dut") |
| 699 | r, err := lt.Read(dn, logtree.WithBacklog(logtree.BacklogAllAvailable)) |
| 700 | if err != nil { |
| 701 | t.Fatalf("logtree read failed: %v", err) |
| 702 | } |
| 703 | defer r.Close() |
| 704 | found := false |
| 705 | for _, e := range r.Backlog { |
| 706 | if e.DN != dn { |
| 707 | continue |
| 708 | } |
| 709 | if e.Leveled == nil { |
| 710 | continue |
| 711 | } |
| 712 | if e.Leveled.MessagesJoined() != "hello from foo.dut" { |
| 713 | continue |
| 714 | } |
| 715 | found = true |
| 716 | break |
| 717 | } |
| 718 | if !found { |
| 719 | t.Fatalf("did not find expected logline in %s", dn) |
| 720 | } |
| 721 | } |
| 722 | |
| Serge Bazanski | cf864da | 2024-07-31 11:23:34 +0000 | [diff] [blame] | 723 | func TestMetrics(t *testing.T) { |
| 724 | ctx, ctxC := context.WithCancel(context.Background()) |
| 725 | defer ctxC() |
| 726 | |
| 727 | // Build a supervision tree with 'wait'/step channels per runnable: |
| 728 | // |
| 729 | // root: wait, start one, wait, healthy |
| 730 | // one: wait, start two, crash, wait, start two, healthy, wait, done |
| 731 | // two: wait, healthy, run forever |
| 732 | // |
| 733 | // This tree allows us to exercise a few flows, like two getting canceled when |
| 734 | // one crashes, runnables returning done, runnables staying healthy, etc. |
| 735 | |
| 736 | stepRoot := make(chan struct{}) |
| 737 | stepOne := make(chan struct{}) |
| 738 | stepTwo := make(chan struct{}) |
| 739 | m := InMemoryMetrics{} |
| 740 | |
| 741 | New(ctx, func(ctx context.Context) error { |
| 742 | <-stepRoot |
| 743 | |
| 744 | attempts := 0 |
| 745 | Run(ctx, "one", func(ctx context.Context) error { |
| 746 | <-stepOne |
| 747 | Run(ctx, "two", func(ctx context.Context) error { |
| 748 | <-stepTwo |
| 749 | Signal(ctx, SignalHealthy) |
| 750 | <-ctx.Done() |
| 751 | return ctx.Err() |
| 752 | }) |
| 753 | if attempts == 0 { |
| 754 | attempts += 1 |
| 755 | return fmt.Errorf("failed") |
| 756 | } |
| 757 | Signal(ctx, SignalHealthy) |
| 758 | <-stepOne |
| 759 | Signal(ctx, SignalDone) |
| 760 | return nil |
| 761 | }) |
| 762 | |
| 763 | <-stepRoot |
| 764 | Signal(ctx, SignalHealthy) |
| 765 | return nil |
| 766 | }, WithPropagatePanic, WithMetrics(&m)) |
| 767 | |
| 768 | // expectDN waits a second until a given DN is at a given state and fails the |
| 769 | // test otherwise. |
| 770 | expectDN := func(dn string, state NodeState) { |
| 771 | t.Helper() |
| 772 | start := time.Now() |
| 773 | for { |
| 774 | snap := m.DNs() |
| 775 | if _, ok := snap[dn]; !ok { |
| 776 | if time.Since(start) > time.Second { |
| 777 | t.Fatalf("No DN %q", dn) |
| 778 | } else { |
| 779 | time.Sleep(100 * time.Millisecond) |
| 780 | continue |
| 781 | } |
| 782 | } |
| 783 | if want, got := state, snap[dn].State; want != got { |
| 784 | if time.Since(start) > time.Second { |
| 785 | t.Fatalf("Expected %q to be %s, got %s", dn, want, got) |
| 786 | } else { |
| 787 | time.Sleep(100 * time.Millisecond) |
| 788 | continue |
| 789 | } |
| 790 | } |
| 791 | break |
| 792 | } |
| 793 | } |
| 794 | |
| 795 | // Make progress thorugh the runnable tree and check expected states. |
| 796 | |
| 797 | expectDN("root", NodeStateNew) |
| 798 | |
| 799 | stepRoot <- struct{}{} |
| 800 | expectDN("root", NodeStateNew) |
| 801 | expectDN("root.one", NodeStateNew) |
| 802 | |
| 803 | stepOne <- struct{}{} |
| 804 | stepTwo <- struct{}{} |
| 805 | expectDN("root", NodeStateNew) |
| 806 | expectDN("root.one", NodeStateDead) |
| 807 | expectDN("root.one.two", NodeStateCanceled) |
| 808 | |
| 809 | stepOne <- struct{}{} |
| 810 | expectDN("root", NodeStateNew) |
| 811 | expectDN("root.one", NodeStateHealthy) |
| 812 | expectDN("root.one.two", NodeStateNew) |
| 813 | |
| 814 | stepOne <- struct{}{} |
| 815 | expectDN("root", NodeStateNew) |
| 816 | expectDN("root.one", NodeStateDone) |
| 817 | expectDN("root.one.two", NodeStateNew) |
| 818 | |
| 819 | stepTwo <- struct{}{} |
| 820 | expectDN("root", NodeStateNew) |
| 821 | expectDN("root.one", NodeStateDone) |
| 822 | expectDN("root.one.two", NodeStateHealthy) |
| 823 | } |
| 824 | |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 825 | func ExampleNew() { |
| 826 | // Minimal runnable that is immediately done. |
| 827 | childC := make(chan struct{}) |
| 828 | child := func(ctx context.Context) error { |
| 829 | Signal(ctx, SignalHealthy) |
| 830 | close(childC) |
| 831 | Signal(ctx, SignalDone) |
| 832 | return nil |
| 833 | } |
| 834 | |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 835 | // Start a supervision tree with a root runnable. |
| 836 | ctx, ctxC := context.WithCancel(context.Background()) |
| 837 | defer ctxC() |
| Serge Bazanski | c735967 | 2020-10-30 16:38:57 +0100 | [diff] [blame] | 838 | New(ctx, func(ctx context.Context) error { |
| Serge Bazanski | 9c09c4e | 2020-03-24 13:58:01 +0100 | [diff] [blame] | 839 | err := Run(ctx, "child", child) |
| 840 | if err != nil { |
| 841 | return fmt.Errorf("could not run 'child': %w", err) |
| 842 | } |
| 843 | Signal(ctx, SignalHealthy) |
| 844 | |
| 845 | t := time.NewTicker(time.Second) |
| 846 | defer t.Stop() |
| 847 | |
| 848 | // Do something in the background, and exit on context cancel. |
| 849 | for { |
| 850 | select { |
| 851 | case <-t.C: |
| 852 | fmt.Printf("tick!") |
| 853 | case <-ctx.Done(): |
| 854 | return ctx.Err() |
| 855 | } |
| 856 | } |
| 857 | }) |
| 858 | |
| 859 | // root.child will close this channel. |
| 860 | <-childC |
| 861 | } |