core/internal: move containerd and kubernetes to localstorage
This moves the last users of the old 'storage' library onto 'localstorage'. We also move a lot of 'runtime' directories under a single `/ephemeral` root. This could have been called `/run`, but that might imply FHS compliance, which we neither have nor want.
We also slightly refactor Kubernetes services to be a bit nicer to spawn. But generally, this is a pure refactor, with no functional changes.
Test Plan: This is expected to fail on its own, as it is part of a larger stack. D590 is the first tip of the stack that should work.
X-Origin-Diff: phab/D589
GitOrigin-RevId: d2a7c0bb52c2a7c753199221c609e03474936c22
diff --git a/core/internal/kubernetes/csi.go b/core/internal/kubernetes/csi.go
index 6db82bc..e151396 100644
--- a/core/internal/kubernetes/csi.go
+++ b/core/internal/kubernetes/csi.go
@@ -24,8 +24,6 @@
"path/filepath"
"regexp"
- "git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
-
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/protobuf/ptypes/wrappers"
"go.uber.org/zap"
@@ -35,7 +33,8 @@
"google.golang.org/grpc/status"
pluginregistration "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
- "git.monogon.dev/source/nexantic.git/core/internal/storage"
+ "git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
+ "git.monogon.dev/source/nexantic.git/core/internal/localstorage"
"git.monogon.dev/source/nexantic.git/core/pkg/fsquota"
)
@@ -46,69 +45,63 @@
const volumeDir = "volumes"
-const pluginSocketPath = "/data/kubernetes/kubelet/plugins/com.smalltown.vfs.sock"
+type csiPluginServer struct {
+ KubeletDirectory *localstorage.DataKubernetesKubeletDirectory
+ VolumesDirectory *localstorage.DataVolumesDirectory
-type csiServer struct {
- manager *storage.Manager
- logger *zap.Logger
+ logger *zap.Logger
}
-func runCSIPlugin(storMan *storage.Manager) supervisor.Runnable {
- return func(ctx context.Context) error {
- s := &csiServer{
- manager: storMan,
- logger: supervisor.Logger(ctx),
- }
- pluginListener, err := net.ListenUnix("unix", &net.UnixAddr{Name: pluginSocketPath, Net: "unix"})
- if err != nil {
- return fmt.Errorf("failed to listen on CSI socket: %w", err)
- }
- pluginListener.SetUnlinkOnClose(true)
+func (s *csiPluginServer) Run(ctx context.Context) error {
+ s.logger = supervisor.Logger(ctx)
- pluginServer := grpc.NewServer()
- csi.RegisterIdentityServer(pluginServer, s)
- csi.RegisterNodeServer(pluginServer, s)
- // Enable graceful shutdown since we don't have long-running RPCs and most of them shouldn't and can't be
- // cancelled anyways.
- if err := supervisor.Run(ctx, "csi-node", supervisor.GRPCServer(pluginServer, pluginListener, true)); err != nil {
- return err
- }
-
- registrationListener, err := net.ListenUnix("unix", &net.UnixAddr{
- Name: "/data/kubernetes/kubelet/plugins_registry/com.smalltown.vfs-reg.sock",
- Net: "unix",
- })
- if err != nil {
- return fmt.Errorf("failed to listen on CSI registration socket: %w", err)
- }
- registrationListener.SetUnlinkOnClose(true)
-
- registrationServer := grpc.NewServer()
- pluginregistration.RegisterRegistrationServer(registrationServer, s)
- if err := supervisor.Run(ctx, "registration", supervisor.GRPCServer(registrationServer, registrationListener, true)); err != nil {
- return err
- }
- supervisor.Signal(ctx, supervisor.SignalHealthy)
- supervisor.Signal(ctx, supervisor.SignalDone)
- return nil
+ pluginListener, err := net.ListenUnix("unix", &net.UnixAddr{Name: s.KubeletDirectory.Plugins.VFS.FullPath(), Net: "unix"})
+ if err != nil {
+ return fmt.Errorf("failed to listen on CSI socket: %w", err)
}
+ pluginListener.SetUnlinkOnClose(true)
+
+ pluginServer := grpc.NewServer()
+ csi.RegisterIdentityServer(pluginServer, s)
+ csi.RegisterNodeServer(pluginServer, s)
+ // Enable graceful shutdown since we don't have long-running RPCs and most of them shouldn't and can't be
+ // cancelled anyways.
+ if err := supervisor.Run(ctx, "csi-node", supervisor.GRPCServer(pluginServer, pluginListener, true)); err != nil {
+ return err
+ }
+
+ registrationListener, err := net.ListenUnix("unix", &net.UnixAddr{Name: s.KubeletDirectory.PluginsRegistry.VFSReg.FullPath(), Net: "unix"})
+ if err != nil {
+ return fmt.Errorf("failed to listen on CSI registration socket: %w", err)
+ }
+ registrationListener.SetUnlinkOnClose(true)
+
+ registrationServer := grpc.NewServer()
+ pluginregistration.RegisterRegistrationServer(registrationServer, s)
+ if err := supervisor.Run(ctx, "registration", supervisor.GRPCServer(registrationServer, registrationListener, true)); err != nil {
+ return err
+ }
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ supervisor.Signal(ctx, supervisor.SignalDone)
+ return nil
}
-func (*csiServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
+func (*csiPluginServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method NodeStageVolume not supported")
}
-func (*csiServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
+
+func (*csiPluginServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method NodeUnstageVolume not supported")
}
-func (s *csiServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
+func (s *csiPluginServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
if !acceptableNames.MatchString(req.VolumeId) {
return nil, status.Error(codes.InvalidArgument, "invalid characters in volume id")
}
- volumePath, err := s.manager.GetPathInPlace(storage.PlaceData, filepath.Join(volumeDir, req.VolumeId))
- if err != nil {
- return nil, status.Error(codes.Unavailable, "persistent data storage not available")
- }
+
+ // TODO(q3k): move this logic to localstorage?
+ volumePath := filepath.Join(s.VolumesDirectory.FullPath(), req.VolumeId)
+
switch req.VolumeCapability.AccessMode.Mode {
case csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER:
case csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY:
@@ -121,12 +114,14 @@
return nil, status.Error(codes.InvalidArgument, "unsupported access type")
}
- err = unix.Mount(volumePath, req.TargetPath, "", unix.MS_BIND, "")
- if err == unix.ENOENT {
+ err := unix.Mount(volumePath, req.TargetPath, "", unix.MS_BIND, "")
+ switch {
+ case err == unix.ENOENT:
return nil, status.Error(codes.NotFound, "volume not found")
- } else if err != nil {
+ case err != nil:
return nil, status.Errorf(codes.Unavailable, "failed to bind-mount volume: %v", err)
}
+
if req.Readonly {
err := unix.Mount(volumePath, req.TargetPath, "", unix.MS_BIND|unix.MS_REMOUNT|unix.MS_RDONLY, "")
if err != nil {
@@ -136,13 +131,15 @@
}
return &csi.NodePublishVolumeResponse{}, nil
}
-func (*csiServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
+
+func (*csiPluginServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
if err := unix.Unmount(req.TargetPath, 0); err != nil {
return nil, status.Errorf(codes.Unavailable, "failed to unmount volume: %v", err)
}
return &csi.NodeUnpublishVolumeResponse{}, nil
}
-func (*csiServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
+
+func (*csiPluginServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
quota, err := fsquota.GetQuota(req.VolumePath)
if os.IsNotExist(err) {
return nil, status.Error(codes.NotFound, "volume does not exist at this path")
@@ -167,7 +164,8 @@
},
}, nil
}
-func (*csiServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
+
+func (*csiPluginServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
if req.CapacityRange.LimitBytes <= 0 {
return nil, status.Error(codes.InvalidArgument, "invalid expanded volume size: at or below zero bytes")
}
@@ -184,7 +182,8 @@
},
}
}
-func (*csiServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
+
+func (*csiPluginServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
return &csi.NodeGetCapabilitiesResponse{
Capabilities: []*csi.NodeServiceCapability{
rpcCapability(csi.NodeServiceCapability_RPC_EXPAND_VOLUME),
@@ -192,7 +191,8 @@
},
}, nil
}
-func (*csiServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
+
+func (*csiPluginServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
hostname, err := os.Hostname()
if err != nil {
return nil, status.Errorf(codes.Unavailable, "failed to get node identity: %v", err)
@@ -203,13 +203,14 @@
}
// CSI Identity endpoints
-func (*csiServer) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
+func (*csiPluginServer) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
return &csi.GetPluginInfoResponse{
Name: "com.smalltown.vfs",
VendorVersion: "0.0.1", // TODO(lorenz): Maybe stamp?
}, nil
}
-func (*csiServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
+
+func (*csiPluginServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
return &csi.GetPluginCapabilitiesResponse{
Capabilities: []*csi.PluginCapability{
{
@@ -223,23 +224,21 @@
}, nil
}
-func (s *csiServer) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) {
- _, err := s.manager.GetPathInPlace(storage.PlaceData, volumeDir)
- ready := err == nil
- return &csi.ProbeResponse{Ready: &wrappers.BoolValue{Value: ready}}, nil
+func (s *csiPluginServer) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) {
+ return &csi.ProbeResponse{Ready: &wrappers.BoolValue{Value: true}}, nil
}
// Registration endpoints
-func (s *csiServer) GetInfo(ctx context.Context, req *pluginregistration.InfoRequest) (*pluginregistration.PluginInfo, error) {
+func (s *csiPluginServer) GetInfo(ctx context.Context, req *pluginregistration.InfoRequest) (*pluginregistration.PluginInfo, error) {
return &pluginregistration.PluginInfo{
Type: "CSIPlugin",
Name: "com.smalltown.vfs",
- Endpoint: pluginSocketPath,
+ Endpoint: s.KubeletDirectory.Plugins.VFS.FullPath(),
SupportedVersions: []string{"1.2"}, // Keep in sync with container-storage-interface/spec package version
}, nil
}
-func (s *csiServer) NotifyRegistrationStatus(ctx context.Context, req *pluginregistration.RegistrationStatus) (*pluginregistration.RegistrationStatusResponse, error) {
+func (s *csiPluginServer) NotifyRegistrationStatus(ctx context.Context, req *pluginregistration.RegistrationStatus) (*pluginregistration.RegistrationStatusResponse, error) {
if req.Error != "" {
s.logger.Warn("Kubelet failed registering CSI plugin", zap.String("error", req.Error))
}