diff --git a/metropolis/node/kubernetes/BUILD.bazel b/metropolis/node/kubernetes/BUILD.bazel
index 8b8c1c5..d2c3065 100644
--- a/metropolis/node/kubernetes/BUILD.bazel
+++ b/metropolis/node/kubernetes/BUILD.bazel
@@ -50,9 +50,13 @@
         "@io_k8s_api//core/v1:core",
         "@io_k8s_api//storage/v1:storage",
         "@io_k8s_apimachinery//pkg/api/errors",
+        "@io_k8s_apimachinery//pkg/api/resource",
         "@io_k8s_apimachinery//pkg/apis/meta/v1:meta",
+        "@io_k8s_apimachinery//pkg/apis/meta/v1/unstructured",
         "@io_k8s_apimachinery//pkg/runtime",
         "@io_k8s_apimachinery//pkg/runtime/schema",
+        "@io_k8s_apimachinery//pkg/types",
+        "@io_k8s_apimachinery//pkg/util/strategicpatch",
         "@io_k8s_apiserver//pkg/apis/apiserver",
         "@io_k8s_client_go//applyconfigurations/core/v1:core",
         "@io_k8s_client_go//informers",
diff --git a/metropolis/node/kubernetes/csi.go b/metropolis/node/kubernetes/csi.go
index fad0183..be9e3f5 100644
--- a/metropolis/node/kubernetes/csi.go
+++ b/metropolis/node/kubernetes/csi.go
@@ -188,28 +188,22 @@
 }
 
 func (s *csiPluginServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
-	if req.CapacityRange.LimitBytes <= 0 {
-		return nil, status.Error(codes.InvalidArgument, "invalid expanded volume size: at or below zero bytes")
+	info, err := os.Stat(req.VolumePath)
+	if err != nil {
+		return nil, status.Errorf(codes.Unavailable, "failed to stat volume: %v", err)
 	}
-	loopdev, err := loop.Open(req.VolumePath)
-	if err == nil {
-		defer loopdev.Close()
-		volumePath := filepath.Join(s.VolumesDirectory.FullPath(), req.VolumeId)
-		imageFile, err := os.OpenFile(volumePath, os.O_RDWR, 0)
+	if info.IsDir() {
+		// Mount volume. Its quota is resized by the provisioner, so there is nothing to do here.
+	} else {
+		// Block volume. The provisioner resizes the backing file; only the loop device has to pick up the new size.
+		loopdev, err := loop.Open(req.VolumePath)
 		if err != nil {
-			return nil, status.Errorf(codes.Unavailable, "failed to open block volume backing file: %v", err)
+			return nil, status.Errorf(codes.Unavailable, "failed to open loop device: %v", err)
 		}
-		defer imageFile.Close()
-		if err := unix.Fallocate(int(imageFile.Fd()), 0, 0, req.CapacityRange.LimitBytes); err != nil {
-			return nil, status.Errorf(codes.Unavailable, "failed to expand volume using fallocate: %v", err)
-		}
+		defer loopdev.Close()
 		if err := loopdev.RefreshSize(); err != nil {
 			return nil, status.Errorf(codes.Unavailable, "failed to refresh loop device size: %v", err)
 		}
-		return &csi.NodeExpandVolumeResponse{CapacityBytes: req.CapacityRange.LimitBytes}, nil
-	}
-	if err := fsquota.SetQuota(req.VolumePath, uint64(req.CapacityRange.LimitBytes), uint64(req.CapacityRange.LimitBytes)/inodeCapacityRatio); err != nil {
-		return nil, status.Errorf(codes.Unavailable, "failed to update quota: %v", err)
 	}
 	return &csi.NodeExpandVolumeResponse{CapacityBytes: req.CapacityRange.LimitBytes}, nil
 }
diff --git a/metropolis/node/kubernetes/provisioner.go b/metropolis/node/kubernetes/provisioner.go
index 92caa59..cd2ed45 100644
--- a/metropolis/node/kubernetes/provisioner.go
+++ b/metropolis/node/kubernetes/provisioner.go
@@ -5,16 +5,24 @@
 
 import (
 	"context"
+	"encoding/json"
 	"errors"
 	"fmt"
+	"math"
 	"os"
 	"path/filepath"
+	"sync"
+	"time"
 
 	"golang.org/x/sys/unix"
 	v1 "k8s.io/api/core/v1"
 	storagev1 "k8s.io/api/storage/v1"
 	apierrs "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/strategicpatch"
 	"k8s.io/client-go/informers"
 	coreinformers "k8s.io/client-go/informers/core/v1"
 	storageinformers "k8s.io/client-go/informers/storage/v1"
@@ -25,6 +33,7 @@
 	"k8s.io/client-go/tools/record"
 	ref "k8s.io/client-go/tools/reference"
 	"k8s.io/client-go/util/workqueue"
+	"k8s.io/utils/ptr"
 
 	"source.monogon.dev/go/logging"
 	"source.monogon.dev/metropolis/node/core/localstorage"
@@ -53,13 +62,20 @@
 	InformerFactory  informers.SharedInformerFactory
 	VolumesDirectory *localstorage.DataVolumesDirectory
 
-	claimQueue           workqueue.TypedRateLimitingInterface[string]
+	claimQueue           workqueue.TypedDelayingInterface[string]
+	claimRateLimiter     workqueue.TypedRateLimiter[string]
+	claimNextTry         map[string]time.Time
 	pvQueue              workqueue.TypedRateLimitingInterface[string]
 	recorder             record.EventRecorder
 	pvcInformer          coreinformers.PersistentVolumeClaimInformer
 	pvInformer           coreinformers.PersistentVolumeInformer
 	storageClassInformer storageinformers.StorageClassInformer
-	logger               logging.Leveled
+	pvcMutationCache     cache.MutationCache
+	pvMutationCache      cache.MutationCache
+	// processMutex ensures that the two workers (one for PVCs and one for PVs)
+	// are not doing work concurrently.
+	processMutex sync.Mutex
+	logger       logging.Leveled
 }
 
 // runCSIProvisioner runs the main provisioning machinery. It consists of a
@@ -76,11 +92,15 @@
 	eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: p.Kubernetes.CoreV1().Events("")})
 	p.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: csiProvisionerServerName, Host: p.NodeName})
 
-	p.pvInformer = p.InformerFactory.Core().V1().PersistentVolumes()
 	p.pvcInformer = p.InformerFactory.Core().V1().PersistentVolumeClaims()
+	p.pvInformer = p.InformerFactory.Core().V1().PersistentVolumes()
 	p.storageClassInformer = p.InformerFactory.Storage().V1().StorageClasses()
+	p.pvcMutationCache = cache.NewIntegerResourceVersionMutationCache(p.pvcInformer.Informer().GetStore(), nil, time.Minute, false)
+	p.pvMutationCache = cache.NewIntegerResourceVersionMutationCache(p.pvInformer.Informer().GetStore(), nil, time.Minute, false)
 
-	p.claimQueue = workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]())
+	p.claimQueue = workqueue.NewTypedDelayingQueue[string]()
+	p.claimRateLimiter = workqueue.NewTypedItemExponentialFailureRateLimiter[string](time.Second, 5*time.Minute)
+	p.claimNextTry = make(map[string]time.Time)
 	p.pvQueue = workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]())
 
 	p.logger = supervisor.Logger(ctx)
@@ -93,6 +113,10 @@
 		UpdateFunc: func(old, new interface{}) {
 			p.enqueueClaim(new)
 		},
+		// We need to handle deletes to ensure that deleted keys are removed from
+		// the rate limiter, because there are cases where we leave a key in the
+		// rate limiter without scheduling a retry.
+		DeleteFunc: p.enqueueClaim,
 	})
 	p.pvInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: p.enqueuePV,
@@ -113,11 +137,11 @@
 	go p.storageClassInformer.Informer().Run(ctx.Done())
 
 	// These will self-terminate once the queues are shut down
-	go p.processQueueItems(p.claimQueue, func(key string) error {
-		return p.processPVC(key)
+	go p.processQueueItems(p.claimQueue, func(key string) {
+		p.processPVCRetryWrapper(ctx, key)
 	})
-	go p.processQueueItems(p.pvQueue, func(key string) error {
-		return p.processPV(key)
+	go p.processQueueItems(p.pvQueue, func(key string) {
+		p.processPVRetryWrapper(ctx, key)
 	})
 
 	supervisor.Signal(ctx, supervisor.SignalHealthy)
@@ -142,7 +166,7 @@
 // isOurPV checks if the given PV has been provisioned by this provisioner and
 // has been scheduled onto this node
 func (p *csiProvisionerServer) isOurPV(pv *v1.PersistentVolume) bool {
-	if pv.ObjectMeta.Annotations["pv.kubernetes.io/provisioned-by"] != csiProvisionerServerName {
+	if pv.Spec.CSI == nil || pv.Spec.CSI.Driver != csiProvisionerServerName {
 		return false
 	}
 	if pv.Spec.NodeAffinity.Required.NodeSelectorTerms[0].MatchExpressions[0].Values[0] != p.NodeName {
@@ -153,7 +177,7 @@
 
 // enqueueClaim adds an added/changed PVC to the work queue
 func (p *csiProvisionerServer) enqueueClaim(obj interface{}) {
-	key, err := cache.MetaNamespaceKeyFunc(obj)
+	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
 	if err != nil {
 		p.logger.Errorf("Not queuing PVC because key could not be derived: %v", err)
 		return
@@ -173,7 +197,7 @@
 
 // processQueueItems gets items from the given work queue and calls the process
 // function for each of them. It self- terminates once the queue is shut down.
-func (p *csiProvisionerServer) processQueueItems(queue workqueue.TypedRateLimitingInterface[string], process func(key string) error) {
+func (p *csiProvisionerServer) processQueueItems(queue workqueue.TypedInterface[string], process func(key string)) {
 	for {
 		obj, shutdown := queue.Get()
 		if shutdown {
@@ -183,16 +207,40 @@
 		func(obj string) {
 			defer queue.Done(obj)
 
-			if err := process(obj); err != nil {
-				p.logger.Warningf("Failed processing item %q, requeueing (numrequeues: %d): %v", obj, queue.NumRequeues(obj), err)
-				queue.AddRateLimited(obj)
-			} else {
-				queue.Forget(obj)
-			}
+			p.processMutex.Lock()
+			defer p.processMutex.Unlock()
+
+			process(obj)
 		}(obj)
 	}
 }
 
+// errSkipRateLimitReset makes processPVCRetryWrapper leave the retry state of a claim as is.
+var errSkipRateLimitReset = errors.New("skip ratelimit reset")
+
+func (p *csiProvisionerServer) processPVCRetryWrapper(ctx context.Context, key string) {
+	switch err := p.processPVC(ctx, key); {
+	case errors.Is(err, errSkipRateLimitReset): // neither count a failure nor reset the backoff
+	case err != nil:
+		p.logger.Warningf("Failed processing PVC %s, requeueing (numrequeues: %d): %v", key, p.claimRateLimiter.NumRequeues(key), err)
+		backoff := p.claimRateLimiter.When(key)
+		p.claimNextTry[key] = time.Now().Add(backoff)
+		p.claimQueue.AddAfter(key, backoff)
+	default:
+		p.claimRateLimiter.Forget(key)
+		delete(p.claimNextTry, key)
+	}
+}
+
+func (p *csiProvisionerServer) processPVRetryWrapper(ctx context.Context, key string) {
+	if err := p.processPV(ctx, key); err == nil {
+		p.pvQueue.Forget(key) // success: clear the retry history of this key
+	} else {
+		p.logger.Warningf("Failed processing PV %s, requeueing (numrequeues: %d): %v", key, p.pvQueue.NumRequeues(key), err)
+		p.pvQueue.AddRateLimited(key)
+	}
+}
+
 // volumePath gets the path where the volume is stored.
 func (p *csiProvisionerServer) volumePath(volumeID string) string {
 	return filepath.Join(p.VolumesDirectory.FullPath(), volumeID)
@@ -200,75 +248,112 @@
 
 // processPVC looks at a single PVC item from the queue, determines if it needs
 // to be provisioned and logs the provisioning result to the recorder
-func (p *csiProvisionerServer) processPVC(key string) error {
-	namespace, name, err := cache.SplitMetaNamespaceKey(key)
+func (p *csiProvisionerServer) processPVC(ctx context.Context, key string) error {
+	val, exists, err := p.pvcMutationCache.GetByKey(key)
 	if err != nil {
-		return fmt.Errorf("invalid resource key: %s", key)
-	}
-	pvc, err := p.pvcInformer.Lister().PersistentVolumeClaims(namespace).Get(name)
-	if apierrs.IsNotFound(err) {
-		return nil // nothing to do, no error
-	} else if err != nil {
 		return fmt.Errorf("failed to get PVC for processing: %w", err)
 	}
+	if !exists {
+		return nil // nothing to do, no error
+	}
+	pvc, ok := val.(*v1.PersistentVolumeClaim)
+	if !ok {
+		return fmt.Errorf("value in MutationCache is not a PVC: %+v", val)
+	}
 
 	if !p.isOurPVC(pvc) {
 		return nil
 	}
 
-	if pvc.Status.Phase != "Pending" {
-		// If the PVC is not pending, we don't need to provision anything
-		return nil
+	if pvc.Spec.VolumeName == "" {
+		// The claim has no volume assigned yet, so we may need to provision one.
+		storageClass, err := p.storageClassInformer.Lister().Get(*pvc.Spec.StorageClassName)
+		if err != nil {
+			return fmt.Errorf("could not get storage class: %w", err)
+		}
+
+		if storageClass.Provisioner != csiProvisionerServerName {
+			// We're not responsible for this PVC. Can only happen if
+			// controller-manager makes a mistake setting the annotations, but
+			// we're bailing here anyways for safety.
+			return nil
+		}
+
+		err = p.provisionPVC(ctx, pvc, storageClass)
+
+		if err != nil {
+			p.recorder.Eventf(pvc, v1.EventTypeWarning, "ProvisioningFailed", "Failed to provision PV: %v", err)
+			return err
+		}
+	} else if pvc.Status.Phase == v1.ClaimBound {
+		// The claim is bound, so we may need to resize it.
+		requestSize := pvc.Spec.Resources.Requests[v1.ResourceStorage]
+		statusSize := pvc.Status.Capacity[v1.ResourceStorage]
+		if requestSize.Cmp(statusSize) <= 0 {
+			// No resize needed.
+			return nil
+		}
+
+		val, exists, err := p.pvMutationCache.GetByKey(pvc.Spec.VolumeName)
+		if err != nil {
+			return fmt.Errorf("failed to get PV of PVC %s: %w", key, err)
+		}
+		if !exists {
+			return nil // PV is not in the cache; processPV re-enqueues the claim once it sees the bound PV
+		}
+		pv, ok := val.(*v1.PersistentVolume)
+		if !ok {
+			return fmt.Errorf("value in MutationCache is not a PV: %+v", val)
+		}
+		if pv.Status.Phase != v1.VolumeBound || pv.Spec.ClaimRef == nil || pv.Spec.ClaimRef.UID != pvc.UID {
+			return nil // the PV is not bound to this exact claim
+		}
+		if !p.isOurPV(pv) {
+			return nil
+		}
+
+		err = p.processResize(ctx, pvc, pv)
+		if errors.Is(err, errSkipRateLimitReset) {
+			return err // the sentinel is passed through unwrapped and without recording an event
+		} else if err != nil {
+			p.recorder.Eventf(pvc, v1.EventTypeWarning, "VolumeResizeFailed", "Failed to resize PV: %v", err)
+			return fmt.Errorf("failed to process resize of PVC %s: %w", key, err)
+		}
 	}
-
-	storageClass, err := p.storageClassInformer.Lister().Get(*pvc.Spec.StorageClassName)
-	if err != nil {
-		return fmt.Errorf("could not get storage class: %w", err)
-	}
-
-	if storageClass.Provisioner != csiProvisionerServerName {
-		// We're not responsible for this PVC. Can only happen if
-		// controller-manager makes a mistake setting the annotations, but
-		// we're bailing here anyways for safety.
-		return nil
-	}
-
-	err = p.provisionPVC(pvc, storageClass)
-
-	if err != nil {
-		p.recorder.Eventf(pvc, v1.EventTypeWarning, "ProvisioningFailed", "Failed to provision PV: %v", err)
-		return err
-	}
-	p.recorder.Eventf(pvc, v1.EventTypeNormal, "Provisioned", "Successfully provisioned PV")
-
 	return nil
 }
 
 // provisionPVC creates the directory where the volume lives, sets a quota for
 // the requested amount of storage and creates the PV object representing this
 // new volume
-func (p *csiProvisionerServer) provisionPVC(pvc *v1.PersistentVolumeClaim, storageClass *storagev1.StorageClass) error {
+func (p *csiProvisionerServer) provisionPVC(ctx context.Context, pvc *v1.PersistentVolumeClaim, storageClass *storagev1.StorageClass) error {
+	key := cache.MetaObjectToName(pvc).String()
 	claimRef, err := ref.GetReference(scheme.Scheme, pvc)
 	if err != nil {
 		return fmt.Errorf("failed to get reference to PVC: %w", err)
 	}
 
 	storageReq := pvc.Spec.Resources.Requests[v1.ResourceStorage]
-	if storageReq.IsZero() {
-		return fmt.Errorf("PVC is not requesting any storage, this is not supported")
+	capacity, err := quantityToBytes(storageReq)
+	if err != nil {
+		return err
 	}
-	capacity, ok := storageReq.AsInt64()
-	if !ok {
-		return fmt.Errorf("PVC requesting more than 2^63 bytes of storage, this is not supported")
-	}
+	newSize := *resource.NewQuantity(capacity, resource.BinarySI)
 
 	volumeID := "pvc-" + string(pvc.ObjectMeta.UID)
+	if _, err := p.pvInformer.Lister().Get(volumeID); err == nil {
+		return nil // Volume already exists.
+	}
 	volumePath := p.volumePath(volumeID)
+	volumeMode := ptr.Deref(pvc.Spec.VolumeMode, "")
+	if volumeMode == "" {
+		volumeMode = v1.PersistentVolumeFilesystem
+	}
 
-	p.logger.Infof("Creating local PV %s", volumeID)
+	p.logger.Infof("Creating persistent volume %s with mode %s and size %s for claim %s", volumeID, volumeMode, newSize.String(), key)
 
-	switch *pvc.Spec.VolumeMode {
-	case "", v1.PersistentVolumeFilesystem:
+	switch volumeMode {
+	case v1.PersistentVolumeFilesystem:
 		if err := os.Mkdir(volumePath, 0644); err != nil && !os.IsExist(err) {
 			return fmt.Errorf("failed to create volume directory: %w", err)
 		}
@@ -288,23 +373,24 @@
 			return fmt.Errorf("failed to create volume image: %w", err)
 		}
 		defer imageFile.Close()
-		if err := unix.Fallocate(int(imageFile.Fd()), 0, 0, capacity); err != nil {
-			return fmt.Errorf("failed to fallocate() volume image: %w", err)
+		if err := allocateBlockVolume(imageFile, capacity); err != nil {
+			return fmt.Errorf("failed to allocate volume image: %w", err)
 		}
 	default:
-		return fmt.Errorf("VolumeMode \"%s\" is unsupported", *pvc.Spec.VolumeMode)
+		return fmt.Errorf("VolumeMode %q is unsupported", *pvc.Spec.VolumeMode)
 	}
 
 	vol := &v1.PersistentVolume{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: volumeID,
 			Annotations: map[string]string{
-				"pv.kubernetes.io/provisioned-by": csiProvisionerServerName},
+				"pv.kubernetes.io/provisioned-by": csiProvisionerServerName,
+			},
 		},
 		Spec: v1.PersistentVolumeSpec{
 			AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
 			Capacity: v1.ResourceList{
-				v1.ResourceStorage: storageReq, // We're always giving the exact amount
+				v1.ResourceStorage: newSize,
 			},
 			PersistentVolumeSource: v1.PersistentVolumeSource{
 				CSI: &v1.CSIPersistentVolumeSource{
@@ -334,17 +420,180 @@
 		},
 	}
 
-	_, err = p.Kubernetes.CoreV1().PersistentVolumes().Create(context.Background(), vol, metav1.CreateOptions{})
+	_, err = p.Kubernetes.CoreV1().PersistentVolumes().Create(ctx, vol, metav1.CreateOptions{})
 	if err != nil && !apierrs.IsAlreadyExists(err) {
 		return fmt.Errorf("failed to create PV object: %w", err)
 	}
 	return nil
 }
 
+// processResize performs the controller side of expanding pv for pvc: it
+// records the target size in the PVC status, grows the backing storage,
+// updates the PV capacity and finally marks the PVC as waiting for node resize.
+// See https://github.com/kubernetes-csi/external-resizer/blob/master/pkg/controller/expand_and_recover.go
+func (p *csiProvisionerServer) processResize(ctx context.Context, pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) error {
+	key := cache.MetaObjectToName(pvc).String()
+	requestSize := pvc.Spec.Resources.Requests[v1.ResourceStorage]
+	allocatedSize, hasAllocatedSize := pvc.Status.AllocatedResources[v1.ResourceStorage]
+	pvSize := pv.Spec.Capacity[v1.ResourceStorage]
+	resizeStatus := pvc.Status.AllocatedResourceStatuses[v1.ResourceStorage]
+
+	newSize := requestSize
+	if hasAllocatedSize {
+		// Usually, we want to keep resizing to the same target size once we have
+		// picked one and ignore changes in request size.
+		newSize = allocatedSize
+	}
+	switch resizeStatus {
+	case v1.PersistentVolumeClaimNodeResizePending,
+		v1.PersistentVolumeClaimNodeResizeInProgress:
+		// We are waiting for node resize. The PV should be large enough at this
+		// point, which means we don't need to do anything here.
+		if pvSize.Cmp(newSize) >= 0 || pvSize.Cmp(requestSize) >= 0 {
+			// We don't need to do anything and don't need to schedule a retry, but we
+			// still don't want to reset the rate limiter in case the node resize
+			// fails repeatedly.
+			return errSkipRateLimitReset
+		}
+	case "", v1.PersistentVolumeClaimControllerResizeInfeasible:
+		// In this case, there is no ongoing or partially complete resize operation,
+		// and we can be sure that the actually allocated size is equal to pvSize.
+		// That means it's safe to pick a new target size.
+		if pvSize.Cmp(requestSize) < 0 {
+			newSize = requestSize
+		}
+	}
+	capacity, err := quantityToBytes(newSize)
+	if err != nil {
+		return err
+	}
+
+	keepConditions := false
+	if hasAllocatedSize && allocatedSize.Cmp(newSize) == 0 {
+		now := time.Now()
+		if p.claimNextTry[key].After(now) {
+			// Not enough time has passed since the last attempt and the target size
+			// is still the same.
+			p.claimQueue.AddAfter(key, p.claimNextTry[key].Sub(now))
+			return errSkipRateLimitReset
+		}
+		keepConditions = true
+	}
+
+	newPVC := pvc.DeepCopy()
+	mapSet(&newPVC.Status.AllocatedResources, v1.ResourceStorage, newSize)
+	mapSet(&newPVC.Status.AllocatedResourceStatuses, v1.ResourceStorage, v1.PersistentVolumeClaimControllerResizeInProgress)
+	conditions := []v1.PersistentVolumeClaimCondition{{
+		Type:               v1.PersistentVolumeClaimResizing,
+		Status:             v1.ConditionTrue,
+		LastTransitionTime: metav1.Now(),
+	}}
+	mergeResizeConditionOnPVC(newPVC, conditions, keepConditions)
+	pvc, err = p.patchPVCStatus(ctx, pvc, newPVC)
+	if err != nil {
+		return fmt.Errorf("failed to update PVC before resizing: %w", err)
+	}
+
+	expandedSize := *resource.NewQuantity(capacity, resource.BinarySI)
+	p.logger.Infof("Resizing persistent volume %s to new size %s", pv.Spec.CSI.VolumeHandle, expandedSize.String())
+	if expandErr := p.controllerExpandVolume(pv, capacity); expandErr != nil {
+		// If the resize fails because the requested size is too large, then set
+		// status to infeasible, which allows the user to change the request to a
+		// smaller size.
+		isInfeasible := errors.Is(expandErr, unix.ENOSPC) || errors.Is(expandErr, unix.EDQUOT) || errors.Is(expandErr, unix.EFBIG) || errors.Is(expandErr, unix.EINVAL)
+		newPVC = pvc.DeepCopy()
+		if isInfeasible {
+			mapSet(&newPVC.Status.AllocatedResourceStatuses, v1.ResourceStorage, v1.PersistentVolumeClaimControllerResizeInfeasible)
+		}
+		conditions = []v1.PersistentVolumeClaimCondition{{
+			Type:               v1.PersistentVolumeClaimControllerResizeError,
+			Status:             v1.ConditionTrue,
+			LastTransitionTime: metav1.Now(),
+			Message:            fmt.Sprintf("Failed to expand PV: %v", expandErr),
+		}}
+		mergeResizeConditionOnPVC(newPVC, conditions, true)
+		if _, patchErr := p.patchPVCStatus(ctx, pvc, newPVC); patchErr != nil {
+			return fmt.Errorf("failed to update PVC after resizing: %w", patchErr)
+		}
+		return fmt.Errorf("failed to expand PV: %w", expandErr)
+	}
+
+	newPV := pv.DeepCopy()
+	mapSet(&newPV.Spec.Capacity, v1.ResourceStorage, expandedSize) // nil-safe: a PV without capacity has a nil map
+	pv, err = patchPV(ctx, p.Kubernetes, pv, newPV)
+	if err != nil {
+		return fmt.Errorf("failed to update PV with new capacity: %w", err)
+	}
+	p.pvMutationCache.Mutation(pv)
+
+	newPVC = pvc.DeepCopy()
+	mapSet(&newPVC.Status.AllocatedResourceStatuses, v1.ResourceStorage, v1.PersistentVolumeClaimNodeResizePending)
+	conditions = []v1.PersistentVolumeClaimCondition{{
+		Type:               v1.PersistentVolumeClaimFileSystemResizePending,
+		Status:             v1.ConditionTrue,
+		LastTransitionTime: metav1.Now(),
+	}}
+	mergeResizeConditionOnPVC(newPVC, conditions, true)
+	if _, err := p.patchPVCStatus(ctx, pvc, newPVC); err != nil {
+		return fmt.Errorf("failed to update PVC after resizing: %w", err)
+	}
+
+	return nil
+}
+
+// controllerExpandVolume grows the local backing storage of pv to capacity
+// bytes: the quota of a filesystem volume, or the image file of a block volume.
+func (p *csiProvisionerServer) controllerExpandVolume(pv *v1.PersistentVolume, capacity int64) error {
+	path := p.volumePath(pv.Spec.CSI.VolumeHandle)
+	switch mode := ptr.Deref(pv.Spec.VolumeMode, ""); mode {
+	case "", v1.PersistentVolumeFilesystem:
+		if err := fsquota.SetQuota(path, uint64(capacity), uint64(capacity)/inodeCapacityRatio); err != nil {
+			return fmt.Errorf("failed to update quota: %w", err)
+		}
+	case v1.PersistentVolumeBlock:
+		image, err := os.OpenFile(path, os.O_RDWR, 0)
+		if err != nil {
+			return fmt.Errorf("failed to open block volume backing file: %w", err)
+		}
+		defer image.Close()
+		if err := allocateBlockVolume(image, capacity); err != nil {
+			return fmt.Errorf("failed to allocate space: %w", err)
+		}
+	default:
+		return fmt.Errorf("VolumeMode %q is unsupported", mode)
+	}
+	return nil
+}
+
+// allocateBlockVolume fallocates the first capacity bytes of imageFile and trims stray space past EOF.
+func allocateBlockVolume(imageFile *os.File, capacity int64) error {
+	// On XFS, fallocate is not atomic: It allocates space in steps of around
+	// 8 GB, and does not check upfront if there is enough space to satisfy the
+	// entire allocation. As the last step, if allocation succeeded, it updates
+	// the file size. This means that fallocate can fail and leave the file size
+	// unchanged, but still allocate part of the requested capacity past EOF.
+	//
+	// To clean this up, we truncate the file to its current size, which leaves
+	// the size unchanged but removes allocated space past EOF. We also do this if
+	// fallocate succeeds, in case a previous allocation has left space past EOF
+	// and was not cleaned up.
+	allocErr := unix.Fallocate(int(imageFile.Fd()), 0, 0, capacity)
+	if allocErr != nil {
+		allocErr = fmt.Errorf("fallocate: %w", allocErr)
+	}
+	info, err := imageFile.Stat()
+	if err != nil {
+		return errors.Join(allocErr, err) // keep the fallocate errno visible to errors.Is
+	}
+	if err := imageFile.Truncate(info.Size()); err != nil {
+		return errors.Join(allocErr, err) // keep the fallocate errno visible to errors.Is
+	}
+	return allocErr
+}
+
 // processPV looks at a single PV item from the queue and checks if it has been
 // released and needs to be deleted. If yes it deletes the associated quota,
 // directory and the PV object and logs the result to the recorder.
-func (p *csiProvisionerServer) processPV(key string) error {
+func (p *csiProvisionerServer) processPV(ctx context.Context, key string) error {
 	_, name, err := cache.SplitMetaNamespaceKey(key)
 	if err != nil {
 		return fmt.Errorf("invalid resource key: %s", key)
@@ -359,22 +608,33 @@
 	if !p.isOurPV(pv) {
 		return nil
 	}
-	if pv.Spec.PersistentVolumeReclaimPolicy != v1.PersistentVolumeReclaimDelete || pv.Status.Phase != "Released" {
+	if pv.Status.Phase == v1.VolumeBound && pv.Spec.ClaimRef != nil {
+		// Resize processing depends on both the PV and the claim. Instead of
+		// directly retrieving the claim here and calling processResize, we add it
+		// to the claimQueue. This ensures that all resize retries are handled by
+		// the claimQueue.
+		claimKey := cache.NewObjectName(pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name).String()
+		p.claimQueue.Add(claimKey)
+	}
+	if pv.ObjectMeta.DeletionTimestamp != nil {
+		return nil
+	}
+	if pv.Spec.PersistentVolumeReclaimPolicy != v1.PersistentVolumeReclaimDelete || pv.Status.Phase != v1.VolumeReleased {
 		return nil
 	}
 	volumePath := p.volumePath(pv.Spec.CSI.VolumeHandle)
 
 	// Log deletes for auditing purposes
 	p.logger.Infof("Deleting persistent volume %s", pv.Spec.CSI.VolumeHandle)
-	switch *pv.Spec.VolumeMode {
+	switch ptr.Deref(pv.Spec.VolumeMode, "") {
 	case "", v1.PersistentVolumeFilesystem:
-		if err := fsquota.SetQuota(volumePath, 0, 0); err != nil {
+		if err := fsquota.SetQuota(volumePath, 0, 0); err != nil && !os.IsNotExist(err) {
 			// We record these here manually since a successful deletion
 			// removes the PV we'd be attaching them to.
 			p.recorder.Eventf(pv, v1.EventTypeWarning, "DeprovisioningFailed", "Failed to remove quota: %v", err)
 			return fmt.Errorf("failed to remove quota: %w", err)
 		}
-		if err := os.RemoveAll(volumePath); err != nil && !os.IsNotExist(err) {
+		if err := os.RemoveAll(volumePath); err != nil {
 			p.recorder.Eventf(pv, v1.EventTypeWarning, "DeprovisioningFailed", "Failed to delete volume: %v", err)
 			return fmt.Errorf("failed to delete volume: %w", err)
 		}
@@ -388,10 +648,168 @@
 		return fmt.Errorf("invalid volume mode \"%v\"", *pv.Spec.VolumeMode)
 	}
 
-	err = p.Kubernetes.CoreV1().PersistentVolumes().Delete(context.Background(), pv.Name, metav1.DeleteOptions{})
+	err = p.Kubernetes.CoreV1().PersistentVolumes().Delete(ctx, pv.Name, metav1.DeleteOptions{})
 	if err != nil && !apierrs.IsNotFound(err) {
 		p.recorder.Eventf(pv, v1.EventTypeWarning, "DeprovisioningFailed", "Failed to delete PV object from K8s API: %v", err)
 		return fmt.Errorf("failed to delete PV object: %w", err)
 	}
 	return nil
 }
+
+// quantityToBytes converts size to a whole number of bytes, rounding up. It
+// returns an error unless the result is positive and below math.MaxInt64.
+// Based on Kubernetes staging/src/k8s.io/cloud-provider/volume/helpers/rounding.go
+func quantityToBytes(size resource.Quantity) (int64, error) {
+	if size.CmpInt64(math.MaxInt64) >= 0 {
+		return 0, fmt.Errorf("quantity %s is too big, overflows int64", size.String())
+	}
+	if n := size.Value(); n > 0 {
+		return n, nil
+	}
+	return 0, fmt.Errorf("invalid quantity %s, must be positive", size.String())
+}
+
+// patchPVCStatus, createPVCPatch, addResourceVersion, patchPV,
+// mergeResizeConditionOnPVC are taken from Kubernetes
+// pkg/volume/util/resize_util.go under Apache 2.0 and modified.
+
+// patchPVCStatus sends the difference between oldPVC and newPVC to the status
+// subresource as a strategic merge patch carrying oldPVC's resource version,
+// and records the result in the PVC mutation cache. A patch is used instead of
+// an update because an older client would drop fields it does not support. On
+// failure, oldPVC is returned together with the error.
+func (p *csiProvisionerServer) patchPVCStatus(
+	ctx context.Context, oldPVC, newPVC *v1.PersistentVolumeClaim,
+) (*v1.PersistentVolumeClaim, error) {
+	patch, err := createPVCPatch(oldPVC, newPVC, true /* addResourceVersionCheck */)
+	if err != nil {
+		return oldPVC, fmt.Errorf("failed to create PVC patch: %w", err)
+	}
+
+	claims := p.Kubernetes.CoreV1().PersistentVolumeClaims(oldPVC.Namespace)
+	patched, err := claims.Patch(ctx, oldPVC.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{}, "status")
+	if err != nil {
+		return oldPVC, fmt.Errorf("failed to patch PVC object: %w", err)
+	}
+	p.pvcMutationCache.Mutation(patched)
+	return patched, nil
+}
+
+// createPVCPatch computes the two-way strategic merge patch that turns oldPVC
+// into newPVC and returns it as JSON. If addResourceVersionCheck is set, the
+// resource version of oldPVC is added to the patch as well. Any marshalling or
+// diffing failure is returned as a wrapped error.
+func createPVCPatch(oldPVC, newPVC *v1.PersistentVolumeClaim, addResourceVersionCheck bool) ([]byte, error) {
+	before, err := json.Marshal(oldPVC)
+	if err != nil {
+		return nil, fmt.Errorf("failed to marshal old data: %w", err)
+	}
+	after, err := json.Marshal(newPVC)
+	if err != nil {
+		return nil, fmt.Errorf("failed to marshal new data: %w", err)
+	}
+	patch, err := strategicpatch.CreateTwoWayMergePatch(before, after, oldPVC)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create 2 way merge patch: %w", err)
+	}
+
+	if !addResourceVersionCheck {
+		return patch, nil
+	}
+	patch, err = addResourceVersion(patch, oldPVC.ResourceVersion)
+	if err != nil {
+		return nil, fmt.Errorf("failed to add resource version: %w", err)
+	}
+	return patch, nil
+}
+
+// addResourceVersion decodes the JSON patch, sets resourceVersion on it and re-encodes it.
+func addResourceVersion(patchBytes []byte, resourceVersion string) ([]byte, error) {
+	var obj map[string]interface{}
+	if err := json.Unmarshal(patchBytes, &obj); err != nil {
+		return nil, fmt.Errorf("error unmarshalling patch: %w", err)
+	}
+	wrapper := unstructured.Unstructured{Object: obj}
+	wrapper.SetResourceVersion(resourceVersion)
+	out, err := json.Marshal(obj)
+	if err != nil {
+		return nil, fmt.Errorf("error marshalling json patch: %w", err)
+	}
+	return out, nil
+}
+
+// patchPV sends the difference between oldPV and newPV to the API server as a
+// strategic merge patch and returns the patched PV. On failure, oldPV is
+// returned together with the error.
+func patchPV(
+	ctx context.Context, kubeClient kubernetes.Interface, oldPV, newPV *v1.PersistentVolume,
+) (*v1.PersistentVolume, error) {
+	before, err := json.Marshal(oldPV)
+	if err != nil {
+		return oldPV, fmt.Errorf("failed to marshal old data: %w", err)
+	}
+	after, err := json.Marshal(newPV)
+	if err != nil {
+		return oldPV, fmt.Errorf("failed to marshal new data: %w", err)
+	}
+
+	patch, err := strategicpatch.CreateTwoWayMergePatch(before, after, oldPV)
+	if err != nil {
+		return oldPV, fmt.Errorf("failed to create 2 way merge patch: %w", err)
+	}
+
+	volumes := kubeClient.CoreV1().PersistentVolumes()
+	patched, err := volumes.Patch(ctx, oldPV.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{})
+	if err != nil {
+		return oldPV, fmt.Errorf("failed to patch PV object: %w", err)
+	}
+	return patched, nil
+}
+
+// knownResizeConditions is the set of PVC condition types that
+// mergeResizeConditionOnPVC treats as resize conditions.
+var knownResizeConditions = map[v1.PersistentVolumeClaimConditionType]bool{
+	v1.PersistentVolumeClaimFileSystemResizePending: true,
+	v1.PersistentVolumeClaimResizing:                true,
+	v1.PersistentVolumeClaimControllerResizeError:   true,
+	v1.PersistentVolumeClaimNodeResizeError:         true,
+}
+
+// mergeResizeConditionOnPVC replaces the resize conditions of pvc with
+// resizeConditions, leaving conditions of other types untouched. An existing
+// resize condition whose type is not in resizeConditions is kept only if
+// keepOldResizeConditions is set; new types are appended at the end.
+func mergeResizeConditionOnPVC(
+	pvc *v1.PersistentVolumeClaim, resizeConditions []v1.PersistentVolumeClaimCondition, keepOldResizeConditions bool,
+) {
+	pending := make(map[v1.PersistentVolumeClaimConditionType]v1.PersistentVolumeClaimCondition, len(resizeConditions))
+	for _, c := range resizeConditions {
+		pending[c.Type] = c
+	}
+
+	var merged []v1.PersistentVolumeClaimCondition
+	for _, old := range pvc.Status.Conditions {
+		replacement, replaced := pending[old.Type]
+		switch {
+		case !knownResizeConditions[old.Type]:
+			merged = append(merged, old)
+		case replaced:
+			merged = append(merged, replacement)
+			delete(pending, old.Type)
+		case keepOldResizeConditions:
+			merged = append(merged, old)
+		}
+	}
+	for _, c := range pending {
+		merged = append(merged, c)
+	}
+	pvc.Status.Conditions = merged
+}
+
+// mapSet assigns value to key in *m, allocating the map first if *m is nil.
+func mapSet[Map ~map[K]V, K comparable, V any](m *Map, key K, value V) {
+	if *m == nil {
+		*m = Map{}
+	}
+	(*m)[key] = value
+}
diff --git a/metropolis/node/kubernetes/reconciler/resources_rbac.go b/metropolis/node/kubernetes/reconciler/resources_rbac.go
index 9ce9942..bf9885c 100644
--- a/metropolis/node/kubernetes/reconciler/resources_rbac.go
+++ b/metropolis/node/kubernetes/reconciler/resources_rbac.go
@@ -73,8 +73,18 @@
 				},
 				{
 					APIGroups: []string{""},
-					Resources: []string{"persistentvolumes", "persistentvolumeclaims"},
-					Verbs:     []string{"*"},
+					Resources: []string{"persistentvolumes"},
+					Verbs:     []string{"get", "list", "watch", "create", "patch", "delete"},
+				},
+				{
+					APIGroups: []string{""},
+					Resources: []string{"persistentvolumeclaims"},
+					Verbs:     []string{"get", "list", "watch"},
+				},
+				{
+					APIGroups: []string{""},
+					Resources: []string{"persistentvolumeclaims/status"},
+					Verbs:     []string{"patch"},
 				},
 			},
 		},
