m/n/kubernetes/networkpolicy: add Cyclonus test suite

This adds a test for the network policy controller, based on the
Cyclonus test suite. Running Cyclonus on a real cluster takes multiple
hours, as there are over 200 test cases, each of which takes around 1
minute. The test implemented here uses a fake Kubernetes API and pods,
which allows running all tests in around 15 seconds.

IPv6 is partially implemented but disabled. The tests pass, but each
test takes around 2 seconds, because some ICMPv6 replies for blocked TCP
connections seem to get lost somewhere and are only processed when the
TCP SYN is resent one second later.

Change-Id: Id77f2dd4d884b6d156e238e07e88c222e3bbe9a2
Reviewed-on: https://review.monogon.dev/c/monogon/+/3905
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
Tested-by: Jenkins CI
diff --git a/build/bazel/go.MODULE.bazel b/build/bazel/go.MODULE.bazel
index a723144..5afba0f 100644
--- a/build/bazel/go.MODULE.bazel
+++ b/build/bazel/go.MODULE.bazel
@@ -41,6 +41,7 @@
     "com_github_kballard_go_shellquote",
     "com_github_klauspost_compress",
     "com_github_lib_pq",
+    "com_github_mattfenwick_cyclonus",
     "com_github_mattn_go_shellwords",
     "com_github_mdlayher_arp",
     "com_github_mdlayher_ethernet",
diff --git a/go.mod b/go.mod
index b02c7b3..aba53ba 100644
--- a/go.mod
+++ b/go.mod
@@ -90,6 +90,7 @@
 	github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51
 	github.com/klauspost/compress v1.17.11
 	github.com/lib/pq v1.10.9
+	github.com/mattfenwick/cyclonus v0.5.6
 	github.com/mattn/go-shellwords v1.0.12
 	github.com/mdlayher/arp v0.0.0-20220512170110-6706a2966875
 	github.com/mdlayher/ethernet v0.0.0-20220221185849-529eae5b6118
@@ -253,7 +254,9 @@
 	github.com/go-openapi/jsonpointer v0.21.0 // indirect
 	github.com/go-openapi/jsonreference v0.20.2 // indirect
 	github.com/go-openapi/swag v0.23.0 // indirect
+	github.com/go-resty/resty/v2 v2.7.0 // indirect
 	github.com/go-sql-driver/mysql v1.7.1 // indirect
+	github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
 	github.com/godbus/dbus/v5 v5.1.0 // indirect
 	github.com/gofrs/flock v0.8.1 // indirect
 	github.com/gogo/protobuf v1.3.2 // indirect
@@ -265,6 +268,7 @@
 	github.com/google/gnostic-models v0.6.8 // indirect
 	github.com/google/go-dap v0.12.0 // indirect
 	github.com/google/gofuzz v1.2.0 // indirect
+	github.com/google/pprof v0.0.0-20241029153458-d1b30febd7db // indirect
 	github.com/google/s2a-go v0.1.8 // indirect
 	github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
 	github.com/google/subcommands v1.0.2-0.20190508160503-636abe8753b8 // indirect
@@ -298,6 +302,7 @@
 	github.com/jpillora/backoff v1.0.0 // indirect
 	github.com/jsimonetti/rtnetlink v1.4.1 // indirect
 	github.com/json-iterator/go v1.1.12 // indirect
+	github.com/jstemmer/go-junit-report v0.9.1 // indirect
 	github.com/karrick/godirwalk v1.17.0 // indirect
 	github.com/kr/fs v0.1.0 // indirect
 	github.com/kr/pty v1.1.8 // indirect
@@ -308,6 +313,7 @@
 	github.com/lucasb-eyer/go-colorful v1.2.0 // indirect
 	github.com/lufia/iostat v1.2.1 // indirect
 	github.com/mailru/easyjson v0.7.7 // indirect
+	github.com/mattfenwick/collections v0.3.2 // indirect
 	github.com/mattn/go-colorable v0.1.13 // indirect
 	github.com/mattn/go-isatty v0.0.20 // indirect
 	github.com/mattn/go-runewidth v0.0.15 // indirect
@@ -338,6 +344,9 @@
 	github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
 	github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
 	github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
+	github.com/olekukonko/tablewriter v0.0.5 // indirect
+	github.com/onsi/ginkgo/v2 v2.22.0 // indirect
+	github.com/onsi/gomega v1.36.0 // indirect
 	github.com/opencontainers/image-spec v1.1.0 // indirect
 	github.com/opencontainers/runtime-spec v1.2.0 // indirect
 	github.com/opencontainers/runtime-tools v0.9.1-0.20221107090550-2e043c6bd626 // indirect
diff --git a/go.sum b/go.sum
index b113d29..b41631a 100644
--- a/go.sum
+++ b/go.sum
@@ -2150,6 +2150,8 @@
 github.com/go-quicktest/qt v1.101.0/go.mod h1:14Bz/f7NwaXPtdYEgzsx46kqSxVwTbzVZsDC26tQJow=
 github.com/go-redis/redis v6.15.8+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
 github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
+github.com/go-resty/resty/v2 v2.7.0 h1:me+K9p3uhSmXtrBZ4k9jcEAfJmuC8IivWHwaLZwPrFY=
+github.com/go-resty/resty/v2 v2.7.0/go.mod h1:9PWDzw47qPphMRFfhsyk0NnSgvluHcljSMVIq3w7q0I=
 github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
 github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
 github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
@@ -2158,7 +2160,6 @@
 github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
 github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
 github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
-github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI=
 github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls=
 github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
 github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
@@ -2690,6 +2691,7 @@
 github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
 github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
 github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
+github.com/jstemmer/go-junit-report v0.9.1 h1:6QPYqodiu3GuPL+7mfx+NwDdp2eTkp9IfEUpgAwUN0o=
 github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
 github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
 github.com/juju/ratelimit v1.0.1/go.mod h1:qapgC/Gy+xNh9UxzV13HGGl/6UXNN+ct+vwSgWNm/qk=
@@ -2796,6 +2798,10 @@
 github.com/markbates/pkger v0.15.1/go.mod h1:0JoVlrol20BSywW79rN3kdFFsE5xYM+rSCQDXbLhiuI=
 github.com/markbates/safe v1.0.1/go.mod h1:nAqgmRi7cY2nqMc92/bSEeQA+R4OheNU2T1kNSCBdG0=
 github.com/marstr/guid v1.1.0/go.mod h1:74gB1z2wpxxInTG6yaqA7KrtM0NZ+RbrcqDvYHefzho=
+github.com/mattfenwick/collections v0.3.2 h1:sdEBe7/cL+dK7CZ0SXod07qzCpoGJu2xtOywAF155Us=
+github.com/mattfenwick/collections v0.3.2/go.mod h1:S2z3MxcMsoBiP8Zw4ewmFvQcfq7QfesA7V192udZ0W8=
+github.com/mattfenwick/cyclonus v0.5.6 h1:8bZ+v2Kcn4H7Z+OhX1l3BQrAHqUdmBwdBBiDIk3Wj7s=
+github.com/mattfenwick/cyclonus v0.5.6/go.mod h1:xGvx12xAzlAOEfqih/19zqOquWD477qXyGQvQz6f3MU=
 github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
 github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ=
 github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
@@ -3005,6 +3011,7 @@
 github.com/olekukonko/tablewriter v0.0.1/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
 github.com/olekukonko/tablewriter v0.0.2/go.mod h1:rSAaSIOAGT9odnlyGlUfAJaoc5w2fSBUmeGDbRWPxyQ=
 github.com/olekukonko/tablewriter v0.0.4/go.mod h1:zq6QwlOf5SlnkVbMSr5EoBv3636FWnp+qbPhuoO21uA=
+github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
 github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=
 github.com/onsi/ginkgo v0.0.0-20151202141238-7f8ab55aaf3b/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
 github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
@@ -3015,7 +3022,6 @@
 github.com/onsi/ginkgo v1.12.0/go.mod h1:oUhWkIvk5aDxtKvDDuw8gItl8pKl42LzjC9KZE0HfGg=
 github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
 github.com/onsi/ginkgo v1.13.0/go.mod h1:+REjRxOmWfHCjfv9TTWB1jD1Frx4XydAD3zm1lskyM0=
-github.com/onsi/ginkgo v1.16.4 h1:29JGrr5oVBm5ulCWet69zQkzWipVXIol6ygQUe/EzNc=
 github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0=
 github.com/onsi/ginkgo/v2 v2.1.3/go.mod h1:vw5CSIxN1JObi/U8gcbwft7ZxR2dgaR70JSE3/PpL4c=
 github.com/onsi/ginkgo/v2 v2.1.4/go.mod h1:um6tUpWM/cxCK3/FK8BXqEiUMUwRgSM4JXG47RKZmLU=
@@ -3955,6 +3961,7 @@
 golang.org/x/net v0.0.0-20210825183410-e898025ed96a/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/net v0.0.0-20210928044308-7d9f5e0b762b/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
+golang.org/x/net v0.0.0-20211029224645-99673261e6eb/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/net v0.0.0-20211111083644-e5c967477495/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/net v0.0.0-20211123203042-d83791d6bcd9/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
diff --git a/metropolis/node/kubernetes/networkpolicy/BUILD.bazel b/metropolis/node/kubernetes/networkpolicy/BUILD.bazel
index d3d3b76..287427e 100644
--- a/metropolis/node/kubernetes/networkpolicy/BUILD.bazel
+++ b/metropolis/node/kubernetes/networkpolicy/BUILD.bazel
@@ -1,4 +1,5 @@
-load("@io_bazel_rules_go//go:def.bzl", "go_library")
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+load("//osbase/test/ktest:ktest.bzl", "k_test")
 
 go_library(
     name = "networkpolicy",
@@ -21,3 +22,28 @@
         "@org_dolansoft_git_dolansoft_k8s_nft_npc//nftctrl",
     ],
 )
+
+go_test(
+    name = "networkpolicy_test",
+    srcs = ["networkpolicy_test.go"],
+    deps = [
+        "@com_github_mattfenwick_cyclonus//pkg/connectivity",
+        "@com_github_mattfenwick_cyclonus//pkg/connectivity/probe",
+        "@com_github_mattfenwick_cyclonus//pkg/generator",
+        "@com_github_mattfenwick_cyclonus//pkg/kube",
+        "@com_github_mattfenwick_cyclonus//pkg/matcher",
+        "@com_github_vishvananda_netlink//:netlink",
+        "@io_k8s_api//core/v1:core",
+        "@io_k8s_api//networking/v1:networking",
+        "@io_k8s_apimachinery//pkg/apis/meta/v1:meta",
+        "@io_k8s_apimachinery//pkg/runtime",
+        "@io_k8s_client_go//tools/cache",
+        "@org_dolansoft_git_dolansoft_k8s_nft_npc//nftctrl",
+        "@org_golang_x_sys//unix",
+    ],
+)
+
+k_test(
+    name = "ktest",
+    tester = ":networkpolicy_test",
+)
diff --git a/metropolis/node/kubernetes/networkpolicy/networkpolicy_test.go b/metropolis/node/kubernetes/networkpolicy/networkpolicy_test.go
new file mode 100644
index 0000000..22cf569
--- /dev/null
+++ b/metropolis/node/kubernetes/networkpolicy/networkpolicy_test.go
@@ -0,0 +1,927 @@
+// Copyright The Monogon Project Authors.
+// SPDX-License-Identifier: Apache-2.0
+
+// This package tests the network policy controller using the Cyclonus network
+// policy conformance test suite. It uses the real policy controller and
+// nftables controller, but a fake Kubernetes API and fake pods, which allows
+// the tests to run much faster than would be possible with the real API
+// server and pods.
+//
+// By default, the test runs in a ktest, which means we are testing with the
+// same kernel version, and thus the same nftables implementation, as is used
+// in Monogon OS. But you can also run the tests manually in a new user and
+// network namespace, which allows you to use all the debugging tools
+// available on your machine. Useful commands:
+//
+//	bazel build //metropolis/node/kubernetes/networkpolicy:networkpolicy_test
+//		Build the test.
+//	unshare --map-user=0 --net
+//		Create a user and network namespace.
+//	tcpdump -i any &
+//		Capture traffic in the background.
+//	IN_KTEST=true bazel-bin/metropolis/node/kubernetes/networkpolicy/networkpolicy_test_/networkpolicy_test
+//		Run the test.
+package networkpolicy_test
+
+import (
+	"crypto/sha256"
+	"errors"
+	"fmt"
+	"net"
+	"os"
+	"os/exec"
+	"runtime"
+	"slices"
+	"strconv"
+	"strings"
+	"syscall"
+	"testing"
+	"time"
+
+	"git.dolansoft.org/dolansoft/k8s-nft-npc/nftctrl"
+	"github.com/mattfenwick/cyclonus/pkg/connectivity"
+	"github.com/mattfenwick/cyclonus/pkg/connectivity/probe"
+	"github.com/mattfenwick/cyclonus/pkg/generator"
+	"github.com/mattfenwick/cyclonus/pkg/kube"
+	"github.com/mattfenwick/cyclonus/pkg/matcher"
+	"github.com/vishvananda/netlink"
+	"golang.org/x/sys/unix"
+	corev1 "k8s.io/api/core/v1"
+	networkingv1 "k8s.io/api/networking/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	kruntime "k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/client-go/tools/cache"
+)
+
+// Change this to 1 to test IPv6 instead of IPv4. Tests pass with IPv6, but run
+// much slower, apparently because some ICMPv6 packets are dropped and need to
+// be retransmitted.
+//
+// TODO: Fix slow IPv6 tests and run them automatically, e.g. as a separate
+// ktest with a flag to select IPv6.
+const testIPIndex = 0
+
+const podIfaceGroup uint32 = 8
+
+// Loopback traffic is only affected by network policies when connecting through
+// a ClusterIP service. For now, we don't simulate service IPs, so loopback
+// checks are disabled.
+const ignoreLoopback = true
+
+var serverPorts = []int{80, 81}
+var serverProtocols = []corev1.Protocol{corev1.ProtocolTCP, corev1.ProtocolUDP}
+
+var gatewayIPv4 = net.ParseIP("10.8.0.1").To4()
+var gatewayIPv6 = net.ParseIP("fd08::1")
+
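+// netListen runs a minimal server in the current network namespace: it listens
+// on the given port and protocol, answers every connection or datagram with
+// payload, and returns once stop is closed.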
+func netListen(port int, protocol corev1.Protocol, payload string, stop <-chan struct{}) error {
+	switch protocol {
+	case corev1.ProtocolTCP:
+		listener, err := net.ListenTCP("tcp", &net.TCPAddr{Port: port})
+		if err != nil {
+			return err
+		}
+		go func() {
+			<-stop
+			listener.Close()
+		}()
+		for {
+			conn, err := listener.Accept()
+			if err != nil {
+				return err
+			}
+			conn.Write([]byte(payload))
+			conn.Close()
+		}
+	case corev1.ProtocolUDP:
+		conn, err := net.ListenUDP("udp", &net.UDPAddr{Port: port})
+		if err != nil {
+			return err
+		}
+		go func() {
+			<-stop
+			conn.Close()
+		}()
+		var buf [16]byte
+		for {
+			_, clientAddr, err := conn.ReadFrom(buf[:])
+			if err != nil {
+				return err
+			}
+			conn.WriteTo([]byte(payload), clientAddr)
+		}
+	default:
+		return fmt.Errorf("unsupported protocol: %q", protocol)
+	}
+}
+
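+// netConnect connects to ip:port using the given protocol and verifies the
+// payload returned by the server. The first return value indicates that the
+// connection was blocked (ICMP unreachable or administratively prohibited),
+// the second any other, unexpected error.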
+func netConnect(ip net.IP, port int, protocol corev1.Protocol, expectPayload string) (error, error) {
+	addr := (&net.TCPAddr{IP: ip, Port: port}).String()
+	payload := make([]byte, len(expectPayload))
+	switch protocol {
+	case corev1.ProtocolTCP:
+		conn, err := net.DialTimeout("tcp", addr, 2*time.Second)
+		if errors.Is(err, unix.EHOSTUNREACH) || errors.Is(err, unix.EACCES) {
+			return err, nil
+		} else if err != nil {
+			return nil, err
+		}
+		defer conn.Close()
+		_, err = conn.Read(payload)
+		if err != nil {
+			return nil, err
+		}
+	case corev1.ProtocolUDP:
+		conn, err := net.Dial("udp", addr)
+		if err != nil {
+			return nil, err
+		}
+		defer conn.Close()
+		err = conn.SetDeadline(time.Now().Add(2 * time.Second))
+		if err != nil {
+			return nil, err
+		}
+		_, err = conn.Write([]byte("hello"))
+		if err != nil {
+			return nil, err
+		}
+		_, err = conn.Read(payload)
+		if errors.Is(err, unix.EHOSTUNREACH) || errors.Is(err, unix.EACCES) {
+			return err, nil
+		} else if err != nil {
+			return nil, err
+		}
+	default:
+		return nil, fmt.Errorf("unsupported protocol: %q", protocol)
+	}
+	if string(payload) != expectPayload {
+		return nil, fmt.Errorf("wrong payload, expected %q, got %q", expectPayload, payload)
+	}
+	return nil, nil
+}
+
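+// fakePod simulates a Kubernetes pod: a separate network namespace connected
+// to the host through a veth pair, with TCP and UDP servers listening on the
+// configured ports.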
+type fakePod struct {
+	name     string
+	ips      []podIP
+	netns    *os.File
+	rawNetns syscall.RawConn
+	stopCh   chan struct{}
+}
+
+type podIP struct {
+	ip        net.IP
+	gatewayIP net.IP
+	zeroIP    net.IP
+	fullMask  net.IPMask
+	zeroMask  net.IPMask
+}
+
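+// createFakePod sets up a new fake pod: it creates a network namespace and a
+// veth pair, assigns the pod and gateway addresses, installs routes on both
+// sides, and starts the listening servers inside the namespace.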
+func createFakePod(namespace, name string, ips []net.IP) (*fakePod, error) {
+	p := &fakePod{
+		name:   fmt.Sprintf("%s/%s", namespace, name),
+		ips:    make([]podIP, len(ips)),
+		stopCh: make(chan struct{}),
+	}
+	for i, ip := range ips {
+		p := &p.ips[i]
+		switch {
+		case ip.To4() != nil:
+			p.ip = ip.To4()
+			p.gatewayIP = gatewayIPv4
+			p.zeroIP = net.ParseIP("0.0.0.0").To4()
+			p.fullMask = net.CIDRMask(32, 32)
+			p.zeroMask = net.CIDRMask(0, 32)
+		case ip.To16() != nil:
+			p.ip = ip.To16()
+			p.gatewayIP = gatewayIPv6
+			p.zeroIP = net.ParseIP("::")
+			p.fullMask = net.CIDRMask(128, 128)
+			p.zeroMask = net.CIDRMask(0, 128)
+		default:
+			return nil, fmt.Errorf("invalid IP: %v", ip)
+		}
+	}
+
+	// Create new network namespace.
+	runLockedThread(func() {
+		err := unix.Unshare(unix.CLONE_NEWNET)
+		if err != nil {
+			panic(err)
+		}
+		// Obtain an FD of the new net namespace.
+		p.netns, err = os.Open(fmt.Sprintf("/proc/%d/task/%d/ns/net", os.Getpid(), unix.Gettid()))
+		if err != nil {
+			panic(err)
+		}
+	})
+
+	var err error
+	p.rawNetns, err = p.netns.SyscallConn()
+	if err != nil {
+		panic(err)
+	}
+
+	// Create veth pair.
+	linkAttrs := netlink.NewLinkAttrs()
+	linkAttrs.Name = fmt.Sprintf("veth_%s_%s", namespace, name)
+	if len(linkAttrs.Name) > 15 {
+		hash := sha256.Sum256([]byte(linkAttrs.Name))
+		linkAttrs.Name = fmt.Sprintf("veth_%x", hash[:5])
+	}
+	linkAttrs.Group = podIfaceGroup
+	linkAttrs.Flags = net.FlagUp
+	p.rawNetns.Control(func(fd uintptr) {
+		err = netlink.LinkAdd(&netlink.Veth{LinkAttrs: linkAttrs, PeerName: "veth", PeerNamespace: netlink.NsFd(fd)})
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	veth, err := netlink.LinkByName(linkAttrs.Name)
+	if err != nil {
+		return nil, err
+	}
+
+	for _, ip := range p.ips {
+		// Add gateway address to link.
+		addr := &netlink.Addr{IPNet: &net.IPNet{IP: ip.gatewayIP, Mask: ip.fullMask}, Flags: unix.IFA_F_NODAD}
+		err = netlink.AddrAdd(veth, addr)
+		if err != nil {
+			return nil, err
+		}
+
+		// Add a route to the pod IP through the veth interface.
+		err = netlink.RouteAdd(&netlink.Route{
+			LinkIndex: veth.Attrs().Index,
+			Scope:     netlink.SCOPE_HOST,
+			Dst:       &net.IPNet{IP: ip.ip, Mask: ip.fullMask},
+		})
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	p.runInNetns(func() {
+		// Enable loopback traffic in the pod.
+		loopback, err := netlink.LinkByName("lo")
+		if err != nil {
+			panic(err)
+		}
+		err = netlink.LinkSetUp(loopback)
+		if err != nil {
+			panic(err)
+		}
+
+		// Set up the veth in the pod namespace.
+		veth, err := netlink.LinkByName("veth")
+		if err != nil {
+			panic(err)
+		}
+		err = netlink.LinkSetUp(veth)
+		if err != nil {
+			panic(err)
+		}
+
+		for _, ip := range p.ips {
+			// IFA_F_NODAD disables duplicate address detection for IPv6.
+			addr := &netlink.Addr{IPNet: &net.IPNet{IP: ip.ip, Mask: ip.fullMask}, Flags: unix.IFA_F_NODAD}
+			err = netlink.AddrAdd(veth, addr)
+			if err != nil {
+				panic(err)
+			}
+
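+			// Route to the gateway IP, reachable directly on the link.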
+			err = netlink.RouteAdd(&netlink.Route{
+				LinkIndex: veth.Attrs().Index,
+				Scope:     netlink.SCOPE_LINK,
+				Dst:       &net.IPNet{IP: ip.gatewayIP, Mask: ip.fullMask},
+				Src:       ip.ip,
+			})
+			if err != nil {
+				panic(err)
+			}
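+			// Default route for all other traffic, via the gateway.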
+			err = netlink.RouteAdd(&netlink.Route{
+				LinkIndex: veth.Attrs().Index,
+				Scope:     netlink.SCOPE_UNIVERSE,
+				Dst:       &net.IPNet{IP: ip.zeroIP, Mask: ip.zeroMask},
+				Gw:        ip.gatewayIP,
+			})
+			if err != nil {
+				panic(err)
+			}
+		}
+	})
+
+	for _, protocol := range serverProtocols {
+		for _, port := range serverPorts {
+			go p.runInNetns(func() {
+				err := netListen(port, protocol, p.name, p.stopCh)
+				select {
+				case <-p.stopCh:
+				default:
+					panic(err)
+				}
+			})
+		}
+	}
+
+	return p, nil
+}
+
+func (p *fakePod) stop() {
+	close(p.stopCh)
+	p.netns.Close()
+}
+
+func (p *fakePod) runInNetns(f func()) {
+	runLockedThread(func() {
+		p.rawNetns.Control(func(fd uintptr) {
+			err := unix.Setns(int(fd), unix.CLONE_NEWNET)
+			if err != nil {
+				panic(err)
+			}
+		})
+		f()
+	})
+}
+
+func (p *fakePod) connect(target *fakePod, port int, protocol corev1.Protocol) (error, error) {
+	var errConnectivity, err error
+	p.runInNetns(func() {
+		errConnectivity, err = netConnect(target.ips[testIPIndex].ip, port, protocol, target.name)
+	})
+	return errConnectivity, err
+}
+
+// runLockedThread runs f locked on an OS thread, which allows f to switch to a
+// different network namespace. After f returns, the previous network namespace
+// is restored, and if that succeeds, the thread is unlocked, allowing the
+// thread to be reused for other goroutines.
+func runLockedThread(f func()) {
+	runtime.LockOSThread()
+	oldNs, err := os.Open(fmt.Sprintf("/proc/%d/task/%d/ns/net", os.Getpid(), unix.Gettid()))
+	if err != nil {
+		panic(err)
+	}
+	defer oldNs.Close()
+	f()
+	err = unix.Setns(int(oldNs.Fd()), unix.CLONE_NEWNET)
+	if err == nil {
+		runtime.UnlockOSThread()
+	}
+}
+
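+// fakeKubernetes wraps Cyclonus's MockKubernetes and mirrors every namespace,
+// pod and network policy change into the nftables controller (if attached),
+// creating and deleting fake pods as pods come and go.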
+type fakeKubernetes struct {
+	*kube.MockKubernetes
+	nft       *nftctrl.Controller
+	fakePods  map[string]*fakePod
+	nextPodIP uint64
+}
+
+func (k *fakeKubernetes) CreateNamespace(kubeNamespace *corev1.Namespace) (*corev1.Namespace, error) {
+	kubeNamespace, err := k.MockKubernetes.CreateNamespace(kubeNamespace)
+	if err == nil && k.nft != nil {
+		k.nft.SetNamespace(kubeNamespace.Name, kubeNamespace)
+		if err := k.nft.Flush(); err != nil {
+			return nil, err
+		}
+	}
+	return kubeNamespace, err
+}
+
+func (k *fakeKubernetes) SetNamespaceLabels(namespace string, labels map[string]string) (*corev1.Namespace, error) {
+	kubeNamespace, err := k.MockKubernetes.SetNamespaceLabels(namespace, labels)
+	if err == nil && k.nft != nil {
+		k.nft.SetNamespace(namespace, kubeNamespace)
+		if err := k.nft.Flush(); err != nil {
+			return nil, err
+		}
+	}
+	return kubeNamespace, err
+}
+
+func (k *fakeKubernetes) DeleteNamespace(namespace string) error {
+	err := k.MockKubernetes.DeleteNamespace(namespace)
+	if err == nil && k.nft != nil {
+		k.nft.SetNamespace(namespace, nil)
+		if err := k.nft.Flush(); err != nil {
+			return err
+		}
+	}
+	return err
+}
+
+func (k *fakeKubernetes) CreateNetworkPolicy(kubePolicy *networkingv1.NetworkPolicy) (*networkingv1.NetworkPolicy, error) {
+	kubePolicy, err := k.MockKubernetes.CreateNetworkPolicy(kubePolicy)
+	if err == nil && k.nft != nil {
+		k.nft.SetNetworkPolicy(cache.MetaObjectToName(kubePolicy), kubePolicy)
+		if err := k.nft.Flush(); err != nil {
+			return nil, err
+		}
+	}
+	return kubePolicy, err
+}
+
+func (k *fakeKubernetes) UpdateNetworkPolicy(kubePolicy *networkingv1.NetworkPolicy) (*networkingv1.NetworkPolicy, error) {
+	kubePolicy, err := k.MockKubernetes.UpdateNetworkPolicy(kubePolicy)
+	if err == nil && k.nft != nil {
+		k.nft.SetNetworkPolicy(cache.MetaObjectToName(kubePolicy), kubePolicy)
+		if err := k.nft.Flush(); err != nil {
+			return nil, err
+		}
+	}
+	return kubePolicy, err
+}
+
+func (k *fakeKubernetes) DeleteNetworkPolicy(namespace string, name string) error {
+	err := k.MockKubernetes.DeleteNetworkPolicy(namespace, name)
+	if err == nil && k.nft != nil {
+		k.nft.SetNetworkPolicy(cache.NewObjectName(namespace, name), nil)
+		if err := k.nft.Flush(); err != nil {
+			return err
+		}
+	}
+	return err
+}
+
+func (k *fakeKubernetes) DeleteAllNetworkPoliciesInNamespace(namespace string) error {
+	policies, err := k.GetNetworkPoliciesInNamespace(namespace)
+	if err == nil && k.nft != nil {
+		for _, kubePolicy := range policies {
+			k.nft.SetNetworkPolicy(cache.MetaObjectToName(&kubePolicy), nil)
+			if err := k.nft.Flush(); err != nil {
+				return err
+			}
+		}
+	}
+	return k.MockKubernetes.DeleteAllNetworkPoliciesInNamespace(namespace)
+}
+
+func (k *fakeKubernetes) CreatePod(kubePod *corev1.Pod) (*corev1.Pod, error) {
+	kubePod, err := k.MockKubernetes.CreatePod(kubePod)
+	objectName := cache.MetaObjectToName(kubePod)
+	if err == nil {
+		ipSuffix := []byte{uint8(k.nextPodIP >> 8), uint8(k.nextPodIP)}
+		podIPv4 := slices.Concat(gatewayIPv4[:2], ipSuffix)
+		podIPv6 := slices.Concat(gatewayIPv6[:14], ipSuffix)
+		k.nextPodIP++
+		kubePod.Status.PodIPs = []corev1.PodIP{{IP: podIPv4.String()}, {IP: podIPv6.String()}}
+		kubePod.Status.PodIP = kubePod.Status.PodIPs[testIPIndex].IP
+		ips := []net.IP{podIPv4, podIPv6}
+		fakePod, err := createFakePod(kubePod.Namespace, kubePod.Name, ips)
+		if err != nil {
+			return nil, fmt.Errorf("failed to create fake pod: %w", err)
+		}
+		k.fakePods[objectName.String()] = fakePod
+	}
+	if err == nil && k.nft != nil {
+		k.nft.SetPod(objectName, kubePod)
+		if err := k.nft.Flush(); err != nil {
+			return nil, err
+		}
+	}
+	return kubePod, err
+}
+
+func (k *fakeKubernetes) DeletePod(namespace string, pod string) error {
+	err := k.MockKubernetes.DeletePod(namespace, pod)
+	objectName := cache.NewObjectName(namespace, pod)
+	if err == nil {
+		k.fakePods[objectName.String()].stop()
+		delete(k.fakePods, objectName.String())
+	}
+	if err == nil && k.nft != nil {
+		k.nft.SetPod(objectName, nil)
+		if err := k.nft.Flush(); err != nil {
+			return err
+		}
+	}
+	return err
+}
+
+func (k *fakeKubernetes) SetPodLabels(namespace string, pod string, labels map[string]string) (*corev1.Pod, error) {
+	kubePod, err := k.MockKubernetes.SetPodLabels(namespace, pod, labels)
+	if err == nil && k.nft != nil {
+		k.nft.SetPod(cache.MetaObjectToName(kubePod), kubePod)
+		if err := k.nft.Flush(); err != nil {
+			return nil, err
+		}
+	}
+	return kubePod, err
+}
+
+var protocolMap = map[string]corev1.Protocol{
+	"tcp":  corev1.ProtocolTCP,
+	"udp":  corev1.ProtocolUDP,
+	"sctp": corev1.ProtocolSCTP,
+}
+
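+// ExecuteRemoteCommand emulates running the Cyclonus probe command in a pod:
+// instead of execing agnhost, it parses the target service name and connects
+// directly from the source fake pod to the target fake pod.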
+func (k *fakeKubernetes) ExecuteRemoteCommand(namespace string, pod string, container string, command []string) (string, string, error, error) {
+	// command is expected to have this format:
+	// /agnhost connect s-x-a.x.svc.cluster.local:80 --timeout=1s --protocol=tcp
+	if len(command) != 5 {
+		return "", "", nil, fmt.Errorf("unexpected command length: %v", command)
+	}
+	targetService, targetPortStr, ok := strings.Cut(command[2], ".svc.cluster.local:")
+	if !ok {
+		return "", "", nil, fmt.Errorf("failed to parse target: %q", command[2])
+	}
+	targetService, targetNamespace, ok := strings.Cut(targetService, ".")
+	if !ok {
+		return "", "", nil, fmt.Errorf("failed to parse target: %q", command[2])
+	}
+	targetPod := strings.TrimPrefix(targetService, fmt.Sprintf("s-%s-", targetNamespace))
+	protocol := strings.TrimPrefix(command[4], "--protocol=")
+
+	sourceFakePod := k.fakePods[cache.NewObjectName(namespace, pod).String()]
+	targetFakePod := k.fakePods[cache.NewObjectName(targetNamespace, targetPod).String()]
+
+	targetPort, err := strconv.Atoi(targetPortStr)
+	if err != nil {
+		return "", "", nil, err
+	}
+	kubeProtocol, ok := protocolMap[protocol]
+	if !ok {
+		return "", "", nil, fmt.Errorf("invalid protocol: %q", protocol)
+	}
+
+	connectErr, err := sourceFakePod.connect(targetFakePod, targetPort, kubeProtocol)
+	return "", "", connectErr, err
+}
+
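+// initializeNft feeds the current cluster state (namespaces and pods) into a
+// freshly created nftables controller and performs the initial flush. It
+// expects that no network policies exist yet.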
+func (k *fakeKubernetes) initializeNft() error {
+	namespaces, err := k.GetAllNamespaces()
+	if err != nil {
+		return err
+	}
+	for _, kubeNamespace := range namespaces.Items {
+		k.nft.SetNamespace(kubeNamespace.Name, &kubeNamespace)
+		pods, err := k.GetPodsInNamespace(kubeNamespace.Name)
+		if err != nil {
+			return err
+		}
+		for _, kubePod := range pods {
+			k.nft.SetPod(cache.MetaObjectToName(&kubePod), &kubePod)
+		}
+		policies, err := k.GetNetworkPoliciesInNamespace(kubeNamespace.Name)
+		if err != nil {
+			return err
+		}
+		if len(policies) != 0 {
+			return fmt.Errorf("expected no initial policies, but found %s", cache.MetaObjectToName(&policies[0]).String())
+		}
+	}
+	if err := k.nft.Flush(); err != nil {
+		return fmt.Errorf("initial flush failed: %w", err)
+	}
+	return nil
+}
+
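+// testRecorder forwards events from the nftables controller to the test log
+// and fails the test on warning events.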
+type testRecorder struct {
+	t *testing.T
+}
+
+func (t *testRecorder) Event(object kruntime.Object, eventtype, reason, message string) {
+	if eventtype == corev1.EventTypeWarning {
+		t.t.Errorf("Warning event: object %v, %s: %s", object, reason, message)
+	} else {
+		t.t.Logf("%s event: object %v, %s: %s", eventtype, object, reason, message)
+	}
+}
+
+func (t *testRecorder) Eventf(object kruntime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
+	t.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...))
+}
+
+func (t *testRecorder) AnnotatedEventf(object kruntime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
+	t.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...))
+}
+
+const maxNamespaceName = "ns-3456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012"
+const maxPodName = "pod-456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012"
+const maxPolicyName = "policy-789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012"
+
+func setPolicyName(name string) generator.Setter {
+	return func(policy *generator.Netpol) {
+		policy.Name = name
+	}
+}
+
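+// extraTestCases returns test cases not covered by the Cyclonus generator:
+// updating namespace labels so that an additional peer selector matches,
+// deleting one of multiple policies, and using maximum-length object names.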
+func extraTestCases() []*generator.TestCase {
+	return []*generator.TestCase{
+		{
+			Description: "Update namespace so that additional peer selector matches",
+			Tags:        generator.NewStringSet(generator.TagSetNamespaceLabels),
+			Steps: []*generator.TestStep{
+				generator.NewTestStep(generator.ProbeAllAvailable,
+					generator.CreatePolicy(generator.BuildPolicy(generator.SetPeers(true, []networkingv1.NetworkPolicyPeer{
+						{NamespaceSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"ns": "y"}}},
+						{NamespaceSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"new-ns": "qrs"}}},
+					})).NetworkPolicy())),
+				generator.NewTestStep(generator.ProbeAllAvailable,
+					generator.SetNamespaceLabels("y", map[string]string{"ns": "y", "new-ns": "qrs"})),
+				generator.NewTestStep(generator.ProbeAllAvailable,
+					generator.SetNamespaceLabels("y", map[string]string{"ns": "y"})),
+			},
+		},
+		{
+			Description: "Delete one of multiple policies",
+			Tags:        generator.NewStringSet(generator.TagDeletePolicy),
+			Steps: []*generator.TestStep{
+				generator.NewTestStep(generator.ProbeAllAvailable,
+					generator.CreatePolicy(generator.BuildPolicy(setPolicyName("policy1")).NetworkPolicy()),
+					generator.CreatePolicy(generator.BuildPolicy(setPolicyName("policy2"), generator.SetPorts(true, nil)).NetworkPolicy()),
+					generator.DeletePolicy("x", "policy1"),
+				),
+			},
+		},
+		{
+			Description: "Create namespace, pod and policy with maximum name length",
+			Tags:        generator.NewStringSet(generator.TagCreateNamespace),
+			Steps: []*generator.TestStep{
+				generator.NewTestStep(generator.ProbeAllAvailable,
+					generator.CreateNamespace(maxNamespaceName, nil),
+					generator.CreatePod(maxNamespaceName, maxPodName, nil),
+					generator.CreatePolicy(generator.BuildPolicy(
+						generator.SetNamespace(maxNamespaceName),
+						setPolicyName(maxPolicyName),
+						generator.SetPodSelector(metav1.LabelSelector{})).NetworkPolicy())),
+				generator.NewTestStep(generator.ProbeAllAvailable,
+					generator.DeleteNamespace(maxNamespaceName)),
+			},
+		},
+	}
+}
+
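+// initTestCase describes a scenario for initializing the nftables controller
+// from existing cluster state, with resources arriving in a particular order.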
+type initTestCase struct {
+	description string
+	init        func(*connectivity.TestCaseState, *nftctrl.Controller) error
+}
+
+func initializationTestCases() []initTestCase {
+	return []initTestCase{
+		{
+			description: "Delete rule during initialization",
+			// It is possible that we already receive updates/deletes during
+			// initialization, if one of the resource types has not yet finished
+			// loading. This means that we need to be able to handle such updates
+			// before the first Flush.
+			init: func(t *connectivity.TestCaseState, nft *nftctrl.Controller) error {
+				namespaces, err := t.Kubernetes.GetAllNamespaces()
+				if err != nil {
+					return err
+				}
+				for _, kubeNamespace := range namespaces.Items {
+					nft.SetNamespace(kubeNamespace.Name, &kubeNamespace)
+					pods, err := t.Kubernetes.GetPodsInNamespace(kubeNamespace.Name)
+					if err != nil {
+						return err
+					}
+					for _, kubePod := range pods {
+						nft.SetPod(cache.MetaObjectToName(&kubePod), &kubePod)
+					}
+				}
+
+				policy1 := generator.BuildPolicy().NetworkPolicy()
+				policy1.Name = "policy1"
+				nft.SetNetworkPolicy(cache.MetaObjectToName(policy1), policy1)
+
+				policy2 := generator.BuildPolicy(generator.SetPorts(true, nil)).NetworkPolicy()
+				policy2.Name = "policy2"
+				if err := t.CreatePolicy(policy2); err != nil {
+					return fmt.Errorf("failed to create policy: %w", err)
+				}
+				nft.SetNetworkPolicy(cache.MetaObjectToName(policy2), policy2)
+
+				// Delete policy 1.
+				nft.SetNetworkPolicy(cache.MetaObjectToName(policy1), nil)
+
+				return nil
+			},
+		},
+		{
+			description: "Initialize namespaces last",
+			init: func(t *connectivity.TestCaseState, nft *nftctrl.Controller) error {
+				policy := generator.BuildPolicy().NetworkPolicy()
+				policy.Spec.Ingress[0].From[0].NamespaceSelector = nil
+				if err := t.CreatePolicy(policy); err != nil {
+					return fmt.Errorf("failed to create policy: %w", err)
+				}
+
+				namespaces, err := t.Kubernetes.GetAllNamespaces()
+				if err != nil {
+					return err
+				}
+				for _, kubeNamespace := range namespaces.Items {
+					pods, err := t.Kubernetes.GetPodsInNamespace(kubeNamespace.Name)
+					if err != nil {
+						return err
+					}
+					for _, kubePod := range pods {
+						nft.SetPod(cache.MetaObjectToName(&kubePod), &kubePod)
+					}
+					policies, err := t.Kubernetes.GetNetworkPoliciesInNamespace(kubeNamespace.Name)
+					if err != nil {
+						return err
+					}
+					for _, policy := range policies {
+						nft.SetNetworkPolicy(cache.MetaObjectToName(&policy), &policy)
+					}
+				}
+				for _, kubeNamespace := range namespaces.Items {
+					nft.SetNamespace(kubeNamespace.Name, &kubeNamespace)
+				}
+				return nil
+			},
+		},
+	}
+}
+
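+// TestCyclonus runs the generated Cyclonus test cases, the extra test cases
+// and the initialization test cases against the real nftables controller,
+// probing connectivity between fake pods after every step.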
+func TestCyclonus(t *testing.T) {
+	if os.Getenv("IN_KTEST") != "true" {
+		t.Skip("Not in ktest")
+	}
+
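+	// The host namespace routes traffic between the fake pod namespaces, so
+	// packet forwarding must be enabled.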
+	if err := os.WriteFile("/proc/sys/net/ipv4/ip_forward", []byte("1\n"), 0644); err != nil {
+		t.Fatalf("Failed to enable IPv4 forwarding: %v", err)
+	}
+	if err := os.WriteFile("/proc/sys/net/ipv6/conf/all/forwarding", []byte("1\n"), 0644); err != nil {
+		t.Fatalf("Failed to enable IPv6 forwarding: %v", err)
+	}
+	// By default, Linux rate-limits ICMP replies, which slows down our tests.
+	if err := os.WriteFile("/proc/sys/net/ipv4/icmp_ratemask", []byte("0\n"), 0644); err != nil {
+		t.Fatalf("Failed to disable ICMPv4 rate limiting: %v", err)
+	}
+	if err := os.WriteFile("/proc/sys/net/ipv6/icmp/ratemask", []byte("\n"), 0644); err != nil {
+		t.Fatalf("Failed to disable ICMPv6 rate limiting: %v", err)
+	}
+	// Disable IPv6 duplicate address detection, which delays IPv6 connectivity
+	// becoming available.
+	if err := os.WriteFile("/proc/sys/net/ipv6/conf/default/accept_dad", []byte("0\n"), 0644); err != nil {
+		t.Fatalf("Failed to disable IPv6 DAD: %v", err)
+	}
+
+	kubernetes := &fakeKubernetes{
+		MockKubernetes: kube.NewMockKubernetes(1.0),
+		fakePods:       make(map[string]*fakePod),
+		nextPodIP:      2,
+	}
+
+	allowDNS := false // When enabled, this adds policies allowing port 53, which we don't need.
+	serverNamespaces := []string{"x", "y", "z"}
+	serverPods := []string{"a", "b", "c"}
+	externalIPs := []string{}
+	podCreationTimeoutSeconds := 1
+	batchJobs := false
+	imageRegistry := "registry.k8s.io"
+
+	resources, err := probe.NewDefaultResources(kubernetes, serverNamespaces, serverPods, serverPorts, serverProtocols, externalIPs, podCreationTimeoutSeconds, batchJobs, imageRegistry)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	interpreterConfig := &connectivity.InterpreterConfig{
+		ResetClusterBeforeTestCase:       false,
+		KubeProbeRetries:                 1,
+		PerturbationWaitSeconds:          0,
+		VerifyClusterStateBeforeTestCase: true,
+		BatchJobs:                        batchJobs,
+		IgnoreLoopback:                   ignoreLoopback,
+		JobTimeoutSeconds:                1,
+		FailFast:                         false,
+	}
+	interpreter := connectivity.NewInterpreter(kubernetes, resources, interpreterConfig)
+	printer := &connectivity.Printer{
+		Noisy:            false,
+		IgnoreLoopback:   ignoreLoopback,
+		JunitResultsFile: "",
+	}
+
+	zcPod, err := resources.GetPod("z", "c")
+	if err != nil {
+		t.Fatal(err)
+	}
+	testCaseGenerator := generator.NewTestCaseGenerator(allowDNS, zcPod.IP, serverNamespaces, nil, nil)
+	testCases := slices.Concat(testCaseGenerator.GenerateTestCases(), extraTestCases())
+
+	for _, testCase := range testCases {
+		for _, step := range testCase.Steps {
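+			// Probe by service name; ExecuteRemoteCommand parses the service
+			// name to find the target fake pod.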
+			step.Probe.Mode = generator.ProbeModeServiceName
+		}
+		t.Run(testCase.Description, func(t *testing.T) {
+			recorder := &testRecorder{t: t}
+			nft, err := nftctrl.New(recorder, podIfaceGroup)
+			if err != nil {
+				t.Errorf("Failed to create nftctrl: %v", err)
+				return
+			}
+			defer nft.Close()
+			kubernetes.nft = nft
+
+			if err := kubernetes.initializeNft(); err != nil {
+				t.Errorf("nftctrl initialization failed: %v", err)
+				return
+			}
+
+			result := interpreter.ExecuteTestCase(testCase)
+			if result.Err != nil {
+				t.Error(result.Err)
+				return
+			}
+			if !result.Passed(ignoreLoopback) {
+				printer.PrintTestCaseResult(result)
+
+				cmd := exec.Command("nft", "list", "ruleset")
+				cmd.Stdout = os.Stdout
+				cmd.Stderr = os.Stderr
+				cmd.Run()
+
+				t.Error("connectivity test failed")
+			}
+
+			kubernetes.nft = nil
+			testCaseState := &connectivity.TestCaseState{
+				Kubernetes: kubernetes,
+				Resources:  resources,
+			}
+			err = testCaseState.ResetClusterState()
+			if err != nil {
+				t.Errorf("failed to reset cluster: %v", err)
+			}
+
+			// Flush the conntrack table. Otherwise, UDP connectivity tests can
+			// spuriously succeed when they should be blocked, because they match an
+			// entry in the conntrack table from a previous test.
+			err = netlink.ConntrackTableFlush(netlink.ConntrackTable)
+			if err != nil {
+				t.Errorf("failed to flush conntrack table: %v", err)
+			}
+		})
+	}
+
+	initTestCases := initializationTestCases()
+	for _, testCase := range initTestCases {
+		t.Run(testCase.description, func(t *testing.T) {
+			recorder := &testRecorder{t: t}
+			nft, err := nftctrl.New(recorder, podIfaceGroup)
+			if err != nil {
+				t.Errorf("Failed to create nftctrl: %v", err)
+				return
+			}
+			defer nft.Close()
+
+			testCaseState := &connectivity.TestCaseState{
+				Kubernetes: kubernetes,
+				Resources:  resources,
+			}
+
+			if err := testCase.init(testCaseState, nft); err != nil {
+				t.Errorf("initialization failed: %v", err)
+				return
+			}
+
+			if err := nft.Flush(); err != nil {
+				t.Errorf("flush failed: %v", err)
+				return
+			}
+
+			parsedPolicy := matcher.BuildNetworkPolicies(true, testCaseState.Policies)
+			jobBuilder := &probe.JobBuilder{TimeoutSeconds: 1}
+			simRunner := probe.NewSimulatedRunner(parsedPolicy, jobBuilder)
+			probeConfig := generator.NewAllAvailable(generator.ProbeModeServiceName)
+			stepResult := connectivity.NewStepResult(
+				simRunner.RunProbeForConfig(probeConfig, testCaseState.Resources),
+				parsedPolicy,
+				slices.Clone(testCaseState.Policies))
+			kubeRunner := probe.NewKubeRunner(kubernetes, 15, jobBuilder)
+			for i := 0; i <= interpreterConfig.KubeProbeRetries; i++ {
+				stepResult.AddKubeProbe(kubeRunner.RunProbeForConfig(probeConfig, testCaseState.Resources))
+				if stepResult.Passed(ignoreLoopback) {
+					break
+				}
+			}
+
+			if !stepResult.Passed(ignoreLoopback) {
+				printer.PrintStep(0, &generator.TestStep{Probe: probeConfig}, stepResult)
+
+				cmd := exec.Command("nft", "list", "ruleset")
+				cmd.Stdout = os.Stdout
+				cmd.Stderr = os.Stderr
+				cmd.Run()
+
+				t.Error("connectivity test failed")
+			}
+
+			err = testCaseState.ResetClusterState()
+			if err != nil {
+				t.Errorf("failed to reset cluster: %v", err)
+			}
+
+			// Flush the conntrack table. Otherwise, UDP connectivity tests can
+			// spuriously succeed when they should be blocked, because they match an
+			// entry in the conntrack table from a previous test.
+			err = netlink.ConntrackTableFlush(netlink.ConntrackTable)
+			if err != nil {
+				t.Errorf("failed to flush conntrack table: %v", err)
+			}
+		})
+	}
+}