|  | // cluster builds on the launch package and implements launching Metropolis | 
|  | // nodes and clusters in a virtualized environment using QEMU. It's kept in a | 
|  | // separate package as it depends on a Metropolis node image, which might not be | 
|  | // required for some uses of the launch library. | 
|  | package cluster | 
|  |  | 
|  | import ( | 
|  | "bytes" | 
|  | "context" | 
|  | "crypto/rand" | 
|  | "crypto/tls" | 
|  | "errors" | 
|  | "fmt" | 
|  | "io" | 
|  | "log" | 
|  | "net" | 
|  | "os" | 
|  | "os/exec" | 
|  | "path/filepath" | 
|  | "syscall" | 
|  | "time" | 
|  |  | 
|  | "github.com/cenkalti/backoff/v4" | 
|  | "go.uber.org/multierr" | 
|  | "golang.org/x/net/proxy" | 
|  | "google.golang.org/grpc" | 
|  | "google.golang.org/grpc/codes" | 
|  | "google.golang.org/grpc/status" | 
|  | "google.golang.org/protobuf/proto" | 
|  |  | 
|  | "source.monogon.dev/metropolis/cli/pkg/datafile" | 
|  | "source.monogon.dev/metropolis/node" | 
|  | common "source.monogon.dev/metropolis/node" | 
|  | "source.monogon.dev/metropolis/node/core/identity" | 
|  | "source.monogon.dev/metropolis/node/core/rpc" | 
|  | apb "source.monogon.dev/metropolis/proto/api" | 
|  | cpb "source.monogon.dev/metropolis/proto/common" | 
|  | "source.monogon.dev/metropolis/test/launch" | 
|  | ) | 
|  |  | 
|  | // NodeOptions contains all options that can be passed to LaunchNode(). | 
|  | type NodeOptions struct { | 
|  | // Ports contains the mapping from VM ports to host ports, i.e. which internal | 
|  | // ports of the VM are exposed on the host. See IdentityPortMap() and | 
|  | // ConflictFreePortMap(). Ignored when ConnectToSocket is set. | 
|  | Ports launch.PortMap | 
|  |  | 
|  | // If set to true, reboots are honored. Otherwise, any reboot makes LaunchNode() | 
|  | // return. Metropolis nodes generally restart on almost all errors, so unless you | 
|  | // want to test reboot behavior this should be false. | 
|  | AllowReboot bool | 
|  |  | 
|  | // By default the VM is connected to the host via SLIRP. If ConnectToSocket is set, | 
|  | // it is instead connected to the given file descriptor/socket. If this is set, all | 
|  | // port maps from the Ports option are ignored. Intended for networking this | 
|  | // instance together with others for running more complex network configurations. | 
|  | ConnectToSocket *os.File | 
|  |  | 
|  | // SerialPort is an io.ReadWriter over which you can communicate with the serial | 
|  | // port of the machine. It can be set to an existing file descriptor (like | 
|  | // os.Stdout/os.Stderr) or any Go structure implementing this interface. | 
|  | SerialPort io.ReadWriter | 
|  |  | 
|  | // NodeParameters is passed into the VM and subsequently used for bootstrapping or | 
|  | // registering into a cluster. | 
|  | NodeParameters *apb.NodeParameters | 
|  |  | 
|  | // Mac is the node's MAC address. | 
|  | Mac *net.HardwareAddr | 
|  |  | 
|  | // Runtime keeps the node's QEMU runtime state. | 
|  | Runtime *NodeRuntime | 
|  | } | 
|  |  | 
|  | // NodeRuntime keeps the node's QEMU runtime state. | 
|  | type NodeRuntime struct { | 
|  | // ld points at the node's launch directory storing data such as storage | 
|  | // images, firmware variables or the TPM state. | 
|  | ld string | 
|  | // sd points at the node's socket directory. | 
|  | sd string | 
|  |  | 
|  | // ctxT is the context QEMU will execute in. | 
|  | ctxT context.Context | 
|  | // CtxC is the QEMU context's cancellation function. | 
|  | CtxC context.CancelFunc | 
|  | } | 
|  |  | 
|  | // NodePorts is the list of ports a fully operational Metropolis node listens on. | 
|  | var NodePorts = []node.Port{ | 
|  | node.ConsensusPort, | 
|  |  | 
|  | node.CuratorServicePort, | 
|  | node.DebugServicePort, | 
|  |  | 
|  | node.KubernetesAPIPort, | 
|  | node.KubernetesAPIWrappedPort, | 
|  | node.DebuggerPort, | 
|  | } | 
|  |  | 
|  | // setupRuntime creates the node's QEMU runtime directory, together with all | 
|  | // files required to preserve its state, a level below the chosen path ld. The | 
|  | // node's socket directory is similarly created a level below sd. It may | 
|  | // return an I/O error. | 
|  | func setupRuntime(ld, sd string) (*NodeRuntime, error) { | 
|  | // Create a temporary directory to keep all the runtime files. | 
|  | stdp, err := os.MkdirTemp(ld, "node_state*") | 
|  | if err != nil { | 
|  | return nil, fmt.Errorf("failed to create the state directory: %w", err) | 
|  | } | 
|  |  | 
|  | // Initialize the node's storage with a prebuilt image. | 
|  | si, err := datafile.ResolveRunfile("metropolis/node/node.img") | 
|  | if err != nil { | 
|  | return nil, fmt.Errorf("while resolving a path: %w", err) | 
|  | } | 
|  | di := filepath.Join(stdp, filepath.Base(si)) | 
|  | log.Printf("Cluster: copying the node image: %s -> %s", si, di) | 
|  | if err := copyFile(si, di); err != nil { | 
|  | return nil, fmt.Errorf("while copying the node image: %w", err) | 
|  | } | 
|  |  | 
|  | // Initialize the OVMF firmware variables file. | 
|  | sv, err := datafile.ResolveRunfile("external/edk2/OVMF_VARS.fd") | 
|  | if err != nil { | 
|  | return nil, fmt.Errorf("while resolving a path: %w", err) | 
|  | } | 
|  | dv := filepath.Join(stdp, filepath.Base(sv)) | 
|  | if err := copyFile(sv, dv); err != nil { | 
|  | return nil, fmt.Errorf("while copying firmware variables: %w", err) | 
|  | } | 
|  |  | 
|  | // Create the TPM state directory and initialize all files required by swtpm. | 
|  | tpmt := filepath.Join(stdp, "tpm") | 
|  | if err := os.Mkdir(tpmt, 0755); err != nil { | 
|  | return nil, fmt.Errorf("while creating the TPM directory: %w", err) | 
|  | } | 
|  | tpms, err := datafile.ResolveRunfile("metropolis/node/tpm") | 
|  | if err != nil { | 
|  | return nil, fmt.Errorf("while resolving a path: %w", err) | 
|  | } | 
|  | tpmf, err := os.ReadDir(tpms) | 
|  | if err != nil { | 
|  | return nil, fmt.Errorf("failed to read TPM directory: %w", err) | 
|  | } | 
|  | for _, file := range tpmf { | 
|  | name := file.Name() | 
|  | src, err := datafile.ResolveRunfile(filepath.Join(tpms, name)) | 
|  | if err != nil { | 
|  | return nil, fmt.Errorf("while resolving a path: %w", err) | 
|  | } | 
|  | tgt := filepath.Join(tpmt, name) | 
|  | if err := copyFile(src, tgt); err != nil { | 
|  | return nil, fmt.Errorf("while copying TPM state: file %q to %q: %w", src, tgt, err) | 
|  | } | 
|  | } | 
|  |  | 
|  | // Create the socket directory. | 
|  | sotdp, err := os.MkdirTemp(sd, "node_sock*") | 
|  | if err != nil { | 
|  | return nil, fmt.Errorf("failed to create the socket directory: %w", err) | 
|  | } | 
|  |  | 
|  | return &NodeRuntime{ | 
|  | ld: stdp, | 
|  | sd: sotdp, | 
|  | }, nil | 
|  | } | 
|  |  | 
|  | // curatorClient returns an authenticated owner connection to a Curator | 
|  | // instance within Cluster c, or nil together with an error. | 
|  | func (c *Cluster) curatorClient() (*grpc.ClientConn, error) { | 
|  | if c.authClient == nil { | 
|  | authCreds := rpc.NewAuthenticatedCredentials(c.Owner, nil) | 
|  | remote := net.JoinHostPort(c.NodeIDs[0], common.CuratorServicePort.PortString()) | 
|  | authClient, err := grpc.Dial(remote, grpc.WithTransportCredentials(authCreds), grpc.WithContextDialer(c.DialNode)) | 
|  | if err != nil { | 
|  | return nil, fmt.Errorf("dialing with owner credentials failed: %w", err) | 
|  | } | 
|  | c.authClient = authClient | 
|  | } | 
|  | return c.authClient, nil | 
|  | } | 
|  |  | 
|  | // LaunchNode launches a single Metropolis node instance with the given options. | 
|  | // The instance runs mostly paravirtualized but with some emulated hardware | 
|  | // similar to how a cloud provider might set up its VMs. The disk is fully | 
|  | // writable, and the changes are kept across reboots and shutdowns. ld and sd | 
|  | // point to the launch directory and the socket directory, holding the nodes' | 
|  | // state files (storage, tpm state, firmware state), and UNIX socket files | 
|  | // (swtpm <-> QEMU interplay) respectively. The directories must exist before | 
|  | // LaunchNode is called. LaunchNode will update options.Runtime and options.Mac | 
|  | // if either are not initialized. | 
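|  | // | 
|  | // A minimal usage sketch (hypothetical ctx, ports, launchDir and socketDir | 
|  | // values; error handling elided): | 
|  | // | 
|  | //   opts := NodeOptions{ | 
|  | //     Ports:      ports, | 
|  | //     SerialPort: os.Stdout, | 
|  | //   } | 
|  | //   if err := LaunchNode(ctx, launchDir, socketDir, &opts); err != nil { | 
|  | //     // handle the error | 
|  | //   } | 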
|  | func LaunchNode(ctx context.Context, ld, sd string, options *NodeOptions) error { | 
|  | // TODO(mateusz@monogon.tech) try using QEMU's abstract socket namespace instead | 
|  | // of /tmp (requires QEMU version >5.0). | 
|  | // https://github.com/qemu/qemu/commit/776b97d3605ed0fc94443048fdf988c7725e38a9). | 
|  | // swtpm accepts already-open FDs so we can pass in an abstract socket namespace FD | 
|  | // that we open and pass the name of it to QEMU. Not pinning this crashes both | 
|  | // swtpm and qemu because we run into UNIX socket length limitations (for legacy | 
|  | // reasons 108 chars). | 
|  |  | 
|  | // If it's the node's first start, set up its runtime directories. | 
|  | if options.Runtime == nil { | 
|  | r, err := setupRuntime(ld, sd) | 
|  | if err != nil { | 
|  | return fmt.Errorf("while setting up node runtime: %w", err) | 
|  | } | 
|  | options.Runtime = r | 
|  | } | 
|  |  | 
|  | // Replace the node's context with a new one. | 
|  | r := options.Runtime | 
|  | if r.CtxC != nil { | 
|  | r.CtxC() | 
|  | } | 
|  | r.ctxT, r.CtxC = context.WithCancel(ctx) | 
|  |  | 
|  | var qemuNetType string | 
|  | var qemuNetConfig launch.QemuValue | 
|  | if options.ConnectToSocket != nil { | 
|  | qemuNetType = "socket" | 
|  | qemuNetConfig = launch.QemuValue{ | 
|  | "id": {"net0"}, | 
|  | "fd": {"3"}, | 
|  | } | 
|  | } else { | 
|  | qemuNetType = "user" | 
|  | qemuNetConfig = launch.QemuValue{ | 
|  | "id":        {"net0"}, | 
|  | "net":       {"10.42.0.0/24"}, | 
|  | "dhcpstart": {"10.42.0.10"}, | 
|  | "hostfwd":   options.Ports.ToQemuForwards(), | 
|  | } | 
|  | } | 
|  |  | 
|  | // Generate the node's MAC address if it isn't already set in NodeOptions. | 
|  | if options.Mac == nil { | 
|  | mac, err := generateRandomEthernetMAC() | 
|  | if err != nil { | 
|  | return err | 
|  | } | 
|  | options.Mac = mac | 
|  | } | 
|  |  | 
|  | tpmSocketPath := filepath.Join(r.sd, "tpm-socket") | 
|  | fwVarPath := filepath.Join(r.ld, "OVMF_VARS.fd") | 
|  | storagePath := filepath.Join(r.ld, "node.img") | 
|  | qemuArgs := []string{"-machine", "q35", "-accel", "kvm", "-nographic", "-nodefaults", "-m", "4096", | 
|  | "-cpu", "host", "-smp", "sockets=1,cpus=1,cores=2,threads=2,maxcpus=4", | 
|  | "-drive", "if=pflash,format=raw,readonly,file=external/edk2/OVMF_CODE.fd", | 
|  | "-drive", "if=pflash,format=raw,file=" + fwVarPath, | 
|  | "-drive", "if=virtio,format=raw,cache=unsafe,file=" + storagePath, | 
|  | "-netdev", qemuNetConfig.ToOption(qemuNetType), | 
|  | "-device", "virtio-net-pci,netdev=net0,mac=" + options.Mac.String(), | 
|  | "-chardev", "socket,id=chrtpm,path=" + tpmSocketPath, | 
|  | "-tpmdev", "emulator,id=tpm0,chardev=chrtpm", | 
|  | "-device", "tpm-tis,tpmdev=tpm0", | 
|  | "-device", "virtio-rng-pci", | 
|  | "-serial", "stdio"} | 
|  |  | 
|  | if !options.AllowReboot { | 
|  | qemuArgs = append(qemuArgs, "-no-reboot") | 
|  | } | 
|  |  | 
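|  | // If node parameters were provided, pass them to the VM through QEMU's fw_cfg | 
|  | // interface under the name dev.monogon.metropolis/parameters.pb; the node then | 
|  | // uses them for bootstrapping into or registering with a cluster. | 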
|  | if options.NodeParameters != nil { | 
|  | parametersPath := filepath.Join(r.ld, "parameters.pb") | 
|  | parametersRaw, err := proto.Marshal(options.NodeParameters) | 
|  | if err != nil { | 
|  | return fmt.Errorf("failed to encode node paraeters: %w", err) | 
|  | } | 
|  | if err := os.WriteFile(parametersPath, parametersRaw, 0644); err != nil { | 
|  | return fmt.Errorf("failed to write node parameters: %w", err) | 
|  | } | 
|  | qemuArgs = append(qemuArgs, "-fw_cfg", "name=dev.monogon.metropolis/parameters.pb,file="+parametersPath) | 
|  | } | 
|  |  | 
|  | // Start TPM emulator as a subprocess | 
|  | tpmCtx, tpmCancel := context.WithCancel(options.Runtime.ctxT) | 
|  | defer tpmCancel() | 
|  |  | 
|  | tpmd := filepath.Join(r.ld, "tpm") | 
|  | tpmEmuCmd := exec.CommandContext(tpmCtx, "swtpm", "socket", "--tpm2", "--tpmstate", "dir="+tpmd, "--ctrl", "type=unixio,path="+tpmSocketPath) | 
|  | tpmEmuCmd.Stderr = os.Stderr | 
|  | tpmEmuCmd.Stdout = os.Stdout | 
|  |  | 
|  | err := tpmEmuCmd.Start() | 
|  | if err != nil { | 
|  | return fmt.Errorf("failed to start TPM emulator: %w", err) | 
|  | } | 
|  |  | 
|  | // Wait for the socket to be created by the TPM emulator before launching | 
|  | // QEMU. | 
|  | for { | 
|  | _, err := os.Stat(tpmSocketPath) | 
|  | if err == nil { | 
|  | break | 
|  | } | 
|  | if err != nil && !os.IsNotExist(err) { | 
|  | return fmt.Errorf("while stat-ing TPM socket path: %w", err) | 
|  | } | 
|  | if err := tpmCtx.Err(); err != nil { | 
|  | return fmt.Errorf("while waiting for the TPM socket: %w", err) | 
|  | } | 
|  | time.Sleep(time.Millisecond * 100) | 
|  | } | 
|  |  | 
|  | // Start the main qemu binary | 
|  | systemCmd := exec.CommandContext(options.Runtime.ctxT, "qemu-system-x86_64", qemuArgs...) | 
|  | if options.ConnectToSocket != nil { | 
|  | systemCmd.ExtraFiles = []*os.File{options.ConnectToSocket} | 
|  | } | 
|  |  | 
|  | var stdErrBuf bytes.Buffer | 
|  | systemCmd.Stderr = &stdErrBuf | 
|  | systemCmd.Stdout = options.SerialPort | 
|  |  | 
|  | err = systemCmd.Run() | 
|  |  | 
|  | // Stop TPM emulator and wait for it to exit to properly reap the child process | 
|  | tpmCancel() | 
|  | log.Print("Node: Waiting for TPM emulator to exit") | 
|  | // Wait returns a SIGKILL error because we just cancelled its context. | 
|  | // We still need to call it to avoid creating zombies. | 
|  | _ = tpmEmuCmd.Wait() | 
|  | log.Print("Node: TPM emulator done") | 
|  |  | 
|  | var exerr *exec.ExitError | 
|  | if err != nil && errors.As(err, &exerr) { | 
|  | status := exerr.ProcessState.Sys().(syscall.WaitStatus) | 
|  | if status.Signaled() && status.Signal() == syscall.SIGKILL { | 
|  | // Process was killed externally (most likely by our context being canceled). | 
|  | // This is a normal exit for us, so return nil | 
|  | return nil | 
|  | } | 
|  | exerr.Stderr = stdErrBuf.Bytes() | 
|  | newErr := launch.QEMUError(*exerr) | 
|  | return &newErr | 
|  | } | 
|  | return err | 
|  | } | 
|  |  | 
|  | func copyFile(src, dst string) error { | 
|  | in, err := os.Open(src) | 
|  | if err != nil { | 
|  | return fmt.Errorf("when opening source: %w", err) | 
|  | } | 
|  | defer in.Close() | 
|  |  | 
|  | out, err := os.Create(dst) | 
|  | if err != nil { | 
|  | return fmt.Errorf("when creating destination: %w", err) | 
|  | } | 
|  | defer out.Close() | 
|  |  | 
|  | _, err = io.Copy(out, in) | 
|  | if err != nil { | 
|  | return fmt.Errorf("when copying file: %w", err) | 
|  | } | 
|  | return out.Close() | 
|  | } | 
|  |  | 
|  | // getNodes wraps around Management.GetNodes to return a list of nodes in a | 
|  | // cluster. | 
|  | func getNodes(ctx context.Context, mgmt apb.ManagementClient) ([]*apb.Node, error) { | 
|  | var res []*apb.Node | 
|  | bo := backoff.WithContext(backoff.NewExponentialBackOff(), ctx) | 
|  | err := backoff.Retry(func() error { | 
|  | res = nil | 
|  | srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{}) | 
|  | if err != nil { | 
|  | return fmt.Errorf("GetNodes: %w", err) | 
|  | } | 
|  | for { | 
|  | node, err := srvN.Recv() | 
|  | if err == io.EOF { | 
|  | break | 
|  | } | 
|  | if err != nil { | 
|  | return fmt.Errorf("GetNodes.Recv: %w", err) | 
|  | } | 
|  | res = append(res, node) | 
|  | } | 
|  | return nil | 
|  | }, bo) | 
|  | if err != nil { | 
|  | return nil, err | 
|  | } | 
|  | return res, nil | 
|  | } | 
|  |  | 
|  | // getNode wraps Management.GetNodes. It returns node information matching the | 
|  | // given node ID. | 
|  | func getNode(ctx context.Context, mgmt apb.ManagementClient, id string) (*apb.Node, error) { | 
|  | nodes, err := getNodes(ctx, mgmt) | 
|  | if err != nil { | 
|  | return nil, fmt.Errorf("could not get nodes: %w", err) | 
|  | } | 
|  | for _, n := range nodes { | 
|  | eid := identity.NodeID(n.Pubkey) | 
|  | if eid != id { | 
|  | continue | 
|  | } | 
|  | return n, nil | 
|  | } | 
|  | return nil, fmt.Errorf("no such node.") | 
|  | } | 
|  |  | 
|  | // generateRandomEthernetMAC generates a random EUI-48 Ethernet MAC address. | 
|  | func generateRandomEthernetMAC() (*net.HardwareAddr, error) { | 
|  | macBuf := make([]byte, 6) | 
|  | _, err := rand.Read(macBuf) | 
|  | if err != nil { | 
|  | return nil, fmt.Errorf("failed to read randomness for MAC: %v", err) | 
|  | } | 
|  |  | 
|  | // Set U/L bit and clear I/G bit (locally administered individual MAC) | 
|  | // Ref IEEE 802-2014 Section 8.2.2 | 
|  | macBuf[0] = (macBuf[0] | 2) & 0xfe | 
|  | mac := net.HardwareAddr(macBuf) | 
|  | return &mac, nil | 
|  | } | 
|  |  | 
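|  | // SOCKSPort is the port at which nanoswitch exposes its SOCKS proxy, used to | 
|  | // reach the cluster's internal switch network from the host (see | 
|  | // Cluster.DialNode). | 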
|  | const SOCKSPort uint16 = 1080 | 
|  |  | 
|  | // ClusterPorts contains all ports handled by Nanoswitch. | 
|  | var ClusterPorts = []uint16{ | 
|  | // Forwarded to the first node. | 
|  | uint16(node.CuratorServicePort), | 
|  | uint16(node.DebugServicePort), | 
|  | uint16(node.KubernetesAPIPort), | 
|  | uint16(node.KubernetesAPIWrappedPort), | 
|  |  | 
|  | // SOCKS proxy to the switch network | 
|  | SOCKSPort, | 
|  | } | 
|  |  | 
|  | // ClusterOptions contains all options for launching a Metropolis cluster. | 
|  | type ClusterOptions struct { | 
|  | // The number of nodes this cluster should be started with. | 
|  | NumNodes int | 
|  | } | 
|  |  | 
|  | // Cluster is the running Metropolis cluster launched using the LaunchCluster | 
|  | // function. | 
|  | type Cluster struct { | 
|  | // Owner is the TLS Certificate of the owner of the test cluster. This can be | 
|  | // used to authenticate further clients to the running cluster. | 
|  | Owner tls.Certificate | 
|  | // Ports is the PortMap used to access the first node's services (defined in | 
|  | // ClusterPorts) and the SOCKS proxy (at SOCKSPort). | 
|  | Ports launch.PortMap | 
|  |  | 
|  | // Nodes is a map from Node ID to its runtime information. | 
|  | Nodes map[string]*NodeInCluster | 
|  | // NodeIDs is a list of node IDs that are backing this cluster, in order of | 
|  | // creation. | 
|  | NodeIDs []string | 
|  |  | 
|  | // nodesDone is a list of channels populated with the return codes from all the | 
|  | // nodes' qemu instances. It's used by Close to ensure all nodes have | 
|  | // successfully been stopped. | 
|  | nodesDone []chan error | 
|  | // nodeOpts are the cluster member nodes' mutable launch options, kept here | 
|  | // to facilitate reboots. | 
|  | nodeOpts []NodeOptions | 
|  | // launchDir points at the directory keeping the nodes' state, such as storage | 
|  | // images, firmware variable files, TPM state. | 
|  | launchDir string | 
|  | // socketDir points at the directory keeping UNIX socket files, such as those | 
|  | // used to facilitate communication between QEMU and swtpm. It's different | 
|  | // from launchDir, and anchored nearer the file system root, due to the | 
|  | // socket path length limitation imposed by the kernel. | 
|  | socketDir string | 
|  |  | 
|  | // socksDialer is used by DialNode to establish connections to nodes via the | 
|  | // SOCKS server run by nanoswitch. | 
|  | socksDialer proxy.Dialer | 
|  |  | 
|  | // authClient is a cached authenticated owner connection to a Curator | 
|  | // instance within the cluster. | 
|  | authClient *grpc.ClientConn | 
|  |  | 
|  | // ctxT is the context individual node contexts are created from. | 
|  | ctxT context.Context | 
|  | // ctxC is used by Close to cancel the context under which the nodes are | 
|  | // running. | 
|  | ctxC context.CancelFunc | 
|  | } | 
|  |  | 
|  | // NodeInCluster represents information about a node that's part of a Cluster. | 
|  | type NodeInCluster struct { | 
|  | // ID of the node, which can be used to dial this node's services via DialNode. | 
|  | ID string | 
|  | // Address of the node on the network run by nanoswitch. Not reachable from the | 
|  | // host unless dialed via DialNode or via the nanoswitch SOCKS proxy (reachable | 
|  | // on Cluster.Ports[SOCKSPort]). | 
|  | ManagementAddress string | 
|  | } | 
|  |  | 
|  | // firstConnection performs the initial owner credential escrow with a newly | 
|  | // started nanoswitch-backed cluster over SOCKS. It expects the first node to be | 
|  | // running at 10.1.0.2, which is always the case with the current nanoswitch | 
|  | // implementation. | 
|  | // | 
|  | // It returns the newly escrowed credentials as well as the first node's | 
|  | // information as NodeInCluster. | 
|  | func firstConnection(ctx context.Context, socksDialer proxy.Dialer) (*tls.Certificate, *NodeInCluster, error) { | 
|  | // Dial external service. | 
|  | remote := fmt.Sprintf("10.1.0.2:%s", node.CuratorServicePort.PortString()) | 
|  | initCreds, err := rpc.NewEphemeralCredentials(InsecurePrivateKey, nil) | 
|  | if err != nil { | 
|  | return nil, nil, fmt.Errorf("NewEphemeralCredentials: %w", err) | 
|  | } | 
|  | initDialer := func(_ context.Context, addr string) (net.Conn, error) { | 
|  | return socksDialer.Dial("tcp", addr) | 
|  | } | 
|  | initClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(initCreds)) | 
|  | if err != nil { | 
|  | return nil, nil, fmt.Errorf("dialing with ephemeral credentials failed: %w", err) | 
|  | } | 
|  | defer initClient.Close() | 
|  |  | 
|  | // Retrieve owner certificate - this can take a while because the node is still | 
|  | // coming up, so do it in a backoff loop. | 
|  | log.Printf("Cluster: retrieving owner certificate (this can take a few seconds while the first node boots)...") | 
|  | aaa := apb.NewAAAClient(initClient) | 
|  | var cert *tls.Certificate | 
|  | err = backoff.Retry(func() error { | 
|  | cert, err = rpc.RetrieveOwnerCertificate(ctx, aaa, InsecurePrivateKey) | 
|  | if st, ok := status.FromError(err); ok { | 
|  | if st.Code() == codes.Unavailable { | 
|  | return err | 
|  | } | 
|  | } | 
|  | return backoff.Permanent(err) | 
|  | }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx)) | 
|  | if err != nil { | 
|  | return nil, nil, err | 
|  | } | 
|  | log.Printf("Cluster: retrieved owner certificate.") | 
|  |  | 
|  | // Now connect authenticated and get the node ID. | 
|  | creds := rpc.NewAuthenticatedCredentials(*cert, nil) | 
|  | authClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(creds)) | 
|  | if err != nil { | 
|  | return nil, nil, fmt.Errorf("dialing with owner credentials failed: %w", err) | 
|  | } | 
|  | defer authClient.Close() | 
|  | mgmt := apb.NewManagementClient(authClient) | 
|  |  | 
|  | var node *NodeInCluster | 
|  | err = backoff.Retry(func() error { | 
|  | nodes, err := getNodes(ctx, mgmt) | 
|  | if err != nil { | 
|  | return fmt.Errorf("retrieving nodes failed: %w", err) | 
|  | } | 
|  | if len(nodes) != 1 { | 
|  | return fmt.Errorf("expected one node, got %d", len(nodes)) | 
|  | } | 
|  | n := nodes[0] | 
|  | if n.Status == nil || n.Status.ExternalAddress == "" { | 
|  | return fmt.Errorf("node has no status and/or address") | 
|  | } | 
|  | node = &NodeInCluster{ | 
|  | ID:                identity.NodeID(n.Pubkey), | 
|  | ManagementAddress: n.Status.ExternalAddress, | 
|  | } | 
|  | return nil | 
|  | }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx)) | 
|  | if err != nil { | 
|  | return nil, nil, err | 
|  | } | 
|  |  | 
|  | return cert, node, nil | 
|  | } | 
|  |  | 
|  | // LaunchCluster launches a cluster of Metropolis node VMs together with a | 
|  | // Nanoswitch instance to network them all together. | 
|  | // | 
|  | // The given context will be used to run all qemu instances in the cluster, and | 
|  | // canceling the context or calling Close() will terminate them. | 
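|  | // | 
|  | // A minimal usage sketch (assuming ctx comes from the calling test): | 
|  | // | 
|  | //   cl, err := LaunchCluster(ctx, ClusterOptions{NumNodes: 3}) | 
|  | //   if err != nil { | 
|  | //     // handle the error | 
|  | //   } | 
|  | //   defer cl.Close() | 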
|  | func LaunchCluster(ctx context.Context, opts ClusterOptions) (*Cluster, error) { | 
|  | if opts.NumNodes <= 0 { | 
|  | return nil, errors.New("refusing to start cluster with zero nodes") | 
|  | } | 
|  |  | 
|  | // Create the launch directory. | 
|  | ld, err := os.MkdirTemp(os.Getenv("TEST_TMPDIR"), "cluster*") | 
|  | if err != nil { | 
|  | return nil, fmt.Errorf("failed to create the launch directory: %w", err) | 
|  | } | 
|  | // Create the socket directory. | 
|  | sd, err := os.MkdirTemp("/tmp", "cluster*") | 
|  | if err != nil { | 
|  | return nil, fmt.Errorf("failed to create the socket directory: %w", err) | 
|  | } | 
|  |  | 
|  | // Prepare links between nodes and nanoswitch. | 
|  | var switchPorts []*os.File | 
|  | var vmPorts []*os.File | 
|  | for i := 0; i < opts.NumNodes; i++ { | 
|  | switchPort, vmPort, err := launch.NewSocketPair() | 
|  | if err != nil { | 
|  | return nil, fmt.Errorf("failed to get socketpair: %w", err) | 
|  | } | 
|  | switchPorts = append(switchPorts, switchPort) | 
|  | vmPorts = append(vmPorts, vmPort) | 
|  | } | 
|  |  | 
|  | // Make a list of channels that will be populated by all running node qemu | 
|  | // processes. | 
|  | done := make([]chan error, opts.NumNodes) | 
|  | for i := range done { | 
|  | done[i] = make(chan error, 1) | 
|  | } | 
|  |  | 
|  | // Prepare the node options. These will be kept as part of Cluster. | 
|  | // nodeOpts[].Runtime will be initialized by LaunchNode during the first | 
|  | // launch. The runtime information can be later used to restart a node. | 
|  | // The 0th node will be initialized first. The rest will follow after it | 
|  | // has bootstrapped the cluster. | 
|  | nodeOpts := make([]NodeOptions, opts.NumNodes) | 
|  | nodeOpts[0] = NodeOptions{ | 
|  | ConnectToSocket: vmPorts[0], | 
|  | NodeParameters: &apb.NodeParameters{ | 
|  | Cluster: &apb.NodeParameters_ClusterBootstrap_{ | 
|  | ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{ | 
|  | OwnerPublicKey: InsecurePublicKey, | 
|  | }, | 
|  | }, | 
|  | }, | 
|  | SerialPort: newPrefixedStdio(0), | 
|  | } | 
|  |  | 
|  | // Start the first node. | 
|  | ctxT, ctxC := context.WithCancel(ctx) | 
|  | log.Printf("Cluster: Starting node %d...", 1) | 
|  | go func() { | 
|  | err := LaunchNode(ctxT, ld, sd, &nodeOpts[0]) | 
|  | if err != nil { | 
|  | log.Printf("Node %d finished with an error: %v", 1, err) | 
|  | } | 
|  | done[0] <- err | 
|  | }() | 
|  |  | 
|  | // Launch nanoswitch. | 
|  | portMap, err := launch.ConflictFreePortMap(ClusterPorts) | 
|  | if err != nil { | 
|  | ctxC() | 
|  | return nil, fmt.Errorf("failed to allocate ephemeral ports: %w", err) | 
|  | } | 
|  |  | 
|  | go func() { | 
|  | if err := launch.RunMicroVM(ctxT, &launch.MicroVMOptions{ | 
|  | KernelPath:             "metropolis/test/ktest/vmlinux", | 
|  | InitramfsPath:          "metropolis/test/nanoswitch/initramfs.cpio.lz4", | 
|  | ExtraNetworkInterfaces: switchPorts, | 
|  | PortMap:                portMap, | 
|  | SerialPort:             newPrefixedStdio(99), | 
|  | }); err != nil { | 
|  | if !errors.Is(err, ctxT.Err()) { | 
|  | log.Fatalf("Failed to launch nanoswitch: %v", err) | 
|  | } | 
|  | } | 
|  | }() | 
|  |  | 
|  | // Build SOCKS dialer. | 
|  | socksRemote := fmt.Sprintf("localhost:%v", portMap[SOCKSPort]) | 
|  | socksDialer, err := proxy.SOCKS5("tcp", socksRemote, nil, proxy.Direct) | 
|  | if err != nil { | 
|  | ctxC() | 
|  | return nil, fmt.Errorf("failed to build SOCKS dialer: %w", err) | 
|  | } | 
|  |  | 
|  | // Retrieve owner credentials and first node. | 
|  | cert, firstNode, err := firstConnection(ctxT, socksDialer) | 
|  | if err != nil { | 
|  | ctxC() | 
|  | return nil, err | 
|  | } | 
|  |  | 
|  | // Set up a partially initialized cluster instance, to be filled in in the | 
|  | // later steps. | 
|  | cluster := &Cluster{ | 
|  | Owner: *cert, | 
|  | Ports: portMap, | 
|  | Nodes: map[string]*NodeInCluster{ | 
|  | firstNode.ID: firstNode, | 
|  | }, | 
|  | NodeIDs: []string{ | 
|  | firstNode.ID, | 
|  | }, | 
|  |  | 
|  | nodesDone: done, | 
|  | nodeOpts:  nodeOpts, | 
|  | launchDir: ld, | 
|  | socketDir: sd, | 
|  |  | 
|  | socksDialer: socksDialer, | 
|  |  | 
|  | ctxT: ctxT, | 
|  | ctxC: ctxC, | 
|  | } | 
|  |  | 
|  | // Now start the rest of the nodes and register them into the cluster. | 
|  |  | 
|  | // Get an authenticated owner client within the cluster. | 
|  | curC, err := cluster.curatorClient() | 
|  | if err != nil { | 
|  | ctxC() | 
|  | return nil, fmt.Errorf("curatorClient: %w", err) | 
|  | } | 
|  | mgmt := apb.NewManagementClient(curC) | 
|  |  | 
|  | // Retrieve register ticket to register further nodes. | 
|  | log.Printf("Cluster: retrieving register ticket...") | 
|  | resT, err := mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{}) | 
|  | if err != nil { | 
|  | ctxC() | 
|  | return nil, fmt.Errorf("GetRegisterTicket: %w", err) | 
|  | } | 
|  | ticket := resT.Ticket | 
|  | log.Printf("Cluster: retrieved register ticket (%d bytes).", len(ticket)) | 
|  |  | 
|  | // Retrieve cluster info (the cluster directory and CA certificate) to register further | 
|  | // nodes. | 
|  | resI, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{}) | 
|  | if err != nil { | 
|  | ctxC() | 
|  | return nil, fmt.Errorf("GetClusterInfo: %w", err) | 
|  | } | 
|  |  | 
|  | // Use the retrieved information to configure the rest of the node options. | 
|  | for i := 1; i < opts.NumNodes; i++ { | 
|  | nodeOpts[i] = NodeOptions{ | 
|  | ConnectToSocket: vmPorts[i], | 
|  | NodeParameters: &apb.NodeParameters{ | 
|  | Cluster: &apb.NodeParameters_ClusterRegister_{ | 
|  | ClusterRegister: &apb.NodeParameters_ClusterRegister{ | 
|  | RegisterTicket:   ticket, | 
|  | ClusterDirectory: resI.ClusterDirectory, | 
|  | CaCertificate:    resI.CaCertificate, | 
|  | }, | 
|  | }, | 
|  | }, | 
|  | SerialPort: newPrefixedStdio(i), | 
|  | } | 
|  | } | 
|  |  | 
|  | // Now run the rest of the nodes. | 
|  | // | 
|  | // TODO(q3k): parallelize this | 
|  | for i := 1; i < opts.NumNodes; i++ { | 
|  | log.Printf("Cluster: Starting node %d...", i+1) | 
|  | go func(i int) { | 
|  | err := LaunchNode(ctxT, ld, sd, &nodeOpts[i]) | 
|  | if err != nil { | 
|  | log.Printf("Node %d finished with an error: %v", i, err) | 
|  | } | 
|  | done[i] <- err | 
|  | }(i) | 
|  | var newNode *apb.Node | 
|  |  | 
|  | log.Printf("Cluster: waiting for node %d to appear as NEW...", i) | 
|  | for { | 
|  | nodes, err := getNodes(ctx, mgmt) | 
|  | if err != nil { | 
|  | ctxC() | 
|  | return nil, fmt.Errorf("could not get nodes: %w", err) | 
|  | } | 
|  | for _, n := range nodes { | 
|  | if n.State == cpb.NodeState_NODE_STATE_NEW { | 
|  | newNode = n | 
|  | break | 
|  | } | 
|  | } | 
|  | if newNode != nil { | 
|  | break | 
|  | } | 
|  | time.Sleep(1 * time.Second) | 
|  | } | 
|  | id := identity.NodeID(newNode.Pubkey) | 
|  | log.Printf("Cluster: node %d is %s", i, id) | 
|  |  | 
|  | log.Printf("Cluster: approving node %d", i) | 
|  | _, err := mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{ | 
|  | Pubkey: newNode.Pubkey, | 
|  | }) | 
|  | if err != nil { | 
|  | ctxC() | 
|  | return nil, fmt.Errorf("ApproveNode(%s): %w", id, err) | 
|  | } | 
|  | log.Printf("Cluster: node %d approved, waiting for it to appear as UP and with a network address...", i) | 
|  | for { | 
|  | nodes, err := getNodes(ctx, mgmt) | 
|  | if err != nil { | 
|  | ctxC() | 
|  | return nil, fmt.Errorf("could not get nodes: %w", err) | 
|  | } | 
|  | found := false | 
|  | for _, n := range nodes { | 
|  | if !bytes.Equal(n.Pubkey, newNode.Pubkey) { | 
|  | continue | 
|  | } | 
|  | if n.Status == nil || n.Status.ExternalAddress == "" { | 
|  | break | 
|  | } | 
|  | if n.State != cpb.NodeState_NODE_STATE_UP { | 
|  | break | 
|  | } | 
|  | found = true | 
|  | cluster.Nodes[identity.NodeID(n.Pubkey)] = &NodeInCluster{ | 
|  | ID:                identity.NodeID(n.Pubkey), | 
|  | ManagementAddress: n.Status.ExternalAddress, | 
|  | } | 
|  | cluster.NodeIDs = append(cluster.NodeIDs, identity.NodeID(n.Pubkey)) | 
|  | break | 
|  | } | 
|  | if found { | 
|  | break | 
|  | } | 
|  | time.Sleep(time.Second) | 
|  | } | 
|  | log.Printf("Cluster: node %d (%s) UP!", i, id) | 
|  | } | 
|  |  | 
|  | log.Printf("Cluster: all nodes up:") | 
|  | for _, node := range cluster.Nodes { | 
|  | log.Printf("Cluster:  - %s at %s", node.ID, node.ManagementAddress) | 
|  | } | 
|  |  | 
|  | return cluster, nil | 
|  | } | 
|  |  | 
|  | // RebootNode reboots the cluster member node matching the given index, and | 
|  | // waits for it to rejoin the cluster. It will use the given context ctx to run | 
|  | // cluster API requests, whereas the resulting QEMU process will be created | 
|  | // using the cluster's context c.ctxT. The nodes are indexed starting at 0. | 
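|  | // | 
|  | // For example, to reboot the second node (index 1) and wait for it to rejoin | 
|  | // (a sketch, assuming c and ctx come from the calling test): | 
|  | // | 
|  | //   if err := c.RebootNode(ctx, 1); err != nil { | 
|  | //     // handle the error | 
|  | //   } | 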
|  | func (c *Cluster) RebootNode(ctx context.Context, idx int) error { | 
|  | if idx < 0 || idx >= len(c.NodeIDs) { | 
|  | return fmt.Errorf("index out of bounds.") | 
|  | } | 
|  | id := c.NodeIDs[idx] | 
|  |  | 
|  | // Get an authenticated owner client within the cluster. | 
|  | curC, err := c.curatorClient() | 
|  | if err != nil { | 
|  | return err | 
|  | } | 
|  | mgmt := apb.NewManagementClient(curC) | 
|  |  | 
|  | // Get the timestamp of the node's last update, as observed by Curator. | 
|  | // It'll be needed to make sure it has rejoined the cluster after the reboot. | 
|  | var is *apb.Node | 
|  | for { | 
|  | r, err := getNode(ctx, mgmt, id) | 
|  | if err != nil { | 
|  | return err | 
|  | } | 
|  |  | 
|  | // Node status may be absent if it hasn't reported to the cluster yet. Wait | 
|  | // for it to appear before progressing further. | 
|  | if r.Status != nil { | 
|  | is = r | 
|  | break | 
|  | } | 
|  | time.Sleep(time.Second) | 
|  | } | 
|  |  | 
|  | // Cancel the node's context. This will shut down QEMU. | 
|  | c.nodeOpts[idx].Runtime.CtxC() | 
|  | log.Printf("Cluster: waiting for node %d (%s) to stop.", idx, id) | 
|  | err = <-c.nodesDone[idx] | 
|  | if err != nil { | 
|  | return fmt.Errorf("while restarting node: %w", err) | 
|  | } | 
|  |  | 
|  | // Start QEMU again. | 
|  | log.Printf("Cluster: restarting node %d (%s).", idx, id) | 
|  | go func(n int) { | 
|  | err := LaunchNode(c.ctxT, c.launchDir, c.socketDir, &c.nodeOpts[n]) | 
|  | if err != nil { | 
|  | log.Printf("Node %d finished with an error: %v", n, err) | 
|  | } | 
|  | c.nodesDone[n] <- err | 
|  | }(idx) | 
|  |  | 
|  | // Poll Management.GetNodes until the node's timestamp is updated. | 
|  | for { | 
|  | cs, err := getNode(ctx, mgmt, id) | 
|  | if err != nil { | 
|  | return err | 
|  | } | 
|  | if cs.Status != nil && cs.Status.Timestamp > is.Status.Timestamp { | 
|  | break | 
|  | } | 
|  | time.Sleep(time.Second) | 
|  | } | 
|  | log.Printf("Cluster: node %d (%s) has rejoined the cluster.", idx, id) | 
|  | return nil | 
|  | } | 
|  |  | 
|  | // Close cancels the running cluster's context and waits for all virtualized | 
|  | // nodes to stop. It returns an error if stopping the nodes failed, or one of | 
|  | // the nodes failed to fully start in the first place. | 
|  | func (c *Cluster) Close() error { | 
|  | log.Printf("Cluster: stopping...") | 
|  | if c.authClient != nil { | 
|  | c.authClient.Close() | 
|  | } | 
|  | c.ctxC() | 
|  |  | 
|  | var errs []error | 
|  | log.Printf("Cluster: waiting for nodes to exit...") | 
|  | for _, d := range c.nodesDone { | 
|  | err := <-d | 
|  | if err != nil { | 
|  | errs = append(errs, err) | 
|  | } | 
|  | } | 
|  | log.Printf("Cluster: removing nodes' state files.") | 
|  | os.RemoveAll(c.launchDir) | 
|  | os.RemoveAll(c.socketDir) | 
|  | log.Printf("Cluster: done") | 
|  | return multierr.Combine(errs...) | 
|  | } | 
|  |  | 
|  | // DialNode is a grpc.WithContextDialer compatible dialer which dials nodes by | 
|  | // their ID. This is performed by connecting to the cluster nanoswitch via its | 
|  | // SOCKS proxy, and using the cluster node list for name resolution. | 
|  | // | 
|  | // For example: | 
|  | // | 
|  | //   grpc.Dial("metropolis-deadbeef:1234", grpc.WithContextDialer(c.DialNode)) | 
|  | // | 
|  | func (c *Cluster) DialNode(_ context.Context, addr string) (net.Conn, error) { | 
|  | host, port, err := net.SplitHostPort(addr) | 
|  | if err != nil { | 
|  | return nil, fmt.Errorf("invalid host:port: %w", err) | 
|  | } | 
|  | // Already an IP address? | 
|  | if net.ParseIP(host) != nil { | 
|  | return c.socksDialer.Dial("tcp", addr) | 
|  | } | 
|  |  | 
|  | // Otherwise, expect a node name. | 
|  | node, ok := c.Nodes[host] | 
|  | if !ok { | 
|  | return nil, fmt.Errorf("unknown node %q", host) | 
|  | } | 
|  | addr = net.JoinHostPort(node.ManagementAddress, port) | 
|  | return c.socksDialer.Dial("tcp", addr) | 
|  | } |