metropolis/cli/metroctl: implement install ssh

This implements another way of installing Metropolis: over SSH. It works
by uploading the files (the takeover binary and the bundle) to the target
machine and then doing a kexec into the install environment. If it fails
at any point, it prints the error and reboots.

Change-Id: I1ac6538896709c386b053a84903fa04940c1f012
Reviewed-on: https://review.monogon.dev/c/monogon/+/2079
Tested-by: Jenkins CI
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/cli/takeover/e2e/BUILD.bazel b/metropolis/cli/takeover/e2e/BUILD.bazel
new file mode 100644
index 0000000..81ff44a
--- /dev/null
+++ b/metropolis/cli/takeover/e2e/BUILD.bazel
@@ -0,0 +1,32 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_test")
+
+go_test(
+    name = "e2e_test",
+    srcs = ["main_test.go"],
+    data = [
+        "//metropolis/cli/takeover",
+        "//metropolis/installer/test/testos:testos_bundle",
+        "//third_party/edk2:OVMF_CODE.fd",
+        "//third_party/edk2:OVMF_VARS.fd",
+        "@debian_11_cloudimage//file",
+    ],
+    x_defs = {
+        "xBundleFilePath": "$(rlocationpath //metropolis/installer/test/testos:testos_bundle )",
+        "xOvmfVarsPath": "$(rlocationpath //third_party/edk2:OVMF_VARS.fd )",
+        "xOvmfCodePath": "$(rlocationpath //third_party/edk2:OVMF_CODE.fd )",
+        "xCloudImagePath": "$(rlocationpath @debian_11_cloudimage//file )",
+        # TODO(tim): Hardcoded runfiles path (unlike the rlocationpath entries above) because of https://github.com/monogon-dev/monogon/issues/316
+        "xTakeoverPath": "_main/metropolis/cli/takeover/takeover/takeover_bin",
+    },
+    deps = [
+        "//go/net/ssh",
+        "//metropolis/proto/api",
+        "//metropolis/test/launch",
+        "//osbase/fat32",
+        "//osbase/freeport",
+        "@io_bazel_rules_go//go/runfiles:go_default_library",
+        "@org_golang_google_protobuf//proto",
+        "@org_golang_x_crypto//ssh",
+        "@org_golang_x_sys//unix",
+    ],
+)
diff --git a/metropolis/cli/takeover/e2e/main_test.go b/metropolis/cli/takeover/e2e/main_test.go
new file mode 100644
index 0000000..32d7fb8
--- /dev/null
+++ b/metropolis/cli/takeover/e2e/main_test.go
@@ -0,0 +1,229 @@
+package e2e
+
+import (
+	"bufio"
+	"context"
+	"crypto/ed25519"
+	"crypto/rand"
+	"encoding/json"
+	"fmt"
+	"net"
+	"os"
+	"os/exec"
+	"os/signal"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/bazelbuild/rules_go/go/runfiles"
+	xssh "golang.org/x/crypto/ssh"
+	"golang.org/x/sys/unix"
+	"google.golang.org/protobuf/proto"
+
+	"source.monogon.dev/metropolis/proto/api"
+
+	"source.monogon.dev/go/net/ssh"
+	"source.monogon.dev/metropolis/test/launch"
+	"source.monogon.dev/osbase/fat32"
+	"source.monogon.dev/osbase/freeport"
+)
+
+var (
+	// These are filled in by bazel at link time (via x_defs) with the rlocation
+	// path of their corresponding file. The init function below resolves each
+	// of them to a real filesystem path using the rules_go runfiles package.
+	xBundleFilePath string
+	xOvmfVarsPath   string
+	xOvmfCodePath   string
+	xCloudImagePath string
+	xTakeoverPath   string
+)
+
+// init turns the rlocation paths that bazel stamped into the x* variables into
+// real filesystem paths using the rules_go runfiles package. It panics if any
+// of them cannot be resolved.
+func init() {
+	paths := []*string{&xCloudImagePath, &xOvmfVarsPath, &xOvmfCodePath, &xTakeoverPath, &xBundleFilePath}
+	for _, p := range paths {
+		var err error
+		if *p, err = runfiles.Rlocation(*p); err != nil {
+			panic(err)
+		}
+	}
+}
+
+const GiB = 1 << 30 // GiB is the number of bytes in one gibibyte.
+
+// TestE2E boots a Debian cloud image in QEMU, runs the takeover binary on it
+// over SSH and waits for the TestOS launch marker on the serial console.
+func TestE2E(t *testing.T) {
+	pubKey, privKey, err := ed25519.GenerateKey(rand.Reader)
+	if err != nil {
+		t.Fatal(err)
+	}
+	sshPubKey, err := xssh.NewPublicKey(pubKey)
+	if err != nil {
+		t.Fatal(err)
+	}
+	sshPrivkey, err := xssh.NewSignerFromKey(privKey)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// CloudConfig doesn't really have a rigid spec, so just put things into it
+	cloudConfig := make(map[string]any)
+	cloudConfig["ssh_authorized_keys"] = []string{
+		strings.TrimSuffix(string(xssh.MarshalAuthorizedKey(sshPubKey)), "\n"),
+	}
+	userData, err := json.Marshal(cloudConfig)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	rootInode := fat32.Inode{
+		Attrs: fat32.AttrDirectory,
+		Children: []*fat32.Inode{
+			{
+				Name:    "user-data",
+				Content: strings.NewReader("#cloud-config\n" + string(userData)),
+			},
+			{
+				Name:    "meta-data",
+				Content: strings.NewReader(""),
+			},
+		},
+	}
+	cloudInitDataFile, err := os.CreateTemp("", "cidata*.img")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.Remove(cloudInitDataFile.Name())
+	if err := fat32.WriteFS(cloudInitDataFile, rootInode, fat32.Options{Label: "cidata"}); err != nil {
+		t.Fatal(err)
+	}
+
+	rootDisk, err := os.CreateTemp("", "rootdisk")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.Remove(rootDisk.Name())
+	// Create a 10GiB sparse root disk
+	if err := unix.Ftruncate(int(rootDisk.Fd()), 10*GiB); err != nil {
+		t.Fatalf("ftruncate failed: %v", err)
+	}
+
+	sshPort, sshPortCloser, err := freeport.AllocateTCPPort()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	qemuArgs := []string{
+		"-machine", "q35", "-accel", "kvm", "-nographic", "-nodefaults", "-m", "1024",
+		"-cpu", "host", "-smp", "sockets=1,cpus=1,cores=2,threads=2,maxcpus=4",
+		"-drive", "if=pflash,format=raw,readonly=on,file=" + xOvmfCodePath,
+		"-drive", "if=pflash,format=raw,snapshot=on,file=" + xOvmfVarsPath,
+		"-drive", "if=none,format=raw,cache=unsafe,id=root,file=" + rootDisk.Name(),
+		"-drive", "if=none,format=qcow2,snapshot=on,id=cloud,cache=unsafe,file=" + xCloudImagePath,
+		"-device", "virtio-blk-pci,drive=root,bootindex=1",
+		"-device", "virtio-blk-pci,drive=cloud,bootindex=2",
+		"-drive", "if=virtio,format=raw,snapshot=on,file=" + cloudInitDataFile.Name(),
+		"-netdev", fmt.Sprintf("user,id=net0,net=10.42.0.0/24,dhcpstart=10.42.0.10,hostfwd=tcp::%d-:22", sshPort),
+		"-device", "virtio-net-pci,netdev=net0,mac=22:d5:8e:76:1d:07",
+		"-device", "virtio-rng-pci",
+		"-serial", "stdio",
+	}
+	qemuCmd := exec.Command("qemu-system-x86_64", qemuArgs...)
+	stdoutPipe, err := qemuCmd.StdoutPipe()
+	if err != nil {
+		t.Fatal(err)
+	}
+	qemuCmd.Stderr = os.Stderr
+	sshPortCloser.Close()
+	if err := qemuCmd.Start(); err != nil {
+		t.Fatal(err)
+	}
+	// Deferred calls run last-in-first-out: QEMU is killed first, then we wait
+	// for the console reader to exit so it never calls t.Log after the test ends.
+	installSucceed := make(chan struct{}, 1) // buffered: the send must never block
+	scanDone := make(chan struct{})
+	defer func() { <-scanDone }()
+	defer qemuCmd.Process.Kill()
+	go func() {
+		defer close(scanDone)
+		s := bufio.NewScanner(stdoutPipe)
+		for s.Scan() {
+			t.Log("kernel: " + s.Text())
+			if strings.Contains(s.Text(), "_TESTOS_LAUNCH_SUCCESS_") {
+				installSucceed <- struct{}{}
+				break
+			}
+		}
+		qemuCmd.Wait()
+	}()
+
+	cl := ssh.DirectClient{Username: "debian", AuthMethods: []xssh.AuthMethod{xssh.PublicKeys(sshPrivkey)}}
+	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
+	defer stop()
+
+	var conn ssh.Connection
+	for {
+		conn, err = cl.Dial(ctx, net.JoinHostPort("localhost", fmt.Sprintf("%d", sshPort)), 5*time.Second)
+		if err == nil {
+			break
+		}
+		if ctx.Err() != nil {
+			t.Fatalf("context cancelled while waiting for SSH: %v", ctx.Err())
+		}
+		t.Logf("error connecting via SSH, retrying: %v", err)
+		time.Sleep(1 * time.Second)
+	}
+
+	takeover, err := os.Open(xTakeoverPath)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer takeover.Close()
+	const takeoverTargetPath = "/tmp/takeover"
+	if err := conn.Upload(ctx, takeoverTargetPath, takeover); err != nil {
+		t.Fatalf("error while uploading takeover: %v", err)
+	}
+
+	bundleFile, err := os.Open(xBundleFilePath)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer bundleFile.Close()
+	const bundleTargetPath = "/tmp/bundle.zip"
+	if err := conn.Upload(ctx, bundleTargetPath, bundleFile); err != nil {
+		t.Fatalf("error while uploading bundle: %v", err)
+	}
+
+	params := &api.NodeParameters{
+		Cluster: &api.NodeParameters_ClusterBootstrap_{
+			ClusterBootstrap: &api.NodeParameters_ClusterBootstrap{
+				OwnerPublicKey: launch.InsecurePublicKey,
+			},
+		},
+	}
+	rawParams, err := proto.Marshal(params)
+	if err != nil {
+		t.Fatalf("error while marshaling node params: %v", err)
+	}
+
+	// Start the agent and wait for the agent's output to arrive.
+	t.Logf("Starting the takeover executable at path %q.", takeoverTargetPath)
+	_, stderr, err := conn.Execute(ctx, fmt.Sprintf("sudo %s -disk %s", takeoverTargetPath, "vda"), rawParams)
+	if stderrStr := strings.TrimSpace(string(stderr)); stderrStr != "" {
+		t.Logf("Agent stderr: %q", stderrStr)
+	}
+	if err != nil {
+		t.Fatalf("while starting the takeover executable: %v", err)
+	}
+
+	select {
+	case <-installSucceed:
+		// Done, test passed
+	case <-time.After(30 * time.Second):
+		t.Fatal("Waiting for installation timed out")
+	}
+}