blob: 667b2ab2161b6448bd036db1d2eaa6836bc7e24d [file] [log] [blame]
Serge Bazanski9c09c4e2020-03-24 13:58:01 +01001// Copyright 2020 The Monogon Project Authors.
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17package supervisor
18
19import (
20 "context"
21 "errors"
Serge Bazanski9c09c4e2020-03-24 13:58:01 +010022 "fmt"
23 "runtime/debug"
Serge Bazanskiec19b602022-03-09 20:41:31 +010024 "sort"
Serge Bazanski9c09c4e2020-03-24 13:58:01 +010025 "time"
Serge Bazanski9c09c4e2020-03-24 13:58:01 +010026)
27
Serge Bazanski216fe7b2021-05-21 18:36:16 +020028// The processor maintains runnable goroutines - ie., when requested will start
Jan Schäraa6b42a2024-12-18 18:03:26 +010029// one, and then once it exits, it will record the result and act accordingly.
Serge Bazanski216fe7b2021-05-21 18:36:16 +020030// It is also responsible for detecting and acting upon supervision subtrees
31// that need to be restarted after death (via a 'GC' process)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +010032
// processorRequest is a request for the processor. Only one of the fields can
// be set.
type processorRequest struct {
	// schedule requests that a node's runnable be started.
	schedule *processorRequestSchedule
	// died signals that a node's runnable has exited.
	died *processorRequestDied
	// waitSettled asks for a notification once the supervision tree has
	// settled (ie. the GC has had nothing to do for a while).
	waitSettled *processorRequestWaitSettled
}
40
// processorRequestSchedule requests that a given node's runnable be started.
type processorRequestSchedule struct {
	// dn identifies the node whose runnable should be started.
	dn string
}
45
// processorRequestDied is a signal from a runnable goroutine that the runnable
// has died.
type processorRequestDied struct {
	// dn identifies the node whose runnable exited.
	dn string
	// err is the error returned by the runnable (possibly nil), or a
	// synthesized error if the runnable panicked.
	err error
}
52
// processorRequestWaitSettled requests that the given channel be closed once
// the supervision tree has settled, ie. once the GC has been clean for a
// number of consecutive cycles.
type processorRequestWaitSettled struct {
	// waiter is closed by the processor when the tree settles.
	waiter chan struct{}
}
56
Serge Bazanski9c09c4e2020-03-24 13:58:01 +010057// processor is the main processing loop.
58func (s *supervisor) processor(ctx context.Context) {
59 s.ilogger.Info("supervisor processor started")
60
Serge Bazanskiac6b6442020-05-06 19:13:43 +020061 // Waiters waiting for the GC to be settled.
62 var waiters []chan struct{}
63
Serge Bazanski216fe7b2021-05-21 18:36:16 +020064 // The GC will run every millisecond if needed. Any time the processor
65 // requests a change in the supervision tree (ie a death or a new runnable)
66 // it will mark the state as dirty and run the GC on the next millisecond
67 // cycle.
Serge Bazanski9c09c4e2020-03-24 13:58:01 +010068 gc := time.NewTicker(1 * time.Millisecond)
69 defer gc.Stop()
70 clean := true
71
Serge Bazanskiac6b6442020-05-06 19:13:43 +020072 // How long has the GC been clean. This is used to notify 'settled' waiters.
73 cleanCycles := 0
74
75 markDirty := func() {
76 clean = false
77 cleanCycles = 0
78 }
79
Serge Bazanski9c09c4e2020-03-24 13:58:01 +010080 for {
81 select {
82 case <-ctx.Done():
Serge Bazanskic7359672020-10-30 16:38:57 +010083 s.ilogger.Infof("supervisor processor exiting: %v", ctx.Err())
Serge Bazanski9c09c4e2020-03-24 13:58:01 +010084 s.processKill()
Serge Bazanskiec19b602022-03-09 20:41:31 +010085 s.ilogger.Info("supervisor exited, starting liquidator to clean up remaining runnables...")
86 go s.liquidator()
Serge Bazanski9c09c4e2020-03-24 13:58:01 +010087 return
88 case <-gc.C:
89 if !clean {
Serge Bazanski9c09c4e2020-03-24 13:58:01 +010090 s.processGC()
Serge Bazanski9c09c4e2020-03-24 13:58:01 +010091 }
92 clean = true
Serge Bazanskiac6b6442020-05-06 19:13:43 +020093 cleanCycles += 1
94
Serge Bazanski216fe7b2021-05-21 18:36:16 +020095 // This threshold is somewhat arbitrary. It's a balance between
96 // test speed and test reliability.
Serge Bazanskiac6b6442020-05-06 19:13:43 +020097 if cleanCycles > 50 {
98 for _, w := range waiters {
99 close(w)
100 }
101 waiters = nil
102 }
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100103 case r := <-s.pReq:
104 switch {
105 case r.schedule != nil:
106 s.processSchedule(r.schedule)
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200107 markDirty()
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100108 case r.died != nil:
109 s.processDied(r.died)
Serge Bazanskiac6b6442020-05-06 19:13:43 +0200110 markDirty()
111 case r.waitSettled != nil:
112 waiters = append(waiters, r.waitSettled.waiter)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100113 default:
114 panic(fmt.Errorf("unhandled request %+v", r))
115 }
116 }
117 }
118}
119
Serge Bazanskiec19b602022-03-09 20:41:31 +0100120// The liquidator is a context-free goroutine which the supervisor starts after
121// its context has been canceled. Its job is to take over listening on the
122// processing channels that the supervisor processor would usually listen on,
123// and implement the minimum amount of logic required to mark existing runnables
124// as DEAD.
125//
126// It exits when all runnables have exited one way or another, and the
127// supervision tree is well and truly dead. This will also be reflected by
128// liveRunnables returning an empty list.
129func (s *supervisor) liquidator() {
130 for {
Tim Windelschmidt931b3a32024-04-18 23:39:20 +0200131 r := <-s.pReq
132 switch {
133 case r.schedule != nil:
134 s.ilogger.Infof("liquidator: refusing to schedule %s", r.schedule.dn)
135 s.mu.Lock()
136 n := s.nodeByDN(r.schedule.dn)
Serge Bazanskieca8ee32024-07-30 14:32:19 +0000137 n.state = NodeStateDead
Tim Windelschmidt931b3a32024-04-18 23:39:20 +0200138 s.mu.Unlock()
139 case r.died != nil:
140 s.ilogger.Infof("liquidator: %s exited", r.died.dn)
141 s.mu.Lock()
142 n := s.nodeByDN(r.died.dn)
Serge Bazanskieca8ee32024-07-30 14:32:19 +0000143 n.state = NodeStateDead
Tim Windelschmidt931b3a32024-04-18 23:39:20 +0200144 s.mu.Unlock()
Serge Bazanskiec19b602022-03-09 20:41:31 +0100145 }
146 live := s.liveRunnables()
147 if len(live) == 0 {
148 s.ilogger.Infof("liquidator: complete, all runnables dead or done")
149 return
150 }
151 }
152}
153
Jan Schäraa6b42a2024-12-18 18:03:26 +0100154// liveRunnables returns a list of runnable DNs that aren't DONE/DEAD/CANCELED.
155// This is used by the liquidator to figure out when its job is done, and by the
Serge Bazanskiec19b602022-03-09 20:41:31 +0100156// TestHarness to know when to unblock the test cleanup function.
157func (s *supervisor) liveRunnables() []string {
158 s.mu.RLock()
159 defer s.mu.RUnlock()
160
Jan Schäraa6b42a2024-12-18 18:03:26 +0100161 // DFS through supervision tree, making note of live (non-DONE/DEAD/CANCELED
162 // runnables).
Serge Bazanskiec19b602022-03-09 20:41:31 +0100163 var live []string
164 seen := make(map[string]bool)
165 q := []*node{s.root}
166 for {
167 if len(q) == 0 {
168 break
169 }
170
171 // Pop from DFS queue.
172 el := q[0]
173 q = q[1:]
174
175 // Skip already visited runnables (this shouldn't happen because the supervision
176 // tree is, well, a tree - but better stay safe than get stuck in a loop).
177 eldn := el.dn()
178 if seen[eldn] {
179 continue
180 }
181 seen[eldn] = true
182
Jan Schäraa6b42a2024-12-18 18:03:26 +0100183 if el.state != NodeStateDead && el.state != NodeStateDone && el.state != NodeStateCanceled {
Serge Bazanskiec19b602022-03-09 20:41:31 +0100184 live = append(live, eldn)
185 }
186
187 // Recurse.
188 for _, child := range el.children {
189 q = append(q, child)
190 }
191 }
192
193 sort.Strings(live)
194 return live
195}
196
Serge Bazanski216fe7b2021-05-21 18:36:16 +0200197// processKill cancels all nodes in the supervision tree. This is only called
198// right before exiting the processor, so they do not get automatically
199// restarted.
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100200func (s *supervisor) processKill() {
201 s.mu.Lock()
202 defer s.mu.Unlock()
203
204 // Gather all context cancel functions.
205 var cancels []func()
206 queue := []*node{s.root}
207 for {
208 if len(queue) == 0 {
209 break
210 }
211
212 cur := queue[0]
213 queue = queue[1:]
214
215 cancels = append(cancels, cur.ctxC)
216 for _, c := range cur.children {
217 queue = append(queue, c)
218 }
219 }
220
221 // Call all context cancels.
222 for _, c := range cancels {
223 c()
224 }
225}
226
// processSchedule starts a node's runnable in a goroutine and records its
// output once it's done. The runnable's result (or recovered panic) is fed
// back to the processor as a 'died' request.
func (s *supervisor) processSchedule(r *processorRequestSchedule) {
	s.mu.Lock()
	defer s.mu.Unlock()

	n := s.nodeByDN(r.dn)
	// Only NEW nodes may be scheduled; anything else indicates a bug in the
	// supervisor's state machine, not a user error.
	if n.state != NodeStateNew {
		panic("programming error: scheduled node not new")
	}
	s.metrics.NotifyNodeState(r.dn, n.state)
	go func() {
		// Unless panic propagation is enabled (eg. for tests), convert a
		// panicking runnable into a 'died' request carrying the panic value
		// and stack trace. The defer is registered conditionally so that
		// with propagation enabled the panic escapes unhandled.
		if !s.propagatePanic {
			defer func() {
				if rec := recover(); rec != nil {
					s.pReq <- &processorRequest{
						died: &processorRequestDied{
							dn:  r.dn,
							err: fmt.Errorf("panic: %v, stacktrace: %s", rec, string(debug.Stack())),
						},
					}
				}
			}()
		}

		// Run the runnable to completion with the node's context.
		res := n.runnable(n.ctx)

		// Report the runnable's return value back to the processor.
		s.pReq <- &processorRequest{
			died: &processorRequestDied{
				dn:  r.dn,
				err: res,
			},
		}
	}()
}
262
Serge Bazanski216fe7b2021-05-21 18:36:16 +0200263// processDied records the result from a runnable goroutine, and updates its
264// node state accordingly. If the result is a death and not an expected exit,
265// related nodes (ie. children and group siblings) are canceled accordingly.
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100266func (s *supervisor) processDied(r *processorRequestDied) {
267 s.mu.Lock()
268 defer s.mu.Unlock()
269
270 // Okay, so a Runnable has quit. What now?
271 n := s.nodeByDN(r.dn)
272 ctx := n.ctx
273
Jan Schär08c1c722024-12-19 12:03:17 +0100274 // Simple case: it has signaled Done and quit with no error.
275 if n.signaledDone && r.err == nil {
276 // Mark the node as DONE.
277 n.state = NodeStateDone
Serge Bazanskicf864da2024-07-31 11:23:34 +0000278 s.metrics.NotifyNodeState(r.dn, n.state)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100279 return
280 }
281
Serge Bazanski216fe7b2021-05-21 18:36:16 +0200282 // Simple case: the context was canceled and the returned error is the
283 // context error.
Tim Windelschmidt47d03442024-04-23 15:08:44 +0200284 if r.err != nil && ctx.Err() != nil && errors.Is(r.err, ctx.Err()) {
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100285 // Mark the node as canceled successfully.
Serge Bazanskieca8ee32024-07-30 14:32:19 +0000286 n.state = NodeStateCanceled
Serge Bazanskicf864da2024-07-31 11:23:34 +0000287 s.metrics.NotifyNodeState(r.dn, n.state)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100288 return
289 }
290
Serge Bazanski216fe7b2021-05-21 18:36:16 +0200291 // Otherwise, the Runnable should not have died or quit. Handle
292 // accordingly.
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100293 err := r.err
294 // A lack of returned error is also an error.
295 if err == nil {
Serge Bazanski0164c712023-03-16 17:54:07 +0100296 err = fmt.Errorf("returned nil when %s", n.state)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100297 }
298
Serge Bazanski0164c712023-03-16 17:54:07 +0100299 s.ilogger.Errorf("%s: %v", n.dn(), err)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100300 // Mark as dead.
Serge Bazanskieca8ee32024-07-30 14:32:19 +0000301 n.state = NodeStateDead
Serge Bazanskicf864da2024-07-31 11:23:34 +0000302 s.metrics.NotifyNodeState(r.dn, n.state)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100303
304 // Cancel that node's context, just in case something still depends on it.
305 n.ctxC()
306
307 // Cancel all siblings.
308 if n.parent != nil {
Tim Windelschmidt6b6428d2024-04-11 01:35:41 +0200309 for name := range n.parent.groupSiblings(n.name) {
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100310 if name == n.name {
311 continue
312 }
313 sibling := n.parent.children[name]
Serge Bazanski216fe7b2021-05-21 18:36:16 +0200314 // TODO(q3k): does this need to run in a goroutine, ie. can a
315 // context cancel block?
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100316 sibling.ctxC()
317 }
318 }
319}
320
// processGC runs the GC process. It's not really Garbage Collection, as in, it
// doesn't remove unnecessary tree nodes - but it does find nodes that need to
// be restarted, find the subset that can and then schedules them for running.
// As such, it's less of a Garbage Collector and more of a Necromancer.
// However, GC is a friendlier name.
func (s *supervisor) processGC() {
	s.mu.Lock()
	defer s.mu.Unlock()

	// The 'GC' is the main business logic of the supervision tree. It
	// traverses a locked tree and tries to find subtrees that must be
	// restarted (because of a DEAD/CANCELED runnable). It then finds which of
	// these subtrees that should be restarted can be restarted, ie. which ones
	// are fully recursively DEAD/CANCELED. It also finds the smallest set of
	// largest subtrees that can be restarted, ie. if there's multiple DEAD
	// runnables that can be restarted at once, it will do so.

	// Phase one: Find all leaves.
	// This finds all the leaves of the tree, ie all nodes that do not have
	// children nodes.
	leaves := make(map[string]bool)
	queue := []*node{s.root}
	for {
		if len(queue) == 0 {
			break
		}
		cur := queue[0]
		queue = queue[1:]

		// Children are prepended, making this traversal depth-first.
		for _, c := range cur.children {
			queue = append([]*node{c}, queue...)
		}

		if len(cur.children) == 0 {
			leaves[cur.dn()] = true
		}
	}

	// Phase two: traverse tree from node to root and make note of all subtrees
	// that can be restarted.
	// A subtree is restartable/ready iff every node in that subtree is either
	// CANCELED, DEAD or DONE. Such a 'ready' subtree can be restarted by the
	// supervisor if needed.

	// DNs that we already visited.
	visited := make(map[string]bool)
	// DNs whose subtrees are ready to be restarted.
	// These are all subtrees recursively - ie., root.a.a and root.a will both
	// be marked here.
	ready := make(map[string]bool)

	// We build a queue of nodes to visit, starting from the leaves.
	queue = []*node{}
	for l := range leaves {
		queue = append(queue, s.nodeByDN(l))
	}

	for {
		if len(queue) == 0 {
			break
		}

		cur := queue[0]
		curDn := cur.dn()

		queue = queue[1:]

		// Do we have a decision about our children?
		allVisited := true
		for _, c := range cur.children {
			if !visited[c.dn()] {
				allVisited = false
				break
			}
		}

		// If no decision about children is available, it means we ended up in
		// this subtree through some shorter path of a shorter/lower-order
		// leaf. There is a path to a leaf that's longer than the one that
		// caused this node to be enqueued. Easy solution: just push back the
		// current element and retry later. This terminates because the
		// longer path will eventually visit all of this node's children.
		if !allVisited {
			// Push back to queue and wait for a decision later.
			queue = append(queue, cur)
			continue
		}

		// All children have been visited and we have an idea about whether
		// they're ready/restartable. All of the node's children must be
		// restartable in order for this node to be restartable.
		childrenReady := true
		var childrenNotReady []string
		for _, c := range cur.children {
			if !ready[c.dn()] {
				childrenNotReady = append(childrenNotReady, c.dn())
				childrenReady = false
				break
			}
		}

		// In addition to children, the node itself must be restartable (ie.
		// DONE, DEAD or CANCELED).
		curReady := false
		switch cur.state {
		case NodeStateDone:
			curReady = true
		case NodeStateCanceled:
			curReady = true
		case NodeStateDead:
			curReady = true
		default:
		}

		// Log when a dead node is blocked from restarting by children that
		// have not yet reached a terminal state.
		if cur.state == NodeStateDead && !childrenReady {
			s.ilogger.Warningf("Not restarting %s: children not ready to be restarted: %v", curDn, childrenNotReady)
		}

		// Note down that we have an opinion on this node, and note that
		// opinion down.
		visited[curDn] = true
		ready[curDn] = childrenReady && curReady

		// Now we can also enqueue the parent of this node for processing.
		if cur.parent != nil && !visited[cur.parent.dn()] {
			queue = append(queue, cur.parent)
		}
	}

	// Phase 3: traverse tree from root to find largest subtrees that need to
	// be restarted and are ready to be restarted.

	// All DNs that need to be restarted by the GC process.
	want := make(map[string]bool)
	// All DNs that need to be restarted and can be restarted by the GC process
	// - a subset of 'want' DNs.
	can := make(map[string]bool)
	// The set difference between 'want' and 'can' are all nodes that should be
	// restarted but can't yet (ie. because a child is still in the process of
	// being canceled).

	// Traverse from root.
	queue = []*node{s.root}
	for {
		if len(queue) == 0 {
			break
		}

		cur := queue[0]
		queue = queue[1:]

		// If this node is DEAD or CANCELED it should be restarted.
		if cur.state == NodeStateDead || cur.state == NodeStateCanceled {
			want[cur.dn()] = true
		}

		// If it should be restarted and is ready to be restarted...
		if want[cur.dn()] && ready[cur.dn()] {
			// And its parent context is valid (ie hasn't been canceled), mark
			// it as restartable. This is the largest restartable subtree
			// rooted here, so its children are not traversed further.
			if cur.parent == nil || cur.parent.ctx.Err() == nil {
				can[cur.dn()] = true
				continue
			}
		}

		// Otherwise, traverse further down the tree to see if something else
		// needs to be done.
		for _, c := range cur.children {
			queue = append(queue, c)
		}
	}

	// Reinitialize and reschedule all subtrees
	for dn := range can {
		n := s.nodeByDN(dn)

		// Only back off when the node unexpectedly died - not when it got
		// canceled.
		bo := time.Duration(0)
		if n.state == NodeStateDead {
			bo = n.bo.NextBackOff()
		}

		// Prepare node for rescheduling - remove its children, reset its state
		// to new.
		n.reset()
		s.ilogger.Infof("rescheduling supervised node %s with backoff %s", dn, bo.String())

		// Reschedule node runnable to run after backoff. If the node's
		// context gets canceled while waiting out the backoff, report a
		// death with the context error instead, so a later GC run can mark
		// the node CANCELED and handle it accordingly.
		go func(n *node, bo time.Duration) {
			select {
			case <-time.After(bo):
				s.pReq <- &processorRequest{
					schedule: &processorRequestSchedule{dn: n.dn()},
				}
			case <-n.ctx.Done():
				s.pReq <- &processorRequest{
					died: &processorRequestDied{
						dn:  n.dn(),
						err: n.ctx.Err(),
					},
				}
			}
		}(n, bo)
	}
}
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100526}