// Copyright 2020 The Monogon Project Authors.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kubernetes

import (
	"context"
	"errors"
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"

	"git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"

	"go.uber.org/zap"

	"git.monogon.dev/source/nexantic.git/core/internal/storage"
	"git.monogon.dev/source/nexantic.git/core/pkg/fsquota"

	v1 "k8s.io/api/core/v1"
	storagev1 "k8s.io/api/storage/v1"
	apierrs "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	coreinformers "k8s.io/client-go/informers/core/v1"
	storageinformers "k8s.io/client-go/informers/storage/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	ref "k8s.io/client-go/tools/reference"
	"k8s.io/client-go/util/workqueue"
)

// ONCHANGE(//core/internal/kubernetes/reconciler:resources_csi.go): needs to match csiProvisionerName declared.
const csiProvisionerName = "com.nexantic.smalltown.vfs"

// csiProvisioner is responsible for the provisioning and deprovisioning of CSI-based container volumes. It runs on all
// nodes and watches PVCs for ones assigned to the node it's running on and fulfills the provisioning request by
// creating a directory, applying a quota and creating the corresponding PV. When the PV is released and its retention
// policy is Delete, the directory and the PV resource are deleted.
type csiProvisioner struct {
	nodeName             string
	kubeclientset        kubernetes.Interface
	claimQueue           workqueue.RateLimitingInterface
	pvQueue              workqueue.RateLimitingInterface
	recorder             record.EventRecorder
	pvcInformer          coreinformers.PersistentVolumeClaimInformer
	pvInformer           coreinformers.PersistentVolumeInformer
	storageClassInformer storageinformers.StorageClassInformer
	storageManager       *storage.Manager
	logger               *zap.Logger
}

// runCSIProvisioner runs the main provisioning machinery. It consists of a set of informers that watch the Kubernetes
// control plane and notify us about changes. Whenever a PVC or PV changes, the identifier of that resource is enqueued
// in a work queue. Each queue is processed by a single worker to limit load and avoid complicated locking
// infrastructure. Failed items are requeued.
func runCSIProvisioner(storMan *storage.Manager, kubeclientset kubernetes.Interface, informerFactory informers.SharedInformerFactory) supervisor.Runnable {
	return func(ctx context.Context) error {
		nodeName, err := os.Hostname()
		if err != nil {
			panic(err)
		}

		// The recorder is used to log Kubernetes events for successful or failed volume provisions. These events then
		// show up in `kubectl describe pvc` and can be used by admins to debug issues with this provisioner.
		eventBroadcaster := record.NewBroadcaster()
		eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
		recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: csiProvisionerName, Host: nodeName})

		p := &csiProvisioner{
			nodeName:             nodeName,
			recorder:             recorder,
			kubeclientset:        kubeclientset,
			pvInformer:           informerFactory.Core().V1().PersistentVolumes(),
			pvcInformer:          informerFactory.Core().V1().PersistentVolumeClaims(),
			storageClassInformer: informerFactory.Storage().V1().StorageClasses(),
			claimQueue:           workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
			pvQueue:              workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
			storageManager:       storMan,
			logger:               supervisor.Logger(ctx),
		}

		p.pvcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
			AddFunc: p.enqueueClaim,
			UpdateFunc: func(old, new interface{}) {
				p.enqueueClaim(new)
			},
		})
		p.pvInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
			AddFunc: p.enqueuePV,
			UpdateFunc: func(old, new interface{}) {
				p.enqueuePV(new)
			},
		})

		go p.pvcInformer.Informer().Run(ctx.Done())
		go p.pvInformer.Informer().Run(ctx.Done())
		go p.storageClassInformer.Informer().Run(ctx.Done())

		// These will self-terminate once the queues are shut down
		go p.processQueueItems(p.claimQueue, func(key string) error {
			return p.processPVC(key)
		})
		go p.processQueueItems(p.pvQueue, func(key string) error {
			return p.processPV(key)
		})

		supervisor.Signal(ctx, supervisor.SignalHealthy)
		<-ctx.Done()
		p.claimQueue.ShutDown()
		p.pvQueue.ShutDown()
		return nil
	}
}
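
// How this runnable gets started lives outside this file; the following is a minimal wiring sketch only, assuming the
// supervisor package exposes a Run(ctx, name, runnable) helper (an assumption for illustration, not confirmed here):
//
//	supervisor.Run(ctx, "csi-provisioner",
//		runCSIProvisioner(storageManager, clientset, informerFactory))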

// isOurPVC checks if the given PVC is to be provisioned by this provisioner and has been scheduled onto this node
func (p *csiProvisioner) isOurPVC(pvc *v1.PersistentVolumeClaim) bool {
	return pvc.ObjectMeta.Annotations["volume.beta.kubernetes.io/storage-provisioner"] == csiProvisionerName &&
		(pvc.ObjectMeta.Annotations["volume.kubernetes.io/selected-node"] == p.nodeName)
}
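
// The "volume.beta.kubernetes.io/storage-provisioner" annotation is set by kube-controller-manager's PV controller to
// the provisioner named in the PVC's StorageClass, and "volume.kubernetes.io/selected-node" is set by the scheduler
// when the StorageClass uses WaitForFirstConsumer binding, so matching both selects exactly the pending PVCs that this
// node is expected to provision.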

// isOurPV checks if the given PV has been provisioned by this provisioner and has been scheduled onto this node
func (p *csiProvisioner) isOurPV(pv *v1.PersistentVolume) bool {
	return pv.ObjectMeta.Annotations["pv.kubernetes.io/provisioned-by"] == csiProvisionerName &&
		pv.Spec.NodeAffinity.Required.NodeSelectorTerms[0].MatchExpressions[0].Values[0] == p.nodeName
}
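
// Indexing NodeSelectorTerms[0].MatchExpressions[0] above is safe for volumes we created: provisionPVC below always
// sets exactly one term with a single kubernetes.io/hostname expression, and the provisioned-by check short-circuits
// the expression before the index is evaluated for PVs provisioned by anything else.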

// enqueueClaim adds an added/changed PVC to the work queue
func (p *csiProvisioner) enqueueClaim(obj interface{}) {
	key, err := cache.MetaNamespaceKeyFunc(obj)
	if err != nil {
		p.logger.Error("Not queuing PVC because key could not be derived", zap.Error(err))
		return
	}
	p.claimQueue.Add(key)
}

// enqueuePV adds an added/changed PV to the work queue
func (p *csiProvisioner) enqueuePV(obj interface{}) {
	key, err := cache.MetaNamespaceKeyFunc(obj)
	if err != nil {
		p.logger.Error("Not queuing PV because key could not be derived", zap.Error(err))
		return
	}
	p.pvQueue.Add(key)
}
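
// cache.MetaNamespaceKeyFunc produces keys of the form "<namespace>/<name>" for namespaced objects (e.g.
// "default/my-claim" for a PVC) and just "<name>" for cluster-scoped objects such as PVs; processPVC and processPV
// split these back apart with cache.SplitMetaNamespaceKey.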

// processQueueItems gets items from the given work queue and calls the process function for each of them. It self-
// terminates once the queue is shut down.
func (p *csiProvisioner) processQueueItems(queue workqueue.RateLimitingInterface, process func(key string) error) {
	for {
		obj, shutdown := queue.Get()
		if shutdown {
			return
		}

		func(obj interface{}) {
			defer queue.Done(obj)
			key, ok := obj.(string)
			if !ok {
				queue.Forget(obj)
				p.logger.Error("Expected string in workqueue", zap.Any("actual", obj))
				return
			}

			if err := process(key); err != nil {
				p.logger.Warn("Failed processing item, requeueing", zap.String("name", key),
					zap.Int("num_requeues", queue.NumRequeues(obj)), zap.Error(err))
				queue.AddRateLimited(obj)
				// Return without calling Forget so the rate limiter keeps counting failures and backs off on
				// subsequent requeues.
				return
			}

			queue.Forget(obj)
		}(obj)
	}
}

// getVolumePath returns the path at which the given volume is stored, or an error if the underlying data storage is
// not available
func (p *csiProvisioner) getVolumePath(volumeID string) (string, error) {
	return p.storageManager.GetPathInPlace(storage.PlaceData, filepath.Join(volumeDir, volumeID))
}
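
// The returned path points at a per-volume subdirectory of volumeDir inside the data place managed by storage.Manager,
// so it looks roughly like <data mount>/<volumeDir>/pvc-<PVC UID>; the exact mount prefix is an implementation detail
// of the storage manager and is only assumed here for illustration.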

// ensureVolumePath ensures that the top-level volume directory is created. It fails if the underlying data storage is
// not available.
func (p *csiProvisioner) ensureVolumePath() error {
	path, err := p.storageManager.GetPathInPlace(storage.PlaceData, volumeDir)
	if err != nil {
		return err
	}
	return os.MkdirAll(path, 0640)
}

// processPVC looks at a single PVC item from the queue, determines if it needs to be provisioned and logs the
// provisioning result to the recorder
func (p *csiProvisioner) processPVC(key string) error {
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return fmt.Errorf("invalid resource key: %s", key)
	}
	pvc, err := p.pvcInformer.Lister().PersistentVolumeClaims(namespace).Get(name)
	if apierrs.IsNotFound(err) {
		return nil // nothing to do, no error
	} else if err != nil {
		return fmt.Errorf("failed to get PVC for processing: %w", err)
	}

	if !p.isOurPVC(pvc) {
		return nil
	}

	if pvc.Status.Phase != v1.ClaimPending {
		// If the PVC is not pending, we don't need to provision anything
		return nil
	}

	storageClass, err := p.storageClassInformer.Lister().Get(*pvc.Spec.StorageClassName)
	if err != nil {
		return fmt.Errorf("failed to get StorageClass %q: %w", *pvc.Spec.StorageClassName, err)
	}

	if storageClass.Provisioner != csiProvisionerName {
		// We're not responsible for this PVC. Can only happen if controller-manager makes a mistake
		// setting the annotations, but we're bailing here anyway for safety.
		return nil
	}

	err = p.provisionPVC(pvc, storageClass)

	if err != nil {
		p.recorder.Eventf(pvc, v1.EventTypeWarning, "ProvisioningFailed", "Failed to provision PV: %v", err)
		return err
	}
	p.recorder.Eventf(pvc, v1.EventTypeNormal, "Provisioned", "Successfully provisioned PV")

	return nil
}

// provisionPVC creates the directory where the volume lives, sets a quota for the requested amount of storage and
// creates the PV object representing this new volume
func (p *csiProvisioner) provisionPVC(pvc *v1.PersistentVolumeClaim, storageClass *storagev1.StorageClass) error {
	claimRef, err := ref.GetReference(scheme.Scheme, pvc)
	if err != nil {
		return fmt.Errorf("failed to get reference to PVC: %w", err)
	}

	storageReq := pvc.Spec.Resources.Requests[v1.ResourceStorage]
	if storageReq.IsZero() {
		return fmt.Errorf("PVC is not requesting any storage, this is not supported")
	}
	capacity, ok := storageReq.AsInt64()
	if !ok {
		return fmt.Errorf("PVC requesting more than 2^63 bytes of storage, this is not supported")
	}

	if *pvc.Spec.VolumeMode == v1.PersistentVolumeBlock {
		return fmt.Errorf("Block PVCs are not supported by Smalltown")
	}

	volumeID := "pvc-" + string(pvc.ObjectMeta.UID)
	volumePath, err := p.getVolumePath(volumeID)
	if err != nil {
		return fmt.Errorf("unable to access volumes: %w", err)
	}

	if err := p.ensureVolumePath(); err != nil {
		return fmt.Errorf("failed to create volume location: %w", err)
	}

	p.logger.Info("Creating local PV", zap.String("volume-id", volumeID))
	if err := os.Mkdir(volumePath, 0644); err != nil && !os.IsExist(err) {
		return fmt.Errorf("failed to create volume directory: %w", err)
	}
	files, err := ioutil.ReadDir(volumePath)
	if err != nil {
		return fmt.Errorf("failed to list files in newly-created volume: %w", err)
	}
	if len(files) > 0 {
		return errors.New("newly-created volume already contains data, bailing")
	}
	if err := fsquota.SetQuota(volumePath, uint64(capacity), 100000); err != nil {
		return fmt.Errorf("failed to update quota: %w", err)
	}

	vol := &v1.PersistentVolume{
		ObjectMeta: metav1.ObjectMeta{
			Name: volumeID,
			Annotations: map[string]string{
				"pv.kubernetes.io/provisioned-by": csiProvisionerName},
		},
		Spec: v1.PersistentVolumeSpec{
			AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
			Capacity: v1.ResourceList{
				v1.ResourceStorage: storageReq, // We're always giving the exact amount
			},
			PersistentVolumeSource: v1.PersistentVolumeSource{
				CSI: &v1.CSIPersistentVolumeSource{
					Driver:       csiProvisionerName,
					VolumeHandle: volumeID,
				},
			},
			ClaimRef: claimRef,
			NodeAffinity: &v1.VolumeNodeAffinity{
				Required: &v1.NodeSelector{
					NodeSelectorTerms: []v1.NodeSelectorTerm{
						{
							MatchExpressions: []v1.NodeSelectorRequirement{
								{
									Key:      "kubernetes.io/hostname",
									Operator: v1.NodeSelectorOpIn,
									Values:   []string{p.nodeName},
								},
							},
						},
					},
				},
			},
			StorageClassName:              *pvc.Spec.StorageClassName,
			PersistentVolumeReclaimPolicy: *storageClass.ReclaimPolicy,
		},
	}

	if _, err = p.kubeclientset.CoreV1().PersistentVolumes().Create(context.Background(), vol, metav1.CreateOptions{}); err != nil && !apierrs.IsAlreadyExists(err) {
		return fmt.Errorf("failed to create PV object: %w", err)
	}
	return nil
}
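
// For reference, the PV constructed above serializes to roughly the following object; the values are placeholders
// derived directly from the construction in provisionPVC, not literal output:
//
//	apiVersion: v1
//	kind: PersistentVolume
//	metadata:
//	  name: pvc-<UID of the PVC>
//	  annotations:
//	    pv.kubernetes.io/provisioned-by: com.nexantic.smalltown.vfs
//	spec:
//	  accessModes: [ReadWriteOnce]
//	  capacity:
//	    storage: <requested amount>
//	  csi:
//	    driver: com.nexantic.smalltown.vfs
//	    volumeHandle: pvc-<UID of the PVC>
//	  claimRef: <reference to the PVC>
//	  nodeAffinity:
//	    required:
//	      nodeSelectorTerms:
//	      - matchExpressions:
//	        - {key: kubernetes.io/hostname, operator: In, values: [<node name>]}
//	  storageClassName: <PVC's storage class>
//	  persistentVolumeReclaimPolicy: <storage class's reclaim policy>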

// processPV looks at a single PV item from the queue and checks if it has been released and needs to be deleted. If
// so, it deletes the associated quota, directory and the PV object and logs the result to the recorder.
func (p *csiProvisioner) processPV(key string) error {
	_, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return fmt.Errorf("invalid resource key: %s", key)
	}
	pv, err := p.pvInformer.Lister().Get(name)
	if apierrs.IsNotFound(err) {
		return nil // nothing to do, no error
	} else if err != nil {
		return fmt.Errorf("failed to get PV for processing: %w", err)
	}

	if !p.isOurPV(pv) {
		return nil
	}
	if pv.Spec.PersistentVolumeReclaimPolicy != v1.PersistentVolumeReclaimDelete || pv.Status.Phase != v1.VolumeReleased {
		return nil
	}
	volumePath, err := p.getVolumePath(pv.Spec.CSI.VolumeHandle)
	if err != nil {
		return fmt.Errorf("failed to determine volume path: %w", err)
	}

	// Log deletes for auditing purposes
	p.logger.Info("Deleting persistent volume", zap.String("name", pv.Spec.CSI.VolumeHandle))
	if err := fsquota.SetQuota(volumePath, 0, 0); err != nil {
		// We record these here manually since a successful deletion removes the PV we'd be attaching them to
		p.recorder.Eventf(pv, v1.EventTypeWarning, "DeprovisioningFailed", "Failed to remove quota: %v", err)
		return fmt.Errorf("failed to remove quota: %w", err)
	}
	err = os.RemoveAll(volumePath)
	if os.IsNotExist(err) {
		return nil
	} else if err != nil {
		p.recorder.Eventf(pv, v1.EventTypeWarning, "DeprovisioningFailed", "Failed to delete volume: %v", err)
		return fmt.Errorf("failed to delete volume: %w", err)
	}

	err = p.kubeclientset.CoreV1().PersistentVolumes().Delete(context.Background(), pv.Name, metav1.DeleteOptions{})
	if err != nil && !apierrs.IsNotFound(err) {
		p.recorder.Eventf(pv, v1.EventTypeWarning, "DeprovisioningFailed", "Failed to delete PV object from K8s API: %v", err)
		return fmt.Errorf("failed to delete PV object: %w", err)
	}
	return nil
}