core/internal: move containerd and kubernetes to localstorage

This moves the last users of the old 'storage' library onto 'localstorage'. We move a lot of 'runtime' directories to a single `/ephemeral` root. This could be called `/run`, but that might imply FHS compliance - which we don't have, nor want to have.

We also slightly refactor Kubernetes services to be a bit nicer to spawn. But generally, this is a pure refactor, with no functional changes.

Test Plan: this should fail. part of a larger stack. D590 is the first tip of the stack that should work.

X-Origin-Diff: phab/D589
GitOrigin-RevId: d2a7c0bb52c2a7c753199221c609e03474936c22
diff --git a/core/internal/kubernetes/csi.go b/core/internal/kubernetes/csi.go
index 6db82bc..e151396 100644
--- a/core/internal/kubernetes/csi.go
+++ b/core/internal/kubernetes/csi.go
@@ -24,8 +24,6 @@
 	"path/filepath"
 	"regexp"
 
-	"git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
-
 	"github.com/container-storage-interface/spec/lib/go/csi"
 	"github.com/golang/protobuf/ptypes/wrappers"
 	"go.uber.org/zap"
@@ -35,7 +33,8 @@
 	"google.golang.org/grpc/status"
 	pluginregistration "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
 
-	"git.monogon.dev/source/nexantic.git/core/internal/storage"
+	"git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
+	"git.monogon.dev/source/nexantic.git/core/internal/localstorage"
 	"git.monogon.dev/source/nexantic.git/core/pkg/fsquota"
 )
 
@@ -46,69 +45,63 @@
 
 const volumeDir = "volumes"
 
-const pluginSocketPath = "/data/kubernetes/kubelet/plugins/com.smalltown.vfs.sock"
+type csiPluginServer struct {
+	KubeletDirectory *localstorage.DataKubernetesKubeletDirectory
+	VolumesDirectory *localstorage.DataVolumesDirectory
 
-type csiServer struct {
-	manager *storage.Manager
-	logger  *zap.Logger
+	logger *zap.Logger
 }
 
-func runCSIPlugin(storMan *storage.Manager) supervisor.Runnable {
-	return func(ctx context.Context) error {
-		s := &csiServer{
-			manager: storMan,
-			logger:  supervisor.Logger(ctx),
-		}
-		pluginListener, err := net.ListenUnix("unix", &net.UnixAddr{Name: pluginSocketPath, Net: "unix"})
-		if err != nil {
-			return fmt.Errorf("failed to listen on CSI socket: %w", err)
-		}
-		pluginListener.SetUnlinkOnClose(true)
+func (s *csiPluginServer) Run(ctx context.Context) error {
+	s.logger = supervisor.Logger(ctx)
 
-		pluginServer := grpc.NewServer()
-		csi.RegisterIdentityServer(pluginServer, s)
-		csi.RegisterNodeServer(pluginServer, s)
-		// Enable graceful shutdown since we don't have long-running RPCs and most of them shouldn't and can't be
-		// cancelled anyways.
-		if err := supervisor.Run(ctx, "csi-node", supervisor.GRPCServer(pluginServer, pluginListener, true)); err != nil {
-			return err
-		}
-
-		registrationListener, err := net.ListenUnix("unix", &net.UnixAddr{
-			Name: "/data/kubernetes/kubelet/plugins_registry/com.smalltown.vfs-reg.sock",
-			Net:  "unix",
-		})
-		if err != nil {
-			return fmt.Errorf("failed to listen on CSI registration socket: %w", err)
-		}
-		registrationListener.SetUnlinkOnClose(true)
-
-		registrationServer := grpc.NewServer()
-		pluginregistration.RegisterRegistrationServer(registrationServer, s)
-		if err := supervisor.Run(ctx, "registration", supervisor.GRPCServer(registrationServer, registrationListener, true)); err != nil {
-			return err
-		}
-		supervisor.Signal(ctx, supervisor.SignalHealthy)
-		supervisor.Signal(ctx, supervisor.SignalDone)
-		return nil
+	pluginListener, err := net.ListenUnix("unix", &net.UnixAddr{Name: s.KubeletDirectory.Plugins.VFS.FullPath(), Net: "unix"})
+	if err != nil {
+		return fmt.Errorf("failed to listen on CSI socket: %w", err)
 	}
+	pluginListener.SetUnlinkOnClose(true)
+
+	pluginServer := grpc.NewServer()
+	csi.RegisterIdentityServer(pluginServer, s)
+	csi.RegisterNodeServer(pluginServer, s)
+	// Enable graceful shutdown since we don't have long-running RPCs and most of them shouldn't and can't be
+	// cancelled anyways.
+	if err := supervisor.Run(ctx, "csi-node", supervisor.GRPCServer(pluginServer, pluginListener, true)); err != nil {
+		return err
+	}
+
+	registrationListener, err := net.ListenUnix("unix", &net.UnixAddr{Name: s.KubeletDirectory.PluginsRegistry.VFSReg.FullPath(), Net: "unix"})
+	if err != nil {
+		return fmt.Errorf("failed to listen on CSI registration socket: %w", err)
+	}
+	registrationListener.SetUnlinkOnClose(true)
+
+	registrationServer := grpc.NewServer()
+	pluginregistration.RegisterRegistrationServer(registrationServer, s)
+	if err := supervisor.Run(ctx, "registration", supervisor.GRPCServer(registrationServer, registrationListener, true)); err != nil {
+		return err
+	}
+	supervisor.Signal(ctx, supervisor.SignalHealthy)
+	supervisor.Signal(ctx, supervisor.SignalDone)
+	return nil
 }
 
-func (*csiServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
+func (*csiPluginServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
 	return nil, status.Errorf(codes.Unimplemented, "method NodeStageVolume not supported")
 }
-func (*csiServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
+
+func (*csiPluginServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
 	return nil, status.Errorf(codes.Unimplemented, "method NodeUnstageVolume not supported")
 }
 
-func (s *csiServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
+func (s *csiPluginServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
 	if !acceptableNames.MatchString(req.VolumeId) {
 		return nil, status.Error(codes.InvalidArgument, "invalid characters in volume id")
 	}
-	volumePath, err := s.manager.GetPathInPlace(storage.PlaceData, filepath.Join(volumeDir, req.VolumeId))
-	if err != nil {
-		return nil, status.Error(codes.Unavailable, "persistent data storage not available")
-	}
+
+	// TODO(q3k): move this logic to localstorage?
+	volumePath := filepath.Join(s.VolumesDirectory.FullPath(), req.VolumeId)
+
 	switch req.VolumeCapability.AccessMode.Mode {
 	case csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER:
 	case csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY:
@@ -121,12 +114,14 @@
 		return nil, status.Error(codes.InvalidArgument, "unsupported access type")
 	}
 
-	err = unix.Mount(volumePath, req.TargetPath, "", unix.MS_BIND, "")
-	if err == unix.ENOENT {
+	err := unix.Mount(volumePath, req.TargetPath, "", unix.MS_BIND, "")
+	switch {
+	case err == unix.ENOENT:
 		return nil, status.Error(codes.NotFound, "volume not found")
-	} else if err != nil {
+	case err != nil:
 		return nil, status.Errorf(codes.Unavailable, "failed to bind-mount volume: %v", err)
 	}
+
 	if req.Readonly {
 		err := unix.Mount(volumePath, req.TargetPath, "", unix.MS_BIND|unix.MS_REMOUNT|unix.MS_RDONLY, "")
 		if err != nil {
@@ -136,13 +131,15 @@
 	}
 	return &csi.NodePublishVolumeResponse{}, nil
 }
-func (*csiServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
+
+func (*csiPluginServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
 	if err := unix.Unmount(req.TargetPath, 0); err != nil {
 		return nil, status.Errorf(codes.Unavailable, "failed to unmount volume: %v", err)
 	}
 	return &csi.NodeUnpublishVolumeResponse{}, nil
 }
-func (*csiServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
+
+func (*csiPluginServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
 	quota, err := fsquota.GetQuota(req.VolumePath)
 	if os.IsNotExist(err) {
 		return nil, status.Error(codes.NotFound, "volume does not exist at this path")
@@ -167,7 +164,8 @@
 		},
 	}, nil
 }
-func (*csiServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
+
+func (*csiPluginServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
 	if req.CapacityRange.LimitBytes <= 0 {
 		return nil, status.Error(codes.InvalidArgument, "invalid expanded volume size: at or below zero bytes")
 	}
@@ -184,7 +182,8 @@
 		},
 	}
 }
-func (*csiServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
+
+func (*csiPluginServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
 	return &csi.NodeGetCapabilitiesResponse{
 		Capabilities: []*csi.NodeServiceCapability{
 			rpcCapability(csi.NodeServiceCapability_RPC_EXPAND_VOLUME),
@@ -192,7 +191,8 @@
 		},
 	}, nil
 }
-func (*csiServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
+
+func (*csiPluginServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
 	hostname, err := os.Hostname()
 	if err != nil {
 		return nil, status.Errorf(codes.Unavailable, "failed to get node identity: %v", err)
@@ -203,13 +203,14 @@
 }
 
 // CSI Identity endpoints
-func (*csiServer) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
+func (*csiPluginServer) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
 	return &csi.GetPluginInfoResponse{
 		Name:          "com.smalltown.vfs",
 		VendorVersion: "0.0.1", // TODO(lorenz): Maybe stamp?
 	}, nil
 }
-func (*csiServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
+
+func (*csiPluginServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
 	return &csi.GetPluginCapabilitiesResponse{
 		Capabilities: []*csi.PluginCapability{
 			{
@@ -223,23 +224,21 @@
 	}, nil
 }
 
-func (s *csiServer) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) {
-	_, err := s.manager.GetPathInPlace(storage.PlaceData, volumeDir)
-	ready := err == nil
-	return &csi.ProbeResponse{Ready: &wrappers.BoolValue{Value: ready}}, nil
+func (s *csiPluginServer) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) {
+	return &csi.ProbeResponse{Ready: &wrappers.BoolValue{Value: true}}, nil
 }
 
 // Registration endpoints
-func (s *csiServer) GetInfo(ctx context.Context, req *pluginregistration.InfoRequest) (*pluginregistration.PluginInfo, error) {
+func (s *csiPluginServer) GetInfo(ctx context.Context, req *pluginregistration.InfoRequest) (*pluginregistration.PluginInfo, error) {
 	return &pluginregistration.PluginInfo{
 		Type:              "CSIPlugin",
 		Name:              "com.smalltown.vfs",
-		Endpoint:          pluginSocketPath,
+		Endpoint:          s.KubeletDirectory.Plugins.VFS.FullPath(),
 		SupportedVersions: []string{"1.2"}, // Keep in sync with container-storage-interface/spec package version
 	}, nil
 }
 
-func (s *csiServer) NotifyRegistrationStatus(ctx context.Context, req *pluginregistration.RegistrationStatus) (*pluginregistration.RegistrationStatusResponse, error) {
+func (s *csiPluginServer) NotifyRegistrationStatus(ctx context.Context, req *pluginregistration.RegistrationStatus) (*pluginregistration.RegistrationStatusResponse, error) {
 	if req.Error != "" {
 		s.logger.Warn("Kubelet failed registering CSI plugin", zap.String("error", req.Error))
 	}