blob: eb43ec01278a5c83863914e7b6bb087a85a301fe [file] [log] [blame]
Lorenz Brun0db90ba2020-04-06 14:04:52 +02001// Copyright 2020 The Monogon Project Authors.
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17package kubernetes
18
19import (
20 "context"
Tim Windelschmidtd5f851b2024-04-23 14:59:37 +020021 "errors"
Lorenz Brun0db90ba2020-04-06 14:04:52 +020022 "fmt"
23 "net"
24 "os"
25 "path/filepath"
26 "regexp"
27
Lorenz Brun0db90ba2020-04-06 14:04:52 +020028 "github.com/container-storage-interface/spec/lib/go/csi"
Lorenz Brun0db90ba2020-04-06 14:04:52 +020029 "golang.org/x/sys/unix"
30 "google.golang.org/grpc"
31 "google.golang.org/grpc/codes"
32 "google.golang.org/grpc/status"
Lorenz Brun65702192023-08-31 16:27:38 +020033 "google.golang.org/protobuf/types/known/wrapperspb"
Lorenz Brun6211e4d2023-11-14 19:09:40 +010034 pluginregistration "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
Lorenz Brun0db90ba2020-04-06 14:04:52 +020035
Serge Bazanski3c5d0632024-09-12 10:49:12 +000036 "source.monogon.dev/go/logging"
Serge Bazanski31370b02021-01-07 16:31:14 +010037 "source.monogon.dev/metropolis/node/core/localstorage"
Tim Windelschmidt9f21f532024-05-07 15:14:20 +020038 "source.monogon.dev/osbase/fsquota"
Tim Windelschmidt9f21f532024-05-07 15:14:20 +020039 "source.monogon.dev/osbase/loop"
40 "source.monogon.dev/osbase/supervisor"
Lorenz Brun0db90ba2020-04-06 14:04:52 +020041)
42
// acceptableNames is the pattern a volume ID has to match before it is joined
// onto the volumes directory path. It is derived from the K8s spec for
// acceptable names, but shortened to 130 characters to avoid issues with
// maximum path length. We don't provision longer names, so the length limit
// only matters if you manually create a volume with a name of more than 130
// characters.
//
// A matching name starts with a lowercase letter, ends with a lowercase
// letter or digit and is at least two characters long.
var acceptableNames = regexp.MustCompile(`^[a-z][a-z0-9-.]{0,128}[a-z0-9]$`)
Lorenz Brun0db90ba2020-04-06 14:04:52 +020048
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020049type csiPluginServer struct {
Lorenz Brun37050122021-03-30 14:00:27 +020050 *csi.UnimplementedNodeServer
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020051 KubeletDirectory *localstorage.DataKubernetesKubeletDirectory
52 VolumesDirectory *localstorage.DataVolumesDirectory
Lorenz Brun0db90ba2020-04-06 14:04:52 +020053
Serge Bazanski3c5d0632024-09-12 10:49:12 +000054 logger logging.Leveled
Lorenz Brun0db90ba2020-04-06 14:04:52 +020055}
56
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020057func (s *csiPluginServer) Run(ctx context.Context) error {
58 s.logger = supervisor.Logger(ctx)
Lorenz Brun0db90ba2020-04-06 14:04:52 +020059
Lorenz Brun4599aa22023-06-28 13:09:32 +020060 // Try to remove socket if an unclean shutdown happened.
61 os.Remove(s.KubeletDirectory.Plugins.VFS.FullPath())
62
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020063 pluginListener, err := net.ListenUnix("unix", &net.UnixAddr{Name: s.KubeletDirectory.Plugins.VFS.FullPath(), Net: "unix"})
64 if err != nil {
65 return fmt.Errorf("failed to listen on CSI socket: %w", err)
Lorenz Brun0db90ba2020-04-06 14:04:52 +020066 }
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020067
68 pluginServer := grpc.NewServer()
69 csi.RegisterIdentityServer(pluginServer, s)
70 csi.RegisterNodeServer(pluginServer, s)
Serge Bazanski216fe7b2021-05-21 18:36:16 +020071 // Enable graceful shutdown since we don't have long-running RPCs and most
72 // of them shouldn't and can't be cancelled anyways.
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020073 if err := supervisor.Run(ctx, "csi-node", supervisor.GRPCServer(pluginServer, pluginListener, true)); err != nil {
74 return err
75 }
76
Lorenz Brun1dd0c652024-02-20 18:45:06 +010077 r := pluginRegistrationServer{
78 regErr: make(chan error, 1),
79 KubeletDirectory: s.KubeletDirectory,
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020080 }
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020081
Lorenz Brun1dd0c652024-02-20 18:45:06 +010082 if err := supervisor.Run(ctx, "registration", r.Run); err != nil {
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020083 return err
84 }
85 supervisor.Signal(ctx, supervisor.SignalHealthy)
86 supervisor.Signal(ctx, supervisor.SignalDone)
87 return nil
Lorenz Brun0db90ba2020-04-06 14:04:52 +020088}
89
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020090func (s *csiPluginServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +020091 if !acceptableNames.MatchString(req.VolumeId) {
92 return nil, status.Error(codes.InvalidArgument, "invalid characters in volume id")
93 }
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020094
95 // TODO(q3k): move this logic to localstorage?
96 volumePath := filepath.Join(s.VolumesDirectory.FullPath(), req.VolumeId)
97
Lorenz Brun0db90ba2020-04-06 14:04:52 +020098 switch req.VolumeCapability.AccessMode.Mode {
99 case csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER:
100 case csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY:
101 default:
102 return nil, status.Error(codes.InvalidArgument, "unsupported access mode")
103 }
Lorenz Brund1c392a2023-07-06 19:10:56 +0200104 if err := os.MkdirAll(req.TargetPath, 0700); err != nil {
105 return nil, status.Errorf(codes.Internal, "unable to create requested target path: %v", err)
106 }
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200107 switch req.VolumeCapability.AccessType.(type) {
108 case *csi.VolumeCapability_Mount:
Tim Windelschmidta8938da2024-09-13 22:34:01 +0200109 var mountFlags uintptr = unix.MS_BIND
110 if req.Readonly {
111 mountFlags |= unix.MS_RDONLY
112 }
113
114 err := unix.Mount(volumePath, req.TargetPath, "", mountFlags, "")
Lorenz Brun37050122021-03-30 14:00:27 +0200115 switch {
Tim Windelschmidtd5f851b2024-04-23 14:59:37 +0200116 case errors.Is(err, unix.ENOENT):
Lorenz Brun37050122021-03-30 14:00:27 +0200117 return nil, status.Error(codes.NotFound, "volume not found")
118 case err != nil:
119 return nil, status.Errorf(codes.Unavailable, "failed to bind-mount volume: %v", err)
120 }
121
Tim Windelschmidta8938da2024-09-13 22:34:01 +0200122 flagSet := make(map[string]bool)
123 for _, flag := range req.VolumeCapability.GetMount().GetMountFlags() {
124 flagSet[flag] = true
125 }
126
127 flagPairs := map[string]string{
128 "exec": "noexec",
129 "dev": "nodev",
130 "suid": "nosuid",
131 }
132 for pFlag, nFlag := range flagPairs {
133 if flagSet[pFlag] && flagSet[nFlag] {
134 return nil, status.Errorf(codes.InvalidArgument, "contradictory flag pair found. can't have both %q and %q set", pFlag, nFlag)
135 } else if !flagSet[pFlag] && !flagSet[nFlag] {
136 // If neither of a flag pair is found, add the negative flag as default.
137 flagSet[nFlag] = true
Lorenz Brun37050122021-03-30 14:00:27 +0200138 }
139 }
Tim Windelschmidta8938da2024-09-13 22:34:01 +0200140
141 var mountAttr unix.MountAttr
142 for flag := range flagSet {
143 switch flag {
144 case "exec":
Jan Schär652c2ad2024-11-19 17:40:50 +0100145 mountAttr.Attr_clr |= unix.MOUNT_ATTR_NOEXEC
Tim Windelschmidta8938da2024-09-13 22:34:01 +0200146 case "noexec":
Jan Schär652c2ad2024-11-19 17:40:50 +0100147 mountAttr.Attr_set |= unix.MOUNT_ATTR_NOEXEC
Tim Windelschmidta8938da2024-09-13 22:34:01 +0200148 case "dev":
Jan Schär652c2ad2024-11-19 17:40:50 +0100149 mountAttr.Attr_clr |= unix.MOUNT_ATTR_NODEV
Tim Windelschmidta8938da2024-09-13 22:34:01 +0200150 case "nodev":
Jan Schär652c2ad2024-11-19 17:40:50 +0100151 mountAttr.Attr_set |= unix.MOUNT_ATTR_NODEV
Tim Windelschmidta8938da2024-09-13 22:34:01 +0200152 case "suid":
Jan Schär652c2ad2024-11-19 17:40:50 +0100153 mountAttr.Attr_clr |= unix.MOUNT_ATTR_NOSUID
Tim Windelschmidta8938da2024-09-13 22:34:01 +0200154 case "nosuid":
Jan Schär652c2ad2024-11-19 17:40:50 +0100155 mountAttr.Attr_set |= unix.MOUNT_ATTR_NOSUID
Tim Windelschmidta8938da2024-09-13 22:34:01 +0200156 default:
157 return nil, status.Errorf(codes.InvalidArgument, "unknown mount flag: %s", flag)
158 }
159 }
160
161 if err := unix.MountSetattr(-1, req.TargetPath, 0, &mountAttr); err != nil {
162 _ = unix.Unmount(req.TargetPath, 0) // Best-effort
163 return nil, status.Errorf(codes.Internal, "unable to set mount attributes: %v", err)
164 }
Lorenz Brun37050122021-03-30 14:00:27 +0200165 case *csi.VolumeCapability_Block:
166 f, err := os.OpenFile(volumePath, os.O_RDWR, 0)
167 if err != nil {
168 return nil, status.Errorf(codes.Unavailable, "failed to open block volume: %v", err)
169 }
170 defer f.Close()
171 var flags uint32 = loop.FlagDirectIO
172 if req.Readonly {
173 flags |= loop.FlagReadOnly
174 }
175 loopdev, err := loop.Create(f, loop.Config{Flags: flags})
176 if err != nil {
177 return nil, status.Errorf(codes.Unavailable, "failed to create loop device: %v", err)
178 }
179 loopdevNum, err := loopdev.Dev()
180 if err != nil {
181 loopdev.Remove()
182 return nil, status.Errorf(codes.Internal, "device number not available: %v", err)
183 }
184 if err := unix.Mknod(req.TargetPath, unix.S_IFBLK|0640, int(loopdevNum)); err != nil {
185 loopdev.Remove()
186 return nil, status.Errorf(codes.Unavailable, "failed to create device node at target path: %v", err)
187 }
188 loopdev.Close()
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200189 default:
190 return nil, status.Error(codes.InvalidArgument, "unsupported access type")
191 }
192
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200193 return &csi.NodePublishVolumeResponse{}, nil
194}
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200195
Lorenz Brun37050122021-03-30 14:00:27 +0200196func (s *csiPluginServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
197 loopdev, err := loop.Open(req.TargetPath)
198 if err == nil {
199 defer loopdev.Close()
200 // We have a block device
201 if err := loopdev.Remove(); err != nil {
202 return nil, status.Errorf(codes.Unavailable, "failed to remove loop device: %v", err)
203 }
204 if err := os.Remove(req.TargetPath); err != nil && !os.IsNotExist(err) {
205 return nil, status.Errorf(codes.Unavailable, "failed to remove device inode: %v", err)
206 }
207 return &csi.NodeUnpublishVolumeResponse{}, nil
208 }
209 // Otherwise try a normal unmount
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200210 if err := unix.Unmount(req.TargetPath, 0); err != nil {
211 return nil, status.Errorf(codes.Unavailable, "failed to unmount volume: %v", err)
212 }
213 return &csi.NodeUnpublishVolumeResponse{}, nil
214}
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200215
216func (*csiPluginServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200217 quota, err := fsquota.GetQuota(req.VolumePath)
218 if os.IsNotExist(err) {
219 return nil, status.Error(codes.NotFound, "volume does not exist at this path")
220 } else if err != nil {
221 return nil, status.Errorf(codes.Unavailable, "failed to get quota: %v", err)
222 }
223
224 return &csi.NodeGetVolumeStatsResponse{
225 Usage: []*csi.VolumeUsage{
226 {
227 Total: int64(quota.Bytes),
228 Unit: csi.VolumeUsage_BYTES,
229 Used: int64(quota.BytesUsed),
230 Available: int64(quota.Bytes - quota.BytesUsed),
231 },
232 {
233 Total: int64(quota.Inodes),
234 Unit: csi.VolumeUsage_INODES,
235 Used: int64(quota.InodesUsed),
236 Available: int64(quota.Inodes - quota.InodesUsed),
237 },
238 },
239 }, nil
240}
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200241
Lorenz Brun37050122021-03-30 14:00:27 +0200242func (s *csiPluginServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200243 if req.CapacityRange.LimitBytes <= 0 {
244 return nil, status.Error(codes.InvalidArgument, "invalid expanded volume size: at or below zero bytes")
245 }
Lorenz Brun37050122021-03-30 14:00:27 +0200246 loopdev, err := loop.Open(req.VolumePath)
247 if err == nil {
248 defer loopdev.Close()
249 volumePath := filepath.Join(s.VolumesDirectory.FullPath(), req.VolumeId)
250 imageFile, err := os.OpenFile(volumePath, os.O_RDWR, 0)
251 if err != nil {
252 return nil, status.Errorf(codes.Unavailable, "failed to open block volume backing file: %v", err)
253 }
254 defer imageFile.Close()
255 if err := unix.Fallocate(int(imageFile.Fd()), 0, 0, req.CapacityRange.LimitBytes); err != nil {
256 return nil, status.Errorf(codes.Unavailable, "failed to expand volume using fallocate: %v", err)
257 }
258 if err := loopdev.RefreshSize(); err != nil {
259 return nil, status.Errorf(codes.Unavailable, "failed to refresh loop device size: %v", err)
260 }
261 return &csi.NodeExpandVolumeResponse{CapacityBytes: req.CapacityRange.LimitBytes}, nil
262 }
Lorenz Brun397f7ea2024-08-20 21:26:06 +0200263 if err := fsquota.SetQuota(req.VolumePath, uint64(req.CapacityRange.LimitBytes), uint64(req.CapacityRange.LimitBytes)/inodeCapacityRatio); err != nil {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200264 return nil, status.Errorf(codes.Unavailable, "failed to update quota: %v", err)
265 }
266 return &csi.NodeExpandVolumeResponse{CapacityBytes: req.CapacityRange.LimitBytes}, nil
267}
268
269func rpcCapability(cap csi.NodeServiceCapability_RPC_Type) *csi.NodeServiceCapability {
270 return &csi.NodeServiceCapability{
271 Type: &csi.NodeServiceCapability_Rpc{
272 Rpc: &csi.NodeServiceCapability_RPC{Type: cap},
273 },
274 }
275}
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200276
277func (*csiPluginServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200278 return &csi.NodeGetCapabilitiesResponse{
279 Capabilities: []*csi.NodeServiceCapability{
280 rpcCapability(csi.NodeServiceCapability_RPC_EXPAND_VOLUME),
281 rpcCapability(csi.NodeServiceCapability_RPC_GET_VOLUME_STATS),
282 },
283 }, nil
284}
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200285
286func (*csiPluginServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200287 hostname, err := os.Hostname()
288 if err != nil {
289 return nil, status.Errorf(codes.Unavailable, "failed to get node identity: %v", err)
290 }
291 return &csi.NodeGetInfoResponse{
292 NodeId: hostname,
293 }, nil
294}
295
296// CSI Identity endpoints
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200297func (*csiPluginServer) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200298 return &csi.GetPluginInfoResponse{
Serge Bazanski662b5b32020-12-21 13:49:00 +0100299 Name: "dev.monogon.metropolis.vfs",
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200300 VendorVersion: "0.0.1", // TODO(lorenz): Maybe stamp?
301 }, nil
302}
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200303
304func (*csiPluginServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200305 return &csi.GetPluginCapabilitiesResponse{
306 Capabilities: []*csi.PluginCapability{
307 {
308 Type: &csi.PluginCapability_VolumeExpansion_{
309 VolumeExpansion: &csi.PluginCapability_VolumeExpansion{
310 Type: csi.PluginCapability_VolumeExpansion_ONLINE,
311 },
312 },
313 },
314 },
315 }, nil
316}
317
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200318func (s *csiPluginServer) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) {
Lorenz Brun65702192023-08-31 16:27:38 +0200319 return &csi.ProbeResponse{Ready: &wrapperspb.BoolValue{Value: true}}, nil
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200320}
321
Lorenz Brun1dd0c652024-02-20 18:45:06 +0100322// pluginRegistrationServer implements the pluginregistration.Registration
323// service. It has a special restart mechanic to accomodate a design issue
324// in Kubelet which requires it to remove and recreate its gRPC socket for
325// every new registration attempt.
326type pluginRegistrationServer struct {
327 // regErr has a buffer of 1, so that at least one error can always be
328 // sent into it in a non-blocking way. There is a race if
329 // NotifyRegistrationStatus is called twice with an error as the buffered
330 // item might have been received but not fully processed yet.
331 // As distinguishing between calls on different socket iterations is
332 // hard, doing it this way errs on the side of caution, i.e.
333 // generating too many restarts. This way is better as if we miss one
334 // such error the registration will not be available until the node
335 // gets restarted.
336 regErr chan error
337
338 KubeletDirectory *localstorage.DataKubernetesKubeletDirectory
339}
340
341func (r *pluginRegistrationServer) Run(ctx context.Context) error {
342 // Remove registration socket if it exists
343 os.Remove(r.KubeletDirectory.PluginsRegistry.VFSReg.FullPath())
344
345 registrationListener, err := net.ListenUnix("unix", &net.UnixAddr{Name: r.KubeletDirectory.PluginsRegistry.VFSReg.FullPath(), Net: "unix"})
346 if err != nil {
347 return fmt.Errorf("failed to listen on CSI registration socket: %w", err)
348 }
349 defer registrationListener.Close()
350
351 grpcS := grpc.NewServer()
352 pluginregistration.RegisterRegistrationServer(grpcS, r)
353
354 supervisor.Run(ctx, "rpc", supervisor.GRPCServer(grpcS, registrationListener, true))
355 supervisor.Signal(ctx, supervisor.SignalHealthy)
356 select {
357 case <-ctx.Done():
358 return ctx.Err()
359 case err = <-r.regErr:
360 return err
361 }
362}
363
364func (r *pluginRegistrationServer) GetInfo(ctx context.Context, req *pluginregistration.InfoRequest) (*pluginregistration.PluginInfo, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200365 return &pluginregistration.PluginInfo{
Lorenz Brun4e090352021-03-17 17:44:41 +0100366 Type: pluginregistration.CSIPlugin,
Serge Bazanski662b5b32020-12-21 13:49:00 +0100367 Name: "dev.monogon.metropolis.vfs",
Lorenz Brun1dd0c652024-02-20 18:45:06 +0100368 Endpoint: r.KubeletDirectory.Plugins.VFS.FullPath(),
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200369 SupportedVersions: []string{"1.2"}, // Keep in sync with container-storage-interface/spec package version
370 }, nil
371}
372
Lorenz Brun1dd0c652024-02-20 18:45:06 +0100373func (r *pluginRegistrationServer) NotifyRegistrationStatus(ctx context.Context, req *pluginregistration.RegistrationStatus) (*pluginregistration.RegistrationStatusResponse, error) {
374 if !req.PluginRegistered {
375 select {
376 case r.regErr <- fmt.Errorf("registration failed: %v", req.Error):
377 default:
378 }
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200379 }
380 return &pluginregistration.RegistrationStatusResponse{}, nil
381}