m/n/kubernetes/networkpolicy: add Cyclonus test suite
This adds a test for the network policy controller, based on the
Cyclonus test suite. Running Cyclonus on a real cluster takes multiple
hours, as there are over 200 test cases, each of which takes around 1
minute. The test implemented here uses a fake Kubernetes API and pods,
which allows running all tests in around 15 seconds.
IPv6 is partially implemented but disabled. The IPv6 tests pass, but each
test takes around 2 seconds, because some ICMPv6 replies for blocked TCP
connections seem to get lost and are only processed once the TCP SYN is
retransmitted one second later.
Change-Id: Id77f2dd4d884b6d156e238e07e88c222e3bbe9a2
Reviewed-on: https://review.monogon.dev/c/monogon/+/3905
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/node/kubernetes/networkpolicy/networkpolicy_test.go b/metropolis/node/kubernetes/networkpolicy/networkpolicy_test.go
new file mode 100644
index 0000000..22cf569
--- /dev/null
+++ b/metropolis/node/kubernetes/networkpolicy/networkpolicy_test.go
@@ -0,0 +1,927 @@
+// Copyright The Monogon Project Authors.
+// SPDX-License-Identifier: Apache-2.0
+
+// This package tests the network policy controller, using the Cyclonus network
+// policy conformance test suite. It uses the real policy controller and
+// nftables controller, but a fake Kubernetes API and fake pods, which allows
+// the tests to run much faster than would be possible with a real API server
+// and real pods.
+//
+// By default, the test runs in a ktest, which means it runs against the same
+// kernel version, and thus the same nftables implementation, as Monogon OS.
+// But you can also run the tests manually in a new user and network namespace,
+// which lets you use all the debugging tools available on your machine. Useful
+// commands:
+//
+// bazel build //metropolis/node/kubernetes/networkpolicy:networkpolicy_test
+// Build the test.
+// unshare --map-user=0 --net
+// Create a user and network namespace.
+// tcpdump -i any &
+// Capture traffic in the background.
+// IN_KTEST=true bazel-bin/metropolis/node/kubernetes/networkpolicy/networkpolicy_test_/networkpolicy_test
+// Run the test.
+package networkpolicy_test
+
+import (
+ "crypto/sha256"
+ "errors"
+ "fmt"
+ "net"
+ "os"
+ "os/exec"
+ "runtime"
+ "slices"
+ "strconv"
+ "strings"
+ "syscall"
+ "testing"
+ "time"
+
+ "git.dolansoft.org/dolansoft/k8s-nft-npc/nftctrl"
+ "github.com/mattfenwick/cyclonus/pkg/connectivity"
+ "github.com/mattfenwick/cyclonus/pkg/connectivity/probe"
+ "github.com/mattfenwick/cyclonus/pkg/generator"
+ "github.com/mattfenwick/cyclonus/pkg/kube"
+ "github.com/mattfenwick/cyclonus/pkg/matcher"
+ "github.com/vishvananda/netlink"
+ "golang.org/x/sys/unix"
+ corev1 "k8s.io/api/core/v1"
+ networkingv1 "k8s.io/api/networking/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ kruntime "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/client-go/tools/cache"
+)
+
+// Change this to 1 to test IPv6 instead of IPv4. The tests pass with IPv6, but
+// run much slower, apparently because some ICMPv6 packets are dropped and need
+// to be retransmitted.
+//
+// TODO: Fix slow IPv6 tests and run them automatically, e.g. as a separate
+// ktest with a flag to select IPv6.
+const testIPIndex = 0
+
+const podIfaceGroup uint32 = 8
+
+// Loopback traffic is only affected by network policies when connecting
+// through a ClusterIP service. For now, we don't simulate service IPs, so
+// loopback checks are ignored.
+const ignoreLoopback = true
+
+var serverPorts = []int{80, 81}
+var serverProtocols = []corev1.Protocol{corev1.ProtocolTCP, corev1.ProtocolUDP}
+
+var gatewayIPv4 = net.ParseIP("10.8.0.1").To4()
+var gatewayIPv6 = net.ParseIP("fd08::1")
+
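+// netListen starts a server on the given port and protocol which replies to
+// every TCP connection or UDP packet with the given payload. It runs until the
+// stop channel is closed.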
+func netListen(port int, protocol corev1.Protocol, payload string, stop <-chan struct{}) error {
+ switch protocol {
+ case corev1.ProtocolTCP:
+ listener, err := net.ListenTCP("tcp", &net.TCPAddr{Port: port})
+ if err != nil {
+ return err
+ }
+ go func() {
+ <-stop
+ listener.Close()
+ }()
+ for {
+ conn, err := listener.Accept()
+ if err != nil {
+ return err
+ }
+ conn.Write([]byte(payload))
+ conn.Close()
+ }
+ case corev1.ProtocolUDP:
+ conn, err := net.ListenUDP("udp", &net.UDPAddr{Port: port})
+ if err != nil {
+ return err
+ }
+ go func() {
+ <-stop
+ conn.Close()
+ }()
+ var buf [16]byte
+ for {
+ _, clientAddr, err := conn.ReadFrom(buf[:])
+ if err != nil {
+ return err
+ }
+ conn.WriteTo([]byte(payload), clientAddr)
+ }
+ default:
+ return fmt.Errorf("unsupported protocol: %q", protocol)
+ }
+}
+
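+// netConnect connects to ip:port using the given protocol and checks that the
+// server replies with the expected payload. It returns two errors: the first
+// reports that the connection was blocked, which is the expected outcome when
+// a network policy denies it; the second reports any unexpected failure.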
+func netConnect(ip net.IP, port int, protocol corev1.Protocol, expectPayload string) (error, error) {
+ addr := (&net.TCPAddr{IP: ip, Port: port}).String()
+ payload := make([]byte, len(expectPayload))
+ switch protocol {
+ case corev1.ProtocolTCP:
+ conn, err := net.DialTimeout("tcp", addr, 2*time.Second)
+ if errors.Is(err, unix.EHOSTUNREACH) || errors.Is(err, unix.EACCES) {
+ return err, nil
+ } else if err != nil {
+ return nil, err
+ }
+ defer conn.Close()
+ _, err = conn.Read(payload)
+ if err != nil {
+ return nil, err
+ }
+ case corev1.ProtocolUDP:
+ conn, err := net.Dial("udp", addr)
+ if err != nil {
+ return nil, err
+ }
+ defer conn.Close()
+ err = conn.SetDeadline(time.Now().Add(2 * time.Second))
+ if err != nil {
+ return nil, err
+ }
+ _, err = conn.Write([]byte("hello"))
+ if err != nil {
+ return nil, err
+ }
+ _, err = conn.Read(payload)
+ if errors.Is(err, unix.EHOSTUNREACH) || errors.Is(err, unix.EACCES) {
+ return err, nil
+ } else if err != nil {
+ return nil, err
+ }
+ default:
+ return nil, fmt.Errorf("unsupported protocol: %q", protocol)
+ }
+ if string(payload) != expectPayload {
+ return nil, fmt.Errorf("wrong payload, expected %q, got %q", expectPayload, payload)
+ }
+ return nil, nil
+}
+
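+// fakePod simulates a Kubernetes pod: a network namespace connected to the
+// host through a veth pair, with TCP and UDP servers listening on the server
+// ports.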
+type fakePod struct {
+ name string
+ ips []podIP
+ netns *os.File
+ rawNetns syscall.RawConn
+ stopCh chan struct{}
+}
+
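+// podIP holds a pod address together with the address-family-specific values
+// used to configure addresses and routes.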
+type podIP struct {
+ ip net.IP
+ gatewayIP net.IP
+ zeroIP net.IP
+ fullMask net.IPMask
+ zeroMask net.IPMask
+}
+
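+// createFakePod creates a new network namespace, connects it to the host
+// namespace with a veth pair (with the host side in interface group
+// podIfaceGroup), sets up addresses and routes for the given pod IPs, and
+// starts servers on all server ports and protocols which reply with the pod's
+// namespace/name string.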
+func createFakePod(namespace, name string, ips []net.IP) (*fakePod, error) {
+ p := &fakePod{
+ name: fmt.Sprintf("%s/%s", namespace, name),
+ ips: make([]podIP, len(ips)),
+ stopCh: make(chan struct{}),
+ }
+ for i, ip := range ips {
+ p := &p.ips[i]
+ switch {
+ case ip.To4() != nil:
+ p.ip = ip.To4()
+ p.gatewayIP = gatewayIPv4
+ p.zeroIP = net.ParseIP("0.0.0.0").To4()
+ p.fullMask = net.CIDRMask(32, 32)
+ p.zeroMask = net.CIDRMask(0, 32)
+ case ip.To16() != nil:
+ p.ip = ip.To16()
+ p.gatewayIP = gatewayIPv6
+ p.zeroIP = net.ParseIP("::")
+ p.fullMask = net.CIDRMask(128, 128)
+ p.zeroMask = net.CIDRMask(0, 128)
+ default:
+ return nil, fmt.Errorf("invalid IP: %v", ip)
+ }
+ }
+
+ // Create new network namespace.
+ runLockedThread(func() {
+ err := unix.Unshare(unix.CLONE_NEWNET)
+ if err != nil {
+ panic(err)
+ }
+ // Obtain an FD of the new net namespace.
+ p.netns, err = os.Open(fmt.Sprintf("/proc/%d/task/%d/ns/net", os.Getpid(), unix.Gettid()))
+ if err != nil {
+ panic(err)
+ }
+ })
+
+ var err error
+ p.rawNetns, err = p.netns.SyscallConn()
+ if err != nil {
+ panic(err)
+ }
+
+ // Create veth pair.
+ linkAttrs := netlink.NewLinkAttrs()
+ linkAttrs.Name = fmt.Sprintf("veth_%s_%s", namespace, name)
+ if len(linkAttrs.Name) > 15 {
+ hash := sha256.Sum256([]byte(linkAttrs.Name))
+ linkAttrs.Name = fmt.Sprintf("veth_%x", hash[:5])
+ }
+ linkAttrs.Group = podIfaceGroup
+ linkAttrs.Flags = net.FlagUp
+ p.rawNetns.Control(func(fd uintptr) {
+ err = netlink.LinkAdd(&netlink.Veth{LinkAttrs: linkAttrs, PeerName: "veth", PeerNamespace: netlink.NsFd(fd)})
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ veth, err := netlink.LinkByName(linkAttrs.Name)
+ if err != nil {
+ return nil, err
+ }
+
+ for _, ip := range p.ips {
+ // Add gateway address to link.
+ addr := &netlink.Addr{IPNet: &net.IPNet{IP: ip.gatewayIP, Mask: ip.fullMask}, Flags: unix.IFA_F_NODAD}
+ err = netlink.AddrAdd(veth, addr)
+ if err != nil {
+ return nil, err
+ }
+
+ // Add route.
+ err = netlink.RouteAdd(&netlink.Route{
+ LinkIndex: veth.Attrs().Index,
+ Scope: netlink.SCOPE_HOST,
+ Dst: &net.IPNet{IP: ip.ip, Mask: ip.fullMask},
+ })
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ p.runInNetns(func() {
+ // Enable loopback traffic in the pod.
+ loopback, err := netlink.LinkByName("lo")
+ if err != nil {
+ panic(err)
+ }
+ err = netlink.LinkSetUp(loopback)
+ if err != nil {
+ panic(err)
+ }
+
+ // Set up the veth in the pod namespace.
+ veth, err := netlink.LinkByName("veth")
+ if err != nil {
+ panic(err)
+ }
+ err = netlink.LinkSetUp(veth)
+ if err != nil {
+ panic(err)
+ }
+
+ for _, ip := range p.ips {
+ // IFA_F_NODAD disables duplicate address detection for IPv6.
+ addr := &netlink.Addr{IPNet: &net.IPNet{IP: ip.ip, Mask: ip.fullMask}, Flags: unix.IFA_F_NODAD}
+ err = netlink.AddrAdd(veth, addr)
+ if err != nil {
+ panic(err)
+ }
+
+ err = netlink.RouteAdd(&netlink.Route{
+ LinkIndex: veth.Attrs().Index,
+ Scope: netlink.SCOPE_LINK,
+ Dst: &net.IPNet{IP: ip.gatewayIP, Mask: ip.fullMask},
+ Src: ip.ip,
+ })
+ if err != nil {
+ panic(err)
+ }
+ err = netlink.RouteAdd(&netlink.Route{
+ LinkIndex: veth.Attrs().Index,
+ Scope: netlink.SCOPE_UNIVERSE,
+ Dst: &net.IPNet{IP: ip.zeroIP, Mask: ip.zeroMask},
+ Gw: ip.gatewayIP,
+ })
+ if err != nil {
+ panic(err)
+ }
+ }
+ })
+
+ for _, protocol := range serverProtocols {
+ for _, port := range serverPorts {
+ go p.runInNetns(func() {
+ err := netListen(port, protocol, p.name, p.stopCh)
+ select {
+ case <-p.stopCh:
+ default:
+ panic(err)
+ }
+ })
+ }
+ }
+
+ return p, nil
+}
+
+func (p *fakePod) stop() {
+ close(p.stopCh)
+ p.netns.Close()
+}
+
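+// runInNetns runs f in the pod's network namespace.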
+func (p *fakePod) runInNetns(f func()) {
+ runLockedThread(func() {
+ p.rawNetns.Control(func(fd uintptr) {
+ err := unix.Setns(int(fd), unix.CLONE_NEWNET)
+ if err != nil {
+ panic(err)
+ }
+ })
+ f()
+ })
+}
+
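+// connect dials the target pod from within p's network namespace and returns
+// the connectivity error and unexpected error from netConnect.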
+func (p *fakePod) connect(target *fakePod, port int, protocol corev1.Protocol) (error, error) {
+ var errConnectivity, err error
+ p.runInNetns(func() {
+ errConnectivity, err = netConnect(target.ips[testIPIndex].ip, port, protocol, target.name)
+ })
+ return errConnectivity, err
+}
+
+// runLockedThread runs f locked to an OS thread, which allows f to switch to a
+// different network namespace. After f returns, the previous network namespace
+// is restored; only if that succeeds is the thread unlocked, allowing it to be
+// reused for other goroutines.
+func runLockedThread(f func()) {
+ runtime.LockOSThread()
+ oldNs, err := os.Open(fmt.Sprintf("/proc/%d/task/%d/ns/net", os.Getpid(), unix.Gettid()))
+ if err != nil {
+ panic(err)
+ }
+ defer oldNs.Close()
+ f()
+ err = unix.Setns(int(oldNs.Fd()), unix.CLONE_NEWNET)
+ if err == nil {
+ runtime.UnlockOSThread()
+ }
+}
+
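+// fakeKubernetes wraps the Cyclonus mock Kubernetes API and mirrors all
+// namespace, pod and network policy changes into the nftables controller. It
+// also creates a fakePod for every Kubernetes pod and implements
+// ExecuteRemoteCommand by connecting directly between fake pods.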
+type fakeKubernetes struct {
+ *kube.MockKubernetes
+ nft *nftctrl.Controller
+ fakePods map[string]*fakePod
+ nextPodIP uint64
+}
+
+func (k *fakeKubernetes) CreateNamespace(kubeNamespace *corev1.Namespace) (*corev1.Namespace, error) {
+ kubeNamespace, err := k.MockKubernetes.CreateNamespace(kubeNamespace)
+ if err == nil && k.nft != nil {
+ k.nft.SetNamespace(kubeNamespace.Name, kubeNamespace)
+ if err := k.nft.Flush(); err != nil {
+ return nil, err
+ }
+ }
+ return kubeNamespace, err
+}
+
+func (k *fakeKubernetes) SetNamespaceLabels(namespace string, labels map[string]string) (*corev1.Namespace, error) {
+ kubeNamespace, err := k.MockKubernetes.SetNamespaceLabels(namespace, labels)
+ if err == nil && k.nft != nil {
+ k.nft.SetNamespace(namespace, kubeNamespace)
+ if err := k.nft.Flush(); err != nil {
+ return nil, err
+ }
+ }
+ return kubeNamespace, err
+}
+
+func (k *fakeKubernetes) DeleteNamespace(namespace string) error {
+ err := k.MockKubernetes.DeleteNamespace(namespace)
+ if err == nil && k.nft != nil {
+ k.nft.SetNamespace(namespace, nil)
+ if err := k.nft.Flush(); err != nil {
+ return err
+ }
+ }
+ return err
+}
+
+func (k *fakeKubernetes) CreateNetworkPolicy(kubePolicy *networkingv1.NetworkPolicy) (*networkingv1.NetworkPolicy, error) {
+ kubePolicy, err := k.MockKubernetes.CreateNetworkPolicy(kubePolicy)
+ if err == nil && k.nft != nil {
+ k.nft.SetNetworkPolicy(cache.MetaObjectToName(kubePolicy), kubePolicy)
+ if err := k.nft.Flush(); err != nil {
+ return nil, err
+ }
+ }
+ return kubePolicy, err
+}
+
+func (k *fakeKubernetes) UpdateNetworkPolicy(kubePolicy *networkingv1.NetworkPolicy) (*networkingv1.NetworkPolicy, error) {
+ kubePolicy, err := k.MockKubernetes.UpdateNetworkPolicy(kubePolicy)
+ if err == nil && k.nft != nil {
+ k.nft.SetNetworkPolicy(cache.MetaObjectToName(kubePolicy), kubePolicy)
+ if err := k.nft.Flush(); err != nil {
+ return nil, err
+ }
+ }
+ return kubePolicy, err
+}
+
+func (k *fakeKubernetes) DeleteNetworkPolicy(namespace string, name string) error {
+ err := k.MockKubernetes.DeleteNetworkPolicy(namespace, name)
+ if err == nil && k.nft != nil {
+ k.nft.SetNetworkPolicy(cache.NewObjectName(namespace, name), nil)
+ if err := k.nft.Flush(); err != nil {
+ return err
+ }
+ }
+ return err
+}
+
+func (k *fakeKubernetes) DeleteAllNetworkPoliciesInNamespace(namespace string) error {
+ policies, err := k.GetNetworkPoliciesInNamespace(namespace)
+ if err == nil && k.nft != nil {
+ for _, kubePolicy := range policies {
+ k.nft.SetNetworkPolicy(cache.MetaObjectToName(&kubePolicy), nil)
+ if err := k.nft.Flush(); err != nil {
+ return err
+ }
+ }
+ }
+ return k.MockKubernetes.DeleteAllNetworkPoliciesInNamespace(namespace)
+}
+
+func (k *fakeKubernetes) CreatePod(kubePod *corev1.Pod) (*corev1.Pod, error) {
+ kubePod, err := k.MockKubernetes.CreatePod(kubePod)
+ objectName := cache.MetaObjectToName(kubePod)
+ if err == nil {
+ ipSuffix := []byte{uint8(k.nextPodIP >> 8), uint8(k.nextPodIP)}
+ podIPv4 := slices.Concat(gatewayIPv4[:2], ipSuffix)
+ podIPv6 := slices.Concat(gatewayIPv6[:14], ipSuffix)
+ k.nextPodIP++
+ kubePod.Status.PodIPs = []corev1.PodIP{{IP: podIPv4.String()}, {IP: podIPv6.String()}}
+ kubePod.Status.PodIP = kubePod.Status.PodIPs[testIPIndex].IP
+ ips := []net.IP{podIPv4, podIPv6}
+ fakePod, err := createFakePod(kubePod.Namespace, kubePod.Name, ips)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create fake pod: %w", err)
+ }
+ k.fakePods[objectName.String()] = fakePod
+ }
+ if err == nil && k.nft != nil {
+ k.nft.SetPod(objectName, kubePod)
+ if err := k.nft.Flush(); err != nil {
+ return nil, err
+ }
+ }
+ return kubePod, err
+}
+
+func (k *fakeKubernetes) DeletePod(namespace string, pod string) error {
+ err := k.MockKubernetes.DeletePod(namespace, pod)
+ objectName := cache.NewObjectName(namespace, pod)
+ if err == nil {
+ k.fakePods[objectName.String()].stop()
+ delete(k.fakePods, objectName.String())
+ }
+ if err == nil && k.nft != nil {
+ k.nft.SetPod(objectName, nil)
+ if err := k.nft.Flush(); err != nil {
+ return err
+ }
+ }
+ return err
+}
+
+func (k *fakeKubernetes) SetPodLabels(namespace string, pod string, labels map[string]string) (*corev1.Pod, error) {
+ kubePod, err := k.MockKubernetes.SetPodLabels(namespace, pod, labels)
+ if err == nil && k.nft != nil {
+ k.nft.SetPod(cache.MetaObjectToName(kubePod), kubePod)
+ if err := k.nft.Flush(); err != nil {
+ return nil, err
+ }
+ }
+ return kubePod, err
+}
+
+var protocolMap = map[string]corev1.Protocol{
+ "tcp": corev1.ProtocolTCP,
+ "udp": corev1.ProtocolUDP,
+ "sctp": corev1.ProtocolSCTP,
+}
+
+func (k *fakeKubernetes) ExecuteRemoteCommand(namespace string, pod string, container string, command []string) (string, string, error, error) {
+ // command is expected to have this format:
+ // /agnhost connect s-x-a.x.svc.cluster.local:80 --timeout=1s --protocol=tcp
+ if len(command) != 5 {
+ return "", "", nil, fmt.Errorf("unexpected command length: %v", command)
+ }
+ targetService, targetPortStr, ok := strings.Cut(command[2], ".svc.cluster.local:")
+ if !ok {
+ return "", "", nil, fmt.Errorf("failed to parse target: %q", command[2])
+ }
+ targetService, targetNamespace, ok := strings.Cut(targetService, ".")
+ if !ok {
+ return "", "", nil, fmt.Errorf("failed to parse target: %q", command[2])
+ }
+ targetPod := strings.TrimPrefix(targetService, fmt.Sprintf("s-%s-", targetNamespace))
+ protocol := strings.TrimPrefix(command[4], "--protocol=")
+
+ sourceFakePod := k.fakePods[cache.NewObjectName(namespace, pod).String()]
+ targetFakePod := k.fakePods[cache.NewObjectName(targetNamespace, targetPod).String()]
+
+ targetPort, err := strconv.Atoi(targetPortStr)
+ if err != nil {
+ return "", "", nil, err
+ }
+ kubeProtocol, ok := protocolMap[protocol]
+ if !ok {
+ return "", "", nil, fmt.Errorf("invalid protocol: %q", protocol)
+ }
+
+ connectErr, err := sourceFakePod.connect(targetFakePod, targetPort, kubeProtocol)
+ return "", "", connectErr, err
+}
+
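+// initializeNft feeds the current namespaces and pods into the nftables
+// controller and performs the initial flush. It expects that no network
+// policies exist yet.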
+func (k *fakeKubernetes) initializeNft() error {
+ namespaces, err := k.GetAllNamespaces()
+ if err != nil {
+ return err
+ }
+ for _, kubeNamespace := range namespaces.Items {
+ k.nft.SetNamespace(kubeNamespace.Name, &kubeNamespace)
+ pods, err := k.GetPodsInNamespace(kubeNamespace.Name)
+ if err != nil {
+ return err
+ }
+ for _, kubePod := range pods {
+ k.nft.SetPod(cache.MetaObjectToName(&kubePod), &kubePod)
+ }
+ policies, err := k.GetNetworkPoliciesInNamespace(kubeNamespace.Name)
+ if err != nil {
+ return err
+ }
+ if len(policies) != 0 {
+ return fmt.Errorf("expected no initial policies, but found %s", cache.MetaObjectToName(&policies[0]).String())
+ }
+ }
+ if err := k.nft.Flush(); err != nil {
+ return fmt.Errorf("initial flush failed: %w", err)
+ }
+ return nil
+}
+
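+// testRecorder receives events from the nftables controller, logging them and
+// failing the test on warning events.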
+type testRecorder struct {
+ t *testing.T
+}
+
+func (t *testRecorder) Event(object kruntime.Object, eventtype, reason, message string) {
+ if eventtype == corev1.EventTypeWarning {
+ t.t.Errorf("Warning event: object %v, %s: %s", object, reason, message)
+ } else {
+ t.t.Logf("%s event: object %v, %s: %s", eventtype, object, reason, message)
+ }
+}
+
+func (t *testRecorder) Eventf(object kruntime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
+ t.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...))
+}
+
+func (t *testRecorder) AnnotatedEventf(object kruntime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
+ t.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...))
+}
+
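+// Maximum-length object names, used to test that long namespace, pod and
+// policy names are handled correctly.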
+const maxNamespaceName = "ns-3456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012"
+const maxPodName = "pod-456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012"
+const maxPolicyName = "policy-789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012"
+
+func setPolicyName(name string) generator.Setter {
+ return func(policy *generator.Netpol) {
+ policy.Name = name
+ }
+}
+
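+// extraTestCases returns additional test cases which are not covered by the
+// Cyclonus test case generator.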
+func extraTestCases() []*generator.TestCase {
+ return []*generator.TestCase{
+ {
+ Description: "Update namespace so that additional peer selector matches",
+ Tags: generator.NewStringSet(generator.TagSetNamespaceLabels),
+ Steps: []*generator.TestStep{
+ generator.NewTestStep(generator.ProbeAllAvailable,
+ generator.CreatePolicy(generator.BuildPolicy(generator.SetPeers(true, []networkingv1.NetworkPolicyPeer{
+ {NamespaceSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"ns": "y"}}},
+ {NamespaceSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"new-ns": "qrs"}}},
+ })).NetworkPolicy())),
+ generator.NewTestStep(generator.ProbeAllAvailable,
+ generator.SetNamespaceLabels("y", map[string]string{"ns": "y", "new-ns": "qrs"})),
+ generator.NewTestStep(generator.ProbeAllAvailable,
+ generator.SetNamespaceLabels("y", map[string]string{"ns": "y"})),
+ },
+ },
+ {
+ Description: "Delete one of multiple policies",
+ Tags: generator.NewStringSet(generator.TagDeletePolicy),
+ Steps: []*generator.TestStep{
+ generator.NewTestStep(generator.ProbeAllAvailable,
+ generator.CreatePolicy(generator.BuildPolicy(setPolicyName("policy1")).NetworkPolicy()),
+ generator.CreatePolicy(generator.BuildPolicy(setPolicyName("policy2"), generator.SetPorts(true, nil)).NetworkPolicy()),
+ generator.DeletePolicy("x", "policy1"),
+ ),
+ },
+ },
+ {
+ Description: "Create namespace, pod and policy with maximum name length",
+ Tags: generator.NewStringSet(generator.TagCreateNamespace),
+ Steps: []*generator.TestStep{
+ generator.NewTestStep(generator.ProbeAllAvailable,
+ generator.CreateNamespace(maxNamespaceName, nil),
+ generator.CreatePod(maxNamespaceName, maxPodName, nil),
+ generator.CreatePolicy(generator.BuildPolicy(
+ generator.SetNamespace(maxNamespaceName),
+ setPolicyName(maxPolicyName),
+ generator.SetPodSelector(metav1.LabelSelector{})).NetworkPolicy())),
+ generator.NewTestStep(generator.ProbeAllAvailable,
+ generator.DeleteNamespace(maxNamespaceName)),
+ },
+ },
+ }
+}
+
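+// initTestCase is a test case which initializes the nftables controller in a
+// specific order before the first Flush.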
+type initTestCase struct {
+ description string
+ init func(*connectivity.TestCaseState, *nftctrl.Controller) error
+}
+
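+// initializationTestCases returns test cases which feed resources to the
+// nftables controller in different orders before the first Flush, simulating
+// resource types which finish loading at different times.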
+func initializationTestCases() []initTestCase {
+ return []initTestCase{
+ {
+ description: "Delete rule during initialization",
+ // It is possible that we already receive updates/deletes during
+ // initialization, if one of the resource types has not yet finished
+ // loading. This means that we need to be able to handle such updates
+ // before the first Flush.
+ init: func(t *connectivity.TestCaseState, nft *nftctrl.Controller) error {
+ namespaces, err := t.Kubernetes.GetAllNamespaces()
+ if err != nil {
+ return err
+ }
+ for _, kubeNamespace := range namespaces.Items {
+ nft.SetNamespace(kubeNamespace.Name, &kubeNamespace)
+ pods, err := t.Kubernetes.GetPodsInNamespace(kubeNamespace.Name)
+ if err != nil {
+ return err
+ }
+ for _, kubePod := range pods {
+ nft.SetPod(cache.MetaObjectToName(&kubePod), &kubePod)
+ }
+ }
+
+ policy1 := generator.BuildPolicy().NetworkPolicy()
+ policy1.Name = "policy1"
+ nft.SetNetworkPolicy(cache.MetaObjectToName(policy1), policy1)
+
+ policy2 := generator.BuildPolicy(generator.SetPorts(true, nil)).NetworkPolicy()
+ policy2.Name = "policy2"
+ if err := t.CreatePolicy(policy2); err != nil {
+ return fmt.Errorf("failed to create policy: %w", err)
+ }
+ nft.SetNetworkPolicy(cache.MetaObjectToName(policy2), policy2)
+
+ // Delete policy 1.
+ nft.SetNetworkPolicy(cache.MetaObjectToName(policy1), nil)
+
+ return nil
+ },
+ },
+ {
+ description: "Initialize namespaces last",
+ init: func(t *connectivity.TestCaseState, nft *nftctrl.Controller) error {
+ policy := generator.BuildPolicy().NetworkPolicy()
+ policy.Spec.Ingress[0].From[0].NamespaceSelector = nil
+ if err := t.CreatePolicy(policy); err != nil {
+ return fmt.Errorf("failed to create policy: %w", err)
+ }
+
+ namespaces, err := t.Kubernetes.GetAllNamespaces()
+ if err != nil {
+ return err
+ }
+ for _, kubeNamespace := range namespaces.Items {
+ pods, err := t.Kubernetes.GetPodsInNamespace(kubeNamespace.Name)
+ if err != nil {
+ return err
+ }
+ for _, kubePod := range pods {
+ nft.SetPod(cache.MetaObjectToName(&kubePod), &kubePod)
+ }
+ policies, err := t.Kubernetes.GetNetworkPoliciesInNamespace(kubeNamespace.Name)
+ if err != nil {
+ return err
+ }
+ for _, policy := range policies {
+ nft.SetNetworkPolicy(cache.MetaObjectToName(&policy), &policy)
+ }
+ }
+ for _, kubeNamespace := range namespaces.Items {
+ nft.SetNamespace(kubeNamespace.Name, &kubeNamespace)
+ }
+ return nil
+ },
+ },
+ }
+}
+
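+// TestCyclonus runs the generated Cyclonus connectivity test cases, plus the
+// extra and initialization test cases, against the nftables network policy
+// controller, using fake pods in separate network namespaces.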
+func TestCyclonus(t *testing.T) {
+ if os.Getenv("IN_KTEST") != "true" {
+ t.Skip("Not in ktest")
+ }
+
+ if err := os.WriteFile("/proc/sys/net/ipv4/ip_forward", []byte("1\n"), 0644); err != nil {
+ t.Fatalf("Failed to enable IPv4 forwarding: %v", err)
+ }
+ if err := os.WriteFile("/proc/sys/net/ipv6/conf/all/forwarding", []byte("1\n"), 0644); err != nil {
+ t.Fatalf("Failed to enable IPv6 forwarding: %v", err)
+ }
+ // By default, Linux rate-limits ICMP replies, which slows down our tests.
+ if err := os.WriteFile("/proc/sys/net/ipv4/icmp_ratemask", []byte("0\n"), 0644); err != nil {
+ t.Fatalf("Failed to disable ICMPv4 rate limiting: %v", err)
+ }
+ if err := os.WriteFile("/proc/sys/net/ipv6/icmp/ratemask", []byte("\n"), 0644); err != nil {
+ t.Fatalf("Failed to disable ICMPv6 rate limiting: %v", err)
+ }
+ // Disable IPv6 duplicate address detection, which delays IPv6 connectivity
+ // becoming available.
+ if err := os.WriteFile("/proc/sys/net/ipv6/conf/default/accept_dad", []byte("0\n"), 0644); err != nil {
+ t.Fatalf("Failed to disable IPv6 DAD: %v", err)
+ }
+
+ kubernetes := &fakeKubernetes{
+ MockKubernetes: kube.NewMockKubernetes(1.0),
+ fakePods: make(map[string]*fakePod),
+ nextPodIP: 2,
+ }
+
+ allowDNS := false // When true, this creates policies allowing port 53; we don't need that.
+ serverNamespaces := []string{"x", "y", "z"}
+ serverPods := []string{"a", "b", "c"}
+ externalIPs := []string{}
+ podCreationTimeoutSeconds := 1
+ batchJobs := false
+ imageRegistry := "registry.k8s.io"
+
+ resources, err := probe.NewDefaultResources(kubernetes, serverNamespaces, serverPods, serverPorts, serverProtocols, externalIPs, podCreationTimeoutSeconds, batchJobs, imageRegistry)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ interpreterConfig := &connectivity.InterpreterConfig{
+ ResetClusterBeforeTestCase: false,
+ KubeProbeRetries: 1,
+ PerturbationWaitSeconds: 0,
+ VerifyClusterStateBeforeTestCase: true,
+ BatchJobs: batchJobs,
+ IgnoreLoopback: ignoreLoopback,
+ JobTimeoutSeconds: 1,
+ FailFast: false,
+ }
+ interpreter := connectivity.NewInterpreter(kubernetes, resources, interpreterConfig)
+ printer := &connectivity.Printer{
+ Noisy: false,
+ IgnoreLoopback: ignoreLoopback,
+ JunitResultsFile: "",
+ }
+
+ zcPod, err := resources.GetPod("z", "c")
+ if err != nil {
+ t.Fatal(err)
+ }
+ testCaseGenerator := generator.NewTestCaseGenerator(allowDNS, zcPod.IP, serverNamespaces, nil, nil)
+ testCases := slices.Concat(testCaseGenerator.GenerateTestCases(), extraTestCases())
+
+ for _, testCase := range testCases {
+ for _, step := range testCase.Steps {
+ step.Probe.Mode = generator.ProbeModeServiceName
+ }
+ t.Run(testCase.Description, func(t *testing.T) {
+ recorder := &testRecorder{t: t}
+ nft, err := nftctrl.New(recorder, podIfaceGroup)
+ if err != nil {
+ t.Errorf("Failed to create nftctrl: %v", err)
+ return
+ }
+ defer nft.Close()
+ kubernetes.nft = nft
+
+ if err := kubernetes.initializeNft(); err != nil {
+ t.Errorf("nftctrl initialization failed: %v", err)
+ return
+ }
+
+ result := interpreter.ExecuteTestCase(testCase)
+ if result.Err != nil {
+ t.Error(result.Err)
+ return
+ }
+ if !result.Passed(ignoreLoopback) {
+ printer.PrintTestCaseResult(result)
+
+ cmd := exec.Command("nft", "list", "ruleset")
+ cmd.Stdout = os.Stdout
+ cmd.Stderr = os.Stderr
+ cmd.Run()
+
+ t.Error("connectivity test failed")
+ }
+
+ kubernetes.nft = nil
+ testCaseState := &connectivity.TestCaseState{
+ Kubernetes: kubernetes,
+ Resources: resources,
+ }
+ err = testCaseState.ResetClusterState()
+ if err != nil {
+ t.Errorf("failed to reset cluster: %v", err)
+ }
+
+ // Flush the conntrack table. Otherwise, UDP connectivity tests can
+ // spuriously succeed when they should be blocked, because they match an
+ // entry in the conntrack table from a previous test.
+ err = netlink.ConntrackTableFlush(netlink.ConntrackTable)
+ if err != nil {
+ t.Errorf("failed to flush conntrack table: %v", err)
+ }
+ })
+ }
+
+ initTestCases := initializationTestCases()
+ for _, testCase := range initTestCases {
+ t.Run(testCase.description, func(t *testing.T) {
+ recorder := &testRecorder{t: t}
+ nft, err := nftctrl.New(recorder, podIfaceGroup)
+ if err != nil {
+ t.Errorf("Failed to create nftctrl: %v", err)
+ return
+ }
+ defer nft.Close()
+
+ testCaseState := &connectivity.TestCaseState{
+ Kubernetes: kubernetes,
+ Resources: resources,
+ }
+
+ if err := testCase.init(testCaseState, nft); err != nil {
+ t.Errorf("initialization failed: %v", err)
+ return
+ }
+
+ if err := nft.Flush(); err != nil {
+ t.Errorf("flush failed: %v", err)
+ return
+ }
+
+ parsedPolicy := matcher.BuildNetworkPolicies(true, testCaseState.Policies)
+ jobBuilder := &probe.JobBuilder{TimeoutSeconds: 1}
+ simRunner := probe.NewSimulatedRunner(parsedPolicy, jobBuilder)
+ probeConfig := generator.NewAllAvailable(generator.ProbeModeServiceName)
+ stepResult := connectivity.NewStepResult(
+ simRunner.RunProbeForConfig(probeConfig, testCaseState.Resources),
+ parsedPolicy,
+ slices.Clone(testCaseState.Policies))
+ kubeRunner := probe.NewKubeRunner(kubernetes, 15, jobBuilder)
+ for i := 0; i <= interpreterConfig.KubeProbeRetries; i++ {
+ stepResult.AddKubeProbe(kubeRunner.RunProbeForConfig(probeConfig, testCaseState.Resources))
+ if stepResult.Passed(ignoreLoopback) {
+ break
+ }
+ }
+
+ if !stepResult.Passed(ignoreLoopback) {
+ printer.PrintStep(0, &generator.TestStep{Probe: probeConfig}, stepResult)
+
+ cmd := exec.Command("nft", "list", "ruleset")
+ cmd.Stdout = os.Stdout
+ cmd.Stderr = os.Stderr
+ cmd.Run()
+
+ t.Error("connectivity test failed")
+ }
+
+ err = testCaseState.ResetClusterState()
+ if err != nil {
+ t.Errorf("failed to reset cluster: %v", err)
+ }
+
+ // Flush the conntrack table. Otherwise, UDP connectivity tests can
+ // spuriously succeed when they should be blocked, because they match an
+ // entry in the conntrack table from a previous test.
+ err = netlink.ConntrackTableFlush(netlink.ConntrackTable)
+ if err != nil {
+ t.Errorf("failed to flush conntrack table: %v", err)
+ }
+ })
+ }
+}