// cluster builds on the launch package and implements launching Metropolis
// nodes and clusters in a virtualized environment using qemu. It's kept in a
// separate package as it depends on a Metropolis node image, which might not be
// required for some uses of the launch library.
package cluster

import (
	"bytes"
	"context"
	"crypto/rand"
	"crypto/tls"
	"errors"
	"fmt"
	"io"
	"log"
	"net"
	"os"
	"os/exec"
	"path/filepath"
	"syscall"
	"time"

	"github.com/cenkalti/backoff/v4"
	"go.uber.org/multierr"
	"golang.org/x/net/proxy"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/proto"

	"source.monogon.dev/metropolis/cli/pkg/datafile"
	"source.monogon.dev/metropolis/node"
	"source.monogon.dev/metropolis/node/core/identity"
	"source.monogon.dev/metropolis/node/core/rpc"
	"source.monogon.dev/metropolis/node/core/rpc/resolver"
	apb "source.monogon.dev/metropolis/proto/api"
	cpb "source.monogon.dev/metropolis/proto/common"
	"source.monogon.dev/metropolis/test/launch"
)

// NodeOptions contains all options that can be passed to LaunchNode().
type NodeOptions struct {
	// Ports contains the port mapping where to expose the internal ports of the VM to
	// the host. See IdentityPortMap() and ConflictFreePortMap(). Ignored when
	// ConnectToSocket is set.
	Ports launch.PortMap

	// If set to true, reboots are honored. Otherwise all reboots exit the LaunchNode()
	// call. Metropolis nodes generally restart on almost all errors, so unless you
	// want to test reboot behavior this should be false.
	AllowReboot bool

	// By default the VM is connected to the Host via SLIRP. If ConnectToSocket is set,
	// it is instead connected to the given file descriptor/socket. If this is set, all
	// port maps from the Ports option are ignored. Intended for networking this
	// instance together with others for running more complex network configurations.
	ConnectToSocket *os.File

	// SerialPort is an io.ReadWriter over which you can communicate with the serial
	// port of the machine. It can be set to an existing file descriptor (like
	// os.Stdout/os.Stderr) or any Go structure implementing this interface.
	SerialPort io.ReadWriter

	// NodeParameters is passed into the VM and subsequently used for bootstrapping or
	// registering into a cluster.
	NodeParameters *apb.NodeParameters

	// Mac is the node's MAC address.
	Mac *net.HardwareAddr

	// Runtime keeps the node's QEMU runtime state.
	Runtime *NodeRuntime
}
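
// Multiple instances can be networked together by wiring each node's
// ConnectToSocket to a switch VM. A minimal sketch of such wiring, assuming
// nanoswitch-style networking as done by LaunchCluster below (error handling
// elided):
//
//	switchPort, vmPort, err := launch.NewSocketPair()
//	// switchPort is handed to the switch VM; the node gets the other end:
//	opts := NodeOptions{ConnectToSocket: vmPort}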

// NodeRuntime keeps the node's QEMU runtime options.
type NodeRuntime struct {
	// ld points at the node's launch directory storing data such as storage
	// images, firmware variables or the TPM state.
	ld string
	// sd points at the node's socket directory.
	sd string

	// ctxT is the context QEMU will execute in.
	ctxT context.Context
	// CtxC is the QEMU context's cancellation function.
	CtxC context.CancelFunc
}

// NodePorts is the list of ports a fully operational Metropolis node listens on.
var NodePorts = []node.Port{
	node.ConsensusPort,

	node.CuratorServicePort,
	node.DebugServicePort,

	node.KubernetesAPIPort,
	node.KubernetesAPIWrappedPort,
	node.DebuggerPort,
}

// setupRuntime creates the node's QEMU runtime directory, together with all
// files required to preserve its state, a level below the chosen path ld. The
// node's socket directory is similarly created a level below sd. It may
// return an I/O error.
func setupRuntime(ld, sd string) (*NodeRuntime, error) {
	// Create a temporary directory to keep all the runtime files.
	stdp, err := os.MkdirTemp(ld, "node_state*")
	if err != nil {
		return nil, fmt.Errorf("failed to create the state directory: %w", err)
	}

	// Initialize the node's storage with a prebuilt image.
	si, err := datafile.ResolveRunfile("metropolis/node/node.img")
	if err != nil {
		return nil, fmt.Errorf("while resolving a path: %w", err)
	}
	di := filepath.Join(stdp, filepath.Base(si))
	log.Printf("Cluster: copying the node image: %s -> %s", si, di)
	if err := copyFile(si, di); err != nil {
		return nil, fmt.Errorf("while copying the node image: %w", err)
	}

	// Initialize the OVMF firmware variables file.
	sv, err := datafile.ResolveRunfile("external/edk2/OVMF_VARS.fd")
	if err != nil {
		return nil, fmt.Errorf("while resolving a path: %w", err)
	}
	dv := filepath.Join(stdp, filepath.Base(sv))
	if err := copyFile(sv, dv); err != nil {
		return nil, fmt.Errorf("while copying firmware variables: %w", err)
	}

	// Create the TPM state directory and initialize all files required by swtpm.
	tpmt := filepath.Join(stdp, "tpm")
	if err := os.Mkdir(tpmt, 0755); err != nil {
		return nil, fmt.Errorf("while creating the TPM directory: %w", err)
	}
	tpms, err := datafile.ResolveRunfile("metropolis/node/tpm")
	if err != nil {
		return nil, fmt.Errorf("while resolving a path: %w", err)
	}
	tpmf, err := os.ReadDir(tpms)
	if err != nil {
		return nil, fmt.Errorf("failed to read TPM directory: %w", err)
	}
	for _, file := range tpmf {
		name := file.Name()
		src, err := datafile.ResolveRunfile(filepath.Join(tpms, name))
		if err != nil {
			return nil, fmt.Errorf("while resolving a path: %w", err)
		}
		tgt := filepath.Join(tpmt, name)
		if err := copyFile(src, tgt); err != nil {
			return nil, fmt.Errorf("while copying TPM state: file %q to %q: %w", src, tgt, err)
		}
	}

	// Create the socket directory.
	sotdp, err := os.MkdirTemp(sd, "node_sock*")
	if err != nil {
		return nil, fmt.Errorf("failed to create the socket directory: %w", err)
	}

	return &NodeRuntime{
		ld: stdp,
		sd: sotdp,
	}, nil
}

// CuratorClient returns an authenticated owner connection to a Curator
// instance within Cluster c, or nil together with an error.
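//
// A minimal usage sketch (error handling elided), e.g. to build a management
// client against a running cluster, as done by LaunchCluster below:
//
//	curC, err := c.CuratorClient()
//	mgmt := apb.NewManagementClient(curC)
//	res, err := mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{})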
func (c *Cluster) CuratorClient() (*grpc.ClientConn, error) {
	if c.authClient == nil {
		authCreds := rpc.NewAuthenticatedCredentials(c.Owner, nil)
		r := resolver.New(c.ctxT)
		r.SetLogger(func(f string, args ...interface{}) {
			log.Printf("Cluster: client resolver: %s", fmt.Sprintf(f, args...))
		})
		for _, n := range c.NodeIDs {
			ep, err := resolver.NodeWithDefaultPort(n)
			if err != nil {
				return nil, fmt.Errorf("could not add node %q by DNS: %v", n, err)
			}
			r.AddEndpoint(ep)
		}
		authClient, err := grpc.Dial(resolver.MetropolisControlAddress,
			grpc.WithTransportCredentials(authCreds),
			grpc.WithResolvers(r),
			grpc.WithContextDialer(c.DialNode),
		)
		if err != nil {
			return nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
		}
		c.authClient = authClient
	}
	return c.authClient, nil
}

// LaunchNode launches a single Metropolis node instance with the given options.
// The instance runs mostly paravirtualized but with some emulated hardware
// similar to how a cloud provider might set up its VMs. The disk is fully
// writable, and the changes are kept across reboots and shutdowns. ld and sd
// point to the launch directory and the socket directory, holding the nodes'
// state files (storage, tpm state, firmware state), and UNIX socket files
// (swtpm <-> QEMU interplay) respectively. The directories must exist before
// LaunchNode is called. LaunchNode will update options.Runtime and options.Mac
// if either is not initialized.
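//
// A minimal bootstrap sketch, mirroring what LaunchCluster does for its first
// node (the directories must already exist; error handling elided):
//
//	opts := NodeOptions{
//		NodeParameters: &apb.NodeParameters{
//			Cluster: &apb.NodeParameters_ClusterBootstrap_{
//				ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
//					OwnerPublicKey: InsecurePublicKey,
//				},
//			},
//		},
//		SerialPort: os.Stdout,
//	}
//	err := LaunchNode(ctx, launchDir, socketDir, &opts)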
func LaunchNode(ctx context.Context, ld, sd string, options *NodeOptions) error {
	// TODO(mateusz@monogon.tech) try using QEMU's abstract socket namespace instead
	// of /tmp (requires QEMU version >5.0,
	// https://github.com/qemu/qemu/commit/776b97d3605ed0fc94443048fdf988c7725e38a9).
	// swtpm accepts already-open FDs so we can pass in an abstract socket namespace FD
	// that we open and pass the name of it to QEMU. Not pinning this crashes both
	// swtpm and qemu because we run into UNIX socket length limitations (for legacy
	// reasons 108 chars).

	// If it's the node's first start, set up its runtime directories.
	if options.Runtime == nil {
		r, err := setupRuntime(ld, sd)
		if err != nil {
			return fmt.Errorf("while setting up node runtime: %w", err)
		}
		options.Runtime = r
	}

	// Replace the node's context with a new one.
	r := options.Runtime
	if r.CtxC != nil {
		r.CtxC()
	}
	r.ctxT, r.CtxC = context.WithCancel(ctx)

	var qemuNetType string
	var qemuNetConfig launch.QemuValue
	if options.ConnectToSocket != nil {
		qemuNetType = "socket"
		qemuNetConfig = launch.QemuValue{
			"id": {"net0"},
			"fd": {"3"},
		}
	} else {
		qemuNetType = "user"
		qemuNetConfig = launch.QemuValue{
			"id":        {"net0"},
			"net":       {"10.42.0.0/24"},
			"dhcpstart": {"10.42.0.10"},
			"hostfwd":   options.Ports.ToQemuForwards(),
		}
	}

	// Generate the node's MAC address if it isn't already set in NodeOptions.
	if options.Mac == nil {
		mac, err := generateRandomEthernetMAC()
		if err != nil {
			return err
		}
		options.Mac = mac
	}

	tpmSocketPath := filepath.Join(r.sd, "tpm-socket")
	fwVarPath := filepath.Join(r.ld, "OVMF_VARS.fd")
	storagePath := filepath.Join(r.ld, "node.img")
	qemuArgs := []string{"-machine", "q35", "-accel", "kvm", "-nographic", "-nodefaults", "-m", "4096",
		"-cpu", "host", "-smp", "sockets=1,cpus=1,cores=2,threads=2,maxcpus=4",
		"-drive", "if=pflash,format=raw,readonly,file=external/edk2/OVMF_CODE.fd",
		"-drive", "if=pflash,format=raw,file=" + fwVarPath,
		"-drive", "if=virtio,format=raw,cache=unsafe,file=" + storagePath,
		"-netdev", qemuNetConfig.ToOption(qemuNetType),
		"-device", "virtio-net-pci,netdev=net0,mac=" + options.Mac.String(),
		"-chardev", "socket,id=chrtpm,path=" + tpmSocketPath,
		"-tpmdev", "emulator,id=tpm0,chardev=chrtpm",
		"-device", "tpm-tis,tpmdev=tpm0",
		"-device", "virtio-rng-pci",
		"-serial", "stdio"}

	if !options.AllowReboot {
		qemuArgs = append(qemuArgs, "-no-reboot")
	}

	if options.NodeParameters != nil {
		parametersPath := filepath.Join(r.ld, "parameters.pb")
		parametersRaw, err := proto.Marshal(options.NodeParameters)
		if err != nil {
			return fmt.Errorf("failed to encode node parameters: %w", err)
		}
		if err := os.WriteFile(parametersPath, parametersRaw, 0644); err != nil {
			return fmt.Errorf("failed to write node parameters: %w", err)
		}
		qemuArgs = append(qemuArgs, "-fw_cfg", "name=dev.monogon.metropolis/parameters.pb,file="+parametersPath)
	}

	// Start the TPM emulator as a subprocess.
	tpmCtx, tpmCancel := context.WithCancel(options.Runtime.ctxT)
	defer tpmCancel()

	tpmd := filepath.Join(r.ld, "tpm")
	tpmEmuCmd := exec.CommandContext(tpmCtx, "swtpm", "socket", "--tpm2", "--tpmstate", "dir="+tpmd, "--ctrl", "type=unixio,path="+tpmSocketPath)
	tpmEmuCmd.Stderr = os.Stderr
	tpmEmuCmd.Stdout = os.Stdout

	err := tpmEmuCmd.Start()
	if err != nil {
		return fmt.Errorf("failed to start TPM emulator: %w", err)
	}

	// Wait for the socket to be created by the TPM emulator before launching
	// QEMU.
	for {
		_, err := os.Stat(tpmSocketPath)
		if err == nil {
			break
		}
		if err != nil && !os.IsNotExist(err) {
			return fmt.Errorf("while stat-ing TPM socket path: %w", err)
		}
		if err := tpmCtx.Err(); err != nil {
			return fmt.Errorf("while waiting for the TPM socket: %w", err)
		}
		time.Sleep(time.Millisecond * 100)
	}

	// Start the main qemu binary.
	systemCmd := exec.CommandContext(options.Runtime.ctxT, "qemu-system-x86_64", qemuArgs...)
	if options.ConnectToSocket != nil {
		systemCmd.ExtraFiles = []*os.File{options.ConnectToSocket}
	}

	var stdErrBuf bytes.Buffer
	systemCmd.Stderr = &stdErrBuf
	systemCmd.Stdout = options.SerialPort

	err = systemCmd.Run()

	// Stop the TPM emulator and wait for it to exit to properly reap the child process.
	tpmCancel()
	log.Print("Node: Waiting for TPM emulator to exit")
	// Wait returns a SIGKILL error because we just cancelled its context.
	// We still need to call it to avoid creating zombies.
	_ = tpmEmuCmd.Wait()
	log.Print("Node: TPM emulator done")

	var exerr *exec.ExitError
	if err != nil && errors.As(err, &exerr) {
		status := exerr.ProcessState.Sys().(syscall.WaitStatus)
		if status.Signaled() && status.Signal() == syscall.SIGKILL {
			// Process was killed externally (most likely by our context being canceled).
			// This is a normal exit for us, so return nil.
			return nil
		}
		exerr.Stderr = stdErrBuf.Bytes()
		newErr := launch.QEMUError(*exerr)
		return &newErr
	}
	return err
}

func copyFile(src, dst string) error {
	in, err := os.Open(src)
	if err != nil {
		return fmt.Errorf("when opening source: %w", err)
	}
	defer in.Close()

	out, err := os.Create(dst)
	if err != nil {
		return fmt.Errorf("when creating destination: %w", err)
	}
	defer out.Close()

	_, err = io.Copy(out, in)
	if err != nil {
		return fmt.Errorf("when copying file: %w", err)
	}
	return out.Close()
}

// getNodes wraps around Management.GetNodes to return a list of nodes in a
// cluster.
func getNodes(ctx context.Context, mgmt apb.ManagementClient) ([]*apb.Node, error) {
	var res []*apb.Node
	bo := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
	err := backoff.Retry(func() error {
		res = nil
		srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
		if err != nil {
			return fmt.Errorf("GetNodes: %w", err)
		}
		for {
			node, err := srvN.Recv()
			if err == io.EOF {
				break
			}
			if err != nil {
				return fmt.Errorf("GetNodes.Recv: %w", err)
			}
			res = append(res, node)
		}
		return nil
	}, bo)
	if err != nil {
		return nil, err
	}
	return res, nil
}

// getNode wraps Management.GetNodes. It returns node information matching the
// given node ID.
func getNode(ctx context.Context, mgmt apb.ManagementClient, id string) (*apb.Node, error) {
	nodes, err := getNodes(ctx, mgmt)
	if err != nil {
		return nil, fmt.Errorf("could not get nodes: %w", err)
	}
	for _, n := range nodes {
		eid := identity.NodeID(n.Pubkey)
		if eid != id {
			continue
		}
		return n, nil
	}
	return nil, fmt.Errorf("no such node")
}

// generateRandomEthernetMAC generates a random EUI-48 Ethernet MAC address.
func generateRandomEthernetMAC() (*net.HardwareAddr, error) {
	macBuf := make([]byte, 6)
	_, err := rand.Read(macBuf)
	if err != nil {
		return nil, fmt.Errorf("failed to read randomness for MAC: %v", err)
	}

	// Set the U/L bit and clear the I/G bit (locally administered individual MAC).
	// Ref IEEE 802-2014 Section 8.2.2.
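	// For example, a random first byte of 0x9d (0b10011101) becomes
	// (0x9d | 2) & 0xfe == 0x9e (0b10011110): bit 1 (U/L) set, bit 0 (I/G)
	// cleared.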
	macBuf[0] = (macBuf[0] | 2) & 0xfe
	mac := net.HardwareAddr(macBuf)
	return &mac, nil
}

// SOCKSPort is the port at which nanoswitch exposes its SOCKS proxy into the
// switch network.
const SOCKSPort uint16 = 1080

// ClusterPorts contains all ports handled by Nanoswitch.
var ClusterPorts = []uint16{
	// Forwarded to the first node.
	uint16(node.CuratorServicePort),
	uint16(node.DebugServicePort),
	uint16(node.KubernetesAPIPort),
	uint16(node.KubernetesAPIWrappedPort),

	// SOCKS proxy to the switch network.
	SOCKSPort,
}

// ClusterOptions contains all options for launching a Metropolis cluster.
type ClusterOptions struct {
	// The number of nodes this cluster should be started with.
	NumNodes int
}

// Cluster is the running Metropolis cluster launched using the LaunchCluster
// function.
type Cluster struct {
	// Owner is the TLS Certificate of the owner of the test cluster. This can be
	// used to authenticate further clients to the running cluster.
	Owner tls.Certificate
	// Ports is the PortMap used to access the first node's services (defined in
	// ClusterPorts) and the SOCKS proxy (at SOCKSPort).
	Ports launch.PortMap

	// Nodes is a map from Node ID to its runtime information.
	Nodes map[string]*NodeInCluster
	// NodeIDs is a list of node IDs that are backing this cluster, in order of
	// creation.
	NodeIDs []string

	// nodesDone is a list of channels populated with the return codes from all the
	// nodes' qemu instances. It's used by Close to ensure all nodes have
	// successfully been stopped.
	nodesDone []chan error
	// nodeOpts are the cluster member nodes' mutable launch options, kept here
	// to facilitate reboots.
	nodeOpts []NodeOptions
	// launchDir points at the directory keeping the nodes' state, such as storage
	// images, firmware variable files, TPM state.
	launchDir string
	// socketDir points at the directory keeping UNIX socket files, such as those
	// used to facilitate communication between QEMU and swtpm. It's different
	// from launchDir, and anchored nearer the file system root, due to the
	// socket path length limitation imposed by the kernel.
	socketDir string

	// socksDialer is used by DialNode to establish connections to nodes via the
	// SOCKS server run by nanoswitch.
	socksDialer proxy.Dialer

	// authClient is a cached authenticated owner connection to a Curator
	// instance within the cluster.
	authClient *grpc.ClientConn

	// ctxT is the context individual node contexts are created from.
	ctxT context.Context
	// ctxC is used by Close to cancel the context under which the nodes are
	// running.
	ctxC context.CancelFunc
}

// NodeInCluster represents information about a node that's part of a Cluster.
type NodeInCluster struct {
	// ID of the node, which can be used to dial this node's services via DialNode.
	ID string
	// Address of the node on the network run by nanoswitch. Not reachable from the
	// host unless dialed via DialNode or via the nanoswitch SOCKS proxy (reachable
	// on Cluster.Ports[SOCKSPort]).
	ManagementAddress string
}

// firstConnection performs the initial owner credential escrow with a newly
// started nanoswitch-backed cluster over SOCKS. It expects the first node to be
// running at 10.1.0.2, which is always the case with the current nanoswitch
// implementation.
//
// It returns the newly escrowed credentials as well as the first node's
// information as NodeInCluster.
func firstConnection(ctx context.Context, socksDialer proxy.Dialer) (*tls.Certificate, *NodeInCluster, error) {
	// Dial external service.
	remote := fmt.Sprintf("10.1.0.2:%s", node.CuratorServicePort.PortString())
	initCreds, err := rpc.NewEphemeralCredentials(InsecurePrivateKey, nil)
	if err != nil {
		return nil, nil, fmt.Errorf("NewEphemeralCredentials: %w", err)
	}
	initDialer := func(_ context.Context, addr string) (net.Conn, error) {
		return socksDialer.Dial("tcp", addr)
	}
	initClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(initCreds))
	if err != nil {
		return nil, nil, fmt.Errorf("dialing with ephemeral credentials failed: %w", err)
	}
	defer initClient.Close()

	// Retrieve the owner certificate - this can take a while because the node is
	// still coming up, so do it in a backoff loop.
	log.Printf("Cluster: retrieving owner certificate (this can take a few seconds while the first node boots)...")
	aaa := apb.NewAAAClient(initClient)
	var cert *tls.Certificate
	err = backoff.Retry(func() error {
		cert, err = rpc.RetrieveOwnerCertificate(ctx, aaa, InsecurePrivateKey)
		if st, ok := status.FromError(err); ok {
			if st.Code() == codes.Unavailable {
				log.Printf("Cluster: cluster UNAVAILABLE: %v", st.Message())
				return err
			}
		}
		return backoff.Permanent(err)
	}, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
	if err != nil {
		return nil, nil, fmt.Errorf("couldn't retrieve owner certificate: %w", err)
	}
	log.Printf("Cluster: retrieved owner certificate.")

	// Now connect authenticated and get the node ID.
	creds := rpc.NewAuthenticatedCredentials(*cert, nil)
	authClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(creds))
	if err != nil {
		return nil, nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
	}
	defer authClient.Close()
	mgmt := apb.NewManagementClient(authClient)

	var node *NodeInCluster
	err = backoff.Retry(func() error {
		nodes, err := getNodes(ctx, mgmt)
		if err != nil {
			return fmt.Errorf("retrieving nodes failed: %w", err)
		}
		if len(nodes) != 1 {
			return fmt.Errorf("expected one node, got %d", len(nodes))
		}
		n := nodes[0]
		if n.Status == nil || n.Status.ExternalAddress == "" {
			return fmt.Errorf("node has no status and/or address")
		}
		node = &NodeInCluster{
			ID:                identity.NodeID(n.Pubkey),
			ManagementAddress: n.Status.ExternalAddress,
		}
		return nil
	}, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
	if err != nil {
		return nil, nil, err
	}

	return cert, node, nil
}

// LaunchCluster launches a cluster of Metropolis node VMs together with a
// Nanoswitch instance to network them all together.
//
// The given context will be used to run all qemu instances in the cluster, and
// canceling the context or calling Close() will terminate them.
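//
// A minimal usage sketch (error handling elided):
//
//	cl, err := LaunchCluster(ctx, ClusterOptions{NumNodes: 2})
//	defer cl.Close()
//	curC, err := cl.CuratorClient()
//	mgmt := apb.NewManagementClient(curC)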
599func LaunchCluster(ctx context.Context, opts ClusterOptions) (*Cluster, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200600 if opts.NumNodes <= 0 {
Serge Bazanski66e58952021-10-05 17:06:56 +0200601 return nil, errors.New("refusing to start cluster with zero nodes")
602 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200603
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200604 // Create the launch directory.
605 ld, err := os.MkdirTemp(os.Getenv("TEST_TMPDIR"), "cluster*")
606 if err != nil {
607 return nil, fmt.Errorf("failed to create the launch directory: %w", err)
608 }
609 // Create the socket directory.
610 sd, err := os.MkdirTemp("/tmp", "cluster*")
611 if err != nil {
612 return nil, fmt.Errorf("failed to create the socket directory: %w", err)
613 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200614
615 // Prepare links between nodes and nanoswitch.
616 var switchPorts []*os.File
617 var vmPorts []*os.File
618 for i := 0; i < opts.NumNodes; i++ {
619 switchPort, vmPort, err := launch.NewSocketPair()
620 if err != nil {
Serge Bazanski66e58952021-10-05 17:06:56 +0200621 return nil, fmt.Errorf("failed to get socketpair: %w", err)
622 }
623 switchPorts = append(switchPorts, switchPort)
624 vmPorts = append(vmPorts, vmPort)
625 }
626
Serge Bazanskie78a0892021-10-07 17:03:49 +0200627 // Make a list of channels that will be populated by all running node qemu
628 // processes.
Serge Bazanski66e58952021-10-05 17:06:56 +0200629 done := make([]chan error, opts.NumNodes)
630 for i, _ := range done {
631 done[i] = make(chan error, 1)
632 }

	// Prepare the node options. These will be kept as part of Cluster.
	// nodeOpts[].Runtime will be initialized by LaunchNode during the first
	// launch. The runtime information can later be used to restart a node.
	// The 0th node will be initialized first. The rest will follow after it
	// has bootstrapped the cluster.
	nodeOpts := make([]NodeOptions, opts.NumNodes)
	nodeOpts[0] = NodeOptions{
		ConnectToSocket: vmPorts[0],
		NodeParameters: &apb.NodeParameters{
			Cluster: &apb.NodeParameters_ClusterBootstrap_{
				ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
					OwnerPublicKey: InsecurePublicKey,
				},
			},
		},
		SerialPort: newPrefixedStdio(0),
	}

	// Start the first node.
	ctxT, ctxC := context.WithCancel(ctx)
	log.Printf("Cluster: Starting node %d...", 1)
	go func() {
		err := LaunchNode(ctxT, ld, sd, &nodeOpts[0])
		if err != nil {
			log.Printf("Node %d finished with an error: %v", 1, err)
		}
		done[0] <- err
	}()

	// Launch nanoswitch.
	portMap, err := launch.ConflictFreePortMap(ClusterPorts)
	if err != nil {
		ctxC()
		return nil, fmt.Errorf("failed to allocate ephemeral ports: %w", err)
	}

	go func() {
		if err := launch.RunMicroVM(ctxT, &launch.MicroVMOptions{
			KernelPath:             "metropolis/test/ktest/vmlinux",
			InitramfsPath:          "metropolis/test/nanoswitch/initramfs.cpio.lz4",
			ExtraNetworkInterfaces: switchPorts,
			PortMap:                portMap,
			SerialPort:             newPrefixedStdio(99),
		}); err != nil {
			if !errors.Is(err, ctxT.Err()) {
				log.Fatalf("Failed to launch nanoswitch: %v", err)
			}
		}
	}()

	// Build the SOCKS dialer.
	socksRemote := fmt.Sprintf("localhost:%v", portMap[SOCKSPort])
	socksDialer, err := proxy.SOCKS5("tcp", socksRemote, nil, proxy.Direct)
	if err != nil {
		ctxC()
		return nil, fmt.Errorf("failed to build SOCKS dialer: %w", err)
	}

	// Retrieve owner credentials and the first node.
	cert, firstNode, err := firstConnection(ctxT, socksDialer)
	if err != nil {
		ctxC()
		return nil, err
	}

	// Set up a partially initialized cluster instance, to be filled in during
	// the later steps.
	cluster := &Cluster{
		Owner: *cert,
		Ports: portMap,
		Nodes: map[string]*NodeInCluster{
			firstNode.ID: firstNode,
		},
		NodeIDs: []string{
			firstNode.ID,
		},

		nodesDone: done,
		nodeOpts:  nodeOpts,
		launchDir: ld,
		socketDir: sd,

		socksDialer: socksDialer,

		ctxT: ctxT,
		ctxC: ctxC,
	}

	// Now start the rest of the nodes and register them into the cluster.

	// Get an authenticated owner client within the cluster.
	curC, err := cluster.CuratorClient()
	if err != nil {
		ctxC()
		return nil, fmt.Errorf("CuratorClient: %w", err)
	}
	mgmt := apb.NewManagementClient(curC)

	// Retrieve register ticket to register further nodes.
	log.Printf("Cluster: retrieving register ticket...")
	resT, err := mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{})
	if err != nil {
		ctxC()
		return nil, fmt.Errorf("GetRegisterTicket: %w", err)
	}
	ticket := resT.Ticket
	log.Printf("Cluster: retrieved register ticket (%d bytes).", len(ticket))

	// Retrieve cluster info (for directory and CA public key) to register further
	// nodes.
	resI, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
	if err != nil {
		ctxC()
		return nil, fmt.Errorf("GetClusterInfo: %w", err)
	}

	// Use the retrieved information to configure the rest of the node options.
	for i := 1; i < opts.NumNodes; i++ {
		nodeOpts[i] = NodeOptions{
			ConnectToSocket: vmPorts[i],
			NodeParameters: &apb.NodeParameters{
				Cluster: &apb.NodeParameters_ClusterRegister_{
					ClusterRegister: &apb.NodeParameters_ClusterRegister{
						RegisterTicket:   ticket,
						ClusterDirectory: resI.ClusterDirectory,
						CaCertificate:    resI.CaCertificate,
					},
				},
			},
			SerialPort: newPrefixedStdio(i),
		}
	}

	// Now run the rest of the nodes.
	//
	// TODO(q3k): parallelize this
	for i := 1; i < opts.NumNodes; i++ {
		log.Printf("Cluster: Starting node %d...", i+1)
		go func(i int) {
			err := LaunchNode(ctxT, ld, sd, &nodeOpts[i])
			if err != nil {
				log.Printf("Node %d finished with an error: %v", i+1, err)
			}
			done[i] <- err
		}(i)
		var newNode *apb.Node

		log.Printf("Cluster: waiting for node %d to appear as NEW...", i+1)
		for {
			nodes, err := getNodes(ctx, mgmt)
			if err != nil {
				ctxC()
				return nil, fmt.Errorf("could not get nodes: %w", err)
			}
			for _, n := range nodes {
				if n.State == cpb.NodeState_NODE_STATE_NEW {
					newNode = n
					break
				}
			}
			if newNode != nil {
				break
			}
			time.Sleep(1 * time.Second)
		}
		id := identity.NodeID(newNode.Pubkey)
		log.Printf("Cluster: node %d is %s", i+1, id)

		log.Printf("Cluster: approving node %d", i+1)
		_, err := mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
			Pubkey: newNode.Pubkey,
		})
		if err != nil {
			ctxC()
			return nil, fmt.Errorf("ApproveNode(%s): %w", id, err)
		}
		log.Printf("Cluster: node %d approved, waiting for it to appear as UP and with a network address...", i+1)
		for {
			nodes, err := getNodes(ctx, mgmt)
			if err != nil {
				ctxC()
				return nil, fmt.Errorf("could not get nodes: %w", err)
			}
			found := false
			for _, n := range nodes {
				if !bytes.Equal(n.Pubkey, newNode.Pubkey) {
					continue
				}
				if n.Status == nil || n.Status.ExternalAddress == "" {
					break
				}
				if n.State != cpb.NodeState_NODE_STATE_UP {
					break
				}
				found = true
				cluster.Nodes[identity.NodeID(n.Pubkey)] = &NodeInCluster{
					ID:                identity.NodeID(n.Pubkey),
					ManagementAddress: n.Status.ExternalAddress,
				}
				cluster.NodeIDs = append(cluster.NodeIDs, identity.NodeID(n.Pubkey))
				break
			}
			if found {
				break
			}
			time.Sleep(time.Second)
		}
		log.Printf("Cluster: node %d (%s) UP!", i+1, id)
	}

	log.Printf("Cluster: all nodes up:")
	for _, node := range cluster.Nodes {
		log.Printf("Cluster: - %s at %s", node.ID, node.ManagementAddress)
	}

	return cluster, nil
}

// RebootNode reboots the cluster member node matching the given index, and
// waits for it to rejoin the cluster. It will use the given context ctx to run
// cluster API requests, whereas the resulting QEMU process will be created
// using the cluster's context c.ctxT. The nodes are indexed starting at 0.
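//
// For example, to reboot the second node (index 1) of a running cluster:
//
//	if err := cl.RebootNode(ctx, 1); err != nil {
//		// handle error
//	}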
func (c *Cluster) RebootNode(ctx context.Context, idx int) error {
	if idx < 0 || idx >= len(c.NodeIDs) {
		return fmt.Errorf("index out of bounds")
	}
	id := c.NodeIDs[idx]

	// Get an authenticated owner client within the cluster.
	curC, err := c.CuratorClient()
	if err != nil {
		return err
	}
	mgmt := apb.NewManagementClient(curC)

	// Get the timestamp of the node's last update, as observed by Curator.
	// It'll be needed to make sure it has rejoined the cluster after the reboot.
	var is *apb.Node
	for {
		r, err := getNode(ctx, mgmt, id)
		if err != nil {
			return err
		}

		// Node status may be absent if it hasn't reported to the cluster yet. Wait
		// for it to appear before progressing further.
		if r.Status != nil {
			is = r
			break
		}
		time.Sleep(time.Second)
	}

	// Cancel the node's context. This will shut down QEMU.
	c.nodeOpts[idx].Runtime.CtxC()
	log.Printf("Cluster: waiting for node %d (%s) to stop.", idx, id)
	err = <-c.nodesDone[idx]
	if err != nil {
		return fmt.Errorf("while restarting node: %w", err)
	}

	// Start QEMU again.
	log.Printf("Cluster: restarting node %d (%s).", idx, id)
	go func(n int) {
		err := LaunchNode(c.ctxT, c.launchDir, c.socketDir, &c.nodeOpts[n])
		if err != nil {
			log.Printf("Node %d finished with an error: %v", n, err)
		}
		c.nodesDone[n] <- err
	}(idx)

	// Poll Management.GetNodes until the node's timestamp is updated.
	for {
		cs, err := getNode(ctx, mgmt, id)
		if err != nil {
			log.Printf("Cluster: node get error: %v", err)
			return err
		}
		log.Printf("Cluster: node status: %+v", cs)
		if cs.Status == nil {
			time.Sleep(time.Second)
			continue
		}
		if cs.Status.Timestamp > is.Status.Timestamp {
			break
		}
		time.Sleep(time.Second)
	}
	log.Printf("Cluster: node %d (%s) has rejoined the cluster.", idx, id)
	return nil
}

// Close cancels the running cluster's context and waits for all virtualized
// nodes to stop. It returns an error if stopping the nodes failed, or one of
// the nodes failed to fully start in the first place.
func (c *Cluster) Close() error {
	log.Printf("Cluster: stopping...")
	if c.authClient != nil {
		c.authClient.Close()
	}
	c.ctxC()

	var errors []error
	log.Printf("Cluster: waiting for nodes to exit...")
	for _, c := range c.nodesDone {
		err := <-c
		if err != nil {
			errors = append(errors, err)
		}
	}
	log.Printf("Cluster: removing nodes' state files.")
	os.RemoveAll(c.launchDir)
	os.RemoveAll(c.socketDir)
	log.Printf("Cluster: done")
	return multierr.Combine(errors...)
}

// DialNode is a grpc.WithContextDialer compatible dialer which dials nodes by
// their ID. This is performed by connecting to the cluster nanoswitch via its
// SOCKS proxy, and using the cluster node list for name resolution.
//
// For example:
//
//	grpc.Dial("metropolis-deadbeef:1234", grpc.WithContextDialer(c.DialNode))
func (c *Cluster) DialNode(_ context.Context, addr string) (net.Conn, error) {
	host, port, err := net.SplitHostPort(addr)
	if err != nil {
		return nil, fmt.Errorf("invalid host:port: %w", err)
	}
	// Already an IP address?
	if net.ParseIP(host) != nil {
		return c.socksDialer.Dial("tcp", addr)
	}

	// Otherwise, expect a node name.
	node, ok := c.Nodes[host]
	if !ok {
		return nil, fmt.Errorf("unknown node %q", host)
	}
	addr = net.JoinHostPort(node.ManagementAddress, port)
	return c.socksDialer.Dial("tcp", addr)
}