blob: bfeb8775aa6d9fdb0ca7d6653f7b7fbe53f8ceb3 [file] [log] [blame]
// Package cluster builds on the launch package and implements launching
// Metropolis nodes and clusters in a virtualized environment using qemu. It's
// kept in a separate package as it depends on a Metropolis node image, which
// might not be required for some uses of the launch library.
5package cluster
6
7import (
8 "bytes"
9 "context"
Serge Bazanski1f8cad72023-03-20 16:58:10 +010010 "crypto/ed25519"
Serge Bazanski66e58952021-10-05 17:06:56 +020011 "crypto/rand"
12 "crypto/tls"
13 "errors"
14 "fmt"
15 "io"
Serge Bazanski66e58952021-10-05 17:06:56 +020016 "net"
17 "os"
18 "os/exec"
Leopoldacfad5b2023-01-15 14:05:25 +010019 "path"
Serge Bazanski66e58952021-10-05 17:06:56 +020020 "path/filepath"
21 "syscall"
22 "time"
23
24 "github.com/cenkalti/backoff/v4"
Serge Bazanski66e58952021-10-05 17:06:56 +020025 "go.uber.org/multierr"
Serge Bazanskibe742842022-04-04 13:18:50 +020026 "golang.org/x/net/proxy"
Serge Bazanski66e58952021-10-05 17:06:56 +020027 "google.golang.org/grpc"
Serge Bazanski636032e2022-01-26 14:21:33 +010028 "google.golang.org/grpc/codes"
29 "google.golang.org/grpc/status"
Serge Bazanski66e58952021-10-05 17:06:56 +020030 "google.golang.org/protobuf/proto"
31
Serge Bazanski1f8cad72023-03-20 16:58:10 +010032 metroctl "source.monogon.dev/metropolis/cli/metroctl/core"
Mateusz Zalega0246f5e2022-04-22 17:29:04 +020033 "source.monogon.dev/metropolis/cli/pkg/datafile"
Serge Bazanski66e58952021-10-05 17:06:56 +020034 "source.monogon.dev/metropolis/node"
Serge Bazanskie78a0892021-10-07 17:03:49 +020035 "source.monogon.dev/metropolis/node/core/identity"
Serge Bazanski66e58952021-10-05 17:06:56 +020036 "source.monogon.dev/metropolis/node/core/rpc"
Serge Bazanski5bb8a332022-06-23 17:41:33 +020037 "source.monogon.dev/metropolis/node/core/rpc/resolver"
Serge Bazanski66e58952021-10-05 17:06:56 +020038 apb "source.monogon.dev/metropolis/proto/api"
Serge Bazanskie78a0892021-10-07 17:03:49 +020039 cpb "source.monogon.dev/metropolis/proto/common"
Serge Bazanski66e58952021-10-05 17:06:56 +020040 "source.monogon.dev/metropolis/test/launch"
41)
42
Leopold20a036e2023-01-15 00:17:19 +010043// NodeOptions contains all options that can be passed to Launch()
Serge Bazanski66e58952021-10-05 17:06:56 +020044type NodeOptions struct {
Leopoldaf5086b2023-01-15 14:12:42 +010045 // Name is a human-readable identifier to be used in debug output.
46 Name string
47
Serge Bazanski66e58952021-10-05 17:06:56 +020048 // Ports contains the port mapping where to expose the internal ports of the VM to
49 // the host. See IdentityPortMap() and ConflictFreePortMap(). Ignored when
50 // ConnectToSocket is set.
51 Ports launch.PortMap
52
Leopold20a036e2023-01-15 00:17:19 +010053 // If set to true, reboots are honored. Otherwise, all reboots exit the Launch()
54 // command. Metropolis nodes generally restart on almost all errors, so unless you
Serge Bazanski66e58952021-10-05 17:06:56 +020055 // want to test reboot behavior this should be false.
56 AllowReboot bool
57
Leopold20a036e2023-01-15 00:17:19 +010058 // By default, the VM is connected to the Host via SLIRP. If ConnectToSocket is
59 // set, it is instead connected to the given file descriptor/socket. If this is
60 // set, all port maps from the Ports option are ignored. Intended for networking
61 // this instance together with others for running more complex network
62 // configurations.
Serge Bazanski66e58952021-10-05 17:06:56 +020063 ConnectToSocket *os.File
64
Leopoldacfad5b2023-01-15 14:05:25 +010065 // When PcapDump is set, all traffic is dumped to a pcap file in the
66 // runtime directory (e.g. "net0.pcap" for the first interface).
67 PcapDump bool
68
Leopold20a036e2023-01-15 00:17:19 +010069 // SerialPort is an io.ReadWriter over which you can communicate with the serial
70 // port of the machine. It can be set to an existing file descriptor (like
Serge Bazanski66e58952021-10-05 17:06:56 +020071 // os.Stdout/os.Stderr) or any Go structure implementing this interface.
72 SerialPort io.ReadWriter
73
74 // NodeParameters is passed into the VM and subsequently used for bootstrapping or
75 // registering into a cluster.
76 NodeParameters *apb.NodeParameters
Mateusz Zalega0246f5e2022-04-22 17:29:04 +020077
78 // Mac is the node's MAC address.
79 Mac *net.HardwareAddr
80
81 // Runtime keeps the node's QEMU runtime state.
82 Runtime *NodeRuntime
83}
84
// NodeRuntime holds a node's QEMU runtime state: the directories created for
// it by setupRuntime and the context under which its current QEMU instance
// runs.
type NodeRuntime struct {
	// ld is the node's launch directory. It stores data such as the storage
	// image, the firmware variables and the TPM state.
	ld string
	// sd is the node's socket directory.
	sd string

	// ctxT is the context the node's QEMU process executes in. LaunchNode
	// replaces it on every launch.
	ctxT context.Context
	// CtxC cancels ctxT, which stops the node's current QEMU process.
	CtxC context.CancelFunc
}
98
99// NodePorts is the list of ports a fully operational Metropolis node listens on
Serge Bazanski52304a82021-10-29 16:56:18 +0200100var NodePorts = []node.Port{
Serge Bazanski66e58952021-10-05 17:06:56 +0200101 node.ConsensusPort,
102
103 node.CuratorServicePort,
104 node.DebugServicePort,
105
106 node.KubernetesAPIPort,
Lorenz Bruncc078df2021-12-23 11:51:55 +0100107 node.KubernetesAPIWrappedPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200108 node.CuratorServicePort,
109 node.DebuggerPort,
110}
111
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200112// setupRuntime creates the node's QEMU runtime directory, together with all
113// files required to preserve its state, a level below the chosen path ld. The
114// node's socket directory is similarily created a level below sd. It may
115// return an I/O error.
116func setupRuntime(ld, sd string) (*NodeRuntime, error) {
117 // Create a temporary directory to keep all the runtime files.
118 stdp, err := os.MkdirTemp(ld, "node_state*")
119 if err != nil {
120 return nil, fmt.Errorf("failed to create the state directory: %w", err)
121 }
122
123 // Initialize the node's storage with a prebuilt image.
124 si, err := datafile.ResolveRunfile("metropolis/node/node.img")
125 if err != nil {
126 return nil, fmt.Errorf("while resolving a path: %w", err)
127 }
128 di := filepath.Join(stdp, filepath.Base(si))
Serge Bazanski05f813b2023-03-16 17:58:39 +0100129 launch.Log("Cluster: copying node image: %s -> %s", si, di)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200130 if err := copyFile(si, di); err != nil {
131 return nil, fmt.Errorf("while copying the node image: %w", err)
132 }
133
134 // Initialize the OVMF firmware variables file.
135 sv, err := datafile.ResolveRunfile("external/edk2/OVMF_VARS.fd")
136 if err != nil {
137 return nil, fmt.Errorf("while resolving a path: %w", err)
138 }
139 dv := filepath.Join(stdp, filepath.Base(sv))
140 if err := copyFile(sv, dv); err != nil {
141 return nil, fmt.Errorf("while copying firmware variables: %w", err)
142 }
143
144 // Create the TPM state directory and initialize all files required by swtpm.
145 tpmt := filepath.Join(stdp, "tpm")
146 if err := os.Mkdir(tpmt, 0755); err != nil {
147 return nil, fmt.Errorf("while creating the TPM directory: %w", err)
148 }
149 tpms, err := datafile.ResolveRunfile("metropolis/node/tpm")
150 if err != nil {
151 return nil, fmt.Errorf("while resolving a path: %w", err)
152 }
153 tpmf, err := os.ReadDir(tpms)
154 if err != nil {
155 return nil, fmt.Errorf("failed to read TPM directory: %w", err)
156 }
157 for _, file := range tpmf {
158 name := file.Name()
159 src, err := datafile.ResolveRunfile(filepath.Join(tpms, name))
160 if err != nil {
161 return nil, fmt.Errorf("while resolving a path: %w", err)
162 }
163 tgt := filepath.Join(tpmt, name)
164 if err := copyFile(src, tgt); err != nil {
165 return nil, fmt.Errorf("while copying TPM state: file %q to %q: %w", src, tgt, err)
166 }
167 }
168
169 // Create the socket directory.
170 sotdp, err := os.MkdirTemp(sd, "node_sock*")
171 if err != nil {
172 return nil, fmt.Errorf("failed to create the socket directory: %w", err)
173 }
174
175 return &NodeRuntime{
176 ld: stdp,
177 sd: sotdp,
178 }, nil
179}
180
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200181// CuratorClient returns an authenticated owner connection to a Curator
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200182// instance within Cluster c, or nil together with an error.
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200183func (c *Cluster) CuratorClient() (*grpc.ClientConn, error) {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200184 if c.authClient == nil {
185 authCreds := rpc.NewAuthenticatedCredentials(c.Owner, nil)
Serge Bazanski58ddc092022-06-30 18:23:33 +0200186 r := resolver.New(c.ctxT, resolver.WithLogger(func(f string, args ...interface{}) {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100187 launch.Log("Cluster: client resolver: %s", fmt.Sprintf(f, args...))
Serge Bazanski58ddc092022-06-30 18:23:33 +0200188 }))
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200189 for _, n := range c.NodeIDs {
190 ep, err := resolver.NodeWithDefaultPort(n)
191 if err != nil {
192 return nil, fmt.Errorf("could not add node %q by DNS: %v", n, err)
193 }
194 r.AddEndpoint(ep)
195 }
196 authClient, err := grpc.Dial(resolver.MetropolisControlAddress,
197 grpc.WithTransportCredentials(authCreds),
198 grpc.WithResolvers(r),
199 grpc.WithContextDialer(c.DialNode),
200 )
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200201 if err != nil {
202 return nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
203 }
204 c.authClient = authClient
205 }
206 return c.authClient, nil
207}
208
Serge Bazanski66e58952021-10-05 17:06:56 +0200209// LaunchNode launches a single Metropolis node instance with the given options.
210// The instance runs mostly paravirtualized but with some emulated hardware
211// similar to how a cloud provider might set up its VMs. The disk is fully
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200212// writable, and the changes are kept across reboots and shutdowns. ld and sd
213// point to the launch directory and the socket directory, holding the nodes'
214// state files (storage, tpm state, firmware state), and UNIX socket files
215// (swtpm <-> QEMU interplay) respectively. The directories must exist before
216// LaunchNode is called. LaunchNode will update options.Runtime and options.Mac
217// if either are not initialized.
218func LaunchNode(ctx context.Context, ld, sd string, options *NodeOptions) error {
219 // TODO(mateusz@monogon.tech) try using QEMU's abstract socket namespace instead
220 // of /tmp (requires QEMU version >5.0).
Serge Bazanski66e58952021-10-05 17:06:56 +0200221 // https://github.com/qemu/qemu/commit/776b97d3605ed0fc94443048fdf988c7725e38a9).
222 // swtpm accepts already-open FDs so we can pass in an abstract socket namespace FD
223 // that we open and pass the name of it to QEMU. Not pinning this crashes both
224 // swtpm and qemu because we run into UNIX socket length limitations (for legacy
225 // reasons 108 chars).
Serge Bazanski66e58952021-10-05 17:06:56 +0200226
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200227 // If it's the node's first start, set up its runtime directories.
228 if options.Runtime == nil {
229 r, err := setupRuntime(ld, sd)
230 if err != nil {
231 return fmt.Errorf("while setting up node runtime: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200232 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200233 options.Runtime = r
Serge Bazanski66e58952021-10-05 17:06:56 +0200234 }
235
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200236 // Replace the node's context with a new one.
237 r := options.Runtime
238 if r.CtxC != nil {
239 r.CtxC()
240 }
241 r.ctxT, r.CtxC = context.WithCancel(ctx)
242
Serge Bazanski66e58952021-10-05 17:06:56 +0200243 var qemuNetType string
244 var qemuNetConfig launch.QemuValue
245 if options.ConnectToSocket != nil {
246 qemuNetType = "socket"
247 qemuNetConfig = launch.QemuValue{
248 "id": {"net0"},
249 "fd": {"3"},
250 }
251 } else {
252 qemuNetType = "user"
253 qemuNetConfig = launch.QemuValue{
254 "id": {"net0"},
255 "net": {"10.42.0.0/24"},
256 "dhcpstart": {"10.42.0.10"},
257 "hostfwd": options.Ports.ToQemuForwards(),
258 }
259 }
260
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200261 // Generate the node's MAC address if it isn't already set in NodeOptions.
262 if options.Mac == nil {
263 mac, err := generateRandomEthernetMAC()
264 if err != nil {
265 return err
266 }
267 options.Mac = mac
Serge Bazanski66e58952021-10-05 17:06:56 +0200268 }
269
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200270 tpmSocketPath := filepath.Join(r.sd, "tpm-socket")
271 fwVarPath := filepath.Join(r.ld, "OVMF_VARS.fd")
272 storagePath := filepath.Join(r.ld, "node.img")
Serge Bazanski66e58952021-10-05 17:06:56 +0200273 qemuArgs := []string{"-machine", "q35", "-accel", "kvm", "-nographic", "-nodefaults", "-m", "4096",
274 "-cpu", "host", "-smp", "sockets=1,cpus=1,cores=2,threads=2,maxcpus=4",
275 "-drive", "if=pflash,format=raw,readonly,file=external/edk2/OVMF_CODE.fd",
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200276 "-drive", "if=pflash,format=raw,file=" + fwVarPath,
277 "-drive", "if=virtio,format=raw,cache=unsafe,file=" + storagePath,
Serge Bazanski66e58952021-10-05 17:06:56 +0200278 "-netdev", qemuNetConfig.ToOption(qemuNetType),
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200279 "-device", "virtio-net-pci,netdev=net0,mac=" + options.Mac.String(),
Serge Bazanski66e58952021-10-05 17:06:56 +0200280 "-chardev", "socket,id=chrtpm,path=" + tpmSocketPath,
281 "-tpmdev", "emulator,id=tpm0,chardev=chrtpm",
282 "-device", "tpm-tis,tpmdev=tpm0",
283 "-device", "virtio-rng-pci",
284 "-serial", "stdio"}
285
286 if !options.AllowReboot {
287 qemuArgs = append(qemuArgs, "-no-reboot")
288 }
289
290 if options.NodeParameters != nil {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200291 parametersPath := filepath.Join(r.ld, "parameters.pb")
Serge Bazanski66e58952021-10-05 17:06:56 +0200292 parametersRaw, err := proto.Marshal(options.NodeParameters)
293 if err != nil {
294 return fmt.Errorf("failed to encode node paraeters: %w", err)
295 }
Lorenz Brun764a2de2021-11-22 16:26:36 +0100296 if err := os.WriteFile(parametersPath, parametersRaw, 0644); err != nil {
Serge Bazanski66e58952021-10-05 17:06:56 +0200297 return fmt.Errorf("failed to write node parameters: %w", err)
298 }
299 qemuArgs = append(qemuArgs, "-fw_cfg", "name=dev.monogon.metropolis/parameters.pb,file="+parametersPath)
300 }
301
Leopoldacfad5b2023-01-15 14:05:25 +0100302 if options.PcapDump {
303 var qemuNetDump launch.QemuValue
304 pcapPath := filepath.Join(r.ld, "net0.pcap")
305 if options.PcapDump {
306 qemuNetDump = launch.QemuValue{
307 "id": {"net0"},
308 "netdev": {"net0"},
309 "file": {pcapPath},
310 }
311 }
312 qemuArgs = append(qemuArgs, "-object", qemuNetDump.ToOption("filter-dump"))
313 }
314
Serge Bazanski66e58952021-10-05 17:06:56 +0200315 // Start TPM emulator as a subprocess
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200316 tpmCtx, tpmCancel := context.WithCancel(options.Runtime.ctxT)
Serge Bazanski66e58952021-10-05 17:06:56 +0200317 defer tpmCancel()
318
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200319 tpmd := filepath.Join(r.ld, "tpm")
320 tpmEmuCmd := exec.CommandContext(tpmCtx, "swtpm", "socket", "--tpm2", "--tpmstate", "dir="+tpmd, "--ctrl", "type=unixio,path="+tpmSocketPath)
Serge Bazanski66e58952021-10-05 17:06:56 +0200321 tpmEmuCmd.Stderr = os.Stderr
322 tpmEmuCmd.Stdout = os.Stdout
323
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200324 err := tpmEmuCmd.Start()
Serge Bazanski66e58952021-10-05 17:06:56 +0200325 if err != nil {
326 return fmt.Errorf("failed to start TPM emulator: %w", err)
327 }
328
Mateusz Zalegae90f4a12022-05-25 18:24:01 +0200329 // Wait for the socket to be created by the TPM emulator before launching
330 // QEMU.
331 for {
332 _, err := os.Stat(tpmSocketPath)
333 if err == nil {
334 break
335 }
336 if err != nil && !os.IsNotExist(err) {
337 return fmt.Errorf("while stat-ing TPM socket path: %w", err)
338 }
339 if err := tpmCtx.Err(); err != nil {
340 return fmt.Errorf("while waiting for the TPM socket: %w", err)
341 }
342 time.Sleep(time.Millisecond * 100)
343 }
344
Serge Bazanski66e58952021-10-05 17:06:56 +0200345 // Start the main qemu binary
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200346 systemCmd := exec.CommandContext(options.Runtime.ctxT, "qemu-system-x86_64", qemuArgs...)
Serge Bazanski66e58952021-10-05 17:06:56 +0200347 if options.ConnectToSocket != nil {
348 systemCmd.ExtraFiles = []*os.File{options.ConnectToSocket}
349 }
350
351 var stdErrBuf bytes.Buffer
352 systemCmd.Stderr = &stdErrBuf
353 systemCmd.Stdout = options.SerialPort
354
Leopoldaf5086b2023-01-15 14:12:42 +0100355 launch.PrettyPrintQemuArgs(options.Name, systemCmd.Args)
356
Serge Bazanski66e58952021-10-05 17:06:56 +0200357 err = systemCmd.Run()
358
359 // Stop TPM emulator and wait for it to exit to properly reap the child process
360 tpmCancel()
Serge Bazanski05f813b2023-03-16 17:58:39 +0100361 launch.Log("Node: Waiting for TPM emulator to exit")
Serge Bazanski66e58952021-10-05 17:06:56 +0200362 // Wait returns a SIGKILL error because we just cancelled its context.
363 // We still need to call it to avoid creating zombies.
364 _ = tpmEmuCmd.Wait()
Serge Bazanski05f813b2023-03-16 17:58:39 +0100365 launch.Log("Node: TPM emulator done")
Serge Bazanski66e58952021-10-05 17:06:56 +0200366
367 var exerr *exec.ExitError
368 if err != nil && errors.As(err, &exerr) {
369 status := exerr.ProcessState.Sys().(syscall.WaitStatus)
370 if status.Signaled() && status.Signal() == syscall.SIGKILL {
371 // Process was killed externally (most likely by our context being canceled).
372 // This is a normal exit for us, so return nil
373 return nil
374 }
375 exerr.Stderr = stdErrBuf.Bytes()
376 newErr := launch.QEMUError(*exerr)
377 return &newErr
378 }
379 return err
380}
381
// copyFile copies the contents of the file at src into dst. dst is created if
// missing and truncated otherwise (os.Create semantics). Each error is wrapped
// with the step that failed.
func copyFile(src, dst string) error {
	source, err := os.Open(src)
	if err != nil {
		return fmt.Errorf("when opening source: %w", err)
	}
	defer source.Close()

	target, err := os.Create(dst)
	if err != nil {
		return fmt.Errorf("when creating destination: %w", err)
	}
	// The deferred Close only matters on the error path; on success the
	// explicit Close below reports any flush error to the caller.
	defer target.Close()

	if _, err := io.Copy(target, source); err != nil {
		return fmt.Errorf("when copying file: %w", err)
	}
	return target.Close()
}
401
Serge Bazanskie78a0892021-10-07 17:03:49 +0200402// getNodes wraps around Management.GetNodes to return a list of nodes in a
403// cluster.
404func getNodes(ctx context.Context, mgmt apb.ManagementClient) ([]*apb.Node, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200405 var res []*apb.Node
Serge Bazanski636032e2022-01-26 14:21:33 +0100406 bo := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
Serge Bazanski075465c2021-11-16 15:38:49 +0100407 err := backoff.Retry(func() error {
408 res = nil
409 srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
Serge Bazanskie78a0892021-10-07 17:03:49 +0200410 if err != nil {
Serge Bazanski075465c2021-11-16 15:38:49 +0100411 return fmt.Errorf("GetNodes: %w", err)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200412 }
Serge Bazanski075465c2021-11-16 15:38:49 +0100413 for {
414 node, err := srvN.Recv()
415 if err == io.EOF {
416 break
417 }
418 if err != nil {
419 return fmt.Errorf("GetNodes.Recv: %w", err)
420 }
421 res = append(res, node)
422 }
423 return nil
424 }, bo)
425 if err != nil {
426 return nil, err
Serge Bazanskie78a0892021-10-07 17:03:49 +0200427 }
428 return res, nil
429}
430
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200431// getNode wraps Management.GetNodes. It returns node information matching
432// given node ID.
433func getNode(ctx context.Context, mgmt apb.ManagementClient, id string) (*apb.Node, error) {
434 nodes, err := getNodes(ctx, mgmt)
435 if err != nil {
436 return nil, fmt.Errorf("could not get nodes: %w", err)
437 }
438 for _, n := range nodes {
439 eid := identity.NodeID(n.Pubkey)
440 if eid != id {
441 continue
442 }
443 return n, nil
444 }
445 return nil, fmt.Errorf("no such node.")
446}
447
// generateRandomEthernetMAC returns a random EUI-48 Ethernet MAC address with
// the locally-administered bit set and the multicast (I/G) bit cleared. An
// error is returned only if the system's randomness source fails.
func generateRandomEthernetMAC() (*net.HardwareAddr, error) {
	macBuf := make([]byte, 6)
	if _, err := rand.Read(macBuf); err != nil {
		return nil, fmt.Errorf("failed to read randomness for MAC: %w", err)
	}

	// Set U/L bit and clear I/G bit (locally administered individual MAC)
	// Ref IEEE 802-2014 Section 8.2.2
	macBuf[0] = (macBuf[0] | 2) & 0xfe
	mac := net.HardwareAddr(macBuf)
	return &mac, nil
}
462
Serge Bazanskibe742842022-04-04 13:18:50 +0200463const SOCKSPort uint16 = 1080
Serge Bazanski66e58952021-10-05 17:06:56 +0200464
Serge Bazanskibe742842022-04-04 13:18:50 +0200465// ClusterPorts contains all ports handled by Nanoswitch.
466var ClusterPorts = []uint16{
467 // Forwarded to the first node.
468 uint16(node.CuratorServicePort),
469 uint16(node.DebugServicePort),
470 uint16(node.KubernetesAPIPort),
471 uint16(node.KubernetesAPIWrappedPort),
472
473 // SOCKS proxy to the switch network
474 SOCKSPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200475}
476
// ClusterOptions holds every option accepted by LaunchCluster.
type ClusterOptions struct {
	// NumNodes is how many nodes the cluster is started with.
	NumNodes int

	// NodeLogsToFiles, when true, saves node logs to one file per node instead
	// of printing them to stderr. The paths of those files are still printed.
	//
	// The files live in the launch directory, which is created inside
	// TEST_TMPDIR (or the default temporary directory if that is unset).
	NodeLogsToFiles bool
}
489
490// Cluster is the running Metropolis cluster launched using the LaunchCluster
491// function.
492type Cluster struct {
Serge Bazanski66e58952021-10-05 17:06:56 +0200493 // Owner is the TLS Certificate of the owner of the test cluster. This can be
494 // used to authenticate further clients to the running cluster.
495 Owner tls.Certificate
496 // Ports is the PortMap used to access the first nodes' services (defined in
Serge Bazanskibe742842022-04-04 13:18:50 +0200497 // ClusterPorts) and the SOCKS proxy (at SOCKSPort).
Serge Bazanski66e58952021-10-05 17:06:56 +0200498 Ports launch.PortMap
499
Serge Bazanskibe742842022-04-04 13:18:50 +0200500 // Nodes is a map from Node ID to its runtime information.
501 Nodes map[string]*NodeInCluster
502 // NodeIDs is a list of node IDs that are backing this cluster, in order of
503 // creation.
504 NodeIDs []string
505
Serge Bazanski66e58952021-10-05 17:06:56 +0200506 // nodesDone is a list of channels populated with the return codes from all the
507 // nodes' qemu instances. It's used by Close to ensure all nodes have
Leopold20a036e2023-01-15 00:17:19 +0100508 // successfully been stopped.
Serge Bazanski66e58952021-10-05 17:06:56 +0200509 nodesDone []chan error
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200510 // nodeOpts are the cluster member nodes' mutable launch options, kept here
511 // to facilitate reboots.
512 nodeOpts []NodeOptions
513 // launchDir points at the directory keeping the nodes' state, such as storage
514 // images, firmware variable files, TPM state.
515 launchDir string
516 // socketDir points at the directory keeping UNIX socket files, such as these
517 // used to facilitate communication between QEMU and swtpm. It's different
518 // from launchDir, and anchored nearer the file system root, due to the
519 // socket path length limitation imposed by the kernel.
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100520 socketDir string
521 metroctlDir string
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200522
Serge Bazanskibe742842022-04-04 13:18:50 +0200523 // socksDialer is used by DialNode to establish connections to nodes via the
524 // SOCKS server ran by nanoswitch.
525 socksDialer proxy.Dialer
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200526
527 // authClient is a cached authenticated owner connection to a Curator
528 // instance within the cluster.
529 authClient *grpc.ClientConn
530
531 // ctxT is the context individual node contexts are created from.
532 ctxT context.Context
533 // ctxC is used by Close to cancel the context under which the nodes are
534 // running.
535 ctxC context.CancelFunc
Serge Bazanskibe742842022-04-04 13:18:50 +0200536}
537
// NodeInCluster describes a single node that is part of a Cluster.
type NodeInCluster struct {
	// ID identifies the node; it can be passed to DialNode to reach the
	// node's services.
	ID string
	// ManagementAddress is the node's address on the network run by
	// nanoswitch. It is not reachable from the host directly, only through
	// DialNode or the nanoswitch SOCKS proxy (at Cluster.Ports[SOCKSPort]).
	ManagementAddress string
}
547
548// firstConnection performs the initial owner credential escrow with a newly
549// started nanoswitch-backed cluster over SOCKS. It expects the first node to be
550// running at 10.1.0.2, which is always the case with the current nanoswitch
551// implementation.
552//
Leopold20a036e2023-01-15 00:17:19 +0100553// It returns the newly escrowed credentials as well as the first node's
Serge Bazanskibe742842022-04-04 13:18:50 +0200554// information as NodeInCluster.
555func firstConnection(ctx context.Context, socksDialer proxy.Dialer) (*tls.Certificate, *NodeInCluster, error) {
556 // Dial external service.
557 remote := fmt.Sprintf("10.1.0.2:%s", node.CuratorServicePort.PortString())
558 initCreds, err := rpc.NewEphemeralCredentials(InsecurePrivateKey, nil)
559 if err != nil {
560 return nil, nil, fmt.Errorf("NewEphemeralCredentials: %w", err)
561 }
562 initDialer := func(_ context.Context, addr string) (net.Conn, error) {
563 return socksDialer.Dial("tcp", addr)
564 }
565 initClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(initCreds))
566 if err != nil {
567 return nil, nil, fmt.Errorf("dialing with ephemeral credentials failed: %w", err)
568 }
569 defer initClient.Close()
570
571 // Retrieve owner certificate - this can take a while because the node is still
572 // coming up, so do it in a backoff loop.
Serge Bazanski05f813b2023-03-16 17:58:39 +0100573 launch.Log("Cluster: retrieving owner certificate (this can take a few seconds while the first node boots)...")
Serge Bazanskibe742842022-04-04 13:18:50 +0200574 aaa := apb.NewAAAClient(initClient)
575 var cert *tls.Certificate
576 err = backoff.Retry(func() error {
577 cert, err = rpc.RetrieveOwnerCertificate(ctx, aaa, InsecurePrivateKey)
578 if st, ok := status.FromError(err); ok {
579 if st.Code() == codes.Unavailable {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100580 launch.Log("Cluster: cluster UNAVAILABLE: %v", st.Message())
Serge Bazanskibe742842022-04-04 13:18:50 +0200581 return err
582 }
583 }
584 return backoff.Permanent(err)
585 }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
586 if err != nil {
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200587 return nil, nil, fmt.Errorf("couldn't retrieve owner certificate: %w", err)
Serge Bazanskibe742842022-04-04 13:18:50 +0200588 }
Serge Bazanski05f813b2023-03-16 17:58:39 +0100589 launch.Log("Cluster: retrieved owner certificate.")
Serge Bazanskibe742842022-04-04 13:18:50 +0200590
591 // Now connect authenticated and get the node ID.
592 creds := rpc.NewAuthenticatedCredentials(*cert, nil)
593 authClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(creds))
594 if err != nil {
595 return nil, nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
596 }
597 defer authClient.Close()
598 mgmt := apb.NewManagementClient(authClient)
599
600 var node *NodeInCluster
601 err = backoff.Retry(func() error {
602 nodes, err := getNodes(ctx, mgmt)
603 if err != nil {
604 return fmt.Errorf("retrieving nodes failed: %w", err)
605 }
606 if len(nodes) != 1 {
607 return fmt.Errorf("expected one node, got %d", len(nodes))
608 }
609 n := nodes[0]
610 if n.Status == nil || n.Status.ExternalAddress == "" {
611 return fmt.Errorf("node has no status and/or address")
612 }
613 node = &NodeInCluster{
614 ID: identity.NodeID(n.Pubkey),
615 ManagementAddress: n.Status.ExternalAddress,
616 }
617 return nil
618 }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
619 if err != nil {
620 return nil, nil, err
621 }
622
623 return cert, node, nil
Serge Bazanski66e58952021-10-05 17:06:56 +0200624}
625
// NewSerialFileLogger opens the file at p for use as a node's serial port
// output (NodeOptions.SerialPort). The file is created with mode 0600 if it
// doesn't exist. Writes are appended: reopening an existing log extends it
// instead of overwriting its beginning and leaving stale trailing bytes.
//
// The file is opened write-only. The returned value satisfies io.ReadWriter,
// but reads from it will fail.
func NewSerialFileLogger(p string) (io.ReadWriter, error) {
	f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0600)
	if err != nil {
		// Return a literal nil so callers never see a typed-nil interface.
		return nil, err
	}
	return f, nil
}
633
Serge Bazanski66e58952021-10-05 17:06:56 +0200634// LaunchCluster launches a cluster of Metropolis node VMs together with a
635// Nanoswitch instance to network them all together.
636//
637// The given context will be used to run all qemu instances in the cluster, and
638// canceling the context or calling Close() will terminate them.
639func LaunchCluster(ctx context.Context, opts ClusterOptions) (*Cluster, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200640 if opts.NumNodes <= 0 {
Serge Bazanski66e58952021-10-05 17:06:56 +0200641 return nil, errors.New("refusing to start cluster with zero nodes")
642 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200643
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200644 // Create the launch directory.
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100645 ld, err := os.MkdirTemp(os.Getenv("TEST_TMPDIR"), "cluster-*")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200646 if err != nil {
647 return nil, fmt.Errorf("failed to create the launch directory: %w", err)
648 }
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100649 // Create the metroctl config directory. We keep it in /tmp because in some
650 // scenarios it's end-user visible and we want it short.
651 md, err := os.MkdirTemp("/tmp", "metroctl-*")
652 if err != nil {
653 return nil, fmt.Errorf("failed to create the metroctl directory: %w", err)
654 }
655
656 // Create the socket directory. We keep it in /tmp because of socket path limits.
657 sd, err := os.MkdirTemp("/tmp", "cluster-*")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200658 if err != nil {
659 return nil, fmt.Errorf("failed to create the socket directory: %w", err)
660 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200661
662 // Prepare links between nodes and nanoswitch.
663 var switchPorts []*os.File
664 var vmPorts []*os.File
665 for i := 0; i < opts.NumNodes; i++ {
666 switchPort, vmPort, err := launch.NewSocketPair()
667 if err != nil {
Serge Bazanski66e58952021-10-05 17:06:56 +0200668 return nil, fmt.Errorf("failed to get socketpair: %w", err)
669 }
670 switchPorts = append(switchPorts, switchPort)
671 vmPorts = append(vmPorts, vmPort)
672 }
673
Serge Bazanskie78a0892021-10-07 17:03:49 +0200674 // Make a list of channels that will be populated by all running node qemu
675 // processes.
Serge Bazanski66e58952021-10-05 17:06:56 +0200676 done := make([]chan error, opts.NumNodes)
677 for i, _ := range done {
678 done[i] = make(chan error, 1)
679 }
680
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200681 // Prepare the node options. These will be kept as part of Cluster.
682 // nodeOpts[].Runtime will be initialized by LaunchNode during the first
683 // launch. The runtime information can be later used to restart a node.
684 // The 0th node will be initialized first. The rest will follow after it
685 // had bootstrapped the cluster.
686 nodeOpts := make([]NodeOptions, opts.NumNodes)
687 nodeOpts[0] = NodeOptions{
Leopoldaf5086b2023-01-15 14:12:42 +0100688 Name: "node0",
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200689 ConnectToSocket: vmPorts[0],
690 NodeParameters: &apb.NodeParameters{
691 Cluster: &apb.NodeParameters_ClusterBootstrap_{
692 ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
693 OwnerPublicKey: InsecurePublicKey,
Serge Bazanski66e58952021-10-05 17:06:56 +0200694 },
695 },
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200696 },
697 SerialPort: newPrefixedStdio(0),
Leopoldacfad5b2023-01-15 14:05:25 +0100698 PcapDump: true,
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200699 }
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100700 if opts.NodeLogsToFiles {
701 path := path.Join(ld, "node-1.txt")
702 port, err := NewSerialFileLogger(path)
703 if err != nil {
704 return nil, fmt.Errorf("could not open log file for node 1: %w", err)
705 }
706 launch.Log("Node 1 logs at %s", path)
707 nodeOpts[0].SerialPort = port
708 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200709
710 // Start the first node.
711 ctxT, ctxC := context.WithCancel(ctx)
Serge Bazanski05f813b2023-03-16 17:58:39 +0100712 launch.Log("Cluster: Starting node %d...", 1)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200713 go func() {
714 err := LaunchNode(ctxT, ld, sd, &nodeOpts[0])
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200715 if err != nil {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100716 launch.Log("Node %d finished with an error: %v", 1, err)
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200717 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200718 done[0] <- err
719 }()
720
Serge Bazanskie78a0892021-10-07 17:03:49 +0200721 // Launch nanoswitch.
Serge Bazanski66e58952021-10-05 17:06:56 +0200722 portMap, err := launch.ConflictFreePortMap(ClusterPorts)
723 if err != nil {
724 ctxC()
725 return nil, fmt.Errorf("failed to allocate ephemeral ports: %w", err)
726 }
727
728 go func() {
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100729 var serialPort io.ReadWriter
730 if opts.NodeLogsToFiles {
731 path := path.Join(ld, "nanoswitch.txt")
732 serialPort, err = NewSerialFileLogger(path)
733 if err != nil {
734 launch.Log("Could not open log file for nanoswitch: %v", err)
735 }
736 launch.Log("Nanoswitch logs at %s", path)
737 } else {
738 serialPort = newPrefixedStdio(99)
739 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200740 if err := launch.RunMicroVM(ctxT, &launch.MicroVMOptions{
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100741 Name: "nanoswitch",
Serge Bazanski66e58952021-10-05 17:06:56 +0200742 KernelPath: "metropolis/test/ktest/vmlinux",
Lorenz Brunb6a9d3c2022-01-27 18:56:20 +0100743 InitramfsPath: "metropolis/test/nanoswitch/initramfs.cpio.lz4",
Serge Bazanski66e58952021-10-05 17:06:56 +0200744 ExtraNetworkInterfaces: switchPorts,
745 PortMap: portMap,
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100746 SerialPort: serialPort,
Leopoldacfad5b2023-01-15 14:05:25 +0100747 PcapDump: path.Join(ld, "nanoswitch.pcap"),
Serge Bazanski66e58952021-10-05 17:06:56 +0200748 }); err != nil {
749 if !errors.Is(err, ctxT.Err()) {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100750 launch.Fatal("Failed to launch nanoswitch: %v", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200751 }
752 }
753 }()
754
Serge Bazanskibe742842022-04-04 13:18:50 +0200755 // Build SOCKS dialer.
756 socksRemote := fmt.Sprintf("localhost:%v", portMap[SOCKSPort])
757 socksDialer, err := proxy.SOCKS5("tcp", socksRemote, nil, proxy.Direct)
Serge Bazanski66e58952021-10-05 17:06:56 +0200758 if err != nil {
759 ctxC()
Serge Bazanskibe742842022-04-04 13:18:50 +0200760 return nil, fmt.Errorf("failed to build SOCKS dialer: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200761 }
762
Serge Bazanskibe742842022-04-04 13:18:50 +0200763 // Retrieve owner credentials and first node.
764 cert, firstNode, err := firstConnection(ctxT, socksDialer)
Serge Bazanski66e58952021-10-05 17:06:56 +0200765 if err != nil {
766 ctxC()
767 return nil, err
768 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200769
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100770 // Write credentials to the metroctl directory.
771 if err := metroctl.WriteOwnerKey(md, cert.PrivateKey.(ed25519.PrivateKey)); err != nil {
772 ctxC()
773 return nil, fmt.Errorf("could not write owner key: %w", err)
774 }
775 if err := metroctl.WriteOwnerCertificate(md, cert.Certificate[0]); err != nil {
776 ctxC()
777 return nil, fmt.Errorf("could not write owner certificate: %w", err)
778 }
779
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200780 // Set up a partially initialized cluster instance, to be filled in in the
781 // later steps.
Serge Bazanskibe742842022-04-04 13:18:50 +0200782 cluster := &Cluster{
783 Owner: *cert,
784 Ports: portMap,
785 Nodes: map[string]*NodeInCluster{
786 firstNode.ID: firstNode,
787 },
788 NodeIDs: []string{
789 firstNode.ID,
790 },
791
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100792 nodesDone: done,
793 nodeOpts: nodeOpts,
794 launchDir: ld,
795 socketDir: sd,
796 metroctlDir: md,
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200797
Serge Bazanskibe742842022-04-04 13:18:50 +0200798 socksDialer: socksDialer,
799
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200800 ctxT: ctxT,
Serge Bazanskibe742842022-04-04 13:18:50 +0200801 ctxC: ctxC,
802 }
803
804 // Now start the rest of the nodes and register them into the cluster.
805
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200806 // Get an authenticated owner client within the cluster.
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200807 curC, err := cluster.CuratorClient()
Serge Bazanski66e58952021-10-05 17:06:56 +0200808 if err != nil {
809 ctxC()
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200810 return nil, fmt.Errorf("CuratorClient: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200811 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200812 mgmt := apb.NewManagementClient(curC)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200813
814 // Retrieve register ticket to register further nodes.
Serge Bazanski05f813b2023-03-16 17:58:39 +0100815 launch.Log("Cluster: retrieving register ticket...")
Serge Bazanskie78a0892021-10-07 17:03:49 +0200816 resT, err := mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{})
817 if err != nil {
818 ctxC()
819 return nil, fmt.Errorf("GetRegisterTicket: %w", err)
820 }
821 ticket := resT.Ticket
Serge Bazanski05f813b2023-03-16 17:58:39 +0100822 launch.Log("Cluster: retrieved register ticket (%d bytes).", len(ticket))
Serge Bazanskie78a0892021-10-07 17:03:49 +0200823
824 // Retrieve cluster info (for directory and ca public key) to register further
825 // nodes.
826 resI, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
827 if err != nil {
828 ctxC()
829 return nil, fmt.Errorf("GetClusterInfo: %w", err)
830 }
831
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200832 // Use the retrieved information to configure the rest of the node options.
833 for i := 1; i < opts.NumNodes; i++ {
834 nodeOpts[i] = NodeOptions{
Leopoldaf5086b2023-01-15 14:12:42 +0100835 Name: fmt.Sprintf("node%d", i),
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200836 ConnectToSocket: vmPorts[i],
837 NodeParameters: &apb.NodeParameters{
838 Cluster: &apb.NodeParameters_ClusterRegister_{
839 ClusterRegister: &apb.NodeParameters_ClusterRegister{
840 RegisterTicket: ticket,
841 ClusterDirectory: resI.ClusterDirectory,
842 CaCertificate: resI.CaCertificate,
843 },
844 },
845 },
846 SerialPort: newPrefixedStdio(i),
847 }
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100848 if opts.NodeLogsToFiles {
849 path := path.Join(ld, fmt.Sprintf("node-%d.txt", i+1))
850 port, err := NewSerialFileLogger(path)
851 if err != nil {
852 return nil, fmt.Errorf("could not open log file for node %d: %w", i+1, err)
853 }
854 launch.Log("Node %d logs at %s", i+1, path)
855 nodeOpts[i].SerialPort = port
856 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200857 }
858
859 // Now run the rest of the nodes.
860 //
Serge Bazanskie78a0892021-10-07 17:03:49 +0200861 // TODO(q3k): parallelize this
862 for i := 1; i < opts.NumNodes; i++ {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100863 launch.Log("Cluster: Starting node %d...", i+1)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200864 go func(i int) {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200865 err := LaunchNode(ctxT, ld, sd, &nodeOpts[i])
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200866 if err != nil {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100867 launch.Log("Node %d finished with an error: %v", i, err)
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200868 }
Serge Bazanskie78a0892021-10-07 17:03:49 +0200869 done[i] <- err
870 }(i)
871 var newNode *apb.Node
872
Serge Bazanski05f813b2023-03-16 17:58:39 +0100873 launch.Log("Cluster: waiting for node %d to appear as NEW...", i)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200874 for {
875 nodes, err := getNodes(ctx, mgmt)
876 if err != nil {
877 ctxC()
878 return nil, fmt.Errorf("could not get nodes: %w", err)
879 }
880 for _, n := range nodes {
881 if n.State == cpb.NodeState_NODE_STATE_NEW {
882 newNode = n
883 break
884 }
885 }
886 if newNode != nil {
887 break
888 }
889 time.Sleep(1 * time.Second)
890 }
891 id := identity.NodeID(newNode.Pubkey)
Serge Bazanski05f813b2023-03-16 17:58:39 +0100892 launch.Log("Cluster: node %d is %s", i, id)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200893
Serge Bazanski05f813b2023-03-16 17:58:39 +0100894 launch.Log("Cluster: approving node %d", i)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200895 _, err := mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
896 Pubkey: newNode.Pubkey,
897 })
898 if err != nil {
899 ctxC()
900 return nil, fmt.Errorf("ApproveNode(%s): %w", id, err)
901 }
Serge Bazanski05f813b2023-03-16 17:58:39 +0100902 launch.Log("Cluster: node %d approved, waiting for it to appear as UP and with a network address...", i)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200903 for {
904 nodes, err := getNodes(ctx, mgmt)
905 if err != nil {
906 ctxC()
907 return nil, fmt.Errorf("could not get nodes: %w", err)
908 }
909 found := false
910 for _, n := range nodes {
911 if !bytes.Equal(n.Pubkey, newNode.Pubkey) {
912 continue
913 }
Serge Bazanskibe742842022-04-04 13:18:50 +0200914 if n.Status == nil || n.Status.ExternalAddress == "" {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200915 break
916 }
Serge Bazanskibe742842022-04-04 13:18:50 +0200917 if n.State != cpb.NodeState_NODE_STATE_UP {
918 break
919 }
920 found = true
921 cluster.Nodes[identity.NodeID(n.Pubkey)] = &NodeInCluster{
922 ID: identity.NodeID(n.Pubkey),
923 ManagementAddress: n.Status.ExternalAddress,
924 }
925 cluster.NodeIDs = append(cluster.NodeIDs, identity.NodeID(n.Pubkey))
926 break
Serge Bazanskie78a0892021-10-07 17:03:49 +0200927 }
928 if found {
929 break
930 }
Serge Bazanskibe742842022-04-04 13:18:50 +0200931 time.Sleep(time.Second)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200932 }
Serge Bazanski05f813b2023-03-16 17:58:39 +0100933 launch.Log("Cluster: node %d (%s) UP!", i, id)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200934 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200935
Serge Bazanski05f813b2023-03-16 17:58:39 +0100936 launch.Log("Cluster: all nodes up:")
Serge Bazanskibe742842022-04-04 13:18:50 +0200937 for _, node := range cluster.Nodes {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100938 launch.Log("Cluster: - %s at %s", node.ID, node.ManagementAddress)
Serge Bazanskibe742842022-04-04 13:18:50 +0200939 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200940
Serge Bazanskibe742842022-04-04 13:18:50 +0200941 return cluster, nil
Serge Bazanski66e58952021-10-05 17:06:56 +0200942}
943
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200944// RebootNode reboots the cluster member node matching the given index, and
945// waits for it to rejoin the cluster. It will use the given context ctx to run
946// cluster API requests, whereas the resulting QEMU process will be created
947// using the cluster's context c.ctxT. The nodes are indexed starting at 0.
948func (c *Cluster) RebootNode(ctx context.Context, idx int) error {
949 if idx < 0 || idx >= len(c.NodeIDs) {
950 return fmt.Errorf("index out of bounds.")
951 }
952 id := c.NodeIDs[idx]
953
954 // Get an authenticated owner client within the cluster.
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200955 curC, err := c.CuratorClient()
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200956 if err != nil {
957 return err
958 }
959 mgmt := apb.NewManagementClient(curC)
960
961 // Get the timestamp of the node's last update, as observed by Curator.
962 // It'll be needed to make sure it had rejoined the cluster after the reboot.
963 var is *apb.Node
964 for {
965 r, err := getNode(ctx, mgmt, id)
966 if err != nil {
967 return err
968 }
969
970 // Node status may be absent if it hasn't reported to the cluster yet. Wait
971 // for it to appear before progressing further.
972 if r.Status != nil {
973 is = r
974 break
975 }
976 time.Sleep(time.Second)
977 }
978
979 // Cancel the node's context. This will shut down QEMU.
980 c.nodeOpts[idx].Runtime.CtxC()
Serge Bazanski05f813b2023-03-16 17:58:39 +0100981 launch.Log("Cluster: waiting for node %d (%s) to stop.", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200982 err = <-c.nodesDone[idx]
983 if err != nil {
984 return fmt.Errorf("while restarting node: %w", err)
985 }
986
987 // Start QEMU again.
Serge Bazanski05f813b2023-03-16 17:58:39 +0100988 launch.Log("Cluster: restarting node %d (%s).", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200989 go func(n int) {
990 err := LaunchNode(c.ctxT, c.launchDir, c.socketDir, &c.nodeOpts[n])
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200991 if err != nil {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100992 launch.Log("Node %d finished with an error: %v", n, err)
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200993 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200994 c.nodesDone[n] <- err
995 }(idx)
996
997 // Poll Management.GetNodes until the node's timestamp is updated.
998 for {
999 cs, err := getNode(ctx, mgmt, id)
1000 if err != nil {
Serge Bazanski05f813b2023-03-16 17:58:39 +01001001 launch.Log("Cluster: node get error: %v", err)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001002 return err
1003 }
Serge Bazanski05f813b2023-03-16 17:58:39 +01001004 launch.Log("Cluster: node status: %+v", cs)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001005 if cs.Status == nil {
1006 continue
1007 }
Mateusz Zalega28800ad2022-07-08 14:56:02 +02001008 if cs.Status.Timestamp.AsTime().Sub(is.Status.Timestamp.AsTime()) > 0 {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001009 break
1010 }
1011 time.Sleep(time.Second)
1012 }
Serge Bazanski05f813b2023-03-16 17:58:39 +01001013 launch.Log("Cluster: node %d (%s) has rejoined the cluster.", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001014 return nil
1015}
1016
1017// Close cancels the running clusters' context and waits for all virtualized
Serge Bazanski66e58952021-10-05 17:06:56 +02001018// nodes to stop. It returns an error if stopping the nodes failed, or one of
1019// the nodes failed to fully start in the first place.
1020func (c *Cluster) Close() error {
Serge Bazanski05f813b2023-03-16 17:58:39 +01001021 launch.Log("Cluster: stopping...")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001022 if c.authClient != nil {
1023 c.authClient.Close()
1024 }
Serge Bazanski66e58952021-10-05 17:06:56 +02001025 c.ctxC()
1026
Leopold20a036e2023-01-15 00:17:19 +01001027 var errs []error
Serge Bazanski05f813b2023-03-16 17:58:39 +01001028 launch.Log("Cluster: waiting for nodes to exit...")
Serge Bazanski66e58952021-10-05 17:06:56 +02001029 for _, c := range c.nodesDone {
1030 err := <-c
1031 if err != nil {
Leopold20a036e2023-01-15 00:17:19 +01001032 errs = append(errs, err)
Serge Bazanski66e58952021-10-05 17:06:56 +02001033 }
1034 }
Serge Bazanskid09c58f2023-03-17 00:25:08 +01001035 launch.Log("Cluster: removing nodes' state files (%s) and sockets (%s).", c.launchDir, c.socketDir)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001036 os.RemoveAll(c.launchDir)
1037 os.RemoveAll(c.socketDir)
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001038 os.RemoveAll(c.metroctlDir)
Serge Bazanski05f813b2023-03-16 17:58:39 +01001039 launch.Log("Cluster: done")
Leopold20a036e2023-01-15 00:17:19 +01001040 return multierr.Combine(errs...)
Serge Bazanski66e58952021-10-05 17:06:56 +02001041}
Serge Bazanskibe742842022-04-04 13:18:50 +02001042
1043// DialNode is a grpc.WithContextDialer compatible dialer which dials nodes by
1044// their ID. This is performed by connecting to the cluster nanoswitch via its
1045// SOCKS proxy, and using the cluster node list for name resolution.
1046//
1047// For example:
1048//
Serge Bazanski05f813b2023-03-16 17:58:39 +01001049// grpc.Dial("metropolis-deadbeef:1234", grpc.WithContextDialer(c.DialNode))
Serge Bazanskibe742842022-04-04 13:18:50 +02001050func (c *Cluster) DialNode(_ context.Context, addr string) (net.Conn, error) {
1051 host, port, err := net.SplitHostPort(addr)
1052 if err != nil {
1053 return nil, fmt.Errorf("invalid host:port: %w", err)
1054 }
1055 // Already an IP address?
1056 if net.ParseIP(host) != nil {
1057 return c.socksDialer.Dial("tcp", addr)
1058 }
1059
1060 // Otherwise, expect a node name.
1061 node, ok := c.Nodes[host]
1062 if !ok {
1063 return nil, fmt.Errorf("unknown node %q", host)
1064 }
1065 addr = net.JoinHostPort(node.ManagementAddress, port)
1066 return c.socksDialer.Dial("tcp", addr)
1067}
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001068
1069// KubernetesControllerNodeAddresses returns the list of IP addresses of nodes
1070// which are currently Kubernetes controllers, ie. run an apiserver. This list
1071// might be empty if no node is currently configured with the
1072// 'KubernetesController' node.
1073func (c *Cluster) KubernetesControllerNodeAddresses(ctx context.Context) ([]string, error) {
1074 curC, err := c.CuratorClient()
1075 if err != nil {
1076 return nil, err
1077 }
1078 mgmt := apb.NewManagementClient(curC)
1079 srv, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{
1080 Filter: "has(node.roles.kubernetes_controller)",
1081 })
1082 if err != nil {
1083 return nil, err
1084 }
1085 defer srv.CloseSend()
1086 var res []string
1087 for {
1088 n, err := srv.Recv()
1089 if err == io.EOF {
1090 break
1091 }
1092 if err != nil {
1093 return nil, err
1094 }
1095 if n.Status == nil || n.Status.ExternalAddress == "" {
1096 continue
1097 }
1098 res = append(res, n.Status.ExternalAddress)
1099 }
1100 return res, nil
1101}