blob: 4cbfedefe69c29862c1f35e9aef5bfc752746878 [file] [log] [blame]
// cluster builds on the launch package and implements launching Metropolis
// nodes and clusters in a virtualized environment using qemu. It's kept in a
// separate package as it depends on a Metropolis node image, which might not be
// required for some uses of the launch library.
5package cluster
6
7import (
8 "bytes"
9 "context"
Serge Bazanski1f8cad72023-03-20 16:58:10 +010010 "crypto/ed25519"
Serge Bazanski66e58952021-10-05 17:06:56 +020011 "crypto/rand"
12 "crypto/tls"
Serge Bazanski54e212a2023-06-14 13:45:11 +020013 "crypto/x509"
Serge Bazanskia0bc6d32023-06-28 18:57:40 +020014 "encoding/pem"
Serge Bazanski66e58952021-10-05 17:06:56 +020015 "errors"
16 "fmt"
17 "io"
Serge Bazanski66e58952021-10-05 17:06:56 +020018 "net"
Lorenz Brun150f24a2023-07-13 20:11:06 +020019 "net/http"
Serge Bazanski66e58952021-10-05 17:06:56 +020020 "os"
21 "os/exec"
Leopoldacfad5b2023-01-15 14:05:25 +010022 "path"
Serge Bazanski66e58952021-10-05 17:06:56 +020023 "path/filepath"
Serge Bazanski630fb5c2023-04-06 10:50:24 +020024 "strings"
Serge Bazanski66e58952021-10-05 17:06:56 +020025 "syscall"
26 "time"
27
Tim Windelschmidt2a1d1b22024-02-06 07:07:42 +010028 "github.com/bazelbuild/rules_go/go/runfiles"
Serge Bazanski66e58952021-10-05 17:06:56 +020029 "github.com/cenkalti/backoff/v4"
Serge Bazanski66e58952021-10-05 17:06:56 +020030 "go.uber.org/multierr"
Serge Bazanskibe742842022-04-04 13:18:50 +020031 "golang.org/x/net/proxy"
Lorenz Brun87bbf7e2024-03-18 18:22:25 +010032 "golang.org/x/sys/unix"
Serge Bazanski66e58952021-10-05 17:06:56 +020033 "google.golang.org/grpc"
Serge Bazanski636032e2022-01-26 14:21:33 +010034 "google.golang.org/grpc/codes"
35 "google.golang.org/grpc/status"
Serge Bazanski66e58952021-10-05 17:06:56 +020036 "google.golang.org/protobuf/proto"
Serge Bazanskia0bc6d32023-06-28 18:57:40 +020037 "k8s.io/client-go/kubernetes"
38 "k8s.io/client-go/rest"
Serge Bazanski66e58952021-10-05 17:06:56 +020039
Serge Bazanski37cfcc12024-03-21 11:59:07 +010040 ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
Tim Windelschmidtbe25a3b2023-07-19 16:31:56 +020041 apb "source.monogon.dev/metropolis/proto/api"
42 cpb "source.monogon.dev/metropolis/proto/common"
43
Serge Bazanskidd5b03c2024-05-16 18:07:06 +020044 "source.monogon.dev/go/qcow2"
Serge Bazanski1f8cad72023-03-20 16:58:10 +010045 metroctl "source.monogon.dev/metropolis/cli/metroctl/core"
Serge Bazanski66e58952021-10-05 17:06:56 +020046 "source.monogon.dev/metropolis/node"
Serge Bazanskie78a0892021-10-07 17:03:49 +020047 "source.monogon.dev/metropolis/node/core/identity"
Serge Bazanski66e58952021-10-05 17:06:56 +020048 "source.monogon.dev/metropolis/node/core/rpc"
Serge Bazanski5bb8a332022-06-23 17:41:33 +020049 "source.monogon.dev/metropolis/node/core/rpc/resolver"
Lorenz Brun150f24a2023-07-13 20:11:06 +020050 "source.monogon.dev/metropolis/pkg/localregistry"
Serge Bazanski66e58952021-10-05 17:06:56 +020051 "source.monogon.dev/metropolis/test/launch"
52)
53
Leopold20a036e2023-01-15 00:17:19 +010054// NodeOptions contains all options that can be passed to Launch()
Serge Bazanski66e58952021-10-05 17:06:56 +020055type NodeOptions struct {
Leopoldaf5086b2023-01-15 14:12:42 +010056 // Name is a human-readable identifier to be used in debug output.
57 Name string
58
Serge Bazanski66e58952021-10-05 17:06:56 +020059 // Ports contains the port mapping where to expose the internal ports of the VM to
60 // the host. See IdentityPortMap() and ConflictFreePortMap(). Ignored when
61 // ConnectToSocket is set.
62 Ports launch.PortMap
63
Leopold20a036e2023-01-15 00:17:19 +010064 // If set to true, reboots are honored. Otherwise, all reboots exit the Launch()
65 // command. Metropolis nodes generally restart on almost all errors, so unless you
Serge Bazanski66e58952021-10-05 17:06:56 +020066 // want to test reboot behavior this should be false.
67 AllowReboot bool
68
Leopold20a036e2023-01-15 00:17:19 +010069 // By default, the VM is connected to the Host via SLIRP. If ConnectToSocket is
70 // set, it is instead connected to the given file descriptor/socket. If this is
71 // set, all port maps from the Ports option are ignored. Intended for networking
72 // this instance together with others for running more complex network
73 // configurations.
Serge Bazanski66e58952021-10-05 17:06:56 +020074 ConnectToSocket *os.File
75
Leopoldacfad5b2023-01-15 14:05:25 +010076 // When PcapDump is set, all traffic is dumped to a pcap file in the
77 // runtime directory (e.g. "net0.pcap" for the first interface).
78 PcapDump bool
79
Leopold20a036e2023-01-15 00:17:19 +010080 // SerialPort is an io.ReadWriter over which you can communicate with the serial
81 // port of the machine. It can be set to an existing file descriptor (like
Serge Bazanski66e58952021-10-05 17:06:56 +020082 // os.Stdout/os.Stderr) or any Go structure implementing this interface.
83 SerialPort io.ReadWriter
84
85 // NodeParameters is passed into the VM and subsequently used for bootstrapping or
86 // registering into a cluster.
87 NodeParameters *apb.NodeParameters
Mateusz Zalega0246f5e2022-04-22 17:29:04 +020088
89 // Mac is the node's MAC address.
90 Mac *net.HardwareAddr
91
92 // Runtime keeps the node's QEMU runtime state.
93 Runtime *NodeRuntime
94}
95
// NodeRuntime holds the QEMU runtime state of a node, kept between launches of
// that node.
type NodeRuntime struct {
	// ld is the node's launch directory (storage images, firmware variables,
	// TPM state); sd is the node's socket directory.
	ld, sd string

	// ctxT is the context the node's QEMU process runs under.
	ctxT context.Context
	// CtxC cancels ctxT. LaunchNode calls it before installing a new context
	// when the node is launched again.
	CtxC context.CancelFunc
}
109
110// NodePorts is the list of ports a fully operational Metropolis node listens on
Serge Bazanski52304a82021-10-29 16:56:18 +0200111var NodePorts = []node.Port{
Serge Bazanski66e58952021-10-05 17:06:56 +0200112 node.ConsensusPort,
113
114 node.CuratorServicePort,
115 node.DebugServicePort,
116
117 node.KubernetesAPIPort,
Lorenz Bruncc078df2021-12-23 11:51:55 +0100118 node.KubernetesAPIWrappedPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200119 node.CuratorServicePort,
120 node.DebuggerPort,
Tim Windelschmidtbe25a3b2023-07-19 16:31:56 +0200121 node.MetricsPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200122}
123
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200124// setupRuntime creates the node's QEMU runtime directory, together with all
125// files required to preserve its state, a level below the chosen path ld. The
126// node's socket directory is similarily created a level below sd. It may
127// return an I/O error.
128func setupRuntime(ld, sd string) (*NodeRuntime, error) {
129 // Create a temporary directory to keep all the runtime files.
130 stdp, err := os.MkdirTemp(ld, "node_state*")
131 if err != nil {
132 return nil, fmt.Errorf("failed to create the state directory: %w", err)
133 }
134
135 // Initialize the node's storage with a prebuilt image.
Tim Windelschmidt2a1d1b22024-02-06 07:07:42 +0100136 si, err := runfiles.Rlocation("_main/metropolis/node/image.img")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200137 if err != nil {
138 return nil, fmt.Errorf("while resolving a path: %w", err)
139 }
Serge Bazanskidd5b03c2024-05-16 18:07:06 +0200140
141 di := filepath.Join(stdp, "image.qcow2")
142 launch.Log("Cluster: generating node QCOW2 snapshot image: %s -> %s", si, di)
143
144 df, err := os.Create(di)
145 if err != nil {
146 return nil, fmt.Errorf("while opening image for writing: %w", err)
147 }
148 defer df.Close()
149 if err := qcow2.Generate(df, qcow2.GenerateWithBackingFile(si)); err != nil {
150 return nil, fmt.Errorf("while creating copy-on-write node image: %w", err)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200151 }
152
153 // Initialize the OVMF firmware variables file.
Tim Windelschmidt2a1d1b22024-02-06 07:07:42 +0100154 sv, err := runfiles.Rlocation("edk2/OVMF_VARS.fd")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200155 if err != nil {
156 return nil, fmt.Errorf("while resolving a path: %w", err)
157 }
158 dv := filepath.Join(stdp, filepath.Base(sv))
159 if err := copyFile(sv, dv); err != nil {
160 return nil, fmt.Errorf("while copying firmware variables: %w", err)
161 }
162
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200163 // Create the socket directory.
164 sotdp, err := os.MkdirTemp(sd, "node_sock*")
165 if err != nil {
166 return nil, fmt.Errorf("failed to create the socket directory: %w", err)
167 }
168
169 return &NodeRuntime{
170 ld: stdp,
171 sd: sotdp,
172 }, nil
173}
174
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200175// CuratorClient returns an authenticated owner connection to a Curator
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200176// instance within Cluster c, or nil together with an error.
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200177func (c *Cluster) CuratorClient() (*grpc.ClientConn, error) {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200178 if c.authClient == nil {
Serge Bazanski8535cb52023-03-29 14:15:08 +0200179 authCreds := rpc.NewAuthenticatedCredentials(c.Owner, rpc.WantInsecure())
Serge Bazanski58ddc092022-06-30 18:23:33 +0200180 r := resolver.New(c.ctxT, resolver.WithLogger(func(f string, args ...interface{}) {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100181 launch.Log("Cluster: client resolver: %s", fmt.Sprintf(f, args...))
Serge Bazanski58ddc092022-06-30 18:23:33 +0200182 }))
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200183 for _, n := range c.NodeIDs {
184 ep, err := resolver.NodeWithDefaultPort(n)
185 if err != nil {
Tim Windelschmidtadcf5d72024-05-21 13:46:25 +0200186 return nil, fmt.Errorf("could not add node %q by DNS: %w", n, err)
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200187 }
188 r.AddEndpoint(ep)
189 }
190 authClient, err := grpc.Dial(resolver.MetropolisControlAddress,
191 grpc.WithTransportCredentials(authCreds),
192 grpc.WithResolvers(r),
193 grpc.WithContextDialer(c.DialNode),
194 )
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200195 if err != nil {
196 return nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
197 }
198 c.authClient = authClient
199 }
200 return c.authClient, nil
201}
202
Serge Bazanski66e58952021-10-05 17:06:56 +0200203// LaunchNode launches a single Metropolis node instance with the given options.
204// The instance runs mostly paravirtualized but with some emulated hardware
205// similar to how a cloud provider might set up its VMs. The disk is fully
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200206// writable, and the changes are kept across reboots and shutdowns. ld and sd
207// point to the launch directory and the socket directory, holding the nodes'
208// state files (storage, tpm state, firmware state), and UNIX socket files
209// (swtpm <-> QEMU interplay) respectively. The directories must exist before
210// LaunchNode is called. LaunchNode will update options.Runtime and options.Mac
211// if either are not initialized.
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200212func LaunchNode(ctx context.Context, ld, sd string, tpmFactory *TPMFactory, options *NodeOptions, doneC chan error) error {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200213 // TODO(mateusz@monogon.tech) try using QEMU's abstract socket namespace instead
214 // of /tmp (requires QEMU version >5.0).
Serge Bazanski66e58952021-10-05 17:06:56 +0200215 // https://github.com/qemu/qemu/commit/776b97d3605ed0fc94443048fdf988c7725e38a9).
216 // swtpm accepts already-open FDs so we can pass in an abstract socket namespace FD
217 // that we open and pass the name of it to QEMU. Not pinning this crashes both
218 // swtpm and qemu because we run into UNIX socket length limitations (for legacy
219 // reasons 108 chars).
Serge Bazanski66e58952021-10-05 17:06:56 +0200220
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200221 // If it's the node's first start, set up its runtime directories.
222 if options.Runtime == nil {
223 r, err := setupRuntime(ld, sd)
224 if err != nil {
225 return fmt.Errorf("while setting up node runtime: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200226 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200227 options.Runtime = r
Serge Bazanski66e58952021-10-05 17:06:56 +0200228 }
229
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200230 // Replace the node's context with a new one.
231 r := options.Runtime
232 if r.CtxC != nil {
233 r.CtxC()
234 }
235 r.ctxT, r.CtxC = context.WithCancel(ctx)
236
Serge Bazanski66e58952021-10-05 17:06:56 +0200237 var qemuNetType string
238 var qemuNetConfig launch.QemuValue
239 if options.ConnectToSocket != nil {
240 qemuNetType = "socket"
241 qemuNetConfig = launch.QemuValue{
242 "id": {"net0"},
243 "fd": {"3"},
244 }
245 } else {
246 qemuNetType = "user"
247 qemuNetConfig = launch.QemuValue{
248 "id": {"net0"},
249 "net": {"10.42.0.0/24"},
250 "dhcpstart": {"10.42.0.10"},
251 "hostfwd": options.Ports.ToQemuForwards(),
252 }
253 }
254
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200255 // Generate the node's MAC address if it isn't already set in NodeOptions.
256 if options.Mac == nil {
257 mac, err := generateRandomEthernetMAC()
258 if err != nil {
259 return err
260 }
261 options.Mac = mac
Serge Bazanski66e58952021-10-05 17:06:56 +0200262 }
263
Tim Windelschmidt244b5672024-02-06 10:18:56 +0100264 ovmfCodePath, err := runfiles.Rlocation("edk2/OVMF_CODE.fd")
265 if err != nil {
266 return err
267 }
268
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200269 tpmSocketPath := filepath.Join(r.sd, "tpm-socket")
270 fwVarPath := filepath.Join(r.ld, "OVMF_VARS.fd")
Serge Bazanskidd5b03c2024-05-16 18:07:06 +0200271 storagePath := filepath.Join(r.ld, "image.qcow2")
Lorenz Brun150f24a2023-07-13 20:11:06 +0200272 qemuArgs := []string{
Serge Bazanski99b02142024-04-17 16:33:28 +0200273 "-machine", "q35", "-accel", "kvm", "-nographic", "-nodefaults", "-m", "2048",
Serge Bazanski66e58952021-10-05 17:06:56 +0200274 "-cpu", "host", "-smp", "sockets=1,cpus=1,cores=2,threads=2,maxcpus=4",
Tim Windelschmidt244b5672024-02-06 10:18:56 +0100275 "-drive", "if=pflash,format=raw,readonly=on,file=" + ovmfCodePath,
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200276 "-drive", "if=pflash,format=raw,file=" + fwVarPath,
Serge Bazanskidd5b03c2024-05-16 18:07:06 +0200277 "-drive", "if=virtio,format=qcow2,cache=unsafe,file=" + storagePath,
Serge Bazanski66e58952021-10-05 17:06:56 +0200278 "-netdev", qemuNetConfig.ToOption(qemuNetType),
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200279 "-device", "virtio-net-pci,netdev=net0,mac=" + options.Mac.String(),
Serge Bazanski66e58952021-10-05 17:06:56 +0200280 "-chardev", "socket,id=chrtpm,path=" + tpmSocketPath,
281 "-tpmdev", "emulator,id=tpm0,chardev=chrtpm",
282 "-device", "tpm-tis,tpmdev=tpm0",
283 "-device", "virtio-rng-pci",
Lorenz Brun150f24a2023-07-13 20:11:06 +0200284 "-serial", "stdio",
285 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200286
287 if !options.AllowReboot {
288 qemuArgs = append(qemuArgs, "-no-reboot")
289 }
290
291 if options.NodeParameters != nil {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200292 parametersPath := filepath.Join(r.ld, "parameters.pb")
Serge Bazanski66e58952021-10-05 17:06:56 +0200293 parametersRaw, err := proto.Marshal(options.NodeParameters)
294 if err != nil {
295 return fmt.Errorf("failed to encode node paraeters: %w", err)
296 }
Lorenz Brun150f24a2023-07-13 20:11:06 +0200297 if err := os.WriteFile(parametersPath, parametersRaw, 0o644); err != nil {
Serge Bazanski66e58952021-10-05 17:06:56 +0200298 return fmt.Errorf("failed to write node parameters: %w", err)
299 }
300 qemuArgs = append(qemuArgs, "-fw_cfg", "name=dev.monogon.metropolis/parameters.pb,file="+parametersPath)
301 }
302
Leopoldacfad5b2023-01-15 14:05:25 +0100303 if options.PcapDump {
Tim Windelschmidta7a82f32024-04-11 01:40:25 +0200304 qemuNetDump := launch.QemuValue{
305 "id": {"net0"},
306 "netdev": {"net0"},
307 "file": {filepath.Join(r.ld, "net0.pcap")},
Leopoldacfad5b2023-01-15 14:05:25 +0100308 }
309 qemuArgs = append(qemuArgs, "-object", qemuNetDump.ToOption("filter-dump"))
310 }
311
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200312 // Manufacture TPM if needed.
313 tpmd := filepath.Join(r.ld, "tpm")
314 err = tpmFactory.Manufacture(ctx, tpmd, &TPMPlatform{
315 Manufacturer: "Monogon",
316 Version: "1.0",
317 Model: "TestCluster",
318 })
319 if err != nil {
320 return fmt.Errorf("could not manufacture TPM: %w", err)
321 }
322
Serge Bazanski66e58952021-10-05 17:06:56 +0200323 // Start TPM emulator as a subprocess
Serge Bazanskib07c57a2024-06-04 14:33:27 +0000324 swtpm, err := runfiles.Rlocation("swtpm/swtpm")
325 if err != nil {
326 return fmt.Errorf("could not find swtpm: %w", err)
327 }
328
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200329 tpmCtx, tpmCancel := context.WithCancel(options.Runtime.ctxT)
Serge Bazanski66e58952021-10-05 17:06:56 +0200330
Serge Bazanskib07c57a2024-06-04 14:33:27 +0000331 tpmEmuCmd := exec.CommandContext(tpmCtx, swtpm, "socket", "--tpm2", "--tpmstate", "dir="+tpmd, "--ctrl", "type=unixio,path="+tpmSocketPath)
332 // Silence warnings from unsafe libtpms build (uses non-constant-time
333 // cryptographic operations).
334 tpmEmuCmd.Env = append(tpmEmuCmd.Env, "MONOGON_LIBTPMS_ACKNOWLEDGE_UNSAFE=yes")
Serge Bazanski66e58952021-10-05 17:06:56 +0200335 tpmEmuCmd.Stderr = os.Stderr
336 tpmEmuCmd.Stdout = os.Stdout
337
Tim Windelschmidt244b5672024-02-06 10:18:56 +0100338 err = tpmEmuCmd.Start()
Serge Bazanski66e58952021-10-05 17:06:56 +0200339 if err != nil {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200340 tpmCancel()
Serge Bazanski66e58952021-10-05 17:06:56 +0200341 return fmt.Errorf("failed to start TPM emulator: %w", err)
342 }
343
Mateusz Zalegae90f4a12022-05-25 18:24:01 +0200344 // Wait for the socket to be created by the TPM emulator before launching
345 // QEMU.
346 for {
347 _, err := os.Stat(tpmSocketPath)
348 if err == nil {
349 break
350 }
Tim Windelschmidta7a82f32024-04-11 01:40:25 +0200351 if !os.IsNotExist(err) {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200352 tpmCancel()
Mateusz Zalegae90f4a12022-05-25 18:24:01 +0200353 return fmt.Errorf("while stat-ing TPM socket path: %w", err)
354 }
355 if err := tpmCtx.Err(); err != nil {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200356 tpmCancel()
Mateusz Zalegae90f4a12022-05-25 18:24:01 +0200357 return fmt.Errorf("while waiting for the TPM socket: %w", err)
358 }
359 time.Sleep(time.Millisecond * 100)
360 }
361
Serge Bazanski66e58952021-10-05 17:06:56 +0200362 // Start the main qemu binary
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200363 systemCmd := exec.CommandContext(options.Runtime.ctxT, "qemu-system-x86_64", qemuArgs...)
Serge Bazanski66e58952021-10-05 17:06:56 +0200364 if options.ConnectToSocket != nil {
365 systemCmd.ExtraFiles = []*os.File{options.ConnectToSocket}
366 }
367
368 var stdErrBuf bytes.Buffer
369 systemCmd.Stderr = &stdErrBuf
370 systemCmd.Stdout = options.SerialPort
371
Leopoldaf5086b2023-01-15 14:12:42 +0100372 launch.PrettyPrintQemuArgs(options.Name, systemCmd.Args)
373
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200374 go func() {
375 launch.Log("Node: Starting...")
376 err = systemCmd.Run()
377 launch.Log("Node: Returned: %v", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200378
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200379 // Stop TPM emulator and wait for it to exit to properly reap the child process
380 tpmCancel()
381 launch.Log("Node: Waiting for TPM emulator to exit")
382 // Wait returns a SIGKILL error because we just cancelled its context.
383 // We still need to call it to avoid creating zombies.
384 errTpm := tpmEmuCmd.Wait()
385 launch.Log("Node: TPM emulator done: %v", errTpm)
Serge Bazanski66e58952021-10-05 17:06:56 +0200386
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200387 var exerr *exec.ExitError
388 if err != nil && errors.As(err, &exerr) {
389 status := exerr.ProcessState.Sys().(syscall.WaitStatus)
390 if status.Signaled() && status.Signal() == syscall.SIGKILL {
391 // Process was killed externally (most likely by our context being canceled).
392 // This is a normal exit for us, so return nil
393 doneC <- nil
394 return
395 }
396 exerr.Stderr = stdErrBuf.Bytes()
397 newErr := launch.QEMUError(*exerr)
398 launch.Log("Node: %q", stdErrBuf.String())
399 doneC <- &newErr
400 return
Serge Bazanski66e58952021-10-05 17:06:56 +0200401 }
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200402 doneC <- err
403 }()
404 return nil
Serge Bazanski66e58952021-10-05 17:06:56 +0200405}
406
407func copyFile(src, dst string) error {
408 in, err := os.Open(src)
409 if err != nil {
410 return fmt.Errorf("when opening source: %w", err)
411 }
412 defer in.Close()
413
414 out, err := os.Create(dst)
415 if err != nil {
416 return fmt.Errorf("when creating destination: %w", err)
417 }
418 defer out.Close()
419
Lorenz Brun87bbf7e2024-03-18 18:22:25 +0100420 endPos, err := in.Seek(0, io.SeekEnd)
Serge Bazanski66e58952021-10-05 17:06:56 +0200421 if err != nil {
Lorenz Brun87bbf7e2024-03-18 18:22:25 +0100422 return fmt.Errorf("when getting source end: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200423 }
Lorenz Brun87bbf7e2024-03-18 18:22:25 +0100424
425 // Copy the file while preserving its sparseness. The image files are very
426 // sparse (less than 10% allocated), so this is a lot faster.
427 var lastHoleStart int64
428 for {
429 dataStart, err := in.Seek(lastHoleStart, unix.SEEK_DATA)
430 if err != nil {
431 return fmt.Errorf("when seeking to next data block: %w", err)
432 }
433 holeStart, err := in.Seek(dataStart, unix.SEEK_HOLE)
434 if err != nil {
435 return fmt.Errorf("when seeking to next hole: %w", err)
436 }
437 lastHoleStart = holeStart
438 if _, err := in.Seek(dataStart, io.SeekStart); err != nil {
439 return fmt.Errorf("when seeking to current data block: %w", err)
440 }
441 if _, err := out.Seek(dataStart, io.SeekStart); err != nil {
442 return fmt.Errorf("when seeking output to next data block: %w", err)
443 }
444 if _, err := io.CopyN(out, in, holeStart-dataStart); err != nil {
445 return fmt.Errorf("when copying file: %w", err)
446 }
447 if endPos == holeStart {
448 // The next hole is at the end of the file, we're done here.
449 break
450 }
451 }
452
Serge Bazanski66e58952021-10-05 17:06:56 +0200453 return out.Close()
454}
455
Serge Bazanskie78a0892021-10-07 17:03:49 +0200456// getNodes wraps around Management.GetNodes to return a list of nodes in a
457// cluster.
458func getNodes(ctx context.Context, mgmt apb.ManagementClient) ([]*apb.Node, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200459 var res []*apb.Node
Serge Bazanski636032e2022-01-26 14:21:33 +0100460 bo := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
Serge Bazanski075465c2021-11-16 15:38:49 +0100461 err := backoff.Retry(func() error {
462 res = nil
463 srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
Serge Bazanskie78a0892021-10-07 17:03:49 +0200464 if err != nil {
Serge Bazanski075465c2021-11-16 15:38:49 +0100465 return fmt.Errorf("GetNodes: %w", err)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200466 }
Serge Bazanski075465c2021-11-16 15:38:49 +0100467 for {
468 node, err := srvN.Recv()
469 if err == io.EOF {
470 break
471 }
472 if err != nil {
473 return fmt.Errorf("GetNodes.Recv: %w", err)
474 }
475 res = append(res, node)
476 }
477 return nil
478 }, bo)
479 if err != nil {
480 return nil, err
Serge Bazanskie78a0892021-10-07 17:03:49 +0200481 }
482 return res, nil
483}
484
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200485// getNode wraps Management.GetNodes. It returns node information matching
486// given node ID.
487func getNode(ctx context.Context, mgmt apb.ManagementClient, id string) (*apb.Node, error) {
488 nodes, err := getNodes(ctx, mgmt)
489 if err != nil {
490 return nil, fmt.Errorf("could not get nodes: %w", err)
491 }
492 for _, n := range nodes {
493 eid := identity.NodeID(n.Pubkey)
494 if eid != id {
495 continue
496 }
497 return n, nil
498 }
Tim Windelschmidt73e98822024-04-18 23:13:49 +0200499 return nil, fmt.Errorf("no such node")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200500}
501
// generateRandomEthernetMAC returns a random EUI-48 Ethernet MAC address. The
// U/L bit is set and the I/G bit is cleared, which makes it a locally
// administered individual (unicast) address (IEEE 802-2014, Section 8.2.2).
func generateRandomEthernetMAC() (*net.HardwareAddr, error) {
	const (
		groupBit = 0x01 // I/G: set for group (multicast) addresses
		localBit = 0x02 // U/L: set for locally administered addresses
	)

	var raw [6]byte
	if _, err := rand.Read(raw[:]); err != nil {
		return nil, fmt.Errorf("failed to read randomness for MAC: %w", err)
	}
	raw[0] = raw[0]&^groupBit | localBit

	mac := net.HardwareAddr(raw[:])
	return &mac, nil
}
516
Serge Bazanskibe742842022-04-04 13:18:50 +0200517const SOCKSPort uint16 = 1080
Serge Bazanski66e58952021-10-05 17:06:56 +0200518
Serge Bazanskibe742842022-04-04 13:18:50 +0200519// ClusterPorts contains all ports handled by Nanoswitch.
520var ClusterPorts = []uint16{
521 // Forwarded to the first node.
522 uint16(node.CuratorServicePort),
523 uint16(node.DebugServicePort),
524 uint16(node.KubernetesAPIPort),
525 uint16(node.KubernetesAPIWrappedPort),
526
527 // SOCKS proxy to the switch network
528 SOCKSPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200529}
530
531// ClusterOptions contains all options for launching a Metropolis cluster.
532type ClusterOptions struct {
533 // The number of nodes this cluster should be started with.
534 NumNodes int
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100535
536 // If true, node logs will be saved to individual files instead of being printed
537 // out to stderr. The path of these files will be still printed to stdout.
538 //
539 // The files will be located within the launch directory inside TEST_TMPDIR (or
540 // the default tempdir location, if not set).
541 NodeLogsToFiles bool
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200542
543 // LeaveNodesNew, if set, will leave all non-bootstrap nodes in NEW, without
544 // bootstrapping them. The nodes' address information in Cluster.Nodes will be
545 // incomplete.
546 LeaveNodesNew bool
Lorenz Brun150f24a2023-07-13 20:11:06 +0200547
548 // Optional local registry which will be made available to the cluster to
549 // pull images from. This is a more efficient alternative to preseeding all
550 // images used for testing.
551 LocalRegistry *localregistry.Server
Serge Bazanskie564f172024-04-03 12:06:06 +0200552
553 // InitialClusterConfiguration will be passed to the first node when creating the
554 // cluster, and defines some basic properties of the cluster. If not specified,
555 // the cluster will default to defaults as defined in
556 // metropolis.proto.api.NodeParameters.
557 InitialClusterConfiguration *cpb.ClusterConfiguration
Serge Bazanski66e58952021-10-05 17:06:56 +0200558}
559
560// Cluster is the running Metropolis cluster launched using the LaunchCluster
561// function.
562type Cluster struct {
Serge Bazanski66e58952021-10-05 17:06:56 +0200563 // Owner is the TLS Certificate of the owner of the test cluster. This can be
564 // used to authenticate further clients to the running cluster.
565 Owner tls.Certificate
566 // Ports is the PortMap used to access the first nodes' services (defined in
Serge Bazanskibe742842022-04-04 13:18:50 +0200567 // ClusterPorts) and the SOCKS proxy (at SOCKSPort).
Serge Bazanski66e58952021-10-05 17:06:56 +0200568 Ports launch.PortMap
569
Serge Bazanskibe742842022-04-04 13:18:50 +0200570 // Nodes is a map from Node ID to its runtime information.
571 Nodes map[string]*NodeInCluster
572 // NodeIDs is a list of node IDs that are backing this cluster, in order of
573 // creation.
574 NodeIDs []string
575
Serge Bazanski54e212a2023-06-14 13:45:11 +0200576 // CACertificate is the cluster's CA certificate.
577 CACertificate *x509.Certificate
578
Serge Bazanski66e58952021-10-05 17:06:56 +0200579 // nodesDone is a list of channels populated with the return codes from all the
580 // nodes' qemu instances. It's used by Close to ensure all nodes have
Leopold20a036e2023-01-15 00:17:19 +0100581 // successfully been stopped.
Serge Bazanski66e58952021-10-05 17:06:56 +0200582 nodesDone []chan error
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200583 // nodeOpts are the cluster member nodes' mutable launch options, kept here
584 // to facilitate reboots.
585 nodeOpts []NodeOptions
586 // launchDir points at the directory keeping the nodes' state, such as storage
587 // images, firmware variable files, TPM state.
588 launchDir string
589 // socketDir points at the directory keeping UNIX socket files, such as these
590 // used to facilitate communication between QEMU and swtpm. It's different
591 // from launchDir, and anchored nearer the file system root, due to the
592 // socket path length limitation imposed by the kernel.
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100593 socketDir string
594 metroctlDir string
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200595
Lorenz Brun276a7462023-07-12 21:28:54 +0200596 // SOCKSDialer is used by DialNode to establish connections to nodes via the
Serge Bazanskibe742842022-04-04 13:18:50 +0200597 // SOCKS server ran by nanoswitch.
Lorenz Brun276a7462023-07-12 21:28:54 +0200598 SOCKSDialer proxy.Dialer
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200599
600 // authClient is a cached authenticated owner connection to a Curator
601 // instance within the cluster.
602 authClient *grpc.ClientConn
603
604 // ctxT is the context individual node contexts are created from.
605 ctxT context.Context
606 // ctxC is used by Close to cancel the context under which the nodes are
607 // running.
608 ctxC context.CancelFunc
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200609
610 tpmFactory *TPMFactory
Serge Bazanskibe742842022-04-04 13:18:50 +0200611}
612
// NodeInCluster describes a node that is part of a Cluster.
type NodeInCluster struct {
	// ID identifies the node; it can be used to dial the node's services via
	// DialNode.
	ID string
	// Pubkey is presumably the node's public key, from which identity.NodeID
	// derives ID. TODO: confirm where it is populated.
	Pubkey []byte
	// ManagementAddress is the node's address on the network run by nanoswitch.
	// It is not reachable from the host except by dialing through DialNode or
	// the nanoswitch SOCKS proxy (reachable on Cluster.Ports[SOCKSPort]).
	ManagementAddress string
}
623
624// firstConnection performs the initial owner credential escrow with a newly
625// started nanoswitch-backed cluster over SOCKS. It expects the first node to be
626// running at 10.1.0.2, which is always the case with the current nanoswitch
627// implementation.
628//
Leopold20a036e2023-01-15 00:17:19 +0100629// It returns the newly escrowed credentials as well as the first node's
Serge Bazanskibe742842022-04-04 13:18:50 +0200630// information as NodeInCluster.
631func firstConnection(ctx context.Context, socksDialer proxy.Dialer) (*tls.Certificate, *NodeInCluster, error) {
632 // Dial external service.
633 remote := fmt.Sprintf("10.1.0.2:%s", node.CuratorServicePort.PortString())
Serge Bazanski0c280152024-02-05 14:33:19 +0100634 initCreds, err := rpc.NewEphemeralCredentials(InsecurePrivateKey, rpc.WantInsecure())
Serge Bazanskibe742842022-04-04 13:18:50 +0200635 if err != nil {
636 return nil, nil, fmt.Errorf("NewEphemeralCredentials: %w", err)
637 }
638 initDialer := func(_ context.Context, addr string) (net.Conn, error) {
639 return socksDialer.Dial("tcp", addr)
640 }
641 initClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(initCreds))
642 if err != nil {
643 return nil, nil, fmt.Errorf("dialing with ephemeral credentials failed: %w", err)
644 }
645 defer initClient.Close()
646
647 // Retrieve owner certificate - this can take a while because the node is still
648 // coming up, so do it in a backoff loop.
Serge Bazanski05f813b2023-03-16 17:58:39 +0100649 launch.Log("Cluster: retrieving owner certificate (this can take a few seconds while the first node boots)...")
Serge Bazanskibe742842022-04-04 13:18:50 +0200650 aaa := apb.NewAAAClient(initClient)
651 var cert *tls.Certificate
652 err = backoff.Retry(func() error {
653 cert, err = rpc.RetrieveOwnerCertificate(ctx, aaa, InsecurePrivateKey)
654 if st, ok := status.FromError(err); ok {
655 if st.Code() == codes.Unavailable {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100656 launch.Log("Cluster: cluster UNAVAILABLE: %v", st.Message())
Serge Bazanskibe742842022-04-04 13:18:50 +0200657 return err
658 }
659 }
660 return backoff.Permanent(err)
661 }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
662 if err != nil {
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200663 return nil, nil, fmt.Errorf("couldn't retrieve owner certificate: %w", err)
Serge Bazanskibe742842022-04-04 13:18:50 +0200664 }
Serge Bazanski05f813b2023-03-16 17:58:39 +0100665 launch.Log("Cluster: retrieved owner certificate.")
Serge Bazanskibe742842022-04-04 13:18:50 +0200666
667 // Now connect authenticated and get the node ID.
Serge Bazanski8535cb52023-03-29 14:15:08 +0200668 creds := rpc.NewAuthenticatedCredentials(*cert, rpc.WantInsecure())
Serge Bazanskibe742842022-04-04 13:18:50 +0200669 authClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(creds))
670 if err != nil {
671 return nil, nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
672 }
673 defer authClient.Close()
674 mgmt := apb.NewManagementClient(authClient)
675
676 var node *NodeInCluster
677 err = backoff.Retry(func() error {
678 nodes, err := getNodes(ctx, mgmt)
679 if err != nil {
680 return fmt.Errorf("retrieving nodes failed: %w", err)
681 }
682 if len(nodes) != 1 {
683 return fmt.Errorf("expected one node, got %d", len(nodes))
684 }
685 n := nodes[0]
686 if n.Status == nil || n.Status.ExternalAddress == "" {
687 return fmt.Errorf("node has no status and/or address")
688 }
689 node = &NodeInCluster{
690 ID: identity.NodeID(n.Pubkey),
691 ManagementAddress: n.Status.ExternalAddress,
692 }
693 return nil
694 }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
695 if err != nil {
696 return nil, nil, err
697 }
698
699 return cert, node, nil
Serge Bazanski66e58952021-10-05 17:06:56 +0200700}
701
// NewSerialFileLogger creates the file at p (or truncates it if it already
// exists) and returns it for use as a node's serial port sink. The file is
// opened write-only with mode 0600, so reads from the returned value fail.
//
// Truncation matters: without it, writing into a pre-existing, longer file
// would leave stale trailing bytes from its previous contents.
func NewSerialFileLogger(p string) (io.ReadWriter, error) {
	f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o600)
	if err != nil {
		return nil, err
	}
	return f, nil
}
709
Serge Bazanski66e58952021-10-05 17:06:56 +0200710// LaunchCluster launches a cluster of Metropolis node VMs together with a
711// Nanoswitch instance to network them all together.
712//
713// The given context will be used to run all qemu instances in the cluster, and
714// canceling the context or calling Close() will terminate them.
715func LaunchCluster(ctx context.Context, opts ClusterOptions) (*Cluster, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200716 if opts.NumNodes <= 0 {
Serge Bazanski66e58952021-10-05 17:06:56 +0200717 return nil, errors.New("refusing to start cluster with zero nodes")
718 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200719
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200720 // Create the launch directory.
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100721 ld, err := os.MkdirTemp(os.Getenv("TEST_TMPDIR"), "cluster-*")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200722 if err != nil {
723 return nil, fmt.Errorf("failed to create the launch directory: %w", err)
724 }
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100725 // Create the metroctl config directory. We keep it in /tmp because in some
726 // scenarios it's end-user visible and we want it short.
727 md, err := os.MkdirTemp("/tmp", "metroctl-*")
728 if err != nil {
729 return nil, fmt.Errorf("failed to create the metroctl directory: %w", err)
730 }
731
732 // Create the socket directory. We keep it in /tmp because of socket path limits.
733 sd, err := os.MkdirTemp("/tmp", "cluster-*")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200734 if err != nil {
735 return nil, fmt.Errorf("failed to create the socket directory: %w", err)
736 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200737
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200738 // Set up TPM factory.
739 tpmf, err := NewTPMFactory(filepath.Join(ld, "tpm"))
740 if err != nil {
741 return nil, fmt.Errorf("failed to create TPM factory: %w", err)
742 }
743
Serge Bazanski66e58952021-10-05 17:06:56 +0200744 // Prepare links between nodes and nanoswitch.
745 var switchPorts []*os.File
746 var vmPorts []*os.File
747 for i := 0; i < opts.NumNodes; i++ {
748 switchPort, vmPort, err := launch.NewSocketPair()
749 if err != nil {
Serge Bazanski66e58952021-10-05 17:06:56 +0200750 return nil, fmt.Errorf("failed to get socketpair: %w", err)
751 }
752 switchPorts = append(switchPorts, switchPort)
753 vmPorts = append(vmPorts, vmPort)
754 }
755
Serge Bazanskie78a0892021-10-07 17:03:49 +0200756 // Make a list of channels that will be populated by all running node qemu
757 // processes.
Serge Bazanski66e58952021-10-05 17:06:56 +0200758 done := make([]chan error, opts.NumNodes)
Lorenz Brun150f24a2023-07-13 20:11:06 +0200759 for i := range done {
Serge Bazanski66e58952021-10-05 17:06:56 +0200760 done[i] = make(chan error, 1)
761 }
762
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200763 // Prepare the node options. These will be kept as part of Cluster.
764 // nodeOpts[].Runtime will be initialized by LaunchNode during the first
765 // launch. The runtime information can be later used to restart a node.
766 // The 0th node will be initialized first. The rest will follow after it
767 // had bootstrapped the cluster.
768 nodeOpts := make([]NodeOptions, opts.NumNodes)
769 nodeOpts[0] = NodeOptions{
Leopoldaf5086b2023-01-15 14:12:42 +0100770 Name: "node0",
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200771 ConnectToSocket: vmPorts[0],
772 NodeParameters: &apb.NodeParameters{
773 Cluster: &apb.NodeParameters_ClusterBootstrap_{
774 ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
Serge Bazanskie564f172024-04-03 12:06:06 +0200775 OwnerPublicKey: InsecurePublicKey,
776 InitialClusterConfiguration: opts.InitialClusterConfiguration,
Serge Bazanski11198c82024-05-22 14:11:01 +0200777 Labels: &cpb.NodeLabels{
778 Pairs: []*cpb.NodeLabels_Pair{
779 {Key: "test-node-id", Value: "0"},
780 },
781 },
Serge Bazanski66e58952021-10-05 17:06:56 +0200782 },
783 },
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200784 },
785 SerialPort: newPrefixedStdio(0),
Leopoldacfad5b2023-01-15 14:05:25 +0100786 PcapDump: true,
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200787 }
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100788 if opts.NodeLogsToFiles {
789 path := path.Join(ld, "node-1.txt")
790 port, err := NewSerialFileLogger(path)
791 if err != nil {
792 return nil, fmt.Errorf("could not open log file for node 1: %w", err)
793 }
794 launch.Log("Node 1 logs at %s", path)
795 nodeOpts[0].SerialPort = port
796 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200797
798 // Start the first node.
799 ctxT, ctxC := context.WithCancel(ctx)
Serge Bazanski05f813b2023-03-16 17:58:39 +0100800 launch.Log("Cluster: Starting node %d...", 1)
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200801 if err := LaunchNode(ctxT, ld, sd, tpmf, &nodeOpts[0], done[0]); err != nil {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200802 ctxC()
803 return nil, fmt.Errorf("failed to launch first node: %w", err)
804 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200805
Lorenz Brun150f24a2023-07-13 20:11:06 +0200806 localRegistryAddr := net.TCPAddr{
807 IP: net.IPv4(10, 42, 0, 82),
808 Port: 5000,
809 }
810
811 var guestSvcMap launch.GuestServiceMap
812 if opts.LocalRegistry != nil {
813 l, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1)})
814 if err != nil {
815 ctxC()
816 return nil, fmt.Errorf("failed to create TCP listener for local registry: %w", err)
817 }
818 s := http.Server{
819 Handler: opts.LocalRegistry,
820 }
821 go s.Serve(l)
822 go func() {
823 <-ctxT.Done()
824 s.Close()
825 }()
826 guestSvcMap = launch.GuestServiceMap{
827 &localRegistryAddr: *l.Addr().(*net.TCPAddr),
828 }
829 }
830
Serge Bazanskie78a0892021-10-07 17:03:49 +0200831 // Launch nanoswitch.
Serge Bazanski66e58952021-10-05 17:06:56 +0200832 portMap, err := launch.ConflictFreePortMap(ClusterPorts)
833 if err != nil {
834 ctxC()
835 return nil, fmt.Errorf("failed to allocate ephemeral ports: %w", err)
836 }
837
838 go func() {
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100839 var serialPort io.ReadWriter
840 if opts.NodeLogsToFiles {
841 path := path.Join(ld, "nanoswitch.txt")
842 serialPort, err = NewSerialFileLogger(path)
843 if err != nil {
844 launch.Log("Could not open log file for nanoswitch: %v", err)
845 }
846 launch.Log("Nanoswitch logs at %s", path)
847 } else {
848 serialPort = newPrefixedStdio(99)
849 }
Serge Bazanskie84726b2024-04-17 16:32:32 +0200850 kernelPath, err := runfiles.Rlocation("_main/metropolis/test/ktest/vmlinux")
851 if err != nil {
852 launch.Fatal("Failed to resolved nanoswitch kernel: %v", err)
853 }
854 initramfsPath, err := runfiles.Rlocation("_main/metropolis/test/nanoswitch/initramfs.cpio.zst")
855 if err != nil {
856 launch.Fatal("Failed to resolved nanoswitch initramfs: %v", err)
857 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200858 if err := launch.RunMicroVM(ctxT, &launch.MicroVMOptions{
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100859 Name: "nanoswitch",
Serge Bazanskie84726b2024-04-17 16:32:32 +0200860 KernelPath: kernelPath,
861 InitramfsPath: initramfsPath,
Serge Bazanski66e58952021-10-05 17:06:56 +0200862 ExtraNetworkInterfaces: switchPorts,
863 PortMap: portMap,
Lorenz Brun150f24a2023-07-13 20:11:06 +0200864 GuestServiceMap: guestSvcMap,
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100865 SerialPort: serialPort,
Leopoldacfad5b2023-01-15 14:05:25 +0100866 PcapDump: path.Join(ld, "nanoswitch.pcap"),
Serge Bazanski66e58952021-10-05 17:06:56 +0200867 }); err != nil {
868 if !errors.Is(err, ctxT.Err()) {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100869 launch.Fatal("Failed to launch nanoswitch: %v", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200870 }
871 }
872 }()
873
Serge Bazanskibe742842022-04-04 13:18:50 +0200874 // Build SOCKS dialer.
875 socksRemote := fmt.Sprintf("localhost:%v", portMap[SOCKSPort])
876 socksDialer, err := proxy.SOCKS5("tcp", socksRemote, nil, proxy.Direct)
Serge Bazanski66e58952021-10-05 17:06:56 +0200877 if err != nil {
878 ctxC()
Serge Bazanskibe742842022-04-04 13:18:50 +0200879 return nil, fmt.Errorf("failed to build SOCKS dialer: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200880 }
881
Serge Bazanskibe742842022-04-04 13:18:50 +0200882 // Retrieve owner credentials and first node.
883 cert, firstNode, err := firstConnection(ctxT, socksDialer)
Serge Bazanski66e58952021-10-05 17:06:56 +0200884 if err != nil {
885 ctxC()
886 return nil, err
887 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200888
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100889 // Write credentials to the metroctl directory.
890 if err := metroctl.WriteOwnerKey(md, cert.PrivateKey.(ed25519.PrivateKey)); err != nil {
891 ctxC()
892 return nil, fmt.Errorf("could not write owner key: %w", err)
893 }
894 if err := metroctl.WriteOwnerCertificate(md, cert.Certificate[0]); err != nil {
895 ctxC()
896 return nil, fmt.Errorf("could not write owner certificate: %w", err)
897 }
898
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200899 // Set up a partially initialized cluster instance, to be filled in in the
900 // later steps.
Serge Bazanskibe742842022-04-04 13:18:50 +0200901 cluster := &Cluster{
902 Owner: *cert,
903 Ports: portMap,
904 Nodes: map[string]*NodeInCluster{
905 firstNode.ID: firstNode,
906 },
907 NodeIDs: []string{
908 firstNode.ID,
909 },
910
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100911 nodesDone: done,
912 nodeOpts: nodeOpts,
913 launchDir: ld,
914 socketDir: sd,
915 metroctlDir: md,
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200916
Lorenz Brun276a7462023-07-12 21:28:54 +0200917 SOCKSDialer: socksDialer,
Serge Bazanskibe742842022-04-04 13:18:50 +0200918
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200919 ctxT: ctxT,
Serge Bazanskibe742842022-04-04 13:18:50 +0200920 ctxC: ctxC,
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200921
922 tpmFactory: tpmf,
Serge Bazanskibe742842022-04-04 13:18:50 +0200923 }
924
925 // Now start the rest of the nodes and register them into the cluster.
926
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200927 // Get an authenticated owner client within the cluster.
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200928 curC, err := cluster.CuratorClient()
Serge Bazanski66e58952021-10-05 17:06:56 +0200929 if err != nil {
930 ctxC()
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200931 return nil, fmt.Errorf("CuratorClient: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200932 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200933 mgmt := apb.NewManagementClient(curC)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200934
935 // Retrieve register ticket to register further nodes.
Serge Bazanski05f813b2023-03-16 17:58:39 +0100936 launch.Log("Cluster: retrieving register ticket...")
Serge Bazanskie78a0892021-10-07 17:03:49 +0200937 resT, err := mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{})
938 if err != nil {
939 ctxC()
940 return nil, fmt.Errorf("GetRegisterTicket: %w", err)
941 }
942 ticket := resT.Ticket
Serge Bazanski05f813b2023-03-16 17:58:39 +0100943 launch.Log("Cluster: retrieved register ticket (%d bytes).", len(ticket))
Serge Bazanskie78a0892021-10-07 17:03:49 +0200944
945 // Retrieve cluster info (for directory and ca public key) to register further
946 // nodes.
947 resI, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
948 if err != nil {
949 ctxC()
950 return nil, fmt.Errorf("GetClusterInfo: %w", err)
951 }
Serge Bazanski54e212a2023-06-14 13:45:11 +0200952 caCert, err := x509.ParseCertificate(resI.CaCertificate)
953 if err != nil {
954 ctxC()
955 return nil, fmt.Errorf("ParseCertificate: %w", err)
956 }
957 cluster.CACertificate = caCert
Serge Bazanskie78a0892021-10-07 17:03:49 +0200958
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200959 // Use the retrieved information to configure the rest of the node options.
960 for i := 1; i < opts.NumNodes; i++ {
961 nodeOpts[i] = NodeOptions{
Leopoldaf5086b2023-01-15 14:12:42 +0100962 Name: fmt.Sprintf("node%d", i),
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200963 ConnectToSocket: vmPorts[i],
964 NodeParameters: &apb.NodeParameters{
965 Cluster: &apb.NodeParameters_ClusterRegister_{
966 ClusterRegister: &apb.NodeParameters_ClusterRegister{
967 RegisterTicket: ticket,
968 ClusterDirectory: resI.ClusterDirectory,
969 CaCertificate: resI.CaCertificate,
Serge Bazanski30e30b32024-05-22 14:11:56 +0200970 Labels: &cpb.NodeLabels{
971 Pairs: []*cpb.NodeLabels_Pair{
972 {Key: "test-node-id", Value: fmt.Sprintf("%d", i)},
973 },
974 },
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200975 },
976 },
977 },
978 SerialPort: newPrefixedStdio(i),
979 }
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100980 if opts.NodeLogsToFiles {
981 path := path.Join(ld, fmt.Sprintf("node-%d.txt", i+1))
982 port, err := NewSerialFileLogger(path)
983 if err != nil {
984 return nil, fmt.Errorf("could not open log file for node %d: %w", i+1, err)
985 }
986 launch.Log("Node %d logs at %s", i+1, path)
987 nodeOpts[i].SerialPort = port
988 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200989 }
990
991 // Now run the rest of the nodes.
Serge Bazanskie78a0892021-10-07 17:03:49 +0200992 for i := 1; i < opts.NumNodes; i++ {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100993 launch.Log("Cluster: Starting node %d...", i+1)
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200994 err := LaunchNode(ctxT, ld, sd, tpmf, &nodeOpts[i], done[i])
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200995 if err != nil {
996 return nil, fmt.Errorf("failed to launch node %d: %w", i+1, err)
997 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200998 }
Serge Bazanskie78a0892021-10-07 17:03:49 +0200999
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001000 seenNodes := make(map[string]bool)
1001 launch.Log("Cluster: waiting for nodes to appear as NEW...")
1002 for i := 1; i < opts.NumNodes; i++ {
Serge Bazanskie78a0892021-10-07 17:03:49 +02001003 for {
1004 nodes, err := getNodes(ctx, mgmt)
1005 if err != nil {
1006 ctxC()
1007 return nil, fmt.Errorf("could not get nodes: %w", err)
1008 }
1009 for _, n := range nodes {
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001010 if n.State != cpb.NodeState_NODE_STATE_NEW {
1011 continue
Serge Bazanskie78a0892021-10-07 17:03:49 +02001012 }
Serge Bazanski87d9c592024-03-20 12:35:11 +01001013 if seenNodes[n.Id] {
1014 continue
1015 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001016 seenNodes[n.Id] = true
1017 cluster.Nodes[n.Id] = &NodeInCluster{
1018 ID: n.Id,
1019 Pubkey: n.Pubkey,
1020 }
1021 cluster.NodeIDs = append(cluster.NodeIDs, n.Id)
Serge Bazanskie78a0892021-10-07 17:03:49 +02001022 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001023
1024 if len(seenNodes) == opts.NumNodes-1 {
Serge Bazanskie78a0892021-10-07 17:03:49 +02001025 break
1026 }
1027 time.Sleep(1 * time.Second)
1028 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001029 }
1030 launch.Log("Found all expected nodes")
Serge Bazanskie78a0892021-10-07 17:03:49 +02001031
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001032 approvedNodes := make(map[string]bool)
1033 upNodes := make(map[string]bool)
1034 if !opts.LeaveNodesNew {
Serge Bazanskie78a0892021-10-07 17:03:49 +02001035 for {
1036 nodes, err := getNodes(ctx, mgmt)
1037 if err != nil {
1038 ctxC()
1039 return nil, fmt.Errorf("could not get nodes: %w", err)
1040 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001041 for _, node := range nodes {
1042 if !seenNodes[node.Id] {
1043 // Skip nodes that weren't NEW in the previous step.
Serge Bazanskie78a0892021-10-07 17:03:49 +02001044 continue
1045 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001046
1047 if node.State == cpb.NodeState_NODE_STATE_UP && node.Status != nil && node.Status.ExternalAddress != "" {
1048 launch.Log("Cluster: node %s is up", node.Id)
1049 upNodes[node.Id] = true
1050 cluster.Nodes[node.Id].ManagementAddress = node.Status.ExternalAddress
Serge Bazanskie78a0892021-10-07 17:03:49 +02001051 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001052 if upNodes[node.Id] {
1053 continue
Serge Bazanskibe742842022-04-04 13:18:50 +02001054 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001055
1056 if !approvedNodes[node.Id] {
1057 launch.Log("Cluster: approving node %s", node.Id)
1058 _, err := mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
1059 Pubkey: node.Pubkey,
1060 })
1061 if err != nil {
1062 ctxC()
1063 return nil, fmt.Errorf("ApproveNode(%s): %w", node.Id, err)
1064 }
1065 approvedNodes[node.Id] = true
Serge Bazanskibe742842022-04-04 13:18:50 +02001066 }
Serge Bazanskie78a0892021-10-07 17:03:49 +02001067 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001068
1069 launch.Log("Cluster: want %d up nodes, have %d", opts.NumNodes-1, len(upNodes))
1070 if len(upNodes) == opts.NumNodes-1 {
Serge Bazanskie78a0892021-10-07 17:03:49 +02001071 break
1072 }
Serge Bazanskibe742842022-04-04 13:18:50 +02001073 time.Sleep(time.Second)
Serge Bazanskie78a0892021-10-07 17:03:49 +02001074 }
Serge Bazanskie78a0892021-10-07 17:03:49 +02001075 }
Serge Bazanski66e58952021-10-05 17:06:56 +02001076
Serge Bazanski05f813b2023-03-16 17:58:39 +01001077 launch.Log("Cluster: all nodes up:")
Serge Bazanskibe742842022-04-04 13:18:50 +02001078 for _, node := range cluster.Nodes {
Serge Bazanski05f813b2023-03-16 17:58:39 +01001079 launch.Log("Cluster: - %s at %s", node.ID, node.ManagementAddress)
Serge Bazanskibe742842022-04-04 13:18:50 +02001080 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001081 launch.Log("Cluster: starting tests...")
Serge Bazanski66e58952021-10-05 17:06:56 +02001082
Serge Bazanskibe742842022-04-04 13:18:50 +02001083 return cluster, nil
Serge Bazanski66e58952021-10-05 17:06:56 +02001084}
1085
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001086// RebootNode reboots the cluster member node matching the given index, and
1087// waits for it to rejoin the cluster. It will use the given context ctx to run
1088// cluster API requests, whereas the resulting QEMU process will be created
1089// using the cluster's context c.ctxT. The nodes are indexed starting at 0.
1090func (c *Cluster) RebootNode(ctx context.Context, idx int) error {
1091 if idx < 0 || idx >= len(c.NodeIDs) {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +02001092 return fmt.Errorf("index out of bounds")
1093 }
1094 if c.nodeOpts[idx].Runtime == nil {
1095 return fmt.Errorf("node not running")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001096 }
1097 id := c.NodeIDs[idx]
1098
1099 // Get an authenticated owner client within the cluster.
Serge Bazanski5bb8a332022-06-23 17:41:33 +02001100 curC, err := c.CuratorClient()
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001101 if err != nil {
1102 return err
1103 }
1104 mgmt := apb.NewManagementClient(curC)
1105
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001106 // Cancel the node's context. This will shut down QEMU.
1107 c.nodeOpts[idx].Runtime.CtxC()
Serge Bazanski05f813b2023-03-16 17:58:39 +01001108 launch.Log("Cluster: waiting for node %d (%s) to stop.", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001109 err = <-c.nodesDone[idx]
1110 if err != nil {
1111 return fmt.Errorf("while restarting node: %w", err)
1112 }
1113
1114 // Start QEMU again.
Serge Bazanski05f813b2023-03-16 17:58:39 +01001115 launch.Log("Cluster: restarting node %d (%s).", idx, id)
Serge Bazanski2b6dc312024-06-04 17:44:55 +02001116 if err := LaunchNode(c.ctxT, c.launchDir, c.socketDir, c.tpmFactory, &c.nodeOpts[idx], c.nodesDone[idx]); err != nil {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +02001117 return fmt.Errorf("failed to launch node %d: %w", idx, err)
1118 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001119
Serge Bazanskibc969572024-03-21 11:56:13 +01001120 start := time.Now()
1121
1122 // Poll Management.GetNodes until the node is healthy.
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001123 for {
1124 cs, err := getNode(ctx, mgmt, id)
1125 if err != nil {
Serge Bazanski05f813b2023-03-16 17:58:39 +01001126 launch.Log("Cluster: node get error: %v", err)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001127 return err
1128 }
Serge Bazanskibc969572024-03-21 11:56:13 +01001129 launch.Log("Cluster: node health: %+v", cs.Health)
1130
1131 lhb := time.Now().Add(-cs.TimeSinceHeartbeat.AsDuration())
1132 if lhb.After(start) && cs.Health == apb.Node_HEALTHY {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001133 break
1134 }
1135 time.Sleep(time.Second)
1136 }
Serge Bazanski05f813b2023-03-16 17:58:39 +01001137 launch.Log("Cluster: node %d (%s) has rejoined the cluster.", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001138 return nil
1139}
1140
Serge Bazanski500f6e02024-04-03 12:06:40 +02001141// ShutdownNode performs an ungraceful shutdown (i.e. power off) of the node
1142// given by idx. If the node is already shut down, this is a no-op.
1143func (c *Cluster) ShutdownNode(idx int) error {
1144 if idx < 0 || idx >= len(c.NodeIDs) {
1145 return fmt.Errorf("index out of bounds")
1146 }
1147 // Return if node is already stopped.
1148 select {
1149 case <-c.nodeOpts[idx].Runtime.ctxT.Done():
1150 return nil
1151 default:
1152 }
1153 id := c.NodeIDs[idx]
1154
1155 // Cancel the node's context. This will shut down QEMU.
1156 c.nodeOpts[idx].Runtime.CtxC()
1157 launch.Log("Cluster: waiting for node %d (%s) to stop.", idx, id)
1158 err := <-c.nodesDone[idx]
1159 if err != nil {
1160 return fmt.Errorf("while shutting down node: %w", err)
1161 }
Serge Bazanski2b6dc312024-06-04 17:44:55 +02001162 launch.Log("Cluster: node %d (%s) stopped.", idx, id)
Serge Bazanski500f6e02024-04-03 12:06:40 +02001163 return nil
1164}
1165
1166// StartNode performs a power on of the node given by idx. If the node is already
1167// running, this is a no-op.
1168func (c *Cluster) StartNode(idx int) error {
1169 if idx < 0 || idx >= len(c.NodeIDs) {
1170 return fmt.Errorf("index out of bounds")
1171 }
1172 id := c.NodeIDs[idx]
1173 // Return if node is already running.
1174 select {
1175 case <-c.nodeOpts[idx].Runtime.ctxT.Done():
1176 default:
1177 return nil
1178 }
1179
1180 // Start QEMU again.
1181 launch.Log("Cluster: starting node %d (%s).", idx, id)
Serge Bazanski2b6dc312024-06-04 17:44:55 +02001182 if err := LaunchNode(c.ctxT, c.launchDir, c.socketDir, c.tpmFactory, &c.nodeOpts[idx], c.nodesDone[idx]); err != nil {
Serge Bazanski500f6e02024-04-03 12:06:40 +02001183 return fmt.Errorf("failed to launch node %d: %w", idx, err)
1184 }
Serge Bazanski2b6dc312024-06-04 17:44:55 +02001185 launch.Log("Cluster: node %d (%s) started.", idx, id)
Serge Bazanski500f6e02024-04-03 12:06:40 +02001186 return nil
1187}
1188
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001189// Close cancels the running clusters' context and waits for all virtualized
Serge Bazanski66e58952021-10-05 17:06:56 +02001190// nodes to stop. It returns an error if stopping the nodes failed, or one of
1191// the nodes failed to fully start in the first place.
1192func (c *Cluster) Close() error {
Serge Bazanski05f813b2023-03-16 17:58:39 +01001193 launch.Log("Cluster: stopping...")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001194 if c.authClient != nil {
1195 c.authClient.Close()
1196 }
Serge Bazanski66e58952021-10-05 17:06:56 +02001197 c.ctxC()
1198
Leopold20a036e2023-01-15 00:17:19 +01001199 var errs []error
Serge Bazanski05f813b2023-03-16 17:58:39 +01001200 launch.Log("Cluster: waiting for nodes to exit...")
Serge Bazanski66e58952021-10-05 17:06:56 +02001201 for _, c := range c.nodesDone {
1202 err := <-c
1203 if err != nil {
Leopold20a036e2023-01-15 00:17:19 +01001204 errs = append(errs, err)
Serge Bazanski66e58952021-10-05 17:06:56 +02001205 }
1206 }
Serge Bazanskid09c58f2023-03-17 00:25:08 +01001207 launch.Log("Cluster: removing nodes' state files (%s) and sockets (%s).", c.launchDir, c.socketDir)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001208 os.RemoveAll(c.launchDir)
1209 os.RemoveAll(c.socketDir)
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001210 os.RemoveAll(c.metroctlDir)
Serge Bazanski05f813b2023-03-16 17:58:39 +01001211 launch.Log("Cluster: done")
Leopold20a036e2023-01-15 00:17:19 +01001212 return multierr.Combine(errs...)
Serge Bazanski66e58952021-10-05 17:06:56 +02001213}
Serge Bazanskibe742842022-04-04 13:18:50 +02001214
1215// DialNode is a grpc.WithContextDialer compatible dialer which dials nodes by
1216// their ID. This is performed by connecting to the cluster nanoswitch via its
1217// SOCKS proxy, and using the cluster node list for name resolution.
1218//
1219// For example:
1220//
Serge Bazanski05f813b2023-03-16 17:58:39 +01001221// grpc.Dial("metropolis-deadbeef:1234", grpc.WithContextDialer(c.DialNode))
Serge Bazanskibe742842022-04-04 13:18:50 +02001222func (c *Cluster) DialNode(_ context.Context, addr string) (net.Conn, error) {
1223 host, port, err := net.SplitHostPort(addr)
1224 if err != nil {
1225 return nil, fmt.Errorf("invalid host:port: %w", err)
1226 }
1227 // Already an IP address?
1228 if net.ParseIP(host) != nil {
Lorenz Brun276a7462023-07-12 21:28:54 +02001229 return c.SOCKSDialer.Dial("tcp", addr)
Serge Bazanskibe742842022-04-04 13:18:50 +02001230 }
1231
1232 // Otherwise, expect a node name.
1233 node, ok := c.Nodes[host]
1234 if !ok {
1235 return nil, fmt.Errorf("unknown node %q", host)
1236 }
1237 addr = net.JoinHostPort(node.ManagementAddress, port)
Lorenz Brun276a7462023-07-12 21:28:54 +02001238 return c.SOCKSDialer.Dial("tcp", addr)
Serge Bazanskibe742842022-04-04 13:18:50 +02001239}
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001240
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001241// GetKubeClientSet gets a Kubernetes client set accessing the Metropolis
1242// Kubernetes authenticating proxy using the cluster owner identity.
1243// It currently has access to everything (i.e. the cluster-admin role)
1244// via the owner-admin binding.
1245func (c *Cluster) GetKubeClientSet() (kubernetes.Interface, error) {
1246 pkcs8Key, err := x509.MarshalPKCS8PrivateKey(c.Owner.PrivateKey)
1247 if err != nil {
1248 // We explicitly pass an Ed25519 private key in, so this can't happen
1249 panic(err)
1250 }
1251
1252 host := net.JoinHostPort(c.NodeIDs[0], node.KubernetesAPIWrappedPort.PortString())
Lorenz Brun150f24a2023-07-13 20:11:06 +02001253 clientConfig := rest.Config{
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001254 Host: host,
1255 TLSClientConfig: rest.TLSClientConfig{
1256 // TODO(q3k): use CA certificate
1257 Insecure: true,
1258 ServerName: "kubernetes.default.svc",
1259 CertData: pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: c.Owner.Certificate[0]}),
1260 KeyData: pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: pkcs8Key}),
1261 },
1262 Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
1263 return c.DialNode(ctx, address)
1264 },
1265 }
1266 return kubernetes.NewForConfig(&clientConfig)
1267}
1268
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001269// KubernetesControllerNodeAddresses returns the list of IP addresses of nodes
1270// which are currently Kubernetes controllers, ie. run an apiserver. This list
1271// might be empty if no node is currently configured with the
1272// 'KubernetesController' node.
1273func (c *Cluster) KubernetesControllerNodeAddresses(ctx context.Context) ([]string, error) {
1274 curC, err := c.CuratorClient()
1275 if err != nil {
1276 return nil, err
1277 }
1278 mgmt := apb.NewManagementClient(curC)
1279 srv, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{
1280 Filter: "has(node.roles.kubernetes_controller)",
1281 })
1282 if err != nil {
1283 return nil, err
1284 }
1285 defer srv.CloseSend()
1286 var res []string
1287 for {
1288 n, err := srv.Recv()
1289 if err == io.EOF {
1290 break
1291 }
1292 if err != nil {
1293 return nil, err
1294 }
1295 if n.Status == nil || n.Status.ExternalAddress == "" {
1296 continue
1297 }
1298 res = append(res, n.Status.ExternalAddress)
1299 }
1300 return res, nil
1301}
Serge Bazanski630fb5c2023-04-06 10:50:24 +02001302
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001303// AllNodesHealthy returns nil if all the nodes in the cluster are seemingly
1304// healthy.
Serge Bazanski630fb5c2023-04-06 10:50:24 +02001305func (c *Cluster) AllNodesHealthy(ctx context.Context) error {
1306 // Get an authenticated owner client within the cluster.
1307 curC, err := c.CuratorClient()
1308 if err != nil {
1309 return err
1310 }
1311 mgmt := apb.NewManagementClient(curC)
1312 nodes, err := getNodes(ctx, mgmt)
1313 if err != nil {
1314 return err
1315 }
1316
1317 var unhealthy []string
1318 for _, node := range nodes {
1319 if node.Health == apb.Node_HEALTHY {
1320 continue
1321 }
1322 unhealthy = append(unhealthy, node.Id)
1323 }
1324 if len(unhealthy) == 0 {
1325 return nil
1326 }
1327 return fmt.Errorf("nodes unhealthy: %s", strings.Join(unhealthy, ", "))
1328}
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001329
1330// ApproveNode approves a node by ID, waiting for it to become UP.
1331func (c *Cluster) ApproveNode(ctx context.Context, id string) error {
1332 curC, err := c.CuratorClient()
1333 if err != nil {
1334 return err
1335 }
1336 mgmt := apb.NewManagementClient(curC)
1337
1338 _, err = mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
1339 Pubkey: c.Nodes[id].Pubkey,
1340 })
1341 if err != nil {
1342 return fmt.Errorf("ApproveNode: %w", err)
1343 }
1344 launch.Log("Cluster: %s: approved, waiting for UP", id)
1345 for {
1346 nodes, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
1347 if err != nil {
1348 return fmt.Errorf("GetNodes: %w", err)
1349 }
1350 found := false
1351 for {
1352 node, err := nodes.Recv()
1353 if errors.Is(err, io.EOF) {
1354 break
1355 }
1356 if err != nil {
1357 return fmt.Errorf("Nodes.Recv: %w", err)
1358 }
1359 if node.Id != id {
1360 continue
1361 }
1362 if node.State != cpb.NodeState_NODE_STATE_UP {
1363 continue
1364 }
1365 found = true
1366 break
1367 }
1368 nodes.CloseSend()
1369
1370 if found {
1371 break
1372 }
1373 time.Sleep(time.Second)
1374 }
1375 launch.Log("Cluster: %s: UP", id)
1376 return nil
1377}
1378
1379// MakeKubernetesWorker adds the KubernetesWorker role to a node by ID.
1380func (c *Cluster) MakeKubernetesWorker(ctx context.Context, id string) error {
1381 curC, err := c.CuratorClient()
1382 if err != nil {
1383 return err
1384 }
1385 mgmt := apb.NewManagementClient(curC)
1386
1387 tr := true
1388 launch.Log("Cluster: %s: adding KubernetesWorker", id)
1389 _, err = mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
1390 Node: &apb.UpdateNodeRolesRequest_Id{
1391 Id: id,
1392 },
1393 KubernetesWorker: &tr,
1394 })
1395 return err
1396}
Serge Bazanski37cfcc12024-03-21 11:59:07 +01001397
1398// MakeConsensusMember adds the ConsensusMember role to a node by ID.
1399func (c *Cluster) MakeConsensusMember(ctx context.Context, id string) error {
1400 curC, err := c.CuratorClient()
1401 if err != nil {
1402 return err
1403 }
1404 mgmt := apb.NewManagementClient(curC)
1405 cur := ipb.NewCuratorClient(curC)
1406
1407 tr := true
1408 launch.Log("Cluster: %s: adding ConsensusMember", id)
1409 bo := backoff.NewExponentialBackOff()
1410 bo.MaxElapsedTime = 10 * time.Second
1411
1412 backoff.Retry(func() error {
1413 _, err = mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
1414 Node: &apb.UpdateNodeRolesRequest_Id{
1415 Id: id,
1416 },
1417 ConsensusMember: &tr,
1418 })
1419 if err != nil {
1420 launch.Log("Cluster: %s: UpdateNodeRoles failed: %v", id, err)
1421 }
1422 return err
1423 }, backoff.WithContext(bo, ctx))
1424 if err != nil {
1425 return err
1426 }
1427
1428 launch.Log("Cluster: %s: waiting for learner/full members...", id)
1429
1430 learner := false
1431 for {
1432 res, err := cur.GetConsensusStatus(ctx, &ipb.GetConsensusStatusRequest{})
1433 if err != nil {
1434 return fmt.Errorf("GetConsensusStatus: %w", err)
1435 }
1436 for _, member := range res.EtcdMember {
1437 if member.Id != id {
1438 continue
1439 }
1440 switch member.Status {
1441 case ipb.GetConsensusStatusResponse_EtcdMember_STATUS_LEARNER:
1442 if !learner {
1443 learner = true
1444 launch.Log("Cluster: %s: became a learner, waiting for full member...", id)
1445 }
1446 case ipb.GetConsensusStatusResponse_EtcdMember_STATUS_FULL:
1447 launch.Log("Cluster: %s: became a full member", id)
1448 return nil
1449 }
1450 }
1451 time.Sleep(100 * time.Millisecond)
1452 }
1453}