blob: f32dc37a4371782bffe4d1a2b85e571433aab67e [file] [log] [blame]
// Copyright The Monogon Project Authors.
// SPDX-License-Identifier: Apache-2.0
// This package tests the network policy controller, using the Cyclonus network
// policy conformance test suite. It uses the real policy controller and
// nftables controller, but fake Kubernetes API and pods, which allows the tests
// to run much faster than would be possible with the real API server and pods.
//
// By default, the test runs in a ktest, which means we are testing with the
// same kernel version, and thus nftables implementation, which is used in
// Monogon OS. But you can also run tests manually in a new user and network
// namespace, which allows you to use all the debugging tools available on your
// machine. Useful commands:
//
// bazel build //metropolis/node/kubernetes/networkpolicy:networkpolicy_test
// Build the test.
// unshare --map-user=0 --net
// Create a user and network namespace.
// tcpdump -i any &
// Capture traffic in the background.
// IN_KTEST=true bazel-bin/metropolis/node/kubernetes/networkpolicy/networkpolicy_test_/networkpolicy_test
// Run the test.
package networkpolicy_test
import (
"crypto/sha256"
"errors"
"fmt"
"net"
"os"
"os/exec"
"runtime"
"slices"
"strconv"
"strings"
"syscall"
"testing"
"time"
"git.dolansoft.org/dolansoft/k8s-nft-npc/nftctrl"
"github.com/mattfenwick/cyclonus/pkg/connectivity"
"github.com/mattfenwick/cyclonus/pkg/connectivity/probe"
"github.com/mattfenwick/cyclonus/pkg/generator"
"github.com/mattfenwick/cyclonus/pkg/kube"
"github.com/mattfenwick/cyclonus/pkg/matcher"
"github.com/vishvananda/netlink"
"golang.org/x/sys/unix"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/cache"
)
// Change this to 1 to test IPv6 instead of IPv4. Tests pass with IPv6, but run
// much slower, apparently because some ICMPv6 packets are dropped and need to
// be retransmitted.
//
// TODO: Fix slow IPv6 tests and run them automatically, e.g. as a separate
// ktest with a flag to select IPv6.
const testIPIndex = 0
const podIfaceGroup uint32 = 8
// Loopback traffic is only affected by network policies when connecting through
// a ClusterIP service. For now, we don't simulate service IPs, so this is
// disabled.
const ignoreLoopback = true
var serverPorts = []int{80, 81}
var serverProtocols = []corev1.Protocol{corev1.ProtocolTCP, corev1.ProtocolUDP}
var gatewayIPv4 = net.ParseIP("10.8.0.1").To4()
var gatewayIPv6 = net.ParseIP("fd08::1")
func netListen(port int, protocol corev1.Protocol, payload string, stop <-chan struct{}) error {
switch protocol {
case corev1.ProtocolTCP:
listener, err := net.ListenTCP("tcp", &net.TCPAddr{Port: port})
if err != nil {
return err
}
go func() {
<-stop
listener.Close()
}()
for {
conn, err := listener.Accept()
if err != nil {
return err
}
conn.Write([]byte(payload))
conn.Close()
}
case corev1.ProtocolUDP:
conn, err := net.ListenUDP("udp", &net.UDPAddr{Port: port})
if err != nil {
return err
}
go func() {
<-stop
conn.Close()
}()
var buf [16]byte
for {
_, clientAddr, err := conn.ReadFrom(buf[:])
if err != nil {
return err
}
conn.WriteTo([]byte(payload), clientAddr)
}
default:
return fmt.Errorf("unsupported protocol: %q", protocol)
}
}
func netConnect(ip net.IP, port int, protocol corev1.Protocol, expectPayload string) (error, error) {
addr := (&net.TCPAddr{IP: ip, Port: port}).String()
payload := make([]byte, len(expectPayload))
switch protocol {
case corev1.ProtocolTCP:
conn, err := net.DialTimeout("tcp", addr, 2*time.Second)
if errors.Is(err, unix.EHOSTUNREACH) || errors.Is(err, unix.EACCES) {
return err, nil
} else if err != nil {
return nil, err
}
defer conn.Close()
_, err = conn.Read(payload)
if err != nil {
return nil, err
}
case corev1.ProtocolUDP:
conn, err := net.Dial("udp", addr)
if err != nil {
return nil, err
}
defer conn.Close()
err = conn.SetDeadline(time.Now().Add(2 * time.Second))
if err != nil {
return nil, err
}
_, err = conn.Write([]byte("hello"))
if err != nil {
return nil, err
}
_, err = conn.Read(payload)
if errors.Is(err, unix.EHOSTUNREACH) || errors.Is(err, unix.EACCES) {
return err, nil
} else if err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("unsupported protocol: %q", protocol)
}
if string(payload) != expectPayload {
return nil, fmt.Errorf("wrong payload, expected %q, got %q", expectPayload, payload)
}
return nil, nil
}
type fakePod struct {
name string
ips []podIP
netns *os.File
rawNetns syscall.RawConn
stopCh chan struct{}
}
type podIP struct {
ip net.IP
gatewayIP net.IP
zeroIP net.IP
fullMask net.IPMask
zeroMask net.IPMask
}
func createFakePod(namespace, name string, ips []net.IP) (*fakePod, error) {
p := &fakePod{
name: fmt.Sprintf("%s/%s", namespace, name),
ips: make([]podIP, len(ips)),
stopCh: make(chan struct{}),
}
for i, ip := range ips {
p := &p.ips[i]
switch {
case ip.To4() != nil:
p.ip = ip.To4()
p.gatewayIP = gatewayIPv4
p.zeroIP = net.ParseIP("0.0.0.0").To4()
p.fullMask = net.CIDRMask(32, 32)
p.zeroMask = net.CIDRMask(0, 32)
case ip.To16() != nil:
p.ip = ip.To16()
p.gatewayIP = gatewayIPv6
p.zeroIP = net.ParseIP("::")
p.fullMask = net.CIDRMask(128, 128)
p.zeroMask = net.CIDRMask(0, 128)
default:
return nil, fmt.Errorf("invalid IP: %v", ip)
}
}
// Create new network namespace.
runLockedThread(func() {
err := unix.Unshare(unix.CLONE_NEWNET)
if err != nil {
panic(err)
}
// Obtain an FD of the new net namespace.
p.netns, err = os.Open(fmt.Sprintf("/proc/%d/task/%d/ns/net", os.Getpid(), unix.Gettid()))
if err != nil {
panic(err)
}
})
var err error
p.rawNetns, err = p.netns.SyscallConn()
if err != nil {
panic(err)
}
// Create veth pair.
linkAttrs := netlink.NewLinkAttrs()
linkAttrs.Name = fmt.Sprintf("veth_%s_%s", namespace, name)
if len(linkAttrs.Name) > 15 {
hash := sha256.Sum256([]byte(linkAttrs.Name))
linkAttrs.Name = fmt.Sprintf("veth_%x", hash[:5])
}
linkAttrs.Group = podIfaceGroup
linkAttrs.Flags = net.FlagUp
p.rawNetns.Control(func(fd uintptr) {
err = netlink.LinkAdd(&netlink.Veth{LinkAttrs: linkAttrs, PeerName: "veth", PeerNamespace: netlink.NsFd(fd)})
})
if err != nil {
return nil, err
}
veth, err := netlink.LinkByName(linkAttrs.Name)
if err != nil {
return nil, err
}
for _, ip := range p.ips {
// Add gateway address to link.
addr := &netlink.Addr{IPNet: &net.IPNet{IP: ip.gatewayIP, Mask: ip.fullMask}, Flags: unix.IFA_F_NODAD}
err = netlink.AddrAdd(veth, addr)
if err != nil {
return nil, err
}
// Add route.
err = netlink.RouteAdd(&netlink.Route{
LinkIndex: veth.Attrs().Index,
Scope: netlink.SCOPE_HOST,
Dst: &net.IPNet{IP: ip.ip, Mask: ip.fullMask},
})
if err != nil {
return nil, err
}
}
p.runInNetns(func() {
// Enable loopback traffic in the pod.
loopback, err := netlink.LinkByName("lo")
if err != nil {
panic(err)
}
err = netlink.LinkSetUp(loopback)
if err != nil {
panic(err)
}
// Set up the veth in the pod namespace.
veth, err := netlink.LinkByName("veth")
if err != nil {
panic(err)
}
err = netlink.LinkSetUp(veth)
if err != nil {
panic(err)
}
for _, ip := range p.ips {
// IFA_F_NODAD disables duplicate address detection for IPv6.
addr := &netlink.Addr{IPNet: &net.IPNet{IP: ip.ip, Mask: ip.fullMask}, Flags: unix.IFA_F_NODAD}
err = netlink.AddrAdd(veth, addr)
if err != nil {
panic(err)
}
err = netlink.RouteAdd(&netlink.Route{
LinkIndex: veth.Attrs().Index,
Scope: netlink.SCOPE_LINK,
Dst: &net.IPNet{IP: ip.gatewayIP, Mask: ip.fullMask},
Src: ip.ip,
})
if err != nil {
panic(err)
}
err = netlink.RouteAdd(&netlink.Route{
LinkIndex: veth.Attrs().Index,
Scope: netlink.SCOPE_UNIVERSE,
Dst: &net.IPNet{IP: ip.zeroIP, Mask: ip.zeroMask},
Gw: ip.gatewayIP,
})
if err != nil {
panic(err)
}
}
})
for _, protocol := range serverProtocols {
for _, port := range serverPorts {
go p.runInNetns(func() {
err := netListen(port, protocol, p.name, p.stopCh)
select {
case <-p.stopCh:
default:
panic(err)
}
})
}
}
return p, nil
}
func (p *fakePod) stop() {
close(p.stopCh)
p.netns.Close()
}
func (p *fakePod) runInNetns(f func()) {
runLockedThread(func() {
p.rawNetns.Control(func(fd uintptr) {
err := unix.Setns(int(fd), unix.CLONE_NEWNET)
if err != nil {
panic(err)
}
})
f()
})
}
func (p *fakePod) connect(target *fakePod, port int, protocol corev1.Protocol) (error, error) {
var errConnectivity, err error
p.runInNetns(func() {
errConnectivity, err = netConnect(target.ips[testIPIndex].ip, port, protocol, target.name)
})
return errConnectivity, err
}
// runLockedThread runs f locked on an OS thread, which allows f to switch to a
// different network namespace. After f returns, the previous network namespace
// is restored, and if that succeeds, the thread is unlocked, allowing the
// thread to be reused for other goroutines.
func runLockedThread(f func()) {
runtime.LockOSThread()
oldNs, err := os.Open(fmt.Sprintf("/proc/%d/task/%d/ns/net", os.Getpid(), unix.Gettid()))
if err != nil {
panic(err)
}
defer oldNs.Close()
f()
err = unix.Setns(int(oldNs.Fd()), unix.CLONE_NEWNET)
if err == nil {
runtime.UnlockOSThread()
}
}
type fakeKubernetes struct {
*kube.MockKubernetes
nft *nftctrl.Controller
fakePods map[string]*fakePod
nextPodIP uint64
}
func (k *fakeKubernetes) CreateNamespace(kubeNamespace *corev1.Namespace) (*corev1.Namespace, error) {
kubeNamespace, err := k.MockKubernetes.CreateNamespace(kubeNamespace)
if err == nil && k.nft != nil {
k.nft.SetNamespace(kubeNamespace.Name, kubeNamespace)
if err := k.nft.Flush(); err != nil {
return nil, err
}
}
return kubeNamespace, err
}
func (k *fakeKubernetes) SetNamespaceLabels(namespace string, labels map[string]string) (*corev1.Namespace, error) {
kubeNamespace, err := k.MockKubernetes.SetNamespaceLabels(namespace, labels)
if err == nil && k.nft != nil {
k.nft.SetNamespace(namespace, kubeNamespace)
if err := k.nft.Flush(); err != nil {
return nil, err
}
}
return kubeNamespace, err
}
func (k *fakeKubernetes) DeleteNamespace(namespace string) error {
err := k.MockKubernetes.DeleteNamespace(namespace)
if err == nil && k.nft != nil {
k.nft.SetNamespace(namespace, nil)
if err := k.nft.Flush(); err != nil {
return err
}
}
return err
}
func (k *fakeKubernetes) CreateNetworkPolicy(kubePolicy *networkingv1.NetworkPolicy) (*networkingv1.NetworkPolicy, error) {
kubePolicy, err := k.MockKubernetes.CreateNetworkPolicy(kubePolicy)
if err == nil && k.nft != nil {
k.nft.SetNetworkPolicy(cache.MetaObjectToName(kubePolicy), kubePolicy)
if err := k.nft.Flush(); err != nil {
return nil, err
}
}
return kubePolicy, err
}
func (k *fakeKubernetes) UpdateNetworkPolicy(kubePolicy *networkingv1.NetworkPolicy) (*networkingv1.NetworkPolicy, error) {
kubePolicy, err := k.MockKubernetes.UpdateNetworkPolicy(kubePolicy)
if err == nil && k.nft != nil {
k.nft.SetNetworkPolicy(cache.MetaObjectToName(kubePolicy), kubePolicy)
if err := k.nft.Flush(); err != nil {
return nil, err
}
}
return kubePolicy, err
}
func (k *fakeKubernetes) DeleteNetworkPolicy(namespace string, name string) error {
err := k.MockKubernetes.DeleteNetworkPolicy(namespace, name)
if err == nil && k.nft != nil {
k.nft.SetNetworkPolicy(cache.NewObjectName(namespace, name), nil)
if err := k.nft.Flush(); err != nil {
return err
}
}
return err
}
func (k *fakeKubernetes) DeleteAllNetworkPoliciesInNamespace(namespace string) error {
policies, err := k.GetNetworkPoliciesInNamespace(namespace)
if err == nil && k.nft != nil {
for _, kubePolicy := range policies {
k.nft.SetNetworkPolicy(cache.MetaObjectToName(&kubePolicy), nil)
if err := k.nft.Flush(); err != nil {
return err
}
}
}
return k.MockKubernetes.DeleteAllNetworkPoliciesInNamespace(namespace)
}
func (k *fakeKubernetes) CreatePod(kubePod *corev1.Pod) (*corev1.Pod, error) {
kubePod, err := k.MockKubernetes.CreatePod(kubePod)
objectName := cache.MetaObjectToName(kubePod)
if err == nil {
ipSuffix := []byte{uint8(k.nextPodIP >> 8), uint8(k.nextPodIP)}
podIPv4 := slices.Concat(gatewayIPv4[:2], ipSuffix)
podIPv6 := slices.Concat(gatewayIPv6[:14], ipSuffix)
k.nextPodIP++
kubePod.Status.PodIPs = []corev1.PodIP{{IP: podIPv4.String()}, {IP: podIPv6.String()}}
kubePod.Status.PodIP = kubePod.Status.PodIPs[testIPIndex].IP
ips := []net.IP{podIPv4, podIPv6}
fakePod, err := createFakePod(kubePod.Namespace, kubePod.Name, ips)
if err != nil {
return nil, fmt.Errorf("failed to create fake pod: %w", err)
}
k.fakePods[objectName.String()] = fakePod
}
if err == nil && k.nft != nil {
k.nft.SetPod(objectName, kubePod)
if err := k.nft.Flush(); err != nil {
return nil, err
}
}
return kubePod, err
}
func (k *fakeKubernetes) DeletePod(namespace string, pod string) error {
err := k.MockKubernetes.DeletePod(namespace, pod)
objectName := cache.NewObjectName(namespace, pod)
if err == nil {
k.fakePods[objectName.String()].stop()
delete(k.fakePods, objectName.String())
}
if err == nil && k.nft != nil {
k.nft.SetPod(objectName, nil)
if err := k.nft.Flush(); err != nil {
return err
}
}
return err
}
func (k *fakeKubernetes) SetPodLabels(namespace string, pod string, labels map[string]string) (*corev1.Pod, error) {
kubePod, err := k.MockKubernetes.SetPodLabels(namespace, pod, labels)
if err == nil && k.nft != nil {
k.nft.SetPod(cache.MetaObjectToName(kubePod), kubePod)
if err := k.nft.Flush(); err != nil {
return nil, err
}
}
return kubePod, err
}
var protocolMap = map[string]corev1.Protocol{
"tcp": corev1.ProtocolTCP,
"udp": corev1.ProtocolUDP,
"sctp": corev1.ProtocolSCTP,
}
func (k *fakeKubernetes) ExecuteRemoteCommand(namespace string, pod string, container string, command []string) (string, string, error, error) {
// command is expected to have this format:
// /agnhost connect s-x-a.x.svc.cluster.local:80 --timeout=1s --protocol=tcp
if len(command) != 5 {
return "", "", nil, fmt.Errorf("unexpected command length: %v", command)
}
targetService, targetPortStr, ok := strings.Cut(command[2], ".svc.cluster.local:")
if !ok {
return "", "", nil, fmt.Errorf("failed to parse target: %q", command[2])
}
targetService, targetNamespace, ok := strings.Cut(targetService, ".")
if !ok {
return "", "", nil, fmt.Errorf("failed to parse target: %q", command[2])
}
targetPod := strings.TrimPrefix(targetService, fmt.Sprintf("s-%s-", targetNamespace))
protocol := strings.TrimPrefix(command[4], "--protocol=")
sourceFakePod := k.fakePods[cache.NewObjectName(namespace, pod).String()]
targetFakePod := k.fakePods[cache.NewObjectName(targetNamespace, targetPod).String()]
targetPort, err := strconv.Atoi(targetPortStr)
if err != nil {
return "", "", nil, err
}
kubeProtocol, ok := protocolMap[protocol]
if !ok {
return "", "", nil, fmt.Errorf("invalid protocol: %q", protocol)
}
connectErr, err := sourceFakePod.connect(targetFakePod, targetPort, kubeProtocol)
return "", "", connectErr, err
}
func (k *fakeKubernetes) initializeNft() error {
namespaces, err := k.GetAllNamespaces()
if err != nil {
return err
}
for _, kubeNamespace := range namespaces.Items {
k.nft.SetNamespace(kubeNamespace.Name, &kubeNamespace)
pods, err := k.GetPodsInNamespace(kubeNamespace.Name)
if err != nil {
return err
}
for _, kubePod := range pods {
k.nft.SetPod(cache.MetaObjectToName(&kubePod), &kubePod)
}
policies, err := k.GetNetworkPoliciesInNamespace(kubeNamespace.Name)
if err != nil {
return err
}
if len(policies) != 0 {
return fmt.Errorf("expected no initial policies, but found %s", cache.MetaObjectToName(&policies[0]).String())
}
}
if err := k.nft.Flush(); err != nil {
return fmt.Errorf("initial flush failed: %w", err)
}
return nil
}
type testRecorder struct {
t *testing.T
}
func (t *testRecorder) Event(object kruntime.Object, eventtype, reason, message string) {
if eventtype == corev1.EventTypeWarning {
t.t.Errorf("Warning event: object %v, %s: %s", object, reason, message)
} else {
t.t.Logf("%s event: object %v, %s: %s", eventtype, object, reason, message)
}
}
func (t *testRecorder) Eventf(object kruntime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
t.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...))
}
func (t *testRecorder) AnnotatedEventf(object kruntime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
t.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...))
}
const maxNamespaceName = "ns-3456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012"
const maxPodName = "pod-456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012"
const maxPolicyName = "policy-789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012"
func setPolicyName(name string) generator.Setter {
return func(policy *generator.Netpol) {
policy.Name = name
}
}
func extraTestCases() []*generator.TestCase {
return []*generator.TestCase{
{
Description: "Update namespace so that additional peer selector matches",
Tags: generator.NewStringSet(generator.TagSetNamespaceLabels),
Steps: []*generator.TestStep{
generator.NewTestStep(generator.ProbeAllAvailable,
generator.CreatePolicy(generator.BuildPolicy(generator.SetPeers(true, []networkingv1.NetworkPolicyPeer{
{NamespaceSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"ns": "y"}}},
{NamespaceSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"new-ns": "qrs"}}},
})).NetworkPolicy())),
generator.NewTestStep(generator.ProbeAllAvailable,
generator.SetNamespaceLabels("y", map[string]string{"ns": "y", "new-ns": "qrs"})),
generator.NewTestStep(generator.ProbeAllAvailable,
generator.SetNamespaceLabels("y", map[string]string{"ns": "y"})),
},
},
{
Description: "Delete one of multiple policies",
Tags: generator.NewStringSet(generator.TagDeletePolicy),
Steps: []*generator.TestStep{
generator.NewTestStep(generator.ProbeAllAvailable,
generator.CreatePolicy(generator.BuildPolicy(setPolicyName("policy1")).NetworkPolicy()),
generator.CreatePolicy(generator.BuildPolicy(setPolicyName("policy2"), generator.SetPorts(true, nil)).NetworkPolicy()),
generator.DeletePolicy("x", "policy1"),
),
},
},
{
Description: "Create namespace, pod and policy with maximum name length",
Tags: generator.NewStringSet(generator.TagCreateNamespace),
Steps: []*generator.TestStep{
generator.NewTestStep(generator.ProbeAllAvailable,
generator.CreateNamespace(maxNamespaceName, nil),
generator.CreatePod(maxNamespaceName, maxPodName, nil),
generator.CreatePolicy(generator.BuildPolicy(
generator.SetNamespace(maxNamespaceName),
setPolicyName(maxPolicyName),
generator.SetPodSelector(metav1.LabelSelector{})).NetworkPolicy())),
generator.NewTestStep(generator.ProbeAllAvailable,
generator.DeleteNamespace(maxNamespaceName)),
},
},
}
}
type initTestCase struct {
description string
init func(*connectivity.TestCaseState, *nftctrl.Controller) error
}
func initializationTestCases() []initTestCase {
return []initTestCase{
{
description: "Delete rule during initialization",
// It is possible that we already receive updates/deletes during
// initialization, if one of the resource types has not yet finished
// loading. This means that we need to be able to handle such updates
// before the first Flush.
init: func(t *connectivity.TestCaseState, nft *nftctrl.Controller) error {
namespaces, err := t.Kubernetes.GetAllNamespaces()
if err != nil {
return err
}
for _, kubeNamespace := range namespaces.Items {
nft.SetNamespace(kubeNamespace.Name, &kubeNamespace)
pods, err := t.Kubernetes.GetPodsInNamespace(kubeNamespace.Name)
if err != nil {
return err
}
for _, kubePod := range pods {
nft.SetPod(cache.MetaObjectToName(&kubePod), &kubePod)
}
}
policy1 := generator.BuildPolicy().NetworkPolicy()
policy1.Name = "policy1"
nft.SetNetworkPolicy(cache.MetaObjectToName(policy1), policy1)
policy2 := generator.BuildPolicy(generator.SetPorts(true, nil)).NetworkPolicy()
policy2.Name = "policy2"
if err := t.CreatePolicy(policy2); err != nil {
return fmt.Errorf("failed to create policy: %w", err)
}
nft.SetNetworkPolicy(cache.MetaObjectToName(policy2), policy2)
// Delete policy 1.
nft.SetNetworkPolicy(cache.MetaObjectToName(policy1), nil)
return nil
},
},
{
description: "Initialize namespaces last",
init: func(t *connectivity.TestCaseState, nft *nftctrl.Controller) error {
policy := generator.BuildPolicy().NetworkPolicy()
policy.Spec.Ingress[0].From[0].NamespaceSelector = nil
if err := t.CreatePolicy(policy); err != nil {
return fmt.Errorf("failed to create policy: %w", err)
}
namespaces, err := t.Kubernetes.GetAllNamespaces()
if err != nil {
return err
}
for _, kubeNamespace := range namespaces.Items {
pods, err := t.Kubernetes.GetPodsInNamespace(kubeNamespace.Name)
if err != nil {
return err
}
for _, kubePod := range pods {
nft.SetPod(cache.MetaObjectToName(&kubePod), &kubePod)
}
policies, err := t.Kubernetes.GetNetworkPoliciesInNamespace(kubeNamespace.Name)
if err != nil {
return err
}
for _, policy := range policies {
nft.SetNetworkPolicy(cache.MetaObjectToName(&policy), &policy)
}
}
for _, kubeNamespace := range namespaces.Items {
nft.SetNamespace(kubeNamespace.Name, &kubeNamespace)
}
return nil
},
},
}
}
func TestCyclonus(t *testing.T) {
if os.Getenv("IN_KTEST") != "true" {
t.Skip("Not in ktest")
}
if err := os.WriteFile("/proc/sys/net/ipv4/ip_forward", []byte("1\n"), 0644); err != nil {
t.Fatalf("Failed to enable IPv4 forwarding: %v", err)
}
if err := os.WriteFile("/proc/sys/net/ipv6/conf/all/forwarding", []byte("1\n"), 0644); err != nil {
t.Fatalf("Failed to enable IPv6 forwarding: %v", err)
}
// By default, linux rate limits ICMP replies, which slows down our tests.
if err := os.WriteFile("/proc/sys/net/ipv4/icmp_ratemask", []byte("0\n"), 0644); err != nil {
t.Fatalf("Failed to disable ICMPv4 rate limiting: %v", err)
}
if err := os.WriteFile("/proc/sys/net/ipv6/icmp/ratemask", []byte("\n"), 0644); err != nil {
t.Fatalf("Failed to disable ICMPv6 rate limiting: %v", err)
}
// Disable IPv6 duplicate address detection, which delays IPv6 connectivity
// becoming available.
if err := os.WriteFile("/proc/sys/net/ipv6/conf/default/accept_dad", []byte("0\n"), 0644); err != nil {
t.Fatalf("Failed to disable IPv6 DAD: %v", err)
}
kubernetes := &fakeKubernetes{
MockKubernetes: kube.NewMockKubernetes(1.0),
fakePods: make(map[string]*fakePod),
nextPodIP: 2,
}
allowDNS := false // This creates policies which allow port 53, we don't need this.
serverNamespaces := []string{"x", "y", "z"}
serverPods := []string{"a", "b", "c"}
externalIPs := []string{}
podCreationTimeoutSeconds := 1
batchJobs := false
imageRegistry := "registry.k8s.io"
resources, err := probe.NewDefaultResources(kubernetes, serverNamespaces, serverPods, serverPorts, serverProtocols, externalIPs, podCreationTimeoutSeconds, batchJobs, imageRegistry)
if err != nil {
t.Fatal(err)
}
interpreterConfig := &connectivity.InterpreterConfig{
ResetClusterBeforeTestCase: false,
KubeProbeRetries: 1,
PerturbationWaitSeconds: 0,
VerifyClusterStateBeforeTestCase: true,
BatchJobs: batchJobs,
IgnoreLoopback: ignoreLoopback,
JobTimeoutSeconds: 1,
FailFast: false,
}
interpreter := connectivity.NewInterpreter(kubernetes, resources, interpreterConfig)
printer := &connectivity.Printer{
Noisy: false,
IgnoreLoopback: ignoreLoopback,
JunitResultsFile: "",
}
zcPod, err := resources.GetPod("z", "c")
if err != nil {
t.Fatal(err)
}
testCaseGenerator := generator.NewTestCaseGenerator(allowDNS, zcPod.IP, serverNamespaces, nil, nil)
testCases := slices.Concat(testCaseGenerator.GenerateTestCases(), extraTestCases())
for _, testCase := range testCases {
for _, step := range testCase.Steps {
step.Probe.Mode = generator.ProbeModeServiceName
}
t.Run(testCase.Description, func(t *testing.T) {
recorder := &testRecorder{t: t}
nft, err := nftctrl.New(recorder, podIfaceGroup)
if err != nil {
t.Fatalf("Failed to create nftctrl: %v", err)
}
defer nft.Close()
kubernetes.nft = nft
if err := kubernetes.initializeNft(); err != nil {
t.Fatalf("nftctrl initialization failed: %v", err)
}
result := interpreter.ExecuteTestCase(testCase)
if result.Err != nil {
t.Fatal(result.Err)
}
if !result.Passed(ignoreLoopback) {
printer.PrintTestCaseResult(result)
cmd := exec.Command("nft", "list", "ruleset")
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
cmd.Run()
t.Error("connectivity test failed")
}
kubernetes.nft = nil
testCaseState := &connectivity.TestCaseState{
Kubernetes: kubernetes,
Resources: resources,
}
err = testCaseState.ResetClusterState()
if err != nil {
t.Errorf("failed to reset cluster: %v", err)
}
// Flush the conntrack table. Otherwise, UDP connectivity tests can
// spuriously succeed when they should be blocked, because they match an
// entry in the conntrack table from a previous test.
err = netlink.ConntrackTableFlush(netlink.ConntrackTable)
if err != nil {
t.Errorf("failed to flush conntrack table: %v", err)
}
})
}
initTestCases := initializationTestCases()
for _, testCase := range initTestCases {
t.Run(testCase.description, func(t *testing.T) {
recorder := &testRecorder{t: t}
nft, err := nftctrl.New(recorder, podIfaceGroup)
if err != nil {
t.Fatalf("Failed to create nftctrl: %v", err)
}
defer nft.Close()
testCaseState := &connectivity.TestCaseState{
Kubernetes: kubernetes,
Resources: resources,
}
if err := testCase.init(testCaseState, nft); err != nil {
t.Fatalf("initialization failed: %v", err)
}
if err := nft.Flush(); err != nil {
t.Fatalf("flush failed: %v", err)
}
parsedPolicy := matcher.BuildNetworkPolicies(true, testCaseState.Policies)
jobBuilder := &probe.JobBuilder{TimeoutSeconds: 1}
simRunner := probe.NewSimulatedRunner(parsedPolicy, jobBuilder)
probeConfig := generator.NewAllAvailable(generator.ProbeModeServiceName)
stepResult := connectivity.NewStepResult(
simRunner.RunProbeForConfig(probeConfig, testCaseState.Resources),
parsedPolicy,
slices.Clone(testCaseState.Policies))
kubeRunner := probe.NewKubeRunner(kubernetes, 15, jobBuilder)
for i := 0; i <= interpreterConfig.KubeProbeRetries; i++ {
stepResult.AddKubeProbe(kubeRunner.RunProbeForConfig(probeConfig, testCaseState.Resources))
if stepResult.Passed(ignoreLoopback) {
break
}
}
if !stepResult.Passed(ignoreLoopback) {
printer.PrintStep(0, &generator.TestStep{Probe: probeConfig}, stepResult)
cmd := exec.Command("nft", "list", "ruleset")
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
cmd.Run()
t.Error("connectivity test failed")
}
err = testCaseState.ResetClusterState()
if err != nil {
t.Errorf("failed to reset cluster: %v", err)
}
// Flush the conntrack table. Otherwise, UDP connectivity tests can
// spuriously succeed when they should be blocked, because they match an
// entry in the conntrack table from a previous test.
err = netlink.ConntrackTableFlush(netlink.ConntrackTable)
if err != nil {
t.Errorf("failed to flush conntrack table: %v", err)
}
})
}
}