// Copyright The Monogon Project Authors.
// SPDX-License-Identifier: Apache-2.0

package kubernetes

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"math"
	"os"
	"path/filepath"
	"sync"
	"time"

	"golang.org/x/sys/unix"
	v1 "k8s.io/api/core/v1"
	storagev1 "k8s.io/api/storage/v1"
	apierrs "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
	"k8s.io/client-go/informers"
	coreinformers "k8s.io/client-go/informers/core/v1"
	storageinformers "k8s.io/client-go/informers/storage/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	ref "k8s.io/client-go/tools/reference"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/utils/ptr"

	"source.monogon.dev/go/logging"
	"source.monogon.dev/metropolis/node/core/localstorage"
	"source.monogon.dev/osbase/fsquota"
	"source.monogon.dev/osbase/supervisor"
)

// inodeCapacityRatio describes the ratio between the byte capacity of a volume
// and its inode capacity. One inode on XFS is 512 bytes and by default 25%
// (1/4) of capacity can be used for metadata.
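// For example, a 10 GiB (10 * 2^30 byte) volume gets an inode quota of
// 10 * 2^30 / 2048 = 5242880 inodes.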
const inodeCapacityRatio = 4 * 512

// ONCHANGE(//metropolis/node/kubernetes/reconciler:resources_csi.go): needs to
// match csiProvisionerServerName declared there.
const csiProvisionerServerName = "dev.monogon.metropolis.vfs"

// csiProvisionerServer is responsible for the provisioning and deprovisioning
// of CSI-based container volumes. It runs on all nodes and watches PVCs for
// ones assigned to the node it's running on and fulfills the provisioning
// request by creating a directory, applying a quota and creating the
// corresponding PV. When the PV is released and its retention policy is
// Delete, the directory and the PV resource are deleted.
type csiProvisionerServer struct {
	NodeName         string
	Kubernetes       kubernetes.Interface
	InformerFactory  informers.SharedInformerFactory
	VolumesDirectory *localstorage.DataVolumesDirectory

	claimQueue           workqueue.TypedDelayingInterface[string]
	claimRateLimiter     workqueue.TypedRateLimiter[string]
	claimNextTry         map[string]time.Time
	pvQueue              workqueue.TypedRateLimitingInterface[string]
	recorder             record.EventRecorder
	pvcInformer          coreinformers.PersistentVolumeClaimInformer
	pvInformer           coreinformers.PersistentVolumeInformer
	storageClassInformer storageinformers.StorageClassInformer
	pvcMutationCache     cache.MutationCache
	pvMutationCache      cache.MutationCache
	// processMutex ensures that the two workers (one for PVCs and one for PVs)
	// are not doing work concurrently.
	processMutex sync.Mutex
	logger       logging.Leveled
}

// Run runs the main provisioning machinery. It consists of a bunch of
// informers which keep track of the events happening on the Kubernetes control
// plane and inform us when something happens. If anything happens to PVCs or
// PVs, we enqueue the identifier of that resource in a work queue. Each queue
// is worked on by only one worker to limit load and avoid complicated locking
// infrastructure. Failed items are requeued.
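//
// The resulting flow for each resource kind is roughly:
//
//	informer event -> enqueue(key) -> work queue -> worker -> process(key)
//
// with failed keys being retried after a rate-limited backoff.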
func (p *csiProvisionerServer) Run(ctx context.Context) error {
	// The recorder is used to log Kubernetes events for successful or failed
	// volume provisions. These events then show up in `kubectl describe pvc`
	// and can be used by admins to debug issues with this provisioner.
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: p.Kubernetes.CoreV1().Events("")})
	p.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: csiProvisionerServerName, Host: p.NodeName})

	p.pvcInformer = p.InformerFactory.Core().V1().PersistentVolumeClaims()
	p.pvInformer = p.InformerFactory.Core().V1().PersistentVolumes()
	p.storageClassInformer = p.InformerFactory.Storage().V1().StorageClasses()
	p.pvcMutationCache = cache.NewIntegerResourceVersionMutationCache(p.pvcInformer.Informer().GetStore(), nil, time.Minute, false)
	p.pvMutationCache = cache.NewIntegerResourceVersionMutationCache(p.pvInformer.Informer().GetStore(), nil, time.Minute, false)

	p.claimQueue = workqueue.NewTypedDelayingQueue[string]()
	p.claimRateLimiter = workqueue.NewTypedItemExponentialFailureRateLimiter[string](time.Second, 5*time.Minute)
	p.claimNextTry = make(map[string]time.Time)
	p.pvQueue = workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]())

	p.logger = supervisor.Logger(ctx)

	p.pvcInformer.Informer().SetWatchErrorHandler(func(_ *cache.Reflector, err error) {
		p.logger.Errorf("pvcInformer watch error: %v", err)
	})
	p.pvcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: p.enqueueClaim,
		UpdateFunc: func(old, new interface{}) {
			p.enqueueClaim(new)
		},
		// We need to handle deletes to ensure that deleted keys are removed from
		// the rate limiter, because there are cases where we leave a key in the
		// rate limiter without scheduling a retry.
		DeleteFunc: p.enqueueClaim,
	})
	p.pvInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: p.enqueuePV,
		UpdateFunc: func(old, new interface{}) {
			p.enqueuePV(new)
		},
	})
	p.pvInformer.Informer().SetWatchErrorHandler(func(_ *cache.Reflector, err error) {
		p.logger.Errorf("pvInformer watch error: %v", err)
	})

	p.storageClassInformer.Informer().SetWatchErrorHandler(func(_ *cache.Reflector, err error) {
		p.logger.Errorf("storageClassInformer watch error: %v", err)
	})

	go p.pvcInformer.Informer().Run(ctx.Done())
	go p.pvInformer.Informer().Run(ctx.Done())
	go p.storageClassInformer.Informer().Run(ctx.Done())

	// These will self-terminate once the queues are shut down
	go p.processQueueItems(p.claimQueue, func(key string) {
		p.processPVCRetryWrapper(ctx, key)
	})
	go p.processQueueItems(p.pvQueue, func(key string) {
		p.processPVRetryWrapper(ctx, key)
	})

	supervisor.Signal(ctx, supervisor.SignalHealthy)
	<-ctx.Done()
	p.claimQueue.ShutDown()
	p.pvQueue.ShutDown()
	return nil
}

// isOurPVC checks if the given PVC is to be provisioned by this provisioner
// and has been scheduled onto this node.
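// For illustration, a matching PVC carries annotations along these lines (the
// node name is just an example value):
//
//	volume.beta.kubernetes.io/storage-provisioner: dev.monogon.metropolis.vfs
//	volume.kubernetes.io/selected-node: some-node-name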
func (p *csiProvisionerServer) isOurPVC(pvc *v1.PersistentVolumeClaim) bool {
	if pvc.ObjectMeta.Annotations["volume.beta.kubernetes.io/storage-provisioner"] != csiProvisionerServerName {
		return false
	}
	if pvc.ObjectMeta.Annotations["volume.kubernetes.io/selected-node"] != p.NodeName {
		return false
	}
	return true
}

// isOurPV checks if the given PV has been provisioned by this provisioner and
// has been scheduled onto this node
func (p *csiProvisionerServer) isOurPV(pv *v1.PersistentVolume) bool {
	if pv.Spec.CSI == nil || pv.Spec.CSI.Driver != csiProvisionerServerName {
		return false
	}
	if pv.Spec.NodeAffinity.Required.NodeSelectorTerms[0].MatchExpressions[0].Values[0] != p.NodeName {
		return false
	}
	return true
}

// enqueueClaim adds an added/changed/deleted PVC to the work queue
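// using the "namespace/name" informer key format (for example "default/my-claim").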
func (p *csiProvisionerServer) enqueueClaim(obj interface{}) {
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err != nil {
		p.logger.Errorf("Not queuing PVC because key could not be derived: %v", err)
		return
	}
	p.claimQueue.Add(key)
}

// enqueuePV adds an added/changed PV to the work queue
func (p *csiProvisionerServer) enqueuePV(obj interface{}) {
	key, err := cache.MetaNamespaceKeyFunc(obj)
	if err != nil {
		p.logger.Errorf("Not queuing PV because key could not be derived: %v", err)
		return
	}
	p.pvQueue.Add(key)
}

// processQueueItems gets items from the given work queue and calls the process
// function for each of them. It self-terminates once the queue is shut down.
func (p *csiProvisionerServer) processQueueItems(queue workqueue.TypedInterface[string], process func(key string)) {
	for {
		obj, shutdown := queue.Get()
		if shutdown {
			return
		}

		func(obj string) {
			defer queue.Done(obj)

			p.processMutex.Lock()
			defer p.processMutex.Unlock()

			process(obj)
		}(obj)
	}
}

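// errSkipRateLimitReset is returned by processPVC (and the resize path) when
// nothing more needs to happen right now, but the attempt should not count as
// a success either: the caller neither schedules a rate-limited retry nor
// resets the rate limiter for the key.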
var errSkipRateLimitReset = errors.New("skip ratelimit reset")

func (p *csiProvisionerServer) processPVCRetryWrapper(ctx context.Context, key string) {
	err := p.processPVC(ctx, key)
	if errors.Is(err, errSkipRateLimitReset) {
		// ignore
	} else if err != nil {
		p.logger.Warningf("Failed processing PVC %s, requeueing (numrequeues: %d): %v", key, p.claimRateLimiter.NumRequeues(key), err)
		duration := p.claimRateLimiter.When(key)
		p.claimNextTry[key] = time.Now().Add(duration)
		p.claimQueue.AddAfter(key, duration)
	} else {
		p.claimRateLimiter.Forget(key)
		delete(p.claimNextTry, key)
	}
}

func (p *csiProvisionerServer) processPVRetryWrapper(ctx context.Context, key string) {
	if err := p.processPV(ctx, key); err != nil {
		p.logger.Warningf("Failed processing PV %s, requeueing (numrequeues: %d): %v", key, p.pvQueue.NumRequeues(key), err)
		p.pvQueue.AddRateLimited(key)
	} else {
		p.pvQueue.Forget(key)
	}
}

// volumePath gets the path where the volume is stored.
func (p *csiProvisionerServer) volumePath(volumeID string) string {
	return filepath.Join(p.VolumesDirectory.FullPath(), volumeID)
}

// processPVC looks at a single PVC item from the queue, determines if it needs
// to be provisioned or resized, and logs the result to the recorder.
func (p *csiProvisionerServer) processPVC(ctx context.Context, key string) error {
	val, exists, err := p.pvcMutationCache.GetByKey(key)
	if err != nil {
		return fmt.Errorf("failed to get PVC for processing: %w", err)
	}
	if !exists {
		return nil // nothing to do, no error
	}
	pvc, ok := val.(*v1.PersistentVolumeClaim)
	if !ok {
		return fmt.Errorf("value in MutationCache is not a PVC: %+v", val)
	}

	if !p.isOurPVC(pvc) {
		return nil
	}

	if pvc.Spec.VolumeName == "" {
		// The claim is pending, so we may need to provision it.
		storageClass, err := p.storageClassInformer.Lister().Get(*pvc.Spec.StorageClassName)
		if err != nil {
			return fmt.Errorf("could not get storage class: %w", err)
		}

		if storageClass.Provisioner != csiProvisionerServerName {
			// We're not responsible for this PVC. Can only happen if
			// controller-manager makes a mistake setting the annotations, but
			// we're bailing here anyways for safety.
			return nil
		}

		err = p.provisionPVC(ctx, pvc, storageClass)

		if err != nil {
			p.recorder.Eventf(pvc, v1.EventTypeWarning, "ProvisioningFailed", "Failed to provision PV: %v", err)
			return err
		}
	} else if pvc.Status.Phase == v1.ClaimBound {
		// The claim is bound, so we may need to resize it.
		requestSize := pvc.Spec.Resources.Requests[v1.ResourceStorage]
		statusSize := pvc.Status.Capacity[v1.ResourceStorage]
		if requestSize.Cmp(statusSize) <= 0 {
			// No resize needed.
			return nil
		}

		val, exists, err := p.pvMutationCache.GetByKey(pvc.Spec.VolumeName)
		if err != nil {
			return fmt.Errorf("failed to get PV of PVC %s: %w", key, err)
		}
		if !exists {
			return nil
		}
		pv, ok := val.(*v1.PersistentVolume)
		if !ok {
			return fmt.Errorf("value in MutationCache is not a PV: %+v", val)
		}
		if pv.Status.Phase != v1.VolumeBound || pv.Spec.ClaimRef == nil || pv.Spec.ClaimRef.UID != pvc.UID {
			return nil
		}
		if !p.isOurPV(pv) {
			return nil
		}

		err = p.processResize(ctx, pvc, pv)
		if errors.Is(err, errSkipRateLimitReset) {
			return err
		} else if err != nil {
			p.recorder.Eventf(pvc, v1.EventTypeWarning, "VolumeResizeFailed", "Failed to resize PV: %v", err)
			return fmt.Errorf("failed to process resize of PVC %s: %w", key, err)
		}
	}
	return nil
}

// provisionPVC creates the directory (or backing image file for block volumes)
// where the volume lives, sets a quota for the requested amount of storage and
// creates the PV object representing this new volume.
func (p *csiProvisionerServer) provisionPVC(ctx context.Context, pvc *v1.PersistentVolumeClaim, storageClass *storagev1.StorageClass) error {
	key := cache.MetaObjectToName(pvc).String()
	claimRef, err := ref.GetReference(scheme.Scheme, pvc)
	if err != nil {
		return fmt.Errorf("failed to get reference to PVC: %w", err)
	}

	storageReq := pvc.Spec.Resources.Requests[v1.ResourceStorage]
	capacity, err := quantityToBytes(storageReq)
	if err != nil {
		return err
	}
	newSize := *resource.NewQuantity(capacity, resource.BinarySI)

	volumeID := "pvc-" + string(pvc.ObjectMeta.UID)
	if _, err := p.pvInformer.Lister().Get(volumeID); err == nil {
		return nil // Volume already exists.
	}
	volumePath := p.volumePath(volumeID)
	volumeMode := ptr.Deref(pvc.Spec.VolumeMode, "")
	if volumeMode == "" {
		volumeMode = v1.PersistentVolumeFilesystem
	}

	p.logger.Infof("Creating persistent volume %s with mode %s and size %s for claim %s", volumeID, volumeMode, newSize.String(), key)

	switch volumeMode {
	case v1.PersistentVolumeFilesystem:
		if err := os.Mkdir(volumePath, 0644); err != nil && !os.IsExist(err) {
			return fmt.Errorf("failed to create volume directory: %w", err)
		}
		files, err := os.ReadDir(volumePath)
		if err != nil {
			return fmt.Errorf("failed to list files in newly-created volume: %w", err)
		}
		if len(files) > 0 {
			return errors.New("newly-created volume already contains data, bailing")
		}
		if err := fsquota.SetQuota(volumePath, uint64(capacity), uint64(capacity)/inodeCapacityRatio); err != nil {
			return fmt.Errorf("failed to update quota: %w", err)
		}
	case v1.PersistentVolumeBlock:
		imageFile, err := os.OpenFile(volumePath, os.O_CREATE|os.O_RDWR, 0644)
		if err != nil {
			return fmt.Errorf("failed to create volume image: %w", err)
		}
		defer imageFile.Close()
		if err := allocateBlockVolume(imageFile, capacity); err != nil {
			return fmt.Errorf("failed to allocate volume image: %w", err)
		}
	default:
		return fmt.Errorf("VolumeMode %q is unsupported", *pvc.Spec.VolumeMode)
	}

	vol := &v1.PersistentVolume{
		ObjectMeta: metav1.ObjectMeta{
			Name: volumeID,
			Annotations: map[string]string{
				"pv.kubernetes.io/provisioned-by": csiProvisionerServerName,
			},
		},
		Spec: v1.PersistentVolumeSpec{
			AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
			Capacity: v1.ResourceList{
				v1.ResourceStorage: newSize,
			},
			PersistentVolumeSource: v1.PersistentVolumeSource{
				CSI: &v1.CSIPersistentVolumeSource{
					Driver:       csiProvisionerServerName,
					VolumeHandle: volumeID,
				},
			},
			ClaimRef:   claimRef,
			VolumeMode: pvc.Spec.VolumeMode,
			NodeAffinity: &v1.VolumeNodeAffinity{
				Required: &v1.NodeSelector{
					NodeSelectorTerms: []v1.NodeSelectorTerm{
						{
							MatchExpressions: []v1.NodeSelectorRequirement{
								{
									Key:      "kubernetes.io/hostname",
									Operator: v1.NodeSelectorOpIn,
									Values:   []string{p.NodeName},
								},
							},
						},
					},
				},
			},
			StorageClassName:              *pvc.Spec.StorageClassName,
			PersistentVolumeReclaimPolicy: *storageClass.ReclaimPolicy,
		},
	}

	_, err = p.Kubernetes.CoreV1().PersistentVolumes().Create(ctx, vol, metav1.CreateOptions{})
	if err != nil && !apierrs.IsAlreadyExists(err) {
		return fmt.Errorf("failed to create PV object: %w", err)
	}
	return nil
}

// See https://github.com/kubernetes-csi/external-resizer/blob/master/pkg/controller/expand_and_recover.go
func (p *csiProvisionerServer) processResize(ctx context.Context, pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) error {
	key := cache.MetaObjectToName(pvc).String()
	requestSize := pvc.Spec.Resources.Requests[v1.ResourceStorage]
	allocatedSize, hasAllocatedSize := pvc.Status.AllocatedResources[v1.ResourceStorage]
	pvSize := pv.Spec.Capacity[v1.ResourceStorage]
	resizeStatus := pvc.Status.AllocatedResourceStatuses[v1.ResourceStorage]

	newSize := requestSize
	if hasAllocatedSize {
		// Usually, we want to keep resizing to the same target size once we have
		// picked one and ignore changes in request size.
		newSize = allocatedSize
	}
	switch resizeStatus {
	case v1.PersistentVolumeClaimNodeResizePending,
		v1.PersistentVolumeClaimNodeResizeInProgress:
		// We are waiting for node resize. The PV should be large enough at this
		// point, which means we don't need to do anything here.
		if pvSize.Cmp(newSize) >= 0 || pvSize.Cmp(requestSize) >= 0 {
			// We don't need to do anything and don't need to schedule a retry, but we
			// still don't want to reset the rate limiter in case the node resize
			// fails repeatedly.
			return errSkipRateLimitReset
		}
	case "", v1.PersistentVolumeClaimControllerResizeInfeasible:
		// In this case, there is no ongoing or partially complete resize operation,
		// and we can be sure that the actually allocated size is equal to pvSize.
		// That means it's safe to pick a new target size.
		if pvSize.Cmp(requestSize) < 0 {
			newSize = requestSize
		}
	}
	capacity, err := quantityToBytes(newSize)
	if err != nil {
		return err
	}

	keepConditions := false
	if hasAllocatedSize && allocatedSize.Cmp(newSize) == 0 {
		now := time.Now()
		if p.claimNextTry[key].After(now) {
			// Not enough time has passed since the last attempt and the target size
			// is still the same.
			p.claimQueue.AddAfter(key, p.claimNextTry[key].Sub(now))
			return errSkipRateLimitReset
		}
		keepConditions = true
	}

	newPVC := pvc.DeepCopy()
	mapSet(&newPVC.Status.AllocatedResources, v1.ResourceStorage, newSize)
	mapSet(&newPVC.Status.AllocatedResourceStatuses, v1.ResourceStorage, v1.PersistentVolumeClaimControllerResizeInProgress)
	conditions := []v1.PersistentVolumeClaimCondition{{
		Type:               v1.PersistentVolumeClaimResizing,
		Status:             v1.ConditionTrue,
		LastTransitionTime: metav1.Now(),
	}}
	mergeResizeConditionOnPVC(newPVC, conditions, keepConditions)
	pvc, err = p.patchPVCStatus(ctx, pvc, newPVC)
	if err != nil {
		return fmt.Errorf("failed to update PVC before resizing: %w", err)
	}

	expandedSize := *resource.NewQuantity(capacity, resource.BinarySI)
	p.logger.Infof("Resizing persistent volume %s to new size %s", pv.Spec.CSI.VolumeHandle, expandedSize.String())
	err = p.controllerExpandVolume(pv, capacity)
	if err != nil {
		// If the resize fails because the requested size is too large, then set
		// status to infeasible, which allows the user to change the request to a
		// smaller size.
		isInfeasible := errors.Is(err, unix.ENOSPC) || errors.Is(err, unix.EDQUOT) || errors.Is(err, unix.EFBIG) || errors.Is(err, unix.EINVAL)
		newPVC = pvc.DeepCopy()
		if isInfeasible {
			mapSet(&newPVC.Status.AllocatedResourceStatuses, v1.ResourceStorage, v1.PersistentVolumeClaimControllerResizeInfeasible)
		}
		conditions = []v1.PersistentVolumeClaimCondition{{
			Type:               v1.PersistentVolumeClaimControllerResizeError,
			Status:             v1.ConditionTrue,
			LastTransitionTime: metav1.Now(),
			Message:            fmt.Sprintf("Failed to expand PV: %v", err),
		}}
		mergeResizeConditionOnPVC(newPVC, conditions, true)
		_, patchErr := p.patchPVCStatus(ctx, pvc, newPVC)
		if patchErr != nil {
			return fmt.Errorf("failed to update PVC after resizing: %w", patchErr)
		}
		return fmt.Errorf("failed to expand PV: %w", err)
	}

	newPV := pv.DeepCopy()
	newPV.Spec.Capacity[v1.ResourceStorage] = expandedSize
	pv, err = patchPV(ctx, p.Kubernetes, pv, newPV)
	if err != nil {
		return fmt.Errorf("failed to update PV with new capacity: %w", err)
	}
	p.pvMutationCache.Mutation(pv)

	newPVC = pvc.DeepCopy()
	mapSet(&newPVC.Status.AllocatedResourceStatuses, v1.ResourceStorage, v1.PersistentVolumeClaimNodeResizePending)
	conditions = []v1.PersistentVolumeClaimCondition{{
		Type:               v1.PersistentVolumeClaimFileSystemResizePending,
		Status:             v1.ConditionTrue,
		LastTransitionTime: metav1.Now(),
	}}
	mergeResizeConditionOnPVC(newPVC, conditions, true)
	_, err = p.patchPVCStatus(ctx, pvc, newPVC)
	if err != nil {
		return fmt.Errorf("failed to update PVC after resizing: %w", err)
	}

	return nil
}

func (p *csiProvisionerServer) controllerExpandVolume(pv *v1.PersistentVolume, capacity int64) error {
	volumePath := p.volumePath(pv.Spec.CSI.VolumeHandle)
	switch ptr.Deref(pv.Spec.VolumeMode, "") {
	case "", v1.PersistentVolumeFilesystem:
		if err := fsquota.SetQuota(volumePath, uint64(capacity), uint64(capacity)/inodeCapacityRatio); err != nil {
			return fmt.Errorf("failed to update quota: %w", err)
		}
		return nil
	case v1.PersistentVolumeBlock:
		imageFile, err := os.OpenFile(volumePath, os.O_RDWR, 0)
		if err != nil {
			return fmt.Errorf("failed to open block volume backing file: %w", err)
		}
		defer imageFile.Close()
		if err := allocateBlockVolume(imageFile, capacity); err != nil {
			return fmt.Errorf("failed to allocate space: %w", err)
		}
		return nil
	default:
		return fmt.Errorf("VolumeMode %q is unsupported", *pv.Spec.VolumeMode)
	}
}

func allocateBlockVolume(imageFile *os.File, capacity int64) error {
	// On XFS, fallocate is not atomic: It allocates space in steps of around
	// 8 GB, and does not check upfront if there is enough space to satisfy the
	// entire allocation. As the last step, if allocation succeeded, it updates
	// the file size. This means that fallocate can fail and leave the file size
	// unchanged, but still allocate part of the requested capacity past EOF.
	//
	// To clean this up, we truncate the file to its current size, which leaves
	// the size unchanged but removes allocated space past EOF. We also do this if
	// fallocate succeeds, in case a previous allocation has left space past EOF
	// and was not cleaned up.
	allocErr := unix.Fallocate(int(imageFile.Fd()), 0, 0, capacity)
	info, err := imageFile.Stat()
	if err != nil {
		return err
	}
	err = imageFile.Truncate(info.Size())
	if err != nil {
		return err
	}
	if allocErr != nil {
		return fmt.Errorf("fallocate: %w", allocErr)
	}
	return nil
}

// processPV looks at a single PV item from the queue and checks if it has been
// released and needs to be deleted. If so, it deletes the associated quota,
// directory and the PV object and logs the result to the recorder.
func (p *csiProvisionerServer) processPV(ctx context.Context, key string) error {
	_, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return fmt.Errorf("invalid resource key: %s", key)
	}
	pv, err := p.pvInformer.Lister().Get(name)
	if apierrs.IsNotFound(err) {
		return nil // nothing to do, no error
	} else if err != nil {
		return fmt.Errorf("failed to get PV for processing: %w", err)
	}

	if !p.isOurPV(pv) {
		return nil
	}
	if pv.Status.Phase == v1.VolumeBound && pv.Spec.ClaimRef != nil {
		// Resize processing depends on both the PV and the claim. Instead of
		// directly retrieving the claim here and calling processResize, we add it
		// to the claimQueue. This ensures that all resize retries are handled by
		// the claimQueue.
		claimKey := cache.NewObjectName(pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name).String()
		p.claimQueue.Add(claimKey)
	}
	if pv.ObjectMeta.DeletionTimestamp != nil {
		return nil
	}
	if pv.Spec.PersistentVolumeReclaimPolicy != v1.PersistentVolumeReclaimDelete || pv.Status.Phase != v1.VolumeReleased {
		return nil
	}
	volumePath := p.volumePath(pv.Spec.CSI.VolumeHandle)

	// Log deletes for auditing purposes
	p.logger.Infof("Deleting persistent volume %s", pv.Spec.CSI.VolumeHandle)
	switch ptr.Deref(pv.Spec.VolumeMode, "") {
	case "", v1.PersistentVolumeFilesystem:
		if err := fsquota.SetQuota(volumePath, 0, 0); err != nil && !os.IsNotExist(err) {
			// We record these here manually since a successful deletion
			// removes the PV we'd be attaching them to.
			p.recorder.Eventf(pv, v1.EventTypeWarning, "DeprovisioningFailed", "Failed to remove quota: %v", err)
			return fmt.Errorf("failed to remove quota: %w", err)
		}
		if err := os.RemoveAll(volumePath); err != nil {
			p.recorder.Eventf(pv, v1.EventTypeWarning, "DeprovisioningFailed", "Failed to delete volume: %v", err)
			return fmt.Errorf("failed to delete volume: %w", err)
		}
	case v1.PersistentVolumeBlock:
		if err := os.Remove(volumePath); err != nil && !os.IsNotExist(err) {
			p.recorder.Eventf(pv, v1.EventTypeWarning, "DeprovisioningFailed", "Failed to delete volume: %v", err)
			return fmt.Errorf("failed to delete volume: %w", err)
		}
	default:
		p.recorder.Eventf(pv, v1.EventTypeWarning, "DeprovisioningFailed", "Invalid volume mode \"%v\"", *pv.Spec.VolumeMode)
		return fmt.Errorf("invalid volume mode \"%v\"", *pv.Spec.VolumeMode)
	}

	err = p.Kubernetes.CoreV1().PersistentVolumes().Delete(ctx, pv.Name, metav1.DeleteOptions{})
	if err != nil && !apierrs.IsNotFound(err) {
		p.recorder.Eventf(pv, v1.EventTypeWarning, "DeprovisioningFailed", "Failed to delete PV object from K8s API: %v", err)
		return fmt.Errorf("failed to delete PV object: %w", err)
	}
	return nil
}

// quantityToBytes returns the size in bytes, rounded up to an integer amount.
// Based on Kubernetes staging/src/k8s.io/cloud-provider/volume/helpers/rounding.go
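// For example, a "1Gi" request yields 1073741824 bytes, and a fractional
// quantity such as "500m" (half a byte) is rounded up to 1 by Quantity.Value().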
func quantityToBytes(size resource.Quantity) (int64, error) {
	if size.CmpInt64(math.MaxInt64) >= 0 {
		return 0, fmt.Errorf("quantity %s is too big, overflows int64", size.String())
	}
	val := size.Value()
	if val <= 0 {
		return 0, fmt.Errorf("invalid quantity %s, must be positive", size.String())
	}
	return val, nil
}

// patchPVCStatus, createPVCPatch, addResourceVersion, patchPV,
// mergeResizeConditionOnPVC are taken from Kubernetes
// pkg/volume/util/resize_util.go under Apache 2.0 and modified.

// patchPVCStatus updates a PVC using patch instead of update. Update should not
// be used because when the client is an older version, it will drop fields
// which it does not support. It's done this way in both kubelet and
// external-resizer.
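//
// With the resource version check enabled, the resulting patch looks roughly
// like this sketch (field values made up for illustration):
//
//	{"metadata":{"resourceVersion":"12345"},
//	 "status":{"allocatedResources":{"storage":"2Gi"}}}
//
// Because the patch carries the old resourceVersion, the API server rejects it
// with a conflict if the PVC was modified concurrently.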
func (p *csiProvisionerServer) patchPVCStatus(
	ctx context.Context,
	oldPVC *v1.PersistentVolumeClaim,
	newPVC *v1.PersistentVolumeClaim) (*v1.PersistentVolumeClaim, error) {
	patchBytes, err := createPVCPatch(oldPVC, newPVC, true /* addResourceVersionCheck */)
	if err != nil {
		return oldPVC, fmt.Errorf("failed to create PVC patch: %w", err)
	}

	updatedClaim, updateErr := p.Kubernetes.CoreV1().PersistentVolumeClaims(oldPVC.Namespace).
		Patch(ctx, oldPVC.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
	if updateErr != nil {
		return oldPVC, fmt.Errorf("failed to patch PVC object: %w", updateErr)
	}
	p.pvcMutationCache.Mutation(updatedClaim)
	return updatedClaim, nil
}

func createPVCPatch(
	oldPVC *v1.PersistentVolumeClaim,
	newPVC *v1.PersistentVolumeClaim, addResourceVersionCheck bool) ([]byte, error) {
	oldData, err := json.Marshal(oldPVC)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal old data: %w", err)
	}

	newData, err := json.Marshal(newPVC)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal new data: %w", err)
	}

	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, oldPVC)
	if err != nil {
		return nil, fmt.Errorf("failed to create 2 way merge patch: %w", err)
	}

	if addResourceVersionCheck {
		patchBytes, err = addResourceVersion(patchBytes, oldPVC.ResourceVersion)
		if err != nil {
			return nil, fmt.Errorf("failed to add resource version: %w", err)
		}
	}

	return patchBytes, nil
}

func addResourceVersion(patchBytes []byte, resourceVersion string) ([]byte, error) {
	var patchMap map[string]interface{}
	err := json.Unmarshal(patchBytes, &patchMap)
	if err != nil {
		return nil, fmt.Errorf("error unmarshalling patch: %w", err)
	}
	u := unstructured.Unstructured{Object: patchMap}
	u.SetResourceVersion(resourceVersion)
	versionBytes, err := json.Marshal(patchMap)
	if err != nil {
		return nil, fmt.Errorf("error marshalling json patch: %w", err)
	}
	return versionBytes, nil
}

func patchPV(
	ctx context.Context,
	kubeClient kubernetes.Interface,
	oldPV *v1.PersistentVolume,
	newPV *v1.PersistentVolume) (*v1.PersistentVolume, error) {
	oldData, err := json.Marshal(oldPV)
	if err != nil {
		return oldPV, fmt.Errorf("failed to marshal old data: %w", err)
	}

	newData, err := json.Marshal(newPV)
	if err != nil {
		return oldPV, fmt.Errorf("failed to marshal new data: %w", err)
	}

	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, oldPV)
	if err != nil {
		return oldPV, fmt.Errorf("failed to create 2 way merge patch: %w", err)
	}

	updatedPV, err := kubeClient.CoreV1().PersistentVolumes().
		Patch(ctx, oldPV.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
	if err != nil {
		return oldPV, fmt.Errorf("failed to patch PV object: %w", err)
	}
	return updatedPV, nil
}

var knownResizeConditions = map[v1.PersistentVolumeClaimConditionType]bool{
	v1.PersistentVolumeClaimFileSystemResizePending: true,
	v1.PersistentVolumeClaimResizing:                true,
	v1.PersistentVolumeClaimControllerResizeError:   true,
	v1.PersistentVolumeClaimNodeResizeError:         true,
}

// mergeResizeConditionOnPVC updates pvc with requested resize conditions
// leaving other conditions untouched.
func mergeResizeConditionOnPVC(
	pvc *v1.PersistentVolumeClaim,
	resizeConditions []v1.PersistentVolumeClaimCondition,
	keepOldResizeConditions bool) {
	resizeConditionMap := map[v1.PersistentVolumeClaimConditionType]v1.PersistentVolumeClaimCondition{}
	for _, condition := range resizeConditions {
		resizeConditionMap[condition.Type] = condition
	}

	var newConditions []v1.PersistentVolumeClaimCondition
	for _, condition := range pvc.Status.Conditions {
		// If the condition is not a resize-related type, we keep it.
		if _, ok := knownResizeConditions[condition.Type]; !ok {
			newConditions = append(newConditions, condition)
			continue
		}

		if newCondition, ok := resizeConditionMap[condition.Type]; ok {
			newConditions = append(newConditions, newCondition)
			delete(resizeConditionMap, condition.Type)
		} else if keepOldResizeConditions {
			newConditions = append(newConditions, condition)
		}
	}

	for _, newCondition := range resizeConditionMap {
		newConditions = append(newConditions, newCondition)
	}
	pvc.Status.Conditions = newConditions
}

// mapSet is like `(*m)[key] = value` but also initializes *m if it is nil.
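// For example, mapSet(&newPVC.Status.AllocatedResources, v1.ResourceStorage, newSize)
// works even when AllocatedResources is still nil, where a direct assignment
// into the nil map would panic.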
func mapSet[Map ~map[K]V, K comparable, V any](m *Map, key K, value V) {
	if *m == nil {
		*m = make(map[K]V)
	}
	(*m)[key] = value
}