blob: 04a2fcdca9cdbff11afded164d7f713598c31405 [file] [log] [blame]
// Copyright The Monogon Project Authors.
// SPDX-License-Identifier: Apache-2.0
3
4package networkpolicy
5
6import (
7 "context"
8 "fmt"
9
10 "git.dolansoft.org/dolansoft/k8s-nft-npc/nftctrl"
11 v1 "k8s.io/api/core/v1"
12 "k8s.io/client-go/informers"
13 "k8s.io/client-go/kubernetes"
14 typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
15 "k8s.io/client-go/tools/cache"
16 "k8s.io/client-go/tools/cache/synctrack"
17 "k8s.io/client-go/tools/record"
18 "k8s.io/client-go/util/workqueue"
19 "k8s.io/kubectl/pkg/scheme"
20
21 "source.monogon.dev/go/logging"
Jan Schär0f8ce4c2025-09-04 13:27:50 +020022 "source.monogon.dev/metropolis/node/allocs"
Lorenz Brun52700ae2025-01-28 15:07:08 +010023 "source.monogon.dev/osbase/supervisor"
24)
25
26type Service struct {
27 Kubernetes kubernetes.Interface
28}
29
30type workItem struct {
31 typ string
32 name cache.ObjectName
33}
34
35type updateEnqueuer struct {
36 typ string
37 q workqueue.TypedInterface[workItem]
38 hasProcessed *synctrack.AsyncTracker[workItem]
39 l logging.Leveled
40}
41
42func (c *updateEnqueuer) OnAdd(obj interface{}, isInInitialList bool) {
43 name, err := cache.ObjectToName(obj)
44 if err != nil {
45 c.l.Warningf("OnAdd name for type %q cannot be derived: %v", c.typ, err)
46 return
47 }
48 item := workItem{typ: c.typ, name: name}
49 if isInInitialList {
50 c.hasProcessed.Start(item)
51 }
52 c.q.Add(item)
53}
54
55func (c *updateEnqueuer) OnUpdate(oldObj, newObj interface{}) {
56 name, err := cache.ObjectToName(newObj)
57 if err != nil {
58 c.l.Warningf("OnUpdate name for type %q cannot be derived: %v", c.typ, err)
59 return
60 }
61 c.q.Add(workItem{typ: c.typ, name: name})
62}
63
64func (c *updateEnqueuer) OnDelete(obj interface{}) {
65 name, err := cache.DeletionHandlingObjectToName(obj)
66 if err != nil {
67 c.l.Warningf("OnDelete name for type %q cannot be derived: %v", c.typ, err)
68 return
69 }
70 c.q.Add(workItem{typ: c.typ, name: name})
71}
72
73func (c *Service) Run(ctx context.Context) error {
74 eventBroadcaster := record.NewBroadcaster()
75 eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: c.Kubernetes.CoreV1().Events("")})
76 recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "npc"})
77
Jan Schär0f8ce4c2025-09-04 13:27:50 +020078 nft, err := nftctrl.New(recorder, allocs.LinkGroupK8sPod)
Lorenz Brun12e4b542025-02-19 16:29:30 +010079 if err != nil {
80 return fmt.Errorf("failed to create nftables controller: %w", err)
81 }
Lorenz Brun52700ae2025-01-28 15:07:08 +010082 defer nft.Close()
83 l := supervisor.Logger(ctx)
84
85 informerFactory := informers.NewSharedInformerFactory(c.Kubernetes, 0)
86 q := workqueue.NewTypedWithConfig(workqueue.TypedQueueConfig[workItem]{
87 Name: "networkpolicy",
88 })
89
90 var hasProcessed synctrack.AsyncTracker[workItem]
91
92 nsInformer := informerFactory.Core().V1().Namespaces()
93 nsHandler, _ := nsInformer.Informer().AddEventHandler(&updateEnqueuer{q: q, typ: "ns", hasProcessed: &hasProcessed, l: l})
94 podInformer := informerFactory.Core().V1().Pods()
95 podHandler, _ := podInformer.Informer().AddEventHandler(&updateEnqueuer{q: q, typ: "pod", hasProcessed: &hasProcessed, l: l})
96 nwpInformer := informerFactory.Networking().V1().NetworkPolicies()
97 nwpHandler, _ := nwpInformer.Informer().AddEventHandler(&updateEnqueuer{q: q, typ: "nwp", hasProcessed: &hasProcessed, l: l})
98 hasProcessed.UpstreamHasSynced = func() bool {
99 return nsHandler.HasSynced() && podHandler.HasSynced() && nwpHandler.HasSynced()
100 }
101 informerFactory.Start(ctx.Done())
102
103 go func() {
104 <-ctx.Done()
105 q.ShutDown()
106 informerFactory.Shutdown()
107 }()
108
109 hasSynced := false
110 for {
111 if ctx.Err() != nil {
112 return ctx.Err()
113 }
114 i, shut := q.Get()
115 if shut {
116 return ctx.Err()
117 }
118 switch i.typ {
119 case "pod":
120 pod, _ := podInformer.Lister().Pods(i.name.Namespace).Get(i.name.Name)
121 nft.SetPod(i.name, pod)
122 case "nwp":
123 nwp, _ := nwpInformer.Lister().NetworkPolicies(i.name.Namespace).Get(i.name.Name)
124 nft.SetNetworkPolicy(i.name, nwp)
125 case "ns":
126 ns, _ := nsInformer.Lister().Get(i.name.Name)
127 nft.SetNamespace(i.name.Name, ns)
128 }
129 hasProcessed.Finished(i)
130 if hasSynced {
131 if err := nft.Flush(); err != nil {
132 return fmt.Errorf("failed to flush after update of %s %v: %w", i.typ, i.name, err)
133 }
134 } else if hasProcessed.HasSynced() {
135 if err := nft.Flush(); err != nil {
136 return fmt.Errorf("initial flush failed: %w", err)
137 }
138 l.Info("Initial sync completed")
139 supervisor.Signal(ctx, supervisor.SignalHealthy)
140 hasSynced = true
141 }
142 q.Done(i)
143 }
144}