blob: 61a33fc74a0bdc8e650fb58ec35200ab0d7071bd [file] [log] [blame]
Jan Schära48bd3c2024-07-29 17:22:18 +02001package kubernetes
2
3// Taken and modified from the Kubernetes plugin of CoreDNS, under Apache 2.0.
4
5import (
6 "context"
7 "errors"
8 "sync/atomic"
9 "time"
10
11 api "k8s.io/api/core/v1"
12 discovery "k8s.io/api/discovery/v1"
13 meta "k8s.io/apimachinery/pkg/apis/meta/v1"
14 "k8s.io/apimachinery/pkg/runtime"
15 "k8s.io/apimachinery/pkg/watch"
16 "k8s.io/client-go/kubernetes"
17 "k8s.io/client-go/tools/cache"
18
19 "source.monogon.dev/osbase/net/dns/kubernetes/object"
20)
21
// Names of the secondary indexes registered on the service and endpoint
// caches.
const (
	// svcIPIndex indexes services by each of their cluster IPs.
	svcIPIndex = "ServiceIP"
	// epNameNamespaceIndex indexes endpoints by their Index field.
	epNameNamespaceIndex = "EndpointNameNamespace"
	// epIPIndex indexes endpoints by the IP of each of their addresses.
	epIPIndex = "EndpointsIP"
)
27
28// epLabelSelector selects EndpointSlices that belong to headless services.
29// Endpoint DNS records are only served for headless services,
30// and we can save resources by not fetching other EndpointSlices.
31const epLabelSelector = api.IsHeadlessService
32
33type dnsController interface {
34 GetSvc(string) *object.Service
35 SvcIndexReverse(string) []*object.Service
36 EpIndex(string) []*object.Endpoints
37 EpIndexReverse(string) []*object.Endpoints
38 NamespaceExists(string) bool
39
40 Start(<-chan struct{})
41 HasSynced() bool
42
43 // Modified returns the timestamp of the most recent changes to services.
44 Modified() int64
45}
46
47type dnsControl struct {
48 // modified tracks timestamp of the most recent changes
49 modified atomic.Int64
50 hasSynced atomic.Bool
51
52 svcController cache.Controller
53 epController cache.Controller
54 nsController cache.Controller
55
56 svcLister cache.Indexer
57 epLister cache.Indexer
58 nsLister cache.Store
59}
60
61// newdnsController creates a controller.
62func newdnsController(ctx context.Context, kubeClient kubernetes.Interface) *dnsControl {
63 dns := dnsControl{}
64
65 dns.svcLister, dns.svcController = object.NewIndexerInformer(
66 &cache.ListWatch{
67 ListFunc: serviceListFunc(ctx, kubeClient, api.NamespaceAll),
68 WatchFunc: serviceWatchFunc(ctx, kubeClient, api.NamespaceAll),
69 },
70 &api.Service{},
71 &dns,
72 cache.Indexers{svcIPIndex: svcIPIndexFunc},
73 object.DefaultProcessor(object.ToService),
74 )
75
76 dns.epLister, dns.epController = object.NewIndexerInformer(
77 &cache.ListWatch{
78 ListFunc: endpointSliceListFunc(ctx, kubeClient, api.NamespaceAll),
79 WatchFunc: endpointSliceWatchFunc(ctx, kubeClient, api.NamespaceAll),
80 },
81 &discovery.EndpointSlice{},
82 &dns,
83 cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc},
84 object.DefaultProcessor(object.EndpointSliceToEndpoints),
85 )
86
87 dns.nsLister, dns.nsController = object.NewIndexerInformer(
88 &cache.ListWatch{
89 ListFunc: namespaceListFunc(ctx, kubeClient),
90 WatchFunc: namespaceWatchFunc(ctx, kubeClient),
91 },
92 &api.Namespace{},
93 &dns,
94 cache.Indexers{},
95 object.DefaultProcessor(object.ToNamespace),
96 )
97
98 return &dns
99}
100
101func svcIPIndexFunc(obj interface{}) ([]string, error) {
102 svc, ok := obj.(*object.Service)
103 if !ok {
104 return nil, errObj
105 }
106 idx := make([]string, len(svc.ClusterIPs))
107 copy(idx, svc.ClusterIPs)
108 return idx, nil
109}
110
111func epNameNamespaceIndexFunc(obj interface{}) ([]string, error) {
112 s, ok := obj.(*object.Endpoints)
113 if !ok {
114 return nil, errObj
115 }
116 return []string{s.Index}, nil
117}
118
119func epIPIndexFunc(obj interface{}) ([]string, error) {
120 ep, ok := obj.(*object.Endpoints)
121 if !ok {
122 return nil, errObj
123 }
124 idx := make([]string, len(ep.Addresses))
125 for i, addr := range ep.Addresses {
126 idx[i] = addr.IP
127 }
128 return idx, nil
129}
130
131func serviceListFunc(ctx context.Context, c kubernetes.Interface, ns string) func(meta.ListOptions) (runtime.Object, error) {
132 return func(opts meta.ListOptions) (runtime.Object, error) {
133 return c.CoreV1().Services(ns).List(ctx, opts)
134 }
135}
136
137func endpointSliceListFunc(ctx context.Context, c kubernetes.Interface, ns string) func(meta.ListOptions) (runtime.Object, error) {
138 return func(opts meta.ListOptions) (runtime.Object, error) {
139 opts.LabelSelector = epLabelSelector
140 return c.DiscoveryV1().EndpointSlices(ns).List(ctx, opts)
141 }
142}
143
144func namespaceListFunc(ctx context.Context, c kubernetes.Interface) func(meta.ListOptions) (runtime.Object, error) {
145 return func(opts meta.ListOptions) (runtime.Object, error) {
146 return c.CoreV1().Namespaces().List(ctx, opts)
147 }
148}
149
150func serviceWatchFunc(ctx context.Context, c kubernetes.Interface, ns string) func(options meta.ListOptions) (watch.Interface, error) {
151 return func(options meta.ListOptions) (watch.Interface, error) {
152 return c.CoreV1().Services(ns).Watch(ctx, options)
153 }
154}
155
156func endpointSliceWatchFunc(ctx context.Context, c kubernetes.Interface, ns string) func(options meta.ListOptions) (watch.Interface, error) {
157 return func(options meta.ListOptions) (watch.Interface, error) {
158 options.LabelSelector = epLabelSelector
159 return c.DiscoveryV1().EndpointSlices(ns).Watch(ctx, options)
160 }
161}
162
163func namespaceWatchFunc(ctx context.Context, c kubernetes.Interface) func(options meta.ListOptions) (watch.Interface, error) {
164 return func(options meta.ListOptions) (watch.Interface, error) {
165 return c.CoreV1().Namespaces().Watch(ctx, options)
166 }
167}
168
169// Start starts the controller.
170func (dns *dnsControl) Start(stopCh <-chan struct{}) {
171 go dns.svcController.Run(stopCh)
172 go dns.epController.Run(stopCh)
173 go dns.nsController.Run(stopCh)
174}
175
176// HasSynced returns true if the initial data has been
177// completely loaded into memory.
178func (dns *dnsControl) HasSynced() bool {
179 if dns.hasSynced.Load() {
180 return true
181 }
182 a := dns.svcController.HasSynced()
183 b := dns.epController.HasSynced()
184 c := dns.nsController.HasSynced()
185 if a && b && c {
186 dns.hasSynced.Store(true)
187 return true
188 }
189 return false
190}
191
192func (dns *dnsControl) GetSvc(key string) *object.Service {
193 o, exists, err := dns.svcLister.GetByKey(key)
194 if err != nil || !exists {
195 return nil
196 }
197 s, ok := o.(*object.Service)
198 if ok {
199 return s
200 }
201 return nil
202}
203
204func (dns *dnsControl) SvcIndexReverse(ip string) (svcs []*object.Service) {
205 os, err := dns.svcLister.ByIndex(svcIPIndex, ip)
206 if err != nil {
207 return nil
208 }
209 svcs = make([]*object.Service, 0, len(os))
210 for _, o := range os {
211 s, ok := o.(*object.Service)
212 if ok {
213 svcs = append(svcs, s)
214 }
215 }
216 return svcs
217}
218
219func (dns *dnsControl) EpIndex(idx string) (ep []*object.Endpoints) {
220 os, err := dns.epLister.ByIndex(epNameNamespaceIndex, idx)
221 if err != nil {
222 return nil
223 }
224 ep = make([]*object.Endpoints, 0, len(os))
225 for _, o := range os {
226 e, ok := o.(*object.Endpoints)
227 if ok {
228 ep = append(ep, e)
229 }
230 }
231 return ep
232}
233
234func (dns *dnsControl) EpIndexReverse(ip string) (ep []*object.Endpoints) {
235 os, err := dns.epLister.ByIndex(epIPIndex, ip)
236 if err != nil {
237 return nil
238 }
239 ep = make([]*object.Endpoints, 0, len(os))
240 for _, o := range os {
241 e, ok := o.(*object.Endpoints)
242 if ok {
243 ep = append(ep, e)
244 }
245 }
246 return ep
247}
248
249// NamespaceExists returns true if a namespace with this name exists.
250func (dns *dnsControl) NamespaceExists(name string) bool {
251 _, exists, _ := dns.nsLister.GetByKey(name)
252 return exists
253}
254
255func (dns *dnsControl) OnAdd(obj interface{}, isInInitialList bool) {
256 dns.updateModified()
257 switch obj := obj.(type) {
258 case *object.Endpoints:
259 // Don't record latency during initial sync, because measuring latency only
260 // makes sense for changes that happen while the service is running.
261 if !isInInitialList {
262 recordDNSProgrammingLatency(obj.LastChangeTriggerTime)
263 }
264 }
265}
266
267func (dns *dnsControl) OnDelete(obj interface{}) {
268 dns.updateModified()
269 // Note: We cannot record programming latency on deletes, because the trigger
270 // time annotation is not updated when the object is deleted.
271}
272
273func (dns *dnsControl) OnUpdate(oldObj, newObj interface{}) {
274 // If both objects have the same resource version, they are identical.
275 if oldObj.(meta.Object).GetResourceVersion() == newObj.(meta.Object).GetResourceVersion() {
276 return
277 }
278
279 switch newObj := newObj.(type) {
280 case *object.Service:
281 if object.ServiceModified(oldObj.(*object.Service), newObj) {
282 dns.updateModified()
283 }
284 case *object.Endpoints:
285 oldObj := oldObj.(*object.Endpoints)
286 if object.EndpointsModified(oldObj, newObj) {
287 dns.updateModified()
288 // If the trigger time has not changed, the process that last updated the
289 // object did not update the trigger time, so we can't know the latency.
290 if oldObj.LastChangeTriggerTime != newObj.LastChangeTriggerTime {
291 recordDNSProgrammingLatency(newObj.LastChangeTriggerTime)
292 }
293 }
294 }
295}
296
297func (dns *dnsControl) Modified() int64 {
298 return dns.modified.Load()
299}
300
301// updateModified set dns.modified to the current time.
302func (dns *dnsControl) updateModified() {
303 unix := time.Now().Unix()
304 dns.modified.Store(unix)
305}
306
// errObj is returned by the index functions when they are handed an object
// that is not of the type they index.
var errObj = errors.New("obj was not of the correct type")