blob: dab453bf8cd2c313a8cec9bf173df3b59588f1b5 [file] [log] [blame]
Tim Windelschmidt6d33a432025-02-04 14:34:25 +01001// Copyright The Monogon Project Authors.
2// SPDX-License-Identifier: Apache-2.0
3
// Package launch builds on the osbase launch package and implements launching
// Metropolis nodes and clusters in a virtualized environment using qemu. It's
// kept in a separate package as it depends on a Metropolis node image, which
// might not be required for some uses of the launch library.
Tim Windelschmidt9f21f532024-05-07 15:14:20 +02008package launch
Serge Bazanski66e58952021-10-05 17:06:56 +02009
10import (
11 "bytes"
12 "context"
Serge Bazanski1f8cad72023-03-20 16:58:10 +010013 "crypto/ed25519"
Serge Bazanski66e58952021-10-05 17:06:56 +020014 "crypto/rand"
15 "crypto/tls"
Serge Bazanski54e212a2023-06-14 13:45:11 +020016 "crypto/x509"
Serge Bazanskia0bc6d32023-06-28 18:57:40 +020017 "encoding/pem"
Serge Bazanski66e58952021-10-05 17:06:56 +020018 "errors"
19 "fmt"
20 "io"
Serge Bazanski66e58952021-10-05 17:06:56 +020021 "net"
Lorenz Brun150f24a2023-07-13 20:11:06 +020022 "net/http"
Serge Bazanski66e58952021-10-05 17:06:56 +020023 "os"
24 "os/exec"
Leopoldacfad5b2023-01-15 14:05:25 +010025 "path"
Serge Bazanski66e58952021-10-05 17:06:56 +020026 "path/filepath"
Serge Bazanski53458ba2024-06-18 09:56:46 +000027 "strconv"
Serge Bazanski630fb5c2023-04-06 10:50:24 +020028 "strings"
Serge Bazanski66e58952021-10-05 17:06:56 +020029 "syscall"
30 "time"
31
32 "github.com/cenkalti/backoff/v4"
Serge Bazanski66e58952021-10-05 17:06:56 +020033 "go.uber.org/multierr"
Serge Bazanskibe742842022-04-04 13:18:50 +020034 "golang.org/x/net/proxy"
Lorenz Brun87bbf7e2024-03-18 18:22:25 +010035 "golang.org/x/sys/unix"
Serge Bazanski66e58952021-10-05 17:06:56 +020036 "google.golang.org/grpc"
Serge Bazanski636032e2022-01-26 14:21:33 +010037 "google.golang.org/grpc/codes"
38 "google.golang.org/grpc/status"
Serge Bazanski66e58952021-10-05 17:06:56 +020039 "google.golang.org/protobuf/proto"
Serge Bazanskia0bc6d32023-06-28 18:57:40 +020040 "k8s.io/client-go/kubernetes"
41 "k8s.io/client-go/rest"
Jan Schärd1a8b642024-12-03 17:40:41 +010042 "k8s.io/utils/ptr"
Serge Bazanski66e58952021-10-05 17:06:56 +020043
Serge Bazanski37cfcc12024-03-21 11:59:07 +010044 ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
Tim Windelschmidtbe25a3b2023-07-19 16:31:56 +020045 apb "source.monogon.dev/metropolis/proto/api"
46 cpb "source.monogon.dev/metropolis/proto/common"
47
Serge Bazanskica8d9512024-09-12 14:20:57 +020048 "source.monogon.dev/go/logging"
Serge Bazanskidd5b03c2024-05-16 18:07:06 +020049 "source.monogon.dev/go/qcow2"
Serge Bazanski1f8cad72023-03-20 16:58:10 +010050 metroctl "source.monogon.dev/metropolis/cli/metroctl/core"
Serge Bazanski66e58952021-10-05 17:06:56 +020051 "source.monogon.dev/metropolis/node"
52 "source.monogon.dev/metropolis/node/core/rpc"
Serge Bazanski5bb8a332022-06-23 17:41:33 +020053 "source.monogon.dev/metropolis/node/core/rpc/resolver"
Tim Windelschmidt9f21f532024-05-07 15:14:20 +020054 "source.monogon.dev/metropolis/test/localregistry"
55 "source.monogon.dev/osbase/test/launch"
Serge Bazanski66e58952021-10-05 17:06:56 +020056)
57
// NodeNumberKey names the node label that carries a node's numerical index
// within the test system.
const NodeNumberKey string = "test-node-number"
63
Leopold20a036e2023-01-15 00:17:19 +010064// NodeOptions contains all options that can be passed to Launch()
Serge Bazanski66e58952021-10-05 17:06:56 +020065type NodeOptions struct {
Leopoldaf5086b2023-01-15 14:12:42 +010066 // Name is a human-readable identifier to be used in debug output.
67 Name string
68
Jan Schära9b060b2024-08-07 10:42:29 +020069 // CPUs is the number of virtual CPUs of the VM.
70 CPUs int
71
72 // ThreadsPerCPU is the number of threads per CPU. This is multiplied by
73 // CPUs to get the total number of threads.
74 ThreadsPerCPU int
75
76 // MemoryMiB is the RAM size in MiB of the VM.
77 MemoryMiB int
78
Jan Schär07003572024-08-26 10:42:16 +020079 // DiskBytes contains the size of the root disk in bytes or zero if the
80 // unmodified image size is used.
81 DiskBytes uint64
82
Serge Bazanski66e58952021-10-05 17:06:56 +020083 // Ports contains the port mapping where to expose the internal ports of the VM to
84 // the host. See IdentityPortMap() and ConflictFreePortMap(). Ignored when
85 // ConnectToSocket is set.
86 Ports launch.PortMap
87
Leopold20a036e2023-01-15 00:17:19 +010088 // If set to true, reboots are honored. Otherwise, all reboots exit the Launch()
89 // command. Metropolis nodes generally restart on almost all errors, so unless you
Serge Bazanski66e58952021-10-05 17:06:56 +020090 // want to test reboot behavior this should be false.
91 AllowReboot bool
92
Leopold20a036e2023-01-15 00:17:19 +010093 // By default, the VM is connected to the Host via SLIRP. If ConnectToSocket is
94 // set, it is instead connected to the given file descriptor/socket. If this is
95 // set, all port maps from the Ports option are ignored. Intended for networking
96 // this instance together with others for running more complex network
97 // configurations.
Serge Bazanski66e58952021-10-05 17:06:56 +020098 ConnectToSocket *os.File
99
Leopoldacfad5b2023-01-15 14:05:25 +0100100 // When PcapDump is set, all traffic is dumped to a pcap file in the
101 // runtime directory (e.g. "net0.pcap" for the first interface).
102 PcapDump bool
103
Leopold20a036e2023-01-15 00:17:19 +0100104 // SerialPort is an io.ReadWriter over which you can communicate with the serial
105 // port of the machine. It can be set to an existing file descriptor (like
Serge Bazanski66e58952021-10-05 17:06:56 +0200106 // os.Stdout/os.Stderr) or any Go structure implementing this interface.
107 SerialPort io.ReadWriter
108
109 // NodeParameters is passed into the VM and subsequently used for bootstrapping or
110 // registering into a cluster.
111 NodeParameters *apb.NodeParameters
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200112
113 // Mac is the node's MAC address.
114 Mac *net.HardwareAddr
115
116 // Runtime keeps the node's QEMU runtime state.
117 Runtime *NodeRuntime
Serge Bazanski62e6f0b2024-09-03 12:18:56 +0200118
119 // RunVNC starts a VNC socket for troubleshooting/testing console code. Note:
120 // this will not work in tests, as those use a built-in qemu which does not
121 // implement a VGA device.
122 RunVNC bool
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200123}
124
// NodeRuntime is the per-node QEMU runtime state that LaunchNode creates on a
// node's first start and reuses on every later start of the same node.
type NodeRuntime struct {
	// ld is the node's launch directory, holding data such as its storage
	// image, firmware variables and TPM state.
	ld string
	// sd is the node's socket directory.
	sd string

	// ctxT is the context the node's QEMU process runs under.
	ctxT context.Context
	// CtxC cancels ctxT, which stops the node's QEMU process.
	CtxC context.CancelFunc
}
138
139// NodePorts is the list of ports a fully operational Metropolis node listens on
Serge Bazanski52304a82021-10-29 16:56:18 +0200140var NodePorts = []node.Port{
Serge Bazanski66e58952021-10-05 17:06:56 +0200141 node.ConsensusPort,
142
143 node.CuratorServicePort,
144 node.DebugServicePort,
145
146 node.KubernetesAPIPort,
Lorenz Bruncc078df2021-12-23 11:51:55 +0100147 node.KubernetesAPIWrappedPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200148 node.CuratorServicePort,
149 node.DebuggerPort,
Tim Windelschmidtbe25a3b2023-07-19 16:31:56 +0200150 node.MetricsPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200151}
152
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200153// setupRuntime creates the node's QEMU runtime directory, together with all
154// files required to preserve its state, a level below the chosen path ld. The
155// node's socket directory is similarily created a level below sd. It may
156// return an I/O error.
Jan Schär07003572024-08-26 10:42:16 +0200157func setupRuntime(ld, sd string, diskBytes uint64) (*NodeRuntime, error) {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200158 // Create a temporary directory to keep all the runtime files.
159 stdp, err := os.MkdirTemp(ld, "node_state*")
160 if err != nil {
161 return nil, fmt.Errorf("failed to create the state directory: %w", err)
162 }
163
164 // Initialize the node's storage with a prebuilt image.
Jan Schär07003572024-08-26 10:42:16 +0200165 st, err := os.Stat(xNodeImagePath)
166 if err != nil {
167 return nil, fmt.Errorf("cannot read image file: %w", err)
168 }
169 diskBytes = max(diskBytes, uint64(st.Size()))
170
Serge Bazanskidd5b03c2024-05-16 18:07:06 +0200171 di := filepath.Join(stdp, "image.qcow2")
Tim Windelschmidt82e6af72024-07-23 00:05:42 +0000172 launch.Log("Cluster: generating node QCOW2 snapshot image: %s -> %s", xNodeImagePath, di)
Serge Bazanskidd5b03c2024-05-16 18:07:06 +0200173
174 df, err := os.Create(di)
175 if err != nil {
176 return nil, fmt.Errorf("while opening image for writing: %w", err)
177 }
178 defer df.Close()
Jan Schär07003572024-08-26 10:42:16 +0200179 if err := qcow2.Generate(df, qcow2.GenerateWithBackingFile(xNodeImagePath), qcow2.GenerateWithFileSize(diskBytes)); err != nil {
Serge Bazanskidd5b03c2024-05-16 18:07:06 +0200180 return nil, fmt.Errorf("while creating copy-on-write node image: %w", err)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200181 }
182
183 // Initialize the OVMF firmware variables file.
Tim Windelschmidt82e6af72024-07-23 00:05:42 +0000184 dv := filepath.Join(stdp, filepath.Base(xOvmfVarsPath))
185 if err := copyFile(xOvmfVarsPath, dv); err != nil {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200186 return nil, fmt.Errorf("while copying firmware variables: %w", err)
187 }
188
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200189 // Create the socket directory.
190 sotdp, err := os.MkdirTemp(sd, "node_sock*")
191 if err != nil {
192 return nil, fmt.Errorf("failed to create the socket directory: %w", err)
193 }
194
195 return &NodeRuntime{
196 ld: stdp,
197 sd: sotdp,
198 }, nil
199}
200
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200201// CuratorClient returns an authenticated owner connection to a Curator
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200202// instance within Cluster c, or nil together with an error.
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200203func (c *Cluster) CuratorClient() (*grpc.ClientConn, error) {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200204 if c.authClient == nil {
Serge Bazanski8535cb52023-03-29 14:15:08 +0200205 authCreds := rpc.NewAuthenticatedCredentials(c.Owner, rpc.WantInsecure())
Serge Bazanskica8d9512024-09-12 14:20:57 +0200206 r := resolver.New(c.ctxT, resolver.WithLogger(logging.NewFunctionBackend(func(severity logging.Severity, msg string) {
207 launch.Log("Cluster: client resolver: %s: %s", severity, msg)
208 })))
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200209 for _, n := range c.NodeIDs {
210 ep, err := resolver.NodeWithDefaultPort(n)
211 if err != nil {
Tim Windelschmidtadcf5d72024-05-21 13:46:25 +0200212 return nil, fmt.Errorf("could not add node %q by DNS: %w", n, err)
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200213 }
214 r.AddEndpoint(ep)
215 }
216 authClient, err := grpc.Dial(resolver.MetropolisControlAddress,
217 grpc.WithTransportCredentials(authCreds),
218 grpc.WithResolvers(r),
219 grpc.WithContextDialer(c.DialNode),
220 )
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200221 if err != nil {
222 return nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
223 }
224 c.authClient = authClient
225 }
226 return c.authClient, nil
227}
228
Serge Bazanski66e58952021-10-05 17:06:56 +0200229// LaunchNode launches a single Metropolis node instance with the given options.
230// The instance runs mostly paravirtualized but with some emulated hardware
231// similar to how a cloud provider might set up its VMs. The disk is fully
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200232// writable, and the changes are kept across reboots and shutdowns. ld and sd
233// point to the launch directory and the socket directory, holding the nodes'
234// state files (storage, tpm state, firmware state), and UNIX socket files
235// (swtpm <-> QEMU interplay) respectively. The directories must exist before
236// LaunchNode is called. LaunchNode will update options.Runtime and options.Mac
237// if either are not initialized.
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200238func LaunchNode(ctx context.Context, ld, sd string, tpmFactory *TPMFactory, options *NodeOptions, doneC chan error) error {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200239 // TODO(mateusz@monogon.tech) try using QEMU's abstract socket namespace instead
240 // of /tmp (requires QEMU version >5.0).
Serge Bazanski66e58952021-10-05 17:06:56 +0200241 // https://github.com/qemu/qemu/commit/776b97d3605ed0fc94443048fdf988c7725e38a9).
242 // swtpm accepts already-open FDs so we can pass in an abstract socket namespace FD
243 // that we open and pass the name of it to QEMU. Not pinning this crashes both
244 // swtpm and qemu because we run into UNIX socket length limitations (for legacy
245 // reasons 108 chars).
Serge Bazanski66e58952021-10-05 17:06:56 +0200246
Jan Schära9b060b2024-08-07 10:42:29 +0200247 if options.CPUs == 0 {
248 options.CPUs = 1
249 }
250 if options.ThreadsPerCPU == 0 {
251 options.ThreadsPerCPU = 1
252 }
253 if options.MemoryMiB == 0 {
254 options.MemoryMiB = 2048
255 }
256
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200257 // If it's the node's first start, set up its runtime directories.
258 if options.Runtime == nil {
Jan Schär07003572024-08-26 10:42:16 +0200259 r, err := setupRuntime(ld, sd, options.DiskBytes)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200260 if err != nil {
261 return fmt.Errorf("while setting up node runtime: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200262 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200263 options.Runtime = r
Serge Bazanski66e58952021-10-05 17:06:56 +0200264 }
265
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200266 // Replace the node's context with a new one.
267 r := options.Runtime
268 if r.CtxC != nil {
269 r.CtxC()
270 }
271 r.ctxT, r.CtxC = context.WithCancel(ctx)
272
Serge Bazanski66e58952021-10-05 17:06:56 +0200273 var qemuNetType string
274 var qemuNetConfig launch.QemuValue
275 if options.ConnectToSocket != nil {
276 qemuNetType = "socket"
277 qemuNetConfig = launch.QemuValue{
278 "id": {"net0"},
279 "fd": {"3"},
280 }
281 } else {
282 qemuNetType = "user"
283 qemuNetConfig = launch.QemuValue{
284 "id": {"net0"},
285 "net": {"10.42.0.0/24"},
286 "dhcpstart": {"10.42.0.10"},
287 "hostfwd": options.Ports.ToQemuForwards(),
288 }
289 }
290
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200291 // Generate the node's MAC address if it isn't already set in NodeOptions.
292 if options.Mac == nil {
293 mac, err := generateRandomEthernetMAC()
294 if err != nil {
295 return err
296 }
297 options.Mac = mac
Serge Bazanski66e58952021-10-05 17:06:56 +0200298 }
299
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200300 tpmSocketPath := filepath.Join(r.sd, "tpm-socket")
301 fwVarPath := filepath.Join(r.ld, "OVMF_VARS.fd")
Serge Bazanskidd5b03c2024-05-16 18:07:06 +0200302 storagePath := filepath.Join(r.ld, "image.qcow2")
Lorenz Brun150f24a2023-07-13 20:11:06 +0200303 qemuArgs := []string{
Jan Schära9b060b2024-08-07 10:42:29 +0200304 "-machine", "q35",
305 "-accel", "kvm",
Serge Bazanski62e6f0b2024-09-03 12:18:56 +0200306 "-display", "none",
Jan Schära9b060b2024-08-07 10:42:29 +0200307 "-nodefaults",
308 "-cpu", "host",
309 "-m", fmt.Sprintf("%dM", options.MemoryMiB),
310 "-smp", fmt.Sprintf("cores=%d,threads=%d", options.CPUs, options.ThreadsPerCPU),
Tim Windelschmidt82e6af72024-07-23 00:05:42 +0000311 "-drive", "if=pflash,format=raw,readonly=on,file=" + xOvmfCodePath,
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200312 "-drive", "if=pflash,format=raw,file=" + fwVarPath,
Serge Bazanskidd5b03c2024-05-16 18:07:06 +0200313 "-drive", "if=virtio,format=qcow2,cache=unsafe,file=" + storagePath,
Serge Bazanski66e58952021-10-05 17:06:56 +0200314 "-netdev", qemuNetConfig.ToOption(qemuNetType),
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200315 "-device", "virtio-net-pci,netdev=net0,mac=" + options.Mac.String(),
Serge Bazanski66e58952021-10-05 17:06:56 +0200316 "-chardev", "socket,id=chrtpm,path=" + tpmSocketPath,
317 "-tpmdev", "emulator,id=tpm0,chardev=chrtpm",
318 "-device", "tpm-tis,tpmdev=tpm0",
319 "-device", "virtio-rng-pci",
Lorenz Brun150f24a2023-07-13 20:11:06 +0200320 "-serial", "stdio",
321 }
Serge Bazanski62e6f0b2024-09-03 12:18:56 +0200322 if options.RunVNC {
323 vncSocketPath := filepath.Join(r.sd, "vnc-socket")
324 qemuArgs = append(qemuArgs,
325 "-vnc", "unix:"+vncSocketPath,
326 "-device", "virtio-vga",
327 )
328 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200329
330 if !options.AllowReboot {
331 qemuArgs = append(qemuArgs, "-no-reboot")
332 }
333
334 if options.NodeParameters != nil {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200335 parametersPath := filepath.Join(r.ld, "parameters.pb")
Serge Bazanski66e58952021-10-05 17:06:56 +0200336 parametersRaw, err := proto.Marshal(options.NodeParameters)
337 if err != nil {
338 return fmt.Errorf("failed to encode node paraeters: %w", err)
339 }
Lorenz Brun150f24a2023-07-13 20:11:06 +0200340 if err := os.WriteFile(parametersPath, parametersRaw, 0o644); err != nil {
Serge Bazanski66e58952021-10-05 17:06:56 +0200341 return fmt.Errorf("failed to write node parameters: %w", err)
342 }
343 qemuArgs = append(qemuArgs, "-fw_cfg", "name=dev.monogon.metropolis/parameters.pb,file="+parametersPath)
344 }
345
Leopoldacfad5b2023-01-15 14:05:25 +0100346 if options.PcapDump {
Tim Windelschmidta7a82f32024-04-11 01:40:25 +0200347 qemuNetDump := launch.QemuValue{
348 "id": {"net0"},
349 "netdev": {"net0"},
350 "file": {filepath.Join(r.ld, "net0.pcap")},
Leopoldacfad5b2023-01-15 14:05:25 +0100351 }
352 qemuArgs = append(qemuArgs, "-object", qemuNetDump.ToOption("filter-dump"))
353 }
354
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200355 // Manufacture TPM if needed.
356 tpmd := filepath.Join(r.ld, "tpm")
Tim Windelschmidt82e6af72024-07-23 00:05:42 +0000357 err := tpmFactory.Manufacture(ctx, tpmd, &TPMPlatform{
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200358 Manufacturer: "Monogon",
359 Version: "1.0",
360 Model: "TestCluster",
361 })
362 if err != nil {
363 return fmt.Errorf("could not manufacture TPM: %w", err)
364 }
365
Serge Bazanski66e58952021-10-05 17:06:56 +0200366 // Start TPM emulator as a subprocess
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200367 tpmCtx, tpmCancel := context.WithCancel(options.Runtime.ctxT)
Serge Bazanski66e58952021-10-05 17:06:56 +0200368
Tim Windelschmidt82e6af72024-07-23 00:05:42 +0000369 tpmEmuCmd := exec.CommandContext(tpmCtx, xSwtpmPath, "socket", "--tpm2", "--tpmstate", "dir="+tpmd, "--ctrl", "type=unixio,path="+tpmSocketPath)
Serge Bazanskib07c57a2024-06-04 14:33:27 +0000370 // Silence warnings from unsafe libtpms build (uses non-constant-time
371 // cryptographic operations).
372 tpmEmuCmd.Env = append(tpmEmuCmd.Env, "MONOGON_LIBTPMS_ACKNOWLEDGE_UNSAFE=yes")
Serge Bazanski66e58952021-10-05 17:06:56 +0200373 tpmEmuCmd.Stderr = os.Stderr
374 tpmEmuCmd.Stdout = os.Stdout
375
Tim Windelschmidt244b5672024-02-06 10:18:56 +0100376 err = tpmEmuCmd.Start()
Serge Bazanski66e58952021-10-05 17:06:56 +0200377 if err != nil {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200378 tpmCancel()
Serge Bazanski66e58952021-10-05 17:06:56 +0200379 return fmt.Errorf("failed to start TPM emulator: %w", err)
380 }
381
Mateusz Zalegae90f4a12022-05-25 18:24:01 +0200382 // Wait for the socket to be created by the TPM emulator before launching
383 // QEMU.
384 for {
385 _, err := os.Stat(tpmSocketPath)
386 if err == nil {
387 break
388 }
Tim Windelschmidta7a82f32024-04-11 01:40:25 +0200389 if !os.IsNotExist(err) {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200390 tpmCancel()
Mateusz Zalegae90f4a12022-05-25 18:24:01 +0200391 return fmt.Errorf("while stat-ing TPM socket path: %w", err)
392 }
393 if err := tpmCtx.Err(); err != nil {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200394 tpmCancel()
Mateusz Zalegae90f4a12022-05-25 18:24:01 +0200395 return fmt.Errorf("while waiting for the TPM socket: %w", err)
396 }
397 time.Sleep(time.Millisecond * 100)
398 }
399
Serge Bazanski66e58952021-10-05 17:06:56 +0200400 // Start the main qemu binary
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200401 systemCmd := exec.CommandContext(options.Runtime.ctxT, "qemu-system-x86_64", qemuArgs...)
Serge Bazanski66e58952021-10-05 17:06:56 +0200402 if options.ConnectToSocket != nil {
403 systemCmd.ExtraFiles = []*os.File{options.ConnectToSocket}
404 }
405
406 var stdErrBuf bytes.Buffer
407 systemCmd.Stderr = &stdErrBuf
408 systemCmd.Stdout = options.SerialPort
409
Leopoldaf5086b2023-01-15 14:12:42 +0100410 launch.PrettyPrintQemuArgs(options.Name, systemCmd.Args)
411
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200412 go func() {
413 launch.Log("Node: Starting...")
414 err = systemCmd.Run()
415 launch.Log("Node: Returned: %v", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200416
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200417 // Stop TPM emulator and wait for it to exit to properly reap the child process
418 tpmCancel()
419 launch.Log("Node: Waiting for TPM emulator to exit")
420 // Wait returns a SIGKILL error because we just cancelled its context.
421 // We still need to call it to avoid creating zombies.
422 errTpm := tpmEmuCmd.Wait()
423 launch.Log("Node: TPM emulator done: %v", errTpm)
Serge Bazanski66e58952021-10-05 17:06:56 +0200424
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200425 var exerr *exec.ExitError
426 if err != nil && errors.As(err, &exerr) {
427 status := exerr.ProcessState.Sys().(syscall.WaitStatus)
428 if status.Signaled() && status.Signal() == syscall.SIGKILL {
429 // Process was killed externally (most likely by our context being canceled).
430 // This is a normal exit for us, so return nil
431 doneC <- nil
432 return
433 }
434 exerr.Stderr = stdErrBuf.Bytes()
435 newErr := launch.QEMUError(*exerr)
436 launch.Log("Node: %q", stdErrBuf.String())
437 doneC <- &newErr
438 return
Serge Bazanski66e58952021-10-05 17:06:56 +0200439 }
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200440 doneC <- err
441 }()
442 return nil
Serge Bazanski66e58952021-10-05 17:06:56 +0200443}
444
445func copyFile(src, dst string) error {
446 in, err := os.Open(src)
447 if err != nil {
448 return fmt.Errorf("when opening source: %w", err)
449 }
450 defer in.Close()
451
452 out, err := os.Create(dst)
453 if err != nil {
454 return fmt.Errorf("when creating destination: %w", err)
455 }
456 defer out.Close()
457
Lorenz Brun87bbf7e2024-03-18 18:22:25 +0100458 endPos, err := in.Seek(0, io.SeekEnd)
Serge Bazanski66e58952021-10-05 17:06:56 +0200459 if err != nil {
Lorenz Brun87bbf7e2024-03-18 18:22:25 +0100460 return fmt.Errorf("when getting source end: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200461 }
Lorenz Brun87bbf7e2024-03-18 18:22:25 +0100462
463 // Copy the file while preserving its sparseness. The image files are very
464 // sparse (less than 10% allocated), so this is a lot faster.
465 var lastHoleStart int64
466 for {
467 dataStart, err := in.Seek(lastHoleStart, unix.SEEK_DATA)
468 if err != nil {
469 return fmt.Errorf("when seeking to next data block: %w", err)
470 }
471 holeStart, err := in.Seek(dataStart, unix.SEEK_HOLE)
472 if err != nil {
473 return fmt.Errorf("when seeking to next hole: %w", err)
474 }
475 lastHoleStart = holeStart
476 if _, err := in.Seek(dataStart, io.SeekStart); err != nil {
477 return fmt.Errorf("when seeking to current data block: %w", err)
478 }
479 if _, err := out.Seek(dataStart, io.SeekStart); err != nil {
480 return fmt.Errorf("when seeking output to next data block: %w", err)
481 }
482 if _, err := io.CopyN(out, in, holeStart-dataStart); err != nil {
483 return fmt.Errorf("when copying file: %w", err)
484 }
485 if endPos == holeStart {
486 // The next hole is at the end of the file, we're done here.
487 break
488 }
489 }
490
Serge Bazanski66e58952021-10-05 17:06:56 +0200491 return out.Close()
492}
493
Serge Bazanskie78a0892021-10-07 17:03:49 +0200494// getNodes wraps around Management.GetNodes to return a list of nodes in a
495// cluster.
496func getNodes(ctx context.Context, mgmt apb.ManagementClient) ([]*apb.Node, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200497 var res []*apb.Node
Serge Bazanski636032e2022-01-26 14:21:33 +0100498 bo := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
Serge Bazanski075465c2021-11-16 15:38:49 +0100499 err := backoff.Retry(func() error {
500 res = nil
501 srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
Serge Bazanskie78a0892021-10-07 17:03:49 +0200502 if err != nil {
Serge Bazanski075465c2021-11-16 15:38:49 +0100503 return fmt.Errorf("GetNodes: %w", err)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200504 }
Serge Bazanski075465c2021-11-16 15:38:49 +0100505 for {
506 node, err := srvN.Recv()
507 if err == io.EOF {
508 break
509 }
510 if err != nil {
511 return fmt.Errorf("GetNodes.Recv: %w", err)
512 }
513 res = append(res, node)
514 }
515 return nil
516 }, bo)
517 if err != nil {
518 return nil, err
Serge Bazanskie78a0892021-10-07 17:03:49 +0200519 }
520 return res, nil
521}
522
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200523// getNode wraps Management.GetNodes. It returns node information matching
524// given node ID.
525func getNode(ctx context.Context, mgmt apb.ManagementClient, id string) (*apb.Node, error) {
526 nodes, err := getNodes(ctx, mgmt)
527 if err != nil {
528 return nil, fmt.Errorf("could not get nodes: %w", err)
529 }
530 for _, n := range nodes {
Jan Schär39d9c242024-09-24 13:49:55 +0200531 if n.Id == id {
532 return n, nil
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200533 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200534 }
Tim Windelschmidt73e98822024-04-18 23:13:49 +0200535 return nil, fmt.Errorf("no such node")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200536}
537
// generateRandomEthernetMAC returns a random EUI-48 MAC address that is flagged
// as locally administered and unicast, which keeps it out of the space of
// globally (vendor-) assigned addresses.
func generateRandomEthernetMAC() (*net.HardwareAddr, error) {
	var raw [6]byte
	if _, err := rand.Read(raw[:]); err != nil {
		return nil, fmt.Errorf("failed to read randomness for MAC: %w", err)
	}

	// First octet: set the U/L bit (locally administered) and clear the I/G
	// bit (individual, i.e. unicast). Ref IEEE 802-2014 Section 8.2.2.
	raw[0] |= 0x02
	raw[0] &^= 0x01

	mac := net.HardwareAddr(raw[:])
	return &mac, nil
}
552
Serge Bazanskibe742842022-04-04 13:18:50 +0200553const SOCKSPort uint16 = 1080
Serge Bazanski66e58952021-10-05 17:06:56 +0200554
Serge Bazanskibe742842022-04-04 13:18:50 +0200555// ClusterPorts contains all ports handled by Nanoswitch.
556var ClusterPorts = []uint16{
557 // Forwarded to the first node.
558 uint16(node.CuratorServicePort),
559 uint16(node.DebugServicePort),
560 uint16(node.KubernetesAPIPort),
561 uint16(node.KubernetesAPIWrappedPort),
562
563 // SOCKS proxy to the switch network
564 SOCKSPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200565}
566
567// ClusterOptions contains all options for launching a Metropolis cluster.
568type ClusterOptions struct {
569 // The number of nodes this cluster should be started with.
570 NumNodes int
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100571
Jan Schära9b060b2024-08-07 10:42:29 +0200572 // Node are default options of all nodes.
573 Node NodeOptions
574
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100575 // If true, node logs will be saved to individual files instead of being printed
576 // out to stderr. The path of these files will be still printed to stdout.
577 //
578 // The files will be located within the launch directory inside TEST_TMPDIR (or
579 // the default tempdir location, if not set).
580 NodeLogsToFiles bool
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200581
582 // LeaveNodesNew, if set, will leave all non-bootstrap nodes in NEW, without
583 // bootstrapping them. The nodes' address information in Cluster.Nodes will be
584 // incomplete.
585 LeaveNodesNew bool
Lorenz Brun150f24a2023-07-13 20:11:06 +0200586
587 // Optional local registry which will be made available to the cluster to
588 // pull images from. This is a more efficient alternative to preseeding all
589 // images used for testing.
590 LocalRegistry *localregistry.Server
Serge Bazanskie564f172024-04-03 12:06:06 +0200591
592 // InitialClusterConfiguration will be passed to the first node when creating the
593 // cluster, and defines some basic properties of the cluster. If not specified,
594 // the cluster will default to defaults as defined in
595 // metropolis.proto.api.NodeParameters.
596 InitialClusterConfiguration *cpb.ClusterConfiguration
Serge Bazanski66e58952021-10-05 17:06:56 +0200597}
598
599// Cluster is the running Metropolis cluster launched using the LaunchCluster
600// function.
601type Cluster struct {
Serge Bazanski66e58952021-10-05 17:06:56 +0200602 // Owner is the TLS Certificate of the owner of the test cluster. This can be
603 // used to authenticate further clients to the running cluster.
604 Owner tls.Certificate
605 // Ports is the PortMap used to access the first nodes' services (defined in
Serge Bazanskibe742842022-04-04 13:18:50 +0200606 // ClusterPorts) and the SOCKS proxy (at SOCKSPort).
Serge Bazanski66e58952021-10-05 17:06:56 +0200607 Ports launch.PortMap
608
Serge Bazanskibe742842022-04-04 13:18:50 +0200609 // Nodes is a map from Node ID to its runtime information.
610 Nodes map[string]*NodeInCluster
611 // NodeIDs is a list of node IDs that are backing this cluster, in order of
612 // creation.
613 NodeIDs []string
614
Serge Bazanski54e212a2023-06-14 13:45:11 +0200615 // CACertificate is the cluster's CA certificate.
616 CACertificate *x509.Certificate
617
Serge Bazanski66e58952021-10-05 17:06:56 +0200618 // nodesDone is a list of channels populated with the return codes from all the
619 // nodes' qemu instances. It's used by Close to ensure all nodes have
Leopold20a036e2023-01-15 00:17:19 +0100620 // successfully been stopped.
Serge Bazanski66e58952021-10-05 17:06:56 +0200621 nodesDone []chan error
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200622 // nodeOpts are the cluster member nodes' mutable launch options, kept here
623 // to facilitate reboots.
624 nodeOpts []NodeOptions
625 // launchDir points at the directory keeping the nodes' state, such as storage
626 // images, firmware variable files, TPM state.
627 launchDir string
628 // socketDir points at the directory keeping UNIX socket files, such as these
629 // used to facilitate communication between QEMU and swtpm. It's different
630 // from launchDir, and anchored nearer the file system root, due to the
631 // socket path length limitation imposed by the kernel.
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100632 socketDir string
633 metroctlDir string
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200634
Lorenz Brun276a7462023-07-12 21:28:54 +0200635 // SOCKSDialer is used by DialNode to establish connections to nodes via the
Serge Bazanskibe742842022-04-04 13:18:50 +0200636 // SOCKS server ran by nanoswitch.
Lorenz Brun276a7462023-07-12 21:28:54 +0200637 SOCKSDialer proxy.Dialer
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200638
639 // authClient is a cached authenticated owner connection to a Curator
640 // instance within the cluster.
641 authClient *grpc.ClientConn
642
643 // ctxT is the context individual node contexts are created from.
644 ctxT context.Context
645 // ctxC is used by Close to cancel the context under which the nodes are
646 // running.
647 ctxC context.CancelFunc
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200648
649 tpmFactory *TPMFactory
Serge Bazanskibe742842022-04-04 13:18:50 +0200650}
651
// NodeInCluster describes one member node of a Cluster.
type NodeInCluster struct {
	// ID is the node's Metropolis node ID. Passing it as the host part of an
	// address to DialNode reaches this node's services.
	ID string
	// Pubkey is the node's public key as reported by the cluster management
	// API. It may be empty if it was not retrieved for this node.
	Pubkey []byte
	// ManagementAddress is the node's address on the network run by
	// nanoswitch. The host cannot reach it directly; use DialNode, or the
	// nanoswitch SOCKS proxy listening on Cluster.Ports[SOCKSPort].
	ManagementAddress string
}
662
// firstConnection performs the initial owner credential escrow with a newly
// started nanoswitch-backed cluster over SOCKS. It expects the first node to be
// running at 10.1.0.2, which is always the case with the current nanoswitch
// implementation.
//
// It returns the newly escrowed credentials as well as the first node's
// information as NodeInCluster (including its public key). Retrieving the owner
// certificate is retried only while the cluster reports UNAVAILABLE, for at most
// about a minute; both retry loops stop early when ctx is canceled.
func firstConnection(ctx context.Context, socksDialer proxy.Dialer) (*tls.Certificate, *NodeInCluster, error) {
	// Dial external service.
	remote := fmt.Sprintf("10.1.0.2:%s", node.CuratorServicePort.PortString())
	initCreds, err := rpc.NewEphemeralCredentials(InsecurePrivateKey, rpc.WantInsecure())
	if err != nil {
		return nil, nil, fmt.Errorf("NewEphemeralCredentials: %w", err)
	}
	// All connections are routed through the nanoswitch SOCKS proxy. When the
	// dialer is context-aware, use that so that gRPC's dial deadline and
	// cancellation also bound the proxy handshake.
	dialer := func(dctx context.Context, addr string) (net.Conn, error) {
		if cd, ok := socksDialer.(proxy.ContextDialer); ok {
			return cd.DialContext(dctx, "tcp", addr)
		}
		return socksDialer.Dial("tcp", addr)
	}
	initClient, err := grpc.Dial(remote, grpc.WithContextDialer(dialer), grpc.WithTransportCredentials(initCreds))
	if err != nil {
		return nil, nil, fmt.Errorf("dialing with ephemeral credentials failed: %w", err)
	}
	defer initClient.Close()

	// Retrieve owner certificate - this can take a while because the node is still
	// coming up, so do it in a backoff loop. Only UNAVAILABLE is retried; any
	// other error aborts the loop immediately.
	launch.Log("Cluster: retrieving owner certificate (this can take a few seconds while the first node boots)...")
	aaa := apb.NewAAAClient(initClient)
	var cert *tls.Certificate
	err = backoff.Retry(func() error {
		cert, err = rpc.RetrieveOwnerCertificate(ctx, aaa, InsecurePrivateKey)
		if st, ok := status.FromError(err); ok {
			if st.Code() == codes.Unavailable {
				launch.Log("Cluster: cluster UNAVAILABLE: %v", st.Message())
				return err
			}
		}
		return backoff.Permanent(err)
	}, backoff.WithContext(backoff.NewExponentialBackOff(backoff.WithMaxElapsedTime(time.Minute)), ctx))
	if err != nil {
		return nil, nil, fmt.Errorf("couldn't retrieve owner certificate: %w", err)
	}
	launch.Log("Cluster: retrieved owner certificate.")

	// Now connect authenticated and get the node ID.
	creds := rpc.NewAuthenticatedCredentials(*cert, rpc.WantInsecure())
	authClient, err := grpc.Dial(remote, grpc.WithContextDialer(dialer), grpc.WithTransportCredentials(creds))
	if err != nil {
		return nil, nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
	}
	defer authClient.Close()
	mgmt := apb.NewManagementClient(authClient)

	// The cluster must contain exactly one node which already reports an
	// external address; retry until that is the case.
	var first *NodeInCluster
	err = backoff.Retry(func() error {
		nodes, err := getNodes(ctx, mgmt)
		if err != nil {
			return fmt.Errorf("retrieving nodes failed: %w", err)
		}
		if len(nodes) != 1 {
			return fmt.Errorf("expected one node, got %d", len(nodes))
		}
		n := nodes[0]
		if n.Status == nil || n.Status.ExternalAddress == "" {
			return fmt.Errorf("node has no status and/or address")
		}
		first = &NodeInCluster{
			ID:                n.Id,
			Pubkey:            n.Pubkey,
			ManagementAddress: n.Status.ExternalAddress,
		}
		return nil
	}, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
	if err != nil {
		return nil, nil, fmt.Errorf("couldn't retrieve first node: %w", err)
	}

	return cert, first, nil
}
740
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100741func NewSerialFileLogger(p string) (io.ReadWriter, error) {
Lorenz Brun150f24a2023-07-13 20:11:06 +0200742 f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE, 0o600)
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100743 if err != nil {
744 return nil, err
745 }
746 return f, nil
747}
748
// LaunchCluster launches a cluster of Metropolis node VMs together with a
// Nanoswitch instance to network them all together.
//
// The first node bootstraps the cluster, and the owner credentials escrowed
// from it are written into a metroctl configuration directory. All further
// nodes are then registered into the cluster. Unless opts.LeaveNodesNew is set,
// they are also approved and waited for until they report as UP.
//
// The given context will be used to run all qemu instances in the cluster, and
// canceling the context or calling Close() will terminate them. If launching
// fails after the first VM has been started, the context of all VMs started so
// far is canceled before the error is returned.
func LaunchCluster(ctx context.Context, opts ClusterOptions) (*Cluster, error) {
	if opts.NumNodes <= 0 {
		return nil, errors.New("refusing to start cluster with zero nodes")
	}

	// Prepare the node options. These will be kept as part of Cluster.
	// nodeOpts[].Runtime will be initialized by LaunchNode during the first
	// launch. The runtime information can be later used to restart a node.
	// The 0th node will be initialized first. The rest will follow after it
	// had bootstrapped the cluster.
	nodeOpts := make([]NodeOptions, opts.NumNodes)
	for i := range opts.NumNodes {
		nodeOpts[i] = opts.Node
		nodeOpts[i].Name = fmt.Sprintf("node%d", i)
		nodeOpts[i].SerialPort = newPrefixedStdio(i)
	}
	nodeOpts[0].NodeParameters = &apb.NodeParameters{
		Cluster: &apb.NodeParameters_ClusterBootstrap_{
			ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
				OwnerPublicKey:              InsecurePublicKey,
				InitialClusterConfiguration: opts.InitialClusterConfiguration,
				Labels: &cpb.NodeLabels{
					Pairs: []*cpb.NodeLabels_Pair{
						{Key: NodeNumberKey, Value: "0"},
					},
				},
			},
		},
	}
	nodeOpts[0].PcapDump = true

	// Create the launch directory.
	ld, err := os.MkdirTemp(os.Getenv("TEST_TMPDIR"), "cluster-*")
	if err != nil {
		return nil, fmt.Errorf("failed to create the launch directory: %w", err)
	}
	// Create the metroctl config directory. We keep it in /tmp because in some
	// scenarios it's end-user visible and we want it short.
	md, err := os.MkdirTemp("/tmp", "metroctl-*")
	if err != nil {
		return nil, fmt.Errorf("failed to create the metroctl directory: %w", err)
	}

	// Create the socket directory. We keep it in /tmp because of socket path limits.
	sd, err := os.MkdirTemp("/tmp", "cluster-*")
	if err != nil {
		return nil, fmt.Errorf("failed to create the socket directory: %w", err)
	}

	// Set up TPM factory.
	tpmf, err := NewTPMFactory(filepath.Join(ld, "tpm"))
	if err != nil {
		return nil, fmt.Errorf("failed to create TPM factory: %w", err)
	}

	// Prepare links between nodes and nanoswitch.
	var switchPorts []*os.File
	for i := range opts.NumNodes {
		switchPort, vmPort, err := launch.NewSocketPair()
		if err != nil {
			return nil, fmt.Errorf("failed to get socketpair: %w", err)
		}
		switchPorts = append(switchPorts, switchPort)
		nodeOpts[i].ConnectToSocket = vmPort
	}

	// Make a list of channels that will be populated by all running node qemu
	// processes.
	done := make([]chan error, opts.NumNodes)
	for i := range done {
		done[i] = make(chan error, 1)
	}

	// Serial logs of the nodes and of nanoswitch all go to the same directory.
	// That is TEST_UNDECLARED_OUTPUTS_DIR if it is set, so that the logs survive
	// Close (which removes the launch directory), and the launch directory
	// otherwise.
	logDir := ld
	if dir := os.Getenv("TEST_UNDECLARED_OUTPUTS_DIR"); dir != "" {
		logDir = dir
	}
	if opts.NodeLogsToFiles {
		for i := range opts.NumNodes {
			logPath := path.Join(logDir, fmt.Sprintf("node-%d.txt", i))
			port, err := NewSerialFileLogger(logPath)
			if err != nil {
				return nil, fmt.Errorf("could not open log file for node %d: %w", i, err)
			}
			launch.Log("Node %d logs at %s", i, logPath)
			nodeOpts[i].SerialPort = port
		}
	}

	// From here on, VMs may be running under ctxT. Every error path must cancel
	// ctxT to tear them down again. The deferred function takes care of that
	// (and of the cached curator connection), unless the cluster is returned
	// successfully.
	ctxT, ctxC := context.WithCancel(ctx)
	var cluster *Cluster
	success := false
	defer func() {
		if success {
			return
		}
		if cluster != nil && cluster.authClient != nil {
			cluster.authClient.Close()
		}
		ctxC()
	}()

	// Start the first node.
	launch.Log("Cluster: Starting node %d...", 0)
	if err := LaunchNode(ctxT, ld, sd, tpmf, &nodeOpts[0], done[0]); err != nil {
		return nil, fmt.Errorf("failed to launch first node: %w", err)
	}

	localRegistryAddr := net.TCPAddr{
		IP:   net.IPv4(10, 42, 0, 82),
		Port: 5000,
	}

	var guestSvcMap launch.GuestServiceMap
	if opts.LocalRegistry != nil {
		l, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1)})
		if err != nil {
			return nil, fmt.Errorf("failed to create TCP listener for local registry: %w", err)
		}
		s := http.Server{
			Handler: opts.LocalRegistry,
		}
		go s.Serve(l)
		go func() {
			<-ctxT.Done()
			s.Close()
		}()
		guestSvcMap = launch.GuestServiceMap{
			&localRegistryAddr: *l.Addr().(*net.TCPAddr),
		}
	}

	// Launch nanoswitch.
	portMap, err := launch.ConflictFreePortMap(ClusterPorts)
	if err != nil {
		return nil, fmt.Errorf("failed to allocate ephemeral ports: %w", err)
	}

	go func() {
		var serialPort io.ReadWriter
		var err error
		if opts.NodeLogsToFiles {
			loggerPath := path.Join(logDir, "nanoswitch.txt")
			serialPort, err = NewSerialFileLogger(loggerPath)
			if err != nil {
				launch.Fatal("Could not open log file for nanoswitch: %v", err)
			}
			launch.Log("Nanoswitch logs at %s", loggerPath)
		} else {
			serialPort = newPrefixedStdio(99)
		}
		if err := launch.RunMicroVM(ctxT, &launch.MicroVMOptions{
			Name:                   "nanoswitch",
			KernelPath:             xKernelPath,
			InitramfsPath:          xInitramfsPath,
			ExtraNetworkInterfaces: switchPorts,
			PortMap:                portMap,
			GuestServiceMap:        guestSvcMap,
			SerialPort:             serialPort,
			PcapDump:               path.Join(ld, "nanoswitch.pcap"),
		}); err != nil {
			if !errors.Is(err, ctxT.Err()) {
				launch.Fatal("Failed to launch nanoswitch: %v", err)
			}
		}
	}()

	// Build SOCKS dialer.
	socksRemote := fmt.Sprintf("localhost:%v", portMap[SOCKSPort])
	socksDialer, err := proxy.SOCKS5("tcp", socksRemote, nil, proxy.Direct)
	if err != nil {
		return nil, fmt.Errorf("failed to build SOCKS dialer: %w", err)
	}

	// Retrieve owner credentials and first node.
	cert, firstNode, err := firstConnection(ctxT, socksDialer)
	if err != nil {
		return nil, err
	}

	// Write credentials to the metroctl directory.
	if err := metroctl.WriteOwnerKey(md, cert.PrivateKey.(ed25519.PrivateKey)); err != nil {
		return nil, fmt.Errorf("could not write owner key: %w", err)
	}
	if err := metroctl.WriteOwnerCertificate(md, cert.Certificate[0]); err != nil {
		return nil, fmt.Errorf("could not write owner certificate: %w", err)
	}

	launch.Log("Cluster: Node %d is %s", 0, firstNode.ID)

	// Set up a partially initialized cluster instance, to be filled in the
	// later steps.
	cluster = &Cluster{
		Owner: *cert,
		Ports: portMap,
		Nodes: map[string]*NodeInCluster{
			firstNode.ID: firstNode,
		},
		NodeIDs: []string{
			firstNode.ID,
		},

		nodesDone:   done,
		nodeOpts:    nodeOpts,
		launchDir:   ld,
		socketDir:   sd,
		metroctlDir: md,

		SOCKSDialer: socksDialer,

		ctxT: ctxT,
		ctxC: ctxC,

		tpmFactory: tpmf,
	}

	// Now start the rest of the nodes and register them into the cluster.

	// Get an authenticated owner client within the cluster.
	curC, err := cluster.CuratorClient()
	if err != nil {
		return nil, fmt.Errorf("CuratorClient: %w", err)
	}
	mgmt := apb.NewManagementClient(curC)

	// Retrieve register ticket to register further nodes.
	launch.Log("Cluster: retrieving register ticket...")
	resT, err := mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{})
	if err != nil {
		return nil, fmt.Errorf("GetRegisterTicket: %w", err)
	}
	ticket := resT.Ticket
	launch.Log("Cluster: retrieved register ticket (%d bytes).", len(ticket))

	// Retrieve cluster info (for directory and ca public key) to register further
	// nodes.
	resI, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
	if err != nil {
		return nil, fmt.Errorf("GetClusterInfo: %w", err)
	}
	caCert, err := x509.ParseCertificate(resI.CaCertificate)
	if err != nil {
		return nil, fmt.Errorf("ParseCertificate: %w", err)
	}
	cluster.CACertificate = caCert

	// Use the retrieved information to configure the rest of the node options.
	for i := 1; i < opts.NumNodes; i++ {
		nodeOpts[i].NodeParameters = &apb.NodeParameters{
			Cluster: &apb.NodeParameters_ClusterRegister_{
				ClusterRegister: &apb.NodeParameters_ClusterRegister{
					RegisterTicket:   ticket,
					ClusterDirectory: resI.ClusterDirectory,
					CaCertificate:    resI.CaCertificate,
					Labels: &cpb.NodeLabels{
						Pairs: []*cpb.NodeLabels_Pair{
							{Key: NodeNumberKey, Value: strconv.Itoa(i)},
						},
					},
				},
			},
		}
	}

	// Now run the rest of the nodes.
	for i := 1; i < opts.NumNodes; i++ {
		launch.Log("Cluster: Starting node %d...", i)
		if err := LaunchNode(ctxT, ld, sd, tpmf, &nodeOpts[i], done[i]); err != nil {
			return nil, fmt.Errorf("failed to launch node %d: %w", i, err)
		}
	}

	// Wait for nodes to appear as NEW, populate a map from node number (index into
	// nodeOpts, etc.) to Metropolis Node ID. The node number is read back from
	// the label set at registration above.
	seenNodes := make(map[string]bool)
	nodeNumberToID := make(map[int]string)
	launch.Log("Cluster: waiting for nodes to appear as NEW...")
	for len(seenNodes) < opts.NumNodes-1 {
		nodes, err := getNodes(ctx, mgmt)
		if err != nil {
			return nil, fmt.Errorf("could not get nodes: %w", err)
		}
		for _, n := range nodes {
			if n.State != cpb.NodeState_NODE_STATE_NEW || seenNodes[n.Id] {
				continue
			}
			seenNodes[n.Id] = true
			cluster.Nodes[n.Id] = &NodeInCluster{
				ID:     n.Id,
				Pubkey: n.Pubkey,
			}

			num, err := strconv.Atoi(node.GetNodeLabel(n.Labels, NodeNumberKey))
			if err != nil {
				return nil, fmt.Errorf("node %s has undecodable number label: %w", n.Id, err)
			}
			launch.Log("Cluster: Node %d is %s", num, n.Id)
			nodeNumberToID[num] = n.Id
		}
		if len(seenNodes) < opts.NumNodes-1 {
			time.Sleep(time.Second)
		}
	}
	launch.Log("Found all expected nodes")

	// Build the rest of NodeIDs from map. Every node number must have been
	// seen, otherwise index-based accessors (RebootNode, etc.) would silently
	// operate on an empty node ID.
	for i := 1; i < opts.NumNodes; i++ {
		id, ok := nodeNumberToID[i]
		if !ok {
			return nil, fmt.Errorf("no node with number %d appeared in the cluster", i)
		}
		cluster.NodeIDs = append(cluster.NodeIDs, id)
	}

	if !opts.LeaveNodesNew {
		approvedNodes := make(map[string]bool)
		upNodes := make(map[string]bool)
		for {
			nodes, err := getNodes(ctx, mgmt)
			if err != nil {
				return nil, fmt.Errorf("could not get nodes: %w", err)
			}
			for _, n := range nodes {
				if !seenNodes[n.Id] {
					// Skip nodes that weren't NEW in the previous step.
					continue
				}

				if n.State == cpb.NodeState_NODE_STATE_UP && n.Status != nil && n.Status.ExternalAddress != "" {
					launch.Log("Cluster: node %s is up", n.Id)
					upNodes[n.Id] = true
					cluster.Nodes[n.Id].ManagementAddress = n.Status.ExternalAddress
				}
				if upNodes[n.Id] {
					continue
				}

				if !approvedNodes[n.Id] {
					launch.Log("Cluster: approving node %s", n.Id)
					_, err := mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
						Pubkey: n.Pubkey,
					})
					if err != nil {
						return nil, fmt.Errorf("ApproveNode(%s): %w", n.Id, err)
					}
					approvedNodes[n.Id] = true
				}
			}

			launch.Log("Cluster: want %d up nodes, have %d", opts.NumNodes, len(upNodes)+1)
			if len(upNodes) == opts.NumNodes-1 {
				break
			}
			time.Sleep(time.Second)
		}
	}

	launch.Log("Cluster: all nodes up:")
	for i, nodeID := range cluster.NodeIDs {
		launch.Log("Cluster: %d. %s at %s", i, nodeID, cluster.Nodes[nodeID].ManagementAddress)
	}
	launch.Log("Cluster: starting tests...")

	// Hand the cluster (and with it ownership of ctxC) to the caller.
	success = true
	return cluster, nil
}
1125
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001126// RebootNode reboots the cluster member node matching the given index, and
1127// waits for it to rejoin the cluster. It will use the given context ctx to run
1128// cluster API requests, whereas the resulting QEMU process will be created
1129// using the cluster's context c.ctxT. The nodes are indexed starting at 0.
1130func (c *Cluster) RebootNode(ctx context.Context, idx int) error {
1131 if idx < 0 || idx >= len(c.NodeIDs) {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +02001132 return fmt.Errorf("index out of bounds")
1133 }
1134 if c.nodeOpts[idx].Runtime == nil {
1135 return fmt.Errorf("node not running")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001136 }
1137 id := c.NodeIDs[idx]
1138
1139 // Get an authenticated owner client within the cluster.
Serge Bazanski5bb8a332022-06-23 17:41:33 +02001140 curC, err := c.CuratorClient()
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001141 if err != nil {
1142 return err
1143 }
1144 mgmt := apb.NewManagementClient(curC)
1145
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001146 // Cancel the node's context. This will shut down QEMU.
1147 c.nodeOpts[idx].Runtime.CtxC()
Serge Bazanski05f813b2023-03-16 17:58:39 +01001148 launch.Log("Cluster: waiting for node %d (%s) to stop.", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001149 err = <-c.nodesDone[idx]
1150 if err != nil {
1151 return fmt.Errorf("while restarting node: %w", err)
1152 }
1153
1154 // Start QEMU again.
Serge Bazanski05f813b2023-03-16 17:58:39 +01001155 launch.Log("Cluster: restarting node %d (%s).", idx, id)
Serge Bazanski2b6dc312024-06-04 17:44:55 +02001156 if err := LaunchNode(c.ctxT, c.launchDir, c.socketDir, c.tpmFactory, &c.nodeOpts[idx], c.nodesDone[idx]); err != nil {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +02001157 return fmt.Errorf("failed to launch node %d: %w", idx, err)
1158 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001159
Serge Bazanskibc969572024-03-21 11:56:13 +01001160 start := time.Now()
1161
1162 // Poll Management.GetNodes until the node is healthy.
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001163 for {
1164 cs, err := getNode(ctx, mgmt, id)
1165 if err != nil {
Serge Bazanski05f813b2023-03-16 17:58:39 +01001166 launch.Log("Cluster: node get error: %v", err)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001167 return err
1168 }
Serge Bazanskibc969572024-03-21 11:56:13 +01001169 launch.Log("Cluster: node health: %+v", cs.Health)
1170
1171 lhb := time.Now().Add(-cs.TimeSinceHeartbeat.AsDuration())
Tim Windelschmidta10d0cb2025-01-13 14:44:15 +01001172 if lhb.After(start) && cs.Health == apb.Node_HEALTH_HEALTHY {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001173 break
1174 }
1175 time.Sleep(time.Second)
1176 }
Serge Bazanski05f813b2023-03-16 17:58:39 +01001177 launch.Log("Cluster: node %d (%s) has rejoined the cluster.", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001178 return nil
1179}
1180
Serge Bazanski500f6e02024-04-03 12:06:40 +02001181// ShutdownNode performs an ungraceful shutdown (i.e. power off) of the node
1182// given by idx. If the node is already shut down, this is a no-op.
1183func (c *Cluster) ShutdownNode(idx int) error {
1184 if idx < 0 || idx >= len(c.NodeIDs) {
1185 return fmt.Errorf("index out of bounds")
1186 }
1187 // Return if node is already stopped.
1188 select {
1189 case <-c.nodeOpts[idx].Runtime.ctxT.Done():
1190 return nil
1191 default:
1192 }
1193 id := c.NodeIDs[idx]
1194
1195 // Cancel the node's context. This will shut down QEMU.
1196 c.nodeOpts[idx].Runtime.CtxC()
1197 launch.Log("Cluster: waiting for node %d (%s) to stop.", idx, id)
1198 err := <-c.nodesDone[idx]
1199 if err != nil {
1200 return fmt.Errorf("while shutting down node: %w", err)
1201 }
Serge Bazanski2b6dc312024-06-04 17:44:55 +02001202 launch.Log("Cluster: node %d (%s) stopped.", idx, id)
Serge Bazanski500f6e02024-04-03 12:06:40 +02001203 return nil
1204}
1205
// StartNode performs a power on of the node given by idx. If the node is already
// running, this is a no-op. A node that has never been launched is launched
// for the first time.
func (c *Cluster) StartNode(idx int) error {
	if idx < 0 || idx >= len(c.NodeIDs) {
		return fmt.Errorf("index out of bounds")
	}
	id := c.NodeIDs[idx]
	// Return if node is already running. A nil Runtime means the node has never
	// been launched (LaunchNode initializes it on the first launch), so it is
	// not running. Checking for nil first avoids a nil dereference.
	if rt := c.nodeOpts[idx].Runtime; rt != nil {
		select {
		case <-rt.ctxT.Done():
		default:
			return nil
		}
	}

	// Start QEMU again.
	launch.Log("Cluster: starting node %d (%s).", idx, id)
	if err := LaunchNode(c.ctxT, c.launchDir, c.socketDir, c.tpmFactory, &c.nodeOpts[idx], c.nodesDone[idx]); err != nil {
		return fmt.Errorf("failed to launch node %d: %w", idx, err)
	}
	launch.Log("Cluster: node %d (%s) started.", idx, id)
	return nil
}
1228
// Close cancels the running clusters' context and waits for all virtualized
// nodes to stop. It returns an error if stopping the nodes failed, or one of
// the nodes failed to fully start in the first place.
//
// Nodes that were already powered off (e.g. via ShutdownNode) are not waited
// for again, since their exit result has already been collected. Afterwards,
// the nodes' state files, sockets and the metroctl directory are removed
// (best-effort).
func (c *Cluster) Close() error {
	launch.Log("Cluster: stopping...")
	if c.authClient != nil {
		c.authClient.Close()
	}

	// ShutdownNode consumes a stopped node's exit result from nodesDone, so a
	// blocking receive on that channel would hang forever. Find such nodes
	// (same check as ShutdownNode) before canceling the cluster context,
	// because canceling it may also cancel the nodes' own contexts.
	alreadyStopped := make([]bool, len(c.nodesDone))
	for i := range alreadyStopped {
		if i >= len(c.nodeOpts) {
			continue
		}
		rt := c.nodeOpts[i].Runtime
		if rt == nil {
			// Never launched: nothing will ever report this node's exit.
			alreadyStopped[i] = true
			continue
		}
		select {
		case <-rt.ctxT.Done():
			alreadyStopped[i] = true
		default:
		}
	}
	c.ctxC()

	var errs []error
	launch.Log("Cluster: waiting for nodes to exit...")
	for i, ch := range c.nodesDone {
		if alreadyStopped[i] {
			// Still pick up a result if one happens to be pending, but don't block.
			select {
			case err := <-ch:
				if err != nil {
					errs = append(errs, err)
				}
			default:
			}
			continue
		}
		if err := <-ch; err != nil {
			errs = append(errs, err)
		}
	}
	launch.Log("Cluster: removing nodes' state files (%s) and sockets (%s).", c.launchDir, c.socketDir)
	os.RemoveAll(c.launchDir)
	os.RemoveAll(c.socketDir)
	os.RemoveAll(c.metroctlDir)
	launch.Log("Cluster: done")
	return multierr.Combine(errs...)
}
Serge Bazanskibe742842022-04-04 13:18:50 +02001254
1255// DialNode is a grpc.WithContextDialer compatible dialer which dials nodes by
1256// their ID. This is performed by connecting to the cluster nanoswitch via its
1257// SOCKS proxy, and using the cluster node list for name resolution.
1258//
1259// For example:
1260//
Serge Bazanski05f813b2023-03-16 17:58:39 +01001261// grpc.Dial("metropolis-deadbeef:1234", grpc.WithContextDialer(c.DialNode))
Serge Bazanskibe742842022-04-04 13:18:50 +02001262func (c *Cluster) DialNode(_ context.Context, addr string) (net.Conn, error) {
1263 host, port, err := net.SplitHostPort(addr)
1264 if err != nil {
1265 return nil, fmt.Errorf("invalid host:port: %w", err)
1266 }
1267 // Already an IP address?
1268 if net.ParseIP(host) != nil {
Lorenz Brun276a7462023-07-12 21:28:54 +02001269 return c.SOCKSDialer.Dial("tcp", addr)
Serge Bazanskibe742842022-04-04 13:18:50 +02001270 }
1271
1272 // Otherwise, expect a node name.
1273 node, ok := c.Nodes[host]
1274 if !ok {
1275 return nil, fmt.Errorf("unknown node %q", host)
1276 }
1277 addr = net.JoinHostPort(node.ManagementAddress, port)
Lorenz Brun276a7462023-07-12 21:28:54 +02001278 return c.SOCKSDialer.Dial("tcp", addr)
Serge Bazanskibe742842022-04-04 13:18:50 +02001279}
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001280
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001281// GetKubeClientSet gets a Kubernetes client set accessing the Metropolis
1282// Kubernetes authenticating proxy using the cluster owner identity.
1283// It currently has access to everything (i.e. the cluster-admin role)
1284// via the owner-admin binding.
Lorenz Brun8f1254d2025-01-28 14:10:05 +01001285func (c *Cluster) GetKubeClientSet() (kubernetes.Interface, *rest.Config, error) {
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001286 pkcs8Key, err := x509.MarshalPKCS8PrivateKey(c.Owner.PrivateKey)
1287 if err != nil {
1288 // We explicitly pass an Ed25519 private key in, so this can't happen
1289 panic(err)
1290 }
1291
1292 host := net.JoinHostPort(c.NodeIDs[0], node.KubernetesAPIWrappedPort.PortString())
Lorenz Brun150f24a2023-07-13 20:11:06 +02001293 clientConfig := rest.Config{
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001294 Host: host,
1295 TLSClientConfig: rest.TLSClientConfig{
1296 // TODO(q3k): use CA certificate
1297 Insecure: true,
1298 ServerName: "kubernetes.default.svc",
1299 CertData: pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: c.Owner.Certificate[0]}),
1300 KeyData: pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: pkcs8Key}),
1301 },
1302 Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
1303 return c.DialNode(ctx, address)
1304 },
1305 }
Lorenz Brun8f1254d2025-01-28 14:10:05 +01001306 clientSet, err := kubernetes.NewForConfig(&clientConfig)
1307 if err != nil {
1308 return nil, nil, err
1309 }
1310 return clientSet, &clientConfig, nil
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001311}
1312
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001313// KubernetesControllerNodeAddresses returns the list of IP addresses of nodes
1314// which are currently Kubernetes controllers, ie. run an apiserver. This list
1315// might be empty if no node is currently configured with the
1316// 'KubernetesController' node.
1317func (c *Cluster) KubernetesControllerNodeAddresses(ctx context.Context) ([]string, error) {
1318 curC, err := c.CuratorClient()
1319 if err != nil {
1320 return nil, err
1321 }
1322 mgmt := apb.NewManagementClient(curC)
1323 srv, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{
1324 Filter: "has(node.roles.kubernetes_controller)",
1325 })
1326 if err != nil {
1327 return nil, err
1328 }
1329 defer srv.CloseSend()
1330 var res []string
1331 for {
1332 n, err := srv.Recv()
1333 if err == io.EOF {
1334 break
1335 }
1336 if err != nil {
1337 return nil, err
1338 }
1339 if n.Status == nil || n.Status.ExternalAddress == "" {
1340 continue
1341 }
1342 res = append(res, n.Status.ExternalAddress)
1343 }
1344 return res, nil
1345}
Serge Bazanski630fb5c2023-04-06 10:50:24 +02001346
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001347// AllNodesHealthy returns nil if all the nodes in the cluster are seemingly
1348// healthy.
Serge Bazanski630fb5c2023-04-06 10:50:24 +02001349func (c *Cluster) AllNodesHealthy(ctx context.Context) error {
1350 // Get an authenticated owner client within the cluster.
1351 curC, err := c.CuratorClient()
1352 if err != nil {
1353 return err
1354 }
1355 mgmt := apb.NewManagementClient(curC)
1356 nodes, err := getNodes(ctx, mgmt)
1357 if err != nil {
1358 return err
1359 }
1360
1361 var unhealthy []string
1362 for _, node := range nodes {
Tim Windelschmidta10d0cb2025-01-13 14:44:15 +01001363 if node.Health == apb.Node_HEALTH_HEALTHY {
Serge Bazanski630fb5c2023-04-06 10:50:24 +02001364 continue
1365 }
1366 unhealthy = append(unhealthy, node.Id)
1367 }
1368 if len(unhealthy) == 0 {
1369 return nil
1370 }
1371 return fmt.Errorf("nodes unhealthy: %s", strings.Join(unhealthy, ", "))
1372}
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001373
1374// ApproveNode approves a node by ID, waiting for it to become UP.
1375func (c *Cluster) ApproveNode(ctx context.Context, id string) error {
1376 curC, err := c.CuratorClient()
1377 if err != nil {
1378 return err
1379 }
1380 mgmt := apb.NewManagementClient(curC)
1381
1382 _, err = mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
1383 Pubkey: c.Nodes[id].Pubkey,
1384 })
1385 if err != nil {
1386 return fmt.Errorf("ApproveNode: %w", err)
1387 }
1388 launch.Log("Cluster: %s: approved, waiting for UP", id)
1389 for {
1390 nodes, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
1391 if err != nil {
1392 return fmt.Errorf("GetNodes: %w", err)
1393 }
1394 found := false
1395 for {
1396 node, err := nodes.Recv()
1397 if errors.Is(err, io.EOF) {
1398 break
1399 }
1400 if err != nil {
1401 return fmt.Errorf("Nodes.Recv: %w", err)
1402 }
1403 if node.Id != id {
1404 continue
1405 }
1406 if node.State != cpb.NodeState_NODE_STATE_UP {
1407 continue
1408 }
1409 found = true
1410 break
1411 }
1412 nodes.CloseSend()
1413
1414 if found {
1415 break
1416 }
1417 time.Sleep(time.Second)
1418 }
1419 launch.Log("Cluster: %s: UP", id)
1420 return nil
1421}
1422
1423// MakeKubernetesWorker adds the KubernetesWorker role to a node by ID.
1424func (c *Cluster) MakeKubernetesWorker(ctx context.Context, id string) error {
1425 curC, err := c.CuratorClient()
1426 if err != nil {
1427 return err
1428 }
1429 mgmt := apb.NewManagementClient(curC)
1430
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001431 launch.Log("Cluster: %s: adding KubernetesWorker", id)
1432 _, err = mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
1433 Node: &apb.UpdateNodeRolesRequest_Id{
1434 Id: id,
1435 },
Jan Schärd1a8b642024-12-03 17:40:41 +01001436 KubernetesWorker: ptr.To(true),
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001437 })
1438 return err
1439}
Serge Bazanski37cfcc12024-03-21 11:59:07 +01001440
Jan Schära9b060b2024-08-07 10:42:29 +02001441// MakeKubernetesController adds the KubernetesController role to a node by ID.
1442func (c *Cluster) MakeKubernetesController(ctx context.Context, id string) error {
1443 curC, err := c.CuratorClient()
1444 if err != nil {
1445 return err
1446 }
1447 mgmt := apb.NewManagementClient(curC)
1448
Jan Schära9b060b2024-08-07 10:42:29 +02001449 launch.Log("Cluster: %s: adding KubernetesController", id)
1450 _, err = mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
1451 Node: &apb.UpdateNodeRolesRequest_Id{
1452 Id: id,
1453 },
Jan Schärd1a8b642024-12-03 17:40:41 +01001454 KubernetesController: ptr.To(true),
Jan Schära9b060b2024-08-07 10:42:29 +02001455 })
1456 return err
1457}
1458
Serge Bazanski37cfcc12024-03-21 11:59:07 +01001459// MakeConsensusMember adds the ConsensusMember role to a node by ID.
1460func (c *Cluster) MakeConsensusMember(ctx context.Context, id string) error {
1461 curC, err := c.CuratorClient()
1462 if err != nil {
1463 return err
1464 }
1465 mgmt := apb.NewManagementClient(curC)
1466 cur := ipb.NewCuratorClient(curC)
1467
Serge Bazanski37cfcc12024-03-21 11:59:07 +01001468 launch.Log("Cluster: %s: adding ConsensusMember", id)
1469 bo := backoff.NewExponentialBackOff()
1470 bo.MaxElapsedTime = 10 * time.Second
1471
1472 backoff.Retry(func() error {
1473 _, err = mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
1474 Node: &apb.UpdateNodeRolesRequest_Id{
1475 Id: id,
1476 },
Jan Schärd1a8b642024-12-03 17:40:41 +01001477 ConsensusMember: ptr.To(true),
Serge Bazanski37cfcc12024-03-21 11:59:07 +01001478 })
1479 if err != nil {
1480 launch.Log("Cluster: %s: UpdateNodeRoles failed: %v", id, err)
1481 }
1482 return err
1483 }, backoff.WithContext(bo, ctx))
1484 if err != nil {
1485 return err
1486 }
1487
1488 launch.Log("Cluster: %s: waiting for learner/full members...", id)
1489
1490 learner := false
1491 for {
1492 res, err := cur.GetConsensusStatus(ctx, &ipb.GetConsensusStatusRequest{})
1493 if err != nil {
1494 return fmt.Errorf("GetConsensusStatus: %w", err)
1495 }
1496 for _, member := range res.EtcdMember {
1497 if member.Id != id {
1498 continue
1499 }
1500 switch member.Status {
1501 case ipb.GetConsensusStatusResponse_EtcdMember_STATUS_LEARNER:
1502 if !learner {
1503 learner = true
1504 launch.Log("Cluster: %s: became a learner, waiting for full member...", id)
1505 }
1506 case ipb.GetConsensusStatusResponse_EtcdMember_STATUS_FULL:
1507 launch.Log("Cluster: %s: became a full member", id)
1508 return nil
1509 }
1510 }
1511 time.Sleep(100 * time.Millisecond)
1512 }
1513}