metropolis/test/e2e: add self-test image for networking
We don't have any networking tests in our E2E tests. This adds an image
which de facto implements one. Or at least, it will implement one once we
move to split workers/controllers, at which point contacting a Kubernetes
apiserver from a pod will mean we're actually testing cross-node traffic.
Change-Id: I3d7be3824ac041d72e1c19cd468d30dbcb71fa03
Reviewed-on: https://review.monogon.dev/c/monogon/+/1481
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/test/e2e/BUILD.bazel b/metropolis/test/e2e/BUILD.bazel
index 3db8a33..2cde2c6 100644
--- a/metropolis/test/e2e/BUILD.bazel
+++ b/metropolis/test/e2e/BUILD.bazel
@@ -8,6 +8,7 @@
deps = [
"//metropolis/test/launch/cluster",
"@io_k8s_api//apps/v1:apps",
+ "@io_k8s_api//batch/v1:batch",
"@io_k8s_api//core/v1:core",
"@io_k8s_apimachinery//pkg/api/resource",
"@io_k8s_apimachinery//pkg/apis/meta/v1:meta",
diff --git a/metropolis/test/e2e/kubernetes_helpers.go b/metropolis/test/e2e/kubernetes_helpers.go
index dec7363..066c1c2 100644
--- a/metropolis/test/e2e/kubernetes_helpers.go
+++ b/metropolis/test/e2e/kubernetes_helpers.go
@@ -17,11 +17,16 @@
package e2e
import (
+ "bytes"
+ "context"
"crypto/x509"
"encoding/pem"
"fmt"
+ "io"
+ "strings"
appsv1 "k8s.io/api/apps/v1"
+ batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -91,6 +96,34 @@
}
}
+// makeSelftestSpec builds the batch Job used to run the E2E self-test image
+// inside the cluster.
+//
+// The Job has a single container named "test" running the self-test image,
+// which must already be present on the node (ImagePullPolicy Never). At most
+// one retry is allowed (BackoffLimit 1). The Pod template is labelled
+// "job-name=<name>" so callers can locate the Job's Pods with a label selector.
+//
+// NOTE(review): with RestartPolicyOnFailure the container is restarted in
+// place, and once the backoff limit is reached the Job controller terminates
+// the Pod. Its logs may then be gone by the time a caller tries to read them
+// to explain the failure. RestartPolicyNever would keep failed Pods around,
+// but it also changes how Job status counts failures. Confirm which behavior
+// the E2E test wants.
+func makeSelftestSpec(name string) *batchv1.Job {
+	backoffLimit := int32(1)
+
+	container := corev1.Container{
+		Name:            "test",
+		Image:           "bazel/metropolis/test/e2e/selftest:selftest_image",
+		ImagePullPolicy: corev1.PullNever,
+	}
+
+	podTemplate := corev1.PodTemplateSpec{
+		ObjectMeta: metav1.ObjectMeta{
+			Labels: map[string]string{"job-name": name},
+		},
+		Spec: corev1.PodSpec{
+			Containers:    []corev1.Container{container},
+			RestartPolicy: corev1.RestartPolicyOnFailure,
+		},
+	}
+
+	job := &batchv1.Job{
+		ObjectMeta: metav1.ObjectMeta{Name: name},
+		Spec: batchv1.JobSpec{
+			BackoffLimit: &backoffLimit,
+			Template:     podTemplate,
+		},
+	}
+	return job
+}
+
// makeTestStatefulSet generates a StatefulSet spec
func makeTestStatefulSet(name string, volumeMode corev1.PersistentVolumeMode) *appsv1.StatefulSet {
return &appsv1.StatefulSet{
@@ -135,3 +168,22 @@
},
}
}
+
+// getPodLogLines returns up to the last nlines lines of the logs of the Pod
+// podName in the "default" namespace.
+//
+// Leading and trailing newlines are stripped before splitting. An empty log
+// yields a nil slice rather than a slice holding one empty string, so callers
+// can detect it with len(lines) == 0. If the log is shorter than nlines, all
+// of its lines are returned.
+func getPodLogLines(ctx context.Context, cs kubernetes.Interface, podName string, nlines int64) ([]string, error) {
+	logsR := cs.CoreV1().Pods("default").GetLogs(podName, &corev1.PodLogOptions{
+		TailLines: &nlines,
+	})
+	logs, err := logsR.Stream(ctx)
+	if err != nil {
+		return nil, fmt.Errorf("stream failed: %w", err)
+	}
+	// Stream hands back an io.ReadCloser; close it to release the connection.
+	defer logs.Close()
+
+	var buf bytes.Buffer
+	if _, err := io.Copy(&buf, logs); err != nil {
+		return nil, fmt.Errorf("copy failed: %w", err)
+	}
+	lineStr := strings.Trim(buf.String(), "\n")
+	if lineStr == "" {
+		return nil, nil
+	}
+	lines := strings.Split(lineStr, "\n")
+	// TailLines already asks the server for at most nlines lines. This is
+	// only a guard that trims any excess. It must never slice before the
+	// start of the log when fewer lines than requested are available.
+	if n := int(nlines); n >= 0 && len(lines) > n {
+		lines = lines[len(lines)-n:]
+	}
+	return lines, nil
+}
diff --git a/metropolis/test/e2e/main_test.go b/metropolis/test/e2e/main_test.go
index 7a72db2..4e2ceb7 100644
--- a/metropolis/test/e2e/main_test.go
+++ b/metropolis/test/e2e/main_test.go
@@ -279,6 +279,39 @@
return fmt.Errorf("pod is not ready: %v", events.Items[0].Message)
}
})
+ util.TestEventual(t, "In-cluster self-test job", ctx, smallTestTimeout, func(ctx context.Context) error {
+ _, err := clientSet.BatchV1().Jobs("default").Create(ctx, makeSelftestSpec("selftest"), metav1.CreateOptions{})
+ return err
+ })
+ util.TestEventual(t, "In-cluster self-test job passed", ctx, smallTestTimeout, func(ctx context.Context) error {
+ res, err := clientSet.BatchV1().Jobs("default").Get(ctx, "selftest", metav1.GetOptions{})
+ if err != nil {
+ return err
+ }
+ if res.Status.Failed > 0 {
+ pods, err := clientSet.CoreV1().Pods("default").List(ctx, metav1.ListOptions{
+ LabelSelector: "job-name=selftest",
+ })
+ if err != nil {
+ return util.Permanent(fmt.Errorf("job failed but failed to find pod: %w", err))
+ }
+ if len(pods.Items) < 1 {
+ return fmt.Errorf("job failed but pod does not exist")
+ }
+ lines, err := getPodLogLines(ctx, clientSet, pods.Items[0].Name, 1)
+ if err != nil {
+ return fmt.Errorf("job failed but could not get logs: %w", err)
+ }
+ if len(lines) > 0 {
+ return util.Permanent(fmt.Errorf("job failed, last log line: %s", lines[0]))
+ }
+ return util.Permanent(fmt.Errorf("job failed, empty log"))
+ }
+ if res.Status.Succeeded > 0 {
+ return nil
+ }
+ return fmt.Errorf("job still running")
+ })
if os.Getenv("HAVE_NESTED_KVM") != "" {
util.TestEventual(t, "Pod for KVM/QEMU smoke test", ctx, smallTestTimeout, func(ctx context.Context) error {
runcRuntimeClass := "runc"
diff --git a/metropolis/test/e2e/selftest/BUILD.bazel b/metropolis/test/e2e/selftest/BUILD.bazel
new file mode 100644
index 0000000..2aa8029
--- /dev/null
+++ b/metropolis/test/e2e/selftest/BUILD.bazel
@@ -0,0 +1,16 @@
+ load("@io_bazel_rules_go//go:def.bzl", "go_library")
+ load("@io_bazel_rules_docker//go:image.bzl", "go_image")
+
+ # Go library holding the self-test program (main.go, package main). It is
+ # only consumed by the selftest_image target below, hence private visibility.
+ go_library(
+ name = "selftest",
+ srcs = ["main.go"],
+ importpath = "source.monogon.dev/metropolis/test/e2e/selftest",
+ visibility = ["//visibility:private"],
+ )
+
+ # Container image wrapping the self-test binary. The E2E tests run it as a
+ # Kubernetes Job using the image name
+ # bazel/metropolis/test/e2e/selftest:selftest_image. Visibility is limited to
+ # //metropolis/node, presumably so that the node build can bundle the image
+ # (it is never pulled from a registry). TODO confirm.
+ go_image(
+ name = "selftest_image",
+ embed = [":selftest"],
+ pure = "on",
+ visibility = ["//metropolis/node:__pkg__"],
+ )
diff --git a/metropolis/test/e2e/selftest/README.md b/metropolis/test/e2e/selftest/README.md
new file mode 100644
index 0000000..b001f38
--- /dev/null
+++ b/metropolis/test/e2e/selftest/README.md
@@ -0,0 +1,8 @@
+self-test image
+===
+
+This image is used by the Metropolis E2E tests to perform some cluster-internal
+tests. See //metropolis/test/e2e:main_test.go for usage.
+
+The image should be run as a Kubernetes Job and should exit with status 0 if
+all tests have passed. If the job fails, the E2E test reports the last line of
+the job's log.
\ No newline at end of file
diff --git a/metropolis/test/e2e/selftest/main.go b/metropolis/test/e2e/selftest/main.go
new file mode 100644
index 0000000..2603eae
--- /dev/null
+++ b/metropolis/test/e2e/selftest/main.go
@@ -0,0 +1,84 @@
+package main
+
+import (
+ "context"
+ "crypto/tls"
+ "crypto/x509"
+ "encoding/json"
+ "fmt"
+ "log"
+ "net/http"
+ "os"
+ "time"
+)
+
+// test1InClusterKubernetes exercises connectivity to the cluster-local
+// Kubernetes API server. It expects to be able to connect to the APIserver using
+// the ServiceAccount and cluster CA injected by the Kubelet.
+//
+// It performs an authenticated GET of /api and succeeds only if the server
+// answers with an object of kind APIVersions. A Status object is reported
+// with its message, and any other kind is reported as unexpected.
+//
+// The entire functionality is reimplemented without relying on Kubernetes
+// client code to make the expected behaviour clear.
+func test1InClusterKubernetes(ctx context.Context) error {
+	token, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/token")
+	if err != nil {
+		return fmt.Errorf("failed to read serviceaccount token: %w", err)
+	}
+
+	cacert, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/ca.crt")
+	if err != nil {
+		return fmt.Errorf("failed to read cluster CA certificate: %w", err)
+	}
+	pool := x509.NewCertPool()
+	// If the bundle holds no parseable certificate, fail here with a clear
+	// message. Otherwise we would hit an opaque TLS verification error later.
+	if !pool.AppendCertsFromPEM(cacert) {
+		return fmt.Errorf("cluster CA certificate file contains no valid PEM certificates")
+	}
+
+	client := &http.Client{
+		Transport: &http.Transport{
+			TLSClientConfig: &tls.Config{
+				RootCAs: pool,
+			},
+		},
+	}
+
+	req, err := http.NewRequestWithContext(ctx, "GET", "https://kubernetes.default.svc.cluster.local/api", nil)
+	if err != nil {
+		return fmt.Errorf("creating request failed: %w", err)
+	}
+	req.Header.Set("Authorization", "Bearer "+string(token))
+
+	res, err := client.Do(req)
+	if err != nil {
+		return fmt.Errorf("request failed: %w", err)
+	}
+	defer res.Body.Close()
+
+	// Only the fields needed to tell APIVersions apart from a Status error.
+	j := struct {
+		Kind    string `json:"kind"`
+		Message string `json:"message"`
+	}{}
+	if err := json.NewDecoder(res.Body).Decode(&j); err != nil {
+		// Include the HTTP status: a non-JSON body usually means something
+		// other than the apiserver (or an error page) answered.
+		return fmt.Errorf("json parse error (HTTP status %q): %w", res.Status, err)
+	}
+
+	if j.Kind == "Status" {
+		return fmt.Errorf("API server responded with error: %q", j.Message)
+	}
+	if j.Kind != "APIVersions" {
+		return fmt.Errorf("unexpected response from server (kind: %q)", j.Kind)
+	}
+
+	return nil
+}
+
+// main runs the self-tests in order under a shared 10-second deadline.
+//
+// On the first failure it prints the error to stdout as the program's final
+// output and exits with status 1. If every test passes, it logs a success
+// message and returns normally, so the process exits with status 0.
+func main() {
+	log.Printf("Metropolis Kubernetes self-test starting...")
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+
+	log.Printf("1. In-cluster Kubernetes client...")
+	err := test1InClusterKubernetes(ctx)
+	if err == nil {
+		log.Printf("All tests passed.")
+		return
+	}
+	fmt.Println(err.Error())
+	os.Exit(1)
+}
diff --git a/metropolis/test/util/runners.go b/metropolis/test/util/runners.go
index a2b1663..e25fe10 100644
--- a/metropolis/test/util/runners.go
+++ b/metropolis/test/util/runners.go
@@ -5,6 +5,7 @@
import (
"context"
"errors"
+ "fmt"
"testing"
"time"
@@ -31,6 +32,9 @@
if err == ctx.Err() {
t.Fatal(lastErr)
}
+ if errors.Is(err, &PermanentError{}) {
+ t.Fatal(err)
+ }
lastErr = err
select {
case <-ctx.Done():
@@ -40,3 +44,37 @@
}
})
}
+
+// PermanentError can be returned inside TestEventual to indicate that the test
+// is 'stuck', that it will not make progress anymore and that it should be
+// failed immediately. Construct one with Permanent.
+type PermanentError struct {
+	// Err is the underlying cause of the permanent failure. It may be nil.
+	Err error
+}
+
+// Error implements the error interface.
+func (p *PermanentError) Error() string {
+	return fmt.Sprintf("test permanently failed: %v", p.Err)
+}
+
+// Unwrap exposes the underlying cause to errors.Is and errors.As.
+func (p *PermanentError) Unwrap() error {
+	return p.Err
+}
+
+// Is reports whether target o is a *PermanentError that matches p.
+//
+// A target with a nil Err is a wildcard and matches any PermanentError. This
+// is what lets errors.Is(err, &PermanentError{}) detect a permanent failure
+// anywhere in an error chain. A target with a non-nil Err matches only if p's
+// own cause matches it according to errors.Is. In particular, a PermanentError
+// without a cause does not match a target that names one.
+func (p *PermanentError) Is(o error) bool {
+	op, ok := o.(*PermanentError)
+	if !ok {
+		return false
+	}
+	if op.Err == nil {
+		return true
+	}
+	return errors.Is(p.Err, op.Err)
+}
+
+// Permanent marks err as non-retryable by wrapping it in a *PermanentError.
+// When a TestEventual callback returns the result, the test fails at once
+// instead of being retried. The result is always non-nil, even for a nil err.
+func Permanent(err error) error {
+	pe := PermanentError{Err: err}
+	return &pe
+}