blob: da7862f92a28ca4018b516de0026f55dae7f4efb [file] [log] [blame]
Serge Bazanski66e58952021-10-05 17:06:56 +02001// cluster builds on the launch package and implements launching Metropolis
2// nodes and clusters in a virtualized environment using qemu. It's kept in a
3// separate package as it depends on a Metropolis node image, which might not be
4// required for some use of the launch library.
5package cluster
6
7import (
8 "bytes"
9 "context"
10 "crypto/rand"
11 "crypto/tls"
12 "errors"
13 "fmt"
14 "io"
Serge Bazanski66e58952021-10-05 17:06:56 +020015 "log"
16 "net"
17 "os"
18 "os/exec"
19 "path/filepath"
20 "syscall"
21 "time"
22
23 "github.com/cenkalti/backoff/v4"
Serge Bazanski66e58952021-10-05 17:06:56 +020024 "go.uber.org/multierr"
Serge Bazanskibe742842022-04-04 13:18:50 +020025 "golang.org/x/net/proxy"
Serge Bazanski66e58952021-10-05 17:06:56 +020026 "google.golang.org/grpc"
Serge Bazanski636032e2022-01-26 14:21:33 +010027 "google.golang.org/grpc/codes"
28 "google.golang.org/grpc/status"
Serge Bazanski66e58952021-10-05 17:06:56 +020029 "google.golang.org/protobuf/proto"
30
31 "source.monogon.dev/metropolis/node"
Serge Bazanskibe742842022-04-04 13:18:50 +020032 common "source.monogon.dev/metropolis/node"
Serge Bazanskie78a0892021-10-07 17:03:49 +020033 "source.monogon.dev/metropolis/node/core/identity"
Serge Bazanski66e58952021-10-05 17:06:56 +020034 "source.monogon.dev/metropolis/node/core/rpc"
35 apb "source.monogon.dev/metropolis/proto/api"
Serge Bazanskie78a0892021-10-07 17:03:49 +020036 cpb "source.monogon.dev/metropolis/proto/common"
Serge Bazanski66e58952021-10-05 17:06:56 +020037 "source.monogon.dev/metropolis/test/launch"
38)
39
// NodeOptions contains all options that can be passed to LaunchNode().
type NodeOptions struct {
	// Ports contains the port mapping where to expose the internal ports of the VM to
	// the host. See IdentityPortMap() and ConflictFreePortMap(). Ignored when
	// ConnectToSocket is set.
	Ports launch.PortMap

	// If set to true, reboots are honored. Otherwise all reboots exit the Launch()
	// command. Metropolis nodes generally restarts on almost all errors, so unless you
	// want to test reboot behavior this should be false.
	AllowReboot bool

	// By default the VM is connected to the Host via SLIRP. If ConnectToSocket is set,
	// it is instead connected to the given file descriptor/socket. If this is set, all
	// port maps from the Ports option are ignored. Intended for networking this
	// instance together with others for running more complex network configurations.
	ConnectToSocket *os.File

	// SerialPort is an io.ReadWriter over which you can communicate with the serial
	// port of the machine. It can be set to an existing file descriptor (like
	// os.Stdout/os.Stderr) or any Go structure implementing this interface.
	SerialPort io.ReadWriter

	// NodeParameters is passed into the VM and subsequently used for bootstrapping or
	// registering into a cluster.
	NodeParameters *apb.NodeParameters
}
67
68// NodePorts is the list of ports a fully operational Metropolis node listens on
Serge Bazanski52304a82021-10-29 16:56:18 +020069var NodePorts = []node.Port{
Serge Bazanski66e58952021-10-05 17:06:56 +020070 node.ConsensusPort,
71
72 node.CuratorServicePort,
73 node.DebugServicePort,
74
75 node.KubernetesAPIPort,
Lorenz Bruncc078df2021-12-23 11:51:55 +010076 node.KubernetesAPIWrappedPort,
Serge Bazanski66e58952021-10-05 17:06:56 +020077 node.CuratorServicePort,
78 node.DebuggerPort,
79}
80
81// LaunchNode launches a single Metropolis node instance with the given options.
82// The instance runs mostly paravirtualized but with some emulated hardware
83// similar to how a cloud provider might set up its VMs. The disk is fully
84// writable but is run in snapshot mode meaning that changes are not kept beyond
85// a single invocation.
86func LaunchNode(ctx context.Context, options NodeOptions) error {
87 // Pin temp directory to /tmp until we can use abstract socket namespace in QEMU
88 // (next release after 5.0,
89 // https://github.com/qemu/qemu/commit/776b97d3605ed0fc94443048fdf988c7725e38a9).
90 // swtpm accepts already-open FDs so we can pass in an abstract socket namespace FD
91 // that we open and pass the name of it to QEMU. Not pinning this crashes both
92 // swtpm and qemu because we run into UNIX socket length limitations (for legacy
93 // reasons 108 chars).
Lorenz Brun764a2de2021-11-22 16:26:36 +010094 tempDir, err := os.MkdirTemp("/tmp", "launch*")
Serge Bazanski66e58952021-10-05 17:06:56 +020095 if err != nil {
96 return fmt.Errorf("failed to create temporary directory: %w", err)
97 }
98 defer os.RemoveAll(tempDir)
99
100 // Copy TPM state into a temporary directory since it's being modified by the
101 // emulator
102 tpmTargetDir := filepath.Join(tempDir, "tpm")
103 tpmSrcDir := "metropolis/node/tpm"
104 if err := os.Mkdir(tpmTargetDir, 0755); err != nil {
105 return fmt.Errorf("failed to create TPM state directory: %w", err)
106 }
Lorenz Brun764a2de2021-11-22 16:26:36 +0100107 tpmFiles, err := os.ReadDir(tpmSrcDir)
Serge Bazanski66e58952021-10-05 17:06:56 +0200108 if err != nil {
109 return fmt.Errorf("failed to read TPM directory: %w", err)
110 }
111 for _, file := range tpmFiles {
112 name := file.Name()
113 src := filepath.Join(tpmSrcDir, name)
114 target := filepath.Join(tpmTargetDir, name)
115 if err := copyFile(src, target); err != nil {
116 return fmt.Errorf("failed to copy TPM directory: file %q to %q: %w", src, target, err)
117 }
118 }
119
120 var qemuNetType string
121 var qemuNetConfig launch.QemuValue
122 if options.ConnectToSocket != nil {
123 qemuNetType = "socket"
124 qemuNetConfig = launch.QemuValue{
125 "id": {"net0"},
126 "fd": {"3"},
127 }
128 } else {
129 qemuNetType = "user"
130 qemuNetConfig = launch.QemuValue{
131 "id": {"net0"},
132 "net": {"10.42.0.0/24"},
133 "dhcpstart": {"10.42.0.10"},
134 "hostfwd": options.Ports.ToQemuForwards(),
135 }
136 }
137
138 tpmSocketPath := filepath.Join(tempDir, "tpm-socket")
139
140 mac, err := generateRandomEthernetMAC()
141 if err != nil {
142 return err
143 }
144
145 qemuArgs := []string{"-machine", "q35", "-accel", "kvm", "-nographic", "-nodefaults", "-m", "4096",
146 "-cpu", "host", "-smp", "sockets=1,cpus=1,cores=2,threads=2,maxcpus=4",
147 "-drive", "if=pflash,format=raw,readonly,file=external/edk2/OVMF_CODE.fd",
148 "-drive", "if=pflash,format=raw,snapshot=on,file=external/edk2/OVMF_VARS.fd",
149 "-drive", "if=virtio,format=raw,snapshot=on,cache=unsafe,file=metropolis/node/node.img",
150 "-netdev", qemuNetConfig.ToOption(qemuNetType),
151 "-device", "virtio-net-pci,netdev=net0,mac=" + mac.String(),
152 "-chardev", "socket,id=chrtpm,path=" + tpmSocketPath,
153 "-tpmdev", "emulator,id=tpm0,chardev=chrtpm",
154 "-device", "tpm-tis,tpmdev=tpm0",
155 "-device", "virtio-rng-pci",
156 "-serial", "stdio"}
157
158 if !options.AllowReboot {
159 qemuArgs = append(qemuArgs, "-no-reboot")
160 }
161
162 if options.NodeParameters != nil {
163 parametersPath := filepath.Join(tempDir, "parameters.pb")
164 parametersRaw, err := proto.Marshal(options.NodeParameters)
165 if err != nil {
166 return fmt.Errorf("failed to encode node paraeters: %w", err)
167 }
Lorenz Brun764a2de2021-11-22 16:26:36 +0100168 if err := os.WriteFile(parametersPath, parametersRaw, 0644); err != nil {
Serge Bazanski66e58952021-10-05 17:06:56 +0200169 return fmt.Errorf("failed to write node parameters: %w", err)
170 }
171 qemuArgs = append(qemuArgs, "-fw_cfg", "name=dev.monogon.metropolis/parameters.pb,file="+parametersPath)
172 }
173
174 // Start TPM emulator as a subprocess
175 tpmCtx, tpmCancel := context.WithCancel(ctx)
176 defer tpmCancel()
177
178 tpmEmuCmd := exec.CommandContext(tpmCtx, "swtpm", "socket", "--tpm2", "--tpmstate", "dir="+tpmTargetDir, "--ctrl", "type=unixio,path="+tpmSocketPath)
179 tpmEmuCmd.Stderr = os.Stderr
180 tpmEmuCmd.Stdout = os.Stdout
181
182 err = tpmEmuCmd.Start()
183 if err != nil {
184 return fmt.Errorf("failed to start TPM emulator: %w", err)
185 }
186
187 // Start the main qemu binary
188 systemCmd := exec.CommandContext(ctx, "qemu-system-x86_64", qemuArgs...)
189 if options.ConnectToSocket != nil {
190 systemCmd.ExtraFiles = []*os.File{options.ConnectToSocket}
191 }
192
193 var stdErrBuf bytes.Buffer
194 systemCmd.Stderr = &stdErrBuf
195 systemCmd.Stdout = options.SerialPort
196
197 err = systemCmd.Run()
198
199 // Stop TPM emulator and wait for it to exit to properly reap the child process
200 tpmCancel()
201 log.Print("Node: Waiting for TPM emulator to exit")
202 // Wait returns a SIGKILL error because we just cancelled its context.
203 // We still need to call it to avoid creating zombies.
204 _ = tpmEmuCmd.Wait()
205 log.Print("Node: TPM emulator done")
206
207 var exerr *exec.ExitError
208 if err != nil && errors.As(err, &exerr) {
209 status := exerr.ProcessState.Sys().(syscall.WaitStatus)
210 if status.Signaled() && status.Signal() == syscall.SIGKILL {
211 // Process was killed externally (most likely by our context being canceled).
212 // This is a normal exit for us, so return nil
213 return nil
214 }
215 exerr.Stderr = stdErrBuf.Bytes()
216 newErr := launch.QEMUError(*exerr)
217 return &newErr
218 }
219 return err
220}
221
222func copyFile(src, dst string) error {
223 in, err := os.Open(src)
224 if err != nil {
225 return fmt.Errorf("when opening source: %w", err)
226 }
227 defer in.Close()
228
229 out, err := os.Create(dst)
230 if err != nil {
231 return fmt.Errorf("when creating destination: %w", err)
232 }
233 defer out.Close()
234
235 _, err = io.Copy(out, in)
236 if err != nil {
237 return fmt.Errorf("when copying file: %w", err)
238 }
239 return out.Close()
240}
241
Serge Bazanskie78a0892021-10-07 17:03:49 +0200242// getNodes wraps around Management.GetNodes to return a list of nodes in a
243// cluster.
244func getNodes(ctx context.Context, mgmt apb.ManagementClient) ([]*apb.Node, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200245 var res []*apb.Node
Serge Bazanski636032e2022-01-26 14:21:33 +0100246 bo := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
Serge Bazanski075465c2021-11-16 15:38:49 +0100247 err := backoff.Retry(func() error {
248 res = nil
249 srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
Serge Bazanskie78a0892021-10-07 17:03:49 +0200250 if err != nil {
Serge Bazanski075465c2021-11-16 15:38:49 +0100251 return fmt.Errorf("GetNodes: %w", err)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200252 }
Serge Bazanski075465c2021-11-16 15:38:49 +0100253 for {
254 node, err := srvN.Recv()
255 if err == io.EOF {
256 break
257 }
258 if err != nil {
259 return fmt.Errorf("GetNodes.Recv: %w", err)
260 }
261 res = append(res, node)
262 }
263 return nil
264 }, bo)
265 if err != nil {
266 return nil, err
Serge Bazanskie78a0892021-10-07 17:03:49 +0200267 }
268 return res, nil
269}
270
Serge Bazanski66e58952021-10-05 17:06:56 +0200271// Gets a random EUI-48 Ethernet MAC address
272func generateRandomEthernetMAC() (*net.HardwareAddr, error) {
273 macBuf := make([]byte, 6)
274 _, err := rand.Read(macBuf)
275 if err != nil {
276 return nil, fmt.Errorf("failed to read randomness for MAC: %v", err)
277 }
278
279 // Set U/L bit and clear I/G bit (locally administered individual MAC)
280 // Ref IEEE 802-2014 Section 8.2.2
281 macBuf[0] = (macBuf[0] | 2) & 0xfe
282 mac := net.HardwareAddr(macBuf)
283 return &mac, nil
284}
285
Serge Bazanskibe742842022-04-04 13:18:50 +0200286const SOCKSPort uint16 = 1080
Serge Bazanski66e58952021-10-05 17:06:56 +0200287
Serge Bazanskibe742842022-04-04 13:18:50 +0200288// ClusterPorts contains all ports handled by Nanoswitch.
289var ClusterPorts = []uint16{
290 // Forwarded to the first node.
291 uint16(node.CuratorServicePort),
292 uint16(node.DebugServicePort),
293 uint16(node.KubernetesAPIPort),
294 uint16(node.KubernetesAPIWrappedPort),
295
296 // SOCKS proxy to the switch network
297 SOCKSPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200298}
299
// ClusterOptions contains all options for launching a Metropolis cluster.
type ClusterOptions struct {
	// NumNodes is the number of nodes this cluster should be started with. It
	// must be at least 1.
	NumNodes int
}
305
// Cluster is the running Metropolis cluster launched using the LaunchCluster
// function.
type Cluster struct {
	// Owner is the TLS Certificate of the owner of the test cluster. This can be
	// used to authenticate further clients to the running cluster.
	Owner tls.Certificate
	// Ports is the PortMap used to access the first nodes' services (defined in
	// ClusterPorts) and the SOCKS proxy (at SOCKSPort).
	Ports launch.PortMap

	// Nodes is a map from Node ID to its runtime information.
	Nodes map[string]*NodeInCluster
	// NodeIDs is a list of node IDs that are backing this cluster, in order of
	// creation.
	NodeIDs []string

	// nodesDone is a list of channels populated with the return codes from all the
	// nodes' qemu instances. It's used by Close to ensure all nodes have
	// successfully been stopped.
	nodesDone []chan error
	// ctxC is used by Close to cancel the context under which the nodes are
	// running.
	ctxC context.CancelFunc
	// socksDialer is used by DialNode to establish connections to nodes via the
	// SOCKS server ran by nanoswitch.
	socksDialer proxy.Dialer
}
333
// NodeInCluster represents information about a node that's part of a Cluster.
type NodeInCluster struct {
	// ID of the node, which can be used to dial this node's services via DialNode.
	ID string
	// ManagementAddress is the address of the node on the network ran by
	// nanoswitch. Not reachable from the host unless dialed via DialNode or via
	// the nanoswitch SOCKS proxy (reachable on Cluster.Ports[SOCKSPort]).
	ManagementAddress string
}
343
344// firstConnection performs the initial owner credential escrow with a newly
345// started nanoswitch-backed cluster over SOCKS. It expects the first node to be
346// running at 10.1.0.2, which is always the case with the current nanoswitch
347// implementation.
348//
349// It returns the newly escrowed credentials as well as the firt node's
350// information as NodeInCluster.
351func firstConnection(ctx context.Context, socksDialer proxy.Dialer) (*tls.Certificate, *NodeInCluster, error) {
352 // Dial external service.
353 remote := fmt.Sprintf("10.1.0.2:%s", node.CuratorServicePort.PortString())
354 initCreds, err := rpc.NewEphemeralCredentials(InsecurePrivateKey, nil)
355 if err != nil {
356 return nil, nil, fmt.Errorf("NewEphemeralCredentials: %w", err)
357 }
358 initDialer := func(_ context.Context, addr string) (net.Conn, error) {
359 return socksDialer.Dial("tcp", addr)
360 }
361 initClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(initCreds))
362 if err != nil {
363 return nil, nil, fmt.Errorf("dialing with ephemeral credentials failed: %w", err)
364 }
365 defer initClient.Close()
366
367 // Retrieve owner certificate - this can take a while because the node is still
368 // coming up, so do it in a backoff loop.
369 log.Printf("Cluster: retrieving owner certificate (this can take a few seconds while the first node boots)...")
370 aaa := apb.NewAAAClient(initClient)
371 var cert *tls.Certificate
372 err = backoff.Retry(func() error {
373 cert, err = rpc.RetrieveOwnerCertificate(ctx, aaa, InsecurePrivateKey)
374 if st, ok := status.FromError(err); ok {
375 if st.Code() == codes.Unavailable {
376 return err
377 }
378 }
379 return backoff.Permanent(err)
380 }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
381 if err != nil {
382 return nil, nil, err
383 }
384 log.Printf("Cluster: retrieved owner certificate.")
385
386 // Now connect authenticated and get the node ID.
387 creds := rpc.NewAuthenticatedCredentials(*cert, nil)
388 authClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(creds))
389 if err != nil {
390 return nil, nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
391 }
392 defer authClient.Close()
393 mgmt := apb.NewManagementClient(authClient)
394
395 var node *NodeInCluster
396 err = backoff.Retry(func() error {
397 nodes, err := getNodes(ctx, mgmt)
398 if err != nil {
399 return fmt.Errorf("retrieving nodes failed: %w", err)
400 }
401 if len(nodes) != 1 {
402 return fmt.Errorf("expected one node, got %d", len(nodes))
403 }
404 n := nodes[0]
405 if n.Status == nil || n.Status.ExternalAddress == "" {
406 return fmt.Errorf("node has no status and/or address")
407 }
408 node = &NodeInCluster{
409 ID: identity.NodeID(n.Pubkey),
410 ManagementAddress: n.Status.ExternalAddress,
411 }
412 return nil
413 }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
414 if err != nil {
415 return nil, nil, err
416 }
417
418 return cert, node, nil
Serge Bazanski66e58952021-10-05 17:06:56 +0200419}
420
421// LaunchCluster launches a cluster of Metropolis node VMs together with a
422// Nanoswitch instance to network them all together.
423//
424// The given context will be used to run all qemu instances in the cluster, and
425// canceling the context or calling Close() will terminate them.
426func LaunchCluster(ctx context.Context, opts ClusterOptions) (*Cluster, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200427 if opts.NumNodes <= 0 {
Serge Bazanski66e58952021-10-05 17:06:56 +0200428 return nil, errors.New("refusing to start cluster with zero nodes")
429 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200430
431 ctxT, ctxC := context.WithCancel(ctx)
432
433 // Prepare links between nodes and nanoswitch.
434 var switchPorts []*os.File
435 var vmPorts []*os.File
436 for i := 0; i < opts.NumNodes; i++ {
437 switchPort, vmPort, err := launch.NewSocketPair()
438 if err != nil {
439 ctxC()
440 return nil, fmt.Errorf("failed to get socketpair: %w", err)
441 }
442 switchPorts = append(switchPorts, switchPort)
443 vmPorts = append(vmPorts, vmPort)
444 }
445
Serge Bazanskie78a0892021-10-07 17:03:49 +0200446 // Make a list of channels that will be populated by all running node qemu
447 // processes.
Serge Bazanski66e58952021-10-05 17:06:56 +0200448 done := make([]chan error, opts.NumNodes)
449 for i, _ := range done {
450 done[i] = make(chan error, 1)
451 }
452
453 // Start first node.
Serge Bazanskie78a0892021-10-07 17:03:49 +0200454 log.Printf("Cluster: Starting node %d...", 1)
Serge Bazanski66e58952021-10-05 17:06:56 +0200455 go func() {
456 err := LaunchNode(ctxT, NodeOptions{
457 ConnectToSocket: vmPorts[0],
458 NodeParameters: &apb.NodeParameters{
459 Cluster: &apb.NodeParameters_ClusterBootstrap_{
460 ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
461 OwnerPublicKey: InsecurePublicKey,
462 },
463 },
464 },
Serge Bazanski075465c2021-11-16 15:38:49 +0100465 SerialPort: newPrefixedStdio(0),
Serge Bazanski66e58952021-10-05 17:06:56 +0200466 })
467 done[0] <- err
468 }()
469
Serge Bazanskie78a0892021-10-07 17:03:49 +0200470 // Launch nanoswitch.
Serge Bazanski66e58952021-10-05 17:06:56 +0200471 portMap, err := launch.ConflictFreePortMap(ClusterPorts)
472 if err != nil {
473 ctxC()
474 return nil, fmt.Errorf("failed to allocate ephemeral ports: %w", err)
475 }
476
477 go func() {
478 if err := launch.RunMicroVM(ctxT, &launch.MicroVMOptions{
479 KernelPath: "metropolis/test/ktest/vmlinux",
Lorenz Brunb6a9d3c2022-01-27 18:56:20 +0100480 InitramfsPath: "metropolis/test/nanoswitch/initramfs.cpio.lz4",
Serge Bazanski66e58952021-10-05 17:06:56 +0200481 ExtraNetworkInterfaces: switchPorts,
482 PortMap: portMap,
483 }); err != nil {
484 if !errors.Is(err, ctxT.Err()) {
Serge Bazanski075465c2021-11-16 15:38:49 +0100485 log.Fatalf("Failed to launch nanoswitch: %v", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200486 }
487 }
488 }()
489
Serge Bazanskibe742842022-04-04 13:18:50 +0200490 // Build SOCKS dialer.
491 socksRemote := fmt.Sprintf("localhost:%v", portMap[SOCKSPort])
492 socksDialer, err := proxy.SOCKS5("tcp", socksRemote, nil, proxy.Direct)
Serge Bazanski66e58952021-10-05 17:06:56 +0200493 if err != nil {
494 ctxC()
Serge Bazanskibe742842022-04-04 13:18:50 +0200495 return nil, fmt.Errorf("failed to build SOCKS dialer: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200496 }
497
Serge Bazanskibe742842022-04-04 13:18:50 +0200498 // Retrieve owner credentials and first node.
499 cert, firstNode, err := firstConnection(ctxT, socksDialer)
Serge Bazanski66e58952021-10-05 17:06:56 +0200500 if err != nil {
501 ctxC()
502 return nil, err
503 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200504
Serge Bazanskibe742842022-04-04 13:18:50 +0200505 cluster := &Cluster{
506 Owner: *cert,
507 Ports: portMap,
508 Nodes: map[string]*NodeInCluster{
509 firstNode.ID: firstNode,
510 },
511 NodeIDs: []string{
512 firstNode.ID,
513 },
514
515 nodesDone: done,
516 socksDialer: socksDialer,
517
518 ctxC: ctxC,
519 }
520
521 // Now start the rest of the nodes and register them into the cluster.
522
523 // Build authenticated owner client to first node.
Serge Bazanski399ce552022-03-29 12:52:42 +0200524 authCreds := rpc.NewAuthenticatedCredentials(*cert, nil)
Serge Bazanskibe742842022-04-04 13:18:50 +0200525 remote := net.JoinHostPort(cluster.NodeIDs[0], common.CuratorServicePort.PortString())
526 authClient, err := grpc.Dial(remote, grpc.WithTransportCredentials(authCreds), grpc.WithContextDialer(cluster.DialNode))
Serge Bazanski66e58952021-10-05 17:06:56 +0200527 if err != nil {
528 ctxC()
Serge Bazanski399ce552022-03-29 12:52:42 +0200529 return nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200530 }
Serge Bazanskibe742842022-04-04 13:18:50 +0200531 defer authClient.Close()
Serge Bazanskie78a0892021-10-07 17:03:49 +0200532 mgmt := apb.NewManagementClient(authClient)
533
534 // Retrieve register ticket to register further nodes.
535 log.Printf("Cluster: retrieving register ticket...")
536 resT, err := mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{})
537 if err != nil {
538 ctxC()
539 return nil, fmt.Errorf("GetRegisterTicket: %w", err)
540 }
541 ticket := resT.Ticket
542 log.Printf("Cluster: retrieved register ticket (%d bytes).", len(ticket))
543
544 // Retrieve cluster info (for directory and ca public key) to register further
545 // nodes.
546 resI, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
547 if err != nil {
548 ctxC()
549 return nil, fmt.Errorf("GetClusterInfo: %w", err)
550 }
551
Serge Bazanskie78a0892021-10-07 17:03:49 +0200552 // TODO(q3k): parallelize this
553 for i := 1; i < opts.NumNodes; i++ {
554 log.Printf("Cluster: Starting node %d...", i+1)
555 go func(i int) {
556 err := LaunchNode(ctxT, NodeOptions{
557 ConnectToSocket: vmPorts[i],
558 NodeParameters: &apb.NodeParameters{
559 Cluster: &apb.NodeParameters_ClusterRegister_{
560 ClusterRegister: &apb.NodeParameters_ClusterRegister{
561 RegisterTicket: ticket,
562 ClusterDirectory: resI.ClusterDirectory,
563 CaCertificate: resI.CaCertificate,
564 },
565 },
566 },
Serge Bazanski075465c2021-11-16 15:38:49 +0100567 SerialPort: newPrefixedStdio(i),
Serge Bazanskie78a0892021-10-07 17:03:49 +0200568 })
569 done[i] <- err
570 }(i)
571 var newNode *apb.Node
572
573 log.Printf("Cluster: waiting for node %d to appear as NEW...", i)
574 for {
575 nodes, err := getNodes(ctx, mgmt)
576 if err != nil {
577 ctxC()
578 return nil, fmt.Errorf("could not get nodes: %w", err)
579 }
580 for _, n := range nodes {
581 if n.State == cpb.NodeState_NODE_STATE_NEW {
582 newNode = n
583 break
584 }
585 }
586 if newNode != nil {
587 break
588 }
589 time.Sleep(1 * time.Second)
590 }
591 id := identity.NodeID(newNode.Pubkey)
592 log.Printf("Cluster: node %d is %s", i, id)
593
594 log.Printf("Cluster: approving node %d", i)
595 _, err := mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
596 Pubkey: newNode.Pubkey,
597 })
598 if err != nil {
599 ctxC()
600 return nil, fmt.Errorf("ApproveNode(%s): %w", id, err)
601 }
Serge Bazanskibe742842022-04-04 13:18:50 +0200602 log.Printf("Cluster: node %d approved, waiting for it to appear as UP and with a network address...", i)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200603 for {
604 nodes, err := getNodes(ctx, mgmt)
605 if err != nil {
606 ctxC()
607 return nil, fmt.Errorf("could not get nodes: %w", err)
608 }
609 found := false
610 for _, n := range nodes {
611 if !bytes.Equal(n.Pubkey, newNode.Pubkey) {
612 continue
613 }
Serge Bazanskibe742842022-04-04 13:18:50 +0200614 if n.Status == nil || n.Status.ExternalAddress == "" {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200615 break
616 }
Serge Bazanskibe742842022-04-04 13:18:50 +0200617 if n.State != cpb.NodeState_NODE_STATE_UP {
618 break
619 }
620 found = true
621 cluster.Nodes[identity.NodeID(n.Pubkey)] = &NodeInCluster{
622 ID: identity.NodeID(n.Pubkey),
623 ManagementAddress: n.Status.ExternalAddress,
624 }
625 cluster.NodeIDs = append(cluster.NodeIDs, identity.NodeID(n.Pubkey))
626 break
Serge Bazanskie78a0892021-10-07 17:03:49 +0200627 }
628 if found {
629 break
630 }
Serge Bazanskibe742842022-04-04 13:18:50 +0200631 time.Sleep(time.Second)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200632 }
633 log.Printf("Cluster: node %d (%s) UP!", i, id)
634 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200635
Serge Bazanskibe742842022-04-04 13:18:50 +0200636 log.Printf("Cluster: all nodes up:")
637 for _, node := range cluster.Nodes {
638 log.Printf("Cluster: - %s at %s", node.ID, node.ManagementAddress)
639 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200640
Serge Bazanskibe742842022-04-04 13:18:50 +0200641 return cluster, nil
Serge Bazanski66e58952021-10-05 17:06:56 +0200642}
643
Serge Bazanskibe742842022-04-04 13:18:50 +0200644// Close cancels the running cluster's context and waits for all virtualized
Serge Bazanski66e58952021-10-05 17:06:56 +0200645// nodes to stop. It returns an error if stopping the nodes failed, or one of
646// the nodes failed to fully start in the first place.
647func (c *Cluster) Close() error {
648 log.Printf("Cluster: stopping...")
649 c.ctxC()
650
651 var errors []error
652 log.Printf("Cluster: waiting for nodes to exit...")
653 for _, c := range c.nodesDone {
654 err := <-c
655 if err != nil {
656 errors = append(errors, err)
657 }
658 }
659 log.Printf("Cluster: done")
660 return multierr.Combine(errors...)
661}
Serge Bazanskibe742842022-04-04 13:18:50 +0200662
663// DialNode is a grpc.WithContextDialer compatible dialer which dials nodes by
664// their ID. This is performed by connecting to the cluster nanoswitch via its
665// SOCKS proxy, and using the cluster node list for name resolution.
666//
667// For example:
668//
669// grpc.Dial("metropolis-deadbeef:1234", grpc.WithContextDialer(c.DialNode))
670//
671func (c *Cluster) DialNode(_ context.Context, addr string) (net.Conn, error) {
672 host, port, err := net.SplitHostPort(addr)
673 if err != nil {
674 return nil, fmt.Errorf("invalid host:port: %w", err)
675 }
676 // Already an IP address?
677 if net.ParseIP(host) != nil {
678 return c.socksDialer.Dial("tcp", addr)
679 }
680
681 // Otherwise, expect a node name.
682 node, ok := c.Nodes[host]
683 if !ok {
684 return nil, fmt.Errorf("unknown node %q", host)
685 }
686 addr = net.JoinHostPort(node.ManagementAddress, port)
687 return c.socksDialer.Dial("tcp", addr)
688}