blob: a4f11a24a2e0204aed61da62b2fb53365b80844d [file] [log] [blame]
Serge Bazanski66e58952021-10-05 17:06:56 +02001// cluster builds on the launch package and implements launching Metropolis
2// nodes and clusters in a virtualized environment using qemu. It's kept in a
3// separate package as it depends on a Metropolis node image, which might not be
4// required for some use of the launch library.
5package cluster
6
7import (
8 "bytes"
9 "context"
10 "crypto/rand"
11 "crypto/tls"
12 "errors"
13 "fmt"
14 "io"
15 "io/ioutil"
16 "log"
17 "net"
18 "os"
19 "os/exec"
20 "path/filepath"
21 "syscall"
22 "time"
23
24 "github.com/cenkalti/backoff/v4"
25 grpcretry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
26 "go.uber.org/multierr"
27 "google.golang.org/grpc"
28 "google.golang.org/protobuf/proto"
29
30 "source.monogon.dev/metropolis/node"
31 "source.monogon.dev/metropolis/node/core/rpc"
32 apb "source.monogon.dev/metropolis/proto/api"
33 "source.monogon.dev/metropolis/test/launch"
34)
35
// NodeOptions contains all options that can be passed to LaunchNode().
type NodeOptions struct {
	// Ports contains the port mapping where to expose the internal ports of the VM to
	// the host. See IdentityPortMap() and ConflictFreePortMap(). Ignored when
	// ConnectToSocket is set.
	Ports launch.PortMap

	// If set to true, reboots are honored. Otherwise all reboots exit the Launch()
	// command. Metropolis nodes generally restart on almost all errors, so unless you
	// want to test reboot behavior this should be false.
	AllowReboot bool

	// By default the VM is connected to the Host via SLIRP. If ConnectToSocket is set,
	// it is instead connected to the given file descriptor/socket. If this is set, all
	// port maps from the Ports option are ignored. Intended for networking this
	// instance together with others for running more complex network configurations.
	ConnectToSocket *os.File

	// SerialPort is an io.ReadWriter over which you can communicate with the serial
	// port of the machine. It can be set to an existing file descriptor (like
	// os.Stdout/os.Stderr) or any Go structure implementing this interface.
	SerialPort io.ReadWriter

	// NodeParameters is passed into the VM and subsequently used for bootstrapping or
	// registering into a cluster.
	NodeParameters *apb.NodeParameters
}
63
64// NodePorts is the list of ports a fully operational Metropolis node listens on
Serge Bazanski52304a82021-10-29 16:56:18 +020065var NodePorts = []node.Port{
Serge Bazanski66e58952021-10-05 17:06:56 +020066 node.ConsensusPort,
67
68 node.CuratorServicePort,
69 node.DebugServicePort,
70
71 node.KubernetesAPIPort,
Serge Bazanski66e58952021-10-05 17:06:56 +020072 node.CuratorServicePort,
73 node.DebuggerPort,
74}
75
76// LaunchNode launches a single Metropolis node instance with the given options.
77// The instance runs mostly paravirtualized but with some emulated hardware
78// similar to how a cloud provider might set up its VMs. The disk is fully
79// writable but is run in snapshot mode meaning that changes are not kept beyond
80// a single invocation.
81func LaunchNode(ctx context.Context, options NodeOptions) error {
82 // Pin temp directory to /tmp until we can use abstract socket namespace in QEMU
83 // (next release after 5.0,
84 // https://github.com/qemu/qemu/commit/776b97d3605ed0fc94443048fdf988c7725e38a9).
85 // swtpm accepts already-open FDs so we can pass in an abstract socket namespace FD
86 // that we open and pass the name of it to QEMU. Not pinning this crashes both
87 // swtpm and qemu because we run into UNIX socket length limitations (for legacy
88 // reasons 108 chars).
89 tempDir, err := ioutil.TempDir("/tmp", "launch*")
90 if err != nil {
91 return fmt.Errorf("failed to create temporary directory: %w", err)
92 }
93 defer os.RemoveAll(tempDir)
94
95 // Copy TPM state into a temporary directory since it's being modified by the
96 // emulator
97 tpmTargetDir := filepath.Join(tempDir, "tpm")
98 tpmSrcDir := "metropolis/node/tpm"
99 if err := os.Mkdir(tpmTargetDir, 0755); err != nil {
100 return fmt.Errorf("failed to create TPM state directory: %w", err)
101 }
102 tpmFiles, err := ioutil.ReadDir(tpmSrcDir)
103 if err != nil {
104 return fmt.Errorf("failed to read TPM directory: %w", err)
105 }
106 for _, file := range tpmFiles {
107 name := file.Name()
108 src := filepath.Join(tpmSrcDir, name)
109 target := filepath.Join(tpmTargetDir, name)
110 if err := copyFile(src, target); err != nil {
111 return fmt.Errorf("failed to copy TPM directory: file %q to %q: %w", src, target, err)
112 }
113 }
114
115 var qemuNetType string
116 var qemuNetConfig launch.QemuValue
117 if options.ConnectToSocket != nil {
118 qemuNetType = "socket"
119 qemuNetConfig = launch.QemuValue{
120 "id": {"net0"},
121 "fd": {"3"},
122 }
123 } else {
124 qemuNetType = "user"
125 qemuNetConfig = launch.QemuValue{
126 "id": {"net0"},
127 "net": {"10.42.0.0/24"},
128 "dhcpstart": {"10.42.0.10"},
129 "hostfwd": options.Ports.ToQemuForwards(),
130 }
131 }
132
133 tpmSocketPath := filepath.Join(tempDir, "tpm-socket")
134
135 mac, err := generateRandomEthernetMAC()
136 if err != nil {
137 return err
138 }
139
140 qemuArgs := []string{"-machine", "q35", "-accel", "kvm", "-nographic", "-nodefaults", "-m", "4096",
141 "-cpu", "host", "-smp", "sockets=1,cpus=1,cores=2,threads=2,maxcpus=4",
142 "-drive", "if=pflash,format=raw,readonly,file=external/edk2/OVMF_CODE.fd",
143 "-drive", "if=pflash,format=raw,snapshot=on,file=external/edk2/OVMF_VARS.fd",
144 "-drive", "if=virtio,format=raw,snapshot=on,cache=unsafe,file=metropolis/node/node.img",
145 "-netdev", qemuNetConfig.ToOption(qemuNetType),
146 "-device", "virtio-net-pci,netdev=net0,mac=" + mac.String(),
147 "-chardev", "socket,id=chrtpm,path=" + tpmSocketPath,
148 "-tpmdev", "emulator,id=tpm0,chardev=chrtpm",
149 "-device", "tpm-tis,tpmdev=tpm0",
150 "-device", "virtio-rng-pci",
151 "-serial", "stdio"}
152
153 if !options.AllowReboot {
154 qemuArgs = append(qemuArgs, "-no-reboot")
155 }
156
157 if options.NodeParameters != nil {
158 parametersPath := filepath.Join(tempDir, "parameters.pb")
159 parametersRaw, err := proto.Marshal(options.NodeParameters)
160 if err != nil {
161 return fmt.Errorf("failed to encode node paraeters: %w", err)
162 }
163 if err := ioutil.WriteFile(parametersPath, parametersRaw, 0644); err != nil {
164 return fmt.Errorf("failed to write node parameters: %w", err)
165 }
166 qemuArgs = append(qemuArgs, "-fw_cfg", "name=dev.monogon.metropolis/parameters.pb,file="+parametersPath)
167 }
168
169 // Start TPM emulator as a subprocess
170 tpmCtx, tpmCancel := context.WithCancel(ctx)
171 defer tpmCancel()
172
173 tpmEmuCmd := exec.CommandContext(tpmCtx, "swtpm", "socket", "--tpm2", "--tpmstate", "dir="+tpmTargetDir, "--ctrl", "type=unixio,path="+tpmSocketPath)
174 tpmEmuCmd.Stderr = os.Stderr
175 tpmEmuCmd.Stdout = os.Stdout
176
177 err = tpmEmuCmd.Start()
178 if err != nil {
179 return fmt.Errorf("failed to start TPM emulator: %w", err)
180 }
181
182 // Start the main qemu binary
183 systemCmd := exec.CommandContext(ctx, "qemu-system-x86_64", qemuArgs...)
184 if options.ConnectToSocket != nil {
185 systemCmd.ExtraFiles = []*os.File{options.ConnectToSocket}
186 }
187
188 var stdErrBuf bytes.Buffer
189 systemCmd.Stderr = &stdErrBuf
190 systemCmd.Stdout = options.SerialPort
191
192 err = systemCmd.Run()
193
194 // Stop TPM emulator and wait for it to exit to properly reap the child process
195 tpmCancel()
196 log.Print("Node: Waiting for TPM emulator to exit")
197 // Wait returns a SIGKILL error because we just cancelled its context.
198 // We still need to call it to avoid creating zombies.
199 _ = tpmEmuCmd.Wait()
200 log.Print("Node: TPM emulator done")
201
202 var exerr *exec.ExitError
203 if err != nil && errors.As(err, &exerr) {
204 status := exerr.ProcessState.Sys().(syscall.WaitStatus)
205 if status.Signaled() && status.Signal() == syscall.SIGKILL {
206 // Process was killed externally (most likely by our context being canceled).
207 // This is a normal exit for us, so return nil
208 return nil
209 }
210 exerr.Stderr = stdErrBuf.Bytes()
211 newErr := launch.QEMUError(*exerr)
212 return &newErr
213 }
214 return err
215}
216
217func copyFile(src, dst string) error {
218 in, err := os.Open(src)
219 if err != nil {
220 return fmt.Errorf("when opening source: %w", err)
221 }
222 defer in.Close()
223
224 out, err := os.Create(dst)
225 if err != nil {
226 return fmt.Errorf("when creating destination: %w", err)
227 }
228 defer out.Close()
229
230 _, err = io.Copy(out, in)
231 if err != nil {
232 return fmt.Errorf("when copying file: %w", err)
233 }
234 return out.Close()
235}
236
// generateRandomEthernetMAC returns a randomly generated EUI-48 Ethernet MAC
// address, flagged as locally administered and unicast.
func generateRandomEthernetMAC() (*net.HardwareAddr, error) {
	raw := make([]byte, 6)
	if _, err := rand.Read(raw); err != nil {
		return nil, fmt.Errorf("failed to read randomness for MAC: %v", err)
	}

	// Set U/L bit and clear I/G bit (locally administered individual MAC)
	// Ref IEEE 802-2014 Section 8.2.2
	raw[0] = (raw[0] | 2) & 0xfe
	addr := net.HardwareAddr(raw)
	return &addr, nil
}
251
// ClusterPorts contains all ports forwarded by Nanoswitch to the first VM in a
// launched Metropolis cluster. These are the services reachable from the host
// via the PortMap returned by LaunchCluster.
var ClusterPorts = []node.Port{
	node.CuratorServicePort,
	node.DebugServicePort,

	node.KubernetesAPIPort,
}
260
// ClusterOptions contains all options for launching a Metropolis cluster.
type ClusterOptions struct {
	// NumNodes is the number of nodes this cluster should be started with.
	// Currently only a single node is supported (see LaunchCluster).
	NumNodes int
}
266
// Cluster is the running Metropolis cluster launched using the LaunchCluster
// function.
type Cluster struct {
	// Debug is the NodeDebugService gRPC service, allowing for unauthenticated
	// debug access to the cluster.
	Debug apb.NodeDebugServiceClient
	// Management is the Management gRPC service, authenticated as the owner of the
	// cluster.
	Management apb.ManagementClient
	// Owner is the TLS Certificate of the owner of the test cluster. This can be
	// used to authenticate further clients to the running cluster.
	Owner tls.Certificate
	// Ports is the PortMap used to access the first nodes' services (defined in
	// ClusterPorts).
	Ports launch.PortMap

	// nodesDone is a list of channels populated with the return codes from all the
	// nodes' qemu instances. It's used by Close to ensure all nodes have
	// successfully been stopped.
	nodesDone []chan error
	// ctxC is used by Close to cancel the context under which the nodes are
	// running.
	ctxC context.CancelFunc
}
291
292// LaunchCluster launches a cluster of Metropolis node VMs together with a
293// Nanoswitch instance to network them all together.
294//
295// The given context will be used to run all qemu instances in the cluster, and
296// canceling the context or calling Close() will terminate them.
297func LaunchCluster(ctx context.Context, opts ClusterOptions) (*Cluster, error) {
298 if opts.NumNodes == 0 {
299 return nil, errors.New("refusing to start cluster with zero nodes")
300 }
301 if opts.NumNodes > 1 {
302 return nil, errors.New("unimplemented")
303 }
304
305 ctxT, ctxC := context.WithCancel(ctx)
306
307 // Prepare links between nodes and nanoswitch.
308 var switchPorts []*os.File
309 var vmPorts []*os.File
310 for i := 0; i < opts.NumNodes; i++ {
311 switchPort, vmPort, err := launch.NewSocketPair()
312 if err != nil {
313 ctxC()
314 return nil, fmt.Errorf("failed to get socketpair: %w", err)
315 }
316 switchPorts = append(switchPorts, switchPort)
317 vmPorts = append(vmPorts, vmPort)
318 }
319
320 // Make a list of channels that will be populated and closed by all running node
321 // qemu processes.
322 done := make([]chan error, opts.NumNodes)
323 for i, _ := range done {
324 done[i] = make(chan error, 1)
325 }
326
327 // Start first node.
328 go func() {
329 err := LaunchNode(ctxT, NodeOptions{
330 ConnectToSocket: vmPorts[0],
331 NodeParameters: &apb.NodeParameters{
332 Cluster: &apb.NodeParameters_ClusterBootstrap_{
333 ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
334 OwnerPublicKey: InsecurePublicKey,
335 },
336 },
337 },
338 })
339 done[0] <- err
340 }()
341
342 portMap, err := launch.ConflictFreePortMap(ClusterPorts)
343 if err != nil {
344 ctxC()
345 return nil, fmt.Errorf("failed to allocate ephemeral ports: %w", err)
346 }
347
348 go func() {
349 if err := launch.RunMicroVM(ctxT, &launch.MicroVMOptions{
350 KernelPath: "metropolis/test/ktest/vmlinux",
351 InitramfsPath: "metropolis/test/nanoswitch/initramfs.lz4",
352 ExtraNetworkInterfaces: switchPorts,
353 PortMap: portMap,
354 }); err != nil {
355 if !errors.Is(err, ctxT.Err()) {
356 log.Printf("Failed to launch nanoswitch: %v", err)
357 }
358 }
359 }()
360
361 // Dial debug service.
362 copts := []grpcretry.CallOption{
363 grpcretry.WithBackoff(grpcretry.BackoffExponential(100 * time.Millisecond)),
364 }
365 debugConn, err := portMap.DialGRPC(node.DebugServicePort, grpc.WithInsecure(),
366 grpc.WithUnaryInterceptor(grpcretry.UnaryClientInterceptor(copts...)))
367 if err != nil {
368 ctxC()
369 return nil, fmt.Errorf("failed to dial debug service: %w", err)
370 }
371
372 // Dial external service.
373 remote := fmt.Sprintf("localhost:%v", portMap[node.CuratorServicePort])
374 initClient, err := rpc.NewEphemeralClient(remote, InsecurePrivateKey, nil)
375 if err != nil {
376 ctxC()
377 return nil, fmt.Errorf("NewInitialClient: %w", err)
378 }
379
380 // Retrieve owner certificate - this can take a while because the node is still
381 // coming up, so do it in a backoff loop.
382 log.Printf("Cluster: retrieving owner certificate...")
383 aaa := apb.NewAAAClient(initClient)
384 var cert *tls.Certificate
385 err = backoff.Retry(func() error {
386 cert, err = rpc.RetrieveOwnerCertificate(ctxT, aaa, InsecurePrivateKey)
387 return err
388 }, backoff.WithContext(backoff.NewExponentialBackOff(), ctxT))
389 if err != nil {
390 ctxC()
391 return nil, err
392 }
393 log.Printf("Cluster: retrieved owner certificate.")
394
395 authClient, err := rpc.NewAuthenticatedClient(remote, *cert, nil)
396 if err != nil {
397 ctxC()
398 return nil, fmt.Errorf("NewAuthenticatedClient: %w", err)
399 }
400
401 return &Cluster{
402 Debug: apb.NewNodeDebugServiceClient(debugConn),
403 Management: apb.NewManagementClient(authClient),
404 Owner: *cert,
405 Ports: portMap,
406 nodesDone: done,
407
408 ctxC: ctxC,
409 }, nil
410}
411
412// Close cancels the running clusters' context and waits for all virtualized
413// nodes to stop. It returns an error if stopping the nodes failed, or one of
414// the nodes failed to fully start in the first place.
415func (c *Cluster) Close() error {
416 log.Printf("Cluster: stopping...")
417 c.ctxC()
418
419 var errors []error
420 log.Printf("Cluster: waiting for nodes to exit...")
421 for _, c := range c.nodesDone {
422 err := <-c
423 if err != nil {
424 errors = append(errors, err)
425 }
426 }
427 log.Printf("Cluster: done")
428 return multierr.Combine(errors...)
429}