m/test/e2e: split core/kubernetes tests, clean up
This splits the large TestE2E function into two separate functions and
tests: one which exercises the core functionality of Metropolis, the
other which exercises just the Kubernetes bits.
This allows for easier testing during development, and generally trades
higher resource usage for faster execution time in CI.
At the same time, we make some small cleanups to the E2E functionality:
1. Node startup is now parallelized.
2. Non-bootstrap nodes can now be left in the NEW state (this was used
while diagnosing issue #234, but is currently unused in the main code).
3. Kubernetes access now goes over SOCKS.
4. Some Cluster helper functions have been added (see the sketch below).
All in all, this should allow us to write more E2E tests in the future,
and perhaps at some point turn Cluster into an interface implemented both
by the current framework and by persistent test setups running against
long-term VMs/physical machines.
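
For illustration, the new Cluster helpers end up being used roughly as
follows. This is a minimal sketch distilled from the tests in this change,
assuming the context, client-go kubernetes and test/launch/cluster imports,
and assuming GetKubeClientSet still returns a kubernetes.Interface as the
old helper did; the wrapper function name and error handling here are
illustrative only, not part of the API:

    // launchAndPrepare is a hypothetical helper showing the new surface.
    func launchAndPrepare(ctx context.Context) (kubernetes.Interface, *cluster.Cluster, error) {
        // Launch a two-node in-memory cluster; node startup is parallelized.
        cl, err := cluster.LaunchCluster(ctx, cluster.ClusterOptions{NumNodes: 2})
        if err != nil {
            return nil, nil, err
        }
        // Kubernetes access now goes over SOCKS inside GetKubeClientSet,
        // replacing the previous explicit port lookup on the caller side.
        clientSet, err := cl.GetKubeClientSet()
        if err != nil {
            cl.Close()
            return nil, nil, err
        }
        // Turn the second node into a Kubernetes worker.
        if err := cl.MakeKubernetesWorker(ctx, cl.NodeIDs[1]); err != nil {
            cl.Close()
            return nil, nil, err
        }
        return clientSet, cl, nil
    }
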
Change-Id: Ia4586b2aaa5fc8c979d35f4b49513638481e4c10
Reviewed-on: https://review.monogon.dev/c/monogon/+/1870
Tested-by: Jenkins CI
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/test/e2e/BUILD.bazel b/metropolis/test/e2e/BUILD.bazel
index 2cde2c6..25a6c88 100644
--- a/metropolis/test/e2e/BUILD.bazel
+++ b/metropolis/test/e2e/BUILD.bazel
@@ -6,7 +6,6 @@
importpath = "source.monogon.dev/metropolis/test/e2e",
visibility = ["//metropolis/test:__subpackages__"],
deps = [
- "//metropolis/test/launch/cluster",
"@io_k8s_api//apps/v1:apps",
"@io_k8s_api//batch/v1:batch",
"@io_k8s_api//core/v1:core",
@@ -14,7 +13,6 @@
"@io_k8s_apimachinery//pkg/apis/meta/v1:meta",
"@io_k8s_apimachinery//pkg/util/intstr",
"@io_k8s_client_go//kubernetes",
- "@io_k8s_client_go//rest",
],
)
diff --git a/metropolis/test/e2e/k8s_cts/BUILD.bazel b/metropolis/test/e2e/k8s_cts/BUILD.bazel
index 6cc4080..8458b2d 100644
--- a/metropolis/test/e2e/k8s_cts/BUILD.bazel
+++ b/metropolis/test/e2e/k8s_cts/BUILD.bazel
@@ -6,8 +6,6 @@
importpath = "source.monogon.dev/metropolis/test/e2e/k8s_cts",
visibility = ["//visibility:private"],
deps = [
- "//metropolis/node",
- "//metropolis/test/e2e",
"//metropolis/test/launch/cluster",
"@io_k8s_api//core/v1:core",
"@io_k8s_api//rbac/v1:rbac",
diff --git a/metropolis/test/e2e/k8s_cts/main.go b/metropolis/test/e2e/k8s_cts/main.go
index c0adc75..4b86b64 100644
--- a/metropolis/test/e2e/k8s_cts/main.go
+++ b/metropolis/test/e2e/k8s_cts/main.go
@@ -33,8 +33,6 @@
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- common "source.monogon.dev/metropolis/node"
- "source.monogon.dev/metropolis/test/e2e"
"source.monogon.dev/metropolis/test/launch/cluster"
)
@@ -106,8 +104,7 @@
log.Fatalf("Failed to launch cluster: %v", err)
}
log.Println("Cluster initialized")
- // TODO(q3k): use SOCKS proxy instead.
- clientSet, err := e2e.GetKubeClientSet(cl, cl.Ports[uint16(common.KubernetesAPIWrappedPort)])
+ clientSet, err := cl.GetKubeClientSet()
if err != nil {
log.Fatalf("Failed to get clientSet: %v", err)
}
diff --git a/metropolis/test/e2e/kubernetes_helpers.go b/metropolis/test/e2e/kubernetes_helpers.go
index 066c1c2..cba4ca7 100644
--- a/metropolis/test/e2e/kubernetes_helpers.go
+++ b/metropolis/test/e2e/kubernetes_helpers.go
@@ -19,8 +19,6 @@
import (
"bytes"
"context"
- "crypto/x509"
- "encoding/pem"
"fmt"
"io"
"strings"
@@ -32,33 +30,8 @@
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes"
- "k8s.io/client-go/rest"
-
- "source.monogon.dev/metropolis/test/launch/cluster"
)
-// GetKubeClientSet gets a Kubernetes client set accessing the Metropolis
-// Kubernetes authenticating proxy using the cluster owner identity.
-// It currently has access to everything (i.e. the cluster-admin role)
-// via the owner-admin binding.
-func GetKubeClientSet(cluster *cluster.Cluster, port uint16) (kubernetes.Interface, error) {
- pkcs8Key, err := x509.MarshalPKCS8PrivateKey(cluster.Owner.PrivateKey)
- if err != nil {
- // We explicitly pass an Ed25519 private key in, so this can't happen
- panic(err)
- }
- var clientConfig = rest.Config{
- Host: fmt.Sprintf("localhost:%v", port),
- TLSClientConfig: rest.TLSClientConfig{
- ServerName: "kubernetes.default.svc",
- Insecure: true,
- CertData: pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: cluster.Owner.Certificate[0]}),
- KeyData: pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: pkcs8Key}),
- },
- }
- return kubernetes.NewForConfig(&clientConfig)
-}
-
// makeTestDeploymentSpec generates a Deployment spec for a single pod running
// NGINX with a readiness probe. This allows verifying that the control plane
// is capable of scheduling simple pods and that kubelet works, its runtime is
diff --git a/metropolis/test/e2e/main_test.go b/metropolis/test/e2e/main_test.go
index c2b9d95..a2c9eaf 100644
--- a/metropolis/test/e2e/main_test.go
+++ b/metropolis/test/e2e/main_test.go
@@ -60,28 +60,11 @@
largeTestTimeout = 120 * time.Second
)
-// TestE2E is the main E2E test entrypoint for single-node freshly-bootstrapped
-// E2E tests. It starts a full Metropolis node in bootstrap mode and then runs
-// tests against it. The actual tests it performs are located in the RunGroup
-// subtest.
-func TestE2E(t *testing.T) {
- // Run pprof server for debugging
- addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
- if err != nil {
- panic(err)
- }
-
- pprofListen, err := net.ListenTCP("tcp", addr)
- if err != nil {
- launch.Fatal("Failed to listen on pprof port: %s", pprofListen.Addr())
- }
-
- launch.Log("E2E: pprof server listening on %s", pprofListen.Addr())
- go func() {
- launch.Log("E2E: pprof server returned an error: %v", http.Serve(pprofListen, nil))
- pprofListen.Close()
- }()
-
+// TestE2ECore exercises the core functionality of Metropolis: maintaining a
+// control plane, changing node roles, ...
+//
+// The tests are performed against an in-memory cluster.
+func TestE2ECore(t *testing.T) {
// Set a global timeout to make sure this terminates
ctx, cancel := context.WithTimeout(context.Background(), globalTestTimeout)
defer cancel()
@@ -113,331 +96,311 @@
defer cl.Close()
mgmt := apb.NewManagementClient(cl)
- // This exists to keep the parent around while all the children race.
- // It currently tests both a set of OS-level conditions and Kubernetes
- // Deployments and StatefulSets
- t.Run("RunGroup", func(t *testing.T) {
- t.Run("Cluster", func(t *testing.T) {
- util.TestEventual(t, "Retrieving cluster directory sucessful", ctx, 60*time.Second, func(ctx context.Context) error {
- res, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
- if err != nil {
- return fmt.Errorf("GetClusterInfo: %w", err)
- }
+ util.TestEventual(t, "Retrieving cluster directory sucessful", ctx, 60*time.Second, func(ctx context.Context) error {
+ res, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
+ if err != nil {
+ return fmt.Errorf("GetClusterInfo: %w", err)
+ }
- // Ensure that the expected node count is present.
- nodes := res.ClusterDirectory.Nodes
- if want, got := clusterOptions.NumNodes, len(nodes); want != got {
- return fmt.Errorf("wanted %d nodes in cluster directory, got %d", want, got)
- }
+ // Ensure that the expected node count is present.
+ nodes := res.ClusterDirectory.Nodes
+ if want, got := clusterOptions.NumNodes, len(nodes); want != got {
+ return fmt.Errorf("wanted %d nodes in cluster directory, got %d", want, got)
+ }
- // Ensure the nodes have the expected addresses.
- addresses := make(map[string]bool)
- for _, n := range nodes {
- if len(n.Addresses) != 1 {
- return fmt.Errorf("node %s has no addresss", identity.NodeID(n.PublicKey))
- }
- address := n.Addresses[0].Host
- addresses[address] = true
- }
-
- for _, address := range []string{"10.1.0.2", "10.1.0.3"} {
- if !addresses[address] {
- return fmt.Errorf("address %q not found in directory", address)
- }
- }
- return nil
- })
- util.TestEventual(t, "Heartbeat test successful", ctx, 20*time.Second, cluster.AllNodesHealthy)
- util.TestEventual(t, "Node rejoin successful", ctx, 60*time.Second, func(ctx context.Context) error {
- // Ensure nodes rejoin the cluster after a reboot by reboting the 1st node.
- if err := cluster.RebootNode(ctx, 1); err != nil {
- return fmt.Errorf("while rebooting a node: %w", err)
- }
- return nil
- })
- util.TestEventual(t, "Heartbeat test successful", ctx, 20*time.Second, cluster.AllNodesHealthy)
- })
- t.Run("Kubernetes", func(t *testing.T) {
- t.Parallel()
- // TODO(q3k): use SOCKS proxy.
- clientSet, err := GetKubeClientSet(cluster, cluster.Ports[uint16(common.KubernetesAPIWrappedPort)])
- if err != nil {
- t.Fatal(err)
+ // Ensure the nodes have the expected addresses.
+ addresses := make(map[string]bool)
+ for _, n := range nodes {
+ if len(n.Addresses) != 1 {
+ return fmt.Errorf("node %s has no addresss", identity.NodeID(n.PublicKey))
}
- util.TestEventual(t, "Add KubernetesWorker roles", ctx, smallTestTimeout, func(ctx context.Context) error {
- // Find all nodes that are non-controllers.
- var ids []string
- srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
- if err != nil {
- return fmt.Errorf("GetNodes: %w", err)
- }
- defer srvN.CloseSend()
- for {
- node, err := srvN.Recv()
- if err == io.EOF {
- break
- }
- if err != nil {
- return fmt.Errorf("GetNodes.Recv: %w", err)
- }
- if node.Roles.KubernetesController != nil {
- continue
- }
- if node.Roles.ConsensusMember != nil {
- continue
- }
- ids = append(ids, identity.NodeID(node.Pubkey))
- }
+ address := n.Addresses[0].Host
+ addresses[address] = true
+ }
- if len(ids) < 1 {
- return fmt.Errorf("no appropriate nodes found")
- }
-
- // Make all these nodes as KubernetesWorker.
- for _, id := range ids {
- tr := true
- _, err := mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
- Node: &apb.UpdateNodeRolesRequest_Id{
- Id: id,
- },
- KubernetesWorker: &tr,
- })
- if err != nil {
- return fmt.Errorf("could not make node %q into kubernetes worker: %w", id, err)
- }
- }
- return nil
- })
- util.TestEventual(t, "Node is registered and ready", ctx, largeTestTimeout, func(ctx context.Context) error {
- nodes, err := clientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
- if err != nil {
- return err
- }
- if len(nodes.Items) < 1 {
- return errors.New("node not yet registered")
- }
- node := nodes.Items[0]
- for _, cond := range node.Status.Conditions {
- if cond.Type != corev1.NodeReady {
- continue
- }
- if cond.Status != corev1.ConditionTrue {
- return fmt.Errorf("node not ready: %v", cond.Message)
- }
- }
- return nil
- })
- util.TestEventual(t, "Simple deployment", ctx, largeTestTimeout, func(ctx context.Context) error {
- _, err := clientSet.AppsV1().Deployments("default").Create(ctx, makeTestDeploymentSpec("test-deploy-1"), metav1.CreateOptions{})
- return err
- })
- util.TestEventual(t, "Simple deployment is running", ctx, largeTestTimeout, func(ctx context.Context) error {
- res, err := clientSet.CoreV1().Pods("default").List(ctx, metav1.ListOptions{LabelSelector: "name=test-deploy-1"})
- if err != nil {
- return err
- }
- if len(res.Items) == 0 {
- return errors.New("pod didn't get created")
- }
- pod := res.Items[0]
- if podv1.IsPodAvailable(&pod, 1, metav1.NewTime(time.Now())) {
- return nil
- }
- events, err := clientSet.CoreV1().Events("default").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=default", pod.Name)})
- if err != nil || len(events.Items) == 0 {
- return fmt.Errorf("pod is not ready: %v", pod.Status.Phase)
- } else {
- return fmt.Errorf("pod is not ready: %v", events.Items[0].Message)
- }
- })
- util.TestEventual(t, "Simple deployment with gvisor", ctx, largeTestTimeout, func(ctx context.Context) error {
- deployment := makeTestDeploymentSpec("test-deploy-2")
- gvisorStr := "gvisor"
- deployment.Spec.Template.Spec.RuntimeClassName = &gvisorStr
- _, err := clientSet.AppsV1().Deployments("default").Create(ctx, deployment, metav1.CreateOptions{})
- return err
- })
- util.TestEventual(t, "Simple deployment is running on gvisor", ctx, largeTestTimeout, func(ctx context.Context) error {
- res, err := clientSet.CoreV1().Pods("default").List(ctx, metav1.ListOptions{LabelSelector: "name=test-deploy-2"})
- if err != nil {
- return err
- }
- if len(res.Items) == 0 {
- return errors.New("pod didn't get created")
- }
- pod := res.Items[0]
- if podv1.IsPodAvailable(&pod, 1, metav1.NewTime(time.Now())) {
- return nil
- }
- events, err := clientSet.CoreV1().Events("default").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=default", pod.Name)})
- if err != nil || len(events.Items) == 0 {
- return fmt.Errorf("pod is not ready: %v", pod.Status.Phase)
- } else {
- var errorMsg strings.Builder
- for _, msg := range events.Items {
- errorMsg.WriteString(" | ")
- errorMsg.WriteString(msg.Message)
- }
- return fmt.Errorf("pod is not ready: %v", errorMsg.String())
- }
- })
- util.TestEventual(t, "Simple StatefulSet with PVC", ctx, largeTestTimeout, func(ctx context.Context) error {
- _, err := clientSet.AppsV1().StatefulSets("default").Create(ctx, makeTestStatefulSet("test-statefulset-1", corev1.PersistentVolumeFilesystem), metav1.CreateOptions{})
- return err
- })
- util.TestEventual(t, "Simple StatefulSet with PVC is running", ctx, largeTestTimeout, func(ctx context.Context) error {
- res, err := clientSet.CoreV1().Pods("default").List(ctx, metav1.ListOptions{LabelSelector: "name=test-statefulset-1"})
- if err != nil {
- return err
- }
- if len(res.Items) == 0 {
- return errors.New("pod didn't get created")
- }
- pod := res.Items[0]
- if podv1.IsPodAvailable(&pod, 1, metav1.NewTime(time.Now())) {
- return nil
- }
- events, err := clientSet.CoreV1().Events("default").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=default", pod.Name)})
- if err != nil || len(events.Items) == 0 {
- return fmt.Errorf("pod is not ready: %v", pod.Status.Phase)
- } else {
- return fmt.Errorf("pod is not ready: %v", events.Items[0].Message)
- }
- })
- util.TestEventual(t, "Simple StatefulSet with Block PVC", ctx, largeTestTimeout, func(ctx context.Context) error {
- _, err := clientSet.AppsV1().StatefulSets("default").Create(ctx, makeTestStatefulSet("test-statefulset-2", corev1.PersistentVolumeBlock), metav1.CreateOptions{})
- return err
- })
- util.TestEventual(t, "Simple StatefulSet with Block PVC is running", ctx, largeTestTimeout, func(ctx context.Context) error {
- res, err := clientSet.CoreV1().Pods("default").List(ctx, metav1.ListOptions{LabelSelector: "name=test-statefulset-2"})
- if err != nil {
- return err
- }
- if len(res.Items) == 0 {
- return errors.New("pod didn't get created")
- }
- pod := res.Items[0]
- if podv1.IsPodAvailable(&pod, 1, metav1.NewTime(time.Now())) {
- return nil
- }
- events, err := clientSet.CoreV1().Events("default").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=default", pod.Name)})
- if err != nil || len(events.Items) == 0 {
- return fmt.Errorf("pod is not ready: %v", pod.Status.Phase)
- } else {
- return fmt.Errorf("pod is not ready: %v", events.Items[0].Message)
- }
- })
- util.TestEventual(t, "In-cluster self-test job", ctx, smallTestTimeout, func(ctx context.Context) error {
- _, err := clientSet.BatchV1().Jobs("default").Create(ctx, makeSelftestSpec("selftest"), metav1.CreateOptions{})
- return err
- })
- util.TestEventual(t, "In-cluster self-test job passed", ctx, smallTestTimeout, func(ctx context.Context) error {
- res, err := clientSet.BatchV1().Jobs("default").Get(ctx, "selftest", metav1.GetOptions{})
- if err != nil {
- return err
- }
- if res.Status.Failed > 0 {
- pods, err := clientSet.CoreV1().Pods("default").List(ctx, metav1.ListOptions{
- LabelSelector: "job-name=selftest",
- })
- if err != nil {
- return util.Permanent(fmt.Errorf("job failed but failed to find pod: %w", err))
- }
- if len(pods.Items) < 1 {
- return fmt.Errorf("job failed but pod does not exist")
- }
- lines, err := getPodLogLines(ctx, clientSet, pods.Items[0].Name, 1)
- if err != nil {
- return fmt.Errorf("job failed but could not get logs: %w", err)
- }
- if len(lines) > 0 {
- return util.Permanent(fmt.Errorf("job failed, last log line: %s", lines[0]))
- }
- return util.Permanent(fmt.Errorf("job failed, empty log"))
- }
- if res.Status.Succeeded > 0 {
- return nil
- }
- return fmt.Errorf("job still running")
- })
- util.TestEventual(t, "Prometheus node metrics retrieved", ctx, smallTestTimeout, func(ctx context.Context) error {
- pool := x509.NewCertPool()
- pool.AddCert(cluster.CACertificate)
- cl := http.Client{
- Transport: &http.Transport{
- TLSClientConfig: &tls.Config{
- Certificates: []tls.Certificate{cluster.Owner},
- RootCAs: pool,
- },
- DialContext: func(ctx context.Context, _, addr string) (net.Conn, error) {
- return cluster.DialNode(ctx, addr)
- },
- },
- }
- u := url.URL{
- Scheme: "https",
- Host: net.JoinHostPort(cluster.NodeIDs[0], common.MetricsPort.PortString()),
- Path: "/metrics/node",
- }
- res, err := cl.Get(u.String())
- if err != nil {
- return err
- }
- defer res.Body.Close()
- if res.StatusCode != 200 {
- return fmt.Errorf("status code %d", res.StatusCode)
- }
-
- body, err := io.ReadAll(res.Body)
- if err != nil {
- return err
- }
- needle := "node_uname_info"
- if !strings.Contains(string(body), needle) {
- return util.Permanent(fmt.Errorf("could not find %q in returned response", needle))
- }
- return nil
- })
- if os.Getenv("HAVE_NESTED_KVM") != "" {
- util.TestEventual(t, "Pod for KVM/QEMU smoke test", ctx, smallTestTimeout, func(ctx context.Context) error {
- runcRuntimeClass := "runc"
- _, err := clientSet.CoreV1().Pods("default").Create(ctx, &corev1.Pod{
- ObjectMeta: metav1.ObjectMeta{
- Name: "vm-smoketest",
- },
- Spec: corev1.PodSpec{
- Containers: []corev1.Container{{
- Name: "vm-smoketest",
- ImagePullPolicy: corev1.PullNever,
- Image: "bazel/metropolis/vm/smoketest:smoketest_container",
- Resources: corev1.ResourceRequirements{
- Limits: corev1.ResourceList{
- "devices.monogon.dev/kvm": *resource.NewQuantity(1, ""),
- },
- },
- }},
- RuntimeClassName: &runcRuntimeClass,
- RestartPolicy: corev1.RestartPolicyNever,
- },
- }, metav1.CreateOptions{})
- return err
- })
- util.TestEventual(t, "KVM/QEMU smoke test completion", ctx, smallTestTimeout, func(ctx context.Context) error {
- pod, err := clientSet.CoreV1().Pods("default").Get(ctx, "vm-smoketest", metav1.GetOptions{})
- if err != nil {
- return fmt.Errorf("failed to get pod: %w", err)
- }
- if pod.Status.Phase == corev1.PodSucceeded {
- return nil
- }
- events, err := clientSet.CoreV1().Events("default").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=default", pod.Name)})
- if err != nil || len(events.Items) == 0 {
- return fmt.Errorf("pod is not ready: %v", pod.Status.Phase)
- } else {
- return fmt.Errorf("pod is not ready: %v", events.Items[len(events.Items)-1].Message)
- }
- })
+ for _, address := range []string{"10.1.0.2", "10.1.0.3"} {
+ if !addresses[address] {
+ return fmt.Errorf("address %q not found in directory", address)
}
- })
+ }
+ return nil
})
+ util.TestEventual(t, "Heartbeat test successful", ctx, 20*time.Second, cluster.AllNodesHealthy)
+ util.TestEventual(t, "Node rejoin successful", ctx, 60*time.Second, func(ctx context.Context) error {
+ // Ensure nodes rejoin the cluster after a reboot by rebooting the 1st node.
+ if err := cluster.RebootNode(ctx, 1); err != nil {
+ return fmt.Errorf("while rebooting a node: %w", err)
+ }
+ return nil
+ })
+ util.TestEventual(t, "Heartbeat test successful", ctx, 20*time.Second, cluster.AllNodesHealthy)
+ util.TestEventual(t, "Prometheus node metrics retrieved", ctx, smallTestTimeout, func(ctx context.Context) error {
+ pool := x509.NewCertPool()
+ pool.AddCert(cluster.CACertificate)
+ cl := http.Client{
+ Transport: &http.Transport{
+ TLSClientConfig: &tls.Config{
+ Certificates: []tls.Certificate{cluster.Owner},
+ RootCAs: pool,
+ },
+ DialContext: func(ctx context.Context, _, addr string) (net.Conn, error) {
+ return cluster.DialNode(ctx, addr)
+ },
+ },
+ }
+ u := url.URL{
+ Scheme: "https",
+ Host: net.JoinHostPort(cluster.NodeIDs[0], common.MetricsPort.PortString()),
+ Path: "/metrics/node",
+ }
+ res, err := cl.Get(u.String())
+ if err != nil {
+ return err
+ }
+ defer res.Body.Close()
+ if res.StatusCode != 200 {
+ return fmt.Errorf("status code %d", res.StatusCode)
+ }
+
+ body, err := io.ReadAll(res.Body)
+ if err != nil {
+ return err
+ }
+ needle := "node_uname_info"
+ if !strings.Contains(string(body), needle) {
+ return util.Permanent(fmt.Errorf("could not find %q in returned response", needle))
+ }
+ return nil
+ })
+}
+
+// TestE2EKubernetes exercises the Kubernetes functionality of Metropolis.
+//
+// The tests are performed against an in-memory cluster.
+func TestE2EKubernetes(t *testing.T) {
+ // Set a global timeout to make sure this terminates
+ ctx, cancel := context.WithTimeout(context.Background(), globalTestTimeout)
+ defer cancel()
+
+ // Launch cluster.
+ clusterOptions := cluster.ClusterOptions{
+ NumNodes: 2,
+ }
+ cluster, err := cluster.LaunchCluster(ctx, clusterOptions)
+ if err != nil {
+ t.Fatalf("LaunchCluster failed: %v", err)
+ }
+ defer func() {
+ err := cluster.Close()
+ if err != nil {
+ t.Fatalf("cluster Close failed: %v", err)
+ }
+ }()
+
+ clientSet, err := cluster.GetKubeClientSet()
+ if err != nil {
+ t.Fatal(err)
+ }
+ util.TestEventual(t, "Add KubernetesWorker roles", ctx, smallTestTimeout, func(ctx context.Context) error {
+ // Make every node but the first into a KubernetesWorker.
+ for i := 1; i < clusterOptions.NumNodes; i++ {
+ err := cluster.MakeKubernetesWorker(ctx, cluster.NodeIDs[i])
+ if err != nil {
+ return util.Permanent(fmt.Errorf("MakeKubernetesWorker: %w", err))
+ }
+ }
+ return nil
+ })
+ util.TestEventual(t, "Node is registered and ready", ctx, largeTestTimeout, func(ctx context.Context) error {
+ nodes, err := clientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
+ if err != nil {
+ return err
+ }
+ if len(nodes.Items) < 1 {
+ return errors.New("node not yet registered")
+ }
+ node := nodes.Items[0]
+ for _, cond := range node.Status.Conditions {
+ if cond.Type != corev1.NodeReady {
+ continue
+ }
+ if cond.Status != corev1.ConditionTrue {
+ return fmt.Errorf("node not ready: %v", cond.Message)
+ }
+ }
+ return nil
+ })
+ util.TestEventual(t, "Simple deployment", ctx, largeTestTimeout, func(ctx context.Context) error {
+ _, err := clientSet.AppsV1().Deployments("default").Create(ctx, makeTestDeploymentSpec("test-deploy-1"), metav1.CreateOptions{})
+ return err
+ })
+ util.TestEventual(t, "Simple deployment is running", ctx, largeTestTimeout, func(ctx context.Context) error {
+ res, err := clientSet.CoreV1().Pods("default").List(ctx, metav1.ListOptions{LabelSelector: "name=test-deploy-1"})
+ if err != nil {
+ return err
+ }
+ if len(res.Items) == 0 {
+ return errors.New("pod didn't get created")
+ }
+ pod := res.Items[0]
+ if podv1.IsPodAvailable(&pod, 1, metav1.NewTime(time.Now())) {
+ return nil
+ }
+ events, err := clientSet.CoreV1().Events("default").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=default", pod.Name)})
+ if err != nil || len(events.Items) == 0 {
+ return fmt.Errorf("pod is not ready: %v", pod.Status.Phase)
+ } else {
+ return fmt.Errorf("pod is not ready: %v", events.Items[0].Message)
+ }
+ })
+ util.TestEventual(t, "Simple deployment with gvisor", ctx, largeTestTimeout, func(ctx context.Context) error {
+ deployment := makeTestDeploymentSpec("test-deploy-2")
+ gvisorStr := "gvisor"
+ deployment.Spec.Template.Spec.RuntimeClassName = &gvisorStr
+ _, err := clientSet.AppsV1().Deployments("default").Create(ctx, deployment, metav1.CreateOptions{})
+ return err
+ })
+ util.TestEventual(t, "Simple deployment is running on gvisor", ctx, largeTestTimeout, func(ctx context.Context) error {
+ res, err := clientSet.CoreV1().Pods("default").List(ctx, metav1.ListOptions{LabelSelector: "name=test-deploy-2"})
+ if err != nil {
+ return err
+ }
+ if len(res.Items) == 0 {
+ return errors.New("pod didn't get created")
+ }
+ pod := res.Items[0]
+ if podv1.IsPodAvailable(&pod, 1, metav1.NewTime(time.Now())) {
+ return nil
+ }
+ events, err := clientSet.CoreV1().Events("default").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=default", pod.Name)})
+ if err != nil || len(events.Items) == 0 {
+ return fmt.Errorf("pod is not ready: %v", pod.Status.Phase)
+ } else {
+ var errorMsg strings.Builder
+ for _, msg := range events.Items {
+ errorMsg.WriteString(" | ")
+ errorMsg.WriteString(msg.Message)
+ }
+ return fmt.Errorf("pod is not ready: %v", errorMsg.String())
+ }
+ })
+ util.TestEventual(t, "Simple StatefulSet with PVC", ctx, largeTestTimeout, func(ctx context.Context) error {
+ _, err := clientSet.AppsV1().StatefulSets("default").Create(ctx, makeTestStatefulSet("test-statefulset-1", corev1.PersistentVolumeFilesystem), metav1.CreateOptions{})
+ return err
+ })
+ util.TestEventual(t, "Simple StatefulSet with PVC is running", ctx, largeTestTimeout, func(ctx context.Context) error {
+ res, err := clientSet.CoreV1().Pods("default").List(ctx, metav1.ListOptions{LabelSelector: "name=test-statefulset-1"})
+ if err != nil {
+ return err
+ }
+ if len(res.Items) == 0 {
+ return errors.New("pod didn't get created")
+ }
+ pod := res.Items[0]
+ if podv1.IsPodAvailable(&pod, 1, metav1.NewTime(time.Now())) {
+ return nil
+ }
+ events, err := clientSet.CoreV1().Events("default").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=default", pod.Name)})
+ if err != nil || len(events.Items) == 0 {
+ return fmt.Errorf("pod is not ready: %v", pod.Status.Phase)
+ } else {
+ return fmt.Errorf("pod is not ready: %v", events.Items[0].Message)
+ }
+ })
+ util.TestEventual(t, "Simple StatefulSet with Block PVC", ctx, largeTestTimeout, func(ctx context.Context) error {
+ _, err := clientSet.AppsV1().StatefulSets("default").Create(ctx, makeTestStatefulSet("test-statefulset-2", corev1.PersistentVolumeBlock), metav1.CreateOptions{})
+ return err
+ })
+ util.TestEventual(t, "Simple StatefulSet with Block PVC is running", ctx, largeTestTimeout, func(ctx context.Context) error {
+ res, err := clientSet.CoreV1().Pods("default").List(ctx, metav1.ListOptions{LabelSelector: "name=test-statefulset-2"})
+ if err != nil {
+ return err
+ }
+ if len(res.Items) == 0 {
+ return errors.New("pod didn't get created")
+ }
+ pod := res.Items[0]
+ if podv1.IsPodAvailable(&pod, 1, metav1.NewTime(time.Now())) {
+ return nil
+ }
+ events, err := clientSet.CoreV1().Events("default").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=default", pod.Name)})
+ if err != nil || len(events.Items) == 0 {
+ return fmt.Errorf("pod is not ready: %v", pod.Status.Phase)
+ } else {
+ return fmt.Errorf("pod is not ready: %v", events.Items[0].Message)
+ }
+ })
+ util.TestEventual(t, "In-cluster self-test job", ctx, smallTestTimeout, func(ctx context.Context) error {
+ _, err := clientSet.BatchV1().Jobs("default").Create(ctx, makeSelftestSpec("selftest"), metav1.CreateOptions{})
+ return err
+ })
+ util.TestEventual(t, "In-cluster self-test job passed", ctx, smallTestTimeout, func(ctx context.Context) error {
+ res, err := clientSet.BatchV1().Jobs("default").Get(ctx, "selftest", metav1.GetOptions{})
+ if err != nil {
+ return err
+ }
+ if res.Status.Failed > 0 {
+ pods, err := clientSet.CoreV1().Pods("default").List(ctx, metav1.ListOptions{
+ LabelSelector: "job-name=selftest",
+ })
+ if err != nil {
+ return util.Permanent(fmt.Errorf("job failed but failed to find pod: %w", err))
+ }
+ if len(pods.Items) < 1 {
+ return fmt.Errorf("job failed but pod does not exist")
+ }
+ lines, err := getPodLogLines(ctx, clientSet, pods.Items[0].Name, 1)
+ if err != nil {
+ return fmt.Errorf("job failed but could not get logs: %w", err)
+ }
+ if len(lines) > 0 {
+ return util.Permanent(fmt.Errorf("job failed, last log line: %s", lines[0]))
+ }
+ return util.Permanent(fmt.Errorf("job failed, empty log"))
+ }
+ if res.Status.Succeeded > 0 {
+ return nil
+ }
+ return fmt.Errorf("job still running")
+ })
+ if os.Getenv("HAVE_NESTED_KVM") != "" {
+ util.TestEventual(t, "Pod for KVM/QEMU smoke test", ctx, smallTestTimeout, func(ctx context.Context) error {
+ runcRuntimeClass := "runc"
+ _, err := clientSet.CoreV1().Pods("default").Create(ctx, &corev1.Pod{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "vm-smoketest",
+ },
+ Spec: corev1.PodSpec{
+ Containers: []corev1.Container{{
+ Name: "vm-smoketest",
+ ImagePullPolicy: corev1.PullNever,
+ Image: "bazel/metropolis/vm/smoketest:smoketest_container",
+ Resources: corev1.ResourceRequirements{
+ Limits: corev1.ResourceList{
+ "devices.monogon.dev/kvm": *resource.NewQuantity(1, ""),
+ },
+ },
+ }},
+ RuntimeClassName: &runcRuntimeClass,
+ RestartPolicy: corev1.RestartPolicyNever,
+ },
+ }, metav1.CreateOptions{})
+ return err
+ })
+ util.TestEventual(t, "KVM/QEMU smoke test completion", ctx, smallTestTimeout, func(ctx context.Context) error {
+ pod, err := clientSet.CoreV1().Pods("default").Get(ctx, "vm-smoketest", metav1.GetOptions{})
+ if err != nil {
+ return fmt.Errorf("failed to get pod: %w", err)
+ }
+ if pod.Status.Phase == corev1.PodSucceeded {
+ return nil
+ }
+ events, err := clientSet.CoreV1().Events("default").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=default", pod.Name)})
+ if err != nil || len(events.Items) == 0 {
+ return fmt.Errorf("pod is not ready: %v", pod.Status.Phase)
+ } else {
+ return fmt.Errorf("pod is not ready: %v", events.Items[len(events.Items)-1].Message)
+ }
+ })
+ }
}