blob: 5b1ea497e059bb9d2450d4de0d485d3c91bf01ea [file] [log] [blame]
Serge Bazanskicaa12082023-02-16 14:54:04 +01001package manager
2
3import (
4 "context"
5 "crypto/ed25519"
6 "encoding/hex"
7 "errors"
8 "flag"
9 "fmt"
10 "net"
11 "os"
12 "strconv"
13 "strings"
14 "time"
15
16 "github.com/google/uuid"
17 "github.com/packethost/packngo"
18 "golang.org/x/crypto/ssh"
19 "golang.org/x/time/rate"
20 "google.golang.org/protobuf/proto"
21 "k8s.io/klog/v2"
22
23 apb "source.monogon.dev/cloud/agent/api"
24 "source.monogon.dev/cloud/bmaas/bmdb"
25 "source.monogon.dev/cloud/bmaas/bmdb/model"
26 ecl "source.monogon.dev/cloud/shepherd/equinix/wrapngo"
27)
28
29// AgentConfig configures how the Initializer will deploy Agents on machines. In
30// CLI scenarios, this should be populated from flags via RegisterFlags.
31type AgentConfig struct {
32 // Executable is the contents of the agent binary created and run
33 // at the provisioned servers. Must be set.
34 Executable []byte
35
36 // TargetPath is a filesystem destination path used while uploading the BMaaS
37 // agent executable to hosts as part of the initialization process. Must be set.
38 TargetPath string
39
40 // Endpoint is the address Agent will use to contact the BMaaS
41 // infrastructure. Must be set.
42 Endpoint string
43
44 // SSHTimeout is the amount of time set aside for the initializing
45 // SSH session to run its course. Upon timeout, the iteration would be
46 // declared a failure. Must be set.
47 SSHConnectTimeout time.Duration
48 // SSHExecTimeout is the amount of time set aside for executing the agent and
49 // getting its output once the SSH connection has been established. Upon timeout,
50 // the iteration would be declared as failure. Must be set.
51 SSHExecTimeout time.Duration
52}
53
54func (a *AgentConfig) RegisterFlags() {
55 flag.Func("agent_executable_path", "Local filesystem path of agent binary to be uploaded", func(val string) error {
56 if val == "" {
57 return nil
58 }
59 data, err := os.ReadFile(val)
60 if err != nil {
61 return fmt.Errorf("could not read -agent_executable_path: %w", err)
62 }
63 a.Executable = data
64 return nil
65 })
66 flag.StringVar(&a.TargetPath, "agent_target_path", "/root/agent", "Filesystem path where the agent will be uploaded to and ran from")
67 flag.StringVar(&a.Endpoint, "agent_endpoint", "", "Address of BMDB Server to which the agent will attempt to connect")
68 flag.DurationVar(&a.SSHConnectTimeout, "agent_ssh_connect_timeout", 2*time.Second, "Timeout for connecting over SSH to a machine")
69 flag.DurationVar(&a.SSHExecTimeout, "agent_ssh_exec_timeout", 60*time.Second, "Timeout for connecting over SSH to a machine")
70}
71
72// InitializerConfig configures the broad agent initialization process. The
73// specifics of how an agent is started are instead configured in Agent Config. In
74// CLI scenarios, this should be populated from flags via RegisterFlags.
75type InitializerConfig struct {
76 // DBQueryLimiter limits the rate at which BMDB is queried for servers ready
77 // for BMaaS agent initialization. Must be set.
78 DBQueryLimiter *rate.Limiter
79}
80
81// flagLimiter configures a *rate.Limiter as a flag.
82func flagLimiter(l **rate.Limiter, name, defval, help string) {
83 syntax := "'duration,count' eg. '2m,10' for a 10-sized bucket refilled at one token every 2 minutes"
84 help = help + fmt.Sprintf(" (default: %q, syntax: %s)", defval, syntax)
85 flag.Func(name, help, func(val string) error {
86 if val == "" {
87 val = defval
88 }
89 parts := strings.Split(val, ",")
90 if len(parts) != 2 {
91 return fmt.Errorf("invalid syntax, want: %s", syntax)
92 }
93 duration, err := time.ParseDuration(parts[0])
94 if err != nil {
95 return fmt.Errorf("invalid duration: %w", err)
96 }
97 refill, err := strconv.ParseUint(parts[1], 10, 31)
98 if err != nil {
99 return fmt.Errorf("invalid refill rate: %w", err)
100 }
101 *l = rate.NewLimiter(rate.Every(duration), int(refill))
102 return nil
103 })
104 flag.Set(name, defval)
105}
106
107func (i *InitializerConfig) RegisterFlags() {
108 flagLimiter(&i.DBQueryLimiter, "initializer_db_query_rate", "250ms,8", "Rate limiting for BMDB queries")
109}
110
111// Initializer implements the BMaaS agent initialization process. Initialization
112// entails asking the BMDB for machines that need the agent started
113// (or-restarted) and acting upon that.
114type Initializer struct {
115 config *InitializerConfig
116 agentConfig *AgentConfig
117 sharedConfig *SharedConfig
118 sshClient SSHClient
119 // cl is the packngo wrapper used by the initializer.
120 cl ecl.Client
121}
122
123// task describes a single server currently being processed either in the
124// context of agent initialization or recovery.
125type task struct {
126 // id is the BMDB-assigned machine identifier.
127 id uuid.UUID
128 // pid is an identifier assigned by the provider (Equinix).
129 pid uuid.UUID
130 // work is a machine lock facilitated by BMDB that prevents machines from
131 // being processed by multiple workers at the same time.
132 work *bmdb.Work
133 // dev is a provider machine/device record.
134 dev *packngo.Device
135}
136
137// New creates an Initializer instance, checking the InitializerConfig,
138// SharedConfig and AgentConfig for errors.
139func (c *InitializerConfig) New(cl ecl.Client, sc *SharedConfig, ac *AgentConfig) (*Initializer, error) {
140 if err := sc.check(); err != nil {
141 return nil, err
142 }
143 if len(ac.Executable) == 0 {
144 return nil, fmt.Errorf("agent executable not configured")
145 }
146 if ac.TargetPath == "" {
147 return nil, fmt.Errorf("agent target path must be set")
148 }
149 if ac.Endpoint == "" {
150 return nil, fmt.Errorf("agent endpoint must be set")
151 }
152 if ac.SSHConnectTimeout == 0 {
153 return nil, fmt.Errorf("agent SSH connection timeout must be set")
154 }
155 if ac.SSHExecTimeout == 0 {
156 return nil, fmt.Errorf("agent SSH execution timeout must be set")
157 }
158 if c.DBQueryLimiter == nil {
159 return nil, fmt.Errorf("DBQueryLimiter must be configured")
160 }
161 return &Initializer{
162 config: c,
163 sharedConfig: sc,
164 agentConfig: ac,
165 sshClient: &PlainSSHClient{},
166 cl: cl,
167 }, nil
168}
169
170// Run the initializer blocking the current goroutine until the given context
171// expires.
172func (c *Initializer) Run(ctx context.Context, conn *bmdb.Connection) error {
173 signer, err := c.sharedConfig.sshSigner()
174 if err != nil {
175 return fmt.Errorf("could not initialize signer: %w", err)
176 }
177
178 // Maintain a BMDB session as long as possible.
179 var sess *bmdb.Session
180 for {
181 if sess == nil {
182 sess, err = conn.StartSession(ctx)
183 if err != nil {
184 return fmt.Errorf("could not start BMDB session: %w", err)
185 }
186 }
187 // Inside that session, run the main logic.
188 err = c.runInSession(ctx, sess, signer)
189
190 switch {
191 case err == nil:
192 case errors.Is(err, ctx.Err()):
193 return err
194 case errors.Is(err, bmdb.ErrSessionExpired):
195 klog.Errorf("Session expired, restarting...")
196 sess = nil
197 time.Sleep(time.Second)
198 case err != nil:
199 klog.Errorf("Processing failed: %v", err)
200 // TODO(q3k): close session
201 time.Sleep(time.Second)
202 }
203 }
204}
205
206// runInSession executes one iteration of the initializer's control loop within a
207// BMDB session. This control loop attempts to start or re-start the agent on any
208// machines that need this per the BMDB.
209func (c *Initializer) runInSession(ctx context.Context, sess *bmdb.Session, signer ssh.Signer) error {
210 t, err := c.source(ctx, sess)
211 if err != nil {
212 return fmt.Errorf("could not source machine: %w", err)
213 }
214 if t == nil {
215 return nil
216 }
217 defer t.work.Cancel(ctx)
218
Serge Bazanski10383132023-02-20 15:39:45 +0100219 klog.Infof("Machine %q needs agent start, fetching corresponding packngo device %q...", t.id, t.pid)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100220 dev, err := c.cl.GetDevice(ctx, c.sharedConfig.ProjectId, t.pid.String())
221 if err != nil {
222 klog.Errorf("failed to fetch device %q: %v", t.pid, err)
223 d := 30 * time.Second
224 err = t.work.Fail(ctx, &d, "failed to fetch device from equinix")
225 return err
226 }
227 t.dev = dev
228
229 err = c.init(ctx, signer, t)
230 if err != nil {
231 klog.Errorf("Failed to initialize: %v", err)
232 d := 1 * time.Minute
233 err = t.work.Fail(ctx, &d, fmt.Sprintf("failed to initialize machine: %v", err))
234 return err
235 }
236 return nil
237}
238
239// startAgent runs the agent executable on the target device d, returning the
240// agent's public key on success.
241func (i *Initializer) startAgent(ctx context.Context, sgn ssh.Signer, d packngo.Device) ([]byte, error) {
242 // Provide a bound on execution time in case we get stuck after the SSH
243 // connection is established.
244 sctx, sctxC := context.WithTimeout(ctx, i.agentConfig.SSHExecTimeout)
245 defer sctxC()
246
247 // Use the device's IP address exposed by Equinix API.
248 ni := d.GetNetworkInfo()
249 var addr string
250 if ni.PublicIPv4 != "" {
251 addr = net.JoinHostPort(ni.PublicIPv4, "22")
252 } else if ni.PublicIPv6 != "" {
253 addr = net.JoinHostPort(ni.PublicIPv6, "22")
254 } else {
255 return nil, fmt.Errorf("device (ID: %s) has no available addresses", d.ID)
256 }
257 klog.V(1).Infof("Dialing device (provider ID: %s, addr: %s).", d.ID, addr)
258
259 conn, err := i.sshClient.Dial(sctx, addr, "root", sgn, i.agentConfig.SSHConnectTimeout)
260 if err != nil {
261 return nil, fmt.Errorf("while dialing the device: %w", err)
262 }
263 defer conn.Close()
264
265 // Upload the agent executable.
266
267 klog.Infof("Uploading the agent executable (provider ID: %s, addr: %s).", d.ID, addr)
268 if err := conn.Upload(sctx, i.agentConfig.TargetPath, i.agentConfig.Executable); err != nil {
269 return nil, fmt.Errorf("while uploading agent executable: %w", err)
270 }
271 klog.V(1).Infof("Upload successful (provider ID: %s, addr: %s).", d.ID, addr)
272
273 // The initialization protobuf message will be sent to the agent on its
274 // standard input.
275 imsg := apb.TakeoverInit{
276 Provider: "equinix",
277 ProviderId: d.ID,
278 BmaasEndpoint: i.agentConfig.Endpoint,
279 }
280 imsgb, err := proto.Marshal(&imsg)
281 if err != nil {
282 return nil, fmt.Errorf("while marshaling agent message: %w", err)
283 }
284
285 // Start the agent and wait for the agent's output to arrive.
286 klog.V(1).Infof("Starting the agent executable at path %q (provider ID: %s).", i.agentConfig.TargetPath, d.ID)
287 stdout, stderr, err := conn.Execute(ctx, i.agentConfig.TargetPath, imsgb)
288 stderrStr := strings.TrimSpace(string(stderr))
289 if stderrStr != "" {
290 klog.Warningf("Agent stderr: %q", stderrStr)
291 }
292 if err != nil {
293 return nil, fmt.Errorf("while starting the agent executable: %w", err)
294 }
295
296 var arsp apb.TakeoverResponse
297 if err := proto.Unmarshal(stdout, &arsp); err != nil {
298 return nil, fmt.Errorf("agent reply couldn't be unmarshaled: %w", err)
299 }
300 if !proto.Equal(&imsg, arsp.InitMessage) {
301 return nil, fmt.Errorf("agent did not send back the init message.")
302 }
303 if len(arsp.Key) != ed25519.PublicKeySize {
304 return nil, fmt.Errorf("agent key length mismatch.")
305 }
306 klog.Infof("Started the agent (provider ID: %s, key: %s).", d.ID, hex.EncodeToString(arsp.Key))
307 return arsp.Key, nil
308}
309
310// init initializes the server described by t, using BMDB session 'sess' to set
311// the relevant BMDB tag on success, and 'sgn' to authenticate to the server.
312func (ir *Initializer) init(ctx context.Context, sgn ssh.Signer, t *task) error {
313 // Start the agent.
314 klog.Infof("Starting agent on device (ID: %s, PID %s)", t.id, t.pid)
315 apk, err := ir.startAgent(ctx, sgn, *t.dev)
316 if err != nil {
Serge Bazanski10383132023-02-20 15:39:45 +0100317 return fmt.Errorf("while starting the agent: %w", err)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100318 }
319
320 // Agent startup succeeded. Set the appropriate BMDB tag, and release the
321 // lock.
322 klog.Infof("Setting AgentStarted (ID: %s, PID: %s, Agent public key: %s).", t.id, t.pid, hex.EncodeToString(apk))
323 err = t.work.Finish(ctx, func(q *model.Queries) error {
324 return q.MachineSetAgentStarted(ctx, model.MachineSetAgentStartedParams{
325 MachineID: t.id,
326 AgentStartedAt: time.Now(),
327 AgentPublicKey: apk,
328 })
329 })
330 if err != nil {
331 return fmt.Errorf("while setting AgentStarted tag: %w", err)
332 }
333 return nil
334}
335
336// source supplies returns a BMDB-locked server ready for initialization, locked
337// by a work item. If both task and error are nil, then there are no machines
338// needed to be initialized.
339// The returned work item in task _must_ be canceled or finished by the caller.
340func (ir *Initializer) source(ctx context.Context, sess *bmdb.Session) (*task, error) {
341 ir.config.DBQueryLimiter.Wait(ctx)
342
343 var machine *model.MachineProvided
Serge Bazanski10383132023-02-20 15:39:45 +0100344 work, err := sess.Work(ctx, model.ProcessShepherdAccess, func(q *model.Queries) ([]uuid.UUID, error) {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100345 machines, err := q.GetMachinesForAgentStart(ctx, 1)
346 if err != nil {
347 return nil, err
348 }
349 if len(machines) < 1 {
350 return nil, bmdb.ErrNothingToDo
351 }
352 machine = &machines[0]
353 return []uuid.UUID{machines[0].MachineID}, nil
354 })
355
356 if errors.Is(err, bmdb.ErrNothingToDo) {
357 return nil, nil
358 }
359
360 if err != nil {
361 return nil, fmt.Errorf("while querying BMDB agent candidates: %w", err)
362 }
363
364 pid, err := uuid.Parse(machine.ProviderID)
365 if err != nil {
366 t := time.Hour
367 work.Fail(ctx, &t, fmt.Sprintf("could not parse provider UUID %q", machine.ProviderID))
368 return nil, fmt.Errorf("while parsing provider UUID: %w", err)
369 }
370
371 return &task{
372 id: machine.MachineID,
373 pid: pid,
374 work: work,
375 }, nil
376}