// Copyright The Monogon Project Authors.
// SPDX-License-Identifier: Apache-2.0

// This package tests the network policy controller, using the Cyclonus network
// policy conformance test suite. It uses the real policy controller and
// nftables controller, but a fake Kubernetes API and fake pods, which allows
// the tests to run much faster than would be possible with the real API server
// and pods.
//
// By default, the test runs in a ktest, which means we test with the same
// kernel version, and thus the same nftables implementation, as is used in
// Monogon OS. But you can also run the test manually in a new user and network
// namespace, which allows you to use all the debugging tools available on your
// machine. Useful commands:
//
//	bazel build //metropolis/node/kubernetes/networkpolicy:networkpolicy_test
//		Build the test.
//	unshare --map-user=0 --net
//		Create a user and network namespace.
//	tcpdump -i any &
//		Capture traffic in the background.
//	IN_KTEST=true bazel-bin/metropolis/node/kubernetes/networkpolicy/networkpolicy_test_/networkpolicy_test
//		Run the test.
package networkpolicy_test

import (
	"crypto/sha256"
	"errors"
	"fmt"
	"net"
	"os"
	"os/exec"
	"runtime"
	"slices"
	"strconv"
	"strings"
	"syscall"
	"testing"
	"time"

	"git.dolansoft.org/dolansoft/k8s-nft-npc/nftctrl"
	"github.com/mattfenwick/cyclonus/pkg/connectivity"
	"github.com/mattfenwick/cyclonus/pkg/connectivity/probe"
	"github.com/mattfenwick/cyclonus/pkg/generator"
	"github.com/mattfenwick/cyclonus/pkg/kube"
	"github.com/mattfenwick/cyclonus/pkg/matcher"
	"github.com/vishvananda/netlink"
	"golang.org/x/sys/unix"
	corev1 "k8s.io/api/core/v1"
	networkingv1 "k8s.io/api/networking/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	kruntime "k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/tools/cache"
)

// Change this to 1 to test IPv6 instead of IPv4. Tests pass with IPv6, but run
// much slower, apparently because some ICMPv6 packets are dropped and need to
// be retransmitted.
//
// TODO: Fix slow IPv6 tests and run them automatically, e.g. as a separate
// ktest with a flag to select IPv6.
const testIPIndex = 0

const podIfaceGroup uint32 = 8

// Loopback traffic is only affected by network policies when connecting through
// a ClusterIP service. For now, we don't simulate service IPs, so this is
// disabled.
const ignoreLoopback = true

var serverPorts = []int{80, 81}
var serverProtocols = []corev1.Protocol{corev1.ProtocolTCP, corev1.ProtocolUDP}

var gatewayIPv4 = net.ParseIP("10.8.0.1").To4()
var gatewayIPv6 = net.ParseIP("fd08::1")

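// netListen starts a server on the given port and protocol, replying to each
// TCP connection or UDP datagram with payload. It returns once accepting or
// reading fails, which includes the case where the listener is closed because
// the stop channel was closed.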
func netListen(port int, protocol corev1.Protocol, payload string, stop <-chan struct{}) error {
	switch protocol {
	case corev1.ProtocolTCP:
		listener, err := net.ListenTCP("tcp", &net.TCPAddr{Port: port})
		if err != nil {
			return err
		}
		go func() {
			<-stop
			listener.Close()
		}()
		for {
			conn, err := listener.Accept()
			if err != nil {
				return err
			}
			conn.Write([]byte(payload))
			conn.Close()
		}
	case corev1.ProtocolUDP:
		conn, err := net.ListenUDP("udp", &net.UDPAddr{Port: port})
		if err != nil {
			return err
		}
		go func() {
			<-stop
			conn.Close()
		}()
		var buf [16]byte
		for {
			_, clientAddr, err := conn.ReadFrom(buf[:])
			if err != nil {
				return err
			}
			conn.WriteTo([]byte(payload), clientAddr)
		}
	default:
		return fmt.Errorf("unsupported protocol: %q", protocol)
	}
}

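// netConnect connects to ip:port with the given protocol and expects to
// receive expectPayload. The first return value is set when the connection was
// blocked (host unreachable or administratively prohibited), the second when
// an unexpected error occurred.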
func netConnect(ip net.IP, port int, protocol corev1.Protocol, expectPayload string) (error, error) {
	addr := (&net.TCPAddr{IP: ip, Port: port}).String()
	payload := make([]byte, len(expectPayload))
	switch protocol {
	case corev1.ProtocolTCP:
		conn, err := net.DialTimeout("tcp", addr, 2*time.Second)
		if errors.Is(err, unix.EHOSTUNREACH) || errors.Is(err, unix.EACCES) {
			return err, nil
		} else if err != nil {
			return nil, err
		}
		defer conn.Close()
		_, err = conn.Read(payload)
		if err != nil {
			return nil, err
		}
	case corev1.ProtocolUDP:
		conn, err := net.Dial("udp", addr)
		if err != nil {
			return nil, err
		}
		defer conn.Close()
		err = conn.SetDeadline(time.Now().Add(2 * time.Second))
		if err != nil {
			return nil, err
		}
		_, err = conn.Write([]byte("hello"))
		if err != nil {
			return nil, err
		}
		_, err = conn.Read(payload)
		if errors.Is(err, unix.EHOSTUNREACH) || errors.Is(err, unix.EACCES) {
			return err, nil
		} else if err != nil {
			return nil, err
		}
	default:
		return nil, fmt.Errorf("unsupported protocol: %q", protocol)
	}
	if string(payload) != expectPayload {
		return nil, fmt.Errorf("wrong payload, expected %q, got %q", expectPayload, payload)
	}
	return nil, nil
}

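// fakePod simulates a Kubernetes pod as a separate network namespace,
// connected to the host namespace through a veth pair and running a small
// server for each combination of serverPorts and serverProtocols.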
type fakePod struct {
	name     string
	ips      []podIP
	netns    *os.File
	rawNetns syscall.RawConn
	stopCh   chan struct{}
}

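// podIP holds a pod IP together with the address-family-specific values used
// when setting up addresses and routes for that IP.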
type podIP struct {
	ip        net.IP
	gatewayIP net.IP
	zeroIP    net.IP
	fullMask  net.IPMask
	zeroMask  net.IPMask
}

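// createFakePod creates a new network namespace with the given IPs, connects
// it to the host namespace through a veth pair, and starts listeners on the
// server ports. The host side of the veth pair is placed in interface group
// podIfaceGroup, the group the nftables controller is configured with.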
func createFakePod(namespace, name string, ips []net.IP) (*fakePod, error) {
	p := &fakePod{
		name:   fmt.Sprintf("%s/%s", namespace, name),
		ips:    make([]podIP, len(ips)),
		stopCh: make(chan struct{}),
	}
	for i, ip := range ips {
		p := &p.ips[i]
		switch {
		case ip.To4() != nil:
			p.ip = ip.To4()
			p.gatewayIP = gatewayIPv4
			p.zeroIP = net.ParseIP("0.0.0.0").To4()
			p.fullMask = net.CIDRMask(32, 32)
			p.zeroMask = net.CIDRMask(0, 32)
		case ip.To16() != nil:
			p.ip = ip.To16()
			p.gatewayIP = gatewayIPv6
			p.zeroIP = net.ParseIP("::")
			p.fullMask = net.CIDRMask(128, 128)
			p.zeroMask = net.CIDRMask(0, 128)
		default:
			return nil, fmt.Errorf("invalid IP: %v", ip)
		}
	}

	// Create new network namespace.
	runLockedThread(func() {
		err := unix.Unshare(unix.CLONE_NEWNET)
		if err != nil {
			panic(err)
		}
		// Obtain an FD of the new net namespace.
		p.netns, err = os.Open(fmt.Sprintf("/proc/%d/task/%d/ns/net", os.Getpid(), unix.Gettid()))
		if err != nil {
			panic(err)
		}
	})

	var err error
	p.rawNetns, err = p.netns.SyscallConn()
	if err != nil {
		panic(err)
	}

	// Create veth pair.
	linkAttrs := netlink.NewLinkAttrs()
	linkAttrs.Name = fmt.Sprintf("veth_%s_%s", namespace, name)
	if len(linkAttrs.Name) > 15 {
		hash := sha256.Sum256([]byte(linkAttrs.Name))
		linkAttrs.Name = fmt.Sprintf("veth_%x", hash[:5])
	}
	linkAttrs.Group = podIfaceGroup
	linkAttrs.Flags = net.FlagUp
	p.rawNetns.Control(func(fd uintptr) {
		err = netlink.LinkAdd(&netlink.Veth{LinkAttrs: linkAttrs, PeerName: "veth", PeerNamespace: netlink.NsFd(fd)})
	})
	if err != nil {
		return nil, err
	}

	veth, err := netlink.LinkByName(linkAttrs.Name)
	if err != nil {
		return nil, err
	}

	for _, ip := range p.ips {
		// Add gateway address to link.
		addr := &netlink.Addr{IPNet: &net.IPNet{IP: ip.gatewayIP, Mask: ip.fullMask}, Flags: unix.IFA_F_NODAD}
		err = netlink.AddrAdd(veth, addr)
		if err != nil {
			return nil, err
		}

		// Add route.
		err = netlink.RouteAdd(&netlink.Route{
			LinkIndex: veth.Attrs().Index,
			Scope:     netlink.SCOPE_HOST,
			Dst:       &net.IPNet{IP: ip.ip, Mask: ip.fullMask},
		})
		if err != nil {
			return nil, err
		}
	}

	p.runInNetns(func() {
		// Enable loopback traffic in the pod.
		loopback, err := netlink.LinkByName("lo")
		if err != nil {
			panic(err)
		}
		err = netlink.LinkSetUp(loopback)
		if err != nil {
			panic(err)
		}

		// Set up the veth in the pod namespace.
		veth, err := netlink.LinkByName("veth")
		if err != nil {
			panic(err)
		}
		err = netlink.LinkSetUp(veth)
		if err != nil {
			panic(err)
		}

		for _, ip := range p.ips {
			// IFA_F_NODAD disables duplicate address detection for IPv6.
			addr := &netlink.Addr{IPNet: &net.IPNet{IP: ip.ip, Mask: ip.fullMask}, Flags: unix.IFA_F_NODAD}
			err = netlink.AddrAdd(veth, addr)
			if err != nil {
				panic(err)
			}

			err = netlink.RouteAdd(&netlink.Route{
				LinkIndex: veth.Attrs().Index,
				Scope:     netlink.SCOPE_LINK,
				Dst:       &net.IPNet{IP: ip.gatewayIP, Mask: ip.fullMask},
				Src:       ip.ip,
			})
			if err != nil {
				panic(err)
			}
			err = netlink.RouteAdd(&netlink.Route{
				LinkIndex: veth.Attrs().Index,
				Scope:     netlink.SCOPE_UNIVERSE,
				Dst:       &net.IPNet{IP: ip.zeroIP, Mask: ip.zeroMask},
				Gw:        ip.gatewayIP,
			})
			if err != nil {
				panic(err)
			}
		}
	})

	for _, protocol := range serverProtocols {
		for _, port := range serverPorts {
			go p.runInNetns(func() {
				err := netListen(port, protocol, p.name, p.stopCh)
				select {
				case <-p.stopCh:
				default:
					panic(err)
				}
			})
		}
	}

	return p, nil
}

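// stop shuts down the pod's listeners and closes the namespace file
// descriptor.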
func (p *fakePod) stop() {
	close(p.stopCh)
	p.netns.Close()
}

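// runInNetns runs f on a locked OS thread which has been switched into the
// pod's network namespace.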
func (p *fakePod) runInNetns(f func()) {
	runLockedThread(func() {
		p.rawNetns.Control(func(fd uintptr) {
			err := unix.Setns(int(fd), unix.CLONE_NEWNET)
			if err != nil {
				panic(err)
			}
		})
		f()
	})
}

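// connect attempts a connection from this pod to the target pod on the given
// port and protocol, returning the same error pair as netConnect.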
func (p *fakePod) connect(target *fakePod, port int, protocol corev1.Protocol) (error, error) {
	var errConnectivity, err error
	p.runInNetns(func() {
		errConnectivity, err = netConnect(target.ips[testIPIndex].ip, port, protocol, target.name)
	})
	return errConnectivity, err
}

// runLockedThread runs f locked on an OS thread, which allows f to switch to a
// different network namespace. After f returns, the previous network namespace
// is restored, and if that succeeds, the thread is unlocked, allowing the
// thread to be reused for other goroutines.
func runLockedThread(f func()) {
	runtime.LockOSThread()
	oldNs, err := os.Open(fmt.Sprintf("/proc/%d/task/%d/ns/net", os.Getpid(), unix.Gettid()))
	if err != nil {
		panic(err)
	}
	defer oldNs.Close()
	f()
	err = unix.Setns(int(oldNs.Fd()), unix.CLONE_NEWNET)
	if err == nil {
		runtime.UnlockOSThread()
	}
}

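// fakeKubernetes wraps the Cyclonus mock Kubernetes API. It feeds resource
// changes into the attached nftables controller and backs each created pod
// with a fakePod network namespace.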
type fakeKubernetes struct {
	*kube.MockKubernetes
	nft       *nftctrl.Controller
	fakePods  map[string]*fakePod
	nextPodIP uint64
}

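// CreateNamespace and the other wrapper methods below forward each change to
// MockKubernetes and, on success, mirror the updated object into the nftables
// controller followed by a Flush, so that the ruleset always reflects the
// current fake cluster state.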
func (k *fakeKubernetes) CreateNamespace(kubeNamespace *corev1.Namespace) (*corev1.Namespace, error) {
	kubeNamespace, err := k.MockKubernetes.CreateNamespace(kubeNamespace)
	if err == nil && k.nft != nil {
		k.nft.SetNamespace(kubeNamespace.Name, kubeNamespace)
		if err := k.nft.Flush(); err != nil {
			return nil, err
		}
	}
	return kubeNamespace, err
}

func (k *fakeKubernetes) SetNamespaceLabels(namespace string, labels map[string]string) (*corev1.Namespace, error) {
	kubeNamespace, err := k.MockKubernetes.SetNamespaceLabels(namespace, labels)
	if err == nil && k.nft != nil {
		k.nft.SetNamespace(namespace, kubeNamespace)
		if err := k.nft.Flush(); err != nil {
			return nil, err
		}
	}
	return kubeNamespace, err
}

func (k *fakeKubernetes) DeleteNamespace(namespace string) error {
	err := k.MockKubernetes.DeleteNamespace(namespace)
	if err == nil && k.nft != nil {
		k.nft.SetNamespace(namespace, nil)
		if err := k.nft.Flush(); err != nil {
			return err
		}
	}
	return err
}

func (k *fakeKubernetes) CreateNetworkPolicy(kubePolicy *networkingv1.NetworkPolicy) (*networkingv1.NetworkPolicy, error) {
	kubePolicy, err := k.MockKubernetes.CreateNetworkPolicy(kubePolicy)
	if err == nil && k.nft != nil {
		k.nft.SetNetworkPolicy(cache.MetaObjectToName(kubePolicy), kubePolicy)
		if err := k.nft.Flush(); err != nil {
			return nil, err
		}
	}
	return kubePolicy, err
}

func (k *fakeKubernetes) UpdateNetworkPolicy(kubePolicy *networkingv1.NetworkPolicy) (*networkingv1.NetworkPolicy, error) {
	kubePolicy, err := k.MockKubernetes.UpdateNetworkPolicy(kubePolicy)
	if err == nil && k.nft != nil {
		k.nft.SetNetworkPolicy(cache.MetaObjectToName(kubePolicy), kubePolicy)
		if err := k.nft.Flush(); err != nil {
			return nil, err
		}
	}
	return kubePolicy, err
}

func (k *fakeKubernetes) DeleteNetworkPolicy(namespace string, name string) error {
	err := k.MockKubernetes.DeleteNetworkPolicy(namespace, name)
	if err == nil && k.nft != nil {
		k.nft.SetNetworkPolicy(cache.NewObjectName(namespace, name), nil)
		if err := k.nft.Flush(); err != nil {
			return err
		}
	}
	return err
}

func (k *fakeKubernetes) DeleteAllNetworkPoliciesInNamespace(namespace string) error {
	policies, err := k.GetNetworkPoliciesInNamespace(namespace)
	if err == nil && k.nft != nil {
		for _, kubePolicy := range policies {
			k.nft.SetNetworkPolicy(cache.MetaObjectToName(&kubePolicy), nil)
			if err := k.nft.Flush(); err != nil {
				return err
			}
		}
	}
	return k.MockKubernetes.DeleteAllNetworkPoliciesInNamespace(namespace)
}

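// CreatePod additionally assigns the pod its IPv4 and IPv6 addresses and
// creates the corresponding fakePod namespace before notifying the nftables
// controller.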
func (k *fakeKubernetes) CreatePod(kubePod *corev1.Pod) (*corev1.Pod, error) {
	kubePod, err := k.MockKubernetes.CreatePod(kubePod)
	objectName := cache.MetaObjectToName(kubePod)
	if err == nil {
		ipSuffix := []byte{uint8(k.nextPodIP >> 8), uint8(k.nextPodIP)}
		podIPv4 := slices.Concat(gatewayIPv4[:2], ipSuffix)
		podIPv6 := slices.Concat(gatewayIPv6[:14], ipSuffix)
		k.nextPodIP++
		kubePod.Status.PodIPs = []corev1.PodIP{{IP: podIPv4.String()}, {IP: podIPv6.String()}}
		kubePod.Status.PodIP = kubePod.Status.PodIPs[testIPIndex].IP
		ips := []net.IP{podIPv4, podIPv6}
		fakePod, err := createFakePod(kubePod.Namespace, kubePod.Name, ips)
		if err != nil {
			return nil, fmt.Errorf("failed to create fake pod: %w", err)
		}
		k.fakePods[objectName.String()] = fakePod
	}
	if err == nil && k.nft != nil {
		k.nft.SetPod(objectName, kubePod)
		if err := k.nft.Flush(); err != nil {
			return nil, err
		}
	}
	return kubePod, err
}

func (k *fakeKubernetes) DeletePod(namespace string, pod string) error {
	err := k.MockKubernetes.DeletePod(namespace, pod)
	objectName := cache.NewObjectName(namespace, pod)
	if err == nil {
		k.fakePods[objectName.String()].stop()
		delete(k.fakePods, objectName.String())
	}
	if err == nil && k.nft != nil {
		k.nft.SetPod(objectName, nil)
		if err := k.nft.Flush(); err != nil {
			return err
		}
	}
	return err
}

func (k *fakeKubernetes) SetPodLabels(namespace string, pod string, labels map[string]string) (*corev1.Pod, error) {
	kubePod, err := k.MockKubernetes.SetPodLabels(namespace, pod, labels)
	if err == nil && k.nft != nil {
		k.nft.SetPod(cache.MetaObjectToName(kubePod), kubePod)
		if err := k.nft.Flush(); err != nil {
			return nil, err
		}
	}
	return kubePod, err
}

var protocolMap = map[string]corev1.Protocol{
	"tcp":  corev1.ProtocolTCP,
	"udp":  corev1.ProtocolUDP,
	"sctp": corev1.ProtocolSCTP,
}

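// ExecuteRemoteCommand emulates the agnhost connect invocation which Cyclonus
// would normally run inside a pod, by parsing the target out of the command
// and performing a real connection between the fake pod namespaces.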
func (k *fakeKubernetes) ExecuteRemoteCommand(namespace string, pod string, container string, command []string) (string, string, error, error) {
	// command is expected to have this format:
	// /agnhost connect s-x-a.x.svc.cluster.local:80 --timeout=1s --protocol=tcp
	if len(command) != 5 {
		return "", "", nil, fmt.Errorf("unexpected command length: %v", command)
	}
	targetService, targetPortStr, ok := strings.Cut(command[2], ".svc.cluster.local:")
	if !ok {
		return "", "", nil, fmt.Errorf("failed to parse target: %q", command[2])
	}
	targetService, targetNamespace, ok := strings.Cut(targetService, ".")
	if !ok {
		return "", "", nil, fmt.Errorf("failed to parse target: %q", command[2])
	}
	targetPod := strings.TrimPrefix(targetService, fmt.Sprintf("s-%s-", targetNamespace))
	protocol := strings.TrimPrefix(command[4], "--protocol=")

	sourceFakePod := k.fakePods[cache.NewObjectName(namespace, pod).String()]
	targetFakePod := k.fakePods[cache.NewObjectName(targetNamespace, targetPod).String()]

	targetPort, err := strconv.Atoi(targetPortStr)
	if err != nil {
		return "", "", nil, err
	}
	kubeProtocol, ok := protocolMap[protocol]
	if !ok {
		return "", "", nil, fmt.Errorf("invalid protocol: %q", protocol)
	}

	connectErr, err := sourceFakePod.connect(targetFakePod, targetPort, kubeProtocol)
	return "", "", connectErr, err
}

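// initializeNft loads the current state of the fake cluster (namespaces and
// pods, but no policies) into the nftables controller and performs the initial
// Flush.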
func (k *fakeKubernetes) initializeNft() error {
	namespaces, err := k.GetAllNamespaces()
	if err != nil {
		return err
	}
	for _, kubeNamespace := range namespaces.Items {
		k.nft.SetNamespace(kubeNamespace.Name, &kubeNamespace)
		pods, err := k.GetPodsInNamespace(kubeNamespace.Name)
		if err != nil {
			return err
		}
		for _, kubePod := range pods {
			k.nft.SetPod(cache.MetaObjectToName(&kubePod), &kubePod)
		}
		policies, err := k.GetNetworkPoliciesInNamespace(kubeNamespace.Name)
		if err != nil {
			return err
		}
		if len(policies) != 0 {
			return fmt.Errorf("expected no initial policies, but found %s", cache.MetaObjectToName(&policies[0]).String())
		}
	}
	if err := k.nft.Flush(); err != nil {
		return fmt.Errorf("initial flush failed: %w", err)
	}
	return nil
}

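// testRecorder implements the event recorder expected by nftctrl. It logs
// normal events and fails the test on warning events.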
type testRecorder struct {
	t *testing.T
}

func (t *testRecorder) Event(object kruntime.Object, eventtype, reason, message string) {
	if eventtype == corev1.EventTypeWarning {
		t.t.Errorf("Warning event: object %v, %s: %s", object, reason, message)
	} else {
		t.t.Logf("%s event: object %v, %s: %s", eventtype, object, reason, message)
	}
}

func (t *testRecorder) Eventf(object kruntime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
	t.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...))
}

func (t *testRecorder) AnnotatedEventf(object kruntime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
	t.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...))
}

const maxNamespaceName = "ns-3456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012"
const maxPodName = "pod-456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012"
const maxPolicyName = "policy-789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012"

func setPolicyName(name string) generator.Setter {
	return func(policy *generator.Netpol) {
		policy.Name = name
	}
}

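// extraTestCases returns additional test cases, beyond those generated by
// Cyclonus, covering namespace label updates, deletion of one of several
// policies, and maximum-length names.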
func extraTestCases() []*generator.TestCase {
	return []*generator.TestCase{
		{
			Description: "Update namespace so that additional peer selector matches",
			Tags:        generator.NewStringSet(generator.TagSetNamespaceLabels),
			Steps: []*generator.TestStep{
				generator.NewTestStep(generator.ProbeAllAvailable,
					generator.CreatePolicy(generator.BuildPolicy(generator.SetPeers(true, []networkingv1.NetworkPolicyPeer{
						{NamespaceSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"ns": "y"}}},
						{NamespaceSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"new-ns": "qrs"}}},
					})).NetworkPolicy())),
				generator.NewTestStep(generator.ProbeAllAvailable,
					generator.SetNamespaceLabels("y", map[string]string{"ns": "y", "new-ns": "qrs"})),
				generator.NewTestStep(generator.ProbeAllAvailable,
					generator.SetNamespaceLabels("y", map[string]string{"ns": "y"})),
			},
		},
		{
			Description: "Delete one of multiple policies",
			Tags:        generator.NewStringSet(generator.TagDeletePolicy),
			Steps: []*generator.TestStep{
				generator.NewTestStep(generator.ProbeAllAvailable,
					generator.CreatePolicy(generator.BuildPolicy(setPolicyName("policy1")).NetworkPolicy()),
					generator.CreatePolicy(generator.BuildPolicy(setPolicyName("policy2"), generator.SetPorts(true, nil)).NetworkPolicy()),
					generator.DeletePolicy("x", "policy1"),
				),
			},
		},
		{
			Description: "Create namespace, pod and policy with maximum name length",
			Tags:        generator.NewStringSet(generator.TagCreateNamespace),
			Steps: []*generator.TestStep{
				generator.NewTestStep(generator.ProbeAllAvailable,
					generator.CreateNamespace(maxNamespaceName, nil),
					generator.CreatePod(maxNamespaceName, maxPodName, nil),
					generator.CreatePolicy(generator.BuildPolicy(
						generator.SetNamespace(maxNamespaceName),
						setPolicyName(maxPolicyName),
						generator.SetPodSelector(metav1.LabelSelector{})).NetworkPolicy())),
				generator.NewTestStep(generator.ProbeAllAvailable,
					generator.DeleteNamespace(maxNamespaceName)),
			},
		},
	}
}

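// initTestCase describes a test case exercising controller initialization:
// init loads the initial cluster state into the controller in a
// scenario-specific order, before the first Flush.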
type initTestCase struct {
	description string
	init        func(*connectivity.TestCaseState, *nftctrl.Controller) error
}

func initializationTestCases() []initTestCase {
	return []initTestCase{
		{
			description: "Delete rule during initialization",
			// It is possible that we already receive updates/deletes during
			// initialization, if one of the resource types has not yet finished
			// loading. This means that we need to be able to handle such updates
			// before the first Flush.
			init: func(t *connectivity.TestCaseState, nft *nftctrl.Controller) error {
				namespaces, err := t.Kubernetes.GetAllNamespaces()
				if err != nil {
					return err
				}
				for _, kubeNamespace := range namespaces.Items {
					nft.SetNamespace(kubeNamespace.Name, &kubeNamespace)
					pods, err := t.Kubernetes.GetPodsInNamespace(kubeNamespace.Name)
					if err != nil {
						return err
					}
					for _, kubePod := range pods {
						nft.SetPod(cache.MetaObjectToName(&kubePod), &kubePod)
					}
				}

				policy1 := generator.BuildPolicy().NetworkPolicy()
				policy1.Name = "policy1"
				nft.SetNetworkPolicy(cache.MetaObjectToName(policy1), policy1)

				policy2 := generator.BuildPolicy(generator.SetPorts(true, nil)).NetworkPolicy()
				policy2.Name = "policy2"
				if err := t.CreatePolicy(policy2); err != nil {
					return fmt.Errorf("failed to create policy: %w", err)
				}
				nft.SetNetworkPolicy(cache.MetaObjectToName(policy2), policy2)

				// Delete policy 1.
				nft.SetNetworkPolicy(cache.MetaObjectToName(policy1), nil)

				return nil
			},
		},
		{
			description: "Initialize namespaces last",
			init: func(t *connectivity.TestCaseState, nft *nftctrl.Controller) error {
				policy := generator.BuildPolicy().NetworkPolicy()
				policy.Spec.Ingress[0].From[0].NamespaceSelector = nil
				if err := t.CreatePolicy(policy); err != nil {
					return fmt.Errorf("failed to create policy: %w", err)
				}

				namespaces, err := t.Kubernetes.GetAllNamespaces()
				if err != nil {
					return err
				}
				for _, kubeNamespace := range namespaces.Items {
					pods, err := t.Kubernetes.GetPodsInNamespace(kubeNamespace.Name)
					if err != nil {
						return err
					}
					for _, kubePod := range pods {
						nft.SetPod(cache.MetaObjectToName(&kubePod), &kubePod)
					}
					policies, err := t.Kubernetes.GetNetworkPoliciesInNamespace(kubeNamespace.Name)
					if err != nil {
						return err
					}
					for _, policy := range policies {
						nft.SetNetworkPolicy(cache.MetaObjectToName(&policy), &policy)
					}
				}
				for _, kubeNamespace := range namespaces.Items {
					nft.SetNamespace(kubeNamespace.Name, &kubeNamespace)
				}
				return nil
			},
		},
	}
}

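// TestCyclonus runs the Cyclonus-generated test cases plus the extra and
// initialization test cases against the real nftables controller, using fake
// pods in separate network namespaces.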
func TestCyclonus(t *testing.T) {
	if os.Getenv("IN_KTEST") != "true" {
		t.Skip("Not in ktest")
	}

	if err := os.WriteFile("/proc/sys/net/ipv4/ip_forward", []byte("1\n"), 0644); err != nil {
		t.Fatalf("Failed to enable IPv4 forwarding: %v", err)
	}
	if err := os.WriteFile("/proc/sys/net/ipv6/conf/all/forwarding", []byte("1\n"), 0644); err != nil {
		t.Fatalf("Failed to enable IPv6 forwarding: %v", err)
	}
	// By default, Linux rate-limits ICMP replies, which slows down our tests.
	if err := os.WriteFile("/proc/sys/net/ipv4/icmp_ratemask", []byte("0\n"), 0644); err != nil {
		t.Fatalf("Failed to disable ICMPv4 rate limiting: %v", err)
	}
	if err := os.WriteFile("/proc/sys/net/ipv6/icmp/ratemask", []byte("\n"), 0644); err != nil {
		t.Fatalf("Failed to disable ICMPv6 rate limiting: %v", err)
	}
	// Disable IPv6 duplicate address detection, which delays IPv6 connectivity
	// becoming available.
	if err := os.WriteFile("/proc/sys/net/ipv6/conf/default/accept_dad", []byte("0\n"), 0644); err != nil {
		t.Fatalf("Failed to disable IPv6 DAD: %v", err)
	}

	kubernetes := &fakeKubernetes{
		MockKubernetes: kube.NewMockKubernetes(1.0),
		fakePods:       make(map[string]*fakePod),
		nextPodIP:      2,
	}

	allowDNS := false // If enabled, this creates policies which allow port 53; we don't need that.
	serverNamespaces := []string{"x", "y", "z"}
	serverPods := []string{"a", "b", "c"}
	externalIPs := []string{}
	podCreationTimeoutSeconds := 1
	batchJobs := false
	imageRegistry := "registry.k8s.io"

	resources, err := probe.NewDefaultResources(kubernetes, serverNamespaces, serverPods, serverPorts, serverProtocols, externalIPs, podCreationTimeoutSeconds, batchJobs, imageRegistry)
	if err != nil {
		t.Fatal(err)
	}

	interpreterConfig := &connectivity.InterpreterConfig{
		ResetClusterBeforeTestCase:       false,
		KubeProbeRetries:                 1,
		PerturbationWaitSeconds:          0,
		VerifyClusterStateBeforeTestCase: true,
		BatchJobs:                        batchJobs,
		IgnoreLoopback:                   ignoreLoopback,
		JobTimeoutSeconds:                1,
		FailFast:                         false,
	}
	interpreter := connectivity.NewInterpreter(kubernetes, resources, interpreterConfig)
	printer := &connectivity.Printer{
		Noisy:            false,
		IgnoreLoopback:   ignoreLoopback,
		JunitResultsFile: "",
	}

	zcPod, err := resources.GetPod("z", "c")
	if err != nil {
		t.Fatal(err)
	}
	testCaseGenerator := generator.NewTestCaseGenerator(allowDNS, zcPod.IP, serverNamespaces, nil, nil)
	testCases := slices.Concat(testCaseGenerator.GenerateTestCases(), extraTestCases())

	for _, testCase := range testCases {
		for _, step := range testCase.Steps {
			step.Probe.Mode = generator.ProbeModeServiceName
		}
		t.Run(testCase.Description, func(t *testing.T) {
			recorder := &testRecorder{t: t}
			nft, err := nftctrl.New(recorder, podIfaceGroup)
			if err != nil {
				t.Errorf("Failed to create nftctrl: %v", err)
				return
			}
			defer nft.Close()
			kubernetes.nft = nft

			if err := kubernetes.initializeNft(); err != nil {
				t.Errorf("nftctrl initialization failed: %v", err)
				return
			}

			result := interpreter.ExecuteTestCase(testCase)
			if result.Err != nil {
				t.Error(result.Err)
				return
			}
			if !result.Passed(ignoreLoopback) {
				printer.PrintTestCaseResult(result)

				cmd := exec.Command("nft", "list", "ruleset")
				cmd.Stdout = os.Stdout
				cmd.Stderr = os.Stderr
				cmd.Run()

				t.Error("connectivity test failed")
			}

			kubernetes.nft = nil
			testCaseState := &connectivity.TestCaseState{
				Kubernetes: kubernetes,
				Resources:  resources,
			}
			err = testCaseState.ResetClusterState()
			if err != nil {
				t.Errorf("failed to reset cluster: %v", err)
			}

			// Flush the conntrack table. Otherwise, UDP connectivity tests can
			// spuriously succeed when they should be blocked, because they match an
			// entry in the conntrack table from a previous test.
			err = netlink.ConntrackTableFlush(netlink.ConntrackTable)
			if err != nil {
				t.Errorf("failed to flush conntrack table: %v", err)
			}
		})
	}

	initTestCases := initializationTestCases()
	for _, testCase := range initTestCases {
		t.Run(testCase.description, func(t *testing.T) {
			recorder := &testRecorder{t: t}
			nft, err := nftctrl.New(recorder, podIfaceGroup)
			if err != nil {
				t.Errorf("Failed to create nftctrl: %v", err)
				return
			}
			defer nft.Close()

			testCaseState := &connectivity.TestCaseState{
				Kubernetes: kubernetes,
				Resources:  resources,
			}

			if err := testCase.init(testCaseState, nft); err != nil {
				t.Errorf("initialization failed: %v", err)
				return
			}

			if err := nft.Flush(); err != nil {
				t.Errorf("flush failed: %v", err)
				return
			}

			parsedPolicy := matcher.BuildNetworkPolicies(true, testCaseState.Policies)
			jobBuilder := &probe.JobBuilder{TimeoutSeconds: 1}
			simRunner := probe.NewSimulatedRunner(parsedPolicy, jobBuilder)
			probeConfig := generator.NewAllAvailable(generator.ProbeModeServiceName)
			stepResult := connectivity.NewStepResult(
				simRunner.RunProbeForConfig(probeConfig, testCaseState.Resources),
				parsedPolicy,
				slices.Clone(testCaseState.Policies))
			kubeRunner := probe.NewKubeRunner(kubernetes, 15, jobBuilder)
			for i := 0; i <= interpreterConfig.KubeProbeRetries; i++ {
				stepResult.AddKubeProbe(kubeRunner.RunProbeForConfig(probeConfig, testCaseState.Resources))
				if stepResult.Passed(ignoreLoopback) {
					break
				}
			}

			if !stepResult.Passed(ignoreLoopback) {
				printer.PrintStep(0, &generator.TestStep{Probe: probeConfig}, stepResult)

				cmd := exec.Command("nft", "list", "ruleset")
				cmd.Stdout = os.Stdout
				cmd.Stderr = os.Stderr
				cmd.Run()

				t.Error("connectivity test failed")
			}

			err = testCaseState.ResetClusterState()
			if err != nil {
				t.Errorf("failed to reset cluster: %v", err)
			}

			// Flush the conntrack table. Otherwise, UDP connectivity tests can
			// spuriously succeed when they should be blocked, because they match an
			// entry in the conntrack table from a previous test.
			err = netlink.ConntrackTableFlush(netlink.ConntrackTable)
			if err != nil {
				t.Errorf("failed to flush conntrack table: %v", err)
			}
		})
	}
}