blob: 107cd55a33d9508a7cce4829aff30ad572b50437 [file] [log] [blame]
// Package launch builds on the osbase launch package and implements launching
// Metropolis nodes and clusters in a virtualized environment using qemu. It's
// kept in a separate package as it depends on a Metropolis node image, which
// might not be required for some use of the launch library.
Tim Windelschmidt9f21f532024-05-07 15:14:20 +02005package launch
Serge Bazanski66e58952021-10-05 17:06:56 +02006
7import (
8 "bytes"
9 "context"
Serge Bazanski1f8cad72023-03-20 16:58:10 +010010 "crypto/ed25519"
Serge Bazanski66e58952021-10-05 17:06:56 +020011 "crypto/rand"
12 "crypto/tls"
Serge Bazanski54e212a2023-06-14 13:45:11 +020013 "crypto/x509"
Serge Bazanskia0bc6d32023-06-28 18:57:40 +020014 "encoding/pem"
Serge Bazanski66e58952021-10-05 17:06:56 +020015 "errors"
16 "fmt"
17 "io"
Serge Bazanski66e58952021-10-05 17:06:56 +020018 "net"
Lorenz Brun150f24a2023-07-13 20:11:06 +020019 "net/http"
Serge Bazanski66e58952021-10-05 17:06:56 +020020 "os"
21 "os/exec"
Leopoldacfad5b2023-01-15 14:05:25 +010022 "path"
Serge Bazanski66e58952021-10-05 17:06:56 +020023 "path/filepath"
Serge Bazanski53458ba2024-06-18 09:56:46 +000024 "strconv"
Serge Bazanski630fb5c2023-04-06 10:50:24 +020025 "strings"
Serge Bazanski66e58952021-10-05 17:06:56 +020026 "syscall"
27 "time"
28
29 "github.com/cenkalti/backoff/v4"
Serge Bazanski66e58952021-10-05 17:06:56 +020030 "go.uber.org/multierr"
Serge Bazanskibe742842022-04-04 13:18:50 +020031 "golang.org/x/net/proxy"
Lorenz Brun87bbf7e2024-03-18 18:22:25 +010032 "golang.org/x/sys/unix"
Serge Bazanski66e58952021-10-05 17:06:56 +020033 "google.golang.org/grpc"
Serge Bazanski636032e2022-01-26 14:21:33 +010034 "google.golang.org/grpc/codes"
35 "google.golang.org/grpc/status"
Serge Bazanski66e58952021-10-05 17:06:56 +020036 "google.golang.org/protobuf/proto"
Serge Bazanskia0bc6d32023-06-28 18:57:40 +020037 "k8s.io/client-go/kubernetes"
38 "k8s.io/client-go/rest"
Serge Bazanski66e58952021-10-05 17:06:56 +020039
Serge Bazanski37cfcc12024-03-21 11:59:07 +010040 ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
Tim Windelschmidtbe25a3b2023-07-19 16:31:56 +020041 apb "source.monogon.dev/metropolis/proto/api"
42 cpb "source.monogon.dev/metropolis/proto/common"
43
Serge Bazanskica8d9512024-09-12 14:20:57 +020044 "source.monogon.dev/go/logging"
Serge Bazanskidd5b03c2024-05-16 18:07:06 +020045 "source.monogon.dev/go/qcow2"
Serge Bazanski1f8cad72023-03-20 16:58:10 +010046 metroctl "source.monogon.dev/metropolis/cli/metroctl/core"
Serge Bazanski66e58952021-10-05 17:06:56 +020047 "source.monogon.dev/metropolis/node"
48 "source.monogon.dev/metropolis/node/core/rpc"
Serge Bazanski5bb8a332022-06-23 17:41:33 +020049 "source.monogon.dev/metropolis/node/core/rpc/resolver"
Tim Windelschmidt9f21f532024-05-07 15:14:20 +020050 "source.monogon.dev/metropolis/test/localregistry"
51 "source.monogon.dev/osbase/test/launch"
Serge Bazanski66e58952021-10-05 17:06:56 +020052)
53
// NodeNumberKey is the key of the node label that carries a node's numerical
// index in the test system.
const NodeNumberKey string = "test-node-number"
59
// NodeOptions contains all options that can be passed to LaunchNode. LaunchNode
// takes it by pointer and fills in defaults for fields left at their zero
// value (CPUs, ThreadsPerCPU, MemoryMiB, Mac, Runtime).
type NodeOptions struct {
	// Name is a human-readable identifier to be used in debug output.
	Name string

	// CPUs is the number of virtual CPUs of the VM. If zero, LaunchNode sets it
	// to 1.
	CPUs int

	// ThreadsPerCPU is the number of threads per CPU. This is multiplied by
	// CPUs to get the total number of threads. If zero, LaunchNode sets it to 1.
	ThreadsPerCPU int

	// MemoryMiB is the RAM size in MiB of the VM. If zero, LaunchNode sets it to
	// 2048.
	MemoryMiB int

	// DiskBytes contains the size of the root disk in bytes or zero if the
	// unmodified image size is used. Values smaller than the node image are
	// raised to the image size when the runtime is first set up.
	DiskBytes uint64

	// Ports contains the port mapping where to expose the internal ports of the VM to
	// the host. See IdentityPortMap() and ConflictFreePortMap(). Ignored when
	// ConnectToSocket is set.
	Ports launch.PortMap

	// If set to true, reboots are honored. Otherwise, QEMU is started with
	// -no-reboot, so all reboots end the QEMU process started by LaunchNode.
	// Metropolis nodes generally restart on almost all errors, so unless you
	// want to test reboot behavior this should be false.
	AllowReboot bool

	// By default, the VM is connected to the Host via SLIRP. If ConnectToSocket is
	// set, it is instead connected to the given file descriptor/socket. If this is
	// set, all port maps from the Ports option are ignored. Intended for networking
	// this instance together with others for running more complex network
	// configurations.
	ConnectToSocket *os.File

	// When PcapDump is set, all traffic is dumped to a pcap file in the
	// node's launch (state) directory ("net0.pcap" for the first interface).
	PcapDump bool

	// SerialPort is an io.ReadWriter over which you can communicate with the serial
	// port of the machine. It can be set to an existing file descriptor (like
	// os.Stdout/os.Stderr) or any Go structure implementing this interface.
	// LaunchNode assigns it as the QEMU process' standard output, which carries
	// the serial console.
	SerialPort io.ReadWriter

	// NodeParameters is passed into the VM and subsequently used for bootstrapping or
	// registering into a cluster. If nil, no parameters are handed to the VM.
	NodeParameters *apb.NodeParameters

	// Mac is the node's MAC address. If nil, LaunchNode generates a random one
	// and stores it here.
	Mac *net.HardwareAddr

	// Runtime keeps the node's QEMU runtime state. If nil, LaunchNode sets up a
	// fresh runtime (state and socket directories) and stores it here, so that
	// subsequent launches with the same options reuse it.
	Runtime *NodeRuntime

	// RunVNC starts a VNC socket for troubleshooting/testing console code. Note:
	// this will not work in tests, as those use a built-in qemu which does not
	// implement a VGA device.
	RunVNC bool
}
120
// NodeRuntime keeps the node's QEMU runtime options: the per-node directories
// created on first launch and the context the node's processes currently run
// under.
type NodeRuntime struct {
	// ld points at the node's launch directory storing data such as storage
	// images, firmware variables or the TPM state.
	ld string
	// sd points at the node's socket directory.
	sd string

	// ctxT is the context QEMU will execute in. LaunchNode replaces it on every
	// launch.
	ctxT context.Context
	// CtxC is the QEMU context's cancellation function. LaunchNode calls the
	// previous CtxC, if any, before installing a new context.
	CtxC context.CancelFunc
}
134
135// NodePorts is the list of ports a fully operational Metropolis node listens on
Serge Bazanski52304a82021-10-29 16:56:18 +0200136var NodePorts = []node.Port{
Serge Bazanski66e58952021-10-05 17:06:56 +0200137 node.ConsensusPort,
138
139 node.CuratorServicePort,
140 node.DebugServicePort,
141
142 node.KubernetesAPIPort,
Lorenz Bruncc078df2021-12-23 11:51:55 +0100143 node.KubernetesAPIWrappedPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200144 node.CuratorServicePort,
145 node.DebuggerPort,
Tim Windelschmidtbe25a3b2023-07-19 16:31:56 +0200146 node.MetricsPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200147}
148
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200149// setupRuntime creates the node's QEMU runtime directory, together with all
150// files required to preserve its state, a level below the chosen path ld. The
151// node's socket directory is similarily created a level below sd. It may
152// return an I/O error.
Jan Schär07003572024-08-26 10:42:16 +0200153func setupRuntime(ld, sd string, diskBytes uint64) (*NodeRuntime, error) {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200154 // Create a temporary directory to keep all the runtime files.
155 stdp, err := os.MkdirTemp(ld, "node_state*")
156 if err != nil {
157 return nil, fmt.Errorf("failed to create the state directory: %w", err)
158 }
159
160 // Initialize the node's storage with a prebuilt image.
Jan Schär07003572024-08-26 10:42:16 +0200161 st, err := os.Stat(xNodeImagePath)
162 if err != nil {
163 return nil, fmt.Errorf("cannot read image file: %w", err)
164 }
165 diskBytes = max(diskBytes, uint64(st.Size()))
166
Serge Bazanskidd5b03c2024-05-16 18:07:06 +0200167 di := filepath.Join(stdp, "image.qcow2")
Tim Windelschmidt82e6af72024-07-23 00:05:42 +0000168 launch.Log("Cluster: generating node QCOW2 snapshot image: %s -> %s", xNodeImagePath, di)
Serge Bazanskidd5b03c2024-05-16 18:07:06 +0200169
170 df, err := os.Create(di)
171 if err != nil {
172 return nil, fmt.Errorf("while opening image for writing: %w", err)
173 }
174 defer df.Close()
Jan Schär07003572024-08-26 10:42:16 +0200175 if err := qcow2.Generate(df, qcow2.GenerateWithBackingFile(xNodeImagePath), qcow2.GenerateWithFileSize(diskBytes)); err != nil {
Serge Bazanskidd5b03c2024-05-16 18:07:06 +0200176 return nil, fmt.Errorf("while creating copy-on-write node image: %w", err)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200177 }
178
179 // Initialize the OVMF firmware variables file.
Tim Windelschmidt82e6af72024-07-23 00:05:42 +0000180 dv := filepath.Join(stdp, filepath.Base(xOvmfVarsPath))
181 if err := copyFile(xOvmfVarsPath, dv); err != nil {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200182 return nil, fmt.Errorf("while copying firmware variables: %w", err)
183 }
184
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200185 // Create the socket directory.
186 sotdp, err := os.MkdirTemp(sd, "node_sock*")
187 if err != nil {
188 return nil, fmt.Errorf("failed to create the socket directory: %w", err)
189 }
190
191 return &NodeRuntime{
192 ld: stdp,
193 sd: sotdp,
194 }, nil
195}
196
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200197// CuratorClient returns an authenticated owner connection to a Curator
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200198// instance within Cluster c, or nil together with an error.
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200199func (c *Cluster) CuratorClient() (*grpc.ClientConn, error) {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200200 if c.authClient == nil {
Serge Bazanski8535cb52023-03-29 14:15:08 +0200201 authCreds := rpc.NewAuthenticatedCredentials(c.Owner, rpc.WantInsecure())
Serge Bazanskica8d9512024-09-12 14:20:57 +0200202 r := resolver.New(c.ctxT, resolver.WithLogger(logging.NewFunctionBackend(func(severity logging.Severity, msg string) {
203 launch.Log("Cluster: client resolver: %s: %s", severity, msg)
204 })))
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200205 for _, n := range c.NodeIDs {
206 ep, err := resolver.NodeWithDefaultPort(n)
207 if err != nil {
Tim Windelschmidtadcf5d72024-05-21 13:46:25 +0200208 return nil, fmt.Errorf("could not add node %q by DNS: %w", n, err)
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200209 }
210 r.AddEndpoint(ep)
211 }
212 authClient, err := grpc.Dial(resolver.MetropolisControlAddress,
213 grpc.WithTransportCredentials(authCreds),
214 grpc.WithResolvers(r),
215 grpc.WithContextDialer(c.DialNode),
216 )
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200217 if err != nil {
218 return nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
219 }
220 c.authClient = authClient
221 }
222 return c.authClient, nil
223}
224
Serge Bazanski66e58952021-10-05 17:06:56 +0200225// LaunchNode launches a single Metropolis node instance with the given options.
226// The instance runs mostly paravirtualized but with some emulated hardware
227// similar to how a cloud provider might set up its VMs. The disk is fully
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200228// writable, and the changes are kept across reboots and shutdowns. ld and sd
229// point to the launch directory and the socket directory, holding the nodes'
230// state files (storage, tpm state, firmware state), and UNIX socket files
231// (swtpm <-> QEMU interplay) respectively. The directories must exist before
232// LaunchNode is called. LaunchNode will update options.Runtime and options.Mac
233// if either are not initialized.
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200234func LaunchNode(ctx context.Context, ld, sd string, tpmFactory *TPMFactory, options *NodeOptions, doneC chan error) error {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200235 // TODO(mateusz@monogon.tech) try using QEMU's abstract socket namespace instead
236 // of /tmp (requires QEMU version >5.0).
Serge Bazanski66e58952021-10-05 17:06:56 +0200237 // https://github.com/qemu/qemu/commit/776b97d3605ed0fc94443048fdf988c7725e38a9).
238 // swtpm accepts already-open FDs so we can pass in an abstract socket namespace FD
239 // that we open and pass the name of it to QEMU. Not pinning this crashes both
240 // swtpm and qemu because we run into UNIX socket length limitations (for legacy
241 // reasons 108 chars).
Serge Bazanski66e58952021-10-05 17:06:56 +0200242
Jan Schära9b060b2024-08-07 10:42:29 +0200243 if options.CPUs == 0 {
244 options.CPUs = 1
245 }
246 if options.ThreadsPerCPU == 0 {
247 options.ThreadsPerCPU = 1
248 }
249 if options.MemoryMiB == 0 {
250 options.MemoryMiB = 2048
251 }
252
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200253 // If it's the node's first start, set up its runtime directories.
254 if options.Runtime == nil {
Jan Schär07003572024-08-26 10:42:16 +0200255 r, err := setupRuntime(ld, sd, options.DiskBytes)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200256 if err != nil {
257 return fmt.Errorf("while setting up node runtime: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200258 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200259 options.Runtime = r
Serge Bazanski66e58952021-10-05 17:06:56 +0200260 }
261
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200262 // Replace the node's context with a new one.
263 r := options.Runtime
264 if r.CtxC != nil {
265 r.CtxC()
266 }
267 r.ctxT, r.CtxC = context.WithCancel(ctx)
268
Serge Bazanski66e58952021-10-05 17:06:56 +0200269 var qemuNetType string
270 var qemuNetConfig launch.QemuValue
271 if options.ConnectToSocket != nil {
272 qemuNetType = "socket"
273 qemuNetConfig = launch.QemuValue{
274 "id": {"net0"},
275 "fd": {"3"},
276 }
277 } else {
278 qemuNetType = "user"
279 qemuNetConfig = launch.QemuValue{
280 "id": {"net0"},
281 "net": {"10.42.0.0/24"},
282 "dhcpstart": {"10.42.0.10"},
283 "hostfwd": options.Ports.ToQemuForwards(),
284 }
285 }
286
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200287 // Generate the node's MAC address if it isn't already set in NodeOptions.
288 if options.Mac == nil {
289 mac, err := generateRandomEthernetMAC()
290 if err != nil {
291 return err
292 }
293 options.Mac = mac
Serge Bazanski66e58952021-10-05 17:06:56 +0200294 }
295
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200296 tpmSocketPath := filepath.Join(r.sd, "tpm-socket")
297 fwVarPath := filepath.Join(r.ld, "OVMF_VARS.fd")
Serge Bazanskidd5b03c2024-05-16 18:07:06 +0200298 storagePath := filepath.Join(r.ld, "image.qcow2")
Lorenz Brun150f24a2023-07-13 20:11:06 +0200299 qemuArgs := []string{
Jan Schära9b060b2024-08-07 10:42:29 +0200300 "-machine", "q35",
301 "-accel", "kvm",
Serge Bazanski62e6f0b2024-09-03 12:18:56 +0200302 "-display", "none",
Jan Schära9b060b2024-08-07 10:42:29 +0200303 "-nodefaults",
304 "-cpu", "host",
305 "-m", fmt.Sprintf("%dM", options.MemoryMiB),
306 "-smp", fmt.Sprintf("cores=%d,threads=%d", options.CPUs, options.ThreadsPerCPU),
Tim Windelschmidt82e6af72024-07-23 00:05:42 +0000307 "-drive", "if=pflash,format=raw,readonly=on,file=" + xOvmfCodePath,
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200308 "-drive", "if=pflash,format=raw,file=" + fwVarPath,
Serge Bazanskidd5b03c2024-05-16 18:07:06 +0200309 "-drive", "if=virtio,format=qcow2,cache=unsafe,file=" + storagePath,
Serge Bazanski66e58952021-10-05 17:06:56 +0200310 "-netdev", qemuNetConfig.ToOption(qemuNetType),
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200311 "-device", "virtio-net-pci,netdev=net0,mac=" + options.Mac.String(),
Serge Bazanski66e58952021-10-05 17:06:56 +0200312 "-chardev", "socket,id=chrtpm,path=" + tpmSocketPath,
313 "-tpmdev", "emulator,id=tpm0,chardev=chrtpm",
314 "-device", "tpm-tis,tpmdev=tpm0",
315 "-device", "virtio-rng-pci",
Lorenz Brun150f24a2023-07-13 20:11:06 +0200316 "-serial", "stdio",
317 }
Serge Bazanski62e6f0b2024-09-03 12:18:56 +0200318 if options.RunVNC {
319 vncSocketPath := filepath.Join(r.sd, "vnc-socket")
320 qemuArgs = append(qemuArgs,
321 "-vnc", "unix:"+vncSocketPath,
322 "-device", "virtio-vga",
323 )
324 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200325
326 if !options.AllowReboot {
327 qemuArgs = append(qemuArgs, "-no-reboot")
328 }
329
330 if options.NodeParameters != nil {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200331 parametersPath := filepath.Join(r.ld, "parameters.pb")
Serge Bazanski66e58952021-10-05 17:06:56 +0200332 parametersRaw, err := proto.Marshal(options.NodeParameters)
333 if err != nil {
334 return fmt.Errorf("failed to encode node paraeters: %w", err)
335 }
Lorenz Brun150f24a2023-07-13 20:11:06 +0200336 if err := os.WriteFile(parametersPath, parametersRaw, 0o644); err != nil {
Serge Bazanski66e58952021-10-05 17:06:56 +0200337 return fmt.Errorf("failed to write node parameters: %w", err)
338 }
339 qemuArgs = append(qemuArgs, "-fw_cfg", "name=dev.monogon.metropolis/parameters.pb,file="+parametersPath)
340 }
341
Leopoldacfad5b2023-01-15 14:05:25 +0100342 if options.PcapDump {
Tim Windelschmidta7a82f32024-04-11 01:40:25 +0200343 qemuNetDump := launch.QemuValue{
344 "id": {"net0"},
345 "netdev": {"net0"},
346 "file": {filepath.Join(r.ld, "net0.pcap")},
Leopoldacfad5b2023-01-15 14:05:25 +0100347 }
348 qemuArgs = append(qemuArgs, "-object", qemuNetDump.ToOption("filter-dump"))
349 }
350
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200351 // Manufacture TPM if needed.
352 tpmd := filepath.Join(r.ld, "tpm")
Tim Windelschmidt82e6af72024-07-23 00:05:42 +0000353 err := tpmFactory.Manufacture(ctx, tpmd, &TPMPlatform{
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200354 Manufacturer: "Monogon",
355 Version: "1.0",
356 Model: "TestCluster",
357 })
358 if err != nil {
359 return fmt.Errorf("could not manufacture TPM: %w", err)
360 }
361
Serge Bazanski66e58952021-10-05 17:06:56 +0200362 // Start TPM emulator as a subprocess
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200363 tpmCtx, tpmCancel := context.WithCancel(options.Runtime.ctxT)
Serge Bazanski66e58952021-10-05 17:06:56 +0200364
Tim Windelschmidt82e6af72024-07-23 00:05:42 +0000365 tpmEmuCmd := exec.CommandContext(tpmCtx, xSwtpmPath, "socket", "--tpm2", "--tpmstate", "dir="+tpmd, "--ctrl", "type=unixio,path="+tpmSocketPath)
Serge Bazanskib07c57a2024-06-04 14:33:27 +0000366 // Silence warnings from unsafe libtpms build (uses non-constant-time
367 // cryptographic operations).
368 tpmEmuCmd.Env = append(tpmEmuCmd.Env, "MONOGON_LIBTPMS_ACKNOWLEDGE_UNSAFE=yes")
Serge Bazanski66e58952021-10-05 17:06:56 +0200369 tpmEmuCmd.Stderr = os.Stderr
370 tpmEmuCmd.Stdout = os.Stdout
371
Tim Windelschmidt244b5672024-02-06 10:18:56 +0100372 err = tpmEmuCmd.Start()
Serge Bazanski66e58952021-10-05 17:06:56 +0200373 if err != nil {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200374 tpmCancel()
Serge Bazanski66e58952021-10-05 17:06:56 +0200375 return fmt.Errorf("failed to start TPM emulator: %w", err)
376 }
377
Mateusz Zalegae90f4a12022-05-25 18:24:01 +0200378 // Wait for the socket to be created by the TPM emulator before launching
379 // QEMU.
380 for {
381 _, err := os.Stat(tpmSocketPath)
382 if err == nil {
383 break
384 }
Tim Windelschmidta7a82f32024-04-11 01:40:25 +0200385 if !os.IsNotExist(err) {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200386 tpmCancel()
Mateusz Zalegae90f4a12022-05-25 18:24:01 +0200387 return fmt.Errorf("while stat-ing TPM socket path: %w", err)
388 }
389 if err := tpmCtx.Err(); err != nil {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200390 tpmCancel()
Mateusz Zalegae90f4a12022-05-25 18:24:01 +0200391 return fmt.Errorf("while waiting for the TPM socket: %w", err)
392 }
393 time.Sleep(time.Millisecond * 100)
394 }
395
Serge Bazanski66e58952021-10-05 17:06:56 +0200396 // Start the main qemu binary
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200397 systemCmd := exec.CommandContext(options.Runtime.ctxT, "qemu-system-x86_64", qemuArgs...)
Serge Bazanski66e58952021-10-05 17:06:56 +0200398 if options.ConnectToSocket != nil {
399 systemCmd.ExtraFiles = []*os.File{options.ConnectToSocket}
400 }
401
402 var stdErrBuf bytes.Buffer
403 systemCmd.Stderr = &stdErrBuf
404 systemCmd.Stdout = options.SerialPort
405
Leopoldaf5086b2023-01-15 14:12:42 +0100406 launch.PrettyPrintQemuArgs(options.Name, systemCmd.Args)
407
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200408 go func() {
409 launch.Log("Node: Starting...")
410 err = systemCmd.Run()
411 launch.Log("Node: Returned: %v", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200412
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200413 // Stop TPM emulator and wait for it to exit to properly reap the child process
414 tpmCancel()
415 launch.Log("Node: Waiting for TPM emulator to exit")
416 // Wait returns a SIGKILL error because we just cancelled its context.
417 // We still need to call it to avoid creating zombies.
418 errTpm := tpmEmuCmd.Wait()
419 launch.Log("Node: TPM emulator done: %v", errTpm)
Serge Bazanski66e58952021-10-05 17:06:56 +0200420
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200421 var exerr *exec.ExitError
422 if err != nil && errors.As(err, &exerr) {
423 status := exerr.ProcessState.Sys().(syscall.WaitStatus)
424 if status.Signaled() && status.Signal() == syscall.SIGKILL {
425 // Process was killed externally (most likely by our context being canceled).
426 // This is a normal exit for us, so return nil
427 doneC <- nil
428 return
429 }
430 exerr.Stderr = stdErrBuf.Bytes()
431 newErr := launch.QEMUError(*exerr)
432 launch.Log("Node: %q", stdErrBuf.String())
433 doneC <- &newErr
434 return
Serge Bazanski66e58952021-10-05 17:06:56 +0200435 }
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200436 doneC <- err
437 }()
438 return nil
Serge Bazanski66e58952021-10-05 17:06:56 +0200439}
440
441func copyFile(src, dst string) error {
442 in, err := os.Open(src)
443 if err != nil {
444 return fmt.Errorf("when opening source: %w", err)
445 }
446 defer in.Close()
447
448 out, err := os.Create(dst)
449 if err != nil {
450 return fmt.Errorf("when creating destination: %w", err)
451 }
452 defer out.Close()
453
Lorenz Brun87bbf7e2024-03-18 18:22:25 +0100454 endPos, err := in.Seek(0, io.SeekEnd)
Serge Bazanski66e58952021-10-05 17:06:56 +0200455 if err != nil {
Lorenz Brun87bbf7e2024-03-18 18:22:25 +0100456 return fmt.Errorf("when getting source end: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200457 }
Lorenz Brun87bbf7e2024-03-18 18:22:25 +0100458
459 // Copy the file while preserving its sparseness. The image files are very
460 // sparse (less than 10% allocated), so this is a lot faster.
461 var lastHoleStart int64
462 for {
463 dataStart, err := in.Seek(lastHoleStart, unix.SEEK_DATA)
464 if err != nil {
465 return fmt.Errorf("when seeking to next data block: %w", err)
466 }
467 holeStart, err := in.Seek(dataStart, unix.SEEK_HOLE)
468 if err != nil {
469 return fmt.Errorf("when seeking to next hole: %w", err)
470 }
471 lastHoleStart = holeStart
472 if _, err := in.Seek(dataStart, io.SeekStart); err != nil {
473 return fmt.Errorf("when seeking to current data block: %w", err)
474 }
475 if _, err := out.Seek(dataStart, io.SeekStart); err != nil {
476 return fmt.Errorf("when seeking output to next data block: %w", err)
477 }
478 if _, err := io.CopyN(out, in, holeStart-dataStart); err != nil {
479 return fmt.Errorf("when copying file: %w", err)
480 }
481 if endPos == holeStart {
482 // The next hole is at the end of the file, we're done here.
483 break
484 }
485 }
486
Serge Bazanski66e58952021-10-05 17:06:56 +0200487 return out.Close()
488}
489
Serge Bazanskie78a0892021-10-07 17:03:49 +0200490// getNodes wraps around Management.GetNodes to return a list of nodes in a
491// cluster.
492func getNodes(ctx context.Context, mgmt apb.ManagementClient) ([]*apb.Node, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200493 var res []*apb.Node
Serge Bazanski636032e2022-01-26 14:21:33 +0100494 bo := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
Serge Bazanski075465c2021-11-16 15:38:49 +0100495 err := backoff.Retry(func() error {
496 res = nil
497 srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
Serge Bazanskie78a0892021-10-07 17:03:49 +0200498 if err != nil {
Serge Bazanski075465c2021-11-16 15:38:49 +0100499 return fmt.Errorf("GetNodes: %w", err)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200500 }
Serge Bazanski075465c2021-11-16 15:38:49 +0100501 for {
502 node, err := srvN.Recv()
503 if err == io.EOF {
504 break
505 }
506 if err != nil {
507 return fmt.Errorf("GetNodes.Recv: %w", err)
508 }
509 res = append(res, node)
510 }
511 return nil
512 }, bo)
513 if err != nil {
514 return nil, err
Serge Bazanskie78a0892021-10-07 17:03:49 +0200515 }
516 return res, nil
517}
518
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200519// getNode wraps Management.GetNodes. It returns node information matching
520// given node ID.
521func getNode(ctx context.Context, mgmt apb.ManagementClient, id string) (*apb.Node, error) {
522 nodes, err := getNodes(ctx, mgmt)
523 if err != nil {
524 return nil, fmt.Errorf("could not get nodes: %w", err)
525 }
526 for _, n := range nodes {
Jan Schär39d9c242024-09-24 13:49:55 +0200527 if n.Id == id {
528 return n, nil
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200529 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200530 }
Tim Windelschmidt73e98822024-04-18 23:13:49 +0200531 return nil, fmt.Errorf("no such node")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200532}
533
// generateRandomEthernetMAC returns a random EUI-48 Ethernet MAC address read
// from crypto/rand. The address is marked as a locally administered,
// individual (unicast) address.
func generateRandomEthernetMAC() (*net.HardwareAddr, error) {
	addr := make(net.HardwareAddr, 6)
	if _, err := rand.Read(addr); err != nil {
		return nil, fmt.Errorf("failed to read randomness for MAC: %w", err)
	}

	// Ref IEEE 802-2014 Section 8.2.2: set the U/L bit (locally administered)
	// and clear the I/G bit (individual address) of the first octet.
	addr[0] |= 0x02
	addr[0] &^= 0x01
	return &addr, nil
}
548
// SOCKSPort is the port of the SOCKS proxy to the switch network. It is part
// of ClusterPorts.
const SOCKSPort uint16 = 1080
Serge Bazanski66e58952021-10-05 17:06:56 +0200550
// ClusterPorts contains all ports handled by Nanoswitch: the services of the
// first node that are forwarded to it, plus the SOCKS proxy port.
var ClusterPorts = []uint16{
	// Forwarded to the first node.
	uint16(node.CuratorServicePort),
	uint16(node.DebugServicePort),
	uint16(node.KubernetesAPIPort),
	uint16(node.KubernetesAPIWrappedPort),

	// SOCKS proxy to the switch network
	SOCKSPort,
}
562
// ClusterOptions contains all options for launching a Metropolis cluster.
type ClusterOptions struct {
	// The number of nodes this cluster should be started with.
	NumNodes int

	// Node contains the default options applied to all nodes.
	Node NodeOptions

	// If true, node logs will be saved to individual files instead of being printed
	// out to stderr. The path of these files will be still printed to stdout.
	//
	// The files will be located within the launch directory inside TEST_TMPDIR (or
	// the default tempdir location, if not set).
	NodeLogsToFiles bool

	// LeaveNodesNew, if set, will leave all non-bootstrap nodes in NEW, without
	// bootstrapping them. The nodes' address information in Cluster.Nodes will be
	// incomplete.
	LeaveNodesNew bool

	// Optional local registry which will be made available to the cluster to
	// pull images from. This is a more efficient alternative to preseeding all
	// images used for testing.
	LocalRegistry *localregistry.Server

	// InitialClusterConfiguration will be passed to the first node when creating the
	// cluster, and defines some basic properties of the cluster. If not specified,
	// the cluster will use the defaults as defined in
	// metropolis.proto.api.NodeParameters.
	InitialClusterConfiguration *cpb.ClusterConfiguration
}
594
// Cluster is the running Metropolis cluster launched using the LaunchCluster
// function.
type Cluster struct {
	// Owner is the TLS Certificate of the owner of the test cluster. This can be
	// used to authenticate further clients to the running cluster.
	Owner tls.Certificate
	// Ports is the PortMap used to access the first nodes' services (defined in
	// ClusterPorts) and the SOCKS proxy (at SOCKSPort).
	Ports launch.PortMap

	// Nodes is a map from Node ID to its runtime information.
	Nodes map[string]*NodeInCluster
	// NodeIDs is a list of node IDs that are backing this cluster, in order of
	// creation.
	NodeIDs []string

	// CACertificate is the cluster's CA certificate.
	CACertificate *x509.Certificate

	// nodesDone is a list of channels populated with the return codes from all the
	// nodes' qemu instances. It's used by Close to ensure all nodes have
	// successfully been stopped.
	nodesDone []chan error
	// nodeOpts are the cluster member nodes' mutable launch options, kept here
	// to facilitate reboots.
	nodeOpts []NodeOptions
	// launchDir points at the directory keeping the nodes' state, such as storage
	// images, firmware variable files, TPM state.
	launchDir string
	// socketDir points at the directory keeping UNIX socket files, such as these
	// used to facilitate communication between QEMU and swtpm. It's different
	// from launchDir, and anchored nearer the file system root, due to the
	// socket path length limitation imposed by the kernel.
	socketDir string
	// metroctlDir is presumably a directory used for metroctl-related state.
	// NOTE(review): its use is not visible in this part of the file - confirm.
	metroctlDir string

	// SOCKSDialer is used by DialNode to establish connections to nodes via the
	// SOCKS server ran by nanoswitch.
	SOCKSDialer proxy.Dialer

	// authClient is a cached authenticated owner connection to a Curator
	// instance within the cluster. It is populated lazily by CuratorClient.
	authClient *grpc.ClientConn

	// ctxT is the context individual node contexts are created from.
	ctxT context.Context
	// ctxC is used by Close to cancel the context under which the nodes are
	// running.
	ctxC context.CancelFunc

	// tpmFactory is presumably the TPMFactory handed to LaunchNode to
	// manufacture the nodes' TPM state. NOTE(review): confirm against the
	// cluster launch code.
	tpmFactory *TPMFactory
}
647
// NodeInCluster represents information about a node that's part of a Cluster.
type NodeInCluster struct {
	// ID of the node, which can be used to dial this node's services via DialNode.
	ID string
	// Pubkey is the node's public key as reported by the cluster's management
	// API. LaunchCluster records it for nodes registered after the first one
	// and passes it to ApproveNode; firstConnection leaves it unset for the
	// first node.
	Pubkey []byte
	// Address of the node on the network ran by nanoswitch. Not reachable from the
	// host unless dialed via DialNode or via the nanoswitch SOCKS proxy (reachable
	// on Cluster.Ports[SOCKSPort]).
	ManagementAddress string
}
658
659// firstConnection performs the initial owner credential escrow with a newly
660// started nanoswitch-backed cluster over SOCKS. It expects the first node to be
661// running at 10.1.0.2, which is always the case with the current nanoswitch
662// implementation.
663//
Leopold20a036e2023-01-15 00:17:19 +0100664// It returns the newly escrowed credentials as well as the first node's
Serge Bazanskibe742842022-04-04 13:18:50 +0200665// information as NodeInCluster.
666func firstConnection(ctx context.Context, socksDialer proxy.Dialer) (*tls.Certificate, *NodeInCluster, error) {
667 // Dial external service.
668 remote := fmt.Sprintf("10.1.0.2:%s", node.CuratorServicePort.PortString())
Serge Bazanski0c280152024-02-05 14:33:19 +0100669 initCreds, err := rpc.NewEphemeralCredentials(InsecurePrivateKey, rpc.WantInsecure())
Serge Bazanskibe742842022-04-04 13:18:50 +0200670 if err != nil {
671 return nil, nil, fmt.Errorf("NewEphemeralCredentials: %w", err)
672 }
673 initDialer := func(_ context.Context, addr string) (net.Conn, error) {
674 return socksDialer.Dial("tcp", addr)
675 }
676 initClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(initCreds))
677 if err != nil {
678 return nil, nil, fmt.Errorf("dialing with ephemeral credentials failed: %w", err)
679 }
680 defer initClient.Close()
681
682 // Retrieve owner certificate - this can take a while because the node is still
683 // coming up, so do it in a backoff loop.
Serge Bazanski05f813b2023-03-16 17:58:39 +0100684 launch.Log("Cluster: retrieving owner certificate (this can take a few seconds while the first node boots)...")
Serge Bazanskibe742842022-04-04 13:18:50 +0200685 aaa := apb.NewAAAClient(initClient)
686 var cert *tls.Certificate
687 err = backoff.Retry(func() error {
688 cert, err = rpc.RetrieveOwnerCertificate(ctx, aaa, InsecurePrivateKey)
689 if st, ok := status.FromError(err); ok {
690 if st.Code() == codes.Unavailable {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100691 launch.Log("Cluster: cluster UNAVAILABLE: %v", st.Message())
Serge Bazanskibe742842022-04-04 13:18:50 +0200692 return err
693 }
694 }
695 return backoff.Permanent(err)
Serge Bazanski62e6f0b2024-09-03 12:18:56 +0200696 }, backoff.WithContext(backoff.NewExponentialBackOff(backoff.WithMaxElapsedTime(time.Minute)), ctx))
Serge Bazanskibe742842022-04-04 13:18:50 +0200697 if err != nil {
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200698 return nil, nil, fmt.Errorf("couldn't retrieve owner certificate: %w", err)
Serge Bazanskibe742842022-04-04 13:18:50 +0200699 }
Serge Bazanski05f813b2023-03-16 17:58:39 +0100700 launch.Log("Cluster: retrieved owner certificate.")
Serge Bazanskibe742842022-04-04 13:18:50 +0200701
702 // Now connect authenticated and get the node ID.
Serge Bazanski8535cb52023-03-29 14:15:08 +0200703 creds := rpc.NewAuthenticatedCredentials(*cert, rpc.WantInsecure())
Serge Bazanskibe742842022-04-04 13:18:50 +0200704 authClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(creds))
705 if err != nil {
706 return nil, nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
707 }
708 defer authClient.Close()
709 mgmt := apb.NewManagementClient(authClient)
710
711 var node *NodeInCluster
712 err = backoff.Retry(func() error {
713 nodes, err := getNodes(ctx, mgmt)
714 if err != nil {
715 return fmt.Errorf("retrieving nodes failed: %w", err)
716 }
717 if len(nodes) != 1 {
718 return fmt.Errorf("expected one node, got %d", len(nodes))
719 }
720 n := nodes[0]
721 if n.Status == nil || n.Status.ExternalAddress == "" {
722 return fmt.Errorf("node has no status and/or address")
723 }
724 node = &NodeInCluster{
Jan Schär39d9c242024-09-24 13:49:55 +0200725 ID: n.Id,
Serge Bazanskibe742842022-04-04 13:18:50 +0200726 ManagementAddress: n.Status.ExternalAddress,
727 }
728 return nil
729 }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
730 if err != nil {
731 return nil, nil, err
732 }
733
734 return cert, node, nil
Serge Bazanski66e58952021-10-05 17:06:56 +0200735}
736
// NewSerialFileLogger opens the file at path p for writing, creating it with
// mode 0600 if it does not exist, and returns it as an io.ReadWriter suitable
// for use as a serial port sink. An existing file is neither truncated nor
// appended to: writes start at offset zero. On failure the returned
// io.ReadWriter is nil.
func NewSerialFileLogger(p string) (io.ReadWriter, error) {
	const openFlags = os.O_WRONLY | os.O_CREATE
	logFile, openErr := os.OpenFile(p, openFlags, 0o600)
	if openErr == nil {
		return logFile, nil
	}
	// Return a literal nil so callers never see a non-nil interface wrapping a
	// nil *os.File.
	return nil, openErr
}
744
Serge Bazanski66e58952021-10-05 17:06:56 +0200745// LaunchCluster launches a cluster of Metropolis node VMs together with a
746// Nanoswitch instance to network them all together.
747//
748// The given context will be used to run all qemu instances in the cluster, and
749// canceling the context or calling Close() will terminate them.
750func LaunchCluster(ctx context.Context, opts ClusterOptions) (*Cluster, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200751 if opts.NumNodes <= 0 {
Serge Bazanski66e58952021-10-05 17:06:56 +0200752 return nil, errors.New("refusing to start cluster with zero nodes")
753 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200754
Jan Schära9b060b2024-08-07 10:42:29 +0200755 // Prepare the node options. These will be kept as part of Cluster.
756 // nodeOpts[].Runtime will be initialized by LaunchNode during the first
757 // launch. The runtime information can be later used to restart a node.
758 // The 0th node will be initialized first. The rest will follow after it
759 // had bootstrapped the cluster.
760 nodeOpts := make([]NodeOptions, opts.NumNodes)
761 for i := range opts.NumNodes {
762 nodeOpts[i] = opts.Node
763 nodeOpts[i].Name = fmt.Sprintf("node%d", i)
764 nodeOpts[i].SerialPort = newPrefixedStdio(i)
765 }
766 nodeOpts[0].NodeParameters = &apb.NodeParameters{
767 Cluster: &apb.NodeParameters_ClusterBootstrap_{
768 ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
769 OwnerPublicKey: InsecurePublicKey,
770 InitialClusterConfiguration: opts.InitialClusterConfiguration,
771 Labels: &cpb.NodeLabels{
772 Pairs: []*cpb.NodeLabels_Pair{
Serge Bazanski20498dd2024-09-30 17:07:08 +0000773 {Key: NodeNumberKey, Value: "0"},
Jan Schära9b060b2024-08-07 10:42:29 +0200774 },
775 },
776 },
777 },
778 }
779 nodeOpts[0].PcapDump = true
780
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200781 // Create the launch directory.
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100782 ld, err := os.MkdirTemp(os.Getenv("TEST_TMPDIR"), "cluster-*")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200783 if err != nil {
784 return nil, fmt.Errorf("failed to create the launch directory: %w", err)
785 }
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100786 // Create the metroctl config directory. We keep it in /tmp because in some
787 // scenarios it's end-user visible and we want it short.
788 md, err := os.MkdirTemp("/tmp", "metroctl-*")
789 if err != nil {
790 return nil, fmt.Errorf("failed to create the metroctl directory: %w", err)
791 }
792
793 // Create the socket directory. We keep it in /tmp because of socket path limits.
794 sd, err := os.MkdirTemp("/tmp", "cluster-*")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200795 if err != nil {
796 return nil, fmt.Errorf("failed to create the socket directory: %w", err)
797 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200798
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200799 // Set up TPM factory.
800 tpmf, err := NewTPMFactory(filepath.Join(ld, "tpm"))
801 if err != nil {
802 return nil, fmt.Errorf("failed to create TPM factory: %w", err)
803 }
804
Serge Bazanski66e58952021-10-05 17:06:56 +0200805 // Prepare links between nodes and nanoswitch.
806 var switchPorts []*os.File
Jan Schära9b060b2024-08-07 10:42:29 +0200807 for i := range opts.NumNodes {
Serge Bazanski66e58952021-10-05 17:06:56 +0200808 switchPort, vmPort, err := launch.NewSocketPair()
809 if err != nil {
Serge Bazanski66e58952021-10-05 17:06:56 +0200810 return nil, fmt.Errorf("failed to get socketpair: %w", err)
811 }
812 switchPorts = append(switchPorts, switchPort)
Jan Schära9b060b2024-08-07 10:42:29 +0200813 nodeOpts[i].ConnectToSocket = vmPort
Serge Bazanski66e58952021-10-05 17:06:56 +0200814 }
815
Serge Bazanskie78a0892021-10-07 17:03:49 +0200816 // Make a list of channels that will be populated by all running node qemu
817 // processes.
Serge Bazanski66e58952021-10-05 17:06:56 +0200818 done := make([]chan error, opts.NumNodes)
Lorenz Brun150f24a2023-07-13 20:11:06 +0200819 for i := range done {
Serge Bazanski66e58952021-10-05 17:06:56 +0200820 done[i] = make(chan error, 1)
821 }
822
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100823 if opts.NodeLogsToFiles {
Jan Schära9b060b2024-08-07 10:42:29 +0200824 for i := range opts.NumNodes {
825 path := path.Join(ld, fmt.Sprintf("node-%d.txt", i))
826 port, err := NewSerialFileLogger(path)
827 if err != nil {
828 return nil, fmt.Errorf("could not open log file for node %d: %w", i, err)
829 }
830 launch.Log("Node %d logs at %s", i, path)
831 nodeOpts[i].SerialPort = port
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100832 }
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100833 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200834
835 // Start the first node.
836 ctxT, ctxC := context.WithCancel(ctx)
Jan Schär0b927652024-07-31 18:08:50 +0200837 launch.Log("Cluster: Starting node %d...", 0)
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200838 if err := LaunchNode(ctxT, ld, sd, tpmf, &nodeOpts[0], done[0]); err != nil {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200839 ctxC()
840 return nil, fmt.Errorf("failed to launch first node: %w", err)
841 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200842
Lorenz Brun150f24a2023-07-13 20:11:06 +0200843 localRegistryAddr := net.TCPAddr{
844 IP: net.IPv4(10, 42, 0, 82),
845 Port: 5000,
846 }
847
848 var guestSvcMap launch.GuestServiceMap
849 if opts.LocalRegistry != nil {
850 l, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1)})
851 if err != nil {
852 ctxC()
853 return nil, fmt.Errorf("failed to create TCP listener for local registry: %w", err)
854 }
855 s := http.Server{
856 Handler: opts.LocalRegistry,
857 }
858 go s.Serve(l)
859 go func() {
860 <-ctxT.Done()
861 s.Close()
862 }()
863 guestSvcMap = launch.GuestServiceMap{
864 &localRegistryAddr: *l.Addr().(*net.TCPAddr),
865 }
866 }
867
Serge Bazanskie78a0892021-10-07 17:03:49 +0200868 // Launch nanoswitch.
Serge Bazanski66e58952021-10-05 17:06:56 +0200869 portMap, err := launch.ConflictFreePortMap(ClusterPorts)
870 if err != nil {
871 ctxC()
872 return nil, fmt.Errorf("failed to allocate ephemeral ports: %w", err)
873 }
874
875 go func() {
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100876 var serialPort io.ReadWriter
877 if opts.NodeLogsToFiles {
878 path := path.Join(ld, "nanoswitch.txt")
879 serialPort, err = NewSerialFileLogger(path)
880 if err != nil {
881 launch.Log("Could not open log file for nanoswitch: %v", err)
882 }
883 launch.Log("Nanoswitch logs at %s", path)
884 } else {
885 serialPort = newPrefixedStdio(99)
886 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200887 if err := launch.RunMicroVM(ctxT, &launch.MicroVMOptions{
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100888 Name: "nanoswitch",
Tim Windelschmidt82e6af72024-07-23 00:05:42 +0000889 KernelPath: xKernelPath,
890 InitramfsPath: xInitramfsPath,
Serge Bazanski66e58952021-10-05 17:06:56 +0200891 ExtraNetworkInterfaces: switchPorts,
892 PortMap: portMap,
Lorenz Brun150f24a2023-07-13 20:11:06 +0200893 GuestServiceMap: guestSvcMap,
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100894 SerialPort: serialPort,
Leopoldacfad5b2023-01-15 14:05:25 +0100895 PcapDump: path.Join(ld, "nanoswitch.pcap"),
Serge Bazanski66e58952021-10-05 17:06:56 +0200896 }); err != nil {
897 if !errors.Is(err, ctxT.Err()) {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100898 launch.Fatal("Failed to launch nanoswitch: %v", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200899 }
900 }
901 }()
902
Serge Bazanskibe742842022-04-04 13:18:50 +0200903 // Build SOCKS dialer.
904 socksRemote := fmt.Sprintf("localhost:%v", portMap[SOCKSPort])
905 socksDialer, err := proxy.SOCKS5("tcp", socksRemote, nil, proxy.Direct)
Serge Bazanski66e58952021-10-05 17:06:56 +0200906 if err != nil {
907 ctxC()
Serge Bazanskibe742842022-04-04 13:18:50 +0200908 return nil, fmt.Errorf("failed to build SOCKS dialer: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200909 }
910
Serge Bazanskibe742842022-04-04 13:18:50 +0200911 // Retrieve owner credentials and first node.
912 cert, firstNode, err := firstConnection(ctxT, socksDialer)
Serge Bazanski66e58952021-10-05 17:06:56 +0200913 if err != nil {
914 ctxC()
915 return nil, err
916 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200917
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100918 // Write credentials to the metroctl directory.
919 if err := metroctl.WriteOwnerKey(md, cert.PrivateKey.(ed25519.PrivateKey)); err != nil {
920 ctxC()
921 return nil, fmt.Errorf("could not write owner key: %w", err)
922 }
923 if err := metroctl.WriteOwnerCertificate(md, cert.Certificate[0]); err != nil {
924 ctxC()
925 return nil, fmt.Errorf("could not write owner certificate: %w", err)
926 }
927
Serge Bazanski53458ba2024-06-18 09:56:46 +0000928 launch.Log("Cluster: Node %d is %s", 0, firstNode.ID)
929
930 // Set up a partially initialized cluster instance, to be filled in the
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200931 // later steps.
Serge Bazanskibe742842022-04-04 13:18:50 +0200932 cluster := &Cluster{
933 Owner: *cert,
934 Ports: portMap,
935 Nodes: map[string]*NodeInCluster{
936 firstNode.ID: firstNode,
937 },
938 NodeIDs: []string{
939 firstNode.ID,
940 },
941
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100942 nodesDone: done,
943 nodeOpts: nodeOpts,
944 launchDir: ld,
945 socketDir: sd,
946 metroctlDir: md,
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200947
Lorenz Brun276a7462023-07-12 21:28:54 +0200948 SOCKSDialer: socksDialer,
Serge Bazanskibe742842022-04-04 13:18:50 +0200949
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200950 ctxT: ctxT,
Serge Bazanskibe742842022-04-04 13:18:50 +0200951 ctxC: ctxC,
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200952
953 tpmFactory: tpmf,
Serge Bazanskibe742842022-04-04 13:18:50 +0200954 }
955
956 // Now start the rest of the nodes and register them into the cluster.
957
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200958 // Get an authenticated owner client within the cluster.
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200959 curC, err := cluster.CuratorClient()
Serge Bazanski66e58952021-10-05 17:06:56 +0200960 if err != nil {
961 ctxC()
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200962 return nil, fmt.Errorf("CuratorClient: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200963 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200964 mgmt := apb.NewManagementClient(curC)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200965
966 // Retrieve register ticket to register further nodes.
Serge Bazanski05f813b2023-03-16 17:58:39 +0100967 launch.Log("Cluster: retrieving register ticket...")
Serge Bazanskie78a0892021-10-07 17:03:49 +0200968 resT, err := mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{})
969 if err != nil {
970 ctxC()
971 return nil, fmt.Errorf("GetRegisterTicket: %w", err)
972 }
973 ticket := resT.Ticket
Serge Bazanski05f813b2023-03-16 17:58:39 +0100974 launch.Log("Cluster: retrieved register ticket (%d bytes).", len(ticket))
Serge Bazanskie78a0892021-10-07 17:03:49 +0200975
976 // Retrieve cluster info (for directory and ca public key) to register further
977 // nodes.
978 resI, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
979 if err != nil {
980 ctxC()
981 return nil, fmt.Errorf("GetClusterInfo: %w", err)
982 }
Serge Bazanski54e212a2023-06-14 13:45:11 +0200983 caCert, err := x509.ParseCertificate(resI.CaCertificate)
984 if err != nil {
985 ctxC()
986 return nil, fmt.Errorf("ParseCertificate: %w", err)
987 }
988 cluster.CACertificate = caCert
Serge Bazanskie78a0892021-10-07 17:03:49 +0200989
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200990 // Use the retrieved information to configure the rest of the node options.
991 for i := 1; i < opts.NumNodes; i++ {
Jan Schära9b060b2024-08-07 10:42:29 +0200992 nodeOpts[i].NodeParameters = &apb.NodeParameters{
993 Cluster: &apb.NodeParameters_ClusterRegister_{
994 ClusterRegister: &apb.NodeParameters_ClusterRegister{
995 RegisterTicket: ticket,
996 ClusterDirectory: resI.ClusterDirectory,
997 CaCertificate: resI.CaCertificate,
998 Labels: &cpb.NodeLabels{
999 Pairs: []*cpb.NodeLabels_Pair{
Serge Bazanski20498dd2024-09-30 17:07:08 +00001000 {Key: NodeNumberKey, Value: fmt.Sprintf("%d", i)},
Serge Bazanski30e30b32024-05-22 14:11:56 +02001001 },
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001002 },
1003 },
1004 },
Serge Bazanskid09c58f2023-03-17 00:25:08 +01001005 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001006 }
1007
1008 // Now run the rest of the nodes.
Serge Bazanskie78a0892021-10-07 17:03:49 +02001009 for i := 1; i < opts.NumNodes; i++ {
Jan Schär0b927652024-07-31 18:08:50 +02001010 launch.Log("Cluster: Starting node %d...", i)
Serge Bazanski2b6dc312024-06-04 17:44:55 +02001011 err := LaunchNode(ctxT, ld, sd, tpmf, &nodeOpts[i], done[i])
Serge Bazanskiee8c81b2024-04-03 11:59:38 +02001012 if err != nil {
Jan Schär0b927652024-07-31 18:08:50 +02001013 return nil, fmt.Errorf("failed to launch node %d: %w", i, err)
Serge Bazanskiee8c81b2024-04-03 11:59:38 +02001014 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001015 }
Serge Bazanskie78a0892021-10-07 17:03:49 +02001016
Serge Bazanski53458ba2024-06-18 09:56:46 +00001017 // Wait for nodes to appear as NEW, populate a map from node number (index into
Jan Schära9b060b2024-08-07 10:42:29 +02001018 // nodeOpts, etc.) to Metropolis Node ID.
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001019 seenNodes := make(map[string]bool)
Serge Bazanski53458ba2024-06-18 09:56:46 +00001020 nodeNumberToID := make(map[int]string)
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001021 launch.Log("Cluster: waiting for nodes to appear as NEW...")
1022 for i := 1; i < opts.NumNodes; i++ {
Serge Bazanskie78a0892021-10-07 17:03:49 +02001023 for {
1024 nodes, err := getNodes(ctx, mgmt)
1025 if err != nil {
1026 ctxC()
1027 return nil, fmt.Errorf("could not get nodes: %w", err)
1028 }
1029 for _, n := range nodes {
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001030 if n.State != cpb.NodeState_NODE_STATE_NEW {
1031 continue
Serge Bazanskie78a0892021-10-07 17:03:49 +02001032 }
Serge Bazanski87d9c592024-03-20 12:35:11 +01001033 if seenNodes[n.Id] {
1034 continue
1035 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001036 seenNodes[n.Id] = true
1037 cluster.Nodes[n.Id] = &NodeInCluster{
1038 ID: n.Id,
1039 Pubkey: n.Pubkey,
1040 }
Serge Bazanski53458ba2024-06-18 09:56:46 +00001041
Serge Bazanski20498dd2024-09-30 17:07:08 +00001042 num, err := strconv.Atoi(node.GetNodeLabel(n.Labels, NodeNumberKey))
Serge Bazanski53458ba2024-06-18 09:56:46 +00001043 if err != nil {
1044 return nil, fmt.Errorf("node %s has undecodable number label: %w", n.Id, err)
1045 }
1046 launch.Log("Cluster: Node %d is %s", num, n.Id)
1047 nodeNumberToID[num] = n.Id
Serge Bazanskie78a0892021-10-07 17:03:49 +02001048 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001049
1050 if len(seenNodes) == opts.NumNodes-1 {
Serge Bazanskie78a0892021-10-07 17:03:49 +02001051 break
1052 }
1053 time.Sleep(1 * time.Second)
1054 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001055 }
1056 launch.Log("Found all expected nodes")
Serge Bazanskie78a0892021-10-07 17:03:49 +02001057
Serge Bazanski53458ba2024-06-18 09:56:46 +00001058 // Build the rest of NodeIDs from map.
1059 for i := 1; i < opts.NumNodes; i++ {
1060 cluster.NodeIDs = append(cluster.NodeIDs, nodeNumberToID[i])
1061 }
1062
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001063 approvedNodes := make(map[string]bool)
1064 upNodes := make(map[string]bool)
1065 if !opts.LeaveNodesNew {
Serge Bazanskie78a0892021-10-07 17:03:49 +02001066 for {
1067 nodes, err := getNodes(ctx, mgmt)
1068 if err != nil {
1069 ctxC()
1070 return nil, fmt.Errorf("could not get nodes: %w", err)
1071 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001072 for _, node := range nodes {
1073 if !seenNodes[node.Id] {
1074 // Skip nodes that weren't NEW in the previous step.
Serge Bazanskie78a0892021-10-07 17:03:49 +02001075 continue
1076 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001077
1078 if node.State == cpb.NodeState_NODE_STATE_UP && node.Status != nil && node.Status.ExternalAddress != "" {
1079 launch.Log("Cluster: node %s is up", node.Id)
1080 upNodes[node.Id] = true
1081 cluster.Nodes[node.Id].ManagementAddress = node.Status.ExternalAddress
Serge Bazanskie78a0892021-10-07 17:03:49 +02001082 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001083 if upNodes[node.Id] {
1084 continue
Serge Bazanskibe742842022-04-04 13:18:50 +02001085 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001086
1087 if !approvedNodes[node.Id] {
1088 launch.Log("Cluster: approving node %s", node.Id)
1089 _, err := mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
1090 Pubkey: node.Pubkey,
1091 })
1092 if err != nil {
1093 ctxC()
1094 return nil, fmt.Errorf("ApproveNode(%s): %w", node.Id, err)
1095 }
1096 approvedNodes[node.Id] = true
Serge Bazanskibe742842022-04-04 13:18:50 +02001097 }
Serge Bazanskie78a0892021-10-07 17:03:49 +02001098 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001099
Jan Schär0b927652024-07-31 18:08:50 +02001100 launch.Log("Cluster: want %d up nodes, have %d", opts.NumNodes, len(upNodes)+1)
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001101 if len(upNodes) == opts.NumNodes-1 {
Serge Bazanskie78a0892021-10-07 17:03:49 +02001102 break
1103 }
Serge Bazanskibe742842022-04-04 13:18:50 +02001104 time.Sleep(time.Second)
Serge Bazanskie78a0892021-10-07 17:03:49 +02001105 }
Serge Bazanskie78a0892021-10-07 17:03:49 +02001106 }
Serge Bazanski66e58952021-10-05 17:06:56 +02001107
Serge Bazanski05f813b2023-03-16 17:58:39 +01001108 launch.Log("Cluster: all nodes up:")
Jan Schär0b927652024-07-31 18:08:50 +02001109 for i, nodeID := range cluster.NodeIDs {
1110 launch.Log("Cluster: %d. %s at %s", i, nodeID, cluster.Nodes[nodeID].ManagementAddress)
Serge Bazanskibe742842022-04-04 13:18:50 +02001111 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001112 launch.Log("Cluster: starting tests...")
Serge Bazanski66e58952021-10-05 17:06:56 +02001113
Serge Bazanskibe742842022-04-04 13:18:50 +02001114 return cluster, nil
Serge Bazanski66e58952021-10-05 17:06:56 +02001115}
1116
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001117// RebootNode reboots the cluster member node matching the given index, and
1118// waits for it to rejoin the cluster. It will use the given context ctx to run
1119// cluster API requests, whereas the resulting QEMU process will be created
1120// using the cluster's context c.ctxT. The nodes are indexed starting at 0.
1121func (c *Cluster) RebootNode(ctx context.Context, idx int) error {
1122 if idx < 0 || idx >= len(c.NodeIDs) {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +02001123 return fmt.Errorf("index out of bounds")
1124 }
1125 if c.nodeOpts[idx].Runtime == nil {
1126 return fmt.Errorf("node not running")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001127 }
1128 id := c.NodeIDs[idx]
1129
1130 // Get an authenticated owner client within the cluster.
Serge Bazanski5bb8a332022-06-23 17:41:33 +02001131 curC, err := c.CuratorClient()
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001132 if err != nil {
1133 return err
1134 }
1135 mgmt := apb.NewManagementClient(curC)
1136
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001137 // Cancel the node's context. This will shut down QEMU.
1138 c.nodeOpts[idx].Runtime.CtxC()
Serge Bazanski05f813b2023-03-16 17:58:39 +01001139 launch.Log("Cluster: waiting for node %d (%s) to stop.", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001140 err = <-c.nodesDone[idx]
1141 if err != nil {
1142 return fmt.Errorf("while restarting node: %w", err)
1143 }
1144
1145 // Start QEMU again.
Serge Bazanski05f813b2023-03-16 17:58:39 +01001146 launch.Log("Cluster: restarting node %d (%s).", idx, id)
Serge Bazanski2b6dc312024-06-04 17:44:55 +02001147 if err := LaunchNode(c.ctxT, c.launchDir, c.socketDir, c.tpmFactory, &c.nodeOpts[idx], c.nodesDone[idx]); err != nil {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +02001148 return fmt.Errorf("failed to launch node %d: %w", idx, err)
1149 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001150
Serge Bazanskibc969572024-03-21 11:56:13 +01001151 start := time.Now()
1152
1153 // Poll Management.GetNodes until the node is healthy.
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001154 for {
1155 cs, err := getNode(ctx, mgmt, id)
1156 if err != nil {
Serge Bazanski05f813b2023-03-16 17:58:39 +01001157 launch.Log("Cluster: node get error: %v", err)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001158 return err
1159 }
Serge Bazanskibc969572024-03-21 11:56:13 +01001160 launch.Log("Cluster: node health: %+v", cs.Health)
1161
1162 lhb := time.Now().Add(-cs.TimeSinceHeartbeat.AsDuration())
1163 if lhb.After(start) && cs.Health == apb.Node_HEALTHY {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001164 break
1165 }
1166 time.Sleep(time.Second)
1167 }
Serge Bazanski05f813b2023-03-16 17:58:39 +01001168 launch.Log("Cluster: node %d (%s) has rejoined the cluster.", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001169 return nil
1170}
1171
Serge Bazanski500f6e02024-04-03 12:06:40 +02001172// ShutdownNode performs an ungraceful shutdown (i.e. power off) of the node
1173// given by idx. If the node is already shut down, this is a no-op.
1174func (c *Cluster) ShutdownNode(idx int) error {
1175 if idx < 0 || idx >= len(c.NodeIDs) {
1176 return fmt.Errorf("index out of bounds")
1177 }
1178 // Return if node is already stopped.
1179 select {
1180 case <-c.nodeOpts[idx].Runtime.ctxT.Done():
1181 return nil
1182 default:
1183 }
1184 id := c.NodeIDs[idx]
1185
1186 // Cancel the node's context. This will shut down QEMU.
1187 c.nodeOpts[idx].Runtime.CtxC()
1188 launch.Log("Cluster: waiting for node %d (%s) to stop.", idx, id)
1189 err := <-c.nodesDone[idx]
1190 if err != nil {
1191 return fmt.Errorf("while shutting down node: %w", err)
1192 }
Serge Bazanski2b6dc312024-06-04 17:44:55 +02001193 launch.Log("Cluster: node %d (%s) stopped.", idx, id)
Serge Bazanski500f6e02024-04-03 12:06:40 +02001194 return nil
1195}
1196
1197// StartNode performs a power on of the node given by idx. If the node is already
1198// running, this is a no-op.
1199func (c *Cluster) StartNode(idx int) error {
1200 if idx < 0 || idx >= len(c.NodeIDs) {
1201 return fmt.Errorf("index out of bounds")
1202 }
1203 id := c.NodeIDs[idx]
1204 // Return if node is already running.
1205 select {
1206 case <-c.nodeOpts[idx].Runtime.ctxT.Done():
1207 default:
1208 return nil
1209 }
1210
1211 // Start QEMU again.
1212 launch.Log("Cluster: starting node %d (%s).", idx, id)
Serge Bazanski2b6dc312024-06-04 17:44:55 +02001213 if err := LaunchNode(c.ctxT, c.launchDir, c.socketDir, c.tpmFactory, &c.nodeOpts[idx], c.nodesDone[idx]); err != nil {
Serge Bazanski500f6e02024-04-03 12:06:40 +02001214 return fmt.Errorf("failed to launch node %d: %w", idx, err)
1215 }
Serge Bazanski2b6dc312024-06-04 17:44:55 +02001216 launch.Log("Cluster: node %d (%s) started.", idx, id)
Serge Bazanski500f6e02024-04-03 12:06:40 +02001217 return nil
1218}
1219
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001220// Close cancels the running clusters' context and waits for all virtualized
Serge Bazanski66e58952021-10-05 17:06:56 +02001221// nodes to stop. It returns an error if stopping the nodes failed, or one of
1222// the nodes failed to fully start in the first place.
1223func (c *Cluster) Close() error {
Serge Bazanski05f813b2023-03-16 17:58:39 +01001224 launch.Log("Cluster: stopping...")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001225 if c.authClient != nil {
1226 c.authClient.Close()
1227 }
Serge Bazanski66e58952021-10-05 17:06:56 +02001228 c.ctxC()
1229
Leopold20a036e2023-01-15 00:17:19 +01001230 var errs []error
Serge Bazanski05f813b2023-03-16 17:58:39 +01001231 launch.Log("Cluster: waiting for nodes to exit...")
Serge Bazanski66e58952021-10-05 17:06:56 +02001232 for _, c := range c.nodesDone {
1233 err := <-c
1234 if err != nil {
Leopold20a036e2023-01-15 00:17:19 +01001235 errs = append(errs, err)
Serge Bazanski66e58952021-10-05 17:06:56 +02001236 }
1237 }
Serge Bazanskid09c58f2023-03-17 00:25:08 +01001238 launch.Log("Cluster: removing nodes' state files (%s) and sockets (%s).", c.launchDir, c.socketDir)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001239 os.RemoveAll(c.launchDir)
1240 os.RemoveAll(c.socketDir)
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001241 os.RemoveAll(c.metroctlDir)
Serge Bazanski05f813b2023-03-16 17:58:39 +01001242 launch.Log("Cluster: done")
Leopold20a036e2023-01-15 00:17:19 +01001243 return multierr.Combine(errs...)
Serge Bazanski66e58952021-10-05 17:06:56 +02001244}
Serge Bazanskibe742842022-04-04 13:18:50 +02001245
1246// DialNode is a grpc.WithContextDialer compatible dialer which dials nodes by
1247// their ID. This is performed by connecting to the cluster nanoswitch via its
1248// SOCKS proxy, and using the cluster node list for name resolution.
1249//
1250// For example:
1251//
Serge Bazanski05f813b2023-03-16 17:58:39 +01001252// grpc.Dial("metropolis-deadbeef:1234", grpc.WithContextDialer(c.DialNode))
Serge Bazanskibe742842022-04-04 13:18:50 +02001253func (c *Cluster) DialNode(_ context.Context, addr string) (net.Conn, error) {
1254 host, port, err := net.SplitHostPort(addr)
1255 if err != nil {
1256 return nil, fmt.Errorf("invalid host:port: %w", err)
1257 }
1258 // Already an IP address?
1259 if net.ParseIP(host) != nil {
Lorenz Brun276a7462023-07-12 21:28:54 +02001260 return c.SOCKSDialer.Dial("tcp", addr)
Serge Bazanskibe742842022-04-04 13:18:50 +02001261 }
1262
1263 // Otherwise, expect a node name.
1264 node, ok := c.Nodes[host]
1265 if !ok {
1266 return nil, fmt.Errorf("unknown node %q", host)
1267 }
1268 addr = net.JoinHostPort(node.ManagementAddress, port)
Lorenz Brun276a7462023-07-12 21:28:54 +02001269 return c.SOCKSDialer.Dial("tcp", addr)
Serge Bazanskibe742842022-04-04 13:18:50 +02001270}
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001271
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001272// GetKubeClientSet gets a Kubernetes client set accessing the Metropolis
1273// Kubernetes authenticating proxy using the cluster owner identity.
1274// It currently has access to everything (i.e. the cluster-admin role)
1275// via the owner-admin binding.
1276func (c *Cluster) GetKubeClientSet() (kubernetes.Interface, error) {
1277 pkcs8Key, err := x509.MarshalPKCS8PrivateKey(c.Owner.PrivateKey)
1278 if err != nil {
1279 // We explicitly pass an Ed25519 private key in, so this can't happen
1280 panic(err)
1281 }
1282
1283 host := net.JoinHostPort(c.NodeIDs[0], node.KubernetesAPIWrappedPort.PortString())
Lorenz Brun150f24a2023-07-13 20:11:06 +02001284 clientConfig := rest.Config{
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001285 Host: host,
1286 TLSClientConfig: rest.TLSClientConfig{
1287 // TODO(q3k): use CA certificate
1288 Insecure: true,
1289 ServerName: "kubernetes.default.svc",
1290 CertData: pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: c.Owner.Certificate[0]}),
1291 KeyData: pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: pkcs8Key}),
1292 },
1293 Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
1294 return c.DialNode(ctx, address)
1295 },
1296 }
1297 return kubernetes.NewForConfig(&clientConfig)
1298}
1299
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001300// KubernetesControllerNodeAddresses returns the list of IP addresses of nodes
1301// which are currently Kubernetes controllers, ie. run an apiserver. This list
1302// might be empty if no node is currently configured with the
1303// 'KubernetesController' node.
1304func (c *Cluster) KubernetesControllerNodeAddresses(ctx context.Context) ([]string, error) {
1305 curC, err := c.CuratorClient()
1306 if err != nil {
1307 return nil, err
1308 }
1309 mgmt := apb.NewManagementClient(curC)
1310 srv, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{
1311 Filter: "has(node.roles.kubernetes_controller)",
1312 })
1313 if err != nil {
1314 return nil, err
1315 }
1316 defer srv.CloseSend()
1317 var res []string
1318 for {
1319 n, err := srv.Recv()
1320 if err == io.EOF {
1321 break
1322 }
1323 if err != nil {
1324 return nil, err
1325 }
1326 if n.Status == nil || n.Status.ExternalAddress == "" {
1327 continue
1328 }
1329 res = append(res, n.Status.ExternalAddress)
1330 }
1331 return res, nil
1332}
Serge Bazanski630fb5c2023-04-06 10:50:24 +02001333
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001334// AllNodesHealthy returns nil if all the nodes in the cluster are seemingly
1335// healthy.
Serge Bazanski630fb5c2023-04-06 10:50:24 +02001336func (c *Cluster) AllNodesHealthy(ctx context.Context) error {
1337 // Get an authenticated owner client within the cluster.
1338 curC, err := c.CuratorClient()
1339 if err != nil {
1340 return err
1341 }
1342 mgmt := apb.NewManagementClient(curC)
1343 nodes, err := getNodes(ctx, mgmt)
1344 if err != nil {
1345 return err
1346 }
1347
1348 var unhealthy []string
1349 for _, node := range nodes {
1350 if node.Health == apb.Node_HEALTHY {
1351 continue
1352 }
1353 unhealthy = append(unhealthy, node.Id)
1354 }
1355 if len(unhealthy) == 0 {
1356 return nil
1357 }
1358 return fmt.Errorf("nodes unhealthy: %s", strings.Join(unhealthy, ", "))
1359}
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001360
1361// ApproveNode approves a node by ID, waiting for it to become UP.
1362func (c *Cluster) ApproveNode(ctx context.Context, id string) error {
1363 curC, err := c.CuratorClient()
1364 if err != nil {
1365 return err
1366 }
1367 mgmt := apb.NewManagementClient(curC)
1368
1369 _, err = mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
1370 Pubkey: c.Nodes[id].Pubkey,
1371 })
1372 if err != nil {
1373 return fmt.Errorf("ApproveNode: %w", err)
1374 }
1375 launch.Log("Cluster: %s: approved, waiting for UP", id)
1376 for {
1377 nodes, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
1378 if err != nil {
1379 return fmt.Errorf("GetNodes: %w", err)
1380 }
1381 found := false
1382 for {
1383 node, err := nodes.Recv()
1384 if errors.Is(err, io.EOF) {
1385 break
1386 }
1387 if err != nil {
1388 return fmt.Errorf("Nodes.Recv: %w", err)
1389 }
1390 if node.Id != id {
1391 continue
1392 }
1393 if node.State != cpb.NodeState_NODE_STATE_UP {
1394 continue
1395 }
1396 found = true
1397 break
1398 }
1399 nodes.CloseSend()
1400
1401 if found {
1402 break
1403 }
1404 time.Sleep(time.Second)
1405 }
1406 launch.Log("Cluster: %s: UP", id)
1407 return nil
1408}
1409
1410// MakeKubernetesWorker adds the KubernetesWorker role to a node by ID.
1411func (c *Cluster) MakeKubernetesWorker(ctx context.Context, id string) error {
1412 curC, err := c.CuratorClient()
1413 if err != nil {
1414 return err
1415 }
1416 mgmt := apb.NewManagementClient(curC)
1417
1418 tr := true
1419 launch.Log("Cluster: %s: adding KubernetesWorker", id)
1420 _, err = mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
1421 Node: &apb.UpdateNodeRolesRequest_Id{
1422 Id: id,
1423 },
1424 KubernetesWorker: &tr,
1425 })
1426 return err
1427}
Serge Bazanski37cfcc12024-03-21 11:59:07 +01001428
Jan Schära9b060b2024-08-07 10:42:29 +02001429// MakeKubernetesController adds the KubernetesController role to a node by ID.
1430func (c *Cluster) MakeKubernetesController(ctx context.Context, id string) error {
1431 curC, err := c.CuratorClient()
1432 if err != nil {
1433 return err
1434 }
1435 mgmt := apb.NewManagementClient(curC)
1436
1437 tr := true
1438 launch.Log("Cluster: %s: adding KubernetesController", id)
1439 _, err = mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
1440 Node: &apb.UpdateNodeRolesRequest_Id{
1441 Id: id,
1442 },
1443 KubernetesController: &tr,
1444 })
1445 return err
1446}
1447
Serge Bazanski37cfcc12024-03-21 11:59:07 +01001448// MakeConsensusMember adds the ConsensusMember role to a node by ID.
1449func (c *Cluster) MakeConsensusMember(ctx context.Context, id string) error {
1450 curC, err := c.CuratorClient()
1451 if err != nil {
1452 return err
1453 }
1454 mgmt := apb.NewManagementClient(curC)
1455 cur := ipb.NewCuratorClient(curC)
1456
1457 tr := true
1458 launch.Log("Cluster: %s: adding ConsensusMember", id)
1459 bo := backoff.NewExponentialBackOff()
1460 bo.MaxElapsedTime = 10 * time.Second
1461
1462 backoff.Retry(func() error {
1463 _, err = mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
1464 Node: &apb.UpdateNodeRolesRequest_Id{
1465 Id: id,
1466 },
1467 ConsensusMember: &tr,
1468 })
1469 if err != nil {
1470 launch.Log("Cluster: %s: UpdateNodeRoles failed: %v", id, err)
1471 }
1472 return err
1473 }, backoff.WithContext(bo, ctx))
1474 if err != nil {
1475 return err
1476 }
1477
1478 launch.Log("Cluster: %s: waiting for learner/full members...", id)
1479
1480 learner := false
1481 for {
1482 res, err := cur.GetConsensusStatus(ctx, &ipb.GetConsensusStatusRequest{})
1483 if err != nil {
1484 return fmt.Errorf("GetConsensusStatus: %w", err)
1485 }
1486 for _, member := range res.EtcdMember {
1487 if member.Id != id {
1488 continue
1489 }
1490 switch member.Status {
1491 case ipb.GetConsensusStatusResponse_EtcdMember_STATUS_LEARNER:
1492 if !learner {
1493 learner = true
1494 launch.Log("Cluster: %s: became a learner, waiting for full member...", id)
1495 }
1496 case ipb.GetConsensusStatusResponse_EtcdMember_STATUS_FULL:
1497 launch.Log("Cluster: %s: became a full member", id)
1498 return nil
1499 }
1500 }
1501 time.Sleep(100 * time.Millisecond)
1502 }
1503}