osbase/supervisor: implement Metrics API

This is a base building block for exporting per-DN/runnable status from
the supervisor into an external system. A sample implementation is
provided which can be used in simple debug facilities to inspect the
current supervision tree.

A follow-up change will use the same API to implement Prometheus
metrics.

Change-Id: I0d586b03a397a3ccf8dac2d8043b9dd2f319be4e
Reviewed-on: https://review.monogon.dev/c/monogon/+/3290
Tested-by: Jenkins CI
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/osbase/supervisor/BUILD.bazel b/osbase/supervisor/BUILD.bazel
index 252b3b7..1997f45 100644
--- a/osbase/supervisor/BUILD.bazel
+++ b/osbase/supervisor/BUILD.bazel
@@ -4,6 +4,7 @@
     name = "supervisor",
     srcs = [
         "supervisor.go",
+        "supervisor_metrics.go",
         "supervisor_node.go",
         "supervisor_processor.go",
         "supervisor_support.go",
diff --git a/osbase/supervisor/supervisor.go b/osbase/supervisor/supervisor.go
index c7b0c0f..ff87a25 100644
--- a/osbase/supervisor/supervisor.go
+++ b/osbase/supervisor/supervisor.go
@@ -102,6 +102,8 @@
 
 	// propagate panics, ie. don't catch them.
 	propagatePanic bool
+
+	metrics *metricsFanout
 }
 
 // SupervisorOpt are runtime configurable options for the supervisor.
@@ -120,6 +122,15 @@
 	}
 }
 
+// WithMetrics makes the Supervisor export per-DN metrics into a given Metrics
+// implementation. This can be called repeatedly to export the same data into
+// multiple Metrics implementations.
+func WithMetrics(m Metrics) SupervisorOpt {
+	return func(s *supervisor) {
+		s.metrics.sub = append(s.metrics.sub, m)
+	}
+}
+
 // New creates a new supervisor with its root running the given root runnable.
 // The given context can be used to cancel the entire supervision tree.
 //
@@ -130,6 +141,7 @@
 	sup := &supervisor{
 		logtree: logtree.New(),
 		pReq:    make(chan *processorRequest),
+		metrics: &metricsFanout{},
 	}
 
 	for _, o := range opts {
diff --git a/osbase/supervisor/supervisor_metrics.go b/osbase/supervisor/supervisor_metrics.go
new file mode 100644
index 0000000..d83b7a7
--- /dev/null
+++ b/osbase/supervisor/supervisor_metrics.go
@@ -0,0 +1,70 @@
+package supervisor
+
+import (
+	"sync"
+	"time"
+)
+
+// Metrics is an interface from the supervisor to any kind of metrics-collecting
+// component.
+type Metrics interface {
+	// NotifyNodeState is called whenever a given runnable at a given DN changes
+	// state. Called synchronously from the supervisor's processor loop, so must not
+	// block, but is also guaranteed to only be called from a single goroutine.
+	NotifyNodeState(dn string, state NodeState)
+}
+
+// metricsFanout is used internally to fan out a single Metrics interface (which
+// it implements) onto multiple subordinate Metrics interfaces (as provided by
+// the user via WithMetrics).
+type metricsFanout struct {
+	sub []Metrics
+}
+
+func (m *metricsFanout) NotifyNodeState(dn string, state NodeState) {
+	for _, sub := range m.sub {
+		sub.NotifyNodeState(dn, state)
+	}
+}
+
+// InMemoryMetrics is a simple Metrics implementation that keeps an in-memory
+// mirror of the state of all DNs in the supervisor. The zero value for
+// InMemoryMetrics is ready to use.
+type InMemoryMetrics struct {
+	mu  sync.RWMutex
+	dns map[string]DNState
+}
+
+// DNState is the state of a supervisor runnable, recorded alongside a timestamp
+// of when the State changed.
+type DNState struct {
+	// State is the current state of the runnable.
+	State NodeState
+	// Transition is the time at which the runnable reached its State.
+	Transition time.Time
+}
+
+func (m *InMemoryMetrics) NotifyNodeState(dn string, state NodeState) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	if m.dns == nil {
+		m.dns = make(map[string]DNState)
+	}
+	m.dns[dn] = DNState{
+		State:      state,
+		Transition: time.Now(),
+	}
+}
+
+// DNs returns a copy (snapshot in time) of the recorded DN states, in a map from
+// DN to DNState. The returned value can be mutated.
+func (m *InMemoryMetrics) DNs() map[string]DNState {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+
+	res := make(map[string]DNState)
+	for k, v := range m.dns {
+		res[k] = v
+	}
+	return res
+}
diff --git a/osbase/supervisor/supervisor_node.go b/osbase/supervisor/supervisor_node.go
index 8b31ad4..44e8c84 100644
--- a/osbase/supervisor/supervisor_node.go
+++ b/osbase/supervisor/supervisor_node.go
@@ -292,12 +292,14 @@
 			panic(fmt.Errorf("node %s signaled healthy", n))
 		}
 		n.state = NodeStateHealthy
+		n.sup.metrics.NotifyNodeState(n.dn(), n.state)
 		n.bo.Reset()
 	case SignalDone:
 		if n.state != NodeStateHealthy {
 			panic(fmt.Errorf("node %s signaled done", n))
 		}
 		n.state = NodeStateDone
+		n.sup.metrics.NotifyNodeState(n.dn(), n.state)
 		n.bo.Reset()
 	}
 }
diff --git a/osbase/supervisor/supervisor_processor.go b/osbase/supervisor/supervisor_processor.go
index 2a01cf7..6304b09 100644
--- a/osbase/supervisor/supervisor_processor.go
+++ b/osbase/supervisor/supervisor_processor.go
@@ -230,6 +230,10 @@
 	defer s.mu.Unlock()
 
 	n := s.nodeByDN(r.dn)
+	if n.state != NodeStateNew {
+		panic("programming error: scheduled node not new")
+	}
+	s.metrics.NotifyNodeState(r.dn, n.state)
 	go func() {
 		if !s.propagatePanic {
 			defer func() {
@@ -268,6 +272,7 @@
 
 	// Simple case: it was marked as Done and quit with no error.
 	if n.state == NodeStateDone && r.err == nil {
+		s.metrics.NotifyNodeState(r.dn, n.state)
 		// Do nothing. This was supposed to happen. Keep the process as DONE.
 		return
 	}
@@ -277,6 +282,7 @@
 	if r.err != nil && ctx.Err() != nil && errors.Is(r.err, ctx.Err()) {
 		// Mark the node as canceled successfully.
 		n.state = NodeStateCanceled
+		s.metrics.NotifyNodeState(r.dn, n.state)
 		return
 	}
 
@@ -291,6 +297,7 @@
 	s.ilogger.Errorf("%s: %v", n.dn(), err)
 	// Mark as dead.
 	n.state = NodeStateDead
+	s.metrics.NotifyNodeState(r.dn, n.state)
 
 	// Cancel that node's context, just in case something still depends on it.
 	n.ctxC()
diff --git a/osbase/supervisor/supervisor_test.go b/osbase/supervisor/supervisor_test.go
index f812531..feb5510 100644
--- a/osbase/supervisor/supervisor_test.go
+++ b/osbase/supervisor/supervisor_test.go
@@ -616,6 +616,108 @@
 	}
 }
 
+func TestMetrics(t *testing.T) {
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+
+	// Build a supervision tree with 'wait'/step channels per runnable:
+	//
+	// root: wait, start one, wait, healthy
+	//   one: wait, start two, crash, wait, start two, healthy, wait, done
+	//     two: wait, healthy, run forever
+	//
+	// This tree allows us to exercise a few flows, like two getting canceled when
+	// one crashes, runnables returning done, runnables staying healthy, etc.
+
+	stepRoot := make(chan struct{})
+	stepOne := make(chan struct{})
+	stepTwo := make(chan struct{})
+	m := InMemoryMetrics{}
+
+	New(ctx, func(ctx context.Context) error {
+		<-stepRoot
+
+		attempts := 0
+		Run(ctx, "one", func(ctx context.Context) error {
+			<-stepOne
+			Run(ctx, "two", func(ctx context.Context) error {
+				<-stepTwo
+				Signal(ctx, SignalHealthy)
+				<-ctx.Done()
+				return ctx.Err()
+			})
+			if attempts == 0 {
+				attempts += 1
+				return fmt.Errorf("failed")
+			}
+			Signal(ctx, SignalHealthy)
+			<-stepOne
+			Signal(ctx, SignalDone)
+			return nil
+		})
+
+		<-stepRoot
+		Signal(ctx, SignalHealthy)
+		return nil
+	}, WithPropagatePanic, WithMetrics(&m))
+
+	// expectDN polls for up to a second until a given DN reaches a given state,
+	// and fails the test otherwise.
+	expectDN := func(dn string, state NodeState) {
+		t.Helper()
+		start := time.Now()
+		for {
+			snap := m.DNs()
+			if _, ok := snap[dn]; !ok {
+				if time.Since(start) > time.Second {
+					t.Fatalf("No DN %q", dn)
+				} else {
+					time.Sleep(100 * time.Millisecond)
+					continue
+				}
+			}
+			if want, got := state, snap[dn].State; want != got {
+				if time.Since(start) > time.Second {
+					t.Fatalf("Expected %q to be %s, got %s", dn, want, got)
+				} else {
+					time.Sleep(100 * time.Millisecond)
+					continue
+				}
+			}
+			break
+		}
+	}
+
+	// Make progress through the runnable tree and check expected states.
+
+	expectDN("root", NodeStateNew)
+
+	stepRoot <- struct{}{}
+	expectDN("root", NodeStateNew)
+	expectDN("root.one", NodeStateNew)
+
+	stepOne <- struct{}{}
+	stepTwo <- struct{}{}
+	expectDN("root", NodeStateNew)
+	expectDN("root.one", NodeStateDead)
+	expectDN("root.one.two", NodeStateCanceled)
+
+	stepOne <- struct{}{}
+	expectDN("root", NodeStateNew)
+	expectDN("root.one", NodeStateHealthy)
+	expectDN("root.one.two", NodeStateNew)
+
+	stepOne <- struct{}{}
+	expectDN("root", NodeStateNew)
+	expectDN("root.one", NodeStateDone)
+	expectDN("root.one.two", NodeStateNew)
+
+	stepTwo <- struct{}{}
+	expectDN("root", NodeStateNew)
+	expectDN("root.one", NodeStateDone)
+	expectDN("root.one.two", NodeStateHealthy)
+}
+
 func ExampleNew() {
 	// Minimal runnable that is immediately done.
 	childC := make(chan struct{})