blob: b37f5b57308e1b548a2b526838b9e283fd9c1696 [file] [log] [blame]
Serge Bazanski66e58952021-10-05 17:06:56 +02001// cluster builds on the launch package and implements launching Metropolis
2// nodes and clusters in a virtualized environment using qemu. It's kept in a
3// separate package as it depends on a Metropolis node image, which might not be
4// required for some use of the launch library.
5package cluster
6
7import (
8 "bytes"
9 "context"
Serge Bazanski1f8cad72023-03-20 16:58:10 +010010 "crypto/ed25519"
Serge Bazanski66e58952021-10-05 17:06:56 +020011 "crypto/rand"
12 "crypto/tls"
Serge Bazanski54e212a2023-06-14 13:45:11 +020013 "crypto/x509"
Serge Bazanskia0bc6d32023-06-28 18:57:40 +020014 "encoding/pem"
Serge Bazanski66e58952021-10-05 17:06:56 +020015 "errors"
16 "fmt"
17 "io"
Serge Bazanski66e58952021-10-05 17:06:56 +020018 "net"
Lorenz Brun150f24a2023-07-13 20:11:06 +020019 "net/http"
Serge Bazanski66e58952021-10-05 17:06:56 +020020 "os"
21 "os/exec"
Leopoldacfad5b2023-01-15 14:05:25 +010022 "path"
Serge Bazanski66e58952021-10-05 17:06:56 +020023 "path/filepath"
Serge Bazanski630fb5c2023-04-06 10:50:24 +020024 "strings"
Serge Bazanski66e58952021-10-05 17:06:56 +020025 "syscall"
26 "time"
27
Tim Windelschmidt2a1d1b22024-02-06 07:07:42 +010028 "github.com/bazelbuild/rules_go/go/runfiles"
Serge Bazanski66e58952021-10-05 17:06:56 +020029 "github.com/cenkalti/backoff/v4"
Serge Bazanski66e58952021-10-05 17:06:56 +020030 "go.uber.org/multierr"
Serge Bazanskibe742842022-04-04 13:18:50 +020031 "golang.org/x/net/proxy"
Lorenz Brun87bbf7e2024-03-18 18:22:25 +010032 "golang.org/x/sys/unix"
Serge Bazanski66e58952021-10-05 17:06:56 +020033 "google.golang.org/grpc"
Serge Bazanski636032e2022-01-26 14:21:33 +010034 "google.golang.org/grpc/codes"
35 "google.golang.org/grpc/status"
Serge Bazanski66e58952021-10-05 17:06:56 +020036 "google.golang.org/protobuf/proto"
Serge Bazanskia0bc6d32023-06-28 18:57:40 +020037 "k8s.io/client-go/kubernetes"
38 "k8s.io/client-go/rest"
Serge Bazanski66e58952021-10-05 17:06:56 +020039
Serge Bazanski37cfcc12024-03-21 11:59:07 +010040 ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
Tim Windelschmidtbe25a3b2023-07-19 16:31:56 +020041 apb "source.monogon.dev/metropolis/proto/api"
42 cpb "source.monogon.dev/metropolis/proto/common"
43
Serge Bazanski1f8cad72023-03-20 16:58:10 +010044 metroctl "source.monogon.dev/metropolis/cli/metroctl/core"
Serge Bazanski66e58952021-10-05 17:06:56 +020045 "source.monogon.dev/metropolis/node"
Serge Bazanskie78a0892021-10-07 17:03:49 +020046 "source.monogon.dev/metropolis/node/core/identity"
Serge Bazanski66e58952021-10-05 17:06:56 +020047 "source.monogon.dev/metropolis/node/core/rpc"
Serge Bazanski5bb8a332022-06-23 17:41:33 +020048 "source.monogon.dev/metropolis/node/core/rpc/resolver"
Lorenz Brun150f24a2023-07-13 20:11:06 +020049 "source.monogon.dev/metropolis/pkg/localregistry"
Serge Bazanski66e58952021-10-05 17:06:56 +020050 "source.monogon.dev/metropolis/test/launch"
51)
52
// NodeOptions contains all options that can be passed to LaunchNode().
type NodeOptions struct {
	// Name is a human-readable identifier to be used in debug output.
	Name string

	// Ports contains the port mapping where to expose the internal ports of the VM to
	// the host. See IdentityPortMap() and ConflictFreePortMap(). Ignored when
	// ConnectToSocket is set.
	Ports launch.PortMap

	// If set to true, reboots are honored. Otherwise QEMU is started with
	// -no-reboot, so any reboot ends the QEMU process started by LaunchNode().
	// Metropolis nodes generally restart on almost all errors, so unless you
	// want to test reboot behavior this should be false.
	AllowReboot bool

	// By default, the VM is connected to the Host via SLIRP. If ConnectToSocket is
	// set, it is instead connected to the given file descriptor/socket. If this is
	// set, all port maps from the Ports option are ignored. Intended for networking
	// this instance together with others for running more complex network
	// configurations.
	ConnectToSocket *os.File

	// When PcapDump is set, all traffic is dumped to a pcap file in the
	// runtime directory (e.g. "net0.pcap" for the first interface).
	PcapDump bool

	// SerialPort is an io.ReadWriter over which you can communicate with the serial
	// port of the machine. It can be set to an existing file descriptor (like
	// os.Stdout/os.Stderr) or any Go structure implementing this interface.
	// NOTE(review): LaunchNode() only attaches it as QEMU's standard output.
	SerialPort io.ReadWriter

	// NodeParameters is passed into the VM and subsequently used for bootstrapping or
	// registering into a cluster.
	NodeParameters *apb.NodeParameters

	// Mac is the node's MAC address. If nil, LaunchNode() generates a random
	// one and stores it here.
	Mac *net.HardwareAddr

	// Runtime keeps the node's QEMU runtime state. If nil, LaunchNode() sets it
	// up and stores it here.
	Runtime *NodeRuntime
}
94
// NodeRuntime keeps the node's QEMU runtime state: the directories backing the
// node and the context its QEMU process runs in.
type NodeRuntime struct {
	// ld points at the node's launch directory storing data such as storage
	// images, firmware variables or the TPM state.
	ld string
	// sd points at the node's socket directory.
	sd string

	// ctxT is the context QEMU will execute in. LaunchNode replaces it on every
	// launch.
	ctxT context.Context
	// CtxC is the QEMU context's cancellation function.
	CtxC context.CancelFunc
}
108
109// NodePorts is the list of ports a fully operational Metropolis node listens on
Serge Bazanski52304a82021-10-29 16:56:18 +0200110var NodePorts = []node.Port{
Serge Bazanski66e58952021-10-05 17:06:56 +0200111 node.ConsensusPort,
112
113 node.CuratorServicePort,
114 node.DebugServicePort,
115
116 node.KubernetesAPIPort,
Lorenz Bruncc078df2021-12-23 11:51:55 +0100117 node.KubernetesAPIWrappedPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200118 node.CuratorServicePort,
119 node.DebuggerPort,
Tim Windelschmidtbe25a3b2023-07-19 16:31:56 +0200120 node.MetricsPort,
Serge Bazanski66e58952021-10-05 17:06:56 +0200121}
122
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200123// setupRuntime creates the node's QEMU runtime directory, together with all
124// files required to preserve its state, a level below the chosen path ld. The
125// node's socket directory is similarily created a level below sd. It may
126// return an I/O error.
127func setupRuntime(ld, sd string) (*NodeRuntime, error) {
128 // Create a temporary directory to keep all the runtime files.
129 stdp, err := os.MkdirTemp(ld, "node_state*")
130 if err != nil {
131 return nil, fmt.Errorf("failed to create the state directory: %w", err)
132 }
133
134 // Initialize the node's storage with a prebuilt image.
Tim Windelschmidt2a1d1b22024-02-06 07:07:42 +0100135 si, err := runfiles.Rlocation("_main/metropolis/node/image.img")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200136 if err != nil {
137 return nil, fmt.Errorf("while resolving a path: %w", err)
138 }
139 di := filepath.Join(stdp, filepath.Base(si))
Serge Bazanski05f813b2023-03-16 17:58:39 +0100140 launch.Log("Cluster: copying node image: %s -> %s", si, di)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200141 if err := copyFile(si, di); err != nil {
142 return nil, fmt.Errorf("while copying the node image: %w", err)
143 }
144
145 // Initialize the OVMF firmware variables file.
Tim Windelschmidt2a1d1b22024-02-06 07:07:42 +0100146 sv, err := runfiles.Rlocation("edk2/OVMF_VARS.fd")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200147 if err != nil {
148 return nil, fmt.Errorf("while resolving a path: %w", err)
149 }
150 dv := filepath.Join(stdp, filepath.Base(sv))
151 if err := copyFile(sv, dv); err != nil {
152 return nil, fmt.Errorf("while copying firmware variables: %w", err)
153 }
154
155 // Create the TPM state directory and initialize all files required by swtpm.
156 tpmt := filepath.Join(stdp, "tpm")
Lorenz Brun150f24a2023-07-13 20:11:06 +0200157 if err := os.Mkdir(tpmt, 0o755); err != nil {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200158 return nil, fmt.Errorf("while creating the TPM directory: %w", err)
159 }
Tim Windelschmidt2a1d1b22024-02-06 07:07:42 +0100160 tpms, err := runfiles.Rlocation("_main/metropolis/node/tpm")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200161 if err != nil {
162 return nil, fmt.Errorf("while resolving a path: %w", err)
163 }
164 tpmf, err := os.ReadDir(tpms)
165 if err != nil {
166 return nil, fmt.Errorf("failed to read TPM directory: %w", err)
167 }
168 for _, file := range tpmf {
169 name := file.Name()
Tim Windelschmidt2a1d1b22024-02-06 07:07:42 +0100170 src, err := runfiles.Rlocation(filepath.Join(tpms, name))
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200171 if err != nil {
172 return nil, fmt.Errorf("while resolving a path: %w", err)
173 }
174 tgt := filepath.Join(tpmt, name)
175 if err := copyFile(src, tgt); err != nil {
176 return nil, fmt.Errorf("while copying TPM state: file %q to %q: %w", src, tgt, err)
177 }
178 }
179
180 // Create the socket directory.
181 sotdp, err := os.MkdirTemp(sd, "node_sock*")
182 if err != nil {
183 return nil, fmt.Errorf("failed to create the socket directory: %w", err)
184 }
185
186 return &NodeRuntime{
187 ld: stdp,
188 sd: sotdp,
189 }, nil
190}
191
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200192// CuratorClient returns an authenticated owner connection to a Curator
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200193// instance within Cluster c, or nil together with an error.
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200194func (c *Cluster) CuratorClient() (*grpc.ClientConn, error) {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200195 if c.authClient == nil {
Serge Bazanski8535cb52023-03-29 14:15:08 +0200196 authCreds := rpc.NewAuthenticatedCredentials(c.Owner, rpc.WantInsecure())
Serge Bazanski58ddc092022-06-30 18:23:33 +0200197 r := resolver.New(c.ctxT, resolver.WithLogger(func(f string, args ...interface{}) {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100198 launch.Log("Cluster: client resolver: %s", fmt.Sprintf(f, args...))
Serge Bazanski58ddc092022-06-30 18:23:33 +0200199 }))
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200200 for _, n := range c.NodeIDs {
201 ep, err := resolver.NodeWithDefaultPort(n)
202 if err != nil {
203 return nil, fmt.Errorf("could not add node %q by DNS: %v", n, err)
204 }
205 r.AddEndpoint(ep)
206 }
207 authClient, err := grpc.Dial(resolver.MetropolisControlAddress,
208 grpc.WithTransportCredentials(authCreds),
209 grpc.WithResolvers(r),
210 grpc.WithContextDialer(c.DialNode),
211 )
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200212 if err != nil {
213 return nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
214 }
215 c.authClient = authClient
216 }
217 return c.authClient, nil
218}
219
Serge Bazanski66e58952021-10-05 17:06:56 +0200220// LaunchNode launches a single Metropolis node instance with the given options.
221// The instance runs mostly paravirtualized but with some emulated hardware
222// similar to how a cloud provider might set up its VMs. The disk is fully
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200223// writable, and the changes are kept across reboots and shutdowns. ld and sd
224// point to the launch directory and the socket directory, holding the nodes'
225// state files (storage, tpm state, firmware state), and UNIX socket files
226// (swtpm <-> QEMU interplay) respectively. The directories must exist before
227// LaunchNode is called. LaunchNode will update options.Runtime and options.Mac
228// if either are not initialized.
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200229func LaunchNode(ctx context.Context, ld, sd string, options *NodeOptions, doneC chan error) error {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200230 // TODO(mateusz@monogon.tech) try using QEMU's abstract socket namespace instead
231 // of /tmp (requires QEMU version >5.0).
Serge Bazanski66e58952021-10-05 17:06:56 +0200232 // https://github.com/qemu/qemu/commit/776b97d3605ed0fc94443048fdf988c7725e38a9).
233 // swtpm accepts already-open FDs so we can pass in an abstract socket namespace FD
234 // that we open and pass the name of it to QEMU. Not pinning this crashes both
235 // swtpm and qemu because we run into UNIX socket length limitations (for legacy
236 // reasons 108 chars).
Serge Bazanski66e58952021-10-05 17:06:56 +0200237
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200238 // If it's the node's first start, set up its runtime directories.
239 if options.Runtime == nil {
240 r, err := setupRuntime(ld, sd)
241 if err != nil {
242 return fmt.Errorf("while setting up node runtime: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200243 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200244 options.Runtime = r
Serge Bazanski66e58952021-10-05 17:06:56 +0200245 }
246
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200247 // Replace the node's context with a new one.
248 r := options.Runtime
249 if r.CtxC != nil {
250 r.CtxC()
251 }
252 r.ctxT, r.CtxC = context.WithCancel(ctx)
253
Serge Bazanski66e58952021-10-05 17:06:56 +0200254 var qemuNetType string
255 var qemuNetConfig launch.QemuValue
256 if options.ConnectToSocket != nil {
257 qemuNetType = "socket"
258 qemuNetConfig = launch.QemuValue{
259 "id": {"net0"},
260 "fd": {"3"},
261 }
262 } else {
263 qemuNetType = "user"
264 qemuNetConfig = launch.QemuValue{
265 "id": {"net0"},
266 "net": {"10.42.0.0/24"},
267 "dhcpstart": {"10.42.0.10"},
268 "hostfwd": options.Ports.ToQemuForwards(),
269 }
270 }
271
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200272 // Generate the node's MAC address if it isn't already set in NodeOptions.
273 if options.Mac == nil {
274 mac, err := generateRandomEthernetMAC()
275 if err != nil {
276 return err
277 }
278 options.Mac = mac
Serge Bazanski66e58952021-10-05 17:06:56 +0200279 }
280
Tim Windelschmidt244b5672024-02-06 10:18:56 +0100281 ovmfCodePath, err := runfiles.Rlocation("edk2/OVMF_CODE.fd")
282 if err != nil {
283 return err
284 }
285
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200286 tpmSocketPath := filepath.Join(r.sd, "tpm-socket")
287 fwVarPath := filepath.Join(r.ld, "OVMF_VARS.fd")
Lorenz Brun1dc60af2023-10-03 15:40:09 +0200288 storagePath := filepath.Join(r.ld, "image.img")
Lorenz Brun150f24a2023-07-13 20:11:06 +0200289 qemuArgs := []string{
Serge Bazanski99b02142024-04-17 16:33:28 +0200290 "-machine", "q35", "-accel", "kvm", "-nographic", "-nodefaults", "-m", "2048",
Serge Bazanski66e58952021-10-05 17:06:56 +0200291 "-cpu", "host", "-smp", "sockets=1,cpus=1,cores=2,threads=2,maxcpus=4",
Tim Windelschmidt244b5672024-02-06 10:18:56 +0100292 "-drive", "if=pflash,format=raw,readonly=on,file=" + ovmfCodePath,
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200293 "-drive", "if=pflash,format=raw,file=" + fwVarPath,
294 "-drive", "if=virtio,format=raw,cache=unsafe,file=" + storagePath,
Serge Bazanski66e58952021-10-05 17:06:56 +0200295 "-netdev", qemuNetConfig.ToOption(qemuNetType),
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200296 "-device", "virtio-net-pci,netdev=net0,mac=" + options.Mac.String(),
Serge Bazanski66e58952021-10-05 17:06:56 +0200297 "-chardev", "socket,id=chrtpm,path=" + tpmSocketPath,
298 "-tpmdev", "emulator,id=tpm0,chardev=chrtpm",
299 "-device", "tpm-tis,tpmdev=tpm0",
300 "-device", "virtio-rng-pci",
Lorenz Brun150f24a2023-07-13 20:11:06 +0200301 "-serial", "stdio",
302 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200303
304 if !options.AllowReboot {
305 qemuArgs = append(qemuArgs, "-no-reboot")
306 }
307
308 if options.NodeParameters != nil {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200309 parametersPath := filepath.Join(r.ld, "parameters.pb")
Serge Bazanski66e58952021-10-05 17:06:56 +0200310 parametersRaw, err := proto.Marshal(options.NodeParameters)
311 if err != nil {
312 return fmt.Errorf("failed to encode node paraeters: %w", err)
313 }
Lorenz Brun150f24a2023-07-13 20:11:06 +0200314 if err := os.WriteFile(parametersPath, parametersRaw, 0o644); err != nil {
Serge Bazanski66e58952021-10-05 17:06:56 +0200315 return fmt.Errorf("failed to write node parameters: %w", err)
316 }
317 qemuArgs = append(qemuArgs, "-fw_cfg", "name=dev.monogon.metropolis/parameters.pb,file="+parametersPath)
318 }
319
Leopoldacfad5b2023-01-15 14:05:25 +0100320 if options.PcapDump {
Tim Windelschmidta7a82f32024-04-11 01:40:25 +0200321 qemuNetDump := launch.QemuValue{
322 "id": {"net0"},
323 "netdev": {"net0"},
324 "file": {filepath.Join(r.ld, "net0.pcap")},
Leopoldacfad5b2023-01-15 14:05:25 +0100325 }
326 qemuArgs = append(qemuArgs, "-object", qemuNetDump.ToOption("filter-dump"))
327 }
328
Serge Bazanski66e58952021-10-05 17:06:56 +0200329 // Start TPM emulator as a subprocess
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200330 tpmCtx, tpmCancel := context.WithCancel(options.Runtime.ctxT)
Serge Bazanski66e58952021-10-05 17:06:56 +0200331
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200332 tpmd := filepath.Join(r.ld, "tpm")
333 tpmEmuCmd := exec.CommandContext(tpmCtx, "swtpm", "socket", "--tpm2", "--tpmstate", "dir="+tpmd, "--ctrl", "type=unixio,path="+tpmSocketPath)
Serge Bazanski66e58952021-10-05 17:06:56 +0200334 tpmEmuCmd.Stderr = os.Stderr
335 tpmEmuCmd.Stdout = os.Stdout
336
Tim Windelschmidt244b5672024-02-06 10:18:56 +0100337 err = tpmEmuCmd.Start()
Serge Bazanski66e58952021-10-05 17:06:56 +0200338 if err != nil {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200339 tpmCancel()
Serge Bazanski66e58952021-10-05 17:06:56 +0200340 return fmt.Errorf("failed to start TPM emulator: %w", err)
341 }
342
Mateusz Zalegae90f4a12022-05-25 18:24:01 +0200343 // Wait for the socket to be created by the TPM emulator before launching
344 // QEMU.
345 for {
346 _, err := os.Stat(tpmSocketPath)
347 if err == nil {
348 break
349 }
Tim Windelschmidta7a82f32024-04-11 01:40:25 +0200350 if !os.IsNotExist(err) {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200351 tpmCancel()
Mateusz Zalegae90f4a12022-05-25 18:24:01 +0200352 return fmt.Errorf("while stat-ing TPM socket path: %w", err)
353 }
354 if err := tpmCtx.Err(); err != nil {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200355 tpmCancel()
Mateusz Zalegae90f4a12022-05-25 18:24:01 +0200356 return fmt.Errorf("while waiting for the TPM socket: %w", err)
357 }
358 time.Sleep(time.Millisecond * 100)
359 }
360
Serge Bazanski66e58952021-10-05 17:06:56 +0200361 // Start the main qemu binary
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200362 systemCmd := exec.CommandContext(options.Runtime.ctxT, "qemu-system-x86_64", qemuArgs...)
Serge Bazanski66e58952021-10-05 17:06:56 +0200363 if options.ConnectToSocket != nil {
364 systemCmd.ExtraFiles = []*os.File{options.ConnectToSocket}
365 }
366
367 var stdErrBuf bytes.Buffer
368 systemCmd.Stderr = &stdErrBuf
369 systemCmd.Stdout = options.SerialPort
370
Leopoldaf5086b2023-01-15 14:12:42 +0100371 launch.PrettyPrintQemuArgs(options.Name, systemCmd.Args)
372
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200373 go func() {
374 launch.Log("Node: Starting...")
375 err = systemCmd.Run()
376 launch.Log("Node: Returned: %v", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200377
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200378 // Stop TPM emulator and wait for it to exit to properly reap the child process
379 tpmCancel()
380 launch.Log("Node: Waiting for TPM emulator to exit")
381 // Wait returns a SIGKILL error because we just cancelled its context.
382 // We still need to call it to avoid creating zombies.
383 errTpm := tpmEmuCmd.Wait()
384 launch.Log("Node: TPM emulator done: %v", errTpm)
Serge Bazanski66e58952021-10-05 17:06:56 +0200385
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200386 var exerr *exec.ExitError
387 if err != nil && errors.As(err, &exerr) {
388 status := exerr.ProcessState.Sys().(syscall.WaitStatus)
389 if status.Signaled() && status.Signal() == syscall.SIGKILL {
390 // Process was killed externally (most likely by our context being canceled).
391 // This is a normal exit for us, so return nil
392 doneC <- nil
393 return
394 }
395 exerr.Stderr = stdErrBuf.Bytes()
396 newErr := launch.QEMUError(*exerr)
397 launch.Log("Node: %q", stdErrBuf.String())
398 doneC <- &newErr
399 return
Serge Bazanski66e58952021-10-05 17:06:56 +0200400 }
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200401 doneC <- err
402 }()
403 return nil
Serge Bazanski66e58952021-10-05 17:06:56 +0200404}
405
406func copyFile(src, dst string) error {
407 in, err := os.Open(src)
408 if err != nil {
409 return fmt.Errorf("when opening source: %w", err)
410 }
411 defer in.Close()
412
413 out, err := os.Create(dst)
414 if err != nil {
415 return fmt.Errorf("when creating destination: %w", err)
416 }
417 defer out.Close()
418
Lorenz Brun87bbf7e2024-03-18 18:22:25 +0100419 endPos, err := in.Seek(0, io.SeekEnd)
Serge Bazanski66e58952021-10-05 17:06:56 +0200420 if err != nil {
Lorenz Brun87bbf7e2024-03-18 18:22:25 +0100421 return fmt.Errorf("when getting source end: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200422 }
Lorenz Brun87bbf7e2024-03-18 18:22:25 +0100423
424 // Copy the file while preserving its sparseness. The image files are very
425 // sparse (less than 10% allocated), so this is a lot faster.
426 var lastHoleStart int64
427 for {
428 dataStart, err := in.Seek(lastHoleStart, unix.SEEK_DATA)
429 if err != nil {
430 return fmt.Errorf("when seeking to next data block: %w", err)
431 }
432 holeStart, err := in.Seek(dataStart, unix.SEEK_HOLE)
433 if err != nil {
434 return fmt.Errorf("when seeking to next hole: %w", err)
435 }
436 lastHoleStart = holeStart
437 if _, err := in.Seek(dataStart, io.SeekStart); err != nil {
438 return fmt.Errorf("when seeking to current data block: %w", err)
439 }
440 if _, err := out.Seek(dataStart, io.SeekStart); err != nil {
441 return fmt.Errorf("when seeking output to next data block: %w", err)
442 }
443 if _, err := io.CopyN(out, in, holeStart-dataStart); err != nil {
444 return fmt.Errorf("when copying file: %w", err)
445 }
446 if endPos == holeStart {
447 // The next hole is at the end of the file, we're done here.
448 break
449 }
450 }
451
Serge Bazanski66e58952021-10-05 17:06:56 +0200452 return out.Close()
453}
454
Serge Bazanskie78a0892021-10-07 17:03:49 +0200455// getNodes wraps around Management.GetNodes to return a list of nodes in a
456// cluster.
457func getNodes(ctx context.Context, mgmt apb.ManagementClient) ([]*apb.Node, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200458 var res []*apb.Node
Serge Bazanski636032e2022-01-26 14:21:33 +0100459 bo := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
Serge Bazanski075465c2021-11-16 15:38:49 +0100460 err := backoff.Retry(func() error {
461 res = nil
462 srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
Serge Bazanskie78a0892021-10-07 17:03:49 +0200463 if err != nil {
Serge Bazanski075465c2021-11-16 15:38:49 +0100464 return fmt.Errorf("GetNodes: %w", err)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200465 }
Serge Bazanski075465c2021-11-16 15:38:49 +0100466 for {
467 node, err := srvN.Recv()
468 if err == io.EOF {
469 break
470 }
471 if err != nil {
472 return fmt.Errorf("GetNodes.Recv: %w", err)
473 }
474 res = append(res, node)
475 }
476 return nil
477 }, bo)
478 if err != nil {
479 return nil, err
Serge Bazanskie78a0892021-10-07 17:03:49 +0200480 }
481 return res, nil
482}
483
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200484// getNode wraps Management.GetNodes. It returns node information matching
485// given node ID.
486func getNode(ctx context.Context, mgmt apb.ManagementClient, id string) (*apb.Node, error) {
487 nodes, err := getNodes(ctx, mgmt)
488 if err != nil {
489 return nil, fmt.Errorf("could not get nodes: %w", err)
490 }
491 for _, n := range nodes {
492 eid := identity.NodeID(n.Pubkey)
493 if eid != id {
494 continue
495 }
496 return n, nil
497 }
Tim Windelschmidt73e98822024-04-18 23:13:49 +0200498 return nil, fmt.Errorf("no such node")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200499}
500
// generateRandomEthernetMAC returns a random EUI-48 Ethernet MAC address which
// is marked as locally administered and individual (unicast).
func generateRandomEthernetMAC() (*net.HardwareAddr, error) {
	mac := make(net.HardwareAddr, 6)
	if _, err := rand.Read(mac); err != nil {
		return nil, fmt.Errorf("failed to read randomness for MAC: %v", err)
	}

	// Ref IEEE 802-2014 Section 8.2.2: set the U/L bit (locally administered)
	// and clear the I/G bit (individual address) of the first octet.
	mac[0] |= 0x02
	mac[0] &^= 0x01
	return &mac, nil
}
515
// SOCKSPort is the port of the SOCKS proxy to the switch network. It is one of
// the ports listed in ClusterPorts.
const SOCKSPort uint16 = 1080

// ClusterPorts contains all ports handled by Nanoswitch.
var ClusterPorts = []uint16{
	// Forwarded to the first node.
	uint16(node.CuratorServicePort),
	uint16(node.DebugServicePort),
	uint16(node.KubernetesAPIPort),
	uint16(node.KubernetesAPIWrappedPort),

	// SOCKS proxy to the switch network
	SOCKSPort,
}
529
// ClusterOptions contains all options for launching a Metropolis cluster.
type ClusterOptions struct {
	// The number of nodes this cluster should be started with.
	NumNodes int

	// If true, node logs will be saved to individual files instead of being printed
	// out to stderr. The path of these files will be still printed to stdout.
	//
	// The files will be located within the launch directory inside TEST_TMPDIR (or
	// the default tempdir location, if not set).
	NodeLogsToFiles bool

	// LeaveNodesNew, if set, will leave all non-bootstrap nodes in NEW, without
	// bootstrapping them. The nodes' address information in Cluster.Nodes will be
	// incomplete.
	LeaveNodesNew bool

	// Optional local registry which will be made available to the cluster to
	// pull images from. This is a more efficient alternative to preseeding all
	// images used for testing.
	LocalRegistry *localregistry.Server

	// InitialClusterConfiguration will be passed to the first node when creating the
	// cluster, and defines some basic properties of the cluster. If not specified,
	// the cluster will use the defaults as defined in
	// metropolis.proto.api.NodeParameters.
	InitialClusterConfiguration *cpb.ClusterConfiguration
}
558
// Cluster is the running Metropolis cluster launched using the LaunchCluster
// function.
type Cluster struct {
	// Owner is the TLS Certificate of the owner of the test cluster. This can be
	// used to authenticate further clients to the running cluster.
	Owner tls.Certificate
	// Ports is the PortMap used to access the first nodes' services (defined in
	// ClusterPorts) and the SOCKS proxy (at SOCKSPort).
	Ports launch.PortMap

	// Nodes is a map from Node ID to its runtime information.
	Nodes map[string]*NodeInCluster
	// NodeIDs is a list of node IDs that are backing this cluster, in order of
	// creation.
	NodeIDs []string

	// CACertificate is the cluster's CA certificate.
	CACertificate *x509.Certificate

	// nodesDone is a list of channels populated with the return codes from all the
	// nodes' qemu instances. It's used by Close to ensure all nodes have
	// successfully been stopped.
	nodesDone []chan error
	// nodeOpts are the cluster member nodes' mutable launch options, kept here
	// to facilitate reboots.
	nodeOpts []NodeOptions
	// launchDir points at the directory keeping the nodes' state, such as storage
	// images, firmware variable files, TPM state.
	launchDir string
	// socketDir points at the directory keeping UNIX socket files, such as these
	// used to facilitate communication between QEMU and swtpm. It's different
	// from launchDir, and anchored nearer the file system root, due to the
	// socket path length limitation imposed by the kernel.
	socketDir string
	// NOTE(review): metroctlDir is presumably a directory holding metroctl
	// data for this cluster; its use is not visible here - confirm.
	metroctlDir string

	// SOCKSDialer is used by DialNode to establish connections to nodes via the
	// SOCKS server ran by nanoswitch.
	SOCKSDialer proxy.Dialer

	// authClient is a cached authenticated owner connection to a Curator
	// instance within the cluster. It is set on the first call to CuratorClient.
	authClient *grpc.ClientConn

	// ctxT is the context individual node contexts are created from.
	ctxT context.Context
	// ctxC is used by Close to cancel the context under which the nodes are
	// running.
	ctxC context.CancelFunc
}
609
// NodeInCluster represents information about a node that's part of a Cluster.
type NodeInCluster struct {
	// ID of the node, which can be used to dial this node's services via DialNode.
	ID string
	// NOTE(review): Pubkey is presumably the node's public key, i.e. the value
	// the node ID is derived from via identity.NodeID - confirm.
	Pubkey []byte
	// Address of the node on the network ran by nanoswitch. Not reachable from the
	// host unless dialed via DialNode or via the nanoswitch SOCKS proxy (reachable
	// on Cluster.Ports[SOCKSPort]).
	ManagementAddress string
}
620
621// firstConnection performs the initial owner credential escrow with a newly
622// started nanoswitch-backed cluster over SOCKS. It expects the first node to be
623// running at 10.1.0.2, which is always the case with the current nanoswitch
624// implementation.
625//
Leopold20a036e2023-01-15 00:17:19 +0100626// It returns the newly escrowed credentials as well as the first node's
Serge Bazanskibe742842022-04-04 13:18:50 +0200627// information as NodeInCluster.
628func firstConnection(ctx context.Context, socksDialer proxy.Dialer) (*tls.Certificate, *NodeInCluster, error) {
629 // Dial external service.
630 remote := fmt.Sprintf("10.1.0.2:%s", node.CuratorServicePort.PortString())
Serge Bazanski0c280152024-02-05 14:33:19 +0100631 initCreds, err := rpc.NewEphemeralCredentials(InsecurePrivateKey, rpc.WantInsecure())
Serge Bazanskibe742842022-04-04 13:18:50 +0200632 if err != nil {
633 return nil, nil, fmt.Errorf("NewEphemeralCredentials: %w", err)
634 }
635 initDialer := func(_ context.Context, addr string) (net.Conn, error) {
636 return socksDialer.Dial("tcp", addr)
637 }
638 initClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(initCreds))
639 if err != nil {
640 return nil, nil, fmt.Errorf("dialing with ephemeral credentials failed: %w", err)
641 }
642 defer initClient.Close()
643
644 // Retrieve owner certificate - this can take a while because the node is still
645 // coming up, so do it in a backoff loop.
Serge Bazanski05f813b2023-03-16 17:58:39 +0100646 launch.Log("Cluster: retrieving owner certificate (this can take a few seconds while the first node boots)...")
Serge Bazanskibe742842022-04-04 13:18:50 +0200647 aaa := apb.NewAAAClient(initClient)
648 var cert *tls.Certificate
649 err = backoff.Retry(func() error {
650 cert, err = rpc.RetrieveOwnerCertificate(ctx, aaa, InsecurePrivateKey)
651 if st, ok := status.FromError(err); ok {
652 if st.Code() == codes.Unavailable {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100653 launch.Log("Cluster: cluster UNAVAILABLE: %v", st.Message())
Serge Bazanskibe742842022-04-04 13:18:50 +0200654 return err
655 }
656 }
657 return backoff.Permanent(err)
658 }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
659 if err != nil {
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200660 return nil, nil, fmt.Errorf("couldn't retrieve owner certificate: %w", err)
Serge Bazanskibe742842022-04-04 13:18:50 +0200661 }
Serge Bazanski05f813b2023-03-16 17:58:39 +0100662 launch.Log("Cluster: retrieved owner certificate.")
Serge Bazanskibe742842022-04-04 13:18:50 +0200663
664 // Now connect authenticated and get the node ID.
Serge Bazanski8535cb52023-03-29 14:15:08 +0200665 creds := rpc.NewAuthenticatedCredentials(*cert, rpc.WantInsecure())
Serge Bazanskibe742842022-04-04 13:18:50 +0200666 authClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(creds))
667 if err != nil {
668 return nil, nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
669 }
670 defer authClient.Close()
671 mgmt := apb.NewManagementClient(authClient)
672
673 var node *NodeInCluster
674 err = backoff.Retry(func() error {
675 nodes, err := getNodes(ctx, mgmt)
676 if err != nil {
677 return fmt.Errorf("retrieving nodes failed: %w", err)
678 }
679 if len(nodes) != 1 {
680 return fmt.Errorf("expected one node, got %d", len(nodes))
681 }
682 n := nodes[0]
683 if n.Status == nil || n.Status.ExternalAddress == "" {
684 return fmt.Errorf("node has no status and/or address")
685 }
686 node = &NodeInCluster{
687 ID: identity.NodeID(n.Pubkey),
688 ManagementAddress: n.Status.ExternalAddress,
689 }
690 return nil
691 }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
692 if err != nil {
693 return nil, nil, err
694 }
695
696 return cert, node, nil
Serge Bazanski66e58952021-10-05 17:06:56 +0200697}
698
// NewSerialFileLogger opens (creating it with mode 0600 if necessary) the file
// at path p for use as a serial port log sink.
//
// Any previous content of the file is discarded, so that output written by a
// new run is never followed by stale bytes left over from an earlier, longer
// log. The file is opened write-only: although the result is typed as an
// io.ReadWriter, reads from it will fail.
func NewSerialFileLogger(p string) (io.ReadWriter, error) {
	f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o600)
	if err != nil {
		return nil, err
	}
	return f, nil
}
706
Serge Bazanski66e58952021-10-05 17:06:56 +0200707// LaunchCluster launches a cluster of Metropolis node VMs together with a
708// Nanoswitch instance to network them all together.
709//
710// The given context will be used to run all qemu instances in the cluster, and
711// canceling the context or calling Close() will terminate them.
712func LaunchCluster(ctx context.Context, opts ClusterOptions) (*Cluster, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200713 if opts.NumNodes <= 0 {
Serge Bazanski66e58952021-10-05 17:06:56 +0200714 return nil, errors.New("refusing to start cluster with zero nodes")
715 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200716
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200717 // Create the launch directory.
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100718 ld, err := os.MkdirTemp(os.Getenv("TEST_TMPDIR"), "cluster-*")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200719 if err != nil {
720 return nil, fmt.Errorf("failed to create the launch directory: %w", err)
721 }
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100722 // Create the metroctl config directory. We keep it in /tmp because in some
723 // scenarios it's end-user visible and we want it short.
724 md, err := os.MkdirTemp("/tmp", "metroctl-*")
725 if err != nil {
726 return nil, fmt.Errorf("failed to create the metroctl directory: %w", err)
727 }
728
729 // Create the socket directory. We keep it in /tmp because of socket path limits.
730 sd, err := os.MkdirTemp("/tmp", "cluster-*")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200731 if err != nil {
732 return nil, fmt.Errorf("failed to create the socket directory: %w", err)
733 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200734
735 // Prepare links between nodes and nanoswitch.
736 var switchPorts []*os.File
737 var vmPorts []*os.File
738 for i := 0; i < opts.NumNodes; i++ {
739 switchPort, vmPort, err := launch.NewSocketPair()
740 if err != nil {
Serge Bazanski66e58952021-10-05 17:06:56 +0200741 return nil, fmt.Errorf("failed to get socketpair: %w", err)
742 }
743 switchPorts = append(switchPorts, switchPort)
744 vmPorts = append(vmPorts, vmPort)
745 }
746
Serge Bazanskie78a0892021-10-07 17:03:49 +0200747 // Make a list of channels that will be populated by all running node qemu
748 // processes.
Serge Bazanski66e58952021-10-05 17:06:56 +0200749 done := make([]chan error, opts.NumNodes)
Lorenz Brun150f24a2023-07-13 20:11:06 +0200750 for i := range done {
Serge Bazanski66e58952021-10-05 17:06:56 +0200751 done[i] = make(chan error, 1)
752 }
753
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200754 // Prepare the node options. These will be kept as part of Cluster.
755 // nodeOpts[].Runtime will be initialized by LaunchNode during the first
756 // launch. The runtime information can be later used to restart a node.
757 // The 0th node will be initialized first. The rest will follow after it
758 // had bootstrapped the cluster.
759 nodeOpts := make([]NodeOptions, opts.NumNodes)
760 nodeOpts[0] = NodeOptions{
Leopoldaf5086b2023-01-15 14:12:42 +0100761 Name: "node0",
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200762 ConnectToSocket: vmPorts[0],
763 NodeParameters: &apb.NodeParameters{
764 Cluster: &apb.NodeParameters_ClusterBootstrap_{
765 ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
Serge Bazanskie564f172024-04-03 12:06:06 +0200766 OwnerPublicKey: InsecurePublicKey,
767 InitialClusterConfiguration: opts.InitialClusterConfiguration,
Serge Bazanski66e58952021-10-05 17:06:56 +0200768 },
769 },
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200770 },
771 SerialPort: newPrefixedStdio(0),
Leopoldacfad5b2023-01-15 14:05:25 +0100772 PcapDump: true,
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200773 }
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100774 if opts.NodeLogsToFiles {
775 path := path.Join(ld, "node-1.txt")
776 port, err := NewSerialFileLogger(path)
777 if err != nil {
778 return nil, fmt.Errorf("could not open log file for node 1: %w", err)
779 }
780 launch.Log("Node 1 logs at %s", path)
781 nodeOpts[0].SerialPort = port
782 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200783
784 // Start the first node.
785 ctxT, ctxC := context.WithCancel(ctx)
Serge Bazanski05f813b2023-03-16 17:58:39 +0100786 launch.Log("Cluster: Starting node %d...", 1)
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200787 if err := LaunchNode(ctxT, ld, sd, &nodeOpts[0], done[0]); err != nil {
788 ctxC()
789 return nil, fmt.Errorf("failed to launch first node: %w", err)
790 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200791
Lorenz Brun150f24a2023-07-13 20:11:06 +0200792 localRegistryAddr := net.TCPAddr{
793 IP: net.IPv4(10, 42, 0, 82),
794 Port: 5000,
795 }
796
797 var guestSvcMap launch.GuestServiceMap
798 if opts.LocalRegistry != nil {
799 l, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1)})
800 if err != nil {
801 ctxC()
802 return nil, fmt.Errorf("failed to create TCP listener for local registry: %w", err)
803 }
804 s := http.Server{
805 Handler: opts.LocalRegistry,
806 }
807 go s.Serve(l)
808 go func() {
809 <-ctxT.Done()
810 s.Close()
811 }()
812 guestSvcMap = launch.GuestServiceMap{
813 &localRegistryAddr: *l.Addr().(*net.TCPAddr),
814 }
815 }
816
Serge Bazanskie78a0892021-10-07 17:03:49 +0200817 // Launch nanoswitch.
Serge Bazanski66e58952021-10-05 17:06:56 +0200818 portMap, err := launch.ConflictFreePortMap(ClusterPorts)
819 if err != nil {
820 ctxC()
821 return nil, fmt.Errorf("failed to allocate ephemeral ports: %w", err)
822 }
823
824 go func() {
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100825 var serialPort io.ReadWriter
826 if opts.NodeLogsToFiles {
827 path := path.Join(ld, "nanoswitch.txt")
828 serialPort, err = NewSerialFileLogger(path)
829 if err != nil {
830 launch.Log("Could not open log file for nanoswitch: %v", err)
831 }
832 launch.Log("Nanoswitch logs at %s", path)
833 } else {
834 serialPort = newPrefixedStdio(99)
835 }
Serge Bazanskie84726b2024-04-17 16:32:32 +0200836 kernelPath, err := runfiles.Rlocation("_main/metropolis/test/ktest/vmlinux")
837 if err != nil {
838 launch.Fatal("Failed to resolved nanoswitch kernel: %v", err)
839 }
840 initramfsPath, err := runfiles.Rlocation("_main/metropolis/test/nanoswitch/initramfs.cpio.zst")
841 if err != nil {
842 launch.Fatal("Failed to resolved nanoswitch initramfs: %v", err)
843 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200844 if err := launch.RunMicroVM(ctxT, &launch.MicroVMOptions{
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100845 Name: "nanoswitch",
Serge Bazanskie84726b2024-04-17 16:32:32 +0200846 KernelPath: kernelPath,
847 InitramfsPath: initramfsPath,
Serge Bazanski66e58952021-10-05 17:06:56 +0200848 ExtraNetworkInterfaces: switchPorts,
849 PortMap: portMap,
Lorenz Brun150f24a2023-07-13 20:11:06 +0200850 GuestServiceMap: guestSvcMap,
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100851 SerialPort: serialPort,
Leopoldacfad5b2023-01-15 14:05:25 +0100852 PcapDump: path.Join(ld, "nanoswitch.pcap"),
Serge Bazanski66e58952021-10-05 17:06:56 +0200853 }); err != nil {
854 if !errors.Is(err, ctxT.Err()) {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100855 launch.Fatal("Failed to launch nanoswitch: %v", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200856 }
857 }
858 }()
859
Serge Bazanskibe742842022-04-04 13:18:50 +0200860 // Build SOCKS dialer.
861 socksRemote := fmt.Sprintf("localhost:%v", portMap[SOCKSPort])
862 socksDialer, err := proxy.SOCKS5("tcp", socksRemote, nil, proxy.Direct)
Serge Bazanski66e58952021-10-05 17:06:56 +0200863 if err != nil {
864 ctxC()
Serge Bazanskibe742842022-04-04 13:18:50 +0200865 return nil, fmt.Errorf("failed to build SOCKS dialer: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200866 }
867
Serge Bazanskibe742842022-04-04 13:18:50 +0200868 // Retrieve owner credentials and first node.
869 cert, firstNode, err := firstConnection(ctxT, socksDialer)
Serge Bazanski66e58952021-10-05 17:06:56 +0200870 if err != nil {
871 ctxC()
872 return nil, err
873 }
Serge Bazanski66e58952021-10-05 17:06:56 +0200874
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100875 // Write credentials to the metroctl directory.
876 if err := metroctl.WriteOwnerKey(md, cert.PrivateKey.(ed25519.PrivateKey)); err != nil {
877 ctxC()
878 return nil, fmt.Errorf("could not write owner key: %w", err)
879 }
880 if err := metroctl.WriteOwnerCertificate(md, cert.Certificate[0]); err != nil {
881 ctxC()
882 return nil, fmt.Errorf("could not write owner certificate: %w", err)
883 }
884
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200885 // Set up a partially initialized cluster instance, to be filled in in the
886 // later steps.
Serge Bazanskibe742842022-04-04 13:18:50 +0200887 cluster := &Cluster{
888 Owner: *cert,
889 Ports: portMap,
890 Nodes: map[string]*NodeInCluster{
891 firstNode.ID: firstNode,
892 },
893 NodeIDs: []string{
894 firstNode.ID,
895 },
896
Serge Bazanski1f8cad72023-03-20 16:58:10 +0100897 nodesDone: done,
898 nodeOpts: nodeOpts,
899 launchDir: ld,
900 socketDir: sd,
901 metroctlDir: md,
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200902
Lorenz Brun276a7462023-07-12 21:28:54 +0200903 SOCKSDialer: socksDialer,
Serge Bazanskibe742842022-04-04 13:18:50 +0200904
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200905 ctxT: ctxT,
Serge Bazanskibe742842022-04-04 13:18:50 +0200906 ctxC: ctxC,
907 }
908
909 // Now start the rest of the nodes and register them into the cluster.
910
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200911 // Get an authenticated owner client within the cluster.
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200912 curC, err := cluster.CuratorClient()
Serge Bazanski66e58952021-10-05 17:06:56 +0200913 if err != nil {
914 ctxC()
Serge Bazanski5bb8a332022-06-23 17:41:33 +0200915 return nil, fmt.Errorf("CuratorClient: %w", err)
Serge Bazanski66e58952021-10-05 17:06:56 +0200916 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200917 mgmt := apb.NewManagementClient(curC)
Serge Bazanskie78a0892021-10-07 17:03:49 +0200918
919 // Retrieve register ticket to register further nodes.
Serge Bazanski05f813b2023-03-16 17:58:39 +0100920 launch.Log("Cluster: retrieving register ticket...")
Serge Bazanskie78a0892021-10-07 17:03:49 +0200921 resT, err := mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{})
922 if err != nil {
923 ctxC()
924 return nil, fmt.Errorf("GetRegisterTicket: %w", err)
925 }
926 ticket := resT.Ticket
Serge Bazanski05f813b2023-03-16 17:58:39 +0100927 launch.Log("Cluster: retrieved register ticket (%d bytes).", len(ticket))
Serge Bazanskie78a0892021-10-07 17:03:49 +0200928
929 // Retrieve cluster info (for directory and ca public key) to register further
930 // nodes.
931 resI, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
932 if err != nil {
933 ctxC()
934 return nil, fmt.Errorf("GetClusterInfo: %w", err)
935 }
Serge Bazanski54e212a2023-06-14 13:45:11 +0200936 caCert, err := x509.ParseCertificate(resI.CaCertificate)
937 if err != nil {
938 ctxC()
939 return nil, fmt.Errorf("ParseCertificate: %w", err)
940 }
941 cluster.CACertificate = caCert
Serge Bazanskie78a0892021-10-07 17:03:49 +0200942
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200943 // Use the retrieved information to configure the rest of the node options.
944 for i := 1; i < opts.NumNodes; i++ {
945 nodeOpts[i] = NodeOptions{
Leopoldaf5086b2023-01-15 14:12:42 +0100946 Name: fmt.Sprintf("node%d", i),
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200947 ConnectToSocket: vmPorts[i],
948 NodeParameters: &apb.NodeParameters{
949 Cluster: &apb.NodeParameters_ClusterRegister_{
950 ClusterRegister: &apb.NodeParameters_ClusterRegister{
951 RegisterTicket: ticket,
952 ClusterDirectory: resI.ClusterDirectory,
953 CaCertificate: resI.CaCertificate,
954 },
955 },
956 },
957 SerialPort: newPrefixedStdio(i),
958 }
Serge Bazanskid09c58f2023-03-17 00:25:08 +0100959 if opts.NodeLogsToFiles {
960 path := path.Join(ld, fmt.Sprintf("node-%d.txt", i+1))
961 port, err := NewSerialFileLogger(path)
962 if err != nil {
963 return nil, fmt.Errorf("could not open log file for node %d: %w", i+1, err)
964 }
965 launch.Log("Node %d logs at %s", i+1, path)
966 nodeOpts[i].SerialPort = port
967 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +0200968 }
969
970 // Now run the rest of the nodes.
Serge Bazanskie78a0892021-10-07 17:03:49 +0200971 for i := 1; i < opts.NumNodes; i++ {
Serge Bazanski05f813b2023-03-16 17:58:39 +0100972 launch.Log("Cluster: Starting node %d...", i+1)
Serge Bazanskiee8c81b2024-04-03 11:59:38 +0200973 err := LaunchNode(ctxT, ld, sd, &nodeOpts[i], done[i])
974 if err != nil {
975 return nil, fmt.Errorf("failed to launch node %d: %w", i+1, err)
976 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200977 }
Serge Bazanskie78a0892021-10-07 17:03:49 +0200978
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200979 seenNodes := make(map[string]bool)
980 launch.Log("Cluster: waiting for nodes to appear as NEW...")
981 for i := 1; i < opts.NumNodes; i++ {
Serge Bazanskie78a0892021-10-07 17:03:49 +0200982 for {
983 nodes, err := getNodes(ctx, mgmt)
984 if err != nil {
985 ctxC()
986 return nil, fmt.Errorf("could not get nodes: %w", err)
987 }
988 for _, n := range nodes {
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200989 if n.State != cpb.NodeState_NODE_STATE_NEW {
990 continue
Serge Bazanskie78a0892021-10-07 17:03:49 +0200991 }
Serge Bazanski87d9c592024-03-20 12:35:11 +0100992 if seenNodes[n.Id] {
993 continue
994 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +0200995 seenNodes[n.Id] = true
996 cluster.Nodes[n.Id] = &NodeInCluster{
997 ID: n.Id,
998 Pubkey: n.Pubkey,
999 }
1000 cluster.NodeIDs = append(cluster.NodeIDs, n.Id)
Serge Bazanskie78a0892021-10-07 17:03:49 +02001001 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001002
1003 if len(seenNodes) == opts.NumNodes-1 {
Serge Bazanskie78a0892021-10-07 17:03:49 +02001004 break
1005 }
1006 time.Sleep(1 * time.Second)
1007 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001008 }
1009 launch.Log("Found all expected nodes")
Serge Bazanskie78a0892021-10-07 17:03:49 +02001010
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001011 approvedNodes := make(map[string]bool)
1012 upNodes := make(map[string]bool)
1013 if !opts.LeaveNodesNew {
Serge Bazanskie78a0892021-10-07 17:03:49 +02001014 for {
1015 nodes, err := getNodes(ctx, mgmt)
1016 if err != nil {
1017 ctxC()
1018 return nil, fmt.Errorf("could not get nodes: %w", err)
1019 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001020 for _, node := range nodes {
1021 if !seenNodes[node.Id] {
1022 // Skip nodes that weren't NEW in the previous step.
Serge Bazanskie78a0892021-10-07 17:03:49 +02001023 continue
1024 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001025
1026 if node.State == cpb.NodeState_NODE_STATE_UP && node.Status != nil && node.Status.ExternalAddress != "" {
1027 launch.Log("Cluster: node %s is up", node.Id)
1028 upNodes[node.Id] = true
1029 cluster.Nodes[node.Id].ManagementAddress = node.Status.ExternalAddress
Serge Bazanskie78a0892021-10-07 17:03:49 +02001030 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001031 if upNodes[node.Id] {
1032 continue
Serge Bazanskibe742842022-04-04 13:18:50 +02001033 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001034
1035 if !approvedNodes[node.Id] {
1036 launch.Log("Cluster: approving node %s", node.Id)
1037 _, err := mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
1038 Pubkey: node.Pubkey,
1039 })
1040 if err != nil {
1041 ctxC()
1042 return nil, fmt.Errorf("ApproveNode(%s): %w", node.Id, err)
1043 }
1044 approvedNodes[node.Id] = true
Serge Bazanskibe742842022-04-04 13:18:50 +02001045 }
Serge Bazanskie78a0892021-10-07 17:03:49 +02001046 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001047
1048 launch.Log("Cluster: want %d up nodes, have %d", opts.NumNodes-1, len(upNodes))
1049 if len(upNodes) == opts.NumNodes-1 {
Serge Bazanskie78a0892021-10-07 17:03:49 +02001050 break
1051 }
Serge Bazanskibe742842022-04-04 13:18:50 +02001052 time.Sleep(time.Second)
Serge Bazanskie78a0892021-10-07 17:03:49 +02001053 }
Serge Bazanskie78a0892021-10-07 17:03:49 +02001054 }
Serge Bazanski66e58952021-10-05 17:06:56 +02001055
Serge Bazanski05f813b2023-03-16 17:58:39 +01001056 launch.Log("Cluster: all nodes up:")
Serge Bazanskibe742842022-04-04 13:18:50 +02001057 for _, node := range cluster.Nodes {
Serge Bazanski05f813b2023-03-16 17:58:39 +01001058 launch.Log("Cluster: - %s at %s", node.ID, node.ManagementAddress)
Serge Bazanskibe742842022-04-04 13:18:50 +02001059 }
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001060 launch.Log("Cluster: starting tests...")
Serge Bazanski66e58952021-10-05 17:06:56 +02001061
Serge Bazanskibe742842022-04-04 13:18:50 +02001062 return cluster, nil
Serge Bazanski66e58952021-10-05 17:06:56 +02001063}
1064
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001065// RebootNode reboots the cluster member node matching the given index, and
1066// waits for it to rejoin the cluster. It will use the given context ctx to run
1067// cluster API requests, whereas the resulting QEMU process will be created
1068// using the cluster's context c.ctxT. The nodes are indexed starting at 0.
1069func (c *Cluster) RebootNode(ctx context.Context, idx int) error {
1070 if idx < 0 || idx >= len(c.NodeIDs) {
Serge Bazanskiee8c81b2024-04-03 11:59:38 +02001071 return fmt.Errorf("index out of bounds")
1072 }
1073 if c.nodeOpts[idx].Runtime == nil {
1074 return fmt.Errorf("node not running")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001075 }
1076 id := c.NodeIDs[idx]
1077
1078 // Get an authenticated owner client within the cluster.
Serge Bazanski5bb8a332022-06-23 17:41:33 +02001079 curC, err := c.CuratorClient()
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001080 if err != nil {
1081 return err
1082 }
1083 mgmt := apb.NewManagementClient(curC)
1084
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001085 // Cancel the node's context. This will shut down QEMU.
1086 c.nodeOpts[idx].Runtime.CtxC()
Serge Bazanski05f813b2023-03-16 17:58:39 +01001087 launch.Log("Cluster: waiting for node %d (%s) to stop.", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001088 err = <-c.nodesDone[idx]
1089 if err != nil {
1090 return fmt.Errorf("while restarting node: %w", err)
1091 }
1092
1093 // Start QEMU again.
Serge Bazanski05f813b2023-03-16 17:58:39 +01001094 launch.Log("Cluster: restarting node %d (%s).", idx, id)
Serge Bazanskiee8c81b2024-04-03 11:59:38 +02001095 if err := LaunchNode(c.ctxT, c.launchDir, c.socketDir, &c.nodeOpts[idx], c.nodesDone[idx]); err != nil {
1096 return fmt.Errorf("failed to launch node %d: %w", idx, err)
1097 }
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001098
Serge Bazanskibc969572024-03-21 11:56:13 +01001099 start := time.Now()
1100
1101 // Poll Management.GetNodes until the node is healthy.
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001102 for {
1103 cs, err := getNode(ctx, mgmt, id)
1104 if err != nil {
Serge Bazanski05f813b2023-03-16 17:58:39 +01001105 launch.Log("Cluster: node get error: %v", err)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001106 return err
1107 }
Serge Bazanskibc969572024-03-21 11:56:13 +01001108 launch.Log("Cluster: node health: %+v", cs.Health)
1109
1110 lhb := time.Now().Add(-cs.TimeSinceHeartbeat.AsDuration())
1111 if lhb.After(start) && cs.Health == apb.Node_HEALTHY {
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001112 break
1113 }
1114 time.Sleep(time.Second)
1115 }
Serge Bazanski05f813b2023-03-16 17:58:39 +01001116 launch.Log("Cluster: node %d (%s) has rejoined the cluster.", idx, id)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001117 return nil
1118}
1119
Serge Bazanski500f6e02024-04-03 12:06:40 +02001120// ShutdownNode performs an ungraceful shutdown (i.e. power off) of the node
1121// given by idx. If the node is already shut down, this is a no-op.
1122func (c *Cluster) ShutdownNode(idx int) error {
1123 if idx < 0 || idx >= len(c.NodeIDs) {
1124 return fmt.Errorf("index out of bounds")
1125 }
1126 // Return if node is already stopped.
1127 select {
1128 case <-c.nodeOpts[idx].Runtime.ctxT.Done():
1129 return nil
1130 default:
1131 }
1132 id := c.NodeIDs[idx]
1133
1134 // Cancel the node's context. This will shut down QEMU.
1135 c.nodeOpts[idx].Runtime.CtxC()
1136 launch.Log("Cluster: waiting for node %d (%s) to stop.", idx, id)
1137 err := <-c.nodesDone[idx]
1138 if err != nil {
1139 return fmt.Errorf("while shutting down node: %w", err)
1140 }
1141 return nil
1142}
1143
1144// StartNode performs a power on of the node given by idx. If the node is already
1145// running, this is a no-op.
1146func (c *Cluster) StartNode(idx int) error {
1147 if idx < 0 || idx >= len(c.NodeIDs) {
1148 return fmt.Errorf("index out of bounds")
1149 }
1150 id := c.NodeIDs[idx]
1151 // Return if node is already running.
1152 select {
1153 case <-c.nodeOpts[idx].Runtime.ctxT.Done():
1154 default:
1155 return nil
1156 }
1157
1158 // Start QEMU again.
1159 launch.Log("Cluster: starting node %d (%s).", idx, id)
1160 if err := LaunchNode(c.ctxT, c.launchDir, c.socketDir, &c.nodeOpts[idx], c.nodesDone[idx]); err != nil {
1161 return fmt.Errorf("failed to launch node %d: %w", idx, err)
1162 }
1163 return nil
1164}
1165
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001166// Close cancels the running clusters' context and waits for all virtualized
Serge Bazanski66e58952021-10-05 17:06:56 +02001167// nodes to stop. It returns an error if stopping the nodes failed, or one of
1168// the nodes failed to fully start in the first place.
1169func (c *Cluster) Close() error {
Serge Bazanski05f813b2023-03-16 17:58:39 +01001170 launch.Log("Cluster: stopping...")
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001171 if c.authClient != nil {
1172 c.authClient.Close()
1173 }
Serge Bazanski66e58952021-10-05 17:06:56 +02001174 c.ctxC()
1175
Leopold20a036e2023-01-15 00:17:19 +01001176 var errs []error
Serge Bazanski05f813b2023-03-16 17:58:39 +01001177 launch.Log("Cluster: waiting for nodes to exit...")
Serge Bazanski66e58952021-10-05 17:06:56 +02001178 for _, c := range c.nodesDone {
1179 err := <-c
1180 if err != nil {
Leopold20a036e2023-01-15 00:17:19 +01001181 errs = append(errs, err)
Serge Bazanski66e58952021-10-05 17:06:56 +02001182 }
1183 }
Serge Bazanskid09c58f2023-03-17 00:25:08 +01001184 launch.Log("Cluster: removing nodes' state files (%s) and sockets (%s).", c.launchDir, c.socketDir)
Mateusz Zalega0246f5e2022-04-22 17:29:04 +02001185 os.RemoveAll(c.launchDir)
1186 os.RemoveAll(c.socketDir)
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001187 os.RemoveAll(c.metroctlDir)
Serge Bazanski05f813b2023-03-16 17:58:39 +01001188 launch.Log("Cluster: done")
Leopold20a036e2023-01-15 00:17:19 +01001189 return multierr.Combine(errs...)
Serge Bazanski66e58952021-10-05 17:06:56 +02001190}
Serge Bazanskibe742842022-04-04 13:18:50 +02001191
1192// DialNode is a grpc.WithContextDialer compatible dialer which dials nodes by
1193// their ID. This is performed by connecting to the cluster nanoswitch via its
1194// SOCKS proxy, and using the cluster node list for name resolution.
1195//
1196// For example:
1197//
Serge Bazanski05f813b2023-03-16 17:58:39 +01001198// grpc.Dial("metropolis-deadbeef:1234", grpc.WithContextDialer(c.DialNode))
Serge Bazanskibe742842022-04-04 13:18:50 +02001199func (c *Cluster) DialNode(_ context.Context, addr string) (net.Conn, error) {
1200 host, port, err := net.SplitHostPort(addr)
1201 if err != nil {
1202 return nil, fmt.Errorf("invalid host:port: %w", err)
1203 }
1204 // Already an IP address?
1205 if net.ParseIP(host) != nil {
Lorenz Brun276a7462023-07-12 21:28:54 +02001206 return c.SOCKSDialer.Dial("tcp", addr)
Serge Bazanskibe742842022-04-04 13:18:50 +02001207 }
1208
1209 // Otherwise, expect a node name.
1210 node, ok := c.Nodes[host]
1211 if !ok {
1212 return nil, fmt.Errorf("unknown node %q", host)
1213 }
1214 addr = net.JoinHostPort(node.ManagementAddress, port)
Lorenz Brun276a7462023-07-12 21:28:54 +02001215 return c.SOCKSDialer.Dial("tcp", addr)
Serge Bazanskibe742842022-04-04 13:18:50 +02001216}
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001217
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001218// GetKubeClientSet gets a Kubernetes client set accessing the Metropolis
1219// Kubernetes authenticating proxy using the cluster owner identity.
1220// It currently has access to everything (i.e. the cluster-admin role)
1221// via the owner-admin binding.
1222func (c *Cluster) GetKubeClientSet() (kubernetes.Interface, error) {
1223 pkcs8Key, err := x509.MarshalPKCS8PrivateKey(c.Owner.PrivateKey)
1224 if err != nil {
1225 // We explicitly pass an Ed25519 private key in, so this can't happen
1226 panic(err)
1227 }
1228
1229 host := net.JoinHostPort(c.NodeIDs[0], node.KubernetesAPIWrappedPort.PortString())
Lorenz Brun150f24a2023-07-13 20:11:06 +02001230 clientConfig := rest.Config{
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001231 Host: host,
1232 TLSClientConfig: rest.TLSClientConfig{
1233 // TODO(q3k): use CA certificate
1234 Insecure: true,
1235 ServerName: "kubernetes.default.svc",
1236 CertData: pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: c.Owner.Certificate[0]}),
1237 KeyData: pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: pkcs8Key}),
1238 },
1239 Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
1240 return c.DialNode(ctx, address)
1241 },
1242 }
1243 return kubernetes.NewForConfig(&clientConfig)
1244}
1245
Serge Bazanski1f8cad72023-03-20 16:58:10 +01001246// KubernetesControllerNodeAddresses returns the list of IP addresses of nodes
1247// which are currently Kubernetes controllers, ie. run an apiserver. This list
1248// might be empty if no node is currently configured with the
1249// 'KubernetesController' node.
1250func (c *Cluster) KubernetesControllerNodeAddresses(ctx context.Context) ([]string, error) {
1251 curC, err := c.CuratorClient()
1252 if err != nil {
1253 return nil, err
1254 }
1255 mgmt := apb.NewManagementClient(curC)
1256 srv, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{
1257 Filter: "has(node.roles.kubernetes_controller)",
1258 })
1259 if err != nil {
1260 return nil, err
1261 }
1262 defer srv.CloseSend()
1263 var res []string
1264 for {
1265 n, err := srv.Recv()
1266 if err == io.EOF {
1267 break
1268 }
1269 if err != nil {
1270 return nil, err
1271 }
1272 if n.Status == nil || n.Status.ExternalAddress == "" {
1273 continue
1274 }
1275 res = append(res, n.Status.ExternalAddress)
1276 }
1277 return res, nil
1278}
Serge Bazanski630fb5c2023-04-06 10:50:24 +02001279
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001280// AllNodesHealthy returns nil if all the nodes in the cluster are seemingly
1281// healthy.
Serge Bazanski630fb5c2023-04-06 10:50:24 +02001282func (c *Cluster) AllNodesHealthy(ctx context.Context) error {
1283 // Get an authenticated owner client within the cluster.
1284 curC, err := c.CuratorClient()
1285 if err != nil {
1286 return err
1287 }
1288 mgmt := apb.NewManagementClient(curC)
1289 nodes, err := getNodes(ctx, mgmt)
1290 if err != nil {
1291 return err
1292 }
1293
1294 var unhealthy []string
1295 for _, node := range nodes {
1296 if node.Health == apb.Node_HEALTHY {
1297 continue
1298 }
1299 unhealthy = append(unhealthy, node.Id)
1300 }
1301 if len(unhealthy) == 0 {
1302 return nil
1303 }
1304 return fmt.Errorf("nodes unhealthy: %s", strings.Join(unhealthy, ", "))
1305}
Serge Bazanskia0bc6d32023-06-28 18:57:40 +02001306
1307// ApproveNode approves a node by ID, waiting for it to become UP.
1308func (c *Cluster) ApproveNode(ctx context.Context, id string) error {
1309 curC, err := c.CuratorClient()
1310 if err != nil {
1311 return err
1312 }
1313 mgmt := apb.NewManagementClient(curC)
1314
1315 _, err = mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
1316 Pubkey: c.Nodes[id].Pubkey,
1317 })
1318 if err != nil {
1319 return fmt.Errorf("ApproveNode: %w", err)
1320 }
1321 launch.Log("Cluster: %s: approved, waiting for UP", id)
1322 for {
1323 nodes, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
1324 if err != nil {
1325 return fmt.Errorf("GetNodes: %w", err)
1326 }
1327 found := false
1328 for {
1329 node, err := nodes.Recv()
1330 if errors.Is(err, io.EOF) {
1331 break
1332 }
1333 if err != nil {
1334 return fmt.Errorf("Nodes.Recv: %w", err)
1335 }
1336 if node.Id != id {
1337 continue
1338 }
1339 if node.State != cpb.NodeState_NODE_STATE_UP {
1340 continue
1341 }
1342 found = true
1343 break
1344 }
1345 nodes.CloseSend()
1346
1347 if found {
1348 break
1349 }
1350 time.Sleep(time.Second)
1351 }
1352 launch.Log("Cluster: %s: UP", id)
1353 return nil
1354}
1355
1356// MakeKubernetesWorker adds the KubernetesWorker role to a node by ID.
1357func (c *Cluster) MakeKubernetesWorker(ctx context.Context, id string) error {
1358 curC, err := c.CuratorClient()
1359 if err != nil {
1360 return err
1361 }
1362 mgmt := apb.NewManagementClient(curC)
1363
1364 tr := true
1365 launch.Log("Cluster: %s: adding KubernetesWorker", id)
1366 _, err = mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
1367 Node: &apb.UpdateNodeRolesRequest_Id{
1368 Id: id,
1369 },
1370 KubernetesWorker: &tr,
1371 })
1372 return err
1373}
Serge Bazanski37cfcc12024-03-21 11:59:07 +01001374
1375// MakeConsensusMember adds the ConsensusMember role to a node by ID.
1376func (c *Cluster) MakeConsensusMember(ctx context.Context, id string) error {
1377 curC, err := c.CuratorClient()
1378 if err != nil {
1379 return err
1380 }
1381 mgmt := apb.NewManagementClient(curC)
1382 cur := ipb.NewCuratorClient(curC)
1383
1384 tr := true
1385 launch.Log("Cluster: %s: adding ConsensusMember", id)
1386 bo := backoff.NewExponentialBackOff()
1387 bo.MaxElapsedTime = 10 * time.Second
1388
1389 backoff.Retry(func() error {
1390 _, err = mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
1391 Node: &apb.UpdateNodeRolesRequest_Id{
1392 Id: id,
1393 },
1394 ConsensusMember: &tr,
1395 })
1396 if err != nil {
1397 launch.Log("Cluster: %s: UpdateNodeRoles failed: %v", id, err)
1398 }
1399 return err
1400 }, backoff.WithContext(bo, ctx))
1401 if err != nil {
1402 return err
1403 }
1404
1405 launch.Log("Cluster: %s: waiting for learner/full members...", id)
1406
1407 learner := false
1408 for {
1409 res, err := cur.GetConsensusStatus(ctx, &ipb.GetConsensusStatusRequest{})
1410 if err != nil {
1411 return fmt.Errorf("GetConsensusStatus: %w", err)
1412 }
1413 for _, member := range res.EtcdMember {
1414 if member.Id != id {
1415 continue
1416 }
1417 switch member.Status {
1418 case ipb.GetConsensusStatusResponse_EtcdMember_STATUS_LEARNER:
1419 if !learner {
1420 learner = true
1421 launch.Log("Cluster: %s: became a learner, waiting for full member...", id)
1422 }
1423 case ipb.GetConsensusStatusResponse_EtcdMember_STATUS_FULL:
1424 launch.Log("Cluster: %s: became a full member", id)
1425 return nil
1426 }
1427 }
1428 time.Sleep(100 * time.Millisecond)
1429 }
1430}