m/n/kubernetes/networkpolicy: add Cyclonus test suite
This adds a test for the network policy controller, based on the
Cyclonus test suite. Running Cyclonus on a real cluster takes multiple
hours, as there are over 200 test cases, each of which takes around 1
minute. The test implemented here uses a fake Kubernetes API and pods,
which allows running all tests in around 15 seconds.
IPv6 is partially implemented but disabled. The tests pass, but each
test takes around 2 seconds, because some ICMPv6 replies for blocked TCP
connections seem to get lost somewhere and are only processed when the
TCP SYN is resent one second later.
Change-Id: Id77f2dd4d884b6d156e238e07e88c222e3bbe9a2
Reviewed-on: https://review.monogon.dev/c/monogon/+/3905
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
Tested-by: Jenkins CI
diff --git a/build/bazel/go.MODULE.bazel b/build/bazel/go.MODULE.bazel
index a723144..5afba0f 100644
--- a/build/bazel/go.MODULE.bazel
+++ b/build/bazel/go.MODULE.bazel
@@ -41,6 +41,7 @@
"com_github_kballard_go_shellquote",
"com_github_klauspost_compress",
"com_github_lib_pq",
+ "com_github_mattfenwick_cyclonus",
"com_github_mattn_go_shellwords",
"com_github_mdlayher_arp",
"com_github_mdlayher_ethernet",
diff --git a/go.mod b/go.mod
index b02c7b3..aba53ba 100644
--- a/go.mod
+++ b/go.mod
@@ -90,6 +90,7 @@
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51
github.com/klauspost/compress v1.17.11
github.com/lib/pq v1.10.9
+ github.com/mattfenwick/cyclonus v0.5.6
github.com/mattn/go-shellwords v1.0.12
github.com/mdlayher/arp v0.0.0-20220512170110-6706a2966875
github.com/mdlayher/ethernet v0.0.0-20220221185849-529eae5b6118
@@ -253,7 +254,9 @@
github.com/go-openapi/jsonpointer v0.21.0 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.23.0 // indirect
+ github.com/go-resty/resty/v2 v2.7.0 // indirect
github.com/go-sql-driver/mysql v1.7.1 // indirect
+ github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/gofrs/flock v0.8.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
@@ -265,6 +268,7 @@
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/go-dap v0.12.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
+ github.com/google/pprof v0.0.0-20241029153458-d1b30febd7db // indirect
github.com/google/s2a-go v0.1.8 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/google/subcommands v1.0.2-0.20190508160503-636abe8753b8 // indirect
@@ -298,6 +302,7 @@
github.com/jpillora/backoff v1.0.0 // indirect
github.com/jsimonetti/rtnetlink v1.4.1 // indirect
github.com/json-iterator/go v1.1.12 // indirect
+ github.com/jstemmer/go-junit-report v0.9.1 // indirect
github.com/karrick/godirwalk v1.17.0 // indirect
github.com/kr/fs v0.1.0 // indirect
github.com/kr/pty v1.1.8 // indirect
@@ -308,6 +313,7 @@
github.com/lucasb-eyer/go-colorful v1.2.0 // indirect
github.com/lufia/iostat v1.2.1 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
+ github.com/mattfenwick/collections v0.3.2 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-runewidth v0.0.15 // indirect
@@ -338,6 +344,9 @@
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
+ github.com/olekukonko/tablewriter v0.0.5 // indirect
+ github.com/onsi/ginkgo/v2 v2.22.0 // indirect
+ github.com/onsi/gomega v1.36.0 // indirect
github.com/opencontainers/image-spec v1.1.0 // indirect
github.com/opencontainers/runtime-spec v1.2.0 // indirect
github.com/opencontainers/runtime-tools v0.9.1-0.20221107090550-2e043c6bd626 // indirect
diff --git a/go.sum b/go.sum
index b113d29..b41631a 100644
--- a/go.sum
+++ b/go.sum
@@ -2150,6 +2150,8 @@
github.com/go-quicktest/qt v1.101.0/go.mod h1:14Bz/f7NwaXPtdYEgzsx46kqSxVwTbzVZsDC26tQJow=
github.com/go-redis/redis v6.15.8+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
+github.com/go-resty/resty/v2 v2.7.0 h1:me+K9p3uhSmXtrBZ4k9jcEAfJmuC8IivWHwaLZwPrFY=
+github.com/go-resty/resty/v2 v2.7.0/go.mod h1:9PWDzw47qPphMRFfhsyk0NnSgvluHcljSMVIq3w7q0I=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
@@ -2158,7 +2160,6 @@
github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
-github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI=
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls=
github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
@@ -2690,6 +2691,7 @@
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
+github.com/jstemmer/go-junit-report v0.9.1 h1:6QPYqodiu3GuPL+7mfx+NwDdp2eTkp9IfEUpgAwUN0o=
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/juju/ratelimit v1.0.1/go.mod h1:qapgC/Gy+xNh9UxzV13HGGl/6UXNN+ct+vwSgWNm/qk=
@@ -2796,6 +2798,10 @@
github.com/markbates/pkger v0.15.1/go.mod h1:0JoVlrol20BSywW79rN3kdFFsE5xYM+rSCQDXbLhiuI=
github.com/markbates/safe v1.0.1/go.mod h1:nAqgmRi7cY2nqMc92/bSEeQA+R4OheNU2T1kNSCBdG0=
github.com/marstr/guid v1.1.0/go.mod h1:74gB1z2wpxxInTG6yaqA7KrtM0NZ+RbrcqDvYHefzho=
+github.com/mattfenwick/collections v0.3.2 h1:sdEBe7/cL+dK7CZ0SXod07qzCpoGJu2xtOywAF155Us=
+github.com/mattfenwick/collections v0.3.2/go.mod h1:S2z3MxcMsoBiP8Zw4ewmFvQcfq7QfesA7V192udZ0W8=
+github.com/mattfenwick/cyclonus v0.5.6 h1:8bZ+v2Kcn4H7Z+OhX1l3BQrAHqUdmBwdBBiDIk3Wj7s=
+github.com/mattfenwick/cyclonus v0.5.6/go.mod h1:xGvx12xAzlAOEfqih/19zqOquWD477qXyGQvQz6f3MU=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ=
github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
@@ -3005,6 +3011,7 @@
github.com/olekukonko/tablewriter v0.0.1/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
github.com/olekukonko/tablewriter v0.0.2/go.mod h1:rSAaSIOAGT9odnlyGlUfAJaoc5w2fSBUmeGDbRWPxyQ=
github.com/olekukonko/tablewriter v0.0.4/go.mod h1:zq6QwlOf5SlnkVbMSr5EoBv3636FWnp+qbPhuoO21uA=
+github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=
github.com/onsi/ginkgo v0.0.0-20151202141238-7f8ab55aaf3b/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
@@ -3015,7 +3022,6 @@
github.com/onsi/ginkgo v1.12.0/go.mod h1:oUhWkIvk5aDxtKvDDuw8gItl8pKl42LzjC9KZE0HfGg=
github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
github.com/onsi/ginkgo v1.13.0/go.mod h1:+REjRxOmWfHCjfv9TTWB1jD1Frx4XydAD3zm1lskyM0=
-github.com/onsi/ginkgo v1.16.4 h1:29JGrr5oVBm5ulCWet69zQkzWipVXIol6ygQUe/EzNc=
github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0=
github.com/onsi/ginkgo/v2 v2.1.3/go.mod h1:vw5CSIxN1JObi/U8gcbwft7ZxR2dgaR70JSE3/PpL4c=
github.com/onsi/ginkgo/v2 v2.1.4/go.mod h1:um6tUpWM/cxCK3/FK8BXqEiUMUwRgSM4JXG47RKZmLU=
@@ -3955,6 +3961,7 @@
golang.org/x/net v0.0.0-20210825183410-e898025ed96a/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210928044308-7d9f5e0b762b/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
+golang.org/x/net v0.0.0-20211029224645-99673261e6eb/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211111083644-e5c967477495/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211123203042-d83791d6bcd9/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
diff --git a/metropolis/node/kubernetes/networkpolicy/BUILD.bazel b/metropolis/node/kubernetes/networkpolicy/BUILD.bazel
index d3d3b76..287427e 100644
--- a/metropolis/node/kubernetes/networkpolicy/BUILD.bazel
+++ b/metropolis/node/kubernetes/networkpolicy/BUILD.bazel
@@ -1,4 +1,5 @@
-load("@io_bazel_rules_go//go:def.bzl", "go_library")
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+load("//osbase/test/ktest:ktest.bzl", "k_test")
go_library(
name = "networkpolicy",
@@ -21,3 +22,28 @@
"@org_dolansoft_git_dolansoft_k8s_nft_npc//nftctrl",
],
)
+
+go_test(
+ name = "networkpolicy_test",
+ srcs = ["networkpolicy_test.go"],
+ deps = [
+ "@com_github_mattfenwick_cyclonus//pkg/connectivity",
+ "@com_github_mattfenwick_cyclonus//pkg/connectivity/probe",
+ "@com_github_mattfenwick_cyclonus//pkg/generator",
+ "@com_github_mattfenwick_cyclonus//pkg/kube",
+ "@com_github_mattfenwick_cyclonus//pkg/matcher",
+ "@com_github_vishvananda_netlink//:netlink",
+ "@io_k8s_api//core/v1:core",
+ "@io_k8s_api//networking/v1:networking",
+ "@io_k8s_apimachinery//pkg/apis/meta/v1:meta",
+ "@io_k8s_apimachinery//pkg/runtime",
+ "@io_k8s_client_go//tools/cache",
+ "@org_dolansoft_git_dolansoft_k8s_nft_npc//nftctrl",
+ "@org_golang_x_sys//unix",
+ ],
+)
+
+k_test(
+ name = "ktest",
+ tester = ":networkpolicy_test",
+)
diff --git a/metropolis/node/kubernetes/networkpolicy/networkpolicy_test.go b/metropolis/node/kubernetes/networkpolicy/networkpolicy_test.go
new file mode 100644
index 0000000..22cf569
--- /dev/null
+++ b/metropolis/node/kubernetes/networkpolicy/networkpolicy_test.go
@@ -0,0 +1,927 @@
+// Copyright The Monogon Project Authors.
+// SPDX-License-Identifier: Apache-2.0
+
+// This package tests the network policy controller, using the Cyclonus network
+// policy conformance test suite. It uses the real policy controller and
+// nftables controller, but fake Kubernetes API and pods, which allows the tests
+// to run much faster than would be possible with the real API server and pods.
+//
+// By default, the test runs in a ktest, which means we are testing with the
+// same kernel version, and thus nftables implementation, which is used in
+// Monogon OS. But you can also run tests manually in a new user and network
+// namespace, which allows you to use all the debugging tools available on your
+// machine. Useful commands:
+//
+// bazel build //metropolis/node/kubernetes/networkpolicy:networkpolicy_test
+// Build the test.
+// unshare --map-user=0 --net
+// Create a user and network namespace.
+// tcpdump -i any &
+// Capture traffic in the background.
+// IN_KTEST=true bazel-bin/metropolis/node/kubernetes/networkpolicy/networkpolicy_test_/networkpolicy_test
+// Run the test.
+package networkpolicy_test
+
+import (
+ "crypto/sha256"
+ "errors"
+ "fmt"
+ "net"
+ "os"
+ "os/exec"
+ "runtime"
+ "slices"
+ "strconv"
+ "strings"
+ "syscall"
+ "testing"
+ "time"
+
+ "git.dolansoft.org/dolansoft/k8s-nft-npc/nftctrl"
+ "github.com/mattfenwick/cyclonus/pkg/connectivity"
+ "github.com/mattfenwick/cyclonus/pkg/connectivity/probe"
+ "github.com/mattfenwick/cyclonus/pkg/generator"
+ "github.com/mattfenwick/cyclonus/pkg/kube"
+ "github.com/mattfenwick/cyclonus/pkg/matcher"
+ "github.com/vishvananda/netlink"
+ "golang.org/x/sys/unix"
+ corev1 "k8s.io/api/core/v1"
+ networkingv1 "k8s.io/api/networking/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ kruntime "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/client-go/tools/cache"
+)
+
+// Change this to 1 to test IPv6 instead of IPv4. Tests pass with IPv6, but run
+// much slower, apparently because some ICMPv6 packets are dropped and need to
+// be retransmitted.
+//
+// TODO: Fix slow IPv6 tests and run them automatically, e.g. as a separate
+// ktest with a flag to select IPv6.
+const testIPIndex = 0
+
+const podIfaceGroup uint32 = 8
+
+// Loopback traffic is only affected by network policies when connecting through
+// a ClusterIP service. For now, we don't simulate service IPs, so this is
+// disabled.
+const ignoreLoopback = true
+
+var serverPorts = []int{80, 81}
+var serverProtocols = []corev1.Protocol{corev1.ProtocolTCP, corev1.ProtocolUDP}
+
+var gatewayIPv4 = net.ParseIP("10.8.0.1").To4()
+var gatewayIPv6 = net.ParseIP("fd08::1")
+
+func netListen(port int, protocol corev1.Protocol, payload string, stop <-chan struct{}) error {
+ switch protocol {
+ case corev1.ProtocolTCP:
+ listener, err := net.ListenTCP("tcp", &net.TCPAddr{Port: port})
+ if err != nil {
+ return err
+ }
+ go func() {
+ <-stop
+ listener.Close()
+ }()
+ for {
+ conn, err := listener.Accept()
+ if err != nil {
+ return err
+ }
+ conn.Write([]byte(payload))
+ conn.Close()
+ }
+ case corev1.ProtocolUDP:
+ conn, err := net.ListenUDP("udp", &net.UDPAddr{Port: port})
+ if err != nil {
+ return err
+ }
+ go func() {
+ <-stop
+ conn.Close()
+ }()
+ var buf [16]byte
+ for {
+ _, clientAddr, err := conn.ReadFrom(buf[:])
+ if err != nil {
+ return err
+ }
+ conn.WriteTo([]byte(payload), clientAddr)
+ }
+ default:
+ return fmt.Errorf("unsupported protocol: %q", protocol)
+ }
+}
+
+func netConnect(ip net.IP, port int, protocol corev1.Protocol, expectPayload string) (error, error) {
+ addr := (&net.TCPAddr{IP: ip, Port: port}).String()
+ payload := make([]byte, len(expectPayload))
+ switch protocol {
+ case corev1.ProtocolTCP:
+ conn, err := net.DialTimeout("tcp", addr, 2*time.Second)
+ if errors.Is(err, unix.EHOSTUNREACH) || errors.Is(err, unix.EACCES) {
+ return err, nil
+ } else if err != nil {
+ return nil, err
+ }
+ defer conn.Close()
+ _, err = conn.Read(payload)
+ if err != nil {
+ return nil, err
+ }
+ case corev1.ProtocolUDP:
+ conn, err := net.Dial("udp", addr)
+ if err != nil {
+ return nil, err
+ }
+ defer conn.Close()
+ err = conn.SetDeadline(time.Now().Add(2 * time.Second))
+ if err != nil {
+ return nil, err
+ }
+ _, err = conn.Write([]byte("hello"))
+ if err != nil {
+ return nil, err
+ }
+ _, err = conn.Read(payload)
+ if errors.Is(err, unix.EHOSTUNREACH) || errors.Is(err, unix.EACCES) {
+ return err, nil
+ } else if err != nil {
+ return nil, err
+ }
+ default:
+ return nil, fmt.Errorf("unsupported protocol: %q", protocol)
+ }
+ if string(payload) != expectPayload {
+ return nil, fmt.Errorf("wrong payload, expected %q, got %q", expectPayload, payload)
+ }
+ return nil, nil
+}
+
+type fakePod struct {
+ name string
+ ips []podIP
+ netns *os.File
+ rawNetns syscall.RawConn
+ stopCh chan struct{}
+}
+
+type podIP struct {
+ ip net.IP
+ gatewayIP net.IP
+ zeroIP net.IP
+ fullMask net.IPMask
+ zeroMask net.IPMask
+}
+
+func createFakePod(namespace, name string, ips []net.IP) (*fakePod, error) {
+ p := &fakePod{
+ name: fmt.Sprintf("%s/%s", namespace, name),
+ ips: make([]podIP, len(ips)),
+ stopCh: make(chan struct{}),
+ }
+ for i, ip := range ips {
+ p := &p.ips[i]
+ switch {
+ case ip.To4() != nil:
+ p.ip = ip.To4()
+ p.gatewayIP = gatewayIPv4
+ p.zeroIP = net.ParseIP("0.0.0.0").To4()
+ p.fullMask = net.CIDRMask(32, 32)
+ p.zeroMask = net.CIDRMask(0, 32)
+ case ip.To16() != nil:
+ p.ip = ip.To16()
+ p.gatewayIP = gatewayIPv6
+ p.zeroIP = net.ParseIP("::")
+ p.fullMask = net.CIDRMask(128, 128)
+ p.zeroMask = net.CIDRMask(0, 128)
+ default:
+ return nil, fmt.Errorf("invalid IP: %v", ip)
+ }
+ }
+
+ // Create new network namespace.
+ runLockedThread(func() {
+ err := unix.Unshare(unix.CLONE_NEWNET)
+ if err != nil {
+ panic(err)
+ }
+ // Obtain an FD of the new net namespace.
+ p.netns, err = os.Open(fmt.Sprintf("/proc/%d/task/%d/ns/net", os.Getpid(), unix.Gettid()))
+ if err != nil {
+ panic(err)
+ }
+ })
+
+ var err error
+ p.rawNetns, err = p.netns.SyscallConn()
+ if err != nil {
+ panic(err)
+ }
+
+ // Create veth pair.
+ linkAttrs := netlink.NewLinkAttrs()
+ linkAttrs.Name = fmt.Sprintf("veth_%s_%s", namespace, name)
+ if len(linkAttrs.Name) > 15 {
+ hash := sha256.Sum256([]byte(linkAttrs.Name))
+ linkAttrs.Name = fmt.Sprintf("veth_%x", hash[:5])
+ }
+ linkAttrs.Group = podIfaceGroup
+ linkAttrs.Flags = net.FlagUp
+ p.rawNetns.Control(func(fd uintptr) {
+ err = netlink.LinkAdd(&netlink.Veth{LinkAttrs: linkAttrs, PeerName: "veth", PeerNamespace: netlink.NsFd(fd)})
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ veth, err := netlink.LinkByName(linkAttrs.Name)
+ if err != nil {
+ return nil, err
+ }
+
+ for _, ip := range p.ips {
+ // Add gateway address to link.
+ addr := &netlink.Addr{IPNet: &net.IPNet{IP: ip.gatewayIP, Mask: ip.fullMask}, Flags: unix.IFA_F_NODAD}
+ err = netlink.AddrAdd(veth, addr)
+ if err != nil {
+ return nil, err
+ }
+
+ // Add route.
+ err = netlink.RouteAdd(&netlink.Route{
+ LinkIndex: veth.Attrs().Index,
+ Scope: netlink.SCOPE_HOST,
+ Dst: &net.IPNet{IP: ip.ip, Mask: ip.fullMask},
+ })
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ p.runInNetns(func() {
+ // Enable loopback traffic in the pod.
+ loopback, err := netlink.LinkByName("lo")
+ if err != nil {
+ panic(err)
+ }
+ err = netlink.LinkSetUp(loopback)
+ if err != nil {
+ panic(err)
+ }
+
+ // Set up the veth in the pod namespace.
+ veth, err := netlink.LinkByName("veth")
+ if err != nil {
+ panic(err)
+ }
+ err = netlink.LinkSetUp(veth)
+ if err != nil {
+ panic(err)
+ }
+
+ for _, ip := range p.ips {
+ // IFA_F_NODAD disables duplicate address detection for IPv6.
+ addr := &netlink.Addr{IPNet: &net.IPNet{IP: ip.ip, Mask: ip.fullMask}, Flags: unix.IFA_F_NODAD}
+ err = netlink.AddrAdd(veth, addr)
+ if err != nil {
+ panic(err)
+ }
+
+ err = netlink.RouteAdd(&netlink.Route{
+ LinkIndex: veth.Attrs().Index,
+ Scope: netlink.SCOPE_LINK,
+ Dst: &net.IPNet{IP: ip.gatewayIP, Mask: ip.fullMask},
+ Src: ip.ip,
+ })
+ if err != nil {
+ panic(err)
+ }
+ err = netlink.RouteAdd(&netlink.Route{
+ LinkIndex: veth.Attrs().Index,
+ Scope: netlink.SCOPE_UNIVERSE,
+ Dst: &net.IPNet{IP: ip.zeroIP, Mask: ip.zeroMask},
+ Gw: ip.gatewayIP,
+ })
+ if err != nil {
+ panic(err)
+ }
+ }
+ })
+
+ for _, protocol := range serverProtocols {
+ for _, port := range serverPorts {
+ go p.runInNetns(func() {
+ err := netListen(port, protocol, p.name, p.stopCh)
+ select {
+ case <-p.stopCh:
+ default:
+ panic(err)
+ }
+ })
+ }
+ }
+
+ return p, nil
+}
+
+func (p *fakePod) stop() {
+ close(p.stopCh)
+ p.netns.Close()
+}
+
+func (p *fakePod) runInNetns(f func()) {
+ runLockedThread(func() {
+ p.rawNetns.Control(func(fd uintptr) {
+ err := unix.Setns(int(fd), unix.CLONE_NEWNET)
+ if err != nil {
+ panic(err)
+ }
+ })
+ f()
+ })
+}
+
+func (p *fakePod) connect(target *fakePod, port int, protocol corev1.Protocol) (error, error) {
+ var errConnectivity, err error
+ p.runInNetns(func() {
+ errConnectivity, err = netConnect(target.ips[testIPIndex].ip, port, protocol, target.name)
+ })
+ return errConnectivity, err
+}
+
+// runLockedThread runs f locked on an OS thread, which allows f to switch to a
+// different network namespace. After f returns, the previous network namespace
+// is restored, and if that succeeds, the thread is unlocked, allowing the
+// thread to be reused for other goroutines.
+func runLockedThread(f func()) {
+ runtime.LockOSThread()
+ oldNs, err := os.Open(fmt.Sprintf("/proc/%d/task/%d/ns/net", os.Getpid(), unix.Gettid()))
+ if err != nil {
+ panic(err)
+ }
+ defer oldNs.Close()
+ f()
+ err = unix.Setns(int(oldNs.Fd()), unix.CLONE_NEWNET)
+ if err == nil {
+ runtime.UnlockOSThread()
+ }
+}
+
+type fakeKubernetes struct {
+ *kube.MockKubernetes
+ nft *nftctrl.Controller
+ fakePods map[string]*fakePod
+ nextPodIP uint64
+}
+
+func (k *fakeKubernetes) CreateNamespace(kubeNamespace *corev1.Namespace) (*corev1.Namespace, error) {
+ kubeNamespace, err := k.MockKubernetes.CreateNamespace(kubeNamespace)
+ if err == nil && k.nft != nil {
+ k.nft.SetNamespace(kubeNamespace.Name, kubeNamespace)
+ if err := k.nft.Flush(); err != nil {
+ return nil, err
+ }
+ }
+ return kubeNamespace, err
+}
+
+func (k *fakeKubernetes) SetNamespaceLabels(namespace string, labels map[string]string) (*corev1.Namespace, error) {
+ kubeNamespace, err := k.MockKubernetes.SetNamespaceLabels(namespace, labels)
+ if err == nil && k.nft != nil {
+ k.nft.SetNamespace(namespace, kubeNamespace)
+ if err := k.nft.Flush(); err != nil {
+ return nil, err
+ }
+ }
+ return kubeNamespace, err
+}
+
+func (k *fakeKubernetes) DeleteNamespace(namespace string) error {
+ err := k.MockKubernetes.DeleteNamespace(namespace)
+ if err == nil && k.nft != nil {
+ k.nft.SetNamespace(namespace, nil)
+ if err := k.nft.Flush(); err != nil {
+ return err
+ }
+ }
+ return err
+}
+
+func (k *fakeKubernetes) CreateNetworkPolicy(kubePolicy *networkingv1.NetworkPolicy) (*networkingv1.NetworkPolicy, error) {
+ kubePolicy, err := k.MockKubernetes.CreateNetworkPolicy(kubePolicy)
+ if err == nil && k.nft != nil {
+ k.nft.SetNetworkPolicy(cache.MetaObjectToName(kubePolicy), kubePolicy)
+ if err := k.nft.Flush(); err != nil {
+ return nil, err
+ }
+ }
+ return kubePolicy, err
+}
+
+func (k *fakeKubernetes) UpdateNetworkPolicy(kubePolicy *networkingv1.NetworkPolicy) (*networkingv1.NetworkPolicy, error) {
+ kubePolicy, err := k.MockKubernetes.UpdateNetworkPolicy(kubePolicy)
+ if err == nil && k.nft != nil {
+ k.nft.SetNetworkPolicy(cache.MetaObjectToName(kubePolicy), kubePolicy)
+ if err := k.nft.Flush(); err != nil {
+ return nil, err
+ }
+ }
+ return kubePolicy, err
+}
+
+func (k *fakeKubernetes) DeleteNetworkPolicy(namespace string, name string) error {
+ err := k.MockKubernetes.DeleteNetworkPolicy(namespace, name)
+ if err == nil && k.nft != nil {
+ k.nft.SetNetworkPolicy(cache.NewObjectName(namespace, name), nil)
+ if err := k.nft.Flush(); err != nil {
+ return err
+ }
+ }
+ return err
+}
+
+func (k *fakeKubernetes) DeleteAllNetworkPoliciesInNamespace(namespace string) error {
+ policies, err := k.GetNetworkPoliciesInNamespace(namespace)
+ if err == nil && k.nft != nil {
+ for _, kubePolicy := range policies {
+ k.nft.SetNetworkPolicy(cache.MetaObjectToName(&kubePolicy), nil)
+ if err := k.nft.Flush(); err != nil {
+ return err
+ }
+ }
+ }
+ return k.MockKubernetes.DeleteAllNetworkPoliciesInNamespace(namespace)
+}
+
+func (k *fakeKubernetes) CreatePod(kubePod *corev1.Pod) (*corev1.Pod, error) {
+ kubePod, err := k.MockKubernetes.CreatePod(kubePod)
+ objectName := cache.MetaObjectToName(kubePod)
+ if err == nil {
+ ipSuffix := []byte{uint8(k.nextPodIP >> 8), uint8(k.nextPodIP)}
+ podIPv4 := slices.Concat(gatewayIPv4[:2], ipSuffix)
+ podIPv6 := slices.Concat(gatewayIPv6[:14], ipSuffix)
+ k.nextPodIP++
+ kubePod.Status.PodIPs = []corev1.PodIP{{IP: podIPv4.String()}, {IP: podIPv6.String()}}
+ kubePod.Status.PodIP = kubePod.Status.PodIPs[testIPIndex].IP
+ ips := []net.IP{podIPv4, podIPv6}
+ fakePod, err := createFakePod(kubePod.Namespace, kubePod.Name, ips)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create fake pod: %w", err)
+ }
+ k.fakePods[objectName.String()] = fakePod
+ }
+ if err == nil && k.nft != nil {
+ k.nft.SetPod(objectName, kubePod)
+ if err := k.nft.Flush(); err != nil {
+ return nil, err
+ }
+ }
+ return kubePod, err
+}
+
+func (k *fakeKubernetes) DeletePod(namespace string, pod string) error {
+ err := k.MockKubernetes.DeletePod(namespace, pod)
+ objectName := cache.NewObjectName(namespace, pod)
+ if err == nil {
+ k.fakePods[objectName.String()].stop()
+ delete(k.fakePods, objectName.String())
+ }
+ if err == nil && k.nft != nil {
+ k.nft.SetPod(objectName, nil)
+ if err := k.nft.Flush(); err != nil {
+ return err
+ }
+ }
+ return err
+}
+
+func (k *fakeKubernetes) SetPodLabels(namespace string, pod string, labels map[string]string) (*corev1.Pod, error) {
+ kubePod, err := k.MockKubernetes.SetPodLabels(namespace, pod, labels)
+ if err == nil && k.nft != nil {
+ k.nft.SetPod(cache.MetaObjectToName(kubePod), kubePod)
+ if err := k.nft.Flush(); err != nil {
+ return nil, err
+ }
+ }
+ return kubePod, err
+}
+
+var protocolMap = map[string]corev1.Protocol{
+ "tcp": corev1.ProtocolTCP,
+ "udp": corev1.ProtocolUDP,
+ "sctp": corev1.ProtocolSCTP,
+}
+
+func (k *fakeKubernetes) ExecuteRemoteCommand(namespace string, pod string, container string, command []string) (string, string, error, error) {
+ // command is expected to have this format:
+ // /agnhost connect s-x-a.x.svc.cluster.local:80 --timeout=1s --protocol=tcp
+ if len(command) != 5 {
+ return "", "", nil, fmt.Errorf("unexpected command length: %v", command)
+ }
+ targetService, targetPortStr, ok := strings.Cut(command[2], ".svc.cluster.local:")
+ if !ok {
+ return "", "", nil, fmt.Errorf("failed to parse target: %q", command[2])
+ }
+ targetService, targetNamespace, ok := strings.Cut(targetService, ".")
+ if !ok {
+ return "", "", nil, fmt.Errorf("failed to parse target: %q", command[2])
+ }
+ targetPod := strings.TrimPrefix(targetService, fmt.Sprintf("s-%s-", targetNamespace))
+ protocol := strings.TrimPrefix(command[4], "--protocol=")
+
+ sourceFakePod := k.fakePods[cache.NewObjectName(namespace, pod).String()]
+ targetFakePod := k.fakePods[cache.NewObjectName(targetNamespace, targetPod).String()]
+
+ targetPort, err := strconv.Atoi(targetPortStr)
+ if err != nil {
+ return "", "", nil, err
+ }
+ kubeProtocol, ok := protocolMap[protocol]
+ if !ok {
+ return "", "", nil, fmt.Errorf("invalid protocol: %q", protocol)
+ }
+
+ connectErr, err := sourceFakePod.connect(targetFakePod, targetPort, kubeProtocol)
+ return "", "", connectErr, err
+}
+
+func (k *fakeKubernetes) initializeNft() error {
+ namespaces, err := k.GetAllNamespaces()
+ if err != nil {
+ return err
+ }
+ for _, kubeNamespace := range namespaces.Items {
+ k.nft.SetNamespace(kubeNamespace.Name, &kubeNamespace)
+ pods, err := k.GetPodsInNamespace(kubeNamespace.Name)
+ if err != nil {
+ return err
+ }
+ for _, kubePod := range pods {
+ k.nft.SetPod(cache.MetaObjectToName(&kubePod), &kubePod)
+ }
+ policies, err := k.GetNetworkPoliciesInNamespace(kubeNamespace.Name)
+ if err != nil {
+ return err
+ }
+ if len(policies) != 0 {
+ return fmt.Errorf("expected no initial policies, but found %s", cache.MetaObjectToName(&policies[0]).String())
+ }
+ }
+ if err := k.nft.Flush(); err != nil {
+ return fmt.Errorf("initial flush failed: %w", err)
+ }
+ return nil
+}
+
+type testRecorder struct {
+ t *testing.T
+}
+
+func (t *testRecorder) Event(object kruntime.Object, eventtype, reason, message string) {
+ if eventtype == corev1.EventTypeWarning {
+ t.t.Errorf("Warning event: object %v, %s: %s", object, reason, message)
+ } else {
+ t.t.Logf("%s event: object %v, %s: %s", eventtype, object, reason, message)
+ }
+}
+
+func (t *testRecorder) Eventf(object kruntime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
+ t.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...))
+}
+
+func (t *testRecorder) AnnotatedEventf(object kruntime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
+ t.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...))
+}
+
+const maxNamespaceName = "ns-3456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012"
+const maxPodName = "pod-456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012"
+const maxPolicyName = "policy-789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012"
+
+func setPolicyName(name string) generator.Setter {
+ return func(policy *generator.Netpol) {
+ policy.Name = name
+ }
+}
+
+func extraTestCases() []*generator.TestCase {
+ return []*generator.TestCase{
+ {
+ Description: "Update namespace so that additional peer selector matches",
+ Tags: generator.NewStringSet(generator.TagSetNamespaceLabels),
+ Steps: []*generator.TestStep{
+ generator.NewTestStep(generator.ProbeAllAvailable,
+ generator.CreatePolicy(generator.BuildPolicy(generator.SetPeers(true, []networkingv1.NetworkPolicyPeer{
+ {NamespaceSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"ns": "y"}}},
+ {NamespaceSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"new-ns": "qrs"}}},
+ })).NetworkPolicy())),
+ generator.NewTestStep(generator.ProbeAllAvailable,
+ generator.SetNamespaceLabels("y", map[string]string{"ns": "y", "new-ns": "qrs"})),
+ generator.NewTestStep(generator.ProbeAllAvailable,
+ generator.SetNamespaceLabels("y", map[string]string{"ns": "y"})),
+ },
+ },
+ {
+ Description: "Delete one of multiple policies",
+ Tags: generator.NewStringSet(generator.TagDeletePolicy),
+ Steps: []*generator.TestStep{
+ generator.NewTestStep(generator.ProbeAllAvailable,
+ generator.CreatePolicy(generator.BuildPolicy(setPolicyName("policy1")).NetworkPolicy()),
+ generator.CreatePolicy(generator.BuildPolicy(setPolicyName("policy2"), generator.SetPorts(true, nil)).NetworkPolicy()),
+ generator.DeletePolicy("x", "policy1"),
+ ),
+ },
+ },
+ {
+ Description: "Create namespace, pod and policy with maximum name length",
+ Tags: generator.NewStringSet(generator.TagCreateNamespace),
+ Steps: []*generator.TestStep{
+ generator.NewTestStep(generator.ProbeAllAvailable,
+ generator.CreateNamespace(maxNamespaceName, nil),
+ generator.CreatePod(maxNamespaceName, maxPodName, nil),
+ generator.CreatePolicy(generator.BuildPolicy(
+ generator.SetNamespace(maxNamespaceName),
+ setPolicyName(maxPolicyName),
+ generator.SetPodSelector(metav1.LabelSelector{})).NetworkPolicy())),
+ generator.NewTestStep(generator.ProbeAllAvailable,
+ generator.DeleteNamespace(maxNamespaceName)),
+ },
+ },
+ }
+}
+
+type initTestCase struct {
+ description string
+ init func(*connectivity.TestCaseState, *nftctrl.Controller) error
+}
+
+func initializationTestCases() []initTestCase {
+ return []initTestCase{
+ {
+ description: "Delete rule during initialization",
+ // It is possible that we already receive updates/deletes during
+ // initialization, if one of the resource types has not yet finished
+ // loading. This means that we need to be able to handle such updates
+ // before the first Flush.
+ init: func(t *connectivity.TestCaseState, nft *nftctrl.Controller) error {
+ namespaces, err := t.Kubernetes.GetAllNamespaces()
+ if err != nil {
+ return err
+ }
+ for _, kubeNamespace := range namespaces.Items {
+ nft.SetNamespace(kubeNamespace.Name, &kubeNamespace)
+ pods, err := t.Kubernetes.GetPodsInNamespace(kubeNamespace.Name)
+ if err != nil {
+ return err
+ }
+ for _, kubePod := range pods {
+ nft.SetPod(cache.MetaObjectToName(&kubePod), &kubePod)
+ }
+ }
+
+ policy1 := generator.BuildPolicy().NetworkPolicy()
+ policy1.Name = "policy1"
+ nft.SetNetworkPolicy(cache.MetaObjectToName(policy1), policy1)
+
+ policy2 := generator.BuildPolicy(generator.SetPorts(true, nil)).NetworkPolicy()
+ policy2.Name = "policy2"
+ if err := t.CreatePolicy(policy2); err != nil {
+ return fmt.Errorf("failed to create policy: %w", err)
+ }
+ nft.SetNetworkPolicy(cache.MetaObjectToName(policy2), policy2)
+
+ // Delete policy 1.
+ nft.SetNetworkPolicy(cache.MetaObjectToName(policy1), nil)
+
+ return nil
+ },
+ },
+ {
+ description: "Initialize namespaces last",
+ init: func(t *connectivity.TestCaseState, nft *nftctrl.Controller) error {
+ policy := generator.BuildPolicy().NetworkPolicy()
+ policy.Spec.Ingress[0].From[0].NamespaceSelector = nil
+ if err := t.CreatePolicy(policy); err != nil {
+ return fmt.Errorf("failed to create policy: %w", err)
+ }
+
+ namespaces, err := t.Kubernetes.GetAllNamespaces()
+ if err != nil {
+ return err
+ }
+ for _, kubeNamespace := range namespaces.Items {
+ pods, err := t.Kubernetes.GetPodsInNamespace(kubeNamespace.Name)
+ if err != nil {
+ return err
+ }
+ for _, kubePod := range pods {
+ nft.SetPod(cache.MetaObjectToName(&kubePod), &kubePod)
+ }
+ policies, err := t.Kubernetes.GetNetworkPoliciesInNamespace(kubeNamespace.Name)
+ if err != nil {
+ return err
+ }
+ for _, policy := range policies {
+ nft.SetNetworkPolicy(cache.MetaObjectToName(&policy), &policy)
+ }
+ }
+ for _, kubeNamespace := range namespaces.Items {
+ nft.SetNamespace(kubeNamespace.Name, &kubeNamespace)
+ }
+ return nil
+ },
+ },
+ }
+}
+
+func TestCyclonus(t *testing.T) {
+ if os.Getenv("IN_KTEST") != "true" {
+ t.Skip("Not in ktest")
+ }
+
+ if err := os.WriteFile("/proc/sys/net/ipv4/ip_forward", []byte("1\n"), 0644); err != nil {
+ t.Fatalf("Failed to enable IPv4 forwarding: %v", err)
+ }
+ if err := os.WriteFile("/proc/sys/net/ipv6/conf/all/forwarding", []byte("1\n"), 0644); err != nil {
+ t.Fatalf("Failed to enable IPv6 forwarding: %v", err)
+ }
+ // By default, linux rate limits ICMP replies, which slows down our tests.
+ if err := os.WriteFile("/proc/sys/net/ipv4/icmp_ratemask", []byte("0\n"), 0644); err != nil {
+ t.Fatalf("Failed to disable ICMPv4 rate limiting: %v", err)
+ }
+ if err := os.WriteFile("/proc/sys/net/ipv6/icmp/ratemask", []byte("\n"), 0644); err != nil {
+ t.Fatalf("Failed to disable ICMPv6 rate limiting: %v", err)
+ }
+ // Disable IPv6 duplicate address detection, which delays IPv6 connectivity
+ // becoming available.
+ if err := os.WriteFile("/proc/sys/net/ipv6/conf/default/accept_dad", []byte("0\n"), 0644); err != nil {
+ t.Fatalf("Failed to disable IPv6 DAD: %v", err)
+ }
+
+ kubernetes := &fakeKubernetes{
+ MockKubernetes: kube.NewMockKubernetes(1.0),
+ fakePods: make(map[string]*fakePod),
+ nextPodIP: 2,
+ }
+
+ allowDNS := false // This creates policies which allow port 53, we don't need this.
+ serverNamespaces := []string{"x", "y", "z"}
+ serverPods := []string{"a", "b", "c"}
+ externalIPs := []string{}
+ podCreationTimeoutSeconds := 1
+ batchJobs := false
+ imageRegistry := "registry.k8s.io"
+
+ resources, err := probe.NewDefaultResources(kubernetes, serverNamespaces, serverPods, serverPorts, serverProtocols, externalIPs, podCreationTimeoutSeconds, batchJobs, imageRegistry)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ interpreterConfig := &connectivity.InterpreterConfig{
+ ResetClusterBeforeTestCase: false,
+ KubeProbeRetries: 1,
+ PerturbationWaitSeconds: 0,
+ VerifyClusterStateBeforeTestCase: true,
+ BatchJobs: batchJobs,
+ IgnoreLoopback: ignoreLoopback,
+ JobTimeoutSeconds: 1,
+ FailFast: false,
+ }
+ interpreter := connectivity.NewInterpreter(kubernetes, resources, interpreterConfig)
+ printer := &connectivity.Printer{
+ Noisy: false,
+ IgnoreLoopback: ignoreLoopback,
+ JunitResultsFile: "",
+ }
+
+ zcPod, err := resources.GetPod("z", "c")
+ if err != nil {
+ t.Fatal(err)
+ }
+ testCaseGenerator := generator.NewTestCaseGenerator(allowDNS, zcPod.IP, serverNamespaces, nil, nil)
+ testCases := slices.Concat(testCaseGenerator.GenerateTestCases(), extraTestCases())
+
+ for _, testCase := range testCases {
+ for _, step := range testCase.Steps {
+ step.Probe.Mode = generator.ProbeModeServiceName
+ }
+ t.Run(testCase.Description, func(t *testing.T) {
+ recorder := &testRecorder{t: t}
+ nft, err := nftctrl.New(recorder, podIfaceGroup)
+ if err != nil {
+ t.Errorf("Failed to create nftctrl: %v", err)
+ return
+ }
+ defer nft.Close()
+ kubernetes.nft = nft
+
+ if err := kubernetes.initializeNft(); err != nil {
+ t.Errorf("nftctrl initialization failed: %v", err)
+ return
+ }
+
+ result := interpreter.ExecuteTestCase(testCase)
+ if result.Err != nil {
+ t.Error(result.Err)
+ return
+ }
+ if !result.Passed(ignoreLoopback) {
+ printer.PrintTestCaseResult(result)
+
+ cmd := exec.Command("nft", "list", "ruleset")
+ cmd.Stdout = os.Stdout
+ cmd.Stderr = os.Stderr
+ cmd.Run()
+
+ t.Error("connectivity test failed")
+ }
+
+ kubernetes.nft = nil
+ testCaseState := &connectivity.TestCaseState{
+ Kubernetes: kubernetes,
+ Resources: resources,
+ }
+ err = testCaseState.ResetClusterState()
+ if err != nil {
+ t.Errorf("failed to reset cluster: %v", err)
+ }
+
+ // Flush the conntrack table. Otherwise, UDP connectivity tests can
+ // spuriously succeed when they should be blocked, because they match an
+ // entry in the conntrack table from a previous test.
+ err = netlink.ConntrackTableFlush(netlink.ConntrackTable)
+ if err != nil {
+ t.Errorf("failed to flush conntrack table: %v", err)
+ }
+ })
+ }
+
+ initTestCases := initializationTestCases()
+ for _, testCase := range initTestCases {
+ t.Run(testCase.description, func(t *testing.T) {
+ recorder := &testRecorder{t: t}
+ nft, err := nftctrl.New(recorder, podIfaceGroup)
+ if err != nil {
+ t.Errorf("Failed to create nftctrl: %v", err)
+ return
+ }
+ defer nft.Close()
+
+ testCaseState := &connectivity.TestCaseState{
+ Kubernetes: kubernetes,
+ Resources: resources,
+ }
+
+ if err := testCase.init(testCaseState, nft); err != nil {
+ t.Errorf("initialization failed: %v", err)
+ return
+ }
+
+ if err := nft.Flush(); err != nil {
+ t.Errorf("flush failed: %v", err)
+ return
+ }
+
+ parsedPolicy := matcher.BuildNetworkPolicies(true, testCaseState.Policies)
+ jobBuilder := &probe.JobBuilder{TimeoutSeconds: 1}
+ simRunner := probe.NewSimulatedRunner(parsedPolicy, jobBuilder)
+ probeConfig := generator.NewAllAvailable(generator.ProbeModeServiceName)
+ stepResult := connectivity.NewStepResult(
+ simRunner.RunProbeForConfig(probeConfig, testCaseState.Resources),
+ parsedPolicy,
+ slices.Clone(testCaseState.Policies))
+ kubeRunner := probe.NewKubeRunner(kubernetes, 15, jobBuilder)
+ for i := 0; i <= interpreterConfig.KubeProbeRetries; i++ {
+ stepResult.AddKubeProbe(kubeRunner.RunProbeForConfig(probeConfig, testCaseState.Resources))
+ if stepResult.Passed(ignoreLoopback) {
+ break
+ }
+ }
+
+ if !stepResult.Passed(ignoreLoopback) {
+ printer.PrintStep(0, &generator.TestStep{Probe: probeConfig}, stepResult)
+
+ cmd := exec.Command("nft", "list", "ruleset")
+ cmd.Stdout = os.Stdout
+ cmd.Stderr = os.Stderr
+ cmd.Run()
+
+ t.Error("connectivity test failed")
+ }
+
+ err = testCaseState.ResetClusterState()
+ if err != nil {
+ t.Errorf("failed to reset cluster: %v", err)
+ }
+
+ // Flush the conntrack table. Otherwise, UDP connectivity tests can
+ // spuriously succeed when they should be blocked, because they match an
+ // entry in the conntrack table from a previous test.
+ err = netlink.ConntrackTableFlush(netlink.ConntrackTable)
+ if err != nil {
+ t.Errorf("failed to flush conntrack table: %v", err)
+ }
+ })
+ }
+}