m/test/launch: allow specifying launch parameters
This adds flags to the launch-cluster command for specifying the size of
the cluster, tpm and storage security configuration, number of CPUs and
RAM size for all nodes, and assigning roles to specific nodes.
As an example, the following command launches a cluster of 4 nodes with
the TPM disabled and 2 CPUs and 4 GiB of RAM on each node, and assigns
the Kubernetes Worker role to all nodes except the first:
bazel run //metropolis:launch-cluster -- -tpm-mode=disabled \
-num-nodes=4 -cpu=2 -ram=4G -kubernetes-worker=1-3
The storage security policy of launch-cluster now defaults to
needs-insecure, as this speeds up cluster launch.
The cluster configuration flags are defined in a new package, flagdefs,
which is shared with metroctl to avoid code duplication.
Fixes: https://github.com/monogon-dev/monogon/issues/315
Change-Id: Icf8b7fcbd6e609f4785b2a60ce5e7be14b641884
Reviewed-on: https://review.monogon.dev/c/monogon/+/3307
Reviewed-by: Serge Bazanski <serge@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/cli/flagdefs/BUILD.bazel b/metropolis/cli/flagdefs/BUILD.bazel
new file mode 100644
index 0000000..b59ef50
--- /dev/null
+++ b/metropolis/cli/flagdefs/BUILD.bazel
@@ -0,0 +1,12 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+ name = "flagdefs",
+ srcs = ["flagdefs.go"],
+ importpath = "source.monogon.dev/metropolis/cli/flagdefs",
+ visibility = ["//metropolis:__subpackages__"],
+ deps = [
+ "//metropolis/proto/common",
+ "@com_github_spf13_pflag//:pflag",
+ ],
+)
diff --git a/metropolis/cli/flagdefs/flagdefs.go b/metropolis/cli/flagdefs/flagdefs.go
new file mode 100644
index 0000000..fe2423e
--- /dev/null
+++ b/metropolis/cli/flagdefs/flagdefs.go
@@ -0,0 +1,162 @@
+// Package flagdefs contains shared flag definitions for Metropolis.
+// The usage is the same as for the standard flags in the [flag] package,
+// except that the [flag.FlagSet] needs to be passed as the first parameter.
+// Pass [flag.CommandLine] to use the default FlagSet.
+// Functions with the Pflag suffix do the same for a [pflag.FlagSet].
+package flagdefs
+
+import (
+ "errors"
+ "flag"
+ "strings"
+
+ "github.com/spf13/pflag"
+
+ cpb "source.monogon.dev/metropolis/proto/common"
+)
+
+// tpmModeValue is a TPMMode usable as a [flag.Value] or [pflag.Value].
+type tpmModeValue cpb.ClusterConfiguration_TPMMode
+
+// tpmModeNames maps each known TPM mode to its canonical flag spelling.
+var tpmModeNames = map[cpb.ClusterConfiguration_TPMMode]string{
+	cpb.ClusterConfiguration_TPM_MODE_REQUIRED:    "required",
+	cpb.ClusterConfiguration_TPM_MODE_BEST_EFFORT: "best-effort",
+	cpb.ClusterConfiguration_TPM_MODE_DISABLED:    "disabled",
+}
+
+// Set parses val case-insensitively, also accepting the aliases require,
+// besteffort and disable. On error, the stored mode is left unchanged.
+func (v *tpmModeValue) Set(val string) error {
+	switch strings.ToLower(val) {
+	case "required", "require":
+		*v = tpmModeValue(cpb.ClusterConfiguration_TPM_MODE_REQUIRED)
+	case "best-effort", "besteffort":
+		*v = tpmModeValue(cpb.ClusterConfiguration_TPM_MODE_BEST_EFFORT)
+	case "disabled", "disable":
+		*v = tpmModeValue(cpb.ClusterConfiguration_TPM_MODE_DISABLED)
+	default:
+		return errors.New("must be one of: required, best-effort, disabled")
+	}
+	return nil
+}
+
+// String returns the canonical spelling of the mode, or "" if it has none.
+func (v *tpmModeValue) String() string {
+	return tpmModeNames[cpb.ClusterConfiguration_TPMMode(*v)]
+}
+
+// Type returns the name of the value type, as required by [pflag.Value].
+func (*tpmModeValue) Type() string {
+	return "tpmMode"
+}
+
+// TPMModeVar registers a TPMMode flag called name on flags. The flag stores
+// its value in *p, which is first set to value and thus becomes the default.
+func TPMModeVar(flags *flag.FlagSet, p *cpb.ClusterConfiguration_TPMMode, name string, value cpb.ClusterConfiguration_TPMMode, usage string) {
+	target := (*tpmModeValue)(p)
+	*target = tpmModeValue(value)
+	flags.Var(target, name, usage+" (one of: required, best-effort, disabled)")
+}
+
+// TPMMode defines a TPMMode flag with specified name, default value, and
+// usage string. The return value is the address of a TPMMode variable that
+// stores the value of the flag.
+func TPMMode(flags *flag.FlagSet, name string, value cpb.ClusterConfiguration_TPMMode, usage string) *cpb.ClusterConfiguration_TPMMode {
+ val := new(cpb.ClusterConfiguration_TPMMode)
+ TPMModeVar(flags, val, name, value, usage)
+ return val
+}
+
+// TPMModeVarPflag registers a TPMMode flag called name on the pflag set flags.
+// Its value is stored in *p, which is first set to value, the default.
+func TPMModeVarPflag(flags *pflag.FlagSet, p *cpb.ClusterConfiguration_TPMMode, name string, value cpb.ClusterConfiguration_TPMMode, usage string) {
+	target := (*tpmModeValue)(p)
+	*target = tpmModeValue(value)
+	flags.Var(target, name, usage+" (one of: required, best-effort, disabled)")
+}
+
+// TPMModePflag defines a TPMMode flag with specified name, default value, and
+// usage string. The return value is the address of a TPMMode variable that
+// stores the value of the flag.
+func TPMModePflag(flags *pflag.FlagSet, name string, value cpb.ClusterConfiguration_TPMMode, usage string) *cpb.ClusterConfiguration_TPMMode {
+ val := new(cpb.ClusterConfiguration_TPMMode)
+ TPMModeVarPflag(flags, val, name, value, usage)
+ return val
+}
+
+// storageSecurityPolicyValue is a StorageSecurityPolicy usable as a
+// [flag.Value] or [pflag.Value].
+type storageSecurityPolicyValue cpb.ClusterConfiguration_StorageSecurityPolicy
+
+// storageSecurityPolicyNames maps each known policy to its flag spelling.
+var storageSecurityPolicyNames = map[cpb.ClusterConfiguration_StorageSecurityPolicy]string{
+	cpb.ClusterConfiguration_STORAGE_SECURITY_POLICY_PERMISSIVE:                          "permissive",
+	cpb.ClusterConfiguration_STORAGE_SECURITY_POLICY_NEEDS_ENCRYPTION:                    "needs-encryption",
+	cpb.ClusterConfiguration_STORAGE_SECURITY_POLICY_NEEDS_ENCRYPTION_AND_AUTHENTICATION: "needs-encryption-and-authentication",
+	cpb.ClusterConfiguration_STORAGE_SECURITY_POLICY_NEEDS_INSECURE:                      "needs-insecure",
+}
+
+// Set parses val case-insensitively. On error, the stored policy is left
+// unchanged.
+func (v *storageSecurityPolicyValue) Set(val string) error {
+	switch strings.ToLower(val) {
+	case "permissive":
+		*v = storageSecurityPolicyValue(cpb.ClusterConfiguration_STORAGE_SECURITY_POLICY_PERMISSIVE)
+	case "needs-encryption":
+		*v = storageSecurityPolicyValue(cpb.ClusterConfiguration_STORAGE_SECURITY_POLICY_NEEDS_ENCRYPTION)
+	case "needs-encryption-and-authentication":
+		*v = storageSecurityPolicyValue(cpb.ClusterConfiguration_STORAGE_SECURITY_POLICY_NEEDS_ENCRYPTION_AND_AUTHENTICATION)
+	case "needs-insecure":
+		*v = storageSecurityPolicyValue(cpb.ClusterConfiguration_STORAGE_SECURITY_POLICY_NEEDS_INSECURE)
+	default:
+		return errors.New("must be one of: permissive, needs-encryption, needs-encryption-and-authentication, needs-insecure")
+	}
+	return nil
+}
+
+// String returns the flag spelling of the stored policy, or "" if the policy
+// has none.
+func (v *storageSecurityPolicyValue) String() string {
+	return storageSecurityPolicyNames[cpb.ClusterConfiguration_StorageSecurityPolicy(*v)]
+}
+
+// Type returns the name of the value type, as required by [pflag.Value].
+func (*storageSecurityPolicyValue) Type() string {
+	return "storageSecurityPolicy"
+}
+
+// StorageSecurityPolicyVar registers a StorageSecurityPolicy flag called name
+// on flags. Its value is stored in *p, which is first set to value and thus
+// becomes the default.
+func StorageSecurityPolicyVar(flags *flag.FlagSet, p *cpb.ClusterConfiguration_StorageSecurityPolicy, name string, value cpb.ClusterConfiguration_StorageSecurityPolicy, usage string) {
+	target := (*storageSecurityPolicyValue)(p)
+	*target = storageSecurityPolicyValue(value)
+	flags.Var(target, name, usage+" (one of: permissive, needs-encryption, needs-encryption-and-authentication, needs-insecure)")
+}
+
+// StorageSecurityPolicy defines a StorageSecurityPolicy flag with specified
+// name, default value, and usage string. The return value is the address of a
+// StorageSecurityPolicy variable that stores the value of the flag.
+func StorageSecurityPolicy(flags *flag.FlagSet, name string, value cpb.ClusterConfiguration_StorageSecurityPolicy, usage string) *cpb.ClusterConfiguration_StorageSecurityPolicy {
+ val := new(cpb.ClusterConfiguration_StorageSecurityPolicy)
+ StorageSecurityPolicyVar(flags, val, name, value, usage)
+ return val
+}
+
+// StorageSecurityPolicyVarPflag registers a StorageSecurityPolicy flag called
+// name on the pflag set flags. Its value is stored in *p, which is first set
+// to value and thus becomes the default.
+func StorageSecurityPolicyVarPflag(flags *pflag.FlagSet, p *cpb.ClusterConfiguration_StorageSecurityPolicy, name string, value cpb.ClusterConfiguration_StorageSecurityPolicy, usage string) {
+	target := (*storageSecurityPolicyValue)(p)
+	*target = storageSecurityPolicyValue(value)
+	flags.Var(target, name, usage+" (one of: permissive, needs-encryption, needs-encryption-and-authentication, needs-insecure)")
+}
+
+// StorageSecurityPolicyPflag defines a StorageSecurityPolicy flag with
+// specified name, default value, and usage string. The return value is the
+// address of a StorageSecurityPolicy variable that stores the value of the
+// flag.
+func StorageSecurityPolicyPflag(flags *pflag.FlagSet, name string, value cpb.ClusterConfiguration_StorageSecurityPolicy, usage string) *cpb.ClusterConfiguration_StorageSecurityPolicy {
+ val := new(cpb.ClusterConfiguration_StorageSecurityPolicy)
+ StorageSecurityPolicyVarPflag(flags, val, name, value, usage)
+ return val
+}
diff --git a/metropolis/cli/metroctl/BUILD.bazel b/metropolis/cli/metroctl/BUILD.bazel
index 2b6b31a..eed93c2 100644
--- a/metropolis/cli/metroctl/BUILD.bazel
+++ b/metropolis/cli/metroctl/BUILD.bazel
@@ -43,6 +43,7 @@
visibility = ["//visibility:private"],
deps = [
"//go/clitable",
+ "//metropolis/cli/flagdefs",
"//metropolis/cli/metroctl/core",
"//metropolis/node",
"//metropolis/node/core/identity",
diff --git a/metropolis/cli/metroctl/cmd_install.go b/metropolis/cli/metroctl/cmd_install.go
index 992da3e..cf43747 100644
--- a/metropolis/cli/metroctl/cmd_install.go
+++ b/metropolis/cli/metroctl/cmd_install.go
@@ -8,7 +8,6 @@
"log"
"os"
"os/signal"
- "strings"
"github.com/bazelbuild/rules_go/go/runfiles"
"github.com/spf13/cobra"
@@ -16,6 +15,7 @@
"source.monogon.dev/metropolis/proto/api"
cpb "source.monogon.dev/metropolis/proto/common"
+ "source.monogon.dev/metropolis/cli/flagdefs"
"source.monogon.dev/metropolis/cli/metroctl/core"
"source.monogon.dev/osbase/blkio"
"source.monogon.dev/osbase/fat32"
@@ -31,38 +31,11 @@
// it will try to connect to the cluster which endpoints were provided with
// the --endpoints flag.
var bootstrap = installCmd.PersistentFlags().Bool("bootstrap", false, "Create a bootstrap installer image.")
-var bootstrapTPMMode = installCmd.PersistentFlags().String("bootstrap-tpm-mode", "required", "TPM mode to set on cluster (required, best-effort, disabled)")
-var bootstrapStorageSecurityPolicy = installCmd.PersistentFlags().String("bootstrap-storage-security", "needs-encryption-and-authentication", "Storage security policy to set on cluster (permissive, needs-encryption, needs-encryption-and-authentication, needs-insecure)")
+var bootstrapTPMMode = flagdefs.TPMModePflag(installCmd.PersistentFlags(), "bootstrap-tpm-mode", cpb.ClusterConfiguration_TPM_MODE_REQUIRED, "TPM mode to set on cluster")
+var bootstrapStorageSecurityPolicy = flagdefs.StorageSecurityPolicyPflag(installCmd.PersistentFlags(), "bootstrap-storage-security", cpb.ClusterConfiguration_STORAGE_SECURITY_POLICY_NEEDS_ENCRYPTION_AND_AUTHENTICATION, "Storage security policy to set on cluster")
var bundlePath = installCmd.PersistentFlags().StringP("bundle", "b", "", "Path to the Metropolis bundle to be installed")
func makeNodeParams() *api.NodeParameters {
- var tpmMode cpb.ClusterConfiguration_TPMMode
- switch strings.ToLower(*bootstrapTPMMode) {
- case "required", "require":
- tpmMode = cpb.ClusterConfiguration_TPM_MODE_REQUIRED
- case "best-effort", "besteffort":
- tpmMode = cpb.ClusterConfiguration_TPM_MODE_BEST_EFFORT
- case "disabled", "disable":
- tpmMode = cpb.ClusterConfiguration_TPM_MODE_DISABLED
- default:
- log.Fatalf("Invalid --bootstrap-tpm-mode (must be one of: required, best-effort, disabled)")
- }
-
- var bootstrapStorageSecurity cpb.ClusterConfiguration_StorageSecurityPolicy
- switch strings.ToLower(*bootstrapStorageSecurityPolicy) {
- case "permissive":
- bootstrapStorageSecurity = cpb.ClusterConfiguration_STORAGE_SECURITY_POLICY_PERMISSIVE
- case "needs-encryption":
- bootstrapStorageSecurity = cpb.ClusterConfiguration_STORAGE_SECURITY_POLICY_NEEDS_ENCRYPTION
- case "needs-encryption-and-authentication":
- bootstrapStorageSecurity = cpb.ClusterConfiguration_STORAGE_SECURITY_POLICY_NEEDS_ENCRYPTION_AND_AUTHENTICATION
- case "needs-insecure":
- bootstrapStorageSecurity = cpb.ClusterConfiguration_STORAGE_SECURITY_POLICY_NEEDS_INSECURE
- default:
-
- log.Fatalf("Invalid --bootstrap-storage-security (must be one of: permissive, needs-encryption, needs-encryption-and-authentication, needs-insecure)")
- }
-
ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt)
if err := os.MkdirAll(flags.configPath, 0700); err != nil && !os.IsExist(err) {
@@ -82,8 +55,8 @@
ClusterBootstrap: &api.NodeParameters_ClusterBootstrap{
OwnerPublicKey: pub,
InitialClusterConfiguration: &cpb.ClusterConfiguration{
- StorageSecurityPolicy: bootstrapStorageSecurity,
- TpmMode: tpmMode,
+ StorageSecurityPolicy: *bootstrapStorageSecurityPolicy,
+ TpmMode: *bootstrapTPMMode,
},
},
},
diff --git a/metropolis/test/launch/cli/launch-cluster/BUILD.bazel b/metropolis/test/launch/cli/launch-cluster/BUILD.bazel
index a952abd..74067a7 100644
--- a/metropolis/test/launch/cli/launch-cluster/BUILD.bazel
+++ b/metropolis/test/launch/cli/launch-cluster/BUILD.bazel
@@ -7,7 +7,10 @@
importpath = "source.monogon.dev/metropolis/test/launch/cli/launch-cluster",
visibility = ["//visibility:private"],
deps = [
+ "//metropolis/cli/flagdefs",
"//metropolis/cli/metroctl/core",
+ "//metropolis/node",
+ "//metropolis/proto/common",
"//metropolis/test/launch",
],
)
diff --git a/metropolis/test/launch/cli/launch-cluster/main.go b/metropolis/test/launch/cli/launch-cluster/main.go
index 695fcc7..cceebd1 100644
--- a/metropolis/test/launch/cli/launch-cluster/main.go
+++ b/metropolis/test/launch/cli/launch-cluster/main.go
@@ -18,36 +18,139 @@
import (
"context"
+ "errors"
+ "flag"
+ "fmt"
"log"
+ "net"
"os"
"os/exec"
"os/signal"
+ "strconv"
+ "strings"
+ "time"
+ "source.monogon.dev/metropolis/cli/flagdefs"
metroctl "source.monogon.dev/metropolis/cli/metroctl/core"
+ "source.monogon.dev/metropolis/node"
+ cpb "source.monogon.dev/metropolis/proto/common"
mlaunch "source.monogon.dev/metropolis/test/launch"
)
-func main() {
- ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt)
- cl, err := mlaunch.LaunchCluster(ctx, mlaunch.ClusterOptions{
- NumNodes: 3,
- NodeLogsToFiles: true,
+const maxNodes = 256
+
+// nodeSetFlag defines a flag which takes a comma-separated list of node
+// indices and inclusive ranges, e.g. 0,3-5, and appends each index to *p.
+func nodeSetFlag(p *[]int, name string, usage string) {
+	flag.Func(name, usage, func(val string) error {
+		for _, item := range strings.Split(val, ",") {
+			item = strings.TrimSpace(item)
+			if item == "" {
+				continue
+			}
+			firstStr, lastStr, isRange := strings.Cut(item, "-")
+			if !isRange {
+				lastStr = firstStr
+			}
+			first, firstErr := strconv.Atoi(firstStr)
+			last, lastErr := strconv.Atoi(lastStr)
+			switch {
+			case firstErr != nil:
+				return firstErr
+			case lastErr != nil:
+				return lastErr
+			case last >= maxNodes:
+				return fmt.Errorf("node index %v out of range, there can be at most %v nodes", last, maxNodes)
+			case last < first:
+				return fmt.Errorf("invalid range %q, end is smaller than start", item)
+			}
+			for i := first; i <= last; i++ {
+				*p = append(*p, i)
+			}
+		}
+		return nil
 	})
+}
+
+// sizeFlagMiB defines a flag which takes a positive size with suffix M (MiB)
+// or G (GiB). The parsed size is stored in *p as a number of MiB.
+func sizeFlagMiB(p *int, name string, usage string) {
+	flag.Func(name, usage, func(val string) error {
+		multiplier := 1024
+		if strings.HasSuffix(val, "M") {
+			multiplier = 1
+		} else if !strings.HasSuffix(val, "G") {
+			return errors.New("must have suffix M for MiB or G for GiB")
+		}
+		intVal, err := strconv.Atoi(val[:len(val)-1])
+		if err != nil || intVal <= 0 {
+			return fmt.Errorf("invalid size %q, must be a positive integer with suffix M or G", val)
+		}
+		*p = multiplier * intVal
+		return nil
+	})
+}
+
+func main() {
+ clusterConfig := cpb.ClusterConfiguration{}
+ opts := mlaunch.ClusterOptions{
+ NodeLogsToFiles: true,
+ InitialClusterConfiguration: &clusterConfig,
+ }
+ var consensusMemberList, kubernetesControllerList, kubernetesWorkerList []int
+
+ flag.IntVar(&opts.NumNodes, "num-nodes", 3, "Number of cluster nodes")
+ flagdefs.TPMModeVar(flag.CommandLine, &clusterConfig.TpmMode, "tpm-mode", cpb.ClusterConfiguration_TPM_MODE_REQUIRED, "TPM mode to set on cluster")
+ flagdefs.StorageSecurityPolicyVar(flag.CommandLine, &clusterConfig.StorageSecurityPolicy, "storage-security", cpb.ClusterConfiguration_STORAGE_SECURITY_POLICY_NEEDS_INSECURE, "Storage security policy to set on cluster")
+ flag.IntVar(&opts.Node.CPUs, "cpu", 1, "Number of virtual CPUs of each node")
+ flag.IntVar(&opts.Node.ThreadsPerCPU, "threads-per-cpu", 1, "Number of threads per CPU")
+ sizeFlagMiB(&opts.Node.MemoryMiB, "ram", "RAM size of each node, with suffix M for MiB or G for GiB")
+ nodeSetFlag(&consensusMemberList, "consensus-member", "List of nodes which get the Consensus Member role. Example: 0,3-5")
+ nodeSetFlag(&kubernetesControllerList, "kubernetes-controller", "List of nodes which get the Kubernetes Controller role. Example: 0,3-5")
+ nodeSetFlag(&kubernetesWorkerList, "kubernetes-worker", "List of nodes which get the Kubernetes Worker role. Example: 0,3-5")
+ flag.Parse()
+
+ if opts.NumNodes >= maxNodes {
+ log.Fatalf("num-nodes (%v) is too large, there can be at most %v nodes", opts.NumNodes, maxNodes)
+ }
+ for _, list := range [][]int{consensusMemberList, kubernetesControllerList, kubernetesWorkerList} {
+ for i := len(list) - 1; i >= 0; i-- {
+ if list[i] >= opts.NumNodes {
+ log.Fatalf("Node index %v out of range, can be at most %v", list[i], opts.NumNodes-1)
+ }
+ }
+ }
+
+ ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt)
+ cl, err := mlaunch.LaunchCluster(ctx, opts)
if err != nil {
log.Fatalf("LaunchCluster: %v", err)
}
+ for _, node := range consensusMemberList {
+ cl.MakeConsensusMember(ctx, cl.NodeIDs[node])
+ }
+ for _, node := range kubernetesControllerList {
+ cl.MakeKubernetesController(ctx, cl.NodeIDs[node])
+ }
+ for _, node := range kubernetesWorkerList {
+ cl.MakeKubernetesWorker(ctx, cl.NodeIDs[node])
+ }
+
wpath, err := cl.MakeMetroctlWrapper()
if err != nil {
log.Fatalf("MakeWrapper: %v", err)
}
- apiservers, err := cl.KubernetesControllerNodeAddresses(ctx)
- if err != nil {
- log.Fatalf("Could not get Kubernetes controller nodes: %v", err)
- }
- if len(apiservers) < 1 {
- log.Fatalf("Cluster has no Kubernetes controller nodes")
+ apiserver := cl.Nodes[cl.NodeIDs[0]].ManagementAddress
+ // Wait for the API server to start listening.
+ for {
+ conn, err := cl.DialNode(ctx, net.JoinHostPort(apiserver, node.KubernetesAPIWrappedPort.PortString()))
+ if err == nil {
+ conn.Close()
+ break
+ }
+ time.Sleep(100 * time.Millisecond)
}
// If the user has metroctl in their path, use the metroctl from path as
@@ -62,7 +165,7 @@
}
configName := "launch-cluster"
- if err := metroctl.InstallKubeletConfig(ctx, metroctlPath, cl.ConnectOptions(), configName, apiservers[0]); err != nil {
+ if err := metroctl.InstallKubeletConfig(ctx, metroctlPath, cl.ConnectOptions(), configName, apiserver); err != nil {
log.Fatalf("InstallKubeletConfig: %v", err)
}
diff --git a/metropolis/test/launch/cluster.go b/metropolis/test/launch/cluster.go
index 98ab760..1435925 100644
--- a/metropolis/test/launch/cluster.go
+++ b/metropolis/test/launch/cluster.go
@@ -62,6 +62,16 @@
// Name is a human-readable identifier to be used in debug output.
Name string
+ // CPUs is the number of virtual CPUs of the VM.
+ CPUs int
+
+ // ThreadsPerCPU is the number of threads per CPU. This is multiplied by
+ // CPUs to get the total number of threads.
+ ThreadsPerCPU int
+
+ // MemoryMiB is the RAM size in MiB of the VM.
+ MemoryMiB int
+
// Ports contains the port mapping where to expose the internal ports of the VM to
// the host. See IdentityPortMap() and ConflictFreePortMap(). Ignored when
// ConnectToSocket is set.
@@ -215,6 +225,16 @@
// swtpm and qemu because we run into UNIX socket length limitations (for legacy
// reasons 108 chars).
+ if options.CPUs == 0 {
+ options.CPUs = 1
+ }
+ if options.ThreadsPerCPU == 0 {
+ options.ThreadsPerCPU = 1
+ }
+ if options.MemoryMiB == 0 {
+ options.MemoryMiB = 2048
+ }
+
// If it's the node's first start, set up its runtime directories.
if options.Runtime == nil {
r, err := setupRuntime(ld, sd)
@@ -262,8 +282,13 @@
fwVarPath := filepath.Join(r.ld, "OVMF_VARS.fd")
storagePath := filepath.Join(r.ld, "image.qcow2")
qemuArgs := []string{
- "-machine", "q35", "-accel", "kvm", "-nographic", "-nodefaults", "-m", "2048",
- "-cpu", "host", "-smp", "sockets=1,cpus=1,cores=2,threads=2,maxcpus=4",
+ "-machine", "q35",
+ "-accel", "kvm",
+ "-nographic",
+ "-nodefaults",
+ "-cpu", "host",
+ "-m", fmt.Sprintf("%dM", options.MemoryMiB),
+ "-smp", fmt.Sprintf("cores=%d,threads=%d", options.CPUs, options.ThreadsPerCPU),
"-drive", "if=pflash,format=raw,readonly=on,file=" + xOvmfCodePath,
"-drive", "if=pflash,format=raw,file=" + fwVarPath,
"-drive", "if=virtio,format=qcow2,cache=unsafe,file=" + storagePath,
@@ -520,6 +545,9 @@
// The number of nodes this cluster should be started with.
NumNodes int
+	// Node holds the default options which are applied to every node.
+ Node NodeOptions
+
// If true, node logs will be saved to individual files instead of being printed
// out to stderr. The path of these files will be still printed to stdout.
//
@@ -704,6 +732,32 @@
return nil, errors.New("refusing to start cluster with zero nodes")
}
+ // Prepare the node options. These will be kept as part of Cluster.
+ // nodeOpts[].Runtime will be initialized by LaunchNode during the first
+ // launch. The runtime information can be later used to restart a node.
+ // The 0th node will be initialized first. The rest will follow after it
+ // had bootstrapped the cluster.
+ nodeOpts := make([]NodeOptions, opts.NumNodes)
+ for i := range opts.NumNodes {
+ nodeOpts[i] = opts.Node
+ nodeOpts[i].Name = fmt.Sprintf("node%d", i)
+ nodeOpts[i].SerialPort = newPrefixedStdio(i)
+ }
+ nodeOpts[0].NodeParameters = &apb.NodeParameters{
+ Cluster: &apb.NodeParameters_ClusterBootstrap_{
+ ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
+ OwnerPublicKey: InsecurePublicKey,
+ InitialClusterConfiguration: opts.InitialClusterConfiguration,
+ Labels: &cpb.NodeLabels{
+ Pairs: []*cpb.NodeLabels_Pair{
+ {Key: nodeNumberKey, Value: "0"},
+ },
+ },
+ },
+ },
+ }
+ nodeOpts[0].PcapDump = true
+
// Create the launch directory.
ld, err := os.MkdirTemp(os.Getenv("TEST_TMPDIR"), "cluster-*")
if err != nil {
@@ -730,14 +784,13 @@
// Prepare links between nodes and nanoswitch.
var switchPorts []*os.File
- var vmPorts []*os.File
- for i := 0; i < opts.NumNodes; i++ {
+ for i := range opts.NumNodes {
switchPort, vmPort, err := launch.NewSocketPair()
if err != nil {
return nil, fmt.Errorf("failed to get socketpair: %w", err)
}
switchPorts = append(switchPorts, switchPort)
- vmPorts = append(vmPorts, vmPort)
+ nodeOpts[i].ConnectToSocket = vmPort
}
// Make a list of channels that will be populated by all running node qemu
@@ -747,39 +800,16 @@
done[i] = make(chan error, 1)
}
- // Prepare the node options. These will be kept as part of Cluster.
- // nodeOpts[].Runtime will be initialized by LaunchNode during the first
- // launch. The runtime information can be later used to restart a node.
- // The 0th node will be initialized first. The rest will follow after it
- // had bootstrapped the cluster.
- nodeOpts := make([]NodeOptions, opts.NumNodes)
- nodeOpts[0] = NodeOptions{
- Name: "node0",
- ConnectToSocket: vmPorts[0],
- NodeParameters: &apb.NodeParameters{
- Cluster: &apb.NodeParameters_ClusterBootstrap_{
- ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
- OwnerPublicKey: InsecurePublicKey,
- InitialClusterConfiguration: opts.InitialClusterConfiguration,
- Labels: &cpb.NodeLabels{
- Pairs: []*cpb.NodeLabels_Pair{
- {Key: nodeNumberKey, Value: "0"},
- },
- },
- },
- },
- },
- SerialPort: newPrefixedStdio(0),
- PcapDump: true,
- }
if opts.NodeLogsToFiles {
- path := path.Join(ld, "node-0.txt")
- port, err := NewSerialFileLogger(path)
- if err != nil {
- return nil, fmt.Errorf("could not open log file for node 0: %w", err)
+ for i := range opts.NumNodes {
+ path := path.Join(ld, fmt.Sprintf("node-%d.txt", i))
+ port, err := NewSerialFileLogger(path)
+ if err != nil {
+ return nil, fmt.Errorf("could not open log file for node %d: %w", i, err)
+ }
+ launch.Log("Node %d logs at %s", i, path)
+ nodeOpts[i].SerialPort = port
}
- launch.Log("Node 0 logs at %s", path)
- nodeOpts[0].SerialPort = port
}
// Start the first node.
@@ -939,33 +969,19 @@
// Use the retrieved information to configure the rest of the node options.
for i := 1; i < opts.NumNodes; i++ {
- nodeOpts[i] = NodeOptions{
- Name: fmt.Sprintf("node%d", i),
- ConnectToSocket: vmPorts[i],
- NodeParameters: &apb.NodeParameters{
- Cluster: &apb.NodeParameters_ClusterRegister_{
- ClusterRegister: &apb.NodeParameters_ClusterRegister{
- RegisterTicket: ticket,
- ClusterDirectory: resI.ClusterDirectory,
- CaCertificate: resI.CaCertificate,
- Labels: &cpb.NodeLabels{
- Pairs: []*cpb.NodeLabels_Pair{
- {Key: nodeNumberKey, Value: fmt.Sprintf("%d", i)},
- },
+ nodeOpts[i].NodeParameters = &apb.NodeParameters{
+ Cluster: &apb.NodeParameters_ClusterRegister_{
+ ClusterRegister: &apb.NodeParameters_ClusterRegister{
+ RegisterTicket: ticket,
+ ClusterDirectory: resI.ClusterDirectory,
+ CaCertificate: resI.CaCertificate,
+ Labels: &cpb.NodeLabels{
+ Pairs: []*cpb.NodeLabels_Pair{
+ {Key: nodeNumberKey, Value: fmt.Sprintf("%d", i)},
},
},
},
},
- SerialPort: newPrefixedStdio(i),
- }
- if opts.NodeLogsToFiles {
- path := path.Join(ld, fmt.Sprintf("node-%d.txt", i))
- port, err := NewSerialFileLogger(path)
- if err != nil {
- return nil, fmt.Errorf("could not open log file for node %d: %w", i, err)
- }
- launch.Log("Node %d logs at %s", i, path)
- nodeOpts[i].SerialPort = port
}
}
@@ -979,7 +995,7 @@
}
// Wait for nodes to appear as NEW, populate a map from node number (index into
- // NodeOpts, etc.) to Metropolis Node ID.
+ // nodeOpts, etc.) to Metropolis Node ID.
seenNodes := make(map[string]bool)
nodeNumberToID := make(map[int]string)
launch.Log("Cluster: waiting for nodes to appear as NEW...")
@@ -1390,6 +1406,25 @@
return err
}
+// MakeKubernetesController adds the KubernetesController role to the node
+// with the given ID, using a Management client built from CuratorClient.
+func (c *Cluster) MakeKubernetesController(ctx context.Context, id string) error {
+	conn, err := c.CuratorClient()
+	if err != nil {
+		return err
+	}
+	mgmt := apb.NewManagementClient(conn)
+
+	launch.Log("Cluster: %s: adding KubernetesController", id)
+	// The request takes a pointer to the new state of the role.
+	enable := true
+	_, err = mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
+		Node:                 &apb.UpdateNodeRolesRequest_Id{Id: id},
+		KubernetesController: &enable,
+	})
+	return err
+}
+
// MakeConsensusMember adds the ConsensusMember role to a node by ID.
func (c *Cluster) MakeConsensusMember(ctx context.Context, id string) error {
curC, err := c.CuratorClient()