blob: cf9cd07cbd2d582b69e23d911e840e85dbea0f99 [file] [log] [blame]
Serge Bazanski66e58952021-10-05 17:06:56 +02001// cluster builds on the launch package and implements launching Metropolis
2// nodes and clusters in a virtualized environment using qemu. It's kept in a
3// separate package as it depends on a Metropolis node image, which might not be
4// required for some use of the launch library.
5package cluster
6
7import (
8 "bytes"
9 "context"
10 "crypto/rand"
11 "crypto/tls"
12 "errors"
13 "fmt"
14 "io"
Serge Bazanski66e58952021-10-05 17:06:56 +020015 "log"
16 "net"
17 "os"
18 "os/exec"
19 "path/filepath"
20 "syscall"
21 "time"
22
23 "github.com/cenkalti/backoff/v4"
24 grpcretry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
25 "go.uber.org/multierr"
26 "google.golang.org/grpc"
Serge Bazanski636032e2022-01-26 14:21:33 +010027 "google.golang.org/grpc/codes"
28 "google.golang.org/grpc/status"
Serge Bazanski66e58952021-10-05 17:06:56 +020029 "google.golang.org/protobuf/proto"
30
31 "source.monogon.dev/metropolis/node"
Serge Bazanskie78a0892021-10-07 17:03:49 +020032 "source.monogon.dev/metropolis/node/core/identity"
Serge Bazanski66e58952021-10-05 17:06:56 +020033 "source.monogon.dev/metropolis/node/core/rpc"
34 apb "source.monogon.dev/metropolis/proto/api"
Serge Bazanskie78a0892021-10-07 17:03:49 +020035 cpb "source.monogon.dev/metropolis/proto/common"
Serge Bazanski66e58952021-10-05 17:06:56 +020036 "source.monogon.dev/metropolis/test/launch"
37)
38
// NodeOptions contains all options that can be passed to LaunchNode().
type NodeOptions struct {
	// Ports contains the port mapping where to expose the internal ports of the VM to
	// the host. See IdentityPortMap() and ConflictFreePortMap(). Ignored when
	// ConnectToSocket is set.
	Ports launch.PortMap

	// If set to true, reboots are honored. Otherwise all reboots exit the Launch()
	// command. Metropolis nodes generally restart on almost all errors, so unless you
	// want to test reboot behavior this should be false.
	AllowReboot bool

	// By default the VM is connected to the Host via SLIRP. If ConnectToSocket is set,
	// it is instead connected to the given file descriptor/socket. If this is set, all
	// port maps from the Ports option are ignored. Intended for networking this
	// instance together with others for running more complex network configurations.
	ConnectToSocket *os.File

	// SerialPort is an io.ReadWriter over which you can communicate with the serial
	// port of the machine. It can be set to an existing file descriptor (like
	// os.Stdout/os.Stderr) or any Go structure implementing this interface.
	SerialPort io.ReadWriter

	// NodeParameters is passed into the VM and subsequently used for bootstrapping or
	// registering into a cluster.
	NodeParameters *apb.NodeParameters
}
66
67// NodePorts is the list of ports a fully operational Metropolis node listens on
Serge Bazanski52304a82021-10-29 16:56:18 +020068var NodePorts = []node.Port{
Serge Bazanski66e58952021-10-05 17:06:56 +020069 node.ConsensusPort,
70
71 node.CuratorServicePort,
72 node.DebugServicePort,
73
74 node.KubernetesAPIPort,
Lorenz Bruncc078df2021-12-23 11:51:55 +010075 node.KubernetesAPIWrappedPort,
Serge Bazanski66e58952021-10-05 17:06:56 +020076 node.CuratorServicePort,
77 node.DebuggerPort,
78}
79
80// LaunchNode launches a single Metropolis node instance with the given options.
81// The instance runs mostly paravirtualized but with some emulated hardware
82// similar to how a cloud provider might set up its VMs. The disk is fully
83// writable but is run in snapshot mode meaning that changes are not kept beyond
84// a single invocation.
85func LaunchNode(ctx context.Context, options NodeOptions) error {
86 // Pin temp directory to /tmp until we can use abstract socket namespace in QEMU
87 // (next release after 5.0,
88 // https://github.com/qemu/qemu/commit/776b97d3605ed0fc94443048fdf988c7725e38a9).
89 // swtpm accepts already-open FDs so we can pass in an abstract socket namespace FD
90 // that we open and pass the name of it to QEMU. Not pinning this crashes both
91 // swtpm and qemu because we run into UNIX socket length limitations (for legacy
92 // reasons 108 chars).
Lorenz Brun764a2de2021-11-22 16:26:36 +010093 tempDir, err := os.MkdirTemp("/tmp", "launch*")
Serge Bazanski66e58952021-10-05 17:06:56 +020094 if err != nil {
95 return fmt.Errorf("failed to create temporary directory: %w", err)
96 }
97 defer os.RemoveAll(tempDir)
98
99 // Copy TPM state into a temporary directory since it's being modified by the
100 // emulator
101 tpmTargetDir := filepath.Join(tempDir, "tpm")
102 tpmSrcDir := "metropolis/node/tpm"
103 if err := os.Mkdir(tpmTargetDir, 0755); err != nil {
104 return fmt.Errorf("failed to create TPM state directory: %w", err)
105 }
Lorenz Brun764a2de2021-11-22 16:26:36 +0100106 tpmFiles, err := os.ReadDir(tpmSrcDir)
Serge Bazanski66e58952021-10-05 17:06:56 +0200107 if err != nil {
108 return fmt.Errorf("failed to read TPM directory: %w", err)
109 }
110 for _, file := range tpmFiles {
111 name := file.Name()
112 src := filepath.Join(tpmSrcDir, name)
113 target := filepath.Join(tpmTargetDir, name)
114 if err := copyFile(src, target); err != nil {
115 return fmt.Errorf("failed to copy TPM directory: file %q to %q: %w", src, target, err)
116 }
117 }
118
119 var qemuNetType string
120 var qemuNetConfig launch.QemuValue
121 if options.ConnectToSocket != nil {
122 qemuNetType = "socket"
123 qemuNetConfig = launch.QemuValue{
124 "id": {"net0"},
125 "fd": {"3"},
126 }
127 } else {
128 qemuNetType = "user"
129 qemuNetConfig = launch.QemuValue{
130 "id": {"net0"},
131 "net": {"10.42.0.0/24"},
132 "dhcpstart": {"10.42.0.10"},
133 "hostfwd": options.Ports.ToQemuForwards(),
134 }
135 }
136
137 tpmSocketPath := filepath.Join(tempDir, "tpm-socket")
138
139 mac, err := generateRandomEthernetMAC()
140 if err != nil {
141 return err
142 }
143
144 qemuArgs := []string{"-machine", "q35", "-accel", "kvm", "-nographic", "-nodefaults", "-m", "4096",
145 "-cpu", "host", "-smp", "sockets=1,cpus=1,cores=2,threads=2,maxcpus=4",
146 "-drive", "if=pflash,format=raw,readonly,file=external/edk2/OVMF_CODE.fd",
147 "-drive", "if=pflash,format=raw,snapshot=on,file=external/edk2/OVMF_VARS.fd",
148 "-drive", "if=virtio,format=raw,snapshot=on,cache=unsafe,file=metropolis/node/node.img",
149 "-netdev", qemuNetConfig.ToOption(qemuNetType),
150 "-device", "virtio-net-pci,netdev=net0,mac=" + mac.String(),
151 "-chardev", "socket,id=chrtpm,path=" + tpmSocketPath,
152 "-tpmdev", "emulator,id=tpm0,chardev=chrtpm",
153 "-device", "tpm-tis,tpmdev=tpm0",
154 "-device", "virtio-rng-pci",
155 "-serial", "stdio"}
156
157 if !options.AllowReboot {
158 qemuArgs = append(qemuArgs, "-no-reboot")
159 }
160
161 if options.NodeParameters != nil {
162 parametersPath := filepath.Join(tempDir, "parameters.pb")
163 parametersRaw, err := proto.Marshal(options.NodeParameters)
164 if err != nil {
165 return fmt.Errorf("failed to encode node paraeters: %w", err)
166 }
Lorenz Brun764a2de2021-11-22 16:26:36 +0100167 if err := os.WriteFile(parametersPath, parametersRaw, 0644); err != nil {
Serge Bazanski66e58952021-10-05 17:06:56 +0200168 return fmt.Errorf("failed to write node parameters: %w", err)
169 }
170 qemuArgs = append(qemuArgs, "-fw_cfg", "name=dev.monogon.metropolis/parameters.pb,file="+parametersPath)
171 }
172
173 // Start TPM emulator as a subprocess
174 tpmCtx, tpmCancel := context.WithCancel(ctx)
175 defer tpmCancel()
176
177 tpmEmuCmd := exec.CommandContext(tpmCtx, "swtpm", "socket", "--tpm2", "--tpmstate", "dir="+tpmTargetDir, "--ctrl", "type=unixio,path="+tpmSocketPath)
178 tpmEmuCmd.Stderr = os.Stderr
179 tpmEmuCmd.Stdout = os.Stdout
180
181 err = tpmEmuCmd.Start()
182 if err != nil {
183 return fmt.Errorf("failed to start TPM emulator: %w", err)
184 }
185
186 // Start the main qemu binary
187 systemCmd := exec.CommandContext(ctx, "qemu-system-x86_64", qemuArgs...)
188 if options.ConnectToSocket != nil {
189 systemCmd.ExtraFiles = []*os.File{options.ConnectToSocket}
190 }
191
192 var stdErrBuf bytes.Buffer
193 systemCmd.Stderr = &stdErrBuf
194 systemCmd.Stdout = options.SerialPort
195
196 err = systemCmd.Run()
197
198 // Stop TPM emulator and wait for it to exit to properly reap the child process
199 tpmCancel()
200 log.Print("Node: Waiting for TPM emulator to exit")
201 // Wait returns a SIGKILL error because we just cancelled its context.
202 // We still need to call it to avoid creating zombies.
203 _ = tpmEmuCmd.Wait()
204 log.Print("Node: TPM emulator done")
205
206 var exerr *exec.ExitError
207 if err != nil && errors.As(err, &exerr) {
208 status := exerr.ProcessState.Sys().(syscall.WaitStatus)
209 if status.Signaled() && status.Signal() == syscall.SIGKILL {
210 // Process was killed externally (most likely by our context being canceled).
211 // This is a normal exit for us, so return nil
212 return nil
213 }
214 exerr.Stderr = stdErrBuf.Bytes()
215 newErr := launch.QEMUError(*exerr)
216 return &newErr
217 }
218 return err
219}
220
// copyFile duplicates the contents of the file at src into a new (or
// truncated) file at dst. The destination's final Close error is returned
// explicitly so that delayed write-back failures are not silently dropped.
func copyFile(src, dst string) error {
	source, err := os.Open(src)
	if err != nil {
		return fmt.Errorf("when opening source: %w", err)
	}
	defer source.Close()

	sink, err := os.Create(dst)
	if err != nil {
		return fmt.Errorf("when creating destination: %w", err)
	}
	defer sink.Close()

	if _, err := io.Copy(sink, source); err != nil {
		return fmt.Errorf("when copying file: %w", err)
	}
	return sink.Close()
}
240
Serge Bazanskie78a0892021-10-07 17:03:49 +0200241// getNodes wraps around Management.GetNodes to return a list of nodes in a
242// cluster.
243func getNodes(ctx context.Context, mgmt apb.ManagementClient) ([]*apb.Node, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200244 var res []*apb.Node
Serge Bazanski636032e2022-01-26 14:21:33 +0100245 bo := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
Serge Bazanski075465c2021-11-16 15:38:49 +0100246 err := backoff.Retry(func() error {
247 res = nil
248 srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
Serge Bazanskie78a0892021-10-07 17:03:49 +0200249 if err != nil {
Serge Bazanski075465c2021-11-16 15:38:49 +0100250 return fmt.Errorf("GetNodes: %w", err)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200251 }
Serge Bazanski075465c2021-11-16 15:38:49 +0100252 for {
253 node, err := srvN.Recv()
254 if err == io.EOF {
255 break
256 }
257 if err != nil {
258 return fmt.Errorf("GetNodes.Recv: %w", err)
259 }
260 res = append(res, node)
261 }
262 return nil
263 }, bo)
264 if err != nil {
265 return nil, err
Serge Bazanskie78a0892021-10-07 17:03:49 +0200266 }
267 return res, nil
268}
269
Serge Bazanski66e58952021-10-05 17:06:56 +0200270// Gets a random EUI-48 Ethernet MAC address
271func generateRandomEthernetMAC() (*net.HardwareAddr, error) {
272 macBuf := make([]byte, 6)
273 _, err := rand.Read(macBuf)
274 if err != nil {
275 return nil, fmt.Errorf("failed to read randomness for MAC: %v", err)
276 }
277
278 // Set U/L bit and clear I/G bit (locally administered individual MAC)
279 // Ref IEEE 802-2014 Section 8.2.2
280 macBuf[0] = (macBuf[0] | 2) & 0xfe
281 mac := net.HardwareAddr(macBuf)
282 return &mac, nil
283}
284
// ClusterPorts contains all ports forwarded by Nanoswitch to the first VM in a
// launched Metropolis cluster. These are the ports allocated on the host by
// the ConflictFreePortMap call in LaunchCluster and exposed via Cluster.Ports.
var ClusterPorts = []node.Port{
	node.CuratorServicePort,
	node.DebugServicePort,

	node.KubernetesAPIPort,
	node.KubernetesAPIWrappedPort,
}
294
// ClusterOptions contains all options for launching a Metropolis cluster.
type ClusterOptions struct {
	// NumNodes is the number of nodes this cluster should be started with.
	// Must be at least 1; LaunchCluster rejects zero or negative values.
	NumNodes int
}
300
// Cluster is the running Metropolis cluster launched using the LaunchCluster
// function.
type Cluster struct {
	// Debug is the NodeDebugService gRPC service, allowing for unauthenticated
	// debug access to the cluster.
	Debug apb.NodeDebugServiceClient
	// Management is the Management gRPC service, authenticated as the owner of the
	// cluster.
	Management apb.ManagementClient
	// Owner is the TLS Certificate of the owner of the test cluster. This can be
	// used to authenticate further clients to the running cluster.
	Owner tls.Certificate
	// Ports is the PortMap used to access the first nodes' services (defined in
	// ClusterPorts).
	Ports launch.PortMap

	// nodesDone is a list of channels populated with the return codes from all the
	// nodes' qemu instances. It's used by Close to ensure all nodes have
	// successfully been stopped.
	nodesDone []chan error
	// ctxC is used by Close to cancel the context under which the nodes are
	// running.
	ctxC context.CancelFunc
}
325
326// LaunchCluster launches a cluster of Metropolis node VMs together with a
327// Nanoswitch instance to network them all together.
328//
329// The given context will be used to run all qemu instances in the cluster, and
330// canceling the context or calling Close() will terminate them.
331func LaunchCluster(ctx context.Context, opts ClusterOptions) (*Cluster, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200332 if opts.NumNodes <= 0 {
Serge Bazanski66e58952021-10-05 17:06:56 +0200333 return nil, errors.New("refusing to start cluster with zero nodes")
334 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200335
336 ctxT, ctxC := context.WithCancel(ctx)
337
338 // Prepare links between nodes and nanoswitch.
339 var switchPorts []*os.File
340 var vmPorts []*os.File
341 for i := 0; i < opts.NumNodes; i++ {
342 switchPort, vmPort, err := launch.NewSocketPair()
343 if err != nil {
344 ctxC()
345 return nil, fmt.Errorf("failed to get socketpair: %w", err)
346 }
347 switchPorts = append(switchPorts, switchPort)
348 vmPorts = append(vmPorts, vmPort)
349 }
350
Serge Bazanskie78a0892021-10-07 17:03:49 +0200351 // Make a list of channels that will be populated by all running node qemu
352 // processes.
Serge Bazanski66e58952021-10-05 17:06:56 +0200353 done := make([]chan error, opts.NumNodes)
354 for i, _ := range done {
355 done[i] = make(chan error, 1)
356 }
357
358 // Start first node.
Serge Bazanskie78a0892021-10-07 17:03:49 +0200359 log.Printf("Cluster: Starting node %d...", 1)
Serge Bazanski66e58952021-10-05 17:06:56 +0200360 go func() {
361 err := LaunchNode(ctxT, NodeOptions{
362 ConnectToSocket: vmPorts[0],
363 NodeParameters: &apb.NodeParameters{
364 Cluster: &apb.NodeParameters_ClusterBootstrap_{
365 ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
366 OwnerPublicKey: InsecurePublicKey,
367 },
368 },
369 },
Serge Bazanski075465c2021-11-16 15:38:49 +0100370 SerialPort: newPrefixedStdio(0),
Serge Bazanski66e58952021-10-05 17:06:56 +0200371 })
372 done[0] <- err
373 }()
374
Serge Bazanskie78a0892021-10-07 17:03:49 +0200375 // Launch nanoswitch.
Serge Bazanski66e58952021-10-05 17:06:56 +0200376 portMap, err := launch.ConflictFreePortMap(ClusterPorts)
377 if err != nil {
378 ctxC()
379 return nil, fmt.Errorf("failed to allocate ephemeral ports: %w", err)
380 }
381
382 go func() {
383 if err := launch.RunMicroVM(ctxT, &launch.MicroVMOptions{
384 KernelPath: "metropolis/test/ktest/vmlinux",
Lorenz Brunb6a9d3c2022-01-27 18:56:20 +0100385 InitramfsPath: "metropolis/test/nanoswitch/initramfs.cpio.lz4",
Serge Bazanski66e58952021-10-05 17:06:56 +0200386 ExtraNetworkInterfaces: switchPorts,
387 PortMap: portMap,
388 }); err != nil {
389 if !errors.Is(err, ctxT.Err()) {
Serge Bazanski075465c2021-11-16 15:38:49 +0100390 log.Fatalf("Failed to launch nanoswitch: %v", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200391 }
392 }
393 }()
394
395 // Dial debug service.
396 copts := []grpcretry.CallOption{
397 grpcretry.WithBackoff(grpcretry.BackoffExponential(100 * time.Millisecond)),
398 }
399 debugConn, err := portMap.DialGRPC(node.DebugServicePort, grpc.WithInsecure(),
400 grpc.WithUnaryInterceptor(grpcretry.UnaryClientInterceptor(copts...)))
401 if err != nil {
402 ctxC()
403 return nil, fmt.Errorf("failed to dial debug service: %w", err)
404 }
405
406 // Dial external service.
407 remote := fmt.Sprintf("localhost:%v", portMap[node.CuratorServicePort])
Serge Bazanski399ce552022-03-29 12:52:42 +0200408 initCreds, err := rpc.NewEphemeralCredentials(InsecurePrivateKey, nil)
Serge Bazanski66e58952021-10-05 17:06:56 +0200409 if err != nil {
410 ctxC()
Serge Bazanski399ce552022-03-29 12:52:42 +0200411 return nil, fmt.Errorf("NewEphemeralCredentials: %w", err)
412 }
413 initClient, err := grpc.Dial(remote, grpc.WithTransportCredentials(initCreds))
414 if err != nil {
415 ctxC()
416 return nil, fmt.Errorf("dialing with ephemeral credentials failed: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200417 }
418
419 // Retrieve owner certificate - this can take a while because the node is still
420 // coming up, so do it in a backoff loop.
Serge Bazanskie78a0892021-10-07 17:03:49 +0200421 log.Printf("Cluster: retrieving owner certificate (this can take a few seconds while the first node boots)...")
Serge Bazanski66e58952021-10-05 17:06:56 +0200422 aaa := apb.NewAAAClient(initClient)
423 var cert *tls.Certificate
424 err = backoff.Retry(func() error {
425 cert, err = rpc.RetrieveOwnerCertificate(ctxT, aaa, InsecurePrivateKey)
Serge Bazanski636032e2022-01-26 14:21:33 +0100426 if st, ok := status.FromError(err); ok {
427 if st.Code() == codes.Unavailable {
428 return err
429 }
430 }
431 return backoff.Permanent(err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200432 }, backoff.WithContext(backoff.NewExponentialBackOff(), ctxT))
433 if err != nil {
434 ctxC()
435 return nil, err
436 }
437 log.Printf("Cluster: retrieved owner certificate.")
438
Serge Bazanskie78a0892021-10-07 17:03:49 +0200439 // Build authenticated owner client to new node.
Serge Bazanski399ce552022-03-29 12:52:42 +0200440 authCreds := rpc.NewAuthenticatedCredentials(*cert, nil)
441 authClient, err := grpc.Dial(remote, grpc.WithTransportCredentials(authCreds))
Serge Bazanski66e58952021-10-05 17:06:56 +0200442 if err != nil {
443 ctxC()
Serge Bazanski399ce552022-03-29 12:52:42 +0200444 return nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200445 }
Serge Bazanskie78a0892021-10-07 17:03:49 +0200446 mgmt := apb.NewManagementClient(authClient)
447
448 // Retrieve register ticket to register further nodes.
449 log.Printf("Cluster: retrieving register ticket...")
450 resT, err := mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{})
451 if err != nil {
452 ctxC()
453 return nil, fmt.Errorf("GetRegisterTicket: %w", err)
454 }
455 ticket := resT.Ticket
456 log.Printf("Cluster: retrieved register ticket (%d bytes).", len(ticket))
457
458 // Retrieve cluster info (for directory and ca public key) to register further
459 // nodes.
460 resI, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
461 if err != nil {
462 ctxC()
463 return nil, fmt.Errorf("GetClusterInfo: %w", err)
464 }
465
466 // Now run the rest of the nodes.
467 //
468 // TODO(q3k): parallelize this
469 for i := 1; i < opts.NumNodes; i++ {
470 log.Printf("Cluster: Starting node %d...", i+1)
471 go func(i int) {
472 err := LaunchNode(ctxT, NodeOptions{
473 ConnectToSocket: vmPorts[i],
474 NodeParameters: &apb.NodeParameters{
475 Cluster: &apb.NodeParameters_ClusterRegister_{
476 ClusterRegister: &apb.NodeParameters_ClusterRegister{
477 RegisterTicket: ticket,
478 ClusterDirectory: resI.ClusterDirectory,
479 CaCertificate: resI.CaCertificate,
480 },
481 },
482 },
Serge Bazanski075465c2021-11-16 15:38:49 +0100483 SerialPort: newPrefixedStdio(i),
Serge Bazanskie78a0892021-10-07 17:03:49 +0200484 })
485 done[i] <- err
486 }(i)
487 var newNode *apb.Node
488
489 log.Printf("Cluster: waiting for node %d to appear as NEW...", i)
490 for {
491 nodes, err := getNodes(ctx, mgmt)
492 if err != nil {
493 ctxC()
494 return nil, fmt.Errorf("could not get nodes: %w", err)
495 }
496 for _, n := range nodes {
497 if n.State == cpb.NodeState_NODE_STATE_NEW {
498 newNode = n
499 break
500 }
501 }
502 if newNode != nil {
503 break
504 }
505 time.Sleep(1 * time.Second)
506 }
507 id := identity.NodeID(newNode.Pubkey)
508 log.Printf("Cluster: node %d is %s", i, id)
509
510 log.Printf("Cluster: approving node %d", i)
511 _, err := mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
512 Pubkey: newNode.Pubkey,
513 })
514 if err != nil {
515 ctxC()
516 return nil, fmt.Errorf("ApproveNode(%s): %w", id, err)
517 }
518 log.Printf("Cluster: node %d approved, waiting for it to appear as UP...", i)
519 for {
520 nodes, err := getNodes(ctx, mgmt)
521 if err != nil {
522 ctxC()
523 return nil, fmt.Errorf("could not get nodes: %w", err)
524 }
525 found := false
526 for _, n := range nodes {
527 if !bytes.Equal(n.Pubkey, newNode.Pubkey) {
528 continue
529 }
530 if n.State == cpb.NodeState_NODE_STATE_UP {
531 found = true
532 break
533 }
534 time.Sleep(time.Second)
535 }
536 if found {
537 break
538 }
539 }
540 log.Printf("Cluster: node %d (%s) UP!", i, id)
541 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200542
543 return &Cluster{
544 Debug: apb.NewNodeDebugServiceClient(debugConn),
Serge Bazanskie78a0892021-10-07 17:03:49 +0200545 Management: mgmt,
Serge Bazanski66e58952021-10-05 17:06:56 +0200546 Owner: *cert,
547 Ports: portMap,
548 nodesDone: done,
549
550 ctxC: ctxC,
551 }, nil
552}
553
554// Close cancels the running clusters' context and waits for all virtualized
555// nodes to stop. It returns an error if stopping the nodes failed, or one of
556// the nodes failed to fully start in the first place.
557func (c *Cluster) Close() error {
558 log.Printf("Cluster: stopping...")
559 c.ctxC()
560
561 var errors []error
562 log.Printf("Cluster: waiting for nodes to exit...")
563 for _, c := range c.nodesDone {
564 err := <-c
565 if err != nil {
566 errors = append(errors, err)
567 }
568 }
569 log.Printf("Cluster: done")
570 return multierr.Combine(errors...)
571}