|  | // Copyright 2020 The Monogon Project Authors. | 
|  | // | 
|  | // SPDX-License-Identifier: Apache-2.0 | 
|  | // | 
|  | // Licensed under the Apache License, Version 2.0 (the "License"); | 
|  | // you may not use this file except in compliance with the License. | 
|  | // You may obtain a copy of the License at | 
|  | // | 
|  | //     http://www.apache.org/licenses/LICENSE-2.0 | 
|  | // | 
|  | // Unless required by applicable law or agreed to in writing, software | 
|  | // distributed under the License is distributed on an "AS IS" BASIS, | 
|  | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | // See the License for the specific language governing permissions and | 
|  | // limitations under the License. | 
|  |  | 
|  | // Package clusternet implements a WireGuard-based overlay network for | 
|  | // Kubernetes. It relies on controller-manager's IPAM to assign IP ranges to | 
|  | // nodes and on Kubernetes' Node objects to distribute the Node IPs and public | 
|  | // keys. | 
|  | // | 
|  | // It sets up a single WireGuard network interface and routes the entire | 
|  | // ClusterCIDR into that network interface, relying on WireGuard's AllowedIPs | 
|  | // mechanism to look up the correct peer node to send the traffic to. This | 
|  | // means that the routing table doesn't change and doesn't have to be | 
|  | // separately managed. When clusternet is started it annotates its WireGuard | 
|  | // public key onto its node object. | 
|  | // For each node object that's created or updated on the K8s apiserver it | 
|  | // checks if a public key annotation is set and if yes a peer with that public | 
|  | // key, its InternalIP as endpoint and the CIDR for that node as AllowedIPs is | 
|  | // created. | 
|  | package clusternet | 
|  |  | 
|  | import ( | 
|  | "context" | 
|  | "net/netip" | 
|  | "time" | 
|  |  | 
|  | corev1 "k8s.io/api/core/v1" | 
|  | "k8s.io/apimachinery/pkg/fields" | 
|  | "k8s.io/client-go/kubernetes" | 
|  | "k8s.io/client-go/tools/cache" | 
|  |  | 
|  | oclusternet "source.monogon.dev/metropolis/node/core/clusternet" | 
|  | "source.monogon.dev/metropolis/pkg/event" | 
|  | "source.monogon.dev/metropolis/pkg/logtree" | 
|  | "source.monogon.dev/metropolis/pkg/supervisor" | 
|  | ) | 
|  |  | 
|  | type Service struct { | 
|  | NodeName   string | 
|  | Kubernetes kubernetes.Interface | 
|  | Prefixes   event.Value[*oclusternet.Prefixes] | 
|  |  | 
|  | logger logtree.LeveledLogger | 
|  | } | 
|  |  | 
|  | // ensureNode is called any time the node that this Service is running on gets | 
|  | // updated. It uses this data to update this node's prefixes in the Curator. | 
|  | func (s *Service) ensureNode(newNode *corev1.Node) error { | 
|  | if newNode.Name != s.NodeName { | 
|  | // We only care about our own node | 
|  | return nil | 
|  | } | 
|  |  | 
|  | var prefixes oclusternet.Prefixes | 
|  | for _, podNetStr := range newNode.Spec.PodCIDRs { | 
|  | prefix, err := netip.ParsePrefix(podNetStr) | 
|  | if err != nil { | 
|  | s.logger.Warningf("Node %s PodCIDR failed to parse, ignored: %v", newNode.Name, err) | 
|  | continue | 
|  | } | 
|  | prefixes = append(prefixes, prefix) | 
|  | } | 
|  |  | 
|  | s.logger.V(1).Infof("Updating locally originated prefixes: %+v", prefixes) | 
|  | s.Prefixes.Set(&prefixes) | 
|  | return nil | 
|  | } | 
|  |  | 
|  | // Run runs the ClusterNet service. See package description for what it does. | 
|  | func (s *Service) Run(ctx context.Context) error { | 
|  | logger := supervisor.Logger(ctx) | 
|  | s.logger = logger | 
|  |  | 
|  | // Make a 'shared' informer. It's shared by name, but we don't actually share it | 
|  | // - instead we have to use it as the standard Informer API does not support | 
|  | // error handling. And we want to use a dedicated informer because we want to | 
|  | // only watch our own node. | 
|  | lw := cache.NewListWatchFromClient( | 
|  | s.Kubernetes.CoreV1().RESTClient(), | 
|  | "nodes", "", | 
|  | fields.OneTermEqualSelector("metadata.name", s.NodeName), | 
|  | ) | 
|  | ni := cache.NewSharedInformer(lw, &corev1.Node{}, time.Second*5) | 
|  | ni.AddEventHandler(cache.ResourceEventHandlerFuncs{ | 
|  | AddFunc: func(new interface{}) { | 
|  | newNode, ok := new.(*corev1.Node) | 
|  | if !ok { | 
|  | logger.Errorf("Received non-node item %+v in node event handler", new) | 
|  | return | 
|  | } | 
|  | if err := s.ensureNode(newNode); err != nil { | 
|  | logger.Warningf("Failed to sync node: %v", err) | 
|  | } | 
|  | }, | 
|  | UpdateFunc: func(old, new interface{}) { | 
|  | newNode, ok := new.(*corev1.Node) | 
|  | if !ok { | 
|  | logger.Errorf("Received non-node item %+v in node event handler", new) | 
|  | return | 
|  | } | 
|  | if err := s.ensureNode(newNode); err != nil { | 
|  | logger.Warningf("Failed to sync node: %v", err) | 
|  | } | 
|  | }, | 
|  | }) | 
|  | ni.SetWatchErrorHandler(func(_ *cache.Reflector, err error) { | 
|  | supervisor.Logger(ctx).Errorf("node informer watch error: %v", err) | 
|  | }) | 
|  |  | 
|  | supervisor.Signal(ctx, supervisor.SignalHealthy) | 
|  | ni.Run(ctx.Done()) | 
|  | return ctx.Err() | 
|  | } |