blob: ed0820382509c6fbcb435e17a0eb52117806dce7 [file] [log] [blame]
Serge Bazanski99b02142024-04-17 16:33:28 +02001package kubernetes
2
3import (
4 "context"
5 "crypto/tls"
6 "crypto/x509"
7 "errors"
8 "fmt"
9 "io"
10 "net"
11 "net/http"
12 _ "net/http/pprof"
13 "net/url"
14 "os"
15 "strings"
16 "testing"
17 "time"
18
19 "github.com/bazelbuild/rules_go/go/runfiles"
20 corev1 "k8s.io/api/core/v1"
21 kerrors "k8s.io/apimachinery/pkg/api/errors"
22 "k8s.io/apimachinery/pkg/api/resource"
23 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
24 podv1 "k8s.io/kubernetes/pkg/api/v1/pod"
25
Lorenz Brun732a8842024-08-26 23:25:37 +020026 common "source.monogon.dev/metropolis/node"
27 cpb "source.monogon.dev/metropolis/proto/common"
Tim Windelschmidt9f21f532024-05-07 15:14:20 +020028 mlaunch "source.monogon.dev/metropolis/test/launch"
29 "source.monogon.dev/metropolis/test/localregistry"
Serge Bazanski99b02142024-04-17 16:33:28 +020030 "source.monogon.dev/metropolis/test/util"
Serge Bazanski99b02142024-04-17 16:33:28 +020031)
32
// xTestImagesManifestPath starts out as the canonical (runfiles-relative)
// path of the test images manifest, filled in by Bazel when the test binary
// is linked. The package init function replaces it with the real path, as
// resolved by the rules_go runfiles package.
var xTestImagesManifestPath string
39
40func init() {
41 var err error
42 for _, path := range []*string{
43 &xTestImagesManifestPath,
44 } {
45 *path, err = runfiles.Rlocation(*path)
46 if err != nil {
47 panic(err)
48 }
49 }
50}
51
// Timeouts used by the end-to-end tests.
const (
	// globalTestTimeout bounds the global test context. Should context
	// cancellation somehow fail to abort the test, Bazel would still kill
	// it after 900s (its "large" test timeout).
	globalTestTimeout = 10 * time.Minute

	// smallTestTimeout and largeTestTimeout bound individual end-to-end
	// checks, depending on their size.
	smallTestTimeout = 1 * time.Minute
	largeTestTimeout = 2 * time.Minute
)
63
64// TestE2EKubernetes exercises the Kubernetes functionality of Metropolis.
65//
66// The tests are performed against an in-memory cluster.
67func TestE2EKubernetes(t *testing.T) {
68 // Set a global timeout to make sure this terminates
69 ctx, cancel := context.WithTimeout(context.Background(), globalTestTimeout)
70 defer cancel()
71
Tim Windelschmidt82e6af72024-07-23 00:05:42 +000072 df, err := os.ReadFile(xTestImagesManifestPath)
Serge Bazanski99b02142024-04-17 16:33:28 +020073 if err != nil {
74 t.Fatalf("Reading registry manifest failed: %v", err)
75 }
76 lr, err := localregistry.FromBazelManifest(df)
77 if err != nil {
78 t.Fatalf("Creating test image registry failed: %v", err)
79 }
80
81 // Launch cluster.
Tim Windelschmidt9f21f532024-05-07 15:14:20 +020082 clusterOptions := mlaunch.ClusterOptions{
Serge Bazanski99b02142024-04-17 16:33:28 +020083 NumNodes: 2,
84 LocalRegistry: lr,
Lorenz Brun732a8842024-08-26 23:25:37 +020085 InitialClusterConfiguration: &cpb.ClusterConfiguration{
86 TpmMode: cpb.ClusterConfiguration_TPM_MODE_DISABLED,
87 StorageSecurityPolicy: cpb.ClusterConfiguration_STORAGE_SECURITY_POLICY_NEEDS_INSECURE,
88 },
Serge Bazanski99b02142024-04-17 16:33:28 +020089 }
Tim Windelschmidt9f21f532024-05-07 15:14:20 +020090 cluster, err := mlaunch.LaunchCluster(ctx, clusterOptions)
Serge Bazanski99b02142024-04-17 16:33:28 +020091 if err != nil {
92 t.Fatalf("LaunchCluster failed: %v", err)
93 }
94 defer func() {
95 err := cluster.Close()
96 if err != nil {
97 t.Fatalf("cluster Close failed: %v", err)
98 }
99 }()
100
101 clientSet, err := cluster.GetKubeClientSet()
102 if err != nil {
103 t.Fatal(err)
104 }
105 util.TestEventual(t, "Add KubernetesWorker roles", ctx, smallTestTimeout, func(ctx context.Context) error {
106 // Make everything but the first node into KubernetesWorkers.
107 for i := 1; i < clusterOptions.NumNodes; i++ {
108 err := cluster.MakeKubernetesWorker(ctx, cluster.NodeIDs[i])
109 if err != nil {
110 return util.Permanent(fmt.Errorf("MakeKubernetesWorker: %w", err))
111 }
112 }
113 return nil
114 })
115 util.TestEventual(t, "Node is registered and ready", ctx, largeTestTimeout, func(ctx context.Context) error {
116 nodes, err := clientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
117 if err != nil {
118 return err
119 }
120 if len(nodes.Items) < 1 {
121 return errors.New("node not yet registered")
122 }
123 node := nodes.Items[0]
124 for _, cond := range node.Status.Conditions {
125 if cond.Type != corev1.NodeReady {
126 continue
127 }
128 if cond.Status != corev1.ConditionTrue {
129 return fmt.Errorf("node not ready: %v", cond.Message)
130 }
131 }
132 return nil
133 })
134 util.TestEventual(t, "Simple deployment", ctx, largeTestTimeout, func(ctx context.Context) error {
135 _, err := clientSet.AppsV1().Deployments("default").Create(ctx, makeTestDeploymentSpec("test-deploy-1"), metav1.CreateOptions{})
136 return err
137 })
138 util.TestEventual(t, "Simple deployment is running", ctx, largeTestTimeout, func(ctx context.Context) error {
139 res, err := clientSet.CoreV1().Pods("default").List(ctx, metav1.ListOptions{LabelSelector: "name=test-deploy-1"})
140 if err != nil {
141 return err
142 }
143 if len(res.Items) == 0 {
144 return errors.New("pod didn't get created")
145 }
146 pod := res.Items[0]
147 if podv1.IsPodAvailable(&pod, 1, metav1.NewTime(time.Now())) {
148 return nil
149 }
150 events, err := clientSet.CoreV1().Events("default").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=default", pod.Name)})
151 if err != nil || len(events.Items) == 0 {
152 return fmt.Errorf("pod is not ready: %v", pod.Status.Phase)
153 } else {
154 return fmt.Errorf("pod is not ready: %v", events.Items[0].Message)
155 }
156 })
157 util.TestEventual(t, "Simple deployment with gvisor", ctx, largeTestTimeout, func(ctx context.Context) error {
158 deployment := makeTestDeploymentSpec("test-deploy-2")
159 gvisorStr := "gvisor"
160 deployment.Spec.Template.Spec.RuntimeClassName = &gvisorStr
161 _, err := clientSet.AppsV1().Deployments("default").Create(ctx, deployment, metav1.CreateOptions{})
162 return err
163 })
164 util.TestEventual(t, "Simple deployment is running on gvisor", ctx, largeTestTimeout, func(ctx context.Context) error {
165 res, err := clientSet.CoreV1().Pods("default").List(ctx, metav1.ListOptions{LabelSelector: "name=test-deploy-2"})
166 if err != nil {
167 return err
168 }
169 if len(res.Items) == 0 {
170 return errors.New("pod didn't get created")
171 }
172 pod := res.Items[0]
173 if podv1.IsPodAvailable(&pod, 1, metav1.NewTime(time.Now())) {
174 return nil
175 }
176 events, err := clientSet.CoreV1().Events("default").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=default", pod.Name)})
177 if err != nil || len(events.Items) == 0 {
178 return fmt.Errorf("pod is not ready: %v", pod.Status.Phase)
179 } else {
180 var errorMsg strings.Builder
181 for _, msg := range events.Items {
182 errorMsg.WriteString(" | ")
183 errorMsg.WriteString(msg.Message)
184 }
185 return fmt.Errorf("pod is not ready: %v", errorMsg.String())
186 }
187 })
188 util.TestEventual(t, "Simple StatefulSet with PVC", ctx, largeTestTimeout, func(ctx context.Context) error {
189 _, err := clientSet.AppsV1().StatefulSets("default").Create(ctx, makeTestStatefulSet("test-statefulset-1", corev1.PersistentVolumeFilesystem), metav1.CreateOptions{})
190 return err
191 })
192 util.TestEventual(t, "Simple StatefulSet with PVC is running", ctx, largeTestTimeout, func(ctx context.Context) error {
193 res, err := clientSet.CoreV1().Pods("default").List(ctx, metav1.ListOptions{LabelSelector: "name=test-statefulset-1"})
194 if err != nil {
195 return err
196 }
197 if len(res.Items) == 0 {
198 return errors.New("pod didn't get created")
199 }
200 pod := res.Items[0]
201 if podv1.IsPodAvailable(&pod, 1, metav1.NewTime(time.Now())) {
202 return nil
203 }
204 events, err := clientSet.CoreV1().Events("default").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=default", pod.Name)})
205 if err != nil || len(events.Items) == 0 {
206 return fmt.Errorf("pod is not ready: %v", pod.Status.Phase)
207 } else {
208 return fmt.Errorf("pod is not ready: %v", events.Items[0].Message)
209 }
210 })
211 util.TestEventual(t, "Simple StatefulSet with Block PVC", ctx, largeTestTimeout, func(ctx context.Context) error {
212 _, err := clientSet.AppsV1().StatefulSets("default").Create(ctx, makeTestStatefulSet("test-statefulset-2", corev1.PersistentVolumeBlock), metav1.CreateOptions{})
213 return err
214 })
215 util.TestEventual(t, "Simple StatefulSet with Block PVC is running", ctx, largeTestTimeout, func(ctx context.Context) error {
216 res, err := clientSet.CoreV1().Pods("default").List(ctx, metav1.ListOptions{LabelSelector: "name=test-statefulset-2"})
217 if err != nil {
218 return err
219 }
220 if len(res.Items) == 0 {
221 return errors.New("pod didn't get created")
222 }
223 pod := res.Items[0]
224 if podv1.IsPodAvailable(&pod, 1, metav1.NewTime(time.Now())) {
225 return nil
226 }
227 events, err := clientSet.CoreV1().Events("default").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=default", pod.Name)})
228 if err != nil || len(events.Items) == 0 {
229 return fmt.Errorf("pod is not ready: %v", pod.Status.Phase)
230 } else {
231 return fmt.Errorf("pod is not ready: %v", events.Items[0].Message)
232 }
233 })
234 util.TestEventual(t, "In-cluster self-test job", ctx, smallTestTimeout, func(ctx context.Context) error {
235 _, err := clientSet.BatchV1().Jobs("default").Create(ctx, makeSelftestSpec("selftest"), metav1.CreateOptions{})
236 return err
237 })
238 util.TestEventual(t, "In-cluster self-test job passed", ctx, smallTestTimeout, func(ctx context.Context) error {
239 res, err := clientSet.BatchV1().Jobs("default").Get(ctx, "selftest", metav1.GetOptions{})
240 if err != nil {
241 return err
242 }
243 if res.Status.Failed > 0 {
244 pods, err := clientSet.CoreV1().Pods("default").List(ctx, metav1.ListOptions{
245 LabelSelector: "job-name=selftest",
246 })
247 if err != nil {
248 return util.Permanent(fmt.Errorf("job failed but failed to find pod: %w", err))
249 }
250 if len(pods.Items) < 1 {
251 return fmt.Errorf("job failed but pod does not exist")
252 }
253 lines, err := getPodLogLines(ctx, clientSet, pods.Items[0].Name, 1)
254 if err != nil {
255 return fmt.Errorf("job failed but could not get logs: %w", err)
256 }
257 if len(lines) > 0 {
258 return util.Permanent(fmt.Errorf("job failed, last log line: %s", lines[0]))
259 }
260 return util.Permanent(fmt.Errorf("job failed, empty log"))
261 }
262 if res.Status.Succeeded > 0 {
263 return nil
264 }
265 return fmt.Errorf("job still running")
266 })
267 util.TestEventual(t, "Start NodePort test setup", ctx, smallTestTimeout, func(ctx context.Context) error {
268 _, err := clientSet.AppsV1().Deployments("default").Create(ctx, makeHTTPServerDeploymentSpec("nodeport-server"), metav1.CreateOptions{})
269 if err != nil && !kerrors.IsAlreadyExists(err) {
270 return err
271 }
272 _, err = clientSet.CoreV1().Services("default").Create(ctx, makeHTTPServerNodePortService("nodeport-server"), metav1.CreateOptions{})
273 if err != nil && !kerrors.IsAlreadyExists(err) {
274 return err
275 }
276 return nil
277 })
278 util.TestEventual(t, "NodePort accessible from all nodes", ctx, smallTestTimeout, func(ctx context.Context) error {
279 nodes, err := clientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
280 if err != nil {
281 return err
282 }
283 // Use a new client for each attempt
284 hc := http.Client{
285 Timeout: 2 * time.Second,
286 Transport: &http.Transport{
287 Dial: cluster.SOCKSDialer.Dial,
288 },
289 }
290 for _, n := range nodes.Items {
291 var addr string
292 for _, a := range n.Status.Addresses {
293 if a.Type == corev1.NodeInternalIP {
294 addr = a.Address
295 }
296 }
297 u := url.URL{Scheme: "http", Host: addr, Path: "/"}
298 res, err := hc.Get(u.String())
299 if err != nil {
300 return fmt.Errorf("failed getting from node %q: %w", n.Name, err)
301 }
302 if res.StatusCode != http.StatusOK {
303 return fmt.Errorf("getting from node %q: HTTP %d", n.Name, res.StatusCode)
304 }
305 t.Logf("Got response from %q", n.Name)
306 }
307 return nil
308 })
309 util.TestEventual(t, "containerd metrics retrieved", ctx, smallTestTimeout, func(ctx context.Context) error {
310 pool := x509.NewCertPool()
311 pool.AddCert(cluster.CACertificate)
312 cl := http.Client{
313 Transport: &http.Transport{
314 TLSClientConfig: &tls.Config{
315 Certificates: []tls.Certificate{cluster.Owner},
316 RootCAs: pool,
317 },
318 DialContext: func(ctx context.Context, _, addr string) (net.Conn, error) {
319 return cluster.DialNode(ctx, addr)
320 },
321 },
322 }
323 u := url.URL{
324 Scheme: "https",
325 Host: net.JoinHostPort(cluster.NodeIDs[1], common.MetricsPort.PortString()),
326 Path: "/metrics/containerd",
327 }
328 res, err := cl.Get(u.String())
329 if err != nil {
330 return err
331 }
332 defer res.Body.Close()
333 if res.StatusCode != 200 {
334 return fmt.Errorf("status code %d", res.StatusCode)
335 }
336
337 body, err := io.ReadAll(res.Body)
338 if err != nil {
339 return err
340 }
341 needle := "containerd_build_info_total"
342 if !strings.Contains(string(body), needle) {
343 return util.Permanent(fmt.Errorf("could not find %q in returned response", needle))
344 }
345 return nil
346 })
347 if os.Getenv("HAVE_NESTED_KVM") != "" {
348 util.TestEventual(t, "Pod for KVM/QEMU smoke test", ctx, smallTestTimeout, func(ctx context.Context) error {
349 runcRuntimeClass := "runc"
350 _, err := clientSet.CoreV1().Pods("default").Create(ctx, &corev1.Pod{
351 ObjectMeta: metav1.ObjectMeta{
352 Name: "vm-smoketest",
353 },
354 Spec: corev1.PodSpec{
355 Containers: []corev1.Container{{
356 Name: "vm-smoketest",
357 ImagePullPolicy: corev1.PullNever,
358 Image: "test.monogon.internal/metropolis/vm/smoketest:smoketest_container",
359 Resources: corev1.ResourceRequirements{
360 Limits: corev1.ResourceList{
361 "devices.monogon.dev/kvm": *resource.NewQuantity(1, ""),
362 },
363 },
364 }},
365 RuntimeClassName: &runcRuntimeClass,
366 RestartPolicy: corev1.RestartPolicyNever,
367 },
368 }, metav1.CreateOptions{})
369 return err
370 })
371 util.TestEventual(t, "KVM/QEMU smoke test completion", ctx, smallTestTimeout, func(ctx context.Context) error {
372 pod, err := clientSet.CoreV1().Pods("default").Get(ctx, "vm-smoketest", metav1.GetOptions{})
373 if err != nil {
374 return fmt.Errorf("failed to get pod: %w", err)
375 }
376 if pod.Status.Phase == corev1.PodSucceeded {
377 return nil
378 }
379 events, err := clientSet.CoreV1().Events("default").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=default", pod.Name)})
380 if err != nil || len(events.Items) == 0 {
381 return fmt.Errorf("pod is not ready: %v", pod.Status.Phase)
382 } else {
383 return fmt.Errorf("pod is not ready: %v", events.Items[len(events.Items)-1].Message)
384 }
385 })
386 }
387}