m/n/k8s: add nftables network policy controller
This integrates my K8s network policy controller. In its current form it
comes with few guarantees, as the custom CNI plugin is not yet
integrated, but it mostly works. There is also still a DNS hole, as
host-local services are not yet properly policed.
It comes with a basic smoke test built on the connectivity testing
helper, as well as some metrics to verify that it is integrated properly
and to monitor its performance.
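
For illustration, the controller is wired up like the other per-node
Kubernetes services, roughly as follows (a sketch mirroring the
service_worker.go hunk below; names are as in that file):

    npc := networkpolicy.Service{
        Kubernetes: clients["netserv"].client,
    }
    // Runs as a supervisor runnable; it signals healthy once the
    // initial sync has been flushed to nftables.
    supervisor.Run(ctx, "npc", npc.Run)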
Change-Id: Ia2f54b9975361270678ce742ae5e32df25e515c5
Reviewed-on: https://review.monogon.dev/c/monogon/+/3740
Tested-by: Jenkins CI
Reviewed-by: Jan Schär <jan@monogon.tech>
diff --git a/metropolis/node/kubernetes/BUILD.bazel b/metropolis/node/kubernetes/BUILD.bazel
index 636295c..8b8c1c5 100644
--- a/metropolis/node/kubernetes/BUILD.bazel
+++ b/metropolis/node/kubernetes/BUILD.bazel
@@ -33,6 +33,7 @@
"//metropolis/node/kubernetes/clusternet",
"//metropolis/node/kubernetes/metricsprovider",
"//metropolis/node/kubernetes/metricsproxy",
+ "//metropolis/node/kubernetes/networkpolicy",
"//metropolis/node/kubernetes/nfproxy",
"//metropolis/node/kubernetes/pki",
"//metropolis/node/kubernetes/plugins/kvmdevice",
diff --git a/metropolis/node/kubernetes/networkpolicy/BUILD.bazel b/metropolis/node/kubernetes/networkpolicy/BUILD.bazel
new file mode 100644
index 0000000..d3d3b76
--- /dev/null
+++ b/metropolis/node/kubernetes/networkpolicy/BUILD.bazel
@@ -0,0 +1,23 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+ name = "networkpolicy",
+ srcs = ["networkpolicy.go"],
+ importpath = "source.monogon.dev/metropolis/node/kubernetes/networkpolicy",
+ visibility = ["//visibility:public"],
+ deps = [
+ "//go/logging",
+ "//metropolis/node",
+ "//osbase/supervisor",
+ "@io_k8s_api//core/v1:core",
+ "@io_k8s_client_go//informers",
+ "@io_k8s_client_go//kubernetes",
+ "@io_k8s_client_go//kubernetes/typed/core/v1:core",
+ "@io_k8s_client_go//tools/cache",
+ "@io_k8s_client_go//tools/cache/synctrack",
+ "@io_k8s_client_go//tools/record",
+ "@io_k8s_client_go//util/workqueue",
+ "@io_k8s_kubectl//pkg/scheme",
+ "@org_dolansoft_git_dolansoft_k8s_nft_npc//nftctrl",
+ ],
+)
diff --git a/metropolis/node/kubernetes/networkpolicy/networkpolicy.go b/metropolis/node/kubernetes/networkpolicy/networkpolicy.go
new file mode 100644
index 0000000..618ff03
--- /dev/null
+++ b/metropolis/node/kubernetes/networkpolicy/networkpolicy.go
@@ -0,0 +1,141 @@
+// Copyright The Monogon Project Authors.
+// SPDX-License-Identifier: Apache-2.0
+
+package networkpolicy
+
+import (
+ "context"
+ "fmt"
+
+ "git.dolansoft.org/dolansoft/k8s-nft-npc/nftctrl"
+ v1 "k8s.io/api/core/v1"
+ "k8s.io/client-go/informers"
+ "k8s.io/client-go/kubernetes"
+ typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
+ "k8s.io/client-go/tools/cache"
+ "k8s.io/client-go/tools/cache/synctrack"
+ "k8s.io/client-go/tools/record"
+ "k8s.io/client-go/util/workqueue"
+ "k8s.io/kubectl/pkg/scheme"
+
+ "source.monogon.dev/go/logging"
+ "source.monogon.dev/metropolis/node"
+ "source.monogon.dev/osbase/supervisor"
+)
+
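+// Service is the Kubernetes network policy controller. It watches Pods,
+// Namespaces and NetworkPolicies through shared informers and programs
+// matching nftables rules via the nftctrl package.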
+type Service struct {
+ Kubernetes kubernetes.Interface
+}
+
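+// workItem identifies one object (by type and name) whose current state
+// should be re-read from the informer caches and applied to nftables.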
+type workItem struct {
+ typ string
+ name cache.ObjectName
+}
+
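+// updateEnqueuer implements cache.ResourceEventHandler for a single object
+// type: every add, update and delete event is translated into a workItem on
+// the shared queue, and objects from the initial list are registered with
+// the sync tracker.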
+type updateEnqueuer struct {
+ typ string
+ q workqueue.TypedInterface[workItem]
+ hasProcessed *synctrack.AsyncTracker[workItem]
+ l logging.Leveled
+}
+
+func (c *updateEnqueuer) OnAdd(obj interface{}, isInInitialList bool) {
+ name, err := cache.ObjectToName(obj)
+ if err != nil {
+ c.l.Warningf("OnAdd name for type %q cannot be derived: %v", c.typ, err)
+ return
+ }
+ item := workItem{typ: c.typ, name: name}
+ if isInInitialList {
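+ // Objects from the informer's initial list are registered with the
+ // tracker so that HasSynced only reports true once they have all
+ // been processed.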
+ c.hasProcessed.Start(item)
+ }
+ c.q.Add(item)
+}
+
+func (c *updateEnqueuer) OnUpdate(oldObj, newObj interface{}) {
+ name, err := cache.ObjectToName(newObj)
+ if err != nil {
+ c.l.Warningf("OnUpdate name for type %q cannot be derived: %v", c.typ, err)
+ return
+ }
+ c.q.Add(workItem{typ: c.typ, name: name})
+}
+
+func (c *updateEnqueuer) OnDelete(obj interface{}) {
+ name, err := cache.DeletionHandlingObjectToName(obj)
+ if err != nil {
+ c.l.Warningf("OnDelete name for type %q cannot be derived: %v", c.typ, err)
+ return
+ }
+ c.q.Add(workItem{typ: c.typ, name: name})
+}
+
+func (c *Service) Run(ctx context.Context) error {
+ eventBroadcaster := record.NewBroadcaster()
+ eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: c.Kubernetes.CoreV1().Events("")})
+ recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "npc"})
+
+ nft := nftctrl.New(recorder, node.LinkGroupK8sPod)
+ defer nft.Close()
+ l := supervisor.Logger(ctx)
+
+ informerFactory := informers.NewSharedInformerFactory(c.Kubernetes, 0)
+ q := workqueue.NewTypedWithConfig(workqueue.TypedQueueConfig[workItem]{
+ Name: "networkpolicy",
+ })
+
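+ // hasProcessed tracks which items of the informers' initial lists have
+ // been handled; it gates the first nftables flush and the health signal
+ // in the loop below.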
+ var hasProcessed synctrack.AsyncTracker[workItem]
+
+ nsInformer := informerFactory.Core().V1().Namespaces()
+ nsHandler, _ := nsInformer.Informer().AddEventHandler(&updateEnqueuer{q: q, typ: "ns", hasProcessed: &hasProcessed, l: l})
+ podInformer := informerFactory.Core().V1().Pods()
+ podHandler, _ := podInformer.Informer().AddEventHandler(&updateEnqueuer{q: q, typ: "pod", hasProcessed: &hasProcessed, l: l})
+ nwpInformer := informerFactory.Networking().V1().NetworkPolicies()
+ nwpHandler, _ := nwpInformer.Informer().AddEventHandler(&updateEnqueuer{q: q, typ: "nwp", hasProcessed: &hasProcessed, l: l})
+ hasProcessed.UpstreamHasSynced = func() bool {
+ return nsHandler.HasSynced() && podHandler.HasSynced() && nwpHandler.HasSynced()
+ }
+ informerFactory.Start(ctx.Done())
+
+ go func() {
+ <-ctx.Done()
+ q.ShutDown()
+ informerFactory.Shutdown()
+ }()
+
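+ // During the initial sync, all updates are accumulated and flushed to
+ // nftables in one batch, to avoid programming partial rule sets.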
+ hasSynced := false
+ for {
+ if ctx.Err() != nil {
+ return ctx.Err()
+ }
+ i, shut := q.Get()
+ if shut {
+ return ctx.Err()
+ }
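+ // Lister errors are deliberately ignored below: for a deleted object
+ // the lister returns nil, and passing nil to the Set* methods clears
+ // the object's state from nftctrl.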
+ switch i.typ {
+ case "pod":
+ pod, _ := podInformer.Lister().Pods(i.name.Namespace).Get(i.name.Name)
+ nft.SetPod(i.name, pod)
+ case "nwp":
+ nwp, _ := nwpInformer.Lister().NetworkPolicies(i.name.Namespace).Get(i.name.Name)
+ nft.SetNetworkPolicy(i.name, nwp)
+ case "ns":
+ ns, _ := nsInformer.Lister().Get(i.name.Name)
+ nft.SetNamespace(i.name.Name, ns)
+ }
+ hasProcessed.Finished(i)
+ if hasSynced {
+ if err := nft.Flush(); err != nil {
+ return fmt.Errorf("failed to flush after update of %s %v: %w", i.typ, i.name, err)
+ }
+ } else if hasProcessed.HasSynced() {
+ if err := nft.Flush(); err != nil {
+ return fmt.Errorf("initial flush failed: %w", err)
+ }
+ l.Info("Initial sync completed")
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ hasSynced = true
+ }
+ q.Done(i)
+ }
+}
diff --git a/metropolis/node/kubernetes/reconciler/resources_rbac.go b/metropolis/node/kubernetes/reconciler/resources_rbac.go
index 60d1ba4..9ce9942 100644
--- a/metropolis/node/kubernetes/reconciler/resources_rbac.go
+++ b/metropolis/node/kubernetes/reconciler/resources_rbac.go
@@ -94,7 +94,17 @@
},
{
APIGroups: []string{""},
- Resources: []string{"services", "nodes", "namespaces"},
+ Resources: []string{"services", "nodes", "namespaces", "pods"},
+ Verbs: []string{"get", "list", "watch"},
+ },
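+ // The following rules are needed by the network policy controller,
+ // which reads NetworkPolicies and emits Events through its recorder.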
+ {
+ APIGroups: []string{""},
+ Resources: []string{"events"},
+ Verbs: []string{"get", "list", "watch", "create", "update", "patch"},
+ },
+ {
+ APIGroups: []string{"networking.k8s.io"},
+ Resources: []string{"networkpolicies"},
Verbs: []string{"get", "list", "watch"},
},
},
diff --git a/metropolis/node/kubernetes/service_worker.go b/metropolis/node/kubernetes/service_worker.go
index 6f6633b..8ca8556 100644
--- a/metropolis/node/kubernetes/service_worker.go
+++ b/metropolis/node/kubernetes/service_worker.go
@@ -23,6 +23,7 @@
"source.monogon.dev/metropolis/node/core/network"
"source.monogon.dev/metropolis/node/kubernetes/clusternet"
"source.monogon.dev/metropolis/node/kubernetes/metricsprovider"
+ "source.monogon.dev/metropolis/node/kubernetes/networkpolicy"
"source.monogon.dev/metropolis/node/kubernetes/nfproxy"
kpki "source.monogon.dev/metropolis/node/kubernetes/pki"
"source.monogon.dev/metropolis/node/kubernetes/plugins/kvmdevice"
@@ -209,6 +210,10 @@
ClientSet: clients["netserv"].client,
}
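+ // The network policy controller shares the netserv client with the
+ // other network services.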
+ npc := networkpolicy.Service{
+ Kubernetes: clients["netserv"].client,
+ }
+
var dnsIPRanges []netip.Prefix
for _, ipNet := range []net.IPNet{s.c.ServiceIPRange, s.c.ClusterNet} {
ipPrefix, err := netip.ParsePrefix(ipNet.String())
@@ -250,6 +255,7 @@
{"nfproxy", nfproxy.Run},
{"dns-service", dnsService.Run},
{"dns-listener", runDNSListener},
+ {"npc", npc.Run},
{"kvmdeviceplugin", kvmDevicePlugin.Run},
{"kubelet", kubelet.Run},
} {
diff --git a/metropolis/test/e2e/suites/kubernetes/BUILD.bazel b/metropolis/test/e2e/suites/kubernetes/BUILD.bazel
index ac449eb..99731a3 100644
--- a/metropolis/test/e2e/suites/kubernetes/BUILD.bazel
+++ b/metropolis/test/e2e/suites/kubernetes/BUILD.bazel
@@ -43,8 +43,10 @@
"//metropolis/test/util",
"@io_bazel_rules_go//go/runfiles",
"@io_k8s_api//core/v1:core",
+ "@io_k8s_api//networking/v1:networking",
"@io_k8s_apimachinery//pkg/api/errors",
"@io_k8s_apimachinery//pkg/apis/meta/v1:meta",
+ "@io_k8s_apimachinery//pkg/util/intstr",
"@io_k8s_kubernetes//pkg/api/v1/pod",
"@io_k8s_utils//ptr",
"@org_golang_google_protobuf//types/known/fieldmaskpb",
diff --git a/metropolis/test/e2e/suites/kubernetes/run_test.go b/metropolis/test/e2e/suites/kubernetes/run_test.go
index 939c663..ee39c1c 100644
--- a/metropolis/test/e2e/suites/kubernetes/run_test.go
+++ b/metropolis/test/e2e/suites/kubernetes/run_test.go
@@ -22,8 +22,10 @@
"github.com/bazelbuild/rules_go/go/runfiles"
"google.golang.org/protobuf/types/known/fieldmaskpb"
corev1 "k8s.io/api/core/v1"
+ nwkv1 "k8s.io/api/networking/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/util/intstr"
podv1 "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/utils/ptr"
@@ -392,6 +394,51 @@
})
ct.TestPodConnectivity(t, 0, 1, 1234, connectivity.ExpectedSuccess)
})
+ t.Run("Network Policy Smoke Test", func(t *testing.T) {
+ ct := connectivity.SetupTest(t, &connectivity.TestSpec{
+ Name: "npc-smoke",
+ ClientSet: clientSet,
+ RESTConfig: restConfig,
+ NumPods: 2,
+ ExtraPodConfig: func(i int, pod *corev1.Pod) {
+ // Spread pods out over nodes to test inter-node networking
+ pod.Labels = make(map[string]string)
+ pod.Labels["name"] = "npc-smoke"
+ pod.Spec.TopologySpreadConstraints = []corev1.TopologySpreadConstraint{{
+ MaxSkew: 1,
+ TopologyKey: "kubernetes.io/hostname",
+ WhenUnsatisfiable: corev1.DoNotSchedule,
+ LabelSelector: metav1.SetAsLabelSelector(pod.Labels),
+ }}
+ },
+ })
+ // Test connectivity before applying network policy
+ ct.TestPodConnectivity(t, 0, 1, 1234, connectivity.ExpectedSuccess)
+ ct.TestPodConnectivity(t, 0, 1, 1235, connectivity.ExpectedSuccess)
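+ // This policy allows ingress to the test pods only on TCP port 1234
+ // and only from pods carrying the same label; connections to port
+ // 1235 should therefore be rejected once the policy takes effect.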
+ nwp := &nwkv1.NetworkPolicy{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "npc-smoke",
+ },
+ Spec: nwkv1.NetworkPolicySpec{
+ PodSelector: metav1.LabelSelector{MatchLabels: map[string]string{"name": "npc-smoke"}},
+ Ingress: []nwkv1.NetworkPolicyIngressRule{{
+ Ports: []nwkv1.NetworkPolicyPort{{
+ Protocol: ptr.To(corev1.ProtocolTCP),
+ Port: &intstr.IntOrString{Type: intstr.Int, IntVal: 1234},
+ }},
+ From: []nwkv1.NetworkPolicyPeer{{
+ PodSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"name": "npc-smoke"}},
+ }},
+ }},
+ },
+ }
+ if _, err := clientSet.NetworkingV1().NetworkPolicies("default").Create(context.Background(), nwp, metav1.CreateOptions{}); err != nil {
+ t.Fatal(err)
+ }
+ // Check that the policy has taken effect
+ ct.TestPodConnectivityEventual(t, 0, 1, 1235, connectivity.ExpectedReject, 30*time.Second)
+ ct.TestPodConnectivity(t, 0, 1, 1234, connectivity.ExpectedSuccess)
+ })
for _, runtimeClass := range []string{"runc", "gvisor"} {
statefulSetName := fmt.Sprintf("test-statefulset-%s", runtimeClass)
util.TestEventual(t, fmt.Sprintf("StatefulSet with %s tests", runtimeClass), ctx, smallTestTimeout, func(ctx context.Context) error {