m/test/e2e/connectivity: add connectivity tester
This adds a connectivity testing framework. It uses pod agents and
communicates with them over stdio. This is used to implement a simple
smoke test and will later be used to test network policy controllers.
Change-Id: If40673a91336dbe3a7a383bf2e9d17736fad3bdc
Reviewed-on: https://review.monogon.dev/c/monogon/+/3756
Reviewed-by: Jan Schär <jan@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/test/e2e/BUILD.bazel b/metropolis/test/e2e/BUILD.bazel
index 883dcb8..85ef13a 100644
--- a/metropolis/test/e2e/BUILD.bazel
+++ b/metropolis/test/e2e/BUILD.bazel
@@ -6,6 +6,7 @@
"//metropolis/test/e2e/selftest:selftest_image",
"//metropolis/test/e2e/persistentvolume:persistentvolume_image",
"//metropolis/test/e2e/httpserver:httpserver_image",
+ "//metropolis/test/e2e/connectivity/agent:agent_image",
],
visibility = [
"//metropolis/test/e2e/suites:__subpackages__",
diff --git a/metropolis/test/e2e/connectivity/BUILD.bazel b/metropolis/test/e2e/connectivity/BUILD.bazel
new file mode 100644
index 0000000..d8dbe3e
--- /dev/null
+++ b/metropolis/test/e2e/connectivity/BUILD.bazel
@@ -0,0 +1,23 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+ name = "connectivity",
+ srcs = ["connectivity.go"],
+ importpath = "source.monogon.dev/metropolis/test/e2e/connectivity",
+ visibility = ["//visibility:public"],
+ deps = [
+ "//metropolis/test/e2e/connectivity/spec",
+ "@com_github_cenkalti_backoff_v4//:backoff",
+ "@io_k8s_api//core/v1:core",
+ "@io_k8s_apimachinery//pkg/api/errors",
+ "@io_k8s_apimachinery//pkg/apis/meta/v1:meta",
+ "@io_k8s_apimachinery//pkg/watch",
+ "@io_k8s_client_go//kubernetes",
+ "@io_k8s_client_go//kubernetes/scheme",
+ "@io_k8s_client_go//rest",
+ "@io_k8s_client_go//tools/remotecommand",
+ "@io_k8s_utils//ptr",
+ "@org_golang_google_protobuf//encoding/protodelim",
+ "@org_golang_google_protobuf//types/known/durationpb",
+ ],
+)
diff --git a/metropolis/test/e2e/connectivity/agent/BUILD.bazel b/metropolis/test/e2e/connectivity/agent/BUILD.bazel
new file mode 100644
index 0000000..cb52497
--- /dev/null
+++ b/metropolis/test/e2e/connectivity/agent/BUILD.bazel
@@ -0,0 +1,43 @@
+load("@aspect_bazel_lib//lib:transitions.bzl", "platform_transition_binary")
+load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library")
+load("@rules_oci//oci:defs.bzl", "oci_image")
+load("@rules_pkg//pkg:tar.bzl", "pkg_tar")
+
+go_library(
+ name = "agent_lib",
+ srcs = ["main.go"],
+ importpath = "source.monogon.dev/metropolis/test/e2e/connectivity/agent",
+ visibility = ["//visibility:private"],
+ deps = [
+ "//metropolis/test/e2e/connectivity/spec",
+ "@org_golang_google_protobuf//encoding/protodelim",
+ ],
+)
+
+go_binary(
+ name = "agent",
+ embed = [":agent_lib"],
+ visibility = ["//visibility:private"],
+)
+
+platform_transition_binary(
+ name = "agent_transitioned",
+ binary = ":agent",
+ target_platform = "//build/platforms:linux_amd64_static",
+ visibility = ["//visibility:private"],
+)
+
+pkg_tar(
+ name = "agent_layer",
+ srcs = [":agent_transitioned"],
+ visibility = ["//visibility:private"],
+)
+
+oci_image(
+ name = "agent_image",
+ base = "@distroless_base",
+ entrypoint = ["/agent"],
+ tars = [":agent_layer"],
+ visibility = ["//metropolis/test/e2e:__pkg__"],
+ workdir = "/app",
+)
diff --git a/metropolis/test/e2e/connectivity/agent/main.go b/metropolis/test/e2e/connectivity/agent/main.go
new file mode 100644
index 0000000..a11b488
--- /dev/null
+++ b/metropolis/test/e2e/connectivity/agent/main.go
@@ -0,0 +1,123 @@
+// Copyright The Monogon Project Authors.
+// SPDX-License-Identifier: Apache-2.0
+
+package main
+
+// connectivity agent hosts test runner-defined network listeners and performs
+// connectivity tests to other instances of itself.
+// It runs in an OCI image and a test runner communicates with it over
+// stdin/stdout with delimited protobufs. See the spec directory for the
+// request/response definitions.
+
+import (
+ "bufio"
+ "context"
+ "encoding/binary"
+ "errors"
+ "fmt"
+ "io"
+ "log"
+ "net"
+ "os"
+ "time"
+
+ "google.golang.org/protobuf/encoding/protodelim"
+
+ "source.monogon.dev/metropolis/test/e2e/connectivity/spec"
+)
+
+func main() {
+ t := tester{
+ servers: make(map[uint64]net.Listener),
+ }
+ stdinReader := bufio.NewReader(os.Stdin)
+ for {
+ var req spec.Request
+ if err := protodelim.UnmarshalFrom(stdinReader, &req); err != nil {
+ log.Fatalf("Unable to unmarshal request: %v", err)
+ }
+ var res spec.Response
+ switch r := req.Req.(type) {
+ case *spec.Request_Test:
+ res.Res = &spec.Response_Test{Test: t.runTest(r.Test)}
+ case *spec.Request_StartServer:
+ res.Res = &spec.Response_StartServer{StartServer: t.startServer(r.StartServer)}
+ case *spec.Request_StopServer:
+ res.Res = &spec.Response_StopServer{StopServer: t.stopServer(r.StopServer)}
+ default:
+ log.Fatalf("Unknown request type: %T", r)
+ }
+ if _, err := protodelim.MarshalTo(os.Stdout, &res); err != nil {
+ log.Fatalf("Unable to marshal response: %v", err)
+ }
+ }
+}
+
+type tester struct {
+ servers map[uint64]net.Listener
+}
+
+func errToResponse(err error) *spec.TestResponse {
+ switch {
+ case errors.Is(err, os.ErrDeadlineExceeded) || errors.Is(err, context.DeadlineExceeded):
+ return &spec.TestResponse{
+ Result: spec.TestResponse_CONNECTION_TIMEOUT,
+ ErrorDescription: err.Error(),
+ }
+ default:
+ return &spec.TestResponse{
+ Result: spec.TestResponse_CONNECTION_REJECTED,
+ ErrorDescription: err.Error(),
+ }
+ }
+}
+
+func (t *tester) runTest(req *spec.TestRequest) *spec.TestResponse {
+ conn, err := net.DialTimeout("tcp", req.Address, req.Timeout.AsDuration())
+ if err != nil {
+ return errToResponse(err)
+ }
+ defer conn.Close()
+ var tokenRaw [8]byte
+ conn.SetReadDeadline(time.Now().Add(req.Timeout.AsDuration()))
+ if _, err := io.ReadFull(conn, tokenRaw[:]); err != nil {
+ return errToResponse(err)
+ }
+ receivedToken := binary.LittleEndian.Uint64(tokenRaw[:])
+ if receivedToken != req.Token {
+ return &spec.TestResponse{
+ Result: spec.TestResponse_WRONG_TOKEN,
+ ErrorDescription: fmt.Sprintf("Received token %d, wanted %d", receivedToken, req.Token),
+ }
+ }
+ return &spec.TestResponse{
+ Result: spec.TestResponse_SUCCESS,
+ }
+}
+
+func (t *tester) startServer(req *spec.StartServerRequest) *spec.StartServerResponse {
+ l, err := net.Listen("tcp", req.Address)
+ if err != nil {
+ return &spec.StartServerResponse{ErrorDescription: err.Error()}
+ }
+ t.servers[req.Token] = l
+ go tokenServer(l, req.Token)
+ return &spec.StartServerResponse{Ok: true}
+}
+
+func tokenServer(l net.Listener, token uint64) {
+ for {
+ conn, err := l.Accept()
+ if err != nil {
+ return
+ }
+ conn.Write(binary.LittleEndian.AppendUint64(nil, token))
+ conn.Close()
+ }
+}
+
+func (t *tester) stopServer(req *spec.StopServerRequest) *spec.StopServerResponse {
+ t.servers[req.Token].Close()
+ delete(t.servers, req.Token)
+ return &spec.StopServerResponse{Ok: true}
+}
diff --git a/metropolis/test/e2e/connectivity/connectivity.go b/metropolis/test/e2e/connectivity/connectivity.go
new file mode 100644
index 0000000..070bacc
--- /dev/null
+++ b/metropolis/test/e2e/connectivity/connectivity.go
@@ -0,0 +1,347 @@
+// Copyright The Monogon Project Authors.
+// SPDX-License-Identifier: Apache-2.0
+
+package connectivity
+
+import (
+ "bufio"
+ "context"
+ "errors"
+ "fmt"
+ "io"
+ "net"
+ "net/netip"
+ "os"
+ "strconv"
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/cenkalti/backoff/v4"
+ "google.golang.org/protobuf/encoding/protodelim"
+ "google.golang.org/protobuf/types/known/durationpb"
+ corev1 "k8s.io/api/core/v1"
+ kerrors "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/watch"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/kubernetes/scheme"
+ "k8s.io/client-go/rest"
+ "k8s.io/client-go/tools/remotecommand"
+ "k8s.io/utils/ptr"
+
+ "source.monogon.dev/metropolis/test/e2e/connectivity/spec"
+)
+
+type podState struct {
+ name string
+ namespace string
+ ipv4 netip.Addr
+ ipv6 netip.Addr
+ reqMu sync.Mutex
+ in io.Writer
+ out *bufio.Reader
+}
+
+func (p *podState) do(req *spec.Request) (*spec.Response, error) {
+ // This could be made much faster by introducing a request/response routing
+ // layer which would allow concurrent requests and send the response to the
+ // correct request handler. For simplicity this hasn't been done.
+ p.reqMu.Lock()
+ defer p.reqMu.Unlock()
+ if _, err := protodelim.MarshalTo(p.in, req); err != nil {
+ return nil, fmt.Errorf("while sending request to pod: %w", err)
+ }
+ var res spec.Response
+ if err := protodelim.UnmarshalFrom(p.out, &res); err != nil {
+ return nil, fmt.Errorf("while reading response from pod: %w", err)
+ }
+ return &res, nil
+}
+
+type Tester struct {
+ cancel context.CancelFunc
+ clientSet kubernetes.Interface
+ pods []podState
+ lastToken atomic.Uint64
+}
+
+// Convenience aliases
+const (
+ ExpectedSuccess = spec.TestResponse_SUCCESS
+ ExpectedTimeout = spec.TestResponse_CONNECTION_TIMEOUT
+ ExpectedReject = spec.TestResponse_CONNECTION_REJECTED
+)
+
+func (te *Tester) GetToken() uint64 {
+ return te.lastToken.Add(1)
+}
+
+func (te *Tester) TestPodConnectivity(t *testing.T, srcPod, dstPod int, port int, expected spec.TestResponse_Result) {
+ te.TestPodConnectivityEventual(t, srcPod, dstPod, port, expected, 0)
+}
+
+func (te *Tester) TestPodConnectivityEventual(t *testing.T, srcPod, dstPod int, port int, expected spec.TestResponse_Result, timeout time.Duration) {
+ token := te.GetToken()
+ prefix := fmt.Sprintf("%d -> %d :%d", srcPod, dstPod, port)
+ if err := te.startServer(dstPod, &spec.StartServerRequest{
+ Address: net.JoinHostPort("0.0.0.0", strconv.Itoa(port)),
+ Token: token,
+ }); err != nil {
+ t.Fatalf("%v: %v", prefix, err)
+ return
+ }
+ defer func() {
+ if err := te.stopServer(dstPod, &spec.StopServerRequest{Token: token}); err != nil {
+ t.Log(err)
+ }
+ }()
+ deadline := time.Now().Add(timeout)
+ for {
+ res, err := te.runTest(srcPod, &spec.TestRequest{
+ Token: token,
+ Timeout: durationpb.New(5 * time.Second),
+ Address: net.JoinHostPort(te.pods[dstPod].ipv4.String(), strconv.Itoa(port)),
+ })
+ if err != nil {
+ // Unknown errors do not get retried
+ t.Fatalf("%v: %v", prefix, err)
+ return
+ }
+ err = checkExpectations(t, res, expected, prefix)
+ if err == nil {
+ return
+ } else if deadline.Before(time.Now()) {
+ t.Error(err)
+ return
+ }
+ time.Sleep(100 * time.Millisecond)
+ }
+}
+
+func checkExpectations(t *testing.T, res *spec.TestResponse, expected spec.TestResponse_Result, prefix string) error {
+ switch res.Result {
+ case spec.TestResponse_CONNECTION_REJECTED, spec.TestResponse_CONNECTION_TIMEOUT:
+ if expected != res.Result {
+ return fmt.Errorf("%v expected %v, got %v (%v)", prefix, expected, res.Result, res.ErrorDescription)
+ }
+ case spec.TestResponse_WRONG_TOKEN:
+ return fmt.Errorf("%v connected, but got wrong token", prefix)
+ case spec.TestResponse_SUCCESS:
+ if expected != ExpectedSuccess {
+ return fmt.Errorf("%v expected %v, got %v", prefix, expected, res.Result)
+ }
+ }
+ return nil
+}
+
+func (te *Tester) startServer(pod int, req *spec.StartServerRequest) error {
+ res, err := te.pods[pod].do(&spec.Request{Req: &spec.Request_StartServer{StartServer: req}})
+ if err != nil {
+ return fmt.Errorf("test pod communication failure: %w", err)
+ }
+ startRes := res.GetStartServer()
+ if !startRes.Ok {
+ return fmt.Errorf("test server could not be started: %v", startRes.ErrorDescription)
+ }
+ return nil
+}
+
+func (te *Tester) stopServer(pod int, req *spec.StopServerRequest) error {
+ res, err := te.pods[pod].do(&spec.Request{Req: &spec.Request_StopServer{StopServer: req}})
+ if err != nil {
+ return fmt.Errorf("test pod communication failure: %w", err)
+ }
+ stopRes := res.GetStopServer()
+ if !stopRes.Ok {
+ return fmt.Errorf("test server could not be stopped: %v", stopRes.ErrorDescription)
+ }
+ return nil
+}
+
+func (te *Tester) runTest(pod int, req *spec.TestRequest) (*spec.TestResponse, error) {
+ res, err := te.pods[pod].do(&spec.Request{Req: &spec.Request_Test{Test: req}})
+ if err != nil {
+ return nil, fmt.Errorf("test pod communication failure: %w", err)
+ }
+ testRes := res.GetTest()
+ if testRes.Result == spec.TestResponse_UNKNOWN {
+ return nil, fmt.Errorf("test encountered unknown error: %v", testRes.ErrorDescription)
+ }
+ return testRes, nil
+}
+
+type TestSpec struct {
+ // Name needs to contain a DNS label-compatible unique name which
+ // is used to identify the Kubernetes resources for this test.
+ Name string
+ // ClientSet needs to be a client for the test cluster.
+ ClientSet kubernetes.Interface
+ // RESTConfig needs to be the client config for the same test cluster.
+ RESTConfig *rest.Config
+ // Number of pods to start for testing. Their identities go from 0 to
+ // NumPods-1.
+ NumPods int
+ // ExtraPodConfig is called for every pod to be created and can be used to
+ // customize their specification.
+ ExtraPodConfig func(i int, pod *corev1.Pod)
+}
+
+// SetupTest sets up the K8s resources and communication channels for
+// connectivity tests. It registers a cleanup function which automatically
+// tears them down again after the test.
+func SetupTest(t *testing.T, s *TestSpec) *Tester {
+ t.Helper()
+
+ testCtx, testCancel := context.WithCancel(context.Background())
+
+ tester := Tester{
+ cancel: testCancel,
+ clientSet: s.ClientSet,
+ pods: make([]podState, s.NumPods),
+ }
+
+ // Use a non-zero arbitrary start value to decrease the chance of
+ // accidential conflicts.
+ tester.lastToken.Store(1234)
+ wg := sync.WaitGroup{}
+ setupCtx, cancel := context.WithTimeout(testCtx, 60*time.Second)
+ defer cancel()
+ errChan := make(chan error, s.NumPods)
+ for i := 0; i < s.NumPods; i++ {
+ pod := &corev1.Pod{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: fmt.Sprintf("%v-%d", s.Name, i),
+ Namespace: "default",
+ },
+ Spec: corev1.PodSpec{
+ Containers: []corev1.Container{{
+ Name: "connectivitytester",
+ Image: "test.monogon.internal/metropolis/test/e2e/connectivity/agent/agent_image",
+ Stdin: true,
+ }},
+ EnableServiceLinks: ptr.To(false),
+ },
+ }
+ if s.ExtraPodConfig != nil {
+ s.ExtraPodConfig(i, pod)
+ }
+ wg.Add(1)
+ go func() {
+ pod, err := backoff.RetryNotifyWithData(func() (*corev1.Pod, error) {
+ ctx, cancel := context.WithTimeout(setupCtx, 5*time.Second)
+ defer cancel()
+ return s.ClientSet.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
+ }, backoff.WithContext(backoff.NewConstantBackOff(1*time.Second), setupCtx), func(err error, d time.Duration) {
+ t.Logf("attempted creating pod %d: %v", i, err)
+ })
+ if err != nil {
+ errChan <- fmt.Errorf("creating pod %d failed: %w", i, err)
+ wg.Done()
+ return
+ }
+ tester.pods[i].name = pod.Name
+ tester.pods[i].namespace = pod.Namespace
+ // Wait for pods to be ready and populate IPs
+ err = backoff.Retry(func() error {
+ podW, err := s.ClientSet.CoreV1().Pods(pod.Namespace).Watch(setupCtx, metav1.SingleObject(pod.ObjectMeta))
+ if err != nil {
+ return err
+ }
+ defer podW.Stop()
+ podUpdates := podW.ResultChan()
+ for event := range podUpdates {
+ if event.Type != watch.Modified && event.Type != watch.Added {
+ continue
+ }
+ pod = event.Object.(*corev1.Pod)
+ if pod.Status.Phase == corev1.PodRunning {
+ for _, ipObj := range pod.Status.PodIPs {
+ ip, err := netip.ParseAddr(ipObj.IP)
+ if err != nil {
+ return backoff.Permanent(fmt.Errorf("while parsing IP address: %w", err))
+ }
+ if ip.Is4() {
+ tester.pods[i].ipv4 = ip
+ } else {
+ tester.pods[i].ipv6 = ip
+ }
+ }
+ return nil
+ }
+ }
+ return fmt.Errorf("pod watcher failed")
+ }, backoff.WithContext(backoff.NewConstantBackOff(1*time.Second), setupCtx))
+ if err != nil {
+ errChan <- fmt.Errorf("waiting for pod %d to be running failed: %w", i, err)
+ wg.Done()
+ return
+ }
+
+ inR, inW := io.Pipe()
+ outR, outW := io.Pipe()
+ tester.pods[i].in = inW
+ tester.pods[i].out = bufio.NewReader(outR)
+
+ req := s.ClientSet.CoreV1().RESTClient().Post().Resource("pods").Namespace(pod.Namespace).Name(pod.Name).SubResource("attach")
+ options := &corev1.PodAttachOptions{
+ Container: "connectivitytester",
+ Stdin: true,
+ Stdout: true,
+ Stderr: true,
+ }
+ req.VersionedParams(options, scheme.ParameterCodec)
+ attachHandle, err := remotecommand.NewWebSocketExecutor(s.RESTConfig, "POST", req.URL().String())
+ if err != nil {
+ panic(err)
+ }
+
+ wg.Done()
+ err = attachHandle.StreamWithContext(testCtx, remotecommand.StreamOptions{
+ Stdin: inR,
+ Stdout: outW,
+ Stderr: os.Stderr,
+ })
+ if err != nil && !errors.Is(err, testCtx.Err()) {
+ t.Logf("Stream for pod %d failed: %v", i, err)
+ }
+ }()
+ }
+ t.Cleanup(func() {
+ tester.cancel()
+ var cleanupWg sync.WaitGroup
+ cleanupCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
+ defer cancel()
+ for i := range tester.pods {
+ p := &tester.pods[i]
+ if p.name == "" {
+ continue
+ }
+ cleanupWg.Add(1)
+ go func() {
+ defer cleanupWg.Done()
+ err := backoff.Retry(func() error {
+ ctx, cancel := context.WithTimeout(cleanupCtx, 5*time.Second)
+ defer cancel()
+ err := s.ClientSet.CoreV1().Pods(p.namespace).Delete(ctx, p.name, metav1.DeleteOptions{})
+ if kerrors.IsNotFound(err) {
+ return nil
+ }
+ return err
+ }, backoff.WithContext(backoff.NewConstantBackOff(1*time.Second), cleanupCtx))
+ if err != nil {
+ t.Logf("Cleanup of pod %d failed: %v", i, err)
+ }
+ }()
+ }
+ cleanupWg.Wait()
+ })
+ wg.Wait()
+ close(errChan)
+ // Process asynchronous errors
+ for err := range errChan {
+ t.Error(err)
+ }
+ return &tester
+}
diff --git a/metropolis/test/e2e/connectivity/spec/BUILD.bazel b/metropolis/test/e2e/connectivity/spec/BUILD.bazel
new file mode 100644
index 0000000..619c8dd
--- /dev/null
+++ b/metropolis/test/e2e/connectivity/spec/BUILD.bazel
@@ -0,0 +1,24 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library")
+load("@rules_proto//proto:defs.bzl", "proto_library")
+
+proto_library(
+ name = "metropolis_test_e2e_connectivty_tester_proto",
+ srcs = ["spec.proto"],
+ visibility = ["//visibility:public"],
+ deps = ["@protobuf//:duration_proto"],
+)
+
+go_proto_library(
+ name = "metropolis_test_e2e_connectivty_tester_go_proto",
+ importpath = "source.monogon.dev/metropolis/test/e2e/connectivity/spec",
+ proto = ":metropolis_test_e2e_connectivty_tester_proto",
+ visibility = ["//visibility:public"],
+)
+
+go_library(
+ name = "spec",
+ embed = [":metropolis_test_e2e_connectivty_tester_go_proto"],
+ importpath = "source.monogon.dev/metropolis/test/e2e/connectivity/spec",
+ visibility = ["//visibility:public"],
+)
diff --git a/metropolis/test/e2e/connectivity/spec/gomod-generated-placeholder.go b/metropolis/test/e2e/connectivity/spec/gomod-generated-placeholder.go
new file mode 100644
index 0000000..f09cd57
--- /dev/null
+++ b/metropolis/test/e2e/connectivity/spec/gomod-generated-placeholder.go
@@ -0,0 +1 @@
+package spec
diff --git a/metropolis/test/e2e/connectivity/spec/spec.proto b/metropolis/test/e2e/connectivity/spec/spec.proto
new file mode 100644
index 0000000..8ab8a0c
--- /dev/null
+++ b/metropolis/test/e2e/connectivity/spec/spec.proto
@@ -0,0 +1,59 @@
+syntax = "proto3";
+
+import "google/protobuf/duration.proto";
+
+package metropolis.test.e2e.connectivty_tester;
+
+message Request {
+ oneof req {
+ TestRequest test = 1;
+ StartServerRequest start_server = 2;
+ StopServerRequest stop_server = 3;
+ }
+}
+
+message Response {
+ oneof res {
+ TestResponse test = 1;
+ StartServerResponse start_server = 2;
+ StopServerResponse stop_server = 3;
+ }
+}
+
+message TestRequest {
+ string address = 1;
+ uint64 token = 2;
+ google.protobuf.Duration timeout = 3;
+}
+
+message TestResponse {
+ enum Result {
+ UNKNOWN = 0;
+ CONNECTION_REJECTED = 1;
+ CONNECTION_TIMEOUT = 2;
+ WRONG_TOKEN = 3;
+ SUCCESS = 4;
+ }
+ Result result = 1;
+ string error_description = 2;
+}
+
+message StartServerRequest {
+ string address = 1;
+ uint64 token = 2;
+}
+
+message StartServerResponse {
+ bool ok = 1;
+ string error_description = 2;
+}
+
+message StopServerRequest {
+ uint64 token = 1;
+}
+
+message StopServerResponse {
+ bool ok = 1;
+ string error_description = 2;
+}
+
diff --git a/metropolis/test/e2e/suites/kubernetes/BUILD.bazel b/metropolis/test/e2e/suites/kubernetes/BUILD.bazel
index fbfad4d..ac449eb 100644
--- a/metropolis/test/e2e/suites/kubernetes/BUILD.bazel
+++ b/metropolis/test/e2e/suites/kubernetes/BUILD.bazel
@@ -37,6 +37,7 @@
"//metropolis/node",
"//metropolis/proto/api",
"//metropolis/proto/common",
+ "//metropolis/test/e2e/connectivity",
"//metropolis/test/launch",
"//metropolis/test/localregistry",
"//metropolis/test/util",
diff --git a/metropolis/test/e2e/suites/kubernetes/run_test.go b/metropolis/test/e2e/suites/kubernetes/run_test.go
index 0acc473..939c663 100644
--- a/metropolis/test/e2e/suites/kubernetes/run_test.go
+++ b/metropolis/test/e2e/suites/kubernetes/run_test.go
@@ -30,6 +30,7 @@
common "source.monogon.dev/metropolis/node"
apb "source.monogon.dev/metropolis/proto/api"
cpb "source.monogon.dev/metropolis/proto/common"
+ "source.monogon.dev/metropolis/test/e2e/connectivity"
mlaunch "source.monogon.dev/metropolis/test/launch"
"source.monogon.dev/metropolis/test/localregistry"
"source.monogon.dev/metropolis/test/util"
@@ -284,7 +285,7 @@
}
}()
- clientSet, _, err := cluster.GetKubeClientSet()
+ clientSet, restConfig, err := cluster.GetKubeClientSet()
if err != nil {
t.Fatal(err)
}
@@ -371,6 +372,26 @@
return fmt.Errorf("pod is not ready: %s", errorMsg.String())
}
})
+ t.Run("Connectivity Smoke Tests", func(t *testing.T) {
+ ct := connectivity.SetupTest(t, &connectivity.TestSpec{
+ Name: "connectivity-smoke",
+ ClientSet: clientSet,
+ RESTConfig: restConfig,
+ NumPods: 2,
+ ExtraPodConfig: func(i int, pod *corev1.Pod) {
+ // Spread pods out over nodes to test inter-node network
+ pod.Labels = make(map[string]string)
+ pod.Labels["name"] = "connectivity-smoketest"
+ pod.Spec.TopologySpreadConstraints = []corev1.TopologySpreadConstraint{{
+ MaxSkew: 1,
+ TopologyKey: "kubernetes.io/hostname",
+ WhenUnsatisfiable: corev1.DoNotSchedule,
+ LabelSelector: metav1.SetAsLabelSelector(pod.Labels),
+ }}
+ },
+ })
+ ct.TestPodConnectivity(t, 0, 1, 1234, connectivity.ExpectedSuccess)
+ })
for _, runtimeClass := range []string{"runc", "gvisor"} {
statefulSetName := fmt.Sprintf("test-statefulset-%s", runtimeClass)
util.TestEventual(t, fmt.Sprintf("StatefulSet with %s tests", runtimeClass), ctx, smallTestTimeout, func(ctx context.Context) error {