blob: 9b32f977908dd8f9ea65630256a82fd6d16c2281 [file] [log] [blame]
Serge Bazanski66e58952021-10-05 17:06:56 +02001// cluster builds on the launch package and implements launching Metropolis
2// nodes and clusters in a virtualized environment using qemu. It's kept in a
3// separate package as it depends on a Metropolis node image, which might not be
4// required for some use of the launch library.
5package cluster
6
7import (
8 "bytes"
9 "context"
10 "crypto/rand"
11 "crypto/tls"
12 "errors"
13 "fmt"
14 "io"
Serge Bazanski66e58952021-10-05 17:06:56 +020015 "log"
16 "net"
17 "os"
18 "os/exec"
19 "path/filepath"
20 "syscall"
21 "time"
22
23 "github.com/cenkalti/backoff/v4"
24 grpcretry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
25 "go.uber.org/multierr"
26 "google.golang.org/grpc"
27 "google.golang.org/protobuf/proto"
28
29 "source.monogon.dev/metropolis/node"
Serge Bazanskie78a0892021-10-07 17:03:49 +020030 "source.monogon.dev/metropolis/node/core/identity"
Serge Bazanski66e58952021-10-05 17:06:56 +020031 "source.monogon.dev/metropolis/node/core/rpc"
32 apb "source.monogon.dev/metropolis/proto/api"
Serge Bazanskie78a0892021-10-07 17:03:49 +020033 cpb "source.monogon.dev/metropolis/proto/common"
Serge Bazanski66e58952021-10-05 17:06:56 +020034 "source.monogon.dev/metropolis/test/launch"
35)
36
// NodeOptions contains all options that can be passed to Launch()
type NodeOptions struct {
	// Ports contains the port mapping where to expose the internal ports of the VM to
	// the host. See IdentityPortMap() and ConflictFreePortMap(). Ignored when
	// ConnectToSocket is set.
	Ports launch.PortMap

	// If set to true, reboots are honored. Otherwise all reboots exit the Launch()
	// command. Metropolis nodes generally restarts on almost all errors, so unless you
	// want to test reboot behavior this should be false.
	AllowReboot bool

	// By default the VM is connected to the Host via SLIRP. If ConnectToSocket is set,
	// it is instead connected to the given file descriptor/socket. If this is set, all
	// port maps from the Ports option are ignored. Intended for networking this
	// instance together with others for running more complex network configurations.
	ConnectToSocket *os.File

	// SerialPort is a io.ReadWriter over which you can communicate with the serial
	// port of the machine It can be set to an existing file descriptor (like
	// os.Stdout/os.Stderr) or any Go structure implementing this interface.
	SerialPort io.ReadWriter

	// NodeParameters is passed into the VM and subsequently used for bootstrapping or
	// registering into a cluster.
	NodeParameters *apb.NodeParameters
}
64
65// NodePorts is the list of ports a fully operational Metropolis node listens on
Serge Bazanski52304a82021-10-29 16:56:18 +020066var NodePorts = []node.Port{
Serge Bazanski66e58952021-10-05 17:06:56 +020067 node.ConsensusPort,
68
69 node.CuratorServicePort,
70 node.DebugServicePort,
71
72 node.KubernetesAPIPort,
Lorenz Bruncc078df2021-12-23 11:51:55 +010073 node.KubernetesAPIWrappedPort,
Serge Bazanski66e58952021-10-05 17:06:56 +020074 node.CuratorServicePort,
75 node.DebuggerPort,
76}
77
78// LaunchNode launches a single Metropolis node instance with the given options.
79// The instance runs mostly paravirtualized but with some emulated hardware
80// similar to how a cloud provider might set up its VMs. The disk is fully
81// writable but is run in snapshot mode meaning that changes are not kept beyond
82// a single invocation.
83func LaunchNode(ctx context.Context, options NodeOptions) error {
84 // Pin temp directory to /tmp until we can use abstract socket namespace in QEMU
85 // (next release after 5.0,
86 // https://github.com/qemu/qemu/commit/776b97d3605ed0fc94443048fdf988c7725e38a9).
87 // swtpm accepts already-open FDs so we can pass in an abstract socket namespace FD
88 // that we open and pass the name of it to QEMU. Not pinning this crashes both
89 // swtpm and qemu because we run into UNIX socket length limitations (for legacy
90 // reasons 108 chars).
Lorenz Brun764a2de2021-11-22 16:26:36 +010091 tempDir, err := os.MkdirTemp("/tmp", "launch*")
Serge Bazanski66e58952021-10-05 17:06:56 +020092 if err != nil {
93 return fmt.Errorf("failed to create temporary directory: %w", err)
94 }
95 defer os.RemoveAll(tempDir)
96
97 // Copy TPM state into a temporary directory since it's being modified by the
98 // emulator
99 tpmTargetDir := filepath.Join(tempDir, "tpm")
100 tpmSrcDir := "metropolis/node/tpm"
101 if err := os.Mkdir(tpmTargetDir, 0755); err != nil {
102 return fmt.Errorf("failed to create TPM state directory: %w", err)
103 }
Lorenz Brun764a2de2021-11-22 16:26:36 +0100104 tpmFiles, err := os.ReadDir(tpmSrcDir)
Serge Bazanski66e58952021-10-05 17:06:56 +0200105 if err != nil {
106 return fmt.Errorf("failed to read TPM directory: %w", err)
107 }
108 for _, file := range tpmFiles {
109 name := file.Name()
110 src := filepath.Join(tpmSrcDir, name)
111 target := filepath.Join(tpmTargetDir, name)
112 if err := copyFile(src, target); err != nil {
113 return fmt.Errorf("failed to copy TPM directory: file %q to %q: %w", src, target, err)
114 }
115 }
116
117 var qemuNetType string
118 var qemuNetConfig launch.QemuValue
119 if options.ConnectToSocket != nil {
120 qemuNetType = "socket"
121 qemuNetConfig = launch.QemuValue{
122 "id": {"net0"},
123 "fd": {"3"},
124 }
125 } else {
126 qemuNetType = "user"
127 qemuNetConfig = launch.QemuValue{
128 "id": {"net0"},
129 "net": {"10.42.0.0/24"},
130 "dhcpstart": {"10.42.0.10"},
131 "hostfwd": options.Ports.ToQemuForwards(),
132 }
133 }
134
135 tpmSocketPath := filepath.Join(tempDir, "tpm-socket")
136
137 mac, err := generateRandomEthernetMAC()
138 if err != nil {
139 return err
140 }
141
142 qemuArgs := []string{"-machine", "q35", "-accel", "kvm", "-nographic", "-nodefaults", "-m", "4096",
143 "-cpu", "host", "-smp", "sockets=1,cpus=1,cores=2,threads=2,maxcpus=4",
144 "-drive", "if=pflash,format=raw,readonly,file=external/edk2/OVMF_CODE.fd",
145 "-drive", "if=pflash,format=raw,snapshot=on,file=external/edk2/OVMF_VARS.fd",
146 "-drive", "if=virtio,format=raw,snapshot=on,cache=unsafe,file=metropolis/node/node.img",
147 "-netdev", qemuNetConfig.ToOption(qemuNetType),
148 "-device", "virtio-net-pci,netdev=net0,mac=" + mac.String(),
149 "-chardev", "socket,id=chrtpm,path=" + tpmSocketPath,
150 "-tpmdev", "emulator,id=tpm0,chardev=chrtpm",
151 "-device", "tpm-tis,tpmdev=tpm0",
152 "-device", "virtio-rng-pci",
153 "-serial", "stdio"}
154
155 if !options.AllowReboot {
156 qemuArgs = append(qemuArgs, "-no-reboot")
157 }
158
159 if options.NodeParameters != nil {
160 parametersPath := filepath.Join(tempDir, "parameters.pb")
161 parametersRaw, err := proto.Marshal(options.NodeParameters)
162 if err != nil {
163 return fmt.Errorf("failed to encode node paraeters: %w", err)
164 }
Lorenz Brun764a2de2021-11-22 16:26:36 +0100165 if err := os.WriteFile(parametersPath, parametersRaw, 0644); err != nil {
Serge Bazanski66e58952021-10-05 17:06:56 +0200166 return fmt.Errorf("failed to write node parameters: %w", err)
167 }
168 qemuArgs = append(qemuArgs, "-fw_cfg", "name=dev.monogon.metropolis/parameters.pb,file="+parametersPath)
169 }
170
171 // Start TPM emulator as a subprocess
172 tpmCtx, tpmCancel := context.WithCancel(ctx)
173 defer tpmCancel()
174
175 tpmEmuCmd := exec.CommandContext(tpmCtx, "swtpm", "socket", "--tpm2", "--tpmstate", "dir="+tpmTargetDir, "--ctrl", "type=unixio,path="+tpmSocketPath)
176 tpmEmuCmd.Stderr = os.Stderr
177 tpmEmuCmd.Stdout = os.Stdout
178
179 err = tpmEmuCmd.Start()
180 if err != nil {
181 return fmt.Errorf("failed to start TPM emulator: %w", err)
182 }
183
184 // Start the main qemu binary
185 systemCmd := exec.CommandContext(ctx, "qemu-system-x86_64", qemuArgs...)
186 if options.ConnectToSocket != nil {
187 systemCmd.ExtraFiles = []*os.File{options.ConnectToSocket}
188 }
189
190 var stdErrBuf bytes.Buffer
191 systemCmd.Stderr = &stdErrBuf
192 systemCmd.Stdout = options.SerialPort
193
194 err = systemCmd.Run()
195
196 // Stop TPM emulator and wait for it to exit to properly reap the child process
197 tpmCancel()
198 log.Print("Node: Waiting for TPM emulator to exit")
199 // Wait returns a SIGKILL error because we just cancelled its context.
200 // We still need to call it to avoid creating zombies.
201 _ = tpmEmuCmd.Wait()
202 log.Print("Node: TPM emulator done")
203
204 var exerr *exec.ExitError
205 if err != nil && errors.As(err, &exerr) {
206 status := exerr.ProcessState.Sys().(syscall.WaitStatus)
207 if status.Signaled() && status.Signal() == syscall.SIGKILL {
208 // Process was killed externally (most likely by our context being canceled).
209 // This is a normal exit for us, so return nil
210 return nil
211 }
212 exerr.Stderr = stdErrBuf.Bytes()
213 newErr := launch.QEMUError(*exerr)
214 return &newErr
215 }
216 return err
217}
218
219func copyFile(src, dst string) error {
220 in, err := os.Open(src)
221 if err != nil {
222 return fmt.Errorf("when opening source: %w", err)
223 }
224 defer in.Close()
225
226 out, err := os.Create(dst)
227 if err != nil {
228 return fmt.Errorf("when creating destination: %w", err)
229 }
230 defer out.Close()
231
232 _, err = io.Copy(out, in)
233 if err != nil {
234 return fmt.Errorf("when copying file: %w", err)
235 }
236 return out.Close()
237}
238
Serge Bazanskie78a0892021-10-07 17:03:49 +0200239// getNodes wraps around Management.GetNodes to return a list of nodes in a
240// cluster.
241func getNodes(ctx context.Context, mgmt apb.ManagementClient) ([]*apb.Node, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200242 var res []*apb.Node
Serge Bazanski075465c2021-11-16 15:38:49 +0100243 bo := backoff.NewExponentialBackOff()
244 err := backoff.Retry(func() error {
245 res = nil
246 srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
Serge Bazanskie78a0892021-10-07 17:03:49 +0200247 if err != nil {
Serge Bazanski075465c2021-11-16 15:38:49 +0100248 return fmt.Errorf("GetNodes: %w", err)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200249 }
Serge Bazanski075465c2021-11-16 15:38:49 +0100250 for {
251 node, err := srvN.Recv()
252 if err == io.EOF {
253 break
254 }
255 if err != nil {
256 return fmt.Errorf("GetNodes.Recv: %w", err)
257 }
258 res = append(res, node)
259 }
260 return nil
261 }, bo)
262 if err != nil {
263 return nil, err
Serge Bazanskie78a0892021-10-07 17:03:49 +0200264 }
265 return res, nil
266}
267
// generateRandomEthernetMAC generates a random EUI-48 Ethernet MAC address
// marked as locally administered and individual (unicast), making it safe to
// assign to a virtual NIC.
func generateRandomEthernetMAC() (*net.HardwareAddr, error) {
	macBuf := make([]byte, 6)
	_, err := rand.Read(macBuf)
	if err != nil {
		// Wrap with %w so callers can inspect the underlying error.
		return nil, fmt.Errorf("failed to read randomness for MAC: %w", err)
	}

	// Set U/L bit and clear I/G bit (locally administered individual MAC)
	// Ref IEEE 802-2014 Section 8.2.2
	macBuf[0] = (macBuf[0] | 2) & 0xfe
	mac := net.HardwareAddr(macBuf)
	return &mac, nil
}
282
// ClusterPorts contains all ports forwarded by Nanoswitch to the first VM in a
// launched Metropolis cluster.
var ClusterPorts = []node.Port{
	// Dialed by LaunchCluster to control and inspect the running cluster.
	node.CuratorServicePort,
	node.DebugServicePort,

	node.KubernetesAPIPort,
	node.KubernetesAPIWrappedPort,
}
292
// ClusterOptions contains all options for launching a Metropolis cluster.
type ClusterOptions struct {
	// The number of nodes this cluster should be started with. Must be at
	// least 1; LaunchCluster rejects non-positive values.
	NumNodes int
}
298
// Cluster is the running Metropolis cluster launched using the LaunchCluster
// function.
type Cluster struct {
	// Debug is the NodeDebugService gRPC service, allowing for unauthenticated
	// debug access to the cluster.
	Debug apb.NodeDebugServiceClient
	// Management is the Management gRPC service, authenticated as the owner of the
	// cluster.
	Management apb.ManagementClient
	// Owner is the TLS Certificate of the owner of the test cluster. This can be
	// used to authenticate further clients to the running cluster.
	Owner tls.Certificate
	// Ports is the PortMap used to access the first nodes' services (defined in
	// ClusterPorts).
	Ports launch.PortMap

	// nodesDone is a list of channels populated with the return codes from all the
	// nodes' qemu instances. It's used by Close to ensure all nodes have
	// succesfully been stopped.
	nodesDone []chan error
	// ctxC is used by Close to cancel the context under which the nodes are
	// running.
	ctxC context.CancelFunc
}
323
324// LaunchCluster launches a cluster of Metropolis node VMs together with a
325// Nanoswitch instance to network them all together.
326//
327// The given context will be used to run all qemu instances in the cluster, and
328// canceling the context or calling Close() will terminate them.
329func LaunchCluster(ctx context.Context, opts ClusterOptions) (*Cluster, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200330 if opts.NumNodes <= 0 {
Serge Bazanski66e58952021-10-05 17:06:56 +0200331 return nil, errors.New("refusing to start cluster with zero nodes")
332 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200333
334 ctxT, ctxC := context.WithCancel(ctx)
335
336 // Prepare links between nodes and nanoswitch.
337 var switchPorts []*os.File
338 var vmPorts []*os.File
339 for i := 0; i < opts.NumNodes; i++ {
340 switchPort, vmPort, err := launch.NewSocketPair()
341 if err != nil {
342 ctxC()
343 return nil, fmt.Errorf("failed to get socketpair: %w", err)
344 }
345 switchPorts = append(switchPorts, switchPort)
346 vmPorts = append(vmPorts, vmPort)
347 }
348
Serge Bazanskie78a0892021-10-07 17:03:49 +0200349 // Make a list of channels that will be populated by all running node qemu
350 // processes.
Serge Bazanski66e58952021-10-05 17:06:56 +0200351 done := make([]chan error, opts.NumNodes)
352 for i, _ := range done {
353 done[i] = make(chan error, 1)
354 }
355
356 // Start first node.
Serge Bazanskie78a0892021-10-07 17:03:49 +0200357 log.Printf("Cluster: Starting node %d...", 1)
Serge Bazanski66e58952021-10-05 17:06:56 +0200358 go func() {
359 err := LaunchNode(ctxT, NodeOptions{
360 ConnectToSocket: vmPorts[0],
361 NodeParameters: &apb.NodeParameters{
362 Cluster: &apb.NodeParameters_ClusterBootstrap_{
363 ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
364 OwnerPublicKey: InsecurePublicKey,
365 },
366 },
367 },
Serge Bazanski075465c2021-11-16 15:38:49 +0100368 SerialPort: newPrefixedStdio(0),
Serge Bazanski66e58952021-10-05 17:06:56 +0200369 })
370 done[0] <- err
371 }()
372
Serge Bazanskie78a0892021-10-07 17:03:49 +0200373 // Launch nanoswitch.
Serge Bazanski66e58952021-10-05 17:06:56 +0200374 portMap, err := launch.ConflictFreePortMap(ClusterPorts)
375 if err != nil {
376 ctxC()
377 return nil, fmt.Errorf("failed to allocate ephemeral ports: %w", err)
378 }
379
380 go func() {
381 if err := launch.RunMicroVM(ctxT, &launch.MicroVMOptions{
382 KernelPath: "metropolis/test/ktest/vmlinux",
383 InitramfsPath: "metropolis/test/nanoswitch/initramfs.lz4",
384 ExtraNetworkInterfaces: switchPorts,
385 PortMap: portMap,
386 }); err != nil {
387 if !errors.Is(err, ctxT.Err()) {
Serge Bazanski075465c2021-11-16 15:38:49 +0100388 log.Fatalf("Failed to launch nanoswitch: %v", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200389 }
390 }
391 }()
392
393 // Dial debug service.
394 copts := []grpcretry.CallOption{
395 grpcretry.WithBackoff(grpcretry.BackoffExponential(100 * time.Millisecond)),
396 }
397 debugConn, err := portMap.DialGRPC(node.DebugServicePort, grpc.WithInsecure(),
398 grpc.WithUnaryInterceptor(grpcretry.UnaryClientInterceptor(copts...)))
399 if err != nil {
400 ctxC()
401 return nil, fmt.Errorf("failed to dial debug service: %w", err)
402 }
403
404 // Dial external service.
405 remote := fmt.Sprintf("localhost:%v", portMap[node.CuratorServicePort])
406 initClient, err := rpc.NewEphemeralClient(remote, InsecurePrivateKey, nil)
407 if err != nil {
408 ctxC()
409 return nil, fmt.Errorf("NewInitialClient: %w", err)
410 }
411
412 // Retrieve owner certificate - this can take a while because the node is still
413 // coming up, so do it in a backoff loop.
Serge Bazanskie78a0892021-10-07 17:03:49 +0200414 log.Printf("Cluster: retrieving owner certificate (this can take a few seconds while the first node boots)...")
Serge Bazanski66e58952021-10-05 17:06:56 +0200415 aaa := apb.NewAAAClient(initClient)
416 var cert *tls.Certificate
417 err = backoff.Retry(func() error {
418 cert, err = rpc.RetrieveOwnerCertificate(ctxT, aaa, InsecurePrivateKey)
419 return err
420 }, backoff.WithContext(backoff.NewExponentialBackOff(), ctxT))
421 if err != nil {
422 ctxC()
423 return nil, err
424 }
425 log.Printf("Cluster: retrieved owner certificate.")
426
Serge Bazanskie78a0892021-10-07 17:03:49 +0200427 // Build authenticated owner client to new node.
Serge Bazanski66e58952021-10-05 17:06:56 +0200428 authClient, err := rpc.NewAuthenticatedClient(remote, *cert, nil)
429 if err != nil {
430 ctxC()
431 return nil, fmt.Errorf("NewAuthenticatedClient: %w", err)
432 }
Serge Bazanskie78a0892021-10-07 17:03:49 +0200433 mgmt := apb.NewManagementClient(authClient)
434
435 // Retrieve register ticket to register further nodes.
436 log.Printf("Cluster: retrieving register ticket...")
437 resT, err := mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{})
438 if err != nil {
439 ctxC()
440 return nil, fmt.Errorf("GetRegisterTicket: %w", err)
441 }
442 ticket := resT.Ticket
443 log.Printf("Cluster: retrieved register ticket (%d bytes).", len(ticket))
444
445 // Retrieve cluster info (for directory and ca public key) to register further
446 // nodes.
447 resI, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
448 if err != nil {
449 ctxC()
450 return nil, fmt.Errorf("GetClusterInfo: %w", err)
451 }
452
453 // Now run the rest of the nodes.
454 //
455 // TODO(q3k): parallelize this
456 for i := 1; i < opts.NumNodes; i++ {
457 log.Printf("Cluster: Starting node %d...", i+1)
458 go func(i int) {
459 err := LaunchNode(ctxT, NodeOptions{
460 ConnectToSocket: vmPorts[i],
461 NodeParameters: &apb.NodeParameters{
462 Cluster: &apb.NodeParameters_ClusterRegister_{
463 ClusterRegister: &apb.NodeParameters_ClusterRegister{
464 RegisterTicket: ticket,
465 ClusterDirectory: resI.ClusterDirectory,
466 CaCertificate: resI.CaCertificate,
467 },
468 },
469 },
Serge Bazanski075465c2021-11-16 15:38:49 +0100470 SerialPort: newPrefixedStdio(i),
Serge Bazanskie78a0892021-10-07 17:03:49 +0200471 })
472 done[i] <- err
473 }(i)
474 var newNode *apb.Node
475
476 log.Printf("Cluster: waiting for node %d to appear as NEW...", i)
477 for {
478 nodes, err := getNodes(ctx, mgmt)
479 if err != nil {
480 ctxC()
481 return nil, fmt.Errorf("could not get nodes: %w", err)
482 }
483 for _, n := range nodes {
484 if n.State == cpb.NodeState_NODE_STATE_NEW {
485 newNode = n
486 break
487 }
488 }
489 if newNode != nil {
490 break
491 }
492 time.Sleep(1 * time.Second)
493 }
494 id := identity.NodeID(newNode.Pubkey)
495 log.Printf("Cluster: node %d is %s", i, id)
496
497 log.Printf("Cluster: approving node %d", i)
498 _, err := mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
499 Pubkey: newNode.Pubkey,
500 })
501 if err != nil {
502 ctxC()
503 return nil, fmt.Errorf("ApproveNode(%s): %w", id, err)
504 }
505 log.Printf("Cluster: node %d approved, waiting for it to appear as UP...", i)
506 for {
507 nodes, err := getNodes(ctx, mgmt)
508 if err != nil {
509 ctxC()
510 return nil, fmt.Errorf("could not get nodes: %w", err)
511 }
512 found := false
513 for _, n := range nodes {
514 if !bytes.Equal(n.Pubkey, newNode.Pubkey) {
515 continue
516 }
517 if n.State == cpb.NodeState_NODE_STATE_UP {
518 found = true
519 break
520 }
521 time.Sleep(time.Second)
522 }
523 if found {
524 break
525 }
526 }
527 log.Printf("Cluster: node %d (%s) UP!", i, id)
528 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200529
530 return &Cluster{
531 Debug: apb.NewNodeDebugServiceClient(debugConn),
Serge Bazanskie78a0892021-10-07 17:03:49 +0200532 Management: mgmt,
Serge Bazanski66e58952021-10-05 17:06:56 +0200533 Owner: *cert,
534 Ports: portMap,
535 nodesDone: done,
536
537 ctxC: ctxC,
538 }, nil
539}
540
541// Close cancels the running clusters' context and waits for all virtualized
542// nodes to stop. It returns an error if stopping the nodes failed, or one of
543// the nodes failed to fully start in the first place.
544func (c *Cluster) Close() error {
545 log.Printf("Cluster: stopping...")
546 c.ctxC()
547
548 var errors []error
549 log.Printf("Cluster: waiting for nodes to exit...")
550 for _, c := range c.nodesDone {
551 err := <-c
552 if err != nil {
553 errors = append(errors, err)
554 }
555 }
556 log.Printf("Cluster: done")
557 return multierr.Combine(errors...)
558}