// Package cluster builds on the launch package and implements launching
// Metropolis nodes and clusters in a virtualized environment using qemu. It's
// kept in a separate package as it depends on a Metropolis node image, which
// might not be required for some uses of the launch library.
package cluster

import (
	"bytes"
	"context"
	"crypto/rand"
	"crypto/tls"
	"errors"
	"fmt"
	"io"
	"log"
	"net"
	"os"
	"os/exec"
	"path/filepath"
	"syscall"
	"time"

	"github.com/cenkalti/backoff/v4"
	grpcretry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
	"go.uber.org/multierr"
	"google.golang.org/grpc"
	"google.golang.org/protobuf/proto"

	"source.monogon.dev/metropolis/node"
	"source.monogon.dev/metropolis/node/core/identity"
	"source.monogon.dev/metropolis/node/core/rpc"
	apb "source.monogon.dev/metropolis/proto/api"
	cpb "source.monogon.dev/metropolis/proto/common"
	"source.monogon.dev/metropolis/test/launch"
)

// NodeOptions contains all options that can be passed to LaunchNode(). A
// usage sketch follows the type definition.
type NodeOptions struct {
	// Ports contains the port mapping for exposing the internal ports of the
	// VM on the host. See IdentityPortMap() and ConflictFreePortMap(). Ignored
	// when ConnectToSocket is set.
	Ports launch.PortMap

	// If set to true, reboots are honored. Otherwise all reboots exit the
	// LaunchNode() call. Metropolis nodes generally restart on almost all
	// errors, so unless you want to test reboot behavior this should be false.
	AllowReboot bool

	// By default the VM is connected to the host via SLIRP. If ConnectToSocket
	// is set, it is instead connected to the given file descriptor/socket. If
	// this is set, all port maps from the Ports option are ignored. Intended
	// for networking this instance together with others for running more
	// complex network configurations.
	ConnectToSocket *os.File

	// SerialPort is an io.ReadWriter over which you can communicate with the
	// serial port of the machine. It can be set to an existing file descriptor
	// (like os.Stdout/os.Stderr) or any Go structure implementing this
	// interface.
	SerialPort io.ReadWriter

	// NodeParameters is passed into the VM and subsequently used for
	// bootstrapping or registering into a cluster.
	NodeParameters *apb.NodeParameters
}

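// A minimal usage sketch for LaunchNode (illustrative only; it assumes the
// node image and TPM state are present in the runfiles, as LaunchNode
// expects, and reuses ConflictFreePortMap as seen in LaunchCluster below):
//
//	ctx, cancel := context.WithCancel(context.Background())
//	defer cancel()
//	portMap, err := launch.ConflictFreePortMap(NodePorts)
//	if err != nil {
//		log.Fatalf("could not allocate ports: %v", err)
//	}
//	if err := LaunchNode(ctx, NodeOptions{
//		Ports:      portMap,
//		SerialPort: os.Stdout,
//	}); err != nil {
//		log.Fatalf("LaunchNode: %v", err)
//	}
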
// NodePorts is the list of ports a fully operational Metropolis node listens
// on.
var NodePorts = []node.Port{
	node.ConsensusPort,

	node.CuratorServicePort,
	node.DebugServicePort,

	node.KubernetesAPIPort,
	node.DebuggerPort,
}

// LaunchNode launches a single Metropolis node instance with the given
// options. The instance runs mostly paravirtualized but with some emulated
// hardware similar to how a cloud provider might set up its VMs. The disk is
// fully writable but is run in snapshot mode, meaning that changes are not
// kept beyond a single invocation.
func LaunchNode(ctx context.Context, options NodeOptions) error {
	// Pin the temp directory to /tmp until we can use the abstract socket
	// namespace in QEMU (next release after 5.0,
	// https://github.com/qemu/qemu/commit/776b97d3605ed0fc94443048fdf988c7725e38a9).
	// swtpm accepts already-open FDs, so we can pass in an abstract socket
	// namespace FD that we open and whose name we pass to QEMU. Not pinning
	// this crashes both swtpm and qemu because we run into UNIX socket path
	// length limitations (108 chars, for legacy reasons).
	tempDir, err := os.MkdirTemp("/tmp", "launch*")
	if err != nil {
		return fmt.Errorf("failed to create temporary directory: %w", err)
	}
	defer os.RemoveAll(tempDir)

	// Copy TPM state into a temporary directory since it's being modified by
	// the emulator.
	tpmTargetDir := filepath.Join(tempDir, "tpm")
	tpmSrcDir := "metropolis/node/tpm"
	if err := os.Mkdir(tpmTargetDir, 0755); err != nil {
		return fmt.Errorf("failed to create TPM state directory: %w", err)
	}
	tpmFiles, err := os.ReadDir(tpmSrcDir)
	if err != nil {
		return fmt.Errorf("failed to read TPM directory: %w", err)
	}
	for _, file := range tpmFiles {
		name := file.Name()
		src := filepath.Join(tpmSrcDir, name)
		target := filepath.Join(tpmTargetDir, name)
		if err := copyFile(src, target); err != nil {
			return fmt.Errorf("failed to copy TPM directory: file %q to %q: %w", src, target, err)
		}
	}

	var qemuNetType string
	var qemuNetConfig launch.QemuValue
	if options.ConnectToSocket != nil {
		qemuNetType = "socket"
		qemuNetConfig = launch.QemuValue{
			"id": {"net0"},
			"fd": {"3"},
		}
	} else {
		qemuNetType = "user"
		qemuNetConfig = launch.QemuValue{
			"id":        {"net0"},
			"net":       {"10.42.0.0/24"},
			"dhcpstart": {"10.42.0.10"},
			"hostfwd":   options.Ports.ToQemuForwards(),
		}
	}

	tpmSocketPath := filepath.Join(tempDir, "tpm-socket")

	mac, err := generateRandomEthernetMAC()
	if err != nil {
		return err
	}

	qemuArgs := []string{"-machine", "q35", "-accel", "kvm", "-nographic", "-nodefaults", "-m", "4096",
		"-cpu", "host", "-smp", "sockets=1,cpus=1,cores=2,threads=2,maxcpus=4",
		"-drive", "if=pflash,format=raw,readonly,file=external/edk2/OVMF_CODE.fd",
		"-drive", "if=pflash,format=raw,snapshot=on,file=external/edk2/OVMF_VARS.fd",
		"-drive", "if=virtio,format=raw,snapshot=on,cache=unsafe,file=metropolis/node/node.img",
		"-netdev", qemuNetConfig.ToOption(qemuNetType),
		"-device", "virtio-net-pci,netdev=net0,mac=" + mac.String(),
		"-chardev", "socket,id=chrtpm,path=" + tpmSocketPath,
		"-tpmdev", "emulator,id=tpm0,chardev=chrtpm",
		"-device", "tpm-tis,tpmdev=tpm0",
		"-device", "virtio-rng-pci",
		"-serial", "stdio"}

	if !options.AllowReboot {
		qemuArgs = append(qemuArgs, "-no-reboot")
	}

	if options.NodeParameters != nil {
		parametersPath := filepath.Join(tempDir, "parameters.pb")
		parametersRaw, err := proto.Marshal(options.NodeParameters)
		if err != nil {
			return fmt.Errorf("failed to encode node parameters: %w", err)
		}
		if err := os.WriteFile(parametersPath, parametersRaw, 0644); err != nil {
			return fmt.Errorf("failed to write node parameters: %w", err)
		}
		qemuArgs = append(qemuArgs, "-fw_cfg", "name=dev.monogon.metropolis/parameters.pb,file="+parametersPath)
	}

	// Start the TPM emulator as a subprocess.
	tpmCtx, tpmCancel := context.WithCancel(ctx)
	defer tpmCancel()

	tpmEmuCmd := exec.CommandContext(tpmCtx, "swtpm", "socket", "--tpm2", "--tpmstate", "dir="+tpmTargetDir, "--ctrl", "type=unixio,path="+tpmSocketPath)
	tpmEmuCmd.Stderr = os.Stderr
	tpmEmuCmd.Stdout = os.Stdout

	err = tpmEmuCmd.Start()
	if err != nil {
		return fmt.Errorf("failed to start TPM emulator: %w", err)
	}

	// Start the main qemu binary.
	systemCmd := exec.CommandContext(ctx, "qemu-system-x86_64", qemuArgs...)
	if options.ConnectToSocket != nil {
		systemCmd.ExtraFiles = []*os.File{options.ConnectToSocket}
	}

	var stdErrBuf bytes.Buffer
	systemCmd.Stderr = &stdErrBuf
	systemCmd.Stdout = options.SerialPort

	err = systemCmd.Run()

	// Stop the TPM emulator and wait for it to exit to properly reap the child
	// process.
	tpmCancel()
	log.Print("Node: Waiting for TPM emulator to exit")
	// Wait returns a SIGKILL error because we just canceled its context. We
	// still need to call it to avoid creating zombies.
	_ = tpmEmuCmd.Wait()
	log.Print("Node: TPM emulator done")

	var exerr *exec.ExitError
	if err != nil && errors.As(err, &exerr) {
		status := exerr.ProcessState.Sys().(syscall.WaitStatus)
		if status.Signaled() && status.Signal() == syscall.SIGKILL {
			// Process was killed externally (most likely by our context being
			// canceled). This is a normal exit for us, so return nil.
			return nil
		}
		exerr.Stderr = stdErrBuf.Bytes()
		newErr := launch.QEMUError(*exerr)
		return &newErr
	}
	return err
}

func copyFile(src, dst string) error {
	in, err := os.Open(src)
	if err != nil {
		return fmt.Errorf("when opening source: %w", err)
	}
	defer in.Close()

	out, err := os.Create(dst)
	if err != nil {
		return fmt.Errorf("when creating destination: %w", err)
	}
	defer out.Close()

	_, err = io.Copy(out, in)
	if err != nil {
		return fmt.Errorf("when copying file: %w", err)
	}
	return out.Close()
}
237
Serge Bazanskie78a0892021-10-07 17:03:49 +0200238// getNodes wraps around Management.GetNodes to return a list of nodes in a
239// cluster.
240func getNodes(ctx context.Context, mgmt apb.ManagementClient) ([]*apb.Node, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200241 var res []*apb.Node
Serge Bazanski075465c2021-11-16 15:38:49 +0100242 bo := backoff.NewExponentialBackOff()
243 err := backoff.Retry(func() error {
244 res = nil
245 srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
Serge Bazanskie78a0892021-10-07 17:03:49 +0200246 if err != nil {
Serge Bazanski075465c2021-11-16 15:38:49 +0100247 return fmt.Errorf("GetNodes: %w", err)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200248 }
Serge Bazanski075465c2021-11-16 15:38:49 +0100249 for {
250 node, err := srvN.Recv()
251 if err == io.EOF {
252 break
253 }
254 if err != nil {
255 return fmt.Errorf("GetNodes.Recv: %w", err)
256 }
257 res = append(res, node)
258 }
259 return nil
260 }, bo)
261 if err != nil {
262 return nil, err
Serge Bazanskie78a0892021-10-07 17:03:49 +0200263 }
264 return res, nil
265}

// generateRandomEthernetMAC generates a random EUI-48 Ethernet MAC address.
func generateRandomEthernetMAC() (*net.HardwareAddr, error) {
	macBuf := make([]byte, 6)
	_, err := rand.Read(macBuf)
	if err != nil {
		return nil, fmt.Errorf("failed to read randomness for MAC: %w", err)
	}

	// Set the U/L bit and clear the I/G bit (locally administered individual
	// MAC). Ref: IEEE 802-2014 Section 8.2.2.
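	// As a worked example (illustrative): a random first byte of 0xab
	// (0b10101011) becomes 0xaa (0b10101010). OR-ing with 2 sets bit 1 (U/L);
	// AND-ing with 0xfe clears bit 0 (I/G).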
	macBuf[0] = (macBuf[0] | 2) & 0xfe
	mac := net.HardwareAddr(macBuf)
	return &mac, nil
}

// ClusterPorts contains all ports forwarded by Nanoswitch to the first VM in a
// launched Metropolis cluster.
var ClusterPorts = []node.Port{
	node.CuratorServicePort,
	node.DebugServicePort,

	node.KubernetesAPIPort,
}

// ClusterOptions contains all options for launching a Metropolis cluster.
type ClusterOptions struct {
	// The number of nodes this cluster should be started with.
	NumNodes int
}

// Cluster is the running Metropolis cluster launched using the LaunchCluster
// function.
type Cluster struct {
	// Debug is the NodeDebugService gRPC service, allowing for unauthenticated
	// debug access to the cluster.
	Debug apb.NodeDebugServiceClient
	// Management is the Management gRPC service, authenticated as the owner of
	// the cluster.
	Management apb.ManagementClient
	// Owner is the TLS Certificate of the owner of the test cluster. This can
	// be used to authenticate further clients to the running cluster (see the
	// sketch below).
	Owner tls.Certificate
	// Ports is the PortMap used to access the first node's services (defined
	// in ClusterPorts).
	Ports launch.PortMap

	// nodesDone is a list of channels populated with the return codes from all
	// the nodes' qemu instances. It's used by Close to ensure all nodes have
	// successfully been stopped.
	nodesDone []chan error
	// ctxC is used by Close to cancel the context under which the nodes are
	// running.
	ctxC context.CancelFunc
}
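
// A sketch of using the Owner credential to build an additional authenticated
// client (hypothetical; "remote" stands for a host:port of the cluster's
// curator service, as dialed in LaunchCluster below):
//
//	authClient, err := rpc.NewAuthenticatedClient(remote, c.Owner, nil)
//	if err != nil {
//		log.Fatalf("NewAuthenticatedClient: %v", err)
//	}
//	mgmt := apb.NewManagementClient(authClient)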

// LaunchCluster launches a cluster of Metropolis node VMs together with a
// Nanoswitch instance to network them all together.
//
// The given context will be used to run all qemu instances in the cluster, and
// canceling the context or calling Close() will terminate them. A minimal
// usage sketch follows this function.
func LaunchCluster(ctx context.Context, opts ClusterOptions) (*Cluster, error) {
	if opts.NumNodes <= 0 {
		return nil, errors.New("refusing to start cluster with zero nodes")
	}

	ctxT, ctxC := context.WithCancel(ctx)

	// Prepare links between nodes and nanoswitch.
	var switchPorts []*os.File
	var vmPorts []*os.File
	for i := 0; i < opts.NumNodes; i++ {
		switchPort, vmPort, err := launch.NewSocketPair()
		if err != nil {
			ctxC()
			return nil, fmt.Errorf("failed to get socketpair: %w", err)
		}
		switchPorts = append(switchPorts, switchPort)
		vmPorts = append(vmPorts, vmPort)
	}

	// Make a list of channels that will be populated by all running node qemu
	// processes.
	done := make([]chan error, opts.NumNodes)
	for i := range done {
		done[i] = make(chan error, 1)
	}

	// Start the first node.
	log.Printf("Cluster: Starting node %d...", 1)
	go func() {
		err := LaunchNode(ctxT, NodeOptions{
			ConnectToSocket: vmPorts[0],
			NodeParameters: &apb.NodeParameters{
				Cluster: &apb.NodeParameters_ClusterBootstrap_{
					ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
						OwnerPublicKey: InsecurePublicKey,
					},
				},
			},
			SerialPort: newPrefixedStdio(0),
		})
		done[0] <- err
	}()

	// Launch nanoswitch.
	portMap, err := launch.ConflictFreePortMap(ClusterPorts)
	if err != nil {
		ctxC()
		return nil, fmt.Errorf("failed to allocate ephemeral ports: %w", err)
	}

	go func() {
		if err := launch.RunMicroVM(ctxT, &launch.MicroVMOptions{
			KernelPath:             "metropolis/test/ktest/vmlinux",
			InitramfsPath:          "metropolis/test/nanoswitch/initramfs.lz4",
			ExtraNetworkInterfaces: switchPorts,
			PortMap:                portMap,
		}); err != nil {
			if !errors.Is(err, ctxT.Err()) {
				log.Fatalf("Failed to launch nanoswitch: %v", err)
			}
		}
	}()

	// Dial the debug service.
	copts := []grpcretry.CallOption{
		grpcretry.WithBackoff(grpcretry.BackoffExponential(100 * time.Millisecond)),
	}
	debugConn, err := portMap.DialGRPC(node.DebugServicePort, grpc.WithInsecure(),
		grpc.WithUnaryInterceptor(grpcretry.UnaryClientInterceptor(copts...)))
	if err != nil {
		ctxC()
		return nil, fmt.Errorf("failed to dial debug service: %w", err)
	}

	// Dial the external (curator) service.
	remote := fmt.Sprintf("localhost:%v", portMap[node.CuratorServicePort])
	initClient, err := rpc.NewEphemeralClient(remote, InsecurePrivateKey, nil)
	if err != nil {
		ctxC()
		return nil, fmt.Errorf("NewEphemeralClient: %w", err)
	}

	// Retrieve the owner certificate - this can take a while because the node
	// is still coming up, so do it in a backoff loop.
	log.Printf("Cluster: retrieving owner certificate (this can take a few seconds while the first node boots)...")
	aaa := apb.NewAAAClient(initClient)
	var cert *tls.Certificate
	err = backoff.Retry(func() error {
		cert, err = rpc.RetrieveOwnerCertificate(ctxT, aaa, InsecurePrivateKey)
		return err
	}, backoff.WithContext(backoff.NewExponentialBackOff(), ctxT))
	if err != nil {
		ctxC()
		return nil, err
	}
	log.Printf("Cluster: retrieved owner certificate.")

	// Build an authenticated owner client to the new node.
	authClient, err := rpc.NewAuthenticatedClient(remote, *cert, nil)
	if err != nil {
		ctxC()
		return nil, fmt.Errorf("NewAuthenticatedClient: %w", err)
	}
	mgmt := apb.NewManagementClient(authClient)

	// Retrieve the register ticket to register further nodes.
	log.Printf("Cluster: retrieving register ticket...")
	resT, err := mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{})
	if err != nil {
		ctxC()
		return nil, fmt.Errorf("GetRegisterTicket: %w", err)
	}
	ticket := resT.Ticket
	log.Printf("Cluster: retrieved register ticket (%d bytes).", len(ticket))

	// Retrieve cluster info (for directory and CA public key) to register
	// further nodes.
	resI, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
	if err != nil {
		ctxC()
		return nil, fmt.Errorf("GetClusterInfo: %w", err)
	}

	// Now run the rest of the nodes.
	//
	// TODO(q3k): parallelize this
	for i := 1; i < opts.NumNodes; i++ {
		log.Printf("Cluster: Starting node %d...", i+1)
		go func(i int) {
			err := LaunchNode(ctxT, NodeOptions{
				ConnectToSocket: vmPorts[i],
				NodeParameters: &apb.NodeParameters{
					Cluster: &apb.NodeParameters_ClusterRegister_{
						ClusterRegister: &apb.NodeParameters_ClusterRegister{
							RegisterTicket:   ticket,
							ClusterDirectory: resI.ClusterDirectory,
							CaCertificate:    resI.CaCertificate,
						},
					},
				},
				SerialPort: newPrefixedStdio(i),
			})
			done[i] <- err
		}(i)
		var newNode *apb.Node

		log.Printf("Cluster: waiting for node %d to appear as NEW...", i+1)
		for {
			nodes, err := getNodes(ctx, mgmt)
			if err != nil {
				ctxC()
				return nil, fmt.Errorf("could not get nodes: %w", err)
			}
			for _, n := range nodes {
				if n.State == cpb.NodeState_NODE_STATE_NEW {
					newNode = n
					break
				}
			}
			if newNode != nil {
				break
			}
			time.Sleep(1 * time.Second)
		}
		id := identity.NodeID(newNode.Pubkey)
		log.Printf("Cluster: node %d is %s", i+1, id)

		log.Printf("Cluster: approving node %d", i+1)
		_, err := mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
			Pubkey: newNode.Pubkey,
		})
		if err != nil {
			ctxC()
			return nil, fmt.Errorf("ApproveNode(%s): %w", id, err)
		}
		log.Printf("Cluster: node %d approved, waiting for it to appear as UP...", i+1)
		for {
			nodes, err := getNodes(ctx, mgmt)
			if err != nil {
				ctxC()
				return nil, fmt.Errorf("could not get nodes: %w", err)
			}
			found := false
			for _, n := range nodes {
				if !bytes.Equal(n.Pubkey, newNode.Pubkey) {
					continue
				}
				if n.State == cpb.NodeState_NODE_STATE_UP {
					found = true
				}
				break
			}
			if found {
				break
			}
			// Poll at one-second intervals until the node transitions to UP.
			time.Sleep(time.Second)
		}
		log.Printf("Cluster: node %d (%s) UP!", i+1, id)
	}

	return &Cluster{
		Debug:      apb.NewNodeDebugServiceClient(debugConn),
		Management: mgmt,
		Owner:      *cert,
		Ports:      portMap,
		nodesDone:  done,

		ctxC: ctxC,
	}, nil
}

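// A minimal end-to-end usage sketch (illustrative; assumes the test
// environment provides the node image, nanoswitch initramfs and insecure
// owner keypair referenced above):
//
//	c, err := LaunchCluster(context.Background(), ClusterOptions{NumNodes: 2})
//	if err != nil {
//		log.Fatalf("LaunchCluster: %v", err)
//	}
//	defer c.Close()
//	nodes, err := getNodes(context.Background(), c.Management)
//	if err != nil {
//		log.Fatalf("getNodes: %v", err)
//	}
//	log.Printf("cluster has %d nodes", len(nodes))
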
// Close cancels the running cluster's context and waits for all virtualized
// nodes to stop. It returns an error if stopping the nodes failed, or if one
// of the nodes failed to fully start in the first place.
func (c *Cluster) Close() error {
	log.Printf("Cluster: stopping...")
	c.ctxC()

	var errs []error
	log.Printf("Cluster: waiting for nodes to exit...")
	for _, ch := range c.nodesDone {
		err := <-ch
		if err != nil {
			errs = append(errs, err)
		}
	}
	log.Printf("Cluster: done")
	return multierr.Combine(errs...)
}