// cluster builds on the launch package and implements launching Metropolis
// nodes and clusters in a virtualized environment using qemu. It's kept in a
// separate package as it depends on a Metropolis node image, which might not
// be required for some uses of the launch library.
package cluster

import (
	"bytes"
	"context"
	"crypto/rand"
	"crypto/tls"
	"errors"
	"fmt"
	"io"
	"log"
	"net"
	"os"
	"os/exec"
	"path/filepath"
	"syscall"
	"time"

	"github.com/cenkalti/backoff/v4"
	grpcretry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
	"go.uber.org/multierr"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/proto"

	"source.monogon.dev/metropolis/node"
	"source.monogon.dev/metropolis/node/core/identity"
	"source.monogon.dev/metropolis/node/core/rpc"
	apb "source.monogon.dev/metropolis/proto/api"
	cpb "source.monogon.dev/metropolis/proto/common"
	"source.monogon.dev/metropolis/test/launch"
)

// NodeOptions contains all options that can be passed to LaunchNode().
type NodeOptions struct {
	// Ports contains the port mapping used to expose the internal ports of the VM
	// to the host. See IdentityPortMap() and ConflictFreePortMap(). Ignored when
	// ConnectToSocket is set.
	Ports launch.PortMap

	// If set to true, reboots are honored. Otherwise all reboots exit LaunchNode().
	// Metropolis nodes generally restart on almost all errors, so unless you want
	// to test reboot behavior this should be false.
	AllowReboot bool

	// By default the VM is connected to the host via SLIRP. If ConnectToSocket is
	// set, it is instead connected to the given file descriptor/socket. If this is
	// set, all port maps from the Ports option are ignored. Intended for
	// networking this instance together with others for running more complex
	// network configurations.
	ConnectToSocket *os.File

	// SerialPort is an io.ReadWriter over which you can communicate with the
	// serial port of the machine. It can be set to an existing file descriptor
	// (like os.Stdout/os.Stderr) or any Go structure implementing this interface.
	SerialPort io.ReadWriter

	// NodeParameters is passed into the VM and subsequently used for
	// bootstrapping or registering into a cluster.
	NodeParameters *apb.NodeParameters
}

// NodePorts is the list of ports a fully operational Metropolis node listens
// on.
var NodePorts = []node.Port{
	node.ConsensusPort,

	node.CuratorServicePort,
	node.DebugServicePort,

	node.KubernetesAPIPort,
	node.KubernetesAPIWrappedPort,
	node.DebuggerPort,
}
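
// A minimal sketch of consuming NodePorts: allocate conflict-free host ports
// for a node's services, then dial one of them. This mirrors what
// LaunchCluster does below; the error handling is elided for brevity:
//
//	portMap, err := launch.ConflictFreePortMap(NodePorts)
//	if err != nil { /* ... */ }
//	conn, err := portMap.DialGRPC(node.DebugServicePort, grpc.WithInsecure())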

// LaunchNode launches a single Metropolis node instance with the given
// options. The instance runs mostly paravirtualized but with some emulated
// hardware similar to how a cloud provider might set up its VMs. The disk is
// fully writable, but is run in snapshot mode, meaning that changes are not
// kept beyond a single invocation.
func LaunchNode(ctx context.Context, options NodeOptions) error {
	// Pin temp directory to /tmp until we can use abstract socket namespace in
	// QEMU (next release after 5.0,
	// https://github.com/qemu/qemu/commit/776b97d3605ed0fc94443048fdf988c7725e38a9).
	// swtpm accepts already-open FDs so we can pass in an abstract socket
	// namespace FD that we open and pass the name of it to QEMU. Not pinning this
	// crashes both swtpm and qemu because we run into UNIX socket length
	// limitations (for legacy reasons 108 chars).
	tempDir, err := os.MkdirTemp("/tmp", "launch*")
	if err != nil {
		return fmt.Errorf("failed to create temporary directory: %w", err)
	}
	defer os.RemoveAll(tempDir)

	// Copy TPM state into a temporary directory since it's being modified by the
	// emulator.
	tpmTargetDir := filepath.Join(tempDir, "tpm")
	tpmSrcDir := "metropolis/node/tpm"
	if err := os.Mkdir(tpmTargetDir, 0755); err != nil {
		return fmt.Errorf("failed to create TPM state directory: %w", err)
	}
	tpmFiles, err := os.ReadDir(tpmSrcDir)
	if err != nil {
		return fmt.Errorf("failed to read TPM directory: %w", err)
	}
	for _, file := range tpmFiles {
		name := file.Name()
		src := filepath.Join(tpmSrcDir, name)
		target := filepath.Join(tpmTargetDir, name)
		if err := copyFile(src, target); err != nil {
			return fmt.Errorf("failed to copy TPM directory: file %q to %q: %w", src, target, err)
		}
	}

	var qemuNetType string
	var qemuNetConfig launch.QemuValue
	if options.ConnectToSocket != nil {
		qemuNetType = "socket"
		qemuNetConfig = launch.QemuValue{
			"id": {"net0"},
			"fd": {"3"},
		}
	} else {
		qemuNetType = "user"
		qemuNetConfig = launch.QemuValue{
			"id":        {"net0"},
			"net":       {"10.42.0.0/24"},
			"dhcpstart": {"10.42.0.10"},
			"hostfwd":   options.Ports.ToQemuForwards(),
		}
	}

	tpmSocketPath := filepath.Join(tempDir, "tpm-socket")

	mac, err := generateRandomEthernetMAC()
	if err != nil {
		return err
	}

	qemuArgs := []string{"-machine", "q35", "-accel", "kvm", "-nographic", "-nodefaults", "-m", "4096",
		"-cpu", "host", "-smp", "sockets=1,cpus=1,cores=2,threads=2,maxcpus=4",
		"-drive", "if=pflash,format=raw,readonly,file=external/edk2/OVMF_CODE.fd",
		"-drive", "if=pflash,format=raw,snapshot=on,file=external/edk2/OVMF_VARS.fd",
		"-drive", "if=virtio,format=raw,snapshot=on,cache=unsafe,file=metropolis/node/node.img",
		"-netdev", qemuNetConfig.ToOption(qemuNetType),
		"-device", "virtio-net-pci,netdev=net0,mac=" + mac.String(),
		"-chardev", "socket,id=chrtpm,path=" + tpmSocketPath,
		"-tpmdev", "emulator,id=tpm0,chardev=chrtpm",
		"-device", "tpm-tis,tpmdev=tpm0",
		"-device", "virtio-rng-pci",
		"-serial", "stdio"}

	if !options.AllowReboot {
		qemuArgs = append(qemuArgs, "-no-reboot")
	}

	if options.NodeParameters != nil {
		parametersPath := filepath.Join(tempDir, "parameters.pb")
		parametersRaw, err := proto.Marshal(options.NodeParameters)
		if err != nil {
			return fmt.Errorf("failed to encode node parameters: %w", err)
		}
		if err := os.WriteFile(parametersPath, parametersRaw, 0644); err != nil {
			return fmt.Errorf("failed to write node parameters: %w", err)
		}
		qemuArgs = append(qemuArgs, "-fw_cfg", "name=dev.monogon.metropolis/parameters.pb,file="+parametersPath)
	}

	// Start TPM emulator as a subprocess.
	tpmCtx, tpmCancel := context.WithCancel(ctx)
	defer tpmCancel()

	tpmEmuCmd := exec.CommandContext(tpmCtx, "swtpm", "socket", "--tpm2", "--tpmstate", "dir="+tpmTargetDir, "--ctrl", "type=unixio,path="+tpmSocketPath)
	tpmEmuCmd.Stderr = os.Stderr
	tpmEmuCmd.Stdout = os.Stdout

	err = tpmEmuCmd.Start()
	if err != nil {
		return fmt.Errorf("failed to start TPM emulator: %w", err)
	}

	// Start the main qemu binary.
	systemCmd := exec.CommandContext(ctx, "qemu-system-x86_64", qemuArgs...)
	if options.ConnectToSocket != nil {
		systemCmd.ExtraFiles = []*os.File{options.ConnectToSocket}
	}

	var stdErrBuf bytes.Buffer
	systemCmd.Stderr = &stdErrBuf
	systemCmd.Stdout = options.SerialPort

	err = systemCmd.Run()

	// Stop the TPM emulator and wait for it to exit to properly reap the child
	// process.
	tpmCancel()
	log.Print("Node: Waiting for TPM emulator to exit")
	// Wait returns a SIGKILL error because we just canceled its context. We
	// still need to call it to avoid creating zombies.
	_ = tpmEmuCmd.Wait()
	log.Print("Node: TPM emulator done")

	var exerr *exec.ExitError
	if err != nil && errors.As(err, &exerr) {
		status := exerr.ProcessState.Sys().(syscall.WaitStatus)
		if status.Signaled() && status.Signal() == syscall.SIGKILL {
			// Process was killed externally (most likely by our context being
			// canceled). This is a normal exit for us, so return nil.
			return nil
		}
		exerr.Stderr = stdErrBuf.Bytes()
		newErr := launch.QEMUError(*exerr)
		return &newErr
	}
	return err
}
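
// A minimal sketch of using LaunchNode to bootstrap a fresh single-node
// cluster, mirroring the parameters LaunchCluster passes to its first node
// below. The serial sink and surrounding context handling are illustrative
// assumptions, not required values:
//
//	err := LaunchNode(ctx, NodeOptions{
//		NodeParameters: &apb.NodeParameters{
//			Cluster: &apb.NodeParameters_ClusterBootstrap_{
//				ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
//					OwnerPublicKey: InsecurePublicKey,
//				},
//			},
//		},
//		SerialPort: os.Stdout,
//	})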

func copyFile(src, dst string) error {
	in, err := os.Open(src)
	if err != nil {
		return fmt.Errorf("when opening source: %w", err)
	}
	defer in.Close()

	out, err := os.Create(dst)
	if err != nil {
		return fmt.Errorf("when creating destination: %w", err)
	}
	defer out.Close()

	_, err = io.Copy(out, in)
	if err != nil {
		return fmt.Errorf("when copying file: %w", err)
	}
	return out.Close()
}
240
Serge Bazanskie78a0892021-10-07 17:03:49 +0200241// getNodes wraps around Management.GetNodes to return a list of nodes in a
242// cluster.
243func getNodes(ctx context.Context, mgmt apb.ManagementClient) ([]*apb.Node, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200244 var res []*apb.Node
Serge Bazanski636032e2022-01-26 14:21:33 +0100245 bo := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
Serge Bazanski075465c2021-11-16 15:38:49 +0100246 err := backoff.Retry(func() error {
247 res = nil
248 srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
Serge Bazanskie78a0892021-10-07 17:03:49 +0200249 if err != nil {
Serge Bazanski075465c2021-11-16 15:38:49 +0100250 return fmt.Errorf("GetNodes: %w", err)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200251 }
Serge Bazanski075465c2021-11-16 15:38:49 +0100252 for {
253 node, err := srvN.Recv()
254 if err == io.EOF {
255 break
256 }
257 if err != nil {
258 return fmt.Errorf("GetNodes.Recv: %w", err)
259 }
260 res = append(res, node)
261 }
262 return nil
263 }, bo)
264 if err != nil {
265 return nil, err
Serge Bazanskie78a0892021-10-07 17:03:49 +0200266 }
267 return res, nil
268}

// generateRandomEthernetMAC generates a random EUI-48 Ethernet MAC address.
func generateRandomEthernetMAC() (*net.HardwareAddr, error) {
	macBuf := make([]byte, 6)
	_, err := rand.Read(macBuf)
	if err != nil {
		return nil, fmt.Errorf("failed to read randomness for MAC: %w", err)
	}

	// Set U/L bit and clear I/G bit (locally administered individual MAC).
	// Ref IEEE 802-2014 Section 8.2.2.
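	// For example, a random first byte of 0x07 becomes (0x07|0x02)&0xfe == 0x06:
	// bit 1 (U/L) is set to mark the address as locally administered, and bit 0
	// (I/G) is cleared to mark it as an individual (unicast) address.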
	macBuf[0] = (macBuf[0] | 2) & 0xfe
	mac := net.HardwareAddr(macBuf)
	return &mac, nil
}

// ClusterPorts contains all ports forwarded by Nanoswitch to the first VM in a
// launched Metropolis cluster.
var ClusterPorts = []node.Port{
	node.CuratorServicePort,
	node.DebugServicePort,

	node.KubernetesAPIPort,
	node.KubernetesAPIWrappedPort,
}

// ClusterOptions contains all options for launching a Metropolis cluster.
type ClusterOptions struct {
	// The number of nodes this cluster should be started with.
	NumNodes int
}

// Cluster is the running Metropolis cluster launched using the LaunchCluster
// function.
type Cluster struct {
	// Debug is the NodeDebugService gRPC service, allowing for unauthenticated
	// debug access to the cluster.
	Debug apb.NodeDebugServiceClient
	// Management is the Management gRPC service, authenticated as the owner of
	// the cluster.
	Management apb.ManagementClient
	// Owner is the TLS Certificate of the owner of the test cluster. This can be
	// used to authenticate further clients to the running cluster.
	Owner tls.Certificate
	// Ports is the PortMap used to access the first node's services (defined in
	// ClusterPorts).
	Ports launch.PortMap

	// nodesDone is a list of channels populated with the return codes from all
	// the nodes' qemu instances. It's used by Close to ensure all nodes have
	// successfully been stopped.
	nodesDone []chan error
	// ctxC is used by Close to cancel the context under which the nodes are
	// running.
	ctxC context.CancelFunc
}
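
// A minimal sketch of a cluster lifecycle as driven from a test. The node
// count and error handling are illustrative assumptions:
//
//	c, err := LaunchCluster(ctx, ClusterOptions{NumNodes: 2})
//	if err != nil {
//		// handle error
//	}
//	defer c.Close()
//	info, err := c.Management.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})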

// LaunchCluster launches a cluster of Metropolis node VMs together with a
// Nanoswitch instance to network them all together.
//
// The given context will be used to run all qemu instances in the cluster, and
// canceling the context or calling Close() will terminate them.
func LaunchCluster(ctx context.Context, opts ClusterOptions) (*Cluster, error) {
	if opts.NumNodes <= 0 {
		return nil, errors.New("refusing to start cluster with zero nodes")
	}

	ctxT, ctxC := context.WithCancel(ctx)

	// Prepare links between nodes and nanoswitch.
	var switchPorts []*os.File
	var vmPorts []*os.File
	for i := 0; i < opts.NumNodes; i++ {
		switchPort, vmPort, err := launch.NewSocketPair()
		if err != nil {
			ctxC()
			return nil, fmt.Errorf("failed to get socketpair: %w", err)
		}
		switchPorts = append(switchPorts, switchPort)
		vmPorts = append(vmPorts, vmPort)
	}

	// Make a list of channels that will be populated by all running node qemu
	// processes.
	done := make([]chan error, opts.NumNodes)
	for i := range done {
		done[i] = make(chan error, 1)
	}

	// Start first node.
	log.Printf("Cluster: Starting node %d...", 1)
	go func() {
		err := LaunchNode(ctxT, NodeOptions{
			ConnectToSocket: vmPorts[0],
			NodeParameters: &apb.NodeParameters{
				Cluster: &apb.NodeParameters_ClusterBootstrap_{
					ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
						OwnerPublicKey: InsecurePublicKey,
					},
				},
			},
			SerialPort: newPrefixedStdio(0),
		})
		done[0] <- err
	}()

	// Launch nanoswitch.
	portMap, err := launch.ConflictFreePortMap(ClusterPorts)
	if err != nil {
		ctxC()
		return nil, fmt.Errorf("failed to allocate ephemeral ports: %w", err)
	}

	go func() {
		if err := launch.RunMicroVM(ctxT, &launch.MicroVMOptions{
			KernelPath:             "metropolis/test/ktest/vmlinux",
			InitramfsPath:          "metropolis/test/nanoswitch/initramfs.cpio.lz4",
			ExtraNetworkInterfaces: switchPorts,
			PortMap:                portMap,
		}); err != nil {
			if !errors.Is(err, ctxT.Err()) {
				log.Fatalf("Failed to launch nanoswitch: %v", err)
			}
		}
	}()

	// Dial debug service.
	copts := []grpcretry.CallOption{
		grpcretry.WithBackoff(grpcretry.BackoffExponential(100 * time.Millisecond)),
	}
	debugConn, err := portMap.DialGRPC(node.DebugServicePort, grpc.WithInsecure(),
		grpc.WithUnaryInterceptor(grpcretry.UnaryClientInterceptor(copts...)))
	if err != nil {
		ctxC()
		return nil, fmt.Errorf("failed to dial debug service: %w", err)
	}

	// Dial external service.
	remote := fmt.Sprintf("localhost:%v", portMap[node.CuratorServicePort])
	initClient, err := rpc.NewEphemeralClient(remote, InsecurePrivateKey, nil)
	if err != nil {
		ctxC()
		return nil, fmt.Errorf("NewEphemeralClient: %w", err)
	}

	// Retrieve owner certificate - this can take a while because the node is
	// still coming up, so do it in a backoff loop.
	log.Printf("Cluster: retrieving owner certificate (this can take a few seconds while the first node boots)...")
	aaa := apb.NewAAAClient(initClient)
	var cert *tls.Certificate
	err = backoff.Retry(func() error {
		cert, err = rpc.RetrieveOwnerCertificate(ctxT, aaa, InsecurePrivateKey)
		if st, ok := status.FromError(err); ok {
			if st.Code() == codes.Unavailable {
				return err
			}
		}
		return backoff.Permanent(err)
	}, backoff.WithContext(backoff.NewExponentialBackOff(), ctxT))
	if err != nil {
		ctxC()
		return nil, err
	}
	log.Printf("Cluster: retrieved owner certificate.")
433
Serge Bazanskie78a0892021-10-07 17:03:49 +0200434 // Build authenticated owner client to new node.
Serge Bazanski66e58952021-10-05 17:06:56 +0200435 authClient, err := rpc.NewAuthenticatedClient(remote, *cert, nil)
436 if err != nil {
437 ctxC()
438 return nil, fmt.Errorf("NewAuthenticatedClient: %w", err)
439 }
Serge Bazanskie78a0892021-10-07 17:03:49 +0200440 mgmt := apb.NewManagementClient(authClient)
441
442 // Retrieve register ticket to register further nodes.
443 log.Printf("Cluster: retrieving register ticket...")
444 resT, err := mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{})
445 if err != nil {
446 ctxC()
447 return nil, fmt.Errorf("GetRegisterTicket: %w", err)
448 }
449 ticket := resT.Ticket
450 log.Printf("Cluster: retrieved register ticket (%d bytes).", len(ticket))
451
452 // Retrieve cluster info (for directory and ca public key) to register further
453 // nodes.
454 resI, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
455 if err != nil {
456 ctxC()
457 return nil, fmt.Errorf("GetClusterInfo: %w", err)
458 }

	// Now run the rest of the nodes.
	//
	// TODO(q3k): parallelize this
	for i := 1; i < opts.NumNodes; i++ {
		log.Printf("Cluster: Starting node %d...", i+1)
		go func(i int) {
			err := LaunchNode(ctxT, NodeOptions{
				ConnectToSocket: vmPorts[i],
				NodeParameters: &apb.NodeParameters{
					Cluster: &apb.NodeParameters_ClusterRegister_{
						ClusterRegister: &apb.NodeParameters_ClusterRegister{
							RegisterTicket:   ticket,
							ClusterDirectory: resI.ClusterDirectory,
							CaCertificate:    resI.CaCertificate,
						},
					},
				},
				SerialPort: newPrefixedStdio(i),
			})
			done[i] <- err
		}(i)
		var newNode *apb.Node

		log.Printf("Cluster: waiting for node %d to appear as NEW...", i+1)
		for {
			nodes, err := getNodes(ctx, mgmt)
			if err != nil {
				ctxC()
				return nil, fmt.Errorf("could not get nodes: %w", err)
			}
			for _, n := range nodes {
				if n.State == cpb.NodeState_NODE_STATE_NEW {
					newNode = n
					break
				}
			}
			if newNode != nil {
				break
			}
			time.Sleep(1 * time.Second)
		}
		id := identity.NodeID(newNode.Pubkey)
		log.Printf("Cluster: node %d is %s", i+1, id)

		log.Printf("Cluster: approving node %d", i+1)
		_, err := mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
			Pubkey: newNode.Pubkey,
		})
		if err != nil {
			ctxC()
			return nil, fmt.Errorf("ApproveNode(%s): %w", id, err)
		}
		log.Printf("Cluster: node %d approved, waiting for it to appear as UP...", i+1)
		for {
			nodes, err := getNodes(ctx, mgmt)
			if err != nil {
				ctxC()
				return nil, fmt.Errorf("could not get nodes: %w", err)
			}
			found := false
			for _, n := range nodes {
				if !bytes.Equal(n.Pubkey, newNode.Pubkey) {
					continue
				}
				if n.State == cpb.NodeState_NODE_STATE_UP {
					found = true
					break
				}
			}
			if found {
				break
			}
			time.Sleep(time.Second)
		}
		log.Printf("Cluster: node %d (%s) UP!", i+1, id)
	}

	return &Cluster{
		Debug:      apb.NewNodeDebugServiceClient(debugConn),
		Management: mgmt,
		Owner:      *cert,
		Ports:      portMap,
		nodesDone:  done,

		ctxC: ctxC,
	}, nil
}

// Close cancels the running cluster's context and waits for all virtualized
// nodes to stop. It returns an error if stopping the nodes failed, or one of
// the nodes failed to fully start in the first place.
func (c *Cluster) Close() error {
	log.Printf("Cluster: stopping...")
	c.ctxC()

	var errs []error
	log.Printf("Cluster: waiting for nodes to exit...")
	for _, d := range c.nodesDone {
		err := <-d
		if err != nil {
			errs = append(errs, err)
		}
	}
	log.Printf("Cluster: done")
	return multierr.Combine(errs...)
}