blob: 5b39f67b4344c1265d25bd0a703fe381429a774d [file] [log] [blame]
Serge Bazanski66e58952021-10-05 17:06:56 +02001// cluster builds on the launch package and implements launching Metropolis
2// nodes and clusters in a virtualized environment using qemu. It's kept in a
3// separate package as it depends on a Metropolis node image, which might not be
4// required for some use of the launch library.
5package cluster
6
7import (
8 "bytes"
9 "context"
Serge Bazanski1f8cad72023-03-20 16:58:10 +010010 "crypto/ed25519"
Serge Bazanski66e58952021-10-05 17:06:56 +020011 "crypto/rand"
12 "crypto/tls"
Serge Bazanski54e212a2023-06-14 13:45:11 +020013 "crypto/x509"
Serge Bazanskia0bc6d32023-06-28 18:57:40 +020014 "encoding/pem"
Serge Bazanski66e58952021-10-05 17:06:56 +020015 "errors"
16 "fmt"
17 "io"
Serge Bazanski66e58952021-10-05 17:06:56 +020018 "net"
19 "os"
20 "os/exec"
Leopoldacfad5b2023-01-15 14:05:25 +010021 "path"
Serge Bazanski66e58952021-10-05 17:06:56 +020022 "path/filepath"
Serge Bazanski630fb5c2023-04-06 10:50:24 +020023 "strings"
Serge Bazanski66e58952021-10-05 17:06:56 +020024 "syscall"
25 "time"
26
27 "github.com/cenkalti/backoff/v4"
Serge Bazanski66e58952021-10-05 17:06:56 +020028 "go.uber.org/multierr"
Serge Bazanskibe742842022-04-04 13:18:50 +020029 "golang.org/x/net/proxy"
Serge Bazanski66e58952021-10-05 17:06:56 +020030 "google.golang.org/grpc"
Serge Bazanski636032e2022-01-26 14:21:33 +010031 "google.golang.org/grpc/codes"
32 "google.golang.org/grpc/status"
Serge Bazanski66e58952021-10-05 17:06:56 +020033 "google.golang.org/protobuf/proto"
Serge Bazanskia0bc6d32023-06-28 18:57:40 +020034 "k8s.io/client-go/kubernetes"
35 "k8s.io/client-go/rest"
Serge Bazanski66e58952021-10-05 17:06:56 +020036
Serge Bazanski1f8cad72023-03-20 16:58:10 +010037 metroctl "source.monogon.dev/metropolis/cli/metroctl/core"
Mateusz Zalega0246f5e2022-04-22 17:29:04 +020038 "source.monogon.dev/metropolis/cli/pkg/datafile"
Serge Bazanski66e58952021-10-05 17:06:56 +020039 "source.monogon.dev/metropolis/node"
Serge Bazanskie78a0892021-10-07 17:03:49 +020040 "source.monogon.dev/metropolis/node/core/identity"
Serge Bazanski66e58952021-10-05 17:06:56 +020041 "source.monogon.dev/metropolis/node/core/rpc"
Serge Bazanski5bb8a332022-06-23 17:41:33 +020042 "source.monogon.dev/metropolis/node/core/rpc/resolver"
Serge Bazanski66e58952021-10-05 17:06:56 +020043 apb "source.monogon.dev/metropolis/proto/api"
Serge Bazanskie78a0892021-10-07 17:03:49 +020044 cpb "source.monogon.dev/metropolis/proto/common"
Serge Bazanski66e58952021-10-05 17:06:56 +020045 "source.monogon.dev/metropolis/test/launch"
46)
47
Leopold20a036e2023-01-15 00:17:19 +010048// NodeOptions contains all options that can be passed to Launch()
Serge Bazanski66e58952021-10-05 17:06:56 +020049type NodeOptions struct {
Leopoldaf5086b2023-01-15 14:12:42 +010050 // Name is a human-readable identifier to be used in debug output.
51 Name string
52
Serge Bazanski66e58952021-10-05 17:06:56 +020053 // Ports contains the port mapping where to expose the internal ports of the VM to
54 // the host. See IdentityPortMap() and ConflictFreePortMap(). Ignored when
55 // ConnectToSocket is set.
56 Ports launch.PortMap
57
Leopold20a036e2023-01-15 00:17:19 +010058 // If set to true, reboots are honored. Otherwise, all reboots exit the Launch()
59 // command. Metropolis nodes generally restart on almost all errors, so unless you
Serge Bazanski66e58952021-10-05 17:06:56 +020060 // want to test reboot behavior this should be false.
61 AllowReboot bool
62
Leopold20a036e2023-01-15 00:17:19 +010063 // By default, the VM is connected to the Host via SLIRP. If ConnectToSocket is
64 // set, it is instead connected to the given file descriptor/socket. If this is
65 // set, all port maps from the Ports option are ignored. Intended for networking
66 // this instance together with others for running more complex network
67 // configurations.
Serge Bazanski66e58952021-10-05 17:06:56 +020068 ConnectToSocket *os.File
69
Leopoldacfad5b2023-01-15 14:05:25 +010070 // When PcapDump is set, all traffic is dumped to a pcap file in the
71 // runtime directory (e.g. "net0.pcap" for the first interface).
72 PcapDump bool
73
Leopold20a036e2023-01-15 00:17:19 +010074 // SerialPort is an io.ReadWriter over which you can communicate with the serial
75 // port of the machine. It can be set to an existing file descriptor (like
Serge Bazanski66e58952021-10-05 17:06:56 +020076 // os.Stdout/os.Stderr) or any Go structure implementing this interface.
77 SerialPort io.ReadWriter
78
79 // NodeParameters is passed into the VM and subsequently used for bootstrapping or
80 // registering into a cluster.
81 NodeParameters *apb.NodeParameters
Mateusz Zalega0246f5e2022-04-22 17:29:04 +020082
83 // Mac is the node's MAC address.
84 Mac *net.HardwareAddr
85
86 // Runtime keeps the node's QEMU runtime state.
87 Runtime *NodeRuntime
88}
89
// NodeRuntime holds the per-node state that has to survive across restarts of
// a node's QEMU process.
type NodeRuntime struct {
	// ld is the node's launch directory. It stores data such as storage
	// images, firmware variables and the TPM state.
	ld string
	// sd is the node's socket directory.
	sd string

	// ctxT is the context the QEMU process executes in.
	ctxT context.Context
	// CtxC cancels ctxT, and with it the QEMU process.
	CtxC context.CancelFunc
}
103
104// NodePorts is the list of ports a fully operational Metropolis node listens on
Serge Bazanski52304a82021-10-29 16:56:18 +0200105var NodePorts = []node.Port{
Serge Bazanski66e58952021-10-05 17:06:56 +0200106 node.ConsensusPort,
107
108 node.CuratorServicePort,
109 node.DebugServicePort,
110
111 node.KubernetesAPIPort,
Lorenz Bruncc078df2021-12-23 11:51:55 +0100112 node.KubernetesAPIWrappedPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200113 node.CuratorServicePort,
114 node.DebuggerPort,
115}
116
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200117// setupRuntime creates the node's QEMU runtime directory, together with all
118// files required to preserve its state, a level below the chosen path ld. The
119// node's socket directory is similarily created a level below sd. It may
120// return an I/O error.
121func setupRuntime(ld, sd string) (*NodeRuntime, error) {
122 // Create a temporary directory to keep all the runtime files.
123 stdp, err := os.MkdirTemp(ld, "node_state*")
124 if err != nil {
125 return nil, fmt.Errorf("failed to create the state directory: %w", err)
126 }
127
128 // Initialize the node's storage with a prebuilt image.
129 si, err := datafile.ResolveRunfile("metropolis/node/node.img")
130 if err != nil {
131 return nil, fmt.Errorf("while resolving a path: %w", err)
132 }
133 di := filepath.Join(stdp, filepath.Base(si))
Serge Bazanski05f813b2023-03-16 17:58:39 +0100134 launch.Log("Cluster: copying node image: %s -> %s", si, di)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200135 if err := copyFile(si, di); err != nil {
136 return nil, fmt.Errorf("while copying the node image: %w", err)
137 }
138
139 // Initialize the OVMF firmware variables file.
140 sv, err := datafile.ResolveRunfile("external/edk2/OVMF_VARS.fd")
141 if err != nil {
142 return nil, fmt.Errorf("while resolving a path: %w", err)
143 }
144 dv := filepath.Join(stdp, filepath.Base(sv))
145 if err := copyFile(sv, dv); err != nil {
146 return nil, fmt.Errorf("while copying firmware variables: %w", err)
147 }
148
149 // Create the TPM state directory and initialize all files required by swtpm.
150 tpmt := filepath.Join(stdp, "tpm")
151 if err := os.Mkdir(tpmt, 0755); err != nil {
152 return nil, fmt.Errorf("while creating the TPM directory: %w", err)
153 }
154 tpms, err := datafile.ResolveRunfile("metropolis/node/tpm")
155 if err != nil {
156 return nil, fmt.Errorf("while resolving a path: %w", err)
157 }
158 tpmf, err := os.ReadDir(tpms)
159 if err != nil {
160 return nil, fmt.Errorf("failed to read TPM directory: %w", err)
161 }
162 for _, file := range tpmf {
163 name := file.Name()
164 src, err := datafile.ResolveRunfile(filepath.Join(tpms, name))
165 if err != nil {
166 return nil, fmt.Errorf("while resolving a path: %w", err)
167 }
168 tgt := filepath.Join(tpmt, name)
169 if err := copyFile(src, tgt); err != nil {
170 return nil, fmt.Errorf("while copying TPM state: file %q to %q: %w", src, tgt, err)
171 }
172 }
173
174 // Create the socket directory.
175 sotdp, err := os.MkdirTemp(sd, "node_sock*")
176 if err != nil {
177 return nil, fmt.Errorf("failed to create the socket directory: %w", err)
178 }
179
180 return &NodeRuntime{
181 ld: stdp,
182 sd: sotdp,
183 }, nil
184}
185
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200186// CuratorClient returns an authenticated owner connection to a Curator
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200187// instance within Cluster c, or nil together with an error.
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200188func (c *Cluster) CuratorClient() (*grpc.ClientConn, error) {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200189 if c.authClient == nil {
Serge Bazanski8535cb52023-03-29 14:15:08 +0200190 authCreds := rpc.NewAuthenticatedCredentials(c.Owner, rpc.WantInsecure())
Serge Bazanski58ddc092022-06-30 18:23:33 +0200191 r := resolver.New(c.ctxT, resolver.WithLogger(func(f string, args ...interface{}) {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100192 launch.Log("Cluster: client resolver: %s", fmt.Sprintf(f, args...))
Serge Bazanski58ddc092022-06-30 18:23:33 +0200193 }))
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200194 for _, n := range c.NodeIDs {
195 ep, err := resolver.NodeWithDefaultPort(n)
196 if err != nil {
197 return nil, fmt.Errorf("could not add node %q by DNS: %v", n, err)
198 }
199 r.AddEndpoint(ep)
200 }
201 authClient, err := grpc.Dial(resolver.MetropolisControlAddress,
202 grpc.WithTransportCredentials(authCreds),
203 grpc.WithResolvers(r),
204 grpc.WithContextDialer(c.DialNode),
205 )
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200206 if err != nil {
207 return nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
208 }
209 c.authClient = authClient
210 }
211 return c.authClient, nil
212}
213
Serge Bazanski66e58952021-10-05 17:06:56 +0200214// LaunchNode launches a single Metropolis node instance with the given options.
215// The instance runs mostly paravirtualized but with some emulated hardware
216// similar to how a cloud provider might set up its VMs. The disk is fully
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200217// writable, and the changes are kept across reboots and shutdowns. ld and sd
218// point to the launch directory and the socket directory, holding the nodes'
219// state files (storage, tpm state, firmware state), and UNIX socket files
220// (swtpm <-> QEMU interplay) respectively. The directories must exist before
221// LaunchNode is called. LaunchNode will update options.Runtime and options.Mac
222// if either are not initialized.
223func LaunchNode(ctx context.Context, ld, sd string, options *NodeOptions) error {
224 // TODO(mateusz@monogon.tech) try using QEMU's abstract socket namespace instead
225 // of /tmp (requires QEMU version >5.0).
Serge Bazanski66e58952021-10-05 17:06:56 +0200226 // https://github.com/qemu/qemu/commit/776b97d3605ed0fc94443048fdf988c7725e38a9).
227 // swtpm accepts already-open FDs so we can pass in an abstract socket namespace FD
228 // that we open and pass the name of it to QEMU. Not pinning this crashes both
229 // swtpm and qemu because we run into UNIX socket length limitations (for legacy
230 // reasons 108 chars).
Serge Bazanski66e58952021-10-05 17:06:56 +0200231
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200232 // If it's the node's first start, set up its runtime directories.
233 if options.Runtime == nil {
234 r, err := setupRuntime(ld, sd)
235 if err != nil {
236 return fmt.Errorf("while setting up node runtime: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200237 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200238 options.Runtime = r
Serge Bazanski66e58952021-10-05 17:06:56 +0200239 }
240
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200241 // Replace the node's context with a new one.
242 r := options.Runtime
243 if r.CtxC != nil {
244 r.CtxC()
245 }
246 r.ctxT, r.CtxC = context.WithCancel(ctx)
247
Serge Bazanski66e58952021-10-05 17:06:56 +0200248 var qemuNetType string
249 var qemuNetConfig launch.QemuValue
250 if options.ConnectToSocket != nil {
251 qemuNetType = "socket"
252 qemuNetConfig = launch.QemuValue{
253 "id": {"net0"},
254 "fd": {"3"},
255 }
256 } else {
257 qemuNetType = "user"
258 qemuNetConfig = launch.QemuValue{
259 "id": {"net0"},
260 "net": {"10.42.0.0/24"},
261 "dhcpstart": {"10.42.0.10"},
262 "hostfwd": options.Ports.ToQemuForwards(),
263 }
264 }
265
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200266 // Generate the node's MAC address if it isn't already set in NodeOptions.
267 if options.Mac == nil {
268 mac, err := generateRandomEthernetMAC()
269 if err != nil {
270 return err
271 }
272 options.Mac = mac
Serge Bazanski66e58952021-10-05 17:06:56 +0200273 }
274
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200275 tpmSocketPath := filepath.Join(r.sd, "tpm-socket")
276 fwVarPath := filepath.Join(r.ld, "OVMF_VARS.fd")
277 storagePath := filepath.Join(r.ld, "node.img")
Serge Bazanski66e58952021-10-05 17:06:56 +0200278 qemuArgs := []string{"-machine", "q35", "-accel", "kvm", "-nographic", "-nodefaults", "-m", "4096",
279 "-cpu", "host", "-smp", "sockets=1,cpus=1,cores=2,threads=2,maxcpus=4",
280 "-drive", "if=pflash,format=raw,readonly,file=external/edk2/OVMF_CODE.fd",
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200281 "-drive", "if=pflash,format=raw,file=" + fwVarPath,
282 "-drive", "if=virtio,format=raw,cache=unsafe,file=" + storagePath,
Serge Bazanski66e58952021-10-05 17:06:56 +0200283 "-netdev", qemuNetConfig.ToOption(qemuNetType),
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200284 "-device", "virtio-net-pci,netdev=net0,mac=" + options.Mac.String(),
Serge Bazanski66e58952021-10-05 17:06:56 +0200285 "-chardev", "socket,id=chrtpm,path=" + tpmSocketPath,
286 "-tpmdev", "emulator,id=tpm0,chardev=chrtpm",
287 "-device", "tpm-tis,tpmdev=tpm0",
288 "-device", "virtio-rng-pci",
289 "-serial", "stdio"}
290
291 if !options.AllowReboot {
292 qemuArgs = append(qemuArgs, "-no-reboot")
293 }
294
295 if options.NodeParameters != nil {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200296 parametersPath := filepath.Join(r.ld, "parameters.pb")
Serge Bazanski66e58952021-10-05 17:06:56 +0200297 parametersRaw, err := proto.Marshal(options.NodeParameters)
298 if err != nil {
299 return fmt.Errorf("failed to encode node paraeters: %w", err)
300 }
Lorenz Brun764a2de2021-11-22 16:26:36 +0100301 if err := os.WriteFile(parametersPath, parametersRaw, 0644); err != nil {
Serge Bazanski66e58952021-10-05 17:06:56 +0200302 return fmt.Errorf("failed to write node parameters: %w", err)
303 }
304 qemuArgs = append(qemuArgs, "-fw_cfg", "name=dev.monogon.metropolis/parameters.pb,file="+parametersPath)
305 }
306
Leopoldacfad5b2023-01-15 14:05:25 +0100307 if options.PcapDump {
308 var qemuNetDump launch.QemuValue
309 pcapPath := filepath.Join(r.ld, "net0.pcap")
310 if options.PcapDump {
311 qemuNetDump = launch.QemuValue{
312 "id": {"net0"},
313 "netdev": {"net0"},
314 "file": {pcapPath},
315 }
316 }
317 qemuArgs = append(qemuArgs, "-object", qemuNetDump.ToOption("filter-dump"))
318 }
319
Serge Bazanski66e58952021-10-05 17:06:56 +0200320 // Start TPM emulator as a subprocess
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200321 tpmCtx, tpmCancel := context.WithCancel(options.Runtime.ctxT)
Serge Bazanski66e58952021-10-05 17:06:56 +0200322 defer tpmCancel()
323
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200324 tpmd := filepath.Join(r.ld, "tpm")
325 tpmEmuCmd := exec.CommandContext(tpmCtx, "swtpm", "socket", "--tpm2", "--tpmstate", "dir="+tpmd, "--ctrl", "type=unixio,path="+tpmSocketPath)
Serge Bazanski66e58952021-10-05 17:06:56 +0200326 tpmEmuCmd.Stderr = os.Stderr
327 tpmEmuCmd.Stdout = os.Stdout
328
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200329 err := tpmEmuCmd.Start()
Serge Bazanski66e58952021-10-05 17:06:56 +0200330 if err != nil {
331 return fmt.Errorf("failed to start TPM emulator: %w", err)
332 }
333
Mateusz Zalegae90f4a12022-05-25 18:24:01 +0200334 // Wait for the socket to be created by the TPM emulator before launching
335 // QEMU.
336 for {
337 _, err := os.Stat(tpmSocketPath)
338 if err == nil {
339 break
340 }
341 if err != nil && !os.IsNotExist(err) {
342 return fmt.Errorf("while stat-ing TPM socket path: %w", err)
343 }
344 if err := tpmCtx.Err(); err != nil {
345 return fmt.Errorf("while waiting for the TPM socket: %w", err)
346 }
347 time.Sleep(time.Millisecond * 100)
348 }
349
Serge Bazanski66e58952021-10-05 17:06:56 +0200350 // Start the main qemu binary
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200351 systemCmd := exec.CommandContext(options.Runtime.ctxT, "qemu-system-x86_64", qemuArgs...)
Serge Bazanski66e58952021-10-05 17:06:56 +0200352 if options.ConnectToSocket != nil {
353 systemCmd.ExtraFiles = []*os.File{options.ConnectToSocket}
354 }
355
356 var stdErrBuf bytes.Buffer
357 systemCmd.Stderr = &stdErrBuf
358 systemCmd.Stdout = options.SerialPort
359
Leopoldaf5086b2023-01-15 14:12:42 +0100360 launch.PrettyPrintQemuArgs(options.Name, systemCmd.Args)
361
Serge Bazanski66e58952021-10-05 17:06:56 +0200362 err = systemCmd.Run()
363
364 // Stop TPM emulator and wait for it to exit to properly reap the child process
365 tpmCancel()
Serge Bazanski05f813b2023-03-16 17:58:39 +0100366 launch.Log("Node: Waiting for TPM emulator to exit")
Serge Bazanski66e58952021-10-05 17:06:56 +0200367 // Wait returns a SIGKILL error because we just cancelled its context.
368 // We still need to call it to avoid creating zombies.
369 _ = tpmEmuCmd.Wait()
Serge Bazanski05f813b2023-03-16 17:58:39 +0100370 launch.Log("Node: TPM emulator done")
Serge Bazanski66e58952021-10-05 17:06:56 +0200371
372 var exerr *exec.ExitError
373 if err != nil && errors.As(err, &exerr) {
374 status := exerr.ProcessState.Sys().(syscall.WaitStatus)
375 if status.Signaled() && status.Signal() == syscall.SIGKILL {
376 // Process was killed externally (most likely by our context being canceled).
377 // This is a normal exit for us, so return nil
378 return nil
379 }
380 exerr.Stderr = stdErrBuf.Bytes()
381 newErr := launch.QEMUError(*exerr)
382 return &newErr
383 }
384 return err
385}
386
// copyFile copies the contents of the file at src into the file at dst. dst is
// created if it doesn't exist and truncated if it does. The returned error
// says which step failed: opening the source, creating the destination or
// copying the data. An error from closing the destination is returned as-is.
func copyFile(src, dst string) error {
	source, err := os.Open(src)
	if err != nil {
		return fmt.Errorf("when opening source: %w", err)
	}
	defer source.Close()

	target, err := os.Create(dst)
	if err != nil {
		return fmt.Errorf("when creating destination: %w", err)
	}
	// Safety net for the error path below. On success the file is closed
	// explicitly so that a failing Close is reported to the caller.
	defer target.Close()

	if _, err := io.Copy(target, source); err != nil {
		return fmt.Errorf("when copying file: %w", err)
	}
	return target.Close()
}
406
Serge Bazanskie78a0892021-10-07 17:03:49 +0200407// getNodes wraps around Management.GetNodes to return a list of nodes in a
408// cluster.
409func getNodes(ctx context.Context, mgmt apb.ManagementClient) ([]*apb.Node, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200410 var res []*apb.Node
Serge Bazanski636032e2022-01-26 14:21:33 +0100411 bo := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
Serge Bazanski075465c2021-11-16 15:38:49 +0100412 err := backoff.Retry(func() error {
413 res = nil
414 srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
Serge Bazanskie78a0892021-10-07 17:03:49 +0200415 if err != nil {
Serge Bazanski075465c2021-11-16 15:38:49 +0100416 return fmt.Errorf("GetNodes: %w", err)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200417 }
Serge Bazanski075465c2021-11-16 15:38:49 +0100418 for {
419 node, err := srvN.Recv()
420 if err == io.EOF {
421 break
422 }
423 if err != nil {
424 return fmt.Errorf("GetNodes.Recv: %w", err)
425 }
426 res = append(res, node)
427 }
428 return nil
429 }, bo)
430 if err != nil {
431 return nil, err
Serge Bazanskie78a0892021-10-07 17:03:49 +0200432 }
433 return res, nil
434}
435
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200436// getNode wraps Management.GetNodes. It returns node information matching
437// given node ID.
438func getNode(ctx context.Context, mgmt apb.ManagementClient, id string) (*apb.Node, error) {
439 nodes, err := getNodes(ctx, mgmt)
440 if err != nil {
441 return nil, fmt.Errorf("could not get nodes: %w", err)
442 }
443 for _, n := range nodes {
444 eid := identity.NodeID(n.Pubkey)
445 if eid != id {
446 continue
447 }
448 return n, nil
449 }
450 return nil, fmt.Errorf("no such node.")
451}
452
// generateRandomEthernetMAC returns a random EUI-48 Ethernet MAC address, read
// from crypto/rand and marked as a locally administered individual address.
func generateRandomEthernetMAC() (*net.HardwareAddr, error) {
	mac := make(net.HardwareAddr, 6)
	if _, err := rand.Read(mac); err != nil {
		return nil, fmt.Errorf("failed to read randomness for MAC: %v", err)
	}

	// Set U/L bit and clear I/G bit (locally administered individual MAC)
	// Ref IEEE 802-2014 Section 8.2.2
	mac[0] |= 0x02
	mac[0] &^= 0x01
	return &mac, nil
}
467
// SOCKSPort is the port of the SOCKS proxy into the switch network. It is one
// of the ClusterPorts.
const SOCKSPort uint16 = 1080
Serge Bazanski66e58952021-10-05 17:06:56 +0200469
Serge Bazanskibe742842022-04-04 13:18:50 +0200470// ClusterPorts contains all ports handled by Nanoswitch.
471var ClusterPorts = []uint16{
472 // Forwarded to the first node.
473 uint16(node.CuratorServicePort),
474 uint16(node.DebugServicePort),
475 uint16(node.KubernetesAPIPort),
476 uint16(node.KubernetesAPIWrappedPort),
477
478 // SOCKS proxy to the switch network
479 SOCKSPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200480}
481
// ClusterOptions bundles every setting for launching a Metropolis cluster.
type ClusterOptions struct {
	// NumNodes is the number of nodes the cluster is started with.
	NumNodes int

	// NodeLogsToFiles, if set, saves node logs to individual files instead of
	// printing them out to stderr. The paths of these files are still printed
	// to stdout.
	//
	// The files are located within the launch directory inside TEST_TMPDIR (or
	// the default tempdir location, if not set).
	NodeLogsToFiles bool

	// LeaveNodesNew, if set, will leave all non-bootstrap nodes in NEW, without
	// bootstrapping them. The nodes' address information in Cluster.Nodes will
	// be incomplete.
	LeaveNodesNew bool
}
499
500// Cluster is the running Metropolis cluster launched using the LaunchCluster
501// function.
502type Cluster struct {
Serge Bazanski66e58952021-10-05 17:06:56 +0200503 // Owner is the TLS Certificate of the owner of the test cluster. This can be
504 // used to authenticate further clients to the running cluster.
505 Owner tls.Certificate
506 // Ports is the PortMap used to access the first nodes' services (defined in
Serge Bazanskibe742842022-04-04 13:18:50 +0200507 // ClusterPorts) and the SOCKS proxy (at SOCKSPort).
Serge Bazanski66e58952021-10-05 17:06:56 +0200508 Ports launch.PortMap
509
Serge Bazanskibe742842022-04-04 13:18:50 +0200510 // Nodes is a map from Node ID to its runtime information.
511 Nodes map[string]*NodeInCluster
512 // NodeIDs is a list of node IDs that are backing this cluster, in order of
513 // creation.
514 NodeIDs []string
515
Serge Bazanski54e212a2023-06-14 13:45:11 +0200516 // CACertificate is the cluster's CA certificate.
517 CACertificate *x509.Certificate
518
Serge Bazanski66e58952021-10-05 17:06:56 +0200519 // nodesDone is a list of channels populated with the return codes from all the
520 // nodes' qemu instances. It's used by Close to ensure all nodes have
Leopold20a036e2023-01-15 00:17:19 +0100521 // successfully been stopped.
Serge Bazanski66e58952021-10-05 17:06:56 +0200522 nodesDone []chan error
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200523 // nodeOpts are the cluster member nodes' mutable launch options, kept here
524 // to facilitate reboots.
525 nodeOpts []NodeOptions
526 // launchDir points at the directory keeping the nodes' state, such as storage
527 // images, firmware variable files, TPM state.
528 launchDir string
529 // socketDir points at the directory keeping UNIX socket files, such as these
530 // used to facilitate communication between QEMU and swtpm. It's different
531 // from launchDir, and anchored nearer the file system root, due to the
532 // socket path length limitation imposed by the kernel.
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100533 socketDir string
534 metroctlDir string
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200535
Serge Bazanskibe742842022-04-04 13:18:50 +0200536 // socksDialer is used by DialNode to establish connections to nodes via the
537 // SOCKS server ran by nanoswitch.
538 socksDialer proxy.Dialer
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200539
540 // authClient is a cached authenticated owner connection to a Curator
541 // instance within the cluster.
542 authClient *grpc.ClientConn
543
544 // ctxT is the context individual node contexts are created from.
545 ctxT context.Context
546 // ctxC is used by Close to cancel the context under which the nodes are
547 // running.
548 ctxC context.CancelFunc
Serge Bazanskibe742842022-04-04 13:18:50 +0200549}
550
// NodeInCluster carries information about a node that is part of a Cluster.
type NodeInCluster struct {
	// ID is the node's ID. It can be used to dial this node's services via
	// DialNode.
	ID string
	// Pubkey is presumably the node's public key, from which ID is derived -
	// TODO confirm against the code that populates it.
	Pubkey []byte
	// ManagementAddress is the address of the node on the network ran by
	// nanoswitch. It is not reachable from the host unless dialed via DialNode
	// or via the nanoswitch SOCKS proxy (reachable on Cluster.Ports[SOCKSPort]).
	ManagementAddress string
}
561
562// firstConnection performs the initial owner credential escrow with a newly
563// started nanoswitch-backed cluster over SOCKS. It expects the first node to be
564// running at 10.1.0.2, which is always the case with the current nanoswitch
565// implementation.
566//
Leopold20a036e2023-01-15 00:17:19 +0100567// It returns the newly escrowed credentials as well as the first node's
Serge Bazanskibe742842022-04-04 13:18:50 +0200568// information as NodeInCluster.
569func firstConnection(ctx context.Context, socksDialer proxy.Dialer) (*tls.Certificate, *NodeInCluster, error) {
570 // Dial external service.
571 remote := fmt.Sprintf("10.1.0.2:%s", node.CuratorServicePort.PortString())
572 initCreds, err := rpc.NewEphemeralCredentials(InsecurePrivateKey, nil)
573 if err != nil {
574 return nil, nil, fmt.Errorf("NewEphemeralCredentials: %w", err)
575 }
576 initDialer := func(_ context.Context, addr string) (net.Conn, error) {
577 return socksDialer.Dial("tcp", addr)
578 }
579 initClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(initCreds))
580 if err != nil {
581 return nil, nil, fmt.Errorf("dialing with ephemeral credentials failed: %w", err)
582 }
583 defer initClient.Close()
584
585 // Retrieve owner certificate - this can take a while because the node is still
586 // coming up, so do it in a backoff loop.
Serge Bazanski05f813b2023-03-16 17:58:39 +0100587 launch.Log("Cluster: retrieving owner certificate (this can take a few seconds while the first node boots)...")
Serge Bazanskibe742842022-04-04 13:18:50 +0200588 aaa := apb.NewAAAClient(initClient)
589 var cert *tls.Certificate
590 err = backoff.Retry(func() error {
591 cert, err = rpc.RetrieveOwnerCertificate(ctx, aaa, InsecurePrivateKey)
592 if st, ok := status.FromError(err); ok {
593 if st.Code() == codes.Unavailable {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100594 launch.Log("Cluster: cluster UNAVAILABLE: %v", st.Message())
Serge Bazanskibe742842022-04-04 13:18:50 +0200595 return err
596 }
597 }
598 return backoff.Permanent(err)
599 }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
600 if err != nil {
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200601 return nil, nil, fmt.Errorf("couldn't retrieve owner certificate: %w", err)
Serge Bazanskibe742842022-04-04 13:18:50 +0200602 }
Serge Bazanski05f813b2023-03-16 17:58:39 +0100603 launch.Log("Cluster: retrieved owner certificate.")
Serge Bazanskibe742842022-04-04 13:18:50 +0200604
605 // Now connect authenticated and get the node ID.
Serge Bazanski8535cb52023-03-29 14:15:08 +0200606 creds := rpc.NewAuthenticatedCredentials(*cert, rpc.WantInsecure())
Serge Bazanskibe742842022-04-04 13:18:50 +0200607 authClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(creds))
608 if err != nil {
609 return nil, nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
610 }
611 defer authClient.Close()
612 mgmt := apb.NewManagementClient(authClient)
613
614 var node *NodeInCluster
615 err = backoff.Retry(func() error {
616 nodes, err := getNodes(ctx, mgmt)
617 if err != nil {
618 return fmt.Errorf("retrieving nodes failed: %w", err)
619 }
620 if len(nodes) != 1 {
621 return fmt.Errorf("expected one node, got %d", len(nodes))
622 }
623 n := nodes[0]
624 if n.Status == nil || n.Status.ExternalAddress == "" {
625 return fmt.Errorf("node has no status and/or address")
626 }
627 node = &NodeInCluster{
628 ID: identity.NodeID(n.Pubkey),
629 ManagementAddress: n.Status.ExternalAddress,
630 }
631 return nil
632 }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
633 if err != nil {
634 return nil, nil, err
635 }
636
637 return cert, node, nil
Serge Bazanski66e58952021-10-05 17:06:56 +0200638}
639
// NewSerialFileLogger opens the file at path p for use as a node's serial port
// log, creating it with mode 0600 if it doesn't exist. The file is opened
// write-only and is not truncated. On failure it returns a nil io.ReadWriter
// together with the error.
func NewSerialFileLogger(p string) (io.ReadWriter, error) {
	logFile, openErr := os.OpenFile(p, os.O_CREATE|os.O_WRONLY, 0600)
	if openErr == nil {
		return logFile, nil
	}
	// Return a literal nil instead of logFile, so that the caller doesn't get
	// a non-nil interface wrapping a nil *os.File.
	return nil, openErr
}
647
Serge Bazanski66e58952021-10-05 17:06:56 +0200648// LaunchCluster launches a cluster of Metropolis node VMs together with a
649// Nanoswitch instance to network them all together.
650//
651// The given context will be used to run all qemu instances in the cluster, and
652// canceling the context or calling Close() will terminate them.
653func LaunchCluster(ctx context.Context, opts ClusterOptions) (*Cluster, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200654 if opts.NumNodes <= 0 {
Serge Bazanski66e58952021-10-05 17:06:56 +0200655 return nil, errors.New("refusing to start cluster with zero nodes")
656 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200657
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200658 // Create the launch directory.
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100659 ld, err := os.MkdirTemp(os.Getenv("TEST_TMPDIR"), "cluster-*")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200660 if err != nil {
661 return nil, fmt.Errorf("failed to create the launch directory: %w", err)
662 }
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100663 // Create the metroctl config directory. We keep it in /tmp because in some
664 // scenarios it's end-user visible and we want it short.
665 md, err := os.MkdirTemp("/tmp", "metroctl-*")
666 if err != nil {
667 return nil, fmt.Errorf("failed to create the metroctl directory: %w", err)
668 }
669
670 // Create the socket directory. We keep it in /tmp because of socket path limits.
671 sd, err := os.MkdirTemp("/tmp", "cluster-*")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200672 if err != nil {
673 return nil, fmt.Errorf("failed to create the socket directory: %w", err)
674 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200675
676 // Prepare links between nodes and nanoswitch.
677 var switchPorts []*os.File
678 var vmPorts []*os.File
679 for i := 0; i < opts.NumNodes; i++ {
680 switchPort, vmPort, err := launch.NewSocketPair()
681 if err != nil {
Serge Bazanski66e58952021-10-05 17:06:56 +0200682 return nil, fmt.Errorf("failed to get socketpair: %w", err)
683 }
684 switchPorts = append(switchPorts, switchPort)
685 vmPorts = append(vmPorts, vmPort)
686 }
687
Serge Bazanskie78a0892021-10-07 17:03:49 +0200688 // Make a list of channels that will be populated by all running node qemu
689 // processes.
Serge Bazanski66e58952021-10-05 17:06:56 +0200690 done := make([]chan error, opts.NumNodes)
691 for i, _ := range done {
692 done[i] = make(chan error, 1)
693 }
694
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200695 // Prepare the node options. These will be kept as part of Cluster.
696 // nodeOpts[].Runtime will be initialized by LaunchNode during the first
697 // launch. The runtime information can be later used to restart a node.
698 // The 0th node will be initialized first. The rest will follow after it
699 // had bootstrapped the cluster.
700 nodeOpts := make([]NodeOptions, opts.NumNodes)
701 nodeOpts[0] = NodeOptions{
Leopoldaf5086b2023-01-15 14:12:42 +0100702 Name: "node0",
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200703 ConnectToSocket: vmPorts[0],
704 NodeParameters: &apb.NodeParameters{
705 Cluster: &apb.NodeParameters_ClusterBootstrap_{
706 ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
707 OwnerPublicKey: InsecurePublicKey,
Serge Bazanski66e58952021-10-05 17:06:56 +0200708 },
709 },
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200710 },
711 SerialPort: newPrefixedStdio(0),
Leopoldacfad5b2023-01-15 14:05:25 +0100712 PcapDump: true,
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200713 }
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100714 if opts.NodeLogsToFiles {
715 path := path.Join(ld, "node-1.txt")
716 port, err := NewSerialFileLogger(path)
717 if err != nil {
718 return nil, fmt.Errorf("could not open log file for node 1: %w", err)
719 }
720 launch.Log("Node 1 logs at %s", path)
721 nodeOpts[0].SerialPort = port
722 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200723
724 // Start the first node.
725 ctxT, ctxC := context.WithCancel(ctx)
Serge Bazanski05f813b2023-03-16 17:58:39 +0100726 launch.Log("Cluster: Starting node %d...", 1)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200727 go func() {
728 err := LaunchNode(ctxT, ld, sd, &nodeOpts[0])
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200729 if err != nil {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100730 launch.Log("Node %d finished with an error: %v", 1, err)
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200731 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200732 done[0] <- err
733 }()
734
Serge Bazanskie78a0892021-10-07 17:03:49 +0200735 // Launch nanoswitch.
Serge Bazanski66e58952021-10-05 17:06:56 +0200736 portMap, err := launch.ConflictFreePortMap(ClusterPorts)
737 if err != nil {
738 ctxC()
739 return nil, fmt.Errorf("failed to allocate ephemeral ports: %w", err)
740 }
741
742 go func() {
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100743 var serialPort io.ReadWriter
744 if opts.NodeLogsToFiles {
745 path := path.Join(ld, "nanoswitch.txt")
746 serialPort, err = NewSerialFileLogger(path)
747 if err != nil {
748 launch.Log("Could not open log file for nanoswitch: %v", err)
749 }
750 launch.Log("Nanoswitch logs at %s", path)
751 } else {
752 serialPort = newPrefixedStdio(99)
753 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200754 if err := launch.RunMicroVM(ctxT, &launch.MicroVMOptions{
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100755 Name: "nanoswitch",
Serge Bazanski66e58952021-10-05 17:06:56 +0200756 KernelPath: "metropolis/test/ktest/vmlinux",
Lorenz Brunb6a9d3c2022-01-27 18:56:20 +0100757 InitramfsPath: "metropolis/test/nanoswitch/initramfs.cpio.lz4",
Serge Bazanski66e58952021-10-05 17:06:56 +0200758 ExtraNetworkInterfaces: switchPorts,
759 PortMap: portMap,
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100760 SerialPort: serialPort,
Leopoldacfad5b2023-01-15 14:05:25 +0100761 PcapDump: path.Join(ld, "nanoswitch.pcap"),
Serge Bazanski66e58952021-10-05 17:06:56 +0200762 }); err != nil {
763 if !errors.Is(err, ctxT.Err()) {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100764 launch.Fatal("Failed to launch nanoswitch: %v", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200765 }
766 }
767 }()
768
Serge Bazanskibe742842022-04-04 13:18:50 +0200769 // Build SOCKS dialer.
770 socksRemote := fmt.Sprintf("localhost:%v", portMap[SOCKSPort])
771 socksDialer, err := proxy.SOCKS5("tcp", socksRemote, nil, proxy.Direct)
Serge Bazanski66e58952021-10-05 17:06:56 +0200772 if err != nil {
773 ctxC()
Serge Bazanskibe742842022-04-04 13:18:50 +0200774 return nil, fmt.Errorf("failed to build SOCKS dialer: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200775 }
776
Serge Bazanskibe742842022-04-04 13:18:50 +0200777 // Retrieve owner credentials and first node.
778 cert, firstNode, err := firstConnection(ctxT, socksDialer)
Serge Bazanski66e58952021-10-05 17:06:56 +0200779 if err != nil {
780 ctxC()
781 return nil, err
782 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200783
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100784 // Write credentials to the metroctl directory.
785 if err := metroctl.WriteOwnerKey(md, cert.PrivateKey.(ed25519.PrivateKey)); err != nil {
786 ctxC()
787 return nil, fmt.Errorf("could not write owner key: %w", err)
788 }
789 if err := metroctl.WriteOwnerCertificate(md, cert.Certificate[0]); err != nil {
790 ctxC()
791 return nil, fmt.Errorf("could not write owner certificate: %w", err)
792 }
793
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200794 // Set up a partially initialized cluster instance, to be filled in in the
795 // later steps.
Serge Bazanskibe742842022-04-04 13:18:50 +0200796 cluster := &Cluster{
797 Owner: *cert,
798 Ports: portMap,
799 Nodes: map[string]*NodeInCluster{
800 firstNode.ID: firstNode,
801 },
802 NodeIDs: []string{
803 firstNode.ID,
804 },
805
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100806 nodesDone: done,
807 nodeOpts: nodeOpts,
808 launchDir: ld,
809 socketDir: sd,
810 metroctlDir: md,
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200811
Serge Bazanskibe742842022-04-04 13:18:50 +0200812 socksDialer: socksDialer,
813
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200814 ctxT: ctxT,
Serge Bazanskibe742842022-04-04 13:18:50 +0200815 ctxC: ctxC,
816 }
817
818 // Now start the rest of the nodes and register them into the cluster.
819
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200820 // Get an authenticated owner client within the cluster.
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200821 curC, err := cluster.CuratorClient()
Serge Bazanski66e58952021-10-05 17:06:56 +0200822 if err != nil {
823 ctxC()
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200824 return nil, fmt.Errorf("CuratorClient: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200825 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200826 mgmt := apb.NewManagementClient(curC)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200827
828 // Retrieve register ticket to register further nodes.
Serge Bazanski05f813b2023-03-16 17:58:39 +0100829 launch.Log("Cluster: retrieving register ticket...")
Serge Bazanskie78a0892021-10-07 17:03:49 +0200830 resT, err := mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{})
831 if err != nil {
832 ctxC()
833 return nil, fmt.Errorf("GetRegisterTicket: %w", err)
834 }
835 ticket := resT.Ticket
Serge Bazanski05f813b2023-03-16 17:58:39 +0100836 launch.Log("Cluster: retrieved register ticket (%d bytes).", len(ticket))
Serge Bazanskie78a0892021-10-07 17:03:49 +0200837
838 // Retrieve cluster info (for directory and ca public key) to register further
839 // nodes.
840 resI, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
841 if err != nil {
842 ctxC()
843 return nil, fmt.Errorf("GetClusterInfo: %w", err)
844 }
Serge Bazanski54e212a2023-06-14 13:45:11 +0200845 caCert, err := x509.ParseCertificate(resI.CaCertificate)
846 if err != nil {
847 ctxC()
848 return nil, fmt.Errorf("ParseCertificate: %w", err)
849 }
850 cluster.CACertificate = caCert
Serge Bazanskie78a0892021-10-07 17:03:49 +0200851
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200852 // Use the retrieved information to configure the rest of the node options.
853 for i := 1; i < opts.NumNodes; i++ {
854 nodeOpts[i] = NodeOptions{
Leopoldaf5086b2023-01-15 14:12:42 +0100855 Name: fmt.Sprintf("node%d", i),
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200856 ConnectToSocket: vmPorts[i],
857 NodeParameters: &apb.NodeParameters{
858 Cluster: &apb.NodeParameters_ClusterRegister_{
859 ClusterRegister: &apb.NodeParameters_ClusterRegister{
860 RegisterTicket: ticket,
861 ClusterDirectory: resI.ClusterDirectory,
862 CaCertificate: resI.CaCertificate,
863 },
864 },
865 },
866 SerialPort: newPrefixedStdio(i),
867 }
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100868 if opts.NodeLogsToFiles {
869 path := path.Join(ld, fmt.Sprintf("node-%d.txt", i+1))
870 port, err := NewSerialFileLogger(path)
871 if err != nil {
872 return nil, fmt.Errorf("could not open log file for node %d: %w", i+1, err)
873 }
874 launch.Log("Node %d logs at %s", i+1, path)
875 nodeOpts[i].SerialPort = port
876 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200877 }
878
879 // Now run the rest of the nodes.
Serge Bazanskie78a0892021-10-07 17:03:49 +0200880 for i := 1; i < opts.NumNodes; i++ {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100881 launch.Log("Cluster: Starting node %d...", i+1)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200882 go func(i int) {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200883 err := LaunchNode(ctxT, ld, sd, &nodeOpts[i])
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200884 if err != nil {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100885 launch.Log("Node %d finished with an error: %v", i, err)
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200886 }
Serge Bazanskie78a0892021-10-07 17:03:49 +0200887 done[i] <- err
888 }(i)
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200889 }
Serge Bazanskie78a0892021-10-07 17:03:49 +0200890
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200891 seenNodes := make(map[string]bool)
892 launch.Log("Cluster: waiting for nodes to appear as NEW...")
893 for i := 1; i < opts.NumNodes; i++ {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200894 for {
895 nodes, err := getNodes(ctx, mgmt)
896 if err != nil {
897 ctxC()
898 return nil, fmt.Errorf("could not get nodes: %w", err)
899 }
900 for _, n := range nodes {
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200901 if n.State != cpb.NodeState_NODE_STATE_NEW {
902 continue
Serge Bazanskie78a0892021-10-07 17:03:49 +0200903 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200904 seenNodes[n.Id] = true
905 cluster.Nodes[n.Id] = &NodeInCluster{
906 ID: n.Id,
907 Pubkey: n.Pubkey,
908 }
909 cluster.NodeIDs = append(cluster.NodeIDs, n.Id)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200910 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200911
912 if len(seenNodes) == opts.NumNodes-1 {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200913 break
914 }
915 time.Sleep(1 * time.Second)
916 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200917 }
918 launch.Log("Found all expected nodes")
Serge Bazanskie78a0892021-10-07 17:03:49 +0200919
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200920 approvedNodes := make(map[string]bool)
921 upNodes := make(map[string]bool)
922 if !opts.LeaveNodesNew {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200923 for {
924 nodes, err := getNodes(ctx, mgmt)
925 if err != nil {
926 ctxC()
927 return nil, fmt.Errorf("could not get nodes: %w", err)
928 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200929 for _, node := range nodes {
930 if !seenNodes[node.Id] {
931 // Skip nodes that weren't NEW in the previous step.
Serge Bazanskie78a0892021-10-07 17:03:49 +0200932 continue
933 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200934
935 if node.State == cpb.NodeState_NODE_STATE_UP && node.Status != nil && node.Status.ExternalAddress != "" {
936 launch.Log("Cluster: node %s is up", node.Id)
937 upNodes[node.Id] = true
938 cluster.Nodes[node.Id].ManagementAddress = node.Status.ExternalAddress
Serge Bazanskie78a0892021-10-07 17:03:49 +0200939 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200940 if upNodes[node.Id] {
941 continue
Serge Bazanskibe742842022-04-04 13:18:50 +0200942 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200943
944 if !approvedNodes[node.Id] {
945 launch.Log("Cluster: approving node %s", node.Id)
946 _, err := mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
947 Pubkey: node.Pubkey,
948 })
949 if err != nil {
950 ctxC()
951 return nil, fmt.Errorf("ApproveNode(%s): %w", node.Id, err)
952 }
953 approvedNodes[node.Id] = true
Serge Bazanskibe742842022-04-04 13:18:50 +0200954 }
Serge Bazanskie78a0892021-10-07 17:03:49 +0200955 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200956
957 launch.Log("Cluster: want %d up nodes, have %d", opts.NumNodes-1, len(upNodes))
958 if len(upNodes) == opts.NumNodes-1 {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200959 break
960 }
Serge Bazanskibe742842022-04-04 13:18:50 +0200961 time.Sleep(time.Second)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200962 }
Serge Bazanskie78a0892021-10-07 17:03:49 +0200963 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200964
Serge Bazanski05f813b2023-03-16 17:58:39 +0100965 launch.Log("Cluster: all nodes up:")
Serge Bazanskibe742842022-04-04 13:18:50 +0200966 for _, node := range cluster.Nodes {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100967 launch.Log("Cluster: - %s at %s", node.ID, node.ManagementAddress)
Serge Bazanskibe742842022-04-04 13:18:50 +0200968 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200969 launch.Log("Cluster: starting tests...")
Serge Bazanski66e58952021-10-05 17:06:56 +0200970
Serge Bazanskibe742842022-04-04 13:18:50 +0200971 return cluster, nil
Serge Bazanski66e58952021-10-05 17:06:56 +0200972}
973
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200974// RebootNode reboots the cluster member node matching the given index, and
975// waits for it to rejoin the cluster. It will use the given context ctx to run
976// cluster API requests, whereas the resulting QEMU process will be created
977// using the cluster's context c.ctxT. The nodes are indexed starting at 0.
978func (c *Cluster) RebootNode(ctx context.Context, idx int) error {
979 if idx < 0 || idx >= len(c.NodeIDs) {
980 return fmt.Errorf("index out of bounds.")
981 }
982 id := c.NodeIDs[idx]
983
984 // Get an authenticated owner client within the cluster.
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200985 curC, err := c.CuratorClient()
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200986 if err != nil {
987 return err
988 }
989 mgmt := apb.NewManagementClient(curC)
990
991 // Get the timestamp of the node's last update, as observed by Curator.
992 // It'll be needed to make sure it had rejoined the cluster after the reboot.
993 var is *apb.Node
994 for {
995 r, err := getNode(ctx, mgmt, id)
996 if err != nil {
997 return err
998 }
999
1000 // Node status may be absent if it hasn't reported to the cluster yet. Wait
1001 // for it to appear before progressing further.
1002 if r.Status != nil {
1003 is = r
1004 break
1005 }
1006 time.Sleep(time.Second)
1007 }
1008
1009 // Cancel the node's context. This will shut down QEMU.
1010 c.nodeOpts[idx].Runtime.CtxC()
Serge Bazanski05f813b2023-03-16 17:58:39 +01001011 launch.Log("Cluster: waiting for node %d (%s) to stop.", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001012 err = <-c.nodesDone[idx]
1013 if err != nil {
1014 return fmt.Errorf("while restarting node: %w", err)
1015 }
1016
1017 // Start QEMU again.
Serge Bazanski05f813b2023-03-16 17:58:39 +01001018 launch.Log("Cluster: restarting node %d (%s).", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001019 go func(n int) {
1020 err := LaunchNode(c.ctxT, c.launchDir, c.socketDir, &c.nodeOpts[n])
Mateusz Zalega08cb4642022-05-25 17:35:59 +02001021 if err != nil {
Serge Bazanski05f813b2023-03-16 17:58:39 +01001022 launch.Log("Node %d finished with an error: %v", n, err)
Mateusz Zalega08cb4642022-05-25 17:35:59 +02001023 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001024 c.nodesDone[n] <- err
1025 }(idx)
1026
1027 // Poll Management.GetNodes until the node's timestamp is updated.
1028 for {
1029 cs, err := getNode(ctx, mgmt, id)
1030 if err != nil {
Serge Bazanski05f813b2023-03-16 17:58:39 +01001031 launch.Log("Cluster: node get error: %v", err)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001032 return err
1033 }
Serge Bazanski05f813b2023-03-16 17:58:39 +01001034 launch.Log("Cluster: node status: %+v", cs)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001035 if cs.Status == nil {
1036 continue
1037 }
Mateusz Zalega28800ad2022-07-08 14:56:02 +02001038 if cs.Status.Timestamp.AsTime().Sub(is.Status.Timestamp.AsTime()) > 0 {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001039 break
1040 }
1041 time.Sleep(time.Second)
1042 }
Serge Bazanski05f813b2023-03-16 17:58:39 +01001043 launch.Log("Cluster: node %d (%s) has rejoined the cluster.", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001044 return nil
1045}
1046
1047// Close cancels the running clusters' context and waits for all virtualized
Serge Bazanski66e58952021-10-05 17:06:56 +02001048// nodes to stop. It returns an error if stopping the nodes failed, or one of
1049// the nodes failed to fully start in the first place.
1050func (c *Cluster) Close() error {
Serge Bazanski05f813b2023-03-16 17:58:39 +01001051 launch.Log("Cluster: stopping...")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001052 if c.authClient != nil {
1053 c.authClient.Close()
1054 }
Serge Bazanski66e58952021-10-05 17:06:56 +02001055 c.ctxC()
1056
Leopold20a036e2023-01-15 00:17:19 +01001057 var errs []error
Serge Bazanski05f813b2023-03-16 17:58:39 +01001058 launch.Log("Cluster: waiting for nodes to exit...")
Serge Bazanski66e58952021-10-05 17:06:56 +02001059 for _, c := range c.nodesDone {
1060 err := <-c
1061 if err != nil {
Leopold20a036e2023-01-15 00:17:19 +01001062 errs = append(errs, err)
Serge Bazanski66e58952021-10-05 17:06:56 +02001063 }
1064 }
Serge Bazanskid09c58f2023-03-17 00:25:08 +01001065 launch.Log("Cluster: removing nodes' state files (%s) and sockets (%s).", c.launchDir, c.socketDir)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001066 os.RemoveAll(c.launchDir)
1067 os.RemoveAll(c.socketDir)
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001068 os.RemoveAll(c.metroctlDir)
Serge Bazanski05f813b2023-03-16 17:58:39 +01001069 launch.Log("Cluster: done")
Leopold20a036e2023-01-15 00:17:19 +01001070 return multierr.Combine(errs...)
Serge Bazanski66e58952021-10-05 17:06:56 +02001071}
Serge Bazanskibe742842022-04-04 13:18:50 +02001072
1073// DialNode is a grpc.WithContextDialer compatible dialer which dials nodes by
1074// their ID. This is performed by connecting to the cluster nanoswitch via its
1075// SOCKS proxy, and using the cluster node list for name resolution.
1076//
1077// For example:
1078//
Serge Bazanski05f813b2023-03-16 17:58:39 +01001079// grpc.Dial("metropolis-deadbeef:1234", grpc.WithContextDialer(c.DialNode))
Serge Bazanskibe742842022-04-04 13:18:50 +02001080func (c *Cluster) DialNode(_ context.Context, addr string) (net.Conn, error) {
1081 host, port, err := net.SplitHostPort(addr)
1082 if err != nil {
1083 return nil, fmt.Errorf("invalid host:port: %w", err)
1084 }
1085 // Already an IP address?
1086 if net.ParseIP(host) != nil {
1087 return c.socksDialer.Dial("tcp", addr)
1088 }
1089
1090 // Otherwise, expect a node name.
1091 node, ok := c.Nodes[host]
1092 if !ok {
1093 return nil, fmt.Errorf("unknown node %q", host)
1094 }
1095 addr = net.JoinHostPort(node.ManagementAddress, port)
1096 return c.socksDialer.Dial("tcp", addr)
1097}
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001098
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001099// GetKubeClientSet gets a Kubernetes client set accessing the Metropolis
1100// Kubernetes authenticating proxy using the cluster owner identity.
1101// It currently has access to everything (i.e. the cluster-admin role)
1102// via the owner-admin binding.
1103func (c *Cluster) GetKubeClientSet() (kubernetes.Interface, error) {
1104 pkcs8Key, err := x509.MarshalPKCS8PrivateKey(c.Owner.PrivateKey)
1105 if err != nil {
1106 // We explicitly pass an Ed25519 private key in, so this can't happen
1107 panic(err)
1108 }
1109
1110 host := net.JoinHostPort(c.NodeIDs[0], node.KubernetesAPIWrappedPort.PortString())
1111 var clientConfig = rest.Config{
1112 Host: host,
1113 TLSClientConfig: rest.TLSClientConfig{
1114 // TODO(q3k): use CA certificate
1115 Insecure: true,
1116 ServerName: "kubernetes.default.svc",
1117 CertData: pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: c.Owner.Certificate[0]}),
1118 KeyData: pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: pkcs8Key}),
1119 },
1120 Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
1121 return c.DialNode(ctx, address)
1122 },
1123 }
1124 return kubernetes.NewForConfig(&clientConfig)
1125}
1126
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001127// KubernetesControllerNodeAddresses returns the list of IP addresses of nodes
1128// which are currently Kubernetes controllers, ie. run an apiserver. This list
1129// might be empty if no node is currently configured with the
1130// 'KubernetesController' node.
1131func (c *Cluster) KubernetesControllerNodeAddresses(ctx context.Context) ([]string, error) {
1132 curC, err := c.CuratorClient()
1133 if err != nil {
1134 return nil, err
1135 }
1136 mgmt := apb.NewManagementClient(curC)
1137 srv, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{
1138 Filter: "has(node.roles.kubernetes_controller)",
1139 })
1140 if err != nil {
1141 return nil, err
1142 }
1143 defer srv.CloseSend()
1144 var res []string
1145 for {
1146 n, err := srv.Recv()
1147 if err == io.EOF {
1148 break
1149 }
1150 if err != nil {
1151 return nil, err
1152 }
1153 if n.Status == nil || n.Status.ExternalAddress == "" {
1154 continue
1155 }
1156 res = append(res, n.Status.ExternalAddress)
1157 }
1158 return res, nil
1159}
Serge Bazanski630fb5c2023-04-06 10:50:24 +02001160
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001161// AllNodesHealthy returns nil if all the nodes in the cluster are seemingly
1162// healthy.
Serge Bazanski630fb5c2023-04-06 10:50:24 +02001163func (c *Cluster) AllNodesHealthy(ctx context.Context) error {
1164 // Get an authenticated owner client within the cluster.
1165 curC, err := c.CuratorClient()
1166 if err != nil {
1167 return err
1168 }
1169 mgmt := apb.NewManagementClient(curC)
1170 nodes, err := getNodes(ctx, mgmt)
1171 if err != nil {
1172 return err
1173 }
1174
1175 var unhealthy []string
1176 for _, node := range nodes {
1177 if node.Health == apb.Node_HEALTHY {
1178 continue
1179 }
1180 unhealthy = append(unhealthy, node.Id)
1181 }
1182 if len(unhealthy) == 0 {
1183 return nil
1184 }
1185 return fmt.Errorf("nodes unhealthy: %s", strings.Join(unhealthy, ", "))
1186}
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001187
1188// ApproveNode approves a node by ID, waiting for it to become UP.
1189func (c *Cluster) ApproveNode(ctx context.Context, id string) error {
1190 curC, err := c.CuratorClient()
1191 if err != nil {
1192 return err
1193 }
1194 mgmt := apb.NewManagementClient(curC)
1195
1196 _, err = mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
1197 Pubkey: c.Nodes[id].Pubkey,
1198 })
1199 if err != nil {
1200 return fmt.Errorf("ApproveNode: %w", err)
1201 }
1202 launch.Log("Cluster: %s: approved, waiting for UP", id)
1203 for {
1204 nodes, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
1205 if err != nil {
1206 return fmt.Errorf("GetNodes: %w", err)
1207 }
1208 found := false
1209 for {
1210 node, err := nodes.Recv()
1211 if errors.Is(err, io.EOF) {
1212 break
1213 }
1214 if err != nil {
1215 return fmt.Errorf("Nodes.Recv: %w", err)
1216 }
1217 if node.Id != id {
1218 continue
1219 }
1220 if node.State != cpb.NodeState_NODE_STATE_UP {
1221 continue
1222 }
1223 found = true
1224 break
1225 }
1226 nodes.CloseSend()
1227
1228 if found {
1229 break
1230 }
1231 time.Sleep(time.Second)
1232 }
1233 launch.Log("Cluster: %s: UP", id)
1234 return nil
1235}
1236
1237// MakeKubernetesWorker adds the KubernetesWorker role to a node by ID.
1238func (c *Cluster) MakeKubernetesWorker(ctx context.Context, id string) error {
1239 curC, err := c.CuratorClient()
1240 if err != nil {
1241 return err
1242 }
1243 mgmt := apb.NewManagementClient(curC)
1244
1245 tr := true
1246 launch.Log("Cluster: %s: adding KubernetesWorker", id)
1247 _, err = mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
1248 Node: &apb.UpdateNodeRolesRequest_Id{
1249 Id: id,
1250 },
1251 KubernetesWorker: &tr,
1252 })
1253 return err
1254}