Implement Block PVCs in our storage backend

This implements full support for Block PVCs in our Kubernetes storage backend.
Block PVCs are backed by files which are made available to the pods as loop
devices, with support for read-only mounts and online expansion.
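
For reference, the block-volume publish path condenses to the following (a
sketch of the CSI plugin change below, using the new metropolis/pkg/loop
package; error handling and cleanup are trimmed relative to the real code):

    import (
        "os"

        "golang.org/x/sys/unix"

        "source.monogon.dev/metropolis/pkg/loop"
    )

    // publishBlockVolume sketches how a block PVC's backing file ends up as
    // a device node visible to the pod.
    func publishBlockVolume(volumePath, targetPath string, readonly bool) error {
        f, err := os.OpenFile(volumePath, os.O_RDWR, 0)
        if err != nil {
            return err
        }
        defer f.Close()
        flags := uint32(loop.FlagDirectIO) // direct I/O avoids double caching
        if readonly {
            flags |= loop.FlagReadOnly
        }
        loopdev, err := loop.Create(f, loop.Config{Flags: flags})
        if err != nil {
            return err
        }
        num, err := loopdev.Dev() // dev_t of the freshly-attached loop device
        if err != nil {
            loopdev.Remove()
            return err
        }
        // The pod consumes the volume through a block device node created at
        // the kubelet-provided target path.
        if err := unix.Mknod(targetPath, unix.S_IFBLK|0640, int(num)); err != nil {
            loopdev.Remove()
            return err
        }
        return loopdev.Close()
    }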

This also requires a Kubernetes patch: when block PVCs are used with CSI, the
kubelet calls losetup to establish a form of lock on the backing block device.
That lock is not exclusive, does nothing for our use case, and could get very
expensive on dense machines, so I removed it.
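
For context, the behavior being patched out is roughly equivalent to the
kubelet running losetup against the already-published device node, costing one
extra loop device per block volume. An illustrative sketch (hypothetical code,
not the actual Kubernetes implementation):

    import (
        "fmt"
        "os/exec"
        "strings"
    )

    // pseudoLock illustrates the removed kubelet behavior: attach one more
    // loop device over the published block device, purely as a reference
    // marker. It is not exclusive, so it locks nothing, but every block PVC
    // burns an extra /dev/loopN, which adds up on dense machines.
    func pseudoLock(devicePath string) (string, error) {
        out, err := exec.Command("losetup", "--find", "--show", devicePath).Output()
        if err != nil {
            return "", fmt.Errorf("losetup failed: %w", err)
        }
        return strings.TrimSpace(string(out)), nil // e.g. /dev/loop17
    }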

Test Plan: Comes with E2E tests

X-Origin-Diff: phab/D746
GitOrigin-RevId: 430d3f445286c0d3498b2153df333a19f3fcab89
diff --git a/metropolis/node/core/localstorage/directory_data.go b/metropolis/node/core/localstorage/directory_data.go
index 464b790..a98a915 100644
--- a/metropolis/node/core/localstorage/directory_data.go
+++ b/metropolis/node/core/localstorage/directory_data.go
@@ -124,7 +124,9 @@
 }
 
 func (d *DataDirectory) mount() error {
-	if err := unix.Mount("/dev/data", d.FullPath(), "xfs", unix.MS_NOEXEC|unix.MS_NODEV, "pquota"); err != nil {
+	// TODO(T965): MS_NODEV should definitely be set on the data partition, but as long as the kubelet root
+	// is on there, we can't do it.
+	if err := unix.Mount("/dev/data", d.FullPath(), "xfs", unix.MS_NOEXEC, "pquota"); err != nil {
 		return fmt.Errorf("mounting data directory: %w", err)
 	}
 	return nil
diff --git a/metropolis/node/kubernetes/BUILD.bazel b/metropolis/node/kubernetes/BUILD.bazel
index 3ffe7a9..cec9a6e 100644
--- a/metropolis/node/kubernetes/BUILD.bazel
+++ b/metropolis/node/kubernetes/BUILD.bazel
@@ -26,6 +26,7 @@
         "//metropolis/pkg/fileargs:go_default_library",
         "//metropolis/pkg/fsquota:go_default_library",
         "//metropolis/pkg/logtree:go_default_library",
+        "//metropolis/pkg/loop:go_default_library",
         "//metropolis/pkg/pki:go_default_library",
         "//metropolis/pkg/supervisor:go_default_library",
         "//metropolis/proto/api:go_default_library",
diff --git a/metropolis/node/kubernetes/csi.go b/metropolis/node/kubernetes/csi.go
index 3f88d6f..efd8af4 100644
--- a/metropolis/node/kubernetes/csi.go
+++ b/metropolis/node/kubernetes/csi.go
@@ -30,22 +30,22 @@
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
-	pluginregistration "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
+	"k8s.io/kubelet/pkg/apis/pluginregistration/v1"
 
 	"source.monogon.dev/metropolis/node/core/localstorage"
 	"source.monogon.dev/metropolis/pkg/fsquota"
 	"source.monogon.dev/metropolis/pkg/logtree"
+	"source.monogon.dev/metropolis/pkg/loop"
 	"source.monogon.dev/metropolis/pkg/supervisor"
 )
 
 // Derived from K8s spec for acceptable names, but shortened to 130 characters to avoid issues with
 // maximum path length. We don't provision longer names so this applies only if you manually create
 // a volume with a name of more than 130 characters.
-var acceptableNames = regexp.MustCompile("^[a-z][a-bz0-9-.]{0,128}[a-z0-9]$")
-
-const volumeDir = "volumes"
+var acceptableNames = regexp.MustCompile("^[a-z][a-z0-9-.]{0,128}[a-z0-9]$")
 
 type csiPluginServer struct {
+	csi.UnimplementedNodeServer // responds Unimplemented to RPCs not handled below
 	KubeletDirectory *localstorage.DataKubernetesKubeletDirectory
 	VolumesDirectory *localstorage.DataVolumesDirectory
 
@@ -86,14 +86,6 @@
 	return nil
 }
 
-func (*csiPluginServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
-	return nil, status.Errorf(codes.Unimplemented, "method NodeStageVolume not supported")
-}
-
-func (*csiPluginServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
-	return nil, status.Errorf(codes.Unimplemented, "method NodeUnstageVolume not supported")
-}
-
 func (s *csiPluginServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
 	if !acceptableNames.MatchString(req.VolumeId) {
 		return nil, status.Error(codes.InvalidArgument, "invalid characters in volume id")
@@ -110,29 +102,66 @@
 	}
 	switch req.VolumeCapability.AccessType.(type) {
 	case *csi.VolumeCapability_Mount:
+		err := unix.Mount(volumePath, req.TargetPath, "", unix.MS_BIND, "")
+		switch {
+		case err == unix.ENOENT:
+			return nil, status.Error(codes.NotFound, "volume not found")
+		case err != nil:
+			return nil, status.Errorf(codes.Unavailable, "failed to bind-mount volume: %v", err)
+		}
+
+		if req.Readonly {
+			err := unix.Mount(volumePath, req.TargetPath, "", unix.MS_BIND|unix.MS_REMOUNT|unix.MS_RDONLY, "")
+			if err != nil {
+				_ = unix.Unmount(req.TargetPath, 0) // Best-effort
+				return nil, status.Errorf(codes.Unavailable, "failed to remount volume: %v", err)
+			}
+		}
+	case *csi.VolumeCapability_Block:
+		f, err := os.OpenFile(volumePath, os.O_RDWR, 0)
+		if err != nil {
+			return nil, status.Errorf(codes.Unavailable, "failed to open block volume: %v", err)
+		}
+		defer f.Close()
+		var flags uint32 = loop.FlagDirectIO
+		if req.Readonly {
+			flags |= loop.FlagReadOnly
+		}
+		loopdev, err := loop.Create(f, loop.Config{Flags: flags})
+		if err != nil {
+			return nil, status.Errorf(codes.Unavailable, "failed to create loop device: %v", err)
+		}
+		loopdevNum, err := loopdev.Dev()
+		if err != nil {
+			loopdev.Remove()
+			return nil, status.Errorf(codes.Internal, "device number not available: %v", err)
+		}
+		if err := unix.Mknod(req.TargetPath, unix.S_IFBLK|0640, int(loopdevNum)); err != nil {
+			loopdev.Remove()
+			return nil, status.Errorf(codes.Unavailable, "failed to create device node at target path: %v", err)
+		}
+		loopdev.Close()
 	default:
 		return nil, status.Error(codes.InvalidArgument, "unsupported access type")
 	}
 
-	err := unix.Mount(volumePath, req.TargetPath, "", unix.MS_BIND, "")
-	switch {
-	case err == unix.ENOENT:
-		return nil, status.Error(codes.NotFound, "volume not found")
-	case err != nil:
-		return nil, status.Errorf(codes.Unavailable, "failed to bind-mount volume: %v", err)
-	}
-
-	if req.Readonly {
-		err := unix.Mount(volumePath, req.TargetPath, "", unix.MS_BIND|unix.MS_REMOUNT|unix.MS_RDONLY, "")
-		if err != nil {
-			_ = unix.Unmount(req.TargetPath, 0) // Best-effort
-			return nil, status.Errorf(codes.Unavailable, "failed to remount volume: %v", err)
-		}
-	}
 	return &csi.NodePublishVolumeResponse{}, nil
 }
 
-func (*csiPluginServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
+func (s *csiPluginServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
+	loopdev, err := loop.Open(req.TargetPath)
+	if err == nil {
+		defer loopdev.Close()
+		// The target is a loop device node, so this volume was published as a block device.
+		if err := loopdev.Remove(); err != nil {
+			return nil, status.Errorf(codes.Unavailable, "failed to remove loop device: %v", err)
+		}
+		if err := os.Remove(req.TargetPath); err != nil && !os.IsNotExist(err) {
+			return nil, status.Errorf(codes.Unavailable, "failed to remove device inode: %v", err)
+		}
+		return &csi.NodeUnpublishVolumeResponse{}, nil
+	}
+	// Otherwise try a normal unmount
 	if err := unix.Unmount(req.TargetPath, 0); err != nil {
 		return nil, status.Errorf(codes.Unavailable, "failed to unmount volume: %v", err)
 	}
@@ -165,10 +194,27 @@
 	}, nil
 }
 
-func (*csiPluginServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
+func (s *csiPluginServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
 	if req.CapacityRange.LimitBytes <= 0 {
 		return nil, status.Error(codes.InvalidArgument, "invalid expanded volume size: at or below zero bytes")
 	}
+	loopdev, err := loop.Open(req.VolumePath)
+	if err == nil {
+		defer loopdev.Close()
+		volumePath := filepath.Join(s.VolumesDirectory.FullPath(), req.VolumeId)
+		imageFile, err := os.OpenFile(volumePath, os.O_RDWR, 0)
+		if err != nil {
+			return nil, status.Errorf(codes.Unavailable, "failed to open block volume backing file: %v", err)
+		}
+		defer imageFile.Close()
+		if err := unix.Fallocate(int(imageFile.Fd()), 0, 0, req.CapacityRange.LimitBytes); err != nil {
+			return nil, status.Errorf(codes.Unavailable, "failed to expand volume using fallocate: %v", err)
+		}
+		if err := loopdev.RefreshSize(); err != nil {
+			return nil, status.Errorf(codes.Unavailable, "failed to refresh loop device size: %v", err)
+		}
+		return &csi.NodeExpandVolumeResponse{CapacityBytes: req.CapacityRange.LimitBytes}, nil
+	}
 	if err := fsquota.SetQuota(req.VolumePath, uint64(req.CapacityRange.LimitBytes), 0); err != nil {
 		return nil, status.Errorf(codes.Unavailable, "failed to update quota: %v", err)
 	}
diff --git a/metropolis/node/kubernetes/provisioner.go b/metropolis/node/kubernetes/provisioner.go
index b69d255..0aa5c66 100644
--- a/metropolis/node/kubernetes/provisioner.go
+++ b/metropolis/node/kubernetes/provisioner.go
@@ -24,6 +24,7 @@
 	"os"
 	"path/filepath"
 
+	"golang.org/x/sys/unix"
 	v1 "k8s.io/api/core/v1"
 	storagev1 "k8s.io/api/storage/v1"
 	apierrs "k8s.io/apimachinery/pkg/api/errors"
@@ -40,8 +41,8 @@
 	"k8s.io/client-go/util/workqueue"
 
 	"source.monogon.dev/metropolis/node/core/localstorage"
-	"source.monogon.dev/metropolis/pkg/logtree"
 	"source.monogon.dev/metropolis/pkg/fsquota"
+	"source.monogon.dev/metropolis/pkg/logtree"
 	"source.monogon.dev/metropolis/pkg/supervisor"
 )
 
@@ -255,26 +256,37 @@
 		return fmt.Errorf("PVC requesting more than 2^63 bytes of storage, this is not supported")
 	}
 
-	if *pvc.Spec.VolumeMode == v1.PersistentVolumeBlock {
-		return fmt.Errorf("Block PVCs are currently not supported by Metropolis")
-	}
-
 	volumeID := "pvc-" + string(pvc.ObjectMeta.UID)
 	volumePath := p.volumePath(volumeID)
 
 	p.logger.Infof("Creating local PV %s", volumeID)
-	if err := os.Mkdir(volumePath, 0644); err != nil && !os.IsExist(err) {
-		return fmt.Errorf("failed to create volume directory: %w", err)
-	}
-	files, err := ioutil.ReadDir(volumePath)
-	if err != nil {
-		return fmt.Errorf("failed to list files in newly-created volume: %w", err)
-	}
-	if len(files) > 0 {
-		return errors.New("newly-created volume already contains data, bailing")
-	}
-	if err := fsquota.SetQuota(volumePath, uint64(capacity), 100000); err != nil {
-		return fmt.Errorf("failed to update quota: %v", err)
+
+	switch *pvc.Spec.VolumeMode {
+	case "", v1.PersistentVolumeFilesystem:
+		if err := os.Mkdir(volumePath, 0644); err != nil && !os.IsExist(err) {
+			return fmt.Errorf("failed to create volume directory: %w", err)
+		}
+		files, err := ioutil.ReadDir(volumePath)
+		if err != nil {
+			return fmt.Errorf("failed to list files in newly-created volume: %w", err)
+		}
+		if len(files) > 0 {
+			return errors.New("newly-created volume already contains data, bailing")
+		}
+		if err := fsquota.SetQuota(volumePath, uint64(capacity), 100000); err != nil {
+			return fmt.Errorf("failed to update quota: %v", err)
+		}
+	case v1.PersistentVolumeBlock:
+		imageFile, err := os.OpenFile(volumePath, os.O_CREATE|os.O_RDWR, 0644)
+		if err != nil {
+			return fmt.Errorf("failed to create volume image: %w", err)
+		}
+		defer imageFile.Close()
+		if err := unix.Fallocate(int(imageFile.Fd()), 0, 0, capacity); err != nil {
+			return fmt.Errorf("failed to fallocate() volume image: %w", err)
+		}
+	default:
+		return fmt.Errorf("VolumeMode \"%s\" is unsupported", *pvc.Spec.VolumeMode)
 	}
 
 	vol := &v1.PersistentVolume{
@@ -294,7 +306,8 @@
 					VolumeHandle: volumeID,
 				},
 			},
-			ClaimRef: claimRef,
+			ClaimRef:   claimRef,
+			VolumeMode: pvc.Spec.VolumeMode,
 			NodeAffinity: &v1.VolumeNodeAffinity{
 				Required: &v1.NodeSelector{
 					NodeSelectorTerms: []v1.NodeSelectorTerm{
@@ -346,17 +359,25 @@
 
 	// Log deletes for auditing purposes
 	p.logger.Infof("Deleting persistent volume %s", pv.Spec.CSI.VolumeHandle)
-	if err := fsquota.SetQuota(volumePath, 0, 0); err != nil {
-		// We record these here manually since a successful deletion removes the PV we'd be attaching them to
-		p.recorder.Eventf(pv, v1.EventTypeWarning, "DeprovisioningFailed", "Failed to remove quota: %v", err)
-		return fmt.Errorf("failed to remove quota: %w", err)
-	}
-	err = os.RemoveAll(volumePath)
-	if os.IsNotExist(err) {
-		return nil
-	} else if err != nil {
-		p.recorder.Eventf(pv, v1.EventTypeWarning, "DeprovisioningFailed", "Failed to delete volume: %v", err)
-		return fmt.Errorf("failed to delete volume: %w", err)
+	switch *pv.Spec.VolumeMode {
+	case "", v1.PersistentVolumeFilesystem:
+		if err := fsquota.SetQuota(volumePath, 0, 0); err != nil {
+			// We record these here manually since a successful deletion removes the PV we'd be attaching them to
+			p.recorder.Eventf(pv, v1.EventTypeWarning, "DeprovisioningFailed", "Failed to remove quota: %v", err)
+			return fmt.Errorf("failed to remove quota: %w", err)
+		}
+		if err := os.RemoveAll(volumePath); err != nil && !os.IsNotExist(err) {
+			p.recorder.Eventf(pv, v1.EventTypeWarning, "DeprovisioningFailed", "Failed to delete volume: %v", err)
+			return fmt.Errorf("failed to delete volume: %w", err)
+		}
+	case v1.PersistentVolumeBlock:
+		if err := os.Remove(volumePath); err != nil && !os.IsNotExist(err) {
+			p.recorder.Eventf(pv, v1.EventTypeWarning, "DeprovisioningFailed", "Failed to delete volume: %v", err)
+			return fmt.Errorf("failed to delete volume: %w", err)
+		}
+	default:
+		p.recorder.Eventf(pv, v1.EventTypeWarning, "DeprovisioningFailed", "Invalid volume mode \"%v\"", *pv.Spec.VolumeMode)
+		return fmt.Errorf("invalid volume mode \"%v\"", *pv.Spec.VolumeMode)
 	}
 
 	err = p.Kubernetes.CoreV1().PersistentVolumes().Delete(context.Background(), pv.Name, metav1.DeleteOptions{})