blob: 4193f4c21b579948c74e6755fa7fc8e1b7bb3798 [file] [log] [blame]
Serge Bazanskicaa12082023-02-16 14:54:04 +01001package manager
2
3import (
4 "context"
5 "crypto/ed25519"
Serge Bazanski51987d62023-04-06 16:35:35 +02006 "crypto/x509"
Serge Bazanskicaa12082023-02-16 14:54:04 +01007 "encoding/hex"
Serge Bazanski51987d62023-04-06 16:35:35 +02008 "encoding/pem"
Serge Bazanskicaa12082023-02-16 14:54:04 +01009 "flag"
10 "fmt"
11 "net"
12 "os"
Serge Bazanskicaa12082023-02-16 14:54:04 +010013 "strings"
14 "time"
15
16 "github.com/google/uuid"
17 "github.com/packethost/packngo"
18 "golang.org/x/crypto/ssh"
Serge Bazanskicaa12082023-02-16 14:54:04 +010019 "google.golang.org/protobuf/proto"
20 "k8s.io/klog/v2"
21
22 apb "source.monogon.dev/cloud/agent/api"
Serge Bazanskicaa12082023-02-16 14:54:04 +010023 "source.monogon.dev/cloud/bmaas/bmdb/model"
24 ecl "source.monogon.dev/cloud/shepherd/equinix/wrapngo"
25)
26
Serge Bazanski86a714d2023-04-17 15:54:21 +020027// InitializerConfig configures how the Initializer will deploy Agents on
28// machines. In CLI scenarios, this should be populated from flags via
29// RegisterFlags.
30type InitializerConfig struct {
31 ControlLoopConfig
32
Serge Bazanskicaa12082023-02-16 14:54:04 +010033 // Executable is the contents of the agent binary created and run
34 // at the provisioned servers. Must be set.
35 Executable []byte
36
37 // TargetPath is a filesystem destination path used while uploading the BMaaS
38 // agent executable to hosts as part of the initialization process. Must be set.
39 TargetPath string
40
41 // Endpoint is the address Agent will use to contact the BMaaS
42 // infrastructure. Must be set.
43 Endpoint string
Serge Bazanski51987d62023-04-06 16:35:35 +020044 // EndpointCACertificate is an optional DER-encoded (but not PEM-armored) X509
45 // certificate used to populate the trusted CA store of the agent. It should be
46 // set to the CA certificate of the endpoint if not using a system-trusted CA
47 // certificate.
48 EndpointCACertificate []byte
Serge Bazanskicaa12082023-02-16 14:54:04 +010049
50 // SSHTimeout is the amount of time set aside for the initializing
51 // SSH session to run its course. Upon timeout, the iteration would be
52 // declared a failure. Must be set.
53 SSHConnectTimeout time.Duration
54 // SSHExecTimeout is the amount of time set aside for executing the agent and
55 // getting its output once the SSH connection has been established. Upon timeout,
56 // the iteration would be declared as failure. Must be set.
57 SSHExecTimeout time.Duration
58}
59
Serge Bazanski86a714d2023-04-17 15:54:21 +020060func (a *InitializerConfig) RegisterFlags() {
61 a.ControlLoopConfig.RegisterFlags("initializer")
62
Serge Bazanskicaa12082023-02-16 14:54:04 +010063 flag.Func("agent_executable_path", "Local filesystem path of agent binary to be uploaded", func(val string) error {
64 if val == "" {
65 return nil
66 }
67 data, err := os.ReadFile(val)
68 if err != nil {
Serge Bazanski77b11d32023-04-06 14:43:19 +020069 return fmt.Errorf("could not read: %w", err)
Serge Bazanskicaa12082023-02-16 14:54:04 +010070 }
71 a.Executable = data
72 return nil
73 })
74 flag.StringVar(&a.TargetPath, "agent_target_path", "/root/agent", "Filesystem path where the agent will be uploaded to and ran from")
75 flag.StringVar(&a.Endpoint, "agent_endpoint", "", "Address of BMDB Server to which the agent will attempt to connect")
Serge Bazanski51987d62023-04-06 16:35:35 +020076 flag.Func("agent_endpoint_ca_certificate_path", "Path to PEM X509 CA certificate that the agent endpoint is serving with. If not set, the agent will attempt to use system CA certificates to authenticate the endpoint.", func(val string) error {
77 if val == "" {
78 return nil
79 }
80 data, err := os.ReadFile(val)
81 if err != nil {
82 return fmt.Errorf("could not read: %w", err)
83 }
84 block, _ := pem.Decode(data)
85 if block.Type != "CERTIFICATE" {
86 return fmt.Errorf("not a certificate")
87 }
88 _, err = x509.ParseCertificate(block.Bytes)
89 if err != nil {
90 return fmt.Errorf("invalid certificate: %w", err)
91 }
92 a.EndpointCACertificate = block.Bytes
93 return nil
94 })
Serge Bazanskicaa12082023-02-16 14:54:04 +010095 flag.DurationVar(&a.SSHConnectTimeout, "agent_ssh_connect_timeout", 2*time.Second, "Timeout for connecting over SSH to a machine")
96 flag.DurationVar(&a.SSHExecTimeout, "agent_ssh_exec_timeout", 60*time.Second, "Timeout for connecting over SSH to a machine")
97}
98
Serge Bazanski86a714d2023-04-17 15:54:21 +020099// The Initializer starts the agent on machines that aren't yet running it.
Serge Bazanskicaa12082023-02-16 14:54:04 +0100100type Initializer struct {
Serge Bazanski86a714d2023-04-17 15:54:21 +0200101 InitializerConfig
102
Serge Bazanskicaa12082023-02-16 14:54:04 +0100103 sharedConfig *SharedConfig
Serge Bazanski86a714d2023-04-17 15:54:21 +0200104 cl ecl.Client
105 signer ssh.Signer
Serge Bazanskicaa12082023-02-16 14:54:04 +0100106 sshClient SSHClient
Serge Bazanskicaa12082023-02-16 14:54:04 +0100107}
108
Serge Bazanski86a714d2023-04-17 15:54:21 +0200109// NewInitializer creates an Initializer instance, checking the
110// InitializerConfig, SharedConfig and AgentConfig for errors.
111func NewInitializer(cl ecl.Client, ic InitializerConfig, sc *SharedConfig) (*Initializer, error) {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100112 if err := sc.check(); err != nil {
113 return nil, err
114 }
Serge Bazanski86a714d2023-04-17 15:54:21 +0200115 if err := ic.ControlLoopConfig.Check(); err != nil {
116 return nil, err
117 }
118
119 if len(ic.Executable) == 0 {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100120 return nil, fmt.Errorf("agent executable not configured")
121 }
Serge Bazanski86a714d2023-04-17 15:54:21 +0200122 if ic.TargetPath == "" {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100123 return nil, fmt.Errorf("agent target path must be set")
124 }
Serge Bazanski86a714d2023-04-17 15:54:21 +0200125 if ic.Endpoint == "" {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100126 return nil, fmt.Errorf("agent endpoint must be set")
127 }
Serge Bazanski86a714d2023-04-17 15:54:21 +0200128 if ic.SSHConnectTimeout == 0 {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100129 return nil, fmt.Errorf("agent SSH connection timeout must be set")
130 }
Serge Bazanski86a714d2023-04-17 15:54:21 +0200131 if ic.SSHExecTimeout == 0 {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100132 return nil, fmt.Errorf("agent SSH execution timeout must be set")
133 }
Serge Bazanski86a714d2023-04-17 15:54:21 +0200134
135 signer, err := sc.sshSigner()
136 if err != nil {
137 return nil, fmt.Errorf("could not initialize signer: %w", err)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100138 }
Serge Bazanski86a714d2023-04-17 15:54:21 +0200139
Serge Bazanskicaa12082023-02-16 14:54:04 +0100140 return &Initializer{
Serge Bazanski86a714d2023-04-17 15:54:21 +0200141 InitializerConfig: ic,
142
Serge Bazanskicaa12082023-02-16 14:54:04 +0100143 sharedConfig: sc,
Serge Bazanskicaa12082023-02-16 14:54:04 +0100144 cl: cl,
Serge Bazanski86a714d2023-04-17 15:54:21 +0200145 sshClient: &PlainSSHClient{},
146 signer: signer,
Serge Bazanskicaa12082023-02-16 14:54:04 +0100147 }, nil
148}
149
Serge Bazanski86a714d2023-04-17 15:54:21 +0200150func (c *Initializer) getMachines(ctx context.Context, q *model.Queries, limit int32) ([]model.MachineProvided, error) {
151 return q.GetMachinesForAgentStart(ctx, limit)
Serge Bazanski9eb903d2023-02-20 14:28:19 +0100152}
153
Serge Bazanski86a714d2023-04-17 15:54:21 +0200154func (c *Initializer) processMachine(ctx context.Context, t *task) error {
155 dev, err := c.cl.GetDevice(ctx, c.sharedConfig.ProjectId, t.machine.ProviderID)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100156 if err != nil {
Serge Bazanski86a714d2023-04-17 15:54:21 +0200157 klog.Errorf("failed to fetch device %q: %v", t.machine.ProviderID, err)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100158 d := 30 * time.Second
159 err = t.work.Fail(ctx, &d, "failed to fetch device from equinix")
160 return err
161 }
Serge Bazanskicaa12082023-02-16 14:54:04 +0100162
Serge Bazanski86a714d2023-04-17 15:54:21 +0200163 // Start the agent.
164 klog.Infof("Starting agent on device (ID: %s, PID %s)", t.machine.MachineID, t.machine.ProviderID)
165 apk, err := c.startAgent(ctx, c.signer, dev, t.machine.MachineID)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100166 if err != nil {
Serge Bazanski86a714d2023-04-17 15:54:21 +0200167 return fmt.Errorf("while starting the agent: %w", err)
168 }
169
170 // Agent startup succeeded. Set the appropriate BMDB tag, and release the
171 // lock.
172 klog.Infof("Setting AgentStarted (ID: %s, PID: %s, Agent public key: %s).", t.machine.MachineID, t.machine.ProviderID, hex.EncodeToString(apk))
173 err = t.work.Finish(ctx, func(q *model.Queries) error {
174 return q.MachineSetAgentStarted(ctx, model.MachineSetAgentStartedParams{
175 MachineID: t.machine.MachineID,
176 AgentStartedAt: time.Now(),
177 AgentPublicKey: apk,
178 })
179 })
180 if err != nil {
181 return fmt.Errorf("while setting AgentStarted tag: %w", err)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100182 }
183 return nil
184}
185
186// startAgent runs the agent executable on the target device d, returning the
187// agent's public key on success.
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100188func (i *Initializer) startAgent(ctx context.Context, sgn ssh.Signer, d *packngo.Device, mid uuid.UUID) ([]byte, error) {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100189 // Provide a bound on execution time in case we get stuck after the SSH
190 // connection is established.
Serge Bazanski86a714d2023-04-17 15:54:21 +0200191 sctx, sctxC := context.WithTimeout(ctx, i.SSHExecTimeout)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100192 defer sctxC()
193
194 // Use the device's IP address exposed by Equinix API.
195 ni := d.GetNetworkInfo()
196 var addr string
197 if ni.PublicIPv4 != "" {
198 addr = net.JoinHostPort(ni.PublicIPv4, "22")
199 } else if ni.PublicIPv6 != "" {
200 addr = net.JoinHostPort(ni.PublicIPv6, "22")
201 } else {
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100202 return nil, fmt.Errorf("device (machine ID: %s) has no available addresses", mid)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100203 }
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100204 klog.V(1).Infof("Dialing device (machine ID: %s, addr: %s).", mid, addr)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100205
Serge Bazanski86a714d2023-04-17 15:54:21 +0200206 conn, err := i.sshClient.Dial(sctx, addr, "root", sgn, i.SSHConnectTimeout)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100207 if err != nil {
208 return nil, fmt.Errorf("while dialing the device: %w", err)
209 }
210 defer conn.Close()
211
212 // Upload the agent executable.
213
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100214 klog.Infof("Uploading the agent executable (machine ID: %s, addr: %s).", mid, addr)
Serge Bazanski86a714d2023-04-17 15:54:21 +0200215 if err := conn.Upload(sctx, i.TargetPath, i.Executable); err != nil {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100216 return nil, fmt.Errorf("while uploading agent executable: %w", err)
217 }
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100218 klog.V(1).Infof("Upload successful (machine ID: %s, addr: %s).", mid, addr)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100219
220 // The initialization protobuf message will be sent to the agent on its
221 // standard input.
222 imsg := apb.TakeoverInit{
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100223 MachineId: mid.String(),
Serge Bazanski86a714d2023-04-17 15:54:21 +0200224 BmaasEndpoint: i.Endpoint,
225 CaCertificate: i.EndpointCACertificate,
Serge Bazanskicaa12082023-02-16 14:54:04 +0100226 }
227 imsgb, err := proto.Marshal(&imsg)
228 if err != nil {
229 return nil, fmt.Errorf("while marshaling agent message: %w", err)
230 }
231
232 // Start the agent and wait for the agent's output to arrive.
Serge Bazanski86a714d2023-04-17 15:54:21 +0200233 klog.V(1).Infof("Starting the agent executable at path %q (machine ID: %s).", i.TargetPath, mid)
234 stdout, stderr, err := conn.Execute(ctx, i.TargetPath, imsgb)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100235 stderrStr := strings.TrimSpace(string(stderr))
236 if stderrStr != "" {
237 klog.Warningf("Agent stderr: %q", stderrStr)
238 }
239 if err != nil {
240 return nil, fmt.Errorf("while starting the agent executable: %w", err)
241 }
242
243 var arsp apb.TakeoverResponse
244 if err := proto.Unmarshal(stdout, &arsp); err != nil {
245 return nil, fmt.Errorf("agent reply couldn't be unmarshaled: %w", err)
246 }
Lorenz Brun595dfe92023-02-21 19:13:02 +0100247 var successResp *apb.TakeoverSuccess
248 switch r := arsp.Result.(type) {
249 case *apb.TakeoverResponse_Error:
250 return nil, fmt.Errorf("agent returned error: %v", r.Error.Message)
251 case *apb.TakeoverResponse_Success:
252 successResp = r.Success
253 default:
254 return nil, fmt.Errorf("agent returned unknown result of type %T", arsp.Result)
255 }
256 if !proto.Equal(&imsg, successResp.InitMessage) {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100257 return nil, fmt.Errorf("agent did not send back the init message.")
258 }
Lorenz Brun595dfe92023-02-21 19:13:02 +0100259 if len(successResp.Key) != ed25519.PublicKeySize {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100260 return nil, fmt.Errorf("agent key length mismatch.")
261 }
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100262 klog.Infof("Started the agent (machine ID: %s, key: %s).", mid, hex.EncodeToString(successResp.Key))
Lorenz Brun595dfe92023-02-21 19:13:02 +0100263 return successResp.Key, nil
Serge Bazanskicaa12082023-02-16 14:54:04 +0100264}