blob: 98ab7601778cc19233e16553bff8f8bbcdf26b96 [file] [log] [blame]
// Package launch builds on the osbase/test/launch package and implements
// launching Metropolis nodes and clusters in a virtualized environment using
// qemu. It's kept in a separate package as it depends on a Metropolis node
// image, which might not be required for some uses of the osbase launch
// library.
Tim Windelschmidt9f21f532024-05-07 15:14:20 +02005package launch
Serge Bazanski66e58952021-10-05 17:06:56 +02006
7import (
8 "bytes"
9 "context"
Serge Bazanski1f8cad72023-03-20 16:58:10 +010010 "crypto/ed25519"
Serge Bazanski66e58952021-10-05 17:06:56 +020011 "crypto/rand"
12 "crypto/tls"
Serge Bazanski54e212a2023-06-14 13:45:11 +020013 "crypto/x509"
Serge Bazanskia0bc6d32023-06-28 18:57:40 +020014 "encoding/pem"
Serge Bazanski66e58952021-10-05 17:06:56 +020015 "errors"
16 "fmt"
17 "io"
Serge Bazanski66e58952021-10-05 17:06:56 +020018 "net"
Lorenz Brun150f24a2023-07-13 20:11:06 +020019 "net/http"
Serge Bazanski66e58952021-10-05 17:06:56 +020020 "os"
21 "os/exec"
Leopoldacfad5b2023-01-15 14:05:25 +010022 "path"
Serge Bazanski66e58952021-10-05 17:06:56 +020023 "path/filepath"
Serge Bazanski53458ba2024-06-18 09:56:46 +000024 "strconv"
Serge Bazanski630fb5c2023-04-06 10:50:24 +020025 "strings"
Serge Bazanski66e58952021-10-05 17:06:56 +020026 "syscall"
27 "time"
28
29 "github.com/cenkalti/backoff/v4"
Serge Bazanski66e58952021-10-05 17:06:56 +020030 "go.uber.org/multierr"
Serge Bazanskibe742842022-04-04 13:18:50 +020031 "golang.org/x/net/proxy"
Lorenz Brun87bbf7e2024-03-18 18:22:25 +010032 "golang.org/x/sys/unix"
Serge Bazanski66e58952021-10-05 17:06:56 +020033 "google.golang.org/grpc"
Serge Bazanski636032e2022-01-26 14:21:33 +010034 "google.golang.org/grpc/codes"
35 "google.golang.org/grpc/status"
Serge Bazanski66e58952021-10-05 17:06:56 +020036 "google.golang.org/protobuf/proto"
Serge Bazanskia0bc6d32023-06-28 18:57:40 +020037 "k8s.io/client-go/kubernetes"
38 "k8s.io/client-go/rest"
Serge Bazanski66e58952021-10-05 17:06:56 +020039
Serge Bazanski37cfcc12024-03-21 11:59:07 +010040 ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
Tim Windelschmidtbe25a3b2023-07-19 16:31:56 +020041 apb "source.monogon.dev/metropolis/proto/api"
42 cpb "source.monogon.dev/metropolis/proto/common"
43
Serge Bazanskidd5b03c2024-05-16 18:07:06 +020044 "source.monogon.dev/go/qcow2"
Serge Bazanski1f8cad72023-03-20 16:58:10 +010045 metroctl "source.monogon.dev/metropolis/cli/metroctl/core"
Serge Bazanski66e58952021-10-05 17:06:56 +020046 "source.monogon.dev/metropolis/node"
Serge Bazanskie78a0892021-10-07 17:03:49 +020047 "source.monogon.dev/metropolis/node/core/identity"
Serge Bazanski66e58952021-10-05 17:06:56 +020048 "source.monogon.dev/metropolis/node/core/rpc"
Serge Bazanski5bb8a332022-06-23 17:41:33 +020049 "source.monogon.dev/metropolis/node/core/rpc/resolver"
Tim Windelschmidt9f21f532024-05-07 15:14:20 +020050 "source.monogon.dev/metropolis/test/localregistry"
51 "source.monogon.dev/osbase/test/launch"
Serge Bazanski66e58952021-10-05 17:06:56 +020052)
53
// nodeNumberKey is the node label key under which each test node carries its
// numerical index within the test system.
const nodeNumberKey string = "test-node-number"
59
// NodeOptions contains all options that can be passed to LaunchNode().
type NodeOptions struct {
	// Name is a human-readable identifier to be used in debug output.
	Name string

	// Ports contains the port mapping where to expose the internal ports of the VM to
	// the host. See IdentityPortMap() and ConflictFreePortMap(). Ignored when
	// ConnectToSocket is set.
	Ports launch.PortMap

	// If set to true, reboots are honored. Otherwise, QEMU is started with
	// -no-reboot, so any reboot makes the process launched by LaunchNode() exit.
	// Metropolis nodes generally restart on almost all errors, so unless you
	// want to test reboot behavior this should be false.
	AllowReboot bool

	// By default, the VM is connected to the Host via SLIRP. If ConnectToSocket is
	// set, it is instead connected to the given file descriptor/socket. If this is
	// set, all port maps from the Ports option are ignored. Intended for networking
	// this instance together with others for running more complex network
	// configurations.
	ConnectToSocket *os.File

	// When PcapDump is set, all traffic is dumped to a pcap file in the
	// runtime directory (e.g. "net0.pcap" for the first interface).
	PcapDump bool

	// SerialPort is an io.ReadWriter connected to the serial port of the machine.
	// It can be set to an existing file descriptor (like os.Stdout/os.Stderr) or
	// any Go structure implementing this interface.
	// NOTE(review): LaunchNode currently only attaches this as QEMU's stdout, so
	// serial output is delivered here but nothing read from it is forwarded
	// to the VM.
	SerialPort io.ReadWriter

	// NodeParameters is passed into the VM and subsequently used for bootstrapping or
	// registering into a cluster. If nil, no parameters are passed to the VM.
	NodeParameters *apb.NodeParameters

	// Mac is the node's MAC address. If nil, LaunchNode generates a random
	// locally administered unicast address and stores it here.
	Mac *net.HardwareAddr

	// Runtime keeps the node's QEMU runtime state. If nil, LaunchNode creates it
	// on first launch; it is then reused by subsequent launches (e.g. reboots).
	Runtime *NodeRuntime
}
101
// NodeRuntime keeps the node's QEMU runtime state: the directories holding the
// node's persistent files and sockets, and the context under which its current
// QEMU instance runs. It is created by setupRuntime on the node's first launch
// and reused by later LaunchNode calls.
type NodeRuntime struct {
	// ld points at the node's launch directory storing data such as storage
	// images, firmware variables or the TPM state.
	ld string
	// sd points at the node's socket directory.
	sd string

	// ctxT is the context QEMU will execute in.
	ctxT context.Context
	// CtxC is the QEMU context's cancellation function. Calling it kills the
	// node's QEMU process and its TPM emulator, whose context is derived from
	// ctxT. LaunchNode also calls it before starting a new instance.
	CtxC context.CancelFunc
}
115
116// NodePorts is the list of ports a fully operational Metropolis node listens on
Serge Bazanski52304a82021-10-29 16:56:18 +0200117var NodePorts = []node.Port{
Serge Bazanski66e58952021-10-05 17:06:56 +0200118 node.ConsensusPort,
119
120 node.CuratorServicePort,
121 node.DebugServicePort,
122
123 node.KubernetesAPIPort,
Lorenz Bruncc078df2021-12-23 11:51:55 +0100124 node.KubernetesAPIWrappedPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200125 node.CuratorServicePort,
126 node.DebuggerPort,
Tim Windelschmidtbe25a3b2023-07-19 16:31:56 +0200127 node.MetricsPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200128}
129
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200130// setupRuntime creates the node's QEMU runtime directory, together with all
131// files required to preserve its state, a level below the chosen path ld. The
132// node's socket directory is similarily created a level below sd. It may
133// return an I/O error.
134func setupRuntime(ld, sd string) (*NodeRuntime, error) {
135 // Create a temporary directory to keep all the runtime files.
136 stdp, err := os.MkdirTemp(ld, "node_state*")
137 if err != nil {
138 return nil, fmt.Errorf("failed to create the state directory: %w", err)
139 }
140
141 // Initialize the node's storage with a prebuilt image.
Serge Bazanskidd5b03c2024-05-16 18:07:06 +0200142 di := filepath.Join(stdp, "image.qcow2")
Tim Windelschmidt82e6af72024-07-23 00:05:42 +0000143 launch.Log("Cluster: generating node QCOW2 snapshot image: %s -> %s", xNodeImagePath, di)
Serge Bazanskidd5b03c2024-05-16 18:07:06 +0200144
145 df, err := os.Create(di)
146 if err != nil {
147 return nil, fmt.Errorf("while opening image for writing: %w", err)
148 }
149 defer df.Close()
Tim Windelschmidt82e6af72024-07-23 00:05:42 +0000150 if err := qcow2.Generate(df, qcow2.GenerateWithBackingFile(xNodeImagePath)); err != nil {
Serge Bazanskidd5b03c2024-05-16 18:07:06 +0200151 return nil, fmt.Errorf("while creating copy-on-write node image: %w", err)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200152 }
153
154 // Initialize the OVMF firmware variables file.
Tim Windelschmidt82e6af72024-07-23 00:05:42 +0000155 dv := filepath.Join(stdp, filepath.Base(xOvmfVarsPath))
156 if err := copyFile(xOvmfVarsPath, dv); err != nil {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200157 return nil, fmt.Errorf("while copying firmware variables: %w", err)
158 }
159
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200160 // Create the socket directory.
161 sotdp, err := os.MkdirTemp(sd, "node_sock*")
162 if err != nil {
163 return nil, fmt.Errorf("failed to create the socket directory: %w", err)
164 }
165
166 return &NodeRuntime{
167 ld: stdp,
168 sd: sotdp,
169 }, nil
170}
171
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200172// CuratorClient returns an authenticated owner connection to a Curator
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200173// instance within Cluster c, or nil together with an error.
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200174func (c *Cluster) CuratorClient() (*grpc.ClientConn, error) {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200175 if c.authClient == nil {
Serge Bazanski8535cb52023-03-29 14:15:08 +0200176 authCreds := rpc.NewAuthenticatedCredentials(c.Owner, rpc.WantInsecure())
Serge Bazanski58ddc092022-06-30 18:23:33 +0200177 r := resolver.New(c.ctxT, resolver.WithLogger(func(f string, args ...interface{}) {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100178 launch.Log("Cluster: client resolver: %s", fmt.Sprintf(f, args...))
Serge Bazanski58ddc092022-06-30 18:23:33 +0200179 }))
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200180 for _, n := range c.NodeIDs {
181 ep, err := resolver.NodeWithDefaultPort(n)
182 if err != nil {
Tim Windelschmidtadcf5d72024-05-21 13:46:25 +0200183 return nil, fmt.Errorf("could not add node %q by DNS: %w", n, err)
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200184 }
185 r.AddEndpoint(ep)
186 }
187 authClient, err := grpc.Dial(resolver.MetropolisControlAddress,
188 grpc.WithTransportCredentials(authCreds),
189 grpc.WithResolvers(r),
190 grpc.WithContextDialer(c.DialNode),
191 )
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200192 if err != nil {
193 return nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
194 }
195 c.authClient = authClient
196 }
197 return c.authClient, nil
198}
199
Serge Bazanski66e58952021-10-05 17:06:56 +0200200// LaunchNode launches a single Metropolis node instance with the given options.
201// The instance runs mostly paravirtualized but with some emulated hardware
202// similar to how a cloud provider might set up its VMs. The disk is fully
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200203// writable, and the changes are kept across reboots and shutdowns. ld and sd
204// point to the launch directory and the socket directory, holding the nodes'
205// state files (storage, tpm state, firmware state), and UNIX socket files
206// (swtpm <-> QEMU interplay) respectively. The directories must exist before
207// LaunchNode is called. LaunchNode will update options.Runtime and options.Mac
208// if either are not initialized.
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200209func LaunchNode(ctx context.Context, ld, sd string, tpmFactory *TPMFactory, options *NodeOptions, doneC chan error) error {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200210 // TODO(mateusz@monogon.tech) try using QEMU's abstract socket namespace instead
211 // of /tmp (requires QEMU version >5.0).
Serge Bazanski66e58952021-10-05 17:06:56 +0200212 // https://github.com/qemu/qemu/commit/776b97d3605ed0fc94443048fdf988c7725e38a9).
213 // swtpm accepts already-open FDs so we can pass in an abstract socket namespace FD
214 // that we open and pass the name of it to QEMU. Not pinning this crashes both
215 // swtpm and qemu because we run into UNIX socket length limitations (for legacy
216 // reasons 108 chars).
Serge Bazanski66e58952021-10-05 17:06:56 +0200217
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200218 // If it's the node's first start, set up its runtime directories.
219 if options.Runtime == nil {
220 r, err := setupRuntime(ld, sd)
221 if err != nil {
222 return fmt.Errorf("while setting up node runtime: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200223 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200224 options.Runtime = r
Serge Bazanski66e58952021-10-05 17:06:56 +0200225 }
226
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200227 // Replace the node's context with a new one.
228 r := options.Runtime
229 if r.CtxC != nil {
230 r.CtxC()
231 }
232 r.ctxT, r.CtxC = context.WithCancel(ctx)
233
Serge Bazanski66e58952021-10-05 17:06:56 +0200234 var qemuNetType string
235 var qemuNetConfig launch.QemuValue
236 if options.ConnectToSocket != nil {
237 qemuNetType = "socket"
238 qemuNetConfig = launch.QemuValue{
239 "id": {"net0"},
240 "fd": {"3"},
241 }
242 } else {
243 qemuNetType = "user"
244 qemuNetConfig = launch.QemuValue{
245 "id": {"net0"},
246 "net": {"10.42.0.0/24"},
247 "dhcpstart": {"10.42.0.10"},
248 "hostfwd": options.Ports.ToQemuForwards(),
249 }
250 }
251
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200252 // Generate the node's MAC address if it isn't already set in NodeOptions.
253 if options.Mac == nil {
254 mac, err := generateRandomEthernetMAC()
255 if err != nil {
256 return err
257 }
258 options.Mac = mac
Serge Bazanski66e58952021-10-05 17:06:56 +0200259 }
260
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200261 tpmSocketPath := filepath.Join(r.sd, "tpm-socket")
262 fwVarPath := filepath.Join(r.ld, "OVMF_VARS.fd")
Serge Bazanskidd5b03c2024-05-16 18:07:06 +0200263 storagePath := filepath.Join(r.ld, "image.qcow2")
Lorenz Brun150f24a2023-07-13 20:11:06 +0200264 qemuArgs := []string{
Serge Bazanski99b02142024-04-17 16:33:28 +0200265 "-machine", "q35", "-accel", "kvm", "-nographic", "-nodefaults", "-m", "2048",
Serge Bazanski66e58952021-10-05 17:06:56 +0200266 "-cpu", "host", "-smp", "sockets=1,cpus=1,cores=2,threads=2,maxcpus=4",
Tim Windelschmidt82e6af72024-07-23 00:05:42 +0000267 "-drive", "if=pflash,format=raw,readonly=on,file=" + xOvmfCodePath,
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200268 "-drive", "if=pflash,format=raw,file=" + fwVarPath,
Serge Bazanskidd5b03c2024-05-16 18:07:06 +0200269 "-drive", "if=virtio,format=qcow2,cache=unsafe,file=" + storagePath,
Serge Bazanski66e58952021-10-05 17:06:56 +0200270 "-netdev", qemuNetConfig.ToOption(qemuNetType),
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200271 "-device", "virtio-net-pci,netdev=net0,mac=" + options.Mac.String(),
Serge Bazanski66e58952021-10-05 17:06:56 +0200272 "-chardev", "socket,id=chrtpm,path=" + tpmSocketPath,
273 "-tpmdev", "emulator,id=tpm0,chardev=chrtpm",
274 "-device", "tpm-tis,tpmdev=tpm0",
275 "-device", "virtio-rng-pci",
Lorenz Brun150f24a2023-07-13 20:11:06 +0200276 "-serial", "stdio",
277 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200278
279 if !options.AllowReboot {
280 qemuArgs = append(qemuArgs, "-no-reboot")
281 }
282
283 if options.NodeParameters != nil {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200284 parametersPath := filepath.Join(r.ld, "parameters.pb")
Serge Bazanski66e58952021-10-05 17:06:56 +0200285 parametersRaw, err := proto.Marshal(options.NodeParameters)
286 if err != nil {
287 return fmt.Errorf("failed to encode node paraeters: %w", err)
288 }
Lorenz Brun150f24a2023-07-13 20:11:06 +0200289 if err := os.WriteFile(parametersPath, parametersRaw, 0o644); err != nil {
Serge Bazanski66e58952021-10-05 17:06:56 +0200290 return fmt.Errorf("failed to write node parameters: %w", err)
291 }
292 qemuArgs = append(qemuArgs, "-fw_cfg", "name=dev.monogon.metropolis/parameters.pb,file="+parametersPath)
293 }
294
Leopoldacfad5b2023-01-15 14:05:25 +0100295 if options.PcapDump {
Tim Windelschmidta7a82f32024-04-11 01:40:25 +0200296 qemuNetDump := launch.QemuValue{
297 "id": {"net0"},
298 "netdev": {"net0"},
299 "file": {filepath.Join(r.ld, "net0.pcap")},
Leopoldacfad5b2023-01-15 14:05:25 +0100300 }
301 qemuArgs = append(qemuArgs, "-object", qemuNetDump.ToOption("filter-dump"))
302 }
303
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200304 // Manufacture TPM if needed.
305 tpmd := filepath.Join(r.ld, "tpm")
Tim Windelschmidt82e6af72024-07-23 00:05:42 +0000306 err := tpmFactory.Manufacture(ctx, tpmd, &TPMPlatform{
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200307 Manufacturer: "Monogon",
308 Version: "1.0",
309 Model: "TestCluster",
310 })
311 if err != nil {
312 return fmt.Errorf("could not manufacture TPM: %w", err)
313 }
314
Serge Bazanski66e58952021-10-05 17:06:56 +0200315 // Start TPM emulator as a subprocess
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200316 tpmCtx, tpmCancel := context.WithCancel(options.Runtime.ctxT)
Serge Bazanski66e58952021-10-05 17:06:56 +0200317
Tim Windelschmidt82e6af72024-07-23 00:05:42 +0000318 tpmEmuCmd := exec.CommandContext(tpmCtx, xSwtpmPath, "socket", "--tpm2", "--tpmstate", "dir="+tpmd, "--ctrl", "type=unixio,path="+tpmSocketPath)
Serge Bazanskib07c57a2024-06-04 14:33:27 +0000319 // Silence warnings from unsafe libtpms build (uses non-constant-time
320 // cryptographic operations).
321 tpmEmuCmd.Env = append(tpmEmuCmd.Env, "MONOGON_LIBTPMS_ACKNOWLEDGE_UNSAFE=yes")
Serge Bazanski66e58952021-10-05 17:06:56 +0200322 tpmEmuCmd.Stderr = os.Stderr
323 tpmEmuCmd.Stdout = os.Stdout
324
Tim Windelschmidt244b5672024-02-06 10:18:56 +0100325 err = tpmEmuCmd.Start()
Serge Bazanski66e58952021-10-05 17:06:56 +0200326 if err != nil {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200327 tpmCancel()
Serge Bazanski66e58952021-10-05 17:06:56 +0200328 return fmt.Errorf("failed to start TPM emulator: %w", err)
329 }
330
Mateusz Zalegae90f4a12022-05-25 18:24:01 +0200331 // Wait for the socket to be created by the TPM emulator before launching
332 // QEMU.
333 for {
334 _, err := os.Stat(tpmSocketPath)
335 if err == nil {
336 break
337 }
Tim Windelschmidta7a82f32024-04-11 01:40:25 +0200338 if !os.IsNotExist(err) {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200339 tpmCancel()
Mateusz Zalegae90f4a12022-05-25 18:24:01 +0200340 return fmt.Errorf("while stat-ing TPM socket path: %w", err)
341 }
342 if err := tpmCtx.Err(); err != nil {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200343 tpmCancel()
Mateusz Zalegae90f4a12022-05-25 18:24:01 +0200344 return fmt.Errorf("while waiting for the TPM socket: %w", err)
345 }
346 time.Sleep(time.Millisecond * 100)
347 }
348
Serge Bazanski66e58952021-10-05 17:06:56 +0200349 // Start the main qemu binary
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200350 systemCmd := exec.CommandContext(options.Runtime.ctxT, "qemu-system-x86_64", qemuArgs...)
Serge Bazanski66e58952021-10-05 17:06:56 +0200351 if options.ConnectToSocket != nil {
352 systemCmd.ExtraFiles = []*os.File{options.ConnectToSocket}
353 }
354
355 var stdErrBuf bytes.Buffer
356 systemCmd.Stderr = &stdErrBuf
357 systemCmd.Stdout = options.SerialPort
358
Leopoldaf5086b2023-01-15 14:12:42 +0100359 launch.PrettyPrintQemuArgs(options.Name, systemCmd.Args)
360
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200361 go func() {
362 launch.Log("Node: Starting...")
363 err = systemCmd.Run()
364 launch.Log("Node: Returned: %v", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200365
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200366 // Stop TPM emulator and wait for it to exit to properly reap the child process
367 tpmCancel()
368 launch.Log("Node: Waiting for TPM emulator to exit")
369 // Wait returns a SIGKILL error because we just cancelled its context.
370 // We still need to call it to avoid creating zombies.
371 errTpm := tpmEmuCmd.Wait()
372 launch.Log("Node: TPM emulator done: %v", errTpm)
Serge Bazanski66e58952021-10-05 17:06:56 +0200373
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200374 var exerr *exec.ExitError
375 if err != nil && errors.As(err, &exerr) {
376 status := exerr.ProcessState.Sys().(syscall.WaitStatus)
377 if status.Signaled() && status.Signal() == syscall.SIGKILL {
378 // Process was killed externally (most likely by our context being canceled).
379 // This is a normal exit for us, so return nil
380 doneC <- nil
381 return
382 }
383 exerr.Stderr = stdErrBuf.Bytes()
384 newErr := launch.QEMUError(*exerr)
385 launch.Log("Node: %q", stdErrBuf.String())
386 doneC <- &newErr
387 return
Serge Bazanski66e58952021-10-05 17:06:56 +0200388 }
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200389 doneC <- err
390 }()
391 return nil
Serge Bazanski66e58952021-10-05 17:06:56 +0200392}
393
394func copyFile(src, dst string) error {
395 in, err := os.Open(src)
396 if err != nil {
397 return fmt.Errorf("when opening source: %w", err)
398 }
399 defer in.Close()
400
401 out, err := os.Create(dst)
402 if err != nil {
403 return fmt.Errorf("when creating destination: %w", err)
404 }
405 defer out.Close()
406
Lorenz Brun87bbf7e2024-03-18 18:22:25 +0100407 endPos, err := in.Seek(0, io.SeekEnd)
Serge Bazanski66e58952021-10-05 17:06:56 +0200408 if err != nil {
Lorenz Brun87bbf7e2024-03-18 18:22:25 +0100409 return fmt.Errorf("when getting source end: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200410 }
Lorenz Brun87bbf7e2024-03-18 18:22:25 +0100411
412 // Copy the file while preserving its sparseness. The image files are very
413 // sparse (less than 10% allocated), so this is a lot faster.
414 var lastHoleStart int64
415 for {
416 dataStart, err := in.Seek(lastHoleStart, unix.SEEK_DATA)
417 if err != nil {
418 return fmt.Errorf("when seeking to next data block: %w", err)
419 }
420 holeStart, err := in.Seek(dataStart, unix.SEEK_HOLE)
421 if err != nil {
422 return fmt.Errorf("when seeking to next hole: %w", err)
423 }
424 lastHoleStart = holeStart
425 if _, err := in.Seek(dataStart, io.SeekStart); err != nil {
426 return fmt.Errorf("when seeking to current data block: %w", err)
427 }
428 if _, err := out.Seek(dataStart, io.SeekStart); err != nil {
429 return fmt.Errorf("when seeking output to next data block: %w", err)
430 }
431 if _, err := io.CopyN(out, in, holeStart-dataStart); err != nil {
432 return fmt.Errorf("when copying file: %w", err)
433 }
434 if endPos == holeStart {
435 // The next hole is at the end of the file, we're done here.
436 break
437 }
438 }
439
Serge Bazanski66e58952021-10-05 17:06:56 +0200440 return out.Close()
441}
442
Serge Bazanskie78a0892021-10-07 17:03:49 +0200443// getNodes wraps around Management.GetNodes to return a list of nodes in a
444// cluster.
445func getNodes(ctx context.Context, mgmt apb.ManagementClient) ([]*apb.Node, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200446 var res []*apb.Node
Serge Bazanski636032e2022-01-26 14:21:33 +0100447 bo := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
Serge Bazanski075465c2021-11-16 15:38:49 +0100448 err := backoff.Retry(func() error {
449 res = nil
450 srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
Serge Bazanskie78a0892021-10-07 17:03:49 +0200451 if err != nil {
Serge Bazanski075465c2021-11-16 15:38:49 +0100452 return fmt.Errorf("GetNodes: %w", err)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200453 }
Serge Bazanski075465c2021-11-16 15:38:49 +0100454 for {
455 node, err := srvN.Recv()
456 if err == io.EOF {
457 break
458 }
459 if err != nil {
460 return fmt.Errorf("GetNodes.Recv: %w", err)
461 }
462 res = append(res, node)
463 }
464 return nil
465 }, bo)
466 if err != nil {
467 return nil, err
Serge Bazanskie78a0892021-10-07 17:03:49 +0200468 }
469 return res, nil
470}
471
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200472// getNode wraps Management.GetNodes. It returns node information matching
473// given node ID.
474func getNode(ctx context.Context, mgmt apb.ManagementClient, id string) (*apb.Node, error) {
475 nodes, err := getNodes(ctx, mgmt)
476 if err != nil {
477 return nil, fmt.Errorf("could not get nodes: %w", err)
478 }
479 for _, n := range nodes {
480 eid := identity.NodeID(n.Pubkey)
481 if eid != id {
482 continue
483 }
484 return n, nil
485 }
Tim Windelschmidt73e98822024-04-18 23:13:49 +0200486 return nil, fmt.Errorf("no such node")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200487}
488
// generateRandomEthernetMAC returns a random EUI-48 Ethernet MAC address that
// is marked as locally administered and individual (unicast).
func generateRandomEthernetMAC() (*net.HardwareAddr, error) {
	var raw [6]byte
	if _, err := rand.Read(raw[:]); err != nil {
		return nil, fmt.Errorf("failed to read randomness for MAC: %w", err)
	}

	// In the first octet, set bit 1 (U/L: locally administered) and clear
	// bit 0 (I/G: individual address). Ref IEEE 802-2014 Section 8.2.2.
	raw[0] |= 0x02
	raw[0] &^= 0x01

	mac := net.HardwareAddr(raw[:])
	return &mac, nil
}
503
Serge Bazanskibe742842022-04-04 13:18:50 +0200504const SOCKSPort uint16 = 1080
Serge Bazanski66e58952021-10-05 17:06:56 +0200505
Serge Bazanskibe742842022-04-04 13:18:50 +0200506// ClusterPorts contains all ports handled by Nanoswitch.
507var ClusterPorts = []uint16{
508 // Forwarded to the first node.
509 uint16(node.CuratorServicePort),
510 uint16(node.DebugServicePort),
511 uint16(node.KubernetesAPIPort),
512 uint16(node.KubernetesAPIWrappedPort),
513
514 // SOCKS proxy to the switch network
515 SOCKSPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200516}
517
// ClusterOptions contains all options for launching a Metropolis cluster.
type ClusterOptions struct {
	// The number of nodes this cluster should be started with.
	NumNodes int

	// If true, node logs will be saved to individual files instead of being printed
	// out to stderr. The path of these files will be still printed to stdout.
	//
	// The files will be located within the launch directory inside TEST_TMPDIR (or
	// the default tempdir location, if not set).
	NodeLogsToFiles bool

	// LeaveNodesNew, if set, will leave all non-bootstrap nodes in NEW, without
	// bootstrapping them. The nodes' address information in Cluster.Nodes will be
	// incomplete.
	LeaveNodesNew bool

	// Optional local registry which will be made available to the cluster to
	// pull images from. This is a more efficient alternative to preseeding all
	// images used for testing.
	LocalRegistry *localregistry.Server

	// InitialClusterConfiguration will be passed to the first node when creating the
	// cluster, and defines some basic properties of the cluster. If not specified,
	// the cluster uses the defaults defined in
	// metropolis.proto.api.NodeParameters.
	InitialClusterConfiguration *cpb.ClusterConfiguration
}
546
// Cluster is the running Metropolis cluster launched using the LaunchCluster
// function.
type Cluster struct {
	// Owner is the TLS Certificate of the owner of the test cluster. This can be
	// used to authenticate further clients to the running cluster.
	Owner tls.Certificate
	// Ports is the PortMap used to access the first node's services (defined in
	// ClusterPorts) and the SOCKS proxy (at SOCKSPort).
	Ports launch.PortMap

	// Nodes is a map from Node ID to its runtime information.
	Nodes map[string]*NodeInCluster
	// NodeIDs is a list of node IDs that are backing this cluster, in order of
	// creation.
	NodeIDs []string

	// CACertificate is the cluster's CA certificate.
	CACertificate *x509.Certificate

	// nodesDone is a list of channels receiving the results of all the nodes'
	// qemu instances (see LaunchNode's doneC). It's used by Close to ensure all
	// nodes have successfully been stopped.
	nodesDone []chan error
	// nodeOpts are the cluster member nodes' mutable launch options, kept here
	// to facilitate reboots.
	nodeOpts []NodeOptions
	// launchDir points at the directory keeping the nodes' state, such as storage
	// images, firmware variable files, TPM state.
	launchDir string
	// socketDir points at the directory keeping UNIX socket files, such as these
	// used to facilitate communication between QEMU and swtpm. It's different
	// from launchDir, and anchored nearer the file system root, due to the
	// socket path length limitation imposed by the kernel.
	socketDir string
	// metroctlDir is a directory used by metroctl for this cluster.
	// NOTE(review): its exact contents are not visible here - confirm.
	metroctlDir string

	// SOCKSDialer is used by DialNode to establish connections to nodes via the
	// SOCKS server run by nanoswitch.
	SOCKSDialer proxy.Dialer

	// authClient is a cached authenticated owner connection to a Curator
	// instance within the cluster. It is created lazily by CuratorClient.
	authClient *grpc.ClientConn

	// ctxT is the context individual node contexts are created from.
	ctxT context.Context
	// ctxC is used by Close to cancel the context under which the nodes are
	// running.
	ctxC context.CancelFunc

	// tpmFactory presumably is the TPMFactory handed to LaunchNode to
	// manufacture each node's emulated TPM - confirm against callers.
	tpmFactory *TPMFactory
}
599
// NodeInCluster represents information about a node that's part of a Cluster.
type NodeInCluster struct {
	// ID of the node, which can be used to dial this node's services via DialNode.
	ID string
	// Pubkey is the node's public key. NOTE(review): presumably ID is derived
	// from it via identity.NodeID, as done elsewhere in this file - confirm.
	Pubkey []byte
	// Address of the node on the network run by nanoswitch. Not reachable from the
	// host unless dialed via DialNode or via the nanoswitch SOCKS proxy (reachable
	// on Cluster.Ports[SOCKSPort]).
	ManagementAddress string
}
610
611// firstConnection performs the initial owner credential escrow with a newly
612// started nanoswitch-backed cluster over SOCKS. It expects the first node to be
613// running at 10.1.0.2, which is always the case with the current nanoswitch
614// implementation.
615//
Leopold20a036e2023-01-15 00:17:19 +0100616// It returns the newly escrowed credentials as well as the first node's
Serge Bazanskibe742842022-04-04 13:18:50 +0200617// information as NodeInCluster.
618func firstConnection(ctx context.Context, socksDialer proxy.Dialer) (*tls.Certificate, *NodeInCluster, error) {
619 // Dial external service.
620 remote := fmt.Sprintf("10.1.0.2:%s", node.CuratorServicePort.PortString())
Serge Bazanski0c280152024-02-05 14:33:19 +0100621 initCreds, err := rpc.NewEphemeralCredentials(InsecurePrivateKey, rpc.WantInsecure())
Serge Bazanskibe742842022-04-04 13:18:50 +0200622 if err != nil {
623 return nil, nil, fmt.Errorf("NewEphemeralCredentials: %w", err)
624 }
625 initDialer := func(_ context.Context, addr string) (net.Conn, error) {
626 return socksDialer.Dial("tcp", addr)
627 }
628 initClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(initCreds))
629 if err != nil {
630 return nil, nil, fmt.Errorf("dialing with ephemeral credentials failed: %w", err)
631 }
632 defer initClient.Close()
633
634 // Retrieve owner certificate - this can take a while because the node is still
635 // coming up, so do it in a backoff loop.
Serge Bazanski05f813b2023-03-16 17:58:39 +0100636 launch.Log("Cluster: retrieving owner certificate (this can take a few seconds while the first node boots)...")
Serge Bazanskibe742842022-04-04 13:18:50 +0200637 aaa := apb.NewAAAClient(initClient)
638 var cert *tls.Certificate
639 err = backoff.Retry(func() error {
640 cert, err = rpc.RetrieveOwnerCertificate(ctx, aaa, InsecurePrivateKey)
641 if st, ok := status.FromError(err); ok {
642 if st.Code() == codes.Unavailable {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100643 launch.Log("Cluster: cluster UNAVAILABLE: %v", st.Message())
Serge Bazanskibe742842022-04-04 13:18:50 +0200644 return err
645 }
646 }
647 return backoff.Permanent(err)
648 }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
649 if err != nil {
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200650 return nil, nil, fmt.Errorf("couldn't retrieve owner certificate: %w", err)
Serge Bazanskibe742842022-04-04 13:18:50 +0200651 }
Serge Bazanski05f813b2023-03-16 17:58:39 +0100652 launch.Log("Cluster: retrieved owner certificate.")
Serge Bazanskibe742842022-04-04 13:18:50 +0200653
654 // Now connect authenticated and get the node ID.
Serge Bazanski8535cb52023-03-29 14:15:08 +0200655 creds := rpc.NewAuthenticatedCredentials(*cert, rpc.WantInsecure())
Serge Bazanskibe742842022-04-04 13:18:50 +0200656 authClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(creds))
657 if err != nil {
658 return nil, nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
659 }
660 defer authClient.Close()
661 mgmt := apb.NewManagementClient(authClient)
662
663 var node *NodeInCluster
664 err = backoff.Retry(func() error {
665 nodes, err := getNodes(ctx, mgmt)
666 if err != nil {
667 return fmt.Errorf("retrieving nodes failed: %w", err)
668 }
669 if len(nodes) != 1 {
670 return fmt.Errorf("expected one node, got %d", len(nodes))
671 }
672 n := nodes[0]
673 if n.Status == nil || n.Status.ExternalAddress == "" {
674 return fmt.Errorf("node has no status and/or address")
675 }
676 node = &NodeInCluster{
677 ID: identity.NodeID(n.Pubkey),
678 ManagementAddress: n.Status.ExternalAddress,
679 }
680 return nil
681 }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
682 if err != nil {
683 return nil, nil, err
684 }
685
686 return cert, node, nil
Serge Bazanski66e58952021-10-05 17:06:56 +0200687}
688
// NewSerialFileLogger creates (or truncates) the file at path p with mode 0600
// and returns it as an io.ReadWriter suitable for use as a VM serial port sink.
//
// The file is opened write-only, so it only captures output: reads from the
// returned value will fail. Truncation ensures that a pre-existing file at p
// does not keep stale trailing bytes after newer, shorter output is written.
func NewSerialFileLogger(p string) (io.ReadWriter, error) {
	f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o600)
	if err != nil {
		// Return an untyped nil: returning the nil *os.File would yield a
		// non-nil interface value.
		return nil, err
	}
	return f, nil
}
696
Serge Bazanski66e58952021-10-05 17:06:56 +0200697// LaunchCluster launches a cluster of Metropolis node VMs together with a
698// Nanoswitch instance to network them all together.
699//
700// The given context will be used to run all qemu instances in the cluster, and
701// canceling the context or calling Close() will terminate them.
702func LaunchCluster(ctx context.Context, opts ClusterOptions) (*Cluster, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200703 if opts.NumNodes <= 0 {
Serge Bazanski66e58952021-10-05 17:06:56 +0200704 return nil, errors.New("refusing to start cluster with zero nodes")
705 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200706
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200707 // Create the launch directory.
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100708 ld, err := os.MkdirTemp(os.Getenv("TEST_TMPDIR"), "cluster-*")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200709 if err != nil {
710 return nil, fmt.Errorf("failed to create the launch directory: %w", err)
711 }
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100712 // Create the metroctl config directory. We keep it in /tmp because in some
713 // scenarios it's end-user visible and we want it short.
714 md, err := os.MkdirTemp("/tmp", "metroctl-*")
715 if err != nil {
716 return nil, fmt.Errorf("failed to create the metroctl directory: %w", err)
717 }
718
719 // Create the socket directory. We keep it in /tmp because of socket path limits.
720 sd, err := os.MkdirTemp("/tmp", "cluster-*")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200721 if err != nil {
722 return nil, fmt.Errorf("failed to create the socket directory: %w", err)
723 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200724
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200725 // Set up TPM factory.
726 tpmf, err := NewTPMFactory(filepath.Join(ld, "tpm"))
727 if err != nil {
728 return nil, fmt.Errorf("failed to create TPM factory: %w", err)
729 }
730
Serge Bazanski66e58952021-10-05 17:06:56 +0200731 // Prepare links between nodes and nanoswitch.
732 var switchPorts []*os.File
733 var vmPorts []*os.File
734 for i := 0; i < opts.NumNodes; i++ {
735 switchPort, vmPort, err := launch.NewSocketPair()
736 if err != nil {
Serge Bazanski66e58952021-10-05 17:06:56 +0200737 return nil, fmt.Errorf("failed to get socketpair: %w", err)
738 }
739 switchPorts = append(switchPorts, switchPort)
740 vmPorts = append(vmPorts, vmPort)
741 }
742
Serge Bazanskie78a0892021-10-07 17:03:49 +0200743 // Make a list of channels that will be populated by all running node qemu
744 // processes.
Serge Bazanski66e58952021-10-05 17:06:56 +0200745 done := make([]chan error, opts.NumNodes)
Lorenz Brun150f24a2023-07-13 20:11:06 +0200746 for i := range done {
Serge Bazanski66e58952021-10-05 17:06:56 +0200747 done[i] = make(chan error, 1)
748 }
749
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200750 // Prepare the node options. These will be kept as part of Cluster.
751 // nodeOpts[].Runtime will be initialized by LaunchNode during the first
752 // launch. The runtime information can be later used to restart a node.
753 // The 0th node will be initialized first. The rest will follow after it
754 // had bootstrapped the cluster.
755 nodeOpts := make([]NodeOptions, opts.NumNodes)
756 nodeOpts[0] = NodeOptions{
Leopoldaf5086b2023-01-15 14:12:42 +0100757 Name: "node0",
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200758 ConnectToSocket: vmPorts[0],
759 NodeParameters: &apb.NodeParameters{
760 Cluster: &apb.NodeParameters_ClusterBootstrap_{
761 ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
Serge Bazanskie564f172024-04-03 12:06:06 +0200762 OwnerPublicKey: InsecurePublicKey,
763 InitialClusterConfiguration: opts.InitialClusterConfiguration,
Serge Bazanski11198c82024-05-22 14:11:01 +0200764 Labels: &cpb.NodeLabels{
765 Pairs: []*cpb.NodeLabels_Pair{
Serge Bazanski53458ba2024-06-18 09:56:46 +0000766 {Key: nodeNumberKey, Value: "0"},
Serge Bazanski11198c82024-05-22 14:11:01 +0200767 },
768 },
Serge Bazanski66e58952021-10-05 17:06:56 +0200769 },
770 },
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200771 },
772 SerialPort: newPrefixedStdio(0),
Leopoldacfad5b2023-01-15 14:05:25 +0100773 PcapDump: true,
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200774 }
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100775 if opts.NodeLogsToFiles {
Jan Schär0b927652024-07-31 18:08:50 +0200776 path := path.Join(ld, "node-0.txt")
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100777 port, err := NewSerialFileLogger(path)
778 if err != nil {
Jan Schär0b927652024-07-31 18:08:50 +0200779 return nil, fmt.Errorf("could not open log file for node 0: %w", err)
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100780 }
Jan Schär0b927652024-07-31 18:08:50 +0200781 launch.Log("Node 0 logs at %s", path)
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100782 nodeOpts[0].SerialPort = port
783 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200784
785 // Start the first node.
786 ctxT, ctxC := context.WithCancel(ctx)
Jan Schär0b927652024-07-31 18:08:50 +0200787 launch.Log("Cluster: Starting node %d...", 0)
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200788 if err := LaunchNode(ctxT, ld, sd, tpmf, &nodeOpts[0], done[0]); err != nil {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200789 ctxC()
790 return nil, fmt.Errorf("failed to launch first node: %w", err)
791 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200792
Lorenz Brun150f24a2023-07-13 20:11:06 +0200793 localRegistryAddr := net.TCPAddr{
794 IP: net.IPv4(10, 42, 0, 82),
795 Port: 5000,
796 }
797
798 var guestSvcMap launch.GuestServiceMap
799 if opts.LocalRegistry != nil {
800 l, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1)})
801 if err != nil {
802 ctxC()
803 return nil, fmt.Errorf("failed to create TCP listener for local registry: %w", err)
804 }
805 s := http.Server{
806 Handler: opts.LocalRegistry,
807 }
808 go s.Serve(l)
809 go func() {
810 <-ctxT.Done()
811 s.Close()
812 }()
813 guestSvcMap = launch.GuestServiceMap{
814 &localRegistryAddr: *l.Addr().(*net.TCPAddr),
815 }
816 }
817
Serge Bazanskie78a0892021-10-07 17:03:49 +0200818 // Launch nanoswitch.
Serge Bazanski66e58952021-10-05 17:06:56 +0200819 portMap, err := launch.ConflictFreePortMap(ClusterPorts)
820 if err != nil {
821 ctxC()
822 return nil, fmt.Errorf("failed to allocate ephemeral ports: %w", err)
823 }
824
825 go func() {
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100826 var serialPort io.ReadWriter
827 if opts.NodeLogsToFiles {
828 path := path.Join(ld, "nanoswitch.txt")
829 serialPort, err = NewSerialFileLogger(path)
830 if err != nil {
831 launch.Log("Could not open log file for nanoswitch: %v", err)
832 }
833 launch.Log("Nanoswitch logs at %s", path)
834 } else {
835 serialPort = newPrefixedStdio(99)
836 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200837 if err := launch.RunMicroVM(ctxT, &launch.MicroVMOptions{
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100838 Name: "nanoswitch",
Tim Windelschmidt82e6af72024-07-23 00:05:42 +0000839 KernelPath: xKernelPath,
840 InitramfsPath: xInitramfsPath,
Serge Bazanski66e58952021-10-05 17:06:56 +0200841 ExtraNetworkInterfaces: switchPorts,
842 PortMap: portMap,
Lorenz Brun150f24a2023-07-13 20:11:06 +0200843 GuestServiceMap: guestSvcMap,
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100844 SerialPort: serialPort,
Leopoldacfad5b2023-01-15 14:05:25 +0100845 PcapDump: path.Join(ld, "nanoswitch.pcap"),
Serge Bazanski66e58952021-10-05 17:06:56 +0200846 }); err != nil {
847 if !errors.Is(err, ctxT.Err()) {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100848 launch.Fatal("Failed to launch nanoswitch: %v", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200849 }
850 }
851 }()
852
Serge Bazanskibe742842022-04-04 13:18:50 +0200853 // Build SOCKS dialer.
854 socksRemote := fmt.Sprintf("localhost:%v", portMap[SOCKSPort])
855 socksDialer, err := proxy.SOCKS5("tcp", socksRemote, nil, proxy.Direct)
Serge Bazanski66e58952021-10-05 17:06:56 +0200856 if err != nil {
857 ctxC()
Serge Bazanskibe742842022-04-04 13:18:50 +0200858 return nil, fmt.Errorf("failed to build SOCKS dialer: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200859 }
860
Serge Bazanskibe742842022-04-04 13:18:50 +0200861 // Retrieve owner credentials and first node.
862 cert, firstNode, err := firstConnection(ctxT, socksDialer)
Serge Bazanski66e58952021-10-05 17:06:56 +0200863 if err != nil {
864 ctxC()
865 return nil, err
866 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200867
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100868 // Write credentials to the metroctl directory.
869 if err := metroctl.WriteOwnerKey(md, cert.PrivateKey.(ed25519.PrivateKey)); err != nil {
870 ctxC()
871 return nil, fmt.Errorf("could not write owner key: %w", err)
872 }
873 if err := metroctl.WriteOwnerCertificate(md, cert.Certificate[0]); err != nil {
874 ctxC()
875 return nil, fmt.Errorf("could not write owner certificate: %w", err)
876 }
877
Serge Bazanski53458ba2024-06-18 09:56:46 +0000878 launch.Log("Cluster: Node %d is %s", 0, firstNode.ID)
879
880 // Set up a partially initialized cluster instance, to be filled in the
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200881 // later steps.
Serge Bazanskibe742842022-04-04 13:18:50 +0200882 cluster := &Cluster{
883 Owner: *cert,
884 Ports: portMap,
885 Nodes: map[string]*NodeInCluster{
886 firstNode.ID: firstNode,
887 },
888 NodeIDs: []string{
889 firstNode.ID,
890 },
891
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100892 nodesDone: done,
893 nodeOpts: nodeOpts,
894 launchDir: ld,
895 socketDir: sd,
896 metroctlDir: md,
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200897
Lorenz Brun276a7462023-07-12 21:28:54 +0200898 SOCKSDialer: socksDialer,
Serge Bazanskibe742842022-04-04 13:18:50 +0200899
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200900 ctxT: ctxT,
Serge Bazanskibe742842022-04-04 13:18:50 +0200901 ctxC: ctxC,
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200902
903 tpmFactory: tpmf,
Serge Bazanskibe742842022-04-04 13:18:50 +0200904 }
905
906 // Now start the rest of the nodes and register them into the cluster.
907
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200908 // Get an authenticated owner client within the cluster.
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200909 curC, err := cluster.CuratorClient()
Serge Bazanski66e58952021-10-05 17:06:56 +0200910 if err != nil {
911 ctxC()
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200912 return nil, fmt.Errorf("CuratorClient: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200913 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200914 mgmt := apb.NewManagementClient(curC)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200915
916 // Retrieve register ticket to register further nodes.
Serge Bazanski05f813b2023-03-16 17:58:39 +0100917 launch.Log("Cluster: retrieving register ticket...")
Serge Bazanskie78a0892021-10-07 17:03:49 +0200918 resT, err := mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{})
919 if err != nil {
920 ctxC()
921 return nil, fmt.Errorf("GetRegisterTicket: %w", err)
922 }
923 ticket := resT.Ticket
Serge Bazanski05f813b2023-03-16 17:58:39 +0100924 launch.Log("Cluster: retrieved register ticket (%d bytes).", len(ticket))
Serge Bazanskie78a0892021-10-07 17:03:49 +0200925
926 // Retrieve cluster info (for directory and ca public key) to register further
927 // nodes.
928 resI, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
929 if err != nil {
930 ctxC()
931 return nil, fmt.Errorf("GetClusterInfo: %w", err)
932 }
Serge Bazanski54e212a2023-06-14 13:45:11 +0200933 caCert, err := x509.ParseCertificate(resI.CaCertificate)
934 if err != nil {
935 ctxC()
936 return nil, fmt.Errorf("ParseCertificate: %w", err)
937 }
938 cluster.CACertificate = caCert
Serge Bazanskie78a0892021-10-07 17:03:49 +0200939
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200940 // Use the retrieved information to configure the rest of the node options.
941 for i := 1; i < opts.NumNodes; i++ {
942 nodeOpts[i] = NodeOptions{
Leopoldaf5086b2023-01-15 14:12:42 +0100943 Name: fmt.Sprintf("node%d", i),
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200944 ConnectToSocket: vmPorts[i],
945 NodeParameters: &apb.NodeParameters{
946 Cluster: &apb.NodeParameters_ClusterRegister_{
947 ClusterRegister: &apb.NodeParameters_ClusterRegister{
948 RegisterTicket: ticket,
949 ClusterDirectory: resI.ClusterDirectory,
950 CaCertificate: resI.CaCertificate,
Serge Bazanski30e30b32024-05-22 14:11:56 +0200951 Labels: &cpb.NodeLabels{
952 Pairs: []*cpb.NodeLabels_Pair{
Serge Bazanski53458ba2024-06-18 09:56:46 +0000953 {Key: nodeNumberKey, Value: fmt.Sprintf("%d", i)},
Serge Bazanski30e30b32024-05-22 14:11:56 +0200954 },
955 },
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200956 },
957 },
958 },
959 SerialPort: newPrefixedStdio(i),
960 }
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100961 if opts.NodeLogsToFiles {
Jan Schär0b927652024-07-31 18:08:50 +0200962 path := path.Join(ld, fmt.Sprintf("node-%d.txt", i))
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100963 port, err := NewSerialFileLogger(path)
964 if err != nil {
Jan Schär0b927652024-07-31 18:08:50 +0200965 return nil, fmt.Errorf("could not open log file for node %d: %w", i, err)
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100966 }
Jan Schär0b927652024-07-31 18:08:50 +0200967 launch.Log("Node %d logs at %s", i, path)
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100968 nodeOpts[i].SerialPort = port
969 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200970 }
971
972 // Now run the rest of the nodes.
Serge Bazanskie78a0892021-10-07 17:03:49 +0200973 for i := 1; i < opts.NumNodes; i++ {
Jan Schär0b927652024-07-31 18:08:50 +0200974 launch.Log("Cluster: Starting node %d...", i)
Serge Bazanski2b6dc312024-06-04 17:44:55 +0200975 err := LaunchNode(ctxT, ld, sd, tpmf, &nodeOpts[i], done[i])
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200976 if err != nil {
Jan Schär0b927652024-07-31 18:08:50 +0200977 return nil, fmt.Errorf("failed to launch node %d: %w", i, err)
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200978 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200979 }
Serge Bazanskie78a0892021-10-07 17:03:49 +0200980
Serge Bazanski53458ba2024-06-18 09:56:46 +0000981 // Wait for nodes to appear as NEW, populate a map from node number (index into
982 // NodeOpts, etc.) to Metropolis Node ID.
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200983 seenNodes := make(map[string]bool)
Serge Bazanski53458ba2024-06-18 09:56:46 +0000984 nodeNumberToID := make(map[int]string)
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200985 launch.Log("Cluster: waiting for nodes to appear as NEW...")
986 for i := 1; i < opts.NumNodes; i++ {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200987 for {
988 nodes, err := getNodes(ctx, mgmt)
989 if err != nil {
990 ctxC()
991 return nil, fmt.Errorf("could not get nodes: %w", err)
992 }
993 for _, n := range nodes {
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200994 if n.State != cpb.NodeState_NODE_STATE_NEW {
995 continue
Serge Bazanskie78a0892021-10-07 17:03:49 +0200996 }
Serge Bazanski87d9c592024-03-20 12:35:11 +0100997 if seenNodes[n.Id] {
998 continue
999 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001000 seenNodes[n.Id] = true
1001 cluster.Nodes[n.Id] = &NodeInCluster{
1002 ID: n.Id,
1003 Pubkey: n.Pubkey,
1004 }
Serge Bazanski53458ba2024-06-18 09:56:46 +00001005
1006 num, err := strconv.Atoi(node.GetNodeLabel(n.Labels, nodeNumberKey))
1007 if err != nil {
1008 return nil, fmt.Errorf("node %s has undecodable number label: %w", n.Id, err)
1009 }
1010 launch.Log("Cluster: Node %d is %s", num, n.Id)
1011 nodeNumberToID[num] = n.Id
Serge Bazanskie78a0892021-10-07 17:03:49 +02001012 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001013
1014 if len(seenNodes) == opts.NumNodes-1 {
Serge Bazanskie78a0892021-10-07 17:03:49 +02001015 break
1016 }
1017 time.Sleep(1 * time.Second)
1018 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001019 }
1020 launch.Log("Found all expected nodes")
Serge Bazanskie78a0892021-10-07 17:03:49 +02001021
Serge Bazanski53458ba2024-06-18 09:56:46 +00001022 // Build the rest of NodeIDs from map.
1023 for i := 1; i < opts.NumNodes; i++ {
1024 cluster.NodeIDs = append(cluster.NodeIDs, nodeNumberToID[i])
1025 }
1026
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001027 approvedNodes := make(map[string]bool)
1028 upNodes := make(map[string]bool)
1029 if !opts.LeaveNodesNew {
Serge Bazanskie78a0892021-10-07 17:03:49 +02001030 for {
1031 nodes, err := getNodes(ctx, mgmt)
1032 if err != nil {
1033 ctxC()
1034 return nil, fmt.Errorf("could not get nodes: %w", err)
1035 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001036 for _, node := range nodes {
1037 if !seenNodes[node.Id] {
1038 // Skip nodes that weren't NEW in the previous step.
Serge Bazanskie78a0892021-10-07 17:03:49 +02001039 continue
1040 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001041
1042 if node.State == cpb.NodeState_NODE_STATE_UP && node.Status != nil && node.Status.ExternalAddress != "" {
1043 launch.Log("Cluster: node %s is up", node.Id)
1044 upNodes[node.Id] = true
1045 cluster.Nodes[node.Id].ManagementAddress = node.Status.ExternalAddress
Serge Bazanskie78a0892021-10-07 17:03:49 +02001046 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001047 if upNodes[node.Id] {
1048 continue
Serge Bazanskibe742842022-04-04 13:18:50 +02001049 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001050
1051 if !approvedNodes[node.Id] {
1052 launch.Log("Cluster: approving node %s", node.Id)
1053 _, err := mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
1054 Pubkey: node.Pubkey,
1055 })
1056 if err != nil {
1057 ctxC()
1058 return nil, fmt.Errorf("ApproveNode(%s): %w", node.Id, err)
1059 }
1060 approvedNodes[node.Id] = true
Serge Bazanskibe742842022-04-04 13:18:50 +02001061 }
Serge Bazanskie78a0892021-10-07 17:03:49 +02001062 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001063
Jan Schär0b927652024-07-31 18:08:50 +02001064 launch.Log("Cluster: want %d up nodes, have %d", opts.NumNodes, len(upNodes)+1)
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001065 if len(upNodes) == opts.NumNodes-1 {
Serge Bazanskie78a0892021-10-07 17:03:49 +02001066 break
1067 }
Serge Bazanskibe742842022-04-04 13:18:50 +02001068 time.Sleep(time.Second)
Serge Bazanskie78a0892021-10-07 17:03:49 +02001069 }
Serge Bazanskie78a0892021-10-07 17:03:49 +02001070 }
Serge Bazanski66e58952021-10-05 17:06:56 +02001071
Serge Bazanski05f813b2023-03-16 17:58:39 +01001072 launch.Log("Cluster: all nodes up:")
Jan Schär0b927652024-07-31 18:08:50 +02001073 for i, nodeID := range cluster.NodeIDs {
1074 launch.Log("Cluster: %d. %s at %s", i, nodeID, cluster.Nodes[nodeID].ManagementAddress)
Serge Bazanskibe742842022-04-04 13:18:50 +02001075 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001076 launch.Log("Cluster: starting tests...")
Serge Bazanski66e58952021-10-05 17:06:56 +02001077
Serge Bazanskibe742842022-04-04 13:18:50 +02001078 return cluster, nil
Serge Bazanski66e58952021-10-05 17:06:56 +02001079}
1080
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001081// RebootNode reboots the cluster member node matching the given index, and
1082// waits for it to rejoin the cluster. It will use the given context ctx to run
1083// cluster API requests, whereas the resulting QEMU process will be created
1084// using the cluster's context c.ctxT. The nodes are indexed starting at 0.
1085func (c *Cluster) RebootNode(ctx context.Context, idx int) error {
1086 if idx < 0 || idx >= len(c.NodeIDs) {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +02001087 return fmt.Errorf("index out of bounds")
1088 }
1089 if c.nodeOpts[idx].Runtime == nil {
1090 return fmt.Errorf("node not running")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001091 }
1092 id := c.NodeIDs[idx]
1093
1094 // Get an authenticated owner client within the cluster.
Serge Bazanski5bb8a332022-06-23 17:41:33 +02001095 curC, err := c.CuratorClient()
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001096 if err != nil {
1097 return err
1098 }
1099 mgmt := apb.NewManagementClient(curC)
1100
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001101 // Cancel the node's context. This will shut down QEMU.
1102 c.nodeOpts[idx].Runtime.CtxC()
Serge Bazanski05f813b2023-03-16 17:58:39 +01001103 launch.Log("Cluster: waiting for node %d (%s) to stop.", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001104 err = <-c.nodesDone[idx]
1105 if err != nil {
1106 return fmt.Errorf("while restarting node: %w", err)
1107 }
1108
1109 // Start QEMU again.
Serge Bazanski05f813b2023-03-16 17:58:39 +01001110 launch.Log("Cluster: restarting node %d (%s).", idx, id)
Serge Bazanski2b6dc312024-06-04 17:44:55 +02001111 if err := LaunchNode(c.ctxT, c.launchDir, c.socketDir, c.tpmFactory, &c.nodeOpts[idx], c.nodesDone[idx]); err != nil {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +02001112 return fmt.Errorf("failed to launch node %d: %w", idx, err)
1113 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001114
Serge Bazanskibc969572024-03-21 11:56:13 +01001115 start := time.Now()
1116
1117 // Poll Management.GetNodes until the node is healthy.
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001118 for {
1119 cs, err := getNode(ctx, mgmt, id)
1120 if err != nil {
Serge Bazanski05f813b2023-03-16 17:58:39 +01001121 launch.Log("Cluster: node get error: %v", err)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001122 return err
1123 }
Serge Bazanskibc969572024-03-21 11:56:13 +01001124 launch.Log("Cluster: node health: %+v", cs.Health)
1125
1126 lhb := time.Now().Add(-cs.TimeSinceHeartbeat.AsDuration())
1127 if lhb.After(start) && cs.Health == apb.Node_HEALTHY {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001128 break
1129 }
1130 time.Sleep(time.Second)
1131 }
Serge Bazanski05f813b2023-03-16 17:58:39 +01001132 launch.Log("Cluster: node %d (%s) has rejoined the cluster.", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001133 return nil
1134}
1135
Serge Bazanski500f6e02024-04-03 12:06:40 +02001136// ShutdownNode performs an ungraceful shutdown (i.e. power off) of the node
1137// given by idx. If the node is already shut down, this is a no-op.
1138func (c *Cluster) ShutdownNode(idx int) error {
1139 if idx < 0 || idx >= len(c.NodeIDs) {
1140 return fmt.Errorf("index out of bounds")
1141 }
1142 // Return if node is already stopped.
1143 select {
1144 case <-c.nodeOpts[idx].Runtime.ctxT.Done():
1145 return nil
1146 default:
1147 }
1148 id := c.NodeIDs[idx]
1149
1150 // Cancel the node's context. This will shut down QEMU.
1151 c.nodeOpts[idx].Runtime.CtxC()
1152 launch.Log("Cluster: waiting for node %d (%s) to stop.", idx, id)
1153 err := <-c.nodesDone[idx]
1154 if err != nil {
1155 return fmt.Errorf("while shutting down node: %w", err)
1156 }
Serge Bazanski2b6dc312024-06-04 17:44:55 +02001157 launch.Log("Cluster: node %d (%s) stopped.", idx, id)
Serge Bazanski500f6e02024-04-03 12:06:40 +02001158 return nil
1159}
1160
1161// StartNode performs a power on of the node given by idx. If the node is already
1162// running, this is a no-op.
1163func (c *Cluster) StartNode(idx int) error {
1164 if idx < 0 || idx >= len(c.NodeIDs) {
1165 return fmt.Errorf("index out of bounds")
1166 }
1167 id := c.NodeIDs[idx]
1168 // Return if node is already running.
1169 select {
1170 case <-c.nodeOpts[idx].Runtime.ctxT.Done():
1171 default:
1172 return nil
1173 }
1174
1175 // Start QEMU again.
1176 launch.Log("Cluster: starting node %d (%s).", idx, id)
Serge Bazanski2b6dc312024-06-04 17:44:55 +02001177 if err := LaunchNode(c.ctxT, c.launchDir, c.socketDir, c.tpmFactory, &c.nodeOpts[idx], c.nodesDone[idx]); err != nil {
Serge Bazanski500f6e02024-04-03 12:06:40 +02001178 return fmt.Errorf("failed to launch node %d: %w", idx, err)
1179 }
Serge Bazanski2b6dc312024-06-04 17:44:55 +02001180 launch.Log("Cluster: node %d (%s) started.", idx, id)
Serge Bazanski500f6e02024-04-03 12:06:40 +02001181 return nil
1182}
1183
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001184// Close cancels the running clusters' context and waits for all virtualized
Serge Bazanski66e58952021-10-05 17:06:56 +02001185// nodes to stop. It returns an error if stopping the nodes failed, or one of
1186// the nodes failed to fully start in the first place.
1187func (c *Cluster) Close() error {
Serge Bazanski05f813b2023-03-16 17:58:39 +01001188 launch.Log("Cluster: stopping...")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001189 if c.authClient != nil {
1190 c.authClient.Close()
1191 }
Serge Bazanski66e58952021-10-05 17:06:56 +02001192 c.ctxC()
1193
Leopold20a036e2023-01-15 00:17:19 +01001194 var errs []error
Serge Bazanski05f813b2023-03-16 17:58:39 +01001195 launch.Log("Cluster: waiting for nodes to exit...")
Serge Bazanski66e58952021-10-05 17:06:56 +02001196 for _, c := range c.nodesDone {
1197 err := <-c
1198 if err != nil {
Leopold20a036e2023-01-15 00:17:19 +01001199 errs = append(errs, err)
Serge Bazanski66e58952021-10-05 17:06:56 +02001200 }
1201 }
Serge Bazanskid09c58f2023-03-17 00:25:08 +01001202 launch.Log("Cluster: removing nodes' state files (%s) and sockets (%s).", c.launchDir, c.socketDir)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001203 os.RemoveAll(c.launchDir)
1204 os.RemoveAll(c.socketDir)
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001205 os.RemoveAll(c.metroctlDir)
Serge Bazanski05f813b2023-03-16 17:58:39 +01001206 launch.Log("Cluster: done")
Leopold20a036e2023-01-15 00:17:19 +01001207 return multierr.Combine(errs...)
Serge Bazanski66e58952021-10-05 17:06:56 +02001208}
Serge Bazanskibe742842022-04-04 13:18:50 +02001209
1210// DialNode is a grpc.WithContextDialer compatible dialer which dials nodes by
1211// their ID. This is performed by connecting to the cluster nanoswitch via its
1212// SOCKS proxy, and using the cluster node list for name resolution.
1213//
1214// For example:
1215//
Serge Bazanski05f813b2023-03-16 17:58:39 +01001216// grpc.Dial("metropolis-deadbeef:1234", grpc.WithContextDialer(c.DialNode))
Serge Bazanskibe742842022-04-04 13:18:50 +02001217func (c *Cluster) DialNode(_ context.Context, addr string) (net.Conn, error) {
1218 host, port, err := net.SplitHostPort(addr)
1219 if err != nil {
1220 return nil, fmt.Errorf("invalid host:port: %w", err)
1221 }
1222 // Already an IP address?
1223 if net.ParseIP(host) != nil {
Lorenz Brun276a7462023-07-12 21:28:54 +02001224 return c.SOCKSDialer.Dial("tcp", addr)
Serge Bazanskibe742842022-04-04 13:18:50 +02001225 }
1226
1227 // Otherwise, expect a node name.
1228 node, ok := c.Nodes[host]
1229 if !ok {
1230 return nil, fmt.Errorf("unknown node %q", host)
1231 }
1232 addr = net.JoinHostPort(node.ManagementAddress, port)
Lorenz Brun276a7462023-07-12 21:28:54 +02001233 return c.SOCKSDialer.Dial("tcp", addr)
Serge Bazanskibe742842022-04-04 13:18:50 +02001234}
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001235
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001236// GetKubeClientSet gets a Kubernetes client set accessing the Metropolis
1237// Kubernetes authenticating proxy using the cluster owner identity.
1238// It currently has access to everything (i.e. the cluster-admin role)
1239// via the owner-admin binding.
1240func (c *Cluster) GetKubeClientSet() (kubernetes.Interface, error) {
1241 pkcs8Key, err := x509.MarshalPKCS8PrivateKey(c.Owner.PrivateKey)
1242 if err != nil {
1243 // We explicitly pass an Ed25519 private key in, so this can't happen
1244 panic(err)
1245 }
1246
1247 host := net.JoinHostPort(c.NodeIDs[0], node.KubernetesAPIWrappedPort.PortString())
Lorenz Brun150f24a2023-07-13 20:11:06 +02001248 clientConfig := rest.Config{
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001249 Host: host,
1250 TLSClientConfig: rest.TLSClientConfig{
1251 // TODO(q3k): use CA certificate
1252 Insecure: true,
1253 ServerName: "kubernetes.default.svc",
1254 CertData: pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: c.Owner.Certificate[0]}),
1255 KeyData: pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: pkcs8Key}),
1256 },
1257 Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
1258 return c.DialNode(ctx, address)
1259 },
1260 }
1261 return kubernetes.NewForConfig(&clientConfig)
1262}
1263
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001264// KubernetesControllerNodeAddresses returns the list of IP addresses of nodes
1265// which are currently Kubernetes controllers, ie. run an apiserver. This list
1266// might be empty if no node is currently configured with the
1267// 'KubernetesController' node.
1268func (c *Cluster) KubernetesControllerNodeAddresses(ctx context.Context) ([]string, error) {
1269 curC, err := c.CuratorClient()
1270 if err != nil {
1271 return nil, err
1272 }
1273 mgmt := apb.NewManagementClient(curC)
1274 srv, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{
1275 Filter: "has(node.roles.kubernetes_controller)",
1276 })
1277 if err != nil {
1278 return nil, err
1279 }
1280 defer srv.CloseSend()
1281 var res []string
1282 for {
1283 n, err := srv.Recv()
1284 if err == io.EOF {
1285 break
1286 }
1287 if err != nil {
1288 return nil, err
1289 }
1290 if n.Status == nil || n.Status.ExternalAddress == "" {
1291 continue
1292 }
1293 res = append(res, n.Status.ExternalAddress)
1294 }
1295 return res, nil
1296}
Serge Bazanski630fb5c2023-04-06 10:50:24 +02001297
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001298// AllNodesHealthy returns nil if all the nodes in the cluster are seemingly
1299// healthy.
Serge Bazanski630fb5c2023-04-06 10:50:24 +02001300func (c *Cluster) AllNodesHealthy(ctx context.Context) error {
1301 // Get an authenticated owner client within the cluster.
1302 curC, err := c.CuratorClient()
1303 if err != nil {
1304 return err
1305 }
1306 mgmt := apb.NewManagementClient(curC)
1307 nodes, err := getNodes(ctx, mgmt)
1308 if err != nil {
1309 return err
1310 }
1311
1312 var unhealthy []string
1313 for _, node := range nodes {
1314 if node.Health == apb.Node_HEALTHY {
1315 continue
1316 }
1317 unhealthy = append(unhealthy, node.Id)
1318 }
1319 if len(unhealthy) == 0 {
1320 return nil
1321 }
1322 return fmt.Errorf("nodes unhealthy: %s", strings.Join(unhealthy, ", "))
1323}
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001324
1325// ApproveNode approves a node by ID, waiting for it to become UP.
1326func (c *Cluster) ApproveNode(ctx context.Context, id string) error {
1327 curC, err := c.CuratorClient()
1328 if err != nil {
1329 return err
1330 }
1331 mgmt := apb.NewManagementClient(curC)
1332
1333 _, err = mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
1334 Pubkey: c.Nodes[id].Pubkey,
1335 })
1336 if err != nil {
1337 return fmt.Errorf("ApproveNode: %w", err)
1338 }
1339 launch.Log("Cluster: %s: approved, waiting for UP", id)
1340 for {
1341 nodes, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
1342 if err != nil {
1343 return fmt.Errorf("GetNodes: %w", err)
1344 }
1345 found := false
1346 for {
1347 node, err := nodes.Recv()
1348 if errors.Is(err, io.EOF) {
1349 break
1350 }
1351 if err != nil {
1352 return fmt.Errorf("Nodes.Recv: %w", err)
1353 }
1354 if node.Id != id {
1355 continue
1356 }
1357 if node.State != cpb.NodeState_NODE_STATE_UP {
1358 continue
1359 }
1360 found = true
1361 break
1362 }
1363 nodes.CloseSend()
1364
1365 if found {
1366 break
1367 }
1368 time.Sleep(time.Second)
1369 }
1370 launch.Log("Cluster: %s: UP", id)
1371 return nil
1372}
1373
1374// MakeKubernetesWorker adds the KubernetesWorker role to a node by ID.
1375func (c *Cluster) MakeKubernetesWorker(ctx context.Context, id string) error {
1376 curC, err := c.CuratorClient()
1377 if err != nil {
1378 return err
1379 }
1380 mgmt := apb.NewManagementClient(curC)
1381
1382 tr := true
1383 launch.Log("Cluster: %s: adding KubernetesWorker", id)
1384 _, err = mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
1385 Node: &apb.UpdateNodeRolesRequest_Id{
1386 Id: id,
1387 },
1388 KubernetesWorker: &tr,
1389 })
1390 return err
1391}
Serge Bazanski37cfcc12024-03-21 11:59:07 +01001392
1393// MakeConsensusMember adds the ConsensusMember role to a node by ID.
1394func (c *Cluster) MakeConsensusMember(ctx context.Context, id string) error {
1395 curC, err := c.CuratorClient()
1396 if err != nil {
1397 return err
1398 }
1399 mgmt := apb.NewManagementClient(curC)
1400 cur := ipb.NewCuratorClient(curC)
1401
1402 tr := true
1403 launch.Log("Cluster: %s: adding ConsensusMember", id)
1404 bo := backoff.NewExponentialBackOff()
1405 bo.MaxElapsedTime = 10 * time.Second
1406
1407 backoff.Retry(func() error {
1408 _, err = mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
1409 Node: &apb.UpdateNodeRolesRequest_Id{
1410 Id: id,
1411 },
1412 ConsensusMember: &tr,
1413 })
1414 if err != nil {
1415 launch.Log("Cluster: %s: UpdateNodeRoles failed: %v", id, err)
1416 }
1417 return err
1418 }, backoff.WithContext(bo, ctx))
1419 if err != nil {
1420 return err
1421 }
1422
1423 launch.Log("Cluster: %s: waiting for learner/full members...", id)
1424
1425 learner := false
1426 for {
1427 res, err := cur.GetConsensusStatus(ctx, &ipb.GetConsensusStatusRequest{})
1428 if err != nil {
1429 return fmt.Errorf("GetConsensusStatus: %w", err)
1430 }
1431 for _, member := range res.EtcdMember {
1432 if member.Id != id {
1433 continue
1434 }
1435 switch member.Status {
1436 case ipb.GetConsensusStatusResponse_EtcdMember_STATUS_LEARNER:
1437 if !learner {
1438 learner = true
1439 launch.Log("Cluster: %s: became a learner, waiting for full member...", id)
1440 }
1441 case ipb.GetConsensusStatusResponse_EtcdMember_STATUS_FULL:
1442 launch.Log("Cluster: %s: became a full member", id)
1443 return nil
1444 }
1445 }
1446 time.Sleep(100 * time.Millisecond)
1447 }
1448}