// Copyright 2020 The Monogon Project Authors.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kubernetes

import (
	"context"
	"errors"
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"

	"git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"

	"go.uber.org/zap"

	"git.monogon.dev/source/nexantic.git/core/internal/storage"
	"git.monogon.dev/source/nexantic.git/core/pkg/fsquota"

	v1 "k8s.io/api/core/v1"
	storagev1 "k8s.io/api/storage/v1"
	apierrs "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	coreinformers "k8s.io/client-go/informers/core/v1"
	storageinformers "k8s.io/client-go/informers/storage/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	ref "k8s.io/client-go/tools/reference"
	"k8s.io/client-go/util/workqueue"
)

const csiProvisionerName = "com.nexantic.smalltown.vfs"

// csiProvisioner is responsible for provisioning and deprovisioning CSI-based container volumes. It runs on every
// node, watches PVCs assigned to the node it's running on and fulfills provisioning requests by creating a
// directory, applying a quota and creating the corresponding PV. When a PV is released and its reclaim policy is
// Delete, the directory and the PV resource are deleted.
type csiProvisioner struct {
	nodeName             string
	kubeclientset        kubernetes.Interface
	claimQueue           workqueue.RateLimitingInterface
	pvQueue              workqueue.RateLimitingInterface
	recorder             record.EventRecorder
	pvcInformer          coreinformers.PersistentVolumeClaimInformer
	pvInformer           coreinformers.PersistentVolumeInformer
	storageClassInformer storageinformers.StorageClassInformer
	storageManager       *storage.Manager
	logger               *zap.Logger
}

// runCSIProvisioner runs the main provisioning machinery. It consists of a set of informers which watch the
// relevant resources on the Kubernetes control plane and notify us of changes. Whenever a PVC or PV changes, the
// identifier of that resource is enqueued in a work queue. Each queue is worked on by a single worker to limit
// load and avoid complicated locking infrastructure. Failed items are requeued.
func runCSIProvisioner(storMan *storage.Manager, kubeclientset kubernetes.Interface, informerFactory informers.SharedInformerFactory) supervisor.Runnable {
	return func(ctx context.Context) error {
		nodeName, err := os.Hostname()
		if err != nil {
			panic(err)
		}

		// The recorder is used to log Kubernetes events for successful or failed volume provisions. These events then
		// show up in `kubectl describe pvc` and can be used by admins to debug issues with this provisioner.
		eventBroadcaster := record.NewBroadcaster()
		eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
		recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: csiProvisionerName, Host: nodeName})

		p := &csiProvisioner{
			nodeName:             nodeName,
			recorder:             recorder,
			kubeclientset:        kubeclientset,
			pvInformer:           informerFactory.Core().V1().PersistentVolumes(),
			pvcInformer:          informerFactory.Core().V1().PersistentVolumeClaims(),
			storageClassInformer: informerFactory.Storage().V1().StorageClasses(),
			claimQueue:           workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
			pvQueue:              workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
			storageManager:       storMan,
			logger:               supervisor.Logger(ctx),
		}

		p.pvcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
			AddFunc: p.enqueueClaim,
			UpdateFunc: func(old, new interface{}) {
				p.enqueueClaim(new)
			},
		})
		p.pvInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
			AddFunc: p.enqueuePV,
			UpdateFunc: func(old, new interface{}) {
				p.enqueuePV(new)
			},
		})

		go p.pvcInformer.Informer().Run(ctx.Done())
		go p.pvInformer.Informer().Run(ctx.Done())
		go p.storageClassInformer.Informer().Run(ctx.Done())

		// These will self-terminate once the queues are shut down
		go p.processQueueItems(p.claimQueue, func(key string) error {
			return p.processPVC(key)
		})
		go p.processQueueItems(p.pvQueue, func(key string) error {
			return p.processPV(key)
		})

		supervisor.Signal(ctx, supervisor.SignalHealthy)
		<-ctx.Done()
		p.claimQueue.ShutDown()
		p.pvQueue.ShutDown()
		return nil
	}
}

// isOurPVC checks if the given PVC is to be provisioned by this provisioner and has been scheduled onto this node
func (p *csiProvisioner) isOurPVC(pvc *v1.PersistentVolumeClaim) bool {
	return pvc.ObjectMeta.Annotations["volume.beta.kubernetes.io/storage-provisioner"] == csiProvisionerName &&
		(pvc.ObjectMeta.Annotations["volume.kubernetes.io/selected-node"] == p.nodeName)
}
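
// For reference, a PVC that this provisioner picks up carries annotations of roughly this shape (values
// illustrative; they are set by the Kubernetes control plane, not by this file):
//
//	volume.beta.kubernetes.io/storage-provisioner: com.nexantic.smalltown.vfs
//	volume.kubernetes.io/selected-node: <hostname of this node>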

// isOurPV checks if the given PV has been provisioned by this provisioner and has been scheduled onto this node
func (p *csiProvisioner) isOurPV(pv *v1.PersistentVolume) bool {
	return pv.ObjectMeta.Annotations["pv.kubernetes.io/provisioned-by"] == csiProvisionerName &&
		pv.Spec.NodeAffinity.Required.NodeSelectorTerms[0].MatchExpressions[0].Values[0] == p.nodeName
}
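
// isOurPV indexes into Spec.NodeAffinity without nil checks. That is safe for volumes created by provisionPVC
// below, which always populates exactly one NodeSelectorTerm with one MatchExpression; a foreign PV carrying our
// provisioned-by annotation without that shape would panic here.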

// enqueueClaim adds an added/changed PVC to the work queue
func (p *csiProvisioner) enqueueClaim(obj interface{}) {
	key, err := cache.MetaNamespaceKeyFunc(obj)
	if err != nil {
		p.logger.Error("Not queuing PVC because key could not be derived", zap.Error(err))
		return
	}
	p.claimQueue.Add(key)
}

// enqueuePV adds an added/changed PV to the work queue
func (p *csiProvisioner) enqueuePV(obj interface{}) {
	key, err := cache.MetaNamespaceKeyFunc(obj)
	if err != nil {
		p.logger.Error("Not queuing PV because key could not be derived", zap.Error(err))
		return
	}
	p.pvQueue.Add(key)
}
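
// Both queues carry object keys as produced by cache.MetaNamespaceKeyFunc, i.e. "<namespace>/<name>" for
// namespaced objects such as PVCs and just "<name>" for cluster-scoped ones such as PVs. processPVC and processPV
// split them back apart with cache.SplitMetaNamespaceKey.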

// processQueueItems gets items from the given work queue and calls the process function for each of them. It self-
// terminates once the queue is shut down.
func (p *csiProvisioner) processQueueItems(queue workqueue.RateLimitingInterface, process func(key string) error) {
	for {
		obj, shutdown := queue.Get()
		if shutdown {
			return
		}

		func(obj interface{}) {
			defer queue.Done(obj)
			key, ok := obj.(string)
			if !ok {
				queue.Forget(obj)
				p.logger.Error("Expected string in workqueue", zap.Any("actual", obj))
				return
			}

			if err := process(key); err != nil {
				p.logger.Warn("Failed processing item, requeueing", zap.String("name", key),
					zap.Int("num_requeues", queue.NumRequeues(obj)), zap.Error(err))
				queue.AddRateLimited(obj)
				// Don't Forget the item here: that would reset its rate-limiting history and defeat the backoff.
				return
			}

			// Only clear the rate-limiting history once the item has been processed successfully.
			queue.Forget(obj)
		}(obj)
	}
}
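
// With workqueue.DefaultControllerRateLimiter (used when the queues are constructed above), failed items are
// retried with a per-item exponential backoff (roughly 5ms doubling up to a 1000s cap in current client-go
// versions), combined with an overall token-bucket limit, so a persistently failing PVC or PV cannot hot-loop a
// worker.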

// getVolumePath gets the path where the volume is stored or an error if the storage manager doesn't
// have the volume available
func (p *csiProvisioner) getVolumePath(volumeID string) (string, error) {
	return p.storageManager.GetPathInPlace(storage.PlaceData, filepath.Join(volumeDir, volumeID))
}

// ensureVolumePath ensures that the top-level volume directory is created. It fails if the storage manager doesn't
// have the volume available.
func (p *csiProvisioner) ensureVolumePath() error {
	path, err := p.storageManager.GetPathInPlace(storage.PlaceData, volumeDir)
	if err != nil {
		return err
	}
	return os.MkdirAll(path, 0640)
}
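
// Taken together, volumes managed by this provisioner live at
//
//	<data place root>/<volumeDir>/pvc-<PVC UID>
//
// where volumeDir is the package-level directory name used above and the data place root is resolved by the
// storage manager.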

// processPVC looks at a single PVC item from the queue, determines if it needs to be provisioned and logs the
// provisioning result to the recorder
func (p *csiProvisioner) processPVC(key string) error {
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return fmt.Errorf("invalid resource key: %s", key)
	}
	pvc, err := p.pvcInformer.Lister().PersistentVolumeClaims(namespace).Get(name)
	if apierrs.IsNotFound(err) {
		return nil // nothing to do, no error
	} else if err != nil {
		return fmt.Errorf("failed to get PVC for processing: %w", err)
	}

	if !p.isOurPVC(pvc) {
		return nil
	}

	if pvc.Status.Phase != v1.ClaimPending {
		// If the PVC is not pending, we don't need to provision anything
		return nil
	}

	storageClass, err := p.storageClassInformer.Lister().Get(*pvc.Spec.StorageClassName)
	if err != nil {
		return fmt.Errorf("failed to get storage class %q: %w", *pvc.Spec.StorageClassName, err)
	}

	if storageClass.Provisioner != csiProvisionerName {
		// We're not responsible for this PVC. Can only happen if controller-manager makes a mistake
		// setting the annotations, but we're bailing here anyway for safety.
		return nil
	}

	if err := p.provisionPVC(pvc, storageClass); err != nil {
		p.recorder.Eventf(pvc, v1.EventTypeWarning, "ProvisioningFailed", "Failed to provision PV: %v", err)
		return err
	}
	p.recorder.Eventf(pvc, v1.EventTypeNormal, "Provisioned", "Successfully provisioned PV")

	return nil
}
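
// A StorageClass served by this provisioner would look roughly like the following (illustrative only; no such
// manifest is defined in this file):
//
//	apiVersion: storage.k8s.io/v1
//	kind: StorageClass
//	metadata:
//	  name: local-data
//	provisioner: com.nexantic.smalltown.vfs
//	reclaimPolicy: Delete
//	volumeBindingMode: WaitForFirstConsumer
//
// WaitForFirstConsumer is what makes the scheduler set the volume.kubernetes.io/selected-node annotation that
// isOurPVC keys on.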

// provisionPVC creates the directory where the volume lives, sets a quota for the requested amount of storage and
// creates the PV object representing this new volume
func (p *csiProvisioner) provisionPVC(pvc *v1.PersistentVolumeClaim, storageClass *storagev1.StorageClass) error {
	claimRef, err := ref.GetReference(scheme.Scheme, pvc)
	if err != nil {
		return fmt.Errorf("failed to get reference to PVC: %w", err)
	}

	storageReq := pvc.Spec.Resources.Requests[v1.ResourceStorage]
	if storageReq.IsZero() {
		return fmt.Errorf("PVC is not requesting any storage, this is not supported")
	}
	capacity, ok := storageReq.AsInt64()
	if !ok {
		return fmt.Errorf("PVC requesting more than 2^63 bytes of storage, this is not supported")
	}

	if *pvc.Spec.VolumeMode == v1.PersistentVolumeBlock {
		return fmt.Errorf("block PVCs are not supported by Smalltown")
	}

	volumeID := "pvc-" + string(pvc.ObjectMeta.UID)
	volumePath, err := p.getVolumePath(volumeID)
	if err != nil {
		return fmt.Errorf("unable to access volumes: %w", err)
	}

	if err := p.ensureVolumePath(); err != nil {
		return fmt.Errorf("failed to create volume location: %w", err)
	}

	p.logger.Info("Creating local PV", zap.String("volume-id", volumeID))
	if err := os.Mkdir(volumePath, 0644); err != nil && !os.IsExist(err) {
		return fmt.Errorf("failed to create volume directory: %w", err)
	}
	files, err := ioutil.ReadDir(volumePath)
	if err != nil {
		return fmt.Errorf("failed to list files in newly-created volume: %w", err)
	}
	if len(files) > 0 {
		return errors.New("newly-created volume already contains data, bailing")
	}
	if err := fsquota.SetQuota(volumePath, uint64(capacity), 100000); err != nil {
		return fmt.Errorf("failed to update quota: %w", err)
	}

	vol := &v1.PersistentVolume{
		ObjectMeta: metav1.ObjectMeta{
			Name: volumeID,
			Annotations: map[string]string{
				"pv.kubernetes.io/provisioned-by": csiProvisionerName},
		},
		Spec: v1.PersistentVolumeSpec{
			AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
			Capacity: v1.ResourceList{
				v1.ResourceStorage: storageReq, // We're always giving the exact amount
			},
			PersistentVolumeSource: v1.PersistentVolumeSource{
				CSI: &v1.CSIPersistentVolumeSource{
					Driver:       csiProvisionerName,
					VolumeHandle: volumeID,
				},
			},
			ClaimRef: claimRef,
			NodeAffinity: &v1.VolumeNodeAffinity{
				Required: &v1.NodeSelector{
					NodeSelectorTerms: []v1.NodeSelectorTerm{
						{
							MatchExpressions: []v1.NodeSelectorRequirement{
								{
									Key:      "kubernetes.io/hostname",
									Operator: v1.NodeSelectorOpIn,
									Values:   []string{p.nodeName},
								},
							},
						},
					},
				},
			},
			StorageClassName:              *pvc.Spec.StorageClassName,
			PersistentVolumeReclaimPolicy: *storageClass.ReclaimPolicy,
		},
	}

	if _, err = p.kubeclientset.CoreV1().PersistentVolumes().Create(context.Background(), vol, metav1.CreateOptions{}); err != nil && !apierrs.IsAlreadyExists(err) {
		return fmt.Errorf("failed to create PV object: %w", err)
	}
	return nil
}
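
// Creating the PV tolerates AlreadyExists and the on-disk steps are idempotent (Mkdir ignores EEXIST, the quota is
// simply reapplied), so a claim that gets requeued after a partial failure can safely go through provisionPVC
// again.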

// processPV looks at a single PV item from the queue and checks if it has been released and needs to be deleted.
// If so, it deletes the associated quota, directory and the PV object and logs the result to the recorder.
func (p *csiProvisioner) processPV(key string) error {
	_, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return fmt.Errorf("invalid resource key: %s", key)
	}
	pv, err := p.pvInformer.Lister().Get(name)
	if apierrs.IsNotFound(err) {
		return nil // nothing to do, no error
	} else if err != nil {
		return fmt.Errorf("failed to get PV for processing: %w", err)
	}

	if !p.isOurPV(pv) {
		return nil
	}
	if pv.Spec.PersistentVolumeReclaimPolicy != v1.PersistentVolumeReclaimDelete || pv.Status.Phase != v1.VolumeReleased {
		return nil
	}
	volumePath, err := p.getVolumePath(pv.Spec.CSI.VolumeHandle)
	if err != nil {
		return fmt.Errorf("unable to access volumes: %w", err)
	}

	// Log deletes for auditing purposes
	p.logger.Info("Deleting persistent volume", zap.String("name", pv.Spec.CSI.VolumeHandle))
	if err := fsquota.SetQuota(volumePath, 0, 0); err != nil {
		// We record these here manually since a successful deletion removes the PV we'd be attaching them to
		p.recorder.Eventf(pv, v1.EventTypeWarning, "DeprovisioningFailed", "Failed to remove quota: %v", err)
		return fmt.Errorf("failed to remove quota: %w", err)
	}
	err = os.RemoveAll(volumePath)
	if os.IsNotExist(err) {
		return nil
	} else if err != nil {
		p.recorder.Eventf(pv, v1.EventTypeWarning, "DeprovisioningFailed", "Failed to delete volume: %v", err)
		return fmt.Errorf("failed to delete volume: %w", err)
	}

	err = p.kubeclientset.CoreV1().PersistentVolumes().Delete(context.Background(), pv.Name, metav1.DeleteOptions{})
	if err != nil && !apierrs.IsNotFound(err) {
		p.recorder.Eventf(pv, v1.EventTypeWarning, "DeprovisioningFailed", "Failed to delete PV object from K8s API: %v", err)
		return fmt.Errorf("failed to delete PV object: %w", err)
	}
	return nil
}