Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 1 | package manager |
| 2 | |
| 3 | import ( |
| 4 | "context" |
| 5 | "crypto/ed25519" |
Serge Bazanski | 51987d6 | 2023-04-06 16:35:35 +0200 | [diff] [blame] | 6 | "crypto/x509" |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 7 | "encoding/hex" |
Serge Bazanski | 51987d6 | 2023-04-06 16:35:35 +0200 | [diff] [blame] | 8 | "encoding/pem" |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 9 | "flag" |
| 10 | "fmt" |
| 11 | "net" |
| 12 | "os" |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 13 | "strings" |
| 14 | "time" |
| 15 | |
| 16 | "github.com/google/uuid" |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 17 | "google.golang.org/protobuf/proto" |
| 18 | "k8s.io/klog/v2" |
| 19 | |
| 20 | apb "source.monogon.dev/cloud/agent/api" |
Tim Windelschmidt | 0e74961 | 2023-08-07 17:42:59 +0000 | [diff] [blame] | 21 | |
Serge Bazanski | 00cf57d | 2023-04-20 11:19:00 +0200 | [diff] [blame] | 22 | "source.monogon.dev/cloud/bmaas/bmdb" |
Serge Bazanski | c50f694 | 2023-04-24 18:27:22 +0200 | [diff] [blame] | 23 | "source.monogon.dev/cloud/bmaas/bmdb/metrics" |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 24 | "source.monogon.dev/cloud/bmaas/bmdb/model" |
Tim Windelschmidt | b6308cd | 2023-10-10 21:19:03 +0200 | [diff] [blame] | 25 | "source.monogon.dev/cloud/shepherd" |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 26 | ) |
| 27 | |
// InitializerConfig configures how the Initializer will deploy Agents on
// machines. In CLI scenarios, this should be populated from flags via
// RegisterFlags.
type InitializerConfig struct {
	ControlLoopConfig

	// Executable is the contents of the agent binary created and run
	// at the provisioned servers. Must be set.
	Executable []byte

	// TargetPath is a filesystem destination path used while uploading the BMaaS
	// agent executable to hosts as part of the initialization process. Must be set.
	TargetPath string

	// Endpoint is the address Agent will use to contact the BMaaS
	// infrastructure. Must be set.
	Endpoint string

	// EndpointCACertificate is an optional DER-encoded (but not PEM-armored) X509
	// certificate used to populate the trusted CA store of the agent. It should be
	// set to the CA certificate of the endpoint if not using a system-trusted CA
	// certificate.
	EndpointCACertificate []byte

	// SSHConnectTimeout is the amount of time set aside for establishing the
	// SSH connection to a machine; it is handed to the SSH client when dialing.
	// Upon timeout, the iteration would be declared a failure. Must be set.
	SSHConnectTimeout time.Duration
	// SSHExecTimeout is the amount of time set aside for executing the agent and
	// getting its output once the SSH connection has been established. Upon timeout,
	// the iteration would be declared as failure. Must be set.
	SSHExecTimeout time.Duration
}
| 61 | |
Tim Windelschmidt | b6308cd | 2023-10-10 21:19:03 +0200 | [diff] [blame] | 62 | func (ic *InitializerConfig) RegisterFlags() { |
| 63 | ic.ControlLoopConfig.RegisterFlags("initializer") |
Serge Bazanski | 86a714d | 2023-04-17 15:54:21 +0200 | [diff] [blame] | 64 | |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 65 | flag.Func("agent_executable_path", "Local filesystem path of agent binary to be uploaded", func(val string) error { |
| 66 | if val == "" { |
| 67 | return nil |
| 68 | } |
| 69 | data, err := os.ReadFile(val) |
| 70 | if err != nil { |
Serge Bazanski | 77b11d3 | 2023-04-06 14:43:19 +0200 | [diff] [blame] | 71 | return fmt.Errorf("could not read: %w", err) |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 72 | } |
Tim Windelschmidt | b6308cd | 2023-10-10 21:19:03 +0200 | [diff] [blame] | 73 | ic.Executable = data |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 74 | return nil |
| 75 | }) |
Tim Windelschmidt | b6308cd | 2023-10-10 21:19:03 +0200 | [diff] [blame] | 76 | flag.StringVar(&ic.TargetPath, "agent_target_path", "/root/agent", "Filesystem path where the agent will be uploaded to and ran from") |
| 77 | flag.StringVar(&ic.Endpoint, "agent_endpoint", "", "Address of BMDB Server to which the agent will attempt to connect") |
Serge Bazanski | 51987d6 | 2023-04-06 16:35:35 +0200 | [diff] [blame] | 78 | flag.Func("agent_endpoint_ca_certificate_path", "Path to PEM X509 CA certificate that the agent endpoint is serving with. If not set, the agent will attempt to use system CA certificates to authenticate the endpoint.", func(val string) error { |
| 79 | if val == "" { |
| 80 | return nil |
| 81 | } |
| 82 | data, err := os.ReadFile(val) |
| 83 | if err != nil { |
| 84 | return fmt.Errorf("could not read: %w", err) |
| 85 | } |
| 86 | block, _ := pem.Decode(data) |
| 87 | if block.Type != "CERTIFICATE" { |
| 88 | return fmt.Errorf("not a certificate") |
| 89 | } |
| 90 | _, err = x509.ParseCertificate(block.Bytes) |
| 91 | if err != nil { |
| 92 | return fmt.Errorf("invalid certificate: %w", err) |
| 93 | } |
Tim Windelschmidt | b6308cd | 2023-10-10 21:19:03 +0200 | [diff] [blame] | 94 | ic.EndpointCACertificate = block.Bytes |
Serge Bazanski | 51987d6 | 2023-04-06 16:35:35 +0200 | [diff] [blame] | 95 | return nil |
| 96 | }) |
Tim Windelschmidt | b6308cd | 2023-10-10 21:19:03 +0200 | [diff] [blame] | 97 | flag.DurationVar(&ic.SSHConnectTimeout, "agent_ssh_connect_timeout", 2*time.Second, "Timeout for connecting over SSH to a machine") |
| 98 | flag.DurationVar(&ic.SSHExecTimeout, "agent_ssh_exec_timeout", 60*time.Second, "Timeout for connecting over SSH to a machine") |
| 99 | } |
| 100 | |
| 101 | func (ic *InitializerConfig) Check() error { |
| 102 | if err := ic.ControlLoopConfig.Check(); err != nil { |
| 103 | return err |
| 104 | } |
| 105 | |
| 106 | if len(ic.Executable) == 0 { |
| 107 | return fmt.Errorf("agent executable not configured") |
| 108 | } |
| 109 | if ic.TargetPath == "" { |
| 110 | return fmt.Errorf("agent target path must be set") |
| 111 | } |
| 112 | if ic.Endpoint == "" { |
| 113 | return fmt.Errorf("agent endpoint must be set") |
| 114 | } |
| 115 | if ic.SSHConnectTimeout == 0 { |
| 116 | return fmt.Errorf("agent SSH connection timeout must be set") |
| 117 | } |
| 118 | if ic.SSHExecTimeout == 0 { |
| 119 | return fmt.Errorf("agent SSH execution timeout must be set") |
| 120 | } |
| 121 | |
| 122 | return nil |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 123 | } |
| 124 | |
// Initializer starts the agent on machines that aren't yet running it.
type Initializer struct {
	InitializerConfig

	// sshClient is used to dial machines when starting the agent.
	sshClient SSHClient
	// p is the provider whose machines are looked up and initialized.
	p shepherd.Provider
}
| 132 | |
Serge Bazanski | 86a714d | 2023-04-17 15:54:21 +0200 | [diff] [blame] | 133 | // NewInitializer creates an Initializer instance, checking the |
| 134 | // InitializerConfig, SharedConfig and AgentConfig for errors. |
Tim Windelschmidt | b6308cd | 2023-10-10 21:19:03 +0200 | [diff] [blame] | 135 | func NewInitializer(p shepherd.Provider, sshClient SSHClient, ic InitializerConfig) (*Initializer, error) { |
| 136 | if err := ic.Check(); err != nil { |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 137 | return nil, err |
| 138 | } |
Serge Bazanski | 86a714d | 2023-04-17 15:54:21 +0200 | [diff] [blame] | 139 | |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 140 | return &Initializer{ |
Serge Bazanski | 86a714d | 2023-04-17 15:54:21 +0200 | [diff] [blame] | 141 | InitializerConfig: ic, |
| 142 | |
Tim Windelschmidt | b6308cd | 2023-10-10 21:19:03 +0200 | [diff] [blame] | 143 | p: p, |
| 144 | sshClient: sshClient, |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 145 | }, nil |
| 146 | } |
| 147 | |
Tim Windelschmidt | b6308cd | 2023-10-10 21:19:03 +0200 | [diff] [blame] | 148 | func (i *Initializer) getProcessInfo() processInfo { |
Serge Bazanski | 00cf57d | 2023-04-20 11:19:00 +0200 | [diff] [blame] | 149 | return processInfo{ |
| 150 | process: model.ProcessShepherdAgentStart, |
| 151 | defaultBackoff: bmdb.Backoff{ |
| 152 | Initial: 5 * time.Minute, |
| 153 | Maximum: 4 * time.Hour, |
| 154 | Exponent: 1.2, |
| 155 | }, |
Serge Bazanski | c50f694 | 2023-04-24 18:27:22 +0200 | [diff] [blame] | 156 | processor: metrics.ProcessorShepherdInitializer, |
Serge Bazanski | 00cf57d | 2023-04-20 11:19:00 +0200 | [diff] [blame] | 157 | } |
| 158 | } |
| 159 | |
Tim Windelschmidt | b6308cd | 2023-10-10 21:19:03 +0200 | [diff] [blame] | 160 | func (i *Initializer) getMachines(ctx context.Context, q *model.Queries, limit int32) ([]model.MachineProvided, error) { |
Tim Windelschmidt | 0e74961 | 2023-08-07 17:42:59 +0000 | [diff] [blame] | 161 | return q.GetMachinesForAgentStart(ctx, model.GetMachinesForAgentStartParams{ |
| 162 | Limit: limit, |
Tim Windelschmidt | b6308cd | 2023-10-10 21:19:03 +0200 | [diff] [blame] | 163 | Provider: i.p.Type(), |
Tim Windelschmidt | 0e74961 | 2023-08-07 17:42:59 +0000 | [diff] [blame] | 164 | }) |
Serge Bazanski | 9eb903d | 2023-02-20 14:28:19 +0100 | [diff] [blame] | 165 | } |
| 166 | |
Tim Windelschmidt | b6308cd | 2023-10-10 21:19:03 +0200 | [diff] [blame] | 167 | func (i *Initializer) processMachine(ctx context.Context, t *task) error { |
| 168 | machine, err := i.p.GetMachine(ctx, shepherd.ProviderID(t.machine.ProviderID)) |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 169 | if err != nil { |
Tim Windelschmidt | b6308cd | 2023-10-10 21:19:03 +0200 | [diff] [blame] | 170 | return fmt.Errorf("while fetching machine %q: %v", t.machine.ProviderID, err) |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 171 | } |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 172 | |
Serge Bazanski | 86a714d | 2023-04-17 15:54:21 +0200 | [diff] [blame] | 173 | // Start the agent. |
Tim Windelschmidt | b6308cd | 2023-10-10 21:19:03 +0200 | [diff] [blame] | 174 | klog.Infof("Starting agent on machine (ID: %s, PID %s)", t.machine.MachineID, t.machine.ProviderID) |
| 175 | apk, err := i.startAgent(ctx, machine, t.machine.MachineID) |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 176 | if err != nil { |
Serge Bazanski | 86a714d | 2023-04-17 15:54:21 +0200 | [diff] [blame] | 177 | return fmt.Errorf("while starting the agent: %w", err) |
| 178 | } |
| 179 | |
| 180 | // Agent startup succeeded. Set the appropriate BMDB tag, and release the |
| 181 | // lock. |
| 182 | klog.Infof("Setting AgentStarted (ID: %s, PID: %s, Agent public key: %s).", t.machine.MachineID, t.machine.ProviderID, hex.EncodeToString(apk)) |
| 183 | err = t.work.Finish(ctx, func(q *model.Queries) error { |
| 184 | return q.MachineSetAgentStarted(ctx, model.MachineSetAgentStartedParams{ |
| 185 | MachineID: t.machine.MachineID, |
| 186 | AgentStartedAt: time.Now(), |
| 187 | AgentPublicKey: apk, |
| 188 | }) |
| 189 | }) |
| 190 | if err != nil { |
| 191 | return fmt.Errorf("while setting AgentStarted tag: %w", err) |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 192 | } |
| 193 | return nil |
| 194 | } |
| 195 | |
Tim Windelschmidt | b6308cd | 2023-10-10 21:19:03 +0200 | [diff] [blame] | 196 | // startAgent runs the agent executable on the target machine m, returning the |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 197 | // agent's public key on success. |
Tim Windelschmidt | b6308cd | 2023-10-10 21:19:03 +0200 | [diff] [blame] | 198 | func (i *Initializer) startAgent(ctx context.Context, m shepherd.Machine, mid uuid.UUID) ([]byte, error) { |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 199 | // Provide a bound on execution time in case we get stuck after the SSH |
| 200 | // connection is established. |
Serge Bazanski | 86a714d | 2023-04-17 15:54:21 +0200 | [diff] [blame] | 201 | sctx, sctxC := context.WithTimeout(ctx, i.SSHExecTimeout) |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 202 | defer sctxC() |
| 203 | |
Tim Windelschmidt | b6308cd | 2023-10-10 21:19:03 +0200 | [diff] [blame] | 204 | // Use the machine's IP address |
| 205 | ni := m.Addr() |
| 206 | if !ni.IsValid() { |
| 207 | return nil, fmt.Errorf("machine (machine ID: %s) has no available addresses", mid) |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 208 | } |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 209 | |
Tim Windelschmidt | b6308cd | 2023-10-10 21:19:03 +0200 | [diff] [blame] | 210 | addr := net.JoinHostPort(ni.String(), "22") |
| 211 | klog.V(1).Infof("Dialing machine (machine ID: %s, addr: %s).", mid, addr) |
| 212 | |
| 213 | conn, err := i.sshClient.Dial(sctx, addr, i.SSHConnectTimeout) |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 214 | if err != nil { |
Tim Windelschmidt | b6308cd | 2023-10-10 21:19:03 +0200 | [diff] [blame] | 215 | return nil, fmt.Errorf("while dialing the machine: %w", err) |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 216 | } |
| 217 | defer conn.Close() |
| 218 | |
| 219 | // Upload the agent executable. |
| 220 | |
Lorenz Brun | 5b8b860 | 2023-03-09 17:22:21 +0100 | [diff] [blame] | 221 | klog.Infof("Uploading the agent executable (machine ID: %s, addr: %s).", mid, addr) |
Serge Bazanski | 86a714d | 2023-04-17 15:54:21 +0200 | [diff] [blame] | 222 | if err := conn.Upload(sctx, i.TargetPath, i.Executable); err != nil { |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 223 | return nil, fmt.Errorf("while uploading agent executable: %w", err) |
| 224 | } |
Lorenz Brun | 5b8b860 | 2023-03-09 17:22:21 +0100 | [diff] [blame] | 225 | klog.V(1).Infof("Upload successful (machine ID: %s, addr: %s).", mid, addr) |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 226 | |
| 227 | // The initialization protobuf message will be sent to the agent on its |
| 228 | // standard input. |
| 229 | imsg := apb.TakeoverInit{ |
Lorenz Brun | 5b8b860 | 2023-03-09 17:22:21 +0100 | [diff] [blame] | 230 | MachineId: mid.String(), |
Serge Bazanski | 86a714d | 2023-04-17 15:54:21 +0200 | [diff] [blame] | 231 | BmaasEndpoint: i.Endpoint, |
| 232 | CaCertificate: i.EndpointCACertificate, |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 233 | } |
| 234 | imsgb, err := proto.Marshal(&imsg) |
| 235 | if err != nil { |
| 236 | return nil, fmt.Errorf("while marshaling agent message: %w", err) |
| 237 | } |
| 238 | |
| 239 | // Start the agent and wait for the agent's output to arrive. |
Serge Bazanski | 86a714d | 2023-04-17 15:54:21 +0200 | [diff] [blame] | 240 | klog.V(1).Infof("Starting the agent executable at path %q (machine ID: %s).", i.TargetPath, mid) |
| 241 | stdout, stderr, err := conn.Execute(ctx, i.TargetPath, imsgb) |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 242 | stderrStr := strings.TrimSpace(string(stderr)) |
| 243 | if stderrStr != "" { |
| 244 | klog.Warningf("Agent stderr: %q", stderrStr) |
| 245 | } |
| 246 | if err != nil { |
| 247 | return nil, fmt.Errorf("while starting the agent executable: %w", err) |
| 248 | } |
| 249 | |
| 250 | var arsp apb.TakeoverResponse |
| 251 | if err := proto.Unmarshal(stdout, &arsp); err != nil { |
| 252 | return nil, fmt.Errorf("agent reply couldn't be unmarshaled: %w", err) |
| 253 | } |
Lorenz Brun | 595dfe9 | 2023-02-21 19:13:02 +0100 | [diff] [blame] | 254 | var successResp *apb.TakeoverSuccess |
| 255 | switch r := arsp.Result.(type) { |
| 256 | case *apb.TakeoverResponse_Error: |
| 257 | return nil, fmt.Errorf("agent returned error: %v", r.Error.Message) |
| 258 | case *apb.TakeoverResponse_Success: |
| 259 | successResp = r.Success |
| 260 | default: |
| 261 | return nil, fmt.Errorf("agent returned unknown result of type %T", arsp.Result) |
| 262 | } |
| 263 | if !proto.Equal(&imsg, successResp.InitMessage) { |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 264 | return nil, fmt.Errorf("agent did not send back the init message.") |
| 265 | } |
Lorenz Brun | 595dfe9 | 2023-02-21 19:13:02 +0100 | [diff] [blame] | 266 | if len(successResp.Key) != ed25519.PublicKeySize { |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 267 | return nil, fmt.Errorf("agent key length mismatch.") |
| 268 | } |
Lorenz Brun | 5b8b860 | 2023-03-09 17:22:21 +0100 | [diff] [blame] | 269 | klog.Infof("Started the agent (machine ID: %s, key: %s).", mid, hex.EncodeToString(successResp.Key)) |
Lorenz Brun | 595dfe9 | 2023-02-21 19:13:02 +0100 | [diff] [blame] | 270 | return successResp.Key, nil |
Serge Bazanski | caa1208 | 2023-02-16 14:54:04 +0100 | [diff] [blame] | 271 | } |