osbase/net/dns/kubernetes: add Kubernetes DNS handler

This adds a DNS server handler for Kubernetes DNS service discovery. It
is partially based on the CoreDNS Kubernetes plugin. The query handler
however is written completely from scratch. The handler in the CoreDNS
plugin is very weird; it first handles each query type separately, and
generates msg.Service objects which then need to be converted to dns
records. The new implementation is much simpler, and also more correct:
It handles ANY queries, and follows the rules for NXDOMAIN (If a name is
NXDOMAIN for one qtype, it is NXDOMAIN for all qtypes, and subdomains of
the name are also NXDOMAIN.)

Change-Id: Id1d498ca5384a3b047587ed73e95e4871d82d499
Reviewed-on: https://review.monogon.dev/c/monogon/+/3259
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
Tested-by: Jenkins CI
diff --git a/osbase/net/dns/kubernetes/controller.go b/osbase/net/dns/kubernetes/controller.go
new file mode 100644
index 0000000..61a33fc
--- /dev/null
+++ b/osbase/net/dns/kubernetes/controller.go
@@ -0,0 +1,307 @@
+package kubernetes
+
+// Taken and modified from the Kubernetes plugin of CoreDNS, under Apache 2.0.
+
+import (
+	"context"
+	"errors"
+	"sync/atomic"
+	"time"
+
+	api "k8s.io/api/core/v1"
+	discovery "k8s.io/api/discovery/v1"
+	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/cache"
+
+	"source.monogon.dev/osbase/net/dns/kubernetes/object"
+)
+
+const (
+	svcIPIndex           = "ServiceIP"
+	epNameNamespaceIndex = "EndpointNameNamespace"
+	epIPIndex            = "EndpointsIP"
+)
+
+// epLabelSelector selects EndpointSlices that belong to headless services.
+// Endpoint DNS records are only served for headless services,
+// and we can save resources by not fetching other EndpointSlices.
+const epLabelSelector = api.IsHeadlessService
+
+type dnsController interface {
+	GetSvc(string) *object.Service
+	SvcIndexReverse(string) []*object.Service
+	EpIndex(string) []*object.Endpoints
+	EpIndexReverse(string) []*object.Endpoints
+	NamespaceExists(string) bool
+
+	Start(<-chan struct{})
+	HasSynced() bool
+
+	// Modified returns the timestamp of the most recent changes to services.
+	Modified() int64
+}
+
+type dnsControl struct {
+	// modified tracks timestamp of the most recent changes
+	modified  atomic.Int64
+	hasSynced atomic.Bool
+
+	svcController cache.Controller
+	epController  cache.Controller
+	nsController  cache.Controller
+
+	svcLister cache.Indexer
+	epLister  cache.Indexer
+	nsLister  cache.Store
+}
+
+// newdnsController creates a controller.
+func newdnsController(ctx context.Context, kubeClient kubernetes.Interface) *dnsControl {
+	dns := dnsControl{}
+
+	dns.svcLister, dns.svcController = object.NewIndexerInformer(
+		&cache.ListWatch{
+			ListFunc:  serviceListFunc(ctx, kubeClient, api.NamespaceAll),
+			WatchFunc: serviceWatchFunc(ctx, kubeClient, api.NamespaceAll),
+		},
+		&api.Service{},
+		&dns,
+		cache.Indexers{svcIPIndex: svcIPIndexFunc},
+		object.DefaultProcessor(object.ToService),
+	)
+
+	dns.epLister, dns.epController = object.NewIndexerInformer(
+		&cache.ListWatch{
+			ListFunc:  endpointSliceListFunc(ctx, kubeClient, api.NamespaceAll),
+			WatchFunc: endpointSliceWatchFunc(ctx, kubeClient, api.NamespaceAll),
+		},
+		&discovery.EndpointSlice{},
+		&dns,
+		cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc},
+		object.DefaultProcessor(object.EndpointSliceToEndpoints),
+	)
+
+	dns.nsLister, dns.nsController = object.NewIndexerInformer(
+		&cache.ListWatch{
+			ListFunc:  namespaceListFunc(ctx, kubeClient),
+			WatchFunc: namespaceWatchFunc(ctx, kubeClient),
+		},
+		&api.Namespace{},
+		&dns,
+		cache.Indexers{},
+		object.DefaultProcessor(object.ToNamespace),
+	)
+
+	return &dns
+}
+
+func svcIPIndexFunc(obj interface{}) ([]string, error) {
+	svc, ok := obj.(*object.Service)
+	if !ok {
+		return nil, errObj
+	}
+	idx := make([]string, len(svc.ClusterIPs))
+	copy(idx, svc.ClusterIPs)
+	return idx, nil
+}
+
+func epNameNamespaceIndexFunc(obj interface{}) ([]string, error) {
+	s, ok := obj.(*object.Endpoints)
+	if !ok {
+		return nil, errObj
+	}
+	return []string{s.Index}, nil
+}
+
+func epIPIndexFunc(obj interface{}) ([]string, error) {
+	ep, ok := obj.(*object.Endpoints)
+	if !ok {
+		return nil, errObj
+	}
+	idx := make([]string, len(ep.Addresses))
+	for i, addr := range ep.Addresses {
+		idx[i] = addr.IP
+	}
+	return idx, nil
+}
+
+func serviceListFunc(ctx context.Context, c kubernetes.Interface, ns string) func(meta.ListOptions) (runtime.Object, error) {
+	return func(opts meta.ListOptions) (runtime.Object, error) {
+		return c.CoreV1().Services(ns).List(ctx, opts)
+	}
+}
+
+func endpointSliceListFunc(ctx context.Context, c kubernetes.Interface, ns string) func(meta.ListOptions) (runtime.Object, error) {
+	return func(opts meta.ListOptions) (runtime.Object, error) {
+		opts.LabelSelector = epLabelSelector
+		return c.DiscoveryV1().EndpointSlices(ns).List(ctx, opts)
+	}
+}
+
+func namespaceListFunc(ctx context.Context, c kubernetes.Interface) func(meta.ListOptions) (runtime.Object, error) {
+	return func(opts meta.ListOptions) (runtime.Object, error) {
+		return c.CoreV1().Namespaces().List(ctx, opts)
+	}
+}
+
+func serviceWatchFunc(ctx context.Context, c kubernetes.Interface, ns string) func(options meta.ListOptions) (watch.Interface, error) {
+	return func(options meta.ListOptions) (watch.Interface, error) {
+		return c.CoreV1().Services(ns).Watch(ctx, options)
+	}
+}
+
+func endpointSliceWatchFunc(ctx context.Context, c kubernetes.Interface, ns string) func(options meta.ListOptions) (watch.Interface, error) {
+	return func(options meta.ListOptions) (watch.Interface, error) {
+		options.LabelSelector = epLabelSelector
+		return c.DiscoveryV1().EndpointSlices(ns).Watch(ctx, options)
+	}
+}
+
+func namespaceWatchFunc(ctx context.Context, c kubernetes.Interface) func(options meta.ListOptions) (watch.Interface, error) {
+	return func(options meta.ListOptions) (watch.Interface, error) {
+		return c.CoreV1().Namespaces().Watch(ctx, options)
+	}
+}
+
+// Start starts the controller.
+func (dns *dnsControl) Start(stopCh <-chan struct{}) {
+	go dns.svcController.Run(stopCh)
+	go dns.epController.Run(stopCh)
+	go dns.nsController.Run(stopCh)
+}
+
+// HasSynced returns true if the initial data has been
+// completely loaded into memory.
+func (dns *dnsControl) HasSynced() bool {
+	if dns.hasSynced.Load() {
+		return true
+	}
+	a := dns.svcController.HasSynced()
+	b := dns.epController.HasSynced()
+	c := dns.nsController.HasSynced()
+	if a && b && c {
+		dns.hasSynced.Store(true)
+		return true
+	}
+	return false
+}
+
+func (dns *dnsControl) GetSvc(key string) *object.Service {
+	o, exists, err := dns.svcLister.GetByKey(key)
+	if err != nil || !exists {
+		return nil
+	}
+	s, ok := o.(*object.Service)
+	if ok {
+		return s
+	}
+	return nil
+}
+
+func (dns *dnsControl) SvcIndexReverse(ip string) (svcs []*object.Service) {
+	os, err := dns.svcLister.ByIndex(svcIPIndex, ip)
+	if err != nil {
+		return nil
+	}
+	svcs = make([]*object.Service, 0, len(os))
+	for _, o := range os {
+		s, ok := o.(*object.Service)
+		if ok {
+			svcs = append(svcs, s)
+		}
+	}
+	return svcs
+}
+
+func (dns *dnsControl) EpIndex(idx string) (ep []*object.Endpoints) {
+	os, err := dns.epLister.ByIndex(epNameNamespaceIndex, idx)
+	if err != nil {
+		return nil
+	}
+	ep = make([]*object.Endpoints, 0, len(os))
+	for _, o := range os {
+		e, ok := o.(*object.Endpoints)
+		if ok {
+			ep = append(ep, e)
+		}
+	}
+	return ep
+}
+
+func (dns *dnsControl) EpIndexReverse(ip string) (ep []*object.Endpoints) {
+	os, err := dns.epLister.ByIndex(epIPIndex, ip)
+	if err != nil {
+		return nil
+	}
+	ep = make([]*object.Endpoints, 0, len(os))
+	for _, o := range os {
+		e, ok := o.(*object.Endpoints)
+		if ok {
+			ep = append(ep, e)
+		}
+	}
+	return ep
+}
+
+// NamespaceExists returns true if a namespace with this name exists.
+func (dns *dnsControl) NamespaceExists(name string) bool {
+	_, exists, _ := dns.nsLister.GetByKey(name)
+	return exists
+}
+
+func (dns *dnsControl) OnAdd(obj interface{}, isInInitialList bool) {
+	dns.updateModified()
+	switch obj := obj.(type) {
+	case *object.Endpoints:
+		// Don't record latency during initial sync, because measuring latency only
+		// makes sense for changes that happen while the service is running.
+		if !isInInitialList {
+			recordDNSProgrammingLatency(obj.LastChangeTriggerTime)
+		}
+	}
+}
+
+func (dns *dnsControl) OnDelete(obj interface{}) {
+	dns.updateModified()
+	// Note: We cannot record programming latency on deletes, because the trigger
+	// time annotation is not updated when the object is deleted.
+}
+
+func (dns *dnsControl) OnUpdate(oldObj, newObj interface{}) {
+	// If both objects have the same resource version, they are identical.
+	if oldObj.(meta.Object).GetResourceVersion() == newObj.(meta.Object).GetResourceVersion() {
+		return
+	}
+
+	switch newObj := newObj.(type) {
+	case *object.Service:
+		if object.ServiceModified(oldObj.(*object.Service), newObj) {
+			dns.updateModified()
+		}
+	case *object.Endpoints:
+		oldObj := oldObj.(*object.Endpoints)
+		if object.EndpointsModified(oldObj, newObj) {
+			dns.updateModified()
+			// If the trigger time has not changed, the process that last updated the
+			// object did not update the trigger time, so we can't know the latency.
+			if oldObj.LastChangeTriggerTime != newObj.LastChangeTriggerTime {
+				recordDNSProgrammingLatency(newObj.LastChangeTriggerTime)
+			}
+		}
+	}
+}
+
+func (dns *dnsControl) Modified() int64 {
+	return dns.modified.Load()
+}
+
+// updateModified set dns.modified to the current time.
+func (dns *dnsControl) updateModified() {
+	unix := time.Now().Unix()
+	dns.modified.Store(unix)
+}
+
+var errObj = errors.New("obj was not of the correct type")