m/t/launch: multi-node launches, prefixed stdout
This reinstantiates //:launch-test2, with some small fixes for usability
(prefixed stdout and GetNodes retries to handle cluster connectivity
issues as the cluster grows).
We also drive-by port //:launch-test2 and //:launch to use the new and
shiny clicontext package.
Change-Id: I62a1d827b2087f1173abf19e792a2088dc8b80bb
Reviewed-on: https://review.monogon.dev/c/monogon/+/485
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/test/launch/cluster/BUILD.bazel b/metropolis/test/launch/cluster/BUILD.bazel
index bf010d2..abcdc07 100644
--- a/metropolis/test/launch/cluster/BUILD.bazel
+++ b/metropolis/test/launch/cluster/BUILD.bazel
@@ -5,6 +5,7 @@
srcs = [
"cluster.go",
"insecure_key.go",
+ "prefixed_stdio.go",
],
data = [
"//metropolis/node:image",
@@ -20,6 +21,7 @@
"//metropolis/node:go_default_library",
"//metropolis/node/core/identity:go_default_library",
"//metropolis/node/core/rpc:go_default_library",
+ "//metropolis/pkg/logbuffer:go_default_library",
"//metropolis/proto/api:go_default_library",
"//metropolis/proto/common:go_default_library",
"//metropolis/test/launch:go_default_library",
diff --git a/metropolis/test/launch/cluster/cluster.go b/metropolis/test/launch/cluster/cluster.go
index 4d5a772..5b1f3dc 100644
--- a/metropolis/test/launch/cluster/cluster.go
+++ b/metropolis/test/launch/cluster/cluster.go
@@ -238,20 +238,28 @@
// getNodes wraps around Management.GetNodes to return a list of nodes in a
// cluster.
func getNodes(ctx context.Context, mgmt apb.ManagementClient) ([]*apb.Node, error) {
- srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
- if err != nil {
- return nil, fmt.Errorf("GetNodes: %w", err)
- }
var res []*apb.Node
- for {
- node, err := srvN.Recv()
- if err == io.EOF {
- break
- }
+ bo := backoff.WithContext(backoff.NewExponentialBackOff(), ctx) // stop retrying once ctx is done
+ err := backoff.Retry(func() error {
+ res = nil
+ srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
if err != nil {
- return nil, fmt.Errorf("GetNodes.Recv: %w", err)
+ return fmt.Errorf("GetNodes: %w", err)
}
- res = append(res, node)
+ for {
+ node, err := srvN.Recv()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ return fmt.Errorf("GetNodes.Recv: %w", err)
+ }
+ res = append(res, node)
+ }
+ return nil
+ }, bo)
+ if err != nil {
+ return nil, err
}
return res, nil
}
@@ -355,6 +363,7 @@
},
},
},
+ SerialPort: newPrefixedStdio(0),
})
done[0] <- err
}()
@@ -374,7 +383,7 @@
PortMap: portMap,
}); err != nil {
if !errors.Is(err, ctxT.Err()) {
- log.Printf("Failed to launch nanoswitch: %v", err)
+ log.Fatalf("Failed to launch nanoswitch: %v", err)
}
}
}()
@@ -456,7 +465,7 @@
},
},
},
- SerialPort: os.Stdout,
+ SerialPort: newPrefixedStdio(i),
})
done[i] <- err
}(i)
diff --git a/metropolis/test/launch/cluster/prefixed_stdio.go b/metropolis/test/launch/cluster/prefixed_stdio.go
new file mode 100644
index 0000000..059c3cb
--- /dev/null
+++ b/metropolis/test/launch/cluster/prefixed_stdio.go
@@ -0,0 +1,39 @@
+package cluster
+
+import (
+ "fmt"
+ "io"
+ "strings"
+
+ "source.monogon.dev/metropolis/pkg/logbuffer"
+)
+
+// prefixedStdio is an io.ReadWriter which splits written bytes into lines,
+// prefixes them with some known prefix, and prints them to os.Stdout.
+//
+// io.Reader is implemented for compatibility with code which expects an
+// io.ReadWriter, but always returns EOF.
+type prefixedStdio struct {
+ *logbuffer.LineBuffer
+}
+
+// newPrefixedStdio returns a prefixedStdio that prefixes all lines with <num>|,
+// used to distinguish different VMs used within the launch codebase.
+func newPrefixedStdio(num int) prefixedStdio {
+ return prefixedStdio{
+ logbuffer.NewLineBuffer(1024, func(l *logbuffer.Line) {
+ s := strings.TrimSpace(l.String())
+ // TODO(q3k): don't just skip lines containing escape sequences, strip the
+ // sequences out. Or stop parsing qemu logs and instead dial log endpoint in
+ // spawned nodes.
+ if strings.Contains(s, "\u001b") {
+ return
+ }
+ fmt.Printf("%02d| %s\n", num, s)
+ }),
+ }
+}
+
+func (p prefixedStdio) Read(_ []byte) (int, error) {
+ return 0, io.EOF
+}