m/n/kubernetes: use node clusternet to submit cluster networking routes

This completes the work on using the new node cluster networking service
from Kubernetes: instead of programming WireGuard peers itself, the
Kubernetes clusternet service now only submits this node's prefixes,
thereby allowing non-worker nodes to participate in cluster networking.

Change-Id: I7f3759186d7c8cc49833be29963f82a1714d293e
Reviewed-on: https://review.monogon.dev/c/monogon/+/1418
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/node/kubernetes/clusternet/clusternet.go b/metropolis/node/kubernetes/clusternet/clusternet.go
index 3c86361..f4974eb 100644
--- a/metropolis/node/kubernetes/clusternet/clusternet.go
+++ b/metropolis/node/kubernetes/clusternet/clusternet.go
@@ -33,176 +33,69 @@
 
 import (
 	"context"
-	"encoding/json"
 	"errors"
-	"fmt"
-	"net"
-	"os"
+	"net/netip"
+	"time"
 
-	"github.com/vishvananda/netlink"
-	"golang.zx2c4.com/wireguard/wgctrl"
-	"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
 	corev1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/types"
-	"k8s.io/client-go/informers"
+	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/cache"
 
-	common "source.monogon.dev/metropolis/node"
-	"source.monogon.dev/metropolis/node/core/localstorage"
-	"source.monogon.dev/metropolis/pkg/jsonpatch"
+	oclusternet "source.monogon.dev/metropolis/node/core/clusternet"
+	"source.monogon.dev/metropolis/pkg/event"
 	"source.monogon.dev/metropolis/pkg/logtree"
 	"source.monogon.dev/metropolis/pkg/supervisor"
 )
 
-const (
-	clusterNetDeviceName = "clusternet"
-	publicKeyAnnotation  = "node.metropolis.monogon.dev/wg-pubkey"
-)
-
 type Service struct {
-	NodeName        string
-	Kubernetes      kubernetes.Interface
-	ClusterNet      net.IPNet
-	InformerFactory informers.SharedInformerFactory
-	DataDirectory   *localstorage.DataKubernetesClusterNetworkingDirectory
+	NodeName   string
+	Kubernetes kubernetes.Interface
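+	// Prefixes is the event.Value through which this service submits the
+	// prefixes originated by this node; the node's core clusternet service is
+	// expected to watch it and turn them into cluster networking routes.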
+	Prefixes   event.Value[*oclusternet.Prefixes]
 
-	wgClient *wgctrl.Client
-	privKey  wgtypes.Key
-	logger   logtree.LeveledLogger
+	logger logtree.LeveledLogger
 }
 
-// ensureNode creates/updates the corresponding WireGuard peer entry for the
-// given node object
+// ensureNode is called any time the node that this Service is running on gets
+// updated. It uses this data to update this node's prefixes in the Curator.
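+//
+// The submitted prefixes are the node's PodCIDRs plus a /32 host route for
+// its InternalIP - for example, [10.0.0.0/24 192.168.1.5/32].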
 func (s *Service) ensureNode(newNode *corev1.Node) error {
-	if newNode.Name == s.NodeName {
-		// Node doesn't need to connect to itself
+	if newNode.Name != s.NodeName {
+		// We only care about our own node
 		return nil
 	}
-	pubKeyRaw := newNode.Annotations[publicKeyAnnotation]
-	if pubKeyRaw == "" {
-		return nil
-	}
-	pubKey, err := wgtypes.ParseKey(pubKeyRaw)
-	if err != nil {
-		return fmt.Errorf("failed to parse public-key annotation: %w", err)
-	}
-	var internalIP net.IP
+
+	var internalIP netip.Addr
 	for _, addr := range newNode.Status.Addresses {
 		if addr.Type == corev1.NodeInternalIP {
-			if internalIP != nil {
+			if internalIP.IsValid() {
 				s.logger.Warningf("More than one NodeInternalIP specified, using the first one")
 				break
 			}
-			internalIP = net.ParseIP(addr.Address)
-			if internalIP == nil {
+			ip, err := netip.ParseAddr(addr.Address)
+			if err != nil {
 				s.logger.Warningf("Failed to parse Internal IP %s", addr.Address)
+				continue
 			}
+			internalIP = ip
 		}
 	}
-	if internalIP == nil {
+	if !internalIP.IsValid() {
 		return errors.New("node has no Internal IP")
 	}
-	var allowedIPs []net.IPNet
+
+	var prefixes oclusternet.Prefixes
 	for _, podNetStr := range newNode.Spec.PodCIDRs {
-		_, podNet, err := net.ParseCIDR(podNetStr)
+		prefix, err := netip.ParsePrefix(podNetStr)
 		if err != nil {
 			s.logger.Warningf("Node %s PodCIDR failed to parse, ignored: %v", newNode.Name, err)
 			continue
 		}
-		allowedIPs = append(allowedIPs, *podNet)
+		prefixes = append(prefixes, prefix)
 	}
-	allowedIPs = append(allowedIPs, net.IPNet{IP: internalIP, Mask: net.CIDRMask(32, 32)})
-	s.logger.V(1).Infof("Adding/Updating WireGuard peer node %s, endpoint %s, allowedIPs %+v", newNode.Name, internalIP.String(), allowedIPs)
-	// WireGuard's kernel side has create/update semantics on peers by default.
-	// So we can just add the peer multiple times to update it.
-	err = s.wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
-		Peers: []wgtypes.PeerConfig{{
-			PublicKey:         pubKey,
-			Endpoint:          &net.UDPAddr{Port: int(common.WireGuardPort), IP: internalIP},
-			ReplaceAllowedIPs: true,
-			AllowedIPs:        allowedIPs,
-		}},
-	})
-	if err != nil {
-		return fmt.Errorf("failed to add WireGuard peer node: %w", err)
-	}
-	return nil
-}
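+	// Also submit a host route for the node's internal IP. As with the
+	// previous WireGuard peer configuration, this assumes an IPv4 address.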
+	prefixes = append(prefixes, netip.PrefixFrom(internalIP, 32))
 
-// removeNode removes the corresponding WireGuard peer entry for the given node
-// object
-func (s *Service) removeNode(oldNode *corev1.Node) error {
-	if oldNode.Name == s.NodeName {
-		// Node doesn't need to connect to itself
-		return nil
-	}
-	pubKeyRaw := oldNode.Annotations[publicKeyAnnotation]
-	if pubKeyRaw == "" {
-		return nil
-	}
-	pubKey, err := wgtypes.ParseKey(pubKeyRaw)
-	if err != nil {
-		return fmt.Errorf("node public-key annotation not decodable: %w", err)
-	}
-	err = s.wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
-		Peers: []wgtypes.PeerConfig{{
-			PublicKey: pubKey,
-			Remove:    true,
-		}},
-	})
-	if err != nil {
-		return fmt.Errorf("failed to remove WireGuard peer node: %w", err)
-	}
-	return nil
-}
-
-// ensureOnDiskKey loads the private key from disk or (if none exists)
-// generates one and persists it.
-func (s *Service) ensureOnDiskKey() error {
-	keyRaw, err := s.DataDirectory.Key.Read()
-	if os.IsNotExist(err) {
-		key, err := wgtypes.GeneratePrivateKey()
-		if err != nil {
-			return fmt.Errorf("failed to generate private key: %w", err)
-		}
-		if err := s.DataDirectory.Key.Write([]byte(key.String()), 0600); err != nil {
-			return fmt.Errorf("failed to store newly generated key: %w", err)
-		}
-
-		s.privKey = key
-		return nil
-	} else if err != nil {
-		return fmt.Errorf("failed to load on-disk key: %w", err)
-	}
-
-	key, err := wgtypes.ParseKey(string(keyRaw))
-	if err != nil {
-		return fmt.Errorf("invalid private key in file: %w", err)
-	}
-	s.privKey = key
-	return nil
-}
-
-// annotateThisNode annotates the node (as defined by NodeName) with the
-// wireguard public key of this node.
-func (s *Service) annotateThisNode(ctx context.Context) error {
-	patch := []jsonpatch.JsonPatchOp{{
-		Operation: "add",
-		Path:      "/metadata/annotations/" + jsonpatch.EncodeJSONRefToken(publicKeyAnnotation),
-		Value:     s.privKey.PublicKey().String(),
-	}}
-
-	patchRaw, err := json.Marshal(patch)
-	if err != nil {
-		return fmt.Errorf("failed to encode JSONPatch: %w", err)
-	}
-
-	if _, err := s.Kubernetes.CoreV1().Nodes().Patch(ctx, s.NodeName, types.JSONPatchType, patchRaw, metav1.PatchOptions{}); err != nil {
-		return fmt.Errorf("failed to patch resource: %w", err)
-	}
-
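+	// Prefixes is an event.Value: Set replaces the previously submitted set
+	// wholesale, and watchers observe only the latest value.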
+	s.logger.V(1).Infof("Updating locally originated prefixes: %+v", prefixes)
+	s.Prefixes.Set(&prefixes)
 	return nil
 }
 
@@ -211,46 +104,17 @@
 	logger := supervisor.Logger(ctx)
 	s.logger = logger
 
-	wgClient, err := wgctrl.New()
-	if err != nil {
-		return fmt.Errorf("failed to connect to netlink's WireGuard config endpoint: %w", err)
-	}
-	s.wgClient = wgClient
-
-	if err := s.ensureOnDiskKey(); err != nil {
-		return fmt.Errorf("failed to ensure on-disk key: %w", err)
-	}
-
-	wgInterface := &Wireguard{LinkAttrs: netlink.LinkAttrs{Name: clusterNetDeviceName, Flags: net.FlagUp}}
-	if err := netlink.LinkAdd(wgInterface); err != nil {
-		return fmt.Errorf("failed to add WireGuard network interface: %w", err)
-	}
-	defer netlink.LinkDel(wgInterface)
-
-	listenPort := int(common.WireGuardPort)
-	if err := wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
-		PrivateKey: &s.privKey,
-		ListenPort: &listenPort,
-	}); err != nil {
-		return fmt.Errorf("failed to set up WireGuard interface: %w", err)
-	}
-
-	if err := netlink.RouteAdd(&netlink.Route{
-		Dst:       &s.ClusterNet,
-		LinkIndex: wgInterface.Index,
-	}); err != nil && !os.IsExist(err) {
-		return fmt.Errorf("failed to add cluster net route to Wireguard interface: %w", err)
-	}
-
-	if err := s.annotateThisNode(ctx); err != nil {
-		return fmt.Errorf("when annotating this node with public key: %w", err)
-	}
-
-	nodeInformer := s.InformerFactory.Core().V1().Nodes()
-	nodeInformer.Informer().SetWatchErrorHandler(func(_ *cache.Reflector, err error) {
-		supervisor.Logger(ctx).Errorf("node informer watch error: %v", err)
-	})
-	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+	// Make a 'shared' informer. It is shared in name only - we don't actually
+	// share it with anything else. We use it because, unlike the plain
+	// Informer API, it supports setting a watch error handler. It is also
+	// built on a dedicated list/watch so that we only watch our own node.
+	lw := cache.NewListWatchFromClient(
+		s.Kubernetes.CoreV1().RESTClient(),
+		"nodes", "",
+		fields.OneTermEqualSelector("metadata.name", s.NodeName),
+	)
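+	// A short (5s) resync period makes the informer re-deliver our node object
+	// regularly, so a missed or failed prefix update is retried quickly.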
+	ni := cache.NewSharedInformer(lw, &corev1.Node{}, time.Second*5)
+	ni.AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: func(new interface{}) {
 			newNode, ok := new.(*corev1.Node)
 			if !ok {
@@ -271,19 +135,12 @@
 				logger.Warningf("Failed to sync node: %v", err)
 			}
 		},
-		DeleteFunc: func(old interface{}) {
-			oldNode, ok := old.(*corev1.Node)
-			if !ok {
-				logger.Errorf("Received non-node item %+v in node event handler", oldNode)
-				return
-			}
-			if err := s.removeNode(oldNode); err != nil {
-				logger.Warningf("Failed to sync node: %v", err)
-			}
-		},
+	})
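+	// SetWatchErrorHandler returns an error only if the informer has already
+	// been started, which is not the case here.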
+	ni.SetWatchErrorHandler(func(_ *cache.Reflector, err error) {
+		supervisor.Logger(ctx).Errorf("node informer watch error: %v", err)
 	})
 
 	supervisor.Signal(ctx, supervisor.SignalHealthy)
-	nodeInformer.Informer().Run(ctx.Done())
+	ni.Run(ctx.Done())
 	return ctx.Err()
 }