m/t/installer: stream qemu output in harness

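The harness now scans QEMU's stdout/stderr line by line while the VM is
running and kills QEMU once the expected output is seen (or the context
is cancelled). This allows the installer to hang forever in error cases
instead of having to reboot.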

Change-Id: I328524727718e160ae8d6928b6d8b4921f7e036f
Reviewed-on: https://review.monogon.dev/c/monogon/+/490
Reviewed-by: Mateusz Zalega <mateusz@monogon.tech>
diff --git a/metropolis/test/installer/BUILD.bazel b/metropolis/test/installer/BUILD.bazel
index 5763d75..df62c1c 100644
--- a/metropolis/test/installer/BUILD.bazel
+++ b/metropolis/test/installer/BUILD.bazel
@@ -22,6 +22,7 @@
     deps = [
         "//metropolis/cli/metroctl/core:go_default_library",
         "//metropolis/node/build/mkimage/osimage:go_default_library",
+        "//metropolis/pkg/logbuffer:go_default_library",
         "//metropolis/proto/api:go_default_library",
         "@com_github_diskfs_go_diskfs//:go_default_library",
         "@com_github_diskfs_go_diskfs//disk:go_default_library",
diff --git a/metropolis/test/installer/main.go b/metropolis/test/installer/main.go
index 34c4d31..b36c70d 100644
--- a/metropolis/test/installer/main.go
+++ b/metropolis/test/installer/main.go
@@ -20,14 +20,14 @@
 package main
 
 import (
-	"bufio"
-	"bytes"
+	"context"
 	"fmt"
 	"io"
 	"log"
 	"os"
 	"os/exec"
 	"path/filepath"
+	"strings"
 	"syscall"
 	"testing"
 
@@ -38,6 +38,7 @@
 
 	mctl "source.monogon.dev/metropolis/cli/metroctl/core"
 	"source.monogon.dev/metropolis/node/build/mkimage/osimage"
+	"source.monogon.dev/metropolis/pkg/logbuffer"
 	"source.monogon.dev/metropolis/proto/api"
 )
 
@@ -58,10 +59,13 @@
 	nodeStorage string
 )
 
-// runQemu starts a QEMU process and waits for it to finish. args is
-// concatenated to the list of predefined default arguments. It returns true if
-// expectedOutput is found in the serial port output. It may return an error.
-func runQemu(args []string, expectedOutput string) (bool, error) {
+// runQemu starts a QEMU process and blocks until expectedOutput appears in a
+// line emitted to stdout or stderr (returning true) or ctx is cancelled
+// (returning false and ctx.Err()). QEMU exiting on its own is not detected.
+//
+// The qemu process will be killed when the context cancels or the function
+// exits.
+func runQemu(ctx context.Context, args []string, expectedOutput string) (bool, error) {
 	// Prepare the default parameter list.
 	defaultArgs := []string{
 		"-machine", "q35", "-accel", "kvm", "-nographic", "-nodefaults",
@@ -73,36 +77,54 @@
 		"-serial", "stdio",
 		"-no-reboot",
 	}
+
+	// Make a sub-context to ensure that qemu exits when this function is done.
+	ctxQ, ctxC := context.WithCancel(ctx)
+	defer ctxC()
+
 	// Join the parameter lists and prepare the Qemu command, but don't run it
 	// just yet.
 	qemuArgs := append(defaultArgs, args...)
-	qemuCmd := exec.Command("external/qemu/qemu-x86_64-softmmu", qemuArgs...)
+	qemuCmd := exec.CommandContext(ctxQ, "external/qemu/qemu-x86_64-softmmu", qemuArgs...)
 
-	// Copy the stdout and stderr output so that it could be matched against
-	// expectedOutput later.
-	var outBuf, errBuf bytes.Buffer
-	outWriter := bufio.NewWriter(&outBuf)
-	errWriter := bufio.NewWriter(&errBuf)
-	qemuCmd.Stdout = io.MultiWriter(os.Stdout, outWriter)
-	qemuCmd.Stderr = io.MultiWriter(os.Stderr, errWriter)
-	if err := qemuCmd.Run(); err != nil {
+	// Copy the stdout and stderr output to a single channel of lines so that they
+	// can then be matched against expectedOutput.
+	lineC := make(chan string)
+	outBuffer := logbuffer.NewLineBuffer(1024, func(l *logbuffer.Line) {
+		lineC <- l.Data
+	})
+	defer outBuffer.Close()
+	errBuffer := logbuffer.NewLineBuffer(1024, func(l *logbuffer.Line) {
+		lineC <- l.Data
+	})
+	defer errBuffer.Close()
+
+	// Tee std{out,err} into the linebuffers above and the process' std{out,err}, to
+	// allow easier debugging.
+	qemuCmd.Stdout = io.MultiWriter(os.Stdout, outBuffer)
+	qemuCmd.Stderr = io.MultiWriter(os.Stderr, errBuffer)
+	if err := qemuCmd.Start(); err != nil {
 		return false, fmt.Errorf("couldn't start QEMU: %w", err)
 	}
-	outWriter.Flush()
-	errWriter.Flush()
 
 	// Try matching against expectedOutput and return the result.
-	result := bytes.Contains(outBuf.Bytes(), []byte(expectedOutput)) ||
-		bytes.Contains(errBuf.Bytes(), []byte(expectedOutput))
-	return result, nil
+	for {
+		select {
+		case <-ctx.Done():
+			return false, ctx.Err()
+		case line := <-lineC:
+			if strings.Contains(line, expectedOutput) {
+				return true, nil
+			}
+		}
+	}
 }
 
-// runQemuWithInstaller starts a QEMU process and waits for it to finish. args is
-// concatenated to the list of predefined default arguments. It returns true if
-// expectedOutput is found in the serial port output. It may return an error.
-func runQemuWithInstaller(args []string, expectedOutput string) (bool, error) {
+// runQemuWithInstaller runs the Metropolis Installer in a qemu, performing the
+// same search-through-std{out,err} as runQemu.
+func runQemuWithInstaller(ctx context.Context, args []string, expectedOutput string) (bool, error) {
 	args = append(args, "-drive", "if=virtio,format=raw,snapshot=on,cache=unsafe,file="+installerImage)
-	return runQemu(args, expectedOutput)
+	return runQemu(ctx, args, expectedOutput)
 }
 
 // getStorage creates a sparse file, given a size expressed in mebibytes, and
@@ -242,11 +264,14 @@
 }
 
 func TestNoBlockDevices(t *testing.T) {
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+
 	// No block devices are passed to QEMU aside from the install medium. Expect
 	// the installer to fail at the device probe stage rather than attempting to
 	// use the medium as the target device.
 	expectedOutput := "couldn't find a suitable block device"
-	result, err := runQemuWithInstaller(nil, expectedOutput)
+	result, err := runQemuWithInstaller(ctx, nil, expectedOutput)
 	if err != nil {
 		t.Error(err.Error())
 	}
@@ -256,6 +281,9 @@
 }
 
 func TestBlockDeviceTooSmall(t *testing.T) {
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+
 	// Prepare the block device the installer will install to. This time the
 	// target device is too small to host a Metropolis installation.
 	imagePath, err := getStorage(64)
@@ -266,7 +294,7 @@
 
 	// Run QEMU. Expect the installer to fail with a predefined error string.
 	expectedOutput := "couldn't find a suitable block device"
-	result, err := runQemuWithInstaller(qemuDriveParam(imagePath), expectedOutput)
+	result, err := runQemuWithInstaller(ctx, qemuDriveParam(imagePath), expectedOutput)
 	if err != nil {
 		t.Error(err.Error())
 	}
@@ -276,6 +304,9 @@
 }
 
 func TestInstall(t *testing.T) {
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+
 	// Prepare the block device image the installer will install to.
 	storagePath, err := getStorage(4096 + 128 + 128 + 1)
 	defer os.Remove(storagePath)
@@ -285,7 +316,7 @@
 
 	// Run QEMU. Expect the installer to succeed.
 	expectedOutput := "Installation completed"
-	result, err := runQemuWithInstaller(qemuDriveParam(storagePath), expectedOutput)
+	result, err := runQemuWithInstaller(ctx, qemuDriveParam(storagePath), expectedOutput)
 	if err != nil {
 		t.Error(err.Error())
 	}
@@ -330,7 +361,7 @@
 	}
 	// Run QEMU again. Expect TestOS to launch successfully.
 	expectedOutput = "_TESTOS_LAUNCH_SUCCESS_"
-	result, err = runQemu(qemuDriveParam(storagePath), expectedOutput)
+	result, err = runQemu(ctx, qemuDriveParam(storagePath), expectedOutput)
 	if err != nil {
 		t.Error(err.Error())
 	}