core/internal: move containerd and kubernetes to localstorage

This moves the last users of the old 'storage' library onto 'localstorage'. We move a lot of 'runtime' directories under a single `/ephemeral` root. This could be called `/run`, but that might imply FHS compliance - which we neither have nor want.
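
For reference, this is the shape of the change in the CSI provisioner below: the old storage.Manager lookup returned a (path, error) pair and could fail if the data place was not available, while localstorage hands the component a typed directory handle whose path resolution cannot fail. A minimal sketch, reusing the DataVolumesDirectory and FullPath names from this diff (the helper name is illustrative only):

    package kubernetes

    import (
        "path/filepath"

        "git.monogon.dev/source/nexantic.git/core/internal/localstorage"
    )

    // volumePathFor resolves a per-volume directory under the injected
    // localstorage handle. Compare the removed getVolumePath/ensureVolumePath
    // pair, which had to go through storage.Manager.GetPathInPlace and
    // propagate its error.
    func volumePathFor(volumes *localstorage.DataVolumesDirectory, volumeID string) string {
        return filepath.Join(volumes.FullPath(), volumeID)
    }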

We also slightly refactor the Kubernetes services to make them nicer to spawn. Otherwise, this is a pure refactor with no functional changes.
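
As an illustration of the spawning change: instead of a runCSIProvisioner(...) constructor that captures its dependencies in a closure, the provisioner is now a struct with exported dependencies and a Run method that the parent hands to the supervisor. A hypothetical call site might look like the sketch below; the startCSIProvisioner wrapper and the supervisor.Run invocation are assumptions for illustration, not part of this diff:

    package kubernetes

    import (
        "context"

        "k8s.io/client-go/informers"
        "k8s.io/client-go/kubernetes"

        "git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
        "git.monogon.dev/source/nexantic.git/core/internal/localstorage"
    )

    // startCSIProvisioner wires up the provisioner's dependencies and runs it
    // as a supervised runnable. The supervisor.Run signature used here is an
    // assumption; the struct fields match csiProvisionerServer in this diff.
    func startCSIProvisioner(ctx context.Context, nodeName string, clientSet kubernetes.Interface, informerFactory informers.SharedInformerFactory, volumes *localstorage.DataVolumesDirectory) error {
        s := &csiProvisionerServer{
            NodeName:         nodeName,
            Kubernetes:       clientSet,
            InformerFactory:  informerFactory,
            VolumesDirectory: volumes,
        }
        return supervisor.Run(ctx, "csi-provisioner", s.Run)
    }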

Test Plan: this should fail; it is part of a larger stack. D590 is the first point in the stack that should work.

X-Origin-Diff: phab/D589
GitOrigin-RevId: d2a7c0bb52c2a7c753199221c609e03474936c22
diff --git a/core/internal/kubernetes/provisioner.go b/core/internal/kubernetes/provisioner.go
index c864715..0e9e419 100644
--- a/core/internal/kubernetes/provisioner.go
+++ b/core/internal/kubernetes/provisioner.go
@@ -25,11 +25,6 @@
 	"path/filepath"
 
 	"go.uber.org/zap"
-
-	"git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
-	"git.monogon.dev/source/nexantic.git/core/internal/storage"
-	"git.monogon.dev/source/nexantic.git/core/pkg/fsquota"
-
 	v1 "k8s.io/api/core/v1"
 	storagev1 "k8s.io/api/storage/v1"
 	apierrs "k8s.io/apimachinery/pkg/api/errors"
@@ -44,25 +39,31 @@
 	"k8s.io/client-go/tools/record"
 	ref "k8s.io/client-go/tools/reference"
 	"k8s.io/client-go/util/workqueue"
+
+	"git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
+	"git.monogon.dev/source/nexantic.git/core/internal/localstorage"
+	"git.monogon.dev/source/nexantic.git/core/pkg/fsquota"
 )
 
-// ONCHANGE(//core/internal/kubernetes/reconciler:resources_csi.go): needs to match csiProvisionerName declared.
-const csiProvisionerName = "com.nexantic.smalltown.vfs"
+// ONCHANGE(//core/internal/kubernetes/reconciler:resources_csi.go): needs to match csiProvisionerServerName declared.
+const csiProvisionerServerName = "com.nexantic.smalltown.vfs"
 
-// csiProvisioner is responsible for the provisioning and deprovisioning of CSI-based container volumes. It runs on all
+// csiProvisionerServer is responsible for the provisioning and deprovisioning of CSI-based container volumes. It runs on all
 // nodes and watches PVCs for ones assigned to the node it's running on and fulfills the provisioning request by
 // creating a directory, applying a quota and creating the corresponding PV. When the PV is released and its retention
 // policy is Delete, the directory and the PV resource are deleted.
-type csiProvisioner struct {
-	nodeName             string
-	kubeclientset        kubernetes.Interface
+type csiProvisionerServer struct {
+	NodeName         string
+	Kubernetes       kubernetes.Interface
+	InformerFactory  informers.SharedInformerFactory
+	VolumesDirectory *localstorage.DataVolumesDirectory
+
 	claimQueue           workqueue.RateLimitingInterface
 	pvQueue              workqueue.RateLimitingInterface
 	recorder             record.EventRecorder
 	pvcInformer          coreinformers.PersistentVolumeClaimInformer
 	pvInformer           coreinformers.PersistentVolumeInformer
 	storageClassInformer storageinformers.StorageClassInformer
-	storageManager       *storage.Manager
 	logger               *zap.Logger
 }
 
@@ -70,79 +71,77 @@
 // the events happening on the Kubernetes control plane and informs us when something happens. If anything happens to
 // PVCs or PVs, we enqueue the identifier of that resource in a work queue. Queues are being worked on by only one
 // worker to limit load and avoid complicated locking infrastructure. Failed items are requeued.
-func runCSIProvisioner(storMan *storage.Manager, kubeclientset kubernetes.Interface, informerFactory informers.SharedInformerFactory) supervisor.Runnable {
-	return func(ctx context.Context) error {
-		nodeName, err := os.Hostname()
-		if err != nil {
-			panic(err)
-		}
+func (p *csiProvisionerServer) Run(ctx context.Context) error {
+	// The recorder is used to log Kubernetes events for successful or failed volume provisions. These events then
+	// show up in `kubectl describe pvc` and can be used by admins to debug issues with this provisioner.
+	eventBroadcaster := record.NewBroadcaster()
+	eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: p.Kubernetes.CoreV1().Events("")})
+	p.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: csiProvisionerServerName, Host: p.NodeName})
 
-		// The recorder is used to log Kubernetes events for successful or failed volume provisions. These events then
-		// show up in `kubectl describe pvc` and can be used by admins to debug issues with this provisioner.
-		eventBroadcaster := record.NewBroadcaster()
-		eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
-		recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: csiProvisionerName, Host: nodeName})
+	p.pvInformer = p.InformerFactory.Core().V1().PersistentVolumes()
+	p.pvcInformer = p.InformerFactory.Core().V1().PersistentVolumeClaims()
+	p.storageClassInformer = p.InformerFactory.Storage().V1().StorageClasses()
 
-		p := &csiProvisioner{
-			nodeName:             nodeName,
-			recorder:             recorder,
-			kubeclientset:        kubeclientset,
-			pvInformer:           informerFactory.Core().V1().PersistentVolumes(),
-			pvcInformer:          informerFactory.Core().V1().PersistentVolumeClaims(),
-			storageClassInformer: informerFactory.Storage().V1().StorageClasses(),
-			claimQueue:           workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
-			pvQueue:              workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
-			storageManager:       storMan,
-			logger:               supervisor.Logger(ctx),
-		}
+	p.claimQueue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
+	p.pvQueue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
 
-		p.pvcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
-			AddFunc: p.enqueueClaim,
-			UpdateFunc: func(old, new interface{}) {
-				p.enqueueClaim(new)
-			},
-		})
-		p.pvInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
-			AddFunc: p.enqueuePV,
-			UpdateFunc: func(old, new interface{}) {
-				p.enqueuePV(new)
-			},
-		})
+	p.pvcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc: p.enqueueClaim,
+		UpdateFunc: func(old, new interface{}) {
+			p.enqueueClaim(new)
+		},
+	})
+	p.pvInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc: p.enqueuePV,
+		UpdateFunc: func(old, new interface{}) {
+			p.enqueuePV(new)
+		},
+	})
+	p.logger = supervisor.Logger(ctx)
 
-		go p.pvcInformer.Informer().Run(ctx.Done())
-		go p.pvInformer.Informer().Run(ctx.Done())
-		go p.storageClassInformer.Informer().Run(ctx.Done())
+	go p.pvcInformer.Informer().Run(ctx.Done())
+	go p.pvInformer.Informer().Run(ctx.Done())
+	go p.storageClassInformer.Informer().Run(ctx.Done())
 
-		// These will self-terminate once the queues are shut down
-		go p.processQueueItems(p.claimQueue, func(key string) error {
-			return p.processPVC(key)
-		})
-		go p.processQueueItems(p.pvQueue, func(key string) error {
-			return p.processPV(key)
-		})
+	// These will self-terminate once the queues are shut down
+	go p.processQueueItems(p.claimQueue, func(key string) error {
+		return p.processPVC(key)
+	})
+	go p.processQueueItems(p.pvQueue, func(key string) error {
+		return p.processPV(key)
+	})
 
-		supervisor.Signal(ctx, supervisor.SignalHealthy)
-		<-ctx.Done()
-		p.claimQueue.ShutDown()
-		p.pvQueue.ShutDown()
-		return nil
-	}
+	supervisor.Signal(ctx, supervisor.SignalHealthy)
+	<-ctx.Done()
+	p.claimQueue.ShutDown()
+	p.pvQueue.ShutDown()
+	return nil
 }
 
 // isOurPVC checks if the given PVC is to be provisioned by this provisioner and has been scheduled onto this node
-func (p *csiProvisioner) isOurPVC(pvc *v1.PersistentVolumeClaim) bool {
-	return pvc.ObjectMeta.Annotations["volume.beta.kubernetes.io/storage-provisioner"] == csiProvisionerName &&
-		(pvc.ObjectMeta.Annotations["volume.kubernetes.io/selected-node"] == p.nodeName)
+func (p *csiProvisionerServer) isOurPVC(pvc *v1.PersistentVolumeClaim) bool {
+	if pvc.ObjectMeta.Annotations["volume.beta.kubernetes.io/storage-provisioner"] != csiProvisionerServerName {
+		return false
+	}
+	if pvc.ObjectMeta.Annotations["volume.kubernetes.io/selected-node"] != p.NodeName {
+		return false
+	}
+	return true
 }
 
 // isOurPV checks if the given PV has been provisioned by this provisioner and has been scheduled onto this node
-func (p *csiProvisioner) isOurPV(pv *v1.PersistentVolume) bool {
-	return pv.ObjectMeta.Annotations["pv.kubernetes.io/provisioned-by"] == csiProvisionerName &&
-		pv.Spec.NodeAffinity.Required.NodeSelectorTerms[0].MatchExpressions[0].Values[0] == p.nodeName
+func (p *csiProvisionerServer) isOurPV(pv *v1.PersistentVolume) bool {
+	if pv.ObjectMeta.Annotations["pv.kubernetes.io/provisioned-by"] != csiProvisionerServerName {
+		return false
+	}
+	if pv.Spec.NodeAffinity.Required.NodeSelectorTerms[0].MatchExpressions[0].Values[0] != p.NodeName {
+		return false
+	}
+	return true
 }
 
 // enqueueClaim adds an added/changed PVC to the work queue
-func (p *csiProvisioner) enqueueClaim(obj interface{}) {
+func (p *csiProvisionerServer) enqueueClaim(obj interface{}) {
 	key, err := cache.MetaNamespaceKeyFunc(obj)
 	if err != nil {
 		p.logger.Error("Not queuing PVC because key could not be derived", zap.Error(err))
@@ -152,7 +151,7 @@
 }
 
 // enqueuePV adds an added/changed PV to the work queue
-func (p *csiProvisioner) enqueuePV(obj interface{}) {
+func (p *csiProvisionerServer) enqueuePV(obj interface{}) {
 	key, err := cache.MetaNamespaceKeyFunc(obj)
 	if err != nil {
 		p.logger.Error("Not queuing PV because key could not be derived", zap.Error(err))
@@ -163,7 +162,7 @@
 
 // processQueueItems gets items from the given work queue and calls the process function for each of them. It self-
 // terminates once the queue is shut down.
-func (p *csiProvisioner) processQueueItems(queue workqueue.RateLimitingInterface, process func(key string) error) {
+func (p *csiProvisionerServer) processQueueItems(queue workqueue.RateLimitingInterface, process func(key string) error) {
 	for {
 		obj, shutdown := queue.Get()
 		if shutdown {
@@ -190,25 +189,14 @@
 	}
 }
 
-// getVolumePath gets the path where the volume is stored or an error if the storage manager doesn't
-// have the volume available
-func (p *csiProvisioner) getVolumePath(volumeID string) (string, error) {
-	return p.storageManager.GetPathInPlace(storage.PlaceData, filepath.Join(volumeDir, volumeID))
-}
-
-// ensureVolumePath ensures that the top-level volume directory is created. It fails if the storage manager doesn't
-// have the volume available.
-func (p *csiProvisioner) ensureVolumePath() error {
-	path, err := p.storageManager.GetPathInPlace(storage.PlaceData, volumeDir)
-	if err != nil {
-		return err
-	}
-	return os.MkdirAll(path, 0640)
+// volumePath gets the path where the volume is stored.
+func (p *csiProvisionerServer) volumePath(volumeID string) string {
+	return filepath.Join(p.VolumesDirectory.FullPath(), volumeID)
 }
 
 // processPVC looks at a single PVC item from the queue, determines if it needs to be provisioned and logs the
 // provisioning result to the recorder
-func (p *csiProvisioner) processPVC(key string) error {
+func (p *csiProvisionerServer) processPVC(key string) error {
 	namespace, name, err := cache.SplitMetaNamespaceKey(key)
 	if err != nil {
 		return fmt.Errorf("invalid resource key: %s", key)
@@ -234,7 +222,7 @@
 		return fmt.Errorf("")
 	}
 
-	if storageClass.Provisioner != csiProvisionerName {
+	if storageClass.Provisioner != csiProvisionerServerName {
 		// We're not responsible for this PVC. Can only happen if controller-manager makes a mistake
 		// setting the annotations, but we're bailing here anyways for safety.
 		return nil
@@ -253,7 +241,7 @@
 
 // provisionPVC creates the directory where the volume lives, sets a quota for the requested amount of storage and
 // creates the PV object representing this new volume
-func (p *csiProvisioner) provisionPVC(pvc *v1.PersistentVolumeClaim, storageClass *storagev1.StorageClass) error {
+func (p *csiProvisionerServer) provisionPVC(pvc *v1.PersistentVolumeClaim, storageClass *storagev1.StorageClass) error {
 	claimRef, err := ref.GetReference(scheme.Scheme, pvc)
 	if err != nil {
 		return fmt.Errorf("failed to get reference to PVC: %w", err)
@@ -273,14 +261,7 @@
 	}
 
 	volumeID := "pvc-" + string(pvc.ObjectMeta.UID)
-	volumePath, err := p.getVolumePath(volumeID)
-	if err != nil {
-		return fmt.Errorf("unable to access volumes: %w", err)
-	}
-
-	if err := p.ensureVolumePath(); err != nil {
-		return fmt.Errorf("failed to create volume location: %w", err)
-	}
+	volumePath := p.volumePath(volumeID)
 
 	p.logger.Info("Creating local PV", zap.String("volume-id", volumeID))
 	if err := os.Mkdir(volumePath, 0644); err != nil && !os.IsExist(err) {
@@ -301,7 +282,7 @@
 		ObjectMeta: metav1.ObjectMeta{
 			Name: volumeID,
 			Annotations: map[string]string{
-				"pv.kubernetes.io/provisioned-by": csiProvisionerName},
+				"pv.kubernetes.io/provisioned-by": csiProvisionerServerName},
 		},
 		Spec: v1.PersistentVolumeSpec{
 			AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
@@ -310,7 +291,7 @@
 			},
 			PersistentVolumeSource: v1.PersistentVolumeSource{
 				CSI: &v1.CSIPersistentVolumeSource{
-					Driver:       csiProvisionerName,
+					Driver:       csiProvisionerServerName,
 					VolumeHandle: volumeID,
 				},
 			},
@@ -323,7 +304,7 @@
 								{
 									Key:      "kubernetes.io/hostname",
 									Operator: v1.NodeSelectorOpIn,
-									Values:   []string{p.nodeName},
+									Values:   []string{p.NodeName},
 								},
 							},
 						},
@@ -335,7 +316,8 @@
 		},
 	}
 
-	if _, err = p.kubeclientset.CoreV1().PersistentVolumes().Create(context.Background(), vol, metav1.CreateOptions{}); !apierrs.IsAlreadyExists(err) && err != nil {
+	_, err = p.Kubernetes.CoreV1().PersistentVolumes().Create(context.Background(), vol, metav1.CreateOptions{})
+	if err != nil && !apierrs.IsAlreadyExists(err) {
 		return fmt.Errorf("failed to create PV object: %w", err)
 	}
 	return nil
@@ -343,7 +325,7 @@
 
 // processPV looks at a single PV item from the queue and checks if it has been released and needs to be deleted. If yes
 // it deletes the associated quota, directory and the PV object and logs the result to the recorder.
-func (p *csiProvisioner) processPV(key string) error {
+func (p *csiProvisionerServer) processPV(key string) error {
 	_, name, err := cache.SplitMetaNamespaceKey(key)
 	if err != nil {
 		return fmt.Errorf("invalid resource key: %s", key)
@@ -361,7 +343,7 @@
 	if pv.Spec.PersistentVolumeReclaimPolicy != v1.PersistentVolumeReclaimDelete || pv.Status.Phase != "Released" {
 		return nil
 	}
-	volumePath, err := p.getVolumePath(pv.Spec.CSI.VolumeHandle)
+	volumePath := p.volumePath(pv.Spec.CSI.VolumeHandle)
 
 	// Log deletes for auditing purposes
 	p.logger.Info("Deleting persistent volume", zap.String("name", pv.Spec.CSI.VolumeHandle))
@@ -378,7 +360,7 @@
 		return fmt.Errorf("failed to delete volume: %w", err)
 	}
 
-	err = p.kubeclientset.CoreV1().PersistentVolumes().Delete(context.Background(), pv.Name, metav1.DeleteOptions{})
+	err = p.Kubernetes.CoreV1().PersistentVolumes().Delete(context.Background(), pv.Name, metav1.DeleteOptions{})
 	if err != nil && !apierrs.IsNotFound(err) {
 		p.recorder.Eventf(pv, v1.EventTypeWarning, "DeprovisioningFailed", "Failed to delete PV object from K8s API: %v", err)
 		return fmt.Errorf("failed to delete PV object: %w", err)