treewide: introduce osbase package and move things around

All except localregistry moved from metropolis/pkg to osbase;
localregistry moved to metropolis/test, as it's only used there anyway.

Change-Id: If1a4bf377364bef0ac23169e1b90379c71b06d72
Reviewed-on: https://review.monogon.dev/c/monogon/+/3079
Tested-by: Jenkins CI
Reviewed-by: Serge Bazanski <serge@monogon.tech>
diff --git a/metropolis/test/launch/cluster.go b/metropolis/test/launch/cluster.go
new file mode 100644
index 0000000..7ae5f83
--- /dev/null
+++ b/metropolis/test/launch/cluster.go
@@ -0,0 +1,1476 @@
+// cluster builds on the launch package and implements launching Metropolis
+// nodes and clusters in a virtualized environment using qemu. It's kept in a
+// separate package as it depends on a Metropolis node image, which might not
+// be required for some uses of the launch library.
+package launch
+
+import (
+	"bytes"
+	"context"
+	"crypto/ed25519"
+	"crypto/rand"
+	"crypto/tls"
+	"crypto/x509"
+	"encoding/pem"
+	"errors"
+	"fmt"
+	"io"
+	"net"
+	"net/http"
+	"os"
+	"os/exec"
+	"path"
+	"path/filepath"
+	"strconv"
+	"strings"
+	"syscall"
+	"time"
+
+	"github.com/bazelbuild/rules_go/go/runfiles"
+	"github.com/cenkalti/backoff/v4"
+	"go.uber.org/multierr"
+	"golang.org/x/net/proxy"
+	"golang.org/x/sys/unix"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+	"google.golang.org/protobuf/proto"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+
+	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+	apb "source.monogon.dev/metropolis/proto/api"
+	cpb "source.monogon.dev/metropolis/proto/common"
+
+	"source.monogon.dev/go/qcow2"
+	metroctl "source.monogon.dev/metropolis/cli/metroctl/core"
+	"source.monogon.dev/metropolis/node"
+	"source.monogon.dev/metropolis/node/core/identity"
+	"source.monogon.dev/metropolis/node/core/rpc"
+	"source.monogon.dev/metropolis/node/core/rpc/resolver"
+	"source.monogon.dev/metropolis/test/localregistry"
+	"source.monogon.dev/osbase/test/launch"
+)
+
+const (
+	// nodeNumberKey is the key of the node label used to carry a node's numerical
+	// index in the test system.
+	nodeNumberKey string = "test-node-number"
+)
+
+// NodeOptions contains all options that can be passed to LaunchNode().
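+//
+// A minimal sketch (the field values shown are illustrative only):
+//
+//	opts := NodeOptions{
+//		Name:       "node0",
+//		Ports:      launch.IdentityPortMap(NodePorts),
+//		SerialPort: os.Stdout,
+//	}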
+type NodeOptions struct {
+	// Name is a human-readable identifier to be used in debug output.
+	Name string
+
+	// Ports contains the port mapping used to expose the internal ports of the
+	// VM to the host. See IdentityPortMap() and ConflictFreePortMap(). Ignored
+	// when ConnectToSocket is set.
+	Ports launch.PortMap
+
+	// If set to true, reboots are honored. Otherwise, any reboot terminates the
+	// QEMU process. Metropolis nodes generally restart on almost all errors, so
+	// unless you want to test reboot behavior this should be false.
+	AllowReboot bool
+
+	// By default, the VM is connected to the Host via SLIRP. If ConnectToSocket is
+	// set, it is instead connected to the given file descriptor/socket. If this is
+	// set, all port maps from the Ports option are ignored. Intended for networking
+	// this instance together with others for running more complex network
+	// configurations.
+	ConnectToSocket *os.File
+
+	// When PcapDump is set, all traffic is dumped to a pcap file in the
+	// runtime directory (e.g. "net0.pcap" for the first interface).
+	PcapDump bool
+
+	// SerialPort is an io.ReadWriter over which you can communicate with the serial
+	// port of the machine. It can be set to an existing file descriptor (like
+	// os.Stdout/os.Stderr) or any Go structure implementing this interface.
+	SerialPort io.ReadWriter
+
+	// NodeParameters is passed into the VM and subsequently used for bootstrapping or
+	// registering into a cluster.
+	NodeParameters *apb.NodeParameters
+
+	// Mac is the node's MAC address.
+	Mac *net.HardwareAddr
+
+	// Runtime keeps the node's QEMU runtime state.
+	Runtime *NodeRuntime
+}
+
+// NodeRuntime keeps the node's QEMU runtime options.
+type NodeRuntime struct {
+	// ld points at the node's launch directory storing data such as storage
+	// images, firmware variables or the TPM state.
+	ld string
+	// sd points at the node's socket directory.
+	sd string
+
+	// ctxT is the context QEMU will execute in.
+	ctxT context.Context
+	// CtxC is the QEMU context's cancellation function.
+	CtxC context.CancelFunc
+}
+
+// NodePorts is the list of ports a fully operational Metropolis node listens
+// on.
+var NodePorts = []node.Port{
+	node.ConsensusPort,
+
+	node.CuratorServicePort,
+	node.DebugServicePort,
+
+	node.KubernetesAPIPort,
+	node.KubernetesAPIWrappedPort,
+	node.DebuggerPort,
+	node.MetricsPort,
+}
+
+// setupRuntime creates the node's QEMU runtime directory, together with all
+// files required to preserve its state, a level below the chosen path ld. The
+// node's socket directory is similarly created a level below sd. It may
+// return an I/O error.
+func setupRuntime(ld, sd string) (*NodeRuntime, error) {
+	// Create a temporary directory to keep all the runtime files.
+	stdp, err := os.MkdirTemp(ld, "node_state*")
+	if err != nil {
+		return nil, fmt.Errorf("failed to create the state directory: %w", err)
+	}
+
+	// Initialize the node's storage with a prebuilt image.
+	si, err := runfiles.Rlocation("_main/metropolis/node/image.img")
+	if err != nil {
+		return nil, fmt.Errorf("while resolving a path: %w", err)
+	}
+
+	di := filepath.Join(stdp, "image.qcow2")
+	launch.Log("Cluster: generating node QCOW2 snapshot image: %s -> %s", si, di)
+
+	df, err := os.Create(di)
+	if err != nil {
+		return nil, fmt.Errorf("while opening image for writing: %w", err)
+	}
+	defer df.Close()
+	if err := qcow2.Generate(df, qcow2.GenerateWithBackingFile(si)); err != nil {
+		return nil, fmt.Errorf("while creating copy-on-write node image: %w", err)
+	}
+
+	// Initialize the OVMF firmware variables file.
+	sv, err := runfiles.Rlocation("edk2/OVMF_VARS.fd")
+	if err != nil {
+		return nil, fmt.Errorf("while resolving a path: %w", err)
+	}
+	dv := filepath.Join(stdp, filepath.Base(sv))
+	if err := copyFile(sv, dv); err != nil {
+		return nil, fmt.Errorf("while copying firmware variables: %w", err)
+	}
+
+	// Create the socket directory.
+	sotdp, err := os.MkdirTemp(sd, "node_sock*")
+	if err != nil {
+		return nil, fmt.Errorf("failed to create the socket directory: %w", err)
+	}
+
+	return &NodeRuntime{
+		ld: stdp,
+		sd: sotdp,
+	}, nil
+}
+
+// CuratorClient returns an authenticated owner connection to a Curator
+// instance within Cluster c, or nil together with an error.
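+//
+// Typical use (sketch; error handling elided):
+//
+//	conn, _ := c.CuratorClient()
+//	mgmt := apb.NewManagementClient(conn)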
+func (c *Cluster) CuratorClient() (*grpc.ClientConn, error) {
+	if c.authClient == nil {
+		authCreds := rpc.NewAuthenticatedCredentials(c.Owner, rpc.WantInsecure())
+		r := resolver.New(c.ctxT, resolver.WithLogger(func(f string, args ...interface{}) {
+			launch.Log("Cluster: client resolver: %s", fmt.Sprintf(f, args...))
+		}))
+		for _, n := range c.NodeIDs {
+			ep, err := resolver.NodeWithDefaultPort(n)
+			if err != nil {
+				return nil, fmt.Errorf("could not add node %q by DNS: %w", n, err)
+			}
+			r.AddEndpoint(ep)
+		}
+		authClient, err := grpc.Dial(resolver.MetropolisControlAddress,
+			grpc.WithTransportCredentials(authCreds),
+			grpc.WithResolvers(r),
+			grpc.WithContextDialer(c.DialNode),
+		)
+		if err != nil {
+			return nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
+		}
+		c.authClient = authClient
+	}
+	return c.authClient, nil
+}
+
+// LaunchNode launches a single Metropolis node instance with the given options.
+// The instance runs mostly paravirtualized but with some emulated hardware
+// similar to how a cloud provider might set up its VMs. The disk is fully
+// writable, and the changes are kept across reboots and shutdowns. ld and sd
+// point to the launch directory and the socket directory, holding the nodes'
+// state files (storage, tpm state, firmware state), and UNIX socket files
+// (swtpm <-> QEMU interplay) respectively. The directories must exist before
+// LaunchNode is called. LaunchNode will update options.Runtime and options.Mac
+// if either are not initialized.
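+//
+// A minimal sketch, assuming ld, sd, tpmf and opts were prepared by the
+// caller:
+//
+//	doneC := make(chan error, 1)
+//	if err := LaunchNode(ctx, ld, sd, tpmf, &opts, doneC); err != nil {
+//		// handle launch failure
+//	}
+//	err := <-doneC // blocks until the node exits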
+func LaunchNode(ctx context.Context, ld, sd string, tpmFactory *TPMFactory, options *NodeOptions, doneC chan error) error {
+	// TODO(mateusz@monogon.tech) try using QEMU's abstract socket namespace instead
+	// TODO(mateusz@monogon.tech) try using QEMU's abstract socket namespace instead
+	// of /tmp (requires QEMU version >5.0,
+	// https://github.com/qemu/qemu/commit/776b97d3605ed0fc94443048fdf988c7725e38a9).
+	// swtpm accepts already-open FDs, so we could open an FD in the abstract
+	// socket namespace ourselves and pass its name to QEMU. Until then, socket
+	// paths must be kept short: otherwise both swtpm and qemu crash because we
+	// run into UNIX socket path length limitations (for legacy reasons, 108
+	// chars).
+
+	// If it's the node's first start, set up its runtime directories.
+	if options.Runtime == nil {
+		r, err := setupRuntime(ld, sd)
+		if err != nil {
+			return fmt.Errorf("while setting up node runtime: %w", err)
+		}
+		options.Runtime = r
+	}
+
+	// Replace the node's context with a new one.
+	r := options.Runtime
+	if r.CtxC != nil {
+		r.CtxC()
+	}
+	r.ctxT, r.CtxC = context.WithCancel(ctx)
+
+	var qemuNetType string
+	var qemuNetConfig launch.QemuValue
+	if options.ConnectToSocket != nil {
+		qemuNetType = "socket"
+		qemuNetConfig = launch.QemuValue{
+			"id": {"net0"},
+			"fd": {"3"},
+		}
+	} else {
+		qemuNetType = "user"
+		qemuNetConfig = launch.QemuValue{
+			"id":        {"net0"},
+			"net":       {"10.42.0.0/24"},
+			"dhcpstart": {"10.42.0.10"},
+			"hostfwd":   options.Ports.ToQemuForwards(),
+		}
+	}
+
+	// Generate the node's MAC address if it isn't already set in NodeOptions.
+	if options.Mac == nil {
+		mac, err := generateRandomEthernetMAC()
+		if err != nil {
+			return err
+		}
+		options.Mac = mac
+	}
+
+	ovmfCodePath, err := runfiles.Rlocation("edk2/OVMF_CODE.fd")
+	if err != nil {
+		return err
+	}
+
+	tpmSocketPath := filepath.Join(r.sd, "tpm-socket")
+	fwVarPath := filepath.Join(r.ld, "OVMF_VARS.fd")
+	storagePath := filepath.Join(r.ld, "image.qcow2")
+	qemuArgs := []string{
+		"-machine", "q35", "-accel", "kvm", "-nographic", "-nodefaults", "-m", "2048",
+		"-cpu", "host", "-smp", "sockets=1,cpus=1,cores=2,threads=2,maxcpus=4",
+		"-drive", "if=pflash,format=raw,readonly=on,file=" + ovmfCodePath,
+		"-drive", "if=pflash,format=raw,file=" + fwVarPath,
+		"-drive", "if=virtio,format=qcow2,cache=unsafe,file=" + storagePath,
+		"-netdev", qemuNetConfig.ToOption(qemuNetType),
+		"-device", "virtio-net-pci,netdev=net0,mac=" + options.Mac.String(),
+		"-chardev", "socket,id=chrtpm,path=" + tpmSocketPath,
+		"-tpmdev", "emulator,id=tpm0,chardev=chrtpm",
+		"-device", "tpm-tis,tpmdev=tpm0",
+		"-device", "virtio-rng-pci",
+		"-serial", "stdio",
+	}
+
+	if !options.AllowReboot {
+		qemuArgs = append(qemuArgs, "-no-reboot")
+	}
+
+	if options.NodeParameters != nil {
+		parametersPath := filepath.Join(r.ld, "parameters.pb")
+		parametersRaw, err := proto.Marshal(options.NodeParameters)
+		if err != nil {
+			return fmt.Errorf("failed to encode node parameters: %w", err)
+		}
+		if err := os.WriteFile(parametersPath, parametersRaw, 0o644); err != nil {
+			return fmt.Errorf("failed to write node parameters: %w", err)
+		}
+		qemuArgs = append(qemuArgs, "-fw_cfg", "name=dev.monogon.metropolis/parameters.pb,file="+parametersPath)
+	}
+
+	if options.PcapDump {
+		qemuNetDump := launch.QemuValue{
+			"id":     {"net0"},
+			"netdev": {"net0"},
+			"file":   {filepath.Join(r.ld, "net0.pcap")},
+		}
+		qemuArgs = append(qemuArgs, "-object", qemuNetDump.ToOption("filter-dump"))
+	}
+
+	// Manufacture TPM if needed.
+	tpmd := filepath.Join(r.ld, "tpm")
+	err = tpmFactory.Manufacture(ctx, tpmd, &TPMPlatform{
+		Manufacturer: "Monogon",
+		Version:      "1.0",
+		Model:        "TestCluster",
+	})
+	if err != nil {
+		return fmt.Errorf("could not manufacture TPM: %w", err)
+	}
+
+	// Start TPM emulator as a subprocess
+	swtpm, err := runfiles.Rlocation("swtpm/swtpm")
+	if err != nil {
+		return fmt.Errorf("could not find swtpm: %w", err)
+	}
+
+	tpmCtx, tpmCancel := context.WithCancel(options.Runtime.ctxT)
+
+	tpmEmuCmd := exec.CommandContext(tpmCtx, swtpm, "socket", "--tpm2", "--tpmstate", "dir="+tpmd, "--ctrl", "type=unixio,path="+tpmSocketPath)
+	// Silence warnings from unsafe libtpms build (uses non-constant-time
+	// cryptographic operations).
+	tpmEmuCmd.Env = append(tpmEmuCmd.Env, "MONOGON_LIBTPMS_ACKNOWLEDGE_UNSAFE=yes")
+	tpmEmuCmd.Stderr = os.Stderr
+	tpmEmuCmd.Stdout = os.Stdout
+
+	err = tpmEmuCmd.Start()
+	if err != nil {
+		tpmCancel()
+		return fmt.Errorf("failed to start TPM emulator: %w", err)
+	}
+
+	// Wait for the socket to be created by the TPM emulator before launching
+	// QEMU.
+	for {
+		_, err := os.Stat(tpmSocketPath)
+		if err == nil {
+			break
+		}
+		if !os.IsNotExist(err) {
+			tpmCancel()
+			return fmt.Errorf("while stat-ing TPM socket path: %w", err)
+		}
+		if err := tpmCtx.Err(); err != nil {
+			tpmCancel()
+			return fmt.Errorf("while waiting for the TPM socket: %w", err)
+		}
+		time.Sleep(time.Millisecond * 100)
+	}
+
+	// Start the main qemu binary
+	systemCmd := exec.CommandContext(options.Runtime.ctxT, "qemu-system-x86_64", qemuArgs...)
+	if options.ConnectToSocket != nil {
+		systemCmd.ExtraFiles = []*os.File{options.ConnectToSocket}
+	}
+
+	var stdErrBuf bytes.Buffer
+	systemCmd.Stderr = &stdErrBuf
+	systemCmd.Stdout = options.SerialPort
+
+	launch.PrettyPrintQemuArgs(options.Name, systemCmd.Args)
+
+	go func() {
+		launch.Log("Node: Starting...")
+		err = systemCmd.Run()
+		launch.Log("Node: Returned: %v", err)
+
+		// Stop TPM emulator and wait for it to exit to properly reap the child process
+		tpmCancel()
+		launch.Log("Node: Waiting for TPM emulator to exit")
+		// Wait returns a SIGKILL error because we just cancelled its context.
+		// We still need to call it to avoid creating zombies.
+		errTpm := tpmEmuCmd.Wait()
+		launch.Log("Node: TPM emulator done: %v", errTpm)
+
+		var exerr *exec.ExitError
+		if err != nil && errors.As(err, &exerr) {
+			status := exerr.ProcessState.Sys().(syscall.WaitStatus)
+			if status.Signaled() && status.Signal() == syscall.SIGKILL {
+				// Process was killed externally (most likely by our context being canceled).
+				// This is a normal exit for us, so return nil
+				doneC <- nil
+				return
+			}
+			exerr.Stderr = stdErrBuf.Bytes()
+			newErr := launch.QEMUError(*exerr)
+			launch.Log("Node: %q", stdErrBuf.String())
+			doneC <- &newErr
+			return
+		}
+		doneC <- err
+	}()
+	return nil
+}
+
+func copyFile(src, dst string) error {
+	in, err := os.Open(src)
+	if err != nil {
+		return fmt.Errorf("when opening source: %w", err)
+	}
+	defer in.Close()
+
+	out, err := os.Create(dst)
+	if err != nil {
+		return fmt.Errorf("when creating destination: %w", err)
+	}
+	defer out.Close()
+
+	endPos, err := in.Seek(0, io.SeekEnd)
+	if err != nil {
+		return fmt.Errorf("when getting source end: %w", err)
+	}
+
+	// Copy the file while preserving its sparseness. The image files are very
+	// sparse (less than 10% allocated), so this is a lot faster.
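+	// SEEK_DATA seeks to the start of the next data region at or after the
+	// given offset, and SEEK_HOLE to the start of the next hole. Every file
+	// ends with an implicit hole at EOF, so the loop terminates once the next
+	// hole starts at the end position determined above.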
+	var lastHoleStart int64
+	for {
+		dataStart, err := in.Seek(lastHoleStart, unix.SEEK_DATA)
+		if err != nil {
+			return fmt.Errorf("when seeking to next data block: %w", err)
+		}
+		holeStart, err := in.Seek(dataStart, unix.SEEK_HOLE)
+		if err != nil {
+			return fmt.Errorf("when seeking to next hole: %w", err)
+		}
+		lastHoleStart = holeStart
+		if _, err := in.Seek(dataStart, io.SeekStart); err != nil {
+			return fmt.Errorf("when seeking to current data block: %w", err)
+		}
+		if _, err := out.Seek(dataStart, io.SeekStart); err != nil {
+			return fmt.Errorf("when seeking output to next data block: %w", err)
+		}
+		if _, err := io.CopyN(out, in, holeStart-dataStart); err != nil {
+			return fmt.Errorf("when copying file: %w", err)
+		}
+		if endPos == holeStart {
+			// The next hole is at the end of the file, we're done here.
+			break
+		}
+	}
+
+	return out.Close()
+}
+
+// getNodes wraps around Management.GetNodes to return a list of nodes in a
+// cluster.
+func getNodes(ctx context.Context, mgmt apb.ManagementClient) ([]*apb.Node, error) {
+	var res []*apb.Node
+	bo := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
+	err := backoff.Retry(func() error {
+		res = nil
+		srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
+		if err != nil {
+			return fmt.Errorf("GetNodes: %w", err)
+		}
+		for {
+			node, err := srvN.Recv()
+			if err == io.EOF {
+				break
+			}
+			if err != nil {
+				return fmt.Errorf("GetNodes.Recv: %w", err)
+			}
+			res = append(res, node)
+		}
+		return nil
+	}, bo)
+	if err != nil {
+		return nil, err
+	}
+	return res, nil
+}
+
+// getNode wraps Management.GetNodes. It returns node information matching the
+// given node ID.
+func getNode(ctx context.Context, mgmt apb.ManagementClient, id string) (*apb.Node, error) {
+	nodes, err := getNodes(ctx, mgmt)
+	if err != nil {
+		return nil, fmt.Errorf("could not get nodes: %w", err)
+	}
+	for _, n := range nodes {
+		eid := identity.NodeID(n.Pubkey)
+		if eid != id {
+			continue
+		}
+		return n, nil
+	}
+	return nil, fmt.Errorf("no such node")
+}
+
+// generateRandomEthernetMAC returns a random EUI-48 Ethernet MAC address.
+func generateRandomEthernetMAC() (*net.HardwareAddr, error) {
+	macBuf := make([]byte, 6)
+	_, err := rand.Read(macBuf)
+	if err != nil {
+		return nil, fmt.Errorf("failed to read randomness for MAC: %w", err)
+	}
+
+	// Set U/L bit and clear I/G bit (locally administered individual MAC)
+	// Ref IEEE 802-2014 Section 8.2.2
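+	// For example, a first byte of 0x51 (0101_0001) becomes 0x52 (0101_0010):
+	// bit 1 is set (locally administered), bit 0 is cleared (individual).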
+	macBuf[0] = (macBuf[0] | 2) & 0xfe
+	mac := net.HardwareAddr(macBuf)
+	return &mac, nil
+}
+
+// SOCKSPort is the port on which nanoswitch serves its SOCKS proxy to the
+// switch network.
+const SOCKSPort uint16 = 1080
+
+// ClusterPorts contains all ports handled by Nanoswitch.
+var ClusterPorts = []uint16{
+	// Forwarded to the first node.
+	uint16(node.CuratorServicePort),
+	uint16(node.DebugServicePort),
+	uint16(node.KubernetesAPIPort),
+	uint16(node.KubernetesAPIWrappedPort),
+
+	// SOCKS proxy to the switch network
+	SOCKSPort,
+}
+
+// ClusterOptions contains all options for launching a Metropolis cluster.
+type ClusterOptions struct {
+	// The number of nodes this cluster should be started with.
+	NumNodes int
+
+	// If true, node logs will be saved to individual files instead of being printed
+	// out to stderr. The paths of these files will still be printed to stdout.
+	//
+	// The files will be located within the launch directory inside TEST_TMPDIR (or
+	// the default tempdir location, if not set).
+	NodeLogsToFiles bool
+
+	// LeaveNodesNew, if set, will leave all non-bootstrap nodes in NEW, without
+	// bootstrapping them. The nodes' address information in Cluster.Nodes will be
+	// incomplete.
+	LeaveNodesNew bool
+
+	// Optional local registry which will be made available to the cluster to
+	// pull images from. This is a more efficient alternative to preseeding all
+	// images used for testing.
+	LocalRegistry *localregistry.Server
+
+	// InitialClusterConfiguration will be passed to the first node when creating
+	// the cluster, and defines some basic properties of the cluster. If not
+	// specified, the defaults defined in metropolis.proto.api.NodeParameters
+	// apply.
+	InitialClusterConfiguration *cpb.ClusterConfiguration
+}
+
+// Cluster is the running Metropolis cluster launched using the LaunchCluster
+// function.
+type Cluster struct {
+	// Owner is the TLS Certificate of the owner of the test cluster. This can be
+	// used to authenticate further clients to the running cluster.
+	Owner tls.Certificate
+	// Ports is the PortMap used to access the first node's services (defined in
+	// ClusterPorts) and the SOCKS proxy (at SOCKSPort).
+	Ports launch.PortMap
+
+	// Nodes is a map from Node ID to its runtime information.
+	Nodes map[string]*NodeInCluster
+	// NodeIDs is a list of node IDs that are backing this cluster, in order of
+	// creation.
+	NodeIDs []string
+
+	// CACertificate is the cluster's CA certificate.
+	CACertificate *x509.Certificate
+
+	// nodesDone is a list of channels populated with the errors returned by the
+	// nodes' qemu instances. It's used by Close to ensure all nodes have
+	// successfully been stopped.
+	nodesDone []chan error
+	// nodeOpts are the cluster member nodes' mutable launch options, kept here
+	// to facilitate reboots.
+	nodeOpts []NodeOptions
+	// launchDir points at the directory keeping the nodes' state, such as storage
+	// images, firmware variable files, TPM state.
+	launchDir string
+	// socketDir points at the directory keeping UNIX socket files, such as those
+	// used to facilitate communication between QEMU and swtpm. It's different
+	// from launchDir, and anchored nearer the file system root, due to the
+	// socket path length limitation imposed by the kernel.
+	socketDir   string
+	metroctlDir string
+
+	// SOCKSDialer is used by DialNode to establish connections to nodes via the
+	// SOCKS server run by nanoswitch.
+	SOCKSDialer proxy.Dialer
+
+	// authClient is a cached authenticated owner connection to a Curator
+	// instance within the cluster.
+	authClient *grpc.ClientConn
+
+	// ctxT is the context individual node contexts are created from.
+	ctxT context.Context
+	// ctxC is used by Close to cancel the context under which the nodes are
+	// running.
+	ctxC context.CancelFunc
+
+	tpmFactory *TPMFactory
+}
+
+// NodeInCluster represents information about a node that's part of a Cluster.
+type NodeInCluster struct {
+	// ID of the node, which can be used to dial this node's services via DialNode.
+	ID     string
+	Pubkey []byte
+	// Address of the node on the network run by nanoswitch. Not reachable from
+	// the host unless dialed via DialNode or via the nanoswitch SOCKS proxy
+	// (reachable on Cluster.Ports[SOCKSPort]).
+	ManagementAddress string
+}
+
+// firstConnection performs the initial owner credential escrow with a newly
+// started nanoswitch-backed cluster over SOCKS. It expects the first node to be
+// running at 10.1.0.2, which is always the case with the current nanoswitch
+// implementation.
+//
+// It returns the newly escrowed credentials as well as the first node's
+// information as NodeInCluster.
+func firstConnection(ctx context.Context, socksDialer proxy.Dialer) (*tls.Certificate, *NodeInCluster, error) {
+	// Dial external service.
+	remote := fmt.Sprintf("10.1.0.2:%s", node.CuratorServicePort.PortString())
+	initCreds, err := rpc.NewEphemeralCredentials(InsecurePrivateKey, rpc.WantInsecure())
+	if err != nil {
+		return nil, nil, fmt.Errorf("NewEphemeralCredentials: %w", err)
+	}
+	initDialer := func(_ context.Context, addr string) (net.Conn, error) {
+		return socksDialer.Dial("tcp", addr)
+	}
+	initClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(initCreds))
+	if err != nil {
+		return nil, nil, fmt.Errorf("dialing with ephemeral credentials failed: %w", err)
+	}
+	defer initClient.Close()
+
+	// Retrieve owner certificate - this can take a while because the node is still
+	// coming up, so do it in a backoff loop.
+	launch.Log("Cluster: retrieving owner certificate (this can take a few seconds while the first node boots)...")
+	aaa := apb.NewAAAClient(initClient)
+	var cert *tls.Certificate
+	err = backoff.Retry(func() error {
+		cert, err = rpc.RetrieveOwnerCertificate(ctx, aaa, InsecurePrivateKey)
+		if st, ok := status.FromError(err); ok {
+			if st.Code() == codes.Unavailable {
+				launch.Log("Cluster: cluster UNAVAILABLE: %v", st.Message())
+				return err
+			}
+		}
+		return backoff.Permanent(err)
+	}, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
+	if err != nil {
+		return nil, nil, fmt.Errorf("couldn't retrieve owner certificate: %w", err)
+	}
+	launch.Log("Cluster: retrieved owner certificate.")
+
+	// Now connect authenticated and get the node ID.
+	creds := rpc.NewAuthenticatedCredentials(*cert, rpc.WantInsecure())
+	authClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(creds))
+	if err != nil {
+		return nil, nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
+	}
+	defer authClient.Close()
+	mgmt := apb.NewManagementClient(authClient)
+
+	var node *NodeInCluster
+	err = backoff.Retry(func() error {
+		nodes, err := getNodes(ctx, mgmt)
+		if err != nil {
+			return fmt.Errorf("retrieving nodes failed: %w", err)
+		}
+		if len(nodes) != 1 {
+			return fmt.Errorf("expected one node, got %d", len(nodes))
+		}
+		n := nodes[0]
+		if n.Status == nil || n.Status.ExternalAddress == "" {
+			return fmt.Errorf("node has no status and/or address")
+		}
+		node = &NodeInCluster{
+			ID:                identity.NodeID(n.Pubkey),
+			ManagementAddress: n.Status.ExternalAddress,
+		}
+		return nil
+	}, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
+	if err != nil {
+		return nil, nil, err
+	}
+
+	return cert, node, nil
+}
+
+// NewSerialFileLogger opens the file at p for writing, creating it if needed,
+// and returns it for use as a node's serial console log destination.
+func NewSerialFileLogger(p string) (io.ReadWriter, error) {
+	f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE, 0o600)
+	if err != nil {
+		return nil, err
+	}
+	return f, nil
+}
+
+// LaunchCluster launches a cluster of Metropolis node VMs together with a
+// Nanoswitch instance to network them all together.
+//
+// The given context will be used to run all qemu instances in the cluster, and
+// canceling the context or calling Close() will terminate them.
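+//
+// Typical use looks like (sketch; error handling elided):
+//
+//	cl, err := LaunchCluster(ctx, ClusterOptions{NumNodes: 2})
+//	defer cl.Close()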
+func LaunchCluster(ctx context.Context, opts ClusterOptions) (*Cluster, error) {
+	if opts.NumNodes <= 0 {
+		return nil, errors.New("refusing to start cluster with zero nodes")
+	}
+
+	// Create the launch directory.
+	ld, err := os.MkdirTemp(os.Getenv("TEST_TMPDIR"), "cluster-*")
+	if err != nil {
+		return nil, fmt.Errorf("failed to create the launch directory: %w", err)
+	}
+	// Create the metroctl config directory. We keep it in /tmp because in some
+	// scenarios it's end-user visible and we want it short.
+	md, err := os.MkdirTemp("/tmp", "metroctl-*")
+	if err != nil {
+		return nil, fmt.Errorf("failed to create the metroctl directory: %w", err)
+	}
+
+	// Create the socket directory. We keep it in /tmp because of socket path limits.
+	sd, err := os.MkdirTemp("/tmp", "cluster-*")
+	if err != nil {
+		return nil, fmt.Errorf("failed to create the socket directory: %w", err)
+	}
+
+	// Set up TPM factory.
+	tpmf, err := NewTPMFactory(filepath.Join(ld, "tpm"))
+	if err != nil {
+		return nil, fmt.Errorf("failed to create TPM factory: %w", err)
+	}
+
+	// Prepare links between nodes and nanoswitch.
+	var switchPorts []*os.File
+	var vmPorts []*os.File
+	for i := 0; i < opts.NumNodes; i++ {
+		switchPort, vmPort, err := launch.NewSocketPair()
+		if err != nil {
+			return nil, fmt.Errorf("failed to get socketpair: %w", err)
+		}
+		switchPorts = append(switchPorts, switchPort)
+		vmPorts = append(vmPorts, vmPort)
+	}
+
+	// Make a list of channels that will be populated by all running node qemu
+	// processes.
+	done := make([]chan error, opts.NumNodes)
+	for i := range done {
+		done[i] = make(chan error, 1)
+	}
+
+	// Prepare the node options. These will be kept as part of Cluster.
+	// nodeOpts[].Runtime will be initialized by LaunchNode during the first
+	// launch. The runtime information can be later used to restart a node.
+	// The 0th node will be initialized first. The rest will follow after it
+	// has bootstrapped the cluster.
+	nodeOpts := make([]NodeOptions, opts.NumNodes)
+	nodeOpts[0] = NodeOptions{
+		Name:            "node0",
+		ConnectToSocket: vmPorts[0],
+		NodeParameters: &apb.NodeParameters{
+			Cluster: &apb.NodeParameters_ClusterBootstrap_{
+				ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
+					OwnerPublicKey:              InsecurePublicKey,
+					InitialClusterConfiguration: opts.InitialClusterConfiguration,
+					Labels: &cpb.NodeLabels{
+						Pairs: []*cpb.NodeLabels_Pair{
+							{Key: nodeNumberKey, Value: "0"},
+						},
+					},
+				},
+			},
+		},
+		SerialPort: newPrefixedStdio(0),
+		PcapDump:   true,
+	}
+	if opts.NodeLogsToFiles {
+		logPath := path.Join(ld, "node-1.txt")
+		port, err := NewSerialFileLogger(logPath)
+		if err != nil {
+			return nil, fmt.Errorf("could not open log file for node 1: %w", err)
+		}
+		launch.Log("Node 1 logs at %s", logPath)
+		nodeOpts[0].SerialPort = port
+	}
+
+	// Start the first node.
+	ctxT, ctxC := context.WithCancel(ctx)
+	launch.Log("Cluster: Starting node %d...", 1)
+	if err := LaunchNode(ctxT, ld, sd, tpmf, &nodeOpts[0], done[0]); err != nil {
+		ctxC()
+		return nil, fmt.Errorf("failed to launch first node: %w", err)
+	}
+
+	localRegistryAddr := net.TCPAddr{
+		IP:   net.IPv4(10, 42, 0, 82),
+		Port: 5000,
+	}
+
+	var guestSvcMap launch.GuestServiceMap
+	if opts.LocalRegistry != nil {
+		l, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1)})
+		if err != nil {
+			ctxC()
+			return nil, fmt.Errorf("failed to create TCP listener for local registry: %w", err)
+		}
+		s := http.Server{
+			Handler: opts.LocalRegistry,
+		}
+		go s.Serve(l)
+		go func() {
+			<-ctxT.Done()
+			s.Close()
+		}()
+		guestSvcMap = launch.GuestServiceMap{
+			&localRegistryAddr: *l.Addr().(*net.TCPAddr),
+		}
+	}
+
+	// Launch nanoswitch.
+	portMap, err := launch.ConflictFreePortMap(ClusterPorts)
+	if err != nil {
+		ctxC()
+		return nil, fmt.Errorf("failed to allocate ephemeral ports: %w", err)
+	}
+
+	go func() {
+		var serialPort io.ReadWriter
+		if opts.NodeLogsToFiles {
+			logPath := path.Join(ld, "nanoswitch.txt")
+			// Use a local err here: assigning the outer err from this
+			// goroutine would race with the main goroutine.
+			sp, err := NewSerialFileLogger(logPath)
+			if err != nil {
+				launch.Log("Could not open log file for nanoswitch: %v", err)
+			}
+			serialPort = sp
+			launch.Log("Nanoswitch logs at %s", logPath)
+		} else {
+			serialPort = newPrefixedStdio(99)
+		}
+		kernelPath, err := runfiles.Rlocation("_main/osbase/test/ktest/vmlinux")
+		if err != nil {
+			launch.Fatal("Failed to resolve nanoswitch kernel: %v", err)
+		}
+		initramfsPath, err := runfiles.Rlocation("_main/metropolis/test/nanoswitch/initramfs.cpio.zst")
+		if err != nil {
+			launch.Fatal("Failed to resolve nanoswitch initramfs: %v", err)
+		}
+		if err := launch.RunMicroVM(ctxT, &launch.MicroVMOptions{
+			Name:                   "nanoswitch",
+			KernelPath:             kernelPath,
+			InitramfsPath:          initramfsPath,
+			ExtraNetworkInterfaces: switchPorts,
+			PortMap:                portMap,
+			GuestServiceMap:        guestSvcMap,
+			SerialPort:             serialPort,
+			PcapDump:               path.Join(ld, "nanoswitch.pcap"),
+		}); err != nil {
+			if !errors.Is(err, ctxT.Err()) {
+				launch.Fatal("Failed to launch nanoswitch: %v", err)
+			}
+		}
+	}()
+
+	// Build SOCKS dialer.
+	socksRemote := fmt.Sprintf("localhost:%v", portMap[SOCKSPort])
+	socksDialer, err := proxy.SOCKS5("tcp", socksRemote, nil, proxy.Direct)
+	if err != nil {
+		ctxC()
+		return nil, fmt.Errorf("failed to build SOCKS dialer: %w", err)
+	}
+
+	// Retrieve owner credentials and first node.
+	cert, firstNode, err := firstConnection(ctxT, socksDialer)
+	if err != nil {
+		ctxC()
+		return nil, err
+	}
+
+	// Write credentials to the metroctl directory.
+	if err := metroctl.WriteOwnerKey(md, cert.PrivateKey.(ed25519.PrivateKey)); err != nil {
+		ctxC()
+		return nil, fmt.Errorf("could not write owner key: %w", err)
+	}
+	if err := metroctl.WriteOwnerCertificate(md, cert.Certificate[0]); err != nil {
+		ctxC()
+		return nil, fmt.Errorf("could not write owner certificate: %w", err)
+	}
+
+	launch.Log("Cluster: Node %d is %s", 0, firstNode.ID)
+
+	// Set up a partially initialized cluster instance, to be filled in the
+	// later steps.
+	cluster := &Cluster{
+		Owner: *cert,
+		Ports: portMap,
+		Nodes: map[string]*NodeInCluster{
+			firstNode.ID: firstNode,
+		},
+		NodeIDs: []string{
+			firstNode.ID,
+		},
+
+		nodesDone:   done,
+		nodeOpts:    nodeOpts,
+		launchDir:   ld,
+		socketDir:   sd,
+		metroctlDir: md,
+
+		SOCKSDialer: socksDialer,
+
+		ctxT: ctxT,
+		ctxC: ctxC,
+
+		tpmFactory: tpmf,
+	}
+
+	// Now start the rest of the nodes and register them into the cluster.
+
+	// Get an authenticated owner client within the cluster.
+	curC, err := cluster.CuratorClient()
+	if err != nil {
+		ctxC()
+		return nil, fmt.Errorf("CuratorClient: %w", err)
+	}
+	mgmt := apb.NewManagementClient(curC)
+
+	// Retrieve register ticket to register further nodes.
+	launch.Log("Cluster: retrieving register ticket...")
+	resT, err := mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{})
+	if err != nil {
+		ctxC()
+		return nil, fmt.Errorf("GetRegisterTicket: %w", err)
+	}
+	ticket := resT.Ticket
+	launch.Log("Cluster: retrieved register ticket (%d bytes).", len(ticket))
+
+	// Retrieve cluster info (for directory and ca public key) to register further
+	// nodes.
+	resI, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
+	if err != nil {
+		ctxC()
+		return nil, fmt.Errorf("GetClusterInfo: %w", err)
+	}
+	caCert, err := x509.ParseCertificate(resI.CaCertificate)
+	if err != nil {
+		ctxC()
+		return nil, fmt.Errorf("ParseCertificate: %w", err)
+	}
+	cluster.CACertificate = caCert
+
+	// Use the retrieved information to configure the rest of the node options.
+	for i := 1; i < opts.NumNodes; i++ {
+		nodeOpts[i] = NodeOptions{
+			Name:            fmt.Sprintf("node%d", i),
+			ConnectToSocket: vmPorts[i],
+			NodeParameters: &apb.NodeParameters{
+				Cluster: &apb.NodeParameters_ClusterRegister_{
+					ClusterRegister: &apb.NodeParameters_ClusterRegister{
+						RegisterTicket:   ticket,
+						ClusterDirectory: resI.ClusterDirectory,
+						CaCertificate:    resI.CaCertificate,
+						Labels: &cpb.NodeLabels{
+							Pairs: []*cpb.NodeLabels_Pair{
+								{Key: nodeNumberKey, Value: fmt.Sprintf("%d", i)},
+							},
+						},
+					},
+				},
+			},
+			SerialPort: newPrefixedStdio(i),
+		}
+		if opts.NodeLogsToFiles {
+			logPath := path.Join(ld, fmt.Sprintf("node-%d.txt", i+1))
+			port, err := NewSerialFileLogger(logPath)
+			if err != nil {
+				ctxC()
+				return nil, fmt.Errorf("could not open log file for node %d: %w", i+1, err)
+			}
+			launch.Log("Node %d logs at %s", i+1, logPath)
+			nodeOpts[i].SerialPort = port
+		}
+	}
+
+	// Now run the rest of the nodes.
+	for i := 1; i < opts.NumNodes; i++ {
+		launch.Log("Cluster: Starting node %d...", i+1)
+		err := LaunchNode(ctxT, ld, sd, tpmf, &nodeOpts[i], done[i])
+		if err != nil {
+			ctxC()
+			return nil, fmt.Errorf("failed to launch node %d: %w", i+1, err)
+		}
+	}
+
+	// Wait for nodes to appear as NEW, populate a map from node number (index into
+	// NodeOpts, etc.) to Metropolis Node ID.
+	seenNodes := make(map[string]bool)
+	nodeNumberToID := make(map[int]string)
+	launch.Log("Cluster: waiting for nodes to appear as NEW...")
+	for i := 1; i < opts.NumNodes; i++ {
+		for {
+			nodes, err := getNodes(ctx, mgmt)
+			if err != nil {
+				ctxC()
+				return nil, fmt.Errorf("could not get nodes: %w", err)
+			}
+			for _, n := range nodes {
+				if n.State != cpb.NodeState_NODE_STATE_NEW {
+					continue
+				}
+				if seenNodes[n.Id] {
+					continue
+				}
+				seenNodes[n.Id] = true
+				cluster.Nodes[n.Id] = &NodeInCluster{
+					ID:     n.Id,
+					Pubkey: n.Pubkey,
+				}
+
+				num, err := strconv.Atoi(node.GetNodeLabel(n.Labels, nodeNumberKey))
+				if err != nil {
+					ctxC()
+					return nil, fmt.Errorf("node %s has an undecodable number label: %w", n.Id, err)
+				}
+				launch.Log("Cluster: Node %d is %s", num, n.Id)
+				nodeNumberToID[num] = n.Id
+			}
+
+			if len(seenNodes) == opts.NumNodes-1 {
+				break
+			}
+			time.Sleep(1 * time.Second)
+		}
+	}
+	launch.Log("Found all expected nodes")
+
+	// Build the rest of NodeIDs from map.
+	for i := 1; i < opts.NumNodes; i++ {
+		cluster.NodeIDs = append(cluster.NodeIDs, nodeNumberToID[i])
+	}
+
+	approvedNodes := make(map[string]bool)
+	upNodes := make(map[string]bool)
+	if !opts.LeaveNodesNew {
+		for {
+			nodes, err := getNodes(ctx, mgmt)
+			if err != nil {
+				ctxC()
+				return nil, fmt.Errorf("could not get nodes: %w", err)
+			}
+			for _, node := range nodes {
+				if !seenNodes[node.Id] {
+					// Skip nodes that weren't NEW in the previous step.
+					continue
+				}
+
+				if node.State == cpb.NodeState_NODE_STATE_UP && node.Status != nil && node.Status.ExternalAddress != "" {
+					launch.Log("Cluster: node %s is up", node.Id)
+					upNodes[node.Id] = true
+					cluster.Nodes[node.Id].ManagementAddress = node.Status.ExternalAddress
+				}
+				if upNodes[node.Id] {
+					continue
+				}
+
+				if !approvedNodes[node.Id] {
+					launch.Log("Cluster: approving node %s", node.Id)
+					_, err := mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
+						Pubkey: node.Pubkey,
+					})
+					if err != nil {
+						ctxC()
+						return nil, fmt.Errorf("ApproveNode(%s): %w", node.Id, err)
+					}
+					approvedNodes[node.Id] = true
+				}
+			}
+
+			launch.Log("Cluster: want %d up nodes, have %d", opts.NumNodes-1, len(upNodes))
+			if len(upNodes) == opts.NumNodes-1 {
+				break
+			}
+			time.Sleep(time.Second)
+		}
+	}
+
+	launch.Log("Cluster: all nodes up:")
+	for _, node := range cluster.Nodes {
+		launch.Log("Cluster:  - %s at %s", node.ID, node.ManagementAddress)
+	}
+	launch.Log("Cluster: starting tests...")
+
+	return cluster, nil
+}
+
+// RebootNode reboots the cluster member node matching the given index, and
+// waits for it to rejoin the cluster. It will use the given context ctx to run
+// cluster API requests, whereas the resulting QEMU process will be created
+// using the cluster's context c.ctxT. The nodes are indexed starting at 0.
+func (c *Cluster) RebootNode(ctx context.Context, idx int) error {
+	if idx < 0 || idx >= len(c.NodeIDs) {
+		return fmt.Errorf("index out of bounds")
+	}
+	if c.nodeOpts[idx].Runtime == nil {
+		return fmt.Errorf("node not running")
+	}
+	id := c.NodeIDs[idx]
+
+	// Get an authenticated owner client within the cluster.
+	curC, err := c.CuratorClient()
+	if err != nil {
+		return err
+	}
+	mgmt := apb.NewManagementClient(curC)
+
+	// Cancel the node's context. This will shut down QEMU.
+	c.nodeOpts[idx].Runtime.CtxC()
+	launch.Log("Cluster: waiting for node %d (%s) to stop.", idx, id)
+	err = <-c.nodesDone[idx]
+	if err != nil {
+		return fmt.Errorf("while restarting node: %w", err)
+	}
+
+	// Start QEMU again.
+	launch.Log("Cluster: restarting node %d (%s).", idx, id)
+	if err := LaunchNode(c.ctxT, c.launchDir, c.socketDir, c.tpmFactory, &c.nodeOpts[idx], c.nodesDone[idx]); err != nil {
+		return fmt.Errorf("failed to launch node %d: %w", idx, err)
+	}
+
+	start := time.Now()
+
+	// Poll Management.GetNodes until the node is healthy.
+	for {
+		cs, err := getNode(ctx, mgmt, id)
+		if err != nil {
+			launch.Log("Cluster: node get error: %v", err)
+			return err
+		}
+		launch.Log("Cluster: node health: %+v", cs.Health)
+
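+		// The node's last heartbeat arrived TimeSinceHeartbeat ago. It counts
+		// as rejoined once it is HEALTHY and has heartbeated after the restart
+		// began.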
+		lhb := time.Now().Add(-cs.TimeSinceHeartbeat.AsDuration())
+		if lhb.After(start) && cs.Health == apb.Node_HEALTHY {
+			break
+		}
+		time.Sleep(time.Second)
+	}
+	launch.Log("Cluster: node %d (%s) has rejoined the cluster.", idx, id)
+	return nil
+}
+
+// ShutdownNode performs an ungraceful shutdown (i.e. power off) of the node
+// given by idx. If the node is already shut down, this is a no-op.
+func (c *Cluster) ShutdownNode(idx int) error {
+	if idx < 0 || idx >= len(c.NodeIDs) {
+		return fmt.Errorf("index out of bounds")
+	}
+	// Return if node is already stopped.
+	select {
+	case <-c.nodeOpts[idx].Runtime.ctxT.Done():
+		return nil
+	default:
+	}
+	id := c.NodeIDs[idx]
+
+	// Cancel the node's context. This will shut down QEMU.
+	c.nodeOpts[idx].Runtime.CtxC()
+	launch.Log("Cluster: waiting for node %d (%s) to stop.", idx, id)
+	err := <-c.nodesDone[idx]
+	if err != nil {
+		return fmt.Errorf("while shutting down node: %w", err)
+	}
+	launch.Log("Cluster: node %d (%s) stopped.", idx, id)
+	return nil
+}
+
+// StartNode powers on the node given by idx. If the node is already running,
+// this is a no-op.
+func (c *Cluster) StartNode(idx int) error {
+	if idx < 0 || idx >= len(c.NodeIDs) {
+		return fmt.Errorf("index out of bounds")
+	}
+	id := c.NodeIDs[idx]
+	// Return if node is already running.
+	select {
+	case <-c.nodeOpts[idx].Runtime.ctxT.Done():
+	default:
+		return nil
+	}
+
+	// Start QEMU again.
+	launch.Log("Cluster: starting node %d (%s).", idx, id)
+	if err := LaunchNode(c.ctxT, c.launchDir, c.socketDir, c.tpmFactory, &c.nodeOpts[idx], c.nodesDone[idx]); err != nil {
+		return fmt.Errorf("failed to launch node %d: %w", idx, err)
+	}
+	launch.Log("Cluster: node %d (%s) started.", idx, id)
+	return nil
+}
+
+// Close cancels the running cluster's context and waits for all virtualized
+// nodes to stop. It returns an error if stopping the nodes failed, or one of
+// the nodes failed to fully start in the first place.
+func (c *Cluster) Close() error {
+	launch.Log("Cluster: stopping...")
+	if c.authClient != nil {
+		c.authClient.Close()
+	}
+	c.ctxC()
+
+	var errs []error
+	launch.Log("Cluster: waiting for nodes to exit...")
+	for _, done := range c.nodesDone {
+		err := <-done
+		if err != nil {
+			errs = append(errs, err)
+		}
+	}
+	launch.Log("Cluster: removing nodes' state files (%s) and sockets (%s).", c.launchDir, c.socketDir)
+	os.RemoveAll(c.launchDir)
+	os.RemoveAll(c.socketDir)
+	os.RemoveAll(c.metroctlDir)
+	launch.Log("Cluster: done")
+	return multierr.Combine(errs...)
+}
+
+// DialNode is a grpc.WithContextDialer compatible dialer which dials nodes by
+// their ID. This is performed by connecting to the cluster nanoswitch via its
+// SOCKS proxy, and using the cluster node list for name resolution.
+//
+// For example:
+//
+//	grpc.Dial("metropolis-deadbeef:1234", grpc.WithContextDialer(c.DialNode))
+func (c *Cluster) DialNode(_ context.Context, addr string) (net.Conn, error) {
+	host, port, err := net.SplitHostPort(addr)
+	if err != nil {
+		return nil, fmt.Errorf("invalid host:port: %w", err)
+	}
+	// Already an IP address?
+	if net.ParseIP(host) != nil {
+		return c.SOCKSDialer.Dial("tcp", addr)
+	}
+
+	// Otherwise, expect a node name.
+	node, ok := c.Nodes[host]
+	if !ok {
+		return nil, fmt.Errorf("unknown node %q", host)
+	}
+	addr = net.JoinHostPort(node.ManagementAddress, port)
+	return c.SOCKSDialer.Dial("tcp", addr)
+}
+
+// GetKubeClientSet gets a Kubernetes client set accessing the Metropolis
+// Kubernetes authenticating proxy using the cluster owner identity.
+// It currently has access to everything (i.e. the cluster-admin role)
+// via the owner-admin binding.
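+//
+// Usage sketch (metav1 being k8s.io/apimachinery/pkg/apis/meta/v1):
+//
+//	cs, err := c.GetKubeClientSet()
+//	pods, err := cs.CoreV1().Pods("default").List(ctx, metav1.ListOptions{})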
+func (c *Cluster) GetKubeClientSet() (kubernetes.Interface, error) {
+	pkcs8Key, err := x509.MarshalPKCS8PrivateKey(c.Owner.PrivateKey)
+	if err != nil {
+		// We explicitly pass an Ed25519 private key in, so this can't happen
+		panic(err)
+	}
+
+	host := net.JoinHostPort(c.NodeIDs[0], node.KubernetesAPIWrappedPort.PortString())
+	clientConfig := rest.Config{
+		Host: host,
+		TLSClientConfig: rest.TLSClientConfig{
+			// TODO(q3k): use CA certificate
+			Insecure:   true,
+			ServerName: "kubernetes.default.svc",
+			CertData:   pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: c.Owner.Certificate[0]}),
+			KeyData:    pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: pkcs8Key}),
+		},
+		Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
+			return c.DialNode(ctx, address)
+		},
+	}
+	return kubernetes.NewForConfig(&clientConfig)
+}
+
+// KubernetesControllerNodeAddresses returns the list of IP addresses of nodes
+// which are currently Kubernetes controllers, i.e. run an apiserver. This list
+// might be empty if no node is currently configured with the
+// 'KubernetesController' role.
+func (c *Cluster) KubernetesControllerNodeAddresses(ctx context.Context) ([]string, error) {
+	curC, err := c.CuratorClient()
+	if err != nil {
+		return nil, err
+	}
+	mgmt := apb.NewManagementClient(curC)
+	srv, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{
+		Filter: "has(node.roles.kubernetes_controller)",
+	})
+	if err != nil {
+		return nil, err
+	}
+	defer srv.CloseSend()
+	var res []string
+	for {
+		n, err := srv.Recv()
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			return nil, err
+		}
+		if n.Status == nil || n.Status.ExternalAddress == "" {
+			continue
+		}
+		res = append(res, n.Status.ExternalAddress)
+	}
+	return res, nil
+}
+
+// AllNodesHealthy returns nil if all the nodes in the cluster are seemingly
+// healthy.
+func (c *Cluster) AllNodesHealthy(ctx context.Context) error {
+	// Get an authenticated owner client within the cluster.
+	curC, err := c.CuratorClient()
+	if err != nil {
+		return err
+	}
+	mgmt := apb.NewManagementClient(curC)
+	nodes, err := getNodes(ctx, mgmt)
+	if err != nil {
+		return err
+	}
+
+	var unhealthy []string
+	for _, node := range nodes {
+		if node.Health == apb.Node_HEALTHY {
+			continue
+		}
+		unhealthy = append(unhealthy, node.Id)
+	}
+	if len(unhealthy) == 0 {
+		return nil
+	}
+	return fmt.Errorf("nodes unhealthy: %s", strings.Join(unhealthy, ", "))
+}
+
+// ApproveNode approves a node by ID, waiting for it to become UP.
+func (c *Cluster) ApproveNode(ctx context.Context, id string) error {
+	curC, err := c.CuratorClient()
+	if err != nil {
+		return err
+	}
+	mgmt := apb.NewManagementClient(curC)
+
+	_, err = mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
+		Pubkey: c.Nodes[id].Pubkey,
+	})
+	if err != nil {
+		return fmt.Errorf("ApproveNode: %w", err)
+	}
+	launch.Log("Cluster: %s: approved, waiting for UP", id)
+	for {
+		nodes, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
+		if err != nil {
+			return fmt.Errorf("GetNodes: %w", err)
+		}
+		found := false
+		for {
+			node, err := nodes.Recv()
+			if errors.Is(err, io.EOF) {
+				break
+			}
+			if err != nil {
+				return fmt.Errorf("Nodes.Recv: %w", err)
+			}
+			if node.Id != id {
+				continue
+			}
+			if node.State != cpb.NodeState_NODE_STATE_UP {
+				continue
+			}
+			found = true
+			break
+		}
+		nodes.CloseSend()
+
+		if found {
+			break
+		}
+		time.Sleep(time.Second)
+	}
+	launch.Log("Cluster: %s: UP", id)
+	return nil
+}
+
+// MakeKubernetesWorker adds the KubernetesWorker role to a node by ID.
+func (c *Cluster) MakeKubernetesWorker(ctx context.Context, id string) error {
+	curC, err := c.CuratorClient()
+	if err != nil {
+		return err
+	}
+	mgmt := apb.NewManagementClient(curC)
+
+	tr := true
+	launch.Log("Cluster: %s: adding KubernetesWorker", id)
+	_, err = mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
+		Node: &apb.UpdateNodeRolesRequest_Id{
+			Id: id,
+		},
+		KubernetesWorker: &tr,
+	})
+	return err
+}
+
+// MakeConsensusMember adds the ConsensusMember role to a node by ID.
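+//
+// Together with ApproveNode and MakeKubernetesWorker, this allows bringing up
+// a node that was left NEW by LeaveNodesNew (sketch):
+//
+//	_ = c.ApproveNode(ctx, id)
+//	_ = c.MakeConsensusMember(ctx, id)
+//	_ = c.MakeKubernetesWorker(ctx, id)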
+func (c *Cluster) MakeConsensusMember(ctx context.Context, id string) error {
+	curC, err := c.CuratorClient()
+	if err != nil {
+		return err
+	}
+	mgmt := apb.NewManagementClient(curC)
+	cur := ipb.NewCuratorClient(curC)
+
+	tr := true
+	launch.Log("Cluster: %s: adding ConsensusMember", id)
+	bo := backoff.NewExponentialBackOff()
+	bo.MaxElapsedTime = 10 * time.Second
+
+	err = backoff.Retry(func() error {
+		_, err = mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
+			Node: &apb.UpdateNodeRolesRequest_Id{
+				Id: id,
+			},
+			ConsensusMember: &tr,
+		})
+		if err != nil {
+			launch.Log("Cluster: %s: UpdateNodeRoles failed: %v", id, err)
+		}
+		return err
+	}, backoff.WithContext(bo, ctx))
+	if err != nil {
+		return err
+	}
+
+	launch.Log("Cluster: %s: waiting for learner/full members...", id)
+
+	learner := false
+	for {
+		res, err := cur.GetConsensusStatus(ctx, &ipb.GetConsensusStatusRequest{})
+		if err != nil {
+			return fmt.Errorf("GetConsensusStatus: %w", err)
+		}
+		for _, member := range res.EtcdMember {
+			if member.Id != id {
+				continue
+			}
+			switch member.Status {
+			case ipb.GetConsensusStatusResponse_EtcdMember_STATUS_LEARNER:
+				if !learner {
+					learner = true
+					launch.Log("Cluster: %s: became a learner, waiting for full member...", id)
+				}
+			case ipb.GetConsensusStatusResponse_EtcdMember_STATUS_FULL:
+				launch.Log("Cluster: %s: became a full member", id)
+				return nil
+			}
+		}
+		time.Sleep(100 * time.Millisecond)
+	}
+}