blob: cd784346c8cddc10442a88f7bfb29a747f2ce7e7 [file] [log] [blame]
Lorenz Brunf042e6f2020-06-24 16:46:09 +02001// Copyright 2020 The Monogon Project Authors.
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17// Package clusternet implements a WireGuard-based overlay network for Kubernetes. It relies on controller-manager's
18// IPAM to assign IP ranges to nodes and on Kubernetes' Node objects to distribute the Node IPs and public keys.
19//
20// It sets up a single WireGuard network interface and routes the entire ClusterCIDR into that network interface,
21// relying on WireGuard's AllowedIPs mechanism to look up the correct peer node to send the traffic to. This means
22// that the routing table doesn't change and doesn't have to be separately managed. When clusternet is started
23// it annotates its WireGuard public key onto its node object.
// For each node object that's created or updated on the K8s apiserver it checks whether a public key annotation is set
// and, if so, creates a peer with that public key, the node's InternalIP as endpoint and the node's Pod CIDRs (plus its
// InternalIP) as AllowedIPs. The peer is removed again when the node object is deleted.
26package clusternet
27
28import (
29 "context"
30 "encoding/json"
31 "errors"
32 "fmt"
Lorenz Brunf042e6f2020-06-24 16:46:09 +020033 "net"
34 "os"
35
36 "github.com/vishvananda/netlink"
Lorenz Brunf042e6f2020-06-24 16:46:09 +020037 "golang.zx2c4.com/wireguard/wgctrl"
38 "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
39 corev1 "k8s.io/api/core/v1"
40 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
41 "k8s.io/apimachinery/pkg/types"
Serge Bazanski77cb6c52020-12-19 00:09:22 +010042 "k8s.io/client-go/informers"
43 "k8s.io/client-go/kubernetes"
Lorenz Brunf042e6f2020-06-24 16:46:09 +020044 "k8s.io/client-go/tools/cache"
45
Serge Bazanski77cb6c52020-12-19 00:09:22 +010046 common "git.monogon.dev/source/nexantic.git/metropolis/node"
47 "git.monogon.dev/source/nexantic.git/metropolis/node/common/jsonpatch"
48 "git.monogon.dev/source/nexantic.git/metropolis/node/common/supervisor"
49 "git.monogon.dev/source/nexantic.git/metropolis/node/core/localstorage"
50 "git.monogon.dev/source/nexantic.git/metropolis/node/core/logtree"
Lorenz Brunf042e6f2020-06-24 16:46:09 +020051)
52
// clusterNetDeviceName is the name of the WireGuard network interface into
// which the whole cluster network is routed.
const clusterNetDeviceName = "clusternet"

// publicKeyAnnotation is the Node annotation key under which every node
// publishes its WireGuard public key for its peers to pick up.
const publicKeyAnnotation = "node.metropolis.monogon.dev/wg-pubkey"
57
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020058type Service struct {
59 NodeName string
60 Kubernetes kubernetes.Interface
61 ClusterNet net.IPNet
62 InformerFactory informers.SharedInformerFactory
63 DataDirectory *localstorage.DataKubernetesClusterNetworkingDirectory
64
65 wgClient *wgctrl.Client
66 privKey wgtypes.Key
Serge Bazanskic7359672020-10-30 16:38:57 +010067 logger logtree.LeveledLogger
Lorenz Brunf042e6f2020-06-24 16:46:09 +020068}
69
70// ensureNode creates/updates the corresponding WireGuard peer entry for the given node objet
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020071func (s *Service) ensureNode(newNode *corev1.Node) error {
72 if newNode.Name == s.NodeName {
Lorenz Brunf042e6f2020-06-24 16:46:09 +020073 // Node doesn't need to connect to itself
74 return nil
75 }
76 pubKeyRaw := newNode.Annotations[publicKeyAnnotation]
77 if pubKeyRaw == "" {
78 return nil
79 }
80 pubKey, err := wgtypes.ParseKey(pubKeyRaw)
81 if err != nil {
82 return fmt.Errorf("failed to parse public-key annotation: %w", err)
83 }
84 var internalIP net.IP
85 for _, addr := range newNode.Status.Addresses {
86 if addr.Type == corev1.NodeInternalIP {
87 if internalIP != nil {
Serge Bazanskic7359672020-10-30 16:38:57 +010088 s.logger.Warningf("More than one NodeInternalIP specified, using the first one")
Lorenz Brunf042e6f2020-06-24 16:46:09 +020089 break
90 }
91 internalIP = net.ParseIP(addr.Address)
92 if internalIP == nil {
Serge Bazanskic7359672020-10-30 16:38:57 +010093 s.logger.Warningf("Failed to parse Internal IP %s", addr.Address)
Lorenz Brunf042e6f2020-06-24 16:46:09 +020094 }
95 }
96 }
97 if internalIP == nil {
98 return errors.New("node has no Internal IP")
99 }
100 var allowedIPs []net.IPNet
101 for _, podNetStr := range newNode.Spec.PodCIDRs {
102 _, podNet, err := net.ParseCIDR(podNetStr)
103 if err != nil {
Serge Bazanskic7359672020-10-30 16:38:57 +0100104 s.logger.Warningf("Node %s PodCIDR failed to parse, ignored: %v", newNode.Name, err)
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200105 continue
106 }
107 allowedIPs = append(allowedIPs, *podNet)
108 }
Lorenz Brunb682ba52020-07-08 14:51:36 +0200109 allowedIPs = append(allowedIPs, net.IPNet{IP: internalIP, Mask: net.CIDRMask(32, 32)})
Serge Bazanskic7359672020-10-30 16:38:57 +0100110 s.logger.V(1).Infof("Adding/Updating WireGuard peer node %s, endpoint %s, allowedIPs %+v", newNode.Name, internalIP.String(), allowedIPs)
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200111 // WireGuard's kernel side has create/update semantics on peers by default. So we can just add the peer multiple
112 // times to update it.
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200113 err = s.wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200114 Peers: []wgtypes.PeerConfig{{
115 PublicKey: pubKey,
116 Endpoint: &net.UDPAddr{Port: common.WireGuardPort, IP: internalIP},
117 ReplaceAllowedIPs: true,
118 AllowedIPs: allowedIPs,
119 }},
120 })
121 if err != nil {
122 return fmt.Errorf("failed to add WireGuard peer node: %w", err)
123 }
124 return nil
125}
126
127// removeNode removes the corresponding WireGuard peer entry for the given node object
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200128func (s *Service) removeNode(oldNode *corev1.Node) error {
129 if oldNode.Name == s.NodeName {
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200130 // Node doesn't need to connect to itself
131 return nil
132 }
133 pubKeyRaw := oldNode.Annotations[publicKeyAnnotation]
134 if pubKeyRaw == "" {
135 return nil
136 }
137 pubKey, err := wgtypes.ParseKey(pubKeyRaw)
138 if err != nil {
139 return fmt.Errorf("node public-key annotation not decodable: %w", err)
140 }
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200141 err = s.wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200142 Peers: []wgtypes.PeerConfig{{
143 PublicKey: pubKey,
144 Remove: true,
145 }},
146 })
147 if err != nil {
148 return fmt.Errorf("failed to remove WireGuard peer node: %w", err)
149 }
150 return nil
151}
152
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200153// ensureOnDiskKey loads the private key from disk or (if none exists) generates one and persists it.
154func (s *Service) ensureOnDiskKey() error {
155 keyRaw, err := s.DataDirectory.Key.Read()
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200156 if os.IsNotExist(err) {
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200157 key, err := wgtypes.GeneratePrivateKey()
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200158 if err != nil {
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200159 return fmt.Errorf("failed to generate private key: %w", err)
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200160 }
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200161 if err := s.DataDirectory.Key.Write([]byte(key.String()), 0600); err != nil {
162 return fmt.Errorf("failed to store newly generated key: %w", err)
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200163 }
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200164
165 s.privKey = key
166 return nil
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200167 } else if err != nil {
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200168 return fmt.Errorf("failed to load on-disk key: %w", err)
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200169 }
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200170
171 key, err := wgtypes.ParseKey(string(keyRaw))
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200172 if err != nil {
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200173 return fmt.Errorf("invalid private key in file: %w", err)
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200174 }
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200175 s.privKey = key
176 return nil
177}
178
179// annotateThisNode annotates the node (as defined by NodeName) with the wireguard public key of this node.
180func (s *Service) annotateThisNode(ctx context.Context) error {
181 patch := []jsonpatch.JsonPatchOp{{
182 Operation: "add",
183 Path: "/metadata/annotations/" + jsonpatch.EncodeJSONRefToken(publicKeyAnnotation),
184 Value: s.privKey.PublicKey().String(),
185 }}
186
187 patchRaw, err := json.Marshal(patch)
188 if err != nil {
189 return fmt.Errorf("failed to encode JSONPatch: %w", err)
190 }
191
192 if _, err := s.Kubernetes.CoreV1().Nodes().Patch(ctx, s.NodeName, types.JSONPatchType, patchRaw, metav1.PatchOptions{}); err != nil {
193 return fmt.Errorf("failed to patch resource: %w", err)
194 }
195
196 return nil
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200197}
198
199// Run runs the ClusterNet service. See package description for what it does.
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200200func (s *Service) Run(ctx context.Context) error {
201 logger := supervisor.Logger(ctx)
Lorenz Brunca24cfa2020-08-18 13:49:37 +0200202 s.logger = logger
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200203
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200204 wgClient, err := wgctrl.New()
205 if err != nil {
206 return fmt.Errorf("failed to connect to netlink's WireGuard config endpoint: %w", err)
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200207 }
Lorenz Brunca24cfa2020-08-18 13:49:37 +0200208 s.wgClient = wgClient
209
210 if err := s.ensureOnDiskKey(); err != nil {
211 return fmt.Errorf("failed to ensure on-disk key: %w", err)
212 }
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200213
214 wgInterface := &Wireguard{LinkAttrs: netlink.LinkAttrs{Name: clusterNetDeviceName, Flags: net.FlagUp}}
215 if err := netlink.LinkAdd(wgInterface); err != nil {
216 return fmt.Errorf("failed to add WireGuard network interfacee: %w", err)
217 }
218 defer netlink.LinkDel(wgInterface)
219
220 listenPort := common.WireGuardPort
221 if err := wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
222 PrivateKey: &s.privKey,
223 ListenPort: &listenPort,
224 }); err != nil {
225 return fmt.Errorf("failed to set up WireGuard interface: %w", err)
226 }
227
228 if err := netlink.RouteAdd(&netlink.Route{
229 Dst: &s.ClusterNet,
230 LinkIndex: wgInterface.Index,
231 }); err != nil && !os.IsExist(err) {
232 return fmt.Errorf("failed to add cluster net route to Wireguard interface: %w", err)
233 }
234
235 if err := s.annotateThisNode(ctx); err != nil {
236 return fmt.Errorf("when annotating this node with public key: %w", err)
237 }
238
239 nodeInformer := s.InformerFactory.Core().V1().Nodes()
240 nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
241 AddFunc: func(new interface{}) {
242 newNode, ok := new.(*corev1.Node)
243 if !ok {
Serge Bazanskic7359672020-10-30 16:38:57 +0100244 logger.Errorf("Received non-node item %+v in node event handler", new)
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200245 return
246 }
247 if err := s.ensureNode(newNode); err != nil {
Serge Bazanskic7359672020-10-30 16:38:57 +0100248 logger.Warningf("Failed to sync node: %v", err)
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200249 }
250 },
251 UpdateFunc: func(old, new interface{}) {
252 newNode, ok := new.(*corev1.Node)
253 if !ok {
Serge Bazanskic7359672020-10-30 16:38:57 +0100254 logger.Errorf("Received non-node item %+v in node event handler", new)
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200255 return
256 }
257 if err := s.ensureNode(newNode); err != nil {
Serge Bazanskic7359672020-10-30 16:38:57 +0100258 logger.Warningf("Failed to sync node: %v", err)
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200259 }
260 },
261 DeleteFunc: func(old interface{}) {
262 oldNode, ok := old.(*corev1.Node)
263 if !ok {
Serge Bazanskic7359672020-10-30 16:38:57 +0100264 logger.Errorf("Received non-node item %+v in node event handler", oldNode)
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200265 return
266 }
267 if err := s.removeNode(oldNode); err != nil {
Serge Bazanskic7359672020-10-30 16:38:57 +0100268 logger.Warningf("Failed to sync node: %v", err)
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200269 }
270 },
271 })
272
273 supervisor.Signal(ctx, supervisor.SignalHealthy)
274 nodeInformer.Informer().Run(ctx.Done())
275 return ctx.Err()
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200276}