blob: 832ec7332b0efa6434a8f460aba44fe293edbee4 [file] [log] [blame]
Tim Windelschmidt6d33a432025-02-04 14:34:25 +01001// Copyright The Monogon Project Authors.
Serge Bazanski9c09c4e2020-03-24 13:58:01 +01002// SPDX-License-Identifier: Apache-2.0
Serge Bazanski9c09c4e2020-03-24 13:58:01 +01003
4package supervisor
5
6import (
7 "context"
8 "errors"
Serge Bazanski9c09c4e2020-03-24 13:58:01 +01009 "fmt"
10 "runtime/debug"
Serge Bazanskiec19b602022-03-09 20:41:31 +010011 "sort"
Serge Bazanski9c09c4e2020-03-24 13:58:01 +010012 "time"
Serge Bazanski9c09c4e2020-03-24 13:58:01 +010013)
14
// The processor maintains runnable goroutines - i.e., when requested it will
// start one, and once it exits, it will record the result and act accordingly.
// It is also responsible for detecting and acting upon supervision subtrees
// that need to be restarted after death (via a 'GC' process).
Serge Bazanski9c09c4e2020-03-24 13:58:01 +010019
// processorRequest is a request for the processor. Only one of the fields can
// be set.
type processorRequest struct {
	// schedule requests that a node's runnable be started.
	schedule *processorRequestSchedule
	// died signals that a node's runnable has exited.
	died *processorRequestDied
	// waitSettled requests a notification once the supervision tree settles.
	waitSettled *processorRequestWaitSettled
}
27
// processorRequestSchedule requests that a given node's runnable be started.
type processorRequestSchedule struct {
	// dn is the DN of the node whose runnable should be started.
	dn string
}
32
// processorRequestDied is a signal from a runnable goroutine that the runnable
// has died.
type processorRequestDied struct {
	// dn is the DN of the node whose runnable died.
	dn string
	// err is the error returned by the runnable, or an error synthesized from
	// a recovered panic. It may be nil if the runnable returned nil.
	err error
}
39
// processorRequestWaitSettled requests a notification for when the
// supervision tree has settled, ie. when the GC has been clean for a number of
// consecutive cycles.
type processorRequestWaitSettled struct {
	// waiter is closed by the processor once the tree is considered settled.
	waiter chan struct{}
}
43
Serge Bazanski9c09c4e2020-03-24 13:58:01 +010044// processor is the main processing loop.
45func (s *supervisor) processor(ctx context.Context) {
46 s.ilogger.Info("supervisor processor started")
47
Serge Bazanskiac6b6442020-05-06 19:13:43 +020048 // Waiters waiting for the GC to be settled.
49 var waiters []chan struct{}
50
Serge Bazanski216fe7b2021-05-21 18:36:16 +020051 // The GC will run every millisecond if needed. Any time the processor
52 // requests a change in the supervision tree (ie a death or a new runnable)
53 // it will mark the state as dirty and run the GC on the next millisecond
54 // cycle.
Serge Bazanski9c09c4e2020-03-24 13:58:01 +010055 gc := time.NewTicker(1 * time.Millisecond)
56 defer gc.Stop()
57 clean := true
58
Serge Bazanskiac6b6442020-05-06 19:13:43 +020059 // How long has the GC been clean. This is used to notify 'settled' waiters.
60 cleanCycles := 0
61
62 markDirty := func() {
63 clean = false
64 cleanCycles = 0
65 }
66
Serge Bazanski9c09c4e2020-03-24 13:58:01 +010067 for {
68 select {
69 case <-ctx.Done():
Serge Bazanskic7359672020-10-30 16:38:57 +010070 s.ilogger.Infof("supervisor processor exiting: %v", ctx.Err())
Serge Bazanski9c09c4e2020-03-24 13:58:01 +010071 s.processKill()
Serge Bazanskiec19b602022-03-09 20:41:31 +010072 s.ilogger.Info("supervisor exited, starting liquidator to clean up remaining runnables...")
73 go s.liquidator()
Serge Bazanski9c09c4e2020-03-24 13:58:01 +010074 return
75 case <-gc.C:
76 if !clean {
Serge Bazanski9c09c4e2020-03-24 13:58:01 +010077 s.processGC()
Serge Bazanski9c09c4e2020-03-24 13:58:01 +010078 }
79 clean = true
Serge Bazanskiac6b6442020-05-06 19:13:43 +020080 cleanCycles += 1
81
Serge Bazanski216fe7b2021-05-21 18:36:16 +020082 // This threshold is somewhat arbitrary. It's a balance between
83 // test speed and test reliability.
Serge Bazanskiac6b6442020-05-06 19:13:43 +020084 if cleanCycles > 50 {
85 for _, w := range waiters {
86 close(w)
87 }
88 waiters = nil
89 }
Serge Bazanski9c09c4e2020-03-24 13:58:01 +010090 case r := <-s.pReq:
91 switch {
92 case r.schedule != nil:
93 s.processSchedule(r.schedule)
Serge Bazanskiac6b6442020-05-06 19:13:43 +020094 markDirty()
Serge Bazanski9c09c4e2020-03-24 13:58:01 +010095 case r.died != nil:
96 s.processDied(r.died)
Serge Bazanskiac6b6442020-05-06 19:13:43 +020097 markDirty()
98 case r.waitSettled != nil:
99 waiters = append(waiters, r.waitSettled.waiter)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100100 default:
101 panic(fmt.Errorf("unhandled request %+v", r))
102 }
103 }
104 }
105}
106
Serge Bazanskiec19b602022-03-09 20:41:31 +0100107// The liquidator is a context-free goroutine which the supervisor starts after
108// its context has been canceled. Its job is to take over listening on the
109// processing channels that the supervisor processor would usually listen on,
110// and implement the minimum amount of logic required to mark existing runnables
111// as DEAD.
112//
113// It exits when all runnables have exited one way or another, and the
114// supervision tree is well and truly dead. This will also be reflected by
115// liveRunnables returning an empty list.
116func (s *supervisor) liquidator() {
117 for {
Tim Windelschmidt931b3a32024-04-18 23:39:20 +0200118 r := <-s.pReq
119 switch {
120 case r.schedule != nil:
121 s.ilogger.Infof("liquidator: refusing to schedule %s", r.schedule.dn)
122 s.mu.Lock()
123 n := s.nodeByDN(r.schedule.dn)
Serge Bazanskieca8ee32024-07-30 14:32:19 +0000124 n.state = NodeStateDead
Tim Windelschmidt931b3a32024-04-18 23:39:20 +0200125 s.mu.Unlock()
126 case r.died != nil:
127 s.ilogger.Infof("liquidator: %s exited", r.died.dn)
128 s.mu.Lock()
129 n := s.nodeByDN(r.died.dn)
Serge Bazanskieca8ee32024-07-30 14:32:19 +0000130 n.state = NodeStateDead
Tim Windelschmidt931b3a32024-04-18 23:39:20 +0200131 s.mu.Unlock()
Serge Bazanskiec19b602022-03-09 20:41:31 +0100132 }
133 live := s.liveRunnables()
134 if len(live) == 0 {
135 s.ilogger.Infof("liquidator: complete, all runnables dead or done")
136 return
137 }
138 }
139}
140
Jan Schäraa6b42a2024-12-18 18:03:26 +0100141// liveRunnables returns a list of runnable DNs that aren't DONE/DEAD/CANCELED.
142// This is used by the liquidator to figure out when its job is done, and by the
Serge Bazanskiec19b602022-03-09 20:41:31 +0100143// TestHarness to know when to unblock the test cleanup function.
144func (s *supervisor) liveRunnables() []string {
145 s.mu.RLock()
146 defer s.mu.RUnlock()
147
Jan Schäraa6b42a2024-12-18 18:03:26 +0100148 // DFS through supervision tree, making note of live (non-DONE/DEAD/CANCELED
149 // runnables).
Serge Bazanskiec19b602022-03-09 20:41:31 +0100150 var live []string
151 seen := make(map[string]bool)
152 q := []*node{s.root}
153 for {
154 if len(q) == 0 {
155 break
156 }
157
158 // Pop from DFS queue.
159 el := q[0]
160 q = q[1:]
161
162 // Skip already visited runnables (this shouldn't happen because the supervision
163 // tree is, well, a tree - but better stay safe than get stuck in a loop).
164 eldn := el.dn()
165 if seen[eldn] {
166 continue
167 }
168 seen[eldn] = true
169
Jan Schäraa6b42a2024-12-18 18:03:26 +0100170 if el.state != NodeStateDead && el.state != NodeStateDone && el.state != NodeStateCanceled {
Serge Bazanskiec19b602022-03-09 20:41:31 +0100171 live = append(live, eldn)
172 }
173
174 // Recurse.
175 for _, child := range el.children {
176 q = append(q, child)
177 }
178 }
179
180 sort.Strings(live)
181 return live
182}
183
Serge Bazanski216fe7b2021-05-21 18:36:16 +0200184// processKill cancels all nodes in the supervision tree. This is only called
185// right before exiting the processor, so they do not get automatically
186// restarted.
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100187func (s *supervisor) processKill() {
188 s.mu.Lock()
189 defer s.mu.Unlock()
190
191 // Gather all context cancel functions.
192 var cancels []func()
193 queue := []*node{s.root}
194 for {
195 if len(queue) == 0 {
196 break
197 }
198
199 cur := queue[0]
200 queue = queue[1:]
201
202 cancels = append(cancels, cur.ctxC)
203 for _, c := range cur.children {
204 queue = append(queue, c)
205 }
206 }
207
208 // Call all context cancels.
209 for _, c := range cancels {
210 c()
211 }
212}
213
Serge Bazanski216fe7b2021-05-21 18:36:16 +0200214// processSchedule starts a node's runnable in a goroutine and records its
215// output once it's done.
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100216func (s *supervisor) processSchedule(r *processorRequestSchedule) {
217 s.mu.Lock()
218 defer s.mu.Unlock()
219
220 n := s.nodeByDN(r.dn)
Serge Bazanskicf864da2024-07-31 11:23:34 +0000221 if n.state != NodeStateNew {
222 panic("programming error: scheduled node not new")
223 }
224 s.metrics.NotifyNodeState(r.dn, n.state)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100225 go func() {
Serge Bazanski19bb4122020-05-04 17:57:50 +0200226 if !s.propagatePanic {
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100227 defer func() {
228 if rec := recover(); rec != nil {
229 s.pReq <- &processorRequest{
230 died: &processorRequestDied{
231 dn: r.dn,
232 err: fmt.Errorf("panic: %v, stacktrace: %s", rec, string(debug.Stack())),
233 },
234 }
235 }
236 }()
237 }
238
239 res := n.runnable(n.ctx)
240
241 s.pReq <- &processorRequest{
242 died: &processorRequestDied{
243 dn: r.dn,
244 err: res,
245 },
246 }
247 }()
248}
249
Serge Bazanski216fe7b2021-05-21 18:36:16 +0200250// processDied records the result from a runnable goroutine, and updates its
251// node state accordingly. If the result is a death and not an expected exit,
252// related nodes (ie. children and group siblings) are canceled accordingly.
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100253func (s *supervisor) processDied(r *processorRequestDied) {
254 s.mu.Lock()
255 defer s.mu.Unlock()
256
257 // Okay, so a Runnable has quit. What now?
258 n := s.nodeByDN(r.dn)
259 ctx := n.ctx
260
Jan Schär08c1c722024-12-19 12:03:17 +0100261 // Simple case: it has signaled Done and quit with no error.
262 if n.signaledDone && r.err == nil {
263 // Mark the node as DONE.
264 n.state = NodeStateDone
Serge Bazanskicf864da2024-07-31 11:23:34 +0000265 s.metrics.NotifyNodeState(r.dn, n.state)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100266 return
267 }
268
Serge Bazanski216fe7b2021-05-21 18:36:16 +0200269 // Simple case: the context was canceled and the returned error is the
270 // context error.
Tim Windelschmidt47d03442024-04-23 15:08:44 +0200271 if r.err != nil && ctx.Err() != nil && errors.Is(r.err, ctx.Err()) {
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100272 // Mark the node as canceled successfully.
Serge Bazanskieca8ee32024-07-30 14:32:19 +0000273 n.state = NodeStateCanceled
Serge Bazanskicf864da2024-07-31 11:23:34 +0000274 s.metrics.NotifyNodeState(r.dn, n.state)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100275 return
276 }
277
Serge Bazanski216fe7b2021-05-21 18:36:16 +0200278 // Otherwise, the Runnable should not have died or quit. Handle
279 // accordingly.
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100280 err := r.err
281 // A lack of returned error is also an error.
282 if err == nil {
Serge Bazanski0164c712023-03-16 17:54:07 +0100283 err = fmt.Errorf("returned nil when %s", n.state)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100284 }
285
Serge Bazanski0164c712023-03-16 17:54:07 +0100286 s.ilogger.Errorf("%s: %v", n.dn(), err)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100287 // Mark as dead.
Serge Bazanskieca8ee32024-07-30 14:32:19 +0000288 n.state = NodeStateDead
Serge Bazanskicf864da2024-07-31 11:23:34 +0000289 s.metrics.NotifyNodeState(r.dn, n.state)
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100290
291 // Cancel that node's context, just in case something still depends on it.
292 n.ctxC()
293
294 // Cancel all siblings.
295 if n.parent != nil {
Tim Windelschmidt6b6428d2024-04-11 01:35:41 +0200296 for name := range n.parent.groupSiblings(n.name) {
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100297 if name == n.name {
298 continue
299 }
300 sibling := n.parent.children[name]
Serge Bazanski216fe7b2021-05-21 18:36:16 +0200301 // TODO(q3k): does this need to run in a goroutine, ie. can a
302 // context cancel block?
Serge Bazanski9c09c4e2020-03-24 13:58:01 +0100303 sibling.ctxC()
304 }
305 }
306}
307
// processGC runs the GC process. It's not really Garbage Collection, as in, it
// doesn't remove unnecessary tree nodes - but it does find nodes that need to
// be restarted, find the subset that can and then schedules them for running.
// As such, it's less of a Garbage Collector and more of a Necromancer.
// However, GC is a friendlier name.
func (s *supervisor) processGC() {
	s.mu.Lock()
	defer s.mu.Unlock()

	// The 'GC' is the main business logic of the supervision tree. It
	// traverses a locked tree and tries to find subtrees that must be
	// restarted (because of a DEAD/CANCELED runnable). It then finds which of
	// these subtrees that should be restarted can be restarted, ie. which ones
	// are fully recursively DEAD/CANCELED. It also finds the smallest set of
	// largest subtrees that can be restarted, ie. if there's multiple DEAD
	// runnables that can be restarted at once, it will do so.

	// Phase one: Find all leaves.
	// This is a simple DFS that finds all the leaves of the tree, ie all nodes
	// that do not have children nodes.
	leaves := make(map[string]bool)
	queue := []*node{s.root}
	for {
		if len(queue) == 0 {
			break
		}
		cur := queue[0]
		queue = queue[1:]

		// Prepend children so the traversal goes depth-first.
		for _, c := range cur.children {
			queue = append([]*node{c}, queue...)
		}

		if len(cur.children) == 0 {
			leaves[cur.dn()] = true
		}
	}

	// Phase two: traverse tree from node to root and make note of all subtrees
	// that can be restarted.
	// A subtree is restartable/ready iff every node in that subtree is either
	// CANCELED, DEAD or DONE. Such a 'ready' subtree can be restarted by the
	// supervisor if needed.

	// DNs that we already visited.
	visited := make(map[string]bool)
	// DNs whose subtrees are ready to be restarted.
	// These are all subtrees recursively - ie., root.a.a and root.a will both
	// be marked here.
	ready := make(map[string]bool)

	// We build a queue of nodes to visit, starting from the leaves.
	queue = []*node{}
	for l := range leaves {
		queue = append(queue, s.nodeByDN(l))
	}

	for {
		if len(queue) == 0 {
			break
		}

		cur := queue[0]
		curDn := cur.dn()

		queue = queue[1:]

		// Do we have a decision about our children?
		allVisited := true
		for _, c := range cur.children {
			if !visited[c.dn()] {
				allVisited = false
				break
			}
		}

		// If no decision about children is available, it means we ended up in
		// this subtree through some shorter path of a shorter/lower-order
		// leaf. There is a path to a leaf that's longer than the one that
		// caused this node to be enqueued. Easy solution: just push back the
		// current element and retry later.
		if !allVisited {
			// Push back to queue and wait for a decision later.
			queue = append(queue, cur)
			continue
		}

		// All children have been visited and we have an idea about whether
		// they're ready/restartable. All of the node's children must be
		// restartable in order for this node to be restartable.
		childrenReady := true
		var childrenNotReady []string
		for _, c := range cur.children {
			if !ready[c.dn()] {
				childrenNotReady = append(childrenNotReady, c.dn())
				childrenReady = false
				break
			}
		}

		// In addition to children, the node itself must be restartable (ie.
		// DONE, DEAD or CANCELED).
		curReady := false
		switch cur.state {
		case NodeStateDone:
			curReady = true
		case NodeStateCanceled:
			curReady = true
		case NodeStateDead:
			curReady = true
		default:
		}

		// Surface why a dead node is being held back, to aid debugging stuck
		// restarts.
		if cur.state == NodeStateDead && !childrenReady {
			s.ilogger.Warningf("Not restarting %s: children not ready to be restarted: %v", curDn, childrenNotReady)
		}

		// Note down that we have an opinion on this node, and note that
		// opinion down.
		visited[curDn] = true
		ready[curDn] = childrenReady && curReady

		// Now we can also enqueue the parent of this node for processing.
		if cur.parent != nil && !visited[cur.parent.dn()] {
			queue = append(queue, cur.parent)
		}
	}

	// Phase 3: traverse tree from root to find largest subtrees that need to
	// be restarted and are ready to be restarted.

	// All DNs that need to be restarted by the GC process.
	want := make(map[string]bool)
	// All DNs that need to be restarted and can be restarted by the GC process
	// - a subset of 'want' DNs.
	can := make(map[string]bool)
	// The set difference between 'want' and 'can' are all nodes that should be
	// restarted but can't yet (ie. because a child is still in the process of
	// being canceled).

	// DFS from root.
	queue = []*node{s.root}
	for {
		if len(queue) == 0 {
			break
		}

		cur := queue[0]
		queue = queue[1:]

		// If this node's context is canceled and it has exited, it should be
		// restarted.
		exited := cur.state == NodeStateDead || cur.state == NodeStateCanceled || cur.state == NodeStateDone
		if cur.ctx.Err() != nil && exited {
			want[cur.dn()] = true
		}

		// If it should be restarted and is ready to be restarted...
		if want[cur.dn()] && ready[cur.dn()] {
			// And its parent context is valid (ie hasn't been canceled), mark
			// it as restartable.
			if cur.parent == nil || cur.parent.ctx.Err() == nil {
				can[cur.dn()] = true
				// Restarting this node restarts the whole subtree; no need to
				// descend further.
				continue
			}
		}

		// Otherwise, traverse further down the tree to see if something else
		// needs to be done.
		for _, c := range cur.children {
			queue = append(queue, c)
		}
	}

	// Reinitialize and reschedule all subtrees
	for dn := range can {
		n := s.nodeByDN(dn)

		// Only back off when the node unexpectedly died - not when it got
		// canceled.
		bo := time.Duration(0)
		if n.state == NodeStateDead {
			bo = n.bo.NextBackOff()
		}

		// Prepare node for rescheduling - remove its children, reset its state
		// to new.
		n.reset()
		s.ilogger.Infof("rescheduling supervised node %s with backoff %s", dn, bo.String())

		// Reschedule node runnable to run after backoff. If the node's
		// (post-reset) context gets canceled before the backoff elapses,
		// report a death instead so the processor can react.
		go func(n *node, bo time.Duration) {
			select {
			case <-time.After(bo):
				s.pReq <- &processorRequest{
					schedule: &processorRequestSchedule{dn: n.dn()},
				}
			case <-n.ctx.Done():
				s.pReq <- &processorRequest{
					died: &processorRequestDied{
						dn:  n.dn(),
						err: n.ctx.Err(),
					},
				}
			}
		}(n, bo)
	}
}