Add Wireguard-based K8s pod networking
This adds a pod networking runnable based on WireGuard which watches all nodes
and adds their K8s IPAM allocations as routes into the kernel & WireGuard. It only depends
on K8s and only performs direct routing.
Test Plan: Manually tested by spinning up a two-node cluster and running two Alpine pods pinging each other. Can be covered by E2E tests once we can do image preseeding for the test infra (T793).
Bug: T487
X-Origin-Diff: phab/D573
GitOrigin-RevId: ba3fc36f421fd75002f6cf8bea25ed6f1eb457b0
diff --git a/core/internal/common/setup.go b/core/internal/common/setup.go
index fa5cd59..6510774 100644
--- a/core/internal/common/setup.go
+++ b/core/internal/common/setup.go
@@ -34,6 +34,7 @@
MasterServicePort = 7833
ExternalServicePort = 7836
DebugServicePort = 7837
+ WireGuardPort = 7838
KubernetesAPIPort = 6443
DebuggerPort = 2345
)
diff --git a/core/internal/containerd/BUILD.bazel b/core/internal/containerd/BUILD.bazel
index 56c2822..a1deae0 100644
--- a/core/internal/containerd/BUILD.bazel
+++ b/core/internal/containerd/BUILD.bazel
@@ -15,6 +15,5 @@
exports_files([
"config.toml",
"runsc.toml",
- "loopback.json",
- "ptp.json",
+ "cnispec.gojson",
])
diff --git a/core/internal/containerd/cnispec.gojson b/core/internal/containerd/cnispec.gojson
new file mode 100644
index 0000000..0057036
--- /dev/null
+++ b/core/internal/containerd/cnispec.gojson
@@ -0,0 +1,29 @@
+{{- /*gotype: github.com/containerd/cri/pkg/server.cniConfigTemplate*/ -}}
+{
+ "name": "k8s-pod-network",
+ "cniVersion": "0.3.1",
+ "plugins": [
+ {
+ "type": "ptp",
+ "mtu": 1420,
+ "ipam": {
+ "type": "host-local",
+ "dataDir": "/containerd/run/ipam",
+ "ranges": [
+ {{range $i, $range := .PodCIDRRanges}}{{if $i}},
+ {{end}}[
+ {
+ "subnet": "{{$range}}"
+ }
+ ]
+ {{end}}
+ ],
+ "routes": [
+ {{range $i, $route := .Routes}}{{if $i}},
+ {{end}}{
+ "dst": "{{$route}}"
+}{{end}}]
+}
+}
+]
+}
\ No newline at end of file
diff --git a/core/internal/containerd/config.toml b/core/internal/containerd/config.toml
index 5a7e2f6..415391a 100644
--- a/core/internal/containerd/config.toml
+++ b/core/internal/containerd/config.toml
@@ -90,8 +90,8 @@
[plugins."io.containerd.grpc.v1.cri".cni]
bin_dir = "/containerd/bin/cni"
conf_dir = "/containerd/conf/cni"
- max_conf_num = 1
- conf_template = ""
+ max_conf_num = 0
+ conf_template = "/containerd/conf/cnispec.gojson"
[plugins."io.containerd.grpc.v1.cri".registry]
[plugins."io.containerd.grpc.v1.cri".registry.mirrors]
[plugins."io.containerd.grpc.v1.cri".registry.mirrors."docker.io"]
diff --git a/core/internal/containerd/loopback.json b/core/internal/containerd/loopback.json
deleted file mode 100644
index f375c5d..0000000
--- a/core/internal/containerd/loopback.json
+++ /dev/null
@@ -1,4 +0,0 @@
-{
- "cniVersion": "0.3.0",
- "type": "loopback"
-}
diff --git a/core/internal/containerd/ptp.json b/core/internal/containerd/ptp.json
deleted file mode 100644
index d95da5d..0000000
--- a/core/internal/containerd/ptp.json
+++ /dev/null
@@ -1,12 +0,0 @@
-{
- "name": "k8s-pod-network",
- "cniVersion": "0.3.1",
- "type": "ptp",
- "mtu": 1420,
- "ipam": {
- "type": "host-local",
- "subnet": "192.168.198.0/24",
- "routes": [{ "dst": "0.0.0.0/0" }],
- "dataDir": "/containerd/run/ipam"
- }
-}
diff --git a/core/internal/kubernetes/BUILD.bazel b/core/internal/kubernetes/BUILD.bazel
index 97387df..0a7fa22 100644
--- a/core/internal/kubernetes/BUILD.bazel
+++ b/core/internal/kubernetes/BUILD.bazel
@@ -18,6 +18,7 @@
"//core/internal/common:go_default_library",
"//core/internal/common/supervisor:go_default_library",
"//core/internal/consensus:go_default_library",
+ "//core/internal/kubernetes/clusternet:go_default_library",
"//core/internal/kubernetes/pki:go_default_library",
"//core/internal/kubernetes/reconciler:go_default_library",
"//core/internal/storage:go_default_library",
diff --git a/core/internal/kubernetes/apiserver.go b/core/internal/kubernetes/apiserver.go
index 0a740dd..26c258a 100644
--- a/core/internal/kubernetes/apiserver.go
+++ b/core/internal/kubernetes/apiserver.go
@@ -103,7 +103,7 @@
pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: config.kubeletClientCert})),
args.FileOpt("--kubelet-client-key", "kubelet-client-key.pem",
pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: config.kubeletClientKey})),
- "--kubelet-preferred-address-types=Hostname",
+ "--kubelet-preferred-address-types=InternalIP",
args.FileOpt("--proxy-client-cert-file", "aggregation-client-cert.pem",
pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: config.aggregationClientCert})),
args.FileOpt("--proxy-client-key-file", "aggregation-client-key.pem",
diff --git a/core/internal/kubernetes/clusternet/BUILD.bazel b/core/internal/kubernetes/clusternet/BUILD.bazel
new file mode 100644
index 0000000..484439c
--- /dev/null
+++ b/core/internal/kubernetes/clusternet/BUILD.bazel
@@ -0,0 +1,27 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+ name = "go_default_library",
+ srcs = [
+ "clusternet.go",
+ "netlink_compat.go",
+ ],
+ importpath = "git.monogon.dev/source/nexantic.git/core/internal/kubernetes/clusternet",
+ visibility = ["//core:__subpackages__"],
+ deps = [
+ "//core/internal/common:go_default_library",
+ "//core/internal/common/supervisor:go_default_library",
+ "//core/pkg/jsonpatch:go_default_library",
+ "@com_github_vishvananda_netlink//:go_default_library",
+ "@com_zx2c4_golang_wireguard_wgctrl//:go_default_library",
+ "@com_zx2c4_golang_wireguard_wgctrl//wgtypes:go_default_library",
+ "@io_k8s_api//core/v1:go_default_library",
+ "@io_k8s_apimachinery//pkg/apis/meta/v1:go_default_library",
+ "@io_k8s_apimachinery//pkg/types:go_default_library",
+ "@io_k8s_client_go//informers:go_default_library",
+ "@io_k8s_client_go//informers/core/v1:go_default_library",
+ "@io_k8s_client_go//kubernetes:go_default_library",
+ "@io_k8s_client_go//tools/cache:go_default_library",
+ "@org_uber_go_zap//:go_default_library",
+ ],
+)
diff --git a/core/internal/kubernetes/clusternet/clusternet.go b/core/internal/kubernetes/clusternet/clusternet.go
new file mode 100644
index 0000000..aa3e7ce
--- /dev/null
+++ b/core/internal/kubernetes/clusternet/clusternet.go
@@ -0,0 +1,266 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package clusternet implements a WireGuard-based overlay network for Kubernetes. It relies on controller-manager's
+// IPAM to assign IP ranges to nodes and on Kubernetes' Node objects to distribute the Node IPs and public keys.
+//
+// It sets up a single WireGuard network interface and routes the entire ClusterCIDR into that network interface,
+// relying on WireGuard's AllowedIPs mechanism to look up the correct peer node to send the traffic to. This means
+// that the routing table doesn't change and doesn't have to be separately managed. When clusternet is started
+// it annotates its WireGuard public key onto its node object.
+// For each node object that's created or updated on the K8s apiserver it checks if a public key annotation is set and
+// if yes a peer with that public key, its InternalIP as endpoint and the CIDR for that node as AllowedIPs is created.
+package clusternet
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io/ioutil"
+ "net"
+ "os"
+
+ "github.com/vishvananda/netlink"
+ "go.uber.org/zap"
+ "golang.zx2c4.com/wireguard/wgctrl"
+ "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
+ corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/types"
+ "k8s.io/client-go/informers"
+ coreinformers "k8s.io/client-go/informers/core/v1"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/tools/cache"
+
+ "git.monogon.dev/source/nexantic.git/core/internal/common"
+ "git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
+ "git.monogon.dev/source/nexantic.git/core/pkg/jsonpatch"
+)
+
+const (
+ // clusterNetDeviceName is the name of the WireGuard network interface created by Run and the device name passed
+ // to every ConfigureDevice call in this package.
+ clusterNetDeviceName = "clusternet"
+ // publicKeyAnnotation is the Node annotation key under which a node's WireGuard public key is stored.
+ publicKeyAnnotation = "node.smalltown.nexantic.com/wg-pubkey"
+
+ // privateKeyPath is the file EnsureOnDiskKey reads the private key from and, if it is missing, writes a newly
+ // generated key to.
+ privateKeyPath = "/data/kubernetes/clusternet.key"
+)
+
+// clusternet bundles the state used by the node event handlers that Run registers.
+type clusternet struct {
+ // nodeName is the name of the local node (Run fills it from os.Hostname); nodes with this name are never
+ // configured as peers.
+ nodeName string
+ // wgClient is the wgctrl client used to add, update and remove peers on the WireGuard device.
+ wgClient *wgctrl.Client
+ // nodeInformer is the Node informer on which Run registers the event handlers.
+ nodeInformer coreinformers.NodeInformer
+ // logger receives the warnings and debug messages emitted while syncing peers.
+ logger *zap.Logger
+}
+
+// ensureNode creates or updates the WireGuard peer entry for the given node object.
+//
+// The local node and nodes without a public-key annotation are skipped and nil is returned. For every other node the
+// peer is configured with the first parsable InternalIP (on common.WireGuardPort) as its endpoint and all parsable
+// PodCIDRs as its AllowedIPs. An error is returned if the public-key annotation cannot be parsed, if the node has no
+// usable InternalIP or if the WireGuard device rejects the configuration.
+func (c *clusternet) ensureNode(newNode *corev1.Node) error {
+	if newNode.Name == c.nodeName {
+		// Node doesn't need to connect to itself
+		return nil
+	}
+	pubKeyRaw := newNode.Annotations[publicKeyAnnotation]
+	if pubKeyRaw == "" {
+		return nil
+	}
+	pubKey, err := wgtypes.ParseKey(pubKeyRaw)
+	if err != nil {
+		return fmt.Errorf("failed to parse public-key annotation: %w", err)
+	}
+	var internalIP net.IP
+	for _, addr := range newNode.Status.Addresses {
+		if addr.Type != corev1.NodeInternalIP {
+			continue
+		}
+		if internalIP != nil {
+			c.logger.Warn("More than one NodeInternalIP specified, using the first one",
+				zap.String("node", newNode.Name))
+			break
+		}
+		internalIP = net.ParseIP(addr.Address)
+		if internalIP == nil {
+			// Keep looking: a later InternalIP entry may still be usable.
+			c.logger.Warn("failed to parse Internal IP",
+				zap.String("node", newNode.Name), zap.String("address", addr.Address))
+		}
+	}
+	if internalIP == nil {
+		return errors.New("node has no Internal IP")
+	}
+	var allowedIPs []net.IPNet
+	for _, podNetStr := range newNode.Spec.PodCIDRs {
+		_, podNet, err := net.ParseCIDR(podNetStr)
+		if err != nil {
+			c.logger.Warn("Node PodCIDR failed to parse, ignored", zap.Error(err), zap.String("node", newNode.Name))
+			continue
+		}
+		allowedIPs = append(allowedIPs, *podNet)
+	}
+	c.logger.Debug("Adding/Updating WireGuard peer node", zap.String("node", newNode.Name),
+		zap.String("endpointIP", internalIP.String()), zap.Any("allowedIPs", allowedIPs))
+	// WireGuard's kernel side has create/update semantics on peers by default. So we can just add the peer multiple
+	// times to update it.
+	err = c.wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
+		Peers: []wgtypes.PeerConfig{{
+			PublicKey:         pubKey,
+			Endpoint:          &net.UDPAddr{Port: common.WireGuardPort, IP: internalIP},
+			ReplaceAllowedIPs: true,
+			AllowedIPs:        allowedIPs,
+		}},
+	})
+	if err != nil {
+		return fmt.Errorf("failed to add WireGuard peer node: %w", err)
+	}
+	return nil
+}
+
+// removeNode deletes the WireGuard peer entry that belongs to the given node object. It does nothing and returns nil
+// for the local node and for nodes that carry no public-key annotation.
+func (c *clusternet) removeNode(oldNode *corev1.Node) error {
+	encodedKey := oldNode.Annotations[publicKeyAnnotation]
+	if oldNode.Name == c.nodeName || encodedKey == "" {
+		// Either this is the local node, which is never its own peer, or there is no key to identify a peer by.
+		return nil
+	}
+	peerKey, err := wgtypes.ParseKey(encodedKey)
+	if err != nil {
+		return fmt.Errorf("node public-key annotation not decodable: %w", err)
+	}
+	removal := wgtypes.Config{
+		Peers: []wgtypes.PeerConfig{{PublicKey: peerKey, Remove: true}},
+	}
+	if err := c.wgClient.ConfigureDevice(clusterNetDeviceName, removal); err != nil {
+		return fmt.Errorf("failed to remove WireGuard peer node: %w", err)
+	}
+	return nil
+}
+
+// EnsureOnDiskKey returns the WireGuard private key stored at privateKeyPath. If no file exists at that path, a new
+// private key is generated, written there with mode 0600 and returned. Any other read failure, or file content that
+// does not parse as a key, is reported as an error.
+func EnsureOnDiskKey() (*wgtypes.Key, error) {
+	stored, readErr := ioutil.ReadFile(privateKeyPath)
+	switch {
+	case os.IsNotExist(readErr):
+		// First start: create the key and persist it for subsequent runs.
+		fresh, err := wgtypes.GeneratePrivateKey()
+		if err != nil {
+			return nil, fmt.Errorf("failed to generate private key: %w", err)
+		}
+		if err := ioutil.WriteFile(privateKeyPath, []byte(fresh.String()), 0600); err != nil {
+			return nil, fmt.Errorf("failed to store newly generated key: %w", err)
+		}
+		return &fresh, nil
+	case readErr != nil:
+		return nil, fmt.Errorf("failed to load on-disk key: %w", readErr)
+	}
+
+	loaded, err := wgtypes.ParseKey(string(stored))
+	if err != nil {
+		return nil, fmt.Errorf("invalid private key in file: %w", err)
+	}
+	return &loaded, nil
+}
+
+// Run runs the ClusterNet service. See package description for what it does.
+//
+// The returned runnable performs the following steps and then blocks until ctx is canceled:
+//  1. publishes the public half of key as an annotation on the local Node object (named after os.Hostname),
+//  2. creates the WireGuard interface and configures it with key and common.WireGuardPort,
+//  3. routes clusterNet into that interface,
+//  4. registers node event handlers that keep the WireGuard peers in sync with the Node objects and runs the informer.
+//
+// The WireGuard interface is deleted and the wgctrl client is closed when the runnable returns.
+func Run(informerFactory informers.SharedInformerFactory, clusterNet net.IPNet, clientSet kubernetes.Interface, key *wgtypes.Key) supervisor.Runnable {
+	return func(ctx context.Context) error {
+		logger := supervisor.Logger(ctx)
+		nodeName, err := os.Hostname()
+		if err != nil {
+			return fmt.Errorf("failed to determine hostname: %w", err)
+		}
+		wgClient, err := wgctrl.New()
+		if err != nil {
+			return fmt.Errorf("failed to connect to netlink's WireGuard config endpoint: %w", err)
+		}
+		// Release the client's resources when the runnable exits so that restarts do not leak them.
+		defer wgClient.Close()
+
+		nodeAnnotationPatch := []jsonpatch.JsonPatchOp{{
+			Operation: "add",
+			Path:      "/metadata/annotations/" + jsonpatch.EncodeJSONRefToken(publicKeyAnnotation),
+			Value:     key.PublicKey().String(),
+		}}
+
+		nodeAnnotationPatchRaw, err := json.Marshal(nodeAnnotationPatch)
+		if err != nil {
+			return fmt.Errorf("failed to encode JSONPatch: %w", err)
+		}
+
+		if _, err := clientSet.CoreV1().Nodes().Patch(ctx, nodeName, types.JSONPatchType, nodeAnnotationPatchRaw, metav1.PatchOptions{}); err != nil {
+			return fmt.Errorf("failed to set ClusterNet public key for node: %w", err)
+		}
+
+		nodeInformer := informerFactory.Core().V1().Nodes()
+		wgInterface := &Wireguard{LinkAttrs: netlink.LinkAttrs{Name: clusterNetDeviceName, Flags: net.FlagUp}}
+		if err := netlink.LinkAdd(wgInterface); err != nil {
+			return fmt.Errorf("failed to add WireGuard network interfacee: %w", err)
+		}
+		defer netlink.LinkDel(wgInterface)
+
+		listenPort := common.WireGuardPort
+		if err := wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
+			PrivateKey: key,
+			ListenPort: &listenPort,
+		}); err != nil {
+			return fmt.Errorf("failed to set up WireGuard interface: %w", err)
+		}
+
+		// An already existing route is fine, anything else is fatal for this runnable.
+		if err := netlink.RouteAdd(&netlink.Route{
+			Dst:       &clusterNet,
+			LinkIndex: wgInterface.Index,
+		}); err != nil && !os.IsExist(err) {
+			return fmt.Errorf("failed to add cluster net route to Wireguard interface: %w", err)
+		}
+
+		cnet := clusternet{
+			wgClient:     wgClient,
+			nodeInformer: nodeInformer,
+			logger:       logger,
+			nodeName:     nodeName,
+		}
+
+		nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+			AddFunc: func(obj interface{}) {
+				newNode, ok := obj.(*corev1.Node)
+				if !ok {
+					logger.Error("Received non-node item in node event handler", zap.Reflect("item", obj))
+					return
+				}
+				if err := cnet.ensureNode(newNode); err != nil {
+					logger.Warn("Failed to sync node", zap.Error(err))
+				}
+			},
+			UpdateFunc: func(oldObj, newObj interface{}) {
+				newNode, ok := newObj.(*corev1.Node)
+				if !ok {
+					logger.Error("Received non-node item in node event handler", zap.Reflect("item", newObj))
+					return
+				}
+				// Peers are identified by their public key. If the annotation changed, ensureNode below creates a
+				// new peer, so the peer for the previous key has to be removed explicitly or it would linger.
+				if oldNode, ok := oldObj.(*corev1.Node); ok &&
+					oldNode.Annotations[publicKeyAnnotation] != newNode.Annotations[publicKeyAnnotation] {
+					if err := cnet.removeNode(oldNode); err != nil {
+						logger.Warn("Failed to remove stale WireGuard peer", zap.Error(err))
+					}
+				}
+				if err := cnet.ensureNode(newNode); err != nil {
+					logger.Warn("Failed to sync node", zap.Error(err))
+				}
+			},
+			DeleteFunc: func(obj interface{}) {
+				// When the informer missed the actual delete event it delivers a tombstone that wraps the last
+				// known state of the object instead of the object itself.
+				if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
+					obj = tombstone.Obj
+				}
+				oldNode, ok := obj.(*corev1.Node)
+				if !ok {
+					logger.Error("Received non-node item in node event handler", zap.Reflect("item", obj))
+					return
+				}
+				if err := cnet.removeNode(oldNode); err != nil {
+					logger.Warn("Failed to sync node", zap.Error(err))
+				}
+			},
+		})
+		supervisor.Signal(ctx, supervisor.SignalHealthy)
+		nodeInformer.Informer().Run(ctx.Done())
+		return ctx.Err()
+	}
+}
diff --git a/core/internal/kubernetes/clusternet/netlink_compat.go b/core/internal/kubernetes/clusternet/netlink_compat.go
new file mode 100644
index 0000000..a90cc47
--- /dev/null
+++ b/core/internal/kubernetes/clusternet/netlink_compat.go
@@ -0,0 +1,33 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Can be removed once https://github.com/vishvananda/netlink/pull/464 lands
+package clusternet
+
+import "github.com/vishvananda/netlink"
+
+// Wireguard represents links of type "wireguard", see https://www.wireguard.com/
+type Wireguard struct {
+	netlink.LinkAttrs
+}
+
+// Compile-time check that *Wireguard can be passed wherever a netlink.Link is expected.
+var _ netlink.Link = (*Wireguard)(nil)
+
+// Attrs returns a pointer to the embedded link attributes.
+func (wg *Wireguard) Attrs() *netlink.LinkAttrs {
+	return &wg.LinkAttrs
+}
+
+// Type returns the link type name, which is always "wireguard".
+func (wg *Wireguard) Type() string {
+	return "wireguard"
+}
diff --git a/core/internal/kubernetes/controller-manager.go b/core/internal/kubernetes/controller-manager.go
index 0934ae1..8a85a99 100644
--- a/core/internal/kubernetes/controller-manager.go
+++ b/core/internal/kubernetes/controller-manager.go
@@ -41,6 +41,8 @@
serverKey []byte
}
+var clusterNet = net.IPNet{IP: net.IP{10, 0, 0, 0}, Mask: net.IPMask{255, 255, 0, 0}}
+
func getPKIControllerManagerConfig(ctx context.Context, kv clientv3.KV, kpki *pki.KubernetesPKI) (*controllerManagerConfig, error) {
var config controllerManagerConfig
var err error
@@ -84,7 +86,10 @@
pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: config.serverCert})),
args.FileOpt("--tls-private-key-file", "server-key.pem",
pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: config.serverKey})),
+ "--allocate-node-cidrs",
+ "--cluster-cidr="+clusterNet.String(),
)
+
if args.Error() != nil {
return fmt.Errorf("failed to use fileargs: %w", err)
}
diff --git a/core/internal/kubernetes/service.go b/core/internal/kubernetes/service.go
index 826e2d8..ccfb41c 100644
--- a/core/internal/kubernetes/service.go
+++ b/core/internal/kubernetes/service.go
@@ -24,18 +24,18 @@
"os"
"time"
- "k8s.io/client-go/informers"
- "k8s.io/client-go/tools/clientcmd"
-
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
+ "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/tools/clientcmd"
schema "git.monogon.dev/source/nexantic.git/core/generated/api"
"git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
"git.monogon.dev/source/nexantic.git/core/internal/consensus"
+ "git.monogon.dev/source/nexantic.git/core/internal/kubernetes/clusternet"
"git.monogon.dev/source/nexantic.git/core/internal/kubernetes/pki"
"git.monogon.dev/source/nexantic.git/core/internal/kubernetes/reconciler"
"git.monogon.dev/source/nexantic.git/core/internal/storage"
@@ -181,6 +181,11 @@
return fmt.Errorf("could not created kubelet config: %w", err)
}
+ key, err := clusternet.EnsureOnDiskKey()
+ if err != nil {
+ return fmt.Errorf("failed to ensure cluster key: %w", err)
+ }
+
for _, sub := range []struct {
name string
runnable supervisor.Runnable
@@ -192,6 +197,7 @@
{"reconciler", reconciler.Run(clientSet)},
{"csi-plugin", runCSIPlugin(s.storageService)},
{"pv-provisioner", runCSIProvisioner(s.storageService, clientSet, informerFactory)},
+ {"clusternet", clusternet.Run(informerFactory, clusterNet, clientSet, key)},
} {
err := supervisor.Run(ctx, sub.name, sub.runnable)
if err != nil {
diff --git a/core/internal/network/main.go b/core/internal/network/main.go
index 2466e05..ac9ce46 100644
--- a/core/internal/network/main.go
+++ b/core/internal/network/main.go
@@ -19,6 +19,7 @@
import (
"context"
"fmt"
+ "io/ioutil"
"net"
"os"
@@ -135,6 +136,10 @@
s.logger.Fatal("Failed to list network links", zap.Error(err))
}
+ if err := ioutil.WriteFile("/proc/sys/net/ipv4/ip_forward", []byte("1\n"), 0644); err != nil {
+ s.logger.Panic("Failed to enable IPv4 forwarding", zap.Error(err))
+ }
+
var ethernetLinks []netlink.Link
for _, link := range links {
attrs := link.Attrs()