blob: dd19f243134e9215c4f7fd8115a878c5de274bf0 [file] [log] [blame]
Tim Windelschmidt6d33a432025-02-04 14:34:25 +01001// Copyright The Monogon Project Authors.
2// SPDX-License-Identifier: Apache-2.0
3
Jan Schära48bd3c2024-07-29 17:22:18 +02004package kubernetes
5
6// Taken and modified from the Kubernetes plugin of CoreDNS, under Apache 2.0.
7
8import (
9 "context"
10 "errors"
11 "sync/atomic"
12 "time"
13
14 api "k8s.io/api/core/v1"
15 discovery "k8s.io/api/discovery/v1"
16 meta "k8s.io/apimachinery/pkg/apis/meta/v1"
17 "k8s.io/apimachinery/pkg/runtime"
18 "k8s.io/apimachinery/pkg/watch"
19 "k8s.io/client-go/kubernetes"
20 "k8s.io/client-go/tools/cache"
21
22 "source.monogon.dev/osbase/net/dns/kubernetes/object"
23)
24
// Names of the cache indexes registered by newdnsController.
const (
	// svcIPIndex indexes Services by each of their cluster IPs.
	svcIPIndex = "ServiceIP"
	// epNameNamespaceIndex indexes Endpoints by their Index field.
	epNameNamespaceIndex = "EndpointNameNamespace"
	// epIPIndex indexes Endpoints by the IP of each of their addresses.
	epIPIndex = "EndpointsIP"
)
30
31// epLabelSelector selects EndpointSlices that belong to headless services.
32// Endpoint DNS records are only served for headless services,
33// and we can save resources by not fetching other EndpointSlices.
34const epLabelSelector = api.IsHeadlessService
35
36type dnsController interface {
37 GetSvc(string) *object.Service
38 SvcIndexReverse(string) []*object.Service
39 EpIndex(string) []*object.Endpoints
40 EpIndexReverse(string) []*object.Endpoints
41 NamespaceExists(string) bool
42
43 Start(<-chan struct{})
44 HasSynced() bool
45
46 // Modified returns the timestamp of the most recent changes to services.
47 Modified() int64
48}
49
50type dnsControl struct {
51 // modified tracks timestamp of the most recent changes
52 modified atomic.Int64
53 hasSynced atomic.Bool
54
55 svcController cache.Controller
56 epController cache.Controller
57 nsController cache.Controller
58
59 svcLister cache.Indexer
60 epLister cache.Indexer
61 nsLister cache.Store
62}
63
64// newdnsController creates a controller.
65func newdnsController(ctx context.Context, kubeClient kubernetes.Interface) *dnsControl {
66 dns := dnsControl{}
67
68 dns.svcLister, dns.svcController = object.NewIndexerInformer(
69 &cache.ListWatch{
70 ListFunc: serviceListFunc(ctx, kubeClient, api.NamespaceAll),
71 WatchFunc: serviceWatchFunc(ctx, kubeClient, api.NamespaceAll),
72 },
73 &api.Service{},
74 &dns,
75 cache.Indexers{svcIPIndex: svcIPIndexFunc},
76 object.DefaultProcessor(object.ToService),
77 )
78
79 dns.epLister, dns.epController = object.NewIndexerInformer(
80 &cache.ListWatch{
81 ListFunc: endpointSliceListFunc(ctx, kubeClient, api.NamespaceAll),
82 WatchFunc: endpointSliceWatchFunc(ctx, kubeClient, api.NamespaceAll),
83 },
84 &discovery.EndpointSlice{},
85 &dns,
86 cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc},
87 object.DefaultProcessor(object.EndpointSliceToEndpoints),
88 )
89
90 dns.nsLister, dns.nsController = object.NewIndexerInformer(
91 &cache.ListWatch{
92 ListFunc: namespaceListFunc(ctx, kubeClient),
93 WatchFunc: namespaceWatchFunc(ctx, kubeClient),
94 },
95 &api.Namespace{},
96 &dns,
97 cache.Indexers{},
98 object.DefaultProcessor(object.ToNamespace),
99 )
100
101 return &dns
102}
103
104func svcIPIndexFunc(obj interface{}) ([]string, error) {
105 svc, ok := obj.(*object.Service)
106 if !ok {
107 return nil, errObj
108 }
109 idx := make([]string, len(svc.ClusterIPs))
110 copy(idx, svc.ClusterIPs)
111 return idx, nil
112}
113
114func epNameNamespaceIndexFunc(obj interface{}) ([]string, error) {
115 s, ok := obj.(*object.Endpoints)
116 if !ok {
117 return nil, errObj
118 }
119 return []string{s.Index}, nil
120}
121
122func epIPIndexFunc(obj interface{}) ([]string, error) {
123 ep, ok := obj.(*object.Endpoints)
124 if !ok {
125 return nil, errObj
126 }
127 idx := make([]string, len(ep.Addresses))
128 for i, addr := range ep.Addresses {
129 idx[i] = addr.IP
130 }
131 return idx, nil
132}
133
134func serviceListFunc(ctx context.Context, c kubernetes.Interface, ns string) func(meta.ListOptions) (runtime.Object, error) {
135 return func(opts meta.ListOptions) (runtime.Object, error) {
136 return c.CoreV1().Services(ns).List(ctx, opts)
137 }
138}
139
140func endpointSliceListFunc(ctx context.Context, c kubernetes.Interface, ns string) func(meta.ListOptions) (runtime.Object, error) {
141 return func(opts meta.ListOptions) (runtime.Object, error) {
142 opts.LabelSelector = epLabelSelector
143 return c.DiscoveryV1().EndpointSlices(ns).List(ctx, opts)
144 }
145}
146
147func namespaceListFunc(ctx context.Context, c kubernetes.Interface) func(meta.ListOptions) (runtime.Object, error) {
148 return func(opts meta.ListOptions) (runtime.Object, error) {
149 return c.CoreV1().Namespaces().List(ctx, opts)
150 }
151}
152
153func serviceWatchFunc(ctx context.Context, c kubernetes.Interface, ns string) func(options meta.ListOptions) (watch.Interface, error) {
154 return func(options meta.ListOptions) (watch.Interface, error) {
155 return c.CoreV1().Services(ns).Watch(ctx, options)
156 }
157}
158
159func endpointSliceWatchFunc(ctx context.Context, c kubernetes.Interface, ns string) func(options meta.ListOptions) (watch.Interface, error) {
160 return func(options meta.ListOptions) (watch.Interface, error) {
161 options.LabelSelector = epLabelSelector
162 return c.DiscoveryV1().EndpointSlices(ns).Watch(ctx, options)
163 }
164}
165
166func namespaceWatchFunc(ctx context.Context, c kubernetes.Interface) func(options meta.ListOptions) (watch.Interface, error) {
167 return func(options meta.ListOptions) (watch.Interface, error) {
168 return c.CoreV1().Namespaces().Watch(ctx, options)
169 }
170}
171
172// Start starts the controller.
173func (dns *dnsControl) Start(stopCh <-chan struct{}) {
174 go dns.svcController.Run(stopCh)
175 go dns.epController.Run(stopCh)
176 go dns.nsController.Run(stopCh)
177}
178
179// HasSynced returns true if the initial data has been
180// completely loaded into memory.
181func (dns *dnsControl) HasSynced() bool {
182 if dns.hasSynced.Load() {
183 return true
184 }
185 a := dns.svcController.HasSynced()
186 b := dns.epController.HasSynced()
187 c := dns.nsController.HasSynced()
188 if a && b && c {
189 dns.hasSynced.Store(true)
190 return true
191 }
192 return false
193}
194
195func (dns *dnsControl) GetSvc(key string) *object.Service {
196 o, exists, err := dns.svcLister.GetByKey(key)
197 if err != nil || !exists {
198 return nil
199 }
200 s, ok := o.(*object.Service)
201 if ok {
202 return s
203 }
204 return nil
205}
206
207func (dns *dnsControl) SvcIndexReverse(ip string) (svcs []*object.Service) {
208 os, err := dns.svcLister.ByIndex(svcIPIndex, ip)
209 if err != nil {
210 return nil
211 }
212 svcs = make([]*object.Service, 0, len(os))
213 for _, o := range os {
214 s, ok := o.(*object.Service)
215 if ok {
216 svcs = append(svcs, s)
217 }
218 }
219 return svcs
220}
221
222func (dns *dnsControl) EpIndex(idx string) (ep []*object.Endpoints) {
223 os, err := dns.epLister.ByIndex(epNameNamespaceIndex, idx)
224 if err != nil {
225 return nil
226 }
227 ep = make([]*object.Endpoints, 0, len(os))
228 for _, o := range os {
229 e, ok := o.(*object.Endpoints)
230 if ok {
231 ep = append(ep, e)
232 }
233 }
234 return ep
235}
236
237func (dns *dnsControl) EpIndexReverse(ip string) (ep []*object.Endpoints) {
238 os, err := dns.epLister.ByIndex(epIPIndex, ip)
239 if err != nil {
240 return nil
241 }
242 ep = make([]*object.Endpoints, 0, len(os))
243 for _, o := range os {
244 e, ok := o.(*object.Endpoints)
245 if ok {
246 ep = append(ep, e)
247 }
248 }
249 return ep
250}
251
252// NamespaceExists returns true if a namespace with this name exists.
253func (dns *dnsControl) NamespaceExists(name string) bool {
254 _, exists, _ := dns.nsLister.GetByKey(name)
255 return exists
256}
257
258func (dns *dnsControl) OnAdd(obj interface{}, isInInitialList bool) {
259 dns.updateModified()
260 switch obj := obj.(type) {
261 case *object.Endpoints:
262 // Don't record latency during initial sync, because measuring latency only
263 // makes sense for changes that happen while the service is running.
264 if !isInInitialList {
265 recordDNSProgrammingLatency(obj.LastChangeTriggerTime)
266 }
267 }
268}
269
270func (dns *dnsControl) OnDelete(obj interface{}) {
271 dns.updateModified()
272 // Note: We cannot record programming latency on deletes, because the trigger
273 // time annotation is not updated when the object is deleted.
274}
275
276func (dns *dnsControl) OnUpdate(oldObj, newObj interface{}) {
277 // If both objects have the same resource version, they are identical.
278 if oldObj.(meta.Object).GetResourceVersion() == newObj.(meta.Object).GetResourceVersion() {
279 return
280 }
281
282 switch newObj := newObj.(type) {
283 case *object.Service:
284 if object.ServiceModified(oldObj.(*object.Service), newObj) {
285 dns.updateModified()
286 }
287 case *object.Endpoints:
288 oldObj := oldObj.(*object.Endpoints)
289 if object.EndpointsModified(oldObj, newObj) {
290 dns.updateModified()
291 // If the trigger time has not changed, the process that last updated the
292 // object did not update the trigger time, so we can't know the latency.
293 if oldObj.LastChangeTriggerTime != newObj.LastChangeTriggerTime {
294 recordDNSProgrammingLatency(newObj.LastChangeTriggerTime)
295 }
296 }
297 }
298}
299
300func (dns *dnsControl) Modified() int64 {
301 return dns.modified.Load()
302}
303
304// updateModified set dns.modified to the current time.
305func (dns *dnsControl) updateModified() {
306 unix := time.Now().Unix()
307 dns.modified.Store(unix)
308}
309
// errObj is returned by the index functions in this file when they are handed
// an object of a type they do not index.
var errObj error = errors.New("obj was not of the correct type")