blob: f51d4e440d9d210c5f65cd2b629900bee8a8446d [file] [log] [blame]
Serge Bazanskicaa12082023-02-16 14:54:04 +01001package manager
2
3import (
4 "context"
5 "crypto/ed25519"
Serge Bazanski51987d62023-04-06 16:35:35 +02006 "crypto/x509"
Serge Bazanskicaa12082023-02-16 14:54:04 +01007 "encoding/hex"
Serge Bazanski51987d62023-04-06 16:35:35 +02008 "encoding/pem"
Serge Bazanskicaa12082023-02-16 14:54:04 +01009 "errors"
10 "flag"
11 "fmt"
12 "net"
13 "os"
14 "strconv"
15 "strings"
16 "time"
17
18 "github.com/google/uuid"
19 "github.com/packethost/packngo"
20 "golang.org/x/crypto/ssh"
Serge Bazanski9eb903d2023-02-20 14:28:19 +010021 "golang.org/x/sync/errgroup"
Serge Bazanskicaa12082023-02-16 14:54:04 +010022 "golang.org/x/time/rate"
23 "google.golang.org/protobuf/proto"
24 "k8s.io/klog/v2"
25
26 apb "source.monogon.dev/cloud/agent/api"
27 "source.monogon.dev/cloud/bmaas/bmdb"
28 "source.monogon.dev/cloud/bmaas/bmdb/model"
29 ecl "source.monogon.dev/cloud/shepherd/equinix/wrapngo"
30)
31
// AgentConfig configures how the Initializer will deploy Agents on machines. In
// CLI scenarios, this should be populated from flags via RegisterFlags.
type AgentConfig struct {
	// Executable is the contents of the agent binary created and run
	// at the provisioned servers. Must be set.
	Executable []byte

	// TargetPath is a filesystem destination path used while uploading the BMaaS
	// agent executable to hosts as part of the initialization process. Must be set.
	TargetPath string

	// Endpoint is the address Agent will use to contact the BMaaS
	// infrastructure. Must be set.
	Endpoint string
	// EndpointCACertificate is an optional DER-encoded (but not PEM-armored) X509
	// certificate used to populate the trusted CA store of the agent. It should be
	// set to the CA certificate of the endpoint if not using a system-trusted CA
	// certificate.
	EndpointCACertificate []byte

	// SSHConnectTimeout is the amount of time set aside for establishing the
	// SSH connection to a machine. Upon timeout, the iteration would be
	// declared a failure. Must be set.
	SSHConnectTimeout time.Duration
	// SSHExecTimeout is the amount of time set aside for executing the agent and
	// getting its output once the SSH connection has been established. Upon timeout,
	// the iteration would be declared as failure. Must be set.
	SSHExecTimeout time.Duration
}

// RegisterFlags binds the fields of a to command-line flags on the default
// flag set. File-backed flags (agent executable, CA certificate) are read and
// validated when the flag is parsed.
func (a *AgentConfig) RegisterFlags() {
	flag.Func("agent_executable_path", "Local filesystem path of agent binary to be uploaded", func(val string) error {
		if val == "" {
			return nil
		}
		data, err := os.ReadFile(val)
		if err != nil {
			return fmt.Errorf("could not read: %w", err)
		}
		a.Executable = data
		return nil
	})
	flag.StringVar(&a.TargetPath, "agent_target_path", "/root/agent", "Filesystem path where the agent will be uploaded to and ran from")
	flag.StringVar(&a.Endpoint, "agent_endpoint", "", "Address of BMDB Server to which the agent will attempt to connect")
	flag.Func("agent_endpoint_ca_certificate_path", "Path to PEM X509 CA certificate that the agent endpoint is serving with. If not set, the agent will attempt to use system CA certificates to authenticate the endpoint.", func(val string) error {
		if val == "" {
			return nil
		}
		data, err := os.ReadFile(val)
		if err != nil {
			return fmt.Errorf("could not read: %w", err)
		}
		der, err := parseCACertificatePEM(data)
		if err != nil {
			return err
		}
		a.EndpointCACertificate = der
		return nil
	})
	flag.DurationVar(&a.SSHConnectTimeout, "agent_ssh_connect_timeout", 2*time.Second, "Timeout for connecting over SSH to a machine")
	flag.DurationVar(&a.SSHExecTimeout, "agent_ssh_exec_timeout", 60*time.Second, "Timeout for executing the agent over SSH once connected to a machine")
}

// parseCACertificatePEM decodes the first PEM block in data, requires it to be
// a well-formed X509 certificate, and returns its DER bytes.
func parseCACertificatePEM(data []byte) ([]byte, error) {
	block, _ := pem.Decode(data)
	// pem.Decode returns a nil block when data contains no PEM block at all;
	// it must be checked before touching block.Type.
	if block == nil {
		return nil, fmt.Errorf("no PEM data found")
	}
	if block.Type != "CERTIFICATE" {
		return nil, fmt.Errorf("not a certificate")
	}
	if _, err := x509.ParseCertificate(block.Bytes); err != nil {
		return nil, fmt.Errorf("invalid certificate: %w", err)
	}
	return block.Bytes, nil
}
98
99// InitializerConfig configures the broad agent initialization process. The
100// specifics of how an agent is started are instead configured in Agent Config. In
101// CLI scenarios, this should be populated from flags via RegisterFlags.
102type InitializerConfig struct {
103 // DBQueryLimiter limits the rate at which BMDB is queried for servers ready
104 // for BMaaS agent initialization. Must be set.
105 DBQueryLimiter *rate.Limiter
Serge Bazanski9eb903d2023-02-20 14:28:19 +0100106
107 // Parallelism is how many instances of the Initializer will be allowed to run in
108 // parallel against the BMDB. This speeds up the process of starting/restarting
109 // agents significantly, as one initializer instance can handle at most one agent
110 // (re)starting process.
111 //
112 // If not set (ie. 0), default to 1. A good starting value for production
113 // deployments is 10 or so.
114 Parallelism int
Serge Bazanskicaa12082023-02-16 14:54:04 +0100115}
116
117// flagLimiter configures a *rate.Limiter as a flag.
118func flagLimiter(l **rate.Limiter, name, defval, help string) {
119 syntax := "'duration,count' eg. '2m,10' for a 10-sized bucket refilled at one token every 2 minutes"
120 help = help + fmt.Sprintf(" (default: %q, syntax: %s)", defval, syntax)
121 flag.Func(name, help, func(val string) error {
122 if val == "" {
123 val = defval
124 }
125 parts := strings.Split(val, ",")
126 if len(parts) != 2 {
127 return fmt.Errorf("invalid syntax, want: %s", syntax)
128 }
129 duration, err := time.ParseDuration(parts[0])
130 if err != nil {
131 return fmt.Errorf("invalid duration: %w", err)
132 }
133 refill, err := strconv.ParseUint(parts[1], 10, 31)
134 if err != nil {
135 return fmt.Errorf("invalid refill rate: %w", err)
136 }
137 *l = rate.NewLimiter(rate.Every(duration), int(refill))
138 return nil
139 })
140 flag.Set(name, defval)
141}
142
143func (i *InitializerConfig) RegisterFlags() {
144 flagLimiter(&i.DBQueryLimiter, "initializer_db_query_rate", "250ms,8", "Rate limiting for BMDB queries")
Serge Bazanski9eb903d2023-02-20 14:28:19 +0100145 flag.IntVar(&i.Parallelism, "initializer_parallelism", 1, "How many initializer instances to run in parallel, ie. how many agents to attempt to (re)start at once")
Serge Bazanskicaa12082023-02-16 14:54:04 +0100146}
147
148// Initializer implements the BMaaS agent initialization process. Initialization
149// entails asking the BMDB for machines that need the agent started
150// (or-restarted) and acting upon that.
151type Initializer struct {
152 config *InitializerConfig
153 agentConfig *AgentConfig
154 sharedConfig *SharedConfig
155 sshClient SSHClient
156 // cl is the packngo wrapper used by the initializer.
157 cl ecl.Client
158}
159
160// task describes a single server currently being processed either in the
161// context of agent initialization or recovery.
162type task struct {
163 // id is the BMDB-assigned machine identifier.
164 id uuid.UUID
165 // pid is an identifier assigned by the provider (Equinix).
166 pid uuid.UUID
167 // work is a machine lock facilitated by BMDB that prevents machines from
168 // being processed by multiple workers at the same time.
169 work *bmdb.Work
170 // dev is a provider machine/device record.
171 dev *packngo.Device
172}
173
174// New creates an Initializer instance, checking the InitializerConfig,
175// SharedConfig and AgentConfig for errors.
176func (c *InitializerConfig) New(cl ecl.Client, sc *SharedConfig, ac *AgentConfig) (*Initializer, error) {
177 if err := sc.check(); err != nil {
178 return nil, err
179 }
180 if len(ac.Executable) == 0 {
181 return nil, fmt.Errorf("agent executable not configured")
182 }
183 if ac.TargetPath == "" {
184 return nil, fmt.Errorf("agent target path must be set")
185 }
186 if ac.Endpoint == "" {
187 return nil, fmt.Errorf("agent endpoint must be set")
188 }
189 if ac.SSHConnectTimeout == 0 {
190 return nil, fmt.Errorf("agent SSH connection timeout must be set")
191 }
192 if ac.SSHExecTimeout == 0 {
193 return nil, fmt.Errorf("agent SSH execution timeout must be set")
194 }
195 if c.DBQueryLimiter == nil {
196 return nil, fmt.Errorf("DBQueryLimiter must be configured")
197 }
Serge Bazanski9eb903d2023-02-20 14:28:19 +0100198 if c.Parallelism == 0 {
199 c.Parallelism = 1
200 }
Serge Bazanskicaa12082023-02-16 14:54:04 +0100201 return &Initializer{
202 config: c,
203 sharedConfig: sc,
204 agentConfig: ac,
205 sshClient: &PlainSSHClient{},
206 cl: cl,
207 }, nil
208}
209
Serge Bazanski9eb903d2023-02-20 14:28:19 +0100210// Run the initializer(s) (depending on opts.Parallelism) blocking the current
211// goroutine until the given context expires and all provisioners quit.
212func (i *Initializer) Run(ctx context.Context, conn *bmdb.Connection) error {
213 eg := errgroup.Group{}
214 for j := 0; j < i.config.Parallelism; j += 1 {
215 eg.Go(func() error {
216 return i.runOne(ctx, conn)
217 })
218 }
219 return eg.Wait()
220}
221
Serge Bazanskicaa12082023-02-16 14:54:04 +0100222// Run the initializer blocking the current goroutine until the given context
223// expires.
Serge Bazanski9eb903d2023-02-20 14:28:19 +0100224func (c *Initializer) runOne(ctx context.Context, conn *bmdb.Connection) error {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100225 signer, err := c.sharedConfig.sshSigner()
226 if err != nil {
227 return fmt.Errorf("could not initialize signer: %w", err)
228 }
229
230 // Maintain a BMDB session as long as possible.
231 var sess *bmdb.Session
232 for {
233 if sess == nil {
234 sess, err = conn.StartSession(ctx)
235 if err != nil {
236 return fmt.Errorf("could not start BMDB session: %w", err)
237 }
238 }
239 // Inside that session, run the main logic.
240 err = c.runInSession(ctx, sess, signer)
241
242 switch {
243 case err == nil:
244 case errors.Is(err, ctx.Err()):
245 return err
246 case errors.Is(err, bmdb.ErrSessionExpired):
247 klog.Errorf("Session expired, restarting...")
248 sess = nil
249 time.Sleep(time.Second)
250 case err != nil:
251 klog.Errorf("Processing failed: %v", err)
252 // TODO(q3k): close session
253 time.Sleep(time.Second)
254 }
255 }
256}
257
258// runInSession executes one iteration of the initializer's control loop within a
259// BMDB session. This control loop attempts to start or re-start the agent on any
260// machines that need this per the BMDB.
261func (c *Initializer) runInSession(ctx context.Context, sess *bmdb.Session, signer ssh.Signer) error {
262 t, err := c.source(ctx, sess)
263 if err != nil {
264 return fmt.Errorf("could not source machine: %w", err)
265 }
266 if t == nil {
267 return nil
268 }
269 defer t.work.Cancel(ctx)
270
Serge Bazanski10383132023-02-20 15:39:45 +0100271 klog.Infof("Machine %q needs agent start, fetching corresponding packngo device %q...", t.id, t.pid)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100272 dev, err := c.cl.GetDevice(ctx, c.sharedConfig.ProjectId, t.pid.String())
273 if err != nil {
274 klog.Errorf("failed to fetch device %q: %v", t.pid, err)
275 d := 30 * time.Second
276 err = t.work.Fail(ctx, &d, "failed to fetch device from equinix")
277 return err
278 }
279 t.dev = dev
280
281 err = c.init(ctx, signer, t)
282 if err != nil {
283 klog.Errorf("Failed to initialize: %v", err)
284 d := 1 * time.Minute
285 err = t.work.Fail(ctx, &d, fmt.Sprintf("failed to initialize machine: %v", err))
286 return err
287 }
288 return nil
289}
290
291// startAgent runs the agent executable on the target device d, returning the
292// agent's public key on success.
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100293func (i *Initializer) startAgent(ctx context.Context, sgn ssh.Signer, d *packngo.Device, mid uuid.UUID) ([]byte, error) {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100294 // Provide a bound on execution time in case we get stuck after the SSH
295 // connection is established.
296 sctx, sctxC := context.WithTimeout(ctx, i.agentConfig.SSHExecTimeout)
297 defer sctxC()
298
299 // Use the device's IP address exposed by Equinix API.
300 ni := d.GetNetworkInfo()
301 var addr string
302 if ni.PublicIPv4 != "" {
303 addr = net.JoinHostPort(ni.PublicIPv4, "22")
304 } else if ni.PublicIPv6 != "" {
305 addr = net.JoinHostPort(ni.PublicIPv6, "22")
306 } else {
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100307 return nil, fmt.Errorf("device (machine ID: %s) has no available addresses", mid)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100308 }
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100309 klog.V(1).Infof("Dialing device (machine ID: %s, addr: %s).", mid, addr)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100310
311 conn, err := i.sshClient.Dial(sctx, addr, "root", sgn, i.agentConfig.SSHConnectTimeout)
312 if err != nil {
313 return nil, fmt.Errorf("while dialing the device: %w", err)
314 }
315 defer conn.Close()
316
317 // Upload the agent executable.
318
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100319 klog.Infof("Uploading the agent executable (machine ID: %s, addr: %s).", mid, addr)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100320 if err := conn.Upload(sctx, i.agentConfig.TargetPath, i.agentConfig.Executable); err != nil {
321 return nil, fmt.Errorf("while uploading agent executable: %w", err)
322 }
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100323 klog.V(1).Infof("Upload successful (machine ID: %s, addr: %s).", mid, addr)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100324
325 // The initialization protobuf message will be sent to the agent on its
326 // standard input.
327 imsg := apb.TakeoverInit{
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100328 MachineId: mid.String(),
Serge Bazanskicaa12082023-02-16 14:54:04 +0100329 BmaasEndpoint: i.agentConfig.Endpoint,
Serge Bazanski51987d62023-04-06 16:35:35 +0200330 CaCertificate: i.agentConfig.EndpointCACertificate,
Serge Bazanskicaa12082023-02-16 14:54:04 +0100331 }
332 imsgb, err := proto.Marshal(&imsg)
333 if err != nil {
334 return nil, fmt.Errorf("while marshaling agent message: %w", err)
335 }
336
337 // Start the agent and wait for the agent's output to arrive.
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100338 klog.V(1).Infof("Starting the agent executable at path %q (machine ID: %s).", i.agentConfig.TargetPath, mid)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100339 stdout, stderr, err := conn.Execute(ctx, i.agentConfig.TargetPath, imsgb)
340 stderrStr := strings.TrimSpace(string(stderr))
341 if stderrStr != "" {
342 klog.Warningf("Agent stderr: %q", stderrStr)
343 }
344 if err != nil {
345 return nil, fmt.Errorf("while starting the agent executable: %w", err)
346 }
347
348 var arsp apb.TakeoverResponse
349 if err := proto.Unmarshal(stdout, &arsp); err != nil {
350 return nil, fmt.Errorf("agent reply couldn't be unmarshaled: %w", err)
351 }
Lorenz Brun595dfe92023-02-21 19:13:02 +0100352 var successResp *apb.TakeoverSuccess
353 switch r := arsp.Result.(type) {
354 case *apb.TakeoverResponse_Error:
355 return nil, fmt.Errorf("agent returned error: %v", r.Error.Message)
356 case *apb.TakeoverResponse_Success:
357 successResp = r.Success
358 default:
359 return nil, fmt.Errorf("agent returned unknown result of type %T", arsp.Result)
360 }
361 if !proto.Equal(&imsg, successResp.InitMessage) {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100362 return nil, fmt.Errorf("agent did not send back the init message.")
363 }
Lorenz Brun595dfe92023-02-21 19:13:02 +0100364 if len(successResp.Key) != ed25519.PublicKeySize {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100365 return nil, fmt.Errorf("agent key length mismatch.")
366 }
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100367 klog.Infof("Started the agent (machine ID: %s, key: %s).", mid, hex.EncodeToString(successResp.Key))
Lorenz Brun595dfe92023-02-21 19:13:02 +0100368 return successResp.Key, nil
Serge Bazanskicaa12082023-02-16 14:54:04 +0100369}
370
371// init initializes the server described by t, using BMDB session 'sess' to set
372// the relevant BMDB tag on success, and 'sgn' to authenticate to the server.
373func (ir *Initializer) init(ctx context.Context, sgn ssh.Signer, t *task) error {
374 // Start the agent.
375 klog.Infof("Starting agent on device (ID: %s, PID %s)", t.id, t.pid)
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100376 apk, err := ir.startAgent(ctx, sgn, t.dev, t.id)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100377 if err != nil {
Serge Bazanski10383132023-02-20 15:39:45 +0100378 return fmt.Errorf("while starting the agent: %w", err)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100379 }
380
381 // Agent startup succeeded. Set the appropriate BMDB tag, and release the
382 // lock.
383 klog.Infof("Setting AgentStarted (ID: %s, PID: %s, Agent public key: %s).", t.id, t.pid, hex.EncodeToString(apk))
384 err = t.work.Finish(ctx, func(q *model.Queries) error {
385 return q.MachineSetAgentStarted(ctx, model.MachineSetAgentStartedParams{
386 MachineID: t.id,
387 AgentStartedAt: time.Now(),
388 AgentPublicKey: apk,
389 })
390 })
391 if err != nil {
392 return fmt.Errorf("while setting AgentStarted tag: %w", err)
393 }
394 return nil
395}
396
397// source supplies returns a BMDB-locked server ready for initialization, locked
398// by a work item. If both task and error are nil, then there are no machines
399// needed to be initialized.
400// The returned work item in task _must_ be canceled or finished by the caller.
401func (ir *Initializer) source(ctx context.Context, sess *bmdb.Session) (*task, error) {
402 ir.config.DBQueryLimiter.Wait(ctx)
403
404 var machine *model.MachineProvided
Serge Bazanski10383132023-02-20 15:39:45 +0100405 work, err := sess.Work(ctx, model.ProcessShepherdAccess, func(q *model.Queries) ([]uuid.UUID, error) {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100406 machines, err := q.GetMachinesForAgentStart(ctx, 1)
407 if err != nil {
408 return nil, err
409 }
410 if len(machines) < 1 {
411 return nil, bmdb.ErrNothingToDo
412 }
413 machine = &machines[0]
414 return []uuid.UUID{machines[0].MachineID}, nil
415 })
416
417 if errors.Is(err, bmdb.ErrNothingToDo) {
418 return nil, nil
419 }
420
421 if err != nil {
422 return nil, fmt.Errorf("while querying BMDB agent candidates: %w", err)
423 }
424
425 pid, err := uuid.Parse(machine.ProviderID)
426 if err != nil {
427 t := time.Hour
428 work.Fail(ctx, &t, fmt.Sprintf("could not parse provider UUID %q", machine.ProviderID))
429 return nil, fmt.Errorf("while parsing provider UUID: %w", err)
430 }
431
432 return &task{
433 id: machine.MachineID,
434 pid: pid,
435 work: work,
436 }, nil
437}