m/n/kubernetes: improve CSI registration reliability
Kubelet's plugin registration mechanism is quite awful: it
relies on being notified by inotify that a new registration socket has
been placed into a specific path, which it then interrogates, reporting
back whether the registration succeeded.
That registration sometimes involves network operations which are prone
to failure. Kubelet reports such failures back to the registration server
asynchronously but does not retry the registration itself.
To actually get Kubelet to retry, one needs to remove and recreate the
registration socket.
This change implements such a mechanism, recreating the socket and
registration server on every reported registration failure.
Supervisor backoff is used to prevent busy-looping on non-transient
errors.
Change-Id: I79eaf0efdf55ccdede15d8cee42cda7c276e4b50
Reviewed-on: https://review.monogon.dev/c/monogon/+/2785
Reviewed-by: Serge Bazanski <serge@monogon.tech>
Reviewed-by: Tim Windelschmidt <tim@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/node/kubernetes/csi.go b/metropolis/node/kubernetes/csi.go
index ab9549d..58c381a 100644
--- a/metropolis/node/kubernetes/csi.go
+++ b/metropolis/node/kubernetes/csi.go
@@ -73,17 +73,12 @@
return err
}
- // Try to remove socket if an unclean shutdown happened
- os.Remove(s.KubeletDirectory.PluginsRegistry.VFSReg.FullPath())
-
- registrationListener, err := net.ListenUnix("unix", &net.UnixAddr{Name: s.KubeletDirectory.PluginsRegistry.VFSReg.FullPath(), Net: "unix"})
- if err != nil {
- return fmt.Errorf("failed to listen on CSI registration socket: %w", err)
+ r := pluginRegistrationServer{
+ regErr: make(chan error, 1),
+ KubeletDirectory: s.KubeletDirectory,
}
- registrationServer := grpc.NewServer()
- pluginregistration.RegisterRegistrationServer(registrationServer, s)
- if err := supervisor.Run(ctx, "registration", supervisor.GRPCServer(registrationServer, registrationListener, true)); err != nil {
+ if err := supervisor.Run(ctx, "registration", r.Run); err != nil {
return err
}
supervisor.Signal(ctx, supervisor.SignalHealthy)
@@ -282,19 +277,78 @@
return &csi.ProbeResponse{Ready: &wrapperspb.BoolValue{Value: true}}, nil
}
-// Registration endpoints
-func (s *csiPluginServer) GetInfo(ctx context.Context, req *pluginregistration.InfoRequest) (*pluginregistration.PluginInfo, error) {
+// pluginRegistrationServer implements the pluginregistration.Registration
+// service. It has a special restart mechanic to accommodate a design issue
+// in Kubelet, which only retries a failed registration after the
+// registration socket has been removed and recreated.
+type pluginRegistrationServer struct {
+ // regErr carries registration failures reported by Kubelet to Run, which
+ // then returns so that the supervisor recreates the socket. It has a
+ // buffer of 1 so that at least one error can always be sent without
+ // blocking. If a second error is reported after Run has received the
+ // first one but before it has returned, that error stays buffered and
+ // causes one extra restart of the next socket iteration. Calls from
+ // different socket iterations are hard to tell apart, so this errs on the
+ // side of too many restarts: missing an error would leave the plugin
+ // unregistered until the node is restarted.
+ regErr chan error
+
+ // KubeletDirectory provides the paths of the registration socket and of
+ // the CSI plugin endpoint advertised to Kubelet.
+ KubeletDirectory *localstorage.DataKubernetesKubeletDirectory
+}
+
+// Run serves the registration service on a freshly created socket. It
+// returns when ctx is canceled or when Kubelet reports a failed
+// registration, so that the supervisor restarts it (with backoff), which
+// recreates the socket and thereby makes Kubelet retry the registration.
+func (r *pluginRegistrationServer) Run(ctx context.Context) error {
+ // Remove the socket left behind by a previous iteration or an unclean
+ // shutdown. Best-effort: any remaining problem surfaces in ListenUnix below.
+ os.Remove(r.KubeletDirectory.PluginsRegistry.VFSReg.FullPath())
+
+ registrationListener, err := net.ListenUnix("unix", &net.UnixAddr{Name: r.KubeletDirectory.PluginsRegistry.VFSReg.FullPath(), Net: "unix"})
+ if err != nil {
+ return fmt.Errorf("failed to listen on CSI registration socket: %w", err)
+ }
+ defer registrationListener.Close()
+
+ grpcS := grpc.NewServer()
+ pluginregistration.RegisterRegistrationServer(grpcS, r)
+
+ if err := supervisor.Run(ctx, "rpc", supervisor.GRPCServer(grpcS, registrationListener, true)); err != nil {
+ return err
+ }
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case err = <-r.regErr:
+ return err
+ }
+}
+
+// GetInfo returns the plugin type, name, CSI endpoint path and supported CSI
+// versions to Kubelet.
+func (r *pluginRegistrationServer) GetInfo(ctx context.Context, req *pluginregistration.InfoRequest) (*pluginregistration.PluginInfo, error) {
return &pluginregistration.PluginInfo{
Type: pluginregistration.CSIPlugin,
Name: "dev.monogon.metropolis.vfs",
- Endpoint: s.KubeletDirectory.Plugins.VFS.FullPath(),
+ Endpoint: r.KubeletDirectory.Plugins.VFS.FullPath(),
SupportedVersions: []string{"1.2"}, // Keep in sync with container-storage-interface/spec package version
}, nil
}
-func (s *csiPluginServer) NotifyRegistrationStatus(ctx context.Context, req *pluginregistration.RegistrationStatus) (*pluginregistration.RegistrationStatusResponse, error) {
- if req.Error != "" {
- s.logger.Warningf("Kubelet failed registering CSI plugin: %v", req.Error)
+// NotifyRegistrationStatus receives the outcome of a registration attempt
+// from Kubelet. A failure is forwarded to Run through regErr without
+// blocking; if an error is already pending, that pending error suffices to
+// trigger a restart and this one is dropped.
+func (r *pluginRegistrationServer) NotifyRegistrationStatus(ctx context.Context, req *pluginregistration.RegistrationStatus) (*pluginregistration.RegistrationStatusResponse, error) {
+ if !req.PluginRegistered {
+ select {
+ case r.regErr <- fmt.Errorf("registration failed: %v", req.Error):
+ default:
+ }
}
return &pluginregistration.RegistrationStatusResponse{}, nil
}