// Package cluster builds on the launch package and implements launching
// Metropolis nodes and clusters in a virtualized environment using qemu. It's
// kept in a separate package as it depends on a Metropolis node image, which
// might not be required for some uses of the launch library.
package cluster

import (
	"bytes"
	"context"
	"crypto/rand"
	"crypto/tls"
	"errors"
	"fmt"
	"io"
	"log"
	"net"
	"os"
	"os/exec"
	"path"
	"path/filepath"
	"syscall"
	"time"

	"github.com/cenkalti/backoff/v4"
	"go.uber.org/multierr"
	"golang.org/x/net/proxy"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/proto"

	"source.monogon.dev/metropolis/cli/pkg/datafile"
	"source.monogon.dev/metropolis/node"
	"source.monogon.dev/metropolis/node/core/identity"
	"source.monogon.dev/metropolis/node/core/rpc"
	"source.monogon.dev/metropolis/node/core/rpc/resolver"
	apb "source.monogon.dev/metropolis/proto/api"
	cpb "source.monogon.dev/metropolis/proto/common"
	"source.monogon.dev/metropolis/test/launch"
)

// NodeOptions contains all options that can be passed to LaunchNode().
type NodeOptions struct {
	// Name is a human-readable identifier to be used in debug output.
	Name string

	// Ports contains the port mapping used to expose the internal ports of the VM
	// to the host. See IdentityPortMap() and ConflictFreePortMap(). Ignored when
	// ConnectToSocket is set.
	Ports launch.PortMap

	// If set to true, reboots are honored. Otherwise, all reboots exit the
	// LaunchNode() call. Metropolis nodes generally restart on almost all errors,
	// so unless you want to test reboot behavior this should be false.
	AllowReboot bool

	// By default, the VM is connected to the host via SLIRP. If ConnectToSocket is
	// set, it is instead connected to the given file descriptor/socket. If this is
	// set, all port maps from the Ports option are ignored. Intended for networking
	// this instance together with others for running more complex network
	// configurations.
	ConnectToSocket *os.File

	// When PcapDump is set, all traffic is dumped to a pcap file in the
	// runtime directory (e.g. "net0.pcap" for the first interface).
	PcapDump bool

	// SerialPort is an io.ReadWriter over which you can communicate with the serial
	// port of the machine. It can be set to an existing file descriptor (like
	// os.Stdout/os.Stderr) or any Go structure implementing this interface.
	SerialPort io.ReadWriter

	// NodeParameters is passed into the VM and subsequently used for bootstrapping
	// or registering into a cluster.
	NodeParameters *apb.NodeParameters

	// Mac is the node's MAC address.
	Mac *net.HardwareAddr

	// Runtime keeps the node's QEMU runtime state.
	Runtime *NodeRuntime
}

// NodeRuntime keeps the node's QEMU runtime options.
type NodeRuntime struct {
	// ld points at the node's launch directory storing data such as storage
	// images, firmware variables or the TPM state.
	ld string
	// sd points at the node's socket directory.
	sd string

	// ctxT is the context QEMU will execute in.
	ctxT context.Context
	// CtxC is the QEMU context's cancellation function.
	CtxC context.CancelFunc
}

// NodePorts is the list of ports a fully operational Metropolis node listens on.
var NodePorts = []node.Port{
	node.ConsensusPort,

	node.CuratorServicePort,
	node.DebugServicePort,

	node.KubernetesAPIPort,
	node.KubernetesAPIWrappedPort,
	node.CuratorServicePort,
	node.DebuggerPort,
}

// setupRuntime creates the node's QEMU runtime directory, together with all
// files required to preserve its state, a level below the chosen path ld. The
// node's socket directory is similarly created a level below sd. It may
// return an I/O error.
func setupRuntime(ld, sd string) (*NodeRuntime, error) {
	// Create a temporary directory to keep all the runtime files.
	stdp, err := os.MkdirTemp(ld, "node_state*")
	if err != nil {
		return nil, fmt.Errorf("failed to create the state directory: %w", err)
	}

	// Initialize the node's storage with a prebuilt image.
	si, err := datafile.ResolveRunfile("metropolis/node/node.img")
	if err != nil {
		return nil, fmt.Errorf("while resolving a path: %w", err)
	}
	di := filepath.Join(stdp, filepath.Base(si))
	log.Printf("Cluster: copying the node image: %s -> %s", si, di)
	if err := copyFile(si, di); err != nil {
		return nil, fmt.Errorf("while copying the node image: %w", err)
	}

	// Initialize the OVMF firmware variables file.
	sv, err := datafile.ResolveRunfile("external/edk2/OVMF_VARS.fd")
	if err != nil {
		return nil, fmt.Errorf("while resolving a path: %w", err)
	}
	dv := filepath.Join(stdp, filepath.Base(sv))
	if err := copyFile(sv, dv); err != nil {
		return nil, fmt.Errorf("while copying firmware variables: %w", err)
	}

	// Create the TPM state directory and initialize all files required by swtpm.
	tpmt := filepath.Join(stdp, "tpm")
	if err := os.Mkdir(tpmt, 0755); err != nil {
		return nil, fmt.Errorf("while creating the TPM directory: %w", err)
	}
	tpms, err := datafile.ResolveRunfile("metropolis/node/tpm")
	if err != nil {
		return nil, fmt.Errorf("while resolving a path: %w", err)
	}
	tpmf, err := os.ReadDir(tpms)
	if err != nil {
		return nil, fmt.Errorf("failed to read TPM directory: %w", err)
	}
	for _, file := range tpmf {
		name := file.Name()
		src, err := datafile.ResolveRunfile(filepath.Join(tpms, name))
		if err != nil {
			return nil, fmt.Errorf("while resolving a path: %w", err)
		}
		tgt := filepath.Join(tpmt, name)
		if err := copyFile(src, tgt); err != nil {
			return nil, fmt.Errorf("while copying TPM state: file %q to %q: %w", src, tgt, err)
		}
	}

	// Create the socket directory.
	sotdp, err := os.MkdirTemp(sd, "node_sock*")
	if err != nil {
		return nil, fmt.Errorf("failed to create the socket directory: %w", err)
	}

	return &NodeRuntime{
		ld: stdp,
		sd: sotdp,
	}, nil
}

// CuratorClient returns an authenticated owner connection to a Curator
// instance within Cluster c, or nil together with an error.
func (c *Cluster) CuratorClient() (*grpc.ClientConn, error) {
	if c.authClient == nil {
		authCreds := rpc.NewAuthenticatedCredentials(c.Owner, nil)
		r := resolver.New(c.ctxT, resolver.WithLogger(func(f string, args ...interface{}) {
			log.Printf("Cluster: client resolver: %s", fmt.Sprintf(f, args...))
		}))
		for _, n := range c.NodeIDs {
			ep, err := resolver.NodeWithDefaultPort(n)
			if err != nil {
				return nil, fmt.Errorf("could not add node %q by DNS: %v", n, err)
			}
			r.AddEndpoint(ep)
		}
		authClient, err := grpc.Dial(resolver.MetropolisControlAddress,
			grpc.WithTransportCredentials(authCreds),
			grpc.WithResolvers(r),
			grpc.WithContextDialer(c.DialNode),
		)
		if err != nil {
			return nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
		}
		c.authClient = authClient
	}
	return c.authClient, nil
}
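
// An illustrative sketch of how CuratorClient is typically used (assumed
// usage, not part of the original sources). The returned connection is cached
// by the Cluster and closed in Close(), so callers should not close it:
//
//	curC, err := cluster.CuratorClient()
//	if err != nil {
//		// handle error
//	}
//	mgmt := apb.NewManagementClient(curC)
//	_, err = mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{})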

// LaunchNode launches a single Metropolis node instance with the given options.
// The instance runs mostly paravirtualized but with some emulated hardware
// similar to how a cloud provider might set up its VMs. The disk is fully
// writable, and the changes are kept across reboots and shutdowns. ld and sd
// point to the launch directory and the socket directory, holding the nodes'
// state files (storage, tpm state, firmware state), and UNIX socket files
// (swtpm <-> QEMU interplay) respectively. The directories must exist before
// LaunchNode is called. LaunchNode will update options.Runtime and options.Mac
// if either are not initialized.
func LaunchNode(ctx context.Context, ld, sd string, options *NodeOptions) error {
	// TODO(mateusz@monogon.tech) try using QEMU's abstract socket namespace instead
	// of /tmp (requires QEMU version >5.0).
	// https://github.com/qemu/qemu/commit/776b97d3605ed0fc94443048fdf988c7725e38a9).
	// swtpm accepts already-open FDs, so we could open an abstract namespace socket
	// ourselves and pass its name to QEMU. Not pinning this crashes both swtpm and
	// qemu because we run into UNIX socket path length limitations (for legacy
	// reasons 108 chars).

	// If it's the node's first start, set up its runtime directories.
	if options.Runtime == nil {
		r, err := setupRuntime(ld, sd)
		if err != nil {
			return fmt.Errorf("while setting up node runtime: %w", err)
		}
		options.Runtime = r
	}

	// Replace the node's context with a new one.
	r := options.Runtime
	if r.CtxC != nil {
		r.CtxC()
	}
	r.ctxT, r.CtxC = context.WithCancel(ctx)

	var qemuNetType string
	var qemuNetConfig launch.QemuValue
	if options.ConnectToSocket != nil {
		qemuNetType = "socket"
		qemuNetConfig = launch.QemuValue{
			"id": {"net0"},
			"fd": {"3"},
		}
	} else {
		qemuNetType = "user"
		qemuNetConfig = launch.QemuValue{
			"id":        {"net0"},
			"net":       {"10.42.0.0/24"},
			"dhcpstart": {"10.42.0.10"},
			"hostfwd":   options.Ports.ToQemuForwards(),
		}
	}

	// Generate the node's MAC address if it isn't already set in NodeOptions.
	if options.Mac == nil {
		mac, err := generateRandomEthernetMAC()
		if err != nil {
			return err
		}
		options.Mac = mac
	}

	tpmSocketPath := filepath.Join(r.sd, "tpm-socket")
	fwVarPath := filepath.Join(r.ld, "OVMF_VARS.fd")
	storagePath := filepath.Join(r.ld, "node.img")
	qemuArgs := []string{"-machine", "q35", "-accel", "kvm", "-nographic", "-nodefaults", "-m", "4096",
		"-cpu", "host", "-smp", "sockets=1,cpus=1,cores=2,threads=2,maxcpus=4",
		"-drive", "if=pflash,format=raw,readonly,file=external/edk2/OVMF_CODE.fd",
		"-drive", "if=pflash,format=raw,file=" + fwVarPath,
		"-drive", "if=virtio,format=raw,cache=unsafe,file=" + storagePath,
		"-netdev", qemuNetConfig.ToOption(qemuNetType),
		"-device", "virtio-net-pci,netdev=net0,mac=" + options.Mac.String(),
		"-chardev", "socket,id=chrtpm,path=" + tpmSocketPath,
		"-tpmdev", "emulator,id=tpm0,chardev=chrtpm",
		"-device", "tpm-tis,tpmdev=tpm0",
		"-device", "virtio-rng-pci",
		"-serial", "stdio"}

	if !options.AllowReboot {
		qemuArgs = append(qemuArgs, "-no-reboot")
	}

	if options.NodeParameters != nil {
		parametersPath := filepath.Join(r.ld, "parameters.pb")
		parametersRaw, err := proto.Marshal(options.NodeParameters)
		if err != nil {
			return fmt.Errorf("failed to encode node parameters: %w", err)
		}
		if err := os.WriteFile(parametersPath, parametersRaw, 0644); err != nil {
			return fmt.Errorf("failed to write node parameters: %w", err)
		}
		qemuArgs = append(qemuArgs, "-fw_cfg", "name=dev.monogon.metropolis/parameters.pb,file="+parametersPath)
	}

	if options.PcapDump {
		pcapPath := filepath.Join(r.ld, "net0.pcap")
		qemuNetDump := launch.QemuValue{
			"id":     {"net0"},
			"netdev": {"net0"},
			"file":   {pcapPath},
		}
		qemuArgs = append(qemuArgs, "-object", qemuNetDump.ToOption("filter-dump"))
	}

	// Start TPM emulator as a subprocess.
	tpmCtx, tpmCancel := context.WithCancel(options.Runtime.ctxT)
	defer tpmCancel()

	tpmd := filepath.Join(r.ld, "tpm")
	tpmEmuCmd := exec.CommandContext(tpmCtx, "swtpm", "socket", "--tpm2", "--tpmstate", "dir="+tpmd, "--ctrl", "type=unixio,path="+tpmSocketPath)
	tpmEmuCmd.Stderr = os.Stderr
	tpmEmuCmd.Stdout = os.Stdout

	err := tpmEmuCmd.Start()
	if err != nil {
		return fmt.Errorf("failed to start TPM emulator: %w", err)
	}

	// Wait for the socket to be created by the TPM emulator before launching
	// QEMU.
	for {
		_, err := os.Stat(tpmSocketPath)
		if err == nil {
			break
		}
		if err != nil && !os.IsNotExist(err) {
			return fmt.Errorf("while stat-ing TPM socket path: %w", err)
		}
		if err := tpmCtx.Err(); err != nil {
			return fmt.Errorf("while waiting for the TPM socket: %w", err)
		}
		time.Sleep(time.Millisecond * 100)
	}

	// Start the main qemu binary.
	systemCmd := exec.CommandContext(options.Runtime.ctxT, "qemu-system-x86_64", qemuArgs...)
	if options.ConnectToSocket != nil {
		systemCmd.ExtraFiles = []*os.File{options.ConnectToSocket}
	}

	var stdErrBuf bytes.Buffer
	systemCmd.Stderr = &stdErrBuf
	systemCmd.Stdout = options.SerialPort

	launch.PrettyPrintQemuArgs(options.Name, systemCmd.Args)

	err = systemCmd.Run()

	// Stop the TPM emulator and wait for it to exit to properly reap the child process.
	tpmCancel()
	log.Print("Node: Waiting for TPM emulator to exit")
	// Wait returns a SIGKILL error because we just cancelled its context.
	// We still need to call it to avoid creating zombies.
	_ = tpmEmuCmd.Wait()
	log.Print("Node: TPM emulator done")

	var exerr *exec.ExitError
	if err != nil && errors.As(err, &exerr) {
		status := exerr.ProcessState.Sys().(syscall.WaitStatus)
		if status.Signaled() && status.Signal() == syscall.SIGKILL {
			// Process was killed externally (most likely by our context being canceled).
			// This is a normal exit for us, so return nil.
			return nil
		}
		exerr.Stderr = stdErrBuf.Bytes()
		newErr := launch.QEMUError(*exerr)
		return &newErr
	}
	return err
}
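
// A minimal usage sketch for LaunchNode (illustrative only; the directories
// and options shown are assumptions, not taken from an actual test):
//
//	ld, _ := os.MkdirTemp("", "launch*")
//	sd, _ := os.MkdirTemp("/tmp", "sock*")
//	opts := NodeOptions{
//		Name:       "node0",
//		SerialPort: os.Stdout,
//	}
//	if err := LaunchNode(ctx, ld, sd, &opts); err != nil {
//		log.Fatalf("LaunchNode: %v", err)
//	}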

func copyFile(src, dst string) error {
	in, err := os.Open(src)
	if err != nil {
		return fmt.Errorf("when opening source: %w", err)
	}
	defer in.Close()

	out, err := os.Create(dst)
	if err != nil {
		return fmt.Errorf("when creating destination: %w", err)
	}
	defer out.Close()

	_, err = io.Copy(out, in)
	if err != nil {
		return fmt.Errorf("when copying file: %w", err)
	}
	return out.Close()
}

// getNodes wraps around Management.GetNodes to return a list of nodes in a
// cluster.
func getNodes(ctx context.Context, mgmt apb.ManagementClient) ([]*apb.Node, error) {
	var res []*apb.Node
	bo := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
	err := backoff.Retry(func() error {
		res = nil
		srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
		if err != nil {
			return fmt.Errorf("GetNodes: %w", err)
		}
		for {
			node, err := srvN.Recv()
			if err == io.EOF {
				break
			}
			if err != nil {
				return fmt.Errorf("GetNodes.Recv: %w", err)
			}
			res = append(res, node)
		}
		return nil
	}, bo)
	if err != nil {
		return nil, err
	}
	return res, nil
}

// getNode wraps Management.GetNodes. It returns node information matching the
// given node ID.
func getNode(ctx context.Context, mgmt apb.ManagementClient, id string) (*apb.Node, error) {
	nodes, err := getNodes(ctx, mgmt)
	if err != nil {
		return nil, fmt.Errorf("could not get nodes: %w", err)
	}
	for _, n := range nodes {
		eid := identity.NodeID(n.Pubkey)
		if eid != id {
			continue
		}
		return n, nil
	}
	return nil, fmt.Errorf("no such node")
}

// generateRandomEthernetMAC generates a random EUI-48 Ethernet MAC address.
func generateRandomEthernetMAC() (*net.HardwareAddr, error) {
	macBuf := make([]byte, 6)
	_, err := rand.Read(macBuf)
	if err != nil {
		return nil, fmt.Errorf("failed to read randomness for MAC: %v", err)
	}

	// Set U/L bit and clear I/G bit (locally administered individual MAC).
	// Ref IEEE 802-2014 Section 8.2.2.
	macBuf[0] = (macBuf[0] | 2) & 0xfe
	mac := net.HardwareAddr(macBuf)
	return &mac, nil
}

// SOCKSPort is the port on which Nanoswitch exposes its SOCKS proxy into the
// cluster network.
const SOCKSPort uint16 = 1080

// ClusterPorts contains all ports handled by Nanoswitch.
var ClusterPorts = []uint16{
	// Forwarded to the first node.
	uint16(node.CuratorServicePort),
	uint16(node.DebugServicePort),
	uint16(node.KubernetesAPIPort),
	uint16(node.KubernetesAPIWrappedPort),

	// SOCKS proxy to the switch network.
	SOCKSPort,
}
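
// As an illustrative sketch (assumed usage, not part of the original sources):
// after LaunchCluster returns, the host-side port that nanoswitch forwards for
// one of these services can be looked up in Cluster.Ports, e.g.:
//
//	kubeAddr := fmt.Sprintf("localhost:%v", cluster.Ports[uint16(node.KubernetesAPIPort)])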

// ClusterOptions contains all options for launching a Metropolis cluster.
type ClusterOptions struct {
	// The number of nodes this cluster should be started with.
	NumNodes int
}

// Cluster is the running Metropolis cluster launched using the LaunchCluster
// function.
type Cluster struct {
	// Owner is the TLS Certificate of the owner of the test cluster. This can be
	// used to authenticate further clients to the running cluster.
	Owner tls.Certificate
	// Ports is the PortMap used to access the first node's services (defined in
	// ClusterPorts) and the SOCKS proxy (at SOCKSPort).
	Ports launch.PortMap

	// Nodes is a map from Node ID to its runtime information.
	Nodes map[string]*NodeInCluster
	// NodeIDs is a list of node IDs that are backing this cluster, in order of
	// creation.
	NodeIDs []string

	// nodesDone is a list of channels populated with the return codes from all the
	// nodes' qemu instances. It's used by Close to ensure all nodes have
	// successfully been stopped.
	nodesDone []chan error
	// nodeOpts are the cluster member nodes' mutable launch options, kept here
	// to facilitate reboots.
	nodeOpts []NodeOptions
	// launchDir points at the directory keeping the nodes' state, such as storage
	// images, firmware variable files, TPM state.
	launchDir string
	// socketDir points at the directory keeping UNIX socket files, such as those
	// used to facilitate communication between QEMU and swtpm. It's different
	// from launchDir, and anchored nearer the file system root, due to the
	// socket path length limitation imposed by the kernel.
	socketDir string

	// socksDialer is used by DialNode to establish connections to nodes via the
	// SOCKS server run by nanoswitch.
	socksDialer proxy.Dialer

	// authClient is a cached authenticated owner connection to a Curator
	// instance within the cluster.
	authClient *grpc.ClientConn

	// ctxT is the context individual node contexts are created from.
	ctxT context.Context
	// ctxC is used by Close to cancel the context under which the nodes are
	// running.
	ctxC context.CancelFunc
}

// NodeInCluster represents information about a node that's part of a Cluster.
type NodeInCluster struct {
	// ID of the node, which can be used to dial this node's services via DialNode.
	ID string
	// Address of the node on the network run by nanoswitch. Not reachable from the
	// host unless dialed via DialNode or via the nanoswitch SOCKS proxy (reachable
	// on Cluster.Ports[SOCKSPort]).
	ManagementAddress string
}

// firstConnection performs the initial owner credential escrow with a newly
// started nanoswitch-backed cluster over SOCKS. It expects the first node to be
// running at 10.1.0.2, which is always the case with the current nanoswitch
// implementation.
//
// It returns the newly escrowed credentials as well as the first node's
// information as NodeInCluster.
func firstConnection(ctx context.Context, socksDialer proxy.Dialer) (*tls.Certificate, *NodeInCluster, error) {
	// Dial external service.
	remote := fmt.Sprintf("10.1.0.2:%s", node.CuratorServicePort.PortString())
	initCreds, err := rpc.NewEphemeralCredentials(InsecurePrivateKey, nil)
	if err != nil {
		return nil, nil, fmt.Errorf("NewEphemeralCredentials: %w", err)
	}
	initDialer := func(_ context.Context, addr string) (net.Conn, error) {
		return socksDialer.Dial("tcp", addr)
	}
	initClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(initCreds))
	if err != nil {
		return nil, nil, fmt.Errorf("dialing with ephemeral credentials failed: %w", err)
	}
	defer initClient.Close()

	// Retrieve owner certificate - this can take a while because the node is still
	// coming up, so do it in a backoff loop.
	log.Printf("Cluster: retrieving owner certificate (this can take a few seconds while the first node boots)...")
	aaa := apb.NewAAAClient(initClient)
	var cert *tls.Certificate
	err = backoff.Retry(func() error {
		cert, err = rpc.RetrieveOwnerCertificate(ctx, aaa, InsecurePrivateKey)
		if st, ok := status.FromError(err); ok {
			if st.Code() == codes.Unavailable {
				log.Printf("Cluster: cluster UNAVAILABLE: %v", st.Message())
				return err
			}
		}
		return backoff.Permanent(err)
	}, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
	if err != nil {
		return nil, nil, fmt.Errorf("couldn't retrieve owner certificate: %w", err)
	}
	log.Printf("Cluster: retrieved owner certificate.")

	// Now connect authenticated and get the node ID.
	creds := rpc.NewAuthenticatedCredentials(*cert, nil)
	authClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(creds))
	if err != nil {
		return nil, nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
	}
	defer authClient.Close()
	mgmt := apb.NewManagementClient(authClient)

	var node *NodeInCluster
	err = backoff.Retry(func() error {
		nodes, err := getNodes(ctx, mgmt)
		if err != nil {
			return fmt.Errorf("retrieving nodes failed: %w", err)
		}
		if len(nodes) != 1 {
			return fmt.Errorf("expected one node, got %d", len(nodes))
		}
		n := nodes[0]
		if n.Status == nil || n.Status.ExternalAddress == "" {
			return fmt.Errorf("node has no status and/or address")
		}
		node = &NodeInCluster{
			ID:                identity.NodeID(n.Pubkey),
			ManagementAddress: n.Status.ExternalAddress,
		}
		return nil
	}, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
	if err != nil {
		return nil, nil, err
	}

	return cert, node, nil
}

// LaunchCluster launches a cluster of Metropolis node VMs together with a
// Nanoswitch instance to network them all together.
//
// The given context will be used to run all qemu instances in the cluster, and
// canceling the context or calling Close() will terminate them.
func LaunchCluster(ctx context.Context, opts ClusterOptions) (*Cluster, error) {
	if opts.NumNodes <= 0 {
		return nil, errors.New("refusing to start cluster with zero nodes")
	}

	// Create the launch directory.
	ld, err := os.MkdirTemp(os.Getenv("TEST_TMPDIR"), "cluster*")
	if err != nil {
		return nil, fmt.Errorf("failed to create the launch directory: %w", err)
	}
	// Create the socket directory.
	sd, err := os.MkdirTemp("/tmp", "cluster*")
	if err != nil {
		return nil, fmt.Errorf("failed to create the socket directory: %w", err)
	}

	// Prepare links between nodes and nanoswitch.
	var switchPorts []*os.File
	var vmPorts []*os.File
	for i := 0; i < opts.NumNodes; i++ {
		switchPort, vmPort, err := launch.NewSocketPair()
		if err != nil {
			return nil, fmt.Errorf("failed to get socketpair: %w", err)
		}
		switchPorts = append(switchPorts, switchPort)
		vmPorts = append(vmPorts, vmPort)
	}

	// Make a list of channels that will be populated by all running node qemu
	// processes.
	done := make([]chan error, opts.NumNodes)
	for i := range done {
		done[i] = make(chan error, 1)
	}

	// Prepare the node options. These will be kept as part of Cluster.
	// nodeOpts[].Runtime will be initialized by LaunchNode during the first
	// launch. The runtime information can later be used to restart a node.
	// The 0th node will be initialized first. The rest will follow after it
	// has bootstrapped the cluster.
	nodeOpts := make([]NodeOptions, opts.NumNodes)
	nodeOpts[0] = NodeOptions{
		Name:            "node0",
		ConnectToSocket: vmPorts[0],
		NodeParameters: &apb.NodeParameters{
			Cluster: &apb.NodeParameters_ClusterBootstrap_{
				ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
					OwnerPublicKey: InsecurePublicKey,
				},
			},
		},
		SerialPort: newPrefixedStdio(0),
		PcapDump:   true,
	}

	// Start the first node.
	ctxT, ctxC := context.WithCancel(ctx)
	log.Printf("Cluster: Starting node %d...", 1)
	go func() {
		err := LaunchNode(ctxT, ld, sd, &nodeOpts[0])
		if err != nil {
			log.Printf("Node %d finished with an error: %v", 1, err)
		}
		done[0] <- err
	}()

	// Launch nanoswitch.
	portMap, err := launch.ConflictFreePortMap(ClusterPorts)
	if err != nil {
		ctxC()
		return nil, fmt.Errorf("failed to allocate ephemeral ports: %w", err)
	}

	go func() {
		if err := launch.RunMicroVM(ctxT, &launch.MicroVMOptions{
			Name:                   "nanoswitch [99]",
			KernelPath:             "metropolis/test/ktest/vmlinux",
			InitramfsPath:          "metropolis/test/nanoswitch/initramfs.cpio.lz4",
			ExtraNetworkInterfaces: switchPorts,
			PortMap:                portMap,
			SerialPort:             newPrefixedStdio(99),
			PcapDump:               path.Join(ld, "nanoswitch.pcap"),
		}); err != nil {
			if !errors.Is(err, ctxT.Err()) {
				log.Fatalf("Failed to launch nanoswitch: %v", err)
			}
		}
	}()

	// Build SOCKS dialer.
	socksRemote := fmt.Sprintf("localhost:%v", portMap[SOCKSPort])
	socksDialer, err := proxy.SOCKS5("tcp", socksRemote, nil, proxy.Direct)
	if err != nil {
		ctxC()
		return nil, fmt.Errorf("failed to build SOCKS dialer: %w", err)
	}

	// Retrieve owner credentials and first node.
	cert, firstNode, err := firstConnection(ctxT, socksDialer)
	if err != nil {
		ctxC()
		return nil, err
	}

	// Set up a partially initialized cluster instance, to be filled in during the
	// later steps.
	cluster := &Cluster{
		Owner: *cert,
		Ports: portMap,
		Nodes: map[string]*NodeInCluster{
			firstNode.ID: firstNode,
		},
		NodeIDs: []string{
			firstNode.ID,
		},

		nodesDone: done,
		nodeOpts:  nodeOpts,
		launchDir: ld,
		socketDir: sd,

		socksDialer: socksDialer,

		ctxT: ctxT,
		ctxC: ctxC,
	}

	// Now start the rest of the nodes and register them into the cluster.

	// Get an authenticated owner client within the cluster.
	curC, err := cluster.CuratorClient()
	if err != nil {
		ctxC()
		return nil, fmt.Errorf("CuratorClient: %w", err)
	}
	mgmt := apb.NewManagementClient(curC)

	// Retrieve register ticket to register further nodes.
	log.Printf("Cluster: retrieving register ticket...")
	resT, err := mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{})
	if err != nil {
		ctxC()
		return nil, fmt.Errorf("GetRegisterTicket: %w", err)
	}
	ticket := resT.Ticket
	log.Printf("Cluster: retrieved register ticket (%d bytes).", len(ticket))

	// Retrieve cluster info (for directory and CA public key) to register further
	// nodes.
	resI, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
	if err != nil {
		ctxC()
		return nil, fmt.Errorf("GetClusterInfo: %w", err)
	}

	// Use the retrieved information to configure the rest of the node options.
	for i := 1; i < opts.NumNodes; i++ {
		nodeOpts[i] = NodeOptions{
			Name:            fmt.Sprintf("node%d", i),
			ConnectToSocket: vmPorts[i],
			NodeParameters: &apb.NodeParameters{
				Cluster: &apb.NodeParameters_ClusterRegister_{
					ClusterRegister: &apb.NodeParameters_ClusterRegister{
						RegisterTicket:   ticket,
						ClusterDirectory: resI.ClusterDirectory,
						CaCertificate:    resI.CaCertificate,
					},
				},
			},
			SerialPort: newPrefixedStdio(i),
		}
	}

	// Now run the rest of the nodes.
	//
	// TODO(q3k): parallelize this
	for i := 1; i < opts.NumNodes; i++ {
		log.Printf("Cluster: Starting node %d...", i+1)
		go func(i int) {
			err := LaunchNode(ctxT, ld, sd, &nodeOpts[i])
			if err != nil {
				log.Printf("Node %d finished with an error: %v", i, err)
			}
			done[i] <- err
		}(i)
		var newNode *apb.Node

		log.Printf("Cluster: waiting for node %d to appear as NEW...", i)
		for {
			nodes, err := getNodes(ctx, mgmt)
			if err != nil {
				ctxC()
				return nil, fmt.Errorf("could not get nodes: %w", err)
			}
			for _, n := range nodes {
				if n.State == cpb.NodeState_NODE_STATE_NEW {
					newNode = n
					break
				}
			}
			if newNode != nil {
				break
			}
			time.Sleep(1 * time.Second)
		}
		id := identity.NodeID(newNode.Pubkey)
		log.Printf("Cluster: node %d is %s", i, id)

		log.Printf("Cluster: approving node %d", i)
		_, err := mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
			Pubkey: newNode.Pubkey,
		})
		if err != nil {
			ctxC()
			return nil, fmt.Errorf("ApproveNode(%s): %w", id, err)
		}
		log.Printf("Cluster: node %d approved, waiting for it to appear as UP and with a network address...", i)
		for {
			nodes, err := getNodes(ctx, mgmt)
			if err != nil {
				ctxC()
				return nil, fmt.Errorf("could not get nodes: %w", err)
			}
			found := false
			for _, n := range nodes {
				if !bytes.Equal(n.Pubkey, newNode.Pubkey) {
					continue
				}
				if n.Status == nil || n.Status.ExternalAddress == "" {
					break
				}
				if n.State != cpb.NodeState_NODE_STATE_UP {
					break
				}
				found = true
				cluster.Nodes[identity.NodeID(n.Pubkey)] = &NodeInCluster{
					ID:                identity.NodeID(n.Pubkey),
					ManagementAddress: n.Status.ExternalAddress,
				}
				cluster.NodeIDs = append(cluster.NodeIDs, identity.NodeID(n.Pubkey))
				break
			}
			if found {
				break
			}
			time.Sleep(time.Second)
		}
		log.Printf("Cluster: node %d (%s) UP!", i, id)
	}

	log.Printf("Cluster: all nodes up:")
	for _, node := range cluster.Nodes {
		log.Printf("Cluster: - %s at %s", node.ID, node.ManagementAddress)
	}

	return cluster, nil
}
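
// A hedged end-to-end sketch (illustrative, not taken from the original
// sources): launch a three-node cluster, talk to the Curator, then tear
// everything down.
//
//	cluster, err := LaunchCluster(ctx, ClusterOptions{NumNodes: 3})
//	if err != nil {
//		log.Fatalf("LaunchCluster: %v", err)
//	}
//	defer cluster.Close()
//	curC, err := cluster.CuratorClient()
//	if err != nil {
//		log.Fatalf("CuratorClient: %v", err)
//	}
//	mgmt := apb.NewManagementClient(curC)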

// RebootNode reboots the cluster member node matching the given index, and
// waits for it to rejoin the cluster. It will use the given context ctx to run
// cluster API requests, whereas the resulting QEMU process will be created
// using the cluster's context c.ctxT. The nodes are indexed starting at 0.
func (c *Cluster) RebootNode(ctx context.Context, idx int) error {
	if idx < 0 || idx >= len(c.NodeIDs) {
		return fmt.Errorf("index out of bounds")
	}
	id := c.NodeIDs[idx]

	// Get an authenticated owner client within the cluster.
	curC, err := c.CuratorClient()
	if err != nil {
		return err
	}
	mgmt := apb.NewManagementClient(curC)

	// Get the timestamp of the node's last update, as observed by Curator.
	// It'll be needed to make sure the node has rejoined the cluster after the reboot.
	var is *apb.Node
	for {
		r, err := getNode(ctx, mgmt, id)
		if err != nil {
			return err
		}

		// Node status may be absent if it hasn't reported to the cluster yet. Wait
		// for it to appear before progressing further.
		if r.Status != nil {
			is = r
			break
		}
		time.Sleep(time.Second)
	}

	// Cancel the node's context. This will shut down QEMU.
	c.nodeOpts[idx].Runtime.CtxC()
	log.Printf("Cluster: waiting for node %d (%s) to stop.", idx, id)
	err = <-c.nodesDone[idx]
	if err != nil {
		return fmt.Errorf("while restarting node: %w", err)
	}

	// Start QEMU again.
	log.Printf("Cluster: restarting node %d (%s).", idx, id)
	go func(n int) {
		err := LaunchNode(c.ctxT, c.launchDir, c.socketDir, &c.nodeOpts[n])
		if err != nil {
			log.Printf("Node %d finished with an error: %v", n, err)
		}
		c.nodesDone[n] <- err
	}(idx)

	// Poll Management.GetNodes until the node's timestamp is updated.
	for {
		cs, err := getNode(ctx, mgmt, id)
		if err != nil {
			log.Printf("Cluster: node get error: %v", err)
			return err
		}
		log.Printf("Cluster: node status: %+v", cs)
		if cs.Status == nil {
			// No status yet; sleep before retrying to avoid busy-looping.
			time.Sleep(time.Second)
			continue
		}
		if cs.Status.Timestamp.AsTime().Sub(is.Status.Timestamp.AsTime()) > 0 {
			break
		}
		time.Sleep(time.Second)
	}
	log.Printf("Cluster: node %d (%s) has rejoined the cluster.", idx, id)
	return nil
}
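
// Illustrative sketch (assumed usage): reboot the second node of a running
// cluster and block until it has rejoined.
//
//	if err := cluster.RebootNode(ctx, 1); err != nil {
//		log.Fatalf("RebootNode: %v", err)
//	}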

// Close cancels the running cluster's context and waits for all virtualized
// nodes to stop. It returns an error if stopping the nodes failed, or one of
// the nodes failed to fully start in the first place.
func (c *Cluster) Close() error {
	log.Printf("Cluster: stopping...")
	if c.authClient != nil {
		c.authClient.Close()
	}
	c.ctxC()

	var errs []error
	log.Printf("Cluster: waiting for nodes to exit...")
	for _, c := range c.nodesDone {
		err := <-c
		if err != nil {
			errs = append(errs, err)
		}
	}
	log.Printf("Cluster: removing nodes' state files.")
	os.RemoveAll(c.launchDir)
	os.RemoveAll(c.socketDir)
	log.Printf("Cluster: done")
	return multierr.Combine(errs...)
}

// DialNode is a grpc.WithContextDialer compatible dialer which dials nodes by
// their ID. This is performed by connecting to the cluster nanoswitch via its
// SOCKS proxy, and using the cluster node list for name resolution.
//
// For example:
//
//	grpc.Dial("metropolis-deadbeef:1234", grpc.WithContextDialer(c.DialNode))
func (c *Cluster) DialNode(_ context.Context, addr string) (net.Conn, error) {
	host, port, err := net.SplitHostPort(addr)
	if err != nil {
		return nil, fmt.Errorf("invalid host:port: %w", err)
	}
	// Already an IP address?
	if net.ParseIP(host) != nil {
		return c.socksDialer.Dial("tcp", addr)
	}

	// Otherwise, expect a node name.
	node, ok := c.Nodes[host]
	if !ok {
		return nil, fmt.Errorf("unknown node %q", host)
	}
	addr = net.JoinHostPort(node.ManagementAddress, port)
	return c.socksDialer.Dial("tcp", addr)
}