blob: 8be6736a9114c40557cc51b0bd970eec4b7a2720 [file] [log] [blame]
// Package launch builds on the osbase/test/launch package and implements
// launching Metropolis nodes and clusters in a virtualized environment using
// qemu. It's kept in a separate package as it depends on a Metropolis node
// image, which might not be required for some uses of the launch library.
Tim Windelschmidt9f21f532024-05-07 15:14:20 +02005package launch
Serge Bazanski66e58952021-10-05 17:06:56 +02006
7import (
8 "bytes"
9 "context"
Serge Bazanski1f8cad72023-03-20 16:58:10 +010010 "crypto/ed25519"
Serge Bazanski66e58952021-10-05 17:06:56 +020011 "crypto/rand"
12 "crypto/tls"
Serge Bazanski54e212a2023-06-14 13:45:11 +020013 "crypto/x509"
Serge Bazanskia0bc6d32023-06-28 18:57:40 +020014 "encoding/pem"
Serge Bazanski66e58952021-10-05 17:06:56 +020015 "errors"
16 "fmt"
17 "io"
Serge Bazanski66e58952021-10-05 17:06:56 +020018 "net"
Lorenz Brun150f24a2023-07-13 20:11:06 +020019 "net/http"
Serge Bazanski66e58952021-10-05 17:06:56 +020020 "os"
21 "os/exec"
Leopoldacfad5b2023-01-15 14:05:25 +010022 "path"
Serge Bazanski66e58952021-10-05 17:06:56 +020023 "path/filepath"
Serge Bazanski53458ba2024-06-18 09:56:46 +000024 "strconv"
Serge Bazanski630fb5c2023-04-06 10:50:24 +020025 "strings"
Serge Bazanski66e58952021-10-05 17:06:56 +020026 "syscall"
27 "time"
28
29 "github.com/cenkalti/backoff/v4"
Serge Bazanski66e58952021-10-05 17:06:56 +020030 "go.uber.org/multierr"
Serge Bazanskibe742842022-04-04 13:18:50 +020031 "golang.org/x/net/proxy"
Lorenz Brun87bbf7e2024-03-18 18:22:25 +010032 "golang.org/x/sys/unix"
Serge Bazanski66e58952021-10-05 17:06:56 +020033 "google.golang.org/grpc"
Serge Bazanski636032e2022-01-26 14:21:33 +010034 "google.golang.org/grpc/codes"
35 "google.golang.org/grpc/status"
Serge Bazanski66e58952021-10-05 17:06:56 +020036 "google.golang.org/protobuf/proto"
Serge Bazanskia0bc6d32023-06-28 18:57:40 +020037 "k8s.io/client-go/kubernetes"
38 "k8s.io/client-go/rest"
Serge Bazanski66e58952021-10-05 17:06:56 +020039
Serge Bazanski37cfcc12024-03-21 11:59:07 +010040 ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
Tim Windelschmidtbe25a3b2023-07-19 16:31:56 +020041 apb "source.monogon.dev/metropolis/proto/api"
42 cpb "source.monogon.dev/metropolis/proto/common"
43
Serge Bazanskica8d9512024-09-12 14:20:57 +020044 "source.monogon.dev/go/logging"
Serge Bazanskidd5b03c2024-05-16 18:07:06 +020045 "source.monogon.dev/go/qcow2"
Serge Bazanski1f8cad72023-03-20 16:58:10 +010046 metroctl "source.monogon.dev/metropolis/cli/metroctl/core"
Serge Bazanski66e58952021-10-05 17:06:56 +020047 "source.monogon.dev/metropolis/node"
Serge Bazanskie78a0892021-10-07 17:03:49 +020048 "source.monogon.dev/metropolis/node/core/identity"
Serge Bazanski66e58952021-10-05 17:06:56 +020049 "source.monogon.dev/metropolis/node/core/rpc"
Serge Bazanski5bb8a332022-06-23 17:41:33 +020050 "source.monogon.dev/metropolis/node/core/rpc/resolver"
Tim Windelschmidt9f21f532024-05-07 15:14:20 +020051 "source.monogon.dev/metropolis/test/localregistry"
52 "source.monogon.dev/osbase/test/launch"
Serge Bazanski66e58952021-10-05 17:06:56 +020053)
54
// nodeNumberKey names the node label which carries a node's numerical index
// within the test system.
const nodeNumberKey string = "test-node-number"
60
// NodeOptions contains all options that can be passed to LaunchNode(). Note
// that LaunchNode fills in some of these fields (defaults, Mac, Runtime) in
// place.
type NodeOptions struct {
	// Name is a human-readable identifier to be used in debug output.
	Name string

	// CPUs is the number of virtual CPUs of the VM. LaunchNode uses 1 if this
	// is zero.
	CPUs int

	// ThreadsPerCPU is the number of threads per CPU. This is multiplied by
	// CPUs to get the total number of threads. LaunchNode uses 1 if this is
	// zero.
	ThreadsPerCPU int

	// MemoryMiB is the RAM size in MiB of the VM. LaunchNode uses 2048 if this
	// is zero.
	MemoryMiB int

	// DiskBytes contains the size of the root disk in bytes or zero if the
	// unmodified image size is used. Values smaller than the node image are
	// rounded up to the image size. It only takes effect when the node's
	// runtime state is first created, i.e. when Runtime is nil.
	DiskBytes uint64

	// Ports contains the port mapping where to expose the internal ports of the VM to
	// the host. See IdentityPortMap() and ConflictFreePortMap(). Ignored when
	// ConnectToSocket is set.
	Ports launch.PortMap

	// If set to true, reboots are honored. Otherwise, QEMU is started with
	// -no-reboot, so any reboot makes QEMU exit and LaunchNode reports the exit
	// on its done channel. Metropolis nodes generally restart on almost all
	// errors, so unless you want to test reboot behavior this should be false.
	AllowReboot bool

	// By default, the VM is connected to the Host via SLIRP. If ConnectToSocket is
	// set, it is instead connected to the given file descriptor/socket, which is
	// handed to QEMU as file descriptor 3. If this is set, all port maps from the
	// Ports option are ignored. Intended for networking this instance together
	// with others for running more complex network configurations.
	ConnectToSocket *os.File

	// When PcapDump is set, all traffic is dumped to a pcap file in the
	// runtime directory (e.g. "net0.pcap" for the first interface).
	PcapDump bool

	// SerialPort is an io.ReadWriter over which you can communicate with the serial
	// port of the machine. It can be set to an existing file descriptor (like
	// os.Stdout/os.Stderr) or any Go structure implementing this interface.
	// NOTE(review): LaunchNode only wires this up as QEMU's stdout (the serial
	// output); nothing in LaunchNode connects it to QEMU's stdin.
	SerialPort io.ReadWriter

	// NodeParameters is passed into the VM and subsequently used for bootstrapping or
	// registering into a cluster. It is serialized to parameters.pb in the
	// runtime directory and exposed via fw_cfg as
	// dev.monogon.metropolis/parameters.pb.
	NodeParameters *apb.NodeParameters

	// Mac is the node's MAC address. LaunchNode generates a random locally
	// administered address if this is nil.
	Mac *net.HardwareAddr

	// Runtime keeps the node's QEMU runtime state. LaunchNode creates it on the
	// node's first start if it is nil.
	Runtime *NodeRuntime

	// RunVNC starts a VNC socket for troubleshooting/testing console code. Note:
	// this will not work in tests, as those use a built-in qemu which does not
	// implement a VGA device.
	RunVNC bool
}
121
// NodeRuntime keeps the node's QEMU runtime options. It is created by
// setupRuntime on a node's first launch and reused on subsequent launches
// (e.g. reboots) so that the node's state persists.
type NodeRuntime struct {
	// ld points at the node's launch directory storing data such as storage
	// images, firmware variables or the TPM state.
	ld string
	// sd points at the node's socket directory (e.g. the swtpm socket and, if
	// enabled, the VNC socket).
	sd string

	// ctxT is the context QEMU will execute in. LaunchNode replaces it on every
	// launch.
	ctxT context.Context
	// CtxC is the QEMU context's cancellation function. Calling it kills the
	// node's QEMU and TPM emulator processes, which are bound to ctxT.
	CtxC context.CancelFunc
}
135
136// NodePorts is the list of ports a fully operational Metropolis node listens on
Serge Bazanski52304a82021-10-29 16:56:18 +0200137var NodePorts = []node.Port{
Serge Bazanski66e58952021-10-05 17:06:56 +0200138 node.ConsensusPort,
139
140 node.CuratorServicePort,
141 node.DebugServicePort,
142
143 node.KubernetesAPIPort,
Lorenz Bruncc078df2021-12-23 11:51:55 +0100144 node.KubernetesAPIWrappedPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200145 node.CuratorServicePort,
146 node.DebuggerPort,
Tim Windelschmidtbe25a3b2023-07-19 16:31:56 +0200147 node.MetricsPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200148}
149
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200150// setupRuntime creates the node's QEMU runtime directory, together with all
151// files required to preserve its state, a level below the chosen path ld. The
152// node's socket directory is similarily created a level below sd. It may
153// return an I/O error.
Jan Schär07003572024-08-26 10:42:16 +0200154func setupRuntime(ld, sd string, diskBytes uint64) (*NodeRuntime, error) {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200155 // Create a temporary directory to keep all the runtime files.
156 stdp, err := os.MkdirTemp(ld, "node_state*")
157 if err != nil {
158 return nil, fmt.Errorf("failed to create the state directory: %w", err)
159 }
160
161 // Initialize the node's storage with a prebuilt image.
Jan Schär07003572024-08-26 10:42:16 +0200162 st, err := os.Stat(xNodeImagePath)
163 if err != nil {
164 return nil, fmt.Errorf("cannot read image file: %w", err)
165 }
166 diskBytes = max(diskBytes, uint64(st.Size()))
167
Serge Bazanskidd5b03c2024-05-16 18:07:06 +0200168 di := filepath.Join(stdp, "image.qcow2")
Tim Windelschmidt82e6af72024-07-23 00:05:42 +0000169 launch.Log("Cluster: generating node QCOW2 snapshot image: %s -> %s", xNodeImagePath, di)
Serge Bazanskidd5b03c2024-05-16 18:07:06 +0200170
171 df, err := os.Create(di)
172 if err != nil {
173 return nil, fmt.Errorf("while opening image for writing: %w", err)
174 }
175 defer df.Close()
Jan Schär07003572024-08-26 10:42:16 +0200176 if err := qcow2.Generate(df, qcow2.GenerateWithBackingFile(xNodeImagePath), qcow2.GenerateWithFileSize(diskBytes)); err != nil {
Serge Bazanskidd5b03c2024-05-16 18:07:06 +0200177 return nil, fmt.Errorf("while creating copy-on-write node image: %w", err)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200178 }
179
180 // Initialize the OVMF firmware variables file.
Tim Windelschmidt82e6af72024-07-23 00:05:42 +0000181 dv := filepath.Join(stdp, filepath.Base(xOvmfVarsPath))
182 if err := copyFile(xOvmfVarsPath, dv); err != nil {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200183 return nil, fmt.Errorf("while copying firmware variables: %w", err)
184 }
185
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200186 // Create the socket directory.
187 sotdp, err := os.MkdirTemp(sd, "node_sock*")
188 if err != nil {
189 return nil, fmt.Errorf("failed to create the socket directory: %w", err)
190 }
191
192 return &NodeRuntime{
193 ld: stdp,
194 sd: sotdp,
195 }, nil
196}
197
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200198// CuratorClient returns an authenticated owner connection to a Curator
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200199// instance within Cluster c, or nil together with an error.
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200200func (c *Cluster) CuratorClient() (*grpc.ClientConn, error) {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200201 if c.authClient == nil {
Serge Bazanski8535cb52023-03-29 14:15:08 +0200202 authCreds := rpc.NewAuthenticatedCredentials(c.Owner, rpc.WantInsecure())
Serge Bazanskica8d9512024-09-12 14:20:57 +0200203 r := resolver.New(c.ctxT, resolver.WithLogger(logging.NewFunctionBackend(func(severity logging.Severity, msg string) {
204 launch.Log("Cluster: client resolver: %s: %s", severity, msg)
205 })))
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200206 for _, n := range c.NodeIDs {
207 ep, err := resolver.NodeWithDefaultPort(n)
208 if err != nil {
Tim Windelschmidtadcf5d72024-05-21 13:46:25 +0200209 return nil, fmt.Errorf("could not add node %q by DNS: %w", n, err)
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200210 }
211 r.AddEndpoint(ep)
212 }
213 authClient, err := grpc.Dial(resolver.MetropolisControlAddress,
214 grpc.WithTransportCredentials(authCreds),
215 grpc.WithResolvers(r),
216 grpc.WithContextDialer(c.DialNode),
217 )
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200218 if err != nil {
219 return nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
220 }
221 c.authClient = authClient
222 }
223 return c.authClient, nil
224}
225
Serge Bazanski66e58952021-10-05 17:06:56 +0200226// LaunchNode launches a single Metropolis node instance with the given options.
227// The instance runs mostly paravirtualized but with some emulated hardware
228// similar to how a cloud provider might set up its VMs. The disk is fully
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200229// writable, and the changes are kept across reboots and shutdowns. ld and sd
230// point to the launch directory and the socket directory, holding the nodes'
231// state files (storage, tpm state, firmware state), and UNIX socket files
232// (swtpm <-> QEMU interplay) respectively. The directories must exist before
233// LaunchNode is called. LaunchNode will update options.Runtime and options.Mac
234// if either are not initialized.
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200235func LaunchNode(ctx context.Context, ld, sd string, tpmFactory *TPMFactory, options *NodeOptions, doneC chan error) error {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200236 // TODO(mateusz@monogon.tech) try using QEMU's abstract socket namespace instead
237 // of /tmp (requires QEMU version >5.0).
Serge Bazanski66e58952021-10-05 17:06:56 +0200238 // https://github.com/qemu/qemu/commit/776b97d3605ed0fc94443048fdf988c7725e38a9).
239 // swtpm accepts already-open FDs so we can pass in an abstract socket namespace FD
240 // that we open and pass the name of it to QEMU. Not pinning this crashes both
241 // swtpm and qemu because we run into UNIX socket length limitations (for legacy
242 // reasons 108 chars).
Serge Bazanski66e58952021-10-05 17:06:56 +0200243
Jan Schära9b060b2024-08-07 10:42:29 +0200244 if options.CPUs == 0 {
245 options.CPUs = 1
246 }
247 if options.ThreadsPerCPU == 0 {
248 options.ThreadsPerCPU = 1
249 }
250 if options.MemoryMiB == 0 {
251 options.MemoryMiB = 2048
252 }
253
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200254 // If it's the node's first start, set up its runtime directories.
255 if options.Runtime == nil {
Jan Schär07003572024-08-26 10:42:16 +0200256 r, err := setupRuntime(ld, sd, options.DiskBytes)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200257 if err != nil {
258 return fmt.Errorf("while setting up node runtime: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200259 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200260 options.Runtime = r
Serge Bazanski66e58952021-10-05 17:06:56 +0200261 }
262
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200263 // Replace the node's context with a new one.
264 r := options.Runtime
265 if r.CtxC != nil {
266 r.CtxC()
267 }
268 r.ctxT, r.CtxC = context.WithCancel(ctx)
269
Serge Bazanski66e58952021-10-05 17:06:56 +0200270 var qemuNetType string
271 var qemuNetConfig launch.QemuValue
272 if options.ConnectToSocket != nil {
273 qemuNetType = "socket"
274 qemuNetConfig = launch.QemuValue{
275 "id": {"net0"},
276 "fd": {"3"},
277 }
278 } else {
279 qemuNetType = "user"
280 qemuNetConfig = launch.QemuValue{
281 "id": {"net0"},
282 "net": {"10.42.0.0/24"},
283 "dhcpstart": {"10.42.0.10"},
284 "hostfwd": options.Ports.ToQemuForwards(),
285 }
286 }
287
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200288 // Generate the node's MAC address if it isn't already set in NodeOptions.
289 if options.Mac == nil {
290 mac, err := generateRandomEthernetMAC()
291 if err != nil {
292 return err
293 }
294 options.Mac = mac
Serge Bazanski66e58952021-10-05 17:06:56 +0200295 }
296
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200297 tpmSocketPath := filepath.Join(r.sd, "tpm-socket")
298 fwVarPath := filepath.Join(r.ld, "OVMF_VARS.fd")
Serge Bazanskidd5b03c2024-05-16 18:07:06 +0200299 storagePath := filepath.Join(r.ld, "image.qcow2")
Lorenz Brun150f24a2023-07-13 20:11:06 +0200300 qemuArgs := []string{
Jan Schära9b060b2024-08-07 10:42:29 +0200301 "-machine", "q35",
302 "-accel", "kvm",
Serge Bazanski62e6f0b2024-09-03 12:18:56 +0200303 "-display", "none",
Jan Schära9b060b2024-08-07 10:42:29 +0200304 "-nodefaults",
305 "-cpu", "host",
306 "-m", fmt.Sprintf("%dM", options.MemoryMiB),
307 "-smp", fmt.Sprintf("cores=%d,threads=%d", options.CPUs, options.ThreadsPerCPU),
Tim Windelschmidt82e6af72024-07-23 00:05:42 +0000308 "-drive", "if=pflash,format=raw,readonly=on,file=" + xOvmfCodePath,
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200309 "-drive", "if=pflash,format=raw,file=" + fwVarPath,
Serge Bazanskidd5b03c2024-05-16 18:07:06 +0200310 "-drive", "if=virtio,format=qcow2,cache=unsafe,file=" + storagePath,
Serge Bazanski66e58952021-10-05 17:06:56 +0200311 "-netdev", qemuNetConfig.ToOption(qemuNetType),
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200312 "-device", "virtio-net-pci,netdev=net0,mac=" + options.Mac.String(),
Serge Bazanski66e58952021-10-05 17:06:56 +0200313 "-chardev", "socket,id=chrtpm,path=" + tpmSocketPath,
314 "-tpmdev", "emulator,id=tpm0,chardev=chrtpm",
315 "-device", "tpm-tis,tpmdev=tpm0",
316 "-device", "virtio-rng-pci",
Lorenz Brun150f24a2023-07-13 20:11:06 +0200317 "-serial", "stdio",
318 }
Serge Bazanski62e6f0b2024-09-03 12:18:56 +0200319 if options.RunVNC {
320 vncSocketPath := filepath.Join(r.sd, "vnc-socket")
321 qemuArgs = append(qemuArgs,
322 "-vnc", "unix:"+vncSocketPath,
323 "-device", "virtio-vga",
324 )
325 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200326
327 if !options.AllowReboot {
328 qemuArgs = append(qemuArgs, "-no-reboot")
329 }
330
331 if options.NodeParameters != nil {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200332 parametersPath := filepath.Join(r.ld, "parameters.pb")
Serge Bazanski66e58952021-10-05 17:06:56 +0200333 parametersRaw, err := proto.Marshal(options.NodeParameters)
334 if err != nil {
335 return fmt.Errorf("failed to encode node paraeters: %w", err)
336 }
Lorenz Brun150f24a2023-07-13 20:11:06 +0200337 if err := os.WriteFile(parametersPath, parametersRaw, 0o644); err != nil {
Serge Bazanski66e58952021-10-05 17:06:56 +0200338 return fmt.Errorf("failed to write node parameters: %w", err)
339 }
340 qemuArgs = append(qemuArgs, "-fw_cfg", "name=dev.monogon.metropolis/parameters.pb,file="+parametersPath)
341 }
342
Leopoldacfad5b2023-01-15 14:05:25 +0100343 if options.PcapDump {
Tim Windelschmidta7a82f32024-04-11 01:40:25 +0200344 qemuNetDump := launch.QemuValue{
345 "id": {"net0"},
346 "netdev": {"net0"},
347 "file": {filepath.Join(r.ld, "net0.pcap")},
Leopoldacfad5b2023-01-15 14:05:25 +0100348 }
349 qemuArgs = append(qemuArgs, "-object", qemuNetDump.ToOption("filter-dump"))
350 }
351
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200352 // Manufacture TPM if needed.
353 tpmd := filepath.Join(r.ld, "tpm")
Tim Windelschmidt82e6af72024-07-23 00:05:42 +0000354 err := tpmFactory.Manufacture(ctx, tpmd, &TPMPlatform{
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200355 Manufacturer: "Monogon",
356 Version: "1.0",
357 Model: "TestCluster",
358 })
359 if err != nil {
360 return fmt.Errorf("could not manufacture TPM: %w", err)
361 }
362
Serge Bazanski66e58952021-10-05 17:06:56 +0200363 // Start TPM emulator as a subprocess
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200364 tpmCtx, tpmCancel := context.WithCancel(options.Runtime.ctxT)
Serge Bazanski66e58952021-10-05 17:06:56 +0200365
Tim Windelschmidt82e6af72024-07-23 00:05:42 +0000366 tpmEmuCmd := exec.CommandContext(tpmCtx, xSwtpmPath, "socket", "--tpm2", "--tpmstate", "dir="+tpmd, "--ctrl", "type=unixio,path="+tpmSocketPath)
Serge Bazanskib07c57a2024-06-04 14:33:27 +0000367 // Silence warnings from unsafe libtpms build (uses non-constant-time
368 // cryptographic operations).
369 tpmEmuCmd.Env = append(tpmEmuCmd.Env, "MONOGON_LIBTPMS_ACKNOWLEDGE_UNSAFE=yes")
Serge Bazanski66e58952021-10-05 17:06:56 +0200370 tpmEmuCmd.Stderr = os.Stderr
371 tpmEmuCmd.Stdout = os.Stdout
372
Tim Windelschmidt244b5672024-02-06 10:18:56 +0100373 err = tpmEmuCmd.Start()
Serge Bazanski66e58952021-10-05 17:06:56 +0200374 if err != nil {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200375 tpmCancel()
Serge Bazanski66e58952021-10-05 17:06:56 +0200376 return fmt.Errorf("failed to start TPM emulator: %w", err)
377 }
378
Mateusz Zalegae90f4a12022-05-25 18:24:01 +0200379 // Wait for the socket to be created by the TPM emulator before launching
380 // QEMU.
381 for {
382 _, err := os.Stat(tpmSocketPath)
383 if err == nil {
384 break
385 }
Tim Windelschmidta7a82f32024-04-11 01:40:25 +0200386 if !os.IsNotExist(err) {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200387 tpmCancel()
Mateusz Zalegae90f4a12022-05-25 18:24:01 +0200388 return fmt.Errorf("while stat-ing TPM socket path: %w", err)
389 }
390 if err := tpmCtx.Err(); err != nil {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200391 tpmCancel()
Mateusz Zalegae90f4a12022-05-25 18:24:01 +0200392 return fmt.Errorf("while waiting for the TPM socket: %w", err)
393 }
394 time.Sleep(time.Millisecond * 100)
395 }
396
Serge Bazanski66e58952021-10-05 17:06:56 +0200397 // Start the main qemu binary
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200398 systemCmd := exec.CommandContext(options.Runtime.ctxT, "qemu-system-x86_64", qemuArgs...)
Serge Bazanski66e58952021-10-05 17:06:56 +0200399 if options.ConnectToSocket != nil {
400 systemCmd.ExtraFiles = []*os.File{options.ConnectToSocket}
401 }
402
403 var stdErrBuf bytes.Buffer
404 systemCmd.Stderr = &stdErrBuf
405 systemCmd.Stdout = options.SerialPort
406
Leopoldaf5086b2023-01-15 14:12:42 +0100407 launch.PrettyPrintQemuArgs(options.Name, systemCmd.Args)
408
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200409 go func() {
410 launch.Log("Node: Starting...")
411 err = systemCmd.Run()
412 launch.Log("Node: Returned: %v", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200413
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200414 // Stop TPM emulator and wait for it to exit to properly reap the child process
415 tpmCancel()
416 launch.Log("Node: Waiting for TPM emulator to exit")
417 // Wait returns a SIGKILL error because we just cancelled its context.
418 // We still need to call it to avoid creating zombies.
419 errTpm := tpmEmuCmd.Wait()
420 launch.Log("Node: TPM emulator done: %v", errTpm)
Serge Bazanski66e58952021-10-05 17:06:56 +0200421
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200422 var exerr *exec.ExitError
423 if err != nil && errors.As(err, &exerr) {
424 status := exerr.ProcessState.Sys().(syscall.WaitStatus)
425 if status.Signaled() && status.Signal() == syscall.SIGKILL {
426 // Process was killed externally (most likely by our context being canceled).
427 // This is a normal exit for us, so return nil
428 doneC <- nil
429 return
430 }
431 exerr.Stderr = stdErrBuf.Bytes()
432 newErr := launch.QEMUError(*exerr)
433 launch.Log("Node: %q", stdErrBuf.String())
434 doneC <- &newErr
435 return
Serge Bazanski66e58952021-10-05 17:06:56 +0200436 }
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200437 doneC <- err
438 }()
439 return nil
Serge Bazanski66e58952021-10-05 17:06:56 +0200440}
441
442func copyFile(src, dst string) error {
443 in, err := os.Open(src)
444 if err != nil {
445 return fmt.Errorf("when opening source: %w", err)
446 }
447 defer in.Close()
448
449 out, err := os.Create(dst)
450 if err != nil {
451 return fmt.Errorf("when creating destination: %w", err)
452 }
453 defer out.Close()
454
Lorenz Brun87bbf7e2024-03-18 18:22:25 +0100455 endPos, err := in.Seek(0, io.SeekEnd)
Serge Bazanski66e58952021-10-05 17:06:56 +0200456 if err != nil {
Lorenz Brun87bbf7e2024-03-18 18:22:25 +0100457 return fmt.Errorf("when getting source end: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200458 }
Lorenz Brun87bbf7e2024-03-18 18:22:25 +0100459
460 // Copy the file while preserving its sparseness. The image files are very
461 // sparse (less than 10% allocated), so this is a lot faster.
462 var lastHoleStart int64
463 for {
464 dataStart, err := in.Seek(lastHoleStart, unix.SEEK_DATA)
465 if err != nil {
466 return fmt.Errorf("when seeking to next data block: %w", err)
467 }
468 holeStart, err := in.Seek(dataStart, unix.SEEK_HOLE)
469 if err != nil {
470 return fmt.Errorf("when seeking to next hole: %w", err)
471 }
472 lastHoleStart = holeStart
473 if _, err := in.Seek(dataStart, io.SeekStart); err != nil {
474 return fmt.Errorf("when seeking to current data block: %w", err)
475 }
476 if _, err := out.Seek(dataStart, io.SeekStart); err != nil {
477 return fmt.Errorf("when seeking output to next data block: %w", err)
478 }
479 if _, err := io.CopyN(out, in, holeStart-dataStart); err != nil {
480 return fmt.Errorf("when copying file: %w", err)
481 }
482 if endPos == holeStart {
483 // The next hole is at the end of the file, we're done here.
484 break
485 }
486 }
487
Serge Bazanski66e58952021-10-05 17:06:56 +0200488 return out.Close()
489}
490
Serge Bazanskie78a0892021-10-07 17:03:49 +0200491// getNodes wraps around Management.GetNodes to return a list of nodes in a
492// cluster.
493func getNodes(ctx context.Context, mgmt apb.ManagementClient) ([]*apb.Node, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200494 var res []*apb.Node
Serge Bazanski636032e2022-01-26 14:21:33 +0100495 bo := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
Serge Bazanski075465c2021-11-16 15:38:49 +0100496 err := backoff.Retry(func() error {
497 res = nil
498 srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
Serge Bazanskie78a0892021-10-07 17:03:49 +0200499 if err != nil {
Serge Bazanski075465c2021-11-16 15:38:49 +0100500 return fmt.Errorf("GetNodes: %w", err)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200501 }
Serge Bazanski075465c2021-11-16 15:38:49 +0100502 for {
503 node, err := srvN.Recv()
504 if err == io.EOF {
505 break
506 }
507 if err != nil {
508 return fmt.Errorf("GetNodes.Recv: %w", err)
509 }
510 res = append(res, node)
511 }
512 return nil
513 }, bo)
514 if err != nil {
515 return nil, err
Serge Bazanskie78a0892021-10-07 17:03:49 +0200516 }
517 return res, nil
518}
519
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200520// getNode wraps Management.GetNodes. It returns node information matching
521// given node ID.
522func getNode(ctx context.Context, mgmt apb.ManagementClient, id string) (*apb.Node, error) {
523 nodes, err := getNodes(ctx, mgmt)
524 if err != nil {
525 return nil, fmt.Errorf("could not get nodes: %w", err)
526 }
527 for _, n := range nodes {
528 eid := identity.NodeID(n.Pubkey)
529 if eid != id {
530 continue
531 }
532 return n, nil
533 }
Tim Windelschmidt73e98822024-04-18 23:13:49 +0200534 return nil, fmt.Errorf("no such node")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200535}
536
// generateRandomEthernetMAC returns a random EUI-48 Ethernet MAC address that
// is marked as locally administered and individual (unicast). This keeps it
// out of the vendor-assigned address space, and it is never treated as a
// multicast group address.
func generateRandomEthernetMAC() (*net.HardwareAddr, error) {
	var raw [6]byte
	if _, err := rand.Read(raw[:]); err != nil {
		return nil, fmt.Errorf("failed to read randomness for MAC: %w", err)
	}

	// Set the U/L bit (locally administered) and clear the I/G bit
	// (individual address). Ref IEEE 802-2014 Section 8.2.2.
	raw[0] |= 0x02
	raw[0] &^= 0x01

	mac := net.HardwareAddr(raw[:])
	return &mac, nil
}
551
Serge Bazanskibe742842022-04-04 13:18:50 +0200552const SOCKSPort uint16 = 1080
Serge Bazanski66e58952021-10-05 17:06:56 +0200553
Serge Bazanskibe742842022-04-04 13:18:50 +0200554// ClusterPorts contains all ports handled by Nanoswitch.
555var ClusterPorts = []uint16{
556 // Forwarded to the first node.
557 uint16(node.CuratorServicePort),
558 uint16(node.DebugServicePort),
559 uint16(node.KubernetesAPIPort),
560 uint16(node.KubernetesAPIWrappedPort),
561
562 // SOCKS proxy to the switch network
563 SOCKSPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200564}
565
// ClusterOptions contains all options for launching a Metropolis cluster.
type ClusterOptions struct {
	// The number of nodes this cluster should be started with.
	NumNodes int

	// Node holds the default options for all nodes.
	Node NodeOptions

	// If true, node logs will be saved to individual files instead of being printed
	// out to stderr. The path of these files will still be printed to stdout.
	//
	// The files will be located within the launch directory inside TEST_TMPDIR (or
	// the default tempdir location, if not set).
	NodeLogsToFiles bool

	// LeaveNodesNew, if set, will leave all non-bootstrap nodes in NEW, without
	// bootstrapping them. The nodes' address information in Cluster.Nodes will be
	// incomplete.
	LeaveNodesNew bool

	// Optional local registry which will be made available to the cluster to
	// pull images from. This is a more efficient alternative to preseeding all
	// images used for testing.
	LocalRegistry *localregistry.Server

	// InitialClusterConfiguration will be passed to the first node when creating the
	// cluster, and defines some basic properties of the cluster. If not specified,
	// the cluster will use the defaults as defined in
	// metropolis.proto.api.NodeParameters.
	InitialClusterConfiguration *cpb.ClusterConfiguration
}
597
// Cluster is the running Metropolis cluster launched using the LaunchCluster
// function.
type Cluster struct {
	// Owner is the TLS Certificate of the owner of the test cluster. This can be
	// used to authenticate further clients to the running cluster.
	Owner tls.Certificate
	// Ports is the PortMap used to access the first node's services (defined in
	// ClusterPorts) and the SOCKS proxy (at SOCKSPort).
	Ports launch.PortMap

	// Nodes is a map from Node ID to its runtime information.
	Nodes map[string]*NodeInCluster
	// NodeIDs is a list of node IDs that are backing this cluster, in order of
	// creation. CuratorClient registers each of them as a resolver endpoint.
	NodeIDs []string

	// CACertificate is the cluster's CA certificate.
	CACertificate *x509.Certificate

	// nodesDone is a list of channels populated with the return codes from all the
	// nodes' qemu instances. It's used by Close to ensure all nodes have
	// successfully been stopped.
	nodesDone []chan error
	// nodeOpts are the cluster member nodes' mutable launch options, kept here
	// to facilitate reboots.
	nodeOpts []NodeOptions
	// launchDir points at the directory keeping the nodes' state, such as storage
	// images, firmware variable files, TPM state.
	launchDir string
	// socketDir points at the directory keeping UNIX socket files, such as those
	// used to facilitate communication between QEMU and swtpm. It's different
	// from launchDir, and anchored nearer the file system root, due to the
	// socket path length limitation imposed by the kernel.
	socketDir string
	// metroctlDir is a directory for metroctl-related state. NOTE(review): its
	// contents are not visible here; presumably metroctl configuration and
	// credentials for the cluster owner. Confirm before relying on this.
	metroctlDir string

	// SOCKSDialer is used by DialNode to establish connections to nodes via the
	// SOCKS server run by nanoswitch.
	SOCKSDialer proxy.Dialer

	// authClient is a cached authenticated owner connection to a Curator
	// instance within the cluster. It is created lazily by CuratorClient.
	authClient *grpc.ClientConn

	// ctxT is the context individual node contexts are created from. CuratorClient
	// also runs its endpoint resolver under it.
	ctxT context.Context
	// ctxC is used by Close to cancel the context under which the nodes are
	// running.
	ctxC context.CancelFunc

	// tpmFactory manufactures TPM state for nodes. NOTE(review): presumably
	// passed to LaunchNode for each node of the cluster; confirm.
	tpmFactory *TPMFactory
}
650
// NodeInCluster represents information about a node that's part of a Cluster.
type NodeInCluster struct {
	// ID of the node, which can be used to dial this node's services via DialNode.
	ID string
	// Pubkey is the node's public key. NOTE(review): presumably the key ID is
	// derived from (cf. identity.NodeID); confirm.
	Pubkey []byte
	// Address of the node on the network run by nanoswitch. Not reachable from the
	// host unless dialed via DialNode or via the nanoswitch SOCKS proxy (reachable
	// on Cluster.Ports[SOCKSPort]).
	ManagementAddress string
}
661
// firstConnection performs the initial owner credential escrow with a newly
// started nanoswitch-backed cluster over SOCKS. It expects the first node to be
// running at 10.1.0.2, which is always the case with the current nanoswitch
// implementation.
//
// It returns the newly escrowed credentials as well as the first node's
// information as NodeInCluster, including its public key, so that the entry
// is as complete as those of the other nodes in Cluster.Nodes.
//
// Retrieving the owner certificate is retried while the cluster reports
// Unavailable (for at most a minute). Any other error aborts immediately.
// Looking up the first node is retried with the default exponential backoff.
// Both loops stop early when ctx is canceled.
func firstConnection(ctx context.Context, socksDialer proxy.Dialer) (*tls.Certificate, *NodeInCluster, error) {
	// Dial external service.
	remote := fmt.Sprintf("10.1.0.2:%s", node.CuratorServicePort.PortString())
	initCreds, err := rpc.NewEphemeralCredentials(InsecurePrivateKey, rpc.WantInsecure())
	if err != nil {
		return nil, nil, fmt.Errorf("NewEphemeralCredentials: %w", err)
	}
	// All connections to the cluster network go through the nanoswitch SOCKS
	// proxy.
	initDialer := func(_ context.Context, addr string) (net.Conn, error) {
		return socksDialer.Dial("tcp", addr)
	}
	initClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(initCreds))
	if err != nil {
		return nil, nil, fmt.Errorf("dialing with ephemeral credentials failed: %w", err)
	}
	defer initClient.Close()

	// Retrieve owner certificate - this can take a while because the node is still
	// coming up, so do it in a backoff loop.
	launch.Log("Cluster: retrieving owner certificate (this can take a few seconds while the first node boots)...")
	aaa := apb.NewAAAClient(initClient)
	var cert *tls.Certificate
	err = backoff.Retry(func() error {
		c, err := rpc.RetrieveOwnerCertificate(ctx, aaa, InsecurePrivateKey)
		if err == nil {
			cert = c
			return nil
		}
		// Only Unavailable (node still starting up) is considered transient.
		if st, ok := status.FromError(err); ok && st.Code() == codes.Unavailable {
			launch.Log("Cluster: cluster UNAVAILABLE: %v", st.Message())
			return err
		}
		return backoff.Permanent(err)
	}, backoff.WithContext(backoff.NewExponentialBackOff(backoff.WithMaxElapsedTime(time.Minute)), ctx))
	if err != nil {
		return nil, nil, fmt.Errorf("couldn't retrieve owner certificate: %w", err)
	}
	launch.Log("Cluster: retrieved owner certificate.")

	// Now connect authenticated and get the node ID.
	creds := rpc.NewAuthenticatedCredentials(*cert, rpc.WantInsecure())
	authClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(creds))
	if err != nil {
		return nil, nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
	}
	defer authClient.Close()
	mgmt := apb.NewManagementClient(authClient)

	// Named 'first' rather than 'node' to avoid shadowing the node package.
	var first *NodeInCluster
	err = backoff.Retry(func() error {
		nodes, err := getNodes(ctx, mgmt)
		if err != nil {
			return fmt.Errorf("retrieving nodes failed: %w", err)
		}
		if len(nodes) != 1 {
			return fmt.Errorf("expected one node, got %d", len(nodes))
		}
		n := nodes[0]
		if n.Status == nil || n.Status.ExternalAddress == "" {
			return fmt.Errorf("node has no status and/or address")
		}
		first = &NodeInCluster{
			ID:                identity.NodeID(n.Pubkey),
			Pubkey:            n.Pubkey,
			ManagementAddress: n.Status.ExternalAddress,
		}
		return nil
	}, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
	if err != nil {
		return nil, nil, fmt.Errorf("couldn't retrieve first node: %w", err)
	}

	return cert, first, nil
}
739
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100740func NewSerialFileLogger(p string) (io.ReadWriter, error) {
Lorenz Brun150f24a2023-07-13 20:11:06 +0200741 f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE, 0o600)
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100742 if err != nil {
743 return nil, err
744 }
745 return f, nil
746}
747
// LaunchCluster launches a cluster of Metropolis node VMs together with a
// Nanoswitch instance to network them all together.
//
// The given context will be used to run all qemu instances in the cluster, and
// canceling the context or calling Close() will terminate them.
//
// Node 0 bootstraps the cluster. The remaining opts.NumNodes-1 nodes are then
// registered with a register ticket obtained through node 0. Unless
// opts.LeaveNodesNew is set, they are also approved and waited upon until they
// are UP. The returned Cluster.NodeIDs is ordered by node number (the
// nodeNumberKey label), i.e. NodeIDs[i] is the node launched from nodeOpts[i].
//
// If LaunchCluster returns an error, whatever it had already set up is torn
// down again:
//   - the cluster context is canceled;
//   - launched nodes are waited upon;
//   - the temporary launch, socket and metroctl directories are removed.
func LaunchCluster(ctx context.Context, opts ClusterOptions) (*Cluster, error) {
	if opts.NumNodes <= 0 {
		return nil, errors.New("refusing to start cluster with zero nodes")
	}

	// State needed to tear everything down again if launching fails part-way.
	// On success, ownership of all of it passes to the returned Cluster, whose
	// Close performs the equivalent cleanup.
	var (
		succeeded bool
		// tmpDirs are the temporary directories created so far.
		tmpDirs []string
		// ctxC cancels the context all VMs run under, once it exists.
		ctxC context.CancelFunc
		// done receives the exit status of each node's qemu instance.
		done []chan error
		// launched is the number of nodes (a prefix of done) for which
		// LaunchNode succeeded, and which will thus report on done once they
		// exit.
		launched int
		// cluster is the partially initialized result, once it exists.
		cluster *Cluster
	)
	defer func() {
		if succeeded {
			return
		}
		if cluster != nil && cluster.authClient != nil {
			cluster.authClient.Close()
		}
		if ctxC != nil {
			ctxC()
		}
		// Wait for launched nodes to exit before removing their state.
		for _, nodeDone := range done[:launched] {
			<-nodeDone
		}
		for _, d := range tmpDirs {
			os.RemoveAll(d)
		}
	}()

	// Prepare the node options. These will be kept as part of Cluster.
	// nodeOpts[].Runtime will be initialized by LaunchNode during the first
	// launch. The runtime information can be later used to restart a node.
	// The 0th node will be initialized first. The rest will follow after it
	// had bootstrapped the cluster.
	nodeOpts := make([]NodeOptions, opts.NumNodes)
	for i := range opts.NumNodes {
		nodeOpts[i] = opts.Node
		nodeOpts[i].Name = fmt.Sprintf("node%d", i)
		nodeOpts[i].SerialPort = newPrefixedStdio(i)
	}
	nodeOpts[0].NodeParameters = &apb.NodeParameters{
		Cluster: &apb.NodeParameters_ClusterBootstrap_{
			ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
				OwnerPublicKey:              InsecurePublicKey,
				InitialClusterConfiguration: opts.InitialClusterConfiguration,
				Labels: &cpb.NodeLabels{
					Pairs: []*cpb.NodeLabels_Pair{
						{Key: nodeNumberKey, Value: "0"},
					},
				},
			},
		},
	}
	nodeOpts[0].PcapDump = true

	// Create the launch directory.
	ld, err := os.MkdirTemp(os.Getenv("TEST_TMPDIR"), "cluster-*")
	if err != nil {
		return nil, fmt.Errorf("failed to create the launch directory: %w", err)
	}
	tmpDirs = append(tmpDirs, ld)
	// Create the metroctl config directory. We keep it in /tmp because in some
	// scenarios it's end-user visible and we want it short.
	md, err := os.MkdirTemp("/tmp", "metroctl-*")
	if err != nil {
		return nil, fmt.Errorf("failed to create the metroctl directory: %w", err)
	}
	tmpDirs = append(tmpDirs, md)

	// Create the socket directory. We keep it in /tmp because of socket path limits.
	sd, err := os.MkdirTemp("/tmp", "cluster-*")
	if err != nil {
		return nil, fmt.Errorf("failed to create the socket directory: %w", err)
	}
	tmpDirs = append(tmpDirs, sd)

	// Set up TPM factory.
	tpmf, err := NewTPMFactory(filepath.Join(ld, "tpm"))
	if err != nil {
		return nil, fmt.Errorf("failed to create TPM factory: %w", err)
	}

	// Prepare links between nodes and nanoswitch.
	var switchPorts []*os.File
	for i := range opts.NumNodes {
		switchPort, vmPort, err := launch.NewSocketPair()
		if err != nil {
			return nil, fmt.Errorf("failed to get socketpair: %w", err)
		}
		switchPorts = append(switchPorts, switchPort)
		nodeOpts[i].ConnectToSocket = vmPort
	}

	// Make a list of channels that will be populated by all running node qemu
	// processes.
	done = make([]chan error, opts.NumNodes)
	for i := range done {
		done[i] = make(chan error, 1)
	}

	if opts.NodeLogsToFiles {
		for i := range opts.NumNodes {
			logPath := path.Join(ld, fmt.Sprintf("node-%d.txt", i))
			port, err := NewSerialFileLogger(logPath)
			if err != nil {
				return nil, fmt.Errorf("could not open log file for node %d: %w", i, err)
			}
			launch.Log("Node %d logs at %s", i, logPath)
			nodeOpts[i].SerialPort = port
		}
	}

	// Start the first node.
	var ctxT context.Context
	ctxT, ctxC = context.WithCancel(ctx)
	launch.Log("Cluster: Starting node %d...", 0)
	if err := LaunchNode(ctxT, ld, sd, tpmf, &nodeOpts[0], done[0]); err != nil {
		return nil, fmt.Errorf("failed to launch first node: %w", err)
	}
	launched = 1

	localRegistryAddr := net.TCPAddr{
		IP:   net.IPv4(10, 42, 0, 82),
		Port: 5000,
	}

	var guestSvcMap launch.GuestServiceMap
	if opts.LocalRegistry != nil {
		l, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1)})
		if err != nil {
			return nil, fmt.Errorf("failed to create TCP listener for local registry: %w", err)
		}
		s := http.Server{
			Handler: opts.LocalRegistry,
		}
		go s.Serve(l)
		go func() {
			<-ctxT.Done()
			s.Close()
		}()
		guestSvcMap = launch.GuestServiceMap{
			&localRegistryAddr: *l.Addr().(*net.TCPAddr),
		}
	}

	// Launch nanoswitch.
	portMap, err := launch.ConflictFreePortMap(ClusterPorts)
	if err != nil {
		return nil, fmt.Errorf("failed to allocate ephemeral ports: %w", err)
	}

	go func() {
		// Only use variables local to this goroutine here. It runs
		// concurrently with the remainder of LaunchCluster, so assigning to
		// the enclosing function's err would be a data race.
		var serialPort io.ReadWriter
		if opts.NodeLogsToFiles {
			logPath := path.Join(ld, "nanoswitch.txt")
			port, err := NewSerialFileLogger(logPath)
			if err != nil {
				launch.Log("Could not open log file for nanoswitch: %v", err)
			} else {
				launch.Log("Nanoswitch logs at %s", logPath)
				serialPort = port
			}
		}
		if serialPort == nil {
			// Either stdio logging was requested, or the log file could not be
			// opened. Don't hand a nil serial port to the VM.
			serialPort = newPrefixedStdio(99)
		}
		if err := launch.RunMicroVM(ctxT, &launch.MicroVMOptions{
			Name:                   "nanoswitch",
			KernelPath:             xKernelPath,
			InitramfsPath:          xInitramfsPath,
			ExtraNetworkInterfaces: switchPorts,
			PortMap:                portMap,
			GuestServiceMap:        guestSvcMap,
			SerialPort:             serialPort,
			PcapDump:               path.Join(ld, "nanoswitch.pcap"),
		}); err != nil {
			if !errors.Is(err, ctxT.Err()) {
				launch.Fatal("Failed to launch nanoswitch: %v", err)
			}
		}
	}()

	// Build SOCKS dialer.
	socksRemote := fmt.Sprintf("localhost:%v", portMap[SOCKSPort])
	socksDialer, err := proxy.SOCKS5("tcp", socksRemote, nil, proxy.Direct)
	if err != nil {
		return nil, fmt.Errorf("failed to build SOCKS dialer: %w", err)
	}

	// Retrieve owner credentials and first node.
	cert, firstNode, err := firstConnection(ctxT, socksDialer)
	if err != nil {
		return nil, err
	}

	// Write credentials to the metroctl directory.
	if err := metroctl.WriteOwnerKey(md, cert.PrivateKey.(ed25519.PrivateKey)); err != nil {
		return nil, fmt.Errorf("could not write owner key: %w", err)
	}
	if err := metroctl.WriteOwnerCertificate(md, cert.Certificate[0]); err != nil {
		return nil, fmt.Errorf("could not write owner certificate: %w", err)
	}

	launch.Log("Cluster: Node %d is %s", 0, firstNode.ID)

	// Set up a partially initialized cluster instance, to be filled in the
	// later steps.
	cluster = &Cluster{
		Owner: *cert,
		Ports: portMap,
		Nodes: map[string]*NodeInCluster{
			firstNode.ID: firstNode,
		},
		NodeIDs: []string{
			firstNode.ID,
		},

		nodesDone:   done,
		nodeOpts:    nodeOpts,
		launchDir:   ld,
		socketDir:   sd,
		metroctlDir: md,

		SOCKSDialer: socksDialer,

		ctxT: ctxT,
		ctxC: ctxC,

		tpmFactory: tpmf,
	}

	// Now start the rest of the nodes and register them into the cluster.

	// Get an authenticated owner client within the cluster.
	curC, err := cluster.CuratorClient()
	if err != nil {
		return nil, fmt.Errorf("CuratorClient: %w", err)
	}
	mgmt := apb.NewManagementClient(curC)

	// Retrieve register ticket to register further nodes.
	launch.Log("Cluster: retrieving register ticket...")
	resT, err := mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{})
	if err != nil {
		return nil, fmt.Errorf("GetRegisterTicket: %w", err)
	}
	ticket := resT.Ticket
	launch.Log("Cluster: retrieved register ticket (%d bytes).", len(ticket))

	// Retrieve cluster info (for directory and ca public key) to register further
	// nodes.
	resI, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
	if err != nil {
		return nil, fmt.Errorf("GetClusterInfo: %w", err)
	}
	caCert, err := x509.ParseCertificate(resI.CaCertificate)
	if err != nil {
		return nil, fmt.Errorf("ParseCertificate: %w", err)
	}
	cluster.CACertificate = caCert

	// Use the retrieved information to configure the rest of the node options.
	for i := 1; i < opts.NumNodes; i++ {
		nodeOpts[i].NodeParameters = &apb.NodeParameters{
			Cluster: &apb.NodeParameters_ClusterRegister_{
				ClusterRegister: &apb.NodeParameters_ClusterRegister{
					RegisterTicket:   ticket,
					ClusterDirectory: resI.ClusterDirectory,
					CaCertificate:    resI.CaCertificate,
					Labels: &cpb.NodeLabels{
						Pairs: []*cpb.NodeLabels_Pair{
							{Key: nodeNumberKey, Value: strconv.Itoa(i)},
						},
					},
				},
			},
		}
	}

	// Now run the rest of the nodes.
	for i := 1; i < opts.NumNodes; i++ {
		launch.Log("Cluster: Starting node %d...", i)
		if err := LaunchNode(ctxT, ld, sd, tpmf, &nodeOpts[i], done[i]); err != nil {
			return nil, fmt.Errorf("failed to launch node %d: %w", i, err)
		}
		launched++
	}

	// Wait for nodes to appear as NEW, populate a map from node number (index into
	// nodeOpts, etc.) to Metropolis Node ID. Every such node must carry a
	// unique number label in [1, NumNodes), otherwise NodeIDs could not be
	// built below.
	seenNodes := make(map[string]bool)
	nodeNumberToID := make(map[int]string)
	launch.Log("Cluster: waiting for nodes to appear as NEW...")
	for len(seenNodes) < opts.NumNodes-1 {
		nodes, err := getNodes(ctx, mgmt)
		if err != nil {
			return nil, fmt.Errorf("could not get nodes: %w", err)
		}
		for _, n := range nodes {
			if n.State != cpb.NodeState_NODE_STATE_NEW || seenNodes[n.Id] {
				continue
			}
			num, err := strconv.Atoi(node.GetNodeLabel(n.Labels, nodeNumberKey))
			if err != nil {
				return nil, fmt.Errorf("node %s has undecodable number label: %w", n.Id, err)
			}
			if num < 1 || num >= opts.NumNodes {
				return nil, fmt.Errorf("node %s has out of range number label %d", n.Id, num)
			}
			if other, ok := nodeNumberToID[num]; ok {
				return nil, fmt.Errorf("nodes %s and %s both have number label %d", other, n.Id, num)
			}
			seenNodes[n.Id] = true
			cluster.Nodes[n.Id] = &NodeInCluster{
				ID:     n.Id,
				Pubkey: n.Pubkey,
			}
			launch.Log("Cluster: Node %d is %s", num, n.Id)
			nodeNumberToID[num] = n.Id
		}
		if len(seenNodes) < opts.NumNodes-1 {
			time.Sleep(1 * time.Second)
		}
	}
	launch.Log("Found all expected nodes")

	// Build the rest of NodeIDs from map. The checks above guarantee every
	// number in [1, NumNodes) is present exactly once.
	for i := 1; i < opts.NumNodes; i++ {
		cluster.NodeIDs = append(cluster.NodeIDs, nodeNumberToID[i])
	}

	approvedNodes := make(map[string]bool)
	upNodes := make(map[string]bool)
	if !opts.LeaveNodesNew {
		for {
			nodes, err := getNodes(ctx, mgmt)
			if err != nil {
				return nil, fmt.Errorf("could not get nodes: %w", err)
			}
			for _, n := range nodes {
				if !seenNodes[n.Id] {
					// Skip nodes that weren't NEW in the previous step.
					continue
				}

				if n.State == cpb.NodeState_NODE_STATE_UP && n.Status != nil && n.Status.ExternalAddress != "" {
					launch.Log("Cluster: node %s is up", n.Id)
					upNodes[n.Id] = true
					cluster.Nodes[n.Id].ManagementAddress = n.Status.ExternalAddress
				}
				if upNodes[n.Id] {
					continue
				}

				if !approvedNodes[n.Id] {
					launch.Log("Cluster: approving node %s", n.Id)
					_, err := mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
						Pubkey: n.Pubkey,
					})
					if err != nil {
						return nil, fmt.Errorf("ApproveNode(%s): %w", n.Id, err)
					}
					approvedNodes[n.Id] = true
				}
			}

			launch.Log("Cluster: want %d up nodes, have %d", opts.NumNodes, len(upNodes)+1)
			if len(upNodes) == opts.NumNodes-1 {
				break
			}
			time.Sleep(time.Second)
		}
	}

	launch.Log("Cluster: all nodes up:")
	for i, nodeID := range cluster.NodeIDs {
		launch.Log("Cluster: %d. %s at %s", i, nodeID, cluster.Nodes[nodeID].ManagementAddress)
	}
	launch.Log("Cluster: starting tests...")

	succeeded = true
	return cluster, nil
}
1119
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001120// RebootNode reboots the cluster member node matching the given index, and
1121// waits for it to rejoin the cluster. It will use the given context ctx to run
1122// cluster API requests, whereas the resulting QEMU process will be created
1123// using the cluster's context c.ctxT. The nodes are indexed starting at 0.
1124func (c *Cluster) RebootNode(ctx context.Context, idx int) error {
1125 if idx < 0 || idx >= len(c.NodeIDs) {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +02001126 return fmt.Errorf("index out of bounds")
1127 }
1128 if c.nodeOpts[idx].Runtime == nil {
1129 return fmt.Errorf("node not running")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001130 }
1131 id := c.NodeIDs[idx]
1132
1133 // Get an authenticated owner client within the cluster.
Serge Bazanski5bb8a332022-06-23 17:41:33 +02001134 curC, err := c.CuratorClient()
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001135 if err != nil {
1136 return err
1137 }
1138 mgmt := apb.NewManagementClient(curC)
1139
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001140 // Cancel the node's context. This will shut down QEMU.
1141 c.nodeOpts[idx].Runtime.CtxC()
Serge Bazanski05f813b2023-03-16 17:58:39 +01001142 launch.Log("Cluster: waiting for node %d (%s) to stop.", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001143 err = <-c.nodesDone[idx]
1144 if err != nil {
1145 return fmt.Errorf("while restarting node: %w", err)
1146 }
1147
1148 // Start QEMU again.
Serge Bazanski05f813b2023-03-16 17:58:39 +01001149 launch.Log("Cluster: restarting node %d (%s).", idx, id)
Serge Bazanski2b6dc312024-06-04 17:44:55 +02001150 if err := LaunchNode(c.ctxT, c.launchDir, c.socketDir, c.tpmFactory, &c.nodeOpts[idx], c.nodesDone[idx]); err != nil {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +02001151 return fmt.Errorf("failed to launch node %d: %w", idx, err)
1152 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001153
Serge Bazanskibc969572024-03-21 11:56:13 +01001154 start := time.Now()
1155
1156 // Poll Management.GetNodes until the node is healthy.
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001157 for {
1158 cs, err := getNode(ctx, mgmt, id)
1159 if err != nil {
Serge Bazanski05f813b2023-03-16 17:58:39 +01001160 launch.Log("Cluster: node get error: %v", err)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001161 return err
1162 }
Serge Bazanskibc969572024-03-21 11:56:13 +01001163 launch.Log("Cluster: node health: %+v", cs.Health)
1164
1165 lhb := time.Now().Add(-cs.TimeSinceHeartbeat.AsDuration())
1166 if lhb.After(start) && cs.Health == apb.Node_HEALTHY {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001167 break
1168 }
1169 time.Sleep(time.Second)
1170 }
Serge Bazanski05f813b2023-03-16 17:58:39 +01001171 launch.Log("Cluster: node %d (%s) has rejoined the cluster.", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001172 return nil
1173}
1174
Serge Bazanski500f6e02024-04-03 12:06:40 +02001175// ShutdownNode performs an ungraceful shutdown (i.e. power off) of the node
1176// given by idx. If the node is already shut down, this is a no-op.
1177func (c *Cluster) ShutdownNode(idx int) error {
1178 if idx < 0 || idx >= len(c.NodeIDs) {
1179 return fmt.Errorf("index out of bounds")
1180 }
1181 // Return if node is already stopped.
1182 select {
1183 case <-c.nodeOpts[idx].Runtime.ctxT.Done():
1184 return nil
1185 default:
1186 }
1187 id := c.NodeIDs[idx]
1188
1189 // Cancel the node's context. This will shut down QEMU.
1190 c.nodeOpts[idx].Runtime.CtxC()
1191 launch.Log("Cluster: waiting for node %d (%s) to stop.", idx, id)
1192 err := <-c.nodesDone[idx]
1193 if err != nil {
1194 return fmt.Errorf("while shutting down node: %w", err)
1195 }
Serge Bazanski2b6dc312024-06-04 17:44:55 +02001196 launch.Log("Cluster: node %d (%s) stopped.", idx, id)
Serge Bazanski500f6e02024-04-03 12:06:40 +02001197 return nil
1198}
1199
1200// StartNode performs a power on of the node given by idx. If the node is already
1201// running, this is a no-op.
1202func (c *Cluster) StartNode(idx int) error {
1203 if idx < 0 || idx >= len(c.NodeIDs) {
1204 return fmt.Errorf("index out of bounds")
1205 }
1206 id := c.NodeIDs[idx]
1207 // Return if node is already running.
1208 select {
1209 case <-c.nodeOpts[idx].Runtime.ctxT.Done():
1210 default:
1211 return nil
1212 }
1213
1214 // Start QEMU again.
1215 launch.Log("Cluster: starting node %d (%s).", idx, id)
Serge Bazanski2b6dc312024-06-04 17:44:55 +02001216 if err := LaunchNode(c.ctxT, c.launchDir, c.socketDir, c.tpmFactory, &c.nodeOpts[idx], c.nodesDone[idx]); err != nil {
Serge Bazanski500f6e02024-04-03 12:06:40 +02001217 return fmt.Errorf("failed to launch node %d: %w", idx, err)
1218 }
Serge Bazanski2b6dc312024-06-04 17:44:55 +02001219 launch.Log("Cluster: node %d (%s) started.", idx, id)
Serge Bazanski500f6e02024-04-03 12:06:40 +02001220 return nil
1221}
1222
// Close cancels the running clusters' context and waits for all virtualized
// nodes to stop. It returns an error if stopping the nodes failed, or one of
// the nodes failed to fully start in the first place. Afterwards, the nodes'
// state files, sockets and the metroctl directory are removed.
//
// Nodes that were individually powered off before Close (e.g. via
// ShutdownNode), or never launched, are not waited upon: their exit status
// has already been collected (or will never arrive), so waiting would block
// forever. Close must only be called once.
func (c *Cluster) Close() error {
	launch.Log("Cluster: stopping...")
	if c.authClient != nil {
		c.authClient.Close()
	}

	// Before canceling the cluster context, record which nodes are already
	// stopped on their own account. This is only distinguishable while the
	// cluster context itself is still live. Otherwise, fall back to waiting
	// on every launched node.
	clusterRunning := c.ctxT.Err() == nil
	alreadyStopped := make([]bool, len(c.nodesDone))
	for i := range alreadyStopped {
		if i >= len(c.nodeOpts) {
			continue
		}
		rt := c.nodeOpts[i].Runtime
		if rt == nil {
			alreadyStopped[i] = true
			continue
		}
		if !clusterRunning {
			continue
		}
		select {
		case <-rt.ctxT.Done():
			alreadyStopped[i] = true
		default:
		}
	}
	c.ctxC()

	var errs []error
	launch.Log("Cluster: waiting for nodes to exit...")
	for i, nodeDone := range c.nodesDone {
		var err error
		if alreadyStopped[i] {
			// Collect a still-pending exit status, but don't wait for one.
			select {
			case err = <-nodeDone:
			default:
			}
		} else {
			err = <-nodeDone
		}
		if err != nil {
			errs = append(errs, err)
		}
	}
	launch.Log("Cluster: removing nodes' state files (%s) and sockets (%s).", c.launchDir, c.socketDir)
	os.RemoveAll(c.launchDir)
	os.RemoveAll(c.socketDir)
	os.RemoveAll(c.metroctlDir)
	launch.Log("Cluster: done")
	return multierr.Combine(errs...)
}
Serge Bazanskibe742842022-04-04 13:18:50 +02001248
// DialNode is a grpc.WithContextDialer compatible dialer which dials nodes by
// their ID. This is performed by connecting to the cluster nanoswitch via its
// SOCKS proxy, and using the cluster node list for name resolution. IP
// addresses are dialed as-is.
//
// Dialing a node fails if it has no known management address yet. This is
// the case, for example, for nodes that have not (yet) been observed UP.
//
// For example:
//
//	grpc.Dial("metropolis-deadbeef:1234", grpc.WithContextDialer(c.DialNode))
func (c *Cluster) DialNode(_ context.Context, addr string) (net.Conn, error) {
	host, port, err := net.SplitHostPort(addr)
	if err != nil {
		return nil, fmt.Errorf("invalid host:port: %w", err)
	}
	// Already an IP address?
	if net.ParseIP(host) != nil {
		return c.SOCKSDialer.Dial("tcp", addr)
	}

	// Otherwise, expect a node name.
	n, ok := c.Nodes[host]
	if !ok {
		return nil, fmt.Errorf("unknown node %q", host)
	}
	if n.ManagementAddress == "" {
		// Without this check we would dial ":port" through the proxy and get
		// a confusing error from far away.
		return nil, fmt.Errorf("node %q has no known management address", host)
	}
	return c.SOCKSDialer.Dial("tcp", net.JoinHostPort(n.ManagementAddress, port))
}
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001274
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001275// GetKubeClientSet gets a Kubernetes client set accessing the Metropolis
1276// Kubernetes authenticating proxy using the cluster owner identity.
1277// It currently has access to everything (i.e. the cluster-admin role)
1278// via the owner-admin binding.
1279func (c *Cluster) GetKubeClientSet() (kubernetes.Interface, error) {
1280 pkcs8Key, err := x509.MarshalPKCS8PrivateKey(c.Owner.PrivateKey)
1281 if err != nil {
1282 // We explicitly pass an Ed25519 private key in, so this can't happen
1283 panic(err)
1284 }
1285
1286 host := net.JoinHostPort(c.NodeIDs[0], node.KubernetesAPIWrappedPort.PortString())
Lorenz Brun150f24a2023-07-13 20:11:06 +02001287 clientConfig := rest.Config{
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001288 Host: host,
1289 TLSClientConfig: rest.TLSClientConfig{
1290 // TODO(q3k): use CA certificate
1291 Insecure: true,
1292 ServerName: "kubernetes.default.svc",
1293 CertData: pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: c.Owner.Certificate[0]}),
1294 KeyData: pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: pkcs8Key}),
1295 },
1296 Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
1297 return c.DialNode(ctx, address)
1298 },
1299 }
1300 return kubernetes.NewForConfig(&clientConfig)
1301}
1302
// KubernetesControllerNodeAddresses returns the list of IP addresses of nodes
// which are currently Kubernetes controllers, i.e. run an apiserver. This list
// might be empty if no node is currently configured with the
// 'KubernetesController' role. Controller nodes that do not report an
// external address are omitted.
func (c *Cluster) KubernetesControllerNodeAddresses(ctx context.Context) ([]string, error) {
	curC, err := c.CuratorClient()
	if err != nil {
		return nil, err
	}
	mgmt := apb.NewManagementClient(curC)
	srv, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{
		Filter: "has(node.roles.kubernetes_controller)",
	})
	if err != nil {
		return nil, err
	}
	defer srv.CloseSend()
	var res []string
	for {
		n, err := srv.Recv()
		// io.EOF marks the regular end of the stream.
		if errors.Is(err, io.EOF) {
			break
		}
		if err != nil {
			return nil, err
		}
		if n.Status == nil || n.Status.ExternalAddress == "" {
			continue
		}
		res = append(res, n.Status.ExternalAddress)
	}
	return res, nil
}
Serge Bazanski630fb5c2023-04-06 10:50:24 +02001336
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001337// AllNodesHealthy returns nil if all the nodes in the cluster are seemingly
1338// healthy.
Serge Bazanski630fb5c2023-04-06 10:50:24 +02001339func (c *Cluster) AllNodesHealthy(ctx context.Context) error {
1340 // Get an authenticated owner client within the cluster.
1341 curC, err := c.CuratorClient()
1342 if err != nil {
1343 return err
1344 }
1345 mgmt := apb.NewManagementClient(curC)
1346 nodes, err := getNodes(ctx, mgmt)
1347 if err != nil {
1348 return err
1349 }
1350
1351 var unhealthy []string
1352 for _, node := range nodes {
1353 if node.Health == apb.Node_HEALTHY {
1354 continue
1355 }
1356 unhealthy = append(unhealthy, node.Id)
1357 }
1358 if len(unhealthy) == 0 {
1359 return nil
1360 }
1361 return fmt.Errorf("nodes unhealthy: %s", strings.Join(unhealthy, ", "))
1362}
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001363
1364// ApproveNode approves a node by ID, waiting for it to become UP.
1365func (c *Cluster) ApproveNode(ctx context.Context, id string) error {
1366 curC, err := c.CuratorClient()
1367 if err != nil {
1368 return err
1369 }
1370 mgmt := apb.NewManagementClient(curC)
1371
1372 _, err = mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
1373 Pubkey: c.Nodes[id].Pubkey,
1374 })
1375 if err != nil {
1376 return fmt.Errorf("ApproveNode: %w", err)
1377 }
1378 launch.Log("Cluster: %s: approved, waiting for UP", id)
1379 for {
1380 nodes, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
1381 if err != nil {
1382 return fmt.Errorf("GetNodes: %w", err)
1383 }
1384 found := false
1385 for {
1386 node, err := nodes.Recv()
1387 if errors.Is(err, io.EOF) {
1388 break
1389 }
1390 if err != nil {
1391 return fmt.Errorf("Nodes.Recv: %w", err)
1392 }
1393 if node.Id != id {
1394 continue
1395 }
1396 if node.State != cpb.NodeState_NODE_STATE_UP {
1397 continue
1398 }
1399 found = true
1400 break
1401 }
1402 nodes.CloseSend()
1403
1404 if found {
1405 break
1406 }
1407 time.Sleep(time.Second)
1408 }
1409 launch.Log("Cluster: %s: UP", id)
1410 return nil
1411}
1412
1413// MakeKubernetesWorker adds the KubernetesWorker role to a node by ID.
1414func (c *Cluster) MakeKubernetesWorker(ctx context.Context, id string) error {
1415 curC, err := c.CuratorClient()
1416 if err != nil {
1417 return err
1418 }
1419 mgmt := apb.NewManagementClient(curC)
1420
1421 tr := true
1422 launch.Log("Cluster: %s: adding KubernetesWorker", id)
1423 _, err = mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
1424 Node: &apb.UpdateNodeRolesRequest_Id{
1425 Id: id,
1426 },
1427 KubernetesWorker: &tr,
1428 })
1429 return err
1430}
Serge Bazanski37cfcc12024-03-21 11:59:07 +01001431
Jan Schära9b060b2024-08-07 10:42:29 +02001432// MakeKubernetesController adds the KubernetesController role to a node by ID.
1433func (c *Cluster) MakeKubernetesController(ctx context.Context, id string) error {
1434 curC, err := c.CuratorClient()
1435 if err != nil {
1436 return err
1437 }
1438 mgmt := apb.NewManagementClient(curC)
1439
1440 tr := true
1441 launch.Log("Cluster: %s: adding KubernetesController", id)
1442 _, err = mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
1443 Node: &apb.UpdateNodeRolesRequest_Id{
1444 Id: id,
1445 },
1446 KubernetesController: &tr,
1447 })
1448 return err
1449}
1450
Serge Bazanski37cfcc12024-03-21 11:59:07 +01001451// MakeConsensusMember adds the ConsensusMember role to a node by ID.
1452func (c *Cluster) MakeConsensusMember(ctx context.Context, id string) error {
1453 curC, err := c.CuratorClient()
1454 if err != nil {
1455 return err
1456 }
1457 mgmt := apb.NewManagementClient(curC)
1458 cur := ipb.NewCuratorClient(curC)
1459
1460 tr := true
1461 launch.Log("Cluster: %s: adding ConsensusMember", id)
1462 bo := backoff.NewExponentialBackOff()
1463 bo.MaxElapsedTime = 10 * time.Second
1464
1465 backoff.Retry(func() error {
1466 _, err = mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
1467 Node: &apb.UpdateNodeRolesRequest_Id{
1468 Id: id,
1469 },
1470 ConsensusMember: &tr,
1471 })
1472 if err != nil {
1473 launch.Log("Cluster: %s: UpdateNodeRoles failed: %v", id, err)
1474 }
1475 return err
1476 }, backoff.WithContext(bo, ctx))
1477 if err != nil {
1478 return err
1479 }
1480
1481 launch.Log("Cluster: %s: waiting for learner/full members...", id)
1482
1483 learner := false
1484 for {
1485 res, err := cur.GetConsensusStatus(ctx, &ipb.GetConsensusStatusRequest{})
1486 if err != nil {
1487 return fmt.Errorf("GetConsensusStatus: %w", err)
1488 }
1489 for _, member := range res.EtcdMember {
1490 if member.Id != id {
1491 continue
1492 }
1493 switch member.Status {
1494 case ipb.GetConsensusStatusResponse_EtcdMember_STATUS_LEARNER:
1495 if !learner {
1496 learner = true
1497 launch.Log("Cluster: %s: became a learner, waiting for full member...", id)
1498 }
1499 case ipb.GetConsensusStatusResponse_EtcdMember_STATUS_FULL:
1500 launch.Log("Cluster: %s: became a full member", id)
1501 return nil
1502 }
1503 }
1504 time.Sleep(100 * time.Millisecond)
1505 }
1506}