blob: 6be0efe38465bb7fc66f3ad37c4e526dcb60f831 [file] [log] [blame]
Serge Bazanski66e58952021-10-05 17:06:56 +02001// cluster builds on the launch package and implements launching Metropolis
2// nodes and clusters in a virtualized environment using qemu. It's kept in a
3// separate package as it depends on a Metropolis node image, which might not be
4// required for some use of the launch library.
5package cluster
6
7import (
8 "bytes"
9 "context"
10 "crypto/rand"
11 "crypto/tls"
12 "errors"
13 "fmt"
14 "io"
15 "io/ioutil"
16 "log"
17 "net"
18 "os"
19 "os/exec"
20 "path/filepath"
21 "syscall"
22 "time"
23
24 "github.com/cenkalti/backoff/v4"
25 grpcretry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
26 "go.uber.org/multierr"
27 "google.golang.org/grpc"
28 "google.golang.org/protobuf/proto"
29
30 "source.monogon.dev/metropolis/node"
31 "source.monogon.dev/metropolis/node/core/rpc"
32 apb "source.monogon.dev/metropolis/proto/api"
33 "source.monogon.dev/metropolis/test/launch"
34)
35
// NodeOptions contains all options that can be passed to LaunchNode().
type NodeOptions struct {
	// Ports contains the port mapping where to expose the internal ports of the VM to
	// the host. See IdentityPortMap() and ConflictFreePortMap(). Ignored when
	// ConnectToSocket is set.
	Ports launch.PortMap

	// If set to true, reboots are honored. Otherwise all reboots exit the Launch()
	// command. Metropolis nodes generally restart on almost all errors, so unless you
	// want to test reboot behavior this should be false.
	AllowReboot bool

	// By default the VM is connected to the Host via SLIRP. If ConnectToSocket is set,
	// it is instead connected to the given file descriptor/socket. If this is set, all
	// port maps from the Ports option are ignored. Intended for networking this
	// instance together with others for running more complex network configurations.
	ConnectToSocket *os.File

	// SerialPort is an io.ReadWriter over which you can communicate with the serial
	// port of the machine. It can be set to an existing file descriptor (like
	// os.Stdout/os.Stderr) or any Go structure implementing this interface.
	SerialPort io.ReadWriter

	// NodeParameters is passed into the VM and subsequently used for bootstrapping or
	// registering into a cluster.
	NodeParameters *apb.NodeParameters
}
63
64// NodePorts is the list of ports a fully operational Metropolis node listens on
65var NodePorts = []uint16{
66 node.ConsensusPort,
67
68 node.CuratorServicePort,
69 node.DebugServicePort,
70
71 node.KubernetesAPIPort,
72 node.MasterServicePort,
73 node.CuratorServicePort,
74 node.DebuggerPort,
75}
76
77// LaunchNode launches a single Metropolis node instance with the given options.
78// The instance runs mostly paravirtualized but with some emulated hardware
79// similar to how a cloud provider might set up its VMs. The disk is fully
80// writable but is run in snapshot mode meaning that changes are not kept beyond
81// a single invocation.
82func LaunchNode(ctx context.Context, options NodeOptions) error {
83 // Pin temp directory to /tmp until we can use abstract socket namespace in QEMU
84 // (next release after 5.0,
85 // https://github.com/qemu/qemu/commit/776b97d3605ed0fc94443048fdf988c7725e38a9).
86 // swtpm accepts already-open FDs so we can pass in an abstract socket namespace FD
87 // that we open and pass the name of it to QEMU. Not pinning this crashes both
88 // swtpm and qemu because we run into UNIX socket length limitations (for legacy
89 // reasons 108 chars).
90 tempDir, err := ioutil.TempDir("/tmp", "launch*")
91 if err != nil {
92 return fmt.Errorf("failed to create temporary directory: %w", err)
93 }
94 defer os.RemoveAll(tempDir)
95
96 // Copy TPM state into a temporary directory since it's being modified by the
97 // emulator
98 tpmTargetDir := filepath.Join(tempDir, "tpm")
99 tpmSrcDir := "metropolis/node/tpm"
100 if err := os.Mkdir(tpmTargetDir, 0755); err != nil {
101 return fmt.Errorf("failed to create TPM state directory: %w", err)
102 }
103 tpmFiles, err := ioutil.ReadDir(tpmSrcDir)
104 if err != nil {
105 return fmt.Errorf("failed to read TPM directory: %w", err)
106 }
107 for _, file := range tpmFiles {
108 name := file.Name()
109 src := filepath.Join(tpmSrcDir, name)
110 target := filepath.Join(tpmTargetDir, name)
111 if err := copyFile(src, target); err != nil {
112 return fmt.Errorf("failed to copy TPM directory: file %q to %q: %w", src, target, err)
113 }
114 }
115
116 var qemuNetType string
117 var qemuNetConfig launch.QemuValue
118 if options.ConnectToSocket != nil {
119 qemuNetType = "socket"
120 qemuNetConfig = launch.QemuValue{
121 "id": {"net0"},
122 "fd": {"3"},
123 }
124 } else {
125 qemuNetType = "user"
126 qemuNetConfig = launch.QemuValue{
127 "id": {"net0"},
128 "net": {"10.42.0.0/24"},
129 "dhcpstart": {"10.42.0.10"},
130 "hostfwd": options.Ports.ToQemuForwards(),
131 }
132 }
133
134 tpmSocketPath := filepath.Join(tempDir, "tpm-socket")
135
136 mac, err := generateRandomEthernetMAC()
137 if err != nil {
138 return err
139 }
140
141 qemuArgs := []string{"-machine", "q35", "-accel", "kvm", "-nographic", "-nodefaults", "-m", "4096",
142 "-cpu", "host", "-smp", "sockets=1,cpus=1,cores=2,threads=2,maxcpus=4",
143 "-drive", "if=pflash,format=raw,readonly,file=external/edk2/OVMF_CODE.fd",
144 "-drive", "if=pflash,format=raw,snapshot=on,file=external/edk2/OVMF_VARS.fd",
145 "-drive", "if=virtio,format=raw,snapshot=on,cache=unsafe,file=metropolis/node/node.img",
146 "-netdev", qemuNetConfig.ToOption(qemuNetType),
147 "-device", "virtio-net-pci,netdev=net0,mac=" + mac.String(),
148 "-chardev", "socket,id=chrtpm,path=" + tpmSocketPath,
149 "-tpmdev", "emulator,id=tpm0,chardev=chrtpm",
150 "-device", "tpm-tis,tpmdev=tpm0",
151 "-device", "virtio-rng-pci",
152 "-serial", "stdio"}
153
154 if !options.AllowReboot {
155 qemuArgs = append(qemuArgs, "-no-reboot")
156 }
157
158 if options.NodeParameters != nil {
159 parametersPath := filepath.Join(tempDir, "parameters.pb")
160 parametersRaw, err := proto.Marshal(options.NodeParameters)
161 if err != nil {
162 return fmt.Errorf("failed to encode node paraeters: %w", err)
163 }
164 if err := ioutil.WriteFile(parametersPath, parametersRaw, 0644); err != nil {
165 return fmt.Errorf("failed to write node parameters: %w", err)
166 }
167 qemuArgs = append(qemuArgs, "-fw_cfg", "name=dev.monogon.metropolis/parameters.pb,file="+parametersPath)
168 }
169
170 // Start TPM emulator as a subprocess
171 tpmCtx, tpmCancel := context.WithCancel(ctx)
172 defer tpmCancel()
173
174 tpmEmuCmd := exec.CommandContext(tpmCtx, "swtpm", "socket", "--tpm2", "--tpmstate", "dir="+tpmTargetDir, "--ctrl", "type=unixio,path="+tpmSocketPath)
175 tpmEmuCmd.Stderr = os.Stderr
176 tpmEmuCmd.Stdout = os.Stdout
177
178 err = tpmEmuCmd.Start()
179 if err != nil {
180 return fmt.Errorf("failed to start TPM emulator: %w", err)
181 }
182
183 // Start the main qemu binary
184 systemCmd := exec.CommandContext(ctx, "qemu-system-x86_64", qemuArgs...)
185 if options.ConnectToSocket != nil {
186 systemCmd.ExtraFiles = []*os.File{options.ConnectToSocket}
187 }
188
189 var stdErrBuf bytes.Buffer
190 systemCmd.Stderr = &stdErrBuf
191 systemCmd.Stdout = options.SerialPort
192
193 err = systemCmd.Run()
194
195 // Stop TPM emulator and wait for it to exit to properly reap the child process
196 tpmCancel()
197 log.Print("Node: Waiting for TPM emulator to exit")
198 // Wait returns a SIGKILL error because we just cancelled its context.
199 // We still need to call it to avoid creating zombies.
200 _ = tpmEmuCmd.Wait()
201 log.Print("Node: TPM emulator done")
202
203 var exerr *exec.ExitError
204 if err != nil && errors.As(err, &exerr) {
205 status := exerr.ProcessState.Sys().(syscall.WaitStatus)
206 if status.Signaled() && status.Signal() == syscall.SIGKILL {
207 // Process was killed externally (most likely by our context being canceled).
208 // This is a normal exit for us, so return nil
209 return nil
210 }
211 exerr.Stderr = stdErrBuf.Bytes()
212 newErr := launch.QEMUError(*exerr)
213 return &newErr
214 }
215 return err
216}
217
218func copyFile(src, dst string) error {
219 in, err := os.Open(src)
220 if err != nil {
221 return fmt.Errorf("when opening source: %w", err)
222 }
223 defer in.Close()
224
225 out, err := os.Create(dst)
226 if err != nil {
227 return fmt.Errorf("when creating destination: %w", err)
228 }
229 defer out.Close()
230
231 _, err = io.Copy(out, in)
232 if err != nil {
233 return fmt.Errorf("when copying file: %w", err)
234 }
235 return out.Close()
236}
237
// generateRandomEthernetMAC generates a random EUI-48 Ethernet MAC address
// that is locally administered (U/L bit set) and unicast (I/G bit clear).
func generateRandomEthernetMAC() (*net.HardwareAddr, error) {
	macBuf := make([]byte, 6)
	_, err := rand.Read(macBuf)
	if err != nil {
		// Wrap with %w (was %v) so callers can unwrap the underlying error.
		return nil, fmt.Errorf("failed to read randomness for MAC: %w", err)
	}

	// Set U/L bit and clear I/G bit (locally administered individual MAC)
	// Ref IEEE 802-2014 Section 8.2.2
	macBuf[0] = (macBuf[0] | 2) & 0xfe
	mac := net.HardwareAddr(macBuf)
	return &mac, nil
}
252
// ClusterPorts contains all ports forwarded by Nanoswitch to the first VM in a
// launched Metropolis cluster. LaunchCluster dials the curator and debug
// services through these forwards.
var ClusterPorts = []uint16{
	node.CuratorServicePort,
	node.DebugServicePort,

	node.KubernetesAPIPort,
}
261
// ClusterOptions contains all options for launching a Metropolis cluster.
type ClusterOptions struct {
	// The number of nodes this cluster should be started with. Must be
	// non-zero; LaunchCluster currently rejects values greater than one as
	// unimplemented.
	NumNodes int
}
267
// Cluster is the running Metropolis cluster launched using the LaunchCluster
// function.
type Cluster struct {
	// Debug is the NodeDebugService gRPC service, allowing for unauthenticated
	// debug access to the cluster.
	Debug apb.NodeDebugServiceClient
	// Management is the Management gRPC service, authenticated as the owner of the
	// cluster.
	Management apb.ManagementClient
	// Owner is the TLS Certificate of the owner of the test cluster. This can be
	// used to authenticate further clients to the running cluster.
	Owner tls.Certificate
	// Ports is the PortMap used to access the first nodes' services (defined in
	// ClusterPorts).
	Ports launch.PortMap

	// nodesDone is a list of channels populated with the return codes from all the
	// nodes' qemu instances. It's used by Close to ensure all nodes have
	// successfully been stopped.
	nodesDone []chan error
	// ctxC is used by Close to cancel the context under which the nodes are
	// running.
	ctxC context.CancelFunc
}
292
293// LaunchCluster launches a cluster of Metropolis node VMs together with a
294// Nanoswitch instance to network them all together.
295//
296// The given context will be used to run all qemu instances in the cluster, and
297// canceling the context or calling Close() will terminate them.
298func LaunchCluster(ctx context.Context, opts ClusterOptions) (*Cluster, error) {
299 if opts.NumNodes == 0 {
300 return nil, errors.New("refusing to start cluster with zero nodes")
301 }
302 if opts.NumNodes > 1 {
303 return nil, errors.New("unimplemented")
304 }
305
306 ctxT, ctxC := context.WithCancel(ctx)
307
308 // Prepare links between nodes and nanoswitch.
309 var switchPorts []*os.File
310 var vmPorts []*os.File
311 for i := 0; i < opts.NumNodes; i++ {
312 switchPort, vmPort, err := launch.NewSocketPair()
313 if err != nil {
314 ctxC()
315 return nil, fmt.Errorf("failed to get socketpair: %w", err)
316 }
317 switchPorts = append(switchPorts, switchPort)
318 vmPorts = append(vmPorts, vmPort)
319 }
320
321 // Make a list of channels that will be populated and closed by all running node
322 // qemu processes.
323 done := make([]chan error, opts.NumNodes)
324 for i, _ := range done {
325 done[i] = make(chan error, 1)
326 }
327
328 // Start first node.
329 go func() {
330 err := LaunchNode(ctxT, NodeOptions{
331 ConnectToSocket: vmPorts[0],
332 NodeParameters: &apb.NodeParameters{
333 Cluster: &apb.NodeParameters_ClusterBootstrap_{
334 ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
335 OwnerPublicKey: InsecurePublicKey,
336 },
337 },
338 },
339 })
340 done[0] <- err
341 }()
342
343 portMap, err := launch.ConflictFreePortMap(ClusterPorts)
344 if err != nil {
345 ctxC()
346 return nil, fmt.Errorf("failed to allocate ephemeral ports: %w", err)
347 }
348
349 go func() {
350 if err := launch.RunMicroVM(ctxT, &launch.MicroVMOptions{
351 KernelPath: "metropolis/test/ktest/vmlinux",
352 InitramfsPath: "metropolis/test/nanoswitch/initramfs.lz4",
353 ExtraNetworkInterfaces: switchPorts,
354 PortMap: portMap,
355 }); err != nil {
356 if !errors.Is(err, ctxT.Err()) {
357 log.Printf("Failed to launch nanoswitch: %v", err)
358 }
359 }
360 }()
361
362 // Dial debug service.
363 copts := []grpcretry.CallOption{
364 grpcretry.WithBackoff(grpcretry.BackoffExponential(100 * time.Millisecond)),
365 }
366 debugConn, err := portMap.DialGRPC(node.DebugServicePort, grpc.WithInsecure(),
367 grpc.WithUnaryInterceptor(grpcretry.UnaryClientInterceptor(copts...)))
368 if err != nil {
369 ctxC()
370 return nil, fmt.Errorf("failed to dial debug service: %w", err)
371 }
372
373 // Dial external service.
374 remote := fmt.Sprintf("localhost:%v", portMap[node.CuratorServicePort])
375 initClient, err := rpc.NewEphemeralClient(remote, InsecurePrivateKey, nil)
376 if err != nil {
377 ctxC()
378 return nil, fmt.Errorf("NewInitialClient: %w", err)
379 }
380
381 // Retrieve owner certificate - this can take a while because the node is still
382 // coming up, so do it in a backoff loop.
383 log.Printf("Cluster: retrieving owner certificate...")
384 aaa := apb.NewAAAClient(initClient)
385 var cert *tls.Certificate
386 err = backoff.Retry(func() error {
387 cert, err = rpc.RetrieveOwnerCertificate(ctxT, aaa, InsecurePrivateKey)
388 return err
389 }, backoff.WithContext(backoff.NewExponentialBackOff(), ctxT))
390 if err != nil {
391 ctxC()
392 return nil, err
393 }
394 log.Printf("Cluster: retrieved owner certificate.")
395
396 authClient, err := rpc.NewAuthenticatedClient(remote, *cert, nil)
397 if err != nil {
398 ctxC()
399 return nil, fmt.Errorf("NewAuthenticatedClient: %w", err)
400 }
401
402 return &Cluster{
403 Debug: apb.NewNodeDebugServiceClient(debugConn),
404 Management: apb.NewManagementClient(authClient),
405 Owner: *cert,
406 Ports: portMap,
407 nodesDone: done,
408
409 ctxC: ctxC,
410 }, nil
411}
412
413// Close cancels the running clusters' context and waits for all virtualized
414// nodes to stop. It returns an error if stopping the nodes failed, or one of
415// the nodes failed to fully start in the first place.
416func (c *Cluster) Close() error {
417 log.Printf("Cluster: stopping...")
418 c.ctxC()
419
420 var errors []error
421 log.Printf("Cluster: waiting for nodes to exit...")
422 for _, c := range c.nodesDone {
423 err := <-c
424 if err != nil {
425 errors = append(errors, err)
426 }
427 }
428 log.Printf("Cluster: done")
429 return multierr.Combine(errors...)
430}