blob: 6db82bc3e20527d01ee32b66aa8771af20df748d [file] [log] [blame]
Lorenz Brun0db90ba2020-04-06 14:04:52 +02001// Copyright 2020 The Monogon Project Authors.
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17package kubernetes
18
19import (
20 "context"
21 "fmt"
22 "net"
23 "os"
24 "path/filepath"
25 "regexp"
26
27 "git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
28
29 "github.com/container-storage-interface/spec/lib/go/csi"
30 "github.com/golang/protobuf/ptypes/wrappers"
31 "go.uber.org/zap"
32 "golang.org/x/sys/unix"
33 "google.golang.org/grpc"
34 "google.golang.org/grpc/codes"
35 "google.golang.org/grpc/status"
36 pluginregistration "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
37
38 "git.monogon.dev/source/nexantic.git/core/internal/storage"
39 "git.monogon.dev/source/nexantic.git/core/pkg/fsquota"
40)
41
// acceptableNames matches the volume IDs accepted by this plugin. Derived from K8s spec for acceptable
// names, but shortened to 130 characters to avoid issues with maximum path length. We don't provision
// longer names so this applies only if you manually create a volume with a name of more than 130
// characters.
//
// A matching ID starts with a lowercase letter, ends with a lowercase letter or digit, and contains only
// lowercase letters, digits, '-' and '.' in between. Since '/' is never accepted, a matching ID is always a
// single path component.
var acceptableNames = regexp.MustCompile("^[a-z][a-z0-9-.]{0,128}[a-z0-9]$")
Lorenz Brun0db90ba2020-04-06 14:04:52 +020046
const (
	// volumeDir is the directory, relative to the persistent data place, under which each volume's backing
	// directory lives (one subdirectory per volume ID).
	volumeDir = "volumes"

	// pluginSocketPath is the Unix socket on which the CSI identity and node services are served. It is
	// also the endpoint advertised to kubelet during plugin registration.
	pluginSocketPath = "/data/kubernetes/kubelet/plugins/com.smalltown.vfs.sock"
)
50
51type csiServer struct {
52 manager *storage.Manager
53 logger *zap.Logger
54}
55
56func runCSIPlugin(storMan *storage.Manager) supervisor.Runnable {
57 return func(ctx context.Context) error {
58 s := &csiServer{
59 manager: storMan,
60 logger: supervisor.Logger(ctx),
61 }
62 pluginListener, err := net.ListenUnix("unix", &net.UnixAddr{Name: pluginSocketPath, Net: "unix"})
63 if err != nil {
64 return fmt.Errorf("failed to listen on CSI socket: %w", err)
65 }
66 pluginListener.SetUnlinkOnClose(true)
67
68 pluginServer := grpc.NewServer()
69 csi.RegisterIdentityServer(pluginServer, s)
70 csi.RegisterNodeServer(pluginServer, s)
71 // Enable graceful shutdown since we don't have long-running RPCs and most of them shouldn't and can't be
72 // cancelled anyways.
73 if err := supervisor.Run(ctx, "csi-node", supervisor.GRPCServer(pluginServer, pluginListener, true)); err != nil {
74 return err
75 }
76
77 registrationListener, err := net.ListenUnix("unix", &net.UnixAddr{
78 Name: "/data/kubernetes/kubelet/plugins_registry/com.smalltown.vfs-reg.sock",
79 Net: "unix",
80 })
81 if err != nil {
82 return fmt.Errorf("failed to listen on CSI registration socket: %w", err)
83 }
84 registrationListener.SetUnlinkOnClose(true)
85
86 registrationServer := grpc.NewServer()
87 pluginregistration.RegisterRegistrationServer(registrationServer, s)
88 if err := supervisor.Run(ctx, "registration", supervisor.GRPCServer(registrationServer, registrationListener, true)); err != nil {
89 return err
90 }
91 supervisor.Signal(ctx, supervisor.SignalHealthy)
92 supervisor.Signal(ctx, supervisor.SignalDone)
93 return nil
94 }
95}
96
97func (*csiServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
98 return nil, status.Errorf(codes.Unimplemented, "method NodeStageVolume not supported")
99}
100func (*csiServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
101 return nil, status.Errorf(codes.Unimplemented, "method NodeUnstageVolume not supported")
102}
103
104func (s *csiServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
105 if !acceptableNames.MatchString(req.VolumeId) {
106 return nil, status.Error(codes.InvalidArgument, "invalid characters in volume id")
107 }
108 volumePath, err := s.manager.GetPathInPlace(storage.PlaceData, filepath.Join(volumeDir, req.VolumeId))
109 if err != nil {
110 return nil, status.Error(codes.Unavailable, "persistent data storage not available")
111 }
112 switch req.VolumeCapability.AccessMode.Mode {
113 case csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER:
114 case csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY:
115 default:
116 return nil, status.Error(codes.InvalidArgument, "unsupported access mode")
117 }
118 switch req.VolumeCapability.AccessType.(type) {
119 case *csi.VolumeCapability_Mount:
120 default:
121 return nil, status.Error(codes.InvalidArgument, "unsupported access type")
122 }
123
124 err = unix.Mount(volumePath, req.TargetPath, "", unix.MS_BIND, "")
125 if err == unix.ENOENT {
126 return nil, status.Error(codes.NotFound, "volume not found")
127 } else if err != nil {
128 return nil, status.Errorf(codes.Unavailable, "failed to bind-mount volume: %v", err)
129 }
130 if req.Readonly {
131 err := unix.Mount(volumePath, req.TargetPath, "", unix.MS_BIND|unix.MS_REMOUNT|unix.MS_RDONLY, "")
132 if err != nil {
133 _ = unix.Unmount(req.TargetPath, 0) // Best-effort
134 return nil, status.Errorf(codes.Unavailable, "failed to remount volume: %v", err)
135 }
136 }
137 return &csi.NodePublishVolumeResponse{}, nil
138}
139func (*csiServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
140 if err := unix.Unmount(req.TargetPath, 0); err != nil {
141 return nil, status.Errorf(codes.Unavailable, "failed to unmount volume: %v", err)
142 }
143 return &csi.NodeUnpublishVolumeResponse{}, nil
144}
145func (*csiServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
146 quota, err := fsquota.GetQuota(req.VolumePath)
147 if os.IsNotExist(err) {
148 return nil, status.Error(codes.NotFound, "volume does not exist at this path")
149 } else if err != nil {
150 return nil, status.Errorf(codes.Unavailable, "failed to get quota: %v", err)
151 }
152
153 return &csi.NodeGetVolumeStatsResponse{
154 Usage: []*csi.VolumeUsage{
155 {
156 Total: int64(quota.Bytes),
157 Unit: csi.VolumeUsage_BYTES,
158 Used: int64(quota.BytesUsed),
159 Available: int64(quota.Bytes - quota.BytesUsed),
160 },
161 {
162 Total: int64(quota.Inodes),
163 Unit: csi.VolumeUsage_INODES,
164 Used: int64(quota.InodesUsed),
165 Available: int64(quota.Inodes - quota.InodesUsed),
166 },
167 },
168 }, nil
169}
170func (*csiServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
171 if req.CapacityRange.LimitBytes <= 0 {
172 return nil, status.Error(codes.InvalidArgument, "invalid expanded volume size: at or below zero bytes")
173 }
174 if err := fsquota.SetQuota(req.VolumePath, uint64(req.CapacityRange.LimitBytes), 0); err != nil {
175 return nil, status.Errorf(codes.Unavailable, "failed to update quota: %v", err)
176 }
177 return &csi.NodeExpandVolumeResponse{CapacityBytes: req.CapacityRange.LimitBytes}, nil
178}
179
180func rpcCapability(cap csi.NodeServiceCapability_RPC_Type) *csi.NodeServiceCapability {
181 return &csi.NodeServiceCapability{
182 Type: &csi.NodeServiceCapability_Rpc{
183 Rpc: &csi.NodeServiceCapability_RPC{Type: cap},
184 },
185 }
186}
187func (*csiServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
188 return &csi.NodeGetCapabilitiesResponse{
189 Capabilities: []*csi.NodeServiceCapability{
190 rpcCapability(csi.NodeServiceCapability_RPC_EXPAND_VOLUME),
191 rpcCapability(csi.NodeServiceCapability_RPC_GET_VOLUME_STATS),
192 },
193 }, nil
194}
195func (*csiServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
196 hostname, err := os.Hostname()
197 if err != nil {
198 return nil, status.Errorf(codes.Unavailable, "failed to get node identity: %v", err)
199 }
200 return &csi.NodeGetInfoResponse{
201 NodeId: hostname,
202 }, nil
203}
204
205// CSI Identity endpoints
206func (*csiServer) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
207 return &csi.GetPluginInfoResponse{
208 Name: "com.smalltown.vfs",
209 VendorVersion: "0.0.1", // TODO(lorenz): Maybe stamp?
210 }, nil
211}
212func (*csiServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
213 return &csi.GetPluginCapabilitiesResponse{
214 Capabilities: []*csi.PluginCapability{
215 {
216 Type: &csi.PluginCapability_VolumeExpansion_{
217 VolumeExpansion: &csi.PluginCapability_VolumeExpansion{
218 Type: csi.PluginCapability_VolumeExpansion_ONLINE,
219 },
220 },
221 },
222 },
223 }, nil
224}
225
226func (s *csiServer) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) {
227 _, err := s.manager.GetPathInPlace(storage.PlaceData, volumeDir)
228 ready := err == nil
229 return &csi.ProbeResponse{Ready: &wrappers.BoolValue{Value: ready}}, nil
230}
231
232// Registration endpoints
233func (s *csiServer) GetInfo(ctx context.Context, req *pluginregistration.InfoRequest) (*pluginregistration.PluginInfo, error) {
234 return &pluginregistration.PluginInfo{
235 Type: "CSIPlugin",
236 Name: "com.smalltown.vfs",
237 Endpoint: pluginSocketPath,
238 SupportedVersions: []string{"1.2"}, // Keep in sync with container-storage-interface/spec package version
239 }, nil
240}
241
242func (s *csiServer) NotifyRegistrationStatus(ctx context.Context, req *pluginregistration.RegistrationStatus) (*pluginregistration.RegistrationStatusResponse, error) {
243 if req.Error != "" {
244 s.logger.Warn("Kubelet failed registering CSI plugin", zap.String("error", req.Error))
245 }
246 return &pluginregistration.RegistrationStatusResponse{}, nil
247}