m/n/kubernetes: switch to typed workqueue
The functions and types without "Typed" are deprecated, and should be
replaced by the corresponding ones with "Typed".
Change-Id: I41c378df953ae4964d1247e470ccf38f13ea1f47
Reviewed-on: https://review.monogon.dev/c/monogon/+/3784
Tested-by: Jenkins CI
Reviewed-by: Tim Windelschmidt <tim@monogon.tech>
diff --git a/metropolis/node/kubernetes/provisioner.go b/metropolis/node/kubernetes/provisioner.go
index aacb949..f92acf2 100644
--- a/metropolis/node/kubernetes/provisioner.go
+++ b/metropolis/node/kubernetes/provisioner.go
@@ -66,8 +66,8 @@
InformerFactory informers.SharedInformerFactory
VolumesDirectory *localstorage.DataVolumesDirectory
- claimQueue workqueue.RateLimitingInterface
- pvQueue workqueue.RateLimitingInterface
+ claimQueue workqueue.TypedRateLimitingInterface[string]
+ pvQueue workqueue.TypedRateLimitingInterface[string]
recorder record.EventRecorder
pvcInformer coreinformers.PersistentVolumeClaimInformer
pvInformer coreinformers.PersistentVolumeInformer
@@ -93,8 +93,8 @@
p.pvcInformer = p.InformerFactory.Core().V1().PersistentVolumeClaims()
p.storageClassInformer = p.InformerFactory.Storage().V1().StorageClasses()
- p.claimQueue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
- p.pvQueue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
+ p.claimQueue = workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]())
+ p.pvQueue = workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]())
p.logger = supervisor.Logger(ctx)
@@ -186,24 +186,18 @@
// processQueueItems gets items from the given work queue and calls the process
// function for each of them. It self- terminates once the queue is shut down.
-func (p *csiProvisionerServer) processQueueItems(queue workqueue.RateLimitingInterface, process func(key string) error) {
+func (p *csiProvisionerServer) processQueueItems(queue workqueue.TypedRateLimitingInterface[string], process func(key string) error) {
for {
obj, shutdown := queue.Get()
if shutdown {
return
}
- func(obj interface{}) {
+ func(obj string) {
defer queue.Done(obj)
- key, ok := obj.(string)
- if !ok {
- queue.Forget(obj)
- p.logger.Errorf("Expected string in workqueue, got %+v", obj)
- return
- }
- if err := process(key); err != nil {
- p.logger.Warningf("Failed processing item %q, requeueing (numrequeues: %d): %v", key, queue.NumRequeues(obj), err)
+ if err := process(obj); err != nil {
+ p.logger.Warningf("Failed processing item %q, requeueing (numrequeues: %d): %v", obj, queue.NumRequeues(obj), err)
queue.AddRateLimited(obj)
}