| Lorenz Brun | 52700ae | 2025-01-28 15:07:08 +0100 | [diff] [blame] | 1 | // Copyright The Monogon Project Authors. |
| 2 | // SPDX-License-Identifier: Apache-2.0 |
| 3 | |
| 4 | package networkpolicy |
| 5 | |
| 6 | import ( |
| 7 | "context" |
| 8 | "fmt" |
| 9 | |
| 10 | "git.dolansoft.org/dolansoft/k8s-nft-npc/nftctrl" |
| 11 | v1 "k8s.io/api/core/v1" |
| 12 | "k8s.io/client-go/informers" |
| 13 | "k8s.io/client-go/kubernetes" |
| 14 | typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" |
| 15 | "k8s.io/client-go/tools/cache" |
| 16 | "k8s.io/client-go/tools/cache/synctrack" |
| 17 | "k8s.io/client-go/tools/record" |
| 18 | "k8s.io/client-go/util/workqueue" |
| 19 | "k8s.io/kubectl/pkg/scheme" |
| 20 | |
| 21 | "source.monogon.dev/go/logging" |
| Jan Schär | 0f8ce4c | 2025-09-04 13:27:50 +0200 | [diff] [blame^] | 22 | "source.monogon.dev/metropolis/node/allocs" |
| Lorenz Brun | 52700ae | 2025-01-28 15:07:08 +0100 | [diff] [blame] | 23 | "source.monogon.dev/osbase/supervisor" |
| 24 | ) |
| 25 | |
| 26 | type Service struct { |
| 27 | Kubernetes kubernetes.Interface |
| 28 | } |
| 29 | |
| 30 | type workItem struct { |
| 31 | typ string |
| 32 | name cache.ObjectName |
| 33 | } |
| 34 | |
| 35 | type updateEnqueuer struct { |
| 36 | typ string |
| 37 | q workqueue.TypedInterface[workItem] |
| 38 | hasProcessed *synctrack.AsyncTracker[workItem] |
| 39 | l logging.Leveled |
| 40 | } |
| 41 | |
| 42 | func (c *updateEnqueuer) OnAdd(obj interface{}, isInInitialList bool) { |
| 43 | name, err := cache.ObjectToName(obj) |
| 44 | if err != nil { |
| 45 | c.l.Warningf("OnAdd name for type %q cannot be derived: %v", c.typ, err) |
| 46 | return |
| 47 | } |
| 48 | item := workItem{typ: c.typ, name: name} |
| 49 | if isInInitialList { |
| 50 | c.hasProcessed.Start(item) |
| 51 | } |
| 52 | c.q.Add(item) |
| 53 | } |
| 54 | |
| 55 | func (c *updateEnqueuer) OnUpdate(oldObj, newObj interface{}) { |
| 56 | name, err := cache.ObjectToName(newObj) |
| 57 | if err != nil { |
| 58 | c.l.Warningf("OnUpdate name for type %q cannot be derived: %v", c.typ, err) |
| 59 | return |
| 60 | } |
| 61 | c.q.Add(workItem{typ: c.typ, name: name}) |
| 62 | } |
| 63 | |
| 64 | func (c *updateEnqueuer) OnDelete(obj interface{}) { |
| 65 | name, err := cache.DeletionHandlingObjectToName(obj) |
| 66 | if err != nil { |
| 67 | c.l.Warningf("OnDelete name for type %q cannot be derived: %v", c.typ, err) |
| 68 | return |
| 69 | } |
| 70 | c.q.Add(workItem{typ: c.typ, name: name}) |
| 71 | } |
| 72 | |
| 73 | func (c *Service) Run(ctx context.Context) error { |
| 74 | eventBroadcaster := record.NewBroadcaster() |
| 75 | eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: c.Kubernetes.CoreV1().Events("")}) |
| 76 | recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "npc"}) |
| 77 | |
| Jan Schär | 0f8ce4c | 2025-09-04 13:27:50 +0200 | [diff] [blame^] | 78 | nft, err := nftctrl.New(recorder, allocs.LinkGroupK8sPod) |
| Lorenz Brun | 12e4b54 | 2025-02-19 16:29:30 +0100 | [diff] [blame] | 79 | if err != nil { |
| 80 | return fmt.Errorf("failed to create nftables controller: %w", err) |
| 81 | } |
| Lorenz Brun | 52700ae | 2025-01-28 15:07:08 +0100 | [diff] [blame] | 82 | defer nft.Close() |
| 83 | l := supervisor.Logger(ctx) |
| 84 | |
| 85 | informerFactory := informers.NewSharedInformerFactory(c.Kubernetes, 0) |
| 86 | q := workqueue.NewTypedWithConfig(workqueue.TypedQueueConfig[workItem]{ |
| 87 | Name: "networkpolicy", |
| 88 | }) |
| 89 | |
| 90 | var hasProcessed synctrack.AsyncTracker[workItem] |
| 91 | |
| 92 | nsInformer := informerFactory.Core().V1().Namespaces() |
| 93 | nsHandler, _ := nsInformer.Informer().AddEventHandler(&updateEnqueuer{q: q, typ: "ns", hasProcessed: &hasProcessed, l: l}) |
| 94 | podInformer := informerFactory.Core().V1().Pods() |
| 95 | podHandler, _ := podInformer.Informer().AddEventHandler(&updateEnqueuer{q: q, typ: "pod", hasProcessed: &hasProcessed, l: l}) |
| 96 | nwpInformer := informerFactory.Networking().V1().NetworkPolicies() |
| 97 | nwpHandler, _ := nwpInformer.Informer().AddEventHandler(&updateEnqueuer{q: q, typ: "nwp", hasProcessed: &hasProcessed, l: l}) |
| 98 | hasProcessed.UpstreamHasSynced = func() bool { |
| 99 | return nsHandler.HasSynced() && podHandler.HasSynced() && nwpHandler.HasSynced() |
| 100 | } |
| 101 | informerFactory.Start(ctx.Done()) |
| 102 | |
| 103 | go func() { |
| 104 | <-ctx.Done() |
| 105 | q.ShutDown() |
| 106 | informerFactory.Shutdown() |
| 107 | }() |
| 108 | |
| 109 | hasSynced := false |
| 110 | for { |
| 111 | if ctx.Err() != nil { |
| 112 | return ctx.Err() |
| 113 | } |
| 114 | i, shut := q.Get() |
| 115 | if shut { |
| 116 | return ctx.Err() |
| 117 | } |
| 118 | switch i.typ { |
| 119 | case "pod": |
| 120 | pod, _ := podInformer.Lister().Pods(i.name.Namespace).Get(i.name.Name) |
| 121 | nft.SetPod(i.name, pod) |
| 122 | case "nwp": |
| 123 | nwp, _ := nwpInformer.Lister().NetworkPolicies(i.name.Namespace).Get(i.name.Name) |
| 124 | nft.SetNetworkPolicy(i.name, nwp) |
| 125 | case "ns": |
| 126 | ns, _ := nsInformer.Lister().Get(i.name.Name) |
| 127 | nft.SetNamespace(i.name.Name, ns) |
| 128 | } |
| 129 | hasProcessed.Finished(i) |
| 130 | if hasSynced { |
| 131 | if err := nft.Flush(); err != nil { |
| 132 | return fmt.Errorf("failed to flush after update of %s %v: %w", i.typ, i.name, err) |
| 133 | } |
| 134 | } else if hasProcessed.HasSynced() { |
| 135 | if err := nft.Flush(); err != nil { |
| 136 | return fmt.Errorf("initial flush failed: %w", err) |
| 137 | } |
| 138 | l.Info("Initial sync completed") |
| 139 | supervisor.Signal(ctx, supervisor.SignalHealthy) |
| 140 | hasSynced = true |
| 141 | } |
| 142 | q.Done(i) |
| 143 | } |
| 144 | } |