blob: d9e35796751a120454897f013a2304d6658f35e2 [file] [log] [blame]
Tim Windelschmidt6d33a432025-02-04 14:34:25 +01001// Copyright The Monogon Project Authors.
2// SPDX-License-Identifier: Apache-2.0
3
Serge Bazanskicaa12082023-02-16 14:54:04 +01004package manager
5
6import (
Tim Windelschmidt5f5f3302024-02-22 23:50:24 +01007 "bytes"
Serge Bazanskicaa12082023-02-16 14:54:04 +01008 "context"
9 "crypto/ed25519"
Serge Bazanski51987d62023-04-06 16:35:35 +020010 "crypto/x509"
Serge Bazanskicaa12082023-02-16 14:54:04 +010011 "encoding/hex"
Serge Bazanski51987d62023-04-06 16:35:35 +020012 "encoding/pem"
Serge Bazanskicaa12082023-02-16 14:54:04 +010013 "flag"
14 "fmt"
Jan Schär0175d7a2025-03-26 12:57:23 +000015 "io"
Serge Bazanskicaa12082023-02-16 14:54:04 +010016 "net"
17 "os"
Serge Bazanskicaa12082023-02-16 14:54:04 +010018 "strings"
19 "time"
20
21 "github.com/google/uuid"
Jan Schär0175d7a2025-03-26 12:57:23 +000022 "golang.org/x/crypto/ssh"
Serge Bazanskicaa12082023-02-16 14:54:04 +010023 "google.golang.org/protobuf/proto"
24 "k8s.io/klog/v2"
25
26 apb "source.monogon.dev/cloud/agent/api"
Tim Windelschmidt0e749612023-08-07 17:42:59 +000027
Serge Bazanski00cf57d2023-04-20 11:19:00 +020028 "source.monogon.dev/cloud/bmaas/bmdb"
Serge Bazanskic50f6942023-04-24 18:27:22 +020029 "source.monogon.dev/cloud/bmaas/bmdb/metrics"
Serge Bazanskicaa12082023-02-16 14:54:04 +010030 "source.monogon.dev/cloud/bmaas/bmdb/model"
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +020031 "source.monogon.dev/cloud/shepherd"
Jan Schär0175d7a2025-03-26 12:57:23 +000032 "source.monogon.dev/osbase/net/sshtakeover"
Serge Bazanskicaa12082023-02-16 14:54:04 +010033)
34
Serge Bazanski86a714d2023-04-17 15:54:21 +020035// InitializerConfig configures how the Initializer will deploy Agents on
36// machines. In CLI scenarios, this should be populated from flags via
37// RegisterFlags.
38type InitializerConfig struct {
39 ControlLoopConfig
40
Serge Bazanskicaa12082023-02-16 14:54:04 +010041 // Executable is the contents of the agent binary created and run
42 // at the provisioned servers. Must be set.
43 Executable []byte
44
45 // TargetPath is a filesystem destination path used while uploading the BMaaS
46 // agent executable to hosts as part of the initialization process. Must be set.
47 TargetPath string
48
49 // Endpoint is the address Agent will use to contact the BMaaS
50 // infrastructure. Must be set.
51 Endpoint string
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +020052
Serge Bazanski51987d62023-04-06 16:35:35 +020053 // EndpointCACertificate is an optional DER-encoded (but not PEM-armored) X509
54 // certificate used to populate the trusted CA store of the agent. It should be
55 // set to the CA certificate of the endpoint if not using a system-trusted CA
56 // certificate.
57 EndpointCACertificate []byte
Serge Bazanskicaa12082023-02-16 14:54:04 +010058
Jan Schär0175d7a2025-03-26 12:57:23 +000059 SSHConfig ssh.ClientConfig
Serge Bazanskicaa12082023-02-16 14:54:04 +010060 // SSHExecTimeout is the amount of time set aside for executing the agent and
61 // getting its output once the SSH connection has been established. Upon timeout,
62 // the iteration would be declared as failure. Must be set.
63 SSHExecTimeout time.Duration
Jan Schär0175d7a2025-03-26 12:57:23 +000064
65 // DialSSH can be set in tests to override how ssh connections are started.
66 DialSSH func(ctx context.Context, address string, config *ssh.ClientConfig) (SSHClient, error)
67}
68
// SSHClient is the set of operations the Initializer performs on an
// established SSH connection to a machine. It is what DialSSH returns, which
// lets tests substitute a fake connection.
type SSHClient interface {
	// Execute runs command on the remote machine with stdin as its standard
	// input, returning whatever the command wrote to stdout and stderr.
	Execute(ctx context.Context, command string, stdin []byte) (stdout []byte, stderr []byte, err error)
	// UploadExecutable copies the contents of src to targetPath on the remote
	// machine. NOTE(review): presumably the result is marked executable by
	// the implementation; confirm against sshtakeover.
	UploadExecutable(ctx context.Context, targetPath string, src io.Reader) error
	// Close releases the connection.
	Close() error
}
74
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +020075func (ic *InitializerConfig) RegisterFlags() {
76 ic.ControlLoopConfig.RegisterFlags("initializer")
Serge Bazanski86a714d2023-04-17 15:54:21 +020077
Serge Bazanskicaa12082023-02-16 14:54:04 +010078 flag.Func("agent_executable_path", "Local filesystem path of agent binary to be uploaded", func(val string) error {
79 if val == "" {
80 return nil
81 }
82 data, err := os.ReadFile(val)
83 if err != nil {
Serge Bazanski77b11d32023-04-06 14:43:19 +020084 return fmt.Errorf("could not read: %w", err)
Serge Bazanskicaa12082023-02-16 14:54:04 +010085 }
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +020086 ic.Executable = data
Serge Bazanskicaa12082023-02-16 14:54:04 +010087 return nil
88 })
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +020089 flag.StringVar(&ic.TargetPath, "agent_target_path", "/root/agent", "Filesystem path where the agent will be uploaded to and ran from")
90 flag.StringVar(&ic.Endpoint, "agent_endpoint", "", "Address of BMDB Server to which the agent will attempt to connect")
Serge Bazanski51987d62023-04-06 16:35:35 +020091 flag.Func("agent_endpoint_ca_certificate_path", "Path to PEM X509 CA certificate that the agent endpoint is serving with. If not set, the agent will attempt to use system CA certificates to authenticate the endpoint.", func(val string) error {
92 if val == "" {
93 return nil
94 }
95 data, err := os.ReadFile(val)
96 if err != nil {
97 return fmt.Errorf("could not read: %w", err)
98 }
99 block, _ := pem.Decode(data)
100 if block.Type != "CERTIFICATE" {
101 return fmt.Errorf("not a certificate")
102 }
103 _, err = x509.ParseCertificate(block.Bytes)
104 if err != nil {
105 return fmt.Errorf("invalid certificate: %w", err)
106 }
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200107 ic.EndpointCACertificate = block.Bytes
Serge Bazanski51987d62023-04-06 16:35:35 +0200108 return nil
109 })
Jan Schär0175d7a2025-03-26 12:57:23 +0000110 flag.DurationVar(&ic.SSHConfig.Timeout, "agent_ssh_connect_timeout", 2*time.Second, "Timeout for connecting over SSH to a machine")
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200111 flag.DurationVar(&ic.SSHExecTimeout, "agent_ssh_exec_timeout", 60*time.Second, "Timeout for connecting over SSH to a machine")
112}
113
114func (ic *InitializerConfig) Check() error {
115 if err := ic.ControlLoopConfig.Check(); err != nil {
116 return err
117 }
118
119 if len(ic.Executable) == 0 {
120 return fmt.Errorf("agent executable not configured")
121 }
122 if ic.TargetPath == "" {
123 return fmt.Errorf("agent target path must be set")
124 }
125 if ic.Endpoint == "" {
126 return fmt.Errorf("agent endpoint must be set")
127 }
Jan Schär0175d7a2025-03-26 12:57:23 +0000128 if ic.SSHConfig.Timeout == 0 {
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200129 return fmt.Errorf("agent SSH connection timeout must be set")
130 }
131 if ic.SSHExecTimeout == 0 {
132 return fmt.Errorf("agent SSH execution timeout must be set")
133 }
134
135 return nil
Serge Bazanskicaa12082023-02-16 14:54:04 +0100136}
137
Serge Bazanski86a714d2023-04-17 15:54:21 +0200138// The Initializer starts the agent on machines that aren't yet running it.
Serge Bazanskicaa12082023-02-16 14:54:04 +0100139type Initializer struct {
Serge Bazanski86a714d2023-04-17 15:54:21 +0200140 InitializerConfig
141
Jan Schär0175d7a2025-03-26 12:57:23 +0000142 p shepherd.Provider
Serge Bazanskicaa12082023-02-16 14:54:04 +0100143}
144
Serge Bazanski86a714d2023-04-17 15:54:21 +0200145// NewInitializer creates an Initializer instance, checking the
146// InitializerConfig, SharedConfig and AgentConfig for errors.
Jan Schär0175d7a2025-03-26 12:57:23 +0000147func NewInitializer(p shepherd.Provider, ic InitializerConfig) (*Initializer, error) {
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200148 if err := ic.Check(); err != nil {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100149 return nil, err
150 }
Serge Bazanski86a714d2023-04-17 15:54:21 +0200151
Serge Bazanskicaa12082023-02-16 14:54:04 +0100152 return &Initializer{
Serge Bazanski86a714d2023-04-17 15:54:21 +0200153 InitializerConfig: ic,
154
Jan Schär0175d7a2025-03-26 12:57:23 +0000155 p: p,
Serge Bazanskicaa12082023-02-16 14:54:04 +0100156 }, nil
157}
158
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200159func (i *Initializer) getProcessInfo() processInfo {
Serge Bazanski00cf57d2023-04-20 11:19:00 +0200160 return processInfo{
161 process: model.ProcessShepherdAgentStart,
162 defaultBackoff: bmdb.Backoff{
163 Initial: 5 * time.Minute,
164 Maximum: 4 * time.Hour,
165 Exponent: 1.2,
166 },
Serge Bazanskic50f6942023-04-24 18:27:22 +0200167 processor: metrics.ProcessorShepherdInitializer,
Serge Bazanski00cf57d2023-04-20 11:19:00 +0200168 }
169}
170
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200171func (i *Initializer) getMachines(ctx context.Context, q *model.Queries, limit int32) ([]model.MachineProvided, error) {
Tim Windelschmidt0e749612023-08-07 17:42:59 +0000172 return q.GetMachinesForAgentStart(ctx, model.GetMachinesForAgentStartParams{
173 Limit: limit,
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200174 Provider: i.p.Type(),
Tim Windelschmidt0e749612023-08-07 17:42:59 +0000175 })
Serge Bazanski9eb903d2023-02-20 14:28:19 +0100176}
177
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200178func (i *Initializer) processMachine(ctx context.Context, t *task) error {
179 machine, err := i.p.GetMachine(ctx, shepherd.ProviderID(t.machine.ProviderID))
Serge Bazanskicaa12082023-02-16 14:54:04 +0100180 if err != nil {
Tim Windelschmidt327cdba2024-05-21 13:51:32 +0200181 return fmt.Errorf("while fetching machine %q: %w", t.machine.ProviderID, err)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100182 }
Serge Bazanskicaa12082023-02-16 14:54:04 +0100183
Serge Bazanski86a714d2023-04-17 15:54:21 +0200184 // Start the agent.
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200185 klog.Infof("Starting agent on machine (ID: %s, PID %s)", t.machine.MachineID, t.machine.ProviderID)
186 apk, err := i.startAgent(ctx, machine, t.machine.MachineID)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100187 if err != nil {
Serge Bazanski86a714d2023-04-17 15:54:21 +0200188 return fmt.Errorf("while starting the agent: %w", err)
189 }
190
191 // Agent startup succeeded. Set the appropriate BMDB tag, and release the
192 // lock.
193 klog.Infof("Setting AgentStarted (ID: %s, PID: %s, Agent public key: %s).", t.machine.MachineID, t.machine.ProviderID, hex.EncodeToString(apk))
194 err = t.work.Finish(ctx, func(q *model.Queries) error {
195 return q.MachineSetAgentStarted(ctx, model.MachineSetAgentStartedParams{
196 MachineID: t.machine.MachineID,
197 AgentStartedAt: time.Now(),
198 AgentPublicKey: apk,
199 })
200 })
201 if err != nil {
202 return fmt.Errorf("while setting AgentStarted tag: %w", err)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100203 }
204 return nil
205}
206
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200207// startAgent runs the agent executable on the target machine m, returning the
Serge Bazanskicaa12082023-02-16 14:54:04 +0100208// agent's public key on success.
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200209func (i *Initializer) startAgent(ctx context.Context, m shepherd.Machine, mid uuid.UUID) ([]byte, error) {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100210 // Provide a bound on execution time in case we get stuck after the SSH
211 // connection is established.
Serge Bazanski86a714d2023-04-17 15:54:21 +0200212 sctx, sctxC := context.WithTimeout(ctx, i.SSHExecTimeout)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100213 defer sctxC()
214
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200215 // Use the machine's IP address
216 ni := m.Addr()
217 if !ni.IsValid() {
218 return nil, fmt.Errorf("machine (machine ID: %s) has no available addresses", mid)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100219 }
Serge Bazanskicaa12082023-02-16 14:54:04 +0100220
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200221 addr := net.JoinHostPort(ni.String(), "22")
222 klog.V(1).Infof("Dialing machine (machine ID: %s, addr: %s).", mid, addr)
223
Jan Schär0175d7a2025-03-26 12:57:23 +0000224 var conn SSHClient
225 var err error
226 if i.DialSSH != nil {
227 conn, err = i.DialSSH(sctx, addr, &i.SSHConfig)
228 } else {
229 conn, err = sshtakeover.Dial(sctx, addr, &i.SSHConfig)
230 }
Serge Bazanskicaa12082023-02-16 14:54:04 +0100231 if err != nil {
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200232 return nil, fmt.Errorf("while dialing the machine: %w", err)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100233 }
234 defer conn.Close()
235
236 // Upload the agent executable.
237
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100238 klog.Infof("Uploading the agent executable (machine ID: %s, addr: %s).", mid, addr)
Jan Schär0175d7a2025-03-26 12:57:23 +0000239 if err := conn.UploadExecutable(sctx, i.TargetPath, bytes.NewReader(i.Executable)); err != nil {
Serge Bazanskicaa12082023-02-16 14:54:04 +0100240 return nil, fmt.Errorf("while uploading agent executable: %w", err)
241 }
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100242 klog.V(1).Infof("Upload successful (machine ID: %s, addr: %s).", mid, addr)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100243
244 // The initialization protobuf message will be sent to the agent on its
245 // standard input.
246 imsg := apb.TakeoverInit{
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100247 MachineId: mid.String(),
Serge Bazanski86a714d2023-04-17 15:54:21 +0200248 BmaasEndpoint: i.Endpoint,
249 CaCertificate: i.EndpointCACertificate,
Serge Bazanskicaa12082023-02-16 14:54:04 +0100250 }
251 imsgb, err := proto.Marshal(&imsg)
252 if err != nil {
253 return nil, fmt.Errorf("while marshaling agent message: %w", err)
254 }
255
256 // Start the agent and wait for the agent's output to arrive.
Serge Bazanski86a714d2023-04-17 15:54:21 +0200257 klog.V(1).Infof("Starting the agent executable at path %q (machine ID: %s).", i.TargetPath, mid)
258 stdout, stderr, err := conn.Execute(ctx, i.TargetPath, imsgb)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100259 stderrStr := strings.TrimSpace(string(stderr))
260 if stderrStr != "" {
261 klog.Warningf("Agent stderr: %q", stderrStr)
262 }
263 if err != nil {
264 return nil, fmt.Errorf("while starting the agent executable: %w", err)
265 }
266
267 var arsp apb.TakeoverResponse
268 if err := proto.Unmarshal(stdout, &arsp); err != nil {
269 return nil, fmt.Errorf("agent reply couldn't be unmarshaled: %w", err)
270 }
Lorenz Brun595dfe92023-02-21 19:13:02 +0100271 var successResp *apb.TakeoverSuccess
272 switch r := arsp.Result.(type) {
273 case *apb.TakeoverResponse_Error:
274 return nil, fmt.Errorf("agent returned error: %v", r.Error.Message)
275 case *apb.TakeoverResponse_Success:
276 successResp = r.Success
277 default:
278 return nil, fmt.Errorf("agent returned unknown result of type %T", arsp.Result)
279 }
280 if !proto.Equal(&imsg, successResp.InitMessage) {
Tim Windelschmidt73e98822024-04-18 23:13:49 +0200281 return nil, fmt.Errorf("agent did not send back the init message")
Serge Bazanskicaa12082023-02-16 14:54:04 +0100282 }
Lorenz Brun595dfe92023-02-21 19:13:02 +0100283 if len(successResp.Key) != ed25519.PublicKeySize {
Tim Windelschmidt73e98822024-04-18 23:13:49 +0200284 return nil, fmt.Errorf("agent key length mismatch")
Serge Bazanskicaa12082023-02-16 14:54:04 +0100285 }
Lorenz Brun5b8b8602023-03-09 17:22:21 +0100286 klog.Infof("Started the agent (machine ID: %s, key: %s).", mid, hex.EncodeToString(successResp.Key))
Lorenz Brun595dfe92023-02-21 19:13:02 +0100287 return successResp.Key, nil
Serge Bazanskicaa12082023-02-16 14:54:04 +0100288}