blob: 0efd08b62e3b663c7005537cd0042d3cc8c394a5 [file] [log] [blame]
Serge Bazanski66e58952021-10-05 17:06:56 +02001// cluster builds on the launch package and implements launching Metropolis
2// nodes and clusters in a virtualized environment using qemu. It's kept in a
3// separate package as it depends on a Metropolis node image, which might not be
// required for some uses of the launch library.
5package cluster
6
7import (
8 "bytes"
9 "context"
Serge Bazanski1f8cad72023-03-20 16:58:10 +010010 "crypto/ed25519"
Serge Bazanski66e58952021-10-05 17:06:56 +020011 "crypto/rand"
12 "crypto/tls"
13 "errors"
14 "fmt"
15 "io"
Serge Bazanski66e58952021-10-05 17:06:56 +020016 "net"
17 "os"
18 "os/exec"
Leopoldacfad5b2023-01-15 14:05:25 +010019 "path"
Serge Bazanski66e58952021-10-05 17:06:56 +020020 "path/filepath"
Serge Bazanski630fb5c2023-04-06 10:50:24 +020021 "strings"
Serge Bazanski66e58952021-10-05 17:06:56 +020022 "syscall"
23 "time"
24
25 "github.com/cenkalti/backoff/v4"
Serge Bazanski66e58952021-10-05 17:06:56 +020026 "go.uber.org/multierr"
Serge Bazanskibe742842022-04-04 13:18:50 +020027 "golang.org/x/net/proxy"
Serge Bazanski66e58952021-10-05 17:06:56 +020028 "google.golang.org/grpc"
Serge Bazanski636032e2022-01-26 14:21:33 +010029 "google.golang.org/grpc/codes"
30 "google.golang.org/grpc/status"
Serge Bazanski66e58952021-10-05 17:06:56 +020031 "google.golang.org/protobuf/proto"
32
Serge Bazanski1f8cad72023-03-20 16:58:10 +010033 metroctl "source.monogon.dev/metropolis/cli/metroctl/core"
Mateusz Zalega0246f5e2022-04-22 17:29:04 +020034 "source.monogon.dev/metropolis/cli/pkg/datafile"
Serge Bazanski66e58952021-10-05 17:06:56 +020035 "source.monogon.dev/metropolis/node"
Serge Bazanskie78a0892021-10-07 17:03:49 +020036 "source.monogon.dev/metropolis/node/core/identity"
Serge Bazanski66e58952021-10-05 17:06:56 +020037 "source.monogon.dev/metropolis/node/core/rpc"
Serge Bazanski5bb8a332022-06-23 17:41:33 +020038 "source.monogon.dev/metropolis/node/core/rpc/resolver"
Serge Bazanski66e58952021-10-05 17:06:56 +020039 apb "source.monogon.dev/metropolis/proto/api"
Serge Bazanskie78a0892021-10-07 17:03:49 +020040 cpb "source.monogon.dev/metropolis/proto/common"
Serge Bazanski66e58952021-10-05 17:06:56 +020041 "source.monogon.dev/metropolis/test/launch"
42)
43
Leopold20a036e2023-01-15 00:17:19 +010044// NodeOptions contains all options that can be passed to Launch()
Serge Bazanski66e58952021-10-05 17:06:56 +020045type NodeOptions struct {
Leopoldaf5086b2023-01-15 14:12:42 +010046 // Name is a human-readable identifier to be used in debug output.
47 Name string
48
Serge Bazanski66e58952021-10-05 17:06:56 +020049 // Ports contains the port mapping where to expose the internal ports of the VM to
50 // the host. See IdentityPortMap() and ConflictFreePortMap(). Ignored when
51 // ConnectToSocket is set.
52 Ports launch.PortMap
53
Leopold20a036e2023-01-15 00:17:19 +010054 // If set to true, reboots are honored. Otherwise, all reboots exit the Launch()
55 // command. Metropolis nodes generally restart on almost all errors, so unless you
Serge Bazanski66e58952021-10-05 17:06:56 +020056 // want to test reboot behavior this should be false.
57 AllowReboot bool
58
Leopold20a036e2023-01-15 00:17:19 +010059 // By default, the VM is connected to the Host via SLIRP. If ConnectToSocket is
60 // set, it is instead connected to the given file descriptor/socket. If this is
61 // set, all port maps from the Ports option are ignored. Intended for networking
62 // this instance together with others for running more complex network
63 // configurations.
Serge Bazanski66e58952021-10-05 17:06:56 +020064 ConnectToSocket *os.File
65
Leopoldacfad5b2023-01-15 14:05:25 +010066 // When PcapDump is set, all traffic is dumped to a pcap file in the
67 // runtime directory (e.g. "net0.pcap" for the first interface).
68 PcapDump bool
69
Leopold20a036e2023-01-15 00:17:19 +010070 // SerialPort is an io.ReadWriter over which you can communicate with the serial
71 // port of the machine. It can be set to an existing file descriptor (like
Serge Bazanski66e58952021-10-05 17:06:56 +020072 // os.Stdout/os.Stderr) or any Go structure implementing this interface.
73 SerialPort io.ReadWriter
74
75 // NodeParameters is passed into the VM and subsequently used for bootstrapping or
76 // registering into a cluster.
77 NodeParameters *apb.NodeParameters
Mateusz Zalega0246f5e2022-04-22 17:29:04 +020078
79 // Mac is the node's MAC address.
80 Mac *net.HardwareAddr
81
82 // Runtime keeps the node's QEMU runtime state.
83 Runtime *NodeRuntime
84}
85
// NodeRuntime carries what LaunchNode needs to (re)start a node's QEMU
// instance: where the node's on-disk state and sockets live, and the context
// the current instance runs under.
type NodeRuntime struct {
	// ld is the node's launch (state) directory. It holds the node's storage
	// image, firmware variables and TPM state.
	ld string
	// sd is the node's socket directory, e.g. for the swtpm control socket. It
	// is kept apart from ld so that socket paths stay short.
	sd string

	// ctxT is the context under which the node's current QEMU process runs.
	ctxT context.Context
	// CtxC cancels ctxT, which stops the node's current QEMU process.
	CtxC context.CancelFunc
}
99
100// NodePorts is the list of ports a fully operational Metropolis node listens on
Serge Bazanski52304a82021-10-29 16:56:18 +0200101var NodePorts = []node.Port{
Serge Bazanski66e58952021-10-05 17:06:56 +0200102 node.ConsensusPort,
103
104 node.CuratorServicePort,
105 node.DebugServicePort,
106
107 node.KubernetesAPIPort,
Lorenz Bruncc078df2021-12-23 11:51:55 +0100108 node.KubernetesAPIWrappedPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200109 node.CuratorServicePort,
110 node.DebuggerPort,
111}
112
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200113// setupRuntime creates the node's QEMU runtime directory, together with all
114// files required to preserve its state, a level below the chosen path ld. The
115// node's socket directory is similarily created a level below sd. It may
116// return an I/O error.
117func setupRuntime(ld, sd string) (*NodeRuntime, error) {
118 // Create a temporary directory to keep all the runtime files.
119 stdp, err := os.MkdirTemp(ld, "node_state*")
120 if err != nil {
121 return nil, fmt.Errorf("failed to create the state directory: %w", err)
122 }
123
124 // Initialize the node's storage with a prebuilt image.
125 si, err := datafile.ResolveRunfile("metropolis/node/node.img")
126 if err != nil {
127 return nil, fmt.Errorf("while resolving a path: %w", err)
128 }
129 di := filepath.Join(stdp, filepath.Base(si))
Serge Bazanski05f813b2023-03-16 17:58:39 +0100130 launch.Log("Cluster: copying node image: %s -> %s", si, di)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200131 if err := copyFile(si, di); err != nil {
132 return nil, fmt.Errorf("while copying the node image: %w", err)
133 }
134
135 // Initialize the OVMF firmware variables file.
136 sv, err := datafile.ResolveRunfile("external/edk2/OVMF_VARS.fd")
137 if err != nil {
138 return nil, fmt.Errorf("while resolving a path: %w", err)
139 }
140 dv := filepath.Join(stdp, filepath.Base(sv))
141 if err := copyFile(sv, dv); err != nil {
142 return nil, fmt.Errorf("while copying firmware variables: %w", err)
143 }
144
145 // Create the TPM state directory and initialize all files required by swtpm.
146 tpmt := filepath.Join(stdp, "tpm")
147 if err := os.Mkdir(tpmt, 0755); err != nil {
148 return nil, fmt.Errorf("while creating the TPM directory: %w", err)
149 }
150 tpms, err := datafile.ResolveRunfile("metropolis/node/tpm")
151 if err != nil {
152 return nil, fmt.Errorf("while resolving a path: %w", err)
153 }
154 tpmf, err := os.ReadDir(tpms)
155 if err != nil {
156 return nil, fmt.Errorf("failed to read TPM directory: %w", err)
157 }
158 for _, file := range tpmf {
159 name := file.Name()
160 src, err := datafile.ResolveRunfile(filepath.Join(tpms, name))
161 if err != nil {
162 return nil, fmt.Errorf("while resolving a path: %w", err)
163 }
164 tgt := filepath.Join(tpmt, name)
165 if err := copyFile(src, tgt); err != nil {
166 return nil, fmt.Errorf("while copying TPM state: file %q to %q: %w", src, tgt, err)
167 }
168 }
169
170 // Create the socket directory.
171 sotdp, err := os.MkdirTemp(sd, "node_sock*")
172 if err != nil {
173 return nil, fmt.Errorf("failed to create the socket directory: %w", err)
174 }
175
176 return &NodeRuntime{
177 ld: stdp,
178 sd: sotdp,
179 }, nil
180}
181
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200182// CuratorClient returns an authenticated owner connection to a Curator
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200183// instance within Cluster c, or nil together with an error.
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200184func (c *Cluster) CuratorClient() (*grpc.ClientConn, error) {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200185 if c.authClient == nil {
Serge Bazanski8535cb52023-03-29 14:15:08 +0200186 authCreds := rpc.NewAuthenticatedCredentials(c.Owner, rpc.WantInsecure())
Serge Bazanski58ddc092022-06-30 18:23:33 +0200187 r := resolver.New(c.ctxT, resolver.WithLogger(func(f string, args ...interface{}) {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100188 launch.Log("Cluster: client resolver: %s", fmt.Sprintf(f, args...))
Serge Bazanski58ddc092022-06-30 18:23:33 +0200189 }))
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200190 for _, n := range c.NodeIDs {
191 ep, err := resolver.NodeWithDefaultPort(n)
192 if err != nil {
193 return nil, fmt.Errorf("could not add node %q by DNS: %v", n, err)
194 }
195 r.AddEndpoint(ep)
196 }
197 authClient, err := grpc.Dial(resolver.MetropolisControlAddress,
198 grpc.WithTransportCredentials(authCreds),
199 grpc.WithResolvers(r),
200 grpc.WithContextDialer(c.DialNode),
201 )
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200202 if err != nil {
203 return nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
204 }
205 c.authClient = authClient
206 }
207 return c.authClient, nil
208}
209
Serge Bazanski66e58952021-10-05 17:06:56 +0200210// LaunchNode launches a single Metropolis node instance with the given options.
211// The instance runs mostly paravirtualized but with some emulated hardware
212// similar to how a cloud provider might set up its VMs. The disk is fully
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200213// writable, and the changes are kept across reboots and shutdowns. ld and sd
214// point to the launch directory and the socket directory, holding the nodes'
215// state files (storage, tpm state, firmware state), and UNIX socket files
216// (swtpm <-> QEMU interplay) respectively. The directories must exist before
217// LaunchNode is called. LaunchNode will update options.Runtime and options.Mac
218// if either are not initialized.
219func LaunchNode(ctx context.Context, ld, sd string, options *NodeOptions) error {
220 // TODO(mateusz@monogon.tech) try using QEMU's abstract socket namespace instead
221 // of /tmp (requires QEMU version >5.0).
Serge Bazanski66e58952021-10-05 17:06:56 +0200222 // https://github.com/qemu/qemu/commit/776b97d3605ed0fc94443048fdf988c7725e38a9).
223 // swtpm accepts already-open FDs so we can pass in an abstract socket namespace FD
224 // that we open and pass the name of it to QEMU. Not pinning this crashes both
225 // swtpm and qemu because we run into UNIX socket length limitations (for legacy
226 // reasons 108 chars).
Serge Bazanski66e58952021-10-05 17:06:56 +0200227
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200228 // If it's the node's first start, set up its runtime directories.
229 if options.Runtime == nil {
230 r, err := setupRuntime(ld, sd)
231 if err != nil {
232 return fmt.Errorf("while setting up node runtime: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200233 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200234 options.Runtime = r
Serge Bazanski66e58952021-10-05 17:06:56 +0200235 }
236
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200237 // Replace the node's context with a new one.
238 r := options.Runtime
239 if r.CtxC != nil {
240 r.CtxC()
241 }
242 r.ctxT, r.CtxC = context.WithCancel(ctx)
243
Serge Bazanski66e58952021-10-05 17:06:56 +0200244 var qemuNetType string
245 var qemuNetConfig launch.QemuValue
246 if options.ConnectToSocket != nil {
247 qemuNetType = "socket"
248 qemuNetConfig = launch.QemuValue{
249 "id": {"net0"},
250 "fd": {"3"},
251 }
252 } else {
253 qemuNetType = "user"
254 qemuNetConfig = launch.QemuValue{
255 "id": {"net0"},
256 "net": {"10.42.0.0/24"},
257 "dhcpstart": {"10.42.0.10"},
258 "hostfwd": options.Ports.ToQemuForwards(),
259 }
260 }
261
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200262 // Generate the node's MAC address if it isn't already set in NodeOptions.
263 if options.Mac == nil {
264 mac, err := generateRandomEthernetMAC()
265 if err != nil {
266 return err
267 }
268 options.Mac = mac
Serge Bazanski66e58952021-10-05 17:06:56 +0200269 }
270
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200271 tpmSocketPath := filepath.Join(r.sd, "tpm-socket")
272 fwVarPath := filepath.Join(r.ld, "OVMF_VARS.fd")
273 storagePath := filepath.Join(r.ld, "node.img")
Serge Bazanski66e58952021-10-05 17:06:56 +0200274 qemuArgs := []string{"-machine", "q35", "-accel", "kvm", "-nographic", "-nodefaults", "-m", "4096",
275 "-cpu", "host", "-smp", "sockets=1,cpus=1,cores=2,threads=2,maxcpus=4",
276 "-drive", "if=pflash,format=raw,readonly,file=external/edk2/OVMF_CODE.fd",
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200277 "-drive", "if=pflash,format=raw,file=" + fwVarPath,
278 "-drive", "if=virtio,format=raw,cache=unsafe,file=" + storagePath,
Serge Bazanski66e58952021-10-05 17:06:56 +0200279 "-netdev", qemuNetConfig.ToOption(qemuNetType),
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200280 "-device", "virtio-net-pci,netdev=net0,mac=" + options.Mac.String(),
Serge Bazanski66e58952021-10-05 17:06:56 +0200281 "-chardev", "socket,id=chrtpm,path=" + tpmSocketPath,
282 "-tpmdev", "emulator,id=tpm0,chardev=chrtpm",
283 "-device", "tpm-tis,tpmdev=tpm0",
284 "-device", "virtio-rng-pci",
285 "-serial", "stdio"}
286
287 if !options.AllowReboot {
288 qemuArgs = append(qemuArgs, "-no-reboot")
289 }
290
291 if options.NodeParameters != nil {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200292 parametersPath := filepath.Join(r.ld, "parameters.pb")
Serge Bazanski66e58952021-10-05 17:06:56 +0200293 parametersRaw, err := proto.Marshal(options.NodeParameters)
294 if err != nil {
295 return fmt.Errorf("failed to encode node paraeters: %w", err)
296 }
Lorenz Brun764a2de2021-11-22 16:26:36 +0100297 if err := os.WriteFile(parametersPath, parametersRaw, 0644); err != nil {
Serge Bazanski66e58952021-10-05 17:06:56 +0200298 return fmt.Errorf("failed to write node parameters: %w", err)
299 }
300 qemuArgs = append(qemuArgs, "-fw_cfg", "name=dev.monogon.metropolis/parameters.pb,file="+parametersPath)
301 }
302
Leopoldacfad5b2023-01-15 14:05:25 +0100303 if options.PcapDump {
304 var qemuNetDump launch.QemuValue
305 pcapPath := filepath.Join(r.ld, "net0.pcap")
306 if options.PcapDump {
307 qemuNetDump = launch.QemuValue{
308 "id": {"net0"},
309 "netdev": {"net0"},
310 "file": {pcapPath},
311 }
312 }
313 qemuArgs = append(qemuArgs, "-object", qemuNetDump.ToOption("filter-dump"))
314 }
315
Serge Bazanski66e58952021-10-05 17:06:56 +0200316 // Start TPM emulator as a subprocess
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200317 tpmCtx, tpmCancel := context.WithCancel(options.Runtime.ctxT)
Serge Bazanski66e58952021-10-05 17:06:56 +0200318 defer tpmCancel()
319
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200320 tpmd := filepath.Join(r.ld, "tpm")
321 tpmEmuCmd := exec.CommandContext(tpmCtx, "swtpm", "socket", "--tpm2", "--tpmstate", "dir="+tpmd, "--ctrl", "type=unixio,path="+tpmSocketPath)
Serge Bazanski66e58952021-10-05 17:06:56 +0200322 tpmEmuCmd.Stderr = os.Stderr
323 tpmEmuCmd.Stdout = os.Stdout
324
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200325 err := tpmEmuCmd.Start()
Serge Bazanski66e58952021-10-05 17:06:56 +0200326 if err != nil {
327 return fmt.Errorf("failed to start TPM emulator: %w", err)
328 }
329
Mateusz Zalegae90f4a12022-05-25 18:24:01 +0200330 // Wait for the socket to be created by the TPM emulator before launching
331 // QEMU.
332 for {
333 _, err := os.Stat(tpmSocketPath)
334 if err == nil {
335 break
336 }
337 if err != nil && !os.IsNotExist(err) {
338 return fmt.Errorf("while stat-ing TPM socket path: %w", err)
339 }
340 if err := tpmCtx.Err(); err != nil {
341 return fmt.Errorf("while waiting for the TPM socket: %w", err)
342 }
343 time.Sleep(time.Millisecond * 100)
344 }
345
Serge Bazanski66e58952021-10-05 17:06:56 +0200346 // Start the main qemu binary
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200347 systemCmd := exec.CommandContext(options.Runtime.ctxT, "qemu-system-x86_64", qemuArgs...)
Serge Bazanski66e58952021-10-05 17:06:56 +0200348 if options.ConnectToSocket != nil {
349 systemCmd.ExtraFiles = []*os.File{options.ConnectToSocket}
350 }
351
352 var stdErrBuf bytes.Buffer
353 systemCmd.Stderr = &stdErrBuf
354 systemCmd.Stdout = options.SerialPort
355
Leopoldaf5086b2023-01-15 14:12:42 +0100356 launch.PrettyPrintQemuArgs(options.Name, systemCmd.Args)
357
Serge Bazanski66e58952021-10-05 17:06:56 +0200358 err = systemCmd.Run()
359
360 // Stop TPM emulator and wait for it to exit to properly reap the child process
361 tpmCancel()
Serge Bazanski05f813b2023-03-16 17:58:39 +0100362 launch.Log("Node: Waiting for TPM emulator to exit")
Serge Bazanski66e58952021-10-05 17:06:56 +0200363 // Wait returns a SIGKILL error because we just cancelled its context.
364 // We still need to call it to avoid creating zombies.
365 _ = tpmEmuCmd.Wait()
Serge Bazanski05f813b2023-03-16 17:58:39 +0100366 launch.Log("Node: TPM emulator done")
Serge Bazanski66e58952021-10-05 17:06:56 +0200367
368 var exerr *exec.ExitError
369 if err != nil && errors.As(err, &exerr) {
370 status := exerr.ProcessState.Sys().(syscall.WaitStatus)
371 if status.Signaled() && status.Signal() == syscall.SIGKILL {
372 // Process was killed externally (most likely by our context being canceled).
373 // This is a normal exit for us, so return nil
374 return nil
375 }
376 exerr.Stderr = stdErrBuf.Bytes()
377 newErr := launch.QEMUError(*exerr)
378 return &newErr
379 }
380 return err
381}
382
// copyFile duplicates the contents of the file at src into dst. dst is created
// if missing and truncated otherwise. On success the destination is closed
// explicitly so that a close error is reported to the caller instead of being
// dropped.
func copyFile(src, dst string) error {
	srcFile, openErr := os.Open(src)
	if openErr != nil {
		return fmt.Errorf("when opening source: %w", openErr)
	}
	defer srcFile.Close()

	dstFile, createErr := os.Create(dst)
	if createErr != nil {
		return fmt.Errorf("when creating destination: %w", createErr)
	}
	// Safety net for the error path below; on success the explicit Close at
	// the end runs first, and the error of this second Close is ignored.
	defer dstFile.Close()

	if _, copyErr := io.Copy(dstFile, srcFile); copyErr != nil {
		return fmt.Errorf("when copying file: %w", copyErr)
	}
	return dstFile.Close()
}
402
Serge Bazanskie78a0892021-10-07 17:03:49 +0200403// getNodes wraps around Management.GetNodes to return a list of nodes in a
404// cluster.
405func getNodes(ctx context.Context, mgmt apb.ManagementClient) ([]*apb.Node, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200406 var res []*apb.Node
Serge Bazanski636032e2022-01-26 14:21:33 +0100407 bo := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
Serge Bazanski075465c2021-11-16 15:38:49 +0100408 err := backoff.Retry(func() error {
409 res = nil
410 srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
Serge Bazanskie78a0892021-10-07 17:03:49 +0200411 if err != nil {
Serge Bazanski075465c2021-11-16 15:38:49 +0100412 return fmt.Errorf("GetNodes: %w", err)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200413 }
Serge Bazanski075465c2021-11-16 15:38:49 +0100414 for {
415 node, err := srvN.Recv()
416 if err == io.EOF {
417 break
418 }
419 if err != nil {
420 return fmt.Errorf("GetNodes.Recv: %w", err)
421 }
422 res = append(res, node)
423 }
424 return nil
425 }, bo)
426 if err != nil {
427 return nil, err
Serge Bazanskie78a0892021-10-07 17:03:49 +0200428 }
429 return res, nil
430}
431
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200432// getNode wraps Management.GetNodes. It returns node information matching
433// given node ID.
434func getNode(ctx context.Context, mgmt apb.ManagementClient, id string) (*apb.Node, error) {
435 nodes, err := getNodes(ctx, mgmt)
436 if err != nil {
437 return nil, fmt.Errorf("could not get nodes: %w", err)
438 }
439 for _, n := range nodes {
440 eid := identity.NodeID(n.Pubkey)
441 if eid != id {
442 continue
443 }
444 return n, nil
445 }
446 return nil, fmt.Errorf("no such node.")
447}
448
// generateRandomEthernetMAC returns a random EUI-48 Ethernet MAC address that
// is marked as locally administered and individual (unicast). It only fails if
// reading from the cryptographic random source fails; that error is wrapped.
func generateRandomEthernetMAC() (*net.HardwareAddr, error) {
	macBuf := make([]byte, 6)
	if _, err := rand.Read(macBuf); err != nil {
		return nil, fmt.Errorf("failed to read randomness for MAC: %w", err)
	}

	// Set the U/L bit (locally administered) and clear the I/G bit (individual
	// address). Ref IEEE 802-2014 Section 8.2.2.
	macBuf[0] = (macBuf[0] | 0x02) &^ 0x01
	mac := net.HardwareAddr(macBuf)
	return &mac, nil
}
463
Serge Bazanskibe742842022-04-04 13:18:50 +0200464const SOCKSPort uint16 = 1080
Serge Bazanski66e58952021-10-05 17:06:56 +0200465
Serge Bazanskibe742842022-04-04 13:18:50 +0200466// ClusterPorts contains all ports handled by Nanoswitch.
467var ClusterPorts = []uint16{
468 // Forwarded to the first node.
469 uint16(node.CuratorServicePort),
470 uint16(node.DebugServicePort),
471 uint16(node.KubernetesAPIPort),
472 uint16(node.KubernetesAPIWrappedPort),
473
474 // SOCKS proxy to the switch network
475 SOCKSPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200476}
477
// ClusterOptions configures how LaunchCluster brings up a Metropolis cluster.
type ClusterOptions struct {
	// NumNodes is how many nodes the cluster starts with. LaunchCluster refuses
	// values of zero or less.
	NumNodes int

	// NodeLogsToFiles redirects each node's serial output into its own file
	// instead of the default console output; the path of each file is still
	// logged so it can be found.
	//
	// The files are placed in the cluster's launch directory, which is created
	// inside TEST_TMPDIR (or the default temporary directory when that is not
	// set).
	NodeLogsToFiles bool
}
490
491// Cluster is the running Metropolis cluster launched using the LaunchCluster
492// function.
493type Cluster struct {
Serge Bazanski66e58952021-10-05 17:06:56 +0200494 // Owner is the TLS Certificate of the owner of the test cluster. This can be
495 // used to authenticate further clients to the running cluster.
496 Owner tls.Certificate
497 // Ports is the PortMap used to access the first nodes' services (defined in
Serge Bazanskibe742842022-04-04 13:18:50 +0200498 // ClusterPorts) and the SOCKS proxy (at SOCKSPort).
Serge Bazanski66e58952021-10-05 17:06:56 +0200499 Ports launch.PortMap
500
Serge Bazanskibe742842022-04-04 13:18:50 +0200501 // Nodes is a map from Node ID to its runtime information.
502 Nodes map[string]*NodeInCluster
503 // NodeIDs is a list of node IDs that are backing this cluster, in order of
504 // creation.
505 NodeIDs []string
506
Serge Bazanski66e58952021-10-05 17:06:56 +0200507 // nodesDone is a list of channels populated with the return codes from all the
508 // nodes' qemu instances. It's used by Close to ensure all nodes have
Leopold20a036e2023-01-15 00:17:19 +0100509 // successfully been stopped.
Serge Bazanski66e58952021-10-05 17:06:56 +0200510 nodesDone []chan error
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200511 // nodeOpts are the cluster member nodes' mutable launch options, kept here
512 // to facilitate reboots.
513 nodeOpts []NodeOptions
514 // launchDir points at the directory keeping the nodes' state, such as storage
515 // images, firmware variable files, TPM state.
516 launchDir string
517 // socketDir points at the directory keeping UNIX socket files, such as these
518 // used to facilitate communication between QEMU and swtpm. It's different
519 // from launchDir, and anchored nearer the file system root, due to the
520 // socket path length limitation imposed by the kernel.
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100521 socketDir string
522 metroctlDir string
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200523
Serge Bazanskibe742842022-04-04 13:18:50 +0200524 // socksDialer is used by DialNode to establish connections to nodes via the
525 // SOCKS server ran by nanoswitch.
526 socksDialer proxy.Dialer
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200527
528 // authClient is a cached authenticated owner connection to a Curator
529 // instance within the cluster.
530 authClient *grpc.ClientConn
531
532 // ctxT is the context individual node contexts are created from.
533 ctxT context.Context
534 // ctxC is used by Close to cancel the context under which the nodes are
535 // running.
536 ctxC context.CancelFunc
Serge Bazanskibe742842022-04-04 13:18:50 +0200537}
538
// NodeInCluster describes one node that is a member of a Cluster.
type NodeInCluster struct {
	// ID is the node's identifier; pass it to DialNode to reach the node's
	// services.
	ID string
	// ManagementAddress is the node's address on the nanoswitch network. The
	// host cannot reach it directly, only through DialNode or the nanoswitch
	// SOCKS proxy (exposed at Cluster.Ports[SOCKSPort]).
	ManagementAddress string
}
548
549// firstConnection performs the initial owner credential escrow with a newly
550// started nanoswitch-backed cluster over SOCKS. It expects the first node to be
551// running at 10.1.0.2, which is always the case with the current nanoswitch
552// implementation.
553//
Leopold20a036e2023-01-15 00:17:19 +0100554// It returns the newly escrowed credentials as well as the first node's
Serge Bazanskibe742842022-04-04 13:18:50 +0200555// information as NodeInCluster.
556func firstConnection(ctx context.Context, socksDialer proxy.Dialer) (*tls.Certificate, *NodeInCluster, error) {
557 // Dial external service.
558 remote := fmt.Sprintf("10.1.0.2:%s", node.CuratorServicePort.PortString())
559 initCreds, err := rpc.NewEphemeralCredentials(InsecurePrivateKey, nil)
560 if err != nil {
561 return nil, nil, fmt.Errorf("NewEphemeralCredentials: %w", err)
562 }
563 initDialer := func(_ context.Context, addr string) (net.Conn, error) {
564 return socksDialer.Dial("tcp", addr)
565 }
566 initClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(initCreds))
567 if err != nil {
568 return nil, nil, fmt.Errorf("dialing with ephemeral credentials failed: %w", err)
569 }
570 defer initClient.Close()
571
572 // Retrieve owner certificate - this can take a while because the node is still
573 // coming up, so do it in a backoff loop.
Serge Bazanski05f813b2023-03-16 17:58:39 +0100574 launch.Log("Cluster: retrieving owner certificate (this can take a few seconds while the first node boots)...")
Serge Bazanskibe742842022-04-04 13:18:50 +0200575 aaa := apb.NewAAAClient(initClient)
576 var cert *tls.Certificate
577 err = backoff.Retry(func() error {
578 cert, err = rpc.RetrieveOwnerCertificate(ctx, aaa, InsecurePrivateKey)
579 if st, ok := status.FromError(err); ok {
580 if st.Code() == codes.Unavailable {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100581 launch.Log("Cluster: cluster UNAVAILABLE: %v", st.Message())
Serge Bazanskibe742842022-04-04 13:18:50 +0200582 return err
583 }
584 }
585 return backoff.Permanent(err)
586 }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
587 if err != nil {
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200588 return nil, nil, fmt.Errorf("couldn't retrieve owner certificate: %w", err)
Serge Bazanskibe742842022-04-04 13:18:50 +0200589 }
Serge Bazanski05f813b2023-03-16 17:58:39 +0100590 launch.Log("Cluster: retrieved owner certificate.")
Serge Bazanskibe742842022-04-04 13:18:50 +0200591
592 // Now connect authenticated and get the node ID.
Serge Bazanski8535cb52023-03-29 14:15:08 +0200593 creds := rpc.NewAuthenticatedCredentials(*cert, rpc.WantInsecure())
Serge Bazanskibe742842022-04-04 13:18:50 +0200594 authClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(creds))
595 if err != nil {
596 return nil, nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
597 }
598 defer authClient.Close()
599 mgmt := apb.NewManagementClient(authClient)
600
601 var node *NodeInCluster
602 err = backoff.Retry(func() error {
603 nodes, err := getNodes(ctx, mgmt)
604 if err != nil {
605 return fmt.Errorf("retrieving nodes failed: %w", err)
606 }
607 if len(nodes) != 1 {
608 return fmt.Errorf("expected one node, got %d", len(nodes))
609 }
610 n := nodes[0]
611 if n.Status == nil || n.Status.ExternalAddress == "" {
612 return fmt.Errorf("node has no status and/or address")
613 }
614 node = &NodeInCluster{
615 ID: identity.NodeID(n.Pubkey),
616 ManagementAddress: n.Status.ExternalAddress,
617 }
618 return nil
619 }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
620 if err != nil {
621 return nil, nil, err
622 }
623
624 return cert, node, nil
Serge Bazanski66e58952021-10-05 17:06:56 +0200625}
626
// NewSerialFileLogger opens the file at p as a sink for a node's serial output
// (suitable for NodeOptions.SerialPort). The file is created with mode 0600 if
// it does not exist.
//
// Output is appended. Reopening an existing log (e.g. for a restarted node)
// therefore extends it rather than overwriting it from the start, which would
// otherwise leave stale trailing bytes. The file is opened write-only, so reads
// from the returned value fail. The returned value is an *os.File; closing it
// is up to the caller.
func NewSerialFileLogger(p string) (io.ReadWriter, error) {
	f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0600)
	if err != nil {
		return nil, err
	}
	return f, nil
}
634
Serge Bazanski66e58952021-10-05 17:06:56 +0200635// LaunchCluster launches a cluster of Metropolis node VMs together with a
636// Nanoswitch instance to network them all together.
637//
638// The given context will be used to run all qemu instances in the cluster, and
639// canceling the context or calling Close() will terminate them.
640func LaunchCluster(ctx context.Context, opts ClusterOptions) (*Cluster, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200641 if opts.NumNodes <= 0 {
Serge Bazanski66e58952021-10-05 17:06:56 +0200642 return nil, errors.New("refusing to start cluster with zero nodes")
643 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200644
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200645 // Create the launch directory.
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100646 ld, err := os.MkdirTemp(os.Getenv("TEST_TMPDIR"), "cluster-*")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200647 if err != nil {
648 return nil, fmt.Errorf("failed to create the launch directory: %w", err)
649 }
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100650 // Create the metroctl config directory. We keep it in /tmp because in some
651 // scenarios it's end-user visible and we want it short.
652 md, err := os.MkdirTemp("/tmp", "metroctl-*")
653 if err != nil {
654 return nil, fmt.Errorf("failed to create the metroctl directory: %w", err)
655 }
656
657 // Create the socket directory. We keep it in /tmp because of socket path limits.
658 sd, err := os.MkdirTemp("/tmp", "cluster-*")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200659 if err != nil {
660 return nil, fmt.Errorf("failed to create the socket directory: %w", err)
661 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200662
663 // Prepare links between nodes and nanoswitch.
664 var switchPorts []*os.File
665 var vmPorts []*os.File
666 for i := 0; i < opts.NumNodes; i++ {
667 switchPort, vmPort, err := launch.NewSocketPair()
668 if err != nil {
Serge Bazanski66e58952021-10-05 17:06:56 +0200669 return nil, fmt.Errorf("failed to get socketpair: %w", err)
670 }
671 switchPorts = append(switchPorts, switchPort)
672 vmPorts = append(vmPorts, vmPort)
673 }
674
Serge Bazanskie78a0892021-10-07 17:03:49 +0200675 // Make a list of channels that will be populated by all running node qemu
676 // processes.
Serge Bazanski66e58952021-10-05 17:06:56 +0200677 done := make([]chan error, opts.NumNodes)
678 for i, _ := range done {
679 done[i] = make(chan error, 1)
680 }
681
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200682 // Prepare the node options. These will be kept as part of Cluster.
683 // nodeOpts[].Runtime will be initialized by LaunchNode during the first
684 // launch. The runtime information can be later used to restart a node.
685 // The 0th node will be initialized first. The rest will follow after it
686 // had bootstrapped the cluster.
687 nodeOpts := make([]NodeOptions, opts.NumNodes)
688 nodeOpts[0] = NodeOptions{
Leopoldaf5086b2023-01-15 14:12:42 +0100689 Name: "node0",
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200690 ConnectToSocket: vmPorts[0],
691 NodeParameters: &apb.NodeParameters{
692 Cluster: &apb.NodeParameters_ClusterBootstrap_{
693 ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
694 OwnerPublicKey: InsecurePublicKey,
Serge Bazanski66e58952021-10-05 17:06:56 +0200695 },
696 },
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200697 },
698 SerialPort: newPrefixedStdio(0),
Leopoldacfad5b2023-01-15 14:05:25 +0100699 PcapDump: true,
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200700 }
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100701 if opts.NodeLogsToFiles {
702 path := path.Join(ld, "node-1.txt")
703 port, err := NewSerialFileLogger(path)
704 if err != nil {
705 return nil, fmt.Errorf("could not open log file for node 1: %w", err)
706 }
707 launch.Log("Node 1 logs at %s", path)
708 nodeOpts[0].SerialPort = port
709 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200710
711 // Start the first node.
712 ctxT, ctxC := context.WithCancel(ctx)
Serge Bazanski05f813b2023-03-16 17:58:39 +0100713 launch.Log("Cluster: Starting node %d...", 1)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200714 go func() {
715 err := LaunchNode(ctxT, ld, sd, &nodeOpts[0])
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200716 if err != nil {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100717 launch.Log("Node %d finished with an error: %v", 1, err)
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200718 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200719 done[0] <- err
720 }()
721
Serge Bazanskie78a0892021-10-07 17:03:49 +0200722 // Launch nanoswitch.
Serge Bazanski66e58952021-10-05 17:06:56 +0200723 portMap, err := launch.ConflictFreePortMap(ClusterPorts)
724 if err != nil {
725 ctxC()
726 return nil, fmt.Errorf("failed to allocate ephemeral ports: %w", err)
727 }
728
729 go func() {
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100730 var serialPort io.ReadWriter
731 if opts.NodeLogsToFiles {
732 path := path.Join(ld, "nanoswitch.txt")
733 serialPort, err = NewSerialFileLogger(path)
734 if err != nil {
735 launch.Log("Could not open log file for nanoswitch: %v", err)
736 }
737 launch.Log("Nanoswitch logs at %s", path)
738 } else {
739 serialPort = newPrefixedStdio(99)
740 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200741 if err := launch.RunMicroVM(ctxT, &launch.MicroVMOptions{
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100742 Name: "nanoswitch",
Serge Bazanski66e58952021-10-05 17:06:56 +0200743 KernelPath: "metropolis/test/ktest/vmlinux",
Lorenz Brunb6a9d3c2022-01-27 18:56:20 +0100744 InitramfsPath: "metropolis/test/nanoswitch/initramfs.cpio.lz4",
Serge Bazanski66e58952021-10-05 17:06:56 +0200745 ExtraNetworkInterfaces: switchPorts,
746 PortMap: portMap,
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100747 SerialPort: serialPort,
Leopoldacfad5b2023-01-15 14:05:25 +0100748 PcapDump: path.Join(ld, "nanoswitch.pcap"),
Serge Bazanski66e58952021-10-05 17:06:56 +0200749 }); err != nil {
750 if !errors.Is(err, ctxT.Err()) {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100751 launch.Fatal("Failed to launch nanoswitch: %v", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200752 }
753 }
754 }()
755
Serge Bazanskibe742842022-04-04 13:18:50 +0200756 // Build SOCKS dialer.
757 socksRemote := fmt.Sprintf("localhost:%v", portMap[SOCKSPort])
758 socksDialer, err := proxy.SOCKS5("tcp", socksRemote, nil, proxy.Direct)
Serge Bazanski66e58952021-10-05 17:06:56 +0200759 if err != nil {
760 ctxC()
Serge Bazanskibe742842022-04-04 13:18:50 +0200761 return nil, fmt.Errorf("failed to build SOCKS dialer: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200762 }
763
Serge Bazanskibe742842022-04-04 13:18:50 +0200764 // Retrieve owner credentials and first node.
765 cert, firstNode, err := firstConnection(ctxT, socksDialer)
Serge Bazanski66e58952021-10-05 17:06:56 +0200766 if err != nil {
767 ctxC()
768 return nil, err
769 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200770
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100771 // Write credentials to the metroctl directory.
772 if err := metroctl.WriteOwnerKey(md, cert.PrivateKey.(ed25519.PrivateKey)); err != nil {
773 ctxC()
774 return nil, fmt.Errorf("could not write owner key: %w", err)
775 }
776 if err := metroctl.WriteOwnerCertificate(md, cert.Certificate[0]); err != nil {
777 ctxC()
778 return nil, fmt.Errorf("could not write owner certificate: %w", err)
779 }
780
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200781 // Set up a partially initialized cluster instance, to be filled in in the
782 // later steps.
Serge Bazanskibe742842022-04-04 13:18:50 +0200783 cluster := &Cluster{
784 Owner: *cert,
785 Ports: portMap,
786 Nodes: map[string]*NodeInCluster{
787 firstNode.ID: firstNode,
788 },
789 NodeIDs: []string{
790 firstNode.ID,
791 },
792
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100793 nodesDone: done,
794 nodeOpts: nodeOpts,
795 launchDir: ld,
796 socketDir: sd,
797 metroctlDir: md,
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200798
Serge Bazanskibe742842022-04-04 13:18:50 +0200799 socksDialer: socksDialer,
800
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200801 ctxT: ctxT,
Serge Bazanskibe742842022-04-04 13:18:50 +0200802 ctxC: ctxC,
803 }
804
805 // Now start the rest of the nodes and register them into the cluster.
806
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200807 // Get an authenticated owner client within the cluster.
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200808 curC, err := cluster.CuratorClient()
Serge Bazanski66e58952021-10-05 17:06:56 +0200809 if err != nil {
810 ctxC()
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200811 return nil, fmt.Errorf("CuratorClient: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200812 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200813 mgmt := apb.NewManagementClient(curC)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200814
815 // Retrieve register ticket to register further nodes.
Serge Bazanski05f813b2023-03-16 17:58:39 +0100816 launch.Log("Cluster: retrieving register ticket...")
Serge Bazanskie78a0892021-10-07 17:03:49 +0200817 resT, err := mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{})
818 if err != nil {
819 ctxC()
820 return nil, fmt.Errorf("GetRegisterTicket: %w", err)
821 }
822 ticket := resT.Ticket
Serge Bazanski05f813b2023-03-16 17:58:39 +0100823 launch.Log("Cluster: retrieved register ticket (%d bytes).", len(ticket))
Serge Bazanskie78a0892021-10-07 17:03:49 +0200824
825 // Retrieve cluster info (for directory and ca public key) to register further
826 // nodes.
827 resI, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
828 if err != nil {
829 ctxC()
830 return nil, fmt.Errorf("GetClusterInfo: %w", err)
831 }
832
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200833 // Use the retrieved information to configure the rest of the node options.
834 for i := 1; i < opts.NumNodes; i++ {
835 nodeOpts[i] = NodeOptions{
Leopoldaf5086b2023-01-15 14:12:42 +0100836 Name: fmt.Sprintf("node%d", i),
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200837 ConnectToSocket: vmPorts[i],
838 NodeParameters: &apb.NodeParameters{
839 Cluster: &apb.NodeParameters_ClusterRegister_{
840 ClusterRegister: &apb.NodeParameters_ClusterRegister{
841 RegisterTicket: ticket,
842 ClusterDirectory: resI.ClusterDirectory,
843 CaCertificate: resI.CaCertificate,
844 },
845 },
846 },
847 SerialPort: newPrefixedStdio(i),
848 }
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100849 if opts.NodeLogsToFiles {
850 path := path.Join(ld, fmt.Sprintf("node-%d.txt", i+1))
851 port, err := NewSerialFileLogger(path)
852 if err != nil {
853 return nil, fmt.Errorf("could not open log file for node %d: %w", i+1, err)
854 }
855 launch.Log("Node %d logs at %s", i+1, path)
856 nodeOpts[i].SerialPort = port
857 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200858 }
859
860 // Now run the rest of the nodes.
861 //
Serge Bazanskie78a0892021-10-07 17:03:49 +0200862 // TODO(q3k): parallelize this
863 for i := 1; i < opts.NumNodes; i++ {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100864 launch.Log("Cluster: Starting node %d...", i+1)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200865 go func(i int) {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200866 err := LaunchNode(ctxT, ld, sd, &nodeOpts[i])
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200867 if err != nil {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100868 launch.Log("Node %d finished with an error: %v", i, err)
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200869 }
Serge Bazanskie78a0892021-10-07 17:03:49 +0200870 done[i] <- err
871 }(i)
872 var newNode *apb.Node
873
Serge Bazanski05f813b2023-03-16 17:58:39 +0100874 launch.Log("Cluster: waiting for node %d to appear as NEW...", i)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200875 for {
876 nodes, err := getNodes(ctx, mgmt)
877 if err != nil {
878 ctxC()
879 return nil, fmt.Errorf("could not get nodes: %w", err)
880 }
881 for _, n := range nodes {
882 if n.State == cpb.NodeState_NODE_STATE_NEW {
883 newNode = n
884 break
885 }
886 }
887 if newNode != nil {
888 break
889 }
890 time.Sleep(1 * time.Second)
891 }
892 id := identity.NodeID(newNode.Pubkey)
Serge Bazanski05f813b2023-03-16 17:58:39 +0100893 launch.Log("Cluster: node %d is %s", i, id)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200894
Serge Bazanski05f813b2023-03-16 17:58:39 +0100895 launch.Log("Cluster: approving node %d", i)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200896 _, err := mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
897 Pubkey: newNode.Pubkey,
898 })
899 if err != nil {
900 ctxC()
901 return nil, fmt.Errorf("ApproveNode(%s): %w", id, err)
902 }
Serge Bazanski05f813b2023-03-16 17:58:39 +0100903 launch.Log("Cluster: node %d approved, waiting for it to appear as UP and with a network address...", i)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200904 for {
905 nodes, err := getNodes(ctx, mgmt)
906 if err != nil {
907 ctxC()
908 return nil, fmt.Errorf("could not get nodes: %w", err)
909 }
910 found := false
911 for _, n := range nodes {
912 if !bytes.Equal(n.Pubkey, newNode.Pubkey) {
913 continue
914 }
Serge Bazanskibe742842022-04-04 13:18:50 +0200915 if n.Status == nil || n.Status.ExternalAddress == "" {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200916 break
917 }
Serge Bazanskibe742842022-04-04 13:18:50 +0200918 if n.State != cpb.NodeState_NODE_STATE_UP {
919 break
920 }
921 found = true
922 cluster.Nodes[identity.NodeID(n.Pubkey)] = &NodeInCluster{
923 ID: identity.NodeID(n.Pubkey),
924 ManagementAddress: n.Status.ExternalAddress,
925 }
926 cluster.NodeIDs = append(cluster.NodeIDs, identity.NodeID(n.Pubkey))
927 break
Serge Bazanskie78a0892021-10-07 17:03:49 +0200928 }
929 if found {
930 break
931 }
Serge Bazanskibe742842022-04-04 13:18:50 +0200932 time.Sleep(time.Second)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200933 }
Serge Bazanski05f813b2023-03-16 17:58:39 +0100934 launch.Log("Cluster: node %d (%s) UP!", i, id)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200935 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200936
Serge Bazanski05f813b2023-03-16 17:58:39 +0100937 launch.Log("Cluster: all nodes up:")
Serge Bazanskibe742842022-04-04 13:18:50 +0200938 for _, node := range cluster.Nodes {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100939 launch.Log("Cluster: - %s at %s", node.ID, node.ManagementAddress)
Serge Bazanskibe742842022-04-04 13:18:50 +0200940 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200941
Serge Bazanskibe742842022-04-04 13:18:50 +0200942 return cluster, nil
Serge Bazanski66e58952021-10-05 17:06:56 +0200943}
944
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200945// RebootNode reboots the cluster member node matching the given index, and
946// waits for it to rejoin the cluster. It will use the given context ctx to run
947// cluster API requests, whereas the resulting QEMU process will be created
948// using the cluster's context c.ctxT. The nodes are indexed starting at 0.
949func (c *Cluster) RebootNode(ctx context.Context, idx int) error {
950 if idx < 0 || idx >= len(c.NodeIDs) {
951 return fmt.Errorf("index out of bounds.")
952 }
953 id := c.NodeIDs[idx]
954
955 // Get an authenticated owner client within the cluster.
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200956 curC, err := c.CuratorClient()
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200957 if err != nil {
958 return err
959 }
960 mgmt := apb.NewManagementClient(curC)
961
962 // Get the timestamp of the node's last update, as observed by Curator.
963 // It'll be needed to make sure it had rejoined the cluster after the reboot.
964 var is *apb.Node
965 for {
966 r, err := getNode(ctx, mgmt, id)
967 if err != nil {
968 return err
969 }
970
971 // Node status may be absent if it hasn't reported to the cluster yet. Wait
972 // for it to appear before progressing further.
973 if r.Status != nil {
974 is = r
975 break
976 }
977 time.Sleep(time.Second)
978 }
979
980 // Cancel the node's context. This will shut down QEMU.
981 c.nodeOpts[idx].Runtime.CtxC()
Serge Bazanski05f813b2023-03-16 17:58:39 +0100982 launch.Log("Cluster: waiting for node %d (%s) to stop.", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200983 err = <-c.nodesDone[idx]
984 if err != nil {
985 return fmt.Errorf("while restarting node: %w", err)
986 }
987
988 // Start QEMU again.
Serge Bazanski05f813b2023-03-16 17:58:39 +0100989 launch.Log("Cluster: restarting node %d (%s).", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200990 go func(n int) {
991 err := LaunchNode(c.ctxT, c.launchDir, c.socketDir, &c.nodeOpts[n])
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200992 if err != nil {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100993 launch.Log("Node %d finished with an error: %v", n, err)
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200994 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200995 c.nodesDone[n] <- err
996 }(idx)
997
998 // Poll Management.GetNodes until the node's timestamp is updated.
999 for {
1000 cs, err := getNode(ctx, mgmt, id)
1001 if err != nil {
Serge Bazanski05f813b2023-03-16 17:58:39 +01001002 launch.Log("Cluster: node get error: %v", err)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001003 return err
1004 }
Serge Bazanski05f813b2023-03-16 17:58:39 +01001005 launch.Log("Cluster: node status: %+v", cs)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001006 if cs.Status == nil {
1007 continue
1008 }
Mateusz Zalega28800ad2022-07-08 14:56:02 +02001009 if cs.Status.Timestamp.AsTime().Sub(is.Status.Timestamp.AsTime()) > 0 {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001010 break
1011 }
1012 time.Sleep(time.Second)
1013 }
Serge Bazanski05f813b2023-03-16 17:58:39 +01001014 launch.Log("Cluster: node %d (%s) has rejoined the cluster.", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001015 return nil
1016}
1017
1018// Close cancels the running clusters' context and waits for all virtualized
Serge Bazanski66e58952021-10-05 17:06:56 +02001019// nodes to stop. It returns an error if stopping the nodes failed, or one of
1020// the nodes failed to fully start in the first place.
1021func (c *Cluster) Close() error {
Serge Bazanski05f813b2023-03-16 17:58:39 +01001022 launch.Log("Cluster: stopping...")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001023 if c.authClient != nil {
1024 c.authClient.Close()
1025 }
Serge Bazanski66e58952021-10-05 17:06:56 +02001026 c.ctxC()
1027
Leopold20a036e2023-01-15 00:17:19 +01001028 var errs []error
Serge Bazanski05f813b2023-03-16 17:58:39 +01001029 launch.Log("Cluster: waiting for nodes to exit...")
Serge Bazanski66e58952021-10-05 17:06:56 +02001030 for _, c := range c.nodesDone {
1031 err := <-c
1032 if err != nil {
Leopold20a036e2023-01-15 00:17:19 +01001033 errs = append(errs, err)
Serge Bazanski66e58952021-10-05 17:06:56 +02001034 }
1035 }
Serge Bazanskid09c58f2023-03-17 00:25:08 +01001036 launch.Log("Cluster: removing nodes' state files (%s) and sockets (%s).", c.launchDir, c.socketDir)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001037 os.RemoveAll(c.launchDir)
1038 os.RemoveAll(c.socketDir)
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001039 os.RemoveAll(c.metroctlDir)
Serge Bazanski05f813b2023-03-16 17:58:39 +01001040 launch.Log("Cluster: done")
Leopold20a036e2023-01-15 00:17:19 +01001041 return multierr.Combine(errs...)
Serge Bazanski66e58952021-10-05 17:06:56 +02001042}
Serge Bazanskibe742842022-04-04 13:18:50 +02001043
1044// DialNode is a grpc.WithContextDialer compatible dialer which dials nodes by
1045// their ID. This is performed by connecting to the cluster nanoswitch via its
1046// SOCKS proxy, and using the cluster node list for name resolution.
1047//
1048// For example:
1049//
Serge Bazanski05f813b2023-03-16 17:58:39 +01001050// grpc.Dial("metropolis-deadbeef:1234", grpc.WithContextDialer(c.DialNode))
Serge Bazanskibe742842022-04-04 13:18:50 +02001051func (c *Cluster) DialNode(_ context.Context, addr string) (net.Conn, error) {
1052 host, port, err := net.SplitHostPort(addr)
1053 if err != nil {
1054 return nil, fmt.Errorf("invalid host:port: %w", err)
1055 }
1056 // Already an IP address?
1057 if net.ParseIP(host) != nil {
1058 return c.socksDialer.Dial("tcp", addr)
1059 }
1060
1061 // Otherwise, expect a node name.
1062 node, ok := c.Nodes[host]
1063 if !ok {
1064 return nil, fmt.Errorf("unknown node %q", host)
1065 }
1066 addr = net.JoinHostPort(node.ManagementAddress, port)
1067 return c.socksDialer.Dial("tcp", addr)
1068}
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001069
1070// KubernetesControllerNodeAddresses returns the list of IP addresses of nodes
1071// which are currently Kubernetes controllers, ie. run an apiserver. This list
1072// might be empty if no node is currently configured with the
1073// 'KubernetesController' node.
1074func (c *Cluster) KubernetesControllerNodeAddresses(ctx context.Context) ([]string, error) {
1075 curC, err := c.CuratorClient()
1076 if err != nil {
1077 return nil, err
1078 }
1079 mgmt := apb.NewManagementClient(curC)
1080 srv, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{
1081 Filter: "has(node.roles.kubernetes_controller)",
1082 })
1083 if err != nil {
1084 return nil, err
1085 }
1086 defer srv.CloseSend()
1087 var res []string
1088 for {
1089 n, err := srv.Recv()
1090 if err == io.EOF {
1091 break
1092 }
1093 if err != nil {
1094 return nil, err
1095 }
1096 if n.Status == nil || n.Status.ExternalAddress == "" {
1097 continue
1098 }
1099 res = append(res, n.Status.ExternalAddress)
1100 }
1101 return res, nil
1102}
Serge Bazanski630fb5c2023-04-06 10:50:24 +02001103
1104func (c *Cluster) AllNodesHealthy(ctx context.Context) error {
1105 // Get an authenticated owner client within the cluster.
1106 curC, err := c.CuratorClient()
1107 if err != nil {
1108 return err
1109 }
1110 mgmt := apb.NewManagementClient(curC)
1111 nodes, err := getNodes(ctx, mgmt)
1112 if err != nil {
1113 return err
1114 }
1115
1116 var unhealthy []string
1117 for _, node := range nodes {
1118 if node.Health == apb.Node_HEALTHY {
1119 continue
1120 }
1121 unhealthy = append(unhealthy, node.Id)
1122 }
1123 if len(unhealthy) == 0 {
1124 return nil
1125 }
1126 return fmt.Errorf("nodes unhealthy: %s", strings.Join(unhealthy, ", "))
1127}