blob: 7f64427a867159e0a88f70e5550fe7cc4e623084 [file] [log] [blame]
Tim Windelschmidt6d33a432025-02-04 14:34:25 +01001// Copyright The Monogon Project Authors.
2// SPDX-License-Identifier: Apache-2.0
3
// Package launch implements launching Metropolis nodes and clusters in a
// virtualized environment using qemu. Note that launching nodes depends on a
// Metropolis node image being available.
Tim Windelschmidt9f21f532024-05-07 15:14:20 +02008package launch
Serge Bazanski66e58952021-10-05 17:06:56 +02009
10import (
11 "bytes"
12 "context"
Serge Bazanski1f8cad72023-03-20 16:58:10 +010013 "crypto/ed25519"
Serge Bazanski66e58952021-10-05 17:06:56 +020014 "crypto/rand"
15 "crypto/tls"
Serge Bazanski54e212a2023-06-14 13:45:11 +020016 "crypto/x509"
Serge Bazanskia0bc6d32023-06-28 18:57:40 +020017 "encoding/pem"
Serge Bazanski66e58952021-10-05 17:06:56 +020018 "errors"
19 "fmt"
20 "io"
Serge Bazanski66e58952021-10-05 17:06:56 +020021 "net"
Lorenz Brun150f24a2023-07-13 20:11:06 +020022 "net/http"
Serge Bazanski66e58952021-10-05 17:06:56 +020023 "os"
24 "os/exec"
Leopoldacfad5b2023-01-15 14:05:25 +010025 "path"
Serge Bazanski66e58952021-10-05 17:06:56 +020026 "path/filepath"
Serge Bazanski53458ba2024-06-18 09:56:46 +000027 "strconv"
Serge Bazanski630fb5c2023-04-06 10:50:24 +020028 "strings"
Serge Bazanski66e58952021-10-05 17:06:56 +020029 "syscall"
30 "time"
31
32 "github.com/cenkalti/backoff/v4"
Serge Bazanski66e58952021-10-05 17:06:56 +020033 "go.uber.org/multierr"
Serge Bazanskibe742842022-04-04 13:18:50 +020034 "golang.org/x/net/proxy"
Lorenz Brun87bbf7e2024-03-18 18:22:25 +010035 "golang.org/x/sys/unix"
Serge Bazanski66e58952021-10-05 17:06:56 +020036 "google.golang.org/grpc"
Serge Bazanski636032e2022-01-26 14:21:33 +010037 "google.golang.org/grpc/codes"
38 "google.golang.org/grpc/status"
Serge Bazanski66e58952021-10-05 17:06:56 +020039 "google.golang.org/protobuf/proto"
Serge Bazanskia0bc6d32023-06-28 18:57:40 +020040 "k8s.io/client-go/kubernetes"
41 "k8s.io/client-go/rest"
Jan Schärd1a8b642024-12-03 17:40:41 +010042 "k8s.io/utils/ptr"
Serge Bazanski66e58952021-10-05 17:06:56 +020043
Serge Bazanski37cfcc12024-03-21 11:59:07 +010044 ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
Tim Windelschmidtbe25a3b2023-07-19 16:31:56 +020045 apb "source.monogon.dev/metropolis/proto/api"
46 cpb "source.monogon.dev/metropolis/proto/common"
47
Serge Bazanskica8d9512024-09-12 14:20:57 +020048 "source.monogon.dev/go/logging"
Serge Bazanski1f8cad72023-03-20 16:58:10 +010049 metroctl "source.monogon.dev/metropolis/cli/metroctl/core"
Jan Schäre19d2792025-06-23 12:37:58 +000050 "source.monogon.dev/metropolis/installer/install"
Serge Bazanski66e58952021-10-05 17:06:56 +020051 "source.monogon.dev/metropolis/node"
52 "source.monogon.dev/metropolis/node/core/rpc"
Serge Bazanski5bb8a332022-06-23 17:41:33 +020053 "source.monogon.dev/metropolis/node/core/rpc/resolver"
Jan Schär3b0c8dd2025-06-23 10:32:07 +000054 "source.monogon.dev/osbase/blockdev"
Jan Schär3b0c8dd2025-06-23 10:32:07 +000055 "source.monogon.dev/osbase/oci"
Jan Schäre19d2792025-06-23 12:37:58 +000056 "source.monogon.dev/osbase/oci/osimage"
Jan Schär9d2f3c62025-04-14 11:17:22 +000057 "source.monogon.dev/osbase/oci/registry"
Jan Schär3b0c8dd2025-06-23 10:32:07 +000058 "source.monogon.dev/osbase/structfs"
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +010059 "source.monogon.dev/osbase/test/qemu"
Serge Bazanski66e58952021-10-05 17:06:56 +020060)
61
// NodeNumberKey is the key of the node label that carries a node's numerical
// index within the test system.
const NodeNumberKey string = "test-node-number"
67
// NodeOptions contains all options that can be passed to LaunchNode().
type NodeOptions struct {
	// Name is a human-readable identifier to be used in debug output.
	Name string

	// CPUs is the number of virtual CPUs of the VM. If zero, LaunchNode sets it
	// to 1.
	CPUs int

	// ThreadsPerCPU is the number of threads per CPU. This is multiplied by
	// CPUs to get the total number of threads. If zero, LaunchNode sets it to 1.
	ThreadsPerCPU int

	// MemoryMiB is the RAM size in MiB of the VM. If zero, LaunchNode sets it to
	// 2048.
	MemoryMiB int

	// DiskBytes contains the size of the root disk in bytes. If zero,
	// LaunchNode sets it to 5 GiB. It only takes effect when the node's disk
	// image is created, i.e. on the node's first launch (Runtime == nil).
	DiskBytes uint64

	// Ports contains the port mapping where to expose the internal ports of the VM to
	// the host. See IdentityPortMap() and ConflictFreePortMap(). Ignored when
	// ConnectToSocket is set.
	Ports qemu.PortMap

	// If set to true, reboots are honored. Otherwise, QEMU is started with
	// -no-reboot, so a reboot makes QEMU exit instead of restarting the VM.
	// Metropolis nodes generally restart on almost all errors, so unless you
	// want to test reboot behavior this should be false.
	AllowReboot bool

	// By default, the VM is connected to the Host via SLIRP. If ConnectToSocket is
	// set, it is instead connected to the given file descriptor/socket. If this is
	// set, all port maps from the Ports option are ignored. Intended for networking
	// this instance together with others for running more complex network
	// configurations.
	ConnectToSocket *os.File

	// When PcapDump is set, all traffic is dumped to a pcap file in the
	// runtime directory (e.g. "net0.pcap" for the first interface).
	PcapDump bool

	// SerialPort is an io.ReadWriter over which you can communicate with the serial
	// port of the machine. It can be set to an existing file descriptor (like
	// os.Stdout/os.Stderr) or any Go structure implementing this interface.
	// NOTE(review): LaunchNode currently only connects it as QEMU's stdout, so
	// serial output is delivered to it but nothing is read from it as input.
	SerialPort io.ReadWriter

	// NodeParameters is passed into the VM (serialized as a protobuf and exposed
	// via fw_cfg) and subsequently used for bootstrapping or registering into a
	// cluster.
	NodeParameters *apb.NodeParameters

	// Mac is the node's MAC address. If nil, LaunchNode generates a random,
	// locally administered one and stores it here.
	Mac *net.HardwareAddr

	// Runtime keeps the node's QEMU runtime state. If nil, LaunchNode creates it
	// on the first launch; it is reused on subsequent launches so that the node
	// keeps its disk, firmware variables and TPM state.
	Runtime *NodeRuntime

	// RunVNC starts a VNC socket for troubleshooting/testing console code. Note:
	// this will not work in tests, as those use a built-in qemu which does not
	// implement a VGA device.
	RunVNC bool
}
128
// NodeRuntime keeps the node's QEMU runtime state: its state and socket
// directories and the context QEMU runs in. LaunchNode creates it on a node's
// first launch and reuses it on subsequent launches of the same node.
type NodeRuntime struct {
	// ld points at the node's launch directory storing data such as storage
	// images, firmware variables or the TPM state.
	ld string
	// sd points at the node's socket directory.
	sd string

	// ctxT is the context QEMU will execute in. LaunchNode cancels the previous
	// one and installs a fresh one on every launch.
	ctxT context.Context
	// CtxC is the QEMU context's cancellation function. Calling it stops the
	// node's QEMU process and its TPM emulator, both of which run under ctxT.
	CtxC context.CancelFunc
}
142
143// NodePorts is the list of ports a fully operational Metropolis node listens on
Serge Bazanski52304a82021-10-29 16:56:18 +0200144var NodePorts = []node.Port{
Serge Bazanski66e58952021-10-05 17:06:56 +0200145 node.ConsensusPort,
146
147 node.CuratorServicePort,
148 node.DebugServicePort,
149
150 node.KubernetesAPIPort,
Lorenz Bruncc078df2021-12-23 11:51:55 +0100151 node.KubernetesAPIWrappedPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200152 node.CuratorServicePort,
153 node.DebuggerPort,
Tim Windelschmidtbe25a3b2023-07-19 16:31:56 +0200154 node.MetricsPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200155}
156
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200157// setupRuntime creates the node's QEMU runtime directory, together with all
158// files required to preserve its state, a level below the chosen path ld. The
159// node's socket directory is similarily created a level below sd. It may
160// return an I/O error.
Jan Schär07003572024-08-26 10:42:16 +0200161func setupRuntime(ld, sd string, diskBytes uint64) (*NodeRuntime, error) {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200162 // Create a temporary directory to keep all the runtime files.
163 stdp, err := os.MkdirTemp(ld, "node_state*")
164 if err != nil {
165 return nil, fmt.Errorf("failed to create the state directory: %w", err)
166 }
167
Jan Schär3b0c8dd2025-06-23 10:32:07 +0000168 // Initialize the node's storage.
169 ociImage, err := oci.ReadLayout(xNodeImagePath)
Jan Schär07003572024-08-26 10:42:16 +0200170 if err != nil {
Jan Schär3b0c8dd2025-06-23 10:32:07 +0000171 return nil, fmt.Errorf("failed to read OS image: %w", err)
Jan Schär07003572024-08-26 10:42:16 +0200172 }
Jan Schäre19d2792025-06-23 12:37:58 +0000173 osImage, err := osimage.Read(ociImage)
Jan Schär3b0c8dd2025-06-23 10:32:07 +0000174 if err != nil {
175 return nil, fmt.Errorf("failed to read OS image: %w", err)
176 }
Jan Schär07003572024-08-26 10:42:16 +0200177
Jan Schär3b0c8dd2025-06-23 10:32:07 +0000178 abloader, err := structfs.OSPathBlob(xAbloaderPath)
179 if err != nil {
180 return nil, fmt.Errorf("cannot open abloader: %w", err)
181 }
182
183 di := filepath.Join(stdp, "image.img")
184 logf("Cluster: generating node image: %s -> %s", xNodeImagePath, di)
185
186 df, err := blockdev.CreateFile(di, 512, int64(diskBytes/512))
Serge Bazanskidd5b03c2024-05-16 18:07:06 +0200187 if err != nil {
188 return nil, fmt.Errorf("while opening image for writing: %w", err)
189 }
190 defer df.Close()
Jan Schär3b0c8dd2025-06-23 10:32:07 +0000191
Jan Schäre19d2792025-06-23 12:37:58 +0000192 installParams := &install.Params{
193 PartitionSize: install.PartitionSizeInfo{
Jan Schär3b0c8dd2025-06-23 10:32:07 +0000194 ESP: 128,
195 System: 1024,
196 Data: 128,
197 },
Jan Schärdaf9e952025-06-23 13:28:16 +0000198 OSImage: osImage,
199 UnverifiedPayloads: true,
200 ABLoader: abloader,
201 Output: df,
Jan Schär3b0c8dd2025-06-23 10:32:07 +0000202 }
Jan Schäre19d2792025-06-23 12:37:58 +0000203
204 if _, err := install.Write(installParams); err != nil {
Jan Schär3b0c8dd2025-06-23 10:32:07 +0000205 return nil, fmt.Errorf("while creating node image: %w", err)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200206 }
207
208 // Initialize the OVMF firmware variables file.
Tim Windelschmidt82e6af72024-07-23 00:05:42 +0000209 dv := filepath.Join(stdp, filepath.Base(xOvmfVarsPath))
210 if err := copyFile(xOvmfVarsPath, dv); err != nil {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200211 return nil, fmt.Errorf("while copying firmware variables: %w", err)
212 }
213
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200214 // Create the socket directory.
215 sotdp, err := os.MkdirTemp(sd, "node_sock*")
216 if err != nil {
217 return nil, fmt.Errorf("failed to create the socket directory: %w", err)
218 }
219
220 return &NodeRuntime{
221 ld: stdp,
222 sd: sotdp,
223 }, nil
224}
225
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200226// CuratorClient returns an authenticated owner connection to a Curator
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200227// instance within Cluster c, or nil together with an error.
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200228func (c *Cluster) CuratorClient() (*grpc.ClientConn, error) {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200229 if c.authClient == nil {
Serge Bazanski8535cb52023-03-29 14:15:08 +0200230 authCreds := rpc.NewAuthenticatedCredentials(c.Owner, rpc.WantInsecure())
Serge Bazanskica8d9512024-09-12 14:20:57 +0200231 r := resolver.New(c.ctxT, resolver.WithLogger(logging.NewFunctionBackend(func(severity logging.Severity, msg string) {
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100232 logf("Cluster: client resolver: %s: %s", severity, msg)
Serge Bazanskica8d9512024-09-12 14:20:57 +0200233 })))
Tim Windelschmidt9bd9bd42025-02-14 17:08:52 +0100234 for _, n := range c.Nodes {
235 r.AddEndpoint(resolver.NodeAtAddressWithDefaultPort(n.ManagementAddress))
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200236 }
Tim Windelschmidt9bd9bd42025-02-14 17:08:52 +0100237 authClient, err := grpc.NewClient(resolver.MetropolisControlAddress,
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200238 grpc.WithTransportCredentials(authCreds),
239 grpc.WithResolvers(r),
240 grpc.WithContextDialer(c.DialNode),
241 )
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200242 if err != nil {
Tim Windelschmidt9bd9bd42025-02-14 17:08:52 +0100243 return nil, fmt.Errorf("creating client with owner credentials failed: %w", err)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200244 }
245 c.authClient = authClient
246 }
247 return c.authClient, nil
248}
249
Serge Bazanski66e58952021-10-05 17:06:56 +0200250// LaunchNode launches a single Metropolis node instance with the given options.
251// The instance runs mostly paravirtualized but with some emulated hardware
252// similar to how a cloud provider might set up its VMs. The disk is fully
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200253// writable, and the changes are kept across reboots and shutdowns. ld and sd
254// point to the launch directory and the socket directory, holding the nodes'
255// state files (storage, tpm state, firmware state), and UNIX socket files
256// (swtpm <-> QEMU interplay) respectively. The directories must exist before
257// LaunchNode is called. LaunchNode will update options.Runtime and options.Mac
258// if either are not initialized.
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200259func LaunchNode(ctx context.Context, ld, sd string, tpmFactory *TPMFactory, options *NodeOptions, doneC chan error) error {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200260 // TODO(mateusz@monogon.tech) try using QEMU's abstract socket namespace instead
261 // of /tmp (requires QEMU version >5.0).
Serge Bazanski66e58952021-10-05 17:06:56 +0200262 // https://github.com/qemu/qemu/commit/776b97d3605ed0fc94443048fdf988c7725e38a9).
263 // swtpm accepts already-open FDs so we can pass in an abstract socket namespace FD
264 // that we open and pass the name of it to QEMU. Not pinning this crashes both
265 // swtpm and qemu because we run into UNIX socket length limitations (for legacy
266 // reasons 108 chars).
Serge Bazanski66e58952021-10-05 17:06:56 +0200267
Jan Schära9b060b2024-08-07 10:42:29 +0200268 if options.CPUs == 0 {
269 options.CPUs = 1
270 }
271 if options.ThreadsPerCPU == 0 {
272 options.ThreadsPerCPU = 1
273 }
274 if options.MemoryMiB == 0 {
275 options.MemoryMiB = 2048
276 }
Jan Schär3b0c8dd2025-06-23 10:32:07 +0000277 if options.DiskBytes == 0 {
278 options.DiskBytes = 5 * 1024 * 1024 * 1024
279 }
Jan Schära9b060b2024-08-07 10:42:29 +0200280
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200281 // If it's the node's first start, set up its runtime directories.
282 if options.Runtime == nil {
Jan Schär07003572024-08-26 10:42:16 +0200283 r, err := setupRuntime(ld, sd, options.DiskBytes)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200284 if err != nil {
285 return fmt.Errorf("while setting up node runtime: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200286 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200287 options.Runtime = r
Serge Bazanski66e58952021-10-05 17:06:56 +0200288 }
289
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200290 // Replace the node's context with a new one.
291 r := options.Runtime
292 if r.CtxC != nil {
293 r.CtxC()
294 }
295 r.ctxT, r.CtxC = context.WithCancel(ctx)
296
Serge Bazanski66e58952021-10-05 17:06:56 +0200297 var qemuNetType string
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100298 var qemuNetConfig qemu.QemuValue
Serge Bazanski66e58952021-10-05 17:06:56 +0200299 if options.ConnectToSocket != nil {
300 qemuNetType = "socket"
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100301 qemuNetConfig = qemu.QemuValue{
Serge Bazanski66e58952021-10-05 17:06:56 +0200302 "id": {"net0"},
303 "fd": {"3"},
304 }
305 } else {
306 qemuNetType = "user"
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100307 qemuNetConfig = qemu.QemuValue{
Serge Bazanski66e58952021-10-05 17:06:56 +0200308 "id": {"net0"},
309 "net": {"10.42.0.0/24"},
310 "dhcpstart": {"10.42.0.10"},
311 "hostfwd": options.Ports.ToQemuForwards(),
312 }
313 }
314
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200315 // Generate the node's MAC address if it isn't already set in NodeOptions.
316 if options.Mac == nil {
317 mac, err := generateRandomEthernetMAC()
318 if err != nil {
319 return err
320 }
321 options.Mac = mac
Serge Bazanski66e58952021-10-05 17:06:56 +0200322 }
323
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200324 tpmSocketPath := filepath.Join(r.sd, "tpm-socket")
Tim Windelschmidt12240f92025-04-28 14:59:33 +0200325 fwVarPath := filepath.Join(r.ld, "VARS.fd")
Jan Schär3b0c8dd2025-06-23 10:32:07 +0000326 storagePath := filepath.Join(r.ld, "image.img")
Lorenz Brun150f24a2023-07-13 20:11:06 +0200327 qemuArgs := []string{
Jan Schära9b060b2024-08-07 10:42:29 +0200328 "-machine", "q35",
329 "-accel", "kvm",
Serge Bazanski62e6f0b2024-09-03 12:18:56 +0200330 "-display", "none",
Jan Schära9b060b2024-08-07 10:42:29 +0200331 "-nodefaults",
332 "-cpu", "host",
333 "-m", fmt.Sprintf("%dM", options.MemoryMiB),
334 "-smp", fmt.Sprintf("cores=%d,threads=%d", options.CPUs, options.ThreadsPerCPU),
Tim Windelschmidt82e6af72024-07-23 00:05:42 +0000335 "-drive", "if=pflash,format=raw,readonly=on,file=" + xOvmfCodePath,
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200336 "-drive", "if=pflash,format=raw,file=" + fwVarPath,
Jan Schär3b0c8dd2025-06-23 10:32:07 +0000337 "-drive", "if=virtio,format=raw,cache=unsafe,file=" + storagePath,
Serge Bazanski66e58952021-10-05 17:06:56 +0200338 "-netdev", qemuNetConfig.ToOption(qemuNetType),
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200339 "-device", "virtio-net-pci,netdev=net0,mac=" + options.Mac.String(),
Serge Bazanski66e58952021-10-05 17:06:56 +0200340 "-chardev", "socket,id=chrtpm,path=" + tpmSocketPath,
341 "-tpmdev", "emulator,id=tpm0,chardev=chrtpm",
342 "-device", "tpm-tis,tpmdev=tpm0",
343 "-device", "virtio-rng-pci",
Lorenz Brun150f24a2023-07-13 20:11:06 +0200344 "-serial", "stdio",
345 }
Serge Bazanski62e6f0b2024-09-03 12:18:56 +0200346 if options.RunVNC {
347 vncSocketPath := filepath.Join(r.sd, "vnc-socket")
348 qemuArgs = append(qemuArgs,
349 "-vnc", "unix:"+vncSocketPath,
350 "-device", "virtio-vga",
351 )
352 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200353
354 if !options.AllowReboot {
355 qemuArgs = append(qemuArgs, "-no-reboot")
356 }
357
358 if options.NodeParameters != nil {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200359 parametersPath := filepath.Join(r.ld, "parameters.pb")
Serge Bazanski66e58952021-10-05 17:06:56 +0200360 parametersRaw, err := proto.Marshal(options.NodeParameters)
361 if err != nil {
362 return fmt.Errorf("failed to encode node paraeters: %w", err)
363 }
Lorenz Brun150f24a2023-07-13 20:11:06 +0200364 if err := os.WriteFile(parametersPath, parametersRaw, 0o644); err != nil {
Serge Bazanski66e58952021-10-05 17:06:56 +0200365 return fmt.Errorf("failed to write node parameters: %w", err)
366 }
367 qemuArgs = append(qemuArgs, "-fw_cfg", "name=dev.monogon.metropolis/parameters.pb,file="+parametersPath)
368 }
369
Leopoldacfad5b2023-01-15 14:05:25 +0100370 if options.PcapDump {
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100371 qemuNetDump := qemu.QemuValue{
Tim Windelschmidta7a82f32024-04-11 01:40:25 +0200372 "id": {"net0"},
373 "netdev": {"net0"},
374 "file": {filepath.Join(r.ld, "net0.pcap")},
Leopoldacfad5b2023-01-15 14:05:25 +0100375 }
376 qemuArgs = append(qemuArgs, "-object", qemuNetDump.ToOption("filter-dump"))
377 }
378
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200379 // Manufacture TPM if needed.
380 tpmd := filepath.Join(r.ld, "tpm")
Tim Windelschmidt82e6af72024-07-23 00:05:42 +0000381 err := tpmFactory.Manufacture(ctx, tpmd, &TPMPlatform{
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200382 Manufacturer: "Monogon",
383 Version: "1.0",
384 Model: "TestCluster",
385 })
386 if err != nil {
387 return fmt.Errorf("could not manufacture TPM: %w", err)
388 }
389
Serge Bazanski66e58952021-10-05 17:06:56 +0200390 // Start TPM emulator as a subprocess
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200391 tpmCtx, tpmCancel := context.WithCancel(options.Runtime.ctxT)
Serge Bazanski66e58952021-10-05 17:06:56 +0200392
Tim Windelschmidt82e6af72024-07-23 00:05:42 +0000393 tpmEmuCmd := exec.CommandContext(tpmCtx, xSwtpmPath, "socket", "--tpm2", "--tpmstate", "dir="+tpmd, "--ctrl", "type=unixio,path="+tpmSocketPath)
Serge Bazanskib07c57a2024-06-04 14:33:27 +0000394 // Silence warnings from unsafe libtpms build (uses non-constant-time
395 // cryptographic operations).
396 tpmEmuCmd.Env = append(tpmEmuCmd.Env, "MONOGON_LIBTPMS_ACKNOWLEDGE_UNSAFE=yes")
Serge Bazanski66e58952021-10-05 17:06:56 +0200397 tpmEmuCmd.Stderr = os.Stderr
398 tpmEmuCmd.Stdout = os.Stdout
399
Tim Windelschmidt244b5672024-02-06 10:18:56 +0100400 err = tpmEmuCmd.Start()
Serge Bazanski66e58952021-10-05 17:06:56 +0200401 if err != nil {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200402 tpmCancel()
Serge Bazanski66e58952021-10-05 17:06:56 +0200403 return fmt.Errorf("failed to start TPM emulator: %w", err)
404 }
405
Mateusz Zalegae90f4a12022-05-25 18:24:01 +0200406 // Wait for the socket to be created by the TPM emulator before launching
407 // QEMU.
408 for {
409 _, err := os.Stat(tpmSocketPath)
410 if err == nil {
411 break
412 }
Tim Windelschmidta7a82f32024-04-11 01:40:25 +0200413 if !os.IsNotExist(err) {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200414 tpmCancel()
Mateusz Zalegae90f4a12022-05-25 18:24:01 +0200415 return fmt.Errorf("while stat-ing TPM socket path: %w", err)
416 }
417 if err := tpmCtx.Err(); err != nil {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200418 tpmCancel()
Mateusz Zalegae90f4a12022-05-25 18:24:01 +0200419 return fmt.Errorf("while waiting for the TPM socket: %w", err)
420 }
421 time.Sleep(time.Millisecond * 100)
422 }
423
Serge Bazanski66e58952021-10-05 17:06:56 +0200424 // Start the main qemu binary
Tim Windelschmidt8f1efe92025-04-01 01:28:43 +0200425 systemCmd := exec.CommandContext(options.Runtime.ctxT, xQEMUPath, qemuArgs...)
Serge Bazanski66e58952021-10-05 17:06:56 +0200426 if options.ConnectToSocket != nil {
427 systemCmd.ExtraFiles = []*os.File{options.ConnectToSocket}
428 }
429
430 var stdErrBuf bytes.Buffer
431 systemCmd.Stderr = &stdErrBuf
432 systemCmd.Stdout = options.SerialPort
433
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100434 qemu.PrettyPrintQemuArgs(options.Name, systemCmd.Args)
Leopoldaf5086b2023-01-15 14:12:42 +0100435
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200436 go func() {
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100437 logf("Node: Starting...")
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200438 err = systemCmd.Run()
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100439 logf("Node: Returned: %v", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200440
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200441 // Stop TPM emulator and wait for it to exit to properly reap the child process
442 tpmCancel()
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100443 logf("Node: Waiting for TPM emulator to exit")
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200444 // Wait returns a SIGKILL error because we just cancelled its context.
445 // We still need to call it to avoid creating zombies.
446 errTpm := tpmEmuCmd.Wait()
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100447 logf("Node: TPM emulator done: %v", errTpm)
Serge Bazanski66e58952021-10-05 17:06:56 +0200448
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200449 var exerr *exec.ExitError
450 if err != nil && errors.As(err, &exerr) {
451 status := exerr.ProcessState.Sys().(syscall.WaitStatus)
452 if status.Signaled() && status.Signal() == syscall.SIGKILL {
453 // Process was killed externally (most likely by our context being canceled).
454 // This is a normal exit for us, so return nil
455 doneC <- nil
456 return
457 }
458 exerr.Stderr = stdErrBuf.Bytes()
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100459 newErr := qemu.QEMUError(*exerr)
460 logf("Node: %q", stdErrBuf.String())
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200461 doneC <- &newErr
462 return
Serge Bazanski66e58952021-10-05 17:06:56 +0200463 }
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200464 doneC <- err
465 }()
466 return nil
Serge Bazanski66e58952021-10-05 17:06:56 +0200467}
468
469func copyFile(src, dst string) error {
470 in, err := os.Open(src)
471 if err != nil {
472 return fmt.Errorf("when opening source: %w", err)
473 }
474 defer in.Close()
475
476 out, err := os.Create(dst)
477 if err != nil {
478 return fmt.Errorf("when creating destination: %w", err)
479 }
480 defer out.Close()
481
Lorenz Brun87bbf7e2024-03-18 18:22:25 +0100482 endPos, err := in.Seek(0, io.SeekEnd)
Serge Bazanski66e58952021-10-05 17:06:56 +0200483 if err != nil {
Lorenz Brun87bbf7e2024-03-18 18:22:25 +0100484 return fmt.Errorf("when getting source end: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200485 }
Lorenz Brun87bbf7e2024-03-18 18:22:25 +0100486
487 // Copy the file while preserving its sparseness. The image files are very
488 // sparse (less than 10% allocated), so this is a lot faster.
489 var lastHoleStart int64
490 for {
491 dataStart, err := in.Seek(lastHoleStart, unix.SEEK_DATA)
492 if err != nil {
493 return fmt.Errorf("when seeking to next data block: %w", err)
494 }
495 holeStart, err := in.Seek(dataStart, unix.SEEK_HOLE)
496 if err != nil {
497 return fmt.Errorf("when seeking to next hole: %w", err)
498 }
499 lastHoleStart = holeStart
500 if _, err := in.Seek(dataStart, io.SeekStart); err != nil {
501 return fmt.Errorf("when seeking to current data block: %w", err)
502 }
503 if _, err := out.Seek(dataStart, io.SeekStart); err != nil {
504 return fmt.Errorf("when seeking output to next data block: %w", err)
505 }
506 if _, err := io.CopyN(out, in, holeStart-dataStart); err != nil {
507 return fmt.Errorf("when copying file: %w", err)
508 }
509 if endPos == holeStart {
510 // The next hole is at the end of the file, we're done here.
511 break
512 }
513 }
514
Serge Bazanski66e58952021-10-05 17:06:56 +0200515 return out.Close()
516}
517
Serge Bazanskie78a0892021-10-07 17:03:49 +0200518// getNodes wraps around Management.GetNodes to return a list of nodes in a
519// cluster.
520func getNodes(ctx context.Context, mgmt apb.ManagementClient) ([]*apb.Node, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200521 var res []*apb.Node
Serge Bazanski636032e2022-01-26 14:21:33 +0100522 bo := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
Serge Bazanski075465c2021-11-16 15:38:49 +0100523 err := backoff.Retry(func() error {
524 res = nil
525 srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
Serge Bazanskie78a0892021-10-07 17:03:49 +0200526 if err != nil {
Serge Bazanski075465c2021-11-16 15:38:49 +0100527 return fmt.Errorf("GetNodes: %w", err)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200528 }
Serge Bazanski075465c2021-11-16 15:38:49 +0100529 for {
530 node, err := srvN.Recv()
531 if err == io.EOF {
532 break
533 }
534 if err != nil {
535 return fmt.Errorf("GetNodes.Recv: %w", err)
536 }
537 res = append(res, node)
538 }
539 return nil
540 }, bo)
541 if err != nil {
542 return nil, err
Serge Bazanskie78a0892021-10-07 17:03:49 +0200543 }
544 return res, nil
545}
546
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200547// getNode wraps Management.GetNodes. It returns node information matching
548// given node ID.
549func getNode(ctx context.Context, mgmt apb.ManagementClient, id string) (*apb.Node, error) {
550 nodes, err := getNodes(ctx, mgmt)
551 if err != nil {
552 return nil, fmt.Errorf("could not get nodes: %w", err)
553 }
554 for _, n := range nodes {
Jan Schär39d9c242024-09-24 13:49:55 +0200555 if n.Id == id {
556 return n, nil
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200557 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200558 }
Tim Windelschmidt73e98822024-04-18 23:13:49 +0200559 return nil, fmt.Errorf("no such node")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200560}
561
// generateRandomEthernetMAC returns a random EUI-48 Ethernet MAC address that
// is marked as locally administered and individual (unicast).
func generateRandomEthernetMAC() (*net.HardwareAddr, error) {
	mac := make(net.HardwareAddr, 6)
	if _, err := rand.Read(mac); err != nil {
		return nil, fmt.Errorf("failed to read randomness for MAC: %w", err)
	}

	// Ref IEEE 802-2014 Section 8.2.2: set the U/L bit (locally administered)
	// and clear the I/G bit (individual address).
	mac[0] |= 0x02
	mac[0] &^= 0x01
	return &mac, nil
}
576
// SOCKSPort is the port of the SOCKS proxy, run by nanoswitch, that gives
// access to the switch network.
const SOCKSPort uint16 = 1080
Serge Bazanski66e58952021-10-05 17:06:56 +0200578
// ClusterPorts contains all ports handled by Nanoswitch. They are reachable
// from the host through Cluster.Ports.
var ClusterPorts = []uint16{
	// Forwarded to the first node.
	uint16(node.CuratorServicePort),
	uint16(node.DebugServicePort),
	uint16(node.KubernetesAPIPort),
	uint16(node.KubernetesAPIWrappedPort),

	// SOCKS proxy to the switch network
	SOCKSPort,
}
590
// ClusterOptions contains all options for launching a Metropolis cluster.
type ClusterOptions struct {
	// The number of nodes this cluster should be started with.
	NumNodes int

	// Node contains the default options for all nodes of the cluster.
	Node NodeOptions

	// If true, node logs will be saved to individual files instead of being printed
	// out to stderr. The paths of these files will still be printed to stdout.
	//
	// The files will be located within the launch directory inside TEST_TMPDIR (or
	// the default tempdir location, if not set).
	NodeLogsToFiles bool

	// LeaveNodesNew, if set, will leave all non-bootstrap nodes in NEW, without
	// bootstrapping them. The nodes' address information in Cluster.Nodes will be
	// incomplete.
	LeaveNodesNew bool

	// Optional local registry which will be made available to the cluster to
	// pull images from. This is a more efficient alternative to preseeding all
	// images used for testing.
	LocalRegistry *registry.Server

	// InitialClusterConfiguration will be passed to the first node when creating the
	// cluster, and defines some basic properties of the cluster. If not specified,
	// the cluster will use the defaults defined in
	// metropolis.proto.api.NodeParameters.
	InitialClusterConfiguration *cpb.ClusterConfiguration
}
622
623// Cluster is the running Metropolis cluster launched using the LaunchCluster
624// function.
625type Cluster struct {
Serge Bazanski66e58952021-10-05 17:06:56 +0200626 // Owner is the TLS Certificate of the owner of the test cluster. This can be
627 // used to authenticate further clients to the running cluster.
628 Owner tls.Certificate
629 // Ports is the PortMap used to access the first nodes' services (defined in
Serge Bazanskibe742842022-04-04 13:18:50 +0200630 // ClusterPorts) and the SOCKS proxy (at SOCKSPort).
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100631 Ports qemu.PortMap
Serge Bazanski66e58952021-10-05 17:06:56 +0200632
Serge Bazanskibe742842022-04-04 13:18:50 +0200633 // Nodes is a map from Node ID to its runtime information.
634 Nodes map[string]*NodeInCluster
635 // NodeIDs is a list of node IDs that are backing this cluster, in order of
636 // creation.
637 NodeIDs []string
638
Serge Bazanski54e212a2023-06-14 13:45:11 +0200639 // CACertificate is the cluster's CA certificate.
640 CACertificate *x509.Certificate
641
Serge Bazanski66e58952021-10-05 17:06:56 +0200642 // nodesDone is a list of channels populated with the return codes from all the
643 // nodes' qemu instances. It's used by Close to ensure all nodes have
Leopold20a036e2023-01-15 00:17:19 +0100644 // successfully been stopped.
Serge Bazanski66e58952021-10-05 17:06:56 +0200645 nodesDone []chan error
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200646 // nodeOpts are the cluster member nodes' mutable launch options, kept here
647 // to facilitate reboots.
648 nodeOpts []NodeOptions
649 // launchDir points at the directory keeping the nodes' state, such as storage
650 // images, firmware variable files, TPM state.
651 launchDir string
652 // socketDir points at the directory keeping UNIX socket files, such as these
653 // used to facilitate communication between QEMU and swtpm. It's different
654 // from launchDir, and anchored nearer the file system root, due to the
655 // socket path length limitation imposed by the kernel.
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100656 socketDir string
657 metroctlDir string
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200658
Lorenz Brun276a7462023-07-12 21:28:54 +0200659 // SOCKSDialer is used by DialNode to establish connections to nodes via the
Serge Bazanskibe742842022-04-04 13:18:50 +0200660 // SOCKS server ran by nanoswitch.
Lorenz Brun276a7462023-07-12 21:28:54 +0200661 SOCKSDialer proxy.Dialer
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200662
663 // authClient is a cached authenticated owner connection to a Curator
664 // instance within the cluster.
665 authClient *grpc.ClientConn
666
667 // ctxT is the context individual node contexts are created from.
668 ctxT context.Context
669 // ctxC is used by Close to cancel the context under which the nodes are
670 // running.
671 ctxC context.CancelFunc
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200672
673 tpmFactory *TPMFactory
Serge Bazanskibe742842022-04-04 13:18:50 +0200674}
675
// NodeInCluster describes a single node that is a member of a Cluster.
type NodeInCluster struct {
	// ID identifies the node. It can be passed to NewNodeClient to dial the
	// node's services.
	ID string
	// Pubkey is the node's public key, as reported by the cluster management
	// API.
	Pubkey []byte
	// ManagementAddress is the node's address on the network run by
	// nanoswitch. The host cannot reach it directly; go through NewNodeClient
	// or the nanoswitch SOCKS proxy (Cluster.Ports[SOCKSPort]) instead.
	ManagementAddress string
}
687
688// firstConnection performs the initial owner credential escrow with a newly
689// started nanoswitch-backed cluster over SOCKS. It expects the first node to be
690// running at 10.1.0.2, which is always the case with the current nanoswitch
691// implementation.
692//
Leopold20a036e2023-01-15 00:17:19 +0100693// It returns the newly escrowed credentials as well as the first node's
Serge Bazanskibe742842022-04-04 13:18:50 +0200694// information as NodeInCluster.
695func firstConnection(ctx context.Context, socksDialer proxy.Dialer) (*tls.Certificate, *NodeInCluster, error) {
696 // Dial external service.
697 remote := fmt.Sprintf("10.1.0.2:%s", node.CuratorServicePort.PortString())
Serge Bazanski0c280152024-02-05 14:33:19 +0100698 initCreds, err := rpc.NewEphemeralCredentials(InsecurePrivateKey, rpc.WantInsecure())
Serge Bazanskibe742842022-04-04 13:18:50 +0200699 if err != nil {
700 return nil, nil, fmt.Errorf("NewEphemeralCredentials: %w", err)
701 }
702 initDialer := func(_ context.Context, addr string) (net.Conn, error) {
703 return socksDialer.Dial("tcp", addr)
704 }
Tim Windelschmidt9bd9bd42025-02-14 17:08:52 +0100705 initClient, err := grpc.NewClient(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(initCreds))
Serge Bazanskibe742842022-04-04 13:18:50 +0200706 if err != nil {
Tim Windelschmidt9bd9bd42025-02-14 17:08:52 +0100707 return nil, nil, fmt.Errorf("creating client with ephemeral credentials failed: %w", err)
Serge Bazanskibe742842022-04-04 13:18:50 +0200708 }
709 defer initClient.Close()
710
711 // Retrieve owner certificate - this can take a while because the node is still
712 // coming up, so do it in a backoff loop.
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100713 logf("Cluster: retrieving owner certificate (this can take a few seconds while the first node boots)...")
Serge Bazanskibe742842022-04-04 13:18:50 +0200714 aaa := apb.NewAAAClient(initClient)
715 var cert *tls.Certificate
716 err = backoff.Retry(func() error {
717 cert, err = rpc.RetrieveOwnerCertificate(ctx, aaa, InsecurePrivateKey)
718 if st, ok := status.FromError(err); ok {
719 if st.Code() == codes.Unavailable {
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100720 logf("Cluster: cluster UNAVAILABLE: %v", st.Message())
Serge Bazanskibe742842022-04-04 13:18:50 +0200721 return err
722 }
723 }
724 return backoff.Permanent(err)
Serge Bazanski62e6f0b2024-09-03 12:18:56 +0200725 }, backoff.WithContext(backoff.NewExponentialBackOff(backoff.WithMaxElapsedTime(time.Minute)), ctx))
Serge Bazanskibe742842022-04-04 13:18:50 +0200726 if err != nil {
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200727 return nil, nil, fmt.Errorf("couldn't retrieve owner certificate: %w", err)
Serge Bazanskibe742842022-04-04 13:18:50 +0200728 }
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100729 logf("Cluster: retrieved owner certificate.")
Serge Bazanskibe742842022-04-04 13:18:50 +0200730
731 // Now connect authenticated and get the node ID.
Serge Bazanski8535cb52023-03-29 14:15:08 +0200732 creds := rpc.NewAuthenticatedCredentials(*cert, rpc.WantInsecure())
Tim Windelschmidt9bd9bd42025-02-14 17:08:52 +0100733 authClient, err := grpc.NewClient(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(creds))
Serge Bazanskibe742842022-04-04 13:18:50 +0200734 if err != nil {
Tim Windelschmidt9bd9bd42025-02-14 17:08:52 +0100735 return nil, nil, fmt.Errorf("creating client with owner credentials failed: %w", err)
Serge Bazanskibe742842022-04-04 13:18:50 +0200736 }
737 defer authClient.Close()
738 mgmt := apb.NewManagementClient(authClient)
739
740 var node *NodeInCluster
741 err = backoff.Retry(func() error {
742 nodes, err := getNodes(ctx, mgmt)
743 if err != nil {
744 return fmt.Errorf("retrieving nodes failed: %w", err)
745 }
746 if len(nodes) != 1 {
747 return fmt.Errorf("expected one node, got %d", len(nodes))
748 }
749 n := nodes[0]
750 if n.Status == nil || n.Status.ExternalAddress == "" {
751 return fmt.Errorf("node has no status and/or address")
752 }
753 node = &NodeInCluster{
Jan Schär39d9c242024-09-24 13:49:55 +0200754 ID: n.Id,
Serge Bazanskibe742842022-04-04 13:18:50 +0200755 ManagementAddress: n.Status.ExternalAddress,
756 }
757 return nil
758 }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
759 if err != nil {
760 return nil, nil, err
761 }
762
763 return cert, node, nil
Serge Bazanski66e58952021-10-05 17:06:56 +0200764}
765
// NewSerialFileLogger opens the file at p for use as a VM serial port log,
// creating it with mode 0600 if it does not exist yet.
//
// Output is appended to whatever the file already contains. Reusing a path
// (e.g. several clusters launched within one test run writing into the same
// output directory) therefore keeps all output intact instead of overwriting
// the start of the file and leaving stale bytes behind.
//
// The file is opened write-only, so reads from the returned io.ReadWriter
// fail.
func NewSerialFileLogger(p string) (io.ReadWriter, error) {
	f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0o600)
	if err != nil {
		// Return a literal nil: a nil *os.File wrapped in io.ReadWriter would
		// compare as non-nil to callers.
		return nil, err
	}
	return f, nil
}
773
Serge Bazanski66e58952021-10-05 17:06:56 +0200774// LaunchCluster launches a cluster of Metropolis node VMs together with a
775// Nanoswitch instance to network them all together.
776//
777// The given context will be used to run all qemu instances in the cluster, and
778// canceling the context or calling Close() will terminate them.
779func LaunchCluster(ctx context.Context, opts ClusterOptions) (*Cluster, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200780 if opts.NumNodes <= 0 {
Serge Bazanski66e58952021-10-05 17:06:56 +0200781 return nil, errors.New("refusing to start cluster with zero nodes")
782 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200783
Jan Schära9b060b2024-08-07 10:42:29 +0200784 // Prepare the node options. These will be kept as part of Cluster.
785 // nodeOpts[].Runtime will be initialized by LaunchNode during the first
786 // launch. The runtime information can be later used to restart a node.
787 // The 0th node will be initialized first. The rest will follow after it
788 // had bootstrapped the cluster.
789 nodeOpts := make([]NodeOptions, opts.NumNodes)
790 for i := range opts.NumNodes {
791 nodeOpts[i] = opts.Node
792 nodeOpts[i].Name = fmt.Sprintf("node%d", i)
793 nodeOpts[i].SerialPort = newPrefixedStdio(i)
Tim Windelschmidtbd57c1d2025-07-16 15:57:55 +0200794 nodeOpts[i].RunVNC = true
Jan Schära9b060b2024-08-07 10:42:29 +0200795 }
796 nodeOpts[0].NodeParameters = &apb.NodeParameters{
797 Cluster: &apb.NodeParameters_ClusterBootstrap_{
798 ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
799 OwnerPublicKey: InsecurePublicKey,
800 InitialClusterConfiguration: opts.InitialClusterConfiguration,
801 Labels: &cpb.NodeLabels{
802 Pairs: []*cpb.NodeLabels_Pair{
Serge Bazanski20498dd2024-09-30 17:07:08 +0000803 {Key: NodeNumberKey, Value: "0"},
Jan Schära9b060b2024-08-07 10:42:29 +0200804 },
805 },
806 },
807 },
808 }
809 nodeOpts[0].PcapDump = true
810
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200811 // Create the launch directory.
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100812 ld, err := os.MkdirTemp(os.Getenv("TEST_TMPDIR"), "cluster-*")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200813 if err != nil {
814 return nil, fmt.Errorf("failed to create the launch directory: %w", err)
815 }
Tim Windelschmidt1fe3f942025-04-01 14:22:11 +0200816
817 nodeLogDir := ld
818 if os.Getenv("TEST_UNDECLARED_OUTPUTS_DIR") != "" {
819 nodeLogDir = os.Getenv("TEST_UNDECLARED_OUTPUTS_DIR")
820 }
821
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100822 // Create the metroctl config directory. We keep it in /tmp because in some
823 // scenarios it's end-user visible and we want it short.
824 md, err := os.MkdirTemp("/tmp", "metroctl-*")
825 if err != nil {
826 return nil, fmt.Errorf("failed to create the metroctl directory: %w", err)
827 }
828
829 // Create the socket directory. We keep it in /tmp because of socket path limits.
830 sd, err := os.MkdirTemp("/tmp", "cluster-*")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200831 if err != nil {
832 return nil, fmt.Errorf("failed to create the socket directory: %w", err)
833 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200834
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200835 // Set up TPM factory.
836 tpmf, err := NewTPMFactory(filepath.Join(ld, "tpm"))
837 if err != nil {
838 return nil, fmt.Errorf("failed to create TPM factory: %w", err)
839 }
840
Serge Bazanski66e58952021-10-05 17:06:56 +0200841 // Prepare links between nodes and nanoswitch.
842 var switchPorts []*os.File
Jan Schära9b060b2024-08-07 10:42:29 +0200843 for i := range opts.NumNodes {
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100844 switchPort, vmPort, err := qemu.NewSocketPair()
Serge Bazanski66e58952021-10-05 17:06:56 +0200845 if err != nil {
Serge Bazanski66e58952021-10-05 17:06:56 +0200846 return nil, fmt.Errorf("failed to get socketpair: %w", err)
847 }
848 switchPorts = append(switchPorts, switchPort)
Jan Schära9b060b2024-08-07 10:42:29 +0200849 nodeOpts[i].ConnectToSocket = vmPort
Serge Bazanski66e58952021-10-05 17:06:56 +0200850 }
851
Serge Bazanskie78a0892021-10-07 17:03:49 +0200852 // Make a list of channels that will be populated by all running node qemu
853 // processes.
Serge Bazanski66e58952021-10-05 17:06:56 +0200854 done := make([]chan error, opts.NumNodes)
Lorenz Brun150f24a2023-07-13 20:11:06 +0200855 for i := range done {
Serge Bazanski66e58952021-10-05 17:06:56 +0200856 done[i] = make(chan error, 1)
857 }
858
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100859 if opts.NodeLogsToFiles {
Jan Schära9b060b2024-08-07 10:42:29 +0200860 for i := range opts.NumNodes {
Lorenz Brun4beaf4f2025-01-14 16:10:55 +0100861 path := path.Join(nodeLogDir, fmt.Sprintf("node-%d.txt", i))
Jan Schära9b060b2024-08-07 10:42:29 +0200862 port, err := NewSerialFileLogger(path)
863 if err != nil {
864 return nil, fmt.Errorf("could not open log file for node %d: %w", i, err)
865 }
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100866 logf("Node %d logs at %s", i, path)
Jan Schära9b060b2024-08-07 10:42:29 +0200867 nodeOpts[i].SerialPort = port
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100868 }
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100869 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200870
871 // Start the first node.
872 ctxT, ctxC := context.WithCancel(ctx)
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100873 logf("Cluster: Starting node %d...", 0)
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200874 if err := LaunchNode(ctxT, ld, sd, tpmf, &nodeOpts[0], done[0]); err != nil {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200875 ctxC()
876 return nil, fmt.Errorf("failed to launch first node: %w", err)
877 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200878
Lorenz Brun150f24a2023-07-13 20:11:06 +0200879 localRegistryAddr := net.TCPAddr{
880 IP: net.IPv4(10, 42, 0, 82),
881 Port: 5000,
882 }
883
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100884 var guestSvcMap qemu.GuestServiceMap
Lorenz Brun150f24a2023-07-13 20:11:06 +0200885 if opts.LocalRegistry != nil {
886 l, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1)})
887 if err != nil {
888 ctxC()
889 return nil, fmt.Errorf("failed to create TCP listener for local registry: %w", err)
890 }
891 s := http.Server{
892 Handler: opts.LocalRegistry,
893 }
894 go s.Serve(l)
895 go func() {
896 <-ctxT.Done()
897 s.Close()
898 }()
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100899 guestSvcMap = qemu.GuestServiceMap{
Lorenz Brun150f24a2023-07-13 20:11:06 +0200900 &localRegistryAddr: *l.Addr().(*net.TCPAddr),
901 }
902 }
903
Serge Bazanskie78a0892021-10-07 17:03:49 +0200904 // Launch nanoswitch.
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100905 portMap, err := qemu.ConflictFreePortMap(ClusterPorts)
Serge Bazanski66e58952021-10-05 17:06:56 +0200906 if err != nil {
907 ctxC()
908 return nil, fmt.Errorf("failed to allocate ephemeral ports: %w", err)
909 }
910
911 go func() {
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100912 var serialPort io.ReadWriter
Tim Windelschmidta5b00bd2024-12-09 22:52:31 +0100913 var err error
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100914 if opts.NodeLogsToFiles {
Tim Windelschmidt1fe3f942025-04-01 14:22:11 +0200915
916 loggerPath := path.Join(nodeLogDir, "nanoswitch.txt")
Tim Windelschmidta5b00bd2024-12-09 22:52:31 +0100917 serialPort, err = NewSerialFileLogger(loggerPath)
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100918 if err != nil {
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100919 logf("Could not open log file for nanoswitch: %v", err)
920 os.Exit(1)
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100921 }
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100922 logf("Nanoswitch logs at %s", loggerPath)
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100923 } else {
924 serialPort = newPrefixedStdio(99)
925 }
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100926 if err := qemu.RunMicroVM(ctxT, &qemu.MicroVMOptions{
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100927 Name: "nanoswitch",
Tim Windelschmidt82e6af72024-07-23 00:05:42 +0000928 KernelPath: xKernelPath,
929 InitramfsPath: xInitramfsPath,
Serge Bazanski66e58952021-10-05 17:06:56 +0200930 ExtraNetworkInterfaces: switchPorts,
931 PortMap: portMap,
Lorenz Brun150f24a2023-07-13 20:11:06 +0200932 GuestServiceMap: guestSvcMap,
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100933 SerialPort: serialPort,
Tim Windelschmidt1fe3f942025-04-01 14:22:11 +0200934 PcapDump: path.Join(nodeLogDir, "nanoswitch.pcap"),
Serge Bazanski66e58952021-10-05 17:06:56 +0200935 }); err != nil {
936 if !errors.Is(err, ctxT.Err()) {
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100937 logf("Failed to launch nanoswitch: %v", err)
938 os.Exit(1)
Serge Bazanski66e58952021-10-05 17:06:56 +0200939 }
940 }
941 }()
942
Serge Bazanskibe742842022-04-04 13:18:50 +0200943 // Build SOCKS dialer.
944 socksRemote := fmt.Sprintf("localhost:%v", portMap[SOCKSPort])
945 socksDialer, err := proxy.SOCKS5("tcp", socksRemote, nil, proxy.Direct)
Serge Bazanski66e58952021-10-05 17:06:56 +0200946 if err != nil {
947 ctxC()
Serge Bazanskibe742842022-04-04 13:18:50 +0200948 return nil, fmt.Errorf("failed to build SOCKS dialer: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200949 }
950
Serge Bazanskibe742842022-04-04 13:18:50 +0200951 // Retrieve owner credentials and first node.
952 cert, firstNode, err := firstConnection(ctxT, socksDialer)
Serge Bazanski66e58952021-10-05 17:06:56 +0200953 if err != nil {
954 ctxC()
955 return nil, err
956 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200957
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100958 // Write credentials to the metroctl directory.
959 if err := metroctl.WriteOwnerKey(md, cert.PrivateKey.(ed25519.PrivateKey)); err != nil {
960 ctxC()
961 return nil, fmt.Errorf("could not write owner key: %w", err)
962 }
963 if err := metroctl.WriteOwnerCertificate(md, cert.Certificate[0]); err != nil {
964 ctxC()
965 return nil, fmt.Errorf("could not write owner certificate: %w", err)
966 }
967
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100968 logf("Cluster: Node %d is %s", 0, firstNode.ID)
Serge Bazanski53458ba2024-06-18 09:56:46 +0000969
970 // Set up a partially initialized cluster instance, to be filled in the
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200971 // later steps.
Serge Bazanskibe742842022-04-04 13:18:50 +0200972 cluster := &Cluster{
973 Owner: *cert,
974 Ports: portMap,
975 Nodes: map[string]*NodeInCluster{
976 firstNode.ID: firstNode,
977 },
978 NodeIDs: []string{
979 firstNode.ID,
980 },
981
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100982 nodesDone: done,
983 nodeOpts: nodeOpts,
984 launchDir: ld,
985 socketDir: sd,
986 metroctlDir: md,
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200987
Lorenz Brun276a7462023-07-12 21:28:54 +0200988 SOCKSDialer: socksDialer,
Serge Bazanskibe742842022-04-04 13:18:50 +0200989
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200990 ctxT: ctxT,
Serge Bazanskibe742842022-04-04 13:18:50 +0200991 ctxC: ctxC,
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200992
993 tpmFactory: tpmf,
Serge Bazanskibe742842022-04-04 13:18:50 +0200994 }
995
996 // Now start the rest of the nodes and register them into the cluster.
997
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200998 // Get an authenticated owner client within the cluster.
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200999 curC, err := cluster.CuratorClient()
Serge Bazanski66e58952021-10-05 17:06:56 +02001000 if err != nil {
1001 ctxC()
Serge Bazanski5bb8a332022-06-23 17:41:33 +02001002 return nil, fmt.Errorf("CuratorClient: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +02001003 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001004 mgmt := apb.NewManagementClient(curC)
Serge Bazanskie78a0892021-10-07 17:03:49 +02001005
1006 // Retrieve register ticket to register further nodes.
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001007 logf("Cluster: retrieving register ticket...")
Serge Bazanskie78a0892021-10-07 17:03:49 +02001008 resT, err := mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{})
1009 if err != nil {
1010 ctxC()
1011 return nil, fmt.Errorf("GetRegisterTicket: %w", err)
1012 }
1013 ticket := resT.Ticket
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001014 logf("Cluster: retrieved register ticket (%d bytes).", len(ticket))
Serge Bazanskie78a0892021-10-07 17:03:49 +02001015
1016 // Retrieve cluster info (for directory and ca public key) to register further
1017 // nodes.
1018 resI, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
1019 if err != nil {
1020 ctxC()
1021 return nil, fmt.Errorf("GetClusterInfo: %w", err)
1022 }
Serge Bazanski54e212a2023-06-14 13:45:11 +02001023 caCert, err := x509.ParseCertificate(resI.CaCertificate)
1024 if err != nil {
1025 ctxC()
1026 return nil, fmt.Errorf("ParseCertificate: %w", err)
1027 }
1028 cluster.CACertificate = caCert
Serge Bazanskie78a0892021-10-07 17:03:49 +02001029
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001030 // Use the retrieved information to configure the rest of the node options.
1031 for i := 1; i < opts.NumNodes; i++ {
Jan Schära9b060b2024-08-07 10:42:29 +02001032 nodeOpts[i].NodeParameters = &apb.NodeParameters{
1033 Cluster: &apb.NodeParameters_ClusterRegister_{
1034 ClusterRegister: &apb.NodeParameters_ClusterRegister{
1035 RegisterTicket: ticket,
1036 ClusterDirectory: resI.ClusterDirectory,
1037 CaCertificate: resI.CaCertificate,
1038 Labels: &cpb.NodeLabels{
1039 Pairs: []*cpb.NodeLabels_Pair{
Serge Bazanski20498dd2024-09-30 17:07:08 +00001040 {Key: NodeNumberKey, Value: fmt.Sprintf("%d", i)},
Serge Bazanski30e30b32024-05-22 14:11:56 +02001041 },
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001042 },
1043 },
1044 },
Serge Bazanskid09c58f2023-03-17 00:25:08 +01001045 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001046 }
1047
1048 // Now run the rest of the nodes.
Serge Bazanskie78a0892021-10-07 17:03:49 +02001049 for i := 1; i < opts.NumNodes; i++ {
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001050 logf("Cluster: Starting node %d...", i)
Serge Bazanski2b6dc312024-06-04 17:44:55 +02001051 err := LaunchNode(ctxT, ld, sd, tpmf, &nodeOpts[i], done[i])
Serge Bazanskiee8c81b2024-04-03 11:59:38 +02001052 if err != nil {
Jan Schär0b927652024-07-31 18:08:50 +02001053 return nil, fmt.Errorf("failed to launch node %d: %w", i, err)
Serge Bazanskiee8c81b2024-04-03 11:59:38 +02001054 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001055 }
Serge Bazanskie78a0892021-10-07 17:03:49 +02001056
Serge Bazanski53458ba2024-06-18 09:56:46 +00001057 // Wait for nodes to appear as NEW, populate a map from node number (index into
Jan Schära9b060b2024-08-07 10:42:29 +02001058 // nodeOpts, etc.) to Metropolis Node ID.
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001059 seenNodes := make(map[string]bool)
Serge Bazanski53458ba2024-06-18 09:56:46 +00001060 nodeNumberToID := make(map[int]string)
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001061 logf("Cluster: waiting for nodes to appear as NEW...")
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001062 for i := 1; i < opts.NumNodes; i++ {
Serge Bazanskie78a0892021-10-07 17:03:49 +02001063 for {
1064 nodes, err := getNodes(ctx, mgmt)
1065 if err != nil {
1066 ctxC()
1067 return nil, fmt.Errorf("could not get nodes: %w", err)
1068 }
1069 for _, n := range nodes {
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001070 if n.State != cpb.NodeState_NODE_STATE_NEW {
1071 continue
Serge Bazanskie78a0892021-10-07 17:03:49 +02001072 }
Serge Bazanski87d9c592024-03-20 12:35:11 +01001073 if seenNodes[n.Id] {
1074 continue
1075 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001076 seenNodes[n.Id] = true
1077 cluster.Nodes[n.Id] = &NodeInCluster{
1078 ID: n.Id,
1079 Pubkey: n.Pubkey,
1080 }
Serge Bazanski53458ba2024-06-18 09:56:46 +00001081
Serge Bazanski20498dd2024-09-30 17:07:08 +00001082 num, err := strconv.Atoi(node.GetNodeLabel(n.Labels, NodeNumberKey))
Serge Bazanski53458ba2024-06-18 09:56:46 +00001083 if err != nil {
1084 return nil, fmt.Errorf("node %s has undecodable number label: %w", n.Id, err)
1085 }
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001086 logf("Cluster: Node %d is %s", num, n.Id)
Serge Bazanski53458ba2024-06-18 09:56:46 +00001087 nodeNumberToID[num] = n.Id
Serge Bazanskie78a0892021-10-07 17:03:49 +02001088 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001089
1090 if len(seenNodes) == opts.NumNodes-1 {
Serge Bazanskie78a0892021-10-07 17:03:49 +02001091 break
1092 }
1093 time.Sleep(1 * time.Second)
1094 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001095 }
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001096 logf("Found all expected nodes")
Serge Bazanskie78a0892021-10-07 17:03:49 +02001097
Serge Bazanski53458ba2024-06-18 09:56:46 +00001098 // Build the rest of NodeIDs from map.
1099 for i := 1; i < opts.NumNodes; i++ {
1100 cluster.NodeIDs = append(cluster.NodeIDs, nodeNumberToID[i])
1101 }
1102
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001103 approvedNodes := make(map[string]bool)
1104 upNodes := make(map[string]bool)
1105 if !opts.LeaveNodesNew {
Serge Bazanskie78a0892021-10-07 17:03:49 +02001106 for {
1107 nodes, err := getNodes(ctx, mgmt)
1108 if err != nil {
1109 ctxC()
1110 return nil, fmt.Errorf("could not get nodes: %w", err)
1111 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001112 for _, node := range nodes {
1113 if !seenNodes[node.Id] {
1114 // Skip nodes that weren't NEW in the previous step.
Serge Bazanskie78a0892021-10-07 17:03:49 +02001115 continue
1116 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001117
1118 if node.State == cpb.NodeState_NODE_STATE_UP && node.Status != nil && node.Status.ExternalAddress != "" {
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001119 logf("Cluster: node %s is up", node.Id)
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001120 upNodes[node.Id] = true
1121 cluster.Nodes[node.Id].ManagementAddress = node.Status.ExternalAddress
Serge Bazanskie78a0892021-10-07 17:03:49 +02001122 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001123 if upNodes[node.Id] {
1124 continue
Serge Bazanskibe742842022-04-04 13:18:50 +02001125 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001126
1127 if !approvedNodes[node.Id] {
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001128 logf("Cluster: approving node %s", node.Id)
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001129 _, err := mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
1130 Pubkey: node.Pubkey,
1131 })
1132 if err != nil {
1133 ctxC()
1134 return nil, fmt.Errorf("ApproveNode(%s): %w", node.Id, err)
1135 }
1136 approvedNodes[node.Id] = true
Serge Bazanskibe742842022-04-04 13:18:50 +02001137 }
Serge Bazanskie78a0892021-10-07 17:03:49 +02001138 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001139
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001140 logf("Cluster: want %d up nodes, have %d", opts.NumNodes, len(upNodes)+1)
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001141 if len(upNodes) == opts.NumNodes-1 {
Serge Bazanskie78a0892021-10-07 17:03:49 +02001142 break
1143 }
Serge Bazanskibe742842022-04-04 13:18:50 +02001144 time.Sleep(time.Second)
Serge Bazanskie78a0892021-10-07 17:03:49 +02001145 }
Serge Bazanskie78a0892021-10-07 17:03:49 +02001146 }
Serge Bazanski66e58952021-10-05 17:06:56 +02001147
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001148 logf("Cluster: all nodes up:")
Jan Schär0b927652024-07-31 18:08:50 +02001149 for i, nodeID := range cluster.NodeIDs {
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001150 logf("Cluster: %d. %s at %s", i, nodeID, cluster.Nodes[nodeID].ManagementAddress)
Serge Bazanskibe742842022-04-04 13:18:50 +02001151 }
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001152 logf("Cluster: starting tests...")
Serge Bazanski66e58952021-10-05 17:06:56 +02001153
Serge Bazanskibe742842022-04-04 13:18:50 +02001154 return cluster, nil
Serge Bazanski66e58952021-10-05 17:06:56 +02001155}
1156
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001157// RebootNode reboots the cluster member node matching the given index, and
1158// waits for it to rejoin the cluster. It will use the given context ctx to run
1159// cluster API requests, whereas the resulting QEMU process will be created
1160// using the cluster's context c.ctxT. The nodes are indexed starting at 0.
1161func (c *Cluster) RebootNode(ctx context.Context, idx int) error {
1162 if idx < 0 || idx >= len(c.NodeIDs) {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +02001163 return fmt.Errorf("index out of bounds")
1164 }
1165 if c.nodeOpts[idx].Runtime == nil {
1166 return fmt.Errorf("node not running")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001167 }
1168 id := c.NodeIDs[idx]
1169
1170 // Get an authenticated owner client within the cluster.
Serge Bazanski5bb8a332022-06-23 17:41:33 +02001171 curC, err := c.CuratorClient()
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001172 if err != nil {
1173 return err
1174 }
1175 mgmt := apb.NewManagementClient(curC)
1176
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001177 // Cancel the node's context. This will shut down QEMU.
1178 c.nodeOpts[idx].Runtime.CtxC()
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001179 logf("Cluster: waiting for node %d (%s) to stop.", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001180 err = <-c.nodesDone[idx]
1181 if err != nil {
1182 return fmt.Errorf("while restarting node: %w", err)
1183 }
1184
1185 // Start QEMU again.
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001186 logf("Cluster: restarting node %d (%s).", idx, id)
Serge Bazanski2b6dc312024-06-04 17:44:55 +02001187 if err := LaunchNode(c.ctxT, c.launchDir, c.socketDir, c.tpmFactory, &c.nodeOpts[idx], c.nodesDone[idx]); err != nil {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +02001188 return fmt.Errorf("failed to launch node %d: %w", idx, err)
1189 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001190
Serge Bazanskibc969572024-03-21 11:56:13 +01001191 start := time.Now()
1192
1193 // Poll Management.GetNodes until the node is healthy.
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001194 for {
1195 cs, err := getNode(ctx, mgmt, id)
1196 if err != nil {
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001197 logf("Cluster: node get error: %v", err)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001198 return err
1199 }
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001200 logf("Cluster: node health: %+v", cs.Health)
Serge Bazanskibc969572024-03-21 11:56:13 +01001201
1202 lhb := time.Now().Add(-cs.TimeSinceHeartbeat.AsDuration())
Tim Windelschmidta10d0cb2025-01-13 14:44:15 +01001203 if lhb.After(start) && cs.Health == apb.Node_HEALTH_HEALTHY {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001204 break
1205 }
1206 time.Sleep(time.Second)
1207 }
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001208 logf("Cluster: node %d (%s) has rejoined the cluster.", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001209 return nil
1210}
1211
Serge Bazanski500f6e02024-04-03 12:06:40 +02001212// ShutdownNode performs an ungraceful shutdown (i.e. power off) of the node
1213// given by idx. If the node is already shut down, this is a no-op.
1214func (c *Cluster) ShutdownNode(idx int) error {
1215 if idx < 0 || idx >= len(c.NodeIDs) {
1216 return fmt.Errorf("index out of bounds")
1217 }
1218 // Return if node is already stopped.
1219 select {
1220 case <-c.nodeOpts[idx].Runtime.ctxT.Done():
1221 return nil
1222 default:
1223 }
1224 id := c.NodeIDs[idx]
1225
1226 // Cancel the node's context. This will shut down QEMU.
1227 c.nodeOpts[idx].Runtime.CtxC()
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001228 logf("Cluster: waiting for node %d (%s) to stop.", idx, id)
Serge Bazanski500f6e02024-04-03 12:06:40 +02001229 err := <-c.nodesDone[idx]
1230 if err != nil {
1231 return fmt.Errorf("while shutting down node: %w", err)
1232 }
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001233 logf("Cluster: node %d (%s) stopped.", idx, id)
Serge Bazanski500f6e02024-04-03 12:06:40 +02001234 return nil
1235}
1236
1237// StartNode performs a power on of the node given by idx. If the node is already
1238// running, this is a no-op.
1239func (c *Cluster) StartNode(idx int) error {
1240 if idx < 0 || idx >= len(c.NodeIDs) {
1241 return fmt.Errorf("index out of bounds")
1242 }
1243 id := c.NodeIDs[idx]
1244 // Return if node is already running.
1245 select {
1246 case <-c.nodeOpts[idx].Runtime.ctxT.Done():
1247 default:
1248 return nil
1249 }
1250
1251 // Start QEMU again.
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001252 logf("Cluster: starting node %d (%s).", idx, id)
Serge Bazanski2b6dc312024-06-04 17:44:55 +02001253 if err := LaunchNode(c.ctxT, c.launchDir, c.socketDir, c.tpmFactory, &c.nodeOpts[idx], c.nodesDone[idx]); err != nil {
Serge Bazanski500f6e02024-04-03 12:06:40 +02001254 return fmt.Errorf("failed to launch node %d: %w", idx, err)
1255 }
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001256 logf("Cluster: node %d (%s) started.", idx, id)
Serge Bazanski500f6e02024-04-03 12:06:40 +02001257 return nil
1258}
1259
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001260// Close cancels the running clusters' context and waits for all virtualized
Serge Bazanski66e58952021-10-05 17:06:56 +02001261// nodes to stop. It returns an error if stopping the nodes failed, or one of
1262// the nodes failed to fully start in the first place.
1263func (c *Cluster) Close() error {
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001264 logf("Cluster: stopping...")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001265 if c.authClient != nil {
1266 c.authClient.Close()
1267 }
Serge Bazanski66e58952021-10-05 17:06:56 +02001268 c.ctxC()
1269
Leopold20a036e2023-01-15 00:17:19 +01001270 var errs []error
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001271 logf("Cluster: waiting for nodes to exit...")
Serge Bazanski66e58952021-10-05 17:06:56 +02001272 for _, c := range c.nodesDone {
1273 err := <-c
1274 if err != nil {
Leopold20a036e2023-01-15 00:17:19 +01001275 errs = append(errs, err)
Serge Bazanski66e58952021-10-05 17:06:56 +02001276 }
1277 }
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001278 logf("Cluster: removing nodes' state files (%s) and sockets (%s).", c.launchDir, c.socketDir)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001279 os.RemoveAll(c.launchDir)
1280 os.RemoveAll(c.socketDir)
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001281 os.RemoveAll(c.metroctlDir)
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001282 logf("Cluster: done")
Leopold20a036e2023-01-15 00:17:19 +01001283 return multierr.Combine(errs...)
Serge Bazanski66e58952021-10-05 17:06:56 +02001284}
Serge Bazanskibe742842022-04-04 13:18:50 +02001285
1286// DialNode is a grpc.WithContextDialer compatible dialer which dials nodes by
1287// their ID. This is performed by connecting to the cluster nanoswitch via its
1288// SOCKS proxy, and using the cluster node list for name resolution.
1289//
1290// For example:
1291//
Tim Windelschmidt9bd9bd42025-02-14 17:08:52 +01001292// grpc.NewClient("passthrough:///metropolis-deadbeef:1234", grpc.WithContextDialer(c.DialNode))
Serge Bazanskibe742842022-04-04 13:18:50 +02001293func (c *Cluster) DialNode(_ context.Context, addr string) (net.Conn, error) {
1294 host, port, err := net.SplitHostPort(addr)
1295 if err != nil {
1296 return nil, fmt.Errorf("invalid host:port: %w", err)
1297 }
1298 // Already an IP address?
1299 if net.ParseIP(host) != nil {
Lorenz Brun276a7462023-07-12 21:28:54 +02001300 return c.SOCKSDialer.Dial("tcp", addr)
Serge Bazanskibe742842022-04-04 13:18:50 +02001301 }
1302
1303 // Otherwise, expect a node name.
1304 node, ok := c.Nodes[host]
1305 if !ok {
1306 return nil, fmt.Errorf("unknown node %q", host)
1307 }
1308 addr = net.JoinHostPort(node.ManagementAddress, port)
Lorenz Brun276a7462023-07-12 21:28:54 +02001309 return c.SOCKSDialer.Dial("tcp", addr)
Serge Bazanskibe742842022-04-04 13:18:50 +02001310}
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001311
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001312// GetKubeClientSet gets a Kubernetes client set accessing the Metropolis
1313// Kubernetes authenticating proxy using the cluster owner identity.
1314// It currently has access to everything (i.e. the cluster-admin role)
1315// via the owner-admin binding.
Lorenz Brun8f1254d2025-01-28 14:10:05 +01001316func (c *Cluster) GetKubeClientSet() (kubernetes.Interface, *rest.Config, error) {
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001317 pkcs8Key, err := x509.MarshalPKCS8PrivateKey(c.Owner.PrivateKey)
1318 if err != nil {
1319 // We explicitly pass an Ed25519 private key in, so this can't happen
1320 panic(err)
1321 }
1322
1323 host := net.JoinHostPort(c.NodeIDs[0], node.KubernetesAPIWrappedPort.PortString())
Lorenz Brun150f24a2023-07-13 20:11:06 +02001324 clientConfig := rest.Config{
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001325 Host: host,
1326 TLSClientConfig: rest.TLSClientConfig{
1327 // TODO(q3k): use CA certificate
1328 Insecure: true,
1329 ServerName: "kubernetes.default.svc",
1330 CertData: pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: c.Owner.Certificate[0]}),
1331 KeyData: pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: pkcs8Key}),
1332 },
1333 Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
1334 return c.DialNode(ctx, address)
1335 },
1336 }
Lorenz Brun8f1254d2025-01-28 14:10:05 +01001337 clientSet, err := kubernetes.NewForConfig(&clientConfig)
1338 if err != nil {
1339 return nil, nil, err
1340 }
1341 return clientSet, &clientConfig, nil
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001342}
1343
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001344// KubernetesControllerNodeAddresses returns the list of IP addresses of nodes
1345// which are currently Kubernetes controllers, ie. run an apiserver. This list
1346// might be empty if no node is currently configured with the
1347// 'KubernetesController' node.
1348func (c *Cluster) KubernetesControllerNodeAddresses(ctx context.Context) ([]string, error) {
1349 curC, err := c.CuratorClient()
1350 if err != nil {
1351 return nil, err
1352 }
1353 mgmt := apb.NewManagementClient(curC)
1354 srv, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{
1355 Filter: "has(node.roles.kubernetes_controller)",
1356 })
1357 if err != nil {
1358 return nil, err
1359 }
1360 defer srv.CloseSend()
1361 var res []string
1362 for {
1363 n, err := srv.Recv()
1364 if err == io.EOF {
1365 break
1366 }
1367 if err != nil {
1368 return nil, err
1369 }
1370 if n.Status == nil || n.Status.ExternalAddress == "" {
1371 continue
1372 }
1373 res = append(res, n.Status.ExternalAddress)
1374 }
1375 return res, nil
1376}
Serge Bazanski630fb5c2023-04-06 10:50:24 +02001377
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001378// AllNodesHealthy returns nil if all the nodes in the cluster are seemingly
1379// healthy.
Serge Bazanski630fb5c2023-04-06 10:50:24 +02001380func (c *Cluster) AllNodesHealthy(ctx context.Context) error {
1381 // Get an authenticated owner client within the cluster.
1382 curC, err := c.CuratorClient()
1383 if err != nil {
1384 return err
1385 }
1386 mgmt := apb.NewManagementClient(curC)
1387 nodes, err := getNodes(ctx, mgmt)
1388 if err != nil {
1389 return err
1390 }
1391
1392 var unhealthy []string
1393 for _, node := range nodes {
Tim Windelschmidta10d0cb2025-01-13 14:44:15 +01001394 if node.Health == apb.Node_HEALTH_HEALTHY {
Serge Bazanski630fb5c2023-04-06 10:50:24 +02001395 continue
1396 }
1397 unhealthy = append(unhealthy, node.Id)
1398 }
1399 if len(unhealthy) == 0 {
1400 return nil
1401 }
1402 return fmt.Errorf("nodes unhealthy: %s", strings.Join(unhealthy, ", "))
1403}
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001404
1405// ApproveNode approves a node by ID, waiting for it to become UP.
1406func (c *Cluster) ApproveNode(ctx context.Context, id string) error {
1407 curC, err := c.CuratorClient()
1408 if err != nil {
1409 return err
1410 }
1411 mgmt := apb.NewManagementClient(curC)
1412
1413 _, err = mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
1414 Pubkey: c.Nodes[id].Pubkey,
1415 })
1416 if err != nil {
1417 return fmt.Errorf("ApproveNode: %w", err)
1418 }
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001419 logf("Cluster: %s: approved, waiting for UP", id)
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001420 for {
1421 nodes, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
1422 if err != nil {
1423 return fmt.Errorf("GetNodes: %w", err)
1424 }
1425 found := false
1426 for {
1427 node, err := nodes.Recv()
1428 if errors.Is(err, io.EOF) {
1429 break
1430 }
1431 if err != nil {
1432 return fmt.Errorf("Nodes.Recv: %w", err)
1433 }
1434 if node.Id != id {
1435 continue
1436 }
1437 if node.State != cpb.NodeState_NODE_STATE_UP {
1438 continue
1439 }
1440 found = true
1441 break
1442 }
1443 nodes.CloseSend()
1444
1445 if found {
1446 break
1447 }
1448 time.Sleep(time.Second)
1449 }
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001450 logf("Cluster: %s: UP", id)
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001451 return nil
1452}
1453
1454// MakeKubernetesWorker adds the KubernetesWorker role to a node by ID.
1455func (c *Cluster) MakeKubernetesWorker(ctx context.Context, id string) error {
1456 curC, err := c.CuratorClient()
1457 if err != nil {
1458 return err
1459 }
1460 mgmt := apb.NewManagementClient(curC)
1461
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001462 logf("Cluster: %s: adding KubernetesWorker", id)
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001463 _, err = mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
1464 Node: &apb.UpdateNodeRolesRequest_Id{
1465 Id: id,
1466 },
Jan Schärd1a8b642024-12-03 17:40:41 +01001467 KubernetesWorker: ptr.To(true),
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001468 })
1469 return err
1470}
Serge Bazanski37cfcc12024-03-21 11:59:07 +01001471
Jan Schära9b060b2024-08-07 10:42:29 +02001472// MakeKubernetesController adds the KubernetesController role to a node by ID.
1473func (c *Cluster) MakeKubernetesController(ctx context.Context, id string) error {
1474 curC, err := c.CuratorClient()
1475 if err != nil {
1476 return err
1477 }
1478 mgmt := apb.NewManagementClient(curC)
1479
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001480 logf("Cluster: %s: adding KubernetesController", id)
Jan Schära9b060b2024-08-07 10:42:29 +02001481 _, err = mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
1482 Node: &apb.UpdateNodeRolesRequest_Id{
1483 Id: id,
1484 },
Jan Schärd1a8b642024-12-03 17:40:41 +01001485 KubernetesController: ptr.To(true),
Jan Schära9b060b2024-08-07 10:42:29 +02001486 })
1487 return err
1488}
1489
Serge Bazanski37cfcc12024-03-21 11:59:07 +01001490// MakeConsensusMember adds the ConsensusMember role to a node by ID.
1491func (c *Cluster) MakeConsensusMember(ctx context.Context, id string) error {
1492 curC, err := c.CuratorClient()
1493 if err != nil {
1494 return err
1495 }
1496 mgmt := apb.NewManagementClient(curC)
1497 cur := ipb.NewCuratorClient(curC)
1498
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001499 logf("Cluster: %s: adding ConsensusMember", id)
Serge Bazanski37cfcc12024-03-21 11:59:07 +01001500 bo := backoff.NewExponentialBackOff()
1501 bo.MaxElapsedTime = 10 * time.Second
1502
1503 backoff.Retry(func() error {
1504 _, err = mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
1505 Node: &apb.UpdateNodeRolesRequest_Id{
1506 Id: id,
1507 },
Jan Schärd1a8b642024-12-03 17:40:41 +01001508 ConsensusMember: ptr.To(true),
Serge Bazanski37cfcc12024-03-21 11:59:07 +01001509 })
1510 if err != nil {
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001511 logf("Cluster: %s: UpdateNodeRoles failed: %v", id, err)
Serge Bazanski37cfcc12024-03-21 11:59:07 +01001512 }
1513 return err
1514 }, backoff.WithContext(bo, ctx))
1515 if err != nil {
1516 return err
1517 }
1518
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001519 logf("Cluster: %s: waiting for learner/full members...", id)
Serge Bazanski37cfcc12024-03-21 11:59:07 +01001520
1521 learner := false
1522 for {
1523 res, err := cur.GetConsensusStatus(ctx, &ipb.GetConsensusStatusRequest{})
1524 if err != nil {
1525 return fmt.Errorf("GetConsensusStatus: %w", err)
1526 }
1527 for _, member := range res.EtcdMember {
1528 if member.Id != id {
1529 continue
1530 }
1531 switch member.Status {
1532 case ipb.GetConsensusStatusResponse_EtcdMember_STATUS_LEARNER:
1533 if !learner {
1534 learner = true
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001535 logf("Cluster: %s: became a learner, waiting for full member...", id)
Serge Bazanski37cfcc12024-03-21 11:59:07 +01001536 }
1537 case ipb.GetConsensusStatusResponse_EtcdMember_STATUS_FULL:
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001538 logf("Cluster: %s: became a full member", id)
Serge Bazanski37cfcc12024-03-21 11:59:07 +01001539 return nil
1540 }
1541 }
1542 time.Sleep(100 * time.Millisecond)
1543 }
1544}