m/node/kubernetes: implement storage resizing

This implements persistent volume resizing in the storage provisioner.
The logic is based on https://github.com/kubernetes-csi/external-resizer

The mutation caches are an optimization that prevents unnecessary
repeated processing: they let the controller remember changes it has
made itself while the watch events for those changes have not yet
arrived.
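
A minimal sketch of that pattern using the client-go mutation cache
API (the identifiers here are illustrative; the actual wiring is in
the diff below):

    mc := cache.NewIntegerResourceVersionMutationCache(
        informer.Informer().GetStore(), nil, time.Minute, false)
    // After writing an object to the API server, remember our own copy:
    mc.Mutation(updatedPVC)
    // Reads through the cache then return that copy even if the
    // corresponding watch event has not arrived yet:
    obj, exists, err := mc.GetByKey(key)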

The controller supports the RecoverVolumeExpansionFailure feature, which
allows the user to reduce the requested size after a resize has failed
due to insufficient space. Failed resizes are retried with exponential
backoff.
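
The retry flow combines a delaying queue with a per-key rate limiter,
roughly as in this simplified sketch (the helper shown here is
illustrative; the real logic lives in processPVCRetryWrapper below):

    func (p *csiProvisionerServer) retryAfter(key string, err error) {
        if err != nil {
            // Exponential per-key backoff; AddAfter schedules the retry.
            p.claimQueue.AddAfter(key, p.claimRateLimiter.When(key))
            return
        }
        p.claimRateLimiter.Forget(key) // reset backoff once processing succeeds
    }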

Change-Id: I0f3d40c1a592b30d25739f5d20b529dfe25dfbe1
Reviewed-on: https://review.monogon.dev/c/monogon/+/4008
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/node/kubernetes/provisioner.go b/metropolis/node/kubernetes/provisioner.go
index 92caa59..cd2ed45 100644
--- a/metropolis/node/kubernetes/provisioner.go
+++ b/metropolis/node/kubernetes/provisioner.go
@@ -5,16 +5,24 @@
 
 import (
 	"context"
+	"encoding/json"
 	"errors"
 	"fmt"
+	"math"
 	"os"
 	"path/filepath"
+	"sync"
+	"time"
 
 	"golang.org/x/sys/unix"
 	v1 "k8s.io/api/core/v1"
 	storagev1 "k8s.io/api/storage/v1"
 	apierrs "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/strategicpatch"
 	"k8s.io/client-go/informers"
 	coreinformers "k8s.io/client-go/informers/core/v1"
 	storageinformers "k8s.io/client-go/informers/storage/v1"
@@ -25,6 +33,7 @@
 	"k8s.io/client-go/tools/record"
 	ref "k8s.io/client-go/tools/reference"
 	"k8s.io/client-go/util/workqueue"
+	"k8s.io/utils/ptr"
 
 	"source.monogon.dev/go/logging"
 	"source.monogon.dev/metropolis/node/core/localstorage"
@@ -53,13 +62,20 @@
 	InformerFactory  informers.SharedInformerFactory
 	VolumesDirectory *localstorage.DataVolumesDirectory
 
-	claimQueue           workqueue.TypedRateLimitingInterface[string]
+	claimQueue           workqueue.TypedDelayingInterface[string]
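+	// claimRateLimiter and claimNextTry implement per-claim retry backoff;
+	// claimNextTry records the earliest time at which a claim may be retried.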
+	claimRateLimiter     workqueue.TypedRateLimiter[string]
+	claimNextTry         map[string]time.Time
 	pvQueue              workqueue.TypedRateLimitingInterface[string]
 	recorder             record.EventRecorder
 	pvcInformer          coreinformers.PersistentVolumeClaimInformer
 	pvInformer           coreinformers.PersistentVolumeInformer
 	storageClassInformer storageinformers.StorageClassInformer
-	logger               logging.Leveled
+	pvcMutationCache     cache.MutationCache
+	pvMutationCache      cache.MutationCache
+	// processMutex ensures that the two workers (one for PVCs and one for PVs)
+	// are not doing work concurrently.
+	processMutex sync.Mutex
+	logger       logging.Leveled
 }
 
 // runCSIProvisioner runs the main provisioning machinery. It consists of a
@@ -76,11 +92,15 @@
 	eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: p.Kubernetes.CoreV1().Events("")})
 	p.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: csiProvisionerServerName, Host: p.NodeName})
 
-	p.pvInformer = p.InformerFactory.Core().V1().PersistentVolumes()
 	p.pvcInformer = p.InformerFactory.Core().V1().PersistentVolumeClaims()
+	p.pvInformer = p.InformerFactory.Core().V1().PersistentVolumes()
 	p.storageClassInformer = p.InformerFactory.Storage().V1().StorageClasses()
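+	// The mutation caches return objects this controller has recently written
+	// itself, even before the corresponding watch events have arrived.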
+	p.pvcMutationCache = cache.NewIntegerResourceVersionMutationCache(p.pvcInformer.Informer().GetStore(), nil, time.Minute, false)
+	p.pvMutationCache = cache.NewIntegerResourceVersionMutationCache(p.pvInformer.Informer().GetStore(), nil, time.Minute, false)
 
-	p.claimQueue = workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]())
+	p.claimQueue = workqueue.NewTypedDelayingQueue[string]()
+	p.claimRateLimiter = workqueue.NewTypedItemExponentialFailureRateLimiter[string](time.Second, 5*time.Minute)
+	p.claimNextTry = make(map[string]time.Time)
 	p.pvQueue = workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]())
 
 	p.logger = supervisor.Logger(ctx)
@@ -93,6 +113,10 @@
 		UpdateFunc: func(old, new interface{}) {
 			p.enqueueClaim(new)
 		},
+		// We need to handle deletes to ensure that deleted keys are removed from
+		// the rate limiter, because there are cases where we leave a key in the
+		// rate limiter without scheduling a retry.
+		DeleteFunc: p.enqueueClaim,
 	})
 	p.pvInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: p.enqueuePV,
@@ -113,11 +137,11 @@
 	go p.storageClassInformer.Informer().Run(ctx.Done())
 
 	// These will self-terminate once the queues are shut down
-	go p.processQueueItems(p.claimQueue, func(key string) error {
-		return p.processPVC(key)
+	go p.processQueueItems(p.claimQueue, func(key string) {
+		p.processPVCRetryWrapper(ctx, key)
 	})
-	go p.processQueueItems(p.pvQueue, func(key string) error {
-		return p.processPV(key)
+	go p.processQueueItems(p.pvQueue, func(key string) {
+		p.processPVRetryWrapper(ctx, key)
 	})
 
 	supervisor.Signal(ctx, supervisor.SignalHealthy)
@@ -142,7 +166,7 @@
 // isOurPV checks if the given PV has been provisioned by this provisioner and
 // has been scheduled onto this node
 func (p *csiProvisionerServer) isOurPV(pv *v1.PersistentVolume) bool {
-	if pv.ObjectMeta.Annotations["pv.kubernetes.io/provisioned-by"] != csiProvisionerServerName {
+	if pv.Spec.CSI == nil || pv.Spec.CSI.Driver != csiProvisionerServerName {
 		return false
 	}
 	if pv.Spec.NodeAffinity.Required.NodeSelectorTerms[0].MatchExpressions[0].Values[0] != p.NodeName {
@@ -153,7 +177,7 @@
 
 // enqueueClaim adds an added/changed PVC to the work queue
 func (p *csiProvisionerServer) enqueueClaim(obj interface{}) {
-	key, err := cache.MetaNamespaceKeyFunc(obj)
+	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
 	if err != nil {
 		p.logger.Errorf("Not queuing PVC because key could not be derived: %v", err)
 		return
@@ -173,7 +197,7 @@
 
 // processQueueItems gets items from the given work queue and calls the process
 // function for each of them. It self- terminates once the queue is shut down.
-func (p *csiProvisionerServer) processQueueItems(queue workqueue.TypedRateLimitingInterface[string], process func(key string) error) {
+func (p *csiProvisionerServer) processQueueItems(queue workqueue.TypedInterface[string], process func(key string)) {
 	for {
 		obj, shutdown := queue.Get()
 		if shutdown {
@@ -183,16 +207,40 @@
 		func(obj string) {
 			defer queue.Done(obj)
 
-			if err := process(obj); err != nil {
-				p.logger.Warningf("Failed processing item %q, requeueing (numrequeues: %d): %v", obj, queue.NumRequeues(obj), err)
-				queue.AddRateLimited(obj)
-			} else {
-				queue.Forget(obj)
-			}
+			p.processMutex.Lock()
+			defer p.processMutex.Unlock()
+
+			process(obj)
 		}(obj)
 	}
 }
 
+// errSkipRateLimitReset is returned when the caller should neither schedule a
+// rate-limited retry nor reset (Forget) the rate limiter state for the key.
+var errSkipRateLimitReset = errors.New("skip ratelimit reset")
+
+func (p *csiProvisionerServer) processPVCRetryWrapper(ctx context.Context, key string) {
+	err := p.processPVC(ctx, key)
+	if errors.Is(err, errSkipRateLimitReset) {
+		// ignore
+	} else if err != nil {
+		p.logger.Warningf("Failed processing PVC %s, requeueing (numrequeues: %d): %v", key, p.claimRateLimiter.NumRequeues(key), err)
+		duration := p.claimRateLimiter.When(key)
+		p.claimNextTry[key] = time.Now().Add(duration)
+		p.claimQueue.AddAfter(key, duration)
+	} else {
+		p.claimRateLimiter.Forget(key)
+		delete(p.claimNextTry, key)
+	}
+}
+
+func (p *csiProvisionerServer) processPVRetryWrapper(ctx context.Context, key string) {
+	if err := p.processPV(ctx, key); err != nil {
+		p.logger.Warningf("Failed processing PV %s, requeueing (numrequeues: %d): %v", key, p.pvQueue.NumRequeues(key), err)
+		p.pvQueue.AddRateLimited(key)
+	} else {
+		p.pvQueue.Forget(key)
+	}
+}
+
 // volumePath gets the path where the volume is stored.
 func (p *csiProvisionerServer) volumePath(volumeID string) string {
 	return filepath.Join(p.VolumesDirectory.FullPath(), volumeID)
@@ -200,75 +248,112 @@
 
-// processPVC looks at a single PVC item from the queue, determines if it needs
-// to be provisioned and logs the provisioning result to the recorder
+// processPVC looks at a single PVC item from the queue, determines if it needs
+// to be provisioned or resized, and records failures to the recorder.
-func (p *csiProvisionerServer) processPVC(key string) error {
-	namespace, name, err := cache.SplitMetaNamespaceKey(key)
+func (p *csiProvisionerServer) processPVC(ctx context.Context, key string) error {
+	val, exists, err := p.pvcMutationCache.GetByKey(key)
 	if err != nil {
-		return fmt.Errorf("invalid resource key: %s", key)
-	}
-	pvc, err := p.pvcInformer.Lister().PersistentVolumeClaims(namespace).Get(name)
-	if apierrs.IsNotFound(err) {
-		return nil // nothing to do, no error
-	} else if err != nil {
 		return fmt.Errorf("failed to get PVC for processing: %w", err)
 	}
+	if !exists {
+		return nil // nothing to do, no error
+	}
+	pvc, ok := val.(*v1.PersistentVolumeClaim)
+	if !ok {
+		return fmt.Errorf("value in MutationCache is not a PVC: %+v", val)
+	}
 
 	if !p.isOurPVC(pvc) {
 		return nil
 	}
 
-	if pvc.Status.Phase != "Pending" {
-		// If the PVC is not pending, we don't need to provision anything
-		return nil
+	if pvc.Spec.VolumeName == "" {
+		// The claim is pending, so we may need to provision it.
+		storageClass, err := p.storageClassInformer.Lister().Get(*pvc.Spec.StorageClassName)
+		if err != nil {
+			return fmt.Errorf("could not get storage class: %w", err)
+		}
+
+		if storageClass.Provisioner != csiProvisionerServerName {
+			// We're not responsible for this PVC. This can only happen if
+			// controller-manager makes a mistake setting the annotations, but
+			// we bail here anyway for safety.
+			return nil
+		}
+
+		err = p.provisionPVC(ctx, pvc, storageClass)
+
+		if err != nil {
+			p.recorder.Eventf(pvc, v1.EventTypeWarning, "ProvisioningFailed", "Failed to provision PV: %v", err)
+			return err
+		}
+	} else if pvc.Status.Phase == v1.ClaimBound {
+		// The claim is bound, so we may need to resize it.
+		requestSize := pvc.Spec.Resources.Requests[v1.ResourceStorage]
+		statusSize := pvc.Status.Capacity[v1.ResourceStorage]
+		if requestSize.Cmp(statusSize) <= 0 {
+			// No resize needed.
+			return nil
+		}
+
+		val, exists, err := p.pvMutationCache.GetByKey(pvc.Spec.VolumeName)
+		if err != nil {
+			return fmt.Errorf("failed to get PV of PVC %s: %w", key, err)
+		}
+		if !exists {
+			return nil
+		}
+		pv, ok := val.(*v1.PersistentVolume)
+		if !ok {
+			return fmt.Errorf("value in MutationCache is not a PV: %+v", val)
+		}
+		if pv.Status.Phase != v1.VolumeBound || pv.Spec.ClaimRef == nil || pv.Spec.ClaimRef.UID != pvc.UID {
+			return nil
+		}
+		if !p.isOurPV(pv) {
+			return nil
+		}
+
+		err = p.processResize(ctx, pvc, pv)
+		if errors.Is(err, errSkipRateLimitReset) {
+			return err
+		} else if err != nil {
+			p.recorder.Eventf(pvc, v1.EventTypeWarning, "VolumeResizeFailed", "Failed to resize PV: %v", err)
+			return fmt.Errorf("failed to process resize of PVC %s: %w", key, err)
+		}
 	}
-
-	storageClass, err := p.storageClassInformer.Lister().Get(*pvc.Spec.StorageClassName)
-	if err != nil {
-		return fmt.Errorf("could not get storage class: %w", err)
-	}
-
-	if storageClass.Provisioner != csiProvisionerServerName {
-		// We're not responsible for this PVC. Can only happen if
-		// controller-manager makes a mistake setting the annotations, but
-		// we're bailing here anyways for safety.
-		return nil
-	}
-
-	err = p.provisionPVC(pvc, storageClass)
-
-	if err != nil {
-		p.recorder.Eventf(pvc, v1.EventTypeWarning, "ProvisioningFailed", "Failed to provision PV: %v", err)
-		return err
-	}
-	p.recorder.Eventf(pvc, v1.EventTypeNormal, "Provisioned", "Successfully provisioned PV")
-
 	return nil
 }
 
 // provisionPVC creates the directory where the volume lives, sets a quota for
 // the requested amount of storage and creates the PV object representing this
 // new volume
-func (p *csiProvisionerServer) provisionPVC(pvc *v1.PersistentVolumeClaim, storageClass *storagev1.StorageClass) error {
+func (p *csiProvisionerServer) provisionPVC(ctx context.Context, pvc *v1.PersistentVolumeClaim, storageClass *storagev1.StorageClass) error {
+	key := cache.MetaObjectToName(pvc).String()
 	claimRef, err := ref.GetReference(scheme.Scheme, pvc)
 	if err != nil {
 		return fmt.Errorf("failed to get reference to PVC: %w", err)
 	}
 
 	storageReq := pvc.Spec.Resources.Requests[v1.ResourceStorage]
-	if storageReq.IsZero() {
-		return fmt.Errorf("PVC is not requesting any storage, this is not supported")
+	capacity, err := quantityToBytes(storageReq)
+	if err != nil {
+		return err
 	}
-	capacity, ok := storageReq.AsInt64()
-	if !ok {
-		return fmt.Errorf("PVC requesting more than 2^63 bytes of storage, this is not supported")
-	}
+	newSize := *resource.NewQuantity(capacity, resource.BinarySI)
 
 	volumeID := "pvc-" + string(pvc.ObjectMeta.UID)
+	if _, err := p.pvInformer.Lister().Get(volumeID); err == nil {
+		return nil // Volume already exists.
+	}
 	volumePath := p.volumePath(volumeID)
+	volumeMode := ptr.Deref(pvc.Spec.VolumeMode, "")
+	if volumeMode == "" {
+		volumeMode = v1.PersistentVolumeFilesystem
+	}
 
-	p.logger.Infof("Creating local PV %s", volumeID)
+	p.logger.Infof("Creating persistent volume %s with mode %s and size %s for claim %s", volumeID, volumeMode, newSize.String(), key)
 
-	switch *pvc.Spec.VolumeMode {
-	case "", v1.PersistentVolumeFilesystem:
+	switch volumeMode {
+	case v1.PersistentVolumeFilesystem:
 		if err := os.Mkdir(volumePath, 0644); err != nil && !os.IsExist(err) {
 			return fmt.Errorf("failed to create volume directory: %w", err)
 		}
@@ -288,23 +373,24 @@
 			return fmt.Errorf("failed to create volume image: %w", err)
 		}
 		defer imageFile.Close()
-		if err := unix.Fallocate(int(imageFile.Fd()), 0, 0, capacity); err != nil {
-			return fmt.Errorf("failed to fallocate() volume image: %w", err)
+		if err := allocateBlockVolume(imageFile, capacity); err != nil {
+			return fmt.Errorf("failed to allocate volume image: %w", err)
 		}
 	default:
-		return fmt.Errorf("VolumeMode \"%s\" is unsupported", *pvc.Spec.VolumeMode)
+		return fmt.Errorf("VolumeMode %q is unsupported", *pvc.Spec.VolumeMode)
 	}
 
 	vol := &v1.PersistentVolume{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: volumeID,
 			Annotations: map[string]string{
-				"pv.kubernetes.io/provisioned-by": csiProvisionerServerName},
+				"pv.kubernetes.io/provisioned-by": csiProvisionerServerName,
+			},
 		},
 		Spec: v1.PersistentVolumeSpec{
 			AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
 			Capacity: v1.ResourceList{
-				v1.ResourceStorage: storageReq, // We're always giving the exact amount
+				v1.ResourceStorage: newSize,
 			},
 			PersistentVolumeSource: v1.PersistentVolumeSource{
 				CSI: &v1.CSIPersistentVolumeSource{
@@ -334,17 +420,180 @@
 		},
 	}
 
-	_, err = p.Kubernetes.CoreV1().PersistentVolumes().Create(context.Background(), vol, metav1.CreateOptions{})
+	_, err = p.Kubernetes.CoreV1().PersistentVolumes().Create(ctx, vol, metav1.CreateOptions{})
 	if err != nil && !apierrs.IsAlreadyExists(err) {
 		return fmt.Errorf("failed to create PV object: %w", err)
 	}
 	return nil
 }
 
+// See https://github.com/kubernetes-csi/external-resizer/blob/master/pkg/controller/expand_and_recover.go
+func (p *csiProvisionerServer) processResize(ctx context.Context, pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) error {
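+	// Flow: pick a target size, record it on the claim and mark it as
+	// ControllerResizeInProgress, grow the backing volume, update the PV
+	// capacity, and finally set NodeResizePending to hand off to the node.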
+	key := cache.MetaObjectToName(pvc).String()
+	requestSize := pvc.Spec.Resources.Requests[v1.ResourceStorage]
+	allocatedSize, hasAllocatedSize := pvc.Status.AllocatedResources[v1.ResourceStorage]
+	pvSize := pv.Spec.Capacity[v1.ResourceStorage]
+	resizeStatus := pvc.Status.AllocatedResourceStatuses[v1.ResourceStorage]
+
+	newSize := requestSize
+	if hasAllocatedSize {
+		// Usually, we want to keep resizing to the same target size once we have
+		// picked one and ignore changes in request size.
+		newSize = allocatedSize
+	}
+	switch resizeStatus {
+	case v1.PersistentVolumeClaimNodeResizePending,
+		v1.PersistentVolumeClaimNodeResizeInProgress:
+		// We are waiting for node resize. The PV should be large enough at this
+		// point, which means we don't need to do anything here.
+		if pvSize.Cmp(newSize) >= 0 || pvSize.Cmp(requestSize) >= 0 {
+			// We don't need to do anything and don't need to schedule a retry, but we
+			// still don't want to reset the rate limiter in case the node resize
+			// fails repeatedly.
+			return errSkipRateLimitReset
+		}
+	case "", v1.PersistentVolumeClaimControllerResizeInfeasible:
+		// In this case, there is no ongoing or partially complete resize operation,
+		// and we can be sure that the actually allocated size is equal to pvSize.
+		// That means it's safe to pick a new target size.
+		if pvSize.Cmp(requestSize) < 0 {
+			newSize = requestSize
+		}
+	}
+	capacity, err := quantityToBytes(newSize)
+	if err != nil {
+		return err
+	}
+
+	keepConditions := false
+	if hasAllocatedSize && allocatedSize.Cmp(newSize) == 0 {
+		now := time.Now()
+		if p.claimNextTry[key].After(now) {
+			// Not enough time has passed since the last attempt and the target size
+			// is still the same.
+			p.claimQueue.AddAfter(key, p.claimNextTry[key].Sub(now))
+			return errSkipRateLimitReset
+		}
+		keepConditions = true
+	}
+
+	newPVC := pvc.DeepCopy()
+	mapSet(&newPVC.Status.AllocatedResources, v1.ResourceStorage, newSize)
+	mapSet(&newPVC.Status.AllocatedResourceStatuses, v1.ResourceStorage, v1.PersistentVolumeClaimControllerResizeInProgress)
+	conditions := []v1.PersistentVolumeClaimCondition{{
+		Type:               v1.PersistentVolumeClaimResizing,
+		Status:             v1.ConditionTrue,
+		LastTransitionTime: metav1.Now(),
+	}}
+	mergeResizeConditionOnPVC(newPVC, conditions, keepConditions)
+	pvc, err = p.patchPVCStatus(ctx, pvc, newPVC)
+	if err != nil {
+		return fmt.Errorf("failed to update PVC before resizing: %w", err)
+	}
+
+	expandedSize := *resource.NewQuantity(capacity, resource.BinarySI)
+	p.logger.Infof("Resizing persistent volume %s to new size %s", pv.Spec.CSI.VolumeHandle, expandedSize.String())
+	err = p.controllerExpandVolume(pv, capacity)
+	if err != nil {
+		// If the resize fails because the requested size is too large, then set
+		// status to infeasible, which allows the user to change the request to a
+		// smaller size.
+		isInfeasible := errors.Is(err, unix.ENOSPC) || errors.Is(err, unix.EDQUOT) || errors.Is(err, unix.EFBIG) || errors.Is(err, unix.EINVAL)
+		newPVC = pvc.DeepCopy()
+		if isInfeasible {
+			mapSet(&newPVC.Status.AllocatedResourceStatuses, v1.ResourceStorage, v1.PersistentVolumeClaimControllerResizeInfeasible)
+		}
+		conditions = []v1.PersistentVolumeClaimCondition{{
+			Type:               v1.PersistentVolumeClaimControllerResizeError,
+			Status:             v1.ConditionTrue,
+			LastTransitionTime: metav1.Now(),
+			Message:            fmt.Sprintf("Failed to expand PV: %v", err),
+		}}
+		mergeResizeConditionOnPVC(newPVC, conditions, true)
+		_, patchErr := p.patchPVCStatus(ctx, pvc, newPVC)
+		if patchErr != nil {
+			return fmt.Errorf("failed to update PVC after resizing: %w", patchErr)
+		}
+		return fmt.Errorf("failed to expand PV: %w", err)
+	}
+
+	newPV := pv.DeepCopy()
+	newPV.Spec.Capacity[v1.ResourceStorage] = expandedSize
+	pv, err = patchPV(ctx, p.Kubernetes, pv, newPV)
+	if err != nil {
+		return fmt.Errorf("failed to update PV with new capacity: %w", err)
+	}
+	p.pvMutationCache.Mutation(pv)
+
+	newPVC = pvc.DeepCopy()
+	mapSet(&newPVC.Status.AllocatedResourceStatuses, v1.ResourceStorage, v1.PersistentVolumeClaimNodeResizePending)
+	conditions = []v1.PersistentVolumeClaimCondition{{
+		Type:               v1.PersistentVolumeClaimFileSystemResizePending,
+		Status:             v1.ConditionTrue,
+		LastTransitionTime: metav1.Now(),
+	}}
+	mergeResizeConditionOnPVC(newPVC, conditions, true)
+	_, err = p.patchPVCStatus(ctx, pvc, newPVC)
+	if err != nil {
+		return fmt.Errorf("failed to update PVC after resizing: %w", err)
+	}
+
+	return nil
+}
+
+func (p *csiProvisionerServer) controllerExpandVolume(pv *v1.PersistentVolume, capacity int64) error {
+	volumePath := p.volumePath(pv.Spec.CSI.VolumeHandle)
+	switch ptr.Deref(pv.Spec.VolumeMode, "") {
+	case "", v1.PersistentVolumeFilesystem:
+		if err := fsquota.SetQuota(volumePath, uint64(capacity), uint64(capacity)/inodeCapacityRatio); err != nil {
+			return fmt.Errorf("failed to update quota: %w", err)
+		}
+		return nil
+	case v1.PersistentVolumeBlock:
+		imageFile, err := os.OpenFile(volumePath, os.O_RDWR, 0)
+		if err != nil {
+			return fmt.Errorf("failed to open block volume backing file: %w", err)
+		}
+		defer imageFile.Close()
+		if err := allocateBlockVolume(imageFile, capacity); err != nil {
+			return fmt.Errorf("failed to allocate space: %w", err)
+		}
+		return nil
+	default:
+		return fmt.Errorf("VolumeMode %q is unsupported", *pv.Spec.VolumeMode)
+	}
+}
+
+func allocateBlockVolume(imageFile *os.File, capacity int64) error {
+	// On XFS, fallocate is not atomic: It allocates space in steps of around
+	// 8 GB, and does not check upfront if there is enough space to satisfy the
+	// entire allocation. As the last step, if allocation succeeded, it updates
+	// the file size. This means that fallocate can fail and leave the file size
+	// unchanged, but still allocate part of the requested capacity past EOF.
+	//
+	// To clean this up, we truncate the file to its current size, which leaves
+	// the size unchanged but removes allocated space past EOF. We also do this if
+	// fallocate succeeds, in case a previous allocation has left space past EOF
+	// and was not cleaned up.
+	allocErr := unix.Fallocate(int(imageFile.Fd()), 0, 0, capacity)
+	info, err := imageFile.Stat()
+	if err != nil {
+		return err
+	}
+	err = imageFile.Truncate(info.Size())
+	if err != nil {
+		return err
+	}
+	if allocErr != nil {
+		return fmt.Errorf("fallocate: %w", allocErr)
+	}
+	return nil
+}
+
 // processPV looks at a single PV item from the queue and checks if it has been
 // released and needs to be deleted. If yes it deletes the associated quota,
 // directory and the PV object and logs the result to the recorder.
-func (p *csiProvisionerServer) processPV(key string) error {
+func (p *csiProvisionerServer) processPV(ctx context.Context, key string) error {
 	_, name, err := cache.SplitMetaNamespaceKey(key)
 	if err != nil {
 		return fmt.Errorf("invalid resource key: %s", key)
@@ -359,22 +608,33 @@
 	if !p.isOurPV(pv) {
 		return nil
 	}
-	if pv.Spec.PersistentVolumeReclaimPolicy != v1.PersistentVolumeReclaimDelete || pv.Status.Phase != "Released" {
+	if pv.Status.Phase == v1.VolumeBound && pv.Spec.ClaimRef != nil {
+		// Resize processing depends on both the PV and the claim. Instead of
+		// directly retrieving the claim here and calling processResize, we add it
+		// to the claimQueue. This ensures that all resize retries are handled by
+		// the claimQueue.
+		claimKey := cache.NewObjectName(pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name).String()
+		p.claimQueue.Add(claimKey)
+	}
+	if pv.ObjectMeta.DeletionTimestamp != nil {
+		return nil
+	}
+	if pv.Spec.PersistentVolumeReclaimPolicy != v1.PersistentVolumeReclaimDelete || pv.Status.Phase != v1.VolumeReleased {
 		return nil
 	}
 	volumePath := p.volumePath(pv.Spec.CSI.VolumeHandle)
 
 	// Log deletes for auditing purposes
 	p.logger.Infof("Deleting persistent volume %s", pv.Spec.CSI.VolumeHandle)
-	switch *pv.Spec.VolumeMode {
+	switch ptr.Deref(pv.Spec.VolumeMode, "") {
 	case "", v1.PersistentVolumeFilesystem:
-		if err := fsquota.SetQuota(volumePath, 0, 0); err != nil {
+		if err := fsquota.SetQuota(volumePath, 0, 0); err != nil && !os.IsNotExist(err) {
 			// We record these here manually since a successful deletion
 			// removes the PV we'd be attaching them to.
 			p.recorder.Eventf(pv, v1.EventTypeWarning, "DeprovisioningFailed", "Failed to remove quota: %v", err)
 			return fmt.Errorf("failed to remove quota: %w", err)
 		}
-		if err := os.RemoveAll(volumePath); err != nil && !os.IsNotExist(err) {
+		if err := os.RemoveAll(volumePath); err != nil {
 			p.recorder.Eventf(pv, v1.EventTypeWarning, "DeprovisioningFailed", "Failed to delete volume: %v", err)
 			return fmt.Errorf("failed to delete volume: %w", err)
 		}
@@ -388,10 +648,168 @@
 		return fmt.Errorf("invalid volume mode \"%v\"", *pv.Spec.VolumeMode)
 	}
 
-	err = p.Kubernetes.CoreV1().PersistentVolumes().Delete(context.Background(), pv.Name, metav1.DeleteOptions{})
+	err = p.Kubernetes.CoreV1().PersistentVolumes().Delete(ctx, pv.Name, metav1.DeleteOptions{})
 	if err != nil && !apierrs.IsNotFound(err) {
 		p.recorder.Eventf(pv, v1.EventTypeWarning, "DeprovisioningFailed", "Failed to delete PV object from K8s API: %v", err)
 		return fmt.Errorf("failed to delete PV object: %w", err)
 	}
 	return nil
 }
+
+// quantityToBytes returns the size as an integer number of bytes, rounding up.
+// Based on Kubernetes staging/src/k8s.io/cloud-provider/volume/helpers/rounding.go
+func quantityToBytes(size resource.Quantity) (int64, error) {
+	if size.CmpInt64(math.MaxInt64) >= 0 {
+		return 0, fmt.Errorf("quantity %s is too big, overflows int64", size.String())
+	}
+	val := size.Value()
+	if val <= 0 {
+		return 0, fmt.Errorf("invalid quantity %s, must be positive", size.String())
+	}
+	return val, nil
+}
+
+// patchPVCStatus, createPVCPatch, addResourceVersion, patchPV,
+// mergeResizeConditionOnPVC are taken from Kubernetes
+// pkg/volume/util/resize_util.go under Apache 2.0 and modified.
+
+// patchPVCStatus updates a PVC using patch instead of update. Update should not
+// be used because when the client is an older version, it will drop fields
+// which it does not support. It's done this way in both kubelet and
+// external-resizer.
+func (p *csiProvisionerServer) patchPVCStatus(
+	ctx context.Context,
+	oldPVC *v1.PersistentVolumeClaim,
+	newPVC *v1.PersistentVolumeClaim) (*v1.PersistentVolumeClaim, error) {
+	patchBytes, err := createPVCPatch(oldPVC, newPVC, true /* addResourceVersionCheck */)
+	if err != nil {
+		return oldPVC, fmt.Errorf("failed to create PVC patch: %w", err)
+	}
+
+	updatedClaim, updateErr := p.Kubernetes.CoreV1().PersistentVolumeClaims(oldPVC.Namespace).
+		Patch(ctx, oldPVC.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
+	if updateErr != nil {
+		return oldPVC, fmt.Errorf("failed to patch PVC object: %w", updateErr)
+	}
+	p.pvcMutationCache.Mutation(updatedClaim)
+	return updatedClaim, nil
+}
+
+func createPVCPatch(
+	oldPVC *v1.PersistentVolumeClaim,
+	newPVC *v1.PersistentVolumeClaim, addResourceVersionCheck bool) ([]byte, error) {
+	oldData, err := json.Marshal(oldPVC)
+	if err != nil {
+		return nil, fmt.Errorf("failed to marshal old data: %w", err)
+	}
+
+	newData, err := json.Marshal(newPVC)
+	if err != nil {
+		return nil, fmt.Errorf("failed to marshal new data: %w", err)
+	}
+
+	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, oldPVC)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create 2 way merge patch: %w", err)
+	}
+
+	if addResourceVersionCheck {
+		patchBytes, err = addResourceVersion(patchBytes, oldPVC.ResourceVersion)
+		if err != nil {
+			return nil, fmt.Errorf("failed to add resource version: %w", err)
+		}
+	}
+
+	return patchBytes, nil
+}
+
+func addResourceVersion(patchBytes []byte, resourceVersion string) ([]byte, error) {
+	var patchMap map[string]interface{}
+	err := json.Unmarshal(patchBytes, &patchMap)
+	if err != nil {
+		return nil, fmt.Errorf("error unmarshalling patch: %w", err)
+	}
+	u := unstructured.Unstructured{Object: patchMap}
+	u.SetResourceVersion(resourceVersion)
+	versionBytes, err := json.Marshal(patchMap)
+	if err != nil {
+		return nil, fmt.Errorf("error marshalling json patch: %w", err)
+	}
+	return versionBytes, nil
+}
+
+func patchPV(
+	ctx context.Context,
+	kubeClient kubernetes.Interface,
+	oldPV *v1.PersistentVolume,
+	newPV *v1.PersistentVolume) (*v1.PersistentVolume, error) {
+	oldData, err := json.Marshal(oldPV)
+	if err != nil {
+		return oldPV, fmt.Errorf("failed to marshal old data: %w", err)
+	}
+
+	newData, err := json.Marshal(newPV)
+	if err != nil {
+		return oldPV, fmt.Errorf("failed to marshal new data: %w", err)
+	}
+
+	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, oldPV)
+	if err != nil {
+		return oldPV, fmt.Errorf("failed to create 2 way merge patch: %w", err)
+	}
+
+	updatedPV, err := kubeClient.CoreV1().PersistentVolumes().
+		Patch(ctx, oldPV.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
+	if err != nil {
+		return oldPV, fmt.Errorf("failed to patch PV object: %w", err)
+	}
+	return updatedPV, nil
+}
+
+var knownResizeConditions = map[v1.PersistentVolumeClaimConditionType]bool{
+	v1.PersistentVolumeClaimFileSystemResizePending: true,
+	v1.PersistentVolumeClaimResizing:                true,
+	v1.PersistentVolumeClaimControllerResizeError:   true,
+	v1.PersistentVolumeClaimNodeResizeError:         true,
+}
+
+// mergeResizeConditionOnPVC updates pvc with requested resize conditions
+// leaving other conditions untouched.
+func mergeResizeConditionOnPVC(
+	pvc *v1.PersistentVolumeClaim,
+	resizeConditions []v1.PersistentVolumeClaimCondition,
+	keepOldResizeConditions bool) {
+	resizeConditionMap := map[v1.PersistentVolumeClaimConditionType]v1.PersistentVolumeClaimCondition{}
+	for _, condition := range resizeConditions {
+		resizeConditionMap[condition.Type] = condition
+	}
+
+	var newConditions []v1.PersistentVolumeClaimCondition
+	for _, condition := range pvc.Status.Conditions {
+		// If the condition is not a resize condition, we keep it.
+		if _, ok := knownResizeConditions[condition.Type]; !ok {
+			newConditions = append(newConditions, condition)
+			continue
+		}
+
+		if newCondition, ok := resizeConditionMap[condition.Type]; ok {
+			newConditions = append(newConditions, newCondition)
+			delete(resizeConditionMap, condition.Type)
+		} else if keepOldResizeConditions {
+			newConditions = append(newConditions, condition)
+		}
+	}
+
+	for _, newCondition := range resizeConditionMap {
+		newConditions = append(newConditions, newCondition)
+	}
+	pvc.Status.Conditions = newConditions
+}
+
+// mapSet is like `(*m)[key] = value` but also initializes *m if it is nil.
+func mapSet[Map ~map[K]V, K comparable, V any](m *Map, key K, value V) {
+	if *m == nil {
+		*m = make(map[K]V)
+	}
+	(*m)[key] = value
+}