m/cli/takeover/e2e: test with metroctl

This extends the starting end of the takeover end-to-end test by
executing metroctl instead of duplicating some of the logic in the test.

VM logs are now printed with %q so that terminal escape codes are
printed escaped instead of raw. Without this, parts of the logs are
obscured after a new kernel boots.

Change-Id: I05de3eb2ce142a99815eba6b978dad92772fe10f
Reviewed-on: https://review.monogon.dev/c/monogon/+/4034
Tested-by: Jenkins CI
Reviewed-by: Tim Windelschmidt <tim@monogon.tech>
diff --git a/metropolis/cli/takeover/e2e/BUILD.bazel b/metropolis/cli/takeover/e2e/BUILD.bazel
index 25032ed..b085947 100644
--- a/metropolis/cli/takeover/e2e/BUILD.bazel
+++ b/metropolis/cli/takeover/e2e/BUILD.bazel
@@ -4,6 +4,7 @@
     name = "e2e_test",
     srcs = ["main_test.go"],
     data = [
+        "//metropolis/cli/metroctl:metroctl_lite",
         "//metropolis/cli/takeover",
         "//metropolis/installer/test/testos:testos_bundle",
         "//third_party/edk2:OVMF_CODE.fd",
@@ -16,16 +17,14 @@
         "xOvmfCodePath": "$(rlocationpath //third_party/edk2:OVMF_CODE.fd )",
         "xCloudImagePath": "$(rlocationpath @debian_11_cloudimage//file )",
         "xTakeoverPath": "$(rlocationpath //metropolis/cli/takeover )",
+        "xMetroctlPath": "$(rlocationpath //metropolis/cli/metroctl:metroctl_lite )",
     },
     deps = [
-        "//go/net/ssh",
-        "//metropolis/proto/api",
-        "//metropolis/test/launch",
         "//osbase/fat32",
         "//osbase/freeport",
         "@io_bazel_rules_go//go/runfiles",
-        "@org_golang_google_protobuf//proto",
         "@org_golang_x_crypto//ssh",
+        "@org_golang_x_crypto//ssh/agent",
         "@org_golang_x_sys//unix",
     ],
 )
diff --git a/metropolis/cli/takeover/e2e/main_test.go b/metropolis/cli/takeover/e2e/main_test.go
index c9108cd..1ba5354 100644
--- a/metropolis/cli/takeover/e2e/main_test.go
+++ b/metropolis/cli/takeover/e2e/main_test.go
@@ -5,28 +5,24 @@
 
 import (
 	"bufio"
-	"context"
 	"crypto/ed25519"
 	"crypto/rand"
 	"encoding/json"
+	"errors"
 	"fmt"
+	"io"
 	"net"
 	"os"
 	"os/exec"
-	"os/signal"
 	"strings"
 	"testing"
 	"time"
 
 	"github.com/bazelbuild/rules_go/go/runfiles"
 	xssh "golang.org/x/crypto/ssh"
+	"golang.org/x/crypto/ssh/agent"
 	"golang.org/x/sys/unix"
-	"google.golang.org/protobuf/proto"
 
-	"source.monogon.dev/metropolis/proto/api"
-
-	"source.monogon.dev/go/net/ssh"
-	"source.monogon.dev/metropolis/test/launch"
 	"source.monogon.dev/osbase/fat32"
 	"source.monogon.dev/osbase/freeport"
 )
@@ -40,13 +36,14 @@
 	xOvmfCodePath   string
 	xCloudImagePath string
 	xTakeoverPath   string
+	xMetroctlPath   string
 )
 
 func init() {
 	var err error
 	for _, path := range []*string{
 		&xCloudImagePath, &xOvmfVarsPath, &xOvmfCodePath,
-		&xTakeoverPath, &xBundleFilePath,
+		&xTakeoverPath, &xBundleFilePath, &xMetroctlPath,
 	} {
 		*path, err = runfiles.Rlocation(*path)
 		if err != nil {
@@ -68,16 +65,47 @@
 		t.Fatal(err)
 	}
 
-	sshPrivkey, err := xssh.NewSignerFromKey(privKey)
+	keyring := agent.NewKeyring()
+	err = keyring.Add(agent.AddedKey{
+		PrivateKey: privKey,
+	})
 	if err != nil {
 		t.Fatal(err)
 	}
 
+	// Create the socket directory. We keep it in /tmp because of socket path limits.
+	socketDir, err := os.MkdirTemp("/tmp", "test-sockets-*")
+	if err != nil {
+		t.Fatalf("Failed to create socket directory: %v", err)
+	}
+	defer os.RemoveAll(socketDir)
+
+	// Start ssh agent server.
+	sshAuthSock := socketDir + "/ssh-auth"
+	agentListener, err := net.ListenUnix("unix", &net.UnixAddr{Name: sshAuthSock, Net: "unix"})
+	if err != nil {
+		t.Fatal(err)
+	}
+	go func() {
+		for {
+			conn, err := agentListener.AcceptUnix()
+			if err != nil {
+				return
+			}
+			err = agent.ServeAgent(keyring, conn)
+			if err != nil && !errors.Is(err, io.EOF) {
+				t.Logf("ServeAgent error: %v", err)
+			}
+			conn.Close()
+		}
+	}()
+
 	// CloudConfig doesn't really have a rigid spec, so just put things into it
 	cloudConfig := make(map[string]any)
 	cloudConfig["ssh_authorized_keys"] = []string{
 		strings.TrimSuffix(string(xssh.MarshalAuthorizedKey(sshPubKey)), "\n"),
 	}
+	cloudConfig["disable_root"] = false
 
 	userData, err := json.Marshal(cloudConfig)
 	if err != nil {
@@ -142,11 +170,20 @@
 	if err != nil {
 		t.Fatal(err)
 	}
+	sshdStarted := make(chan struct{})
 	installSucceed := make(chan struct{})
 	go func() {
 		s := bufio.NewScanner(stdoutPipe)
 		for s.Scan() {
-			t.Log("kernel: " + s.Text())
+			t.Logf("VM: %q", s.Text())
+			if strings.Contains(s.Text(), "Started") &&
+				strings.Contains(s.Text(), "Secure Shell server") {
+				sshdStarted <- struct{}{}
+				break
+			}
+		}
+		for s.Scan() {
+			t.Logf("VM: %q", s.Text())
 			if strings.Contains(s.Text(), "_TESTOS_LAUNCH_SUCCESS_") {
 				installSucceed <- struct{}{}
 				break
@@ -161,68 +198,30 @@
 	}
 	defer qemuCmd.Process.Kill()
 
-	cl := ssh.DirectClient{
-		Username:    "debian",
-		AuthMethods: []xssh.AuthMethod{xssh.PublicKeys(sshPrivkey)},
+	select {
+	case <-sshdStarted:
+	case <-time.After(30 * time.Second):
+		t.Fatal("Waiting for sshd start timed out")
 	}
 
-	ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt)
-
-	var conn ssh.Connection
-	for {
-		conn, err = cl.Dial(ctx, net.JoinHostPort("localhost", fmt.Sprintf("%d", sshPort)), 5*time.Second)
-		if err != nil {
-			t.Logf("error connecting via SSH, retrying: %v", err)
-			time.Sleep(1 * time.Second)
-			continue
-		}
-		break
+	installArgs := []string{
+		"install", "ssh",
+		fmt.Sprintf("root@localhost:%d", sshPort),
+		"--disk", "vda",
+		"--bootstrap",
+		"--cluster", "cluster.internal",
+		"--takeover", xTakeoverPath,
+		"--bundle", xBundleFilePath,
 	}
-
-	takeover, err := os.Open(xTakeoverPath)
-	if err != nil {
+	installCmd := exec.Command(xMetroctlPath, installArgs...)
+	installCmd.Env = append(installCmd.Environ(), fmt.Sprintf("SSH_AUTH_SOCK=%s", sshAuthSock))
+	installCmd.Stdout = os.Stdout
+	installCmd.Stderr = os.Stderr
+	t.Logf("Running %s", installCmd.String())
+	if err := installCmd.Run(); err != nil {
 		t.Fatal(err)
 	}
 
-	const takeoverTargetPath = "/tmp/takeover"
-	if err := conn.Upload(ctx, takeoverTargetPath, takeover); err != nil {
-		t.Fatalf("error while uploading takeover: %v", err)
-	}
-
-	bundleFile, err := os.Open(xBundleFilePath)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	const bundleTargetPath = "/tmp/bundle.zip"
-	if err := conn.Upload(ctx, bundleTargetPath, bundleFile); err != nil {
-		t.Fatalf("error while uploading bundle: %v", err)
-	}
-
-	params := &api.NodeParameters{
-		Cluster: &api.NodeParameters_ClusterBootstrap_{
-			ClusterBootstrap: &api.NodeParameters_ClusterBootstrap{
-				OwnerPublicKey: launch.InsecurePublicKey,
-			},
-		},
-		NetworkConfig: nil,
-	}
-	rawParams, err := proto.Marshal(params)
-	if err != nil {
-		t.Fatalf("error while marshaling node params: %v", err)
-	}
-
-	// Start the agent and wait for the agent's output to arrive.
-	t.Logf("Starting the takeover executable at path %q.", takeoverTargetPath)
-	_, stderr, err := conn.Execute(ctx, fmt.Sprintf("sudo %s -disk %s", takeoverTargetPath, "vda"), rawParams)
-	stderrStr := strings.TrimSpace(string(stderr))
-	if stderrStr != "" {
-		t.Logf("Agent stderr: %q", stderrStr)
-	}
-	if err != nil {
-		t.Fatalf("while starting the takeover executable: %v", err)
-	}
-
 	select {
 	case <-installSucceed:
 		// Done, test passed