m/t/launch: multi-node launches, prefixed stdout
This reinstantiates //:launch-test2, with some small fixes for usability
(prefixed stdout and GetNodes retries to handle cluster connectivity
issues as the cluster grows).
We also drive-by port //:launch-test2 and //:launch to use the new and
shiny clicontext package.
Change-Id: I62a1d827b2087f1173abf19e792a2088dc8b80bb
Reviewed-on: https://review.monogon.dev/c/monogon/+/485
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/test/launch/cli/launch-multi2/BUILD.bazel b/metropolis/test/launch/cli/launch-multi2/BUILD.bazel
index 51118cf..932b57b 100644
--- a/metropolis/test/launch/cli/launch-multi2/BUILD.bazel
+++ b/metropolis/test/launch/cli/launch-multi2/BUILD.bazel
@@ -5,7 +5,10 @@
srcs = ["main.go"],
importpath = "source.monogon.dev/metropolis/test/launch/cli/launch-multi2",
visibility = ["//visibility:private"],
- deps = ["//metropolis/pkg/logbuffer:go_default_library"],
+ deps = [
+ "//metropolis/cli/pkg/context:go_default_library",
+ "//metropolis/test/launch/cluster:go_default_library",
+ ],
)
go_binary(
diff --git a/metropolis/test/launch/cli/launch-multi2/main.go b/metropolis/test/launch/cli/launch-multi2/main.go
index d6c5f05..70c3745 100644
--- a/metropolis/test/launch/cli/launch-multi2/main.go
+++ b/metropolis/test/launch/cli/launch-multi2/main.go
@@ -17,35 +17,23 @@
package main
import (
- "fmt"
- "io"
+ "context"
"log"
- "os"
- "source.monogon.dev/metropolis/pkg/logbuffer"
+ clicontext "source.monogon.dev/metropolis/cli/pkg/context"
+ "source.monogon.dev/metropolis/test/launch/cluster"
)
-// prefixedStdout is a os.Stdout proxy that prefixes every line with a constant
-// prefix. This is used to show logs from two Metropolis nodes without getting
-// them confused.
-// TODO(q3k): move to logging API instead of relying on qemu stdout, and remove
-// this function.
-func prefixedStdout(prefix string) io.ReadWriter {
- lb := logbuffer.NewLineBuffer(2048, func(l *logbuffer.Line) {
- fmt.Fprintf(os.Stdout, "%s%s\n", prefix, l.Data)
- })
- // Make a ReaderWriter from LineBuffer (a Reader), by combining into an
- // anonymous struct with a io.MultiReader() (which will always return EOF
- // on every Read if given no underlying readers).
- return struct {
- io.Reader
- io.Writer
- }{
- Reader: io.MultiReader(),
- Writer: lb,
- }
-}
-
func main() {
- log.Fatal("unimplemented")
+ ctx := clicontext.WithInterrupt(context.Background())
+ cl, err := cluster.LaunchCluster(ctx, cluster.ClusterOptions{
+ NumNodes: 2,
+ })
+ if err != nil {
+ log.Fatalf("LaunchCluster: %v", err)
+ }
+ log.Printf("Launch: Cluster running!")
+
+ <-ctx.Done()
+ cl.Close()
}
diff --git a/metropolis/test/launch/cli/launch/BUILD.bazel b/metropolis/test/launch/cli/launch/BUILD.bazel
index e7e9271..cb2cbb5 100644
--- a/metropolis/test/launch/cli/launch/BUILD.bazel
+++ b/metropolis/test/launch/cli/launch/BUILD.bazel
@@ -6,6 +6,7 @@
importpath = "source.monogon.dev/metropolis/test/launch/cli/launch",
visibility = ["//visibility:private"],
deps = [
+ "//metropolis/cli/pkg/context:go_default_library",
"//metropolis/proto/api:go_default_library",
"//metropolis/test/launch:go_default_library",
"//metropolis/test/launch/cluster:go_default_library",
diff --git a/metropolis/test/launch/cli/launch/main.go b/metropolis/test/launch/cli/launch/main.go
index aacba06..5567379 100644
--- a/metropolis/test/launch/cli/launch/main.go
+++ b/metropolis/test/launch/cli/launch/main.go
@@ -20,23 +20,16 @@
"context"
"log"
"os"
- "os/signal"
- "syscall"
+ clicontext "source.monogon.dev/metropolis/cli/pkg/context"
apb "source.monogon.dev/metropolis/proto/api"
"source.monogon.dev/metropolis/test/launch"
"source.monogon.dev/metropolis/test/launch/cluster"
)
func main() {
- sigs := make(chan os.Signal, 1)
- signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
- ctx, cancel := context.WithCancel(context.Background())
- go func() {
- <-sigs
- cancel()
- }()
- if err := cluster.LaunchNode(ctx, cluster.NodeOptions{
+ ctx := clicontext.WithInterrupt(context.Background())
+ err := cluster.LaunchNode(ctx, cluster.NodeOptions{
Ports: launch.IdentityPortMap(cluster.NodePorts),
SerialPort: os.Stdout,
NodeParameters: &apb.NodeParameters{
@@ -44,10 +37,8 @@
ClusterBootstrap: cluster.InsecureClusterBootstrap,
},
},
- }); err != nil {
- if err == ctx.Err() {
- return
- }
- log.Fatalf("Failed to execute: %v\n", err)
+ })
+ if err != nil {
+ log.Fatalf("LaunchNode: %v", err)
}
}
diff --git a/metropolis/test/launch/cluster/BUILD.bazel b/metropolis/test/launch/cluster/BUILD.bazel
index bf010d2..abcdc07 100644
--- a/metropolis/test/launch/cluster/BUILD.bazel
+++ b/metropolis/test/launch/cluster/BUILD.bazel
@@ -5,6 +5,7 @@
srcs = [
"cluster.go",
"insecure_key.go",
+ "prefixed_stdio.go",
],
data = [
"//metropolis/node:image",
@@ -20,6 +21,7 @@
"//metropolis/node:go_default_library",
"//metropolis/node/core/identity:go_default_library",
"//metropolis/node/core/rpc:go_default_library",
+ "//metropolis/pkg/logbuffer:go_default_library",
"//metropolis/proto/api:go_default_library",
"//metropolis/proto/common:go_default_library",
"//metropolis/test/launch:go_default_library",
diff --git a/metropolis/test/launch/cluster/cluster.go b/metropolis/test/launch/cluster/cluster.go
index 4d5a772..5b1f3dc 100644
--- a/metropolis/test/launch/cluster/cluster.go
+++ b/metropolis/test/launch/cluster/cluster.go
@@ -238,20 +238,28 @@
// getNodes wraps around Management.GetNodes to return a list of nodes in a
// cluster.
func getNodes(ctx context.Context, mgmt apb.ManagementClient) ([]*apb.Node, error) {
- srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
- if err != nil {
- return nil, fmt.Errorf("GetNodes: %w", err)
- }
var res []*apb.Node
- for {
- node, err := srvN.Recv()
- if err == io.EOF {
- break
- }
+ bo := backoff.NewExponentialBackOff()
+ err := backoff.Retry(func() error {
+ res = nil
+ srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
if err != nil {
- return nil, fmt.Errorf("GetNodes.Recv: %w", err)
+ return fmt.Errorf("GetNodes: %w", err)
}
- res = append(res, node)
+ for {
+ node, err := srvN.Recv()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ return fmt.Errorf("GetNodes.Recv: %w", err)
+ }
+ res = append(res, node)
+ }
+ return nil
+ }, bo)
+ if err != nil {
+ return nil, err
}
return res, nil
}
@@ -355,6 +363,7 @@
},
},
},
+ SerialPort: newPrefixedStdio(0),
})
done[0] <- err
}()
@@ -374,7 +383,7 @@
PortMap: portMap,
}); err != nil {
if !errors.Is(err, ctxT.Err()) {
- log.Printf("Failed to launch nanoswitch: %v", err)
+ log.Fatalf("Failed to launch nanoswitch: %v", err)
}
}
}()
@@ -456,7 +465,7 @@
},
},
},
- SerialPort: os.Stdout,
+ SerialPort: newPrefixedStdio(i),
})
done[i] <- err
}(i)
diff --git a/metropolis/test/launch/cluster/prefixed_stdio.go b/metropolis/test/launch/cluster/prefixed_stdio.go
new file mode 100644
index 0000000..059c3cb
--- /dev/null
+++ b/metropolis/test/launch/cluster/prefixed_stdio.go
@@ -0,0 +1,39 @@
+package cluster
+
+import (
+ "fmt"
+ "io"
+ "strings"
+
+ "source.monogon.dev/metropolis/pkg/logbuffer"
+)
+
+// prefixedStdio is a io.ReadWriter which splits written bytes into lines,
+// prefixes them with some known prefix, and spits them to os.Stdout.
+//
+// io.Reader is implemented for compatibility with code which expects an
+// io.ReadWriter, but always returns EOF.
+type prefixedStdio struct {
+ *logbuffer.LineBuffer
+}
+
+// newPrefixedStdio returns a prefixedStdio that prefixes all lines with <num>|,
+// used to distinguish different VMs used within the launch codebase.
+func newPrefixedStdio(num int) prefixedStdio {
+ return prefixedStdio{
+ logbuffer.NewLineBuffer(1024, func(l *logbuffer.Line) {
+ s := strings.TrimSpace(l.String())
+ // TODO(q3k): don't just skip lines containing escape sequences, strip the
+ // sequences out. Or stop parsing qemu logs and instead dial log endpoint in
+ // spawned nodes.
+ if strings.Contains(s, "\u001b") {
+ return
+ }
+ fmt.Printf("%02d| %s\n", num, s)
+ }),
+ }
+}
+
+func (p prefixedStdio) Read(_ []byte) (int, error) {
+ return 0, io.EOF
+}