core/internal: move containerd and kubernetes to localstorage
This moves the last users of the old 'storage' library onto 'localstorage'. We move a lot of 'runtime' directories to a single `/ephemeral` root. This could be called `/run`, but that might imply FHS compliance, which we neither have nor want.
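To illustrate the consumer-side difference, a minimal sketch (the volumesDirectory type and the /data/volumes path are stand-ins, not the actual localstorage API): instead of asking a storage.Manager for a path in a 'place', a service now holds a typed directory object and joins paths off its FullPath(), as csiProvisionerServer does below with DataVolumesDirectory.

    package main

    import (
    	"fmt"
    	"path/filepath"
    )

    // volumesDirectory stands in for localstorage.DataVolumesDirectory;
    // the real type comes from the declarative localstorage layout.
    type volumesDirectory struct{ path string }

    func (d *volumesDirectory) FullPath() string { return d.path }

    func main() {
    	// Old: path, err := storageManager.GetPathInPlace(storage.PlaceData, ...)
    	// New: join off a typed directory handed to the service at spawn time.
    	vols := volumesDirectory{path: "/data/volumes"} // illustrative path
    	fmt.Println(filepath.Join(vols.FullPath(), "pvc-1234"))
    }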
We also slightly refactor the Kubernetes services to make them nicer to spawn. Beyond that, this is a pure refactor, with no functional changes.
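Concretely, the spawn-side pattern now looks like this sketch (exampleServer and the runnable alias are illustrative; only the func(context.Context) error shape of supervisor.Runnable is taken from the diff): a service is a struct with exported configuration fields, and its Run method value is what gets handed to the supervisor, replacing constructors that returned closures.

    package main

    import "context"

    // runnable mirrors the supervisor.Runnable signature used in this diff.
    type runnable func(ctx context.Context) error

    // exampleServer mirrors the csiProvisionerServer shape: exported fields
    // are configuration, filled in by whoever spawns the service.
    type exampleServer struct {
    	NodeName string
    }

    func (s *exampleServer) Run(ctx context.Context) error {
    	// ... set up, signal healthy, then block until cancelled ...
    	<-ctx.Done()
    	return nil
    }

    func main() {
    	ctx, cancel := context.WithCancel(context.Background())
    	cancel() // simulate the supervisor shutting the service down
    	s := &exampleServer{NodeName: "node-0"}
    	var r runnable = s.Run // method value plugs straight into the supervisor
    	_ = r(ctx)             // returns once ctx is done
    }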
Test Plan: this diff on its own should fail; it is part of a larger stack. D590 is the first point in the stack that should work.
X-Origin-Diff: phab/D589
GitOrigin-RevId: d2a7c0bb52c2a7c753199221c609e03474936c22
diff --git a/core/internal/kubernetes/provisioner.go b/core/internal/kubernetes/provisioner.go
index c864715..0e9e419 100644
--- a/core/internal/kubernetes/provisioner.go
+++ b/core/internal/kubernetes/provisioner.go
@@ -25,11 +25,6 @@
"path/filepath"
"go.uber.org/zap"
-
- "git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
- "git.monogon.dev/source/nexantic.git/core/internal/storage"
- "git.monogon.dev/source/nexantic.git/core/pkg/fsquota"
-
v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
@@ -44,25 +39,31 @@
"k8s.io/client-go/tools/record"
ref "k8s.io/client-go/tools/reference"
"k8s.io/client-go/util/workqueue"
+
+ "git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
+ "git.monogon.dev/source/nexantic.git/core/internal/localstorage"
+ "git.monogon.dev/source/nexantic.git/core/pkg/fsquota"
)
-// ONCHANGE(//core/internal/kubernetes/reconciler:resources_csi.go): needs to match csiProvisionerName declared.
-const csiProvisionerName = "com.nexantic.smalltown.vfs"
+// ONCHANGE(//core/internal/kubernetes/reconciler:resources_csi.go): needs to match csiProvisionerServerName declared.
+const csiProvisionerServerName = "com.nexantic.smalltown.vfs"
-// csiProvisioner is responsible for the provisioning and deprovisioning of CSI-based container volumes. It runs on all
+// csiProvisionerServer is responsible for the provisioning and deprovisioning of CSI-based container volumes. It runs on all
// nodes and watches PVCs for ones assigned to the node it's running on and fulfills the provisioning request by
// creating a directory, applying a quota and creating the corresponding PV. When the PV is released and its retention
// policy is Delete, the directory and the PV resource are deleted.
-type csiProvisioner struct {
- nodeName string
- kubeclientset kubernetes.Interface
+type csiProvisionerServer struct {
+ NodeName string
+ Kubernetes kubernetes.Interface
+ InformerFactory informers.SharedInformerFactory
+ VolumesDirectory *localstorage.DataVolumesDirectory
+
claimQueue workqueue.RateLimitingInterface
pvQueue workqueue.RateLimitingInterface
recorder record.EventRecorder
pvcInformer coreinformers.PersistentVolumeClaimInformer
pvInformer coreinformers.PersistentVolumeInformer
storageClassInformer storageinformers.StorageClassInformer
- storageManager *storage.Manager
logger *zap.Logger
}
@@ -70,79 +71,77 @@
// the events happening on the Kubernetes control plane and informs us when something happens. If anything happens to
// PVCs or PVs, we enqueue the identifier of that resource in a work queue. Queues are being worked on by only one
// worker to limit load and avoid complicated locking infrastructure. Failed items are requeued.
-func runCSIProvisioner(storMan *storage.Manager, kubeclientset kubernetes.Interface, informerFactory informers.SharedInformerFactory) supervisor.Runnable {
- return func(ctx context.Context) error {
- nodeName, err := os.Hostname()
- if err != nil {
- panic(err)
- }
+func (p *csiProvisionerServer) Run(ctx context.Context) error {
+ // The recorder is used to log Kubernetes events for successful or failed volume provisions. These events then
+ // show up in `kubectl describe pvc` and can be used by admins to debug issues with this provisioner.
+ eventBroadcaster := record.NewBroadcaster()
+ eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: p.Kubernetes.CoreV1().Events("")})
+ p.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: csiProvisionerServerName, Host: p.NodeName})
- // The recorder is used to log Kubernetes events for successful or failed volume provisions. These events then
- // show up in `kubectl describe pvc` and can be used by admins to debug issues with this provisioner.
- eventBroadcaster := record.NewBroadcaster()
- eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
- recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: csiProvisionerName, Host: nodeName})
+ p.pvInformer = p.InformerFactory.Core().V1().PersistentVolumes()
+ p.pvcInformer = p.InformerFactory.Core().V1().PersistentVolumeClaims()
+ p.storageClassInformer = p.InformerFactory.Storage().V1().StorageClasses()
- p := &csiProvisioner{
- nodeName: nodeName,
- recorder: recorder,
- kubeclientset: kubeclientset,
- pvInformer: informerFactory.Core().V1().PersistentVolumes(),
- pvcInformer: informerFactory.Core().V1().PersistentVolumeClaims(),
- storageClassInformer: informerFactory.Storage().V1().StorageClasses(),
- claimQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
- pvQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
- storageManager: storMan,
- logger: supervisor.Logger(ctx),
- }
+ p.claimQueue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
+ p.pvQueue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
- p.pvcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
- AddFunc: p.enqueueClaim,
- UpdateFunc: func(old, new interface{}) {
- p.enqueueClaim(new)
- },
- })
- p.pvInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
- AddFunc: p.enqueuePV,
- UpdateFunc: func(old, new interface{}) {
- p.enqueuePV(new)
- },
- })
+ p.pvcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+ AddFunc: p.enqueueClaim,
+ UpdateFunc: func(old, new interface{}) {
+ p.enqueueClaim(new)
+ },
+ })
+ p.pvInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+ AddFunc: p.enqueuePV,
+ UpdateFunc: func(old, new interface{}) {
+ p.enqueuePV(new)
+ },
+ })
+ p.logger = supervisor.Logger(ctx)
- go p.pvcInformer.Informer().Run(ctx.Done())
- go p.pvInformer.Informer().Run(ctx.Done())
- go p.storageClassInformer.Informer().Run(ctx.Done())
+ go p.pvcInformer.Informer().Run(ctx.Done())
+ go p.pvInformer.Informer().Run(ctx.Done())
+ go p.storageClassInformer.Informer().Run(ctx.Done())
- // These will self-terminate once the queues are shut down
- go p.processQueueItems(p.claimQueue, func(key string) error {
- return p.processPVC(key)
- })
- go p.processQueueItems(p.pvQueue, func(key string) error {
- return p.processPV(key)
- })
+ // These will self-terminate once the queues are shut down
+ go p.processQueueItems(p.claimQueue, func(key string) error {
+ return p.processPVC(key)
+ })
+ go p.processQueueItems(p.pvQueue, func(key string) error {
+ return p.processPV(key)
+ })
- supervisor.Signal(ctx, supervisor.SignalHealthy)
- <-ctx.Done()
- p.claimQueue.ShutDown()
- p.pvQueue.ShutDown()
- return nil
- }
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ <-ctx.Done()
+ p.claimQueue.ShutDown()
+ p.pvQueue.ShutDown()
+ return nil
}
// isOurPVC checks if the given PVC is to be provisioned by this provisioner and has been scheduled onto this node
-func (p *csiProvisioner) isOurPVC(pvc *v1.PersistentVolumeClaim) bool {
- return pvc.ObjectMeta.Annotations["volume.beta.kubernetes.io/storage-provisioner"] == csiProvisionerName &&
- (pvc.ObjectMeta.Annotations["volume.kubernetes.io/selected-node"] == p.nodeName)
+func (p *csiProvisionerServer) isOurPVC(pvc *v1.PersistentVolumeClaim) bool {
+ if pvc.ObjectMeta.Annotations["volume.beta.kubernetes.io/storage-provisioner"] != csiProvisionerServerName {
+ return false
+ }
+ if pvc.ObjectMeta.Annotations["volume.kubernetes.io/selected-node"] != p.NodeName {
+ return false
+ }
+ return true
}
// isOurPV checks if the given PV has been provisioned by this provisioner and has been scheduled onto this node
-func (p *csiProvisioner) isOurPV(pv *v1.PersistentVolume) bool {
- return pv.ObjectMeta.Annotations["pv.kubernetes.io/provisioned-by"] == csiProvisionerName &&
- pv.Spec.NodeAffinity.Required.NodeSelectorTerms[0].MatchExpressions[0].Values[0] == p.nodeName
+func (p *csiProvisionerServer) isOurPV(pv *v1.PersistentVolume) bool {
+ if pv.ObjectMeta.Annotations["pv.kubernetes.io/provisioned-by"] != csiProvisionerServerName {
+ return false
+ }
+ if pv.Spec.NodeAffinity.Required.NodeSelectorTerms[0].MatchExpressions[0].Values[0] != p.NodeName {
+ return false
+ }
+ return true
}
// enqueueClaim adds an added/changed PVC to the work queue
-func (p *csiProvisioner) enqueueClaim(obj interface{}) {
+func (p *csiProvisionerServer) enqueueClaim(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
p.logger.Error("Not queuing PVC because key could not be derived", zap.Error(err))
@@ -152,7 +151,7 @@
}
// enqueuePV adds an added/changed PV to the work queue
-func (p *csiProvisioner) enqueuePV(obj interface{}) {
+func (p *csiProvisionerServer) enqueuePV(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
p.logger.Error("Not queuing PV because key could not be derived", zap.Error(err))
@@ -163,7 +162,7 @@
// processQueueItems gets items from the given work queue and calls the process function for each of them. It self-
// terminates once the queue is shut down.
-func (p *csiProvisioner) processQueueItems(queue workqueue.RateLimitingInterface, process func(key string) error) {
+func (p *csiProvisionerServer) processQueueItems(queue workqueue.RateLimitingInterface, process func(key string) error) {
for {
obj, shutdown := queue.Get()
if shutdown {
@@ -190,25 +189,14 @@
}
}
-// getVolumePath gets the path where the volume is stored or an error if the storage manager doesn't
-// have the volume available
-func (p *csiProvisioner) getVolumePath(volumeID string) (string, error) {
- return p.storageManager.GetPathInPlace(storage.PlaceData, filepath.Join(volumeDir, volumeID))
-}
-
-// ensureVolumePath ensures that the top-level volume directory is created. It fails if the storage manager doesn't
-// have the volume available.
-func (p *csiProvisioner) ensureVolumePath() error {
- path, err := p.storageManager.GetPathInPlace(storage.PlaceData, volumeDir)
- if err != nil {
- return err
- }
- return os.MkdirAll(path, 0640)
+// volumePath gets the path where the volume is stored.
+func (p *csiProvisionerServer) volumePath(volumeID string) string {
+ return filepath.Join(p.VolumesDirectory.FullPath(), volumeID)
}
// processPVC looks at a single PVC item from the queue, determines if it needs to be provisioned and logs the
// provisioning result to the recorder
-func (p *csiProvisioner) processPVC(key string) error {
+func (p *csiProvisionerServer) processPVC(key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return fmt.Errorf("invalid resource key: %s", key)
@@ -234,7 +222,7 @@
return fmt.Errorf("")
}
- if storageClass.Provisioner != csiProvisionerName {
+ if storageClass.Provisioner != csiProvisionerServerName {
// We're not responsible for this PVC. Can only happen if controller-manager makes a mistake
// setting the annotations, but we're bailing here anyways for safety.
return nil
@@ -253,7 +241,7 @@
// provisionPVC creates the directory where the volume lives, sets a quota for the requested amount of storage and
// creates the PV object representing this new volume
-func (p *csiProvisioner) provisionPVC(pvc *v1.PersistentVolumeClaim, storageClass *storagev1.StorageClass) error {
+func (p *csiProvisionerServer) provisionPVC(pvc *v1.PersistentVolumeClaim, storageClass *storagev1.StorageClass) error {
claimRef, err := ref.GetReference(scheme.Scheme, pvc)
if err != nil {
return fmt.Errorf("failed to get reference to PVC: %w", err)
@@ -273,14 +261,7 @@
}
volumeID := "pvc-" + string(pvc.ObjectMeta.UID)
- volumePath, err := p.getVolumePath(volumeID)
- if err != nil {
- return fmt.Errorf("unable to access volumes: %w", err)
- }
-
- if err := p.ensureVolumePath(); err != nil {
- return fmt.Errorf("failed to create volume location: %w", err)
- }
+ volumePath := p.volumePath(volumeID)
p.logger.Info("Creating local PV", zap.String("volume-id", volumeID))
if err := os.Mkdir(volumePath, 0644); err != nil && !os.IsExist(err) {
@@ -301,7 +282,7 @@
ObjectMeta: metav1.ObjectMeta{
Name: volumeID,
Annotations: map[string]string{
- "pv.kubernetes.io/provisioned-by": csiProvisionerName},
+ "pv.kubernetes.io/provisioned-by": csiProvisionerServerName},
},
Spec: v1.PersistentVolumeSpec{
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
@@ -310,7 +291,7 @@
},
PersistentVolumeSource: v1.PersistentVolumeSource{
CSI: &v1.CSIPersistentVolumeSource{
- Driver: csiProvisionerName,
+ Driver: csiProvisionerServerName,
VolumeHandle: volumeID,
},
},
@@ -323,7 +304,7 @@
{
Key: "kubernetes.io/hostname",
Operator: v1.NodeSelectorOpIn,
- Values: []string{p.nodeName},
+ Values: []string{p.NodeName},
},
},
},
@@ -335,7 +316,8 @@
},
}
- if _, err = p.kubeclientset.CoreV1().PersistentVolumes().Create(context.Background(), vol, metav1.CreateOptions{}); !apierrs.IsAlreadyExists(err) && err != nil {
+ _, err = p.Kubernetes.CoreV1().PersistentVolumes().Create(context.Background(), vol, metav1.CreateOptions{})
+ if err != nil && !apierrs.IsAlreadyExists(err) {
return fmt.Errorf("failed to create PV object: %w", err)
}
return nil
@@ -343,7 +325,7 @@
// processPV looks at a single PV item from the queue and checks if it has been released and needs to be deleted. If yes
// it deletes the associated quota, directory and the PV object and logs the result to the recorder.
-func (p *csiProvisioner) processPV(key string) error {
+func (p *csiProvisionerServer) processPV(key string) error {
_, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return fmt.Errorf("invalid resource key: %s", key)
@@ -361,7 +343,7 @@
if pv.Spec.PersistentVolumeReclaimPolicy != v1.PersistentVolumeReclaimDelete || pv.Status.Phase != "Released" {
return nil
}
- volumePath, err := p.getVolumePath(pv.Spec.CSI.VolumeHandle)
+ volumePath := p.volumePath(pv.Spec.CSI.VolumeHandle)
// Log deletes for auditing purposes
p.logger.Info("Deleting persistent volume", zap.String("name", pv.Spec.CSI.VolumeHandle))
@@ -378,7 +360,7 @@
return fmt.Errorf("failed to delete volume: %w", err)
}
- err = p.kubeclientset.CoreV1().PersistentVolumes().Delete(context.Background(), pv.Name, metav1.DeleteOptions{})
+ err = p.Kubernetes.CoreV1().PersistentVolumes().Delete(context.Background(), pv.Name, metav1.DeleteOptions{})
if err != nil && !apierrs.IsNotFound(err) {
p.recorder.Eventf(pv, v1.EventTypeWarning, "DeprovisioningFailed", "Failed to delete PV object from K8s API: %v", err)
return fmt.Errorf("failed to delete PV object: %w", err)