Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 1 | package manager |
| 2 | |
| 3 | import ( |
| 4 | "context" |
| 5 | "crypto/ed25519" |
Serge Bazanski | 51987d6 | 2023-04-06 16:35:35 +0200 | [diff] [blame^] | 6 | "crypto/x509" |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 7 | "encoding/hex" |
Serge Bazanski | 51987d6 | 2023-04-06 16:35:35 +0200 | [diff] [blame^] | 8 | "encoding/pem" |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 9 | "errors" |
| 10 | "flag" |
| 11 | "fmt" |
| 12 | "net" |
| 13 | "os" |
| 14 | "strconv" |
| 15 | "strings" |
| 16 | "time" |
| 17 | |
| 18 | "github.com/google/uuid" |
| 19 | "github.com/packethost/packngo" |
| 20 | "golang.org/x/crypto/ssh" |
Serge Bazanski | 9eb903d | 2023-02-20 14:28:19 +0100 | [diff] [blame] | 21 | "golang.org/x/sync/errgroup" |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 22 | "golang.org/x/time/rate" |
| 23 | "google.golang.org/protobuf/proto" |
| 24 | "k8s.io/klog/v2" |
| 25 | |
| 26 | apb "source.monogon.dev/cloud/agent/api" |
| 27 | "source.monogon.dev/cloud/bmaas/bmdb" |
| 28 | "source.monogon.dev/cloud/bmaas/bmdb/model" |
| 29 | ecl "source.monogon.dev/cloud/shepherd/equinix/wrapngo" |
| 30 | ) |
| 31 | |
// AgentConfig configures how the Initializer will deploy Agents on machines. In
// CLI scenarios, this should be populated from flags via RegisterFlags.
type AgentConfig struct {
	// Executable is the contents of the agent binary created and run
	// at the provisioned servers. Must be set.
	Executable []byte

	// TargetPath is a filesystem destination path used while uploading the BMaaS
	// agent executable to hosts as part of the initialization process. Must be set.
	TargetPath string

	// Endpoint is the address Agent will use to contact the BMaaS
	// infrastructure. Must be set.
	Endpoint string
	// EndpointCACertificate is an optional DER-encoded (but not PEM-armored) X509
	// certificate used to populate the trusted CA store of the agent. It should be
	// set to the CA certificate of the endpoint if not using a system-trusted CA
	// certificate.
	EndpointCACertificate []byte

	// SSHConnectTimeout is the amount of time set aside for establishing the
	// SSH connection to a machine. Upon timeout, the iteration would be
	// declared a failure. Must be set.
	SSHConnectTimeout time.Duration
	// SSHExecTimeout is the amount of time set aside for executing the agent and
	// getting its output once the SSH connection has been established. Upon timeout,
	// the iteration would be declared as failure. Must be set.
	SSHExecTimeout time.Duration
}

// RegisterFlags registers the agent_* command line flags on the global flag
// set, binding them to the fields of a. It must be called before flag.Parse,
// and at most once per process, as the flag package panics when a flag name
// is registered twice.
//
// File-based flags (agent_executable_path and
// agent_endpoint_ca_certificate_path) read the referenced file when the flag
// is set; an empty value leaves the corresponding field untouched.
func (a *AgentConfig) RegisterFlags() {
	flag.Func("agent_executable_path", "Local filesystem path of agent binary to be uploaded", func(val string) error {
		if val == "" {
			return nil
		}
		data, err := os.ReadFile(val)
		if err != nil {
			return fmt.Errorf("could not read: %w", err)
		}
		a.Executable = data
		return nil
	})
	flag.StringVar(&a.TargetPath, "agent_target_path", "/root/agent", "Filesystem path where the agent will be uploaded to and ran from")
	flag.StringVar(&a.Endpoint, "agent_endpoint", "", "Address of BMDB Server to which the agent will attempt to connect")
	flag.Func("agent_endpoint_ca_certificate_path", "Path to PEM X509 CA certificate that the agent endpoint is serving with. If not set, the agent will attempt to use system CA certificates to authenticate the endpoint.", func(val string) error {
		if val == "" {
			return nil
		}
		data, err := os.ReadFile(val)
		if err != nil {
			return fmt.Errorf("could not read: %w", err)
		}
		// pem.Decode returns a nil block when the input contains no PEM data at
		// all; that must be reported as an error rather than dereferenced.
		block, _ := pem.Decode(data)
		if block == nil {
			return fmt.Errorf("no PEM data found")
		}
		if block.Type != "CERTIFICATE" {
			return fmt.Errorf("not a certificate")
		}
		// Parse only to validate; the agent is handed the raw DER bytes.
		if _, err := x509.ParseCertificate(block.Bytes); err != nil {
			return fmt.Errorf("invalid certificate: %w", err)
		}
		a.EndpointCACertificate = block.Bytes
		return nil
	})
	flag.DurationVar(&a.SSHConnectTimeout, "agent_ssh_connect_timeout", 2*time.Second, "Timeout for connecting over SSH to a machine")
	flag.DurationVar(&a.SSHExecTimeout, "agent_ssh_exec_timeout", 60*time.Second, "Timeout for executing the agent over SSH on a machine and getting its output")
}
| 98 | |
// InitializerConfig configures the broad agent initialization process. The
// specifics of how an agent is started are instead configured in AgentConfig. In
// CLI scenarios, this should be populated from flags via RegisterFlags.
type InitializerConfig struct {
	// DBQueryLimiter limits the rate at which BMDB is queried for servers ready
	// for BMaaS agent initialization. Must be set.
	DBQueryLimiter *rate.Limiter

	// Parallelism is how many instances of the Initializer will be allowed to run in
	// parallel against the BMDB. This speeds up the process of starting/restarting
	// agents significantly, as one initializer instance can handle at most one agent
	// (re)starting process.
	//
	// If not set (ie. 0), default to 1. A good starting value for production
	// deployments is 10 or so.
	Parallelism int
}
| 116 | |
| 117 | // flagLimiter configures a *rate.Limiter as a flag. |
| 118 | func flagLimiter(l **rate.Limiter, name, defval, help string) { |
| 119 | syntax := "'duration,count' eg. '2m,10' for a 10-sized bucket refilled at one token every 2 minutes" |
| 120 | help = help + fmt.Sprintf(" (default: %q, syntax: %s)", defval, syntax) |
| 121 | flag.Func(name, help, func(val string) error { |
| 122 | if val == "" { |
| 123 | val = defval |
| 124 | } |
| 125 | parts := strings.Split(val, ",") |
| 126 | if len(parts) != 2 { |
| 127 | return fmt.Errorf("invalid syntax, want: %s", syntax) |
| 128 | } |
| 129 | duration, err := time.ParseDuration(parts[0]) |
| 130 | if err != nil { |
| 131 | return fmt.Errorf("invalid duration: %w", err) |
| 132 | } |
| 133 | refill, err := strconv.ParseUint(parts[1], 10, 31) |
| 134 | if err != nil { |
| 135 | return fmt.Errorf("invalid refill rate: %w", err) |
| 136 | } |
| 137 | *l = rate.NewLimiter(rate.Every(duration), int(refill)) |
| 138 | return nil |
| 139 | }) |
| 140 | flag.Set(name, defval) |
| 141 | } |
| 142 | |
| 143 | func (i *InitializerConfig) RegisterFlags() { |
| 144 | flagLimiter(&i.DBQueryLimiter, "initializer_db_query_rate", "250ms,8", "Rate limiting for BMDB queries") |
Serge Bazanski | 9eb903d | 2023-02-20 14:28:19 +0100 | [diff] [blame] | 145 | flag.IntVar(&i.Parallelism, "initializer_parallelism", 1, "How many initializer instances to run in parallel, ie. how many agents to attempt to (re)start at once") |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 146 | } |
| 147 | |
// Initializer implements the BMaaS agent initialization process. Initialization
// entails asking the BMDB for machines that need the agent started
// (or restarted) and acting upon that.
type Initializer struct {
	// config is the InitializerConfig this Initializer was created from.
	config *InitializerConfig
	// agentConfig describes how the agent is uploaded to and started on
	// machines.
	agentConfig *AgentConfig
	// sharedConfig supplies, among others, the SSH signer and the provider
	// project ID used by the initializer.
	sharedConfig *SharedConfig
	// sshClient is used to dial machines over SSH.
	sshClient SSHClient
	// cl is the packngo wrapper used by the initializer.
	cl ecl.Client
}
| 159 | |
// task describes a single server currently being processed either in the
// context of agent initialization or recovery.
type task struct {
	// id is the BMDB-assigned machine identifier.
	id uuid.UUID
	// pid is an identifier assigned by the provider (Equinix).
	pid uuid.UUID
	// work is a machine lock facilitated by BMDB that prevents machines from
	// being processed by multiple workers at the same time.
	work *bmdb.Work
	// dev is a provider machine/device record. It is not populated by source;
	// runInSession fills it in after fetching the device from the provider.
	dev *packngo.Device
}
| 173 | |
| 174 | // New creates an Initializer instance, checking the InitializerConfig, |
| 175 | // SharedConfig and AgentConfig for errors. |
| 176 | func (c *InitializerConfig) New(cl ecl.Client, sc *SharedConfig, ac *AgentConfig) (*Initializer, error) { |
| 177 | if err := sc.check(); err != nil { |
| 178 | return nil, err |
| 179 | } |
| 180 | if len(ac.Executable) == 0 { |
| 181 | return nil, fmt.Errorf("agent executable not configured") |
| 182 | } |
| 183 | if ac.TargetPath == "" { |
| 184 | return nil, fmt.Errorf("agent target path must be set") |
| 185 | } |
| 186 | if ac.Endpoint == "" { |
| 187 | return nil, fmt.Errorf("agent endpoint must be set") |
| 188 | } |
| 189 | if ac.SSHConnectTimeout == 0 { |
| 190 | return nil, fmt.Errorf("agent SSH connection timeout must be set") |
| 191 | } |
| 192 | if ac.SSHExecTimeout == 0 { |
| 193 | return nil, fmt.Errorf("agent SSH execution timeout must be set") |
| 194 | } |
| 195 | if c.DBQueryLimiter == nil { |
| 196 | return nil, fmt.Errorf("DBQueryLimiter must be configured") |
| 197 | } |
Serge Bazanski | 9eb903d | 2023-02-20 14:28:19 +0100 | [diff] [blame] | 198 | if c.Parallelism == 0 { |
| 199 | c.Parallelism = 1 |
| 200 | } |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 201 | return &Initializer{ |
| 202 | config: c, |
| 203 | sharedConfig: sc, |
| 204 | agentConfig: ac, |
| 205 | sshClient: &PlainSSHClient{}, |
| 206 | cl: cl, |
| 207 | }, nil |
| 208 | } |
| 209 | |
Serge Bazanski | 9eb903d | 2023-02-20 14:28:19 +0100 | [diff] [blame] | 210 | // Run the initializer(s) (depending on opts.Parallelism) blocking the current |
| 211 | // goroutine until the given context expires and all provisioners quit. |
| 212 | func (i *Initializer) Run(ctx context.Context, conn *bmdb.Connection) error { |
| 213 | eg := errgroup.Group{} |
| 214 | for j := 0; j < i.config.Parallelism; j += 1 { |
| 215 | eg.Go(func() error { |
| 216 | return i.runOne(ctx, conn) |
| 217 | }) |
| 218 | } |
| 219 | return eg.Wait() |
| 220 | } |
| 221 | |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 222 | // Run the initializer blocking the current goroutine until the given context |
| 223 | // expires. |
Serge Bazanski | 9eb903d | 2023-02-20 14:28:19 +0100 | [diff] [blame] | 224 | func (c *Initializer) runOne(ctx context.Context, conn *bmdb.Connection) error { |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 225 | signer, err := c.sharedConfig.sshSigner() |
| 226 | if err != nil { |
| 227 | return fmt.Errorf("could not initialize signer: %w", err) |
| 228 | } |
| 229 | |
| 230 | // Maintain a BMDB session as long as possible. |
| 231 | var sess *bmdb.Session |
| 232 | for { |
| 233 | if sess == nil { |
| 234 | sess, err = conn.StartSession(ctx) |
| 235 | if err != nil { |
| 236 | return fmt.Errorf("could not start BMDB session: %w", err) |
| 237 | } |
| 238 | } |
| 239 | // Inside that session, run the main logic. |
| 240 | err = c.runInSession(ctx, sess, signer) |
| 241 | |
| 242 | switch { |
| 243 | case err == nil: |
| 244 | case errors.Is(err, ctx.Err()): |
| 245 | return err |
| 246 | case errors.Is(err, bmdb.ErrSessionExpired): |
| 247 | klog.Errorf("Session expired, restarting...") |
| 248 | sess = nil |
| 249 | time.Sleep(time.Second) |
| 250 | case err != nil: |
| 251 | klog.Errorf("Processing failed: %v", err) |
| 252 | // TODO(q3k): close session |
| 253 | time.Sleep(time.Second) |
| 254 | } |
| 255 | } |
| 256 | } |
| 257 | |
| 258 | // runInSession executes one iteration of the initializer's control loop within a |
| 259 | // BMDB session. This control loop attempts to start or re-start the agent on any |
| 260 | // machines that need this per the BMDB. |
| 261 | func (c *Initializer) runInSession(ctx context.Context, sess *bmdb.Session, signer ssh.Signer) error { |
| 262 | t, err := c.source(ctx, sess) |
| 263 | if err != nil { |
| 264 | return fmt.Errorf("could not source machine: %w", err) |
| 265 | } |
| 266 | if t == nil { |
| 267 | return nil |
| 268 | } |
| 269 | defer t.work.Cancel(ctx) |
| 270 | |
Serge Bazanski | 1038313 | 2023-02-20 15:39:45 +0100 | [diff] [blame] | 271 | klog.Infof("Machine %q needs agent start, fetching corresponding packngo device %q...", t.id, t.pid) |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 272 | dev, err := c.cl.GetDevice(ctx, c.sharedConfig.ProjectId, t.pid.String()) |
| 273 | if err != nil { |
| 274 | klog.Errorf("failed to fetch device %q: %v", t.pid, err) |
| 275 | d := 30 * time.Second |
| 276 | err = t.work.Fail(ctx, &d, "failed to fetch device from equinix") |
| 277 | return err |
| 278 | } |
| 279 | t.dev = dev |
| 280 | |
| 281 | err = c.init(ctx, signer, t) |
| 282 | if err != nil { |
| 283 | klog.Errorf("Failed to initialize: %v", err) |
| 284 | d := 1 * time.Minute |
| 285 | err = t.work.Fail(ctx, &d, fmt.Sprintf("failed to initialize machine: %v", err)) |
| 286 | return err |
| 287 | } |
| 288 | return nil |
| 289 | } |
| 290 | |
| 291 | // startAgent runs the agent executable on the target device d, returning the |
| 292 | // agent's public key on success. |
Lorenz Brun | 5b8b860 | 2023-03-09 17:22:21 +0100 | [diff] [blame] | 293 | func (i *Initializer) startAgent(ctx context.Context, sgn ssh.Signer, d *packngo.Device, mid uuid.UUID) ([]byte, error) { |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 294 | // Provide a bound on execution time in case we get stuck after the SSH |
| 295 | // connection is established. |
| 296 | sctx, sctxC := context.WithTimeout(ctx, i.agentConfig.SSHExecTimeout) |
| 297 | defer sctxC() |
| 298 | |
| 299 | // Use the device's IP address exposed by Equinix API. |
| 300 | ni := d.GetNetworkInfo() |
| 301 | var addr string |
| 302 | if ni.PublicIPv4 != "" { |
| 303 | addr = net.JoinHostPort(ni.PublicIPv4, "22") |
| 304 | } else if ni.PublicIPv6 != "" { |
| 305 | addr = net.JoinHostPort(ni.PublicIPv6, "22") |
| 306 | } else { |
Lorenz Brun | 5b8b860 | 2023-03-09 17:22:21 +0100 | [diff] [blame] | 307 | return nil, fmt.Errorf("device (machine ID: %s) has no available addresses", mid) |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 308 | } |
Lorenz Brun | 5b8b860 | 2023-03-09 17:22:21 +0100 | [diff] [blame] | 309 | klog.V(1).Infof("Dialing device (machine ID: %s, addr: %s).", mid, addr) |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 310 | |
| 311 | conn, err := i.sshClient.Dial(sctx, addr, "root", sgn, i.agentConfig.SSHConnectTimeout) |
| 312 | if err != nil { |
| 313 | return nil, fmt.Errorf("while dialing the device: %w", err) |
| 314 | } |
| 315 | defer conn.Close() |
| 316 | |
| 317 | // Upload the agent executable. |
| 318 | |
Lorenz Brun | 5b8b860 | 2023-03-09 17:22:21 +0100 | [diff] [blame] | 319 | klog.Infof("Uploading the agent executable (machine ID: %s, addr: %s).", mid, addr) |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 320 | if err := conn.Upload(sctx, i.agentConfig.TargetPath, i.agentConfig.Executable); err != nil { |
| 321 | return nil, fmt.Errorf("while uploading agent executable: %w", err) |
| 322 | } |
Lorenz Brun | 5b8b860 | 2023-03-09 17:22:21 +0100 | [diff] [blame] | 323 | klog.V(1).Infof("Upload successful (machine ID: %s, addr: %s).", mid, addr) |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 324 | |
| 325 | // The initialization protobuf message will be sent to the agent on its |
| 326 | // standard input. |
| 327 | imsg := apb.TakeoverInit{ |
Lorenz Brun | 5b8b860 | 2023-03-09 17:22:21 +0100 | [diff] [blame] | 328 | MachineId: mid.String(), |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 329 | BmaasEndpoint: i.agentConfig.Endpoint, |
Serge Bazanski | 51987d6 | 2023-04-06 16:35:35 +0200 | [diff] [blame^] | 330 | CaCertificate: i.agentConfig.EndpointCACertificate, |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 331 | } |
| 332 | imsgb, err := proto.Marshal(&imsg) |
| 333 | if err != nil { |
| 334 | return nil, fmt.Errorf("while marshaling agent message: %w", err) |
| 335 | } |
| 336 | |
| 337 | // Start the agent and wait for the agent's output to arrive. |
Lorenz Brun | 5b8b860 | 2023-03-09 17:22:21 +0100 | [diff] [blame] | 338 | klog.V(1).Infof("Starting the agent executable at path %q (machine ID: %s).", i.agentConfig.TargetPath, mid) |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 339 | stdout, stderr, err := conn.Execute(ctx, i.agentConfig.TargetPath, imsgb) |
| 340 | stderrStr := strings.TrimSpace(string(stderr)) |
| 341 | if stderrStr != "" { |
| 342 | klog.Warningf("Agent stderr: %q", stderrStr) |
| 343 | } |
| 344 | if err != nil { |
| 345 | return nil, fmt.Errorf("while starting the agent executable: %w", err) |
| 346 | } |
| 347 | |
| 348 | var arsp apb.TakeoverResponse |
| 349 | if err := proto.Unmarshal(stdout, &arsp); err != nil { |
| 350 | return nil, fmt.Errorf("agent reply couldn't be unmarshaled: %w", err) |
| 351 | } |
Lorenz Brun | 595dfe9 | 2023-02-21 19:13:02 +0100 | [diff] [blame] | 352 | var successResp *apb.TakeoverSuccess |
| 353 | switch r := arsp.Result.(type) { |
| 354 | case *apb.TakeoverResponse_Error: |
| 355 | return nil, fmt.Errorf("agent returned error: %v", r.Error.Message) |
| 356 | case *apb.TakeoverResponse_Success: |
| 357 | successResp = r.Success |
| 358 | default: |
| 359 | return nil, fmt.Errorf("agent returned unknown result of type %T", arsp.Result) |
| 360 | } |
| 361 | if !proto.Equal(&imsg, successResp.InitMessage) { |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 362 | return nil, fmt.Errorf("agent did not send back the init message.") |
| 363 | } |
Lorenz Brun | 595dfe9 | 2023-02-21 19:13:02 +0100 | [diff] [blame] | 364 | if len(successResp.Key) != ed25519.PublicKeySize { |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 365 | return nil, fmt.Errorf("agent key length mismatch.") |
| 366 | } |
Lorenz Brun | 5b8b860 | 2023-03-09 17:22:21 +0100 | [diff] [blame] | 367 | klog.Infof("Started the agent (machine ID: %s, key: %s).", mid, hex.EncodeToString(successResp.Key)) |
Lorenz Brun | 595dfe9 | 2023-02-21 19:13:02 +0100 | [diff] [blame] | 368 | return successResp.Key, nil |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 369 | } |
| 370 | |
| 371 | // init initializes the server described by t, using BMDB session 'sess' to set |
| 372 | // the relevant BMDB tag on success, and 'sgn' to authenticate to the server. |
| 373 | func (ir *Initializer) init(ctx context.Context, sgn ssh.Signer, t *task) error { |
| 374 | // Start the agent. |
| 375 | klog.Infof("Starting agent on device (ID: %s, PID %s)", t.id, t.pid) |
Lorenz Brun | 5b8b860 | 2023-03-09 17:22:21 +0100 | [diff] [blame] | 376 | apk, err := ir.startAgent(ctx, sgn, t.dev, t.id) |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 377 | if err != nil { |
Serge Bazanski | 1038313 | 2023-02-20 15:39:45 +0100 | [diff] [blame] | 378 | return fmt.Errorf("while starting the agent: %w", err) |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 379 | } |
| 380 | |
| 381 | // Agent startup succeeded. Set the appropriate BMDB tag, and release the |
| 382 | // lock. |
| 383 | klog.Infof("Setting AgentStarted (ID: %s, PID: %s, Agent public key: %s).", t.id, t.pid, hex.EncodeToString(apk)) |
| 384 | err = t.work.Finish(ctx, func(q *model.Queries) error { |
| 385 | return q.MachineSetAgentStarted(ctx, model.MachineSetAgentStartedParams{ |
| 386 | MachineID: t.id, |
| 387 | AgentStartedAt: time.Now(), |
| 388 | AgentPublicKey: apk, |
| 389 | }) |
| 390 | }) |
| 391 | if err != nil { |
| 392 | return fmt.Errorf("while setting AgentStarted tag: %w", err) |
| 393 | } |
| 394 | return nil |
| 395 | } |
| 396 | |
| 397 | // source supplies returns a BMDB-locked server ready for initialization, locked |
| 398 | // by a work item. If both task and error are nil, then there are no machines |
| 399 | // needed to be initialized. |
| 400 | // The returned work item in task _must_ be canceled or finished by the caller. |
| 401 | func (ir *Initializer) source(ctx context.Context, sess *bmdb.Session) (*task, error) { |
| 402 | ir.config.DBQueryLimiter.Wait(ctx) |
| 403 | |
| 404 | var machine *model.MachineProvided |
Serge Bazanski | 1038313 | 2023-02-20 15:39:45 +0100 | [diff] [blame] | 405 | work, err := sess.Work(ctx, model.ProcessShepherdAccess, func(q *model.Queries) ([]uuid.UUID, error) { |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 406 | machines, err := q.GetMachinesForAgentStart(ctx, 1) |
| 407 | if err != nil { |
| 408 | return nil, err |
| 409 | } |
| 410 | if len(machines) < 1 { |
| 411 | return nil, bmdb.ErrNothingToDo |
| 412 | } |
| 413 | machine = &machines[0] |
| 414 | return []uuid.UUID{machines[0].MachineID}, nil |
| 415 | }) |
| 416 | |
| 417 | if errors.Is(err, bmdb.ErrNothingToDo) { |
| 418 | return nil, nil |
| 419 | } |
| 420 | |
| 421 | if err != nil { |
| 422 | return nil, fmt.Errorf("while querying BMDB agent candidates: %w", err) |
| 423 | } |
| 424 | |
| 425 | pid, err := uuid.Parse(machine.ProviderID) |
| 426 | if err != nil { |
| 427 | t := time.Hour |
| 428 | work.Fail(ctx, &t, fmt.Sprintf("could not parse provider UUID %q", machine.ProviderID)) |
| 429 | return nil, fmt.Errorf("while parsing provider UUID: %w", err) |
| 430 | } |
| 431 | |
| 432 | return &task{ |
| 433 | id: machine.MachineID, |
| 434 | pid: pid, |
| 435 | work: work, |
| 436 | }, nil |
| 437 | } |