blob: c90f8d88c7c53ad1e41e83f94697472aa517d542 [file] [log] [blame]
Serge Bazanskicaa12082023-02-16 14:54:04 +01001package manager
2
3import (
4 "context"
5 "crypto/ed25519"
Serge Bazanski51987d62023-04-06 16:35:35 +02006 "crypto/x509"
Serge Bazanskicaa12082023-02-16 14:54:04 +01007 "encoding/hex"
Serge Bazanski51987d62023-04-06 16:35:35 +02008 "encoding/pem"
Serge Bazanskicaa12082023-02-16 14:54:04 +01009 "flag"
10 "fmt"
11 "net"
12 "os"
Serge Bazanskicaa12082023-02-16 14:54:04 +010013 "strings"
14 "time"
15
16 "github.com/google/uuid"
17 "github.com/packethost/packngo"
18 "golang.org/x/crypto/ssh"
Serge Bazanskicaa12082023-02-16 14:54:04 +010019 "google.golang.org/protobuf/proto"
20 "k8s.io/klog/v2"
21
22 apb "source.monogon.dev/cloud/agent/api"
Serge Bazanski00cf57d2023-04-20 11:19:00 +020023 "source.monogon.dev/cloud/bmaas/bmdb"
Serge Bazanskicaa12082023-02-16 14:54:04 +010024 "source.monogon.dev/cloud/bmaas/bmdb/model"
25 ecl "source.monogon.dev/cloud/shepherd/equinix/wrapngo"
26)
27
Serge Bazanski86a714d2023-04-17 15:54:21 +020028// InitializerConfig configures how the Initializer will deploy Agents on
29// machines. In CLI scenarios, this should be populated from flags via
30// RegisterFlags.
31type InitializerConfig struct {
32 ControlLoopConfig
33
Serge Bazanskicaa12082023-02-16 14:54:04 +010034 // Executable is the contents of the agent binary created and run
35 // at the provisioned servers. Must be set.
36 Executable []byte
37
38 // TargetPath is a filesystem destination path used while uploading the BMaaS
39 // agent executable to hosts as part of the initialization process. Must be set.
40 TargetPath string
41
42 // Endpoint is the address Agent will use to contact the BMaaS
43 // infrastructure. Must be set.
44 Endpoint string
Serge Bazanski51987d62023-04-06 16:35:35 +020045 // EndpointCACertificate is an optional DER-encoded (but not PEM-armored) X509
46 // certificate used to populate the trusted CA store of the agent. It should be
47 // set to the CA certificate of the endpoint if not using a system-trusted CA
48 // certificate.
49 EndpointCACertificate []byte
Serge Bazanskicaa12082023-02-16 14:54:04 +010050
51 // SSHTimeout is the amount of time set aside for the initializing
52 // SSH session to run its course. Upon timeout, the iteration would be
53 // declared a failure. Must be set.
54 SSHConnectTimeout time.Duration
55 // SSHExecTimeout is the amount of time set aside for executing the agent and
56 // getting its output once the SSH connection has been established. Upon timeout,
57 // the iteration would be declared as failure. Must be set.
58 SSHExecTimeout time.Duration
59}
60
Serge Bazanski86a714d2023-04-17 15:54:21 +020061func (a *InitializerConfig) RegisterFlags() {
62 a.ControlLoopConfig.RegisterFlags("initializer")
63
Serge Bazanskicaa12082023-02-16 14:54:04 +010064 flag.Func("agent_executable_path", "Local filesystem path of agent binary to be uploaded", func(val string) error {
65 if val == "" {
66 return nil
67 }
68 data, err := os.ReadFile(val)
69 if err != nil {
Serge Bazanski77b11d32023-04-06 14:43:19 +020070 return fmt.Errorf("could not read: %w", err)
Serge Bazanskicaa12082023-02-16 14:54:04 +010071 }
72 a.Executable = data
73 return nil
74 })
75 flag.StringVar(&a.TargetPath, "agent_target_path", "/root/agent", "Filesystem path where the agent will be uploaded to and ran from")
76 flag.StringVar(&a.Endpoint, "agent_endpoint", "", "Address of BMDB Server to which the agent will attempt to connect")
Serge Bazanski51987d62023-04-06 16:35:35 +020077 flag.Func("agent_endpoint_ca_certificate_path", "Path to PEM X509 CA certificate that the agent endpoint is serving with. If not set, the agent will attempt to use system CA certificates to authenticate the endpoint.", func(val string) error {
78 if val == "" {
79 return nil
80 }
81 data, err := os.ReadFile(val)
82 if err != nil {
83 return fmt.Errorf("could not read: %w", err)
84 }
85 block, _ := pem.Decode(data)
86 if block.Type != "CERTIFICATE" {
87 return fmt.Errorf("not a certificate")
88 }
89 _, err = x509.ParseCertificate(block.Bytes)
90 if err != nil {
91 return fmt.Errorf("invalid certificate: %w", err)
92 }
93 a.EndpointCACertificate = block.Bytes
94 return nil
95 })
Serge Bazanskicaa12082023-02-16 14:54:04 +010096 flag.DurationVar(&a.SSHConnectTimeout, "agent_ssh_connect_timeout", 2*time.Second, "Timeout for connecting over SSH to a machine")
97 flag.DurationVar(&a.SSHExecTimeout, "agent_ssh_exec_timeout", 60*time.Second, "Timeout for connecting over SSH to a machine")
98}
99
Serge Bazanski86a714d2023-04-17 15:54:21 +0200100// The Initializer starts the agent on machines that aren't yet running it.
Serge Bazanskicaa12082023-02-16 14:54:04 +0100101type Initializer struct {
Serge Bazanski86a714d2023-04-17 15:54:21 +0200102 InitializerConfig
103
Serge Bazanskicaa12082023-02-16 14:54:04 +0100104 sharedConfig *SharedConfig
Serge Bazanski86a714d2023-04-17 15:54:21 +0200105 cl ecl.Client
106 signer ssh.Signer
Serge Bazanskicaa12082023-02-16 14:54:04 +0100107 sshClient SSHClient
Serge Bazanskicaa12082023-02-16 14:54:04 +0100108}
109
Serge Bazanski86a714d2023-04-17 15:54:21 +0200110// NewInitializer creates an Initializer instance, checking the
111// InitializerConfig, SharedConfig and AgentConfig for errors.
112func NewInitializer(cl ecl.Client, ic InitializerConfig, sc *SharedConfig) (*Initializer, error) {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100113 if err := sc.check(); err != nil {
114 return nil, err
115 }
Serge Bazanski86a714d2023-04-17 15:54:21 +0200116 if err := ic.ControlLoopConfig.Check(); err != nil {
117 return nil, err
118 }
119
120 if len(ic.Executable) == 0 {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100121 return nil, fmt.Errorf("agent executable not configured")
122 }
Serge Bazanski86a714d2023-04-17 15:54:21 +0200123 if ic.TargetPath == "" {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100124 return nil, fmt.Errorf("agent target path must be set")
125 }
Serge Bazanski86a714d2023-04-17 15:54:21 +0200126 if ic.Endpoint == "" {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100127 return nil, fmt.Errorf("agent endpoint must be set")
128 }
Serge Bazanski86a714d2023-04-17 15:54:21 +0200129 if ic.SSHConnectTimeout == 0 {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100130 return nil, fmt.Errorf("agent SSH connection timeout must be set")
131 }
Serge Bazanski86a714d2023-04-17 15:54:21 +0200132 if ic.SSHExecTimeout == 0 {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100133 return nil, fmt.Errorf("agent SSH execution timeout must be set")
134 }
Serge Bazanski86a714d2023-04-17 15:54:21 +0200135
136 signer, err := sc.sshSigner()
137 if err != nil {
138 return nil, fmt.Errorf("could not initialize signer: %w", err)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100139 }
Serge Bazanski86a714d2023-04-17 15:54:21 +0200140
Serge Bazanskicaa12082023-02-16 14:54:04 +0100141 return &Initializer{
Serge Bazanski86a714d2023-04-17 15:54:21 +0200142 InitializerConfig: ic,
143
Serge Bazanskicaa12082023-02-16 14:54:04 +0100144 sharedConfig: sc,
Serge Bazanskicaa12082023-02-16 14:54:04 +0100145 cl: cl,
Serge Bazanski86a714d2023-04-17 15:54:21 +0200146 sshClient: &PlainSSHClient{},
147 signer: signer,
Serge Bazanskicaa12082023-02-16 14:54:04 +0100148 }, nil
149}
150
Serge Bazanski00cf57d2023-04-20 11:19:00 +0200151func (c *Initializer) getProcessInfo() processInfo {
152 return processInfo{
153 process: model.ProcessShepherdAgentStart,
154 defaultBackoff: bmdb.Backoff{
155 Initial: 5 * time.Minute,
156 Maximum: 4 * time.Hour,
157 Exponent: 1.2,
158 },
159 }
160}
161
Serge Bazanski86a714d2023-04-17 15:54:21 +0200162func (c *Initializer) getMachines(ctx context.Context, q *model.Queries, limit int32) ([]model.MachineProvided, error) {
163 return q.GetMachinesForAgentStart(ctx, limit)
Serge Bazanski9eb903d2023-02-20 14:28:19 +0100164}
165
Serge Bazanski86a714d2023-04-17 15:54:21 +0200166func (c *Initializer) processMachine(ctx context.Context, t *task) error {
Serge Bazanski4969fd72023-04-19 17:43:12 +0200167 dev, err := c.cl.GetDevice(ctx, c.sharedConfig.ProjectId, t.machine.ProviderID, nil)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100168 if err != nil {
Serge Bazanski20312b42023-04-19 13:49:47 +0200169 return fmt.Errorf("while fetching device %q: %v", t.machine.ProviderID, err)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100170 }
Serge Bazanskicaa12082023-02-16 14:54:04 +0100171
Serge Bazanski86a714d2023-04-17 15:54:21 +0200172 // Start the agent.
173 klog.Infof("Starting agent on device (ID: %s, PID %s)", t.machine.MachineID, t.machine.ProviderID)
174 apk, err := c.startAgent(ctx, c.signer, dev, t.machine.MachineID)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100175 if err != nil {
Serge Bazanski86a714d2023-04-17 15:54:21 +0200176 return fmt.Errorf("while starting the agent: %w", err)
177 }
178
179 // Agent startup succeeded. Set the appropriate BMDB tag, and release the
180 // lock.
181 klog.Infof("Setting AgentStarted (ID: %s, PID: %s, Agent public key: %s).", t.machine.MachineID, t.machine.ProviderID, hex.EncodeToString(apk))
182 err = t.work.Finish(ctx, func(q *model.Queries) error {
183 return q.MachineSetAgentStarted(ctx, model.MachineSetAgentStartedParams{
184 MachineID: t.machine.MachineID,
185 AgentStartedAt: time.Now(),
186 AgentPublicKey: apk,
187 })
188 })
189 if err != nil {
190 return fmt.Errorf("while setting AgentStarted tag: %w", err)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100191 }
192 return nil
193}
194
195// startAgent runs the agent executable on the target device d, returning the
196// agent's public key on success.
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100197func (i *Initializer) startAgent(ctx context.Context, sgn ssh.Signer, d *packngo.Device, mid uuid.UUID) ([]byte, error) {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100198 // Provide a bound on execution time in case we get stuck after the SSH
199 // connection is established.
Serge Bazanski86a714d2023-04-17 15:54:21 +0200200 sctx, sctxC := context.WithTimeout(ctx, i.SSHExecTimeout)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100201 defer sctxC()
202
203 // Use the device's IP address exposed by Equinix API.
204 ni := d.GetNetworkInfo()
205 var addr string
206 if ni.PublicIPv4 != "" {
207 addr = net.JoinHostPort(ni.PublicIPv4, "22")
208 } else if ni.PublicIPv6 != "" {
209 addr = net.JoinHostPort(ni.PublicIPv6, "22")
210 } else {
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100211 return nil, fmt.Errorf("device (machine ID: %s) has no available addresses", mid)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100212 }
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100213 klog.V(1).Infof("Dialing device (machine ID: %s, addr: %s).", mid, addr)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100214
Serge Bazanski86a714d2023-04-17 15:54:21 +0200215 conn, err := i.sshClient.Dial(sctx, addr, "root", sgn, i.SSHConnectTimeout)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100216 if err != nil {
217 return nil, fmt.Errorf("while dialing the device: %w", err)
218 }
219 defer conn.Close()
220
221 // Upload the agent executable.
222
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100223 klog.Infof("Uploading the agent executable (machine ID: %s, addr: %s).", mid, addr)
Serge Bazanski86a714d2023-04-17 15:54:21 +0200224 if err := conn.Upload(sctx, i.TargetPath, i.Executable); err != nil {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100225 return nil, fmt.Errorf("while uploading agent executable: %w", err)
226 }
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100227 klog.V(1).Infof("Upload successful (machine ID: %s, addr: %s).", mid, addr)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100228
229 // The initialization protobuf message will be sent to the agent on its
230 // standard input.
231 imsg := apb.TakeoverInit{
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100232 MachineId: mid.String(),
Serge Bazanski86a714d2023-04-17 15:54:21 +0200233 BmaasEndpoint: i.Endpoint,
234 CaCertificate: i.EndpointCACertificate,
Serge Bazanskicaa12082023-02-16 14:54:04 +0100235 }
236 imsgb, err := proto.Marshal(&imsg)
237 if err != nil {
238 return nil, fmt.Errorf("while marshaling agent message: %w", err)
239 }
240
241 // Start the agent and wait for the agent's output to arrive.
Serge Bazanski86a714d2023-04-17 15:54:21 +0200242 klog.V(1).Infof("Starting the agent executable at path %q (machine ID: %s).", i.TargetPath, mid)
243 stdout, stderr, err := conn.Execute(ctx, i.TargetPath, imsgb)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100244 stderrStr := strings.TrimSpace(string(stderr))
245 if stderrStr != "" {
246 klog.Warningf("Agent stderr: %q", stderrStr)
247 }
248 if err != nil {
249 return nil, fmt.Errorf("while starting the agent executable: %w", err)
250 }
251
252 var arsp apb.TakeoverResponse
253 if err := proto.Unmarshal(stdout, &arsp); err != nil {
254 return nil, fmt.Errorf("agent reply couldn't be unmarshaled: %w", err)
255 }
Lorenz Brun595dfe92023-02-21 19:13:02 +0100256 var successResp *apb.TakeoverSuccess
257 switch r := arsp.Result.(type) {
258 case *apb.TakeoverResponse_Error:
259 return nil, fmt.Errorf("agent returned error: %v", r.Error.Message)
260 case *apb.TakeoverResponse_Success:
261 successResp = r.Success
262 default:
263 return nil, fmt.Errorf("agent returned unknown result of type %T", arsp.Result)
264 }
265 if !proto.Equal(&imsg, successResp.InitMessage) {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100266 return nil, fmt.Errorf("agent did not send back the init message.")
267 }
Lorenz Brun595dfe92023-02-21 19:13:02 +0100268 if len(successResp.Key) != ed25519.PublicKeySize {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100269 return nil, fmt.Errorf("agent key length mismatch.")
270 }
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100271 klog.Infof("Started the agent (machine ID: %s, key: %s).", mid, hex.EncodeToString(successResp.Key))
Lorenz Brun595dfe92023-02-21 19:13:02 +0100272 return successResp.Key, nil
Serge Bazanskicaa12082023-02-16 14:54:04 +0100273}