blob: 48932540d22a5c1294b26c149582bd635fee4f11 [file] [log] [blame]
Lorenz Brun0db90ba2020-04-06 14:04:52 +02001// Copyright 2020 The Monogon Project Authors.
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17package kubernetes
18
19import (
20 "context"
21 "fmt"
22 "net"
23 "os"
24 "path/filepath"
25 "regexp"
26
Lorenz Brun0db90ba2020-04-06 14:04:52 +020027 "github.com/container-storage-interface/spec/lib/go/csi"
28 "github.com/golang/protobuf/ptypes/wrappers"
Lorenz Brun0db90ba2020-04-06 14:04:52 +020029 "golang.org/x/sys/unix"
30 "google.golang.org/grpc"
31 "google.golang.org/grpc/codes"
32 "google.golang.org/grpc/status"
Lorenz Brun37050122021-03-30 14:00:27 +020033 "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
Lorenz Brun0db90ba2020-04-06 14:04:52 +020034
Serge Bazanski31370b02021-01-07 16:31:14 +010035 "source.monogon.dev/metropolis/node/core/localstorage"
Serge Bazanski31370b02021-01-07 16:31:14 +010036 "source.monogon.dev/metropolis/pkg/fsquota"
Lorenz Brun4e090352021-03-17 17:44:41 +010037 "source.monogon.dev/metropolis/pkg/logtree"
Lorenz Brun37050122021-03-30 14:00:27 +020038 "source.monogon.dev/metropolis/pkg/loop"
Serge Bazanski31370b02021-01-07 16:31:14 +010039 "source.monogon.dev/metropolis/pkg/supervisor"
Lorenz Brun0db90ba2020-04-06 14:04:52 +020040)
41
// acceptableNames is the allow-list pattern for volume IDs. It is derived
// from the K8s spec for acceptable names, but capped at 130 characters to
// avoid issues with maximum path length. A matching name is 2 to 130
// characters long, starts with a lowercase letter, ends with a lowercase
// letter or digit, and otherwise contains only lowercase letters, digits, '-'
// and '.'. We don't provision longer names, so the cap only matters for
// volumes that were created manually with a name of more than 130 characters.
var acceptableNames = regexp.MustCompile(`^[a-z][a-z0-9-.]{0,128}[a-z0-9]$`)
Lorenz Brun0db90ba2020-04-06 14:04:52 +020047
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020048type csiPluginServer struct {
Lorenz Brun37050122021-03-30 14:00:27 +020049 *csi.UnimplementedNodeServer
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020050 KubeletDirectory *localstorage.DataKubernetesKubeletDirectory
51 VolumesDirectory *localstorage.DataVolumesDirectory
Lorenz Brun0db90ba2020-04-06 14:04:52 +020052
Serge Bazanskic7359672020-10-30 16:38:57 +010053 logger logtree.LeveledLogger
Lorenz Brun0db90ba2020-04-06 14:04:52 +020054}
55
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020056func (s *csiPluginServer) Run(ctx context.Context) error {
57 s.logger = supervisor.Logger(ctx)
Lorenz Brun0db90ba2020-04-06 14:04:52 +020058
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020059 pluginListener, err := net.ListenUnix("unix", &net.UnixAddr{Name: s.KubeletDirectory.Plugins.VFS.FullPath(), Net: "unix"})
60 if err != nil {
61 return fmt.Errorf("failed to listen on CSI socket: %w", err)
Lorenz Brun0db90ba2020-04-06 14:04:52 +020062 }
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020063 pluginListener.SetUnlinkOnClose(true)
64
65 pluginServer := grpc.NewServer()
66 csi.RegisterIdentityServer(pluginServer, s)
67 csi.RegisterNodeServer(pluginServer, s)
Serge Bazanski216fe7b2021-05-21 18:36:16 +020068 // Enable graceful shutdown since we don't have long-running RPCs and most
69 // of them shouldn't and can't be cancelled anyways.
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020070 if err := supervisor.Run(ctx, "csi-node", supervisor.GRPCServer(pluginServer, pluginListener, true)); err != nil {
71 return err
72 }
73
74 registrationListener, err := net.ListenUnix("unix", &net.UnixAddr{Name: s.KubeletDirectory.PluginsRegistry.VFSReg.FullPath(), Net: "unix"})
75 if err != nil {
76 return fmt.Errorf("failed to listen on CSI registration socket: %w", err)
77 }
78 registrationListener.SetUnlinkOnClose(true)
79
80 registrationServer := grpc.NewServer()
81 pluginregistration.RegisterRegistrationServer(registrationServer, s)
82 if err := supervisor.Run(ctx, "registration", supervisor.GRPCServer(registrationServer, registrationListener, true)); err != nil {
83 return err
84 }
85 supervisor.Signal(ctx, supervisor.SignalHealthy)
86 supervisor.Signal(ctx, supervisor.SignalDone)
87 return nil
Lorenz Brun0db90ba2020-04-06 14:04:52 +020088}
89
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020090func (s *csiPluginServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +020091 if !acceptableNames.MatchString(req.VolumeId) {
92 return nil, status.Error(codes.InvalidArgument, "invalid characters in volume id")
93 }
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020094
95 // TODO(q3k): move this logic to localstorage?
96 volumePath := filepath.Join(s.VolumesDirectory.FullPath(), req.VolumeId)
97
Lorenz Brun0db90ba2020-04-06 14:04:52 +020098 switch req.VolumeCapability.AccessMode.Mode {
99 case csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER:
100 case csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY:
101 default:
102 return nil, status.Error(codes.InvalidArgument, "unsupported access mode")
103 }
104 switch req.VolumeCapability.AccessType.(type) {
105 case *csi.VolumeCapability_Mount:
Lorenz Brun37050122021-03-30 14:00:27 +0200106 err := unix.Mount(volumePath, req.TargetPath, "", unix.MS_BIND, "")
107 switch {
108 case err == unix.ENOENT:
109 return nil, status.Error(codes.NotFound, "volume not found")
110 case err != nil:
111 return nil, status.Errorf(codes.Unavailable, "failed to bind-mount volume: %v", err)
112 }
113
114 if req.Readonly {
115 err := unix.Mount(volumePath, req.TargetPath, "", unix.MS_BIND|unix.MS_REMOUNT|unix.MS_RDONLY, "")
116 if err != nil {
117 _ = unix.Unmount(req.TargetPath, 0) // Best-effort
118 return nil, status.Errorf(codes.Unavailable, "failed to remount volume: %v", err)
119 }
120 }
121 case *csi.VolumeCapability_Block:
122 f, err := os.OpenFile(volumePath, os.O_RDWR, 0)
123 if err != nil {
124 return nil, status.Errorf(codes.Unavailable, "failed to open block volume: %v", err)
125 }
126 defer f.Close()
127 var flags uint32 = loop.FlagDirectIO
128 if req.Readonly {
129 flags |= loop.FlagReadOnly
130 }
131 loopdev, err := loop.Create(f, loop.Config{Flags: flags})
132 if err != nil {
133 return nil, status.Errorf(codes.Unavailable, "failed to create loop device: %v", err)
134 }
135 loopdevNum, err := loopdev.Dev()
136 if err != nil {
137 loopdev.Remove()
138 return nil, status.Errorf(codes.Internal, "device number not available: %v", err)
139 }
140 if err := unix.Mknod(req.TargetPath, unix.S_IFBLK|0640, int(loopdevNum)); err != nil {
141 loopdev.Remove()
142 return nil, status.Errorf(codes.Unavailable, "failed to create device node at target path: %v", err)
143 }
144 loopdev.Close()
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200145 default:
146 return nil, status.Error(codes.InvalidArgument, "unsupported access type")
147 }
148
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200149 return &csi.NodePublishVolumeResponse{}, nil
150}
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200151
Lorenz Brun37050122021-03-30 14:00:27 +0200152func (s *csiPluginServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
153 loopdev, err := loop.Open(req.TargetPath)
154 if err == nil {
155 defer loopdev.Close()
156 // We have a block device
157 if err := loopdev.Remove(); err != nil {
158 return nil, status.Errorf(codes.Unavailable, "failed to remove loop device: %v", err)
159 }
160 if err := os.Remove(req.TargetPath); err != nil && !os.IsNotExist(err) {
161 return nil, status.Errorf(codes.Unavailable, "failed to remove device inode: %v", err)
162 }
163 return &csi.NodeUnpublishVolumeResponse{}, nil
164 }
165 // Otherwise try a normal unmount
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200166 if err := unix.Unmount(req.TargetPath, 0); err != nil {
167 return nil, status.Errorf(codes.Unavailable, "failed to unmount volume: %v", err)
168 }
169 return &csi.NodeUnpublishVolumeResponse{}, nil
170}
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200171
172func (*csiPluginServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200173 quota, err := fsquota.GetQuota(req.VolumePath)
174 if os.IsNotExist(err) {
175 return nil, status.Error(codes.NotFound, "volume does not exist at this path")
176 } else if err != nil {
177 return nil, status.Errorf(codes.Unavailable, "failed to get quota: %v", err)
178 }
179
180 return &csi.NodeGetVolumeStatsResponse{
181 Usage: []*csi.VolumeUsage{
182 {
183 Total: int64(quota.Bytes),
184 Unit: csi.VolumeUsage_BYTES,
185 Used: int64(quota.BytesUsed),
186 Available: int64(quota.Bytes - quota.BytesUsed),
187 },
188 {
189 Total: int64(quota.Inodes),
190 Unit: csi.VolumeUsage_INODES,
191 Used: int64(quota.InodesUsed),
192 Available: int64(quota.Inodes - quota.InodesUsed),
193 },
194 },
195 }, nil
196}
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200197
Lorenz Brun37050122021-03-30 14:00:27 +0200198func (s *csiPluginServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200199 if req.CapacityRange.LimitBytes <= 0 {
200 return nil, status.Error(codes.InvalidArgument, "invalid expanded volume size: at or below zero bytes")
201 }
Lorenz Brun37050122021-03-30 14:00:27 +0200202 loopdev, err := loop.Open(req.VolumePath)
203 if err == nil {
204 defer loopdev.Close()
205 volumePath := filepath.Join(s.VolumesDirectory.FullPath(), req.VolumeId)
206 imageFile, err := os.OpenFile(volumePath, os.O_RDWR, 0)
207 if err != nil {
208 return nil, status.Errorf(codes.Unavailable, "failed to open block volume backing file: %v", err)
209 }
210 defer imageFile.Close()
211 if err := unix.Fallocate(int(imageFile.Fd()), 0, 0, req.CapacityRange.LimitBytes); err != nil {
212 return nil, status.Errorf(codes.Unavailable, "failed to expand volume using fallocate: %v", err)
213 }
214 if err := loopdev.RefreshSize(); err != nil {
215 return nil, status.Errorf(codes.Unavailable, "failed to refresh loop device size: %v", err)
216 }
217 return &csi.NodeExpandVolumeResponse{CapacityBytes: req.CapacityRange.LimitBytes}, nil
218 }
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200219 if err := fsquota.SetQuota(req.VolumePath, uint64(req.CapacityRange.LimitBytes), 0); err != nil {
220 return nil, status.Errorf(codes.Unavailable, "failed to update quota: %v", err)
221 }
222 return &csi.NodeExpandVolumeResponse{CapacityBytes: req.CapacityRange.LimitBytes}, nil
223}
224
225func rpcCapability(cap csi.NodeServiceCapability_RPC_Type) *csi.NodeServiceCapability {
226 return &csi.NodeServiceCapability{
227 Type: &csi.NodeServiceCapability_Rpc{
228 Rpc: &csi.NodeServiceCapability_RPC{Type: cap},
229 },
230 }
231}
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200232
233func (*csiPluginServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200234 return &csi.NodeGetCapabilitiesResponse{
235 Capabilities: []*csi.NodeServiceCapability{
236 rpcCapability(csi.NodeServiceCapability_RPC_EXPAND_VOLUME),
237 rpcCapability(csi.NodeServiceCapability_RPC_GET_VOLUME_STATS),
238 },
239 }, nil
240}
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200241
242func (*csiPluginServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200243 hostname, err := os.Hostname()
244 if err != nil {
245 return nil, status.Errorf(codes.Unavailable, "failed to get node identity: %v", err)
246 }
247 return &csi.NodeGetInfoResponse{
248 NodeId: hostname,
249 }, nil
250}
251
252// CSI Identity endpoints
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200253func (*csiPluginServer) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200254 return &csi.GetPluginInfoResponse{
Serge Bazanski662b5b32020-12-21 13:49:00 +0100255 Name: "dev.monogon.metropolis.vfs",
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200256 VendorVersion: "0.0.1", // TODO(lorenz): Maybe stamp?
257 }, nil
258}
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200259
260func (*csiPluginServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200261 return &csi.GetPluginCapabilitiesResponse{
262 Capabilities: []*csi.PluginCapability{
263 {
264 Type: &csi.PluginCapability_VolumeExpansion_{
265 VolumeExpansion: &csi.PluginCapability_VolumeExpansion{
266 Type: csi.PluginCapability_VolumeExpansion_ONLINE,
267 },
268 },
269 },
270 },
271 }, nil
272}
273
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200274func (s *csiPluginServer) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) {
275 return &csi.ProbeResponse{Ready: &wrappers.BoolValue{Value: true}}, nil
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200276}
277
278// Registration endpoints
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200279func (s *csiPluginServer) GetInfo(ctx context.Context, req *pluginregistration.InfoRequest) (*pluginregistration.PluginInfo, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200280 return &pluginregistration.PluginInfo{
Lorenz Brun4e090352021-03-17 17:44:41 +0100281 Type: pluginregistration.CSIPlugin,
Serge Bazanski662b5b32020-12-21 13:49:00 +0100282 Name: "dev.monogon.metropolis.vfs",
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200283 Endpoint: s.KubeletDirectory.Plugins.VFS.FullPath(),
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200284 SupportedVersions: []string{"1.2"}, // Keep in sync with container-storage-interface/spec package version
285 }, nil
286}
287
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200288func (s *csiPluginServer) NotifyRegistrationStatus(ctx context.Context, req *pluginregistration.RegistrationStatus) (*pluginregistration.RegistrationStatusResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200289 if req.Error != "" {
Serge Bazanskic7359672020-10-30 16:38:57 +0100290 s.logger.Warningf("Kubelet failed registering CSI plugin: %v", req.Error)
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200291 }
292 return &pluginregistration.RegistrationStatusResponse{}, nil
293}