Add service proxy

This adds a service proxy based on nfproxy and changes to the service IP allocation to make it work.
Also adds support for masquerading outbound traffic for outbound network connectivity.

Test Plan:
Currently manually tested by creating an alpine pod and running 'apk add curl && curl -k https://192.168.188.1:443/'.
Will be covered later by CTS.

Bug: T810

X-Origin-Diff: phab/D580
GitOrigin-RevId: cace863fd8c2f045560f8abf84c40cc77bc275d4
diff --git a/core/internal/kubernetes/BUILD.bazel b/core/internal/kubernetes/BUILD.bazel
index 3bcbe6a..6b5d652 100644
--- a/core/internal/kubernetes/BUILD.bazel
+++ b/core/internal/kubernetes/BUILD.bazel
@@ -17,6 +17,7 @@
         "//core/internal/common:go_default_library",
         "//core/internal/common/supervisor:go_default_library",
         "//core/internal/kubernetes/clusternet:go_default_library",
+        "//core/internal/kubernetes/nfproxy:go_default_library",
         "//core/internal/kubernetes/pki:go_default_library",
         "//core/internal/kubernetes/reconciler:go_default_library",
         "//core/internal/localstorage:go_default_library",
diff --git a/core/internal/kubernetes/clusternet/clusternet.go b/core/internal/kubernetes/clusternet/clusternet.go
index 5c42bb8..e41ba8a 100644
--- a/core/internal/kubernetes/clusternet/clusternet.go
+++ b/core/internal/kubernetes/clusternet/clusternet.go
@@ -108,6 +108,7 @@
 		}
 		allowedIPs = append(allowedIPs, *podNet)
 	}
+	allowedIPs = append(allowedIPs, net.IPNet{IP: internalIP, Mask: net.CIDRMask(32, 32)})
 	s.logger.Debug("Adding/Updating WireGuard peer node", zap.String("node", newNode.Name),
 		zap.String("endpointIP", internalIP.String()), zap.Any("allowedIPs", allowedIPs))
 	// WireGuard's kernel side has create/update semantics on peers by default. So we can just add the peer multiple
diff --git a/core/internal/kubernetes/nfproxy/BUILD.bazel b/core/internal/kubernetes/nfproxy/BUILD.bazel
new file mode 100644
index 0000000..4bc7ab7
--- /dev/null
+++ b/core/internal/kubernetes/nfproxy/BUILD.bazel
@@ -0,0 +1,22 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+    name = "go_default_library",
+    srcs = ["nfproxy.go"],
+    importpath = "git.monogon.dev/source/nexantic.git/core/internal/kubernetes/nfproxy",
+    visibility = ["//core:__subpackages__"],
+    deps = [
+        "//core/internal/common/supervisor:go_default_library",
+        "@com_github_sbezverk_nfproxy//pkg/controller:go_default_library",
+        "@com_github_sbezverk_nfproxy//pkg/nftables:go_default_library",
+        "@com_github_sbezverk_nfproxy//pkg/proxy:go_default_library",
+        "@io_k8s_api//core/v1:go_default_library",
+        "@io_k8s_apimachinery//pkg/apis/meta/v1:go_default_library",
+        "@io_k8s_apimachinery//pkg/labels:go_default_library",
+        "@io_k8s_apimachinery//pkg/selection:go_default_library",
+        "@io_k8s_client_go//informers:go_default_library",
+        "@io_k8s_client_go//kubernetes:go_default_library",
+        "@io_k8s_client_go//kubernetes/scheme:go_default_library",
+        "@io_k8s_client_go//tools/record:go_default_library",
+    ],
+)
diff --git a/core/internal/kubernetes/nfproxy/nfproxy.go b/core/internal/kubernetes/nfproxy/nfproxy.go
new file mode 100644
index 0000000..25962bf
--- /dev/null
+++ b/core/internal/kubernetes/nfproxy/nfproxy.go
@@ -0,0 +1,104 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package nfproxy is a Kubernetes Service IP proxy based exclusively on the Linux nftables interface.
+// It uses netfilter's NAT capabilities to accept traffic on service IPs and DNAT it to the respective endpoint.
+package nfproxy
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"net"
+	"os"
+	"time"
+
+	"git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
+
+	"github.com/sbezverk/nfproxy/pkg/controller"
+	"github.com/sbezverk/nfproxy/pkg/nftables"
+	"github.com/sbezverk/nfproxy/pkg/proxy"
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/selection"
+	kubeinformers "k8s.io/client-go/informers"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/kubernetes/scheme"
+	"k8s.io/client-go/tools/record"
+)
+
+type Service struct {
+	// Traffic in ClusterCIDR is assumed to be originated inside the cluster and will not be SNATed
+	ClusterCIDR net.IPNet
+	// A Kubernetes ClientSet with read access to endpoints and services
+	ClientSet kubernetes.Interface
+}
+
+func (s *Service) Run(ctx context.Context) error {
+	var ipv4ClusterCIDR string
+	var ipv6ClusterCIDR string
+	if s.ClusterCIDR.IP.To4() == nil && s.ClusterCIDR.IP.To16() != nil {
+		ipv6ClusterCIDR = s.ClusterCIDR.String()
+	} else if s.ClusterCIDR.IP.To4() != nil {
+		ipv4ClusterCIDR = s.ClusterCIDR.String()
+	} else {
+		return errors.New("invalid ClusterCIDR")
+	}
+	nfti, err := nftables.InitNFTables(ipv4ClusterCIDR, ipv6ClusterCIDR)
+	if err != nil {
+		return fmt.Errorf("failed to initialize nftables with error: %w", err)
+	}
+
+	// Create event recorder to report events into K8s
+	hostname, err := os.Hostname()
+	if err != nil {
+		return fmt.Errorf("failed to get local host name with error: %w", err)
+	}
+	eventBroadcaster := record.NewBroadcaster()
+	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "nfproxy", Host: hostname})
+
+	// Create new proxy controller with endpoint slices enabled
+	// https://kubernetes.io/docs/concepts/services-networking/endpoint-slices/
+	nfproxy := proxy.NewProxy(nfti, hostname, recorder, true)
+
+	// Create special informer which doesn't track headless services
+	noHeadlessEndpoints, err := labels.NewRequirement(v1.IsHeadlessService, selection.DoesNotExist, nil)
+	if err != nil {
+		return fmt.Errorf("failed to create Requirement for noHeadlessEndpoints: %w", err)
+	}
+	labelSelector := labels.NewSelector()
+	labelSelector = labelSelector.Add(*noHeadlessEndpoints)
+
+	kubeInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(s.ClientSet, time.Minute*5,
+		kubeinformers.WithTweakListOptions(func(options *metav1.ListOptions) {
+			options.LabelSelector = labelSelector.String()
+		}))
+
+	svcController := controller.NewServiceController(nfproxy, s.ClientSet, kubeInformerFactory.Core().V1().Services())
+	ep := controller.NewEndpointSliceController(nfproxy, s.ClientSet, kubeInformerFactory.Discovery().V1beta1().EndpointSlices())
+	kubeInformerFactory.Start(ctx.Done())
+
+	if err = svcController.Start(ctx.Done()); err != nil {
+		return fmt.Errorf("error running Service controller: %w", err)
+	}
+	if err = ep.Start(ctx.Done()); err != nil {
+		return fmt.Errorf("error running endpoint controller: %w", err)
+	}
+	supervisor.Signal(ctx, supervisor.SignalHealthy)
+	supervisor.Signal(ctx, supervisor.SignalDone)
+	return nil
+}
diff --git a/core/internal/kubernetes/pki/kubernetes.go b/core/internal/kubernetes/pki/kubernetes.go
index 48ce6e9..0de8f6d 100644
--- a/core/internal/kubernetes/pki/kubernetes.go
+++ b/core/internal/kubernetes/pki/kubernetes.go
@@ -103,7 +103,7 @@
 			"kubernetes.default.svc.cluster.local",
 			"localhost",
 		},
-		[]net.IP{{127, 0, 0, 1}}, // TODO(q3k): add service network internal apiserver address
+		[]net.IP{{10, 0, 255, 1}, {127, 0, 0, 1}}, // TODO(q3k): add service network internal apiserver address
 	))
 	make(IdCA, KubeletClient, Client("smalltown:apiserver-kubelet-client", nil))
 	make(IdCA, ControllerManagerClient, Client("system:kube-controller-manager", nil))
diff --git a/core/internal/kubernetes/service.go b/core/internal/kubernetes/service.go
index 2396066..a22b6b9 100644
--- a/core/internal/kubernetes/service.go
+++ b/core/internal/kubernetes/service.go
@@ -33,6 +33,7 @@
 
 	"git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
 	"git.monogon.dev/source/nexantic.git/core/internal/kubernetes/clusternet"
+	"git.monogon.dev/source/nexantic.git/core/internal/kubernetes/nfproxy"
 	"git.monogon.dev/source/nexantic.git/core/internal/kubernetes/pki"
 	"git.monogon.dev/source/nexantic.git/core/internal/kubernetes/reconciler"
 	"git.monogon.dev/source/nexantic.git/core/internal/localstorage"
@@ -156,6 +157,11 @@
 		DataDirectory:   &s.c.Root.Data.Kubernetes.ClusterNetworking,
 	}
 
+	nfproxy := nfproxy.Service{
+		ClusterCIDR: s.c.ClusterNet,
+		ClientSet:   clientSet,
+	}
+
 	for _, sub := range []struct {
 		name     string
 		runnable supervisor.Runnable
@@ -168,6 +174,7 @@
 		{"csi-plugin", csiPlugin.Run},
 		{"csi-provisioner", csiProvisioner.Run},
 		{"clusternet", clusternet.Run},
+		{"nfproxy", nfproxy.Run},
 	} {
 		err := supervisor.Run(ctx, sub.name, sub.runnable)
 		if err != nil {