blob: 7b51c304fb064fe8826ec868820223acf7594c8d [file] [log] [blame]
Lorenz Brunf042e6f2020-06-24 16:46:09 +02001// Copyright 2020 The Monogon Project Authors.
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
Serge Bazanski216fe7b2021-05-21 18:36:16 +020017// Package clusternet implements a WireGuard-based overlay network for
18// Kubernetes. It relies on controller-manager's IPAM to assign IP ranges to
19// nodes and on Kubernetes' Node objects to distribute the Node IPs and public
20// keys.
Lorenz Brunf042e6f2020-06-24 16:46:09 +020021//
Serge Bazanski216fe7b2021-05-21 18:36:16 +020022// It sets up a single WireGuard network interface and routes the entire
23// ClusterCIDR into that network interface, relying on WireGuard's AllowedIPs
24// mechanism to look up the correct peer node to send the traffic to. This
25// means that the routing table doesn't change and doesn't have to be
26// separately managed. When clusternet is started it annotates its WireGuard
27// public key onto its node object.
28// For each node object that's created or updated on the K8s apiserver it
29// checks if a public key annotation is set and if yes a peer with that public
30// key, its InternalIP as endpoint and the CIDR for that node as AllowedIPs is
31// created.
Lorenz Brunf042e6f2020-06-24 16:46:09 +020032package clusternet
33
34import (
35 "context"
Serge Bazanski79208522023-03-28 20:14:58 +020036 "net/netip"
37 "time"
Lorenz Brunf042e6f2020-06-24 16:46:09 +020038
Lorenz Brunf042e6f2020-06-24 16:46:09 +020039 corev1 "k8s.io/api/core/v1"
Serge Bazanski79208522023-03-28 20:14:58 +020040 "k8s.io/apimachinery/pkg/fields"
Serge Bazanski77cb6c52020-12-19 00:09:22 +010041 "k8s.io/client-go/kubernetes"
Lorenz Brunf042e6f2020-06-24 16:46:09 +020042 "k8s.io/client-go/tools/cache"
43
Serge Bazanski79208522023-03-28 20:14:58 +020044 oclusternet "source.monogon.dev/metropolis/node/core/clusternet"
45 "source.monogon.dev/metropolis/pkg/event"
Serge Bazanski216fe7b2021-05-21 18:36:16 +020046 "source.monogon.dev/metropolis/pkg/logtree"
Serge Bazanski31370b02021-01-07 16:31:14 +010047 "source.monogon.dev/metropolis/pkg/supervisor"
Lorenz Brunf042e6f2020-06-24 16:46:09 +020048)
49
// Service watches this node's Kubernetes Node object and publishes its pod
// network prefixes. See the package description for the full picture.
type Service struct {
	// NodeName is the Kubernetes name of the node this Service runs on. Events
	// for any other node are ignored (see ensureNode).
	NodeName string
	// Kubernetes is the client used to watch this node's Node object.
	Kubernetes kubernetes.Interface
	// Prefixes is the event value into which the locally originated pod
	// network prefixes are published.
	Prefixes event.Value[*oclusternet.Prefixes]

	// logger is populated from the supervisor context when Run starts.
	logger logtree.LeveledLogger
}
57
Serge Bazanski79208522023-03-28 20:14:58 +020058// ensureNode is called any time the node that this Service is running on gets
59// updated. It uses this data to update this node's prefixes in the Curator.
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020060func (s *Service) ensureNode(newNode *corev1.Node) error {
Serge Bazanski79208522023-03-28 20:14:58 +020061 if newNode.Name != s.NodeName {
62 // We only care about our own node
Lorenz Brunf042e6f2020-06-24 16:46:09 +020063 return nil
64 }
Serge Bazanski79208522023-03-28 20:14:58 +020065
Serge Bazanski79208522023-03-28 20:14:58 +020066 var prefixes oclusternet.Prefixes
Lorenz Brunf042e6f2020-06-24 16:46:09 +020067 for _, podNetStr := range newNode.Spec.PodCIDRs {
Serge Bazanski79208522023-03-28 20:14:58 +020068 prefix, err := netip.ParsePrefix(podNetStr)
Lorenz Brunf042e6f2020-06-24 16:46:09 +020069 if err != nil {
Serge Bazanskic7359672020-10-30 16:38:57 +010070 s.logger.Warningf("Node %s PodCIDR failed to parse, ignored: %v", newNode.Name, err)
Lorenz Brunf042e6f2020-06-24 16:46:09 +020071 continue
72 }
Serge Bazanski79208522023-03-28 20:14:58 +020073 prefixes = append(prefixes, prefix)
Lorenz Brunf042e6f2020-06-24 16:46:09 +020074 }
Lorenz Brunf042e6f2020-06-24 16:46:09 +020075
Serge Bazanski79208522023-03-28 20:14:58 +020076 s.logger.V(1).Infof("Updating locally originated prefixes: %+v", prefixes)
77 s.Prefixes.Set(&prefixes)
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020078 return nil
Lorenz Brunf042e6f2020-06-24 16:46:09 +020079}
80
81// Run runs the ClusterNet service. See package description for what it does.
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020082func (s *Service) Run(ctx context.Context) error {
83 logger := supervisor.Logger(ctx)
Lorenz Brunca24cfa2020-08-18 13:49:37 +020084 s.logger = logger
Lorenz Brunf042e6f2020-06-24 16:46:09 +020085
Serge Bazanski79208522023-03-28 20:14:58 +020086 // Make a 'shared' informer. It's shared by name, but we don't actually share it
87 // - instead we have to use it as the standard Informer API does not support
88 // error handling. And we want to use a dedicated informer because we want to
89 // only watch our own node.
90 lw := cache.NewListWatchFromClient(
91 s.Kubernetes.CoreV1().RESTClient(),
92 "nodes", "",
93 fields.OneTermEqualSelector("metadata.name", s.NodeName),
94 )
95 ni := cache.NewSharedInformer(lw, &corev1.Node{}, time.Second*5)
96 ni.AddEventHandler(cache.ResourceEventHandlerFuncs{
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020097 AddFunc: func(new interface{}) {
98 newNode, ok := new.(*corev1.Node)
99 if !ok {
Serge Bazanskic7359672020-10-30 16:38:57 +0100100 logger.Errorf("Received non-node item %+v in node event handler", new)
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200101 return
102 }
103 if err := s.ensureNode(newNode); err != nil {
Serge Bazanskic7359672020-10-30 16:38:57 +0100104 logger.Warningf("Failed to sync node: %v", err)
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200105 }
106 },
107 UpdateFunc: func(old, new interface{}) {
108 newNode, ok := new.(*corev1.Node)
109 if !ok {
Serge Bazanskic7359672020-10-30 16:38:57 +0100110 logger.Errorf("Received non-node item %+v in node event handler", new)
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200111 return
112 }
113 if err := s.ensureNode(newNode); err != nil {
Serge Bazanskic7359672020-10-30 16:38:57 +0100114 logger.Warningf("Failed to sync node: %v", err)
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200115 }
116 },
Serge Bazanski79208522023-03-28 20:14:58 +0200117 })
118 ni.SetWatchErrorHandler(func(_ *cache.Reflector, err error) {
119 supervisor.Logger(ctx).Errorf("node informer watch error: %v", err)
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200120 })
121
122 supervisor.Signal(ctx, supervisor.SignalHealthy)
Serge Bazanski79208522023-03-28 20:14:58 +0200123 ni.Run(ctx.Done())
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200124 return ctx.Err()
Lorenz Brunf042e6f2020-06-24 16:46:09 +0200125}