blob: a312e86897bf9fb1598f34d55bed3ecb823fef59 [file] [log] [blame]
Serge Bazanski66e58952021-10-05 17:06:56 +02001// cluster builds on the launch package and implements launching Metropolis
2// nodes and clusters in a virtualized environment using qemu. It's kept in a
3// separate package as it depends on a Metropolis node image, which might not be
4// required for some use of the launch library.
5package cluster
6
7import (
8 "bytes"
9 "context"
Serge Bazanski1f8cad72023-03-20 16:58:10 +010010 "crypto/ed25519"
Serge Bazanski66e58952021-10-05 17:06:56 +020011 "crypto/rand"
12 "crypto/tls"
Serge Bazanski54e212a2023-06-14 13:45:11 +020013 "crypto/x509"
Serge Bazanskia0bc6d32023-06-28 18:57:40 +020014 "encoding/pem"
Serge Bazanski66e58952021-10-05 17:06:56 +020015 "errors"
16 "fmt"
17 "io"
Serge Bazanski66e58952021-10-05 17:06:56 +020018 "net"
Lorenz Brun150f24a2023-07-13 20:11:06 +020019 "net/http"
Serge Bazanski66e58952021-10-05 17:06:56 +020020 "os"
21 "os/exec"
Leopoldacfad5b2023-01-15 14:05:25 +010022 "path"
Serge Bazanski66e58952021-10-05 17:06:56 +020023 "path/filepath"
Serge Bazanski630fb5c2023-04-06 10:50:24 +020024 "strings"
Serge Bazanski66e58952021-10-05 17:06:56 +020025 "syscall"
26 "time"
27
28 "github.com/cenkalti/backoff/v4"
Serge Bazanski66e58952021-10-05 17:06:56 +020029 "go.uber.org/multierr"
Serge Bazanskibe742842022-04-04 13:18:50 +020030 "golang.org/x/net/proxy"
Serge Bazanski66e58952021-10-05 17:06:56 +020031 "google.golang.org/grpc"
Serge Bazanski636032e2022-01-26 14:21:33 +010032 "google.golang.org/grpc/codes"
33 "google.golang.org/grpc/status"
Serge Bazanski66e58952021-10-05 17:06:56 +020034 "google.golang.org/protobuf/proto"
Serge Bazanskia0bc6d32023-06-28 18:57:40 +020035 "k8s.io/client-go/kubernetes"
36 "k8s.io/client-go/rest"
Serge Bazanski66e58952021-10-05 17:06:56 +020037
Tim Windelschmidtbe25a3b2023-07-19 16:31:56 +020038 apb "source.monogon.dev/metropolis/proto/api"
39 cpb "source.monogon.dev/metropolis/proto/common"
40
Serge Bazanski1f8cad72023-03-20 16:58:10 +010041 metroctl "source.monogon.dev/metropolis/cli/metroctl/core"
Mateusz Zalega0246f5e2022-04-22 17:29:04 +020042 "source.monogon.dev/metropolis/cli/pkg/datafile"
Serge Bazanski66e58952021-10-05 17:06:56 +020043 "source.monogon.dev/metropolis/node"
Serge Bazanskie78a0892021-10-07 17:03:49 +020044 "source.monogon.dev/metropolis/node/core/identity"
Serge Bazanski66e58952021-10-05 17:06:56 +020045 "source.monogon.dev/metropolis/node/core/rpc"
Serge Bazanski5bb8a332022-06-23 17:41:33 +020046 "source.monogon.dev/metropolis/node/core/rpc/resolver"
Lorenz Brun150f24a2023-07-13 20:11:06 +020047 "source.monogon.dev/metropolis/pkg/localregistry"
Serge Bazanski66e58952021-10-05 17:06:56 +020048 "source.monogon.dev/metropolis/test/launch"
49)
50
Leopold20a036e2023-01-15 00:17:19 +010051// NodeOptions contains all options that can be passed to Launch()
Serge Bazanski66e58952021-10-05 17:06:56 +020052type NodeOptions struct {
Leopoldaf5086b2023-01-15 14:12:42 +010053 // Name is a human-readable identifier to be used in debug output.
54 Name string
55
Serge Bazanski66e58952021-10-05 17:06:56 +020056 // Ports contains the port mapping where to expose the internal ports of the VM to
57 // the host. See IdentityPortMap() and ConflictFreePortMap(). Ignored when
58 // ConnectToSocket is set.
59 Ports launch.PortMap
60
Leopold20a036e2023-01-15 00:17:19 +010061 // If set to true, reboots are honored. Otherwise, all reboots exit the Launch()
62 // command. Metropolis nodes generally restart on almost all errors, so unless you
Serge Bazanski66e58952021-10-05 17:06:56 +020063 // want to test reboot behavior this should be false.
64 AllowReboot bool
65
Leopold20a036e2023-01-15 00:17:19 +010066 // By default, the VM is connected to the Host via SLIRP. If ConnectToSocket is
67 // set, it is instead connected to the given file descriptor/socket. If this is
68 // set, all port maps from the Ports option are ignored. Intended for networking
69 // this instance together with others for running more complex network
70 // configurations.
Serge Bazanski66e58952021-10-05 17:06:56 +020071 ConnectToSocket *os.File
72
Leopoldacfad5b2023-01-15 14:05:25 +010073 // When PcapDump is set, all traffic is dumped to a pcap file in the
74 // runtime directory (e.g. "net0.pcap" for the first interface).
75 PcapDump bool
76
Leopold20a036e2023-01-15 00:17:19 +010077 // SerialPort is an io.ReadWriter over which you can communicate with the serial
78 // port of the machine. It can be set to an existing file descriptor (like
Serge Bazanski66e58952021-10-05 17:06:56 +020079 // os.Stdout/os.Stderr) or any Go structure implementing this interface.
80 SerialPort io.ReadWriter
81
82 // NodeParameters is passed into the VM and subsequently used for bootstrapping or
83 // registering into a cluster.
84 NodeParameters *apb.NodeParameters
Mateusz Zalega0246f5e2022-04-22 17:29:04 +020085
86 // Mac is the node's MAC address.
87 Mac *net.HardwareAddr
88
89 // Runtime keeps the node's QEMU runtime state.
90 Runtime *NodeRuntime
91}
92
// NodeRuntime holds the QEMU runtime state of a single node: where its files
// live and the context its processes run under.
type NodeRuntime struct {
	// ld is the node's launch directory. It stores data such as storage
	// images, firmware variables and the TPM state.
	ld string
	// sd is the node's socket directory.
	sd string

	// ctxT is the context under which QEMU is executed.
	ctxT context.Context
	// CtxC cancels ctxT.
	CtxC context.CancelFunc
}
106
107// NodePorts is the list of ports a fully operational Metropolis node listens on
Serge Bazanski52304a82021-10-29 16:56:18 +0200108var NodePorts = []node.Port{
Serge Bazanski66e58952021-10-05 17:06:56 +0200109 node.ConsensusPort,
110
111 node.CuratorServicePort,
112 node.DebugServicePort,
113
114 node.KubernetesAPIPort,
Lorenz Bruncc078df2021-12-23 11:51:55 +0100115 node.KubernetesAPIWrappedPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200116 node.CuratorServicePort,
117 node.DebuggerPort,
Tim Windelschmidtbe25a3b2023-07-19 16:31:56 +0200118 node.MetricsPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200119}
120
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200121// setupRuntime creates the node's QEMU runtime directory, together with all
122// files required to preserve its state, a level below the chosen path ld. The
123// node's socket directory is similarily created a level below sd. It may
124// return an I/O error.
125func setupRuntime(ld, sd string) (*NodeRuntime, error) {
126 // Create a temporary directory to keep all the runtime files.
127 stdp, err := os.MkdirTemp(ld, "node_state*")
128 if err != nil {
129 return nil, fmt.Errorf("failed to create the state directory: %w", err)
130 }
131
132 // Initialize the node's storage with a prebuilt image.
Lorenz Brun1dc60af2023-10-03 15:40:09 +0200133 si, err := datafile.ResolveRunfile("metropolis/node/image.img")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200134 if err != nil {
135 return nil, fmt.Errorf("while resolving a path: %w", err)
136 }
137 di := filepath.Join(stdp, filepath.Base(si))
Serge Bazanski05f813b2023-03-16 17:58:39 +0100138 launch.Log("Cluster: copying node image: %s -> %s", si, di)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200139 if err := copyFile(si, di); err != nil {
140 return nil, fmt.Errorf("while copying the node image: %w", err)
141 }
142
143 // Initialize the OVMF firmware variables file.
144 sv, err := datafile.ResolveRunfile("external/edk2/OVMF_VARS.fd")
145 if err != nil {
146 return nil, fmt.Errorf("while resolving a path: %w", err)
147 }
148 dv := filepath.Join(stdp, filepath.Base(sv))
149 if err := copyFile(sv, dv); err != nil {
150 return nil, fmt.Errorf("while copying firmware variables: %w", err)
151 }
152
153 // Create the TPM state directory and initialize all files required by swtpm.
154 tpmt := filepath.Join(stdp, "tpm")
Lorenz Brun150f24a2023-07-13 20:11:06 +0200155 if err := os.Mkdir(tpmt, 0o755); err != nil {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200156 return nil, fmt.Errorf("while creating the TPM directory: %w", err)
157 }
158 tpms, err := datafile.ResolveRunfile("metropolis/node/tpm")
159 if err != nil {
160 return nil, fmt.Errorf("while resolving a path: %w", err)
161 }
162 tpmf, err := os.ReadDir(tpms)
163 if err != nil {
164 return nil, fmt.Errorf("failed to read TPM directory: %w", err)
165 }
166 for _, file := range tpmf {
167 name := file.Name()
168 src, err := datafile.ResolveRunfile(filepath.Join(tpms, name))
169 if err != nil {
170 return nil, fmt.Errorf("while resolving a path: %w", err)
171 }
172 tgt := filepath.Join(tpmt, name)
173 if err := copyFile(src, tgt); err != nil {
174 return nil, fmt.Errorf("while copying TPM state: file %q to %q: %w", src, tgt, err)
175 }
176 }
177
178 // Create the socket directory.
179 sotdp, err := os.MkdirTemp(sd, "node_sock*")
180 if err != nil {
181 return nil, fmt.Errorf("failed to create the socket directory: %w", err)
182 }
183
184 return &NodeRuntime{
185 ld: stdp,
186 sd: sotdp,
187 }, nil
188}
189
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200190// CuratorClient returns an authenticated owner connection to a Curator
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200191// instance within Cluster c, or nil together with an error.
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200192func (c *Cluster) CuratorClient() (*grpc.ClientConn, error) {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200193 if c.authClient == nil {
Serge Bazanski8535cb52023-03-29 14:15:08 +0200194 authCreds := rpc.NewAuthenticatedCredentials(c.Owner, rpc.WantInsecure())
Serge Bazanski58ddc092022-06-30 18:23:33 +0200195 r := resolver.New(c.ctxT, resolver.WithLogger(func(f string, args ...interface{}) {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100196 launch.Log("Cluster: client resolver: %s", fmt.Sprintf(f, args...))
Serge Bazanski58ddc092022-06-30 18:23:33 +0200197 }))
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200198 for _, n := range c.NodeIDs {
199 ep, err := resolver.NodeWithDefaultPort(n)
200 if err != nil {
201 return nil, fmt.Errorf("could not add node %q by DNS: %v", n, err)
202 }
203 r.AddEndpoint(ep)
204 }
205 authClient, err := grpc.Dial(resolver.MetropolisControlAddress,
206 grpc.WithTransportCredentials(authCreds),
207 grpc.WithResolvers(r),
208 grpc.WithContextDialer(c.DialNode),
209 )
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200210 if err != nil {
211 return nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
212 }
213 c.authClient = authClient
214 }
215 return c.authClient, nil
216}
217
Serge Bazanski66e58952021-10-05 17:06:56 +0200218// LaunchNode launches a single Metropolis node instance with the given options.
219// The instance runs mostly paravirtualized but with some emulated hardware
220// similar to how a cloud provider might set up its VMs. The disk is fully
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200221// writable, and the changes are kept across reboots and shutdowns. ld and sd
222// point to the launch directory and the socket directory, holding the nodes'
223// state files (storage, tpm state, firmware state), and UNIX socket files
224// (swtpm <-> QEMU interplay) respectively. The directories must exist before
225// LaunchNode is called. LaunchNode will update options.Runtime and options.Mac
226// if either are not initialized.
227func LaunchNode(ctx context.Context, ld, sd string, options *NodeOptions) error {
228 // TODO(mateusz@monogon.tech) try using QEMU's abstract socket namespace instead
229 // of /tmp (requires QEMU version >5.0).
Serge Bazanski66e58952021-10-05 17:06:56 +0200230 // https://github.com/qemu/qemu/commit/776b97d3605ed0fc94443048fdf988c7725e38a9).
231 // swtpm accepts already-open FDs so we can pass in an abstract socket namespace FD
232 // that we open and pass the name of it to QEMU. Not pinning this crashes both
233 // swtpm and qemu because we run into UNIX socket length limitations (for legacy
234 // reasons 108 chars).
Serge Bazanski66e58952021-10-05 17:06:56 +0200235
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200236 // If it's the node's first start, set up its runtime directories.
237 if options.Runtime == nil {
238 r, err := setupRuntime(ld, sd)
239 if err != nil {
240 return fmt.Errorf("while setting up node runtime: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200241 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200242 options.Runtime = r
Serge Bazanski66e58952021-10-05 17:06:56 +0200243 }
244
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200245 // Replace the node's context with a new one.
246 r := options.Runtime
247 if r.CtxC != nil {
248 r.CtxC()
249 }
250 r.ctxT, r.CtxC = context.WithCancel(ctx)
251
Serge Bazanski66e58952021-10-05 17:06:56 +0200252 var qemuNetType string
253 var qemuNetConfig launch.QemuValue
254 if options.ConnectToSocket != nil {
255 qemuNetType = "socket"
256 qemuNetConfig = launch.QemuValue{
257 "id": {"net0"},
258 "fd": {"3"},
259 }
260 } else {
261 qemuNetType = "user"
262 qemuNetConfig = launch.QemuValue{
263 "id": {"net0"},
264 "net": {"10.42.0.0/24"},
265 "dhcpstart": {"10.42.0.10"},
266 "hostfwd": options.Ports.ToQemuForwards(),
267 }
268 }
269
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200270 // Generate the node's MAC address if it isn't already set in NodeOptions.
271 if options.Mac == nil {
272 mac, err := generateRandomEthernetMAC()
273 if err != nil {
274 return err
275 }
276 options.Mac = mac
Serge Bazanski66e58952021-10-05 17:06:56 +0200277 }
278
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200279 tpmSocketPath := filepath.Join(r.sd, "tpm-socket")
280 fwVarPath := filepath.Join(r.ld, "OVMF_VARS.fd")
Lorenz Brun1dc60af2023-10-03 15:40:09 +0200281 storagePath := filepath.Join(r.ld, "image.img")
Lorenz Brun150f24a2023-07-13 20:11:06 +0200282 qemuArgs := []string{
283 "-machine", "q35", "-accel", "kvm", "-nographic", "-nodefaults", "-m", "4096",
Serge Bazanski66e58952021-10-05 17:06:56 +0200284 "-cpu", "host", "-smp", "sockets=1,cpus=1,cores=2,threads=2,maxcpus=4",
285 "-drive", "if=pflash,format=raw,readonly,file=external/edk2/OVMF_CODE.fd",
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200286 "-drive", "if=pflash,format=raw,file=" + fwVarPath,
287 "-drive", "if=virtio,format=raw,cache=unsafe,file=" + storagePath,
Serge Bazanski66e58952021-10-05 17:06:56 +0200288 "-netdev", qemuNetConfig.ToOption(qemuNetType),
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200289 "-device", "virtio-net-pci,netdev=net0,mac=" + options.Mac.String(),
Serge Bazanski66e58952021-10-05 17:06:56 +0200290 "-chardev", "socket,id=chrtpm,path=" + tpmSocketPath,
291 "-tpmdev", "emulator,id=tpm0,chardev=chrtpm",
292 "-device", "tpm-tis,tpmdev=tpm0",
293 "-device", "virtio-rng-pci",
Lorenz Brun150f24a2023-07-13 20:11:06 +0200294 "-serial", "stdio",
295 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200296
297 if !options.AllowReboot {
298 qemuArgs = append(qemuArgs, "-no-reboot")
299 }
300
301 if options.NodeParameters != nil {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200302 parametersPath := filepath.Join(r.ld, "parameters.pb")
Serge Bazanski66e58952021-10-05 17:06:56 +0200303 parametersRaw, err := proto.Marshal(options.NodeParameters)
304 if err != nil {
305 return fmt.Errorf("failed to encode node paraeters: %w", err)
306 }
Lorenz Brun150f24a2023-07-13 20:11:06 +0200307 if err := os.WriteFile(parametersPath, parametersRaw, 0o644); err != nil {
Serge Bazanski66e58952021-10-05 17:06:56 +0200308 return fmt.Errorf("failed to write node parameters: %w", err)
309 }
310 qemuArgs = append(qemuArgs, "-fw_cfg", "name=dev.monogon.metropolis/parameters.pb,file="+parametersPath)
311 }
312
Leopoldacfad5b2023-01-15 14:05:25 +0100313 if options.PcapDump {
314 var qemuNetDump launch.QemuValue
315 pcapPath := filepath.Join(r.ld, "net0.pcap")
316 if options.PcapDump {
317 qemuNetDump = launch.QemuValue{
318 "id": {"net0"},
319 "netdev": {"net0"},
320 "file": {pcapPath},
321 }
322 }
323 qemuArgs = append(qemuArgs, "-object", qemuNetDump.ToOption("filter-dump"))
324 }
325
Serge Bazanski66e58952021-10-05 17:06:56 +0200326 // Start TPM emulator as a subprocess
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200327 tpmCtx, tpmCancel := context.WithCancel(options.Runtime.ctxT)
Serge Bazanski66e58952021-10-05 17:06:56 +0200328 defer tpmCancel()
329
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200330 tpmd := filepath.Join(r.ld, "tpm")
331 tpmEmuCmd := exec.CommandContext(tpmCtx, "swtpm", "socket", "--tpm2", "--tpmstate", "dir="+tpmd, "--ctrl", "type=unixio,path="+tpmSocketPath)
Serge Bazanski66e58952021-10-05 17:06:56 +0200332 tpmEmuCmd.Stderr = os.Stderr
333 tpmEmuCmd.Stdout = os.Stdout
334
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200335 err := tpmEmuCmd.Start()
Serge Bazanski66e58952021-10-05 17:06:56 +0200336 if err != nil {
337 return fmt.Errorf("failed to start TPM emulator: %w", err)
338 }
339
Mateusz Zalegae90f4a12022-05-25 18:24:01 +0200340 // Wait for the socket to be created by the TPM emulator before launching
341 // QEMU.
342 for {
343 _, err := os.Stat(tpmSocketPath)
344 if err == nil {
345 break
346 }
347 if err != nil && !os.IsNotExist(err) {
348 return fmt.Errorf("while stat-ing TPM socket path: %w", err)
349 }
350 if err := tpmCtx.Err(); err != nil {
351 return fmt.Errorf("while waiting for the TPM socket: %w", err)
352 }
353 time.Sleep(time.Millisecond * 100)
354 }
355
Serge Bazanski66e58952021-10-05 17:06:56 +0200356 // Start the main qemu binary
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200357 systemCmd := exec.CommandContext(options.Runtime.ctxT, "qemu-system-x86_64", qemuArgs...)
Serge Bazanski66e58952021-10-05 17:06:56 +0200358 if options.ConnectToSocket != nil {
359 systemCmd.ExtraFiles = []*os.File{options.ConnectToSocket}
360 }
361
362 var stdErrBuf bytes.Buffer
363 systemCmd.Stderr = &stdErrBuf
364 systemCmd.Stdout = options.SerialPort
365
Leopoldaf5086b2023-01-15 14:12:42 +0100366 launch.PrettyPrintQemuArgs(options.Name, systemCmd.Args)
367
Serge Bazanski66e58952021-10-05 17:06:56 +0200368 err = systemCmd.Run()
369
370 // Stop TPM emulator and wait for it to exit to properly reap the child process
371 tpmCancel()
Serge Bazanski05f813b2023-03-16 17:58:39 +0100372 launch.Log("Node: Waiting for TPM emulator to exit")
Serge Bazanski66e58952021-10-05 17:06:56 +0200373 // Wait returns a SIGKILL error because we just cancelled its context.
374 // We still need to call it to avoid creating zombies.
375 _ = tpmEmuCmd.Wait()
Serge Bazanski05f813b2023-03-16 17:58:39 +0100376 launch.Log("Node: TPM emulator done")
Serge Bazanski66e58952021-10-05 17:06:56 +0200377
378 var exerr *exec.ExitError
379 if err != nil && errors.As(err, &exerr) {
380 status := exerr.ProcessState.Sys().(syscall.WaitStatus)
381 if status.Signaled() && status.Signal() == syscall.SIGKILL {
382 // Process was killed externally (most likely by our context being canceled).
383 // This is a normal exit for us, so return nil
384 return nil
385 }
386 exerr.Stderr = stdErrBuf.Bytes()
387 newErr := launch.QEMUError(*exerr)
388 return &newErr
389 }
390 return err
391}
392
// copyFile copies the contents of the file at src into the file at dst. The
// destination is created if it does not exist and truncated if it does. The
// error returned by closing the destination is reported to the caller.
func copyFile(src, dst string) error {
	source, err := os.Open(src)
	if err != nil {
		return fmt.Errorf("when opening source: %w", err)
	}
	defer source.Close()

	target, err := os.Create(dst)
	if err != nil {
		return fmt.Errorf("when creating destination: %w", err)
	}
	// Safety net for the early return below; the explicit Close at the end is
	// the one whose result is reported.
	defer target.Close()

	if _, err := io.Copy(target, source); err != nil {
		return fmt.Errorf("when copying file: %w", err)
	}
	return target.Close()
}
412
Serge Bazanskie78a0892021-10-07 17:03:49 +0200413// getNodes wraps around Management.GetNodes to return a list of nodes in a
414// cluster.
415func getNodes(ctx context.Context, mgmt apb.ManagementClient) ([]*apb.Node, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200416 var res []*apb.Node
Serge Bazanski636032e2022-01-26 14:21:33 +0100417 bo := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
Serge Bazanski075465c2021-11-16 15:38:49 +0100418 err := backoff.Retry(func() error {
419 res = nil
420 srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
Serge Bazanskie78a0892021-10-07 17:03:49 +0200421 if err != nil {
Serge Bazanski075465c2021-11-16 15:38:49 +0100422 return fmt.Errorf("GetNodes: %w", err)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200423 }
Serge Bazanski075465c2021-11-16 15:38:49 +0100424 for {
425 node, err := srvN.Recv()
426 if err == io.EOF {
427 break
428 }
429 if err != nil {
430 return fmt.Errorf("GetNodes.Recv: %w", err)
431 }
432 res = append(res, node)
433 }
434 return nil
435 }, bo)
436 if err != nil {
437 return nil, err
Serge Bazanskie78a0892021-10-07 17:03:49 +0200438 }
439 return res, nil
440}
441
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200442// getNode wraps Management.GetNodes. It returns node information matching
443// given node ID.
444func getNode(ctx context.Context, mgmt apb.ManagementClient, id string) (*apb.Node, error) {
445 nodes, err := getNodes(ctx, mgmt)
446 if err != nil {
447 return nil, fmt.Errorf("could not get nodes: %w", err)
448 }
449 for _, n := range nodes {
450 eid := identity.NodeID(n.Pubkey)
451 if eid != id {
452 continue
453 }
454 return n, nil
455 }
456 return nil, fmt.Errorf("no such node.")
457}
458
// generateRandomEthernetMAC returns a random EUI-48 Ethernet MAC address. The
// address is always a locally administered, individual (unicast) one. An
// error is returned if reading from the system's randomness source fails.
func generateRandomEthernetMAC() (*net.HardwareAddr, error) {
	macBuf := make([]byte, 6)
	if _, err := rand.Read(macBuf); err != nil {
		return nil, fmt.Errorf("failed to read randomness for MAC: %w", err)
	}

	// Set U/L bit and clear I/G bit (locally administered individual MAC)
	// Ref IEEE 802-2014 Section 8.2.2
	macBuf[0] = (macBuf[0] | 2) & 0xfe
	mac := net.HardwareAddr(macBuf)
	return &mac, nil
}
473
Serge Bazanskibe742842022-04-04 13:18:50 +0200474const SOCKSPort uint16 = 1080
Serge Bazanski66e58952021-10-05 17:06:56 +0200475
Serge Bazanskibe742842022-04-04 13:18:50 +0200476// ClusterPorts contains all ports handled by Nanoswitch.
477var ClusterPorts = []uint16{
478 // Forwarded to the first node.
479 uint16(node.CuratorServicePort),
480 uint16(node.DebugServicePort),
481 uint16(node.KubernetesAPIPort),
482 uint16(node.KubernetesAPIWrappedPort),
483
484 // SOCKS proxy to the switch network
485 SOCKSPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200486}
487
488// ClusterOptions contains all options for launching a Metropolis cluster.
489type ClusterOptions struct {
490 // The number of nodes this cluster should be started with.
491 NumNodes int
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100492
493 // If true, node logs will be saved to individual files instead of being printed
494 // out to stderr. The path of these files will be still printed to stdout.
495 //
496 // The files will be located within the launch directory inside TEST_TMPDIR (or
497 // the default tempdir location, if not set).
498 NodeLogsToFiles bool
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200499
500 // LeaveNodesNew, if set, will leave all non-bootstrap nodes in NEW, without
501 // bootstrapping them. The nodes' address information in Cluster.Nodes will be
502 // incomplete.
503 LeaveNodesNew bool
Lorenz Brun150f24a2023-07-13 20:11:06 +0200504
505 // Optional local registry which will be made available to the cluster to
506 // pull images from. This is a more efficient alternative to preseeding all
507 // images used for testing.
508 LocalRegistry *localregistry.Server
Serge Bazanski66e58952021-10-05 17:06:56 +0200509}
510
511// Cluster is the running Metropolis cluster launched using the LaunchCluster
512// function.
513type Cluster struct {
Serge Bazanski66e58952021-10-05 17:06:56 +0200514 // Owner is the TLS Certificate of the owner of the test cluster. This can be
515 // used to authenticate further clients to the running cluster.
516 Owner tls.Certificate
517 // Ports is the PortMap used to access the first nodes' services (defined in
Serge Bazanskibe742842022-04-04 13:18:50 +0200518 // ClusterPorts) and the SOCKS proxy (at SOCKSPort).
Serge Bazanski66e58952021-10-05 17:06:56 +0200519 Ports launch.PortMap
520
Serge Bazanskibe742842022-04-04 13:18:50 +0200521 // Nodes is a map from Node ID to its runtime information.
522 Nodes map[string]*NodeInCluster
523 // NodeIDs is a list of node IDs that are backing this cluster, in order of
524 // creation.
525 NodeIDs []string
526
Serge Bazanski54e212a2023-06-14 13:45:11 +0200527 // CACertificate is the cluster's CA certificate.
528 CACertificate *x509.Certificate
529
Serge Bazanski66e58952021-10-05 17:06:56 +0200530 // nodesDone is a list of channels populated with the return codes from all the
531 // nodes' qemu instances. It's used by Close to ensure all nodes have
Leopold20a036e2023-01-15 00:17:19 +0100532 // successfully been stopped.
Serge Bazanski66e58952021-10-05 17:06:56 +0200533 nodesDone []chan error
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200534 // nodeOpts are the cluster member nodes' mutable launch options, kept here
535 // to facilitate reboots.
536 nodeOpts []NodeOptions
537 // launchDir points at the directory keeping the nodes' state, such as storage
538 // images, firmware variable files, TPM state.
539 launchDir string
540 // socketDir points at the directory keeping UNIX socket files, such as these
541 // used to facilitate communication between QEMU and swtpm. It's different
542 // from launchDir, and anchored nearer the file system root, due to the
543 // socket path length limitation imposed by the kernel.
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100544 socketDir string
545 metroctlDir string
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200546
Serge Bazanskibe742842022-04-04 13:18:50 +0200547 // socksDialer is used by DialNode to establish connections to nodes via the
548 // SOCKS server ran by nanoswitch.
549 socksDialer proxy.Dialer
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200550
551 // authClient is a cached authenticated owner connection to a Curator
552 // instance within the cluster.
553 authClient *grpc.ClientConn
554
555 // ctxT is the context individual node contexts are created from.
556 ctxT context.Context
557 // ctxC is used by Close to cancel the context under which the nodes are
558 // running.
559 ctxC context.CancelFunc
Serge Bazanskibe742842022-04-04 13:18:50 +0200560}
561
// NodeInCluster describes a node which is a member of a Cluster.
type NodeInCluster struct {
	// ID is the node's ID. It can be used to dial the node's services via
	// DialNode.
	ID string
	// Pubkey is presumably the node's public key, from which ID is derived -
	// TODO confirm against the code which populates it.
	Pubkey []byte
	// ManagementAddress is the node's address on the network run by
	// nanoswitch. It cannot be reached from the host directly; dial it via
	// DialNode or through the nanoswitch SOCKS proxy (reachable on
	// Cluster.Ports[SOCKSPort]).
	ManagementAddress string
}
572
573// firstConnection performs the initial owner credential escrow with a newly
574// started nanoswitch-backed cluster over SOCKS. It expects the first node to be
575// running at 10.1.0.2, which is always the case with the current nanoswitch
576// implementation.
577//
Leopold20a036e2023-01-15 00:17:19 +0100578// It returns the newly escrowed credentials as well as the first node's
Serge Bazanskibe742842022-04-04 13:18:50 +0200579// information as NodeInCluster.
580func firstConnection(ctx context.Context, socksDialer proxy.Dialer) (*tls.Certificate, *NodeInCluster, error) {
581 // Dial external service.
582 remote := fmt.Sprintf("10.1.0.2:%s", node.CuratorServicePort.PortString())
583 initCreds, err := rpc.NewEphemeralCredentials(InsecurePrivateKey, nil)
584 if err != nil {
585 return nil, nil, fmt.Errorf("NewEphemeralCredentials: %w", err)
586 }
587 initDialer := func(_ context.Context, addr string) (net.Conn, error) {
588 return socksDialer.Dial("tcp", addr)
589 }
590 initClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(initCreds))
591 if err != nil {
592 return nil, nil, fmt.Errorf("dialing with ephemeral credentials failed: %w", err)
593 }
594 defer initClient.Close()
595
596 // Retrieve owner certificate - this can take a while because the node is still
597 // coming up, so do it in a backoff loop.
Serge Bazanski05f813b2023-03-16 17:58:39 +0100598 launch.Log("Cluster: retrieving owner certificate (this can take a few seconds while the first node boots)...")
Serge Bazanskibe742842022-04-04 13:18:50 +0200599 aaa := apb.NewAAAClient(initClient)
600 var cert *tls.Certificate
601 err = backoff.Retry(func() error {
602 cert, err = rpc.RetrieveOwnerCertificate(ctx, aaa, InsecurePrivateKey)
603 if st, ok := status.FromError(err); ok {
604 if st.Code() == codes.Unavailable {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100605 launch.Log("Cluster: cluster UNAVAILABLE: %v", st.Message())
Serge Bazanskibe742842022-04-04 13:18:50 +0200606 return err
607 }
608 }
609 return backoff.Permanent(err)
610 }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
611 if err != nil {
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200612 return nil, nil, fmt.Errorf("couldn't retrieve owner certificate: %w", err)
Serge Bazanskibe742842022-04-04 13:18:50 +0200613 }
Serge Bazanski05f813b2023-03-16 17:58:39 +0100614 launch.Log("Cluster: retrieved owner certificate.")
Serge Bazanskibe742842022-04-04 13:18:50 +0200615
616 // Now connect authenticated and get the node ID.
Serge Bazanski8535cb52023-03-29 14:15:08 +0200617 creds := rpc.NewAuthenticatedCredentials(*cert, rpc.WantInsecure())
Serge Bazanskibe742842022-04-04 13:18:50 +0200618 authClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(creds))
619 if err != nil {
620 return nil, nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
621 }
622 defer authClient.Close()
623 mgmt := apb.NewManagementClient(authClient)
624
625 var node *NodeInCluster
626 err = backoff.Retry(func() error {
627 nodes, err := getNodes(ctx, mgmt)
628 if err != nil {
629 return fmt.Errorf("retrieving nodes failed: %w", err)
630 }
631 if len(nodes) != 1 {
632 return fmt.Errorf("expected one node, got %d", len(nodes))
633 }
634 n := nodes[0]
635 if n.Status == nil || n.Status.ExternalAddress == "" {
636 return fmt.Errorf("node has no status and/or address")
637 }
638 node = &NodeInCluster{
639 ID: identity.NodeID(n.Pubkey),
640 ManagementAddress: n.Status.ExternalAddress,
641 }
642 return nil
643 }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
644 if err != nil {
645 return nil, nil, err
646 }
647
648 return cert, node, nil
Serge Bazanski66e58952021-10-05 17:06:56 +0200649}
650
// NewSerialFileLogger opens the file at path p for writing, creating it with
// mode 0600 if it does not exist yet, and returns it as an io.ReadWriter. The
// file is opened write-only and is neither truncated nor opened in append
// mode. On failure a nil io.ReadWriter is returned together with the error.
func NewSerialFileLogger(p string) (io.ReadWriter, error) {
	logFile, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE, 0o600)
	if err == nil {
		return logFile, nil
	}
	// Return an untyped nil so that callers never see a non-nil interface
	// wrapping a nil *os.File.
	return nil, err
}
658
Serge Bazanski66e58952021-10-05 17:06:56 +0200659// LaunchCluster launches a cluster of Metropolis node VMs together with a
660// Nanoswitch instance to network them all together.
661//
662// The given context will be used to run all qemu instances in the cluster, and
663// canceling the context or calling Close() will terminate them.
664func LaunchCluster(ctx context.Context, opts ClusterOptions) (*Cluster, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200665 if opts.NumNodes <= 0 {
Serge Bazanski66e58952021-10-05 17:06:56 +0200666 return nil, errors.New("refusing to start cluster with zero nodes")
667 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200668
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200669 // Create the launch directory.
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100670 ld, err := os.MkdirTemp(os.Getenv("TEST_TMPDIR"), "cluster-*")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200671 if err != nil {
672 return nil, fmt.Errorf("failed to create the launch directory: %w", err)
673 }
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100674 // Create the metroctl config directory. We keep it in /tmp because in some
675 // scenarios it's end-user visible and we want it short.
676 md, err := os.MkdirTemp("/tmp", "metroctl-*")
677 if err != nil {
678 return nil, fmt.Errorf("failed to create the metroctl directory: %w", err)
679 }
680
681 // Create the socket directory. We keep it in /tmp because of socket path limits.
682 sd, err := os.MkdirTemp("/tmp", "cluster-*")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200683 if err != nil {
684 return nil, fmt.Errorf("failed to create the socket directory: %w", err)
685 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200686
687 // Prepare links between nodes and nanoswitch.
688 var switchPorts []*os.File
689 var vmPorts []*os.File
690 for i := 0; i < opts.NumNodes; i++ {
691 switchPort, vmPort, err := launch.NewSocketPair()
692 if err != nil {
Serge Bazanski66e58952021-10-05 17:06:56 +0200693 return nil, fmt.Errorf("failed to get socketpair: %w", err)
694 }
695 switchPorts = append(switchPorts, switchPort)
696 vmPorts = append(vmPorts, vmPort)
697 }
698
Serge Bazanskie78a0892021-10-07 17:03:49 +0200699 // Make a list of channels that will be populated by all running node qemu
700 // processes.
Serge Bazanski66e58952021-10-05 17:06:56 +0200701 done := make([]chan error, opts.NumNodes)
Lorenz Brun150f24a2023-07-13 20:11:06 +0200702 for i := range done {
Serge Bazanski66e58952021-10-05 17:06:56 +0200703 done[i] = make(chan error, 1)
704 }
705
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200706 // Prepare the node options. These will be kept as part of Cluster.
707 // nodeOpts[].Runtime will be initialized by LaunchNode during the first
708 // launch. The runtime information can be later used to restart a node.
709 // The 0th node will be initialized first. The rest will follow after it
710 // had bootstrapped the cluster.
711 nodeOpts := make([]NodeOptions, opts.NumNodes)
712 nodeOpts[0] = NodeOptions{
Leopoldaf5086b2023-01-15 14:12:42 +0100713 Name: "node0",
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200714 ConnectToSocket: vmPorts[0],
715 NodeParameters: &apb.NodeParameters{
716 Cluster: &apb.NodeParameters_ClusterBootstrap_{
717 ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
718 OwnerPublicKey: InsecurePublicKey,
Serge Bazanski66e58952021-10-05 17:06:56 +0200719 },
720 },
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200721 },
722 SerialPort: newPrefixedStdio(0),
Leopoldacfad5b2023-01-15 14:05:25 +0100723 PcapDump: true,
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200724 }
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100725 if opts.NodeLogsToFiles {
726 path := path.Join(ld, "node-1.txt")
727 port, err := NewSerialFileLogger(path)
728 if err != nil {
729 return nil, fmt.Errorf("could not open log file for node 1: %w", err)
730 }
731 launch.Log("Node 1 logs at %s", path)
732 nodeOpts[0].SerialPort = port
733 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200734
735 // Start the first node.
736 ctxT, ctxC := context.WithCancel(ctx)
Serge Bazanski05f813b2023-03-16 17:58:39 +0100737 launch.Log("Cluster: Starting node %d...", 1)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200738 go func() {
739 err := LaunchNode(ctxT, ld, sd, &nodeOpts[0])
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200740 if err != nil {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100741 launch.Log("Node %d finished with an error: %v", 1, err)
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200742 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200743 done[0] <- err
744 }()
745
Lorenz Brun150f24a2023-07-13 20:11:06 +0200746 localRegistryAddr := net.TCPAddr{
747 IP: net.IPv4(10, 42, 0, 82),
748 Port: 5000,
749 }
750
751 var guestSvcMap launch.GuestServiceMap
752 if opts.LocalRegistry != nil {
753 l, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1)})
754 if err != nil {
755 ctxC()
756 return nil, fmt.Errorf("failed to create TCP listener for local registry: %w", err)
757 }
758 s := http.Server{
759 Handler: opts.LocalRegistry,
760 }
761 go s.Serve(l)
762 go func() {
763 <-ctxT.Done()
764 s.Close()
765 }()
766 guestSvcMap = launch.GuestServiceMap{
767 &localRegistryAddr: *l.Addr().(*net.TCPAddr),
768 }
769 }
770
Serge Bazanskie78a0892021-10-07 17:03:49 +0200771 // Launch nanoswitch.
Serge Bazanski66e58952021-10-05 17:06:56 +0200772 portMap, err := launch.ConflictFreePortMap(ClusterPorts)
773 if err != nil {
774 ctxC()
775 return nil, fmt.Errorf("failed to allocate ephemeral ports: %w", err)
776 }
777
778 go func() {
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100779 var serialPort io.ReadWriter
780 if opts.NodeLogsToFiles {
781 path := path.Join(ld, "nanoswitch.txt")
782 serialPort, err = NewSerialFileLogger(path)
783 if err != nil {
784 launch.Log("Could not open log file for nanoswitch: %v", err)
785 }
786 launch.Log("Nanoswitch logs at %s", path)
787 } else {
788 serialPort = newPrefixedStdio(99)
789 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200790 if err := launch.RunMicroVM(ctxT, &launch.MicroVMOptions{
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100791 Name: "nanoswitch",
Serge Bazanski66e58952021-10-05 17:06:56 +0200792 KernelPath: "metropolis/test/ktest/vmlinux",
Lorenz Brunb6a9d3c2022-01-27 18:56:20 +0100793 InitramfsPath: "metropolis/test/nanoswitch/initramfs.cpio.lz4",
Serge Bazanski66e58952021-10-05 17:06:56 +0200794 ExtraNetworkInterfaces: switchPorts,
795 PortMap: portMap,
Lorenz Brun150f24a2023-07-13 20:11:06 +0200796 GuestServiceMap: guestSvcMap,
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100797 SerialPort: serialPort,
Leopoldacfad5b2023-01-15 14:05:25 +0100798 PcapDump: path.Join(ld, "nanoswitch.pcap"),
Serge Bazanski66e58952021-10-05 17:06:56 +0200799 }); err != nil {
800 if !errors.Is(err, ctxT.Err()) {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100801 launch.Fatal("Failed to launch nanoswitch: %v", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200802 }
803 }
804 }()
805
Serge Bazanskibe742842022-04-04 13:18:50 +0200806 // Build SOCKS dialer.
807 socksRemote := fmt.Sprintf("localhost:%v", portMap[SOCKSPort])
808 socksDialer, err := proxy.SOCKS5("tcp", socksRemote, nil, proxy.Direct)
Serge Bazanski66e58952021-10-05 17:06:56 +0200809 if err != nil {
810 ctxC()
Serge Bazanskibe742842022-04-04 13:18:50 +0200811 return nil, fmt.Errorf("failed to build SOCKS dialer: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200812 }
813
Serge Bazanskibe742842022-04-04 13:18:50 +0200814 // Retrieve owner credentials and first node.
815 cert, firstNode, err := firstConnection(ctxT, socksDialer)
Serge Bazanski66e58952021-10-05 17:06:56 +0200816 if err != nil {
817 ctxC()
818 return nil, err
819 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200820
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100821 // Write credentials to the metroctl directory.
822 if err := metroctl.WriteOwnerKey(md, cert.PrivateKey.(ed25519.PrivateKey)); err != nil {
823 ctxC()
824 return nil, fmt.Errorf("could not write owner key: %w", err)
825 }
826 if err := metroctl.WriteOwnerCertificate(md, cert.Certificate[0]); err != nil {
827 ctxC()
828 return nil, fmt.Errorf("could not write owner certificate: %w", err)
829 }
830
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200831 // Set up a partially initialized cluster instance, to be filled in in the
832 // later steps.
Serge Bazanskibe742842022-04-04 13:18:50 +0200833 cluster := &Cluster{
834 Owner: *cert,
835 Ports: portMap,
836 Nodes: map[string]*NodeInCluster{
837 firstNode.ID: firstNode,
838 },
839 NodeIDs: []string{
840 firstNode.ID,
841 },
842
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100843 nodesDone: done,
844 nodeOpts: nodeOpts,
845 launchDir: ld,
846 socketDir: sd,
847 metroctlDir: md,
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200848
Serge Bazanskibe742842022-04-04 13:18:50 +0200849 socksDialer: socksDialer,
850
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200851 ctxT: ctxT,
Serge Bazanskibe742842022-04-04 13:18:50 +0200852 ctxC: ctxC,
853 }
854
855 // Now start the rest of the nodes and register them into the cluster.
856
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200857 // Get an authenticated owner client within the cluster.
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200858 curC, err := cluster.CuratorClient()
Serge Bazanski66e58952021-10-05 17:06:56 +0200859 if err != nil {
860 ctxC()
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200861 return nil, fmt.Errorf("CuratorClient: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200862 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200863 mgmt := apb.NewManagementClient(curC)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200864
865 // Retrieve register ticket to register further nodes.
Serge Bazanski05f813b2023-03-16 17:58:39 +0100866 launch.Log("Cluster: retrieving register ticket...")
Serge Bazanskie78a0892021-10-07 17:03:49 +0200867 resT, err := mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{})
868 if err != nil {
869 ctxC()
870 return nil, fmt.Errorf("GetRegisterTicket: %w", err)
871 }
872 ticket := resT.Ticket
Serge Bazanski05f813b2023-03-16 17:58:39 +0100873 launch.Log("Cluster: retrieved register ticket (%d bytes).", len(ticket))
Serge Bazanskie78a0892021-10-07 17:03:49 +0200874
875 // Retrieve cluster info (for directory and ca public key) to register further
876 // nodes.
877 resI, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
878 if err != nil {
879 ctxC()
880 return nil, fmt.Errorf("GetClusterInfo: %w", err)
881 }
Serge Bazanski54e212a2023-06-14 13:45:11 +0200882 caCert, err := x509.ParseCertificate(resI.CaCertificate)
883 if err != nil {
884 ctxC()
885 return nil, fmt.Errorf("ParseCertificate: %w", err)
886 }
887 cluster.CACertificate = caCert
Serge Bazanskie78a0892021-10-07 17:03:49 +0200888
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200889 // Use the retrieved information to configure the rest of the node options.
890 for i := 1; i < opts.NumNodes; i++ {
891 nodeOpts[i] = NodeOptions{
Leopoldaf5086b2023-01-15 14:12:42 +0100892 Name: fmt.Sprintf("node%d", i),
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200893 ConnectToSocket: vmPorts[i],
894 NodeParameters: &apb.NodeParameters{
895 Cluster: &apb.NodeParameters_ClusterRegister_{
896 ClusterRegister: &apb.NodeParameters_ClusterRegister{
897 RegisterTicket: ticket,
898 ClusterDirectory: resI.ClusterDirectory,
899 CaCertificate: resI.CaCertificate,
900 },
901 },
902 },
903 SerialPort: newPrefixedStdio(i),
904 }
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100905 if opts.NodeLogsToFiles {
906 path := path.Join(ld, fmt.Sprintf("node-%d.txt", i+1))
907 port, err := NewSerialFileLogger(path)
908 if err != nil {
909 return nil, fmt.Errorf("could not open log file for node %d: %w", i+1, err)
910 }
911 launch.Log("Node %d logs at %s", i+1, path)
912 nodeOpts[i].SerialPort = port
913 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200914 }
915
916 // Now run the rest of the nodes.
Serge Bazanskie78a0892021-10-07 17:03:49 +0200917 for i := 1; i < opts.NumNodes; i++ {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100918 launch.Log("Cluster: Starting node %d...", i+1)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200919 go func(i int) {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200920 err := LaunchNode(ctxT, ld, sd, &nodeOpts[i])
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200921 if err != nil {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100922 launch.Log("Node %d finished with an error: %v", i, err)
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200923 }
Serge Bazanskie78a0892021-10-07 17:03:49 +0200924 done[i] <- err
925 }(i)
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200926 }
Serge Bazanskie78a0892021-10-07 17:03:49 +0200927
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200928 seenNodes := make(map[string]bool)
929 launch.Log("Cluster: waiting for nodes to appear as NEW...")
930 for i := 1; i < opts.NumNodes; i++ {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200931 for {
932 nodes, err := getNodes(ctx, mgmt)
933 if err != nil {
934 ctxC()
935 return nil, fmt.Errorf("could not get nodes: %w", err)
936 }
937 for _, n := range nodes {
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200938 if n.State != cpb.NodeState_NODE_STATE_NEW {
939 continue
Serge Bazanskie78a0892021-10-07 17:03:49 +0200940 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200941 seenNodes[n.Id] = true
942 cluster.Nodes[n.Id] = &NodeInCluster{
943 ID: n.Id,
944 Pubkey: n.Pubkey,
945 }
946 cluster.NodeIDs = append(cluster.NodeIDs, n.Id)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200947 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200948
949 if len(seenNodes) == opts.NumNodes-1 {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200950 break
951 }
952 time.Sleep(1 * time.Second)
953 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200954 }
955 launch.Log("Found all expected nodes")
Serge Bazanskie78a0892021-10-07 17:03:49 +0200956
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200957 approvedNodes := make(map[string]bool)
958 upNodes := make(map[string]bool)
959 if !opts.LeaveNodesNew {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200960 for {
961 nodes, err := getNodes(ctx, mgmt)
962 if err != nil {
963 ctxC()
964 return nil, fmt.Errorf("could not get nodes: %w", err)
965 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200966 for _, node := range nodes {
967 if !seenNodes[node.Id] {
968 // Skip nodes that weren't NEW in the previous step.
Serge Bazanskie78a0892021-10-07 17:03:49 +0200969 continue
970 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200971
972 if node.State == cpb.NodeState_NODE_STATE_UP && node.Status != nil && node.Status.ExternalAddress != "" {
973 launch.Log("Cluster: node %s is up", node.Id)
974 upNodes[node.Id] = true
975 cluster.Nodes[node.Id].ManagementAddress = node.Status.ExternalAddress
Serge Bazanskie78a0892021-10-07 17:03:49 +0200976 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200977 if upNodes[node.Id] {
978 continue
Serge Bazanskibe742842022-04-04 13:18:50 +0200979 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200980
981 if !approvedNodes[node.Id] {
982 launch.Log("Cluster: approving node %s", node.Id)
983 _, err := mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
984 Pubkey: node.Pubkey,
985 })
986 if err != nil {
987 ctxC()
988 return nil, fmt.Errorf("ApproveNode(%s): %w", node.Id, err)
989 }
990 approvedNodes[node.Id] = true
Serge Bazanskibe742842022-04-04 13:18:50 +0200991 }
Serge Bazanskie78a0892021-10-07 17:03:49 +0200992 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200993
994 launch.Log("Cluster: want %d up nodes, have %d", opts.NumNodes-1, len(upNodes))
995 if len(upNodes) == opts.NumNodes-1 {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200996 break
997 }
Serge Bazanskibe742842022-04-04 13:18:50 +0200998 time.Sleep(time.Second)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200999 }
Serge Bazanskie78a0892021-10-07 17:03:49 +02001000 }
Serge Bazanski66e58952021-10-05 17:06:56 +02001001
Serge Bazanski05f813b2023-03-16 17:58:39 +01001002 launch.Log("Cluster: all nodes up:")
Serge Bazanskibe742842022-04-04 13:18:50 +02001003 for _, node := range cluster.Nodes {
Serge Bazanski05f813b2023-03-16 17:58:39 +01001004 launch.Log("Cluster: - %s at %s", node.ID, node.ManagementAddress)
Serge Bazanskibe742842022-04-04 13:18:50 +02001005 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001006 launch.Log("Cluster: starting tests...")
Serge Bazanski66e58952021-10-05 17:06:56 +02001007
Serge Bazanskibe742842022-04-04 13:18:50 +02001008 return cluster, nil
Serge Bazanski66e58952021-10-05 17:06:56 +02001009}
1010
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001011// RebootNode reboots the cluster member node matching the given index, and
1012// waits for it to rejoin the cluster. It will use the given context ctx to run
1013// cluster API requests, whereas the resulting QEMU process will be created
1014// using the cluster's context c.ctxT. The nodes are indexed starting at 0.
1015func (c *Cluster) RebootNode(ctx context.Context, idx int) error {
1016 if idx < 0 || idx >= len(c.NodeIDs) {
1017 return fmt.Errorf("index out of bounds.")
1018 }
1019 id := c.NodeIDs[idx]
1020
1021 // Get an authenticated owner client within the cluster.
Serge Bazanski5bb8a332022-06-23 17:41:33 +02001022 curC, err := c.CuratorClient()
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001023 if err != nil {
1024 return err
1025 }
1026 mgmt := apb.NewManagementClient(curC)
1027
1028 // Get the timestamp of the node's last update, as observed by Curator.
1029 // It'll be needed to make sure it had rejoined the cluster after the reboot.
1030 var is *apb.Node
1031 for {
1032 r, err := getNode(ctx, mgmt, id)
1033 if err != nil {
1034 return err
1035 }
1036
1037 // Node status may be absent if it hasn't reported to the cluster yet. Wait
1038 // for it to appear before progressing further.
1039 if r.Status != nil {
1040 is = r
1041 break
1042 }
1043 time.Sleep(time.Second)
1044 }
1045
1046 // Cancel the node's context. This will shut down QEMU.
1047 c.nodeOpts[idx].Runtime.CtxC()
Serge Bazanski05f813b2023-03-16 17:58:39 +01001048 launch.Log("Cluster: waiting for node %d (%s) to stop.", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001049 err = <-c.nodesDone[idx]
1050 if err != nil {
1051 return fmt.Errorf("while restarting node: %w", err)
1052 }
1053
1054 // Start QEMU again.
Serge Bazanski05f813b2023-03-16 17:58:39 +01001055 launch.Log("Cluster: restarting node %d (%s).", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001056 go func(n int) {
1057 err := LaunchNode(c.ctxT, c.launchDir, c.socketDir, &c.nodeOpts[n])
Mateusz Zalega08cb4642022-05-25 17:35:59 +02001058 if err != nil {
Serge Bazanski05f813b2023-03-16 17:58:39 +01001059 launch.Log("Node %d finished with an error: %v", n, err)
Mateusz Zalega08cb4642022-05-25 17:35:59 +02001060 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001061 c.nodesDone[n] <- err
1062 }(idx)
1063
1064 // Poll Management.GetNodes until the node's timestamp is updated.
1065 for {
1066 cs, err := getNode(ctx, mgmt, id)
1067 if err != nil {
Serge Bazanski05f813b2023-03-16 17:58:39 +01001068 launch.Log("Cluster: node get error: %v", err)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001069 return err
1070 }
Serge Bazanski05f813b2023-03-16 17:58:39 +01001071 launch.Log("Cluster: node status: %+v", cs)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001072 if cs.Status == nil {
1073 continue
1074 }
Mateusz Zalega28800ad2022-07-08 14:56:02 +02001075 if cs.Status.Timestamp.AsTime().Sub(is.Status.Timestamp.AsTime()) > 0 {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001076 break
1077 }
1078 time.Sleep(time.Second)
1079 }
Serge Bazanski05f813b2023-03-16 17:58:39 +01001080 launch.Log("Cluster: node %d (%s) has rejoined the cluster.", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001081 return nil
1082}
1083
1084// Close cancels the running clusters' context and waits for all virtualized
Serge Bazanski66e58952021-10-05 17:06:56 +02001085// nodes to stop. It returns an error if stopping the nodes failed, or one of
1086// the nodes failed to fully start in the first place.
1087func (c *Cluster) Close() error {
Serge Bazanski05f813b2023-03-16 17:58:39 +01001088 launch.Log("Cluster: stopping...")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001089 if c.authClient != nil {
1090 c.authClient.Close()
1091 }
Serge Bazanski66e58952021-10-05 17:06:56 +02001092 c.ctxC()
1093
Leopold20a036e2023-01-15 00:17:19 +01001094 var errs []error
Serge Bazanski05f813b2023-03-16 17:58:39 +01001095 launch.Log("Cluster: waiting for nodes to exit...")
Serge Bazanski66e58952021-10-05 17:06:56 +02001096 for _, c := range c.nodesDone {
1097 err := <-c
1098 if err != nil {
Leopold20a036e2023-01-15 00:17:19 +01001099 errs = append(errs, err)
Serge Bazanski66e58952021-10-05 17:06:56 +02001100 }
1101 }
Serge Bazanskid09c58f2023-03-17 00:25:08 +01001102 launch.Log("Cluster: removing nodes' state files (%s) and sockets (%s).", c.launchDir, c.socketDir)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001103 os.RemoveAll(c.launchDir)
1104 os.RemoveAll(c.socketDir)
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001105 os.RemoveAll(c.metroctlDir)
Serge Bazanski05f813b2023-03-16 17:58:39 +01001106 launch.Log("Cluster: done")
Leopold20a036e2023-01-15 00:17:19 +01001107 return multierr.Combine(errs...)
Serge Bazanski66e58952021-10-05 17:06:56 +02001108}
Serge Bazanskibe742842022-04-04 13:18:50 +02001109
1110// DialNode is a grpc.WithContextDialer compatible dialer which dials nodes by
1111// their ID. This is performed by connecting to the cluster nanoswitch via its
1112// SOCKS proxy, and using the cluster node list for name resolution.
1113//
1114// For example:
1115//
Serge Bazanski05f813b2023-03-16 17:58:39 +01001116// grpc.Dial("metropolis-deadbeef:1234", grpc.WithContextDialer(c.DialNode))
Serge Bazanskibe742842022-04-04 13:18:50 +02001117func (c *Cluster) DialNode(_ context.Context, addr string) (net.Conn, error) {
1118 host, port, err := net.SplitHostPort(addr)
1119 if err != nil {
1120 return nil, fmt.Errorf("invalid host:port: %w", err)
1121 }
1122 // Already an IP address?
1123 if net.ParseIP(host) != nil {
1124 return c.socksDialer.Dial("tcp", addr)
1125 }
1126
1127 // Otherwise, expect a node name.
1128 node, ok := c.Nodes[host]
1129 if !ok {
1130 return nil, fmt.Errorf("unknown node %q", host)
1131 }
1132 addr = net.JoinHostPort(node.ManagementAddress, port)
1133 return c.socksDialer.Dial("tcp", addr)
1134}
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001135
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001136// GetKubeClientSet gets a Kubernetes client set accessing the Metropolis
1137// Kubernetes authenticating proxy using the cluster owner identity.
1138// It currently has access to everything (i.e. the cluster-admin role)
1139// via the owner-admin binding.
1140func (c *Cluster) GetKubeClientSet() (kubernetes.Interface, error) {
1141 pkcs8Key, err := x509.MarshalPKCS8PrivateKey(c.Owner.PrivateKey)
1142 if err != nil {
1143 // We explicitly pass an Ed25519 private key in, so this can't happen
1144 panic(err)
1145 }
1146
1147 host := net.JoinHostPort(c.NodeIDs[0], node.KubernetesAPIWrappedPort.PortString())
Lorenz Brun150f24a2023-07-13 20:11:06 +02001148 clientConfig := rest.Config{
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001149 Host: host,
1150 TLSClientConfig: rest.TLSClientConfig{
1151 // TODO(q3k): use CA certificate
1152 Insecure: true,
1153 ServerName: "kubernetes.default.svc",
1154 CertData: pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: c.Owner.Certificate[0]}),
1155 KeyData: pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: pkcs8Key}),
1156 },
1157 Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
1158 return c.DialNode(ctx, address)
1159 },
1160 }
1161 return kubernetes.NewForConfig(&clientConfig)
1162}
1163
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001164// KubernetesControllerNodeAddresses returns the list of IP addresses of nodes
1165// which are currently Kubernetes controllers, ie. run an apiserver. This list
1166// might be empty if no node is currently configured with the
1167// 'KubernetesController' node.
1168func (c *Cluster) KubernetesControllerNodeAddresses(ctx context.Context) ([]string, error) {
1169 curC, err := c.CuratorClient()
1170 if err != nil {
1171 return nil, err
1172 }
1173 mgmt := apb.NewManagementClient(curC)
1174 srv, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{
1175 Filter: "has(node.roles.kubernetes_controller)",
1176 })
1177 if err != nil {
1178 return nil, err
1179 }
1180 defer srv.CloseSend()
1181 var res []string
1182 for {
1183 n, err := srv.Recv()
1184 if err == io.EOF {
1185 break
1186 }
1187 if err != nil {
1188 return nil, err
1189 }
1190 if n.Status == nil || n.Status.ExternalAddress == "" {
1191 continue
1192 }
1193 res = append(res, n.Status.ExternalAddress)
1194 }
1195 return res, nil
1196}
Serge Bazanski630fb5c2023-04-06 10:50:24 +02001197
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001198// AllNodesHealthy returns nil if all the nodes in the cluster are seemingly
1199// healthy.
Serge Bazanski630fb5c2023-04-06 10:50:24 +02001200func (c *Cluster) AllNodesHealthy(ctx context.Context) error {
1201 // Get an authenticated owner client within the cluster.
1202 curC, err := c.CuratorClient()
1203 if err != nil {
1204 return err
1205 }
1206 mgmt := apb.NewManagementClient(curC)
1207 nodes, err := getNodes(ctx, mgmt)
1208 if err != nil {
1209 return err
1210 }
1211
1212 var unhealthy []string
1213 for _, node := range nodes {
1214 if node.Health == apb.Node_HEALTHY {
1215 continue
1216 }
1217 unhealthy = append(unhealthy, node.Id)
1218 }
1219 if len(unhealthy) == 0 {
1220 return nil
1221 }
1222 return fmt.Errorf("nodes unhealthy: %s", strings.Join(unhealthy, ", "))
1223}
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001224
1225// ApproveNode approves a node by ID, waiting for it to become UP.
1226func (c *Cluster) ApproveNode(ctx context.Context, id string) error {
1227 curC, err := c.CuratorClient()
1228 if err != nil {
1229 return err
1230 }
1231 mgmt := apb.NewManagementClient(curC)
1232
1233 _, err = mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
1234 Pubkey: c.Nodes[id].Pubkey,
1235 })
1236 if err != nil {
1237 return fmt.Errorf("ApproveNode: %w", err)
1238 }
1239 launch.Log("Cluster: %s: approved, waiting for UP", id)
1240 for {
1241 nodes, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
1242 if err != nil {
1243 return fmt.Errorf("GetNodes: %w", err)
1244 }
1245 found := false
1246 for {
1247 node, err := nodes.Recv()
1248 if errors.Is(err, io.EOF) {
1249 break
1250 }
1251 if err != nil {
1252 return fmt.Errorf("Nodes.Recv: %w", err)
1253 }
1254 if node.Id != id {
1255 continue
1256 }
1257 if node.State != cpb.NodeState_NODE_STATE_UP {
1258 continue
1259 }
1260 found = true
1261 break
1262 }
1263 nodes.CloseSend()
1264
1265 if found {
1266 break
1267 }
1268 time.Sleep(time.Second)
1269 }
1270 launch.Log("Cluster: %s: UP", id)
1271 return nil
1272}
1273
1274// MakeKubernetesWorker adds the KubernetesWorker role to a node by ID.
1275func (c *Cluster) MakeKubernetesWorker(ctx context.Context, id string) error {
1276 curC, err := c.CuratorClient()
1277 if err != nil {
1278 return err
1279 }
1280 mgmt := apb.NewManagementClient(curC)
1281
1282 tr := true
1283 launch.Log("Cluster: %s: adding KubernetesWorker", id)
1284 _, err = mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
1285 Node: &apb.UpdateNodeRolesRequest_Id{
1286 Id: id,
1287 },
1288 KubernetesWorker: &tr,
1289 })
1290 return err
1291}