blob: 7af1f55bf1fa97e0af2688116d54170b682b0022 [file] [log] [blame]
Serge Bazanski66e58952021-10-05 17:06:56 +02001// cluster builds on the launch package and implements launching Metropolis
2// nodes and clusters in a virtualized environment using qemu. It's kept in a
3// separate package as it depends on a Metropolis node image, which might not be
// required for some uses of the launch library.
5package cluster
6
7import (
8 "bytes"
9 "context"
10 "crypto/rand"
11 "crypto/tls"
12 "errors"
13 "fmt"
14 "io"
Serge Bazanski66e58952021-10-05 17:06:56 +020015 "log"
16 "net"
17 "os"
18 "os/exec"
19 "path/filepath"
20 "syscall"
21 "time"
22
23 "github.com/cenkalti/backoff/v4"
24 grpcretry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
25 "go.uber.org/multierr"
26 "google.golang.org/grpc"
27 "google.golang.org/protobuf/proto"
28
29 "source.monogon.dev/metropolis/node"
30 "source.monogon.dev/metropolis/node/core/rpc"
31 apb "source.monogon.dev/metropolis/proto/api"
32 "source.monogon.dev/metropolis/test/launch"
33)
34
// NodeOptions contains all options that can be passed to LaunchNode().
type NodeOptions struct {
	// Ports contains the port mapping where to expose the internal ports of the VM to
	// the host. See IdentityPortMap() and ConflictFreePortMap(). Ignored when
	// ConnectToSocket is set.
	Ports launch.PortMap

	// If set to true, reboots are honored. Otherwise all reboots exit the Launch()
	// command. Metropolis nodes generally restart on almost all errors, so unless you
	// want to test reboot behavior this should be false.
	AllowReboot bool

	// By default the VM is connected to the Host via SLIRP. If ConnectToSocket is set,
	// it is instead connected to the given file descriptor/socket. If this is set, all
	// port maps from the Ports option are ignored. Intended for networking this
	// instance together with others for running more complex network configurations.
	ConnectToSocket *os.File

	// SerialPort is an io.ReadWriter over which you can communicate with the serial
	// port of the machine. It can be set to an existing file descriptor (like
	// os.Stdout/os.Stderr) or any Go structure implementing this interface.
	SerialPort io.ReadWriter

	// NodeParameters is passed into the VM and subsequently used for bootstrapping or
	// registering into a cluster.
	NodeParameters *apb.NodeParameters
}
62
63// NodePorts is the list of ports a fully operational Metropolis node listens on
Serge Bazanski52304a82021-10-29 16:56:18 +020064var NodePorts = []node.Port{
Serge Bazanski66e58952021-10-05 17:06:56 +020065 node.ConsensusPort,
66
67 node.CuratorServicePort,
68 node.DebugServicePort,
69
70 node.KubernetesAPIPort,
Serge Bazanski66e58952021-10-05 17:06:56 +020071 node.CuratorServicePort,
72 node.DebuggerPort,
73}
74
75// LaunchNode launches a single Metropolis node instance with the given options.
76// The instance runs mostly paravirtualized but with some emulated hardware
77// similar to how a cloud provider might set up its VMs. The disk is fully
78// writable but is run in snapshot mode meaning that changes are not kept beyond
79// a single invocation.
80func LaunchNode(ctx context.Context, options NodeOptions) error {
81 // Pin temp directory to /tmp until we can use abstract socket namespace in QEMU
82 // (next release after 5.0,
83 // https://github.com/qemu/qemu/commit/776b97d3605ed0fc94443048fdf988c7725e38a9).
84 // swtpm accepts already-open FDs so we can pass in an abstract socket namespace FD
85 // that we open and pass the name of it to QEMU. Not pinning this crashes both
86 // swtpm and qemu because we run into UNIX socket length limitations (for legacy
87 // reasons 108 chars).
Lorenz Brun764a2de2021-11-22 16:26:36 +010088 tempDir, err := os.MkdirTemp("/tmp", "launch*")
Serge Bazanski66e58952021-10-05 17:06:56 +020089 if err != nil {
90 return fmt.Errorf("failed to create temporary directory: %w", err)
91 }
92 defer os.RemoveAll(tempDir)
93
94 // Copy TPM state into a temporary directory since it's being modified by the
95 // emulator
96 tpmTargetDir := filepath.Join(tempDir, "tpm")
97 tpmSrcDir := "metropolis/node/tpm"
98 if err := os.Mkdir(tpmTargetDir, 0755); err != nil {
99 return fmt.Errorf("failed to create TPM state directory: %w", err)
100 }
Lorenz Brun764a2de2021-11-22 16:26:36 +0100101 tpmFiles, err := os.ReadDir(tpmSrcDir)
Serge Bazanski66e58952021-10-05 17:06:56 +0200102 if err != nil {
103 return fmt.Errorf("failed to read TPM directory: %w", err)
104 }
105 for _, file := range tpmFiles {
106 name := file.Name()
107 src := filepath.Join(tpmSrcDir, name)
108 target := filepath.Join(tpmTargetDir, name)
109 if err := copyFile(src, target); err != nil {
110 return fmt.Errorf("failed to copy TPM directory: file %q to %q: %w", src, target, err)
111 }
112 }
113
114 var qemuNetType string
115 var qemuNetConfig launch.QemuValue
116 if options.ConnectToSocket != nil {
117 qemuNetType = "socket"
118 qemuNetConfig = launch.QemuValue{
119 "id": {"net0"},
120 "fd": {"3"},
121 }
122 } else {
123 qemuNetType = "user"
124 qemuNetConfig = launch.QemuValue{
125 "id": {"net0"},
126 "net": {"10.42.0.0/24"},
127 "dhcpstart": {"10.42.0.10"},
128 "hostfwd": options.Ports.ToQemuForwards(),
129 }
130 }
131
132 tpmSocketPath := filepath.Join(tempDir, "tpm-socket")
133
134 mac, err := generateRandomEthernetMAC()
135 if err != nil {
136 return err
137 }
138
139 qemuArgs := []string{"-machine", "q35", "-accel", "kvm", "-nographic", "-nodefaults", "-m", "4096",
140 "-cpu", "host", "-smp", "sockets=1,cpus=1,cores=2,threads=2,maxcpus=4",
141 "-drive", "if=pflash,format=raw,readonly,file=external/edk2/OVMF_CODE.fd",
142 "-drive", "if=pflash,format=raw,snapshot=on,file=external/edk2/OVMF_VARS.fd",
143 "-drive", "if=virtio,format=raw,snapshot=on,cache=unsafe,file=metropolis/node/node.img",
144 "-netdev", qemuNetConfig.ToOption(qemuNetType),
145 "-device", "virtio-net-pci,netdev=net0,mac=" + mac.String(),
146 "-chardev", "socket,id=chrtpm,path=" + tpmSocketPath,
147 "-tpmdev", "emulator,id=tpm0,chardev=chrtpm",
148 "-device", "tpm-tis,tpmdev=tpm0",
149 "-device", "virtio-rng-pci",
150 "-serial", "stdio"}
151
152 if !options.AllowReboot {
153 qemuArgs = append(qemuArgs, "-no-reboot")
154 }
155
156 if options.NodeParameters != nil {
157 parametersPath := filepath.Join(tempDir, "parameters.pb")
158 parametersRaw, err := proto.Marshal(options.NodeParameters)
159 if err != nil {
160 return fmt.Errorf("failed to encode node paraeters: %w", err)
161 }
Lorenz Brun764a2de2021-11-22 16:26:36 +0100162 if err := os.WriteFile(parametersPath, parametersRaw, 0644); err != nil {
Serge Bazanski66e58952021-10-05 17:06:56 +0200163 return fmt.Errorf("failed to write node parameters: %w", err)
164 }
165 qemuArgs = append(qemuArgs, "-fw_cfg", "name=dev.monogon.metropolis/parameters.pb,file="+parametersPath)
166 }
167
168 // Start TPM emulator as a subprocess
169 tpmCtx, tpmCancel := context.WithCancel(ctx)
170 defer tpmCancel()
171
172 tpmEmuCmd := exec.CommandContext(tpmCtx, "swtpm", "socket", "--tpm2", "--tpmstate", "dir="+tpmTargetDir, "--ctrl", "type=unixio,path="+tpmSocketPath)
173 tpmEmuCmd.Stderr = os.Stderr
174 tpmEmuCmd.Stdout = os.Stdout
175
176 err = tpmEmuCmd.Start()
177 if err != nil {
178 return fmt.Errorf("failed to start TPM emulator: %w", err)
179 }
180
181 // Start the main qemu binary
182 systemCmd := exec.CommandContext(ctx, "qemu-system-x86_64", qemuArgs...)
183 if options.ConnectToSocket != nil {
184 systemCmd.ExtraFiles = []*os.File{options.ConnectToSocket}
185 }
186
187 var stdErrBuf bytes.Buffer
188 systemCmd.Stderr = &stdErrBuf
189 systemCmd.Stdout = options.SerialPort
190
191 err = systemCmd.Run()
192
193 // Stop TPM emulator and wait for it to exit to properly reap the child process
194 tpmCancel()
195 log.Print("Node: Waiting for TPM emulator to exit")
196 // Wait returns a SIGKILL error because we just cancelled its context.
197 // We still need to call it to avoid creating zombies.
198 _ = tpmEmuCmd.Wait()
199 log.Print("Node: TPM emulator done")
200
201 var exerr *exec.ExitError
202 if err != nil && errors.As(err, &exerr) {
203 status := exerr.ProcessState.Sys().(syscall.WaitStatus)
204 if status.Signaled() && status.Signal() == syscall.SIGKILL {
205 // Process was killed externally (most likely by our context being canceled).
206 // This is a normal exit for us, so return nil
207 return nil
208 }
209 exerr.Stderr = stdErrBuf.Bytes()
210 newErr := launch.QEMUError(*exerr)
211 return &newErr
212 }
213 return err
214}
215
216func copyFile(src, dst string) error {
217 in, err := os.Open(src)
218 if err != nil {
219 return fmt.Errorf("when opening source: %w", err)
220 }
221 defer in.Close()
222
223 out, err := os.Create(dst)
224 if err != nil {
225 return fmt.Errorf("when creating destination: %w", err)
226 }
227 defer out.Close()
228
229 _, err = io.Copy(out, in)
230 if err != nil {
231 return fmt.Errorf("when copying file: %w", err)
232 }
233 return out.Close()
234}
235
// generateRandomEthernetMAC returns a random locally-administered unicast
// EUI-48 Ethernet MAC address.
func generateRandomEthernetMAC() (*net.HardwareAddr, error) {
	macBuf := make([]byte, 6)
	if _, err := rand.Read(macBuf); err != nil {
		return nil, fmt.Errorf("failed to read randomness for MAC: %w", err)
	}

	// Set U/L bit and clear I/G bit (locally administered individual MAC).
	// Ref IEEE 802-2014 Section 8.2.2.
	macBuf[0] = (macBuf[0] | 2) & 0xfe
	mac := net.HardwareAddr(macBuf)
	return &mac, nil
}
250
// ClusterPorts contains all ports forwarded by Nanoswitch to the first VM in a
// launched Metropolis cluster.
var ClusterPorts = []node.Port{
	node.CuratorServicePort,
	node.DebugServicePort,

	node.KubernetesAPIPort,
}
259
// ClusterOptions contains all options for launching a Metropolis cluster.
type ClusterOptions struct {
	// NumNodes is the number of nodes this cluster should be started with.
	// Must be non-zero; currently only single-node clusters are implemented.
	NumNodes int
}
265
// Cluster is the running Metropolis cluster launched using the LaunchCluster
// function.
type Cluster struct {
	// Debug is the NodeDebugService gRPC service, allowing for unauthenticated
	// debug access to the cluster.
	Debug apb.NodeDebugServiceClient
	// Management is the Management gRPC service, authenticated as the owner of the
	// cluster.
	Management apb.ManagementClient
	// Owner is the TLS Certificate of the owner of the test cluster. This can be
	// used to authenticate further clients to the running cluster.
	Owner tls.Certificate
	// Ports is the PortMap used to access the first nodes' services (defined in
	// ClusterPorts).
	Ports launch.PortMap

	// nodesDone is a list of channels populated with the return codes from all the
	// nodes' qemu instances. It's used by Close to ensure all nodes have
	// successfully been stopped.
	nodesDone []chan error
	// ctxC is used by Close to cancel the context under which the nodes are
	// running.
	ctxC context.CancelFunc
}
290
291// LaunchCluster launches a cluster of Metropolis node VMs together with a
292// Nanoswitch instance to network them all together.
293//
294// The given context will be used to run all qemu instances in the cluster, and
295// canceling the context or calling Close() will terminate them.
296func LaunchCluster(ctx context.Context, opts ClusterOptions) (*Cluster, error) {
297 if opts.NumNodes == 0 {
298 return nil, errors.New("refusing to start cluster with zero nodes")
299 }
300 if opts.NumNodes > 1 {
301 return nil, errors.New("unimplemented")
302 }
303
304 ctxT, ctxC := context.WithCancel(ctx)
305
306 // Prepare links between nodes and nanoswitch.
307 var switchPorts []*os.File
308 var vmPorts []*os.File
309 for i := 0; i < opts.NumNodes; i++ {
310 switchPort, vmPort, err := launch.NewSocketPair()
311 if err != nil {
312 ctxC()
313 return nil, fmt.Errorf("failed to get socketpair: %w", err)
314 }
315 switchPorts = append(switchPorts, switchPort)
316 vmPorts = append(vmPorts, vmPort)
317 }
318
319 // Make a list of channels that will be populated and closed by all running node
320 // qemu processes.
321 done := make([]chan error, opts.NumNodes)
322 for i, _ := range done {
323 done[i] = make(chan error, 1)
324 }
325
326 // Start first node.
327 go func() {
328 err := LaunchNode(ctxT, NodeOptions{
329 ConnectToSocket: vmPorts[0],
330 NodeParameters: &apb.NodeParameters{
331 Cluster: &apb.NodeParameters_ClusterBootstrap_{
332 ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
333 OwnerPublicKey: InsecurePublicKey,
334 },
335 },
336 },
337 })
338 done[0] <- err
339 }()
340
341 portMap, err := launch.ConflictFreePortMap(ClusterPorts)
342 if err != nil {
343 ctxC()
344 return nil, fmt.Errorf("failed to allocate ephemeral ports: %w", err)
345 }
346
347 go func() {
348 if err := launch.RunMicroVM(ctxT, &launch.MicroVMOptions{
349 KernelPath: "metropolis/test/ktest/vmlinux",
350 InitramfsPath: "metropolis/test/nanoswitch/initramfs.lz4",
351 ExtraNetworkInterfaces: switchPorts,
352 PortMap: portMap,
353 }); err != nil {
354 if !errors.Is(err, ctxT.Err()) {
355 log.Printf("Failed to launch nanoswitch: %v", err)
356 }
357 }
358 }()
359
360 // Dial debug service.
361 copts := []grpcretry.CallOption{
362 grpcretry.WithBackoff(grpcretry.BackoffExponential(100 * time.Millisecond)),
363 }
364 debugConn, err := portMap.DialGRPC(node.DebugServicePort, grpc.WithInsecure(),
365 grpc.WithUnaryInterceptor(grpcretry.UnaryClientInterceptor(copts...)))
366 if err != nil {
367 ctxC()
368 return nil, fmt.Errorf("failed to dial debug service: %w", err)
369 }
370
371 // Dial external service.
372 remote := fmt.Sprintf("localhost:%v", portMap[node.CuratorServicePort])
373 initClient, err := rpc.NewEphemeralClient(remote, InsecurePrivateKey, nil)
374 if err != nil {
375 ctxC()
376 return nil, fmt.Errorf("NewInitialClient: %w", err)
377 }
378
379 // Retrieve owner certificate - this can take a while because the node is still
380 // coming up, so do it in a backoff loop.
381 log.Printf("Cluster: retrieving owner certificate...")
382 aaa := apb.NewAAAClient(initClient)
383 var cert *tls.Certificate
384 err = backoff.Retry(func() error {
385 cert, err = rpc.RetrieveOwnerCertificate(ctxT, aaa, InsecurePrivateKey)
386 return err
387 }, backoff.WithContext(backoff.NewExponentialBackOff(), ctxT))
388 if err != nil {
389 ctxC()
390 return nil, err
391 }
392 log.Printf("Cluster: retrieved owner certificate.")
393
394 authClient, err := rpc.NewAuthenticatedClient(remote, *cert, nil)
395 if err != nil {
396 ctxC()
397 return nil, fmt.Errorf("NewAuthenticatedClient: %w", err)
398 }
399
400 return &Cluster{
401 Debug: apb.NewNodeDebugServiceClient(debugConn),
402 Management: apb.NewManagementClient(authClient),
403 Owner: *cert,
404 Ports: portMap,
405 nodesDone: done,
406
407 ctxC: ctxC,
408 }, nil
409}
410
411// Close cancels the running clusters' context and waits for all virtualized
412// nodes to stop. It returns an error if stopping the nodes failed, or one of
413// the nodes failed to fully start in the first place.
414func (c *Cluster) Close() error {
415 log.Printf("Cluster: stopping...")
416 c.ctxC()
417
418 var errors []error
419 log.Printf("Cluster: waiting for nodes to exit...")
420 for _, c := range c.nodesDone {
421 err := <-c
422 if err != nil {
423 errors = append(errors, err)
424 }
425 }
426 log.Printf("Cluster: done")
427 return multierr.Combine(errors...)
428}