m/test: implement non-transient QEMU VMs

This patch reworks the launch code to make cluster member VMs
rebootable: their state (disk image, OVMF firmware variables, TPM
state, MAC address) is now preserved across launches instead of being
discarded after each invocation.
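
Each node's mutable state now lives in a dedicated per-node directory
created by setupRuntime(). As a rough sketch of the resulting layout
(the random suffixes come from os.MkdirTemp):

    <launch dir>/node_state<rand>/
        node.img       # writable copy of the prebuilt node image
        OVMF_VARS.fd   # OVMF firmware variable store
        parameters.pb  # serialized NodeParameters
        tpm/           # swtpm state files
    <socket dir>/node_sock<rand>/
        tpm-socket     # swtpm <-> QEMU control socket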

The RebootNode method included in this patch is cluster-aware in the
sense that it blocks until the rebooted node has rejoined the cluster.
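
For reference, a minimal usage sketch from test code (error handling
elided; the cluster constructor and its options struct are referred
to here as LaunchCluster and ClusterOptions, per this package):

    cl, err := cluster.LaunchCluster(ctx, cluster.ClusterOptions{
        NumNodes: 2,
    })
    if err != nil { /* ... */ }
    defer cl.Close()
    // Blocks until node 1 has stopped, has been relaunched with its
    // preserved state, and has reported a fresh status timestamp to
    // the Curator, i.e. until it has rejoined the cluster.
    if err := cl.RebootNode(ctx, 1); err != nil { /* ... */ }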

Change-Id: Ie1236297d214399e927a67295200f8b8879a5b39
Reviewed-on: https://review.monogon.dev/c/monogon/+/664
Reviewed-by: Sergiusz Bazanski <serge@monogon.tech>
diff --git a/metropolis/test/e2e/main_test.go b/metropolis/test/e2e/main_test.go
index 51dbe4b..c0af1fd 100644
--- a/metropolis/test/e2e/main_test.go
+++ b/metropolis/test/e2e/main_test.go
@@ -142,6 +142,13 @@
 				}
 				return nil
 			})
+			testEventual(t, "Node rejoin successful", ctx, 60*time.Second, func(ctx context.Context) error {
+				// Ensure nodes rejoin the cluster after a reboot by rebooting node 1.
+				if err := cluster.RebootNode(ctx, 1); err != nil {
+					return fmt.Errorf("while rebooting a node: %w", err)
+				}
+				return nil
+			})
 		})
 		t.Run("Kubernetes", func(t *testing.T) {
 			t.Parallel()
diff --git a/metropolis/test/launch/cli/launch/main.go b/metropolis/test/launch/cli/launch/main.go
index a826ea0..b195ecf 100644
--- a/metropolis/test/launch/cli/launch/main.go
+++ b/metropolis/test/launch/cli/launch/main.go
@@ -28,12 +28,27 @@
 )
 
 func main() {
+	// Create the launch directory.
+	ld, err := os.MkdirTemp(os.Getenv("TEST_TMPDIR"), "node_state*")
+	if err != nil {
+		log.Fatalf("couldn't create a launch directory: %v", err)
+	}
+	defer os.RemoveAll(ld)
+	// Create the socket directory. Since paths under TEST_TMPDIR are often too
+	// long to accommodate UNIX sockets, use the system-wide /tmp directory
+	// instead.
+	sd, err := os.MkdirTemp("/tmp", "node_sock*")
+	if err != nil {
+		log.Fatalf("couldn't create a socket directory: %v", err)
+	}
+	defer os.RemoveAll(sd)
+
 	var ports []uint16
 	for _, p := range cluster.NodePorts {
 		ports = append(ports, uint16(p))
 	}
 	ctx := clicontext.WithInterrupt(context.Background())
-	err := cluster.LaunchNode(ctx, cluster.NodeOptions{
+	err = cluster.LaunchNode(ctx, ld, sd, &cluster.NodeOptions{
 		Ports:      launch.IdentityPortMap(ports),
 		SerialPort: os.Stdout,
 		NodeParameters: &apb.NodeParameters{
diff --git a/metropolis/test/launch/cluster/BUILD.bazel b/metropolis/test/launch/cluster/BUILD.bazel
index b70428c..54e6a46 100644
--- a/metropolis/test/launch/cluster/BUILD.bazel
+++ b/metropolis/test/launch/cluster/BUILD.bazel
@@ -18,6 +18,7 @@
     importpath = "source.monogon.dev/metropolis/test/launch/cluster",
     visibility = ["//visibility:public"],
     deps = [
+        "//metropolis/cli/pkg/datafile",
         "//metropolis/node",
         "//metropolis/node/core/identity",
         "//metropolis/node/core/rpc",
diff --git a/metropolis/test/launch/cluster/cluster.go b/metropolis/test/launch/cluster/cluster.go
index da7862f..a25e992 100644
--- a/metropolis/test/launch/cluster/cluster.go
+++ b/metropolis/test/launch/cluster/cluster.go
@@ -28,6 +28,7 @@
 	"google.golang.org/grpc/status"
 	"google.golang.org/protobuf/proto"
 
+	"source.monogon.dev/metropolis/cli/pkg/datafile"
 	"source.monogon.dev/metropolis/node"
 	common "source.monogon.dev/metropolis/node"
 	"source.monogon.dev/metropolis/node/core/identity"
@@ -63,6 +64,26 @@
 	// NodeParameters is passed into the VM and subsequently used for bootstrapping or
 	// registering into a cluster.
 	NodeParameters *apb.NodeParameters
+
+	// Mac is the node's MAC address.
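+	// If nil, LaunchNode will generate a random address and store it here.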
+	Mac *net.HardwareAddr
+
+	// Runtime keeps the node's QEMU runtime state.
+	Runtime *NodeRuntime
+}
+
+// NodeRuntime keeps the node's QEMU runtime state.
+type NodeRuntime struct {
+	// ld points at the node's launch directory storing data such as storage
+	// images, firmware variables or the TPM state.
+	ld string
+	// sd points at the node's socket directory.
+	sd string
+
+	// ctxT is the context QEMU will execute in.
+	ctxT context.Context
+	// CtxC is the QEMU context's cancellation function.
+	CtxC context.CancelFunc
 }
 
 // NodePorts is the list of ports a fully operational Metropolis node listens on
@@ -78,45 +99,124 @@
 	node.DebuggerPort,
 }
 
+// setupRuntime creates the node's QEMU runtime directory, together with all
+// files required to preserve its state, a level below the chosen path ld. The
+// node's socket directory is similarly created a level below sd. It may
+// return an I/O error.
+func setupRuntime(ld, sd string) (*NodeRuntime, error) {
+	// Create a temporary directory to keep all the runtime files.
+	stdp, err := os.MkdirTemp(ld, "node_state*")
+	if err != nil {
+		return nil, fmt.Errorf("failed to create the state directory: %w", err)
+	}
+
+	// Initialize the node's storage with a prebuilt image.
+	si, err := datafile.ResolveRunfile("metropolis/node/node.img")
+	if err != nil {
+		return nil, fmt.Errorf("while resolving a path: %w", err)
+	}
+	di := filepath.Join(stdp, filepath.Base(si))
+	log.Printf("Cluster: copying the node image: %s -> %s", si, di)
+	if err := copyFile(si, di); err != nil {
+		return nil, fmt.Errorf("while copying the node image: %w", err)
+	}
+
+	// Initialize the OVMF firmware variables file.
+	sv, err := datafile.ResolveRunfile("external/edk2/OVMF_VARS.fd")
+	if err != nil {
+		return nil, fmt.Errorf("while resolving a path: %w", err)
+	}
+	dv := filepath.Join(stdp, filepath.Base(sv))
+	if err := copyFile(sv, dv); err != nil {
+		return nil, fmt.Errorf("while copying firmware variables: %w", err)
+	}
+
+	// Create the TPM state directory and initialize all files required by swtpm.
+	tpmt := filepath.Join(stdp, "tpm")
+	if err := os.Mkdir(tpmt, 0755); err != nil {
+		return nil, fmt.Errorf("while creating the TPM directory: %w", err)
+	}
+	tpms, err := datafile.ResolveRunfile("metropolis/node/tpm")
+	if err != nil {
+		return nil, fmt.Errorf("while resolving a path: %w", err)
+	}
+	tpmf, err := os.ReadDir(tpms)
+	if err != nil {
+		return nil, fmt.Errorf("failed to read TPM directory: %w", err)
+	}
+	for _, file := range tpmf {
+		name := file.Name()
+		src, err := datafile.ResolveRunfile(filepath.Join(tpms, name))
+		if err != nil {
+			return nil, fmt.Errorf("while resolving a path: %w", err)
+		}
+		tgt := filepath.Join(tpmt, name)
+		if err := copyFile(src, tgt); err != nil {
+			return nil, fmt.Errorf("while copying TPM state: file %q to %q: %w", src, tgt, err)
+		}
+	}
+
+	// Create the socket directory.
+	sotdp, err := os.MkdirTemp(sd, "node_sock*")
+	if err != nil {
+		return nil, fmt.Errorf("failed to create the socket directory: %w", err)
+	}
+
+	return &NodeRuntime{
+		ld: stdp,
+		sd: sotdp,
+	}, nil
+}
+
+// curatorClient returns an authenticated owner connection to a Curator
+// instance within Cluster c, or nil together with an error.
+func (c *Cluster) curatorClient() (*grpc.ClientConn, error) {
+	if c.authClient == nil {
+		authCreds := rpc.NewAuthenticatedCredentials(c.Owner, nil)
+		remote := net.JoinHostPort(c.NodeIDs[0], common.CuratorServicePort.PortString())
+		authClient, err := grpc.Dial(remote, grpc.WithTransportCredentials(authCreds), grpc.WithContextDialer(c.DialNode))
+		if err != nil {
+			return nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
+		}
+		c.authClient = authClient
+	}
+	return c.authClient, nil
+}
+
 // LaunchNode launches a single Metropolis node instance with the given options.
 // The instance runs mostly paravirtualized but with some emulated hardware
 // similar to how a cloud provider might set up its VMs. The disk is fully
-// writable but is run in snapshot mode meaning that changes are not kept beyond
-// a single invocation.
-func LaunchNode(ctx context.Context, options NodeOptions) error {
-	// Pin temp directory to /tmp until we can use abstract socket namespace in QEMU
-	// (next release after 5.0,
+// writable, and the changes are kept across reboots and shutdowns. ld and sd
+// point to the launch directory and the socket directory, holding the nodes'
+// state files (storage, TPM state, firmware variables), and UNIX socket files
+// (swtpm <-> QEMU interplay) respectively. The directories must exist before
+// LaunchNode is called. LaunchNode will update options.Runtime and options.Mac
+// if either are not initialized.
+func LaunchNode(ctx context.Context, ld, sd string, options *NodeOptions) error {
+	// TODO(mateusz@monogon.tech) try using QEMU's abstract socket namespace instead
+	// of /tmp (requires QEMU version >5.0).
 	// https://github.com/qemu/qemu/commit/776b97d3605ed0fc94443048fdf988c7725e38a9.
 	// swtpm accepts already-open FDs so we can pass in an abstract socket namespace FD
 	// that we open and pass the name of it to QEMU. Not pinning this crashes both
 	// swtpm and qemu because we run into UNIX socket length limitations (for legacy
 	// reasons 108 chars).
-	tempDir, err := os.MkdirTemp("/tmp", "launch*")
-	if err != nil {
-		return fmt.Errorf("failed to create temporary directory: %w", err)
-	}
-	defer os.RemoveAll(tempDir)
 
-	// Copy TPM state into a temporary directory since it's being modified by the
-	// emulator
-	tpmTargetDir := filepath.Join(tempDir, "tpm")
-	tpmSrcDir := "metropolis/node/tpm"
-	if err := os.Mkdir(tpmTargetDir, 0755); err != nil {
-		return fmt.Errorf("failed to create TPM state directory: %w", err)
-	}
-	tpmFiles, err := os.ReadDir(tpmSrcDir)
-	if err != nil {
-		return fmt.Errorf("failed to read TPM directory: %w", err)
-	}
-	for _, file := range tpmFiles {
-		name := file.Name()
-		src := filepath.Join(tpmSrcDir, name)
-		target := filepath.Join(tpmTargetDir, name)
-		if err := copyFile(src, target); err != nil {
-			return fmt.Errorf("failed to copy TPM directory: file %q to %q: %w", src, target, err)
+	// If it's the node's first start, set up its runtime directories.
+	if options.Runtime == nil {
+		r, err := setupRuntime(ld, sd)
+		if err != nil {
+			return fmt.Errorf("while setting up node runtime: %w", err)
 		}
+		options.Runtime = r
 	}
 
+	// Replace the node's context with a new one.
+	r := options.Runtime
+	if r.CtxC != nil {
+		r.CtxC()
+	}
+	r.ctxT, r.CtxC = context.WithCancel(ctx)
+
 	var qemuNetType string
 	var qemuNetConfig launch.QemuValue
 	if options.ConnectToSocket != nil {
@@ -135,20 +235,25 @@
 		}
 	}
 
-	tpmSocketPath := filepath.Join(tempDir, "tpm-socket")
-
-	mac, err := generateRandomEthernetMAC()
-	if err != nil {
-		return err
+	// Generate the node's MAC address if it isn't already set in NodeOptions.
+	if options.Mac == nil {
+		mac, err := generateRandomEthernetMAC()
+		if err != nil {
+			return err
+		}
+		options.Mac = mac
 	}
 
+	tpmSocketPath := filepath.Join(r.sd, "tpm-socket")
+	fwVarPath := filepath.Join(r.ld, "OVMF_VARS.fd")
+	storagePath := filepath.Join(r.ld, "node.img")
 	qemuArgs := []string{"-machine", "q35", "-accel", "kvm", "-nographic", "-nodefaults", "-m", "4096",
 		"-cpu", "host", "-smp", "sockets=1,cpus=1,cores=2,threads=2,maxcpus=4",
 		"-drive", "if=pflash,format=raw,readonly,file=external/edk2/OVMF_CODE.fd",
-		"-drive", "if=pflash,format=raw,snapshot=on,file=external/edk2/OVMF_VARS.fd",
-		"-drive", "if=virtio,format=raw,snapshot=on,cache=unsafe,file=metropolis/node/node.img",
+		"-drive", "if=pflash,format=raw,file=" + fwVarPath,
+		"-drive", "if=virtio,format=raw,cache=unsafe,file=" + storagePath,
 		"-netdev", qemuNetConfig.ToOption(qemuNetType),
-		"-device", "virtio-net-pci,netdev=net0,mac=" + mac.String(),
+		"-device", "virtio-net-pci,netdev=net0,mac=" + options.Mac.String(),
 		"-chardev", "socket,id=chrtpm,path=" + tpmSocketPath,
 		"-tpmdev", "emulator,id=tpm0,chardev=chrtpm",
 		"-device", "tpm-tis,tpmdev=tpm0",
@@ -160,7 +265,7 @@
 	}
 
 	if options.NodeParameters != nil {
-		parametersPath := filepath.Join(tempDir, "parameters.pb")
+		parametersPath := filepath.Join(r.ld, "parameters.pb")
 		parametersRaw, err := proto.Marshal(options.NodeParameters)
 		if err != nil {
 			return fmt.Errorf("failed to encode node paraeters: %w", err)
@@ -172,20 +277,21 @@
 	}
 
 	// Start TPM emulator as a subprocess
-	tpmCtx, tpmCancel := context.WithCancel(ctx)
+	tpmCtx, tpmCancel := context.WithCancel(options.Runtime.ctxT)
 	defer tpmCancel()
 
-	tpmEmuCmd := exec.CommandContext(tpmCtx, "swtpm", "socket", "--tpm2", "--tpmstate", "dir="+tpmTargetDir, "--ctrl", "type=unixio,path="+tpmSocketPath)
+	tpmd := filepath.Join(r.ld, "tpm")
+	tpmEmuCmd := exec.CommandContext(tpmCtx, "swtpm", "socket", "--tpm2", "--tpmstate", "dir="+tpmd, "--ctrl", "type=unixio,path="+tpmSocketPath)
 	tpmEmuCmd.Stderr = os.Stderr
 	tpmEmuCmd.Stdout = os.Stdout
 
-	err = tpmEmuCmd.Start()
+	err := tpmEmuCmd.Start()
 	if err != nil {
 		return fmt.Errorf("failed to start TPM emulator: %w", err)
 	}
 
 	// Start the main qemu binary
-	systemCmd := exec.CommandContext(ctx, "qemu-system-x86_64", qemuArgs...)
+	systemCmd := exec.CommandContext(options.Runtime.ctxT, "qemu-system-x86_64", qemuArgs...)
 	if options.ConnectToSocket != nil {
 		systemCmd.ExtraFiles = []*os.File{options.ConnectToSocket}
 	}
@@ -268,6 +374,23 @@
 	return res, nil
 }
 
+// getNode wraps Management.GetNodes. It returns information about the single
+// node matching the given node ID.
+func getNode(ctx context.Context, mgmt apb.ManagementClient, id string) (*apb.Node, error) {
+	nodes, err := getNodes(ctx, mgmt)
+	if err != nil {
+		return nil, fmt.Errorf("could not get nodes: %w", err)
+	}
+	for _, n := range nodes {
+		eid := identity.NodeID(n.Pubkey)
+		if eid != id {
+			continue
+		}
+		return n, nil
+	}
+	return nil, fmt.Errorf("no such node: %s", id)
+}
+
 // Gets a random EUI-48 Ethernet MAC address
 func generateRandomEthernetMAC() (*net.HardwareAddr, error) {
 	macBuf := make([]byte, 6)
@@ -323,12 +446,31 @@
 	// nodes' qemu instances. It's used by Close to ensure all nodes have
 	// successfully been stopped.
 	nodesDone []chan error
-	// ctxC is used by Close to cancel the context under which the nodes are
-	// running.
-	ctxC context.CancelFunc
+	// nodeOpts are the cluster member nodes' mutable launch options, kept here
+	// to facilitate reboots.
+	nodeOpts []NodeOptions
+	// launchDir points at the directory keeping the nodes' state, such as storage
+	// images, firmware variable files, TPM state.
+	launchDir string
+	// socketDir points at the directory keeping UNIX socket files, such as those
+	// used to facilitate communication between QEMU and swtpm. It's different
+	// from launchDir, and anchored nearer the file system root, due to the
+	// socket path length limitation imposed by the kernel.
+	socketDir string
+
 	// socksDialer is used by DialNode to establish connections to nodes via the
 	// SOCKS server run by nanoswitch.
 	socksDialer proxy.Dialer
+
+	// authClient is a cached authenticated owner connection to a Curator
+	// instance within the cluster.
+	authClient *grpc.ClientConn
+
+	// ctxT is the context individual node contexts are created from.
+	ctxT context.Context
+	// ctxC is used by Close to cancel the context under which the nodes are
+	// running.
+	ctxC context.CancelFunc
 }
 
 // NodeInCluster represents information about a node that's part of a Cluster.
@@ -428,7 +570,16 @@
 		return nil, errors.New("refusing to start cluster with zero nodes")
 	}
 
-	ctxT, ctxC := context.WithCancel(ctx)
+	// Create the launch directory.
+	ld, err := os.MkdirTemp(os.Getenv("TEST_TMPDIR"), "cluster*")
+	if err != nil {
+		return nil, fmt.Errorf("failed to create the launch directory: %w", err)
+	}
+	// Create the socket directory.
+	sd, err := os.MkdirTemp("/tmp", "cluster*")
+	if err != nil {
+		return nil, fmt.Errorf("failed to create the socket directory: %w", err)
+	}
 
 	// Prepare links between nodes and nanoswitch.
 	var switchPorts []*os.File
@@ -436,7 +587,6 @@
 	for i := 0; i < opts.NumNodes; i++ {
 		switchPort, vmPort, err := launch.NewSocketPair()
 		if err != nil {
-			ctxC()
 			return nil, fmt.Errorf("failed to get socketpair: %w", err)
 		}
 		switchPorts = append(switchPorts, switchPort)
@@ -450,20 +600,29 @@
 		done[i] = make(chan error, 1)
 	}
 
-	// Start first node.
-	log.Printf("Cluster: Starting node %d...", 1)
-	go func() {
-		err := LaunchNode(ctxT, NodeOptions{
-			ConnectToSocket: vmPorts[0],
-			NodeParameters: &apb.NodeParameters{
-				Cluster: &apb.NodeParameters_ClusterBootstrap_{
-					ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
-						OwnerPublicKey: InsecurePublicKey,
-					},
+	// Prepare the node options. These will be kept as part of Cluster.
+	// nodeOpts[].Runtime will be initialized by LaunchNode during the first
+	// launch. The runtime information can later be used to restart a node.
+	// The 0th node will be initialized first. The rest will follow after it
+	// has bootstrapped the cluster.
+	nodeOpts := make([]NodeOptions, opts.NumNodes)
+	nodeOpts[0] = NodeOptions{
+		ConnectToSocket: vmPorts[0],
+		NodeParameters: &apb.NodeParameters{
+			Cluster: &apb.NodeParameters_ClusterBootstrap_{
+				ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
+					OwnerPublicKey: InsecurePublicKey,
 				},
 			},
-			SerialPort: newPrefixedStdio(0),
-		})
+		},
+		SerialPort: newPrefixedStdio(0),
+	}
+
+	// Start the first node.
+	ctxT, ctxC := context.WithCancel(ctx)
+	log.Printf("Cluster: Starting node %d...", 1)
+	go func() {
+		err := LaunchNode(ctxT, ld, sd, &nodeOpts[0])
 		done[0] <- err
 	}()
 
@@ -502,6 +661,8 @@
 		return nil, err
 	}
 
+	// Set up a partially initialized cluster instance, to be filled in by the
+	// steps below.
 	cluster := &Cluster{
 		Owner: *cert,
 		Ports: portMap,
@@ -512,24 +673,26 @@
 			firstNode.ID,
 		},
 
-		nodesDone:   done,
+		nodesDone: done,
+		nodeOpts:  nodeOpts,
+		launchDir: ld,
+		socketDir: sd,
+
 		socksDialer: socksDialer,
 
+		ctxT: ctxT,
 		ctxC: ctxC,
 	}
 
 	// Now start the rest of the nodes and register them into the cluster.
 
-	// Build authenticated owner client to first node.
-	authCreds := rpc.NewAuthenticatedCredentials(*cert, nil)
-	remote := net.JoinHostPort(cluster.NodeIDs[0], common.CuratorServicePort.PortString())
-	authClient, err := grpc.Dial(remote, grpc.WithTransportCredentials(authCreds), grpc.WithContextDialer(cluster.DialNode))
+	// Get an authenticated owner client within the cluster.
+	curC, err := cluster.curatorClient()
 	if err != nil {
 		ctxC()
-		return nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
+		return nil, fmt.Errorf("curatorClient: %w", err)
 	}
-	defer authClient.Close()
-	mgmt := apb.NewManagementClient(authClient)
+	mgmt := apb.NewManagementClient(curC)
 
 	// Retrieve register ticket to register further nodes.
 	log.Printf("Cluster: retrieving register ticket...")
@@ -549,23 +712,30 @@
 		return nil, fmt.Errorf("GetClusterInfo: %w", err)
 	}
 
+	// Use the retrieved information to configure the rest of the node options.
+	for i := 1; i < opts.NumNodes; i++ {
+		nodeOpts[i] = NodeOptions{
+			ConnectToSocket: vmPorts[i],
+			NodeParameters: &apb.NodeParameters{
+				Cluster: &apb.NodeParameters_ClusterRegister_{
+					ClusterRegister: &apb.NodeParameters_ClusterRegister{
+						RegisterTicket:   ticket,
+						ClusterDirectory: resI.ClusterDirectory,
+						CaCertificate:    resI.CaCertificate,
+					},
+				},
+			},
+			SerialPort: newPrefixedStdio(i),
+		}
+	}
+
+	// Now run the rest of the nodes.
+	//
 	// TODO(q3k): parallelize this
 	for i := 1; i < opts.NumNodes; i++ {
 		log.Printf("Cluster: Starting node %d...", i+1)
 		go func(i int) {
-			err := LaunchNode(ctxT, NodeOptions{
-				ConnectToSocket: vmPorts[i],
-				NodeParameters: &apb.NodeParameters{
-					Cluster: &apb.NodeParameters_ClusterRegister_{
-						ClusterRegister: &apb.NodeParameters_ClusterRegister{
-							RegisterTicket:   ticket,
-							ClusterDirectory: resI.ClusterDirectory,
-							CaCertificate:    resI.CaCertificate,
-						},
-					},
-				},
-				SerialPort: newPrefixedStdio(i),
-			})
+			err := LaunchNode(ctxT, ld, sd, &nodeOpts[i])
 			done[i] <- err
 		}(i)
 		var newNode *apb.Node
@@ -641,11 +811,82 @@
 	return cluster, nil
 }
 
-// Close cancels the running cluster's context and waits for all virtualized
+// RebootNode reboots the cluster member node at the given index and
+// waits for it to rejoin the cluster. It will use the given context ctx to run
+// cluster API requests, whereas the resulting QEMU process will be created
+// using the cluster's context c.ctxT. The nodes are indexed starting at 0.
+func (c *Cluster) RebootNode(ctx context.Context, idx int) error {
+	if idx < 0 || idx >= len(c.NodeIDs) {
+		return fmt.Errorf("node index out of bounds")
+	}
+	id := c.NodeIDs[idx]
+
+	// Get an authenticated owner client within the cluster.
+	curC, err := c.curatorClient()
+	if err != nil {
+		return err
+	}
+	mgmt := apb.NewManagementClient(curC)
+
+	// Get the timestamp of the node's last update, as observed by Curator.
+	// It'll be needed to make sure the node has rejoined the cluster after the reboot.
+	var is *apb.Node
+	for {
+		r, err := getNode(ctx, mgmt, id)
+		if err != nil {
+			return err
+		}
+
+		// Node status may be absent if it hasn't reported to the cluster yet. Wait
+		// for it to appear before progressing further.
+		if r.Status != nil {
+			is = r
+			break
+		}
+		time.Sleep(time.Second)
+	}
+
+	// Cancel the node's context. This will shut down both QEMU and swtpm.
+	c.nodeOpts[idx].Runtime.CtxC()
+	log.Printf("Cluster: waiting for node %d (%s) to stop.", idx, id)
+	err = <-c.nodesDone[idx]
+	if err != nil {
+		return fmt.Errorf("while stopping the node: %w", err)
+	}
+
+	// Start QEMU again.
+	log.Printf("Cluster: restarting node %d (%s).", idx, id)
+	go func(n int) {
+		err := LaunchNode(c.ctxT, c.launchDir, c.socketDir, &c.nodeOpts[n])
+		c.nodesDone[n] <- err
+	}(idx)
+
+	// Poll Management.GetNodes until the node's status timestamp is updated,
+	// indicating that it has rejoined the cluster.
+	for {
+		cs, err := getNode(ctx, mgmt, id)
+		if err != nil {
+			return err
+		}
+		// The node's status may still be missing right after the reboot; keep
+		// polling until it's reported again.
+		if cs.Status != nil && cs.Status.Timestamp > is.Status.Timestamp {
+			break
+		}
+		time.Sleep(time.Second)
+	}
+	log.Printf("Cluster: node %d (%s) has rejoined the cluster.", idx, id)
+	return nil
+}
+
+// Close cancels the running cluster's context and waits for all virtualized
 // nodes to stop. It returns an error if stopping the nodes failed, or one of
 // the nodes failed to fully start in the first place.
 func (c *Cluster) Close() error {
 	log.Printf("Cluster: stopping...")
+	if c.authClient != nil {
+		c.authClient.Close()
+	}
 	c.ctxC()
 
 	var errors []error
@@ -656,6 +897,9 @@
 			errors = append(errors, err)
 		}
 	}
+	log.Printf("Cluster: removing nodes' state files.")
+	os.RemoveAll(c.launchDir)
+	os.RemoveAll(c.socketDir)
 	log.Printf("Cluster: done")
 	return multierr.Combine(errors...)
 }