blob: a11b488376cb5e532c9abbb5709bce01f201a135 [file] [log] [blame]
Lorenz Brunde57e6f2025-01-08 16:34:08 +00001// Copyright The Monogon Project Authors.
2// SPDX-License-Identifier: Apache-2.0
3
4package main
5
6// connectivity agent hosts test runner-defined network listeners and performs
7// connectivity tests to other instances of itself.
8// It runs in an OCI image and a test runner communicates with it over
9// stdin/stdout with delimited protobufs. See the spec directory for the
10// request/response definitions.
11
12import (
13 "bufio"
14 "context"
15 "encoding/binary"
16 "errors"
17 "fmt"
18 "io"
19 "log"
20 "net"
21 "os"
22 "time"
23
24 "google.golang.org/protobuf/encoding/protodelim"
25
26 "source.monogon.dev/metropolis/test/e2e/connectivity/spec"
27)
28
29func main() {
30 t := tester{
31 servers: make(map[uint64]net.Listener),
32 }
33 stdinReader := bufio.NewReader(os.Stdin)
34 for {
35 var req spec.Request
36 if err := protodelim.UnmarshalFrom(stdinReader, &req); err != nil {
37 log.Fatalf("Unable to unmarshal request: %v", err)
38 }
39 var res spec.Response
40 switch r := req.Req.(type) {
41 case *spec.Request_Test:
42 res.Res = &spec.Response_Test{Test: t.runTest(r.Test)}
43 case *spec.Request_StartServer:
44 res.Res = &spec.Response_StartServer{StartServer: t.startServer(r.StartServer)}
45 case *spec.Request_StopServer:
46 res.Res = &spec.Response_StopServer{StopServer: t.stopServer(r.StopServer)}
47 default:
48 log.Fatalf("Unknown request type: %T", r)
49 }
50 if _, err := protodelim.MarshalTo(os.Stdout, &res); err != nil {
51 log.Fatalf("Unable to marshal response: %v", err)
52 }
53 }
54}
55
56type tester struct {
57 servers map[uint64]net.Listener
58}
59
60func errToResponse(err error) *spec.TestResponse {
61 switch {
62 case errors.Is(err, os.ErrDeadlineExceeded) || errors.Is(err, context.DeadlineExceeded):
63 return &spec.TestResponse{
64 Result: spec.TestResponse_CONNECTION_TIMEOUT,
65 ErrorDescription: err.Error(),
66 }
67 default:
68 return &spec.TestResponse{
69 Result: spec.TestResponse_CONNECTION_REJECTED,
70 ErrorDescription: err.Error(),
71 }
72 }
73}
74
75func (t *tester) runTest(req *spec.TestRequest) *spec.TestResponse {
76 conn, err := net.DialTimeout("tcp", req.Address, req.Timeout.AsDuration())
77 if err != nil {
78 return errToResponse(err)
79 }
80 defer conn.Close()
81 var tokenRaw [8]byte
82 conn.SetReadDeadline(time.Now().Add(req.Timeout.AsDuration()))
83 if _, err := io.ReadFull(conn, tokenRaw[:]); err != nil {
84 return errToResponse(err)
85 }
86 receivedToken := binary.LittleEndian.Uint64(tokenRaw[:])
87 if receivedToken != req.Token {
88 return &spec.TestResponse{
89 Result: spec.TestResponse_WRONG_TOKEN,
90 ErrorDescription: fmt.Sprintf("Received token %d, wanted %d", receivedToken, req.Token),
91 }
92 }
93 return &spec.TestResponse{
94 Result: spec.TestResponse_SUCCESS,
95 }
96}
97
98func (t *tester) startServer(req *spec.StartServerRequest) *spec.StartServerResponse {
99 l, err := net.Listen("tcp", req.Address)
100 if err != nil {
101 return &spec.StartServerResponse{ErrorDescription: err.Error()}
102 }
103 t.servers[req.Token] = l
104 go tokenServer(l, req.Token)
105 return &spec.StartServerResponse{Ok: true}
106}
107
108func tokenServer(l net.Listener, token uint64) {
109 for {
110 conn, err := l.Accept()
111 if err != nil {
112 return
113 }
114 conn.Write(binary.LittleEndian.AppendUint64(nil, token))
115 conn.Close()
116 }
117}
118
119func (t *tester) stopServer(req *spec.StopServerRequest) *spec.StopServerResponse {
120 t.servers[req.Token].Close()
121 delete(t.servers, req.Token)
122 return &spec.StopServerResponse{Ok: true}
123}