metropolis: unify utility packages

One last sweeping rename / reshuffle.

We get rid of //metropolis/node/common and //golibs, unifying them into
a single //metropolis/pkg meta-package.

This is to be documented somewhere properly, but here's the new logic
behind selecting where to place a new library package:

 - if it's specific to k8s-on-metropolis, put it in
   //metropolis/node/kubernetes/*. This is a self-contained tree that
   other paths cannot import from.
 - if it's a big new subsystem of the metropolis core, put it in
   //metropolis/node/core. This can be imported by anything in
   //m/n (e.g. the Kubernetes code at //m/n/kubernetes).
 - otherwise, treat it as generic library that's part of the metropolis
   project, and put it in //metropolis/pkg. This can be imported by
   anything within //metropolis.
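
For example, under this layout the supervisor library added below lives
in //metropolis/pkg/supervisor and is imported as:

    import "git.monogon.dev/source/nexantic.git/metropolis/pkg/supervisor"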

This will be followed up by a diff that updates visibility rules.

Test Plan: Pure refactor, CI only.

X-Origin-Diff: phab/D683
GitOrigin-RevId: 883e7f09a7d22d64e966d07bbe839454ed081c79
diff --git a/metropolis/pkg/supervisor/BUILD.bazel b/metropolis/pkg/supervisor/BUILD.bazel
new file mode 100644
index 0000000..40b0469
--- /dev/null
+++ b/metropolis/pkg/supervisor/BUILD.bazel
@@ -0,0 +1,28 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(
+    name = "go_default_library",
+    srcs = [
+        "supervisor.go",
+        "supervisor_node.go",
+        "supervisor_processor.go",
+        "supervisor_support.go",
+        "supervisor_testhelpers.go",
+    ],
+    importpath = "git.monogon.dev/source/nexantic.git/metropolis/pkg/supervisor",
+    visibility = [
+        "//metropolis/node:__subpackages__",
+        "//metropolis/test:__subpackages__",
+    ],
+    deps = [
+        "//metropolis/pkg/logtree:go_default_library",
+        "@com_github_cenkalti_backoff_v4//:go_default_library",
+        "@org_golang_google_grpc//:go_default_library",
+    ],
+)
+
+go_test(
+    name = "go_default_test",
+    srcs = ["supervisor_test.go"],
+    embed = [":go_default_library"],
+)
diff --git a/metropolis/pkg/supervisor/supervisor.go b/metropolis/pkg/supervisor/supervisor.go
new file mode 100644
index 0000000..ed79c69
--- /dev/null
+++ b/metropolis/pkg/supervisor/supervisor.go
@@ -0,0 +1,145 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package supervisor
+
+// The service supervision library allows writing reliable, service-style software within a Metropolis node.
+// It builds upon the Erlang/OTP supervision tree system, adapted to be more Go-ish.
+// For detailed design see go/supervision.
+
+import (
+	"context"
+	"io"
+	"sync"
+
+	"git.monogon.dev/source/nexantic.git/metropolis/pkg/logtree"
+)
+
+// A Runnable is a function that will be run in a goroutine, and supervised throughout its lifetime. It can in turn
+// start more runnables as its children, and those will form part of a supervision tree.
+// The context passed to a runnable is very important and needs to be handled properly. It will be live (non-errored) as
+// long as the runnable should be running, and canceled (ctx.Err() will be non-nil) when the supervisor wants it to
+// exit. This means this context is also perfectly usable for performing any blocking operations.
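+//
+// As an illustrative sketch (hypothetical runnable, not part of this package), a minimal conforming Runnable
+// signals health, then blocks until it is asked to exit:
+//
+//   func worker(ctx context.Context) error {
+//       supervisor.Signal(ctx, supervisor.SignalHealthy)
+//       <-ctx.Done()
+//       return ctx.Err()
+//   }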
+type Runnable func(ctx context.Context) error
+
+// RunGroup starts a set of runnables as a group. These runnables will run together, and if any one of them quits
+// unexpectedly, the rest will be canceled and restarted together.
+// The context here must be an existing Runnable context, and the spawned runnables will run under the node that this
+// context represents.
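+//
+// An illustrative call (runnable names hypothetical):
+//
+//   err := supervisor.RunGroup(ctx, map[string]supervisor.Runnable{
+//       "foo": fooRunnable,
+//       "bar": barRunnable,
+//   })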
+func RunGroup(ctx context.Context, runnables map[string]Runnable) error {
+	node, unlock := fromContext(ctx)
+	defer unlock()
+	return node.runGroup(runnables)
+}
+
+// Run starts a single runnable in its own group.
+func Run(ctx context.Context, name string, runnable Runnable) error {
+	return RunGroup(ctx, map[string]Runnable{
+		name: runnable,
+	})
+}
+
+// Signal tells the supervisor that the calling runnable has reached a certain state of its lifecycle. All runnables
+// should signal SignalHealthy once they are done with setup, have started any child runnables, and are now 'serving'.
+func Signal(ctx context.Context, signal SignalType) {
+	node, unlock := fromContext(ctx)
+	defer unlock()
+	node.signal(signal)
+}
+
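+// SignalType indicates the lifecycle state that a runnable signals to the supervisor.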
+type SignalType int
+
+const (
+	// The runnable is healthy, done with setup, done with spawning more Runnables, and ready to serve in a loop.
+	// The runnable needs to check the parent context and ensure that if that context is done, the runnable exits.
+	SignalHealthy SignalType = iota
+	// The runnable is done - it does not need to run any loop. This is useful for Runnables that only set up other
+	// child runnables. This runnable will be restarted if a related failure happens somewhere in the supervision tree.
+	SignalDone
+)
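+
+// As an illustrative sketch (assuming a child Runnable 'childFn'), a runnable that only sets up children and has
+// no serving loop of its own would signal as follows:
+//
+//   func parent(ctx context.Context) error {
+//       if err := supervisor.Run(ctx, "child", childFn); err != nil {
+//           return err
+//       }
+//       supervisor.Signal(ctx, supervisor.SignalHealthy)
+//       supervisor.Signal(ctx, supervisor.SignalDone)
+//       return nil
+//   }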
+
+// supervisor represents an instance of the supervision system. It keeps track of a supervision tree and a request
+// channel to its internal processor goroutine.
+type supervisor struct {
+	// mu guards the entire state of the supervisor.
+	mu sync.RWMutex
+	// root is the root node of the supervision tree, named 'root'. It represents the Runnable started with the
+	// supervisor.New call.
+	root *node
+	// logtree is the main logtree exposed to runnables and used internally.
+	logtree *logtree.LogTree
+	// ilogger is the internal logger logging to "supervisor" in the logtree.
+	ilogger logtree.LeveledLogger
+
+	// pReq is an interface channel to the lifecycle processor of the supervisor.
+	pReq chan *processorRequest
+
+	// propagate panics, ie. don't catch them.
+	propagatePanic bool
+}
+
+// SupervisorOpt are runtime configurable options for the supervisor.
+type SupervisorOpt func(s *supervisor)
+
+var (
+	// WithPropagatePanic prevents the Supervisor from catching panics in runnables and treating them as failures.
+	// This is useful to enable for testing and local debugging.
+	WithPropagatePanic = func(s *supervisor) {
+		s.propagatePanic = true
+	}
+)
+
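+// WithExistingLogtree returns a SupervisorOpt that makes the new supervisor use an already existing logtree
+// instead of creating its own.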
+func WithExistingLogtree(lt *logtree.LogTree) SupervisorOpt {
+	return func(s *supervisor) {
+		s.logtree = lt
+	}
+}
+
+// New creates a new supervisor with its root running the given root runnable.
+// The given context can be used to cancel the entire supervision tree.
+func New(ctx context.Context, rootRunnable Runnable, opts ...SupervisorOpt) *supervisor {
+	sup := &supervisor{
+		logtree: logtree.New(),
+		pReq:    make(chan *processorRequest),
+	}
+
+	for _, o := range opts {
+		o(sup)
+	}
+
+	sup.ilogger = sup.logtree.MustLeveledFor("supervisor")
+	sup.root = newNode("root", rootRunnable, sup, nil)
+
+	go sup.processor(ctx)
+
+	sup.pReq <- &processorRequest{
+		schedule: &processorRequestSchedule{dn: "root"},
+	}
+
+	return sup
+}
+
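+// Logger returns a LeveledLogger for the calling runnable, scoped to that runnable's DN in the supervisor's
+// logtree. It must be called from a Runnable context.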
+func Logger(ctx context.Context) logtree.LeveledLogger {
+	node, unlock := fromContext(ctx)
+	defer unlock()
+	return node.sup.logtree.MustLeveledFor(logtree.DN(node.dn()))
+}
+
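+// RawLogger returns an io.Writer that writes raw log data for the calling runnable, scoped to that runnable's DN
+// in the supervisor's logtree. It must be called from a Runnable context.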
+func RawLogger(ctx context.Context) io.Writer {
+	node, unlock := fromContext(ctx)
+	defer unlock()
+	return node.sup.logtree.MustRawFor(logtree.DN(node.dn()))
+}
diff --git a/metropolis/pkg/supervisor/supervisor_node.go b/metropolis/pkg/supervisor/supervisor_node.go
new file mode 100644
index 0000000..a7caf82
--- /dev/null
+++ b/metropolis/pkg/supervisor/supervisor_node.go
@@ -0,0 +1,282 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package supervisor
+
+import (
+	"context"
+	"fmt"
+	"regexp"
+	"strings"
+
+	"github.com/cenkalti/backoff/v4"
+)
+
+// node is a supervision tree node. It represents the state of a Runnable within this tree, its relation to other tree
+// elements, and contains supporting data needed to actually supervise it.
+type node struct {
+	// The name of this node. Opaque string. It's used to make up the 'dn' (distinguished name) of a node within
+	// the tree. When starting a runnable inside a tree, this is where that name gets used.
+	name     string
+	runnable Runnable
+
+	// The supervisor managing this tree.
+	sup *supervisor
+	// The parent, within the tree, of this node. If this is the root node of the tree, this is nil.
+	parent *node
+	// Children of this tree. This is represented by a map keyed from child node names, for easy access.
+	children map[string]*node
+	// Supervision groups. Each group is a set of names of children. Sets, and thus groups, do not overlap. A
+	// supervision group indicates that if any child within that group fails, all of its members should be
+	// canceled and restarted together.
+	groups []map[string]bool
+
+	// The current state of the runnable in this node.
+	state nodeState
+
+	// Backoff used to keep runnables from being restarted too fast.
+	bo *backoff.ExponentialBackOff
+
+	// Context passed to the runnable, and its cancel function.
+	ctx  context.Context
+	ctxC context.CancelFunc
+}
+
+// nodeState is the state of a runnable within a node, and in a way the node itself.
+// This follows the state diagram from go/supervision.
+type nodeState int
+
+const (
+	// A node that has just been created, and whose runnable has been started already but hasn't signaled anything yet.
+	nodeStateNew nodeState = iota
+	// A node whose runnable has signaled being healthy - this means it's ready to serve/act.
+	nodeStateHealthy
+	// A node that has unexpectedly returned or panicked.
+	nodeStateDead
+	// A node that has declared that it's done with its work and should not be restarted, unless a supervision tree
+	// failure requires that.
+	nodeStateDone
+	// A node that has returned after being requested to cancel.
+	nodeStateCanceled
+)
+
+func (s nodeState) String() string {
+	switch s {
+	case nodeStateNew:
+		return "NODE_STATE_NEW"
+	case nodeStateHealthy:
+		return "NODE_STATE_HEALTHY"
+	case nodeStateDead:
+		return "NODE_STATE_DEAD"
+	case nodeStateDone:
+		return "NODE_STATE_DONE"
+	case nodeStateCanceled:
+		return "NODE_STATE_CANCELED"
+	}
+	return "UNKNOWN"
+}
+
+func (n *node) String() string {
+	return fmt.Sprintf("%s (%s)", n.dn(), n.state.String())
+}
+
+// contextKey is a type used to keep data within context values.
+type contextKey string
+
+var (
+	supervisorKey = contextKey("supervisor")
+	dnKey         = contextKey("dn")
+)
+
+// fromContext retrieves a tree node from a runnable context. It takes a lock on the tree and returns an unlock
+// function. This unlock function needs to be called once mutations on the tree/supervisor/node are done.
+func fromContext(ctx context.Context) (*node, func()) {
+	sup, ok := ctx.Value(supervisorKey).(*supervisor)
+	if !ok {
+		panic("supervisor function called from non-runnable context")
+	}
+
+	sup.mu.Lock()
+
+	dnParent, ok := ctx.Value(dnKey).(string)
+	if !ok {
+		sup.mu.Unlock()
+		panic("supervisor function called from non-runnable context")
+	}
+
+	return sup.nodeByDN(dnParent), sup.mu.Unlock
+}
+
+// All the following 'internal' supervisor functions must only be called with the supervisor lock taken. Getting a lock
+// via fromContext is enough.
+
+// dn returns the distinguished name of a node. This distinguished name is a period-separated, inverse-DNS-like name.
+// For instance, the runnable 'foo' within the runnable 'bar' will be called 'root.bar.foo'. The root of the tree is
+// always named 'root', and thus has the DN 'root'.
+func (n *node) dn() string {
+	if n.parent != nil {
+		return fmt.Sprintf("%s.%s", n.parent.dn(), n.name)
+	}
+	return n.name
+}
+
+// groupSiblings is a helper function to get all runnable group siblings of a given runnable name within this node.
+// All children are always in a group, even if that group is unary.
+func (n *node) groupSiblings(name string) map[string]bool {
+	for _, m := range n.groups {
+		if _, ok := m[name]; ok {
+			return m
+		}
+	}
+	return nil
+}
+
+// newNode creates a new node with a given parent. It does not register it with the parent (as that depends on group
+// placement).
+func newNode(name string, runnable Runnable, sup *supervisor, parent *node) *node {
+	// We use exponential backoff for failed runnables, but at some point we cap at a given backoff time.
+	// To achieve this, we set MaxElapsedTime to 0, which will cap the backoff at MaxInterval.
+	bo := backoff.NewExponentialBackOff()
+	bo.MaxElapsedTime = 0
+
+	n := &node{
+		name:     name,
+		runnable: runnable,
+
+		bo: bo,
+
+		sup:    sup,
+		parent: parent,
+	}
+	n.reset()
+	return n
+}
+
+// reset sets up all the dynamic fields of the node, in preparation for starting a runnable. It clears the node's
+// children and groups, and resets its context.
+func (n *node) reset() {
+	// Make new context. First, acquire parent context. For the root node that's Background, otherwise it's the
+	// parent's context.
+	var pCtx context.Context
+	if n.parent == nil {
+		pCtx = context.Background()
+	} else {
+		pCtx = n.parent.ctx
+	}
+	// Mark DN and supervisor in context.
+	ctx := context.WithValue(pCtx, dnKey, n.dn())
+	ctx = context.WithValue(ctx, supervisorKey, n.sup)
+	ctx, ctxC := context.WithCancel(ctx)
+	// Set context
+	n.ctx = ctx
+	n.ctxC = ctxC
+
+	// Clear children and state
+	n.state = nodeStateNew
+	n.children = make(map[string]*node)
+	n.groups = nil
+
+	// The node is now ready to be scheduled.
+}
+
+// nodeByDN returns the node at the given DN within the supervisor's tree.
+func (s *supervisor) nodeByDN(dn string) *node {
+	parts := strings.Split(dn, ".")
+	if parts[0] != "root" {
+		panic("DN does not start with root.")
+	}
+	parts = parts[1:]
+	cur := s.root
+	for {
+		if len(parts) == 0 {
+			return cur
+		}
+
+		next, ok := cur.children[parts[0]]
+		if !ok {
+			panic(fmt.Errorf("could not find %v (%s) in %s", parts, dn, cur))
+		}
+		cur = next
+		parts = parts[1:]
+	}
+}
+
+// reNodeName validates a node name against constraints.
+var reNodeName = regexp.MustCompile(`[a-z0-9_]{1,64}`)
+
+// runGroup schedules a new group of runnables to run on a node.
+func (n *node) runGroup(runnables map[string]Runnable) error {
+	// Check that the parent node is in the right state.
+	if n.state != nodeStateNew {
+		return fmt.Errorf("cannot run new runnable on non-NEW node")
+	}
+
+	// Check the requested runnable names.
+	for name := range runnables {
+		if !reNodeName.MatchString(name) {
+			return fmt.Errorf("runnable name %q is invalid", name)
+		}
+		if _, ok := n.children[name]; ok {
+			return fmt.Errorf("runnable %q already exists", name)
+		}
+	}
+
+	// Create child nodes.
+	dns := make(map[string]string)
+	group := make(map[string]bool)
+	for name, runnable := range runnables {
+		if g := n.groupSiblings(name); g != nil {
+			return fmt.Errorf("duplicate child name %q", name)
+		}
+		node := newNode(name, runnable, n.sup, n)
+		n.children[name] = node
+
+		dns[name] = node.dn()
+		group[name] = true
+	}
+	// Add group.
+	n.groups = append(n.groups, group)
+
+	// Schedule execution of group members.
+	go func() {
+		for name := range runnables {
+			n.sup.pReq <- &processorRequest{
+				schedule: &processorRequestSchedule{
+					dn: dns[name],
+				},
+			}
+		}
+	}()
+	return nil
+}
+
+// signal processes a lifecycle signal received from a runnable and updates the node's state accordingly.
+func (n *node) signal(signal SignalType) {
+	switch signal {
+	case SignalHealthy:
+		if n.state != nodeStateNew {
+			panic(fmt.Errorf("node %s signaled healthy", n))
+		}
+		n.state = nodeStateHealthy
+		n.bo.Reset()
+	case SignalDone:
+		if n.state != nodeStateHealthy {
+			panic(fmt.Errorf("node %s signaled done", n))
+		}
+		n.state = nodeStateDone
+		n.bo.Reset()
+	}
+}
diff --git a/metropolis/pkg/supervisor/supervisor_processor.go b/metropolis/pkg/supervisor/supervisor_processor.go
new file mode 100644
index 0000000..965a667
--- /dev/null
+++ b/metropolis/pkg/supervisor/supervisor_processor.go
@@ -0,0 +1,404 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package supervisor
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"runtime/debug"
+	"time"
+)
+
+// The processor maintains runnable goroutines - ie., when requested will start one, and then once it exits it will
+// record the result and act accordingly. It is also responsible for detecting and acting upon supervision subtrees that
+// need to be restarted after death (via a 'GC' process).
+
+// processorRequest is a request for the processor. Only one of the fields can be set.
+type processorRequest struct {
+	schedule    *processorRequestSchedule
+	died        *processorRequestDied
+	waitSettled *processorRequestWaitSettled
+}
+
+// processorRequestSchedule requests that a given node's runnable be started.
+type processorRequestSchedule struct {
+	dn string
+}
+
+// processorRequestDied is a signal from a runnable goroutine that the runnable has died.
+type processorRequestDied struct {
+	dn  string
+	err error
+}
+
+type processorRequestWaitSettled struct {
+	waiter chan struct{}
+}
+
+// processor is the main processing loop.
+func (s *supervisor) processor(ctx context.Context) {
+	s.ilogger.Info("supervisor processor started")
+
+	// Waiters waiting for the GC to be settled.
+	var waiters []chan struct{}
+
+	// The GC will run every millisecond if needed. Any time the processor requests a change in the supervision tree
+	// (ie a death or a new runnable) it will mark the state as dirty and run the GC on the next millisecond cycle.
+	gc := time.NewTicker(1 * time.Millisecond)
+	defer gc.Stop()
+	clean := true
+
+	// How long the GC has been clean, counted in GC cycles. This is used to notify 'settled' waiters.
+	cleanCycles := 0
+
+	markDirty := func() {
+		clean = false
+		cleanCycles = 0
+	}
+
+	for {
+		select {
+		case <-ctx.Done():
+			s.ilogger.Infof("supervisor processor exiting: %v", ctx.Err())
+			s.processKill()
+			s.ilogger.Info("supervisor exited")
+			return
+		case <-gc.C:
+			if !clean {
+				s.processGC()
+			}
+			clean = true
+			cleanCycles += 1
+
+			// This threshold is somewhat arbitrary. It's a balance between test speed and test reliability.
+			if cleanCycles > 50 {
+				for _, w := range waiters {
+					close(w)
+				}
+				waiters = nil
+			}
+		case r := <-s.pReq:
+			switch {
+			case r.schedule != nil:
+				s.processSchedule(r.schedule)
+				markDirty()
+			case r.died != nil:
+				s.processDied(r.died)
+				markDirty()
+			case r.waitSettled != nil:
+				waiters = append(waiters, r.waitSettled.waiter)
+			default:
+				panic(fmt.Errorf("unhandled request %+v", r))
+			}
+		}
+	}
+}
+
+// processKill cancels all nodes in the supervision tree. This is only called right before exiting the processor, so
+// they do not get automatically restarted.
+func (s *supervisor) processKill() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	// Gather all context cancel functions.
+	var cancels []func()
+	queue := []*node{s.root}
+	for {
+		if len(queue) == 0 {
+			break
+		}
+
+		cur := queue[0]
+		queue = queue[1:]
+
+		cancels = append(cancels, cur.ctxC)
+		for _, c := range cur.children {
+			queue = append(queue, c)
+		}
+	}
+
+	// Call all context cancels.
+	for _, c := range cancels {
+		c()
+	}
+}
+
+// processSchedule starts a node's runnable in a goroutine and records its output once it's done.
+func (s *supervisor) processSchedule(r *processorRequestSchedule) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	n := s.nodeByDN(r.dn)
+	go func() {
+		if !s.propagatePanic {
+			defer func() {
+				if rec := recover(); rec != nil {
+					s.pReq <- &processorRequest{
+						died: &processorRequestDied{
+							dn:  r.dn,
+							err: fmt.Errorf("panic: %v, stacktrace: %s", rec, string(debug.Stack())),
+						},
+					}
+				}
+			}()
+		}
+
+		res := n.runnable(n.ctx)
+
+		s.pReq <- &processorRequest{
+			died: &processorRequestDied{
+				dn:  r.dn,
+				err: res,
+			},
+		}
+	}()
+}
+
+// processDied records the result from a runnable goroutine, and updates its node state accordingly. If the result
+// is a death and not an expected exit, related nodes (ie. children and group siblings) are canceled accordingly.
+func (s *supervisor) processDied(r *processorRequestDied) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	// Okay, so a Runnable has quit. What now?
+	n := s.nodeByDN(r.dn)
+	ctx := n.ctx
+
+	// Simple case: it was marked as Done and quit with no error.
+	if n.state == nodeStateDone && r.err == nil {
+		// Do nothing. This was supposed to happen. Keep the process as DONE.
+		return
+	}
+
+	// Find innermost error to check if it's a context canceled error.
+	perr := r.err
+	for {
+		if inner := errors.Unwrap(perr); inner != nil {
+			perr = inner
+			continue
+		}
+		break
+	}
+
+	// Simple case: the context was canceled and the returned error is the context error.
+	if err := ctx.Err(); err != nil && perr == err {
+		// Mark the node as canceled successfully.
+		n.state = nodeStateCanceled
+		return
+	}
+
+	// Otherwise, the Runnable should not have died or quit. Handle accordingly.
+	err := r.err
+	// A lack of returned error is also an error.
+	if err == nil {
+		err = fmt.Errorf("returned when %s", n.state)
+	} else {
+		err = fmt.Errorf("returned error when %s: %w", n.state, err)
+	}
+
+	s.ilogger.Errorf("Runnable %s died: %v", n.dn(), err)
+	// Mark as dead.
+	n.state = nodeStateDead
+
+	// Cancel that node's context, just in case something still depends on it.
+	n.ctxC()
+
+	// Cancel all siblings.
+	if n.parent != nil {
+		for name := range n.parent.groupSiblings(n.name) {
+			if name == n.name {
+				continue
+			}
+			sibling := n.parent.children[name]
+			// TODO(q3k): does this need to run in a goroutine, ie. can a context cancel block?
+			sibling.ctxC()
+		}
+	}
+}
+
+// processGC runs the GC process. It's not really Garbage Collection, as in, it doesn't remove unnecessary tree nodes -
+// but it does find nodes that need to be restarted, finds the subset of those that can be restarted, and schedules them
+// for running. As such, it's less of a Garbage Collector and more of a Necromancer. However, GC is a friendlier name.
+func (s *supervisor) processGC() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	// The 'GC' implements the main business logic of the supervision tree. It traverses a locked tree and tries to
+	// find subtrees that must be restarted (because of a DEAD/CANCELED runnable). It then finds which of these
+	// subtrees that should be restarted can be restarted, ie. which ones are fully recursively DEAD/CANCELED. It
+	// also finds the smallest set of largest subtrees that can be restarted, ie. if there's multiple DEAD runnables
+	// that can be restarted at once, it will do so.
+
+	// Phase one: Find all leaves.
+	// This is a simple DFS that finds all the leaves of the tree, ie. all nodes that do not have any children.
+	leaves := make(map[string]bool)
+	queue := []*node{s.root}
+	for {
+		if len(queue) == 0 {
+			break
+		}
+		cur := queue[0]
+		queue = queue[1:]
+
+		for _, c := range cur.children {
+			queue = append([]*node{c}, queue...)
+		}
+
+		if len(cur.children) == 0 {
+			leaves[cur.dn()] = true
+		}
+	}
+
+	// Phase two: traverse the tree from leaves to root and make note of all subtrees that can be restarted.
+	// A subtree is restartable/ready iff every node in that subtree is either CANCELED, DEAD or DONE.
+	// Such a 'ready' subtree can be restarted by the supervisor if needed.
+
+	// DNs that we already visited.
+	visited := make(map[string]bool)
+	// DNs whose subtrees are ready to be restarted.
+	// These are all subtrees recursively - ie., root.a.a and root.a will both be marked here.
+	ready := make(map[string]bool)
+
+	// We build a queue of nodes to visit, starting from the leaves.
+	queue = []*node{}
+	for l := range leaves {
+		queue = append(queue, s.nodeByDN(l))
+	}
+
+	for {
+		if len(queue) == 0 {
+			break
+		}
+
+		cur := queue[0]
+		curDn := cur.dn()
+
+		queue = queue[1:]
+
+		// Do we have a decision about our children?
+		allVisited := true
+		for _, c := range cur.children {
+			if !visited[c.dn()] {
+				allVisited = false
+				break
+			}
+		}
+
+		// If no decision about the children is available yet, it means we ended up at this node via a shorter path
+		// from some lower-order leaf, while a longer path from another leaf has not been fully processed yet. Easy
+		// solution: just push back the current element and retry later.
+		if !allVisited {
+			// Push back to queue and wait for a decision later.
+			queue = append(queue, cur)
+			continue
+		}
+
+		// All children have been visited and we have an idea about whether they're ready/restartable. All of the node's
+		// children must be restartable in order for this node to be restartable.
+		childrenReady := true
+		for _, c := range cur.children {
+			if !ready[c.dn()] {
+				childrenReady = false
+				break
+			}
+		}
+
+		// In addition to children, the node itself must be restartable (ie. DONE, DEAD or CANCELED).
+		curReady := false
+		switch cur.state {
+		case nodeStateDone:
+			curReady = true
+		case nodeStateCanceled:
+			curReady = true
+		case nodeStateDead:
+			curReady = true
+		}
+
+		// Record that we have visited this node, and note down our verdict on it.
+		visited[curDn] = true
+		ready[curDn] = childrenReady && curReady
+
+		// Now we can also enqueue the parent of this node for processing.
+		if cur.parent != nil && !visited[cur.parent.dn()] {
+			queue = append(queue, cur.parent)
+		}
+	}
+
+	// Phase three: traverse the tree from the root to find the largest subtrees that need to be restarted and are ready to be
+	// restarted.
+
+	// All DNs that need to be restarted by the GC process.
+	want := make(map[string]bool)
+	// All DNs that need to be restarted and can be restarted by the GC process - a subset of 'want' DNs.
+	can := make(map[string]bool)
+	// The set difference between 'want' and 'can' are all nodes that should be restarted but can't yet (ie. because
+	// a child is still in the process of being canceled).
+
+	// DFS from root.
+	queue = []*node{s.root}
+	for {
+		if len(queue) == 0 {
+			break
+		}
+
+		cur := queue[0]
+		queue = queue[1:]
+
+		// If this node is DEAD or CANCELED it should be restarted.
+		if cur.state == nodeStateDead || cur.state == nodeStateCanceled {
+			want[cur.dn()] = true
+		}
+
+		// If it should be restarted and is ready to be restarted...
+		if want[cur.dn()] && ready[cur.dn()] {
+			// And its parent context is valid (ie hasn't been canceled), mark it as restartable.
+			if cur.parent == nil || cur.parent.ctx.Err() == nil {
+				can[cur.dn()] = true
+				continue
+			}
+		}
+
+		// Otherwise, traverse further down the tree to see if something else needs to be done.
+		for _, c := range cur.children {
+			queue = append(queue, c)
+		}
+	}
+
+	// Reinitialize and reschedule all subtrees
+	for dn := range can {
+		n := s.nodeByDN(dn)
+
+		// Only back off when the node unexpectedly died - not when it got canceled.
+		bo := time.Duration(0)
+		if n.state == nodeStateDead {
+			bo = n.bo.NextBackOff()
+		}
+
+		// Prepare node for rescheduling - remove its children, reset its state to new.
+		n.reset()
+		s.ilogger.Infof("rescheduling supervised node %s with backoff %s", dn, bo.String())
+
+		// Reschedule node runnable to run after backoff.
+		go func(n *node, bo time.Duration) {
+			time.Sleep(bo)
+			s.pReq <- &processorRequest{
+				schedule: &processorRequestSchedule{dn: n.dn()},
+			}
+		}(n, bo)
+	}
+}
diff --git a/metropolis/pkg/supervisor/supervisor_support.go b/metropolis/pkg/supervisor/supervisor_support.go
new file mode 100644
index 0000000..d54b35c
--- /dev/null
+++ b/metropolis/pkg/supervisor/supervisor_support.go
@@ -0,0 +1,62 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package supervisor
+
+// Supporting infrastructure to allow running some non-Go payloads under supervision.
+
+import (
+	"context"
+	"net"
+	"os/exec"
+
+	"google.golang.org/grpc"
+)
+
+// GRPCServer creates a Runnable that serves gRPC requests as long as it's not canceled.
+// If graceful is set to true, the server will be gracefully stopped instead of plain stopped. This means all pending
+// RPCs will finish, but also requires streaming gRPC handlers to check their context liveness and exit accordingly.
+// If the server code does not support this, `graceful` should be false and the server will be killed violently instead.
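+//
+// A usage sketch (assuming 'srv' and 'lis' were built beforehand):
+//
+//   supervisor.Run(ctx, "grpc", supervisor.GRPCServer(srv, lis, true))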
+func GRPCServer(srv *grpc.Server, lis net.Listener, graceful bool) Runnable {
+	return func(ctx context.Context) error {
+		Signal(ctx, SignalHealthy)
+		errC := make(chan error)
+		go func() {
+			errC <- srv.Serve(lis)
+		}()
+		select {
+		case <-ctx.Done():
+			if graceful {
+				srv.GracefulStop()
+			} else {
+				srv.Stop()
+			}
+			return ctx.Err()
+		case err := <-errC:
+			return err
+		}
+	}
+}
+
+// RunCommand will create a Runnable that starts a long-running command; the command exiting is considered a failure.
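+//
+// A usage sketch (the command is purely hypothetical):
+//
+//   supervisor.Run(ctx, "agent", func(ctx context.Context) error {
+//       cmd := exec.CommandContext(ctx, "/usr/bin/agent", "--foreground")
+//       return supervisor.RunCommand(ctx, cmd)
+//   })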
+func RunCommand(ctx context.Context, cmd *exec.Cmd) error {
+	Signal(ctx, SignalHealthy)
+	cmd.Stdout = RawLogger(ctx)
+	cmd.Stderr = RawLogger(ctx)
+	err := cmd.Run()
+	Logger(ctx).Infof("Command returned: %v", err)
+	return err
+}
diff --git a/metropolis/pkg/supervisor/supervisor_test.go b/metropolis/pkg/supervisor/supervisor_test.go
new file mode 100644
index 0000000..9c7bdb7
--- /dev/null
+++ b/metropolis/pkg/supervisor/supervisor_test.go
@@ -0,0 +1,557 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package supervisor
+
+import (
+	"context"
+	"fmt"
+	"testing"
+	"time"
+)
+
+func runnableBecomesHealthy(healthy, done chan struct{}) Runnable {
+	return func(ctx context.Context) error {
+		Signal(ctx, SignalHealthy)
+
+		go func() {
+			if healthy != nil {
+				healthy <- struct{}{}
+			}
+		}()
+
+		<-ctx.Done()
+
+		go func() {
+			if done != nil {
+				done <- struct{}{}
+			}
+		}()
+
+		return ctx.Err()
+	}
+}
+
+func runnableSpawnsMore(healthy, done chan struct{}, levels int) Runnable {
+	return func(ctx context.Context) error {
+		if levels > 0 {
+			err := RunGroup(ctx, map[string]Runnable{
+				"a": runnableSpawnsMore(nil, nil, levels-1),
+				"b": runnableSpawnsMore(nil, nil, levels-1),
+			})
+			if err != nil {
+				return err
+			}
+		}
+
+		Signal(ctx, SignalHealthy)
+
+		go func() {
+			if healthy != nil {
+				healthy <- struct{}{}
+			}
+		}()
+
+		<-ctx.Done()
+
+		go func() {
+			if done != nil {
+				done <- struct{}{}
+			}
+		}()
+		return ctx.Err()
+	}
+}
+
+// rc is a Remote Controlled runnable. It is a generic runnable used for testing the supervisor.
+type rc struct {
+	req chan rcRunnableRequest
+}
+
+type rcRunnableRequest struct {
+	cmd    rcRunnableCommand
+	stateC chan rcRunnableState
+}
+
+type rcRunnableCommand int
+
+const (
+	rcRunnableCommandBecomeHealthy rcRunnableCommand = iota
+	rcRunnableCommandBecomeDone
+	rcRunnableCommandDie
+	rcRunnableCommandPanic
+	rcRunnableCommandState
+)
+
+type rcRunnableState int
+
+const (
+	rcRunnableStateNew rcRunnableState = iota
+	rcRunnableStateHealthy
+	rcRunnableStateDone
+)
+
+func (r *rc) becomeHealthy() {
+	r.req <- rcRunnableRequest{cmd: rcRunnableCommandBecomeHealthy}
+}
+
+func (r *rc) becomeDone() {
+	r.req <- rcRunnableRequest{cmd: rcRunnableCommandBecomeDone}
+}
+func (r *rc) die() {
+	r.req <- rcRunnableRequest{cmd: rcRunnableCommandDie}
+}
+
+func (r *rc) panic() {
+	r.req <- rcRunnableRequest{cmd: rcRunnableCommandPanic}
+}
+
+func (r *rc) state() rcRunnableState {
+	c := make(chan rcRunnableState)
+	r.req <- rcRunnableRequest{
+		cmd:    rcRunnableCommandState,
+		stateC: c,
+	}
+	return <-c
+}
+
+func (r *rc) waitState(s rcRunnableState) {
+	// This is poll based. Making it non-poll based would make the RC runnable logic a bit more complex for little gain.
+	for {
+		got := r.state()
+		if got == s {
+			return
+		}
+		time.Sleep(10 * time.Millisecond)
+	}
+}
+
+func newRC() *rc {
+	return &rc{
+		req: make(chan rcRunnableRequest),
+	}
+}
+
+// runnable returns the Runnable driven by this remote control.
+func (r *rc) runnable() Runnable {
+	return func(ctx context.Context) error {
+		state := rcRunnableStateNew
+
+		for {
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			case r := <-r.req:
+				switch r.cmd {
+				case rcRunnableCommandBecomeHealthy:
+					Signal(ctx, SignalHealthy)
+					state = rcRunnableStateHealthy
+				case rcRunnableCommandBecomeDone:
+					Signal(ctx, SignalDone)
+					state = rcRunnableStateDone
+				case rcRunnableCommandDie:
+					return fmt.Errorf("died on request")
+				case rcRunnableCommandPanic:
+					panic("at the disco")
+				case rcRunnableCommandState:
+					r.stateC <- state
+				}
+			}
+		}
+	}
+}
+
+func TestSimple(t *testing.T) {
+	h1 := make(chan struct{})
+	d1 := make(chan struct{})
+	h2 := make(chan struct{})
+	d2 := make(chan struct{})
+
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+	s := New(ctx, func(ctx context.Context) error {
+		err := RunGroup(ctx, map[string]Runnable{
+			"one": runnableBecomesHealthy(h1, d1),
+			"two": runnableBecomesHealthy(h2, d2),
+		})
+		if err != nil {
+			return err
+		}
+		Signal(ctx, SignalHealthy)
+		Signal(ctx, SignalDone)
+		return nil
+	}, WithPropagatePanic)
+
+	// Expect both to start running.
+	s.waitSettleError(ctx, t)
+	select {
+	case <-h1:
+	default:
+		t.Fatalf("runnable 'one' didn't start")
+	}
+	select {
+	case <-h2:
+	default:
+		t.Fatalf("runnable 'two' didn't start")
+	}
+}
+
+func TestSimpleFailure(t *testing.T) {
+	h1 := make(chan struct{})
+	d1 := make(chan struct{})
+	two := newRC()
+
+	ctx, ctxC := context.WithTimeout(context.Background(), 10*time.Second)
+	defer ctxC()
+	s := New(ctx, func(ctx context.Context) error {
+		err := RunGroup(ctx, map[string]Runnable{
+			"one": runnableBecomesHealthy(h1, d1),
+			"two": two.runnable(),
+		})
+		if err != nil {
+			return err
+		}
+		Signal(ctx, SignalHealthy)
+		Signal(ctx, SignalDone)
+		return nil
+	}, WithPropagatePanic)
+	s.waitSettleError(ctx, t)
+
+	two.becomeHealthy()
+	s.waitSettleError(ctx, t)
+	// Expect one to start running.
+	select {
+	case <-h1:
+	default:
+		t.Fatalf("runnable 'one' didn't start")
+	}
+
+	// Kill off two, one should restart.
+	two.die()
+	s.waitSettleError(ctx, t)
+	select {
+	case <-d1:
+	default:
+		t.Fatalf("runnable 'one' didn't acknowledge cancel")
+	}
+
+	// And one should start running again.
+	s.waitSettleError(ctx, t)
+	select {
+	case <-h1:
+	default:
+		t.Fatalf("runnable 'one' didn't restart")
+	}
+}
+
+func TestDeepFailure(t *testing.T) {
+	h1 := make(chan struct{})
+	d1 := make(chan struct{})
+	two := newRC()
+
+	ctx, ctxC := context.WithTimeout(context.Background(), 10*time.Second)
+	defer ctxC()
+	s := New(ctx, func(ctx context.Context) error {
+		err := RunGroup(ctx, map[string]Runnable{
+			"one": runnableSpawnsMore(h1, d1, 5),
+			"two": two.runnable(),
+		})
+		if err != nil {
+			return err
+		}
+		Signal(ctx, SignalHealthy)
+		Signal(ctx, SignalDone)
+		return nil
+	}, WithPropagatePanic)
+
+	two.becomeHealthy()
+	s.waitSettleError(ctx, t)
+	// Expect one to start running.
+	select {
+	case <-h1:
+	default:
+		t.Fatalf("runnable 'one' didn't start")
+	}
+
+	// Kill off two, one should restart.
+	two.die()
+	s.waitSettleError(ctx, t)
+	select {
+	case <-d1:
+	default:
+		t.Fatalf("runnable 'one' didn't acknowledge cancel")
+	}
+
+	// And one should start running again.
+	s.waitSettleError(ctx, t)
+	select {
+	case <-h1:
+	default:
+		t.Fatalf("runnable 'one' didn't restart")
+	}
+}
+
+func TestPanic(t *testing.T) {
+	h1 := make(chan struct{})
+	d1 := make(chan struct{})
+	two := newRC()
+
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+	s := New(ctx, func(ctx context.Context) error {
+		err := RunGroup(ctx, map[string]Runnable{
+			"one": runnableBecomesHealthy(h1, d1),
+			"two": two.runnable(),
+		})
+		if err != nil {
+			return err
+		}
+		Signal(ctx, SignalHealthy)
+		Signal(ctx, SignalDone)
+		return nil
+	})
+
+	two.becomeHealthy()
+	s.waitSettleError(ctx, t)
+	// Expect one to start running.
+	select {
+	case <-h1:
+	default:
+		t.Fatalf("runnable 'one' didn't start")
+	}
+
+	// Kill off two, one should restart.
+	two.panic()
+	s.waitSettleError(ctx, t)
+	select {
+	case <-d1:
+	default:
+		t.Fatalf("runnable 'one' didn't acknowledge cancel")
+	}
+
+	// And one should start running again.
+	s.waitSettleError(ctx, t)
+	select {
+	case <-h1:
+	default:
+		t.Fatalf("runnable 'one' didn't restart")
+	}
+}
+
+func TestMultipleLevelFailure(t *testing.T) {
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+	New(ctx, func(ctx context.Context) error {
+		err := RunGroup(ctx, map[string]Runnable{
+			"one": runnableSpawnsMore(nil, nil, 4),
+			"two": runnableSpawnsMore(nil, nil, 4),
+		})
+		if err != nil {
+			return err
+		}
+		Signal(ctx, SignalHealthy)
+		Signal(ctx, SignalDone)
+		return nil
+	}, WithPropagatePanic)
+}
+
+func TestBackoff(t *testing.T) {
+	one := newRC()
+
+	ctx, ctxC := context.WithTimeout(context.Background(), 20*time.Second)
+	defer ctxC()
+
+	s := New(ctx, func(ctx context.Context) error {
+		if err := Run(ctx, "one", one.runnable()); err != nil {
+			return err
+		}
+		Signal(ctx, SignalHealthy)
+		Signal(ctx, SignalDone)
+		return nil
+	}, WithPropagatePanic)
+
+	one.becomeHealthy()
+	// Die a bunch of times in a row; this brings the next exponential backoff to over a second.
+	for i := 0; i < 4; i += 1 {
+		one.die()
+		one.waitState(rcRunnableStateNew)
+	}
+	// Measure how long it takes for the runnable to respawn after a number of failures
+	start := time.Now()
+	one.die()
+	one.becomeHealthy()
+	one.waitState(rcRunnableStateHealthy)
+	taken := time.Since(start)
+	if taken < 1*time.Second {
+		t.Errorf("Runnable took %v to restart, wanted at least a second from backoff", taken)
+	}
+
+	s.waitSettleError(ctx, t)
+	// Now that we've become healthy, die again. Becoming healthy resets the backoff.
+	start = time.Now()
+	one.die()
+	one.becomeHealthy()
+	one.waitState(rcRunnableStateHealthy)
+	taken = time.Since(start)
+	if taken > 1*time.Second || taken < 100*time.Millisecond {
+		t.Errorf("Runnable took %v to restart, wanted at least 100ms from backoff and at most 1s from backoff reset", taken)
+	}
+}
+
+// TestResilience throws some curveballs at the supervisor - either programming errors or high load. It then ensures
+// that another runnable is running, and that it restarts on its sibling's failure.
+func TestResilience(t *testing.T) {
+	// request/response channel for testing liveness of the 'one' runnable
+	req := make(chan chan struct{})
+
+	// A runnable that responds on the 'req' channel.
+	one := func(ctx context.Context) error {
+		Signal(ctx, SignalHealthy)
+		for {
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			case r := <-req:
+				r <- struct{}{}
+			}
+		}
+	}
+	oneSibling := newRC()
+
+	oneTest := func() {
+		timeout := time.NewTicker(1000 * time.Millisecond)
+		ping := make(chan struct{})
+		req <- ping
+		select {
+		case <-ping:
+		case <-timeout.C:
+			t.Fatalf("one ping response timeout")
+		}
+		timeout.Stop()
+	}
+
+	// A nasty runnable that calls Signal with the wrong context (this is a programming error)
+	two := func(ctx context.Context) error {
+		Signal(context.TODO(), SignalHealthy)
+		return nil
+	}
+
+	// A nasty runnable that calls Signal wrong (this is a programming error).
+	three := func(ctx context.Context) error {
+		Signal(ctx, SignalDone)
+		return nil
+	}
+
+	// A nasty runnable that runs in a busy loop (this is a programming error).
+	four := func(ctx context.Context) error {
+		for {
+			time.Sleep(0)
+		}
+	}
+
+	// A nasty runnable that keeps creating more runnables.
+	five := func(ctx context.Context) error {
+		i := 1
+		for {
+			err := Run(ctx, fmt.Sprintf("r%d", i), runnableSpawnsMore(nil, nil, 2))
+			if err != nil {
+				return err
+			}
+
+			time.Sleep(100 * time.Millisecond)
+			i += 1
+		}
+	}
+
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+	New(ctx, func(ctx context.Context) error {
+		RunGroup(ctx, map[string]Runnable{
+			"one":        one,
+			"oneSibling": oneSibling.runnable(),
+		})
+		rs := map[string]Runnable{
+			"two": two, "three": three, "four": four, "five": five,
+		}
+		for k, v := range rs {
+			if err := Run(ctx, k, v); err != nil {
+				return err
+			}
+		}
+		Signal(ctx, SignalHealthy)
+		Signal(ctx, SignalDone)
+		return nil
+	})
+
+	// Five rounds of letting one run, then restarting it.
+	for i := 0; i < 5; i += 1 {
+		oneSibling.becomeHealthy()
+		oneSibling.waitState(rcRunnableStateHealthy)
+
+		// 'one' should work for at least a second.
+		deadline := time.Now().Add(1 * time.Second)
+		for {
+			if time.Now().After(deadline) {
+				break
+			}
+
+			oneTest()
+		}
+
+		// Killing 'oneSibling' should restart one.
+		oneSibling.panic()
+	}
+	// Make sure 'one' is still okay.
+	oneTest()
+}
+
+func ExampleNew() {
+	// Minimal runnable that is immediately done.
+	childC := make(chan struct{})
+	child := func(ctx context.Context) error {
+		Signal(ctx, SignalHealthy)
+		close(childC)
+		Signal(ctx, SignalDone)
+		return nil
+	}
+
+	// Start a supervision tree with a root runnable.
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+	New(ctx, func(ctx context.Context) error {
+		err := Run(ctx, "child", child)
+		if err != nil {
+			return fmt.Errorf("could not run 'child': %w", err)
+		}
+		Signal(ctx, SignalHealthy)
+
+		t := time.NewTicker(time.Second)
+		defer t.Stop()
+
+		// Do something in the background, and exit on context cancel.
+		for {
+			select {
+			case <-t.C:
+				fmt.Printf("tick!")
+			case <-ctx.Done():
+				return ctx.Err()
+			}
+		}
+	})
+
+	// root.child will close this channel.
+	<-childC
+}
diff --git a/metropolis/pkg/supervisor/supervisor_testhelpers.go b/metropolis/pkg/supervisor/supervisor_testhelpers.go
new file mode 100644
index 0000000..771e02f
--- /dev/null
+++ b/metropolis/pkg/supervisor/supervisor_testhelpers.go
@@ -0,0 +1,50 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package supervisor
+
+import (
+	"context"
+	"testing"
+)
+
+// waitSettle waits until the supervisor reaches a 'settled' state - ie., one
+// where no actions have been performed for a number of GC cycles.
+// This is used in tests only.
+func (s *supervisor) waitSettle(ctx context.Context) error {
+	waiter := make(chan struct{})
+	s.pReq <- &processorRequest{
+		waitSettled: &processorRequestWaitSettled{
+			waiter: waiter,
+		},
+	}
+
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-waiter:
+		return nil
+	}
+}
+
+// waitSettleError wraps waitSettle to fail a test if an error occurs, eg. the
+// context is canceled.
+func (s *supervisor) waitSettleError(ctx context.Context, t *testing.T) {
+	err := s.waitSettle(ctx)
+	if err != nil {
+		t.Fatalf("waitSettle: %v", err)
+	}
+}