supervisor: init
This introduces the service supervisor (or supervisor, for short) - a
library used to reliably run parts of Smalltown.
The design is outlined in [[ https://phab.monogon.dev/u/supervision | go/supervision ]].
This only implements the supervision itself, and does not actually use
it in Smalltown. Another revision based on this one will aims to move at
least parts of the codebase onto this library.
Test Plan: the supervision code is integration tested
Bug: T653
X-Origin-Diff: phab/D429
GitOrigin-RevId: cffa73de5957e95af629b78379ffc0c7e8681afb
diff --git a/core/internal/common/supervisor/BUILD.bazel b/core/internal/common/supervisor/BUILD.bazel
new file mode 100644
index 0000000..c72ef04
--- /dev/null
+++ b/core/internal/common/supervisor/BUILD.bazel
@@ -0,0 +1,25 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(
+ name = "go_default_library",
+ srcs = [
+ "supervisor.go",
+ "supervisor_node.go",
+ "supervisor_processor.go",
+ "supervisor_support.go",
+ ],
+ importpath = "git.monogon.dev/source/nexantic.git/core/internal/common/supervisor",
+ visibility = ["//core:__subpackages__"],
+ deps = [
+ "@com_github_cenkalti_backoff_v4//:go_default_library",
+ "@org_golang_google_grpc//:go_default_library",
+ "@org_uber_go_zap//:go_default_library",
+ ],
+)
+
+go_test(
+ name = "go_default_test",
+ srcs = ["supervisor_test.go"],
+ embed = [":go_default_library"],
+ deps = ["@org_uber_go_zap//:go_default_library"],
+)
diff --git a/core/internal/common/supervisor/supervisor.go b/core/internal/common/supervisor/supervisor.go
new file mode 100644
index 0000000..78fa5d2
--- /dev/null
+++ b/core/internal/common/supervisor/supervisor.go
@@ -0,0 +1,113 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package supervisor
+
+// The service supervision library allows for writing of reliable, service-style software within Smalltown.
+// It builds upon the Erlang/OTP supervision tree system, adapted to be more Go-ish.
+// For detailed design see go/supervision.
+
+import (
+ "context"
+ "sync"
+
+ "go.uber.org/zap"
+)
+
+// A Runnable is a function that will be run in a goroutine, and supervised throughout its lifetime. It can in turn
+// start more runnables as its children, and those will form part of a supervision tree.
+// The context passed to a runnable is very important and needs to be handled properly. It will be live (non-errored) as
+// long as the runnable should be running, and canceled (ctx.Err() will be non-nil) when the supervisor wants it to
+// exit. This means this context is also perfectly usable for performing any blocking operations.
+type Runnable func(ctx context.Context) error
+
+// RunGroup starts a set of runnables as a group. These runnables will run together, and if any one of them quits
+// unexpectedly, the result will be canceled and restarted.
+// The context here must be an existing Runnable context, and the spawned runnables will run under the node that this
+// context represents.
+func RunGroup(ctx context.Context, runnables map[string]Runnable) error {
+ node, unlock := fromContext(ctx)
+ defer unlock()
+ return node.runGroup(runnables)
+}
+
+// Run starts a single runnable in its own group.
+func Run(ctx context.Context, name string, runnable Runnable) error {
+ return RunGroup(ctx, map[string]Runnable{
+ name: runnable,
+ })
+}
+
+// Signal tells the supervisor that the calling runnable has reached a certain state of its lifecycle. All runnables
+// should SignalHealthy when they are ready with set up, running other child runnables and are now 'serving'.
+func Signal(ctx context.Context, signal SignalType) {
+ node, unlock := fromContext(ctx)
+ defer unlock()
+ node.signal(signal)
+}
+
+type SignalType int
+
+const (
+ // The runnable is healthy, done with setup, done with spawning more Runnables, and ready to serve in a loop.
+ // The runnable needs to check the parent context and ensure that if that context is done, the runnable exits.
+ SignalHealthy SignalType = iota
+ // The runnable is done - it does not need to run any loop. This is useful for Runnables that only set up other
+ // child runnables. This runnable will be restarted if a related failure happens somewhere in the supervision tree.
+ SignalDone
+)
+
+// Logger returns a Zap logger that will be named after the Distinguished Name of a the runnable (ie its place in the
+// supervision tree, dot-separated).
+func Logger(ctx context.Context) *zap.Logger {
+ node, unlock := fromContext(ctx)
+ defer unlock()
+ return node.getLogger()
+}
+
+// supervisor represents and instance of the supervision system. It keeps track of a supervision tree and a request
+// channel to its internal processor goroutine.
+type supervisor struct {
+ // mu guards the entire state of the supervisor.
+ mu sync.RWMutex
+ // root is the root node of the supervision tree, named 'root'. It represents the Runnable started with the
+ // supervisor.New call.
+ root *node
+ // logger is the Zap logger used to create loggers available to runnables.
+ logger *zap.Logger
+ // ilogger is the Zap logger used for internal logging by the supervisor.
+ ilogger *zap.Logger
+
+ // pReq is an interface channel to the lifecycle processor of the supervisor.
+ pReq chan *processorRequest
+}
+
+// New creates a new supervisor with its root running the given root runnable.
+// The given context can be used to cancel the entire supervision tree.
+func New(ctx context.Context, logger *zap.Logger, rootRunnable Runnable) {
+ sup := &supervisor{
+ logger: logger,
+ ilogger: logger.Named("supervisor"),
+ pReq: make(chan *processorRequest),
+ }
+ sup.root = newNode("root", rootRunnable, sup, nil)
+
+ go sup.processor(ctx)
+
+ sup.pReq <- &processorRequest{
+ schedule: &processorRequestSchedule{dn: "root"},
+ }
+}
diff --git a/core/internal/common/supervisor/supervisor_node.go b/core/internal/common/supervisor/supervisor_node.go
new file mode 100644
index 0000000..32f9720
--- /dev/null
+++ b/core/internal/common/supervisor/supervisor_node.go
@@ -0,0 +1,283 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package supervisor
+
+import (
+ "context"
+ "fmt"
+ "regexp"
+ "strings"
+
+ "github.com/cenkalti/backoff/v4"
+ "go.uber.org/zap"
+)
+
+// node is a supervision tree node. It represents the state of a Runnable within this tree, its relation to other tree
+// elements, and contains supporting data needed to actually supervise it.
+type node struct {
+ // The name of this node. Opaque string. It's used to make up the 'dn' (distinguished name) of a node within
+ // the tree. When starting a runnable inside a tree, this is where that name gets used.
+ name string
+ runnable Runnable
+
+ // The supervisor managing this tree.
+ sup *supervisor
+ // The parent, within the tree, of this node. If this is the root node of the tree, this is nil.
+ parent *node
+ // Children of this tree. This is represented by a map keyed from child node names, for easy access.
+ children map[string]*node
+ // Supervision groups. Each group is a set of names of children. Sets, and as such groups, don't overlap between
+ // each other. A supervision group indicates that if any child within that group fails, all others should be
+ // canceled and restarted together.
+ groups []map[string]bool
+
+ // The current state of the runnable in this node.
+ state nodeState
+
+ // Backoff used to keep runnables from being restarted too fast.
+ bo *backoff.ExponentialBackOff
+
+ // Context passed to the runnable, and its cancel function.
+ ctx context.Context
+ ctxC context.CancelFunc
+}
+
+// nodeState is the state of a runnable within a node, and in a way the node itself.
+// This follows the state diagram from go/supervision.
+type nodeState int
+
+const (
+ // A node that has just been created, and whose runnable has been started already but hasn't signaled anything yet.
+ nodeStateNew nodeState = iota
+ // A node whose runnable has signaled being healthy - this means it's ready to serve/act.
+ nodeStateHealthy
+ // A node that has unexpectedly returned or panicked.
+ nodeStateDead
+ // A node that has declared that its done with its work and should not be restarted, unless a supervision tree
+ // failure requires that.
+ nodeStateDone
+ // A node that has returned after being requested to cancel.
+ nodeStateCanceled
+)
+
+func (s nodeState) String() string {
+ switch s {
+ case nodeStateNew:
+ return "NODE_STATE_NEW"
+ case nodeStateHealthy:
+ return "NODE_STATE_HEALTHY"
+ case nodeStateDead:
+ return "NODE_STATE_DEAD"
+ case nodeStateDone:
+ return "NODE_STATE_DONE"
+ case nodeStateCanceled:
+ return "NODE_STATE_CANCELED"
+ }
+ return "UNKNOWN"
+}
+
+func (n *node) String() string {
+ return fmt.Sprintf("%s (%s)", n.dn(), n.state.String())
+}
+
+// contextKey is a type used to keep data within context values.
+type contextKey string
+
+var (
+ supervisorKey = contextKey("supervisor")
+ dnKey = contextKey("dn")
+)
+
+// fromContext retrieves a tree node from a runnable context. It takes a lock on the tree and returns an unlock
+// function. This unlock function needs to be called once mutations on the tree/supervisor/node are done.
+func fromContext(ctx context.Context) (*node, func()) {
+ sup, ok := ctx.Value(supervisorKey).(*supervisor)
+ if !ok {
+ panic("supervisor function called from non-runnable context")
+ }
+
+ sup.mu.Lock()
+
+ dnParent, ok := ctx.Value(dnKey).(string)
+ if !ok {
+ sup.mu.Unlock()
+ panic("supervisor function called from non-runnable context")
+ }
+
+ return sup.nodeByDN(dnParent), sup.mu.Unlock
+}
+
+// All the following 'internal' supervisor functions must only be called with the supervisor lock taken. Getting a lock
+// via fromContext is enough.
+
+// dn returns the distinguished name of a node. This distinguished name is a period-separated, inverse-DNS-like name.
+// For instance, the runnable 'foo' within the runnable 'bar' will be called 'root.bar.foo'. The root of the tree is
+// always named, and has the dn, 'root'.
+func (n *node) dn() string {
+ if n.parent != nil {
+ return fmt.Sprintf("%s.%s", n.parent.dn(), n.name)
+ }
+ return n.name
+}
+
+// groupSiblings is a helper function to get all runnable group siblings of a given runnable name within this node.
+// All children are always in a group, even if that group is unary.
+func (n *node) groupSiblings(name string) map[string]bool {
+ for _, m := range n.groups {
+ if _, ok := m[name]; ok {
+ return m
+ }
+ }
+ return nil
+}
+
+// newNode creates a new node with a given parent. It does not register it with the parent (as that depends on group
+// placement).
+func newNode(name string, runnable Runnable, sup *supervisor, parent *node) *node {
+ n := &node{
+ name: name,
+ runnable: runnable,
+
+ bo: backoff.NewExponentialBackOff(),
+
+ sup: sup,
+ parent: parent,
+ }
+ n.reset()
+ return n
+}
+
+// resetNode sets up all the dynamic fields of the node, in preparation of starting a runnable. It clears the node's
+// children, groups and resets its context.
+func (n *node) reset() {
+ // Make new context. First, acquire parent context. For the root node that's Background, otherwise it's the
+ // parent's context.
+ var pCtx context.Context
+ if n.parent == nil {
+ pCtx = context.Background()
+ } else {
+ pCtx = n.parent.ctx
+ }
+ // Mark DN and supervisor in context.
+ ctx := context.WithValue(pCtx, dnKey, n.dn())
+ ctx = context.WithValue(ctx, supervisorKey, n.sup)
+ ctx, ctxC := context.WithCancel(ctx)
+ // Set context
+ n.ctx = ctx
+ n.ctxC = ctxC
+
+ // Clear children and state
+ n.state = nodeStateNew
+ n.children = make(map[string]*node)
+ n.groups = nil
+
+ // The node is now ready to be scheduled.
+}
+
+// nodeByDN returns a node by given DN from the supervisor.
+func (s *supervisor) nodeByDN(dn string) *node {
+ parts := strings.Split(dn, ".")
+ if parts[0] != "root" {
+ panic("DN does not start with root.")
+ }
+ parts = parts[1:]
+ cur := s.root
+ for {
+ if len(parts) == 0 {
+ return cur
+ }
+
+ next, ok := cur.children[parts[0]]
+ if !ok {
+ panic(fmt.Errorf("could not find %v (%s) in %s", parts, dn, cur))
+ }
+ cur = next
+ parts = parts[1:]
+ }
+}
+
+// reNodeName validates a node name against constraints.
+var reNodeName = regexp.MustCompile(`[a-z90-9_]{1,64}`)
+
+// runGroup schedules a new group of runnables to run on a node.
+func (n *node) runGroup(runnables map[string]Runnable) error {
+ // Check that the parent node is in the right state.
+ if n.state != nodeStateNew {
+ return fmt.Errorf("cannot run new runnable on non-NEW node")
+ }
+
+ // Check the requested runnable names.
+ for name, _ := range runnables {
+ if !reNodeName.MatchString(name) {
+ return fmt.Errorf("runnable name %q is invalid", name)
+ }
+ if _, ok := n.children[name]; ok {
+ return fmt.Errorf("runnable %q already exists", name)
+ }
+ }
+
+ // Create child nodes.
+ dns := make(map[string]string)
+ group := make(map[string]bool)
+ for name, runnable := range runnables {
+ if g := n.groupSiblings(name); g != nil {
+ return fmt.Errorf("duplicate child name %q", name)
+ }
+ node := newNode(name, runnable, n.sup, n)
+ n.children[name] = node
+
+ dns[name] = node.dn()
+ group[name] = true
+ }
+ // Add group.
+ n.groups = append(n.groups, group)
+
+ // Schedule execution of group members.
+ go func() {
+ for name, _ := range runnables {
+ n.sup.pReq <- &processorRequest{
+ schedule: &processorRequestSchedule{
+ dn: dns[name],
+ },
+ }
+ }
+ }()
+ return nil
+}
+
+// signal sequences state changes by signals received from runnables and updates a node's status accordingly.
+func (n *node) signal(signal SignalType) {
+ switch signal {
+ case SignalHealthy:
+ if n.state != nodeStateNew {
+ panic(fmt.Errorf("node %s signaled healthy", n))
+ }
+ n.state = nodeStateHealthy
+ n.bo.Reset()
+ case SignalDone:
+ if n.state != nodeStateHealthy {
+ panic(fmt.Errorf("node %s signaled done", n))
+ }
+ n.state = nodeStateDone
+ n.bo.Reset()
+ }
+}
+
+// getLogger creates a new logger for a given supervisor node, to be used by its runnable.
+func (n *node) getLogger() *zap.Logger {
+ return n.sup.logger.Named(n.dn())
+}
diff --git a/core/internal/common/supervisor/supervisor_processor.go b/core/internal/common/supervisor/supervisor_processor.go
new file mode 100644
index 0000000..b1d92a4
--- /dev/null
+++ b/core/internal/common/supervisor/supervisor_processor.go
@@ -0,0 +1,408 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package supervisor
+
+import (
+ "context"
+ "errors"
+ "flag"
+ "fmt"
+ "runtime/debug"
+ "time"
+
+ "go.uber.org/zap"
+)
+
+// The processor maintains runnable goroutines - ie., when requested will start one, and then once it exists it will
+// record the result and act accordingly. It is also responsible for detecting and acting upon supervision subtrees that
+// need to be restarted after death (via a 'GC' process)
+
+// flagCatchPanic is a global flag that configures whether panics from runnables are handled and treated as errors, or
+// cause a panic of the entire supervisor.
+// For production use cases, you likely want to catch panics.
+// For debugging and tests, you likely want panics to bubble up and abort the entire supervisor early.
+var flagCatchPanic = true
+
+func init() {
+ flag.BoolVar(&flagCatchPanic, "catch_panic", flagCatchPanic, "Catch service/runnable panics - disable this for development or testing")
+}
+
+// processorRequest is a request for the processor. Only one of the fields can be set.
+type processorRequest struct {
+ schedule *processorRequestSchedule
+ died *processorRequestDied
+}
+
+// processorRequestSchedule requests that a given node's runnable be started.
+type processorRequestSchedule struct {
+ dn string
+}
+
+// processorRequestDied is a signal from a runnable goroutine that the runnable has died.
+type processorRequestDied struct {
+ dn string
+ err error
+}
+
+// processor is the main processing loop.
+func (s *supervisor) processor(ctx context.Context) {
+ s.ilogger.Info("supervisor processor started")
+
+ // The GC will run every millisecond if needed. Any time the processor requests a change in the supervision tree
+ // (ie a death or a new runnable) it will mark the state as dirty and run the GC on the next millisecond cycle.
+ gc := time.NewTicker(1 * time.Millisecond)
+ defer gc.Stop()
+ clean := true
+
+ for {
+ select {
+ case <-ctx.Done():
+ s.ilogger.Info("supervisor processor exiting...", zap.Error(ctx.Err()))
+ s.processKill()
+ s.ilogger.Info("supervisor exited")
+ return
+ case <-gc.C:
+ if !clean {
+ gcStart := time.Now()
+ s.processGC()
+ s.ilogger.Debug("gc done", zap.Duration("elapsed", time.Since(gcStart)))
+ }
+ clean = true
+ case r := <-s.pReq:
+ switch {
+ case r.schedule != nil:
+ s.processSchedule(r.schedule)
+ clean = false
+ case r.died != nil:
+ s.processDied(r.died)
+ clean = false
+ default:
+ panic(fmt.Errorf("unhandled request %+v", r))
+ }
+ }
+ }
+}
+
+// processKill cancels all nodes in the supervision tree. This is only called right before exiting the processor, so
+// they do not get automatically restarted.
+func (s *supervisor) processKill() {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ // Gather all context cancel functions.
+ var cancels []func()
+ queue := []*node{s.root}
+ for {
+ if len(queue) == 0 {
+ break
+ }
+
+ cur := queue[0]
+ queue = queue[1:]
+
+ cancels = append(cancels, cur.ctxC)
+ for _, c := range cur.children {
+ queue = append(queue, c)
+ }
+ }
+
+ // Call all context cancels.
+ for _, c := range cancels {
+ c()
+ }
+}
+
+// processSchedule starts a node's runnable in a goroutine and records its output once it's done.
+func (s *supervisor) processSchedule(r *processorRequestSchedule) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ n := s.nodeByDN(r.dn)
+ go func() {
+ if flagCatchPanic {
+ defer func() {
+ if rec := recover(); rec != nil {
+ s.pReq <- &processorRequest{
+ died: &processorRequestDied{
+ dn: r.dn,
+ err: fmt.Errorf("panic: %v, stacktrace: %s", rec, string(debug.Stack())),
+ },
+ }
+ }
+ }()
+ }
+
+ res := n.runnable(n.ctx)
+
+ s.pReq <- &processorRequest{
+ died: &processorRequestDied{
+ dn: r.dn,
+ err: res,
+ },
+ }
+ }()
+}
+
+// processDied records the result from a runnable goroutine, and updates its node state accordingly. If the result
+// is a death and not an expected exit, related nodes (ie. children and group siblings) are canceled accordingly.
+func (s *supervisor) processDied(r *processorRequestDied) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ // Okay, so a Runnable has quit. What now?
+ n := s.nodeByDN(r.dn)
+ ctx := n.ctx
+
+ // Simple case: it was marked as Done and quit with no error.
+ if n.state == nodeStateDone && r.err == nil {
+ // Do nothing. This was supposed to happen. Keep the process as DONE.
+ return
+ }
+
+ // Find innermost error to check if it's a context canceled error.
+ perr := r.err
+ for {
+ if inner := errors.Unwrap(perr); inner != nil {
+ perr = inner
+ continue
+ }
+ break
+ }
+
+ // Simple case: the context was canceled and the returned error is the context error.
+ if err := ctx.Err(); err != nil && perr == err {
+ // Mark the node as canceled successfully.
+ n.state = nodeStateCanceled
+ return
+ }
+
+ // Otherwise, the Runnable should not have died or quit. Handle accordingly.
+ err := r.err
+ // A lack of returned error is also an error.
+ if err == nil {
+ err = fmt.Errorf("returned when %s", n.state)
+ } else {
+ err = fmt.Errorf("returned error when %s: %w", n.state, err)
+ }
+
+ s.ilogger.Error("Runnable died", zap.String("dn", n.dn()), zap.Error(err))
+ // Mark as dead.
+ n.state = nodeStateDead
+
+ // Cancel that node's context, just in case something still depends on it.
+ n.ctxC()
+
+ // Cancel all siblings.
+ if n.parent != nil {
+ for name, _ := range n.parent.groupSiblings(n.name) {
+ if name == n.name {
+ continue
+ }
+ sibling := n.parent.children[name]
+ // TODO(q3k): does this need to run in a goroutine, ie. can a context cancel block?
+ sibling.ctxC()
+ }
+ }
+}
+
+// processGC runs the GC process. It's not really Garbage Collection, as in, it doesn't remove unnecessary tree nodes -
+// but it does find nodes that need to be restarted, find the subset that can and then schedules them for running.
+// As such, it's less of a Garbage Collector and more of a Necromancer. However, GC is a friendlier name.
+func (s *supervisor) processGC() {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ // The 'GC' serves is the main business logic of the supervision tree. It traverses a locked tree and tries to
+ // find subtrees that must be restarted (because of a DEAD/CANCELED runnable). It then finds which of these
+ // subtrees that should be restarted can be restarted, ie. which ones are fully recursively DEAD/CANCELED. It
+ // also finds the smallest set of largest subtrees that can be restarted, ie. if there's multiple DEAD runnables
+ // that can be restarted at once, it will do so.
+
+ tStart := time.Now()
+
+ // Phase one: Find all leaves.
+ // This is a simple DFS that finds all the leaves of the tree, ie all nodes that do not have children nodes.
+ leaves := make(map[string]bool)
+ queue := []*node{s.root}
+ for {
+ if len(queue) == 0 {
+ break
+ }
+ cur := queue[0]
+ queue = queue[1:]
+
+ for _, c := range cur.children {
+ queue = append([]*node{c}, queue...)
+ }
+
+ if len(cur.children) == 0 {
+ leaves[cur.dn()] = true
+ }
+ }
+
+ tPhase1 := time.Now()
+ s.ilogger.Debug("gc phase 1 done", zap.Any("leaves", leaves))
+
+ // Phase two: traverse tree from node to root and make note of all subtrees that can be restarted.
+ // A subtree is restartable/ready iff every node in that subtree is either CANCELED, DEAD or DONE.
+ // Such a 'ready' subtree can be restarted by the supervisor if needed.
+
+ // DNs that we already visited.
+ visited := make(map[string]bool)
+ // DNs whose subtrees are ready to be restarted.
+ // These are all subtrees recursively - ie., root.a.a and root.a will both be marked here.
+ ready := make(map[string]bool)
+
+ // We build a queue of nodes to visit, starting from the leaves.
+ queue = []*node{}
+ for l, _ := range leaves {
+ queue = append(queue, s.nodeByDN(l))
+ }
+
+ for {
+ if len(queue) == 0 {
+ break
+ }
+
+ cur := queue[0]
+ curDn := cur.dn()
+
+ queue = queue[1:]
+
+ // Do we have a decision about our children?
+ allVisited := true
+ for _, c := range cur.children {
+ if !visited[c.dn()] {
+ allVisited = false
+ break
+ }
+ }
+
+ // If no decision about children is available, it means we ended up in this subtree through some shorter path
+ // of a shorter/lower-order leaf. There is a path to a leaf that's longer than the one that caused this node
+ // to be enqueued. Easy solution: just push back the current element and retry later.
+ if !allVisited {
+ // Push back to queue and wait for a decision later.
+ queue = append(queue, cur)
+ continue
+ }
+
+ // All children have been visited and we have an idea about whether they're ready/restartable. All of the node's
+ // children must be restartable in order for this node to be restartable.
+ childrenReady := true
+ for _, c := range cur.children {
+ if !ready[c.dn()] {
+ childrenReady = false
+ break
+ }
+ }
+
+ // In addition to children, the node itself must be restartable (ie. DONE, DEAD or CANCELED).
+ curReady := false
+ switch cur.state {
+ case nodeStateDone:
+ curReady = true
+ case nodeStateCanceled:
+ curReady = true
+ case nodeStateDead:
+ curReady = true
+ }
+
+ // Note down that we have an opinion on this node, and note that opinion down.
+ visited[curDn] = true
+ ready[curDn] = childrenReady && curReady
+
+ // Now we can also enqueue the parent of this node for processing.
+ if cur.parent != nil && !visited[cur.parent.dn()] {
+ queue = append(queue, cur.parent)
+ }
+ }
+
+ tPhase2 := time.Now()
+ s.ilogger.Debug("gc phase 2 done", zap.Any("ready", ready))
+
+ // Phase 3: traverse tree from root to find largest subtrees that need to be restarted and are ready to be
+ // restarted.
+
+ // All DNs that need to be restarted by the GC process.
+ want := make(map[string]bool)
+ // All DNs that need to be restarted and can be restarted by the GC process - a subset of 'want' DNs.
+ can := make(map[string]bool)
+ // The set difference between 'want' and 'can' are all nodes that should be restarted but can't yet (ie. because
+ // a child is still in the process of being canceled).
+
+ // DFS from root.
+ queue = []*node{s.root}
+ for {
+ if len(queue) == 0 {
+ break
+ }
+
+ cur := queue[0]
+ queue = queue[1:]
+
+ // If this node is DEAD or CANCELED it should be restarted.
+ if cur.state == nodeStateDead || cur.state == nodeStateCanceled {
+ want[cur.dn()] = true
+ }
+
+ // If it should and can be restarted, that's all we want.
+ if want[cur.dn()] && ready[cur.dn()] {
+ can[cur.dn()] = true
+ continue
+ }
+
+ // Otherwise, traverse further down the tree to see if something else needs to be done.
+ for _, c := range cur.children {
+ queue = append(queue, c)
+ }
+ }
+
+ tPhase3 := time.Now()
+ s.ilogger.Debug("gc phase 3 done", zap.Any("want", want), zap.Any("can", can))
+
+ // Reinitialize and reschedule all subtrees
+ for dn, _ := range can {
+ n := s.nodeByDN(dn)
+ bo := time.Duration(0)
+ // Only back off when the node unexpectedly died - not when it got canceled.
+ if n.state == nodeStateDead {
+ bo = n.bo.NextBackOff()
+ }
+ // Prepare node for rescheduling - remove its children, reset its state to new.
+ n.reset()
+
+ s.ilogger.Info("rescheduling supervised node", zap.String("dn", dn), zap.Duration("backoff", bo))
+
+ // Reschedule node runnable to run after backoff.
+ go func(n *node, bo time.Duration) {
+ time.Sleep(bo)
+ s.pReq <- &processorRequest{
+ schedule: &processorRequestSchedule{dn: n.dn()},
+ }
+ }(n, bo)
+ }
+
+ tPhase4 := time.Now()
+ s.ilogger.Debug("gc phase 4 done")
+
+ s.ilogger.Debug("gc timings",
+ zap.Duration("phase1", tPhase1.Sub(tStart)),
+ zap.Duration("phase2", tPhase2.Sub(tPhase1)),
+ zap.Duration("phase3", tPhase3.Sub(tPhase2)),
+ zap.Duration("phase4", tPhase4.Sub(tPhase3)))
+}
diff --git a/core/internal/common/supervisor/supervisor_support.go b/core/internal/common/supervisor/supervisor_support.go
new file mode 100644
index 0000000..f11afcc
--- /dev/null
+++ b/core/internal/common/supervisor/supervisor_support.go
@@ -0,0 +1,62 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package supervisor
+
+// Supporting infrastructure to allow running some non-Go payloads under supervision.
+
+import (
+ "context"
+ "net"
+ "os/exec"
+
+ "google.golang.org/grpc"
+)
+
+// GRPCServer creates a Runnable that serves gRPC requests as longs as it's not canceled.
+// If graceful is set to true, the server will be gracefully stopped instead of plain stopped. This means all pending
+// RPCs will finish, but also requires streaming gRPC handlers to check their context liveliness and exit accordingly.
+// If the server code does not support this, `graceful` should be false and the server will be killed violently instead.
+func GRPCServer(srv *grpc.Server, lis net.Listener, graceful bool) Runnable {
+ return func(ctx context.Context) error {
+ Signal(ctx, SignalHealthy)
+ errC := make(chan error)
+ go func() {
+ errC <- srv.Serve(lis)
+ }()
+ select {
+ case <-ctx.Done():
+ if graceful {
+ srv.GracefulStop()
+ } else {
+ srv.Stop()
+ }
+ return ctx.Err()
+ case err := <-errC:
+ return err
+ }
+ }
+}
+
+// Command will create a Runnable that starts a long-running command, whose exit is determined to be a failure.
+func Command(name string, arg ...string) Runnable {
+ return func(ctx context.Context) error {
+ Signal(ctx, SignalHealthy)
+
+ cmd := exec.CommandContext(ctx, name, arg...)
+ return cmd.Run()
+ }
+}
diff --git a/core/internal/common/supervisor/supervisor_test.go b/core/internal/common/supervisor/supervisor_test.go
new file mode 100644
index 0000000..1b440a5
--- /dev/null
+++ b/core/internal/common/supervisor/supervisor_test.go
@@ -0,0 +1,560 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package supervisor
+
+import (
+ "context"
+ "fmt"
+ "testing"
+ "time"
+
+ "go.uber.org/zap"
+)
+
+func runnableBecomesHealthy(healthy, done chan struct{}) Runnable {
+ return func(ctx context.Context) error {
+ Signal(ctx, SignalHealthy)
+
+ go func() {
+ if healthy != nil {
+ healthy <- struct{}{}
+ }
+ }()
+
+ <-ctx.Done()
+
+ go func() {
+ if done != nil {
+ done <- struct{}{}
+ }
+ }()
+
+ return ctx.Err()
+ }
+}
+
+func runnableSpawnsMore(healthy, done chan struct{}, levels int) Runnable {
+ return func(ctx context.Context) error {
+ if levels > 0 {
+ err := RunGroup(ctx, map[string]Runnable{
+ "a": runnableSpawnsMore(nil, nil, levels-1),
+ "b": runnableSpawnsMore(nil, nil, levels-1),
+ })
+ if err != nil {
+ return err
+ }
+ }
+
+ Signal(ctx, SignalHealthy)
+
+ go func() {
+ if healthy != nil {
+ healthy <- struct{}{}
+ }
+ }()
+
+ <-ctx.Done()
+
+ go func() {
+ if done != nil {
+ done <- struct{}{}
+ }
+ }()
+ return ctx.Err()
+ }
+}
+
+// rc is a Remote Controlled runnable. It is a generic runnable used for testing the supervisor.
+type rc struct {
+ req chan rcRunnableRequest
+}
+
+type rcRunnableRequest struct {
+ cmd rcRunnableCommand
+ stateC chan rcRunnableState
+}
+
+type rcRunnableCommand int
+
+const (
+ rcRunnableCommandBecomeHealthy rcRunnableCommand = iota
+ rcRunnableCommandBecomeDone
+ rcRunnableCommandDie
+ rcRunnableCommandPanic
+ rcRunnableCommandState
+)
+
+type rcRunnableState int
+
+const (
+ rcRunnableStateNew rcRunnableState = iota
+ rcRunnableStateHealthy
+ rcRunnableStateDone
+)
+
+func (r *rc) becomeHealthy() {
+ r.req <- rcRunnableRequest{cmd: rcRunnableCommandBecomeHealthy}
+}
+
+func (r *rc) becomeDone() {
+ r.req <- rcRunnableRequest{cmd: rcRunnableCommandBecomeDone}
+}
+func (r *rc) die() {
+ r.req <- rcRunnableRequest{cmd: rcRunnableCommandDie}
+}
+
+func (r *rc) panic() {
+ r.req <- rcRunnableRequest{cmd: rcRunnableCommandPanic}
+}
+
+func (r *rc) state() rcRunnableState {
+ c := make(chan rcRunnableState)
+ r.req <- rcRunnableRequest{
+ cmd: rcRunnableCommandState,
+ stateC: c,
+ }
+ return <-c
+}
+
+func (r *rc) waitState(s rcRunnableState) {
+ // This is poll based. Making it non-poll based would make the RC runnable logic a bit more complex for little gain.
+ for {
+ got := r.state()
+ if got == s {
+ return
+ }
+ time.Sleep(10 * time.Millisecond)
+ }
+}
+
+func newRC() *rc {
+ return &rc{
+ req: make(chan rcRunnableRequest),
+ }
+}
+
+// Remote Controlled Runnable
+func (r *rc) runnable() Runnable {
+ return func(ctx context.Context) error {
+ state := rcRunnableStateNew
+
+ for {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case r := <-r.req:
+ switch r.cmd {
+ case rcRunnableCommandBecomeHealthy:
+ Signal(ctx, SignalHealthy)
+ state = rcRunnableStateHealthy
+ case rcRunnableCommandBecomeDone:
+ Signal(ctx, SignalDone)
+ state = rcRunnableStateDone
+ case rcRunnableCommandDie:
+ return fmt.Errorf("died on request")
+ case rcRunnableCommandPanic:
+ panic("at the disco")
+ case rcRunnableCommandState:
+ r.stateC <- state
+ }
+ }
+ }
+ }
+}
+
+func TestSimple(t *testing.T) {
+ h1 := make(chan struct{})
+ d1 := make(chan struct{})
+ h2 := make(chan struct{})
+ d2 := make(chan struct{})
+
+ log, _ := zap.NewDevelopment()
+ ctx, ctxC := context.WithCancel(context.Background())
+ defer ctxC()
+ New(ctx, log, func(ctx context.Context) error {
+ err := RunGroup(ctx, map[string]Runnable{
+ "one": runnableBecomesHealthy(h1, d1),
+ "two": runnableBecomesHealthy(h2, d2),
+ })
+ if err != nil {
+ return err
+ }
+ Signal(ctx, SignalHealthy)
+ Signal(ctx, SignalDone)
+ return nil
+ })
+
+ // Expect both to start running.
+ select {
+ case <-h1:
+ case <-time.After(100 * time.Millisecond):
+ t.Fatalf("runnable 'one' didn't start one time")
+ }
+ select {
+ case <-h2:
+ case <-time.After(100 * time.Millisecond):
+ t.Fatalf("runnable 'one' didn't start one time")
+ }
+}
+
+func TestSimpleFailure(t *testing.T) {
+ h1 := make(chan struct{})
+ d1 := make(chan struct{})
+ two := newRC()
+
+ log, _ := zap.NewDevelopment()
+ ctx, ctxC := context.WithCancel(context.Background())
+ defer ctxC()
+ New(ctx, log, func(ctx context.Context) error {
+ err := RunGroup(ctx, map[string]Runnable{
+ "one": runnableBecomesHealthy(h1, d1),
+ "two": two.runnable(),
+ })
+ if err != nil {
+ return err
+ }
+ Signal(ctx, SignalHealthy)
+ Signal(ctx, SignalDone)
+ return nil
+ })
+
+ two.becomeHealthy()
+ // Expect one to start running.
+ select {
+ case <-h1:
+ case <-time.After(100 * time.Millisecond):
+ t.Fatalf("runnable 'one' didn't start one time")
+ }
+
+ // Kill off two, one should restart.
+ two.die()
+ select {
+ case <-d1:
+ case <-time.After(100 * time.Millisecond):
+ t.Fatalf("runnable 'one' didn't acknowledge cancel on time")
+ }
+
+ // And one should start running again.
+ select {
+ case <-h1:
+ case <-time.After(100 * time.Millisecond):
+ t.Fatalf("runnable 'one' didn't restart on time")
+ }
+}
+
+func TestDeepFailure(t *testing.T) {
+ h1 := make(chan struct{})
+ d1 := make(chan struct{})
+ two := newRC()
+
+ log, _ := zap.NewDevelopment()
+ ctx, ctxC := context.WithCancel(context.Background())
+ defer ctxC()
+ New(ctx, log, func(ctx context.Context) error {
+ err := RunGroup(ctx, map[string]Runnable{
+ "one": runnableSpawnsMore(h1, d1, 4),
+ "two": two.runnable(),
+ })
+ if err != nil {
+ return err
+ }
+ Signal(ctx, SignalHealthy)
+ Signal(ctx, SignalDone)
+ return nil
+ })
+
+ two.becomeHealthy()
+ // Expect one to start running.
+ select {
+ case <-h1:
+ case <-time.After(100 * time.Millisecond):
+ t.Fatalf("runnable 'one' didn't start one time")
+ }
+
+ // Kill off two, one should restart.
+ two.die()
+ select {
+ case <-d1:
+ case <-time.After(100 * time.Millisecond):
+ t.Fatalf("runnable 'one' didn't acknowledge cancel on time")
+ }
+
+ // And one should start running again.
+ select {
+ case <-h1:
+ case <-time.After(110 * time.Millisecond):
+ t.Fatalf("runnable 'one' didn't restart on time")
+ }
+}
+
+func TestPanic(t *testing.T) {
+ catchPanicPrev := flagCatchPanic
+ flagCatchPanic = true
+ defer func() {
+ flagCatchPanic = catchPanicPrev
+ }()
+
+ h1 := make(chan struct{})
+ d1 := make(chan struct{})
+ two := newRC()
+
+ log, _ := zap.NewDevelopment()
+ ctx, ctxC := context.WithCancel(context.Background())
+ defer ctxC()
+ New(ctx, log, func(ctx context.Context) error {
+ err := RunGroup(ctx, map[string]Runnable{
+ "one": runnableBecomesHealthy(h1, d1),
+ "two": two.runnable(),
+ })
+ if err != nil {
+ return err
+ }
+ Signal(ctx, SignalHealthy)
+ Signal(ctx, SignalDone)
+ return nil
+ })
+
+ two.becomeHealthy()
+ // Expect one to start running.
+ select {
+ case <-h1:
+ case <-time.After(100 * time.Millisecond):
+ t.Fatalf("runnable 'one' didn't start one time")
+ }
+
+ // Kill off two, one should restart.
+ two.panic()
+ select {
+ case <-d1:
+ case <-time.After(100 * time.Millisecond):
+ t.Fatalf("runnable 'one' didn't acknowledge cancel on time")
+ }
+
+ // And one should start running again.
+ select {
+ case <-h1:
+ case <-time.After(100 * time.Millisecond):
+ t.Fatalf("runnable 'one' didn't restart on time")
+ }
+}
+
+func TestMultipleLevelFailure(t *testing.T) {
+ log, _ := zap.NewDevelopment()
+ ctx, ctxC := context.WithCancel(context.Background())
+ defer ctxC()
+ New(ctx, log, func(ctx context.Context) error {
+ err := RunGroup(ctx, map[string]Runnable{
+ "one": runnableSpawnsMore(nil, nil, 4),
+ "two": runnableSpawnsMore(nil, nil, 4),
+ })
+ if err != nil {
+ return err
+ }
+ Signal(ctx, SignalHealthy)
+ Signal(ctx, SignalDone)
+ return nil
+ })
+}
+
+func TestBackoff(t *testing.T) {
+ one := newRC()
+ log, _ := zap.NewDevelopment()
+ ctx, ctxC := context.WithCancel(context.Background())
+ defer ctxC()
+ New(ctx, log, func(ctx context.Context) error {
+ if err := Run(ctx, "one", one.runnable()); err != nil {
+ return err
+ }
+ Signal(ctx, SignalHealthy)
+ Signal(ctx, SignalDone)
+ return nil
+ })
+
+ one.becomeHealthy()
+ // Die a bunch of times in a row, this brings up the next exponential backoff to over a second.
+ for i := 0; i < 4; i += 1 {
+ one.die()
+ one.waitState(rcRunnableStateNew)
+ }
+ // Measure how long it takes for the runnable to respawn after a number of failures
+ start := time.Now()
+ one.die()
+ one.becomeHealthy()
+ one.waitState(rcRunnableStateHealthy)
+ taken := time.Since(start)
+ if taken < 1*time.Second {
+ t.Errorf("Runnable took %v to restarted, wanted at least a second from backoff", taken)
+ }
+
+ // Now that we've become healthy, die again. Becoming healthy resets the backoff.
+ start = time.Now()
+ one.die()
+ one.becomeHealthy()
+ one.waitState(rcRunnableStateHealthy)
+ taken = time.Since(start)
+ if taken > 500*time.Millisecond || taken < 100*time.Millisecond {
+ t.Errorf("Runnable took %v to restarted, wanted at least 100ms from backoff and at most 500ms from backoff reset", taken)
+ }
+}
+
+// TestResilience throws some curveballs at the supervisor - either programming errors or high load. It then ensures
+// that another runnable is running, and that it restarts on its sibling failure.
+func TestResilience(t *testing.T) {
+ // request/response channel for testing liveness of the 'one' runnable
+ req := make(chan chan struct{})
+
+ // A runnable that responds on the 'req' channel.
+ one := func(ctx context.Context) error {
+ Signal(ctx, SignalHealthy)
+ for {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case r := <-req:
+ r <- struct{}{}
+ }
+ }
+ }
+ oneSibling := newRC()
+
+ oneTest := func() {
+ timeout := time.NewTicker(100 * time.Millisecond)
+ ping := make(chan struct{})
+ req <- ping
+ select {
+ case <-ping:
+ case <-timeout.C:
+ t.Fatalf("one ping response timeout")
+ }
+ timeout.Stop()
+ }
+
+ // A nasty runnable that calls Signal with the wrong context (this is a programming error)
+ two := func(ctx context.Context) error {
+ Signal(context.TODO(), SignalHealthy)
+ return nil
+ }
+
+ // A nasty runnable that calls Signal wrong (this is a programming error).
+ three := func(ctx context.Context) error {
+ Signal(ctx, SignalDone)
+ return nil
+ }
+
+ // A nasty runnable that runs in a busy loop (this is a programming error).
+ four := func(ctx context.Context) error {
+ for {
+ time.Sleep(0)
+ }
+ }
+
+ // A nasty runnable that keeps creating more runnables.
+ five := func(ctx context.Context) error {
+ i := 1
+ for {
+ err := Run(ctx, fmt.Sprintf("r%d", i), runnableSpawnsMore(nil, nil, 2))
+ if err != nil {
+ return err
+ }
+
+ time.Sleep(100 * time.Millisecond)
+ i += 1
+ }
+ }
+
+ log, _ := zap.NewDevelopment()
+ ctx, ctxC := context.WithCancel(context.Background())
+ defer ctxC()
+ New(ctx, log, func(ctx context.Context) error {
+ RunGroup(ctx, map[string]Runnable{
+ "one": one,
+ "oneSibling": oneSibling.runnable(),
+ })
+ rs := map[string]Runnable{
+ "two": two, "three": three, "four": four, "five": five,
+ }
+ for k, v := range rs {
+ if err := Run(ctx, k, v); err != nil {
+ return err
+ }
+ }
+ Signal(ctx, SignalHealthy)
+ Signal(ctx, SignalDone)
+ return nil
+ })
+
+ // Five rounds of letting one run, then restarting it.
+ for i := 0; i < 5; i += 1 {
+ oneSibling.becomeHealthy()
+ oneSibling.waitState(rcRunnableStateHealthy)
+
+ // 'one' should work for at least a second.
+ deadline := time.Now().Add(1 * time.Second)
+ for {
+ if time.Now().After(deadline) {
+ break
+ }
+
+ oneTest()
+ }
+
+ // Killing 'oneSibling' should restart one.
+ oneSibling.panic()
+ }
+ // Make sure 'one' is still okay.
+ oneTest()
+}
+
+func ExampleNew() {
+ // Minimal runnable that is immediately done.
+ childC := make(chan struct{})
+ child := func(ctx context.Context) error {
+ Signal(ctx, SignalHealthy)
+ close(childC)
+ Signal(ctx, SignalDone)
+ return nil
+ }
+
+ log, _ := zap.NewDevelopment()
+
+ // Start a supervision tree with a root runnable.
+ ctx, ctxC := context.WithCancel(context.Background())
+ defer ctxC()
+ New(ctx, log, func(ctx context.Context) error {
+ err := Run(ctx, "child", child)
+ if err != nil {
+ return fmt.Errorf("could not run 'child': %w", err)
+ }
+ Signal(ctx, SignalHealthy)
+
+ t := time.NewTicker(time.Second)
+ defer t.Stop()
+
+ // Do something in the background, and exit on context cancel.
+ for {
+ select {
+ case <-t.C:
+ fmt.Printf("tick!")
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+ }
+ })
+
+ // root.child will close this channel.
+ <-childC
+}