blob: b1616adc5bef410e054bb89de64c7d0db99d84e3 [file] [log] [blame]
// Copyright The Monogon Project Authors.
// SPDX-License-Identifier: Apache-2.0
Lorenz Brunb682ba52020-07-08 14:51:36 +02003
// Package nfproxy is a Kubernetes Service IP proxy based exclusively on the
// Linux nftables interface. It uses netfilter's NAT capabilities to accept
// traffic on service IPs and DNAT it to the respective endpoint.
Lorenz Brunb682ba52020-07-08 14:51:36 +02007package nfproxy
8
9import (
10 "context"
11 "errors"
12 "fmt"
13 "net"
14 "os"
15 "time"
16
Lorenz Brunb682ba52020-07-08 14:51:36 +020017 "github.com/sbezverk/nfproxy/pkg/controller"
18 "github.com/sbezverk/nfproxy/pkg/nftables"
19 "github.com/sbezverk/nfproxy/pkg/proxy"
20 v1 "k8s.io/api/core/v1"
21 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
22 "k8s.io/apimachinery/pkg/labels"
23 "k8s.io/apimachinery/pkg/selection"
24 kubeinformers "k8s.io/client-go/informers"
25 "k8s.io/client-go/kubernetes"
26 "k8s.io/client-go/kubernetes/scheme"
Serge Bazanski6d6ed312023-03-27 11:04:14 +020027 "k8s.io/client-go/tools/cache"
Lorenz Brunb682ba52020-07-08 14:51:36 +020028 "k8s.io/client-go/tools/record"
Serge Bazanski77cb6c52020-12-19 00:09:22 +010029
Tim Windelschmidt9f21f532024-05-07 15:14:20 +020030 "source.monogon.dev/osbase/supervisor"
Lorenz Brunb682ba52020-07-08 14:51:36 +020031)
32
33type Service struct {
Serge Bazanski216fe7b2021-05-21 18:36:16 +020034 // Traffic in ClusterCIDR is assumed to be originated inside the cluster
35 // and will not be SNATed
Lorenz Brunb682ba52020-07-08 14:51:36 +020036 ClusterCIDR net.IPNet
37 // A Kubernetes ClientSet with read access to endpoints and services
38 ClientSet kubernetes.Interface
39}
40
41func (s *Service) Run(ctx context.Context) error {
42 var ipv4ClusterCIDR string
43 var ipv6ClusterCIDR string
44 if s.ClusterCIDR.IP.To4() == nil && s.ClusterCIDR.IP.To16() != nil {
45 ipv6ClusterCIDR = s.ClusterCIDR.String()
46 } else if s.ClusterCIDR.IP.To4() != nil {
47 ipv4ClusterCIDR = s.ClusterCIDR.String()
48 } else {
49 return errors.New("invalid ClusterCIDR")
50 }
51 nfti, err := nftables.InitNFTables(ipv4ClusterCIDR, ipv6ClusterCIDR)
52 if err != nil {
53 return fmt.Errorf("failed to initialize nftables with error: %w", err)
54 }
55
56 // Create event recorder to report events into K8s
57 hostname, err := os.Hostname()
58 if err != nil {
59 return fmt.Errorf("failed to get local host name with error: %w", err)
60 }
61 eventBroadcaster := record.NewBroadcaster()
62 recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "nfproxy", Host: hostname})
63
64 // Create new proxy controller with endpoint slices enabled
65 // https://kubernetes.io/docs/concepts/services-networking/endpoint-slices/
66 nfproxy := proxy.NewProxy(nfti, hostname, recorder, true)
67
68 // Create special informer which doesn't track headless services
69 noHeadlessEndpoints, err := labels.NewRequirement(v1.IsHeadlessService, selection.DoesNotExist, nil)
70 if err != nil {
71 return fmt.Errorf("failed to create Requirement for noHeadlessEndpoints: %w", err)
72 }
73 labelSelector := labels.NewSelector()
74 labelSelector = labelSelector.Add(*noHeadlessEndpoints)
75
76 kubeInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(s.ClientSet, time.Minute*5,
77 kubeinformers.WithTweakListOptions(func(options *metav1.ListOptions) {
78 options.LabelSelector = labelSelector.String()
79 }))
80
Lorenz Brune6e570a2023-11-28 19:23:19 +010081 endpointSlicesInformer := kubeInformerFactory.Discovery().V1().EndpointSlices()
Serge Bazanski6d6ed312023-03-27 11:04:14 +020082 endpointSlicesInformer.Informer().SetWatchErrorHandler(func(_ *cache.Reflector, err error) {
83 supervisor.Logger(ctx).Errorf("endpoint slices watch error: %v", err)
84 })
85 servicesInformer := kubeInformerFactory.Core().V1().Services()
86 servicesInformer.Informer().SetWatchErrorHandler(func(_ *cache.Reflector, err error) {
87 supervisor.Logger(ctx).Errorf("service informer watch error: %v", err)
88 })
89
90 svcController := controller.NewServiceController(nfproxy, s.ClientSet, servicesInformer)
91 ep := controller.NewEndpointSliceController(nfproxy, s.ClientSet, endpointSlicesInformer)
Lorenz Brunb682ba52020-07-08 14:51:36 +020092 kubeInformerFactory.Start(ctx.Done())
93
94 if err = svcController.Start(ctx.Done()); err != nil {
95 return fmt.Errorf("error running Service controller: %w", err)
96 }
97 if err = ep.Start(ctx.Done()); err != nil {
98 return fmt.Errorf("error running endpoint controller: %w", err)
99 }
100 supervisor.Signal(ctx, supervisor.SignalHealthy)
101 supervisor.Signal(ctx, supervisor.SignalDone)
102 return nil
103}