m/test/e2e/connectivity: add connectivity tester

This adds a connectivity testing framework. It uses pod agents and
communicates with them over stdio. This is used to implement a simple
smoke test and will later be used to test network policy controllers.

Change-Id: If40673a91336dbe3a7a383bf2e9d17736fad3bdc
Reviewed-on: https://review.monogon.dev/c/monogon/+/3756
Reviewed-by: Jan Schär <jan@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/test/e2e/BUILD.bazel b/metropolis/test/e2e/BUILD.bazel
index 883dcb8..85ef13a 100644
--- a/metropolis/test/e2e/BUILD.bazel
+++ b/metropolis/test/e2e/BUILD.bazel
@@ -6,6 +6,7 @@
         "//metropolis/test/e2e/selftest:selftest_image",
         "//metropolis/test/e2e/persistentvolume:persistentvolume_image",
         "//metropolis/test/e2e/httpserver:httpserver_image",
+        "//metropolis/test/e2e/connectivity/agent:agent_image",
     ],
     visibility = [
         "//metropolis/test/e2e/suites:__subpackages__",
diff --git a/metropolis/test/e2e/connectivity/BUILD.bazel b/metropolis/test/e2e/connectivity/BUILD.bazel
new file mode 100644
index 0000000..d8dbe3e
--- /dev/null
+++ b/metropolis/test/e2e/connectivity/BUILD.bazel
@@ -0,0 +1,23 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+    name = "connectivity",
+    srcs = ["connectivity.go"],
+    importpath = "source.monogon.dev/metropolis/test/e2e/connectivity",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//metropolis/test/e2e/connectivity/spec",
+        "@com_github_cenkalti_backoff_v4//:backoff",
+        "@io_k8s_api//core/v1:core",
+        "@io_k8s_apimachinery//pkg/api/errors",
+        "@io_k8s_apimachinery//pkg/apis/meta/v1:meta",
+        "@io_k8s_apimachinery//pkg/watch",
+        "@io_k8s_client_go//kubernetes",
+        "@io_k8s_client_go//kubernetes/scheme",
+        "@io_k8s_client_go//rest",
+        "@io_k8s_client_go//tools/remotecommand",
+        "@io_k8s_utils//ptr",
+        "@org_golang_google_protobuf//encoding/protodelim",
+        "@org_golang_google_protobuf//types/known/durationpb",
+    ],
+)
diff --git a/metropolis/test/e2e/connectivity/agent/BUILD.bazel b/metropolis/test/e2e/connectivity/agent/BUILD.bazel
new file mode 100644
index 0000000..cb52497
--- /dev/null
+++ b/metropolis/test/e2e/connectivity/agent/BUILD.bazel
@@ -0,0 +1,43 @@
+load("@aspect_bazel_lib//lib:transitions.bzl", "platform_transition_binary")
+load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library")
+load("@rules_oci//oci:defs.bzl", "oci_image")
+load("@rules_pkg//pkg:tar.bzl", "pkg_tar")
+
+go_library(
+    name = "agent_lib",
+    srcs = ["main.go"],
+    importpath = "source.monogon.dev/metropolis/test/e2e/connectivity/agent",
+    visibility = ["//visibility:private"],
+    deps = [
+        "//metropolis/test/e2e/connectivity/spec",
+        "@org_golang_google_protobuf//encoding/protodelim",
+    ],
+)
+
+go_binary(
+    name = "agent",
+    embed = [":agent_lib"],
+    visibility = ["//visibility:private"],
+)
+
+platform_transition_binary(
+    name = "agent_transitioned",
+    binary = ":agent",
+    target_platform = "//build/platforms:linux_amd64_static",
+    visibility = ["//visibility:private"],
+)
+
+pkg_tar(
+    name = "agent_layer",
+    srcs = [":agent_transitioned"],
+    visibility = ["//visibility:private"],
+)
+
+oci_image(
+    name = "agent_image",
+    base = "@distroless_base",
+    entrypoint = ["/agent"],
+    tars = [":agent_layer"],
+    visibility = ["//metropolis/test/e2e:__pkg__"],
+    workdir = "/app",
+)
diff --git a/metropolis/test/e2e/connectivity/agent/main.go b/metropolis/test/e2e/connectivity/agent/main.go
new file mode 100644
index 0000000..a11b488
--- /dev/null
+++ b/metropolis/test/e2e/connectivity/agent/main.go
@@ -0,0 +1,123 @@
+// Copyright The Monogon Project Authors.
+// SPDX-License-Identifier: Apache-2.0
+
+package main
+
+// connectivity agent hosts test runner-defined network listeners and performs
+// connectivity tests to other instances of itself.
+// It runs in an OCI image and a test runner communicates with it over
+// stdin/stdout with delimited protobufs. See the spec directory for the
+// request/response definitions.
+
+import (
+	"bufio"
+	"context"
+	"encoding/binary"
+	"errors"
+	"fmt"
+	"io"
+	"log"
+	"net"
+	"os"
+	"time"
+
+	"google.golang.org/protobuf/encoding/protodelim"
+
+	"source.monogon.dev/metropolis/test/e2e/connectivity/spec"
+)
+
+func main() {
+	t := tester{
+		servers: make(map[uint64]net.Listener),
+	}
+	stdinReader := bufio.NewReader(os.Stdin)
+	for {
+		var req spec.Request
+		if err := protodelim.UnmarshalFrom(stdinReader, &req); err != nil {
+			log.Fatalf("Unable to unmarshal request: %v", err)
+		}
+		var res spec.Response
+		switch r := req.Req.(type) {
+		case *spec.Request_Test:
+			res.Res = &spec.Response_Test{Test: t.runTest(r.Test)}
+		case *spec.Request_StartServer:
+			res.Res = &spec.Response_StartServer{StartServer: t.startServer(r.StartServer)}
+		case *spec.Request_StopServer:
+			res.Res = &spec.Response_StopServer{StopServer: t.stopServer(r.StopServer)}
+		default:
+			log.Fatalf("Unknown request type: %T", r)
+		}
+		if _, err := protodelim.MarshalTo(os.Stdout, &res); err != nil {
+			log.Fatalf("Unable to marshal response: %v", err)
+		}
+	}
+}
+
+type tester struct {
+	servers map[uint64]net.Listener
+}
+
+func errToResponse(err error) *spec.TestResponse {
+	switch {
+	case errors.Is(err, os.ErrDeadlineExceeded) || errors.Is(err, context.DeadlineExceeded):
+		return &spec.TestResponse{
+			Result:           spec.TestResponse_CONNECTION_TIMEOUT,
+			ErrorDescription: err.Error(),
+		}
+	default:
+		return &spec.TestResponse{
+			Result:           spec.TestResponse_CONNECTION_REJECTED,
+			ErrorDescription: err.Error(),
+		}
+	}
+}
+
+func (t *tester) runTest(req *spec.TestRequest) *spec.TestResponse {
+	conn, err := net.DialTimeout("tcp", req.Address, req.Timeout.AsDuration())
+	if err != nil {
+		return errToResponse(err)
+	}
+	defer conn.Close()
+	var tokenRaw [8]byte
+	conn.SetReadDeadline(time.Now().Add(req.Timeout.AsDuration()))
+	if _, err := io.ReadFull(conn, tokenRaw[:]); err != nil {
+		return errToResponse(err)
+	}
+	receivedToken := binary.LittleEndian.Uint64(tokenRaw[:])
+	if receivedToken != req.Token {
+		return &spec.TestResponse{
+			Result:           spec.TestResponse_WRONG_TOKEN,
+			ErrorDescription: fmt.Sprintf("Received token %d, wanted %d", receivedToken, req.Token),
+		}
+	}
+	return &spec.TestResponse{
+		Result: spec.TestResponse_SUCCESS,
+	}
+}
+
+func (t *tester) startServer(req *spec.StartServerRequest) *spec.StartServerResponse {
+	l, err := net.Listen("tcp", req.Address)
+	if err != nil {
+		return &spec.StartServerResponse{ErrorDescription: err.Error()}
+	}
+	t.servers[req.Token] = l
+	go tokenServer(l, req.Token)
+	return &spec.StartServerResponse{Ok: true}
+}
+
+func tokenServer(l net.Listener, token uint64) {
+	for {
+		conn, err := l.Accept()
+		if err != nil {
+			return
+		}
+		conn.Write(binary.LittleEndian.AppendUint64(nil, token))
+		conn.Close()
+	}
+}
+
+func (t *tester) stopServer(req *spec.StopServerRequest) *spec.StopServerResponse {
+	t.servers[req.Token].Close()
+	delete(t.servers, req.Token)
+	return &spec.StopServerResponse{Ok: true}
+}
diff --git a/metropolis/test/e2e/connectivity/connectivity.go b/metropolis/test/e2e/connectivity/connectivity.go
new file mode 100644
index 0000000..070bacc
--- /dev/null
+++ b/metropolis/test/e2e/connectivity/connectivity.go
@@ -0,0 +1,347 @@
+// Copyright The Monogon Project Authors.
+// SPDX-License-Identifier: Apache-2.0
+
+package connectivity
+
+import (
+	"bufio"
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"net"
+	"net/netip"
+	"os"
+	"strconv"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/cenkalti/backoff/v4"
+	"google.golang.org/protobuf/encoding/protodelim"
+	"google.golang.org/protobuf/types/known/durationpb"
+	corev1 "k8s.io/api/core/v1"
+	kerrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/kubernetes/scheme"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/remotecommand"
+	"k8s.io/utils/ptr"
+
+	"source.monogon.dev/metropolis/test/e2e/connectivity/spec"
+)
+
+type podState struct {
+	name      string
+	namespace string
+	ipv4      netip.Addr
+	ipv6      netip.Addr
+	reqMu     sync.Mutex
+	in        io.Writer
+	out       *bufio.Reader
+}
+
+func (p *podState) do(req *spec.Request) (*spec.Response, error) {
+	// This could be made much faster by introducing a request/response routing
+	// layer which would allow concurrent requests and send the response to the
+	// correct request handler. For simplicity this hasn't been done.
+	p.reqMu.Lock()
+	defer p.reqMu.Unlock()
+	if _, err := protodelim.MarshalTo(p.in, req); err != nil {
+		return nil, fmt.Errorf("while sending request to pod: %w", err)
+	}
+	var res spec.Response
+	if err := protodelim.UnmarshalFrom(p.out, &res); err != nil {
+		return nil, fmt.Errorf("while reading response from pod: %w", err)
+	}
+	return &res, nil
+}
+
+type Tester struct {
+	cancel    context.CancelFunc
+	clientSet kubernetes.Interface
+	pods      []podState
+	lastToken atomic.Uint64
+}
+
+// Convenience aliases
+const (
+	ExpectedSuccess = spec.TestResponse_SUCCESS
+	ExpectedTimeout = spec.TestResponse_CONNECTION_TIMEOUT
+	ExpectedReject  = spec.TestResponse_CONNECTION_REJECTED
+)
+
+func (te *Tester) GetToken() uint64 {
+	return te.lastToken.Add(1)
+}
+
+func (te *Tester) TestPodConnectivity(t *testing.T, srcPod, dstPod int, port int, expected spec.TestResponse_Result) {
+	te.TestPodConnectivityEventual(t, srcPod, dstPod, port, expected, 0)
+}
+
+func (te *Tester) TestPodConnectivityEventual(t *testing.T, srcPod, dstPod int, port int, expected spec.TestResponse_Result, timeout time.Duration) {
+	token := te.GetToken()
+	prefix := fmt.Sprintf("%d -> %d :%d", srcPod, dstPod, port)
+	if err := te.startServer(dstPod, &spec.StartServerRequest{
+		Address: net.JoinHostPort("0.0.0.0", strconv.Itoa(port)),
+		Token:   token,
+	}); err != nil {
+		t.Fatalf("%v: %v", prefix, err)
+		return
+	}
+	defer func() {
+		if err := te.stopServer(dstPod, &spec.StopServerRequest{Token: token}); err != nil {
+			t.Log(err)
+		}
+	}()
+	deadline := time.Now().Add(timeout)
+	for {
+		res, err := te.runTest(srcPod, &spec.TestRequest{
+			Token:   token,
+			Timeout: durationpb.New(5 * time.Second),
+			Address: net.JoinHostPort(te.pods[dstPod].ipv4.String(), strconv.Itoa(port)),
+		})
+		if err != nil {
+			// Unknown errors do not get retried
+			t.Fatalf("%v: %v", prefix, err)
+			return
+		}
+		err = checkExpectations(t, res, expected, prefix)
+		if err == nil {
+			return
+		} else if deadline.Before(time.Now()) {
+			t.Error(err)
+			return
+		}
+		time.Sleep(100 * time.Millisecond)
+	}
+}
+
+func checkExpectations(t *testing.T, res *spec.TestResponse, expected spec.TestResponse_Result, prefix string) error {
+	switch res.Result {
+	case spec.TestResponse_CONNECTION_REJECTED, spec.TestResponse_CONNECTION_TIMEOUT:
+		if expected != res.Result {
+			return fmt.Errorf("%v expected %v, got %v (%v)", prefix, expected, res.Result, res.ErrorDescription)
+		}
+	case spec.TestResponse_WRONG_TOKEN:
+		return fmt.Errorf("%v connected, but got wrong token", prefix)
+	case spec.TestResponse_SUCCESS:
+		if expected != ExpectedSuccess {
+			return fmt.Errorf("%v expected %v, got %v", prefix, expected, res.Result)
+		}
+	}
+	return nil
+}
+
+func (te *Tester) startServer(pod int, req *spec.StartServerRequest) error {
+	res, err := te.pods[pod].do(&spec.Request{Req: &spec.Request_StartServer{StartServer: req}})
+	if err != nil {
+		return fmt.Errorf("test pod communication failure: %w", err)
+	}
+	startRes := res.GetStartServer()
+	if !startRes.Ok {
+		return fmt.Errorf("test server could not be started: %v", startRes.ErrorDescription)
+	}
+	return nil
+}
+
+func (te *Tester) stopServer(pod int, req *spec.StopServerRequest) error {
+	res, err := te.pods[pod].do(&spec.Request{Req: &spec.Request_StopServer{StopServer: req}})
+	if err != nil {
+		return fmt.Errorf("test pod communication failure: %w", err)
+	}
+	stopRes := res.GetStopServer()
+	if !stopRes.Ok {
+		return fmt.Errorf("test server could not be stopped: %v", stopRes.ErrorDescription)
+	}
+	return nil
+}
+
+func (te *Tester) runTest(pod int, req *spec.TestRequest) (*spec.TestResponse, error) {
+	res, err := te.pods[pod].do(&spec.Request{Req: &spec.Request_Test{Test: req}})
+	if err != nil {
+		return nil, fmt.Errorf("test pod communication failure: %w", err)
+	}
+	testRes := res.GetTest()
+	if testRes.Result == spec.TestResponse_UNKNOWN {
+		return nil, fmt.Errorf("test encountered unknown error: %v", testRes.ErrorDescription)
+	}
+	return testRes, nil
+}
+
+type TestSpec struct {
+	// Name needs to contain a DNS label-compatible unique name which
+	// is used to identify the Kubernetes resources for this test.
+	Name string
+	// ClientSet needs to be a client for the test cluster.
+	ClientSet kubernetes.Interface
+	// RESTConfig needs to be the client config for the same test cluster.
+	RESTConfig *rest.Config
+	// Number of pods to start for testing. Their identities go from 0 to
+	// NumPods-1.
+	NumPods int
+	// ExtraPodConfig is called for every pod to be created and can be used to
+	// customize their specification.
+	ExtraPodConfig func(i int, pod *corev1.Pod)
+}
+
+// SetupTest sets up the K8s resources and communication channels for
+// connectivity tests. It registers a cleanup function which automatically
+// tears them down again after the test.
+func SetupTest(t *testing.T, s *TestSpec) *Tester {
+	t.Helper()
+
+	testCtx, testCancel := context.WithCancel(context.Background())
+
+	tester := Tester{
+		cancel:    testCancel,
+		clientSet: s.ClientSet,
+		pods:      make([]podState, s.NumPods),
+	}
+
+	// Use a non-zero arbitrary start value to decrease the chance of
+	// accidential conflicts.
+	tester.lastToken.Store(1234)
+	wg := sync.WaitGroup{}
+	setupCtx, cancel := context.WithTimeout(testCtx, 60*time.Second)
+	defer cancel()
+	errChan := make(chan error, s.NumPods)
+	for i := 0; i < s.NumPods; i++ {
+		pod := &corev1.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      fmt.Sprintf("%v-%d", s.Name, i),
+				Namespace: "default",
+			},
+			Spec: corev1.PodSpec{
+				Containers: []corev1.Container{{
+					Name:  "connectivitytester",
+					Image: "test.monogon.internal/metropolis/test/e2e/connectivity/agent/agent_image",
+					Stdin: true,
+				}},
+				EnableServiceLinks: ptr.To(false),
+			},
+		}
+		if s.ExtraPodConfig != nil {
+			s.ExtraPodConfig(i, pod)
+		}
+		wg.Add(1)
+		go func() {
+			pod, err := backoff.RetryNotifyWithData(func() (*corev1.Pod, error) {
+				ctx, cancel := context.WithTimeout(setupCtx, 5*time.Second)
+				defer cancel()
+				return s.ClientSet.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
+			}, backoff.WithContext(backoff.NewConstantBackOff(1*time.Second), setupCtx), func(err error, d time.Duration) {
+				t.Logf("attempted creating pod %d: %v", i, err)
+			})
+			if err != nil {
+				errChan <- fmt.Errorf("creating pod %d failed: %w", i, err)
+				wg.Done()
+				return
+			}
+			tester.pods[i].name = pod.Name
+			tester.pods[i].namespace = pod.Namespace
+			// Wait for pods to be ready and populate IPs
+			err = backoff.Retry(func() error {
+				podW, err := s.ClientSet.CoreV1().Pods(pod.Namespace).Watch(setupCtx, metav1.SingleObject(pod.ObjectMeta))
+				if err != nil {
+					return err
+				}
+				defer podW.Stop()
+				podUpdates := podW.ResultChan()
+				for event := range podUpdates {
+					if event.Type != watch.Modified && event.Type != watch.Added {
+						continue
+					}
+					pod = event.Object.(*corev1.Pod)
+					if pod.Status.Phase == corev1.PodRunning {
+						for _, ipObj := range pod.Status.PodIPs {
+							ip, err := netip.ParseAddr(ipObj.IP)
+							if err != nil {
+								return backoff.Permanent(fmt.Errorf("while parsing IP address: %w", err))
+							}
+							if ip.Is4() {
+								tester.pods[i].ipv4 = ip
+							} else {
+								tester.pods[i].ipv6 = ip
+							}
+						}
+						return nil
+					}
+				}
+				return fmt.Errorf("pod watcher failed")
+			}, backoff.WithContext(backoff.NewConstantBackOff(1*time.Second), setupCtx))
+			if err != nil {
+				errChan <- fmt.Errorf("waiting for pod %d to be running failed: %w", i, err)
+				wg.Done()
+				return
+			}
+
+			inR, inW := io.Pipe()
+			outR, outW := io.Pipe()
+			tester.pods[i].in = inW
+			tester.pods[i].out = bufio.NewReader(outR)
+
+			req := s.ClientSet.CoreV1().RESTClient().Post().Resource("pods").Namespace(pod.Namespace).Name(pod.Name).SubResource("attach")
+			options := &corev1.PodAttachOptions{
+				Container: "connectivitytester",
+				Stdin:     true,
+				Stdout:    true,
+				Stderr:    true,
+			}
+			req.VersionedParams(options, scheme.ParameterCodec)
+			attachHandle, err := remotecommand.NewWebSocketExecutor(s.RESTConfig, "POST", req.URL().String())
+			if err != nil {
+				panic(err)
+			}
+
+			wg.Done()
+			err = attachHandle.StreamWithContext(testCtx, remotecommand.StreamOptions{
+				Stdin:  inR,
+				Stdout: outW,
+				Stderr: os.Stderr,
+			})
+			if err != nil && !errors.Is(err, testCtx.Err()) {
+				t.Logf("Stream for pod %d failed: %v", i, err)
+			}
+		}()
+	}
+	t.Cleanup(func() {
+		tester.cancel()
+		var cleanupWg sync.WaitGroup
+		cleanupCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
+		defer cancel()
+		for i := range tester.pods {
+			p := &tester.pods[i]
+			if p.name == "" {
+				continue
+			}
+			cleanupWg.Add(1)
+			go func() {
+				defer cleanupWg.Done()
+				err := backoff.Retry(func() error {
+					ctx, cancel := context.WithTimeout(cleanupCtx, 5*time.Second)
+					defer cancel()
+					err := s.ClientSet.CoreV1().Pods(p.namespace).Delete(ctx, p.name, metav1.DeleteOptions{})
+					if kerrors.IsNotFound(err) {
+						return nil
+					}
+					return err
+				}, backoff.WithContext(backoff.NewConstantBackOff(1*time.Second), cleanupCtx))
+				if err != nil {
+					t.Logf("Cleanup of pod %d failed: %v", i, err)
+				}
+			}()
+		}
+		cleanupWg.Wait()
+	})
+	wg.Wait()
+	close(errChan)
+	// Process asynchronous errors
+	for err := range errChan {
+		t.Error(err)
+	}
+	return &tester
+}
diff --git a/metropolis/test/e2e/connectivity/spec/BUILD.bazel b/metropolis/test/e2e/connectivity/spec/BUILD.bazel
new file mode 100644
index 0000000..619c8dd
--- /dev/null
+++ b/metropolis/test/e2e/connectivity/spec/BUILD.bazel
@@ -0,0 +1,24 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library")
+load("@rules_proto//proto:defs.bzl", "proto_library")
+
+proto_library(
+    name = "metropolis_test_e2e_connectivty_tester_proto",
+    srcs = ["spec.proto"],
+    visibility = ["//visibility:public"],
+    deps = ["@protobuf//:duration_proto"],
+)
+
+go_proto_library(
+    name = "metropolis_test_e2e_connectivty_tester_go_proto",
+    importpath = "source.monogon.dev/metropolis/test/e2e/connectivity/spec",
+    proto = ":metropolis_test_e2e_connectivty_tester_proto",
+    visibility = ["//visibility:public"],
+)
+
+go_library(
+    name = "spec",
+    embed = [":metropolis_test_e2e_connectivty_tester_go_proto"],
+    importpath = "source.monogon.dev/metropolis/test/e2e/connectivity/spec",
+    visibility = ["//visibility:public"],
+)
diff --git a/metropolis/test/e2e/connectivity/spec/gomod-generated-placeholder.go b/metropolis/test/e2e/connectivity/spec/gomod-generated-placeholder.go
new file mode 100644
index 0000000..f09cd57
--- /dev/null
+++ b/metropolis/test/e2e/connectivity/spec/gomod-generated-placeholder.go
@@ -0,0 +1 @@
+package spec
diff --git a/metropolis/test/e2e/connectivity/spec/spec.proto b/metropolis/test/e2e/connectivity/spec/spec.proto
new file mode 100644
index 0000000..8ab8a0c
--- /dev/null
+++ b/metropolis/test/e2e/connectivity/spec/spec.proto
@@ -0,0 +1,59 @@
+syntax = "proto3";
+
+import "google/protobuf/duration.proto";
+
+package metropolis.test.e2e.connectivty_tester;
+
+message Request {
+    oneof req {
+        TestRequest test = 1;
+        StartServerRequest start_server = 2;
+        StopServerRequest stop_server = 3;
+    }
+}
+
+message Response {
+    oneof res {
+        TestResponse test = 1;
+        StartServerResponse start_server = 2;
+        StopServerResponse stop_server = 3;
+    }
+}
+
+message TestRequest {
+    string address = 1;
+    uint64 token = 2;
+    google.protobuf.Duration timeout = 3;
+}
+
+message TestResponse {
+    enum Result {
+        UNKNOWN = 0;
+        CONNECTION_REJECTED = 1;
+        CONNECTION_TIMEOUT = 2;
+        WRONG_TOKEN = 3;
+        SUCCESS = 4;
+    }
+    Result result = 1;
+    string error_description = 2;
+}
+
+message StartServerRequest {
+    string address = 1;
+    uint64 token = 2;
+}
+
+message StartServerResponse {
+    bool ok = 1;
+    string error_description = 2;
+}
+
+message StopServerRequest {
+    uint64 token = 1;
+}
+
+message StopServerResponse {
+    bool ok = 1;
+    string error_description = 2;
+}
+
diff --git a/metropolis/test/e2e/suites/kubernetes/BUILD.bazel b/metropolis/test/e2e/suites/kubernetes/BUILD.bazel
index fbfad4d..ac449eb 100644
--- a/metropolis/test/e2e/suites/kubernetes/BUILD.bazel
+++ b/metropolis/test/e2e/suites/kubernetes/BUILD.bazel
@@ -37,6 +37,7 @@
         "//metropolis/node",
         "//metropolis/proto/api",
         "//metropolis/proto/common",
+        "//metropolis/test/e2e/connectivity",
         "//metropolis/test/launch",
         "//metropolis/test/localregistry",
         "//metropolis/test/util",
diff --git a/metropolis/test/e2e/suites/kubernetes/run_test.go b/metropolis/test/e2e/suites/kubernetes/run_test.go
index 0acc473..939c663 100644
--- a/metropolis/test/e2e/suites/kubernetes/run_test.go
+++ b/metropolis/test/e2e/suites/kubernetes/run_test.go
@@ -30,6 +30,7 @@
 	common "source.monogon.dev/metropolis/node"
 	apb "source.monogon.dev/metropolis/proto/api"
 	cpb "source.monogon.dev/metropolis/proto/common"
+	"source.monogon.dev/metropolis/test/e2e/connectivity"
 	mlaunch "source.monogon.dev/metropolis/test/launch"
 	"source.monogon.dev/metropolis/test/localregistry"
 	"source.monogon.dev/metropolis/test/util"
@@ -284,7 +285,7 @@
 		}
 	}()
 
-	clientSet, _, err := cluster.GetKubeClientSet()
+	clientSet, restConfig, err := cluster.GetKubeClientSet()
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -371,6 +372,26 @@
 			return fmt.Errorf("pod is not ready: %s", errorMsg.String())
 		}
 	})
+	t.Run("Connectivity Smoke Tests", func(t *testing.T) {
+		ct := connectivity.SetupTest(t, &connectivity.TestSpec{
+			Name:       "connectivity-smoke",
+			ClientSet:  clientSet,
+			RESTConfig: restConfig,
+			NumPods:    2,
+			ExtraPodConfig: func(i int, pod *corev1.Pod) {
+				// Spread pods out over nodes to test inter-node network
+				pod.Labels = make(map[string]string)
+				pod.Labels["name"] = "connectivity-smoketest"
+				pod.Spec.TopologySpreadConstraints = []corev1.TopologySpreadConstraint{{
+					MaxSkew:           1,
+					TopologyKey:       "kubernetes.io/hostname",
+					WhenUnsatisfiable: corev1.DoNotSchedule,
+					LabelSelector:     metav1.SetAsLabelSelector(pod.Labels),
+				}}
+			},
+		})
+		ct.TestPodConnectivity(t, 0, 1, 1234, connectivity.ExpectedSuccess)
+	})
 	for _, runtimeClass := range []string{"runc", "gvisor"} {
 		statefulSetName := fmt.Sprintf("test-statefulset-%s", runtimeClass)
 		util.TestEventual(t, fmt.Sprintf("StatefulSet with %s tests", runtimeClass), ctx, smallTestTimeout, func(ctx context.Context) error {