m/node/kubernetes: implement storage resizing
This implements persistent volume resizing in the storage provisioner.
The logic is based on https://github.com/kubernetes-csi/external-resizer
The mutation caches are an optimization that prevents unnecessary repeated
processing: they let the controller remember changes that it has made
itself while the watch events for those changes have not arrived yet.
The controller supports the RecoverVolumeExpansionFailure feature, which
allows reducing the requested size after a previous resize has failed due
to insufficient space. When a resize fails, it is retried with backoff.
Change-Id: I0f3d40c1a592b30d25739f5d20b529dfe25dfbe1
Reviewed-on: https://review.monogon.dev/c/monogon/+/4008
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/test/e2e/suites/kubernetes/run_test.go b/metropolis/test/e2e/suites/kubernetes/run_test.go
index ee39c1c..8333567 100644
--- a/metropolis/test/e2e/suites/kubernetes/run_test.go
+++ b/metropolis/test/e2e/suites/kubernetes/run_test.go
@@ -15,6 +15,7 @@
_ "net/http/pprof"
"net/url"
"os"
+ "slices"
"strings"
"testing"
"time"
@@ -25,6 +26,7 @@
nwkv1 "k8s.io/api/networking/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
podv1 "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/utils/ptr"
@@ -445,6 +447,7 @@
_, err := clientSet.AppsV1().StatefulSets("default").Create(ctx, makeTestStatefulSet(statefulSetName, runtimeClass), metav1.CreateOptions{})
return err
})
+ var podName string
util.TestEventual(t, fmt.Sprintf("StatefulSet with %s tests successful", runtimeClass), ctx, smallTestTimeout, func(ctx context.Context) error {
res, err := clientSet.CoreV1().Pods("default").List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("name=%s", statefulSetName)})
if err != nil {
@@ -454,20 +457,51 @@
return errors.New("pod didn't get created")
}
pod := res.Items[0]
- lines, err := getPodLogLines(ctx, clientSet, pod.Name, 50)
+ podName = pod.Name
+ lines, err := getPodLogLines(ctx, clientSet, podName, 50)
if err != nil {
return fmt.Errorf("could not get logs: %w", err)
}
- if len(lines) > 0 {
- switch lines[len(lines)-1] {
- case "[TESTS-PASSED]":
- return nil
- case "[TESTS-FAILED]":
- return util.Permanent(fmt.Errorf("tests failed, log:\n %s", strings.Join(lines, "\n ")))
- }
+ if slices.Contains(lines, "[INIT-PASSED]") {
+ return nil
+ }
+ if slices.Contains(lines, "[INIT-FAILED]") {
+ return util.Permanent(fmt.Errorf("tests failed, log:\n %s", strings.Join(lines, "\n ")))
}
return fmt.Errorf("pod is not ready: %v, log:\n %s", pod.Status.Phase, strings.Join(lines, "\n "))
})
+ util.TestEventual(t, fmt.Sprintf("StatefulSet with %s request resize", runtimeClass), ctx, smallTestTimeout, func(ctx context.Context) error {
+ for _, templateName := range []string{"vol-default", "vol-readonly", "vol-block"} {
+ name := fmt.Sprintf("%s-%s-0", templateName, statefulSetName)
+ patch := `{"spec": {"resources": {"requests": {"storage": "4Mi"}}}}`
+ _, err := clientSet.CoreV1().PersistentVolumeClaims("default").Patch(ctx, name, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+ })
+ i := 0
+ util.TestEventual(t, fmt.Sprintf("StatefulSet with %s resize successful", runtimeClass), ctx, smallTestTimeout, func(ctx context.Context) error {
+ // Make a change to the pod to make kubelet look at it and notice that it
+ // should call NodeExpandVolume. If we don't do this, it might take up to
+ // 1 minute for kubelet to notice, which slows down the test.
+ patch := fmt.Sprintf(`{"metadata": {"labels": {"trigger-kubelet-update": "%d"}}}`, i)
+ i += 1
+ _, err := clientSet.CoreV1().Pods("default").Patch(ctx, podName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
+ if err != nil {
+ return err
+ }
+
+ lines, err := getPodLogLines(ctx, clientSet, podName, 50)
+ if err != nil {
+ return fmt.Errorf("could not get logs: %w", err)
+ }
+ if slices.Contains(lines, "[RESIZE-PASSED]") {
+ return nil
+ }
+ return fmt.Errorf("waiting for resize, log:\n %s", strings.Join(lines, "\n "))
+ })
}
util.TestEventual(t, "Deployment in user namespace", ctx, largeTestTimeout, func(ctx context.Context) error {
deployment := makeTestDeploymentSpec("test-userns-1")