|  | // Copyright 2020 The Monogon Project Authors. | 
|  | // | 
|  | // SPDX-License-Identifier: Apache-2.0 | 
|  | // | 
|  | // Licensed under the Apache License, Version 2.0 (the "License"); | 
|  | // you may not use this file except in compliance with the License. | 
|  | // You may obtain a copy of the License at | 
|  | // | 
|  | //     http://www.apache.org/licenses/LICENSE-2.0 | 
|  | // | 
|  | // Unless required by applicable law or agreed to in writing, software | 
|  | // distributed under the License is distributed on an "AS IS" BASIS, | 
|  | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | // See the License for the specific language governing permissions and | 
|  | // limitations under the License. | 
|  |  | 
|  | // Package nfproxy is a Kubernetes Service IP proxy based exclusively on the | 
|  | // Linux nftables interface.  It uses netfilter's NAT capabilities to accept | 
|  | // traffic on service IPs and DNAT it to the respective endpoint. | 
|  | package nfproxy | 
|  |  | 
|  | import ( | 
|  | "context" | 
|  | "errors" | 
|  | "fmt" | 
|  | "net" | 
|  | "os" | 
|  | "time" | 
|  |  | 
|  | "github.com/sbezverk/nfproxy/pkg/controller" | 
|  | "github.com/sbezverk/nfproxy/pkg/nftables" | 
|  | "github.com/sbezverk/nfproxy/pkg/proxy" | 
|  | v1 "k8s.io/api/core/v1" | 
|  | metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | 
|  | "k8s.io/apimachinery/pkg/labels" | 
|  | "k8s.io/apimachinery/pkg/selection" | 
|  | kubeinformers "k8s.io/client-go/informers" | 
|  | "k8s.io/client-go/kubernetes" | 
|  | "k8s.io/client-go/kubernetes/scheme" | 
|  | "k8s.io/client-go/tools/record" | 
|  |  | 
|  | "source.monogon.dev/metropolis/pkg/supervisor" | 
|  | ) | 
|  |  | 
|  | type Service struct { | 
|  | // Traffic in ClusterCIDR is assumed to be originated inside the cluster | 
|  | // and will not be SNATed | 
|  | ClusterCIDR net.IPNet | 
|  | // A Kubernetes ClientSet with read access to endpoints and services | 
|  | ClientSet kubernetes.Interface | 
|  | } | 
|  |  | 
|  | func (s *Service) Run(ctx context.Context) error { | 
|  | var ipv4ClusterCIDR string | 
|  | var ipv6ClusterCIDR string | 
|  | if s.ClusterCIDR.IP.To4() == nil && s.ClusterCIDR.IP.To16() != nil { | 
|  | ipv6ClusterCIDR = s.ClusterCIDR.String() | 
|  | } else if s.ClusterCIDR.IP.To4() != nil { | 
|  | ipv4ClusterCIDR = s.ClusterCIDR.String() | 
|  | } else { | 
|  | return errors.New("invalid ClusterCIDR") | 
|  | } | 
|  | nfti, err := nftables.InitNFTables(ipv4ClusterCIDR, ipv6ClusterCIDR) | 
|  | if err != nil { | 
|  | return fmt.Errorf("failed to initialize nftables with error: %w", err) | 
|  | } | 
|  |  | 
|  | // Create event recorder to report events into K8s | 
|  | hostname, err := os.Hostname() | 
|  | if err != nil { | 
|  | return fmt.Errorf("failed to get local host name with error: %w", err) | 
|  | } | 
|  | eventBroadcaster := record.NewBroadcaster() | 
|  | recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "nfproxy", Host: hostname}) | 
|  |  | 
|  | // Create new proxy controller with endpoint slices enabled | 
|  | // https://kubernetes.io/docs/concepts/services-networking/endpoint-slices/ | 
|  | nfproxy := proxy.NewProxy(nfti, hostname, recorder, true) | 
|  |  | 
|  | // Create special informer which doesn't track headless services | 
|  | noHeadlessEndpoints, err := labels.NewRequirement(v1.IsHeadlessService, selection.DoesNotExist, nil) | 
|  | if err != nil { | 
|  | return fmt.Errorf("failed to create Requirement for noHeadlessEndpoints: %w", err) | 
|  | } | 
|  | labelSelector := labels.NewSelector() | 
|  | labelSelector = labelSelector.Add(*noHeadlessEndpoints) | 
|  |  | 
|  | kubeInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(s.ClientSet, time.Minute*5, | 
|  | kubeinformers.WithTweakListOptions(func(options *metav1.ListOptions) { | 
|  | options.LabelSelector = labelSelector.String() | 
|  | })) | 
|  |  | 
|  | svcController := controller.NewServiceController(nfproxy, s.ClientSet, kubeInformerFactory.Core().V1().Services()) | 
|  | ep := controller.NewEndpointSliceController(nfproxy, s.ClientSet, kubeInformerFactory.Discovery().V1beta1().EndpointSlices()) | 
|  | kubeInformerFactory.Start(ctx.Done()) | 
|  |  | 
|  | if err = svcController.Start(ctx.Done()); err != nil { | 
|  | return fmt.Errorf("error running Service controller: %w", err) | 
|  | } | 
|  | if err = ep.Start(ctx.Done()); err != nil { | 
|  | return fmt.Errorf("error running endpoint controller: %w", err) | 
|  | } | 
|  | supervisor.Signal(ctx, supervisor.SignalHealthy) | 
|  | supervisor.Signal(ctx, supervisor.SignalDone) | 
|  | return nil | 
|  | } |