blob: ea69d967d425faac6fe9237eb9f3b07e92b54255 [file] [log] [blame]
Serge Bazanski66e58952021-10-05 17:06:56 +02001// cluster builds on the launch package and implements launching Metropolis
2// nodes and clusters in a virtualized environment using qemu. It's kept in a
3// separate package as it depends on a Metropolis node image, which might not be
4// required for some use of the launch library.
5package cluster
6
7import (
8 "bytes"
9 "context"
Serge Bazanski1f8cad72023-03-20 16:58:10 +010010 "crypto/ed25519"
Serge Bazanski66e58952021-10-05 17:06:56 +020011 "crypto/rand"
12 "crypto/tls"
Serge Bazanski54e212a2023-06-14 13:45:11 +020013 "crypto/x509"
Serge Bazanskia0bc6d32023-06-28 18:57:40 +020014 "encoding/pem"
Serge Bazanski66e58952021-10-05 17:06:56 +020015 "errors"
16 "fmt"
17 "io"
Serge Bazanski66e58952021-10-05 17:06:56 +020018 "net"
Lorenz Brun150f24a2023-07-13 20:11:06 +020019 "net/http"
Serge Bazanski66e58952021-10-05 17:06:56 +020020 "os"
21 "os/exec"
Leopoldacfad5b2023-01-15 14:05:25 +010022 "path"
Serge Bazanski66e58952021-10-05 17:06:56 +020023 "path/filepath"
Serge Bazanski630fb5c2023-04-06 10:50:24 +020024 "strings"
Serge Bazanski66e58952021-10-05 17:06:56 +020025 "syscall"
26 "time"
27
28 "github.com/cenkalti/backoff/v4"
Serge Bazanski66e58952021-10-05 17:06:56 +020029 "go.uber.org/multierr"
Serge Bazanskibe742842022-04-04 13:18:50 +020030 "golang.org/x/net/proxy"
Serge Bazanski66e58952021-10-05 17:06:56 +020031 "google.golang.org/grpc"
Serge Bazanski636032e2022-01-26 14:21:33 +010032 "google.golang.org/grpc/codes"
33 "google.golang.org/grpc/status"
Serge Bazanski66e58952021-10-05 17:06:56 +020034 "google.golang.org/protobuf/proto"
Serge Bazanskia0bc6d32023-06-28 18:57:40 +020035 "k8s.io/client-go/kubernetes"
36 "k8s.io/client-go/rest"
Serge Bazanski66e58952021-10-05 17:06:56 +020037
Serge Bazanski1f8cad72023-03-20 16:58:10 +010038 metroctl "source.monogon.dev/metropolis/cli/metroctl/core"
Mateusz Zalega0246f5e2022-04-22 17:29:04 +020039 "source.monogon.dev/metropolis/cli/pkg/datafile"
Serge Bazanski66e58952021-10-05 17:06:56 +020040 "source.monogon.dev/metropolis/node"
Serge Bazanskie78a0892021-10-07 17:03:49 +020041 "source.monogon.dev/metropolis/node/core/identity"
Serge Bazanski66e58952021-10-05 17:06:56 +020042 "source.monogon.dev/metropolis/node/core/rpc"
Serge Bazanski5bb8a332022-06-23 17:41:33 +020043 "source.monogon.dev/metropolis/node/core/rpc/resolver"
Lorenz Brun150f24a2023-07-13 20:11:06 +020044 "source.monogon.dev/metropolis/pkg/localregistry"
Serge Bazanski66e58952021-10-05 17:06:56 +020045 apb "source.monogon.dev/metropolis/proto/api"
Serge Bazanskie78a0892021-10-07 17:03:49 +020046 cpb "source.monogon.dev/metropolis/proto/common"
Serge Bazanski66e58952021-10-05 17:06:56 +020047 "source.monogon.dev/metropolis/test/launch"
48)
49
Leopold20a036e2023-01-15 00:17:19 +010050// NodeOptions contains all options that can be passed to Launch()
Serge Bazanski66e58952021-10-05 17:06:56 +020051type NodeOptions struct {
Leopoldaf5086b2023-01-15 14:12:42 +010052 // Name is a human-readable identifier to be used in debug output.
53 Name string
54
Serge Bazanski66e58952021-10-05 17:06:56 +020055 // Ports contains the port mapping where to expose the internal ports of the VM to
56 // the host. See IdentityPortMap() and ConflictFreePortMap(). Ignored when
57 // ConnectToSocket is set.
58 Ports launch.PortMap
59
Leopold20a036e2023-01-15 00:17:19 +010060 // If set to true, reboots are honored. Otherwise, all reboots exit the Launch()
61 // command. Metropolis nodes generally restart on almost all errors, so unless you
Serge Bazanski66e58952021-10-05 17:06:56 +020062 // want to test reboot behavior this should be false.
63 AllowReboot bool
64
Leopold20a036e2023-01-15 00:17:19 +010065 // By default, the VM is connected to the Host via SLIRP. If ConnectToSocket is
66 // set, it is instead connected to the given file descriptor/socket. If this is
67 // set, all port maps from the Ports option are ignored. Intended for networking
68 // this instance together with others for running more complex network
69 // configurations.
Serge Bazanski66e58952021-10-05 17:06:56 +020070 ConnectToSocket *os.File
71
Leopoldacfad5b2023-01-15 14:05:25 +010072 // When PcapDump is set, all traffic is dumped to a pcap file in the
73 // runtime directory (e.g. "net0.pcap" for the first interface).
74 PcapDump bool
75
Leopold20a036e2023-01-15 00:17:19 +010076 // SerialPort is an io.ReadWriter over which you can communicate with the serial
77 // port of the machine. It can be set to an existing file descriptor (like
Serge Bazanski66e58952021-10-05 17:06:56 +020078 // os.Stdout/os.Stderr) or any Go structure implementing this interface.
79 SerialPort io.ReadWriter
80
81 // NodeParameters is passed into the VM and subsequently used for bootstrapping or
82 // registering into a cluster.
83 NodeParameters *apb.NodeParameters
Mateusz Zalega0246f5e2022-04-22 17:29:04 +020084
85 // Mac is the node's MAC address.
86 Mac *net.HardwareAddr
87
88 // Runtime keeps the node's QEMU runtime state.
89 Runtime *NodeRuntime
90}
91
// NodeRuntime holds the per-node QEMU runtime state that is kept across
// restarts of the same node.
type NodeRuntime struct {
	// ld is the node's launch directory. It stores the node's persistent
	// state: the storage image, the firmware variables and the TPM state.
	ld string
	// sd is the node's socket directory (e.g. for the swtpm control socket).
	sd string

	// ctxT is the context QEMU executes in. LaunchNode replaces it on every
	// launch.
	ctxT context.Context
	// CtxC cancels ctxT, which in turn terminates the node's QEMU process.
	CtxC context.CancelFunc
}
105
106// NodePorts is the list of ports a fully operational Metropolis node listens on
Serge Bazanski52304a82021-10-29 16:56:18 +0200107var NodePorts = []node.Port{
Serge Bazanski66e58952021-10-05 17:06:56 +0200108 node.ConsensusPort,
109
110 node.CuratorServicePort,
111 node.DebugServicePort,
112
113 node.KubernetesAPIPort,
Lorenz Bruncc078df2021-12-23 11:51:55 +0100114 node.KubernetesAPIWrappedPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200115 node.CuratorServicePort,
116 node.DebuggerPort,
117}
118
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200119// setupRuntime creates the node's QEMU runtime directory, together with all
120// files required to preserve its state, a level below the chosen path ld. The
121// node's socket directory is similarily created a level below sd. It may
122// return an I/O error.
123func setupRuntime(ld, sd string) (*NodeRuntime, error) {
124 // Create a temporary directory to keep all the runtime files.
125 stdp, err := os.MkdirTemp(ld, "node_state*")
126 if err != nil {
127 return nil, fmt.Errorf("failed to create the state directory: %w", err)
128 }
129
130 // Initialize the node's storage with a prebuilt image.
131 si, err := datafile.ResolveRunfile("metropolis/node/node.img")
132 if err != nil {
133 return nil, fmt.Errorf("while resolving a path: %w", err)
134 }
135 di := filepath.Join(stdp, filepath.Base(si))
Serge Bazanski05f813b2023-03-16 17:58:39 +0100136 launch.Log("Cluster: copying node image: %s -> %s", si, di)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200137 if err := copyFile(si, di); err != nil {
138 return nil, fmt.Errorf("while copying the node image: %w", err)
139 }
140
141 // Initialize the OVMF firmware variables file.
142 sv, err := datafile.ResolveRunfile("external/edk2/OVMF_VARS.fd")
143 if err != nil {
144 return nil, fmt.Errorf("while resolving a path: %w", err)
145 }
146 dv := filepath.Join(stdp, filepath.Base(sv))
147 if err := copyFile(sv, dv); err != nil {
148 return nil, fmt.Errorf("while copying firmware variables: %w", err)
149 }
150
151 // Create the TPM state directory and initialize all files required by swtpm.
152 tpmt := filepath.Join(stdp, "tpm")
Lorenz Brun150f24a2023-07-13 20:11:06 +0200153 if err := os.Mkdir(tpmt, 0o755); err != nil {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200154 return nil, fmt.Errorf("while creating the TPM directory: %w", err)
155 }
156 tpms, err := datafile.ResolveRunfile("metropolis/node/tpm")
157 if err != nil {
158 return nil, fmt.Errorf("while resolving a path: %w", err)
159 }
160 tpmf, err := os.ReadDir(tpms)
161 if err != nil {
162 return nil, fmt.Errorf("failed to read TPM directory: %w", err)
163 }
164 for _, file := range tpmf {
165 name := file.Name()
166 src, err := datafile.ResolveRunfile(filepath.Join(tpms, name))
167 if err != nil {
168 return nil, fmt.Errorf("while resolving a path: %w", err)
169 }
170 tgt := filepath.Join(tpmt, name)
171 if err := copyFile(src, tgt); err != nil {
172 return nil, fmt.Errorf("while copying TPM state: file %q to %q: %w", src, tgt, err)
173 }
174 }
175
176 // Create the socket directory.
177 sotdp, err := os.MkdirTemp(sd, "node_sock*")
178 if err != nil {
179 return nil, fmt.Errorf("failed to create the socket directory: %w", err)
180 }
181
182 return &NodeRuntime{
183 ld: stdp,
184 sd: sotdp,
185 }, nil
186}
187
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200188// CuratorClient returns an authenticated owner connection to a Curator
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200189// instance within Cluster c, or nil together with an error.
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200190func (c *Cluster) CuratorClient() (*grpc.ClientConn, error) {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200191 if c.authClient == nil {
Serge Bazanski8535cb52023-03-29 14:15:08 +0200192 authCreds := rpc.NewAuthenticatedCredentials(c.Owner, rpc.WantInsecure())
Serge Bazanski58ddc092022-06-30 18:23:33 +0200193 r := resolver.New(c.ctxT, resolver.WithLogger(func(f string, args ...interface{}) {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100194 launch.Log("Cluster: client resolver: %s", fmt.Sprintf(f, args...))
Serge Bazanski58ddc092022-06-30 18:23:33 +0200195 }))
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200196 for _, n := range c.NodeIDs {
197 ep, err := resolver.NodeWithDefaultPort(n)
198 if err != nil {
199 return nil, fmt.Errorf("could not add node %q by DNS: %v", n, err)
200 }
201 r.AddEndpoint(ep)
202 }
203 authClient, err := grpc.Dial(resolver.MetropolisControlAddress,
204 grpc.WithTransportCredentials(authCreds),
205 grpc.WithResolvers(r),
206 grpc.WithContextDialer(c.DialNode),
207 )
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200208 if err != nil {
209 return nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
210 }
211 c.authClient = authClient
212 }
213 return c.authClient, nil
214}
215
Serge Bazanski66e58952021-10-05 17:06:56 +0200216// LaunchNode launches a single Metropolis node instance with the given options.
217// The instance runs mostly paravirtualized but with some emulated hardware
218// similar to how a cloud provider might set up its VMs. The disk is fully
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200219// writable, and the changes are kept across reboots and shutdowns. ld and sd
220// point to the launch directory and the socket directory, holding the nodes'
221// state files (storage, tpm state, firmware state), and UNIX socket files
222// (swtpm <-> QEMU interplay) respectively. The directories must exist before
223// LaunchNode is called. LaunchNode will update options.Runtime and options.Mac
224// if either are not initialized.
225func LaunchNode(ctx context.Context, ld, sd string, options *NodeOptions) error {
226 // TODO(mateusz@monogon.tech) try using QEMU's abstract socket namespace instead
227 // of /tmp (requires QEMU version >5.0).
Serge Bazanski66e58952021-10-05 17:06:56 +0200228 // https://github.com/qemu/qemu/commit/776b97d3605ed0fc94443048fdf988c7725e38a9).
229 // swtpm accepts already-open FDs so we can pass in an abstract socket namespace FD
230 // that we open and pass the name of it to QEMU. Not pinning this crashes both
231 // swtpm and qemu because we run into UNIX socket length limitations (for legacy
232 // reasons 108 chars).
Serge Bazanski66e58952021-10-05 17:06:56 +0200233
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200234 // If it's the node's first start, set up its runtime directories.
235 if options.Runtime == nil {
236 r, err := setupRuntime(ld, sd)
237 if err != nil {
238 return fmt.Errorf("while setting up node runtime: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200239 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200240 options.Runtime = r
Serge Bazanski66e58952021-10-05 17:06:56 +0200241 }
242
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200243 // Replace the node's context with a new one.
244 r := options.Runtime
245 if r.CtxC != nil {
246 r.CtxC()
247 }
248 r.ctxT, r.CtxC = context.WithCancel(ctx)
249
Serge Bazanski66e58952021-10-05 17:06:56 +0200250 var qemuNetType string
251 var qemuNetConfig launch.QemuValue
252 if options.ConnectToSocket != nil {
253 qemuNetType = "socket"
254 qemuNetConfig = launch.QemuValue{
255 "id": {"net0"},
256 "fd": {"3"},
257 }
258 } else {
259 qemuNetType = "user"
260 qemuNetConfig = launch.QemuValue{
261 "id": {"net0"},
262 "net": {"10.42.0.0/24"},
263 "dhcpstart": {"10.42.0.10"},
264 "hostfwd": options.Ports.ToQemuForwards(),
265 }
266 }
267
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200268 // Generate the node's MAC address if it isn't already set in NodeOptions.
269 if options.Mac == nil {
270 mac, err := generateRandomEthernetMAC()
271 if err != nil {
272 return err
273 }
274 options.Mac = mac
Serge Bazanski66e58952021-10-05 17:06:56 +0200275 }
276
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200277 tpmSocketPath := filepath.Join(r.sd, "tpm-socket")
278 fwVarPath := filepath.Join(r.ld, "OVMF_VARS.fd")
279 storagePath := filepath.Join(r.ld, "node.img")
Lorenz Brun150f24a2023-07-13 20:11:06 +0200280 qemuArgs := []string{
281 "-machine", "q35", "-accel", "kvm", "-nographic", "-nodefaults", "-m", "4096",
Serge Bazanski66e58952021-10-05 17:06:56 +0200282 "-cpu", "host", "-smp", "sockets=1,cpus=1,cores=2,threads=2,maxcpus=4",
283 "-drive", "if=pflash,format=raw,readonly,file=external/edk2/OVMF_CODE.fd",
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200284 "-drive", "if=pflash,format=raw,file=" + fwVarPath,
285 "-drive", "if=virtio,format=raw,cache=unsafe,file=" + storagePath,
Serge Bazanski66e58952021-10-05 17:06:56 +0200286 "-netdev", qemuNetConfig.ToOption(qemuNetType),
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200287 "-device", "virtio-net-pci,netdev=net0,mac=" + options.Mac.String(),
Serge Bazanski66e58952021-10-05 17:06:56 +0200288 "-chardev", "socket,id=chrtpm,path=" + tpmSocketPath,
289 "-tpmdev", "emulator,id=tpm0,chardev=chrtpm",
290 "-device", "tpm-tis,tpmdev=tpm0",
291 "-device", "virtio-rng-pci",
Lorenz Brun150f24a2023-07-13 20:11:06 +0200292 "-serial", "stdio",
293 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200294
295 if !options.AllowReboot {
296 qemuArgs = append(qemuArgs, "-no-reboot")
297 }
298
299 if options.NodeParameters != nil {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200300 parametersPath := filepath.Join(r.ld, "parameters.pb")
Serge Bazanski66e58952021-10-05 17:06:56 +0200301 parametersRaw, err := proto.Marshal(options.NodeParameters)
302 if err != nil {
303 return fmt.Errorf("failed to encode node paraeters: %w", err)
304 }
Lorenz Brun150f24a2023-07-13 20:11:06 +0200305 if err := os.WriteFile(parametersPath, parametersRaw, 0o644); err != nil {
Serge Bazanski66e58952021-10-05 17:06:56 +0200306 return fmt.Errorf("failed to write node parameters: %w", err)
307 }
308 qemuArgs = append(qemuArgs, "-fw_cfg", "name=dev.monogon.metropolis/parameters.pb,file="+parametersPath)
309 }
310
Leopoldacfad5b2023-01-15 14:05:25 +0100311 if options.PcapDump {
312 var qemuNetDump launch.QemuValue
313 pcapPath := filepath.Join(r.ld, "net0.pcap")
314 if options.PcapDump {
315 qemuNetDump = launch.QemuValue{
316 "id": {"net0"},
317 "netdev": {"net0"},
318 "file": {pcapPath},
319 }
320 }
321 qemuArgs = append(qemuArgs, "-object", qemuNetDump.ToOption("filter-dump"))
322 }
323
Serge Bazanski66e58952021-10-05 17:06:56 +0200324 // Start TPM emulator as a subprocess
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200325 tpmCtx, tpmCancel := context.WithCancel(options.Runtime.ctxT)
Serge Bazanski66e58952021-10-05 17:06:56 +0200326 defer tpmCancel()
327
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200328 tpmd := filepath.Join(r.ld, "tpm")
329 tpmEmuCmd := exec.CommandContext(tpmCtx, "swtpm", "socket", "--tpm2", "--tpmstate", "dir="+tpmd, "--ctrl", "type=unixio,path="+tpmSocketPath)
Serge Bazanski66e58952021-10-05 17:06:56 +0200330 tpmEmuCmd.Stderr = os.Stderr
331 tpmEmuCmd.Stdout = os.Stdout
332
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200333 err := tpmEmuCmd.Start()
Serge Bazanski66e58952021-10-05 17:06:56 +0200334 if err != nil {
335 return fmt.Errorf("failed to start TPM emulator: %w", err)
336 }
337
Mateusz Zalegae90f4a12022-05-25 18:24:01 +0200338 // Wait for the socket to be created by the TPM emulator before launching
339 // QEMU.
340 for {
341 _, err := os.Stat(tpmSocketPath)
342 if err == nil {
343 break
344 }
345 if err != nil && !os.IsNotExist(err) {
346 return fmt.Errorf("while stat-ing TPM socket path: %w", err)
347 }
348 if err := tpmCtx.Err(); err != nil {
349 return fmt.Errorf("while waiting for the TPM socket: %w", err)
350 }
351 time.Sleep(time.Millisecond * 100)
352 }
353
Serge Bazanski66e58952021-10-05 17:06:56 +0200354 // Start the main qemu binary
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200355 systemCmd := exec.CommandContext(options.Runtime.ctxT, "qemu-system-x86_64", qemuArgs...)
Serge Bazanski66e58952021-10-05 17:06:56 +0200356 if options.ConnectToSocket != nil {
357 systemCmd.ExtraFiles = []*os.File{options.ConnectToSocket}
358 }
359
360 var stdErrBuf bytes.Buffer
361 systemCmd.Stderr = &stdErrBuf
362 systemCmd.Stdout = options.SerialPort
363
Leopoldaf5086b2023-01-15 14:12:42 +0100364 launch.PrettyPrintQemuArgs(options.Name, systemCmd.Args)
365
Serge Bazanski66e58952021-10-05 17:06:56 +0200366 err = systemCmd.Run()
367
368 // Stop TPM emulator and wait for it to exit to properly reap the child process
369 tpmCancel()
Serge Bazanski05f813b2023-03-16 17:58:39 +0100370 launch.Log("Node: Waiting for TPM emulator to exit")
Serge Bazanski66e58952021-10-05 17:06:56 +0200371 // Wait returns a SIGKILL error because we just cancelled its context.
372 // We still need to call it to avoid creating zombies.
373 _ = tpmEmuCmd.Wait()
Serge Bazanski05f813b2023-03-16 17:58:39 +0100374 launch.Log("Node: TPM emulator done")
Serge Bazanski66e58952021-10-05 17:06:56 +0200375
376 var exerr *exec.ExitError
377 if err != nil && errors.As(err, &exerr) {
378 status := exerr.ProcessState.Sys().(syscall.WaitStatus)
379 if status.Signaled() && status.Signal() == syscall.SIGKILL {
380 // Process was killed externally (most likely by our context being canceled).
381 // This is a normal exit for us, so return nil
382 return nil
383 }
384 exerr.Stderr = stdErrBuf.Bytes()
385 newErr := launch.QEMUError(*exerr)
386 return &newErr
387 }
388 return err
389}
390
// copyFile copies the contents of the file at src into a file at dst. dst is
// created if it does not exist and truncated if it does. The error returned
// when closing dst is reported, because a failed close can mean the copied
// data was not fully written.
func copyFile(src, dst string) error {
	source, err := os.Open(src)
	if err != nil {
		return fmt.Errorf("when opening source: %w", err)
	}
	defer source.Close()

	target, err := os.Create(dst)
	if err != nil {
		return fmt.Errorf("when creating destination: %w", err)
	}
	// Only relevant on the early-return path below. On success, the explicit
	// Close at the end reports the real error, and this deferred call is a
	// harmless no-op whose result is ignored.
	defer target.Close()

	if _, err := io.Copy(target, source); err != nil {
		return fmt.Errorf("when copying file: %w", err)
	}
	return target.Close()
}
410
Serge Bazanskie78a0892021-10-07 17:03:49 +0200411// getNodes wraps around Management.GetNodes to return a list of nodes in a
412// cluster.
413func getNodes(ctx context.Context, mgmt apb.ManagementClient) ([]*apb.Node, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200414 var res []*apb.Node
Serge Bazanski636032e2022-01-26 14:21:33 +0100415 bo := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
Serge Bazanski075465c2021-11-16 15:38:49 +0100416 err := backoff.Retry(func() error {
417 res = nil
418 srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
Serge Bazanskie78a0892021-10-07 17:03:49 +0200419 if err != nil {
Serge Bazanski075465c2021-11-16 15:38:49 +0100420 return fmt.Errorf("GetNodes: %w", err)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200421 }
Serge Bazanski075465c2021-11-16 15:38:49 +0100422 for {
423 node, err := srvN.Recv()
424 if err == io.EOF {
425 break
426 }
427 if err != nil {
428 return fmt.Errorf("GetNodes.Recv: %w", err)
429 }
430 res = append(res, node)
431 }
432 return nil
433 }, bo)
434 if err != nil {
435 return nil, err
Serge Bazanskie78a0892021-10-07 17:03:49 +0200436 }
437 return res, nil
438}
439
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200440// getNode wraps Management.GetNodes. It returns node information matching
441// given node ID.
442func getNode(ctx context.Context, mgmt apb.ManagementClient, id string) (*apb.Node, error) {
443 nodes, err := getNodes(ctx, mgmt)
444 if err != nil {
445 return nil, fmt.Errorf("could not get nodes: %w", err)
446 }
447 for _, n := range nodes {
448 eid := identity.NodeID(n.Pubkey)
449 if eid != id {
450 continue
451 }
452 return n, nil
453 }
454 return nil, fmt.Errorf("no such node.")
455}
456
// generateRandomEthernetMAC returns a random EUI-48 Ethernet MAC address that
// is marked as locally administered and individual (unicast), so it cannot
// collide with vendor-assigned addresses. The randomness comes from
// crypto/rand, and a failure to read it is returned wrapped.
func generateRandomEthernetMAC() (*net.HardwareAddr, error) {
	mac := make(net.HardwareAddr, 6)
	if _, err := rand.Read(mac); err != nil {
		return nil, fmt.Errorf("failed to read randomness for MAC: %w", err)
	}

	// Set U/L bit and clear I/G bit (locally administered individual MAC).
	// Ref IEEE 802-2014 Section 8.2.2.
	mac[0] = (mac[0] | 2) &^ 1
	return &mac, nil
}
471
Serge Bazanskibe742842022-04-04 13:18:50 +0200472const SOCKSPort uint16 = 1080
Serge Bazanski66e58952021-10-05 17:06:56 +0200473
Serge Bazanskibe742842022-04-04 13:18:50 +0200474// ClusterPorts contains all ports handled by Nanoswitch.
475var ClusterPorts = []uint16{
476 // Forwarded to the first node.
477 uint16(node.CuratorServicePort),
478 uint16(node.DebugServicePort),
479 uint16(node.KubernetesAPIPort),
480 uint16(node.KubernetesAPIWrappedPort),
481
482 // SOCKS proxy to the switch network
483 SOCKSPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200484}
485
486// ClusterOptions contains all options for launching a Metropolis cluster.
487type ClusterOptions struct {
488 // The number of nodes this cluster should be started with.
489 NumNodes int
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100490
491 // If true, node logs will be saved to individual files instead of being printed
492 // out to stderr. The path of these files will be still printed to stdout.
493 //
494 // The files will be located within the launch directory inside TEST_TMPDIR (or
495 // the default tempdir location, if not set).
496 NodeLogsToFiles bool
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200497
498 // LeaveNodesNew, if set, will leave all non-bootstrap nodes in NEW, without
499 // bootstrapping them. The nodes' address information in Cluster.Nodes will be
500 // incomplete.
501 LeaveNodesNew bool
Lorenz Brun150f24a2023-07-13 20:11:06 +0200502
503 // Optional local registry which will be made available to the cluster to
504 // pull images from. This is a more efficient alternative to preseeding all
505 // images used for testing.
506 LocalRegistry *localregistry.Server
Serge Bazanski66e58952021-10-05 17:06:56 +0200507}
508
509// Cluster is the running Metropolis cluster launched using the LaunchCluster
510// function.
511type Cluster struct {
Serge Bazanski66e58952021-10-05 17:06:56 +0200512 // Owner is the TLS Certificate of the owner of the test cluster. This can be
513 // used to authenticate further clients to the running cluster.
514 Owner tls.Certificate
515 // Ports is the PortMap used to access the first nodes' services (defined in
Serge Bazanskibe742842022-04-04 13:18:50 +0200516 // ClusterPorts) and the SOCKS proxy (at SOCKSPort).
Serge Bazanski66e58952021-10-05 17:06:56 +0200517 Ports launch.PortMap
518
Serge Bazanskibe742842022-04-04 13:18:50 +0200519 // Nodes is a map from Node ID to its runtime information.
520 Nodes map[string]*NodeInCluster
521 // NodeIDs is a list of node IDs that are backing this cluster, in order of
522 // creation.
523 NodeIDs []string
524
Serge Bazanski54e212a2023-06-14 13:45:11 +0200525 // CACertificate is the cluster's CA certificate.
526 CACertificate *x509.Certificate
527
Serge Bazanski66e58952021-10-05 17:06:56 +0200528 // nodesDone is a list of channels populated with the return codes from all the
529 // nodes' qemu instances. It's used by Close to ensure all nodes have
Leopold20a036e2023-01-15 00:17:19 +0100530 // successfully been stopped.
Serge Bazanski66e58952021-10-05 17:06:56 +0200531 nodesDone []chan error
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200532 // nodeOpts are the cluster member nodes' mutable launch options, kept here
533 // to facilitate reboots.
534 nodeOpts []NodeOptions
535 // launchDir points at the directory keeping the nodes' state, such as storage
536 // images, firmware variable files, TPM state.
537 launchDir string
538 // socketDir points at the directory keeping UNIX socket files, such as these
539 // used to facilitate communication between QEMU and swtpm. It's different
540 // from launchDir, and anchored nearer the file system root, due to the
541 // socket path length limitation imposed by the kernel.
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100542 socketDir string
543 metroctlDir string
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200544
Serge Bazanskibe742842022-04-04 13:18:50 +0200545 // socksDialer is used by DialNode to establish connections to nodes via the
546 // SOCKS server ran by nanoswitch.
547 socksDialer proxy.Dialer
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200548
549 // authClient is a cached authenticated owner connection to a Curator
550 // instance within the cluster.
551 authClient *grpc.ClientConn
552
553 // ctxT is the context individual node contexts are created from.
554 ctxT context.Context
555 // ctxC is used by Close to cancel the context under which the nodes are
556 // running.
557 ctxC context.CancelFunc
Serge Bazanskibe742842022-04-04 13:18:50 +0200558}
559
// NodeInCluster describes a node that is part of a Cluster.
type NodeInCluster struct {
	// ID is the node's ID, usable to dial the node's services via DialNode.
	ID string
	// Pubkey is presumably the node's public key, from which identity.NodeID
	// derives ID (NOTE(review): confirm against where it is populated).
	Pubkey []byte
	// ManagementAddress is the node's address on the network run by
	// nanoswitch. It is not reachable from the host directly, only when dialed
	// via DialNode or through the nanoswitch SOCKS proxy (reachable on
	// Cluster.Ports[SOCKSPort]).
	ManagementAddress string
}
570
571// firstConnection performs the initial owner credential escrow with a newly
572// started nanoswitch-backed cluster over SOCKS. It expects the first node to be
573// running at 10.1.0.2, which is always the case with the current nanoswitch
574// implementation.
575//
Leopold20a036e2023-01-15 00:17:19 +0100576// It returns the newly escrowed credentials as well as the first node's
Serge Bazanskibe742842022-04-04 13:18:50 +0200577// information as NodeInCluster.
578func firstConnection(ctx context.Context, socksDialer proxy.Dialer) (*tls.Certificate, *NodeInCluster, error) {
579 // Dial external service.
580 remote := fmt.Sprintf("10.1.0.2:%s", node.CuratorServicePort.PortString())
581 initCreds, err := rpc.NewEphemeralCredentials(InsecurePrivateKey, nil)
582 if err != nil {
583 return nil, nil, fmt.Errorf("NewEphemeralCredentials: %w", err)
584 }
585 initDialer := func(_ context.Context, addr string) (net.Conn, error) {
586 return socksDialer.Dial("tcp", addr)
587 }
588 initClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(initCreds))
589 if err != nil {
590 return nil, nil, fmt.Errorf("dialing with ephemeral credentials failed: %w", err)
591 }
592 defer initClient.Close()
593
594 // Retrieve owner certificate - this can take a while because the node is still
595 // coming up, so do it in a backoff loop.
Serge Bazanski05f813b2023-03-16 17:58:39 +0100596 launch.Log("Cluster: retrieving owner certificate (this can take a few seconds while the first node boots)...")
Serge Bazanskibe742842022-04-04 13:18:50 +0200597 aaa := apb.NewAAAClient(initClient)
598 var cert *tls.Certificate
599 err = backoff.Retry(func() error {
600 cert, err = rpc.RetrieveOwnerCertificate(ctx, aaa, InsecurePrivateKey)
601 if st, ok := status.FromError(err); ok {
602 if st.Code() == codes.Unavailable {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100603 launch.Log("Cluster: cluster UNAVAILABLE: %v", st.Message())
Serge Bazanskibe742842022-04-04 13:18:50 +0200604 return err
605 }
606 }
607 return backoff.Permanent(err)
608 }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
609 if err != nil {
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200610 return nil, nil, fmt.Errorf("couldn't retrieve owner certificate: %w", err)
Serge Bazanskibe742842022-04-04 13:18:50 +0200611 }
Serge Bazanski05f813b2023-03-16 17:58:39 +0100612 launch.Log("Cluster: retrieved owner certificate.")
Serge Bazanskibe742842022-04-04 13:18:50 +0200613
614 // Now connect authenticated and get the node ID.
Serge Bazanski8535cb52023-03-29 14:15:08 +0200615 creds := rpc.NewAuthenticatedCredentials(*cert, rpc.WantInsecure())
Serge Bazanskibe742842022-04-04 13:18:50 +0200616 authClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(creds))
617 if err != nil {
618 return nil, nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
619 }
620 defer authClient.Close()
621 mgmt := apb.NewManagementClient(authClient)
622
623 var node *NodeInCluster
624 err = backoff.Retry(func() error {
625 nodes, err := getNodes(ctx, mgmt)
626 if err != nil {
627 return fmt.Errorf("retrieving nodes failed: %w", err)
628 }
629 if len(nodes) != 1 {
630 return fmt.Errorf("expected one node, got %d", len(nodes))
631 }
632 n := nodes[0]
633 if n.Status == nil || n.Status.ExternalAddress == "" {
634 return fmt.Errorf("node has no status and/or address")
635 }
636 node = &NodeInCluster{
637 ID: identity.NodeID(n.Pubkey),
638 ManagementAddress: n.Status.ExternalAddress,
639 }
640 return nil
641 }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
642 if err != nil {
643 return nil, nil, err
644 }
645
646 return cert, node, nil
Serge Bazanski66e58952021-10-05 17:06:56 +0200647}
648
// NewSerialFileLogger opens the file at p for use as a node's serial port
// output sink (NodeOptions.SerialPort). The file is created with mode 0600 if
// it does not exist. If it does exist, new output is appended.
//
// Without O_APPEND, writes would start at offset 0 and overwrite an existing
// file in place, leaving stale bytes from the previous content after the new
// output. The file is opened write-only, so reads from the returned value fail.
// Callers that need to close it can type-assert to io.Closer.
func NewSerialFileLogger(p string) (io.ReadWriter, error) {
	f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0o600)
	if err != nil {
		return nil, err
	}
	return f, nil
}
656
Serge Bazanski66e58952021-10-05 17:06:56 +0200657// LaunchCluster launches a cluster of Metropolis node VMs together with a
658// Nanoswitch instance to network them all together.
659//
660// The given context will be used to run all qemu instances in the cluster, and
661// canceling the context or calling Close() will terminate them.
662func LaunchCluster(ctx context.Context, opts ClusterOptions) (*Cluster, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200663 if opts.NumNodes <= 0 {
Serge Bazanski66e58952021-10-05 17:06:56 +0200664 return nil, errors.New("refusing to start cluster with zero nodes")
665 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200666
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200667 // Create the launch directory.
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100668 ld, err := os.MkdirTemp(os.Getenv("TEST_TMPDIR"), "cluster-*")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200669 if err != nil {
670 return nil, fmt.Errorf("failed to create the launch directory: %w", err)
671 }
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100672 // Create the metroctl config directory. We keep it in /tmp because in some
673 // scenarios it's end-user visible and we want it short.
674 md, err := os.MkdirTemp("/tmp", "metroctl-*")
675 if err != nil {
676 return nil, fmt.Errorf("failed to create the metroctl directory: %w", err)
677 }
678
679 // Create the socket directory. We keep it in /tmp because of socket path limits.
680 sd, err := os.MkdirTemp("/tmp", "cluster-*")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200681 if err != nil {
682 return nil, fmt.Errorf("failed to create the socket directory: %w", err)
683 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200684
685 // Prepare links between nodes and nanoswitch.
686 var switchPorts []*os.File
687 var vmPorts []*os.File
688 for i := 0; i < opts.NumNodes; i++ {
689 switchPort, vmPort, err := launch.NewSocketPair()
690 if err != nil {
Serge Bazanski66e58952021-10-05 17:06:56 +0200691 return nil, fmt.Errorf("failed to get socketpair: %w", err)
692 }
693 switchPorts = append(switchPorts, switchPort)
694 vmPorts = append(vmPorts, vmPort)
695 }
696
Serge Bazanskie78a0892021-10-07 17:03:49 +0200697 // Make a list of channels that will be populated by all running node qemu
698 // processes.
Serge Bazanski66e58952021-10-05 17:06:56 +0200699 done := make([]chan error, opts.NumNodes)
Lorenz Brun150f24a2023-07-13 20:11:06 +0200700 for i := range done {
Serge Bazanski66e58952021-10-05 17:06:56 +0200701 done[i] = make(chan error, 1)
702 }
703
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200704 // Prepare the node options. These will be kept as part of Cluster.
705 // nodeOpts[].Runtime will be initialized by LaunchNode during the first
706 // launch. The runtime information can be later used to restart a node.
707 // The 0th node will be initialized first. The rest will follow after it
708 // had bootstrapped the cluster.
709 nodeOpts := make([]NodeOptions, opts.NumNodes)
710 nodeOpts[0] = NodeOptions{
Leopoldaf5086b2023-01-15 14:12:42 +0100711 Name: "node0",
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200712 ConnectToSocket: vmPorts[0],
713 NodeParameters: &apb.NodeParameters{
714 Cluster: &apb.NodeParameters_ClusterBootstrap_{
715 ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
716 OwnerPublicKey: InsecurePublicKey,
Serge Bazanski66e58952021-10-05 17:06:56 +0200717 },
718 },
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200719 },
720 SerialPort: newPrefixedStdio(0),
Leopoldacfad5b2023-01-15 14:05:25 +0100721 PcapDump: true,
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200722 }
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100723 if opts.NodeLogsToFiles {
724 path := path.Join(ld, "node-1.txt")
725 port, err := NewSerialFileLogger(path)
726 if err != nil {
727 return nil, fmt.Errorf("could not open log file for node 1: %w", err)
728 }
729 launch.Log("Node 1 logs at %s", path)
730 nodeOpts[0].SerialPort = port
731 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200732
733 // Start the first node.
734 ctxT, ctxC := context.WithCancel(ctx)
Serge Bazanski05f813b2023-03-16 17:58:39 +0100735 launch.Log("Cluster: Starting node %d...", 1)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200736 go func() {
737 err := LaunchNode(ctxT, ld, sd, &nodeOpts[0])
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200738 if err != nil {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100739 launch.Log("Node %d finished with an error: %v", 1, err)
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200740 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200741 done[0] <- err
742 }()
743
Lorenz Brun150f24a2023-07-13 20:11:06 +0200744 localRegistryAddr := net.TCPAddr{
745 IP: net.IPv4(10, 42, 0, 82),
746 Port: 5000,
747 }
748
749 var guestSvcMap launch.GuestServiceMap
750 if opts.LocalRegistry != nil {
751 l, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1)})
752 if err != nil {
753 ctxC()
754 return nil, fmt.Errorf("failed to create TCP listener for local registry: %w", err)
755 }
756 s := http.Server{
757 Handler: opts.LocalRegistry,
758 }
759 go s.Serve(l)
760 go func() {
761 <-ctxT.Done()
762 s.Close()
763 }()
764 guestSvcMap = launch.GuestServiceMap{
765 &localRegistryAddr: *l.Addr().(*net.TCPAddr),
766 }
767 }
768
Serge Bazanskie78a0892021-10-07 17:03:49 +0200769 // Launch nanoswitch.
Serge Bazanski66e58952021-10-05 17:06:56 +0200770 portMap, err := launch.ConflictFreePortMap(ClusterPorts)
771 if err != nil {
772 ctxC()
773 return nil, fmt.Errorf("failed to allocate ephemeral ports: %w", err)
774 }
775
776 go func() {
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100777 var serialPort io.ReadWriter
778 if opts.NodeLogsToFiles {
779 path := path.Join(ld, "nanoswitch.txt")
780 serialPort, err = NewSerialFileLogger(path)
781 if err != nil {
782 launch.Log("Could not open log file for nanoswitch: %v", err)
783 }
784 launch.Log("Nanoswitch logs at %s", path)
785 } else {
786 serialPort = newPrefixedStdio(99)
787 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200788 if err := launch.RunMicroVM(ctxT, &launch.MicroVMOptions{
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100789 Name: "nanoswitch",
Serge Bazanski66e58952021-10-05 17:06:56 +0200790 KernelPath: "metropolis/test/ktest/vmlinux",
Lorenz Brunb6a9d3c2022-01-27 18:56:20 +0100791 InitramfsPath: "metropolis/test/nanoswitch/initramfs.cpio.lz4",
Serge Bazanski66e58952021-10-05 17:06:56 +0200792 ExtraNetworkInterfaces: switchPorts,
793 PortMap: portMap,
Lorenz Brun150f24a2023-07-13 20:11:06 +0200794 GuestServiceMap: guestSvcMap,
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100795 SerialPort: serialPort,
Leopoldacfad5b2023-01-15 14:05:25 +0100796 PcapDump: path.Join(ld, "nanoswitch.pcap"),
Serge Bazanski66e58952021-10-05 17:06:56 +0200797 }); err != nil {
798 if !errors.Is(err, ctxT.Err()) {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100799 launch.Fatal("Failed to launch nanoswitch: %v", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200800 }
801 }
802 }()
803
Serge Bazanskibe742842022-04-04 13:18:50 +0200804 // Build SOCKS dialer.
805 socksRemote := fmt.Sprintf("localhost:%v", portMap[SOCKSPort])
806 socksDialer, err := proxy.SOCKS5("tcp", socksRemote, nil, proxy.Direct)
Serge Bazanski66e58952021-10-05 17:06:56 +0200807 if err != nil {
808 ctxC()
Serge Bazanskibe742842022-04-04 13:18:50 +0200809 return nil, fmt.Errorf("failed to build SOCKS dialer: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200810 }
811
Serge Bazanskibe742842022-04-04 13:18:50 +0200812 // Retrieve owner credentials and first node.
813 cert, firstNode, err := firstConnection(ctxT, socksDialer)
Serge Bazanski66e58952021-10-05 17:06:56 +0200814 if err != nil {
815 ctxC()
816 return nil, err
817 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200818
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100819 // Write credentials to the metroctl directory.
820 if err := metroctl.WriteOwnerKey(md, cert.PrivateKey.(ed25519.PrivateKey)); err != nil {
821 ctxC()
822 return nil, fmt.Errorf("could not write owner key: %w", err)
823 }
824 if err := metroctl.WriteOwnerCertificate(md, cert.Certificate[0]); err != nil {
825 ctxC()
826 return nil, fmt.Errorf("could not write owner certificate: %w", err)
827 }
828
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200829 // Set up a partially initialized cluster instance, to be filled in in the
830 // later steps.
Serge Bazanskibe742842022-04-04 13:18:50 +0200831 cluster := &Cluster{
832 Owner: *cert,
833 Ports: portMap,
834 Nodes: map[string]*NodeInCluster{
835 firstNode.ID: firstNode,
836 },
837 NodeIDs: []string{
838 firstNode.ID,
839 },
840
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100841 nodesDone: done,
842 nodeOpts: nodeOpts,
843 launchDir: ld,
844 socketDir: sd,
845 metroctlDir: md,
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200846
Serge Bazanskibe742842022-04-04 13:18:50 +0200847 socksDialer: socksDialer,
848
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200849 ctxT: ctxT,
Serge Bazanskibe742842022-04-04 13:18:50 +0200850 ctxC: ctxC,
851 }
852
853 // Now start the rest of the nodes and register them into the cluster.
854
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200855 // Get an authenticated owner client within the cluster.
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200856 curC, err := cluster.CuratorClient()
Serge Bazanski66e58952021-10-05 17:06:56 +0200857 if err != nil {
858 ctxC()
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200859 return nil, fmt.Errorf("CuratorClient: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200860 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200861 mgmt := apb.NewManagementClient(curC)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200862
863 // Retrieve register ticket to register further nodes.
Serge Bazanski05f813b2023-03-16 17:58:39 +0100864 launch.Log("Cluster: retrieving register ticket...")
Serge Bazanskie78a0892021-10-07 17:03:49 +0200865 resT, err := mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{})
866 if err != nil {
867 ctxC()
868 return nil, fmt.Errorf("GetRegisterTicket: %w", err)
869 }
870 ticket := resT.Ticket
Serge Bazanski05f813b2023-03-16 17:58:39 +0100871 launch.Log("Cluster: retrieved register ticket (%d bytes).", len(ticket))
Serge Bazanskie78a0892021-10-07 17:03:49 +0200872
873 // Retrieve cluster info (for directory and ca public key) to register further
874 // nodes.
875 resI, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
876 if err != nil {
877 ctxC()
878 return nil, fmt.Errorf("GetClusterInfo: %w", err)
879 }
Serge Bazanski54e212a2023-06-14 13:45:11 +0200880 caCert, err := x509.ParseCertificate(resI.CaCertificate)
881 if err != nil {
882 ctxC()
883 return nil, fmt.Errorf("ParseCertificate: %w", err)
884 }
885 cluster.CACertificate = caCert
Serge Bazanskie78a0892021-10-07 17:03:49 +0200886
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200887 // Use the retrieved information to configure the rest of the node options.
888 for i := 1; i < opts.NumNodes; i++ {
889 nodeOpts[i] = NodeOptions{
Leopoldaf5086b2023-01-15 14:12:42 +0100890 Name: fmt.Sprintf("node%d", i),
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200891 ConnectToSocket: vmPorts[i],
892 NodeParameters: &apb.NodeParameters{
893 Cluster: &apb.NodeParameters_ClusterRegister_{
894 ClusterRegister: &apb.NodeParameters_ClusterRegister{
895 RegisterTicket: ticket,
896 ClusterDirectory: resI.ClusterDirectory,
897 CaCertificate: resI.CaCertificate,
898 },
899 },
900 },
901 SerialPort: newPrefixedStdio(i),
902 }
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100903 if opts.NodeLogsToFiles {
904 path := path.Join(ld, fmt.Sprintf("node-%d.txt", i+1))
905 port, err := NewSerialFileLogger(path)
906 if err != nil {
907 return nil, fmt.Errorf("could not open log file for node %d: %w", i+1, err)
908 }
909 launch.Log("Node %d logs at %s", i+1, path)
910 nodeOpts[i].SerialPort = port
911 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200912 }
913
914 // Now run the rest of the nodes.
Serge Bazanskie78a0892021-10-07 17:03:49 +0200915 for i := 1; i < opts.NumNodes; i++ {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100916 launch.Log("Cluster: Starting node %d...", i+1)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200917 go func(i int) {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200918 err := LaunchNode(ctxT, ld, sd, &nodeOpts[i])
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200919 if err != nil {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100920 launch.Log("Node %d finished with an error: %v", i, err)
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200921 }
Serge Bazanskie78a0892021-10-07 17:03:49 +0200922 done[i] <- err
923 }(i)
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200924 }
Serge Bazanskie78a0892021-10-07 17:03:49 +0200925
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200926 seenNodes := make(map[string]bool)
927 launch.Log("Cluster: waiting for nodes to appear as NEW...")
928 for i := 1; i < opts.NumNodes; i++ {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200929 for {
930 nodes, err := getNodes(ctx, mgmt)
931 if err != nil {
932 ctxC()
933 return nil, fmt.Errorf("could not get nodes: %w", err)
934 }
935 for _, n := range nodes {
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200936 if n.State != cpb.NodeState_NODE_STATE_NEW {
937 continue
Serge Bazanskie78a0892021-10-07 17:03:49 +0200938 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200939 seenNodes[n.Id] = true
940 cluster.Nodes[n.Id] = &NodeInCluster{
941 ID: n.Id,
942 Pubkey: n.Pubkey,
943 }
944 cluster.NodeIDs = append(cluster.NodeIDs, n.Id)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200945 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200946
947 if len(seenNodes) == opts.NumNodes-1 {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200948 break
949 }
950 time.Sleep(1 * time.Second)
951 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200952 }
953 launch.Log("Found all expected nodes")
Serge Bazanskie78a0892021-10-07 17:03:49 +0200954
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200955 approvedNodes := make(map[string]bool)
956 upNodes := make(map[string]bool)
957 if !opts.LeaveNodesNew {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200958 for {
959 nodes, err := getNodes(ctx, mgmt)
960 if err != nil {
961 ctxC()
962 return nil, fmt.Errorf("could not get nodes: %w", err)
963 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200964 for _, node := range nodes {
965 if !seenNodes[node.Id] {
966 // Skip nodes that weren't NEW in the previous step.
Serge Bazanskie78a0892021-10-07 17:03:49 +0200967 continue
968 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200969
970 if node.State == cpb.NodeState_NODE_STATE_UP && node.Status != nil && node.Status.ExternalAddress != "" {
971 launch.Log("Cluster: node %s is up", node.Id)
972 upNodes[node.Id] = true
973 cluster.Nodes[node.Id].ManagementAddress = node.Status.ExternalAddress
Serge Bazanskie78a0892021-10-07 17:03:49 +0200974 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200975 if upNodes[node.Id] {
976 continue
Serge Bazanskibe742842022-04-04 13:18:50 +0200977 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200978
979 if !approvedNodes[node.Id] {
980 launch.Log("Cluster: approving node %s", node.Id)
981 _, err := mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
982 Pubkey: node.Pubkey,
983 })
984 if err != nil {
985 ctxC()
986 return nil, fmt.Errorf("ApproveNode(%s): %w", node.Id, err)
987 }
988 approvedNodes[node.Id] = true
Serge Bazanskibe742842022-04-04 13:18:50 +0200989 }
Serge Bazanskie78a0892021-10-07 17:03:49 +0200990 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200991
992 launch.Log("Cluster: want %d up nodes, have %d", opts.NumNodes-1, len(upNodes))
993 if len(upNodes) == opts.NumNodes-1 {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200994 break
995 }
Serge Bazanskibe742842022-04-04 13:18:50 +0200996 time.Sleep(time.Second)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200997 }
Serge Bazanskie78a0892021-10-07 17:03:49 +0200998 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200999
Serge Bazanski05f813b2023-03-16 17:58:39 +01001000 launch.Log("Cluster: all nodes up:")
Serge Bazanskibe742842022-04-04 13:18:50 +02001001 for _, node := range cluster.Nodes {
Serge Bazanski05f813b2023-03-16 17:58:39 +01001002 launch.Log("Cluster: - %s at %s", node.ID, node.ManagementAddress)
Serge Bazanskibe742842022-04-04 13:18:50 +02001003 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001004 launch.Log("Cluster: starting tests...")
Serge Bazanski66e58952021-10-05 17:06:56 +02001005
Serge Bazanskibe742842022-04-04 13:18:50 +02001006 return cluster, nil
Serge Bazanski66e58952021-10-05 17:06:56 +02001007}
1008
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001009// RebootNode reboots the cluster member node matching the given index, and
1010// waits for it to rejoin the cluster. It will use the given context ctx to run
1011// cluster API requests, whereas the resulting QEMU process will be created
1012// using the cluster's context c.ctxT. The nodes are indexed starting at 0.
1013func (c *Cluster) RebootNode(ctx context.Context, idx int) error {
1014 if idx < 0 || idx >= len(c.NodeIDs) {
1015 return fmt.Errorf("index out of bounds.")
1016 }
1017 id := c.NodeIDs[idx]
1018
1019 // Get an authenticated owner client within the cluster.
Serge Bazanski5bb8a332022-06-23 17:41:33 +02001020 curC, err := c.CuratorClient()
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001021 if err != nil {
1022 return err
1023 }
1024 mgmt := apb.NewManagementClient(curC)
1025
1026 // Get the timestamp of the node's last update, as observed by Curator.
1027 // It'll be needed to make sure it had rejoined the cluster after the reboot.
1028 var is *apb.Node
1029 for {
1030 r, err := getNode(ctx, mgmt, id)
1031 if err != nil {
1032 return err
1033 }
1034
1035 // Node status may be absent if it hasn't reported to the cluster yet. Wait
1036 // for it to appear before progressing further.
1037 if r.Status != nil {
1038 is = r
1039 break
1040 }
1041 time.Sleep(time.Second)
1042 }
1043
1044 // Cancel the node's context. This will shut down QEMU.
1045 c.nodeOpts[idx].Runtime.CtxC()
Serge Bazanski05f813b2023-03-16 17:58:39 +01001046 launch.Log("Cluster: waiting for node %d (%s) to stop.", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001047 err = <-c.nodesDone[idx]
1048 if err != nil {
1049 return fmt.Errorf("while restarting node: %w", err)
1050 }
1051
1052 // Start QEMU again.
Serge Bazanski05f813b2023-03-16 17:58:39 +01001053 launch.Log("Cluster: restarting node %d (%s).", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001054 go func(n int) {
1055 err := LaunchNode(c.ctxT, c.launchDir, c.socketDir, &c.nodeOpts[n])
Mateusz Zalega08cb4642022-05-25 17:35:59 +02001056 if err != nil {
Serge Bazanski05f813b2023-03-16 17:58:39 +01001057 launch.Log("Node %d finished with an error: %v", n, err)
Mateusz Zalega08cb4642022-05-25 17:35:59 +02001058 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001059 c.nodesDone[n] <- err
1060 }(idx)
1061
1062 // Poll Management.GetNodes until the node's timestamp is updated.
1063 for {
1064 cs, err := getNode(ctx, mgmt, id)
1065 if err != nil {
Serge Bazanski05f813b2023-03-16 17:58:39 +01001066 launch.Log("Cluster: node get error: %v", err)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001067 return err
1068 }
Serge Bazanski05f813b2023-03-16 17:58:39 +01001069 launch.Log("Cluster: node status: %+v", cs)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001070 if cs.Status == nil {
1071 continue
1072 }
Mateusz Zalega28800ad2022-07-08 14:56:02 +02001073 if cs.Status.Timestamp.AsTime().Sub(is.Status.Timestamp.AsTime()) > 0 {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001074 break
1075 }
1076 time.Sleep(time.Second)
1077 }
Serge Bazanski05f813b2023-03-16 17:58:39 +01001078 launch.Log("Cluster: node %d (%s) has rejoined the cluster.", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001079 return nil
1080}
1081
1082// Close cancels the running clusters' context and waits for all virtualized
Serge Bazanski66e58952021-10-05 17:06:56 +02001083// nodes to stop. It returns an error if stopping the nodes failed, or one of
1084// the nodes failed to fully start in the first place.
1085func (c *Cluster) Close() error {
Serge Bazanski05f813b2023-03-16 17:58:39 +01001086 launch.Log("Cluster: stopping...")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001087 if c.authClient != nil {
1088 c.authClient.Close()
1089 }
Serge Bazanski66e58952021-10-05 17:06:56 +02001090 c.ctxC()
1091
Leopold20a036e2023-01-15 00:17:19 +01001092 var errs []error
Serge Bazanski05f813b2023-03-16 17:58:39 +01001093 launch.Log("Cluster: waiting for nodes to exit...")
Serge Bazanski66e58952021-10-05 17:06:56 +02001094 for _, c := range c.nodesDone {
1095 err := <-c
1096 if err != nil {
Leopold20a036e2023-01-15 00:17:19 +01001097 errs = append(errs, err)
Serge Bazanski66e58952021-10-05 17:06:56 +02001098 }
1099 }
Serge Bazanskid09c58f2023-03-17 00:25:08 +01001100 launch.Log("Cluster: removing nodes' state files (%s) and sockets (%s).", c.launchDir, c.socketDir)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001101 os.RemoveAll(c.launchDir)
1102 os.RemoveAll(c.socketDir)
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001103 os.RemoveAll(c.metroctlDir)
Serge Bazanski05f813b2023-03-16 17:58:39 +01001104 launch.Log("Cluster: done")
Leopold20a036e2023-01-15 00:17:19 +01001105 return multierr.Combine(errs...)
Serge Bazanski66e58952021-10-05 17:06:56 +02001106}
Serge Bazanskibe742842022-04-04 13:18:50 +02001107
1108// DialNode is a grpc.WithContextDialer compatible dialer which dials nodes by
1109// their ID. This is performed by connecting to the cluster nanoswitch via its
1110// SOCKS proxy, and using the cluster node list for name resolution.
1111//
1112// For example:
1113//
Serge Bazanski05f813b2023-03-16 17:58:39 +01001114// grpc.Dial("metropolis-deadbeef:1234", grpc.WithContextDialer(c.DialNode))
Serge Bazanskibe742842022-04-04 13:18:50 +02001115func (c *Cluster) DialNode(_ context.Context, addr string) (net.Conn, error) {
1116 host, port, err := net.SplitHostPort(addr)
1117 if err != nil {
1118 return nil, fmt.Errorf("invalid host:port: %w", err)
1119 }
1120 // Already an IP address?
1121 if net.ParseIP(host) != nil {
1122 return c.socksDialer.Dial("tcp", addr)
1123 }
1124
1125 // Otherwise, expect a node name.
1126 node, ok := c.Nodes[host]
1127 if !ok {
1128 return nil, fmt.Errorf("unknown node %q", host)
1129 }
1130 addr = net.JoinHostPort(node.ManagementAddress, port)
1131 return c.socksDialer.Dial("tcp", addr)
1132}
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001133
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001134// GetKubeClientSet gets a Kubernetes client set accessing the Metropolis
1135// Kubernetes authenticating proxy using the cluster owner identity.
1136// It currently has access to everything (i.e. the cluster-admin role)
1137// via the owner-admin binding.
1138func (c *Cluster) GetKubeClientSet() (kubernetes.Interface, error) {
1139 pkcs8Key, err := x509.MarshalPKCS8PrivateKey(c.Owner.PrivateKey)
1140 if err != nil {
1141 // We explicitly pass an Ed25519 private key in, so this can't happen
1142 panic(err)
1143 }
1144
1145 host := net.JoinHostPort(c.NodeIDs[0], node.KubernetesAPIWrappedPort.PortString())
Lorenz Brun150f24a2023-07-13 20:11:06 +02001146 clientConfig := rest.Config{
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001147 Host: host,
1148 TLSClientConfig: rest.TLSClientConfig{
1149 // TODO(q3k): use CA certificate
1150 Insecure: true,
1151 ServerName: "kubernetes.default.svc",
1152 CertData: pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: c.Owner.Certificate[0]}),
1153 KeyData: pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: pkcs8Key}),
1154 },
1155 Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
1156 return c.DialNode(ctx, address)
1157 },
1158 }
1159 return kubernetes.NewForConfig(&clientConfig)
1160}
1161
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001162// KubernetesControllerNodeAddresses returns the list of IP addresses of nodes
1163// which are currently Kubernetes controllers, ie. run an apiserver. This list
1164// might be empty if no node is currently configured with the
1165// 'KubernetesController' node.
1166func (c *Cluster) KubernetesControllerNodeAddresses(ctx context.Context) ([]string, error) {
1167 curC, err := c.CuratorClient()
1168 if err != nil {
1169 return nil, err
1170 }
1171 mgmt := apb.NewManagementClient(curC)
1172 srv, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{
1173 Filter: "has(node.roles.kubernetes_controller)",
1174 })
1175 if err != nil {
1176 return nil, err
1177 }
1178 defer srv.CloseSend()
1179 var res []string
1180 for {
1181 n, err := srv.Recv()
1182 if err == io.EOF {
1183 break
1184 }
1185 if err != nil {
1186 return nil, err
1187 }
1188 if n.Status == nil || n.Status.ExternalAddress == "" {
1189 continue
1190 }
1191 res = append(res, n.Status.ExternalAddress)
1192 }
1193 return res, nil
1194}
Serge Bazanski630fb5c2023-04-06 10:50:24 +02001195
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001196// AllNodesHealthy returns nil if all the nodes in the cluster are seemingly
1197// healthy.
Serge Bazanski630fb5c2023-04-06 10:50:24 +02001198func (c *Cluster) AllNodesHealthy(ctx context.Context) error {
1199 // Get an authenticated owner client within the cluster.
1200 curC, err := c.CuratorClient()
1201 if err != nil {
1202 return err
1203 }
1204 mgmt := apb.NewManagementClient(curC)
1205 nodes, err := getNodes(ctx, mgmt)
1206 if err != nil {
1207 return err
1208 }
1209
1210 var unhealthy []string
1211 for _, node := range nodes {
1212 if node.Health == apb.Node_HEALTHY {
1213 continue
1214 }
1215 unhealthy = append(unhealthy, node.Id)
1216 }
1217 if len(unhealthy) == 0 {
1218 return nil
1219 }
1220 return fmt.Errorf("nodes unhealthy: %s", strings.Join(unhealthy, ", "))
1221}
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001222
1223// ApproveNode approves a node by ID, waiting for it to become UP.
1224func (c *Cluster) ApproveNode(ctx context.Context, id string) error {
1225 curC, err := c.CuratorClient()
1226 if err != nil {
1227 return err
1228 }
1229 mgmt := apb.NewManagementClient(curC)
1230
1231 _, err = mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
1232 Pubkey: c.Nodes[id].Pubkey,
1233 })
1234 if err != nil {
1235 return fmt.Errorf("ApproveNode: %w", err)
1236 }
1237 launch.Log("Cluster: %s: approved, waiting for UP", id)
1238 for {
1239 nodes, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
1240 if err != nil {
1241 return fmt.Errorf("GetNodes: %w", err)
1242 }
1243 found := false
1244 for {
1245 node, err := nodes.Recv()
1246 if errors.Is(err, io.EOF) {
1247 break
1248 }
1249 if err != nil {
1250 return fmt.Errorf("Nodes.Recv: %w", err)
1251 }
1252 if node.Id != id {
1253 continue
1254 }
1255 if node.State != cpb.NodeState_NODE_STATE_UP {
1256 continue
1257 }
1258 found = true
1259 break
1260 }
1261 nodes.CloseSend()
1262
1263 if found {
1264 break
1265 }
1266 time.Sleep(time.Second)
1267 }
1268 launch.Log("Cluster: %s: UP", id)
1269 return nil
1270}
1271
1272// MakeKubernetesWorker adds the KubernetesWorker role to a node by ID.
1273func (c *Cluster) MakeKubernetesWorker(ctx context.Context, id string) error {
1274 curC, err := c.CuratorClient()
1275 if err != nil {
1276 return err
1277 }
1278 mgmt := apb.NewManagementClient(curC)
1279
1280 tr := true
1281 launch.Log("Cluster: %s: adding KubernetesWorker", id)
1282 _, err = mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
1283 Node: &apb.UpdateNodeRolesRequest_Id{
1284 Id: id,
1285 },
1286 KubernetesWorker: &tr,
1287 })
1288 return err
1289}