m/n/k8s: add nftables network policy controller
This integrates my K8s network policy controller. In its current form
it offers few guarantees, as the custom CNI plugin is not yet included,
but it mostly works. There is also still a DNS hole, as host-local
services are not yet properly policed.
It comes with a basic smoke test built on the connectivity testing
helper, as well as some metrics to verify that it is integrated
properly and to monitor its performance.
Change-Id: Ia2f54b9975361270678ce742ae5e32df25e515c5
Reviewed-on: https://review.monogon.dev/c/monogon/+/3740
Tested-by: Jenkins CI
Reviewed-by: Jan Schär <jan@monogon.tech>
diff --git a/metropolis/node/kubernetes/networkpolicy/BUILD.bazel b/metropolis/node/kubernetes/networkpolicy/BUILD.bazel
new file mode 100644
index 0000000..d3d3b76
--- /dev/null
+++ b/metropolis/node/kubernetes/networkpolicy/BUILD.bazel
@@ -0,0 +1,23 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+ name = "networkpolicy",
+ srcs = ["networkpolicy.go"],
+ importpath = "source.monogon.dev/metropolis/node/kubernetes/networkpolicy",
+ visibility = ["//visibility:public"],
+ deps = [
+ "//go/logging",
+ "//metropolis/node",
+ "//osbase/supervisor",
+ "@io_k8s_api//core/v1:core",
+ "@io_k8s_client_go//informers",
+ "@io_k8s_client_go//kubernetes",
+ "@io_k8s_client_go//kubernetes/typed/core/v1:core",
+ "@io_k8s_client_go//tools/cache",
+ "@io_k8s_client_go//tools/cache/synctrack",
+ "@io_k8s_client_go//tools/record",
+ "@io_k8s_client_go//util/workqueue",
+ "@io_k8s_kubectl//pkg/scheme",
+ "@org_dolansoft_git_dolansoft_k8s_nft_npc//nftctrl",
+ ],
+)
diff --git a/metropolis/node/kubernetes/networkpolicy/networkpolicy.go b/metropolis/node/kubernetes/networkpolicy/networkpolicy.go
new file mode 100644
index 0000000..618ff03
--- /dev/null
+++ b/metropolis/node/kubernetes/networkpolicy/networkpolicy.go
@@ -0,0 +1,141 @@
+// Copyright The Monogon Project Authors.
+// SPDX-License-Identifier: Apache-2.0
+
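+// Package networkpolicy implements the Kubernetes network policy
+// controller. It watches namespaces, pods and NetworkPolicy objects and
+// translates them into nftables rules via the k8s-nft-npc nftctrl library.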
+package networkpolicy
+
+import (
+ "context"
+ "fmt"
+
+ "git.dolansoft.org/dolansoft/k8s-nft-npc/nftctrl"
+ v1 "k8s.io/api/core/v1"
+ "k8s.io/client-go/informers"
+ "k8s.io/client-go/kubernetes"
+ typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
+ "k8s.io/client-go/tools/cache"
+ "k8s.io/client-go/tools/cache/synctrack"
+ "k8s.io/client-go/tools/record"
+ "k8s.io/client-go/util/workqueue"
+ "k8s.io/kubectl/pkg/scheme"
+
+ "source.monogon.dev/go/logging"
+ "source.monogon.dev/metropolis/node"
+ "source.monogon.dev/osbase/supervisor"
+)
+
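+// Service is the network policy controller service. Kubernetes must be set
+// to a client for the cluster to watch. The service is meant to be started
+// as a supervisor runnable, roughly as sketched below (with a hypothetical
+// clientSet; the actual wiring lives in the Kubernetes service setup):
+//
+//	svc := &networkpolicy.Service{Kubernetes: clientSet}
+//	supervisor.Run(ctx, "networkpolicy", svc.Run)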
+type Service struct {
+ Kubernetes kubernetes.Interface
+}
+
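+// workItem identifies one object to be processed by the sync loop in Run.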
+type workItem struct {
+ typ string
+ name cache.ObjectName
+}
+
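+// updateEnqueuer converts informer events for one object type into work
+// queue items. Items seen in the initial list are additionally registered
+// with hasProcessed to detect when the initial sync is done.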
+type updateEnqueuer struct {
+ typ string
+ q workqueue.TypedInterface[workItem]
+ hasProcessed *synctrack.AsyncTracker[workItem]
+ l logging.Leveled
+}
+
+func (c *updateEnqueuer) OnAdd(obj interface{}, isInInitialList bool) {
+ name, err := cache.ObjectToName(obj)
+ if err != nil {
+ c.l.Warningf("OnAdd name for type %q cannot be derived: %v", c.typ, err)
+ return
+ }
+ item := workItem{typ: c.typ, name: name}
+ if isInInitialList {
+ c.hasProcessed.Start(item)
+ }
+ c.q.Add(item)
+}
+
+func (c *updateEnqueuer) OnUpdate(oldObj, newObj interface{}) {
+ name, err := cache.ObjectToName(newObj)
+ if err != nil {
+ c.l.Warningf("OnUpdate name for type %q cannot be derived: %v", c.typ, err)
+ return
+ }
+ c.q.Add(workItem{typ: c.typ, name: name})
+}
+
+func (c *updateEnqueuer) OnDelete(obj interface{}) {
+ name, err := cache.DeletionHandlingObjectToName(obj)
+ if err != nil {
+ c.l.Warningf("OnDelete name for type %q cannot be derived: %v", c.typ, err)
+ return
+ }
+ c.q.Add(workItem{typ: c.typ, name: name})
+}
+
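+// Run is the main loop of the controller. It processes queued updates
+// sequentially, applies them to the nftctrl state and flushes the
+// resulting nftables changes.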
+func (c *Service) Run(ctx context.Context) error {
+ eventBroadcaster := record.NewBroadcaster()
+ eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: c.Kubernetes.CoreV1().Events("")})
+ recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "npc"})
+
+ nft := nftctrl.New(recorder, node.LinkGroupK8sPod)
+ defer nft.Close()
+ l := supervisor.Logger(ctx)
+
+ informerFactory := informers.NewSharedInformerFactory(c.Kubernetes, 0)
+ q := workqueue.NewTypedWithConfig(workqueue.TypedQueueConfig[workItem]{
+ Name: "networkpolicy",
+ })
+
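+ // hasProcessed tracks which items of the informers' initial lists have
+ // been processed, to detect when the initial sync is complete.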
+ var hasProcessed synctrack.AsyncTracker[workItem]
+
+ nsInformer := informerFactory.Core().V1().Namespaces()
+ nsHandler, _ := nsInformer.Informer().AddEventHandler(&updateEnqueuer{q: q, typ: "ns", hasProcessed: &hasProcessed, l: l})
+ podInformer := informerFactory.Core().V1().Pods()
+ podHandler, _ := podInformer.Informer().AddEventHandler(&updateEnqueuer{q: q, typ: "pod", hasProcessed: &hasProcessed, l: l})
+ nwpInformer := informerFactory.Networking().V1().NetworkPolicies()
+ nwpHandler, _ := nwpInformer.Informer().AddEventHandler(&updateEnqueuer{q: q, typ: "nwp", hasProcessed: &hasProcessed, l: l})
+ hasProcessed.UpstreamHasSynced = func() bool {
+ return nsHandler.HasSynced() && podHandler.HasSynced() && nwpHandler.HasSynced()
+ }
+ informerFactory.Start(ctx.Done())
+
+ go func() {
+ <-ctx.Done()
+ q.ShutDown()
+ informerFactory.Shutdown()
+ }()
+
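+ // Until the initial lists have been fully processed, updates are only
+ // applied to the in-memory nftctrl state; the first flush to nftables
+ // happens once the initial state is complete.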
+ hasSynced := false
+ for {
+ if ctx.Err() != nil {
+ return ctx.Err()
+ }
+ i, shut := q.Get()
+ if shut {
+ return ctx.Err()
+ }
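+ // Look up the current object; if it no longer exists, the lister
+ // returns nil, which tells nftctrl to drop its state for the object.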
+ switch i.typ {
+ case "pod":
+ pod, _ := podInformer.Lister().Pods(i.name.Namespace).Get(i.name.Name)
+ nft.SetPod(i.name, pod)
+ case "nwp":
+ nwp, _ := nwpInformer.Lister().NetworkPolicies(i.name.Namespace).Get(i.name.Name)
+ nft.SetNetworkPolicy(i.name, nwp)
+ case "ns":
+ ns, _ := nsInformer.Lister().Get(i.name.Name)
+ nft.SetNamespace(i.name.Name, ns)
+ }
+ hasProcessed.Finished(i)
+ if hasSynced {
+ if err := nft.Flush(); err != nil {
+ return fmt.Errorf("failed to flush after update of %s %v: %w", i.typ, i.name, err)
+ }
+ } else if hasProcessed.HasSynced() {
+ if err := nft.Flush(); err != nil {
+ return fmt.Errorf("initial flush failed: %w", err)
+ }
+ l.Info("Initial sync completed")
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ hasSynced = true
+ }
+ q.Done(i)
+ }
+}