// cluster builds on the launch package and implements launching Metropolis
// nodes and clusters in a virtualized environment using qemu. It's kept in a
// separate package as it depends on a Metropolis node image, which might not be
// required for some uses of the launch library.
package cluster

import (
	"bytes"
	"context"
	"crypto/rand"
	"crypto/tls"
	"errors"
	"fmt"
	"io"
	"log"
	"net"
	"os"
	"os/exec"
	"path/filepath"
	"syscall"
	"time"

	"github.com/cenkalti/backoff/v4"
	"go.uber.org/multierr"
	"golang.org/x/net/proxy"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/proto"

	"source.monogon.dev/metropolis/cli/pkg/datafile"
	"source.monogon.dev/metropolis/node"
	common "source.monogon.dev/metropolis/node"
	"source.monogon.dev/metropolis/node/core/identity"
	"source.monogon.dev/metropolis/node/core/rpc"
	apb "source.monogon.dev/metropolis/proto/api"
	cpb "source.monogon.dev/metropolis/proto/common"
	"source.monogon.dev/metropolis/test/launch"
)

// NodeOptions contains all options that can be passed to LaunchNode().
type NodeOptions struct {
	// Ports contains the port mapping for exposing the internal ports of the VM to
	// the host. See IdentityPortMap() and ConflictFreePortMap(). Ignored when
	// ConnectToSocket is set.
	Ports launch.PortMap

	// If set to true, reboots are honored. Otherwise, any reboot exits the
	// LaunchNode() call. Metropolis nodes generally restart on almost all errors,
	// so unless you want to test reboot behavior this should be false.
	AllowReboot bool

	// By default the VM is connected to the host via SLIRP. If ConnectToSocket is
	// set, it is instead connected to the given file descriptor/socket. If this is
	// set, all port maps from the Ports option are ignored. Intended for networking
	// this instance together with others for running more complex network
	// configurations.
	ConnectToSocket *os.File

	// SerialPort is an io.ReadWriter over which you can communicate with the serial
	// port of the machine. It can be set to an existing file descriptor (like
	// os.Stdout/os.Stderr) or any Go structure implementing this interface.
	SerialPort io.ReadWriter

	// NodeParameters is passed into the VM and subsequently used for bootstrapping
	// or registering into a cluster.
	NodeParameters *apb.NodeParameters

	// Mac is the node's MAC address.
	Mac *net.HardwareAddr

	// Runtime keeps the node's QEMU runtime state.
	Runtime *NodeRuntime
}
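
// A minimal usage sketch (not part of the original API surface): the options
// below bootstrap a new single-node cluster, mirroring the NodeParameters
// payload built by LaunchCluster further down. Passing NodePorts to
// launch.IdentityPortMap is an assumption about that helper's signature, and
// os.Stdout is just one valid io.ReadWriter for the serial port.
//
//	opts := &NodeOptions{
//		Ports:      launch.IdentityPortMap(NodePorts), // assumed signature
//		SerialPort: os.Stdout,
//		NodeParameters: &apb.NodeParameters{
//			Cluster: &apb.NodeParameters_ClusterBootstrap_{
//				ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
//					OwnerPublicKey: InsecurePublicKey,
//				},
//			},
//		},
//	}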

// NodeRuntime keeps the node's QEMU runtime options.
type NodeRuntime struct {
	// ld points at the node's launch directory storing data such as storage
	// images, firmware variables or the TPM state.
	ld string
	// sd points at the node's socket directory.
	sd string

	// ctxT is the context QEMU will execute in.
	ctxT context.Context
	// CtxC is the QEMU context's cancellation function.
	CtxC context.CancelFunc
}

// NodePorts is the list of ports a fully operational Metropolis node listens on.
var NodePorts = []node.Port{
	node.ConsensusPort,

	node.CuratorServicePort,
	node.DebugServicePort,

	node.KubernetesAPIPort,
	node.KubernetesAPIWrappedPort,
	node.CuratorServicePort,
	node.DebuggerPort,
}
101
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200102// setupRuntime creates the node's QEMU runtime directory, together with all
103// files required to preserve its state, a level below the chosen path ld. The
104// node's socket directory is similarily created a level below sd. It may
105// return an I/O error.
106func setupRuntime(ld, sd string) (*NodeRuntime, error) {
107 // Create a temporary directory to keep all the runtime files.
108 stdp, err := os.MkdirTemp(ld, "node_state*")
109 if err != nil {
110 return nil, fmt.Errorf("failed to create the state directory: %w", err)
111 }
112
113 // Initialize the node's storage with a prebuilt image.
114 si, err := datafile.ResolveRunfile("metropolis/node/node.img")
115 if err != nil {
116 return nil, fmt.Errorf("while resolving a path: %w", err)
117 }
118 di := filepath.Join(stdp, filepath.Base(si))
119 log.Printf("Cluster: copying the node image: %s -> %s", si, di)
120 if err := copyFile(si, di); err != nil {
121 return nil, fmt.Errorf("while copying the node image: %w", err)
122 }
123
124 // Initialize the OVMF firmware variables file.
125 sv, err := datafile.ResolveRunfile("external/edk2/OVMF_VARS.fd")
126 if err != nil {
127 return nil, fmt.Errorf("while resolving a path: %w", err)
128 }
129 dv := filepath.Join(stdp, filepath.Base(sv))
130 if err := copyFile(sv, dv); err != nil {
131 return nil, fmt.Errorf("while copying firmware variables: %w", err)
132 }
133
134 // Create the TPM state directory and initialize all files required by swtpm.
135 tpmt := filepath.Join(stdp, "tpm")
136 if err := os.Mkdir(tpmt, 0755); err != nil {
137 return nil, fmt.Errorf("while creating the TPM directory: %w", err)
138 }
139 tpms, err := datafile.ResolveRunfile("metropolis/node/tpm")
140 if err != nil {
141 return nil, fmt.Errorf("while resolving a path: %w", err)
142 }
143 tpmf, err := os.ReadDir(tpms)
144 if err != nil {
145 return nil, fmt.Errorf("failed to read TPM directory: %w", err)
146 }
147 for _, file := range tpmf {
148 name := file.Name()
149 src, err := datafile.ResolveRunfile(filepath.Join(tpms, name))
150 if err != nil {
151 return nil, fmt.Errorf("while resolving a path: %w", err)
152 }
153 tgt := filepath.Join(tpmt, name)
154 if err := copyFile(src, tgt); err != nil {
155 return nil, fmt.Errorf("while copying TPM state: file %q to %q: %w", src, tgt, err)
156 }
157 }
158
159 // Create the socket directory.
160 sotdp, err := os.MkdirTemp(sd, "node_sock*")
161 if err != nil {
162 return nil, fmt.Errorf("failed to create the socket directory: %w", err)
163 }
164
165 return &NodeRuntime{
166 ld: stdp,
167 sd: sotdp,
168 }, nil
169}
170
171// curatorClient returns an authenticated owner connection to a Curator
172// instance within Cluster c, or nil together with an error.
173func (c *Cluster) curatorClient() (*grpc.ClientConn, error) {
174 if c.authClient == nil {
175 authCreds := rpc.NewAuthenticatedCredentials(c.Owner, nil)
176 remote := net.JoinHostPort(c.NodeIDs[0], common.CuratorServicePort.PortString())
177 authClient, err := grpc.Dial(remote, grpc.WithTransportCredentials(authCreds), grpc.WithContextDialer(c.DialNode))
178 if err != nil {
179 return nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
180 }
181 c.authClient = authClient
182 }
183 return c.authClient, nil
184}
185
Serge Bazanski66e58952021-10-05 17:06:56 +0200186// LaunchNode launches a single Metropolis node instance with the given options.
187// The instance runs mostly paravirtualized but with some emulated hardware
188// similar to how a cloud provider might set up its VMs. The disk is fully
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200189// writable, and the changes are kept across reboots and shutdowns. ld and sd
190// point to the launch directory and the socket directory, holding the nodes'
191// state files (storage, tpm state, firmware state), and UNIX socket files
192// (swtpm <-> QEMU interplay) respectively. The directories must exist before
193// LaunchNode is called. LaunchNode will update options.Runtime and options.Mac
194// if either are not initialized.
195func LaunchNode(ctx context.Context, ld, sd string, options *NodeOptions) error {
196 // TODO(mateusz@monogon.tech) try using QEMU's abstract socket namespace instead
197 // of /tmp (requires QEMU version >5.0).
Serge Bazanski66e58952021-10-05 17:06:56 +0200198 // https://github.com/qemu/qemu/commit/776b97d3605ed0fc94443048fdf988c7725e38a9).
199 // swtpm accepts already-open FDs so we can pass in an abstract socket namespace FD
200 // that we open and pass the name of it to QEMU. Not pinning this crashes both
201 // swtpm and qemu because we run into UNIX socket length limitations (for legacy
202 // reasons 108 chars).
Serge Bazanski66e58952021-10-05 17:06:56 +0200203
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200204 // If it's the node's first start, set up its runtime directories.
205 if options.Runtime == nil {
206 r, err := setupRuntime(ld, sd)
207 if err != nil {
208 return fmt.Errorf("while setting up node runtime: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200209 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200210 options.Runtime = r
Serge Bazanski66e58952021-10-05 17:06:56 +0200211 }
212
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200213 // Replace the node's context with a new one.
214 r := options.Runtime
215 if r.CtxC != nil {
216 r.CtxC()
217 }
218 r.ctxT, r.CtxC = context.WithCancel(ctx)
219
Serge Bazanski66e58952021-10-05 17:06:56 +0200220 var qemuNetType string
221 var qemuNetConfig launch.QemuValue
222 if options.ConnectToSocket != nil {
223 qemuNetType = "socket"
224 qemuNetConfig = launch.QemuValue{
225 "id": {"net0"},
226 "fd": {"3"},
227 }
228 } else {
229 qemuNetType = "user"
230 qemuNetConfig = launch.QemuValue{
231 "id": {"net0"},
232 "net": {"10.42.0.0/24"},
233 "dhcpstart": {"10.42.0.10"},
234 "hostfwd": options.Ports.ToQemuForwards(),
235 }
236 }
237
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200238 // Generate the node's MAC address if it isn't already set in NodeOptions.
239 if options.Mac == nil {
240 mac, err := generateRandomEthernetMAC()
241 if err != nil {
242 return err
243 }
244 options.Mac = mac
Serge Bazanski66e58952021-10-05 17:06:56 +0200245 }
246
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200247 tpmSocketPath := filepath.Join(r.sd, "tpm-socket")
248 fwVarPath := filepath.Join(r.ld, "OVMF_VARS.fd")
249 storagePath := filepath.Join(r.ld, "node.img")
Serge Bazanski66e58952021-10-05 17:06:56 +0200250 qemuArgs := []string{"-machine", "q35", "-accel", "kvm", "-nographic", "-nodefaults", "-m", "4096",
251 "-cpu", "host", "-smp", "sockets=1,cpus=1,cores=2,threads=2,maxcpus=4",
252 "-drive", "if=pflash,format=raw,readonly,file=external/edk2/OVMF_CODE.fd",
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200253 "-drive", "if=pflash,format=raw,file=" + fwVarPath,
254 "-drive", "if=virtio,format=raw,cache=unsafe,file=" + storagePath,
Serge Bazanski66e58952021-10-05 17:06:56 +0200255 "-netdev", qemuNetConfig.ToOption(qemuNetType),
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200256 "-device", "virtio-net-pci,netdev=net0,mac=" + options.Mac.String(),
Serge Bazanski66e58952021-10-05 17:06:56 +0200257 "-chardev", "socket,id=chrtpm,path=" + tpmSocketPath,
258 "-tpmdev", "emulator,id=tpm0,chardev=chrtpm",
259 "-device", "tpm-tis,tpmdev=tpm0",
260 "-device", "virtio-rng-pci",
261 "-serial", "stdio"}
262
263 if !options.AllowReboot {
264 qemuArgs = append(qemuArgs, "-no-reboot")
265 }
266
267 if options.NodeParameters != nil {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200268 parametersPath := filepath.Join(r.ld, "parameters.pb")
Serge Bazanski66e58952021-10-05 17:06:56 +0200269 parametersRaw, err := proto.Marshal(options.NodeParameters)
270 if err != nil {
271 return fmt.Errorf("failed to encode node paraeters: %w", err)
272 }
Lorenz Brun764a2de2021-11-22 16:26:36 +0100273 if err := os.WriteFile(parametersPath, parametersRaw, 0644); err != nil {
Serge Bazanski66e58952021-10-05 17:06:56 +0200274 return fmt.Errorf("failed to write node parameters: %w", err)
275 }
276 qemuArgs = append(qemuArgs, "-fw_cfg", "name=dev.monogon.metropolis/parameters.pb,file="+parametersPath)
277 }
278
279 // Start TPM emulator as a subprocess
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200280 tpmCtx, tpmCancel := context.WithCancel(options.Runtime.ctxT)
Serge Bazanski66e58952021-10-05 17:06:56 +0200281 defer tpmCancel()
282
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200283 tpmd := filepath.Join(r.ld, "tpm")
284 tpmEmuCmd := exec.CommandContext(tpmCtx, "swtpm", "socket", "--tpm2", "--tpmstate", "dir="+tpmd, "--ctrl", "type=unixio,path="+tpmSocketPath)
Serge Bazanski66e58952021-10-05 17:06:56 +0200285 tpmEmuCmd.Stderr = os.Stderr
286 tpmEmuCmd.Stdout = os.Stdout
287
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200288 err := tpmEmuCmd.Start()
Serge Bazanski66e58952021-10-05 17:06:56 +0200289 if err != nil {
290 return fmt.Errorf("failed to start TPM emulator: %w", err)
291 }
292
293 // Start the main qemu binary
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200294 systemCmd := exec.CommandContext(options.Runtime.ctxT, "qemu-system-x86_64", qemuArgs...)
Serge Bazanski66e58952021-10-05 17:06:56 +0200295 if options.ConnectToSocket != nil {
296 systemCmd.ExtraFiles = []*os.File{options.ConnectToSocket}
297 }
298
299 var stdErrBuf bytes.Buffer
300 systemCmd.Stderr = &stdErrBuf
301 systemCmd.Stdout = options.SerialPort
302
303 err = systemCmd.Run()
304
305 // Stop TPM emulator and wait for it to exit to properly reap the child process
306 tpmCancel()
307 log.Print("Node: Waiting for TPM emulator to exit")
308 // Wait returns a SIGKILL error because we just cancelled its context.
309 // We still need to call it to avoid creating zombies.
310 _ = tpmEmuCmd.Wait()
311 log.Print("Node: TPM emulator done")
312
313 var exerr *exec.ExitError
314 if err != nil && errors.As(err, &exerr) {
315 status := exerr.ProcessState.Sys().(syscall.WaitStatus)
316 if status.Signaled() && status.Signal() == syscall.SIGKILL {
317 // Process was killed externally (most likely by our context being canceled).
318 // This is a normal exit for us, so return nil
319 return nil
320 }
321 exerr.Stderr = stdErrBuf.Bytes()
322 newErr := launch.QEMUError(*exerr)
323 return &newErr
324 }
325 return err
326}
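
// A hedged usage sketch for LaunchNode: both directories must exist up front,
// as the doc comment above requires, and the same NodeOptions value should be
// reused across calls so that Runtime (and thus on-disk state) survives a
// restart. The temp-directory names here are illustrative, not mandated.
//
//	ld, _ := os.MkdirTemp(os.Getenv("TEST_TMPDIR"), "launch*")
//	sd, _ := os.MkdirTemp("/tmp", "sock*")
//	opts := &NodeOptions{SerialPort: os.Stdout}
//	if err := LaunchNode(ctx, ld, sd, opts); err != nil {
//		log.Printf("LaunchNode: %v", err)
//	}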

func copyFile(src, dst string) error {
	in, err := os.Open(src)
	if err != nil {
		return fmt.Errorf("when opening source: %w", err)
	}
	defer in.Close()

	out, err := os.Create(dst)
	if err != nil {
		return fmt.Errorf("when creating destination: %w", err)
	}
	defer out.Close()

	_, err = io.Copy(out, in)
	if err != nil {
		return fmt.Errorf("when copying file: %w", err)
	}
	return out.Close()
}

// getNodes wraps around Management.GetNodes to return a list of nodes in a
// cluster.
func getNodes(ctx context.Context, mgmt apb.ManagementClient) ([]*apb.Node, error) {
	var res []*apb.Node
	bo := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
	err := backoff.Retry(func() error {
		res = nil
		srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
		if err != nil {
			return fmt.Errorf("GetNodes: %w", err)
		}
		for {
			node, err := srvN.Recv()
			if err == io.EOF {
				break
			}
			if err != nil {
				return fmt.Errorf("GetNodes.Recv: %w", err)
			}
			res = append(res, node)
		}
		return nil
	}, bo)
	if err != nil {
		return nil, err
	}
	return res, nil
}

// getNode wraps Management.GetNodes. It returns the node information matching
// the given node ID.
func getNode(ctx context.Context, mgmt apb.ManagementClient, id string) (*apb.Node, error) {
	nodes, err := getNodes(ctx, mgmt)
	if err != nil {
		return nil, fmt.Errorf("could not get nodes: %w", err)
	}
	for _, n := range nodes {
		eid := identity.NodeID(n.Pubkey)
		if eid != id {
			continue
		}
		return n, nil
	}
	return nil, fmt.Errorf("no such node")
}

// generateRandomEthernetMAC generates a random EUI-48 Ethernet MAC address.
func generateRandomEthernetMAC() (*net.HardwareAddr, error) {
	macBuf := make([]byte, 6)
	_, err := rand.Read(macBuf)
	if err != nil {
		return nil, fmt.Errorf("failed to read randomness for MAC: %v", err)
	}

	// Set the U/L bit and clear the I/G bit (locally administered individual MAC).
	// Ref IEEE 802-2014 Section 8.2.2.
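	// For example, a random first byte of 0x01 becomes (0x01|0x02)&0xfe = 0x02:
	// bit 1 set marks the address as locally administered, bit 0 cleared marks
	// it as an individual (unicast) address.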
	macBuf[0] = (macBuf[0] | 2) & 0xfe
	mac := net.HardwareAddr(macBuf)
	return &mac, nil
}

// SOCKSPort is the port at which nanoswitch exposes its SOCKS proxy into the
// switch network.
const SOCKSPort uint16 = 1080

// ClusterPorts contains all ports handled by Nanoswitch.
var ClusterPorts = []uint16{
	// Forwarded to the first node.
	uint16(node.CuratorServicePort),
	uint16(node.DebugServicePort),
	uint16(node.KubernetesAPIPort),
	uint16(node.KubernetesAPIWrappedPort),

	// SOCKS proxy to the switch network.
	SOCKSPort,
}
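
// A short sketch mirroring what LaunchCluster does below: ClusterPorts is fed
// to launch.ConflictFreePortMap to allocate free host ports, after which the
// SOCKS proxy is reachable at the mapped port.
//
//	portMap, err := launch.ConflictFreePortMap(ClusterPorts)
//	if err != nil {
//		// handle the allocation error
//	}
//	socksAddr := fmt.Sprintf("localhost:%v", portMap[SOCKSPort])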

// ClusterOptions contains all options for launching a Metropolis cluster.
type ClusterOptions struct {
	// The number of nodes this cluster should be started with.
	NumNodes int
}

// Cluster is the running Metropolis cluster launched using the LaunchCluster
// function.
type Cluster struct {
	// Owner is the TLS Certificate of the owner of the test cluster. This can be
	// used to authenticate further clients to the running cluster.
	Owner tls.Certificate
	// Ports is the PortMap used to access the first node's services (defined in
	// ClusterPorts) and the SOCKS proxy (at SOCKSPort).
	Ports launch.PortMap

	// Nodes is a map from Node ID to its runtime information.
	Nodes map[string]*NodeInCluster
	// NodeIDs is a list of node IDs that are backing this cluster, in order of
	// creation.
	NodeIDs []string

	// nodesDone is a list of channels populated with the return codes from all the
	// nodes' qemu instances. It's used by Close to ensure all nodes have
	// successfully been stopped.
	nodesDone []chan error
	// nodeOpts are the cluster member nodes' mutable launch options, kept here
	// to facilitate reboots.
	nodeOpts []NodeOptions
	// launchDir points at the directory keeping the nodes' state, such as storage
	// images, firmware variable files, and TPM state.
	launchDir string
	// socketDir points at the directory keeping UNIX socket files, such as those
	// used to facilitate communication between QEMU and swtpm. It's different
	// from launchDir, and anchored nearer the file system root, due to the
	// socket path length limitation imposed by the kernel.
	socketDir string

	// socksDialer is used by DialNode to establish connections to nodes via the
	// SOCKS server run by nanoswitch.
	socksDialer proxy.Dialer

	// authClient is a cached authenticated owner connection to a Curator
	// instance within the cluster.
	authClient *grpc.ClientConn

	// ctxT is the context individual node contexts are created from.
	ctxT context.Context
	// ctxC is used by Close to cancel the context under which the nodes are
	// running.
	ctxC context.CancelFunc
}

// NodeInCluster represents information about a node that's part of a Cluster.
type NodeInCluster struct {
	// ID of the node, which can be used to dial this node's services via DialNode.
	ID string
	// Address of the node on the network run by nanoswitch. Not reachable from the
	// host unless dialed via DialNode or via the nanoswitch SOCKS proxy (reachable
	// on Cluster.Ports[SOCKSPort]).
	ManagementAddress string
}

// firstConnection performs the initial owner credential escrow with a newly
// started nanoswitch-backed cluster over SOCKS. It expects the first node to be
// running at 10.1.0.2, which is always the case with the current nanoswitch
// implementation.
//
// It returns the newly escrowed credentials as well as the first node's
// information as NodeInCluster.
func firstConnection(ctx context.Context, socksDialer proxy.Dialer) (*tls.Certificate, *NodeInCluster, error) {
	// Dial external service.
	remote := fmt.Sprintf("10.1.0.2:%s", node.CuratorServicePort.PortString())
	initCreds, err := rpc.NewEphemeralCredentials(InsecurePrivateKey, nil)
	if err != nil {
		return nil, nil, fmt.Errorf("NewEphemeralCredentials: %w", err)
	}
	initDialer := func(_ context.Context, addr string) (net.Conn, error) {
		return socksDialer.Dial("tcp", addr)
	}
	initClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(initCreds))
	if err != nil {
		return nil, nil, fmt.Errorf("dialing with ephemeral credentials failed: %w", err)
	}
	defer initClient.Close()

	// Retrieve the owner certificate - this can take a while because the node is
	// still coming up, so do it in a backoff loop.
	log.Printf("Cluster: retrieving owner certificate (this can take a few seconds while the first node boots)...")
	aaa := apb.NewAAAClient(initClient)
	var cert *tls.Certificate
	err = backoff.Retry(func() error {
		cert, err = rpc.RetrieveOwnerCertificate(ctx, aaa, InsecurePrivateKey)
		if st, ok := status.FromError(err); ok {
			if st.Code() == codes.Unavailable {
				return err
			}
		}
		return backoff.Permanent(err)
	}, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
	if err != nil {
		return nil, nil, err
	}
	log.Printf("Cluster: retrieved owner certificate.")

	// Now connect authenticated and get the node ID.
	creds := rpc.NewAuthenticatedCredentials(*cert, nil)
	authClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(creds))
	if err != nil {
		return nil, nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
	}
	defer authClient.Close()
	mgmt := apb.NewManagementClient(authClient)

	var node *NodeInCluster
	err = backoff.Retry(func() error {
		nodes, err := getNodes(ctx, mgmt)
		if err != nil {
			return fmt.Errorf("retrieving nodes failed: %w", err)
		}
		if len(nodes) != 1 {
			return fmt.Errorf("expected one node, got %d", len(nodes))
		}
		n := nodes[0]
		if n.Status == nil || n.Status.ExternalAddress == "" {
			return fmt.Errorf("node has no status and/or address")
		}
		node = &NodeInCluster{
			ID:                identity.NodeID(n.Pubkey),
			ManagementAddress: n.Status.ExternalAddress,
		}
		return nil
	}, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
	if err != nil {
		return nil, nil, err
	}

	return cert, node, nil
}

// LaunchCluster launches a cluster of Metropolis node VMs together with a
// Nanoswitch instance to network them all together.
//
// The given context will be used to run all qemu instances in the cluster, and
// canceling the context or calling Close() will terminate them.
func LaunchCluster(ctx context.Context, opts ClusterOptions) (*Cluster, error) {
	if opts.NumNodes <= 0 {
		return nil, errors.New("refusing to start cluster with zero nodes")
	}

	// Create the launch directory.
	ld, err := os.MkdirTemp(os.Getenv("TEST_TMPDIR"), "cluster*")
	if err != nil {
		return nil, fmt.Errorf("failed to create the launch directory: %w", err)
	}
	// Create the socket directory.
	sd, err := os.MkdirTemp("/tmp", "cluster*")
	if err != nil {
		return nil, fmt.Errorf("failed to create the socket directory: %w", err)
	}

	// Prepare links between nodes and nanoswitch.
	var switchPorts []*os.File
	var vmPorts []*os.File
	for i := 0; i < opts.NumNodes; i++ {
		switchPort, vmPort, err := launch.NewSocketPair()
		if err != nil {
			return nil, fmt.Errorf("failed to get socketpair: %w", err)
		}
		switchPorts = append(switchPorts, switchPort)
		vmPorts = append(vmPorts, vmPort)
	}

	// Make a list of channels that will be populated by all running node qemu
	// processes.
	done := make([]chan error, opts.NumNodes)
	for i := range done {
		done[i] = make(chan error, 1)
	}

	// Prepare the node options. These will be kept as part of Cluster.
	// nodeOpts[].Runtime will be initialized by LaunchNode during the first
	// launch. The runtime information can later be used to restart a node.
	// The 0th node will be initialized first. The rest will follow after it
	// has bootstrapped the cluster.
	nodeOpts := make([]NodeOptions, opts.NumNodes)
	nodeOpts[0] = NodeOptions{
		ConnectToSocket: vmPorts[0],
		NodeParameters: &apb.NodeParameters{
			Cluster: &apb.NodeParameters_ClusterBootstrap_{
				ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
					OwnerPublicKey: InsecurePublicKey,
				},
			},
		},
		SerialPort: newPrefixedStdio(0),
	}

	// Start the first node.
	ctxT, ctxC := context.WithCancel(ctx)
	log.Printf("Cluster: Starting node %d...", 1)
	go func() {
		err := LaunchNode(ctxT, ld, sd, &nodeOpts[0])
		if err != nil {
			log.Printf("Node %d finished with an error: %v", 1, err)
		}
		done[0] <- err
	}()

	// Launch nanoswitch.
	portMap, err := launch.ConflictFreePortMap(ClusterPorts)
	if err != nil {
		ctxC()
		return nil, fmt.Errorf("failed to allocate ephemeral ports: %w", err)
	}

	go func() {
		if err := launch.RunMicroVM(ctxT, &launch.MicroVMOptions{
			KernelPath:             "metropolis/test/ktest/vmlinux",
			InitramfsPath:          "metropolis/test/nanoswitch/initramfs.cpio.lz4",
			ExtraNetworkInterfaces: switchPorts,
			PortMap:                portMap,
		}); err != nil {
			if !errors.Is(err, ctxT.Err()) {
				log.Fatalf("Failed to launch nanoswitch: %v", err)
			}
		}
	}()

	// Build the SOCKS dialer.
	socksRemote := fmt.Sprintf("localhost:%v", portMap[SOCKSPort])
	socksDialer, err := proxy.SOCKS5("tcp", socksRemote, nil, proxy.Direct)
	if err != nil {
		ctxC()
		return nil, fmt.Errorf("failed to build SOCKS dialer: %w", err)
	}

	// Retrieve the owner credentials and the first node.
	cert, firstNode, err := firstConnection(ctxT, socksDialer)
	if err != nil {
		ctxC()
		return nil, err
	}

	// Set up a partially initialized cluster instance, to be filled in in the
	// later steps.
	cluster := &Cluster{
		Owner: *cert,
		Ports: portMap,
		Nodes: map[string]*NodeInCluster{
			firstNode.ID: firstNode,
		},
		NodeIDs: []string{
			firstNode.ID,
		},

		nodesDone: done,
		nodeOpts:  nodeOpts,
		launchDir: ld,
		socketDir: sd,

		socksDialer: socksDialer,

		ctxT: ctxT,
		ctxC: ctxC,
	}

	// Now start the rest of the nodes and register them into the cluster.

	// Get an authenticated owner client within the cluster.
	curC, err := cluster.curatorClient()
	if err != nil {
		ctxC()
		return nil, fmt.Errorf("curatorClient: %w", err)
	}
	mgmt := apb.NewManagementClient(curC)

	// Retrieve the register ticket to register further nodes.
	log.Printf("Cluster: retrieving register ticket...")
	resT, err := mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{})
	if err != nil {
		ctxC()
		return nil, fmt.Errorf("GetRegisterTicket: %w", err)
	}
	ticket := resT.Ticket
	log.Printf("Cluster: retrieved register ticket (%d bytes).", len(ticket))

	// Retrieve cluster info (for directory and CA public key) to register further
	// nodes.
	resI, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
	if err != nil {
		ctxC()
		return nil, fmt.Errorf("GetClusterInfo: %w", err)
	}

	// Use the retrieved information to configure the rest of the node options.
	for i := 1; i < opts.NumNodes; i++ {
		nodeOpts[i] = NodeOptions{
			ConnectToSocket: vmPorts[i],
			NodeParameters: &apb.NodeParameters{
				Cluster: &apb.NodeParameters_ClusterRegister_{
					ClusterRegister: &apb.NodeParameters_ClusterRegister{
						RegisterTicket:   ticket,
						ClusterDirectory: resI.ClusterDirectory,
						CaCertificate:    resI.CaCertificate,
					},
				},
			},
			SerialPort: newPrefixedStdio(i),
		}
	}

	// Now run the rest of the nodes.
	//
	// TODO(q3k): parallelize this
	for i := 1; i < opts.NumNodes; i++ {
		log.Printf("Cluster: Starting node %d...", i+1)
		go func(i int) {
			err := LaunchNode(ctxT, ld, sd, &nodeOpts[i])
			if err != nil {
				log.Printf("Node %d finished with an error: %v", i, err)
			}
			done[i] <- err
		}(i)
		var newNode *apb.Node

		log.Printf("Cluster: waiting for node %d to appear as NEW...", i)
		for {
			nodes, err := getNodes(ctx, mgmt)
			if err != nil {
				ctxC()
				return nil, fmt.Errorf("could not get nodes: %w", err)
			}
			for _, n := range nodes {
				if n.State == cpb.NodeState_NODE_STATE_NEW {
					newNode = n
					break
				}
			}
			if newNode != nil {
				break
			}
			time.Sleep(1 * time.Second)
		}
		id := identity.NodeID(newNode.Pubkey)
		log.Printf("Cluster: node %d is %s", i, id)

		log.Printf("Cluster: approving node %d", i)
		_, err := mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
			Pubkey: newNode.Pubkey,
		})
		if err != nil {
			ctxC()
			return nil, fmt.Errorf("ApproveNode(%s): %w", id, err)
		}
		log.Printf("Cluster: node %d approved, waiting for it to appear as UP and with a network address...", i)
		for {
			nodes, err := getNodes(ctx, mgmt)
			if err != nil {
				ctxC()
				return nil, fmt.Errorf("could not get nodes: %w", err)
			}
			found := false
			for _, n := range nodes {
				if !bytes.Equal(n.Pubkey, newNode.Pubkey) {
					continue
				}
				if n.Status == nil || n.Status.ExternalAddress == "" {
					break
				}
				if n.State != cpb.NodeState_NODE_STATE_UP {
					break
				}
				found = true
				cluster.Nodes[identity.NodeID(n.Pubkey)] = &NodeInCluster{
					ID:                identity.NodeID(n.Pubkey),
					ManagementAddress: n.Status.ExternalAddress,
				}
				cluster.NodeIDs = append(cluster.NodeIDs, identity.NodeID(n.Pubkey))
				break
			}
			if found {
				break
			}
			time.Sleep(time.Second)
		}
		log.Printf("Cluster: node %d (%s) UP!", i, id)
	}

	log.Printf("Cluster: all nodes up:")
	for _, node := range cluster.Nodes {
		log.Printf("Cluster: - %s at %s", node.ID, node.ManagementAddress)
	}

	return cluster, nil
}
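
// A minimal sketch of driving LaunchCluster, e.g. from a test harness. The
// two-node count is an arbitrary example value; the dial pattern at the end
// matches what curatorClient does internally.
//
//	c, err := LaunchCluster(ctx, ClusterOptions{NumNodes: 2})
//	if err != nil {
//		log.Fatalf("LaunchCluster: %v", err)
//	}
//	defer c.Close()
//	creds := rpc.NewAuthenticatedCredentials(c.Owner, nil)
//	remote := net.JoinHostPort(c.NodeIDs[0], node.CuratorServicePort.PortString())
//	conn, err := grpc.Dial(remote, grpc.WithTransportCredentials(creds), grpc.WithContextDialer(c.DialNode))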
819
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200820// RebootNode reboots the cluster member node matching the given index, and
821// waits for it to rejoin the cluster. It will use the given context ctx to run
822// cluster API requests, whereas the resulting QEMU process will be created
823// using the cluster's context c.ctxT. The nodes are indexed starting at 0.
824func (c *Cluster) RebootNode(ctx context.Context, idx int) error {
825 if idx < 0 || idx >= len(c.NodeIDs) {
826 return fmt.Errorf("index out of bounds.")
827 }
828 id := c.NodeIDs[idx]
829
830 // Get an authenticated owner client within the cluster.
831 curC, err := c.curatorClient()
832 if err != nil {
833 return err
834 }
835 mgmt := apb.NewManagementClient(curC)
836
837 // Get the timestamp of the node's last update, as observed by Curator.
838 // It'll be needed to make sure it had rejoined the cluster after the reboot.
839 var is *apb.Node
840 for {
841 r, err := getNode(ctx, mgmt, id)
842 if err != nil {
843 return err
844 }
845
846 // Node status may be absent if it hasn't reported to the cluster yet. Wait
847 // for it to appear before progressing further.
848 if r.Status != nil {
849 is = r
850 break
851 }
852 time.Sleep(time.Second)
853 }
854
855 // Cancel the node's context. This will shut down QEMU.
856 c.nodeOpts[idx].Runtime.CtxC()
857 log.Printf("Cluster: waiting for node %d (%s) to stop.", idx, id)
858 err = <-c.nodesDone[idx]
859 if err != nil {
860 return fmt.Errorf("while restarting node: %w", err)
861 }
862
863 // Start QEMU again.
864 log.Printf("Cluster: restarting node %d (%s).", idx, id)
865 go func(n int) {
866 err := LaunchNode(c.ctxT, c.launchDir, c.socketDir, &c.nodeOpts[n])
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200867 if err != nil {
868 log.Printf("Node %d finished with an error: %v", n, err)
869 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200870 c.nodesDone[n] <- err
871 }(idx)
872
873 // Poll Management.GetNodes until the node's timestamp is updated.
874 for {
875 cs, err := getNode(ctx, mgmt, id)
876 if err != nil {
877 return err
878 }
879 if cs.Status == nil {
880 continue
881 }
882 if cs.Status.Timestamp > is.Status.Timestamp {
883 break
884 }
885 time.Sleep(time.Second)
886 }
887 log.Printf("Cluster: node %d (%s) has rejoined the cluster.", idx, id)
888 return nil
889}
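
// Sketch: rebooting the second node (index 1, as nodes are indexed from 0) of
// a running cluster c, using the same context that drives other cluster API
// calls.
//
//	if err := c.RebootNode(ctx, 1); err != nil {
//		log.Fatalf("RebootNode: %v", err)
//	}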

// Close cancels the running cluster's context and waits for all virtualized
// nodes to stop. It returns an error if stopping the nodes failed, or one of
// the nodes failed to fully start in the first place.
func (c *Cluster) Close() error {
	log.Printf("Cluster: stopping...")
	if c.authClient != nil {
		c.authClient.Close()
	}
	c.ctxC()

	var errors []error
	log.Printf("Cluster: waiting for nodes to exit...")
	for _, ch := range c.nodesDone {
		err := <-ch
		if err != nil {
			errors = append(errors, err)
		}
	}
	log.Printf("Cluster: removing nodes' state files.")
	os.RemoveAll(c.launchDir)
	os.RemoveAll(c.socketDir)
	log.Printf("Cluster: done")
	return multierr.Combine(errors...)
}

// DialNode is a grpc.WithContextDialer compatible dialer which dials nodes by
// their ID. This is performed by connecting to the cluster nanoswitch via its
// SOCKS proxy, and using the cluster node list for name resolution.
//
// For example:
//
//	grpc.Dial("metropolis-deadbeef:1234", grpc.WithContextDialer(c.DialNode))
func (c *Cluster) DialNode(_ context.Context, addr string) (net.Conn, error) {
	host, port, err := net.SplitHostPort(addr)
	if err != nil {
		return nil, fmt.Errorf("invalid host:port: %w", err)
	}
	// Already an IP address?
	if net.ParseIP(host) != nil {
		return c.socksDialer.Dial("tcp", addr)
	}

	// Otherwise, expect a node name.
	node, ok := c.Nodes[host]
	if !ok {
		return nil, fmt.Errorf("unknown node %q", host)
	}
	addr = net.JoinHostPort(node.ManagementAddress, port)
	return c.socksDialer.Dial("tcp", addr)
}