blob: 272df2033387d04c95fc1df84efb47d28f473ca8 [file] [log] [blame]
Serge Bazanskicaa12082023-02-16 14:54:04 +01001package manager
2
3import (
4 "context"
5 "crypto/ed25519"
Serge Bazanski51987d62023-04-06 16:35:35 +02006 "crypto/x509"
Serge Bazanskicaa12082023-02-16 14:54:04 +01007 "encoding/hex"
Serge Bazanski51987d62023-04-06 16:35:35 +02008 "encoding/pem"
Serge Bazanskicaa12082023-02-16 14:54:04 +01009 "flag"
10 "fmt"
11 "net"
12 "os"
Serge Bazanskicaa12082023-02-16 14:54:04 +010013 "strings"
14 "time"
15
16 "github.com/google/uuid"
17 "github.com/packethost/packngo"
18 "golang.org/x/crypto/ssh"
Serge Bazanskicaa12082023-02-16 14:54:04 +010019 "google.golang.org/protobuf/proto"
20 "k8s.io/klog/v2"
21
22 apb "source.monogon.dev/cloud/agent/api"
Tim Windelschmidt0e749612023-08-07 17:42:59 +000023
Serge Bazanski00cf57d2023-04-20 11:19:00 +020024 "source.monogon.dev/cloud/bmaas/bmdb"
Serge Bazanskic50f6942023-04-24 18:27:22 +020025 "source.monogon.dev/cloud/bmaas/bmdb/metrics"
Serge Bazanskicaa12082023-02-16 14:54:04 +010026 "source.monogon.dev/cloud/bmaas/bmdb/model"
27 ecl "source.monogon.dev/cloud/shepherd/equinix/wrapngo"
28)
29
Serge Bazanski86a714d2023-04-17 15:54:21 +020030// InitializerConfig configures how the Initializer will deploy Agents on
31// machines. In CLI scenarios, this should be populated from flags via
32// RegisterFlags.
33type InitializerConfig struct {
34 ControlLoopConfig
35
Serge Bazanskicaa12082023-02-16 14:54:04 +010036 // Executable is the contents of the agent binary created and run
37 // at the provisioned servers. Must be set.
38 Executable []byte
39
40 // TargetPath is a filesystem destination path used while uploading the BMaaS
41 // agent executable to hosts as part of the initialization process. Must be set.
42 TargetPath string
43
44 // Endpoint is the address Agent will use to contact the BMaaS
45 // infrastructure. Must be set.
46 Endpoint string
Serge Bazanski51987d62023-04-06 16:35:35 +020047 // EndpointCACertificate is an optional DER-encoded (but not PEM-armored) X509
48 // certificate used to populate the trusted CA store of the agent. It should be
49 // set to the CA certificate of the endpoint if not using a system-trusted CA
50 // certificate.
51 EndpointCACertificate []byte
Serge Bazanskicaa12082023-02-16 14:54:04 +010052
53 // SSHTimeout is the amount of time set aside for the initializing
54 // SSH session to run its course. Upon timeout, the iteration would be
55 // declared a failure. Must be set.
56 SSHConnectTimeout time.Duration
57 // SSHExecTimeout is the amount of time set aside for executing the agent and
58 // getting its output once the SSH connection has been established. Upon timeout,
59 // the iteration would be declared as failure. Must be set.
60 SSHExecTimeout time.Duration
61}
62
Serge Bazanski86a714d2023-04-17 15:54:21 +020063func (a *InitializerConfig) RegisterFlags() {
64 a.ControlLoopConfig.RegisterFlags("initializer")
65
Serge Bazanskicaa12082023-02-16 14:54:04 +010066 flag.Func("agent_executable_path", "Local filesystem path of agent binary to be uploaded", func(val string) error {
67 if val == "" {
68 return nil
69 }
70 data, err := os.ReadFile(val)
71 if err != nil {
Serge Bazanski77b11d32023-04-06 14:43:19 +020072 return fmt.Errorf("could not read: %w", err)
Serge Bazanskicaa12082023-02-16 14:54:04 +010073 }
74 a.Executable = data
75 return nil
76 })
77 flag.StringVar(&a.TargetPath, "agent_target_path", "/root/agent", "Filesystem path where the agent will be uploaded to and ran from")
78 flag.StringVar(&a.Endpoint, "agent_endpoint", "", "Address of BMDB Server to which the agent will attempt to connect")
Serge Bazanski51987d62023-04-06 16:35:35 +020079 flag.Func("agent_endpoint_ca_certificate_path", "Path to PEM X509 CA certificate that the agent endpoint is serving with. If not set, the agent will attempt to use system CA certificates to authenticate the endpoint.", func(val string) error {
80 if val == "" {
81 return nil
82 }
83 data, err := os.ReadFile(val)
84 if err != nil {
85 return fmt.Errorf("could not read: %w", err)
86 }
87 block, _ := pem.Decode(data)
88 if block.Type != "CERTIFICATE" {
89 return fmt.Errorf("not a certificate")
90 }
91 _, err = x509.ParseCertificate(block.Bytes)
92 if err != nil {
93 return fmt.Errorf("invalid certificate: %w", err)
94 }
95 a.EndpointCACertificate = block.Bytes
96 return nil
97 })
Serge Bazanskicaa12082023-02-16 14:54:04 +010098 flag.DurationVar(&a.SSHConnectTimeout, "agent_ssh_connect_timeout", 2*time.Second, "Timeout for connecting over SSH to a machine")
99 flag.DurationVar(&a.SSHExecTimeout, "agent_ssh_exec_timeout", 60*time.Second, "Timeout for connecting over SSH to a machine")
100}
101
// Initializer starts the agent on machines that aren't yet running it.
//
// Instances should be created with NewInitializer, which validates the
// configuration and derives the SSH signer.
type Initializer struct {
	InitializerConfig

	// sharedConfig holds settings not specific to the Initializer; its
	// ProjectId is used when fetching devices, and it provides the SSH signer.
	sharedConfig *SharedConfig
	// cl is the Equinix API client (wrapngo) used to look up devices.
	cl ecl.Client
	// signer is used to authenticate the SSH connections made to devices.
	signer ssh.Signer
	// sshClient dials devices over SSH. It is an interface, presumably so it
	// can be substituted in tests — confirm.
	sshClient SSHClient
}
111
Serge Bazanski86a714d2023-04-17 15:54:21 +0200112// NewInitializer creates an Initializer instance, checking the
113// InitializerConfig, SharedConfig and AgentConfig for errors.
114func NewInitializer(cl ecl.Client, ic InitializerConfig, sc *SharedConfig) (*Initializer, error) {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100115 if err := sc.check(); err != nil {
116 return nil, err
117 }
Serge Bazanski86a714d2023-04-17 15:54:21 +0200118 if err := ic.ControlLoopConfig.Check(); err != nil {
119 return nil, err
120 }
121
122 if len(ic.Executable) == 0 {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100123 return nil, fmt.Errorf("agent executable not configured")
124 }
Serge Bazanski86a714d2023-04-17 15:54:21 +0200125 if ic.TargetPath == "" {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100126 return nil, fmt.Errorf("agent target path must be set")
127 }
Serge Bazanski86a714d2023-04-17 15:54:21 +0200128 if ic.Endpoint == "" {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100129 return nil, fmt.Errorf("agent endpoint must be set")
130 }
Serge Bazanski86a714d2023-04-17 15:54:21 +0200131 if ic.SSHConnectTimeout == 0 {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100132 return nil, fmt.Errorf("agent SSH connection timeout must be set")
133 }
Serge Bazanski86a714d2023-04-17 15:54:21 +0200134 if ic.SSHExecTimeout == 0 {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100135 return nil, fmt.Errorf("agent SSH execution timeout must be set")
136 }
Serge Bazanski86a714d2023-04-17 15:54:21 +0200137
138 signer, err := sc.sshSigner()
139 if err != nil {
140 return nil, fmt.Errorf("could not initialize signer: %w", err)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100141 }
Serge Bazanski86a714d2023-04-17 15:54:21 +0200142
Serge Bazanskicaa12082023-02-16 14:54:04 +0100143 return &Initializer{
Serge Bazanski86a714d2023-04-17 15:54:21 +0200144 InitializerConfig: ic,
145
Serge Bazanskicaa12082023-02-16 14:54:04 +0100146 sharedConfig: sc,
Serge Bazanskicaa12082023-02-16 14:54:04 +0100147 cl: cl,
Serge Bazanski86a714d2023-04-17 15:54:21 +0200148 sshClient: &PlainSSHClient{},
149 signer: signer,
Serge Bazanskicaa12082023-02-16 14:54:04 +0100150 }, nil
151}
152
Serge Bazanski00cf57d2023-04-20 11:19:00 +0200153func (c *Initializer) getProcessInfo() processInfo {
154 return processInfo{
155 process: model.ProcessShepherdAgentStart,
156 defaultBackoff: bmdb.Backoff{
157 Initial: 5 * time.Minute,
158 Maximum: 4 * time.Hour,
159 Exponent: 1.2,
160 },
Serge Bazanskic50f6942023-04-24 18:27:22 +0200161 processor: metrics.ProcessorShepherdInitializer,
Serge Bazanski00cf57d2023-04-20 11:19:00 +0200162 }
163}
164
Serge Bazanski86a714d2023-04-17 15:54:21 +0200165func (c *Initializer) getMachines(ctx context.Context, q *model.Queries, limit int32) ([]model.MachineProvided, error) {
Tim Windelschmidt0e749612023-08-07 17:42:59 +0000166 return q.GetMachinesForAgentStart(ctx, model.GetMachinesForAgentStartParams{
167 Limit: limit,
168 Provider: model.ProviderEquinix,
169 })
Serge Bazanski9eb903d2023-02-20 14:28:19 +0100170}
171
Serge Bazanski86a714d2023-04-17 15:54:21 +0200172func (c *Initializer) processMachine(ctx context.Context, t *task) error {
Serge Bazanski4969fd72023-04-19 17:43:12 +0200173 dev, err := c.cl.GetDevice(ctx, c.sharedConfig.ProjectId, t.machine.ProviderID, nil)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100174 if err != nil {
Serge Bazanski20312b42023-04-19 13:49:47 +0200175 return fmt.Errorf("while fetching device %q: %v", t.machine.ProviderID, err)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100176 }
Serge Bazanskicaa12082023-02-16 14:54:04 +0100177
Serge Bazanski86a714d2023-04-17 15:54:21 +0200178 // Start the agent.
179 klog.Infof("Starting agent on device (ID: %s, PID %s)", t.machine.MachineID, t.machine.ProviderID)
180 apk, err := c.startAgent(ctx, c.signer, dev, t.machine.MachineID)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100181 if err != nil {
Serge Bazanski86a714d2023-04-17 15:54:21 +0200182 return fmt.Errorf("while starting the agent: %w", err)
183 }
184
185 // Agent startup succeeded. Set the appropriate BMDB tag, and release the
186 // lock.
187 klog.Infof("Setting AgentStarted (ID: %s, PID: %s, Agent public key: %s).", t.machine.MachineID, t.machine.ProviderID, hex.EncodeToString(apk))
188 err = t.work.Finish(ctx, func(q *model.Queries) error {
189 return q.MachineSetAgentStarted(ctx, model.MachineSetAgentStartedParams{
190 MachineID: t.machine.MachineID,
191 AgentStartedAt: time.Now(),
192 AgentPublicKey: apk,
193 })
194 })
195 if err != nil {
196 return fmt.Errorf("while setting AgentStarted tag: %w", err)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100197 }
198 return nil
199}
200
201// startAgent runs the agent executable on the target device d, returning the
202// agent's public key on success.
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100203func (i *Initializer) startAgent(ctx context.Context, sgn ssh.Signer, d *packngo.Device, mid uuid.UUID) ([]byte, error) {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100204 // Provide a bound on execution time in case we get stuck after the SSH
205 // connection is established.
Serge Bazanski86a714d2023-04-17 15:54:21 +0200206 sctx, sctxC := context.WithTimeout(ctx, i.SSHExecTimeout)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100207 defer sctxC()
208
209 // Use the device's IP address exposed by Equinix API.
210 ni := d.GetNetworkInfo()
211 var addr string
212 if ni.PublicIPv4 != "" {
213 addr = net.JoinHostPort(ni.PublicIPv4, "22")
214 } else if ni.PublicIPv6 != "" {
215 addr = net.JoinHostPort(ni.PublicIPv6, "22")
216 } else {
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100217 return nil, fmt.Errorf("device (machine ID: %s) has no available addresses", mid)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100218 }
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100219 klog.V(1).Infof("Dialing device (machine ID: %s, addr: %s).", mid, addr)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100220
Serge Bazanski86a714d2023-04-17 15:54:21 +0200221 conn, err := i.sshClient.Dial(sctx, addr, "root", sgn, i.SSHConnectTimeout)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100222 if err != nil {
223 return nil, fmt.Errorf("while dialing the device: %w", err)
224 }
225 defer conn.Close()
226
227 // Upload the agent executable.
228
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100229 klog.Infof("Uploading the agent executable (machine ID: %s, addr: %s).", mid, addr)
Serge Bazanski86a714d2023-04-17 15:54:21 +0200230 if err := conn.Upload(sctx, i.TargetPath, i.Executable); err != nil {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100231 return nil, fmt.Errorf("while uploading agent executable: %w", err)
232 }
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100233 klog.V(1).Infof("Upload successful (machine ID: %s, addr: %s).", mid, addr)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100234
235 // The initialization protobuf message will be sent to the agent on its
236 // standard input.
237 imsg := apb.TakeoverInit{
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100238 MachineId: mid.String(),
Serge Bazanski86a714d2023-04-17 15:54:21 +0200239 BmaasEndpoint: i.Endpoint,
240 CaCertificate: i.EndpointCACertificate,
Serge Bazanskicaa12082023-02-16 14:54:04 +0100241 }
242 imsgb, err := proto.Marshal(&imsg)
243 if err != nil {
244 return nil, fmt.Errorf("while marshaling agent message: %w", err)
245 }
246
247 // Start the agent and wait for the agent's output to arrive.
Serge Bazanski86a714d2023-04-17 15:54:21 +0200248 klog.V(1).Infof("Starting the agent executable at path %q (machine ID: %s).", i.TargetPath, mid)
249 stdout, stderr, err := conn.Execute(ctx, i.TargetPath, imsgb)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100250 stderrStr := strings.TrimSpace(string(stderr))
251 if stderrStr != "" {
252 klog.Warningf("Agent stderr: %q", stderrStr)
253 }
254 if err != nil {
255 return nil, fmt.Errorf("while starting the agent executable: %w", err)
256 }
257
258 var arsp apb.TakeoverResponse
259 if err := proto.Unmarshal(stdout, &arsp); err != nil {
260 return nil, fmt.Errorf("agent reply couldn't be unmarshaled: %w", err)
261 }
Lorenz Brun595dfe92023-02-21 19:13:02 +0100262 var successResp *apb.TakeoverSuccess
263 switch r := arsp.Result.(type) {
264 case *apb.TakeoverResponse_Error:
265 return nil, fmt.Errorf("agent returned error: %v", r.Error.Message)
266 case *apb.TakeoverResponse_Success:
267 successResp = r.Success
268 default:
269 return nil, fmt.Errorf("agent returned unknown result of type %T", arsp.Result)
270 }
271 if !proto.Equal(&imsg, successResp.InitMessage) {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100272 return nil, fmt.Errorf("agent did not send back the init message.")
273 }
Lorenz Brun595dfe92023-02-21 19:13:02 +0100274 if len(successResp.Key) != ed25519.PublicKeySize {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100275 return nil, fmt.Errorf("agent key length mismatch.")
276 }
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100277 klog.Infof("Started the agent (machine ID: %s, key: %s).", mid, hex.EncodeToString(successResp.Key))
Lorenz Brun595dfe92023-02-21 19:13:02 +0100278 return successResp.Key, nil
Serge Bazanskicaa12082023-02-16 14:54:04 +0100279}