blob: a0ad82f029b8491a5ae4b04c86be25b21d466e02 [file] [log] [blame]
// Package cluster builds on the launch package and implements launching
// Metropolis nodes and clusters in a virtualized environment using qemu. It's
// kept in a separate package as it depends on a Metropolis node image, which
// might not be required for some uses of the launch library.
5package cluster
6
7import (
8 "bytes"
9 "context"
10 "crypto/rand"
11 "crypto/tls"
12 "errors"
13 "fmt"
14 "io"
Serge Bazanski66e58952021-10-05 17:06:56 +020015 "net"
16 "os"
17 "os/exec"
Leopoldacfad5b2023-01-15 14:05:25 +010018 "path"
Serge Bazanski66e58952021-10-05 17:06:56 +020019 "path/filepath"
20 "syscall"
21 "time"
22
23 "github.com/cenkalti/backoff/v4"
Serge Bazanski66e58952021-10-05 17:06:56 +020024 "go.uber.org/multierr"
Serge Bazanskibe742842022-04-04 13:18:50 +020025 "golang.org/x/net/proxy"
Serge Bazanski66e58952021-10-05 17:06:56 +020026 "google.golang.org/grpc"
Serge Bazanski636032e2022-01-26 14:21:33 +010027 "google.golang.org/grpc/codes"
28 "google.golang.org/grpc/status"
Serge Bazanski66e58952021-10-05 17:06:56 +020029 "google.golang.org/protobuf/proto"
30
Mateusz Zalega0246f5e2022-04-22 17:29:04 +020031 "source.monogon.dev/metropolis/cli/pkg/datafile"
Serge Bazanski66e58952021-10-05 17:06:56 +020032 "source.monogon.dev/metropolis/node"
Serge Bazanskie78a0892021-10-07 17:03:49 +020033 "source.monogon.dev/metropolis/node/core/identity"
Serge Bazanski66e58952021-10-05 17:06:56 +020034 "source.monogon.dev/metropolis/node/core/rpc"
Serge Bazanski5bb8a332022-06-23 17:41:33 +020035 "source.monogon.dev/metropolis/node/core/rpc/resolver"
Serge Bazanski66e58952021-10-05 17:06:56 +020036 apb "source.monogon.dev/metropolis/proto/api"
Serge Bazanskie78a0892021-10-07 17:03:49 +020037 cpb "source.monogon.dev/metropolis/proto/common"
Serge Bazanski66e58952021-10-05 17:06:56 +020038 "source.monogon.dev/metropolis/test/launch"
39)
40
// NodeOptions contains all options that can be passed to LaunchNode(). A single
// NodeOptions value may be passed to LaunchNode repeatedly to restart the same
// node, as LaunchNode fills in Mac and Runtime on the first launch.
type NodeOptions struct {
	// Name is a human-readable identifier to be used in debug output.
	Name string

	// Ports contains the port mapping where to expose the internal ports of the VM to
	// the host. See IdentityPortMap() and ConflictFreePortMap(). Ignored when
	// ConnectToSocket is set.
	Ports launch.PortMap

	// If set to true, reboots are honored. Otherwise, QEMU is started with
	// -no-reboot, so any reboot makes the LaunchNode() call return. Metropolis
	// nodes generally restart on almost all errors, so unless you want to test
	// reboot behavior this should be false.
	AllowReboot bool

	// By default, the VM is connected to the Host via SLIRP. If ConnectToSocket is
	// set, it is instead connected to the given file descriptor/socket, which is
	// handed to QEMU as file descriptor 3. If this is set, all port maps from the
	// Ports option are ignored. Intended for networking this instance together
	// with others for running more complex network configurations.
	ConnectToSocket *os.File

	// When PcapDump is set, all traffic is dumped to a pcap file in the
	// runtime directory (e.g. "net0.pcap" for the first interface).
	PcapDump bool

	// SerialPort is an io.ReadWriter over which you can communicate with the serial
	// port of the machine. It can be set to an existing file descriptor (like
	// os.Stdout/os.Stderr) or any Go structure implementing this interface.
	// NOTE(review): LaunchNode currently only uses it as QEMU's stdout, i.e. it is
	// only ever written to.
	SerialPort io.ReadWriter

	// NodeParameters is passed into the VM and subsequently used for bootstrapping or
	// registering into a cluster. It is serialized into the runtime directory and
	// exposed to the guest via fw_cfg as dev.monogon.metropolis/parameters.pb.
	NodeParameters *apb.NodeParameters

	// Mac is the node's MAC address. If nil, LaunchNode generates a random,
	// locally administered address and stores it here, so that subsequent
	// launches with the same options keep the same address.
	Mac *net.HardwareAddr

	// Runtime keeps the node's QEMU runtime state. If nil, LaunchNode creates it
	// (see setupRuntime) and stores it here; reusing it on later launches keeps
	// the node's disk, firmware variables and TPM state.
	Runtime *NodeRuntime
}
82
// NodeRuntime keeps the node's QEMU runtime state: where its persistent state
// and UNIX sockets live on disk, and the context its current QEMU instance runs
// under. It is created by setupRuntime on a node's first launch.
type NodeRuntime struct {
	// ld points at the node's launch directory storing data such as storage
	// images, firmware variables or the TPM state.
	ld string
	// sd points at the node's socket directory.
	sd string

	// ctxT is the context QEMU will execute in.
	ctxT context.Context
	// CtxC is the QEMU context's cancellation function. Calling it kills the
	// node's currently running QEMU and swtpm processes. LaunchNode calls it
	// itself before installing a fresh context on a relaunch.
	CtxC context.CancelFunc
}
96
97// NodePorts is the list of ports a fully operational Metropolis node listens on
Serge Bazanski52304a82021-10-29 16:56:18 +020098var NodePorts = []node.Port{
Serge Bazanski66e58952021-10-05 17:06:56 +020099 node.ConsensusPort,
100
101 node.CuratorServicePort,
102 node.DebugServicePort,
103
104 node.KubernetesAPIPort,
Lorenz Bruncc078df2021-12-23 11:51:55 +0100105 node.KubernetesAPIWrappedPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200106 node.CuratorServicePort,
107 node.DebuggerPort,
108}
109
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200110// setupRuntime creates the node's QEMU runtime directory, together with all
111// files required to preserve its state, a level below the chosen path ld. The
112// node's socket directory is similarily created a level below sd. It may
113// return an I/O error.
114func setupRuntime(ld, sd string) (*NodeRuntime, error) {
115 // Create a temporary directory to keep all the runtime files.
116 stdp, err := os.MkdirTemp(ld, "node_state*")
117 if err != nil {
118 return nil, fmt.Errorf("failed to create the state directory: %w", err)
119 }
120
121 // Initialize the node's storage with a prebuilt image.
122 si, err := datafile.ResolveRunfile("metropolis/node/node.img")
123 if err != nil {
124 return nil, fmt.Errorf("while resolving a path: %w", err)
125 }
126 di := filepath.Join(stdp, filepath.Base(si))
Serge Bazanski05f813b2023-03-16 17:58:39 +0100127 launch.Log("Cluster: copying node image: %s -> %s", si, di)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200128 if err := copyFile(si, di); err != nil {
129 return nil, fmt.Errorf("while copying the node image: %w", err)
130 }
131
132 // Initialize the OVMF firmware variables file.
133 sv, err := datafile.ResolveRunfile("external/edk2/OVMF_VARS.fd")
134 if err != nil {
135 return nil, fmt.Errorf("while resolving a path: %w", err)
136 }
137 dv := filepath.Join(stdp, filepath.Base(sv))
138 if err := copyFile(sv, dv); err != nil {
139 return nil, fmt.Errorf("while copying firmware variables: %w", err)
140 }
141
142 // Create the TPM state directory and initialize all files required by swtpm.
143 tpmt := filepath.Join(stdp, "tpm")
144 if err := os.Mkdir(tpmt, 0755); err != nil {
145 return nil, fmt.Errorf("while creating the TPM directory: %w", err)
146 }
147 tpms, err := datafile.ResolveRunfile("metropolis/node/tpm")
148 if err != nil {
149 return nil, fmt.Errorf("while resolving a path: %w", err)
150 }
151 tpmf, err := os.ReadDir(tpms)
152 if err != nil {
153 return nil, fmt.Errorf("failed to read TPM directory: %w", err)
154 }
155 for _, file := range tpmf {
156 name := file.Name()
157 src, err := datafile.ResolveRunfile(filepath.Join(tpms, name))
158 if err != nil {
159 return nil, fmt.Errorf("while resolving a path: %w", err)
160 }
161 tgt := filepath.Join(tpmt, name)
162 if err := copyFile(src, tgt); err != nil {
163 return nil, fmt.Errorf("while copying TPM state: file %q to %q: %w", src, tgt, err)
164 }
165 }
166
167 // Create the socket directory.
168 sotdp, err := os.MkdirTemp(sd, "node_sock*")
169 if err != nil {
170 return nil, fmt.Errorf("failed to create the socket directory: %w", err)
171 }
172
173 return &NodeRuntime{
174 ld: stdp,
175 sd: sotdp,
176 }, nil
177}
178
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200179// CuratorClient returns an authenticated owner connection to a Curator
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200180// instance within Cluster c, or nil together with an error.
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200181func (c *Cluster) CuratorClient() (*grpc.ClientConn, error) {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200182 if c.authClient == nil {
183 authCreds := rpc.NewAuthenticatedCredentials(c.Owner, nil)
Serge Bazanski58ddc092022-06-30 18:23:33 +0200184 r := resolver.New(c.ctxT, resolver.WithLogger(func(f string, args ...interface{}) {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100185 launch.Log("Cluster: client resolver: %s", fmt.Sprintf(f, args...))
Serge Bazanski58ddc092022-06-30 18:23:33 +0200186 }))
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200187 for _, n := range c.NodeIDs {
188 ep, err := resolver.NodeWithDefaultPort(n)
189 if err != nil {
190 return nil, fmt.Errorf("could not add node %q by DNS: %v", n, err)
191 }
192 r.AddEndpoint(ep)
193 }
194 authClient, err := grpc.Dial(resolver.MetropolisControlAddress,
195 grpc.WithTransportCredentials(authCreds),
196 grpc.WithResolvers(r),
197 grpc.WithContextDialer(c.DialNode),
198 )
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200199 if err != nil {
200 return nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
201 }
202 c.authClient = authClient
203 }
204 return c.authClient, nil
205}
206
Serge Bazanski66e58952021-10-05 17:06:56 +0200207// LaunchNode launches a single Metropolis node instance with the given options.
208// The instance runs mostly paravirtualized but with some emulated hardware
209// similar to how a cloud provider might set up its VMs. The disk is fully
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200210// writable, and the changes are kept across reboots and shutdowns. ld and sd
211// point to the launch directory and the socket directory, holding the nodes'
212// state files (storage, tpm state, firmware state), and UNIX socket files
213// (swtpm <-> QEMU interplay) respectively. The directories must exist before
214// LaunchNode is called. LaunchNode will update options.Runtime and options.Mac
215// if either are not initialized.
216func LaunchNode(ctx context.Context, ld, sd string, options *NodeOptions) error {
217 // TODO(mateusz@monogon.tech) try using QEMU's abstract socket namespace instead
218 // of /tmp (requires QEMU version >5.0).
Serge Bazanski66e58952021-10-05 17:06:56 +0200219 // https://github.com/qemu/qemu/commit/776b97d3605ed0fc94443048fdf988c7725e38a9).
220 // swtpm accepts already-open FDs so we can pass in an abstract socket namespace FD
221 // that we open and pass the name of it to QEMU. Not pinning this crashes both
222 // swtpm and qemu because we run into UNIX socket length limitations (for legacy
223 // reasons 108 chars).
Serge Bazanski66e58952021-10-05 17:06:56 +0200224
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200225 // If it's the node's first start, set up its runtime directories.
226 if options.Runtime == nil {
227 r, err := setupRuntime(ld, sd)
228 if err != nil {
229 return fmt.Errorf("while setting up node runtime: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200230 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200231 options.Runtime = r
Serge Bazanski66e58952021-10-05 17:06:56 +0200232 }
233
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200234 // Replace the node's context with a new one.
235 r := options.Runtime
236 if r.CtxC != nil {
237 r.CtxC()
238 }
239 r.ctxT, r.CtxC = context.WithCancel(ctx)
240
Serge Bazanski66e58952021-10-05 17:06:56 +0200241 var qemuNetType string
242 var qemuNetConfig launch.QemuValue
243 if options.ConnectToSocket != nil {
244 qemuNetType = "socket"
245 qemuNetConfig = launch.QemuValue{
246 "id": {"net0"},
247 "fd": {"3"},
248 }
249 } else {
250 qemuNetType = "user"
251 qemuNetConfig = launch.QemuValue{
252 "id": {"net0"},
253 "net": {"10.42.0.0/24"},
254 "dhcpstart": {"10.42.0.10"},
255 "hostfwd": options.Ports.ToQemuForwards(),
256 }
257 }
258
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200259 // Generate the node's MAC address if it isn't already set in NodeOptions.
260 if options.Mac == nil {
261 mac, err := generateRandomEthernetMAC()
262 if err != nil {
263 return err
264 }
265 options.Mac = mac
Serge Bazanski66e58952021-10-05 17:06:56 +0200266 }
267
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200268 tpmSocketPath := filepath.Join(r.sd, "tpm-socket")
269 fwVarPath := filepath.Join(r.ld, "OVMF_VARS.fd")
270 storagePath := filepath.Join(r.ld, "node.img")
Serge Bazanski66e58952021-10-05 17:06:56 +0200271 qemuArgs := []string{"-machine", "q35", "-accel", "kvm", "-nographic", "-nodefaults", "-m", "4096",
272 "-cpu", "host", "-smp", "sockets=1,cpus=1,cores=2,threads=2,maxcpus=4",
273 "-drive", "if=pflash,format=raw,readonly,file=external/edk2/OVMF_CODE.fd",
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200274 "-drive", "if=pflash,format=raw,file=" + fwVarPath,
275 "-drive", "if=virtio,format=raw,cache=unsafe,file=" + storagePath,
Serge Bazanski66e58952021-10-05 17:06:56 +0200276 "-netdev", qemuNetConfig.ToOption(qemuNetType),
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200277 "-device", "virtio-net-pci,netdev=net0,mac=" + options.Mac.String(),
Serge Bazanski66e58952021-10-05 17:06:56 +0200278 "-chardev", "socket,id=chrtpm,path=" + tpmSocketPath,
279 "-tpmdev", "emulator,id=tpm0,chardev=chrtpm",
280 "-device", "tpm-tis,tpmdev=tpm0",
281 "-device", "virtio-rng-pci",
282 "-serial", "stdio"}
283
284 if !options.AllowReboot {
285 qemuArgs = append(qemuArgs, "-no-reboot")
286 }
287
288 if options.NodeParameters != nil {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200289 parametersPath := filepath.Join(r.ld, "parameters.pb")
Serge Bazanski66e58952021-10-05 17:06:56 +0200290 parametersRaw, err := proto.Marshal(options.NodeParameters)
291 if err != nil {
292 return fmt.Errorf("failed to encode node paraeters: %w", err)
293 }
Lorenz Brun764a2de2021-11-22 16:26:36 +0100294 if err := os.WriteFile(parametersPath, parametersRaw, 0644); err != nil {
Serge Bazanski66e58952021-10-05 17:06:56 +0200295 return fmt.Errorf("failed to write node parameters: %w", err)
296 }
297 qemuArgs = append(qemuArgs, "-fw_cfg", "name=dev.monogon.metropolis/parameters.pb,file="+parametersPath)
298 }
299
Leopoldacfad5b2023-01-15 14:05:25 +0100300 if options.PcapDump {
301 var qemuNetDump launch.QemuValue
302 pcapPath := filepath.Join(r.ld, "net0.pcap")
303 if options.PcapDump {
304 qemuNetDump = launch.QemuValue{
305 "id": {"net0"},
306 "netdev": {"net0"},
307 "file": {pcapPath},
308 }
309 }
310 qemuArgs = append(qemuArgs, "-object", qemuNetDump.ToOption("filter-dump"))
311 }
312
Serge Bazanski66e58952021-10-05 17:06:56 +0200313 // Start TPM emulator as a subprocess
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200314 tpmCtx, tpmCancel := context.WithCancel(options.Runtime.ctxT)
Serge Bazanski66e58952021-10-05 17:06:56 +0200315 defer tpmCancel()
316
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200317 tpmd := filepath.Join(r.ld, "tpm")
318 tpmEmuCmd := exec.CommandContext(tpmCtx, "swtpm", "socket", "--tpm2", "--tpmstate", "dir="+tpmd, "--ctrl", "type=unixio,path="+tpmSocketPath)
Serge Bazanski66e58952021-10-05 17:06:56 +0200319 tpmEmuCmd.Stderr = os.Stderr
320 tpmEmuCmd.Stdout = os.Stdout
321
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200322 err := tpmEmuCmd.Start()
Serge Bazanski66e58952021-10-05 17:06:56 +0200323 if err != nil {
324 return fmt.Errorf("failed to start TPM emulator: %w", err)
325 }
326
Mateusz Zalegae90f4a12022-05-25 18:24:01 +0200327 // Wait for the socket to be created by the TPM emulator before launching
328 // QEMU.
329 for {
330 _, err := os.Stat(tpmSocketPath)
331 if err == nil {
332 break
333 }
334 if err != nil && !os.IsNotExist(err) {
335 return fmt.Errorf("while stat-ing TPM socket path: %w", err)
336 }
337 if err := tpmCtx.Err(); err != nil {
338 return fmt.Errorf("while waiting for the TPM socket: %w", err)
339 }
340 time.Sleep(time.Millisecond * 100)
341 }
342
Serge Bazanski66e58952021-10-05 17:06:56 +0200343 // Start the main qemu binary
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200344 systemCmd := exec.CommandContext(options.Runtime.ctxT, "qemu-system-x86_64", qemuArgs...)
Serge Bazanski66e58952021-10-05 17:06:56 +0200345 if options.ConnectToSocket != nil {
346 systemCmd.ExtraFiles = []*os.File{options.ConnectToSocket}
347 }
348
349 var stdErrBuf bytes.Buffer
350 systemCmd.Stderr = &stdErrBuf
351 systemCmd.Stdout = options.SerialPort
352
Leopoldaf5086b2023-01-15 14:12:42 +0100353 launch.PrettyPrintQemuArgs(options.Name, systemCmd.Args)
354
Serge Bazanski66e58952021-10-05 17:06:56 +0200355 err = systemCmd.Run()
356
357 // Stop TPM emulator and wait for it to exit to properly reap the child process
358 tpmCancel()
Serge Bazanski05f813b2023-03-16 17:58:39 +0100359 launch.Log("Node: Waiting for TPM emulator to exit")
Serge Bazanski66e58952021-10-05 17:06:56 +0200360 // Wait returns a SIGKILL error because we just cancelled its context.
361 // We still need to call it to avoid creating zombies.
362 _ = tpmEmuCmd.Wait()
Serge Bazanski05f813b2023-03-16 17:58:39 +0100363 launch.Log("Node: TPM emulator done")
Serge Bazanski66e58952021-10-05 17:06:56 +0200364
365 var exerr *exec.ExitError
366 if err != nil && errors.As(err, &exerr) {
367 status := exerr.ProcessState.Sys().(syscall.WaitStatus)
368 if status.Signaled() && status.Signal() == syscall.SIGKILL {
369 // Process was killed externally (most likely by our context being canceled).
370 // This is a normal exit for us, so return nil
371 return nil
372 }
373 exerr.Stderr = stdErrBuf.Bytes()
374 newErr := launch.QEMUError(*exerr)
375 return &newErr
376 }
377 return err
378}
379
// copyFile copies the contents of the file at src into dst, creating dst or
// truncating it if it already exists. Errors are wrapped with the stage that
// failed (opening, creating or copying). Closing dst explicitly at the end
// surfaces any write-back error to the caller.
func copyFile(src, dst string) error {
	source, err := os.Open(src)
	if err != nil {
		return fmt.Errorf("when opening source: %w", err)
	}
	defer source.Close()

	target, err := os.Create(dst)
	if err != nil {
		return fmt.Errorf("when creating destination: %w", err)
	}
	// Safety net for the error path; after the explicit Close below this
	// second Close only reports an ignored "already closed" error.
	defer target.Close()

	if _, err := io.Copy(target, source); err != nil {
		return fmt.Errorf("when copying file: %w", err)
	}
	return target.Close()
}
399
Serge Bazanskie78a0892021-10-07 17:03:49 +0200400// getNodes wraps around Management.GetNodes to return a list of nodes in a
401// cluster.
402func getNodes(ctx context.Context, mgmt apb.ManagementClient) ([]*apb.Node, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200403 var res []*apb.Node
Serge Bazanski636032e2022-01-26 14:21:33 +0100404 bo := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
Serge Bazanski075465c2021-11-16 15:38:49 +0100405 err := backoff.Retry(func() error {
406 res = nil
407 srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
Serge Bazanskie78a0892021-10-07 17:03:49 +0200408 if err != nil {
Serge Bazanski075465c2021-11-16 15:38:49 +0100409 return fmt.Errorf("GetNodes: %w", err)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200410 }
Serge Bazanski075465c2021-11-16 15:38:49 +0100411 for {
412 node, err := srvN.Recv()
413 if err == io.EOF {
414 break
415 }
416 if err != nil {
417 return fmt.Errorf("GetNodes.Recv: %w", err)
418 }
419 res = append(res, node)
420 }
421 return nil
422 }, bo)
423 if err != nil {
424 return nil, err
Serge Bazanskie78a0892021-10-07 17:03:49 +0200425 }
426 return res, nil
427}
428
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200429// getNode wraps Management.GetNodes. It returns node information matching
430// given node ID.
431func getNode(ctx context.Context, mgmt apb.ManagementClient, id string) (*apb.Node, error) {
432 nodes, err := getNodes(ctx, mgmt)
433 if err != nil {
434 return nil, fmt.Errorf("could not get nodes: %w", err)
435 }
436 for _, n := range nodes {
437 eid := identity.NodeID(n.Pubkey)
438 if eid != id {
439 continue
440 }
441 return n, nil
442 }
443 return nil, fmt.Errorf("no such node.")
444}
445
// generateRandomEthernetMAC returns a random EUI-48 Ethernet MAC address with
// the U/L bit set and the I/G bit cleared, i.e. a locally administered,
// individual (unicast) address. It fails only if crypto/rand cannot provide
// randomness.
func generateRandomEthernetMAC() (*net.HardwareAddr, error) {
	macBuf := make([]byte, 6)
	if _, err := rand.Read(macBuf); err != nil {
		return nil, fmt.Errorf("failed to read randomness for MAC: %w", err)
	}

	// Set U/L bit and clear I/G bit (locally administered individual MAC)
	// Ref IEEE 802-2014 Section 8.2.2
	macBuf[0] = (macBuf[0] | 0x02) &^ 0x01
	mac := net.HardwareAddr(macBuf)
	return &mac, nil
}
460
// SOCKSPort is the port of the SOCKS proxy that nanoswitch provides into the
// switch network. It is exposed to the host as one of ClusterPorts.
const SOCKSPort uint16 = 1080

// ClusterPorts contains all ports handled by Nanoswitch, i.e. the ports for
// which LaunchCluster allocates host-side forwards.
var ClusterPorts = []uint16{
	// Forwarded to the first node.
	uint16(node.CuratorServicePort),
	uint16(node.DebugServicePort),
	uint16(node.KubernetesAPIPort),
	uint16(node.KubernetesAPIWrappedPort),

	// SOCKS proxy to the switch network
	SOCKSPort,
}
474
// ClusterOptions contains all options for launching a Metropolis cluster.
type ClusterOptions struct {
	// The number of nodes this cluster should be started with. LaunchCluster
	// refuses values lower than one.
	NumNodes int
}
480
// Cluster is the running Metropolis cluster launched using the LaunchCluster
// function.
type Cluster struct {
	// Owner is the TLS Certificate of the owner of the test cluster. This can be
	// used to authenticate further clients to the running cluster.
	Owner tls.Certificate
	// Ports is the PortMap used to access the first node's services (defined in
	// ClusterPorts) and the SOCKS proxy (at SOCKSPort).
	Ports launch.PortMap

	// Nodes is a map from Node ID to its runtime information.
	Nodes map[string]*NodeInCluster
	// NodeIDs is a list of node IDs that are backing this cluster, in order of
	// creation.
	NodeIDs []string

	// nodesDone is a list of channels, one per node, each receiving the error
	// returned by that node's LaunchNode call once its qemu instance exits.
	// It's used by Close to ensure all nodes have successfully been stopped.
	nodesDone []chan error
	// nodeOpts are the cluster member nodes' mutable launch options, kept here
	// to facilitate reboots. Entries are passed to LaunchNode by pointer, which
	// fills in their Runtime and Mac on first launch.
	nodeOpts []NodeOptions
	// launchDir points at the directory keeping the nodes' state, such as storage
	// images, firmware variable files, TPM state.
	launchDir string
	// socketDir points at the directory keeping UNIX socket files, such as these
	// used to facilitate communication between QEMU and swtpm. It's different
	// from launchDir, and anchored nearer the file system root, due to the
	// socket path length limitation imposed by the kernel.
	socketDir string

	// socksDialer is used by DialNode to establish connections to nodes via the
	// SOCKS server run by nanoswitch.
	socksDialer proxy.Dialer

	// authClient is a cached authenticated owner connection to a Curator
	// instance within the cluster. It is created lazily by CuratorClient.
	authClient *grpc.ClientConn

	// ctxT is the context individual node contexts are created from.
	ctxT context.Context
	// ctxC is used by Close to cancel the context under which the nodes are
	// running.
	ctxC context.CancelFunc
}
527
// NodeInCluster represents information about a node that's part of a Cluster.
type NodeInCluster struct {
	// ID of the node, which can be used to dial this node's services via DialNode.
	ID string
	// Address of the node on the network run by nanoswitch. Not reachable from the
	// host unless dialed via DialNode or via the nanoswitch SOCKS proxy (reachable
	// on Cluster.Ports[SOCKSPort]).
	ManagementAddress string
}
537
538// firstConnection performs the initial owner credential escrow with a newly
539// started nanoswitch-backed cluster over SOCKS. It expects the first node to be
540// running at 10.1.0.2, which is always the case with the current nanoswitch
541// implementation.
542//
Leopold20a036e2023-01-15 00:17:19 +0100543// It returns the newly escrowed credentials as well as the first node's
Serge Bazanskibe742842022-04-04 13:18:50 +0200544// information as NodeInCluster.
545func firstConnection(ctx context.Context, socksDialer proxy.Dialer) (*tls.Certificate, *NodeInCluster, error) {
546 // Dial external service.
547 remote := fmt.Sprintf("10.1.0.2:%s", node.CuratorServicePort.PortString())
548 initCreds, err := rpc.NewEphemeralCredentials(InsecurePrivateKey, nil)
549 if err != nil {
550 return nil, nil, fmt.Errorf("NewEphemeralCredentials: %w", err)
551 }
552 initDialer := func(_ context.Context, addr string) (net.Conn, error) {
553 return socksDialer.Dial("tcp", addr)
554 }
555 initClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(initCreds))
556 if err != nil {
557 return nil, nil, fmt.Errorf("dialing with ephemeral credentials failed: %w", err)
558 }
559 defer initClient.Close()
560
561 // Retrieve owner certificate - this can take a while because the node is still
562 // coming up, so do it in a backoff loop.
Serge Bazanski05f813b2023-03-16 17:58:39 +0100563 launch.Log("Cluster: retrieving owner certificate (this can take a few seconds while the first node boots)...")
Serge Bazanskibe742842022-04-04 13:18:50 +0200564 aaa := apb.NewAAAClient(initClient)
565 var cert *tls.Certificate
566 err = backoff.Retry(func() error {
567 cert, err = rpc.RetrieveOwnerCertificate(ctx, aaa, InsecurePrivateKey)
568 if st, ok := status.FromError(err); ok {
569 if st.Code() == codes.Unavailable {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100570 launch.Log("Cluster: cluster UNAVAILABLE: %v", st.Message())
Serge Bazanskibe742842022-04-04 13:18:50 +0200571 return err
572 }
573 }
574 return backoff.Permanent(err)
575 }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
576 if err != nil {
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200577 return nil, nil, fmt.Errorf("couldn't retrieve owner certificate: %w", err)
Serge Bazanskibe742842022-04-04 13:18:50 +0200578 }
Serge Bazanski05f813b2023-03-16 17:58:39 +0100579 launch.Log("Cluster: retrieved owner certificate.")
Serge Bazanskibe742842022-04-04 13:18:50 +0200580
581 // Now connect authenticated and get the node ID.
582 creds := rpc.NewAuthenticatedCredentials(*cert, nil)
583 authClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(creds))
584 if err != nil {
585 return nil, nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
586 }
587 defer authClient.Close()
588 mgmt := apb.NewManagementClient(authClient)
589
590 var node *NodeInCluster
591 err = backoff.Retry(func() error {
592 nodes, err := getNodes(ctx, mgmt)
593 if err != nil {
594 return fmt.Errorf("retrieving nodes failed: %w", err)
595 }
596 if len(nodes) != 1 {
597 return fmt.Errorf("expected one node, got %d", len(nodes))
598 }
599 n := nodes[0]
600 if n.Status == nil || n.Status.ExternalAddress == "" {
601 return fmt.Errorf("node has no status and/or address")
602 }
603 node = &NodeInCluster{
604 ID: identity.NodeID(n.Pubkey),
605 ManagementAddress: n.Status.ExternalAddress,
606 }
607 return nil
608 }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
609 if err != nil {
610 return nil, nil, err
611 }
612
613 return cert, node, nil
Serge Bazanski66e58952021-10-05 17:06:56 +0200614}
615
616// LaunchCluster launches a cluster of Metropolis node VMs together with a
617// Nanoswitch instance to network them all together.
618//
619// The given context will be used to run all qemu instances in the cluster, and
620// canceling the context or calling Close() will terminate them.
621func LaunchCluster(ctx context.Context, opts ClusterOptions) (*Cluster, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200622 if opts.NumNodes <= 0 {
Serge Bazanski66e58952021-10-05 17:06:56 +0200623 return nil, errors.New("refusing to start cluster with zero nodes")
624 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200625
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200626 // Create the launch directory.
627 ld, err := os.MkdirTemp(os.Getenv("TEST_TMPDIR"), "cluster*")
628 if err != nil {
629 return nil, fmt.Errorf("failed to create the launch directory: %w", err)
630 }
631 // Create the socket directory.
632 sd, err := os.MkdirTemp("/tmp", "cluster*")
633 if err != nil {
634 return nil, fmt.Errorf("failed to create the socket directory: %w", err)
635 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200636
637 // Prepare links between nodes and nanoswitch.
638 var switchPorts []*os.File
639 var vmPorts []*os.File
640 for i := 0; i < opts.NumNodes; i++ {
641 switchPort, vmPort, err := launch.NewSocketPair()
642 if err != nil {
Serge Bazanski66e58952021-10-05 17:06:56 +0200643 return nil, fmt.Errorf("failed to get socketpair: %w", err)
644 }
645 switchPorts = append(switchPorts, switchPort)
646 vmPorts = append(vmPorts, vmPort)
647 }
648
Serge Bazanskie78a0892021-10-07 17:03:49 +0200649 // Make a list of channels that will be populated by all running node qemu
650 // processes.
Serge Bazanski66e58952021-10-05 17:06:56 +0200651 done := make([]chan error, opts.NumNodes)
652 for i, _ := range done {
653 done[i] = make(chan error, 1)
654 }
655
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200656 // Prepare the node options. These will be kept as part of Cluster.
657 // nodeOpts[].Runtime will be initialized by LaunchNode during the first
658 // launch. The runtime information can be later used to restart a node.
659 // The 0th node will be initialized first. The rest will follow after it
660 // had bootstrapped the cluster.
661 nodeOpts := make([]NodeOptions, opts.NumNodes)
662 nodeOpts[0] = NodeOptions{
Leopoldaf5086b2023-01-15 14:12:42 +0100663 Name: "node0",
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200664 ConnectToSocket: vmPorts[0],
665 NodeParameters: &apb.NodeParameters{
666 Cluster: &apb.NodeParameters_ClusterBootstrap_{
667 ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
668 OwnerPublicKey: InsecurePublicKey,
Serge Bazanski66e58952021-10-05 17:06:56 +0200669 },
670 },
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200671 },
672 SerialPort: newPrefixedStdio(0),
Leopoldacfad5b2023-01-15 14:05:25 +0100673 PcapDump: true,
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200674 }
675
676 // Start the first node.
677 ctxT, ctxC := context.WithCancel(ctx)
Serge Bazanski05f813b2023-03-16 17:58:39 +0100678 launch.Log("Cluster: Starting node %d...", 1)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200679 go func() {
680 err := LaunchNode(ctxT, ld, sd, &nodeOpts[0])
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200681 if err != nil {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100682 launch.Log("Node %d finished with an error: %v", 1, err)
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200683 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200684 done[0] <- err
685 }()
686
Serge Bazanskie78a0892021-10-07 17:03:49 +0200687 // Launch nanoswitch.
Serge Bazanski66e58952021-10-05 17:06:56 +0200688 portMap, err := launch.ConflictFreePortMap(ClusterPorts)
689 if err != nil {
690 ctxC()
691 return nil, fmt.Errorf("failed to allocate ephemeral ports: %w", err)
692 }
693
694 go func() {
695 if err := launch.RunMicroVM(ctxT, &launch.MicroVMOptions{
Leopoldaf5086b2023-01-15 14:12:42 +0100696 Name: "nanoswitch [99]",
Serge Bazanski66e58952021-10-05 17:06:56 +0200697 KernelPath: "metropolis/test/ktest/vmlinux",
Lorenz Brunb6a9d3c2022-01-27 18:56:20 +0100698 InitramfsPath: "metropolis/test/nanoswitch/initramfs.cpio.lz4",
Serge Bazanski66e58952021-10-05 17:06:56 +0200699 ExtraNetworkInterfaces: switchPorts,
700 PortMap: portMap,
Serge Bazanski1fbc5972022-06-22 13:36:16 +0200701 SerialPort: newPrefixedStdio(99),
Leopoldacfad5b2023-01-15 14:05:25 +0100702 PcapDump: path.Join(ld, "nanoswitch.pcap"),
Serge Bazanski66e58952021-10-05 17:06:56 +0200703 }); err != nil {
704 if !errors.Is(err, ctxT.Err()) {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100705 launch.Fatal("Failed to launch nanoswitch: %v", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200706 }
707 }
708 }()
709
Serge Bazanskibe742842022-04-04 13:18:50 +0200710 // Build SOCKS dialer.
711 socksRemote := fmt.Sprintf("localhost:%v", portMap[SOCKSPort])
712 socksDialer, err := proxy.SOCKS5("tcp", socksRemote, nil, proxy.Direct)
Serge Bazanski66e58952021-10-05 17:06:56 +0200713 if err != nil {
714 ctxC()
Serge Bazanskibe742842022-04-04 13:18:50 +0200715 return nil, fmt.Errorf("failed to build SOCKS dialer: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200716 }
717
Serge Bazanskibe742842022-04-04 13:18:50 +0200718 // Retrieve owner credentials and first node.
719 cert, firstNode, err := firstConnection(ctxT, socksDialer)
Serge Bazanski66e58952021-10-05 17:06:56 +0200720 if err != nil {
721 ctxC()
722 return nil, err
723 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200724
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200725 // Set up a partially initialized cluster instance, to be filled in in the
726 // later steps.
Serge Bazanskibe742842022-04-04 13:18:50 +0200727 cluster := &Cluster{
728 Owner: *cert,
729 Ports: portMap,
730 Nodes: map[string]*NodeInCluster{
731 firstNode.ID: firstNode,
732 },
733 NodeIDs: []string{
734 firstNode.ID,
735 },
736
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200737 nodesDone: done,
738 nodeOpts: nodeOpts,
739 launchDir: ld,
740 socketDir: sd,
741
Serge Bazanskibe742842022-04-04 13:18:50 +0200742 socksDialer: socksDialer,
743
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200744 ctxT: ctxT,
Serge Bazanskibe742842022-04-04 13:18:50 +0200745 ctxC: ctxC,
746 }
747
748 // Now start the rest of the nodes and register them into the cluster.
749
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200750 // Get an authenticated owner client within the cluster.
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200751 curC, err := cluster.CuratorClient()
Serge Bazanski66e58952021-10-05 17:06:56 +0200752 if err != nil {
753 ctxC()
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200754 return nil, fmt.Errorf("CuratorClient: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200755 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200756 mgmt := apb.NewManagementClient(curC)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200757
758 // Retrieve register ticket to register further nodes.
Serge Bazanski05f813b2023-03-16 17:58:39 +0100759 launch.Log("Cluster: retrieving register ticket...")
Serge Bazanskie78a0892021-10-07 17:03:49 +0200760 resT, err := mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{})
761 if err != nil {
762 ctxC()
763 return nil, fmt.Errorf("GetRegisterTicket: %w", err)
764 }
765 ticket := resT.Ticket
Serge Bazanski05f813b2023-03-16 17:58:39 +0100766 launch.Log("Cluster: retrieved register ticket (%d bytes).", len(ticket))
Serge Bazanskie78a0892021-10-07 17:03:49 +0200767
768 // Retrieve cluster info (for directory and ca public key) to register further
769 // nodes.
770 resI, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
771 if err != nil {
772 ctxC()
773 return nil, fmt.Errorf("GetClusterInfo: %w", err)
774 }
775
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200776 // Use the retrieved information to configure the rest of the node options.
777 for i := 1; i < opts.NumNodes; i++ {
778 nodeOpts[i] = NodeOptions{
Leopoldaf5086b2023-01-15 14:12:42 +0100779 Name: fmt.Sprintf("node%d", i),
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200780 ConnectToSocket: vmPorts[i],
781 NodeParameters: &apb.NodeParameters{
782 Cluster: &apb.NodeParameters_ClusterRegister_{
783 ClusterRegister: &apb.NodeParameters_ClusterRegister{
784 RegisterTicket: ticket,
785 ClusterDirectory: resI.ClusterDirectory,
786 CaCertificate: resI.CaCertificate,
787 },
788 },
789 },
790 SerialPort: newPrefixedStdio(i),
791 }
792 }
793
794 // Now run the rest of the nodes.
795 //
Serge Bazanskie78a0892021-10-07 17:03:49 +0200796 // TODO(q3k): parallelize this
797 for i := 1; i < opts.NumNodes; i++ {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100798 launch.Log("Cluster: Starting node %d...", i+1)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200799 go func(i int) {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200800 err := LaunchNode(ctxT, ld, sd, &nodeOpts[i])
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200801 if err != nil {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100802 launch.Log("Node %d finished with an error: %v", i, err)
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200803 }
Serge Bazanskie78a0892021-10-07 17:03:49 +0200804 done[i] <- err
805 }(i)
806 var newNode *apb.Node
807
Serge Bazanski05f813b2023-03-16 17:58:39 +0100808 launch.Log("Cluster: waiting for node %d to appear as NEW...", i)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200809 for {
810 nodes, err := getNodes(ctx, mgmt)
811 if err != nil {
812 ctxC()
813 return nil, fmt.Errorf("could not get nodes: %w", err)
814 }
815 for _, n := range nodes {
816 if n.State == cpb.NodeState_NODE_STATE_NEW {
817 newNode = n
818 break
819 }
820 }
821 if newNode != nil {
822 break
823 }
824 time.Sleep(1 * time.Second)
825 }
826 id := identity.NodeID(newNode.Pubkey)
Serge Bazanski05f813b2023-03-16 17:58:39 +0100827 launch.Log("Cluster: node %d is %s", i, id)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200828
Serge Bazanski05f813b2023-03-16 17:58:39 +0100829 launch.Log("Cluster: approving node %d", i)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200830 _, err := mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
831 Pubkey: newNode.Pubkey,
832 })
833 if err != nil {
834 ctxC()
835 return nil, fmt.Errorf("ApproveNode(%s): %w", id, err)
836 }
Serge Bazanski05f813b2023-03-16 17:58:39 +0100837 launch.Log("Cluster: node %d approved, waiting for it to appear as UP and with a network address...", i)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200838 for {
839 nodes, err := getNodes(ctx, mgmt)
840 if err != nil {
841 ctxC()
842 return nil, fmt.Errorf("could not get nodes: %w", err)
843 }
844 found := false
845 for _, n := range nodes {
846 if !bytes.Equal(n.Pubkey, newNode.Pubkey) {
847 continue
848 }
Serge Bazanskibe742842022-04-04 13:18:50 +0200849 if n.Status == nil || n.Status.ExternalAddress == "" {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200850 break
851 }
Serge Bazanskibe742842022-04-04 13:18:50 +0200852 if n.State != cpb.NodeState_NODE_STATE_UP {
853 break
854 }
855 found = true
856 cluster.Nodes[identity.NodeID(n.Pubkey)] = &NodeInCluster{
857 ID: identity.NodeID(n.Pubkey),
858 ManagementAddress: n.Status.ExternalAddress,
859 }
860 cluster.NodeIDs = append(cluster.NodeIDs, identity.NodeID(n.Pubkey))
861 break
Serge Bazanskie78a0892021-10-07 17:03:49 +0200862 }
863 if found {
864 break
865 }
Serge Bazanskibe742842022-04-04 13:18:50 +0200866 time.Sleep(time.Second)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200867 }
Serge Bazanski05f813b2023-03-16 17:58:39 +0100868 launch.Log("Cluster: node %d (%s) UP!", i, id)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200869 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200870
Serge Bazanski05f813b2023-03-16 17:58:39 +0100871 launch.Log("Cluster: all nodes up:")
Serge Bazanskibe742842022-04-04 13:18:50 +0200872 for _, node := range cluster.Nodes {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100873 launch.Log("Cluster: - %s at %s", node.ID, node.ManagementAddress)
Serge Bazanskibe742842022-04-04 13:18:50 +0200874 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200875
Serge Bazanskibe742842022-04-04 13:18:50 +0200876 return cluster, nil
Serge Bazanski66e58952021-10-05 17:06:56 +0200877}
878
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200879// RebootNode reboots the cluster member node matching the given index, and
880// waits for it to rejoin the cluster. It will use the given context ctx to run
881// cluster API requests, whereas the resulting QEMU process will be created
882// using the cluster's context c.ctxT. The nodes are indexed starting at 0.
883func (c *Cluster) RebootNode(ctx context.Context, idx int) error {
884 if idx < 0 || idx >= len(c.NodeIDs) {
885 return fmt.Errorf("index out of bounds.")
886 }
887 id := c.NodeIDs[idx]
888
889 // Get an authenticated owner client within the cluster.
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200890 curC, err := c.CuratorClient()
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200891 if err != nil {
892 return err
893 }
894 mgmt := apb.NewManagementClient(curC)
895
896 // Get the timestamp of the node's last update, as observed by Curator.
897 // It'll be needed to make sure it had rejoined the cluster after the reboot.
898 var is *apb.Node
899 for {
900 r, err := getNode(ctx, mgmt, id)
901 if err != nil {
902 return err
903 }
904
905 // Node status may be absent if it hasn't reported to the cluster yet. Wait
906 // for it to appear before progressing further.
907 if r.Status != nil {
908 is = r
909 break
910 }
911 time.Sleep(time.Second)
912 }
913
914 // Cancel the node's context. This will shut down QEMU.
915 c.nodeOpts[idx].Runtime.CtxC()
Serge Bazanski05f813b2023-03-16 17:58:39 +0100916 launch.Log("Cluster: waiting for node %d (%s) to stop.", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200917 err = <-c.nodesDone[idx]
918 if err != nil {
919 return fmt.Errorf("while restarting node: %w", err)
920 }
921
922 // Start QEMU again.
Serge Bazanski05f813b2023-03-16 17:58:39 +0100923 launch.Log("Cluster: restarting node %d (%s).", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200924 go func(n int) {
925 err := LaunchNode(c.ctxT, c.launchDir, c.socketDir, &c.nodeOpts[n])
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200926 if err != nil {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100927 launch.Log("Node %d finished with an error: %v", n, err)
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200928 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200929 c.nodesDone[n] <- err
930 }(idx)
931
932 // Poll Management.GetNodes until the node's timestamp is updated.
933 for {
934 cs, err := getNode(ctx, mgmt, id)
935 if err != nil {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100936 launch.Log("Cluster: node get error: %v", err)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200937 return err
938 }
Serge Bazanski05f813b2023-03-16 17:58:39 +0100939 launch.Log("Cluster: node status: %+v", cs)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200940 if cs.Status == nil {
941 continue
942 }
Mateusz Zalega28800ad2022-07-08 14:56:02 +0200943 if cs.Status.Timestamp.AsTime().Sub(is.Status.Timestamp.AsTime()) > 0 {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200944 break
945 }
946 time.Sleep(time.Second)
947 }
Serge Bazanski05f813b2023-03-16 17:58:39 +0100948 launch.Log("Cluster: node %d (%s) has rejoined the cluster.", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200949 return nil
950}
951
952// Close cancels the running clusters' context and waits for all virtualized
Serge Bazanski66e58952021-10-05 17:06:56 +0200953// nodes to stop. It returns an error if stopping the nodes failed, or one of
954// the nodes failed to fully start in the first place.
955func (c *Cluster) Close() error {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100956 launch.Log("Cluster: stopping...")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200957 if c.authClient != nil {
958 c.authClient.Close()
959 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200960 c.ctxC()
961
Leopold20a036e2023-01-15 00:17:19 +0100962 var errs []error
Serge Bazanski05f813b2023-03-16 17:58:39 +0100963 launch.Log("Cluster: waiting for nodes to exit...")
Serge Bazanski66e58952021-10-05 17:06:56 +0200964 for _, c := range c.nodesDone {
965 err := <-c
966 if err != nil {
Leopold20a036e2023-01-15 00:17:19 +0100967 errs = append(errs, err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200968 }
969 }
Serge Bazanski05f813b2023-03-16 17:58:39 +0100970 launch.Log("Cluster: removing nodes' state files.")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200971 os.RemoveAll(c.launchDir)
972 os.RemoveAll(c.socketDir)
Serge Bazanski05f813b2023-03-16 17:58:39 +0100973 launch.Log("Cluster: done")
Leopold20a036e2023-01-15 00:17:19 +0100974 return multierr.Combine(errs...)
Serge Bazanski66e58952021-10-05 17:06:56 +0200975}
Serge Bazanskibe742842022-04-04 13:18:50 +0200976
977// DialNode is a grpc.WithContextDialer compatible dialer which dials nodes by
978// their ID. This is performed by connecting to the cluster nanoswitch via its
979// SOCKS proxy, and using the cluster node list for name resolution.
980//
981// For example:
982//
Serge Bazanski05f813b2023-03-16 17:58:39 +0100983// grpc.Dial("metropolis-deadbeef:1234", grpc.WithContextDialer(c.DialNode))
Serge Bazanskibe742842022-04-04 13:18:50 +0200984func (c *Cluster) DialNode(_ context.Context, addr string) (net.Conn, error) {
985 host, port, err := net.SplitHostPort(addr)
986 if err != nil {
987 return nil, fmt.Errorf("invalid host:port: %w", err)
988 }
989 // Already an IP address?
990 if net.ParseIP(host) != nil {
991 return c.socksDialer.Dial("tcp", addr)
992 }
993
994 // Otherwise, expect a node name.
995 node, ok := c.Nodes[host]
996 if !ok {
997 return nil, fmt.Errorf("unknown node %q", host)
998 }
999 addr = net.JoinHostPort(node.ManagementAddress, port)
1000 return c.socksDialer.Dial("tcp", addr)
1001}