m/n/k8s: add nftables network policy controller
This integrates my K8s network policy controller. In its current form it
does not have many guarantees as the custom CNI plugin is not yet in
there but it mostly works. Also there is still a DNS hole as host-local
services are not properly policed yet.
It has a basic smoke test using the connectivity testing helper as well
as some metrics to make sure it is integrated properly and to be able to
monitor its performance.
Change-Id: Ia2f54b9975361270678ce742ae5e32df25e515c5
Reviewed-on: https://review.monogon.dev/c/monogon/+/3740
Tested-by: Jenkins CI
Reviewed-by: Jan Schär <jan@monogon.tech>
diff --git a/build/bazel/go.MODULE.bazel b/build/bazel/go.MODULE.bazel
index 2e5402d..20e7dc4 100644
--- a/build/bazel/go.MODULE.bazel
+++ b/build/bazel/go.MODULE.bazel
@@ -88,6 +88,7 @@
"io_k8s_kubernetes",
"io_k8s_pod_security_admission",
"io_k8s_utils",
+ "org_dolansoft_git_dolansoft_k8s_nft_npc",
"org_go4_netipx",
"org_golang_google_api",
"org_golang_google_genproto_googleapis_api",
diff --git a/go.mod b/go.mod
index 481bc0d..ebfbf6a 100644
--- a/go.mod
+++ b/go.mod
@@ -58,6 +58,7 @@
require (
4d63.com/gocheckcompilerdirectives v1.2.1
cloud.google.com/go/storage v1.38.0
+ git.dolansoft.org/dolansoft/k8s-nft-npc v0.0.0-20250205205926-b7f770fa8391
github.com/adrg/xdg v0.4.0
github.com/bazelbuild/rules_go v0.52.0
github.com/cavaliergopher/cpio v1.0.1
@@ -146,7 +147,7 @@
k8s.io/client-go v0.32.0
k8s.io/component-base v0.32.0
k8s.io/klog/v2 v2.130.1
- k8s.io/kubectl v0.0.0
+ k8s.io/kubectl v0.32.0
k8s.io/kubelet v0.32.0
k8s.io/kubernetes v1.32.0
k8s.io/pod-security-admission v0.0.0
@@ -280,6 +281,7 @@
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/hodgesds/perf-utils v0.7.0 // indirect
+ github.com/igrmk/treemap/v2 v2.0.1 // indirect
github.com/illumos/go-kstat v0.0.0-20210513183136-173c9b0a9973 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/intel/goresctrl v0.8.0 // indirect
diff --git a/go.sum b/go.sum
index c838a8d..8e15075 100644
--- a/go.sum
+++ b/go.sum
@@ -1356,6 +1356,8 @@
dario.cat/mergo v1.0.1/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
gioui.org v0.0.0-20210308172011-57750fc8a0a6/go.mod h1:RSH6KIUZ0p2xy5zHDxgAM4zumjgTw83q2ge/PI+yyw8=
+git.dolansoft.org/dolansoft/k8s-nft-npc v0.0.0-20250205205926-b7f770fa8391 h1:VcDYYx80mOeRWBwBr2Hs1grbz1E1Tmf0yrJEZuF2L6U=
+git.dolansoft.org/dolansoft/k8s-nft-npc v0.0.0-20250205205926-b7f770fa8391/go.mod h1:JVUzK3P8vcS9HGrEDu4Ye+Ll4g3hxJr/DDYkpiuNZik=
git.sr.ht/~sbinet/gg v0.3.1/go.mod h1:KGYtlADtqsqANL9ueOFkWymvzUvLMQllU5Ixo+8v3pc=
github.com/AdaLogics/go-fuzz-headers v0.0.0-20210715213245-6c3934b029d8/go.mod h1:CzsSbkDixRphAF5hS6wbMKq0eI6ccJRb7/A0M6JBnwg=
github.com/AdaLogics/go-fuzz-headers v0.0.0-20240806141605-e8a1dd7889d6 h1:He8afgbRMd7mFxO99hRNu+6tazq8nFF9lIwo9JFroBk=
@@ -2535,6 +2537,8 @@
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20220319035150-800ac71e25c2/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w=
github.com/ianlancetaylor/demangle v0.0.0-20240312041847-bd984b5ce465/go.mod h1:gx7rwoVhcfuVKG5uya9Hs3Sxj7EIvldVofAWIUtGouw=
+github.com/igrmk/treemap/v2 v2.0.1 h1:Jhy4z3yhATvYZMWCmxsnHO5NnNZBdueSzvxh6353l+0=
+github.com/igrmk/treemap/v2 v2.0.1/go.mod h1:PkTPvx+8OHS8/41jnnyVY+oVsfkaOUZGcr+sfonosd4=
github.com/illumos/go-kstat v0.0.0-20210513183136-173c9b0a9973 h1:hk4LPqXIY/c9XzRbe7dA6qQxaT6Axcbny0L/G5a4owQ=
github.com/illumos/go-kstat v0.0.0-20210513183136-173c9b0a9973/go.mod h1:PoK3ejP3LJkGTzKqRlpvCIFas3ncU02v8zzWDW+g0FY=
github.com/imdario/mergo v0.3.4/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
diff --git a/metropolis/node/kubernetes/BUILD.bazel b/metropolis/node/kubernetes/BUILD.bazel
index 636295c..8b8c1c5 100644
--- a/metropolis/node/kubernetes/BUILD.bazel
+++ b/metropolis/node/kubernetes/BUILD.bazel
@@ -33,6 +33,7 @@
"//metropolis/node/kubernetes/clusternet",
"//metropolis/node/kubernetes/metricsprovider",
"//metropolis/node/kubernetes/metricsproxy",
+ "//metropolis/node/kubernetes/networkpolicy",
"//metropolis/node/kubernetes/nfproxy",
"//metropolis/node/kubernetes/pki",
"//metropolis/node/kubernetes/plugins/kvmdevice",
diff --git a/metropolis/node/kubernetes/networkpolicy/BUILD.bazel b/metropolis/node/kubernetes/networkpolicy/BUILD.bazel
new file mode 100644
index 0000000..d3d3b76
--- /dev/null
+++ b/metropolis/node/kubernetes/networkpolicy/BUILD.bazel
@@ -0,0 +1,23 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+ name = "networkpolicy",
+ srcs = ["networkpolicy.go"],
+ importpath = "source.monogon.dev/metropolis/node/kubernetes/networkpolicy",
+ visibility = ["//visibility:public"],
+ deps = [
+ "//go/logging",
+ "//metropolis/node",
+ "//osbase/supervisor",
+ "@io_k8s_api//core/v1:core",
+ "@io_k8s_client_go//informers",
+ "@io_k8s_client_go//kubernetes",
+ "@io_k8s_client_go//kubernetes/typed/core/v1:core",
+ "@io_k8s_client_go//tools/cache",
+ "@io_k8s_client_go//tools/cache/synctrack",
+ "@io_k8s_client_go//tools/record",
+ "@io_k8s_client_go//util/workqueue",
+ "@io_k8s_kubectl//pkg/scheme",
+ "@org_dolansoft_git_dolansoft_k8s_nft_npc//nftctrl",
+ ],
+)
diff --git a/metropolis/node/kubernetes/networkpolicy/networkpolicy.go b/metropolis/node/kubernetes/networkpolicy/networkpolicy.go
new file mode 100644
index 0000000..618ff03
--- /dev/null
+++ b/metropolis/node/kubernetes/networkpolicy/networkpolicy.go
@@ -0,0 +1,141 @@
+// Copyright The Monogon Project Authors.
+// SPDX-License-Identifier: Apache-2.0
+
+package networkpolicy
+
+import (
+ "context"
+ "fmt"
+
+ "git.dolansoft.org/dolansoft/k8s-nft-npc/nftctrl"
+ v1 "k8s.io/api/core/v1"
+ "k8s.io/client-go/informers"
+ "k8s.io/client-go/kubernetes"
+ typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
+ "k8s.io/client-go/tools/cache"
+ "k8s.io/client-go/tools/cache/synctrack"
+ "k8s.io/client-go/tools/record"
+ "k8s.io/client-go/util/workqueue"
+ "k8s.io/kubectl/pkg/scheme"
+
+ "source.monogon.dev/go/logging"
+ "source.monogon.dev/metropolis/node"
+ "source.monogon.dev/osbase/supervisor"
+)
+
+type Service struct {
+ Kubernetes kubernetes.Interface
+}
+
+type workItem struct {
+ typ string
+ name cache.ObjectName
+}
+
+type updateEnqueuer struct {
+ typ string
+ q workqueue.TypedInterface[workItem]
+ hasProcessed *synctrack.AsyncTracker[workItem]
+ l logging.Leveled
+}
+
+func (c *updateEnqueuer) OnAdd(obj interface{}, isInInitialList bool) {
+ name, err := cache.ObjectToName(obj)
+ if err != nil {
+ c.l.Warningf("OnAdd name for type %q cannot be derived: %v", c.typ, err)
+ return
+ }
+ item := workItem{typ: c.typ, name: name}
+ if isInInitialList {
+ c.hasProcessed.Start(item)
+ }
+ c.q.Add(item)
+}
+
+func (c *updateEnqueuer) OnUpdate(oldObj, newObj interface{}) {
+ name, err := cache.ObjectToName(newObj)
+ if err != nil {
+ c.l.Warningf("OnUpdate name for type %q cannot be derived: %v", c.typ, err)
+ return
+ }
+ c.q.Add(workItem{typ: c.typ, name: name})
+}
+
+func (c *updateEnqueuer) OnDelete(obj interface{}) {
+ name, err := cache.DeletionHandlingObjectToName(obj)
+ if err != nil {
+ c.l.Warningf("OnDelete name for type %q cannot be derived: %v", c.typ, err)
+ return
+ }
+ c.q.Add(workItem{typ: c.typ, name: name})
+}
+
+func (c *Service) Run(ctx context.Context) error {
+ eventBroadcaster := record.NewBroadcaster()
+ eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: c.Kubernetes.CoreV1().Events("")})
+ recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "npc"})
+
+ nft := nftctrl.New(recorder, node.LinkGroupK8sPod)
+ defer nft.Close()
+ l := supervisor.Logger(ctx)
+
+ informerFactory := informers.NewSharedInformerFactory(c.Kubernetes, 0)
+ q := workqueue.NewTypedWithConfig(workqueue.TypedQueueConfig[workItem]{
+ Name: "networkpolicy",
+ })
+
+ var hasProcessed synctrack.AsyncTracker[workItem]
+
+ nsInformer := informerFactory.Core().V1().Namespaces()
+ nsHandler, _ := nsInformer.Informer().AddEventHandler(&updateEnqueuer{q: q, typ: "ns", hasProcessed: &hasProcessed, l: l})
+ podInformer := informerFactory.Core().V1().Pods()
+ podHandler, _ := podInformer.Informer().AddEventHandler(&updateEnqueuer{q: q, typ: "pod", hasProcessed: &hasProcessed, l: l})
+ nwpInformer := informerFactory.Networking().V1().NetworkPolicies()
+ nwpHandler, _ := nwpInformer.Informer().AddEventHandler(&updateEnqueuer{q: q, typ: "nwp", hasProcessed: &hasProcessed, l: l})
+ hasProcessed.UpstreamHasSynced = func() bool {
+ return nsHandler.HasSynced() && podHandler.HasSynced() && nwpHandler.HasSynced()
+ }
+ informerFactory.Start(ctx.Done())
+
+ go func() {
+ <-ctx.Done()
+ q.ShutDown()
+ informerFactory.Shutdown()
+ }()
+
+ hasSynced := false
+ for {
+ if ctx.Err() != nil {
+ return ctx.Err()
+ }
+ i, shut := q.Get()
+ if shut {
+ return ctx.Err()
+ }
+ switch i.typ {
+ case "pod":
+ pod, _ := podInformer.Lister().Pods(i.name.Namespace).Get(i.name.Name)
+ nft.SetPod(i.name, pod)
+ case "nwp":
+ nwp, _ := nwpInformer.Lister().NetworkPolicies(i.name.Namespace).Get(i.name.Name)
+ nft.SetNetworkPolicy(i.name, nwp)
+ case "ns":
+ ns, _ := nsInformer.Lister().Get(i.name.Name)
+ nft.SetNamespace(i.name.Name, ns)
+ }
+ hasProcessed.Finished(i)
+ if hasSynced {
+ if err := nft.Flush(); err != nil {
+ return fmt.Errorf("failed to flush after update of %s %v: %w", i.typ, i.name, err)
+ }
+ } else if hasProcessed.HasSynced() {
+ if err := nft.Flush(); err != nil {
+ return fmt.Errorf("initial flush failed: %w", err)
+ }
+ l.Info("Initial sync completed")
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ hasSynced = true
+ }
+ q.Done(i)
+ }
+}
diff --git a/metropolis/node/kubernetes/reconciler/resources_rbac.go b/metropolis/node/kubernetes/reconciler/resources_rbac.go
index 60d1ba4..9ce9942 100644
--- a/metropolis/node/kubernetes/reconciler/resources_rbac.go
+++ b/metropolis/node/kubernetes/reconciler/resources_rbac.go
@@ -94,7 +94,17 @@
},
{
APIGroups: []string{""},
- Resources: []string{"services", "nodes", "namespaces"},
+ Resources: []string{"services", "nodes", "namespaces", "pods"},
+ Verbs: []string{"get", "list", "watch"},
+ },
+ {
+ APIGroups: []string{""},
+ Resources: []string{"events"},
+ Verbs: []string{"get", "list", "watch", "create", "update", "patch"},
+ },
+ {
+ APIGroups: []string{"networking.k8s.io"},
+ Resources: []string{"networkpolicies"},
Verbs: []string{"get", "list", "watch"},
},
},
diff --git a/metropolis/node/kubernetes/service_worker.go b/metropolis/node/kubernetes/service_worker.go
index 6f6633b..8ca8556 100644
--- a/metropolis/node/kubernetes/service_worker.go
+++ b/metropolis/node/kubernetes/service_worker.go
@@ -23,6 +23,7 @@
"source.monogon.dev/metropolis/node/core/network"
"source.monogon.dev/metropolis/node/kubernetes/clusternet"
"source.monogon.dev/metropolis/node/kubernetes/metricsprovider"
+ "source.monogon.dev/metropolis/node/kubernetes/networkpolicy"
"source.monogon.dev/metropolis/node/kubernetes/nfproxy"
kpki "source.monogon.dev/metropolis/node/kubernetes/pki"
"source.monogon.dev/metropolis/node/kubernetes/plugins/kvmdevice"
@@ -209,6 +210,10 @@
ClientSet: clients["netserv"].client,
}
+ npc := networkpolicy.Service{
+ Kubernetes: clients["netserv"].client,
+ }
+
var dnsIPRanges []netip.Prefix
for _, ipNet := range []net.IPNet{s.c.ServiceIPRange, s.c.ClusterNet} {
ipPrefix, err := netip.ParsePrefix(ipNet.String())
@@ -250,6 +255,7 @@
{"nfproxy", nfproxy.Run},
{"dns-service", dnsService.Run},
{"dns-listener", runDNSListener},
+ {"npc", npc.Run},
{"kvmdeviceplugin", kvmDevicePlugin.Run},
{"kubelet", kubelet.Run},
} {
diff --git a/metropolis/test/e2e/suites/kubernetes/BUILD.bazel b/metropolis/test/e2e/suites/kubernetes/BUILD.bazel
index ac449eb..99731a3 100644
--- a/metropolis/test/e2e/suites/kubernetes/BUILD.bazel
+++ b/metropolis/test/e2e/suites/kubernetes/BUILD.bazel
@@ -43,8 +43,10 @@
"//metropolis/test/util",
"@io_bazel_rules_go//go/runfiles",
"@io_k8s_api//core/v1:core",
+ "@io_k8s_api//networking/v1:networking",
"@io_k8s_apimachinery//pkg/api/errors",
"@io_k8s_apimachinery//pkg/apis/meta/v1:meta",
+ "@io_k8s_apimachinery//pkg/util/intstr",
"@io_k8s_kubernetes//pkg/api/v1/pod",
"@io_k8s_utils//ptr",
"@org_golang_google_protobuf//types/known/fieldmaskpb",
diff --git a/metropolis/test/e2e/suites/kubernetes/run_test.go b/metropolis/test/e2e/suites/kubernetes/run_test.go
index 939c663..ee39c1c 100644
--- a/metropolis/test/e2e/suites/kubernetes/run_test.go
+++ b/metropolis/test/e2e/suites/kubernetes/run_test.go
@@ -22,8 +22,10 @@
"github.com/bazelbuild/rules_go/go/runfiles"
"google.golang.org/protobuf/types/known/fieldmaskpb"
corev1 "k8s.io/api/core/v1"
+ nwkv1 "k8s.io/api/networking/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/util/intstr"
podv1 "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/utils/ptr"
@@ -392,6 +394,51 @@
})
ct.TestPodConnectivity(t, 0, 1, 1234, connectivity.ExpectedSuccess)
})
+ t.Run("Network Policy Smoke Test", func(t *testing.T) {
+ ct := connectivity.SetupTest(t, &connectivity.TestSpec{
+ Name: "npc-smoke",
+ ClientSet: clientSet,
+ RESTConfig: restConfig,
+ NumPods: 2,
+ ExtraPodConfig: func(i int, pod *corev1.Pod) {
+ // Spread pods out over nodes to test inter-node network
+ pod.Labels = make(map[string]string)
+ pod.Labels["name"] = "npc-smoke"
+ pod.Spec.TopologySpreadConstraints = []corev1.TopologySpreadConstraint{{
+ MaxSkew: 1,
+ TopologyKey: "kubernetes.io/hostname",
+ WhenUnsatisfiable: corev1.DoNotSchedule,
+ LabelSelector: metav1.SetAsLabelSelector(pod.Labels),
+ }}
+ },
+ })
+ // Test connectivity before applying network policy
+ ct.TestPodConnectivity(t, 0, 1, 1234, connectivity.ExpectedSuccess)
+ ct.TestPodConnectivity(t, 0, 1, 1235, connectivity.ExpectedSuccess)
+ nwp := &nwkv1.NetworkPolicy{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "npc-smoke",
+ },
+ Spec: nwkv1.NetworkPolicySpec{
+ PodSelector: metav1.LabelSelector{MatchLabels: map[string]string{"name": "npc-smoke"}},
+ Ingress: []nwkv1.NetworkPolicyIngressRule{{
+ Ports: []nwkv1.NetworkPolicyPort{{
+ Protocol: ptr.To(corev1.ProtocolTCP),
+ Port: &intstr.IntOrString{Type: intstr.Int, IntVal: 1234},
+ }},
+ From: []nwkv1.NetworkPolicyPeer{{
+ PodSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"name": "npc-smoke"}},
+ }},
+ }},
+ },
+ }
+ if _, err := clientSet.NetworkingV1().NetworkPolicies("default").Create(context.Background(), nwp, metav1.CreateOptions{}); err != nil {
+ t.Fatal(err)
+ }
+ // Check if policy is in effect
+ ct.TestPodConnectivityEventual(t, 0, 1, 1235, connectivity.ExpectedReject, 30*time.Second)
+ ct.TestPodConnectivity(t, 0, 1, 1234, connectivity.ExpectedSuccess)
+ })
for _, runtimeClass := range []string{"runc", "gvisor"} {
statefulSetName := fmt.Sprintf("test-statefulset-%s", runtimeClass)
util.TestEventual(t, fmt.Sprintf("StatefulSet with %s tests", runtimeClass), ctx, smallTestTimeout, func(ctx context.Context) error {