blob: 7ecbb085289ba3f37e138ee2eee2a308899eec00 [file] [log] [blame]
Lorenz Brunde57e6f2025-01-08 16:34:08 +00001// Copyright The Monogon Project Authors.
2// SPDX-License-Identifier: Apache-2.0
3
4package connectivity
5
6import (
7 "bufio"
8 "context"
9 "errors"
10 "fmt"
11 "io"
12 "net"
13 "net/netip"
14 "os"
15 "strconv"
16 "sync"
17 "sync/atomic"
18 "testing"
19 "time"
20
21 "github.com/cenkalti/backoff/v4"
22 "google.golang.org/protobuf/encoding/protodelim"
23 "google.golang.org/protobuf/types/known/durationpb"
24 corev1 "k8s.io/api/core/v1"
25 kerrors "k8s.io/apimachinery/pkg/api/errors"
26 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27 "k8s.io/apimachinery/pkg/watch"
28 "k8s.io/client-go/kubernetes"
29 "k8s.io/client-go/kubernetes/scheme"
30 "k8s.io/client-go/rest"
31 "k8s.io/client-go/tools/remotecommand"
32 "k8s.io/utils/ptr"
33
34 "source.monogon.dev/metropolis/test/e2e/connectivity/spec"
35)
36
// podState holds everything needed to talk to a single connectivity tester
// pod: its Kubernetes identity, the pod IPs reported in its status once it is
// running, and the pipes connected to the attached agent.
type podState struct {
	// Kubernetes name and namespace of the pod, used to delete it again.
	name, namespace string
	// Pod IP addresses per family. An address stays the zero (invalid) Addr
	// if the pod has no IP of that family.
	ipv4, ipv6 netip.Addr

	// reqMu serializes request/response exchanges with the agent, so that a
	// response is always read by the caller which sent the matching request.
	reqMu sync.Mutex
	// in is the writing end of the pipe feeding the agent's stdin.
	in io.Writer
	// out buffers the reading end of the pipe fed by the agent's stdout.
	out *bufio.Reader
}
46
47func (p *podState) do(req *spec.Request) (*spec.Response, error) {
48 // This could be made much faster by introducing a request/response routing
49 // layer which would allow concurrent requests and send the response to the
50 // correct request handler. For simplicity this hasn't been done.
51 p.reqMu.Lock()
52 defer p.reqMu.Unlock()
53 if _, err := protodelim.MarshalTo(p.in, req); err != nil {
54 return nil, fmt.Errorf("while sending request to pod: %w", err)
55 }
56 var res spec.Response
57 if err := protodelim.UnmarshalFrom(p.out, &res); err != nil {
58 return nil, fmt.Errorf("while reading response from pod: %w", err)
59 }
60 return &res, nil
61}
62
// Tester runs connectivity tests between the pods created by SetupTest.
type Tester struct {
	// cancel cancels the context which the pods' attach streams (and the
	// setup context) are derived from. The cleanup registered by SetupTest
	// calls it.
	cancel context.CancelFunc
	// clientSet is the Kubernetes client for the test cluster.
	clientSet kubernetes.Interface
	// pods holds the state of every test pod, indexed by pod identity.
	pods []podState
	// lastToken is the most recently issued token (see GetToken).
	lastToken atomic.Uint64
}
69
70// Convenience aliases
71const (
72 ExpectedSuccess = spec.TestResponse_SUCCESS
73 ExpectedTimeout = spec.TestResponse_CONNECTION_TIMEOUT
74 ExpectedReject = spec.TestResponse_CONNECTION_REJECTED
75)
76
77func (te *Tester) GetToken() uint64 {
78 return te.lastToken.Add(1)
79}
80
81func (te *Tester) TestPodConnectivity(t *testing.T, srcPod, dstPod int, port int, expected spec.TestResponse_Result) {
82 te.TestPodConnectivityEventual(t, srcPod, dstPod, port, expected, 0)
83}
84
85func (te *Tester) TestPodConnectivityEventual(t *testing.T, srcPod, dstPod int, port int, expected spec.TestResponse_Result, timeout time.Duration) {
86 token := te.GetToken()
87 prefix := fmt.Sprintf("%d -> %d :%d", srcPod, dstPod, port)
88 if err := te.startServer(dstPod, &spec.StartServerRequest{
89 Address: net.JoinHostPort("0.0.0.0", strconv.Itoa(port)),
90 Token: token,
91 }); err != nil {
92 t.Fatalf("%v: %v", prefix, err)
93 return
94 }
95 defer func() {
96 if err := te.stopServer(dstPod, &spec.StopServerRequest{Token: token}); err != nil {
97 t.Log(err)
98 }
99 }()
100 deadline := time.Now().Add(timeout)
101 for {
102 res, err := te.runTest(srcPod, &spec.TestRequest{
103 Token: token,
104 Timeout: durationpb.New(5 * time.Second),
105 Address: net.JoinHostPort(te.pods[dstPod].ipv4.String(), strconv.Itoa(port)),
106 })
107 if err != nil {
108 // Unknown errors do not get retried
109 t.Fatalf("%v: %v", prefix, err)
110 return
111 }
112 err = checkExpectations(t, res, expected, prefix)
113 if err == nil {
114 return
115 } else if deadline.Before(time.Now()) {
116 t.Error(err)
117 return
118 }
119 time.Sleep(100 * time.Millisecond)
120 }
121}
122
123func checkExpectations(t *testing.T, res *spec.TestResponse, expected spec.TestResponse_Result, prefix string) error {
124 switch res.Result {
125 case spec.TestResponse_CONNECTION_REJECTED, spec.TestResponse_CONNECTION_TIMEOUT:
126 if expected != res.Result {
127 return fmt.Errorf("%v expected %v, got %v (%v)", prefix, expected, res.Result, res.ErrorDescription)
128 }
129 case spec.TestResponse_WRONG_TOKEN:
130 return fmt.Errorf("%v connected, but got wrong token", prefix)
131 case spec.TestResponse_SUCCESS:
132 if expected != ExpectedSuccess {
133 return fmt.Errorf("%v expected %v, got %v", prefix, expected, res.Result)
134 }
135 }
136 return nil
137}
138
139func (te *Tester) startServer(pod int, req *spec.StartServerRequest) error {
140 res, err := te.pods[pod].do(&spec.Request{Req: &spec.Request_StartServer{StartServer: req}})
141 if err != nil {
142 return fmt.Errorf("test pod communication failure: %w", err)
143 }
144 startRes := res.GetStartServer()
145 if !startRes.Ok {
146 return fmt.Errorf("test server could not be started: %v", startRes.ErrorDescription)
147 }
148 return nil
149}
150
151func (te *Tester) stopServer(pod int, req *spec.StopServerRequest) error {
152 res, err := te.pods[pod].do(&spec.Request{Req: &spec.Request_StopServer{StopServer: req}})
153 if err != nil {
154 return fmt.Errorf("test pod communication failure: %w", err)
155 }
156 stopRes := res.GetStopServer()
157 if !stopRes.Ok {
158 return fmt.Errorf("test server could not be stopped: %v", stopRes.ErrorDescription)
159 }
160 return nil
161}
162
163func (te *Tester) runTest(pod int, req *spec.TestRequest) (*spec.TestResponse, error) {
164 res, err := te.pods[pod].do(&spec.Request{Req: &spec.Request_Test{Test: req}})
165 if err != nil {
166 return nil, fmt.Errorf("test pod communication failure: %w", err)
167 }
168 testRes := res.GetTest()
169 if testRes.Result == spec.TestResponse_UNKNOWN {
170 return nil, fmt.Errorf("test encountered unknown error: %v", testRes.ErrorDescription)
171 }
172 return testRes, nil
173}
174
// TestSpec describes the pods to create for a connectivity test set up by
// SetupTest.
type TestSpec struct {
	// Name needs to contain a DNS label-compatible unique name which
	// is used to identify the Kubernetes resources for this test. Pod i is
	// named "<Name>-<i>" unless ExtraPodConfig changes that.
	Name string
	// ClientSet needs to be a client for the test cluster.
	ClientSet kubernetes.Interface
	// RESTConfig needs to be the client config for the same test cluster. It
	// is used to attach to the test pods.
	RESTConfig *rest.Config
	// Number of pods to start for testing. Their identities go from 0 to
	// NumPods-1.
	NumPods int
	// ExtraPodConfig is optional. If set, it is called for every pod to be
	// created, with the pod identity and the pod specification, and can be
	// used to customize that specification before the pod is created.
	ExtraPodConfig func(i int, pod *corev1.Pod)
}
190
191// SetupTest sets up the K8s resources and communication channels for
192// connectivity tests. It registers a cleanup function which automatically
193// tears them down again after the test.
194func SetupTest(t *testing.T, s *TestSpec) *Tester {
195 t.Helper()
196
197 testCtx, testCancel := context.WithCancel(context.Background())
198
199 tester := Tester{
200 cancel: testCancel,
201 clientSet: s.ClientSet,
202 pods: make([]podState, s.NumPods),
203 }
204
205 // Use a non-zero arbitrary start value to decrease the chance of
206 // accidential conflicts.
207 tester.lastToken.Store(1234)
208 wg := sync.WaitGroup{}
209 setupCtx, cancel := context.WithTimeout(testCtx, 60*time.Second)
210 defer cancel()
211 errChan := make(chan error, s.NumPods)
212 for i := 0; i < s.NumPods; i++ {
213 pod := &corev1.Pod{
214 ObjectMeta: metav1.ObjectMeta{
215 Name: fmt.Sprintf("%v-%d", s.Name, i),
216 Namespace: "default",
217 },
218 Spec: corev1.PodSpec{
219 Containers: []corev1.Container{{
220 Name: "connectivitytester",
Jan Schär9d2f3c62025-04-14 11:17:22 +0000221 Image: "test.monogon.internal/connectivity/agent:latest",
Lorenz Brunde57e6f2025-01-08 16:34:08 +0000222 Stdin: true,
223 }},
224 EnableServiceLinks: ptr.To(false),
225 },
226 }
227 if s.ExtraPodConfig != nil {
228 s.ExtraPodConfig(i, pod)
229 }
230 wg.Add(1)
231 go func() {
232 pod, err := backoff.RetryNotifyWithData(func() (*corev1.Pod, error) {
233 ctx, cancel := context.WithTimeout(setupCtx, 5*time.Second)
234 defer cancel()
235 return s.ClientSet.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
236 }, backoff.WithContext(backoff.NewConstantBackOff(1*time.Second), setupCtx), func(err error, d time.Duration) {
237 t.Logf("attempted creating pod %d: %v", i, err)
238 })
239 if err != nil {
240 errChan <- fmt.Errorf("creating pod %d failed: %w", i, err)
241 wg.Done()
242 return
243 }
244 tester.pods[i].name = pod.Name
245 tester.pods[i].namespace = pod.Namespace
246 // Wait for pods to be ready and populate IPs
247 err = backoff.Retry(func() error {
248 podW, err := s.ClientSet.CoreV1().Pods(pod.Namespace).Watch(setupCtx, metav1.SingleObject(pod.ObjectMeta))
249 if err != nil {
250 return err
251 }
252 defer podW.Stop()
253 podUpdates := podW.ResultChan()
254 for event := range podUpdates {
255 if event.Type != watch.Modified && event.Type != watch.Added {
256 continue
257 }
258 pod = event.Object.(*corev1.Pod)
259 if pod.Status.Phase == corev1.PodRunning {
260 for _, ipObj := range pod.Status.PodIPs {
261 ip, err := netip.ParseAddr(ipObj.IP)
262 if err != nil {
263 return backoff.Permanent(fmt.Errorf("while parsing IP address: %w", err))
264 }
265 if ip.Is4() {
266 tester.pods[i].ipv4 = ip
267 } else {
268 tester.pods[i].ipv6 = ip
269 }
270 }
271 return nil
272 }
273 }
274 return fmt.Errorf("pod watcher failed")
275 }, backoff.WithContext(backoff.NewConstantBackOff(1*time.Second), setupCtx))
276 if err != nil {
277 errChan <- fmt.Errorf("waiting for pod %d to be running failed: %w", i, err)
278 wg.Done()
279 return
280 }
281
282 inR, inW := io.Pipe()
283 outR, outW := io.Pipe()
284 tester.pods[i].in = inW
285 tester.pods[i].out = bufio.NewReader(outR)
286
287 req := s.ClientSet.CoreV1().RESTClient().Post().Resource("pods").Namespace(pod.Namespace).Name(pod.Name).SubResource("attach")
288 options := &corev1.PodAttachOptions{
289 Container: "connectivitytester",
290 Stdin: true,
291 Stdout: true,
292 Stderr: true,
293 }
294 req.VersionedParams(options, scheme.ParameterCodec)
295 attachHandle, err := remotecommand.NewWebSocketExecutor(s.RESTConfig, "POST", req.URL().String())
296 if err != nil {
297 panic(err)
298 }
299
300 wg.Done()
301 err = attachHandle.StreamWithContext(testCtx, remotecommand.StreamOptions{
302 Stdin: inR,
303 Stdout: outW,
304 Stderr: os.Stderr,
305 })
306 if err != nil && !errors.Is(err, testCtx.Err()) {
307 t.Logf("Stream for pod %d failed: %v", i, err)
308 }
309 }()
310 }
311 t.Cleanup(func() {
312 tester.cancel()
313 var cleanupWg sync.WaitGroup
314 cleanupCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
315 defer cancel()
316 for i := range tester.pods {
317 p := &tester.pods[i]
318 if p.name == "" {
319 continue
320 }
321 cleanupWg.Add(1)
322 go func() {
323 defer cleanupWg.Done()
324 err := backoff.Retry(func() error {
325 ctx, cancel := context.WithTimeout(cleanupCtx, 5*time.Second)
326 defer cancel()
327 err := s.ClientSet.CoreV1().Pods(p.namespace).Delete(ctx, p.name, metav1.DeleteOptions{})
328 if kerrors.IsNotFound(err) {
329 return nil
330 }
331 return err
332 }, backoff.WithContext(backoff.NewConstantBackOff(1*time.Second), cleanupCtx))
333 if err != nil {
334 t.Logf("Cleanup of pod %d failed: %v", i, err)
335 }
336 }()
337 }
338 cleanupWg.Wait()
339 })
340 wg.Wait()
341 close(errChan)
342 // Process asynchronous errors
343 for err := range errChan {
344 t.Error(err)
345 }
346 return &tester
347}