core/internal: move containerd and kubernetes to localstorage

This moves the last users of the old 'storage' library onto 'localstorage'. We also move a number of 'runtime' directories under a single `/ephemeral` root. That root could have been called `/run`, but doing so might imply FHS compliance, which we neither have nor want.
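
For illustration, the consumer side of that move looks roughly like the sketch below, based on the clusternet changes in this diff: services stop reading hardcoded paths such as /data/kubernetes/clusternet.key and instead go through a typed directory handed to them by 'localstorage'. The `DataKubernetesClusterNetworkingDirectory` type and its `Key.Read` method come from the diff; the helper function itself is hypothetical.

```go
package example

import (
	"fmt"

	"golang.zx2c4.com/wireguard/wgctrl/wgtypes"

	"git.monogon.dev/source/nexantic.git/core/internal/localstorage"
)

// loadClusternetKey is hypothetical glue showing the pattern: the caller hands
// the service a typed localstorage directory, and the service reads files
// through its handles instead of hardcoding absolute paths.
func loadClusternetKey(dir *localstorage.DataKubernetesClusterNetworkingDirectory) (wgtypes.Key, error) {
	raw, err := dir.Key.Read()
	if err != nil {
		return wgtypes.Key{}, fmt.Errorf("failed to read on-disk key: %w", err)
	}
	return wgtypes.ParseKey(string(raw))
}
```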

We also slightly refactor the Kubernetes services to make them nicer to spawn. Otherwise, this is a pure refactor with no functional changes.
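
For example, the clusternet service below now takes its dependencies as exported struct fields and exposes a plain `Run(ctx)` method. Here is a sketch of how a caller might spawn it; the `Service` fields and the `Run` signature come from the diff, while `supervisor.Run`'s exact shape and all variable names are assumptions about the calling code.

```go
package example

import (
	"context"
	"net"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"

	"git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
	"git.monogon.dev/source/nexantic.git/core/internal/kubernetes/clusternet"
	"git.monogon.dev/source/nexantic.git/core/internal/localstorage"
)

// startClusternet is hypothetical glue: configuration goes into exported
// struct fields, and Run is handed to the supervisor as an ordinary runnable.
func startClusternet(ctx context.Context, nodeName string, clientSet kubernetes.Interface,
	informerFactory informers.SharedInformerFactory, clusterNet net.IPNet,
	dir *localstorage.DataKubernetesClusterNetworkingDirectory) error {
	s := &clusternet.Service{
		NodeName:        nodeName,
		Kubernetes:      clientSet,
		ClusterNet:      clusterNet,
		InformerFactory: informerFactory,
		DataDirectory:   dir,
	}
	// A supervisor.Run(ctx, name, runnable) entry point is assumed here.
	return supervisor.Run(ctx, "clusternet", s.Run)
}
```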

Test Plan: This should fail on its own, as it is part of a larger stack; D590 is the first revision in the stack that should work.

X-Origin-Diff: phab/D589
GitOrigin-RevId: d2a7c0bb52c2a7c753199221c609e03474936c22
diff --git a/core/internal/kubernetes/clusternet/clusternet.go b/core/internal/kubernetes/clusternet/clusternet.go
index aa3e7ce..5c42bb8 100644
--- a/core/internal/kubernetes/clusternet/clusternet.go
+++ b/core/internal/kubernetes/clusternet/clusternet.go
@@ -30,10 +30,13 @@
 	"encoding/json"
 	"errors"
 	"fmt"
-	"io/ioutil"
 	"net"
 	"os"
 
+	"k8s.io/client-go/informers"
+
+	"k8s.io/client-go/kubernetes"
+
 	"github.com/vishvananda/netlink"
 	"go.uber.org/zap"
 	"golang.zx2c4.com/wireguard/wgctrl"
@@ -41,33 +44,34 @@
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
-	"k8s.io/client-go/informers"
-	coreinformers "k8s.io/client-go/informers/core/v1"
-	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/cache"
 
 	"git.monogon.dev/source/nexantic.git/core/internal/common"
 	"git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
+	"git.monogon.dev/source/nexantic.git/core/internal/localstorage"
 	"git.monogon.dev/source/nexantic.git/core/pkg/jsonpatch"
 )
 
 const (
 	clusterNetDeviceName = "clusternet"
 	publicKeyAnnotation  = "node.smalltown.nexantic.com/wg-pubkey"
-
-	privateKeyPath = "/data/kubernetes/clusternet.key"
 )
 
-type clusternet struct {
-	nodeName     string
-	wgClient     *wgctrl.Client
-	nodeInformer coreinformers.NodeInformer
-	logger       *zap.Logger
+type Service struct {
+	NodeName        string
+	Kubernetes      kubernetes.Interface
+	ClusterNet      net.IPNet
+	InformerFactory informers.SharedInformerFactory
+	DataDirectory   *localstorage.DataKubernetesClusterNetworkingDirectory
+
+	wgClient *wgctrl.Client
+	privKey  wgtypes.Key
+	logger   *zap.Logger
 }
 
 // ensureNode creates/updates the corresponding WireGuard peer entry for the given node objet
-func (c *clusternet) ensureNode(newNode *corev1.Node) error {
-	if newNode.Name == c.nodeName {
+func (s *Service) ensureNode(newNode *corev1.Node) error {
+	if newNode.Name == s.NodeName {
 		// Node doesn't need to connect to itself
 		return nil
 	}
@@ -83,12 +87,12 @@
 	for _, addr := range newNode.Status.Addresses {
 		if addr.Type == corev1.NodeInternalIP {
 			if internalIP != nil {
-				c.logger.Warn("More than one NodeInternalIP specified, using the first one")
+				s.logger.Warn("More than one NodeInternalIP specified, using the first one")
 				break
 			}
 			internalIP = net.ParseIP(addr.Address)
 			if internalIP == nil {
-				c.logger.Warn("failed to parse Internal IP")
+				s.logger.Warn("failed to parse Internal IP")
 			}
 		}
 	}
@@ -99,16 +103,16 @@
 	for _, podNetStr := range newNode.Spec.PodCIDRs {
 		_, podNet, err := net.ParseCIDR(podNetStr)
 		if err != nil {
-			c.logger.Warn("Node PodCIDR failed to parse, ignored", zap.Error(err), zap.String("node", newNode.Name))
+			s.logger.Warn("Node PodCIDR failed to parse, ignored", zap.Error(err), zap.String("node", newNode.Name))
 			continue
 		}
 		allowedIPs = append(allowedIPs, *podNet)
 	}
-	c.logger.Debug("Adding/Updating WireGuard peer node", zap.String("node", newNode.Name),
+	s.logger.Debug("Adding/Updating WireGuard peer node", zap.String("node", newNode.Name),
 		zap.String("endpointIP", internalIP.String()), zap.Any("allowedIPs", allowedIPs))
 	// WireGuard's kernel side has create/update semantics on peers by default. So we can just add the peer multiple
 	// times to update it.
-	err = c.wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
+	err = s.wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
 		Peers: []wgtypes.PeerConfig{{
 			PublicKey:         pubKey,
 			Endpoint:          &net.UDPAddr{Port: common.WireGuardPort, IP: internalIP},
@@ -123,8 +127,8 @@
 }
 
 // removeNode removes the corresponding WireGuard peer entry for the given node object
-func (c *clusternet) removeNode(oldNode *corev1.Node) error {
-	if oldNode.Name == c.nodeName {
+func (s *Service) removeNode(oldNode *corev1.Node) error {
+	if oldNode.Name == s.NodeName {
 		// Node doesn't need to connect to itself
 		return nil
 	}
@@ -136,7 +140,7 @@
 	if err != nil {
 		return fmt.Errorf("node public-key annotation not decodable: %w", err)
 	}
-	err = c.wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
+	err = s.wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
 		Peers: []wgtypes.PeerConfig{{
 			PublicKey: pubKey,
 			Remove:    true,
@@ -148,119 +152,127 @@
 	return nil
 }
 
-// EnsureOnDiskKey loads the private key from disk or (if none exists) generates one and persists it.
-func EnsureOnDiskKey() (*wgtypes.Key, error) {
-	privKeyRaw, err := ioutil.ReadFile(privateKeyPath)
+// ensureOnDiskKey loads the private key from disk or (if none exists) generates one and persists it.
+func (s *Service) ensureOnDiskKey() error {
+	keyRaw, err := s.DataDirectory.Key.Read()
 	if os.IsNotExist(err) {
-		privKey, err := wgtypes.GeneratePrivateKey()
+		key, err := wgtypes.GeneratePrivateKey()
 		if err != nil {
-			return nil, fmt.Errorf("failed to generate private key: %w", err)
+			return fmt.Errorf("failed to generate private key: %w", err)
 		}
-		if err := ioutil.WriteFile(privateKeyPath, []byte(privKey.String()), 0600); err != nil {
-			return nil, fmt.Errorf("failed to store newly generated key: %w", err)
+		if err := s.DataDirectory.Key.Write([]byte(key.String()), 0600); err != nil {
+			return fmt.Errorf("failed to store newly generated key: %w", err)
 		}
-		return &privKey, nil
+
+		s.privKey = key
+		return nil
 	} else if err != nil {
-		return nil, fmt.Errorf("failed to load on-disk key: %w", err)
+		return fmt.Errorf("failed to load on-disk key: %w", err)
 	}
-	privKey, err := wgtypes.ParseKey(string(privKeyRaw))
+
+	key, err := wgtypes.ParseKey(string(keyRaw))
 	if err != nil {
-		return nil, fmt.Errorf("invalid private key in file: %w", err)
+		return fmt.Errorf("invalid private key in file: %w", err)
 	}
-	return &privKey, nil
+	s.privKey = key
+	return nil
+}
+
+// annotateThisNode annotates the node (as defined by NodeName) with this node's WireGuard public key.
+func (s *Service) annotateThisNode(ctx context.Context) error {
+	patch := []jsonpatch.JsonPatchOp{{
+		Operation: "add",
+		Path:      "/metadata/annotations/" + jsonpatch.EncodeJSONRefToken(publicKeyAnnotation),
+		Value:     s.privKey.PublicKey().String(),
+	}}
+
+	patchRaw, err := json.Marshal(patch)
+	if err != nil {
+		return fmt.Errorf("failed to encode JSONPatch: %w", err)
+	}
+
+	if _, err := s.Kubernetes.CoreV1().Nodes().Patch(ctx, s.NodeName, types.JSONPatchType, patchRaw, metav1.PatchOptions{}); err != nil {
+		return fmt.Errorf("failed to patch resource: %w", err)
+	}
+
+	return nil
 }
 
 // Run runs the ClusterNet service. See package description for what it does.
-func Run(informerFactory informers.SharedInformerFactory, clusterNet net.IPNet, clientSet kubernetes.Interface, key *wgtypes.Key) supervisor.Runnable {
-	return func(ctx context.Context) error {
-		logger := supervisor.Logger(ctx)
-		nodeName, err := os.Hostname()
-		if err != nil {
-			return fmt.Errorf("failed to determine hostname: %w", err)
-		}
-		wgClient, err := wgctrl.New()
-		if err != nil {
-			return fmt.Errorf("failed to connect to netlink's WireGuard config endpoint: %w", err)
-		}
+func (s *Service) Run(ctx context.Context) error {
+	logger := supervisor.Logger(ctx)
 
-		nodeAnnotationPatch := []jsonpatch.JsonPatchOp{{
-			Operation: "add",
-			Path:      "/metadata/annotations/" + jsonpatch.EncodeJSONRefToken(publicKeyAnnotation),
-			Value:     key.PublicKey().String(),
-		}}
-
-		nodeAnnotationPatchRaw, err := json.Marshal(nodeAnnotationPatch)
-		if err != nil {
-			return fmt.Errorf("failed to encode JSONPatch: %w", err)
-		}
-
-		if _, err := clientSet.CoreV1().Nodes().Patch(ctx, nodeName, types.JSONPatchType, nodeAnnotationPatchRaw, metav1.PatchOptions{}); err != nil {
-			return fmt.Errorf("failed to set ClusterNet public key for node: %w", err)
-		}
-
-		nodeInformer := informerFactory.Core().V1().Nodes()
-		wgInterface := &Wireguard{LinkAttrs: netlink.LinkAttrs{Name: clusterNetDeviceName, Flags: net.FlagUp}}
-		if err := netlink.LinkAdd(wgInterface); err != nil {
-			return fmt.Errorf("failed to add WireGuard network interfacee: %w", err)
-		}
-		defer netlink.LinkDel(wgInterface)
-
-		listenPort := common.WireGuardPort
-		if err := wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
-			PrivateKey: key,
-			ListenPort: &listenPort,
-		}); err != nil {
-			return fmt.Errorf("failed to set up WireGuard interface: %w", err)
-		}
-
-		if err := netlink.RouteAdd(&netlink.Route{
-			Dst:       &clusterNet,
-			LinkIndex: wgInterface.Index,
-		}); err != nil && !os.IsExist(err) {
-			return fmt.Errorf("failed to add cluster net route to Wireguard interface: %w", err)
-		}
-
-		cnet := clusternet{
-			wgClient:     wgClient,
-			nodeInformer: nodeInformer,
-			logger:       logger,
-			nodeName:     nodeName,
-		}
-
-		nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
-			AddFunc: func(new interface{}) {
-				newNode, ok := new.(*corev1.Node)
-				if !ok {
-					logger.Error("Received non-node item in node event handler", zap.Reflect("item", new))
-					return
-				}
-				if err := cnet.ensureNode(newNode); err != nil {
-					logger.Warn("Failed to sync node", zap.Error(err))
-				}
-			},
-			UpdateFunc: func(old, new interface{}) {
-				newNode, ok := new.(*corev1.Node)
-				if !ok {
-					logger.Error("Received non-node item in node event handler", zap.Reflect("item", new))
-					return
-				}
-				if err := cnet.ensureNode(newNode); err != nil {
-					logger.Warn("Failed to sync node", zap.Error(err))
-				}
-			},
-			DeleteFunc: func(old interface{}) {
-				oldNode, ok := old.(*corev1.Node)
-				if !ok {
-					logger.Error("Received non-node item in node event handler", zap.Reflect("item", oldNode))
-					return
-				}
-				if err := cnet.removeNode(oldNode); err != nil {
-					logger.Warn("Failed to sync node", zap.Error(err))
-				}
-			},
-		})
-		supervisor.Signal(ctx, supervisor.SignalHealthy)
-		nodeInformer.Informer().Run(ctx.Done())
-		return ctx.Err()
+	wgClient, err := wgctrl.New()
+	if err != nil {
+		return fmt.Errorf("failed to connect to netlink's WireGuard config endpoint: %w", err)
 	}
+	s.wgClient = wgClient
+
+	// Load (or generate) the node's WireGuard private key before configuring the device.
+	if err := s.ensureOnDiskKey(); err != nil {
+		return fmt.Errorf("failed to ensure on-disk key: %w", err)
+	}
+
+	wgInterface := &Wireguard{LinkAttrs: netlink.LinkAttrs{Name: clusterNetDeviceName, Flags: net.FlagUp}}
+	if err := netlink.LinkAdd(wgInterface); err != nil {
+		return fmt.Errorf("failed to add WireGuard network interface: %w", err)
+	}
+	defer netlink.LinkDel(wgInterface)
+
+	listenPort := common.WireGuardPort
+	if err := wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
+		PrivateKey: &s.privKey,
+		ListenPort: &listenPort,
+	}); err != nil {
+		return fmt.Errorf("failed to set up WireGuard interface: %w", err)
+	}
+
+	if err := netlink.RouteAdd(&netlink.Route{
+		Dst:       &s.ClusterNet,
+		LinkIndex: wgInterface.Index,
+	}); err != nil && !os.IsExist(err) {
+		return fmt.Errorf("failed to add cluster net route to WireGuard interface: %w", err)
+	}
+
+	if err := s.annotateThisNode(ctx); err != nil {
+		return fmt.Errorf("when annotating this node with public key: %w", err)
+	}
+
+	nodeInformer := s.InformerFactory.Core().V1().Nodes()
+	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc: func(new interface{}) {
+			newNode, ok := new.(*corev1.Node)
+			if !ok {
+				logger.Error("Received non-node item in node event handler", zap.Reflect("item", new))
+				return
+			}
+			if err := s.ensureNode(newNode); err != nil {
+				logger.Warn("Failed to sync node", zap.Error(err))
+			}
+		},
+		UpdateFunc: func(old, new interface{}) {
+			newNode, ok := new.(*corev1.Node)
+			if !ok {
+				logger.Error("Received non-node item in node event handler", zap.Reflect("item", new))
+				return
+			}
+			if err := s.ensureNode(newNode); err != nil {
+				logger.Warn("Failed to sync node", zap.Error(err))
+			}
+		},
+		DeleteFunc: func(old interface{}) {
+			oldNode, ok := old.(*corev1.Node)
+			if !ok {
+				logger.Error("Received non-node item in node event handler", zap.Reflect("item", oldNode))
+				return
+			}
+			if err := s.removeNode(oldNode); err != nil {
+				logger.Warn("Failed to sync node", zap.Error(err))
+			}
+		},
+	})
+
+	supervisor.Signal(ctx, supervisor.SignalHealthy)
+	nodeInformer.Informer().Run(ctx.Done())
+	return ctx.Err()
 }