metropolis/test/e2e: add self-test image for networking

We don't have any networking tests in our E2E tests. This adds a self-test
image which de facto implements one - or at least will, once we move to
split workers/controllers, at which point contacting the Kubernetes
apiserver from a pod will actually exercise cross-node traffic.

It also adds util.Permanent, which lets a TestEventual callback fail the
test immediately instead of retrying until the timeout.

Change-Id: I3d7be3824ac041d72e1c19cd468d30dbcb71fa03
Reviewed-on: https://review.monogon.dev/c/monogon/+/1481
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/test/e2e/BUILD.bazel b/metropolis/test/e2e/BUILD.bazel
index 3db8a33..2cde2c6 100644
--- a/metropolis/test/e2e/BUILD.bazel
+++ b/metropolis/test/e2e/BUILD.bazel
@@ -8,6 +8,7 @@
     deps = [
         "//metropolis/test/launch/cluster",
         "@io_k8s_api//apps/v1:apps",
+        "@io_k8s_api//batch/v1:batch",
         "@io_k8s_api//core/v1:core",
         "@io_k8s_apimachinery//pkg/api/resource",
         "@io_k8s_apimachinery//pkg/apis/meta/v1:meta",
diff --git a/metropolis/test/e2e/kubernetes_helpers.go b/metropolis/test/e2e/kubernetes_helpers.go
index dec7363..066c1c2 100644
--- a/metropolis/test/e2e/kubernetes_helpers.go
+++ b/metropolis/test/e2e/kubernetes_helpers.go
@@ -17,11 +17,16 @@
 package e2e
 
 import (
+	"bytes"
+	"context"
 	"crypto/x509"
 	"encoding/pem"
 	"fmt"
+	"io"
+	"strings"
 
 	appsv1 "k8s.io/api/apps/v1"
+	batchv1 "k8s.io/api/batch/v1"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -91,6 +96,34 @@
 	}
 }
 
+// makeSelftestSpec generates a Job spec for the E2E self-test image.
+//
+// The pod is never restarted and the Job never retried, so a failed self-test
+// leaves a single failed pod whose logs stay readable (with OnFailure, the pod
+// is terminated once the backoff limit is reached, taking its logs with it).
+func makeSelftestSpec(name string) *batchv1.Job {
+	backoffLimit := int32(0)
+	return &batchv1.Job{
+		ObjectMeta: metav1.ObjectMeta{Name: name},
+		Spec: batchv1.JobSpec{
+			BackoffLimit: &backoffLimit,
+			Template: corev1.PodTemplateSpec{
+				ObjectMeta: metav1.ObjectMeta{
+					Labels: map[string]string{"job-name": name},
+				},
+				Spec: corev1.PodSpec{
+					Containers: []corev1.Container{{
+						Name:            "test",
+						ImagePullPolicy: corev1.PullNever,
+						Image:           "bazel/metropolis/test/e2e/selftest:selftest_image",
+					}},
+					RestartPolicy: corev1.RestartPolicyNever,
+				},
+			},
+		},
+	}
+}
+
 // makeTestStatefulSet generates a StatefulSet spec
 func makeTestStatefulSet(name string, volumeMode corev1.PersistentVolumeMode) *appsv1.StatefulSet {
 	return &appsv1.StatefulSet{
@@ -135,3 +168,22 @@
 		},
 	}
 }
+
+func getPodLogLines(ctx context.Context, cs kubernetes.Interface, podName string, nlines int64) (lines []string, err error) {
+	logs, err := cs.CoreV1().Pods("default").GetLogs(podName, &corev1.PodLogOptions{TailLines: &nlines}).Stream(ctx)
+	if err != nil {
+		return nil, fmt.Errorf("stream failed: %w", err)
+	}
+	defer logs.Close() // Always release the log stream.
+	var buf bytes.Buffer
+	if _, err := io.Copy(&buf, logs); err != nil {
+		return nil, fmt.Errorf("copy failed: %w", err)
+	}
+	if lineStr := strings.Trim(buf.String(), "\n"); lineStr != "" {
+		lines = strings.Split(lineStr, "\n") // An empty log yields no lines, not [""].
+	}
+	if extra := len(lines) - int(nlines); extra > 0 {
+		lines = lines[extra:] // Keep only the last nlines; a shorter log is returned whole.
+	}
+	return lines, nil
+}
diff --git a/metropolis/test/e2e/main_test.go b/metropolis/test/e2e/main_test.go
index 7a72db2..4e2ceb7 100644
--- a/metropolis/test/e2e/main_test.go
+++ b/metropolis/test/e2e/main_test.go
@@ -279,6 +279,39 @@
 					return fmt.Errorf("pod is not ready: %v", events.Items[0].Message)
 				}
 			})
+			util.TestEventual(t, "In-cluster self-test job", ctx, smallTestTimeout, func(ctx context.Context) error {
+				// Schedule the self-test image as a Job in the default namespace.
+				_, err := clientSet.BatchV1().Jobs("default").Create(ctx, makeSelftestSpec("selftest"), metav1.CreateOptions{})
+				return err
+			})
+			util.TestEventual(t, "In-cluster self-test job passed", ctx, smallTestTimeout, func(ctx context.Context) error {
+				job, err := clientSet.BatchV1().Jobs("default").Get(ctx, "selftest", metav1.GetOptions{})
+				// A reported failure takes precedence over a reported success.
+				switch {
+				case err != nil:
+					return err
+				case job.Status.Failed <= 0 && job.Status.Succeeded > 0:
+					return nil
+				case job.Status.Failed <= 0:
+					return fmt.Errorf("job still running")
+				}
+				// The job failed: report the last log line of one of its pods.
+				podList, err := clientSet.CoreV1().Pods("default").List(ctx, metav1.ListOptions{LabelSelector: "job-name=selftest"})
+				if err != nil {
+					return util.Permanent(fmt.Errorf("job failed but failed to find pod: %w", err))
+				}
+				if len(podList.Items) == 0 {
+					return fmt.Errorf("job failed but pod does not exist")
+				}
+				tail, err := getPodLogLines(ctx, clientSet, podList.Items[0].Name, 1)
+				if err != nil {
+					return fmt.Errorf("job failed but could not get logs: %w", err)
+				}
+				if len(tail) == 0 {
+					return util.Permanent(fmt.Errorf("job failed, empty log"))
+				}
+				return util.Permanent(fmt.Errorf("job failed, last log line: %s", tail[0]))
+			})
 			if os.Getenv("HAVE_NESTED_KVM") != "" {
 				util.TestEventual(t, "Pod for KVM/QEMU smoke test", ctx, smallTestTimeout, func(ctx context.Context) error {
 					runcRuntimeClass := "runc"
diff --git a/metropolis/test/e2e/selftest/BUILD.bazel b/metropolis/test/e2e/selftest/BUILD.bazel
new file mode 100644
index 0000000..2aa8029
--- /dev/null
+++ b/metropolis/test/e2e/selftest/BUILD.bazel
@@ -0,0 +1,16 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+load("@io_bazel_rules_docker//go:image.bzl", "go_image")
+
+go_library(
+    name = "selftest",
+    srcs = ["main.go"],
+    importpath = "source.monogon.dev/metropolis/test/e2e/selftest",
+    visibility = ["//visibility:private"],  # only embedded by :selftest_image below
+)
+
+go_image(
+    name = "selftest_image",  # referenced by the E2E Job spec as bazel/metropolis/test/e2e/selftest:selftest_image
+    embed = [":selftest"],
+    pure = "on",
+    visibility = ["//metropolis/node:__pkg__"],  # NOTE(review): the Job uses PullNever, so the node must bundle this image; confirm //metropolis/node references it
+)
diff --git a/metropolis/test/e2e/selftest/README.md b/metropolis/test/e2e/selftest/README.md
new file mode 100644
index 0000000..b001f38
--- /dev/null
+++ b/metropolis/test/e2e/selftest/README.md
@@ -0,0 +1,8 @@
+self-test image
+===
+
+This image is used by the Metropolis E2E tests to perform some cluster-internal
+tests. See //metropolis/test/e2e:main_test.go for usage.
+
+The image should be run as a Kubernetes Job, and exits with status 0 if all
+tests have passed. If the job fails, the E2E test prints its last log line.
\ No newline at end of file
diff --git a/metropolis/test/e2e/selftest/main.go b/metropolis/test/e2e/selftest/main.go
new file mode 100644
index 0000000..2603eae
--- /dev/null
+++ b/metropolis/test/e2e/selftest/main.go
@@ -0,0 +1,84 @@
+package main
+
+import (
+	"context"
+	"crypto/tls"
+	"crypto/x509"
+	"encoding/json"
+	"fmt"
+	"log"
+	"net/http"
+	"os"
+	"time"
+)
+
+// test1InClusterKubernetes exercises connectivity to the cluster-local
+// Kubernetes API server. It expects to be able to connect to the APIserver using
+// the ServiceAccount and cluster CA injected by the Kubelet.
+//
+// The entire functionality is reimplemented without relying on Kubernetes
+// client code to make the expected behaviour clear.
+func test1InClusterKubernetes(ctx context.Context) error {
+	token, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/token")
+	if err != nil {
+		return fmt.Errorf("failed to read serviceaccount token: %w", err)
+	}
+
+	cacert, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/ca.crt")
+	if err != nil {
+		return fmt.Errorf("failed to read cluster CA certificate: %w", err)
+	}
+	pool := x509.NewCertPool()
+	// An unparseable CA file would otherwise only surface later as an opaque
+	// TLS verification error.
+	if !pool.AppendCertsFromPEM(cacert) {
+		return fmt.Errorf("cluster CA certificate file contains no valid PEM certificates")
+	}
+	// The request deadline comes from ctx, so the client itself has no timeout.
+	client := &http.Client{Transport: &http.Transport{TLSClientConfig: &tls.Config{RootCAs: pool}}}
+
+	req, err := http.NewRequestWithContext(ctx, "GET", "https://kubernetes.default.svc.cluster.local/api", nil)
+	if err != nil {
+		return fmt.Errorf("creating request failed: %w", err)
+	}
+	req.Header.Set("Authorization", "Bearer "+string(token))
+
+	res, err := client.Do(req)
+	if err != nil {
+		return fmt.Errorf("request failed: %w", err)
+	}
+	defer res.Body.Close()
+
+	// Expect an APIVersions object on success, or a Status object (whose
+	// message is reported) on error.
+	j := struct {
+		Kind    string `json:"kind"`
+		Message string `json:"message"`
+	}{}
+	if err := json.NewDecoder(res.Body).Decode(&j); err != nil {
+		return fmt.Errorf("json parse error (HTTP status %d): %w", res.StatusCode, err)
+	}
+
+	if j.Kind == "Status" {
+		return fmt.Errorf("API server responded with error (HTTP status %d): %q", res.StatusCode, j.Message)
+	}
+	if j.Kind != "APIVersions" {
+		return fmt.Errorf("unexpected response from server (kind: %q)", j.Kind)
+	}
+
+	return nil
+}
+
+// main runs the self-tests in order, exiting with status 1 on the first failure.
+func main() {
+	log.Printf("Metropolis Kubernetes self-test starting...")
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+	log.Printf("1. In-cluster Kubernetes client...")
+	err := test1InClusterKubernetes(ctx)
+	if err != nil {
+		fmt.Println(err) // Printed last, right before exiting.
+		os.Exit(1)
+	}
+	log.Printf("All tests passed.")
+}
diff --git a/metropolis/test/util/runners.go b/metropolis/test/util/runners.go
index a2b1663..e25fe10 100644
--- a/metropolis/test/util/runners.go
+++ b/metropolis/test/util/runners.go
@@ -5,6 +5,7 @@
 import (
 	"context"
 	"errors"
+	"fmt"
 	"testing"
 	"time"
 
@@ -31,6 +32,9 @@
 			if err == ctx.Err() {
 				t.Fatal(lastErr)
 			}
+			if errors.Is(err, &PermanentError{}) {
+				t.Fatal(err)
+			}
 			lastErr = err
 			select {
 			case <-ctx.Done():
@@ -40,3 +44,37 @@
 		}
 	})
 }
+
+// PermanentError can be returned inside TestEventual to indicate that the test
+// is 'stuck', that it will not make progress anymore and that it should be
+// failed immediately.
+type PermanentError struct {
+	Err error
+}
+
+// Error prefixes the wrapped error's message to mark the failure as permanent.
+func (e *PermanentError) Error() string {
+	return "test permanently failed: " + fmt.Sprint(e.Err)
+}
+
+// Unwrap exposes the wrapped error to errors.Is and errors.As.
+func (e *PermanentError) Unwrap() error { return e.Err }
+
+// Is matches any *PermanentError target. If both sides carry a non-nil Err,
+// those must match as well (according to errors.Is).
+func (e *PermanentError) Is(target error) bool {
+	other, isPermanent := target.(*PermanentError)
+	switch {
+	case !isPermanent:
+		return false
+	case e.Err == nil, other.Err == nil:
+		return true
+	default:
+		return errors.Is(e.Err, other.Err)
+	}
+}
+
+// Permanent wraps err so that TestEventual fails the test immediately.
+func Permanent(err error) error {
+	return &PermanentError{Err: err}
+}