m/t/launch: multi-node launches, prefixed stdout

This reinstantiates //:launch-test2, with some small fixes for usability
(prefixed stdout and GetNodes retries to handle cluster connectivity
issues as the cluster grows).

We also drive-by port //:launch-test2 and //:launch to use the new and
shiny clicontext package.

Change-Id: I62a1d827b2087f1173abf19e792a2088dc8b80bb
Reviewed-on: https://review.monogon.dev/c/monogon/+/485
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/test/launch/cli/launch-multi2/BUILD.bazel b/metropolis/test/launch/cli/launch-multi2/BUILD.bazel
index 51118cf..932b57b 100644
--- a/metropolis/test/launch/cli/launch-multi2/BUILD.bazel
+++ b/metropolis/test/launch/cli/launch-multi2/BUILD.bazel
@@ -5,7 +5,10 @@
     srcs = ["main.go"],
     importpath = "source.monogon.dev/metropolis/test/launch/cli/launch-multi2",
     visibility = ["//visibility:private"],
-    deps = ["//metropolis/pkg/logbuffer:go_default_library"],
+    deps = [
+        "//metropolis/cli/pkg/context:go_default_library",
+        "//metropolis/test/launch/cluster:go_default_library",
+    ],
 )
 
 go_binary(
diff --git a/metropolis/test/launch/cli/launch-multi2/main.go b/metropolis/test/launch/cli/launch-multi2/main.go
index d6c5f05..70c3745 100644
--- a/metropolis/test/launch/cli/launch-multi2/main.go
+++ b/metropolis/test/launch/cli/launch-multi2/main.go
@@ -17,35 +17,23 @@
 package main
 
 import (
-	"fmt"
-	"io"
+	"context"
 	"log"
-	"os"
 
-	"source.monogon.dev/metropolis/pkg/logbuffer"
+	clicontext "source.monogon.dev/metropolis/cli/pkg/context"
+	"source.monogon.dev/metropolis/test/launch/cluster"
 )
 
-// prefixedStdout is a os.Stdout proxy that prefixes every line with a constant
-// prefix. This is used to show logs from two Metropolis nodes without getting
-// them confused.
-// TODO(q3k): move to logging API instead of relying on qemu stdout, and remove
-// this function.
-func prefixedStdout(prefix string) io.ReadWriter {
-	lb := logbuffer.NewLineBuffer(2048, func(l *logbuffer.Line) {
-		fmt.Fprintf(os.Stdout, "%s%s\n", prefix, l.Data)
-	})
-	// Make a ReaderWriter from LineBuffer (a Reader), by combining into an
-	// anonymous struct with a io.MultiReader() (which will always return EOF
-	// on every Read if given no underlying readers).
-	return struct {
-		io.Reader
-		io.Writer
-	}{
-		Reader: io.MultiReader(),
-		Writer: lb,
-	}
-}
-
 func main() {
-	log.Fatal("unimplemented")
+	ctx := clicontext.WithInterrupt(context.Background())
+	cl, err := cluster.LaunchCluster(ctx, cluster.ClusterOptions{
+		NumNodes: 2,
+	})
+	if err != nil {
+		log.Fatalf("LaunchCluster: %v", err)
+	}
+	log.Printf("Launch: Cluster running!")
+
+	<-ctx.Done()
+	cl.Close()
 }
diff --git a/metropolis/test/launch/cli/launch/BUILD.bazel b/metropolis/test/launch/cli/launch/BUILD.bazel
index e7e9271..cb2cbb5 100644
--- a/metropolis/test/launch/cli/launch/BUILD.bazel
+++ b/metropolis/test/launch/cli/launch/BUILD.bazel
@@ -6,6 +6,7 @@
     importpath = "source.monogon.dev/metropolis/test/launch/cli/launch",
     visibility = ["//visibility:private"],
     deps = [
+        "//metropolis/cli/pkg/context:go_default_library",
         "//metropolis/proto/api:go_default_library",
         "//metropolis/test/launch:go_default_library",
         "//metropolis/test/launch/cluster:go_default_library",
diff --git a/metropolis/test/launch/cli/launch/main.go b/metropolis/test/launch/cli/launch/main.go
index aacba06..5567379 100644
--- a/metropolis/test/launch/cli/launch/main.go
+++ b/metropolis/test/launch/cli/launch/main.go
@@ -20,23 +20,16 @@
 	"context"
 	"log"
 	"os"
-	"os/signal"
-	"syscall"
 
+	clicontext "source.monogon.dev/metropolis/cli/pkg/context"
 	apb "source.monogon.dev/metropolis/proto/api"
 	"source.monogon.dev/metropolis/test/launch"
 	"source.monogon.dev/metropolis/test/launch/cluster"
 )
 
 func main() {
-	sigs := make(chan os.Signal, 1)
-	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
-	ctx, cancel := context.WithCancel(context.Background())
-	go func() {
-		<-sigs
-		cancel()
-	}()
-	if err := cluster.LaunchNode(ctx, cluster.NodeOptions{
+	ctx := clicontext.WithInterrupt(context.Background())
+	err := cluster.LaunchNode(ctx, cluster.NodeOptions{
 		Ports:      launch.IdentityPortMap(cluster.NodePorts),
 		SerialPort: os.Stdout,
 		NodeParameters: &apb.NodeParameters{
@@ -44,10 +37,8 @@
 				ClusterBootstrap: cluster.InsecureClusterBootstrap,
 			},
 		},
-	}); err != nil {
-		if err == ctx.Err() {
-			return
-		}
-		log.Fatalf("Failed to execute: %v\n", err)
+	})
+	if err != nil {
+		log.Fatalf("LaunchNode: %v", err)
 	}
 }
diff --git a/metropolis/test/launch/cluster/BUILD.bazel b/metropolis/test/launch/cluster/BUILD.bazel
index bf010d2..abcdc07 100644
--- a/metropolis/test/launch/cluster/BUILD.bazel
+++ b/metropolis/test/launch/cluster/BUILD.bazel
@@ -5,6 +5,7 @@
     srcs = [
         "cluster.go",
         "insecure_key.go",
+        "prefixed_stdio.go",
     ],
     data = [
         "//metropolis/node:image",
@@ -20,6 +21,7 @@
         "//metropolis/node:go_default_library",
         "//metropolis/node/core/identity:go_default_library",
         "//metropolis/node/core/rpc:go_default_library",
+        "//metropolis/pkg/logbuffer:go_default_library",
         "//metropolis/proto/api:go_default_library",
         "//metropolis/proto/common:go_default_library",
         "//metropolis/test/launch:go_default_library",
diff --git a/metropolis/test/launch/cluster/cluster.go b/metropolis/test/launch/cluster/cluster.go
index 4d5a772..5b1f3dc 100644
--- a/metropolis/test/launch/cluster/cluster.go
+++ b/metropolis/test/launch/cluster/cluster.go
@@ -238,20 +238,28 @@
 // getNodes wraps around Management.GetNodes to return a list of nodes in a
 // cluster.
 func getNodes(ctx context.Context, mgmt apb.ManagementClient) ([]*apb.Node, error) {
-	srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
-	if err != nil {
-		return nil, fmt.Errorf("GetNodes: %w", err)
-	}
 	var res []*apb.Node
-	for {
-		node, err := srvN.Recv()
-		if err == io.EOF {
-			break
-		}
+	bo := backoff.NewExponentialBackOff()
+	err := backoff.Retry(func() error {
+		res = nil
+		srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
 		if err != nil {
-			return nil, fmt.Errorf("GetNodes.Recv: %w", err)
+			return fmt.Errorf("GetNodes: %w", err)
 		}
-		res = append(res, node)
+		for {
+			node, err := srvN.Recv()
+			if err == io.EOF {
+				break
+			}
+			if err != nil {
+				return fmt.Errorf("GetNodes.Recv: %w", err)
+			}
+			res = append(res, node)
+		}
+		return nil
+	}, bo)
+	if err != nil {
+		return nil, err
 	}
 	return res, nil
 }
@@ -355,6 +363,7 @@
 					},
 				},
 			},
+			SerialPort: newPrefixedStdio(0),
 		})
 		done[0] <- err
 	}()
@@ -374,7 +383,7 @@
 			PortMap:                portMap,
 		}); err != nil {
 			if !errors.Is(err, ctxT.Err()) {
-				log.Printf("Failed to launch nanoswitch: %v", err)
+				log.Fatalf("Failed to launch nanoswitch: %v", err)
 			}
 		}
 	}()
@@ -456,7 +465,7 @@
 						},
 					},
 				},
-				SerialPort: os.Stdout,
+				SerialPort: newPrefixedStdio(i),
 			})
 			done[i] <- err
 		}(i)
diff --git a/metropolis/test/launch/cluster/prefixed_stdio.go b/metropolis/test/launch/cluster/prefixed_stdio.go
new file mode 100644
index 0000000..059c3cb
--- /dev/null
+++ b/metropolis/test/launch/cluster/prefixed_stdio.go
@@ -0,0 +1,39 @@
+package cluster
+
+import (
+	"fmt"
+	"io"
+	"strings"
+
+	"source.monogon.dev/metropolis/pkg/logbuffer"
+)
+
+// prefixedStdio is a io.ReadWriter which splits written bytes into lines,
+// prefixes them with some known prefix, and spits them to os.Stdout.
+//
+// io.Reader is implemented for compatibility with code which expects an
+// io.ReadWriter, but always returns EOF.
+type prefixedStdio struct {
+	*logbuffer.LineBuffer
+}
+
+// newPrefixedStdio returns a prefixedStdio that prefixes all lines with <num>|,
+// used to distinguish different VMs used within the launch codebase.
+func newPrefixedStdio(num int) prefixedStdio {
+	return prefixedStdio{
+		logbuffer.NewLineBuffer(1024, func(l *logbuffer.Line) {
+			s := strings.TrimSpace(l.String())
+			// TODO(q3k): don't just skip lines containing escape sequences, strip the
+			// sequences out. Or stop parsing qemu logs and instead dial log endpoint in
+			// spawned nodes.
+			if strings.Contains(s, "\u001b") {
+				return
+			}
+			fmt.Printf("%02d| %s\n", num, s)
+		}),
+	}
+}
+
+func (p prefixedStdio) Read(_ []byte) (int, error) {
+	return 0, io.EOF
+}