cloud/shepherd/equinix: split out control loop logic
This is in preparation for implementing the recoverer/rebooter inside
the shepherd.
In the future this will likely be split away from the shepherd and
end up as a generic bmdb library. But let's first wait for concrete
usages outside of the shepherd component.
Change-Id: I69b9a2e913dcefa2c6558e271b6853285c6120b3
Reviewed-on: https://review.monogon.dev/c/monogon/+/1559
Tested-by: Jenkins CI
Reviewed-by: Tim Windelschmidt <tim@monogon.tech>
diff --git a/cloud/shepherd/equinix/manager/initializer.go b/cloud/shepherd/equinix/manager/initializer.go
index f51d4e4..4193f4c 100644
--- a/cloud/shepherd/equinix/manager/initializer.go
+++ b/cloud/shepherd/equinix/manager/initializer.go
@@ -6,32 +6,30 @@
"crypto/x509"
"encoding/hex"
"encoding/pem"
- "errors"
"flag"
"fmt"
"net"
"os"
- "strconv"
"strings"
"time"
"github.com/google/uuid"
"github.com/packethost/packngo"
"golang.org/x/crypto/ssh"
- "golang.org/x/sync/errgroup"
- "golang.org/x/time/rate"
"google.golang.org/protobuf/proto"
"k8s.io/klog/v2"
apb "source.monogon.dev/cloud/agent/api"
- "source.monogon.dev/cloud/bmaas/bmdb"
"source.monogon.dev/cloud/bmaas/bmdb/model"
ecl "source.monogon.dev/cloud/shepherd/equinix/wrapngo"
)
-// AgentConfig configures how the Initializer will deploy Agents on machines. In
-// CLI scenarios, this should be populated from flags via RegisterFlags.
-type AgentConfig struct {
+// InitializerConfig configures how the Initializer will deploy Agents on
+// machines. In CLI scenarios, this should be populated from flags via
+// RegisterFlags.
+type InitializerConfig struct {
+ ControlLoopConfig
+
// Executable is the contents of the agent binary created and run
// at the provisioned servers. Must be set.
Executable []byte
@@ -59,7 +57,9 @@
SSHExecTimeout time.Duration
}
-func (a *AgentConfig) RegisterFlags() {
+func (a *InitializerConfig) RegisterFlags() {
+ a.ControlLoopConfig.RegisterFlags("initializer")
+
flag.Func("agent_executable_path", "Local filesystem path of agent binary to be uploaded", func(val string) error {
if val == "" {
return nil
@@ -96,194 +96,89 @@
flag.DurationVar(&a.SSHExecTimeout, "agent_ssh_exec_timeout", 60*time.Second, "Timeout for connecting over SSH to a machine")
}
-// InitializerConfig configures the broad agent initialization process. The
-// specifics of how an agent is started are instead configured in Agent Config. In
-// CLI scenarios, this should be populated from flags via RegisterFlags.
-type InitializerConfig struct {
- // DBQueryLimiter limits the rate at which BMDB is queried for servers ready
- // for BMaaS agent initialization. Must be set.
- DBQueryLimiter *rate.Limiter
-
- // Parallelism is how many instances of the Initializer will be allowed to run in
- // parallel against the BMDB. This speeds up the process of starting/restarting
- // agents significantly, as one initializer instance can handle at most one agent
- // (re)starting process.
- //
- // If not set (ie. 0), default to 1. A good starting value for production
- // deployments is 10 or so.
- Parallelism int
-}
-
-// flagLimiter configures a *rate.Limiter as a flag.
-func flagLimiter(l **rate.Limiter, name, defval, help string) {
- syntax := "'duration,count' eg. '2m,10' for a 10-sized bucket refilled at one token every 2 minutes"
- help = help + fmt.Sprintf(" (default: %q, syntax: %s)", defval, syntax)
- flag.Func(name, help, func(val string) error {
- if val == "" {
- val = defval
- }
- parts := strings.Split(val, ",")
- if len(parts) != 2 {
- return fmt.Errorf("invalid syntax, want: %s", syntax)
- }
- duration, err := time.ParseDuration(parts[0])
- if err != nil {
- return fmt.Errorf("invalid duration: %w", err)
- }
- refill, err := strconv.ParseUint(parts[1], 10, 31)
- if err != nil {
- return fmt.Errorf("invalid refill rate: %w", err)
- }
- *l = rate.NewLimiter(rate.Every(duration), int(refill))
- return nil
- })
- flag.Set(name, defval)
-}
-
-func (i *InitializerConfig) RegisterFlags() {
- flagLimiter(&i.DBQueryLimiter, "initializer_db_query_rate", "250ms,8", "Rate limiting for BMDB queries")
- flag.IntVar(&i.Parallelism, "initializer_parallelism", 1, "How many initializer instances to run in parallel, ie. how many agents to attempt to (re)start at once")
-}
-
-// Initializer implements the BMaaS agent initialization process. Initialization
-// entails asking the BMDB for machines that need the agent started
-// (or-restarted) and acting upon that.
+// The Initializer starts the agent on machines that aren't yet running it.
type Initializer struct {
- config *InitializerConfig
- agentConfig *AgentConfig
+ InitializerConfig
+
sharedConfig *SharedConfig
+ cl ecl.Client
+ signer ssh.Signer
sshClient SSHClient
- // cl is the packngo wrapper used by the initializer.
- cl ecl.Client
}
-// task describes a single server currently being processed either in the
-// context of agent initialization or recovery.
-type task struct {
- // id is the BMDB-assigned machine identifier.
- id uuid.UUID
- // pid is an identifier assigned by the provider (Equinix).
- pid uuid.UUID
- // work is a machine lock facilitated by BMDB that prevents machines from
- // being processed by multiple workers at the same time.
- work *bmdb.Work
- // dev is a provider machine/device record.
- dev *packngo.Device
-}
-
-// New creates an Initializer instance, checking the InitializerConfig,
-// SharedConfig and AgentConfig for errors.
-func (c *InitializerConfig) New(cl ecl.Client, sc *SharedConfig, ac *AgentConfig) (*Initializer, error) {
+// NewInitializer creates an Initializer instance, checking the
+// InitializerConfig and SharedConfig for errors.
+func NewInitializer(cl ecl.Client, ic InitializerConfig, sc *SharedConfig) (*Initializer, error) {
if err := sc.check(); err != nil {
return nil, err
}
- if len(ac.Executable) == 0 {
+ if err := ic.ControlLoopConfig.Check(); err != nil {
+ return nil, err
+ }
+
+ if len(ic.Executable) == 0 {
return nil, fmt.Errorf("agent executable not configured")
}
- if ac.TargetPath == "" {
+ if ic.TargetPath == "" {
return nil, fmt.Errorf("agent target path must be set")
}
- if ac.Endpoint == "" {
+ if ic.Endpoint == "" {
return nil, fmt.Errorf("agent endpoint must be set")
}
- if ac.SSHConnectTimeout == 0 {
+ if ic.SSHConnectTimeout == 0 {
return nil, fmt.Errorf("agent SSH connection timeout must be set")
}
- if ac.SSHExecTimeout == 0 {
+ if ic.SSHExecTimeout == 0 {
return nil, fmt.Errorf("agent SSH execution timeout must be set")
}
- if c.DBQueryLimiter == nil {
- return nil, fmt.Errorf("DBQueryLimiter must be configured")
+
+ signer, err := sc.sshSigner()
+ if err != nil {
+ return nil, fmt.Errorf("could not initialize signer: %w", err)
}
- if c.Parallelism == 0 {
- c.Parallelism = 1
- }
+
return &Initializer{
- config: c,
+ InitializerConfig: ic,
+
sharedConfig: sc,
- agentConfig: ac,
- sshClient: &PlainSSHClient{},
cl: cl,
+ sshClient: &PlainSSHClient{},
+ signer: signer,
}, nil
}
-// Run the initializer(s) (depending on opts.Parallelism) blocking the current
-// goroutine until the given context expires and all provisioners quit.
-func (i *Initializer) Run(ctx context.Context, conn *bmdb.Connection) error {
- eg := errgroup.Group{}
- for j := 0; j < i.config.Parallelism; j += 1 {
- eg.Go(func() error {
- return i.runOne(ctx, conn)
- })
- }
- return eg.Wait()
+func (c *Initializer) getMachines(ctx context.Context, q *model.Queries, limit int32) ([]model.MachineProvided, error) {
+ return q.GetMachinesForAgentStart(ctx, limit)
}
-// Run the initializer blocking the current goroutine until the given context
-// expires.
-func (c *Initializer) runOne(ctx context.Context, conn *bmdb.Connection) error {
- signer, err := c.sharedConfig.sshSigner()
+func (c *Initializer) processMachine(ctx context.Context, t *task) error {
+ dev, err := c.cl.GetDevice(ctx, c.sharedConfig.ProjectId, t.machine.ProviderID)
if err != nil {
- return fmt.Errorf("could not initialize signer: %w", err)
- }
-
- // Maintain a BMDB session as long as possible.
- var sess *bmdb.Session
- for {
- if sess == nil {
- sess, err = conn.StartSession(ctx)
- if err != nil {
- return fmt.Errorf("could not start BMDB session: %w", err)
- }
- }
- // Inside that session, run the main logic.
- err = c.runInSession(ctx, sess, signer)
-
- switch {
- case err == nil:
- case errors.Is(err, ctx.Err()):
- return err
- case errors.Is(err, bmdb.ErrSessionExpired):
- klog.Errorf("Session expired, restarting...")
- sess = nil
- time.Sleep(time.Second)
- case err != nil:
- klog.Errorf("Processing failed: %v", err)
- // TODO(q3k): close session
- time.Sleep(time.Second)
- }
- }
-}
-
-// runInSession executes one iteration of the initializer's control loop within a
-// BMDB session. This control loop attempts to start or re-start the agent on any
-// machines that need this per the BMDB.
-func (c *Initializer) runInSession(ctx context.Context, sess *bmdb.Session, signer ssh.Signer) error {
- t, err := c.source(ctx, sess)
- if err != nil {
- return fmt.Errorf("could not source machine: %w", err)
- }
- if t == nil {
- return nil
- }
- defer t.work.Cancel(ctx)
-
- klog.Infof("Machine %q needs agent start, fetching corresponding packngo device %q...", t.id, t.pid)
- dev, err := c.cl.GetDevice(ctx, c.sharedConfig.ProjectId, t.pid.String())
- if err != nil {
- klog.Errorf("failed to fetch device %q: %v", t.pid, err)
+ klog.Errorf("failed to fetch device %q: %v", t.machine.ProviderID, err)
d := 30 * time.Second
err = t.work.Fail(ctx, &d, "failed to fetch device from equinix")
return err
}
- t.dev = dev
- err = c.init(ctx, signer, t)
+ // Start the agent.
+ klog.Infof("Starting agent on device (ID: %s, PID %s)", t.machine.MachineID, t.machine.ProviderID)
+ apk, err := c.startAgent(ctx, c.signer, dev, t.machine.MachineID)
if err != nil {
- klog.Errorf("Failed to initialize: %v", err)
- d := 1 * time.Minute
- err = t.work.Fail(ctx, &d, fmt.Sprintf("failed to initialize machine: %v", err))
- return err
+ return fmt.Errorf("while starting the agent: %w", err)
+ }
+
+ // Agent startup succeeded. Set the appropriate BMDB tag, and release the
+ // lock.
+ klog.Infof("Setting AgentStarted (ID: %s, PID: %s, Agent public key: %s).", t.machine.MachineID, t.machine.ProviderID, hex.EncodeToString(apk))
+ err = t.work.Finish(ctx, func(q *model.Queries) error {
+ return q.MachineSetAgentStarted(ctx, model.MachineSetAgentStartedParams{
+ MachineID: t.machine.MachineID,
+ AgentStartedAt: time.Now(),
+ AgentPublicKey: apk,
+ })
+ })
+ if err != nil {
+ return fmt.Errorf("while setting AgentStarted tag: %w", err)
}
return nil
}
@@ -293,7 +188,7 @@
func (i *Initializer) startAgent(ctx context.Context, sgn ssh.Signer, d *packngo.Device, mid uuid.UUID) ([]byte, error) {
// Provide a bound on execution time in case we get stuck after the SSH
// connection is established.
- sctx, sctxC := context.WithTimeout(ctx, i.agentConfig.SSHExecTimeout)
+ sctx, sctxC := context.WithTimeout(ctx, i.SSHExecTimeout)
defer sctxC()
// Use the device's IP address exposed by Equinix API.
@@ -308,7 +203,7 @@
}
klog.V(1).Infof("Dialing device (machine ID: %s, addr: %s).", mid, addr)
- conn, err := i.sshClient.Dial(sctx, addr, "root", sgn, i.agentConfig.SSHConnectTimeout)
+ conn, err := i.sshClient.Dial(sctx, addr, "root", sgn, i.SSHConnectTimeout)
if err != nil {
return nil, fmt.Errorf("while dialing the device: %w", err)
}
@@ -317,7 +212,7 @@
// Upload the agent executable.
klog.Infof("Uploading the agent executable (machine ID: %s, addr: %s).", mid, addr)
- if err := conn.Upload(sctx, i.agentConfig.TargetPath, i.agentConfig.Executable); err != nil {
+ if err := conn.Upload(sctx, i.TargetPath, i.Executable); err != nil {
return nil, fmt.Errorf("while uploading agent executable: %w", err)
}
klog.V(1).Infof("Upload successful (machine ID: %s, addr: %s).", mid, addr)
@@ -326,8 +221,8 @@
// standard input.
imsg := apb.TakeoverInit{
MachineId: mid.String(),
- BmaasEndpoint: i.agentConfig.Endpoint,
- CaCertificate: i.agentConfig.EndpointCACertificate,
+ BmaasEndpoint: i.Endpoint,
+ CaCertificate: i.EndpointCACertificate,
}
imsgb, err := proto.Marshal(&imsg)
if err != nil {
@@ -335,8 +230,8 @@
}
// Start the agent and wait for the agent's output to arrive.
- klog.V(1).Infof("Starting the agent executable at path %q (machine ID: %s).", i.agentConfig.TargetPath, mid)
- stdout, stderr, err := conn.Execute(ctx, i.agentConfig.TargetPath, imsgb)
+ klog.V(1).Infof("Starting the agent executable at path %q (machine ID: %s).", i.TargetPath, mid)
+ stdout, stderr, err := conn.Execute(ctx, i.TargetPath, imsgb)
stderrStr := strings.TrimSpace(string(stderr))
if stderrStr != "" {
klog.Warningf("Agent stderr: %q", stderrStr)
@@ -367,71 +262,3 @@
klog.Infof("Started the agent (machine ID: %s, key: %s).", mid, hex.EncodeToString(successResp.Key))
return successResp.Key, nil
}
-
-// init initializes the server described by t, using BMDB session 'sess' to set
-// the relevant BMDB tag on success, and 'sgn' to authenticate to the server.
-func (ir *Initializer) init(ctx context.Context, sgn ssh.Signer, t *task) error {
- // Start the agent.
- klog.Infof("Starting agent on device (ID: %s, PID %s)", t.id, t.pid)
- apk, err := ir.startAgent(ctx, sgn, t.dev, t.id)
- if err != nil {
- return fmt.Errorf("while starting the agent: %w", err)
- }
-
- // Agent startup succeeded. Set the appropriate BMDB tag, and release the
- // lock.
- klog.Infof("Setting AgentStarted (ID: %s, PID: %s, Agent public key: %s).", t.id, t.pid, hex.EncodeToString(apk))
- err = t.work.Finish(ctx, func(q *model.Queries) error {
- return q.MachineSetAgentStarted(ctx, model.MachineSetAgentStartedParams{
- MachineID: t.id,
- AgentStartedAt: time.Now(),
- AgentPublicKey: apk,
- })
- })
- if err != nil {
- return fmt.Errorf("while setting AgentStarted tag: %w", err)
- }
- return nil
-}
-
-// source supplies returns a BMDB-locked server ready for initialization, locked
-// by a work item. If both task and error are nil, then there are no machines
-// needed to be initialized.
-// The returned work item in task _must_ be canceled or finished by the caller.
-func (ir *Initializer) source(ctx context.Context, sess *bmdb.Session) (*task, error) {
- ir.config.DBQueryLimiter.Wait(ctx)
-
- var machine *model.MachineProvided
- work, err := sess.Work(ctx, model.ProcessShepherdAccess, func(q *model.Queries) ([]uuid.UUID, error) {
- machines, err := q.GetMachinesForAgentStart(ctx, 1)
- if err != nil {
- return nil, err
- }
- if len(machines) < 1 {
- return nil, bmdb.ErrNothingToDo
- }
- machine = &machines[0]
- return []uuid.UUID{machines[0].MachineID}, nil
- })
-
- if errors.Is(err, bmdb.ErrNothingToDo) {
- return nil, nil
- }
-
- if err != nil {
- return nil, fmt.Errorf("while querying BMDB agent candidates: %w", err)
- }
-
- pid, err := uuid.Parse(machine.ProviderID)
- if err != nil {
- t := time.Hour
- work.Fail(ctx, &t, fmt.Sprintf("could not parse provider UUID %q", machine.ProviderID))
- return nil, fmt.Errorf("while parsing provider UUID: %w", err)
- }
-
- return &task{
- id: machine.MachineID,
- pid: pid,
- work: work,
- }, nil
-}