core/internal: move containerd and kubernetes to localstorage
This moves the last users of the old 'storage' library onto 'localstorage'. We move a number of 'runtime' directories under a single `/ephemeral` root. This could have been called `/run`, but that might imply FHS compliance, which we neither have nor want.
We also slightly refactor the Kubernetes services so they are nicer to spawn (see the usage sketch below, before the diff). Otherwise this is a pure refactor, with no functional changes.
Test Plan: this should fail, as it is part of a larger stack. D590 is the first tip of the stack that should work.
X-Origin-Diff: phab/D589
GitOrigin-RevId: d2a7c0bb52c2a7c753199221c609e03474936c22
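
Usage sketch (illustrative only, not part of this change): how a caller might construct and spawn the refactored Kubernetes service under the supervisor. The startKubernetes wrapper, the kpki and root arguments, and the address/range literals (derived from the removed in-package defaults) are assumptions; only the Config fields and the supervisor.Run call mirror the code in the diff below.

import (
	"context"
	"net"

	"git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
	"git.monogon.dev/source/nexantic.git/core/internal/kubernetes"
	"git.monogon.dev/source/nexantic.git/core/internal/kubernetes/pki"
	"git.monogon.dev/source/nexantic.git/core/internal/localstorage"
)

// startKubernetes is a hypothetical caller: it builds a Config from values
// owned by the node and hands the service's Run method to the supervisor.
func startKubernetes(ctx context.Context, kpki *pki.KubernetesPKI, root *localstorage.Root) error {
	svc := kubernetes.New(kubernetes.Config{
		AdvertiseAddress: net.ParseIP("10.0.2.15"), // placeholder, depends on networking
		ServiceIPRange:   net.IPNet{IP: net.IP{192, 168, 188, 0}, Mask: net.CIDRMask(24, 32)},
		ClusterNet:       net.IPNet{IP: net.IP{192, 168, 188, 0}, Mask: net.CIDRMask(22, 32)},
		KPKI:             kpki,
		Root:             root,
	})
	// The whole Kubernetes tree runs as a single runnable under the caller's supervisor.
	return supervisor.Run(ctx, "kubernetes", svc.Run)
}
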
diff --git a/core/internal/kubernetes/service.go b/core/internal/kubernetes/service.go
index ccfb41c..2396066 100644
--- a/core/internal/kubernetes/service.go
+++ b/core/internal/kubernetes/service.go
@@ -22,191 +22,192 @@
"fmt"
"net"
"os"
+ "sync"
"time"
- "go.etcd.io/etcd/clientv3"
- "go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
- schema "git.monogon.dev/source/nexantic.git/core/generated/api"
"git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
- "git.monogon.dev/source/nexantic.git/core/internal/consensus"
"git.monogon.dev/source/nexantic.git/core/internal/kubernetes/clusternet"
"git.monogon.dev/source/nexantic.git/core/internal/kubernetes/pki"
"git.monogon.dev/source/nexantic.git/core/internal/kubernetes/reconciler"
- "git.monogon.dev/source/nexantic.git/core/internal/storage"
+ "git.monogon.dev/source/nexantic.git/core/internal/localstorage"
"git.monogon.dev/source/nexantic.git/core/pkg/logbuffer"
+ apb "git.monogon.dev/source/nexantic.git/core/proto/api"
)
type Config struct {
AdvertiseAddress net.IP
ServiceIPRange net.IPNet
ClusterNet net.IPNet
+
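+ // KPKI is the Kubernetes PKI used to issue certificates and kubeconfigs.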
+ KPKI *pki.KubernetesPKI
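+ // Root is the node's localstorage root; the service uses its Ephemeral and
+ // Data subtrees for runtime and persistent Kubernetes state.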
+ Root *localstorage.Root
}
-type Service struct {
- consensusService *consensus.Service
- storageService *storage.Manager
- logger *zap.Logger
-
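+// state holds the per-run log buffers of the Kubernetes components. It is
+// created anew on every Run and accessed through the Service's stateMu lock.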
+type state struct {
apiserverLogs *logbuffer.LogBuffer
controllerManagerLogs *logbuffer.LogBuffer
schedulerLogs *logbuffer.LogBuffer
kubeletLogs *logbuffer.LogBuffer
-
- kpki *pki.KubernetesPKI
}
-func New(logger *zap.Logger, consensusService *consensus.Service, storageService *storage.Manager) *Service {
- s := &Service{
- consensusService: consensusService,
- storageService: storageService,
- logger: logger,
+type Service struct {
+ c Config
+ stateMu sync.Mutex
+ state *state
+}
+func New(c Config) *Service {
+ s := &Service{
+ c: c,
+ }
+ return s
+}
+
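+// getState returns the current per-run state, or nil if Run has not been
+// started yet. It is safe for concurrent use.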
+func (s *Service) getState() *state {
+ s.stateMu.Lock()
+ defer s.stateMu.Unlock()
+ return s.state
+}
+
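+// Run is the Service's supervisor.Runnable: it builds fresh per-run state and
+// starts all Kubernetes sub-services under the given supervisor context.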
+func (s *Service) Run(ctx context.Context) error {
+ st := &state{
apiserverLogs: logbuffer.New(5000, 16384),
controllerManagerLogs: logbuffer.New(5000, 16384),
schedulerLogs: logbuffer.New(5000, 16384),
kubeletLogs: logbuffer.New(5000, 16384),
+ }
+ s.stateMu.Lock()
+ s.state = st
+ s.stateMu.Unlock()
- kpki: pki.NewKubernetes(logger.Named("pki")),
+ controllerManagerConfig, err := getPKIControllerManagerConfig(ctx, s.c.KPKI)
+ if err != nil {
+ return fmt.Errorf("could not generate controller manager pki config: %w", err)
+ }
+ controllerManagerConfig.clusterNet = s.c.ClusterNet
+ schedulerConfig, err := getPKISchedulerConfig(ctx, s.c.KPKI)
+ if err != nil {
+ return fmt.Errorf("could not generate scheduler pki config: %w", err)
}
- return s
-}
+ masterKubeconfig, err := s.c.KPKI.Kubeconfig(ctx, pki.Master)
+ if err != nil {
+ return fmt.Errorf("could not generate master kubeconfig: %w", err)
+ }
-func (s *Service) getKV() clientv3.KV {
- return s.consensusService.GetStore("kubernetes", "")
-}
+ rawClientConfig, err := clientcmd.NewClientConfigFromBytes(masterKubeconfig)
+ if err != nil {
+ return fmt.Errorf("could not generate kubernetes client config: %w", err)
+ }
-func (s *Service) NewCluster() error {
- // TODO(q3k): this needs to be passed by the caller.
- ctx := context.TODO()
- return s.kpki.EnsureAll(ctx, s.getKV())
+ clientConfig, err := rawClientConfig.ClientConfig()
+ if err != nil {
+ return fmt.Errorf("could not resolve kubernetes client config: %w", err)
+ }
+ clientSet, err := kubernetes.NewForConfig(clientConfig)
+ if err != nil {
+ return fmt.Errorf("could not generate kubernetes client: %w", err)
+ }
+
+ informerFactory := informers.NewSharedInformerFactory(clientSet, 5*time.Minute)
+
+ hostname, err := os.Hostname()
+ if err != nil {
+ return fmt.Errorf("failed to get hostname: %w", err)
+ }
+
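+ // Build all Kubernetes sub-services; they are started under the supervisor below.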
+ apiserver := &apiserverService{
+ KPKI: s.c.KPKI,
+ AdvertiseAddress: s.c.AdvertiseAddress,
+ ServiceIPRange: s.c.ServiceIPRange,
+ Output: st.apiserverLogs,
+ EphemeralConsensusDirectory: &s.c.Root.Ephemeral.Consensus,
+ }
+
+ kubelet := kubeletService{
+ NodeName: hostname,
+ ClusterDNS: nil,
+ KubeletDirectory: &s.c.Root.Data.Kubernetes.Kubelet,
+ EphemeralDirectory: &s.c.Root.Ephemeral,
+ Output: st.kubeletLogs,
+ KPKI: s.c.KPKI,
+ }
+
+ csiPlugin := csiPluginServer{
+ KubeletDirectory: &s.c.Root.Data.Kubernetes.Kubelet,
+ VolumesDirectory: &s.c.Root.Data.Volumes,
+ }
+
+ csiProvisioner := csiProvisionerServer{
+ NodeName: hostname,
+ Kubernetes: clientSet,
+ InformerFactory: informerFactory,
+ VolumesDirectory: &s.c.Root.Data.Volumes,
+ }
+
+ clusternet := clusternet.Service{
+ NodeName: hostname,
+ Kubernetes: clientSet,
+ ClusterNet: s.c.ClusterNet,
+ InformerFactory: informerFactory,
+ DataDirectory: &s.c.Root.Data.Kubernetes.ClusterNetworking,
+ }
+
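+ // Start each sub-service as a named child of this supervisor node.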
+ for _, sub := range []struct {
+ name string
+ runnable supervisor.Runnable
+ }{
+ {"apiserver", apiserver.Run},
+ {"controller-manager", runControllerManager(*controllerManagerConfig, st.controllerManagerLogs)},
+ {"scheduler", runScheduler(*schedulerConfig, st.schedulerLogs)},
+ {"kubelet", kubelet.Run},
+ {"reconciler", reconciler.Run(clientSet)},
+ {"csi-plugin", csiPlugin.Run},
+ {"csi-provisioner", csiProvisioner.Run},
+ {"clusternet", clusternet.Run},
+ } {
+ err := supervisor.Run(ctx, sub.name, sub.runnable)
+ if err != nil {
+ return fmt.Errorf("could not run sub-service %q: %w", sub.name, err)
+ }
+ }
+
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ supervisor.Signal(ctx, supervisor.SignalDone)
+ return nil
}
// GetComponentLogs grabs logs from various Kubernetes binaries
func (s *Service) GetComponentLogs(component string, n int) ([]string, error) {
+ s.stateMu.Lock()
+ defer s.stateMu.Unlock()
+ if s.state == nil {
+ return nil, errors.New("kubernetes not started yet")
+ }
+
switch component {
case "apiserver":
- return s.apiserverLogs.ReadLinesTruncated(n, "..."), nil
+ return s.state.apiserverLogs.ReadLinesTruncated(n, "..."), nil
case "controller-manager":
- return s.controllerManagerLogs.ReadLinesTruncated(n, "..."), nil
+ return s.state.controllerManagerLogs.ReadLinesTruncated(n, "..."), nil
case "scheduler":
- return s.schedulerLogs.ReadLinesTruncated(n, "..."), nil
+ return s.state.schedulerLogs.ReadLinesTruncated(n, "..."), nil
case "kubelet":
- return s.kubeletLogs.ReadLinesTruncated(n, "..."), nil
+ return s.state.kubeletLogs.ReadLinesTruncated(n, "..."), nil
default:
- return []string{}, errors.New("component not available")
+ return nil, errors.New("component not available")
}
}
// GetDebugKubeconfig issues a kubeconfig for an arbitrary given identity. Useful for debugging and testing.
-func (s *Service) GetDebugKubeconfig(ctx context.Context, request *schema.GetDebugKubeconfigRequest) (*schema.GetDebugKubeconfigResponse, error) {
- if !s.consensusService.IsReady() {
- return nil, status.Error(codes.Unavailable, "Consensus not ready yet")
- }
- ca := s.kpki.Certificates[pki.IdCA]
- debugKubeconfig, err := pki.New(ca, "", pki.Client(request.Id, request.Groups)).Kubeconfig(ctx, s.getKV())
+func (s *Service) GetDebugKubeconfig(ctx context.Context, request *apb.GetDebugKubeconfigRequest) (*apb.GetDebugKubeconfigResponse, error) {
+ ca := s.c.KPKI.Certificates[pki.IdCA]
+ debugKubeconfig, err := pki.New(ca, "", pki.Client(request.Id, request.Groups)).Kubeconfig(ctx, s.c.KPKI.KV)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "Failed to generate kubeconfig: %v", err)
}
- return &schema.GetDebugKubeconfigResponse{DebugKubeconfig: string(debugKubeconfig)}, nil
-}
-
-func (s *Service) Start() error {
- // TODO(lorenz): This creates a new supervision tree, it should instead attach to the root one. But for that SmalltownNode needs
- // to be ported to supervisor.
- supervisor.New(context.TODO(), s.logger, s.Run())
- return nil
-}
-
-func (s *Service) Run() supervisor.Runnable {
- return func(ctx context.Context) error {
- config := Config{
- AdvertiseAddress: net.IP{10, 0, 2, 15}, // Depends on networking
- ServiceIPRange: net.IPNet{ // TODO: Decide if configurable / final value
- IP: net.IP{192, 168, 188, 0},
- Mask: net.IPMask{0xff, 0xff, 0xff, 0x00}, // /24, but Go stores as a literal mask
- },
- ClusterNet: net.IPNet{
- IP: net.IP{192, 168, 188, 0},
- Mask: net.IPMask{0xff, 0xff, 0xfd, 0x00}, // /22
- },
- }
- consensusKV := s.getKV()
- apiserverConfig, err := getPKIApiserverConfig(ctx, consensusKV, s.kpki)
- if err != nil {
- return fmt.Errorf("could not generate apiserver pki config: %w", err)
- }
- apiserverConfig.advertiseAddress = config.AdvertiseAddress
- apiserverConfig.serviceIPRange = config.ServiceIPRange
- controllerManagerConfig, err := getPKIControllerManagerConfig(ctx, consensusKV, s.kpki)
- if err != nil {
- return fmt.Errorf("could not generate controller manager pki config: %w", err)
- }
- controllerManagerConfig.clusterNet = config.ClusterNet
- schedulerConfig, err := getPKISchedulerConfig(ctx, consensusKV, s.kpki)
- if err != nil {
- return fmt.Errorf("could not generate scheduler pki config: %w", err)
- }
-
- masterKubeconfig, err := s.kpki.Kubeconfig(ctx, pki.Master, consensusKV)
- if err != nil {
- return fmt.Errorf("could not generate master kubeconfig: %w", err)
- }
-
- rawClientConfig, err := clientcmd.NewClientConfigFromBytes(masterKubeconfig)
- if err != nil {
- return fmt.Errorf("could not generate kubernetes client config: %w", err)
- }
-
- clientConfig, err := rawClientConfig.ClientConfig()
- clientSet, err := kubernetes.NewForConfig(clientConfig)
- if err != nil {
- return fmt.Errorf("could not generate kubernetes client: %w", err)
- }
-
- informerFactory := informers.NewSharedInformerFactory(clientSet, 5*time.Minute)
-
- hostname, err := os.Hostname()
- if err != nil {
- return fmt.Errorf("failed to get hostname: %w", err)
- }
- err = createKubeletConfig(ctx, consensusKV, s.kpki, hostname)
- if err != nil {
- return fmt.Errorf("could not created kubelet config: %w", err)
- }
-
- key, err := clusternet.EnsureOnDiskKey()
- if err != nil {
- return fmt.Errorf("failed to ensure cluster key: %w", err)
- }
-
- for _, sub := range []struct {
- name string
- runnable supervisor.Runnable
- }{
- {"apiserver", runAPIServer(*apiserverConfig, s.apiserverLogs)},
- {"controller-manager", runControllerManager(*controllerManagerConfig, s.controllerManagerLogs)},
- {"scheduler", runScheduler(*schedulerConfig, s.schedulerLogs)},
- {"kubelet", runKubelet(&KubeletSpec{}, s.kubeletLogs)},
- {"reconciler", reconciler.Run(clientSet)},
- {"csi-plugin", runCSIPlugin(s.storageService)},
- {"pv-provisioner", runCSIProvisioner(s.storageService, clientSet, informerFactory)},
- {"clusternet", clusternet.Run(informerFactory, clusterNet, clientSet, key)},
- } {
- err := supervisor.Run(ctx, sub.name, sub.runnable)
- if err != nil {
- return fmt.Errorf("could not run sub-service %q: %w", sub.name, err)
- }
- }
-
- supervisor.Signal(ctx, supervisor.SignalHealthy)
- supervisor.Signal(ctx, supervisor.SignalDone)
- return nil
- }
+ return &apb.GetDebugKubeconfigResponse{DebugKubeconfig: string(debugKubeconfig)}, nil
}