blob: 3608c3c8a35be3fa3fbd922296d399e353158566 [file] [log] [blame]
Serge Bazanski99b02142024-04-17 16:33:28 +02001package kubernetes
2
3import (
4 "context"
5 "crypto/tls"
6 "crypto/x509"
7 "errors"
8 "fmt"
9 "io"
10 "net"
11 "net/http"
12 _ "net/http/pprof"
13 "net/url"
14 "os"
15 "strings"
16 "testing"
17 "time"
18
19 "github.com/bazelbuild/rules_go/go/runfiles"
20 corev1 "k8s.io/api/core/v1"
21 kerrors "k8s.io/apimachinery/pkg/api/errors"
22 "k8s.io/apimachinery/pkg/api/resource"
23 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
24 podv1 "k8s.io/kubernetes/pkg/api/v1/pod"
25
Tim Windelschmidt9f21f532024-05-07 15:14:20 +020026 mlaunch "source.monogon.dev/metropolis/test/launch"
27 "source.monogon.dev/metropolis/test/localregistry"
Serge Bazanski99b02142024-04-17 16:33:28 +020028 "source.monogon.dev/metropolis/test/util"
29
30 common "source.monogon.dev/metropolis/node"
31)
32
// xTestImagesManifestPath is set by bazel at link time to the canonical
// (runfiles-relative) path of the test images manifest. The package init
// function rewrites it in place to the real on-disk path using the rules_go
// runfiles package.
var xTestImagesManifestPath string
39
40func init() {
41 var err error
42 for _, path := range []*string{
43 &xTestImagesManifestPath,
44 } {
45 *path, err = runfiles.Rlocation(*path)
46 if err != nil {
47 panic(err)
48 }
49 }
50}
51
// Timeouts used by the end-to-end Kubernetes test.
const (
	// globalTestTimeout bounds the whole test through its root context.
	// Bazel would eventually kill a "large" test after 900s anyway; that is
	// only a fallback in case context cancellation fails to abort the run.
	globalTestTimeout = 10 * time.Minute

	// smallTestTimeout and largeTestTimeout bound individual end-to-end
	// steps of different sizes.
	smallTestTimeout = time.Minute
	largeTestTimeout = 2 * time.Minute
)
63
64// TestE2EKubernetes exercises the Kubernetes functionality of Metropolis.
65//
66// The tests are performed against an in-memory cluster.
67func TestE2EKubernetes(t *testing.T) {
68 // Set a global timeout to make sure this terminates
69 ctx, cancel := context.WithTimeout(context.Background(), globalTestTimeout)
70 defer cancel()
71
Tim Windelschmidt82e6af72024-07-23 00:05:42 +000072 df, err := os.ReadFile(xTestImagesManifestPath)
Serge Bazanski99b02142024-04-17 16:33:28 +020073 if err != nil {
74 t.Fatalf("Reading registry manifest failed: %v", err)
75 }
76 lr, err := localregistry.FromBazelManifest(df)
77 if err != nil {
78 t.Fatalf("Creating test image registry failed: %v", err)
79 }
80
81 // Launch cluster.
Tim Windelschmidt9f21f532024-05-07 15:14:20 +020082 clusterOptions := mlaunch.ClusterOptions{
Serge Bazanski99b02142024-04-17 16:33:28 +020083 NumNodes: 2,
84 LocalRegistry: lr,
85 }
Tim Windelschmidt9f21f532024-05-07 15:14:20 +020086 cluster, err := mlaunch.LaunchCluster(ctx, clusterOptions)
Serge Bazanski99b02142024-04-17 16:33:28 +020087 if err != nil {
88 t.Fatalf("LaunchCluster failed: %v", err)
89 }
90 defer func() {
91 err := cluster.Close()
92 if err != nil {
93 t.Fatalf("cluster Close failed: %v", err)
94 }
95 }()
96
97 clientSet, err := cluster.GetKubeClientSet()
98 if err != nil {
99 t.Fatal(err)
100 }
101 util.TestEventual(t, "Add KubernetesWorker roles", ctx, smallTestTimeout, func(ctx context.Context) error {
102 // Make everything but the first node into KubernetesWorkers.
103 for i := 1; i < clusterOptions.NumNodes; i++ {
104 err := cluster.MakeKubernetesWorker(ctx, cluster.NodeIDs[i])
105 if err != nil {
106 return util.Permanent(fmt.Errorf("MakeKubernetesWorker: %w", err))
107 }
108 }
109 return nil
110 })
111 util.TestEventual(t, "Node is registered and ready", ctx, largeTestTimeout, func(ctx context.Context) error {
112 nodes, err := clientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
113 if err != nil {
114 return err
115 }
116 if len(nodes.Items) < 1 {
117 return errors.New("node not yet registered")
118 }
119 node := nodes.Items[0]
120 for _, cond := range node.Status.Conditions {
121 if cond.Type != corev1.NodeReady {
122 continue
123 }
124 if cond.Status != corev1.ConditionTrue {
125 return fmt.Errorf("node not ready: %v", cond.Message)
126 }
127 }
128 return nil
129 })
130 util.TestEventual(t, "Simple deployment", ctx, largeTestTimeout, func(ctx context.Context) error {
131 _, err := clientSet.AppsV1().Deployments("default").Create(ctx, makeTestDeploymentSpec("test-deploy-1"), metav1.CreateOptions{})
132 return err
133 })
134 util.TestEventual(t, "Simple deployment is running", ctx, largeTestTimeout, func(ctx context.Context) error {
135 res, err := clientSet.CoreV1().Pods("default").List(ctx, metav1.ListOptions{LabelSelector: "name=test-deploy-1"})
136 if err != nil {
137 return err
138 }
139 if len(res.Items) == 0 {
140 return errors.New("pod didn't get created")
141 }
142 pod := res.Items[0]
143 if podv1.IsPodAvailable(&pod, 1, metav1.NewTime(time.Now())) {
144 return nil
145 }
146 events, err := clientSet.CoreV1().Events("default").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=default", pod.Name)})
147 if err != nil || len(events.Items) == 0 {
148 return fmt.Errorf("pod is not ready: %v", pod.Status.Phase)
149 } else {
150 return fmt.Errorf("pod is not ready: %v", events.Items[0].Message)
151 }
152 })
153 util.TestEventual(t, "Simple deployment with gvisor", ctx, largeTestTimeout, func(ctx context.Context) error {
154 deployment := makeTestDeploymentSpec("test-deploy-2")
155 gvisorStr := "gvisor"
156 deployment.Spec.Template.Spec.RuntimeClassName = &gvisorStr
157 _, err := clientSet.AppsV1().Deployments("default").Create(ctx, deployment, metav1.CreateOptions{})
158 return err
159 })
160 util.TestEventual(t, "Simple deployment is running on gvisor", ctx, largeTestTimeout, func(ctx context.Context) error {
161 res, err := clientSet.CoreV1().Pods("default").List(ctx, metav1.ListOptions{LabelSelector: "name=test-deploy-2"})
162 if err != nil {
163 return err
164 }
165 if len(res.Items) == 0 {
166 return errors.New("pod didn't get created")
167 }
168 pod := res.Items[0]
169 if podv1.IsPodAvailable(&pod, 1, metav1.NewTime(time.Now())) {
170 return nil
171 }
172 events, err := clientSet.CoreV1().Events("default").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=default", pod.Name)})
173 if err != nil || len(events.Items) == 0 {
174 return fmt.Errorf("pod is not ready: %v", pod.Status.Phase)
175 } else {
176 var errorMsg strings.Builder
177 for _, msg := range events.Items {
178 errorMsg.WriteString(" | ")
179 errorMsg.WriteString(msg.Message)
180 }
181 return fmt.Errorf("pod is not ready: %v", errorMsg.String())
182 }
183 })
184 util.TestEventual(t, "Simple StatefulSet with PVC", ctx, largeTestTimeout, func(ctx context.Context) error {
185 _, err := clientSet.AppsV1().StatefulSets("default").Create(ctx, makeTestStatefulSet("test-statefulset-1", corev1.PersistentVolumeFilesystem), metav1.CreateOptions{})
186 return err
187 })
188 util.TestEventual(t, "Simple StatefulSet with PVC is running", ctx, largeTestTimeout, func(ctx context.Context) error {
189 res, err := clientSet.CoreV1().Pods("default").List(ctx, metav1.ListOptions{LabelSelector: "name=test-statefulset-1"})
190 if err != nil {
191 return err
192 }
193 if len(res.Items) == 0 {
194 return errors.New("pod didn't get created")
195 }
196 pod := res.Items[0]
197 if podv1.IsPodAvailable(&pod, 1, metav1.NewTime(time.Now())) {
198 return nil
199 }
200 events, err := clientSet.CoreV1().Events("default").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=default", pod.Name)})
201 if err != nil || len(events.Items) == 0 {
202 return fmt.Errorf("pod is not ready: %v", pod.Status.Phase)
203 } else {
204 return fmt.Errorf("pod is not ready: %v", events.Items[0].Message)
205 }
206 })
207 util.TestEventual(t, "Simple StatefulSet with Block PVC", ctx, largeTestTimeout, func(ctx context.Context) error {
208 _, err := clientSet.AppsV1().StatefulSets("default").Create(ctx, makeTestStatefulSet("test-statefulset-2", corev1.PersistentVolumeBlock), metav1.CreateOptions{})
209 return err
210 })
211 util.TestEventual(t, "Simple StatefulSet with Block PVC is running", ctx, largeTestTimeout, func(ctx context.Context) error {
212 res, err := clientSet.CoreV1().Pods("default").List(ctx, metav1.ListOptions{LabelSelector: "name=test-statefulset-2"})
213 if err != nil {
214 return err
215 }
216 if len(res.Items) == 0 {
217 return errors.New("pod didn't get created")
218 }
219 pod := res.Items[0]
220 if podv1.IsPodAvailable(&pod, 1, metav1.NewTime(time.Now())) {
221 return nil
222 }
223 events, err := clientSet.CoreV1().Events("default").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=default", pod.Name)})
224 if err != nil || len(events.Items) == 0 {
225 return fmt.Errorf("pod is not ready: %v", pod.Status.Phase)
226 } else {
227 return fmt.Errorf("pod is not ready: %v", events.Items[0].Message)
228 }
229 })
230 util.TestEventual(t, "In-cluster self-test job", ctx, smallTestTimeout, func(ctx context.Context) error {
231 _, err := clientSet.BatchV1().Jobs("default").Create(ctx, makeSelftestSpec("selftest"), metav1.CreateOptions{})
232 return err
233 })
234 util.TestEventual(t, "In-cluster self-test job passed", ctx, smallTestTimeout, func(ctx context.Context) error {
235 res, err := clientSet.BatchV1().Jobs("default").Get(ctx, "selftest", metav1.GetOptions{})
236 if err != nil {
237 return err
238 }
239 if res.Status.Failed > 0 {
240 pods, err := clientSet.CoreV1().Pods("default").List(ctx, metav1.ListOptions{
241 LabelSelector: "job-name=selftest",
242 })
243 if err != nil {
244 return util.Permanent(fmt.Errorf("job failed but failed to find pod: %w", err))
245 }
246 if len(pods.Items) < 1 {
247 return fmt.Errorf("job failed but pod does not exist")
248 }
249 lines, err := getPodLogLines(ctx, clientSet, pods.Items[0].Name, 1)
250 if err != nil {
251 return fmt.Errorf("job failed but could not get logs: %w", err)
252 }
253 if len(lines) > 0 {
254 return util.Permanent(fmt.Errorf("job failed, last log line: %s", lines[0]))
255 }
256 return util.Permanent(fmt.Errorf("job failed, empty log"))
257 }
258 if res.Status.Succeeded > 0 {
259 return nil
260 }
261 return fmt.Errorf("job still running")
262 })
263 util.TestEventual(t, "Start NodePort test setup", ctx, smallTestTimeout, func(ctx context.Context) error {
264 _, err := clientSet.AppsV1().Deployments("default").Create(ctx, makeHTTPServerDeploymentSpec("nodeport-server"), metav1.CreateOptions{})
265 if err != nil && !kerrors.IsAlreadyExists(err) {
266 return err
267 }
268 _, err = clientSet.CoreV1().Services("default").Create(ctx, makeHTTPServerNodePortService("nodeport-server"), metav1.CreateOptions{})
269 if err != nil && !kerrors.IsAlreadyExists(err) {
270 return err
271 }
272 return nil
273 })
274 util.TestEventual(t, "NodePort accessible from all nodes", ctx, smallTestTimeout, func(ctx context.Context) error {
275 nodes, err := clientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
276 if err != nil {
277 return err
278 }
279 // Use a new client for each attempt
280 hc := http.Client{
281 Timeout: 2 * time.Second,
282 Transport: &http.Transport{
283 Dial: cluster.SOCKSDialer.Dial,
284 },
285 }
286 for _, n := range nodes.Items {
287 var addr string
288 for _, a := range n.Status.Addresses {
289 if a.Type == corev1.NodeInternalIP {
290 addr = a.Address
291 }
292 }
293 u := url.URL{Scheme: "http", Host: addr, Path: "/"}
294 res, err := hc.Get(u.String())
295 if err != nil {
296 return fmt.Errorf("failed getting from node %q: %w", n.Name, err)
297 }
298 if res.StatusCode != http.StatusOK {
299 return fmt.Errorf("getting from node %q: HTTP %d", n.Name, res.StatusCode)
300 }
301 t.Logf("Got response from %q", n.Name)
302 }
303 return nil
304 })
305 util.TestEventual(t, "containerd metrics retrieved", ctx, smallTestTimeout, func(ctx context.Context) error {
306 pool := x509.NewCertPool()
307 pool.AddCert(cluster.CACertificate)
308 cl := http.Client{
309 Transport: &http.Transport{
310 TLSClientConfig: &tls.Config{
311 Certificates: []tls.Certificate{cluster.Owner},
312 RootCAs: pool,
313 },
314 DialContext: func(ctx context.Context, _, addr string) (net.Conn, error) {
315 return cluster.DialNode(ctx, addr)
316 },
317 },
318 }
319 u := url.URL{
320 Scheme: "https",
321 Host: net.JoinHostPort(cluster.NodeIDs[1], common.MetricsPort.PortString()),
322 Path: "/metrics/containerd",
323 }
324 res, err := cl.Get(u.String())
325 if err != nil {
326 return err
327 }
328 defer res.Body.Close()
329 if res.StatusCode != 200 {
330 return fmt.Errorf("status code %d", res.StatusCode)
331 }
332
333 body, err := io.ReadAll(res.Body)
334 if err != nil {
335 return err
336 }
337 needle := "containerd_build_info_total"
338 if !strings.Contains(string(body), needle) {
339 return util.Permanent(fmt.Errorf("could not find %q in returned response", needle))
340 }
341 return nil
342 })
343 if os.Getenv("HAVE_NESTED_KVM") != "" {
344 util.TestEventual(t, "Pod for KVM/QEMU smoke test", ctx, smallTestTimeout, func(ctx context.Context) error {
345 runcRuntimeClass := "runc"
346 _, err := clientSet.CoreV1().Pods("default").Create(ctx, &corev1.Pod{
347 ObjectMeta: metav1.ObjectMeta{
348 Name: "vm-smoketest",
349 },
350 Spec: corev1.PodSpec{
351 Containers: []corev1.Container{{
352 Name: "vm-smoketest",
353 ImagePullPolicy: corev1.PullNever,
354 Image: "test.monogon.internal/metropolis/vm/smoketest:smoketest_container",
355 Resources: corev1.ResourceRequirements{
356 Limits: corev1.ResourceList{
357 "devices.monogon.dev/kvm": *resource.NewQuantity(1, ""),
358 },
359 },
360 }},
361 RuntimeClassName: &runcRuntimeClass,
362 RestartPolicy: corev1.RestartPolicyNever,
363 },
364 }, metav1.CreateOptions{})
365 return err
366 })
367 util.TestEventual(t, "KVM/QEMU smoke test completion", ctx, smallTestTimeout, func(ctx context.Context) error {
368 pod, err := clientSet.CoreV1().Pods("default").Get(ctx, "vm-smoketest", metav1.GetOptions{})
369 if err != nil {
370 return fmt.Errorf("failed to get pod: %w", err)
371 }
372 if pod.Status.Phase == corev1.PodSucceeded {
373 return nil
374 }
375 events, err := clientSet.CoreV1().Events("default").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=default", pod.Name)})
376 if err != nil || len(events.Items) == 0 {
377 return fmt.Errorf("pod is not ready: %v", pod.Status.Phase)
378 } else {
379 return fmt.Errorf("pod is not ready: %v", events.Items[len(events.Items)-1].Message)
380 }
381 })
382 }
383}