// Copyright The Monogon Project Authors.
// SPDX-License-Identifier: Apache-2.0

// cluster builds on the launch package and implements launching Metropolis
// nodes and clusters in a virtualized environment using qemu. It's kept in a
// separate package as it depends on a Metropolis node image, which might not be
// required for some use of the launch library.
Tim Windelschmidt9f21f532024-05-07 15:14:20 +02008package launch
Serge Bazanski66e58952021-10-05 17:06:56 +02009
import (
	"bytes"
	"context"
	"crypto/ed25519"
	"crypto/rand"
	"crypto/tls"
	"crypto/x509"
	"encoding/pem"
	"errors"
	"fmt"
	"io"
	"net"
	"net/http"
	"os"
	"os/exec"
	"path"
	"path/filepath"
	"strconv"
	"strings"
	"syscall"
	"time"

	"github.com/cenkalti/backoff/v4"
	"go.uber.org/multierr"
	"golang.org/x/net/proxy"
	"golang.org/x/sys/unix"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/proto"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/utils/ptr"

	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
	apb "source.monogon.dev/metropolis/proto/api"
	cpb "source.monogon.dev/metropolis/proto/common"

	"source.monogon.dev/go/logging"
	metroctl "source.monogon.dev/metropolis/cli/metroctl/core"
	"source.monogon.dev/metropolis/installer/install"
	"source.monogon.dev/metropolis/node"
	"source.monogon.dev/metropolis/node/core/rpc"
	"source.monogon.dev/metropolis/node/core/rpc/resolver"
	"source.monogon.dev/osbase/blockdev"
	"source.monogon.dev/osbase/oci"
	"source.monogon.dev/osbase/oci/osimage"
	"source.monogon.dev/osbase/oci/registry"
	"source.monogon.dev/osbase/structfs"
	"source.monogon.dev/osbase/test/qemu"
)
61
const (
	// NodeNumberKey is the key of the node label used to carry a node's numerical
	// index in the test system.
	NodeNumberKey string = "test-node-number"
)
67
Leopold20a036e2023-01-15 00:17:19 +010068// NodeOptions contains all options that can be passed to Launch()
Serge Bazanski66e58952021-10-05 17:06:56 +020069type NodeOptions struct {
Leopoldaf5086b2023-01-15 14:12:42 +010070 // Name is a human-readable identifier to be used in debug output.
71 Name string
72
Jan Schära9b060b2024-08-07 10:42:29 +020073 // CPUs is the number of virtual CPUs of the VM.
74 CPUs int
75
76 // ThreadsPerCPU is the number of threads per CPU. This is multiplied by
77 // CPUs to get the total number of threads.
78 ThreadsPerCPU int
79
80 // MemoryMiB is the RAM size in MiB of the VM.
81 MemoryMiB int
82
Jan Schär07003572024-08-26 10:42:16 +020083 // DiskBytes contains the size of the root disk in bytes or zero if the
84 // unmodified image size is used.
85 DiskBytes uint64
86
Serge Bazanski66e58952021-10-05 17:06:56 +020087 // Ports contains the port mapping where to expose the internal ports of the VM to
88 // the host. See IdentityPortMap() and ConflictFreePortMap(). Ignored when
89 // ConnectToSocket is set.
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +010090 Ports qemu.PortMap
Serge Bazanski66e58952021-10-05 17:06:56 +020091
Leopold20a036e2023-01-15 00:17:19 +010092 // If set to true, reboots are honored. Otherwise, all reboots exit the Launch()
93 // command. Metropolis nodes generally restart on almost all errors, so unless you
Serge Bazanski66e58952021-10-05 17:06:56 +020094 // want to test reboot behavior this should be false.
95 AllowReboot bool
96
Leopold20a036e2023-01-15 00:17:19 +010097 // By default, the VM is connected to the Host via SLIRP. If ConnectToSocket is
98 // set, it is instead connected to the given file descriptor/socket. If this is
99 // set, all port maps from the Ports option are ignored. Intended for networking
100 // this instance together with others for running more complex network
101 // configurations.
Serge Bazanski66e58952021-10-05 17:06:56 +0200102 ConnectToSocket *os.File
103
Leopoldacfad5b2023-01-15 14:05:25 +0100104 // When PcapDump is set, all traffic is dumped to a pcap file in the
105 // runtime directory (e.g. "net0.pcap" for the first interface).
106 PcapDump bool
107
Leopold20a036e2023-01-15 00:17:19 +0100108 // SerialPort is an io.ReadWriter over which you can communicate with the serial
109 // port of the machine. It can be set to an existing file descriptor (like
Serge Bazanski66e58952021-10-05 17:06:56 +0200110 // os.Stdout/os.Stderr) or any Go structure implementing this interface.
111 SerialPort io.ReadWriter
112
113 // NodeParameters is passed into the VM and subsequently used for bootstrapping or
114 // registering into a cluster.
115 NodeParameters *apb.NodeParameters
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200116
117 // Mac is the node's MAC address.
118 Mac *net.HardwareAddr
119
120 // Runtime keeps the node's QEMU runtime state.
121 Runtime *NodeRuntime
Serge Bazanski62e6f0b2024-09-03 12:18:56 +0200122
123 // RunVNC starts a VNC socket for troubleshooting/testing console code. Note:
124 // this will not work in tests, as those use a built-in qemu which does not
125 // implement a VGA device.
126 RunVNC bool
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200127}
128
// NodeRuntime keeps the node's QEMU runtime options. It is created by
// setupRuntime and reused by LaunchNode across restarts of the same node.
type NodeRuntime struct {
	// ld points at the node's launch directory storing data such as storage
	// images, firmware variables or the TPM state.
	ld string
	// sd points at the node's socket directory.
	sd string

	// ctxT is the context QEMU will execute in. LaunchNode replaces it on
	// every launch.
	ctxT context.Context
	// CtxC is the QEMU context's cancellation function.
	CtxC context.CancelFunc
}
142
143// NodePorts is the list of ports a fully operational Metropolis node listens on
Serge Bazanski52304a82021-10-29 16:56:18 +0200144var NodePorts = []node.Port{
Serge Bazanski66e58952021-10-05 17:06:56 +0200145 node.ConsensusPort,
146
147 node.CuratorServicePort,
148 node.DebugServicePort,
149
150 node.KubernetesAPIPort,
Lorenz Bruncc078df2021-12-23 11:51:55 +0100151 node.KubernetesAPIWrappedPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200152 node.CuratorServicePort,
153 node.DebuggerPort,
Tim Windelschmidtbe25a3b2023-07-19 16:31:56 +0200154 node.MetricsPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200155}
156
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200157// setupRuntime creates the node's QEMU runtime directory, together with all
158// files required to preserve its state, a level below the chosen path ld. The
159// node's socket directory is similarily created a level below sd. It may
160// return an I/O error.
Jan Schär07003572024-08-26 10:42:16 +0200161func setupRuntime(ld, sd string, diskBytes uint64) (*NodeRuntime, error) {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200162 // Create a temporary directory to keep all the runtime files.
163 stdp, err := os.MkdirTemp(ld, "node_state*")
164 if err != nil {
165 return nil, fmt.Errorf("failed to create the state directory: %w", err)
166 }
167
Jan Schär3b0c8dd2025-06-23 10:32:07 +0000168 // Initialize the node's storage.
169 ociImage, err := oci.ReadLayout(xNodeImagePath)
Jan Schär07003572024-08-26 10:42:16 +0200170 if err != nil {
Jan Schär3b0c8dd2025-06-23 10:32:07 +0000171 return nil, fmt.Errorf("failed to read OS image: %w", err)
Jan Schär07003572024-08-26 10:42:16 +0200172 }
Jan Schäre19d2792025-06-23 12:37:58 +0000173 osImage, err := osimage.Read(ociImage)
Jan Schär3b0c8dd2025-06-23 10:32:07 +0000174 if err != nil {
175 return nil, fmt.Errorf("failed to read OS image: %w", err)
176 }
Jan Schär07003572024-08-26 10:42:16 +0200177
Jan Schär3b0c8dd2025-06-23 10:32:07 +0000178 efiPayload, err := osImage.PayloadUnverified("kernel.efi")
179 if err != nil {
180 return nil, fmt.Errorf("cannot open EFI payload in OS image: %w", err)
181 }
182 systemImage, err := osImage.PayloadUnverified("system")
183 if err != nil {
184 return nil, fmt.Errorf("cannot open system image in OS image: %w", err)
185 }
Serge Bazanskidd5b03c2024-05-16 18:07:06 +0200186
Jan Schär3b0c8dd2025-06-23 10:32:07 +0000187 abloader, err := structfs.OSPathBlob(xAbloaderPath)
188 if err != nil {
189 return nil, fmt.Errorf("cannot open abloader: %w", err)
190 }
191
192 di := filepath.Join(stdp, "image.img")
193 logf("Cluster: generating node image: %s -> %s", xNodeImagePath, di)
194
195 df, err := blockdev.CreateFile(di, 512, int64(diskBytes/512))
Serge Bazanskidd5b03c2024-05-16 18:07:06 +0200196 if err != nil {
197 return nil, fmt.Errorf("while opening image for writing: %w", err)
198 }
199 defer df.Close()
Jan Schär3b0c8dd2025-06-23 10:32:07 +0000200
Jan Schäre19d2792025-06-23 12:37:58 +0000201 installParams := &install.Params{
202 PartitionSize: install.PartitionSizeInfo{
Jan Schär3b0c8dd2025-06-23 10:32:07 +0000203 ESP: 128,
204 System: 1024,
205 Data: 128,
206 },
207 Architecture: osImage.Config.ProductInfo.Architecture(),
208 SystemImage: systemImage,
209 EFIPayload: efiPayload,
210 ABLoader: abloader,
211 Output: df,
212 }
Jan Schäre19d2792025-06-23 12:37:58 +0000213
214 if _, err := install.Write(installParams); err != nil {
Jan Schär3b0c8dd2025-06-23 10:32:07 +0000215 return nil, fmt.Errorf("while creating node image: %w", err)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200216 }
217
218 // Initialize the OVMF firmware variables file.
Tim Windelschmidt82e6af72024-07-23 00:05:42 +0000219 dv := filepath.Join(stdp, filepath.Base(xOvmfVarsPath))
220 if err := copyFile(xOvmfVarsPath, dv); err != nil {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200221 return nil, fmt.Errorf("while copying firmware variables: %w", err)
222 }
223
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200224 // Create the socket directory.
225 sotdp, err := os.MkdirTemp(sd, "node_sock*")
226 if err != nil {
227 return nil, fmt.Errorf("failed to create the socket directory: %w", err)
228 }
229
230 return &NodeRuntime{
231 ld: stdp,
232 sd: sotdp,
233 }, nil
234}
235
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200236// CuratorClient returns an authenticated owner connection to a Curator
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200237// instance within Cluster c, or nil together with an error.
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200238func (c *Cluster) CuratorClient() (*grpc.ClientConn, error) {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200239 if c.authClient == nil {
Serge Bazanski8535cb52023-03-29 14:15:08 +0200240 authCreds := rpc.NewAuthenticatedCredentials(c.Owner, rpc.WantInsecure())
Serge Bazanskica8d9512024-09-12 14:20:57 +0200241 r := resolver.New(c.ctxT, resolver.WithLogger(logging.NewFunctionBackend(func(severity logging.Severity, msg string) {
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100242 logf("Cluster: client resolver: %s: %s", severity, msg)
Serge Bazanskica8d9512024-09-12 14:20:57 +0200243 })))
Tim Windelschmidt9bd9bd42025-02-14 17:08:52 +0100244 for _, n := range c.Nodes {
245 r.AddEndpoint(resolver.NodeAtAddressWithDefaultPort(n.ManagementAddress))
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200246 }
Tim Windelschmidt9bd9bd42025-02-14 17:08:52 +0100247 authClient, err := grpc.NewClient(resolver.MetropolisControlAddress,
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200248 grpc.WithTransportCredentials(authCreds),
249 grpc.WithResolvers(r),
250 grpc.WithContextDialer(c.DialNode),
251 )
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200252 if err != nil {
Tim Windelschmidt9bd9bd42025-02-14 17:08:52 +0100253 return nil, fmt.Errorf("creating client with owner credentials failed: %w", err)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200254 }
255 c.authClient = authClient
256 }
257 return c.authClient, nil
258}
259
Serge Bazanski66e58952021-10-05 17:06:56 +0200260// LaunchNode launches a single Metropolis node instance with the given options.
261// The instance runs mostly paravirtualized but with some emulated hardware
262// similar to how a cloud provider might set up its VMs. The disk is fully
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200263// writable, and the changes are kept across reboots and shutdowns. ld and sd
264// point to the launch directory and the socket directory, holding the nodes'
265// state files (storage, tpm state, firmware state), and UNIX socket files
266// (swtpm <-> QEMU interplay) respectively. The directories must exist before
267// LaunchNode is called. LaunchNode will update options.Runtime and options.Mac
268// if either are not initialized.
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200269func LaunchNode(ctx context.Context, ld, sd string, tpmFactory *TPMFactory, options *NodeOptions, doneC chan error) error {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200270 // TODO(mateusz@monogon.tech) try using QEMU's abstract socket namespace instead
271 // of /tmp (requires QEMU version >5.0).
Serge Bazanski66e58952021-10-05 17:06:56 +0200272 // https://github.com/qemu/qemu/commit/776b97d3605ed0fc94443048fdf988c7725e38a9).
273 // swtpm accepts already-open FDs so we can pass in an abstract socket namespace FD
274 // that we open and pass the name of it to QEMU. Not pinning this crashes both
275 // swtpm and qemu because we run into UNIX socket length limitations (for legacy
276 // reasons 108 chars).
Serge Bazanski66e58952021-10-05 17:06:56 +0200277
Jan Schära9b060b2024-08-07 10:42:29 +0200278 if options.CPUs == 0 {
279 options.CPUs = 1
280 }
281 if options.ThreadsPerCPU == 0 {
282 options.ThreadsPerCPU = 1
283 }
284 if options.MemoryMiB == 0 {
285 options.MemoryMiB = 2048
286 }
Jan Schär3b0c8dd2025-06-23 10:32:07 +0000287 if options.DiskBytes == 0 {
288 options.DiskBytes = 5 * 1024 * 1024 * 1024
289 }
Jan Schära9b060b2024-08-07 10:42:29 +0200290
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200291 // If it's the node's first start, set up its runtime directories.
292 if options.Runtime == nil {
Jan Schär07003572024-08-26 10:42:16 +0200293 r, err := setupRuntime(ld, sd, options.DiskBytes)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200294 if err != nil {
295 return fmt.Errorf("while setting up node runtime: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200296 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200297 options.Runtime = r
Serge Bazanski66e58952021-10-05 17:06:56 +0200298 }
299
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200300 // Replace the node's context with a new one.
301 r := options.Runtime
302 if r.CtxC != nil {
303 r.CtxC()
304 }
305 r.ctxT, r.CtxC = context.WithCancel(ctx)
306
Serge Bazanski66e58952021-10-05 17:06:56 +0200307 var qemuNetType string
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100308 var qemuNetConfig qemu.QemuValue
Serge Bazanski66e58952021-10-05 17:06:56 +0200309 if options.ConnectToSocket != nil {
310 qemuNetType = "socket"
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100311 qemuNetConfig = qemu.QemuValue{
Serge Bazanski66e58952021-10-05 17:06:56 +0200312 "id": {"net0"},
313 "fd": {"3"},
314 }
315 } else {
316 qemuNetType = "user"
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100317 qemuNetConfig = qemu.QemuValue{
Serge Bazanski66e58952021-10-05 17:06:56 +0200318 "id": {"net0"},
319 "net": {"10.42.0.0/24"},
320 "dhcpstart": {"10.42.0.10"},
321 "hostfwd": options.Ports.ToQemuForwards(),
322 }
323 }
324
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200325 // Generate the node's MAC address if it isn't already set in NodeOptions.
326 if options.Mac == nil {
327 mac, err := generateRandomEthernetMAC()
328 if err != nil {
329 return err
330 }
331 options.Mac = mac
Serge Bazanski66e58952021-10-05 17:06:56 +0200332 }
333
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200334 tpmSocketPath := filepath.Join(r.sd, "tpm-socket")
335 fwVarPath := filepath.Join(r.ld, "OVMF_VARS.fd")
Jan Schär3b0c8dd2025-06-23 10:32:07 +0000336 storagePath := filepath.Join(r.ld, "image.img")
Lorenz Brun150f24a2023-07-13 20:11:06 +0200337 qemuArgs := []string{
Jan Schära9b060b2024-08-07 10:42:29 +0200338 "-machine", "q35",
339 "-accel", "kvm",
Serge Bazanski62e6f0b2024-09-03 12:18:56 +0200340 "-display", "none",
Jan Schära9b060b2024-08-07 10:42:29 +0200341 "-nodefaults",
342 "-cpu", "host",
343 "-m", fmt.Sprintf("%dM", options.MemoryMiB),
344 "-smp", fmt.Sprintf("cores=%d,threads=%d", options.CPUs, options.ThreadsPerCPU),
Tim Windelschmidt82e6af72024-07-23 00:05:42 +0000345 "-drive", "if=pflash,format=raw,readonly=on,file=" + xOvmfCodePath,
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200346 "-drive", "if=pflash,format=raw,file=" + fwVarPath,
Jan Schär3b0c8dd2025-06-23 10:32:07 +0000347 "-drive", "if=virtio,format=raw,cache=unsafe,file=" + storagePath,
Serge Bazanski66e58952021-10-05 17:06:56 +0200348 "-netdev", qemuNetConfig.ToOption(qemuNetType),
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200349 "-device", "virtio-net-pci,netdev=net0,mac=" + options.Mac.String(),
Serge Bazanski66e58952021-10-05 17:06:56 +0200350 "-chardev", "socket,id=chrtpm,path=" + tpmSocketPath,
351 "-tpmdev", "emulator,id=tpm0,chardev=chrtpm",
352 "-device", "tpm-tis,tpmdev=tpm0",
353 "-device", "virtio-rng-pci",
Lorenz Brun150f24a2023-07-13 20:11:06 +0200354 "-serial", "stdio",
355 }
Serge Bazanski62e6f0b2024-09-03 12:18:56 +0200356 if options.RunVNC {
357 vncSocketPath := filepath.Join(r.sd, "vnc-socket")
358 qemuArgs = append(qemuArgs,
359 "-vnc", "unix:"+vncSocketPath,
360 "-device", "virtio-vga",
361 )
362 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200363
364 if !options.AllowReboot {
365 qemuArgs = append(qemuArgs, "-no-reboot")
366 }
367
368 if options.NodeParameters != nil {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200369 parametersPath := filepath.Join(r.ld, "parameters.pb")
Serge Bazanski66e58952021-10-05 17:06:56 +0200370 parametersRaw, err := proto.Marshal(options.NodeParameters)
371 if err != nil {
372 return fmt.Errorf("failed to encode node paraeters: %w", err)
373 }
Lorenz Brun150f24a2023-07-13 20:11:06 +0200374 if err := os.WriteFile(parametersPath, parametersRaw, 0o644); err != nil {
Serge Bazanski66e58952021-10-05 17:06:56 +0200375 return fmt.Errorf("failed to write node parameters: %w", err)
376 }
377 qemuArgs = append(qemuArgs, "-fw_cfg", "name=dev.monogon.metropolis/parameters.pb,file="+parametersPath)
378 }
379
Leopoldacfad5b2023-01-15 14:05:25 +0100380 if options.PcapDump {
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100381 qemuNetDump := qemu.QemuValue{
Tim Windelschmidta7a82f32024-04-11 01:40:25 +0200382 "id": {"net0"},
383 "netdev": {"net0"},
384 "file": {filepath.Join(r.ld, "net0.pcap")},
Leopoldacfad5b2023-01-15 14:05:25 +0100385 }
386 qemuArgs = append(qemuArgs, "-object", qemuNetDump.ToOption("filter-dump"))
387 }
388
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200389 // Manufacture TPM if needed.
390 tpmd := filepath.Join(r.ld, "tpm")
Tim Windelschmidt82e6af72024-07-23 00:05:42 +0000391 err := tpmFactory.Manufacture(ctx, tpmd, &TPMPlatform{
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200392 Manufacturer: "Monogon",
393 Version: "1.0",
394 Model: "TestCluster",
395 })
396 if err != nil {
397 return fmt.Errorf("could not manufacture TPM: %w", err)
398 }
399
Serge Bazanski66e58952021-10-05 17:06:56 +0200400 // Start TPM emulator as a subprocess
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200401 tpmCtx, tpmCancel := context.WithCancel(options.Runtime.ctxT)
Serge Bazanski66e58952021-10-05 17:06:56 +0200402
Tim Windelschmidt82e6af72024-07-23 00:05:42 +0000403 tpmEmuCmd := exec.CommandContext(tpmCtx, xSwtpmPath, "socket", "--tpm2", "--tpmstate", "dir="+tpmd, "--ctrl", "type=unixio,path="+tpmSocketPath)
Serge Bazanskib07c57a2024-06-04 14:33:27 +0000404 // Silence warnings from unsafe libtpms build (uses non-constant-time
405 // cryptographic operations).
406 tpmEmuCmd.Env = append(tpmEmuCmd.Env, "MONOGON_LIBTPMS_ACKNOWLEDGE_UNSAFE=yes")
Serge Bazanski66e58952021-10-05 17:06:56 +0200407 tpmEmuCmd.Stderr = os.Stderr
408 tpmEmuCmd.Stdout = os.Stdout
409
Tim Windelschmidt244b5672024-02-06 10:18:56 +0100410 err = tpmEmuCmd.Start()
Serge Bazanski66e58952021-10-05 17:06:56 +0200411 if err != nil {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200412 tpmCancel()
Serge Bazanski66e58952021-10-05 17:06:56 +0200413 return fmt.Errorf("failed to start TPM emulator: %w", err)
414 }
415
Mateusz Zalegae90f4a12022-05-25 18:24:01 +0200416 // Wait for the socket to be created by the TPM emulator before launching
417 // QEMU.
418 for {
419 _, err := os.Stat(tpmSocketPath)
420 if err == nil {
421 break
422 }
Tim Windelschmidta7a82f32024-04-11 01:40:25 +0200423 if !os.IsNotExist(err) {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200424 tpmCancel()
Mateusz Zalegae90f4a12022-05-25 18:24:01 +0200425 return fmt.Errorf("while stat-ing TPM socket path: %w", err)
426 }
427 if err := tpmCtx.Err(); err != nil {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200428 tpmCancel()
Mateusz Zalegae90f4a12022-05-25 18:24:01 +0200429 return fmt.Errorf("while waiting for the TPM socket: %w", err)
430 }
431 time.Sleep(time.Millisecond * 100)
432 }
433
Serge Bazanski66e58952021-10-05 17:06:56 +0200434 // Start the main qemu binary
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200435 systemCmd := exec.CommandContext(options.Runtime.ctxT, "qemu-system-x86_64", qemuArgs...)
Serge Bazanski66e58952021-10-05 17:06:56 +0200436 if options.ConnectToSocket != nil {
437 systemCmd.ExtraFiles = []*os.File{options.ConnectToSocket}
438 }
439
440 var stdErrBuf bytes.Buffer
441 systemCmd.Stderr = &stdErrBuf
442 systemCmd.Stdout = options.SerialPort
443
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100444 qemu.PrettyPrintQemuArgs(options.Name, systemCmd.Args)
Leopoldaf5086b2023-01-15 14:12:42 +0100445
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200446 go func() {
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100447 logf("Node: Starting...")
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200448 err = systemCmd.Run()
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100449 logf("Node: Returned: %v", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200450
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200451 // Stop TPM emulator and wait for it to exit to properly reap the child process
452 tpmCancel()
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100453 logf("Node: Waiting for TPM emulator to exit")
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200454 // Wait returns a SIGKILL error because we just cancelled its context.
455 // We still need to call it to avoid creating zombies.
456 errTpm := tpmEmuCmd.Wait()
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100457 logf("Node: TPM emulator done: %v", errTpm)
Serge Bazanski66e58952021-10-05 17:06:56 +0200458
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200459 var exerr *exec.ExitError
460 if err != nil && errors.As(err, &exerr) {
461 status := exerr.ProcessState.Sys().(syscall.WaitStatus)
462 if status.Signaled() && status.Signal() == syscall.SIGKILL {
463 // Process was killed externally (most likely by our context being canceled).
464 // This is a normal exit for us, so return nil
465 doneC <- nil
466 return
467 }
468 exerr.Stderr = stdErrBuf.Bytes()
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100469 newErr := qemu.QEMUError(*exerr)
470 logf("Node: %q", stdErrBuf.String())
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200471 doneC <- &newErr
472 return
Serge Bazanski66e58952021-10-05 17:06:56 +0200473 }
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200474 doneC <- err
475 }()
476 return nil
Serge Bazanski66e58952021-10-05 17:06:56 +0200477}
478
479func copyFile(src, dst string) error {
480 in, err := os.Open(src)
481 if err != nil {
482 return fmt.Errorf("when opening source: %w", err)
483 }
484 defer in.Close()
485
486 out, err := os.Create(dst)
487 if err != nil {
488 return fmt.Errorf("when creating destination: %w", err)
489 }
490 defer out.Close()
491
Lorenz Brun87bbf7e2024-03-18 18:22:25 +0100492 endPos, err := in.Seek(0, io.SeekEnd)
Serge Bazanski66e58952021-10-05 17:06:56 +0200493 if err != nil {
Lorenz Brun87bbf7e2024-03-18 18:22:25 +0100494 return fmt.Errorf("when getting source end: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200495 }
Lorenz Brun87bbf7e2024-03-18 18:22:25 +0100496
497 // Copy the file while preserving its sparseness. The image files are very
498 // sparse (less than 10% allocated), so this is a lot faster.
499 var lastHoleStart int64
500 for {
501 dataStart, err := in.Seek(lastHoleStart, unix.SEEK_DATA)
502 if err != nil {
503 return fmt.Errorf("when seeking to next data block: %w", err)
504 }
505 holeStart, err := in.Seek(dataStart, unix.SEEK_HOLE)
506 if err != nil {
507 return fmt.Errorf("when seeking to next hole: %w", err)
508 }
509 lastHoleStart = holeStart
510 if _, err := in.Seek(dataStart, io.SeekStart); err != nil {
511 return fmt.Errorf("when seeking to current data block: %w", err)
512 }
513 if _, err := out.Seek(dataStart, io.SeekStart); err != nil {
514 return fmt.Errorf("when seeking output to next data block: %w", err)
515 }
516 if _, err := io.CopyN(out, in, holeStart-dataStart); err != nil {
517 return fmt.Errorf("when copying file: %w", err)
518 }
519 if endPos == holeStart {
520 // The next hole is at the end of the file, we're done here.
521 break
522 }
523 }
524
Serge Bazanski66e58952021-10-05 17:06:56 +0200525 return out.Close()
526}
527
Serge Bazanskie78a0892021-10-07 17:03:49 +0200528// getNodes wraps around Management.GetNodes to return a list of nodes in a
529// cluster.
530func getNodes(ctx context.Context, mgmt apb.ManagementClient) ([]*apb.Node, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200531 var res []*apb.Node
Serge Bazanski636032e2022-01-26 14:21:33 +0100532 bo := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
Serge Bazanski075465c2021-11-16 15:38:49 +0100533 err := backoff.Retry(func() error {
534 res = nil
535 srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
Serge Bazanskie78a0892021-10-07 17:03:49 +0200536 if err != nil {
Serge Bazanski075465c2021-11-16 15:38:49 +0100537 return fmt.Errorf("GetNodes: %w", err)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200538 }
Serge Bazanski075465c2021-11-16 15:38:49 +0100539 for {
540 node, err := srvN.Recv()
541 if err == io.EOF {
542 break
543 }
544 if err != nil {
545 return fmt.Errorf("GetNodes.Recv: %w", err)
546 }
547 res = append(res, node)
548 }
549 return nil
550 }, bo)
551 if err != nil {
552 return nil, err
Serge Bazanskie78a0892021-10-07 17:03:49 +0200553 }
554 return res, nil
555}
556
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200557// getNode wraps Management.GetNodes. It returns node information matching
558// given node ID.
559func getNode(ctx context.Context, mgmt apb.ManagementClient, id string) (*apb.Node, error) {
560 nodes, err := getNodes(ctx, mgmt)
561 if err != nil {
562 return nil, fmt.Errorf("could not get nodes: %w", err)
563 }
564 for _, n := range nodes {
Jan Schär39d9c242024-09-24 13:49:55 +0200565 if n.Id == id {
566 return n, nil
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200567 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200568 }
Tim Windelschmidt73e98822024-04-18 23:13:49 +0200569 return nil, fmt.Errorf("no such node")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200570}
571
// generateRandomEthernetMAC returns a random EUI-48 Ethernet MAC address that
// is marked as locally administered and individual (unicast). It returns an
// error if no randomness can be read.
func generateRandomEthernetMAC() (*net.HardwareAddr, error) {
	macBuf := make([]byte, 6)
	if _, err := rand.Read(macBuf); err != nil {
		return nil, fmt.Errorf("failed to read randomness for MAC: %w", err)
	}

	// Set U/L bit and clear I/G bit (locally administered individual MAC)
	// Ref IEEE 802-2014 Section 8.2.2
	macBuf[0] = (macBuf[0] | 2) & 0xfe
	mac := net.HardwareAddr(macBuf)
	return &mac, nil
}
586
// SOCKSPort is the port of the SOCKS proxy into the switch network; it is
// one of the ports listed in ClusterPorts.
const SOCKSPort uint16 = 1080
Serge Bazanski66e58952021-10-05 17:06:56 +0200588
Serge Bazanskibe742842022-04-04 13:18:50 +0200589// ClusterPorts contains all ports handled by Nanoswitch.
590var ClusterPorts = []uint16{
591 // Forwarded to the first node.
592 uint16(node.CuratorServicePort),
593 uint16(node.DebugServicePort),
594 uint16(node.KubernetesAPIPort),
595 uint16(node.KubernetesAPIWrappedPort),
596
597 // SOCKS proxy to the switch network
598 SOCKSPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200599}
600
601// ClusterOptions contains all options for launching a Metropolis cluster.
602type ClusterOptions struct {
603 // The number of nodes this cluster should be started with.
604 NumNodes int
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100605
Jan Schära9b060b2024-08-07 10:42:29 +0200606 // Node are default options of all nodes.
607 Node NodeOptions
608
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100609 // If true, node logs will be saved to individual files instead of being printed
610 // out to stderr. The path of these files will be still printed to stdout.
611 //
612 // The files will be located within the launch directory inside TEST_TMPDIR (or
613 // the default tempdir location, if not set).
614 NodeLogsToFiles bool
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200615
616 // LeaveNodesNew, if set, will leave all non-bootstrap nodes in NEW, without
617 // bootstrapping them. The nodes' address information in Cluster.Nodes will be
618 // incomplete.
619 LeaveNodesNew bool
Lorenz Brun150f24a2023-07-13 20:11:06 +0200620
621 // Optional local registry which will be made available to the cluster to
622 // pull images from. This is a more efficient alternative to preseeding all
623 // images used for testing.
Jan Schär9d2f3c62025-04-14 11:17:22 +0000624 LocalRegistry *registry.Server
Serge Bazanskie564f172024-04-03 12:06:06 +0200625
626 // InitialClusterConfiguration will be passed to the first node when creating the
627 // cluster, and defines some basic properties of the cluster. If not specified,
628 // the cluster will default to defaults as defined in
629 // metropolis.proto.api.NodeParameters.
630 InitialClusterConfiguration *cpb.ClusterConfiguration
Serge Bazanski66e58952021-10-05 17:06:56 +0200631}
632
// Cluster is the running Metropolis cluster launched using the LaunchCluster
// function.
type Cluster struct {
	// Owner is the TLS Certificate of the owner of the test cluster. This can be
	// used to authenticate further clients to the running cluster.
	Owner tls.Certificate
	// Ports is the PortMap used to access the first node's services (defined in
	// ClusterPorts) and the SOCKS proxy (at SOCKSPort).
	Ports qemu.PortMap

	// Nodes is a map from Node ID to its runtime information.
	Nodes map[string]*NodeInCluster
	// NodeIDs is a list of node IDs that are backing this cluster, in order of
	// creation.
	NodeIDs []string

	// CACertificate is the cluster's CA certificate.
	CACertificate *x509.Certificate

	// nodesDone is a list of channels populated with the return codes from all the
	// nodes' qemu instances. It's used by Close to ensure all nodes have
	// successfully been stopped.
	nodesDone []chan error
	// nodeOpts are the cluster member nodes' mutable launch options, kept here
	// to facilitate reboots.
	nodeOpts []NodeOptions
	// launchDir points at the directory keeping the nodes' state, such as storage
	// images, firmware variable files, TPM state.
	launchDir string
	// socketDir points at the directory keeping UNIX socket files, such as these
	// used to facilitate communication between QEMU and swtpm. It's different
	// from launchDir, and anchored nearer the file system root, due to the
	// socket path length limitation imposed by the kernel.
	socketDir string
	// metroctlDir points at the metroctl configuration directory, into which
	// LaunchCluster writes the owner key and owner certificate.
	metroctlDir string

	// SOCKSDialer is used by DialNode to establish connections to nodes via the
	// SOCKS server run by nanoswitch.
	SOCKSDialer proxy.Dialer

	// authClient is a cached authenticated owner connection to a Curator
	// instance within the cluster.
	authClient *grpc.ClientConn

	// ctxT is the context individual node contexts are created from.
	ctxT context.Context
	// ctxC is used by Close to cancel the context under which the nodes are
	// running.
	ctxC context.CancelFunc

	// tpmFactory is the TPM factory created by LaunchCluster. It is handed to
	// LaunchNode every time a node is started or restarted.
	tpmFactory *TPMFactory
}
685
// NodeInCluster represents information about a node that's part of a Cluster.
// Values of this type are stored in Cluster.Nodes, keyed by node ID.
type NodeInCluster struct {
	// ID of the node, which can be used to dial this node's services via
	// NewNodeClient.
	ID string
	// Pubkey is the node's public key as reported by the cluster's management
	// API. LaunchCluster only populates it for nodes other than the first one.
	Pubkey []byte
	// Address of the node on the network run by nanoswitch. Not reachable from
	// the host unless dialed via NewNodeClient or via the nanoswitch SOCKS
	// proxy (reachable on Cluster.Ports[SOCKSPort]).
	ManagementAddress string
}
697
698// firstConnection performs the initial owner credential escrow with a newly
699// started nanoswitch-backed cluster over SOCKS. It expects the first node to be
700// running at 10.1.0.2, which is always the case with the current nanoswitch
701// implementation.
702//
Leopold20a036e2023-01-15 00:17:19 +0100703// It returns the newly escrowed credentials as well as the first node's
Serge Bazanskibe742842022-04-04 13:18:50 +0200704// information as NodeInCluster.
705func firstConnection(ctx context.Context, socksDialer proxy.Dialer) (*tls.Certificate, *NodeInCluster, error) {
706 // Dial external service.
707 remote := fmt.Sprintf("10.1.0.2:%s", node.CuratorServicePort.PortString())
Serge Bazanski0c280152024-02-05 14:33:19 +0100708 initCreds, err := rpc.NewEphemeralCredentials(InsecurePrivateKey, rpc.WantInsecure())
Serge Bazanskibe742842022-04-04 13:18:50 +0200709 if err != nil {
710 return nil, nil, fmt.Errorf("NewEphemeralCredentials: %w", err)
711 }
712 initDialer := func(_ context.Context, addr string) (net.Conn, error) {
713 return socksDialer.Dial("tcp", addr)
714 }
Tim Windelschmidt9bd9bd42025-02-14 17:08:52 +0100715 initClient, err := grpc.NewClient(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(initCreds))
Serge Bazanskibe742842022-04-04 13:18:50 +0200716 if err != nil {
Tim Windelschmidt9bd9bd42025-02-14 17:08:52 +0100717 return nil, nil, fmt.Errorf("creating client with ephemeral credentials failed: %w", err)
Serge Bazanskibe742842022-04-04 13:18:50 +0200718 }
719 defer initClient.Close()
720
721 // Retrieve owner certificate - this can take a while because the node is still
722 // coming up, so do it in a backoff loop.
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100723 logf("Cluster: retrieving owner certificate (this can take a few seconds while the first node boots)...")
Serge Bazanskibe742842022-04-04 13:18:50 +0200724 aaa := apb.NewAAAClient(initClient)
725 var cert *tls.Certificate
726 err = backoff.Retry(func() error {
727 cert, err = rpc.RetrieveOwnerCertificate(ctx, aaa, InsecurePrivateKey)
728 if st, ok := status.FromError(err); ok {
729 if st.Code() == codes.Unavailable {
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100730 logf("Cluster: cluster UNAVAILABLE: %v", st.Message())
Serge Bazanskibe742842022-04-04 13:18:50 +0200731 return err
732 }
733 }
734 return backoff.Permanent(err)
Serge Bazanski62e6f0b2024-09-03 12:18:56 +0200735 }, backoff.WithContext(backoff.NewExponentialBackOff(backoff.WithMaxElapsedTime(time.Minute)), ctx))
Serge Bazanskibe742842022-04-04 13:18:50 +0200736 if err != nil {
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200737 return nil, nil, fmt.Errorf("couldn't retrieve owner certificate: %w", err)
Serge Bazanskibe742842022-04-04 13:18:50 +0200738 }
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100739 logf("Cluster: retrieved owner certificate.")
Serge Bazanskibe742842022-04-04 13:18:50 +0200740
741 // Now connect authenticated and get the node ID.
Serge Bazanski8535cb52023-03-29 14:15:08 +0200742 creds := rpc.NewAuthenticatedCredentials(*cert, rpc.WantInsecure())
Tim Windelschmidt9bd9bd42025-02-14 17:08:52 +0100743 authClient, err := grpc.NewClient(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(creds))
Serge Bazanskibe742842022-04-04 13:18:50 +0200744 if err != nil {
Tim Windelschmidt9bd9bd42025-02-14 17:08:52 +0100745 return nil, nil, fmt.Errorf("creating client with owner credentials failed: %w", err)
Serge Bazanskibe742842022-04-04 13:18:50 +0200746 }
747 defer authClient.Close()
748 mgmt := apb.NewManagementClient(authClient)
749
750 var node *NodeInCluster
751 err = backoff.Retry(func() error {
752 nodes, err := getNodes(ctx, mgmt)
753 if err != nil {
754 return fmt.Errorf("retrieving nodes failed: %w", err)
755 }
756 if len(nodes) != 1 {
757 return fmt.Errorf("expected one node, got %d", len(nodes))
758 }
759 n := nodes[0]
760 if n.Status == nil || n.Status.ExternalAddress == "" {
761 return fmt.Errorf("node has no status and/or address")
762 }
763 node = &NodeInCluster{
Jan Schär39d9c242024-09-24 13:49:55 +0200764 ID: n.Id,
Serge Bazanskibe742842022-04-04 13:18:50 +0200765 ManagementAddress: n.Status.ExternalAddress,
766 }
767 return nil
768 }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
769 if err != nil {
770 return nil, nil, err
771 }
772
773 return cert, node, nil
Serge Bazanski66e58952021-10-05 17:06:56 +0200774}
775
// NewSerialFileLogger opens the file at path p, creating it with mode 0600 if
// it does not exist yet, and returns it for use as a serial port log sink.
//
// The file is opened write-only and in append mode, so data already present in
// an existing file is preserved and new output is added after it instead of
// overwriting the file from its beginning. As the file is not opened for
// reading, only the write half of the returned io.ReadWriter is usable.
//
// The caller owns the returned file; the concrete value is an *os.File.
func NewSerialFileLogger(p string) (io.ReadWriter, error) {
	f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0o600)
	if err != nil {
		return nil, err
	}
	return f, nil
}
783
Serge Bazanski66e58952021-10-05 17:06:56 +0200784// LaunchCluster launches a cluster of Metropolis node VMs together with a
785// Nanoswitch instance to network them all together.
786//
787// The given context will be used to run all qemu instances in the cluster, and
788// canceling the context or calling Close() will terminate them.
789func LaunchCluster(ctx context.Context, opts ClusterOptions) (*Cluster, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200790 if opts.NumNodes <= 0 {
Serge Bazanski66e58952021-10-05 17:06:56 +0200791 return nil, errors.New("refusing to start cluster with zero nodes")
792 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200793
Jan Schära9b060b2024-08-07 10:42:29 +0200794 // Prepare the node options. These will be kept as part of Cluster.
795 // nodeOpts[].Runtime will be initialized by LaunchNode during the first
796 // launch. The runtime information can be later used to restart a node.
797 // The 0th node will be initialized first. The rest will follow after it
798 // had bootstrapped the cluster.
799 nodeOpts := make([]NodeOptions, opts.NumNodes)
800 for i := range opts.NumNodes {
801 nodeOpts[i] = opts.Node
802 nodeOpts[i].Name = fmt.Sprintf("node%d", i)
803 nodeOpts[i].SerialPort = newPrefixedStdio(i)
804 }
805 nodeOpts[0].NodeParameters = &apb.NodeParameters{
806 Cluster: &apb.NodeParameters_ClusterBootstrap_{
807 ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
808 OwnerPublicKey: InsecurePublicKey,
809 InitialClusterConfiguration: opts.InitialClusterConfiguration,
810 Labels: &cpb.NodeLabels{
811 Pairs: []*cpb.NodeLabels_Pair{
Serge Bazanski20498dd2024-09-30 17:07:08 +0000812 {Key: NodeNumberKey, Value: "0"},
Jan Schära9b060b2024-08-07 10:42:29 +0200813 },
814 },
815 },
816 },
817 }
818 nodeOpts[0].PcapDump = true
819
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200820 // Create the launch directory.
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100821 ld, err := os.MkdirTemp(os.Getenv("TEST_TMPDIR"), "cluster-*")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200822 if err != nil {
823 return nil, fmt.Errorf("failed to create the launch directory: %w", err)
824 }
Tim Windelschmidt1fe3f942025-04-01 14:22:11 +0200825
826 nodeLogDir := ld
827 if os.Getenv("TEST_UNDECLARED_OUTPUTS_DIR") != "" {
828 nodeLogDir = os.Getenv("TEST_UNDECLARED_OUTPUTS_DIR")
829 }
830
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100831 // Create the metroctl config directory. We keep it in /tmp because in some
832 // scenarios it's end-user visible and we want it short.
833 md, err := os.MkdirTemp("/tmp", "metroctl-*")
834 if err != nil {
835 return nil, fmt.Errorf("failed to create the metroctl directory: %w", err)
836 }
837
838 // Create the socket directory. We keep it in /tmp because of socket path limits.
839 sd, err := os.MkdirTemp("/tmp", "cluster-*")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200840 if err != nil {
841 return nil, fmt.Errorf("failed to create the socket directory: %w", err)
842 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200843
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200844 // Set up TPM factory.
845 tpmf, err := NewTPMFactory(filepath.Join(ld, "tpm"))
846 if err != nil {
847 return nil, fmt.Errorf("failed to create TPM factory: %w", err)
848 }
849
Serge Bazanski66e58952021-10-05 17:06:56 +0200850 // Prepare links between nodes and nanoswitch.
851 var switchPorts []*os.File
Jan Schära9b060b2024-08-07 10:42:29 +0200852 for i := range opts.NumNodes {
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100853 switchPort, vmPort, err := qemu.NewSocketPair()
Serge Bazanski66e58952021-10-05 17:06:56 +0200854 if err != nil {
Serge Bazanski66e58952021-10-05 17:06:56 +0200855 return nil, fmt.Errorf("failed to get socketpair: %w", err)
856 }
857 switchPorts = append(switchPorts, switchPort)
Jan Schära9b060b2024-08-07 10:42:29 +0200858 nodeOpts[i].ConnectToSocket = vmPort
Serge Bazanski66e58952021-10-05 17:06:56 +0200859 }
860
Serge Bazanskie78a0892021-10-07 17:03:49 +0200861 // Make a list of channels that will be populated by all running node qemu
862 // processes.
Serge Bazanski66e58952021-10-05 17:06:56 +0200863 done := make([]chan error, opts.NumNodes)
Lorenz Brun150f24a2023-07-13 20:11:06 +0200864 for i := range done {
Serge Bazanski66e58952021-10-05 17:06:56 +0200865 done[i] = make(chan error, 1)
866 }
867
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100868 if opts.NodeLogsToFiles {
Jan Schära9b060b2024-08-07 10:42:29 +0200869 for i := range opts.NumNodes {
Lorenz Brun4beaf4f2025-01-14 16:10:55 +0100870 path := path.Join(nodeLogDir, fmt.Sprintf("node-%d.txt", i))
Jan Schära9b060b2024-08-07 10:42:29 +0200871 port, err := NewSerialFileLogger(path)
872 if err != nil {
873 return nil, fmt.Errorf("could not open log file for node %d: %w", i, err)
874 }
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100875 logf("Node %d logs at %s", i, path)
Jan Schära9b060b2024-08-07 10:42:29 +0200876 nodeOpts[i].SerialPort = port
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100877 }
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100878 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200879
880 // Start the first node.
881 ctxT, ctxC := context.WithCancel(ctx)
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100882 logf("Cluster: Starting node %d...", 0)
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200883 if err := LaunchNode(ctxT, ld, sd, tpmf, &nodeOpts[0], done[0]); err != nil {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200884 ctxC()
885 return nil, fmt.Errorf("failed to launch first node: %w", err)
886 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200887
Lorenz Brun150f24a2023-07-13 20:11:06 +0200888 localRegistryAddr := net.TCPAddr{
889 IP: net.IPv4(10, 42, 0, 82),
890 Port: 5000,
891 }
892
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100893 var guestSvcMap qemu.GuestServiceMap
Lorenz Brun150f24a2023-07-13 20:11:06 +0200894 if opts.LocalRegistry != nil {
895 l, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1)})
896 if err != nil {
897 ctxC()
898 return nil, fmt.Errorf("failed to create TCP listener for local registry: %w", err)
899 }
900 s := http.Server{
901 Handler: opts.LocalRegistry,
902 }
903 go s.Serve(l)
904 go func() {
905 <-ctxT.Done()
906 s.Close()
907 }()
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100908 guestSvcMap = qemu.GuestServiceMap{
Lorenz Brun150f24a2023-07-13 20:11:06 +0200909 &localRegistryAddr: *l.Addr().(*net.TCPAddr),
910 }
911 }
912
Serge Bazanskie78a0892021-10-07 17:03:49 +0200913 // Launch nanoswitch.
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100914 portMap, err := qemu.ConflictFreePortMap(ClusterPorts)
Serge Bazanski66e58952021-10-05 17:06:56 +0200915 if err != nil {
916 ctxC()
917 return nil, fmt.Errorf("failed to allocate ephemeral ports: %w", err)
918 }
919
920 go func() {
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100921 var serialPort io.ReadWriter
Tim Windelschmidta5b00bd2024-12-09 22:52:31 +0100922 var err error
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100923 if opts.NodeLogsToFiles {
Tim Windelschmidt1fe3f942025-04-01 14:22:11 +0200924
925 loggerPath := path.Join(nodeLogDir, "nanoswitch.txt")
Tim Windelschmidta5b00bd2024-12-09 22:52:31 +0100926 serialPort, err = NewSerialFileLogger(loggerPath)
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100927 if err != nil {
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100928 logf("Could not open log file for nanoswitch: %v", err)
929 os.Exit(1)
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100930 }
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100931 logf("Nanoswitch logs at %s", loggerPath)
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100932 } else {
933 serialPort = newPrefixedStdio(99)
934 }
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100935 if err := qemu.RunMicroVM(ctxT, &qemu.MicroVMOptions{
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100936 Name: "nanoswitch",
Tim Windelschmidt82e6af72024-07-23 00:05:42 +0000937 KernelPath: xKernelPath,
938 InitramfsPath: xInitramfsPath,
Serge Bazanski66e58952021-10-05 17:06:56 +0200939 ExtraNetworkInterfaces: switchPorts,
940 PortMap: portMap,
Lorenz Brun150f24a2023-07-13 20:11:06 +0200941 GuestServiceMap: guestSvcMap,
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100942 SerialPort: serialPort,
Tim Windelschmidt1fe3f942025-04-01 14:22:11 +0200943 PcapDump: path.Join(nodeLogDir, "nanoswitch.pcap"),
Serge Bazanski66e58952021-10-05 17:06:56 +0200944 }); err != nil {
945 if !errors.Is(err, ctxT.Err()) {
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100946 logf("Failed to launch nanoswitch: %v", err)
947 os.Exit(1)
Serge Bazanski66e58952021-10-05 17:06:56 +0200948 }
949 }
950 }()
951
Serge Bazanskibe742842022-04-04 13:18:50 +0200952 // Build SOCKS dialer.
953 socksRemote := fmt.Sprintf("localhost:%v", portMap[SOCKSPort])
954 socksDialer, err := proxy.SOCKS5("tcp", socksRemote, nil, proxy.Direct)
Serge Bazanski66e58952021-10-05 17:06:56 +0200955 if err != nil {
956 ctxC()
Serge Bazanskibe742842022-04-04 13:18:50 +0200957 return nil, fmt.Errorf("failed to build SOCKS dialer: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200958 }
959
Serge Bazanskibe742842022-04-04 13:18:50 +0200960 // Retrieve owner credentials and first node.
961 cert, firstNode, err := firstConnection(ctxT, socksDialer)
Serge Bazanski66e58952021-10-05 17:06:56 +0200962 if err != nil {
963 ctxC()
964 return nil, err
965 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200966
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100967 // Write credentials to the metroctl directory.
968 if err := metroctl.WriteOwnerKey(md, cert.PrivateKey.(ed25519.PrivateKey)); err != nil {
969 ctxC()
970 return nil, fmt.Errorf("could not write owner key: %w", err)
971 }
972 if err := metroctl.WriteOwnerCertificate(md, cert.Certificate[0]); err != nil {
973 ctxC()
974 return nil, fmt.Errorf("could not write owner certificate: %w", err)
975 }
976
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +0100977 logf("Cluster: Node %d is %s", 0, firstNode.ID)
Serge Bazanski53458ba2024-06-18 09:56:46 +0000978
979 // Set up a partially initialized cluster instance, to be filled in the
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200980 // later steps.
Serge Bazanskibe742842022-04-04 13:18:50 +0200981 cluster := &Cluster{
982 Owner: *cert,
983 Ports: portMap,
984 Nodes: map[string]*NodeInCluster{
985 firstNode.ID: firstNode,
986 },
987 NodeIDs: []string{
988 firstNode.ID,
989 },
990
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100991 nodesDone: done,
992 nodeOpts: nodeOpts,
993 launchDir: ld,
994 socketDir: sd,
995 metroctlDir: md,
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200996
Lorenz Brun276a7462023-07-12 21:28:54 +0200997 SOCKSDialer: socksDialer,
Serge Bazanskibe742842022-04-04 13:18:50 +0200998
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200999 ctxT: ctxT,
Serge Bazanskibe742842022-04-04 13:18:50 +02001000 ctxC: ctxC,
Serge Bazanski2b6dc312024-06-04 17:44:55 +02001001
1002 tpmFactory: tpmf,
Serge Bazanskibe742842022-04-04 13:18:50 +02001003 }
1004
1005 // Now start the rest of the nodes and register them into the cluster.
1006
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001007 // Get an authenticated owner client within the cluster.
Serge Bazanski5bb8a332022-06-23 17:41:33 +02001008 curC, err := cluster.CuratorClient()
Serge Bazanski66e58952021-10-05 17:06:56 +02001009 if err != nil {
1010 ctxC()
Serge Bazanski5bb8a332022-06-23 17:41:33 +02001011 return nil, fmt.Errorf("CuratorClient: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +02001012 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001013 mgmt := apb.NewManagementClient(curC)
Serge Bazanskie78a0892021-10-07 17:03:49 +02001014
1015 // Retrieve register ticket to register further nodes.
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001016 logf("Cluster: retrieving register ticket...")
Serge Bazanskie78a0892021-10-07 17:03:49 +02001017 resT, err := mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{})
1018 if err != nil {
1019 ctxC()
1020 return nil, fmt.Errorf("GetRegisterTicket: %w", err)
1021 }
1022 ticket := resT.Ticket
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001023 logf("Cluster: retrieved register ticket (%d bytes).", len(ticket))
Serge Bazanskie78a0892021-10-07 17:03:49 +02001024
1025 // Retrieve cluster info (for directory and ca public key) to register further
1026 // nodes.
1027 resI, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
1028 if err != nil {
1029 ctxC()
1030 return nil, fmt.Errorf("GetClusterInfo: %w", err)
1031 }
Serge Bazanski54e212a2023-06-14 13:45:11 +02001032 caCert, err := x509.ParseCertificate(resI.CaCertificate)
1033 if err != nil {
1034 ctxC()
1035 return nil, fmt.Errorf("ParseCertificate: %w", err)
1036 }
1037 cluster.CACertificate = caCert
Serge Bazanskie78a0892021-10-07 17:03:49 +02001038
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001039 // Use the retrieved information to configure the rest of the node options.
1040 for i := 1; i < opts.NumNodes; i++ {
Jan Schära9b060b2024-08-07 10:42:29 +02001041 nodeOpts[i].NodeParameters = &apb.NodeParameters{
1042 Cluster: &apb.NodeParameters_ClusterRegister_{
1043 ClusterRegister: &apb.NodeParameters_ClusterRegister{
1044 RegisterTicket: ticket,
1045 ClusterDirectory: resI.ClusterDirectory,
1046 CaCertificate: resI.CaCertificate,
1047 Labels: &cpb.NodeLabels{
1048 Pairs: []*cpb.NodeLabels_Pair{
Serge Bazanski20498dd2024-09-30 17:07:08 +00001049 {Key: NodeNumberKey, Value: fmt.Sprintf("%d", i)},
Serge Bazanski30e30b32024-05-22 14:11:56 +02001050 },
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001051 },
1052 },
1053 },
Serge Bazanskid09c58f2023-03-17 00:25:08 +01001054 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001055 }
1056
1057 // Now run the rest of the nodes.
Serge Bazanskie78a0892021-10-07 17:03:49 +02001058 for i := 1; i < opts.NumNodes; i++ {
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001059 logf("Cluster: Starting node %d...", i)
Serge Bazanski2b6dc312024-06-04 17:44:55 +02001060 err := LaunchNode(ctxT, ld, sd, tpmf, &nodeOpts[i], done[i])
Serge Bazanskiee8c81b2024-04-03 11:59:38 +02001061 if err != nil {
Jan Schär0b927652024-07-31 18:08:50 +02001062 return nil, fmt.Errorf("failed to launch node %d: %w", i, err)
Serge Bazanskiee8c81b2024-04-03 11:59:38 +02001063 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001064 }
Serge Bazanskie78a0892021-10-07 17:03:49 +02001065
Serge Bazanski53458ba2024-06-18 09:56:46 +00001066 // Wait for nodes to appear as NEW, populate a map from node number (index into
Jan Schära9b060b2024-08-07 10:42:29 +02001067 // nodeOpts, etc.) to Metropolis Node ID.
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001068 seenNodes := make(map[string]bool)
Serge Bazanski53458ba2024-06-18 09:56:46 +00001069 nodeNumberToID := make(map[int]string)
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001070 logf("Cluster: waiting for nodes to appear as NEW...")
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001071 for i := 1; i < opts.NumNodes; i++ {
Serge Bazanskie78a0892021-10-07 17:03:49 +02001072 for {
1073 nodes, err := getNodes(ctx, mgmt)
1074 if err != nil {
1075 ctxC()
1076 return nil, fmt.Errorf("could not get nodes: %w", err)
1077 }
1078 for _, n := range nodes {
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001079 if n.State != cpb.NodeState_NODE_STATE_NEW {
1080 continue
Serge Bazanskie78a0892021-10-07 17:03:49 +02001081 }
Serge Bazanski87d9c592024-03-20 12:35:11 +01001082 if seenNodes[n.Id] {
1083 continue
1084 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001085 seenNodes[n.Id] = true
1086 cluster.Nodes[n.Id] = &NodeInCluster{
1087 ID: n.Id,
1088 Pubkey: n.Pubkey,
1089 }
Serge Bazanski53458ba2024-06-18 09:56:46 +00001090
Serge Bazanski20498dd2024-09-30 17:07:08 +00001091 num, err := strconv.Atoi(node.GetNodeLabel(n.Labels, NodeNumberKey))
Serge Bazanski53458ba2024-06-18 09:56:46 +00001092 if err != nil {
1093 return nil, fmt.Errorf("node %s has undecodable number label: %w", n.Id, err)
1094 }
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001095 logf("Cluster: Node %d is %s", num, n.Id)
Serge Bazanski53458ba2024-06-18 09:56:46 +00001096 nodeNumberToID[num] = n.Id
Serge Bazanskie78a0892021-10-07 17:03:49 +02001097 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001098
1099 if len(seenNodes) == opts.NumNodes-1 {
Serge Bazanskie78a0892021-10-07 17:03:49 +02001100 break
1101 }
1102 time.Sleep(1 * time.Second)
1103 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001104 }
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001105 logf("Found all expected nodes")
Serge Bazanskie78a0892021-10-07 17:03:49 +02001106
Serge Bazanski53458ba2024-06-18 09:56:46 +00001107 // Build the rest of NodeIDs from map.
1108 for i := 1; i < opts.NumNodes; i++ {
1109 cluster.NodeIDs = append(cluster.NodeIDs, nodeNumberToID[i])
1110 }
1111
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001112 approvedNodes := make(map[string]bool)
1113 upNodes := make(map[string]bool)
1114 if !opts.LeaveNodesNew {
Serge Bazanskie78a0892021-10-07 17:03:49 +02001115 for {
1116 nodes, err := getNodes(ctx, mgmt)
1117 if err != nil {
1118 ctxC()
1119 return nil, fmt.Errorf("could not get nodes: %w", err)
1120 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001121 for _, node := range nodes {
1122 if !seenNodes[node.Id] {
1123 // Skip nodes that weren't NEW in the previous step.
Serge Bazanskie78a0892021-10-07 17:03:49 +02001124 continue
1125 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001126
1127 if node.State == cpb.NodeState_NODE_STATE_UP && node.Status != nil && node.Status.ExternalAddress != "" {
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001128 logf("Cluster: node %s is up", node.Id)
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001129 upNodes[node.Id] = true
1130 cluster.Nodes[node.Id].ManagementAddress = node.Status.ExternalAddress
Serge Bazanskie78a0892021-10-07 17:03:49 +02001131 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001132 if upNodes[node.Id] {
1133 continue
Serge Bazanskibe742842022-04-04 13:18:50 +02001134 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001135
1136 if !approvedNodes[node.Id] {
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001137 logf("Cluster: approving node %s", node.Id)
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001138 _, err := mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
1139 Pubkey: node.Pubkey,
1140 })
1141 if err != nil {
1142 ctxC()
1143 return nil, fmt.Errorf("ApproveNode(%s): %w", node.Id, err)
1144 }
1145 approvedNodes[node.Id] = true
Serge Bazanskibe742842022-04-04 13:18:50 +02001146 }
Serge Bazanskie78a0892021-10-07 17:03:49 +02001147 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001148
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001149 logf("Cluster: want %d up nodes, have %d", opts.NumNodes, len(upNodes)+1)
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001150 if len(upNodes) == opts.NumNodes-1 {
Serge Bazanskie78a0892021-10-07 17:03:49 +02001151 break
1152 }
Serge Bazanskibe742842022-04-04 13:18:50 +02001153 time.Sleep(time.Second)
Serge Bazanskie78a0892021-10-07 17:03:49 +02001154 }
Serge Bazanskie78a0892021-10-07 17:03:49 +02001155 }
Serge Bazanski66e58952021-10-05 17:06:56 +02001156
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001157 logf("Cluster: all nodes up:")
Jan Schär0b927652024-07-31 18:08:50 +02001158 for i, nodeID := range cluster.NodeIDs {
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001159 logf("Cluster: %d. %s at %s", i, nodeID, cluster.Nodes[nodeID].ManagementAddress)
Serge Bazanskibe742842022-04-04 13:18:50 +02001160 }
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001161 logf("Cluster: starting tests...")
Serge Bazanski66e58952021-10-05 17:06:56 +02001162
Serge Bazanskibe742842022-04-04 13:18:50 +02001163 return cluster, nil
Serge Bazanski66e58952021-10-05 17:06:56 +02001164}
1165
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001166// RebootNode reboots the cluster member node matching the given index, and
1167// waits for it to rejoin the cluster. It will use the given context ctx to run
1168// cluster API requests, whereas the resulting QEMU process will be created
1169// using the cluster's context c.ctxT. The nodes are indexed starting at 0.
1170func (c *Cluster) RebootNode(ctx context.Context, idx int) error {
1171 if idx < 0 || idx >= len(c.NodeIDs) {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +02001172 return fmt.Errorf("index out of bounds")
1173 }
1174 if c.nodeOpts[idx].Runtime == nil {
1175 return fmt.Errorf("node not running")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001176 }
1177 id := c.NodeIDs[idx]
1178
1179 // Get an authenticated owner client within the cluster.
Serge Bazanski5bb8a332022-06-23 17:41:33 +02001180 curC, err := c.CuratorClient()
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001181 if err != nil {
1182 return err
1183 }
1184 mgmt := apb.NewManagementClient(curC)
1185
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001186 // Cancel the node's context. This will shut down QEMU.
1187 c.nodeOpts[idx].Runtime.CtxC()
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001188 logf("Cluster: waiting for node %d (%s) to stop.", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001189 err = <-c.nodesDone[idx]
1190 if err != nil {
1191 return fmt.Errorf("while restarting node: %w", err)
1192 }
1193
1194 // Start QEMU again.
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001195 logf("Cluster: restarting node %d (%s).", idx, id)
Serge Bazanski2b6dc312024-06-04 17:44:55 +02001196 if err := LaunchNode(c.ctxT, c.launchDir, c.socketDir, c.tpmFactory, &c.nodeOpts[idx], c.nodesDone[idx]); err != nil {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +02001197 return fmt.Errorf("failed to launch node %d: %w", idx, err)
1198 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001199
Serge Bazanskibc969572024-03-21 11:56:13 +01001200 start := time.Now()
1201
1202 // Poll Management.GetNodes until the node is healthy.
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001203 for {
1204 cs, err := getNode(ctx, mgmt, id)
1205 if err != nil {
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001206 logf("Cluster: node get error: %v", err)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001207 return err
1208 }
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001209 logf("Cluster: node health: %+v", cs.Health)
Serge Bazanskibc969572024-03-21 11:56:13 +01001210
1211 lhb := time.Now().Add(-cs.TimeSinceHeartbeat.AsDuration())
Tim Windelschmidta10d0cb2025-01-13 14:44:15 +01001212 if lhb.After(start) && cs.Health == apb.Node_HEALTH_HEALTHY {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001213 break
1214 }
1215 time.Sleep(time.Second)
1216 }
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001217 logf("Cluster: node %d (%s) has rejoined the cluster.", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001218 return nil
1219}
1220
Serge Bazanski500f6e02024-04-03 12:06:40 +02001221// ShutdownNode performs an ungraceful shutdown (i.e. power off) of the node
1222// given by idx. If the node is already shut down, this is a no-op.
1223func (c *Cluster) ShutdownNode(idx int) error {
1224 if idx < 0 || idx >= len(c.NodeIDs) {
1225 return fmt.Errorf("index out of bounds")
1226 }
1227 // Return if node is already stopped.
1228 select {
1229 case <-c.nodeOpts[idx].Runtime.ctxT.Done():
1230 return nil
1231 default:
1232 }
1233 id := c.NodeIDs[idx]
1234
1235 // Cancel the node's context. This will shut down QEMU.
1236 c.nodeOpts[idx].Runtime.CtxC()
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001237 logf("Cluster: waiting for node %d (%s) to stop.", idx, id)
Serge Bazanski500f6e02024-04-03 12:06:40 +02001238 err := <-c.nodesDone[idx]
1239 if err != nil {
1240 return fmt.Errorf("while shutting down node: %w", err)
1241 }
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001242 logf("Cluster: node %d (%s) stopped.", idx, id)
Serge Bazanski500f6e02024-04-03 12:06:40 +02001243 return nil
1244}
1245
1246// StartNode performs a power on of the node given by idx. If the node is already
1247// running, this is a no-op.
1248func (c *Cluster) StartNode(idx int) error {
1249 if idx < 0 || idx >= len(c.NodeIDs) {
1250 return fmt.Errorf("index out of bounds")
1251 }
1252 id := c.NodeIDs[idx]
1253 // Return if node is already running.
1254 select {
1255 case <-c.nodeOpts[idx].Runtime.ctxT.Done():
1256 default:
1257 return nil
1258 }
1259
1260 // Start QEMU again.
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001261 logf("Cluster: starting node %d (%s).", idx, id)
Serge Bazanski2b6dc312024-06-04 17:44:55 +02001262 if err := LaunchNode(c.ctxT, c.launchDir, c.socketDir, c.tpmFactory, &c.nodeOpts[idx], c.nodesDone[idx]); err != nil {
Serge Bazanski500f6e02024-04-03 12:06:40 +02001263 return fmt.Errorf("failed to launch node %d: %w", idx, err)
1264 }
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001265 logf("Cluster: node %d (%s) started.", idx, id)
Serge Bazanski500f6e02024-04-03 12:06:40 +02001266 return nil
1267}
1268
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001269// Close cancels the running clusters' context and waits for all virtualized
Serge Bazanski66e58952021-10-05 17:06:56 +02001270// nodes to stop. It returns an error if stopping the nodes failed, or one of
1271// the nodes failed to fully start in the first place.
1272func (c *Cluster) Close() error {
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001273 logf("Cluster: stopping...")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001274 if c.authClient != nil {
1275 c.authClient.Close()
1276 }
Serge Bazanski66e58952021-10-05 17:06:56 +02001277 c.ctxC()
1278
Leopold20a036e2023-01-15 00:17:19 +01001279 var errs []error
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001280 logf("Cluster: waiting for nodes to exit...")
Serge Bazanski66e58952021-10-05 17:06:56 +02001281 for _, c := range c.nodesDone {
1282 err := <-c
1283 if err != nil {
Leopold20a036e2023-01-15 00:17:19 +01001284 errs = append(errs, err)
Serge Bazanski66e58952021-10-05 17:06:56 +02001285 }
1286 }
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001287 logf("Cluster: removing nodes' state files (%s) and sockets (%s).", c.launchDir, c.socketDir)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001288 os.RemoveAll(c.launchDir)
1289 os.RemoveAll(c.socketDir)
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001290 os.RemoveAll(c.metroctlDir)
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001291 logf("Cluster: done")
Leopold20a036e2023-01-15 00:17:19 +01001292 return multierr.Combine(errs...)
Serge Bazanski66e58952021-10-05 17:06:56 +02001293}
Serge Bazanskibe742842022-04-04 13:18:50 +02001294
1295// DialNode is a grpc.WithContextDialer compatible dialer which dials nodes by
1296// their ID. This is performed by connecting to the cluster nanoswitch via its
1297// SOCKS proxy, and using the cluster node list for name resolution.
1298//
1299// For example:
1300//
Tim Windelschmidt9bd9bd42025-02-14 17:08:52 +01001301// grpc.NewClient("passthrough:///metropolis-deadbeef:1234", grpc.WithContextDialer(c.DialNode))
Serge Bazanskibe742842022-04-04 13:18:50 +02001302func (c *Cluster) DialNode(_ context.Context, addr string) (net.Conn, error) {
1303 host, port, err := net.SplitHostPort(addr)
1304 if err != nil {
1305 return nil, fmt.Errorf("invalid host:port: %w", err)
1306 }
1307 // Already an IP address?
1308 if net.ParseIP(host) != nil {
Lorenz Brun276a7462023-07-12 21:28:54 +02001309 return c.SOCKSDialer.Dial("tcp", addr)
Serge Bazanskibe742842022-04-04 13:18:50 +02001310 }
1311
1312 // Otherwise, expect a node name.
1313 node, ok := c.Nodes[host]
1314 if !ok {
1315 return nil, fmt.Errorf("unknown node %q", host)
1316 }
1317 addr = net.JoinHostPort(node.ManagementAddress, port)
Lorenz Brun276a7462023-07-12 21:28:54 +02001318 return c.SOCKSDialer.Dial("tcp", addr)
Serge Bazanskibe742842022-04-04 13:18:50 +02001319}
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001320
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001321// GetKubeClientSet gets a Kubernetes client set accessing the Metropolis
1322// Kubernetes authenticating proxy using the cluster owner identity.
1323// It currently has access to everything (i.e. the cluster-admin role)
1324// via the owner-admin binding.
Lorenz Brun8f1254d2025-01-28 14:10:05 +01001325func (c *Cluster) GetKubeClientSet() (kubernetes.Interface, *rest.Config, error) {
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001326 pkcs8Key, err := x509.MarshalPKCS8PrivateKey(c.Owner.PrivateKey)
1327 if err != nil {
1328 // We explicitly pass an Ed25519 private key in, so this can't happen
1329 panic(err)
1330 }
1331
1332 host := net.JoinHostPort(c.NodeIDs[0], node.KubernetesAPIWrappedPort.PortString())
Lorenz Brun150f24a2023-07-13 20:11:06 +02001333 clientConfig := rest.Config{
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001334 Host: host,
1335 TLSClientConfig: rest.TLSClientConfig{
1336 // TODO(q3k): use CA certificate
1337 Insecure: true,
1338 ServerName: "kubernetes.default.svc",
1339 CertData: pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: c.Owner.Certificate[0]}),
1340 KeyData: pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: pkcs8Key}),
1341 },
1342 Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
1343 return c.DialNode(ctx, address)
1344 },
1345 }
Lorenz Brun8f1254d2025-01-28 14:10:05 +01001346 clientSet, err := kubernetes.NewForConfig(&clientConfig)
1347 if err != nil {
1348 return nil, nil, err
1349 }
1350 return clientSet, &clientConfig, nil
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001351}
1352
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001353// KubernetesControllerNodeAddresses returns the list of IP addresses of nodes
1354// which are currently Kubernetes controllers, ie. run an apiserver. This list
1355// might be empty if no node is currently configured with the
1356// 'KubernetesController' node.
1357func (c *Cluster) KubernetesControllerNodeAddresses(ctx context.Context) ([]string, error) {
1358 curC, err := c.CuratorClient()
1359 if err != nil {
1360 return nil, err
1361 }
1362 mgmt := apb.NewManagementClient(curC)
1363 srv, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{
1364 Filter: "has(node.roles.kubernetes_controller)",
1365 })
1366 if err != nil {
1367 return nil, err
1368 }
1369 defer srv.CloseSend()
1370 var res []string
1371 for {
1372 n, err := srv.Recv()
1373 if err == io.EOF {
1374 break
1375 }
1376 if err != nil {
1377 return nil, err
1378 }
1379 if n.Status == nil || n.Status.ExternalAddress == "" {
1380 continue
1381 }
1382 res = append(res, n.Status.ExternalAddress)
1383 }
1384 return res, nil
1385}
Serge Bazanski630fb5c2023-04-06 10:50:24 +02001386
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001387// AllNodesHealthy returns nil if all the nodes in the cluster are seemingly
1388// healthy.
Serge Bazanski630fb5c2023-04-06 10:50:24 +02001389func (c *Cluster) AllNodesHealthy(ctx context.Context) error {
1390 // Get an authenticated owner client within the cluster.
1391 curC, err := c.CuratorClient()
1392 if err != nil {
1393 return err
1394 }
1395 mgmt := apb.NewManagementClient(curC)
1396 nodes, err := getNodes(ctx, mgmt)
1397 if err != nil {
1398 return err
1399 }
1400
1401 var unhealthy []string
1402 for _, node := range nodes {
Tim Windelschmidta10d0cb2025-01-13 14:44:15 +01001403 if node.Health == apb.Node_HEALTH_HEALTHY {
Serge Bazanski630fb5c2023-04-06 10:50:24 +02001404 continue
1405 }
1406 unhealthy = append(unhealthy, node.Id)
1407 }
1408 if len(unhealthy) == 0 {
1409 return nil
1410 }
1411 return fmt.Errorf("nodes unhealthy: %s", strings.Join(unhealthy, ", "))
1412}
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001413
1414// ApproveNode approves a node by ID, waiting for it to become UP.
1415func (c *Cluster) ApproveNode(ctx context.Context, id string) error {
1416 curC, err := c.CuratorClient()
1417 if err != nil {
1418 return err
1419 }
1420 mgmt := apb.NewManagementClient(curC)
1421
1422 _, err = mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
1423 Pubkey: c.Nodes[id].Pubkey,
1424 })
1425 if err != nil {
1426 return fmt.Errorf("ApproveNode: %w", err)
1427 }
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001428 logf("Cluster: %s: approved, waiting for UP", id)
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001429 for {
1430 nodes, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
1431 if err != nil {
1432 return fmt.Errorf("GetNodes: %w", err)
1433 }
1434 found := false
1435 for {
1436 node, err := nodes.Recv()
1437 if errors.Is(err, io.EOF) {
1438 break
1439 }
1440 if err != nil {
1441 return fmt.Errorf("Nodes.Recv: %w", err)
1442 }
1443 if node.Id != id {
1444 continue
1445 }
1446 if node.State != cpb.NodeState_NODE_STATE_UP {
1447 continue
1448 }
1449 found = true
1450 break
1451 }
1452 nodes.CloseSend()
1453
1454 if found {
1455 break
1456 }
1457 time.Sleep(time.Second)
1458 }
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001459 logf("Cluster: %s: UP", id)
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001460 return nil
1461}
1462
1463// MakeKubernetesWorker adds the KubernetesWorker role to a node by ID.
1464func (c *Cluster) MakeKubernetesWorker(ctx context.Context, id string) error {
1465 curC, err := c.CuratorClient()
1466 if err != nil {
1467 return err
1468 }
1469 mgmt := apb.NewManagementClient(curC)
1470
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001471 logf("Cluster: %s: adding KubernetesWorker", id)
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001472 _, err = mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
1473 Node: &apb.UpdateNodeRolesRequest_Id{
1474 Id: id,
1475 },
Jan Schärd1a8b642024-12-03 17:40:41 +01001476 KubernetesWorker: ptr.To(true),
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001477 })
1478 return err
1479}
Serge Bazanski37cfcc12024-03-21 11:59:07 +01001480
Jan Schära9b060b2024-08-07 10:42:29 +02001481// MakeKubernetesController adds the KubernetesController role to a node by ID.
1482func (c *Cluster) MakeKubernetesController(ctx context.Context, id string) error {
1483 curC, err := c.CuratorClient()
1484 if err != nil {
1485 return err
1486 }
1487 mgmt := apb.NewManagementClient(curC)
1488
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001489 logf("Cluster: %s: adding KubernetesController", id)
Jan Schära9b060b2024-08-07 10:42:29 +02001490 _, err = mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
1491 Node: &apb.UpdateNodeRolesRequest_Id{
1492 Id: id,
1493 },
Jan Schärd1a8b642024-12-03 17:40:41 +01001494 KubernetesController: ptr.To(true),
Jan Schära9b060b2024-08-07 10:42:29 +02001495 })
1496 return err
1497}
1498
Serge Bazanski37cfcc12024-03-21 11:59:07 +01001499// MakeConsensusMember adds the ConsensusMember role to a node by ID.
1500func (c *Cluster) MakeConsensusMember(ctx context.Context, id string) error {
1501 curC, err := c.CuratorClient()
1502 if err != nil {
1503 return err
1504 }
1505 mgmt := apb.NewManagementClient(curC)
1506 cur := ipb.NewCuratorClient(curC)
1507
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001508 logf("Cluster: %s: adding ConsensusMember", id)
Serge Bazanski37cfcc12024-03-21 11:59:07 +01001509 bo := backoff.NewExponentialBackOff()
1510 bo.MaxElapsedTime = 10 * time.Second
1511
1512 backoff.Retry(func() error {
1513 _, err = mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
1514 Node: &apb.UpdateNodeRolesRequest_Id{
1515 Id: id,
1516 },
Jan Schärd1a8b642024-12-03 17:40:41 +01001517 ConsensusMember: ptr.To(true),
Serge Bazanski37cfcc12024-03-21 11:59:07 +01001518 })
1519 if err != nil {
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001520 logf("Cluster: %s: UpdateNodeRoles failed: %v", id, err)
Serge Bazanski37cfcc12024-03-21 11:59:07 +01001521 }
1522 return err
1523 }, backoff.WithContext(bo, ctx))
1524 if err != nil {
1525 return err
1526 }
1527
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001528 logf("Cluster: %s: waiting for learner/full members...", id)
Serge Bazanski37cfcc12024-03-21 11:59:07 +01001529
1530 learner := false
1531 for {
1532 res, err := cur.GetConsensusStatus(ctx, &ipb.GetConsensusStatusRequest{})
1533 if err != nil {
1534 return fmt.Errorf("GetConsensusStatus: %w", err)
1535 }
1536 for _, member := range res.EtcdMember {
1537 if member.Id != id {
1538 continue
1539 }
1540 switch member.Status {
1541 case ipb.GetConsensusStatusResponse_EtcdMember_STATUS_LEARNER:
1542 if !learner {
1543 learner = true
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001544 logf("Cluster: %s: became a learner, waiting for full member...", id)
Serge Bazanski37cfcc12024-03-21 11:59:07 +01001545 }
1546 case ipb.GetConsensusStatusResponse_EtcdMember_STATUS_FULL:
Tim Windelschmidtd0cdb572025-03-27 17:18:39 +01001547 logf("Cluster: %s: became a full member", id)
Serge Bazanski37cfcc12024-03-21 11:59:07 +01001548 return nil
1549 }
1550 }
1551 time.Sleep(100 * time.Millisecond)
1552 }
1553}