blob: 92c700e1d16152361a685f8abb188eca70455f9e [file] [log] [blame]
Tim Windelschmidt6d33a432025-02-04 14:34:25 +01001// Copyright The Monogon Project Authors.
Lorenz Brun0db90ba2020-04-06 14:04:52 +02002// SPDX-License-Identifier: Apache-2.0
Lorenz Brun0db90ba2020-04-06 14:04:52 +02003
4package kubernetes
5
6import (
7 "context"
Tim Windelschmidtd5f851b2024-04-23 14:59:37 +02008 "errors"
Lorenz Brun0db90ba2020-04-06 14:04:52 +02009 "fmt"
10 "net"
11 "os"
12 "path/filepath"
13 "regexp"
14
Lorenz Brun0db90ba2020-04-06 14:04:52 +020015 "github.com/container-storage-interface/spec/lib/go/csi"
Lorenz Brun0db90ba2020-04-06 14:04:52 +020016 "golang.org/x/sys/unix"
17 "google.golang.org/grpc"
18 "google.golang.org/grpc/codes"
19 "google.golang.org/grpc/status"
Lorenz Brun65702192023-08-31 16:27:38 +020020 "google.golang.org/protobuf/types/known/wrapperspb"
Lorenz Brun6211e4d2023-11-14 19:09:40 +010021 pluginregistration "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
Lorenz Brun0db90ba2020-04-06 14:04:52 +020022
Serge Bazanski3c5d0632024-09-12 10:49:12 +000023 "source.monogon.dev/go/logging"
Serge Bazanski31370b02021-01-07 16:31:14 +010024 "source.monogon.dev/metropolis/node/core/localstorage"
Tim Windelschmidt9f21f532024-05-07 15:14:20 +020025 "source.monogon.dev/osbase/fsquota"
Tim Windelschmidt9f21f532024-05-07 15:14:20 +020026 "source.monogon.dev/osbase/loop"
27 "source.monogon.dev/osbase/supervisor"
Lorenz Brun0db90ba2020-04-06 14:04:52 +020028)
29
// acceptableNames matches the volume IDs this plugin is willing to handle.
// The pattern is derived from the K8s spec for acceptable names, but caps the
// length at 130 characters to avoid issues with maximum path length. We don't
// provision longer names, so the cap only matters for volumes created
// manually with a name of more than 130 characters. Note that the pattern
// needs at least two characters: a leading lowercase letter and a trailing
// lowercase letter or digit.
var acceptableNames = regexp.MustCompile(`^[a-z][a-z0-9-.]{0,128}[a-z0-9]$`)
Lorenz Brun0db90ba2020-04-06 14:04:52 +020035
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020036type csiPluginServer struct {
Lorenz Brun37050122021-03-30 14:00:27 +020037 *csi.UnimplementedNodeServer
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020038 KubeletDirectory *localstorage.DataKubernetesKubeletDirectory
39 VolumesDirectory *localstorage.DataVolumesDirectory
Lorenz Brun0db90ba2020-04-06 14:04:52 +020040
Serge Bazanski3c5d0632024-09-12 10:49:12 +000041 logger logging.Leveled
Lorenz Brun0db90ba2020-04-06 14:04:52 +020042}
43
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020044func (s *csiPluginServer) Run(ctx context.Context) error {
45 s.logger = supervisor.Logger(ctx)
Lorenz Brun0db90ba2020-04-06 14:04:52 +020046
Lorenz Brun4599aa22023-06-28 13:09:32 +020047 // Try to remove socket if an unclean shutdown happened.
48 os.Remove(s.KubeletDirectory.Plugins.VFS.FullPath())
49
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020050 pluginListener, err := net.ListenUnix("unix", &net.UnixAddr{Name: s.KubeletDirectory.Plugins.VFS.FullPath(), Net: "unix"})
51 if err != nil {
52 return fmt.Errorf("failed to listen on CSI socket: %w", err)
Lorenz Brun0db90ba2020-04-06 14:04:52 +020053 }
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020054
55 pluginServer := grpc.NewServer()
56 csi.RegisterIdentityServer(pluginServer, s)
57 csi.RegisterNodeServer(pluginServer, s)
Serge Bazanski216fe7b2021-05-21 18:36:16 +020058 // Enable graceful shutdown since we don't have long-running RPCs and most
59 // of them shouldn't and can't be cancelled anyways.
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020060 if err := supervisor.Run(ctx, "csi-node", supervisor.GRPCServer(pluginServer, pluginListener, true)); err != nil {
61 return err
62 }
63
Lorenz Brun1dd0c652024-02-20 18:45:06 +010064 r := pluginRegistrationServer{
65 regErr: make(chan error, 1),
66 KubeletDirectory: s.KubeletDirectory,
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020067 }
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020068
Lorenz Brun1dd0c652024-02-20 18:45:06 +010069 if err := supervisor.Run(ctx, "registration", r.Run); err != nil {
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020070 return err
71 }
72 supervisor.Signal(ctx, supervisor.SignalHealthy)
73 supervisor.Signal(ctx, supervisor.SignalDone)
74 return nil
Lorenz Brun0db90ba2020-04-06 14:04:52 +020075}
76
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020077func (s *csiPluginServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +020078 if !acceptableNames.MatchString(req.VolumeId) {
79 return nil, status.Error(codes.InvalidArgument, "invalid characters in volume id")
80 }
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020081
82 // TODO(q3k): move this logic to localstorage?
83 volumePath := filepath.Join(s.VolumesDirectory.FullPath(), req.VolumeId)
84
Lorenz Brun0db90ba2020-04-06 14:04:52 +020085 switch req.VolumeCapability.AccessMode.Mode {
86 case csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER:
87 case csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY:
88 default:
89 return nil, status.Error(codes.InvalidArgument, "unsupported access mode")
90 }
91 switch req.VolumeCapability.AccessType.(type) {
92 case *csi.VolumeCapability_Mount:
Jan Schärbe70c922024-11-21 11:16:03 +010093 if err := os.MkdirAll(req.TargetPath, 0700); err != nil {
94 return nil, status.Errorf(codes.Internal, "unable to create requested target path: %v", err)
95 }
96
Jan Schär73beb692024-11-27 17:47:09 +010097 err := unix.Mount(volumePath, req.TargetPath, "", unix.MS_BIND, "")
Lorenz Brun37050122021-03-30 14:00:27 +020098 switch {
Tim Windelschmidtd5f851b2024-04-23 14:59:37 +020099 case errors.Is(err, unix.ENOENT):
Lorenz Brun37050122021-03-30 14:00:27 +0200100 return nil, status.Error(codes.NotFound, "volume not found")
101 case err != nil:
102 return nil, status.Errorf(codes.Unavailable, "failed to bind-mount volume: %v", err)
103 }
104
Jan Schärff7452b2024-11-28 13:08:55 +0100105 var flags uintptr = unix.MS_REMOUNT | unix.MS_BIND | unix.MS_NOEXEC | unix.MS_NOSUID | unix.MS_NODEV
Jan Schär73beb692024-11-27 17:47:09 +0100106 if req.Readonly {
107 flags |= unix.MS_RDONLY
Tim Windelschmidta8938da2024-09-13 22:34:01 +0200108 }
Jan Schär73beb692024-11-27 17:47:09 +0100109 if err := unix.Mount("", req.TargetPath, "", flags, ""); err != nil {
Tim Windelschmidta8938da2024-09-13 22:34:01 +0200110 _ = unix.Unmount(req.TargetPath, 0) // Best-effort
Jan Schär73beb692024-11-27 17:47:09 +0100111 return nil, status.Errorf(codes.Internal, "unable to set mount-point flags: %v", err)
Tim Windelschmidta8938da2024-09-13 22:34:01 +0200112 }
Lorenz Brun37050122021-03-30 14:00:27 +0200113 case *csi.VolumeCapability_Block:
114 f, err := os.OpenFile(volumePath, os.O_RDWR, 0)
115 if err != nil {
116 return nil, status.Errorf(codes.Unavailable, "failed to open block volume: %v", err)
117 }
118 defer f.Close()
119 var flags uint32 = loop.FlagDirectIO
120 if req.Readonly {
121 flags |= loop.FlagReadOnly
122 }
123 loopdev, err := loop.Create(f, loop.Config{Flags: flags})
124 if err != nil {
125 return nil, status.Errorf(codes.Unavailable, "failed to create loop device: %v", err)
126 }
127 loopdevNum, err := loopdev.Dev()
128 if err != nil {
129 loopdev.Remove()
130 return nil, status.Errorf(codes.Internal, "device number not available: %v", err)
131 }
132 if err := unix.Mknod(req.TargetPath, unix.S_IFBLK|0640, int(loopdevNum)); err != nil {
133 loopdev.Remove()
134 return nil, status.Errorf(codes.Unavailable, "failed to create device node at target path: %v", err)
135 }
136 loopdev.Close()
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200137 default:
138 return nil, status.Error(codes.InvalidArgument, "unsupported access type")
139 }
140
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200141 return &csi.NodePublishVolumeResponse{}, nil
142}
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200143
Lorenz Brun37050122021-03-30 14:00:27 +0200144func (s *csiPluginServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
145 loopdev, err := loop.Open(req.TargetPath)
146 if err == nil {
147 defer loopdev.Close()
148 // We have a block device
149 if err := loopdev.Remove(); err != nil {
150 return nil, status.Errorf(codes.Unavailable, "failed to remove loop device: %v", err)
151 }
152 if err := os.Remove(req.TargetPath); err != nil && !os.IsNotExist(err) {
153 return nil, status.Errorf(codes.Unavailable, "failed to remove device inode: %v", err)
154 }
155 return &csi.NodeUnpublishVolumeResponse{}, nil
156 }
157 // Otherwise try a normal unmount
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200158 if err := unix.Unmount(req.TargetPath, 0); err != nil {
159 return nil, status.Errorf(codes.Unavailable, "failed to unmount volume: %v", err)
160 }
161 return &csi.NodeUnpublishVolumeResponse{}, nil
162}
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200163
164func (*csiPluginServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200165 quota, err := fsquota.GetQuota(req.VolumePath)
166 if os.IsNotExist(err) {
167 return nil, status.Error(codes.NotFound, "volume does not exist at this path")
168 } else if err != nil {
169 return nil, status.Errorf(codes.Unavailable, "failed to get quota: %v", err)
170 }
171
172 return &csi.NodeGetVolumeStatsResponse{
173 Usage: []*csi.VolumeUsage{
174 {
175 Total: int64(quota.Bytes),
176 Unit: csi.VolumeUsage_BYTES,
177 Used: int64(quota.BytesUsed),
178 Available: int64(quota.Bytes - quota.BytesUsed),
179 },
180 {
181 Total: int64(quota.Inodes),
182 Unit: csi.VolumeUsage_INODES,
183 Used: int64(quota.InodesUsed),
184 Available: int64(quota.Inodes - quota.InodesUsed),
185 },
186 },
187 }, nil
188}
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200189
Lorenz Brun37050122021-03-30 14:00:27 +0200190func (s *csiPluginServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
Jan Schärb00f7f92025-03-06 17:27:22 +0100191 info, err := os.Stat(req.VolumePath)
192 if err != nil {
193 return nil, status.Errorf(codes.Unavailable, "failed to stat volume: %v", err)
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200194 }
Jan Schärb00f7f92025-03-06 17:27:22 +0100195 if info.IsDir() {
196 // Mount volume. Nothing to do here.
197 } else {
198 // Block volume.
199 loopdev, err := loop.Open(req.VolumePath)
Lorenz Brun37050122021-03-30 14:00:27 +0200200 if err != nil {
Jan Schärb00f7f92025-03-06 17:27:22 +0100201 return nil, status.Errorf(codes.Unavailable, "failed to open loop device: %v", err)
Lorenz Brun37050122021-03-30 14:00:27 +0200202 }
Jan Schärb00f7f92025-03-06 17:27:22 +0100203 defer loopdev.Close()
Lorenz Brun37050122021-03-30 14:00:27 +0200204 if err := loopdev.RefreshSize(); err != nil {
205 return nil, status.Errorf(codes.Unavailable, "failed to refresh loop device size: %v", err)
206 }
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200207 }
208 return &csi.NodeExpandVolumeResponse{CapacityBytes: req.CapacityRange.LimitBytes}, nil
209}
210
211func rpcCapability(cap csi.NodeServiceCapability_RPC_Type) *csi.NodeServiceCapability {
212 return &csi.NodeServiceCapability{
213 Type: &csi.NodeServiceCapability_Rpc{
214 Rpc: &csi.NodeServiceCapability_RPC{Type: cap},
215 },
216 }
217}
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200218
219func (*csiPluginServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200220 return &csi.NodeGetCapabilitiesResponse{
221 Capabilities: []*csi.NodeServiceCapability{
222 rpcCapability(csi.NodeServiceCapability_RPC_EXPAND_VOLUME),
223 rpcCapability(csi.NodeServiceCapability_RPC_GET_VOLUME_STATS),
224 },
225 }, nil
226}
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200227
228func (*csiPluginServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200229 hostname, err := os.Hostname()
230 if err != nil {
231 return nil, status.Errorf(codes.Unavailable, "failed to get node identity: %v", err)
232 }
233 return &csi.NodeGetInfoResponse{
234 NodeId: hostname,
235 }, nil
236}
237
238// CSI Identity endpoints
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200239func (*csiPluginServer) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200240 return &csi.GetPluginInfoResponse{
Serge Bazanski662b5b32020-12-21 13:49:00 +0100241 Name: "dev.monogon.metropolis.vfs",
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200242 VendorVersion: "0.0.1", // TODO(lorenz): Maybe stamp?
243 }, nil
244}
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200245
246func (*csiPluginServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200247 return &csi.GetPluginCapabilitiesResponse{
248 Capabilities: []*csi.PluginCapability{
249 {
250 Type: &csi.PluginCapability_VolumeExpansion_{
251 VolumeExpansion: &csi.PluginCapability_VolumeExpansion{
252 Type: csi.PluginCapability_VolumeExpansion_ONLINE,
253 },
254 },
255 },
256 },
257 }, nil
258}
259
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200260func (s *csiPluginServer) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) {
Lorenz Brun65702192023-08-31 16:27:38 +0200261 return &csi.ProbeResponse{Ready: &wrapperspb.BoolValue{Value: true}}, nil
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200262}
263
Lorenz Brun1dd0c652024-02-20 18:45:06 +0100264// pluginRegistrationServer implements the pluginregistration.Registration
265// service. It has a special restart mechanic to accomodate a design issue
266// in Kubelet which requires it to remove and recreate its gRPC socket for
267// every new registration attempt.
268type pluginRegistrationServer struct {
Tim Windelschmidtddbb9682025-10-06 21:58:26 +0200269 pluginregistration.UnimplementedRegistrationServer
270
Lorenz Brun1dd0c652024-02-20 18:45:06 +0100271 // regErr has a buffer of 1, so that at least one error can always be
272 // sent into it in a non-blocking way. There is a race if
273 // NotifyRegistrationStatus is called twice with an error as the buffered
274 // item might have been received but not fully processed yet.
275 // As distinguishing between calls on different socket iterations is
276 // hard, doing it this way errs on the side of caution, i.e.
277 // generating too many restarts. This way is better as if we miss one
278 // such error the registration will not be available until the node
279 // gets restarted.
280 regErr chan error
281
282 KubeletDirectory *localstorage.DataKubernetesKubeletDirectory
283}
284
285func (r *pluginRegistrationServer) Run(ctx context.Context) error {
286 // Remove registration socket if it exists
287 os.Remove(r.KubeletDirectory.PluginsRegistry.VFSReg.FullPath())
288
289 registrationListener, err := net.ListenUnix("unix", &net.UnixAddr{Name: r.KubeletDirectory.PluginsRegistry.VFSReg.FullPath(), Net: "unix"})
290 if err != nil {
291 return fmt.Errorf("failed to listen on CSI registration socket: %w", err)
292 }
293 defer registrationListener.Close()
294
295 grpcS := grpc.NewServer()
296 pluginregistration.RegisterRegistrationServer(grpcS, r)
297
298 supervisor.Run(ctx, "rpc", supervisor.GRPCServer(grpcS, registrationListener, true))
299 supervisor.Signal(ctx, supervisor.SignalHealthy)
300 select {
301 case <-ctx.Done():
302 return ctx.Err()
303 case err = <-r.regErr:
304 return err
305 }
306}
307
308func (r *pluginRegistrationServer) GetInfo(ctx context.Context, req *pluginregistration.InfoRequest) (*pluginregistration.PluginInfo, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200309 return &pluginregistration.PluginInfo{
Lorenz Brun4e090352021-03-17 17:44:41 +0100310 Type: pluginregistration.CSIPlugin,
Serge Bazanski662b5b32020-12-21 13:49:00 +0100311 Name: "dev.monogon.metropolis.vfs",
Lorenz Brun1dd0c652024-02-20 18:45:06 +0100312 Endpoint: r.KubeletDirectory.Plugins.VFS.FullPath(),
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200313 SupportedVersions: []string{"1.2"}, // Keep in sync with container-storage-interface/spec package version
314 }, nil
315}
316
Lorenz Brun1dd0c652024-02-20 18:45:06 +0100317func (r *pluginRegistrationServer) NotifyRegistrationStatus(ctx context.Context, req *pluginregistration.RegistrationStatus) (*pluginregistration.RegistrationStatusResponse, error) {
318 if !req.PluginRegistered {
319 select {
320 case r.regErr <- fmt.Errorf("registration failed: %v", req.Error):
321 default:
322 }
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200323 }
324 return &pluginregistration.RegistrationStatusResponse{}, nil
325}