cloud/shepherd/equinix: split out control loop logic

This is in preparation for implementing the recoverer/rebooter inside
the shepherd.

In the future this will likely be split away from the shepherd and
end up as a generic bmdb library. But let's first wait for concrete
usages outside of the shepherd component.

Change-Id: I69b9a2e913dcefa2c6558e271b6853285c6120b3
Reviewed-on: https://review.monogon.dev/c/monogon/+/1559
Tested-by: Jenkins CI
Reviewed-by: Tim Windelschmidt <tim@monogon.tech>
diff --git a/cloud/shepherd/equinix/manager/BUILD.bazel b/cloud/shepherd/equinix/manager/BUILD.bazel
index 2c702fe..d6554e0 100644
--- a/cloud/shepherd/equinix/manager/BUILD.bazel
+++ b/cloud/shepherd/equinix/manager/BUILD.bazel
@@ -3,6 +3,7 @@
 go_library(
     name = "manager",
     srcs = [
+        "control_loop.go",
         "initializer.go",
         "manager.go",
         "provisioner.go",
diff --git a/cloud/shepherd/equinix/manager/control_loop.go b/cloud/shepherd/equinix/manager/control_loop.go
new file mode 100644
index 0000000..ef0832b
--- /dev/null
+++ b/cloud/shepherd/equinix/manager/control_loop.go
@@ -0,0 +1,234 @@
+package manager
+
+import (
+	"context"
+	"errors"
+	"flag"
+	"fmt"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/google/uuid"
+	"golang.org/x/sync/errgroup"
+	"golang.org/x/time/rate"
+	"k8s.io/klog/v2"
+
+	"source.monogon.dev/cloud/bmaas/bmdb"
+	"source.monogon.dev/cloud/bmaas/bmdb/model"
+)
+
+// task describes a single server currently being processed by a control loop.
+type task struct {
+	// machine is the machine data (including provider and provider ID) retrieved
+	// from the BMDB.
+	machine *model.MachineProvided
+	// work is a machine lock facilitated by BMDB that prevents machines from
+	// being processed by multiple workers at the same time.
+	work *bmdb.Work
+}
+
+// controlLoop is implemented by any component which should act as a BMDB-based
+// control loop. Implementing these methods allows the given component to be
+// started using RunControlLoop.
+type controlLoop interface {
+	// getMachines must return the list of machines ready to be processed by the
+	// control loop for a given control loop implementation.
+	getMachines(ctx context.Context, q *model.Queries, limit int32) ([]model.MachineProvided, error)
+	// processMachine will be called within the scope of an active task/BMDB work by
+	// the control loop logic.
+	processMachine(ctx context.Context, t *task) error
+
+	// getControlLoopConfig is implemented by ControlLoopConfig which should be
+	// embedded by the control loop component. If not embedded, this method will have
+	// to be implemented, too.
+	getControlLoopConfig() *ControlLoopConfig
+}
+
+// ControlLoopConfig should be embedded by every component which acts as a
+// control loop. RegisterFlags should be called by the component whenever it is
+// registering its own flags. Check should be called whenever the component is
+// instantiated, after RegisterFlags has been called.
+type ControlLoopConfig struct {
+	// DBQueryLimiter limits the rate at which BMDB is queried for servers ready
+	// for BMaaS agent initialization. Must be set.
+	DBQueryLimiter *rate.Limiter
+
+	// Parallelism is how many instances of the Initializer will be allowed to run in
+	// parallel against the BMDB. This speeds up the process of starting/restarting
+	// agents significantly, as one initializer instance can handle at most one agent
+	// (re)starting process.
+	//
+	// If not set (ie. 0), default to 1. A good starting value for production
+	// deployments is 10 or so.
+	Parallelism int
+}
+
+func (c *ControlLoopConfig) getControlLoopConfig() *ControlLoopConfig {
+	return c
+}
+
+// flagLimiter configures a *rate.Limiter as a flag.
+func flagLimiter(l **rate.Limiter, name, defval, help string) {
+	syntax := "'duration,count' eg. '2m,10' for a 10-sized bucket refilled at one token every 2 minutes"
+	help = help + fmt.Sprintf(" (default: %q, syntax: %s)", defval, syntax)
+	flag.Func(name, help, func(val string) error {
+		if val == "" {
+			val = defval
+		}
+		parts := strings.Split(val, ",")
+		if len(parts) != 2 {
+			return fmt.Errorf("invalid syntax, want: %s", syntax)
+		}
+		duration, err := time.ParseDuration(parts[0])
+		if err != nil {
+			return fmt.Errorf("invalid duration: %w", err)
+		}
+		refill, err := strconv.ParseUint(parts[1], 10, 31)
+		if err != nil {
+			return fmt.Errorf("invalid refill rate: %w", err)
+		}
+		*l = rate.NewLimiter(rate.Every(duration), int(refill))
+		return nil
+	})
+	flag.Set(name, defval)
+}
+
+// RegisterFlags should be called on this configuration whenever the embedding
+// component/configuration is registering its own flags. The prefix should be the
+// name of the component.
+func (c *ControlLoopConfig) RegisterFlags(prefix string) {
+	flagLimiter(&c.DBQueryLimiter, prefix+"_db_query_rate", "250ms,8", "Rate limiting for BMDB queries")
+	flag.IntVar(&c.Parallelism, prefix+"_loop_parallelism", 1, "How many initializer instances to run in parallel, ie. how many agents to attempt to (re)start at once")
+}
+
+// Check should be called after RegisterFlags but before the control loop is run.
+// If an error is returned, the control loop cannot start.
+func (c *ControlLoopConfig) Check() error {
+	if c.DBQueryLimiter == nil {
+		return fmt.Errorf("DBQueryLimiter must be configured")
+	}
+	if c.Parallelism == 0 {
+		c.Parallelism = 1
+	}
+	return nil
+}
+
+// RunControlLoop runs the given controlLoop implementation against the BMDB. The
+// loop will be run with the parallelism and rate configured by the
+// ControlLoopConfig embedded or otherwise returned by the controlLoop.
+func RunControlLoop(ctx context.Context, conn *bmdb.Connection, loop controlLoop) error {
+	clr := &controlLoopRunner{
+		loop:   loop,
+		config: loop.getControlLoopConfig(),
+	}
+	return clr.run(ctx, conn)
+}
+
+// controlLoopRunner is a configured control loop with an underlying control loop
+// implementation.
+type controlLoopRunner struct {
+	config *ControlLoopConfig
+	loop   controlLoop
+}
+
+// run the control loop(s) (depending on config.Parallelism) blocking the current
+// goroutine until the given context expires and all control loops quit.
+func (r *controlLoopRunner) run(ctx context.Context, conn *bmdb.Connection) error {
+	eg := errgroup.Group{}
+	for j := 0; j < r.config.Parallelism; j += 1 {
+		eg.Go(func() error {
+			return r.runOne(ctx, conn)
+		})
+	}
+	return eg.Wait()
+}
+
+// run the control loop blocking the current goroutine until the given context
+// expires.
+func (r *controlLoopRunner) runOne(ctx context.Context, conn *bmdb.Connection) error {
+	var err error
+
+	// Maintain a BMDB session as long as possible.
+	var sess *bmdb.Session
+	for {
+		if sess == nil {
+			sess, err = conn.StartSession(ctx)
+			if err != nil {
+				return fmt.Errorf("could not start BMDB session: %w", err)
+			}
+		}
+		// Inside that session, run the main logic.
+		err := r.runInSession(ctx, sess)
+
+		switch {
+		case err == nil:
+		case errors.Is(err, ctx.Err()):
+			return err
+		case errors.Is(err, bmdb.ErrSessionExpired):
+			klog.Errorf("Session expired, restarting...")
+			sess = nil
+			time.Sleep(time.Second)
+		case err != nil:
+			klog.Errorf("Processing failed: %v", err)
+			// TODO(q3k): close session
+			time.Sleep(time.Second)
+		}
+	}
+}
+
+// runInSession executes one iteration of the control loop within a BMDB session.
+// It sources at most one machine and hands it to the underlying controlLoop
+// implementation; a processing failure is recorded on the BMDB work item.
+func (r *controlLoopRunner) runInSession(ctx context.Context, sess *bmdb.Session) error {
+	t, err := r.source(ctx, sess)
+	if err != nil {
+		return fmt.Errorf("could not source machine: %w", err)
+	}
+	if t == nil {
+		return nil
+	}
+	defer t.work.Cancel(ctx)
+
+	if err := r.loop.processMachine(ctx, t); err != nil {
+		klog.Errorf("Failed to process machine %s: %v", t.machine.MachineID, err)
+		d := 1 * time.Minute
+		err = t.work.Fail(ctx, &d, fmt.Sprintf("failed to process: %v", err))
+		return err
+	}
+	return nil
+}
+
+// source returns a BMDB-locked server ready for processing by the
+// control loop, locked by a work item. If both task and error are nil, then
+// there are no machines needed to be initialized. The returned work item in task
+// _must_ be canceled or finished by the caller.
+func (r *controlLoopRunner) source(ctx context.Context, sess *bmdb.Session) (*task, error) {
+	r.config.DBQueryLimiter.Wait(ctx)
+
+	var machine *model.MachineProvided
+	work, err := sess.Work(ctx, model.ProcessShepherdAccess, func(q *model.Queries) ([]uuid.UUID, error) {
+		machines, err := r.loop.getMachines(ctx, q, 1)
+		if err != nil {
+			return nil, err
+		}
+		if len(machines) < 1 {
+			return nil, bmdb.ErrNothingToDo
+		}
+		machine = &machines[0]
+		return []uuid.UUID{machines[0].MachineID}, nil
+	})
+
+	if errors.Is(err, bmdb.ErrNothingToDo) {
+		return nil, nil
+	}
+
+	if err != nil {
+		return nil, fmt.Errorf("while querying BMDB agent candidates: %w", err)
+	}
+
+	return &task{
+		machine: machine,
+		work:    work,
+	}, nil
+}
diff --git a/cloud/shepherd/equinix/manager/initializer.go b/cloud/shepherd/equinix/manager/initializer.go
index f51d4e4..4193f4c 100644
--- a/cloud/shepherd/equinix/manager/initializer.go
+++ b/cloud/shepherd/equinix/manager/initializer.go
@@ -6,32 +6,30 @@
 	"crypto/x509"
 	"encoding/hex"
 	"encoding/pem"
-	"errors"
 	"flag"
 	"fmt"
 	"net"
 	"os"
-	"strconv"
 	"strings"
 	"time"
 
 	"github.com/google/uuid"
 	"github.com/packethost/packngo"
 	"golang.org/x/crypto/ssh"
-	"golang.org/x/sync/errgroup"
-	"golang.org/x/time/rate"
 	"google.golang.org/protobuf/proto"
 	"k8s.io/klog/v2"
 
 	apb "source.monogon.dev/cloud/agent/api"
-	"source.monogon.dev/cloud/bmaas/bmdb"
 	"source.monogon.dev/cloud/bmaas/bmdb/model"
 	ecl "source.monogon.dev/cloud/shepherd/equinix/wrapngo"
 )
 
-// AgentConfig configures how the Initializer will deploy Agents on machines. In
-// CLI scenarios, this should be populated from flags via RegisterFlags.
-type AgentConfig struct {
+// InitializerConfig configures how the Initializer will deploy Agents on
+// machines. In CLI scenarios, this should be populated from flags via
+// RegisterFlags.
+type InitializerConfig struct {
+	ControlLoopConfig
+
 	// Executable is the contents of the agent binary created and run
 	// at the provisioned servers. Must be set.
 	Executable []byte
@@ -59,7 +57,9 @@
 	SSHExecTimeout time.Duration
 }
 
-func (a *AgentConfig) RegisterFlags() {
+func (a *InitializerConfig) RegisterFlags() {
+	a.ControlLoopConfig.RegisterFlags("initializer")
+
 	flag.Func("agent_executable_path", "Local filesystem path of agent binary to be uploaded", func(val string) error {
 		if val == "" {
 			return nil
@@ -96,194 +96,89 @@
 	flag.DurationVar(&a.SSHExecTimeout, "agent_ssh_exec_timeout", 60*time.Second, "Timeout for connecting over SSH to a machine")
 }
 
-// InitializerConfig configures the broad agent initialization process. The
-// specifics of how an agent is started are instead configured in Agent Config. In
-// CLI scenarios, this should be populated from flags via RegisterFlags.
-type InitializerConfig struct {
-	// DBQueryLimiter limits the rate at which BMDB is queried for servers ready
-	// for BMaaS agent initialization. Must be set.
-	DBQueryLimiter *rate.Limiter
-
-	// Parallelism is how many instances of the Initializer will be allowed to run in
-	// parallel against the BMDB. This speeds up the process of starting/restarting
-	// agents significantly, as one initializer instance can handle at most one agent
-	// (re)starting process.
-	//
-	// If not set (ie. 0), default to 1. A good starting value for production
-	// deployments is 10 or so.
-	Parallelism int
-}
-
-// flagLimiter configures a *rate.Limiter as a flag.
-func flagLimiter(l **rate.Limiter, name, defval, help string) {
-	syntax := "'duration,count' eg. '2m,10' for a 10-sized bucket refilled at one token every 2 minutes"
-	help = help + fmt.Sprintf(" (default: %q, syntax: %s)", defval, syntax)
-	flag.Func(name, help, func(val string) error {
-		if val == "" {
-			val = defval
-		}
-		parts := strings.Split(val, ",")
-		if len(parts) != 2 {
-			return fmt.Errorf("invalid syntax, want: %s", syntax)
-		}
-		duration, err := time.ParseDuration(parts[0])
-		if err != nil {
-			return fmt.Errorf("invalid duration: %w", err)
-		}
-		refill, err := strconv.ParseUint(parts[1], 10, 31)
-		if err != nil {
-			return fmt.Errorf("invalid refill rate: %w", err)
-		}
-		*l = rate.NewLimiter(rate.Every(duration), int(refill))
-		return nil
-	})
-	flag.Set(name, defval)
-}
-
-func (i *InitializerConfig) RegisterFlags() {
-	flagLimiter(&i.DBQueryLimiter, "initializer_db_query_rate", "250ms,8", "Rate limiting for BMDB queries")
-	flag.IntVar(&i.Parallelism, "initializer_parallelism", 1, "How many initializer instances to run in parallel, ie. how many agents to attempt to (re)start at once")
-}
-
-// Initializer implements the BMaaS agent initialization process. Initialization
-// entails asking the BMDB for machines that need the agent started
-// (or-restarted) and acting upon that.
+// The Initializer starts the agent on machines that aren't yet running it.
 type Initializer struct {
-	config       *InitializerConfig
-	agentConfig  *AgentConfig
+	InitializerConfig
+
 	sharedConfig *SharedConfig
+	cl           ecl.Client
+	signer       ssh.Signer
 	sshClient    SSHClient
-	// cl is the packngo wrapper used by the initializer.
-	cl ecl.Client
 }
 
-// task describes a single server currently being processed either in the
-// context of agent initialization or recovery.
-type task struct {
-	// id is the BMDB-assigned machine identifier.
-	id uuid.UUID
-	// pid is an identifier assigned by the provider (Equinix).
-	pid uuid.UUID
-	// work is a machine lock facilitated by BMDB that prevents machines from
-	// being processed by multiple workers at the same time.
-	work *bmdb.Work
-	// dev is a provider machine/device record.
-	dev *packngo.Device
-}
-
-// New creates an Initializer instance, checking the InitializerConfig,
-// SharedConfig and AgentConfig for errors.
-func (c *InitializerConfig) New(cl ecl.Client, sc *SharedConfig, ac *AgentConfig) (*Initializer, error) {
+// NewInitializer creates an Initializer instance, checking the
+// InitializerConfig and SharedConfig for errors.
+func NewInitializer(cl ecl.Client, ic InitializerConfig, sc *SharedConfig) (*Initializer, error) {
 	if err := sc.check(); err != nil {
 		return nil, err
 	}
-	if len(ac.Executable) == 0 {
+	if err := ic.ControlLoopConfig.Check(); err != nil {
+		return nil, err
+	}
+
+	if len(ic.Executable) == 0 {
 		return nil, fmt.Errorf("agent executable not configured")
 	}
-	if ac.TargetPath == "" {
+	if ic.TargetPath == "" {
 		return nil, fmt.Errorf("agent target path must be set")
 	}
-	if ac.Endpoint == "" {
+	if ic.Endpoint == "" {
 		return nil, fmt.Errorf("agent endpoint must be set")
 	}
-	if ac.SSHConnectTimeout == 0 {
+	if ic.SSHConnectTimeout == 0 {
 		return nil, fmt.Errorf("agent SSH connection timeout must be set")
 	}
-	if ac.SSHExecTimeout == 0 {
+	if ic.SSHExecTimeout == 0 {
 		return nil, fmt.Errorf("agent SSH execution timeout must be set")
 	}
-	if c.DBQueryLimiter == nil {
-		return nil, fmt.Errorf("DBQueryLimiter must be configured")
+
+	signer, err := sc.sshSigner()
+	if err != nil {
+		return nil, fmt.Errorf("could not initialize signer: %w", err)
 	}
-	if c.Parallelism == 0 {
-		c.Parallelism = 1
-	}
+
 	return &Initializer{
-		config:       c,
+		InitializerConfig: ic,
+
 		sharedConfig: sc,
-		agentConfig:  ac,
-		sshClient:    &PlainSSHClient{},
 		cl:           cl,
+		sshClient:    &PlainSSHClient{},
+		signer:       signer,
 	}, nil
 }
 
-// Run the initializer(s) (depending on opts.Parallelism) blocking the current
-// goroutine until the given context expires and all provisioners quit.
-func (i *Initializer) Run(ctx context.Context, conn *bmdb.Connection) error {
-	eg := errgroup.Group{}
-	for j := 0; j < i.config.Parallelism; j += 1 {
-		eg.Go(func() error {
-			return i.runOne(ctx, conn)
-		})
-	}
-	return eg.Wait()
+func (c *Initializer) getMachines(ctx context.Context, q *model.Queries, limit int32) ([]model.MachineProvided, error) {
+	return q.GetMachinesForAgentStart(ctx, limit)
 }
 
-// Run the initializer blocking the current goroutine until the given context
-// expires.
-func (c *Initializer) runOne(ctx context.Context, conn *bmdb.Connection) error {
-	signer, err := c.sharedConfig.sshSigner()
+func (c *Initializer) processMachine(ctx context.Context, t *task) error {
+	dev, err := c.cl.GetDevice(ctx, c.sharedConfig.ProjectId, t.machine.ProviderID)
 	if err != nil {
-		return fmt.Errorf("could not initialize signer: %w", err)
-	}
-
-	// Maintain a BMDB session as long as possible.
-	var sess *bmdb.Session
-	for {
-		if sess == nil {
-			sess, err = conn.StartSession(ctx)
-			if err != nil {
-				return fmt.Errorf("could not start BMDB session: %w", err)
-			}
-		}
-		// Inside that session, run the main logic.
-		err = c.runInSession(ctx, sess, signer)
-
-		switch {
-		case err == nil:
-		case errors.Is(err, ctx.Err()):
-			return err
-		case errors.Is(err, bmdb.ErrSessionExpired):
-			klog.Errorf("Session expired, restarting...")
-			sess = nil
-			time.Sleep(time.Second)
-		case err != nil:
-			klog.Errorf("Processing failed: %v", err)
-			// TODO(q3k): close session
-			time.Sleep(time.Second)
-		}
-	}
-}
-
-// runInSession executes one iteration of the initializer's control loop within a
-// BMDB session. This control loop attempts to start or re-start the agent on any
-// machines that need this per the BMDB.
-func (c *Initializer) runInSession(ctx context.Context, sess *bmdb.Session, signer ssh.Signer) error {
-	t, err := c.source(ctx, sess)
-	if err != nil {
-		return fmt.Errorf("could not source machine: %w", err)
-	}
-	if t == nil {
-		return nil
-	}
-	defer t.work.Cancel(ctx)
-
-	klog.Infof("Machine %q needs agent start, fetching corresponding packngo device %q...", t.id, t.pid)
-	dev, err := c.cl.GetDevice(ctx, c.sharedConfig.ProjectId, t.pid.String())
-	if err != nil {
-		klog.Errorf("failed to fetch device %q: %v", t.pid, err)
+		klog.Errorf("failed to fetch device %q: %v", t.machine.ProviderID, err)
 		d := 30 * time.Second
 		err = t.work.Fail(ctx, &d, "failed to fetch device from equinix")
 		return err
 	}
-	t.dev = dev
 
-	err = c.init(ctx, signer, t)
+	// Start the agent.
+	klog.Infof("Starting agent on device (ID: %s, PID %s)", t.machine.MachineID, t.machine.ProviderID)
+	apk, err := c.startAgent(ctx, c.signer, dev, t.machine.MachineID)
 	if err != nil {
-		klog.Errorf("Failed to initialize: %v", err)
-		d := 1 * time.Minute
-		err = t.work.Fail(ctx, &d, fmt.Sprintf("failed to initialize machine: %v", err))
-		return err
+		return fmt.Errorf("while starting the agent: %w", err)
+	}
+
+	// Agent startup succeeded. Set the appropriate BMDB tag, and release the
+	// lock.
+	klog.Infof("Setting AgentStarted (ID: %s, PID: %s, Agent public key: %s).", t.machine.MachineID, t.machine.ProviderID, hex.EncodeToString(apk))
+	err = t.work.Finish(ctx, func(q *model.Queries) error {
+		return q.MachineSetAgentStarted(ctx, model.MachineSetAgentStartedParams{
+			MachineID:      t.machine.MachineID,
+			AgentStartedAt: time.Now(),
+			AgentPublicKey: apk,
+		})
+	})
+	if err != nil {
+		return fmt.Errorf("while setting AgentStarted tag: %w", err)
 	}
 	return nil
 }
@@ -293,7 +188,7 @@
 func (i *Initializer) startAgent(ctx context.Context, sgn ssh.Signer, d *packngo.Device, mid uuid.UUID) ([]byte, error) {
 	// Provide a bound on execution time in case we get stuck after the SSH
 	// connection is established.
-	sctx, sctxC := context.WithTimeout(ctx, i.agentConfig.SSHExecTimeout)
+	sctx, sctxC := context.WithTimeout(ctx, i.SSHExecTimeout)
 	defer sctxC()
 
 	// Use the device's IP address exposed by Equinix API.
@@ -308,7 +203,7 @@
 	}
 	klog.V(1).Infof("Dialing device (machine ID: %s, addr: %s).", mid, addr)
 
-	conn, err := i.sshClient.Dial(sctx, addr, "root", sgn, i.agentConfig.SSHConnectTimeout)
+	conn, err := i.sshClient.Dial(sctx, addr, "root", sgn, i.SSHConnectTimeout)
 	if err != nil {
 		return nil, fmt.Errorf("while dialing the device: %w", err)
 	}
@@ -317,7 +212,7 @@
 	// Upload the agent executable.
 
 	klog.Infof("Uploading the agent executable (machine ID: %s, addr: %s).", mid, addr)
-	if err := conn.Upload(sctx, i.agentConfig.TargetPath, i.agentConfig.Executable); err != nil {
+	if err := conn.Upload(sctx, i.TargetPath, i.Executable); err != nil {
 		return nil, fmt.Errorf("while uploading agent executable: %w", err)
 	}
 	klog.V(1).Infof("Upload successful (machine ID: %s, addr: %s).", mid, addr)
@@ -326,8 +221,8 @@
 	// standard input.
 	imsg := apb.TakeoverInit{
 		MachineId:     mid.String(),
-		BmaasEndpoint: i.agentConfig.Endpoint,
-		CaCertificate: i.agentConfig.EndpointCACertificate,
+		BmaasEndpoint: i.Endpoint,
+		CaCertificate: i.EndpointCACertificate,
 	}
 	imsgb, err := proto.Marshal(&imsg)
 	if err != nil {
@@ -335,8 +230,8 @@
 	}
 
 	// Start the agent and wait for the agent's output to arrive.
-	klog.V(1).Infof("Starting the agent executable at path %q (machine ID: %s).", i.agentConfig.TargetPath, mid)
-	stdout, stderr, err := conn.Execute(ctx, i.agentConfig.TargetPath, imsgb)
+	klog.V(1).Infof("Starting the agent executable at path %q (machine ID: %s).", i.TargetPath, mid)
+	stdout, stderr, err := conn.Execute(ctx, i.TargetPath, imsgb)
 	stderrStr := strings.TrimSpace(string(stderr))
 	if stderrStr != "" {
 		klog.Warningf("Agent stderr: %q", stderrStr)
@@ -367,71 +262,3 @@
 	klog.Infof("Started the agent (machine ID: %s, key: %s).", mid, hex.EncodeToString(successResp.Key))
 	return successResp.Key, nil
 }
-
-// init initializes the server described by t, using BMDB session 'sess' to set
-// the relevant BMDB tag on success, and 'sgn' to authenticate to the server.
-func (ir *Initializer) init(ctx context.Context, sgn ssh.Signer, t *task) error {
-	// Start the agent.
-	klog.Infof("Starting agent on device (ID: %s, PID %s)", t.id, t.pid)
-	apk, err := ir.startAgent(ctx, sgn, t.dev, t.id)
-	if err != nil {
-		return fmt.Errorf("while starting the agent: %w", err)
-	}
-
-	// Agent startup succeeded. Set the appropriate BMDB tag, and release the
-	// lock.
-	klog.Infof("Setting AgentStarted (ID: %s, PID: %s, Agent public key: %s).", t.id, t.pid, hex.EncodeToString(apk))
-	err = t.work.Finish(ctx, func(q *model.Queries) error {
-		return q.MachineSetAgentStarted(ctx, model.MachineSetAgentStartedParams{
-			MachineID:      t.id,
-			AgentStartedAt: time.Now(),
-			AgentPublicKey: apk,
-		})
-	})
-	if err != nil {
-		return fmt.Errorf("while setting AgentStarted tag: %w", err)
-	}
-	return nil
-}
-
-// source supplies returns a BMDB-locked server ready for initialization, locked
-// by a work item. If both task and error are nil, then there are no machines
-// needed to be initialized.
-// The returned work item in task _must_ be canceled or finished by the caller.
-func (ir *Initializer) source(ctx context.Context, sess *bmdb.Session) (*task, error) {
-	ir.config.DBQueryLimiter.Wait(ctx)
-
-	var machine *model.MachineProvided
-	work, err := sess.Work(ctx, model.ProcessShepherdAccess, func(q *model.Queries) ([]uuid.UUID, error) {
-		machines, err := q.GetMachinesForAgentStart(ctx, 1)
-		if err != nil {
-			return nil, err
-		}
-		if len(machines) < 1 {
-			return nil, bmdb.ErrNothingToDo
-		}
-		machine = &machines[0]
-		return []uuid.UUID{machines[0].MachineID}, nil
-	})
-
-	if errors.Is(err, bmdb.ErrNothingToDo) {
-		return nil, nil
-	}
-
-	if err != nil {
-		return nil, fmt.Errorf("while querying BMDB agent candidates: %w", err)
-	}
-
-	pid, err := uuid.Parse(machine.ProviderID)
-	if err != nil {
-		t := time.Hour
-		work.Fail(ctx, &t, fmt.Sprintf("could not parse provider UUID %q", machine.ProviderID))
-		return nil, fmt.Errorf("while parsing provider UUID: %w", err)
-	}
-
-	return &task{
-		id:   machine.MachineID,
-		pid:  pid,
-		work: work,
-	}, nil
-}
diff --git a/cloud/shepherd/equinix/manager/initializer_test.go b/cloud/shepherd/equinix/manager/initializer_test.go
index 6e82b98..20c2f16 100644
--- a/cloud/shepherd/equinix/manager/initializer_test.go
+++ b/cloud/shepherd/equinix/manager/initializer_test.go
@@ -64,12 +64,16 @@
 	return nil
 }
 
-// TestInitializerSmokes makes sure the Initializer doesn't go up in flames on
-// the happy path.
-func TestInitializerSmokes(t *testing.T) {
-	ic := InitializerConfig{
-		DBQueryLimiter: rate.NewLimiter(rate.Every(time.Second), 10),
-	}
+type initializerDut struct {
+	f    *fakequinix
+	i    *Initializer
+	bmdb *bmdb.Connection
+	ctx  context.Context
+}
+
+func newInitializerDut(t *testing.T) *initializerDut {
+	t.Helper()
+
 	_, key, _ := ed25519.GenerateKey(rand.Reader)
 	sc := SharedConfig{
 		ProjectId:    "noproject",
@@ -77,22 +81,23 @@
 		Key:          key,
 		DevicePrefix: "test-",
 	}
-	ac := AgentConfig{
+	ic := InitializerConfig{
+		ControlLoopConfig: ControlLoopConfig{
+			DBQueryLimiter: rate.NewLimiter(rate.Every(time.Second), 10),
+		},
 		Executable:        []byte("beep boop i'm a real program"),
 		TargetPath:        "/fake/path",
 		Endpoint:          "example.com:1234",
 		SSHConnectTimeout: time.Second,
 		SSHExecTimeout:    time.Second,
 	}
+
 	f := newFakequinix(sc.ProjectId, 100)
-	i, err := ic.New(f, &sc, &ac)
+	i, err := NewInitializer(f, ic, &sc)
 	if err != nil {
 		t.Fatalf("Could not create Initializer: %v", err)
 	}
 
-	ctx, ctxC := context.WithCancel(context.Background())
-	defer ctxC()
-
 	b := bmdb.BMDB{
 		Config: bmdb.Config{
 			Database: component.CockroachConfig{
@@ -107,12 +112,32 @@
 		t.Fatalf("Could not create in-memory BMDB: %v", err)
 	}
 
+	ctx, ctxC := context.WithCancel(context.Background())
+	t.Cleanup(ctxC)
+
 	if err := sc.SSHEquinixEnsure(ctx, f); err != nil {
 		t.Fatalf("Failed to ensure SSH key: %v", err)
 	}
 
 	i.sshClient = &fakeSSHClient{}
-	go i.Run(ctx, conn)
+	go RunControlLoop(ctx, conn, i)
+
+	return &initializerDut{
+		f:    f,
+		i:    i,
+		bmdb: conn,
+		ctx:  ctx,
+	}
+}
+
+// TestInitializerSmokes makes sure the Initializer doesn't go up in flames on
+// the happy path.
+func TestInitializerSmokes(t *testing.T) {
+	dut := newInitializerDut(t)
+	f := dut.f
+	ctx := dut.ctx
+	conn := dut.bmdb
+	sc := dut.i.sharedConfig
 
 	reservations, _ := f.ListReservations(ctx, sc.ProjectId)
 	kid, err := sc.sshEquinixId(ctx, f)
@@ -161,8 +186,6 @@
 		}
 	}
 
-	go i.Run(ctx, conn)
-
 	// Expect to find 0 machines needing start.
 	for {
 		time.Sleep(100 * time.Millisecond)
diff --git a/cloud/shepherd/equinix/manager/server/main.go b/cloud/shepherd/equinix/manager/server/main.go
index 550eea3..e9b9289 100644
--- a/cloud/shepherd/equinix/manager/server/main.go
+++ b/cloud/shepherd/equinix/manager/server/main.go
@@ -22,7 +22,6 @@
 	BMDB      bmdb.BMDB
 
 	SharedConfig      manager.SharedConfig
-	AgentConfig       manager.AgentConfig
 	ProvisionerConfig manager.ProvisionerConfig
 	InitializerConfig manager.InitializerConfig
 	WebugConfig       webug.Config
@@ -45,7 +44,6 @@
 	c.BMDB.Database.RegisterFlags("bmdb")
 
 	c.SharedConfig.RegisterFlags("")
-	c.AgentConfig.RegisterFlags()
 	c.ProvisionerConfig.RegisterFlags()
 	c.InitializerConfig.RegisterFlags()
 	c.WebugConfig.RegisterFlags()
@@ -84,7 +82,7 @@
 		klog.Exitf("%v", err)
 	}
 
-	initializer, err := c.InitializerConfig.New(api, &c.SharedConfig, &c.AgentConfig)
+	initializer, err := manager.NewInitializer(api, c.InitializerConfig, &c.SharedConfig)
 	if err != nil {
 		klog.Exitf("%v", err)
 	}
@@ -101,7 +99,7 @@
 		}
 	}()
 	go func() {
-		err = initializer.Run(ctx, conn)
+		err = manager.RunControlLoop(ctx, conn, initializer)
 		if err != nil {
 			klog.Exit(err)
 		}