m/test/e2e/connectivity: add connectivity tester

This adds a connectivity testing framework. It uses pod agents and
communicates with them over stdio. This is used to implement a simple
smoke test and will later be used to test network policy controllers.

Change-Id: If40673a91336dbe3a7a383bf2e9d17736fad3bdc
Reviewed-on: https://review.monogon.dev/c/monogon/+/3756
Reviewed-by: Jan Schär <jan@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/test/e2e/connectivity/agent/main.go b/metropolis/test/e2e/connectivity/agent/main.go
new file mode 100644
index 0000000..a11b488
--- /dev/null
+++ b/metropolis/test/e2e/connectivity/agent/main.go
@@ -0,0 +1,123 @@
+// Copyright The Monogon Project Authors.
+// SPDX-License-Identifier: Apache-2.0
+
+package main
+
+// connectivity agent hosts test runner-defined network listeners and performs
+// connectivity tests to other instances of itself.
+// It runs in an OCI image and a test runner communicates with it over
+// stdin/stdout with delimited protobufs. See the spec directory for the
+// request/response definitions.
+
+import (
+	"bufio"
+	"context"
+	"encoding/binary"
+	"errors"
+	"fmt"
+	"io"
+	"log"
+	"net"
+	"os"
+	"time"
+
+	"google.golang.org/protobuf/encoding/protodelim"
+
+	"source.monogon.dev/metropolis/test/e2e/connectivity/spec"
+)
+
+// main runs the agent's request loop: it reads length-delimited spec.Request
+// messages from stdin, dispatches each one to the tester and writes the
+// matching length-delimited spec.Response to stdout. Requests are handled
+// strictly one at a time, in the order they arrive.
+//
+// The loop ends normally once stdin reaches EOF between two messages. Any
+// other decoding error, an unknown request type or a failure to write a
+// response terminates the process via log.Fatalf.
+func main() {
+	t := tester{
+		servers: make(map[uint64]net.Listener),
+	}
+	stdinReader := bufio.NewReader(os.Stdin)
+	for {
+		var req spec.Request
+		if err := protodelim.UnmarshalFrom(stdinReader, &req); err != nil {
+			// protodelim returns a plain io.EOF only when the stream ends
+			// before any byte of a new message was read, i.e. stdin was
+			// closed at a message boundary. A truncated message surfaces as
+			// a different error and still takes the fatal path below.
+			if errors.Is(err, io.EOF) {
+				return
+			}
+			log.Fatalf("Unable to unmarshal request: %v", err)
+		}
+		var res spec.Response
+		switch r := req.Req.(type) {
+		case *spec.Request_Test:
+			res.Res = &spec.Response_Test{Test: t.runTest(r.Test)}
+		case *spec.Request_StartServer:
+			res.Res = &spec.Response_StartServer{StartServer: t.startServer(r.StartServer)}
+		case *spec.Request_StopServer:
+			res.Res = &spec.Response_StopServer{StopServer: t.stopServer(r.StopServer)}
+		default:
+			// Also reached when the request carries no payload at all.
+			log.Fatalf("Unknown request type: %T", r)
+		}
+		if _, err := protodelim.MarshalTo(os.Stdout, &res); err != nil {
+			log.Fatalf("Unable to marshal response: %v", err)
+		}
+	}
+}
+
+// tester holds the agent state that persists between requests.
+type tester struct {
+	// servers contains the listeners opened by startServer, keyed by the
+	// token of the request that created them. stopServer removes entries.
+	servers map[uint64]net.Listener
+}
+
+// errToResponse turns a connection error into a TestResponse. Errors that
+// wrap os.ErrDeadlineExceeded or context.DeadlineExceeded are reported as
+// CONNECTION_TIMEOUT, every other error as CONNECTION_REJECTED. The error
+// text is always passed along in ErrorDescription. err must not be nil.
+func errToResponse(err error) *spec.TestResponse {
+	// Start from the catch-all classification and refine it for timeouts.
+	result := spec.TestResponse_CONNECTION_REJECTED
+	if errors.Is(err, os.ErrDeadlineExceeded) || errors.Is(err, context.DeadlineExceeded) {
+		result = spec.TestResponse_CONNECTION_TIMEOUT
+	}
+	return &spec.TestResponse{
+		Result:           result,
+		ErrorDescription: err.Error(),
+	}
+}
+
+// runTest performs a single connectivity check against req.Address. It dials
+// the address over TCP, reads an 8-byte little-endian token from the
+// connection and compares it with req.Token.
+//
+// req.Timeout bounds the dial and, separately, the read of the token. Dial,
+// deadline and read failures are classified by errToResponse; a token
+// mismatch yields WRONG_TOKEN and a matching token yields SUCCESS.
+func (t *tester) runTest(req *spec.TestRequest) *spec.TestResponse {
+	timeout := req.Timeout.AsDuration()
+	conn, err := net.DialTimeout("tcp", req.Address, timeout)
+	if err != nil {
+		return errToResponse(err)
+	}
+	defer conn.Close()
+	// Requests are processed sequentially, so a read without a deadline on a
+	// peer that never sends would stall the agent. Treat a failure to arm the
+	// deadline as a failed test instead of reading unguarded.
+	if err := conn.SetReadDeadline(time.Now().Add(timeout)); err != nil {
+		return errToResponse(err)
+	}
+	var tokenRaw [8]byte
+	if _, err := io.ReadFull(conn, tokenRaw[:]); err != nil {
+		return errToResponse(err)
+	}
+	receivedToken := binary.LittleEndian.Uint64(tokenRaw[:])
+	if receivedToken != req.Token {
+		return &spec.TestResponse{
+			Result:           spec.TestResponse_WRONG_TOKEN,
+			ErrorDescription: fmt.Sprintf("Received token %d, wanted %d", receivedToken, req.Token),
+		}
+	}
+	return &spec.TestResponse{
+		Result: spec.TestResponse_SUCCESS,
+	}
+}
+
+// startServer opens a TCP listener on req.Address and serves req.Token on it
+// in a background goroutine (see tokenServer). The listener is recorded
+// under req.Token so that stopServer can close it later.
+//
+// The response has Ok set on success. If the token is already in use by a
+// running server, or listening fails, Ok is left unset and ErrorDescription
+// explains why.
+func (t *tester) startServer(req *spec.StartServerRequest) *spec.StartServerResponse {
+	// Listeners are tracked by token only. Overwriting an existing entry
+	// would leave the previous listener running with no way to stop it.
+	if _, ok := t.servers[req.Token]; ok {
+		return &spec.StartServerResponse{
+			ErrorDescription: fmt.Sprintf("server with token %d is already running", req.Token),
+		}
+	}
+	l, err := net.Listen("tcp", req.Address)
+	if err != nil {
+		return &spec.StartServerResponse{ErrorDescription: err.Error()}
+	}
+	t.servers[req.Token] = l
+	go tokenServer(l, req.Token)
+	return &spec.StartServerResponse{Ok: true}
+}
+
+func tokenServer(l net.Listener, token uint64) {
+	for {
+		conn, err := l.Accept()
+		if err != nil {
+			return
+		}
+		conn.Write(binary.LittleEndian.AppendUint64(nil, token))
+		conn.Close()
+	}
+}
+
+// stopServer closes the listener registered under req.Token and removes it
+// from the set of running servers. Ok is set only when such a listener
+// existed and was closed without error; the reason for a failure is logged.
+func (t *tester) stopServer(req *spec.StopServerRequest) *spec.StopServerResponse {
+	l, ok := t.servers[req.Token]
+	if !ok {
+		// A missing key yields a nil Listener; calling Close on it would
+		// panic and take the whole agent down.
+		log.Printf("Cannot stop server with token %d: not running", req.Token)
+		return &spec.StopServerResponse{}
+	}
+	// The entry is dropped even if Close fails, so a broken listener is not
+	// kept around.
+	delete(t.servers, req.Token)
+	if err := l.Close(); err != nil {
+		log.Printf("Closing server with token %d failed: %v", req.Token, err)
+		return &spec.StopServerResponse{}
+	}
+	return &spec.StopServerResponse{Ok: true}
+}