blob: bd1dbba5452b00fe71e2ee7ea09900decc15100a [file] [log] [blame]
Serge Bazanskicaa12082023-02-16 14:54:04 +01001package manager
2
3import (
4 "context"
5 "crypto/ed25519"
Serge Bazanski51987d62023-04-06 16:35:35 +02006 "crypto/x509"
Serge Bazanskicaa12082023-02-16 14:54:04 +01007 "encoding/hex"
Serge Bazanski51987d62023-04-06 16:35:35 +02008 "encoding/pem"
Serge Bazanskicaa12082023-02-16 14:54:04 +01009 "flag"
10 "fmt"
11 "net"
12 "os"
Serge Bazanskicaa12082023-02-16 14:54:04 +010013 "strings"
14 "time"
15
16 "github.com/google/uuid"
17 "github.com/packethost/packngo"
18 "golang.org/x/crypto/ssh"
Serge Bazanskicaa12082023-02-16 14:54:04 +010019 "google.golang.org/protobuf/proto"
20 "k8s.io/klog/v2"
21
22 apb "source.monogon.dev/cloud/agent/api"
Serge Bazanskicaa12082023-02-16 14:54:04 +010023 "source.monogon.dev/cloud/bmaas/bmdb/model"
24 ecl "source.monogon.dev/cloud/shepherd/equinix/wrapngo"
25)
26
Serge Bazanski86a714d2023-04-17 15:54:21 +020027// InitializerConfig configures how the Initializer will deploy Agents on
28// machines. In CLI scenarios, this should be populated from flags via
29// RegisterFlags.
30type InitializerConfig struct {
31 ControlLoopConfig
32
Serge Bazanskicaa12082023-02-16 14:54:04 +010033 // Executable is the contents of the agent binary created and run
34 // at the provisioned servers. Must be set.
35 Executable []byte
36
37 // TargetPath is a filesystem destination path used while uploading the BMaaS
38 // agent executable to hosts as part of the initialization process. Must be set.
39 TargetPath string
40
41 // Endpoint is the address Agent will use to contact the BMaaS
42 // infrastructure. Must be set.
43 Endpoint string
Serge Bazanski51987d62023-04-06 16:35:35 +020044 // EndpointCACertificate is an optional DER-encoded (but not PEM-armored) X509
45 // certificate used to populate the trusted CA store of the agent. It should be
46 // set to the CA certificate of the endpoint if not using a system-trusted CA
47 // certificate.
48 EndpointCACertificate []byte
Serge Bazanskicaa12082023-02-16 14:54:04 +010049
50 // SSHTimeout is the amount of time set aside for the initializing
51 // SSH session to run its course. Upon timeout, the iteration would be
52 // declared a failure. Must be set.
53 SSHConnectTimeout time.Duration
54 // SSHExecTimeout is the amount of time set aside for executing the agent and
55 // getting its output once the SSH connection has been established. Upon timeout,
56 // the iteration would be declared as failure. Must be set.
57 SSHExecTimeout time.Duration
58}
59
Serge Bazanski86a714d2023-04-17 15:54:21 +020060func (a *InitializerConfig) RegisterFlags() {
61 a.ControlLoopConfig.RegisterFlags("initializer")
62
Serge Bazanskicaa12082023-02-16 14:54:04 +010063 flag.Func("agent_executable_path", "Local filesystem path of agent binary to be uploaded", func(val string) error {
64 if val == "" {
65 return nil
66 }
67 data, err := os.ReadFile(val)
68 if err != nil {
Serge Bazanski77b11d32023-04-06 14:43:19 +020069 return fmt.Errorf("could not read: %w", err)
Serge Bazanskicaa12082023-02-16 14:54:04 +010070 }
71 a.Executable = data
72 return nil
73 })
74 flag.StringVar(&a.TargetPath, "agent_target_path", "/root/agent", "Filesystem path where the agent will be uploaded to and ran from")
75 flag.StringVar(&a.Endpoint, "agent_endpoint", "", "Address of BMDB Server to which the agent will attempt to connect")
Serge Bazanski51987d62023-04-06 16:35:35 +020076 flag.Func("agent_endpoint_ca_certificate_path", "Path to PEM X509 CA certificate that the agent endpoint is serving with. If not set, the agent will attempt to use system CA certificates to authenticate the endpoint.", func(val string) error {
77 if val == "" {
78 return nil
79 }
80 data, err := os.ReadFile(val)
81 if err != nil {
82 return fmt.Errorf("could not read: %w", err)
83 }
84 block, _ := pem.Decode(data)
85 if block.Type != "CERTIFICATE" {
86 return fmt.Errorf("not a certificate")
87 }
88 _, err = x509.ParseCertificate(block.Bytes)
89 if err != nil {
90 return fmt.Errorf("invalid certificate: %w", err)
91 }
92 a.EndpointCACertificate = block.Bytes
93 return nil
94 })
Serge Bazanskicaa12082023-02-16 14:54:04 +010095 flag.DurationVar(&a.SSHConnectTimeout, "agent_ssh_connect_timeout", 2*time.Second, "Timeout for connecting over SSH to a machine")
96 flag.DurationVar(&a.SSHExecTimeout, "agent_ssh_exec_timeout", 60*time.Second, "Timeout for connecting over SSH to a machine")
97}
98
Serge Bazanski86a714d2023-04-17 15:54:21 +020099// The Initializer starts the agent on machines that aren't yet running it.
Serge Bazanskicaa12082023-02-16 14:54:04 +0100100type Initializer struct {
Serge Bazanski86a714d2023-04-17 15:54:21 +0200101 InitializerConfig
102
Serge Bazanskicaa12082023-02-16 14:54:04 +0100103 sharedConfig *SharedConfig
Serge Bazanski86a714d2023-04-17 15:54:21 +0200104 cl ecl.Client
105 signer ssh.Signer
Serge Bazanskicaa12082023-02-16 14:54:04 +0100106 sshClient SSHClient
Serge Bazanskicaa12082023-02-16 14:54:04 +0100107}
108
Serge Bazanski86a714d2023-04-17 15:54:21 +0200109// NewInitializer creates an Initializer instance, checking the
110// InitializerConfig, SharedConfig and AgentConfig for errors.
111func NewInitializer(cl ecl.Client, ic InitializerConfig, sc *SharedConfig) (*Initializer, error) {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100112 if err := sc.check(); err != nil {
113 return nil, err
114 }
Serge Bazanski86a714d2023-04-17 15:54:21 +0200115 if err := ic.ControlLoopConfig.Check(); err != nil {
116 return nil, err
117 }
118
119 if len(ic.Executable) == 0 {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100120 return nil, fmt.Errorf("agent executable not configured")
121 }
Serge Bazanski86a714d2023-04-17 15:54:21 +0200122 if ic.TargetPath == "" {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100123 return nil, fmt.Errorf("agent target path must be set")
124 }
Serge Bazanski86a714d2023-04-17 15:54:21 +0200125 if ic.Endpoint == "" {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100126 return nil, fmt.Errorf("agent endpoint must be set")
127 }
Serge Bazanski86a714d2023-04-17 15:54:21 +0200128 if ic.SSHConnectTimeout == 0 {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100129 return nil, fmt.Errorf("agent SSH connection timeout must be set")
130 }
Serge Bazanski86a714d2023-04-17 15:54:21 +0200131 if ic.SSHExecTimeout == 0 {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100132 return nil, fmt.Errorf("agent SSH execution timeout must be set")
133 }
Serge Bazanski86a714d2023-04-17 15:54:21 +0200134
135 signer, err := sc.sshSigner()
136 if err != nil {
137 return nil, fmt.Errorf("could not initialize signer: %w", err)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100138 }
Serge Bazanski86a714d2023-04-17 15:54:21 +0200139
Serge Bazanskicaa12082023-02-16 14:54:04 +0100140 return &Initializer{
Serge Bazanski86a714d2023-04-17 15:54:21 +0200141 InitializerConfig: ic,
142
Serge Bazanskicaa12082023-02-16 14:54:04 +0100143 sharedConfig: sc,
Serge Bazanskicaa12082023-02-16 14:54:04 +0100144 cl: cl,
Serge Bazanski86a714d2023-04-17 15:54:21 +0200145 sshClient: &PlainSSHClient{},
146 signer: signer,
Serge Bazanskicaa12082023-02-16 14:54:04 +0100147 }, nil
148}
149
Serge Bazanski86a714d2023-04-17 15:54:21 +0200150func (c *Initializer) getMachines(ctx context.Context, q *model.Queries, limit int32) ([]model.MachineProvided, error) {
151 return q.GetMachinesForAgentStart(ctx, limit)
Serge Bazanski9eb903d2023-02-20 14:28:19 +0100152}
153
Serge Bazanski86a714d2023-04-17 15:54:21 +0200154func (c *Initializer) processMachine(ctx context.Context, t *task) error {
155 dev, err := c.cl.GetDevice(ctx, c.sharedConfig.ProjectId, t.machine.ProviderID)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100156 if err != nil {
Serge Bazanski20312b42023-04-19 13:49:47 +0200157 return fmt.Errorf("while fetching device %q: %v", t.machine.ProviderID, err)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100158 }
Serge Bazanskicaa12082023-02-16 14:54:04 +0100159
Serge Bazanski86a714d2023-04-17 15:54:21 +0200160 // Start the agent.
161 klog.Infof("Starting agent on device (ID: %s, PID %s)", t.machine.MachineID, t.machine.ProviderID)
162 apk, err := c.startAgent(ctx, c.signer, dev, t.machine.MachineID)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100163 if err != nil {
Serge Bazanski86a714d2023-04-17 15:54:21 +0200164 return fmt.Errorf("while starting the agent: %w", err)
165 }
166
167 // Agent startup succeeded. Set the appropriate BMDB tag, and release the
168 // lock.
169 klog.Infof("Setting AgentStarted (ID: %s, PID: %s, Agent public key: %s).", t.machine.MachineID, t.machine.ProviderID, hex.EncodeToString(apk))
170 err = t.work.Finish(ctx, func(q *model.Queries) error {
171 return q.MachineSetAgentStarted(ctx, model.MachineSetAgentStartedParams{
172 MachineID: t.machine.MachineID,
173 AgentStartedAt: time.Now(),
174 AgentPublicKey: apk,
175 })
176 })
177 if err != nil {
178 return fmt.Errorf("while setting AgentStarted tag: %w", err)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100179 }
180 return nil
181}
182
183// startAgent runs the agent executable on the target device d, returning the
184// agent's public key on success.
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100185func (i *Initializer) startAgent(ctx context.Context, sgn ssh.Signer, d *packngo.Device, mid uuid.UUID) ([]byte, error) {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100186 // Provide a bound on execution time in case we get stuck after the SSH
187 // connection is established.
Serge Bazanski86a714d2023-04-17 15:54:21 +0200188 sctx, sctxC := context.WithTimeout(ctx, i.SSHExecTimeout)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100189 defer sctxC()
190
191 // Use the device's IP address exposed by Equinix API.
192 ni := d.GetNetworkInfo()
193 var addr string
194 if ni.PublicIPv4 != "" {
195 addr = net.JoinHostPort(ni.PublicIPv4, "22")
196 } else if ni.PublicIPv6 != "" {
197 addr = net.JoinHostPort(ni.PublicIPv6, "22")
198 } else {
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100199 return nil, fmt.Errorf("device (machine ID: %s) has no available addresses", mid)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100200 }
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100201 klog.V(1).Infof("Dialing device (machine ID: %s, addr: %s).", mid, addr)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100202
Serge Bazanski86a714d2023-04-17 15:54:21 +0200203 conn, err := i.sshClient.Dial(sctx, addr, "root", sgn, i.SSHConnectTimeout)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100204 if err != nil {
205 return nil, fmt.Errorf("while dialing the device: %w", err)
206 }
207 defer conn.Close()
208
209 // Upload the agent executable.
210
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100211 klog.Infof("Uploading the agent executable (machine ID: %s, addr: %s).", mid, addr)
Serge Bazanski86a714d2023-04-17 15:54:21 +0200212 if err := conn.Upload(sctx, i.TargetPath, i.Executable); err != nil {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100213 return nil, fmt.Errorf("while uploading agent executable: %w", err)
214 }
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100215 klog.V(1).Infof("Upload successful (machine ID: %s, addr: %s).", mid, addr)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100216
217 // The initialization protobuf message will be sent to the agent on its
218 // standard input.
219 imsg := apb.TakeoverInit{
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100220 MachineId: mid.String(),
Serge Bazanski86a714d2023-04-17 15:54:21 +0200221 BmaasEndpoint: i.Endpoint,
222 CaCertificate: i.EndpointCACertificate,
Serge Bazanskicaa12082023-02-16 14:54:04 +0100223 }
224 imsgb, err := proto.Marshal(&imsg)
225 if err != nil {
226 return nil, fmt.Errorf("while marshaling agent message: %w", err)
227 }
228
229 // Start the agent and wait for the agent's output to arrive.
Serge Bazanski86a714d2023-04-17 15:54:21 +0200230 klog.V(1).Infof("Starting the agent executable at path %q (machine ID: %s).", i.TargetPath, mid)
231 stdout, stderr, err := conn.Execute(ctx, i.TargetPath, imsgb)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100232 stderrStr := strings.TrimSpace(string(stderr))
233 if stderrStr != "" {
234 klog.Warningf("Agent stderr: %q", stderrStr)
235 }
236 if err != nil {
237 return nil, fmt.Errorf("while starting the agent executable: %w", err)
238 }
239
240 var arsp apb.TakeoverResponse
241 if err := proto.Unmarshal(stdout, &arsp); err != nil {
242 return nil, fmt.Errorf("agent reply couldn't be unmarshaled: %w", err)
243 }
Lorenz Brun595dfe92023-02-21 19:13:02 +0100244 var successResp *apb.TakeoverSuccess
245 switch r := arsp.Result.(type) {
246 case *apb.TakeoverResponse_Error:
247 return nil, fmt.Errorf("agent returned error: %v", r.Error.Message)
248 case *apb.TakeoverResponse_Success:
249 successResp = r.Success
250 default:
251 return nil, fmt.Errorf("agent returned unknown result of type %T", arsp.Result)
252 }
253 if !proto.Equal(&imsg, successResp.InitMessage) {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100254 return nil, fmt.Errorf("agent did not send back the init message.")
255 }
Lorenz Brun595dfe92023-02-21 19:13:02 +0100256 if len(successResp.Key) != ed25519.PublicKeySize {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100257 return nil, fmt.Errorf("agent key length mismatch.")
258 }
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100259 klog.Infof("Started the agent (machine ID: %s, key: %s).", mid, hex.EncodeToString(successResp.Key))
Lorenz Brun595dfe92023-02-21 19:13:02 +0100260 return successResp.Key, nil
Serge Bazanskicaa12082023-02-16 14:54:04 +0100261}