blob: b3049e3972039955b53222655dc0e6c2d2eea7a2 [file] [log] [blame]
// cluster builds on the launch package and implements launching Metropolis
// nodes and clusters in a virtualized environment using qemu. It's kept in a
// separate package as it depends on a Metropolis node image, which might not be
// required for some uses of the launch library.
5package cluster
6
7import (
8 "bytes"
9 "context"
10 "crypto/rand"
11 "crypto/tls"
12 "errors"
13 "fmt"
14 "io"
Serge Bazanski66e58952021-10-05 17:06:56 +020015 "log"
16 "net"
17 "os"
18 "os/exec"
19 "path/filepath"
20 "syscall"
21 "time"
22
23 "github.com/cenkalti/backoff/v4"
Serge Bazanski66e58952021-10-05 17:06:56 +020024 "go.uber.org/multierr"
Serge Bazanskibe742842022-04-04 13:18:50 +020025 "golang.org/x/net/proxy"
Serge Bazanski66e58952021-10-05 17:06:56 +020026 "google.golang.org/grpc"
Serge Bazanski636032e2022-01-26 14:21:33 +010027 "google.golang.org/grpc/codes"
28 "google.golang.org/grpc/status"
Serge Bazanski66e58952021-10-05 17:06:56 +020029 "google.golang.org/protobuf/proto"
30
Mateusz Zalega0246f5e2022-04-22 17:29:04 +020031 "source.monogon.dev/metropolis/cli/pkg/datafile"
Serge Bazanski66e58952021-10-05 17:06:56 +020032 "source.monogon.dev/metropolis/node"
Serge Bazanskie78a0892021-10-07 17:03:49 +020033 "source.monogon.dev/metropolis/node/core/identity"
Serge Bazanski66e58952021-10-05 17:06:56 +020034 "source.monogon.dev/metropolis/node/core/rpc"
Serge Bazanski5bb8a332022-06-23 17:41:33 +020035 "source.monogon.dev/metropolis/node/core/rpc/resolver"
Serge Bazanski66e58952021-10-05 17:06:56 +020036 apb "source.monogon.dev/metropolis/proto/api"
Serge Bazanskie78a0892021-10-07 17:03:49 +020037 cpb "source.monogon.dev/metropolis/proto/common"
Serge Bazanski66e58952021-10-05 17:06:56 +020038 "source.monogon.dev/metropolis/test/launch"
39)
40
// NodeOptions contains all options that can be passed to Launch().
type NodeOptions struct {
	// Ports contains the port mapping where to expose the internal ports of the VM to
	// the host. See IdentityPortMap() and ConflictFreePortMap(). Ignored when
	// ConnectToSocket is set.
	Ports launch.PortMap

	// If set to true, reboots are honored. Otherwise all reboots exit the Launch()
	// command. Metropolis nodes generally restart on almost all errors, so unless you
	// want to test reboot behavior this should be false.
	AllowReboot bool

	// By default the VM is connected to the Host via SLIRP. If ConnectToSocket is set,
	// it is instead connected to the given file descriptor/socket. If this is set, all
	// port maps from the Ports option are ignored. Intended for networking this
	// instance together with others for running more complex network configurations.
	ConnectToSocket *os.File

	// SerialPort is an io.ReadWriter over which you can communicate with the serial
	// port of the machine. It can be set to an existing file descriptor (like
	// os.Stdout/os.Stderr) or any Go structure implementing this interface.
	SerialPort io.ReadWriter

	// NodeParameters is passed into the VM and subsequently used for bootstrapping or
	// registering into a cluster.
	NodeParameters *apb.NodeParameters

	// Mac is the node's MAC address.
	Mac *net.HardwareAddr

	// Runtime keeps the node's QEMU runtime state.
	Runtime *NodeRuntime
}
74
// NodeRuntime keeps the node's QEMU runtime options.
type NodeRuntime struct {
	// ld points at the node's launch directory storing data such as storage
	// images, firmware variables or the TPM state.
	ld string
	// sd points at the node's socket directory.
	sd string

	// ctxT is the context QEMU will execute in.
	ctxT context.Context
	// CtxC is the QEMU context's cancellation function; canceling it stops the
	// node's QEMU and swtpm processes.
	CtxC context.CancelFunc
}
88
89// NodePorts is the list of ports a fully operational Metropolis node listens on
Serge Bazanski52304a82021-10-29 16:56:18 +020090var NodePorts = []node.Port{
Serge Bazanski66e58952021-10-05 17:06:56 +020091 node.ConsensusPort,
92
93 node.CuratorServicePort,
94 node.DebugServicePort,
95
96 node.KubernetesAPIPort,
Lorenz Bruncc078df2021-12-23 11:51:55 +010097 node.KubernetesAPIWrappedPort,
Serge Bazanski66e58952021-10-05 17:06:56 +020098 node.CuratorServicePort,
99 node.DebuggerPort,
100}
101
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200102// setupRuntime creates the node's QEMU runtime directory, together with all
103// files required to preserve its state, a level below the chosen path ld. The
104// node's socket directory is similarily created a level below sd. It may
105// return an I/O error.
106func setupRuntime(ld, sd string) (*NodeRuntime, error) {
107 // Create a temporary directory to keep all the runtime files.
108 stdp, err := os.MkdirTemp(ld, "node_state*")
109 if err != nil {
110 return nil, fmt.Errorf("failed to create the state directory: %w", err)
111 }
112
113 // Initialize the node's storage with a prebuilt image.
114 si, err := datafile.ResolveRunfile("metropolis/node/node.img")
115 if err != nil {
116 return nil, fmt.Errorf("while resolving a path: %w", err)
117 }
118 di := filepath.Join(stdp, filepath.Base(si))
119 log.Printf("Cluster: copying the node image: %s -> %s", si, di)
120 if err := copyFile(si, di); err != nil {
121 return nil, fmt.Errorf("while copying the node image: %w", err)
122 }
123
124 // Initialize the OVMF firmware variables file.
125 sv, err := datafile.ResolveRunfile("external/edk2/OVMF_VARS.fd")
126 if err != nil {
127 return nil, fmt.Errorf("while resolving a path: %w", err)
128 }
129 dv := filepath.Join(stdp, filepath.Base(sv))
130 if err := copyFile(sv, dv); err != nil {
131 return nil, fmt.Errorf("while copying firmware variables: %w", err)
132 }
133
134 // Create the TPM state directory and initialize all files required by swtpm.
135 tpmt := filepath.Join(stdp, "tpm")
136 if err := os.Mkdir(tpmt, 0755); err != nil {
137 return nil, fmt.Errorf("while creating the TPM directory: %w", err)
138 }
139 tpms, err := datafile.ResolveRunfile("metropolis/node/tpm")
140 if err != nil {
141 return nil, fmt.Errorf("while resolving a path: %w", err)
142 }
143 tpmf, err := os.ReadDir(tpms)
144 if err != nil {
145 return nil, fmt.Errorf("failed to read TPM directory: %w", err)
146 }
147 for _, file := range tpmf {
148 name := file.Name()
149 src, err := datafile.ResolveRunfile(filepath.Join(tpms, name))
150 if err != nil {
151 return nil, fmt.Errorf("while resolving a path: %w", err)
152 }
153 tgt := filepath.Join(tpmt, name)
154 if err := copyFile(src, tgt); err != nil {
155 return nil, fmt.Errorf("while copying TPM state: file %q to %q: %w", src, tgt, err)
156 }
157 }
158
159 // Create the socket directory.
160 sotdp, err := os.MkdirTemp(sd, "node_sock*")
161 if err != nil {
162 return nil, fmt.Errorf("failed to create the socket directory: %w", err)
163 }
164
165 return &NodeRuntime{
166 ld: stdp,
167 sd: sotdp,
168 }, nil
169}
170
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200171// CuratorClient returns an authenticated owner connection to a Curator
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200172// instance within Cluster c, or nil together with an error.
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200173func (c *Cluster) CuratorClient() (*grpc.ClientConn, error) {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200174 if c.authClient == nil {
175 authCreds := rpc.NewAuthenticatedCredentials(c.Owner, nil)
Serge Bazanski58ddc092022-06-30 18:23:33 +0200176 r := resolver.New(c.ctxT, resolver.WithLogger(func(f string, args ...interface{}) {
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200177 log.Printf("Cluster: client resolver: %s", fmt.Sprintf(f, args...))
Serge Bazanski58ddc092022-06-30 18:23:33 +0200178 }))
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200179 for _, n := range c.NodeIDs {
180 ep, err := resolver.NodeWithDefaultPort(n)
181 if err != nil {
182 return nil, fmt.Errorf("could not add node %q by DNS: %v", n, err)
183 }
184 r.AddEndpoint(ep)
185 }
186 authClient, err := grpc.Dial(resolver.MetropolisControlAddress,
187 grpc.WithTransportCredentials(authCreds),
188 grpc.WithResolvers(r),
189 grpc.WithContextDialer(c.DialNode),
190 )
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200191 if err != nil {
192 return nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
193 }
194 c.authClient = authClient
195 }
196 return c.authClient, nil
197}
198
Serge Bazanski66e58952021-10-05 17:06:56 +0200199// LaunchNode launches a single Metropolis node instance with the given options.
200// The instance runs mostly paravirtualized but with some emulated hardware
201// similar to how a cloud provider might set up its VMs. The disk is fully
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200202// writable, and the changes are kept across reboots and shutdowns. ld and sd
203// point to the launch directory and the socket directory, holding the nodes'
204// state files (storage, tpm state, firmware state), and UNIX socket files
205// (swtpm <-> QEMU interplay) respectively. The directories must exist before
206// LaunchNode is called. LaunchNode will update options.Runtime and options.Mac
207// if either are not initialized.
208func LaunchNode(ctx context.Context, ld, sd string, options *NodeOptions) error {
209 // TODO(mateusz@monogon.tech) try using QEMU's abstract socket namespace instead
210 // of /tmp (requires QEMU version >5.0).
Serge Bazanski66e58952021-10-05 17:06:56 +0200211 // https://github.com/qemu/qemu/commit/776b97d3605ed0fc94443048fdf988c7725e38a9).
212 // swtpm accepts already-open FDs so we can pass in an abstract socket namespace FD
213 // that we open and pass the name of it to QEMU. Not pinning this crashes both
214 // swtpm and qemu because we run into UNIX socket length limitations (for legacy
215 // reasons 108 chars).
Serge Bazanski66e58952021-10-05 17:06:56 +0200216
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200217 // If it's the node's first start, set up its runtime directories.
218 if options.Runtime == nil {
219 r, err := setupRuntime(ld, sd)
220 if err != nil {
221 return fmt.Errorf("while setting up node runtime: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200222 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200223 options.Runtime = r
Serge Bazanski66e58952021-10-05 17:06:56 +0200224 }
225
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200226 // Replace the node's context with a new one.
227 r := options.Runtime
228 if r.CtxC != nil {
229 r.CtxC()
230 }
231 r.ctxT, r.CtxC = context.WithCancel(ctx)
232
Serge Bazanski66e58952021-10-05 17:06:56 +0200233 var qemuNetType string
234 var qemuNetConfig launch.QemuValue
235 if options.ConnectToSocket != nil {
236 qemuNetType = "socket"
237 qemuNetConfig = launch.QemuValue{
238 "id": {"net0"},
239 "fd": {"3"},
240 }
241 } else {
242 qemuNetType = "user"
243 qemuNetConfig = launch.QemuValue{
244 "id": {"net0"},
245 "net": {"10.42.0.0/24"},
246 "dhcpstart": {"10.42.0.10"},
247 "hostfwd": options.Ports.ToQemuForwards(),
248 }
249 }
250
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200251 // Generate the node's MAC address if it isn't already set in NodeOptions.
252 if options.Mac == nil {
253 mac, err := generateRandomEthernetMAC()
254 if err != nil {
255 return err
256 }
257 options.Mac = mac
Serge Bazanski66e58952021-10-05 17:06:56 +0200258 }
259
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200260 tpmSocketPath := filepath.Join(r.sd, "tpm-socket")
261 fwVarPath := filepath.Join(r.ld, "OVMF_VARS.fd")
262 storagePath := filepath.Join(r.ld, "node.img")
Serge Bazanski66e58952021-10-05 17:06:56 +0200263 qemuArgs := []string{"-machine", "q35", "-accel", "kvm", "-nographic", "-nodefaults", "-m", "4096",
264 "-cpu", "host", "-smp", "sockets=1,cpus=1,cores=2,threads=2,maxcpus=4",
265 "-drive", "if=pflash,format=raw,readonly,file=external/edk2/OVMF_CODE.fd",
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200266 "-drive", "if=pflash,format=raw,file=" + fwVarPath,
267 "-drive", "if=virtio,format=raw,cache=unsafe,file=" + storagePath,
Serge Bazanski66e58952021-10-05 17:06:56 +0200268 "-netdev", qemuNetConfig.ToOption(qemuNetType),
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200269 "-device", "virtio-net-pci,netdev=net0,mac=" + options.Mac.String(),
Serge Bazanski66e58952021-10-05 17:06:56 +0200270 "-chardev", "socket,id=chrtpm,path=" + tpmSocketPath,
271 "-tpmdev", "emulator,id=tpm0,chardev=chrtpm",
272 "-device", "tpm-tis,tpmdev=tpm0",
273 "-device", "virtio-rng-pci",
274 "-serial", "stdio"}
275
276 if !options.AllowReboot {
277 qemuArgs = append(qemuArgs, "-no-reboot")
278 }
279
280 if options.NodeParameters != nil {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200281 parametersPath := filepath.Join(r.ld, "parameters.pb")
Serge Bazanski66e58952021-10-05 17:06:56 +0200282 parametersRaw, err := proto.Marshal(options.NodeParameters)
283 if err != nil {
284 return fmt.Errorf("failed to encode node paraeters: %w", err)
285 }
Lorenz Brun764a2de2021-11-22 16:26:36 +0100286 if err := os.WriteFile(parametersPath, parametersRaw, 0644); err != nil {
Serge Bazanski66e58952021-10-05 17:06:56 +0200287 return fmt.Errorf("failed to write node parameters: %w", err)
288 }
289 qemuArgs = append(qemuArgs, "-fw_cfg", "name=dev.monogon.metropolis/parameters.pb,file="+parametersPath)
290 }
291
292 // Start TPM emulator as a subprocess
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200293 tpmCtx, tpmCancel := context.WithCancel(options.Runtime.ctxT)
Serge Bazanski66e58952021-10-05 17:06:56 +0200294 defer tpmCancel()
295
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200296 tpmd := filepath.Join(r.ld, "tpm")
297 tpmEmuCmd := exec.CommandContext(tpmCtx, "swtpm", "socket", "--tpm2", "--tpmstate", "dir="+tpmd, "--ctrl", "type=unixio,path="+tpmSocketPath)
Serge Bazanski66e58952021-10-05 17:06:56 +0200298 tpmEmuCmd.Stderr = os.Stderr
299 tpmEmuCmd.Stdout = os.Stdout
300
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200301 err := tpmEmuCmd.Start()
Serge Bazanski66e58952021-10-05 17:06:56 +0200302 if err != nil {
303 return fmt.Errorf("failed to start TPM emulator: %w", err)
304 }
305
Mateusz Zalegae90f4a12022-05-25 18:24:01 +0200306 // Wait for the socket to be created by the TPM emulator before launching
307 // QEMU.
308 for {
309 _, err := os.Stat(tpmSocketPath)
310 if err == nil {
311 break
312 }
313 if err != nil && !os.IsNotExist(err) {
314 return fmt.Errorf("while stat-ing TPM socket path: %w", err)
315 }
316 if err := tpmCtx.Err(); err != nil {
317 return fmt.Errorf("while waiting for the TPM socket: %w", err)
318 }
319 time.Sleep(time.Millisecond * 100)
320 }
321
Serge Bazanski66e58952021-10-05 17:06:56 +0200322 // Start the main qemu binary
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200323 systemCmd := exec.CommandContext(options.Runtime.ctxT, "qemu-system-x86_64", qemuArgs...)
Serge Bazanski66e58952021-10-05 17:06:56 +0200324 if options.ConnectToSocket != nil {
325 systemCmd.ExtraFiles = []*os.File{options.ConnectToSocket}
326 }
327
328 var stdErrBuf bytes.Buffer
329 systemCmd.Stderr = &stdErrBuf
330 systemCmd.Stdout = options.SerialPort
331
332 err = systemCmd.Run()
333
334 // Stop TPM emulator and wait for it to exit to properly reap the child process
335 tpmCancel()
336 log.Print("Node: Waiting for TPM emulator to exit")
337 // Wait returns a SIGKILL error because we just cancelled its context.
338 // We still need to call it to avoid creating zombies.
339 _ = tpmEmuCmd.Wait()
340 log.Print("Node: TPM emulator done")
341
342 var exerr *exec.ExitError
343 if err != nil && errors.As(err, &exerr) {
344 status := exerr.ProcessState.Sys().(syscall.WaitStatus)
345 if status.Signaled() && status.Signal() == syscall.SIGKILL {
346 // Process was killed externally (most likely by our context being canceled).
347 // This is a normal exit for us, so return nil
348 return nil
349 }
350 exerr.Stderr = stdErrBuf.Bytes()
351 newErr := launch.QEMUError(*exerr)
352 return &newErr
353 }
354 return err
355}
356
357func copyFile(src, dst string) error {
358 in, err := os.Open(src)
359 if err != nil {
360 return fmt.Errorf("when opening source: %w", err)
361 }
362 defer in.Close()
363
364 out, err := os.Create(dst)
365 if err != nil {
366 return fmt.Errorf("when creating destination: %w", err)
367 }
368 defer out.Close()
369
370 _, err = io.Copy(out, in)
371 if err != nil {
372 return fmt.Errorf("when copying file: %w", err)
373 }
374 return out.Close()
375}
376
Serge Bazanskie78a0892021-10-07 17:03:49 +0200377// getNodes wraps around Management.GetNodes to return a list of nodes in a
378// cluster.
379func getNodes(ctx context.Context, mgmt apb.ManagementClient) ([]*apb.Node, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200380 var res []*apb.Node
Serge Bazanski636032e2022-01-26 14:21:33 +0100381 bo := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
Serge Bazanski075465c2021-11-16 15:38:49 +0100382 err := backoff.Retry(func() error {
383 res = nil
384 srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
Serge Bazanskie78a0892021-10-07 17:03:49 +0200385 if err != nil {
Serge Bazanski075465c2021-11-16 15:38:49 +0100386 return fmt.Errorf("GetNodes: %w", err)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200387 }
Serge Bazanski075465c2021-11-16 15:38:49 +0100388 for {
389 node, err := srvN.Recv()
390 if err == io.EOF {
391 break
392 }
393 if err != nil {
394 return fmt.Errorf("GetNodes.Recv: %w", err)
395 }
396 res = append(res, node)
397 }
398 return nil
399 }, bo)
400 if err != nil {
401 return nil, err
Serge Bazanskie78a0892021-10-07 17:03:49 +0200402 }
403 return res, nil
404}
405
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200406// getNode wraps Management.GetNodes. It returns node information matching
407// given node ID.
408func getNode(ctx context.Context, mgmt apb.ManagementClient, id string) (*apb.Node, error) {
409 nodes, err := getNodes(ctx, mgmt)
410 if err != nil {
411 return nil, fmt.Errorf("could not get nodes: %w", err)
412 }
413 for _, n := range nodes {
414 eid := identity.NodeID(n.Pubkey)
415 if eid != id {
416 continue
417 }
418 return n, nil
419 }
420 return nil, fmt.Errorf("no such node.")
421}
422
Serge Bazanski66e58952021-10-05 17:06:56 +0200423// Gets a random EUI-48 Ethernet MAC address
424func generateRandomEthernetMAC() (*net.HardwareAddr, error) {
425 macBuf := make([]byte, 6)
426 _, err := rand.Read(macBuf)
427 if err != nil {
428 return nil, fmt.Errorf("failed to read randomness for MAC: %v", err)
429 }
430
431 // Set U/L bit and clear I/G bit (locally administered individual MAC)
432 // Ref IEEE 802-2014 Section 8.2.2
433 macBuf[0] = (macBuf[0] | 2) & 0xfe
434 mac := net.HardwareAddr(macBuf)
435 return &mac, nil
436}
437
Serge Bazanskibe742842022-04-04 13:18:50 +0200438const SOCKSPort uint16 = 1080
Serge Bazanski66e58952021-10-05 17:06:56 +0200439
Serge Bazanskibe742842022-04-04 13:18:50 +0200440// ClusterPorts contains all ports handled by Nanoswitch.
441var ClusterPorts = []uint16{
442 // Forwarded to the first node.
443 uint16(node.CuratorServicePort),
444 uint16(node.DebugServicePort),
445 uint16(node.KubernetesAPIPort),
446 uint16(node.KubernetesAPIWrappedPort),
447
448 // SOCKS proxy to the switch network
449 SOCKSPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200450}
451
// ClusterOptions contains all options for launching a Metropolis cluster.
type ClusterOptions struct {
	// NumNodes is the number of nodes this cluster should be started with.
	NumNodes int
}
457
// Cluster is the running Metropolis cluster launched using the LaunchCluster
// function.
type Cluster struct {
	// Owner is the TLS Certificate of the owner of the test cluster. This can be
	// used to authenticate further clients to the running cluster.
	Owner tls.Certificate
	// Ports is the PortMap used to access the first nodes' services (defined in
	// ClusterPorts) and the SOCKS proxy (at SOCKSPort).
	Ports launch.PortMap

	// Nodes is a map from Node ID to its runtime information.
	Nodes map[string]*NodeInCluster
	// NodeIDs is a list of node IDs that are backing this cluster, in order of
	// creation.
	NodeIDs []string

	// nodesDone is a list of channels populated with the return codes from all the
	// nodes' qemu instances. It's used by Close to ensure all nodes have
	// successfully been stopped.
	nodesDone []chan error
	// nodeOpts are the cluster member nodes' mutable launch options, kept here
	// to facilitate reboots.
	nodeOpts []NodeOptions
	// launchDir points at the directory keeping the nodes' state, such as storage
	// images, firmware variable files, TPM state.
	launchDir string
	// socketDir points at the directory keeping UNIX socket files, such as these
	// used to facilitate communication between QEMU and swtpm. It's different
	// from launchDir, and anchored nearer the file system root, due to the
	// socket path length limitation imposed by the kernel.
	socketDir string

	// socksDialer is used by DialNode to establish connections to nodes via the
	// SOCKS server ran by nanoswitch.
	socksDialer proxy.Dialer

	// authClient is a cached authenticated owner connection to a Curator
	// instance within the cluster, lazily established by CuratorClient.
	authClient *grpc.ClientConn

	// ctxT is the context individual node contexts are created from.
	ctxT context.Context
	// ctxC is used by Close to cancel the context under which the nodes are
	// running.
	ctxC context.CancelFunc
}
504
// NodeInCluster represents information about a node that's part of a Cluster.
type NodeInCluster struct {
	// ID of the node, which can be used to dial this node's services via DialNode.
	ID string
	// Address of the node on the network ran by nanoswitch. Not reachable from the
	// host unless dialed via DialNode or via the nanoswitch SOCKS proxy (reachable
	// on Cluster.Ports[SOCKSPort]).
	ManagementAddress string
}
514
515// firstConnection performs the initial owner credential escrow with a newly
516// started nanoswitch-backed cluster over SOCKS. It expects the first node to be
517// running at 10.1.0.2, which is always the case with the current nanoswitch
518// implementation.
519//
520// It returns the newly escrowed credentials as well as the firt node's
521// information as NodeInCluster.
522func firstConnection(ctx context.Context, socksDialer proxy.Dialer) (*tls.Certificate, *NodeInCluster, error) {
523 // Dial external service.
524 remote := fmt.Sprintf("10.1.0.2:%s", node.CuratorServicePort.PortString())
525 initCreds, err := rpc.NewEphemeralCredentials(InsecurePrivateKey, nil)
526 if err != nil {
527 return nil, nil, fmt.Errorf("NewEphemeralCredentials: %w", err)
528 }
529 initDialer := func(_ context.Context, addr string) (net.Conn, error) {
530 return socksDialer.Dial("tcp", addr)
531 }
532 initClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(initCreds))
533 if err != nil {
534 return nil, nil, fmt.Errorf("dialing with ephemeral credentials failed: %w", err)
535 }
536 defer initClient.Close()
537
538 // Retrieve owner certificate - this can take a while because the node is still
539 // coming up, so do it in a backoff loop.
540 log.Printf("Cluster: retrieving owner certificate (this can take a few seconds while the first node boots)...")
541 aaa := apb.NewAAAClient(initClient)
542 var cert *tls.Certificate
543 err = backoff.Retry(func() error {
544 cert, err = rpc.RetrieveOwnerCertificate(ctx, aaa, InsecurePrivateKey)
545 if st, ok := status.FromError(err); ok {
546 if st.Code() == codes.Unavailable {
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200547 log.Printf("Cluster: cluster UNAVAILABLE: %v", st.Message())
Serge Bazanskibe742842022-04-04 13:18:50 +0200548 return err
549 }
550 }
551 return backoff.Permanent(err)
552 }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
553 if err != nil {
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200554 return nil, nil, fmt.Errorf("couldn't retrieve owner certificate: %w", err)
Serge Bazanskibe742842022-04-04 13:18:50 +0200555 }
556 log.Printf("Cluster: retrieved owner certificate.")
557
558 // Now connect authenticated and get the node ID.
559 creds := rpc.NewAuthenticatedCredentials(*cert, nil)
560 authClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(creds))
561 if err != nil {
562 return nil, nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
563 }
564 defer authClient.Close()
565 mgmt := apb.NewManagementClient(authClient)
566
567 var node *NodeInCluster
568 err = backoff.Retry(func() error {
569 nodes, err := getNodes(ctx, mgmt)
570 if err != nil {
571 return fmt.Errorf("retrieving nodes failed: %w", err)
572 }
573 if len(nodes) != 1 {
574 return fmt.Errorf("expected one node, got %d", len(nodes))
575 }
576 n := nodes[0]
577 if n.Status == nil || n.Status.ExternalAddress == "" {
578 return fmt.Errorf("node has no status and/or address")
579 }
580 node = &NodeInCluster{
581 ID: identity.NodeID(n.Pubkey),
582 ManagementAddress: n.Status.ExternalAddress,
583 }
584 return nil
585 }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
586 if err != nil {
587 return nil, nil, err
588 }
589
590 return cert, node, nil
Serge Bazanski66e58952021-10-05 17:06:56 +0200591}
592
593// LaunchCluster launches a cluster of Metropolis node VMs together with a
594// Nanoswitch instance to network them all together.
595//
596// The given context will be used to run all qemu instances in the cluster, and
597// canceling the context or calling Close() will terminate them.
598func LaunchCluster(ctx context.Context, opts ClusterOptions) (*Cluster, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200599 if opts.NumNodes <= 0 {
Serge Bazanski66e58952021-10-05 17:06:56 +0200600 return nil, errors.New("refusing to start cluster with zero nodes")
601 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200602
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200603 // Create the launch directory.
604 ld, err := os.MkdirTemp(os.Getenv("TEST_TMPDIR"), "cluster*")
605 if err != nil {
606 return nil, fmt.Errorf("failed to create the launch directory: %w", err)
607 }
608 // Create the socket directory.
609 sd, err := os.MkdirTemp("/tmp", "cluster*")
610 if err != nil {
611 return nil, fmt.Errorf("failed to create the socket directory: %w", err)
612 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200613
614 // Prepare links between nodes and nanoswitch.
615 var switchPorts []*os.File
616 var vmPorts []*os.File
617 for i := 0; i < opts.NumNodes; i++ {
618 switchPort, vmPort, err := launch.NewSocketPair()
619 if err != nil {
Serge Bazanski66e58952021-10-05 17:06:56 +0200620 return nil, fmt.Errorf("failed to get socketpair: %w", err)
621 }
622 switchPorts = append(switchPorts, switchPort)
623 vmPorts = append(vmPorts, vmPort)
624 }
625
Serge Bazanskie78a0892021-10-07 17:03:49 +0200626 // Make a list of channels that will be populated by all running node qemu
627 // processes.
Serge Bazanski66e58952021-10-05 17:06:56 +0200628 done := make([]chan error, opts.NumNodes)
629 for i, _ := range done {
630 done[i] = make(chan error, 1)
631 }
632
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200633 // Prepare the node options. These will be kept as part of Cluster.
634 // nodeOpts[].Runtime will be initialized by LaunchNode during the first
635 // launch. The runtime information can be later used to restart a node.
636 // The 0th node will be initialized first. The rest will follow after it
637 // had bootstrapped the cluster.
638 nodeOpts := make([]NodeOptions, opts.NumNodes)
639 nodeOpts[0] = NodeOptions{
640 ConnectToSocket: vmPorts[0],
641 NodeParameters: &apb.NodeParameters{
642 Cluster: &apb.NodeParameters_ClusterBootstrap_{
643 ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
644 OwnerPublicKey: InsecurePublicKey,
Serge Bazanski66e58952021-10-05 17:06:56 +0200645 },
646 },
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200647 },
648 SerialPort: newPrefixedStdio(0),
649 }
650
651 // Start the first node.
652 ctxT, ctxC := context.WithCancel(ctx)
653 log.Printf("Cluster: Starting node %d...", 1)
654 go func() {
655 err := LaunchNode(ctxT, ld, sd, &nodeOpts[0])
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200656 if err != nil {
657 log.Printf("Node %d finished with an error: %v", 1, err)
658 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200659 done[0] <- err
660 }()
661
Serge Bazanskie78a0892021-10-07 17:03:49 +0200662 // Launch nanoswitch.
Serge Bazanski66e58952021-10-05 17:06:56 +0200663 portMap, err := launch.ConflictFreePortMap(ClusterPorts)
664 if err != nil {
665 ctxC()
666 return nil, fmt.Errorf("failed to allocate ephemeral ports: %w", err)
667 }
668
669 go func() {
670 if err := launch.RunMicroVM(ctxT, &launch.MicroVMOptions{
671 KernelPath: "metropolis/test/ktest/vmlinux",
Lorenz Brunb6a9d3c2022-01-27 18:56:20 +0100672 InitramfsPath: "metropolis/test/nanoswitch/initramfs.cpio.lz4",
Serge Bazanski66e58952021-10-05 17:06:56 +0200673 ExtraNetworkInterfaces: switchPorts,
674 PortMap: portMap,
Serge Bazanski1fbc5972022-06-22 13:36:16 +0200675 SerialPort: newPrefixedStdio(99),
Serge Bazanski66e58952021-10-05 17:06:56 +0200676 }); err != nil {
677 if !errors.Is(err, ctxT.Err()) {
Serge Bazanski075465c2021-11-16 15:38:49 +0100678 log.Fatalf("Failed to launch nanoswitch: %v", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200679 }
680 }
681 }()
682
Serge Bazanskibe742842022-04-04 13:18:50 +0200683 // Build SOCKS dialer.
684 socksRemote := fmt.Sprintf("localhost:%v", portMap[SOCKSPort])
685 socksDialer, err := proxy.SOCKS5("tcp", socksRemote, nil, proxy.Direct)
Serge Bazanski66e58952021-10-05 17:06:56 +0200686 if err != nil {
687 ctxC()
Serge Bazanskibe742842022-04-04 13:18:50 +0200688 return nil, fmt.Errorf("failed to build SOCKS dialer: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200689 }
690
Serge Bazanskibe742842022-04-04 13:18:50 +0200691 // Retrieve owner credentials and first node.
692 cert, firstNode, err := firstConnection(ctxT, socksDialer)
Serge Bazanski66e58952021-10-05 17:06:56 +0200693 if err != nil {
694 ctxC()
695 return nil, err
696 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200697
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200698 // Set up a partially initialized cluster instance, to be filled in in the
699 // later steps.
Serge Bazanskibe742842022-04-04 13:18:50 +0200700 cluster := &Cluster{
701 Owner: *cert,
702 Ports: portMap,
703 Nodes: map[string]*NodeInCluster{
704 firstNode.ID: firstNode,
705 },
706 NodeIDs: []string{
707 firstNode.ID,
708 },
709
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200710 nodesDone: done,
711 nodeOpts: nodeOpts,
712 launchDir: ld,
713 socketDir: sd,
714
Serge Bazanskibe742842022-04-04 13:18:50 +0200715 socksDialer: socksDialer,
716
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200717 ctxT: ctxT,
Serge Bazanskibe742842022-04-04 13:18:50 +0200718 ctxC: ctxC,
719 }
720
721 // Now start the rest of the nodes and register them into the cluster.
722
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200723 // Get an authenticated owner client within the cluster.
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200724 curC, err := cluster.CuratorClient()
Serge Bazanski66e58952021-10-05 17:06:56 +0200725 if err != nil {
726 ctxC()
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200727 return nil, fmt.Errorf("CuratorClient: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200728 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200729 mgmt := apb.NewManagementClient(curC)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200730
731 // Retrieve register ticket to register further nodes.
732 log.Printf("Cluster: retrieving register ticket...")
733 resT, err := mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{})
734 if err != nil {
735 ctxC()
736 return nil, fmt.Errorf("GetRegisterTicket: %w", err)
737 }
738 ticket := resT.Ticket
739 log.Printf("Cluster: retrieved register ticket (%d bytes).", len(ticket))
740
741 // Retrieve cluster info (for directory and ca public key) to register further
742 // nodes.
743 resI, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
744 if err != nil {
745 ctxC()
746 return nil, fmt.Errorf("GetClusterInfo: %w", err)
747 }
748
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200749 // Use the retrieved information to configure the rest of the node options.
750 for i := 1; i < opts.NumNodes; i++ {
751 nodeOpts[i] = NodeOptions{
752 ConnectToSocket: vmPorts[i],
753 NodeParameters: &apb.NodeParameters{
754 Cluster: &apb.NodeParameters_ClusterRegister_{
755 ClusterRegister: &apb.NodeParameters_ClusterRegister{
756 RegisterTicket: ticket,
757 ClusterDirectory: resI.ClusterDirectory,
758 CaCertificate: resI.CaCertificate,
759 },
760 },
761 },
762 SerialPort: newPrefixedStdio(i),
763 }
764 }
765
766 // Now run the rest of the nodes.
767 //
Serge Bazanskie78a0892021-10-07 17:03:49 +0200768 // TODO(q3k): parallelize this
769 for i := 1; i < opts.NumNodes; i++ {
770 log.Printf("Cluster: Starting node %d...", i+1)
771 go func(i int) {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200772 err := LaunchNode(ctxT, ld, sd, &nodeOpts[i])
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200773 if err != nil {
774 log.Printf("Node %d finished with an error: %v", i, err)
775 }
Serge Bazanskie78a0892021-10-07 17:03:49 +0200776 done[i] <- err
777 }(i)
778 var newNode *apb.Node
779
780 log.Printf("Cluster: waiting for node %d to appear as NEW...", i)
781 for {
782 nodes, err := getNodes(ctx, mgmt)
783 if err != nil {
784 ctxC()
785 return nil, fmt.Errorf("could not get nodes: %w", err)
786 }
787 for _, n := range nodes {
788 if n.State == cpb.NodeState_NODE_STATE_NEW {
789 newNode = n
790 break
791 }
792 }
793 if newNode != nil {
794 break
795 }
796 time.Sleep(1 * time.Second)
797 }
798 id := identity.NodeID(newNode.Pubkey)
799 log.Printf("Cluster: node %d is %s", i, id)
800
801 log.Printf("Cluster: approving node %d", i)
802 _, err := mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
803 Pubkey: newNode.Pubkey,
804 })
805 if err != nil {
806 ctxC()
807 return nil, fmt.Errorf("ApproveNode(%s): %w", id, err)
808 }
Serge Bazanskibe742842022-04-04 13:18:50 +0200809 log.Printf("Cluster: node %d approved, waiting for it to appear as UP and with a network address...", i)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200810 for {
811 nodes, err := getNodes(ctx, mgmt)
812 if err != nil {
813 ctxC()
814 return nil, fmt.Errorf("could not get nodes: %w", err)
815 }
816 found := false
817 for _, n := range nodes {
818 if !bytes.Equal(n.Pubkey, newNode.Pubkey) {
819 continue
820 }
Serge Bazanskibe742842022-04-04 13:18:50 +0200821 if n.Status == nil || n.Status.ExternalAddress == "" {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200822 break
823 }
Serge Bazanskibe742842022-04-04 13:18:50 +0200824 if n.State != cpb.NodeState_NODE_STATE_UP {
825 break
826 }
827 found = true
828 cluster.Nodes[identity.NodeID(n.Pubkey)] = &NodeInCluster{
829 ID: identity.NodeID(n.Pubkey),
830 ManagementAddress: n.Status.ExternalAddress,
831 }
832 cluster.NodeIDs = append(cluster.NodeIDs, identity.NodeID(n.Pubkey))
833 break
Serge Bazanskie78a0892021-10-07 17:03:49 +0200834 }
835 if found {
836 break
837 }
Serge Bazanskibe742842022-04-04 13:18:50 +0200838 time.Sleep(time.Second)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200839 }
840 log.Printf("Cluster: node %d (%s) UP!", i, id)
841 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200842
Serge Bazanskibe742842022-04-04 13:18:50 +0200843 log.Printf("Cluster: all nodes up:")
844 for _, node := range cluster.Nodes {
845 log.Printf("Cluster: - %s at %s", node.ID, node.ManagementAddress)
846 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200847
Serge Bazanskibe742842022-04-04 13:18:50 +0200848 return cluster, nil
Serge Bazanski66e58952021-10-05 17:06:56 +0200849}
850
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200851// RebootNode reboots the cluster member node matching the given index, and
852// waits for it to rejoin the cluster. It will use the given context ctx to run
853// cluster API requests, whereas the resulting QEMU process will be created
854// using the cluster's context c.ctxT. The nodes are indexed starting at 0.
855func (c *Cluster) RebootNode(ctx context.Context, idx int) error {
856 if idx < 0 || idx >= len(c.NodeIDs) {
857 return fmt.Errorf("index out of bounds.")
858 }
859 id := c.NodeIDs[idx]
860
861 // Get an authenticated owner client within the cluster.
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200862 curC, err := c.CuratorClient()
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200863 if err != nil {
864 return err
865 }
866 mgmt := apb.NewManagementClient(curC)
867
868 // Get the timestamp of the node's last update, as observed by Curator.
869 // It'll be needed to make sure it had rejoined the cluster after the reboot.
870 var is *apb.Node
871 for {
872 r, err := getNode(ctx, mgmt, id)
873 if err != nil {
874 return err
875 }
876
877 // Node status may be absent if it hasn't reported to the cluster yet. Wait
878 // for it to appear before progressing further.
879 if r.Status != nil {
880 is = r
881 break
882 }
883 time.Sleep(time.Second)
884 }
885
886 // Cancel the node's context. This will shut down QEMU.
887 c.nodeOpts[idx].Runtime.CtxC()
888 log.Printf("Cluster: waiting for node %d (%s) to stop.", idx, id)
889 err = <-c.nodesDone[idx]
890 if err != nil {
891 return fmt.Errorf("while restarting node: %w", err)
892 }
893
894 // Start QEMU again.
895 log.Printf("Cluster: restarting node %d (%s).", idx, id)
896 go func(n int) {
897 err := LaunchNode(c.ctxT, c.launchDir, c.socketDir, &c.nodeOpts[n])
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200898 if err != nil {
899 log.Printf("Node %d finished with an error: %v", n, err)
900 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200901 c.nodesDone[n] <- err
902 }(idx)
903
904 // Poll Management.GetNodes until the node's timestamp is updated.
905 for {
906 cs, err := getNode(ctx, mgmt, id)
907 if err != nil {
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200908 log.Printf("Cluster: node get error: %v", err)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200909 return err
910 }
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200911 log.Printf("Cluster: node status: %+v", cs)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200912 if cs.Status == nil {
913 continue
914 }
Mateusz Zalega28800ad2022-07-08 14:56:02 +0200915 if cs.Status.Timestamp.AsTime().Sub(is.Status.Timestamp.AsTime()) > 0 {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200916 break
917 }
918 time.Sleep(time.Second)
919 }
920 log.Printf("Cluster: node %d (%s) has rejoined the cluster.", idx, id)
921 return nil
922}
923
924// Close cancels the running clusters' context and waits for all virtualized
Serge Bazanski66e58952021-10-05 17:06:56 +0200925// nodes to stop. It returns an error if stopping the nodes failed, or one of
926// the nodes failed to fully start in the first place.
927func (c *Cluster) Close() error {
928 log.Printf("Cluster: stopping...")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200929 if c.authClient != nil {
930 c.authClient.Close()
931 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200932 c.ctxC()
933
934 var errors []error
935 log.Printf("Cluster: waiting for nodes to exit...")
936 for _, c := range c.nodesDone {
937 err := <-c
938 if err != nil {
939 errors = append(errors, err)
940 }
941 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200942 log.Printf("Cluster: removing nodes' state files.")
943 os.RemoveAll(c.launchDir)
944 os.RemoveAll(c.socketDir)
Serge Bazanski66e58952021-10-05 17:06:56 +0200945 log.Printf("Cluster: done")
946 return multierr.Combine(errors...)
947}
Serge Bazanskibe742842022-04-04 13:18:50 +0200948
949// DialNode is a grpc.WithContextDialer compatible dialer which dials nodes by
950// their ID. This is performed by connecting to the cluster nanoswitch via its
951// SOCKS proxy, and using the cluster node list for name resolution.
952//
953// For example:
954//
955// grpc.Dial("metropolis-deadbeef:1234", grpc.WithContextDialer(c.DialNode))
956//
957func (c *Cluster) DialNode(_ context.Context, addr string) (net.Conn, error) {
958 host, port, err := net.SplitHostPort(addr)
959 if err != nil {
960 return nil, fmt.Errorf("invalid host:port: %w", err)
961 }
962 // Already an IP address?
963 if net.ParseIP(host) != nil {
964 return c.socksDialer.Dial("tcp", addr)
965 }
966
967 // Otherwise, expect a node name.
968 node, ok := c.Nodes[host]
969 if !ok {
970 return nil, fmt.Errorf("unknown node %q", host)
971 }
972 addr = net.JoinHostPort(node.ManagementAddress, port)
973 return c.socksDialer.Dial("tcp", addr)
974}