cloud/shepherd/equinix/manager: init

This adds an implementation for managing the Equinix Metal server lifecycle
as part of the BMaaS project.

Co-authored-by: Mateusz Zalega <mateusz@monogon.tech>
Supersedes: https://review.monogon.dev/c/monogon/+/990
Change-Id: I5537b2d07763985ad27aecac544ed19f933d6727
Reviewed-on: https://review.monogon.dev/c/monogon/+/1129
Reviewed-by: Leopold Schabel <leo@monogon.tech>
Reviewed-by: Mateusz Zalega <mateusz@monogon.tech>
Tested-by: Jenkins CI
diff --git a/cloud/shepherd/equinix/manager/initializer.go b/cloud/shepherd/equinix/manager/initializer.go
new file mode 100644
index 0000000..fd2e00b
--- /dev/null
+++ b/cloud/shepherd/equinix/manager/initializer.go
@@ -0,0 +1,376 @@
+package manager
+
+import (
+	"context"
+	"crypto/ed25519"
+	"encoding/hex"
+	"errors"
+	"flag"
+	"fmt"
+	"net"
+	"os"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/google/uuid"
+	"github.com/packethost/packngo"
+	"golang.org/x/crypto/ssh"
+	"golang.org/x/time/rate"
+	"google.golang.org/protobuf/proto"
+	"k8s.io/klog/v2"
+
+	apb "source.monogon.dev/cloud/agent/api"
+	"source.monogon.dev/cloud/bmaas/bmdb"
+	"source.monogon.dev/cloud/bmaas/bmdb/model"
+	ecl "source.monogon.dev/cloud/shepherd/equinix/wrapngo"
+)
+
+// AgentConfig configures how the Initializer will deploy Agents on machines. In
+// CLI scenarios, this should be populated from flags via RegisterFlags.
+type AgentConfig struct {
+	// Executable is the contents of the agent binary created and run
+	// at the provisioned servers. Must be set.
+	Executable []byte
+
+	// TargetPath is a filesystem destination path used while uploading the BMaaS
+	// agent executable to hosts as part of the initialization process. Must be set.
+	TargetPath string
+
+	// Endpoint is the address Agent will use to contact the BMaaS
+	// infrastructure. Must be set.
+	Endpoint string
+
+	// SSHConnectTimeout is the amount of time set aside for establishing the
+	// SSH connection to a machine. Upon timeout, the iteration would be
+	// declared a failure. Must be set.
+	SSHConnectTimeout time.Duration
+	// SSHExecTimeout is the amount of time set aside for executing the agent and
+	// getting its output once the SSH connection has been established. Upon timeout,
+	// the iteration would be declared a failure. Must be set.
+	SSHExecTimeout time.Duration
+}
+// RegisterFlags registers the agent_* flags, which populate a, on the global flag set.
+func (a *AgentConfig) RegisterFlags() {
+	flag.Func("agent_executable_path", "Local filesystem path of agent binary to be uploaded", func(val string) error {
+		if val == "" {
+			return nil
+		}
+		data, err := os.ReadFile(val)
+		if err != nil {
+			return fmt.Errorf("could not read -agent_executable_path: %w", err)
+		}
+		a.Executable = data
+		return nil
+	})
+	flag.StringVar(&a.TargetPath, "agent_target_path", "/root/agent", "Filesystem path where the agent will be uploaded to and ran from")
+	flag.StringVar(&a.Endpoint, "agent_endpoint", "", "Address of BMDB Server to which the agent will attempt to connect")
+	flag.DurationVar(&a.SSHConnectTimeout, "agent_ssh_connect_timeout", 2*time.Second, "Timeout for connecting over SSH to a machine")
+	flag.DurationVar(&a.SSHExecTimeout, "agent_ssh_exec_timeout", 60*time.Second, "Timeout for executing the agent over SSH on a machine")
+}
+
+// InitializerConfig configures the broad agent initialization process. The
+// specifics of how an agent is started are instead configured in AgentConfig. In
+// CLI scenarios, this should be populated from flags via RegisterFlags.
+type InitializerConfig struct {
+	// DBQueryLimiter limits the rate at which BMDB is queried for servers ready
+	// for BMaaS agent initialization. Must be set.
+	DBQueryLimiter *rate.Limiter
+}
+
+// flagLimiter registers flag name, parsing 'duration,count' values into *l (preset from defval).
+func flagLimiter(l **rate.Limiter, name, defval, help string) {
+	syntax := "'duration,count' eg. '2m,10' for a 10-sized bucket refilled at one token every 2 minutes"
+	help += fmt.Sprintf(" (default: %q, syntax: %s)", defval, syntax)
+	flag.Func(name, help, func(val string) error {
+		if val == "" {
+			val = defval
+		}
+		fields := strings.Split(val, ",")
+		if len(fields) != 2 {
+			return fmt.Errorf("invalid syntax, want: %s", syntax)
+		}
+		every, err := time.ParseDuration(fields[0])
+		if err != nil {
+			return fmt.Errorf("invalid duration: %w", err)
+		}
+		burst, err := strconv.ParseUint(fields[1], 10, 31)
+		if err != nil {
+			return fmt.Errorf("invalid refill rate: %w", err)
+		}
+		*l = rate.NewLimiter(rate.Every(every), int(burst))
+		return nil
+	})
+	flag.Set(name, defval)
+}
+// RegisterFlags registers the initializer_db_query_rate flag, which sets DBQueryLimiter.
+func (i *InitializerConfig) RegisterFlags() {
+	flagLimiter(&i.DBQueryLimiter, "initializer_db_query_rate", "250ms,8", "Rate limiting for BMDB queries")
+}
+
+// Initializer implements the BMaaS agent initialization process. Initialization
+// entails asking the BMDB for machines that need the agent started
+// (or restarted) and acting upon that.
+type Initializer struct {
+	config       *InitializerConfig // rate limiting of BMDB queries
+	agentConfig  *AgentConfig       // how the agent gets uploaded and started
+	sharedConfig *SharedConfig      // source of the SSH signer and Equinix project ID
+	sshClient    SSHClient          // used to reach machines; set to PlainSSHClient by New
+	// cl is the packngo wrapper used by the initializer.
+	cl ecl.Client
+}
+
+// task describes a single server currently being processed either in the
+// context of agent initialization or recovery.
+type task struct {
+	// id is the BMDB-assigned machine identifier.
+	id uuid.UUID
+	// pid is the provider-assigned (Equinix) identifier, parsed from the BMDB ProviderID.
+	pid uuid.UUID
+	// work is a machine lock facilitated by BMDB that prevents machines from
+	// being processed by multiple workers at the same time.
+	work *bmdb.Work
+	// dev is the provider device record; source leaves it nil, runInSession sets it.
+	dev *packngo.Device
+}
+
+// New validates sc, ac and c (in that order) and, if every required setting is
+// present, builds an Initializer that talks to Equinix through cl. The first
+// problem found is returned as an error.
+func (c *InitializerConfig) New(cl ecl.Client, sc *SharedConfig, ac *AgentConfig) (*Initializer, error) {
+	if err := sc.check(); err != nil {
+		return nil, err
+	}
+	// Cases are evaluated top to bottom, so only the first missing setting is
+	// reported.
+	switch {
+	case len(ac.Executable) == 0:
+		return nil, fmt.Errorf("agent executable not configured")
+	case ac.TargetPath == "":
+		return nil, fmt.Errorf("agent target path must be set")
+	case ac.Endpoint == "":
+		return nil, fmt.Errorf("agent endpoint must be set")
+	case ac.SSHConnectTimeout == 0:
+		return nil, fmt.Errorf("agent SSH connection timeout must be set")
+	case ac.SSHExecTimeout == 0:
+		return nil, fmt.Errorf("agent SSH execution timeout must be set")
+	case c.DBQueryLimiter == nil:
+		return nil, fmt.Errorf("DBQueryLimiter must be configured")
+	}
+	ir := &Initializer{
+		cl:           cl,
+		config:       c,
+		agentConfig:  ac,
+		sharedConfig: sc,
+		sshClient:    &PlainSSHClient{},
+	}
+	return ir, nil
+}
+
+// Run runs the initializer, blocking the current goroutine until the given context
+// expires, or failing early if the SSH signer or a BMDB session cannot be set up.
+func (c *Initializer) Run(ctx context.Context, conn *bmdb.Connection) error {
+	signer, err := c.sharedConfig.sshSigner()
+	if err != nil {
+		return fmt.Errorf("could not initialize signer: %w", err)
+	}
+
+	// Keep reusing one BMDB session; it is only replaced after it expires.
+	var sess *bmdb.Session
+	for {
+		if sess == nil {
+			sess, err = conn.StartSession(ctx)
+			if err != nil {
+				return fmt.Errorf("could not start BMDB session: %w", err)
+			}
+		}
+		// Inside that session, run a single iteration of the main logic.
+		err = c.runInSession(ctx, sess, signer)
+
+		switch {
+		case err == nil: // Nothing to handle, go on to the next iteration.
+		case errors.Is(err, ctx.Err()):
+			return err
+		case errors.Is(err, bmdb.ErrSessionExpired):
+			klog.Errorf("Session expired, restarting...")
+			sess = nil
+			time.Sleep(time.Second)
+		case err != nil:
+			klog.Errorf("Processing failed: %v", err)
+			// TODO(q3k): close session
+			time.Sleep(time.Second)
+		}
+	}
+}
+
+// runInSession performs a single pass of the initializer's control loop inside
+// the BMDB session sess: it picks one machine that needs its agent started (or
+// restarted), looks up the matching Equinix device and initializes it.
+func (c *Initializer) runInSession(ctx context.Context, sess *bmdb.Session, signer ssh.Signer) error {
+	job, err := c.source(ctx, sess)
+	switch {
+	case err != nil:
+		return fmt.Errorf("could not source machine: %w", err)
+	case job == nil:
+		// No machine currently needs the agent started.
+		return nil
+	}
+	// The work item must always be canceled or finished, per source's contract.
+	defer job.work.Cancel(ctx)
+
+	klog.Infof("Machine %q needs installation, fetching corresponding packngo device %q...", job.id, job.pid)
+	device, err := c.cl.GetDevice(ctx, c.sharedConfig.ProjectId, job.pid.String())
+	if err != nil {
+		klog.Errorf("failed to fetch device %q: %v", job.pid, err)
+		backoff := 30 * time.Second
+		// Only the outcome of recording the failure is returned; the cause was logged above.
+		return job.work.Fail(ctx, &backoff, "failed to fetch device from equinix")
+	}
+	job.dev = device
+
+	if err := c.init(ctx, signer, job); err != nil {
+		klog.Errorf("Failed to initialize: %v", err)
+		backoff := 1 * time.Minute
+		return job.work.Fail(ctx, &backoff, fmt.Sprintf("failed to initialize machine: %v", err))
+	}
+	return nil
+}
+
+// startAgent uploads and runs the agent executable on the target device d,
+// returning the agent's public key on success.
+func (i *Initializer) startAgent(ctx context.Context, sgn ssh.Signer, d packngo.Device) ([]byte, error) {
+	// Bound the whole SSH interaction (dial, upload and execution) so that we
+	// don't get stuck, e.g. after the SSH connection is established.
+	sctx, sctxC := context.WithTimeout(ctx, i.agentConfig.SSHExecTimeout)
+	defer sctxC()
+
+	// Use the device's IP address exposed by Equinix API.
+	ni := d.GetNetworkInfo()
+	var addr string
+	if ni.PublicIPv4 != "" {
+		addr = net.JoinHostPort(ni.PublicIPv4, "22")
+	} else if ni.PublicIPv6 != "" {
+		addr = net.JoinHostPort(ni.PublicIPv6, "22")
+	} else {
+		return nil, fmt.Errorf("device (ID: %s) has no available addresses", d.ID)
+	}
+	klog.V(1).Infof("Dialing device (provider ID: %s, addr: %s).", d.ID, addr)
+
+	conn, err := i.sshClient.Dial(sctx, addr, "root", sgn, i.agentConfig.SSHConnectTimeout)
+	if err != nil {
+		return nil, fmt.Errorf("while dialing the device: %w", err)
+	}
+	defer conn.Close()
+
+	// Upload the agent executable.
+	klog.Infof("Uploading the agent executable (provider ID: %s, addr: %s).", d.ID, addr)
+	if err := conn.Upload(sctx, i.agentConfig.TargetPath, i.agentConfig.Executable); err != nil {
+		return nil, fmt.Errorf("while uploading agent executable: %w", err)
+	}
+	klog.V(1).Infof("Upload successful (provider ID: %s, addr: %s).", d.ID, addr)
+
+	// The initialization protobuf message will be sent to the agent on its
+	// standard input.
+	imsg := apb.TakeoverInit{
+		Provider:      "equinix",
+		ProviderId:    d.ID,
+		BmaasEndpoint: i.agentConfig.Endpoint,
+	}
+	imsgb, err := proto.Marshal(&imsg)
+	if err != nil {
+		return nil, fmt.Errorf("while marshaling agent message: %w", err)
+	}
+
+	// Start the agent and wait for the agent's output to arrive.
+	// Run it under sctx (not ctx) so that SSHExecTimeout also bounds this step.
+	klog.V(1).Infof("Starting the agent executable at path %q (provider ID: %s).", i.agentConfig.TargetPath, d.ID)
+	stdout, stderr, err := conn.Execute(sctx, i.agentConfig.TargetPath, imsgb)
+	stderrStr := strings.TrimSpace(string(stderr))
+	if stderrStr != "" {
+		klog.Warningf("Agent stderr: %q", stderrStr)
+	}
+	if err != nil {
+		return nil, fmt.Errorf("while starting the agent executable: %w", err)
+	}
+
+	var arsp apb.TakeoverResponse
+	if err := proto.Unmarshal(stdout, &arsp); err != nil {
+		return nil, fmt.Errorf("agent reply couldn't be unmarshaled: %w", err)
+	}
+	if !proto.Equal(&imsg, arsp.InitMessage) {
+		return nil, fmt.Errorf("agent did not send back the init message.")
+	}
+	if len(arsp.Key) != ed25519.PublicKeySize {
+		return nil, fmt.Errorf("agent key length mismatch.")
+	}
+	klog.Infof("Started the agent (provider ID: %s, key: %s).", d.ID, hex.EncodeToString(arsp.Key))
+	return arsp.Key, nil
+}
+
+// init starts the agent on the server described by t, authenticating with
+// sgn, and on success records the agent's public key in the BMDB by finishing
+// t's work item.
+func (ir *Initializer) init(ctx context.Context, sgn ssh.Signer, t *task) error {
+	klog.Infof("Starting agent on device (ID: %s, PID %s)", t.id, t.pid)
+	pubkey, err := ir.startAgent(ctx, sgn, *t.dev)
+	if err != nil {
+		return fmt.Errorf("while installing the agent: %w", err)
+	}
+
+	// The agent is up: record AgentStarted and its key, finishing the work item.
+	klog.Infof("Setting AgentStarted (ID: %s, PID: %s, Agent public key: %s).", t.id, t.pid, hex.EncodeToString(pubkey))
+	setStarted := func(q *model.Queries) error {
+		params := model.MachineSetAgentStartedParams{
+			MachineID:      t.id,
+			AgentStartedAt: time.Now(),
+			AgentPublicKey: pubkey,
+		}
+		return q.MachineSetAgentStarted(ctx, params)
+	}
+	if err := t.work.Finish(ctx, setStarted); err != nil {
+		return fmt.Errorf("while setting AgentStarted tag: %w", err)
+	}
+	return nil
+}
+
+// source returns a BMDB-locked server ready for agent initialization. If both
+// task and error are nil, no machine needs initialization. The work item in a
+// returned task _must_ be canceled or finished by the caller. Failing to wait
+// on the DB query rate limiter (e.g. ctx cancellation) is reported as an error.
+func (ir *Initializer) source(ctx context.Context, sess *bmdb.Session) (*task, error) {
+	if err := ir.config.DBQueryLimiter.Wait(ctx); err != nil {
+		return nil, fmt.Errorf("while waiting for BMDB query rate limiter: %w", err)
+	}
+
+	var machine *model.MachineProvided
+	work, err := sess.Work(ctx, model.ProcessShepherdInstall, func(q *model.Queries) ([]uuid.UUID, error) {
+		machines, err := q.GetMachinesForAgentStart(ctx, 1)
+		if err != nil {
+			return nil, err
+		}
+		if len(machines) < 1 {
+			return nil, bmdb.ErrNothingToDo
+		}
+		machine = &machines[0]
+		return []uuid.UUID{machines[0].MachineID}, nil
+	})
+	if errors.Is(err, bmdb.ErrNothingToDo) {
+		return nil, nil
+	}
+	if err != nil {
+		return nil, fmt.Errorf("while querying BMDB agent candidates: %w", err)
+	}
+
+	pid, err := uuid.Parse(machine.ProviderID)
+	if err != nil {
+		t := time.Hour
+		work.Fail(ctx, &t, fmt.Sprintf("could not parse provider UUID %q", machine.ProviderID))
+		return nil, fmt.Errorf("while parsing provider UUID: %w", err)
+	}
+
+	return &task{
+		id:   machine.MachineID,
+		pid:  pid,
+		work: work,
+	}, nil
+}