// Package cluster builds on the launch package and implements launching
// Metropolis nodes and clusters in a virtualized environment using qemu. It's
// kept in a separate package as it depends on a Metropolis node image, which
// might not be required for some uses of the launch library.
package cluster

import (
	"bytes"
	"context"
	"crypto/rand"
	"crypto/tls"
	"errors"
	"fmt"
	"io"
	"log"
	"net"
	"os"
	"os/exec"
	"path/filepath"
	"syscall"
	"time"

	"github.com/cenkalti/backoff/v4"
	"go.uber.org/multierr"
	"golang.org/x/net/proxy"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/proto"

	"source.monogon.dev/metropolis/cli/pkg/datafile"
	"source.monogon.dev/metropolis/node"
	common "source.monogon.dev/metropolis/node"
	"source.monogon.dev/metropolis/node/core/identity"
	"source.monogon.dev/metropolis/node/core/rpc"
	apb "source.monogon.dev/metropolis/proto/api"
	cpb "source.monogon.dev/metropolis/proto/common"
	"source.monogon.dev/metropolis/test/launch"
)

// NodeOptions contains all options that can be passed to LaunchNode().
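//
// A minimal sketch of options for a node bootstrapping a new cluster (port
// mapping and serial wiring omitted):
//
//	opts := NodeOptions{
//		SerialPort: os.Stdout,
//		NodeParameters: &apb.NodeParameters{
//			Cluster: &apb.NodeParameters_ClusterBootstrap_{
//				ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
//					OwnerPublicKey: InsecurePublicKey,
//				},
//			},
//		},
//	}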
type NodeOptions struct {
	// Ports contains the mapping from VM-internal ports to host ports. See
	// IdentityPortMap() and ConflictFreePortMap(). Ignored when
	// ConnectToSocket is set.
	Ports launch.PortMap

	// If set to true, reboots are honored. Otherwise, all reboots exit the
	// LaunchNode() call. Metropolis nodes generally restart on almost all
	// errors, so unless you want to test reboot behavior this should be
	// false.
	AllowReboot bool

	// By default the VM is connected to the host via SLIRP. If
	// ConnectToSocket is set, it is instead connected to the given file
	// descriptor/socket. If this is set, all port maps from the Ports option
	// are ignored. Intended for networking this instance together with others
	// for running more complex network configurations.
	ConnectToSocket *os.File

	// SerialPort is an io.ReadWriter over which you can communicate with the
	// serial port of the machine. It can be set to an existing file
	// descriptor (like os.Stdout/os.Stderr) or any Go structure implementing
	// this interface.
	SerialPort io.ReadWriter

	// NodeParameters is passed into the VM and subsequently used for
	// bootstrapping or registering into a cluster.
	NodeParameters *apb.NodeParameters

	// Mac is the node's MAC address.
	Mac *net.HardwareAddr

	// Runtime keeps the node's QEMU runtime state.
	Runtime *NodeRuntime
}

// NodeRuntime keeps the node's QEMU runtime options.
type NodeRuntime struct {
	// ld points at the node's launch directory storing data such as storage
	// images, firmware variables or the TPM state.
	ld string
	// sd points at the node's socket directory.
	sd string

	// ctxT is the context QEMU will execute in.
	ctxT context.Context
	// CtxC is the QEMU context's cancellation function.
	CtxC context.CancelFunc
}

// NodePorts is the list of ports a fully operational Metropolis node listens
// on.
var NodePorts = []node.Port{
	node.ConsensusPort,

	node.CuratorServicePort,
	node.DebugServicePort,

	node.KubernetesAPIPort,
	node.KubernetesAPIWrappedPort,
	node.DebuggerPort,
}
101
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200102// setupRuntime creates the node's QEMU runtime directory, together with all
103// files required to preserve its state, a level below the chosen path ld. The
104// node's socket directory is similarily created a level below sd. It may
105// return an I/O error.
106func setupRuntime(ld, sd string) (*NodeRuntime, error) {
107 // Create a temporary directory to keep all the runtime files.
108 stdp, err := os.MkdirTemp(ld, "node_state*")
109 if err != nil {
110 return nil, fmt.Errorf("failed to create the state directory: %w", err)
111 }
112
113 // Initialize the node's storage with a prebuilt image.
114 si, err := datafile.ResolveRunfile("metropolis/node/node.img")
115 if err != nil {
116 return nil, fmt.Errorf("while resolving a path: %w", err)
117 }
118 di := filepath.Join(stdp, filepath.Base(si))
119 log.Printf("Cluster: copying the node image: %s -> %s", si, di)
120 if err := copyFile(si, di); err != nil {
121 return nil, fmt.Errorf("while copying the node image: %w", err)
122 }
123
124 // Initialize the OVMF firmware variables file.
125 sv, err := datafile.ResolveRunfile("external/edk2/OVMF_VARS.fd")
126 if err != nil {
127 return nil, fmt.Errorf("while resolving a path: %w", err)
128 }
129 dv := filepath.Join(stdp, filepath.Base(sv))
130 if err := copyFile(sv, dv); err != nil {
131 return nil, fmt.Errorf("while copying firmware variables: %w", err)
132 }
133
134 // Create the TPM state directory and initialize all files required by swtpm.
135 tpmt := filepath.Join(stdp, "tpm")
136 if err := os.Mkdir(tpmt, 0755); err != nil {
137 return nil, fmt.Errorf("while creating the TPM directory: %w", err)
138 }
139 tpms, err := datafile.ResolveRunfile("metropolis/node/tpm")
140 if err != nil {
141 return nil, fmt.Errorf("while resolving a path: %w", err)
142 }
143 tpmf, err := os.ReadDir(tpms)
144 if err != nil {
145 return nil, fmt.Errorf("failed to read TPM directory: %w", err)
146 }
147 for _, file := range tpmf {
148 name := file.Name()
149 src, err := datafile.ResolveRunfile(filepath.Join(tpms, name))
150 if err != nil {
151 return nil, fmt.Errorf("while resolving a path: %w", err)
152 }
153 tgt := filepath.Join(tpmt, name)
154 if err := copyFile(src, tgt); err != nil {
155 return nil, fmt.Errorf("while copying TPM state: file %q to %q: %w", src, tgt, err)
156 }
157 }
158
159 // Create the socket directory.
160 sotdp, err := os.MkdirTemp(sd, "node_sock*")
161 if err != nil {
162 return nil, fmt.Errorf("failed to create the socket directory: %w", err)
163 }
164
165 return &NodeRuntime{
166 ld: stdp,
167 sd: sotdp,
168 }, nil
169}

// curatorClient returns an authenticated owner connection to a Curator
// instance within Cluster c, or nil together with an error.
func (c *Cluster) curatorClient() (*grpc.ClientConn, error) {
	if c.authClient == nil {
		authCreds := rpc.NewAuthenticatedCredentials(c.Owner, nil)
		remote := net.JoinHostPort(c.NodeIDs[0], common.CuratorServicePort.PortString())
		authClient, err := grpc.Dial(remote, grpc.WithTransportCredentials(authCreds), grpc.WithContextDialer(c.DialNode))
		if err != nil {
			return nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
		}
		c.authClient = authClient
	}
	return c.authClient, nil
}

// LaunchNode launches a single Metropolis node instance with the given
// options. The instance runs mostly paravirtualized but with some emulated
// hardware similar to how a cloud provider might set up its VMs. The disk is
// fully writable, and the changes are kept across reboots and shutdowns. ld
// and sd point to the launch directory and the socket directory, holding the
// nodes' state files (storage, TPM state, firmware state) and UNIX socket
// files (swtpm <-> QEMU interplay) respectively. The directories must exist
// before LaunchNode is called. LaunchNode will update options.Runtime and
// options.Mac if either are not initialized.
func LaunchNode(ctx context.Context, ld, sd string, options *NodeOptions) error {
	// TODO(mateusz@monogon.tech) try using QEMU's abstract socket namespace
	// instead of /tmp (requires QEMU version >5.0,
	// https://github.com/qemu/qemu/commit/776b97d3605ed0fc94443048fdf988c7725e38a9).
	// swtpm accepts already-open FDs, so we can open an FD in the abstract
	// socket namespace and pass its name to QEMU. Overly long paths crash
	// both swtpm and qemu because we run into UNIX socket path length
	// limitations (108 chars, for legacy reasons).

	// If it's the node's first start, set up its runtime directories.
	if options.Runtime == nil {
		r, err := setupRuntime(ld, sd)
		if err != nil {
			return fmt.Errorf("while setting up node runtime: %w", err)
		}
		options.Runtime = r
	}

	// Replace the node's context with a new one.
	r := options.Runtime
	if r.CtxC != nil {
		r.CtxC()
	}
	r.ctxT, r.CtxC = context.WithCancel(ctx)

	var qemuNetType string
	var qemuNetConfig launch.QemuValue
	if options.ConnectToSocket != nil {
		qemuNetType = "socket"
		qemuNetConfig = launch.QemuValue{
			"id": {"net0"},
			"fd": {"3"},
		}
	} else {
		qemuNetType = "user"
		qemuNetConfig = launch.QemuValue{
			"id":        {"net0"},
			"net":       {"10.42.0.0/24"},
			"dhcpstart": {"10.42.0.10"},
			"hostfwd":   options.Ports.ToQemuForwards(),
		}
	}

	// Generate the node's MAC address if it isn't already set in NodeOptions.
	if options.Mac == nil {
		mac, err := generateRandomEthernetMAC()
		if err != nil {
			return err
		}
		options.Mac = mac
	}

	tpmSocketPath := filepath.Join(r.sd, "tpm-socket")
	fwVarPath := filepath.Join(r.ld, "OVMF_VARS.fd")
	storagePath := filepath.Join(r.ld, "node.img")
	qemuArgs := []string{"-machine", "q35", "-accel", "kvm", "-nographic", "-nodefaults", "-m", "4096",
		"-cpu", "host", "-smp", "sockets=1,cpus=1,cores=2,threads=2,maxcpus=4",
		"-drive", "if=pflash,format=raw,readonly,file=external/edk2/OVMF_CODE.fd",
		"-drive", "if=pflash,format=raw,file=" + fwVarPath,
		"-drive", "if=virtio,format=raw,cache=unsafe,file=" + storagePath,
		"-netdev", qemuNetConfig.ToOption(qemuNetType),
		"-device", "virtio-net-pci,netdev=net0,mac=" + options.Mac.String(),
		"-chardev", "socket,id=chrtpm,path=" + tpmSocketPath,
		"-tpmdev", "emulator,id=tpm0,chardev=chrtpm",
		"-device", "tpm-tis,tpmdev=tpm0",
		"-device", "virtio-rng-pci",
		"-serial", "stdio"}

	if !options.AllowReboot {
		qemuArgs = append(qemuArgs, "-no-reboot")
	}

	if options.NodeParameters != nil {
		parametersPath := filepath.Join(r.ld, "parameters.pb")
		parametersRaw, err := proto.Marshal(options.NodeParameters)
		if err != nil {
			return fmt.Errorf("failed to encode node parameters: %w", err)
		}
		if err := os.WriteFile(parametersPath, parametersRaw, 0644); err != nil {
			return fmt.Errorf("failed to write node parameters: %w", err)
		}
		qemuArgs = append(qemuArgs, "-fw_cfg", "name=dev.monogon.metropolis/parameters.pb,file="+parametersPath)
	}

	// Start the TPM emulator as a subprocess.
	tpmCtx, tpmCancel := context.WithCancel(options.Runtime.ctxT)
	defer tpmCancel()

	tpmd := filepath.Join(r.ld, "tpm")
	tpmEmuCmd := exec.CommandContext(tpmCtx, "swtpm", "socket", "--tpm2", "--tpmstate", "dir="+tpmd, "--ctrl", "type=unixio,path="+tpmSocketPath)
	tpmEmuCmd.Stderr = os.Stderr
	tpmEmuCmd.Stdout = os.Stdout

	err := tpmEmuCmd.Start()
	if err != nil {
		return fmt.Errorf("failed to start TPM emulator: %w", err)
	}

	// Wait for the socket to be created by the TPM emulator before launching
	// QEMU.
	for {
		_, err := os.Stat(tpmSocketPath)
		if err == nil {
			break
		}
		if err != nil && !os.IsNotExist(err) {
			return fmt.Errorf("while stat-ing TPM socket path: %w", err)
		}
		if err := tpmCtx.Err(); err != nil {
			return fmt.Errorf("while waiting for the TPM socket: %w", err)
		}
		time.Sleep(time.Millisecond * 100)
	}

	// Start the main qemu binary.
	systemCmd := exec.CommandContext(options.Runtime.ctxT, "qemu-system-x86_64", qemuArgs...)
	if options.ConnectToSocket != nil {
		systemCmd.ExtraFiles = []*os.File{options.ConnectToSocket}
	}

	var stdErrBuf bytes.Buffer
	systemCmd.Stderr = &stdErrBuf
	systemCmd.Stdout = options.SerialPort

	err = systemCmd.Run()

	// Stop the TPM emulator and wait for it to exit to properly reap the
	// child process.
	tpmCancel()
	log.Print("Node: Waiting for TPM emulator to exit")
	// Wait returns a SIGKILL error because we just canceled its context.
	// We still need to call it to avoid creating zombies.
	_ = tpmEmuCmd.Wait()
	log.Print("Node: TPM emulator done")

	var exerr *exec.ExitError
	if err != nil && errors.As(err, &exerr) {
		status := exerr.ProcessState.Sys().(syscall.WaitStatus)
		if status.Signaled() && status.Signal() == syscall.SIGKILL {
			// Process was killed externally (most likely by our context being
			// canceled). This is a normal exit for us, so return nil.
			return nil
		}
		exerr.Stderr = stdErrBuf.Bytes()
		newErr := launch.QEMUError(*exerr)
		return &newErr
	}
	return err
}

func copyFile(src, dst string) error {
	in, err := os.Open(src)
	if err != nil {
		return fmt.Errorf("when opening source: %w", err)
	}
	defer in.Close()

	out, err := os.Create(dst)
	if err != nil {
		return fmt.Errorf("when creating destination: %w", err)
	}
	defer out.Close()

	_, err = io.Copy(out, in)
	if err != nil {
		return fmt.Errorf("when copying file: %w", err)
	}
	return out.Close()
}

// getNodes wraps around Management.GetNodes to return a list of nodes in a
// cluster.
func getNodes(ctx context.Context, mgmt apb.ManagementClient) ([]*apb.Node, error) {
	var res []*apb.Node
	bo := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
	err := backoff.Retry(func() error {
		res = nil
		srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
		if err != nil {
			return fmt.Errorf("GetNodes: %w", err)
		}
		for {
			node, err := srvN.Recv()
			if err == io.EOF {
				break
			}
			if err != nil {
				return fmt.Errorf("GetNodes.Recv: %w", err)
			}
			res = append(res, node)
		}
		return nil
	}, bo)
	if err != nil {
		return nil, err
	}
	return res, nil
}

// getNode wraps Management.GetNodes. It returns node information matching the
// given node ID.
func getNode(ctx context.Context, mgmt apb.ManagementClient, id string) (*apb.Node, error) {
	nodes, err := getNodes(ctx, mgmt)
	if err != nil {
		return nil, fmt.Errorf("could not get nodes: %w", err)
	}
	for _, n := range nodes {
		eid := identity.NodeID(n.Pubkey)
		if eid != id {
			continue
		}
		return n, nil
	}
	return nil, fmt.Errorf("no such node")
}

// generateRandomEthernetMAC generates a random EUI-48 Ethernet MAC address.
func generateRandomEthernetMAC() (*net.HardwareAddr, error) {
	macBuf := make([]byte, 6)
	_, err := rand.Read(macBuf)
	if err != nil {
		return nil, fmt.Errorf("failed to read randomness for MAC: %v", err)
	}

	// Set the U/L bit and clear the I/G bit (locally administered individual
	// MAC). Ref IEEE 802-2014 Section 8.2.2.
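	// For example, a random first byte of 0x9d becomes (0x9d|0x02)&0xfe =
	// 0x9e.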
	macBuf[0] = (macBuf[0] | 2) & 0xfe
	mac := net.HardwareAddr(macBuf)
	return &mac, nil
}

// SOCKSPort is the port on which nanoswitch exposes its SOCKS proxy to the
// host.
const SOCKSPort uint16 = 1080

// ClusterPorts contains all ports handled by Nanoswitch.
var ClusterPorts = []uint16{
	// Forwarded to the first node.
	uint16(node.CuratorServicePort),
	uint16(node.DebugServicePort),
	uint16(node.KubernetesAPIPort),
	uint16(node.KubernetesAPIWrappedPort),

	// SOCKS proxy to the switch network.
	SOCKSPort,
}

// ClusterOptions contains all options for launching a Metropolis cluster.
type ClusterOptions struct {
	// The number of nodes this cluster should be started with.
	NumNodes int
}

// Cluster is the running Metropolis cluster launched using the LaunchCluster
// function.
type Cluster struct {
	// Owner is the TLS Certificate of the owner of the test cluster. This can
	// be used to authenticate further clients to the running cluster.
	Owner tls.Certificate
	// Ports is the PortMap used to access the first node's services (defined
	// in ClusterPorts) and the SOCKS proxy (at SOCKSPort).
	Ports launch.PortMap

	// Nodes is a map from Node ID to its runtime information.
	Nodes map[string]*NodeInCluster
	// NodeIDs is a list of node IDs that are backing this cluster, in order
	// of creation.
	NodeIDs []string

	// nodesDone is a list of channels populated with the return codes from
	// all the nodes' qemu instances. It's used by Close to ensure all nodes
	// have successfully been stopped.
	nodesDone []chan error
	// nodeOpts are the cluster member nodes' mutable launch options, kept
	// here to facilitate reboots.
	nodeOpts []NodeOptions
	// launchDir points at the directory keeping the nodes' state, such as
	// storage images, firmware variable files and TPM state.
	launchDir string
	// socketDir points at the directory keeping UNIX socket files, such as
	// those used to facilitate communication between QEMU and swtpm. It's
	// different from launchDir, and anchored nearer the file system root, due
	// to the socket path length limitation imposed by the kernel.
	socketDir string

	// socksDialer is used by DialNode to establish connections to nodes via
	// the SOCKS server run by nanoswitch.
	socksDialer proxy.Dialer

	// authClient is a cached authenticated owner connection to a Curator
	// instance within the cluster.
	authClient *grpc.ClientConn

	// ctxT is the context individual node contexts are created from.
	ctxT context.Context
	// ctxC is used by Close to cancel the context under which the nodes are
	// running.
	ctxC context.CancelFunc
}

// NodeInCluster represents information about a node that's part of a Cluster.
type NodeInCluster struct {
	// ID of the node, which can be used to dial this node's services via
	// DialNode.
	ID string
	// Address of the node on the network run by nanoswitch. Not reachable
	// from the host unless dialed via DialNode or via the nanoswitch SOCKS
	// proxy (reachable on Cluster.Ports[SOCKSPort]).
	ManagementAddress string
}

// firstConnection performs the initial owner credential escrow with a newly
// started nanoswitch-backed cluster over SOCKS. It expects the first node to
// be running at 10.1.0.2, which is always the case with the current
// nanoswitch implementation.
//
// It returns the newly escrowed credentials as well as the first node's
// information as NodeInCluster.
func firstConnection(ctx context.Context, socksDialer proxy.Dialer) (*tls.Certificate, *NodeInCluster, error) {
	// Dial external service.
	remote := fmt.Sprintf("10.1.0.2:%s", node.CuratorServicePort.PortString())
	initCreds, err := rpc.NewEphemeralCredentials(InsecurePrivateKey, nil)
	if err != nil {
		return nil, nil, fmt.Errorf("NewEphemeralCredentials: %w", err)
	}
	initDialer := func(_ context.Context, addr string) (net.Conn, error) {
		return socksDialer.Dial("tcp", addr)
	}
	initClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(initCreds))
	if err != nil {
		return nil, nil, fmt.Errorf("dialing with ephemeral credentials failed: %w", err)
	}
	defer initClient.Close()

	// Retrieve owner certificate - this can take a while because the node is
	// still coming up, so do it in a backoff loop.
	log.Printf("Cluster: retrieving owner certificate (this can take a few seconds while the first node boots)...")
	aaa := apb.NewAAAClient(initClient)
	var cert *tls.Certificate
	err = backoff.Retry(func() error {
		cert, err = rpc.RetrieveOwnerCertificate(ctx, aaa, InsecurePrivateKey)
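		// Retry only while the node reports Unavailable (it's still coming
		// up); any other result, including success, ends the backoff loop.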
		if st, ok := status.FromError(err); ok {
			if st.Code() == codes.Unavailable {
				return err
			}
		}
		return backoff.Permanent(err)
	}, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
	if err != nil {
		return nil, nil, err
	}
	log.Printf("Cluster: retrieved owner certificate.")

	// Now connect authenticated and get the node ID.
	creds := rpc.NewAuthenticatedCredentials(*cert, nil)
	authClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(creds))
	if err != nil {
		return nil, nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
	}
	defer authClient.Close()
	mgmt := apb.NewManagementClient(authClient)

	var node *NodeInCluster
	err = backoff.Retry(func() error {
		nodes, err := getNodes(ctx, mgmt)
		if err != nil {
			return fmt.Errorf("retrieving nodes failed: %w", err)
		}
		if len(nodes) != 1 {
			return fmt.Errorf("expected one node, got %d", len(nodes))
		}
		n := nodes[0]
		if n.Status == nil || n.Status.ExternalAddress == "" {
			return fmt.Errorf("node has no status and/or address")
		}
		node = &NodeInCluster{
			ID:                identity.NodeID(n.Pubkey),
			ManagementAddress: n.Status.ExternalAddress,
		}
		return nil
	}, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
	if err != nil {
		return nil, nil, err
	}

	return cert, node, nil
}

// LaunchCluster launches a cluster of Metropolis node VMs together with a
// Nanoswitch instance to network them all together.
//
// The given context will be used to run all qemu instances in the cluster,
// and canceling the context or calling Close() will terminate them.
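//
// A minimal sketch of launching a two-node cluster and dialing the first
// node's Curator, with error handling elided:
//
//	cl, _ := LaunchCluster(ctx, ClusterOptions{NumNodes: 2})
//	defer cl.Close()
//	creds := rpc.NewAuthenticatedCredentials(cl.Owner, nil)
//	conn, _ := grpc.Dial(
//		net.JoinHostPort(cl.NodeIDs[0], common.CuratorServicePort.PortString()),
//		grpc.WithTransportCredentials(creds), grpc.WithContextDialer(cl.DialNode),
//	)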
func LaunchCluster(ctx context.Context, opts ClusterOptions) (*Cluster, error) {
	if opts.NumNodes <= 0 {
		return nil, errors.New("refusing to start cluster with zero nodes")
	}

	// Create the launch directory.
	ld, err := os.MkdirTemp(os.Getenv("TEST_TMPDIR"), "cluster*")
	if err != nil {
		return nil, fmt.Errorf("failed to create the launch directory: %w", err)
	}
	// Create the socket directory.
	sd, err := os.MkdirTemp("/tmp", "cluster*")
	if err != nil {
		return nil, fmt.Errorf("failed to create the socket directory: %w", err)
	}

	// Prepare links between nodes and nanoswitch.
	var switchPorts []*os.File
	var vmPorts []*os.File
	for i := 0; i < opts.NumNodes; i++ {
		switchPort, vmPort, err := launch.NewSocketPair()
		if err != nil {
			return nil, fmt.Errorf("failed to get socketpair: %w", err)
		}
		switchPorts = append(switchPorts, switchPort)
		vmPorts = append(vmPorts, vmPort)
	}

	// Make a list of channels that will be populated by all running node qemu
	// processes.
	done := make([]chan error, opts.NumNodes)
	for i := range done {
		done[i] = make(chan error, 1)
	}

	// Prepare the node options. These will be kept as part of Cluster.
	// nodeOpts[].Runtime will be initialized by LaunchNode during the first
	// launch. The runtime information can later be used to restart a node.
	// The 0th node will be initialized first. The rest will follow after it
	// has bootstrapped the cluster.
	nodeOpts := make([]NodeOptions, opts.NumNodes)
	nodeOpts[0] = NodeOptions{
		ConnectToSocket: vmPorts[0],
		NodeParameters: &apb.NodeParameters{
			Cluster: &apb.NodeParameters_ClusterBootstrap_{
				ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
					OwnerPublicKey: InsecurePublicKey,
				},
			},
		},
		SerialPort: newPrefixedStdio(0),
	}

	// Start the first node.
	ctxT, ctxC := context.WithCancel(ctx)
	log.Printf("Cluster: Starting node %d...", 1)
	go func() {
		err := LaunchNode(ctxT, ld, sd, &nodeOpts[0])
		if err != nil {
			log.Printf("Node %d finished with an error: %v", 1, err)
		}
		done[0] <- err
	}()

	// Launch nanoswitch.
	portMap, err := launch.ConflictFreePortMap(ClusterPorts)
	if err != nil {
		ctxC()
		return nil, fmt.Errorf("failed to allocate ephemeral ports: %w", err)
	}

	go func() {
		if err := launch.RunMicroVM(ctxT, &launch.MicroVMOptions{
			KernelPath:             "metropolis/test/ktest/vmlinux",
			InitramfsPath:          "metropolis/test/nanoswitch/initramfs.cpio.lz4",
			ExtraNetworkInterfaces: switchPorts,
			PortMap:                portMap,
			SerialPort:             newPrefixedStdio(99),
		}); err != nil {
			if !errors.Is(err, ctxT.Err()) {
				log.Fatalf("Failed to launch nanoswitch: %v", err)
			}
		}
	}()

	// Build SOCKS dialer.
	socksRemote := fmt.Sprintf("localhost:%v", portMap[SOCKSPort])
	socksDialer, err := proxy.SOCKS5("tcp", socksRemote, nil, proxy.Direct)
	if err != nil {
		ctxC()
		return nil, fmt.Errorf("failed to build SOCKS dialer: %w", err)
	}

	// Retrieve owner credentials and first node.
	cert, firstNode, err := firstConnection(ctxT, socksDialer)
	if err != nil {
		ctxC()
		return nil, err
	}

	// Set up a partially initialized cluster instance, to be filled in by the
	// later steps.
	cluster := &Cluster{
		Owner: *cert,
		Ports: portMap,
		Nodes: map[string]*NodeInCluster{
			firstNode.ID: firstNode,
		},
		NodeIDs: []string{
			firstNode.ID,
		},

		nodesDone: done,
		nodeOpts:  nodeOpts,
		launchDir: ld,
		socketDir: sd,

		socksDialer: socksDialer,

		ctxT: ctxT,
		ctxC: ctxC,
	}

	// Now start the rest of the nodes and register them into the cluster.

	// Get an authenticated owner client within the cluster.
	curC, err := cluster.curatorClient()
	if err != nil {
		ctxC()
		return nil, fmt.Errorf("curatorClient: %w", err)
	}
	mgmt := apb.NewManagementClient(curC)

	// Retrieve the register ticket to register further nodes.
	log.Printf("Cluster: retrieving register ticket...")
	resT, err := mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{})
	if err != nil {
		ctxC()
		return nil, fmt.Errorf("GetRegisterTicket: %w", err)
	}
	ticket := resT.Ticket
	log.Printf("Cluster: retrieved register ticket (%d bytes).", len(ticket))

	// Retrieve cluster info (for directory and CA public key) to register
	// further nodes.
	resI, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
	if err != nil {
		ctxC()
		return nil, fmt.Errorf("GetClusterInfo: %w", err)
	}

	// Use the retrieved information to configure the rest of the node
	// options.
	for i := 1; i < opts.NumNodes; i++ {
		nodeOpts[i] = NodeOptions{
			ConnectToSocket: vmPorts[i],
			NodeParameters: &apb.NodeParameters{
				Cluster: &apb.NodeParameters_ClusterRegister_{
					ClusterRegister: &apb.NodeParameters_ClusterRegister{
						RegisterTicket:   ticket,
						ClusterDirectory: resI.ClusterDirectory,
						CaCertificate:    resI.CaCertificate,
					},
				},
			},
			SerialPort: newPrefixedStdio(i),
		}
	}

	// Now run the rest of the nodes. Node numbering in logs is 1-based, to
	// match the first node's logs above.
	//
	// TODO(q3k): parallelize this
	for i := 1; i < opts.NumNodes; i++ {
		log.Printf("Cluster: Starting node %d...", i+1)
		go func(i int) {
			err := LaunchNode(ctxT, ld, sd, &nodeOpts[i])
			if err != nil {
				log.Printf("Node %d finished with an error: %v", i+1, err)
			}
			done[i] <- err
		}(i)
		var newNode *apb.Node

		log.Printf("Cluster: waiting for node %d to appear as NEW...", i+1)
		for {
			nodes, err := getNodes(ctx, mgmt)
			if err != nil {
				ctxC()
				return nil, fmt.Errorf("could not get nodes: %w", err)
			}
			for _, n := range nodes {
				if n.State == cpb.NodeState_NODE_STATE_NEW {
					newNode = n
					break
				}
			}
			if newNode != nil {
				break
			}
			time.Sleep(1 * time.Second)
		}
		id := identity.NodeID(newNode.Pubkey)
		log.Printf("Cluster: node %d is %s", i+1, id)

		log.Printf("Cluster: approving node %d", i+1)
		_, err := mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
			Pubkey: newNode.Pubkey,
		})
		if err != nil {
			ctxC()
			return nil, fmt.Errorf("ApproveNode(%s): %w", id, err)
		}
		log.Printf("Cluster: node %d approved, waiting for it to appear as UP and with a network address...", i+1)
		for {
			nodes, err := getNodes(ctx, mgmt)
			if err != nil {
				ctxC()
				return nil, fmt.Errorf("could not get nodes: %w", err)
			}
			found := false
			for _, n := range nodes {
				if !bytes.Equal(n.Pubkey, newNode.Pubkey) {
					continue
				}
				if n.Status == nil || n.Status.ExternalAddress == "" {
					break
				}
				if n.State != cpb.NodeState_NODE_STATE_UP {
					break
				}
				found = true
				cluster.Nodes[identity.NodeID(n.Pubkey)] = &NodeInCluster{
					ID:                identity.NodeID(n.Pubkey),
					ManagementAddress: n.Status.ExternalAddress,
				}
				cluster.NodeIDs = append(cluster.NodeIDs, identity.NodeID(n.Pubkey))
				break
			}
			if found {
				break
			}
			time.Sleep(time.Second)
		}
		log.Printf("Cluster: node %d (%s) UP!", i+1, id)
	}

	log.Printf("Cluster: all nodes up:")
	for _, node := range cluster.Nodes {
		log.Printf("Cluster: - %s at %s", node.ID, node.ManagementAddress)
	}

	return cluster, nil
}

// RebootNode reboots the cluster member node matching the given index, and
// waits for it to rejoin the cluster. It will use the given context ctx to
// run cluster API requests, whereas the resulting QEMU process will be
// created using the cluster's context c.ctxT. The nodes are indexed starting
// at 0.
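//
// For example, rebooting the second node of a running cluster cl:
//
//	if err := cl.RebootNode(ctx, 1); err != nil {
//		// ...
//	}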
func (c *Cluster) RebootNode(ctx context.Context, idx int) error {
	if idx < 0 || idx >= len(c.NodeIDs) {
		return fmt.Errorf("index out of bounds")
	}
	id := c.NodeIDs[idx]

	// Get an authenticated owner client within the cluster.
	curC, err := c.curatorClient()
	if err != nil {
		return err
	}
	mgmt := apb.NewManagementClient(curC)

	// Get the timestamp of the node's last update, as observed by Curator.
	// It'll be needed to make sure the node has rejoined the cluster after
	// the reboot.
	var is *apb.Node
	for {
		r, err := getNode(ctx, mgmt, id)
		if err != nil {
			return err
		}

		// Node status may be absent if it hasn't reported to the cluster yet.
		// Wait for it to appear before progressing further.
		if r.Status != nil {
			is = r
			break
		}
		time.Sleep(time.Second)
	}

	// Cancel the node's context. This will shut down QEMU.
	c.nodeOpts[idx].Runtime.CtxC()
	log.Printf("Cluster: waiting for node %d (%s) to stop.", idx, id)
	err = <-c.nodesDone[idx]
	if err != nil {
		return fmt.Errorf("while restarting node: %w", err)
	}

	// Start QEMU again.
	log.Printf("Cluster: restarting node %d (%s).", idx, id)
	go func(n int) {
		err := LaunchNode(c.ctxT, c.launchDir, c.socketDir, &c.nodeOpts[n])
		if err != nil {
			log.Printf("Node %d finished with an error: %v", n, err)
		}
		c.nodesDone[n] <- err
	}(idx)

	// Poll Management.GetNodes until the node's timestamp is updated.
	for {
		cs, err := getNode(ctx, mgmt, id)
		if err != nil {
			return err
		}
		if cs.Status == nil {
			time.Sleep(time.Second)
			continue
		}
		if cs.Status.Timestamp > is.Status.Timestamp {
			break
		}
		time.Sleep(time.Second)
	}
	log.Printf("Cluster: node %d (%s) has rejoined the cluster.", idx, id)
	return nil
}

// Close cancels the running cluster's context and waits for all virtualized
// nodes to stop. It returns an error if stopping the nodes failed, or one of
// the nodes failed to fully start in the first place.
func (c *Cluster) Close() error {
	log.Printf("Cluster: stopping...")
	if c.authClient != nil {
		c.authClient.Close()
	}
	c.ctxC()

	var errs []error
	log.Printf("Cluster: waiting for nodes to exit...")
	for _, ch := range c.nodesDone {
		err := <-ch
		if err != nil {
			errs = append(errs, err)
		}
	}
	log.Printf("Cluster: removing nodes' state files.")
	os.RemoveAll(c.launchDir)
	os.RemoveAll(c.socketDir)
	log.Printf("Cluster: done")
	return multierr.Combine(errs...)
}

// DialNode is a grpc.WithContextDialer compatible dialer which dials nodes by
// their ID. This is performed by connecting to the cluster nanoswitch via its
// SOCKS proxy, and using the cluster node list for name resolution.
//
// For example:
//
//	grpc.Dial("metropolis-deadbeef:1234", grpc.WithContextDialer(c.DialNode))
func (c *Cluster) DialNode(_ context.Context, addr string) (net.Conn, error) {
	host, port, err := net.SplitHostPort(addr)
	if err != nil {
		return nil, fmt.Errorf("invalid host:port: %w", err)
	}
	// Already an IP address?
	if net.ParseIP(host) != nil {
		return c.socksDialer.Dial("tcp", addr)
	}

	// Otherwise, expect a node name.
	node, ok := c.Nodes[host]
	if !ok {
		return nil, fmt.Errorf("unknown node %q", host)
	}
	addr = net.JoinHostPort(node.ManagementAddress, port)
	return c.socksDialer.Dial("tcp", addr)
}
958}