// Copyright The Monogon Project Authors.
// SPDX-License-Identifier: Apache-2.0

package kubernetes

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"math"
	"os"
	"path/filepath"
	"sync"
	"time"

	"golang.org/x/sys/unix"
	v1 "k8s.io/api/core/v1"
	storagev1 "k8s.io/api/storage/v1"
	apierrs "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
	"k8s.io/client-go/informers"
	coreinformers "k8s.io/client-go/informers/core/v1"
	storageinformers "k8s.io/client-go/informers/storage/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	ref "k8s.io/client-go/tools/reference"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/utils/ptr"

	"source.monogon.dev/go/logging"
	"source.monogon.dev/metropolis/node/core/localstorage"
	"source.monogon.dev/osbase/fsquota"
	"source.monogon.dev/osbase/logtree"
	"source.monogon.dev/osbase/supervisor"
)

// inodeCapacityRatio describes the ratio between the byte capacity of a volume
// and its inode capacity. One inode on XFS is 512 bytes and by default 25%
// (1/4) of capacity can be used for metadata.
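// For example, a 1 GiB (1073741824 byte) volume gets an inode quota of
// 1073741824 / 2048 = 524288 inodes.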
48const inodeCapacityRatio = 4 * 512
49
Serge Bazanski216fe7b2021-05-21 18:36:16 +020050// ONCHANGE(//metropolis/node/kubernetes/reconciler:resources_csi.go): needs to
51// match csiProvisionerServerName declared.
Serge Bazanski662b5b32020-12-21 13:49:00 +010052const csiProvisionerServerName = "dev.monogon.metropolis.vfs"
Lorenz Brunb15abad2020-04-16 11:17:12 +020053
// csiProvisionerServer is responsible for the provisioning and deprovisioning
// of CSI-based container volumes. It runs on all nodes and watches PVCs for
// ones assigned to the node it's running on and fulfills the provisioning
// request by creating a directory, applying a quota and creating the
// corresponding PV. When the PV is released and its retention policy is
// Delete, the directory and the PV resource are deleted.
type csiProvisionerServer struct {
	NodeName         string
	Kubernetes       kubernetes.Interface
	InformerFactory  informers.SharedInformerFactory
	VolumesDirectory *localstorage.DataVolumesDirectory

	claimQueue           workqueue.TypedDelayingInterface[string]
	claimRateLimiter     workqueue.TypedRateLimiter[string]
	claimNextTry         map[string]time.Time
	pvQueue              workqueue.TypedRateLimitingInterface[string]
	recorder             record.EventRecorder
	pvcInformer          coreinformers.PersistentVolumeClaimInformer
	pvInformer           coreinformers.PersistentVolumeInformer
	storageClassInformer storageinformers.StorageClassInformer
	pvcMutationCache     cache.MutationCache
	pvMutationCache      cache.MutationCache
	// processMutex ensures that the two workers (one for PVCs and one for PVs)
	// are not doing work concurrently.
	processMutex sync.Mutex
	logger       logging.Leveled
}

// Run runs the main provisioning machinery. It consists of a set of informers
// which watch events on the Kubernetes control plane. Whenever a PVC or PV
// changes, the identifier of that resource is enqueued on a work queue. Each
// queue is processed by a single worker to limit load and avoid complicated
// locking infrastructure. Failed items are requeued.
func (p *csiProvisionerServer) Run(ctx context.Context) error {
	p.logger = supervisor.Logger(ctx)

	// The recorder is used to log Kubernetes events for successful or failed
	// volume provisions. These events then show up in `kubectl describe pvc`
	// and can be used by admins to debug issues with this provisioner.
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: p.Kubernetes.CoreV1().Events("")})
	p.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: csiProvisionerServerName, Host: p.NodeName})

	p.pvcInformer = p.InformerFactory.Core().V1().PersistentVolumeClaims()
	p.pvInformer = p.InformerFactory.Core().V1().PersistentVolumes()
	p.storageClassInformer = p.InformerFactory.Storage().V1().StorageClasses()

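	// The mutation caches wrap the informer stores so that objects this
	// provisioner has just written itself (recorded via Mutation after a
	// patch) are visible to subsequent reads before the watch catches up,
	// avoiding decisions based on stale cached state.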
	klogger := logtree.NewKlogLogger(p.logger)
	p.pvcMutationCache = cache.NewIntegerResourceVersionMutationCache(klogger, p.pvcInformer.Informer().GetStore(), nil, time.Minute, false)
	p.pvMutationCache = cache.NewIntegerResourceVersionMutationCache(klogger, p.pvInformer.Informer().GetStore(), nil, time.Minute, false)

	p.claimQueue = workqueue.NewTypedDelayingQueue[string]()
	p.claimRateLimiter = workqueue.NewTypedItemExponentialFailureRateLimiter[string](time.Second, 5*time.Minute)
	p.claimNextTry = make(map[string]time.Time)
	p.pvQueue = workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]())

	p.pvcInformer.Informer().SetWatchErrorHandler(func(_ *cache.Reflector, err error) {
		p.logger.Errorf("pvcInformer watch error: %v", err)
	})
	p.pvcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: p.enqueueClaim,
		UpdateFunc: func(old, new interface{}) {
			p.enqueueClaim(new)
		},
		// We need to handle deletes to ensure that deleted keys are removed from
		// the rate limiter, because there are cases where we leave a key in the
		// rate limiter without scheduling a retry.
		DeleteFunc: p.enqueueClaim,
	})
	p.pvInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: p.enqueuePV,
		UpdateFunc: func(old, new interface{}) {
			p.enqueuePV(new)
		},
	})
	p.pvInformer.Informer().SetWatchErrorHandler(func(_ *cache.Reflector, err error) {
		p.logger.Errorf("pvInformer watch error: %v", err)
	})

	p.storageClassInformer.Informer().SetWatchErrorHandler(func(_ *cache.Reflector, err error) {
		p.logger.Errorf("storageClassInformer watch error: %v", err)
	})

	go p.pvcInformer.Informer().Run(ctx.Done())
	go p.pvInformer.Informer().Run(ctx.Done())
	go p.storageClassInformer.Informer().Run(ctx.Done())

	// These will self-terminate once the queues are shut down
	go p.processQueueItems(p.claimQueue, func(key string) {
		p.processPVCRetryWrapper(ctx, key)
	})
	go p.processQueueItems(p.pvQueue, func(key string) {
		p.processPVRetryWrapper(ctx, key)
	})

	supervisor.Signal(ctx, supervisor.SignalHealthy)
	<-ctx.Done()
	p.claimQueue.ShutDown()
	p.pvQueue.ShutDown()
	return nil
}

// isOurPVC checks if the given PVC is to be provisioned by this provisioner
// and has been scheduled onto this node.
func (p *csiProvisionerServer) isOurPVC(pvc *v1.PersistentVolumeClaim) bool {
	if pvc.ObjectMeta.Annotations["volume.beta.kubernetes.io/storage-provisioner"] != csiProvisionerServerName {
		return false
	}
	if pvc.ObjectMeta.Annotations["volume.kubernetes.io/selected-node"] != p.NodeName {
		return false
	}
	return true
}

// isOurPV checks if the given PV has been provisioned by this provisioner and
// has been scheduled onto this node
func (p *csiProvisionerServer) isOurPV(pv *v1.PersistentVolume) bool {
	if pv.Spec.CSI == nil || pv.Spec.CSI.Driver != csiProvisionerServerName {
		return false
	}
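	// PVs created by provisionPVC always pin the volume to its node with
	// exactly one node selector term and one match expression, so indexing
	// the first entries here is safe for volumes provisioned by this driver.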
	if pv.Spec.NodeAffinity.Required.NodeSelectorTerms[0].MatchExpressions[0].Values[0] != p.NodeName {
		return false
	}
	return true
}

// enqueueClaim adds an added/changed PVC to the work queue
func (p *csiProvisionerServer) enqueueClaim(obj interface{}) {
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err != nil {
		p.logger.Errorf("Not queuing PVC because key could not be derived: %v", err)
		return
	}
	p.claimQueue.Add(key)
}

// enqueuePV adds an added/changed PV to the work queue
func (p *csiProvisionerServer) enqueuePV(obj interface{}) {
	key, err := cache.MetaNamespaceKeyFunc(obj)
	if err != nil {
		p.logger.Errorf("Not queuing PV because key could not be derived: %v", err)
		return
	}
	p.pvQueue.Add(key)
}

// processQueueItems gets items from the given work queue and calls the process
// function for each of them. It self-terminates once the queue is shut down.
func (p *csiProvisionerServer) processQueueItems(queue workqueue.TypedInterface[string], process func(key string)) {
	for {
		obj, shutdown := queue.Get()
		if shutdown {
			return
		}

		func(obj string) {
			defer queue.Done(obj)

			p.processMutex.Lock()
			defer p.processMutex.Unlock()

			process(obj)
		}(obj)
	}
}

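// errSkipRateLimitReset is returned by the PVC processing path when the rate
// limiter state for the key should be kept as is: the retry wrapper neither
// resets (Forget) nor bumps the backoff, and does not schedule a retry itself.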
var errSkipRateLimitReset = errors.New("skip ratelimit reset")

func (p *csiProvisionerServer) processPVCRetryWrapper(ctx context.Context, key string) {
	err := p.processPVC(ctx, key)
	if errors.Is(err, errSkipRateLimitReset) {
		// ignore
	} else if err != nil {
		p.logger.Warningf("Failed processing PVC %s, requeueing (numrequeues: %d): %v", key, p.claimRateLimiter.NumRequeues(key), err)
		duration := p.claimRateLimiter.When(key)
		p.claimNextTry[key] = time.Now().Add(duration)
		p.claimQueue.AddAfter(key, duration)
	} else {
		p.claimRateLimiter.Forget(key)
		delete(p.claimNextTry, key)
	}
}

func (p *csiProvisionerServer) processPVRetryWrapper(ctx context.Context, key string) {
	if err := p.processPV(ctx, key); err != nil {
		p.logger.Warningf("Failed processing PV %s, requeueing (numrequeues: %d): %v", key, p.pvQueue.NumRequeues(key), err)
		p.pvQueue.AddRateLimited(key)
	} else {
		p.pvQueue.Forget(key)
	}
}

// volumePath gets the path where the volume is stored.
func (p *csiProvisionerServer) volumePath(volumeID string) string {
	return filepath.Join(p.VolumesDirectory.FullPath(), volumeID)
}

// processPVC looks at a single PVC item from the queue, determines if it needs
// to be provisioned and logs the provisioning result to the recorder
func (p *csiProvisionerServer) processPVC(ctx context.Context, key string) error {
	val, exists, err := p.pvcMutationCache.GetByKey(key)
	if err != nil {
		return fmt.Errorf("failed to get PVC for processing: %w", err)
	}
	if !exists {
		return nil // nothing to do, no error
	}
	pvc, ok := val.(*v1.PersistentVolumeClaim)
	if !ok {
		return fmt.Errorf("value in MutationCache is not a PVC: %+v", val)
	}

	if !p.isOurPVC(pvc) {
		return nil
	}

	if pvc.Spec.VolumeName == "" {
		// The claim is pending, so we may need to provision it.
		storageClass, err := p.storageClassInformer.Lister().Get(*pvc.Spec.StorageClassName)
		if err != nil {
			return fmt.Errorf("could not get storage class: %w", err)
		}

		if storageClass.Provisioner != csiProvisionerServerName {
			// We're not responsible for this PVC. Can only happen if
			// controller-manager makes a mistake setting the annotations, but
			// we're bailing here anyways for safety.
			return nil
		}

		err = p.provisionPVC(ctx, pvc, storageClass)

		if err != nil {
			p.recorder.Eventf(pvc, v1.EventTypeWarning, "ProvisioningFailed", "Failed to provision PV: %v", err)
			return err
		}
	} else if pvc.Status.Phase == v1.ClaimBound {
		// The claim is bound, so we may need to resize it.
		requestSize := pvc.Spec.Resources.Requests[v1.ResourceStorage]
		statusSize := pvc.Status.Capacity[v1.ResourceStorage]
		if requestSize.Cmp(statusSize) <= 0 {
			// No resize needed.
			return nil
		}

		val, exists, err := p.pvMutationCache.GetByKey(pvc.Spec.VolumeName)
		if err != nil {
			return fmt.Errorf("failed to get PV of PVC %s: %w", key, err)
		}
		if !exists {
			return nil
		}
		pv, ok := val.(*v1.PersistentVolume)
		if !ok {
			return fmt.Errorf("value in MutationCache is not a PV: %+v", val)
		}
		if pv.Status.Phase != v1.VolumeBound || pv.Spec.ClaimRef == nil || pv.Spec.ClaimRef.UID != pvc.UID {
			return nil
		}
		if !p.isOurPV(pv) {
			return nil
		}

		err = p.processResize(ctx, pvc, pv)
		if errors.Is(err, errSkipRateLimitReset) {
			return err
		} else if err != nil {
			p.recorder.Eventf(pvc, v1.EventTypeWarning, "VolumeResizeFailed", "Failed to resize PV: %v", err)
			return fmt.Errorf("failed to process resize of PVC %s: %w", key, err)
		}
	}
	return nil
}

// provisionPVC creates the directory where the volume lives, sets a quota for
// the requested amount of storage and creates the PV object representing this
// new volume
func (p *csiProvisionerServer) provisionPVC(ctx context.Context, pvc *v1.PersistentVolumeClaim, storageClass *storagev1.StorageClass) error {
	key := cache.MetaObjectToName(pvc).String()
	claimRef, err := ref.GetReference(scheme.Scheme, pvc)
	if err != nil {
		return fmt.Errorf("failed to get reference to PVC: %w", err)
	}

	storageReq := pvc.Spec.Resources.Requests[v1.ResourceStorage]
	capacity, err := quantityToBytes(storageReq)
	if err != nil {
		return err
	}
	newSize := *resource.NewQuantity(capacity, resource.BinarySI)

	volumeID := "pvc-" + string(pvc.ObjectMeta.UID)
	if _, err := p.pvInformer.Lister().Get(volumeID); err == nil {
		return nil // Volume already exists.
	}
	volumePath := p.volumePath(volumeID)
	volumeMode := ptr.Deref(pvc.Spec.VolumeMode, "")
	if volumeMode == "" {
		volumeMode = v1.PersistentVolumeFilesystem
	}

	p.logger.Infof("Creating persistent volume %s with mode %s and size %s for claim %s", volumeID, volumeMode, newSize.String(), key)

	switch volumeMode {
	case v1.PersistentVolumeFilesystem:
		if err := os.Mkdir(volumePath, 0644); err != nil && !os.IsExist(err) {
			return fmt.Errorf("failed to create volume directory: %w", err)
		}
		files, err := os.ReadDir(volumePath)
		if err != nil {
			return fmt.Errorf("failed to list files in newly-created volume: %w", err)
		}
		if len(files) > 0 {
			return errors.New("newly-created volume already contains data, bailing")
		}
		if err := fsquota.SetQuota(volumePath, uint64(capacity), uint64(capacity)/inodeCapacityRatio); err != nil {
			return fmt.Errorf("failed to update quota: %w", err)
		}
	case v1.PersistentVolumeBlock:
		imageFile, err := os.OpenFile(volumePath, os.O_CREATE|os.O_RDWR, 0644)
		if err != nil {
			return fmt.Errorf("failed to create volume image: %w", err)
		}
		defer imageFile.Close()
		if err := allocateBlockVolume(imageFile, capacity); err != nil {
			return fmt.Errorf("failed to allocate volume image: %w", err)
		}
	default:
		return fmt.Errorf("VolumeMode %q is unsupported", *pvc.Spec.VolumeMode)
	}

	vol := &v1.PersistentVolume{
		ObjectMeta: metav1.ObjectMeta{
			Name: volumeID,
			Annotations: map[string]string{
				"pv.kubernetes.io/provisioned-by": csiProvisionerServerName,
			},
		},
		Spec: v1.PersistentVolumeSpec{
			AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
			Capacity: v1.ResourceList{
				v1.ResourceStorage: newSize,
			},
			PersistentVolumeSource: v1.PersistentVolumeSource{
				CSI: &v1.CSIPersistentVolumeSource{
					Driver:       csiProvisionerServerName,
					VolumeHandle: volumeID,
				},
			},
			ClaimRef:   claimRef,
			VolumeMode: pvc.Spec.VolumeMode,
			NodeAffinity: &v1.VolumeNodeAffinity{
				Required: &v1.NodeSelector{
					NodeSelectorTerms: []v1.NodeSelectorTerm{
						{
							MatchExpressions: []v1.NodeSelectorRequirement{
								{
									Key:      "kubernetes.io/hostname",
									Operator: v1.NodeSelectorOpIn,
									Values:   []string{p.NodeName},
								},
							},
						},
					},
				},
			},
			StorageClassName:              *pvc.Spec.StorageClassName,
			PersistentVolumeReclaimPolicy: *storageClass.ReclaimPolicy,
		},
	}

	_, err = p.Kubernetes.CoreV1().PersistentVolumes().Create(ctx, vol, metav1.CreateOptions{})
	if err != nil && !apierrs.IsAlreadyExists(err) {
		return fmt.Errorf("failed to create PV object: %w", err)
	}
	return nil
}

// See https://github.com/kubernetes-csi/external-resizer/blob/master/pkg/controller/expand_and_recover.go
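// processResize drives the controller-side part of expanding a bound volume:
// it records the target size in the PVC status, grows the backing quota or
// image file, patches the PV capacity and finally marks the claim as pending
// the node-side resize step.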
func (p *csiProvisionerServer) processResize(ctx context.Context, pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) error {
	key := cache.MetaObjectToName(pvc).String()
	requestSize := pvc.Spec.Resources.Requests[v1.ResourceStorage]
	allocatedSize, hasAllocatedSize := pvc.Status.AllocatedResources[v1.ResourceStorage]
	pvSize := pv.Spec.Capacity[v1.ResourceStorage]
	resizeStatus := pvc.Status.AllocatedResourceStatuses[v1.ResourceStorage]

	newSize := requestSize
	if hasAllocatedSize {
		// Usually, we want to keep resizing to the same target size once we have
		// picked one and ignore changes in request size.
		newSize = allocatedSize
	}
	switch resizeStatus {
	case v1.PersistentVolumeClaimNodeResizePending,
		v1.PersistentVolumeClaimNodeResizeInProgress:
		// We are waiting for node resize. The PV should be large enough at this
		// point, which means we don't need to do anything here.
		if pvSize.Cmp(newSize) >= 0 || pvSize.Cmp(requestSize) >= 0 {
			// We don't need to do anything and don't need to schedule a retry, but we
			// still don't want to reset the rate limiter in case the node resize
			// fails repeatedly.
			return errSkipRateLimitReset
		}
	case "", v1.PersistentVolumeClaimControllerResizeInfeasible:
		// In this case, there is no ongoing or partially complete resize operation,
		// and we can be sure that the actually allocated size is equal to pvSize.
		// That means it's safe to pick a new target size.
		if pvSize.Cmp(requestSize) < 0 {
			newSize = requestSize
		}
	}
	capacity, err := quantityToBytes(newSize)
	if err != nil {
		return err
	}

	keepConditions := false
	if hasAllocatedSize && allocatedSize.Cmp(newSize) == 0 {
		now := time.Now()
		if p.claimNextTry[key].After(now) {
			// Not enough time has passed since the last attempt and the target size
			// is still the same.
			p.claimQueue.AddAfter(key, p.claimNextTry[key].Sub(now))
			return errSkipRateLimitReset
		}
		keepConditions = true
	}

	newPVC := pvc.DeepCopy()
	mapSet(&newPVC.Status.AllocatedResources, v1.ResourceStorage, newSize)
	mapSet(&newPVC.Status.AllocatedResourceStatuses, v1.ResourceStorage, v1.PersistentVolumeClaimControllerResizeInProgress)
	conditions := []v1.PersistentVolumeClaimCondition{{
		Type:               v1.PersistentVolumeClaimResizing,
		Status:             v1.ConditionTrue,
		LastTransitionTime: metav1.Now(),
	}}
	mergeResizeConditionOnPVC(newPVC, conditions, keepConditions)
	pvc, err = p.patchPVCStatus(ctx, pvc, newPVC)
	if err != nil {
		return fmt.Errorf("failed to update PVC before resizing: %w", err)
	}

	expandedSize := *resource.NewQuantity(capacity, resource.BinarySI)
	p.logger.Infof("Resizing persistent volume %s to new size %s", pv.Spec.CSI.VolumeHandle, expandedSize.String())
	err = p.controllerExpandVolume(pv, capacity)
	if err != nil {
		// If the resize fails because the requested size is too large, then set
		// status to infeasible, which allows the user to change the request to a
		// smaller size.
		isInfeasible := errors.Is(err, unix.ENOSPC) || errors.Is(err, unix.EDQUOT) || errors.Is(err, unix.EFBIG) || errors.Is(err, unix.EINVAL)
		newPVC = pvc.DeepCopy()
		if isInfeasible {
			mapSet(&newPVC.Status.AllocatedResourceStatuses, v1.ResourceStorage, v1.PersistentVolumeClaimControllerResizeInfeasible)
		}
		conditions = []v1.PersistentVolumeClaimCondition{{
			Type:               v1.PersistentVolumeClaimControllerResizeError,
			Status:             v1.ConditionTrue,
			LastTransitionTime: metav1.Now(),
			Message:            fmt.Sprintf("Failed to expand PV: %v", err),
		}}
		mergeResizeConditionOnPVC(newPVC, conditions, true)
		_, patchErr := p.patchPVCStatus(ctx, pvc, newPVC)
		if patchErr != nil {
			return fmt.Errorf("failed to update PVC after resizing: %w", patchErr)
		}
		return fmt.Errorf("failed to expand PV: %w", err)
	}

	newPV := pv.DeepCopy()
	newPV.Spec.Capacity[v1.ResourceStorage] = expandedSize
	pv, err = patchPV(ctx, p.Kubernetes, pv, newPV)
	if err != nil {
		return fmt.Errorf("failed to update PV with new capacity: %w", err)
	}
	p.pvMutationCache.Mutation(pv)

	newPVC = pvc.DeepCopy()
	mapSet(&newPVC.Status.AllocatedResourceStatuses, v1.ResourceStorage, v1.PersistentVolumeClaimNodeResizePending)
	conditions = []v1.PersistentVolumeClaimCondition{{
		Type:               v1.PersistentVolumeClaimFileSystemResizePending,
		Status:             v1.ConditionTrue,
		LastTransitionTime: metav1.Now(),
	}}
	mergeResizeConditionOnPVC(newPVC, conditions, true)
	_, err = p.patchPVCStatus(ctx, pvc, newPVC)
	if err != nil {
		return fmt.Errorf("failed to update PVC after resizing: %w", err)
	}

	return nil
}

func (p *csiProvisionerServer) controllerExpandVolume(pv *v1.PersistentVolume, capacity int64) error {
	volumePath := p.volumePath(pv.Spec.CSI.VolumeHandle)
	switch ptr.Deref(pv.Spec.VolumeMode, "") {
	case "", v1.PersistentVolumeFilesystem:
		if err := fsquota.SetQuota(volumePath, uint64(capacity), uint64(capacity)/inodeCapacityRatio); err != nil {
			return fmt.Errorf("failed to update quota: %w", err)
		}
		return nil
	case v1.PersistentVolumeBlock:
		imageFile, err := os.OpenFile(volumePath, os.O_RDWR, 0)
		if err != nil {
			return fmt.Errorf("failed to open block volume backing file: %w", err)
		}
		defer imageFile.Close()
		if err := allocateBlockVolume(imageFile, capacity); err != nil {
			return fmt.Errorf("failed to allocate space: %w", err)
		}
		return nil
	default:
		return fmt.Errorf("VolumeMode %q is unsupported", *pv.Spec.VolumeMode)
	}
}

func allocateBlockVolume(imageFile *os.File, capacity int64) error {
	// On XFS, fallocate is not atomic: It allocates space in steps of around
	// 8 GB, and does not check upfront if there is enough space to satisfy the
	// entire allocation. As the last step, if allocation succeeded, it updates
	// the file size. This means that fallocate can fail and leave the file size
	// unchanged, but still allocate part of the requested capacity past EOF.
	//
	// To clean this up, we truncate the file to its current size, which leaves
	// the size unchanged but removes allocated space past EOF. We also do this if
	// fallocate succeeds, in case a previous allocation has left space past EOF
	// and was not cleaned up.
	allocErr := unix.Fallocate(int(imageFile.Fd()), 0, 0, capacity)
	info, err := imageFile.Stat()
	if err != nil {
		return err
	}
	err = imageFile.Truncate(info.Size())
	if err != nil {
		return err
	}
	if allocErr != nil {
		return fmt.Errorf("fallocate: %w", allocErr)
	}
	return nil
}

// processPV looks at a single PV item from the queue and checks if it has been
// released and needs to be deleted. If yes it deletes the associated quota,
// directory and the PV object and logs the result to the recorder.
func (p *csiProvisionerServer) processPV(ctx context.Context, key string) error {
	_, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return fmt.Errorf("invalid resource key: %s", key)
	}
	pv, err := p.pvInformer.Lister().Get(name)
	if apierrs.IsNotFound(err) {
		return nil // nothing to do, no error
	} else if err != nil {
		return fmt.Errorf("failed to get PV for processing: %w", err)
	}

	if !p.isOurPV(pv) {
		return nil
	}
	if pv.Status.Phase == v1.VolumeBound && pv.Spec.ClaimRef != nil {
		// Resize processing depends on both the PV and the claim. Instead of
		// directly retrieving the claim here and calling processResize, we add it
		// to the claimQueue. This ensures that all resize retries are handled by
		// the claimQueue.
		claimKey := cache.NewObjectName(pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name).String()
		p.claimQueue.Add(claimKey)
	}
	if pv.ObjectMeta.DeletionTimestamp != nil {
		return nil
	}
	if pv.Spec.PersistentVolumeReclaimPolicy != v1.PersistentVolumeReclaimDelete || pv.Status.Phase != v1.VolumeReleased {
		return nil
	}
	volumePath := p.volumePath(pv.Spec.CSI.VolumeHandle)

	// Log deletes for auditing purposes
	p.logger.Infof("Deleting persistent volume %s", pv.Spec.CSI.VolumeHandle)
	switch ptr.Deref(pv.Spec.VolumeMode, "") {
	case "", v1.PersistentVolumeFilesystem:
		if err := fsquota.SetQuota(volumePath, 0, 0); err != nil && !os.IsNotExist(err) {
			// We record these here manually since a successful deletion
			// removes the PV we'd be attaching them to.
			p.recorder.Eventf(pv, v1.EventTypeWarning, "DeprovisioningFailed", "Failed to remove quota: %v", err)
			return fmt.Errorf("failed to remove quota: %w", err)
		}
		if err := os.RemoveAll(volumePath); err != nil {
			p.recorder.Eventf(pv, v1.EventTypeWarning, "DeprovisioningFailed", "Failed to delete volume: %v", err)
			return fmt.Errorf("failed to delete volume: %w", err)
		}
	case v1.PersistentVolumeBlock:
		if err := os.Remove(volumePath); err != nil && !os.IsNotExist(err) {
			p.recorder.Eventf(pv, v1.EventTypeWarning, "DeprovisioningFailed", "Failed to delete volume: %v", err)
			return fmt.Errorf("failed to delete volume: %w", err)
		}
	default:
		p.recorder.Eventf(pv, v1.EventTypeWarning, "DeprovisioningFailed", "Invalid volume mode \"%v\"", *pv.Spec.VolumeMode)
		return fmt.Errorf("invalid volume mode \"%v\"", *pv.Spec.VolumeMode)
	}

	err = p.Kubernetes.CoreV1().PersistentVolumes().Delete(ctx, pv.Name, metav1.DeleteOptions{})
	if err != nil && !apierrs.IsNotFound(err) {
		p.recorder.Eventf(pv, v1.EventTypeWarning, "DeprovisioningFailed", "Failed to delete PV object from K8s API: %v", err)
		return fmt.Errorf("failed to delete PV object: %w", err)
	}
	return nil
}

// quantityToBytes returns size rounded up to an integer amount.
// Based on Kubernetes staging/src/k8s.io/cloud-provider/volume/helpers/rounding.go
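// For example, a quantity of "1Gi" yields 1073741824, and a fractional
// quantity such as "1500m" rounds up to 2.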
func quantityToBytes(size resource.Quantity) (int64, error) {
	if size.CmpInt64(math.MaxInt64) >= 0 {
		return 0, fmt.Errorf("quantity %s is too big, overflows int64", size.String())
	}
	val := size.Value()
	if val <= 0 {
		return 0, fmt.Errorf("invalid quantity %s, must be positive", size.String())
	}
	return val, nil
}

// patchPVCStatus, createPVCPatch, addResourceVersion, patchPV,
// mergeResizeConditionOnPVC are taken from Kubernetes
// pkg/volume/util/resize_util.go under Apache 2.0 and modified.

// patchPVCStatus updates a PVC using patch instead of update. Update should not
// be used because when the client is an older version, it will drop fields
// which it does not support. It's done this way in both kubelet and
// external-resizer.
func (p *csiProvisionerServer) patchPVCStatus(
	ctx context.Context,
	oldPVC *v1.PersistentVolumeClaim,
	newPVC *v1.PersistentVolumeClaim) (*v1.PersistentVolumeClaim, error) {
	patchBytes, err := createPVCPatch(oldPVC, newPVC, true /* addResourceVersionCheck */)
	if err != nil {
		return oldPVC, fmt.Errorf("failed to create PVC patch: %w", err)
	}

	updatedClaim, updateErr := p.Kubernetes.CoreV1().PersistentVolumeClaims(oldPVC.Namespace).
		Patch(ctx, oldPVC.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
	if updateErr != nil {
		return oldPVC, fmt.Errorf("failed to patch PVC object: %w", updateErr)
	}
	p.pvcMutationCache.Mutation(updatedClaim)
	return updatedClaim, nil
}

func createPVCPatch(
	oldPVC *v1.PersistentVolumeClaim,
	newPVC *v1.PersistentVolumeClaim, addResourceVersionCheck bool) ([]byte, error) {
	oldData, err := json.Marshal(oldPVC)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal old data: %w", err)
	}

	newData, err := json.Marshal(newPVC)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal new data: %w", err)
	}

	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, oldPVC)
	if err != nil {
		return nil, fmt.Errorf("failed to create 2 way merge patch: %w", err)
	}

	if addResourceVersionCheck {
		patchBytes, err = addResourceVersion(patchBytes, oldPVC.ResourceVersion)
		if err != nil {
			return nil, fmt.Errorf("failed to add resource version: %w", err)
		}
	}

	return patchBytes, nil
}

func addResourceVersion(patchBytes []byte, resourceVersion string) ([]byte, error) {
	var patchMap map[string]interface{}
	err := json.Unmarshal(patchBytes, &patchMap)
	if err != nil {
		return nil, fmt.Errorf("error unmarshalling patch: %w", err)
	}
	u := unstructured.Unstructured{Object: patchMap}
	u.SetResourceVersion(resourceVersion)
	versionBytes, err := json.Marshal(patchMap)
	if err != nil {
		return nil, fmt.Errorf("error marshalling json patch: %w", err)
	}
	return versionBytes, nil
}

func patchPV(
	ctx context.Context,
	kubeClient kubernetes.Interface,
	oldPV *v1.PersistentVolume,
	newPV *v1.PersistentVolume) (*v1.PersistentVolume, error) {
	oldData, err := json.Marshal(oldPV)
	if err != nil {
		return oldPV, fmt.Errorf("failed to marshal old data: %w", err)
	}

	newData, err := json.Marshal(newPV)
	if err != nil {
		return oldPV, fmt.Errorf("failed to marshal new data: %w", err)
	}

	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, oldPV)
	if err != nil {
		return oldPV, fmt.Errorf("failed to create 2 way merge patch: %w", err)
	}

	updatedPV, err := kubeClient.CoreV1().PersistentVolumes().
		Patch(ctx, oldPV.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
	if err != nil {
		return oldPV, fmt.Errorf("failed to patch PV object: %w", err)
	}
	return updatedPV, nil
}

var knownResizeConditions = map[v1.PersistentVolumeClaimConditionType]bool{
	v1.PersistentVolumeClaimFileSystemResizePending: true,
	v1.PersistentVolumeClaimResizing:                true,
	v1.PersistentVolumeClaimControllerResizeError:   true,
	v1.PersistentVolumeClaimNodeResizeError:         true,
}

// mergeResizeConditionOnPVC updates pvc with requested resize conditions
// leaving other conditions untouched.
func mergeResizeConditionOnPVC(
	pvc *v1.PersistentVolumeClaim,
	resizeConditions []v1.PersistentVolumeClaimCondition,
	keepOldResizeConditions bool) {
	resizeConditionMap := map[v1.PersistentVolumeClaimConditionType]v1.PersistentVolumeClaimCondition{}
	for _, condition := range resizeConditions {
		resizeConditionMap[condition.Type] = condition
	}

	var newConditions []v1.PersistentVolumeClaimCondition
	for _, condition := range pvc.Status.Conditions {
		// If the condition is not a resize-related type, keep it.
		if _, ok := knownResizeConditions[condition.Type]; !ok {
			newConditions = append(newConditions, condition)
			continue
		}

		if newCondition, ok := resizeConditionMap[condition.Type]; ok {
			newConditions = append(newConditions, newCondition)
			delete(resizeConditionMap, condition.Type)
		} else if keepOldResizeConditions {
			newConditions = append(newConditions, condition)
		}
	}

	for _, newCondition := range resizeConditionMap {
		newConditions = append(newConditions, newCondition)
	}
	pvc.Status.Conditions = newConditions
}

// mapSet is like `(*m)[key] = value` but also initializes *m if it is nil.
func mapSet[Map ~map[K]V, K comparable, V any](m *Map, key K, value V) {
	if *m == nil {
		*m = make(map[K]V)
	}
	(*m)[key] = value
}