blob: aa3e7ce88131e115d09ed931dcbeb7451ac7fd23 [file] [log] [blame]
Lorenz Brunf042e6f2020-06-24 16:46:09 +02001// Copyright 2020 The Monogon Project Authors.
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17// Package clusternet implements a WireGuard-based overlay network for Kubernetes. It relies on controller-manager's
18// IPAM to assign IP ranges to nodes and on Kubernetes' Node objects to distribute the Node IPs and public keys.
19//
20// It sets up a single WireGuard network interface and routes the entire ClusterCIDR into that network interface,
21// relying on WireGuard's AllowedIPs mechanism to look up the correct peer node to send the traffic to. This means
22// that the routing table doesn't change and doesn't have to be separately managed. When clusternet is started
23// it annotates its WireGuard public key onto its node object.
// For each Node object that is created or updated on the K8s apiserver, it checks whether a public-key annotation is
// set and, if so, creates (or updates) a WireGuard peer with that public key, the node's InternalIP as endpoint and the
// node's PodCIDRs as AllowedIPs. The peer is removed again when the Node object is deleted.
26package clusternet
27
28import (
29 "context"
30 "encoding/json"
31 "errors"
32 "fmt"
33 "io/ioutil"
34 "net"
35 "os"
36
37 "github.com/vishvananda/netlink"
38 "go.uber.org/zap"
39 "golang.zx2c4.com/wireguard/wgctrl"
40 "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
41 corev1 "k8s.io/api/core/v1"
42 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
43 "k8s.io/apimachinery/pkg/types"
44 "k8s.io/client-go/informers"
45 coreinformers "k8s.io/client-go/informers/core/v1"
46 "k8s.io/client-go/kubernetes"
47 "k8s.io/client-go/tools/cache"
48
49 "git.monogon.dev/source/nexantic.git/core/internal/common"
50 "git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
51 "git.monogon.dev/source/nexantic.git/core/pkg/jsonpatch"
52)
53
// Fixed names and locations used by clusternet.
const (
	// publicKeyAnnotation is the Node annotation under which every node publishes its WireGuard public key.
	publicKeyAnnotation = "node.smalltown.nexantic.com/wg-pubkey"
	// clusterNetDeviceName is the name of the WireGuard network interface managed by Run.
	clusterNetDeviceName = "clusternet"
	// privateKeyPath is the file EnsureOnDiskKey reads this node's WireGuard private key from (or writes it to).
	privateKeyPath = "/data/kubernetes/clusternet.key"
)
60
61type clusternet struct {
62 nodeName string
63 wgClient *wgctrl.Client
64 nodeInformer coreinformers.NodeInformer
65 logger *zap.Logger
66}
67
68// ensureNode creates/updates the corresponding WireGuard peer entry for the given node objet
69func (c *clusternet) ensureNode(newNode *corev1.Node) error {
70 if newNode.Name == c.nodeName {
71 // Node doesn't need to connect to itself
72 return nil
73 }
74 pubKeyRaw := newNode.Annotations[publicKeyAnnotation]
75 if pubKeyRaw == "" {
76 return nil
77 }
78 pubKey, err := wgtypes.ParseKey(pubKeyRaw)
79 if err != nil {
80 return fmt.Errorf("failed to parse public-key annotation: %w", err)
81 }
82 var internalIP net.IP
83 for _, addr := range newNode.Status.Addresses {
84 if addr.Type == corev1.NodeInternalIP {
85 if internalIP != nil {
86 c.logger.Warn("More than one NodeInternalIP specified, using the first one")
87 break
88 }
89 internalIP = net.ParseIP(addr.Address)
90 if internalIP == nil {
91 c.logger.Warn("failed to parse Internal IP")
92 }
93 }
94 }
95 if internalIP == nil {
96 return errors.New("node has no Internal IP")
97 }
98 var allowedIPs []net.IPNet
99 for _, podNetStr := range newNode.Spec.PodCIDRs {
100 _, podNet, err := net.ParseCIDR(podNetStr)
101 if err != nil {
102 c.logger.Warn("Node PodCIDR failed to parse, ignored", zap.Error(err), zap.String("node", newNode.Name))
103 continue
104 }
105 allowedIPs = append(allowedIPs, *podNet)
106 }
107 c.logger.Debug("Adding/Updating WireGuard peer node", zap.String("node", newNode.Name),
108 zap.String("endpointIP", internalIP.String()), zap.Any("allowedIPs", allowedIPs))
109 // WireGuard's kernel side has create/update semantics on peers by default. So we can just add the peer multiple
110 // times to update it.
111 err = c.wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
112 Peers: []wgtypes.PeerConfig{{
113 PublicKey: pubKey,
114 Endpoint: &net.UDPAddr{Port: common.WireGuardPort, IP: internalIP},
115 ReplaceAllowedIPs: true,
116 AllowedIPs: allowedIPs,
117 }},
118 })
119 if err != nil {
120 return fmt.Errorf("failed to add WireGuard peer node: %w", err)
121 }
122 return nil
123}
124
125// removeNode removes the corresponding WireGuard peer entry for the given node object
126func (c *clusternet) removeNode(oldNode *corev1.Node) error {
127 if oldNode.Name == c.nodeName {
128 // Node doesn't need to connect to itself
129 return nil
130 }
131 pubKeyRaw := oldNode.Annotations[publicKeyAnnotation]
132 if pubKeyRaw == "" {
133 return nil
134 }
135 pubKey, err := wgtypes.ParseKey(pubKeyRaw)
136 if err != nil {
137 return fmt.Errorf("node public-key annotation not decodable: %w", err)
138 }
139 err = c.wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
140 Peers: []wgtypes.PeerConfig{{
141 PublicKey: pubKey,
142 Remove: true,
143 }},
144 })
145 if err != nil {
146 return fmt.Errorf("failed to remove WireGuard peer node: %w", err)
147 }
148 return nil
149}
150
151// EnsureOnDiskKey loads the private key from disk or (if none exists) generates one and persists it.
152func EnsureOnDiskKey() (*wgtypes.Key, error) {
153 privKeyRaw, err := ioutil.ReadFile(privateKeyPath)
154 if os.IsNotExist(err) {
155 privKey, err := wgtypes.GeneratePrivateKey()
156 if err != nil {
157 return nil, fmt.Errorf("failed to generate private key: %w", err)
158 }
159 if err := ioutil.WriteFile(privateKeyPath, []byte(privKey.String()), 0600); err != nil {
160 return nil, fmt.Errorf("failed to store newly generated key: %w", err)
161 }
162 return &privKey, nil
163 } else if err != nil {
164 return nil, fmt.Errorf("failed to load on-disk key: %w", err)
165 }
166 privKey, err := wgtypes.ParseKey(string(privKeyRaw))
167 if err != nil {
168 return nil, fmt.Errorf("invalid private key in file: %w", err)
169 }
170 return &privKey, nil
171}
172
173// Run runs the ClusterNet service. See package description for what it does.
174func Run(informerFactory informers.SharedInformerFactory, clusterNet net.IPNet, clientSet kubernetes.Interface, key *wgtypes.Key) supervisor.Runnable {
175 return func(ctx context.Context) error {
176 logger := supervisor.Logger(ctx)
177 nodeName, err := os.Hostname()
178 if err != nil {
179 return fmt.Errorf("failed to determine hostname: %w", err)
180 }
181 wgClient, err := wgctrl.New()
182 if err != nil {
183 return fmt.Errorf("failed to connect to netlink's WireGuard config endpoint: %w", err)
184 }
185
186 nodeAnnotationPatch := []jsonpatch.JsonPatchOp{{
187 Operation: "add",
188 Path: "/metadata/annotations/" + jsonpatch.EncodeJSONRefToken(publicKeyAnnotation),
189 Value: key.PublicKey().String(),
190 }}
191
192 nodeAnnotationPatchRaw, err := json.Marshal(nodeAnnotationPatch)
193 if err != nil {
194 return fmt.Errorf("failed to encode JSONPatch: %w", err)
195 }
196
197 if _, err := clientSet.CoreV1().Nodes().Patch(ctx, nodeName, types.JSONPatchType, nodeAnnotationPatchRaw, metav1.PatchOptions{}); err != nil {
198 return fmt.Errorf("failed to set ClusterNet public key for node: %w", err)
199 }
200
201 nodeInformer := informerFactory.Core().V1().Nodes()
202 wgInterface := &Wireguard{LinkAttrs: netlink.LinkAttrs{Name: clusterNetDeviceName, Flags: net.FlagUp}}
203 if err := netlink.LinkAdd(wgInterface); err != nil {
204 return fmt.Errorf("failed to add WireGuard network interfacee: %w", err)
205 }
206 defer netlink.LinkDel(wgInterface)
207
208 listenPort := common.WireGuardPort
209 if err := wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
210 PrivateKey: key,
211 ListenPort: &listenPort,
212 }); err != nil {
213 return fmt.Errorf("failed to set up WireGuard interface: %w", err)
214 }
215
216 if err := netlink.RouteAdd(&netlink.Route{
217 Dst: &clusterNet,
218 LinkIndex: wgInterface.Index,
219 }); err != nil && !os.IsExist(err) {
220 return fmt.Errorf("failed to add cluster net route to Wireguard interface: %w", err)
221 }
222
223 cnet := clusternet{
224 wgClient: wgClient,
225 nodeInformer: nodeInformer,
226 logger: logger,
227 nodeName: nodeName,
228 }
229
230 nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
231 AddFunc: func(new interface{}) {
232 newNode, ok := new.(*corev1.Node)
233 if !ok {
234 logger.Error("Received non-node item in node event handler", zap.Reflect("item", new))
235 return
236 }
237 if err := cnet.ensureNode(newNode); err != nil {
238 logger.Warn("Failed to sync node", zap.Error(err))
239 }
240 },
241 UpdateFunc: func(old, new interface{}) {
242 newNode, ok := new.(*corev1.Node)
243 if !ok {
244 logger.Error("Received non-node item in node event handler", zap.Reflect("item", new))
245 return
246 }
247 if err := cnet.ensureNode(newNode); err != nil {
248 logger.Warn("Failed to sync node", zap.Error(err))
249 }
250 },
251 DeleteFunc: func(old interface{}) {
252 oldNode, ok := old.(*corev1.Node)
253 if !ok {
254 logger.Error("Received non-node item in node event handler", zap.Reflect("item", oldNode))
255 return
256 }
257 if err := cnet.removeNode(oldNode); err != nil {
258 logger.Warn("Failed to sync node", zap.Error(err))
259 }
260 },
261 })
262 supervisor.Signal(ctx, supervisor.SignalHealthy)
263 nodeInformer.Informer().Run(ctx.Done())
264 return ctx.Err()
265 }
266}