m/p/supervisor: wait for runnables to exit in TestHarness

This ensures that tests which aren't marked as parallel won't interfere
with each other due to still-running runnables (for example, gracefully
terminating gRPC services listening on some stable port number).

To implement this, we add the Liquidator, a goroutine responsible for
maintaining a minimum viable supervisor processor which records all
runnables' exits. These can then be inspected by the TestHarness to
ensure that all runnables are truly dead.
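
As an illustration (a sketch only - the package name, port number and
gRPC server setup below are hypothetical and not part of this change),
consider two non-parallel tests that each bind the same fixed port from
inside a runnable. Previously the second test could start while the
first one's server was still shutting down and fail to bind; with the
harness blocking until the liquidator reports all runnables dead, the
port is guaranteed to be free again:

  package example_test

  import (
  	"context"
  	"net"
  	"testing"

  	"google.golang.org/grpc"

  	"source.monogon.dev/metropolis/pkg/supervisor"
  )

  func TestServiceA(t *testing.T) {
  	supervisor.TestHarness(t, func(ctx context.Context) error {
  		lis, err := net.Listen("tcp", "127.0.0.1:4242")
  		if err != nil {
  			return err
  		}
  		srv := grpc.NewServer()
  		go func() {
  			// Shut down gracefully once the harness cancels the
  			// supervisor's context; Serve then returns, the runnable
  			// exits, and the liquidator records it as dead.
  			<-ctx.Done()
  			srv.GracefulStop()
  		}()
  		return srv.Serve(lis)
  	})
  	// ... exercise the service over 127.0.0.1:4242 ...
  }

  // A TestServiceB binding the same port can now safely follow, because
  // the harness's cleanup blocks until the server above is gone.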

Change-Id: I436f9608d1e0e04796f7198b641e7d625df885f8
Reviewed-on: https://review.monogon.dev/c/monogon/+/625
Reviewed-by: Leopold Schabel <leo@nexantic.com>
diff --git a/metropolis/pkg/supervisor/supervisor_processor.go b/metropolis/pkg/supervisor/supervisor_processor.go
index b205a0b..ea8a4d0 100644
--- a/metropolis/pkg/supervisor/supervisor_processor.go
+++ b/metropolis/pkg/supervisor/supervisor_processor.go
@@ -21,6 +21,7 @@
 	"errors"
 	"fmt"
 	"runtime/debug"
+	"sort"
 	"time"
 )
 
@@ -81,7 +82,8 @@
 		case <-ctx.Done():
 			s.ilogger.Infof("supervisor processor exiting: %v", ctx.Err())
 			s.processKill()
-			s.ilogger.Info("supervisor exited")
+			s.ilogger.Info("supervisor exited, starting liquidator to clean up remaining runnables...")
+			go s.liquidator()
 			return
 		case <-gc.C:
 			if !clean {
@@ -115,6 +117,84 @@
 	}
 }
 
+// The liquidator is a context-free goroutine which the supervisor starts after
+// its context has been canceled. Its job is to take over listening on the
+// processing channels that the supervisor processor would usually listen on,
+// and implement the minimum amount of logic required to mark existing runnables
+// as DEAD.
+//
+// It exits when all runnables have exited one way or another, and the
+// supervision tree is well and truly dead. This will also be reflected by
+// liveRunnables returning an empty list.
+func (s *supervisor) liquidator() {
+	for {
+		select {
+		case r := <-s.pReq:
+			switch {
+			case r.schedule != nil:
+				s.ilogger.Infof("liquidator: refusing to schedule %s", r.schedule.dn)
+				s.mu.Lock()
+				n := s.nodeByDN(r.schedule.dn)
+				n.state = nodeStateDead
+				s.mu.Unlock()
+			case r.died != nil:
+				s.ilogger.Infof("liquidator: %s exited", r.died.dn)
+				s.mu.Lock()
+				n := s.nodeByDN(r.died.dn)
+				n.state = nodeStateDead
+				s.mu.Unlock()
+			}
+		}
+		live := s.liveRunnables()
+		if len(live) == 0 {
+			s.ilogger.Infof("liquidator: complete, all runnables dead or done")
+			return
+		}
+	}
+}
+
+// liveRunnables returns a list of runnable DNs that aren't DONE/DEAD. This is
+// used by the liquidator to figure out when its job is done, and by the
+// TestHarness to know when to unblock the test cleanup function.
+func (s *supervisor) liveRunnables() []string {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+
+	// BFS through the supervision tree, taking note of live (non-DONE/DEAD) runnables.
+	var live []string
+	seen := make(map[string]bool)
+	q := []*node{s.root}
+	for {
+		if len(q) == 0 {
+			break
+		}
+
+		// Pop from the BFS queue.
+		el := q[0]
+		q = q[1:]
+
+		// Skip already-visited runnables (this shouldn't happen because the supervision
+		// tree is, well, a tree - but better to stay safe than to get stuck in a loop).
+		eldn := el.dn()
+		if seen[eldn] {
+			continue
+		}
+		seen[eldn] = true
+
+		if el.state != nodeStateDead && el.state != nodeStateDone {
+			live = append(live, eldn)
+		}
+
+		// Enqueue children for traversal.
+		for _, child := range el.children {
+			q = append(q, child)
+		}
+	}
+
+	sort.Strings(live)
+	return live
+}
+
 // processKill cancels all nodes in the supervision tree. This is only called
 // right before exiting the processor, so they do not get automatically
 // restarted.
diff --git a/metropolis/pkg/supervisor/supervisor_testhelpers.go b/metropolis/pkg/supervisor/supervisor_testhelpers.go
index 85361c2..1bfb6c0 100644
--- a/metropolis/pkg/supervisor/supervisor_testhelpers.go
+++ b/metropolis/pkg/supervisor/supervisor_testhelpers.go
@@ -19,7 +19,10 @@
 import (
 	"context"
 	"errors"
+	"log"
+	"sort"
 	"testing"
+	"time"
 
 	"source.monogon.dev/metropolis/pkg/logtree"
 )
@@ -32,9 +35,10 @@
 // error, the harness will throw a test error, but will not abort the test.
 //
 // The harness also returns a context cancel function that can be used to
-// terminate the started supervisor early.  Regardless of manual cancellation,
+// terminate the started supervisor early. Regardless of manual cancellation,
 // the supervisor will always be terminated up at the end of the test/benchmark
-// it's running in.
+// it's running in. The supervision tree will also be cleaned up and the test
+// will block until all runnables have exited.
 //
 // The second returned value is the logtree used by this supervisor. It can be
 // used to assert some log messages are emitted in tests that exercise some
@@ -43,12 +47,11 @@
 	t.Helper()
 
 	ctx, ctxC := context.WithCancel(context.Background())
-	t.Cleanup(ctxC)
 
 	lt := logtree.New()
 	logtree.PipeAllToStderr(t, lt)
 
-	New(ctx, func(ctx context.Context) error {
+	sup := New(ctx, func(ctx context.Context) error {
 		Logger(ctx).Infof("Starting test %s...", t.Name())
 		if err := r(ctx); err != nil && !errors.Is(err, ctx.Err()) {
 			t.Errorf("Supervised runnable in harness returned error: %v", err)
@@ -56,5 +59,31 @@
 		}
 		return nil
 	}, WithExistingLogtree(lt))
+
+	t.Cleanup(func() {
+		log.Printf("supervisor.TestHarness: Canceling context...")
+		ctxC()
+		log.Printf("supervisor.TestHarness: Waiting for supervisor runnables to die...")
+		timeoutNag := time.Now().Add(5 * time.Second)
+
+		for {
+			live := sup.liveRunnables()
+			if len(live) == 0 {
+				log.Printf("supervisor.TestHarness: All done.")
+				return
+			}
+
+			if time.Now().After(timeoutNag) {
+				timeoutNag = time.Now().Add(5 * time.Second)
+				sort.Strings(live)
+				log.Printf("supervisor.TestHarness: Still live:")
+				for _, l := range live {
+					log.Printf("supervisor.TestHarness: - %s", l)
+				}
+			}
+
+			time.Sleep(time.Second)
+		}
+	})
 	return ctxC, lt
 }
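
For completeness, a minimal sketch of the early-cancellation path
described in the doc comment above (the exact TestHarness return types
and the test below are assumptions inferred from this diff, reusing the
context, testing and supervisor imports from the earlier sketch): the
returned cancel function may be called mid-test, and the cleanup
registered here still blocks until liveRunnables reports nothing left.

  func TestEarlyShutdown(t *testing.T) {
  	cancel, _ := supervisor.TestHarness(t, func(ctx context.Context) error {
  		// Park until the supervisor's context is canceled, then return
  		// the context error, which the harness does not count as a
  		// test failure.
  		<-ctx.Done()
  		return ctx.Err()
  	})

  	// ... exercise whatever needs the supervisor running ...

  	// Stop the supervisor early; the t.Cleanup registered by
  	// TestHarness still runs at the end of the test and waits for the
  	// liquidator to mark every runnable dead before the next
  	// non-parallel test starts.
  	cancel()
  }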