m/test/launch: allow specifying launch parameters
This adds flags to the launch-cluster command for specifying the size of
the cluster, the TPM and storage security configuration, the number of
CPUs and RAM size for all nodes, and assigning roles to specific nodes.
As an example, the following command launches a cluster with TPM
disabled, 4 nodes, 2 CPUs and 4 GiB of RAM on each node, and assigns the
Kubernetes Worker role to all nodes except the first one:
bazel run //metropolis:launch-cluster -- -tpm-mode=disabled \
-num-nodes=4 -cpu=2 -ram=4G -kubernetes-worker=1-3
The default storage security policy was changed to insecure, as this
speeds up cluster launch.
The cluster configuration flags are defined in a new separate package to
avoid code duplication.
Fixes: https://github.com/monogon-dev/monogon/issues/315
Change-Id: Icf8b7fcbd6e609f4785b2a60ce5e7be14b641884
Reviewed-on: https://review.monogon.dev/c/monogon/+/3307
Reviewed-by: Serge Bazanski <serge@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/test/launch/cli/launch-cluster/BUILD.bazel b/metropolis/test/launch/cli/launch-cluster/BUILD.bazel
index a952abd..74067a7 100644
--- a/metropolis/test/launch/cli/launch-cluster/BUILD.bazel
+++ b/metropolis/test/launch/cli/launch-cluster/BUILD.bazel
@@ -7,7 +7,10 @@
importpath = "source.monogon.dev/metropolis/test/launch/cli/launch-cluster",
visibility = ["//visibility:private"],
deps = [
+ "//metropolis/cli/flagdefs",
"//metropolis/cli/metroctl/core",
+ "//metropolis/node",
+ "//metropolis/proto/common",
"//metropolis/test/launch",
],
)
diff --git a/metropolis/test/launch/cli/launch-cluster/main.go b/metropolis/test/launch/cli/launch-cluster/main.go
index 695fcc7..cceebd1 100644
--- a/metropolis/test/launch/cli/launch-cluster/main.go
+++ b/metropolis/test/launch/cli/launch-cluster/main.go
@@ -18,36 +18,139 @@
import (
"context"
+ "errors"
+ "flag"
+ "fmt"
"log"
+ "net"
"os"
"os/exec"
"os/signal"
+ "strconv"
+ "strings"
+ "time"
+ "source.monogon.dev/metropolis/cli/flagdefs"
metroctl "source.monogon.dev/metropolis/cli/metroctl/core"
+ "source.monogon.dev/metropolis/node"
+ cpb "source.monogon.dev/metropolis/proto/common"
mlaunch "source.monogon.dev/metropolis/test/launch"
)
-func main() {
- ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt)
- cl, err := mlaunch.LaunchCluster(ctx, mlaunch.ClusterOptions{
- NumNodes: 3,
- NodeLogsToFiles: true,
+const maxNodes = 256
+
+func nodeSetFlag(p *[]int, name string, usage string) {
+ flag.Func(name, usage, func(val string) error {
+ for _, part := range strings.Split(val, ",") {
+ part = strings.TrimSpace(part)
+ if part == "" {
+ continue
+ }
+ startStr, endStr, ok := strings.Cut(part, "-")
+ if !ok {
+ endStr = startStr
+ }
+ start, err := strconv.Atoi(startStr)
+ if err != nil {
+ return err
+ }
+ end, err := strconv.Atoi(endStr)
+ if err != nil {
+ return err
+ }
+ if end >= maxNodes {
+ return fmt.Errorf("node index %v out of range, there can be at most %v nodes", end, maxNodes)
+ }
+ if end < start {
+ return fmt.Errorf("invalid range %q, end is smaller than start", part)
+ }
+ for i := start; i <= end; i++ {
+ *p = append(*p, i)
+ }
+ }
+ return nil
})
+}
+
+func sizeFlagMiB(p *int, name string, usage string) {
+ flag.Func(name, usage, func(val string) error {
+ multiplier := 1
+ switch {
+ case strings.HasSuffix(val, "M"):
+ case strings.HasSuffix(val, "G"):
+ multiplier = 1024
+ default:
+ return errors.New("must have suffix M for MiB or G for GiB")
+ }
+ intVal, err := strconv.Atoi(val[:len(val)-1])
+ if err != nil {
+ return err
+ }
+ *p = multiplier * intVal
+ return nil
+ })
+}
+
+func main() {
+ clusterConfig := cpb.ClusterConfiguration{}
+ opts := mlaunch.ClusterOptions{
+ NodeLogsToFiles: true,
+ InitialClusterConfiguration: &clusterConfig,
+ }
+ var consensusMemberList, kubernetesControllerList, kubernetesWorkerList []int
+
+ flag.IntVar(&opts.NumNodes, "num-nodes", 3, "Number of cluster nodes")
+ flagdefs.TPMModeVar(flag.CommandLine, &clusterConfig.TpmMode, "tpm-mode", cpb.ClusterConfiguration_TPM_MODE_REQUIRED, "TPM mode to set on cluster")
+ flagdefs.StorageSecurityPolicyVar(flag.CommandLine, &clusterConfig.StorageSecurityPolicy, "storage-security", cpb.ClusterConfiguration_STORAGE_SECURITY_POLICY_NEEDS_INSECURE, "Storage security policy to set on cluster")
+ flag.IntVar(&opts.Node.CPUs, "cpu", 1, "Number of virtual CPUs of each node")
+ flag.IntVar(&opts.Node.ThreadsPerCPU, "threads-per-cpu", 1, "Number of threads per CPU")
+ sizeFlagMiB(&opts.Node.MemoryMiB, "ram", "RAM size of each node, with suffix M for MiB or G for GiB")
+ nodeSetFlag(&consensusMemberList, "consensus-member", "List of nodes which get the Consensus Member role. Example: 0,3-5")
+ nodeSetFlag(&kubernetesControllerList, "kubernetes-controller", "List of nodes which get the Kubernetes Controller role. Example: 0,3-5")
+ nodeSetFlag(&kubernetesWorkerList, "kubernetes-worker", "List of nodes which get the Kubernetes Worker role. Example: 0,3-5")
+ flag.Parse()
+
+ if opts.NumNodes >= maxNodes {
+ log.Fatalf("num-nodes (%v) is too large, there can be at most %v nodes", opts.NumNodes, maxNodes)
+ }
+ for _, list := range [][]int{consensusMemberList, kubernetesControllerList, kubernetesWorkerList} {
+ for i := len(list) - 1; i >= 0; i-- {
+ if list[i] >= opts.NumNodes {
+ log.Fatalf("Node index %v out of range, can be at most %v", list[i], opts.NumNodes-1)
+ }
+ }
+ }
+
+ ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt)
+ cl, err := mlaunch.LaunchCluster(ctx, opts)
if err != nil {
log.Fatalf("LaunchCluster: %v", err)
}
+ for _, node := range consensusMemberList {
+ cl.MakeConsensusMember(ctx, cl.NodeIDs[node])
+ }
+ for _, node := range kubernetesControllerList {
+ cl.MakeKubernetesController(ctx, cl.NodeIDs[node])
+ }
+ for _, node := range kubernetesWorkerList {
+ cl.MakeKubernetesWorker(ctx, cl.NodeIDs[node])
+ }
+
wpath, err := cl.MakeMetroctlWrapper()
if err != nil {
log.Fatalf("MakeWrapper: %v", err)
}
- apiservers, err := cl.KubernetesControllerNodeAddresses(ctx)
- if err != nil {
- log.Fatalf("Could not get Kubernetes controller nodes: %v", err)
- }
- if len(apiservers) < 1 {
- log.Fatalf("Cluster has no Kubernetes controller nodes")
+ apiserver := cl.Nodes[cl.NodeIDs[0]].ManagementAddress
+ // Wait for the API server to start listening.
+ for {
+ conn, err := cl.DialNode(ctx, net.JoinHostPort(apiserver, node.KubernetesAPIWrappedPort.PortString()))
+ if err == nil {
+ conn.Close()
+ break
+ }
+ time.Sleep(100 * time.Millisecond)
}
// If the user has metroctl in their path, use the metroctl from path as
@@ -62,7 +165,7 @@
}
configName := "launch-cluster"
- if err := metroctl.InstallKubeletConfig(ctx, metroctlPath, cl.ConnectOptions(), configName, apiservers[0]); err != nil {
+ if err := metroctl.InstallKubeletConfig(ctx, metroctlPath, cl.ConnectOptions(), configName, apiserver); err != nil {
log.Fatalf("InstallKubeletConfig: %v", err)
}
diff --git a/metropolis/test/launch/cluster.go b/metropolis/test/launch/cluster.go
index 98ab760..1435925 100644
--- a/metropolis/test/launch/cluster.go
+++ b/metropolis/test/launch/cluster.go
@@ -62,6 +62,16 @@
// Name is a human-readable identifier to be used in debug output.
Name string
+ // CPUs is the number of virtual CPUs of the VM.
+ CPUs int
+
+ // ThreadsPerCPU is the number of threads per CPU. This is multiplied by
+ // CPUs to get the total number of threads.
+ ThreadsPerCPU int
+
+ // MemoryMiB is the RAM size in MiB of the VM.
+ MemoryMiB int
+
// Ports contains the port mapping where to expose the internal ports of the VM to
// the host. See IdentityPortMap() and ConflictFreePortMap(). Ignored when
// ConnectToSocket is set.
@@ -215,6 +225,16 @@
// swtpm and qemu because we run into UNIX socket length limitations (for legacy
// reasons 108 chars).
+ if options.CPUs == 0 {
+ options.CPUs = 1
+ }
+ if options.ThreadsPerCPU == 0 {
+ options.ThreadsPerCPU = 1
+ }
+ if options.MemoryMiB == 0 {
+ options.MemoryMiB = 2048
+ }
+
// If it's the node's first start, set up its runtime directories.
if options.Runtime == nil {
r, err := setupRuntime(ld, sd)
@@ -262,8 +282,13 @@
fwVarPath := filepath.Join(r.ld, "OVMF_VARS.fd")
storagePath := filepath.Join(r.ld, "image.qcow2")
qemuArgs := []string{
- "-machine", "q35", "-accel", "kvm", "-nographic", "-nodefaults", "-m", "2048",
- "-cpu", "host", "-smp", "sockets=1,cpus=1,cores=2,threads=2,maxcpus=4",
+ "-machine", "q35",
+ "-accel", "kvm",
+ "-nographic",
+ "-nodefaults",
+ "-cpu", "host",
+ "-m", fmt.Sprintf("%dM", options.MemoryMiB),
+ "-smp", fmt.Sprintf("cores=%d,threads=%d", options.CPUs, options.ThreadsPerCPU),
"-drive", "if=pflash,format=raw,readonly=on,file=" + xOvmfCodePath,
"-drive", "if=pflash,format=raw,file=" + fwVarPath,
"-drive", "if=virtio,format=qcow2,cache=unsafe,file=" + storagePath,
@@ -520,6 +545,9 @@
// The number of nodes this cluster should be started with.
NumNodes int
+ // Node are default options of all nodes.
+ Node NodeOptions
+
// If true, node logs will be saved to individual files instead of being printed
// out to stderr. The path of these files will be still printed to stdout.
//
@@ -704,6 +732,32 @@
return nil, errors.New("refusing to start cluster with zero nodes")
}
+ // Prepare the node options. These will be kept as part of Cluster.
+ // nodeOpts[].Runtime will be initialized by LaunchNode during the first
+ // launch. The runtime information can be later used to restart a node.
+ // The 0th node will be initialized first. The rest will follow after it
+ // had bootstrapped the cluster.
+ nodeOpts := make([]NodeOptions, opts.NumNodes)
+ for i := range opts.NumNodes {
+ nodeOpts[i] = opts.Node
+ nodeOpts[i].Name = fmt.Sprintf("node%d", i)
+ nodeOpts[i].SerialPort = newPrefixedStdio(i)
+ }
+ nodeOpts[0].NodeParameters = &apb.NodeParameters{
+ Cluster: &apb.NodeParameters_ClusterBootstrap_{
+ ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
+ OwnerPublicKey: InsecurePublicKey,
+ InitialClusterConfiguration: opts.InitialClusterConfiguration,
+ Labels: &cpb.NodeLabels{
+ Pairs: []*cpb.NodeLabels_Pair{
+ {Key: nodeNumberKey, Value: "0"},
+ },
+ },
+ },
+ },
+ }
+ nodeOpts[0].PcapDump = true
+
// Create the launch directory.
ld, err := os.MkdirTemp(os.Getenv("TEST_TMPDIR"), "cluster-*")
if err != nil {
@@ -730,14 +784,13 @@
// Prepare links between nodes and nanoswitch.
var switchPorts []*os.File
- var vmPorts []*os.File
- for i := 0; i < opts.NumNodes; i++ {
+ for i := range opts.NumNodes {
switchPort, vmPort, err := launch.NewSocketPair()
if err != nil {
return nil, fmt.Errorf("failed to get socketpair: %w", err)
}
switchPorts = append(switchPorts, switchPort)
- vmPorts = append(vmPorts, vmPort)
+ nodeOpts[i].ConnectToSocket = vmPort
}
// Make a list of channels that will be populated by all running node qemu
@@ -747,39 +800,16 @@
done[i] = make(chan error, 1)
}
- // Prepare the node options. These will be kept as part of Cluster.
- // nodeOpts[].Runtime will be initialized by LaunchNode during the first
- // launch. The runtime information can be later used to restart a node.
- // The 0th node will be initialized first. The rest will follow after it
- // had bootstrapped the cluster.
- nodeOpts := make([]NodeOptions, opts.NumNodes)
- nodeOpts[0] = NodeOptions{
- Name: "node0",
- ConnectToSocket: vmPorts[0],
- NodeParameters: &apb.NodeParameters{
- Cluster: &apb.NodeParameters_ClusterBootstrap_{
- ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
- OwnerPublicKey: InsecurePublicKey,
- InitialClusterConfiguration: opts.InitialClusterConfiguration,
- Labels: &cpb.NodeLabels{
- Pairs: []*cpb.NodeLabels_Pair{
- {Key: nodeNumberKey, Value: "0"},
- },
- },
- },
- },
- },
- SerialPort: newPrefixedStdio(0),
- PcapDump: true,
- }
if opts.NodeLogsToFiles {
- path := path.Join(ld, "node-0.txt")
- port, err := NewSerialFileLogger(path)
- if err != nil {
- return nil, fmt.Errorf("could not open log file for node 0: %w", err)
+ for i := range opts.NumNodes {
+ path := path.Join(ld, fmt.Sprintf("node-%d.txt", i))
+ port, err := NewSerialFileLogger(path)
+ if err != nil {
+ return nil, fmt.Errorf("could not open log file for node %d: %w", i, err)
+ }
+ launch.Log("Node %d logs at %s", i, path)
+ nodeOpts[i].SerialPort = port
}
- launch.Log("Node 0 logs at %s", path)
- nodeOpts[0].SerialPort = port
}
// Start the first node.
@@ -939,33 +969,19 @@
// Use the retrieved information to configure the rest of the node options.
for i := 1; i < opts.NumNodes; i++ {
- nodeOpts[i] = NodeOptions{
- Name: fmt.Sprintf("node%d", i),
- ConnectToSocket: vmPorts[i],
- NodeParameters: &apb.NodeParameters{
- Cluster: &apb.NodeParameters_ClusterRegister_{
- ClusterRegister: &apb.NodeParameters_ClusterRegister{
- RegisterTicket: ticket,
- ClusterDirectory: resI.ClusterDirectory,
- CaCertificate: resI.CaCertificate,
- Labels: &cpb.NodeLabels{
- Pairs: []*cpb.NodeLabels_Pair{
- {Key: nodeNumberKey, Value: fmt.Sprintf("%d", i)},
- },
+ nodeOpts[i].NodeParameters = &apb.NodeParameters{
+ Cluster: &apb.NodeParameters_ClusterRegister_{
+ ClusterRegister: &apb.NodeParameters_ClusterRegister{
+ RegisterTicket: ticket,
+ ClusterDirectory: resI.ClusterDirectory,
+ CaCertificate: resI.CaCertificate,
+ Labels: &cpb.NodeLabels{
+ Pairs: []*cpb.NodeLabels_Pair{
+ {Key: nodeNumberKey, Value: fmt.Sprintf("%d", i)},
},
},
},
},
- SerialPort: newPrefixedStdio(i),
- }
- if opts.NodeLogsToFiles {
- path := path.Join(ld, fmt.Sprintf("node-%d.txt", i))
- port, err := NewSerialFileLogger(path)
- if err != nil {
- return nil, fmt.Errorf("could not open log file for node %d: %w", i, err)
- }
- launch.Log("Node %d logs at %s", i, path)
- nodeOpts[i].SerialPort = port
}
}
@@ -979,7 +995,7 @@
}
// Wait for nodes to appear as NEW, populate a map from node number (index into
- // NodeOpts, etc.) to Metropolis Node ID.
+ // nodeOpts, etc.) to Metropolis Node ID.
seenNodes := make(map[string]bool)
nodeNumberToID := make(map[int]string)
launch.Log("Cluster: waiting for nodes to appear as NEW...")
@@ -1390,6 +1406,25 @@
return err
}
+// MakeKubernetesController adds the KubernetesController role to a node by ID.
+func (c *Cluster) MakeKubernetesController(ctx context.Context, id string) error {
+ curC, err := c.CuratorClient()
+ if err != nil {
+ return err
+ }
+ mgmt := apb.NewManagementClient(curC)
+
+ tr := true
+ launch.Log("Cluster: %s: adding KubernetesController", id)
+ _, err = mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
+ Node: &apb.UpdateNodeRolesRequest_Id{
+ Id: id,
+ },
+ KubernetesController: &tr,
+ })
+ return err
+}
+
// MakeConsensusMember adds the ConsensusMember role to a node by ID.
func (c *Cluster) MakeConsensusMember(ctx context.Context, id string) error {
curC, err := c.CuratorClient()