blob: 7f1af852faa5f3ee5db286c0501740f4b92b0187 [file] [log] [blame]
Serge Bazanskicaa12082023-02-16 14:54:04 +01001package manager
2
3import (
4 "context"
5 "crypto/ed25519"
6 "encoding/hex"
7 "errors"
8 "flag"
9 "fmt"
10 "net"
11 "os"
12 "strconv"
13 "strings"
14 "time"
15
16 "github.com/google/uuid"
17 "github.com/packethost/packngo"
18 "golang.org/x/crypto/ssh"
Serge Bazanski9eb903d2023-02-20 14:28:19 +010019 "golang.org/x/sync/errgroup"
Serge Bazanskicaa12082023-02-16 14:54:04 +010020 "golang.org/x/time/rate"
21 "google.golang.org/protobuf/proto"
22 "k8s.io/klog/v2"
23
24 apb "source.monogon.dev/cloud/agent/api"
25 "source.monogon.dev/cloud/bmaas/bmdb"
26 "source.monogon.dev/cloud/bmaas/bmdb/model"
27 ecl "source.monogon.dev/cloud/shepherd/equinix/wrapngo"
28)
29
// AgentConfig configures how the Initializer will deploy Agents on machines. In
// CLI scenarios, this should be populated from flags via RegisterFlags.
type AgentConfig struct {
	// Executable is the contents of the agent binary created and run
	// at the provisioned servers. Must be set.
	Executable []byte

	// TargetPath is a filesystem destination path used while uploading the BMaaS
	// agent executable to hosts as part of the initialization process. Must be set.
	TargetPath string

	// Endpoint is the address Agent will use to contact the BMaaS
	// infrastructure. Must be set.
	Endpoint string

	// SSHConnectTimeout is the amount of time set aside for establishing the
	// SSH connection to a machine (it is handed to the SSH dialer). Upon
	// timeout, the iteration is declared a failure. Must be set.
	SSHConnectTimeout time.Duration
	// SSHExecTimeout is the amount of time set aside for executing the agent and
	// getting its output once the SSH connection has been established. Upon timeout,
	// the iteration would be declared as failure. Must be set.
	SSHExecTimeout time.Duration
}
54
55func (a *AgentConfig) RegisterFlags() {
56 flag.Func("agent_executable_path", "Local filesystem path of agent binary to be uploaded", func(val string) error {
57 if val == "" {
58 return nil
59 }
60 data, err := os.ReadFile(val)
61 if err != nil {
62 return fmt.Errorf("could not read -agent_executable_path: %w", err)
63 }
64 a.Executable = data
65 return nil
66 })
67 flag.StringVar(&a.TargetPath, "agent_target_path", "/root/agent", "Filesystem path where the agent will be uploaded to and ran from")
68 flag.StringVar(&a.Endpoint, "agent_endpoint", "", "Address of BMDB Server to which the agent will attempt to connect")
69 flag.DurationVar(&a.SSHConnectTimeout, "agent_ssh_connect_timeout", 2*time.Second, "Timeout for connecting over SSH to a machine")
70 flag.DurationVar(&a.SSHExecTimeout, "agent_ssh_exec_timeout", 60*time.Second, "Timeout for connecting over SSH to a machine")
71}
72
73// InitializerConfig configures the broad agent initialization process. The
74// specifics of how an agent is started are instead configured in Agent Config. In
75// CLI scenarios, this should be populated from flags via RegisterFlags.
76type InitializerConfig struct {
77 // DBQueryLimiter limits the rate at which BMDB is queried for servers ready
78 // for BMaaS agent initialization. Must be set.
79 DBQueryLimiter *rate.Limiter
Serge Bazanski9eb903d2023-02-20 14:28:19 +010080
81 // Parallelism is how many instances of the Initializer will be allowed to run in
82 // parallel against the BMDB. This speeds up the process of starting/restarting
83 // agents significantly, as one initializer instance can handle at most one agent
84 // (re)starting process.
85 //
86 // If not set (ie. 0), default to 1. A good starting value for production
87 // deployments is 10 or so.
88 Parallelism int
Serge Bazanskicaa12082023-02-16 14:54:04 +010089}
90
91// flagLimiter configures a *rate.Limiter as a flag.
92func flagLimiter(l **rate.Limiter, name, defval, help string) {
93 syntax := "'duration,count' eg. '2m,10' for a 10-sized bucket refilled at one token every 2 minutes"
94 help = help + fmt.Sprintf(" (default: %q, syntax: %s)", defval, syntax)
95 flag.Func(name, help, func(val string) error {
96 if val == "" {
97 val = defval
98 }
99 parts := strings.Split(val, ",")
100 if len(parts) != 2 {
101 return fmt.Errorf("invalid syntax, want: %s", syntax)
102 }
103 duration, err := time.ParseDuration(parts[0])
104 if err != nil {
105 return fmt.Errorf("invalid duration: %w", err)
106 }
107 refill, err := strconv.ParseUint(parts[1], 10, 31)
108 if err != nil {
109 return fmt.Errorf("invalid refill rate: %w", err)
110 }
111 *l = rate.NewLimiter(rate.Every(duration), int(refill))
112 return nil
113 })
114 flag.Set(name, defval)
115}
116
117func (i *InitializerConfig) RegisterFlags() {
118 flagLimiter(&i.DBQueryLimiter, "initializer_db_query_rate", "250ms,8", "Rate limiting for BMDB queries")
Serge Bazanski9eb903d2023-02-20 14:28:19 +0100119 flag.IntVar(&i.Parallelism, "initializer_parallelism", 1, "How many initializer instances to run in parallel, ie. how many agents to attempt to (re)start at once")
Serge Bazanskicaa12082023-02-16 14:54:04 +0100120}
121
122// Initializer implements the BMaaS agent initialization process. Initialization
123// entails asking the BMDB for machines that need the agent started
124// (or-restarted) and acting upon that.
125type Initializer struct {
126 config *InitializerConfig
127 agentConfig *AgentConfig
128 sharedConfig *SharedConfig
129 sshClient SSHClient
130 // cl is the packngo wrapper used by the initializer.
131 cl ecl.Client
132}
133
134// task describes a single server currently being processed either in the
135// context of agent initialization or recovery.
136type task struct {
137 // id is the BMDB-assigned machine identifier.
138 id uuid.UUID
139 // pid is an identifier assigned by the provider (Equinix).
140 pid uuid.UUID
141 // work is a machine lock facilitated by BMDB that prevents machines from
142 // being processed by multiple workers at the same time.
143 work *bmdb.Work
144 // dev is a provider machine/device record.
145 dev *packngo.Device
146}
147
148// New creates an Initializer instance, checking the InitializerConfig,
149// SharedConfig and AgentConfig for errors.
150func (c *InitializerConfig) New(cl ecl.Client, sc *SharedConfig, ac *AgentConfig) (*Initializer, error) {
151 if err := sc.check(); err != nil {
152 return nil, err
153 }
154 if len(ac.Executable) == 0 {
155 return nil, fmt.Errorf("agent executable not configured")
156 }
157 if ac.TargetPath == "" {
158 return nil, fmt.Errorf("agent target path must be set")
159 }
160 if ac.Endpoint == "" {
161 return nil, fmt.Errorf("agent endpoint must be set")
162 }
163 if ac.SSHConnectTimeout == 0 {
164 return nil, fmt.Errorf("agent SSH connection timeout must be set")
165 }
166 if ac.SSHExecTimeout == 0 {
167 return nil, fmt.Errorf("agent SSH execution timeout must be set")
168 }
169 if c.DBQueryLimiter == nil {
170 return nil, fmt.Errorf("DBQueryLimiter must be configured")
171 }
Serge Bazanski9eb903d2023-02-20 14:28:19 +0100172 if c.Parallelism == 0 {
173 c.Parallelism = 1
174 }
Serge Bazanskicaa12082023-02-16 14:54:04 +0100175 return &Initializer{
176 config: c,
177 sharedConfig: sc,
178 agentConfig: ac,
179 sshClient: &PlainSSHClient{},
180 cl: cl,
181 }, nil
182}
183
Serge Bazanski9eb903d2023-02-20 14:28:19 +0100184// Run the initializer(s) (depending on opts.Parallelism) blocking the current
185// goroutine until the given context expires and all provisioners quit.
186func (i *Initializer) Run(ctx context.Context, conn *bmdb.Connection) error {
187 eg := errgroup.Group{}
188 for j := 0; j < i.config.Parallelism; j += 1 {
189 eg.Go(func() error {
190 return i.runOne(ctx, conn)
191 })
192 }
193 return eg.Wait()
194}
195
Serge Bazanskicaa12082023-02-16 14:54:04 +0100196// Run the initializer blocking the current goroutine until the given context
197// expires.
Serge Bazanski9eb903d2023-02-20 14:28:19 +0100198func (c *Initializer) runOne(ctx context.Context, conn *bmdb.Connection) error {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100199 signer, err := c.sharedConfig.sshSigner()
200 if err != nil {
201 return fmt.Errorf("could not initialize signer: %w", err)
202 }
203
204 // Maintain a BMDB session as long as possible.
205 var sess *bmdb.Session
206 for {
207 if sess == nil {
208 sess, err = conn.StartSession(ctx)
209 if err != nil {
210 return fmt.Errorf("could not start BMDB session: %w", err)
211 }
212 }
213 // Inside that session, run the main logic.
214 err = c.runInSession(ctx, sess, signer)
215
216 switch {
217 case err == nil:
218 case errors.Is(err, ctx.Err()):
219 return err
220 case errors.Is(err, bmdb.ErrSessionExpired):
221 klog.Errorf("Session expired, restarting...")
222 sess = nil
223 time.Sleep(time.Second)
224 case err != nil:
225 klog.Errorf("Processing failed: %v", err)
226 // TODO(q3k): close session
227 time.Sleep(time.Second)
228 }
229 }
230}
231
232// runInSession executes one iteration of the initializer's control loop within a
233// BMDB session. This control loop attempts to start or re-start the agent on any
234// machines that need this per the BMDB.
235func (c *Initializer) runInSession(ctx context.Context, sess *bmdb.Session, signer ssh.Signer) error {
236 t, err := c.source(ctx, sess)
237 if err != nil {
238 return fmt.Errorf("could not source machine: %w", err)
239 }
240 if t == nil {
241 return nil
242 }
243 defer t.work.Cancel(ctx)
244
Serge Bazanski10383132023-02-20 15:39:45 +0100245 klog.Infof("Machine %q needs agent start, fetching corresponding packngo device %q...", t.id, t.pid)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100246 dev, err := c.cl.GetDevice(ctx, c.sharedConfig.ProjectId, t.pid.String())
247 if err != nil {
248 klog.Errorf("failed to fetch device %q: %v", t.pid, err)
249 d := 30 * time.Second
250 err = t.work.Fail(ctx, &d, "failed to fetch device from equinix")
251 return err
252 }
253 t.dev = dev
254
255 err = c.init(ctx, signer, t)
256 if err != nil {
257 klog.Errorf("Failed to initialize: %v", err)
258 d := 1 * time.Minute
259 err = t.work.Fail(ctx, &d, fmt.Sprintf("failed to initialize machine: %v", err))
260 return err
261 }
262 return nil
263}
264
265// startAgent runs the agent executable on the target device d, returning the
266// agent's public key on success.
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100267func (i *Initializer) startAgent(ctx context.Context, sgn ssh.Signer, d *packngo.Device, mid uuid.UUID) ([]byte, error) {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100268 // Provide a bound on execution time in case we get stuck after the SSH
269 // connection is established.
270 sctx, sctxC := context.WithTimeout(ctx, i.agentConfig.SSHExecTimeout)
271 defer sctxC()
272
273 // Use the device's IP address exposed by Equinix API.
274 ni := d.GetNetworkInfo()
275 var addr string
276 if ni.PublicIPv4 != "" {
277 addr = net.JoinHostPort(ni.PublicIPv4, "22")
278 } else if ni.PublicIPv6 != "" {
279 addr = net.JoinHostPort(ni.PublicIPv6, "22")
280 } else {
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100281 return nil, fmt.Errorf("device (machine ID: %s) has no available addresses", mid)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100282 }
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100283 klog.V(1).Infof("Dialing device (machine ID: %s, addr: %s).", mid, addr)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100284
285 conn, err := i.sshClient.Dial(sctx, addr, "root", sgn, i.agentConfig.SSHConnectTimeout)
286 if err != nil {
287 return nil, fmt.Errorf("while dialing the device: %w", err)
288 }
289 defer conn.Close()
290
291 // Upload the agent executable.
292
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100293 klog.Infof("Uploading the agent executable (machine ID: %s, addr: %s).", mid, addr)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100294 if err := conn.Upload(sctx, i.agentConfig.TargetPath, i.agentConfig.Executable); err != nil {
295 return nil, fmt.Errorf("while uploading agent executable: %w", err)
296 }
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100297 klog.V(1).Infof("Upload successful (machine ID: %s, addr: %s).", mid, addr)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100298
299 // The initialization protobuf message will be sent to the agent on its
300 // standard input.
301 imsg := apb.TakeoverInit{
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100302 MachineId: mid.String(),
Serge Bazanskicaa12082023-02-16 14:54:04 +0100303 BmaasEndpoint: i.agentConfig.Endpoint,
304 }
305 imsgb, err := proto.Marshal(&imsg)
306 if err != nil {
307 return nil, fmt.Errorf("while marshaling agent message: %w", err)
308 }
309
310 // Start the agent and wait for the agent's output to arrive.
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100311 klog.V(1).Infof("Starting the agent executable at path %q (machine ID: %s).", i.agentConfig.TargetPath, mid)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100312 stdout, stderr, err := conn.Execute(ctx, i.agentConfig.TargetPath, imsgb)
313 stderrStr := strings.TrimSpace(string(stderr))
314 if stderrStr != "" {
315 klog.Warningf("Agent stderr: %q", stderrStr)
316 }
317 if err != nil {
318 return nil, fmt.Errorf("while starting the agent executable: %w", err)
319 }
320
321 var arsp apb.TakeoverResponse
322 if err := proto.Unmarshal(stdout, &arsp); err != nil {
323 return nil, fmt.Errorf("agent reply couldn't be unmarshaled: %w", err)
324 }
Lorenz Brun595dfe92023-02-21 19:13:02 +0100325 var successResp *apb.TakeoverSuccess
326 switch r := arsp.Result.(type) {
327 case *apb.TakeoverResponse_Error:
328 return nil, fmt.Errorf("agent returned error: %v", r.Error.Message)
329 case *apb.TakeoverResponse_Success:
330 successResp = r.Success
331 default:
332 return nil, fmt.Errorf("agent returned unknown result of type %T", arsp.Result)
333 }
334 if !proto.Equal(&imsg, successResp.InitMessage) {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100335 return nil, fmt.Errorf("agent did not send back the init message.")
336 }
Lorenz Brun595dfe92023-02-21 19:13:02 +0100337 if len(successResp.Key) != ed25519.PublicKeySize {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100338 return nil, fmt.Errorf("agent key length mismatch.")
339 }
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100340 klog.Infof("Started the agent (machine ID: %s, key: %s).", mid, hex.EncodeToString(successResp.Key))
Lorenz Brun595dfe92023-02-21 19:13:02 +0100341 return successResp.Key, nil
Serge Bazanskicaa12082023-02-16 14:54:04 +0100342}
343
344// init initializes the server described by t, using BMDB session 'sess' to set
345// the relevant BMDB tag on success, and 'sgn' to authenticate to the server.
346func (ir *Initializer) init(ctx context.Context, sgn ssh.Signer, t *task) error {
347 // Start the agent.
348 klog.Infof("Starting agent on device (ID: %s, PID %s)", t.id, t.pid)
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100349 apk, err := ir.startAgent(ctx, sgn, t.dev, t.id)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100350 if err != nil {
Serge Bazanski10383132023-02-20 15:39:45 +0100351 return fmt.Errorf("while starting the agent: %w", err)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100352 }
353
354 // Agent startup succeeded. Set the appropriate BMDB tag, and release the
355 // lock.
356 klog.Infof("Setting AgentStarted (ID: %s, PID: %s, Agent public key: %s).", t.id, t.pid, hex.EncodeToString(apk))
357 err = t.work.Finish(ctx, func(q *model.Queries) error {
358 return q.MachineSetAgentStarted(ctx, model.MachineSetAgentStartedParams{
359 MachineID: t.id,
360 AgentStartedAt: time.Now(),
361 AgentPublicKey: apk,
362 })
363 })
364 if err != nil {
365 return fmt.Errorf("while setting AgentStarted tag: %w", err)
366 }
367 return nil
368}
369
370// source supplies returns a BMDB-locked server ready for initialization, locked
371// by a work item. If both task and error are nil, then there are no machines
372// needed to be initialized.
373// The returned work item in task _must_ be canceled or finished by the caller.
374func (ir *Initializer) source(ctx context.Context, sess *bmdb.Session) (*task, error) {
375 ir.config.DBQueryLimiter.Wait(ctx)
376
377 var machine *model.MachineProvided
Serge Bazanski10383132023-02-20 15:39:45 +0100378 work, err := sess.Work(ctx, model.ProcessShepherdAccess, func(q *model.Queries) ([]uuid.UUID, error) {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100379 machines, err := q.GetMachinesForAgentStart(ctx, 1)
380 if err != nil {
381 return nil, err
382 }
383 if len(machines) < 1 {
384 return nil, bmdb.ErrNothingToDo
385 }
386 machine = &machines[0]
387 return []uuid.UUID{machines[0].MachineID}, nil
388 })
389
390 if errors.Is(err, bmdb.ErrNothingToDo) {
391 return nil, nil
392 }
393
394 if err != nil {
395 return nil, fmt.Errorf("while querying BMDB agent candidates: %w", err)
396 }
397
398 pid, err := uuid.Parse(machine.ProviderID)
399 if err != nil {
400 t := time.Hour
401 work.Fail(ctx, &t, fmt.Sprintf("could not parse provider UUID %q", machine.ProviderID))
402 return nil, fmt.Errorf("while parsing provider UUID: %w", err)
403 }
404
405 return &task{
406 id: machine.MachineID,
407 pid: pid,
408 work: work,
409 }, nil
410}