cloud/shepherd/equinix/manager: init

This adds implementation managing Equinix Metal server lifecycle as
part of the BMaaS project.

Co-authored-by: Mateusz Zalega <mateusz@monogon.tech>
Supersedes: https://review.monogon.dev/c/monogon/+/990
Change-Id: I5537b2d07763985ad27aecac544ed19f933d6727
Reviewed-on: https://review.monogon.dev/c/monogon/+/1129
Reviewed-by: Leopold Schabel <leo@monogon.tech>
Reviewed-by: Mateusz Zalega <mateusz@monogon.tech>
Tested-by: Jenkins CI
diff --git a/cloud/bmaas/bmdb/model/migrations/1667232160_agent_tags.up.sql b/cloud/bmaas/bmdb/model/migrations/1667232160_agent_tags.up.sql
index ccf1ab3..e1afca2 100644
--- a/cloud/bmaas/bmdb/model/migrations/1667232160_agent_tags.up.sql
+++ b/cloud/bmaas/bmdb/model/migrations/1667232160_agent_tags.up.sql
@@ -54,4 +54,4 @@
 );
 
 -- Used by the Shepherd when performing direct actions against a machine.
-ALTER TYPE process ADD VALUE IF NOT EXISTS 'ShepherdInstall';
\ No newline at end of file
+ALTER TYPE process ADD VALUE IF NOT EXISTS 'ShepherdInstall';
diff --git a/cloud/bmaas/bmdb/model/queries_workflows.sql b/cloud/bmaas/bmdb/model/queries_workflows.sql
index b7b60fe..b0132a3 100644
--- a/cloud/bmaas/bmdb/model/queries_workflows.sql
+++ b/cloud/bmaas/bmdb/model/queries_workflows.sql
@@ -1,3 +1,10 @@
+-- name: GetProvidedMachines :many
+SELECT
+    machine_provided.*
+FROM machines
+INNER JOIN machine_provided ON machines.machine_id = machine_provided.machine_id
+WHERE machine_provided.provider = $1;
+
 -- name: GetMachinesForAgentStart :many
 -- Get machines that need agent installed for the first time. Machine can be
 -- assumed to be 'new', with no previous attempts or failures.
@@ -79,4 +86,4 @@
         machine_os_installation_report.machine_id IS NULL
         OR machine_os_installation_report.generation != machine_os_installation_request.generation
     )
-LIMIT $2;
\ No newline at end of file
+LIMIT $2;
diff --git a/cloud/shepherd/equinix/manager/BUILD.bazel b/cloud/shepherd/equinix/manager/BUILD.bazel
new file mode 100644
index 0000000..004cd24
--- /dev/null
+++ b/cloud/shepherd/equinix/manager/BUILD.bazel
@@ -0,0 +1,54 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(
+    name = "manager",
+    srcs = [
+        "initializer.go",
+        "manager.go",
+        "provisioner.go",
+        "shared_config.go",
+        "ssh.go",
+    ],
+    importpath = "source.monogon.dev/cloud/shepherd/equinix/manager",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//cloud/agent/api",
+        "//cloud/bmaas/bmdb",
+        "//cloud/bmaas/bmdb/model",
+        "//cloud/lib/sinbin",
+        "//cloud/shepherd/equinix/wrapngo",
+        "@com_github_google_uuid//:uuid",
+        "@com_github_packethost_packngo//:packngo",
+        "@com_github_pkg_sftp//:sftp",
+        "@io_k8s_klog_v2//:klog",
+        "@org_golang_google_protobuf//proto",
+        "@org_golang_x_crypto//ssh",
+        "@org_golang_x_time//rate",
+    ],
+)
+
+go_test(
+    name = "manager_test",
+    timeout = "eternal",
+    srcs = [
+        "fakequinix_test.go",
+        "initializer_test.go",
+        "provisioner_test.go",
+    ],
+    data = [
+        "//cloud/shepherd/equinix/manager/test_agent",
+        "@cockroach",
+    ],
+    embed = [":manager"],
+    deps = [
+        "//cloud/agent/api",
+        "//cloud/bmaas/bmdb",
+        "//cloud/bmaas/bmdb/model",
+        "//cloud/lib/component",
+        "@com_github_google_uuid//:uuid",
+        "@com_github_packethost_packngo//:packngo",
+        "@org_golang_google_protobuf//proto",
+        "@org_golang_x_crypto//ssh",
+        "@org_golang_x_time//rate",
+    ],
+)
diff --git a/cloud/shepherd/equinix/manager/README.md b/cloud/shepherd/equinix/manager/README.md
new file mode 100644
index 0000000..e3c0f24
--- /dev/null
+++ b/cloud/shepherd/equinix/manager/README.md
@@ -0,0 +1,54 @@
+Equinix Shepherd
+===
+
+Manages Equinix machines in sync with BMDB contents. Made up of two components:
+
+Provisioner
+---
+
+Brings up machines from hardware reservations and populates BMDB with new Provided machines.
+
+Initializer
+---
+
+Starts the Agent over SSH (wherever necessary per the BMDB) and reports success into the BMDB.
+
+
+Running
+===
+
+Unit Tests
+---
+
+The Shepherd has some basic smoke tests which run against a Fakequinix.
+
+Manual Testing
+---
+
+If you have Equinix credentials, you can run:
+
+```
+$ bazel build //cloud/shepherd/equinix/manager/server
+$ bazel build //cloud/shepherd/equinix/manager/test_agent
+$ bazel-bin/cloud/shepherd/equinix/manager/server/server_/server \
+    -bmdb_eat_my_data \
+    -equinix_project_id FIXME \
+    -equinix_api_username FIXME \
+    -equinix_api_key FIXME \
+    -agent_executable_path bazel-bin/cloud/shepherd/equinix/manager/test_agent/test_agent_/test_agent \
+    -agent_endpoint example.com \
+    -equinix_ssh_key_label $USER-FIXME \
+    -equinix_device_prefix $USER-FIXME- \
+    -provisioner_assimilate -provisioner_max_machines 10
+```
+
+Replace $USER-FIXME with `<your username>-test` or some other unique name/prefix.
+
+This will start a single instance of the provisioner accompanied by a single instance of the initializer.
+
+A persistent SSH key will be created in your current working directory.
+
+Prod Deployment
+---
+
+TODO(q3k): split server binary into separate provisioner/initializer for initializer scalability, as that's the main bottleneck.
\ No newline at end of file
diff --git a/cloud/shepherd/equinix/manager/fakequinix_test.go b/cloud/shepherd/equinix/manager/fakequinix_test.go
new file mode 100644
index 0000000..3970373
--- /dev/null
+++ b/cloud/shepherd/equinix/manager/fakequinix_test.go
@@ -0,0 +1,172 @@
+package manager
+
+import (
+	"context"
+	"net/http"
+	"sync"
+
+	"github.com/google/uuid"
+	"github.com/packethost/packngo"
+)
+
+// fakequinix implements a wrapngo.Client for testing. It starts out with a
+// number of made up hardware reservations, and allows for creating devices and
+// SSH keys.
+type fakequinix struct {
+	mu sync.Mutex
+
+	pid          string
+	devices      map[string]*packngo.Device
+	reservations map[string]*packngo.HardwareReservation
+	sshKeys      map[string]*packngo.SSHKey
+}
+
+// newFakequinix makes a fakequinix with a given fake project ID and number of
+// hardware reservations to create.
+func newFakequinix(pid string, numReservations int) *fakequinix {
+	f := fakequinix{
+		pid:          pid,
+		devices:      make(map[string]*packngo.Device),
+		reservations: make(map[string]*packngo.HardwareReservation),
+		sshKeys:      make(map[string]*packngo.SSHKey),
+	}
+
+	for i := 0; i < numReservations; i++ {
+		uid := uuid.New()
+		f.reservations[uid.String()] = &packngo.HardwareReservation{
+			ID:            uid.String(),
+			ShortID:       uid.String(),
+			Provisionable: true,
+		}
+	}
+
+	return &f
+}
+
+func (f *fakequinix) notFound() error {
+	return &packngo.ErrorResponse{
+		Response: &http.Response{
+			StatusCode: http.StatusNotFound,
+		},
+	}
+}
+
+func (f *fakequinix) GetDevice(_ context.Context, pid, did string) (*packngo.Device, error) {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+
+	if pid != f.pid {
+		return nil, f.notFound()
+	}
+	val := f.devices[did]
+	if val == nil {
+		return nil, f.notFound()
+	}
+	return val, nil
+}
+
+func (f *fakequinix) ListDevices(_ context.Context, pid string) ([]packngo.Device, error) {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+
+	if pid != f.pid {
+		return nil, nil
+	}
+	var res []packngo.Device
+	for _, dev := range f.devices {
+		res = append(res, *dev)
+	}
+	return res, nil
+}
+
+func (f *fakequinix) CreateDevice(_ context.Context, request *packngo.DeviceCreateRequest) (*packngo.Device, error) {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+
+	rid := request.HardwareReservationID
+	res := f.reservations[rid]
+	if res == nil {
+		return nil, f.notFound()
+	}
+	if res.Device != nil {
+		return nil, f.notFound()
+	}
+
+	dev := &packngo.Device{
+		ID:       uuid.New().String(),
+		State:    "very fake",
+		Hostname: request.Hostname,
+		OS: &packngo.OS{
+			Name: request.OS,
+			Slug: request.OS,
+		},
+	}
+	res.Device = dev
+	res.Provisionable = false
+
+	f.devices[dev.ID] = dev
+	return dev, nil
+}
+
+func (f *fakequinix) ListReservations(_ context.Context, pid string) ([]packngo.HardwareReservation, error) {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+
+	var res []packngo.HardwareReservation
+	for _, r := range f.reservations {
+		res = append(res, *r)
+	}
+
+	return res, nil
+}
+
+func (f *fakequinix) ListSSHKeys(_ context.Context) ([]packngo.SSHKey, error) {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+
+	var res []packngo.SSHKey
+	for _, key := range f.sshKeys {
+		res = append(res, *key)
+	}
+
+	return res, nil
+}
+
+func (f *fakequinix) CreateSSHKey(_ context.Context, req *packngo.SSHKeyCreateRequest) (*packngo.SSHKey, error) {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+
+	for _, k := range f.sshKeys {
+		if k.Key == req.Key {
+			return nil, f.notFound()
+		}
+		if k.Label == req.Label {
+			return nil, f.notFound()
+		}
+	}
+
+	uid := uuid.New().String()
+	f.sshKeys[uid] = &packngo.SSHKey{
+		ID:    uid,
+		Label: req.Label,
+		Key:   req.Key,
+	}
+
+	return f.sshKeys[uid], nil
+}
+
+func (f *fakequinix) UpdateSSHKey(_ context.Context, kid string, req *packngo.SSHKeyUpdateRequest) (*packngo.SSHKey, error) {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+
+	key := f.sshKeys[kid]
+	if key == nil {
+		return nil, f.notFound()
+	}
+	key.Key = *req.Key
+
+	return key, nil
+}
+
+func (f *fakequinix) Close() {
+}
diff --git a/cloud/shepherd/equinix/manager/initializer.go b/cloud/shepherd/equinix/manager/initializer.go
new file mode 100644
index 0000000..fd2e00b
--- /dev/null
+++ b/cloud/shepherd/equinix/manager/initializer.go
@@ -0,0 +1,376 @@
+package manager
+
+import (
+	"context"
+	"crypto/ed25519"
+	"encoding/hex"
+	"errors"
+	"flag"
+	"fmt"
+	"net"
+	"os"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/google/uuid"
+	"github.com/packethost/packngo"
+	"golang.org/x/crypto/ssh"
+	"golang.org/x/time/rate"
+	"google.golang.org/protobuf/proto"
+	"k8s.io/klog/v2"
+
+	apb "source.monogon.dev/cloud/agent/api"
+	"source.monogon.dev/cloud/bmaas/bmdb"
+	"source.monogon.dev/cloud/bmaas/bmdb/model"
+	ecl "source.monogon.dev/cloud/shepherd/equinix/wrapngo"
+)
+
+// AgentConfig configures how the Initializer will deploy Agents on machines. In
+// CLI scenarios, this should be populated from flags via RegisterFlags.
+type AgentConfig struct {
+	// Executable is the contents of the agent binary created and run
+	// at the provisioned servers. Must be set.
+	Executable []byte
+
+	// TargetPath is a filesystem destination path used while uploading the BMaaS
+	// agent executable to hosts as part of the initialization process. Must be set.
+	TargetPath string
+
+	// Endpoint is the address Agent will use to contact the BMaaS
+	// infrastructure. Must be set.
+	Endpoint string
+
+	// SSHTimeout is the amount of time set aside for the initializing
+	// SSH session to run its course. Upon timeout, the iteration would be
+	// declared a failure. Must be set.
+	SSHConnectTimeout time.Duration
+	// SSHExecTimeout is the amount of time set aside for executing the agent and
+	// getting its output once the SSH connection has been established. Upon timeout,
+	// the iteration would be declared as failure. Must be set.
+	SSHExecTimeout time.Duration
+}
+
+func (a *AgentConfig) RegisterFlags() {
+	flag.Func("agent_executable_path", "Local filesystem path of agent binary to be uploaded", func(val string) error {
+		if val == "" {
+			return nil
+		}
+		data, err := os.ReadFile(val)
+		if err != nil {
+			return fmt.Errorf("could not read -agent_executable_path: %w", err)
+		}
+		a.Executable = data
+		return nil
+	})
+	flag.StringVar(&a.TargetPath, "agent_target_path", "/root/agent", "Filesystem path where the agent will be uploaded to and ran from")
+	flag.StringVar(&a.Endpoint, "agent_endpoint", "", "Address of BMDB Server to which the agent will attempt to connect")
+	flag.DurationVar(&a.SSHConnectTimeout, "agent_ssh_connect_timeout", 2*time.Second, "Timeout for connecting over SSH to a machine")
+	flag.DurationVar(&a.SSHExecTimeout, "agent_ssh_exec_timeout", 60*time.Second, "Timeout for connecting over SSH to a machine")
+}
+
+// InitializerConfig configures the broad agent initialization process. The
+// specifics of how an agent is started are instead configured in Agent Config. In
+// CLI scenarios, this should be populated from flags via RegisterFlags.
+type InitializerConfig struct {
+	// DBQueryLimiter limits the rate at which BMDB is queried for servers ready
+	// for BMaaS agent initialization. Must be set.
+	DBQueryLimiter *rate.Limiter
+}
+
+// flagLimiter configures a *rate.Limiter as a flag.
+func flagLimiter(l **rate.Limiter, name, defval, help string) {
+	syntax := "'duration,count' eg. '2m,10' for a 10-sized bucket refilled at one token every 2 minutes"
+	help = help + fmt.Sprintf(" (default: %q, syntax: %s)", defval, syntax)
+	flag.Func(name, help, func(val string) error {
+		if val == "" {
+			val = defval
+		}
+		parts := strings.Split(val, ",")
+		if len(parts) != 2 {
+			return fmt.Errorf("invalid syntax, want: %s", syntax)
+		}
+		duration, err := time.ParseDuration(parts[0])
+		if err != nil {
+			return fmt.Errorf("invalid duration: %w", err)
+		}
+		refill, err := strconv.ParseUint(parts[1], 10, 31)
+		if err != nil {
+			return fmt.Errorf("invalid refill rate: %w", err)
+		}
+		*l = rate.NewLimiter(rate.Every(duration), int(refill))
+		return nil
+	})
+	flag.Set(name, defval)
+}
+
+func (i *InitializerConfig) RegisterFlags() {
+	flagLimiter(&i.DBQueryLimiter, "initializer_db_query_rate", "250ms,8", "Rate limiting for BMDB queries")
+}
+
+// Initializer implements the BMaaS agent initialization process. Initialization
+// entails asking the BMDB for machines that need the agent started
+// (or-restarted) and acting upon that.
+type Initializer struct {
+	config       *InitializerConfig
+	agentConfig  *AgentConfig
+	sharedConfig *SharedConfig
+	sshClient    SSHClient
+	// cl is the packngo wrapper used by the initializer.
+	cl ecl.Client
+}
+
+// task describes a single server currently being processed either in the
+// context of agent initialization or recovery.
+type task struct {
+	// id is the BMDB-assigned machine identifier.
+	id uuid.UUID
+	// pid is an identifier assigned by the provider (Equinix).
+	pid uuid.UUID
+	// work is a machine lock facilitated by BMDB that prevents machines from
+	// being processed by multiple workers at the same time.
+	work *bmdb.Work
+	// dev is a provider machine/device record.
+	dev *packngo.Device
+}
+
+// New creates an Initializer instance, checking the InitializerConfig,
+// SharedConfig and AgentConfig for errors.
+func (c *InitializerConfig) New(cl ecl.Client, sc *SharedConfig, ac *AgentConfig) (*Initializer, error) {
+	if err := sc.check(); err != nil {
+		return nil, err
+	}
+	if len(ac.Executable) == 0 {
+		return nil, fmt.Errorf("agent executable not configured")
+	}
+	if ac.TargetPath == "" {
+		return nil, fmt.Errorf("agent target path must be set")
+	}
+	if ac.Endpoint == "" {
+		return nil, fmt.Errorf("agent endpoint must be set")
+	}
+	if ac.SSHConnectTimeout == 0 {
+		return nil, fmt.Errorf("agent SSH connection timeout must be set")
+	}
+	if ac.SSHExecTimeout == 0 {
+		return nil, fmt.Errorf("agent SSH execution timeout must be set")
+	}
+	if c.DBQueryLimiter == nil {
+		return nil, fmt.Errorf("DBQueryLimiter must be configured")
+	}
+	return &Initializer{
+		config:       c,
+		sharedConfig: sc,
+		agentConfig:  ac,
+		sshClient:    &PlainSSHClient{},
+		cl:           cl,
+	}, nil
+}
+
+// Run the initializer blocking the current goroutine until the given context
+// expires.
+func (c *Initializer) Run(ctx context.Context, conn *bmdb.Connection) error {
+	signer, err := c.sharedConfig.sshSigner()
+	if err != nil {
+		return fmt.Errorf("could not initialize signer: %w", err)
+	}
+
+	// Maintain a BMDB session as long as possible.
+	var sess *bmdb.Session
+	for {
+		if sess == nil {
+			sess, err = conn.StartSession(ctx)
+			if err != nil {
+				return fmt.Errorf("could not start BMDB session: %w", err)
+			}
+		}
+		// Inside that session, run the main logic.
+		err = c.runInSession(ctx, sess, signer)
+
+		switch {
+		case err == nil:
+		case errors.Is(err, ctx.Err()):
+			return err
+		case errors.Is(err, bmdb.ErrSessionExpired):
+			klog.Errorf("Session expired, restarting...")
+			sess = nil
+			time.Sleep(time.Second)
+		case err != nil:
+			klog.Errorf("Processing failed: %v", err)
+			// TODO(q3k): close session
+			time.Sleep(time.Second)
+		}
+	}
+}
+
+// runInSession executes one iteration of the initializer's control loop within a
+// BMDB session. This control loop attempts to start or re-start the agent on any
+// machines that need this per the BMDB.
+func (c *Initializer) runInSession(ctx context.Context, sess *bmdb.Session, signer ssh.Signer) error {
+	t, err := c.source(ctx, sess)
+	if err != nil {
+		return fmt.Errorf("could not source machine: %w", err)
+	}
+	if t == nil {
+		return nil
+	}
+	defer t.work.Cancel(ctx)
+
+	klog.Infof("Machine %q needs installation, fetching corresponding packngo device %q...", t.id, t.pid)
+	dev, err := c.cl.GetDevice(ctx, c.sharedConfig.ProjectId, t.pid.String())
+	if err != nil {
+		klog.Errorf("failed to fetch device %q: %v", t.pid, err)
+		d := 30 * time.Second
+		err = t.work.Fail(ctx, &d, "failed to fetch device from equinix")
+		return err
+	}
+	t.dev = dev
+
+	err = c.init(ctx, signer, t)
+	if err != nil {
+		klog.Errorf("Failed to initialize: %v", err)
+		d := 1 * time.Minute
+		err = t.work.Fail(ctx, &d, fmt.Sprintf("failed to initialize machine: %v", err))
+		return err
+	}
+	return nil
+}
+
+// startAgent runs the agent executable on the target device d, returning the
+// agent's public key on success.
+func (i *Initializer) startAgent(ctx context.Context, sgn ssh.Signer, d packngo.Device) ([]byte, error) {
+	// Provide a bound on execution time in case we get stuck after the SSH
+	// connection is established.
+	sctx, sctxC := context.WithTimeout(ctx, i.agentConfig.SSHExecTimeout)
+	defer sctxC()
+
+	// Use the device's IP address exposed by Equinix API.
+	ni := d.GetNetworkInfo()
+	var addr string
+	if ni.PublicIPv4 != "" {
+		addr = net.JoinHostPort(ni.PublicIPv4, "22")
+	} else if ni.PublicIPv6 != "" {
+		addr = net.JoinHostPort(ni.PublicIPv6, "22")
+	} else {
+		return nil, fmt.Errorf("device (ID: %s) has no available addresses", d.ID)
+	}
+	klog.V(1).Infof("Dialing device (provider ID: %s, addr: %s).", d.ID, addr)
+
+	conn, err := i.sshClient.Dial(sctx, addr, "root", sgn, i.agentConfig.SSHConnectTimeout)
+	if err != nil {
+		return nil, fmt.Errorf("while dialing the device: %w", err)
+	}
+	defer conn.Close()
+
+	// Upload the agent executable.
+
+	klog.Infof("Uploading the agent executable (provider ID: %s, addr: %s).", d.ID, addr)
+	if err := conn.Upload(sctx, i.agentConfig.TargetPath, i.agentConfig.Executable); err != nil {
+		return nil, fmt.Errorf("while uploading agent executable: %w", err)
+	}
+	klog.V(1).Infof("Upload successful (provider ID: %s, addr: %s).", d.ID, addr)
+
+	// The initialization protobuf message will be sent to the agent on its
+	// standard input.
+	imsg := apb.TakeoverInit{
+		Provider:      "equinix",
+		ProviderId:    d.ID,
+		BmaasEndpoint: i.agentConfig.Endpoint,
+	}
+	imsgb, err := proto.Marshal(&imsg)
+	if err != nil {
+		return nil, fmt.Errorf("while marshaling agent message: %w", err)
+	}
+
+	// Start the agent and wait for the agent's output to arrive.
+	klog.V(1).Infof("Starting the agent executable at path %q (provider ID: %s).", i.agentConfig.TargetPath, d.ID)
+	stdout, stderr, err := conn.Execute(ctx, i.agentConfig.TargetPath, imsgb)
+	stderrStr := strings.TrimSpace(string(stderr))
+	if stderrStr != "" {
+		klog.Warningf("Agent stderr: %q", stderrStr)
+	}
+	if err != nil {
+		return nil, fmt.Errorf("while starting the agent executable: %w", err)
+	}
+
+	var arsp apb.TakeoverResponse
+	if err := proto.Unmarshal(stdout, &arsp); err != nil {
+		return nil, fmt.Errorf("agent reply couldn't be unmarshaled: %w", err)
+	}
+	if !proto.Equal(&imsg, arsp.InitMessage) {
+		return nil, fmt.Errorf("agent did not send back the init message.")
+	}
+	if len(arsp.Key) != ed25519.PublicKeySize {
+		return nil, fmt.Errorf("agent key length mismatch.")
+	}
+	klog.Infof("Started the agent (provider ID: %s, key: %s).", d.ID, hex.EncodeToString(arsp.Key))
+	return arsp.Key, nil
+}
+
+// init initializes the server described by t, using BMDB session 'sess' to set
+// the relevant BMDB tag on success, and 'sgn' to authenticate to the server.
+func (ir *Initializer) init(ctx context.Context, sgn ssh.Signer, t *task) error {
+	// Start the agent.
+	klog.Infof("Starting agent on device (ID: %s, PID %s)", t.id, t.pid)
+	apk, err := ir.startAgent(ctx, sgn, *t.dev)
+	if err != nil {
+		return fmt.Errorf("while installing the agent: %w", err)
+	}
+
+	// Agent startup succeeded. Set the appropriate BMDB tag, and release the
+	// lock.
+	klog.Infof("Setting AgentStarted (ID: %s, PID: %s, Agent public key: %s).", t.id, t.pid, hex.EncodeToString(apk))
+	err = t.work.Finish(ctx, func(q *model.Queries) error {
+		return q.MachineSetAgentStarted(ctx, model.MachineSetAgentStartedParams{
+			MachineID:      t.id,
+			AgentStartedAt: time.Now(),
+			AgentPublicKey: apk,
+		})
+	})
+	if err != nil {
+		return fmt.Errorf("while setting AgentStarted tag: %w", err)
+	}
+	return nil
+}
+
+// source supplies returns a BMDB-locked server ready for initialization, locked
+// by a work item. If both task and error are nil, then there are no machines
+// needed to be initialized.
+// The returned work item in task _must_ be canceled or finished by the caller.
+func (ir *Initializer) source(ctx context.Context, sess *bmdb.Session) (*task, error) {
+	ir.config.DBQueryLimiter.Wait(ctx)
+
+	var machine *model.MachineProvided
+	work, err := sess.Work(ctx, model.ProcessShepherdInstall, func(q *model.Queries) ([]uuid.UUID, error) {
+		machines, err := q.GetMachinesForAgentStart(ctx, 1)
+		if err != nil {
+			return nil, err
+		}
+		if len(machines) < 1 {
+			return nil, bmdb.ErrNothingToDo
+		}
+		machine = &machines[0]
+		return []uuid.UUID{machines[0].MachineID}, nil
+	})
+
+	if errors.Is(err, bmdb.ErrNothingToDo) {
+		return nil, nil
+	}
+
+	if err != nil {
+		return nil, fmt.Errorf("while querying BMDB agent candidates: %w", err)
+	}
+
+	pid, err := uuid.Parse(machine.ProviderID)
+	if err != nil {
+		t := time.Hour
+		work.Fail(ctx, &t, fmt.Sprintf("could not parse provider UUID %q", machine.ProviderID))
+		return nil, fmt.Errorf("while parsing provider UUID: %w", err)
+	}
+
+	return &task{
+		id:   machine.MachineID,
+		pid:  pid,
+		work: work,
+	}, nil
+}
diff --git a/cloud/shepherd/equinix/manager/initializer_test.go b/cloud/shepherd/equinix/manager/initializer_test.go
new file mode 100644
index 0000000..f07341c
--- /dev/null
+++ b/cloud/shepherd/equinix/manager/initializer_test.go
@@ -0,0 +1,183 @@
+package manager
+
+import (
+	"context"
+	"crypto/ed25519"
+	"crypto/rand"
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/packethost/packngo"
+	"golang.org/x/crypto/ssh"
+	"golang.org/x/time/rate"
+	"google.golang.org/protobuf/proto"
+
+	apb "source.monogon.dev/cloud/agent/api"
+	"source.monogon.dev/cloud/bmaas/bmdb"
+	"source.monogon.dev/cloud/bmaas/bmdb/model"
+	"source.monogon.dev/cloud/lib/component"
+)
+
+// fakeSSHClient is an SSHClient that pretends to start an agent, but in reality
+// just responds with what an agent would respond on every execution attempt.
+type fakeSSHClient struct {
+}
+
+type fakeSSHConnection struct {
+}
+
+func (f *fakeSSHClient) Dial(ctx context.Context, address, username string, sshkey ssh.Signer, timeout time.Duration) (SSHConnection, error) {
+	return &fakeSSHConnection{}, nil
+}
+
+func (f *fakeSSHConnection) Execute(ctx context.Context, command string, stdin []byte) (stdout []byte, stderr []byte, err error) {
+	var aim apb.TakeoverInit
+	if err := proto.Unmarshal(stdin, &aim); err != nil {
+		return nil, nil, fmt.Errorf("while unmarshaling TakeoverInit message: %v", err)
+	}
+
+	// Agent should send back apb.TakeoverResponse on its standard output.
+	pub, _, err := ed25519.GenerateKey(rand.Reader)
+	if err != nil {
+		return nil, nil, fmt.Errorf("while generating agent public key: %v", err)
+	}
+	arsp := apb.TakeoverResponse{
+		InitMessage: &aim,
+		Key:         pub,
+	}
+	arspb, err := proto.Marshal(&arsp)
+	if err != nil {
+		return nil, nil, fmt.Errorf("while marshaling TakeoverResponse message: %v", err)
+	}
+	return arspb, nil, nil
+}
+
+func (f *fakeSSHConnection) Upload(ctx context.Context, targetPath string, data []byte) error {
+	if targetPath != "/fake/path" {
+		return fmt.Errorf("unexpected target path in test")
+	}
+	return nil
+}
+
+func (f *fakeSSHConnection) Close() error {
+	return nil
+}
+
+// TestInitializerSmokes makes sure the Initializer doesn't go up in flames on
+// the happy path.
+func TestInitializerSmokes(t *testing.T) {
+	ic := InitializerConfig{
+		DBQueryLimiter: rate.NewLimiter(rate.Every(time.Second), 10),
+	}
+	_, key, _ := ed25519.GenerateKey(rand.Reader)
+	sc := SharedConfig{
+		ProjectId:    "noproject",
+		KeyLabel:     "somekey",
+		Key:          key,
+		DevicePrefix: "test-",
+	}
+	ac := AgentConfig{
+		Executable:        []byte("beep boop i'm a real program"),
+		TargetPath:        "/fake/path",
+		Endpoint:          "example.com:1234",
+		SSHConnectTimeout: time.Second,
+		SSHExecTimeout:    time.Second,
+	}
+	f := newFakequinix(sc.ProjectId, 100)
+	i, err := ic.New(f, &sc, &ac)
+	if err != nil {
+		t.Fatalf("Could not create Initializer: %v", err)
+	}
+
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+
+	b := bmdb.BMDB{
+		Config: bmdb.Config{
+			Database: component.CockroachConfig{
+				InMemory: true,
+			},
+			ComponentName: "test",
+			RuntimeInfo:   "test",
+		},
+	}
+	conn, err := b.Open(true)
+	if err != nil {
+		t.Fatalf("Could not create in-memory BMDB: %v", err)
+	}
+
+	if err := sc.SSHEquinixEnsure(ctx, f); err != nil {
+		t.Fatalf("Failed to ensure SSH key: %v", err)
+	}
+
+	i.sshClient = &fakeSSHClient{}
+	go i.Run(ctx, conn)
+
+	reservations, _ := f.ListReservations(ctx, sc.ProjectId)
+	kid, err := sc.sshEquinixId(ctx, f)
+	if err != nil {
+		t.Fatalf("Failed to retrieve equinix key ID: %v", err)
+	}
+	sess, err := conn.StartSession(ctx)
+	if err != nil {
+		t.Fatalf("Failed to create BMDB session for verifiaction: %v", err)
+	}
+
+	// Create 10 provided machines for testing.
+	for i := 0; i < 10; i++ {
+		res := reservations[i]
+		dev, _ := f.CreateDevice(ctx, &packngo.DeviceCreateRequest{
+			Hostname:              fmt.Sprintf("test-%d", i),
+			OS:                    "fake",
+			ProjectID:             sc.ProjectId,
+			HardwareReservationID: res.ID,
+			ProjectSSHKeys:        []string{kid},
+		})
+		f.devices[dev.ID].Network = []*packngo.IPAddressAssignment{
+			{
+				IpAddressCommon: packngo.IpAddressCommon{
+					ID:            "fake",
+					Address:       "1.2.3.4",
+					Management:    true,
+					AddressFamily: 4,
+					Public:        true,
+				},
+			},
+		}
+		err = sess.Transact(ctx, func(q *model.Queries) error {
+			machine, err := q.NewMachine(ctx)
+			if err != nil {
+				return err
+			}
+			return q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
+				MachineID:  machine.MachineID,
+				Provider:   model.ProviderEquinix,
+				ProviderID: dev.ID,
+			})
+		})
+		if err != nil {
+			t.Fatalf("Failed to create BMDB machine: %v", err)
+		}
+	}
+
+	go i.Run(ctx, conn)
+
+	// Expect to find 0 machines needing start.
+	for {
+		time.Sleep(100 * time.Millisecond)
+
+		var machines []model.MachineProvided
+		err = sess.Transact(ctx, func(q *model.Queries) error {
+			var err error
+			machines, err = q.GetMachinesForAgentStart(ctx, 100)
+			return err
+		})
+		if err != nil {
+			t.Fatalf("Failed to run Transaction: %v", err)
+		}
+		if len(machines) == 0 {
+			break
+		}
+	}
+}
diff --git a/cloud/shepherd/equinix/manager/manager.go b/cloud/shepherd/equinix/manager/manager.go
new file mode 100644
index 0000000..3ae7854
--- /dev/null
+++ b/cloud/shepherd/equinix/manager/manager.go
@@ -0,0 +1,18 @@
+// Package manager, itself a part of BMaaS project, provides implementation
+// governing Equinix bare metal server lifecycle according to conditions set by
+// Bare Metal Database (BMDB).
+//
+// The implementation will attempt to provide as many machines as possible and
+// register them with BMDB. This is limited by the count of Hardware
+// Reservations available in the Equinix Metal project used. The BMaaS agent
+// will then be started on these machines as soon as they become ready.
+//
+// The implementation is provided in the form of a library, to which interface is
+// exported through Provisioner and Initializer types, each taking servers
+// through a single stage of their lifecycle.
+//
+// See the included test code for usage examples.
+//
+// The terms "device" and "machine" are used interchangeably throughout this
+// package due to differences in Equinix Metal and BMDB nomenclature.
+package manager
diff --git a/cloud/shepherd/equinix/manager/provisioner.go b/cloud/shepherd/equinix/manager/provisioner.go
new file mode 100644
index 0000000..3964334
--- /dev/null
+++ b/cloud/shepherd/equinix/manager/provisioner.go
@@ -0,0 +1,435 @@
+package manager
+
+import (
+	"context"
+	"errors"
+	"flag"
+	"fmt"
+	"sort"
+	"time"
+
+	"github.com/google/uuid"
+	"github.com/packethost/packngo"
+	"golang.org/x/time/rate"
+	"k8s.io/klog/v2"
+
+	"source.monogon.dev/cloud/bmaas/bmdb"
+	"source.monogon.dev/cloud/bmaas/bmdb/model"
+	"source.monogon.dev/cloud/lib/sinbin"
+	ecl "source.monogon.dev/cloud/shepherd/equinix/wrapngo"
+)
+
+// ProvisionerConfig configures the provisioning process.
+type ProvisionerConfig struct {
+	// OS defines the operating system new devices are created with. Its format
+	// is specified by Equinix API.
+	OS string
+	// MaxCount is the maximum count of managed servers. No new devices will be
+	// created after reaching the limit. No attempt will be made to reduce the
+	// server count.
+	MaxCount uint
+
+	// ReconcileLoopLimiter limits the rate of the main reconciliation loop
+	// iterating. As new machines are being provisioned, each loop will cause one
+	// 'long' ListHardwareReservations call to Equinix.
+	ReconcileLoopLimiter *rate.Limiter
+
+	// DeviceCreation limits the rate at which devices are created within
+	// Equinix through use of appropriate API calls.
+	DeviceCreationLimiter *rate.Limiter
+
+	// Assimilate Equinix machines that match the configured device prefix into the
+	// BMDB as Provided. This should only be used for manual testing with
+	// -bmdb_eat_my_data.
+	Assimilate bool
+
+	// ReservationChunkSize is how many Equinix machines will try to be spawned in a
+	// single reconciliation loop. Higher numbers allow for faster initial
+	// provisioning, but lower numbers decrease potential raciness with other systems
+	// and make sure that other parts of the reconciliation logic are ran regularly.
+	//
+	// 20 is decent starting point.
+	ReservationChunkSize uint
+}
+
+func (p *ProvisionerConfig) RegisterFlags() {
+	flag.StringVar(&p.OS, "provisioner_os", "ubuntu_20_04", "OS that provisioner will deploy on Equinix machines. Not the target OS for cluster customers.")
+	flag.UintVar(&p.MaxCount, "provisioner_max_machines", 50, "Limit of machines that the provisioner will attempt to pull into the BMDB. Zero for no limit.")
+	flagLimiter(&p.ReconcileLoopLimiter, "provisioner_reconciler_rate", "1m,1", "Rate limiting for main provisioner reconciliation loop")
+	flagLimiter(&p.DeviceCreationLimiter, "provisioner_device_creation_rate", "5s,1", "Rate limiting for Equinix device/machine creation")
+	flag.BoolVar(&p.Assimilate, "provisioner_assimilate", false, "Assimilate matching machines in Equinix project into BMDB as Provided. Only to be used when manually testing.")
+	flag.UintVar(&p.ReservationChunkSize, "provisioner_reservation_chunk_size", 20, "How many machines will the provisioner attempt to create in a single reconciliation loop iteration")
+}
+
+// Provisioner implements the server provisioning logic. Provisioning entails
+// bringing all available hardware reservations (subject to limits) into BMDB as
+// machines provided by Equinix.
+type Provisioner struct {
+	config       *ProvisionerConfig
+	sharedConfig *SharedConfig
+
+	// cl is the wrapngo client instance used.
+	cl ecl.Client
+
+	// badReservations is a holiday resort for Equinix hardware reservations which
+	// failed to be provisioned for some reason or another. We keep a list of them in
+	// memory just so that we don't repeatedly try to provision the same known bad
+	// machines.
+	badReservations sinbin.Sinbin[string]
+}
+
+// New creates a Provisioner instance, checking ProvisionerConfig and
+// SharedConfig for errors.
+func (c *ProvisionerConfig) New(cl ecl.Client, sc *SharedConfig) (*Provisioner, error) {
+	// If these are unset, it's probably because someone is using us as a library.
+	// Provide error messages useful to code users instead of flag names.
+	if c.OS == "" {
+		return nil, fmt.Errorf("OS must be set")
+	}
+	if c.ReconcileLoopLimiter == nil {
+		return nil, fmt.Errorf("ReconcileLoopLimiter must be set")
+	}
+	if c.DeviceCreationLimiter == nil {
+		return nil, fmt.Errorf("DeviceCreationLimiter must be set")
+	}
+	if c.ReservationChunkSize == 0 {
+		return nil, fmt.Errorf("ReservationChunkSize must be set")
+	}
+	return &Provisioner{
+		config:       c,
+		sharedConfig: sc,
+
+		cl: cl,
+	}, nil
+}
+
+// Run the provisioner blocking the current goroutine until the given context
+// expires.
+func (p *Provisioner) Run(ctx context.Context, conn *bmdb.Connection) error {
+
+	var sess *bmdb.Session
+	var err error
+	for {
+		if sess == nil {
+			sess, err = conn.StartSession(ctx)
+			if err != nil {
+				return fmt.Errorf("could not start BMDB session: %w", err)
+			}
+		}
+		err = p.runInSession(ctx, sess)
+
+		switch {
+		case err == nil:
+		case errors.Is(err, ctx.Err()):
+			return err
+		case errors.Is(err, bmdb.ErrSessionExpired):
+			klog.Errorf("Session expired, restarting...")
+			sess = nil
+			time.Sleep(time.Second)
+		case err != nil:
+			klog.Errorf("Processing failed: %v", err)
+			// TODO(q3k): close session
+			time.Sleep(time.Second)
+		}
+	}
+}
+
+type machineListing struct {
+	machines []uuid.UUID
+	err      error
+}
+
+// runInSession executes one iteration of the provisioner's control loop within a
+// BMDB session. This control loop attempts to bring all Equinix hardware
+// reservations into machines in the BMDB, subject to limits.
+func (p *Provisioner) runInSession(ctx context.Context, sess *bmdb.Session) error {
+	if err := p.config.ReconcileLoopLimiter.Wait(ctx); err != nil {
+		return err
+	}
+
+	providerC := make(chan *machineListing, 1)
+	bmdbC := make(chan *machineListing, 1)
+
+	klog.Infof("Getting provider and bmdb machines...")
+
+	// Make sub-context for two parallel operations, and so that we can cancel one
+	// immediately if the other fails.
+	subCtx, subCtxC := context.WithCancel(ctx)
+	defer subCtxC()
+
+	go func() {
+		machines, err := p.listInProvider(subCtx)
+		providerC <- &machineListing{
+			machines: machines,
+			err:      err,
+		}
+	}()
+	go func() {
+		machines, err := p.listInBMDB(subCtx, sess)
+		bmdbC <- &machineListing{
+			machines: machines,
+			err:      err,
+		}
+	}()
+	var inProvider, inBMDB *machineListing
+	for {
+		select {
+		case inProvider = <-providerC:
+			if err := inProvider.err; err != nil {
+				return fmt.Errorf("listing provider machines failed: %w", err)
+			}
+			klog.Infof("Got %d machines managed in provider.", len(inProvider.machines))
+		case inBMDB = <-bmdbC:
+			if err := inBMDB.err; err != nil {
+				return fmt.Errorf("listing BMDB machines failed: %w", err)
+			}
+			klog.Infof("Got %d machines in BMDB.", len(inBMDB.machines))
+		}
+		if inProvider != nil && inBMDB != nil {
+			break
+		}
+	}
+
+	subCtxC()
+	if err := p.reconcile(ctx, sess, inProvider.machines, inBMDB.machines); err != nil {
+		return fmt.Errorf("reconciliation failed: %w", err)
+	}
+	return nil
+}
+
+// listInProviders returns all machines that the provider thinks we should be
+// managing.
+func (p *Provisioner) listInProvider(ctx context.Context) ([]uuid.UUID, error) {
+	devices, err := p.sharedConfig.managedDevices(ctx, p.cl)
+	if err != nil {
+		return nil, fmt.Errorf("while fetching managed machines: %w", err)
+	}
+	var pvr []uuid.UUID
+	for _, dev := range devices {
+		id, err := uuid.Parse(dev.ID)
+		if err != nil {
+			klog.Errorf("Device ID %q is not UUID, skipping", dev.ID)
+		} else {
+			pvr = append(pvr, id)
+		}
+	}
+	sort.Slice(pvr, func(i, j int) bool {
+		return pvr[i].String() < pvr[j].String()
+	})
+	return pvr, nil
+}
+
+// listInBMDB returns all the machines that the BMDB thinks we should be managing.
+func (p *Provisioner) listInBMDB(ctx context.Context, sess *bmdb.Session) ([]uuid.UUID, error) {
+	var res []uuid.UUID
+	err := sess.Transact(ctx, func(q *model.Queries) error {
+		machines, err := q.GetProvidedMachines(ctx, model.ProviderEquinix)
+		if err != nil {
+			return err
+		}
+		res = make([]uuid.UUID, len(machines))
+		for i, machine := range machines {
+			id, err := uuid.Parse(machine.ProviderID)
+			if err != nil {
+				klog.Errorf("BMDB machine %s has unparseable provider ID %q", machine.MachineID, machine.ProviderID)
+			} else {
+				res[i] = id
+			}
+		}
+		return nil
+	})
+	if err != nil {
+		return nil, err
+	}
+	sort.Slice(res, func(i, j int) bool {
+		return res[i].String() < res[j].String()
+	})
+	return res, nil
+}
+
+// reconcile takes a list of machines that the provider thinks we should be
+// managing and that the BMDB thinks we should be managing, and tries to make
+// sense of that. First, some checks are performed across the two lists to make
+// sure we haven't dropped anything. Then, additional machines are deployed from
+// hardware reservations as needed.
+func (p *Provisioner) reconcile(ctx context.Context, sess *bmdb.Session, inProvider, inBMDB []uuid.UUID) error {
+	klog.Infof("Reconciling...")
+
+	bmdb := make(map[string]bool)
+	provider := make(map[string]bool)
+	for _, machine := range inProvider {
+		provider[machine.String()] = true
+	}
+	for _, machine := range inBMDB {
+		bmdb[machine.String()] = true
+	}
+
+	managed := make(map[string]bool)
+
+	// Some desynchronization between the BMDB and Provider point of view might be so
+	// bad we shouldn't attempt to do any work, at least not any time soon.
+	badbadnotgood := false
+
+	// Find any machines supposedly managed by us in the provider, but not in the
+	// BMDB, and assimilate them if so configured.
+	for machine, _ := range provider {
+		if bmdb[machine] {
+			managed[machine] = true
+			continue
+		}
+		if p.config.Assimilate {
+			klog.Warningf("Provider machine %s has no corresponding machine in BMDB. Assimilating it.", machine)
+			if err := p.assimilate(ctx, sess, machine); err != nil {
+				klog.Errorf("Failed to assimilate: %v", err)
+			} else {
+				managed[machine] = true
+			}
+		} else {
+			klog.Errorf("Provider machine %s has no corresponding machine in BMDB.", machine)
+			badbadnotgood = true
+		}
+	}
+
+	// Find any machines in the BMDB but not in the provider.
+	for machine, _ := range bmdb {
+		if !provider[machine] {
+			klog.Errorf("Provider device ID %s referred to in BMDB (from TODO) but missing in provider.", machine)
+			badbadnotgood = true
+		}
+	}
+
+	// Bail if things are weird.
+	if badbadnotgood {
+		klog.Errorf("Something's very wrong. Bailing early and refusing to do any work.")
+		return fmt.Errorf("fatal discrepency between BMDB and provider")
+	}
+
+	// Summarize all managed machines, which is the intersection of BMDB and
+	// Provisioner machines, usually both of these sets being equal.
+	nmanaged := len(managed)
+	klog.Infof("Total managed machines: %d", nmanaged)
+
+	if p.config.MaxCount != 0 && p.config.MaxCount <= uint(nmanaged) {
+		klog.Infof("Not bringing up more machines (at limit of %d machines)", p.config.MaxCount)
+		return nil
+	}
+
+	limitName := "no limit"
+	if p.config.MaxCount != 0 {
+		limitName = fmt.Sprintf("%d", p.config.MaxCount)
+	}
+	klog.Infof("Below managed machine limit (%s), bringing up more...", limitName)
+	klog.Infof("Retrieving hardware reservations, this will take a while...")
+	reservations, err := p.cl.ListReservations(ctx, p.sharedConfig.ProjectId)
+	if err != nil {
+		return fmt.Errorf("failed to list reservations: %w", err)
+	}
+
+	// Collect all reservations.
+	var toProvision []packngo.HardwareReservation
+	for _, reservation := range reservations {
+		if !reservation.Provisionable {
+			continue
+		}
+		if reservation.Device != nil {
+			if managed[reservation.Device.ID] {
+				continue
+			}
+		}
+		if p.badReservations.Penalized(reservation.ID) {
+			continue
+		}
+		toProvision = append(toProvision, reservation)
+	}
+
+	// Limit them to MaxCount, if applicable.
+	if p.config.MaxCount != 0 {
+		needed := int(p.config.MaxCount) - nmanaged
+		if len(toProvision) < needed {
+			needed = len(toProvision)
+		}
+		toProvision = toProvision[:needed]
+	}
+
+	// Limit them to an arbitrary 'chunk' size so that we don't do too many things in
+	// a single reconciliation operation.
+	if uint(len(toProvision)) > p.config.ReservationChunkSize {
+		toProvision = toProvision[:p.config.ReservationChunkSize]
+	}
+
+	if len(toProvision) == 0 {
+		klog.Infof("No more hardware reservations available, or all filtered out.")
+		return nil
+	}
+
+	klog.Infof("Bringing up %d machines...", len(toProvision))
+	for _, res := range toProvision {
+		p.config.DeviceCreationLimiter.Wait(ctx)
+		if err := p.provision(ctx, sess, res); err != nil {
+			klog.Errorf("Failed to provision reservation %s: %v", res.ID, err)
+			until := time.Now().Add(time.Hour)
+			klog.Errorf("Adding hardware reservation %s to sinbin until %s", res.ID, until)
+			p.badReservations.Add(res.ID, until)
+		}
+	}
+
+	return nil
+}
+
+// provision attempts to create a device within Equinix using given Hardware
+// Reservation rsv. The resulting device is registered with BMDB, and tagged as
+// "provided" in the process.
+func (pr *Provisioner) provision(ctx context.Context, sess *bmdb.Session, rsv packngo.HardwareReservation) error {
+	klog.Infof("Creating a new device using reservation ID %s.", rsv.ID)
+	hostname := pr.sharedConfig.DevicePrefix + rsv.ID[:18]
+	kid, err := pr.sharedConfig.sshEquinixId(ctx, pr.cl)
+	if err != nil {
+		return err
+	}
+	req := &packngo.DeviceCreateRequest{
+		Hostname:              hostname,
+		OS:                    pr.config.OS,
+		Plan:                  rsv.Plan.Slug,
+		ProjectID:             pr.sharedConfig.ProjectId,
+		HardwareReservationID: rsv.ID,
+		ProjectSSHKeys:        []string{kid},
+	}
+	nd, err := pr.cl.CreateDevice(ctx, req)
+	if err != nil {
+		return fmt.Errorf("while creating new device within Equinix: %w", err)
+	}
+	klog.Infof("Created a new device within Equinix (PID: %s).", nd.ID)
+
+	err = pr.assimilate(ctx, sess, nd.ID)
+	if err != nil {
+		// TODO(mateusz@monogon.tech) at this point the device at Equinix isn't
+		// matched by a BMDB record. Schedule device deletion or make sure this
+		// case is being handled elsewhere.
+		return err
+	}
+	return nil
+}
+
+// assimilate brings in an already existing machine from Equinix into the BMDB.
+// This is only used in manual testing.
+func (pr *Provisioner) assimilate(ctx context.Context, sess *bmdb.Session, deviceID string) error {
+	return sess.Transact(ctx, func(q *model.Queries) error {
+		// Create a new machine record within BMDB.
+		m, err := q.NewMachine(ctx)
+		if err != nil {
+			return fmt.Errorf("while creating a new machine record in BMDB: %w", err)
+		}
+
+		// Link the new machine with the Equinix device, and tag it "provided".
+		p := model.MachineAddProvidedParams{
+			MachineID:  m.MachineID,
+			ProviderID: deviceID,
+			Provider:   model.ProviderEquinix,
+		}
+		klog.Infof("Setting \"provided\" tag (ID: %s, PID: %s, Provider: %s).", p.MachineID, p.ProviderID, p.Provider)
+		if err := q.MachineAddProvided(ctx, p); err != nil {
+			return fmt.Errorf("while tagging machine active: %w", err)
+		}
+		return nil
+	})
+}
diff --git a/cloud/shepherd/equinix/manager/provisioner_test.go b/cloud/shepherd/equinix/manager/provisioner_test.go
new file mode 100644
index 0000000..80a90b8
--- /dev/null
+++ b/cloud/shepherd/equinix/manager/provisioner_test.go
@@ -0,0 +1,94 @@
+package manager
+
+import (
+	"context"
+	"crypto/ed25519"
+	"crypto/rand"
+	"testing"
+	"time"
+
+	"golang.org/x/time/rate"
+
+	"source.monogon.dev/cloud/bmaas/bmdb"
+	"source.monogon.dev/cloud/bmaas/bmdb/model"
+	"source.monogon.dev/cloud/lib/component"
+)
+
+// TestProvisionerSmokes makes sure the Provisioner doesn't go up in flames on
+// the happy path.
+func TestProvisionerSmokes(t *testing.T) {
+	pc := ProvisionerConfig{
+		OS:       "fake",
+		MaxCount: 10,
+		// We need 3 iterations to provide 10 machines with a chunk size of 4.
+		ReconcileLoopLimiter:  rate.NewLimiter(rate.Every(10*time.Second), 3),
+		DeviceCreationLimiter: rate.NewLimiter(rate.Every(time.Second), 10),
+		ReservationChunkSize:  4,
+	}
+	_, key, _ := ed25519.GenerateKey(rand.Reader)
+	sc := SharedConfig{
+		ProjectId:    "noproject",
+		KeyLabel:     "somekey",
+		Key:          key,
+		DevicePrefix: "test-",
+	}
+	f := newFakequinix(sc.ProjectId, 100)
+	p, err := pc.New(f, &sc)
+	if err != nil {
+		t.Fatalf("Could not create Provisioner: %v", err)
+	}
+
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+
+	b := bmdb.BMDB{
+		Config: bmdb.Config{
+			Database: component.CockroachConfig{
+				InMemory: true,
+			},
+			ComponentName: "test",
+			RuntimeInfo:   "test",
+		},
+	}
+	conn, err := b.Open(true)
+	if err != nil {
+		t.Fatalf("Could not create in-memory BMDB: %v", err)
+	}
+
+	if err := sc.SSHEquinixEnsure(ctx, f); err != nil {
+		t.Fatalf("Failed to ensure SSH key: %v", err)
+	}
+	go p.Run(ctx, conn)
+
+	sess, err := conn.StartSession(ctx)
+	if err != nil {
+		t.Fatalf("Failed to create BMDB session for verification: %v", err)
+	}
+	for {
+		time.Sleep(100 * time.Millisecond)
+
+		var provided []model.MachineProvided
+		err = sess.Transact(ctx, func(q *model.Queries) error {
+			var err error
+			provided, err = q.GetProvidedMachines(ctx, model.ProviderEquinix)
+			return err
+		})
+		if err != nil {
+			t.Errorf("Transact failed: %v", err)
+		}
+		if len(provided) < 10 {
+			continue
+		}
+		if len(provided) > 10 {
+			t.Errorf("%d machines provided (limit: 10)", len(provided))
+		}
+
+		for _, mp := range provided {
+			if f.devices[mp.ProviderID] == nil {
+				t.Errorf("BMDB machine %q has unknown provider ID %q", mp.MachineID, mp.ProviderID)
+			}
+		}
+
+		return
+	}
+}
diff --git a/cloud/shepherd/equinix/manager/server/BUILD.bazel b/cloud/shepherd/equinix/manager/server/BUILD.bazel
new file mode 100644
index 0000000..8593004
--- /dev/null
+++ b/cloud/shepherd/equinix/manager/server/BUILD.bazel
@@ -0,0 +1,22 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library")
+
+go_library(
+    name = "server_lib",
+    srcs = ["main.go"],
+    importpath = "source.monogon.dev/cloud/shepherd/equinix/manager/server",
+    visibility = ["//visibility:private"],
+    deps = [
+        "//cloud/bmaas/bmdb",
+        "//cloud/lib/component",
+        "//cloud/shepherd/equinix/manager",
+        "//cloud/shepherd/equinix/wrapngo",
+        "//metropolis/cli/pkg/context",
+        "@io_k8s_klog//:klog",
+    ],
+)
+
+go_binary(
+    name = "server",
+    embed = [":server_lib"],
+    visibility = ["//visibility:public"],
+)
diff --git a/cloud/shepherd/equinix/manager/server/main.go b/cloud/shepherd/equinix/manager/server/main.go
new file mode 100644
index 0000000..567ebd7
--- /dev/null
+++ b/cloud/shepherd/equinix/manager/server/main.go
@@ -0,0 +1,107 @@
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"os"
+	"strings"
+
+	"k8s.io/klog"
+
+	"source.monogon.dev/cloud/bmaas/bmdb"
+	"source.monogon.dev/cloud/lib/component"
+	"source.monogon.dev/cloud/shepherd/equinix/manager"
+	"source.monogon.dev/cloud/shepherd/equinix/wrapngo"
+	clicontext "source.monogon.dev/metropolis/cli/pkg/context"
+)
+
+type Config struct {
+	Component component.ComponentConfig
+	BMDB      bmdb.BMDB
+
+	SharedConfig      manager.SharedConfig
+	AgentConfig       manager.AgentConfig
+	ProvisionerConfig manager.ProvisionerConfig
+	InitializerConfig manager.InitializerConfig
+	API               wrapngo.Opts
+}
+
+// TODO(q3k): factor this out to BMDB library?
+func runtimeInfo() string {
+	hostname, _ := os.Hostname()
+	if hostname == "" {
+		hostname = "UNKNOWN"
+	}
+	return fmt.Sprintf("host %s", hostname)
+}
+
+func (c *Config) RegisterFlags() {
+	c.Component.RegisterFlags("shepherd")
+	c.BMDB.ComponentName = "shepherd-equinix"
+	c.BMDB.RuntimeInfo = runtimeInfo()
+	c.BMDB.Database.RegisterFlags("bmdb")
+
+	c.SharedConfig.RegisterFlags("")
+	c.AgentConfig.RegisterFlags()
+	c.ProvisionerConfig.RegisterFlags()
+	c.InitializerConfig.RegisterFlags()
+	c.API.RegisterFlags()
+}
+
+func main() {
+	c := &Config{}
+	c.RegisterFlags()
+	flag.Parse()
+
+	ctx := clicontext.WithInterrupt(context.Background())
+
+	if c.API.APIKey == "" || c.API.User == "" {
+		klog.Exitf("-equinix_api_username and -equinix_api_key must be set")
+	}
+	api := wrapngo.New(&c.API)
+
+	// These variables are _very_ important to configure correctly, otherwise someone
+	// running this locally with prod creds will actually destroy production
+	// data.
+	if strings.Contains(c.SharedConfig.KeyLabel, "FIXME") {
+		klog.Exitf("refusing to run with -equinix_ssh_key_label %q, please set it to something unique", c.SharedConfig.KeyLabel)
+	}
+	if strings.Contains(c.SharedConfig.DevicePrefix, "FIXME") {
+		klog.Exitf("refusing to run with -equinix_device_prefix %q, please set it to something unique", c.SharedConfig.DevicePrefix)
+	}
+
+	klog.Infof("Ensuring our SSH key is configured...")
+	if err := c.SharedConfig.SSHEquinixEnsure(ctx, api); err != nil {
+		klog.Exitf("Ensuring SSH key failed: %v", err)
+	}
+
+	provisioner, err := c.ProvisionerConfig.New(api, &c.SharedConfig)
+	if err != nil {
+		klog.Exitf("%v", err)
+	}
+
+	initializer, err := c.InitializerConfig.New(api, &c.SharedConfig, &c.AgentConfig)
+	if err != nil {
+		klog.Exitf("%v", err)
+	}
+
+	conn, err := c.BMDB.Open(true)
+	if err != nil {
+		klog.Exitf("Failed to open BMDB connection: %v", err)
+	}
+	go func() {
+		err = provisioner.Run(ctx, conn)
+		if err != nil {
+			klog.Exit(err)
+		}
+	}()
+	go func() {
+		err = initializer.Run(ctx, conn)
+		if err != nil {
+			klog.Exit(err)
+		}
+	}()
+
+	<-ctx.Done()
+}
diff --git a/cloud/shepherd/equinix/manager/shared_config.go b/cloud/shepherd/equinix/manager/shared_config.go
new file mode 100644
index 0000000..6ece4ce
--- /dev/null
+++ b/cloud/shepherd/equinix/manager/shared_config.go
@@ -0,0 +1,256 @@
+package manager
+
+import (
+	"context"
+	"crypto/ed25519"
+	"crypto/rand"
+	"errors"
+	"flag"
+	"fmt"
+	"os"
+	"strings"
+	"sync"
+
+	"github.com/packethost/packngo"
+	"golang.org/x/crypto/ssh"
+	"k8s.io/klog/v2"
+
+	ecl "source.monogon.dev/cloud/shepherd/equinix/wrapngo"
+)
+
+var (
+	NoSuchKey = errors.New("no such key")
+)
+
+// SharedConfig contains configuration options used by both the Initializer and
+// Provisioner components of the Shepherd. In CLI scenarios, RegisterFlags should
+// be called to configure this struct from CLI flags. Otherwise, this structure
+// should be explicitly configured, as the default values are not valid.
+type SharedConfig struct {
+	// ProjectId is the Equinix project UUID used by the manager. See Equinix API
+	// documentation for details. Must be set.
+	ProjectId string
+
+	// Label specifies the ID to use when handling the Equinix-registered SSH key
+	// used to authenticate to newly created servers. Must be set.
+	KeyLabel string
+
+	// myKey guards Key.
+	muKey sync.Mutex
+
+	// SSH key to use when creating machines and then connecting to them. If not
+	// provided, it will be automatically loaded from KeyPersistPath, and if that
+	// doesn't exist either, it will be first generated and persisted there.
+	Key ed25519.PrivateKey
+
+	// Path at which the SSH key will be loaded from and persisted to, if Key is not
+	// explicitly set. Either KeyPersistPath or Key must be set.
+	KeyPersistPath string
+
+	// Prefix applied to all devices (machines) created by the Provisioner, and used
+	// by the Provisioner to identify machines which it managed. Must be set.
+	DevicePrefix string
+
+	// configPrefix will be set to the prefix of the latest RegisterFlags call and
+	// will be then used by various methods to display the full name of a
+	// misconfigured flag.
+	configPrefix string
+}
+
+func (c *SharedConfig) check() error {
+	if c.ProjectId == "" {
+		return fmt.Errorf("-%sequinix_project_id must be set", c.configPrefix)
+	}
+	if c.KeyLabel == "" {
+		return fmt.Errorf("-%sequinix_ssh_key_label must be set", c.configPrefix)
+	}
+	if c.DevicePrefix == "" {
+		return fmt.Errorf("-%sequinix_device_prefix must be set", c.configPrefix)
+	}
+	return nil
+}
+
+func (k *SharedConfig) RegisterFlags(prefix string) {
+	k.configPrefix = prefix
+
+	flag.StringVar(&k.ProjectId, prefix+"equinix_project_id", "", "Equinix project ID where resources will be managed")
+	flag.StringVar(&k.KeyLabel, prefix+"equinix_ssh_key_label", "shepherd-FIXME", "Label used to identify managed SSH key in Equinix project")
+	flag.StringVar(&k.KeyPersistPath, prefix+"ssh_key_path", "shepherd-key.priv", "Local filesystem path to read SSH key from, and save generated key to")
+	flag.StringVar(&k.DevicePrefix, prefix+"equinix_device_prefix", "shepherd-FIXME-", "Prefix applied to all devices (machines) in Equinix project, used to identify managed machines")
+}
+
+// sshKey returns the SSH key as defined by the Key and KeyPersistPath options,
+// loading/generating/persisting it as necessary.
+func (c *SharedConfig) sshKey() (ed25519.PrivateKey, error) {
+	c.muKey.Lock()
+	defer c.muKey.Unlock()
+
+	if c.Key != nil {
+		return c.Key, nil
+	}
+	if c.KeyPersistPath == "" {
+		return nil, fmt.Errorf("-%sequinix_ssh_key_path must be set", c.configPrefix)
+	}
+
+	data, err := os.ReadFile(c.KeyPersistPath)
+	switch {
+	case err == nil:
+		if len(data) != ed25519.PrivateKeySize {
+			return nil, fmt.Errorf("%s is not a valid ed25519 private key", c.KeyPersistPath)
+		}
+		c.Key = data
+		klog.Infof("Loaded SSH key from %s", c.KeyPersistPath)
+		return c.Key, nil
+	case os.IsNotExist(err):
+		if err := c.sshGenerateUnlocked(); err != nil {
+			return nil, err
+		}
+		if err := os.WriteFile(c.KeyPersistPath, c.Key, 0400); err != nil {
+			return nil, fmt.Errorf("could not persist key: %w", err)
+		}
+		return c.Key, nil
+	default:
+		return nil, fmt.Errorf("could not load peristed key: %w", err)
+	}
+}
+
+// sshPub returns the SSH public key marshaled for use, based on sshKey.
+func (c *SharedConfig) sshPub() (string, error) {
+	private, err := c.sshKey()
+	if err != nil {
+		return "", err
+	}
+	// Marshal the public key part in OpenSSH authorized_keys format that will be
+	// registered with Equinix Metal.
+	sshpub, err := ssh.NewPublicKey(private.Public())
+	if err != nil {
+		return "", fmt.Errorf("while building SSH public key: %w", err)
+	}
+	return string(ssh.MarshalAuthorizedKey(sshpub)), nil
+}
+
+// sshSigner builds an ssh.Signer (for use in SSH connections) based on sshKey.
+func (c *SharedConfig) sshSigner() (ssh.Signer, error) {
+	private, err := c.sshKey()
+	if err != nil {
+		return nil, err
+	}
+	// Set up the internal ssh.Signer to be later used to initiate SSH
+	// connections with newly provided hosts.
+	signer, err := ssh.NewSignerFromKey(private)
+	if err != nil {
+		return nil, fmt.Errorf("while building SSH signer: %w", err)
+	}
+	return signer, nil
+}
+
+// sshGenerateUnlocked saves a new private key into SharedConfig.Key.
+func (c *SharedConfig) sshGenerateUnlocked() error {
+	if c.Key != nil {
+		return nil
+	}
+	_, priv, err := ed25519.GenerateKey(rand.Reader)
+	if err != nil {
+		return fmt.Errorf("while generating SSH key: %w", err)
+	}
+	c.Key = priv
+	return nil
+}
+
+// sshEquinixGet looks up the Equinix key matching SharedConfig.KeyLabel,
+// returning its packngo.SSHKey instance.
+func (c *SharedConfig) sshEquinix(ctx context.Context, cl ecl.Client) (*packngo.SSHKey, error) {
+	ks, err := cl.ListSSHKeys(ctx)
+	if err != nil {
+		return nil, fmt.Errorf("while listing SSH keys: %w", err)
+	}
+
+	for _, k := range ks {
+		if k.Label == c.KeyLabel {
+			return &k, nil
+		}
+	}
+	return nil, NoSuchKey
+}
+
+// sshEquinixId looks up the Equinix key identified by SharedConfig.KeyLabel,
+// returning its Equinix-assigned UUID.
+func (c *SharedConfig) sshEquinixId(ctx context.Context, cl ecl.Client) (string, error) {
+	k, err := c.sshEquinix(ctx, cl)
+	if err != nil {
+		return "", err
+	}
+	return k.ID, nil
+}
+
+// sshEquinixUpdate makes sure the existing SSH key registered with Equinix
+// matches the one from sshPub.
+func (c *SharedConfig) sshEquinixUpdate(ctx context.Context, cl ecl.Client, kid string) error {
+	pub, err := c.sshPub()
+	if err != nil {
+		return err
+	}
+	_, err = cl.UpdateSSHKey(ctx, kid, &packngo.SSHKeyUpdateRequest{
+		Key: &pub,
+	})
+	if err != nil {
+		return fmt.Errorf("while updating the SSH key: %w", err)
+	}
+	return nil
+}
+
+// sshEquinixUpload registers a new SSH key from sshPub.
+func (c *SharedConfig) sshEquinixUpload(ctx context.Context, cl ecl.Client) error {
+	pub, err := c.sshPub()
+	if err != nil {
+		return fmt.Errorf("while generating public key: %w", err)
+	}
+	_, err = cl.CreateSSHKey(ctx, &packngo.SSHKeyCreateRequest{
+		Label:     c.KeyLabel,
+		Key:       pub,
+		ProjectID: c.ProjectId,
+	})
+	if err != nil {
+		return fmt.Errorf("while creating an SSH key: %w", err)
+	}
+	return nil
+}
+
+// SSHEquinixEnsure initializes the locally managed SSH key (from a persistence
+// path or explicitly set key) and updates or uploads it to Equinix. The key is
+// generated as needed The key is generated as needed
+func (c *SharedConfig) SSHEquinixEnsure(ctx context.Context, cl ecl.Client) error {
+	k, err := c.sshEquinix(ctx, cl)
+	switch err {
+	case NoSuchKey:
+		if err := c.sshEquinixUpload(ctx, cl); err != nil {
+			return fmt.Errorf("while uploading key: %w", err)
+		}
+		return nil
+	case nil:
+		if err := c.sshEquinixUpdate(ctx, cl, k.ID); err != nil {
+			return fmt.Errorf("while updating key: %w", err)
+		}
+		return nil
+	default:
+		return err
+	}
+}
+
+// managedDevices provides a map of device provider IDs to matching
+// packngo.Device instances. It calls Equinix API's ListDevices. The returned
+// devices are filtered according to DevicePrefix provided through Opts. The
+// returned error value, if not nil, will originate in wrapngo.
+func (c *SharedConfig) managedDevices(ctx context.Context, cl ecl.Client) (map[string]packngo.Device, error) {
+	ds, err := cl.ListDevices(ctx, c.ProjectId)
+	if err != nil {
+		return nil, err
+	}
+	dm := map[string]packngo.Device{}
+	for _, d := range ds {
+		if strings.HasPrefix(d.Hostname, c.DevicePrefix) {
+			dm[d.ID] = d
+		}
+	}
+	return dm, nil
+}
diff --git a/cloud/shepherd/equinix/manager/ssh.go b/cloud/shepherd/equinix/manager/ssh.go
new file mode 100644
index 0000000..3eff4c5
--- /dev/null
+++ b/cloud/shepherd/equinix/manager/ssh.go
@@ -0,0 +1,142 @@
+package manager
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"io"
+	"net"
+	"time"
+
+	"github.com/pkg/sftp"
+	"golang.org/x/crypto/ssh"
+)
+
+// SSHClient defines a simple interface to an abstract SSH client. Usually this
+// would be PlainSSHClient, but tests can use this interface to dependency-inject
+// fake SSH connections.
+type SSHClient interface {
+	// Dial returns an SSHConnection to a given address (host:port pair) using a
+	// given username/sshkey for authentication, and with a timeout for connection.
+	Dial(ctx context.Context, address string, username string, sshkey ssh.Signer, connectTimeout time.Duration) (SSHConnection, error)
+}
+
+type SSHConnection interface {
+	// Execute a given command on a remote host synchronously, passing in stdin as
+	// input, and returning a captured stdout/stderr. The returned data might be
+	// valid even when err != nil, which might happen if the remote side returned a
+	// non-zero exit code.
+	Execute(ctx context.Context, command string, stdin []byte) (stdout []byte, stderr []byte, err error)
+	// Upload a given blob to a targetPath on the system and make executable.
+	Upload(ctx context.Context, targetPath string, data []byte) error
+	// Close this connection.
+	Close() error
+}
+
+// PlainSSHClient implements SSHClient (and SSHConnection) using
+// golang.org/x/crypto/ssh.
+type PlainSSHClient struct {
+}
+
+type plainSSHConn struct {
+	cl *ssh.Client
+}
+
+func (p *PlainSSHClient) Dial(ctx context.Context, address, username string, sshkey ssh.Signer, connectTimeout time.Duration) (SSHConnection, error) {
+	d := net.Dialer{
+		Timeout: connectTimeout,
+	}
+	conn, err := d.DialContext(ctx, "tcp", address)
+	if err != nil {
+		return nil, err
+	}
+	conf := &ssh.ClientConfig{
+		// Equinix OS installations always use root.
+		User: username,
+		Auth: []ssh.AuthMethod{
+			ssh.PublicKeys(sshkey),
+		},
+		// Ignore the host key, since it's likely the first time anything logs into
+		// this device, and also because there's no way of knowing its fingerprint.
+		HostKeyCallback: ssh.InsecureIgnoreHostKey(),
+		// Timeout sets a bound on the time it takes to set up the connection, but
+		// not on total session time.
+		Timeout: connectTimeout,
+	}
+	conn2, chanC, reqC, err := ssh.NewClientConn(conn, address, conf)
+	if err != nil {
+		return nil, err
+	}
+	cl := ssh.NewClient(conn2, chanC, reqC)
+	return &plainSSHConn{
+		cl: cl,
+	}, nil
+}
+
+func (p *plainSSHConn) Execute(ctx context.Context, command string, stdin []byte) (stdout []byte, stderr []byte, err error) {
+	sess, err := p.cl.NewSession()
+	if err != nil {
+		return nil, nil, fmt.Errorf("while creating SSH session: %w", err)
+	}
+	stdoutBuf := bytes.NewBuffer(nil)
+	stderrBuf := bytes.NewBuffer(nil)
+	sess.Stdin = bytes.NewBuffer(stdin)
+	sess.Stdout = stdoutBuf
+	sess.Stderr = stderrBuf
+	defer sess.Close()
+
+	if err := sess.Start(command); err != nil {
+		return nil, nil, err
+	}
+	doneC := make(chan error, 1)
+	go func() {
+		doneC <- sess.Wait()
+	}()
+	select {
+	case <-ctx.Done():
+		return nil, nil, ctx.Err()
+	case err := <-doneC:
+		return stdoutBuf.Bytes(), stderrBuf.Bytes(), err
+	}
+}
+
+func (p *plainSSHConn) Upload(ctx context.Context, targetPath string, data []byte) error {
+	sc, err := sftp.NewClient(p.cl)
+	if err != nil {
+		return fmt.Errorf("while building sftp client: %w", err)
+	}
+	defer sc.Close()
+
+	acrdr := bytes.NewReader(data)
+	df, err := sc.Create(targetPath)
+	if err != nil {
+		return fmt.Errorf("while creating file on the host: %w", err)
+	}
+
+	doneC := make(chan error, 1)
+
+	go func() {
+		_, err := io.Copy(df, acrdr)
+		df.Close()
+		doneC <- err
+	}()
+
+	select {
+	case err := <-doneC:
+		if err != nil {
+			return fmt.Errorf("while copying file: %w", err)
+		}
+	case <-ctx.Done():
+		df.Close()
+		return ctx.Err()
+	}
+
+	if err := sc.Chmod(targetPath, 0755); err != nil {
+		return fmt.Errorf("while setting file permissions: %w", err)
+	}
+	return nil
+}
+
+func (p *plainSSHConn) Close() error {
+	return p.cl.Close()
+}
diff --git a/cloud/shepherd/equinix/manager/test_agent/BUILD.bazel b/cloud/shepherd/equinix/manager/test_agent/BUILD.bazel
new file mode 100644
index 0000000..8f03070
--- /dev/null
+++ b/cloud/shepherd/equinix/manager/test_agent/BUILD.bazel
@@ -0,0 +1,20 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library")
+
+go_binary(
+    name = "test_agent",
+    embed = [":test_agent_lib"],
+    visibility = [
+        "//cloud/shepherd/equinix/manager:__pkg__",
+    ],
+)
+
+go_library(
+    name = "test_agent_lib",
+    srcs = ["main.go"],
+    importpath = "source.monogon.dev/cloud/shepherd/equinix/manager/test_agent",
+    visibility = ["//visibility:private"],
+    deps = [
+        "//cloud/agent/api",
+        "@org_golang_google_protobuf//proto",
+    ],
+)
diff --git a/cloud/shepherd/equinix/manager/test_agent/main.go b/cloud/shepherd/equinix/manager/test_agent/main.go
new file mode 100644
index 0000000..5dd5ccd
--- /dev/null
+++ b/cloud/shepherd/equinix/manager/test_agent/main.go
@@ -0,0 +1,52 @@
+// test_agent is used by the Equinix Metal Manager test code. Its only role
+// is to ensure successful delivery of the BMaaS agent executable to the test
+// hosts, together with its subsequent execution.
+package main
+
+import (
+	"crypto/ed25519"
+	"crypto/rand"
+	"fmt"
+	"io"
+	"os"
+
+	"google.golang.org/protobuf/proto"
+
+	apb "source.monogon.dev/cloud/agent/api"
+)
+
+func main() {
+	// The agent initialization message will arrive from Shepherd on Agent's
+	// standard input.
+	aimb, err := io.ReadAll(os.Stdin)
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "while reading AgentInit message: %v\n", err)
+		return
+	}
+	var aim apb.TakeoverInit
+	if err := proto.Unmarshal(aimb, &aim); err != nil {
+		fmt.Fprintf(os.Stderr, "while unmarshaling TakeoverInit message: %v\n", err)
+		return
+	}
+
+	// Agent should send back apb.TakeoverResponse on its standard output.
+	pub, _, err := ed25519.GenerateKey(rand.Reader)
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "while generating agent public key: %v\n", err)
+		return
+	}
+	arsp := apb.TakeoverResponse{
+		InitMessage: &aim,
+		Key:         pub,
+	}
+	arspb, err := proto.Marshal(&arsp)
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "while marshaling TakeoverResponse message: %v\n", err)
+		return
+	}
+	if _, err := os.Stdout.Write(arspb); err != nil {
+		fmt.Fprintf(os.Stderr, "while writing TakeoverResponse message: %v\n", err)
+	}
+	// The agent must detach and/or terminate after sending back the reply.
+	// Failure to do so will leave the session hanging.
+}
diff --git a/go.mod b/go.mod
index 03d0f33..0c1ee62 100644
--- a/go.mod
+++ b/go.mod
@@ -104,6 +104,7 @@
 	github.com/packethost/packngo v0.29.0
 	github.com/pierrec/lz4/v4 v4.1.14
 	github.com/pkg/errors v0.9.1
+	github.com/pkg/sftp v1.10.1
 	github.com/rekby/gpt v0.0.0-20200614112001-7da10aec5566
 	github.com/rmohr/bazeldnf v0.5.4
 	github.com/sbezverk/nfproxy v0.0.0-20210112155058-0d98b4a69f0c
@@ -123,6 +124,7 @@
 	golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4
 	golang.org/x/sys v0.0.0-20220804214406-8e32c043e418
 	golang.org/x/text v0.3.7
+	golang.org/x/time v0.0.0-20220224211638-0e9765cccd65
 	golang.org/x/tools v0.1.10-0.20220218145154-897bd77cd717
 	golang.zx2c4.com/wireguard/wgctrl v0.0.0-20220208144051-fde48d68ee68
 	google.golang.org/genproto v0.0.0-20220502173005-c8bf987b8c21
@@ -271,6 +273,7 @@
 	github.com/json-iterator/go v1.1.12 // indirect
 	github.com/karrick/godirwalk v1.16.1 // indirect
 	github.com/klauspost/compress v1.14.2 // indirect
+	github.com/kr/fs v0.1.0 // indirect
 	github.com/kr/pty v1.1.8 // indirect
 	github.com/libopenstorage/openstorage v1.0.0 // indirect
 	github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de // indirect
@@ -377,7 +380,6 @@
 	golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5 // indirect
 	golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
 	golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
-	golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 // indirect
 	golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f // indirect
 	golang.zx2c4.com/wireguard v0.0.0-20220202223031-3b95c81cc178 // indirect
 	gonum.org/v1/gonum v0.9.3 // indirect
diff --git a/go.sum b/go.sum
index 5bf3e09..279bd7f 100644
--- a/go.sum
+++ b/go.sum
@@ -1425,6 +1425,7 @@
 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
 github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
 github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
+github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8=
 github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
 github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
 github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
@@ -1827,6 +1828,7 @@
 github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
 github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA=
+github.com/pkg/sftp v1.10.1 h1:VasscCm72135zRysgrJDKsntdmPN+OuU3+nnHYA9wyc=
 github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZI=
 github.com/pkg/xattr v0.4.1 h1:dhclzL6EqOXNaPDWqoeb9tIxATfBSmjqL0b4DpSjwRw=
 github.com/pkg/xattr v0.4.1/go.mod h1:W2cGD0TBEus7MkUgv0tNZ9JutLtVO3cXu+IBRuHqnFs=