| // cluster builds on the launch package and implements launching Metropolis | 
 | // nodes and clusters in a virtualized environment using qemu. It's kept in a | 
 | // separate package as it depends on a Metropolis node image, which might not be | 
// required for some uses of the launch library.
 | package cluster | 
 |  | 
 | import ( | 
 | 	"bytes" | 
 | 	"context" | 
 | 	"crypto/ed25519" | 
 | 	"crypto/rand" | 
 | 	"crypto/tls" | 
 | 	"crypto/x509" | 
 | 	"encoding/pem" | 
 | 	"errors" | 
 | 	"fmt" | 
 | 	"io" | 
 | 	"net" | 
 | 	"net/http" | 
 | 	"os" | 
 | 	"os/exec" | 
 | 	"path" | 
 | 	"path/filepath" | 
 | 	"strings" | 
 | 	"syscall" | 
 | 	"time" | 
 |  | 
 | 	"github.com/cenkalti/backoff/v4" | 
 | 	"go.uber.org/multierr" | 
 | 	"golang.org/x/net/proxy" | 
 | 	"google.golang.org/grpc" | 
 | 	"google.golang.org/grpc/codes" | 
 | 	"google.golang.org/grpc/status" | 
 | 	"google.golang.org/protobuf/proto" | 
 | 	"k8s.io/client-go/kubernetes" | 
 | 	"k8s.io/client-go/rest" | 
 |  | 
 | 	apb "source.monogon.dev/metropolis/proto/api" | 
 | 	cpb "source.monogon.dev/metropolis/proto/common" | 
 |  | 
 | 	metroctl "source.monogon.dev/metropolis/cli/metroctl/core" | 
 | 	"source.monogon.dev/metropolis/cli/pkg/datafile" | 
 | 	"source.monogon.dev/metropolis/node" | 
 | 	"source.monogon.dev/metropolis/node/core/identity" | 
 | 	"source.monogon.dev/metropolis/node/core/rpc" | 
 | 	"source.monogon.dev/metropolis/node/core/rpc/resolver" | 
 | 	"source.monogon.dev/metropolis/pkg/localregistry" | 
 | 	"source.monogon.dev/metropolis/test/launch" | 
 | ) | 
 |  | 
// NodeOptions contains all options that can be passed to LaunchNode().
 | type NodeOptions struct { | 
 | 	// Name is a human-readable identifier to be used in debug output. | 
 | 	Name string | 
 |  | 
	// Ports contains the port mapping used to expose the VM's internal ports to
	// the host. See IdentityPortMap() and ConflictFreePortMap(). Ignored when
	// ConnectToSocket is set.
 | 	Ports launch.PortMap | 
 |  | 
	// If set to true, reboots are honored. Otherwise, any reboot exits LaunchNode().
	// Metropolis nodes generally restart on almost all errors, so unless you want to
	// test reboot behavior this should be false.
 | 	AllowReboot bool | 
 |  | 
	// By default, the VM is connected to the host via SLIRP. If ConnectToSocket is
 | 	// set, it is instead connected to the given file descriptor/socket. If this is | 
 | 	// set, all port maps from the Ports option are ignored. Intended for networking | 
 | 	// this instance together with others for running more complex network | 
 | 	// configurations. | 
 | 	ConnectToSocket *os.File | 
 |  | 
 | 	// When PcapDump is set, all traffic is dumped to a pcap file in the | 
 | 	// runtime directory (e.g. "net0.pcap" for the first interface). | 
 | 	PcapDump bool | 
 |  | 
 | 	// SerialPort is an io.ReadWriter over which you can communicate with the serial | 
 | 	// port of the machine. It can be set to an existing file descriptor (like | 
 | 	// os.Stdout/os.Stderr) or any Go structure implementing this interface. | 
 | 	SerialPort io.ReadWriter | 
 |  | 
 | 	// NodeParameters is passed into the VM and subsequently used for bootstrapping or | 
 | 	// registering into a cluster. | 
 | 	NodeParameters *apb.NodeParameters | 
 |  | 
 | 	// Mac is the node's MAC address. | 
 | 	Mac *net.HardwareAddr | 
 |  | 
 | 	// Runtime keeps the node's QEMU runtime state. | 
 | 	Runtime *NodeRuntime | 
 | } | 
 |  | 
 | // NodeRuntime keeps the node's QEMU runtime options. | 
 | type NodeRuntime struct { | 
 | 	// ld points at the node's launch directory storing data such as storage | 
 | 	// images, firmware variables or the TPM state. | 
 | 	ld string | 
 | 	// sd points at the node's socket directory. | 
 | 	sd string | 
 |  | 
 | 	// ctxT is the context QEMU will execute in. | 
 | 	ctxT context.Context | 
 | 	// CtxC is the QEMU context's cancellation function. | 
 | 	CtxC context.CancelFunc | 
 | } | 
 |  | 
// NodePorts is the list of ports a fully operational Metropolis node listens on.
 | var NodePorts = []node.Port{ | 
 | 	node.ConsensusPort, | 
 |  | 
 | 	node.CuratorServicePort, | 
 | 	node.DebugServicePort, | 
 |  | 
 | 	node.KubernetesAPIPort, | 
 | 	node.KubernetesAPIWrappedPort, | 
 | 	node.DebuggerPort, | 
 | 	node.MetricsPort, | 
 | } | 
 |  | 
 | // setupRuntime creates the node's QEMU runtime directory, together with all | 
 | // files required to preserve its state, a level below the chosen path ld. The | 
// node's socket directory is similarly created a level below sd. It may
 | // return an I/O error. | 
 | func setupRuntime(ld, sd string) (*NodeRuntime, error) { | 
 | 	// Create a temporary directory to keep all the runtime files. | 
 | 	stdp, err := os.MkdirTemp(ld, "node_state*") | 
 | 	if err != nil { | 
 | 		return nil, fmt.Errorf("failed to create the state directory: %w", err) | 
 | 	} | 
 |  | 
 | 	// Initialize the node's storage with a prebuilt image. | 
 | 	si, err := datafile.ResolveRunfile("metropolis/node/image.img") | 
 | 	if err != nil { | 
 | 		return nil, fmt.Errorf("while resolving a path: %w", err) | 
 | 	} | 
 | 	di := filepath.Join(stdp, filepath.Base(si)) | 
 | 	launch.Log("Cluster: copying node image: %s -> %s", si, di) | 
 | 	if err := copyFile(si, di); err != nil { | 
 | 		return nil, fmt.Errorf("while copying the node image: %w", err) | 
 | 	} | 
 |  | 
 | 	// Initialize the OVMF firmware variables file. | 
 | 	sv, err := datafile.ResolveRunfile("external/edk2/OVMF_VARS.fd") | 
 | 	if err != nil { | 
 | 		return nil, fmt.Errorf("while resolving a path: %w", err) | 
 | 	} | 
 | 	dv := filepath.Join(stdp, filepath.Base(sv)) | 
 | 	if err := copyFile(sv, dv); err != nil { | 
 | 		return nil, fmt.Errorf("while copying firmware variables: %w", err) | 
 | 	} | 
 |  | 
 | 	// Create the TPM state directory and initialize all files required by swtpm. | 
 | 	tpmt := filepath.Join(stdp, "tpm") | 
 | 	if err := os.Mkdir(tpmt, 0o755); err != nil { | 
 | 		return nil, fmt.Errorf("while creating the TPM directory: %w", err) | 
 | 	} | 
 | 	tpms, err := datafile.ResolveRunfile("metropolis/node/tpm") | 
 | 	if err != nil { | 
 | 		return nil, fmt.Errorf("while resolving a path: %w", err) | 
 | 	} | 
 | 	tpmf, err := os.ReadDir(tpms) | 
 | 	if err != nil { | 
 | 		return nil, fmt.Errorf("failed to read TPM directory: %w", err) | 
 | 	} | 
 | 	for _, file := range tpmf { | 
 | 		name := file.Name() | 
 | 		src, err := datafile.ResolveRunfile(filepath.Join(tpms, name)) | 
 | 		if err != nil { | 
 | 			return nil, fmt.Errorf("while resolving a path: %w", err) | 
 | 		} | 
 | 		tgt := filepath.Join(tpmt, name) | 
 | 		if err := copyFile(src, tgt); err != nil { | 
 | 			return nil, fmt.Errorf("while copying TPM state: file %q to %q: %w", src, tgt, err) | 
 | 		} | 
 | 	} | 
 |  | 
 | 	// Create the socket directory. | 
 | 	sotdp, err := os.MkdirTemp(sd, "node_sock*") | 
 | 	if err != nil { | 
 | 		return nil, fmt.Errorf("failed to create the socket directory: %w", err) | 
 | 	} | 
 |  | 
 | 	return &NodeRuntime{ | 
 | 		ld: stdp, | 
 | 		sd: sotdp, | 
 | 	}, nil | 
 | } | 
 |  | 
 | // CuratorClient returns an authenticated owner connection to a Curator | 
 | // instance within Cluster c, or nil together with an error. | 
 | func (c *Cluster) CuratorClient() (*grpc.ClientConn, error) { | 
 | 	if c.authClient == nil { | 
 | 		authCreds := rpc.NewAuthenticatedCredentials(c.Owner, rpc.WantInsecure()) | 
 | 		r := resolver.New(c.ctxT, resolver.WithLogger(func(f string, args ...interface{}) { | 
 | 			launch.Log("Cluster: client resolver: %s", fmt.Sprintf(f, args...)) | 
 | 		})) | 
 | 		for _, n := range c.NodeIDs { | 
 | 			ep, err := resolver.NodeWithDefaultPort(n) | 
 | 			if err != nil { | 
 | 				return nil, fmt.Errorf("could not add node %q by DNS: %v", n, err) | 
 | 			} | 
 | 			r.AddEndpoint(ep) | 
 | 		} | 
 | 		authClient, err := grpc.Dial(resolver.MetropolisControlAddress, | 
 | 			grpc.WithTransportCredentials(authCreds), | 
 | 			grpc.WithResolvers(r), | 
 | 			grpc.WithContextDialer(c.DialNode), | 
 | 		) | 
 | 		if err != nil { | 
 | 			return nil, fmt.Errorf("dialing with owner credentials failed: %w", err) | 
 | 		} | 
 | 		c.authClient = authClient | 
 | 	} | 
 | 	return c.authClient, nil | 
 | } | 
 |  | 
 | // LaunchNode launches a single Metropolis node instance with the given options. | 
 | // The instance runs mostly paravirtualized but with some emulated hardware | 
 | // similar to how a cloud provider might set up its VMs. The disk is fully | 
 | // writable, and the changes are kept across reboots and shutdowns. ld and sd | 
 | // point to the launch directory and the socket directory, holding the nodes' | 
 | // state files (storage, tpm state, firmware state), and UNIX socket files | 
 | // (swtpm <-> QEMU interplay) respectively. The directories must exist before | 
 | // LaunchNode is called. LaunchNode will update options.Runtime and options.Mac | 
 | // if either are not initialized. | 
 | func LaunchNode(ctx context.Context, ld, sd string, options *NodeOptions) error { | 
	// TODO(mateusz@monogon.tech) try using QEMU's abstract socket namespace instead
	// of /tmp (requires QEMU version >5.0, see
	// https://github.com/qemu/qemu/commit/776b97d3605ed0fc94443048fdf988c7725e38a9).
	// swtpm accepts already-open FDs, so we could open an FD in the abstract socket
	// namespace ourselves and pass its name to QEMU. Not doing this crashes both
	// swtpm and qemu once socket paths get long, because we run into UNIX socket
	// path length limitations (108 chars, for legacy reasons).
 |  | 
 | 	// If it's the node's first start, set up its runtime directories. | 
 | 	if options.Runtime == nil { | 
 | 		r, err := setupRuntime(ld, sd) | 
 | 		if err != nil { | 
 | 			return fmt.Errorf("while setting up node runtime: %w", err) | 
 | 		} | 
 | 		options.Runtime = r | 
 | 	} | 
 |  | 
 | 	// Replace the node's context with a new one. | 
 | 	r := options.Runtime | 
 | 	if r.CtxC != nil { | 
 | 		r.CtxC() | 
 | 	} | 
 | 	r.ctxT, r.CtxC = context.WithCancel(ctx) | 
 |  | 
 | 	var qemuNetType string | 
 | 	var qemuNetConfig launch.QemuValue | 
 | 	if options.ConnectToSocket != nil { | 
 | 		qemuNetType = "socket" | 
 | 		qemuNetConfig = launch.QemuValue{ | 
 | 			"id": {"net0"}, | 
 | 			"fd": {"3"}, | 
 | 		} | 
 | 	} else { | 
 | 		qemuNetType = "user" | 
 | 		qemuNetConfig = launch.QemuValue{ | 
 | 			"id":        {"net0"}, | 
 | 			"net":       {"10.42.0.0/24"}, | 
 | 			"dhcpstart": {"10.42.0.10"}, | 
 | 			"hostfwd":   options.Ports.ToQemuForwards(), | 
 | 		} | 
 | 	} | 
 |  | 
 | 	// Generate the node's MAC address if it isn't already set in NodeOptions. | 
 | 	if options.Mac == nil { | 
 | 		mac, err := generateRandomEthernetMAC() | 
 | 		if err != nil { | 
 | 			return err | 
 | 		} | 
 | 		options.Mac = mac | 
 | 	} | 
 |  | 
 | 	tpmSocketPath := filepath.Join(r.sd, "tpm-socket") | 
 | 	fwVarPath := filepath.Join(r.ld, "OVMF_VARS.fd") | 
 | 	storagePath := filepath.Join(r.ld, "image.img") | 
 | 	qemuArgs := []string{ | 
 | 		"-machine", "q35", "-accel", "kvm", "-nographic", "-nodefaults", "-m", "4096", | 
 | 		"-cpu", "host", "-smp", "sockets=1,cpus=1,cores=2,threads=2,maxcpus=4", | 
 | 		"-drive", "if=pflash,format=raw,readonly,file=external/edk2/OVMF_CODE.fd", | 
 | 		"-drive", "if=pflash,format=raw,file=" + fwVarPath, | 
 | 		"-drive", "if=virtio,format=raw,cache=unsafe,file=" + storagePath, | 
 | 		"-netdev", qemuNetConfig.ToOption(qemuNetType), | 
 | 		"-device", "virtio-net-pci,netdev=net0,mac=" + options.Mac.String(), | 
 | 		"-chardev", "socket,id=chrtpm,path=" + tpmSocketPath, | 
 | 		"-tpmdev", "emulator,id=tpm0,chardev=chrtpm", | 
 | 		"-device", "tpm-tis,tpmdev=tpm0", | 
 | 		"-device", "virtio-rng-pci", | 
 | 		"-serial", "stdio", | 
 | 	} | 
 |  | 
 | 	if !options.AllowReboot { | 
 | 		qemuArgs = append(qemuArgs, "-no-reboot") | 
 | 	} | 
 |  | 
 | 	if options.NodeParameters != nil { | 
 | 		parametersPath := filepath.Join(r.ld, "parameters.pb") | 
 | 		parametersRaw, err := proto.Marshal(options.NodeParameters) | 
 | 		if err != nil { | 
 | 			return fmt.Errorf("failed to encode node paraeters: %w", err) | 
 | 		} | 
 | 		if err := os.WriteFile(parametersPath, parametersRaw, 0o644); err != nil { | 
 | 			return fmt.Errorf("failed to write node parameters: %w", err) | 
 | 		} | 
 | 		qemuArgs = append(qemuArgs, "-fw_cfg", "name=dev.monogon.metropolis/parameters.pb,file="+parametersPath) | 
 | 	} | 
 |  | 
	if options.PcapDump {
		pcapPath := filepath.Join(r.ld, "net0.pcap")
		qemuNetDump := launch.QemuValue{
			"id":     {"net0"},
			"netdev": {"net0"},
			"file":   {pcapPath},
		}
		qemuArgs = append(qemuArgs, "-object", qemuNetDump.ToOption("filter-dump"))
	}
 |  | 
 | 	// Start TPM emulator as a subprocess | 
 | 	tpmCtx, tpmCancel := context.WithCancel(options.Runtime.ctxT) | 
 | 	defer tpmCancel() | 
 |  | 
 | 	tpmd := filepath.Join(r.ld, "tpm") | 
 | 	tpmEmuCmd := exec.CommandContext(tpmCtx, "swtpm", "socket", "--tpm2", "--tpmstate", "dir="+tpmd, "--ctrl", "type=unixio,path="+tpmSocketPath) | 
 | 	tpmEmuCmd.Stderr = os.Stderr | 
 | 	tpmEmuCmd.Stdout = os.Stdout | 
 |  | 
 | 	err := tpmEmuCmd.Start() | 
 | 	if err != nil { | 
 | 		return fmt.Errorf("failed to start TPM emulator: %w", err) | 
 | 	} | 
 |  | 
 | 	// Wait for the socket to be created by the TPM emulator before launching | 
 | 	// QEMU. | 
 | 	for { | 
 | 		_, err := os.Stat(tpmSocketPath) | 
 | 		if err == nil { | 
 | 			break | 
 | 		} | 
 | 		if err != nil && !os.IsNotExist(err) { | 
 | 			return fmt.Errorf("while stat-ing TPM socket path: %w", err) | 
 | 		} | 
 | 		if err := tpmCtx.Err(); err != nil { | 
 | 			return fmt.Errorf("while waiting for the TPM socket: %w", err) | 
 | 		} | 
 | 		time.Sleep(time.Millisecond * 100) | 
 | 	} | 
 |  | 
 | 	// Start the main qemu binary | 
 | 	systemCmd := exec.CommandContext(options.Runtime.ctxT, "qemu-system-x86_64", qemuArgs...) | 
 | 	if options.ConnectToSocket != nil { | 
 | 		systemCmd.ExtraFiles = []*os.File{options.ConnectToSocket} | 
 | 	} | 
 |  | 
 | 	var stdErrBuf bytes.Buffer | 
 | 	systemCmd.Stderr = &stdErrBuf | 
 | 	systemCmd.Stdout = options.SerialPort | 
 |  | 
 | 	launch.PrettyPrintQemuArgs(options.Name, systemCmd.Args) | 
 |  | 
 | 	err = systemCmd.Run() | 
 |  | 
 | 	// Stop TPM emulator and wait for it to exit to properly reap the child process | 
 | 	tpmCancel() | 
 | 	launch.Log("Node: Waiting for TPM emulator to exit") | 
 | 	// Wait returns a SIGKILL error because we just cancelled its context. | 
 | 	// We still need to call it to avoid creating zombies. | 
 | 	_ = tpmEmuCmd.Wait() | 
 | 	launch.Log("Node: TPM emulator done") | 
 |  | 
 | 	var exerr *exec.ExitError | 
 | 	if err != nil && errors.As(err, &exerr) { | 
 | 		status := exerr.ProcessState.Sys().(syscall.WaitStatus) | 
 | 		if status.Signaled() && status.Signal() == syscall.SIGKILL { | 
 | 			// Process was killed externally (most likely by our context being canceled). | 
 | 			// This is a normal exit for us, so return nil | 
 | 			return nil | 
 | 		} | 
 | 		exerr.Stderr = stdErrBuf.Bytes() | 
 | 		newErr := launch.QEMUError(*exerr) | 
 | 		return &newErr | 
 | 	} | 
 | 	return err | 
 | } | 
 |  | 
 | func copyFile(src, dst string) error { | 
 | 	in, err := os.Open(src) | 
 | 	if err != nil { | 
 | 		return fmt.Errorf("when opening source: %w", err) | 
 | 	} | 
 | 	defer in.Close() | 
 |  | 
 | 	out, err := os.Create(dst) | 
 | 	if err != nil { | 
 | 		return fmt.Errorf("when creating destination: %w", err) | 
 | 	} | 
 | 	defer out.Close() | 
 |  | 
 | 	_, err = io.Copy(out, in) | 
 | 	if err != nil { | 
 | 		return fmt.Errorf("when copying file: %w", err) | 
 | 	} | 
 | 	return out.Close() | 
 | } | 
 |  | 
 | // getNodes wraps around Management.GetNodes to return a list of nodes in a | 
 | // cluster. | 
 | func getNodes(ctx context.Context, mgmt apb.ManagementClient) ([]*apb.Node, error) { | 
 | 	var res []*apb.Node | 
 | 	bo := backoff.WithContext(backoff.NewExponentialBackOff(), ctx) | 
 | 	err := backoff.Retry(func() error { | 
 | 		res = nil | 
 | 		srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{}) | 
 | 		if err != nil { | 
 | 			return fmt.Errorf("GetNodes: %w", err) | 
 | 		} | 
 | 		for { | 
 | 			node, err := srvN.Recv() | 
 | 			if err == io.EOF { | 
 | 				break | 
 | 			} | 
 | 			if err != nil { | 
 | 				return fmt.Errorf("GetNodes.Recv: %w", err) | 
 | 			} | 
 | 			res = append(res, node) | 
 | 		} | 
 | 		return nil | 
 | 	}, bo) | 
 | 	if err != nil { | 
 | 		return nil, err | 
 | 	} | 
 | 	return res, nil | 
 | } | 
 |  | 
// getNode wraps Management.GetNodes. It returns node information matching the
// given node ID.
 | func getNode(ctx context.Context, mgmt apb.ManagementClient, id string) (*apb.Node, error) { | 
 | 	nodes, err := getNodes(ctx, mgmt) | 
 | 	if err != nil { | 
 | 		return nil, fmt.Errorf("could not get nodes: %w", err) | 
 | 	} | 
 | 	for _, n := range nodes { | 
 | 		eid := identity.NodeID(n.Pubkey) | 
 | 		if eid != id { | 
 | 			continue | 
 | 		} | 
 | 		return n, nil | 
 | 	} | 
 | 	return nil, fmt.Errorf("no such node.") | 
 | } | 
 |  | 
// generateRandomEthernetMAC generates a random EUI-48 Ethernet MAC address.
 | func generateRandomEthernetMAC() (*net.HardwareAddr, error) { | 
 | 	macBuf := make([]byte, 6) | 
 | 	_, err := rand.Read(macBuf) | 
 | 	if err != nil { | 
 | 		return nil, fmt.Errorf("failed to read randomness for MAC: %v", err) | 
 | 	} | 
 |  | 
 | 	// Set U/L bit and clear I/G bit (locally administered individual MAC) | 
 | 	// Ref IEEE 802-2014 Section 8.2.2 | 
 | 	macBuf[0] = (macBuf[0] | 2) & 0xfe | 
 | 	mac := net.HardwareAddr(macBuf) | 
 | 	return &mac, nil | 
 | } | 
 |  | 
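// SOCKSPort is the port at which nanoswitch exposes its SOCKS proxy into the
// cluster network (see Cluster.Ports and DialNode).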
 | const SOCKSPort uint16 = 1080 | 
 |  | 
 | // ClusterPorts contains all ports handled by Nanoswitch. | 
 | var ClusterPorts = []uint16{ | 
 | 	// Forwarded to the first node. | 
 | 	uint16(node.CuratorServicePort), | 
 | 	uint16(node.DebugServicePort), | 
 | 	uint16(node.KubernetesAPIPort), | 
 | 	uint16(node.KubernetesAPIWrappedPort), | 
 |  | 
 | 	// SOCKS proxy to the switch network | 
 | 	SOCKSPort, | 
 | } | 
 |  | 
 | // ClusterOptions contains all options for launching a Metropolis cluster. | 
 | type ClusterOptions struct { | 
 | 	// The number of nodes this cluster should be started with. | 
 | 	NumNodes int | 
 |  | 
 | 	// If true, node logs will be saved to individual files instead of being printed | 
	// out to stderr. The paths of these files will still be printed to stdout.
 | 	// | 
 | 	// The files will be located within the launch directory inside TEST_TMPDIR (or | 
 | 	// the default tempdir location, if not set). | 
 | 	NodeLogsToFiles bool | 
 |  | 
	// LeaveNodesNew, if set, will leave all non-bootstrap nodes in the NEW state,
	// without approving them into the cluster. The nodes' address information in
	// Cluster.Nodes will be incomplete.
 | 	LeaveNodesNew bool | 
 |  | 
 | 	// Optional local registry which will be made available to the cluster to | 
 | 	// pull images from. This is a more efficient alternative to preseeding all | 
 | 	// images used for testing. | 
 | 	LocalRegistry *localregistry.Server | 
 | } | 
 |  | 
 | // Cluster is the running Metropolis cluster launched using the LaunchCluster | 
 | // function. | 
 | type Cluster struct { | 
 | 	// Owner is the TLS Certificate of the owner of the test cluster. This can be | 
 | 	// used to authenticate further clients to the running cluster. | 
 | 	Owner tls.Certificate | 
	// Ports is the PortMap used to access the first node's services (defined in
 | 	// ClusterPorts) and the SOCKS proxy (at SOCKSPort). | 
 | 	Ports launch.PortMap | 
 |  | 
 | 	// Nodes is a map from Node ID to its runtime information. | 
 | 	Nodes map[string]*NodeInCluster | 
 | 	// NodeIDs is a list of node IDs that are backing this cluster, in order of | 
 | 	// creation. | 
 | 	NodeIDs []string | 
 |  | 
 | 	// CACertificate is the cluster's CA certificate. | 
 | 	CACertificate *x509.Certificate | 
 |  | 
	// nodesDone is a list of channels populated with the errors returned by the
	// nodes' qemu instances. It's used by Close to ensure all nodes have
 | 	// successfully been stopped. | 
 | 	nodesDone []chan error | 
 | 	// nodeOpts are the cluster member nodes' mutable launch options, kept here | 
 | 	// to facilitate reboots. | 
 | 	nodeOpts []NodeOptions | 
 | 	// launchDir points at the directory keeping the nodes' state, such as storage | 
 | 	// images, firmware variable files, TPM state. | 
 | 	launchDir string | 
 | 	// socketDir points at the directory keeping UNIX socket files, such as these | 
 | 	// used to facilitate communication between QEMU and swtpm. It's different | 
 | 	// from launchDir, and anchored nearer the file system root, due to the | 
 | 	// socket path length limitation imposed by the kernel. | 
 | 	socketDir   string | 
 | 	metroctlDir string | 
 |  | 
 | 	// socksDialer is used by DialNode to establish connections to nodes via the | 
	// SOCKS server run by nanoswitch.
 | 	socksDialer proxy.Dialer | 
 |  | 
 | 	// authClient is a cached authenticated owner connection to a Curator | 
 | 	// instance within the cluster. | 
 | 	authClient *grpc.ClientConn | 
 |  | 
 | 	// ctxT is the context individual node contexts are created from. | 
 | 	ctxT context.Context | 
 | 	// ctxC is used by Close to cancel the context under which the nodes are | 
 | 	// running. | 
 | 	ctxC context.CancelFunc | 
 | } | 
 |  | 
 | // NodeInCluster represents information about a node that's part of a Cluster. | 
 | type NodeInCluster struct { | 
 | 	// ID of the node, which can be used to dial this node's services via DialNode. | 
 | 	ID     string | 
 | 	Pubkey []byte | 
	// Address of the node on the network run by nanoswitch. Not reachable from the
 | 	// host unless dialed via DialNode or via the nanoswitch SOCKS proxy (reachable | 
 | 	// on Cluster.Ports[SOCKSPort]). | 
 | 	ManagementAddress string | 
 | } | 
 |  | 
 | // firstConnection performs the initial owner credential escrow with a newly | 
 | // started nanoswitch-backed cluster over SOCKS. It expects the first node to be | 
 | // running at 10.1.0.2, which is always the case with the current nanoswitch | 
 | // implementation. | 
 | // | 
 | // It returns the newly escrowed credentials as well as the first node's | 
 | // information as NodeInCluster. | 
 | func firstConnection(ctx context.Context, socksDialer proxy.Dialer) (*tls.Certificate, *NodeInCluster, error) { | 
 | 	// Dial external service. | 
 | 	remote := fmt.Sprintf("10.1.0.2:%s", node.CuratorServicePort.PortString()) | 
 | 	initCreds, err := rpc.NewEphemeralCredentials(InsecurePrivateKey, nil) | 
 | 	if err != nil { | 
 | 		return nil, nil, fmt.Errorf("NewEphemeralCredentials: %w", err) | 
 | 	} | 
 | 	initDialer := func(_ context.Context, addr string) (net.Conn, error) { | 
 | 		return socksDialer.Dial("tcp", addr) | 
 | 	} | 
 | 	initClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(initCreds)) | 
 | 	if err != nil { | 
 | 		return nil, nil, fmt.Errorf("dialing with ephemeral credentials failed: %w", err) | 
 | 	} | 
 | 	defer initClient.Close() | 
 |  | 
 | 	// Retrieve owner certificate - this can take a while because the node is still | 
 | 	// coming up, so do it in a backoff loop. | 
 | 	launch.Log("Cluster: retrieving owner certificate (this can take a few seconds while the first node boots)...") | 
 | 	aaa := apb.NewAAAClient(initClient) | 
 | 	var cert *tls.Certificate | 
 | 	err = backoff.Retry(func() error { | 
 | 		cert, err = rpc.RetrieveOwnerCertificate(ctx, aaa, InsecurePrivateKey) | 
 | 		if st, ok := status.FromError(err); ok { | 
 | 			if st.Code() == codes.Unavailable { | 
 | 				launch.Log("Cluster: cluster UNAVAILABLE: %v", st.Message()) | 
 | 				return err | 
 | 			} | 
 | 		} | 
 | 		return backoff.Permanent(err) | 
 | 	}, backoff.WithContext(backoff.NewExponentialBackOff(), ctx)) | 
 | 	if err != nil { | 
 | 		return nil, nil, fmt.Errorf("couldn't retrieve owner certificate: %w", err) | 
 | 	} | 
 | 	launch.Log("Cluster: retrieved owner certificate.") | 
 |  | 
 | 	// Now connect authenticated and get the node ID. | 
 | 	creds := rpc.NewAuthenticatedCredentials(*cert, rpc.WantInsecure()) | 
 | 	authClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(creds)) | 
 | 	if err != nil { | 
 | 		return nil, nil, fmt.Errorf("dialing with owner credentials failed: %w", err) | 
 | 	} | 
 | 	defer authClient.Close() | 
 | 	mgmt := apb.NewManagementClient(authClient) | 
 |  | 
 | 	var node *NodeInCluster | 
 | 	err = backoff.Retry(func() error { | 
 | 		nodes, err := getNodes(ctx, mgmt) | 
 | 		if err != nil { | 
 | 			return fmt.Errorf("retrieving nodes failed: %w", err) | 
 | 		} | 
 | 		if len(nodes) != 1 { | 
 | 			return fmt.Errorf("expected one node, got %d", len(nodes)) | 
 | 		} | 
 | 		n := nodes[0] | 
 | 		if n.Status == nil || n.Status.ExternalAddress == "" { | 
 | 			return fmt.Errorf("node has no status and/or address") | 
 | 		} | 
 | 		node = &NodeInCluster{ | 
 | 			ID:                identity.NodeID(n.Pubkey), | 
 | 			ManagementAddress: n.Status.ExternalAddress, | 
 | 		} | 
 | 		return nil | 
 | 	}, backoff.WithContext(backoff.NewExponentialBackOff(), ctx)) | 
 | 	if err != nil { | 
 | 		return nil, nil, err | 
 | 	} | 
 |  | 
 | 	return cert, node, nil | 
 | } | 
 |  | 
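// NewSerialFileLogger creates (or opens) the file at p and returns it for use
// as a node's SerialPort, directing console output to that file.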
 | func NewSerialFileLogger(p string) (io.ReadWriter, error) { | 
 | 	f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE, 0o600) | 
 | 	if err != nil { | 
 | 		return nil, err | 
 | 	} | 
 | 	return f, nil | 
 | } | 
 |  | 
 | // LaunchCluster launches a cluster of Metropolis node VMs together with a | 
 | // Nanoswitch instance to network them all together. | 
 | // | 
 | // The given context will be used to run all qemu instances in the cluster, and | 
 | // canceling the context or calling Close() will terminate them. | 
 | func LaunchCluster(ctx context.Context, opts ClusterOptions) (*Cluster, error) { | 
 | 	if opts.NumNodes <= 0 { | 
 | 		return nil, errors.New("refusing to start cluster with zero nodes") | 
 | 	} | 
 |  | 
 | 	// Create the launch directory. | 
 | 	ld, err := os.MkdirTemp(os.Getenv("TEST_TMPDIR"), "cluster-*") | 
 | 	if err != nil { | 
 | 		return nil, fmt.Errorf("failed to create the launch directory: %w", err) | 
 | 	} | 
 | 	// Create the metroctl config directory. We keep it in /tmp because in some | 
 | 	// scenarios it's end-user visible and we want it short. | 
 | 	md, err := os.MkdirTemp("/tmp", "metroctl-*") | 
 | 	if err != nil { | 
 | 		return nil, fmt.Errorf("failed to create the metroctl directory: %w", err) | 
 | 	} | 
 |  | 
 | 	// Create the socket directory. We keep it in /tmp because of socket path limits. | 
 | 	sd, err := os.MkdirTemp("/tmp", "cluster-*") | 
 | 	if err != nil { | 
 | 		return nil, fmt.Errorf("failed to create the socket directory: %w", err) | 
 | 	} | 
 |  | 
 | 	// Prepare links between nodes and nanoswitch. | 
 | 	var switchPorts []*os.File | 
 | 	var vmPorts []*os.File | 
 | 	for i := 0; i < opts.NumNodes; i++ { | 
 | 		switchPort, vmPort, err := launch.NewSocketPair() | 
 | 		if err != nil { | 
 | 			return nil, fmt.Errorf("failed to get socketpair: %w", err) | 
 | 		} | 
 | 		switchPorts = append(switchPorts, switchPort) | 
 | 		vmPorts = append(vmPorts, vmPort) | 
 | 	} | 
 |  | 
 | 	// Make a list of channels that will be populated by all running node qemu | 
 | 	// processes. | 
 | 	done := make([]chan error, opts.NumNodes) | 
 | 	for i := range done { | 
 | 		done[i] = make(chan error, 1) | 
 | 	} | 
 |  | 
 | 	// Prepare the node options. These will be kept as part of Cluster. | 
 | 	// nodeOpts[].Runtime will be initialized by LaunchNode during the first | 
 | 	// launch. The runtime information can be later used to restart a node. | 
 | 	// The 0th node will be initialized first. The rest will follow after it | 
	// has bootstrapped the cluster.
 | 	nodeOpts := make([]NodeOptions, opts.NumNodes) | 
 | 	nodeOpts[0] = NodeOptions{ | 
 | 		Name:            "node0", | 
 | 		ConnectToSocket: vmPorts[0], | 
 | 		NodeParameters: &apb.NodeParameters{ | 
 | 			Cluster: &apb.NodeParameters_ClusterBootstrap_{ | 
 | 				ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{ | 
 | 					OwnerPublicKey: InsecurePublicKey, | 
 | 				}, | 
 | 			}, | 
 | 		}, | 
 | 		SerialPort: newPrefixedStdio(0), | 
 | 		PcapDump:   true, | 
 | 	} | 
 | 	if opts.NodeLogsToFiles { | 
 | 		path := path.Join(ld, "node-1.txt") | 
 | 		port, err := NewSerialFileLogger(path) | 
 | 		if err != nil { | 
 | 			return nil, fmt.Errorf("could not open log file for node 1: %w", err) | 
 | 		} | 
 | 		launch.Log("Node 1 logs at %s", path) | 
 | 		nodeOpts[0].SerialPort = port | 
 | 	} | 
 |  | 
 | 	// Start the first node. | 
 | 	ctxT, ctxC := context.WithCancel(ctx) | 
 | 	launch.Log("Cluster: Starting node %d...", 1) | 
 | 	go func() { | 
 | 		err := LaunchNode(ctxT, ld, sd, &nodeOpts[0]) | 
 | 		if err != nil { | 
 | 			launch.Log("Node %d finished with an error: %v", 1, err) | 
 | 		} | 
 | 		done[0] <- err | 
 | 	}() | 
 |  | 
 | 	localRegistryAddr := net.TCPAddr{ | 
 | 		IP:   net.IPv4(10, 42, 0, 82), | 
 | 		Port: 5000, | 
 | 	} | 
 |  | 
 | 	var guestSvcMap launch.GuestServiceMap | 
 | 	if opts.LocalRegistry != nil { | 
 | 		l, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1)}) | 
 | 		if err != nil { | 
 | 			ctxC() | 
 | 			return nil, fmt.Errorf("failed to create TCP listener for local registry: %w", err) | 
 | 		} | 
 | 		s := http.Server{ | 
 | 			Handler: opts.LocalRegistry, | 
 | 		} | 
 | 		go s.Serve(l) | 
 | 		go func() { | 
 | 			<-ctxT.Done() | 
 | 			s.Close() | 
 | 		}() | 
 | 		guestSvcMap = launch.GuestServiceMap{ | 
 | 			&localRegistryAddr: *l.Addr().(*net.TCPAddr), | 
 | 		} | 
 | 	} | 
 |  | 
 | 	// Launch nanoswitch. | 
 | 	portMap, err := launch.ConflictFreePortMap(ClusterPorts) | 
 | 	if err != nil { | 
 | 		ctxC() | 
 | 		return nil, fmt.Errorf("failed to allocate ephemeral ports: %w", err) | 
 | 	} | 
 |  | 
 | 	go func() { | 
 | 		var serialPort io.ReadWriter | 
 | 		if opts.NodeLogsToFiles { | 
 | 			path := path.Join(ld, "nanoswitch.txt") | 
 | 			serialPort, err = NewSerialFileLogger(path) | 
 | 			if err != nil { | 
 | 				launch.Log("Could not open log file for nanoswitch: %v", err) | 
 | 			} | 
 | 			launch.Log("Nanoswitch logs at %s", path) | 
 | 		} else { | 
 | 			serialPort = newPrefixedStdio(99) | 
 | 		} | 
 | 		if err := launch.RunMicroVM(ctxT, &launch.MicroVMOptions{ | 
 | 			Name:                   "nanoswitch", | 
 | 			KernelPath:             "metropolis/test/ktest/vmlinux", | 
 | 			InitramfsPath:          "metropolis/test/nanoswitch/initramfs.cpio.zst", | 
 | 			ExtraNetworkInterfaces: switchPorts, | 
 | 			PortMap:                portMap, | 
 | 			GuestServiceMap:        guestSvcMap, | 
 | 			SerialPort:             serialPort, | 
 | 			PcapDump:               path.Join(ld, "nanoswitch.pcap"), | 
 | 		}); err != nil { | 
 | 			if !errors.Is(err, ctxT.Err()) { | 
 | 				launch.Fatal("Failed to launch nanoswitch: %v", err) | 
 | 			} | 
 | 		} | 
 | 	}() | 
 |  | 
 | 	// Build SOCKS dialer. | 
 | 	socksRemote := fmt.Sprintf("localhost:%v", portMap[SOCKSPort]) | 
 | 	socksDialer, err := proxy.SOCKS5("tcp", socksRemote, nil, proxy.Direct) | 
 | 	if err != nil { | 
 | 		ctxC() | 
 | 		return nil, fmt.Errorf("failed to build SOCKS dialer: %w", err) | 
 | 	} | 
 |  | 
 | 	// Retrieve owner credentials and first node. | 
 | 	cert, firstNode, err := firstConnection(ctxT, socksDialer) | 
 | 	if err != nil { | 
 | 		ctxC() | 
 | 		return nil, err | 
 | 	} | 
 |  | 
 | 	// Write credentials to the metroctl directory. | 
 | 	if err := metroctl.WriteOwnerKey(md, cert.PrivateKey.(ed25519.PrivateKey)); err != nil { | 
 | 		ctxC() | 
 | 		return nil, fmt.Errorf("could not write owner key: %w", err) | 
 | 	} | 
 | 	if err := metroctl.WriteOwnerCertificate(md, cert.Certificate[0]); err != nil { | 
 | 		ctxC() | 
 | 		return nil, fmt.Errorf("could not write owner certificate: %w", err) | 
 | 	} | 
 |  | 
 | 	// Set up a partially initialized cluster instance, to be filled in in the | 
 | 	// later steps. | 
 | 	cluster := &Cluster{ | 
 | 		Owner: *cert, | 
 | 		Ports: portMap, | 
 | 		Nodes: map[string]*NodeInCluster{ | 
 | 			firstNode.ID: firstNode, | 
 | 		}, | 
 | 		NodeIDs: []string{ | 
 | 			firstNode.ID, | 
 | 		}, | 
 |  | 
 | 		nodesDone:   done, | 
 | 		nodeOpts:    nodeOpts, | 
 | 		launchDir:   ld, | 
 | 		socketDir:   sd, | 
 | 		metroctlDir: md, | 
 |  | 
 | 		socksDialer: socksDialer, | 
 |  | 
 | 		ctxT: ctxT, | 
 | 		ctxC: ctxC, | 
 | 	} | 
 |  | 
 | 	// Now start the rest of the nodes and register them into the cluster. | 
 |  | 
 | 	// Get an authenticated owner client within the cluster. | 
 | 	curC, err := cluster.CuratorClient() | 
 | 	if err != nil { | 
 | 		ctxC() | 
 | 		return nil, fmt.Errorf("CuratorClient: %w", err) | 
 | 	} | 
 | 	mgmt := apb.NewManagementClient(curC) | 
 |  | 
 | 	// Retrieve register ticket to register further nodes. | 
 | 	launch.Log("Cluster: retrieving register ticket...") | 
 | 	resT, err := mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{}) | 
 | 	if err != nil { | 
 | 		ctxC() | 
 | 		return nil, fmt.Errorf("GetRegisterTicket: %w", err) | 
 | 	} | 
 | 	ticket := resT.Ticket | 
 | 	launch.Log("Cluster: retrieved register ticket (%d bytes).", len(ticket)) | 
 |  | 
 | 	// Retrieve cluster info (for directory and ca public key) to register further | 
 | 	// nodes. | 
 | 	resI, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{}) | 
 | 	if err != nil { | 
 | 		ctxC() | 
 | 		return nil, fmt.Errorf("GetClusterInfo: %w", err) | 
 | 	} | 
 | 	caCert, err := x509.ParseCertificate(resI.CaCertificate) | 
 | 	if err != nil { | 
 | 		ctxC() | 
 | 		return nil, fmt.Errorf("ParseCertificate: %w", err) | 
 | 	} | 
 | 	cluster.CACertificate = caCert | 
 |  | 
 | 	// Use the retrieved information to configure the rest of the node options. | 
 | 	for i := 1; i < opts.NumNodes; i++ { | 
 | 		nodeOpts[i] = NodeOptions{ | 
 | 			Name:            fmt.Sprintf("node%d", i), | 
 | 			ConnectToSocket: vmPorts[i], | 
 | 			NodeParameters: &apb.NodeParameters{ | 
 | 				Cluster: &apb.NodeParameters_ClusterRegister_{ | 
 | 					ClusterRegister: &apb.NodeParameters_ClusterRegister{ | 
 | 						RegisterTicket:   ticket, | 
 | 						ClusterDirectory: resI.ClusterDirectory, | 
 | 						CaCertificate:    resI.CaCertificate, | 
 | 					}, | 
 | 				}, | 
 | 			}, | 
 | 			SerialPort: newPrefixedStdio(i), | 
 | 		} | 
 | 		if opts.NodeLogsToFiles { | 
 | 			path := path.Join(ld, fmt.Sprintf("node-%d.txt", i+1)) | 
 | 			port, err := NewSerialFileLogger(path) | 
 | 			if err != nil { | 
 | 				return nil, fmt.Errorf("could not open log file for node %d: %w", i+1, err) | 
 | 			} | 
 | 			launch.Log("Node %d logs at %s", i+1, path) | 
 | 			nodeOpts[i].SerialPort = port | 
 | 		} | 
 | 	} | 
 |  | 
 | 	// Now run the rest of the nodes. | 
 | 	for i := 1; i < opts.NumNodes; i++ { | 
 | 		launch.Log("Cluster: Starting node %d...", i+1) | 
 | 		go func(i int) { | 
 | 			err := LaunchNode(ctxT, ld, sd, &nodeOpts[i]) | 
 | 			if err != nil { | 
 | 				launch.Log("Node %d finished with an error: %v", i, err) | 
 | 			} | 
 | 			done[i] <- err | 
 | 		}(i) | 
 | 	} | 
 |  | 
 | 	seenNodes := make(map[string]bool) | 
 | 	launch.Log("Cluster: waiting for nodes to appear as NEW...") | 
 | 	for i := 1; i < opts.NumNodes; i++ { | 
 | 		for { | 
 | 			nodes, err := getNodes(ctx, mgmt) | 
 | 			if err != nil { | 
 | 				ctxC() | 
 | 				return nil, fmt.Errorf("could not get nodes: %w", err) | 
 | 			} | 
 | 			for _, n := range nodes { | 
 | 				if n.State != cpb.NodeState_NODE_STATE_NEW { | 
 | 					continue | 
 | 				} | 
 | 				seenNodes[n.Id] = true | 
 | 				cluster.Nodes[n.Id] = &NodeInCluster{ | 
 | 					ID:     n.Id, | 
 | 					Pubkey: n.Pubkey, | 
 | 				} | 
 | 				cluster.NodeIDs = append(cluster.NodeIDs, n.Id) | 
 | 			} | 
 |  | 
 | 			if len(seenNodes) == opts.NumNodes-1 { | 
 | 				break | 
 | 			} | 
 | 			time.Sleep(1 * time.Second) | 
 | 		} | 
 | 	} | 
 | 	launch.Log("Found all expected nodes") | 
 |  | 
 | 	approvedNodes := make(map[string]bool) | 
 | 	upNodes := make(map[string]bool) | 
 | 	if !opts.LeaveNodesNew { | 
 | 		for { | 
 | 			nodes, err := getNodes(ctx, mgmt) | 
 | 			if err != nil { | 
 | 				ctxC() | 
 | 				return nil, fmt.Errorf("could not get nodes: %w", err) | 
 | 			} | 
 | 			for _, node := range nodes { | 
 | 				if !seenNodes[node.Id] { | 
 | 					// Skip nodes that weren't NEW in the previous step. | 
 | 					continue | 
 | 				} | 
 |  | 
 | 				if node.State == cpb.NodeState_NODE_STATE_UP && node.Status != nil && node.Status.ExternalAddress != "" { | 
 | 					launch.Log("Cluster: node %s is up", node.Id) | 
 | 					upNodes[node.Id] = true | 
 | 					cluster.Nodes[node.Id].ManagementAddress = node.Status.ExternalAddress | 
 | 				} | 
 | 				if upNodes[node.Id] { | 
 | 					continue | 
 | 				} | 
 |  | 
 | 				if !approvedNodes[node.Id] { | 
 | 					launch.Log("Cluster: approving node %s", node.Id) | 
 | 					_, err := mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{ | 
 | 						Pubkey: node.Pubkey, | 
 | 					}) | 
 | 					if err != nil { | 
 | 						ctxC() | 
 | 						return nil, fmt.Errorf("ApproveNode(%s): %w", node.Id, err) | 
 | 					} | 
 | 					approvedNodes[node.Id] = true | 
 | 				} | 
 | 			} | 
 |  | 
 | 			launch.Log("Cluster: want %d up nodes, have %d", opts.NumNodes-1, len(upNodes)) | 
 | 			if len(upNodes) == opts.NumNodes-1 { | 
 | 				break | 
 | 			} | 
 | 			time.Sleep(time.Second) | 
 | 		} | 
 | 	} | 
 |  | 
 | 	launch.Log("Cluster: all nodes up:") | 
 | 	for _, node := range cluster.Nodes { | 
 | 		launch.Log("Cluster:  - %s at %s", node.ID, node.ManagementAddress) | 
 | 	} | 
 | 	launch.Log("Cluster: starting tests...") | 
 |  | 
 | 	return cluster, nil | 
 | } | 
 |  | 
 | // RebootNode reboots the cluster member node matching the given index, and | 
 | // waits for it to rejoin the cluster. It will use the given context ctx to run | 
 | // cluster API requests, whereas the resulting QEMU process will be created | 
 | // using the cluster's context c.ctxT. The nodes are indexed starting at 0. | 
 | func (c *Cluster) RebootNode(ctx context.Context, idx int) error { | 
 | 	if idx < 0 || idx >= len(c.NodeIDs) { | 
 | 		return fmt.Errorf("index out of bounds.") | 
 | 	} | 
 | 	id := c.NodeIDs[idx] | 
 |  | 
 | 	// Get an authenticated owner client within the cluster. | 
 | 	curC, err := c.CuratorClient() | 
 | 	if err != nil { | 
 | 		return err | 
 | 	} | 
 | 	mgmt := apb.NewManagementClient(curC) | 
 |  | 
 | 	// Get the timestamp of the node's last update, as observed by Curator. | 
	// It'll be needed to make sure it has rejoined the cluster after the reboot.
 | 	var is *apb.Node | 
 | 	for { | 
 | 		r, err := getNode(ctx, mgmt, id) | 
 | 		if err != nil { | 
 | 			return err | 
 | 		} | 
 |  | 
 | 		// Node status may be absent if it hasn't reported to the cluster yet. Wait | 
 | 		// for it to appear before progressing further. | 
 | 		if r.Status != nil { | 
 | 			is = r | 
 | 			break | 
 | 		} | 
 | 		time.Sleep(time.Second) | 
 | 	} | 
 |  | 
 | 	// Cancel the node's context. This will shut down QEMU. | 
 | 	c.nodeOpts[idx].Runtime.CtxC() | 
 | 	launch.Log("Cluster: waiting for node %d (%s) to stop.", idx, id) | 
 | 	err = <-c.nodesDone[idx] | 
 | 	if err != nil { | 
 | 		return fmt.Errorf("while restarting node: %w", err) | 
 | 	} | 
 |  | 
 | 	// Start QEMU again. | 
 | 	launch.Log("Cluster: restarting node %d (%s).", idx, id) | 
 | 	go func(n int) { | 
 | 		err := LaunchNode(c.ctxT, c.launchDir, c.socketDir, &c.nodeOpts[n]) | 
 | 		if err != nil { | 
 | 			launch.Log("Node %d finished with an error: %v", n, err) | 
 | 		} | 
 | 		c.nodesDone[n] <- err | 
 | 	}(idx) | 
 |  | 
 | 	// Poll Management.GetNodes until the node's timestamp is updated. | 
 | 	for { | 
 | 		cs, err := getNode(ctx, mgmt, id) | 
 | 		if err != nil { | 
 | 			launch.Log("Cluster: node get error: %v", err) | 
 | 			return err | 
 | 		} | 
 | 		launch.Log("Cluster: node status: %+v", cs) | 
		if cs.Status != nil && cs.Status.Timestamp.AsTime().After(is.Status.Timestamp.AsTime()) {
			break
		}
		time.Sleep(time.Second)
 | 	} | 
 | 	launch.Log("Cluster: node %d (%s) has rejoined the cluster.", idx, id) | 
 | 	return nil | 
 | } | 
 |  | 
// Close cancels the running cluster's context and waits for all virtualized
 | // nodes to stop. It returns an error if stopping the nodes failed, or one of | 
 | // the nodes failed to fully start in the first place. | 
 | func (c *Cluster) Close() error { | 
 | 	launch.Log("Cluster: stopping...") | 
 | 	if c.authClient != nil { | 
 | 		c.authClient.Close() | 
 | 	} | 
 | 	c.ctxC() | 
 |  | 
 | 	var errs []error | 
 | 	launch.Log("Cluster: waiting for nodes to exit...") | 
	for _, d := range c.nodesDone {
		err := <-d
		if err != nil {
			errs = append(errs, err)
		}
	}
 | 	launch.Log("Cluster: removing nodes' state files (%s) and sockets (%s).", c.launchDir, c.socketDir) | 
 | 	os.RemoveAll(c.launchDir) | 
 | 	os.RemoveAll(c.socketDir) | 
 | 	os.RemoveAll(c.metroctlDir) | 
 | 	launch.Log("Cluster: done") | 
 | 	return multierr.Combine(errs...) | 
 | } | 
 |  | 
 | // DialNode is a grpc.WithContextDialer compatible dialer which dials nodes by | 
 | // their ID. This is performed by connecting to the cluster nanoswitch via its | 
 | // SOCKS proxy, and using the cluster node list for name resolution. | 
 | // | 
 | // For example: | 
 | // | 
 | //	grpc.Dial("metropolis-deadbeef:1234", grpc.WithContextDialer(c.DialNode)) | 
 | func (c *Cluster) DialNode(_ context.Context, addr string) (net.Conn, error) { | 
 | 	host, port, err := net.SplitHostPort(addr) | 
 | 	if err != nil { | 
 | 		return nil, fmt.Errorf("invalid host:port: %w", err) | 
 | 	} | 
 | 	// Already an IP address? | 
 | 	if net.ParseIP(host) != nil { | 
 | 		return c.socksDialer.Dial("tcp", addr) | 
 | 	} | 
 |  | 
 | 	// Otherwise, expect a node name. | 
 | 	node, ok := c.Nodes[host] | 
 | 	if !ok { | 
 | 		return nil, fmt.Errorf("unknown node %q", host) | 
 | 	} | 
 | 	addr = net.JoinHostPort(node.ManagementAddress, port) | 
 | 	return c.socksDialer.Dial("tcp", addr) | 
 | } | 
 |  | 
 | // GetKubeClientSet gets a Kubernetes client set accessing the Metropolis | 
 | // Kubernetes authenticating proxy using the cluster owner identity. | 
 | // It currently has access to everything (i.e. the cluster-admin role) | 
 | // via the owner-admin binding. | 
 | func (c *Cluster) GetKubeClientSet() (kubernetes.Interface, error) { | 
 | 	pkcs8Key, err := x509.MarshalPKCS8PrivateKey(c.Owner.PrivateKey) | 
 | 	if err != nil { | 
 | 		// We explicitly pass an Ed25519 private key in, so this can't happen | 
 | 		panic(err) | 
 | 	} | 
 |  | 
 | 	host := net.JoinHostPort(c.NodeIDs[0], node.KubernetesAPIWrappedPort.PortString()) | 
 | 	clientConfig := rest.Config{ | 
 | 		Host: host, | 
 | 		TLSClientConfig: rest.TLSClientConfig{ | 
 | 			// TODO(q3k): use CA certificate | 
 | 			Insecure:   true, | 
 | 			ServerName: "kubernetes.default.svc", | 
 | 			CertData:   pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: c.Owner.Certificate[0]}), | 
 | 			KeyData:    pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: pkcs8Key}), | 
 | 		}, | 
 | 		Dial: func(ctx context.Context, network, address string) (net.Conn, error) { | 
 | 			return c.DialNode(ctx, address) | 
 | 		}, | 
 | 	} | 
 | 	return kubernetes.NewForConfig(&clientConfig) | 
 | } | 
 |  | 
 | // KubernetesControllerNodeAddresses returns the list of IP addresses of nodes | 
// which are currently Kubernetes controllers, i.e. run an apiserver. This list
// might be empty if no node is currently configured with the
// KubernetesController role.
 | func (c *Cluster) KubernetesControllerNodeAddresses(ctx context.Context) ([]string, error) { | 
 | 	curC, err := c.CuratorClient() | 
 | 	if err != nil { | 
 | 		return nil, err | 
 | 	} | 
 | 	mgmt := apb.NewManagementClient(curC) | 
 | 	srv, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{ | 
 | 		Filter: "has(node.roles.kubernetes_controller)", | 
 | 	}) | 
 | 	if err != nil { | 
 | 		return nil, err | 
 | 	} | 
 | 	defer srv.CloseSend() | 
 | 	var res []string | 
 | 	for { | 
 | 		n, err := srv.Recv() | 
 | 		if err == io.EOF { | 
 | 			break | 
 | 		} | 
 | 		if err != nil { | 
 | 			return nil, err | 
 | 		} | 
 | 		if n.Status == nil || n.Status.ExternalAddress == "" { | 
 | 			continue | 
 | 		} | 
 | 		res = append(res, n.Status.ExternalAddress) | 
 | 	} | 
 | 	return res, nil | 
 | } | 
 |  | 
 | // AllNodesHealthy returns nil if all the nodes in the cluster are seemingly | 
 | // healthy. | 
 | func (c *Cluster) AllNodesHealthy(ctx context.Context) error { | 
 | 	// Get an authenticated owner client within the cluster. | 
 | 	curC, err := c.CuratorClient() | 
 | 	if err != nil { | 
 | 		return err | 
 | 	} | 
 | 	mgmt := apb.NewManagementClient(curC) | 
 | 	nodes, err := getNodes(ctx, mgmt) | 
 | 	if err != nil { | 
 | 		return err | 
 | 	} | 
 |  | 
 | 	var unhealthy []string | 
 | 	for _, node := range nodes { | 
 | 		if node.Health == apb.Node_HEALTHY { | 
 | 			continue | 
 | 		} | 
 | 		unhealthy = append(unhealthy, node.Id) | 
 | 	} | 
 | 	if len(unhealthy) == 0 { | 
 | 		return nil | 
 | 	} | 
 | 	return fmt.Errorf("nodes unhealthy: %s", strings.Join(unhealthy, ", ")) | 
 | } | 
 |  | 
 | // ApproveNode approves a node by ID, waiting for it to become UP. | 
 | func (c *Cluster) ApproveNode(ctx context.Context, id string) error { | 
 | 	curC, err := c.CuratorClient() | 
 | 	if err != nil { | 
 | 		return err | 
 | 	} | 
 | 	mgmt := apb.NewManagementClient(curC) | 
 |  | 
 | 	_, err = mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{ | 
 | 		Pubkey: c.Nodes[id].Pubkey, | 
 | 	}) | 
 | 	if err != nil { | 
 | 		return fmt.Errorf("ApproveNode: %w", err) | 
 | 	} | 
 | 	launch.Log("Cluster: %s: approved, waiting for UP", id) | 
 | 	for { | 
 | 		nodes, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{}) | 
 | 		if err != nil { | 
 | 			return fmt.Errorf("GetNodes: %w", err) | 
 | 		} | 
 | 		found := false | 
 | 		for { | 
 | 			node, err := nodes.Recv() | 
 | 			if errors.Is(err, io.EOF) { | 
 | 				break | 
 | 			} | 
 | 			if err != nil { | 
 | 				return fmt.Errorf("Nodes.Recv: %w", err) | 
 | 			} | 
 | 			if node.Id != id { | 
 | 				continue | 
 | 			} | 
 | 			if node.State != cpb.NodeState_NODE_STATE_UP { | 
 | 				continue | 
 | 			} | 
 | 			found = true | 
 | 			break | 
 | 		} | 
 | 		nodes.CloseSend() | 
 |  | 
 | 		if found { | 
 | 			break | 
 | 		} | 
 | 		time.Sleep(time.Second) | 
 | 	} | 
 | 	launch.Log("Cluster: %s: UP", id) | 
 | 	return nil | 
 | } | 
 |  | 
 | // MakeKubernetesWorker adds the KubernetesWorker role to a node by ID. | 
 | func (c *Cluster) MakeKubernetesWorker(ctx context.Context, id string) error { | 
 | 	curC, err := c.CuratorClient() | 
 | 	if err != nil { | 
 | 		return err | 
 | 	} | 
 | 	mgmt := apb.NewManagementClient(curC) | 
 |  | 
 | 	tr := true | 
 | 	launch.Log("Cluster: %s: adding KubernetesWorker", id) | 
 | 	_, err = mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{ | 
 | 		Node: &apb.UpdateNodeRolesRequest_Id{ | 
 | 			Id: id, | 
 | 		}, | 
 | 		KubernetesWorker: &tr, | 
 | 	}) | 
 | 	return err | 
 | } |