m/n/kubernetes: switch to typed workqueue

The functions and types without "Typed" are deprecated, and should be
replaced by the corresponding ones with "Typed".

Change-Id: I41c378df953ae4964d1247e470ccf38f13ea1f47
Reviewed-on: https://review.monogon.dev/c/monogon/+/3784
Tested-by: Jenkins CI
Reviewed-by: Tim Windelschmidt <tim@monogon.tech>
diff --git a/metropolis/node/kubernetes/provisioner.go b/metropolis/node/kubernetes/provisioner.go
index aacb949..f92acf2 100644
--- a/metropolis/node/kubernetes/provisioner.go
+++ b/metropolis/node/kubernetes/provisioner.go
@@ -66,8 +66,8 @@
 	InformerFactory  informers.SharedInformerFactory
 	VolumesDirectory *localstorage.DataVolumesDirectory
 
-	claimQueue           workqueue.RateLimitingInterface
-	pvQueue              workqueue.RateLimitingInterface
+	claimQueue           workqueue.TypedRateLimitingInterface[string]
+	pvQueue              workqueue.TypedRateLimitingInterface[string]
 	recorder             record.EventRecorder
 	pvcInformer          coreinformers.PersistentVolumeClaimInformer
 	pvInformer           coreinformers.PersistentVolumeInformer
@@ -93,8 +93,8 @@
 	p.pvcInformer = p.InformerFactory.Core().V1().PersistentVolumeClaims()
 	p.storageClassInformer = p.InformerFactory.Storage().V1().StorageClasses()
 
-	p.claimQueue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
-	p.pvQueue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
+	p.claimQueue = workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]())
+	p.pvQueue = workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]())
 
 	p.logger = supervisor.Logger(ctx)
 
@@ -186,24 +186,18 @@
 
 // processQueueItems gets items from the given work queue and calls the process
 // function for each of them. It self- terminates once the queue is shut down.
-func (p *csiProvisionerServer) processQueueItems(queue workqueue.RateLimitingInterface, process func(key string) error) {
+func (p *csiProvisionerServer) processQueueItems(queue workqueue.TypedRateLimitingInterface[string], process func(key string) error) {
 	for {
 		obj, shutdown := queue.Get()
 		if shutdown {
 			return
 		}
 
-		func(obj interface{}) {
+		func(obj string) {
 			defer queue.Done(obj)
-			key, ok := obj.(string)
-			if !ok {
-				queue.Forget(obj)
-				p.logger.Errorf("Expected string in workqueue, got %+v", obj)
-				return
-			}
 
-			if err := process(key); err != nil {
-				p.logger.Warningf("Failed processing item %q, requeueing (numrequeues: %d): %v", key, queue.NumRequeues(obj), err)
+			if err := process(obj); err != nil {
+				p.logger.Warningf("Failed processing item %q, requeueing (numrequeues: %d): %v", obj, queue.NumRequeues(obj), err)
 				queue.AddRateLimited(obj)
 			}