blob: b1bb072d602e45fb4ae6eb31172d8e93ca857362 [file] [log] [blame]
// Package cluster builds on the launch package and implements launching
// Metropolis nodes and clusters in a virtualized environment using qemu. It's
// kept in a separate package as it depends on a Metropolis node image, which
// might not be required for some uses of the launch library.
5package cluster
6
7import (
8 "bytes"
9 "context"
Serge Bazanski1f8cad72023-03-20 16:58:10 +010010 "crypto/ed25519"
Serge Bazanski66e58952021-10-05 17:06:56 +020011 "crypto/rand"
12 "crypto/tls"
Serge Bazanski54e212a2023-06-14 13:45:11 +020013 "crypto/x509"
Serge Bazanskia0bc6d32023-06-28 18:57:40 +020014 "encoding/pem"
Serge Bazanski66e58952021-10-05 17:06:56 +020015 "errors"
16 "fmt"
17 "io"
Serge Bazanski66e58952021-10-05 17:06:56 +020018 "net"
Lorenz Brun150f24a2023-07-13 20:11:06 +020019 "net/http"
Serge Bazanski66e58952021-10-05 17:06:56 +020020 "os"
21 "os/exec"
Leopoldacfad5b2023-01-15 14:05:25 +010022 "path"
Serge Bazanski66e58952021-10-05 17:06:56 +020023 "path/filepath"
Serge Bazanski630fb5c2023-04-06 10:50:24 +020024 "strings"
Serge Bazanski66e58952021-10-05 17:06:56 +020025 "syscall"
26 "time"
27
Tim Windelschmidt2a1d1b22024-02-06 07:07:42 +010028 "github.com/bazelbuild/rules_go/go/runfiles"
Serge Bazanski66e58952021-10-05 17:06:56 +020029 "github.com/cenkalti/backoff/v4"
Serge Bazanski66e58952021-10-05 17:06:56 +020030 "go.uber.org/multierr"
Serge Bazanskibe742842022-04-04 13:18:50 +020031 "golang.org/x/net/proxy"
Lorenz Brun87bbf7e2024-03-18 18:22:25 +010032 "golang.org/x/sys/unix"
Serge Bazanski66e58952021-10-05 17:06:56 +020033 "google.golang.org/grpc"
Serge Bazanski636032e2022-01-26 14:21:33 +010034 "google.golang.org/grpc/codes"
35 "google.golang.org/grpc/status"
Serge Bazanski66e58952021-10-05 17:06:56 +020036 "google.golang.org/protobuf/proto"
Serge Bazanskia0bc6d32023-06-28 18:57:40 +020037 "k8s.io/client-go/kubernetes"
38 "k8s.io/client-go/rest"
Serge Bazanski66e58952021-10-05 17:06:56 +020039
Tim Windelschmidtbe25a3b2023-07-19 16:31:56 +020040 apb "source.monogon.dev/metropolis/proto/api"
41 cpb "source.monogon.dev/metropolis/proto/common"
42
Serge Bazanski1f8cad72023-03-20 16:58:10 +010043 metroctl "source.monogon.dev/metropolis/cli/metroctl/core"
Serge Bazanski66e58952021-10-05 17:06:56 +020044 "source.monogon.dev/metropolis/node"
Serge Bazanskie78a0892021-10-07 17:03:49 +020045 "source.monogon.dev/metropolis/node/core/identity"
Serge Bazanski66e58952021-10-05 17:06:56 +020046 "source.monogon.dev/metropolis/node/core/rpc"
Serge Bazanski5bb8a332022-06-23 17:41:33 +020047 "source.monogon.dev/metropolis/node/core/rpc/resolver"
Lorenz Brun150f24a2023-07-13 20:11:06 +020048 "source.monogon.dev/metropolis/pkg/localregistry"
Serge Bazanski66e58952021-10-05 17:06:56 +020049 "source.monogon.dev/metropolis/test/launch"
50)
51
// NodeOptions contains all options that can be passed to LaunchNode().
type NodeOptions struct {
	// Name is a human-readable identifier to be used in debug output.
	Name string

	// Ports contains the port mapping where to expose the internal ports of the VM to
	// the host. See IdentityPortMap() and ConflictFreePortMap(). Ignored when
	// ConnectToSocket is set.
	Ports launch.PortMap

	// AllowReboot, if set to true, makes reboots be honored. Otherwise, QEMU is
	// started with -no-reboot and all reboots exit the LaunchNode() call.
	// Metropolis nodes generally restart on almost all errors, so unless you
	// want to test reboot behavior this should be false.
	AllowReboot bool

	// By default, the VM is connected to the Host via SLIRP. If ConnectToSocket is
	// set, it is instead connected to the given file descriptor/socket. If this is
	// set, all port maps from the Ports option are ignored. Intended for networking
	// this instance together with others for running more complex network
	// configurations.
	ConnectToSocket *os.File

	// When PcapDump is set, all traffic is dumped to a pcap file in the
	// runtime directory (e.g. "net0.pcap" for the first interface).
	PcapDump bool

	// SerialPort is an io.ReadWriter over which you can communicate with the serial
	// port of the machine. It can be set to an existing file descriptor (like
	// os.Stdout/os.Stderr) or any Go structure implementing this interface.
	// LaunchNode attaches it to the standard output of the QEMU process, which
	// is where the guest's serial port is redirected.
	SerialPort io.ReadWriter

	// NodeParameters is passed into the VM and subsequently used for bootstrapping or
	// registering into a cluster.
	NodeParameters *apb.NodeParameters

	// Mac is the node's MAC address. If nil, LaunchNode generates a random
	// one and stores it here.
	Mac *net.HardwareAddr

	// Runtime keeps the node's QEMU runtime state. If nil, LaunchNode sets it
	// up and stores it here.
	Runtime *NodeRuntime
}
93
// NodeRuntime keeps the node's QEMU runtime options. It is created by
// setupRuntime the first time a node is launched, and reused by LaunchNode on
// subsequent launches of the same node.
type NodeRuntime struct {
	// ld points at the node's launch directory storing data such as storage
	// images, firmware variables or the TPM state.
	ld string
	// sd points at the node's socket directory.
	sd string

	// ctxT is the context QEMU will execute in. LaunchNode replaces it with a
	// fresh context on every launch.
	ctxT context.Context
	// CtxC is the QEMU context's cancellation function. LaunchNode calls the
	// previous one, if any, before replacing the context.
	CtxC context.CancelFunc
}
107
108// NodePorts is the list of ports a fully operational Metropolis node listens on
Serge Bazanski52304a82021-10-29 16:56:18 +0200109var NodePorts = []node.Port{
Serge Bazanski66e58952021-10-05 17:06:56 +0200110 node.ConsensusPort,
111
112 node.CuratorServicePort,
113 node.DebugServicePort,
114
115 node.KubernetesAPIPort,
Lorenz Bruncc078df2021-12-23 11:51:55 +0100116 node.KubernetesAPIWrappedPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200117 node.CuratorServicePort,
118 node.DebuggerPort,
Tim Windelschmidtbe25a3b2023-07-19 16:31:56 +0200119 node.MetricsPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200120}
121
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200122// setupRuntime creates the node's QEMU runtime directory, together with all
123// files required to preserve its state, a level below the chosen path ld. The
124// node's socket directory is similarily created a level below sd. It may
125// return an I/O error.
126func setupRuntime(ld, sd string) (*NodeRuntime, error) {
127 // Create a temporary directory to keep all the runtime files.
128 stdp, err := os.MkdirTemp(ld, "node_state*")
129 if err != nil {
130 return nil, fmt.Errorf("failed to create the state directory: %w", err)
131 }
132
133 // Initialize the node's storage with a prebuilt image.
Tim Windelschmidt2a1d1b22024-02-06 07:07:42 +0100134 si, err := runfiles.Rlocation("_main/metropolis/node/image.img")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200135 if err != nil {
136 return nil, fmt.Errorf("while resolving a path: %w", err)
137 }
138 di := filepath.Join(stdp, filepath.Base(si))
Serge Bazanski05f813b2023-03-16 17:58:39 +0100139 launch.Log("Cluster: copying node image: %s -> %s", si, di)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200140 if err := copyFile(si, di); err != nil {
141 return nil, fmt.Errorf("while copying the node image: %w", err)
142 }
143
144 // Initialize the OVMF firmware variables file.
Tim Windelschmidt2a1d1b22024-02-06 07:07:42 +0100145 sv, err := runfiles.Rlocation("edk2/OVMF_VARS.fd")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200146 if err != nil {
147 return nil, fmt.Errorf("while resolving a path: %w", err)
148 }
149 dv := filepath.Join(stdp, filepath.Base(sv))
150 if err := copyFile(sv, dv); err != nil {
151 return nil, fmt.Errorf("while copying firmware variables: %w", err)
152 }
153
154 // Create the TPM state directory and initialize all files required by swtpm.
155 tpmt := filepath.Join(stdp, "tpm")
Lorenz Brun150f24a2023-07-13 20:11:06 +0200156 if err := os.Mkdir(tpmt, 0o755); err != nil {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200157 return nil, fmt.Errorf("while creating the TPM directory: %w", err)
158 }
Tim Windelschmidt2a1d1b22024-02-06 07:07:42 +0100159 tpms, err := runfiles.Rlocation("_main/metropolis/node/tpm")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200160 if err != nil {
161 return nil, fmt.Errorf("while resolving a path: %w", err)
162 }
163 tpmf, err := os.ReadDir(tpms)
164 if err != nil {
165 return nil, fmt.Errorf("failed to read TPM directory: %w", err)
166 }
167 for _, file := range tpmf {
168 name := file.Name()
Tim Windelschmidt2a1d1b22024-02-06 07:07:42 +0100169 src, err := runfiles.Rlocation(filepath.Join(tpms, name))
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200170 if err != nil {
171 return nil, fmt.Errorf("while resolving a path: %w", err)
172 }
173 tgt := filepath.Join(tpmt, name)
174 if err := copyFile(src, tgt); err != nil {
175 return nil, fmt.Errorf("while copying TPM state: file %q to %q: %w", src, tgt, err)
176 }
177 }
178
179 // Create the socket directory.
180 sotdp, err := os.MkdirTemp(sd, "node_sock*")
181 if err != nil {
182 return nil, fmt.Errorf("failed to create the socket directory: %w", err)
183 }
184
185 return &NodeRuntime{
186 ld: stdp,
187 sd: sotdp,
188 }, nil
189}
190
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200191// CuratorClient returns an authenticated owner connection to a Curator
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200192// instance within Cluster c, or nil together with an error.
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200193func (c *Cluster) CuratorClient() (*grpc.ClientConn, error) {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200194 if c.authClient == nil {
Serge Bazanski8535cb52023-03-29 14:15:08 +0200195 authCreds := rpc.NewAuthenticatedCredentials(c.Owner, rpc.WantInsecure())
Serge Bazanski58ddc092022-06-30 18:23:33 +0200196 r := resolver.New(c.ctxT, resolver.WithLogger(func(f string, args ...interface{}) {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100197 launch.Log("Cluster: client resolver: %s", fmt.Sprintf(f, args...))
Serge Bazanski58ddc092022-06-30 18:23:33 +0200198 }))
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200199 for _, n := range c.NodeIDs {
200 ep, err := resolver.NodeWithDefaultPort(n)
201 if err != nil {
202 return nil, fmt.Errorf("could not add node %q by DNS: %v", n, err)
203 }
204 r.AddEndpoint(ep)
205 }
206 authClient, err := grpc.Dial(resolver.MetropolisControlAddress,
207 grpc.WithTransportCredentials(authCreds),
208 grpc.WithResolvers(r),
209 grpc.WithContextDialer(c.DialNode),
210 )
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200211 if err != nil {
212 return nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
213 }
214 c.authClient = authClient
215 }
216 return c.authClient, nil
217}
218
Serge Bazanski66e58952021-10-05 17:06:56 +0200219// LaunchNode launches a single Metropolis node instance with the given options.
220// The instance runs mostly paravirtualized but with some emulated hardware
221// similar to how a cloud provider might set up its VMs. The disk is fully
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200222// writable, and the changes are kept across reboots and shutdowns. ld and sd
223// point to the launch directory and the socket directory, holding the nodes'
224// state files (storage, tpm state, firmware state), and UNIX socket files
225// (swtpm <-> QEMU interplay) respectively. The directories must exist before
226// LaunchNode is called. LaunchNode will update options.Runtime and options.Mac
227// if either are not initialized.
228func LaunchNode(ctx context.Context, ld, sd string, options *NodeOptions) error {
229 // TODO(mateusz@monogon.tech) try using QEMU's abstract socket namespace instead
230 // of /tmp (requires QEMU version >5.0).
Serge Bazanski66e58952021-10-05 17:06:56 +0200231 // https://github.com/qemu/qemu/commit/776b97d3605ed0fc94443048fdf988c7725e38a9).
232 // swtpm accepts already-open FDs so we can pass in an abstract socket namespace FD
233 // that we open and pass the name of it to QEMU. Not pinning this crashes both
234 // swtpm and qemu because we run into UNIX socket length limitations (for legacy
235 // reasons 108 chars).
Serge Bazanski66e58952021-10-05 17:06:56 +0200236
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200237 // If it's the node's first start, set up its runtime directories.
238 if options.Runtime == nil {
239 r, err := setupRuntime(ld, sd)
240 if err != nil {
241 return fmt.Errorf("while setting up node runtime: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200242 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200243 options.Runtime = r
Serge Bazanski66e58952021-10-05 17:06:56 +0200244 }
245
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200246 // Replace the node's context with a new one.
247 r := options.Runtime
248 if r.CtxC != nil {
249 r.CtxC()
250 }
251 r.ctxT, r.CtxC = context.WithCancel(ctx)
252
Serge Bazanski66e58952021-10-05 17:06:56 +0200253 var qemuNetType string
254 var qemuNetConfig launch.QemuValue
255 if options.ConnectToSocket != nil {
256 qemuNetType = "socket"
257 qemuNetConfig = launch.QemuValue{
258 "id": {"net0"},
259 "fd": {"3"},
260 }
261 } else {
262 qemuNetType = "user"
263 qemuNetConfig = launch.QemuValue{
264 "id": {"net0"},
265 "net": {"10.42.0.0/24"},
266 "dhcpstart": {"10.42.0.10"},
267 "hostfwd": options.Ports.ToQemuForwards(),
268 }
269 }
270
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200271 // Generate the node's MAC address if it isn't already set in NodeOptions.
272 if options.Mac == nil {
273 mac, err := generateRandomEthernetMAC()
274 if err != nil {
275 return err
276 }
277 options.Mac = mac
Serge Bazanski66e58952021-10-05 17:06:56 +0200278 }
279
Tim Windelschmidt244b5672024-02-06 10:18:56 +0100280 ovmfCodePath, err := runfiles.Rlocation("edk2/OVMF_CODE.fd")
281 if err != nil {
282 return err
283 }
284
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200285 tpmSocketPath := filepath.Join(r.sd, "tpm-socket")
286 fwVarPath := filepath.Join(r.ld, "OVMF_VARS.fd")
Lorenz Brun1dc60af2023-10-03 15:40:09 +0200287 storagePath := filepath.Join(r.ld, "image.img")
Lorenz Brun150f24a2023-07-13 20:11:06 +0200288 qemuArgs := []string{
289 "-machine", "q35", "-accel", "kvm", "-nographic", "-nodefaults", "-m", "4096",
Serge Bazanski66e58952021-10-05 17:06:56 +0200290 "-cpu", "host", "-smp", "sockets=1,cpus=1,cores=2,threads=2,maxcpus=4",
Tim Windelschmidt244b5672024-02-06 10:18:56 +0100291 "-drive", "if=pflash,format=raw,readonly=on,file=" + ovmfCodePath,
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200292 "-drive", "if=pflash,format=raw,file=" + fwVarPath,
293 "-drive", "if=virtio,format=raw,cache=unsafe,file=" + storagePath,
Serge Bazanski66e58952021-10-05 17:06:56 +0200294 "-netdev", qemuNetConfig.ToOption(qemuNetType),
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200295 "-device", "virtio-net-pci,netdev=net0,mac=" + options.Mac.String(),
Serge Bazanski66e58952021-10-05 17:06:56 +0200296 "-chardev", "socket,id=chrtpm,path=" + tpmSocketPath,
297 "-tpmdev", "emulator,id=tpm0,chardev=chrtpm",
298 "-device", "tpm-tis,tpmdev=tpm0",
299 "-device", "virtio-rng-pci",
Lorenz Brun150f24a2023-07-13 20:11:06 +0200300 "-serial", "stdio",
301 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200302
303 if !options.AllowReboot {
304 qemuArgs = append(qemuArgs, "-no-reboot")
305 }
306
307 if options.NodeParameters != nil {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200308 parametersPath := filepath.Join(r.ld, "parameters.pb")
Serge Bazanski66e58952021-10-05 17:06:56 +0200309 parametersRaw, err := proto.Marshal(options.NodeParameters)
310 if err != nil {
311 return fmt.Errorf("failed to encode node paraeters: %w", err)
312 }
Lorenz Brun150f24a2023-07-13 20:11:06 +0200313 if err := os.WriteFile(parametersPath, parametersRaw, 0o644); err != nil {
Serge Bazanski66e58952021-10-05 17:06:56 +0200314 return fmt.Errorf("failed to write node parameters: %w", err)
315 }
316 qemuArgs = append(qemuArgs, "-fw_cfg", "name=dev.monogon.metropolis/parameters.pb,file="+parametersPath)
317 }
318
Leopoldacfad5b2023-01-15 14:05:25 +0100319 if options.PcapDump {
320 var qemuNetDump launch.QemuValue
321 pcapPath := filepath.Join(r.ld, "net0.pcap")
322 if options.PcapDump {
323 qemuNetDump = launch.QemuValue{
324 "id": {"net0"},
325 "netdev": {"net0"},
326 "file": {pcapPath},
327 }
328 }
329 qemuArgs = append(qemuArgs, "-object", qemuNetDump.ToOption("filter-dump"))
330 }
331
Serge Bazanski66e58952021-10-05 17:06:56 +0200332 // Start TPM emulator as a subprocess
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200333 tpmCtx, tpmCancel := context.WithCancel(options.Runtime.ctxT)
Serge Bazanski66e58952021-10-05 17:06:56 +0200334 defer tpmCancel()
335
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200336 tpmd := filepath.Join(r.ld, "tpm")
337 tpmEmuCmd := exec.CommandContext(tpmCtx, "swtpm", "socket", "--tpm2", "--tpmstate", "dir="+tpmd, "--ctrl", "type=unixio,path="+tpmSocketPath)
Serge Bazanski66e58952021-10-05 17:06:56 +0200338 tpmEmuCmd.Stderr = os.Stderr
339 tpmEmuCmd.Stdout = os.Stdout
340
Tim Windelschmidt244b5672024-02-06 10:18:56 +0100341 err = tpmEmuCmd.Start()
Serge Bazanski66e58952021-10-05 17:06:56 +0200342 if err != nil {
343 return fmt.Errorf("failed to start TPM emulator: %w", err)
344 }
345
Mateusz Zalegae90f4a12022-05-25 18:24:01 +0200346 // Wait for the socket to be created by the TPM emulator before launching
347 // QEMU.
348 for {
349 _, err := os.Stat(tpmSocketPath)
350 if err == nil {
351 break
352 }
353 if err != nil && !os.IsNotExist(err) {
354 return fmt.Errorf("while stat-ing TPM socket path: %w", err)
355 }
356 if err := tpmCtx.Err(); err != nil {
357 return fmt.Errorf("while waiting for the TPM socket: %w", err)
358 }
359 time.Sleep(time.Millisecond * 100)
360 }
361
Serge Bazanski66e58952021-10-05 17:06:56 +0200362 // Start the main qemu binary
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200363 systemCmd := exec.CommandContext(options.Runtime.ctxT, "qemu-system-x86_64", qemuArgs...)
Serge Bazanski66e58952021-10-05 17:06:56 +0200364 if options.ConnectToSocket != nil {
365 systemCmd.ExtraFiles = []*os.File{options.ConnectToSocket}
366 }
367
368 var stdErrBuf bytes.Buffer
369 systemCmd.Stderr = &stdErrBuf
370 systemCmd.Stdout = options.SerialPort
371
Leopoldaf5086b2023-01-15 14:12:42 +0100372 launch.PrettyPrintQemuArgs(options.Name, systemCmd.Args)
373
Serge Bazanski66e58952021-10-05 17:06:56 +0200374 err = systemCmd.Run()
375
376 // Stop TPM emulator and wait for it to exit to properly reap the child process
377 tpmCancel()
Serge Bazanski05f813b2023-03-16 17:58:39 +0100378 launch.Log("Node: Waiting for TPM emulator to exit")
Serge Bazanski66e58952021-10-05 17:06:56 +0200379 // Wait returns a SIGKILL error because we just cancelled its context.
380 // We still need to call it to avoid creating zombies.
381 _ = tpmEmuCmd.Wait()
Serge Bazanski05f813b2023-03-16 17:58:39 +0100382 launch.Log("Node: TPM emulator done")
Serge Bazanski66e58952021-10-05 17:06:56 +0200383
384 var exerr *exec.ExitError
385 if err != nil && errors.As(err, &exerr) {
386 status := exerr.ProcessState.Sys().(syscall.WaitStatus)
387 if status.Signaled() && status.Signal() == syscall.SIGKILL {
388 // Process was killed externally (most likely by our context being canceled).
389 // This is a normal exit for us, so return nil
390 return nil
391 }
392 exerr.Stderr = stdErrBuf.Bytes()
393 newErr := launch.QEMUError(*exerr)
394 return &newErr
395 }
396 return err
397}
398
399func copyFile(src, dst string) error {
400 in, err := os.Open(src)
401 if err != nil {
402 return fmt.Errorf("when opening source: %w", err)
403 }
404 defer in.Close()
405
406 out, err := os.Create(dst)
407 if err != nil {
408 return fmt.Errorf("when creating destination: %w", err)
409 }
410 defer out.Close()
411
Lorenz Brun87bbf7e2024-03-18 18:22:25 +0100412 endPos, err := in.Seek(0, io.SeekEnd)
Serge Bazanski66e58952021-10-05 17:06:56 +0200413 if err != nil {
Lorenz Brun87bbf7e2024-03-18 18:22:25 +0100414 return fmt.Errorf("when getting source end: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200415 }
Lorenz Brun87bbf7e2024-03-18 18:22:25 +0100416
417 // Copy the file while preserving its sparseness. The image files are very
418 // sparse (less than 10% allocated), so this is a lot faster.
419 var lastHoleStart int64
420 for {
421 dataStart, err := in.Seek(lastHoleStart, unix.SEEK_DATA)
422 if err != nil {
423 return fmt.Errorf("when seeking to next data block: %w", err)
424 }
425 holeStart, err := in.Seek(dataStart, unix.SEEK_HOLE)
426 if err != nil {
427 return fmt.Errorf("when seeking to next hole: %w", err)
428 }
429 lastHoleStart = holeStart
430 if _, err := in.Seek(dataStart, io.SeekStart); err != nil {
431 return fmt.Errorf("when seeking to current data block: %w", err)
432 }
433 if _, err := out.Seek(dataStart, io.SeekStart); err != nil {
434 return fmt.Errorf("when seeking output to next data block: %w", err)
435 }
436 if _, err := io.CopyN(out, in, holeStart-dataStart); err != nil {
437 return fmt.Errorf("when copying file: %w", err)
438 }
439 if endPos == holeStart {
440 // The next hole is at the end of the file, we're done here.
441 break
442 }
443 }
444
Serge Bazanski66e58952021-10-05 17:06:56 +0200445 return out.Close()
446}
447
Serge Bazanskie78a0892021-10-07 17:03:49 +0200448// getNodes wraps around Management.GetNodes to return a list of nodes in a
449// cluster.
450func getNodes(ctx context.Context, mgmt apb.ManagementClient) ([]*apb.Node, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200451 var res []*apb.Node
Serge Bazanski636032e2022-01-26 14:21:33 +0100452 bo := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
Serge Bazanski075465c2021-11-16 15:38:49 +0100453 err := backoff.Retry(func() error {
454 res = nil
455 srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
Serge Bazanskie78a0892021-10-07 17:03:49 +0200456 if err != nil {
Serge Bazanski075465c2021-11-16 15:38:49 +0100457 return fmt.Errorf("GetNodes: %w", err)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200458 }
Serge Bazanski075465c2021-11-16 15:38:49 +0100459 for {
460 node, err := srvN.Recv()
461 if err == io.EOF {
462 break
463 }
464 if err != nil {
465 return fmt.Errorf("GetNodes.Recv: %w", err)
466 }
467 res = append(res, node)
468 }
469 return nil
470 }, bo)
471 if err != nil {
472 return nil, err
Serge Bazanskie78a0892021-10-07 17:03:49 +0200473 }
474 return res, nil
475}
476
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200477// getNode wraps Management.GetNodes. It returns node information matching
478// given node ID.
479func getNode(ctx context.Context, mgmt apb.ManagementClient, id string) (*apb.Node, error) {
480 nodes, err := getNodes(ctx, mgmt)
481 if err != nil {
482 return nil, fmt.Errorf("could not get nodes: %w", err)
483 }
484 for _, n := range nodes {
485 eid := identity.NodeID(n.Pubkey)
486 if eid != id {
487 continue
488 }
489 return n, nil
490 }
491 return nil, fmt.Errorf("no such node.")
492}
493
// generateRandomEthernetMAC returns a random EUI-48 Ethernet MAC address
// marked as a locally administered, individual (unicast) address. It returns
// an error if the system's source of randomness cannot be read.
func generateRandomEthernetMAC() (*net.HardwareAddr, error) {
	macBuf := make([]byte, 6)
	if _, err := rand.Read(macBuf); err != nil {
		return nil, fmt.Errorf("failed to read randomness for MAC: %w", err)
	}

	// Set U/L bit and clear I/G bit (locally administered individual MAC)
	// Ref IEEE 802-2014 Section 8.2.2
	macBuf[0] = (macBuf[0] | 2) & 0xfe
	mac := net.HardwareAddr(macBuf)
	return &mac, nil
}
508
// SOCKSPort is the port of the SOCKS proxy to the switch network. It is one
// of the ports listed in ClusterPorts.
const SOCKSPort uint16 = 1080
Serge Bazanski66e58952021-10-05 17:06:56 +0200510
// ClusterPorts contains all ports handled by Nanoswitch: the services of the
// first node which are forwarded to it, and the SOCKS proxy.
var ClusterPorts = []uint16{
	// Forwarded to the first node.
	uint16(node.CuratorServicePort),
	uint16(node.DebugServicePort),
	uint16(node.KubernetesAPIPort),
	uint16(node.KubernetesAPIWrappedPort),

	// SOCKS proxy to the switch network.
	SOCKSPort,
}
522
// ClusterOptions contains all options for launching a Metropolis cluster.
type ClusterOptions struct {
	// NumNodes is the number of nodes this cluster should be started with.
	// LaunchCluster refuses to start a cluster if this is not positive.
	NumNodes int

	// NodeLogsToFiles, if true, makes node logs be saved to individual files
	// instead of being printed out to stderr. The path of these files will be
	// still printed to stdout.
	//
	// The files will be located within the launch directory inside TEST_TMPDIR (or
	// the default tempdir location, if not set).
	NodeLogsToFiles bool

	// LeaveNodesNew, if set, will leave all non-bootstrap nodes in NEW, without
	// bootstrapping them. The nodes' address information in Cluster.Nodes will be
	// incomplete.
	LeaveNodesNew bool

	// LocalRegistry is an optional local registry which will be made available
	// to the cluster to pull images from. This is a more efficient alternative
	// to preseeding all images used for testing.
	LocalRegistry *localregistry.Server
}
545
// Cluster is the running Metropolis cluster launched using the LaunchCluster
// function.
type Cluster struct {
	// Owner is the TLS Certificate of the owner of the test cluster. This can be
	// used to authenticate further clients to the running cluster.
	Owner tls.Certificate
	// Ports is the PortMap used to access the first nodes' services (defined in
	// ClusterPorts) and the SOCKS proxy (at SOCKSPort).
	Ports launch.PortMap

	// Nodes is a map from Node ID to its runtime information.
	Nodes map[string]*NodeInCluster
	// NodeIDs is a list of node IDs that are backing this cluster, in order of
	// creation.
	NodeIDs []string

	// CACertificate is the cluster's CA certificate.
	CACertificate *x509.Certificate

	// nodesDone is a list of channels populated with the return codes from all the
	// nodes' qemu instances. It's used by Close to ensure all nodes have
	// successfully been stopped.
	nodesDone []chan error
	// nodeOpts are the cluster member nodes' mutable launch options, kept here
	// to facilitate reboots.
	nodeOpts []NodeOptions
	// launchDir points at the directory keeping the nodes' state, such as storage
	// images, firmware variable files, TPM state.
	launchDir string
	// socketDir points at the directory keeping UNIX socket files, such as these
	// used to facilitate communication between QEMU and swtpm. It's different
	// from launchDir, and anchored nearer the file system root, due to the
	// socket path length limitation imposed by the kernel.
	socketDir string

	// metroctlDir is presumably a directory holding metroctl-related files for
	// this cluster. NOTE(review): its use is not visible here; confirm against
	// the code that sets and reads it.
	metroctlDir string

	// SOCKSDialer is used by DialNode to establish connections to nodes via the
	// SOCKS server ran by nanoswitch.
	SOCKSDialer proxy.Dialer

	// authClient is a cached authenticated owner connection to a Curator
	// instance within the cluster. It is populated by CuratorClient on first
	// use.
	authClient *grpc.ClientConn

	// ctxT is the context individual node contexts are created from.
	ctxT context.Context
	// ctxC is used by Close to cancel the context under which the nodes are
	// running.
	ctxC context.CancelFunc
}
596
// NodeInCluster represents information about a node that's part of a Cluster.
type NodeInCluster struct {
	// ID of the node, which can be used to dial this node's services via DialNode.
	ID string

	// Pubkey is the node's public key. NOTE(review): presumably the same key
	// the ID is derived from via identity.NodeID; confirm against the code
	// populating it.
	Pubkey []byte

	// Address of the node on the network ran by nanoswitch. Not reachable from the
	// host unless dialed via DialNode or via the nanoswitch SOCKS proxy (reachable
	// on Cluster.Ports[SOCKSPort]).
	ManagementAddress string
}
607
608// firstConnection performs the initial owner credential escrow with a newly
609// started nanoswitch-backed cluster over SOCKS. It expects the first node to be
610// running at 10.1.0.2, which is always the case with the current nanoswitch
611// implementation.
612//
Leopold20a036e2023-01-15 00:17:19 +0100613// It returns the newly escrowed credentials as well as the first node's
Serge Bazanskibe742842022-04-04 13:18:50 +0200614// information as NodeInCluster.
615func firstConnection(ctx context.Context, socksDialer proxy.Dialer) (*tls.Certificate, *NodeInCluster, error) {
616 // Dial external service.
617 remote := fmt.Sprintf("10.1.0.2:%s", node.CuratorServicePort.PortString())
Serge Bazanski0c280152024-02-05 14:33:19 +0100618 initCreds, err := rpc.NewEphemeralCredentials(InsecurePrivateKey, rpc.WantInsecure())
Serge Bazanskibe742842022-04-04 13:18:50 +0200619 if err != nil {
620 return nil, nil, fmt.Errorf("NewEphemeralCredentials: %w", err)
621 }
622 initDialer := func(_ context.Context, addr string) (net.Conn, error) {
623 return socksDialer.Dial("tcp", addr)
624 }
625 initClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(initCreds))
626 if err != nil {
627 return nil, nil, fmt.Errorf("dialing with ephemeral credentials failed: %w", err)
628 }
629 defer initClient.Close()
630
631 // Retrieve owner certificate - this can take a while because the node is still
632 // coming up, so do it in a backoff loop.
Serge Bazanski05f813b2023-03-16 17:58:39 +0100633 launch.Log("Cluster: retrieving owner certificate (this can take a few seconds while the first node boots)...")
Serge Bazanskibe742842022-04-04 13:18:50 +0200634 aaa := apb.NewAAAClient(initClient)
635 var cert *tls.Certificate
636 err = backoff.Retry(func() error {
637 cert, err = rpc.RetrieveOwnerCertificate(ctx, aaa, InsecurePrivateKey)
638 if st, ok := status.FromError(err); ok {
639 if st.Code() == codes.Unavailable {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100640 launch.Log("Cluster: cluster UNAVAILABLE: %v", st.Message())
Serge Bazanskibe742842022-04-04 13:18:50 +0200641 return err
642 }
643 }
644 return backoff.Permanent(err)
645 }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
646 if err != nil {
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200647 return nil, nil, fmt.Errorf("couldn't retrieve owner certificate: %w", err)
Serge Bazanskibe742842022-04-04 13:18:50 +0200648 }
Serge Bazanski05f813b2023-03-16 17:58:39 +0100649 launch.Log("Cluster: retrieved owner certificate.")
Serge Bazanskibe742842022-04-04 13:18:50 +0200650
651 // Now connect authenticated and get the node ID.
Serge Bazanski8535cb52023-03-29 14:15:08 +0200652 creds := rpc.NewAuthenticatedCredentials(*cert, rpc.WantInsecure())
Serge Bazanskibe742842022-04-04 13:18:50 +0200653 authClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(creds))
654 if err != nil {
655 return nil, nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
656 }
657 defer authClient.Close()
658 mgmt := apb.NewManagementClient(authClient)
659
660 var node *NodeInCluster
661 err = backoff.Retry(func() error {
662 nodes, err := getNodes(ctx, mgmt)
663 if err != nil {
664 return fmt.Errorf("retrieving nodes failed: %w", err)
665 }
666 if len(nodes) != 1 {
667 return fmt.Errorf("expected one node, got %d", len(nodes))
668 }
669 n := nodes[0]
670 if n.Status == nil || n.Status.ExternalAddress == "" {
671 return fmt.Errorf("node has no status and/or address")
672 }
673 node = &NodeInCluster{
674 ID: identity.NodeID(n.Pubkey),
675 ManagementAddress: n.Status.ExternalAddress,
676 }
677 return nil
678 }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
679 if err != nil {
680 return nil, nil, err
681 }
682
683 return cert, node, nil
Serge Bazanski66e58952021-10-05 17:06:56 +0200684}
685
// NewSerialFileLogger opens the file at path p for use as a serial port log
// sink, creating it with mode 0600 if it does not exist yet. The file is
// opened write-only and is not truncated.
//
// On failure a nil io.ReadWriter is returned together with the error from
// os.OpenFile.
func NewSerialFileLogger(p string) (io.ReadWriter, error) {
	const (
		flags = os.O_WRONLY | os.O_CREATE
		mode  = 0o600
	)
	logFile, openErr := os.OpenFile(p, flags, mode)
	if openErr != nil {
		// Return an untyped nil so that callers comparing the interface
		// against nil see it as empty.
		return nil, openErr
	}
	return logFile, nil
}
693
Serge Bazanski66e58952021-10-05 17:06:56 +0200694// LaunchCluster launches a cluster of Metropolis node VMs together with a
695// Nanoswitch instance to network them all together.
696//
697// The given context will be used to run all qemu instances in the cluster, and
698// canceling the context or calling Close() will terminate them.
699func LaunchCluster(ctx context.Context, opts ClusterOptions) (*Cluster, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200700 if opts.NumNodes <= 0 {
Serge Bazanski66e58952021-10-05 17:06:56 +0200701 return nil, errors.New("refusing to start cluster with zero nodes")
702 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200703
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200704 // Create the launch directory.
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100705 ld, err := os.MkdirTemp(os.Getenv("TEST_TMPDIR"), "cluster-*")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200706 if err != nil {
707 return nil, fmt.Errorf("failed to create the launch directory: %w", err)
708 }
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100709 // Create the metroctl config directory. We keep it in /tmp because in some
710 // scenarios it's end-user visible and we want it short.
711 md, err := os.MkdirTemp("/tmp", "metroctl-*")
712 if err != nil {
713 return nil, fmt.Errorf("failed to create the metroctl directory: %w", err)
714 }
715
716 // Create the socket directory. We keep it in /tmp because of socket path limits.
717 sd, err := os.MkdirTemp("/tmp", "cluster-*")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200718 if err != nil {
719 return nil, fmt.Errorf("failed to create the socket directory: %w", err)
720 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200721
722 // Prepare links between nodes and nanoswitch.
723 var switchPorts []*os.File
724 var vmPorts []*os.File
725 for i := 0; i < opts.NumNodes; i++ {
726 switchPort, vmPort, err := launch.NewSocketPair()
727 if err != nil {
Serge Bazanski66e58952021-10-05 17:06:56 +0200728 return nil, fmt.Errorf("failed to get socketpair: %w", err)
729 }
730 switchPorts = append(switchPorts, switchPort)
731 vmPorts = append(vmPorts, vmPort)
732 }
733
Serge Bazanskie78a0892021-10-07 17:03:49 +0200734 // Make a list of channels that will be populated by all running node qemu
735 // processes.
Serge Bazanski66e58952021-10-05 17:06:56 +0200736 done := make([]chan error, opts.NumNodes)
Lorenz Brun150f24a2023-07-13 20:11:06 +0200737 for i := range done {
Serge Bazanski66e58952021-10-05 17:06:56 +0200738 done[i] = make(chan error, 1)
739 }
740
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200741 // Prepare the node options. These will be kept as part of Cluster.
742 // nodeOpts[].Runtime will be initialized by LaunchNode during the first
743 // launch. The runtime information can be later used to restart a node.
744 // The 0th node will be initialized first. The rest will follow after it
745 // had bootstrapped the cluster.
746 nodeOpts := make([]NodeOptions, opts.NumNodes)
747 nodeOpts[0] = NodeOptions{
Leopoldaf5086b2023-01-15 14:12:42 +0100748 Name: "node0",
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200749 ConnectToSocket: vmPorts[0],
750 NodeParameters: &apb.NodeParameters{
751 Cluster: &apb.NodeParameters_ClusterBootstrap_{
752 ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
753 OwnerPublicKey: InsecurePublicKey,
Serge Bazanski66e58952021-10-05 17:06:56 +0200754 },
755 },
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200756 },
757 SerialPort: newPrefixedStdio(0),
Leopoldacfad5b2023-01-15 14:05:25 +0100758 PcapDump: true,
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200759 }
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100760 if opts.NodeLogsToFiles {
761 path := path.Join(ld, "node-1.txt")
762 port, err := NewSerialFileLogger(path)
763 if err != nil {
764 return nil, fmt.Errorf("could not open log file for node 1: %w", err)
765 }
766 launch.Log("Node 1 logs at %s", path)
767 nodeOpts[0].SerialPort = port
768 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200769
770 // Start the first node.
771 ctxT, ctxC := context.WithCancel(ctx)
Serge Bazanski05f813b2023-03-16 17:58:39 +0100772 launch.Log("Cluster: Starting node %d...", 1)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200773 go func() {
774 err := LaunchNode(ctxT, ld, sd, &nodeOpts[0])
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200775 if err != nil {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100776 launch.Log("Node %d finished with an error: %v", 1, err)
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200777 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200778 done[0] <- err
779 }()
780
Lorenz Brun150f24a2023-07-13 20:11:06 +0200781 localRegistryAddr := net.TCPAddr{
782 IP: net.IPv4(10, 42, 0, 82),
783 Port: 5000,
784 }
785
786 var guestSvcMap launch.GuestServiceMap
787 if opts.LocalRegistry != nil {
788 l, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1)})
789 if err != nil {
790 ctxC()
791 return nil, fmt.Errorf("failed to create TCP listener for local registry: %w", err)
792 }
793 s := http.Server{
794 Handler: opts.LocalRegistry,
795 }
796 go s.Serve(l)
797 go func() {
798 <-ctxT.Done()
799 s.Close()
800 }()
801 guestSvcMap = launch.GuestServiceMap{
802 &localRegistryAddr: *l.Addr().(*net.TCPAddr),
803 }
804 }
805
Serge Bazanskie78a0892021-10-07 17:03:49 +0200806 // Launch nanoswitch.
Serge Bazanski66e58952021-10-05 17:06:56 +0200807 portMap, err := launch.ConflictFreePortMap(ClusterPorts)
808 if err != nil {
809 ctxC()
810 return nil, fmt.Errorf("failed to allocate ephemeral ports: %w", err)
811 }
812
813 go func() {
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100814 var serialPort io.ReadWriter
815 if opts.NodeLogsToFiles {
816 path := path.Join(ld, "nanoswitch.txt")
817 serialPort, err = NewSerialFileLogger(path)
818 if err != nil {
819 launch.Log("Could not open log file for nanoswitch: %v", err)
820 }
821 launch.Log("Nanoswitch logs at %s", path)
822 } else {
823 serialPort = newPrefixedStdio(99)
824 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200825 if err := launch.RunMicroVM(ctxT, &launch.MicroVMOptions{
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100826 Name: "nanoswitch",
Serge Bazanski66e58952021-10-05 17:06:56 +0200827 KernelPath: "metropolis/test/ktest/vmlinux",
Lorenz Brun62f1d362023-11-14 16:18:24 +0100828 InitramfsPath: "metropolis/test/nanoswitch/initramfs.cpio.zst",
Serge Bazanski66e58952021-10-05 17:06:56 +0200829 ExtraNetworkInterfaces: switchPorts,
830 PortMap: portMap,
Lorenz Brun150f24a2023-07-13 20:11:06 +0200831 GuestServiceMap: guestSvcMap,
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100832 SerialPort: serialPort,
Leopoldacfad5b2023-01-15 14:05:25 +0100833 PcapDump: path.Join(ld, "nanoswitch.pcap"),
Serge Bazanski66e58952021-10-05 17:06:56 +0200834 }); err != nil {
835 if !errors.Is(err, ctxT.Err()) {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100836 launch.Fatal("Failed to launch nanoswitch: %v", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200837 }
838 }
839 }()
840
Serge Bazanskibe742842022-04-04 13:18:50 +0200841 // Build SOCKS dialer.
842 socksRemote := fmt.Sprintf("localhost:%v", portMap[SOCKSPort])
843 socksDialer, err := proxy.SOCKS5("tcp", socksRemote, nil, proxy.Direct)
Serge Bazanski66e58952021-10-05 17:06:56 +0200844 if err != nil {
845 ctxC()
Serge Bazanskibe742842022-04-04 13:18:50 +0200846 return nil, fmt.Errorf("failed to build SOCKS dialer: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200847 }
848
Serge Bazanskibe742842022-04-04 13:18:50 +0200849 // Retrieve owner credentials and first node.
850 cert, firstNode, err := firstConnection(ctxT, socksDialer)
Serge Bazanski66e58952021-10-05 17:06:56 +0200851 if err != nil {
852 ctxC()
853 return nil, err
854 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200855
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100856 // Write credentials to the metroctl directory.
857 if err := metroctl.WriteOwnerKey(md, cert.PrivateKey.(ed25519.PrivateKey)); err != nil {
858 ctxC()
859 return nil, fmt.Errorf("could not write owner key: %w", err)
860 }
861 if err := metroctl.WriteOwnerCertificate(md, cert.Certificate[0]); err != nil {
862 ctxC()
863 return nil, fmt.Errorf("could not write owner certificate: %w", err)
864 }
865
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200866 // Set up a partially initialized cluster instance, to be filled in in the
867 // later steps.
Serge Bazanskibe742842022-04-04 13:18:50 +0200868 cluster := &Cluster{
869 Owner: *cert,
870 Ports: portMap,
871 Nodes: map[string]*NodeInCluster{
872 firstNode.ID: firstNode,
873 },
874 NodeIDs: []string{
875 firstNode.ID,
876 },
877
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100878 nodesDone: done,
879 nodeOpts: nodeOpts,
880 launchDir: ld,
881 socketDir: sd,
882 metroctlDir: md,
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200883
Lorenz Brun276a7462023-07-12 21:28:54 +0200884 SOCKSDialer: socksDialer,
Serge Bazanskibe742842022-04-04 13:18:50 +0200885
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200886 ctxT: ctxT,
Serge Bazanskibe742842022-04-04 13:18:50 +0200887 ctxC: ctxC,
888 }
889
890 // Now start the rest of the nodes and register them into the cluster.
891
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200892 // Get an authenticated owner client within the cluster.
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200893 curC, err := cluster.CuratorClient()
Serge Bazanski66e58952021-10-05 17:06:56 +0200894 if err != nil {
895 ctxC()
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200896 return nil, fmt.Errorf("CuratorClient: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200897 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200898 mgmt := apb.NewManagementClient(curC)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200899
900 // Retrieve register ticket to register further nodes.
Serge Bazanski05f813b2023-03-16 17:58:39 +0100901 launch.Log("Cluster: retrieving register ticket...")
Serge Bazanskie78a0892021-10-07 17:03:49 +0200902 resT, err := mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{})
903 if err != nil {
904 ctxC()
905 return nil, fmt.Errorf("GetRegisterTicket: %w", err)
906 }
907 ticket := resT.Ticket
Serge Bazanski05f813b2023-03-16 17:58:39 +0100908 launch.Log("Cluster: retrieved register ticket (%d bytes).", len(ticket))
Serge Bazanskie78a0892021-10-07 17:03:49 +0200909
910 // Retrieve cluster info (for directory and ca public key) to register further
911 // nodes.
912 resI, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
913 if err != nil {
914 ctxC()
915 return nil, fmt.Errorf("GetClusterInfo: %w", err)
916 }
Serge Bazanski54e212a2023-06-14 13:45:11 +0200917 caCert, err := x509.ParseCertificate(resI.CaCertificate)
918 if err != nil {
919 ctxC()
920 return nil, fmt.Errorf("ParseCertificate: %w", err)
921 }
922 cluster.CACertificate = caCert
Serge Bazanskie78a0892021-10-07 17:03:49 +0200923
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200924 // Use the retrieved information to configure the rest of the node options.
925 for i := 1; i < opts.NumNodes; i++ {
926 nodeOpts[i] = NodeOptions{
Leopoldaf5086b2023-01-15 14:12:42 +0100927 Name: fmt.Sprintf("node%d", i),
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200928 ConnectToSocket: vmPorts[i],
929 NodeParameters: &apb.NodeParameters{
930 Cluster: &apb.NodeParameters_ClusterRegister_{
931 ClusterRegister: &apb.NodeParameters_ClusterRegister{
932 RegisterTicket: ticket,
933 ClusterDirectory: resI.ClusterDirectory,
934 CaCertificate: resI.CaCertificate,
935 },
936 },
937 },
938 SerialPort: newPrefixedStdio(i),
939 }
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100940 if opts.NodeLogsToFiles {
941 path := path.Join(ld, fmt.Sprintf("node-%d.txt", i+1))
942 port, err := NewSerialFileLogger(path)
943 if err != nil {
944 return nil, fmt.Errorf("could not open log file for node %d: %w", i+1, err)
945 }
946 launch.Log("Node %d logs at %s", i+1, path)
947 nodeOpts[i].SerialPort = port
948 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200949 }
950
951 // Now run the rest of the nodes.
Serge Bazanskie78a0892021-10-07 17:03:49 +0200952 for i := 1; i < opts.NumNodes; i++ {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100953 launch.Log("Cluster: Starting node %d...", i+1)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200954 go func(i int) {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200955 err := LaunchNode(ctxT, ld, sd, &nodeOpts[i])
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200956 if err != nil {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100957 launch.Log("Node %d finished with an error: %v", i, err)
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200958 }
Serge Bazanskie78a0892021-10-07 17:03:49 +0200959 done[i] <- err
960 }(i)
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200961 }
Serge Bazanskie78a0892021-10-07 17:03:49 +0200962
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200963 seenNodes := make(map[string]bool)
964 launch.Log("Cluster: waiting for nodes to appear as NEW...")
965 for i := 1; i < opts.NumNodes; i++ {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200966 for {
967 nodes, err := getNodes(ctx, mgmt)
968 if err != nil {
969 ctxC()
970 return nil, fmt.Errorf("could not get nodes: %w", err)
971 }
972 for _, n := range nodes {
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200973 if n.State != cpb.NodeState_NODE_STATE_NEW {
974 continue
Serge Bazanskie78a0892021-10-07 17:03:49 +0200975 }
Serge Bazanski87d9c592024-03-20 12:35:11 +0100976 if seenNodes[n.Id] {
977 continue
978 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200979 seenNodes[n.Id] = true
980 cluster.Nodes[n.Id] = &NodeInCluster{
981 ID: n.Id,
982 Pubkey: n.Pubkey,
983 }
984 cluster.NodeIDs = append(cluster.NodeIDs, n.Id)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200985 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200986
987 if len(seenNodes) == opts.NumNodes-1 {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200988 break
989 }
990 time.Sleep(1 * time.Second)
991 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200992 }
993 launch.Log("Found all expected nodes")
Serge Bazanskie78a0892021-10-07 17:03:49 +0200994
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200995 approvedNodes := make(map[string]bool)
996 upNodes := make(map[string]bool)
997 if !opts.LeaveNodesNew {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200998 for {
999 nodes, err := getNodes(ctx, mgmt)
1000 if err != nil {
1001 ctxC()
1002 return nil, fmt.Errorf("could not get nodes: %w", err)
1003 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001004 for _, node := range nodes {
1005 if !seenNodes[node.Id] {
1006 // Skip nodes that weren't NEW in the previous step.
Serge Bazanskie78a0892021-10-07 17:03:49 +02001007 continue
1008 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001009
1010 if node.State == cpb.NodeState_NODE_STATE_UP && node.Status != nil && node.Status.ExternalAddress != "" {
1011 launch.Log("Cluster: node %s is up", node.Id)
1012 upNodes[node.Id] = true
1013 cluster.Nodes[node.Id].ManagementAddress = node.Status.ExternalAddress
Serge Bazanskie78a0892021-10-07 17:03:49 +02001014 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001015 if upNodes[node.Id] {
1016 continue
Serge Bazanskibe742842022-04-04 13:18:50 +02001017 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001018
1019 if !approvedNodes[node.Id] {
1020 launch.Log("Cluster: approving node %s", node.Id)
1021 _, err := mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
1022 Pubkey: node.Pubkey,
1023 })
1024 if err != nil {
1025 ctxC()
1026 return nil, fmt.Errorf("ApproveNode(%s): %w", node.Id, err)
1027 }
1028 approvedNodes[node.Id] = true
Serge Bazanskibe742842022-04-04 13:18:50 +02001029 }
Serge Bazanskie78a0892021-10-07 17:03:49 +02001030 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001031
1032 launch.Log("Cluster: want %d up nodes, have %d", opts.NumNodes-1, len(upNodes))
1033 if len(upNodes) == opts.NumNodes-1 {
Serge Bazanskie78a0892021-10-07 17:03:49 +02001034 break
1035 }
Serge Bazanskibe742842022-04-04 13:18:50 +02001036 time.Sleep(time.Second)
Serge Bazanskie78a0892021-10-07 17:03:49 +02001037 }
Serge Bazanskie78a0892021-10-07 17:03:49 +02001038 }
Serge Bazanski66e58952021-10-05 17:06:56 +02001039
Serge Bazanski05f813b2023-03-16 17:58:39 +01001040 launch.Log("Cluster: all nodes up:")
Serge Bazanskibe742842022-04-04 13:18:50 +02001041 for _, node := range cluster.Nodes {
Serge Bazanski05f813b2023-03-16 17:58:39 +01001042 launch.Log("Cluster: - %s at %s", node.ID, node.ManagementAddress)
Serge Bazanskibe742842022-04-04 13:18:50 +02001043 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001044 launch.Log("Cluster: starting tests...")
Serge Bazanski66e58952021-10-05 17:06:56 +02001045
Serge Bazanskibe742842022-04-04 13:18:50 +02001046 return cluster, nil
Serge Bazanski66e58952021-10-05 17:06:56 +02001047}
1048
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001049// RebootNode reboots the cluster member node matching the given index, and
1050// waits for it to rejoin the cluster. It will use the given context ctx to run
1051// cluster API requests, whereas the resulting QEMU process will be created
1052// using the cluster's context c.ctxT. The nodes are indexed starting at 0.
1053func (c *Cluster) RebootNode(ctx context.Context, idx int) error {
1054 if idx < 0 || idx >= len(c.NodeIDs) {
1055 return fmt.Errorf("index out of bounds.")
1056 }
1057 id := c.NodeIDs[idx]
1058
1059 // Get an authenticated owner client within the cluster.
Serge Bazanski5bb8a332022-06-23 17:41:33 +02001060 curC, err := c.CuratorClient()
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001061 if err != nil {
1062 return err
1063 }
1064 mgmt := apb.NewManagementClient(curC)
1065
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001066 // Cancel the node's context. This will shut down QEMU.
1067 c.nodeOpts[idx].Runtime.CtxC()
Serge Bazanski05f813b2023-03-16 17:58:39 +01001068 launch.Log("Cluster: waiting for node %d (%s) to stop.", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001069 err = <-c.nodesDone[idx]
1070 if err != nil {
1071 return fmt.Errorf("while restarting node: %w", err)
1072 }
1073
1074 // Start QEMU again.
Serge Bazanski05f813b2023-03-16 17:58:39 +01001075 launch.Log("Cluster: restarting node %d (%s).", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001076 go func(n int) {
1077 err := LaunchNode(c.ctxT, c.launchDir, c.socketDir, &c.nodeOpts[n])
Mateusz Zalega08cb4642022-05-25 17:35:59 +02001078 if err != nil {
Serge Bazanski05f813b2023-03-16 17:58:39 +01001079 launch.Log("Node %d finished with an error: %v", n, err)
Mateusz Zalega08cb4642022-05-25 17:35:59 +02001080 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001081 c.nodesDone[n] <- err
1082 }(idx)
1083
Serge Bazanskibc969572024-03-21 11:56:13 +01001084 start := time.Now()
1085
1086 // Poll Management.GetNodes until the node is healthy.
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001087 for {
1088 cs, err := getNode(ctx, mgmt, id)
1089 if err != nil {
Serge Bazanski05f813b2023-03-16 17:58:39 +01001090 launch.Log("Cluster: node get error: %v", err)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001091 return err
1092 }
Serge Bazanskibc969572024-03-21 11:56:13 +01001093 launch.Log("Cluster: node health: %+v", cs.Health)
1094
1095 lhb := time.Now().Add(-cs.TimeSinceHeartbeat.AsDuration())
1096 if lhb.After(start) && cs.Health == apb.Node_HEALTHY {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001097 break
1098 }
1099 time.Sleep(time.Second)
1100 }
Serge Bazanski05f813b2023-03-16 17:58:39 +01001101 launch.Log("Cluster: node %d (%s) has rejoined the cluster.", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001102 return nil
1103}
1104
1105// Close cancels the running clusters' context and waits for all virtualized
Serge Bazanski66e58952021-10-05 17:06:56 +02001106// nodes to stop. It returns an error if stopping the nodes failed, or one of
1107// the nodes failed to fully start in the first place.
1108func (c *Cluster) Close() error {
Serge Bazanski05f813b2023-03-16 17:58:39 +01001109 launch.Log("Cluster: stopping...")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001110 if c.authClient != nil {
1111 c.authClient.Close()
1112 }
Serge Bazanski66e58952021-10-05 17:06:56 +02001113 c.ctxC()
1114
Leopold20a036e2023-01-15 00:17:19 +01001115 var errs []error
Serge Bazanski05f813b2023-03-16 17:58:39 +01001116 launch.Log("Cluster: waiting for nodes to exit...")
Serge Bazanski66e58952021-10-05 17:06:56 +02001117 for _, c := range c.nodesDone {
1118 err := <-c
1119 if err != nil {
Leopold20a036e2023-01-15 00:17:19 +01001120 errs = append(errs, err)
Serge Bazanski66e58952021-10-05 17:06:56 +02001121 }
1122 }
Serge Bazanskid09c58f2023-03-17 00:25:08 +01001123 launch.Log("Cluster: removing nodes' state files (%s) and sockets (%s).", c.launchDir, c.socketDir)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001124 os.RemoveAll(c.launchDir)
1125 os.RemoveAll(c.socketDir)
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001126 os.RemoveAll(c.metroctlDir)
Serge Bazanski05f813b2023-03-16 17:58:39 +01001127 launch.Log("Cluster: done")
Leopold20a036e2023-01-15 00:17:19 +01001128 return multierr.Combine(errs...)
Serge Bazanski66e58952021-10-05 17:06:56 +02001129}
Serge Bazanskibe742842022-04-04 13:18:50 +02001130
1131// DialNode is a grpc.WithContextDialer compatible dialer which dials nodes by
1132// their ID. This is performed by connecting to the cluster nanoswitch via its
1133// SOCKS proxy, and using the cluster node list for name resolution.
1134//
1135// For example:
1136//
Serge Bazanski05f813b2023-03-16 17:58:39 +01001137// grpc.Dial("metropolis-deadbeef:1234", grpc.WithContextDialer(c.DialNode))
Serge Bazanskibe742842022-04-04 13:18:50 +02001138func (c *Cluster) DialNode(_ context.Context, addr string) (net.Conn, error) {
1139 host, port, err := net.SplitHostPort(addr)
1140 if err != nil {
1141 return nil, fmt.Errorf("invalid host:port: %w", err)
1142 }
1143 // Already an IP address?
1144 if net.ParseIP(host) != nil {
Lorenz Brun276a7462023-07-12 21:28:54 +02001145 return c.SOCKSDialer.Dial("tcp", addr)
Serge Bazanskibe742842022-04-04 13:18:50 +02001146 }
1147
1148 // Otherwise, expect a node name.
1149 node, ok := c.Nodes[host]
1150 if !ok {
1151 return nil, fmt.Errorf("unknown node %q", host)
1152 }
1153 addr = net.JoinHostPort(node.ManagementAddress, port)
Lorenz Brun276a7462023-07-12 21:28:54 +02001154 return c.SOCKSDialer.Dial("tcp", addr)
Serge Bazanskibe742842022-04-04 13:18:50 +02001155}
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001156
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001157// GetKubeClientSet gets a Kubernetes client set accessing the Metropolis
1158// Kubernetes authenticating proxy using the cluster owner identity.
1159// It currently has access to everything (i.e. the cluster-admin role)
1160// via the owner-admin binding.
1161func (c *Cluster) GetKubeClientSet() (kubernetes.Interface, error) {
1162 pkcs8Key, err := x509.MarshalPKCS8PrivateKey(c.Owner.PrivateKey)
1163 if err != nil {
1164 // We explicitly pass an Ed25519 private key in, so this can't happen
1165 panic(err)
1166 }
1167
1168 host := net.JoinHostPort(c.NodeIDs[0], node.KubernetesAPIWrappedPort.PortString())
Lorenz Brun150f24a2023-07-13 20:11:06 +02001169 clientConfig := rest.Config{
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001170 Host: host,
1171 TLSClientConfig: rest.TLSClientConfig{
1172 // TODO(q3k): use CA certificate
1173 Insecure: true,
1174 ServerName: "kubernetes.default.svc",
1175 CertData: pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: c.Owner.Certificate[0]}),
1176 KeyData: pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: pkcs8Key}),
1177 },
1178 Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
1179 return c.DialNode(ctx, address)
1180 },
1181 }
1182 return kubernetes.NewForConfig(&clientConfig)
1183}
1184
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001185// KubernetesControllerNodeAddresses returns the list of IP addresses of nodes
1186// which are currently Kubernetes controllers, ie. run an apiserver. This list
1187// might be empty if no node is currently configured with the
1188// 'KubernetesController' node.
1189func (c *Cluster) KubernetesControllerNodeAddresses(ctx context.Context) ([]string, error) {
1190 curC, err := c.CuratorClient()
1191 if err != nil {
1192 return nil, err
1193 }
1194 mgmt := apb.NewManagementClient(curC)
1195 srv, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{
1196 Filter: "has(node.roles.kubernetes_controller)",
1197 })
1198 if err != nil {
1199 return nil, err
1200 }
1201 defer srv.CloseSend()
1202 var res []string
1203 for {
1204 n, err := srv.Recv()
1205 if err == io.EOF {
1206 break
1207 }
1208 if err != nil {
1209 return nil, err
1210 }
1211 if n.Status == nil || n.Status.ExternalAddress == "" {
1212 continue
1213 }
1214 res = append(res, n.Status.ExternalAddress)
1215 }
1216 return res, nil
1217}
Serge Bazanski630fb5c2023-04-06 10:50:24 +02001218
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001219// AllNodesHealthy returns nil if all the nodes in the cluster are seemingly
1220// healthy.
Serge Bazanski630fb5c2023-04-06 10:50:24 +02001221func (c *Cluster) AllNodesHealthy(ctx context.Context) error {
1222 // Get an authenticated owner client within the cluster.
1223 curC, err := c.CuratorClient()
1224 if err != nil {
1225 return err
1226 }
1227 mgmt := apb.NewManagementClient(curC)
1228 nodes, err := getNodes(ctx, mgmt)
1229 if err != nil {
1230 return err
1231 }
1232
1233 var unhealthy []string
1234 for _, node := range nodes {
1235 if node.Health == apb.Node_HEALTHY {
1236 continue
1237 }
1238 unhealthy = append(unhealthy, node.Id)
1239 }
1240 if len(unhealthy) == 0 {
1241 return nil
1242 }
1243 return fmt.Errorf("nodes unhealthy: %s", strings.Join(unhealthy, ", "))
1244}
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001245
1246// ApproveNode approves a node by ID, waiting for it to become UP.
1247func (c *Cluster) ApproveNode(ctx context.Context, id string) error {
1248 curC, err := c.CuratorClient()
1249 if err != nil {
1250 return err
1251 }
1252 mgmt := apb.NewManagementClient(curC)
1253
1254 _, err = mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
1255 Pubkey: c.Nodes[id].Pubkey,
1256 })
1257 if err != nil {
1258 return fmt.Errorf("ApproveNode: %w", err)
1259 }
1260 launch.Log("Cluster: %s: approved, waiting for UP", id)
1261 for {
1262 nodes, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
1263 if err != nil {
1264 return fmt.Errorf("GetNodes: %w", err)
1265 }
1266 found := false
1267 for {
1268 node, err := nodes.Recv()
1269 if errors.Is(err, io.EOF) {
1270 break
1271 }
1272 if err != nil {
1273 return fmt.Errorf("Nodes.Recv: %w", err)
1274 }
1275 if node.Id != id {
1276 continue
1277 }
1278 if node.State != cpb.NodeState_NODE_STATE_UP {
1279 continue
1280 }
1281 found = true
1282 break
1283 }
1284 nodes.CloseSend()
1285
1286 if found {
1287 break
1288 }
1289 time.Sleep(time.Second)
1290 }
1291 launch.Log("Cluster: %s: UP", id)
1292 return nil
1293}
1294
1295// MakeKubernetesWorker adds the KubernetesWorker role to a node by ID.
1296func (c *Cluster) MakeKubernetesWorker(ctx context.Context, id string) error {
1297 curC, err := c.CuratorClient()
1298 if err != nil {
1299 return err
1300 }
1301 mgmt := apb.NewManagementClient(curC)
1302
1303 tr := true
1304 launch.Log("Cluster: %s: adding KubernetesWorker", id)
1305 _, err = mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
1306 Node: &apb.UpdateNodeRolesRequest_Id{
1307 Id: id,
1308 },
1309 KubernetesWorker: &tr,
1310 })
1311 return err
1312}