// Copyright The Monogon Project Authors.
// SPDX-License-Identifier: Apache-2.0

// This package tests the network policy controller using the Cyclonus network
// policy conformance test suite. It uses the real policy controller and
// nftables controller, but a fake Kubernetes API and fake pods, which allows
// the tests to run much faster than would be possible with a real API server
// and real pods.
//
// By default, the test runs in a ktest, which means it runs against the same
// kernel version, and thus the same nftables implementation, as is used in
// Monogon OS. You can also run the test manually in a new user and network
// namespace, which lets you use all the debugging tools available on your
// machine. Useful commands:
//
//	bazel build //metropolis/node/kubernetes/networkpolicy:networkpolicy_test
//	    Build the test.
//	unshare --map-user=0 --net
//	    Create a user and network namespace.
//	tcpdump -i any &
//	    Capture traffic in the background.
//	IN_KTEST=true bazel-bin/metropolis/node/kubernetes/networkpolicy/networkpolicy_test_/networkpolicy_test
//	    Run the test.
package networkpolicy_test

import (
	"crypto/sha256"
	"errors"
	"fmt"
	"net"
	"os"
	"os/exec"
	"runtime"
	"slices"
	"strconv"
	"strings"
	"syscall"
	"testing"
	"time"

	"git.dolansoft.org/dolansoft/k8s-nft-npc/nftctrl"
	"github.com/mattfenwick/cyclonus/pkg/connectivity"
	"github.com/mattfenwick/cyclonus/pkg/connectivity/probe"
	"github.com/mattfenwick/cyclonus/pkg/generator"
	"github.com/mattfenwick/cyclonus/pkg/kube"
	"github.com/mattfenwick/cyclonus/pkg/matcher"
	"github.com/vishvananda/netlink"
	"golang.org/x/sys/unix"
	corev1 "k8s.io/api/core/v1"
	networkingv1 "k8s.io/api/networking/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	kruntime "k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/tools/cache"
)

// Change this to 1 to test IPv6 instead of IPv4. Tests pass with IPv6, but run
// much slower, apparently because some ICMPv6 packets are dropped and need to
// be retransmitted.
//
// TODO: Fix slow IPv6 tests and run them automatically, e.g. as a separate
// ktest with a flag to select IPv6.
const testIPIndex = 0

const podIfaceGroup uint32 = 8

// Loopback traffic is only affected by network policies when connecting through
// a ClusterIP service. For now, we don't simulate service IPs, so loopback
// results are ignored.
const ignoreLoopback = true

var serverPorts = []int{80, 81}
var serverProtocols = []corev1.Protocol{corev1.ProtocolTCP, corev1.ProtocolUDP}

var gatewayIPv4 = net.ParseIP("10.8.0.1").To4()
var gatewayIPv6 = net.ParseIP("fd08::1")

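// netListen listens on the given port and protocol and serves until stop is
// closed, replying to every TCP connection or UDP datagram with payload.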
func netListen(port int, protocol corev1.Protocol, payload string, stop <-chan struct{}) error {
	switch protocol {
	case corev1.ProtocolTCP:
		listener, err := net.ListenTCP("tcp", &net.TCPAddr{Port: port})
		if err != nil {
			return err
		}
		go func() {
			<-stop
			listener.Close()
		}()
		for {
			conn, err := listener.Accept()
			if err != nil {
				return err
			}
			conn.Write([]byte(payload))
			conn.Close()
		}
	case corev1.ProtocolUDP:
		conn, err := net.ListenUDP("udp", &net.UDPAddr{Port: port})
		if err != nil {
			return err
		}
		go func() {
			<-stop
			conn.Close()
		}()
		var buf [16]byte
		for {
			_, clientAddr, err := conn.ReadFrom(buf[:])
			if err != nil {
				return err
			}
			conn.WriteTo([]byte(payload), clientAddr)
		}
	default:
		return fmt.Errorf("unsupported protocol: %q", protocol)
	}
}

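// netConnect connects to ip:port with the given protocol and expects to
// receive expectPayload. The first return value reports a connectivity
// failure (the connection was blocked), the second any unexpected error.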
func netConnect(ip net.IP, port int, protocol corev1.Protocol, expectPayload string) (error, error) {
	addr := (&net.TCPAddr{IP: ip, Port: port}).String()
	payload := make([]byte, len(expectPayload))
	switch protocol {
	case corev1.ProtocolTCP:
		conn, err := net.DialTimeout("tcp", addr, 2*time.Second)
		if errors.Is(err, unix.EHOSTUNREACH) || errors.Is(err, unix.EACCES) {
			return err, nil
		} else if err != nil {
			return nil, err
		}
		defer conn.Close()
		_, err = conn.Read(payload)
		if err != nil {
			return nil, err
		}
	case corev1.ProtocolUDP:
		conn, err := net.Dial("udp", addr)
		if err != nil {
			return nil, err
		}
		defer conn.Close()
		err = conn.SetDeadline(time.Now().Add(2 * time.Second))
		if err != nil {
			return nil, err
		}
		_, err = conn.Write([]byte("hello"))
		if err != nil {
			return nil, err
		}
		_, err = conn.Read(payload)
		if errors.Is(err, unix.EHOSTUNREACH) || errors.Is(err, unix.EACCES) {
			return err, nil
		} else if err != nil {
			return nil, err
		}
	default:
		return nil, fmt.Errorf("unsupported protocol: %q", protocol)
	}
	if string(payload) != expectPayload {
		return nil, fmt.Errorf("wrong payload, expected %q, got %q", expectPayload, payload)
	}
	return nil, nil
}

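// fakePod simulates a Kubernetes pod as a separate network namespace,
// connected to the host namespace through a veth pair, with listeners on all
// server ports.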
type fakePod struct {
	name     string
	ips      []podIP
	netns    *os.File
	rawNetns syscall.RawConn
	stopCh   chan struct{}
}

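// podIP holds one pod address together with the address-family-specific
// constants needed to configure addresses and routes for it.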
type podIP struct {
	ip        net.IP
	gatewayIP net.IP
	zeroIP    net.IP
	fullMask  net.IPMask
	zeroMask  net.IPMask
}

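// createFakePod creates a new network namespace connected to the host through
// a veth pair, assigns the given pod IPs and routes, and starts listeners for
// all server ports and protocols inside the namespace.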
func createFakePod(namespace, name string, ips []net.IP) (*fakePod, error) {
	p := &fakePod{
		name:   fmt.Sprintf("%s/%s", namespace, name),
		ips:    make([]podIP, len(ips)),
		stopCh: make(chan struct{}),
	}
	for i, ip := range ips {
		p := &p.ips[i]
		switch {
		case ip.To4() != nil:
			p.ip = ip.To4()
			p.gatewayIP = gatewayIPv4
			p.zeroIP = net.ParseIP("0.0.0.0").To4()
			p.fullMask = net.CIDRMask(32, 32)
			p.zeroMask = net.CIDRMask(0, 32)
		case ip.To16() != nil:
			p.ip = ip.To16()
			p.gatewayIP = gatewayIPv6
			p.zeroIP = net.ParseIP("::")
			p.fullMask = net.CIDRMask(128, 128)
			p.zeroMask = net.CIDRMask(0, 128)
		default:
			return nil, fmt.Errorf("invalid IP: %v", ip)
		}
	}

	// Create new network namespace.
	runLockedThread(func() {
		err := unix.Unshare(unix.CLONE_NEWNET)
		if err != nil {
			panic(err)
		}
		// Obtain an FD of the new net namespace.
		p.netns, err = os.Open(fmt.Sprintf("/proc/%d/task/%d/ns/net", os.Getpid(), unix.Gettid()))
		if err != nil {
			panic(err)
		}
	})

	var err error
	p.rawNetns, err = p.netns.SyscallConn()
	if err != nil {
		panic(err)
	}

	// Create veth pair.
	linkAttrs := netlink.NewLinkAttrs()
	linkAttrs.Name = fmt.Sprintf("veth_%s_%s", namespace, name)
	if len(linkAttrs.Name) > 15 {
		hash := sha256.Sum256([]byte(linkAttrs.Name))
		linkAttrs.Name = fmt.Sprintf("veth_%x", hash[:5])
	}
	linkAttrs.Group = podIfaceGroup
	linkAttrs.Flags = net.FlagUp
	p.rawNetns.Control(func(fd uintptr) {
		err = netlink.LinkAdd(&netlink.Veth{LinkAttrs: linkAttrs, PeerName: "veth", PeerNamespace: netlink.NsFd(fd)})
	})
	if err != nil {
		return nil, err
	}

	veth, err := netlink.LinkByName(linkAttrs.Name)
	if err != nil {
		return nil, err
	}

	for _, ip := range p.ips {
		// Add gateway address to link.
		addr := &netlink.Addr{IPNet: &net.IPNet{IP: ip.gatewayIP, Mask: ip.fullMask}, Flags: unix.IFA_F_NODAD}
		err = netlink.AddrAdd(veth, addr)
		if err != nil {
			return nil, err
		}

		// Add route.
		err = netlink.RouteAdd(&netlink.Route{
			LinkIndex: veth.Attrs().Index,
			Scope:     netlink.SCOPE_HOST,
			Dst:       &net.IPNet{IP: ip.ip, Mask: ip.fullMask},
		})
		if err != nil {
			return nil, err
		}
	}

	p.runInNetns(func() {
		// Enable loopback traffic in the pod.
		loopback, err := netlink.LinkByName("lo")
		if err != nil {
			panic(err)
		}
		err = netlink.LinkSetUp(loopback)
		if err != nil {
			panic(err)
		}

		// Set up the veth in the pod namespace.
		veth, err := netlink.LinkByName("veth")
		if err != nil {
			panic(err)
		}
		err = netlink.LinkSetUp(veth)
		if err != nil {
			panic(err)
		}

		for _, ip := range p.ips {
			// IFA_F_NODAD disables duplicate address detection for IPv6.
			addr := &netlink.Addr{IPNet: &net.IPNet{IP: ip.ip, Mask: ip.fullMask}, Flags: unix.IFA_F_NODAD}
			err = netlink.AddrAdd(veth, addr)
			if err != nil {
				panic(err)
			}

			err = netlink.RouteAdd(&netlink.Route{
				LinkIndex: veth.Attrs().Index,
				Scope:     netlink.SCOPE_LINK,
				Dst:       &net.IPNet{IP: ip.gatewayIP, Mask: ip.fullMask},
				Src:       ip.ip,
			})
			if err != nil {
				panic(err)
			}
			err = netlink.RouteAdd(&netlink.Route{
				LinkIndex: veth.Attrs().Index,
				Scope:     netlink.SCOPE_UNIVERSE,
				Dst:       &net.IPNet{IP: ip.zeroIP, Mask: ip.zeroMask},
				Gw:        ip.gatewayIP,
			})
			if err != nil {
				panic(err)
			}
		}
	})

	for _, protocol := range serverProtocols {
		for _, port := range serverPorts {
			go p.runInNetns(func() {
				err := netListen(port, protocol, p.name, p.stopCh)
				select {
				case <-p.stopCh:
				default:
					panic(err)
				}
			})
		}
	}

	return p, nil
}

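// stop closes the pod's listeners and its network namespace.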
func (p *fakePod) stop() {
	close(p.stopCh)
	p.netns.Close()
}

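// runInNetns runs f on a locked OS thread that has been switched into the
// pod's network namespace.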
func (p *fakePod) runInNetns(f func()) {
	runLockedThread(func() {
		p.rawNetns.Control(func(fd uintptr) {
			err := unix.Setns(int(fd), unix.CLONE_NEWNET)
			if err != nil {
				panic(err)
			}
		})
		f()
	})
}

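// connect dials the target pod on the given port and protocol from within
// this pod's network namespace. It returns a connectivity error and an
// unexpected error, as in netConnect.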
func (p *fakePod) connect(target *fakePod, port int, protocol corev1.Protocol) (error, error) {
	var errConnectivity, err error
	p.runInNetns(func() {
		errConnectivity, err = netConnect(target.ips[testIPIndex].ip, port, protocol, target.name)
	})
	return errConnectivity, err
}

// runLockedThread runs f locked on an OS thread, which allows f to switch to a
// different network namespace. After f returns, the previous network namespace
// is restored, and if that succeeds, the thread is unlocked, allowing the
// thread to be reused for other goroutines.
func runLockedThread(f func()) {
	runtime.LockOSThread()
	oldNs, err := os.Open(fmt.Sprintf("/proc/%d/task/%d/ns/net", os.Getpid(), unix.Gettid()))
	if err != nil {
		panic(err)
	}
	defer oldNs.Close()
	f()
	err = unix.Setns(int(oldNs.Fd()), unix.CLONE_NEWNET)
	if err == nil {
		runtime.UnlockOSThread()
	}
}

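// fakeKubernetes wraps the Cyclonus mock Kubernetes API. Every mutation is
// mirrored into the nftables controller (if attached) and followed by a
// Flush, and pod creation/deletion also manages the corresponding fake pods.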
type fakeKubernetes struct {
	*kube.MockKubernetes
	nft       *nftctrl.Controller
	fakePods  map[string]*fakePod
	nextPodIP uint64
}

func (k *fakeKubernetes) CreateNamespace(kubeNamespace *corev1.Namespace) (*corev1.Namespace, error) {
	kubeNamespace, err := k.MockKubernetes.CreateNamespace(kubeNamespace)
	if err == nil && k.nft != nil {
		k.nft.SetNamespace(kubeNamespace.Name, kubeNamespace)
		if err := k.nft.Flush(); err != nil {
			return nil, err
		}
	}
	return kubeNamespace, err
}

func (k *fakeKubernetes) SetNamespaceLabels(namespace string, labels map[string]string) (*corev1.Namespace, error) {
	kubeNamespace, err := k.MockKubernetes.SetNamespaceLabels(namespace, labels)
	if err == nil && k.nft != nil {
		k.nft.SetNamespace(namespace, kubeNamespace)
		if err := k.nft.Flush(); err != nil {
			return nil, err
		}
	}
	return kubeNamespace, err
}

func (k *fakeKubernetes) DeleteNamespace(namespace string) error {
	err := k.MockKubernetes.DeleteNamespace(namespace)
	if err == nil && k.nft != nil {
		k.nft.SetNamespace(namespace, nil)
		if err := k.nft.Flush(); err != nil {
			return err
		}
	}
	return err
}

func (k *fakeKubernetes) CreateNetworkPolicy(kubePolicy *networkingv1.NetworkPolicy) (*networkingv1.NetworkPolicy, error) {
	kubePolicy, err := k.MockKubernetes.CreateNetworkPolicy(kubePolicy)
	if err == nil && k.nft != nil {
		k.nft.SetNetworkPolicy(cache.MetaObjectToName(kubePolicy), kubePolicy)
		if err := k.nft.Flush(); err != nil {
			return nil, err
		}
	}
	return kubePolicy, err
}

func (k *fakeKubernetes) UpdateNetworkPolicy(kubePolicy *networkingv1.NetworkPolicy) (*networkingv1.NetworkPolicy, error) {
	kubePolicy, err := k.MockKubernetes.UpdateNetworkPolicy(kubePolicy)
	if err == nil && k.nft != nil {
		k.nft.SetNetworkPolicy(cache.MetaObjectToName(kubePolicy), kubePolicy)
		if err := k.nft.Flush(); err != nil {
			return nil, err
		}
	}
	return kubePolicy, err
}

func (k *fakeKubernetes) DeleteNetworkPolicy(namespace string, name string) error {
	err := k.MockKubernetes.DeleteNetworkPolicy(namespace, name)
	if err == nil && k.nft != nil {
		k.nft.SetNetworkPolicy(cache.NewObjectName(namespace, name), nil)
		if err := k.nft.Flush(); err != nil {
			return err
		}
	}
	return err
}

func (k *fakeKubernetes) DeleteAllNetworkPoliciesInNamespace(namespace string) error {
	policies, err := k.GetNetworkPoliciesInNamespace(namespace)
	if err == nil && k.nft != nil {
		for _, kubePolicy := range policies {
			k.nft.SetNetworkPolicy(cache.MetaObjectToName(&kubePolicy), nil)
			if err := k.nft.Flush(); err != nil {
				return err
			}
		}
	}
	return k.MockKubernetes.DeleteAllNetworkPoliciesInNamespace(namespace)
}

func (k *fakeKubernetes) CreatePod(kubePod *corev1.Pod) (*corev1.Pod, error) {
	kubePod, err := k.MockKubernetes.CreatePod(kubePod)
	objectName := cache.MetaObjectToName(kubePod)
	if err == nil {
		ipSuffix := []byte{uint8(k.nextPodIP >> 8), uint8(k.nextPodIP)}
		podIPv4 := slices.Concat(gatewayIPv4[:2], ipSuffix)
		podIPv6 := slices.Concat(gatewayIPv6[:14], ipSuffix)
		k.nextPodIP++
		kubePod.Status.PodIPs = []corev1.PodIP{{IP: podIPv4.String()}, {IP: podIPv6.String()}}
		kubePod.Status.PodIP = kubePod.Status.PodIPs[testIPIndex].IP
		ips := []net.IP{podIPv4, podIPv6}
		fakePod, err := createFakePod(kubePod.Namespace, kubePod.Name, ips)
		if err != nil {
			return nil, fmt.Errorf("failed to create fake pod: %w", err)
		}
		k.fakePods[objectName.String()] = fakePod
	}
	if err == nil && k.nft != nil {
		k.nft.SetPod(objectName, kubePod)
		if err := k.nft.Flush(); err != nil {
			return nil, err
		}
	}
	return kubePod, err
}

func (k *fakeKubernetes) DeletePod(namespace string, pod string) error {
	err := k.MockKubernetes.DeletePod(namespace, pod)
	objectName := cache.NewObjectName(namespace, pod)
	if err == nil {
		k.fakePods[objectName.String()].stop()
		delete(k.fakePods, objectName.String())
	}
	if err == nil && k.nft != nil {
		k.nft.SetPod(objectName, nil)
		if err := k.nft.Flush(); err != nil {
			return err
		}
	}
	return err
}

func (k *fakeKubernetes) SetPodLabels(namespace string, pod string, labels map[string]string) (*corev1.Pod, error) {
	kubePod, err := k.MockKubernetes.SetPodLabels(namespace, pod, labels)
	if err == nil && k.nft != nil {
		k.nft.SetPod(cache.MetaObjectToName(kubePod), kubePod)
		if err := k.nft.Flush(); err != nil {
			return nil, err
		}
	}
	return kubePod, err
}

var protocolMap = map[string]corev1.Protocol{
	"tcp":  corev1.ProtocolTCP,
	"udp":  corev1.ProtocolUDP,
	"sctp": corev1.ProtocolSCTP,
}

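// ExecuteRemoteCommand emulates the agnhost connect command which Cyclonus
// runs in pods: it parses the target service, port and protocol from the
// command line and performs the connection between the corresponding fake
// pods.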
func (k *fakeKubernetes) ExecuteRemoteCommand(namespace string, pod string, container string, command []string) (string, string, error, error) {
	// command is expected to have this format:
	// /agnhost connect s-x-a.x.svc.cluster.local:80 --timeout=1s --protocol=tcp
	if len(command) != 5 {
		return "", "", nil, fmt.Errorf("unexpected command length: %v", command)
	}
	targetService, targetPortStr, ok := strings.Cut(command[2], ".svc.cluster.local:")
	if !ok {
		return "", "", nil, fmt.Errorf("failed to parse target: %q", command[2])
	}
	targetService, targetNamespace, ok := strings.Cut(targetService, ".")
	if !ok {
		return "", "", nil, fmt.Errorf("failed to parse target: %q", command[2])
	}
	targetPod := strings.TrimPrefix(targetService, fmt.Sprintf("s-%s-", targetNamespace))
	protocol := strings.TrimPrefix(command[4], "--protocol=")

	sourceFakePod := k.fakePods[cache.NewObjectName(namespace, pod).String()]
	targetFakePod := k.fakePods[cache.NewObjectName(targetNamespace, targetPod).String()]

	targetPort, err := strconv.Atoi(targetPortStr)
	if err != nil {
		return "", "", nil, err
	}
	kubeProtocol, ok := protocolMap[protocol]
	if !ok {
		return "", "", nil, fmt.Errorf("invalid protocol: %q", protocol)
	}

	connectErr, err := sourceFakePod.connect(targetFakePod, targetPort, kubeProtocol)
	return "", "", connectErr, err
}

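// initializeNft feeds the current state of the fake Kubernetes API (namespaces
// and pods; no policies are expected to exist yet) into the attached nftables
// controller and performs the initial flush.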
func (k *fakeKubernetes) initializeNft() error {
	namespaces, err := k.GetAllNamespaces()
	if err != nil {
		return err
	}
	for _, kubeNamespace := range namespaces.Items {
		k.nft.SetNamespace(kubeNamespace.Name, &kubeNamespace)
		pods, err := k.GetPodsInNamespace(kubeNamespace.Name)
		if err != nil {
			return err
		}
		for _, kubePod := range pods {
			k.nft.SetPod(cache.MetaObjectToName(&kubePod), &kubePod)
		}
		policies, err := k.GetNetworkPoliciesInNamespace(kubeNamespace.Name)
		if err != nil {
			return err
		}
		if len(policies) != 0 {
			return fmt.Errorf("expected no initial policies, but found %s", cache.MetaObjectToName(&policies[0]).String())
		}
	}
	if err := k.nft.Flush(); err != nil {
		return fmt.Errorf("initial flush failed: %w", err)
	}
	return nil
}

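// testRecorder implements the event recorder interface expected by nftctrl.
// Warning events fail the test; all other events are logged.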
type testRecorder struct {
	t *testing.T
}

func (t *testRecorder) Event(object kruntime.Object, eventtype, reason, message string) {
	if eventtype == corev1.EventTypeWarning {
		t.t.Errorf("Warning event: object %v, %s: %s", object, reason, message)
	} else {
		t.t.Logf("%s event: object %v, %s: %s", eventtype, object, reason, message)
	}
}

func (t *testRecorder) Eventf(object kruntime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
	t.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...))
}

func (t *testRecorder) AnnotatedEventf(object kruntime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
	t.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...))
}

const maxNamespaceName = "ns-3456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012"
const maxPodName = "pod-456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012"
const maxPolicyName = "policy-789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012"

func setPolicyName(name string) generator.Setter {
	return func(policy *generator.Netpol) {
		policy.Name = name
	}
}

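// extraTestCases returns additional test cases which are not covered by the
// generated Cyclonus test cases: namespace label updates affecting peer
// selectors, deleting one of multiple policies, and maximum-length names.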
func extraTestCases() []*generator.TestCase {
	return []*generator.TestCase{
		{
			Description: "Update namespace so that additional peer selector matches",
			Tags:        generator.NewStringSet(generator.TagSetNamespaceLabels),
			Steps: []*generator.TestStep{
				generator.NewTestStep(generator.ProbeAllAvailable,
					generator.CreatePolicy(generator.BuildPolicy(generator.SetPeers(true, []networkingv1.NetworkPolicyPeer{
						{NamespaceSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"ns": "y"}}},
						{NamespaceSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"new-ns": "qrs"}}},
					})).NetworkPolicy())),
				generator.NewTestStep(generator.ProbeAllAvailable,
					generator.SetNamespaceLabels("y", map[string]string{"ns": "y", "new-ns": "qrs"})),
				generator.NewTestStep(generator.ProbeAllAvailable,
					generator.SetNamespaceLabels("y", map[string]string{"ns": "y"})),
			},
		},
		{
			Description: "Delete one of multiple policies",
			Tags:        generator.NewStringSet(generator.TagDeletePolicy),
			Steps: []*generator.TestStep{
				generator.NewTestStep(generator.ProbeAllAvailable,
					generator.CreatePolicy(generator.BuildPolicy(setPolicyName("policy1")).NetworkPolicy()),
					generator.CreatePolicy(generator.BuildPolicy(setPolicyName("policy2"), generator.SetPorts(true, nil)).NetworkPolicy()),
					generator.DeletePolicy("x", "policy1"),
				),
			},
		},
		{
			Description: "Create namespace, pod and policy with maximum name length",
			Tags:        generator.NewStringSet(generator.TagCreateNamespace),
			Steps: []*generator.TestStep{
				generator.NewTestStep(generator.ProbeAllAvailable,
					generator.CreateNamespace(maxNamespaceName, nil),
					generator.CreatePod(maxNamespaceName, maxPodName, nil),
					generator.CreatePolicy(generator.BuildPolicy(
						generator.SetNamespace(maxNamespaceName),
						setPolicyName(maxPolicyName),
						generator.SetPodSelector(metav1.LabelSelector{})).NetworkPolicy())),
				generator.NewTestStep(generator.ProbeAllAvailable,
					generator.DeleteNamespace(maxNamespaceName)),
			},
		},
	}
}

type initTestCase struct {
	description string
	init        func(*connectivity.TestCaseState, *nftctrl.Controller) error
}

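// initializationTestCases returns test cases which exercise different orders
// of feeding the initial cluster state into a fresh nftables controller, to
// verify that updates arriving during initialization are handled correctly.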
func initializationTestCases() []initTestCase {
	return []initTestCase{
		{
			description: "Delete rule during initialization",
			// It is possible that we already receive updates/deletes during
			// initialization, if one of the resource types has not yet finished
			// loading. This means that we need to be able to handle such updates
			// before the first Flush.
			init: func(t *connectivity.TestCaseState, nft *nftctrl.Controller) error {
				namespaces, err := t.Kubernetes.GetAllNamespaces()
				if err != nil {
					return err
				}
				for _, kubeNamespace := range namespaces.Items {
					nft.SetNamespace(kubeNamespace.Name, &kubeNamespace)
					pods, err := t.Kubernetes.GetPodsInNamespace(kubeNamespace.Name)
					if err != nil {
						return err
					}
					for _, kubePod := range pods {
						nft.SetPod(cache.MetaObjectToName(&kubePod), &kubePod)
					}
				}

				policy1 := generator.BuildPolicy().NetworkPolicy()
				policy1.Name = "policy1"
				nft.SetNetworkPolicy(cache.MetaObjectToName(policy1), policy1)

				policy2 := generator.BuildPolicy(generator.SetPorts(true, nil)).NetworkPolicy()
				policy2.Name = "policy2"
				if err := t.CreatePolicy(policy2); err != nil {
					return fmt.Errorf("failed to create policy: %w", err)
				}
				nft.SetNetworkPolicy(cache.MetaObjectToName(policy2), policy2)

				// Delete policy 1.
				nft.SetNetworkPolicy(cache.MetaObjectToName(policy1), nil)

				return nil
			},
		},
		{
			description: "Initialize namespaces last",
			init: func(t *connectivity.TestCaseState, nft *nftctrl.Controller) error {
				policy := generator.BuildPolicy().NetworkPolicy()
				policy.Spec.Ingress[0].From[0].NamespaceSelector = nil
				if err := t.CreatePolicy(policy); err != nil {
					return fmt.Errorf("failed to create policy: %w", err)
				}

				namespaces, err := t.Kubernetes.GetAllNamespaces()
				if err != nil {
					return err
				}
				for _, kubeNamespace := range namespaces.Items {
					pods, err := t.Kubernetes.GetPodsInNamespace(kubeNamespace.Name)
					if err != nil {
						return err
					}
					for _, kubePod := range pods {
						nft.SetPod(cache.MetaObjectToName(&kubePod), &kubePod)
					}
					policies, err := t.Kubernetes.GetNetworkPoliciesInNamespace(kubeNamespace.Name)
					if err != nil {
						return err
					}
					for _, policy := range policies {
						nft.SetNetworkPolicy(cache.MetaObjectToName(&policy), &policy)
					}
				}
				for _, kubeNamespace := range namespaces.Items {
					nft.SetNamespace(kubeNamespace.Name, &kubeNamespace)
				}
				return nil
			},
		},
	}
}

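// TestCyclonus runs the generated Cyclonus test cases plus the extra and
// initialization test cases against the real nftables controller, using fake
// pods in network namespaces.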
func TestCyclonus(t *testing.T) {
	if os.Getenv("IN_KTEST") != "true" {
		t.Skip("Not in ktest")
	}

	if err := os.WriteFile("/proc/sys/net/ipv4/ip_forward", []byte("1\n"), 0644); err != nil {
		t.Fatalf("Failed to enable IPv4 forwarding: %v", err)
	}
	if err := os.WriteFile("/proc/sys/net/ipv6/conf/all/forwarding", []byte("1\n"), 0644); err != nil {
		t.Fatalf("Failed to enable IPv6 forwarding: %v", err)
	}
	// By default, Linux rate limits ICMP replies, which slows down our tests.
	if err := os.WriteFile("/proc/sys/net/ipv4/icmp_ratemask", []byte("0\n"), 0644); err != nil {
		t.Fatalf("Failed to disable ICMPv4 rate limiting: %v", err)
	}
	if err := os.WriteFile("/proc/sys/net/ipv6/icmp/ratemask", []byte("\n"), 0644); err != nil {
		t.Fatalf("Failed to disable ICMPv6 rate limiting: %v", err)
	}
	// Disable IPv6 duplicate address detection, which delays IPv6 connectivity
	// becoming available.
	if err := os.WriteFile("/proc/sys/net/ipv6/conf/default/accept_dad", []byte("0\n"), 0644); err != nil {
		t.Fatalf("Failed to disable IPv6 DAD: %v", err)
	}

	kubernetes := &fakeKubernetes{
		MockKubernetes: kube.NewMockKubernetes(1.0),
		fakePods:       make(map[string]*fakePod),
		nextPodIP:      2,
	}

	allowDNS := false // This would create policies which allow port 53; we don't need that.
	serverNamespaces := []string{"x", "y", "z"}
	serverPods := []string{"a", "b", "c"}
	externalIPs := []string{}
	podCreationTimeoutSeconds := 1
	batchJobs := false
	imageRegistry := "registry.k8s.io"

	resources, err := probe.NewDefaultResources(kubernetes, serverNamespaces, serverPods, serverPorts, serverProtocols, externalIPs, podCreationTimeoutSeconds, batchJobs, imageRegistry)
	if err != nil {
		t.Fatal(err)
	}

	interpreterConfig := &connectivity.InterpreterConfig{
		ResetClusterBeforeTestCase:       false,
		KubeProbeRetries:                 1,
		PerturbationWaitSeconds:          0,
		VerifyClusterStateBeforeTestCase: true,
		BatchJobs:                        batchJobs,
		IgnoreLoopback:                   ignoreLoopback,
		JobTimeoutSeconds:                1,
		FailFast:                         false,
	}
	interpreter := connectivity.NewInterpreter(kubernetes, resources, interpreterConfig)
	printer := &connectivity.Printer{
		Noisy:            false,
		IgnoreLoopback:   ignoreLoopback,
		JunitResultsFile: "",
	}

	zcPod, err := resources.GetPod("z", "c")
	if err != nil {
		t.Fatal(err)
	}
	testCaseGenerator := generator.NewTestCaseGenerator(allowDNS, zcPod.IP, serverNamespaces, nil, nil)
	testCases := slices.Concat(testCaseGenerator.GenerateTestCases(), extraTestCases())

	for _, testCase := range testCases {
		for _, step := range testCase.Steps {
			step.Probe.Mode = generator.ProbeModeServiceName
		}
		t.Run(testCase.Description, func(t *testing.T) {
			recorder := &testRecorder{t: t}
			nft, err := nftctrl.New(recorder, podIfaceGroup)
			if err != nil {
				t.Errorf("Failed to create nftctrl: %v", err)
				return
			}
			defer nft.Close()
			kubernetes.nft = nft

			if err := kubernetes.initializeNft(); err != nil {
				t.Errorf("nftctrl initialization failed: %v", err)
				return
			}

			result := interpreter.ExecuteTestCase(testCase)
			if result.Err != nil {
				t.Error(result.Err)
				return
			}
			if !result.Passed(ignoreLoopback) {
				printer.PrintTestCaseResult(result)

				cmd := exec.Command("nft", "list", "ruleset")
				cmd.Stdout = os.Stdout
				cmd.Stderr = os.Stderr
				cmd.Run()

				t.Error("connectivity test failed")
			}

			kubernetes.nft = nil
			testCaseState := &connectivity.TestCaseState{
				Kubernetes: kubernetes,
				Resources:  resources,
			}
			err = testCaseState.ResetClusterState()
			if err != nil {
				t.Errorf("failed to reset cluster: %v", err)
			}

			// Flush the conntrack table. Otherwise, UDP connectivity tests can
			// spuriously succeed when they should be blocked, because they match an
			// entry in the conntrack table from a previous test.
			err = netlink.ConntrackTableFlush(netlink.ConntrackTable)
			if err != nil {
				t.Errorf("failed to flush conntrack table: %v", err)
			}
		})
	}

	initTestCases := initializationTestCases()
	for _, testCase := range initTestCases {
		t.Run(testCase.description, func(t *testing.T) {
			recorder := &testRecorder{t: t}
			nft, err := nftctrl.New(recorder, podIfaceGroup)
			if err != nil {
				t.Errorf("Failed to create nftctrl: %v", err)
				return
			}
			defer nft.Close()

			testCaseState := &connectivity.TestCaseState{
				Kubernetes: kubernetes,
				Resources:  resources,
			}

			if err := testCase.init(testCaseState, nft); err != nil {
				t.Errorf("initialization failed: %v", err)
				return
			}

			if err := nft.Flush(); err != nil {
				t.Errorf("flush failed: %v", err)
				return
			}

			parsedPolicy := matcher.BuildNetworkPolicies(true, testCaseState.Policies)
			jobBuilder := &probe.JobBuilder{TimeoutSeconds: 1}
			simRunner := probe.NewSimulatedRunner(parsedPolicy, jobBuilder)
			probeConfig := generator.NewAllAvailable(generator.ProbeModeServiceName)
			stepResult := connectivity.NewStepResult(
				simRunner.RunProbeForConfig(probeConfig, testCaseState.Resources),
				parsedPolicy,
				slices.Clone(testCaseState.Policies))
			kubeRunner := probe.NewKubeRunner(kubernetes, 15, jobBuilder)
			for i := 0; i <= interpreterConfig.KubeProbeRetries; i++ {
				stepResult.AddKubeProbe(kubeRunner.RunProbeForConfig(probeConfig, testCaseState.Resources))
				if stepResult.Passed(ignoreLoopback) {
					break
				}
			}

			if !stepResult.Passed(ignoreLoopback) {
				printer.PrintStep(0, &generator.TestStep{Probe: probeConfig}, stepResult)

				cmd := exec.Command("nft", "list", "ruleset")
				cmd.Stdout = os.Stdout
				cmd.Stderr = os.Stderr
				cmd.Run()

				t.Error("connectivity test failed")
			}

			err = testCaseState.ResetClusterState()
			if err != nil {
				t.Errorf("failed to reset cluster: %v", err)
			}

			// Flush the conntrack table. Otherwise, UDP connectivity tests can
			// spuriously succeed when they should be blocked, because they match an
			// entry in the conntrack table from a previous test.
			err = netlink.ConntrackTableFlush(netlink.ConntrackTable)
			if err != nil {
				t.Errorf("failed to flush conntrack table: %v", err)
			}
		})
	}
}