m/test/e2e: split core/kubernetes tests, clean up
This splits the large TestE2E function into two separate functions and
tests: one which exercises the core functionality of Metropolis, the
other which exercises just the Kubernetes bits.
This allows for easier testing during development, and in CI trades
higher resource usage for faster execution time.
At the same time we do some small cleanups of the E2E functionality:
1. Node startup is now parallelized.
2. Non-bootstrap nodes can now be left in NEW (this was used when
diagnosing issue #234, but is currently unused in the main code).
3. Kubernetes access now goes over SOCKS.
4. Some Cluster helper functions have been added.
All in all, this should allow us to write more E2E tests in the future,
and at some point maybe turn Cluster into an interface implemented both
by the current framework and by persistent tests running against
long-term VMs/physical machines.
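
As an illustration of the new helpers (not part of this change), a future
E2E test might look roughly like the sketch below. The test name and the
final version check are hypothetical; LaunchCluster, ClusterOptions,
MakeKubernetesWorker and GetKubeClientSet are as introduced or changed in
this diff.

    package e2e

    import (
        "context"
        "testing"
        "time"

        "source.monogon.dev/metropolis/test/launch/cluster"
    )

    func TestE2EExample(t *testing.T) {
        ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
        defer cancel()

        // Node startup inside LaunchCluster is now parallelized.
        cl, err := cluster.LaunchCluster(ctx, cluster.ClusterOptions{NumNodes: 2})
        if err != nil {
            t.Fatalf("LaunchCluster: %v", err)
        }
        defer cl.Close()

        // Give the second (non-bootstrap) node the KubernetesWorker role.
        if err := cl.MakeKubernetesWorker(ctx, cl.NodeIDs[1]); err != nil {
            t.Fatalf("MakeKubernetesWorker: %v", err)
        }

        // The returned client set dials the apiserver through the nanoswitch
        // SOCKS proxy rather than a host-forwarded port.
        clientSet, err := cl.GetKubeClientSet()
        if err != nil {
            t.Fatal(err)
        }
        if _, err := clientSet.Discovery().ServerVersion(); err != nil {
            t.Fatalf("ServerVersion: %v", err)
        }
    }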
Change-Id: Ia4586b2aaa5fc8c979d35f4b49513638481e4c10
Reviewed-on: https://review.monogon.dev/c/monogon/+/1870
Tested-by: Jenkins CI
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/test/e2e/BUILD.bazel b/metropolis/test/e2e/BUILD.bazel
index 2cde2c6..25a6c88 100644
--- a/metropolis/test/e2e/BUILD.bazel
+++ b/metropolis/test/e2e/BUILD.bazel
@@ -6,7 +6,6 @@
importpath = "source.monogon.dev/metropolis/test/e2e",
visibility = ["//metropolis/test:__subpackages__"],
deps = [
- "//metropolis/test/launch/cluster",
"@io_k8s_api//apps/v1:apps",
"@io_k8s_api//batch/v1:batch",
"@io_k8s_api//core/v1:core",
@@ -14,7 +13,6 @@
"@io_k8s_apimachinery//pkg/apis/meta/v1:meta",
"@io_k8s_apimachinery//pkg/util/intstr",
"@io_k8s_client_go//kubernetes",
- "@io_k8s_client_go//rest",
],
)
diff --git a/metropolis/test/e2e/k8s_cts/BUILD.bazel b/metropolis/test/e2e/k8s_cts/BUILD.bazel
index 6cc4080..8458b2d 100644
--- a/metropolis/test/e2e/k8s_cts/BUILD.bazel
+++ b/metropolis/test/e2e/k8s_cts/BUILD.bazel
@@ -6,8 +6,6 @@
importpath = "source.monogon.dev/metropolis/test/e2e/k8s_cts",
visibility = ["//visibility:private"],
deps = [
- "//metropolis/node",
- "//metropolis/test/e2e",
"//metropolis/test/launch/cluster",
"@io_k8s_api//core/v1:core",
"@io_k8s_api//rbac/v1:rbac",
diff --git a/metropolis/test/e2e/k8s_cts/main.go b/metropolis/test/e2e/k8s_cts/main.go
index c0adc75..4b86b64 100644
--- a/metropolis/test/e2e/k8s_cts/main.go
+++ b/metropolis/test/e2e/k8s_cts/main.go
@@ -33,8 +33,6 @@
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- common "source.monogon.dev/metropolis/node"
- "source.monogon.dev/metropolis/test/e2e"
"source.monogon.dev/metropolis/test/launch/cluster"
)
@@ -106,8 +104,7 @@
log.Fatalf("Failed to launch cluster: %v", err)
}
log.Println("Cluster initialized")
- // TODO(q3k): use SOCKS proxy instead.
- clientSet, err := e2e.GetKubeClientSet(cl, cl.Ports[uint16(common.KubernetesAPIWrappedPort)])
+ clientSet, err := cl.GetKubeClientSet()
if err != nil {
log.Fatalf("Failed to get clientSet: %v", err)
}
diff --git a/metropolis/test/e2e/kubernetes_helpers.go b/metropolis/test/e2e/kubernetes_helpers.go
index 066c1c2..cba4ca7 100644
--- a/metropolis/test/e2e/kubernetes_helpers.go
+++ b/metropolis/test/e2e/kubernetes_helpers.go
@@ -19,8 +19,6 @@
import (
"bytes"
"context"
- "crypto/x509"
- "encoding/pem"
"fmt"
"io"
"strings"
@@ -32,33 +30,8 @@
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes"
- "k8s.io/client-go/rest"
-
- "source.monogon.dev/metropolis/test/launch/cluster"
)
-// GetKubeClientSet gets a Kubernetes client set accessing the Metropolis
-// Kubernetes authenticating proxy using the cluster owner identity.
-// It currently has access to everything (i.e. the cluster-admin role)
-// via the owner-admin binding.
-func GetKubeClientSet(cluster *cluster.Cluster, port uint16) (kubernetes.Interface, error) {
- pkcs8Key, err := x509.MarshalPKCS8PrivateKey(cluster.Owner.PrivateKey)
- if err != nil {
- // We explicitly pass an Ed25519 private key in, so this can't happen
- panic(err)
- }
- var clientConfig = rest.Config{
- Host: fmt.Sprintf("localhost:%v", port),
- TLSClientConfig: rest.TLSClientConfig{
- ServerName: "kubernetes.default.svc",
- Insecure: true,
- CertData: pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: cluster.Owner.Certificate[0]}),
- KeyData: pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: pkcs8Key}),
- },
- }
- return kubernetes.NewForConfig(&clientConfig)
-}
-
// makeTestDeploymentSpec generates a Deployment spec for a single pod running
// NGINX with a readiness probe. This allows verifying that the control plane
// is capable of scheduling simple pods and that kubelet works, its runtime is
diff --git a/metropolis/test/e2e/main_test.go b/metropolis/test/e2e/main_test.go
index c2b9d95..a2c9eaf 100644
--- a/metropolis/test/e2e/main_test.go
+++ b/metropolis/test/e2e/main_test.go
@@ -60,28 +60,11 @@
largeTestTimeout = 120 * time.Second
)
-// TestE2E is the main E2E test entrypoint for single-node freshly-bootstrapped
-// E2E tests. It starts a full Metropolis node in bootstrap mode and then runs
-// tests against it. The actual tests it performs are located in the RunGroup
-// subtest.
-func TestE2E(t *testing.T) {
- // Run pprof server for debugging
- addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
- if err != nil {
- panic(err)
- }
-
- pprofListen, err := net.ListenTCP("tcp", addr)
- if err != nil {
- launch.Fatal("Failed to listen on pprof port: %s", pprofListen.Addr())
- }
-
- launch.Log("E2E: pprof server listening on %s", pprofListen.Addr())
- go func() {
- launch.Log("E2E: pprof server returned an error: %v", http.Serve(pprofListen, nil))
- pprofListen.Close()
- }()
-
+// TestE2ECore exercises the core functionality of Metropolis: maintaining a
+// control plane, changing node roles, ...
+//
+// The tests are performed against an in-memory cluster.
+func TestE2ECore(t *testing.T) {
// Set a global timeout to make sure this terminates
ctx, cancel := context.WithTimeout(context.Background(), globalTestTimeout)
defer cancel()
@@ -113,331 +96,311 @@
defer cl.Close()
mgmt := apb.NewManagementClient(cl)
- // This exists to keep the parent around while all the children race.
- // It currently tests both a set of OS-level conditions and Kubernetes
- // Deployments and StatefulSets
- t.Run("RunGroup", func(t *testing.T) {
- t.Run("Cluster", func(t *testing.T) {
- util.TestEventual(t, "Retrieving cluster directory sucessful", ctx, 60*time.Second, func(ctx context.Context) error {
- res, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
- if err != nil {
- return fmt.Errorf("GetClusterInfo: %w", err)
- }
+ util.TestEventual(t, "Retrieving cluster directory sucessful", ctx, 60*time.Second, func(ctx context.Context) error {
+ res, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
+ if err != nil {
+ return fmt.Errorf("GetClusterInfo: %w", err)
+ }
- // Ensure that the expected node count is present.
- nodes := res.ClusterDirectory.Nodes
- if want, got := clusterOptions.NumNodes, len(nodes); want != got {
- return fmt.Errorf("wanted %d nodes in cluster directory, got %d", want, got)
- }
+ // Ensure that the expected node count is present.
+ nodes := res.ClusterDirectory.Nodes
+ if want, got := clusterOptions.NumNodes, len(nodes); want != got {
+ return fmt.Errorf("wanted %d nodes in cluster directory, got %d", want, got)
+ }
- // Ensure the nodes have the expected addresses.
- addresses := make(map[string]bool)
- for _, n := range nodes {
- if len(n.Addresses) != 1 {
- return fmt.Errorf("node %s has no addresss", identity.NodeID(n.PublicKey))
- }
- address := n.Addresses[0].Host
- addresses[address] = true
- }
-
- for _, address := range []string{"10.1.0.2", "10.1.0.3"} {
- if !addresses[address] {
- return fmt.Errorf("address %q not found in directory", address)
- }
- }
- return nil
- })
- util.TestEventual(t, "Heartbeat test successful", ctx, 20*time.Second, cluster.AllNodesHealthy)
- util.TestEventual(t, "Node rejoin successful", ctx, 60*time.Second, func(ctx context.Context) error {
- // Ensure nodes rejoin the cluster after a reboot by reboting the 1st node.
- if err := cluster.RebootNode(ctx, 1); err != nil {
- return fmt.Errorf("while rebooting a node: %w", err)
- }
- return nil
- })
- util.TestEventual(t, "Heartbeat test successful", ctx, 20*time.Second, cluster.AllNodesHealthy)
- })
- t.Run("Kubernetes", func(t *testing.T) {
- t.Parallel()
- // TODO(q3k): use SOCKS proxy.
- clientSet, err := GetKubeClientSet(cluster, cluster.Ports[uint16(common.KubernetesAPIWrappedPort)])
- if err != nil {
- t.Fatal(err)
+ // Ensure the nodes have the expected addresses.
+ addresses := make(map[string]bool)
+ for _, n := range nodes {
+ if len(n.Addresses) != 1 {
+ return fmt.Errorf("node %s has no addresss", identity.NodeID(n.PublicKey))
}
- util.TestEventual(t, "Add KubernetesWorker roles", ctx, smallTestTimeout, func(ctx context.Context) error {
- // Find all nodes that are non-controllers.
- var ids []string
- srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
- if err != nil {
- return fmt.Errorf("GetNodes: %w", err)
- }
- defer srvN.CloseSend()
- for {
- node, err := srvN.Recv()
- if err == io.EOF {
- break
- }
- if err != nil {
- return fmt.Errorf("GetNodes.Recv: %w", err)
- }
- if node.Roles.KubernetesController != nil {
- continue
- }
- if node.Roles.ConsensusMember != nil {
- continue
- }
- ids = append(ids, identity.NodeID(node.Pubkey))
- }
+ address := n.Addresses[0].Host
+ addresses[address] = true
+ }
- if len(ids) < 1 {
- return fmt.Errorf("no appropriate nodes found")
- }
-
- // Make all these nodes as KubernetesWorker.
- for _, id := range ids {
- tr := true
- _, err := mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
- Node: &apb.UpdateNodeRolesRequest_Id{
- Id: id,
- },
- KubernetesWorker: &tr,
- })
- if err != nil {
- return fmt.Errorf("could not make node %q into kubernetes worker: %w", id, err)
- }
- }
- return nil
- })
- util.TestEventual(t, "Node is registered and ready", ctx, largeTestTimeout, func(ctx context.Context) error {
- nodes, err := clientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
- if err != nil {
- return err
- }
- if len(nodes.Items) < 1 {
- return errors.New("node not yet registered")
- }
- node := nodes.Items[0]
- for _, cond := range node.Status.Conditions {
- if cond.Type != corev1.NodeReady {
- continue
- }
- if cond.Status != corev1.ConditionTrue {
- return fmt.Errorf("node not ready: %v", cond.Message)
- }
- }
- return nil
- })
- util.TestEventual(t, "Simple deployment", ctx, largeTestTimeout, func(ctx context.Context) error {
- _, err := clientSet.AppsV1().Deployments("default").Create(ctx, makeTestDeploymentSpec("test-deploy-1"), metav1.CreateOptions{})
- return err
- })
- util.TestEventual(t, "Simple deployment is running", ctx, largeTestTimeout, func(ctx context.Context) error {
- res, err := clientSet.CoreV1().Pods("default").List(ctx, metav1.ListOptions{LabelSelector: "name=test-deploy-1"})
- if err != nil {
- return err
- }
- if len(res.Items) == 0 {
- return errors.New("pod didn't get created")
- }
- pod := res.Items[0]
- if podv1.IsPodAvailable(&pod, 1, metav1.NewTime(time.Now())) {
- return nil
- }
- events, err := clientSet.CoreV1().Events("default").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=default", pod.Name)})
- if err != nil || len(events.Items) == 0 {
- return fmt.Errorf("pod is not ready: %v", pod.Status.Phase)
- } else {
- return fmt.Errorf("pod is not ready: %v", events.Items[0].Message)
- }
- })
- util.TestEventual(t, "Simple deployment with gvisor", ctx, largeTestTimeout, func(ctx context.Context) error {
- deployment := makeTestDeploymentSpec("test-deploy-2")
- gvisorStr := "gvisor"
- deployment.Spec.Template.Spec.RuntimeClassName = &gvisorStr
- _, err := clientSet.AppsV1().Deployments("default").Create(ctx, deployment, metav1.CreateOptions{})
- return err
- })
- util.TestEventual(t, "Simple deployment is running on gvisor", ctx, largeTestTimeout, func(ctx context.Context) error {
- res, err := clientSet.CoreV1().Pods("default").List(ctx, metav1.ListOptions{LabelSelector: "name=test-deploy-2"})
- if err != nil {
- return err
- }
- if len(res.Items) == 0 {
- return errors.New("pod didn't get created")
- }
- pod := res.Items[0]
- if podv1.IsPodAvailable(&pod, 1, metav1.NewTime(time.Now())) {
- return nil
- }
- events, err := clientSet.CoreV1().Events("default").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=default", pod.Name)})
- if err != nil || len(events.Items) == 0 {
- return fmt.Errorf("pod is not ready: %v", pod.Status.Phase)
- } else {
- var errorMsg strings.Builder
- for _, msg := range events.Items {
- errorMsg.WriteString(" | ")
- errorMsg.WriteString(msg.Message)
- }
- return fmt.Errorf("pod is not ready: %v", errorMsg.String())
- }
- })
- util.TestEventual(t, "Simple StatefulSet with PVC", ctx, largeTestTimeout, func(ctx context.Context) error {
- _, err := clientSet.AppsV1().StatefulSets("default").Create(ctx, makeTestStatefulSet("test-statefulset-1", corev1.PersistentVolumeFilesystem), metav1.CreateOptions{})
- return err
- })
- util.TestEventual(t, "Simple StatefulSet with PVC is running", ctx, largeTestTimeout, func(ctx context.Context) error {
- res, err := clientSet.CoreV1().Pods("default").List(ctx, metav1.ListOptions{LabelSelector: "name=test-statefulset-1"})
- if err != nil {
- return err
- }
- if len(res.Items) == 0 {
- return errors.New("pod didn't get created")
- }
- pod := res.Items[0]
- if podv1.IsPodAvailable(&pod, 1, metav1.NewTime(time.Now())) {
- return nil
- }
- events, err := clientSet.CoreV1().Events("default").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=default", pod.Name)})
- if err != nil || len(events.Items) == 0 {
- return fmt.Errorf("pod is not ready: %v", pod.Status.Phase)
- } else {
- return fmt.Errorf("pod is not ready: %v", events.Items[0].Message)
- }
- })
- util.TestEventual(t, "Simple StatefulSet with Block PVC", ctx, largeTestTimeout, func(ctx context.Context) error {
- _, err := clientSet.AppsV1().StatefulSets("default").Create(ctx, makeTestStatefulSet("test-statefulset-2", corev1.PersistentVolumeBlock), metav1.CreateOptions{})
- return err
- })
- util.TestEventual(t, "Simple StatefulSet with Block PVC is running", ctx, largeTestTimeout, func(ctx context.Context) error {
- res, err := clientSet.CoreV1().Pods("default").List(ctx, metav1.ListOptions{LabelSelector: "name=test-statefulset-2"})
- if err != nil {
- return err
- }
- if len(res.Items) == 0 {
- return errors.New("pod didn't get created")
- }
- pod := res.Items[0]
- if podv1.IsPodAvailable(&pod, 1, metav1.NewTime(time.Now())) {
- return nil
- }
- events, err := clientSet.CoreV1().Events("default").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=default", pod.Name)})
- if err != nil || len(events.Items) == 0 {
- return fmt.Errorf("pod is not ready: %v", pod.Status.Phase)
- } else {
- return fmt.Errorf("pod is not ready: %v", events.Items[0].Message)
- }
- })
- util.TestEventual(t, "In-cluster self-test job", ctx, smallTestTimeout, func(ctx context.Context) error {
- _, err := clientSet.BatchV1().Jobs("default").Create(ctx, makeSelftestSpec("selftest"), metav1.CreateOptions{})
- return err
- })
- util.TestEventual(t, "In-cluster self-test job passed", ctx, smallTestTimeout, func(ctx context.Context) error {
- res, err := clientSet.BatchV1().Jobs("default").Get(ctx, "selftest", metav1.GetOptions{})
- if err != nil {
- return err
- }
- if res.Status.Failed > 0 {
- pods, err := clientSet.CoreV1().Pods("default").List(ctx, metav1.ListOptions{
- LabelSelector: "job-name=selftest",
- })
- if err != nil {
- return util.Permanent(fmt.Errorf("job failed but failed to find pod: %w", err))
- }
- if len(pods.Items) < 1 {
- return fmt.Errorf("job failed but pod does not exist")
- }
- lines, err := getPodLogLines(ctx, clientSet, pods.Items[0].Name, 1)
- if err != nil {
- return fmt.Errorf("job failed but could not get logs: %w", err)
- }
- if len(lines) > 0 {
- return util.Permanent(fmt.Errorf("job failed, last log line: %s", lines[0]))
- }
- return util.Permanent(fmt.Errorf("job failed, empty log"))
- }
- if res.Status.Succeeded > 0 {
- return nil
- }
- return fmt.Errorf("job still running")
- })
- util.TestEventual(t, "Prometheus node metrics retrieved", ctx, smallTestTimeout, func(ctx context.Context) error {
- pool := x509.NewCertPool()
- pool.AddCert(cluster.CACertificate)
- cl := http.Client{
- Transport: &http.Transport{
- TLSClientConfig: &tls.Config{
- Certificates: []tls.Certificate{cluster.Owner},
- RootCAs: pool,
- },
- DialContext: func(ctx context.Context, _, addr string) (net.Conn, error) {
- return cluster.DialNode(ctx, addr)
- },
- },
- }
- u := url.URL{
- Scheme: "https",
- Host: net.JoinHostPort(cluster.NodeIDs[0], common.MetricsPort.PortString()),
- Path: "/metrics/node",
- }
- res, err := cl.Get(u.String())
- if err != nil {
- return err
- }
- defer res.Body.Close()
- if res.StatusCode != 200 {
- return fmt.Errorf("status code %d", res.StatusCode)
- }
-
- body, err := io.ReadAll(res.Body)
- if err != nil {
- return err
- }
- needle := "node_uname_info"
- if !strings.Contains(string(body), needle) {
- return util.Permanent(fmt.Errorf("could not find %q in returned response", needle))
- }
- return nil
- })
- if os.Getenv("HAVE_NESTED_KVM") != "" {
- util.TestEventual(t, "Pod for KVM/QEMU smoke test", ctx, smallTestTimeout, func(ctx context.Context) error {
- runcRuntimeClass := "runc"
- _, err := clientSet.CoreV1().Pods("default").Create(ctx, &corev1.Pod{
- ObjectMeta: metav1.ObjectMeta{
- Name: "vm-smoketest",
- },
- Spec: corev1.PodSpec{
- Containers: []corev1.Container{{
- Name: "vm-smoketest",
- ImagePullPolicy: corev1.PullNever,
- Image: "bazel/metropolis/vm/smoketest:smoketest_container",
- Resources: corev1.ResourceRequirements{
- Limits: corev1.ResourceList{
- "devices.monogon.dev/kvm": *resource.NewQuantity(1, ""),
- },
- },
- }},
- RuntimeClassName: &runcRuntimeClass,
- RestartPolicy: corev1.RestartPolicyNever,
- },
- }, metav1.CreateOptions{})
- return err
- })
- util.TestEventual(t, "KVM/QEMU smoke test completion", ctx, smallTestTimeout, func(ctx context.Context) error {
- pod, err := clientSet.CoreV1().Pods("default").Get(ctx, "vm-smoketest", metav1.GetOptions{})
- if err != nil {
- return fmt.Errorf("failed to get pod: %w", err)
- }
- if pod.Status.Phase == corev1.PodSucceeded {
- return nil
- }
- events, err := clientSet.CoreV1().Events("default").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=default", pod.Name)})
- if err != nil || len(events.Items) == 0 {
- return fmt.Errorf("pod is not ready: %v", pod.Status.Phase)
- } else {
- return fmt.Errorf("pod is not ready: %v", events.Items[len(events.Items)-1].Message)
- }
- })
+ for _, address := range []string{"10.1.0.2", "10.1.0.3"} {
+ if !addresses[address] {
+ return fmt.Errorf("address %q not found in directory", address)
}
- })
+ }
+ return nil
})
+ util.TestEventual(t, "Heartbeat test successful", ctx, 20*time.Second, cluster.AllNodesHealthy)
+ util.TestEventual(t, "Node rejoin successful", ctx, 60*time.Second, func(ctx context.Context) error {
+ // Ensure nodes rejoin the cluster after a reboot by rebooting the first node.
+ if err := cluster.RebootNode(ctx, 1); err != nil {
+ return fmt.Errorf("while rebooting a node: %w", err)
+ }
+ return nil
+ })
+ util.TestEventual(t, "Heartbeat test successful", ctx, 20*time.Second, cluster.AllNodesHealthy)
+ util.TestEventual(t, "Prometheus node metrics retrieved", ctx, smallTestTimeout, func(ctx context.Context) error {
+ pool := x509.NewCertPool()
+ pool.AddCert(cluster.CACertificate)
+ cl := http.Client{
+ Transport: &http.Transport{
+ TLSClientConfig: &tls.Config{
+ Certificates: []tls.Certificate{cluster.Owner},
+ RootCAs: pool,
+ },
+ DialContext: func(ctx context.Context, _, addr string) (net.Conn, error) {
+ return cluster.DialNode(ctx, addr)
+ },
+ },
+ }
+ u := url.URL{
+ Scheme: "https",
+ Host: net.JoinHostPort(cluster.NodeIDs[0], common.MetricsPort.PortString()),
+ Path: "/metrics/node",
+ }
+ res, err := cl.Get(u.String())
+ if err != nil {
+ return err
+ }
+ defer res.Body.Close()
+ if res.StatusCode != 200 {
+ return fmt.Errorf("status code %d", res.StatusCode)
+ }
+
+ body, err := io.ReadAll(res.Body)
+ if err != nil {
+ return err
+ }
+ needle := "node_uname_info"
+ if !strings.Contains(string(body), needle) {
+ return util.Permanent(fmt.Errorf("could not find %q in returned response", needle))
+ }
+ return nil
+ })
+}
+
+// TestE2EKubernetes exercises the Kubernetes functionality of Metropolis.
+//
+// The tests are performed against an in-memory cluster.
+func TestE2EKubernetes(t *testing.T) {
+ // Set a global timeout to make sure this terminates
+ ctx, cancel := context.WithTimeout(context.Background(), globalTestTimeout)
+ defer cancel()
+
+ // Launch cluster.
+ clusterOptions := cluster.ClusterOptions{
+ NumNodes: 2,
+ }
+ cluster, err := cluster.LaunchCluster(ctx, clusterOptions)
+ if err != nil {
+ t.Fatalf("LaunchCluster failed: %v", err)
+ }
+ defer func() {
+ err := cluster.Close()
+ if err != nil {
+ t.Fatalf("cluster Close failed: %v", err)
+ }
+ }()
+
+ clientSet, err := cluster.GetKubeClientSet()
+ if err != nil {
+ t.Fatal(err)
+ }
+ util.TestEventual(t, "Add KubernetesWorker roles", ctx, smallTestTimeout, func(ctx context.Context) error {
+ // Make everything but the first node into KubernetesWorkers.
+ for i := 1; i < clusterOptions.NumNodes; i++ {
+ err := cluster.MakeKubernetesWorker(ctx, cluster.NodeIDs[i])
+ if err != nil {
+ return util.Permanent(fmt.Errorf("MakeKubernetesWorker: %w", err))
+ }
+ }
+ return nil
+ })
+ util.TestEventual(t, "Node is registered and ready", ctx, largeTestTimeout, func(ctx context.Context) error {
+ nodes, err := clientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
+ if err != nil {
+ return err
+ }
+ if len(nodes.Items) < 1 {
+ return errors.New("node not yet registered")
+ }
+ node := nodes.Items[0]
+ for _, cond := range node.Status.Conditions {
+ if cond.Type != corev1.NodeReady {
+ continue
+ }
+ if cond.Status != corev1.ConditionTrue {
+ return fmt.Errorf("node not ready: %v", cond.Message)
+ }
+ }
+ return nil
+ })
+ util.TestEventual(t, "Simple deployment", ctx, largeTestTimeout, func(ctx context.Context) error {
+ _, err := clientSet.AppsV1().Deployments("default").Create(ctx, makeTestDeploymentSpec("test-deploy-1"), metav1.CreateOptions{})
+ return err
+ })
+ util.TestEventual(t, "Simple deployment is running", ctx, largeTestTimeout, func(ctx context.Context) error {
+ res, err := clientSet.CoreV1().Pods("default").List(ctx, metav1.ListOptions{LabelSelector: "name=test-deploy-1"})
+ if err != nil {
+ return err
+ }
+ if len(res.Items) == 0 {
+ return errors.New("pod didn't get created")
+ }
+ pod := res.Items[0]
+ if podv1.IsPodAvailable(&pod, 1, metav1.NewTime(time.Now())) {
+ return nil
+ }
+ events, err := clientSet.CoreV1().Events("default").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=default", pod.Name)})
+ if err != nil || len(events.Items) == 0 {
+ return fmt.Errorf("pod is not ready: %v", pod.Status.Phase)
+ } else {
+ return fmt.Errorf("pod is not ready: %v", events.Items[0].Message)
+ }
+ })
+ util.TestEventual(t, "Simple deployment with gvisor", ctx, largeTestTimeout, func(ctx context.Context) error {
+ deployment := makeTestDeploymentSpec("test-deploy-2")
+ gvisorStr := "gvisor"
+ deployment.Spec.Template.Spec.RuntimeClassName = &gvisorStr
+ _, err := clientSet.AppsV1().Deployments("default").Create(ctx, deployment, metav1.CreateOptions{})
+ return err
+ })
+ util.TestEventual(t, "Simple deployment is running on gvisor", ctx, largeTestTimeout, func(ctx context.Context) error {
+ res, err := clientSet.CoreV1().Pods("default").List(ctx, metav1.ListOptions{LabelSelector: "name=test-deploy-2"})
+ if err != nil {
+ return err
+ }
+ if len(res.Items) == 0 {
+ return errors.New("pod didn't get created")
+ }
+ pod := res.Items[0]
+ if podv1.IsPodAvailable(&pod, 1, metav1.NewTime(time.Now())) {
+ return nil
+ }
+ events, err := clientSet.CoreV1().Events("default").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=default", pod.Name)})
+ if err != nil || len(events.Items) == 0 {
+ return fmt.Errorf("pod is not ready: %v", pod.Status.Phase)
+ } else {
+ var errorMsg strings.Builder
+ for _, msg := range events.Items {
+ errorMsg.WriteString(" | ")
+ errorMsg.WriteString(msg.Message)
+ }
+ return fmt.Errorf("pod is not ready: %v", errorMsg.String())
+ }
+ })
+ util.TestEventual(t, "Simple StatefulSet with PVC", ctx, largeTestTimeout, func(ctx context.Context) error {
+ _, err := clientSet.AppsV1().StatefulSets("default").Create(ctx, makeTestStatefulSet("test-statefulset-1", corev1.PersistentVolumeFilesystem), metav1.CreateOptions{})
+ return err
+ })
+ util.TestEventual(t, "Simple StatefulSet with PVC is running", ctx, largeTestTimeout, func(ctx context.Context) error {
+ res, err := clientSet.CoreV1().Pods("default").List(ctx, metav1.ListOptions{LabelSelector: "name=test-statefulset-1"})
+ if err != nil {
+ return err
+ }
+ if len(res.Items) == 0 {
+ return errors.New("pod didn't get created")
+ }
+ pod := res.Items[0]
+ if podv1.IsPodAvailable(&pod, 1, metav1.NewTime(time.Now())) {
+ return nil
+ }
+ events, err := clientSet.CoreV1().Events("default").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=default", pod.Name)})
+ if err != nil || len(events.Items) == 0 {
+ return fmt.Errorf("pod is not ready: %v", pod.Status.Phase)
+ } else {
+ return fmt.Errorf("pod is not ready: %v", events.Items[0].Message)
+ }
+ })
+ util.TestEventual(t, "Simple StatefulSet with Block PVC", ctx, largeTestTimeout, func(ctx context.Context) error {
+ _, err := clientSet.AppsV1().StatefulSets("default").Create(ctx, makeTestStatefulSet("test-statefulset-2", corev1.PersistentVolumeBlock), metav1.CreateOptions{})
+ return err
+ })
+ util.TestEventual(t, "Simple StatefulSet with Block PVC is running", ctx, largeTestTimeout, func(ctx context.Context) error {
+ res, err := clientSet.CoreV1().Pods("default").List(ctx, metav1.ListOptions{LabelSelector: "name=test-statefulset-2"})
+ if err != nil {
+ return err
+ }
+ if len(res.Items) == 0 {
+ return errors.New("pod didn't get created")
+ }
+ pod := res.Items[0]
+ if podv1.IsPodAvailable(&pod, 1, metav1.NewTime(time.Now())) {
+ return nil
+ }
+ events, err := clientSet.CoreV1().Events("default").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=default", pod.Name)})
+ if err != nil || len(events.Items) == 0 {
+ return fmt.Errorf("pod is not ready: %v", pod.Status.Phase)
+ } else {
+ return fmt.Errorf("pod is not ready: %v", events.Items[0].Message)
+ }
+ })
+ util.TestEventual(t, "In-cluster self-test job", ctx, smallTestTimeout, func(ctx context.Context) error {
+ _, err := clientSet.BatchV1().Jobs("default").Create(ctx, makeSelftestSpec("selftest"), metav1.CreateOptions{})
+ return err
+ })
+ util.TestEventual(t, "In-cluster self-test job passed", ctx, smallTestTimeout, func(ctx context.Context) error {
+ res, err := clientSet.BatchV1().Jobs("default").Get(ctx, "selftest", metav1.GetOptions{})
+ if err != nil {
+ return err
+ }
+ if res.Status.Failed > 0 {
+ pods, err := clientSet.CoreV1().Pods("default").List(ctx, metav1.ListOptions{
+ LabelSelector: "job-name=selftest",
+ })
+ if err != nil {
+ return util.Permanent(fmt.Errorf("job failed but failed to find pod: %w", err))
+ }
+ if len(pods.Items) < 1 {
+ return fmt.Errorf("job failed but pod does not exist")
+ }
+ lines, err := getPodLogLines(ctx, clientSet, pods.Items[0].Name, 1)
+ if err != nil {
+ return fmt.Errorf("job failed but could not get logs: %w", err)
+ }
+ if len(lines) > 0 {
+ return util.Permanent(fmt.Errorf("job failed, last log line: %s", lines[0]))
+ }
+ return util.Permanent(fmt.Errorf("job failed, empty log"))
+ }
+ if res.Status.Succeeded > 0 {
+ return nil
+ }
+ return fmt.Errorf("job still running")
+ })
+ if os.Getenv("HAVE_NESTED_KVM") != "" {
+ util.TestEventual(t, "Pod for KVM/QEMU smoke test", ctx, smallTestTimeout, func(ctx context.Context) error {
+ runcRuntimeClass := "runc"
+ _, err := clientSet.CoreV1().Pods("default").Create(ctx, &corev1.Pod{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "vm-smoketest",
+ },
+ Spec: corev1.PodSpec{
+ Containers: []corev1.Container{{
+ Name: "vm-smoketest",
+ ImagePullPolicy: corev1.PullNever,
+ Image: "bazel/metropolis/vm/smoketest:smoketest_container",
+ Resources: corev1.ResourceRequirements{
+ Limits: corev1.ResourceList{
+ "devices.monogon.dev/kvm": *resource.NewQuantity(1, ""),
+ },
+ },
+ }},
+ RuntimeClassName: &runcRuntimeClass,
+ RestartPolicy: corev1.RestartPolicyNever,
+ },
+ }, metav1.CreateOptions{})
+ return err
+ })
+ util.TestEventual(t, "KVM/QEMU smoke test completion", ctx, smallTestTimeout, func(ctx context.Context) error {
+ pod, err := clientSet.CoreV1().Pods("default").Get(ctx, "vm-smoketest", metav1.GetOptions{})
+ if err != nil {
+ return fmt.Errorf("failed to get pod: %w", err)
+ }
+ if pod.Status.Phase == corev1.PodSucceeded {
+ return nil
+ }
+ events, err := clientSet.CoreV1().Events("default").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("involvedObject.name=%s,involvedObject.namespace=default", pod.Name)})
+ if err != nil || len(events.Items) == 0 {
+ return fmt.Errorf("pod is not ready: %v", pod.Status.Phase)
+ } else {
+ return fmt.Errorf("pod is not ready: %v", events.Items[len(events.Items)-1].Message)
+ }
+ })
+ }
}
diff --git a/metropolis/test/launch/cluster/BUILD.bazel b/metropolis/test/launch/cluster/BUILD.bazel
index e81ff7c..a2dcc52 100644
--- a/metropolis/test/launch/cluster/BUILD.bazel
+++ b/metropolis/test/launch/cluster/BUILD.bazel
@@ -31,6 +31,8 @@
"//metropolis/test/launch",
"@com_github_cenkalti_backoff_v4//:backoff",
"@com_github_kballard_go_shellquote//:go-shellquote",
+ "@io_k8s_client_go//kubernetes",
+ "@io_k8s_client_go//rest",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
diff --git a/metropolis/test/launch/cluster/cluster.go b/metropolis/test/launch/cluster/cluster.go
index 615a9cc..5b39f67 100644
--- a/metropolis/test/launch/cluster/cluster.go
+++ b/metropolis/test/launch/cluster/cluster.go
@@ -11,6 +11,7 @@
"crypto/rand"
"crypto/tls"
"crypto/x509"
+ "encoding/pem"
"errors"
"fmt"
"io"
@@ -30,6 +31,8 @@
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/rest"
metroctl "source.monogon.dev/metropolis/cli/metroctl/core"
"source.monogon.dev/metropolis/cli/pkg/datafile"
@@ -487,6 +490,11 @@
// The files will be located within the launch directory inside TEST_TMPDIR (or
// the default tempdir location, if not set).
NodeLogsToFiles bool
+
+ // LeaveNodesNew, if set, will leave all non-bootstrap nodes in NEW, without
+ // approving them. The nodes' address information in Cluster.Nodes will be
+ // incomplete.
+ LeaveNodesNew bool
}
// Cluster is the running Metropolis cluster launched using the LaunchCluster
@@ -543,7 +551,8 @@
// NodeInCluster represents information about a node that's part of a Cluster.
type NodeInCluster struct {
// ID of the node, which can be used to dial this node's services via DialNode.
- ID string
+ ID string
+ Pubkey []byte
// Address of the node on the network ran by nanoswitch. Not reachable from the
// host unless dialed via DialNode or via the nanoswitch SOCKS proxy (reachable
// on Cluster.Ports[SOCKSPort]).
@@ -868,8 +877,6 @@
}
// Now run the rest of the nodes.
- //
- // TODO(q3k): parallelize this
for i := 1; i < opts.NumNodes; i++ {
launch.Log("Cluster: Starting node %d...", i+1)
go func(i int) {
@@ -879,9 +886,11 @@
}
done[i] <- err
}(i)
- var newNode *apb.Node
+ }
- launch.Log("Cluster: waiting for node %d to appear as NEW...", i)
+ seenNodes := make(map[string]bool)
+ launch.Log("Cluster: waiting for nodes to appear as NEW...")
+ for i := 1; i < opts.NumNodes; i++ {
for {
nodes, err := getNodes(ctx, mgmt)
if err != nil {
@@ -889,65 +898,75 @@
return nil, fmt.Errorf("could not get nodes: %w", err)
}
for _, n := range nodes {
- if n.State == cpb.NodeState_NODE_STATE_NEW {
- newNode = n
- break
+ if n.State != cpb.NodeState_NODE_STATE_NEW {
+ continue
}
+ seenNodes[n.Id] = true
+ cluster.Nodes[n.Id] = &NodeInCluster{
+ ID: n.Id,
+ Pubkey: n.Pubkey,
+ }
+ cluster.NodeIDs = append(cluster.NodeIDs, n.Id)
}
- if newNode != nil {
+
+ if len(seenNodes) == opts.NumNodes-1 {
break
}
time.Sleep(1 * time.Second)
}
- id := identity.NodeID(newNode.Pubkey)
- launch.Log("Cluster: node %d is %s", i, id)
+ }
+ launch.Log("Found all expected nodes")
- launch.Log("Cluster: approving node %d", i)
- _, err := mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
- Pubkey: newNode.Pubkey,
- })
- if err != nil {
- ctxC()
- return nil, fmt.Errorf("ApproveNode(%s): %w", id, err)
- }
- launch.Log("Cluster: node %d approved, waiting for it to appear as UP and with a network address...", i)
+ approvedNodes := make(map[string]bool)
+ upNodes := make(map[string]bool)
+ if !opts.LeaveNodesNew {
for {
nodes, err := getNodes(ctx, mgmt)
if err != nil {
ctxC()
return nil, fmt.Errorf("could not get nodes: %w", err)
}
- found := false
- for _, n := range nodes {
- if !bytes.Equal(n.Pubkey, newNode.Pubkey) {
+ for _, node := range nodes {
+ if !seenNodes[node.Id] {
+ // Skip nodes that weren't NEW in the previous step.
continue
}
- if n.Status == nil || n.Status.ExternalAddress == "" {
- break
+
+ if node.State == cpb.NodeState_NODE_STATE_UP && node.Status != nil && node.Status.ExternalAddress != "" {
+ launch.Log("Cluster: node %s is up", node.Id)
+ upNodes[node.Id] = true
+ cluster.Nodes[node.Id].ManagementAddress = node.Status.ExternalAddress
}
- if n.State != cpb.NodeState_NODE_STATE_UP {
- break
+ if upNodes[node.Id] {
+ continue
}
- found = true
- cluster.Nodes[identity.NodeID(n.Pubkey)] = &NodeInCluster{
- ID: identity.NodeID(n.Pubkey),
- ManagementAddress: n.Status.ExternalAddress,
+
+ if !approvedNodes[node.Id] {
+ launch.Log("Cluster: approving node %s", node.Id)
+ _, err := mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
+ Pubkey: node.Pubkey,
+ })
+ if err != nil {
+ ctxC()
+ return nil, fmt.Errorf("ApproveNode(%s): %w", node.Id, err)
+ }
+ approvedNodes[node.Id] = true
}
- cluster.NodeIDs = append(cluster.NodeIDs, identity.NodeID(n.Pubkey))
- break
}
- if found {
+
+ launch.Log("Cluster: want %d up nodes, have %d", opts.NumNodes-1, len(upNodes))
+ if len(upNodes) == opts.NumNodes-1 {
break
}
time.Sleep(time.Second)
}
- launch.Log("Cluster: node %d (%s) UP!", i, id)
}
launch.Log("Cluster: all nodes up:")
for _, node := range cluster.Nodes {
launch.Log("Cluster: - %s at %s", node.ID, node.ManagementAddress)
}
+ launch.Log("Cluster: starting tests...")
return cluster, nil
}
@@ -1077,6 +1096,34 @@
return c.socksDialer.Dial("tcp", addr)
}
+// GetKubeClientSet gets a Kubernetes client set accessing the Metropolis
+// Kubernetes authenticating proxy using the cluster owner identity.
+// It currently has access to everything (i.e. the cluster-admin role)
+// via the owner-admin binding.
+func (c *Cluster) GetKubeClientSet() (kubernetes.Interface, error) {
+ pkcs8Key, err := x509.MarshalPKCS8PrivateKey(c.Owner.PrivateKey)
+ if err != nil {
+ // We explicitly pass an Ed25519 private key in, so this can't happen
+ panic(err)
+ }
+
+ host := net.JoinHostPort(c.NodeIDs[0], node.KubernetesAPIWrappedPort.PortString())
+ var clientConfig = rest.Config{
+ Host: host,
+ TLSClientConfig: rest.TLSClientConfig{
+ // TODO(q3k): use CA certificate
+ Insecure: true,
+ ServerName: "kubernetes.default.svc",
+ CertData: pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: c.Owner.Certificate[0]}),
+ KeyData: pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: pkcs8Key}),
+ },
+ Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
+ return c.DialNode(ctx, address)
+ },
+ }
+ return kubernetes.NewForConfig(&clientConfig)
+}
+
// KubernetesControllerNodeAddresses returns the list of IP addresses of nodes
// which are currently Kubernetes controllers, ie. run an apiserver. This list
// might be empty if no node is currently configured with the
@@ -1111,6 +1158,8 @@
return res, nil
}
+// AllNodesHealthy returns nil if all the nodes in the cluster are seemingly
+// healthy.
func (c *Cluster) AllNodesHealthy(ctx context.Context) error {
// Get an authenticated owner client within the cluster.
curC, err := c.CuratorClient()
@@ -1135,3 +1184,71 @@
}
return fmt.Errorf("nodes unhealthy: %s", strings.Join(unhealthy, ", "))
}
+
+// ApproveNode approves a node by ID, waiting for it to become UP.
+func (c *Cluster) ApproveNode(ctx context.Context, id string) error {
+ curC, err := c.CuratorClient()
+ if err != nil {
+ return err
+ }
+ mgmt := apb.NewManagementClient(curC)
+
+ _, err = mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
+ Pubkey: c.Nodes[id].Pubkey,
+ })
+ if err != nil {
+ return fmt.Errorf("ApproveNode: %w", err)
+ }
+ launch.Log("Cluster: %s: approved, waiting for UP", id)
+ for {
+ nodes, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
+ if err != nil {
+ return fmt.Errorf("GetNodes: %w", err)
+ }
+ found := false
+ for {
+ node, err := nodes.Recv()
+ if errors.Is(err, io.EOF) {
+ break
+ }
+ if err != nil {
+ return fmt.Errorf("Nodes.Recv: %w", err)
+ }
+ if node.Id != id {
+ continue
+ }
+ if node.State != cpb.NodeState_NODE_STATE_UP {
+ continue
+ }
+ found = true
+ break
+ }
+ nodes.CloseSend()
+
+ if found {
+ break
+ }
+ time.Sleep(time.Second)
+ }
+ launch.Log("Cluster: %s: UP", id)
+ return nil
+}
+
+// MakeKubernetesWorker adds the KubernetesWorker role to a node by ID.
+func (c *Cluster) MakeKubernetesWorker(ctx context.Context, id string) error {
+ curC, err := c.CuratorClient()
+ if err != nil {
+ return err
+ }
+ mgmt := apb.NewManagementClient(curC)
+
+ tr := true
+ launch.Log("Cluster: %s: adding KubernetesWorker", id)
+ _, err = mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
+ Node: &apb.UpdateNodeRolesRequest_Id{
+ Id: id,
+ },
+ KubernetesWorker: &tr,
+ })
+ return err
+}