osbase/supervisor: implement Metrics API
This is a base building block for exporting per-DN (per-runnable) status
from the supervisor into an external system. A sample in-memory
implementation (InMemoryMetrics) is provided, which simple debug
facilities can use to inspect the current state of the supervision tree.
A follow-up change will use the same API to implement Prometheus
metrics.
Change-Id: I0d586b03a397a3ccf8dac2d8043b9dd2f319be4e
Reviewed-on: https://review.monogon.dev/c/monogon/+/3290
Tested-by: Jenkins CI
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/osbase/supervisor/BUILD.bazel b/osbase/supervisor/BUILD.bazel
index 252b3b7..1997f45 100644
--- a/osbase/supervisor/BUILD.bazel
+++ b/osbase/supervisor/BUILD.bazel
@@ -4,6 +4,7 @@
name = "supervisor",
srcs = [
"supervisor.go",
+ "supervisor_metrics.go",
"supervisor_node.go",
"supervisor_processor.go",
"supervisor_support.go",
diff --git a/osbase/supervisor/supervisor.go b/osbase/supervisor/supervisor.go
index c7b0c0f..ff87a25 100644
--- a/osbase/supervisor/supervisor.go
+++ b/osbase/supervisor/supervisor.go
@@ -102,6 +102,8 @@
// propagate panics, ie. don't catch them.
propagatePanic bool
+
+ metrics *metricsFanout
}
// SupervisorOpt are runtime configurable options for the supervisor.
@@ -120,6 +122,15 @@
}
}
+// WithMetrics makes the Supervisor export per-DN state changes into the given
+// Metrics implementation. It may be passed multiple times; every registered
+// implementation is notified of every change, in the order they were passed.
+func WithMetrics(m Metrics) SupervisorOpt {
+ return func(s *supervisor) {
+ s.metrics.sub = append(s.metrics.sub, m)
+ }
+}
+
// New creates a new supervisor with its root running the given root runnable.
// The given context can be used to cancel the entire supervision tree.
//
@@ -130,6 +141,7 @@
sup := &supervisor{
logtree: logtree.New(),
pReq: make(chan *processorRequest),
+ metrics: &metricsFanout{},
}
for _, o := range opts {
diff --git a/osbase/supervisor/supervisor_metrics.go b/osbase/supervisor/supervisor_metrics.go
new file mode 100644
index 0000000..d83b7a7
--- /dev/null
+++ b/osbase/supervisor/supervisor_metrics.go
@@ -0,0 +1,70 @@
+package supervisor
+
+import (
+ "sync"
+ "time"
+)
+
+// Metrics is an interface from the supervisor to any metrics-collecting component.
+type Metrics interface {
+ // NotifyNodeState is called whenever the runnable at a given DN changes state.
+ // Calls are synchronous, so implementations must not block. They come from the
+ // processor loop and from runnables' Signal calls, i.e. not from one goroutine;
+ // NOTE(review): presumably serialized by the supervisor lock - confirm.
+ NotifyNodeState(dn string, state NodeState)
+}
+
+// metricsFanout is used internally to fan out a single Metrics interface (which
+// it implements) onto multiple subordinate Metrics interfaces (as provided by
+// the user via WithMetrics).
+type metricsFanout struct {
+ sub []Metrics // appended to by WithMetrics, in option order; nil means no-op
+}
+
+func (m *metricsFanout) NotifyNodeState(dn string, state NodeState) {
+ for i := range m.sub {
+ m.sub[i].NotifyNodeState(dn, state) // synchronously, in WithMetrics order
+ }
+}
+
+// InMemoryMetrics is a simple Metrics implementation that keeps an in-memory
+// mirror of the last reported state of every DN in the supervisor. The zero
+// value is ready to use; it holds a mutex, so share it by pointer, not by copy.
+type InMemoryMetrics struct {
+ mu sync.RWMutex // guards dns
+ dns map[string]DNState // lazily allocated on first NotifyNodeState
+}
+
+// DNState is the state of a supervisor runnable as recorded by InMemoryMetrics,
+// alongside a timestamp of when that State was recorded.
+type DNState struct {
+ // State is the most recently reported state of the runnable.
+ State NodeState
+ // Transition is when State was recorded, i.e. roughly when it was reached.
+ Transition time.Time
+}
+
+func (m *InMemoryMetrics) NotifyNodeState(dn string, state NodeState) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ if prev, ok := m.dns[dn]; ok && prev.State == state {
+ return // re-report of same state (Done: on signal, then on exit); keep Transition
+ }
+ if m.dns == nil {
+ m.dns = make(map[string]DNState)
+ }
+ m.dns[dn] = DNState{State: state, Transition: time.Now()}
+}
+
+// DNs returns a copy (snapshot in time) of the recorded DN states, in a map from
+// DN to DNState. The returned map is never nil and can be freely mutated.
+func (m *InMemoryMetrics) DNs() map[string]DNState {
+ m.mu.RLock()
+ defer m.mu.RUnlock()
+
+ res := make(map[string]DNState, len(m.dns))
+ for k, v := range m.dns {
+ res[k] = v
+ }
+ return res
+}
diff --git a/osbase/supervisor/supervisor_node.go b/osbase/supervisor/supervisor_node.go
index 8b31ad4..44e8c84 100644
--- a/osbase/supervisor/supervisor_node.go
+++ b/osbase/supervisor/supervisor_node.go
@@ -292,12 +292,14 @@
panic(fmt.Errorf("node %s signaled healthy", n))
}
n.state = NodeStateHealthy
+ n.sup.metrics.NotifyNodeState(n.dn(), n.state)
n.bo.Reset()
case SignalDone:
if n.state != NodeStateHealthy {
panic(fmt.Errorf("node %s signaled done", n))
}
n.state = NodeStateDone
+ n.sup.metrics.NotifyNodeState(n.dn(), n.state)
n.bo.Reset()
}
}
diff --git a/osbase/supervisor/supervisor_processor.go b/osbase/supervisor/supervisor_processor.go
index 2a01cf7..6304b09 100644
--- a/osbase/supervisor/supervisor_processor.go
+++ b/osbase/supervisor/supervisor_processor.go
@@ -230,6 +230,10 @@
defer s.mu.Unlock()
n := s.nodeByDN(r.dn)
+ if n.state != NodeStateNew {
+ panic("programming error: scheduled node not new")
+ }
+ s.metrics.NotifyNodeState(r.dn, n.state)
go func() {
if !s.propagatePanic {
defer func() {
@@ -268,6 +272,7 @@
// Simple case: it was marked as Done and quit with no error.
if n.state == NodeStateDone && r.err == nil {
+ s.metrics.NotifyNodeState(r.dn, n.state)
// Do nothing. This was supposed to happen. Keep the process as DONE.
return
}
@@ -277,6 +282,7 @@
if r.err != nil && ctx.Err() != nil && errors.Is(r.err, ctx.Err()) {
// Mark the node as canceled successfully.
n.state = NodeStateCanceled
+ s.metrics.NotifyNodeState(r.dn, n.state)
return
}
@@ -291,6 +297,7 @@
s.ilogger.Errorf("%s: %v", n.dn(), err)
// Mark as dead.
n.state = NodeStateDead
+ s.metrics.NotifyNodeState(r.dn, n.state)
// Cancel that node's context, just in case something still depends on it.
n.ctxC()
diff --git a/osbase/supervisor/supervisor_test.go b/osbase/supervisor/supervisor_test.go
index f812531..feb5510 100644
--- a/osbase/supervisor/supervisor_test.go
+++ b/osbase/supervisor/supervisor_test.go
@@ -616,6 +616,108 @@
}
}
+func TestMetrics(t *testing.T) {
+ ctx, ctxC := context.WithCancel(context.Background())
+ defer ctxC()
+
+ // Build a supervision tree with 'wait'/step channels per runnable:
+ //
+ // root: wait, start one, wait, healthy
+ // one: wait, start two, crash, wait, start two, healthy, wait, done
+ // two: wait, healthy, run forever
+ //
+ // This tree allows us to exercise a few flows, like two getting canceled when
+ // one crashes, runnables returning done, runnables staying healthy, etc.
+
+ stepRoot := make(chan struct{})
+ stepOne := make(chan struct{})
+ stepTwo := make(chan struct{})
+ m := InMemoryMetrics{}
+
+ New(ctx, func(ctx context.Context) error {
+ <-stepRoot
+
+ attempts := 0
+ Run(ctx, "one", func(ctx context.Context) error {
+ <-stepOne
+ Run(ctx, "two", func(ctx context.Context) error {
+ <-stepTwo
+ Signal(ctx, SignalHealthy)
+ <-ctx.Done()
+ return ctx.Err()
+ })
+ if attempts == 0 {
+ attempts += 1
+ return fmt.Errorf("failed")
+ }
+ Signal(ctx, SignalHealthy)
+ <-stepOne
+ Signal(ctx, SignalDone)
+ return nil
+ })
+
+ <-stepRoot
+ Signal(ctx, SignalHealthy)
+ return nil
+ }, WithPropagatePanic, WithMetrics(&m))
+
+ // expectDN polls for up to a second until a given DN is at a given state, and
+ // fails the test otherwise.
+ expectDN := func(dn string, state NodeState) {
+ t.Helper()
+ start := time.Now()
+ for {
+ snap := m.DNs()
+ if _, ok := snap[dn]; !ok {
+ if time.Since(start) > time.Second {
+ t.Fatalf("No DN %q", dn)
+ } else {
+ time.Sleep(100 * time.Millisecond)
+ continue
+ }
+ }
+ if want, got := state, snap[dn].State; want != got {
+ if time.Since(start) > time.Second {
+ t.Fatalf("Expected %q to be %s, got %s", dn, want, got)
+ } else {
+ time.Sleep(100 * time.Millisecond)
+ continue
+ }
+ }
+ break
+ }
+ }
+
+ // Make progress through the runnable tree and check expected states.
+
+ expectDN("root", NodeStateNew)
+
+ stepRoot <- struct{}{}
+ expectDN("root", NodeStateNew)
+ expectDN("root.one", NodeStateNew)
+
+ stepOne <- struct{}{}
+ stepTwo <- struct{}{}
+ expectDN("root", NodeStateNew)
+ expectDN("root.one", NodeStateDead)
+ expectDN("root.one.two", NodeStateCanceled)
+
+ stepOne <- struct{}{}
+ expectDN("root", NodeStateNew)
+ expectDN("root.one", NodeStateHealthy)
+ expectDN("root.one.two", NodeStateNew)
+
+ stepOne <- struct{}{}
+ expectDN("root", NodeStateNew)
+ expectDN("root.one", NodeStateDone)
+ expectDN("root.one.two", NodeStateNew)
+
+ stepTwo <- struct{}{}
+ expectDN("root", NodeStateNew)
+ expectDN("root.one", NodeStateDone)
+ expectDN("root.one.two", NodeStateHealthy)
+}
+
func ExampleNew() {
// Minimal runnable that is immediately done.
childC := make(chan struct{})