blob: 615a9ccdb5ceed9aa83b17ce58c80b3c4bc8e256 [file] [log] [blame]
// Package cluster builds on the launch package and implements launching
// Metropolis nodes and clusters in a virtualized environment using QEMU. It is
// kept in a separate package because it depends on a Metropolis node image,
// which might not be required for some uses of the launch library.
5package cluster
6
7import (
8 "bytes"
9 "context"
Serge Bazanski1f8cad72023-03-20 16:58:10 +010010 "crypto/ed25519"
Serge Bazanski66e58952021-10-05 17:06:56 +020011 "crypto/rand"
12 "crypto/tls"
Serge Bazanski54e212a2023-06-14 13:45:11 +020013 "crypto/x509"
Serge Bazanski66e58952021-10-05 17:06:56 +020014 "errors"
15 "fmt"
16 "io"
Serge Bazanski66e58952021-10-05 17:06:56 +020017 "net"
18 "os"
19 "os/exec"
Leopoldacfad5b2023-01-15 14:05:25 +010020 "path"
Serge Bazanski66e58952021-10-05 17:06:56 +020021 "path/filepath"
Serge Bazanski630fb5c2023-04-06 10:50:24 +020022 "strings"
Serge Bazanski66e58952021-10-05 17:06:56 +020023 "syscall"
24 "time"
25
26 "github.com/cenkalti/backoff/v4"
Serge Bazanski66e58952021-10-05 17:06:56 +020027 "go.uber.org/multierr"
Serge Bazanskibe742842022-04-04 13:18:50 +020028 "golang.org/x/net/proxy"
Serge Bazanski66e58952021-10-05 17:06:56 +020029 "google.golang.org/grpc"
Serge Bazanski636032e2022-01-26 14:21:33 +010030 "google.golang.org/grpc/codes"
31 "google.golang.org/grpc/status"
Serge Bazanski66e58952021-10-05 17:06:56 +020032 "google.golang.org/protobuf/proto"
33
Serge Bazanski1f8cad72023-03-20 16:58:10 +010034 metroctl "source.monogon.dev/metropolis/cli/metroctl/core"
Mateusz Zalega0246f5e2022-04-22 17:29:04 +020035 "source.monogon.dev/metropolis/cli/pkg/datafile"
Serge Bazanski66e58952021-10-05 17:06:56 +020036 "source.monogon.dev/metropolis/node"
Serge Bazanskie78a0892021-10-07 17:03:49 +020037 "source.monogon.dev/metropolis/node/core/identity"
Serge Bazanski66e58952021-10-05 17:06:56 +020038 "source.monogon.dev/metropolis/node/core/rpc"
Serge Bazanski5bb8a332022-06-23 17:41:33 +020039 "source.monogon.dev/metropolis/node/core/rpc/resolver"
Serge Bazanski66e58952021-10-05 17:06:56 +020040 apb "source.monogon.dev/metropolis/proto/api"
Serge Bazanskie78a0892021-10-07 17:03:49 +020041 cpb "source.monogon.dev/metropolis/proto/common"
Serge Bazanski66e58952021-10-05 17:06:56 +020042 "source.monogon.dev/metropolis/test/launch"
43)
44
Leopold20a036e2023-01-15 00:17:19 +010045// NodeOptions contains all options that can be passed to Launch()
Serge Bazanski66e58952021-10-05 17:06:56 +020046type NodeOptions struct {
Leopoldaf5086b2023-01-15 14:12:42 +010047 // Name is a human-readable identifier to be used in debug output.
48 Name string
49
Serge Bazanski66e58952021-10-05 17:06:56 +020050 // Ports contains the port mapping where to expose the internal ports of the VM to
51 // the host. See IdentityPortMap() and ConflictFreePortMap(). Ignored when
52 // ConnectToSocket is set.
53 Ports launch.PortMap
54
Leopold20a036e2023-01-15 00:17:19 +010055 // If set to true, reboots are honored. Otherwise, all reboots exit the Launch()
56 // command. Metropolis nodes generally restart on almost all errors, so unless you
Serge Bazanski66e58952021-10-05 17:06:56 +020057 // want to test reboot behavior this should be false.
58 AllowReboot bool
59
Leopold20a036e2023-01-15 00:17:19 +010060 // By default, the VM is connected to the Host via SLIRP. If ConnectToSocket is
61 // set, it is instead connected to the given file descriptor/socket. If this is
62 // set, all port maps from the Ports option are ignored. Intended for networking
63 // this instance together with others for running more complex network
64 // configurations.
Serge Bazanski66e58952021-10-05 17:06:56 +020065 ConnectToSocket *os.File
66
Leopoldacfad5b2023-01-15 14:05:25 +010067 // When PcapDump is set, all traffic is dumped to a pcap file in the
68 // runtime directory (e.g. "net0.pcap" for the first interface).
69 PcapDump bool
70
Leopold20a036e2023-01-15 00:17:19 +010071 // SerialPort is an io.ReadWriter over which you can communicate with the serial
72 // port of the machine. It can be set to an existing file descriptor (like
Serge Bazanski66e58952021-10-05 17:06:56 +020073 // os.Stdout/os.Stderr) or any Go structure implementing this interface.
74 SerialPort io.ReadWriter
75
76 // NodeParameters is passed into the VM and subsequently used for bootstrapping or
77 // registering into a cluster.
78 NodeParameters *apb.NodeParameters
Mateusz Zalega0246f5e2022-04-22 17:29:04 +020079
80 // Mac is the node's MAC address.
81 Mac *net.HardwareAddr
82
83 // Runtime keeps the node's QEMU runtime state.
84 Runtime *NodeRuntime
85}
86
// NodeRuntime holds the per-node QEMU runtime state that outlives a single
// LaunchNode call: where the node's on-disk state and UNIX sockets live, and
// the context that its currently running QEMU instance is bound to.
type NodeRuntime struct {
	// ld is the node's launch directory. It holds data such as the storage
	// image, the firmware variables file and the TPM state.
	ld string
	// sd is the node's socket directory (e.g. the swtpm control socket).
	sd string

	// ctxT is the context under which QEMU (and swtpm) execute.
	ctxT context.Context
	// CtxC cancels ctxT, which terminates the node's running QEMU instance.
	CtxC context.CancelFunc
}
100
101// NodePorts is the list of ports a fully operational Metropolis node listens on
Serge Bazanski52304a82021-10-29 16:56:18 +0200102var NodePorts = []node.Port{
Serge Bazanski66e58952021-10-05 17:06:56 +0200103 node.ConsensusPort,
104
105 node.CuratorServicePort,
106 node.DebugServicePort,
107
108 node.KubernetesAPIPort,
Lorenz Bruncc078df2021-12-23 11:51:55 +0100109 node.KubernetesAPIWrappedPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200110 node.CuratorServicePort,
111 node.DebuggerPort,
112}
113
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200114// setupRuntime creates the node's QEMU runtime directory, together with all
115// files required to preserve its state, a level below the chosen path ld. The
116// node's socket directory is similarily created a level below sd. It may
117// return an I/O error.
118func setupRuntime(ld, sd string) (*NodeRuntime, error) {
119 // Create a temporary directory to keep all the runtime files.
120 stdp, err := os.MkdirTemp(ld, "node_state*")
121 if err != nil {
122 return nil, fmt.Errorf("failed to create the state directory: %w", err)
123 }
124
125 // Initialize the node's storage with a prebuilt image.
126 si, err := datafile.ResolveRunfile("metropolis/node/node.img")
127 if err != nil {
128 return nil, fmt.Errorf("while resolving a path: %w", err)
129 }
130 di := filepath.Join(stdp, filepath.Base(si))
Serge Bazanski05f813b2023-03-16 17:58:39 +0100131 launch.Log("Cluster: copying node image: %s -> %s", si, di)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200132 if err := copyFile(si, di); err != nil {
133 return nil, fmt.Errorf("while copying the node image: %w", err)
134 }
135
136 // Initialize the OVMF firmware variables file.
137 sv, err := datafile.ResolveRunfile("external/edk2/OVMF_VARS.fd")
138 if err != nil {
139 return nil, fmt.Errorf("while resolving a path: %w", err)
140 }
141 dv := filepath.Join(stdp, filepath.Base(sv))
142 if err := copyFile(sv, dv); err != nil {
143 return nil, fmt.Errorf("while copying firmware variables: %w", err)
144 }
145
146 // Create the TPM state directory and initialize all files required by swtpm.
147 tpmt := filepath.Join(stdp, "tpm")
148 if err := os.Mkdir(tpmt, 0755); err != nil {
149 return nil, fmt.Errorf("while creating the TPM directory: %w", err)
150 }
151 tpms, err := datafile.ResolveRunfile("metropolis/node/tpm")
152 if err != nil {
153 return nil, fmt.Errorf("while resolving a path: %w", err)
154 }
155 tpmf, err := os.ReadDir(tpms)
156 if err != nil {
157 return nil, fmt.Errorf("failed to read TPM directory: %w", err)
158 }
159 for _, file := range tpmf {
160 name := file.Name()
161 src, err := datafile.ResolveRunfile(filepath.Join(tpms, name))
162 if err != nil {
163 return nil, fmt.Errorf("while resolving a path: %w", err)
164 }
165 tgt := filepath.Join(tpmt, name)
166 if err := copyFile(src, tgt); err != nil {
167 return nil, fmt.Errorf("while copying TPM state: file %q to %q: %w", src, tgt, err)
168 }
169 }
170
171 // Create the socket directory.
172 sotdp, err := os.MkdirTemp(sd, "node_sock*")
173 if err != nil {
174 return nil, fmt.Errorf("failed to create the socket directory: %w", err)
175 }
176
177 return &NodeRuntime{
178 ld: stdp,
179 sd: sotdp,
180 }, nil
181}
182
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200183// CuratorClient returns an authenticated owner connection to a Curator
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200184// instance within Cluster c, or nil together with an error.
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200185func (c *Cluster) CuratorClient() (*grpc.ClientConn, error) {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200186 if c.authClient == nil {
Serge Bazanski8535cb52023-03-29 14:15:08 +0200187 authCreds := rpc.NewAuthenticatedCredentials(c.Owner, rpc.WantInsecure())
Serge Bazanski58ddc092022-06-30 18:23:33 +0200188 r := resolver.New(c.ctxT, resolver.WithLogger(func(f string, args ...interface{}) {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100189 launch.Log("Cluster: client resolver: %s", fmt.Sprintf(f, args...))
Serge Bazanski58ddc092022-06-30 18:23:33 +0200190 }))
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200191 for _, n := range c.NodeIDs {
192 ep, err := resolver.NodeWithDefaultPort(n)
193 if err != nil {
194 return nil, fmt.Errorf("could not add node %q by DNS: %v", n, err)
195 }
196 r.AddEndpoint(ep)
197 }
198 authClient, err := grpc.Dial(resolver.MetropolisControlAddress,
199 grpc.WithTransportCredentials(authCreds),
200 grpc.WithResolvers(r),
201 grpc.WithContextDialer(c.DialNode),
202 )
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200203 if err != nil {
204 return nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
205 }
206 c.authClient = authClient
207 }
208 return c.authClient, nil
209}
210
Serge Bazanski66e58952021-10-05 17:06:56 +0200211// LaunchNode launches a single Metropolis node instance with the given options.
212// The instance runs mostly paravirtualized but with some emulated hardware
213// similar to how a cloud provider might set up its VMs. The disk is fully
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200214// writable, and the changes are kept across reboots and shutdowns. ld and sd
215// point to the launch directory and the socket directory, holding the nodes'
216// state files (storage, tpm state, firmware state), and UNIX socket files
217// (swtpm <-> QEMU interplay) respectively. The directories must exist before
218// LaunchNode is called. LaunchNode will update options.Runtime and options.Mac
219// if either are not initialized.
220func LaunchNode(ctx context.Context, ld, sd string, options *NodeOptions) error {
221 // TODO(mateusz@monogon.tech) try using QEMU's abstract socket namespace instead
222 // of /tmp (requires QEMU version >5.0).
Serge Bazanski66e58952021-10-05 17:06:56 +0200223 // https://github.com/qemu/qemu/commit/776b97d3605ed0fc94443048fdf988c7725e38a9).
224 // swtpm accepts already-open FDs so we can pass in an abstract socket namespace FD
225 // that we open and pass the name of it to QEMU. Not pinning this crashes both
226 // swtpm and qemu because we run into UNIX socket length limitations (for legacy
227 // reasons 108 chars).
Serge Bazanski66e58952021-10-05 17:06:56 +0200228
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200229 // If it's the node's first start, set up its runtime directories.
230 if options.Runtime == nil {
231 r, err := setupRuntime(ld, sd)
232 if err != nil {
233 return fmt.Errorf("while setting up node runtime: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200234 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200235 options.Runtime = r
Serge Bazanski66e58952021-10-05 17:06:56 +0200236 }
237
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200238 // Replace the node's context with a new one.
239 r := options.Runtime
240 if r.CtxC != nil {
241 r.CtxC()
242 }
243 r.ctxT, r.CtxC = context.WithCancel(ctx)
244
Serge Bazanski66e58952021-10-05 17:06:56 +0200245 var qemuNetType string
246 var qemuNetConfig launch.QemuValue
247 if options.ConnectToSocket != nil {
248 qemuNetType = "socket"
249 qemuNetConfig = launch.QemuValue{
250 "id": {"net0"},
251 "fd": {"3"},
252 }
253 } else {
254 qemuNetType = "user"
255 qemuNetConfig = launch.QemuValue{
256 "id": {"net0"},
257 "net": {"10.42.0.0/24"},
258 "dhcpstart": {"10.42.0.10"},
259 "hostfwd": options.Ports.ToQemuForwards(),
260 }
261 }
262
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200263 // Generate the node's MAC address if it isn't already set in NodeOptions.
264 if options.Mac == nil {
265 mac, err := generateRandomEthernetMAC()
266 if err != nil {
267 return err
268 }
269 options.Mac = mac
Serge Bazanski66e58952021-10-05 17:06:56 +0200270 }
271
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200272 tpmSocketPath := filepath.Join(r.sd, "tpm-socket")
273 fwVarPath := filepath.Join(r.ld, "OVMF_VARS.fd")
274 storagePath := filepath.Join(r.ld, "node.img")
Serge Bazanski66e58952021-10-05 17:06:56 +0200275 qemuArgs := []string{"-machine", "q35", "-accel", "kvm", "-nographic", "-nodefaults", "-m", "4096",
276 "-cpu", "host", "-smp", "sockets=1,cpus=1,cores=2,threads=2,maxcpus=4",
277 "-drive", "if=pflash,format=raw,readonly,file=external/edk2/OVMF_CODE.fd",
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200278 "-drive", "if=pflash,format=raw,file=" + fwVarPath,
279 "-drive", "if=virtio,format=raw,cache=unsafe,file=" + storagePath,
Serge Bazanski66e58952021-10-05 17:06:56 +0200280 "-netdev", qemuNetConfig.ToOption(qemuNetType),
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200281 "-device", "virtio-net-pci,netdev=net0,mac=" + options.Mac.String(),
Serge Bazanski66e58952021-10-05 17:06:56 +0200282 "-chardev", "socket,id=chrtpm,path=" + tpmSocketPath,
283 "-tpmdev", "emulator,id=tpm0,chardev=chrtpm",
284 "-device", "tpm-tis,tpmdev=tpm0",
285 "-device", "virtio-rng-pci",
286 "-serial", "stdio"}
287
288 if !options.AllowReboot {
289 qemuArgs = append(qemuArgs, "-no-reboot")
290 }
291
292 if options.NodeParameters != nil {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200293 parametersPath := filepath.Join(r.ld, "parameters.pb")
Serge Bazanski66e58952021-10-05 17:06:56 +0200294 parametersRaw, err := proto.Marshal(options.NodeParameters)
295 if err != nil {
296 return fmt.Errorf("failed to encode node paraeters: %w", err)
297 }
Lorenz Brun764a2de2021-11-22 16:26:36 +0100298 if err := os.WriteFile(parametersPath, parametersRaw, 0644); err != nil {
Serge Bazanski66e58952021-10-05 17:06:56 +0200299 return fmt.Errorf("failed to write node parameters: %w", err)
300 }
301 qemuArgs = append(qemuArgs, "-fw_cfg", "name=dev.monogon.metropolis/parameters.pb,file="+parametersPath)
302 }
303
Leopoldacfad5b2023-01-15 14:05:25 +0100304 if options.PcapDump {
305 var qemuNetDump launch.QemuValue
306 pcapPath := filepath.Join(r.ld, "net0.pcap")
307 if options.PcapDump {
308 qemuNetDump = launch.QemuValue{
309 "id": {"net0"},
310 "netdev": {"net0"},
311 "file": {pcapPath},
312 }
313 }
314 qemuArgs = append(qemuArgs, "-object", qemuNetDump.ToOption("filter-dump"))
315 }
316
Serge Bazanski66e58952021-10-05 17:06:56 +0200317 // Start TPM emulator as a subprocess
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200318 tpmCtx, tpmCancel := context.WithCancel(options.Runtime.ctxT)
Serge Bazanski66e58952021-10-05 17:06:56 +0200319 defer tpmCancel()
320
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200321 tpmd := filepath.Join(r.ld, "tpm")
322 tpmEmuCmd := exec.CommandContext(tpmCtx, "swtpm", "socket", "--tpm2", "--tpmstate", "dir="+tpmd, "--ctrl", "type=unixio,path="+tpmSocketPath)
Serge Bazanski66e58952021-10-05 17:06:56 +0200323 tpmEmuCmd.Stderr = os.Stderr
324 tpmEmuCmd.Stdout = os.Stdout
325
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200326 err := tpmEmuCmd.Start()
Serge Bazanski66e58952021-10-05 17:06:56 +0200327 if err != nil {
328 return fmt.Errorf("failed to start TPM emulator: %w", err)
329 }
330
Mateusz Zalegae90f4a12022-05-25 18:24:01 +0200331 // Wait for the socket to be created by the TPM emulator before launching
332 // QEMU.
333 for {
334 _, err := os.Stat(tpmSocketPath)
335 if err == nil {
336 break
337 }
338 if err != nil && !os.IsNotExist(err) {
339 return fmt.Errorf("while stat-ing TPM socket path: %w", err)
340 }
341 if err := tpmCtx.Err(); err != nil {
342 return fmt.Errorf("while waiting for the TPM socket: %w", err)
343 }
344 time.Sleep(time.Millisecond * 100)
345 }
346
Serge Bazanski66e58952021-10-05 17:06:56 +0200347 // Start the main qemu binary
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200348 systemCmd := exec.CommandContext(options.Runtime.ctxT, "qemu-system-x86_64", qemuArgs...)
Serge Bazanski66e58952021-10-05 17:06:56 +0200349 if options.ConnectToSocket != nil {
350 systemCmd.ExtraFiles = []*os.File{options.ConnectToSocket}
351 }
352
353 var stdErrBuf bytes.Buffer
354 systemCmd.Stderr = &stdErrBuf
355 systemCmd.Stdout = options.SerialPort
356
Leopoldaf5086b2023-01-15 14:12:42 +0100357 launch.PrettyPrintQemuArgs(options.Name, systemCmd.Args)
358
Serge Bazanski66e58952021-10-05 17:06:56 +0200359 err = systemCmd.Run()
360
361 // Stop TPM emulator and wait for it to exit to properly reap the child process
362 tpmCancel()
Serge Bazanski05f813b2023-03-16 17:58:39 +0100363 launch.Log("Node: Waiting for TPM emulator to exit")
Serge Bazanski66e58952021-10-05 17:06:56 +0200364 // Wait returns a SIGKILL error because we just cancelled its context.
365 // We still need to call it to avoid creating zombies.
366 _ = tpmEmuCmd.Wait()
Serge Bazanski05f813b2023-03-16 17:58:39 +0100367 launch.Log("Node: TPM emulator done")
Serge Bazanski66e58952021-10-05 17:06:56 +0200368
369 var exerr *exec.ExitError
370 if err != nil && errors.As(err, &exerr) {
371 status := exerr.ProcessState.Sys().(syscall.WaitStatus)
372 if status.Signaled() && status.Signal() == syscall.SIGKILL {
373 // Process was killed externally (most likely by our context being canceled).
374 // This is a normal exit for us, so return nil
375 return nil
376 }
377 exerr.Stderr = stdErrBuf.Bytes()
378 newErr := launch.QEMUError(*exerr)
379 return &newErr
380 }
381 return err
382}
383
// copyFile copies the contents of the file at src into dst. dst is created if
// it does not exist and truncated if it does. Each failure is wrapped with a
// short description of the step that failed.
func copyFile(src, dst string) error {
	source, err := os.Open(src)
	if err != nil {
		return fmt.Errorf("when opening source: %w", err)
	}
	defer source.Close()

	target, err := os.Create(dst)
	if err != nil {
		return fmt.Errorf("when creating destination: %w", err)
	}
	// Only relevant on the copy-error path; on success the explicit Close below
	// reports any close error to the caller.
	defer target.Close()

	if _, err = io.Copy(target, source); err != nil {
		return fmt.Errorf("when copying file: %w", err)
	}
	return target.Close()
}
403
Serge Bazanskie78a0892021-10-07 17:03:49 +0200404// getNodes wraps around Management.GetNodes to return a list of nodes in a
405// cluster.
406func getNodes(ctx context.Context, mgmt apb.ManagementClient) ([]*apb.Node, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200407 var res []*apb.Node
Serge Bazanski636032e2022-01-26 14:21:33 +0100408 bo := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
Serge Bazanski075465c2021-11-16 15:38:49 +0100409 err := backoff.Retry(func() error {
410 res = nil
411 srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
Serge Bazanskie78a0892021-10-07 17:03:49 +0200412 if err != nil {
Serge Bazanski075465c2021-11-16 15:38:49 +0100413 return fmt.Errorf("GetNodes: %w", err)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200414 }
Serge Bazanski075465c2021-11-16 15:38:49 +0100415 for {
416 node, err := srvN.Recv()
417 if err == io.EOF {
418 break
419 }
420 if err != nil {
421 return fmt.Errorf("GetNodes.Recv: %w", err)
422 }
423 res = append(res, node)
424 }
425 return nil
426 }, bo)
427 if err != nil {
428 return nil, err
Serge Bazanskie78a0892021-10-07 17:03:49 +0200429 }
430 return res, nil
431}
432
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200433// getNode wraps Management.GetNodes. It returns node information matching
434// given node ID.
435func getNode(ctx context.Context, mgmt apb.ManagementClient, id string) (*apb.Node, error) {
436 nodes, err := getNodes(ctx, mgmt)
437 if err != nil {
438 return nil, fmt.Errorf("could not get nodes: %w", err)
439 }
440 for _, n := range nodes {
441 eid := identity.NodeID(n.Pubkey)
442 if eid != id {
443 continue
444 }
445 return n, nil
446 }
447 return nil, fmt.Errorf("no such node.")
448}
449
// generateRandomEthernetMAC returns a random EUI-48 Ethernet MAC address. The
// address is marked as locally administered and individual (unicast), see
// IEEE 802-2014 Section 8.2.2.
func generateRandomEthernetMAC() (*net.HardwareAddr, error) {
	var raw [6]byte
	if _, err := rand.Read(raw[:]); err != nil {
		return nil, fmt.Errorf("failed to read randomness for MAC: %v", err)
	}

	// First octet: set the U/L bit (locally administered), clear the I/G bit
	// (individual address).
	raw[0] |= 0x02
	raw[0] &^= 0x01

	mac := net.HardwareAddr(raw[:])
	return &mac, nil
}
464
Serge Bazanskibe742842022-04-04 13:18:50 +0200465const SOCKSPort uint16 = 1080
Serge Bazanski66e58952021-10-05 17:06:56 +0200466
Serge Bazanskibe742842022-04-04 13:18:50 +0200467// ClusterPorts contains all ports handled by Nanoswitch.
468var ClusterPorts = []uint16{
469 // Forwarded to the first node.
470 uint16(node.CuratorServicePort),
471 uint16(node.DebugServicePort),
472 uint16(node.KubernetesAPIPort),
473 uint16(node.KubernetesAPIWrappedPort),
474
475 // SOCKS proxy to the switch network
476 SOCKSPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200477}
478
// ClusterOptions configures a cluster started by LaunchCluster.
type ClusterOptions struct {
	// NumNodes is the number of nodes the cluster is started with.
	// LaunchCluster refuses to start with zero (or fewer) nodes.
	NumNodes int

	// NodeLogsToFiles, if true, makes node logs go to individual files instead
	// of being printed out to stderr. The path of these files will be still
	// printed to stdout.
	//
	// The files are placed in the cluster's launch directory, which is created
	// inside TEST_TMPDIR (or the default temporary directory, if unset).
	NodeLogsToFiles bool
}
491
492// Cluster is the running Metropolis cluster launched using the LaunchCluster
493// function.
494type Cluster struct {
Serge Bazanski66e58952021-10-05 17:06:56 +0200495 // Owner is the TLS Certificate of the owner of the test cluster. This can be
496 // used to authenticate further clients to the running cluster.
497 Owner tls.Certificate
498 // Ports is the PortMap used to access the first nodes' services (defined in
Serge Bazanskibe742842022-04-04 13:18:50 +0200499 // ClusterPorts) and the SOCKS proxy (at SOCKSPort).
Serge Bazanski66e58952021-10-05 17:06:56 +0200500 Ports launch.PortMap
501
Serge Bazanskibe742842022-04-04 13:18:50 +0200502 // Nodes is a map from Node ID to its runtime information.
503 Nodes map[string]*NodeInCluster
504 // NodeIDs is a list of node IDs that are backing this cluster, in order of
505 // creation.
506 NodeIDs []string
507
Serge Bazanski54e212a2023-06-14 13:45:11 +0200508 // CACertificate is the cluster's CA certificate.
509 CACertificate *x509.Certificate
510
Serge Bazanski66e58952021-10-05 17:06:56 +0200511 // nodesDone is a list of channels populated with the return codes from all the
512 // nodes' qemu instances. It's used by Close to ensure all nodes have
Leopold20a036e2023-01-15 00:17:19 +0100513 // successfully been stopped.
Serge Bazanski66e58952021-10-05 17:06:56 +0200514 nodesDone []chan error
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200515 // nodeOpts are the cluster member nodes' mutable launch options, kept here
516 // to facilitate reboots.
517 nodeOpts []NodeOptions
518 // launchDir points at the directory keeping the nodes' state, such as storage
519 // images, firmware variable files, TPM state.
520 launchDir string
521 // socketDir points at the directory keeping UNIX socket files, such as these
522 // used to facilitate communication between QEMU and swtpm. It's different
523 // from launchDir, and anchored nearer the file system root, due to the
524 // socket path length limitation imposed by the kernel.
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100525 socketDir string
526 metroctlDir string
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200527
Serge Bazanskibe742842022-04-04 13:18:50 +0200528 // socksDialer is used by DialNode to establish connections to nodes via the
529 // SOCKS server ran by nanoswitch.
530 socksDialer proxy.Dialer
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200531
532 // authClient is a cached authenticated owner connection to a Curator
533 // instance within the cluster.
534 authClient *grpc.ClientConn
535
536 // ctxT is the context individual node contexts are created from.
537 ctxT context.Context
538 // ctxC is used by Close to cancel the context under which the nodes are
539 // running.
540 ctxC context.CancelFunc
Serge Bazanskibe742842022-04-04 13:18:50 +0200541}
542
// NodeInCluster describes a single node that is part of a Cluster.
type NodeInCluster struct {
	// ID identifies the node; it can be used to dial this node's services via
	// DialNode.
	ID string
	// ManagementAddress is the node's address on the network ran by nanoswitch.
	// It is not reachable from the host directly, only via DialNode or via the
	// nanoswitch SOCKS proxy (reachable on Cluster.Ports[SOCKSPort]).
	ManagementAddress string
}
552
553// firstConnection performs the initial owner credential escrow with a newly
554// started nanoswitch-backed cluster over SOCKS. It expects the first node to be
555// running at 10.1.0.2, which is always the case with the current nanoswitch
556// implementation.
557//
Leopold20a036e2023-01-15 00:17:19 +0100558// It returns the newly escrowed credentials as well as the first node's
Serge Bazanskibe742842022-04-04 13:18:50 +0200559// information as NodeInCluster.
560func firstConnection(ctx context.Context, socksDialer proxy.Dialer) (*tls.Certificate, *NodeInCluster, error) {
561 // Dial external service.
562 remote := fmt.Sprintf("10.1.0.2:%s", node.CuratorServicePort.PortString())
563 initCreds, err := rpc.NewEphemeralCredentials(InsecurePrivateKey, nil)
564 if err != nil {
565 return nil, nil, fmt.Errorf("NewEphemeralCredentials: %w", err)
566 }
567 initDialer := func(_ context.Context, addr string) (net.Conn, error) {
568 return socksDialer.Dial("tcp", addr)
569 }
570 initClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(initCreds))
571 if err != nil {
572 return nil, nil, fmt.Errorf("dialing with ephemeral credentials failed: %w", err)
573 }
574 defer initClient.Close()
575
576 // Retrieve owner certificate - this can take a while because the node is still
577 // coming up, so do it in a backoff loop.
Serge Bazanski05f813b2023-03-16 17:58:39 +0100578 launch.Log("Cluster: retrieving owner certificate (this can take a few seconds while the first node boots)...")
Serge Bazanskibe742842022-04-04 13:18:50 +0200579 aaa := apb.NewAAAClient(initClient)
580 var cert *tls.Certificate
581 err = backoff.Retry(func() error {
582 cert, err = rpc.RetrieveOwnerCertificate(ctx, aaa, InsecurePrivateKey)
583 if st, ok := status.FromError(err); ok {
584 if st.Code() == codes.Unavailable {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100585 launch.Log("Cluster: cluster UNAVAILABLE: %v", st.Message())
Serge Bazanskibe742842022-04-04 13:18:50 +0200586 return err
587 }
588 }
589 return backoff.Permanent(err)
590 }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
591 if err != nil {
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200592 return nil, nil, fmt.Errorf("couldn't retrieve owner certificate: %w", err)
Serge Bazanskibe742842022-04-04 13:18:50 +0200593 }
Serge Bazanski05f813b2023-03-16 17:58:39 +0100594 launch.Log("Cluster: retrieved owner certificate.")
Serge Bazanskibe742842022-04-04 13:18:50 +0200595
596 // Now connect authenticated and get the node ID.
Serge Bazanski8535cb52023-03-29 14:15:08 +0200597 creds := rpc.NewAuthenticatedCredentials(*cert, rpc.WantInsecure())
Serge Bazanskibe742842022-04-04 13:18:50 +0200598 authClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(creds))
599 if err != nil {
600 return nil, nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
601 }
602 defer authClient.Close()
603 mgmt := apb.NewManagementClient(authClient)
604
605 var node *NodeInCluster
606 err = backoff.Retry(func() error {
607 nodes, err := getNodes(ctx, mgmt)
608 if err != nil {
609 return fmt.Errorf("retrieving nodes failed: %w", err)
610 }
611 if len(nodes) != 1 {
612 return fmt.Errorf("expected one node, got %d", len(nodes))
613 }
614 n := nodes[0]
615 if n.Status == nil || n.Status.ExternalAddress == "" {
616 return fmt.Errorf("node has no status and/or address")
617 }
618 node = &NodeInCluster{
619 ID: identity.NodeID(n.Pubkey),
620 ManagementAddress: n.Status.ExternalAddress,
621 }
622 return nil
623 }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
624 if err != nil {
625 return nil, nil, err
626 }
627
628 return cert, node, nil
Serge Bazanski66e58952021-10-05 17:06:56 +0200629}
630
// NewSerialFileLogger opens the file at path p for use as a node's serial port
// (NodeOptions.SerialPort). The file is created with mode 0600 if it does not
// exist. If it does exist, it is truncated, so that its contents reflect only
// the new output instead of a mix of new output and a stale tail.
//
// The file is opened write-only: writes to the returned io.ReadWriter succeed,
// but reads from it fail. The returned value is the underlying *os.File.
func NewSerialFileLogger(p string) (io.ReadWriter, error) {
	f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
	if err != nil {
		return nil, err
	}
	return f, nil
}
638
Serge Bazanski66e58952021-10-05 17:06:56 +0200639// LaunchCluster launches a cluster of Metropolis node VMs together with a
640// Nanoswitch instance to network them all together.
641//
642// The given context will be used to run all qemu instances in the cluster, and
643// canceling the context or calling Close() will terminate them.
644func LaunchCluster(ctx context.Context, opts ClusterOptions) (*Cluster, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200645 if opts.NumNodes <= 0 {
Serge Bazanski66e58952021-10-05 17:06:56 +0200646 return nil, errors.New("refusing to start cluster with zero nodes")
647 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200648
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200649 // Create the launch directory.
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100650 ld, err := os.MkdirTemp(os.Getenv("TEST_TMPDIR"), "cluster-*")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200651 if err != nil {
652 return nil, fmt.Errorf("failed to create the launch directory: %w", err)
653 }
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100654 // Create the metroctl config directory. We keep it in /tmp because in some
655 // scenarios it's end-user visible and we want it short.
656 md, err := os.MkdirTemp("/tmp", "metroctl-*")
657 if err != nil {
658 return nil, fmt.Errorf("failed to create the metroctl directory: %w", err)
659 }
660
661 // Create the socket directory. We keep it in /tmp because of socket path limits.
662 sd, err := os.MkdirTemp("/tmp", "cluster-*")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200663 if err != nil {
664 return nil, fmt.Errorf("failed to create the socket directory: %w", err)
665 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200666
667 // Prepare links between nodes and nanoswitch.
668 var switchPorts []*os.File
669 var vmPorts []*os.File
670 for i := 0; i < opts.NumNodes; i++ {
671 switchPort, vmPort, err := launch.NewSocketPair()
672 if err != nil {
Serge Bazanski66e58952021-10-05 17:06:56 +0200673 return nil, fmt.Errorf("failed to get socketpair: %w", err)
674 }
675 switchPorts = append(switchPorts, switchPort)
676 vmPorts = append(vmPorts, vmPort)
677 }
678
Serge Bazanskie78a0892021-10-07 17:03:49 +0200679 // Make a list of channels that will be populated by all running node qemu
680 // processes.
Serge Bazanski66e58952021-10-05 17:06:56 +0200681 done := make([]chan error, opts.NumNodes)
682 for i, _ := range done {
683 done[i] = make(chan error, 1)
684 }
685
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200686 // Prepare the node options. These will be kept as part of Cluster.
687 // nodeOpts[].Runtime will be initialized by LaunchNode during the first
688 // launch. The runtime information can be later used to restart a node.
689 // The 0th node will be initialized first. The rest will follow after it
690 // had bootstrapped the cluster.
691 nodeOpts := make([]NodeOptions, opts.NumNodes)
692 nodeOpts[0] = NodeOptions{
Leopoldaf5086b2023-01-15 14:12:42 +0100693 Name: "node0",
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200694 ConnectToSocket: vmPorts[0],
695 NodeParameters: &apb.NodeParameters{
696 Cluster: &apb.NodeParameters_ClusterBootstrap_{
697 ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
698 OwnerPublicKey: InsecurePublicKey,
Serge Bazanski66e58952021-10-05 17:06:56 +0200699 },
700 },
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200701 },
702 SerialPort: newPrefixedStdio(0),
Leopoldacfad5b2023-01-15 14:05:25 +0100703 PcapDump: true,
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200704 }
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100705 if opts.NodeLogsToFiles {
706 path := path.Join(ld, "node-1.txt")
707 port, err := NewSerialFileLogger(path)
708 if err != nil {
709 return nil, fmt.Errorf("could not open log file for node 1: %w", err)
710 }
711 launch.Log("Node 1 logs at %s", path)
712 nodeOpts[0].SerialPort = port
713 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200714
715 // Start the first node.
716 ctxT, ctxC := context.WithCancel(ctx)
Serge Bazanski05f813b2023-03-16 17:58:39 +0100717 launch.Log("Cluster: Starting node %d...", 1)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200718 go func() {
719 err := LaunchNode(ctxT, ld, sd, &nodeOpts[0])
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200720 if err != nil {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100721 launch.Log("Node %d finished with an error: %v", 1, err)
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200722 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200723 done[0] <- err
724 }()
725
Serge Bazanskie78a0892021-10-07 17:03:49 +0200726 // Launch nanoswitch.
Serge Bazanski66e58952021-10-05 17:06:56 +0200727 portMap, err := launch.ConflictFreePortMap(ClusterPorts)
728 if err != nil {
729 ctxC()
730 return nil, fmt.Errorf("failed to allocate ephemeral ports: %w", err)
731 }
732
733 go func() {
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100734 var serialPort io.ReadWriter
735 if opts.NodeLogsToFiles {
736 path := path.Join(ld, "nanoswitch.txt")
737 serialPort, err = NewSerialFileLogger(path)
738 if err != nil {
739 launch.Log("Could not open log file for nanoswitch: %v", err)
740 }
741 launch.Log("Nanoswitch logs at %s", path)
742 } else {
743 serialPort = newPrefixedStdio(99)
744 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200745 if err := launch.RunMicroVM(ctxT, &launch.MicroVMOptions{
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100746 Name: "nanoswitch",
Serge Bazanski66e58952021-10-05 17:06:56 +0200747 KernelPath: "metropolis/test/ktest/vmlinux",
Lorenz Brunb6a9d3c2022-01-27 18:56:20 +0100748 InitramfsPath: "metropolis/test/nanoswitch/initramfs.cpio.lz4",
Serge Bazanski66e58952021-10-05 17:06:56 +0200749 ExtraNetworkInterfaces: switchPorts,
750 PortMap: portMap,
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100751 SerialPort: serialPort,
Leopoldacfad5b2023-01-15 14:05:25 +0100752 PcapDump: path.Join(ld, "nanoswitch.pcap"),
Serge Bazanski66e58952021-10-05 17:06:56 +0200753 }); err != nil {
754 if !errors.Is(err, ctxT.Err()) {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100755 launch.Fatal("Failed to launch nanoswitch: %v", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200756 }
757 }
758 }()
759
Serge Bazanskibe742842022-04-04 13:18:50 +0200760 // Build SOCKS dialer.
761 socksRemote := fmt.Sprintf("localhost:%v", portMap[SOCKSPort])
762 socksDialer, err := proxy.SOCKS5("tcp", socksRemote, nil, proxy.Direct)
Serge Bazanski66e58952021-10-05 17:06:56 +0200763 if err != nil {
764 ctxC()
Serge Bazanskibe742842022-04-04 13:18:50 +0200765 return nil, fmt.Errorf("failed to build SOCKS dialer: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200766 }
767
Serge Bazanskibe742842022-04-04 13:18:50 +0200768 // Retrieve owner credentials and first node.
769 cert, firstNode, err := firstConnection(ctxT, socksDialer)
Serge Bazanski66e58952021-10-05 17:06:56 +0200770 if err != nil {
771 ctxC()
772 return nil, err
773 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200774
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100775 // Write credentials to the metroctl directory.
776 if err := metroctl.WriteOwnerKey(md, cert.PrivateKey.(ed25519.PrivateKey)); err != nil {
777 ctxC()
778 return nil, fmt.Errorf("could not write owner key: %w", err)
779 }
780 if err := metroctl.WriteOwnerCertificate(md, cert.Certificate[0]); err != nil {
781 ctxC()
782 return nil, fmt.Errorf("could not write owner certificate: %w", err)
783 }
784
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200785 // Set up a partially initialized cluster instance, to be filled in in the
786 // later steps.
Serge Bazanskibe742842022-04-04 13:18:50 +0200787 cluster := &Cluster{
788 Owner: *cert,
789 Ports: portMap,
790 Nodes: map[string]*NodeInCluster{
791 firstNode.ID: firstNode,
792 },
793 NodeIDs: []string{
794 firstNode.ID,
795 },
796
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100797 nodesDone: done,
798 nodeOpts: nodeOpts,
799 launchDir: ld,
800 socketDir: sd,
801 metroctlDir: md,
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200802
Serge Bazanskibe742842022-04-04 13:18:50 +0200803 socksDialer: socksDialer,
804
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200805 ctxT: ctxT,
Serge Bazanskibe742842022-04-04 13:18:50 +0200806 ctxC: ctxC,
807 }
808
809 // Now start the rest of the nodes and register them into the cluster.
810
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200811 // Get an authenticated owner client within the cluster.
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200812 curC, err := cluster.CuratorClient()
Serge Bazanski66e58952021-10-05 17:06:56 +0200813 if err != nil {
814 ctxC()
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200815 return nil, fmt.Errorf("CuratorClient: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200816 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200817 mgmt := apb.NewManagementClient(curC)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200818
819 // Retrieve register ticket to register further nodes.
Serge Bazanski05f813b2023-03-16 17:58:39 +0100820 launch.Log("Cluster: retrieving register ticket...")
Serge Bazanskie78a0892021-10-07 17:03:49 +0200821 resT, err := mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{})
822 if err != nil {
823 ctxC()
824 return nil, fmt.Errorf("GetRegisterTicket: %w", err)
825 }
826 ticket := resT.Ticket
Serge Bazanski05f813b2023-03-16 17:58:39 +0100827 launch.Log("Cluster: retrieved register ticket (%d bytes).", len(ticket))
Serge Bazanskie78a0892021-10-07 17:03:49 +0200828
829 // Retrieve cluster info (for directory and ca public key) to register further
830 // nodes.
831 resI, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
832 if err != nil {
833 ctxC()
834 return nil, fmt.Errorf("GetClusterInfo: %w", err)
835 }
Serge Bazanski54e212a2023-06-14 13:45:11 +0200836 caCert, err := x509.ParseCertificate(resI.CaCertificate)
837 if err != nil {
838 ctxC()
839 return nil, fmt.Errorf("ParseCertificate: %w", err)
840 }
841 cluster.CACertificate = caCert
Serge Bazanskie78a0892021-10-07 17:03:49 +0200842
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200843 // Use the retrieved information to configure the rest of the node options.
844 for i := 1; i < opts.NumNodes; i++ {
845 nodeOpts[i] = NodeOptions{
Leopoldaf5086b2023-01-15 14:12:42 +0100846 Name: fmt.Sprintf("node%d", i),
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200847 ConnectToSocket: vmPorts[i],
848 NodeParameters: &apb.NodeParameters{
849 Cluster: &apb.NodeParameters_ClusterRegister_{
850 ClusterRegister: &apb.NodeParameters_ClusterRegister{
851 RegisterTicket: ticket,
852 ClusterDirectory: resI.ClusterDirectory,
853 CaCertificate: resI.CaCertificate,
854 },
855 },
856 },
857 SerialPort: newPrefixedStdio(i),
858 }
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100859 if opts.NodeLogsToFiles {
860 path := path.Join(ld, fmt.Sprintf("node-%d.txt", i+1))
861 port, err := NewSerialFileLogger(path)
862 if err != nil {
863 return nil, fmt.Errorf("could not open log file for node %d: %w", i+1, err)
864 }
865 launch.Log("Node %d logs at %s", i+1, path)
866 nodeOpts[i].SerialPort = port
867 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200868 }
869
870 // Now run the rest of the nodes.
871 //
Serge Bazanskie78a0892021-10-07 17:03:49 +0200872 // TODO(q3k): parallelize this
873 for i := 1; i < opts.NumNodes; i++ {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100874 launch.Log("Cluster: Starting node %d...", i+1)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200875 go func(i int) {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200876 err := LaunchNode(ctxT, ld, sd, &nodeOpts[i])
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200877 if err != nil {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100878 launch.Log("Node %d finished with an error: %v", i, err)
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200879 }
Serge Bazanskie78a0892021-10-07 17:03:49 +0200880 done[i] <- err
881 }(i)
882 var newNode *apb.Node
883
Serge Bazanski05f813b2023-03-16 17:58:39 +0100884 launch.Log("Cluster: waiting for node %d to appear as NEW...", i)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200885 for {
886 nodes, err := getNodes(ctx, mgmt)
887 if err != nil {
888 ctxC()
889 return nil, fmt.Errorf("could not get nodes: %w", err)
890 }
891 for _, n := range nodes {
892 if n.State == cpb.NodeState_NODE_STATE_NEW {
893 newNode = n
894 break
895 }
896 }
897 if newNode != nil {
898 break
899 }
900 time.Sleep(1 * time.Second)
901 }
902 id := identity.NodeID(newNode.Pubkey)
Serge Bazanski05f813b2023-03-16 17:58:39 +0100903 launch.Log("Cluster: node %d is %s", i, id)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200904
Serge Bazanski05f813b2023-03-16 17:58:39 +0100905 launch.Log("Cluster: approving node %d", i)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200906 _, err := mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
907 Pubkey: newNode.Pubkey,
908 })
909 if err != nil {
910 ctxC()
911 return nil, fmt.Errorf("ApproveNode(%s): %w", id, err)
912 }
Serge Bazanski05f813b2023-03-16 17:58:39 +0100913 launch.Log("Cluster: node %d approved, waiting for it to appear as UP and with a network address...", i)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200914 for {
915 nodes, err := getNodes(ctx, mgmt)
916 if err != nil {
917 ctxC()
918 return nil, fmt.Errorf("could not get nodes: %w", err)
919 }
920 found := false
921 for _, n := range nodes {
922 if !bytes.Equal(n.Pubkey, newNode.Pubkey) {
923 continue
924 }
Serge Bazanskibe742842022-04-04 13:18:50 +0200925 if n.Status == nil || n.Status.ExternalAddress == "" {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200926 break
927 }
Serge Bazanskibe742842022-04-04 13:18:50 +0200928 if n.State != cpb.NodeState_NODE_STATE_UP {
929 break
930 }
931 found = true
932 cluster.Nodes[identity.NodeID(n.Pubkey)] = &NodeInCluster{
933 ID: identity.NodeID(n.Pubkey),
934 ManagementAddress: n.Status.ExternalAddress,
935 }
936 cluster.NodeIDs = append(cluster.NodeIDs, identity.NodeID(n.Pubkey))
937 break
Serge Bazanskie78a0892021-10-07 17:03:49 +0200938 }
939 if found {
940 break
941 }
Serge Bazanskibe742842022-04-04 13:18:50 +0200942 time.Sleep(time.Second)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200943 }
Serge Bazanski05f813b2023-03-16 17:58:39 +0100944 launch.Log("Cluster: node %d (%s) UP!", i, id)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200945 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200946
Serge Bazanski05f813b2023-03-16 17:58:39 +0100947 launch.Log("Cluster: all nodes up:")
Serge Bazanskibe742842022-04-04 13:18:50 +0200948 for _, node := range cluster.Nodes {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100949 launch.Log("Cluster: - %s at %s", node.ID, node.ManagementAddress)
Serge Bazanskibe742842022-04-04 13:18:50 +0200950 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200951
Serge Bazanskibe742842022-04-04 13:18:50 +0200952 return cluster, nil
Serge Bazanski66e58952021-10-05 17:06:56 +0200953}
954
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200955// RebootNode reboots the cluster member node matching the given index, and
956// waits for it to rejoin the cluster. It will use the given context ctx to run
957// cluster API requests, whereas the resulting QEMU process will be created
958// using the cluster's context c.ctxT. The nodes are indexed starting at 0.
959func (c *Cluster) RebootNode(ctx context.Context, idx int) error {
960 if idx < 0 || idx >= len(c.NodeIDs) {
961 return fmt.Errorf("index out of bounds.")
962 }
963 id := c.NodeIDs[idx]
964
965 // Get an authenticated owner client within the cluster.
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200966 curC, err := c.CuratorClient()
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200967 if err != nil {
968 return err
969 }
970 mgmt := apb.NewManagementClient(curC)
971
972 // Get the timestamp of the node's last update, as observed by Curator.
973 // It'll be needed to make sure it had rejoined the cluster after the reboot.
974 var is *apb.Node
975 for {
976 r, err := getNode(ctx, mgmt, id)
977 if err != nil {
978 return err
979 }
980
981 // Node status may be absent if it hasn't reported to the cluster yet. Wait
982 // for it to appear before progressing further.
983 if r.Status != nil {
984 is = r
985 break
986 }
987 time.Sleep(time.Second)
988 }
989
990 // Cancel the node's context. This will shut down QEMU.
991 c.nodeOpts[idx].Runtime.CtxC()
Serge Bazanski05f813b2023-03-16 17:58:39 +0100992 launch.Log("Cluster: waiting for node %d (%s) to stop.", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200993 err = <-c.nodesDone[idx]
994 if err != nil {
995 return fmt.Errorf("while restarting node: %w", err)
996 }
997
998 // Start QEMU again.
Serge Bazanski05f813b2023-03-16 17:58:39 +0100999 launch.Log("Cluster: restarting node %d (%s).", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001000 go func(n int) {
1001 err := LaunchNode(c.ctxT, c.launchDir, c.socketDir, &c.nodeOpts[n])
Mateusz Zalega08cb4642022-05-25 17:35:59 +02001002 if err != nil {
Serge Bazanski05f813b2023-03-16 17:58:39 +01001003 launch.Log("Node %d finished with an error: %v", n, err)
Mateusz Zalega08cb4642022-05-25 17:35:59 +02001004 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001005 c.nodesDone[n] <- err
1006 }(idx)
1007
1008 // Poll Management.GetNodes until the node's timestamp is updated.
1009 for {
1010 cs, err := getNode(ctx, mgmt, id)
1011 if err != nil {
Serge Bazanski05f813b2023-03-16 17:58:39 +01001012 launch.Log("Cluster: node get error: %v", err)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001013 return err
1014 }
Serge Bazanski05f813b2023-03-16 17:58:39 +01001015 launch.Log("Cluster: node status: %+v", cs)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001016 if cs.Status == nil {
1017 continue
1018 }
Mateusz Zalega28800ad2022-07-08 14:56:02 +02001019 if cs.Status.Timestamp.AsTime().Sub(is.Status.Timestamp.AsTime()) > 0 {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001020 break
1021 }
1022 time.Sleep(time.Second)
1023 }
Serge Bazanski05f813b2023-03-16 17:58:39 +01001024 launch.Log("Cluster: node %d (%s) has rejoined the cluster.", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001025 return nil
1026}
1027
1028// Close cancels the running clusters' context and waits for all virtualized
Serge Bazanski66e58952021-10-05 17:06:56 +02001029// nodes to stop. It returns an error if stopping the nodes failed, or one of
1030// the nodes failed to fully start in the first place.
1031func (c *Cluster) Close() error {
Serge Bazanski05f813b2023-03-16 17:58:39 +01001032 launch.Log("Cluster: stopping...")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001033 if c.authClient != nil {
1034 c.authClient.Close()
1035 }
Serge Bazanski66e58952021-10-05 17:06:56 +02001036 c.ctxC()
1037
Leopold20a036e2023-01-15 00:17:19 +01001038 var errs []error
Serge Bazanski05f813b2023-03-16 17:58:39 +01001039 launch.Log("Cluster: waiting for nodes to exit...")
Serge Bazanski66e58952021-10-05 17:06:56 +02001040 for _, c := range c.nodesDone {
1041 err := <-c
1042 if err != nil {
Leopold20a036e2023-01-15 00:17:19 +01001043 errs = append(errs, err)
Serge Bazanski66e58952021-10-05 17:06:56 +02001044 }
1045 }
Serge Bazanskid09c58f2023-03-17 00:25:08 +01001046 launch.Log("Cluster: removing nodes' state files (%s) and sockets (%s).", c.launchDir, c.socketDir)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001047 os.RemoveAll(c.launchDir)
1048 os.RemoveAll(c.socketDir)
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001049 os.RemoveAll(c.metroctlDir)
Serge Bazanski05f813b2023-03-16 17:58:39 +01001050 launch.Log("Cluster: done")
Leopold20a036e2023-01-15 00:17:19 +01001051 return multierr.Combine(errs...)
Serge Bazanski66e58952021-10-05 17:06:56 +02001052}
Serge Bazanskibe742842022-04-04 13:18:50 +02001053
1054// DialNode is a grpc.WithContextDialer compatible dialer which dials nodes by
1055// their ID. This is performed by connecting to the cluster nanoswitch via its
1056// SOCKS proxy, and using the cluster node list for name resolution.
1057//
1058// For example:
1059//
Serge Bazanski05f813b2023-03-16 17:58:39 +01001060// grpc.Dial("metropolis-deadbeef:1234", grpc.WithContextDialer(c.DialNode))
Serge Bazanskibe742842022-04-04 13:18:50 +02001061func (c *Cluster) DialNode(_ context.Context, addr string) (net.Conn, error) {
1062 host, port, err := net.SplitHostPort(addr)
1063 if err != nil {
1064 return nil, fmt.Errorf("invalid host:port: %w", err)
1065 }
1066 // Already an IP address?
1067 if net.ParseIP(host) != nil {
1068 return c.socksDialer.Dial("tcp", addr)
1069 }
1070
1071 // Otherwise, expect a node name.
1072 node, ok := c.Nodes[host]
1073 if !ok {
1074 return nil, fmt.Errorf("unknown node %q", host)
1075 }
1076 addr = net.JoinHostPort(node.ManagementAddress, port)
1077 return c.socksDialer.Dial("tcp", addr)
1078}
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001079
1080// KubernetesControllerNodeAddresses returns the list of IP addresses of nodes
1081// which are currently Kubernetes controllers, ie. run an apiserver. This list
1082// might be empty if no node is currently configured with the
1083// 'KubernetesController' node.
1084func (c *Cluster) KubernetesControllerNodeAddresses(ctx context.Context) ([]string, error) {
1085 curC, err := c.CuratorClient()
1086 if err != nil {
1087 return nil, err
1088 }
1089 mgmt := apb.NewManagementClient(curC)
1090 srv, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{
1091 Filter: "has(node.roles.kubernetes_controller)",
1092 })
1093 if err != nil {
1094 return nil, err
1095 }
1096 defer srv.CloseSend()
1097 var res []string
1098 for {
1099 n, err := srv.Recv()
1100 if err == io.EOF {
1101 break
1102 }
1103 if err != nil {
1104 return nil, err
1105 }
1106 if n.Status == nil || n.Status.ExternalAddress == "" {
1107 continue
1108 }
1109 res = append(res, n.Status.ExternalAddress)
1110 }
1111 return res, nil
1112}
Serge Bazanski630fb5c2023-04-06 10:50:24 +02001113
1114func (c *Cluster) AllNodesHealthy(ctx context.Context) error {
1115 // Get an authenticated owner client within the cluster.
1116 curC, err := c.CuratorClient()
1117 if err != nil {
1118 return err
1119 }
1120 mgmt := apb.NewManagementClient(curC)
1121 nodes, err := getNodes(ctx, mgmt)
1122 if err != nil {
1123 return err
1124 }
1125
1126 var unhealthy []string
1127 for _, node := range nodes {
1128 if node.Health == apb.Node_HEALTHY {
1129 continue
1130 }
1131 unhealthy = append(unhealthy, node.Id)
1132 }
1133 if len(unhealthy) == 0 {
1134 return nil
1135 }
1136 return fmt.Errorf("nodes unhealthy: %s", strings.Join(unhealthy, ", "))
1137}