blob: 618ff031d39fed3daf538a5ca75fb8f2d1c9464c [file] [log] [blame]
// Copyright The Monogon Project Authors.
// SPDX-License-Identifier: Apache-2.0
3
4package networkpolicy
5
6import (
7 "context"
8 "fmt"
9
10 "git.dolansoft.org/dolansoft/k8s-nft-npc/nftctrl"
11 v1 "k8s.io/api/core/v1"
12 "k8s.io/client-go/informers"
13 "k8s.io/client-go/kubernetes"
14 typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
15 "k8s.io/client-go/tools/cache"
16 "k8s.io/client-go/tools/cache/synctrack"
17 "k8s.io/client-go/tools/record"
18 "k8s.io/client-go/util/workqueue"
19 "k8s.io/kubectl/pkg/scheme"
20
21 "source.monogon.dev/go/logging"
22 "source.monogon.dev/metropolis/node"
23 "source.monogon.dev/osbase/supervisor"
24)
25
// Service is the network policy controller. Its Run method watches
// namespaces, pods and network policies through the Kubernetes API and
// forwards every change to an nftctrl controller.
type Service struct {
	// Kubernetes is the client used both to build the shared informers and to
	// record events.
	Kubernetes kubernetes.Interface
}
29
// workItem identifies a single object whose state has to be (re)applied. It
// is the element type of the work queue and the key type of the sync tracker.
type workItem struct {
	// typ is the kind of object the item refers to. Run uses "ns", "pod" and
	// "nwp" for namespaces, pods and network policies respectively.
	typ string
	// name is the namespace/name pair of the object.
	name cache.ObjectName
}
34
// updateEnqueuer is an informer event handler which turns every add, update
// and delete notification into a workItem on a queue. It only records which
// object changed; the object itself is not carried along.
type updateEnqueuer struct {
	// typ is the type tag stamped on every workItem this handler enqueues.
	typ string
	// q is the queue the work items are added to.
	q workqueue.TypedInterface[workItem]
	// hasProcessed is told about every item that was part of the informer's
	// initial list, so that the consumer can tell when all of them have been
	// handled.
	hasProcessed *synctrack.AsyncTracker[workItem]
	// l receives a warning whenever an object's name cannot be determined.
	l logging.Leveled
}
41
42func (c *updateEnqueuer) OnAdd(obj interface{}, isInInitialList bool) {
43 name, err := cache.ObjectToName(obj)
44 if err != nil {
45 c.l.Warningf("OnAdd name for type %q cannot be derived: %v", c.typ, err)
46 return
47 }
48 item := workItem{typ: c.typ, name: name}
49 if isInInitialList {
50 c.hasProcessed.Start(item)
51 }
52 c.q.Add(item)
53}
54
55func (c *updateEnqueuer) OnUpdate(oldObj, newObj interface{}) {
56 name, err := cache.ObjectToName(newObj)
57 if err != nil {
58 c.l.Warningf("OnUpdate name for type %q cannot be derived: %v", c.typ, err)
59 return
60 }
61 c.q.Add(workItem{typ: c.typ, name: name})
62}
63
64func (c *updateEnqueuer) OnDelete(obj interface{}) {
65 name, err := cache.DeletionHandlingObjectToName(obj)
66 if err != nil {
67 c.l.Warningf("OnDelete name for type %q cannot be derived: %v", c.typ, err)
68 return
69 }
70 c.q.Add(workItem{typ: c.typ, name: name})
71}
72
73func (c *Service) Run(ctx context.Context) error {
74 eventBroadcaster := record.NewBroadcaster()
75 eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: c.Kubernetes.CoreV1().Events("")})
76 recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "npc"})
77
78 nft := nftctrl.New(recorder, node.LinkGroupK8sPod)
79 defer nft.Close()
80 l := supervisor.Logger(ctx)
81
82 informerFactory := informers.NewSharedInformerFactory(c.Kubernetes, 0)
83 q := workqueue.NewTypedWithConfig(workqueue.TypedQueueConfig[workItem]{
84 Name: "networkpolicy",
85 })
86
87 var hasProcessed synctrack.AsyncTracker[workItem]
88
89 nsInformer := informerFactory.Core().V1().Namespaces()
90 nsHandler, _ := nsInformer.Informer().AddEventHandler(&updateEnqueuer{q: q, typ: "ns", hasProcessed: &hasProcessed, l: l})
91 podInformer := informerFactory.Core().V1().Pods()
92 podHandler, _ := podInformer.Informer().AddEventHandler(&updateEnqueuer{q: q, typ: "pod", hasProcessed: &hasProcessed, l: l})
93 nwpInformer := informerFactory.Networking().V1().NetworkPolicies()
94 nwpHandler, _ := nwpInformer.Informer().AddEventHandler(&updateEnqueuer{q: q, typ: "nwp", hasProcessed: &hasProcessed, l: l})
95 hasProcessed.UpstreamHasSynced = func() bool {
96 return nsHandler.HasSynced() && podHandler.HasSynced() && nwpHandler.HasSynced()
97 }
98 informerFactory.Start(ctx.Done())
99
100 go func() {
101 <-ctx.Done()
102 q.ShutDown()
103 informerFactory.Shutdown()
104 }()
105
106 hasSynced := false
107 for {
108 if ctx.Err() != nil {
109 return ctx.Err()
110 }
111 i, shut := q.Get()
112 if shut {
113 return ctx.Err()
114 }
115 switch i.typ {
116 case "pod":
117 pod, _ := podInformer.Lister().Pods(i.name.Namespace).Get(i.name.Name)
118 nft.SetPod(i.name, pod)
119 case "nwp":
120 nwp, _ := nwpInformer.Lister().NetworkPolicies(i.name.Namespace).Get(i.name.Name)
121 nft.SetNetworkPolicy(i.name, nwp)
122 case "ns":
123 ns, _ := nsInformer.Lister().Get(i.name.Name)
124 nft.SetNamespace(i.name.Name, ns)
125 }
126 hasProcessed.Finished(i)
127 if hasSynced {
128 if err := nft.Flush(); err != nil {
129 return fmt.Errorf("failed to flush after update of %s %v: %w", i.typ, i.name, err)
130 }
131 } else if hasProcessed.HasSynced() {
132 if err := nft.Flush(); err != nil {
133 return fmt.Errorf("initial flush failed: %w", err)
134 }
135 l.Info("Initial sync completed")
136 supervisor.Signal(ctx, supervisor.SignalHealthy)
137 hasSynced = true
138 }
139 q.Done(i)
140 }
141}