// Package cluster builds on the launch package and implements launching
// Metropolis nodes and clusters in a virtualized environment using qemu. It's
// kept in a separate package as it depends on a Metropolis node image, which
// might not be required for some uses of the launch library.
package cluster

import (
	"bytes"
	"context"
	"crypto/ed25519"
	"crypto/rand"
	"crypto/tls"
	"crypto/x509"
	"encoding/pem"
	"errors"
	"fmt"
	"io"
	"net"
	"net/http"
	"os"
	"os/exec"
	"path"
	"path/filepath"
	"strconv"
	"strings"
	"syscall"
	"time"

	"github.com/bazelbuild/rules_go/go/runfiles"
	"github.com/cenkalti/backoff/v4"
	"go.uber.org/multierr"
	"golang.org/x/net/proxy"
	"golang.org/x/sys/unix"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/proto"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"

	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
	apb "source.monogon.dev/metropolis/proto/api"
	cpb "source.monogon.dev/metropolis/proto/common"

	"source.monogon.dev/go/qcow2"
	metroctl "source.monogon.dev/metropolis/cli/metroctl/core"
	"source.monogon.dev/metropolis/node"
	"source.monogon.dev/metropolis/node/core/identity"
	"source.monogon.dev/metropolis/node/core/rpc"
	"source.monogon.dev/metropolis/node/core/rpc/resolver"
	"source.monogon.dev/metropolis/pkg/localregistry"
	"source.monogon.dev/metropolis/test/launch"
)

const (
	// nodeNumberKey is the key of the node label used to carry a node's numerical
	// index in the test system. LaunchCluster sets it on every node it creates
	// ("0" for the bootstrap node, "i" for the i-th additional node) and parses
	// it back to map Metropolis node IDs onto indices of Cluster.NodeIDs.
	nodeNumberKey string = "test-node-number"
)

// NodeOptions contains all options that can be passed to LaunchNode().
// LaunchNode fills in Mac and Runtime when they are nil, so the same
// NodeOptions value can be passed again to relaunch a node with its state.
type NodeOptions struct {
	// Name is a human-readable identifier to be used in debug output.
	Name string

	// Ports contains the port mapping where to expose the internal ports of the VM to
	// the host. See IdentityPortMap() and ConflictFreePortMap(). Ignored when
	// ConnectToSocket is set; otherwise turned into SLIRP hostfwd rules.
	Ports launch.PortMap

	// If set to true, reboots are honored. Otherwise QEMU is started with
	// -no-reboot, so a guest reboot makes QEMU (and thus the launched node) exit.
	// Metropolis nodes generally restart on almost all errors, so unless you
	// want to test reboot behavior this should be false.
	AllowReboot bool

	// By default, the VM is connected to the Host via SLIRP. If ConnectToSocket is
	// set, it is instead connected to the given file descriptor/socket, which is
	// handed to QEMU as fd 3. If this is set, all port maps from the Ports option
	// are ignored. Intended for networking this instance together with others for
	// running more complex network configurations.
	ConnectToSocket *os.File

	// When PcapDump is set, all traffic is dumped to a pcap file in the
	// runtime directory (e.g. "net0.pcap" for the first interface).
	PcapDump bool

	// SerialPort is an io.ReadWriter over which you can communicate with the serial
	// port of the machine. It can be set to an existing file descriptor (like
	// os.Stdout/os.Stderr) or any Go structure implementing this interface.
	// LaunchNode attaches it as QEMU's stdout (QEMU runs with -serial stdio), so
	// only its write side is used there.
	SerialPort io.ReadWriter

	// NodeParameters is passed into the VM and subsequently used for bootstrapping or
	// registering into a cluster. It is serialized to parameters.pb in the node's
	// runtime directory and exposed to the guest via QEMU's fw_cfg.
	NodeParameters *apb.NodeParameters

	// Mac is the node's MAC address. If nil, LaunchNode generates a random
	// locally administered address and stores it here.
	Mac *net.HardwareAddr

	// Runtime keeps the node's QEMU runtime state. If nil, LaunchNode creates it
	// on first launch and stores it here.
	Runtime *NodeRuntime
}

// NodeRuntime keeps the node's QEMU runtime options. It is created on a node's
// first launch and reused on later launches, so that the node keeps its disk,
// firmware variables and TPM state across restarts.
type NodeRuntime struct {
	// ld points at the node's launch directory storing data such as storage
	// images, firmware variables or the TPM state.
	ld string
	// sd points at the node's socket directory (holding the swtpm control
	// socket).
	sd string

	// ctxT is the context QEMU (and its swtpm companion) will execute in. It is
	// replaced with a fresh child of the caller's context on every launch.
	ctxT context.Context
	// CtxC is the QEMU context's cancellation function. Calling it stops the
	// node's QEMU and swtpm processes; LaunchNode also calls any previous CtxC
	// before creating a new context.
	CtxC context.CancelFunc
}

// NodePorts is the list of ports a fully operational Metropolis node listens
// on. Each port appears exactly once, so the list can be used directly to
// build port maps without allocating duplicate forwards.
var NodePorts = []node.Port{
	node.ConsensusPort,

	node.CuratorServicePort,
	node.DebugServicePort,

	node.KubernetesAPIPort,
	node.KubernetesAPIWrappedPort,
	node.DebuggerPort,
	node.MetricsPort,
}

// setupRuntime creates the node's QEMU runtime directory, together with all
// files required to preserve its state, a level below the chosen path ld. The
// node's socket directory is similarly created a level below sd. It may
// return an I/O error.
//
// The state directory receives image.qcow2, a copy-on-write QCOW2 image backed
// by the prebuilt node image, and a private copy of OVMF_VARS.fd.
func setupRuntime(ld, sd string) (*NodeRuntime, error) {
	// Create a temporary directory to keep all the runtime files.
	stdp, err := os.MkdirTemp(ld, "node_state*")
	if err != nil {
		return nil, fmt.Errorf("failed to create the state directory: %w", err)
	}

	// Initialize the node's storage with a prebuilt image.
	si, err := runfiles.Rlocation("_main/metropolis/node/image.img")
	if err != nil {
		return nil, fmt.Errorf("while resolving a path: %w", err)
	}

	di := filepath.Join(stdp, "image.qcow2")
	launch.Log("Cluster: generating node QCOW2 snapshot image: %s -> %s", si, di)

	df, err := os.Create(di)
	if err != nil {
		return nil, fmt.Errorf("while opening image for writing: %w", err)
	}
	if err := qcow2.Generate(df, qcow2.GenerateWithBackingFile(si)); err != nil {
		df.Close()
		return nil, fmt.Errorf("while creating copy-on-write node image: %w", err)
	}
	// Close explicitly rather than via defer: a failed close of a freshly
	// written file can mean its data was not persisted, so report it instead
	// of handing a possibly broken image to QEMU.
	if err := df.Close(); err != nil {
		return nil, fmt.Errorf("while closing copy-on-write node image: %w", err)
	}

	// Initialize the OVMF firmware variables file.
	sv, err := runfiles.Rlocation("edk2/OVMF_VARS.fd")
	if err != nil {
		return nil, fmt.Errorf("while resolving a path: %w", err)
	}
	dv := filepath.Join(stdp, filepath.Base(sv))
	if err := copyFile(sv, dv); err != nil {
		return nil, fmt.Errorf("while copying firmware variables: %w", err)
	}

	// Create the socket directory.
	sotdp, err := os.MkdirTemp(sd, "node_sock*")
	if err != nil {
		return nil, fmt.Errorf("failed to create the socket directory: %w", err)
	}

	return &NodeRuntime{
		ld: stdp,
		sd: sotdp,
	}, nil
}

// CuratorClient returns an authenticated owner connection to a Curator
// instance within Cluster c, or nil together with an error.
//
// The connection is built lazily on the first call: all known node IDs are
// registered with a Metropolis resolver, and connections are dialed through
// c.DialNode. The resulting *grpc.ClientConn is cached in c.authClient and
// returned as-is by later calls. Access to the cache is not synchronized.
func (c *Cluster) CuratorClient() (*grpc.ClientConn, error) {
	if c.authClient != nil {
		return c.authClient, nil
	}

	creds := rpc.NewAuthenticatedCredentials(c.Owner, rpc.WantInsecure())
	res := resolver.New(c.ctxT, resolver.WithLogger(func(f string, args ...interface{}) {
		launch.Log("Cluster: client resolver: %s", fmt.Sprintf(f, args...))
	}))
	for _, id := range c.NodeIDs {
		endpoint, err := resolver.NodeWithDefaultPort(id)
		if err != nil {
			return nil, fmt.Errorf("could not add node %q by DNS: %w", id, err)
		}
		res.AddEndpoint(endpoint)
	}

	conn, err := grpc.Dial(resolver.MetropolisControlAddress,
		grpc.WithTransportCredentials(creds),
		grpc.WithResolvers(res),
		grpc.WithContextDialer(c.DialNode),
	)
	if err != nil {
		return nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
	}
	c.authClient = conn
	return conn, nil
}

// LaunchNode launches a single Metropolis node instance with the given options.
// The instance runs mostly paravirtualized but with some emulated hardware
// similar to how a cloud provider might set up its VMs. The disk is fully
// writable, and the changes are kept across reboots and shutdowns. ld and sd
// point to the launch directory and the socket directory, holding the nodes'
// state files (storage, tpm state, firmware state), and UNIX socket files
// (swtpm <-> QEMU interplay) respectively. The directories must exist before
// LaunchNode is called. LaunchNode will update options.Runtime and options.Mac
// if either are not initialized.
//
// LaunchNode returns as soon as QEMU has been started in the background. When
// QEMU exits, exactly one value is sent on doneC:
//   - nil if QEMU was killed by SIGKILL (e.g. because its context was canceled),
//   - a *launch.QEMUError carrying QEMU's stderr for other exit failures,
//   - otherwise whatever running QEMU returned (nil on a clean exit).
//
// The send blocks, so doneC should be buffered or actively drained.
func LaunchNode(ctx context.Context, ld, sd string, tpmFactory *TPMFactory, options *NodeOptions, doneC chan error) error {
	// TODO(mateusz@monogon.tech) try using QEMU's abstract socket namespace instead
	// of /tmp (requires QEMU version >5.0).
	// https://github.com/qemu/qemu/commit/776b97d3605ed0fc94443048fdf988c7725e38a9).
	// swtpm accepts already-open FDs so we can pass in an abstract socket namespace FD
	// that we open and pass the name of it to QEMU. Not pinning this crashes both
	// swtpm and qemu because we run into UNIX socket length limitations (for legacy
	// reasons 108 chars).

	// If it's the node's first start, set up its runtime directories.
	if options.Runtime == nil {
		r, err := setupRuntime(ld, sd)
		if err != nil {
			return fmt.Errorf("while setting up node runtime: %w", err)
		}
		options.Runtime = r
	}

	// Replace the node's context with a new one.
	r := options.Runtime
	if r.CtxC != nil {
		r.CtxC()
	}
	r.ctxT, r.CtxC = context.WithCancel(ctx)

	var qemuNetType string
	var qemuNetConfig launch.QemuValue
	if options.ConnectToSocket != nil {
		qemuNetType = "socket"
		qemuNetConfig = launch.QemuValue{
			"id": {"net0"},
			// ConnectToSocket is passed via ExtraFiles, which makes it fd 3.
			"fd": {"3"},
		}
	} else {
		qemuNetType = "user"
		qemuNetConfig = launch.QemuValue{
			"id":        {"net0"},
			"net":       {"10.42.0.0/24"},
			"dhcpstart": {"10.42.0.10"},
			"hostfwd":   options.Ports.ToQemuForwards(),
		}
	}

	// Generate the node's MAC address if it isn't already set in NodeOptions.
	if options.Mac == nil {
		mac, err := generateRandomEthernetMAC()
		if err != nil {
			return err
		}
		options.Mac = mac
	}

	ovmfCodePath, err := runfiles.Rlocation("edk2/OVMF_CODE.fd")
	if err != nil {
		return err
	}

	tpmSocketPath := filepath.Join(r.sd, "tpm-socket")
	fwVarPath := filepath.Join(r.ld, "OVMF_VARS.fd")
	storagePath := filepath.Join(r.ld, "image.qcow2")
	qemuArgs := []string{
		"-machine", "q35", "-accel", "kvm", "-nographic", "-nodefaults", "-m", "2048",
		"-cpu", "host", "-smp", "sockets=1,cpus=1,cores=2,threads=2,maxcpus=4",
		"-drive", "if=pflash,format=raw,readonly=on,file=" + ovmfCodePath,
		"-drive", "if=pflash,format=raw,file=" + fwVarPath,
		"-drive", "if=virtio,format=qcow2,cache=unsafe,file=" + storagePath,
		"-netdev", qemuNetConfig.ToOption(qemuNetType),
		"-device", "virtio-net-pci,netdev=net0,mac=" + options.Mac.String(),
		"-chardev", "socket,id=chrtpm,path=" + tpmSocketPath,
		"-tpmdev", "emulator,id=tpm0,chardev=chrtpm",
		"-device", "tpm-tis,tpmdev=tpm0",
		"-device", "virtio-rng-pci",
		"-serial", "stdio",
	}

	if !options.AllowReboot {
		qemuArgs = append(qemuArgs, "-no-reboot")
	}

	if options.NodeParameters != nil {
		parametersPath := filepath.Join(r.ld, "parameters.pb")
		parametersRaw, err := proto.Marshal(options.NodeParameters)
		if err != nil {
			return fmt.Errorf("failed to encode node parameters: %w", err)
		}
		if err := os.WriteFile(parametersPath, parametersRaw, 0o644); err != nil {
			return fmt.Errorf("failed to write node parameters: %w", err)
		}
		qemuArgs = append(qemuArgs, "-fw_cfg", "name=dev.monogon.metropolis/parameters.pb,file="+parametersPath)
	}

	if options.PcapDump {
		qemuNetDump := launch.QemuValue{
			"id":     {"net0"},
			"netdev": {"net0"},
			"file":   {filepath.Join(r.ld, "net0.pcap")},
		}
		qemuArgs = append(qemuArgs, "-object", qemuNetDump.ToOption("filter-dump"))
	}

	// Manufacture TPM if needed.
	tpmd := filepath.Join(r.ld, "tpm")
	err = tpmFactory.Manufacture(ctx, tpmd, &TPMPlatform{
		Manufacturer: "Monogon",
		Version:      "1.0",
		Model:        "TestCluster",
	})
	if err != nil {
		return fmt.Errorf("could not manufacture TPM: %w", err)
	}

	// Start TPM emulator as a subprocess
	swtpm, err := runfiles.Rlocation("swtpm/swtpm")
	if err != nil {
		return fmt.Errorf("could not find swtpm: %w", err)
	}

	// A previous swtpm instance for this node (e.g. before a reboot) was killed
	// through its context and could not clean up its control socket. Remove it,
	// otherwise the wait loop below would see the stale socket and start QEMU
	// before the new swtpm instance is listening.
	if err := os.Remove(tpmSocketPath); err != nil && !os.IsNotExist(err) {
		return fmt.Errorf("while removing stale TPM socket: %w", err)
	}

	tpmCtx, tpmCancel := context.WithCancel(options.Runtime.ctxT)

	tpmEmuCmd := exec.CommandContext(tpmCtx, swtpm, "socket", "--tpm2", "--tpmstate", "dir="+tpmd, "--ctrl", "type=unixio,path="+tpmSocketPath)
	// Silence warnings from unsafe libtpms build (uses non-constant-time
	// cryptographic operations). NOTE: Env is nil at this point, so swtpm runs
	// with only this variable set instead of inheriting our environment.
	tpmEmuCmd.Env = append(tpmEmuCmd.Env, "MONOGON_LIBTPMS_ACKNOWLEDGE_UNSAFE=yes")
	tpmEmuCmd.Stderr = os.Stderr
	tpmEmuCmd.Stdout = os.Stdout

	err = tpmEmuCmd.Start()
	if err != nil {
		tpmCancel()
		return fmt.Errorf("failed to start TPM emulator: %w", err)
	}

	// Wait for the socket to be created by the TPM emulator before launching
	// QEMU.
	for {
		_, err := os.Stat(tpmSocketPath)
		if err == nil {
			break
		}
		if !os.IsNotExist(err) {
			tpmCancel()
			return fmt.Errorf("while stat-ing TPM socket path: %w", err)
		}
		if err := tpmCtx.Err(); err != nil {
			tpmCancel()
			return fmt.Errorf("while waiting for the TPM socket: %w", err)
		}
		time.Sleep(time.Millisecond * 100)
	}

	// Start the main qemu binary
	systemCmd := exec.CommandContext(options.Runtime.ctxT, "qemu-system-x86_64", qemuArgs...)
	if options.ConnectToSocket != nil {
		systemCmd.ExtraFiles = []*os.File{options.ConnectToSocket}
	}

	var stdErrBuf bytes.Buffer
	systemCmd.Stderr = &stdErrBuf
	systemCmd.Stdout = options.SerialPort

	launch.PrettyPrintQemuArgs(options.Name, systemCmd.Args)

	go func() {
		launch.Log("Node: Starting...")
		// Use a goroutine-local error: this runs after LaunchNode has returned
		// and must not write to variables shared with the enclosing function.
		runErr := systemCmd.Run()
		launch.Log("Node: Returned: %v", runErr)

		// Stop TPM emulator and wait for it to exit to properly reap the child process
		tpmCancel()
		launch.Log("Node: Waiting for TPM emulator to exit")
		// Wait returns a SIGKILL error because we just cancelled its context.
		// We still need to call it to avoid creating zombies.
		errTpm := tpmEmuCmd.Wait()
		launch.Log("Node: TPM emulator done: %v", errTpm)

		var exerr *exec.ExitError
		if runErr != nil && errors.As(runErr, &exerr) {
			ws := exerr.ProcessState.Sys().(syscall.WaitStatus)
			if ws.Signaled() && ws.Signal() == syscall.SIGKILL {
				// Process was killed externally (most likely by our context being canceled).
				// This is a normal exit for us, so return nil
				doneC <- nil
				return
			}
			exerr.Stderr = stdErrBuf.Bytes()
			newErr := launch.QEMUError(*exerr)
			launch.Log("Node: %q", stdErrBuf.String())
			doneC <- &newErr
			return
		}
		doneC <- runErr
	}()
	return nil
}

func copyFile(src, dst string) error {
	in, err := os.Open(src)
	if err != nil {
		return fmt.Errorf("when opening source: %w", err)
	}
	defer in.Close()

	out, err := os.Create(dst)
	if err != nil {
		return fmt.Errorf("when creating destination: %w", err)
	}
	defer out.Close()

	endPos, err := in.Seek(0, io.SeekEnd)
	if err != nil {
		return fmt.Errorf("when getting source end: %w", err)
	}

	// Copy the file while preserving its sparseness. The image files are very
	// sparse (less than 10% allocated), so this is a lot faster.
	var lastHoleStart int64
	for {
		dataStart, err := in.Seek(lastHoleStart, unix.SEEK_DATA)
		if err != nil {
			return fmt.Errorf("when seeking to next data block: %w", err)
		}
		holeStart, err := in.Seek(dataStart, unix.SEEK_HOLE)
		if err != nil {
			return fmt.Errorf("when seeking to next hole: %w", err)
		}
		lastHoleStart = holeStart
		if _, err := in.Seek(dataStart, io.SeekStart); err != nil {
			return fmt.Errorf("when seeking to current data block: %w", err)
		}
		if _, err := out.Seek(dataStart, io.SeekStart); err != nil {
			return fmt.Errorf("when seeking output to next data block: %w", err)
		}
		if _, err := io.CopyN(out, in, holeStart-dataStart); err != nil {
			return fmt.Errorf("when copying file: %w", err)
		}
		if endPos == holeStart {
			// The next hole is at the end of the file, we're done here.
			break
		}
	}

	return out.Close()
}

// getNodes calls Management.GetNodes and drains the returned stream into a
// slice. The whole call, including reading the stream, is retried under an
// exponential backoff bound to ctx; partial results from a failed attempt are
// discarded.
func getNodes(ctx context.Context, mgmt apb.ManagementClient) ([]*apb.Node, error) {
	var nodes []*apb.Node
	fetch := func() error {
		nodes = nil
		stream, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
		if err != nil {
			return fmt.Errorf("GetNodes: %w", err)
		}
		for {
			n, err := stream.Recv()
			switch {
			case err == io.EOF:
				return nil
			case err != nil:
				return fmt.Errorf("GetNodes.Recv: %w", err)
			}
			nodes = append(nodes, n)
		}
	}
	if err := backoff.Retry(fetch, backoff.WithContext(backoff.NewExponentialBackOff(), ctx)); err != nil {
		return nil, err
	}
	return nodes, nil
}

// getNode fetches all nodes via getNodes and returns the one whose node ID
// (derived from its public key) equals id. It fails if the node list cannot
// be retrieved or if no node matches.
func getNode(ctx context.Context, mgmt apb.ManagementClient, id string) (*apb.Node, error) {
	nodes, err := getNodes(ctx, mgmt)
	if err != nil {
		return nil, fmt.Errorf("could not get nodes: %w", err)
	}
	for _, candidate := range nodes {
		if identity.NodeID(candidate.Pubkey) == id {
			return candidate, nil
		}
	}
	return nil, fmt.Errorf("no such node")
}

// Gets a random EUI-48 Ethernet MAC address
func generateRandomEthernetMAC() (*net.HardwareAddr, error) {
	macBuf := make([]byte, 6)
	_, err := rand.Read(macBuf)
	if err != nil {
		return nil, fmt.Errorf("failed to read randomness for MAC: %w", err)
	}

	// Set U/L bit and clear I/G bit (locally administered individual MAC)
	// Ref IEEE 802-2014 Section 8.2.2
	macBuf[0] = (macBuf[0] | 2) & 0xfe
	mac := net.HardwareAddr(macBuf)
	return &mac, nil
}

// SOCKSPort is the nanoswitch port exposing a SOCKS5 proxy into the switch
// network. LaunchCluster maps it to a host port (see Cluster.Ports) and dials
// nodes through it.
const SOCKSPort uint16 = 1080

// ClusterPorts contains all ports handled by Nanoswitch. LaunchCluster
// allocates a conflict-free host port for each of them, exposed as
// Cluster.Ports.
var ClusterPorts = []uint16{
	// Forwarded to the first node.
	uint16(node.CuratorServicePort),
	uint16(node.DebugServicePort),
	uint16(node.KubernetesAPIPort),
	uint16(node.KubernetesAPIWrappedPort),

	// SOCKS proxy to the switch network
	SOCKSPort,
}

// ClusterOptions contains all options for launching a Metropolis cluster.
type ClusterOptions struct {
	// The number of nodes this cluster should be started with. Must be at least
	// 1; LaunchCluster refuses to start otherwise.
	NumNodes int

	// If true, node logs will be saved to individual files instead of being printed
	// out to stderr. The paths of these files will still be logged.
	//
	// The files (node-N.txt for the nodes, nanoswitch.txt for the switch) will be
	// located within the launch directory inside TEST_TMPDIR (or the default
	// tempdir location, if not set).
	NodeLogsToFiles bool

	// LeaveNodesNew, if set, will leave all non-bootstrap nodes in NEW, without
	// bootstrapping them. The nodes' address information in Cluster.Nodes will be
	// incomplete.
	LeaveNodesNew bool

	// Optional local registry which will be made available to the cluster to
	// pull images from. This is a more efficient alternative to preseeding all
	// images used for testing.
	LocalRegistry *localregistry.Server

	// InitialClusterConfiguration will be passed to the first node when creating the
	// cluster, and defines some basic properties of the cluster. If not specified,
	// the cluster will default to defaults as defined in
	// metropolis.proto.api.NodeParameters.
	InitialClusterConfiguration *cpb.ClusterConfiguration
}

// Cluster is the running Metropolis cluster launched using the LaunchCluster
// function.
type Cluster struct {
	// Owner is the TLS Certificate of the owner of the test cluster. This can be
	// used to authenticate further clients to the running cluster.
	Owner tls.Certificate
	// Ports is the PortMap used to access the first node's services (defined in
	// ClusterPorts) and the SOCKS proxy (at SOCKSPort).
	Ports launch.PortMap

	// Nodes is a map from Node ID to its runtime information.
	Nodes map[string]*NodeInCluster
	// NodeIDs is a list of node IDs that are backing this cluster, in order of
	// creation. Index 0 is the bootstrap node; the index of every other node
	// matches its test-node-number label and is the index used by RebootNode.
	NodeIDs []string

	// CACertificate is the cluster's CA certificate.
	CACertificate *x509.Certificate

	// nodesDone is a list of channels populated with the return codes from all the
	// nodes' qemu instances. It's used by Close to ensure all nodes have
	// successfully been stopped.
	nodesDone []chan error
	// nodeOpts are the cluster member nodes' mutable launch options, kept here
	// to facilitate reboots.
	nodeOpts []NodeOptions
	// launchDir points at the directory keeping the nodes' state, such as storage
	// images, firmware variable files, TPM state.
	launchDir string
	// socketDir points at the directory keeping UNIX socket files, such as these
	// used to facilitate communication between QEMU and swtpm. It's different
	// from launchDir, and anchored nearer the file system root, due to the
	// socket path length limitation imposed by the kernel.
	socketDir   string
	// metroctlDir is the directory (under /tmp) into which the owner key and
	// certificate are written in metroctl's format.
	metroctlDir string

	// SOCKSDialer is used by DialNode to establish connections to nodes via the
	// SOCKS server run by nanoswitch.
	SOCKSDialer proxy.Dialer

	// authClient is a cached authenticated owner connection to a Curator
	// instance within the cluster.
	authClient *grpc.ClientConn

	// ctxT is the context individual node contexts are created from.
	ctxT context.Context
	// ctxC is used by Close to cancel the context under which the nodes are
	// running.
	ctxC context.CancelFunc

	// tpmFactory manufactures the emulated TPMs of nodes when they are
	// (re)launched.
	tpmFactory *TPMFactory
}

// NodeInCluster represents information about a node that's part of a Cluster.
type NodeInCluster struct {
	// ID of the node, which can be used to dial this node's services via DialNode.
	ID     string
	// Pubkey is the node's public key as reported by Management.GetNodes, when
	// known.
	Pubkey []byte
	// Address of the node on the network run by nanoswitch. Not reachable from the
	// host unless dialed via DialNode or via the nanoswitch SOCKS proxy (reachable
	// on Cluster.Ports[SOCKSPort]). Empty for nodes left in NEW.
	ManagementAddress string
}

// firstConnection performs the initial owner credential escrow with a newly
// started nanoswitch-backed cluster over SOCKS. It expects the first node to be
// running at 10.1.0.2, which is always the case with the current nanoswitch
// implementation.
//
// It returns the newly escrowed credentials as well as the first node's
// information as NodeInCluster (including its public key, like every other
// node in Cluster.Nodes).
func firstConnection(ctx context.Context, socksDialer proxy.Dialer) (*tls.Certificate, *NodeInCluster, error) {
	// Dial external service.
	remote := fmt.Sprintf("10.1.0.2:%s", node.CuratorServicePort.PortString())
	initCreds, err := rpc.NewEphemeralCredentials(InsecurePrivateKey, rpc.WantInsecure())
	if err != nil {
		return nil, nil, fmt.Errorf("NewEphemeralCredentials: %w", err)
	}
	initDialer := func(_ context.Context, addr string) (net.Conn, error) {
		return socksDialer.Dial("tcp", addr)
	}
	initClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(initCreds))
	if err != nil {
		return nil, nil, fmt.Errorf("dialing with ephemeral credentials failed: %w", err)
	}
	defer initClient.Close()

	// Retrieve owner certificate - this can take a while because the node is still
	// coming up, so do it in a backoff loop. Only UNAVAILABLE is retried; any
	// other error is permanent.
	launch.Log("Cluster: retrieving owner certificate (this can take a few seconds while the first node boots)...")
	aaa := apb.NewAAAClient(initClient)
	var cert *tls.Certificate
	err = backoff.Retry(func() error {
		var rerr error
		cert, rerr = rpc.RetrieveOwnerCertificate(ctx, aaa, InsecurePrivateKey)
		if rerr == nil {
			return nil
		}
		if st, ok := status.FromError(rerr); ok && st.Code() == codes.Unavailable {
			launch.Log("Cluster: cluster UNAVAILABLE: %v", st.Message())
			return rerr
		}
		return backoff.Permanent(rerr)
	}, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
	if err != nil {
		return nil, nil, fmt.Errorf("couldn't retrieve owner certificate: %w", err)
	}
	launch.Log("Cluster: retrieved owner certificate.")

	// Now connect authenticated and get the node ID.
	creds := rpc.NewAuthenticatedCredentials(*cert, rpc.WantInsecure())
	authClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(creds))
	if err != nil {
		return nil, nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
	}
	defer authClient.Close()
	mgmt := apb.NewManagementClient(authClient)

	// Named "first" rather than "node" to avoid shadowing the node package.
	var first *NodeInCluster
	err = backoff.Retry(func() error {
		nodes, err := getNodes(ctx, mgmt)
		if err != nil {
			return fmt.Errorf("retrieving nodes failed: %w", err)
		}
		if len(nodes) != 1 {
			return fmt.Errorf("expected one node, got %d", len(nodes))
		}
		n := nodes[0]
		if n.Status == nil || n.Status.ExternalAddress == "" {
			return fmt.Errorf("node has no status and/or address")
		}
		first = &NodeInCluster{
			ID:                identity.NodeID(n.Pubkey),
			Pubkey:            n.Pubkey,
			ManagementAddress: n.Status.ExternalAddress,
		}
		return nil
	}, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
	if err != nil {
		return nil, nil, err
	}

	return cert, first, nil
}

// NewSerialFileLogger opens the file at p for use as a VM's serial port
// output, creating it if needed and truncating any previous content so that
// stale bytes from an earlier run cannot survive past the new output. The file
// is opened write-only, so only the writing half of the returned io.ReadWriter
// is usable.
func NewSerialFileLogger(p string) (io.ReadWriter, error) {
	f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o600)
	if err != nil {
		return nil, err
	}
	return f, nil
}

// LaunchCluster launches a cluster of Metropolis node VMs together with a
// Nanoswitch instance to network them all together.
//
// The given context will be used to run all qemu instances in the cluster, and
// canceling the context or calling Close() will terminate them. If
// LaunchCluster fails after the first node has been started, it cancels the
// cluster context itself, so no VMs are left running behind the error.
func LaunchCluster(ctx context.Context, opts ClusterOptions) (*Cluster, error) {
	if opts.NumNodes <= 0 {
		return nil, errors.New("refusing to start cluster with zero nodes")
	}

	// Create the launch directory.
	ld, err := os.MkdirTemp(os.Getenv("TEST_TMPDIR"), "cluster-*")
	if err != nil {
		return nil, fmt.Errorf("failed to create the launch directory: %w", err)
	}
	// Create the metroctl config directory. We keep it in /tmp because in some
	// scenarios it's end-user visible and we want it short.
	md, err := os.MkdirTemp("/tmp", "metroctl-*")
	if err != nil {
		return nil, fmt.Errorf("failed to create the metroctl directory: %w", err)
	}

	// Create the socket directory. We keep it in /tmp because of socket path limits.
	sd, err := os.MkdirTemp("/tmp", "cluster-*")
	if err != nil {
		return nil, fmt.Errorf("failed to create the socket directory: %w", err)
	}

	// Set up TPM factory.
	tpmf, err := NewTPMFactory(filepath.Join(ld, "tpm"))
	if err != nil {
		return nil, fmt.Errorf("failed to create TPM factory: %w", err)
	}

	// Prepare links between nodes and nanoswitch.
	var switchPorts []*os.File
	var vmPorts []*os.File
	for i := 0; i < opts.NumNodes; i++ {
		switchPort, vmPort, err := launch.NewSocketPair()
		if err != nil {
			return nil, fmt.Errorf("failed to get socketpair: %w", err)
		}
		switchPorts = append(switchPorts, switchPort)
		vmPorts = append(vmPorts, vmPort)
	}

	// Make a list of channels that will be populated by all running node qemu
	// processes.
	done := make([]chan error, opts.NumNodes)
	for i := range done {
		done[i] = make(chan error, 1)
	}

	// Prepare the node options. These will be kept as part of Cluster.
	// nodeOpts[].Runtime will be initialized by LaunchNode during the first
	// launch. The runtime information can be later used to restart a node.
	// The 0th node will be initialized first. The rest will follow after it
	// had bootstrapped the cluster.
	nodeOpts := make([]NodeOptions, opts.NumNodes)
	nodeOpts[0] = NodeOptions{
		Name:            "node0",
		ConnectToSocket: vmPorts[0],
		NodeParameters: &apb.NodeParameters{
			Cluster: &apb.NodeParameters_ClusterBootstrap_{
				ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
					OwnerPublicKey:              InsecurePublicKey,
					InitialClusterConfiguration: opts.InitialClusterConfiguration,
					Labels: &cpb.NodeLabels{
						Pairs: []*cpb.NodeLabels_Pair{
							{Key: nodeNumberKey, Value: "0"},
						},
					},
				},
			},
		},
		SerialPort: newPrefixedStdio(0),
		PcapDump:   true,
	}
	if opts.NodeLogsToFiles {
		logPath := path.Join(ld, "node-1.txt")
		port, err := NewSerialFileLogger(logPath)
		if err != nil {
			return nil, fmt.Errorf("could not open log file for node 1: %w", err)
		}
		launch.Log("Node 1 logs at %s", logPath)
		nodeOpts[0].SerialPort = port
	}

	// Start the first node. From here on, every failure path must cancel the
	// cluster context so that already started VMs (and nanoswitch) are torn
	// down; the deferred function takes care of that unless we succeed.
	ctxT, ctxC := context.WithCancel(ctx)
	success := false
	defer func() {
		if !success {
			ctxC()
		}
	}()
	launch.Log("Cluster: Starting node %d...", 1)
	if err := LaunchNode(ctxT, ld, sd, tpmf, &nodeOpts[0], done[0]); err != nil {
		return nil, fmt.Errorf("failed to launch first node: %w", err)
	}

	localRegistryAddr := net.TCPAddr{
		IP:   net.IPv4(10, 42, 0, 82),
		Port: 5000,
	}

	var guestSvcMap launch.GuestServiceMap
	if opts.LocalRegistry != nil {
		l, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1)})
		if err != nil {
			return nil, fmt.Errorf("failed to create TCP listener for local registry: %w", err)
		}
		s := http.Server{
			Handler: opts.LocalRegistry,
		}
		go s.Serve(l)
		go func() {
			<-ctxT.Done()
			s.Close()
		}()
		guestSvcMap = launch.GuestServiceMap{
			&localRegistryAddr: *l.Addr().(*net.TCPAddr),
		}
	}

	// Launch nanoswitch.
	portMap, err := launch.ConflictFreePortMap(ClusterPorts)
	if err != nil {
		return nil, fmt.Errorf("failed to allocate ephemeral ports: %w", err)
	}

	go func() {
		// All errors in this goroutine are kept local: it runs concurrently
		// with the rest of LaunchCluster and must not touch its variables.
		var serialPort io.ReadWriter
		if opts.NodeLogsToFiles {
			logPath := path.Join(ld, "nanoswitch.txt")
			port, err := NewSerialFileLogger(logPath)
			if err != nil {
				launch.Log("Could not open log file for nanoswitch: %v", err)
			} else {
				launch.Log("Nanoswitch logs at %s", logPath)
				serialPort = port
			}
		}
		if serialPort == nil {
			// Either logging to stdio was requested, or the log file could not
			// be opened; in both cases fall back to prefixed stdio.
			serialPort = newPrefixedStdio(99)
		}
		kernelPath, err := runfiles.Rlocation("_main/metropolis/test/ktest/vmlinux")
		if err != nil {
			launch.Fatal("Failed to resolved nanoswitch kernel: %v", err)
		}
		initramfsPath, err := runfiles.Rlocation("_main/metropolis/test/nanoswitch/initramfs.cpio.zst")
		if err != nil {
			launch.Fatal("Failed to resolved nanoswitch initramfs: %v", err)
		}
		if err := launch.RunMicroVM(ctxT, &launch.MicroVMOptions{
			Name:                   "nanoswitch",
			KernelPath:             kernelPath,
			InitramfsPath:          initramfsPath,
			ExtraNetworkInterfaces: switchPorts,
			PortMap:                portMap,
			GuestServiceMap:        guestSvcMap,
			SerialPort:             serialPort,
			PcapDump:               path.Join(ld, "nanoswitch.pcap"),
		}); err != nil {
			if !errors.Is(err, ctxT.Err()) {
				launch.Fatal("Failed to launch nanoswitch: %v", err)
			}
		}
	}()

	// Build SOCKS dialer.
	socksRemote := fmt.Sprintf("localhost:%v", portMap[SOCKSPort])
	socksDialer, err := proxy.SOCKS5("tcp", socksRemote, nil, proxy.Direct)
	if err != nil {
		return nil, fmt.Errorf("failed to build SOCKS dialer: %w", err)
	}

	// Retrieve owner credentials and first node.
	cert, firstNode, err := firstConnection(ctxT, socksDialer)
	if err != nil {
		return nil, err
	}

	// Write credentials to the metroctl directory.
	if err := metroctl.WriteOwnerKey(md, cert.PrivateKey.(ed25519.PrivateKey)); err != nil {
		return nil, fmt.Errorf("could not write owner key: %w", err)
	}
	if err := metroctl.WriteOwnerCertificate(md, cert.Certificate[0]); err != nil {
		return nil, fmt.Errorf("could not write owner certificate: %w", err)
	}

	launch.Log("Cluster: Node %d is %s", 0, firstNode.ID)

	// Set up a partially initialized cluster instance, to be filled in the
	// later steps.
	cluster := &Cluster{
		Owner: *cert,
		Ports: portMap,
		Nodes: map[string]*NodeInCluster{
			firstNode.ID: firstNode,
		},
		NodeIDs: []string{
			firstNode.ID,
		},

		nodesDone:   done,
		nodeOpts:    nodeOpts,
		launchDir:   ld,
		socketDir:   sd,
		metroctlDir: md,

		SOCKSDialer: socksDialer,

		ctxT: ctxT,
		ctxC: ctxC,

		tpmFactory: tpmf,
	}

	// Now start the rest of the nodes and register them into the cluster.

	// Get an authenticated owner client within the cluster.
	curC, err := cluster.CuratorClient()
	if err != nil {
		return nil, fmt.Errorf("CuratorClient: %w", err)
	}
	mgmt := apb.NewManagementClient(curC)

	// Retrieve register ticket to register further nodes.
	launch.Log("Cluster: retrieving register ticket...")
	resT, err := mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{})
	if err != nil {
		return nil, fmt.Errorf("GetRegisterTicket: %w", err)
	}
	ticket := resT.Ticket
	launch.Log("Cluster: retrieved register ticket (%d bytes).", len(ticket))

	// Retrieve cluster info (for directory and ca public key) to register further
	// nodes.
	resI, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
	if err != nil {
		return nil, fmt.Errorf("GetClusterInfo: %w", err)
	}
	caCert, err := x509.ParseCertificate(resI.CaCertificate)
	if err != nil {
		return nil, fmt.Errorf("ParseCertificate: %w", err)
	}
	cluster.CACertificate = caCert

	// Use the retrieved information to configure the rest of the node options.
	for i := 1; i < opts.NumNodes; i++ {
		nodeOpts[i] = NodeOptions{
			Name:            fmt.Sprintf("node%d", i),
			ConnectToSocket: vmPorts[i],
			NodeParameters: &apb.NodeParameters{
				Cluster: &apb.NodeParameters_ClusterRegister_{
					ClusterRegister: &apb.NodeParameters_ClusterRegister{
						RegisterTicket:   ticket,
						ClusterDirectory: resI.ClusterDirectory,
						CaCertificate:    resI.CaCertificate,
						Labels: &cpb.NodeLabels{
							Pairs: []*cpb.NodeLabels_Pair{
								{Key: nodeNumberKey, Value: fmt.Sprintf("%d", i)},
							},
						},
					},
				},
			},
			SerialPort: newPrefixedStdio(i),
		}
		if opts.NodeLogsToFiles {
			logPath := path.Join(ld, fmt.Sprintf("node-%d.txt", i+1))
			port, err := NewSerialFileLogger(logPath)
			if err != nil {
				return nil, fmt.Errorf("could not open log file for node %d: %w", i+1, err)
			}
			launch.Log("Node %d logs at %s", i+1, logPath)
			nodeOpts[i].SerialPort = port
		}
	}

	// Now run the rest of the nodes.
	for i := 1; i < opts.NumNodes; i++ {
		launch.Log("Cluster: Starting node %d...", i+1)
		err := LaunchNode(ctxT, ld, sd, tpmf, &nodeOpts[i], done[i])
		if err != nil {
			return nil, fmt.Errorf("failed to launch node %d: %w", i+1, err)
		}
	}

	// Wait for nodes to appear as NEW, populate a map from node number (index into
	// NodeOpts, etc.) to Metropolis Node ID.
	wantNew := opts.NumNodes - 1
	seenNodes := make(map[string]bool)
	nodeNumberToID := make(map[int]string)
	launch.Log("Cluster: waiting for nodes to appear as NEW...")
	for len(seenNodes) < wantNew {
		nodes, err := getNodes(ctx, mgmt)
		if err != nil {
			return nil, fmt.Errorf("could not get nodes: %w", err)
		}
		for _, n := range nodes {
			if n.State != cpb.NodeState_NODE_STATE_NEW || seenNodes[n.Id] {
				continue
			}
			seenNodes[n.Id] = true
			cluster.Nodes[n.Id] = &NodeInCluster{
				ID:     n.Id,
				Pubkey: n.Pubkey,
			}

			num, err := strconv.Atoi(node.GetNodeLabel(n.Labels, nodeNumberKey))
			if err != nil {
				return nil, fmt.Errorf("node %s has undecodable number label: %w", n.Id, err)
			}
			launch.Log("Cluster: Node %d is %s", num, n.Id)
			nodeNumberToID[num] = n.Id
		}
		if len(seenNodes) < wantNew {
			time.Sleep(1 * time.Second)
		}
	}
	launch.Log("Found all expected nodes")

	// Build the rest of NodeIDs from map.
	for i := 1; i < opts.NumNodes; i++ {
		cluster.NodeIDs = append(cluster.NodeIDs, nodeNumberToID[i])
	}

	approvedNodes := make(map[string]bool)
	upNodes := make(map[string]bool)
	if !opts.LeaveNodesNew {
		for {
			nodes, err := getNodes(ctx, mgmt)
			if err != nil {
				return nil, fmt.Errorf("could not get nodes: %w", err)
			}
			for _, n := range nodes {
				if !seenNodes[n.Id] {
					// Skip nodes that weren't NEW in the previous step.
					continue
				}

				if n.State == cpb.NodeState_NODE_STATE_UP && n.Status != nil && n.Status.ExternalAddress != "" {
					launch.Log("Cluster: node %s is up", n.Id)
					upNodes[n.Id] = true
					cluster.Nodes[n.Id].ManagementAddress = n.Status.ExternalAddress
				}
				if upNodes[n.Id] {
					continue
				}

				if !approvedNodes[n.Id] {
					launch.Log("Cluster: approving node %s", n.Id)
					_, err := mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
						Pubkey: n.Pubkey,
					})
					if err != nil {
						return nil, fmt.Errorf("ApproveNode(%s): %w", n.Id, err)
					}
					approvedNodes[n.Id] = true
				}
			}

			launch.Log("Cluster: want %d up nodes, have %d", opts.NumNodes-1, len(upNodes))
			if len(upNodes) == opts.NumNodes-1 {
				break
			}
			time.Sleep(time.Second)
		}
	}

	launch.Log("Cluster: all nodes up:")
	for _, n := range cluster.Nodes {
		launch.Log("Cluster:  - %s at %s", n.ID, n.ManagementAddress)
	}
	launch.Log("Cluster: starting tests...")

	// From now on, the cluster context is owned by the returned Cluster.
	success = true
	return cluster, nil
}

// RebootNode restarts the cluster member node at position idx (0-based, in the
// same order as NodeIDs) and blocks until it reports as healthy again. Cluster
// API requests are issued with ctx, while the relaunched QEMU process is bound
// to the cluster's own context c.ctxT.
//
// An error is returned if idx is out of range, if the node is not running, if
// the previous QEMU instance exited with an error, if relaunching fails, or if
// querying the node through the management API fails.
func (c *Cluster) RebootNode(ctx context.Context, idx int) error {
	switch {
	case idx < 0 || idx >= len(c.NodeIDs):
		return fmt.Errorf("index out of bounds")
	case c.nodeOpts[idx].Runtime == nil:
		return fmt.Errorf("node not running")
	}
	id := c.NodeIDs[idx]

	// Owner-authenticated curator connection, obtained before the node is
	// taken down.
	curC, err := c.CuratorClient()
	if err != nil {
		return err
	}
	mgmt := apb.NewManagementClient(curC)

	// Cancelling the runtime context shuts down QEMU; nodesDone reports how
	// the process exited.
	c.nodeOpts[idx].Runtime.CtxC()
	launch.Log("Cluster: waiting for node %d (%s) to stop.", idx, id)
	if err := <-c.nodesDone[idx]; err != nil {
		return fmt.Errorf("while restarting node: %w", err)
	}

	launch.Log("Cluster: restarting node %d (%s).", idx, id)
	err = LaunchNode(c.ctxT, c.launchDir, c.socketDir, c.tpmFactory, &c.nodeOpts[idx], c.nodesDone[idx])
	if err != nil {
		return fmt.Errorf("failed to launch node %d: %w", idx, err)
	}

	// Only a heartbeat newer than the relaunch counts, so a HEALTHY status
	// left over from before the reboot does not end the wait.
	relaunched := time.Now()
	for {
		n, err := getNode(ctx, mgmt, id)
		if err != nil {
			launch.Log("Cluster: node get error: %v", err)
			return err
		}
		launch.Log("Cluster: node health: %+v", n.Health)

		lastHeartbeat := time.Now().Add(-n.TimeSinceHeartbeat.AsDuration())
		if n.Health == apb.Node_HEALTHY && lastHeartbeat.After(relaunched) {
			break
		}
		time.Sleep(time.Second)
	}
	launch.Log("Cluster: node %d (%s) has rejoined the cluster.", idx, id)
	return nil
}

// ShutdownNode performs an ungraceful shutdown (i.e. power off) of the node
// given by idx (0-based, same order as NodeIDs). If the node is already shut
// down, or has no runtime at all, this is a no-op.
//
// It returns an error if idx is out of range or if the node's QEMU process
// exited with an error.
func (c *Cluster) ShutdownNode(idx int) error {
	if idx < 0 || idx >= len(c.NodeIDs) {
		return fmt.Errorf("index out of bounds")
	}
	rt := c.nodeOpts[idx].Runtime
	// A nil Runtime means the node is not running, so there is nothing to
	// stop. Without this check the ctxT access below would panic.
	if rt == nil {
		return nil
	}
	// Return if node is already stopped.
	select {
	case <-rt.ctxT.Done():
		return nil
	default:
	}
	id := c.NodeIDs[idx]

	// Cancel the node's context. This will shut down QEMU.
	rt.CtxC()
	launch.Log("Cluster: waiting for node %d (%s) to stop.", idx, id)
	if err := <-c.nodesDone[idx]; err != nil {
		return fmt.Errorf("while shutting down node: %w", err)
	}
	launch.Log("Cluster: node %d (%s) stopped.", idx, id)
	return nil
}

// StartNode performs a power on of the node given by idx (0-based, same order
// as NodeIDs). If the node is already running, this is a no-op. A node whose
// Runtime is nil is treated as not running and is launched.
//
// It returns an error if idx is out of range or if launching QEMU fails.
func (c *Cluster) StartNode(idx int) error {
	if idx < 0 || idx >= len(c.NodeIDs) {
		return fmt.Errorf("index out of bounds")
	}
	id := c.NodeIDs[idx]
	// Return if node is already running, i.e. it has a runtime whose context
	// has not been cancelled. A nil Runtime would otherwise panic here.
	if rt := c.nodeOpts[idx].Runtime; rt != nil {
		select {
		case <-rt.ctxT.Done():
		default:
			return nil
		}
	}

	// Start QEMU again.
	// NOTE(review): assumes LaunchNode (re)creates nodeOpts[idx].Runtime, as
	// the reboot/restart paths rely on a fresh ctxT after relaunch.
	launch.Log("Cluster: starting node %d (%s).", idx, id)
	if err := LaunchNode(c.ctxT, c.launchDir, c.socketDir, c.tpmFactory, &c.nodeOpts[idx], c.nodesDone[idx]); err != nil {
		return fmt.Errorf("failed to launch node %d: %w", idx, err)
	}
	launch.Log("Cluster: node %d (%s) started.", idx, id)
	return nil
}

// Close cancels the running cluster's context and waits for all virtualized
// nodes to stop. It returns an error if stopping the nodes failed, or if one of
// the nodes failed to fully start in the first place.
//
// The nodes' state, socket and metroctl directories are then removed on a
// best-effort basis. Removal failures are logged but do not affect the
// returned error.
func (c *Cluster) Close() error {
	launch.Log("Cluster: stopping...")
	if c.authClient != nil {
		c.authClient.Close()
	}
	c.ctxC()

	var errs []error
	launch.Log("Cluster: waiting for nodes to exit...")
	// Each nodesDone channel yields the exit result of one node's QEMU process.
	for _, doneC := range c.nodesDone {
		if err := <-doneC; err != nil {
			errs = append(errs, err)
		}
	}
	launch.Log("Cluster: removing nodes' state files (%s) and sockets (%s).", c.launchDir, c.socketDir)
	for _, dir := range []string{c.launchDir, c.socketDir, c.metroctlDir} {
		// Leftover temporary files should not fail Close, but they are worth
		// surfacing.
		if err := os.RemoveAll(dir); err != nil {
			launch.Log("Cluster: failed to remove %s: %v", dir, err)
		}
	}
	launch.Log("Cluster: done")
	return multierr.Combine(errs...)
}

// DialNode is a grpc.WithContextDialer compatible dialer which dials nodes by
// their ID. This is performed by connecting to the cluster nanoswitch via its
// SOCKS proxy, and using the cluster node list for name resolution. IP
// literals are dialed as-is.
//
// ctx is honored (cancellation and deadline) when the SOCKS dialer implements
// proxy.ContextDialer. Otherwise the dial falls back to a context-less Dial.
//
// For example:
//
//	grpc.Dial("metropolis-deadbeef:1234", grpc.WithContextDialer(c.DialNode))
func (c *Cluster) DialNode(ctx context.Context, addr string) (net.Conn, error) {
	host, port, err := net.SplitHostPort(addr)
	if err != nil {
		return nil, fmt.Errorf("invalid host:port: %w", err)
	}
	// Anything that is not an IP address must be a known node name, which is
	// resolved to that node's management address.
	if net.ParseIP(host) == nil {
		n, ok := c.Nodes[host]
		if !ok {
			return nil, fmt.Errorf("unknown node %q", host)
		}
		addr = net.JoinHostPort(n.ManagementAddress, port)
	}
	if cd, ok := c.SOCKSDialer.(proxy.ContextDialer); ok {
		return cd.DialContext(ctx, "tcp", addr)
	}
	return c.SOCKSDialer.Dial("tcp", addr)
}

// GetKubeClientSet gets a Kubernetes client set accessing the Metropolis
// Kubernetes authenticating proxy using the cluster owner identity.
// It currently has access to everything (i.e. the cluster-admin role)
// via the owner-admin binding.
//
// The client connects to the wrapped Kubernetes API port of the first node in
// NodeIDs, dialed through DialNode. It returns an error if the cluster has no
// nodes, if the owner identity has no certificate, or if the owner key cannot
// be marshaled.
func (c *Cluster) GetKubeClientSet() (kubernetes.Interface, error) {
	if len(c.NodeIDs) == 0 {
		return nil, fmt.Errorf("cluster has no nodes")
	}
	if len(c.Owner.Certificate) == 0 {
		return nil, fmt.Errorf("owner identity has no certificate")
	}
	pkcs8Key, err := x509.MarshalPKCS8PrivateKey(c.Owner.PrivateKey)
	if err != nil {
		// The owner key is expected to be Ed25519, which always marshals, so
		// this should not happen. Report it instead of panicking.
		return nil, fmt.Errorf("could not marshal owner private key: %w", err)
	}

	host := net.JoinHostPort(c.NodeIDs[0], node.KubernetesAPIWrappedPort.PortString())
	clientConfig := rest.Config{
		Host: host,
		TLSClientConfig: rest.TLSClientConfig{
			// TODO(q3k): use CA certificate
			Insecure:   true,
			ServerName: "kubernetes.default.svc",
			CertData:   pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: c.Owner.Certificate[0]}),
			KeyData:    pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: pkcs8Key}),
		},
		Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
			return c.DialNode(ctx, address)
		},
	}
	return kubernetes.NewForConfig(&clientConfig)
}

// KubernetesControllerNodeAddresses lists the external addresses of all nodes
// that carry the KubernetesController role, i.e. run an apiserver. Nodes that
// have not yet reported an external address are left out. The result may be
// empty if no node currently has that role.
func (c *Cluster) KubernetesControllerNodeAddresses(ctx context.Context) ([]string, error) {
	curC, err := c.CuratorClient()
	if err != nil {
		return nil, err
	}
	req := &apb.GetNodesRequest{
		Filter: "has(node.roles.kubernetes_controller)",
	}
	srv, err := apb.NewManagementClient(curC).GetNodes(ctx, req)
	if err != nil {
		return nil, err
	}
	defer srv.CloseSend()

	var addrs []string
	for {
		n, err := srv.Recv()
		switch {
		case err == io.EOF:
			return addrs, nil
		case err != nil:
			return nil, err
		case n.GetStatus().GetExternalAddress() != "":
			addrs = append(addrs, n.Status.ExternalAddress)
		}
	}
}

// AllNodesHealthy queries the management API for every node and returns nil
// when all of them report HEALTHY. Otherwise the error lists the IDs of the
// nodes that are not healthy. Errors from connecting to or querying the
// curator are returned unchanged.
func (c *Cluster) AllNodesHealthy(ctx context.Context) error {
	curC, err := c.CuratorClient()
	if err != nil {
		return err
	}
	nodes, err := getNodes(ctx, apb.NewManagementClient(curC))
	if err != nil {
		return err
	}

	var unhealthy []string
	for _, n := range nodes {
		if n.Health != apb.Node_HEALTHY {
			unhealthy = append(unhealthy, n.Id)
		}
	}
	if len(unhealthy) > 0 {
		return fmt.Errorf("nodes unhealthy: %s", strings.Join(unhealthy, ", "))
	}
	return nil
}

// ApproveNode approves a node by ID, waiting for it to become UP.
//
// The node must be known to the cluster (present in c.Nodes), since its public
// key is needed for the approval request. After approving, the node list is
// polled once per second until the node reports NODE_STATE_UP, or until ctx is
// done.
func (c *Cluster) ApproveNode(ctx context.Context, id string) error {
	target, ok := c.Nodes[id]
	if !ok {
		return fmt.Errorf("unknown node %q", id)
	}

	curC, err := c.CuratorClient()
	if err != nil {
		return err
	}
	mgmt := apb.NewManagementClient(curC)

	_, err = mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
		Pubkey: target.Pubkey,
	})
	if err != nil {
		return fmt.Errorf("ApproveNode: %w", err)
	}
	launch.Log("Cluster: %s: approved, waiting for UP", id)

	// isUp performs a single GetNodes scan and reports whether the node is UP.
	// The stream runs under its own cancellable context, so returning before
	// io.EOF (as soon as the node is found) does not leak it. CloseSend alone
	// does not release a server stream.
	isUp := func() (bool, error) {
		sctx, sctxC := context.WithCancel(ctx)
		defer sctxC()
		srv, err := mgmt.GetNodes(sctx, &apb.GetNodesRequest{})
		if err != nil {
			return false, fmt.Errorf("GetNodes: %w", err)
		}
		for {
			n, err := srv.Recv()
			if errors.Is(err, io.EOF) {
				return false, nil
			}
			if err != nil {
				return false, fmt.Errorf("Nodes.Recv: %w", err)
			}
			if n.Id == id && n.State == cpb.NodeState_NODE_STATE_UP {
				return true, nil
			}
		}
	}

	for {
		up, err := isUp()
		if err != nil {
			return err
		}
		if up {
			break
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(time.Second):
		}
	}
	launch.Log("Cluster: %s: UP", id)
	return nil
}

// MakeKubernetesWorker grants the KubernetesWorker role to the node with the
// given ID through the management API. It returns immediately after the
// UpdateNodeRoles call, propagating any connection or RPC error unchanged,
// and does not wait for the role to take effect.
func (c *Cluster) MakeKubernetesWorker(ctx context.Context, id string) error {
	curC, err := c.CuratorClient()
	if err != nil {
		return err
	}
	launch.Log("Cluster: %s: adding KubernetesWorker", id)
	enable := true
	req := &apb.UpdateNodeRolesRequest{
		Node:             &apb.UpdateNodeRolesRequest_Id{Id: id},
		KubernetesWorker: &enable,
	}
	if _, err := apb.NewManagementClient(curC).UpdateNodeRoles(ctx, req); err != nil {
		return err
	}
	return nil
}

// MakeConsensusMember adds the ConsensusMember role to a node by ID and waits
// until the node has become a full etcd member.
//
// The role update is retried with exponential backoff for up to 10 seconds.
// Afterwards the curator's consensus status is polled every 100ms until the
// node is listed as a full member, until ctx is done, or until a status query
// fails.
func (c *Cluster) MakeConsensusMember(ctx context.Context, id string) error {
	curC, err := c.CuratorClient()
	if err != nil {
		return err
	}
	mgmt := apb.NewManagementClient(curC)
	cur := ipb.NewCuratorClient(curC)

	tr := true
	launch.Log("Cluster: %s: adding ConsensusMember", id)
	bo := backoff.NewExponentialBackOff()
	bo.MaxElapsedTime = 10 * time.Second

	// Use Retry's own result rather than relying on the closure mutating a
	// captured variable. Retry reports both context cancellation and the final
	// attempt's error once the retry budget is exhausted.
	err = backoff.Retry(func() error {
		_, err := mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
			Node:            &apb.UpdateNodeRolesRequest_Id{Id: id},
			ConsensusMember: &tr,
		})
		if err != nil {
			launch.Log("Cluster: %s: UpdateNodeRoles failed: %v", id, err)
		}
		return err
	}, backoff.WithContext(bo, ctx))
	if err != nil {
		return err
	}

	launch.Log("Cluster: %s: waiting for learner/full members...", id)

	learner := false
	for {
		res, err := cur.GetConsensusStatus(ctx, &ipb.GetConsensusStatusRequest{})
		if err != nil {
			return fmt.Errorf("GetConsensusStatus: %w", err)
		}
		for _, member := range res.EtcdMember {
			if member.Id != id {
				continue
			}
			switch member.Status {
			case ipb.GetConsensusStatusResponse_EtcdMember_STATUS_LEARNER:
				// Log the learner transition only once.
				if !learner {
					learner = true
					launch.Log("Cluster: %s: became a learner, waiting for full member...", id)
				}
			case ipb.GetConsensusStatusResponse_EtcdMember_STATUS_FULL:
				launch.Log("Cluster: %s: became a full member", id)
				return nil
			}
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(100 * time.Millisecond):
		}
	}
}
