core/internal: move containerd and kubernetes to localstorage

This moves the last users of the old 'storage' library onto 'localstorage'. A number of 'runtime' directories move to a single `/ephemeral` root. This root could have been called `/run`, but that would suggest FHS compliance, which we neither have nor want.
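
For orientation, the shape of the tree as consumed by this diff is roughly the following. This is an illustrative sketch only: the authoritative declarations live in core/internal/localstorage, and the type names (Directory and the various *Directory wrappers) are assumptions inferred from the field accesses in the code below.

    // Sketch only; not the real localstorage declarations.
    package localstorage

    // Directory stands in for the library's concrete per-directory type;
    // its name and field are assumptions.
    type Directory struct{ FullPath string }

    // Root covers all node-local storage handed to services.
    type Root struct {
        Ephemeral EphemeralDirectory // runtime state, rooted at /ephemeral
        Data      DataDirectory      // persistent state
    }

    type EphemeralDirectory struct {
        Directory
        Consensus Directory // handed to the apiserver service in this diff
    }

    type DataDirectory struct {
        Directory
        Kubernetes DataKubernetesDirectory
        Volumes    Directory // backing storage for CSI volumes
    }

    type DataKubernetesDirectory struct {
        Directory
        Kubelet           Directory // kubelet root directory
        ClusterNetworking Directory // clusternet data directory
    }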

We also slightly refactor the Kubernetes services to make them nicer to spawn. Otherwise, this is a pure refactor with no functional changes.
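
Concretely, spawning the service now looks roughly like the sketch below. The surrounding runKubernetes function, its parameters, and the networkConfig type are hypothetical glue; only kubernetes.Config, kubernetes.New and Service.Run are taken from this diff, and the real wiring lives in the node code elsewhere in this stack.

    package node // illustrative only

    import (
        "context"
        "net"

        "git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
        "git.monogon.dev/source/nexantic.git/core/internal/kubernetes"
        "git.monogon.dev/source/nexantic.git/core/internal/kubernetes/pki"
        "git.monogon.dev/source/nexantic.git/core/internal/localstorage"
    )

    // networkConfig is a hypothetical container for values the node
    // already knows; it is not part of this diff.
    type networkConfig struct {
        advertiseAddress net.IP
        serviceIPRange   net.IPNet
        clusterNet       net.IPNet
    }

    func runKubernetes(ctx context.Context, kpki *pki.KubernetesPKI, root *localstorage.Root, nc networkConfig) error {
        svc := kubernetes.New(kubernetes.Config{
            AdvertiseAddress: nc.advertiseAddress,
            ServiceIPRange:   nc.serviceIPRange,
            ClusterNet:       nc.clusterNet,
            KPKI:             kpki,
            Root:             root,
        })
        // Service.Run is a plain supervisor.Runnable, so the service attaches
        // to the caller's existing supervision tree instead of spawning its
        // own tree as the old Start() did.
        return supervisor.Run(ctx, "kubernetes", svc.Run)
    }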

Test Plan: this should fail; it is part of a larger stack. D590 is the first point in the stack that should work.

X-Origin-Diff: phab/D589
GitOrigin-RevId: d2a7c0bb52c2a7c753199221c609e03474936c22
diff --git a/core/internal/kubernetes/service.go b/core/internal/kubernetes/service.go
index ccfb41c..2396066 100644
--- a/core/internal/kubernetes/service.go
+++ b/core/internal/kubernetes/service.go
@@ -22,191 +22,195 @@
 	"fmt"
 	"net"
 	"os"
+	"sync"
 	"time"
 
-	"go.etcd.io/etcd/clientv3"
-	"go.uber.org/zap"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/clientcmd"
 
-	schema "git.monogon.dev/source/nexantic.git/core/generated/api"
 	"git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
-	"git.monogon.dev/source/nexantic.git/core/internal/consensus"
 	"git.monogon.dev/source/nexantic.git/core/internal/kubernetes/clusternet"
 	"git.monogon.dev/source/nexantic.git/core/internal/kubernetes/pki"
 	"git.monogon.dev/source/nexantic.git/core/internal/kubernetes/reconciler"
-	"git.monogon.dev/source/nexantic.git/core/internal/storage"
+	"git.monogon.dev/source/nexantic.git/core/internal/localstorage"
 	"git.monogon.dev/source/nexantic.git/core/pkg/logbuffer"
+	apb "git.monogon.dev/source/nexantic.git/core/proto/api"
 )
 
 type Config struct {
 	AdvertiseAddress net.IP
 	ServiceIPRange   net.IPNet
 	ClusterNet       net.IPNet
+
+	KPKI *pki.KubernetesPKI
+	Root *localstorage.Root
 }
 
-type Service struct {
-	consensusService *consensus.Service
-	storageService   *storage.Manager
-	logger           *zap.Logger
-
+type state struct {
 	apiserverLogs         *logbuffer.LogBuffer
 	controllerManagerLogs *logbuffer.LogBuffer
 	schedulerLogs         *logbuffer.LogBuffer
 	kubeletLogs           *logbuffer.LogBuffer
-
-	kpki *pki.KubernetesPKI
 }
 
-func New(logger *zap.Logger, consensusService *consensus.Service, storageService *storage.Manager) *Service {
-	s := &Service{
-		consensusService: consensusService,
-		storageService:   storageService,
-		logger:           logger,
+type Service struct {
+	c       Config
+	stateMu sync.Mutex
+	state   *state
+}
 
+func New(c Config) *Service {
+	s := &Service{
+		c: c,
+	}
+	return s
+}
+
+func (s *Service) getState() *state {
+	s.stateMu.Lock()
+	defer s.stateMu.Unlock()
+	return s.state
+}
+
+func (s *Service) Run(ctx context.Context) error {
+	st := &state{
 		apiserverLogs:         logbuffer.New(5000, 16384),
 		controllerManagerLogs: logbuffer.New(5000, 16384),
 		schedulerLogs:         logbuffer.New(5000, 16384),
 		kubeletLogs:           logbuffer.New(5000, 16384),
+	}
+	s.stateMu.Lock()
+	s.state = st
+	s.stateMu.Unlock()
 
-		kpki: pki.NewKubernetes(logger.Named("pki")),
+	controllerManagerConfig, err := getPKIControllerManagerConfig(ctx, s.c.KPKI)
+	if err != nil {
+		return fmt.Errorf("could not generate controller manager pki config: %w", err)
+	}
+	controllerManagerConfig.clusterNet = s.c.ClusterNet
+	schedulerConfig, err := getPKISchedulerConfig(ctx, s.c.KPKI)
+	if err != nil {
+		return fmt.Errorf("could not generate scheduler pki config: %w", err)
 	}
 
-	return s
-}
+	masterKubeconfig, err := s.c.KPKI.Kubeconfig(ctx, pki.Master)
+	if err != nil {
+		return fmt.Errorf("could not generate master kubeconfig: %w", err)
+	}
 
-func (s *Service) getKV() clientv3.KV {
-	return s.consensusService.GetStore("kubernetes", "")
-}
+	rawClientConfig, err := clientcmd.NewClientConfigFromBytes(masterKubeconfig)
+	if err != nil {
+		return fmt.Errorf("could not generate kubernetes client config: %w", err)
+	}
 
-func (s *Service) NewCluster() error {
-	// TODO(q3k): this needs to be passed by the caller.
-	ctx := context.TODO()
-	return s.kpki.EnsureAll(ctx, s.getKV())
+	clientConfig, err := rawClientConfig.ClientConfig()
+	if err != nil {
+		return fmt.Errorf("could not resolve kubernetes client config: %w", err)
+	}
+	clientSet, err := kubernetes.NewForConfig(clientConfig)
+	if err != nil {
+		return fmt.Errorf("could not generate kubernetes client: %w", err)
+	}
+
+	informerFactory := informers.NewSharedInformerFactory(clientSet, 5*time.Minute)
+
+	hostname, err := os.Hostname()
+	if err != nil {
+		return fmt.Errorf("failed to get hostname: %w", err)
+	}
+
+	apiserver := &apiserverService{
+		KPKI:                        s.c.KPKI,
+		AdvertiseAddress:            s.c.AdvertiseAddress,
+		ServiceIPRange:              s.c.ServiceIPRange,
+		Output:                      st.apiserverLogs,
+		EphemeralConsensusDirectory: &s.c.Root.Ephemeral.Consensus,
+	}
+
+	kubelet := kubeletService{
+		NodeName:           hostname,
+		ClusterDNS:         nil,
+		KubeletDirectory:   &s.c.Root.Data.Kubernetes.Kubelet,
+		EphemeralDirectory: &s.c.Root.Ephemeral,
+		Output:             st.kubeletLogs,
+		KPKI:               s.c.KPKI,
+	}
+
+	csiPlugin := csiPluginServer{
+		KubeletDirectory: &s.c.Root.Data.Kubernetes.Kubelet,
+		VolumesDirectory: &s.c.Root.Data.Volumes,
+	}
+
+	csiProvisioner := csiProvisionerServer{
+		NodeName:         hostname,
+		Kubernetes:       clientSet,
+		InformerFactory:  informerFactory,
+		VolumesDirectory: &s.c.Root.Data.Volumes,
+	}
+
+	clusternet := clusternet.Service{
+		NodeName:        hostname,
+		Kubernetes:      clientSet,
+		ClusterNet:      s.c.ClusterNet,
+		InformerFactory: informerFactory,
+		DataDirectory:   &s.c.Root.Data.Kubernetes.ClusterNetworking,
+	}
+
+	for _, sub := range []struct {
+		name     string
+		runnable supervisor.Runnable
+	}{
+		{"apiserver", apiserver.Run},
+		{"controller-manager", runControllerManager(*controllerManagerConfig, st.controllerManagerLogs)},
+		{"scheduler", runScheduler(*schedulerConfig, st.schedulerLogs)},
+		{"kubelet", kubelet.Run},
+		{"reconciler", reconciler.Run(clientSet)},
+		{"csi-plugin", csiPlugin.Run},
+		{"csi-provisioner", csiProvisioner.Run},
+		{"clusternet", clusternet.Run},
+	} {
+		err := supervisor.Run(ctx, sub.name, sub.runnable)
+		if err != nil {
+			return fmt.Errorf("could not run sub-service %q: %w", sub.name, err)
+		}
+	}
+
+	supervisor.Signal(ctx, supervisor.SignalHealthy)
+	supervisor.Signal(ctx, supervisor.SignalDone)
+	return nil
 }
 
 // GetComponentLogs grabs logs from various Kubernetes binaries
 func (s *Service) GetComponentLogs(component string, n int) ([]string, error) {
+	s.stateMu.Lock()
+	defer s.stateMu.Unlock()
+	if s.state == nil {
+		return nil, errors.New("kubernetes not started yet")
+	}
+
 	switch component {
 	case "apiserver":
-		return s.apiserverLogs.ReadLinesTruncated(n, "..."), nil
+		return s.state.apiserverLogs.ReadLinesTruncated(n, "..."), nil
 	case "controller-manager":
-		return s.controllerManagerLogs.ReadLinesTruncated(n, "..."), nil
+		return s.state.controllerManagerLogs.ReadLinesTruncated(n, "..."), nil
 	case "scheduler":
-		return s.schedulerLogs.ReadLinesTruncated(n, "..."), nil
+		return s.state.schedulerLogs.ReadLinesTruncated(n, "..."), nil
 	case "kubelet":
-		return s.kubeletLogs.ReadLinesTruncated(n, "..."), nil
+		return s.state.kubeletLogs.ReadLinesTruncated(n, "..."), nil
 	default:
-		return []string{}, errors.New("component not available")
+		return nil, errors.New("component not available")
 	}
 }
 
 // GetDebugKubeconfig issues a kubeconfig for an arbitrary given identity. Useful for debugging and testing.
-func (s *Service) GetDebugKubeconfig(ctx context.Context, request *schema.GetDebugKubeconfigRequest) (*schema.GetDebugKubeconfigResponse, error) {
-	if !s.consensusService.IsReady() {
-		return nil, status.Error(codes.Unavailable, "Consensus not ready yet")
-	}
-	ca := s.kpki.Certificates[pki.IdCA]
-	debugKubeconfig, err := pki.New(ca, "", pki.Client(request.Id, request.Groups)).Kubeconfig(ctx, s.getKV())
+func (s *Service) GetDebugKubeconfig(ctx context.Context, request *apb.GetDebugKubeconfigRequest) (*apb.GetDebugKubeconfigResponse, error) {
+	ca := s.c.KPKI.Certificates[pki.IdCA]
+	debugKubeconfig, err := pki.New(ca, "", pki.Client(request.Id, request.Groups)).Kubeconfig(ctx, s.c.KPKI.KV)
 	if err != nil {
 		return nil, status.Errorf(codes.Unavailable, "Failed to generate kubeconfig: %v", err)
 	}
-	return &schema.GetDebugKubeconfigResponse{DebugKubeconfig: string(debugKubeconfig)}, nil
-}
-
-func (s *Service) Start() error {
-	// TODO(lorenz): This creates a new supervision tree, it should instead attach to the root one. But for that SmalltownNode needs
-	// to be ported to supervisor.
-	supervisor.New(context.TODO(), s.logger, s.Run())
-	return nil
-}
-
-func (s *Service) Run() supervisor.Runnable {
-	return func(ctx context.Context) error {
-		config := Config{
-			AdvertiseAddress: net.IP{10, 0, 2, 15}, // Depends on networking
-			ServiceIPRange: net.IPNet{ // TODO: Decide if configurable / final value
-				IP:   net.IP{192, 168, 188, 0},
-				Mask: net.IPMask{0xff, 0xff, 0xff, 0x00}, // /24, but Go stores as a literal mask
-			},
-			ClusterNet: net.IPNet{
-				IP:   net.IP{192, 168, 188, 0},
-				Mask: net.IPMask{0xff, 0xff, 0xfd, 0x00}, // /22
-			},
-		}
-		consensusKV := s.getKV()
-		apiserverConfig, err := getPKIApiserverConfig(ctx, consensusKV, s.kpki)
-		if err != nil {
-			return fmt.Errorf("could not generate apiserver pki config: %w", err)
-		}
-		apiserverConfig.advertiseAddress = config.AdvertiseAddress
-		apiserverConfig.serviceIPRange = config.ServiceIPRange
-		controllerManagerConfig, err := getPKIControllerManagerConfig(ctx, consensusKV, s.kpki)
-		if err != nil {
-			return fmt.Errorf("could not generate controller manager pki config: %w", err)
-		}
-		controllerManagerConfig.clusterNet = config.ClusterNet
-		schedulerConfig, err := getPKISchedulerConfig(ctx, consensusKV, s.kpki)
-		if err != nil {
-			return fmt.Errorf("could not generate scheduler pki config: %w", err)
-		}
-
-		masterKubeconfig, err := s.kpki.Kubeconfig(ctx, pki.Master, consensusKV)
-		if err != nil {
-			return fmt.Errorf("could not generate master kubeconfig: %w", err)
-		}
-
-		rawClientConfig, err := clientcmd.NewClientConfigFromBytes(masterKubeconfig)
-		if err != nil {
-			return fmt.Errorf("could not generate kubernetes client config: %w", err)
-		}
-
-		clientConfig, err := rawClientConfig.ClientConfig()
-		clientSet, err := kubernetes.NewForConfig(clientConfig)
-		if err != nil {
-			return fmt.Errorf("could not generate kubernetes client: %w", err)
-		}
-
-		informerFactory := informers.NewSharedInformerFactory(clientSet, 5*time.Minute)
-
-		hostname, err := os.Hostname()
-		if err != nil {
-			return fmt.Errorf("failed to get hostname: %w", err)
-		}
-		err = createKubeletConfig(ctx, consensusKV, s.kpki, hostname)
-		if err != nil {
-			return fmt.Errorf("could not created kubelet config: %w", err)
-		}
-
-		key, err := clusternet.EnsureOnDiskKey()
-		if err != nil {
-			return fmt.Errorf("failed to ensure cluster key: %w", err)
-		}
-
-		for _, sub := range []struct {
-			name     string
-			runnable supervisor.Runnable
-		}{
-			{"apiserver", runAPIServer(*apiserverConfig, s.apiserverLogs)},
-			{"controller-manager", runControllerManager(*controllerManagerConfig, s.controllerManagerLogs)},
-			{"scheduler", runScheduler(*schedulerConfig, s.schedulerLogs)},
-			{"kubelet", runKubelet(&KubeletSpec{}, s.kubeletLogs)},
-			{"reconciler", reconciler.Run(clientSet)},
-			{"csi-plugin", runCSIPlugin(s.storageService)},
-			{"pv-provisioner", runCSIProvisioner(s.storageService, clientSet, informerFactory)},
-			{"clusternet", clusternet.Run(informerFactory, clusterNet, clientSet, key)},
-		} {
-			err := supervisor.Run(ctx, sub.name, sub.runnable)
-			if err != nil {
-				return fmt.Errorf("could not run sub-service %q: %w", sub.name, err)
-			}
-		}
-
-		supervisor.Signal(ctx, supervisor.SignalHealthy)
-		supervisor.Signal(ctx, supervisor.SignalDone)
-		return nil
-	}
+	return &apb.GetDebugKubeconfigResponse{DebugKubeconfig: string(debugKubeconfig)}, nil
 }