blob: 15e3d5baacbc6b88a9b6c59de7b69b996d550ad1 [file] [log] [blame]
Lorenz Brunf042e6f2020-06-24 16:46:09 +02001// Copyright 2020 The Monogon Project Authors.
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17// Package clusternet implements a WireGuard-based overlay network for Kubernetes. It relies on controller-manager's
18// IPAM to assign IP ranges to nodes and on Kubernetes' Node objects to distribute the Node IPs and public keys.
19//
20// It sets up a single WireGuard network interface and routes the entire ClusterCIDR into that network interface,
21// relying on WireGuard's AllowedIPs mechanism to look up the correct peer node to send the traffic to. This means
22// that the routing table doesn't change and doesn't have to be separately managed. When clusternet is started
23// it annotates its WireGuard public key onto its node object.
// For each node object that's created or updated on the K8s apiserver it checks if a public key annotation is set and,
// if so, creates or updates a peer with that public key, the node's InternalIP as endpoint, and the node's PodCIDRs
// plus its InternalIP as AllowedIPs. When a node object is deleted, its peer is removed again.
26package clusternet
27
28import (
29 "context"
30 "encoding/json"
31 "errors"
32 "fmt"
Lorenz Brunf042e6f2020-06-24 16:46:09 +020033 "net"
34 "os"
35
Serge Bazanskic7359672020-10-30 16:38:57 +010036 "git.monogon.dev/source/nexantic.git/core/pkg/logtree"
37
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020038 "k8s.io/client-go/informers"
39
40 "k8s.io/client-go/kubernetes"
41
Lorenz Brunf042e6f2020-06-24 16:46:09 +020042 "github.com/vishvananda/netlink"
Lorenz Brunf042e6f2020-06-24 16:46:09 +020043 "golang.zx2c4.com/wireguard/wgctrl"
44 "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
45 corev1 "k8s.io/api/core/v1"
46 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
47 "k8s.io/apimachinery/pkg/types"
Lorenz Brunf042e6f2020-06-24 16:46:09 +020048 "k8s.io/client-go/tools/cache"
49
50 "git.monogon.dev/source/nexantic.git/core/internal/common"
51 "git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020052 "git.monogon.dev/source/nexantic.git/core/internal/localstorage"
Lorenz Brunf042e6f2020-06-24 16:46:09 +020053 "git.monogon.dev/source/nexantic.git/core/pkg/jsonpatch"
54)
55
const (
	// clusterNetDeviceName is the name of the WireGuard network interface created and configured by Run.
	clusterNetDeviceName = "clusternet"
	// publicKeyAnnotation is the Node annotation under which each node publishes its WireGuard public key. It is
	// written for this node by annotateThisNode and read from peer nodes by ensureNode/removeNode.
	publicKeyAnnotation = "node.smalltown.nexantic.com/wg-pubkey"
)
60
// Service is the clusternet WireGuard overlay network service. The exported fields are only read, never assigned,
// in this file, so they are expected to be populated by the caller before Run is invoked; the unexported fields are
// filled in by Run.
type Service struct {
	// NodeName is the name of this node's Kubernetes Node object. That object is annotated with this node's public
	// key, and Node events for it are ignored when configuring peers.
	NodeName string
	// Kubernetes is the API client used to patch this node's Node object.
	Kubernetes kubernetes.Interface
	// ClusterNet is the network that gets routed into the WireGuard interface.
	ClusterNet net.IPNet
	// InformerFactory supplies the Node informer whose events drive peer configuration.
	InformerFactory informers.SharedInformerFactory
	// DataDirectory holds the persisted WireGuard private key (see ensureOnDiskKey).
	DataDirectory *localstorage.DataKubernetesClusterNetworkingDirectory

	// wgClient configures the WireGuard device; set by Run.
	wgClient *wgctrl.Client
	// privKey is this node's WireGuard private key; set by ensureOnDiskKey.
	privKey wgtypes.Key
	// logger is the supervisor-provided logger; set by Run.
	logger logtree.LeveledLogger
}
72
73// ensureNode creates/updates the corresponding WireGuard peer entry for the given node objet
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020074func (s *Service) ensureNode(newNode *corev1.Node) error {
75 if newNode.Name == s.NodeName {
Lorenz Brunf042e6f2020-06-24 16:46:09 +020076 // Node doesn't need to connect to itself
77 return nil
78 }
79 pubKeyRaw := newNode.Annotations[publicKeyAnnotation]
80 if pubKeyRaw == "" {
81 return nil
82 }
83 pubKey, err := wgtypes.ParseKey(pubKeyRaw)
84 if err != nil {
85 return fmt.Errorf("failed to parse public-key annotation: %w", err)
86 }
87 var internalIP net.IP
88 for _, addr := range newNode.Status.Addresses {
89 if addr.Type == corev1.NodeInternalIP {
90 if internalIP != nil {
Serge Bazanskic7359672020-10-30 16:38:57 +010091 s.logger.Warningf("More than one NodeInternalIP specified, using the first one")
Lorenz Brunf042e6f2020-06-24 16:46:09 +020092 break
93 }
94 internalIP = net.ParseIP(addr.Address)
95 if internalIP == nil {
Serge Bazanskic7359672020-10-30 16:38:57 +010096 s.logger.Warningf("Failed to parse Internal IP %s", addr.Address)
Lorenz Brunf042e6f2020-06-24 16:46:09 +020097 }
98 }
99 }
100 if internalIP == nil {
101 return errors.New("node has no Internal IP")
102 }
103 var allowedIPs []net.IPNet
104 for _, podNetStr := range newNode.Spec.PodCIDRs {
105 _, podNet, err := net.ParseCIDR(podNetStr)
106 if err != nil {
Serge Bazanskic7359672020-10-30 16:38:57 +0100107 s.logger.Warningf("Node %s PodCIDR failed to parse, ignored: %v", newNode.Name, err)
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200108 continue
109 }
110 allowedIPs = append(allowedIPs, *podNet)
111 }
Lorenz Brunb682ba52020-07-08 14:51:36 +0200112 allowedIPs = append(allowedIPs, net.IPNet{IP: internalIP, Mask: net.CIDRMask(32, 32)})
Serge Bazanskic7359672020-10-30 16:38:57 +0100113 s.logger.V(1).Infof("Adding/Updating WireGuard peer node %s, endpoint %s, allowedIPs %+v", newNode.Name, internalIP.String(), allowedIPs)
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200114 // WireGuard's kernel side has create/update semantics on peers by default. So we can just add the peer multiple
115 // times to update it.
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200116 err = s.wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200117 Peers: []wgtypes.PeerConfig{{
118 PublicKey: pubKey,
119 Endpoint: &net.UDPAddr{Port: common.WireGuardPort, IP: internalIP},
120 ReplaceAllowedIPs: true,
121 AllowedIPs: allowedIPs,
122 }},
123 })
124 if err != nil {
125 return fmt.Errorf("failed to add WireGuard peer node: %w", err)
126 }
127 return nil
128}
129
130// removeNode removes the corresponding WireGuard peer entry for the given node object
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200131func (s *Service) removeNode(oldNode *corev1.Node) error {
132 if oldNode.Name == s.NodeName {
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200133 // Node doesn't need to connect to itself
134 return nil
135 }
136 pubKeyRaw := oldNode.Annotations[publicKeyAnnotation]
137 if pubKeyRaw == "" {
138 return nil
139 }
140 pubKey, err := wgtypes.ParseKey(pubKeyRaw)
141 if err != nil {
142 return fmt.Errorf("node public-key annotation not decodable: %w", err)
143 }
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200144 err = s.wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200145 Peers: []wgtypes.PeerConfig{{
146 PublicKey: pubKey,
147 Remove: true,
148 }},
149 })
150 if err != nil {
151 return fmt.Errorf("failed to remove WireGuard peer node: %w", err)
152 }
153 return nil
154}
155
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200156// ensureOnDiskKey loads the private key from disk or (if none exists) generates one and persists it.
157func (s *Service) ensureOnDiskKey() error {
158 keyRaw, err := s.DataDirectory.Key.Read()
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200159 if os.IsNotExist(err) {
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200160 key, err := wgtypes.GeneratePrivateKey()
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200161 if err != nil {
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200162 return fmt.Errorf("failed to generate private key: %w", err)
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200163 }
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200164 if err := s.DataDirectory.Key.Write([]byte(key.String()), 0600); err != nil {
165 return fmt.Errorf("failed to store newly generated key: %w", err)
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200166 }
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200167
168 s.privKey = key
169 return nil
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200170 } else if err != nil {
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200171 return fmt.Errorf("failed to load on-disk key: %w", err)
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200172 }
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200173
174 key, err := wgtypes.ParseKey(string(keyRaw))
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200175 if err != nil {
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200176 return fmt.Errorf("invalid private key in file: %w", err)
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200177 }
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200178 s.privKey = key
179 return nil
180}
181
182// annotateThisNode annotates the node (as defined by NodeName) with the wireguard public key of this node.
183func (s *Service) annotateThisNode(ctx context.Context) error {
184 patch := []jsonpatch.JsonPatchOp{{
185 Operation: "add",
186 Path: "/metadata/annotations/" + jsonpatch.EncodeJSONRefToken(publicKeyAnnotation),
187 Value: s.privKey.PublicKey().String(),
188 }}
189
190 patchRaw, err := json.Marshal(patch)
191 if err != nil {
192 return fmt.Errorf("failed to encode JSONPatch: %w", err)
193 }
194
195 if _, err := s.Kubernetes.CoreV1().Nodes().Patch(ctx, s.NodeName, types.JSONPatchType, patchRaw, metav1.PatchOptions{}); err != nil {
196 return fmt.Errorf("failed to patch resource: %w", err)
197 }
198
199 return nil
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200200}
201
202// Run runs the ClusterNet service. See package description for what it does.
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200203func (s *Service) Run(ctx context.Context) error {
204 logger := supervisor.Logger(ctx)
Lorenz Brunca24cfa2020-08-18 13:49:37 +0200205 s.logger = logger
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200206
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200207 wgClient, err := wgctrl.New()
208 if err != nil {
209 return fmt.Errorf("failed to connect to netlink's WireGuard config endpoint: %w", err)
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200210 }
Lorenz Brunca24cfa2020-08-18 13:49:37 +0200211 s.wgClient = wgClient
212
213 if err := s.ensureOnDiskKey(); err != nil {
214 return fmt.Errorf("failed to ensure on-disk key: %w", err)
215 }
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200216
217 wgInterface := &Wireguard{LinkAttrs: netlink.LinkAttrs{Name: clusterNetDeviceName, Flags: net.FlagUp}}
218 if err := netlink.LinkAdd(wgInterface); err != nil {
219 return fmt.Errorf("failed to add WireGuard network interfacee: %w", err)
220 }
221 defer netlink.LinkDel(wgInterface)
222
223 listenPort := common.WireGuardPort
224 if err := wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
225 PrivateKey: &s.privKey,
226 ListenPort: &listenPort,
227 }); err != nil {
228 return fmt.Errorf("failed to set up WireGuard interface: %w", err)
229 }
230
231 if err := netlink.RouteAdd(&netlink.Route{
232 Dst: &s.ClusterNet,
233 LinkIndex: wgInterface.Index,
234 }); err != nil && !os.IsExist(err) {
235 return fmt.Errorf("failed to add cluster net route to Wireguard interface: %w", err)
236 }
237
238 if err := s.annotateThisNode(ctx); err != nil {
239 return fmt.Errorf("when annotating this node with public key: %w", err)
240 }
241
242 nodeInformer := s.InformerFactory.Core().V1().Nodes()
243 nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
244 AddFunc: func(new interface{}) {
245 newNode, ok := new.(*corev1.Node)
246 if !ok {
Serge Bazanskic7359672020-10-30 16:38:57 +0100247 logger.Errorf("Received non-node item %+v in node event handler", new)
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200248 return
249 }
250 if err := s.ensureNode(newNode); err != nil {
Serge Bazanskic7359672020-10-30 16:38:57 +0100251 logger.Warningf("Failed to sync node: %v", err)
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200252 }
253 },
254 UpdateFunc: func(old, new interface{}) {
255 newNode, ok := new.(*corev1.Node)
256 if !ok {
Serge Bazanskic7359672020-10-30 16:38:57 +0100257 logger.Errorf("Received non-node item %+v in node event handler", new)
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200258 return
259 }
260 if err := s.ensureNode(newNode); err != nil {
Serge Bazanskic7359672020-10-30 16:38:57 +0100261 logger.Warningf("Failed to sync node: %v", err)
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200262 }
263 },
264 DeleteFunc: func(old interface{}) {
265 oldNode, ok := old.(*corev1.Node)
266 if !ok {
Serge Bazanskic7359672020-10-30 16:38:57 +0100267 logger.Errorf("Received non-node item %+v in node event handler", oldNode)
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200268 return
269 }
270 if err := s.removeNode(oldNode); err != nil {
Serge Bazanskic7359672020-10-30 16:38:57 +0100271 logger.Warningf("Failed to sync node: %v", err)
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200272 }
273 },
274 })
275
276 supervisor.Signal(ctx, supervisor.SignalHealthy)
277 nodeInformer.Informer().Run(ctx.Done())
278 return ctx.Err()
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200279}