blob: 7ae5f83536717697deeaeebd4278be1ae4e5ee9a [file] [log] [blame]
// Package launch builds on the osbase/test/launch package and implements
// launching Metropolis nodes and clusters in a virtualized environment using
// qemu. It's kept in a separate package as it depends on a Metropolis node
// image, which might not be required for some uses of the osbase launch
// library.
Tim Windelschmidt9f21f532024-05-07 15:14:20 +02005package launch
Serge Bazanski66e58952021-10-05 17:06:56 +02006
7import (
8 "bytes"
9 "context"
Serge Bazanski1f8cad72023-03-20 16:58:10 +010010 "crypto/ed25519"
Serge Bazanski66e58952021-10-05 17:06:56 +020011 "crypto/rand"
12 "crypto/tls"
Serge Bazanski54e212a2023-06-14 13:45:11 +020013 "crypto/x509"
Serge Bazanskia0bc6d32023-06-28 18:57:40 +020014 "encoding/pem"
Serge Bazanski66e58952021-10-05 17:06:56 +020015 "errors"
16 "fmt"
17 "io"
Serge Bazanski66e58952021-10-05 17:06:56 +020018 "net"
Lorenz Brun150f24a2023-07-13 20:11:06 +020019 "net/http"
Serge Bazanski66e58952021-10-05 17:06:56 +020020 "os"
21 "os/exec"
Leopoldacfad5b2023-01-15 14:05:25 +010022 "path"
Serge Bazanski66e58952021-10-05 17:06:56 +020023 "path/filepath"
Serge Bazanski53458ba2024-06-18 09:56:46 +000024 "strconv"
Serge Bazanski630fb5c2023-04-06 10:50:24 +020025 "strings"
Serge Bazanski66e58952021-10-05 17:06:56 +020026 "syscall"
27 "time"
28
Tim Windelschmidt2a1d1b22024-02-06 07:07:42 +010029 "github.com/bazelbuild/rules_go/go/runfiles"
Serge Bazanski66e58952021-10-05 17:06:56 +020030 "github.com/cenkalti/backoff/v4"
Serge Bazanski66e58952021-10-05 17:06:56 +020031 "go.uber.org/multierr"
Serge Bazanskibe742842022-04-04 13:18:50 +020032 "golang.org/x/net/proxy"
Lorenz Brun87bbf7e2024-03-18 18:22:25 +010033 "golang.org/x/sys/unix"
Serge Bazanski66e58952021-10-05 17:06:56 +020034 "google.golang.org/grpc"
Serge Bazanski636032e2022-01-26 14:21:33 +010035 "google.golang.org/grpc/codes"
36 "google.golang.org/grpc/status"
Serge Bazanski66e58952021-10-05 17:06:56 +020037 "google.golang.org/protobuf/proto"
Serge Bazanskia0bc6d32023-06-28 18:57:40 +020038 "k8s.io/client-go/kubernetes"
39 "k8s.io/client-go/rest"
Serge Bazanski66e58952021-10-05 17:06:56 +020040
Serge Bazanski37cfcc12024-03-21 11:59:07 +010041 ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
Tim Windelschmidtbe25a3b2023-07-19 16:31:56 +020042 apb "source.monogon.dev/metropolis/proto/api"
43 cpb "source.monogon.dev/metropolis/proto/common"
44
Serge Bazanskidd5b03c2024-05-16 18:07:06 +020045 "source.monogon.dev/go/qcow2"
Serge Bazanski1f8cad72023-03-20 16:58:10 +010046 metroctl "source.monogon.dev/metropolis/cli/metroctl/core"
Serge Bazanski66e58952021-10-05 17:06:56 +020047 "source.monogon.dev/metropolis/node"
Serge Bazanskie78a0892021-10-07 17:03:49 +020048 "source.monogon.dev/metropolis/node/core/identity"
Serge Bazanski66e58952021-10-05 17:06:56 +020049 "source.monogon.dev/metropolis/node/core/rpc"
Serge Bazanski5bb8a332022-06-23 17:41:33 +020050 "source.monogon.dev/metropolis/node/core/rpc/resolver"
Tim Windelschmidt9f21f532024-05-07 15:14:20 +020051 "source.monogon.dev/metropolis/test/localregistry"
52 "source.monogon.dev/osbase/test/launch"
Serge Bazanski66e58952021-10-05 17:06:56 +020053)
54
const (
	// nodeNumberKey is the key of the node label used to carry a node's
	// numerical index (its position in the order of node creation) in the
	// test system.
	nodeNumberKey string = "test-node-number"
)
60
// NodeOptions contains all options that can be passed to LaunchNode.
type NodeOptions struct {
	// Name is a human-readable identifier to be used in debug output.
	Name string

	// Ports contains the port mapping where to expose the internal ports of the VM to
	// the host. See IdentityPortMap() and ConflictFreePortMap(). Ignored when
	// ConnectToSocket is set.
	Ports launch.PortMap

	// If set to true, reboots are honored. Otherwise, all reboots exit the
	// LaunchNode QEMU process (QEMU is started with -no-reboot). Metropolis nodes
	// generally restart on almost all errors, so unless you want to test reboot
	// behavior this should be false.
	AllowReboot bool

	// By default, the VM is connected to the Host via SLIRP. If ConnectToSocket is
	// set, it is instead connected to the given file descriptor/socket, which is
	// handed to QEMU as file descriptor 3. If this is set, all port maps from the
	// Ports option are ignored. Intended for networking this instance together
	// with others for running more complex network configurations.
	ConnectToSocket *os.File

	// When PcapDump is set, all traffic is dumped to a pcap file in the
	// runtime directory (e.g. "net0.pcap" for the first interface).
	PcapDump bool

	// SerialPort is an io.ReadWriter over which you can communicate with the serial
	// port of the machine. It can be set to an existing file descriptor (like
	// os.Stdout/os.Stderr) or any Go structure implementing this interface.
	// NOTE(review): LaunchNode currently only connects it as QEMU's stdout, so
	// only the write side is used; nothing is read from it.
	SerialPort io.ReadWriter

	// NodeParameters is passed into the VM (via fw_cfg) and subsequently used for
	// bootstrapping or registering into a cluster. If nil, no parameters are
	// passed.
	NodeParameters *apb.NodeParameters

	// Mac is the node's MAC address. If nil, LaunchNode generates a random
	// locally administered address and stores it here, so that it stays stable
	// across restarts of the same node.
	Mac *net.HardwareAddr

	// Runtime keeps the node's QEMU runtime state. If nil, LaunchNode creates it
	// on the node's first start and stores it here.
	Runtime *NodeRuntime
}
102
// NodeRuntime keeps the node's QEMU runtime state. It is created once per node
// and reused by LaunchNode on every subsequent launch of that node.
type NodeRuntime struct {
	// ld points at the node's launch directory storing data such as storage
	// images, firmware variables or the TPM state.
	ld string
	// sd points at the node's socket directory.
	sd string

	// ctxT is the context QEMU (and the swtpm TPM emulator) will execute in. It
	// is replaced with a fresh context by every call to LaunchNode.
	ctxT context.Context
	// CtxC is the QEMU context's cancellation function. Calling it stops the
	// node's QEMU and swtpm processes.
	CtxC context.CancelFunc
}
116
117// NodePorts is the list of ports a fully operational Metropolis node listens on
Serge Bazanski52304a82021-10-29 16:56:18 +0200118var NodePorts = []node.Port{
Serge Bazanski66e58952021-10-05 17:06:56 +0200119 node.ConsensusPort,
120
121 node.CuratorServicePort,
122 node.DebugServicePort,
123
124 node.KubernetesAPIPort,
Lorenz Bruncc078df2021-12-23 11:51:55 +0100125 node.KubernetesAPIWrappedPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200126 node.CuratorServicePort,
127 node.DebuggerPort,
Tim Windelschmidtbe25a3b2023-07-19 16:31:56 +0200128 node.MetricsPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200129}
130
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200131// setupRuntime creates the node's QEMU runtime directory, together with all
132// files required to preserve its state, a level below the chosen path ld. The
133// node's socket directory is similarily created a level below sd. It may
134// return an I/O error.
135func setupRuntime(ld, sd string) (*NodeRuntime, error) {
136 // Create a temporary directory to keep all the runtime files.
137 stdp, err := os.MkdirTemp(ld, "node_state*")
138 if err != nil {
139 return nil, fmt.Errorf("failed to create the state directory: %w", err)
140 }
141
142 // Initialize the node's storage with a prebuilt image.
Tim Windelschmidt2a1d1b22024-02-06 07:07:42 +0100143 si, err := runfiles.Rlocation("_main/metropolis/node/image.img")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200144 if err != nil {
145 return nil, fmt.Errorf("while resolving a path: %w", err)
146 }
Serge Bazanskidd5b03c2024-05-16 18:07:06 +0200147
148 di := filepath.Join(stdp, "image.qcow2")
149 launch.Log("Cluster: generating node QCOW2 snapshot image: %s -> %s", si, di)
150
151 df, err := os.Create(di)
152 if err != nil {
153 return nil, fmt.Errorf("while opening image for writing: %w", err)
154 }
155 defer df.Close()
156 if err := qcow2.Generate(df, qcow2.GenerateWithBackingFile(si)); err != nil {
157 return nil, fmt.Errorf("while creating copy-on-write node image: %w", err)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200158 }
159
160 // Initialize the OVMF firmware variables file.
Tim Windelschmidt2a1d1b22024-02-06 07:07:42 +0100161 sv, err := runfiles.Rlocation("edk2/OVMF_VARS.fd")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200162 if err != nil {
163 return nil, fmt.Errorf("while resolving a path: %w", err)
164 }
165 dv := filepath.Join(stdp, filepath.Base(sv))
166 if err := copyFile(sv, dv); err != nil {
167 return nil, fmt.Errorf("while copying firmware variables: %w", err)
168 }
169
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200170 // Create the socket directory.
171 sotdp, err := os.MkdirTemp(sd, "node_sock*")
172 if err != nil {
173 return nil, fmt.Errorf("failed to create the socket directory: %w", err)
174 }
175
176 return &NodeRuntime{
177 ld: stdp,
178 sd: sotdp,
179 }, nil
180}
181
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200182// CuratorClient returns an authenticated owner connection to a Curator
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200183// instance within Cluster c, or nil together with an error.
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200184func (c *Cluster) CuratorClient() (*grpc.ClientConn, error) {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200185 if c.authClient == nil {
Serge Bazanski8535cb52023-03-29 14:15:08 +0200186 authCreds := rpc.NewAuthenticatedCredentials(c.Owner, rpc.WantInsecure())
Serge Bazanski58ddc092022-06-30 18:23:33 +0200187 r := resolver.New(c.ctxT, resolver.WithLogger(func(f string, args ...interface{}) {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100188 launch.Log("Cluster: client resolver: %s", fmt.Sprintf(f, args...))
Serge Bazanski58ddc092022-06-30 18:23:33 +0200189 }))
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200190 for _, n := range c.NodeIDs {
191 ep, err := resolver.NodeWithDefaultPort(n)
192 if err != nil {
Tim Windelschmidtadcf5d72024-05-21 13:46:25 +0200193 return nil, fmt.Errorf("could not add node %q by DNS: %w", n, err)
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200194 }
195 r.AddEndpoint(ep)
196 }
197 authClient, err := grpc.Dial(resolver.MetropolisControlAddress,
198 grpc.WithTransportCredentials(authCreds),
199 grpc.WithResolvers(r),
200 grpc.WithContextDialer(c.DialNode),
201 )
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200202 if err != nil {
203 return nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
204 }
205 c.authClient = authClient
206 }
207 return c.authClient, nil
208}
209
Serge Bazanski66e58952021-10-05 17:06:56 +0200210// LaunchNode launches a single Metropolis node instance with the given options.
211// The instance runs mostly paravirtualized but with some emulated hardware
212// similar to how a cloud provider might set up its VMs. The disk is fully
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200213// writable, and the changes are kept across reboots and shutdowns. ld and sd
214// point to the launch directory and the socket directory, holding the nodes'
215// state files (storage, tpm state, firmware state), and UNIX socket files
216// (swtpm <-> QEMU interplay) respectively. The directories must exist before
217// LaunchNode is called. LaunchNode will update options.Runtime and options.Mac
218// if either are not initialized.
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200219func LaunchNode(ctx context.Context, ld, sd string, tpmFactory *TPMFactory, options *NodeOptions, doneC chan error) error {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200220 // TODO(mateusz@monogon.tech) try using QEMU's abstract socket namespace instead
221 // of /tmp (requires QEMU version >5.0).
Serge Bazanski66e58952021-10-05 17:06:56 +0200222 // https://github.com/qemu/qemu/commit/776b97d3605ed0fc94443048fdf988c7725e38a9).
223 // swtpm accepts already-open FDs so we can pass in an abstract socket namespace FD
224 // that we open and pass the name of it to QEMU. Not pinning this crashes both
225 // swtpm and qemu because we run into UNIX socket length limitations (for legacy
226 // reasons 108 chars).
Serge Bazanski66e58952021-10-05 17:06:56 +0200227
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200228 // If it's the node's first start, set up its runtime directories.
229 if options.Runtime == nil {
230 r, err := setupRuntime(ld, sd)
231 if err != nil {
232 return fmt.Errorf("while setting up node runtime: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200233 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200234 options.Runtime = r
Serge Bazanski66e58952021-10-05 17:06:56 +0200235 }
236
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200237 // Replace the node's context with a new one.
238 r := options.Runtime
239 if r.CtxC != nil {
240 r.CtxC()
241 }
242 r.ctxT, r.CtxC = context.WithCancel(ctx)
243
Serge Bazanski66e58952021-10-05 17:06:56 +0200244 var qemuNetType string
245 var qemuNetConfig launch.QemuValue
246 if options.ConnectToSocket != nil {
247 qemuNetType = "socket"
248 qemuNetConfig = launch.QemuValue{
249 "id": {"net0"},
250 "fd": {"3"},
251 }
252 } else {
253 qemuNetType = "user"
254 qemuNetConfig = launch.QemuValue{
255 "id": {"net0"},
256 "net": {"10.42.0.0/24"},
257 "dhcpstart": {"10.42.0.10"},
258 "hostfwd": options.Ports.ToQemuForwards(),
259 }
260 }
261
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200262 // Generate the node's MAC address if it isn't already set in NodeOptions.
263 if options.Mac == nil {
264 mac, err := generateRandomEthernetMAC()
265 if err != nil {
266 return err
267 }
268 options.Mac = mac
Serge Bazanski66e58952021-10-05 17:06:56 +0200269 }
270
Tim Windelschmidt244b5672024-02-06 10:18:56 +0100271 ovmfCodePath, err := runfiles.Rlocation("edk2/OVMF_CODE.fd")
272 if err != nil {
273 return err
274 }
275
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200276 tpmSocketPath := filepath.Join(r.sd, "tpm-socket")
277 fwVarPath := filepath.Join(r.ld, "OVMF_VARS.fd")
Serge Bazanskidd5b03c2024-05-16 18:07:06 +0200278 storagePath := filepath.Join(r.ld, "image.qcow2")
Lorenz Brun150f24a2023-07-13 20:11:06 +0200279 qemuArgs := []string{
Serge Bazanski99b02142024-04-17 16:33:28 +0200280 "-machine", "q35", "-accel", "kvm", "-nographic", "-nodefaults", "-m", "2048",
Serge Bazanski66e58952021-10-05 17:06:56 +0200281 "-cpu", "host", "-smp", "sockets=1,cpus=1,cores=2,threads=2,maxcpus=4",
Tim Windelschmidt244b5672024-02-06 10:18:56 +0100282 "-drive", "if=pflash,format=raw,readonly=on,file=" + ovmfCodePath,
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200283 "-drive", "if=pflash,format=raw,file=" + fwVarPath,
Serge Bazanskidd5b03c2024-05-16 18:07:06 +0200284 "-drive", "if=virtio,format=qcow2,cache=unsafe,file=" + storagePath,
Serge Bazanski66e58952021-10-05 17:06:56 +0200285 "-netdev", qemuNetConfig.ToOption(qemuNetType),
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200286 "-device", "virtio-net-pci,netdev=net0,mac=" + options.Mac.String(),
Serge Bazanski66e58952021-10-05 17:06:56 +0200287 "-chardev", "socket,id=chrtpm,path=" + tpmSocketPath,
288 "-tpmdev", "emulator,id=tpm0,chardev=chrtpm",
289 "-device", "tpm-tis,tpmdev=tpm0",
290 "-device", "virtio-rng-pci",
Lorenz Brun150f24a2023-07-13 20:11:06 +0200291 "-serial", "stdio",
292 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200293
294 if !options.AllowReboot {
295 qemuArgs = append(qemuArgs, "-no-reboot")
296 }
297
298 if options.NodeParameters != nil {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200299 parametersPath := filepath.Join(r.ld, "parameters.pb")
Serge Bazanski66e58952021-10-05 17:06:56 +0200300 parametersRaw, err := proto.Marshal(options.NodeParameters)
301 if err != nil {
302 return fmt.Errorf("failed to encode node paraeters: %w", err)
303 }
Lorenz Brun150f24a2023-07-13 20:11:06 +0200304 if err := os.WriteFile(parametersPath, parametersRaw, 0o644); err != nil {
Serge Bazanski66e58952021-10-05 17:06:56 +0200305 return fmt.Errorf("failed to write node parameters: %w", err)
306 }
307 qemuArgs = append(qemuArgs, "-fw_cfg", "name=dev.monogon.metropolis/parameters.pb,file="+parametersPath)
308 }
309
Leopoldacfad5b2023-01-15 14:05:25 +0100310 if options.PcapDump {
Tim Windelschmidta7a82f32024-04-11 01:40:25 +0200311 qemuNetDump := launch.QemuValue{
312 "id": {"net0"},
313 "netdev": {"net0"},
314 "file": {filepath.Join(r.ld, "net0.pcap")},
Leopoldacfad5b2023-01-15 14:05:25 +0100315 }
316 qemuArgs = append(qemuArgs, "-object", qemuNetDump.ToOption("filter-dump"))
317 }
318
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200319 // Manufacture TPM if needed.
320 tpmd := filepath.Join(r.ld, "tpm")
321 err = tpmFactory.Manufacture(ctx, tpmd, &TPMPlatform{
322 Manufacturer: "Monogon",
323 Version: "1.0",
324 Model: "TestCluster",
325 })
326 if err != nil {
327 return fmt.Errorf("could not manufacture TPM: %w", err)
328 }
329
Serge Bazanski66e58952021-10-05 17:06:56 +0200330 // Start TPM emulator as a subprocess
Serge Bazanskib07c57a2024-06-04 14:33:27 +0000331 swtpm, err := runfiles.Rlocation("swtpm/swtpm")
332 if err != nil {
333 return fmt.Errorf("could not find swtpm: %w", err)
334 }
335
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200336 tpmCtx, tpmCancel := context.WithCancel(options.Runtime.ctxT)
Serge Bazanski66e58952021-10-05 17:06:56 +0200337
Serge Bazanskib07c57a2024-06-04 14:33:27 +0000338 tpmEmuCmd := exec.CommandContext(tpmCtx, swtpm, "socket", "--tpm2", "--tpmstate", "dir="+tpmd, "--ctrl", "type=unixio,path="+tpmSocketPath)
339 // Silence warnings from unsafe libtpms build (uses non-constant-time
340 // cryptographic operations).
341 tpmEmuCmd.Env = append(tpmEmuCmd.Env, "MONOGON_LIBTPMS_ACKNOWLEDGE_UNSAFE=yes")
Serge Bazanski66e58952021-10-05 17:06:56 +0200342 tpmEmuCmd.Stderr = os.Stderr
343 tpmEmuCmd.Stdout = os.Stdout
344
Tim Windelschmidt244b5672024-02-06 10:18:56 +0100345 err = tpmEmuCmd.Start()
Serge Bazanski66e58952021-10-05 17:06:56 +0200346 if err != nil {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200347 tpmCancel()
Serge Bazanski66e58952021-10-05 17:06:56 +0200348 return fmt.Errorf("failed to start TPM emulator: %w", err)
349 }
350
Mateusz Zalegae90f4a12022-05-25 18:24:01 +0200351 // Wait for the socket to be created by the TPM emulator before launching
352 // QEMU.
353 for {
354 _, err := os.Stat(tpmSocketPath)
355 if err == nil {
356 break
357 }
Tim Windelschmidta7a82f32024-04-11 01:40:25 +0200358 if !os.IsNotExist(err) {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200359 tpmCancel()
Mateusz Zalegae90f4a12022-05-25 18:24:01 +0200360 return fmt.Errorf("while stat-ing TPM socket path: %w", err)
361 }
362 if err := tpmCtx.Err(); err != nil {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200363 tpmCancel()
Mateusz Zalegae90f4a12022-05-25 18:24:01 +0200364 return fmt.Errorf("while waiting for the TPM socket: %w", err)
365 }
366 time.Sleep(time.Millisecond * 100)
367 }
368
Serge Bazanski66e58952021-10-05 17:06:56 +0200369 // Start the main qemu binary
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200370 systemCmd := exec.CommandContext(options.Runtime.ctxT, "qemu-system-x86_64", qemuArgs...)
Serge Bazanski66e58952021-10-05 17:06:56 +0200371 if options.ConnectToSocket != nil {
372 systemCmd.ExtraFiles = []*os.File{options.ConnectToSocket}
373 }
374
375 var stdErrBuf bytes.Buffer
376 systemCmd.Stderr = &stdErrBuf
377 systemCmd.Stdout = options.SerialPort
378
Leopoldaf5086b2023-01-15 14:12:42 +0100379 launch.PrettyPrintQemuArgs(options.Name, systemCmd.Args)
380
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200381 go func() {
382 launch.Log("Node: Starting...")
383 err = systemCmd.Run()
384 launch.Log("Node: Returned: %v", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200385
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200386 // Stop TPM emulator and wait for it to exit to properly reap the child process
387 tpmCancel()
388 launch.Log("Node: Waiting for TPM emulator to exit")
389 // Wait returns a SIGKILL error because we just cancelled its context.
390 // We still need to call it to avoid creating zombies.
391 errTpm := tpmEmuCmd.Wait()
392 launch.Log("Node: TPM emulator done: %v", errTpm)
Serge Bazanski66e58952021-10-05 17:06:56 +0200393
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200394 var exerr *exec.ExitError
395 if err != nil && errors.As(err, &exerr) {
396 status := exerr.ProcessState.Sys().(syscall.WaitStatus)
397 if status.Signaled() && status.Signal() == syscall.SIGKILL {
398 // Process was killed externally (most likely by our context being canceled).
399 // This is a normal exit for us, so return nil
400 doneC <- nil
401 return
402 }
403 exerr.Stderr = stdErrBuf.Bytes()
404 newErr := launch.QEMUError(*exerr)
405 launch.Log("Node: %q", stdErrBuf.String())
406 doneC <- &newErr
407 return
Serge Bazanski66e58952021-10-05 17:06:56 +0200408 }
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200409 doneC <- err
410 }()
411 return nil
Serge Bazanski66e58952021-10-05 17:06:56 +0200412}
413
414func copyFile(src, dst string) error {
415 in, err := os.Open(src)
416 if err != nil {
417 return fmt.Errorf("when opening source: %w", err)
418 }
419 defer in.Close()
420
421 out, err := os.Create(dst)
422 if err != nil {
423 return fmt.Errorf("when creating destination: %w", err)
424 }
425 defer out.Close()
426
Lorenz Brun87bbf7e2024-03-18 18:22:25 +0100427 endPos, err := in.Seek(0, io.SeekEnd)
Serge Bazanski66e58952021-10-05 17:06:56 +0200428 if err != nil {
Lorenz Brun87bbf7e2024-03-18 18:22:25 +0100429 return fmt.Errorf("when getting source end: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200430 }
Lorenz Brun87bbf7e2024-03-18 18:22:25 +0100431
432 // Copy the file while preserving its sparseness. The image files are very
433 // sparse (less than 10% allocated), so this is a lot faster.
434 var lastHoleStart int64
435 for {
436 dataStart, err := in.Seek(lastHoleStart, unix.SEEK_DATA)
437 if err != nil {
438 return fmt.Errorf("when seeking to next data block: %w", err)
439 }
440 holeStart, err := in.Seek(dataStart, unix.SEEK_HOLE)
441 if err != nil {
442 return fmt.Errorf("when seeking to next hole: %w", err)
443 }
444 lastHoleStart = holeStart
445 if _, err := in.Seek(dataStart, io.SeekStart); err != nil {
446 return fmt.Errorf("when seeking to current data block: %w", err)
447 }
448 if _, err := out.Seek(dataStart, io.SeekStart); err != nil {
449 return fmt.Errorf("when seeking output to next data block: %w", err)
450 }
451 if _, err := io.CopyN(out, in, holeStart-dataStart); err != nil {
452 return fmt.Errorf("when copying file: %w", err)
453 }
454 if endPos == holeStart {
455 // The next hole is at the end of the file, we're done here.
456 break
457 }
458 }
459
Serge Bazanski66e58952021-10-05 17:06:56 +0200460 return out.Close()
461}
462
Serge Bazanskie78a0892021-10-07 17:03:49 +0200463// getNodes wraps around Management.GetNodes to return a list of nodes in a
464// cluster.
465func getNodes(ctx context.Context, mgmt apb.ManagementClient) ([]*apb.Node, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200466 var res []*apb.Node
Serge Bazanski636032e2022-01-26 14:21:33 +0100467 bo := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
Serge Bazanski075465c2021-11-16 15:38:49 +0100468 err := backoff.Retry(func() error {
469 res = nil
470 srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
Serge Bazanskie78a0892021-10-07 17:03:49 +0200471 if err != nil {
Serge Bazanski075465c2021-11-16 15:38:49 +0100472 return fmt.Errorf("GetNodes: %w", err)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200473 }
Serge Bazanski075465c2021-11-16 15:38:49 +0100474 for {
475 node, err := srvN.Recv()
476 if err == io.EOF {
477 break
478 }
479 if err != nil {
480 return fmt.Errorf("GetNodes.Recv: %w", err)
481 }
482 res = append(res, node)
483 }
484 return nil
485 }, bo)
486 if err != nil {
487 return nil, err
Serge Bazanskie78a0892021-10-07 17:03:49 +0200488 }
489 return res, nil
490}
491
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200492// getNode wraps Management.GetNodes. It returns node information matching
493// given node ID.
494func getNode(ctx context.Context, mgmt apb.ManagementClient, id string) (*apb.Node, error) {
495 nodes, err := getNodes(ctx, mgmt)
496 if err != nil {
497 return nil, fmt.Errorf("could not get nodes: %w", err)
498 }
499 for _, n := range nodes {
500 eid := identity.NodeID(n.Pubkey)
501 if eid != id {
502 continue
503 }
504 return n, nil
505 }
Tim Windelschmidt73e98822024-04-18 23:13:49 +0200506 return nil, fmt.Errorf("no such node")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200507}
508
// generateRandomEthernetMAC returns a random EUI-48 Ethernet MAC address. The
// address is locally administered (U/L bit set) and individual, i.e. unicast
// (I/G bit cleared).
func generateRandomEthernetMAC() (*net.HardwareAddr, error) {
	var raw [6]byte
	if _, err := rand.Read(raw[:]); err != nil {
		return nil, fmt.Errorf("failed to read randomness for MAC: %w", err)
	}

	// Set U/L bit and clear I/G bit (locally administered individual MAC)
	// Ref IEEE 802-2014 Section 8.2.2
	raw[0] |= 0x02
	raw[0] &^= 0x01

	mac := net.HardwareAddr(raw[:])
	return &mac, nil
}
523
// SOCKSPort is the port on which nanoswitch serves a SOCKS proxy into the
// switch network. It is part of ClusterPorts.
const SOCKSPort uint16 = 1080

// ClusterPorts contains all ports handled by Nanoswitch.
var ClusterPorts = []uint16{
	// Forwarded to the first node.
	uint16(node.CuratorServicePort),
	uint16(node.DebugServicePort),
	uint16(node.KubernetesAPIPort),
	uint16(node.KubernetesAPIWrappedPort),

	// SOCKS proxy to the switch network
	SOCKSPort,
}
537
// ClusterOptions contains all options for launching a Metropolis cluster.
type ClusterOptions struct {
	// The number of nodes this cluster should be started with.
	NumNodes int

	// If true, node logs will be saved to individual files instead of being printed
	// out to stderr. The paths of these files will still be printed to stdout.
	//
	// The files will be located within the launch directory inside TEST_TMPDIR (or
	// the default tempdir location, if not set).
	NodeLogsToFiles bool

	// LeaveNodesNew, if set, will leave all non-bootstrap nodes in NEW, without
	// bootstrapping them. The nodes' address information in Cluster.Nodes will be
	// incomplete.
	LeaveNodesNew bool

	// Optional local registry which will be made available to the cluster to
	// pull images from. This is a more efficient alternative to preseeding all
	// images used for testing.
	LocalRegistry *localregistry.Server

	// InitialClusterConfiguration will be passed to the first node when creating the
	// cluster, and defines some basic properties of the cluster. If not specified,
	// the cluster will use the defaults defined in
	// metropolis.proto.api.NodeParameters.
	InitialClusterConfiguration *cpb.ClusterConfiguration
}
566
// Cluster is the running Metropolis cluster launched using the LaunchCluster
// function.
type Cluster struct {
	// Owner is the TLS Certificate of the owner of the test cluster. This can be
	// used to authenticate further clients to the running cluster.
	Owner tls.Certificate
	// Ports is the PortMap used to access the first node's services (defined in
	// ClusterPorts) and the SOCKS proxy (at SOCKSPort).
	Ports launch.PortMap

	// Nodes is a map from Node ID to its runtime information.
	Nodes map[string]*NodeInCluster
	// NodeIDs is a list of node IDs that are backing this cluster, in order of
	// creation. CuratorClient uses them as initial resolver endpoints.
	NodeIDs []string

	// CACertificate is the cluster's CA certificate.
	CACertificate *x509.Certificate

	// nodesDone is a list of channels populated with the return codes from all the
	// nodes' qemu instances. It's used by Close to ensure all nodes have
	// successfully been stopped.
	nodesDone []chan error
	// nodeOpts are the cluster member nodes' mutable launch options, kept here
	// to facilitate reboots.
	nodeOpts []NodeOptions
	// launchDir points at the directory keeping the nodes' state, such as storage
	// images, firmware variable files, TPM state.
	launchDir string
	// socketDir points at the directory keeping UNIX socket files, such as these
	// used to facilitate communication between QEMU and swtpm. It's different
	// from launchDir, and anchored nearer the file system root, due to the
	// socket path length limitation imposed by the kernel.
	socketDir string
	// metroctlDir presumably holds metroctl configuration/credentials for this
	// cluster. NOTE(review): its use is not visible here; confirm.
	metroctlDir string

	// SOCKSDialer is used by DialNode to establish connections to nodes via the
	// SOCKS server run by nanoswitch.
	SOCKSDialer proxy.Dialer

	// authClient is a cached authenticated owner connection to a Curator
	// instance within the cluster. It is created lazily by CuratorClient.
	authClient *grpc.ClientConn

	// ctxT is the context individual node contexts are created from. It is also
	// the context of the resolver created by CuratorClient.
	ctxT context.Context
	// ctxC is used by Close to cancel the context under which the nodes are
	// running.
	ctxC context.CancelFunc

	// tpmFactory is presumably the TPMFactory handed to LaunchNode to
	// manufacture TPM state for this cluster's nodes. NOTE(review): confirm.
	tpmFactory *TPMFactory
}
619
// NodeInCluster represents information about a node that's part of a Cluster.
type NodeInCluster struct {
	// ID of the node, which can be used to dial this node's services via DialNode.
	ID string
	// Pubkey is presumably the node's public key, from which ID is derived (as
	// identity.NodeID does for apb.Node). NOTE(review): confirm against where
	// NodeInCluster is populated.
	Pubkey []byte
	// Address of the node on the network run by nanoswitch. Not reachable from the
	// host unless dialed via DialNode or via the nanoswitch SOCKS proxy (reachable
	// on Cluster.Ports[SOCKSPort]).
	ManagementAddress string
}
630
// firstConnection performs the initial owner credential escrow with a newly
// started nanoswitch-backed cluster over SOCKS. It expects the first node to be
// running at 10.1.0.2, which is always the case with the current nanoswitch
// implementation.
//
// It returns the newly escrowed credentials as well as the first node's
// information as NodeInCluster (including its public key, like every other
// NodeInCluster built by LaunchCluster).
func firstConnection(ctx context.Context, socksDialer proxy.Dialer) (*tls.Certificate, *NodeInCluster, error) {
	// Dial external service.
	remote := fmt.Sprintf("10.1.0.2:%s", node.CuratorServicePort.PortString())
	initCreds, err := rpc.NewEphemeralCredentials(InsecurePrivateKey, rpc.WantInsecure())
	if err != nil {
		return nil, nil, fmt.Errorf("NewEphemeralCredentials: %w", err)
	}
	// Dial through the nanoswitch SOCKS proxy. Honor gRPC's dial context when
	// the proxy dialer supports it, so that connection attempts can time out.
	initDialer := func(dctx context.Context, addr string) (net.Conn, error) {
		if cd, ok := socksDialer.(proxy.ContextDialer); ok {
			return cd.DialContext(dctx, "tcp", addr)
		}
		return socksDialer.Dial("tcp", addr)
	}
	initClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(initCreds))
	if err != nil {
		return nil, nil, fmt.Errorf("dialing with ephemeral credentials failed: %w", err)
	}
	defer initClient.Close()

	// Retrieve owner certificate - this can take a while because the node is still
	// coming up, so do it in a backoff loop. Only UNAVAILABLE is retried; any
	// other error aborts the loop.
	launch.Log("Cluster: retrieving owner certificate (this can take a few seconds while the first node boots)...")
	aaa := apb.NewAAAClient(initClient)
	var cert *tls.Certificate
	err = backoff.Retry(func() error {
		c, err := rpc.RetrieveOwnerCertificate(ctx, aaa, InsecurePrivateKey)
		if err == nil {
			cert = c
			return nil
		}
		if st, ok := status.FromError(err); ok && st.Code() == codes.Unavailable {
			launch.Log("Cluster: cluster UNAVAILABLE: %v", st.Message())
			return err
		}
		return backoff.Permanent(err)
	}, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
	if err != nil {
		return nil, nil, fmt.Errorf("couldn't retrieve owner certificate: %w", err)
	}
	launch.Log("Cluster: retrieved owner certificate.")

	// Now connect authenticated and get the node ID.
	creds := rpc.NewAuthenticatedCredentials(*cert, rpc.WantInsecure())
	authClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(creds))
	if err != nil {
		return nil, nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
	}
	defer authClient.Close()
	mgmt := apb.NewManagementClient(authClient)

	// Wait until the cluster reports exactly one node with a known address.
	var first *NodeInCluster
	err = backoff.Retry(func() error {
		nodes, err := getNodes(ctx, mgmt)
		if err != nil {
			return fmt.Errorf("retrieving nodes failed: %w", err)
		}
		if len(nodes) != 1 {
			return fmt.Errorf("expected one node, got %d", len(nodes))
		}
		n := nodes[0]
		if n.Status == nil || n.Status.ExternalAddress == "" {
			return fmt.Errorf("node has no status and/or address")
		}
		first = &NodeInCluster{
			ID:                identity.NodeID(n.Pubkey),
			Pubkey:            n.Pubkey,
			ManagementAddress: n.Status.ExternalAddress,
		}
		return nil
	}, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
	if err != nil {
		return nil, nil, fmt.Errorf("waiting for first node: %w", err)
	}

	return cert, first, nil
}
708
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100709func NewSerialFileLogger(p string) (io.ReadWriter, error) {
Lorenz Brun150f24a2023-07-13 20:11:06 +0200710 f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE, 0o600)
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100711 if err != nil {
712 return nil, err
713 }
714 return f, nil
715}
716
// LaunchCluster launches a cluster of Metropolis node VMs together with a
// Nanoswitch instance to network them all together.
//
// The given context will be used to run all qemu instances in the cluster, and
// canceling the context or calling Close() will terminate them. If an error is
// returned after the first VM has been started, the VMs' context is canceled
// before returning so that no qemu instances are left running.
func LaunchCluster(ctx context.Context, opts ClusterOptions) (*Cluster, error) {
	if opts.NumNodes <= 0 {
		return nil, errors.New("refusing to start cluster with zero nodes")
	}

	// Create the launch directory.
	ld, err := os.MkdirTemp(os.Getenv("TEST_TMPDIR"), "cluster-*")
	if err != nil {
		return nil, fmt.Errorf("failed to create the launch directory: %w", err)
	}
	// Create the metroctl config directory. We keep it in /tmp because in some
	// scenarios it's end-user visible and we want it short.
	md, err := os.MkdirTemp("/tmp", "metroctl-*")
	if err != nil {
		return nil, fmt.Errorf("failed to create the metroctl directory: %w", err)
	}

	// Create the socket directory. We keep it in /tmp because of socket path limits.
	sd, err := os.MkdirTemp("/tmp", "cluster-*")
	if err != nil {
		return nil, fmt.Errorf("failed to create the socket directory: %w", err)
	}

	// Set up TPM factory.
	tpmf, err := NewTPMFactory(filepath.Join(ld, "tpm"))
	if err != nil {
		return nil, fmt.Errorf("failed to create TPM factory: %w", err)
	}

	// Prepare links between nodes and nanoswitch.
	switchPorts := make([]*os.File, 0, opts.NumNodes)
	vmPorts := make([]*os.File, 0, opts.NumNodes)
	for i := 0; i < opts.NumNodes; i++ {
		switchPort, vmPort, err := launch.NewSocketPair()
		if err != nil {
			return nil, fmt.Errorf("failed to get socketpair: %w", err)
		}
		switchPorts = append(switchPorts, switchPort)
		vmPorts = append(vmPorts, vmPort)
	}

	// Make a list of channels that will be populated by all running node qemu
	// processes.
	done := make([]chan error, opts.NumNodes)
	for i := range done {
		done[i] = make(chan error, 1)
	}

	// Prepare the node options. These will be kept as part of Cluster.
	// nodeOpts[].Runtime will be initialized by LaunchNode during the first
	// launch. The runtime information can be later used to restart a node.
	// The 0th node will be initialized first. The rest will follow after it
	// had bootstrapped the cluster.
	nodeOpts := make([]NodeOptions, opts.NumNodes)
	nodeOpts[0] = NodeOptions{
		Name:            "node0",
		ConnectToSocket: vmPorts[0],
		NodeParameters: &apb.NodeParameters{
			Cluster: &apb.NodeParameters_ClusterBootstrap_{
				ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
					OwnerPublicKey:              InsecurePublicKey,
					InitialClusterConfiguration: opts.InitialClusterConfiguration,
					Labels: &cpb.NodeLabels{
						Pairs: []*cpb.NodeLabels_Pair{
							{Key: nodeNumberKey, Value: "0"},
						},
					},
				},
			},
		},
		SerialPort: newPrefixedStdio(0),
		PcapDump:   true,
	}
	if opts.NodeLogsToFiles {
		logPath := path.Join(ld, "node-1.txt")
		port, err := NewSerialFileLogger(logPath)
		if err != nil {
			return nil, fmt.Errorf("could not open log file for node 1: %w", err)
		}
		launch.Log("Node 1 logs at %s", logPath)
		nodeOpts[0].SerialPort = port
	}

	// Start the first node.
	ctxT, ctxC := context.WithCancel(ctx)
	// From here on, every error return must tear down what has been started
	// under ctxT (nodes, nanoswitch, local registry). Only a successful return
	// hands ctxC over to the Cluster.
	succeeded := false
	defer func() {
		if !succeeded {
			ctxC()
		}
	}()
	launch.Log("Cluster: Starting node %d...", 1)
	if err := LaunchNode(ctxT, ld, sd, tpmf, &nodeOpts[0], done[0]); err != nil {
		return nil, fmt.Errorf("failed to launch first node: %w", err)
	}

	localRegistryAddr := net.TCPAddr{
		IP:   net.IPv4(10, 42, 0, 82),
		Port: 5000,
	}

	var guestSvcMap launch.GuestServiceMap
	if opts.LocalRegistry != nil {
		l, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1)})
		if err != nil {
			return nil, fmt.Errorf("failed to create TCP listener for local registry: %w", err)
		}
		s := http.Server{
			Handler: opts.LocalRegistry,
		}
		go s.Serve(l)
		go func() {
			<-ctxT.Done()
			s.Close()
		}()
		guestSvcMap = launch.GuestServiceMap{
			&localRegistryAddr: *l.Addr().(*net.TCPAddr),
		}
	}

	// Launch nanoswitch.
	portMap, err := launch.ConflictFreePortMap(ClusterPorts)
	if err != nil {
		return nil, fmt.Errorf("failed to allocate ephemeral ports: %w", err)
	}

	go func() {
		// Only goroutine-local error variables may be used in here: the
		// enclosing function keeps reassigning its own err concurrently.
		var serialPort io.ReadWriter
		if opts.NodeLogsToFiles {
			logPath := path.Join(ld, "nanoswitch.txt")
			port, err := NewSerialFileLogger(logPath)
			if err != nil {
				launch.Log("Could not open log file for nanoswitch, falling back to stdio: %v", err)
			} else {
				launch.Log("Nanoswitch logs at %s", logPath)
				serialPort = port
			}
		}
		if serialPort == nil {
			serialPort = newPrefixedStdio(99)
		}
		kernelPath, err := runfiles.Rlocation("_main/osbase/test/ktest/vmlinux")
		if err != nil {
			launch.Fatal("Failed to resolve nanoswitch kernel: %v", err)
		}
		initramfsPath, err := runfiles.Rlocation("_main/metropolis/test/nanoswitch/initramfs.cpio.zst")
		if err != nil {
			launch.Fatal("Failed to resolve nanoswitch initramfs: %v", err)
		}
		if err := launch.RunMicroVM(ctxT, &launch.MicroVMOptions{
			Name:                   "nanoswitch",
			KernelPath:             kernelPath,
			InitramfsPath:          initramfsPath,
			ExtraNetworkInterfaces: switchPorts,
			PortMap:                portMap,
			GuestServiceMap:        guestSvcMap,
			SerialPort:             serialPort,
			PcapDump:               path.Join(ld, "nanoswitch.pcap"),
		}); err != nil {
			if !errors.Is(err, ctxT.Err()) {
				launch.Fatal("Failed to launch nanoswitch: %v", err)
			}
		}
	}()

	// Build SOCKS dialer.
	socksRemote := fmt.Sprintf("localhost:%v", portMap[SOCKSPort])
	socksDialer, err := proxy.SOCKS5("tcp", socksRemote, nil, proxy.Direct)
	if err != nil {
		return nil, fmt.Errorf("failed to build SOCKS dialer: %w", err)
	}

	// Retrieve owner credentials and first node.
	cert, firstNode, err := firstConnection(ctxT, socksDialer)
	if err != nil {
		return nil, err
	}

	// Write credentials to the metroctl directory.
	ownerKey, ok := cert.PrivateKey.(ed25519.PrivateKey)
	if !ok {
		return nil, fmt.Errorf("owner certificate has unexpected private key type %T", cert.PrivateKey)
	}
	if err := metroctl.WriteOwnerKey(md, ownerKey); err != nil {
		return nil, fmt.Errorf("could not write owner key: %w", err)
	}
	if err := metroctl.WriteOwnerCertificate(md, cert.Certificate[0]); err != nil {
		return nil, fmt.Errorf("could not write owner certificate: %w", err)
	}

	launch.Log("Cluster: Node %d is %s", 0, firstNode.ID)

	// Set up a partially initialized cluster instance, to be filled in the
	// later steps.
	cluster := &Cluster{
		Owner: *cert,
		Ports: portMap,
		Nodes: map[string]*NodeInCluster{
			firstNode.ID: firstNode,
		},
		NodeIDs: []string{
			firstNode.ID,
		},

		nodesDone:   done,
		nodeOpts:    nodeOpts,
		launchDir:   ld,
		socketDir:   sd,
		metroctlDir: md,

		SOCKSDialer: socksDialer,

		ctxT: ctxT,
		ctxC: ctxC,

		tpmFactory: tpmf,
	}

	// Now start the rest of the nodes and register them into the cluster.

	// Get an authenticated owner client within the cluster.
	curC, err := cluster.CuratorClient()
	if err != nil {
		return nil, fmt.Errorf("CuratorClient: %w", err)
	}
	mgmt := apb.NewManagementClient(curC)

	// Retrieve register ticket to register further nodes.
	launch.Log("Cluster: retrieving register ticket...")
	resT, err := mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{})
	if err != nil {
		return nil, fmt.Errorf("GetRegisterTicket: %w", err)
	}
	ticket := resT.Ticket
	launch.Log("Cluster: retrieved register ticket (%d bytes).", len(ticket))

	// Retrieve cluster info (for directory and ca public key) to register further
	// nodes.
	resI, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
	if err != nil {
		return nil, fmt.Errorf("GetClusterInfo: %w", err)
	}
	caCert, err := x509.ParseCertificate(resI.CaCertificate)
	if err != nil {
		return nil, fmt.Errorf("ParseCertificate: %w", err)
	}
	cluster.CACertificate = caCert

	// Use the retrieved information to configure the rest of the node options.
	for i := 1; i < opts.NumNodes; i++ {
		nodeOpts[i] = NodeOptions{
			Name:            fmt.Sprintf("node%d", i),
			ConnectToSocket: vmPorts[i],
			NodeParameters: &apb.NodeParameters{
				Cluster: &apb.NodeParameters_ClusterRegister_{
					ClusterRegister: &apb.NodeParameters_ClusterRegister{
						RegisterTicket:   ticket,
						ClusterDirectory: resI.ClusterDirectory,
						CaCertificate:    resI.CaCertificate,
						Labels: &cpb.NodeLabels{
							Pairs: []*cpb.NodeLabels_Pair{
								{Key: nodeNumberKey, Value: strconv.Itoa(i)},
							},
						},
					},
				},
			},
			SerialPort: newPrefixedStdio(i),
		}
		if opts.NodeLogsToFiles {
			logPath := path.Join(ld, fmt.Sprintf("node-%d.txt", i+1))
			port, err := NewSerialFileLogger(logPath)
			if err != nil {
				return nil, fmt.Errorf("could not open log file for node %d: %w", i+1, err)
			}
			launch.Log("Node %d logs at %s", i+1, logPath)
			nodeOpts[i].SerialPort = port
		}
	}

	// Now run the rest of the nodes.
	for i := 1; i < opts.NumNodes; i++ {
		launch.Log("Cluster: Starting node %d...", i+1)
		if err := LaunchNode(ctxT, ld, sd, tpmf, &nodeOpts[i], done[i]); err != nil {
			return nil, fmt.Errorf("failed to launch node %d: %w", i+1, err)
		}
	}

	// Wait for nodes to appear as NEW, populate a map from node number (index into
	// NodeOpts, etc.) to Metropolis Node ID.
	seenNodes := make(map[string]bool)
	nodeNumberToID := make(map[int]string)
	launch.Log("Cluster: waiting for nodes to appear as NEW...")
	for len(seenNodes) < opts.NumNodes-1 {
		nodes, err := getNodes(ctx, mgmt)
		if err != nil {
			return nil, fmt.Errorf("could not get nodes: %w", err)
		}
		for _, n := range nodes {
			if n.State != cpb.NodeState_NODE_STATE_NEW || seenNodes[n.Id] {
				continue
			}
			num, err := strconv.Atoi(node.GetNodeLabel(n.Labels, nodeNumberKey))
			if err != nil {
				return nil, fmt.Errorf("node %s has undecodable number label: %w", n.Id, err)
			}
			// Every later node must map to a distinct slot in 1..NumNodes-1,
			// otherwise NodeIDs below would end up with empty entries.
			if num < 1 || num >= opts.NumNodes {
				return nil, fmt.Errorf("node %s has out-of-range number label %d", n.Id, num)
			}
			if other, dup := nodeNumberToID[num]; dup {
				return nil, fmt.Errorf("nodes %s and %s share number label %d", other, n.Id, num)
			}
			seenNodes[n.Id] = true
			cluster.Nodes[n.Id] = &NodeInCluster{
				ID:     n.Id,
				Pubkey: n.Pubkey,
			}
			launch.Log("Cluster: Node %d is %s", num, n.Id)
			nodeNumberToID[num] = n.Id
		}
		if len(seenNodes) < opts.NumNodes-1 {
			time.Sleep(time.Second)
		}
	}
	launch.Log("Found all expected nodes")

	// Build the rest of NodeIDs from map.
	for i := 1; i < opts.NumNodes; i++ {
		cluster.NodeIDs = append(cluster.NodeIDs, nodeNumberToID[i])
	}

	approvedNodes := make(map[string]bool)
	upNodes := make(map[string]bool)
	if !opts.LeaveNodesNew {
		for {
			nodes, err := getNodes(ctx, mgmt)
			if err != nil {
				return nil, fmt.Errorf("could not get nodes: %w", err)
			}
			for _, n := range nodes {
				if !seenNodes[n.Id] {
					// Skip nodes that weren't NEW in the previous step.
					continue
				}

				if n.State == cpb.NodeState_NODE_STATE_UP && n.Status != nil && n.Status.ExternalAddress != "" {
					launch.Log("Cluster: node %s is up", n.Id)
					upNodes[n.Id] = true
					cluster.Nodes[n.Id].ManagementAddress = n.Status.ExternalAddress
				}
				if upNodes[n.Id] {
					continue
				}

				if !approvedNodes[n.Id] {
					launch.Log("Cluster: approving node %s", n.Id)
					if _, err := mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
						Pubkey: n.Pubkey,
					}); err != nil {
						return nil, fmt.Errorf("ApproveNode(%s): %w", n.Id, err)
					}
					approvedNodes[n.Id] = true
				}
			}

			launch.Log("Cluster: want %d up nodes, have %d", opts.NumNodes-1, len(upNodes))
			if len(upNodes) == opts.NumNodes-1 {
				break
			}
			time.Sleep(time.Second)
		}
	}

	launch.Log("Cluster: all nodes up:")
	for _, n := range cluster.Nodes {
		launch.Log("Cluster: - %s at %s", n.ID, n.ManagementAddress)
	}
	launch.Log("Cluster: starting tests...")

	succeeded = true
	return cluster, nil
}
1108
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001109// RebootNode reboots the cluster member node matching the given index, and
1110// waits for it to rejoin the cluster. It will use the given context ctx to run
1111// cluster API requests, whereas the resulting QEMU process will be created
1112// using the cluster's context c.ctxT. The nodes are indexed starting at 0.
1113func (c *Cluster) RebootNode(ctx context.Context, idx int) error {
1114 if idx < 0 || idx >= len(c.NodeIDs) {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +02001115 return fmt.Errorf("index out of bounds")
1116 }
1117 if c.nodeOpts[idx].Runtime == nil {
1118 return fmt.Errorf("node not running")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001119 }
1120 id := c.NodeIDs[idx]
1121
1122 // Get an authenticated owner client within the cluster.
Serge Bazanski5bb8a332022-06-23 17:41:33 +02001123 curC, err := c.CuratorClient()
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001124 if err != nil {
1125 return err
1126 }
1127 mgmt := apb.NewManagementClient(curC)
1128
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001129 // Cancel the node's context. This will shut down QEMU.
1130 c.nodeOpts[idx].Runtime.CtxC()
Serge Bazanski05f813b2023-03-16 17:58:39 +01001131 launch.Log("Cluster: waiting for node %d (%s) to stop.", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001132 err = <-c.nodesDone[idx]
1133 if err != nil {
1134 return fmt.Errorf("while restarting node: %w", err)
1135 }
1136
1137 // Start QEMU again.
Serge Bazanski05f813b2023-03-16 17:58:39 +01001138 launch.Log("Cluster: restarting node %d (%s).", idx, id)
Serge Bazanski2b6dc312024-06-04 17:44:55 +02001139 if err := LaunchNode(c.ctxT, c.launchDir, c.socketDir, c.tpmFactory, &c.nodeOpts[idx], c.nodesDone[idx]); err != nil {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +02001140 return fmt.Errorf("failed to launch node %d: %w", idx, err)
1141 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001142
Serge Bazanskibc969572024-03-21 11:56:13 +01001143 start := time.Now()
1144
1145 // Poll Management.GetNodes until the node is healthy.
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001146 for {
1147 cs, err := getNode(ctx, mgmt, id)
1148 if err != nil {
Serge Bazanski05f813b2023-03-16 17:58:39 +01001149 launch.Log("Cluster: node get error: %v", err)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001150 return err
1151 }
Serge Bazanskibc969572024-03-21 11:56:13 +01001152 launch.Log("Cluster: node health: %+v", cs.Health)
1153
1154 lhb := time.Now().Add(-cs.TimeSinceHeartbeat.AsDuration())
1155 if lhb.After(start) && cs.Health == apb.Node_HEALTHY {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001156 break
1157 }
1158 time.Sleep(time.Second)
1159 }
Serge Bazanski05f813b2023-03-16 17:58:39 +01001160 launch.Log("Cluster: node %d (%s) has rejoined the cluster.", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001161 return nil
1162}
1163
Serge Bazanski500f6e02024-04-03 12:06:40 +02001164// ShutdownNode performs an ungraceful shutdown (i.e. power off) of the node
1165// given by idx. If the node is already shut down, this is a no-op.
1166func (c *Cluster) ShutdownNode(idx int) error {
1167 if idx < 0 || idx >= len(c.NodeIDs) {
1168 return fmt.Errorf("index out of bounds")
1169 }
1170 // Return if node is already stopped.
1171 select {
1172 case <-c.nodeOpts[idx].Runtime.ctxT.Done():
1173 return nil
1174 default:
1175 }
1176 id := c.NodeIDs[idx]
1177
1178 // Cancel the node's context. This will shut down QEMU.
1179 c.nodeOpts[idx].Runtime.CtxC()
1180 launch.Log("Cluster: waiting for node %d (%s) to stop.", idx, id)
1181 err := <-c.nodesDone[idx]
1182 if err != nil {
1183 return fmt.Errorf("while shutting down node: %w", err)
1184 }
Serge Bazanski2b6dc312024-06-04 17:44:55 +02001185 launch.Log("Cluster: node %d (%s) stopped.", idx, id)
Serge Bazanski500f6e02024-04-03 12:06:40 +02001186 return nil
1187}
1188
1189// StartNode performs a power on of the node given by idx. If the node is already
1190// running, this is a no-op.
1191func (c *Cluster) StartNode(idx int) error {
1192 if idx < 0 || idx >= len(c.NodeIDs) {
1193 return fmt.Errorf("index out of bounds")
1194 }
1195 id := c.NodeIDs[idx]
1196 // Return if node is already running.
1197 select {
1198 case <-c.nodeOpts[idx].Runtime.ctxT.Done():
1199 default:
1200 return nil
1201 }
1202
1203 // Start QEMU again.
1204 launch.Log("Cluster: starting node %d (%s).", idx, id)
Serge Bazanski2b6dc312024-06-04 17:44:55 +02001205 if err := LaunchNode(c.ctxT, c.launchDir, c.socketDir, c.tpmFactory, &c.nodeOpts[idx], c.nodesDone[idx]); err != nil {
Serge Bazanski500f6e02024-04-03 12:06:40 +02001206 return fmt.Errorf("failed to launch node %d: %w", idx, err)
1207 }
Serge Bazanski2b6dc312024-06-04 17:44:55 +02001208 launch.Log("Cluster: node %d (%s) started.", idx, id)
Serge Bazanski500f6e02024-04-03 12:06:40 +02001209 return nil
1210}
1211
// Close cancels the running clusters' context and waits for all virtualized
// nodes to stop. It returns an error if stopping the nodes failed, or one of
// the nodes failed to fully start in the first place. Nodes that were already
// stopped via ShutdownNode are not waited for again. Finally, the cluster's
// launch, socket and metroctl directories are removed (best effort).
func (c *Cluster) Close() error {
	launch.Log("Cluster: stopping...")
	if c.authClient != nil {
		c.authClient.Close()
	}

	// Work out which nodes still have a result pending in nodesDone before
	// canceling the cluster context. A node stopped earlier via ShutdownNode
	// has had its result consumed already, so waiting on it again would block
	// forever. This can only be told apart while the cluster context is still
	// live; otherwise fall back to waiting for every node.
	pending := make([]bool, len(c.nodesDone))
	clusterLive := c.ctxT.Err() == nil
	for i := range pending {
		pending[i] = true
		if !clusterLive || i >= len(c.nodeOpts) || c.nodeOpts[i].Runtime == nil {
			continue
		}
		select {
		case <-c.nodeOpts[i].Runtime.ctxT.Done():
			pending[i] = false
		default:
		}
	}
	c.ctxC()

	var errs []error
	launch.Log("Cluster: waiting for nodes to exit...")
	for i, nodeDone := range c.nodesDone {
		if !pending[i] {
			continue
		}
		if err := <-nodeDone; err != nil {
			errs = append(errs, err)
		}
	}
	launch.Log("Cluster: removing nodes' state files (%s) and sockets (%s).", c.launchDir, c.socketDir)
	// Best-effort cleanup: failing to remove temporary directories should not
	// mask the nodes' exit errors.
	os.RemoveAll(c.launchDir)
	os.RemoveAll(c.socketDir)
	os.RemoveAll(c.metroctlDir)
	launch.Log("Cluster: done")
	return multierr.Combine(errs...)
}
Serge Bazanskibe742842022-04-04 13:18:50 +02001237
// DialNode is a grpc.WithContextDialer compatible dialer which dials nodes by
// their ID. This is performed by connecting to the cluster nanoswitch via its
// SOCKS proxy, and using the cluster node list for name resolution. The given
// context is honored if the SOCKS dialer supports context-aware dialing.
//
// For example:
//
//	grpc.Dial("metropolis-deadbeef:1234", grpc.WithContextDialer(c.DialNode))
func (c *Cluster) DialNode(ctx context.Context, addr string) (net.Conn, error) {
	host, port, err := net.SplitHostPort(addr)
	if err != nil {
		return nil, fmt.Errorf("invalid host:port: %w", err)
	}
	// Anything that is not already an IP address is expected to be a node ID.
	if net.ParseIP(host) == nil {
		n, ok := c.Nodes[host]
		if !ok {
			return nil, fmt.Errorf("unknown node %q", host)
		}
		// Nodes left NEW (not yet UP) have no known address; dialing ":port"
		// would silently go somewhere else.
		if n.ManagementAddress == "" {
			return nil, fmt.Errorf("node %q has no known management address", host)
		}
		addr = net.JoinHostPort(n.ManagementAddress, port)
	}
	if cd, ok := c.SOCKSDialer.(proxy.ContextDialer); ok {
		return cd.DialContext(ctx, "tcp", addr)
	}
	return c.SOCKSDialer.Dial("tcp", addr)
}
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001263
// GetKubeClientSet gets a Kubernetes client set accessing the Metropolis
// Kubernetes authenticating proxy using the cluster owner identity.
// It currently has access to everything (i.e. the cluster-admin role)
// via the owner-admin binding. The proxy is reached on the first node via
// DialNode.
func (c *Cluster) GetKubeClientSet() (kubernetes.Interface, error) {
	if len(c.NodeIDs) == 0 {
		return nil, errors.New("cluster has no nodes")
	}
	pkcs8Key, err := x509.MarshalPKCS8PrivateKey(c.Owner.PrivateKey)
	if err != nil {
		// The owner key is expected to be Ed25519, so this should not happen;
		// report it rather than crashing the caller.
		return nil, fmt.Errorf("marshaling owner private key: %w", err)
	}

	host := net.JoinHostPort(c.NodeIDs[0], node.KubernetesAPIWrappedPort.PortString())
	clientConfig := rest.Config{
		Host: host,
		TLSClientConfig: rest.TLSClientConfig{
			// TODO(q3k): use CA certificate
			Insecure:   true,
			ServerName: "kubernetes.default.svc",
			CertData:   pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: c.Owner.Certificate[0]}),
			KeyData:    pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: pkcs8Key}),
		},
		Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
			return c.DialNode(ctx, address)
		},
	}
	return kubernetes.NewForConfig(&clientConfig)
}
1291
// KubernetesControllerNodeAddresses returns the external addresses of all nodes
// that currently have the Kubernetes controller role, i.e. run an apiserver.
// Nodes without a reported external address are skipped, so the result may be
// empty, e.g. if no node is currently configured with the
// 'KubernetesController' role.
func (c *Cluster) KubernetesControllerNodeAddresses(ctx context.Context) ([]string, error) {
	curC, err := c.CuratorClient()
	if err != nil {
		return nil, err
	}
	stream, err := apb.NewManagementClient(curC).GetNodes(ctx, &apb.GetNodesRequest{
		Filter: "has(node.roles.kubernetes_controller)",
	})
	if err != nil {
		return nil, err
	}
	defer stream.CloseSend()

	var addrs []string
	for {
		n, err := stream.Recv()
		switch {
		case err == io.EOF:
			return addrs, nil
		case err != nil:
			return nil, err
		case n.Status != nil && n.Status.ExternalAddress != "":
			addrs = append(addrs, n.Status.ExternalAddress)
		}
	}
}
Serge Bazanski630fb5c2023-04-06 10:50:24 +02001325
// AllNodesHealthy checks the health reported by the curator for every node in
// the cluster. It returns nil when all of them are HEALTHY, and otherwise an
// error listing the IDs of the nodes that are not.
func (c *Cluster) AllNodesHealthy(ctx context.Context) error {
	// Get an authenticated owner client within the cluster.
	curC, err := c.CuratorClient()
	if err != nil {
		return err
	}
	nodes, err := getNodes(ctx, apb.NewManagementClient(curC))
	if err != nil {
		return err
	}

	var unhealthy []string
	for _, n := range nodes {
		if n.Health != apb.Node_HEALTHY {
			unhealthy = append(unhealthy, n.Id)
		}
	}
	if len(unhealthy) > 0 {
		return fmt.Errorf("nodes unhealthy: %s", strings.Join(unhealthy, ", "))
	}
	return nil
}
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001352
1353// ApproveNode approves a node by ID, waiting for it to become UP.
1354func (c *Cluster) ApproveNode(ctx context.Context, id string) error {
1355 curC, err := c.CuratorClient()
1356 if err != nil {
1357 return err
1358 }
1359 mgmt := apb.NewManagementClient(curC)
1360
1361 _, err = mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
1362 Pubkey: c.Nodes[id].Pubkey,
1363 })
1364 if err != nil {
1365 return fmt.Errorf("ApproveNode: %w", err)
1366 }
1367 launch.Log("Cluster: %s: approved, waiting for UP", id)
1368 for {
1369 nodes, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
1370 if err != nil {
1371 return fmt.Errorf("GetNodes: %w", err)
1372 }
1373 found := false
1374 for {
1375 node, err := nodes.Recv()
1376 if errors.Is(err, io.EOF) {
1377 break
1378 }
1379 if err != nil {
1380 return fmt.Errorf("Nodes.Recv: %w", err)
1381 }
1382 if node.Id != id {
1383 continue
1384 }
1385 if node.State != cpb.NodeState_NODE_STATE_UP {
1386 continue
1387 }
1388 found = true
1389 break
1390 }
1391 nodes.CloseSend()
1392
1393 if found {
1394 break
1395 }
1396 time.Sleep(time.Second)
1397 }
1398 launch.Log("Cluster: %s: UP", id)
1399 return nil
1400}
1401
1402// MakeKubernetesWorker adds the KubernetesWorker role to a node by ID.
1403func (c *Cluster) MakeKubernetesWorker(ctx context.Context, id string) error {
1404 curC, err := c.CuratorClient()
1405 if err != nil {
1406 return err
1407 }
1408 mgmt := apb.NewManagementClient(curC)
1409
1410 tr := true
1411 launch.Log("Cluster: %s: adding KubernetesWorker", id)
1412 _, err = mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
1413 Node: &apb.UpdateNodeRolesRequest_Id{
1414 Id: id,
1415 },
1416 KubernetesWorker: &tr,
1417 })
1418 return err
1419}
Serge Bazanski37cfcc12024-03-21 11:59:07 +01001420
1421// MakeConsensusMember adds the ConsensusMember role to a node by ID.
1422func (c *Cluster) MakeConsensusMember(ctx context.Context, id string) error {
1423 curC, err := c.CuratorClient()
1424 if err != nil {
1425 return err
1426 }
1427 mgmt := apb.NewManagementClient(curC)
1428 cur := ipb.NewCuratorClient(curC)
1429
1430 tr := true
1431 launch.Log("Cluster: %s: adding ConsensusMember", id)
1432 bo := backoff.NewExponentialBackOff()
1433 bo.MaxElapsedTime = 10 * time.Second
1434
1435 backoff.Retry(func() error {
1436 _, err = mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
1437 Node: &apb.UpdateNodeRolesRequest_Id{
1438 Id: id,
1439 },
1440 ConsensusMember: &tr,
1441 })
1442 if err != nil {
1443 launch.Log("Cluster: %s: UpdateNodeRoles failed: %v", id, err)
1444 }
1445 return err
1446 }, backoff.WithContext(bo, ctx))
1447 if err != nil {
1448 return err
1449 }
1450
1451 launch.Log("Cluster: %s: waiting for learner/full members...", id)
1452
1453 learner := false
1454 for {
1455 res, err := cur.GetConsensusStatus(ctx, &ipb.GetConsensusStatusRequest{})
1456 if err != nil {
1457 return fmt.Errorf("GetConsensusStatus: %w", err)
1458 }
1459 for _, member := range res.EtcdMember {
1460 if member.Id != id {
1461 continue
1462 }
1463 switch member.Status {
1464 case ipb.GetConsensusStatusResponse_EtcdMember_STATUS_LEARNER:
1465 if !learner {
1466 learner = true
1467 launch.Log("Cluster: %s: became a learner, waiting for full member...", id)
1468 }
1469 case ipb.GetConsensusStatusResponse_EtcdMember_STATUS_FULL:
1470 launch.Log("Cluster: %s: became a full member", id)
1471 return nil
1472 }
1473 }
1474 time.Sleep(100 * time.Millisecond)
1475 }
1476}