// cluster builds on the launch package and implements launching Metropolis
// nodes and clusters in a virtualized environment using qemu. It's kept in a
// separate package as it depends on a Metropolis node image, which might not be
// required for some uses of the launch library.
package cluster

import (
	"bytes"
	"context"
	"crypto/rand"
	"crypto/tls"
	"errors"
	"fmt"
	"io"
	"log"
	"net"
	"os"
	"os/exec"
	"path/filepath"
	"syscall"
	"time"

	"github.com/cenkalti/backoff/v4"
	"go.uber.org/multierr"
	"golang.org/x/net/proxy"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/proto"

	"source.monogon.dev/metropolis/cli/pkg/datafile"
	"source.monogon.dev/metropolis/node"
	common "source.monogon.dev/metropolis/node"
	"source.monogon.dev/metropolis/node/core/identity"
	"source.monogon.dev/metropolis/node/core/rpc"
	apb "source.monogon.dev/metropolis/proto/api"
	cpb "source.monogon.dev/metropolis/proto/common"
	"source.monogon.dev/metropolis/test/launch"
)

41// Options contains all options that can be passed to Launch()
42type NodeOptions struct {
43 // Ports contains the port mapping where to expose the internal ports of the VM to
44 // the host. See IdentityPortMap() and ConflictFreePortMap(). Ignored when
45 // ConnectToSocket is set.
46 Ports launch.PortMap
47
48 // If set to true, reboots are honored. Otherwise all reboots exit the Launch()
49 // command. Metropolis nodes generally restarts on almost all errors, so unless you
50 // want to test reboot behavior this should be false.
51 AllowReboot bool
52
53 // By default the VM is connected to the Host via SLIRP. If ConnectToSocket is set,
54 // it is instead connected to the given file descriptor/socket. If this is set, all
55 // port maps from the Ports option are ignored. Intended for networking this
56 // instance together with others for running more complex network configurations.
57 ConnectToSocket *os.File
58
59 // SerialPort is a io.ReadWriter over which you can communicate with the serial
60 // port of the machine It can be set to an existing file descriptor (like
61 // os.Stdout/os.Stderr) or any Go structure implementing this interface.
62 SerialPort io.ReadWriter
63
64 // NodeParameters is passed into the VM and subsequently used for bootstrapping or
65 // registering into a cluster.
66 NodeParameters *apb.NodeParameters
Mateusz Zalega0246f5e2022-04-22 17:29:04 +020067
68 // Mac is the node's MAC address.
69 Mac *net.HardwareAddr
70
71 // Runtime keeps the node's QEMU runtime state.
72 Runtime *NodeRuntime
73}
74
// NodeRuntime keeps the node's QEMU runtime options.
type NodeRuntime struct {
	// ld points at the node's launch directory storing data such as storage
	// images, firmware variables or the TPM state.
	ld string
	// sd points at the node's socket directory.
	sd string

	// ctxT is the context QEMU will execute in.
	ctxT context.Context
	// CtxC is the QEMU context's cancellation function.
	CtxC context.CancelFunc
}

// NodePorts is the list of ports a fully operational Metropolis node listens on.
var NodePorts = []node.Port{
	node.ConsensusPort,

	node.CuratorServicePort,
	node.DebugServicePort,

	node.KubernetesAPIPort,
	node.KubernetesAPIWrappedPort,
	node.DebuggerPort,
}

// setupRuntime creates the node's QEMU runtime directory, together with all
// files required to preserve its state, a level below the chosen path ld. The
// node's socket directory is similarly created a level below sd. It may
// return an I/O error.
func setupRuntime(ld, sd string) (*NodeRuntime, error) {
	// Create a temporary directory to keep all the runtime files.
	stdp, err := os.MkdirTemp(ld, "node_state*")
	if err != nil {
		return nil, fmt.Errorf("failed to create the state directory: %w", err)
	}

	// Initialize the node's storage with a prebuilt image.
	si, err := datafile.ResolveRunfile("metropolis/node/node.img")
	if err != nil {
		return nil, fmt.Errorf("while resolving a path: %w", err)
	}
	di := filepath.Join(stdp, filepath.Base(si))
	log.Printf("Cluster: copying the node image: %s -> %s", si, di)
	if err := copyFile(si, di); err != nil {
		return nil, fmt.Errorf("while copying the node image: %w", err)
	}

	// Initialize the OVMF firmware variables file.
	sv, err := datafile.ResolveRunfile("external/edk2/OVMF_VARS.fd")
	if err != nil {
		return nil, fmt.Errorf("while resolving a path: %w", err)
	}
	dv := filepath.Join(stdp, filepath.Base(sv))
	if err := copyFile(sv, dv); err != nil {
		return nil, fmt.Errorf("while copying firmware variables: %w", err)
	}

	// Create the TPM state directory and initialize all files required by swtpm.
	tpmt := filepath.Join(stdp, "tpm")
	if err := os.Mkdir(tpmt, 0755); err != nil {
		return nil, fmt.Errorf("while creating the TPM directory: %w", err)
	}
	tpms, err := datafile.ResolveRunfile("metropolis/node/tpm")
	if err != nil {
		return nil, fmt.Errorf("while resolving a path: %w", err)
	}
	tpmf, err := os.ReadDir(tpms)
	if err != nil {
		return nil, fmt.Errorf("failed to read TPM directory: %w", err)
	}
	for _, file := range tpmf {
		name := file.Name()
		src, err := datafile.ResolveRunfile(filepath.Join(tpms, name))
		if err != nil {
			return nil, fmt.Errorf("while resolving a path: %w", err)
		}
		tgt := filepath.Join(tpmt, name)
		if err := copyFile(src, tgt); err != nil {
			return nil, fmt.Errorf("while copying TPM state: file %q to %q: %w", src, tgt, err)
		}
	}

	// Create the socket directory.
	sotdp, err := os.MkdirTemp(sd, "node_sock*")
	if err != nil {
		return nil, fmt.Errorf("failed to create the socket directory: %w", err)
	}

	return &NodeRuntime{
		ld: stdp,
		sd: sotdp,
	}, nil
}

// curatorClient returns an authenticated owner connection to a Curator
// instance within Cluster c, or nil together with an error.
func (c *Cluster) curatorClient() (*grpc.ClientConn, error) {
	if c.authClient == nil {
		authCreds := rpc.NewAuthenticatedCredentials(c.Owner, nil)
		remote := net.JoinHostPort(c.NodeIDs[0], common.CuratorServicePort.PortString())
		authClient, err := grpc.Dial(remote, grpc.WithTransportCredentials(authCreds), grpc.WithContextDialer(c.DialNode))
		if err != nil {
			return nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
		}
		c.authClient = authClient
	}
	return c.authClient, nil
}

// LaunchNode launches a single Metropolis node instance with the given options.
// The instance runs mostly paravirtualized but with some emulated hardware
// similar to how a cloud provider might set up its VMs. The disk is fully
// writable, and the changes are kept across reboots and shutdowns. ld and sd
// point to the launch directory and the socket directory, holding the nodes'
// state files (storage, tpm state, firmware state), and UNIX socket files
// (swtpm <-> QEMU interplay) respectively. The directories must exist before
// LaunchNode is called. LaunchNode will update options.Runtime and options.Mac
// if either are not initialized.
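//
// A minimal usage sketch, assuming ld and sd already exist and ctx is a live
// context (error handling and NodeParameters elided):
//
//   opts := NodeOptions{
//     SerialPort: os.Stdout,
//   }
//   if err := LaunchNode(ctx, ld, sd, &opts); err != nil {
//     // handle launch failure
//   }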
func LaunchNode(ctx context.Context, ld, sd string, options *NodeOptions) error {
	// TODO(mateusz@monogon.tech) try using QEMU's abstract socket namespace instead
	// of /tmp (requires QEMU version >5.0,
	// https://github.com/qemu/qemu/commit/776b97d3605ed0fc94443048fdf988c7725e38a9).
	// swtpm accepts already-open FDs so we can pass in an abstract socket namespace FD
	// that we open and pass the name of it to QEMU. Not pinning this crashes both
	// swtpm and qemu because we run into UNIX socket length limitations (for legacy
	// reasons 108 chars).

	// If it's the node's first start, set up its runtime directories.
	if options.Runtime == nil {
		r, err := setupRuntime(ld, sd)
		if err != nil {
			return fmt.Errorf("while setting up node runtime: %w", err)
		}
		options.Runtime = r
	}

	// Replace the node's context with a new one.
	r := options.Runtime
	if r.CtxC != nil {
		r.CtxC()
	}
	r.ctxT, r.CtxC = context.WithCancel(ctx)

	var qemuNetType string
	var qemuNetConfig launch.QemuValue
	if options.ConnectToSocket != nil {
		qemuNetType = "socket"
		qemuNetConfig = launch.QemuValue{
			"id": {"net0"},
			"fd": {"3"},
		}
	} else {
		qemuNetType = "user"
		qemuNetConfig = launch.QemuValue{
			"id":        {"net0"},
			"net":       {"10.42.0.0/24"},
			"dhcpstart": {"10.42.0.10"},
			"hostfwd":   options.Ports.ToQemuForwards(),
		}
	}

	// Generate the node's MAC address if it isn't already set in NodeOptions.
	if options.Mac == nil {
		mac, err := generateRandomEthernetMAC()
		if err != nil {
			return err
		}
		options.Mac = mac
	}

	tpmSocketPath := filepath.Join(r.sd, "tpm-socket")
	fwVarPath := filepath.Join(r.ld, "OVMF_VARS.fd")
	storagePath := filepath.Join(r.ld, "node.img")
	qemuArgs := []string{"-machine", "q35", "-accel", "kvm", "-nographic", "-nodefaults", "-m", "4096",
		"-cpu", "host", "-smp", "sockets=1,cpus=1,cores=2,threads=2,maxcpus=4",
		"-drive", "if=pflash,format=raw,readonly,file=external/edk2/OVMF_CODE.fd",
		"-drive", "if=pflash,format=raw,file=" + fwVarPath,
		"-drive", "if=virtio,format=raw,cache=unsafe,file=" + storagePath,
		"-netdev", qemuNetConfig.ToOption(qemuNetType),
		"-device", "virtio-net-pci,netdev=net0,mac=" + options.Mac.String(),
		"-chardev", "socket,id=chrtpm,path=" + tpmSocketPath,
		"-tpmdev", "emulator,id=tpm0,chardev=chrtpm",
		"-device", "tpm-tis,tpmdev=tpm0",
		"-device", "virtio-rng-pci",
		"-serial", "stdio"}

	if !options.AllowReboot {
		qemuArgs = append(qemuArgs, "-no-reboot")
	}

	if options.NodeParameters != nil {
		parametersPath := filepath.Join(r.ld, "parameters.pb")
		parametersRaw, err := proto.Marshal(options.NodeParameters)
		if err != nil {
			return fmt.Errorf("failed to encode node parameters: %w", err)
		}
		if err := os.WriteFile(parametersPath, parametersRaw, 0644); err != nil {
			return fmt.Errorf("failed to write node parameters: %w", err)
		}
		qemuArgs = append(qemuArgs, "-fw_cfg", "name=dev.monogon.metropolis/parameters.pb,file="+parametersPath)
	}

	// Start TPM emulator as a subprocess.
	tpmCtx, tpmCancel := context.WithCancel(options.Runtime.ctxT)
	defer tpmCancel()

	tpmd := filepath.Join(r.ld, "tpm")
	tpmEmuCmd := exec.CommandContext(tpmCtx, "swtpm", "socket", "--tpm2", "--tpmstate", "dir="+tpmd, "--ctrl", "type=unixio,path="+tpmSocketPath)
	tpmEmuCmd.Stderr = os.Stderr
	tpmEmuCmd.Stdout = os.Stdout

	err := tpmEmuCmd.Start()
	if err != nil {
		return fmt.Errorf("failed to start TPM emulator: %w", err)
	}

	// Start the main qemu binary.
	systemCmd := exec.CommandContext(options.Runtime.ctxT, "qemu-system-x86_64", qemuArgs...)
	if options.ConnectToSocket != nil {
		systemCmd.ExtraFiles = []*os.File{options.ConnectToSocket}
	}

	var stdErrBuf bytes.Buffer
	systemCmd.Stderr = &stdErrBuf
	systemCmd.Stdout = options.SerialPort

	err = systemCmd.Run()

	// Stop the TPM emulator and wait for it to exit to properly reap the child
	// process.
	tpmCancel()
	log.Print("Node: Waiting for TPM emulator to exit")
	// Wait returns a SIGKILL error because we just cancelled its context.
	// We still need to call it to avoid creating zombies.
	_ = tpmEmuCmd.Wait()
	log.Print("Node: TPM emulator done")

	var exerr *exec.ExitError
	if err != nil && errors.As(err, &exerr) {
		status := exerr.ProcessState.Sys().(syscall.WaitStatus)
		if status.Signaled() && status.Signal() == syscall.SIGKILL {
			// Process was killed externally (most likely by our context being canceled).
			// This is a normal exit for us, so return nil.
			return nil
		}
		exerr.Stderr = stdErrBuf.Bytes()
		newErr := launch.QEMUError(*exerr)
		return &newErr
	}
	return err
}

func copyFile(src, dst string) error {
	in, err := os.Open(src)
	if err != nil {
		return fmt.Errorf("when opening source: %w", err)
	}
	defer in.Close()

	out, err := os.Create(dst)
	if err != nil {
		return fmt.Errorf("when creating destination: %w", err)
	}
	defer out.Close()

	_, err = io.Copy(out, in)
	if err != nil {
		return fmt.Errorf("when copying file: %w", err)
	}
	return out.Close()
}

// getNodes wraps around Management.GetNodes to return a list of nodes in a
// cluster.
func getNodes(ctx context.Context, mgmt apb.ManagementClient) ([]*apb.Node, error) {
	var res []*apb.Node
	bo := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
	err := backoff.Retry(func() error {
		res = nil
		srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
		if err != nil {
			return fmt.Errorf("GetNodes: %w", err)
		}
		for {
			node, err := srvN.Recv()
			if err == io.EOF {
				break
			}
			if err != nil {
				return fmt.Errorf("GetNodes.Recv: %w", err)
			}
			res = append(res, node)
		}
		return nil
	}, bo)
	if err != nil {
		return nil, err
	}
	return res, nil
}

// getNode wraps Management.GetNodes. It returns node information matching the
// given node ID.
func getNode(ctx context.Context, mgmt apb.ManagementClient, id string) (*apb.Node, error) {
	nodes, err := getNodes(ctx, mgmt)
	if err != nil {
		return nil, fmt.Errorf("could not get nodes: %w", err)
	}
	for _, n := range nodes {
		eid := identity.NodeID(n.Pubkey)
		if eid != id {
			continue
		}
		return n, nil
	}
	return nil, fmt.Errorf("no such node")
}

// generateRandomEthernetMAC generates a random EUI-48 Ethernet MAC address.
func generateRandomEthernetMAC() (*net.HardwareAddr, error) {
	macBuf := make([]byte, 6)
	_, err := rand.Read(macBuf)
	if err != nil {
		return nil, fmt.Errorf("failed to read randomness for MAC: %v", err)
	}

	// Set U/L bit and clear I/G bit (locally administered individual MAC).
	// Ref IEEE 802-2014 Section 8.2.2.
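	// For example, a random first byte of 0x05 (0b00000101) becomes 0x06
	// (0b00000110): OR-ing with 2 sets the U/L bit, AND-ing with 0xfe clears
	// the I/G bit.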
	macBuf[0] = (macBuf[0] | 2) & 0xfe
	mac := net.HardwareAddr(macBuf)
	return &mac, nil
}

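// SOCKSPort is the port on which nanoswitch exposes its SOCKS proxy into the
// cluster network.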
const SOCKSPort uint16 = 1080

// ClusterPorts contains all ports handled by Nanoswitch.
var ClusterPorts = []uint16{
	// Forwarded to the first node.
	uint16(node.CuratorServicePort),
	uint16(node.DebugServicePort),
	uint16(node.KubernetesAPIPort),
	uint16(node.KubernetesAPIWrappedPort),

	// SOCKS proxy to the switch network.
	SOCKSPort,
}

// ClusterOptions contains all options for launching a Metropolis cluster.
type ClusterOptions struct {
	// The number of nodes this cluster should be started with.
	NumNodes int
}

// Cluster is the running Metropolis cluster launched using the LaunchCluster
// function.
type Cluster struct {
	// Owner is the TLS Certificate of the owner of the test cluster. This can be
	// used to authenticate further clients to the running cluster.
	Owner tls.Certificate
	// Ports is the PortMap used to access the first nodes' services (defined in
	// ClusterPorts) and the SOCKS proxy (at SOCKSPort).
	Ports launch.PortMap

	// Nodes is a map from Node ID to its runtime information.
	Nodes map[string]*NodeInCluster
	// NodeIDs is a list of node IDs that are backing this cluster, in order of
	// creation.
	NodeIDs []string

	// nodesDone is a list of channels populated with the return codes from all the
	// nodes' qemu instances. It's used by Close to ensure all nodes have
	// successfully been stopped.
	nodesDone []chan error
	// nodeOpts are the cluster member nodes' mutable launch options, kept here
	// to facilitate reboots.
	nodeOpts []NodeOptions
	// launchDir points at the directory keeping the nodes' state, such as storage
	// images, firmware variable files, TPM state.
	launchDir string
	// socketDir points at the directory keeping UNIX socket files, such as those
	// used to facilitate communication between QEMU and swtpm. It's different
	// from launchDir, and anchored nearer the file system root, due to the
	// socket path length limitation imposed by the kernel.
	socketDir string

	// socksDialer is used by DialNode to establish connections to nodes via the
	// SOCKS server run by nanoswitch.
	socksDialer proxy.Dialer

	// authClient is a cached authenticated owner connection to a Curator
	// instance within the cluster.
	authClient *grpc.ClientConn

	// ctxT is the context individual node contexts are created from.
	ctxT context.Context
	// ctxC is used by Close to cancel the context under which the nodes are
	// running.
	ctxC context.CancelFunc
}

// NodeInCluster represents information about a node that's part of a Cluster.
type NodeInCluster struct {
	// ID of the node, which can be used to dial this node's services via DialNode.
	ID string
	// Address of the node on the network run by nanoswitch. Not reachable from the
	// host unless dialed via DialNode or via the nanoswitch SOCKS proxy (reachable
	// on Cluster.Ports[SOCKSPort]).
	ManagementAddress string
}

// firstConnection performs the initial owner credential escrow with a newly
// started nanoswitch-backed cluster over SOCKS. It expects the first node to be
// running at 10.1.0.2, which is always the case with the current nanoswitch
// implementation.
//
// It returns the newly escrowed credentials as well as the first node's
// information as NodeInCluster.
func firstConnection(ctx context.Context, socksDialer proxy.Dialer) (*tls.Certificate, *NodeInCluster, error) {
	// Dial external service.
	remote := fmt.Sprintf("10.1.0.2:%s", node.CuratorServicePort.PortString())
	initCreds, err := rpc.NewEphemeralCredentials(InsecurePrivateKey, nil)
	if err != nil {
		return nil, nil, fmt.Errorf("NewEphemeralCredentials: %w", err)
	}
	initDialer := func(_ context.Context, addr string) (net.Conn, error) {
		return socksDialer.Dial("tcp", addr)
	}
	initClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(initCreds))
	if err != nil {
		return nil, nil, fmt.Errorf("dialing with ephemeral credentials failed: %w", err)
	}
	defer initClient.Close()

	// Retrieve owner certificate - this can take a while because the node is still
	// coming up, so do it in a backoff loop.
	log.Printf("Cluster: retrieving owner certificate (this can take a few seconds while the first node boots)...")
	aaa := apb.NewAAAClient(initClient)
	var cert *tls.Certificate
	err = backoff.Retry(func() error {
		cert, err = rpc.RetrieveOwnerCertificate(ctx, aaa, InsecurePrivateKey)
		if st, ok := status.FromError(err); ok {
			if st.Code() == codes.Unavailable {
				return err
			}
		}
		return backoff.Permanent(err)
	}, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
	if err != nil {
		return nil, nil, err
	}
	log.Printf("Cluster: retrieved owner certificate.")

	// Now connect authenticated and get the node ID.
	creds := rpc.NewAuthenticatedCredentials(*cert, nil)
	authClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(creds))
	if err != nil {
		return nil, nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
	}
	defer authClient.Close()
	mgmt := apb.NewManagementClient(authClient)

	var node *NodeInCluster
	err = backoff.Retry(func() error {
		nodes, err := getNodes(ctx, mgmt)
		if err != nil {
			return fmt.Errorf("retrieving nodes failed: %w", err)
		}
		if len(nodes) != 1 {
			return fmt.Errorf("expected one node, got %d", len(nodes))
		}
		n := nodes[0]
		if n.Status == nil || n.Status.ExternalAddress == "" {
			return fmt.Errorf("node has no status and/or address")
		}
		node = &NodeInCluster{
			ID:                identity.NodeID(n.Pubkey),
			ManagementAddress: n.Status.ExternalAddress,
		}
		return nil
	}, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
	if err != nil {
		return nil, nil, err
	}

	return cert, node, nil
}

// LaunchCluster launches a cluster of Metropolis node VMs together with a
// Nanoswitch instance to network them all together.
//
// The given context will be used to run all qemu instances in the cluster, and
// canceling the context or calling Close() will terminate them.
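//
// A minimal usage sketch (assumes a test-like caller that can block while the
// nodes boot):
//
//   cluster, err := LaunchCluster(ctx, ClusterOptions{NumNodes: 2})
//   if err != nil {
//     // handle launch failure
//   }
//   defer cluster.Close()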
func LaunchCluster(ctx context.Context, opts ClusterOptions) (*Cluster, error) {
	if opts.NumNodes <= 0 {
		return nil, errors.New("refusing to start cluster with zero nodes")
	}

	// Create the launch directory.
	ld, err := os.MkdirTemp(os.Getenv("TEST_TMPDIR"), "cluster*")
	if err != nil {
		return nil, fmt.Errorf("failed to create the launch directory: %w", err)
	}
	// Create the socket directory.
	sd, err := os.MkdirTemp("/tmp", "cluster*")
	if err != nil {
		return nil, fmt.Errorf("failed to create the socket directory: %w", err)
	}

	// Prepare links between nodes and nanoswitch.
	var switchPorts []*os.File
	var vmPorts []*os.File
	for i := 0; i < opts.NumNodes; i++ {
		switchPort, vmPort, err := launch.NewSocketPair()
		if err != nil {
			return nil, fmt.Errorf("failed to get socketpair: %w", err)
		}
		switchPorts = append(switchPorts, switchPort)
		vmPorts = append(vmPorts, vmPort)
	}

	// Make a list of channels that will be populated by all running node qemu
	// processes.
	done := make([]chan error, opts.NumNodes)
	for i := range done {
		done[i] = make(chan error, 1)
	}

	// Prepare the node options. These will be kept as part of Cluster.
	// nodeOpts[].Runtime will be initialized by LaunchNode during the first
	// launch. The runtime information can later be used to restart a node.
	// The 0th node will be initialized first. The rest will follow after it
	// has bootstrapped the cluster.
	nodeOpts := make([]NodeOptions, opts.NumNodes)
	nodeOpts[0] = NodeOptions{
		ConnectToSocket: vmPorts[0],
		NodeParameters: &apb.NodeParameters{
			Cluster: &apb.NodeParameters_ClusterBootstrap_{
				ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
					OwnerPublicKey: InsecurePublicKey,
				},
			},
		},
		SerialPort: newPrefixedStdio(0),
	}

	// Start the first node.
	ctxT, ctxC := context.WithCancel(ctx)
	log.Printf("Cluster: Starting node %d...", 1)
	go func() {
		err := LaunchNode(ctxT, ld, sd, &nodeOpts[0])
		done[0] <- err
	}()

	// Launch nanoswitch.
	portMap, err := launch.ConflictFreePortMap(ClusterPorts)
	if err != nil {
		ctxC()
		return nil, fmt.Errorf("failed to allocate ephemeral ports: %w", err)
	}

	go func() {
		if err := launch.RunMicroVM(ctxT, &launch.MicroVMOptions{
			KernelPath:             "metropolis/test/ktest/vmlinux",
			InitramfsPath:          "metropolis/test/nanoswitch/initramfs.cpio.lz4",
			ExtraNetworkInterfaces: switchPorts,
			PortMap:                portMap,
		}); err != nil {
			if !errors.Is(err, ctxT.Err()) {
				log.Fatalf("Failed to launch nanoswitch: %v", err)
			}
		}
	}()

	// Build SOCKS dialer.
	socksRemote := fmt.Sprintf("localhost:%v", portMap[SOCKSPort])
	socksDialer, err := proxy.SOCKS5("tcp", socksRemote, nil, proxy.Direct)
	if err != nil {
		ctxC()
		return nil, fmt.Errorf("failed to build SOCKS dialer: %w", err)
	}

	// Retrieve owner credentials and first node.
	cert, firstNode, err := firstConnection(ctxT, socksDialer)
	if err != nil {
		ctxC()
		return nil, err
	}

	// Set up a partially initialized cluster instance, to be filled in over the
	// following steps.
	cluster := &Cluster{
		Owner: *cert,
		Ports: portMap,
		Nodes: map[string]*NodeInCluster{
			firstNode.ID: firstNode,
		},
		NodeIDs: []string{
			firstNode.ID,
		},

		nodesDone: done,
		nodeOpts:  nodeOpts,
		launchDir: ld,
		socketDir: sd,

		socksDialer: socksDialer,

		ctxT: ctxT,
		ctxC: ctxC,
	}

	// Now start the rest of the nodes and register them into the cluster.

	// Get an authenticated owner client within the cluster.
	curC, err := cluster.curatorClient()
	if err != nil {
		ctxC()
		return nil, fmt.Errorf("curatorClient: %w", err)
	}
	mgmt := apb.NewManagementClient(curC)

	// Retrieve register ticket to register further nodes.
	log.Printf("Cluster: retrieving register ticket...")
	resT, err := mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{})
	if err != nil {
		ctxC()
		return nil, fmt.Errorf("GetRegisterTicket: %w", err)
	}
	ticket := resT.Ticket
	log.Printf("Cluster: retrieved register ticket (%d bytes).", len(ticket))

	// Retrieve cluster info (for directory and CA public key) to register further
	// nodes.
	resI, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
	if err != nil {
		ctxC()
		return nil, fmt.Errorf("GetClusterInfo: %w", err)
	}

	// Use the retrieved information to configure the rest of the node options.
	for i := 1; i < opts.NumNodes; i++ {
		nodeOpts[i] = NodeOptions{
			ConnectToSocket: vmPorts[i],
			NodeParameters: &apb.NodeParameters{
				Cluster: &apb.NodeParameters_ClusterRegister_{
					ClusterRegister: &apb.NodeParameters_ClusterRegister{
						RegisterTicket:   ticket,
						ClusterDirectory: resI.ClusterDirectory,
						CaCertificate:    resI.CaCertificate,
					},
				},
			},
			SerialPort: newPrefixedStdio(i),
		}
	}

	// Now run the rest of the nodes.
	//
	// TODO(q3k): parallelize this
	for i := 1; i < opts.NumNodes; i++ {
		log.Printf("Cluster: Starting node %d...", i+1)
		go func(i int) {
			err := LaunchNode(ctxT, ld, sd, &nodeOpts[i])
			done[i] <- err
		}(i)
		var newNode *apb.Node

		log.Printf("Cluster: waiting for node %d to appear as NEW...", i+1)
		for {
			nodes, err := getNodes(ctx, mgmt)
			if err != nil {
				ctxC()
				return nil, fmt.Errorf("could not get nodes: %w", err)
			}
			for _, n := range nodes {
				if n.State == cpb.NodeState_NODE_STATE_NEW {
					newNode = n
					break
				}
			}
			if newNode != nil {
				break
			}
			time.Sleep(1 * time.Second)
		}
		id := identity.NodeID(newNode.Pubkey)
		log.Printf("Cluster: node %d is %s", i+1, id)

		log.Printf("Cluster: approving node %d", i+1)
		_, err := mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
			Pubkey: newNode.Pubkey,
		})
		if err != nil {
			ctxC()
			return nil, fmt.Errorf("ApproveNode(%s): %w", id, err)
		}
		log.Printf("Cluster: node %d approved, waiting for it to appear as UP and with a network address...", i+1)
		for {
			nodes, err := getNodes(ctx, mgmt)
			if err != nil {
				ctxC()
				return nil, fmt.Errorf("could not get nodes: %w", err)
			}
			found := false
			for _, n := range nodes {
				if !bytes.Equal(n.Pubkey, newNode.Pubkey) {
					continue
				}
				if n.Status == nil || n.Status.ExternalAddress == "" {
					break
				}
				if n.State != cpb.NodeState_NODE_STATE_UP {
					break
				}
				found = true
				cluster.Nodes[identity.NodeID(n.Pubkey)] = &NodeInCluster{
					ID:                identity.NodeID(n.Pubkey),
					ManagementAddress: n.Status.ExternalAddress,
				}
				cluster.NodeIDs = append(cluster.NodeIDs, identity.NodeID(n.Pubkey))
				break
			}
			if found {
				break
			}
			time.Sleep(time.Second)
		}
		log.Printf("Cluster: node %d (%s) UP!", i+1, id)
	}

	log.Printf("Cluster: all nodes up:")
	for _, node := range cluster.Nodes {
		log.Printf("Cluster: - %s at %s", node.ID, node.ManagementAddress)
	}

	return cluster, nil
}

// RebootNode reboots the cluster member node matching the given index, and
// waits for it to rejoin the cluster. It will use the given context ctx to run
// cluster API requests, whereas the resulting QEMU process will be created
// using the cluster's context c.ctxT. The nodes are indexed starting at 0.
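//
// For example, to reboot the second node (index 1) of a cluster c previously
// returned by LaunchCluster, a sketch:
//
//   if err := c.RebootNode(ctx, 1); err != nil {
//     // handle reboot failure
//   }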
func (c *Cluster) RebootNode(ctx context.Context, idx int) error {
	if idx < 0 || idx >= len(c.NodeIDs) {
		return fmt.Errorf("index out of bounds")
	}
	id := c.NodeIDs[idx]

	// Get an authenticated owner client within the cluster.
	curC, err := c.curatorClient()
	if err != nil {
		return err
	}
	mgmt := apb.NewManagementClient(curC)

	// Get the timestamp of the node's last update, as observed by Curator.
	// It'll be needed to make sure it has rejoined the cluster after the reboot.
	var is *apb.Node
	for {
		r, err := getNode(ctx, mgmt, id)
		if err != nil {
			return err
		}

		// Node status may be absent if it hasn't reported to the cluster yet. Wait
		// for it to appear before progressing further.
		if r.Status != nil {
			is = r
			break
		}
		time.Sleep(time.Second)
	}

	// Cancel the node's context. This will shut down QEMU.
	c.nodeOpts[idx].Runtime.CtxC()
	log.Printf("Cluster: waiting for node %d (%s) to stop.", idx, id)
	err = <-c.nodesDone[idx]
	if err != nil {
		return fmt.Errorf("while restarting node: %w", err)
	}

	// Start QEMU again.
	log.Printf("Cluster: restarting node %d (%s).", idx, id)
	go func(n int) {
		err := LaunchNode(c.ctxT, c.launchDir, c.socketDir, &c.nodeOpts[n])
		c.nodesDone[n] <- err
	}(idx)

	// Poll Management.GetNodes until the node's timestamp is updated.
	for {
		cs, err := getNode(ctx, mgmt, id)
		if err != nil {
			return err
		}
		if cs.Status == nil {
			continue
		}
		if cs.Status.Timestamp > is.Status.Timestamp {
			break
		}
		time.Sleep(time.Second)
	}
	log.Printf("Cluster: node %d (%s) has rejoined the cluster.", idx, id)
	return nil
}

// Close cancels the running cluster's context and waits for all virtualized
// nodes to stop. It returns an error if stopping the nodes failed, or one of
// the nodes failed to fully start in the first place.
func (c *Cluster) Close() error {
	log.Printf("Cluster: stopping...")
	if c.authClient != nil {
		c.authClient.Close()
	}
	c.ctxC()

	var errors []error
	log.Printf("Cluster: waiting for nodes to exit...")
	for _, c := range c.nodesDone {
		err := <-c
		if err != nil {
			errors = append(errors, err)
		}
	}
	log.Printf("Cluster: removing nodes' state files.")
	os.RemoveAll(c.launchDir)
	os.RemoveAll(c.socketDir)
	log.Printf("Cluster: done")
	return multierr.Combine(errors...)
}

// DialNode is a grpc.WithContextDialer compatible dialer which dials nodes by
// their ID. This is performed by connecting to the cluster nanoswitch via its
// SOCKS proxy, and using the cluster node list for name resolution.
//
// For example:
//
//   grpc.Dial("metropolis-deadbeef:1234", grpc.WithContextDialer(c.DialNode))
func (c *Cluster) DialNode(_ context.Context, addr string) (net.Conn, error) {
	host, port, err := net.SplitHostPort(addr)
	if err != nil {
		return nil, fmt.Errorf("invalid host:port: %w", err)
	}
	// Already an IP address?
	if net.ParseIP(host) != nil {
		return c.socksDialer.Dial("tcp", addr)
	}

	// Otherwise, expect a node name.
	node, ok := c.Nodes[host]
	if !ok {
		return nil, fmt.Errorf("unknown node %q", host)
	}
	addr = net.JoinHostPort(node.ManagementAddress, port)
	return c.socksDialer.Dial("tcp", addr)
}