blob: ec38aa3289fdce361f20c09cd1222746857f3e6a [file] [log] [blame]
Serge Bazanski99b02142024-04-17 16:33:28 +02001package kubernetes
2
3import (
4 "context"
5 "crypto/tls"
6 "crypto/x509"
7 "errors"
8 "fmt"
9 "io"
10 "net"
11 "net/http"
12 _ "net/http/pprof"
13 "net/url"
14 "os"
15 "strings"
16 "testing"
17 "time"
18
19 "github.com/bazelbuild/rules_go/go/runfiles"
20 corev1 "k8s.io/api/core/v1"
21 kerrors "k8s.io/apimachinery/pkg/api/errors"
22 "k8s.io/apimachinery/pkg/api/resource"
23 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
24 podv1 "k8s.io/kubernetes/pkg/api/v1/pod"
25
26 "source.monogon.dev/metropolis/pkg/localregistry"
27 "source.monogon.dev/metropolis/test/launch/cluster"
28 "source.monogon.dev/metropolis/test/util"
29
30 common "source.monogon.dev/metropolis/node"
31)
32
// Timeouts governing the end-to-end test run.
const (
	// globalTestTimeout bounds the context shared by the whole test.
	//
	// Should cancelling that context fail to abort the test for some reason,
	// Bazel would eventually time it out after 900s (the "large" test size).
	globalTestTimeout = 10 * time.Minute

	// Per-step timeouts for individual end-to-end checks, chosen by how long
	// the step is expected to take.
	smallTestTimeout = time.Minute
	largeTestTimeout = 2 * time.Minute
)
44
45// TestE2EKubernetes exercises the Kubernetes functionality of Metropolis.
46//
47// The tests are performed against an in-memory cluster.
48func TestE2EKubernetes(t *testing.T) {
49 // Set a global timeout to make sure this terminates
50 ctx, cancel := context.WithTimeout(context.Background(), globalTestTimeout)
51 defer cancel()
52
53 rPath, err := runfiles.Rlocation("_main/metropolis/test/e2e/testimages_manifest.prototxt")
54 if err != nil {
55 t.Fatalf("Resolving registry manifest failed: %v", err)
56 }
57 df, err := os.ReadFile(rPath)
58 if err != nil {
59 t.Fatalf("Reading registry manifest failed: %v", err)
60 }
61 lr, err := localregistry.FromBazelManifest(df)
62 if err != nil {
63 t.Fatalf("Creating test image registry failed: %v", err)
64 }
65
66 // Launch cluster.
67 clusterOptions := cluster.ClusterOptions{
68 NumNodes: 2,
69 LocalRegistry: lr,
70 }
71 cluster, err := cluster.LaunchCluster(ctx, clusterOptions)
72 if err != nil {
73 t.Fatalf("LaunchCluster failed: %v", err)
74 }
75 defer func() {
76 err := cluster.Close()
77 if err != nil {
78 t.Fatalf("cluster Close failed: %v", err)
79 }
80 }()
81
82 clientSet, err := cluster.GetKubeClientSet()
83 if err != nil {
84 t.Fatal(err)
85 }
86 util.TestEventual(t, "Add KubernetesWorker roles", ctx, smallTestTimeout, func(ctx context.Context) error {
87 // Make everything but the first node into KubernetesWorkers.
88 for i := 1; i < clusterOptions.NumNodes; i++ {
89 err := cluster.MakeKubernetesWorker(ctx, cluster.NodeIDs[i])
90 if err != nil {
91 return util.Permanent(fmt.Errorf("MakeKubernetesWorker: %w", err))
92 }
93 }
94 return nil
95 })
96 util.TestEventual(t, "Node is registered and ready", ctx, largeTestTimeout, func(ctx context.Context) error {
97 nodes, err := clientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
98 if err != nil {
99 return err
100 }
101 if len(nodes.Items) < 1 {
102 return errors.New("node not yet registered")
103 }
104 node := nodes.Items[0]
105 for _, cond := range node.Status.Conditions {
106 if cond.Type != corev1.NodeReady {
107 continue
108 }
109 if cond.Status != corev1.ConditionTrue {
110 return fmt.Errorf("node not ready: %v", cond.Message)
111 }
112 }
113 return nil
114 })
115 util.TestEventual(t, "Simple deployment", ctx, largeTestTimeout, func(ctx context.Context) error {
116 _, err := clientSet.AppsV1().Deployments("default").Create(ctx, makeTestDeploymentSpec("test-deploy-1"), metav1.CreateOptions{})
117 return err
118 })
119 util.TestEventual(t, "Simple deployment is running", ctx, largeTestTimeout, func(ctx context.Context) error {
120 res, err := clientSet.CoreV1().Pods("default").List(ctx, metav1.ListOptions{LabelSelector: "name=test-deploy-1"})
121 if err != nil {
122 return err
123 }
124 if len(res.Items) == 0 {
125 return errors.New("pod didn't get created")
126 }
127 pod := res.Items[0]
128 if podv1.IsPodAvailable(&pod, 1, metav1.NewTime(time.Now())) {
129 return nil
130 }
131 events, err := clientSet.CoreV1().Events("default").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=default", pod.Name)})
132 if err != nil || len(events.Items) == 0 {
133 return fmt.Errorf("pod is not ready: %v", pod.Status.Phase)
134 } else {
135 return fmt.Errorf("pod is not ready: %v", events.Items[0].Message)
136 }
137 })
138 util.TestEventual(t, "Simple deployment with gvisor", ctx, largeTestTimeout, func(ctx context.Context) error {
139 deployment := makeTestDeploymentSpec("test-deploy-2")
140 gvisorStr := "gvisor"
141 deployment.Spec.Template.Spec.RuntimeClassName = &gvisorStr
142 _, err := clientSet.AppsV1().Deployments("default").Create(ctx, deployment, metav1.CreateOptions{})
143 return err
144 })
145 util.TestEventual(t, "Simple deployment is running on gvisor", ctx, largeTestTimeout, func(ctx context.Context) error {
146 res, err := clientSet.CoreV1().Pods("default").List(ctx, metav1.ListOptions{LabelSelector: "name=test-deploy-2"})
147 if err != nil {
148 return err
149 }
150 if len(res.Items) == 0 {
151 return errors.New("pod didn't get created")
152 }
153 pod := res.Items[0]
154 if podv1.IsPodAvailable(&pod, 1, metav1.NewTime(time.Now())) {
155 return nil
156 }
157 events, err := clientSet.CoreV1().Events("default").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=default", pod.Name)})
158 if err != nil || len(events.Items) == 0 {
159 return fmt.Errorf("pod is not ready: %v", pod.Status.Phase)
160 } else {
161 var errorMsg strings.Builder
162 for _, msg := range events.Items {
163 errorMsg.WriteString(" | ")
164 errorMsg.WriteString(msg.Message)
165 }
166 return fmt.Errorf("pod is not ready: %v", errorMsg.String())
167 }
168 })
169 util.TestEventual(t, "Simple StatefulSet with PVC", ctx, largeTestTimeout, func(ctx context.Context) error {
170 _, err := clientSet.AppsV1().StatefulSets("default").Create(ctx, makeTestStatefulSet("test-statefulset-1", corev1.PersistentVolumeFilesystem), metav1.CreateOptions{})
171 return err
172 })
173 util.TestEventual(t, "Simple StatefulSet with PVC is running", ctx, largeTestTimeout, func(ctx context.Context) error {
174 res, err := clientSet.CoreV1().Pods("default").List(ctx, metav1.ListOptions{LabelSelector: "name=test-statefulset-1"})
175 if err != nil {
176 return err
177 }
178 if len(res.Items) == 0 {
179 return errors.New("pod didn't get created")
180 }
181 pod := res.Items[0]
182 if podv1.IsPodAvailable(&pod, 1, metav1.NewTime(time.Now())) {
183 return nil
184 }
185 events, err := clientSet.CoreV1().Events("default").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=default", pod.Name)})
186 if err != nil || len(events.Items) == 0 {
187 return fmt.Errorf("pod is not ready: %v", pod.Status.Phase)
188 } else {
189 return fmt.Errorf("pod is not ready: %v", events.Items[0].Message)
190 }
191 })
192 util.TestEventual(t, "Simple StatefulSet with Block PVC", ctx, largeTestTimeout, func(ctx context.Context) error {
193 _, err := clientSet.AppsV1().StatefulSets("default").Create(ctx, makeTestStatefulSet("test-statefulset-2", corev1.PersistentVolumeBlock), metav1.CreateOptions{})
194 return err
195 })
196 util.TestEventual(t, "Simple StatefulSet with Block PVC is running", ctx, largeTestTimeout, func(ctx context.Context) error {
197 res, err := clientSet.CoreV1().Pods("default").List(ctx, metav1.ListOptions{LabelSelector: "name=test-statefulset-2"})
198 if err != nil {
199 return err
200 }
201 if len(res.Items) == 0 {
202 return errors.New("pod didn't get created")
203 }
204 pod := res.Items[0]
205 if podv1.IsPodAvailable(&pod, 1, metav1.NewTime(time.Now())) {
206 return nil
207 }
208 events, err := clientSet.CoreV1().Events("default").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=default", pod.Name)})
209 if err != nil || len(events.Items) == 0 {
210 return fmt.Errorf("pod is not ready: %v", pod.Status.Phase)
211 } else {
212 return fmt.Errorf("pod is not ready: %v", events.Items[0].Message)
213 }
214 })
215 util.TestEventual(t, "In-cluster self-test job", ctx, smallTestTimeout, func(ctx context.Context) error {
216 _, err := clientSet.BatchV1().Jobs("default").Create(ctx, makeSelftestSpec("selftest"), metav1.CreateOptions{})
217 return err
218 })
219 util.TestEventual(t, "In-cluster self-test job passed", ctx, smallTestTimeout, func(ctx context.Context) error {
220 res, err := clientSet.BatchV1().Jobs("default").Get(ctx, "selftest", metav1.GetOptions{})
221 if err != nil {
222 return err
223 }
224 if res.Status.Failed > 0 {
225 pods, err := clientSet.CoreV1().Pods("default").List(ctx, metav1.ListOptions{
226 LabelSelector: "job-name=selftest",
227 })
228 if err != nil {
229 return util.Permanent(fmt.Errorf("job failed but failed to find pod: %w", err))
230 }
231 if len(pods.Items) < 1 {
232 return fmt.Errorf("job failed but pod does not exist")
233 }
234 lines, err := getPodLogLines(ctx, clientSet, pods.Items[0].Name, 1)
235 if err != nil {
236 return fmt.Errorf("job failed but could not get logs: %w", err)
237 }
238 if len(lines) > 0 {
239 return util.Permanent(fmt.Errorf("job failed, last log line: %s", lines[0]))
240 }
241 return util.Permanent(fmt.Errorf("job failed, empty log"))
242 }
243 if res.Status.Succeeded > 0 {
244 return nil
245 }
246 return fmt.Errorf("job still running")
247 })
248 util.TestEventual(t, "Start NodePort test setup", ctx, smallTestTimeout, func(ctx context.Context) error {
249 _, err := clientSet.AppsV1().Deployments("default").Create(ctx, makeHTTPServerDeploymentSpec("nodeport-server"), metav1.CreateOptions{})
250 if err != nil && !kerrors.IsAlreadyExists(err) {
251 return err
252 }
253 _, err = clientSet.CoreV1().Services("default").Create(ctx, makeHTTPServerNodePortService("nodeport-server"), metav1.CreateOptions{})
254 if err != nil && !kerrors.IsAlreadyExists(err) {
255 return err
256 }
257 return nil
258 })
259 util.TestEventual(t, "NodePort accessible from all nodes", ctx, smallTestTimeout, func(ctx context.Context) error {
260 nodes, err := clientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
261 if err != nil {
262 return err
263 }
264 // Use a new client for each attempt
265 hc := http.Client{
266 Timeout: 2 * time.Second,
267 Transport: &http.Transport{
268 Dial: cluster.SOCKSDialer.Dial,
269 },
270 }
271 for _, n := range nodes.Items {
272 var addr string
273 for _, a := range n.Status.Addresses {
274 if a.Type == corev1.NodeInternalIP {
275 addr = a.Address
276 }
277 }
278 u := url.URL{Scheme: "http", Host: addr, Path: "/"}
279 res, err := hc.Get(u.String())
280 if err != nil {
281 return fmt.Errorf("failed getting from node %q: %w", n.Name, err)
282 }
283 if res.StatusCode != http.StatusOK {
284 return fmt.Errorf("getting from node %q: HTTP %d", n.Name, res.StatusCode)
285 }
286 t.Logf("Got response from %q", n.Name)
287 }
288 return nil
289 })
290 util.TestEventual(t, "containerd metrics retrieved", ctx, smallTestTimeout, func(ctx context.Context) error {
291 pool := x509.NewCertPool()
292 pool.AddCert(cluster.CACertificate)
293 cl := http.Client{
294 Transport: &http.Transport{
295 TLSClientConfig: &tls.Config{
296 Certificates: []tls.Certificate{cluster.Owner},
297 RootCAs: pool,
298 },
299 DialContext: func(ctx context.Context, _, addr string) (net.Conn, error) {
300 return cluster.DialNode(ctx, addr)
301 },
302 },
303 }
304 u := url.URL{
305 Scheme: "https",
306 Host: net.JoinHostPort(cluster.NodeIDs[1], common.MetricsPort.PortString()),
307 Path: "/metrics/containerd",
308 }
309 res, err := cl.Get(u.String())
310 if err != nil {
311 return err
312 }
313 defer res.Body.Close()
314 if res.StatusCode != 200 {
315 return fmt.Errorf("status code %d", res.StatusCode)
316 }
317
318 body, err := io.ReadAll(res.Body)
319 if err != nil {
320 return err
321 }
322 needle := "containerd_build_info_total"
323 if !strings.Contains(string(body), needle) {
324 return util.Permanent(fmt.Errorf("could not find %q in returned response", needle))
325 }
326 return nil
327 })
328 if os.Getenv("HAVE_NESTED_KVM") != "" {
329 util.TestEventual(t, "Pod for KVM/QEMU smoke test", ctx, smallTestTimeout, func(ctx context.Context) error {
330 runcRuntimeClass := "runc"
331 _, err := clientSet.CoreV1().Pods("default").Create(ctx, &corev1.Pod{
332 ObjectMeta: metav1.ObjectMeta{
333 Name: "vm-smoketest",
334 },
335 Spec: corev1.PodSpec{
336 Containers: []corev1.Container{{
337 Name: "vm-smoketest",
338 ImagePullPolicy: corev1.PullNever,
339 Image: "test.monogon.internal/metropolis/vm/smoketest:smoketest_container",
340 Resources: corev1.ResourceRequirements{
341 Limits: corev1.ResourceList{
342 "devices.monogon.dev/kvm": *resource.NewQuantity(1, ""),
343 },
344 },
345 }},
346 RuntimeClassName: &runcRuntimeClass,
347 RestartPolicy: corev1.RestartPolicyNever,
348 },
349 }, metav1.CreateOptions{})
350 return err
351 })
352 util.TestEventual(t, "KVM/QEMU smoke test completion", ctx, smallTestTimeout, func(ctx context.Context) error {
353 pod, err := clientSet.CoreV1().Pods("default").Get(ctx, "vm-smoketest", metav1.GetOptions{})
354 if err != nil {
355 return fmt.Errorf("failed to get pod: %w", err)
356 }
357 if pod.Status.Phase == corev1.PodSucceeded {
358 return nil
359 }
360 events, err := clientSet.CoreV1().Events("default").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=default", pod.Name)})
361 if err != nil || len(events.Items) == 0 {
362 return fmt.Errorf("pod is not ready: %v", pod.Status.Phase)
363 } else {
364 return fmt.Errorf("pod is not ready: %v", events.Items[len(events.Items)-1].Message)
365 }
366 })
367 }
368}