blob: f1ccf8a23a28034895c14a0401122da4a1ae60c5 [file] [log] [blame]
Lorenz Brun0db90ba2020-04-06 14:04:52 +02001// Copyright 2020 The Monogon Project Authors.
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17package kubernetes
18
19import (
20 "context"
21 "fmt"
22 "net"
23 "os"
24 "path/filepath"
25 "regexp"
26
Lorenz Brun0db90ba2020-04-06 14:04:52 +020027 "github.com/container-storage-interface/spec/lib/go/csi"
28 "github.com/golang/protobuf/ptypes/wrappers"
Lorenz Brun0db90ba2020-04-06 14:04:52 +020029 "golang.org/x/sys/unix"
30 "google.golang.org/grpc"
31 "google.golang.org/grpc/codes"
32 "google.golang.org/grpc/status"
33 pluginregistration "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
34
Serge Bazanski77cb6c52020-12-19 00:09:22 +010035 "git.monogon.dev/source/nexantic.git/metropolis/node/core/localstorage"
Serge Bazanski549b72b2021-01-07 14:54:19 +010036 "git.monogon.dev/source/nexantic.git/metropolis/pkg/logtree"
37 "git.monogon.dev/source/nexantic.git/metropolis/pkg/fsquota"
38 "git.monogon.dev/source/nexantic.git/metropolis/pkg/supervisor"
Lorenz Brun0db90ba2020-04-06 14:04:52 +020039)
40
// acceptableNames matches the volume names this plugin accepts.
//
// Derived from K8s spec for acceptable names, but shortened to 130 characters
// to avoid issues with maximum path length. We don't provision longer names so
// this applies only if you manually create a volume with a name of more than
// 130 characters.
//
// A name must start with a lowercase letter, end with a lowercase letter or
// digit, and may contain lowercase letters, digits, '.' and '-' in between.
// The hyphen is placed last in the character class so that it is always read
// as a literal rather than as a range operator.
var acceptableNames = regexp.MustCompile("^[a-z][a-z0-9.-]{0,128}[a-z0-9]$")

// volumeDir is the directory name "volumes".
// NOTE(review): not referenced in the visible part of this file; presumably
// used elsewhere in the package - confirm before removing.
const volumeDir = "volumes"
47
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020048type csiPluginServer struct {
49 KubeletDirectory *localstorage.DataKubernetesKubeletDirectory
50 VolumesDirectory *localstorage.DataVolumesDirectory
Lorenz Brun0db90ba2020-04-06 14:04:52 +020051
Serge Bazanskic7359672020-10-30 16:38:57 +010052 logger logtree.LeveledLogger
Lorenz Brun0db90ba2020-04-06 14:04:52 +020053}
54
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020055func (s *csiPluginServer) Run(ctx context.Context) error {
56 s.logger = supervisor.Logger(ctx)
Lorenz Brun0db90ba2020-04-06 14:04:52 +020057
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020058 pluginListener, err := net.ListenUnix("unix", &net.UnixAddr{Name: s.KubeletDirectory.Plugins.VFS.FullPath(), Net: "unix"})
59 if err != nil {
60 return fmt.Errorf("failed to listen on CSI socket: %w", err)
Lorenz Brun0db90ba2020-04-06 14:04:52 +020061 }
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020062 pluginListener.SetUnlinkOnClose(true)
63
64 pluginServer := grpc.NewServer()
65 csi.RegisterIdentityServer(pluginServer, s)
66 csi.RegisterNodeServer(pluginServer, s)
67 // Enable graceful shutdown since we don't have long-running RPCs and most of them shouldn't and can't be
68 // cancelled anyways.
69 if err := supervisor.Run(ctx, "csi-node", supervisor.GRPCServer(pluginServer, pluginListener, true)); err != nil {
70 return err
71 }
72
73 registrationListener, err := net.ListenUnix("unix", &net.UnixAddr{Name: s.KubeletDirectory.PluginsRegistry.VFSReg.FullPath(), Net: "unix"})
74 if err != nil {
75 return fmt.Errorf("failed to listen on CSI registration socket: %w", err)
76 }
77 registrationListener.SetUnlinkOnClose(true)
78
79 registrationServer := grpc.NewServer()
80 pluginregistration.RegisterRegistrationServer(registrationServer, s)
81 if err := supervisor.Run(ctx, "registration", supervisor.GRPCServer(registrationServer, registrationListener, true)); err != nil {
82 return err
83 }
84 supervisor.Signal(ctx, supervisor.SignalHealthy)
85 supervisor.Signal(ctx, supervisor.SignalDone)
86 return nil
Lorenz Brun0db90ba2020-04-06 14:04:52 +020087}
88
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020089func (*csiPluginServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +020090 return nil, status.Errorf(codes.Unimplemented, "method NodeStageVolume not supported")
91}
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020092
93func (*csiPluginServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +020094 return nil, status.Errorf(codes.Unimplemented, "method NodeUnstageVolume not supported")
95}
96
Serge Bazanskic2c7ad92020-07-13 17:20:09 +020097func (s *csiPluginServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +020098 if !acceptableNames.MatchString(req.VolumeId) {
99 return nil, status.Error(codes.InvalidArgument, "invalid characters in volume id")
100 }
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200101
102 // TODO(q3k): move this logic to localstorage?
103 volumePath := filepath.Join(s.VolumesDirectory.FullPath(), req.VolumeId)
104
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200105 switch req.VolumeCapability.AccessMode.Mode {
106 case csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER:
107 case csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY:
108 default:
109 return nil, status.Error(codes.InvalidArgument, "unsupported access mode")
110 }
111 switch req.VolumeCapability.AccessType.(type) {
112 case *csi.VolumeCapability_Mount:
113 default:
114 return nil, status.Error(codes.InvalidArgument, "unsupported access type")
115 }
116
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200117 err := unix.Mount(volumePath, req.TargetPath, "", unix.MS_BIND, "")
118 switch {
119 case err == unix.ENOENT:
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200120 return nil, status.Error(codes.NotFound, "volume not found")
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200121 case err != nil:
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200122 return nil, status.Errorf(codes.Unavailable, "failed to bind-mount volume: %v", err)
123 }
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200124
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200125 if req.Readonly {
126 err := unix.Mount(volumePath, req.TargetPath, "", unix.MS_BIND|unix.MS_REMOUNT|unix.MS_RDONLY, "")
127 if err != nil {
128 _ = unix.Unmount(req.TargetPath, 0) // Best-effort
129 return nil, status.Errorf(codes.Unavailable, "failed to remount volume: %v", err)
130 }
131 }
132 return &csi.NodePublishVolumeResponse{}, nil
133}
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200134
135func (*csiPluginServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200136 if err := unix.Unmount(req.TargetPath, 0); err != nil {
137 return nil, status.Errorf(codes.Unavailable, "failed to unmount volume: %v", err)
138 }
139 return &csi.NodeUnpublishVolumeResponse{}, nil
140}
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200141
142func (*csiPluginServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200143 quota, err := fsquota.GetQuota(req.VolumePath)
144 if os.IsNotExist(err) {
145 return nil, status.Error(codes.NotFound, "volume does not exist at this path")
146 } else if err != nil {
147 return nil, status.Errorf(codes.Unavailable, "failed to get quota: %v", err)
148 }
149
150 return &csi.NodeGetVolumeStatsResponse{
151 Usage: []*csi.VolumeUsage{
152 {
153 Total: int64(quota.Bytes),
154 Unit: csi.VolumeUsage_BYTES,
155 Used: int64(quota.BytesUsed),
156 Available: int64(quota.Bytes - quota.BytesUsed),
157 },
158 {
159 Total: int64(quota.Inodes),
160 Unit: csi.VolumeUsage_INODES,
161 Used: int64(quota.InodesUsed),
162 Available: int64(quota.Inodes - quota.InodesUsed),
163 },
164 },
165 }, nil
166}
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200167
168func (*csiPluginServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200169 if req.CapacityRange.LimitBytes <= 0 {
170 return nil, status.Error(codes.InvalidArgument, "invalid expanded volume size: at or below zero bytes")
171 }
172 if err := fsquota.SetQuota(req.VolumePath, uint64(req.CapacityRange.LimitBytes), 0); err != nil {
173 return nil, status.Errorf(codes.Unavailable, "failed to update quota: %v", err)
174 }
175 return &csi.NodeExpandVolumeResponse{CapacityBytes: req.CapacityRange.LimitBytes}, nil
176}
177
178func rpcCapability(cap csi.NodeServiceCapability_RPC_Type) *csi.NodeServiceCapability {
179 return &csi.NodeServiceCapability{
180 Type: &csi.NodeServiceCapability_Rpc{
181 Rpc: &csi.NodeServiceCapability_RPC{Type: cap},
182 },
183 }
184}
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200185
186func (*csiPluginServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200187 return &csi.NodeGetCapabilitiesResponse{
188 Capabilities: []*csi.NodeServiceCapability{
189 rpcCapability(csi.NodeServiceCapability_RPC_EXPAND_VOLUME),
190 rpcCapability(csi.NodeServiceCapability_RPC_GET_VOLUME_STATS),
191 },
192 }, nil
193}
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200194
195func (*csiPluginServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200196 hostname, err := os.Hostname()
197 if err != nil {
198 return nil, status.Errorf(codes.Unavailable, "failed to get node identity: %v", err)
199 }
200 return &csi.NodeGetInfoResponse{
201 NodeId: hostname,
202 }, nil
203}
204
205// CSI Identity endpoints
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200206func (*csiPluginServer) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200207 return &csi.GetPluginInfoResponse{
Serge Bazanski662b5b32020-12-21 13:49:00 +0100208 Name: "dev.monogon.metropolis.vfs",
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200209 VendorVersion: "0.0.1", // TODO(lorenz): Maybe stamp?
210 }, nil
211}
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200212
213func (*csiPluginServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200214 return &csi.GetPluginCapabilitiesResponse{
215 Capabilities: []*csi.PluginCapability{
216 {
217 Type: &csi.PluginCapability_VolumeExpansion_{
218 VolumeExpansion: &csi.PluginCapability_VolumeExpansion{
219 Type: csi.PluginCapability_VolumeExpansion_ONLINE,
220 },
221 },
222 },
223 },
224 }, nil
225}
226
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200227func (s *csiPluginServer) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) {
228 return &csi.ProbeResponse{Ready: &wrappers.BoolValue{Value: true}}, nil
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200229}
230
231// Registration endpoints
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200232func (s *csiPluginServer) GetInfo(ctx context.Context, req *pluginregistration.InfoRequest) (*pluginregistration.PluginInfo, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200233 return &pluginregistration.PluginInfo{
234 Type: "CSIPlugin",
Serge Bazanski662b5b32020-12-21 13:49:00 +0100235 Name: "dev.monogon.metropolis.vfs",
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200236 Endpoint: s.KubeletDirectory.Plugins.VFS.FullPath(),
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200237 SupportedVersions: []string{"1.2"}, // Keep in sync with container-storage-interface/spec package version
238 }, nil
239}
240
Serge Bazanskic2c7ad92020-07-13 17:20:09 +0200241func (s *csiPluginServer) NotifyRegistrationStatus(ctx context.Context, req *pluginregistration.RegistrationStatus) (*pluginregistration.RegistrationStatusResponse, error) {
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200242 if req.Error != "" {
Serge Bazanskic7359672020-10-30 16:38:57 +0100243 s.logger.Warningf("Kubelet failed registering CSI plugin: %v", req.Error)
Lorenz Brun0db90ba2020-04-06 14:04:52 +0200244 }
245 return &pluginregistration.RegistrationStatusResponse{}, nil
246}