core/internal/common/supervisor: deflake

We fix several flaky tests in the supervisor framework, and one bug in the
supervisor itself.

Tests are deflaked by relying less on tight timing and instead waiting
for the supervisor to reach a 'settled' state, i.e. blocking until the
supervisor has finished whatever work it is currently processing.
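
For illustration, tests now follow roughly this pattern instead of
sleeping for a fixed duration (a minimal sketch using the
waitSettleError helper added below; the 'healthy' channel stands in
for the per-test signal channels):

	s := New(ctx, log, rootRunnable, WithPropagatePanic)

	// Block until the supervisor's GC has been idle for a number of
	// cycles, i.e. all pending scheduling/restart work is done.
	s.waitSettleError(ctx, t)

	select {
	case <-healthy:
	default:
		t.Fatalf("runnable didn't start")
	}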

Another flake, TestBackoff, is fixed by widening the window of allowed
restart times.

We also fix a bug in the supervisor that caused it to spuriously
restart children by scheduling them even when their future parent's
context had already been canceled.
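
The fix is a parent-context guard in phase 3 of the GC, roughly as in
the excerpt below: a subtree is only marked restartable if its parent
context is still valid (or it is the root).

	if want[cur.dn()] && ready[cur.dn()] {
		// Only restart if the parent context hasn't been canceled
		// (the root node has no parent and is always eligible).
		if cur.parent == nil || cur.parent.ctx.Err() == nil {
			can[cur.dn()] = true
			continue
		}
	}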

Finally, we make some log messages less verbose.

Test Plan: Covered by existing tests that are now less flaky. Verified with bazel test --runs_per_test=100 that they no longer flake.

X-Origin-Diff: phab/D495
GitOrigin-RevId: f92f7368708c54c59644d3e7dca03b2b5692c30a
diff --git a/core/internal/common/supervisor/BUILD.bazel b/core/internal/common/supervisor/BUILD.bazel
index c72ef04..ca8b513 100644
--- a/core/internal/common/supervisor/BUILD.bazel
+++ b/core/internal/common/supervisor/BUILD.bazel
@@ -7,6 +7,7 @@
         "supervisor_node.go",
         "supervisor_processor.go",
         "supervisor_support.go",
+        "supervisor_testhelpers.go",
     ],
     importpath = "git.monogon.dev/source/nexantic.git/core/internal/common/supervisor",
     visibility = ["//core:__subpackages__"],
diff --git a/core/internal/common/supervisor/supervisor.go b/core/internal/common/supervisor/supervisor.go
index e839d0a..db4489e 100644
--- a/core/internal/common/supervisor/supervisor.go
+++ b/core/internal/common/supervisor/supervisor.go
@@ -111,7 +111,7 @@
 
 // New creates a new supervisor with its root running the given root runnable.
 // The given context can be used to cancel the entire supervision tree.
-func New(ctx context.Context, logger *zap.Logger, rootRunnable Runnable, opts ...SupervisorOpt) {
+func New(ctx context.Context, logger *zap.Logger, rootRunnable Runnable, opts ...SupervisorOpt) *supervisor {
 	sup := &supervisor{
 		logger:  logger,
 		ilogger: logger.Named("supervisor"),
@@ -129,4 +129,6 @@
 	sup.pReq <- &processorRequest{
 		schedule: &processorRequestSchedule{dn: "root"},
 	}
+
+	return sup
 }
diff --git a/core/internal/common/supervisor/supervisor_processor.go b/core/internal/common/supervisor/supervisor_processor.go
index 3fcaef7..f283898 100644
--- a/core/internal/common/supervisor/supervisor_processor.go
+++ b/core/internal/common/supervisor/supervisor_processor.go
@@ -32,8 +32,9 @@
 
 // processorRequest is a request for the processor. Only one of the fields can be set.
 type processorRequest struct {
-	schedule *processorRequestSchedule
-	died     *processorRequestDied
+	schedule    *processorRequestSchedule
+	died        *processorRequestDied
+	waitSettled *processorRequestWaitSettled
 }
 
 // processorRequestSchedule requests that a given node's runnable be started.
@@ -47,16 +48,31 @@
 	err error
 }
 
+type processorRequestWaitSettled struct {
+	waiter chan struct{}
+}
+
 // processor is the main processing loop.
 func (s *supervisor) processor(ctx context.Context) {
 	s.ilogger.Info("supervisor processor started")
 
+	// Waiters waiting for the GC to be settled.
+	var waiters []chan struct{}
+
 	// The GC will run every millisecond if needed. Any time the processor requests a change in the supervision tree
 	// (ie a death or a new runnable) it will mark the state as dirty and run the GC on the next millisecond cycle.
 	gc := time.NewTicker(1 * time.Millisecond)
 	defer gc.Stop()
 	clean := true
 
+	// How long has the GC been clean. This is used to notify 'settled' waiters.
+	cleanCycles := 0
+
+	markDirty := func() {
+		clean = false
+		cleanCycles = 0
+	}
+
 	for {
 		select {
 		case <-ctx.Done():
@@ -71,14 +87,25 @@
 				s.ilogger.Debug("gc done", zap.Duration("elapsed", time.Since(gcStart)))
 			}
 			clean = true
+			cleanCycles += 1
+
+			// This threshold is somewhat arbitrary. It's a balance between test speed and test reliability.
+			if cleanCycles > 50 {
+				for _, w := range waiters {
+					close(w)
+				}
+				waiters = nil
+			}
 		case r := <-s.pReq:
 			switch {
 			case r.schedule != nil:
 				s.processSchedule(r.schedule)
-				clean = false
+				markDirty()
 			case r.died != nil:
 				s.processDied(r.died)
-				clean = false
+				markDirty()
+			case r.waitSettled != nil:
+				waiters = append(waiters, r.waitSettled.waiter)
 			default:
 				panic(fmt.Errorf("unhandled request %+v", r))
 			}
@@ -174,6 +201,7 @@
 
 	// Simple case: the context was canceled and the returned error is the context error.
 	if err := ctx.Err(); err != nil && perr == err {
+		s.ilogger.Debug("runnable returned after context cancel", zap.String("dn", n.dn()))
 		// Mark the node as canceled successfully.
 		n.state = nodeStateCanceled
 		return
@@ -244,7 +272,7 @@
 	}
 
 	tPhase1 := time.Now()
-	s.ilogger.Debug("gc phase 1 done", zap.Any("leaves", leaves))
+	s.ilogger.Debug("gc phase 1 done", zap.Any("leaves", len(leaves)))
 
 	// Phase two: traverse tree from node to root and make note of all subtrees that can be restarted.
 	// A subtree is restartable/ready iff every node in that subtree is either CANCELED, DEAD or DONE.
@@ -322,7 +350,7 @@
 	}
 
 	tPhase2 := time.Now()
-	s.ilogger.Debug("gc phase 2 done", zap.Any("ready", ready))
+	s.ilogger.Debug("gc phase 2 done", zap.Any("ready", len(ready)))
 
 	// Phase 3: traverse tree from root to find largest subtrees that need to be restarted and are ready to be
 	// restarted.
@@ -349,10 +377,13 @@
 			want[cur.dn()] = true
 		}
 
-		// If it should and can be restarted, that's all we want.
+		// If it should be restarted and is ready to be restarted...
 		if want[cur.dn()] && ready[cur.dn()] {
-			can[cur.dn()] = true
-			continue
+			// And its parent context is valid (ie hasn't been canceled), mark it as restartable.
+			if cur.parent == nil || cur.parent.ctx.Err() == nil {
+				can[cur.dn()] = true
+				continue
+			}
 		}
 
 		// Otherwise, traverse further down the tree to see if something else needs to be done.
@@ -362,19 +393,20 @@
 	}
 
 	tPhase3 := time.Now()
-	s.ilogger.Debug("gc phase 3 done", zap.Any("want", want), zap.Any("can", can))
+	s.ilogger.Debug("gc phase 3 done", zap.Any("want", len(want)), zap.Any("can", len(can)))
 
 	// Reinitialize and reschedule all subtrees
 	for dn, _ := range can {
 		n := s.nodeByDN(dn)
-		bo := time.Duration(0)
+
 		// Only back off when the node unexpectedly died - not when it got canceled.
+		bo := time.Duration(0)
 		if n.state == nodeStateDead {
 			bo = n.bo.NextBackOff()
 		}
+
 		// Prepare node for rescheduling - remove its children, reset its state to new.
 		n.reset()
-
 		s.ilogger.Info("rescheduling supervised node", zap.String("dn", dn), zap.Duration("backoff", bo))
 
 		// Reschedule node runnable to run after backoff.
diff --git a/core/internal/common/supervisor/supervisor_test.go b/core/internal/common/supervisor/supervisor_test.go
index 6a9bf42..9a190c9 100644
--- a/core/internal/common/supervisor/supervisor_test.go
+++ b/core/internal/common/supervisor/supervisor_test.go
@@ -185,7 +185,7 @@
 	log, _ := zap.NewDevelopment()
 	ctx, ctxC := context.WithCancel(context.Background())
 	defer ctxC()
-	New(ctx, log, func(ctx context.Context) error {
+	s := New(ctx, log, func(ctx context.Context) error {
 		err := RunGroup(ctx, map[string]Runnable{
 			"one": runnableBecomesHealthy(h1, d1),
 			"two": runnableBecomesHealthy(h2, d2),
@@ -199,15 +199,16 @@
 	}, WithPropagatePanic)
 
 	// Expect both to start running.
+	s.waitSettleError(ctx, t)
 	select {
 	case <-h1:
-	case <-time.After(100 * time.Millisecond):
-		t.Fatalf("runnable 'one' didn't start one time")
+	default:
+		t.Fatalf("runnable 'one' didn't start")
 	}
 	select {
 	case <-h2:
-	case <-time.After(100 * time.Millisecond):
-		t.Fatalf("runnable 'one' didn't start one time")
+	default:
+		t.Fatalf("runnable 'one' didn't start")
 	}
 }
 
@@ -217,9 +218,9 @@
 	two := newRC()
 
 	log, _ := zap.NewDevelopment()
-	ctx, ctxC := context.WithCancel(context.Background())
+	ctx, ctxC := context.WithTimeout(context.Background(), 10*time.Second)
 	defer ctxC()
-	New(ctx, log, func(ctx context.Context) error {
+	s := New(ctx, log, func(ctx context.Context) error {
 		err := RunGroup(ctx, map[string]Runnable{
 			"one": runnableBecomesHealthy(h1, d1),
 			"two": two.runnable(),
@@ -231,28 +232,32 @@
 		Signal(ctx, SignalDone)
 		return nil
 	}, WithPropagatePanic)
+	s.waitSettleError(ctx, t)
 
 	two.becomeHealthy()
+	s.waitSettleError(ctx, t)
 	// Expect one to start running.
 	select {
 	case <-h1:
-	case <-time.After(100 * time.Millisecond):
-		t.Fatalf("runnable 'one' didn't start one time")
+	default:
+		t.Fatalf("runnable 'one' didn't start")
 	}
 
 	// Kill off two, one should restart.
 	two.die()
+	s.waitSettleError(ctx, t)
 	select {
 	case <-d1:
-	case <-time.After(100 * time.Millisecond):
-		t.Fatalf("runnable 'one' didn't acknowledge cancel on time")
+	default:
+		t.Fatalf("runnable 'one' didn't acknowledge cancel")
 	}
 
 	// And one should start running again.
+	s.waitSettleError(ctx, t)
 	select {
 	case <-h1:
-	case <-time.After(100 * time.Millisecond):
-		t.Fatalf("runnable 'one' didn't restart on time")
+	default:
+		t.Fatalf("runnable 'one' didn't restart")
 	}
 }
 
@@ -262,11 +267,12 @@
 	two := newRC()
 
 	log, _ := zap.NewDevelopment()
-	ctx, ctxC := context.WithCancel(context.Background())
+
+	ctx, ctxC := context.WithTimeout(context.Background(), 10*time.Second)
 	defer ctxC()
-	New(ctx, log, func(ctx context.Context) error {
+	s := New(ctx, log, func(ctx context.Context) error {
 		err := RunGroup(ctx, map[string]Runnable{
-			"one": runnableSpawnsMore(h1, d1, 4),
+			"one": runnableSpawnsMore(h1, d1, 5),
 			"two": two.runnable(),
 		})
 		if err != nil {
@@ -278,26 +284,29 @@
 	}, WithPropagatePanic)
 
 	two.becomeHealthy()
+	s.waitSettleError(ctx, t)
 	// Expect one to start running.
 	select {
 	case <-h1:
-	case <-time.After(100 * time.Millisecond):
-		t.Fatalf("runnable 'one' didn't start one time")
+	default:
+		t.Fatalf("runnable 'one' didn't start")
 	}
 
 	// Kill off two, one should restart.
 	two.die()
+	s.waitSettleError(ctx, t)
 	select {
 	case <-d1:
-	case <-time.After(100 * time.Millisecond):
-		t.Fatalf("runnable 'one' didn't acknowledge cancel on time")
+	default:
+		t.Fatalf("runnable 'one' didn't acknowledge cancel")
 	}
 
 	// And one should start running again.
+	s.waitSettleError(ctx, t)
 	select {
 	case <-h1:
-	case <-time.After(100 * time.Millisecond):
-		t.Fatalf("runnable 'one' didn't restart on time")
+	default:
+		t.Fatalf("runnable 'one' didn't restart")
 	}
 }
 
@@ -309,7 +318,7 @@
 	log, _ := zap.NewDevelopment()
 	ctx, ctxC := context.WithCancel(context.Background())
 	defer ctxC()
-	New(ctx, log, func(ctx context.Context) error {
+	s := New(ctx, log, func(ctx context.Context) error {
 		err := RunGroup(ctx, map[string]Runnable{
 			"one": runnableBecomesHealthy(h1, d1),
 			"two": two.runnable(),
@@ -323,26 +332,29 @@
 	})
 
 	two.becomeHealthy()
+	s.waitSettleError(ctx, t)
 	// Expect one to start running.
 	select {
 	case <-h1:
-	case <-time.After(100 * time.Millisecond):
-		t.Fatalf("runnable 'one' didn't start one time")
+	default:
+		t.Fatalf("runnable 'one' didn't start")
 	}
 
 	// Kill off two, one should restart.
 	two.panic()
+	s.waitSettleError(ctx, t)
 	select {
 	case <-d1:
-	case <-time.After(100 * time.Millisecond):
-		t.Fatalf("runnable 'one' didn't acknowledge cancel on time")
+	default:
+		t.Fatalf("runnable 'one' didn't acknowledge cancel")
 	}
 
 	// And one should start running again.
+	s.waitSettleError(ctx, t)
 	select {
 	case <-h1:
-	case <-time.After(100 * time.Millisecond):
-		t.Fatalf("runnable 'one' didn't restart on time")
+	default:
+		t.Fatalf("runnable 'one' didn't restart")
 	}
 }
 
@@ -366,10 +378,12 @@
 
 func TestBackoff(t *testing.T) {
 	one := newRC()
+
 	log, _ := zap.NewDevelopment()
-	ctx, ctxC := context.WithCancel(context.Background())
+	ctx, ctxC := context.WithTimeout(context.Background(), 20*time.Second)
 	defer ctxC()
-	New(ctx, log, func(ctx context.Context) error {
+
+	s := New(ctx, log, func(ctx context.Context) error {
 		if err := Run(ctx, "one", one.runnable()); err != nil {
 			return err
 		}
@@ -391,17 +405,18 @@
 	one.waitState(rcRunnableStateHealthy)
 	taken := time.Since(start)
 	if taken < 1*time.Second {
-		t.Errorf("Runnable took %v to restarted, wanted at least a second from backoff", taken)
+		t.Errorf("Runnable took %v to restart, wanted at least a second from backoff", taken)
 	}
 
+	s.waitSettleError(ctx, t)
 	// Now that we've become healthy, die again. Becoming healthy resets the backoff.
 	start = time.Now()
 	one.die()
 	one.becomeHealthy()
 	one.waitState(rcRunnableStateHealthy)
 	taken = time.Since(start)
-	if taken > 500*time.Millisecond || taken < 100*time.Millisecond {
-		t.Errorf("Runnable took %v to restarted, wanted at least 100ms from backoff and at most 500ms from backoff reset", taken)
+	if taken > 1*time.Second || taken < 100*time.Millisecond {
+		t.Errorf("Runnable took %v to restart, wanted at least 100ms from backoff and at most 1s from backoff reset", taken)
 	}
 }
 
@@ -426,7 +441,7 @@
 	oneSibling := newRC()
 
 	oneTest := func() {
-		timeout := time.NewTicker(100 * time.Millisecond)
+		timeout := time.NewTicker(1000 * time.Millisecond)
 		ping := make(chan struct{})
 		req <- ping
 		select {
diff --git a/core/internal/common/supervisor/supervisor_testhelpers.go b/core/internal/common/supervisor/supervisor_testhelpers.go
new file mode 100644
index 0000000..771e02f
--- /dev/null
+++ b/core/internal/common/supervisor/supervisor_testhelpers.go
@@ -0,0 +1,50 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package supervisor
+
+import (
+	"context"
+	"testing"
+)
+
+// waitSettle waits until the supervisor reaches a 'settled' state - ie., one
+// where no actions have been performed for a number of GC cycles.
+// This is used in tests only.
+func (s *supervisor) waitSettle(ctx context.Context) error {
+	waiter := make(chan struct{})
+	s.pReq <- &processorRequest{
+		waitSettled: &processorRequestWaitSettled{
+			waiter: waiter,
+		},
+	}
+
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-waiter:
+		return nil
+	}
+}
+
+// waitSettleError wraps waitSettle to fail a test if an error occurs, eg. the
+// context is canceled.
+func (s *supervisor) waitSettleError(ctx context.Context, t *testing.T) {
+	err := s.waitSettle(ctx)
+	if err != nil {
+		t.Fatalf("waitSettle: %v", err)
+	}
+}