blob: 2572dfa040a735b4551f0e1f0634bfe10525ffdc [file] [log] [blame] [edit]
package kubernetes
import (
"context"
"crypto/ed25519"
"fmt"
"net"
"time"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"source.monogon.dev/go/net/tinylb"
"source.monogon.dev/metropolis/node"
oclusternet "source.monogon.dev/metropolis/node/core/clusternet"
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/node/core/network"
"source.monogon.dev/metropolis/node/core/network/dns"
"source.monogon.dev/metropolis/node/kubernetes/clusternet"
"source.monogon.dev/metropolis/node/kubernetes/nfproxy"
kpki "source.monogon.dev/metropolis/node/kubernetes/pki"
"source.monogon.dev/metropolis/node/kubernetes/plugins/kvmdevice"
"source.monogon.dev/metropolis/pkg/event"
"source.monogon.dev/metropolis/pkg/event/memory"
"source.monogon.dev/metropolis/pkg/supervisor"
ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
)
// ConfigWorker is the configuration of a Kubernetes Worker. It is copied into
// the Worker by NewWorker and read by Worker.Run.
type ConfigWorker struct {
// ServiceIPRange is not read by Worker.Run.
// NOTE(review): presumably the IP range used for Kubernetes services -
// confirm against other users of this struct.
ServiceIPRange net.IPNet
// ClusterNet is handed to the nfproxy service as its ClusterCIDR.
ClusterNet net.IPNet
// ClusterDomain is passed to the kubelet service and is used to build the
// Kubernetes DNS directive registered with Network.
ClusterDomain string
// Root is the node's local storage root. Run uses it to locate the kubelet,
// ephemeral, volumes and per-client PKI directories.
Root *localstorage.Root
// Network is the network service on which Run registers (and later cancels)
// the Kubernetes DNS directive.
Network *network.Service
// NodeID is used as the NodeName of the CSI provisioner and clusternet
// services.
NodeID string
// CuratorClient is used to issue the worker certificates and is passed to
// the updater of the local API server load balancer.
CuratorClient ipb.CuratorClient
// PodNetwork is handed to the clusternet service as its Prefixes.
PodNetwork event.Value[*oclusternet.Prefixes]
}
// Worker is the Kubernetes worker service. It is created with NewWorker and
// driven by its Run method.
type Worker struct {
// c is the configuration this Worker was created with.
c ConfigWorker
}
// NewWorker returns a Worker that will run with the given configuration. The
// configuration is stored by value.
func NewWorker(c ConfigWorker) *Worker {
	return &Worker{c: c}
}
// Run is the supervisor runnable of the Kubernetes worker. It starts a local
// load balancer in front of the cluster's API servers, requests certificates
// for the kubelet and the worker-side Kubernetes clients from the curator,
// starts all worker sub-services, registers the Kubernetes DNS directive and
// then blocks until ctx is canceled.
//
// It returns an error if any of the setup steps fails, and nil after ctx is
// done and the DNS directive has been canceled.
func (s *Worker) Run(ctx context.Context) error {
	// Run apiproxy, which load-balances connections from worker components to this
	// cluster's api servers. This is necessary as we want to round-robin across all
	// available apiservers, and Kubernetes components do not implement client-side
	// load-balancing.
	err := supervisor.Run(ctx, "apiproxy", func(ctx context.Context) error {
		lis, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", node.KubernetesWorkerLocalAPIPort))
		if err != nil {
			return fmt.Errorf("failed to listen: %w", err)
		}
		defer lis.Close()

		// The updater sub-runnable receives the backend set that the load
		// balancer reads from.
		v := memory.Value[tinylb.BackendSet]{}
		srv := tinylb.Server{
			Provider: &v,
			Listener: lis,
		}
		err = supervisor.Run(ctx, "updater", func(ctx context.Context) error {
			return updateLoadbalancerAPIServers(ctx, &v, s.c.CuratorClient)
		})
		if err != nil {
			return err
		}

		supervisor.Logger(ctx).Infof("Starting proxy...")
		return srv.Run(ctx)
	})
	if err != nil {
		return err
	}

	kubelet := kubeletService{
		ClusterDomain:      s.c.ClusterDomain,
		KubeletDirectory:   &s.c.Root.Data.Kubernetes.Kubelet,
		EphemeralDirectory: &s.c.Root.Ephemeral,
		ClusterDNS:         []net.IP{node.ContainerDNSIP},
	}

	// Gather all required material to send over for certificate issuance to the
	// curator...
	kwr := &ipb.IssueCertificateRequest_KubernetesWorker{}

	kubeletPK, err := kubelet.getPubkey(ctx)
	if err != nil {
		return fmt.Errorf("when getting kubelet pubkey: %w", err)
	}
	kwr.KubeletPubkey = kubeletPK

	// clients describes every Kubernetes client identity used by the worker
	// besides the kubelet. The entries are pointers, so the loops below fill
	// them in place.
	clients := map[string]*struct {
		dir *localstorage.PKIDirectory

		sk ed25519.PrivateKey
		pk ed25519.PublicKey

		client     *kubernetes.Clientset
		informers  informers.SharedInformerFactory
		kubeconfig []byte

		// certFrom extracts this client's certificate from the curator
		// response.
		certFrom func(kw *ipb.IssueCertificateResponse_KubernetesWorker) []byte
	}{
		"csi": {
			dir: &s.c.Root.Data.Kubernetes.CSIProvisioner.PKI,
			certFrom: func(kw *ipb.IssueCertificateResponse_KubernetesWorker) []byte {
				return kw.CsiProvisionerCertificate
			},
		},
		"netserv": {
			dir: &s.c.Root.Data.Kubernetes.Netservices.PKI,
			certFrom: func(kw *ipb.IssueCertificateResponse_KubernetesWorker) []byte {
				return kw.NetservicesCertificate
			},
		},
	}

	for name, c := range clients {
		if err := c.dir.GeneratePrivateKey(); err != nil {
			return fmt.Errorf("generating %s key: %w", name, err)
		}
		k, err := c.dir.ReadPrivateKey()
		if err != nil {
			return fmt.Errorf("reading %s key: %w", name, err)
		}
		c.sk = k
		c.pk = c.sk.Public().(ed25519.PublicKey)
	}

	kwr.CsiProvisionerPubkey = clients["csi"].pk
	kwr.NetservicesPubkey = clients["netserv"].pk

	// ...issue certificates...
	res, err := s.c.CuratorClient.IssueCertificate(ctx, &ipb.IssueCertificateRequest{
		Kind: &ipb.IssueCertificateRequest_KubernetesWorker_{
			KubernetesWorker: kwr,
		},
	})
	if err != nil {
		return fmt.Errorf("failed to get certificates from curator: %w", err)
	}
	// Check the response kind instead of asserting it blindly, so that an
	// unexpected response results in an error rather than a panic.
	kind, ok := res.Kind.(*ipb.IssueCertificateResponse_KubernetesWorker_)
	if !ok {
		return fmt.Errorf("unexpected certificate response kind %T from curator", res.Kind)
	}
	kw := kind.KubernetesWorker
	if kw == nil {
		return fmt.Errorf("curator returned an empty kubernetes worker certificate response")
	}

	// ...write them...
	if err := kubelet.setCertificates(kw); err != nil {
		return fmt.Errorf("failed to write kubelet certs: %w", err)
	}
	for name, c := range clients {
		// Skip clients which have no PKI directory to write to.
		if c.dir == nil {
			continue
		}
		if err := c.dir.WriteCertificates(kw.IdentityCaCertificate, c.certFrom(kw)); err != nil {
			return fmt.Errorf("failed to write %s certs: %w", name, err)
		}
	}

	// ... and set up connections.
	for name, c := range clients {
		kubeconf, err := kpki.KubeconfigRaw(kw.IdentityCaCertificate, c.certFrom(kw), c.sk, kpki.KubernetesAPIEndpointForWorker)
		if err != nil {
			return fmt.Errorf("failed to make %s kubeconfig: %w", name, err)
		}
		c.kubeconfig = kubeconf
		cs, inf, err := connectByKubeconfig(kubeconf)
		if err != nil {
			return fmt.Errorf("failed to connect with %s: %w", name, err)
		}
		c.client = cs
		c.informers = inf
	}

	csiPlugin := csiPluginServer{
		KubeletDirectory: &s.c.Root.Data.Kubernetes.Kubelet,
		VolumesDirectory: &s.c.Root.Data.Volumes,
	}

	csiProvisioner := csiProvisionerServer{
		NodeName:         s.c.NodeID,
		Kubernetes:       clients["csi"].client,
		InformerFactory:  clients["csi"].informers,
		VolumesDirectory: &s.c.Root.Data.Volumes,
	}

	// The service values below are named with a Svc suffix so that they do not
	// shadow the clusternet and nfproxy packages.
	clusternetSvc := clusternet.Service{
		NodeName:   s.c.NodeID,
		Kubernetes: clients["netserv"].client,
		Prefixes:   s.c.PodNetwork,
	}

	nfproxySvc := nfproxy.Service{
		ClusterCIDR: s.c.ClusterNet,
		ClientSet:   clients["netserv"].client,
	}

	kvmDevicePlugin := kvmdevice.Plugin{
		KubeletDirectory: &s.c.Root.Data.Kubernetes.Kubelet,
	}

	// Start all sub-services under the supervisor, in the order listed.
	for _, sub := range []struct {
		name     string
		runnable supervisor.Runnable
	}{
		{"csi-plugin", csiPlugin.Run},
		{"csi-provisioner", csiProvisioner.Run},
		{"clusternet", clusternetSvc.Run},
		{"nfproxy", nfproxySvc.Run},
		{"kvmdeviceplugin", kvmDevicePlugin.Run},
		{"kubelet", kubelet.Run},
	} {
		err := supervisor.Run(ctx, sub.name, sub.runnable)
		if err != nil {
			return fmt.Errorf("could not run sub-service %q: %w", sub.name, err)
		}
	}

	supervisor.Logger(ctx).Info("Registering K8s CoreDNS")
	clusterDNSDirective := dns.NewKubernetesDirective(s.c.ClusterDomain, clients["netserv"].kubeconfig)
	s.c.Network.ConfigureDNS(clusterDNSDirective)

	supervisor.Signal(ctx, supervisor.SignalHealthy)

	// Block until canceled, then withdraw the DNS directive registered above.
	<-ctx.Done()
	s.c.Network.ConfigureDNS(dns.CancelDirective(clusterDNSDirective))
	return nil
}
// connectByKubeconfig builds a Kubernetes clientset and a shared informer
// factory from the given serialized kubeconfig. The informer factory is
// created with a five minute default resync period.
//
// It returns an error if the kubeconfig cannot be parsed, if no client
// configuration can be derived from it, or if the clientset cannot be created.
func connectByKubeconfig(kubeconfig []byte) (*kubernetes.Clientset, informers.SharedInformerFactory, error) {
	rawClientConfig, err := clientcmd.NewClientConfigFromBytes(kubeconfig)
	if err != nil {
		return nil, nil, fmt.Errorf("could not generate kubernetes client config: %w", err)
	}

	clientConfig, err := rawClientConfig.ClientConfig()
	// This error used to be overwritten by the NewForConfig call below without
	// being checked.
	if err != nil {
		return nil, nil, fmt.Errorf("could not resolve kubernetes client config: %w", err)
	}

	clientSet, err := kubernetes.NewForConfig(clientConfig)
	if err != nil {
		return nil, nil, fmt.Errorf("could not generate kubernetes client: %w", err)
	}

	informerFactory := informers.NewSharedInformerFactory(clientSet, 5*time.Minute)
	return clientSet, informerFactory, nil
}