blob: 5c42bb89f846f2bb6fc27ea5f9a3007db646aed8 [file] [log] [blame]
Lorenz Brunf042e6f2020-06-24 16:46:09 +02001// Copyright 2020 The Monogon Project Authors.
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17// Package clusternet implements a WireGuard-based overlay network for Kubernetes. It relies on controller-manager's
18// IPAM to assign IP ranges to nodes and on Kubernetes' Node objects to distribute the Node IPs and public keys.
19//
20// It sets up a single WireGuard network interface and routes the entire ClusterCIDR into that network interface,
21// relying on WireGuard's AllowedIPs mechanism to look up the correct peer node to send the traffic to. This means
22// that the routing table doesn't change and doesn't have to be separately managed. When clusternet is started
23// it annotates its WireGuard public key onto its node object.
// For each node object that's created or updated on the K8s apiserver it checks whether a public key annotation is
// set and, if so, creates or updates a peer with that public key, the node's InternalIP as endpoint and the node's
// PodCIDRs as AllowedIPs. When a node object is deleted, the corresponding peer is removed again.
26package clusternet
27
28import (
29 "context"
30 "encoding/json"
31 "errors"
32 "fmt"
Lorenz Brunf042e6f2020-06-24 16:46:09 +020033 "net"
34 "os"
35
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020036 "k8s.io/client-go/informers"
37
38 "k8s.io/client-go/kubernetes"
39
Lorenz Brunf042e6f2020-06-24 16:46:09 +020040 "github.com/vishvananda/netlink"
41 "go.uber.org/zap"
42 "golang.zx2c4.com/wireguard/wgctrl"
43 "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
44 corev1 "k8s.io/api/core/v1"
45 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
46 "k8s.io/apimachinery/pkg/types"
Lorenz Brunf042e6f2020-06-24 16:46:09 +020047 "k8s.io/client-go/tools/cache"
48
49 "git.monogon.dev/source/nexantic.git/core/internal/common"
50 "git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020051 "git.monogon.dev/source/nexantic.git/core/internal/localstorage"
Lorenz Brunf042e6f2020-06-24 16:46:09 +020052 "git.monogon.dev/source/nexantic.git/core/pkg/jsonpatch"
53)
54
// clusterNetDeviceName is the name of the WireGuard network interface managed by this package.
const clusterNetDeviceName = "clusternet"

// publicKeyAnnotation is the Node annotation key under which every node publishes its WireGuard public key.
const publicKeyAnnotation = "node.smalltown.nexantic.com/wg-pubkey"
59
// Service is the clusternet runnable, see the package description. Its exported fields are configuration read by
// Run and the methods below; the unexported fields hold runtime state.
type Service struct {
	// NodeName is the name of this node's Node object. That object is annotated with this node's public key, and it
	// is skipped when configuring WireGuard peers.
	NodeName string

	// Kubernetes is the client used to patch this node's Node object.
	Kubernetes kubernetes.Interface

	// ClusterNet is the network that is routed into the WireGuard interface.
	ClusterNet net.IPNet

	// InformerFactory provides the Node informer whose events drive the peer configuration.
	InformerFactory informers.SharedInformerFactory

	// DataDirectory is where this node's WireGuard private key is persisted.
	DataDirectory *localstorage.DataKubernetesClusterNetworkingDirectory

	// wgClient configures peers on the WireGuard interface (used by ensureNode and removeNode).
	wgClient *wgctrl.Client

	// privKey is this node's WireGuard private key, as loaded or generated by ensureOnDiskKey.
	privKey wgtypes.Key

	// logger is used by ensureNode for warnings and debug output.
	logger *zap.Logger
}
71
72// ensureNode creates/updates the corresponding WireGuard peer entry for the given node objet
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020073func (s *Service) ensureNode(newNode *corev1.Node) error {
74 if newNode.Name == s.NodeName {
Lorenz Brunf042e6f2020-06-24 16:46:09 +020075 // Node doesn't need to connect to itself
76 return nil
77 }
78 pubKeyRaw := newNode.Annotations[publicKeyAnnotation]
79 if pubKeyRaw == "" {
80 return nil
81 }
82 pubKey, err := wgtypes.ParseKey(pubKeyRaw)
83 if err != nil {
84 return fmt.Errorf("failed to parse public-key annotation: %w", err)
85 }
86 var internalIP net.IP
87 for _, addr := range newNode.Status.Addresses {
88 if addr.Type == corev1.NodeInternalIP {
89 if internalIP != nil {
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020090 s.logger.Warn("More than one NodeInternalIP specified, using the first one")
Lorenz Brunf042e6f2020-06-24 16:46:09 +020091 break
92 }
93 internalIP = net.ParseIP(addr.Address)
94 if internalIP == nil {
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020095 s.logger.Warn("failed to parse Internal IP")
Lorenz Brunf042e6f2020-06-24 16:46:09 +020096 }
97 }
98 }
99 if internalIP == nil {
100 return errors.New("node has no Internal IP")
101 }
102 var allowedIPs []net.IPNet
103 for _, podNetStr := range newNode.Spec.PodCIDRs {
104 _, podNet, err := net.ParseCIDR(podNetStr)
105 if err != nil {
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200106 s.logger.Warn("Node PodCIDR failed to parse, ignored", zap.Error(err), zap.String("node", newNode.Name))
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200107 continue
108 }
109 allowedIPs = append(allowedIPs, *podNet)
110 }
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200111 s.logger.Debug("Adding/Updating WireGuard peer node", zap.String("node", newNode.Name),
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200112 zap.String("endpointIP", internalIP.String()), zap.Any("allowedIPs", allowedIPs))
113 // WireGuard's kernel side has create/update semantics on peers by default. So we can just add the peer multiple
114 // times to update it.
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200115 err = s.wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200116 Peers: []wgtypes.PeerConfig{{
117 PublicKey: pubKey,
118 Endpoint: &net.UDPAddr{Port: common.WireGuardPort, IP: internalIP},
119 ReplaceAllowedIPs: true,
120 AllowedIPs: allowedIPs,
121 }},
122 })
123 if err != nil {
124 return fmt.Errorf("failed to add WireGuard peer node: %w", err)
125 }
126 return nil
127}
128
129// removeNode removes the corresponding WireGuard peer entry for the given node object
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200130func (s *Service) removeNode(oldNode *corev1.Node) error {
131 if oldNode.Name == s.NodeName {
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200132 // Node doesn't need to connect to itself
133 return nil
134 }
135 pubKeyRaw := oldNode.Annotations[publicKeyAnnotation]
136 if pubKeyRaw == "" {
137 return nil
138 }
139 pubKey, err := wgtypes.ParseKey(pubKeyRaw)
140 if err != nil {
141 return fmt.Errorf("node public-key annotation not decodable: %w", err)
142 }
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200143 err = s.wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200144 Peers: []wgtypes.PeerConfig{{
145 PublicKey: pubKey,
146 Remove: true,
147 }},
148 })
149 if err != nil {
150 return fmt.Errorf("failed to remove WireGuard peer node: %w", err)
151 }
152 return nil
153}
154
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200155// ensureOnDiskKey loads the private key from disk or (if none exists) generates one and persists it.
156func (s *Service) ensureOnDiskKey() error {
157 keyRaw, err := s.DataDirectory.Key.Read()
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200158 if os.IsNotExist(err) {
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200159 key, err := wgtypes.GeneratePrivateKey()
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200160 if err != nil {
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200161 return fmt.Errorf("failed to generate private key: %w", err)
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200162 }
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200163 if err := s.DataDirectory.Key.Write([]byte(key.String()), 0600); err != nil {
164 return fmt.Errorf("failed to store newly generated key: %w", err)
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200165 }
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200166
167 s.privKey = key
168 return nil
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200169 } else if err != nil {
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200170 return fmt.Errorf("failed to load on-disk key: %w", err)
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200171 }
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200172
173 key, err := wgtypes.ParseKey(string(keyRaw))
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200174 if err != nil {
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200175 return fmt.Errorf("invalid private key in file: %w", err)
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200176 }
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200177 s.privKey = key
178 return nil
179}
180
181// annotateThisNode annotates the node (as defined by NodeName) with the wireguard public key of this node.
182func (s *Service) annotateThisNode(ctx context.Context) error {
183 patch := []jsonpatch.JsonPatchOp{{
184 Operation: "add",
185 Path: "/metadata/annotations/" + jsonpatch.EncodeJSONRefToken(publicKeyAnnotation),
186 Value: s.privKey.PublicKey().String(),
187 }}
188
189 patchRaw, err := json.Marshal(patch)
190 if err != nil {
191 return fmt.Errorf("failed to encode JSONPatch: %w", err)
192 }
193
194 if _, err := s.Kubernetes.CoreV1().Nodes().Patch(ctx, s.NodeName, types.JSONPatchType, patchRaw, metav1.PatchOptions{}); err != nil {
195 return fmt.Errorf("failed to patch resource: %w", err)
196 }
197
198 return nil
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200199}
200
201// Run runs the ClusterNet service. See package description for what it does.
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200202func (s *Service) Run(ctx context.Context) error {
203 logger := supervisor.Logger(ctx)
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200204
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200205 wgClient, err := wgctrl.New()
206 if err != nil {
207 return fmt.Errorf("failed to connect to netlink's WireGuard config endpoint: %w", err)
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200208 }
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200209
210 wgInterface := &Wireguard{LinkAttrs: netlink.LinkAttrs{Name: clusterNetDeviceName, Flags: net.FlagUp}}
211 if err := netlink.LinkAdd(wgInterface); err != nil {
212 return fmt.Errorf("failed to add WireGuard network interfacee: %w", err)
213 }
214 defer netlink.LinkDel(wgInterface)
215
216 listenPort := common.WireGuardPort
217 if err := wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
218 PrivateKey: &s.privKey,
219 ListenPort: &listenPort,
220 }); err != nil {
221 return fmt.Errorf("failed to set up WireGuard interface: %w", err)
222 }
223
224 if err := netlink.RouteAdd(&netlink.Route{
225 Dst: &s.ClusterNet,
226 LinkIndex: wgInterface.Index,
227 }); err != nil && !os.IsExist(err) {
228 return fmt.Errorf("failed to add cluster net route to Wireguard interface: %w", err)
229 }
230
231 if err := s.annotateThisNode(ctx); err != nil {
232 return fmt.Errorf("when annotating this node with public key: %w", err)
233 }
234
235 nodeInformer := s.InformerFactory.Core().V1().Nodes()
236 nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
237 AddFunc: func(new interface{}) {
238 newNode, ok := new.(*corev1.Node)
239 if !ok {
240 logger.Error("Received non-node item in node event handler", zap.Reflect("item", new))
241 return
242 }
243 if err := s.ensureNode(newNode); err != nil {
244 logger.Warn("Failed to sync node", zap.Error(err))
245 }
246 },
247 UpdateFunc: func(old, new interface{}) {
248 newNode, ok := new.(*corev1.Node)
249 if !ok {
250 logger.Error("Received non-node item in node event handler", zap.Reflect("item", new))
251 return
252 }
253 if err := s.ensureNode(newNode); err != nil {
254 logger.Warn("Failed to sync node", zap.Error(err))
255 }
256 },
257 DeleteFunc: func(old interface{}) {
258 oldNode, ok := old.(*corev1.Node)
259 if !ok {
260 logger.Error("Received non-node item in node event handler", zap.Reflect("item", oldNode))
261 return
262 }
263 if err := s.removeNode(oldNode); err != nil {
264 logger.Warn("Failed to sync node", zap.Error(err))
265 }
266 },
267 })
268
269 supervisor.Signal(ctx, supervisor.SignalHealthy)
270 nodeInformer.Informer().Run(ctx.Done())
271 return ctx.Err()
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200272}