// cluster builds on the launch package and implements launching Metropolis
// nodes and clusters in a virtualized environment using qemu. It's kept in a
// separate package as it depends on a Metropolis node image, which might not be
// required for some uses of the launch library.
package cluster

import (
	"bytes"
	"context"
	"crypto/rand"
	"crypto/tls"
	"errors"
	"fmt"
	"io"
	"log"
	"net"
	"os"
	"os/exec"
	"path/filepath"
	"syscall"
	"time"

	"github.com/cenkalti/backoff/v4"
	"go.uber.org/multierr"
	"golang.org/x/net/proxy"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/proto"

	"source.monogon.dev/metropolis/cli/pkg/datafile"
	"source.monogon.dev/metropolis/node"
	"source.monogon.dev/metropolis/node/core/identity"
	"source.monogon.dev/metropolis/node/core/rpc"
	"source.monogon.dev/metropolis/node/core/rpc/resolver"
	apb "source.monogon.dev/metropolis/proto/api"
	cpb "source.monogon.dev/metropolis/proto/common"
	"source.monogon.dev/metropolis/test/launch"
)

// NodeOptions contains all options that can be passed to LaunchNode().
type NodeOptions struct {
	// Name is a human-readable identifier to be used in debug output.
	Name string

	// Ports contains the port mapping for exposing the internal ports of the VM to
	// the host. See IdentityPortMap() and ConflictFreePortMap(). Ignored when
	// ConnectToSocket is set.
	Ports launch.PortMap

	// If set to true, reboots are honored. Otherwise, all reboots exit LaunchNode().
	// Metropolis nodes generally restart on almost all errors, so unless you
	// want to test reboot behavior this should be false.
	AllowReboot bool

	// By default, the VM is connected to the host via SLIRP. If ConnectToSocket is
	// set, it is instead connected to the given file descriptor/socket. If this is
	// set, all port maps from the Ports option are ignored. Intended for networking
	// this instance together with others for running more complex network
	// configurations.
	ConnectToSocket *os.File

	// SerialPort is an io.ReadWriter over which you can communicate with the serial
	// port of the machine. It can be set to an existing file descriptor (like
	// os.Stdout/os.Stderr) or any Go structure implementing this interface.
	SerialPort io.ReadWriter

	// NodeParameters is passed into the VM and subsequently used for bootstrapping or
	// registering into a cluster.
	NodeParameters *apb.NodeParameters

	// Mac is the node's MAC address.
	Mac *net.HardwareAddr

	// Runtime keeps the node's QEMU runtime state.
	Runtime *NodeRuntime
}

// NodeRuntime keeps the node's QEMU runtime options.
type NodeRuntime struct {
	// ld points at the node's launch directory storing data such as storage
	// images, firmware variables or the TPM state.
	ld string
	// sd points at the node's socket directory.
	sd string

	// ctxT is the context QEMU will execute in.
	ctxT context.Context
	// CtxC is the QEMU context's cancellation function.
	CtxC context.CancelFunc
}

// NodePorts is the list of ports a fully operational Metropolis node listens
// on.
var NodePorts = []node.Port{
	node.ConsensusPort,

	node.CuratorServicePort,
	node.DebugServicePort,

	node.KubernetesAPIPort,
	node.KubernetesAPIWrappedPort,
	node.CuratorServicePort,
	node.DebuggerPort,
}

// setupRuntime creates the node's QEMU runtime directory, together with all
// files required to preserve its state, a level below the chosen path ld. The
// node's socket directory is similarly created a level below sd. It may
// return an I/O error.
func setupRuntime(ld, sd string) (*NodeRuntime, error) {
	// Create a temporary directory to keep all the runtime files.
	stdp, err := os.MkdirTemp(ld, "node_state*")
	if err != nil {
		return nil, fmt.Errorf("failed to create the state directory: %w", err)
	}

	// Initialize the node's storage with a prebuilt image.
	si, err := datafile.ResolveRunfile("metropolis/node/node.img")
	if err != nil {
		return nil, fmt.Errorf("while resolving a path: %w", err)
	}
	di := filepath.Join(stdp, filepath.Base(si))
	log.Printf("Cluster: copying the node image: %s -> %s", si, di)
	if err := copyFile(si, di); err != nil {
		return nil, fmt.Errorf("while copying the node image: %w", err)
	}

	// Initialize the OVMF firmware variables file.
	sv, err := datafile.ResolveRunfile("external/edk2/OVMF_VARS.fd")
	if err != nil {
		return nil, fmt.Errorf("while resolving a path: %w", err)
	}
	dv := filepath.Join(stdp, filepath.Base(sv))
	if err := copyFile(sv, dv); err != nil {
		return nil, fmt.Errorf("while copying firmware variables: %w", err)
	}

	// Create the TPM state directory and initialize all files required by swtpm.
	tpmt := filepath.Join(stdp, "tpm")
	if err := os.Mkdir(tpmt, 0755); err != nil {
		return nil, fmt.Errorf("while creating the TPM directory: %w", err)
	}
	tpms, err := datafile.ResolveRunfile("metropolis/node/tpm")
	if err != nil {
		return nil, fmt.Errorf("while resolving a path: %w", err)
	}
	tpmf, err := os.ReadDir(tpms)
	if err != nil {
		return nil, fmt.Errorf("failed to read TPM directory: %w", err)
	}
	for _, file := range tpmf {
		name := file.Name()
		src, err := datafile.ResolveRunfile(filepath.Join(tpms, name))
		if err != nil {
			return nil, fmt.Errorf("while resolving a path: %w", err)
		}
		tgt := filepath.Join(tpmt, name)
		if err := copyFile(src, tgt); err != nil {
			return nil, fmt.Errorf("while copying TPM state: file %q to %q: %w", src, tgt, err)
		}
	}

	// Create the socket directory.
	sotdp, err := os.MkdirTemp(sd, "node_sock*")
	if err != nil {
		return nil, fmt.Errorf("failed to create the socket directory: %w", err)
	}

	return &NodeRuntime{
		ld: stdp,
		sd: sotdp,
	}, nil
}

// CuratorClient returns an authenticated owner connection to a Curator
// instance within Cluster c, or nil together with an error.
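//
// The connection is established once and cached for subsequent calls. A
// typical use (illustrative sketch, error handling elided) is:
//
//	curC, _ := c.CuratorClient()
//	mgmt := apb.NewManagementClient(curC)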
func (c *Cluster) CuratorClient() (*grpc.ClientConn, error) {
	if c.authClient == nil {
		authCreds := rpc.NewAuthenticatedCredentials(c.Owner, nil)
		r := resolver.New(c.ctxT, resolver.WithLogger(func(f string, args ...interface{}) {
			log.Printf("Cluster: client resolver: %s", fmt.Sprintf(f, args...))
		}))
		for _, n := range c.NodeIDs {
			ep, err := resolver.NodeWithDefaultPort(n)
			if err != nil {
				return nil, fmt.Errorf("could not add node %q by DNS: %v", n, err)
			}
			r.AddEndpoint(ep)
		}
		authClient, err := grpc.Dial(resolver.MetropolisControlAddress,
			grpc.WithTransportCredentials(authCreds),
			grpc.WithResolvers(r),
			grpc.WithContextDialer(c.DialNode),
		)
		if err != nil {
			return nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
		}
		c.authClient = authClient
	}
	return c.authClient, nil
}

// LaunchNode launches a single Metropolis node instance with the given options.
// The instance runs mostly paravirtualized but with some emulated hardware
// similar to how a cloud provider might set up its VMs. The disk is fully
// writable, and the changes are kept across reboots and shutdowns. ld and sd
// point to the launch directory and the socket directory, holding the nodes'
// state files (storage, TPM state, firmware state), and UNIX socket files
// (swtpm <-> QEMU interplay) respectively. The directories must exist before
// LaunchNode is called. LaunchNode will update options.Runtime and options.Mac
// if either are not initialized.
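//
// A minimal single-node launch (illustrative sketch; assumes ld and sd were
// created beforehand, e.g. with os.MkdirTemp) could look like:
//
//	opts := NodeOptions{
//		Name:       "node0",
//		SerialPort: os.Stdout,
//	}
//	if err := LaunchNode(ctx, ld, sd, &opts); err != nil {
//		// ...
//	}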
func LaunchNode(ctx context.Context, ld, sd string, options *NodeOptions) error {
	// TODO(mateusz@monogon.tech) try using QEMU's abstract socket namespace instead
	// of /tmp (requires QEMU version >5.0,
	// https://github.com/qemu/qemu/commit/776b97d3605ed0fc94443048fdf988c7725e38a9).
	// swtpm accepts already-open FDs so we can pass in an abstract socket namespace FD
	// that we open and whose name we pass to QEMU. Not pinning this crashes both
	// swtpm and qemu because we run into UNIX socket length limitations (for legacy
	// reasons 108 chars).

	// If it's the node's first start, set up its runtime directories.
	if options.Runtime == nil {
		r, err := setupRuntime(ld, sd)
		if err != nil {
			return fmt.Errorf("while setting up node runtime: %w", err)
		}
		options.Runtime = r
	}

	// Replace the node's context with a new one.
	r := options.Runtime
	if r.CtxC != nil {
		r.CtxC()
	}
	r.ctxT, r.CtxC = context.WithCancel(ctx)

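	// Configure QEMU's network backend: either a socket connected to the
	// provided file descriptor (passed to the child process as FD 3 via
	// ExtraFiles below), or user-mode (SLIRP) networking with the requested
	// host port forwards.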
	var qemuNetType string
	var qemuNetConfig launch.QemuValue
	if options.ConnectToSocket != nil {
		qemuNetType = "socket"
		qemuNetConfig = launch.QemuValue{
			"id": {"net0"},
			"fd": {"3"},
		}
	} else {
		qemuNetType = "user"
		qemuNetConfig = launch.QemuValue{
			"id":        {"net0"},
			"net":       {"10.42.0.0/24"},
			"dhcpstart": {"10.42.0.10"},
			"hostfwd":   options.Ports.ToQemuForwards(),
		}
	}

	// Generate the node's MAC address if it isn't already set in NodeOptions.
	if options.Mac == nil {
		mac, err := generateRandomEthernetMAC()
		if err != nil {
			return err
		}
		options.Mac = mac
	}

	tpmSocketPath := filepath.Join(r.sd, "tpm-socket")
	fwVarPath := filepath.Join(r.ld, "OVMF_VARS.fd")
	storagePath := filepath.Join(r.ld, "node.img")
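	// Assemble the QEMU command line: a q35 machine with KVM acceleration,
	// OVMF firmware, the node image as a virtio disk, a virtio NIC and a
	// swtpm-backed TPM device, with the serial console on stdio.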
	qemuArgs := []string{"-machine", "q35", "-accel", "kvm", "-nographic", "-nodefaults", "-m", "4096",
		"-cpu", "host", "-smp", "sockets=1,cpus=1,cores=2,threads=2,maxcpus=4",
		"-drive", "if=pflash,format=raw,readonly,file=external/edk2/OVMF_CODE.fd",
		"-drive", "if=pflash,format=raw,file=" + fwVarPath,
		"-drive", "if=virtio,format=raw,cache=unsafe,file=" + storagePath,
		"-netdev", qemuNetConfig.ToOption(qemuNetType),
		"-device", "virtio-net-pci,netdev=net0,mac=" + options.Mac.String(),
		"-chardev", "socket,id=chrtpm,path=" + tpmSocketPath,
		"-tpmdev", "emulator,id=tpm0,chardev=chrtpm",
		"-device", "tpm-tis,tpmdev=tpm0",
		"-device", "virtio-rng-pci",
		"-serial", "stdio"}

	if !options.AllowReboot {
		qemuArgs = append(qemuArgs, "-no-reboot")
	}

	if options.NodeParameters != nil {
		parametersPath := filepath.Join(r.ld, "parameters.pb")
		parametersRaw, err := proto.Marshal(options.NodeParameters)
		if err != nil {
			return fmt.Errorf("failed to encode node parameters: %w", err)
		}
		if err := os.WriteFile(parametersPath, parametersRaw, 0644); err != nil {
			return fmt.Errorf("failed to write node parameters: %w", err)
		}
		qemuArgs = append(qemuArgs, "-fw_cfg", "name=dev.monogon.metropolis/parameters.pb,file="+parametersPath)
	}

	// Start TPM emulator as a subprocess.
	tpmCtx, tpmCancel := context.WithCancel(options.Runtime.ctxT)
	defer tpmCancel()

	tpmd := filepath.Join(r.ld, "tpm")
	tpmEmuCmd := exec.CommandContext(tpmCtx, "swtpm", "socket", "--tpm2", "--tpmstate", "dir="+tpmd, "--ctrl", "type=unixio,path="+tpmSocketPath)
	tpmEmuCmd.Stderr = os.Stderr
	tpmEmuCmd.Stdout = os.Stdout

	err := tpmEmuCmd.Start()
	if err != nil {
		return fmt.Errorf("failed to start TPM emulator: %w", err)
	}

	// Wait for the socket to be created by the TPM emulator before launching
	// QEMU.
	for {
		_, err := os.Stat(tpmSocketPath)
		if err == nil {
			break
		}
		if err != nil && !os.IsNotExist(err) {
			return fmt.Errorf("while stat-ing TPM socket path: %w", err)
		}
		if err := tpmCtx.Err(); err != nil {
			return fmt.Errorf("while waiting for the TPM socket: %w", err)
		}
		time.Sleep(time.Millisecond * 100)
	}

	// Start the main QEMU binary.
	systemCmd := exec.CommandContext(options.Runtime.ctxT, "qemu-system-x86_64", qemuArgs...)
	if options.ConnectToSocket != nil {
		systemCmd.ExtraFiles = []*os.File{options.ConnectToSocket}
	}

	var stdErrBuf bytes.Buffer
	systemCmd.Stderr = &stdErrBuf
	systemCmd.Stdout = options.SerialPort

	launch.PrettyPrintQemuArgs(options.Name, systemCmd.Args)

	err = systemCmd.Run()

	// Stop the TPM emulator and wait for it to exit to properly reap the child
	// process.
	tpmCancel()
	log.Print("Node: Waiting for TPM emulator to exit")
	// Wait returns a SIGKILL error because we just cancelled its context.
	// We still need to call it to avoid creating zombies.
	_ = tpmEmuCmd.Wait()
	log.Print("Node: TPM emulator done")

	var exerr *exec.ExitError
	if err != nil && errors.As(err, &exerr) {
		status := exerr.ProcessState.Sys().(syscall.WaitStatus)
		if status.Signaled() && status.Signal() == syscall.SIGKILL {
			// Process was killed externally (most likely by our context being canceled).
			// This is a normal exit for us, so return nil.
			return nil
		}
		exerr.Stderr = stdErrBuf.Bytes()
		newErr := launch.QEMUError(*exerr)
		return &newErr
	}
	return err
}

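// copyFile copies a regular file from src to dst. The error returned by the
// final Close of the destination is propagated so that delayed write errors
// are not lost.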
func copyFile(src, dst string) error {
	in, err := os.Open(src)
	if err != nil {
		return fmt.Errorf("when opening source: %w", err)
	}
	defer in.Close()

	out, err := os.Create(dst)
	if err != nil {
		return fmt.Errorf("when creating destination: %w", err)
	}
	defer out.Close()

	_, err = io.Copy(out, in)
	if err != nil {
		return fmt.Errorf("when copying file: %w", err)
	}
	return out.Close()
}

// getNodes wraps around Management.GetNodes to return a list of nodes in a
// cluster.
func getNodes(ctx context.Context, mgmt apb.ManagementClient) ([]*apb.Node, error) {
	var res []*apb.Node
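	// Retry the whole listing with exponential backoff: the stream is
	// restarted from scratch and the partial result discarded on every
	// attempt.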
	bo := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
	err := backoff.Retry(func() error {
		res = nil
		srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
		if err != nil {
			return fmt.Errorf("GetNodes: %w", err)
		}
		for {
			node, err := srvN.Recv()
			if err == io.EOF {
				break
			}
			if err != nil {
				return fmt.Errorf("GetNodes.Recv: %w", err)
			}
			res = append(res, node)
		}
		return nil
	}, bo)
	if err != nil {
		return nil, err
	}
	return res, nil
}

// getNode wraps Management.GetNodes. It returns node information matching the
// given node ID.
func getNode(ctx context.Context, mgmt apb.ManagementClient, id string) (*apb.Node, error) {
	nodes, err := getNodes(ctx, mgmt)
	if err != nil {
		return nil, fmt.Errorf("could not get nodes: %w", err)
	}
	for _, n := range nodes {
		eid := identity.NodeID(n.Pubkey)
		if eid != id {
			continue
		}
		return n, nil
	}
	return nil, fmt.Errorf("no such node")
}

// generateRandomEthernetMAC generates a random EUI-48 Ethernet MAC address.
func generateRandomEthernetMAC() (*net.HardwareAddr, error) {
	macBuf := make([]byte, 6)
	_, err := rand.Read(macBuf)
	if err != nil {
		return nil, fmt.Errorf("failed to read randomness for MAC: %v", err)
	}

	// Set U/L bit and clear I/G bit (locally administered individual MAC).
	// Ref IEEE 802-2014 Section 8.2.2.
	macBuf[0] = (macBuf[0] | 2) & 0xfe
	mac := net.HardwareAddr(macBuf)
	return &mac, nil
}

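// SOCKSPort is the port on which nanoswitch exposes its SOCKS proxy into the
// switch network.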
const SOCKSPort uint16 = 1080

// ClusterPorts contains all ports handled by Nanoswitch.
var ClusterPorts = []uint16{
	// Forwarded to the first node.
	uint16(node.CuratorServicePort),
	uint16(node.DebugServicePort),
	uint16(node.KubernetesAPIPort),
	uint16(node.KubernetesAPIWrappedPort),

	// SOCKS proxy to the switch network.
	SOCKSPort,
}

// ClusterOptions contains all options for launching a Metropolis cluster.
type ClusterOptions struct {
	// The number of nodes this cluster should be started with.
	NumNodes int
}

// Cluster is the running Metropolis cluster launched using the LaunchCluster
// function.
type Cluster struct {
	// Owner is the TLS Certificate of the owner of the test cluster. This can be
	// used to authenticate further clients to the running cluster.
	Owner tls.Certificate
	// Ports is the PortMap used to access the first node's services (defined in
	// ClusterPorts) and the SOCKS proxy (at SOCKSPort).
	Ports launch.PortMap

	// Nodes is a map from Node ID to its runtime information.
	Nodes map[string]*NodeInCluster
	// NodeIDs is a list of node IDs that are backing this cluster, in order of
	// creation.
	NodeIDs []string

	// nodesDone is a list of channels populated with the return codes from all the
	// nodes' qemu instances. It's used by Close to ensure all nodes have
	// successfully been stopped.
	nodesDone []chan error
	// nodeOpts are the cluster member nodes' mutable launch options, kept here
	// to facilitate reboots.
	nodeOpts []NodeOptions
	// launchDir points at the directory keeping the nodes' state, such as storage
	// images, firmware variable files, TPM state.
	launchDir string
	// socketDir points at the directory keeping UNIX socket files, such as those
	// used to facilitate communication between QEMU and swtpm. It's different
	// from launchDir, and anchored nearer the file system root, due to the
	// socket path length limitation imposed by the kernel.
	socketDir string

	// socksDialer is used by DialNode to establish connections to nodes via the
	// SOCKS server run by nanoswitch.
	socksDialer proxy.Dialer

	// authClient is a cached authenticated owner connection to a Curator
	// instance within the cluster.
	authClient *grpc.ClientConn

	// ctxT is the context individual node contexts are created from.
	ctxT context.Context
	// ctxC is used by Close to cancel the context under which the nodes are
	// running.
	ctxC context.CancelFunc
}

// NodeInCluster represents information about a node that's part of a Cluster.
type NodeInCluster struct {
	// ID of the node, which can be used to dial this node's services via DialNode.
	ID string
	// Address of the node on the network run by nanoswitch. Not reachable from the
	// host unless dialed via DialNode or via the nanoswitch SOCKS proxy (reachable
	// on Cluster.Ports[SOCKSPort]).
	ManagementAddress string
}

// firstConnection performs the initial owner credential escrow with a newly
// started nanoswitch-backed cluster over SOCKS. It expects the first node to be
// running at 10.1.0.2, which is always the case with the current nanoswitch
// implementation.
//
// It returns the newly escrowed credentials as well as the first node's
// information as NodeInCluster.
func firstConnection(ctx context.Context, socksDialer proxy.Dialer) (*tls.Certificate, *NodeInCluster, error) {
	// Dial external service.
	remote := fmt.Sprintf("10.1.0.2:%s", node.CuratorServicePort.PortString())
	initCreds, err := rpc.NewEphemeralCredentials(InsecurePrivateKey, nil)
	if err != nil {
		return nil, nil, fmt.Errorf("NewEphemeralCredentials: %w", err)
	}
	initDialer := func(_ context.Context, addr string) (net.Conn, error) {
		return socksDialer.Dial("tcp", addr)
	}
	initClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(initCreds))
	if err != nil {
		return nil, nil, fmt.Errorf("dialing with ephemeral credentials failed: %w", err)
	}
	defer initClient.Close()

	// Retrieve owner certificate - this can take a while because the node is still
	// coming up, so do it in a backoff loop.
	log.Printf("Cluster: retrieving owner certificate (this can take a few seconds while the first node boots)...")
	aaa := apb.NewAAAClient(initClient)
	var cert *tls.Certificate
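	// Retry until the node's AAA service answers; any error other than
	// UNAVAILABLE is treated as permanent and aborts the loop.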
	err = backoff.Retry(func() error {
		cert, err = rpc.RetrieveOwnerCertificate(ctx, aaa, InsecurePrivateKey)
		if st, ok := status.FromError(err); ok {
			if st.Code() == codes.Unavailable {
				log.Printf("Cluster: cluster UNAVAILABLE: %v", st.Message())
				return err
			}
		}
		return backoff.Permanent(err)
	}, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
	if err != nil {
		return nil, nil, fmt.Errorf("couldn't retrieve owner certificate: %w", err)
	}
	log.Printf("Cluster: retrieved owner certificate.")

	// Now connect authenticated and get the node ID.
	creds := rpc.NewAuthenticatedCredentials(*cert, nil)
	authClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(creds))
	if err != nil {
		return nil, nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
	}
	defer authClient.Close()
	mgmt := apb.NewManagementClient(authClient)

	var node *NodeInCluster
	err = backoff.Retry(func() error {
		nodes, err := getNodes(ctx, mgmt)
		if err != nil {
			return fmt.Errorf("retrieving nodes failed: %w", err)
		}
		if len(nodes) != 1 {
			return fmt.Errorf("expected one node, got %d", len(nodes))
		}
		n := nodes[0]
		if n.Status == nil || n.Status.ExternalAddress == "" {
			return fmt.Errorf("node has no status and/or address")
		}
		node = &NodeInCluster{
			ID:                identity.NodeID(n.Pubkey),
			ManagementAddress: n.Status.ExternalAddress,
		}
		return nil
	}, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
	if err != nil {
		return nil, nil, err
	}

	return cert, node, nil
}

// LaunchCluster launches a cluster of Metropolis node VMs together with a
// Nanoswitch instance to network them all together.
//
// The given context will be used to run all qemu instances in the cluster, and
// canceling the context or calling Close() will terminate them.
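//
// A typical use in tests (illustrative sketch, error handling elided):
//
//	cl, _ := LaunchCluster(ctx, ClusterOptions{NumNodes: 2})
//	defer cl.Close()
//	curC, _ := cl.CuratorClient()
//	mgmt := apb.NewManagementClient(curC)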
func LaunchCluster(ctx context.Context, opts ClusterOptions) (*Cluster, error) {
	if opts.NumNodes <= 0 {
		return nil, errors.New("refusing to start cluster with zero nodes")
	}

	// Create the launch directory.
	ld, err := os.MkdirTemp(os.Getenv("TEST_TMPDIR"), "cluster*")
	if err != nil {
		return nil, fmt.Errorf("failed to create the launch directory: %w", err)
	}
	// Create the socket directory.
	sd, err := os.MkdirTemp("/tmp", "cluster*")
	if err != nil {
		return nil, fmt.Errorf("failed to create the socket directory: %w", err)
	}

	// Prepare links between nodes and nanoswitch.
	var switchPorts []*os.File
	var vmPorts []*os.File
	for i := 0; i < opts.NumNodes; i++ {
		switchPort, vmPort, err := launch.NewSocketPair()
		if err != nil {
			return nil, fmt.Errorf("failed to get socketpair: %w", err)
		}
		switchPorts = append(switchPorts, switchPort)
		vmPorts = append(vmPorts, vmPort)
	}

	// Make a list of channels that will be populated by all running node qemu
	// processes.
	done := make([]chan error, opts.NumNodes)
	for i := range done {
		done[i] = make(chan error, 1)
	}

	// Prepare the node options. These will be kept as part of Cluster.
	// nodeOpts[].Runtime will be initialized by LaunchNode during the first
	// launch. The runtime information can be later used to restart a node.
	// The 0th node will be initialized first. The rest will follow after it
	// has bootstrapped the cluster.
	nodeOpts := make([]NodeOptions, opts.NumNodes)
	nodeOpts[0] = NodeOptions{
		Name:            "node0",
		ConnectToSocket: vmPorts[0],
		NodeParameters: &apb.NodeParameters{
			Cluster: &apb.NodeParameters_ClusterBootstrap_{
				ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
					OwnerPublicKey: InsecurePublicKey,
				},
			},
		},
		SerialPort: newPrefixedStdio(0),
	}

	// Start the first node.
	ctxT, ctxC := context.WithCancel(ctx)
	log.Printf("Cluster: Starting node %d...", 0)
	go func() {
		err := LaunchNode(ctxT, ld, sd, &nodeOpts[0])
		if err != nil {
			log.Printf("Node %d finished with an error: %v", 0, err)
		}
		done[0] <- err
	}()

	// Launch nanoswitch.
	portMap, err := launch.ConflictFreePortMap(ClusterPorts)
	if err != nil {
		ctxC()
		return nil, fmt.Errorf("failed to allocate ephemeral ports: %w", err)
	}

	go func() {
		if err := launch.RunMicroVM(ctxT, &launch.MicroVMOptions{
			Name:                   "nanoswitch [99]",
			KernelPath:             "metropolis/test/ktest/vmlinux",
			InitramfsPath:          "metropolis/test/nanoswitch/initramfs.cpio.lz4",
			ExtraNetworkInterfaces: switchPorts,
			PortMap:                portMap,
			SerialPort:             newPrefixedStdio(99),
		}); err != nil {
			if !errors.Is(err, ctxT.Err()) {
				log.Fatalf("Failed to launch nanoswitch: %v", err)
			}
		}
	}()

	// Build SOCKS dialer.
	socksRemote := fmt.Sprintf("localhost:%v", portMap[SOCKSPort])
	socksDialer, err := proxy.SOCKS5("tcp", socksRemote, nil, proxy.Direct)
	if err != nil {
		ctxC()
		return nil, fmt.Errorf("failed to build SOCKS dialer: %w", err)
	}

	// Retrieve owner credentials and first node.
	cert, firstNode, err := firstConnection(ctxT, socksDialer)
	if err != nil {
		ctxC()
		return nil, err
	}

	// Set up a partially initialized cluster instance, to be filled in during
	// the later steps.
	cluster := &Cluster{
		Owner: *cert,
		Ports: portMap,
		Nodes: map[string]*NodeInCluster{
			firstNode.ID: firstNode,
		},
		NodeIDs: []string{
			firstNode.ID,
		},

		nodesDone: done,
		nodeOpts:  nodeOpts,
		launchDir: ld,
		socketDir: sd,

		socksDialer: socksDialer,

		ctxT: ctxT,
		ctxC: ctxC,
	}

	// Now start the rest of the nodes and register them into the cluster.

	// Get an authenticated owner client within the cluster.
	curC, err := cluster.CuratorClient()
	if err != nil {
		ctxC()
		return nil, fmt.Errorf("CuratorClient: %w", err)
	}
	mgmt := apb.NewManagementClient(curC)

	// Retrieve register ticket to register further nodes.
	log.Printf("Cluster: retrieving register ticket...")
	resT, err := mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{})
	if err != nil {
		ctxC()
		return nil, fmt.Errorf("GetRegisterTicket: %w", err)
	}
	ticket := resT.Ticket
	log.Printf("Cluster: retrieved register ticket (%d bytes).", len(ticket))

	// Retrieve cluster info (for directory and CA public key) to register further
	// nodes.
	resI, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
	if err != nil {
		ctxC()
		return nil, fmt.Errorf("GetClusterInfo: %w", err)
	}

	// Use the retrieved information to configure the rest of the node options.
	for i := 1; i < opts.NumNodes; i++ {
		nodeOpts[i] = NodeOptions{
			Name:            fmt.Sprintf("node%d", i),
			ConnectToSocket: vmPorts[i],
			NodeParameters: &apb.NodeParameters{
				Cluster: &apb.NodeParameters_ClusterRegister_{
					ClusterRegister: &apb.NodeParameters_ClusterRegister{
						RegisterTicket:   ticket,
						ClusterDirectory: resI.ClusterDirectory,
						CaCertificate:    resI.CaCertificate,
					},
				},
			},
			SerialPort: newPrefixedStdio(i),
		}
	}

	// Now run the rest of the nodes.
	//
	// TODO(q3k): parallelize this
	for i := 1; i < opts.NumNodes; i++ {
		log.Printf("Cluster: Starting node %d...", i)
		go func(i int) {
			err := LaunchNode(ctxT, ld, sd, &nodeOpts[i])
			if err != nil {
				log.Printf("Node %d finished with an error: %v", i, err)
			}
			done[i] <- err
		}(i)
		var newNode *apb.Node

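		// A freshly registered node first appears in the cluster as NEW. It
		// must then be approved, after which it transitions to UP once it
		// reports a network address.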
		log.Printf("Cluster: waiting for node %d to appear as NEW...", i)
		for {
			nodes, err := getNodes(ctx, mgmt)
			if err != nil {
				ctxC()
				return nil, fmt.Errorf("could not get nodes: %w", err)
			}
			for _, n := range nodes {
				if n.State == cpb.NodeState_NODE_STATE_NEW {
					newNode = n
					break
				}
			}
			if newNode != nil {
				break
			}
			time.Sleep(1 * time.Second)
		}
		id := identity.NodeID(newNode.Pubkey)
		log.Printf("Cluster: node %d is %s", i, id)

		log.Printf("Cluster: approving node %d", i)
		_, err := mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
			Pubkey: newNode.Pubkey,
		})
		if err != nil {
			ctxC()
			return nil, fmt.Errorf("ApproveNode(%s): %w", id, err)
		}
		log.Printf("Cluster: node %d approved, waiting for it to appear as UP and with a network address...", i)
		for {
			nodes, err := getNodes(ctx, mgmt)
			if err != nil {
				ctxC()
				return nil, fmt.Errorf("could not get nodes: %w", err)
			}
			found := false
			for _, n := range nodes {
				if !bytes.Equal(n.Pubkey, newNode.Pubkey) {
					continue
				}
				if n.Status == nil || n.Status.ExternalAddress == "" {
					break
				}
				if n.State != cpb.NodeState_NODE_STATE_UP {
					break
				}
				found = true
				cluster.Nodes[identity.NodeID(n.Pubkey)] = &NodeInCluster{
					ID:                identity.NodeID(n.Pubkey),
					ManagementAddress: n.Status.ExternalAddress,
				}
				cluster.NodeIDs = append(cluster.NodeIDs, identity.NodeID(n.Pubkey))
				break
			}
			if found {
				break
			}
			time.Sleep(time.Second)
		}
		log.Printf("Cluster: node %d (%s) UP!", i, id)
	}

	log.Printf("Cluster: all nodes up:")
	for _, node := range cluster.Nodes {
		log.Printf("Cluster: - %s at %s", node.ID, node.ManagementAddress)
	}

	return cluster, nil
}

// RebootNode reboots the cluster member node matching the given index, and
// waits for it to rejoin the cluster. It will use the given context ctx to run
// cluster API requests, whereas the resulting QEMU process will be created
// using the cluster's context c.ctxT. The nodes are indexed starting at 0.
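//
// For example, rebooting the second node of a running cluster (sketch):
//
//	if err := cl.RebootNode(ctx, 1); err != nil {
//		// ...
//	}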
func (c *Cluster) RebootNode(ctx context.Context, idx int) error {
	if idx < 0 || idx >= len(c.NodeIDs) {
		return fmt.Errorf("index out of bounds")
	}
	id := c.NodeIDs[idx]

	// Get an authenticated owner client within the cluster.
	curC, err := c.CuratorClient()
	if err != nil {
		return err
	}
	mgmt := apb.NewManagementClient(curC)

	// Get the timestamp of the node's last update, as observed by the Curator.
	// It'll be needed to make sure it has rejoined the cluster after the reboot.
	var is *apb.Node
	for {
		r, err := getNode(ctx, mgmt, id)
		if err != nil {
			return err
		}

		// Node status may be absent if it hasn't reported to the cluster yet. Wait
		// for it to appear before progressing further.
		if r.Status != nil {
			is = r
			break
		}
		time.Sleep(time.Second)
	}

	// Cancel the node's context. This will shut down QEMU.
	c.nodeOpts[idx].Runtime.CtxC()
	log.Printf("Cluster: waiting for node %d (%s) to stop.", idx, id)
	err = <-c.nodesDone[idx]
	if err != nil {
		return fmt.Errorf("while restarting node: %w", err)
	}

	// Start QEMU again.
	log.Printf("Cluster: restarting node %d (%s).", idx, id)
	go func(n int) {
		err := LaunchNode(c.ctxT, c.launchDir, c.socketDir, &c.nodeOpts[n])
		if err != nil {
			log.Printf("Node %d finished with an error: %v", n, err)
		}
		c.nodesDone[n] <- err
	}(idx)

	// Poll Management.GetNodes until the node's timestamp is updated.
	for {
		cs, err := getNode(ctx, mgmt, id)
		if err != nil {
			log.Printf("Cluster: node get error: %v", err)
			return err
		}
		log.Printf("Cluster: node status: %+v", cs)
		// The node has rejoined once it reports a status with a timestamp newer
		// than the one recorded before the reboot.
		if cs.Status != nil && cs.Status.Timestamp.AsTime().After(is.Status.Timestamp.AsTime()) {
			break
		}
		time.Sleep(time.Second)
	}
	log.Printf("Cluster: node %d (%s) has rejoined the cluster.", idx, id)
	return nil
}

// Close cancels the running cluster's context and waits for all virtualized
// nodes to stop. It returns an error if stopping the nodes failed, or one of
// the nodes failed to fully start in the first place.
func (c *Cluster) Close() error {
	log.Printf("Cluster: stopping...")
	if c.authClient != nil {
		c.authClient.Close()
	}
	c.ctxC()

	var errs []error
	log.Printf("Cluster: waiting for nodes to exit...")
	for _, c := range c.nodesDone {
		err := <-c
		if err != nil {
			errs = append(errs, err)
		}
	}
	log.Printf("Cluster: removing nodes' state files.")
	os.RemoveAll(c.launchDir)
	os.RemoveAll(c.socketDir)
	log.Printf("Cluster: done")
	return multierr.Combine(errs...)
}

// DialNode is a grpc.WithContextDialer compatible dialer which dials nodes by
// their ID. This is performed by connecting to the cluster nanoswitch via its
// SOCKS proxy, and using the cluster node list for name resolution.
//
// For example:
//
//	grpc.Dial("metropolis-deadbeef:1234", grpc.WithContextDialer(c.DialNode))
func (c *Cluster) DialNode(_ context.Context, addr string) (net.Conn, error) {
	host, port, err := net.SplitHostPort(addr)
	if err != nil {
		return nil, fmt.Errorf("invalid host:port: %w", err)
	}
	// Already an IP address?
	if net.ParseIP(host) != nil {
		return c.socksDialer.Dial("tcp", addr)
	}

	// Otherwise, expect a node name.
	node, ok := c.Nodes[host]
	if !ok {
		return nil, fmt.Errorf("unknown node %q", host)
	}
	addr = net.JoinHostPort(node.ManagementAddress, port)
	return c.socksDialer.Dial("tcp", addr)
}