blob: 4d5a7727255174c8a8bcbc634d8b3c8b372242b2 [file] [log] [blame]
Serge Bazanski66e58952021-10-05 17:06:56 +02001// cluster builds on the launch package and implements launching Metropolis
2// nodes and clusters in a virtualized environment using qemu. It's kept in a
3// separate package as it depends on a Metropolis node image, which might not be
4// required for some use of the launch library.
5package cluster
6
7import (
8 "bytes"
9 "context"
10 "crypto/rand"
11 "crypto/tls"
12 "errors"
13 "fmt"
14 "io"
Serge Bazanski66e58952021-10-05 17:06:56 +020015 "log"
16 "net"
17 "os"
18 "os/exec"
19 "path/filepath"
20 "syscall"
21 "time"
22
23 "github.com/cenkalti/backoff/v4"
24 grpcretry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
25 "go.uber.org/multierr"
26 "google.golang.org/grpc"
27 "google.golang.org/protobuf/proto"
28
29 "source.monogon.dev/metropolis/node"
Serge Bazanskie78a0892021-10-07 17:03:49 +020030 "source.monogon.dev/metropolis/node/core/identity"
Serge Bazanski66e58952021-10-05 17:06:56 +020031 "source.monogon.dev/metropolis/node/core/rpc"
32 apb "source.monogon.dev/metropolis/proto/api"
Serge Bazanskie78a0892021-10-07 17:03:49 +020033 cpb "source.monogon.dev/metropolis/proto/common"
Serge Bazanski66e58952021-10-05 17:06:56 +020034 "source.monogon.dev/metropolis/test/launch"
35)
36
// NodeOptions contains all options that can be passed to LaunchNode().
type NodeOptions struct {
	// Ports contains the port mapping where to expose the internal ports of the VM to
	// the host. See IdentityPortMap() and ConflictFreePortMap(). Ignored when
	// ConnectToSocket is set.
	Ports launch.PortMap

	// If set to true, reboots are honored. Otherwise all reboots exit the LaunchNode()
	// call. Metropolis nodes generally restart on almost all errors, so unless you
	// want to test reboot behavior this should be false.
	AllowReboot bool

	// By default the VM is connected to the host via SLIRP. If ConnectToSocket is set,
	// it is instead connected to the given file descriptor/socket. If this is set, all
	// port maps from the Ports option are ignored. Intended for networking this
	// instance together with others for running more complex network configurations.
	ConnectToSocket *os.File

	// SerialPort is an io.ReadWriter over which you can communicate with the serial
	// port of the machine. It can be set to an existing file descriptor (like
	// os.Stdout/os.Stderr) or any Go structure implementing this interface.
	SerialPort io.ReadWriter

	// NodeParameters is passed into the VM and subsequently used for bootstrapping or
	// registering into a cluster.
	NodeParameters *apb.NodeParameters
}
64
65// NodePorts is the list of ports a fully operational Metropolis node listens on
Serge Bazanski52304a82021-10-29 16:56:18 +020066var NodePorts = []node.Port{
Serge Bazanski66e58952021-10-05 17:06:56 +020067 node.ConsensusPort,
68
69 node.CuratorServicePort,
70 node.DebugServicePort,
71
72 node.KubernetesAPIPort,
Serge Bazanski66e58952021-10-05 17:06:56 +020073 node.CuratorServicePort,
74 node.DebuggerPort,
75}
76
77// LaunchNode launches a single Metropolis node instance with the given options.
78// The instance runs mostly paravirtualized but with some emulated hardware
79// similar to how a cloud provider might set up its VMs. The disk is fully
80// writable but is run in snapshot mode meaning that changes are not kept beyond
81// a single invocation.
82func LaunchNode(ctx context.Context, options NodeOptions) error {
83 // Pin temp directory to /tmp until we can use abstract socket namespace in QEMU
84 // (next release after 5.0,
85 // https://github.com/qemu/qemu/commit/776b97d3605ed0fc94443048fdf988c7725e38a9).
86 // swtpm accepts already-open FDs so we can pass in an abstract socket namespace FD
87 // that we open and pass the name of it to QEMU. Not pinning this crashes both
88 // swtpm and qemu because we run into UNIX socket length limitations (for legacy
89 // reasons 108 chars).
Lorenz Brun764a2de2021-11-22 16:26:36 +010090 tempDir, err := os.MkdirTemp("/tmp", "launch*")
Serge Bazanski66e58952021-10-05 17:06:56 +020091 if err != nil {
92 return fmt.Errorf("failed to create temporary directory: %w", err)
93 }
94 defer os.RemoveAll(tempDir)
95
96 // Copy TPM state into a temporary directory since it's being modified by the
97 // emulator
98 tpmTargetDir := filepath.Join(tempDir, "tpm")
99 tpmSrcDir := "metropolis/node/tpm"
100 if err := os.Mkdir(tpmTargetDir, 0755); err != nil {
101 return fmt.Errorf("failed to create TPM state directory: %w", err)
102 }
Lorenz Brun764a2de2021-11-22 16:26:36 +0100103 tpmFiles, err := os.ReadDir(tpmSrcDir)
Serge Bazanski66e58952021-10-05 17:06:56 +0200104 if err != nil {
105 return fmt.Errorf("failed to read TPM directory: %w", err)
106 }
107 for _, file := range tpmFiles {
108 name := file.Name()
109 src := filepath.Join(tpmSrcDir, name)
110 target := filepath.Join(tpmTargetDir, name)
111 if err := copyFile(src, target); err != nil {
112 return fmt.Errorf("failed to copy TPM directory: file %q to %q: %w", src, target, err)
113 }
114 }
115
116 var qemuNetType string
117 var qemuNetConfig launch.QemuValue
118 if options.ConnectToSocket != nil {
119 qemuNetType = "socket"
120 qemuNetConfig = launch.QemuValue{
121 "id": {"net0"},
122 "fd": {"3"},
123 }
124 } else {
125 qemuNetType = "user"
126 qemuNetConfig = launch.QemuValue{
127 "id": {"net0"},
128 "net": {"10.42.0.0/24"},
129 "dhcpstart": {"10.42.0.10"},
130 "hostfwd": options.Ports.ToQemuForwards(),
131 }
132 }
133
134 tpmSocketPath := filepath.Join(tempDir, "tpm-socket")
135
136 mac, err := generateRandomEthernetMAC()
137 if err != nil {
138 return err
139 }
140
141 qemuArgs := []string{"-machine", "q35", "-accel", "kvm", "-nographic", "-nodefaults", "-m", "4096",
142 "-cpu", "host", "-smp", "sockets=1,cpus=1,cores=2,threads=2,maxcpus=4",
143 "-drive", "if=pflash,format=raw,readonly,file=external/edk2/OVMF_CODE.fd",
144 "-drive", "if=pflash,format=raw,snapshot=on,file=external/edk2/OVMF_VARS.fd",
145 "-drive", "if=virtio,format=raw,snapshot=on,cache=unsafe,file=metropolis/node/node.img",
146 "-netdev", qemuNetConfig.ToOption(qemuNetType),
147 "-device", "virtio-net-pci,netdev=net0,mac=" + mac.String(),
148 "-chardev", "socket,id=chrtpm,path=" + tpmSocketPath,
149 "-tpmdev", "emulator,id=tpm0,chardev=chrtpm",
150 "-device", "tpm-tis,tpmdev=tpm0",
151 "-device", "virtio-rng-pci",
152 "-serial", "stdio"}
153
154 if !options.AllowReboot {
155 qemuArgs = append(qemuArgs, "-no-reboot")
156 }
157
158 if options.NodeParameters != nil {
159 parametersPath := filepath.Join(tempDir, "parameters.pb")
160 parametersRaw, err := proto.Marshal(options.NodeParameters)
161 if err != nil {
162 return fmt.Errorf("failed to encode node paraeters: %w", err)
163 }
Lorenz Brun764a2de2021-11-22 16:26:36 +0100164 if err := os.WriteFile(parametersPath, parametersRaw, 0644); err != nil {
Serge Bazanski66e58952021-10-05 17:06:56 +0200165 return fmt.Errorf("failed to write node parameters: %w", err)
166 }
167 qemuArgs = append(qemuArgs, "-fw_cfg", "name=dev.monogon.metropolis/parameters.pb,file="+parametersPath)
168 }
169
170 // Start TPM emulator as a subprocess
171 tpmCtx, tpmCancel := context.WithCancel(ctx)
172 defer tpmCancel()
173
174 tpmEmuCmd := exec.CommandContext(tpmCtx, "swtpm", "socket", "--tpm2", "--tpmstate", "dir="+tpmTargetDir, "--ctrl", "type=unixio,path="+tpmSocketPath)
175 tpmEmuCmd.Stderr = os.Stderr
176 tpmEmuCmd.Stdout = os.Stdout
177
178 err = tpmEmuCmd.Start()
179 if err != nil {
180 return fmt.Errorf("failed to start TPM emulator: %w", err)
181 }
182
183 // Start the main qemu binary
184 systemCmd := exec.CommandContext(ctx, "qemu-system-x86_64", qemuArgs...)
185 if options.ConnectToSocket != nil {
186 systemCmd.ExtraFiles = []*os.File{options.ConnectToSocket}
187 }
188
189 var stdErrBuf bytes.Buffer
190 systemCmd.Stderr = &stdErrBuf
191 systemCmd.Stdout = options.SerialPort
192
193 err = systemCmd.Run()
194
195 // Stop TPM emulator and wait for it to exit to properly reap the child process
196 tpmCancel()
197 log.Print("Node: Waiting for TPM emulator to exit")
198 // Wait returns a SIGKILL error because we just cancelled its context.
199 // We still need to call it to avoid creating zombies.
200 _ = tpmEmuCmd.Wait()
201 log.Print("Node: TPM emulator done")
202
203 var exerr *exec.ExitError
204 if err != nil && errors.As(err, &exerr) {
205 status := exerr.ProcessState.Sys().(syscall.WaitStatus)
206 if status.Signaled() && status.Signal() == syscall.SIGKILL {
207 // Process was killed externally (most likely by our context being canceled).
208 // This is a normal exit for us, so return nil
209 return nil
210 }
211 exerr.Stderr = stdErrBuf.Bytes()
212 newErr := launch.QEMUError(*exerr)
213 return &newErr
214 }
215 return err
216}
217
218func copyFile(src, dst string) error {
219 in, err := os.Open(src)
220 if err != nil {
221 return fmt.Errorf("when opening source: %w", err)
222 }
223 defer in.Close()
224
225 out, err := os.Create(dst)
226 if err != nil {
227 return fmt.Errorf("when creating destination: %w", err)
228 }
229 defer out.Close()
230
231 _, err = io.Copy(out, in)
232 if err != nil {
233 return fmt.Errorf("when copying file: %w", err)
234 }
235 return out.Close()
236}
237
Serge Bazanskie78a0892021-10-07 17:03:49 +0200238// getNodes wraps around Management.GetNodes to return a list of nodes in a
239// cluster.
240func getNodes(ctx context.Context, mgmt apb.ManagementClient) ([]*apb.Node, error) {
241 srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
242 if err != nil {
243 return nil, fmt.Errorf("GetNodes: %w", err)
244 }
245 var res []*apb.Node
246 for {
247 node, err := srvN.Recv()
248 if err == io.EOF {
249 break
250 }
251 if err != nil {
252 return nil, fmt.Errorf("GetNodes.Recv: %w", err)
253 }
254 res = append(res, node)
255 }
256 return res, nil
257}
258
// generateRandomEthernetMAC returns a random EUI-48 Ethernet MAC address. The
// returned address has the U/L bit set and the I/G bit cleared (a locally
// administered, individual address per IEEE 802-2014 Section 8.2.2), so it can
// never collide with a vendor-assigned MAC.
func generateRandomEthernetMAC() (*net.HardwareAddr, error) {
	macBuf := make([]byte, 6)
	if _, err := rand.Read(macBuf); err != nil {
		// Wrap with %w (was %v) so callers can unwrap the underlying error.
		return nil, fmt.Errorf("failed to read randomness for MAC: %w", err)
	}

	// Set U/L bit and clear I/G bit (locally administered individual MAC)
	// Ref IEEE 802-2014 Section 8.2.2
	macBuf[0] = (macBuf[0] | 2) & 0xfe
	mac := net.HardwareAddr(macBuf)
	return &mac, nil
}
273
// ClusterPorts contains all ports forwarded by Nanoswitch to the first VM in a
// launched Metropolis cluster. LaunchCluster allocates conflict-free host
// ports for these via launch.ConflictFreePortMap.
var ClusterPorts = []node.Port{
	node.CuratorServicePort,
	node.DebugServicePort,

	node.KubernetesAPIPort,
}
282
// ClusterOptions contains all options for launching a Metropolis cluster.
type ClusterOptions struct {
	// NumNodes is the number of nodes this cluster should be started with. It
	// must be at least 1; LaunchCluster refuses to start a cluster with zero
	// nodes.
	NumNodes int
}
288
// Cluster is the running Metropolis cluster launched using the LaunchCluster
// function.
type Cluster struct {
	// Debug is the NodeDebugService gRPC service, allowing for unauthenticated
	// debug access to the cluster.
	Debug apb.NodeDebugServiceClient
	// Management is the Management gRPC service, authenticated as the owner of the
	// cluster.
	Management apb.ManagementClient
	// Owner is the TLS Certificate of the owner of the test cluster. This can be
	// used to authenticate further clients to the running cluster.
	Owner tls.Certificate
	// Ports is the PortMap used to access the first nodes' services (defined in
	// ClusterPorts).
	Ports launch.PortMap

	// nodesDone is a list of channels populated with the return codes from all the
	// nodes' qemu instances. It's used by Close to ensure all nodes have
	// successfully been stopped.
	nodesDone []chan error
	// ctxC is used by Close to cancel the context under which the nodes are
	// running.
	ctxC context.CancelFunc
}
313
314// LaunchCluster launches a cluster of Metropolis node VMs together with a
315// Nanoswitch instance to network them all together.
316//
317// The given context will be used to run all qemu instances in the cluster, and
318// canceling the context or calling Close() will terminate them.
319func LaunchCluster(ctx context.Context, opts ClusterOptions) (*Cluster, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200320 if opts.NumNodes <= 0 {
Serge Bazanski66e58952021-10-05 17:06:56 +0200321 return nil, errors.New("refusing to start cluster with zero nodes")
322 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200323
324 ctxT, ctxC := context.WithCancel(ctx)
325
326 // Prepare links between nodes and nanoswitch.
327 var switchPorts []*os.File
328 var vmPorts []*os.File
329 for i := 0; i < opts.NumNodes; i++ {
330 switchPort, vmPort, err := launch.NewSocketPair()
331 if err != nil {
332 ctxC()
333 return nil, fmt.Errorf("failed to get socketpair: %w", err)
334 }
335 switchPorts = append(switchPorts, switchPort)
336 vmPorts = append(vmPorts, vmPort)
337 }
338
Serge Bazanskie78a0892021-10-07 17:03:49 +0200339 // Make a list of channels that will be populated by all running node qemu
340 // processes.
Serge Bazanski66e58952021-10-05 17:06:56 +0200341 done := make([]chan error, opts.NumNodes)
342 for i, _ := range done {
343 done[i] = make(chan error, 1)
344 }
345
346 // Start first node.
Serge Bazanskie78a0892021-10-07 17:03:49 +0200347 log.Printf("Cluster: Starting node %d...", 1)
Serge Bazanski66e58952021-10-05 17:06:56 +0200348 go func() {
349 err := LaunchNode(ctxT, NodeOptions{
350 ConnectToSocket: vmPorts[0],
351 NodeParameters: &apb.NodeParameters{
352 Cluster: &apb.NodeParameters_ClusterBootstrap_{
353 ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
354 OwnerPublicKey: InsecurePublicKey,
355 },
356 },
357 },
358 })
359 done[0] <- err
360 }()
361
Serge Bazanskie78a0892021-10-07 17:03:49 +0200362 // Launch nanoswitch.
Serge Bazanski66e58952021-10-05 17:06:56 +0200363 portMap, err := launch.ConflictFreePortMap(ClusterPorts)
364 if err != nil {
365 ctxC()
366 return nil, fmt.Errorf("failed to allocate ephemeral ports: %w", err)
367 }
368
369 go func() {
370 if err := launch.RunMicroVM(ctxT, &launch.MicroVMOptions{
371 KernelPath: "metropolis/test/ktest/vmlinux",
372 InitramfsPath: "metropolis/test/nanoswitch/initramfs.lz4",
373 ExtraNetworkInterfaces: switchPorts,
374 PortMap: portMap,
375 }); err != nil {
376 if !errors.Is(err, ctxT.Err()) {
377 log.Printf("Failed to launch nanoswitch: %v", err)
378 }
379 }
380 }()
381
382 // Dial debug service.
383 copts := []grpcretry.CallOption{
384 grpcretry.WithBackoff(grpcretry.BackoffExponential(100 * time.Millisecond)),
385 }
386 debugConn, err := portMap.DialGRPC(node.DebugServicePort, grpc.WithInsecure(),
387 grpc.WithUnaryInterceptor(grpcretry.UnaryClientInterceptor(copts...)))
388 if err != nil {
389 ctxC()
390 return nil, fmt.Errorf("failed to dial debug service: %w", err)
391 }
392
393 // Dial external service.
394 remote := fmt.Sprintf("localhost:%v", portMap[node.CuratorServicePort])
395 initClient, err := rpc.NewEphemeralClient(remote, InsecurePrivateKey, nil)
396 if err != nil {
397 ctxC()
398 return nil, fmt.Errorf("NewInitialClient: %w", err)
399 }
400
401 // Retrieve owner certificate - this can take a while because the node is still
402 // coming up, so do it in a backoff loop.
Serge Bazanskie78a0892021-10-07 17:03:49 +0200403 log.Printf("Cluster: retrieving owner certificate (this can take a few seconds while the first node boots)...")
Serge Bazanski66e58952021-10-05 17:06:56 +0200404 aaa := apb.NewAAAClient(initClient)
405 var cert *tls.Certificate
406 err = backoff.Retry(func() error {
407 cert, err = rpc.RetrieveOwnerCertificate(ctxT, aaa, InsecurePrivateKey)
408 return err
409 }, backoff.WithContext(backoff.NewExponentialBackOff(), ctxT))
410 if err != nil {
411 ctxC()
412 return nil, err
413 }
414 log.Printf("Cluster: retrieved owner certificate.")
415
Serge Bazanskie78a0892021-10-07 17:03:49 +0200416 // Build authenticated owner client to new node.
Serge Bazanski66e58952021-10-05 17:06:56 +0200417 authClient, err := rpc.NewAuthenticatedClient(remote, *cert, nil)
418 if err != nil {
419 ctxC()
420 return nil, fmt.Errorf("NewAuthenticatedClient: %w", err)
421 }
Serge Bazanskie78a0892021-10-07 17:03:49 +0200422 mgmt := apb.NewManagementClient(authClient)
423
424 // Retrieve register ticket to register further nodes.
425 log.Printf("Cluster: retrieving register ticket...")
426 resT, err := mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{})
427 if err != nil {
428 ctxC()
429 return nil, fmt.Errorf("GetRegisterTicket: %w", err)
430 }
431 ticket := resT.Ticket
432 log.Printf("Cluster: retrieved register ticket (%d bytes).", len(ticket))
433
434 // Retrieve cluster info (for directory and ca public key) to register further
435 // nodes.
436 resI, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
437 if err != nil {
438 ctxC()
439 return nil, fmt.Errorf("GetClusterInfo: %w", err)
440 }
441
442 // Now run the rest of the nodes.
443 //
444 // TODO(q3k): parallelize this
445 for i := 1; i < opts.NumNodes; i++ {
446 log.Printf("Cluster: Starting node %d...", i+1)
447 go func(i int) {
448 err := LaunchNode(ctxT, NodeOptions{
449 ConnectToSocket: vmPorts[i],
450 NodeParameters: &apb.NodeParameters{
451 Cluster: &apb.NodeParameters_ClusterRegister_{
452 ClusterRegister: &apb.NodeParameters_ClusterRegister{
453 RegisterTicket: ticket,
454 ClusterDirectory: resI.ClusterDirectory,
455 CaCertificate: resI.CaCertificate,
456 },
457 },
458 },
459 SerialPort: os.Stdout,
460 })
461 done[i] <- err
462 }(i)
463 var newNode *apb.Node
464
465 log.Printf("Cluster: waiting for node %d to appear as NEW...", i)
466 for {
467 nodes, err := getNodes(ctx, mgmt)
468 if err != nil {
469 ctxC()
470 return nil, fmt.Errorf("could not get nodes: %w", err)
471 }
472 for _, n := range nodes {
473 if n.State == cpb.NodeState_NODE_STATE_NEW {
474 newNode = n
475 break
476 }
477 }
478 if newNode != nil {
479 break
480 }
481 time.Sleep(1 * time.Second)
482 }
483 id := identity.NodeID(newNode.Pubkey)
484 log.Printf("Cluster: node %d is %s", i, id)
485
486 log.Printf("Cluster: approving node %d", i)
487 _, err := mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
488 Pubkey: newNode.Pubkey,
489 })
490 if err != nil {
491 ctxC()
492 return nil, fmt.Errorf("ApproveNode(%s): %w", id, err)
493 }
494 log.Printf("Cluster: node %d approved, waiting for it to appear as UP...", i)
495 for {
496 nodes, err := getNodes(ctx, mgmt)
497 if err != nil {
498 ctxC()
499 return nil, fmt.Errorf("could not get nodes: %w", err)
500 }
501 found := false
502 for _, n := range nodes {
503 if !bytes.Equal(n.Pubkey, newNode.Pubkey) {
504 continue
505 }
506 if n.State == cpb.NodeState_NODE_STATE_UP {
507 found = true
508 break
509 }
510 time.Sleep(time.Second)
511 }
512 if found {
513 break
514 }
515 }
516 log.Printf("Cluster: node %d (%s) UP!", i, id)
517 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200518
519 return &Cluster{
520 Debug: apb.NewNodeDebugServiceClient(debugConn),
Serge Bazanskie78a0892021-10-07 17:03:49 +0200521 Management: mgmt,
Serge Bazanski66e58952021-10-05 17:06:56 +0200522 Owner: *cert,
523 Ports: portMap,
524 nodesDone: done,
525
526 ctxC: ctxC,
527 }, nil
528}
529
530// Close cancels the running clusters' context and waits for all virtualized
531// nodes to stop. It returns an error if stopping the nodes failed, or one of
532// the nodes failed to fully start in the first place.
533func (c *Cluster) Close() error {
534 log.Printf("Cluster: stopping...")
535 c.ctxC()
536
537 var errors []error
538 log.Printf("Cluster: waiting for nodes to exit...")
539 for _, c := range c.nodesDone {
540 err := <-c
541 if err != nil {
542 errors = append(errors, err)
543 }
544 }
545 log.Printf("Cluster: done")
546 return multierr.Combine(errors...)
547}