blob: 2f1721bdd8d932d513c6d8a6e52f9326f06660c9 [file] [log] [blame]
Serge Bazanskicaa12082023-02-16 14:54:04 +01001package manager
2
3import (
4 "context"
5 "crypto/ed25519"
Serge Bazanski51987d62023-04-06 16:35:35 +02006 "crypto/x509"
Serge Bazanskicaa12082023-02-16 14:54:04 +01007 "encoding/hex"
Serge Bazanski51987d62023-04-06 16:35:35 +02008 "encoding/pem"
Serge Bazanskicaa12082023-02-16 14:54:04 +01009 "flag"
10 "fmt"
11 "net"
12 "os"
Serge Bazanskicaa12082023-02-16 14:54:04 +010013 "strings"
14 "time"
15
16 "github.com/google/uuid"
17 "github.com/packethost/packngo"
18 "golang.org/x/crypto/ssh"
Serge Bazanskicaa12082023-02-16 14:54:04 +010019 "google.golang.org/protobuf/proto"
20 "k8s.io/klog/v2"
21
22 apb "source.monogon.dev/cloud/agent/api"
Serge Bazanski00cf57d2023-04-20 11:19:00 +020023 "source.monogon.dev/cloud/bmaas/bmdb"
Serge Bazanskic50f6942023-04-24 18:27:22 +020024 "source.monogon.dev/cloud/bmaas/bmdb/metrics"
Serge Bazanskicaa12082023-02-16 14:54:04 +010025 "source.monogon.dev/cloud/bmaas/bmdb/model"
26 ecl "source.monogon.dev/cloud/shepherd/equinix/wrapngo"
27)
28
Serge Bazanski86a714d2023-04-17 15:54:21 +020029// InitializerConfig configures how the Initializer will deploy Agents on
30// machines. In CLI scenarios, this should be populated from flags via
31// RegisterFlags.
32type InitializerConfig struct {
33 ControlLoopConfig
34
Serge Bazanskicaa12082023-02-16 14:54:04 +010035 // Executable is the contents of the agent binary created and run
36 // at the provisioned servers. Must be set.
37 Executable []byte
38
39 // TargetPath is a filesystem destination path used while uploading the BMaaS
40 // agent executable to hosts as part of the initialization process. Must be set.
41 TargetPath string
42
43 // Endpoint is the address Agent will use to contact the BMaaS
44 // infrastructure. Must be set.
45 Endpoint string
Serge Bazanski51987d62023-04-06 16:35:35 +020046 // EndpointCACertificate is an optional DER-encoded (but not PEM-armored) X509
47 // certificate used to populate the trusted CA store of the agent. It should be
48 // set to the CA certificate of the endpoint if not using a system-trusted CA
49 // certificate.
50 EndpointCACertificate []byte
Serge Bazanskicaa12082023-02-16 14:54:04 +010051
52 // SSHTimeout is the amount of time set aside for the initializing
53 // SSH session to run its course. Upon timeout, the iteration would be
54 // declared a failure. Must be set.
55 SSHConnectTimeout time.Duration
56 // SSHExecTimeout is the amount of time set aside for executing the agent and
57 // getting its output once the SSH connection has been established. Upon timeout,
58 // the iteration would be declared as failure. Must be set.
59 SSHExecTimeout time.Duration
60}
61
Serge Bazanski86a714d2023-04-17 15:54:21 +020062func (a *InitializerConfig) RegisterFlags() {
63 a.ControlLoopConfig.RegisterFlags("initializer")
64
Serge Bazanskicaa12082023-02-16 14:54:04 +010065 flag.Func("agent_executable_path", "Local filesystem path of agent binary to be uploaded", func(val string) error {
66 if val == "" {
67 return nil
68 }
69 data, err := os.ReadFile(val)
70 if err != nil {
Serge Bazanski77b11d32023-04-06 14:43:19 +020071 return fmt.Errorf("could not read: %w", err)
Serge Bazanskicaa12082023-02-16 14:54:04 +010072 }
73 a.Executable = data
74 return nil
75 })
76 flag.StringVar(&a.TargetPath, "agent_target_path", "/root/agent", "Filesystem path where the agent will be uploaded to and ran from")
77 flag.StringVar(&a.Endpoint, "agent_endpoint", "", "Address of BMDB Server to which the agent will attempt to connect")
Serge Bazanski51987d62023-04-06 16:35:35 +020078 flag.Func("agent_endpoint_ca_certificate_path", "Path to PEM X509 CA certificate that the agent endpoint is serving with. If not set, the agent will attempt to use system CA certificates to authenticate the endpoint.", func(val string) error {
79 if val == "" {
80 return nil
81 }
82 data, err := os.ReadFile(val)
83 if err != nil {
84 return fmt.Errorf("could not read: %w", err)
85 }
86 block, _ := pem.Decode(data)
87 if block.Type != "CERTIFICATE" {
88 return fmt.Errorf("not a certificate")
89 }
90 _, err = x509.ParseCertificate(block.Bytes)
91 if err != nil {
92 return fmt.Errorf("invalid certificate: %w", err)
93 }
94 a.EndpointCACertificate = block.Bytes
95 return nil
96 })
Serge Bazanskicaa12082023-02-16 14:54:04 +010097 flag.DurationVar(&a.SSHConnectTimeout, "agent_ssh_connect_timeout", 2*time.Second, "Timeout for connecting over SSH to a machine")
98 flag.DurationVar(&a.SSHExecTimeout, "agent_ssh_exec_timeout", 60*time.Second, "Timeout for connecting over SSH to a machine")
99}
100
Serge Bazanski86a714d2023-04-17 15:54:21 +0200101// The Initializer starts the agent on machines that aren't yet running it.
Serge Bazanskicaa12082023-02-16 14:54:04 +0100102type Initializer struct {
Serge Bazanski86a714d2023-04-17 15:54:21 +0200103 InitializerConfig
104
Serge Bazanskicaa12082023-02-16 14:54:04 +0100105 sharedConfig *SharedConfig
Serge Bazanski86a714d2023-04-17 15:54:21 +0200106 cl ecl.Client
107 signer ssh.Signer
Serge Bazanskicaa12082023-02-16 14:54:04 +0100108 sshClient SSHClient
Serge Bazanskicaa12082023-02-16 14:54:04 +0100109}
110
Serge Bazanski86a714d2023-04-17 15:54:21 +0200111// NewInitializer creates an Initializer instance, checking the
112// InitializerConfig, SharedConfig and AgentConfig for errors.
113func NewInitializer(cl ecl.Client, ic InitializerConfig, sc *SharedConfig) (*Initializer, error) {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100114 if err := sc.check(); err != nil {
115 return nil, err
116 }
Serge Bazanski86a714d2023-04-17 15:54:21 +0200117 if err := ic.ControlLoopConfig.Check(); err != nil {
118 return nil, err
119 }
120
121 if len(ic.Executable) == 0 {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100122 return nil, fmt.Errorf("agent executable not configured")
123 }
Serge Bazanski86a714d2023-04-17 15:54:21 +0200124 if ic.TargetPath == "" {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100125 return nil, fmt.Errorf("agent target path must be set")
126 }
Serge Bazanski86a714d2023-04-17 15:54:21 +0200127 if ic.Endpoint == "" {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100128 return nil, fmt.Errorf("agent endpoint must be set")
129 }
Serge Bazanski86a714d2023-04-17 15:54:21 +0200130 if ic.SSHConnectTimeout == 0 {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100131 return nil, fmt.Errorf("agent SSH connection timeout must be set")
132 }
Serge Bazanski86a714d2023-04-17 15:54:21 +0200133 if ic.SSHExecTimeout == 0 {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100134 return nil, fmt.Errorf("agent SSH execution timeout must be set")
135 }
Serge Bazanski86a714d2023-04-17 15:54:21 +0200136
137 signer, err := sc.sshSigner()
138 if err != nil {
139 return nil, fmt.Errorf("could not initialize signer: %w", err)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100140 }
Serge Bazanski86a714d2023-04-17 15:54:21 +0200141
Serge Bazanskicaa12082023-02-16 14:54:04 +0100142 return &Initializer{
Serge Bazanski86a714d2023-04-17 15:54:21 +0200143 InitializerConfig: ic,
144
Serge Bazanskicaa12082023-02-16 14:54:04 +0100145 sharedConfig: sc,
Serge Bazanskicaa12082023-02-16 14:54:04 +0100146 cl: cl,
Serge Bazanski86a714d2023-04-17 15:54:21 +0200147 sshClient: &PlainSSHClient{},
148 signer: signer,
Serge Bazanskicaa12082023-02-16 14:54:04 +0100149 }, nil
150}
151
Serge Bazanski00cf57d2023-04-20 11:19:00 +0200152func (c *Initializer) getProcessInfo() processInfo {
153 return processInfo{
154 process: model.ProcessShepherdAgentStart,
155 defaultBackoff: bmdb.Backoff{
156 Initial: 5 * time.Minute,
157 Maximum: 4 * time.Hour,
158 Exponent: 1.2,
159 },
Serge Bazanskic50f6942023-04-24 18:27:22 +0200160 processor: metrics.ProcessorShepherdInitializer,
Serge Bazanski00cf57d2023-04-20 11:19:00 +0200161 }
162}
163
Serge Bazanski86a714d2023-04-17 15:54:21 +0200164func (c *Initializer) getMachines(ctx context.Context, q *model.Queries, limit int32) ([]model.MachineProvided, error) {
165 return q.GetMachinesForAgentStart(ctx, limit)
Serge Bazanski9eb903d2023-02-20 14:28:19 +0100166}
167
Serge Bazanski86a714d2023-04-17 15:54:21 +0200168func (c *Initializer) processMachine(ctx context.Context, t *task) error {
Serge Bazanski4969fd72023-04-19 17:43:12 +0200169 dev, err := c.cl.GetDevice(ctx, c.sharedConfig.ProjectId, t.machine.ProviderID, nil)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100170 if err != nil {
Serge Bazanski20312b42023-04-19 13:49:47 +0200171 return fmt.Errorf("while fetching device %q: %v", t.machine.ProviderID, err)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100172 }
Serge Bazanskicaa12082023-02-16 14:54:04 +0100173
Serge Bazanski86a714d2023-04-17 15:54:21 +0200174 // Start the agent.
175 klog.Infof("Starting agent on device (ID: %s, PID %s)", t.machine.MachineID, t.machine.ProviderID)
176 apk, err := c.startAgent(ctx, c.signer, dev, t.machine.MachineID)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100177 if err != nil {
Serge Bazanski86a714d2023-04-17 15:54:21 +0200178 return fmt.Errorf("while starting the agent: %w", err)
179 }
180
181 // Agent startup succeeded. Set the appropriate BMDB tag, and release the
182 // lock.
183 klog.Infof("Setting AgentStarted (ID: %s, PID: %s, Agent public key: %s).", t.machine.MachineID, t.machine.ProviderID, hex.EncodeToString(apk))
184 err = t.work.Finish(ctx, func(q *model.Queries) error {
185 return q.MachineSetAgentStarted(ctx, model.MachineSetAgentStartedParams{
186 MachineID: t.machine.MachineID,
187 AgentStartedAt: time.Now(),
188 AgentPublicKey: apk,
189 })
190 })
191 if err != nil {
192 return fmt.Errorf("while setting AgentStarted tag: %w", err)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100193 }
194 return nil
195}
196
197// startAgent runs the agent executable on the target device d, returning the
198// agent's public key on success.
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100199func (i *Initializer) startAgent(ctx context.Context, sgn ssh.Signer, d *packngo.Device, mid uuid.UUID) ([]byte, error) {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100200 // Provide a bound on execution time in case we get stuck after the SSH
201 // connection is established.
Serge Bazanski86a714d2023-04-17 15:54:21 +0200202 sctx, sctxC := context.WithTimeout(ctx, i.SSHExecTimeout)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100203 defer sctxC()
204
205 // Use the device's IP address exposed by Equinix API.
206 ni := d.GetNetworkInfo()
207 var addr string
208 if ni.PublicIPv4 != "" {
209 addr = net.JoinHostPort(ni.PublicIPv4, "22")
210 } else if ni.PublicIPv6 != "" {
211 addr = net.JoinHostPort(ni.PublicIPv6, "22")
212 } else {
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100213 return nil, fmt.Errorf("device (machine ID: %s) has no available addresses", mid)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100214 }
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100215 klog.V(1).Infof("Dialing device (machine ID: %s, addr: %s).", mid, addr)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100216
Serge Bazanski86a714d2023-04-17 15:54:21 +0200217 conn, err := i.sshClient.Dial(sctx, addr, "root", sgn, i.SSHConnectTimeout)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100218 if err != nil {
219 return nil, fmt.Errorf("while dialing the device: %w", err)
220 }
221 defer conn.Close()
222
223 // Upload the agent executable.
224
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100225 klog.Infof("Uploading the agent executable (machine ID: %s, addr: %s).", mid, addr)
Serge Bazanski86a714d2023-04-17 15:54:21 +0200226 if err := conn.Upload(sctx, i.TargetPath, i.Executable); err != nil {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100227 return nil, fmt.Errorf("while uploading agent executable: %w", err)
228 }
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100229 klog.V(1).Infof("Upload successful (machine ID: %s, addr: %s).", mid, addr)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100230
231 // The initialization protobuf message will be sent to the agent on its
232 // standard input.
233 imsg := apb.TakeoverInit{
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100234 MachineId: mid.String(),
Serge Bazanski86a714d2023-04-17 15:54:21 +0200235 BmaasEndpoint: i.Endpoint,
236 CaCertificate: i.EndpointCACertificate,
Serge Bazanskicaa12082023-02-16 14:54:04 +0100237 }
238 imsgb, err := proto.Marshal(&imsg)
239 if err != nil {
240 return nil, fmt.Errorf("while marshaling agent message: %w", err)
241 }
242
243 // Start the agent and wait for the agent's output to arrive.
Serge Bazanski86a714d2023-04-17 15:54:21 +0200244 klog.V(1).Infof("Starting the agent executable at path %q (machine ID: %s).", i.TargetPath, mid)
245 stdout, stderr, err := conn.Execute(ctx, i.TargetPath, imsgb)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100246 stderrStr := strings.TrimSpace(string(stderr))
247 if stderrStr != "" {
248 klog.Warningf("Agent stderr: %q", stderrStr)
249 }
250 if err != nil {
251 return nil, fmt.Errorf("while starting the agent executable: %w", err)
252 }
253
254 var arsp apb.TakeoverResponse
255 if err := proto.Unmarshal(stdout, &arsp); err != nil {
256 return nil, fmt.Errorf("agent reply couldn't be unmarshaled: %w", err)
257 }
Lorenz Brun595dfe92023-02-21 19:13:02 +0100258 var successResp *apb.TakeoverSuccess
259 switch r := arsp.Result.(type) {
260 case *apb.TakeoverResponse_Error:
261 return nil, fmt.Errorf("agent returned error: %v", r.Error.Message)
262 case *apb.TakeoverResponse_Success:
263 successResp = r.Success
264 default:
265 return nil, fmt.Errorf("agent returned unknown result of type %T", arsp.Result)
266 }
267 if !proto.Equal(&imsg, successResp.InitMessage) {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100268 return nil, fmt.Errorf("agent did not send back the init message.")
269 }
Lorenz Brun595dfe92023-02-21 19:13:02 +0100270 if len(successResp.Key) != ed25519.PublicKeySize {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100271 return nil, fmt.Errorf("agent key length mismatch.")
272 }
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100273 klog.Infof("Started the agent (machine ID: %s, key: %s).", mid, hex.EncodeToString(successResp.Key))
Lorenz Brun595dfe92023-02-21 19:13:02 +0100274 return successResp.Key, nil
Serge Bazanskicaa12082023-02-16 14:54:04 +0100275}