blob: 143592577b82be0a7fdb99a20158bddc4c5f1674 [file] [log] [blame]
// Package launch builds on the osbase launch package and implements launching
// Metropolis nodes and clusters in a virtualized environment using qemu. It's
// kept in a separate package as it depends on a Metropolis node image, which
// might not be required for some uses of the launch library.
Tim Windelschmidt9f21f532024-05-07 15:14:20 +02005package launch
Serge Bazanski66e58952021-10-05 17:06:56 +02006
7import (
8 "bytes"
9 "context"
Serge Bazanski1f8cad72023-03-20 16:58:10 +010010 "crypto/ed25519"
Serge Bazanski66e58952021-10-05 17:06:56 +020011 "crypto/rand"
12 "crypto/tls"
Serge Bazanski54e212a2023-06-14 13:45:11 +020013 "crypto/x509"
Serge Bazanskia0bc6d32023-06-28 18:57:40 +020014 "encoding/pem"
Serge Bazanski66e58952021-10-05 17:06:56 +020015 "errors"
16 "fmt"
17 "io"
Serge Bazanski66e58952021-10-05 17:06:56 +020018 "net"
Lorenz Brun150f24a2023-07-13 20:11:06 +020019 "net/http"
Serge Bazanski66e58952021-10-05 17:06:56 +020020 "os"
21 "os/exec"
Leopoldacfad5b2023-01-15 14:05:25 +010022 "path"
Serge Bazanski66e58952021-10-05 17:06:56 +020023 "path/filepath"
Serge Bazanski53458ba2024-06-18 09:56:46 +000024 "strconv"
Serge Bazanski630fb5c2023-04-06 10:50:24 +020025 "strings"
Serge Bazanski66e58952021-10-05 17:06:56 +020026 "syscall"
27 "time"
28
29 "github.com/cenkalti/backoff/v4"
Serge Bazanski66e58952021-10-05 17:06:56 +020030 "go.uber.org/multierr"
Serge Bazanskibe742842022-04-04 13:18:50 +020031 "golang.org/x/net/proxy"
Lorenz Brun87bbf7e2024-03-18 18:22:25 +010032 "golang.org/x/sys/unix"
Serge Bazanski66e58952021-10-05 17:06:56 +020033 "google.golang.org/grpc"
Serge Bazanski636032e2022-01-26 14:21:33 +010034 "google.golang.org/grpc/codes"
35 "google.golang.org/grpc/status"
Serge Bazanski66e58952021-10-05 17:06:56 +020036 "google.golang.org/protobuf/proto"
Serge Bazanskia0bc6d32023-06-28 18:57:40 +020037 "k8s.io/client-go/kubernetes"
38 "k8s.io/client-go/rest"
Serge Bazanski66e58952021-10-05 17:06:56 +020039
Serge Bazanski37cfcc12024-03-21 11:59:07 +010040 ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
Tim Windelschmidtbe25a3b2023-07-19 16:31:56 +020041 apb "source.monogon.dev/metropolis/proto/api"
42 cpb "source.monogon.dev/metropolis/proto/common"
43
Serge Bazanskidd5b03c2024-05-16 18:07:06 +020044 "source.monogon.dev/go/qcow2"
Serge Bazanski1f8cad72023-03-20 16:58:10 +010045 metroctl "source.monogon.dev/metropolis/cli/metroctl/core"
Serge Bazanski66e58952021-10-05 17:06:56 +020046 "source.monogon.dev/metropolis/node"
Serge Bazanskie78a0892021-10-07 17:03:49 +020047 "source.monogon.dev/metropolis/node/core/identity"
Serge Bazanski66e58952021-10-05 17:06:56 +020048 "source.monogon.dev/metropolis/node/core/rpc"
Serge Bazanski5bb8a332022-06-23 17:41:33 +020049 "source.monogon.dev/metropolis/node/core/rpc/resolver"
Tim Windelschmidt9f21f532024-05-07 15:14:20 +020050 "source.monogon.dev/metropolis/test/localregistry"
51 "source.monogon.dev/osbase/test/launch"
Serge Bazanski66e58952021-10-05 17:06:56 +020052)
53
const (
	// nodeNumberKey is the key of the node label that carries a node's
	// numerical index within the test system.
	nodeNumberKey string = "test-node-number"
)
59
// NodeOptions contains all options that can be passed to LaunchNode().
type NodeOptions struct {
	// Name is a human-readable identifier to be used in debug output.
	Name string

	// CPUs is the number of virtual CPUs of the VM. LaunchNode defaults it to 1
	// when left at zero.
	CPUs int

	// ThreadsPerCPU is the number of threads per CPU. This is multiplied by
	// CPUs to get the total number of threads. LaunchNode defaults it to 1 when
	// left at zero.
	ThreadsPerCPU int

	// MemoryMiB is the RAM size in MiB of the VM. LaunchNode defaults it to
	// 2048 when left at zero.
	MemoryMiB int

	// Ports contains the port mapping where to expose the internal ports of the VM to
	// the host. See IdentityPortMap() and ConflictFreePortMap(). Ignored when
	// ConnectToSocket is set.
	Ports launch.PortMap

	// If set to true, reboots are honored. Otherwise, all reboots exit the
	// LaunchNode() command (QEMU is started with -no-reboot). Metropolis nodes
	// generally restart on almost all errors, so unless you want to test reboot
	// behavior this should be false.
	AllowReboot bool

	// By default, the VM is connected to the Host via SLIRP. If ConnectToSocket is
	// set, it is instead connected to the given file descriptor/socket. If this is
	// set, all port maps from the Ports option are ignored. Intended for networking
	// this instance together with others for running more complex network
	// configurations.
	ConnectToSocket *os.File

	// When PcapDump is set, all traffic is dumped to a pcap file in the
	// runtime directory (e.g. "net0.pcap" for the first interface).
	PcapDump bool

	// SerialPort is an io.ReadWriter over which you can communicate with the serial
	// port of the machine. It can be set to an existing file descriptor (like
	// os.Stdout/os.Stderr) or any Go structure implementing this interface.
	// LaunchNode currently only attaches it to QEMU's standard output, which
	// carries the VM's serial port.
	SerialPort io.ReadWriter

	// NodeParameters is passed into the VM and subsequently used for bootstrapping or
	// registering into a cluster.
	NodeParameters *apb.NodeParameters

	// Mac is the node's MAC address. If nil, LaunchNode generates a random one
	// and stores it here.
	Mac *net.HardwareAddr

	// Runtime keeps the node's QEMU runtime state. If nil, LaunchNode sets up a
	// new runtime and stores it here.
	Runtime *NodeRuntime
}
111
// NodeRuntime keeps the node's QEMU runtime options: the per-node directories
// created by setupRuntime and the context the node's processes run in.
type NodeRuntime struct {
	// ld points at the node's launch directory storing data such as storage
	// images, firmware variables or the TPM state.
	ld string
	// sd points at the node's socket directory.
	sd string

	// ctxT is the context QEMU will execute in. LaunchNode replaces it on
	// every launch.
	ctxT context.Context
	// CtxC is the QEMU context's cancellation function.
	CtxC context.CancelFunc
}
125
126// NodePorts is the list of ports a fully operational Metropolis node listens on
Serge Bazanski52304a82021-10-29 16:56:18 +0200127var NodePorts = []node.Port{
Serge Bazanski66e58952021-10-05 17:06:56 +0200128 node.ConsensusPort,
129
130 node.CuratorServicePort,
131 node.DebugServicePort,
132
133 node.KubernetesAPIPort,
Lorenz Bruncc078df2021-12-23 11:51:55 +0100134 node.KubernetesAPIWrappedPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200135 node.CuratorServicePort,
136 node.DebuggerPort,
Tim Windelschmidtbe25a3b2023-07-19 16:31:56 +0200137 node.MetricsPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200138}
139
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200140// setupRuntime creates the node's QEMU runtime directory, together with all
141// files required to preserve its state, a level below the chosen path ld. The
142// node's socket directory is similarily created a level below sd. It may
143// return an I/O error.
144func setupRuntime(ld, sd string) (*NodeRuntime, error) {
145 // Create a temporary directory to keep all the runtime files.
146 stdp, err := os.MkdirTemp(ld, "node_state*")
147 if err != nil {
148 return nil, fmt.Errorf("failed to create the state directory: %w", err)
149 }
150
151 // Initialize the node's storage with a prebuilt image.
Serge Bazanskidd5b03c2024-05-16 18:07:06 +0200152 di := filepath.Join(stdp, "image.qcow2")
Tim Windelschmidt82e6af72024-07-23 00:05:42 +0000153 launch.Log("Cluster: generating node QCOW2 snapshot image: %s -> %s", xNodeImagePath, di)
Serge Bazanskidd5b03c2024-05-16 18:07:06 +0200154
155 df, err := os.Create(di)
156 if err != nil {
157 return nil, fmt.Errorf("while opening image for writing: %w", err)
158 }
159 defer df.Close()
Tim Windelschmidt82e6af72024-07-23 00:05:42 +0000160 if err := qcow2.Generate(df, qcow2.GenerateWithBackingFile(xNodeImagePath)); err != nil {
Serge Bazanskidd5b03c2024-05-16 18:07:06 +0200161 return nil, fmt.Errorf("while creating copy-on-write node image: %w", err)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200162 }
163
164 // Initialize the OVMF firmware variables file.
Tim Windelschmidt82e6af72024-07-23 00:05:42 +0000165 dv := filepath.Join(stdp, filepath.Base(xOvmfVarsPath))
166 if err := copyFile(xOvmfVarsPath, dv); err != nil {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200167 return nil, fmt.Errorf("while copying firmware variables: %w", err)
168 }
169
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200170 // Create the socket directory.
171 sotdp, err := os.MkdirTemp(sd, "node_sock*")
172 if err != nil {
173 return nil, fmt.Errorf("failed to create the socket directory: %w", err)
174 }
175
176 return &NodeRuntime{
177 ld: stdp,
178 sd: sotdp,
179 }, nil
180}
181
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200182// CuratorClient returns an authenticated owner connection to a Curator
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200183// instance within Cluster c, or nil together with an error.
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200184func (c *Cluster) CuratorClient() (*grpc.ClientConn, error) {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200185 if c.authClient == nil {
Serge Bazanski8535cb52023-03-29 14:15:08 +0200186 authCreds := rpc.NewAuthenticatedCredentials(c.Owner, rpc.WantInsecure())
Serge Bazanski58ddc092022-06-30 18:23:33 +0200187 r := resolver.New(c.ctxT, resolver.WithLogger(func(f string, args ...interface{}) {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100188 launch.Log("Cluster: client resolver: %s", fmt.Sprintf(f, args...))
Serge Bazanski58ddc092022-06-30 18:23:33 +0200189 }))
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200190 for _, n := range c.NodeIDs {
191 ep, err := resolver.NodeWithDefaultPort(n)
192 if err != nil {
Tim Windelschmidtadcf5d72024-05-21 13:46:25 +0200193 return nil, fmt.Errorf("could not add node %q by DNS: %w", n, err)
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200194 }
195 r.AddEndpoint(ep)
196 }
197 authClient, err := grpc.Dial(resolver.MetropolisControlAddress,
198 grpc.WithTransportCredentials(authCreds),
199 grpc.WithResolvers(r),
200 grpc.WithContextDialer(c.DialNode),
201 )
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200202 if err != nil {
203 return nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
204 }
205 c.authClient = authClient
206 }
207 return c.authClient, nil
208}
209
Serge Bazanski66e58952021-10-05 17:06:56 +0200210// LaunchNode launches a single Metropolis node instance with the given options.
211// The instance runs mostly paravirtualized but with some emulated hardware
212// similar to how a cloud provider might set up its VMs. The disk is fully
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200213// writable, and the changes are kept across reboots and shutdowns. ld and sd
214// point to the launch directory and the socket directory, holding the nodes'
215// state files (storage, tpm state, firmware state), and UNIX socket files
216// (swtpm <-> QEMU interplay) respectively. The directories must exist before
217// LaunchNode is called. LaunchNode will update options.Runtime and options.Mac
218// if either are not initialized.
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200219func LaunchNode(ctx context.Context, ld, sd string, tpmFactory *TPMFactory, options *NodeOptions, doneC chan error) error {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200220 // TODO(mateusz@monogon.tech) try using QEMU's abstract socket namespace instead
221 // of /tmp (requires QEMU version >5.0).
Serge Bazanski66e58952021-10-05 17:06:56 +0200222 // https://github.com/qemu/qemu/commit/776b97d3605ed0fc94443048fdf988c7725e38a9).
223 // swtpm accepts already-open FDs so we can pass in an abstract socket namespace FD
224 // that we open and pass the name of it to QEMU. Not pinning this crashes both
225 // swtpm and qemu because we run into UNIX socket length limitations (for legacy
226 // reasons 108 chars).
Serge Bazanski66e58952021-10-05 17:06:56 +0200227
Jan Schära9b060b2024-08-07 10:42:29 +0200228 if options.CPUs == 0 {
229 options.CPUs = 1
230 }
231 if options.ThreadsPerCPU == 0 {
232 options.ThreadsPerCPU = 1
233 }
234 if options.MemoryMiB == 0 {
235 options.MemoryMiB = 2048
236 }
237
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200238 // If it's the node's first start, set up its runtime directories.
239 if options.Runtime == nil {
240 r, err := setupRuntime(ld, sd)
241 if err != nil {
242 return fmt.Errorf("while setting up node runtime: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200243 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200244 options.Runtime = r
Serge Bazanski66e58952021-10-05 17:06:56 +0200245 }
246
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200247 // Replace the node's context with a new one.
248 r := options.Runtime
249 if r.CtxC != nil {
250 r.CtxC()
251 }
252 r.ctxT, r.CtxC = context.WithCancel(ctx)
253
Serge Bazanski66e58952021-10-05 17:06:56 +0200254 var qemuNetType string
255 var qemuNetConfig launch.QemuValue
256 if options.ConnectToSocket != nil {
257 qemuNetType = "socket"
258 qemuNetConfig = launch.QemuValue{
259 "id": {"net0"},
260 "fd": {"3"},
261 }
262 } else {
263 qemuNetType = "user"
264 qemuNetConfig = launch.QemuValue{
265 "id": {"net0"},
266 "net": {"10.42.0.0/24"},
267 "dhcpstart": {"10.42.0.10"},
268 "hostfwd": options.Ports.ToQemuForwards(),
269 }
270 }
271
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200272 // Generate the node's MAC address if it isn't already set in NodeOptions.
273 if options.Mac == nil {
274 mac, err := generateRandomEthernetMAC()
275 if err != nil {
276 return err
277 }
278 options.Mac = mac
Serge Bazanski66e58952021-10-05 17:06:56 +0200279 }
280
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200281 tpmSocketPath := filepath.Join(r.sd, "tpm-socket")
282 fwVarPath := filepath.Join(r.ld, "OVMF_VARS.fd")
Serge Bazanskidd5b03c2024-05-16 18:07:06 +0200283 storagePath := filepath.Join(r.ld, "image.qcow2")
Lorenz Brun150f24a2023-07-13 20:11:06 +0200284 qemuArgs := []string{
Jan Schära9b060b2024-08-07 10:42:29 +0200285 "-machine", "q35",
286 "-accel", "kvm",
287 "-nographic",
288 "-nodefaults",
289 "-cpu", "host",
290 "-m", fmt.Sprintf("%dM", options.MemoryMiB),
291 "-smp", fmt.Sprintf("cores=%d,threads=%d", options.CPUs, options.ThreadsPerCPU),
Tim Windelschmidt82e6af72024-07-23 00:05:42 +0000292 "-drive", "if=pflash,format=raw,readonly=on,file=" + xOvmfCodePath,
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200293 "-drive", "if=pflash,format=raw,file=" + fwVarPath,
Serge Bazanskidd5b03c2024-05-16 18:07:06 +0200294 "-drive", "if=virtio,format=qcow2,cache=unsafe,file=" + storagePath,
Serge Bazanski66e58952021-10-05 17:06:56 +0200295 "-netdev", qemuNetConfig.ToOption(qemuNetType),
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200296 "-device", "virtio-net-pci,netdev=net0,mac=" + options.Mac.String(),
Serge Bazanski66e58952021-10-05 17:06:56 +0200297 "-chardev", "socket,id=chrtpm,path=" + tpmSocketPath,
298 "-tpmdev", "emulator,id=tpm0,chardev=chrtpm",
299 "-device", "tpm-tis,tpmdev=tpm0",
300 "-device", "virtio-rng-pci",
Lorenz Brun150f24a2023-07-13 20:11:06 +0200301 "-serial", "stdio",
302 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200303
304 if !options.AllowReboot {
305 qemuArgs = append(qemuArgs, "-no-reboot")
306 }
307
308 if options.NodeParameters != nil {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200309 parametersPath := filepath.Join(r.ld, "parameters.pb")
Serge Bazanski66e58952021-10-05 17:06:56 +0200310 parametersRaw, err := proto.Marshal(options.NodeParameters)
311 if err != nil {
312 return fmt.Errorf("failed to encode node paraeters: %w", err)
313 }
Lorenz Brun150f24a2023-07-13 20:11:06 +0200314 if err := os.WriteFile(parametersPath, parametersRaw, 0o644); err != nil {
Serge Bazanski66e58952021-10-05 17:06:56 +0200315 return fmt.Errorf("failed to write node parameters: %w", err)
316 }
317 qemuArgs = append(qemuArgs, "-fw_cfg", "name=dev.monogon.metropolis/parameters.pb,file="+parametersPath)
318 }
319
Leopoldacfad5b2023-01-15 14:05:25 +0100320 if options.PcapDump {
Tim Windelschmidta7a82f32024-04-11 01:40:25 +0200321 qemuNetDump := launch.QemuValue{
322 "id": {"net0"},
323 "netdev": {"net0"},
324 "file": {filepath.Join(r.ld, "net0.pcap")},
Leopoldacfad5b2023-01-15 14:05:25 +0100325 }
326 qemuArgs = append(qemuArgs, "-object", qemuNetDump.ToOption("filter-dump"))
327 }
328
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200329 // Manufacture TPM if needed.
330 tpmd := filepath.Join(r.ld, "tpm")
Tim Windelschmidt82e6af72024-07-23 00:05:42 +0000331 err := tpmFactory.Manufacture(ctx, tpmd, &TPMPlatform{
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200332 Manufacturer: "Monogon",
333 Version: "1.0",
334 Model: "TestCluster",
335 })
336 if err != nil {
337 return fmt.Errorf("could not manufacture TPM: %w", err)
338 }
339
Serge Bazanski66e58952021-10-05 17:06:56 +0200340 // Start TPM emulator as a subprocess
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200341 tpmCtx, tpmCancel := context.WithCancel(options.Runtime.ctxT)
Serge Bazanski66e58952021-10-05 17:06:56 +0200342
Tim Windelschmidt82e6af72024-07-23 00:05:42 +0000343 tpmEmuCmd := exec.CommandContext(tpmCtx, xSwtpmPath, "socket", "--tpm2", "--tpmstate", "dir="+tpmd, "--ctrl", "type=unixio,path="+tpmSocketPath)
Serge Bazanskib07c57a2024-06-04 14:33:27 +0000344 // Silence warnings from unsafe libtpms build (uses non-constant-time
345 // cryptographic operations).
346 tpmEmuCmd.Env = append(tpmEmuCmd.Env, "MONOGON_LIBTPMS_ACKNOWLEDGE_UNSAFE=yes")
Serge Bazanski66e58952021-10-05 17:06:56 +0200347 tpmEmuCmd.Stderr = os.Stderr
348 tpmEmuCmd.Stdout = os.Stdout
349
Tim Windelschmidt244b5672024-02-06 10:18:56 +0100350 err = tpmEmuCmd.Start()
Serge Bazanski66e58952021-10-05 17:06:56 +0200351 if err != nil {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200352 tpmCancel()
Serge Bazanski66e58952021-10-05 17:06:56 +0200353 return fmt.Errorf("failed to start TPM emulator: %w", err)
354 }
355
Mateusz Zalegae90f4a12022-05-25 18:24:01 +0200356 // Wait for the socket to be created by the TPM emulator before launching
357 // QEMU.
358 for {
359 _, err := os.Stat(tpmSocketPath)
360 if err == nil {
361 break
362 }
Tim Windelschmidta7a82f32024-04-11 01:40:25 +0200363 if !os.IsNotExist(err) {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200364 tpmCancel()
Mateusz Zalegae90f4a12022-05-25 18:24:01 +0200365 return fmt.Errorf("while stat-ing TPM socket path: %w", err)
366 }
367 if err := tpmCtx.Err(); err != nil {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200368 tpmCancel()
Mateusz Zalegae90f4a12022-05-25 18:24:01 +0200369 return fmt.Errorf("while waiting for the TPM socket: %w", err)
370 }
371 time.Sleep(time.Millisecond * 100)
372 }
373
Serge Bazanski66e58952021-10-05 17:06:56 +0200374 // Start the main qemu binary
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200375 systemCmd := exec.CommandContext(options.Runtime.ctxT, "qemu-system-x86_64", qemuArgs...)
Serge Bazanski66e58952021-10-05 17:06:56 +0200376 if options.ConnectToSocket != nil {
377 systemCmd.ExtraFiles = []*os.File{options.ConnectToSocket}
378 }
379
380 var stdErrBuf bytes.Buffer
381 systemCmd.Stderr = &stdErrBuf
382 systemCmd.Stdout = options.SerialPort
383
Leopoldaf5086b2023-01-15 14:12:42 +0100384 launch.PrettyPrintQemuArgs(options.Name, systemCmd.Args)
385
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200386 go func() {
387 launch.Log("Node: Starting...")
388 err = systemCmd.Run()
389 launch.Log("Node: Returned: %v", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200390
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200391 // Stop TPM emulator and wait for it to exit to properly reap the child process
392 tpmCancel()
393 launch.Log("Node: Waiting for TPM emulator to exit")
394 // Wait returns a SIGKILL error because we just cancelled its context.
395 // We still need to call it to avoid creating zombies.
396 errTpm := tpmEmuCmd.Wait()
397 launch.Log("Node: TPM emulator done: %v", errTpm)
Serge Bazanski66e58952021-10-05 17:06:56 +0200398
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200399 var exerr *exec.ExitError
400 if err != nil && errors.As(err, &exerr) {
401 status := exerr.ProcessState.Sys().(syscall.WaitStatus)
402 if status.Signaled() && status.Signal() == syscall.SIGKILL {
403 // Process was killed externally (most likely by our context being canceled).
404 // This is a normal exit for us, so return nil
405 doneC <- nil
406 return
407 }
408 exerr.Stderr = stdErrBuf.Bytes()
409 newErr := launch.QEMUError(*exerr)
410 launch.Log("Node: %q", stdErrBuf.String())
411 doneC <- &newErr
412 return
Serge Bazanski66e58952021-10-05 17:06:56 +0200413 }
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200414 doneC <- err
415 }()
416 return nil
Serge Bazanski66e58952021-10-05 17:06:56 +0200417}
418
419func copyFile(src, dst string) error {
420 in, err := os.Open(src)
421 if err != nil {
422 return fmt.Errorf("when opening source: %w", err)
423 }
424 defer in.Close()
425
426 out, err := os.Create(dst)
427 if err != nil {
428 return fmt.Errorf("when creating destination: %w", err)
429 }
430 defer out.Close()
431
Lorenz Brun87bbf7e2024-03-18 18:22:25 +0100432 endPos, err := in.Seek(0, io.SeekEnd)
Serge Bazanski66e58952021-10-05 17:06:56 +0200433 if err != nil {
Lorenz Brun87bbf7e2024-03-18 18:22:25 +0100434 return fmt.Errorf("when getting source end: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200435 }
Lorenz Brun87bbf7e2024-03-18 18:22:25 +0100436
437 // Copy the file while preserving its sparseness. The image files are very
438 // sparse (less than 10% allocated), so this is a lot faster.
439 var lastHoleStart int64
440 for {
441 dataStart, err := in.Seek(lastHoleStart, unix.SEEK_DATA)
442 if err != nil {
443 return fmt.Errorf("when seeking to next data block: %w", err)
444 }
445 holeStart, err := in.Seek(dataStart, unix.SEEK_HOLE)
446 if err != nil {
447 return fmt.Errorf("when seeking to next hole: %w", err)
448 }
449 lastHoleStart = holeStart
450 if _, err := in.Seek(dataStart, io.SeekStart); err != nil {
451 return fmt.Errorf("when seeking to current data block: %w", err)
452 }
453 if _, err := out.Seek(dataStart, io.SeekStart); err != nil {
454 return fmt.Errorf("when seeking output to next data block: %w", err)
455 }
456 if _, err := io.CopyN(out, in, holeStart-dataStart); err != nil {
457 return fmt.Errorf("when copying file: %w", err)
458 }
459 if endPos == holeStart {
460 // The next hole is at the end of the file, we're done here.
461 break
462 }
463 }
464
Serge Bazanski66e58952021-10-05 17:06:56 +0200465 return out.Close()
466}
467
Serge Bazanskie78a0892021-10-07 17:03:49 +0200468// getNodes wraps around Management.GetNodes to return a list of nodes in a
469// cluster.
470func getNodes(ctx context.Context, mgmt apb.ManagementClient) ([]*apb.Node, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200471 var res []*apb.Node
Serge Bazanski636032e2022-01-26 14:21:33 +0100472 bo := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
Serge Bazanski075465c2021-11-16 15:38:49 +0100473 err := backoff.Retry(func() error {
474 res = nil
475 srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
Serge Bazanskie78a0892021-10-07 17:03:49 +0200476 if err != nil {
Serge Bazanski075465c2021-11-16 15:38:49 +0100477 return fmt.Errorf("GetNodes: %w", err)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200478 }
Serge Bazanski075465c2021-11-16 15:38:49 +0100479 for {
480 node, err := srvN.Recv()
481 if err == io.EOF {
482 break
483 }
484 if err != nil {
485 return fmt.Errorf("GetNodes.Recv: %w", err)
486 }
487 res = append(res, node)
488 }
489 return nil
490 }, bo)
491 if err != nil {
492 return nil, err
Serge Bazanskie78a0892021-10-07 17:03:49 +0200493 }
494 return res, nil
495}
496
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200497// getNode wraps Management.GetNodes. It returns node information matching
498// given node ID.
499func getNode(ctx context.Context, mgmt apb.ManagementClient, id string) (*apb.Node, error) {
500 nodes, err := getNodes(ctx, mgmt)
501 if err != nil {
502 return nil, fmt.Errorf("could not get nodes: %w", err)
503 }
504 for _, n := range nodes {
505 eid := identity.NodeID(n.Pubkey)
506 if eid != id {
507 continue
508 }
509 return n, nil
510 }
Tim Windelschmidt73e98822024-04-18 23:13:49 +0200511 return nil, fmt.Errorf("no such node")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200512}
513
// generateRandomEthernetMAC returns a random EUI-48 Ethernet MAC address
// marked as a locally administered, individual (unicast) address. It fails
// only if no randomness could be read.
func generateRandomEthernetMAC() (*net.HardwareAddr, error) {
	addr := make(net.HardwareAddr, 6)
	if _, err := rand.Read(addr); err != nil {
		return nil, fmt.Errorf("failed to read randomness for MAC: %w", err)
	}

	// Set U/L bit and clear I/G bit (locally administered individual MAC)
	// Ref IEEE 802-2014 Section 8.2.2
	addr[0] |= 0x02
	addr[0] &^= 0x01
	return &addr, nil
}
528
// SOCKSPort is the port of the SOCKS proxy to the switch network. It is one
// of the ports listed in ClusterPorts.
const SOCKSPort uint16 = 1080
Serge Bazanski66e58952021-10-05 17:06:56 +0200530
// ClusterPorts contains all ports handled by Nanoswitch: the services of the
// first node that are forwarded to it, plus the SOCKS proxy.
var ClusterPorts = []uint16{
	// Forwarded to the first node.
	uint16(node.CuratorServicePort),
	uint16(node.DebugServicePort),
	uint16(node.KubernetesAPIPort),
	uint16(node.KubernetesAPIWrappedPort),

	// SOCKS proxy to the switch network
	SOCKSPort,
}
542
// ClusterOptions contains all options for launching a Metropolis cluster.
type ClusterOptions struct {
	// The number of nodes this cluster should be started with.
	NumNodes int

	// Node holds the default options applied to all nodes.
	Node NodeOptions

	// If true, node logs will be saved to individual files instead of being printed
	// out to stderr. The path of these files will be still printed to stdout.
	//
	// The files will be located within the launch directory inside TEST_TMPDIR (or
	// the default tempdir location, if not set).
	NodeLogsToFiles bool

	// LeaveNodesNew, if set, will leave all non-bootstrap nodes in NEW, without
	// bootstrapping them. The nodes' address information in Cluster.Nodes will be
	// incomplete.
	LeaveNodesNew bool

	// Optional local registry which will be made available to the cluster to
	// pull images from. This is a more efficient alternative to preseeding all
	// images used for testing.
	LocalRegistry *localregistry.Server

	// InitialClusterConfiguration will be passed to the first node when creating the
	// cluster, and defines some basic properties of the cluster. If not specified,
	// the cluster will fall back to the defaults as defined in
	// metropolis.proto.api.NodeParameters.
	InitialClusterConfiguration *cpb.ClusterConfiguration
}
574
// Cluster is the running Metropolis cluster launched using the LaunchCluster
// function.
type Cluster struct {
	// Owner is the TLS Certificate of the owner of the test cluster. This can be
	// used to authenticate further clients to the running cluster.
	Owner tls.Certificate
	// Ports is the PortMap used to access the first nodes' services (defined in
	// ClusterPorts) and the SOCKS proxy (at SOCKSPort).
	Ports launch.PortMap

	// Nodes is a map from Node ID to its runtime information.
	Nodes map[string]*NodeInCluster
	// NodeIDs is a list of node IDs that are backing this cluster, in order of
	// creation.
	NodeIDs []string

	// CACertificate is the cluster's CA certificate.
	CACertificate *x509.Certificate

	// nodesDone is a list of channels populated with the return codes from all the
	// nodes' qemu instances. It's used by Close to ensure all nodes have
	// successfully been stopped.
	nodesDone []chan error
	// nodeOpts are the cluster member nodes' mutable launch options, kept here
	// to facilitate reboots.
	nodeOpts []NodeOptions
	// launchDir points at the directory keeping the nodes' state, such as storage
	// images, firmware variable files, TPM state.
	launchDir string
	// socketDir points at the directory keeping UNIX socket files, such as these
	// used to facilitate communication between QEMU and swtpm. It's different
	// from launchDir, and anchored nearer the file system root, due to the
	// socket path length limitation imposed by the kernel.
	socketDir string
	// NOTE(review): metroctlDir is not documented or used in the visible part
	// of this file; presumably a directory holding metroctl data for this
	// cluster — confirm against its users.
	metroctlDir string

	// SOCKSDialer is used by DialNode to establish connections to nodes via the
	// SOCKS server ran by nanoswitch.
	SOCKSDialer proxy.Dialer

	// authClient is a cached authenticated owner connection to a Curator
	// instance within the cluster. It is populated lazily by CuratorClient.
	authClient *grpc.ClientConn

	// ctxT is the context individual node contexts are created from.
	ctxT context.Context
	// ctxC is used by Close to cancel the context under which the nodes are
	// running.
	ctxC context.CancelFunc

	// tpmFactory is presumably the TPM factory handed to LaunchNode when
	// starting this cluster's nodes — TODO confirm against its users.
	tpmFactory *TPMFactory
}
627
// NodeInCluster represents information about a node that's part of a Cluster.
type NodeInCluster struct {
	// ID of the node, which can be used to dial this node's services via DialNode.
	ID string
	// Pubkey is presumably the node's public key, i.e. the value its ID is
	// derived from — TODO confirm against where it is populated.
	Pubkey []byte
	// Address of the node on the network ran by nanoswitch. Not reachable from the
	// host unless dialed via DialNode or via the nanoswitch SOCKS proxy (reachable
	// on Cluster.Ports[SOCKSPort]).
	ManagementAddress string
}
638
639// firstConnection performs the initial owner credential escrow with a newly
640// started nanoswitch-backed cluster over SOCKS. It expects the first node to be
641// running at 10.1.0.2, which is always the case with the current nanoswitch
642// implementation.
643//
Leopold20a036e2023-01-15 00:17:19 +0100644// It returns the newly escrowed credentials as well as the first node's
Serge Bazanskibe742842022-04-04 13:18:50 +0200645// information as NodeInCluster.
646func firstConnection(ctx context.Context, socksDialer proxy.Dialer) (*tls.Certificate, *NodeInCluster, error) {
647 // Dial external service.
648 remote := fmt.Sprintf("10.1.0.2:%s", node.CuratorServicePort.PortString())
Serge Bazanski0c280152024-02-05 14:33:19 +0100649 initCreds, err := rpc.NewEphemeralCredentials(InsecurePrivateKey, rpc.WantInsecure())
Serge Bazanskibe742842022-04-04 13:18:50 +0200650 if err != nil {
651 return nil, nil, fmt.Errorf("NewEphemeralCredentials: %w", err)
652 }
653 initDialer := func(_ context.Context, addr string) (net.Conn, error) {
654 return socksDialer.Dial("tcp", addr)
655 }
656 initClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(initCreds))
657 if err != nil {
658 return nil, nil, fmt.Errorf("dialing with ephemeral credentials failed: %w", err)
659 }
660 defer initClient.Close()
661
662 // Retrieve owner certificate - this can take a while because the node is still
663 // coming up, so do it in a backoff loop.
Serge Bazanski05f813b2023-03-16 17:58:39 +0100664 launch.Log("Cluster: retrieving owner certificate (this can take a few seconds while the first node boots)...")
Serge Bazanskibe742842022-04-04 13:18:50 +0200665 aaa := apb.NewAAAClient(initClient)
666 var cert *tls.Certificate
667 err = backoff.Retry(func() error {
668 cert, err = rpc.RetrieveOwnerCertificate(ctx, aaa, InsecurePrivateKey)
669 if st, ok := status.FromError(err); ok {
670 if st.Code() == codes.Unavailable {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100671 launch.Log("Cluster: cluster UNAVAILABLE: %v", st.Message())
Serge Bazanskibe742842022-04-04 13:18:50 +0200672 return err
673 }
674 }
675 return backoff.Permanent(err)
676 }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
677 if err != nil {
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200678 return nil, nil, fmt.Errorf("couldn't retrieve owner certificate: %w", err)
Serge Bazanskibe742842022-04-04 13:18:50 +0200679 }
Serge Bazanski05f813b2023-03-16 17:58:39 +0100680 launch.Log("Cluster: retrieved owner certificate.")
Serge Bazanskibe742842022-04-04 13:18:50 +0200681
682 // Now connect authenticated and get the node ID.
Serge Bazanski8535cb52023-03-29 14:15:08 +0200683 creds := rpc.NewAuthenticatedCredentials(*cert, rpc.WantInsecure())
Serge Bazanskibe742842022-04-04 13:18:50 +0200684 authClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(creds))
685 if err != nil {
686 return nil, nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
687 }
688 defer authClient.Close()
689 mgmt := apb.NewManagementClient(authClient)
690
691 var node *NodeInCluster
692 err = backoff.Retry(func() error {
693 nodes, err := getNodes(ctx, mgmt)
694 if err != nil {
695 return fmt.Errorf("retrieving nodes failed: %w", err)
696 }
697 if len(nodes) != 1 {
698 return fmt.Errorf("expected one node, got %d", len(nodes))
699 }
700 n := nodes[0]
701 if n.Status == nil || n.Status.ExternalAddress == "" {
702 return fmt.Errorf("node has no status and/or address")
703 }
704 node = &NodeInCluster{
705 ID: identity.NodeID(n.Pubkey),
706 ManagementAddress: n.Status.ExternalAddress,
707 }
708 return nil
709 }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
710 if err != nil {
711 return nil, nil, err
712 }
713
714 return cert, node, nil
Serge Bazanski66e58952021-10-05 17:06:56 +0200715}
716
// NewSerialFileLogger opens the file at path p as a sink for serial port
// output, creating it with mode 0600 if it does not exist yet. The file is
// opened write-only, without truncation and without append mode.
//
// On failure, the error from os.OpenFile is returned unchanged together with a
// nil io.ReadWriter.
func NewSerialFileLogger(p string) (io.ReadWriter, error) {
	const flags = os.O_CREATE | os.O_WRONLY
	logFile, openErr := os.OpenFile(p, flags, 0o600)
	if openErr == nil {
		return logFile, nil
	}
	// Return a literal nil so that callers never see a non-nil interface
	// wrapping a nil *os.File.
	return nil, openErr
}
724
Serge Bazanski66e58952021-10-05 17:06:56 +0200725// LaunchCluster launches a cluster of Metropolis node VMs together with a
726// Nanoswitch instance to network them all together.
727//
728// The given context will be used to run all qemu instances in the cluster, and
729// canceling the context or calling Close() will terminate them.
730func LaunchCluster(ctx context.Context, opts ClusterOptions) (*Cluster, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200731 if opts.NumNodes <= 0 {
Serge Bazanski66e58952021-10-05 17:06:56 +0200732 return nil, errors.New("refusing to start cluster with zero nodes")
733 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200734
Jan Schära9b060b2024-08-07 10:42:29 +0200735 // Prepare the node options. These will be kept as part of Cluster.
736 // nodeOpts[].Runtime will be initialized by LaunchNode during the first
737 // launch. The runtime information can be later used to restart a node.
738 // The 0th node will be initialized first. The rest will follow after it
739 // had bootstrapped the cluster.
740 nodeOpts := make([]NodeOptions, opts.NumNodes)
741 for i := range opts.NumNodes {
742 nodeOpts[i] = opts.Node
743 nodeOpts[i].Name = fmt.Sprintf("node%d", i)
744 nodeOpts[i].SerialPort = newPrefixedStdio(i)
745 }
746 nodeOpts[0].NodeParameters = &apb.NodeParameters{
747 Cluster: &apb.NodeParameters_ClusterBootstrap_{
748 ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
749 OwnerPublicKey: InsecurePublicKey,
750 InitialClusterConfiguration: opts.InitialClusterConfiguration,
751 Labels: &cpb.NodeLabels{
752 Pairs: []*cpb.NodeLabels_Pair{
753 {Key: nodeNumberKey, Value: "0"},
754 },
755 },
756 },
757 },
758 }
759 nodeOpts[0].PcapDump = true
760
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200761 // Create the launch directory.
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100762 ld, err := os.MkdirTemp(os.Getenv("TEST_TMPDIR"), "cluster-*")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200763 if err != nil {
764 return nil, fmt.Errorf("failed to create the launch directory: %w", err)
765 }
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100766 // Create the metroctl config directory. We keep it in /tmp because in some
767 // scenarios it's end-user visible and we want it short.
768 md, err := os.MkdirTemp("/tmp", "metroctl-*")
769 if err != nil {
770 return nil, fmt.Errorf("failed to create the metroctl directory: %w", err)
771 }
772
773 // Create the socket directory. We keep it in /tmp because of socket path limits.
774 sd, err := os.MkdirTemp("/tmp", "cluster-*")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200775 if err != nil {
776 return nil, fmt.Errorf("failed to create the socket directory: %w", err)
777 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200778
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200779 // Set up TPM factory.
780 tpmf, err := NewTPMFactory(filepath.Join(ld, "tpm"))
781 if err != nil {
782 return nil, fmt.Errorf("failed to create TPM factory: %w", err)
783 }
784
Serge Bazanski66e58952021-10-05 17:06:56 +0200785 // Prepare links between nodes and nanoswitch.
786 var switchPorts []*os.File
Jan Schära9b060b2024-08-07 10:42:29 +0200787 for i := range opts.NumNodes {
Serge Bazanski66e58952021-10-05 17:06:56 +0200788 switchPort, vmPort, err := launch.NewSocketPair()
789 if err != nil {
Serge Bazanski66e58952021-10-05 17:06:56 +0200790 return nil, fmt.Errorf("failed to get socketpair: %w", err)
791 }
792 switchPorts = append(switchPorts, switchPort)
Jan Schära9b060b2024-08-07 10:42:29 +0200793 nodeOpts[i].ConnectToSocket = vmPort
Serge Bazanski66e58952021-10-05 17:06:56 +0200794 }
795
Serge Bazanskie78a0892021-10-07 17:03:49 +0200796 // Make a list of channels that will be populated by all running node qemu
797 // processes.
Serge Bazanski66e58952021-10-05 17:06:56 +0200798 done := make([]chan error, opts.NumNodes)
Lorenz Brun150f24a2023-07-13 20:11:06 +0200799 for i := range done {
Serge Bazanski66e58952021-10-05 17:06:56 +0200800 done[i] = make(chan error, 1)
801 }
802
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100803 if opts.NodeLogsToFiles {
Jan Schära9b060b2024-08-07 10:42:29 +0200804 for i := range opts.NumNodes {
805 path := path.Join(ld, fmt.Sprintf("node-%d.txt", i))
806 port, err := NewSerialFileLogger(path)
807 if err != nil {
808 return nil, fmt.Errorf("could not open log file for node %d: %w", i, err)
809 }
810 launch.Log("Node %d logs at %s", i, path)
811 nodeOpts[i].SerialPort = port
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100812 }
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100813 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200814
815 // Start the first node.
816 ctxT, ctxC := context.WithCancel(ctx)
Jan Schär0b927652024-07-31 18:08:50 +0200817 launch.Log("Cluster: Starting node %d...", 0)
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200818 if err := LaunchNode(ctxT, ld, sd, tpmf, &nodeOpts[0], done[0]); err != nil {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200819 ctxC()
820 return nil, fmt.Errorf("failed to launch first node: %w", err)
821 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200822
Lorenz Brun150f24a2023-07-13 20:11:06 +0200823 localRegistryAddr := net.TCPAddr{
824 IP: net.IPv4(10, 42, 0, 82),
825 Port: 5000,
826 }
827
828 var guestSvcMap launch.GuestServiceMap
829 if opts.LocalRegistry != nil {
830 l, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1)})
831 if err != nil {
832 ctxC()
833 return nil, fmt.Errorf("failed to create TCP listener for local registry: %w", err)
834 }
835 s := http.Server{
836 Handler: opts.LocalRegistry,
837 }
838 go s.Serve(l)
839 go func() {
840 <-ctxT.Done()
841 s.Close()
842 }()
843 guestSvcMap = launch.GuestServiceMap{
844 &localRegistryAddr: *l.Addr().(*net.TCPAddr),
845 }
846 }
847
Serge Bazanskie78a0892021-10-07 17:03:49 +0200848 // Launch nanoswitch.
Serge Bazanski66e58952021-10-05 17:06:56 +0200849 portMap, err := launch.ConflictFreePortMap(ClusterPorts)
850 if err != nil {
851 ctxC()
852 return nil, fmt.Errorf("failed to allocate ephemeral ports: %w", err)
853 }
854
855 go func() {
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100856 var serialPort io.ReadWriter
857 if opts.NodeLogsToFiles {
858 path := path.Join(ld, "nanoswitch.txt")
859 serialPort, err = NewSerialFileLogger(path)
860 if err != nil {
861 launch.Log("Could not open log file for nanoswitch: %v", err)
862 }
863 launch.Log("Nanoswitch logs at %s", path)
864 } else {
865 serialPort = newPrefixedStdio(99)
866 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200867 if err := launch.RunMicroVM(ctxT, &launch.MicroVMOptions{
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100868 Name: "nanoswitch",
Tim Windelschmidt82e6af72024-07-23 00:05:42 +0000869 KernelPath: xKernelPath,
870 InitramfsPath: xInitramfsPath,
Serge Bazanski66e58952021-10-05 17:06:56 +0200871 ExtraNetworkInterfaces: switchPorts,
872 PortMap: portMap,
Lorenz Brun150f24a2023-07-13 20:11:06 +0200873 GuestServiceMap: guestSvcMap,
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100874 SerialPort: serialPort,
Leopoldacfad5b2023-01-15 14:05:25 +0100875 PcapDump: path.Join(ld, "nanoswitch.pcap"),
Serge Bazanski66e58952021-10-05 17:06:56 +0200876 }); err != nil {
877 if !errors.Is(err, ctxT.Err()) {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100878 launch.Fatal("Failed to launch nanoswitch: %v", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200879 }
880 }
881 }()
882
Serge Bazanskibe742842022-04-04 13:18:50 +0200883 // Build SOCKS dialer.
884 socksRemote := fmt.Sprintf("localhost:%v", portMap[SOCKSPort])
885 socksDialer, err := proxy.SOCKS5("tcp", socksRemote, nil, proxy.Direct)
Serge Bazanski66e58952021-10-05 17:06:56 +0200886 if err != nil {
887 ctxC()
Serge Bazanskibe742842022-04-04 13:18:50 +0200888 return nil, fmt.Errorf("failed to build SOCKS dialer: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200889 }
890
Serge Bazanskibe742842022-04-04 13:18:50 +0200891 // Retrieve owner credentials and first node.
892 cert, firstNode, err := firstConnection(ctxT, socksDialer)
Serge Bazanski66e58952021-10-05 17:06:56 +0200893 if err != nil {
894 ctxC()
895 return nil, err
896 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200897
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100898 // Write credentials to the metroctl directory.
899 if err := metroctl.WriteOwnerKey(md, cert.PrivateKey.(ed25519.PrivateKey)); err != nil {
900 ctxC()
901 return nil, fmt.Errorf("could not write owner key: %w", err)
902 }
903 if err := metroctl.WriteOwnerCertificate(md, cert.Certificate[0]); err != nil {
904 ctxC()
905 return nil, fmt.Errorf("could not write owner certificate: %w", err)
906 }
907
Serge Bazanski53458ba2024-06-18 09:56:46 +0000908 launch.Log("Cluster: Node %d is %s", 0, firstNode.ID)
909
910 // Set up a partially initialized cluster instance, to be filled in the
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200911 // later steps.
Serge Bazanskibe742842022-04-04 13:18:50 +0200912 cluster := &Cluster{
913 Owner: *cert,
914 Ports: portMap,
915 Nodes: map[string]*NodeInCluster{
916 firstNode.ID: firstNode,
917 },
918 NodeIDs: []string{
919 firstNode.ID,
920 },
921
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100922 nodesDone: done,
923 nodeOpts: nodeOpts,
924 launchDir: ld,
925 socketDir: sd,
926 metroctlDir: md,
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200927
Lorenz Brun276a7462023-07-12 21:28:54 +0200928 SOCKSDialer: socksDialer,
Serge Bazanskibe742842022-04-04 13:18:50 +0200929
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200930 ctxT: ctxT,
Serge Bazanskibe742842022-04-04 13:18:50 +0200931 ctxC: ctxC,
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200932
933 tpmFactory: tpmf,
Serge Bazanskibe742842022-04-04 13:18:50 +0200934 }
935
936 // Now start the rest of the nodes and register them into the cluster.
937
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200938 // Get an authenticated owner client within the cluster.
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200939 curC, err := cluster.CuratorClient()
Serge Bazanski66e58952021-10-05 17:06:56 +0200940 if err != nil {
941 ctxC()
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200942 return nil, fmt.Errorf("CuratorClient: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200943 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200944 mgmt := apb.NewManagementClient(curC)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200945
946 // Retrieve register ticket to register further nodes.
Serge Bazanski05f813b2023-03-16 17:58:39 +0100947 launch.Log("Cluster: retrieving register ticket...")
Serge Bazanskie78a0892021-10-07 17:03:49 +0200948 resT, err := mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{})
949 if err != nil {
950 ctxC()
951 return nil, fmt.Errorf("GetRegisterTicket: %w", err)
952 }
953 ticket := resT.Ticket
Serge Bazanski05f813b2023-03-16 17:58:39 +0100954 launch.Log("Cluster: retrieved register ticket (%d bytes).", len(ticket))
Serge Bazanskie78a0892021-10-07 17:03:49 +0200955
956 // Retrieve cluster info (for directory and ca public key) to register further
957 // nodes.
958 resI, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
959 if err != nil {
960 ctxC()
961 return nil, fmt.Errorf("GetClusterInfo: %w", err)
962 }
Serge Bazanski54e212a2023-06-14 13:45:11 +0200963 caCert, err := x509.ParseCertificate(resI.CaCertificate)
964 if err != nil {
965 ctxC()
966 return nil, fmt.Errorf("ParseCertificate: %w", err)
967 }
968 cluster.CACertificate = caCert
Serge Bazanskie78a0892021-10-07 17:03:49 +0200969
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200970 // Use the retrieved information to configure the rest of the node options.
971 for i := 1; i < opts.NumNodes; i++ {
Jan Schära9b060b2024-08-07 10:42:29 +0200972 nodeOpts[i].NodeParameters = &apb.NodeParameters{
973 Cluster: &apb.NodeParameters_ClusterRegister_{
974 ClusterRegister: &apb.NodeParameters_ClusterRegister{
975 RegisterTicket: ticket,
976 ClusterDirectory: resI.ClusterDirectory,
977 CaCertificate: resI.CaCertificate,
978 Labels: &cpb.NodeLabels{
979 Pairs: []*cpb.NodeLabels_Pair{
980 {Key: nodeNumberKey, Value: fmt.Sprintf("%d", i)},
Serge Bazanski30e30b32024-05-22 14:11:56 +0200981 },
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200982 },
983 },
984 },
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100985 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200986 }
987
988 // Now run the rest of the nodes.
Serge Bazanskie78a0892021-10-07 17:03:49 +0200989 for i := 1; i < opts.NumNodes; i++ {
Jan Schär0b927652024-07-31 18:08:50 +0200990 launch.Log("Cluster: Starting node %d...", i)
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200991 err := LaunchNode(ctxT, ld, sd, tpmf, &nodeOpts[i], done[i])
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200992 if err != nil {
Jan Schär0b927652024-07-31 18:08:50 +0200993 return nil, fmt.Errorf("failed to launch node %d: %w", i, err)
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200994 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200995 }
Serge Bazanskie78a0892021-10-07 17:03:49 +0200996
Serge Bazanski53458ba2024-06-18 09:56:46 +0000997 // Wait for nodes to appear as NEW, populate a map from node number (index into
Jan Schära9b060b2024-08-07 10:42:29 +0200998 // nodeOpts, etc.) to Metropolis Node ID.
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200999 seenNodes := make(map[string]bool)
Serge Bazanski53458ba2024-06-18 09:56:46 +00001000 nodeNumberToID := make(map[int]string)
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001001 launch.Log("Cluster: waiting for nodes to appear as NEW...")
1002 for i := 1; i < opts.NumNodes; i++ {
Serge Bazanskie78a0892021-10-07 17:03:49 +02001003 for {
1004 nodes, err := getNodes(ctx, mgmt)
1005 if err != nil {
1006 ctxC()
1007 return nil, fmt.Errorf("could not get nodes: %w", err)
1008 }
1009 for _, n := range nodes {
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001010 if n.State != cpb.NodeState_NODE_STATE_NEW {
1011 continue
Serge Bazanskie78a0892021-10-07 17:03:49 +02001012 }
Serge Bazanski87d9c592024-03-20 12:35:11 +01001013 if seenNodes[n.Id] {
1014 continue
1015 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001016 seenNodes[n.Id] = true
1017 cluster.Nodes[n.Id] = &NodeInCluster{
1018 ID: n.Id,
1019 Pubkey: n.Pubkey,
1020 }
Serge Bazanski53458ba2024-06-18 09:56:46 +00001021
1022 num, err := strconv.Atoi(node.GetNodeLabel(n.Labels, nodeNumberKey))
1023 if err != nil {
1024 return nil, fmt.Errorf("node %s has undecodable number label: %w", n.Id, err)
1025 }
1026 launch.Log("Cluster: Node %d is %s", num, n.Id)
1027 nodeNumberToID[num] = n.Id
Serge Bazanskie78a0892021-10-07 17:03:49 +02001028 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001029
1030 if len(seenNodes) == opts.NumNodes-1 {
Serge Bazanskie78a0892021-10-07 17:03:49 +02001031 break
1032 }
1033 time.Sleep(1 * time.Second)
1034 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001035 }
1036 launch.Log("Found all expected nodes")
Serge Bazanskie78a0892021-10-07 17:03:49 +02001037
Serge Bazanski53458ba2024-06-18 09:56:46 +00001038 // Build the rest of NodeIDs from map.
1039 for i := 1; i < opts.NumNodes; i++ {
1040 cluster.NodeIDs = append(cluster.NodeIDs, nodeNumberToID[i])
1041 }
1042
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001043 approvedNodes := make(map[string]bool)
1044 upNodes := make(map[string]bool)
1045 if !opts.LeaveNodesNew {
Serge Bazanskie78a0892021-10-07 17:03:49 +02001046 for {
1047 nodes, err := getNodes(ctx, mgmt)
1048 if err != nil {
1049 ctxC()
1050 return nil, fmt.Errorf("could not get nodes: %w", err)
1051 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001052 for _, node := range nodes {
1053 if !seenNodes[node.Id] {
1054 // Skip nodes that weren't NEW in the previous step.
Serge Bazanskie78a0892021-10-07 17:03:49 +02001055 continue
1056 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001057
1058 if node.State == cpb.NodeState_NODE_STATE_UP && node.Status != nil && node.Status.ExternalAddress != "" {
1059 launch.Log("Cluster: node %s is up", node.Id)
1060 upNodes[node.Id] = true
1061 cluster.Nodes[node.Id].ManagementAddress = node.Status.ExternalAddress
Serge Bazanskie78a0892021-10-07 17:03:49 +02001062 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001063 if upNodes[node.Id] {
1064 continue
Serge Bazanskibe742842022-04-04 13:18:50 +02001065 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001066
1067 if !approvedNodes[node.Id] {
1068 launch.Log("Cluster: approving node %s", node.Id)
1069 _, err := mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
1070 Pubkey: node.Pubkey,
1071 })
1072 if err != nil {
1073 ctxC()
1074 return nil, fmt.Errorf("ApproveNode(%s): %w", node.Id, err)
1075 }
1076 approvedNodes[node.Id] = true
Serge Bazanskibe742842022-04-04 13:18:50 +02001077 }
Serge Bazanskie78a0892021-10-07 17:03:49 +02001078 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001079
Jan Schär0b927652024-07-31 18:08:50 +02001080 launch.Log("Cluster: want %d up nodes, have %d", opts.NumNodes, len(upNodes)+1)
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001081 if len(upNodes) == opts.NumNodes-1 {
Serge Bazanskie78a0892021-10-07 17:03:49 +02001082 break
1083 }
Serge Bazanskibe742842022-04-04 13:18:50 +02001084 time.Sleep(time.Second)
Serge Bazanskie78a0892021-10-07 17:03:49 +02001085 }
Serge Bazanskie78a0892021-10-07 17:03:49 +02001086 }
Serge Bazanski66e58952021-10-05 17:06:56 +02001087
Serge Bazanski05f813b2023-03-16 17:58:39 +01001088 launch.Log("Cluster: all nodes up:")
Jan Schär0b927652024-07-31 18:08:50 +02001089 for i, nodeID := range cluster.NodeIDs {
1090 launch.Log("Cluster: %d. %s at %s", i, nodeID, cluster.Nodes[nodeID].ManagementAddress)
Serge Bazanskibe742842022-04-04 13:18:50 +02001091 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001092 launch.Log("Cluster: starting tests...")
Serge Bazanski66e58952021-10-05 17:06:56 +02001093
Serge Bazanskibe742842022-04-04 13:18:50 +02001094 return cluster, nil
Serge Bazanski66e58952021-10-05 17:06:56 +02001095}
1096
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001097// RebootNode reboots the cluster member node matching the given index, and
1098// waits for it to rejoin the cluster. It will use the given context ctx to run
1099// cluster API requests, whereas the resulting QEMU process will be created
1100// using the cluster's context c.ctxT. The nodes are indexed starting at 0.
1101func (c *Cluster) RebootNode(ctx context.Context, idx int) error {
1102 if idx < 0 || idx >= len(c.NodeIDs) {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +02001103 return fmt.Errorf("index out of bounds")
1104 }
1105 if c.nodeOpts[idx].Runtime == nil {
1106 return fmt.Errorf("node not running")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001107 }
1108 id := c.NodeIDs[idx]
1109
1110 // Get an authenticated owner client within the cluster.
Serge Bazanski5bb8a332022-06-23 17:41:33 +02001111 curC, err := c.CuratorClient()
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001112 if err != nil {
1113 return err
1114 }
1115 mgmt := apb.NewManagementClient(curC)
1116
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001117 // Cancel the node's context. This will shut down QEMU.
1118 c.nodeOpts[idx].Runtime.CtxC()
Serge Bazanski05f813b2023-03-16 17:58:39 +01001119 launch.Log("Cluster: waiting for node %d (%s) to stop.", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001120 err = <-c.nodesDone[idx]
1121 if err != nil {
1122 return fmt.Errorf("while restarting node: %w", err)
1123 }
1124
1125 // Start QEMU again.
Serge Bazanski05f813b2023-03-16 17:58:39 +01001126 launch.Log("Cluster: restarting node %d (%s).", idx, id)
Serge Bazanski2b6dc312024-06-04 17:44:55 +02001127 if err := LaunchNode(c.ctxT, c.launchDir, c.socketDir, c.tpmFactory, &c.nodeOpts[idx], c.nodesDone[idx]); err != nil {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +02001128 return fmt.Errorf("failed to launch node %d: %w", idx, err)
1129 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001130
Serge Bazanskibc969572024-03-21 11:56:13 +01001131 start := time.Now()
1132
1133 // Poll Management.GetNodes until the node is healthy.
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001134 for {
1135 cs, err := getNode(ctx, mgmt, id)
1136 if err != nil {
Serge Bazanski05f813b2023-03-16 17:58:39 +01001137 launch.Log("Cluster: node get error: %v", err)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001138 return err
1139 }
Serge Bazanskibc969572024-03-21 11:56:13 +01001140 launch.Log("Cluster: node health: %+v", cs.Health)
1141
1142 lhb := time.Now().Add(-cs.TimeSinceHeartbeat.AsDuration())
1143 if lhb.After(start) && cs.Health == apb.Node_HEALTHY {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001144 break
1145 }
1146 time.Sleep(time.Second)
1147 }
Serge Bazanski05f813b2023-03-16 17:58:39 +01001148 launch.Log("Cluster: node %d (%s) has rejoined the cluster.", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001149 return nil
1150}
1151
Serge Bazanski500f6e02024-04-03 12:06:40 +02001152// ShutdownNode performs an ungraceful shutdown (i.e. power off) of the node
1153// given by idx. If the node is already shut down, this is a no-op.
1154func (c *Cluster) ShutdownNode(idx int) error {
1155 if idx < 0 || idx >= len(c.NodeIDs) {
1156 return fmt.Errorf("index out of bounds")
1157 }
1158 // Return if node is already stopped.
1159 select {
1160 case <-c.nodeOpts[idx].Runtime.ctxT.Done():
1161 return nil
1162 default:
1163 }
1164 id := c.NodeIDs[idx]
1165
1166 // Cancel the node's context. This will shut down QEMU.
1167 c.nodeOpts[idx].Runtime.CtxC()
1168 launch.Log("Cluster: waiting for node %d (%s) to stop.", idx, id)
1169 err := <-c.nodesDone[idx]
1170 if err != nil {
1171 return fmt.Errorf("while shutting down node: %w", err)
1172 }
Serge Bazanski2b6dc312024-06-04 17:44:55 +02001173 launch.Log("Cluster: node %d (%s) stopped.", idx, id)
Serge Bazanski500f6e02024-04-03 12:06:40 +02001174 return nil
1175}
1176
1177// StartNode performs a power on of the node given by idx. If the node is already
1178// running, this is a no-op.
1179func (c *Cluster) StartNode(idx int) error {
1180 if idx < 0 || idx >= len(c.NodeIDs) {
1181 return fmt.Errorf("index out of bounds")
1182 }
1183 id := c.NodeIDs[idx]
1184 // Return if node is already running.
1185 select {
1186 case <-c.nodeOpts[idx].Runtime.ctxT.Done():
1187 default:
1188 return nil
1189 }
1190
1191 // Start QEMU again.
1192 launch.Log("Cluster: starting node %d (%s).", idx, id)
Serge Bazanski2b6dc312024-06-04 17:44:55 +02001193 if err := LaunchNode(c.ctxT, c.launchDir, c.socketDir, c.tpmFactory, &c.nodeOpts[idx], c.nodesDone[idx]); err != nil {
Serge Bazanski500f6e02024-04-03 12:06:40 +02001194 return fmt.Errorf("failed to launch node %d: %w", idx, err)
1195 }
Serge Bazanski2b6dc312024-06-04 17:44:55 +02001196 launch.Log("Cluster: node %d (%s) started.", idx, id)
Serge Bazanski500f6e02024-04-03 12:06:40 +02001197 return nil
1198}
1199
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001200// Close cancels the running clusters' context and waits for all virtualized
Serge Bazanski66e58952021-10-05 17:06:56 +02001201// nodes to stop. It returns an error if stopping the nodes failed, or one of
1202// the nodes failed to fully start in the first place.
1203func (c *Cluster) Close() error {
Serge Bazanski05f813b2023-03-16 17:58:39 +01001204 launch.Log("Cluster: stopping...")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001205 if c.authClient != nil {
1206 c.authClient.Close()
1207 }
Serge Bazanski66e58952021-10-05 17:06:56 +02001208 c.ctxC()
1209
Leopold20a036e2023-01-15 00:17:19 +01001210 var errs []error
Serge Bazanski05f813b2023-03-16 17:58:39 +01001211 launch.Log("Cluster: waiting for nodes to exit...")
Serge Bazanski66e58952021-10-05 17:06:56 +02001212 for _, c := range c.nodesDone {
1213 err := <-c
1214 if err != nil {
Leopold20a036e2023-01-15 00:17:19 +01001215 errs = append(errs, err)
Serge Bazanski66e58952021-10-05 17:06:56 +02001216 }
1217 }
Serge Bazanskid09c58f2023-03-17 00:25:08 +01001218 launch.Log("Cluster: removing nodes' state files (%s) and sockets (%s).", c.launchDir, c.socketDir)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001219 os.RemoveAll(c.launchDir)
1220 os.RemoveAll(c.socketDir)
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001221 os.RemoveAll(c.metroctlDir)
Serge Bazanski05f813b2023-03-16 17:58:39 +01001222 launch.Log("Cluster: done")
Leopold20a036e2023-01-15 00:17:19 +01001223 return multierr.Combine(errs...)
Serge Bazanski66e58952021-10-05 17:06:56 +02001224}
Serge Bazanskibe742842022-04-04 13:18:50 +02001225
1226// DialNode is a grpc.WithContextDialer compatible dialer which dials nodes by
1227// their ID. This is performed by connecting to the cluster nanoswitch via its
1228// SOCKS proxy, and using the cluster node list for name resolution.
1229//
1230// For example:
1231//
Serge Bazanski05f813b2023-03-16 17:58:39 +01001232// grpc.Dial("metropolis-deadbeef:1234", grpc.WithContextDialer(c.DialNode))
Serge Bazanskibe742842022-04-04 13:18:50 +02001233func (c *Cluster) DialNode(_ context.Context, addr string) (net.Conn, error) {
1234 host, port, err := net.SplitHostPort(addr)
1235 if err != nil {
1236 return nil, fmt.Errorf("invalid host:port: %w", err)
1237 }
1238 // Already an IP address?
1239 if net.ParseIP(host) != nil {
Lorenz Brun276a7462023-07-12 21:28:54 +02001240 return c.SOCKSDialer.Dial("tcp", addr)
Serge Bazanskibe742842022-04-04 13:18:50 +02001241 }
1242
1243 // Otherwise, expect a node name.
1244 node, ok := c.Nodes[host]
1245 if !ok {
1246 return nil, fmt.Errorf("unknown node %q", host)
1247 }
1248 addr = net.JoinHostPort(node.ManagementAddress, port)
Lorenz Brun276a7462023-07-12 21:28:54 +02001249 return c.SOCKSDialer.Dial("tcp", addr)
Serge Bazanskibe742842022-04-04 13:18:50 +02001250}
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001251
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001252// GetKubeClientSet gets a Kubernetes client set accessing the Metropolis
1253// Kubernetes authenticating proxy using the cluster owner identity.
1254// It currently has access to everything (i.e. the cluster-admin role)
1255// via the owner-admin binding.
1256func (c *Cluster) GetKubeClientSet() (kubernetes.Interface, error) {
1257 pkcs8Key, err := x509.MarshalPKCS8PrivateKey(c.Owner.PrivateKey)
1258 if err != nil {
1259 // We explicitly pass an Ed25519 private key in, so this can't happen
1260 panic(err)
1261 }
1262
1263 host := net.JoinHostPort(c.NodeIDs[0], node.KubernetesAPIWrappedPort.PortString())
Lorenz Brun150f24a2023-07-13 20:11:06 +02001264 clientConfig := rest.Config{
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001265 Host: host,
1266 TLSClientConfig: rest.TLSClientConfig{
1267 // TODO(q3k): use CA certificate
1268 Insecure: true,
1269 ServerName: "kubernetes.default.svc",
1270 CertData: pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: c.Owner.Certificate[0]}),
1271 KeyData: pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: pkcs8Key}),
1272 },
1273 Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
1274 return c.DialNode(ctx, address)
1275 },
1276 }
1277 return kubernetes.NewForConfig(&clientConfig)
1278}
1279
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001280// KubernetesControllerNodeAddresses returns the list of IP addresses of nodes
1281// which are currently Kubernetes controllers, ie. run an apiserver. This list
1282// might be empty if no node is currently configured with the
1283// 'KubernetesController' node.
1284func (c *Cluster) KubernetesControllerNodeAddresses(ctx context.Context) ([]string, error) {
1285 curC, err := c.CuratorClient()
1286 if err != nil {
1287 return nil, err
1288 }
1289 mgmt := apb.NewManagementClient(curC)
1290 srv, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{
1291 Filter: "has(node.roles.kubernetes_controller)",
1292 })
1293 if err != nil {
1294 return nil, err
1295 }
1296 defer srv.CloseSend()
1297 var res []string
1298 for {
1299 n, err := srv.Recv()
1300 if err == io.EOF {
1301 break
1302 }
1303 if err != nil {
1304 return nil, err
1305 }
1306 if n.Status == nil || n.Status.ExternalAddress == "" {
1307 continue
1308 }
1309 res = append(res, n.Status.ExternalAddress)
1310 }
1311 return res, nil
1312}
Serge Bazanski630fb5c2023-04-06 10:50:24 +02001313
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001314// AllNodesHealthy returns nil if all the nodes in the cluster are seemingly
1315// healthy.
Serge Bazanski630fb5c2023-04-06 10:50:24 +02001316func (c *Cluster) AllNodesHealthy(ctx context.Context) error {
1317 // Get an authenticated owner client within the cluster.
1318 curC, err := c.CuratorClient()
1319 if err != nil {
1320 return err
1321 }
1322 mgmt := apb.NewManagementClient(curC)
1323 nodes, err := getNodes(ctx, mgmt)
1324 if err != nil {
1325 return err
1326 }
1327
1328 var unhealthy []string
1329 for _, node := range nodes {
1330 if node.Health == apb.Node_HEALTHY {
1331 continue
1332 }
1333 unhealthy = append(unhealthy, node.Id)
1334 }
1335 if len(unhealthy) == 0 {
1336 return nil
1337 }
1338 return fmt.Errorf("nodes unhealthy: %s", strings.Join(unhealthy, ", "))
1339}
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001340
1341// ApproveNode approves a node by ID, waiting for it to become UP.
1342func (c *Cluster) ApproveNode(ctx context.Context, id string) error {
1343 curC, err := c.CuratorClient()
1344 if err != nil {
1345 return err
1346 }
1347 mgmt := apb.NewManagementClient(curC)
1348
1349 _, err = mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
1350 Pubkey: c.Nodes[id].Pubkey,
1351 })
1352 if err != nil {
1353 return fmt.Errorf("ApproveNode: %w", err)
1354 }
1355 launch.Log("Cluster: %s: approved, waiting for UP", id)
1356 for {
1357 nodes, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
1358 if err != nil {
1359 return fmt.Errorf("GetNodes: %w", err)
1360 }
1361 found := false
1362 for {
1363 node, err := nodes.Recv()
1364 if errors.Is(err, io.EOF) {
1365 break
1366 }
1367 if err != nil {
1368 return fmt.Errorf("Nodes.Recv: %w", err)
1369 }
1370 if node.Id != id {
1371 continue
1372 }
1373 if node.State != cpb.NodeState_NODE_STATE_UP {
1374 continue
1375 }
1376 found = true
1377 break
1378 }
1379 nodes.CloseSend()
1380
1381 if found {
1382 break
1383 }
1384 time.Sleep(time.Second)
1385 }
1386 launch.Log("Cluster: %s: UP", id)
1387 return nil
1388}
1389
1390// MakeKubernetesWorker adds the KubernetesWorker role to a node by ID.
1391func (c *Cluster) MakeKubernetesWorker(ctx context.Context, id string) error {
1392 curC, err := c.CuratorClient()
1393 if err != nil {
1394 return err
1395 }
1396 mgmt := apb.NewManagementClient(curC)
1397
1398 tr := true
1399 launch.Log("Cluster: %s: adding KubernetesWorker", id)
1400 _, err = mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
1401 Node: &apb.UpdateNodeRolesRequest_Id{
1402 Id: id,
1403 },
1404 KubernetesWorker: &tr,
1405 })
1406 return err
1407}
Serge Bazanski37cfcc12024-03-21 11:59:07 +01001408
Jan Schära9b060b2024-08-07 10:42:29 +02001409// MakeKubernetesController adds the KubernetesController role to a node by ID.
1410func (c *Cluster) MakeKubernetesController(ctx context.Context, id string) error {
1411 curC, err := c.CuratorClient()
1412 if err != nil {
1413 return err
1414 }
1415 mgmt := apb.NewManagementClient(curC)
1416
1417 tr := true
1418 launch.Log("Cluster: %s: adding KubernetesController", id)
1419 _, err = mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
1420 Node: &apb.UpdateNodeRolesRequest_Id{
1421 Id: id,
1422 },
1423 KubernetesController: &tr,
1424 })
1425 return err
1426}
1427
Serge Bazanski37cfcc12024-03-21 11:59:07 +01001428// MakeConsensusMember adds the ConsensusMember role to a node by ID.
1429func (c *Cluster) MakeConsensusMember(ctx context.Context, id string) error {
1430 curC, err := c.CuratorClient()
1431 if err != nil {
1432 return err
1433 }
1434 mgmt := apb.NewManagementClient(curC)
1435 cur := ipb.NewCuratorClient(curC)
1436
1437 tr := true
1438 launch.Log("Cluster: %s: adding ConsensusMember", id)
1439 bo := backoff.NewExponentialBackOff()
1440 bo.MaxElapsedTime = 10 * time.Second
1441
1442 backoff.Retry(func() error {
1443 _, err = mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
1444 Node: &apb.UpdateNodeRolesRequest_Id{
1445 Id: id,
1446 },
1447 ConsensusMember: &tr,
1448 })
1449 if err != nil {
1450 launch.Log("Cluster: %s: UpdateNodeRoles failed: %v", id, err)
1451 }
1452 return err
1453 }, backoff.WithContext(bo, ctx))
1454 if err != nil {
1455 return err
1456 }
1457
1458 launch.Log("Cluster: %s: waiting for learner/full members...", id)
1459
1460 learner := false
1461 for {
1462 res, err := cur.GetConsensusStatus(ctx, &ipb.GetConsensusStatusRequest{})
1463 if err != nil {
1464 return fmt.Errorf("GetConsensusStatus: %w", err)
1465 }
1466 for _, member := range res.EtcdMember {
1467 if member.Id != id {
1468 continue
1469 }
1470 switch member.Status {
1471 case ipb.GetConsensusStatusResponse_EtcdMember_STATUS_LEARNER:
1472 if !learner {
1473 learner = true
1474 launch.Log("Cluster: %s: became a learner, waiting for full member...", id)
1475 }
1476 case ipb.GetConsensusStatusResponse_EtcdMember_STATUS_FULL:
1477 launch.Log("Cluster: %s: became a full member", id)
1478 return nil
1479 }
1480 }
1481 time.Sleep(100 * time.Millisecond)
1482 }
1483}