blob: def1d6d67c583f165290d7311193c2e2c93897a0 [file] [log] [blame]
Lorenz Brun0db90ba2020-04-06 14:04:52 +02001// Copyright 2020 The Monogon Project Authors.
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17package kubernetes
18
19import (
20 "context"
21 "fmt"
22 "net"
23 "os"
24 "path/filepath"
25 "regexp"
26
Serge Bazanskic7359672020-10-30 16:38:57 +010027 "git.monogon.dev/source/nexantic.git/core/pkg/logtree"
28
Lorenz Brun0db90ba2020-04-06 14:04:52 +020029 "github.com/container-storage-interface/spec/lib/go/csi"
30 "github.com/golang/protobuf/ptypes/wrappers"
Lorenz Brun0db90ba2020-04-06 14:04:52 +020031 "golang.org/x/sys/unix"
32 "google.golang.org/grpc"
33 "google.golang.org/grpc/codes"
34 "google.golang.org/grpc/status"
35 pluginregistration "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
36
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020037 "git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
38 "git.monogon.dev/source/nexantic.git/core/internal/localstorage"
Lorenz Brun0db90ba2020-04-06 14:04:52 +020039 "git.monogon.dev/source/nexantic.git/core/pkg/fsquota"
40)
41
42// Derived from K8s spec for acceptable names, but shortened to 130 characters to avoid issues with
43// maximum path length. We don't provision longer names so this applies only if you manually create
44// a volume with a name of more than 130 characters.
Lorenz Brunb15abad2020-04-16 11:17:12 +020045var acceptableNames = regexp.MustCompile("^[a-z][a-bz0-9-.]{0,128}[a-z0-9]$")
Lorenz Brun0db90ba2020-04-06 14:04:52 +020046
47const volumeDir = "volumes"
48
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020049type csiPluginServer struct {
50 KubeletDirectory *localstorage.DataKubernetesKubeletDirectory
51 VolumesDirectory *localstorage.DataVolumesDirectory
Lorenz Brun0db90ba2020-04-06 14:04:52 +020052
Serge Bazanskic7359672020-10-30 16:38:57 +010053 logger logtree.LeveledLogger
Lorenz Brun0db90ba2020-04-06 14:04:52 +020054}
55
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020056func (s *csiPluginServer) Run(ctx context.Context) error {
57 s.logger = supervisor.Logger(ctx)
Lorenz Brun0db90ba2020-04-06 14:04:52 +020058
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020059 pluginListener, err := net.ListenUnix("unix", &net.UnixAddr{Name: s.KubeletDirectory.Plugins.VFS.FullPath(), Net: "unix"})
60 if err != nil {
61 return fmt.Errorf("failed to listen on CSI socket: %w", err)
Lorenz Brun0db90ba2020-04-06 14:04:52 +020062 }
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020063 pluginListener.SetUnlinkOnClose(true)
64
65 pluginServer := grpc.NewServer()
66 csi.RegisterIdentityServer(pluginServer, s)
67 csi.RegisterNodeServer(pluginServer, s)
68 // Enable graceful shutdown since we don't have long-running RPCs and most of them shouldn't and can't be
69 // cancelled anyways.
70 if err := supervisor.Run(ctx, "csi-node", supervisor.GRPCServer(pluginServer, pluginListener, true)); err != nil {
71 return err
72 }
73
74 registrationListener, err := net.ListenUnix("unix", &net.UnixAddr{Name: s.KubeletDirectory.PluginsRegistry.VFSReg.FullPath(), Net: "unix"})
75 if err != nil {
76 return fmt.Errorf("failed to listen on CSI registration socket: %w", err)
77 }
78 registrationListener.SetUnlinkOnClose(true)
79
80 registrationServer := grpc.NewServer()
81 pluginregistration.RegisterRegistrationServer(registrationServer, s)
82 if err := supervisor.Run(ctx, "registration", supervisor.GRPCServer(registrationServer, registrationListener, true)); err != nil {
83 return err
84 }
85 supervisor.Signal(ctx, supervisor.SignalHealthy)
86 supervisor.Signal(ctx, supervisor.SignalDone)
87 return nil
Lorenz Brun0db90ba2020-04-06 14:04:52 +020088}
89
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020090func (*csiPluginServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +020091 return nil, status.Errorf(codes.Unimplemented, "method NodeStageVolume not supported")
92}
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020093
94func (*csiPluginServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +020095 return nil, status.Errorf(codes.Unimplemented, "method NodeUnstageVolume not supported")
96}
97
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020098func (s *csiPluginServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +020099 if !acceptableNames.MatchString(req.VolumeId) {
100 return nil, status.Error(codes.InvalidArgument, "invalid characters in volume id")
101 }
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200102
103 // TODO(q3k): move this logic to localstorage?
104 volumePath := filepath.Join(s.VolumesDirectory.FullPath(), req.VolumeId)
105
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200106 switch req.VolumeCapability.AccessMode.Mode {
107 case csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER:
108 case csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY:
109 default:
110 return nil, status.Error(codes.InvalidArgument, "unsupported access mode")
111 }
112 switch req.VolumeCapability.AccessType.(type) {
113 case *csi.VolumeCapability_Mount:
114 default:
115 return nil, status.Error(codes.InvalidArgument, "unsupported access type")
116 }
117
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200118 err := unix.Mount(volumePath, req.TargetPath, "", unix.MS_BIND, "")
119 switch {
120 case err == unix.ENOENT:
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200121 return nil, status.Error(codes.NotFound, "volume not found")
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200122 case err != nil:
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200123 return nil, status.Errorf(codes.Unavailable, "failed to bind-mount volume: %v", err)
124 }
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200125
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200126 if req.Readonly {
127 err := unix.Mount(volumePath, req.TargetPath, "", unix.MS_BIND|unix.MS_REMOUNT|unix.MS_RDONLY, "")
128 if err != nil {
129 _ = unix.Unmount(req.TargetPath, 0) // Best-effort
130 return nil, status.Errorf(codes.Unavailable, "failed to remount volume: %v", err)
131 }
132 }
133 return &csi.NodePublishVolumeResponse{}, nil
134}
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200135
136func (*csiPluginServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200137 if err := unix.Unmount(req.TargetPath, 0); err != nil {
138 return nil, status.Errorf(codes.Unavailable, "failed to unmount volume: %v", err)
139 }
140 return &csi.NodeUnpublishVolumeResponse{}, nil
141}
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200142
143func (*csiPluginServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200144 quota, err := fsquota.GetQuota(req.VolumePath)
145 if os.IsNotExist(err) {
146 return nil, status.Error(codes.NotFound, "volume does not exist at this path")
147 } else if err != nil {
148 return nil, status.Errorf(codes.Unavailable, "failed to get quota: %v", err)
149 }
150
151 return &csi.NodeGetVolumeStatsResponse{
152 Usage: []*csi.VolumeUsage{
153 {
154 Total: int64(quota.Bytes),
155 Unit: csi.VolumeUsage_BYTES,
156 Used: int64(quota.BytesUsed),
157 Available: int64(quota.Bytes - quota.BytesUsed),
158 },
159 {
160 Total: int64(quota.Inodes),
161 Unit: csi.VolumeUsage_INODES,
162 Used: int64(quota.InodesUsed),
163 Available: int64(quota.Inodes - quota.InodesUsed),
164 },
165 },
166 }, nil
167}
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200168
169func (*csiPluginServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200170 if req.CapacityRange.LimitBytes <= 0 {
171 return nil, status.Error(codes.InvalidArgument, "invalid expanded volume size: at or below zero bytes")
172 }
173 if err := fsquota.SetQuota(req.VolumePath, uint64(req.CapacityRange.LimitBytes), 0); err != nil {
174 return nil, status.Errorf(codes.Unavailable, "failed to update quota: %v", err)
175 }
176 return &csi.NodeExpandVolumeResponse{CapacityBytes: req.CapacityRange.LimitBytes}, nil
177}
178
179func rpcCapability(cap csi.NodeServiceCapability_RPC_Type) *csi.NodeServiceCapability {
180 return &csi.NodeServiceCapability{
181 Type: &csi.NodeServiceCapability_Rpc{
182 Rpc: &csi.NodeServiceCapability_RPC{Type: cap},
183 },
184 }
185}
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200186
187func (*csiPluginServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200188 return &csi.NodeGetCapabilitiesResponse{
189 Capabilities: []*csi.NodeServiceCapability{
190 rpcCapability(csi.NodeServiceCapability_RPC_EXPAND_VOLUME),
191 rpcCapability(csi.NodeServiceCapability_RPC_GET_VOLUME_STATS),
192 },
193 }, nil
194}
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200195
196func (*csiPluginServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200197 hostname, err := os.Hostname()
198 if err != nil {
199 return nil, status.Errorf(codes.Unavailable, "failed to get node identity: %v", err)
200 }
201 return &csi.NodeGetInfoResponse{
202 NodeId: hostname,
203 }, nil
204}
205
206// CSI Identity endpoints
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200207func (*csiPluginServer) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200208 return &csi.GetPluginInfoResponse{
209 Name: "com.smalltown.vfs",
210 VendorVersion: "0.0.1", // TODO(lorenz): Maybe stamp?
211 }, nil
212}
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200213
214func (*csiPluginServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200215 return &csi.GetPluginCapabilitiesResponse{
216 Capabilities: []*csi.PluginCapability{
217 {
218 Type: &csi.PluginCapability_VolumeExpansion_{
219 VolumeExpansion: &csi.PluginCapability_VolumeExpansion{
220 Type: csi.PluginCapability_VolumeExpansion_ONLINE,
221 },
222 },
223 },
224 },
225 }, nil
226}
227
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200228func (s *csiPluginServer) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) {
229 return &csi.ProbeResponse{Ready: &wrappers.BoolValue{Value: true}}, nil
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200230}
231
232// Registration endpoints
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200233func (s *csiPluginServer) GetInfo(ctx context.Context, req *pluginregistration.InfoRequest) (*pluginregistration.PluginInfo, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200234 return &pluginregistration.PluginInfo{
235 Type: "CSIPlugin",
236 Name: "com.smalltown.vfs",
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200237 Endpoint: s.KubeletDirectory.Plugins.VFS.FullPath(),
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200238 SupportedVersions: []string{"1.2"}, // Keep in sync with container-storage-interface/spec package version
239 }, nil
240}
241
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200242func (s *csiPluginServer) NotifyRegistrationStatus(ctx context.Context, req *pluginregistration.RegistrationStatus) (*pluginregistration.RegistrationStatusResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200243 if req.Error != "" {
Serge Bazanskic7359672020-10-30 16:38:57 +0100244 s.logger.Warningf("Kubelet failed registering CSI plugin: %v", req.Error)
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200245 }
246 return &pluginregistration.RegistrationStatusResponse{}, nil
247}