blob: f68f0a4d6797602ad6d5088e338a96637747765b [file] [log] [blame]
Serge Bazanski99b02142024-04-17 16:33:28 +02001package kubernetes
2
3import (
4 "context"
5 "crypto/tls"
6 "crypto/x509"
7 "errors"
8 "fmt"
9 "io"
10 "net"
11 "net/http"
12 _ "net/http/pprof"
13 "net/url"
14 "os"
15 "strings"
16 "testing"
17 "time"
18
19 "github.com/bazelbuild/rules_go/go/runfiles"
20 corev1 "k8s.io/api/core/v1"
21 kerrors "k8s.io/apimachinery/pkg/api/errors"
22 "k8s.io/apimachinery/pkg/api/resource"
23 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
24 podv1 "k8s.io/kubernetes/pkg/api/v1/pod"
25
Lorenz Brun732a8842024-08-26 23:25:37 +020026 common "source.monogon.dev/metropolis/node"
Serge Bazanski6d1ff362024-09-30 15:15:31 +000027 apb "source.monogon.dev/metropolis/proto/api"
Lorenz Brun732a8842024-08-26 23:25:37 +020028 cpb "source.monogon.dev/metropolis/proto/common"
Tim Windelschmidt9f21f532024-05-07 15:14:20 +020029 mlaunch "source.monogon.dev/metropolis/test/launch"
30 "source.monogon.dev/metropolis/test/localregistry"
Serge Bazanski99b02142024-04-17 16:33:28 +020031 "source.monogon.dev/metropolis/test/util"
Serge Bazanski99b02142024-04-17 16:33:28 +020032)
33
var (
	// These variables are filled in by Bazel at link time with the canonical
	// runfile path of their corresponding file. The init function below
	// resolves each of them to a real filesystem path using the rules_go
	// runfiles package.
	xTestImagesManifestPath string
)
40
41func init() {
42 var err error
43 for _, path := range []*string{
44 &xTestImagesManifestPath,
45 } {
46 *path, err = runfiles.Rlocation(*path)
47 if err != nil {
48 panic(err)
49 }
50 }
51}
52
const (
	// Timeout for the global test context.
	//
	// Bazel would eventually time out the test after 900s ("large") if, for
	// some reason, the context cancellation fails to abort it.
	globalTestTimeout = 600 * time.Second

	// Timeouts for individual end-to-end tests of different sizes. These
	// bound single util.TestEventual steps, not the whole test binary.
	smallTestTimeout = 60 * time.Second
	largeTestTimeout = 120 * time.Second
)
64
// TestE2EKubernetesLabels verifies that Kubernetes node labels are being updated
// when the cluster state changes.
//
// It launches a two-node in-memory cluster, toggles the KubernetesWorker role
// on the nodes via the Management API, and asserts that the corresponding
// node-role.kubernetes.io/ labels appear and disappear on the Kubernetes Node
// objects.
func TestE2EKubernetesLabels(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), globalTestTimeout)
	defer cancel()

	// TPM and storage security are disabled as the in-memory test nodes have
	// no TPM and no encrypted storage.
	clusterOptions := mlaunch.ClusterOptions{
		NumNodes: 2,
		InitialClusterConfiguration: &cpb.ClusterConfiguration{
			TpmMode:               cpb.ClusterConfiguration_TPM_MODE_DISABLED,
			StorageSecurityPolicy: cpb.ClusterConfiguration_STORAGE_SECURITY_POLICY_NEEDS_INSECURE,
		},
	}
	cluster, err := mlaunch.LaunchCluster(ctx, clusterOptions)
	if err != nil {
		t.Fatalf("LaunchCluster failed: %v", err)
	}
	defer func() {
		err := cluster.Close()
		if err != nil {
			t.Fatalf("cluster Close failed: %v", err)
		}
	}()

	con, err := cluster.CuratorClient()
	if err != nil {
		t.Fatalf("Could not get curator client: %v", err)
	}
	mgmt := apb.NewManagementClient(con)
	clientSet, err := cluster.GetKubeClientSet()
	if err != nil {
		t.Fatal(err)
	}

	// getLabelsForNode returns the node-role.kubernetes.io/ labels of the
	// given node, nil if the node does not exist (yet), and fails the test on
	// any other API error.
	getLabelsForNode := func(nid string) common.Labels {
		node, err := clientSet.CoreV1().Nodes().Get(ctx, nid, metav1.GetOptions{})
		if kerrors.IsNotFound(err) {
			return nil
		}
		if err != nil {
			t.Fatalf("Could not get node %s: %v", nid, err)
			// Unreachable; present to satisfy the compiler after Fatalf.
			return nil
		}
		return common.Labels(node.Labels).Filter(func(k, v string) bool {
			return strings.HasPrefix(k, "node-role.kubernetes.io/")
		})
	}

	// Nodes should have no labels at first.
	for _, nid := range cluster.NodeIDs {
		if labels := getLabelsForNode(nid); !labels.Equals(nil) {
			t.Errorf("Node %s should have no labels, has %s", nid, labels)
		}
	}
	// Nominate both nodes to be Kubernetes workers.
	for _, nid := range cluster.NodeIDs {
		yes := true
		_, err := mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
			Node: &apb.UpdateNodeRolesRequest_Id{
				Id: nid,
			},
			KubernetesWorker: &yes,
		})
		if err != nil {
			t.Fatalf("Could not make %s a KubernetesWorker: %v", nid, err)
		}
	}

	util.MustTestEventual(t, "Labels added", ctx, time.Second*5, func(ctx context.Context) error {
		// Nodes should have role labels now.
		for _, nid := range cluster.NodeIDs {
			want := common.Labels{
				"node-role.kubernetes.io/KubernetesWorker": "",
			}
			// The first node additionally carries the controller/consensus
			// roles assigned to it at cluster bootstrap.
			if nid == cluster.NodeIDs[0] {
				want["node-role.kubernetes.io/KubernetesController"] = ""
				want["node-role.kubernetes.io/ConsensusMember"] = ""
			}
			if labels := getLabelsForNode(nid); !want.Equals(labels) {
				return fmt.Errorf("node %s should have labels %s, has %s", nid, want, labels)
			}
		}
		return nil
	})

	// Remove KubernetesWorker from first node again. It will stay in k8s (arguably,
	// this is a bug) but its role label should be removed.
	no := false
	_, err = mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
		Node: &apb.UpdateNodeRolesRequest_Id{
			Id: cluster.NodeIDs[0],
		},
		KubernetesWorker: &no,
	})
	if err != nil {
		t.Fatalf("Could not remove KubernetesWorker from %s: %v", cluster.NodeIDs[0], err)
	}

	util.MustTestEventual(t, "Labels removed", ctx, time.Second*5, func(ctx context.Context) error {
		for _, nid := range cluster.NodeIDs {
			want := make(common.Labels)
			if nid == cluster.NodeIDs[0] {
				// Worker role removed; controller/consensus labels remain.
				want["node-role.kubernetes.io/KubernetesController"] = ""
				want["node-role.kubernetes.io/ConsensusMember"] = ""
			} else {
				want["node-role.kubernetes.io/KubernetesWorker"] = ""
			}
			if labels := getLabelsForNode(nid); !want.Equals(labels) {
				return fmt.Errorf("node %s should have labels %s, has %s", nid, want, labels)
			}
		}
		return nil
	})
}
179
Serge Bazanski99b02142024-04-17 16:33:28 +0200180// TestE2EKubernetes exercises the Kubernetes functionality of Metropolis.
181//
182// The tests are performed against an in-memory cluster.
183func TestE2EKubernetes(t *testing.T) {
184 // Set a global timeout to make sure this terminates
185 ctx, cancel := context.WithTimeout(context.Background(), globalTestTimeout)
186 defer cancel()
187
Tim Windelschmidt82e6af72024-07-23 00:05:42 +0000188 df, err := os.ReadFile(xTestImagesManifestPath)
Serge Bazanski99b02142024-04-17 16:33:28 +0200189 if err != nil {
190 t.Fatalf("Reading registry manifest failed: %v", err)
191 }
192 lr, err := localregistry.FromBazelManifest(df)
193 if err != nil {
194 t.Fatalf("Creating test image registry failed: %v", err)
195 }
196
197 // Launch cluster.
Tim Windelschmidt9f21f532024-05-07 15:14:20 +0200198 clusterOptions := mlaunch.ClusterOptions{
Serge Bazanski99b02142024-04-17 16:33:28 +0200199 NumNodes: 2,
200 LocalRegistry: lr,
Lorenz Brun732a8842024-08-26 23:25:37 +0200201 InitialClusterConfiguration: &cpb.ClusterConfiguration{
202 TpmMode: cpb.ClusterConfiguration_TPM_MODE_DISABLED,
203 StorageSecurityPolicy: cpb.ClusterConfiguration_STORAGE_SECURITY_POLICY_NEEDS_INSECURE,
204 },
Serge Bazanski99b02142024-04-17 16:33:28 +0200205 }
Tim Windelschmidt9f21f532024-05-07 15:14:20 +0200206 cluster, err := mlaunch.LaunchCluster(ctx, clusterOptions)
Serge Bazanski99b02142024-04-17 16:33:28 +0200207 if err != nil {
208 t.Fatalf("LaunchCluster failed: %v", err)
209 }
210 defer func() {
211 err := cluster.Close()
212 if err != nil {
213 t.Fatalf("cluster Close failed: %v", err)
214 }
215 }()
216
217 clientSet, err := cluster.GetKubeClientSet()
218 if err != nil {
219 t.Fatal(err)
220 }
221 util.TestEventual(t, "Add KubernetesWorker roles", ctx, smallTestTimeout, func(ctx context.Context) error {
222 // Make everything but the first node into KubernetesWorkers.
223 for i := 1; i < clusterOptions.NumNodes; i++ {
224 err := cluster.MakeKubernetesWorker(ctx, cluster.NodeIDs[i])
225 if err != nil {
226 return util.Permanent(fmt.Errorf("MakeKubernetesWorker: %w", err))
227 }
228 }
229 return nil
230 })
231 util.TestEventual(t, "Node is registered and ready", ctx, largeTestTimeout, func(ctx context.Context) error {
232 nodes, err := clientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
233 if err != nil {
234 return err
235 }
236 if len(nodes.Items) < 1 {
237 return errors.New("node not yet registered")
238 }
239 node := nodes.Items[0]
240 for _, cond := range node.Status.Conditions {
241 if cond.Type != corev1.NodeReady {
242 continue
243 }
244 if cond.Status != corev1.ConditionTrue {
245 return fmt.Errorf("node not ready: %v", cond.Message)
246 }
247 }
248 return nil
249 })
250 util.TestEventual(t, "Simple deployment", ctx, largeTestTimeout, func(ctx context.Context) error {
251 _, err := clientSet.AppsV1().Deployments("default").Create(ctx, makeTestDeploymentSpec("test-deploy-1"), metav1.CreateOptions{})
252 return err
253 })
254 util.TestEventual(t, "Simple deployment is running", ctx, largeTestTimeout, func(ctx context.Context) error {
255 res, err := clientSet.CoreV1().Pods("default").List(ctx, metav1.ListOptions{LabelSelector: "name=test-deploy-1"})
256 if err != nil {
257 return err
258 }
259 if len(res.Items) == 0 {
260 return errors.New("pod didn't get created")
261 }
262 pod := res.Items[0]
263 if podv1.IsPodAvailable(&pod, 1, metav1.NewTime(time.Now())) {
264 return nil
265 }
266 events, err := clientSet.CoreV1().Events("default").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=default", pod.Name)})
267 if err != nil || len(events.Items) == 0 {
268 return fmt.Errorf("pod is not ready: %v", pod.Status.Phase)
269 } else {
270 return fmt.Errorf("pod is not ready: %v", events.Items[0].Message)
271 }
272 })
273 util.TestEventual(t, "Simple deployment with gvisor", ctx, largeTestTimeout, func(ctx context.Context) error {
274 deployment := makeTestDeploymentSpec("test-deploy-2")
275 gvisorStr := "gvisor"
276 deployment.Spec.Template.Spec.RuntimeClassName = &gvisorStr
277 _, err := clientSet.AppsV1().Deployments("default").Create(ctx, deployment, metav1.CreateOptions{})
278 return err
279 })
280 util.TestEventual(t, "Simple deployment is running on gvisor", ctx, largeTestTimeout, func(ctx context.Context) error {
281 res, err := clientSet.CoreV1().Pods("default").List(ctx, metav1.ListOptions{LabelSelector: "name=test-deploy-2"})
282 if err != nil {
283 return err
284 }
285 if len(res.Items) == 0 {
286 return errors.New("pod didn't get created")
287 }
288 pod := res.Items[0]
289 if podv1.IsPodAvailable(&pod, 1, metav1.NewTime(time.Now())) {
290 return nil
291 }
292 events, err := clientSet.CoreV1().Events("default").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=default", pod.Name)})
293 if err != nil || len(events.Items) == 0 {
294 return fmt.Errorf("pod is not ready: %v", pod.Status.Phase)
295 } else {
296 var errorMsg strings.Builder
297 for _, msg := range events.Items {
298 errorMsg.WriteString(" | ")
299 errorMsg.WriteString(msg.Message)
300 }
Tim Windelschmidt5f1a7de2024-09-19 02:00:14 +0200301 return fmt.Errorf("pod is not ready: %s", errorMsg.String())
Serge Bazanski99b02142024-04-17 16:33:28 +0200302 }
303 })
304 util.TestEventual(t, "Simple StatefulSet with PVC", ctx, largeTestTimeout, func(ctx context.Context) error {
305 _, err := clientSet.AppsV1().StatefulSets("default").Create(ctx, makeTestStatefulSet("test-statefulset-1", corev1.PersistentVolumeFilesystem), metav1.CreateOptions{})
306 return err
307 })
308 util.TestEventual(t, "Simple StatefulSet with PVC is running", ctx, largeTestTimeout, func(ctx context.Context) error {
309 res, err := clientSet.CoreV1().Pods("default").List(ctx, metav1.ListOptions{LabelSelector: "name=test-statefulset-1"})
310 if err != nil {
311 return err
312 }
313 if len(res.Items) == 0 {
314 return errors.New("pod didn't get created")
315 }
316 pod := res.Items[0]
317 if podv1.IsPodAvailable(&pod, 1, metav1.NewTime(time.Now())) {
318 return nil
319 }
320 events, err := clientSet.CoreV1().Events("default").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=default", pod.Name)})
321 if err != nil || len(events.Items) == 0 {
322 return fmt.Errorf("pod is not ready: %v", pod.Status.Phase)
323 } else {
324 return fmt.Errorf("pod is not ready: %v", events.Items[0].Message)
325 }
326 })
327 util.TestEventual(t, "Simple StatefulSet with Block PVC", ctx, largeTestTimeout, func(ctx context.Context) error {
328 _, err := clientSet.AppsV1().StatefulSets("default").Create(ctx, makeTestStatefulSet("test-statefulset-2", corev1.PersistentVolumeBlock), metav1.CreateOptions{})
329 return err
330 })
331 util.TestEventual(t, "Simple StatefulSet with Block PVC is running", ctx, largeTestTimeout, func(ctx context.Context) error {
332 res, err := clientSet.CoreV1().Pods("default").List(ctx, metav1.ListOptions{LabelSelector: "name=test-statefulset-2"})
333 if err != nil {
334 return err
335 }
336 if len(res.Items) == 0 {
337 return errors.New("pod didn't get created")
338 }
339 pod := res.Items[0]
340 if podv1.IsPodAvailable(&pod, 1, metav1.NewTime(time.Now())) {
341 return nil
342 }
343 events, err := clientSet.CoreV1().Events("default").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=default", pod.Name)})
344 if err != nil || len(events.Items) == 0 {
345 return fmt.Errorf("pod is not ready: %v", pod.Status.Phase)
346 } else {
347 return fmt.Errorf("pod is not ready: %v", events.Items[0].Message)
348 }
349 })
350 util.TestEventual(t, "In-cluster self-test job", ctx, smallTestTimeout, func(ctx context.Context) error {
351 _, err := clientSet.BatchV1().Jobs("default").Create(ctx, makeSelftestSpec("selftest"), metav1.CreateOptions{})
352 return err
353 })
354 util.TestEventual(t, "In-cluster self-test job passed", ctx, smallTestTimeout, func(ctx context.Context) error {
355 res, err := clientSet.BatchV1().Jobs("default").Get(ctx, "selftest", metav1.GetOptions{})
356 if err != nil {
357 return err
358 }
359 if res.Status.Failed > 0 {
360 pods, err := clientSet.CoreV1().Pods("default").List(ctx, metav1.ListOptions{
361 LabelSelector: "job-name=selftest",
362 })
363 if err != nil {
364 return util.Permanent(fmt.Errorf("job failed but failed to find pod: %w", err))
365 }
366 if len(pods.Items) < 1 {
367 return fmt.Errorf("job failed but pod does not exist")
368 }
369 lines, err := getPodLogLines(ctx, clientSet, pods.Items[0].Name, 1)
370 if err != nil {
371 return fmt.Errorf("job failed but could not get logs: %w", err)
372 }
373 if len(lines) > 0 {
374 return util.Permanent(fmt.Errorf("job failed, last log line: %s", lines[0]))
375 }
376 return util.Permanent(fmt.Errorf("job failed, empty log"))
377 }
378 if res.Status.Succeeded > 0 {
379 return nil
380 }
381 return fmt.Errorf("job still running")
382 })
383 util.TestEventual(t, "Start NodePort test setup", ctx, smallTestTimeout, func(ctx context.Context) error {
384 _, err := clientSet.AppsV1().Deployments("default").Create(ctx, makeHTTPServerDeploymentSpec("nodeport-server"), metav1.CreateOptions{})
385 if err != nil && !kerrors.IsAlreadyExists(err) {
386 return err
387 }
388 _, err = clientSet.CoreV1().Services("default").Create(ctx, makeHTTPServerNodePortService("nodeport-server"), metav1.CreateOptions{})
389 if err != nil && !kerrors.IsAlreadyExists(err) {
390 return err
391 }
392 return nil
393 })
394 util.TestEventual(t, "NodePort accessible from all nodes", ctx, smallTestTimeout, func(ctx context.Context) error {
395 nodes, err := clientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
396 if err != nil {
397 return err
398 }
399 // Use a new client for each attempt
400 hc := http.Client{
401 Timeout: 2 * time.Second,
402 Transport: &http.Transport{
403 Dial: cluster.SOCKSDialer.Dial,
404 },
405 }
406 for _, n := range nodes.Items {
407 var addr string
408 for _, a := range n.Status.Addresses {
409 if a.Type == corev1.NodeInternalIP {
410 addr = a.Address
411 }
412 }
413 u := url.URL{Scheme: "http", Host: addr, Path: "/"}
414 res, err := hc.Get(u.String())
415 if err != nil {
416 return fmt.Errorf("failed getting from node %q: %w", n.Name, err)
417 }
418 if res.StatusCode != http.StatusOK {
419 return fmt.Errorf("getting from node %q: HTTP %d", n.Name, res.StatusCode)
420 }
421 t.Logf("Got response from %q", n.Name)
422 }
423 return nil
424 })
425 util.TestEventual(t, "containerd metrics retrieved", ctx, smallTestTimeout, func(ctx context.Context) error {
426 pool := x509.NewCertPool()
427 pool.AddCert(cluster.CACertificate)
428 cl := http.Client{
429 Transport: &http.Transport{
430 TLSClientConfig: &tls.Config{
431 Certificates: []tls.Certificate{cluster.Owner},
432 RootCAs: pool,
433 },
434 DialContext: func(ctx context.Context, _, addr string) (net.Conn, error) {
435 return cluster.DialNode(ctx, addr)
436 },
437 },
438 }
439 u := url.URL{
440 Scheme: "https",
441 Host: net.JoinHostPort(cluster.NodeIDs[1], common.MetricsPort.PortString()),
442 Path: "/metrics/containerd",
443 }
444 res, err := cl.Get(u.String())
445 if err != nil {
446 return err
447 }
448 defer res.Body.Close()
449 if res.StatusCode != 200 {
450 return fmt.Errorf("status code %d", res.StatusCode)
451 }
452
453 body, err := io.ReadAll(res.Body)
454 if err != nil {
455 return err
456 }
457 needle := "containerd_build_info_total"
458 if !strings.Contains(string(body), needle) {
459 return util.Permanent(fmt.Errorf("could not find %q in returned response", needle))
460 }
461 return nil
462 })
463 if os.Getenv("HAVE_NESTED_KVM") != "" {
464 util.TestEventual(t, "Pod for KVM/QEMU smoke test", ctx, smallTestTimeout, func(ctx context.Context) error {
465 runcRuntimeClass := "runc"
466 _, err := clientSet.CoreV1().Pods("default").Create(ctx, &corev1.Pod{
467 ObjectMeta: metav1.ObjectMeta{
468 Name: "vm-smoketest",
469 },
470 Spec: corev1.PodSpec{
471 Containers: []corev1.Container{{
472 Name: "vm-smoketest",
473 ImagePullPolicy: corev1.PullNever,
474 Image: "test.monogon.internal/metropolis/vm/smoketest:smoketest_container",
475 Resources: corev1.ResourceRequirements{
476 Limits: corev1.ResourceList{
477 "devices.monogon.dev/kvm": *resource.NewQuantity(1, ""),
478 },
479 },
480 }},
481 RuntimeClassName: &runcRuntimeClass,
482 RestartPolicy: corev1.RestartPolicyNever,
483 },
484 }, metav1.CreateOptions{})
485 return err
486 })
487 util.TestEventual(t, "KVM/QEMU smoke test completion", ctx, smallTestTimeout, func(ctx context.Context) error {
488 pod, err := clientSet.CoreV1().Pods("default").Get(ctx, "vm-smoketest", metav1.GetOptions{})
489 if err != nil {
490 return fmt.Errorf("failed to get pod: %w", err)
491 }
492 if pod.Status.Phase == corev1.PodSucceeded {
493 return nil
494 }
495 events, err := clientSet.CoreV1().Events("default").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=default", pod.Name)})
496 if err != nil || len(events.Items) == 0 {
497 return fmt.Errorf("pod is not ready: %v", pod.Status.Phase)
498 } else {
499 return fmt.Errorf("pod is not ready: %v", events.Items[len(events.Items)-1].Message)
500 }
501 })
502 }
503}