| Lorenz Brun | de57e6f | 2025-01-08 16:34:08 +0000 | [diff] [blame^] | 1 | // Copyright The Monogon Project Authors. |
| 2 | // SPDX-License-Identifier: Apache-2.0 |
| 3 | |
| 4 | package main |
| 5 | |
| 6 | // connectivity agent hosts test runner-defined network listeners and performs |
| 7 | // connectivity tests to other instances of itself. |
| 8 | // It runs in an OCI image and a test runner communicates with it over |
| 9 | // stdin/stdout with delimited protobufs. See the spec directory for the |
| 10 | // request/response definitions. |
| 11 | |
| 12 | import ( |
| 13 | "bufio" |
| 14 | "context" |
| 15 | "encoding/binary" |
| 16 | "errors" |
| 17 | "fmt" |
| 18 | "io" |
| 19 | "log" |
| 20 | "net" |
| 21 | "os" |
| 22 | "time" |
| 23 | |
| 24 | "google.golang.org/protobuf/encoding/protodelim" |
| 25 | |
| 26 | "source.monogon.dev/metropolis/test/e2e/connectivity/spec" |
| 27 | ) |
| 28 | |
| 29 | func main() { |
| 30 | t := tester{ |
| 31 | servers: make(map[uint64]net.Listener), |
| 32 | } |
| 33 | stdinReader := bufio.NewReader(os.Stdin) |
| 34 | for { |
| 35 | var req spec.Request |
| 36 | if err := protodelim.UnmarshalFrom(stdinReader, &req); err != nil { |
| 37 | log.Fatalf("Unable to unmarshal request: %v", err) |
| 38 | } |
| 39 | var res spec.Response |
| 40 | switch r := req.Req.(type) { |
| 41 | case *spec.Request_Test: |
| 42 | res.Res = &spec.Response_Test{Test: t.runTest(r.Test)} |
| 43 | case *spec.Request_StartServer: |
| 44 | res.Res = &spec.Response_StartServer{StartServer: t.startServer(r.StartServer)} |
| 45 | case *spec.Request_StopServer: |
| 46 | res.Res = &spec.Response_StopServer{StopServer: t.stopServer(r.StopServer)} |
| 47 | default: |
| 48 | log.Fatalf("Unknown request type: %T", r) |
| 49 | } |
| 50 | if _, err := protodelim.MarshalTo(os.Stdout, &res); err != nil { |
| 51 | log.Fatalf("Unable to marshal response: %v", err) |
| 52 | } |
| 53 | } |
| 54 | } |
| 55 | |
// tester holds the state of the connectivity agent.
type tester struct {
	// servers maps the token passed to startServer to the listener opened
	// for it, so that stopServer can later close that listener.
	servers map[uint64]net.Listener
}
| 59 | |
| 60 | func errToResponse(err error) *spec.TestResponse { |
| 61 | switch { |
| 62 | case errors.Is(err, os.ErrDeadlineExceeded) || errors.Is(err, context.DeadlineExceeded): |
| 63 | return &spec.TestResponse{ |
| 64 | Result: spec.TestResponse_CONNECTION_TIMEOUT, |
| 65 | ErrorDescription: err.Error(), |
| 66 | } |
| 67 | default: |
| 68 | return &spec.TestResponse{ |
| 69 | Result: spec.TestResponse_CONNECTION_REJECTED, |
| 70 | ErrorDescription: err.Error(), |
| 71 | } |
| 72 | } |
| 73 | } |
| 74 | |
| 75 | func (t *tester) runTest(req *spec.TestRequest) *spec.TestResponse { |
| 76 | conn, err := net.DialTimeout("tcp", req.Address, req.Timeout.AsDuration()) |
| 77 | if err != nil { |
| 78 | return errToResponse(err) |
| 79 | } |
| 80 | defer conn.Close() |
| 81 | var tokenRaw [8]byte |
| 82 | conn.SetReadDeadline(time.Now().Add(req.Timeout.AsDuration())) |
| 83 | if _, err := io.ReadFull(conn, tokenRaw[:]); err != nil { |
| 84 | return errToResponse(err) |
| 85 | } |
| 86 | receivedToken := binary.LittleEndian.Uint64(tokenRaw[:]) |
| 87 | if receivedToken != req.Token { |
| 88 | return &spec.TestResponse{ |
| 89 | Result: spec.TestResponse_WRONG_TOKEN, |
| 90 | ErrorDescription: fmt.Sprintf("Received token %d, wanted %d", receivedToken, req.Token), |
| 91 | } |
| 92 | } |
| 93 | return &spec.TestResponse{ |
| 94 | Result: spec.TestResponse_SUCCESS, |
| 95 | } |
| 96 | } |
| 97 | |
| 98 | func (t *tester) startServer(req *spec.StartServerRequest) *spec.StartServerResponse { |
| 99 | l, err := net.Listen("tcp", req.Address) |
| 100 | if err != nil { |
| 101 | return &spec.StartServerResponse{ErrorDescription: err.Error()} |
| 102 | } |
| 103 | t.servers[req.Token] = l |
| 104 | go tokenServer(l, req.Token) |
| 105 | return &spec.StartServerResponse{Ok: true} |
| 106 | } |
| 107 | |
// tokenServer accepts connections on l one at a time. Each accepted
// connection is sent token as 8 little-endian bytes and then closed. The
// function returns as soon as Accept reports an error, which includes the
// listener having been closed.
func tokenServer(l net.Listener, token uint64) {
	// The payload is the same for every connection, so encode it once.
	var payload [8]byte
	binary.LittleEndian.PutUint64(payload[:], token)

	for {
		peer, err := l.Accept()
		if err != nil {
			return
		}
		// The results of Write and Close are not checked.
		peer.Write(payload[:])
		peer.Close()
	}
}
| 118 | |
| 119 | func (t *tester) stopServer(req *spec.StopServerRequest) *spec.StopServerResponse { |
| 120 | t.servers[req.Token].Close() |
| 121 | delete(t.servers, req.Token) |
| 122 | return &spec.StopServerResponse{Ok: true} |
| 123 | } |