blob: f150a13da9519927026413649c5dd64349b16e28 [file] [log] [blame]
Lorenz Brun0db90ba2020-04-06 14:04:52 +02001// Copyright 2020 The Monogon Project Authors.
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17package kubernetes
18
19import (
20 "context"
21 "fmt"
22 "net"
23 "os"
24 "path/filepath"
25 "regexp"
26
Lorenz Brun0db90ba2020-04-06 14:04:52 +020027 "github.com/container-storage-interface/spec/lib/go/csi"
Lorenz Brun0db90ba2020-04-06 14:04:52 +020028 "golang.org/x/sys/unix"
29 "google.golang.org/grpc"
30 "google.golang.org/grpc/codes"
31 "google.golang.org/grpc/status"
Lorenz Brun65702192023-08-31 16:27:38 +020032 "google.golang.org/protobuf/types/known/wrapperspb"
Lorenz Brun6211e4d2023-11-14 19:09:40 +010033 pluginregistration "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
Lorenz Brun0db90ba2020-04-06 14:04:52 +020034
Serge Bazanski31370b02021-01-07 16:31:14 +010035 "source.monogon.dev/metropolis/node/core/localstorage"
Serge Bazanski31370b02021-01-07 16:31:14 +010036 "source.monogon.dev/metropolis/pkg/fsquota"
Lorenz Brun4e090352021-03-17 17:44:41 +010037 "source.monogon.dev/metropolis/pkg/logtree"
Lorenz Brun37050122021-03-30 14:00:27 +020038 "source.monogon.dev/metropolis/pkg/loop"
Serge Bazanski31370b02021-01-07 16:31:14 +010039 "source.monogon.dev/metropolis/pkg/supervisor"
Lorenz Brun0db90ba2020-04-06 14:04:52 +020040)
41
// acceptableNames is the pattern a volume ID has to match before it is used
// as a path component. It is derived from the Kubernetes rules for acceptable
// names, but caps the length at 130 characters to avoid issues with the
// maximum path length. We never provision names that long ourselves, so the
// cap only matters for volumes created manually with a longer name.
var acceptableNames = regexp.MustCompile(`^[a-z][a-z0-9-.]{0,128}[a-z0-9]$`)
Lorenz Brun0db90ba2020-04-06 14:04:52 +020047
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020048type csiPluginServer struct {
Lorenz Brun37050122021-03-30 14:00:27 +020049 *csi.UnimplementedNodeServer
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020050 KubeletDirectory *localstorage.DataKubernetesKubeletDirectory
51 VolumesDirectory *localstorage.DataVolumesDirectory
Lorenz Brun0db90ba2020-04-06 14:04:52 +020052
Serge Bazanskic7359672020-10-30 16:38:57 +010053 logger logtree.LeveledLogger
Lorenz Brun0db90ba2020-04-06 14:04:52 +020054}
55
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020056func (s *csiPluginServer) Run(ctx context.Context) error {
57 s.logger = supervisor.Logger(ctx)
Lorenz Brun0db90ba2020-04-06 14:04:52 +020058
Lorenz Brun4599aa22023-06-28 13:09:32 +020059 // Try to remove socket if an unclean shutdown happened.
60 os.Remove(s.KubeletDirectory.Plugins.VFS.FullPath())
61
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020062 pluginListener, err := net.ListenUnix("unix", &net.UnixAddr{Name: s.KubeletDirectory.Plugins.VFS.FullPath(), Net: "unix"})
63 if err != nil {
64 return fmt.Errorf("failed to listen on CSI socket: %w", err)
Lorenz Brun0db90ba2020-04-06 14:04:52 +020065 }
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020066
67 pluginServer := grpc.NewServer()
68 csi.RegisterIdentityServer(pluginServer, s)
69 csi.RegisterNodeServer(pluginServer, s)
Serge Bazanski216fe7b2021-05-21 18:36:16 +020070 // Enable graceful shutdown since we don't have long-running RPCs and most
71 // of them shouldn't and can't be cancelled anyways.
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020072 if err := supervisor.Run(ctx, "csi-node", supervisor.GRPCServer(pluginServer, pluginListener, true)); err != nil {
73 return err
74 }
75
Lorenz Brun1dd0c652024-02-20 18:45:06 +010076 r := pluginRegistrationServer{
77 regErr: make(chan error, 1),
78 KubeletDirectory: s.KubeletDirectory,
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020079 }
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020080
Lorenz Brun1dd0c652024-02-20 18:45:06 +010081 if err := supervisor.Run(ctx, "registration", r.Run); err != nil {
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020082 return err
83 }
84 supervisor.Signal(ctx, supervisor.SignalHealthy)
85 supervisor.Signal(ctx, supervisor.SignalDone)
86 return nil
Lorenz Brun0db90ba2020-04-06 14:04:52 +020087}
88
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020089func (s *csiPluginServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +020090 if !acceptableNames.MatchString(req.VolumeId) {
91 return nil, status.Error(codes.InvalidArgument, "invalid characters in volume id")
92 }
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020093
94 // TODO(q3k): move this logic to localstorage?
95 volumePath := filepath.Join(s.VolumesDirectory.FullPath(), req.VolumeId)
96
Lorenz Brun0db90ba2020-04-06 14:04:52 +020097 switch req.VolumeCapability.AccessMode.Mode {
98 case csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER:
99 case csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY:
100 default:
101 return nil, status.Error(codes.InvalidArgument, "unsupported access mode")
102 }
Lorenz Brund1c392a2023-07-06 19:10:56 +0200103 if err := os.MkdirAll(req.TargetPath, 0700); err != nil {
104 return nil, status.Errorf(codes.Internal, "unable to create requested target path: %v", err)
105 }
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200106 switch req.VolumeCapability.AccessType.(type) {
107 case *csi.VolumeCapability_Mount:
Lorenz Brun37050122021-03-30 14:00:27 +0200108 err := unix.Mount(volumePath, req.TargetPath, "", unix.MS_BIND, "")
109 switch {
110 case err == unix.ENOENT:
111 return nil, status.Error(codes.NotFound, "volume not found")
112 case err != nil:
113 return nil, status.Errorf(codes.Unavailable, "failed to bind-mount volume: %v", err)
114 }
115
116 if req.Readonly {
117 err := unix.Mount(volumePath, req.TargetPath, "", unix.MS_BIND|unix.MS_REMOUNT|unix.MS_RDONLY, "")
118 if err != nil {
119 _ = unix.Unmount(req.TargetPath, 0) // Best-effort
120 return nil, status.Errorf(codes.Unavailable, "failed to remount volume: %v", err)
121 }
122 }
123 case *csi.VolumeCapability_Block:
124 f, err := os.OpenFile(volumePath, os.O_RDWR, 0)
125 if err != nil {
126 return nil, status.Errorf(codes.Unavailable, "failed to open block volume: %v", err)
127 }
128 defer f.Close()
129 var flags uint32 = loop.FlagDirectIO
130 if req.Readonly {
131 flags |= loop.FlagReadOnly
132 }
133 loopdev, err := loop.Create(f, loop.Config{Flags: flags})
134 if err != nil {
135 return nil, status.Errorf(codes.Unavailable, "failed to create loop device: %v", err)
136 }
137 loopdevNum, err := loopdev.Dev()
138 if err != nil {
139 loopdev.Remove()
140 return nil, status.Errorf(codes.Internal, "device number not available: %v", err)
141 }
142 if err := unix.Mknod(req.TargetPath, unix.S_IFBLK|0640, int(loopdevNum)); err != nil {
143 loopdev.Remove()
144 return nil, status.Errorf(codes.Unavailable, "failed to create device node at target path: %v", err)
145 }
146 loopdev.Close()
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200147 default:
148 return nil, status.Error(codes.InvalidArgument, "unsupported access type")
149 }
150
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200151 return &csi.NodePublishVolumeResponse{}, nil
152}
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200153
Lorenz Brun37050122021-03-30 14:00:27 +0200154func (s *csiPluginServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
155 loopdev, err := loop.Open(req.TargetPath)
156 if err == nil {
157 defer loopdev.Close()
158 // We have a block device
159 if err := loopdev.Remove(); err != nil {
160 return nil, status.Errorf(codes.Unavailable, "failed to remove loop device: %v", err)
161 }
162 if err := os.Remove(req.TargetPath); err != nil && !os.IsNotExist(err) {
163 return nil, status.Errorf(codes.Unavailable, "failed to remove device inode: %v", err)
164 }
165 return &csi.NodeUnpublishVolumeResponse{}, nil
166 }
167 // Otherwise try a normal unmount
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200168 if err := unix.Unmount(req.TargetPath, 0); err != nil {
169 return nil, status.Errorf(codes.Unavailable, "failed to unmount volume: %v", err)
170 }
171 return &csi.NodeUnpublishVolumeResponse{}, nil
172}
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200173
174func (*csiPluginServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200175 quota, err := fsquota.GetQuota(req.VolumePath)
176 if os.IsNotExist(err) {
177 return nil, status.Error(codes.NotFound, "volume does not exist at this path")
178 } else if err != nil {
179 return nil, status.Errorf(codes.Unavailable, "failed to get quota: %v", err)
180 }
181
182 return &csi.NodeGetVolumeStatsResponse{
183 Usage: []*csi.VolumeUsage{
184 {
185 Total: int64(quota.Bytes),
186 Unit: csi.VolumeUsage_BYTES,
187 Used: int64(quota.BytesUsed),
188 Available: int64(quota.Bytes - quota.BytesUsed),
189 },
190 {
191 Total: int64(quota.Inodes),
192 Unit: csi.VolumeUsage_INODES,
193 Used: int64(quota.InodesUsed),
194 Available: int64(quota.Inodes - quota.InodesUsed),
195 },
196 },
197 }, nil
198}
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200199
Lorenz Brun37050122021-03-30 14:00:27 +0200200func (s *csiPluginServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200201 if req.CapacityRange.LimitBytes <= 0 {
202 return nil, status.Error(codes.InvalidArgument, "invalid expanded volume size: at or below zero bytes")
203 }
Lorenz Brun37050122021-03-30 14:00:27 +0200204 loopdev, err := loop.Open(req.VolumePath)
205 if err == nil {
206 defer loopdev.Close()
207 volumePath := filepath.Join(s.VolumesDirectory.FullPath(), req.VolumeId)
208 imageFile, err := os.OpenFile(volumePath, os.O_RDWR, 0)
209 if err != nil {
210 return nil, status.Errorf(codes.Unavailable, "failed to open block volume backing file: %v", err)
211 }
212 defer imageFile.Close()
213 if err := unix.Fallocate(int(imageFile.Fd()), 0, 0, req.CapacityRange.LimitBytes); err != nil {
214 return nil, status.Errorf(codes.Unavailable, "failed to expand volume using fallocate: %v", err)
215 }
216 if err := loopdev.RefreshSize(); err != nil {
217 return nil, status.Errorf(codes.Unavailable, "failed to refresh loop device size: %v", err)
218 }
219 return &csi.NodeExpandVolumeResponse{CapacityBytes: req.CapacityRange.LimitBytes}, nil
220 }
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200221 if err := fsquota.SetQuota(req.VolumePath, uint64(req.CapacityRange.LimitBytes), 0); err != nil {
222 return nil, status.Errorf(codes.Unavailable, "failed to update quota: %v", err)
223 }
224 return &csi.NodeExpandVolumeResponse{CapacityBytes: req.CapacityRange.LimitBytes}, nil
225}
226
227func rpcCapability(cap csi.NodeServiceCapability_RPC_Type) *csi.NodeServiceCapability {
228 return &csi.NodeServiceCapability{
229 Type: &csi.NodeServiceCapability_Rpc{
230 Rpc: &csi.NodeServiceCapability_RPC{Type: cap},
231 },
232 }
233}
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200234
235func (*csiPluginServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200236 return &csi.NodeGetCapabilitiesResponse{
237 Capabilities: []*csi.NodeServiceCapability{
238 rpcCapability(csi.NodeServiceCapability_RPC_EXPAND_VOLUME),
239 rpcCapability(csi.NodeServiceCapability_RPC_GET_VOLUME_STATS),
240 },
241 }, nil
242}
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200243
244func (*csiPluginServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200245 hostname, err := os.Hostname()
246 if err != nil {
247 return nil, status.Errorf(codes.Unavailable, "failed to get node identity: %v", err)
248 }
249 return &csi.NodeGetInfoResponse{
250 NodeId: hostname,
251 }, nil
252}
253
254// CSI Identity endpoints
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200255func (*csiPluginServer) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200256 return &csi.GetPluginInfoResponse{
Serge Bazanski662b5b32020-12-21 13:49:00 +0100257 Name: "dev.monogon.metropolis.vfs",
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200258 VendorVersion: "0.0.1", // TODO(lorenz): Maybe stamp?
259 }, nil
260}
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200261
262func (*csiPluginServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200263 return &csi.GetPluginCapabilitiesResponse{
264 Capabilities: []*csi.PluginCapability{
265 {
266 Type: &csi.PluginCapability_VolumeExpansion_{
267 VolumeExpansion: &csi.PluginCapability_VolumeExpansion{
268 Type: csi.PluginCapability_VolumeExpansion_ONLINE,
269 },
270 },
271 },
272 },
273 }, nil
274}
275
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200276func (s *csiPluginServer) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) {
Lorenz Brun65702192023-08-31 16:27:38 +0200277 return &csi.ProbeResponse{Ready: &wrapperspb.BoolValue{Value: true}}, nil
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200278}
279
Lorenz Brun1dd0c652024-02-20 18:45:06 +0100280// pluginRegistrationServer implements the pluginregistration.Registration
281// service. It has a special restart mechanic to accomodate a design issue
282// in Kubelet which requires it to remove and recreate its gRPC socket for
283// every new registration attempt.
284type pluginRegistrationServer struct {
285 // regErr has a buffer of 1, so that at least one error can always be
286 // sent into it in a non-blocking way. There is a race if
287 // NotifyRegistrationStatus is called twice with an error as the buffered
288 // item might have been received but not fully processed yet.
289 // As distinguishing between calls on different socket iterations is
290 // hard, doing it this way errs on the side of caution, i.e.
291 // generating too many restarts. This way is better as if we miss one
292 // such error the registration will not be available until the node
293 // gets restarted.
294 regErr chan error
295
296 KubeletDirectory *localstorage.DataKubernetesKubeletDirectory
297}
298
299func (r *pluginRegistrationServer) Run(ctx context.Context) error {
300 // Remove registration socket if it exists
301 os.Remove(r.KubeletDirectory.PluginsRegistry.VFSReg.FullPath())
302
303 registrationListener, err := net.ListenUnix("unix", &net.UnixAddr{Name: r.KubeletDirectory.PluginsRegistry.VFSReg.FullPath(), Net: "unix"})
304 if err != nil {
305 return fmt.Errorf("failed to listen on CSI registration socket: %w", err)
306 }
307 defer registrationListener.Close()
308
309 grpcS := grpc.NewServer()
310 pluginregistration.RegisterRegistrationServer(grpcS, r)
311
312 supervisor.Run(ctx, "rpc", supervisor.GRPCServer(grpcS, registrationListener, true))
313 supervisor.Signal(ctx, supervisor.SignalHealthy)
314 select {
315 case <-ctx.Done():
316 return ctx.Err()
317 case err = <-r.regErr:
318 return err
319 }
320}
321
322func (r *pluginRegistrationServer) GetInfo(ctx context.Context, req *pluginregistration.InfoRequest) (*pluginregistration.PluginInfo, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200323 return &pluginregistration.PluginInfo{
Lorenz Brun4e090352021-03-17 17:44:41 +0100324 Type: pluginregistration.CSIPlugin,
Serge Bazanski662b5b32020-12-21 13:49:00 +0100325 Name: "dev.monogon.metropolis.vfs",
Lorenz Brun1dd0c652024-02-20 18:45:06 +0100326 Endpoint: r.KubeletDirectory.Plugins.VFS.FullPath(),
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200327 SupportedVersions: []string{"1.2"}, // Keep in sync with container-storage-interface/spec package version
328 }, nil
329}
330
Lorenz Brun1dd0c652024-02-20 18:45:06 +0100331func (r *pluginRegistrationServer) NotifyRegistrationStatus(ctx context.Context, req *pluginregistration.RegistrationStatus) (*pluginregistration.RegistrationStatusResponse, error) {
332 if !req.PluginRegistered {
333 select {
334 case r.regErr <- fmt.Errorf("registration failed: %v", req.Error):
335 default:
336 }
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200337 }
338 return &pluginregistration.RegistrationStatusResponse{}, nil
339}