blob: a380a8f7df90623dee941ceb402f3e04226b782e [file] [log] [blame]
Serge Bazanskicaa12082023-02-16 14:54:04 +01001package manager
2
3import (
Tim Windelschmidt5f5f3302024-02-22 23:50:24 +01004 "bytes"
Serge Bazanskicaa12082023-02-16 14:54:04 +01005 "context"
6 "crypto/ed25519"
Serge Bazanski51987d62023-04-06 16:35:35 +02007 "crypto/x509"
Serge Bazanskicaa12082023-02-16 14:54:04 +01008 "encoding/hex"
Serge Bazanski51987d62023-04-06 16:35:35 +02009 "encoding/pem"
Serge Bazanskicaa12082023-02-16 14:54:04 +010010 "flag"
11 "fmt"
12 "net"
13 "os"
Serge Bazanskicaa12082023-02-16 14:54:04 +010014 "strings"
15 "time"
16
17 "github.com/google/uuid"
Serge Bazanskicaa12082023-02-16 14:54:04 +010018 "google.golang.org/protobuf/proto"
19 "k8s.io/klog/v2"
20
21 apb "source.monogon.dev/cloud/agent/api"
Tim Windelschmidt0e749612023-08-07 17:42:59 +000022
Serge Bazanski00cf57d2023-04-20 11:19:00 +020023 "source.monogon.dev/cloud/bmaas/bmdb"
Serge Bazanskic50f6942023-04-24 18:27:22 +020024 "source.monogon.dev/cloud/bmaas/bmdb/metrics"
Serge Bazanskicaa12082023-02-16 14:54:04 +010025 "source.monogon.dev/cloud/bmaas/bmdb/model"
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +020026 "source.monogon.dev/cloud/shepherd"
Tim Windelschmidt5f5f3302024-02-22 23:50:24 +010027 "source.monogon.dev/go/net/ssh"
Serge Bazanskicaa12082023-02-16 14:54:04 +010028)
29
Serge Bazanski86a714d2023-04-17 15:54:21 +020030// InitializerConfig configures how the Initializer will deploy Agents on
31// machines. In CLI scenarios, this should be populated from flags via
32// RegisterFlags.
33type InitializerConfig struct {
34 ControlLoopConfig
35
Serge Bazanskicaa12082023-02-16 14:54:04 +010036 // Executable is the contents of the agent binary created and run
37 // at the provisioned servers. Must be set.
38 Executable []byte
39
40 // TargetPath is a filesystem destination path used while uploading the BMaaS
41 // agent executable to hosts as part of the initialization process. Must be set.
42 TargetPath string
43
44 // Endpoint is the address Agent will use to contact the BMaaS
45 // infrastructure. Must be set.
46 Endpoint string
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +020047
Serge Bazanski51987d62023-04-06 16:35:35 +020048 // EndpointCACertificate is an optional DER-encoded (but not PEM-armored) X509
49 // certificate used to populate the trusted CA store of the agent. It should be
50 // set to the CA certificate of the endpoint if not using a system-trusted CA
51 // certificate.
52 EndpointCACertificate []byte
Serge Bazanskicaa12082023-02-16 14:54:04 +010053
54 // SSHTimeout is the amount of time set aside for the initializing
55 // SSH session to run its course. Upon timeout, the iteration would be
56 // declared a failure. Must be set.
57 SSHConnectTimeout time.Duration
58 // SSHExecTimeout is the amount of time set aside for executing the agent and
59 // getting its output once the SSH connection has been established. Upon timeout,
60 // the iteration would be declared as failure. Must be set.
61 SSHExecTimeout time.Duration
62}
63
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +020064func (ic *InitializerConfig) RegisterFlags() {
65 ic.ControlLoopConfig.RegisterFlags("initializer")
Serge Bazanski86a714d2023-04-17 15:54:21 +020066
Serge Bazanskicaa12082023-02-16 14:54:04 +010067 flag.Func("agent_executable_path", "Local filesystem path of agent binary to be uploaded", func(val string) error {
68 if val == "" {
69 return nil
70 }
71 data, err := os.ReadFile(val)
72 if err != nil {
Serge Bazanski77b11d32023-04-06 14:43:19 +020073 return fmt.Errorf("could not read: %w", err)
Serge Bazanskicaa12082023-02-16 14:54:04 +010074 }
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +020075 ic.Executable = data
Serge Bazanskicaa12082023-02-16 14:54:04 +010076 return nil
77 })
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +020078 flag.StringVar(&ic.TargetPath, "agent_target_path", "/root/agent", "Filesystem path where the agent will be uploaded to and ran from")
79 flag.StringVar(&ic.Endpoint, "agent_endpoint", "", "Address of BMDB Server to which the agent will attempt to connect")
Serge Bazanski51987d62023-04-06 16:35:35 +020080 flag.Func("agent_endpoint_ca_certificate_path", "Path to PEM X509 CA certificate that the agent endpoint is serving with. If not set, the agent will attempt to use system CA certificates to authenticate the endpoint.", func(val string) error {
81 if val == "" {
82 return nil
83 }
84 data, err := os.ReadFile(val)
85 if err != nil {
86 return fmt.Errorf("could not read: %w", err)
87 }
88 block, _ := pem.Decode(data)
89 if block.Type != "CERTIFICATE" {
90 return fmt.Errorf("not a certificate")
91 }
92 _, err = x509.ParseCertificate(block.Bytes)
93 if err != nil {
94 return fmt.Errorf("invalid certificate: %w", err)
95 }
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +020096 ic.EndpointCACertificate = block.Bytes
Serge Bazanski51987d62023-04-06 16:35:35 +020097 return nil
98 })
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +020099 flag.DurationVar(&ic.SSHConnectTimeout, "agent_ssh_connect_timeout", 2*time.Second, "Timeout for connecting over SSH to a machine")
100 flag.DurationVar(&ic.SSHExecTimeout, "agent_ssh_exec_timeout", 60*time.Second, "Timeout for connecting over SSH to a machine")
101}
102
103func (ic *InitializerConfig) Check() error {
104 if err := ic.ControlLoopConfig.Check(); err != nil {
105 return err
106 }
107
108 if len(ic.Executable) == 0 {
109 return fmt.Errorf("agent executable not configured")
110 }
111 if ic.TargetPath == "" {
112 return fmt.Errorf("agent target path must be set")
113 }
114 if ic.Endpoint == "" {
115 return fmt.Errorf("agent endpoint must be set")
116 }
117 if ic.SSHConnectTimeout == 0 {
118 return fmt.Errorf("agent SSH connection timeout must be set")
119 }
120 if ic.SSHExecTimeout == 0 {
121 return fmt.Errorf("agent SSH execution timeout must be set")
122 }
123
124 return nil
Serge Bazanskicaa12082023-02-16 14:54:04 +0100125}
126
Serge Bazanski86a714d2023-04-17 15:54:21 +0200127// The Initializer starts the agent on machines that aren't yet running it.
Serge Bazanskicaa12082023-02-16 14:54:04 +0100128type Initializer struct {
Serge Bazanski86a714d2023-04-17 15:54:21 +0200129 InitializerConfig
130
Tim Windelschmidt5f5f3302024-02-22 23:50:24 +0100131 sshClient ssh.Client
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200132 p shepherd.Provider
Serge Bazanskicaa12082023-02-16 14:54:04 +0100133}
134
Serge Bazanski86a714d2023-04-17 15:54:21 +0200135// NewInitializer creates an Initializer instance, checking the
136// InitializerConfig, SharedConfig and AgentConfig for errors.
Tim Windelschmidt5f5f3302024-02-22 23:50:24 +0100137func NewInitializer(p shepherd.Provider, sshClient ssh.Client, ic InitializerConfig) (*Initializer, error) {
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200138 if err := ic.Check(); err != nil {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100139 return nil, err
140 }
Serge Bazanski86a714d2023-04-17 15:54:21 +0200141
Serge Bazanskicaa12082023-02-16 14:54:04 +0100142 return &Initializer{
Serge Bazanski86a714d2023-04-17 15:54:21 +0200143 InitializerConfig: ic,
144
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200145 p: p,
146 sshClient: sshClient,
Serge Bazanskicaa12082023-02-16 14:54:04 +0100147 }, nil
148}
149
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200150func (i *Initializer) getProcessInfo() processInfo {
Serge Bazanski00cf57d2023-04-20 11:19:00 +0200151 return processInfo{
152 process: model.ProcessShepherdAgentStart,
153 defaultBackoff: bmdb.Backoff{
154 Initial: 5 * time.Minute,
155 Maximum: 4 * time.Hour,
156 Exponent: 1.2,
157 },
Serge Bazanskic50f6942023-04-24 18:27:22 +0200158 processor: metrics.ProcessorShepherdInitializer,
Serge Bazanski00cf57d2023-04-20 11:19:00 +0200159 }
160}
161
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200162func (i *Initializer) getMachines(ctx context.Context, q *model.Queries, limit int32) ([]model.MachineProvided, error) {
Tim Windelschmidt0e749612023-08-07 17:42:59 +0000163 return q.GetMachinesForAgentStart(ctx, model.GetMachinesForAgentStartParams{
164 Limit: limit,
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200165 Provider: i.p.Type(),
Tim Windelschmidt0e749612023-08-07 17:42:59 +0000166 })
Serge Bazanski9eb903d2023-02-20 14:28:19 +0100167}
168
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200169func (i *Initializer) processMachine(ctx context.Context, t *task) error {
170 machine, err := i.p.GetMachine(ctx, shepherd.ProviderID(t.machine.ProviderID))
Serge Bazanskicaa12082023-02-16 14:54:04 +0100171 if err != nil {
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200172 return fmt.Errorf("while fetching machine %q: %v", t.machine.ProviderID, err)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100173 }
Serge Bazanskicaa12082023-02-16 14:54:04 +0100174
Serge Bazanski86a714d2023-04-17 15:54:21 +0200175 // Start the agent.
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200176 klog.Infof("Starting agent on machine (ID: %s, PID %s)", t.machine.MachineID, t.machine.ProviderID)
177 apk, err := i.startAgent(ctx, machine, t.machine.MachineID)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100178 if err != nil {
Serge Bazanski86a714d2023-04-17 15:54:21 +0200179 return fmt.Errorf("while starting the agent: %w", err)
180 }
181
182 // Agent startup succeeded. Set the appropriate BMDB tag, and release the
183 // lock.
184 klog.Infof("Setting AgentStarted (ID: %s, PID: %s, Agent public key: %s).", t.machine.MachineID, t.machine.ProviderID, hex.EncodeToString(apk))
185 err = t.work.Finish(ctx, func(q *model.Queries) error {
186 return q.MachineSetAgentStarted(ctx, model.MachineSetAgentStartedParams{
187 MachineID: t.machine.MachineID,
188 AgentStartedAt: time.Now(),
189 AgentPublicKey: apk,
190 })
191 })
192 if err != nil {
193 return fmt.Errorf("while setting AgentStarted tag: %w", err)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100194 }
195 return nil
196}
197
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200198// startAgent runs the agent executable on the target machine m, returning the
Serge Bazanskicaa12082023-02-16 14:54:04 +0100199// agent's public key on success.
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200200func (i *Initializer) startAgent(ctx context.Context, m shepherd.Machine, mid uuid.UUID) ([]byte, error) {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100201 // Provide a bound on execution time in case we get stuck after the SSH
202 // connection is established.
Serge Bazanski86a714d2023-04-17 15:54:21 +0200203 sctx, sctxC := context.WithTimeout(ctx, i.SSHExecTimeout)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100204 defer sctxC()
205
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200206 // Use the machine's IP address
207 ni := m.Addr()
208 if !ni.IsValid() {
209 return nil, fmt.Errorf("machine (machine ID: %s) has no available addresses", mid)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100210 }
Serge Bazanskicaa12082023-02-16 14:54:04 +0100211
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200212 addr := net.JoinHostPort(ni.String(), "22")
213 klog.V(1).Infof("Dialing machine (machine ID: %s, addr: %s).", mid, addr)
214
215 conn, err := i.sshClient.Dial(sctx, addr, i.SSHConnectTimeout)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100216 if err != nil {
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200217 return nil, fmt.Errorf("while dialing the machine: %w", err)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100218 }
219 defer conn.Close()
220
221 // Upload the agent executable.
222
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100223 klog.Infof("Uploading the agent executable (machine ID: %s, addr: %s).", mid, addr)
Tim Windelschmidt5f5f3302024-02-22 23:50:24 +0100224 if err := conn.Upload(sctx, i.TargetPath, bytes.NewReader(i.Executable)); err != nil {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100225 return nil, fmt.Errorf("while uploading agent executable: %w", err)
226 }
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100227 klog.V(1).Infof("Upload successful (machine ID: %s, addr: %s).", mid, addr)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100228
229 // The initialization protobuf message will be sent to the agent on its
230 // standard input.
231 imsg := apb.TakeoverInit{
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100232 MachineId: mid.String(),
Serge Bazanski86a714d2023-04-17 15:54:21 +0200233 BmaasEndpoint: i.Endpoint,
234 CaCertificate: i.EndpointCACertificate,
Serge Bazanskicaa12082023-02-16 14:54:04 +0100235 }
236 imsgb, err := proto.Marshal(&imsg)
237 if err != nil {
238 return nil, fmt.Errorf("while marshaling agent message: %w", err)
239 }
240
241 // Start the agent and wait for the agent's output to arrive.
Serge Bazanski86a714d2023-04-17 15:54:21 +0200242 klog.V(1).Infof("Starting the agent executable at path %q (machine ID: %s).", i.TargetPath, mid)
243 stdout, stderr, err := conn.Execute(ctx, i.TargetPath, imsgb)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100244 stderrStr := strings.TrimSpace(string(stderr))
245 if stderrStr != "" {
246 klog.Warningf("Agent stderr: %q", stderrStr)
247 }
248 if err != nil {
249 return nil, fmt.Errorf("while starting the agent executable: %w", err)
250 }
251
252 var arsp apb.TakeoverResponse
253 if err := proto.Unmarshal(stdout, &arsp); err != nil {
254 return nil, fmt.Errorf("agent reply couldn't be unmarshaled: %w", err)
255 }
Lorenz Brun595dfe92023-02-21 19:13:02 +0100256 var successResp *apb.TakeoverSuccess
257 switch r := arsp.Result.(type) {
258 case *apb.TakeoverResponse_Error:
259 return nil, fmt.Errorf("agent returned error: %v", r.Error.Message)
260 case *apb.TakeoverResponse_Success:
261 successResp = r.Success
262 default:
263 return nil, fmt.Errorf("agent returned unknown result of type %T", arsp.Result)
264 }
265 if !proto.Equal(&imsg, successResp.InitMessage) {
Tim Windelschmidt73e98822024-04-18 23:13:49 +0200266 return nil, fmt.Errorf("agent did not send back the init message")
Serge Bazanskicaa12082023-02-16 14:54:04 +0100267 }
Lorenz Brun595dfe92023-02-21 19:13:02 +0100268 if len(successResp.Key) != ed25519.PublicKeySize {
Tim Windelschmidt73e98822024-04-18 23:13:49 +0200269 return nil, fmt.Errorf("agent key length mismatch")
Serge Bazanskicaa12082023-02-16 14:54:04 +0100270 }
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100271 klog.Infof("Started the agent (machine ID: %s, key: %s).", mid, hex.EncodeToString(successResp.Key))
Lorenz Brun595dfe92023-02-21 19:13:02 +0100272 return successResp.Key, nil
Serge Bazanskicaa12082023-02-16 14:54:04 +0100273}