blob: 5abbc687e8ff5c9de187201c18eee508da17f771 [file] [log] [blame]
Serge Bazanskicaa12082023-02-16 14:54:04 +01001package manager
2
3import (
4 "context"
5 "crypto/ed25519"
Serge Bazanski51987d62023-04-06 16:35:35 +02006 "crypto/x509"
Serge Bazanskicaa12082023-02-16 14:54:04 +01007 "encoding/hex"
Serge Bazanski51987d62023-04-06 16:35:35 +02008 "encoding/pem"
Serge Bazanskicaa12082023-02-16 14:54:04 +01009 "flag"
10 "fmt"
11 "net"
12 "os"
Serge Bazanskicaa12082023-02-16 14:54:04 +010013 "strings"
14 "time"
15
16 "github.com/google/uuid"
Serge Bazanskicaa12082023-02-16 14:54:04 +010017 "google.golang.org/protobuf/proto"
18 "k8s.io/klog/v2"
19
20 apb "source.monogon.dev/cloud/agent/api"
Tim Windelschmidt0e749612023-08-07 17:42:59 +000021
Serge Bazanski00cf57d2023-04-20 11:19:00 +020022 "source.monogon.dev/cloud/bmaas/bmdb"
Serge Bazanskic50f6942023-04-24 18:27:22 +020023 "source.monogon.dev/cloud/bmaas/bmdb/metrics"
Serge Bazanskicaa12082023-02-16 14:54:04 +010024 "source.monogon.dev/cloud/bmaas/bmdb/model"
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +020025 "source.monogon.dev/cloud/shepherd"
Serge Bazanskicaa12082023-02-16 14:54:04 +010026)
27
// InitializerConfig configures how the Initializer will deploy Agents on
// machines. In CLI scenarios, this should be populated from flags via
// RegisterFlags.
type InitializerConfig struct {
	// ControlLoopConfig is embedded; RegisterFlags and Check delegate to it
	// before handling the fields below.
	ControlLoopConfig

	// Executable is the contents of the agent binary created and run
	// at the provisioned servers. Must be set.
	Executable []byte

	// TargetPath is a filesystem destination path used while uploading the BMaaS
	// agent executable to hosts as part of the initialization process. Must be set.
	TargetPath string

	// Endpoint is the address Agent will use to contact the BMaaS
	// infrastructure. Must be set.
	Endpoint string

	// EndpointCACertificate is an optional DER-encoded (but not PEM-armored) X509
	// certificate used to populate the trusted CA store of the agent. It should be
	// set to the CA certificate of the endpoint if not using a system-trusted CA
	// certificate.
	EndpointCACertificate []byte

	// SSHConnectTimeout is the amount of time set aside for establishing the
	// SSH connection to a machine; it is passed to the SSH client when dialing.
	// Must be set.
	SSHConnectTimeout time.Duration
	// SSHExecTimeout is the amount of time set aside for executing the agent and
	// getting its output once the SSH connection has been established. Upon timeout,
	// the iteration would be declared as failure. Must be set.
	SSHExecTimeout time.Duration
}
61
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +020062func (ic *InitializerConfig) RegisterFlags() {
63 ic.ControlLoopConfig.RegisterFlags("initializer")
Serge Bazanski86a714d2023-04-17 15:54:21 +020064
Serge Bazanskicaa12082023-02-16 14:54:04 +010065 flag.Func("agent_executable_path", "Local filesystem path of agent binary to be uploaded", func(val string) error {
66 if val == "" {
67 return nil
68 }
69 data, err := os.ReadFile(val)
70 if err != nil {
Serge Bazanski77b11d32023-04-06 14:43:19 +020071 return fmt.Errorf("could not read: %w", err)
Serge Bazanskicaa12082023-02-16 14:54:04 +010072 }
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +020073 ic.Executable = data
Serge Bazanskicaa12082023-02-16 14:54:04 +010074 return nil
75 })
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +020076 flag.StringVar(&ic.TargetPath, "agent_target_path", "/root/agent", "Filesystem path where the agent will be uploaded to and ran from")
77 flag.StringVar(&ic.Endpoint, "agent_endpoint", "", "Address of BMDB Server to which the agent will attempt to connect")
Serge Bazanski51987d62023-04-06 16:35:35 +020078 flag.Func("agent_endpoint_ca_certificate_path", "Path to PEM X509 CA certificate that the agent endpoint is serving with. If not set, the agent will attempt to use system CA certificates to authenticate the endpoint.", func(val string) error {
79 if val == "" {
80 return nil
81 }
82 data, err := os.ReadFile(val)
83 if err != nil {
84 return fmt.Errorf("could not read: %w", err)
85 }
86 block, _ := pem.Decode(data)
87 if block.Type != "CERTIFICATE" {
88 return fmt.Errorf("not a certificate")
89 }
90 _, err = x509.ParseCertificate(block.Bytes)
91 if err != nil {
92 return fmt.Errorf("invalid certificate: %w", err)
93 }
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +020094 ic.EndpointCACertificate = block.Bytes
Serge Bazanski51987d62023-04-06 16:35:35 +020095 return nil
96 })
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +020097 flag.DurationVar(&ic.SSHConnectTimeout, "agent_ssh_connect_timeout", 2*time.Second, "Timeout for connecting over SSH to a machine")
98 flag.DurationVar(&ic.SSHExecTimeout, "agent_ssh_exec_timeout", 60*time.Second, "Timeout for connecting over SSH to a machine")
99}
100
101func (ic *InitializerConfig) Check() error {
102 if err := ic.ControlLoopConfig.Check(); err != nil {
103 return err
104 }
105
106 if len(ic.Executable) == 0 {
107 return fmt.Errorf("agent executable not configured")
108 }
109 if ic.TargetPath == "" {
110 return fmt.Errorf("agent target path must be set")
111 }
112 if ic.Endpoint == "" {
113 return fmt.Errorf("agent endpoint must be set")
114 }
115 if ic.SSHConnectTimeout == 0 {
116 return fmt.Errorf("agent SSH connection timeout must be set")
117 }
118 if ic.SSHExecTimeout == 0 {
119 return fmt.Errorf("agent SSH execution timeout must be set")
120 }
121
122 return nil
Serge Bazanskicaa12082023-02-16 14:54:04 +0100123}
124
// Initializer starts the agent on machines that aren't yet running it.
type Initializer struct {
	InitializerConfig

	// sshClient is used to dial machines in order to upload and run the agent.
	sshClient SSHClient
	// p is the provider queried for machine details and for the provider type
	// used when selecting machines from the BMDB.
	p shepherd.Provider
}
132
Serge Bazanski86a714d2023-04-17 15:54:21 +0200133// NewInitializer creates an Initializer instance, checking the
134// InitializerConfig, SharedConfig and AgentConfig for errors.
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200135func NewInitializer(p shepherd.Provider, sshClient SSHClient, ic InitializerConfig) (*Initializer, error) {
136 if err := ic.Check(); err != nil {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100137 return nil, err
138 }
Serge Bazanski86a714d2023-04-17 15:54:21 +0200139
Serge Bazanskicaa12082023-02-16 14:54:04 +0100140 return &Initializer{
Serge Bazanski86a714d2023-04-17 15:54:21 +0200141 InitializerConfig: ic,
142
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200143 p: p,
144 sshClient: sshClient,
Serge Bazanskicaa12082023-02-16 14:54:04 +0100145 }, nil
146}
147
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200148func (i *Initializer) getProcessInfo() processInfo {
Serge Bazanski00cf57d2023-04-20 11:19:00 +0200149 return processInfo{
150 process: model.ProcessShepherdAgentStart,
151 defaultBackoff: bmdb.Backoff{
152 Initial: 5 * time.Minute,
153 Maximum: 4 * time.Hour,
154 Exponent: 1.2,
155 },
Serge Bazanskic50f6942023-04-24 18:27:22 +0200156 processor: metrics.ProcessorShepherdInitializer,
Serge Bazanski00cf57d2023-04-20 11:19:00 +0200157 }
158}
159
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200160func (i *Initializer) getMachines(ctx context.Context, q *model.Queries, limit int32) ([]model.MachineProvided, error) {
Tim Windelschmidt0e749612023-08-07 17:42:59 +0000161 return q.GetMachinesForAgentStart(ctx, model.GetMachinesForAgentStartParams{
162 Limit: limit,
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200163 Provider: i.p.Type(),
Tim Windelschmidt0e749612023-08-07 17:42:59 +0000164 })
Serge Bazanski9eb903d2023-02-20 14:28:19 +0100165}
166
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200167func (i *Initializer) processMachine(ctx context.Context, t *task) error {
168 machine, err := i.p.GetMachine(ctx, shepherd.ProviderID(t.machine.ProviderID))
Serge Bazanskicaa12082023-02-16 14:54:04 +0100169 if err != nil {
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200170 return fmt.Errorf("while fetching machine %q: %v", t.machine.ProviderID, err)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100171 }
Serge Bazanskicaa12082023-02-16 14:54:04 +0100172
Serge Bazanski86a714d2023-04-17 15:54:21 +0200173 // Start the agent.
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200174 klog.Infof("Starting agent on machine (ID: %s, PID %s)", t.machine.MachineID, t.machine.ProviderID)
175 apk, err := i.startAgent(ctx, machine, t.machine.MachineID)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100176 if err != nil {
Serge Bazanski86a714d2023-04-17 15:54:21 +0200177 return fmt.Errorf("while starting the agent: %w", err)
178 }
179
180 // Agent startup succeeded. Set the appropriate BMDB tag, and release the
181 // lock.
182 klog.Infof("Setting AgentStarted (ID: %s, PID: %s, Agent public key: %s).", t.machine.MachineID, t.machine.ProviderID, hex.EncodeToString(apk))
183 err = t.work.Finish(ctx, func(q *model.Queries) error {
184 return q.MachineSetAgentStarted(ctx, model.MachineSetAgentStartedParams{
185 MachineID: t.machine.MachineID,
186 AgentStartedAt: time.Now(),
187 AgentPublicKey: apk,
188 })
189 })
190 if err != nil {
191 return fmt.Errorf("while setting AgentStarted tag: %w", err)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100192 }
193 return nil
194}
195
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200196// startAgent runs the agent executable on the target machine m, returning the
Serge Bazanskicaa12082023-02-16 14:54:04 +0100197// agent's public key on success.
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200198func (i *Initializer) startAgent(ctx context.Context, m shepherd.Machine, mid uuid.UUID) ([]byte, error) {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100199 // Provide a bound on execution time in case we get stuck after the SSH
200 // connection is established.
Serge Bazanski86a714d2023-04-17 15:54:21 +0200201 sctx, sctxC := context.WithTimeout(ctx, i.SSHExecTimeout)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100202 defer sctxC()
203
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200204 // Use the machine's IP address
205 ni := m.Addr()
206 if !ni.IsValid() {
207 return nil, fmt.Errorf("machine (machine ID: %s) has no available addresses", mid)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100208 }
Serge Bazanskicaa12082023-02-16 14:54:04 +0100209
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200210 addr := net.JoinHostPort(ni.String(), "22")
211 klog.V(1).Infof("Dialing machine (machine ID: %s, addr: %s).", mid, addr)
212
213 conn, err := i.sshClient.Dial(sctx, addr, i.SSHConnectTimeout)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100214 if err != nil {
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200215 return nil, fmt.Errorf("while dialing the machine: %w", err)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100216 }
217 defer conn.Close()
218
219 // Upload the agent executable.
220
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100221 klog.Infof("Uploading the agent executable (machine ID: %s, addr: %s).", mid, addr)
Serge Bazanski86a714d2023-04-17 15:54:21 +0200222 if err := conn.Upload(sctx, i.TargetPath, i.Executable); err != nil {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100223 return nil, fmt.Errorf("while uploading agent executable: %w", err)
224 }
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100225 klog.V(1).Infof("Upload successful (machine ID: %s, addr: %s).", mid, addr)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100226
227 // The initialization protobuf message will be sent to the agent on its
228 // standard input.
229 imsg := apb.TakeoverInit{
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100230 MachineId: mid.String(),
Serge Bazanski86a714d2023-04-17 15:54:21 +0200231 BmaasEndpoint: i.Endpoint,
232 CaCertificate: i.EndpointCACertificate,
Serge Bazanskicaa12082023-02-16 14:54:04 +0100233 }
234 imsgb, err := proto.Marshal(&imsg)
235 if err != nil {
236 return nil, fmt.Errorf("while marshaling agent message: %w", err)
237 }
238
239 // Start the agent and wait for the agent's output to arrive.
Serge Bazanski86a714d2023-04-17 15:54:21 +0200240 klog.V(1).Infof("Starting the agent executable at path %q (machine ID: %s).", i.TargetPath, mid)
241 stdout, stderr, err := conn.Execute(ctx, i.TargetPath, imsgb)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100242 stderrStr := strings.TrimSpace(string(stderr))
243 if stderrStr != "" {
244 klog.Warningf("Agent stderr: %q", stderrStr)
245 }
246 if err != nil {
247 return nil, fmt.Errorf("while starting the agent executable: %w", err)
248 }
249
250 var arsp apb.TakeoverResponse
251 if err := proto.Unmarshal(stdout, &arsp); err != nil {
252 return nil, fmt.Errorf("agent reply couldn't be unmarshaled: %w", err)
253 }
Lorenz Brun595dfe92023-02-21 19:13:02 +0100254 var successResp *apb.TakeoverSuccess
255 switch r := arsp.Result.(type) {
256 case *apb.TakeoverResponse_Error:
257 return nil, fmt.Errorf("agent returned error: %v", r.Error.Message)
258 case *apb.TakeoverResponse_Success:
259 successResp = r.Success
260 default:
261 return nil, fmt.Errorf("agent returned unknown result of type %T", arsp.Result)
262 }
263 if !proto.Equal(&imsg, successResp.InitMessage) {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100264 return nil, fmt.Errorf("agent did not send back the init message.")
265 }
Lorenz Brun595dfe92023-02-21 19:13:02 +0100266 if len(successResp.Key) != ed25519.PublicKeySize {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100267 return nil, fmt.Errorf("agent key length mismatch.")
268 }
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100269 klog.Infof("Started the agent (machine ID: %s, key: %s).", mid, hex.EncodeToString(successResp.Key))
Lorenz Brun595dfe92023-02-21 19:13:02 +0100270 return successResp.Key, nil
Serge Bazanskicaa12082023-02-16 14:54:04 +0100271}