package kubernetes

import (
	"context"
	"crypto/ed25519"
	"fmt"
	"net"
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"

	"source.monogon.dev/go/net/tinylb"
	"source.monogon.dev/metropolis/node"
	oclusternet "source.monogon.dev/metropolis/node/core/clusternet"
	"source.monogon.dev/metropolis/node/core/localstorage"
	"source.monogon.dev/metropolis/node/core/network"
	"source.monogon.dev/metropolis/node/core/network/dns"
	"source.monogon.dev/metropolis/node/kubernetes/clusternet"
	"source.monogon.dev/metropolis/node/kubernetes/nfproxy"
	kpki "source.monogon.dev/metropolis/node/kubernetes/pki"
	"source.monogon.dev/metropolis/node/kubernetes/plugins/kvmdevice"
	"source.monogon.dev/metropolis/pkg/event"
	"source.monogon.dev/metropolis/pkg/event/memory"
	"source.monogon.dev/metropolis/pkg/supervisor"

	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
)

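// ConfigWorker is the configuration of the Kubernetes worker service. It
// carries the cluster-level network parameters alongside handles to the
// node-local services and storage that the worker depends on.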
type ConfigWorker struct {
	ServiceIPRange net.IPNet
	ClusterNet     net.IPNet
	ClusterDomain  string

	Root          *localstorage.Root
	Network       *network.Service
	NodeID        string
	CuratorClient ipb.CuratorClient
	PodNetwork    event.Value[*oclusternet.Prefixes]
}

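// Worker is the Kubernetes worker service. It runs the node-local data plane
// components of Kubernetes: the kubelet, the CSI plugin and provisioner,
// clusternet, nfproxy and the KVM device plugin.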
type Worker struct {
	c ConfigWorker
}

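// NewWorker builds a Worker from the given configuration. The returned Worker
// does nothing until its Run method is started, typically under a supervisor,
// e.g. (sketch, with ctx and cfg provided by the caller):
//
//	w := NewWorker(cfg)
//	err := supervisor.Run(ctx, "kubernetes-worker", w.Run)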
func NewWorker(c ConfigWorker) *Worker {
	s := &Worker{
		c: c,
	}
	return s
}

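// Run is the main runnable of the Worker. It starts the local API server
// load-balancer, obtains certificates from the curator, starts all worker
// sub-services under the calling supervisor, and registers the cluster domain
// with the node's DNS service until the context is canceled.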
func (s *Worker) Run(ctx context.Context) error {
	// Run apiproxy, which load-balances connections from worker components to this
	// cluster's api servers. This is necessary as we want to round-robin across all
	// available apiservers, and Kubernetes components do not implement client-side
	// load-balancing.
	err := supervisor.Run(ctx, "apiproxy", func(ctx context.Context) error {
		lis, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", node.KubernetesWorkerLocalAPIPort))
		if err != nil {
			return fmt.Errorf("failed to listen: %w", err)
		}
		defer lis.Close()

		v := memory.Value[tinylb.BackendSet]{}
		srv := tinylb.Server{
			Provider: &v,
			Listener: lis,
		}
		err = supervisor.Run(ctx, "updater", func(ctx context.Context) error {
			return updateLoadbalancerAPIServers(ctx, &v, s.c.CuratorClient)
		})
		if err != nil {
			return err
		}

		supervisor.Logger(ctx).Infof("Starting proxy...")
		return srv.Run(ctx)
	})
	if err != nil {
		return err
	}

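	// Configure the kubelet. It is not started here; it runs alongside the
	// other sub-services started further below.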
	kubelet := kubeletService{
		ClusterDomain:      s.c.ClusterDomain,
		KubeletDirectory:   &s.c.Root.Data.Kubernetes.Kubelet,
		EphemeralDirectory: &s.c.Root.Ephemeral,
		ClusterDNS:         []net.IP{node.ContainerDNSIP},
	}

	// Gather all required material to send over for certificate issuance to the
	// curator...
	kwr := &ipb.IssueCertificateRequest_KubernetesWorker{}

	kubeletPK, err := kubelet.getPubkey(ctx)
	if err != nil {
		return fmt.Errorf("when getting kubelet pubkey: %w", err)
	}
	kwr.KubeletPubkey = kubeletPK

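	// Describe the client certificates to request alongside the kubelet's: one
	// per worker component that talks to the API server. Each entry carries the
	// PKI directory to persist the credentials to and a selector for the issued
	// certificate in the curator's response.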
	clients := map[string]*struct {
		dir *localstorage.PKIDirectory

		sk ed25519.PrivateKey
		pk ed25519.PublicKey

		client     *kubernetes.Clientset
		informers  informers.SharedInformerFactory
		kubeconfig []byte

		certFrom func(kw *ipb.IssueCertificateResponse_KubernetesWorker) []byte
	}{
		"csi": {
			dir: &s.c.Root.Data.Kubernetes.CSIProvisioner.PKI,
			certFrom: func(kw *ipb.IssueCertificateResponse_KubernetesWorker) []byte {
				return kw.CsiProvisionerCertificate
			},
		},
		"netserv": {
			dir: &s.c.Root.Data.Kubernetes.Netservices.PKI,
			certFrom: func(kw *ipb.IssueCertificateResponse_KubernetesWorker) []byte {
				return kw.NetservicesCertificate
			},
		},
	}

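	// Generate (or load) each client's private key and collect the public keys
	// into the issuance request.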
	for name, c := range clients {
		if err := c.dir.GeneratePrivateKey(); err != nil {
			return fmt.Errorf("generating %s key: %w", name, err)
		}
		k, err := c.dir.ReadPrivateKey()
		if err != nil {
			return fmt.Errorf("reading %s key: %w", name, err)
		}
		c.sk = k
		c.pk = c.sk.Public().(ed25519.PublicKey)
	}
	kwr.CsiProvisionerPubkey = clients["csi"].pk
	kwr.NetservicesPubkey = clients["netserv"].pk

	// ...issue certificates...
	res, err := s.c.CuratorClient.IssueCertificate(ctx, &ipb.IssueCertificateRequest{
		Kind: &ipb.IssueCertificateRequest_KubernetesWorker_{
			KubernetesWorker: kwr,
		},
	})
	if err != nil {
		return fmt.Errorf("failed to get certificates from curator: %w", err)
	}
	kw := res.Kind.(*ipb.IssueCertificateResponse_KubernetesWorker_).KubernetesWorker

	// ...write them...
	if err := kubelet.setCertificates(kw); err != nil {
		return fmt.Errorf("failed to write kubelet certs: %w", err)
	}
	for name, c := range clients {
		if c.dir == nil {
			continue
		}
		if err := c.dir.WriteCertificates(kw.IdentityCaCertificate, c.certFrom(kw)); err != nil {
			return fmt.Errorf("failed to write %s certs: %w", name, err)
		}
	}

	// ...and set up connections.
	for name, c := range clients {
		kubeconf, err := kpki.KubeconfigRaw(kw.IdentityCaCertificate, c.certFrom(kw), c.sk, kpki.KubernetesAPIEndpointForWorker)
		if err != nil {
			return fmt.Errorf("failed to make %s kubeconfig: %w", name, err)
		}
		c.kubeconfig = kubeconf
		cs, informers, err := connectByKubeconfig(kubeconf)
		if err != nil {
			return fmt.Errorf("failed to connect with %s: %w", name, err)
		}
		c.client = cs
		c.informers = informers
	}

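	// Construct the remaining sub-services around the issued credentials.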
	csiPlugin := csiPluginServer{
		KubeletDirectory: &s.c.Root.Data.Kubernetes.Kubelet,
		VolumesDirectory: &s.c.Root.Data.Volumes,
	}

	csiProvisioner := csiProvisionerServer{
		NodeName:         s.c.NodeID,
		Kubernetes:       clients["csi"].client,
		InformerFactory:  clients["csi"].informers,
		VolumesDirectory: &s.c.Root.Data.Volumes,
	}

	clusternet := clusternet.Service{
		NodeName:   s.c.NodeID,
		Kubernetes: clients["netserv"].client,
		Prefixes:   s.c.PodNetwork,
	}

	nfproxy := nfproxy.Service{
		ClusterCIDR: s.c.ClusterNet,
		ClientSet:   clients["netserv"].client,
	}

	kvmDevicePlugin := kvmdevice.Plugin{
		KubeletDirectory: &s.c.Root.Data.Kubernetes.Kubelet,
	}

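	// Start all sub-services under the supervisor; failing to start any of
	// them is fatal for the whole worker.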
	for _, sub := range []struct {
		name     string
		runnable supervisor.Runnable
	}{
		{"csi-plugin", csiPlugin.Run},
		{"csi-provisioner", csiProvisioner.Run},
		{"clusternet", clusternet.Run},
		{"nfproxy", nfproxy.Run},
		{"kvmdeviceplugin", kvmDevicePlugin.Run},
		{"kubelet", kubelet.Run},
	} {
		err := supervisor.Run(ctx, sub.name, sub.runnable)
		if err != nil {
			return fmt.Errorf("could not run sub-service %q: %w", sub.name, err)
		}
	}

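	// Register the Kubernetes cluster domain with the node's DNS service, and
	// withdraw the registration again when this runnable exits.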
	supervisor.Logger(ctx).Info("Registering K8s CoreDNS")
	clusterDNSDirective := dns.NewKubernetesDirective(s.c.ClusterDomain, clients["netserv"].kubeconfig)
	s.c.Network.ConfigureDNS(clusterDNSDirective)

	supervisor.Signal(ctx, supervisor.SignalHealthy)
	<-ctx.Done()
	s.c.Network.ConfigureDNS(dns.CancelDirective(clusterDNSDirective))
	return nil
}

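// connectByKubeconfig builds a Kubernetes clientset and a shared informer
// factory (with a 5 minute resync interval) from a serialized kubeconfig.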
func connectByKubeconfig(kubeconfig []byte) (*kubernetes.Clientset, informers.SharedInformerFactory, error) {
	rawClientConfig, err := clientcmd.NewClientConfigFromBytes(kubeconfig)
	if err != nil {
		return nil, nil, fmt.Errorf("could not generate kubernetes client config: %w", err)
	}
	clientConfig, err := rawClientConfig.ClientConfig()
	if err != nil {
		return nil, nil, fmt.Errorf("could not resolve kubernetes client config: %w", err)
	}
	clientSet, err := kubernetes.NewForConfig(clientConfig)
	if err != nil {
		return nil, nil, fmt.Errorf("could not generate kubernetes client: %w", err)
	}
	informerFactory := informers.NewSharedInformerFactory(clientSet, 5*time.Minute)
	return clientSet, informerFactory, nil
}