blob: 69d96677a7dc22010a08e01a2d1e552d5346e223 [file] [log] [blame]
Serge Bazanski66e58952021-10-05 17:06:56 +02001// cluster builds on the launch package and implements launching Metropolis
2// nodes and clusters in a virtualized environment using qemu. It's kept in a
3// separate package as it depends on a Metropolis node image, which might not be
4// required for some use of the launch library.
5package cluster
6
7import (
8 "bytes"
9 "context"
Serge Bazanski1f8cad72023-03-20 16:58:10 +010010 "crypto/ed25519"
Serge Bazanski66e58952021-10-05 17:06:56 +020011 "crypto/rand"
12 "crypto/tls"
Serge Bazanski54e212a2023-06-14 13:45:11 +020013 "crypto/x509"
Serge Bazanskia0bc6d32023-06-28 18:57:40 +020014 "encoding/pem"
Serge Bazanski66e58952021-10-05 17:06:56 +020015 "errors"
16 "fmt"
17 "io"
Serge Bazanski66e58952021-10-05 17:06:56 +020018 "net"
Lorenz Brun150f24a2023-07-13 20:11:06 +020019 "net/http"
Serge Bazanski66e58952021-10-05 17:06:56 +020020 "os"
21 "os/exec"
Leopoldacfad5b2023-01-15 14:05:25 +010022 "path"
Serge Bazanski66e58952021-10-05 17:06:56 +020023 "path/filepath"
Serge Bazanski630fb5c2023-04-06 10:50:24 +020024 "strings"
Serge Bazanski66e58952021-10-05 17:06:56 +020025 "syscall"
26 "time"
27
Tim Windelschmidt2a1d1b22024-02-06 07:07:42 +010028 "github.com/bazelbuild/rules_go/go/runfiles"
Serge Bazanski66e58952021-10-05 17:06:56 +020029 "github.com/cenkalti/backoff/v4"
Serge Bazanski66e58952021-10-05 17:06:56 +020030 "go.uber.org/multierr"
Serge Bazanskibe742842022-04-04 13:18:50 +020031 "golang.org/x/net/proxy"
Serge Bazanski66e58952021-10-05 17:06:56 +020032 "google.golang.org/grpc"
Serge Bazanski636032e2022-01-26 14:21:33 +010033 "google.golang.org/grpc/codes"
34 "google.golang.org/grpc/status"
Serge Bazanski66e58952021-10-05 17:06:56 +020035 "google.golang.org/protobuf/proto"
Serge Bazanskia0bc6d32023-06-28 18:57:40 +020036 "k8s.io/client-go/kubernetes"
37 "k8s.io/client-go/rest"
Serge Bazanski66e58952021-10-05 17:06:56 +020038
Tim Windelschmidtbe25a3b2023-07-19 16:31:56 +020039 apb "source.monogon.dev/metropolis/proto/api"
40 cpb "source.monogon.dev/metropolis/proto/common"
41
Serge Bazanski1f8cad72023-03-20 16:58:10 +010042 metroctl "source.monogon.dev/metropolis/cli/metroctl/core"
Serge Bazanski66e58952021-10-05 17:06:56 +020043 "source.monogon.dev/metropolis/node"
Serge Bazanskie78a0892021-10-07 17:03:49 +020044 "source.monogon.dev/metropolis/node/core/identity"
Serge Bazanski66e58952021-10-05 17:06:56 +020045 "source.monogon.dev/metropolis/node/core/rpc"
Serge Bazanski5bb8a332022-06-23 17:41:33 +020046 "source.monogon.dev/metropolis/node/core/rpc/resolver"
Lorenz Brun150f24a2023-07-13 20:11:06 +020047 "source.monogon.dev/metropolis/pkg/localregistry"
Serge Bazanski66e58952021-10-05 17:06:56 +020048 "source.monogon.dev/metropolis/test/launch"
49)
50
Leopold20a036e2023-01-15 00:17:19 +010051// NodeOptions contains all options that can be passed to Launch()
Serge Bazanski66e58952021-10-05 17:06:56 +020052type NodeOptions struct {
Leopoldaf5086b2023-01-15 14:12:42 +010053 // Name is a human-readable identifier to be used in debug output.
54 Name string
55
Serge Bazanski66e58952021-10-05 17:06:56 +020056 // Ports contains the port mapping where to expose the internal ports of the VM to
57 // the host. See IdentityPortMap() and ConflictFreePortMap(). Ignored when
58 // ConnectToSocket is set.
59 Ports launch.PortMap
60
Leopold20a036e2023-01-15 00:17:19 +010061 // If set to true, reboots are honored. Otherwise, all reboots exit the Launch()
62 // command. Metropolis nodes generally restart on almost all errors, so unless you
Serge Bazanski66e58952021-10-05 17:06:56 +020063 // want to test reboot behavior this should be false.
64 AllowReboot bool
65
Leopold20a036e2023-01-15 00:17:19 +010066 // By default, the VM is connected to the Host via SLIRP. If ConnectToSocket is
67 // set, it is instead connected to the given file descriptor/socket. If this is
68 // set, all port maps from the Ports option are ignored. Intended for networking
69 // this instance together with others for running more complex network
70 // configurations.
Serge Bazanski66e58952021-10-05 17:06:56 +020071 ConnectToSocket *os.File
72
Leopoldacfad5b2023-01-15 14:05:25 +010073 // When PcapDump is set, all traffic is dumped to a pcap file in the
74 // runtime directory (e.g. "net0.pcap" for the first interface).
75 PcapDump bool
76
Leopold20a036e2023-01-15 00:17:19 +010077 // SerialPort is an io.ReadWriter over which you can communicate with the serial
78 // port of the machine. It can be set to an existing file descriptor (like
Serge Bazanski66e58952021-10-05 17:06:56 +020079 // os.Stdout/os.Stderr) or any Go structure implementing this interface.
80 SerialPort io.ReadWriter
81
82 // NodeParameters is passed into the VM and subsequently used for bootstrapping or
83 // registering into a cluster.
84 NodeParameters *apb.NodeParameters
Mateusz Zalega0246f5e2022-04-22 17:29:04 +020085
86 // Mac is the node's MAC address.
87 Mac *net.HardwareAddr
88
89 // Runtime keeps the node's QEMU runtime state.
90 Runtime *NodeRuntime
91}
92
// NodeRuntime holds the per-node state that has to survive across restarts of
// the node's QEMU process.
type NodeRuntime struct {
	// ld is the node's launch directory. It stores data such as storage images,
	// firmware variables and the TPM state.
	ld string
	// sd is the node's socket directory.
	sd string

	// ctxT is the context under which QEMU executes.
	ctxT context.Context
	// CtxC cancels ctxT.
	CtxC context.CancelFunc
}
106
107// NodePorts is the list of ports a fully operational Metropolis node listens on
Serge Bazanski52304a82021-10-29 16:56:18 +0200108var NodePorts = []node.Port{
Serge Bazanski66e58952021-10-05 17:06:56 +0200109 node.ConsensusPort,
110
111 node.CuratorServicePort,
112 node.DebugServicePort,
113
114 node.KubernetesAPIPort,
Lorenz Bruncc078df2021-12-23 11:51:55 +0100115 node.KubernetesAPIWrappedPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200116 node.CuratorServicePort,
117 node.DebuggerPort,
Tim Windelschmidtbe25a3b2023-07-19 16:31:56 +0200118 node.MetricsPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200119}
120
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200121// setupRuntime creates the node's QEMU runtime directory, together with all
122// files required to preserve its state, a level below the chosen path ld. The
123// node's socket directory is similarily created a level below sd. It may
124// return an I/O error.
125func setupRuntime(ld, sd string) (*NodeRuntime, error) {
126 // Create a temporary directory to keep all the runtime files.
127 stdp, err := os.MkdirTemp(ld, "node_state*")
128 if err != nil {
129 return nil, fmt.Errorf("failed to create the state directory: %w", err)
130 }
131
132 // Initialize the node's storage with a prebuilt image.
Tim Windelschmidt2a1d1b22024-02-06 07:07:42 +0100133 si, err := runfiles.Rlocation("_main/metropolis/node/image.img")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200134 if err != nil {
135 return nil, fmt.Errorf("while resolving a path: %w", err)
136 }
137 di := filepath.Join(stdp, filepath.Base(si))
Serge Bazanski05f813b2023-03-16 17:58:39 +0100138 launch.Log("Cluster: copying node image: %s -> %s", si, di)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200139 if err := copyFile(si, di); err != nil {
140 return nil, fmt.Errorf("while copying the node image: %w", err)
141 }
142
143 // Initialize the OVMF firmware variables file.
Tim Windelschmidt2a1d1b22024-02-06 07:07:42 +0100144 sv, err := runfiles.Rlocation("edk2/OVMF_VARS.fd")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200145 if err != nil {
146 return nil, fmt.Errorf("while resolving a path: %w", err)
147 }
148 dv := filepath.Join(stdp, filepath.Base(sv))
149 if err := copyFile(sv, dv); err != nil {
150 return nil, fmt.Errorf("while copying firmware variables: %w", err)
151 }
152
153 // Create the TPM state directory and initialize all files required by swtpm.
154 tpmt := filepath.Join(stdp, "tpm")
Lorenz Brun150f24a2023-07-13 20:11:06 +0200155 if err := os.Mkdir(tpmt, 0o755); err != nil {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200156 return nil, fmt.Errorf("while creating the TPM directory: %w", err)
157 }
Tim Windelschmidt2a1d1b22024-02-06 07:07:42 +0100158 tpms, err := runfiles.Rlocation("_main/metropolis/node/tpm")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200159 if err != nil {
160 return nil, fmt.Errorf("while resolving a path: %w", err)
161 }
162 tpmf, err := os.ReadDir(tpms)
163 if err != nil {
164 return nil, fmt.Errorf("failed to read TPM directory: %w", err)
165 }
166 for _, file := range tpmf {
167 name := file.Name()
Tim Windelschmidt2a1d1b22024-02-06 07:07:42 +0100168 src, err := runfiles.Rlocation(filepath.Join(tpms, name))
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200169 if err != nil {
170 return nil, fmt.Errorf("while resolving a path: %w", err)
171 }
172 tgt := filepath.Join(tpmt, name)
173 if err := copyFile(src, tgt); err != nil {
174 return nil, fmt.Errorf("while copying TPM state: file %q to %q: %w", src, tgt, err)
175 }
176 }
177
178 // Create the socket directory.
179 sotdp, err := os.MkdirTemp(sd, "node_sock*")
180 if err != nil {
181 return nil, fmt.Errorf("failed to create the socket directory: %w", err)
182 }
183
184 return &NodeRuntime{
185 ld: stdp,
186 sd: sotdp,
187 }, nil
188}
189
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200190// CuratorClient returns an authenticated owner connection to a Curator
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200191// instance within Cluster c, or nil together with an error.
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200192func (c *Cluster) CuratorClient() (*grpc.ClientConn, error) {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200193 if c.authClient == nil {
Serge Bazanski8535cb52023-03-29 14:15:08 +0200194 authCreds := rpc.NewAuthenticatedCredentials(c.Owner, rpc.WantInsecure())
Serge Bazanski58ddc092022-06-30 18:23:33 +0200195 r := resolver.New(c.ctxT, resolver.WithLogger(func(f string, args ...interface{}) {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100196 launch.Log("Cluster: client resolver: %s", fmt.Sprintf(f, args...))
Serge Bazanski58ddc092022-06-30 18:23:33 +0200197 }))
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200198 for _, n := range c.NodeIDs {
199 ep, err := resolver.NodeWithDefaultPort(n)
200 if err != nil {
201 return nil, fmt.Errorf("could not add node %q by DNS: %v", n, err)
202 }
203 r.AddEndpoint(ep)
204 }
205 authClient, err := grpc.Dial(resolver.MetropolisControlAddress,
206 grpc.WithTransportCredentials(authCreds),
207 grpc.WithResolvers(r),
208 grpc.WithContextDialer(c.DialNode),
209 )
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200210 if err != nil {
211 return nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
212 }
213 c.authClient = authClient
214 }
215 return c.authClient, nil
216}
217
Serge Bazanski66e58952021-10-05 17:06:56 +0200218// LaunchNode launches a single Metropolis node instance with the given options.
219// The instance runs mostly paravirtualized but with some emulated hardware
220// similar to how a cloud provider might set up its VMs. The disk is fully
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200221// writable, and the changes are kept across reboots and shutdowns. ld and sd
222// point to the launch directory and the socket directory, holding the nodes'
223// state files (storage, tpm state, firmware state), and UNIX socket files
224// (swtpm <-> QEMU interplay) respectively. The directories must exist before
225// LaunchNode is called. LaunchNode will update options.Runtime and options.Mac
226// if either are not initialized.
227func LaunchNode(ctx context.Context, ld, sd string, options *NodeOptions) error {
228 // TODO(mateusz@monogon.tech) try using QEMU's abstract socket namespace instead
229 // of /tmp (requires QEMU version >5.0).
Serge Bazanski66e58952021-10-05 17:06:56 +0200230 // https://github.com/qemu/qemu/commit/776b97d3605ed0fc94443048fdf988c7725e38a9).
231 // swtpm accepts already-open FDs so we can pass in an abstract socket namespace FD
232 // that we open and pass the name of it to QEMU. Not pinning this crashes both
233 // swtpm and qemu because we run into UNIX socket length limitations (for legacy
234 // reasons 108 chars).
Serge Bazanski66e58952021-10-05 17:06:56 +0200235
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200236 // If it's the node's first start, set up its runtime directories.
237 if options.Runtime == nil {
238 r, err := setupRuntime(ld, sd)
239 if err != nil {
240 return fmt.Errorf("while setting up node runtime: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200241 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200242 options.Runtime = r
Serge Bazanski66e58952021-10-05 17:06:56 +0200243 }
244
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200245 // Replace the node's context with a new one.
246 r := options.Runtime
247 if r.CtxC != nil {
248 r.CtxC()
249 }
250 r.ctxT, r.CtxC = context.WithCancel(ctx)
251
Serge Bazanski66e58952021-10-05 17:06:56 +0200252 var qemuNetType string
253 var qemuNetConfig launch.QemuValue
254 if options.ConnectToSocket != nil {
255 qemuNetType = "socket"
256 qemuNetConfig = launch.QemuValue{
257 "id": {"net0"},
258 "fd": {"3"},
259 }
260 } else {
261 qemuNetType = "user"
262 qemuNetConfig = launch.QemuValue{
263 "id": {"net0"},
264 "net": {"10.42.0.0/24"},
265 "dhcpstart": {"10.42.0.10"},
266 "hostfwd": options.Ports.ToQemuForwards(),
267 }
268 }
269
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200270 // Generate the node's MAC address if it isn't already set in NodeOptions.
271 if options.Mac == nil {
272 mac, err := generateRandomEthernetMAC()
273 if err != nil {
274 return err
275 }
276 options.Mac = mac
Serge Bazanski66e58952021-10-05 17:06:56 +0200277 }
278
Tim Windelschmidt244b5672024-02-06 10:18:56 +0100279 ovmfCodePath, err := runfiles.Rlocation("edk2/OVMF_CODE.fd")
280 if err != nil {
281 return err
282 }
283
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200284 tpmSocketPath := filepath.Join(r.sd, "tpm-socket")
285 fwVarPath := filepath.Join(r.ld, "OVMF_VARS.fd")
Lorenz Brun1dc60af2023-10-03 15:40:09 +0200286 storagePath := filepath.Join(r.ld, "image.img")
Lorenz Brun150f24a2023-07-13 20:11:06 +0200287 qemuArgs := []string{
288 "-machine", "q35", "-accel", "kvm", "-nographic", "-nodefaults", "-m", "4096",
Serge Bazanski66e58952021-10-05 17:06:56 +0200289 "-cpu", "host", "-smp", "sockets=1,cpus=1,cores=2,threads=2,maxcpus=4",
Tim Windelschmidt244b5672024-02-06 10:18:56 +0100290 "-drive", "if=pflash,format=raw,readonly=on,file=" + ovmfCodePath,
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200291 "-drive", "if=pflash,format=raw,file=" + fwVarPath,
292 "-drive", "if=virtio,format=raw,cache=unsafe,file=" + storagePath,
Serge Bazanski66e58952021-10-05 17:06:56 +0200293 "-netdev", qemuNetConfig.ToOption(qemuNetType),
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200294 "-device", "virtio-net-pci,netdev=net0,mac=" + options.Mac.String(),
Serge Bazanski66e58952021-10-05 17:06:56 +0200295 "-chardev", "socket,id=chrtpm,path=" + tpmSocketPath,
296 "-tpmdev", "emulator,id=tpm0,chardev=chrtpm",
297 "-device", "tpm-tis,tpmdev=tpm0",
298 "-device", "virtio-rng-pci",
Lorenz Brun150f24a2023-07-13 20:11:06 +0200299 "-serial", "stdio",
300 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200301
302 if !options.AllowReboot {
303 qemuArgs = append(qemuArgs, "-no-reboot")
304 }
305
306 if options.NodeParameters != nil {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200307 parametersPath := filepath.Join(r.ld, "parameters.pb")
Serge Bazanski66e58952021-10-05 17:06:56 +0200308 parametersRaw, err := proto.Marshal(options.NodeParameters)
309 if err != nil {
310 return fmt.Errorf("failed to encode node paraeters: %w", err)
311 }
Lorenz Brun150f24a2023-07-13 20:11:06 +0200312 if err := os.WriteFile(parametersPath, parametersRaw, 0o644); err != nil {
Serge Bazanski66e58952021-10-05 17:06:56 +0200313 return fmt.Errorf("failed to write node parameters: %w", err)
314 }
315 qemuArgs = append(qemuArgs, "-fw_cfg", "name=dev.monogon.metropolis/parameters.pb,file="+parametersPath)
316 }
317
Leopoldacfad5b2023-01-15 14:05:25 +0100318 if options.PcapDump {
319 var qemuNetDump launch.QemuValue
320 pcapPath := filepath.Join(r.ld, "net0.pcap")
321 if options.PcapDump {
322 qemuNetDump = launch.QemuValue{
323 "id": {"net0"},
324 "netdev": {"net0"},
325 "file": {pcapPath},
326 }
327 }
328 qemuArgs = append(qemuArgs, "-object", qemuNetDump.ToOption("filter-dump"))
329 }
330
Serge Bazanski66e58952021-10-05 17:06:56 +0200331 // Start TPM emulator as a subprocess
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200332 tpmCtx, tpmCancel := context.WithCancel(options.Runtime.ctxT)
Serge Bazanski66e58952021-10-05 17:06:56 +0200333 defer tpmCancel()
334
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200335 tpmd := filepath.Join(r.ld, "tpm")
336 tpmEmuCmd := exec.CommandContext(tpmCtx, "swtpm", "socket", "--tpm2", "--tpmstate", "dir="+tpmd, "--ctrl", "type=unixio,path="+tpmSocketPath)
Serge Bazanski66e58952021-10-05 17:06:56 +0200337 tpmEmuCmd.Stderr = os.Stderr
338 tpmEmuCmd.Stdout = os.Stdout
339
Tim Windelschmidt244b5672024-02-06 10:18:56 +0100340 err = tpmEmuCmd.Start()
Serge Bazanski66e58952021-10-05 17:06:56 +0200341 if err != nil {
342 return fmt.Errorf("failed to start TPM emulator: %w", err)
343 }
344
Mateusz Zalegae90f4a12022-05-25 18:24:01 +0200345 // Wait for the socket to be created by the TPM emulator before launching
346 // QEMU.
347 for {
348 _, err := os.Stat(tpmSocketPath)
349 if err == nil {
350 break
351 }
352 if err != nil && !os.IsNotExist(err) {
353 return fmt.Errorf("while stat-ing TPM socket path: %w", err)
354 }
355 if err := tpmCtx.Err(); err != nil {
356 return fmt.Errorf("while waiting for the TPM socket: %w", err)
357 }
358 time.Sleep(time.Millisecond * 100)
359 }
360
Serge Bazanski66e58952021-10-05 17:06:56 +0200361 // Start the main qemu binary
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200362 systemCmd := exec.CommandContext(options.Runtime.ctxT, "qemu-system-x86_64", qemuArgs...)
Serge Bazanski66e58952021-10-05 17:06:56 +0200363 if options.ConnectToSocket != nil {
364 systemCmd.ExtraFiles = []*os.File{options.ConnectToSocket}
365 }
366
367 var stdErrBuf bytes.Buffer
368 systemCmd.Stderr = &stdErrBuf
369 systemCmd.Stdout = options.SerialPort
370
Leopoldaf5086b2023-01-15 14:12:42 +0100371 launch.PrettyPrintQemuArgs(options.Name, systemCmd.Args)
372
Serge Bazanski66e58952021-10-05 17:06:56 +0200373 err = systemCmd.Run()
374
375 // Stop TPM emulator and wait for it to exit to properly reap the child process
376 tpmCancel()
Serge Bazanski05f813b2023-03-16 17:58:39 +0100377 launch.Log("Node: Waiting for TPM emulator to exit")
Serge Bazanski66e58952021-10-05 17:06:56 +0200378 // Wait returns a SIGKILL error because we just cancelled its context.
379 // We still need to call it to avoid creating zombies.
380 _ = tpmEmuCmd.Wait()
Serge Bazanski05f813b2023-03-16 17:58:39 +0100381 launch.Log("Node: TPM emulator done")
Serge Bazanski66e58952021-10-05 17:06:56 +0200382
383 var exerr *exec.ExitError
384 if err != nil && errors.As(err, &exerr) {
385 status := exerr.ProcessState.Sys().(syscall.WaitStatus)
386 if status.Signaled() && status.Signal() == syscall.SIGKILL {
387 // Process was killed externally (most likely by our context being canceled).
388 // This is a normal exit for us, so return nil
389 return nil
390 }
391 exerr.Stderr = stdErrBuf.Bytes()
392 newErr := launch.QEMUError(*exerr)
393 return &newErr
394 }
395 return err
396}
397
// copyFile copies the contents of the file at src into the file at dst. The
// destination is created if it does not exist and truncated if it does. Errors
// from opening, creating and copying are wrapped with a short description of
// the failed step; an error from the final close of the destination is
// returned as-is.
func copyFile(src, dst string) error {
	source, err := os.Open(src)
	if err != nil {
		return fmt.Errorf("when opening source: %w", err)
	}
	defer source.Close()

	target, err := os.Create(dst)
	if err != nil {
		return fmt.Errorf("when creating destination: %w", err)
	}
	// Safety net for the error path below; on success the explicit Close at
	// the end runs first and reports any error.
	defer target.Close()

	if _, err := io.Copy(target, source); err != nil {
		return fmt.Errorf("when copying file: %w", err)
	}
	return target.Close()
}
417
Serge Bazanskie78a0892021-10-07 17:03:49 +0200418// getNodes wraps around Management.GetNodes to return a list of nodes in a
419// cluster.
420func getNodes(ctx context.Context, mgmt apb.ManagementClient) ([]*apb.Node, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200421 var res []*apb.Node
Serge Bazanski636032e2022-01-26 14:21:33 +0100422 bo := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
Serge Bazanski075465c2021-11-16 15:38:49 +0100423 err := backoff.Retry(func() error {
424 res = nil
425 srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
Serge Bazanskie78a0892021-10-07 17:03:49 +0200426 if err != nil {
Serge Bazanski075465c2021-11-16 15:38:49 +0100427 return fmt.Errorf("GetNodes: %w", err)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200428 }
Serge Bazanski075465c2021-11-16 15:38:49 +0100429 for {
430 node, err := srvN.Recv()
431 if err == io.EOF {
432 break
433 }
434 if err != nil {
435 return fmt.Errorf("GetNodes.Recv: %w", err)
436 }
437 res = append(res, node)
438 }
439 return nil
440 }, bo)
441 if err != nil {
442 return nil, err
Serge Bazanskie78a0892021-10-07 17:03:49 +0200443 }
444 return res, nil
445}
446
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200447// getNode wraps Management.GetNodes. It returns node information matching
448// given node ID.
449func getNode(ctx context.Context, mgmt apb.ManagementClient, id string) (*apb.Node, error) {
450 nodes, err := getNodes(ctx, mgmt)
451 if err != nil {
452 return nil, fmt.Errorf("could not get nodes: %w", err)
453 }
454 for _, n := range nodes {
455 eid := identity.NodeID(n.Pubkey)
456 if eid != id {
457 continue
458 }
459 return n, nil
460 }
461 return nil, fmt.Errorf("no such node.")
462}
463
// generateRandomEthernetMAC returns a random EUI-48 Ethernet MAC address that
// is marked as a locally administered, individual (unicast) address. It
// returns an error if no randomness could be read.
func generateRandomEthernetMAC() (*net.HardwareAddr, error) {
	var raw [6]byte
	if _, err := rand.Read(raw[:]); err != nil {
		return nil, fmt.Errorf("failed to read randomness for MAC: %v", err)
	}

	// Ref IEEE 802-2014 Section 8.2.2: the U/L bit (0x02) is set to mark the
	// address as locally administered, the I/G bit (0x01) is cleared to mark
	// it as an individual address.
	raw[0] |= 0x02
	raw[0] &^= 0x01

	mac := net.HardwareAddr(raw[:])
	return &mac, nil
}
478
Serge Bazanskibe742842022-04-04 13:18:50 +0200479const SOCKSPort uint16 = 1080
Serge Bazanski66e58952021-10-05 17:06:56 +0200480
Serge Bazanskibe742842022-04-04 13:18:50 +0200481// ClusterPorts contains all ports handled by Nanoswitch.
482var ClusterPorts = []uint16{
483 // Forwarded to the first node.
484 uint16(node.CuratorServicePort),
485 uint16(node.DebugServicePort),
486 uint16(node.KubernetesAPIPort),
487 uint16(node.KubernetesAPIWrappedPort),
488
489 // SOCKS proxy to the switch network
490 SOCKSPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200491}
492
493// ClusterOptions contains all options for launching a Metropolis cluster.
494type ClusterOptions struct {
495 // The number of nodes this cluster should be started with.
496 NumNodes int
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100497
498 // If true, node logs will be saved to individual files instead of being printed
499 // out to stderr. The path of these files will be still printed to stdout.
500 //
501 // The files will be located within the launch directory inside TEST_TMPDIR (or
502 // the default tempdir location, if not set).
503 NodeLogsToFiles bool
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200504
505 // LeaveNodesNew, if set, will leave all non-bootstrap nodes in NEW, without
506 // bootstrapping them. The nodes' address information in Cluster.Nodes will be
507 // incomplete.
508 LeaveNodesNew bool
Lorenz Brun150f24a2023-07-13 20:11:06 +0200509
510 // Optional local registry which will be made available to the cluster to
511 // pull images from. This is a more efficient alternative to preseeding all
512 // images used for testing.
513 LocalRegistry *localregistry.Server
Serge Bazanski66e58952021-10-05 17:06:56 +0200514}
515
516// Cluster is the running Metropolis cluster launched using the LaunchCluster
517// function.
518type Cluster struct {
Serge Bazanski66e58952021-10-05 17:06:56 +0200519 // Owner is the TLS Certificate of the owner of the test cluster. This can be
520 // used to authenticate further clients to the running cluster.
521 Owner tls.Certificate
522 // Ports is the PortMap used to access the first nodes' services (defined in
Serge Bazanskibe742842022-04-04 13:18:50 +0200523 // ClusterPorts) and the SOCKS proxy (at SOCKSPort).
Serge Bazanski66e58952021-10-05 17:06:56 +0200524 Ports launch.PortMap
525
Serge Bazanskibe742842022-04-04 13:18:50 +0200526 // Nodes is a map from Node ID to its runtime information.
527 Nodes map[string]*NodeInCluster
528 // NodeIDs is a list of node IDs that are backing this cluster, in order of
529 // creation.
530 NodeIDs []string
531
Serge Bazanski54e212a2023-06-14 13:45:11 +0200532 // CACertificate is the cluster's CA certificate.
533 CACertificate *x509.Certificate
534
Serge Bazanski66e58952021-10-05 17:06:56 +0200535 // nodesDone is a list of channels populated with the return codes from all the
536 // nodes' qemu instances. It's used by Close to ensure all nodes have
Leopold20a036e2023-01-15 00:17:19 +0100537 // successfully been stopped.
Serge Bazanski66e58952021-10-05 17:06:56 +0200538 nodesDone []chan error
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200539 // nodeOpts are the cluster member nodes' mutable launch options, kept here
540 // to facilitate reboots.
541 nodeOpts []NodeOptions
542 // launchDir points at the directory keeping the nodes' state, such as storage
543 // images, firmware variable files, TPM state.
544 launchDir string
545 // socketDir points at the directory keeping UNIX socket files, such as these
546 // used to facilitate communication between QEMU and swtpm. It's different
547 // from launchDir, and anchored nearer the file system root, due to the
548 // socket path length limitation imposed by the kernel.
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100549 socketDir string
550 metroctlDir string
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200551
Serge Bazanskibe742842022-04-04 13:18:50 +0200552 // socksDialer is used by DialNode to establish connections to nodes via the
553 // SOCKS server ran by nanoswitch.
554 socksDialer proxy.Dialer
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200555
556 // authClient is a cached authenticated owner connection to a Curator
557 // instance within the cluster.
558 authClient *grpc.ClientConn
559
560 // ctxT is the context individual node contexts are created from.
561 ctxT context.Context
562 // ctxC is used by Close to cancel the context under which the nodes are
563 // running.
564 ctxC context.CancelFunc
Serge Bazanskibe742842022-04-04 13:18:50 +0200565}
566
// NodeInCluster holds information about a single node that belongs to a
// Cluster.
type NodeInCluster struct {
	// ID is the node's identifier. It can be used to dial the node's services
	// via DialNode.
	ID string
	// Pubkey is the node's public key.
	// NOTE(review): firstConnection leaves this unset for the first node.
	Pubkey []byte
	// ManagementAddress is the node's address on the network ran by
	// nanoswitch. It cannot be reached from the host unless dialed via DialNode
	// or via the nanoswitch SOCKS proxy (reachable on Cluster.Ports[SOCKSPort]).
	ManagementAddress string
}
577
578// firstConnection performs the initial owner credential escrow with a newly
579// started nanoswitch-backed cluster over SOCKS. It expects the first node to be
580// running at 10.1.0.2, which is always the case with the current nanoswitch
581// implementation.
582//
Leopold20a036e2023-01-15 00:17:19 +0100583// It returns the newly escrowed credentials as well as the first node's
Serge Bazanskibe742842022-04-04 13:18:50 +0200584// information as NodeInCluster.
585func firstConnection(ctx context.Context, socksDialer proxy.Dialer) (*tls.Certificate, *NodeInCluster, error) {
586 // Dial external service.
587 remote := fmt.Sprintf("10.1.0.2:%s", node.CuratorServicePort.PortString())
Serge Bazanski0c280152024-02-05 14:33:19 +0100588 initCreds, err := rpc.NewEphemeralCredentials(InsecurePrivateKey, rpc.WantInsecure())
Serge Bazanskibe742842022-04-04 13:18:50 +0200589 if err != nil {
590 return nil, nil, fmt.Errorf("NewEphemeralCredentials: %w", err)
591 }
592 initDialer := func(_ context.Context, addr string) (net.Conn, error) {
593 return socksDialer.Dial("tcp", addr)
594 }
595 initClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(initCreds))
596 if err != nil {
597 return nil, nil, fmt.Errorf("dialing with ephemeral credentials failed: %w", err)
598 }
599 defer initClient.Close()
600
601 // Retrieve owner certificate - this can take a while because the node is still
602 // coming up, so do it in a backoff loop.
Serge Bazanski05f813b2023-03-16 17:58:39 +0100603 launch.Log("Cluster: retrieving owner certificate (this can take a few seconds while the first node boots)...")
Serge Bazanskibe742842022-04-04 13:18:50 +0200604 aaa := apb.NewAAAClient(initClient)
605 var cert *tls.Certificate
606 err = backoff.Retry(func() error {
607 cert, err = rpc.RetrieveOwnerCertificate(ctx, aaa, InsecurePrivateKey)
608 if st, ok := status.FromError(err); ok {
609 if st.Code() == codes.Unavailable {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100610 launch.Log("Cluster: cluster UNAVAILABLE: %v", st.Message())
Serge Bazanskibe742842022-04-04 13:18:50 +0200611 return err
612 }
613 }
614 return backoff.Permanent(err)
615 }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
616 if err != nil {
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200617 return nil, nil, fmt.Errorf("couldn't retrieve owner certificate: %w", err)
Serge Bazanskibe742842022-04-04 13:18:50 +0200618 }
Serge Bazanski05f813b2023-03-16 17:58:39 +0100619 launch.Log("Cluster: retrieved owner certificate.")
Serge Bazanskibe742842022-04-04 13:18:50 +0200620
621 // Now connect authenticated and get the node ID.
Serge Bazanski8535cb52023-03-29 14:15:08 +0200622 creds := rpc.NewAuthenticatedCredentials(*cert, rpc.WantInsecure())
Serge Bazanskibe742842022-04-04 13:18:50 +0200623 authClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(creds))
624 if err != nil {
625 return nil, nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
626 }
627 defer authClient.Close()
628 mgmt := apb.NewManagementClient(authClient)
629
630 var node *NodeInCluster
631 err = backoff.Retry(func() error {
632 nodes, err := getNodes(ctx, mgmt)
633 if err != nil {
634 return fmt.Errorf("retrieving nodes failed: %w", err)
635 }
636 if len(nodes) != 1 {
637 return fmt.Errorf("expected one node, got %d", len(nodes))
638 }
639 n := nodes[0]
640 if n.Status == nil || n.Status.ExternalAddress == "" {
641 return fmt.Errorf("node has no status and/or address")
642 }
643 node = &NodeInCluster{
644 ID: identity.NodeID(n.Pubkey),
645 ManagementAddress: n.Status.ExternalAddress,
646 }
647 return nil
648 }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
649 if err != nil {
650 return nil, nil, err
651 }
652
653 return cert, node, nil
Serge Bazanski66e58952021-10-05 17:06:56 +0200654}
655
// NewSerialFileLogger opens the file at path p for writing, creating it with
// mode 0600 if it does not exist yet, and returns it as an io.ReadWriter. An
// existing file is neither truncated nor appended to: writes start at the
// beginning of the file. As the file is opened write-only, reads from the
// returned value fail.
func NewSerialFileLogger(p string) (io.ReadWriter, error) {
	const flags = os.O_WRONLY | os.O_CREATE
	logFile, openErr := os.OpenFile(p, flags, 0o600)
	if openErr != nil {
		// Return an untyped nil, so that callers comparing the result against
		// nil see it as such.
		return nil, openErr
	}
	return logFile, nil
}
663
Serge Bazanski66e58952021-10-05 17:06:56 +0200664// LaunchCluster launches a cluster of Metropolis node VMs together with a
665// Nanoswitch instance to network them all together.
666//
667// The given context will be used to run all qemu instances in the cluster, and
668// canceling the context or calling Close() will terminate them.
669func LaunchCluster(ctx context.Context, opts ClusterOptions) (*Cluster, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200670 if opts.NumNodes <= 0 {
Serge Bazanski66e58952021-10-05 17:06:56 +0200671 return nil, errors.New("refusing to start cluster with zero nodes")
672 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200673
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200674 // Create the launch directory.
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100675 ld, err := os.MkdirTemp(os.Getenv("TEST_TMPDIR"), "cluster-*")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200676 if err != nil {
677 return nil, fmt.Errorf("failed to create the launch directory: %w", err)
678 }
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100679 // Create the metroctl config directory. We keep it in /tmp because in some
680 // scenarios it's end-user visible and we want it short.
681 md, err := os.MkdirTemp("/tmp", "metroctl-*")
682 if err != nil {
683 return nil, fmt.Errorf("failed to create the metroctl directory: %w", err)
684 }
685
686 // Create the socket directory. We keep it in /tmp because of socket path limits.
687 sd, err := os.MkdirTemp("/tmp", "cluster-*")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200688 if err != nil {
689 return nil, fmt.Errorf("failed to create the socket directory: %w", err)
690 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200691
692 // Prepare links between nodes and nanoswitch.
693 var switchPorts []*os.File
694 var vmPorts []*os.File
695 for i := 0; i < opts.NumNodes; i++ {
696 switchPort, vmPort, err := launch.NewSocketPair()
697 if err != nil {
Serge Bazanski66e58952021-10-05 17:06:56 +0200698 return nil, fmt.Errorf("failed to get socketpair: %w", err)
699 }
700 switchPorts = append(switchPorts, switchPort)
701 vmPorts = append(vmPorts, vmPort)
702 }
703
Serge Bazanskie78a0892021-10-07 17:03:49 +0200704 // Make a list of channels that will be populated by all running node qemu
705 // processes.
Serge Bazanski66e58952021-10-05 17:06:56 +0200706 done := make([]chan error, opts.NumNodes)
Lorenz Brun150f24a2023-07-13 20:11:06 +0200707 for i := range done {
Serge Bazanski66e58952021-10-05 17:06:56 +0200708 done[i] = make(chan error, 1)
709 }
710
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200711 // Prepare the node options. These will be kept as part of Cluster.
712 // nodeOpts[].Runtime will be initialized by LaunchNode during the first
713 // launch. The runtime information can be later used to restart a node.
714 // The 0th node will be initialized first. The rest will follow after it
715 // had bootstrapped the cluster.
716 nodeOpts := make([]NodeOptions, opts.NumNodes)
717 nodeOpts[0] = NodeOptions{
Leopoldaf5086b2023-01-15 14:12:42 +0100718 Name: "node0",
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200719 ConnectToSocket: vmPorts[0],
720 NodeParameters: &apb.NodeParameters{
721 Cluster: &apb.NodeParameters_ClusterBootstrap_{
722 ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
723 OwnerPublicKey: InsecurePublicKey,
Serge Bazanski66e58952021-10-05 17:06:56 +0200724 },
725 },
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200726 },
727 SerialPort: newPrefixedStdio(0),
Leopoldacfad5b2023-01-15 14:05:25 +0100728 PcapDump: true,
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200729 }
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100730 if opts.NodeLogsToFiles {
731 path := path.Join(ld, "node-1.txt")
732 port, err := NewSerialFileLogger(path)
733 if err != nil {
734 return nil, fmt.Errorf("could not open log file for node 1: %w", err)
735 }
736 launch.Log("Node 1 logs at %s", path)
737 nodeOpts[0].SerialPort = port
738 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200739
740 // Start the first node.
741 ctxT, ctxC := context.WithCancel(ctx)
Serge Bazanski05f813b2023-03-16 17:58:39 +0100742 launch.Log("Cluster: Starting node %d...", 1)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200743 go func() {
744 err := LaunchNode(ctxT, ld, sd, &nodeOpts[0])
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200745 if err != nil {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100746 launch.Log("Node %d finished with an error: %v", 1, err)
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200747 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200748 done[0] <- err
749 }()
750
Lorenz Brun150f24a2023-07-13 20:11:06 +0200751 localRegistryAddr := net.TCPAddr{
752 IP: net.IPv4(10, 42, 0, 82),
753 Port: 5000,
754 }
755
756 var guestSvcMap launch.GuestServiceMap
757 if opts.LocalRegistry != nil {
758 l, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1)})
759 if err != nil {
760 ctxC()
761 return nil, fmt.Errorf("failed to create TCP listener for local registry: %w", err)
762 }
763 s := http.Server{
764 Handler: opts.LocalRegistry,
765 }
766 go s.Serve(l)
767 go func() {
768 <-ctxT.Done()
769 s.Close()
770 }()
771 guestSvcMap = launch.GuestServiceMap{
772 &localRegistryAddr: *l.Addr().(*net.TCPAddr),
773 }
774 }
775
Serge Bazanskie78a0892021-10-07 17:03:49 +0200776 // Launch nanoswitch.
Serge Bazanski66e58952021-10-05 17:06:56 +0200777 portMap, err := launch.ConflictFreePortMap(ClusterPorts)
778 if err != nil {
779 ctxC()
780 return nil, fmt.Errorf("failed to allocate ephemeral ports: %w", err)
781 }
782
783 go func() {
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100784 var serialPort io.ReadWriter
785 if opts.NodeLogsToFiles {
786 path := path.Join(ld, "nanoswitch.txt")
787 serialPort, err = NewSerialFileLogger(path)
788 if err != nil {
789 launch.Log("Could not open log file for nanoswitch: %v", err)
790 }
791 launch.Log("Nanoswitch logs at %s", path)
792 } else {
793 serialPort = newPrefixedStdio(99)
794 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200795 if err := launch.RunMicroVM(ctxT, &launch.MicroVMOptions{
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100796 Name: "nanoswitch",
Serge Bazanski66e58952021-10-05 17:06:56 +0200797 KernelPath: "metropolis/test/ktest/vmlinux",
Lorenz Brun62f1d362023-11-14 16:18:24 +0100798 InitramfsPath: "metropolis/test/nanoswitch/initramfs.cpio.zst",
Serge Bazanski66e58952021-10-05 17:06:56 +0200799 ExtraNetworkInterfaces: switchPorts,
800 PortMap: portMap,
Lorenz Brun150f24a2023-07-13 20:11:06 +0200801 GuestServiceMap: guestSvcMap,
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100802 SerialPort: serialPort,
Leopoldacfad5b2023-01-15 14:05:25 +0100803 PcapDump: path.Join(ld, "nanoswitch.pcap"),
Serge Bazanski66e58952021-10-05 17:06:56 +0200804 }); err != nil {
805 if !errors.Is(err, ctxT.Err()) {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100806 launch.Fatal("Failed to launch nanoswitch: %v", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200807 }
808 }
809 }()
810
Serge Bazanskibe742842022-04-04 13:18:50 +0200811 // Build SOCKS dialer.
812 socksRemote := fmt.Sprintf("localhost:%v", portMap[SOCKSPort])
813 socksDialer, err := proxy.SOCKS5("tcp", socksRemote, nil, proxy.Direct)
Serge Bazanski66e58952021-10-05 17:06:56 +0200814 if err != nil {
815 ctxC()
Serge Bazanskibe742842022-04-04 13:18:50 +0200816 return nil, fmt.Errorf("failed to build SOCKS dialer: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200817 }
818
Serge Bazanskibe742842022-04-04 13:18:50 +0200819 // Retrieve owner credentials and first node.
820 cert, firstNode, err := firstConnection(ctxT, socksDialer)
Serge Bazanski66e58952021-10-05 17:06:56 +0200821 if err != nil {
822 ctxC()
823 return nil, err
824 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200825
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100826 // Write credentials to the metroctl directory.
827 if err := metroctl.WriteOwnerKey(md, cert.PrivateKey.(ed25519.PrivateKey)); err != nil {
828 ctxC()
829 return nil, fmt.Errorf("could not write owner key: %w", err)
830 }
831 if err := metroctl.WriteOwnerCertificate(md, cert.Certificate[0]); err != nil {
832 ctxC()
833 return nil, fmt.Errorf("could not write owner certificate: %w", err)
834 }
835
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200836 // Set up a partially initialized cluster instance, to be filled in in the
837 // later steps.
Serge Bazanskibe742842022-04-04 13:18:50 +0200838 cluster := &Cluster{
839 Owner: *cert,
840 Ports: portMap,
841 Nodes: map[string]*NodeInCluster{
842 firstNode.ID: firstNode,
843 },
844 NodeIDs: []string{
845 firstNode.ID,
846 },
847
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100848 nodesDone: done,
849 nodeOpts: nodeOpts,
850 launchDir: ld,
851 socketDir: sd,
852 metroctlDir: md,
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200853
Serge Bazanskibe742842022-04-04 13:18:50 +0200854 socksDialer: socksDialer,
855
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200856 ctxT: ctxT,
Serge Bazanskibe742842022-04-04 13:18:50 +0200857 ctxC: ctxC,
858 }
859
860 // Now start the rest of the nodes and register them into the cluster.
861
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200862 // Get an authenticated owner client within the cluster.
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200863 curC, err := cluster.CuratorClient()
Serge Bazanski66e58952021-10-05 17:06:56 +0200864 if err != nil {
865 ctxC()
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200866 return nil, fmt.Errorf("CuratorClient: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200867 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200868 mgmt := apb.NewManagementClient(curC)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200869
870 // Retrieve register ticket to register further nodes.
Serge Bazanski05f813b2023-03-16 17:58:39 +0100871 launch.Log("Cluster: retrieving register ticket...")
Serge Bazanskie78a0892021-10-07 17:03:49 +0200872 resT, err := mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{})
873 if err != nil {
874 ctxC()
875 return nil, fmt.Errorf("GetRegisterTicket: %w", err)
876 }
877 ticket := resT.Ticket
Serge Bazanski05f813b2023-03-16 17:58:39 +0100878 launch.Log("Cluster: retrieved register ticket (%d bytes).", len(ticket))
Serge Bazanskie78a0892021-10-07 17:03:49 +0200879
880 // Retrieve cluster info (for directory and ca public key) to register further
881 // nodes.
882 resI, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
883 if err != nil {
884 ctxC()
885 return nil, fmt.Errorf("GetClusterInfo: %w", err)
886 }
Serge Bazanski54e212a2023-06-14 13:45:11 +0200887 caCert, err := x509.ParseCertificate(resI.CaCertificate)
888 if err != nil {
889 ctxC()
890 return nil, fmt.Errorf("ParseCertificate: %w", err)
891 }
892 cluster.CACertificate = caCert
Serge Bazanskie78a0892021-10-07 17:03:49 +0200893
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200894 // Use the retrieved information to configure the rest of the node options.
895 for i := 1; i < opts.NumNodes; i++ {
896 nodeOpts[i] = NodeOptions{
Leopoldaf5086b2023-01-15 14:12:42 +0100897 Name: fmt.Sprintf("node%d", i),
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200898 ConnectToSocket: vmPorts[i],
899 NodeParameters: &apb.NodeParameters{
900 Cluster: &apb.NodeParameters_ClusterRegister_{
901 ClusterRegister: &apb.NodeParameters_ClusterRegister{
902 RegisterTicket: ticket,
903 ClusterDirectory: resI.ClusterDirectory,
904 CaCertificate: resI.CaCertificate,
905 },
906 },
907 },
908 SerialPort: newPrefixedStdio(i),
909 }
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100910 if opts.NodeLogsToFiles {
911 path := path.Join(ld, fmt.Sprintf("node-%d.txt", i+1))
912 port, err := NewSerialFileLogger(path)
913 if err != nil {
914 return nil, fmt.Errorf("could not open log file for node %d: %w", i+1, err)
915 }
916 launch.Log("Node %d logs at %s", i+1, path)
917 nodeOpts[i].SerialPort = port
918 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200919 }
920
921 // Now run the rest of the nodes.
Serge Bazanskie78a0892021-10-07 17:03:49 +0200922 for i := 1; i < opts.NumNodes; i++ {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100923 launch.Log("Cluster: Starting node %d...", i+1)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200924 go func(i int) {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200925 err := LaunchNode(ctxT, ld, sd, &nodeOpts[i])
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200926 if err != nil {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100927 launch.Log("Node %d finished with an error: %v", i, err)
Mateusz Zalega08cb4642022-05-25 17:35:59 +0200928 }
Serge Bazanskie78a0892021-10-07 17:03:49 +0200929 done[i] <- err
930 }(i)
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200931 }
Serge Bazanskie78a0892021-10-07 17:03:49 +0200932
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200933 seenNodes := make(map[string]bool)
934 launch.Log("Cluster: waiting for nodes to appear as NEW...")
935 for i := 1; i < opts.NumNodes; i++ {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200936 for {
937 nodes, err := getNodes(ctx, mgmt)
938 if err != nil {
939 ctxC()
940 return nil, fmt.Errorf("could not get nodes: %w", err)
941 }
942 for _, n := range nodes {
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200943 if n.State != cpb.NodeState_NODE_STATE_NEW {
944 continue
Serge Bazanskie78a0892021-10-07 17:03:49 +0200945 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200946 seenNodes[n.Id] = true
947 cluster.Nodes[n.Id] = &NodeInCluster{
948 ID: n.Id,
949 Pubkey: n.Pubkey,
950 }
951 cluster.NodeIDs = append(cluster.NodeIDs, n.Id)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200952 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200953
954 if len(seenNodes) == opts.NumNodes-1 {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200955 break
956 }
957 time.Sleep(1 * time.Second)
958 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200959 }
960 launch.Log("Found all expected nodes")
Serge Bazanskie78a0892021-10-07 17:03:49 +0200961
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200962 approvedNodes := make(map[string]bool)
963 upNodes := make(map[string]bool)
964 if !opts.LeaveNodesNew {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200965 for {
966 nodes, err := getNodes(ctx, mgmt)
967 if err != nil {
968 ctxC()
969 return nil, fmt.Errorf("could not get nodes: %w", err)
970 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200971 for _, node := range nodes {
972 if !seenNodes[node.Id] {
973 // Skip nodes that weren't NEW in the previous step.
Serge Bazanskie78a0892021-10-07 17:03:49 +0200974 continue
975 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200976
977 if node.State == cpb.NodeState_NODE_STATE_UP && node.Status != nil && node.Status.ExternalAddress != "" {
978 launch.Log("Cluster: node %s is up", node.Id)
979 upNodes[node.Id] = true
980 cluster.Nodes[node.Id].ManagementAddress = node.Status.ExternalAddress
Serge Bazanskie78a0892021-10-07 17:03:49 +0200981 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200982 if upNodes[node.Id] {
983 continue
Serge Bazanskibe742842022-04-04 13:18:50 +0200984 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200985
986 if !approvedNodes[node.Id] {
987 launch.Log("Cluster: approving node %s", node.Id)
988 _, err := mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
989 Pubkey: node.Pubkey,
990 })
991 if err != nil {
992 ctxC()
993 return nil, fmt.Errorf("ApproveNode(%s): %w", node.Id, err)
994 }
995 approvedNodes[node.Id] = true
Serge Bazanskibe742842022-04-04 13:18:50 +0200996 }
Serge Bazanskie78a0892021-10-07 17:03:49 +0200997 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200998
999 launch.Log("Cluster: want %d up nodes, have %d", opts.NumNodes-1, len(upNodes))
1000 if len(upNodes) == opts.NumNodes-1 {
Serge Bazanskie78a0892021-10-07 17:03:49 +02001001 break
1002 }
Serge Bazanskibe742842022-04-04 13:18:50 +02001003 time.Sleep(time.Second)
Serge Bazanskie78a0892021-10-07 17:03:49 +02001004 }
Serge Bazanskie78a0892021-10-07 17:03:49 +02001005 }
Serge Bazanski66e58952021-10-05 17:06:56 +02001006
Serge Bazanski05f813b2023-03-16 17:58:39 +01001007 launch.Log("Cluster: all nodes up:")
Serge Bazanskibe742842022-04-04 13:18:50 +02001008 for _, node := range cluster.Nodes {
Serge Bazanski05f813b2023-03-16 17:58:39 +01001009 launch.Log("Cluster: - %s at %s", node.ID, node.ManagementAddress)
Serge Bazanskibe742842022-04-04 13:18:50 +02001010 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001011 launch.Log("Cluster: starting tests...")
Serge Bazanski66e58952021-10-05 17:06:56 +02001012
Serge Bazanskibe742842022-04-04 13:18:50 +02001013 return cluster, nil
Serge Bazanski66e58952021-10-05 17:06:56 +02001014}
1015
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001016// RebootNode reboots the cluster member node matching the given index, and
1017// waits for it to rejoin the cluster. It will use the given context ctx to run
1018// cluster API requests, whereas the resulting QEMU process will be created
1019// using the cluster's context c.ctxT. The nodes are indexed starting at 0.
1020func (c *Cluster) RebootNode(ctx context.Context, idx int) error {
1021 if idx < 0 || idx >= len(c.NodeIDs) {
1022 return fmt.Errorf("index out of bounds.")
1023 }
1024 id := c.NodeIDs[idx]
1025
1026 // Get an authenticated owner client within the cluster.
Serge Bazanski5bb8a332022-06-23 17:41:33 +02001027 curC, err := c.CuratorClient()
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001028 if err != nil {
1029 return err
1030 }
1031 mgmt := apb.NewManagementClient(curC)
1032
1033 // Get the timestamp of the node's last update, as observed by Curator.
1034 // It'll be needed to make sure it had rejoined the cluster after the reboot.
1035 var is *apb.Node
1036 for {
1037 r, err := getNode(ctx, mgmt, id)
1038 if err != nil {
1039 return err
1040 }
1041
1042 // Node status may be absent if it hasn't reported to the cluster yet. Wait
1043 // for it to appear before progressing further.
1044 if r.Status != nil {
1045 is = r
1046 break
1047 }
1048 time.Sleep(time.Second)
1049 }
1050
1051 // Cancel the node's context. This will shut down QEMU.
1052 c.nodeOpts[idx].Runtime.CtxC()
Serge Bazanski05f813b2023-03-16 17:58:39 +01001053 launch.Log("Cluster: waiting for node %d (%s) to stop.", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001054 err = <-c.nodesDone[idx]
1055 if err != nil {
1056 return fmt.Errorf("while restarting node: %w", err)
1057 }
1058
1059 // Start QEMU again.
Serge Bazanski05f813b2023-03-16 17:58:39 +01001060 launch.Log("Cluster: restarting node %d (%s).", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001061 go func(n int) {
1062 err := LaunchNode(c.ctxT, c.launchDir, c.socketDir, &c.nodeOpts[n])
Mateusz Zalega08cb4642022-05-25 17:35:59 +02001063 if err != nil {
Serge Bazanski05f813b2023-03-16 17:58:39 +01001064 launch.Log("Node %d finished with an error: %v", n, err)
Mateusz Zalega08cb4642022-05-25 17:35:59 +02001065 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001066 c.nodesDone[n] <- err
1067 }(idx)
1068
1069 // Poll Management.GetNodes until the node's timestamp is updated.
1070 for {
1071 cs, err := getNode(ctx, mgmt, id)
1072 if err != nil {
Serge Bazanski05f813b2023-03-16 17:58:39 +01001073 launch.Log("Cluster: node get error: %v", err)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001074 return err
1075 }
Serge Bazanski05f813b2023-03-16 17:58:39 +01001076 launch.Log("Cluster: node status: %+v", cs)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001077 if cs.Status == nil {
1078 continue
1079 }
Mateusz Zalega28800ad2022-07-08 14:56:02 +02001080 if cs.Status.Timestamp.AsTime().Sub(is.Status.Timestamp.AsTime()) > 0 {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001081 break
1082 }
1083 time.Sleep(time.Second)
1084 }
Serge Bazanski05f813b2023-03-16 17:58:39 +01001085 launch.Log("Cluster: node %d (%s) has rejoined the cluster.", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001086 return nil
1087}
1088
1089// Close cancels the running clusters' context and waits for all virtualized
Serge Bazanski66e58952021-10-05 17:06:56 +02001090// nodes to stop. It returns an error if stopping the nodes failed, or one of
1091// the nodes failed to fully start in the first place.
1092func (c *Cluster) Close() error {
Serge Bazanski05f813b2023-03-16 17:58:39 +01001093 launch.Log("Cluster: stopping...")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001094 if c.authClient != nil {
1095 c.authClient.Close()
1096 }
Serge Bazanski66e58952021-10-05 17:06:56 +02001097 c.ctxC()
1098
Leopold20a036e2023-01-15 00:17:19 +01001099 var errs []error
Serge Bazanski05f813b2023-03-16 17:58:39 +01001100 launch.Log("Cluster: waiting for nodes to exit...")
Serge Bazanski66e58952021-10-05 17:06:56 +02001101 for _, c := range c.nodesDone {
1102 err := <-c
1103 if err != nil {
Leopold20a036e2023-01-15 00:17:19 +01001104 errs = append(errs, err)
Serge Bazanski66e58952021-10-05 17:06:56 +02001105 }
1106 }
Serge Bazanskid09c58f2023-03-17 00:25:08 +01001107 launch.Log("Cluster: removing nodes' state files (%s) and sockets (%s).", c.launchDir, c.socketDir)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001108 os.RemoveAll(c.launchDir)
1109 os.RemoveAll(c.socketDir)
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001110 os.RemoveAll(c.metroctlDir)
Serge Bazanski05f813b2023-03-16 17:58:39 +01001111 launch.Log("Cluster: done")
Leopold20a036e2023-01-15 00:17:19 +01001112 return multierr.Combine(errs...)
Serge Bazanski66e58952021-10-05 17:06:56 +02001113}
Serge Bazanskibe742842022-04-04 13:18:50 +02001114
1115// DialNode is a grpc.WithContextDialer compatible dialer which dials nodes by
1116// their ID. This is performed by connecting to the cluster nanoswitch via its
1117// SOCKS proxy, and using the cluster node list for name resolution.
1118//
1119// For example:
1120//
Serge Bazanski05f813b2023-03-16 17:58:39 +01001121// grpc.Dial("metropolis-deadbeef:1234", grpc.WithContextDialer(c.DialNode))
Serge Bazanskibe742842022-04-04 13:18:50 +02001122func (c *Cluster) DialNode(_ context.Context, addr string) (net.Conn, error) {
1123 host, port, err := net.SplitHostPort(addr)
1124 if err != nil {
1125 return nil, fmt.Errorf("invalid host:port: %w", err)
1126 }
1127 // Already an IP address?
1128 if net.ParseIP(host) != nil {
1129 return c.socksDialer.Dial("tcp", addr)
1130 }
1131
1132 // Otherwise, expect a node name.
1133 node, ok := c.Nodes[host]
1134 if !ok {
1135 return nil, fmt.Errorf("unknown node %q", host)
1136 }
1137 addr = net.JoinHostPort(node.ManagementAddress, port)
1138 return c.socksDialer.Dial("tcp", addr)
1139}
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001140
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001141// GetKubeClientSet gets a Kubernetes client set accessing the Metropolis
1142// Kubernetes authenticating proxy using the cluster owner identity.
1143// It currently has access to everything (i.e. the cluster-admin role)
1144// via the owner-admin binding.
1145func (c *Cluster) GetKubeClientSet() (kubernetes.Interface, error) {
1146 pkcs8Key, err := x509.MarshalPKCS8PrivateKey(c.Owner.PrivateKey)
1147 if err != nil {
1148 // We explicitly pass an Ed25519 private key in, so this can't happen
1149 panic(err)
1150 }
1151
1152 host := net.JoinHostPort(c.NodeIDs[0], node.KubernetesAPIWrappedPort.PortString())
Lorenz Brun150f24a2023-07-13 20:11:06 +02001153 clientConfig := rest.Config{
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001154 Host: host,
1155 TLSClientConfig: rest.TLSClientConfig{
1156 // TODO(q3k): use CA certificate
1157 Insecure: true,
1158 ServerName: "kubernetes.default.svc",
1159 CertData: pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: c.Owner.Certificate[0]}),
1160 KeyData: pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: pkcs8Key}),
1161 },
1162 Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
1163 return c.DialNode(ctx, address)
1164 },
1165 }
1166 return kubernetes.NewForConfig(&clientConfig)
1167}
1168
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001169// KubernetesControllerNodeAddresses returns the list of IP addresses of nodes
1170// which are currently Kubernetes controllers, ie. run an apiserver. This list
1171// might be empty if no node is currently configured with the
1172// 'KubernetesController' node.
1173func (c *Cluster) KubernetesControllerNodeAddresses(ctx context.Context) ([]string, error) {
1174 curC, err := c.CuratorClient()
1175 if err != nil {
1176 return nil, err
1177 }
1178 mgmt := apb.NewManagementClient(curC)
1179 srv, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{
1180 Filter: "has(node.roles.kubernetes_controller)",
1181 })
1182 if err != nil {
1183 return nil, err
1184 }
1185 defer srv.CloseSend()
1186 var res []string
1187 for {
1188 n, err := srv.Recv()
1189 if err == io.EOF {
1190 break
1191 }
1192 if err != nil {
1193 return nil, err
1194 }
1195 if n.Status == nil || n.Status.ExternalAddress == "" {
1196 continue
1197 }
1198 res = append(res, n.Status.ExternalAddress)
1199 }
1200 return res, nil
1201}
Serge Bazanski630fb5c2023-04-06 10:50:24 +02001202
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001203// AllNodesHealthy returns nil if all the nodes in the cluster are seemingly
1204// healthy.
Serge Bazanski630fb5c2023-04-06 10:50:24 +02001205func (c *Cluster) AllNodesHealthy(ctx context.Context) error {
1206 // Get an authenticated owner client within the cluster.
1207 curC, err := c.CuratorClient()
1208 if err != nil {
1209 return err
1210 }
1211 mgmt := apb.NewManagementClient(curC)
1212 nodes, err := getNodes(ctx, mgmt)
1213 if err != nil {
1214 return err
1215 }
1216
1217 var unhealthy []string
1218 for _, node := range nodes {
1219 if node.Health == apb.Node_HEALTHY {
1220 continue
1221 }
1222 unhealthy = append(unhealthy, node.Id)
1223 }
1224 if len(unhealthy) == 0 {
1225 return nil
1226 }
1227 return fmt.Errorf("nodes unhealthy: %s", strings.Join(unhealthy, ", "))
1228}
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001229
1230// ApproveNode approves a node by ID, waiting for it to become UP.
1231func (c *Cluster) ApproveNode(ctx context.Context, id string) error {
1232 curC, err := c.CuratorClient()
1233 if err != nil {
1234 return err
1235 }
1236 mgmt := apb.NewManagementClient(curC)
1237
1238 _, err = mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
1239 Pubkey: c.Nodes[id].Pubkey,
1240 })
1241 if err != nil {
1242 return fmt.Errorf("ApproveNode: %w", err)
1243 }
1244 launch.Log("Cluster: %s: approved, waiting for UP", id)
1245 for {
1246 nodes, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
1247 if err != nil {
1248 return fmt.Errorf("GetNodes: %w", err)
1249 }
1250 found := false
1251 for {
1252 node, err := nodes.Recv()
1253 if errors.Is(err, io.EOF) {
1254 break
1255 }
1256 if err != nil {
1257 return fmt.Errorf("Nodes.Recv: %w", err)
1258 }
1259 if node.Id != id {
1260 continue
1261 }
1262 if node.State != cpb.NodeState_NODE_STATE_UP {
1263 continue
1264 }
1265 found = true
1266 break
1267 }
1268 nodes.CloseSend()
1269
1270 if found {
1271 break
1272 }
1273 time.Sleep(time.Second)
1274 }
1275 launch.Log("Cluster: %s: UP", id)
1276 return nil
1277}
1278
1279// MakeKubernetesWorker adds the KubernetesWorker role to a node by ID.
1280func (c *Cluster) MakeKubernetesWorker(ctx context.Context, id string) error {
1281 curC, err := c.CuratorClient()
1282 if err != nil {
1283 return err
1284 }
1285 mgmt := apb.NewManagementClient(curC)
1286
1287 tr := true
1288 launch.Log("Cluster: %s: adding KubernetesWorker", id)
1289 _, err = mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
1290 Node: &apb.UpdateNodeRolesRequest_Id{
1291 Id: id,
1292 },
1293 KubernetesWorker: &tr,
1294 })
1295 return err
1296}