blob: 85a78a1abca68fd9f68ff0ff0ea81b93ad66a567 [file] [log] [blame]
Lorenz Brunf042e6f2020-06-24 16:46:09 +02001// Copyright 2020 The Monogon Project Authors.
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
// Package clusternet implements a WireGuard-based overlay network for
// Kubernetes. It relies on controller-manager's IPAM to assign IP ranges to
// nodes and on Kubernetes' Node objects to distribute the Node IPs and public
// keys.
//
// It sets up a single WireGuard network interface and routes the entire
// ClusterCIDR into that network interface, relying on WireGuard's AllowedIPs
// mechanism to look up the correct peer node to send the traffic to. This
// means that the routing table doesn't change and doesn't have to be
// separately managed. When clusternet is started it annotates its WireGuard
// public key onto its node object.
//
// For each node object that's created or updated on the K8s apiserver, it
// checks whether a public key annotation is set. If so, it creates (or
// updates) a peer with that public key, the node's InternalIP as endpoint, and
// the node's pod CIDRs plus its InternalIP itself as AllowedIPs. When a node
// object is deleted, the corresponding peer is removed.
Lorenz Brunf042e6f2020-06-24 16:46:09 +020032package clusternet
33
34import (
35 "context"
36 "encoding/json"
37 "errors"
38 "fmt"
Lorenz Brunf042e6f2020-06-24 16:46:09 +020039 "net"
40 "os"
41
42 "github.com/vishvananda/netlink"
Lorenz Brunf042e6f2020-06-24 16:46:09 +020043 "golang.zx2c4.com/wireguard/wgctrl"
44 "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
45 corev1 "k8s.io/api/core/v1"
46 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
47 "k8s.io/apimachinery/pkg/types"
Serge Bazanski77cb6c52020-12-19 00:09:22 +010048 "k8s.io/client-go/informers"
49 "k8s.io/client-go/kubernetes"
Lorenz Brunf042e6f2020-06-24 16:46:09 +020050 "k8s.io/client-go/tools/cache"
51
Serge Bazanski31370b02021-01-07 16:31:14 +010052 common "source.monogon.dev/metropolis/node"
53 "source.monogon.dev/metropolis/node/core/localstorage"
Serge Bazanski31370b02021-01-07 16:31:14 +010054 "source.monogon.dev/metropolis/pkg/jsonpatch"
Serge Bazanski216fe7b2021-05-21 18:36:16 +020055 "source.monogon.dev/metropolis/pkg/logtree"
Serge Bazanski31370b02021-01-07 16:31:14 +010056 "source.monogon.dev/metropolis/pkg/supervisor"
Lorenz Brunf042e6f2020-06-24 16:46:09 +020057)
58
const (
	// clusterNetDeviceName is the name of the WireGuard network interface
	// created by Run; all ClusterNet traffic is routed into it.
	clusterNetDeviceName = "clusternet"

	// publicKeyAnnotation is the Node annotation under which every node
	// publishes its WireGuard public key for its peers to pick up.
	publicKeyAnnotation = "node.metropolis.monogon.dev/wg-pubkey"
)
63
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020064type Service struct {
65 NodeName string
66 Kubernetes kubernetes.Interface
67 ClusterNet net.IPNet
68 InformerFactory informers.SharedInformerFactory
69 DataDirectory *localstorage.DataKubernetesClusterNetworkingDirectory
70
71 wgClient *wgctrl.Client
72 privKey wgtypes.Key
Serge Bazanskic7359672020-10-30 16:38:57 +010073 logger logtree.LeveledLogger
Lorenz Brunf042e6f2020-06-24 16:46:09 +020074}
75
Serge Bazanski216fe7b2021-05-21 18:36:16 +020076// ensureNode creates/updates the corresponding WireGuard peer entry for the
77// given node objet
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020078func (s *Service) ensureNode(newNode *corev1.Node) error {
79 if newNode.Name == s.NodeName {
Lorenz Brunf042e6f2020-06-24 16:46:09 +020080 // Node doesn't need to connect to itself
81 return nil
82 }
83 pubKeyRaw := newNode.Annotations[publicKeyAnnotation]
84 if pubKeyRaw == "" {
85 return nil
86 }
87 pubKey, err := wgtypes.ParseKey(pubKeyRaw)
88 if err != nil {
89 return fmt.Errorf("failed to parse public-key annotation: %w", err)
90 }
91 var internalIP net.IP
92 for _, addr := range newNode.Status.Addresses {
93 if addr.Type == corev1.NodeInternalIP {
94 if internalIP != nil {
Serge Bazanskic7359672020-10-30 16:38:57 +010095 s.logger.Warningf("More than one NodeInternalIP specified, using the first one")
Lorenz Brunf042e6f2020-06-24 16:46:09 +020096 break
97 }
98 internalIP = net.ParseIP(addr.Address)
99 if internalIP == nil {
Serge Bazanskic7359672020-10-30 16:38:57 +0100100 s.logger.Warningf("Failed to parse Internal IP %s", addr.Address)
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200101 }
102 }
103 }
104 if internalIP == nil {
105 return errors.New("node has no Internal IP")
106 }
107 var allowedIPs []net.IPNet
108 for _, podNetStr := range newNode.Spec.PodCIDRs {
109 _, podNet, err := net.ParseCIDR(podNetStr)
110 if err != nil {
Serge Bazanskic7359672020-10-30 16:38:57 +0100111 s.logger.Warningf("Node %s PodCIDR failed to parse, ignored: %v", newNode.Name, err)
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200112 continue
113 }
114 allowedIPs = append(allowedIPs, *podNet)
115 }
Lorenz Brunb682ba52020-07-08 14:51:36 +0200116 allowedIPs = append(allowedIPs, net.IPNet{IP: internalIP, Mask: net.CIDRMask(32, 32)})
Serge Bazanskic7359672020-10-30 16:38:57 +0100117 s.logger.V(1).Infof("Adding/Updating WireGuard peer node %s, endpoint %s, allowedIPs %+v", newNode.Name, internalIP.String(), allowedIPs)
Serge Bazanski216fe7b2021-05-21 18:36:16 +0200118 // WireGuard's kernel side has create/update semantics on peers by default.
119 // So we can just add the peer multiple times to update it.
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200120 err = s.wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200121 Peers: []wgtypes.PeerConfig{{
122 PublicKey: pubKey,
123 Endpoint: &net.UDPAddr{Port: common.WireGuardPort, IP: internalIP},
124 ReplaceAllowedIPs: true,
125 AllowedIPs: allowedIPs,
126 }},
127 })
128 if err != nil {
129 return fmt.Errorf("failed to add WireGuard peer node: %w", err)
130 }
131 return nil
132}
133
Serge Bazanski216fe7b2021-05-21 18:36:16 +0200134// removeNode removes the corresponding WireGuard peer entry for the given node
135// object
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200136func (s *Service) removeNode(oldNode *corev1.Node) error {
137 if oldNode.Name == s.NodeName {
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200138 // Node doesn't need to connect to itself
139 return nil
140 }
141 pubKeyRaw := oldNode.Annotations[publicKeyAnnotation]
142 if pubKeyRaw == "" {
143 return nil
144 }
145 pubKey, err := wgtypes.ParseKey(pubKeyRaw)
146 if err != nil {
147 return fmt.Errorf("node public-key annotation not decodable: %w", err)
148 }
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200149 err = s.wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200150 Peers: []wgtypes.PeerConfig{{
151 PublicKey: pubKey,
152 Remove: true,
153 }},
154 })
155 if err != nil {
156 return fmt.Errorf("failed to remove WireGuard peer node: %w", err)
157 }
158 return nil
159}
160
Serge Bazanski216fe7b2021-05-21 18:36:16 +0200161// ensureOnDiskKey loads the private key from disk or (if none exists)
162// generates one and persists it.
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200163func (s *Service) ensureOnDiskKey() error {
164 keyRaw, err := s.DataDirectory.Key.Read()
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200165 if os.IsNotExist(err) {
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200166 key, err := wgtypes.GeneratePrivateKey()
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200167 if err != nil {
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200168 return fmt.Errorf("failed to generate private key: %w", err)
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200169 }
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200170 if err := s.DataDirectory.Key.Write([]byte(key.String()), 0600); err != nil {
171 return fmt.Errorf("failed to store newly generated key: %w", err)
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200172 }
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200173
174 s.privKey = key
175 return nil
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200176 } else if err != nil {
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200177 return fmt.Errorf("failed to load on-disk key: %w", err)
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200178 }
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200179
180 key, err := wgtypes.ParseKey(string(keyRaw))
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200181 if err != nil {
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200182 return fmt.Errorf("invalid private key in file: %w", err)
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200183 }
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200184 s.privKey = key
185 return nil
186}
187
Serge Bazanski216fe7b2021-05-21 18:36:16 +0200188// annotateThisNode annotates the node (as defined by NodeName) with the
189// wireguard public key of this node.
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200190func (s *Service) annotateThisNode(ctx context.Context) error {
191 patch := []jsonpatch.JsonPatchOp{{
192 Operation: "add",
193 Path: "/metadata/annotations/" + jsonpatch.EncodeJSONRefToken(publicKeyAnnotation),
194 Value: s.privKey.PublicKey().String(),
195 }}
196
197 patchRaw, err := json.Marshal(patch)
198 if err != nil {
199 return fmt.Errorf("failed to encode JSONPatch: %w", err)
200 }
201
202 if _, err := s.Kubernetes.CoreV1().Nodes().Patch(ctx, s.NodeName, types.JSONPatchType, patchRaw, metav1.PatchOptions{}); err != nil {
203 return fmt.Errorf("failed to patch resource: %w", err)
204 }
205
206 return nil
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200207}
208
209// Run runs the ClusterNet service. See package description for what it does.
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200210func (s *Service) Run(ctx context.Context) error {
211 logger := supervisor.Logger(ctx)
Lorenz Brunca24cfa2020-08-18 13:49:37 +0200212 s.logger = logger
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200213
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200214 wgClient, err := wgctrl.New()
215 if err != nil {
216 return fmt.Errorf("failed to connect to netlink's WireGuard config endpoint: %w", err)
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200217 }
Lorenz Brunca24cfa2020-08-18 13:49:37 +0200218 s.wgClient = wgClient
219
220 if err := s.ensureOnDiskKey(); err != nil {
221 return fmt.Errorf("failed to ensure on-disk key: %w", err)
222 }
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200223
224 wgInterface := &Wireguard{LinkAttrs: netlink.LinkAttrs{Name: clusterNetDeviceName, Flags: net.FlagUp}}
225 if err := netlink.LinkAdd(wgInterface); err != nil {
226 return fmt.Errorf("failed to add WireGuard network interfacee: %w", err)
227 }
228 defer netlink.LinkDel(wgInterface)
229
230 listenPort := common.WireGuardPort
231 if err := wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
232 PrivateKey: &s.privKey,
233 ListenPort: &listenPort,
234 }); err != nil {
235 return fmt.Errorf("failed to set up WireGuard interface: %w", err)
236 }
237
238 if err := netlink.RouteAdd(&netlink.Route{
239 Dst: &s.ClusterNet,
240 LinkIndex: wgInterface.Index,
241 }); err != nil && !os.IsExist(err) {
242 return fmt.Errorf("failed to add cluster net route to Wireguard interface: %w", err)
243 }
244
245 if err := s.annotateThisNode(ctx); err != nil {
246 return fmt.Errorf("when annotating this node with public key: %w", err)
247 }
248
249 nodeInformer := s.InformerFactory.Core().V1().Nodes()
250 nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
251 AddFunc: func(new interface{}) {
252 newNode, ok := new.(*corev1.Node)
253 if !ok {
Serge Bazanskic7359672020-10-30 16:38:57 +0100254 logger.Errorf("Received non-node item %+v in node event handler", new)
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200255 return
256 }
257 if err := s.ensureNode(newNode); err != nil {
Serge Bazanskic7359672020-10-30 16:38:57 +0100258 logger.Warningf("Failed to sync node: %v", err)
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200259 }
260 },
261 UpdateFunc: func(old, new interface{}) {
262 newNode, ok := new.(*corev1.Node)
263 if !ok {
Serge Bazanskic7359672020-10-30 16:38:57 +0100264 logger.Errorf("Received non-node item %+v in node event handler", new)
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200265 return
266 }
267 if err := s.ensureNode(newNode); err != nil {
Serge Bazanskic7359672020-10-30 16:38:57 +0100268 logger.Warningf("Failed to sync node: %v", err)
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200269 }
270 },
271 DeleteFunc: func(old interface{}) {
272 oldNode, ok := old.(*corev1.Node)
273 if !ok {
Serge Bazanskic7359672020-10-30 16:38:57 +0100274 logger.Errorf("Received non-node item %+v in node event handler", oldNode)
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200275 return
276 }
277 if err := s.removeNode(oldNode); err != nil {
Serge Bazanskic7359672020-10-30 16:38:57 +0100278 logger.Warningf("Failed to sync node: %v", err)
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200279 }
280 },
281 })
282
283 supervisor.Signal(ctx, supervisor.SignalHealthy)
284 nodeInformer.Informer().Run(ctx.Done())
285 return ctx.Err()
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200286}