blob: 2572dfa040a735b4551f0e1f0634bfe10525ffdc [file] [log] [blame]
Serge Bazanski6fdca3f2023-03-20 17:47:07 +01001package kubernetes
2
3import (
4 "context"
Serge Bazanski2cfafc92023-03-21 16:42:47 +01005 "crypto/ed25519"
Serge Bazanski6fdca3f2023-03-20 17:47:07 +01006 "fmt"
7 "net"
Serge Bazanski2cfafc92023-03-21 16:42:47 +01008 "time"
9
10 "k8s.io/client-go/informers"
11 "k8s.io/client-go/kubernetes"
12 "k8s.io/client-go/tools/clientcmd"
Serge Bazanski6fdca3f2023-03-20 17:47:07 +010013
14 "source.monogon.dev/go/net/tinylb"
15 "source.monogon.dev/metropolis/node"
Serge Bazanskib565cc62023-03-30 18:43:51 +020016 oclusternet "source.monogon.dev/metropolis/node/core/clusternet"
Serge Bazanski6fdca3f2023-03-20 17:47:07 +010017 "source.monogon.dev/metropolis/node/core/localstorage"
18 "source.monogon.dev/metropolis/node/core/network"
Serge Bazanski2cfafc92023-03-21 16:42:47 +010019 "source.monogon.dev/metropolis/node/core/network/dns"
20 "source.monogon.dev/metropolis/node/kubernetes/clusternet"
21 "source.monogon.dev/metropolis/node/kubernetes/nfproxy"
22 kpki "source.monogon.dev/metropolis/node/kubernetes/pki"
23 "source.monogon.dev/metropolis/node/kubernetes/plugins/kvmdevice"
Serge Bazanskib565cc62023-03-30 18:43:51 +020024 "source.monogon.dev/metropolis/pkg/event"
Serge Bazanski6fdca3f2023-03-20 17:47:07 +010025 "source.monogon.dev/metropolis/pkg/event/memory"
26 "source.monogon.dev/metropolis/pkg/supervisor"
27
28 ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
29)
30
31type ConfigWorker struct {
32 ServiceIPRange net.IPNet
33 ClusterNet net.IPNet
34 ClusterDomain string
35
36 Root *localstorage.Root
37 Network *network.Service
38 NodeID string
39 CuratorClient ipb.CuratorClient
Serge Bazanskib565cc62023-03-30 18:43:51 +020040 PodNetwork event.Value[*oclusternet.Prefixes]
Serge Bazanski6fdca3f2023-03-20 17:47:07 +010041}
42
43type Worker struct {
44 c ConfigWorker
45}
46
47func NewWorker(c ConfigWorker) *Worker {
48 s := &Worker{
49 c: c,
50 }
51 return s
52}
53
54func (s *Worker) Run(ctx context.Context) error {
55 // Run apiproxy, which load-balances connections from worker components to this
56 // cluster's api servers. This is necessary as we want to round-robin across all
57 // available apiservers, and Kubernetes components do not implement client-side
58 // load-balancing.
59 err := supervisor.Run(ctx, "apiproxy", func(ctx context.Context) error {
60 lis, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", node.KubernetesWorkerLocalAPIPort))
61 if err != nil {
62 return fmt.Errorf("failed to listen: %w", err)
63 }
64 defer lis.Close()
65
66 v := memory.Value[tinylb.BackendSet]{}
67 srv := tinylb.Server{
68 Provider: &v,
69 Listener: lis,
70 }
71 err = supervisor.Run(ctx, "updater", func(ctx context.Context) error {
72 return updateLoadbalancerAPIServers(ctx, &v, s.c.CuratorClient)
73 })
74 if err != nil {
75 return err
76 }
77
78 supervisor.Logger(ctx).Infof("Starting proxy...")
79 return srv.Run(ctx)
80 })
81 if err != nil {
82 return err
83 }
Serge Bazanskib565cc62023-03-30 18:43:51 +020084
Serge Bazanski2cfafc92023-03-21 16:42:47 +010085 kubelet := kubeletService{
86 ClusterDomain: s.c.ClusterDomain,
87 KubeletDirectory: &s.c.Root.Data.Kubernetes.Kubelet,
88 EphemeralDirectory: &s.c.Root.Ephemeral,
Lorenz Brun0e291a12023-06-01 12:22:45 +020089 ClusterDNS: []net.IP{node.ContainerDNSIP},
Serge Bazanski2cfafc92023-03-21 16:42:47 +010090 }
91
92 // Gather all required material to send over for certficiate issuance to the
93 // curator...
94 kwr := &ipb.IssueCertificateRequest_KubernetesWorker{}
95
96 kubeletPK, err := kubelet.getPubkey(ctx)
97 if err != nil {
98 return fmt.Errorf("when getting kubelet pubkey: %w", err)
99 }
100 kwr.KubeletPubkey = kubeletPK
101
102 clients := map[string]*struct {
103 dir *localstorage.PKIDirectory
104
105 sk ed25519.PrivateKey
106 pk ed25519.PublicKey
107
108 client *kubernetes.Clientset
109 informers informers.SharedInformerFactory
110 kubeconfig []byte
111
112 certFrom func(kw *ipb.IssueCertificateResponse_KubernetesWorker) []byte
113 }{
114 "csi": {
115 dir: &s.c.Root.Data.Kubernetes.CSIProvisioner.PKI,
116 certFrom: func(kw *ipb.IssueCertificateResponse_KubernetesWorker) []byte {
117 return kw.CsiProvisionerCertificate
118 },
119 },
120 "netserv": {
121 dir: &s.c.Root.Data.Kubernetes.Netservices.PKI,
122 certFrom: func(kw *ipb.IssueCertificateResponse_KubernetesWorker) []byte {
123 return kw.NetservicesCertificate
124 },
125 },
126 }
127
128 for name, c := range clients {
129 if err := c.dir.GeneratePrivateKey(); err != nil {
130 return fmt.Errorf("generating %s key: %w", name, err)
131 }
132 k, err := c.dir.ReadPrivateKey()
133 if err != nil {
134 return fmt.Errorf("reading %s key: %w", name, err)
135 }
136 c.sk = k
137 c.pk = c.sk.Public().(ed25519.PublicKey)
138 }
139 kwr.CsiProvisionerPubkey = clients["csi"].pk
140 kwr.NetservicesPubkey = clients["netserv"].pk
141
142 // ...issue certificates...
143 res, err := s.c.CuratorClient.IssueCertificate(ctx, &ipb.IssueCertificateRequest{
144 Kind: &ipb.IssueCertificateRequest_KubernetesWorker_{
145 KubernetesWorker: kwr,
146 },
147 })
148 if err != nil {
149 return fmt.Errorf("failed to get certificates from curator: %w", err)
150 }
151 kw := res.Kind.(*ipb.IssueCertificateResponse_KubernetesWorker_).KubernetesWorker
152
153 // ...write them...
154 if err := kubelet.setCertificates(kw); err != nil {
155 return fmt.Errorf("failed to write kubelet certs: %w", err)
156 }
157 for name, c := range clients {
158 if c.dir == nil {
159 continue
160 }
161 if err := c.dir.WriteCertificates(kw.IdentityCaCertificate, c.certFrom(kw)); err != nil {
162 return fmt.Errorf("failed to write %s certs: %w", name, err)
163 }
164 }
165
166 // ... and set up connections.
167 for name, c := range clients {
168 kubeconf, err := kpki.KubeconfigRaw(kw.IdentityCaCertificate, c.certFrom(kw), c.sk, kpki.KubernetesAPIEndpointForWorker)
169 if err != nil {
170 return fmt.Errorf("failed to make %s kubeconfig: %w", name, err)
171 }
172 c.kubeconfig = kubeconf
173 cs, informers, err := connectByKubeconfig(kubeconf)
174 if err != nil {
175 return fmt.Errorf("failed to connect with %s: %w", name, err)
176 }
177 c.client = cs
178 c.informers = informers
179 }
180
Serge Bazanski2cfafc92023-03-21 16:42:47 +0100181 csiPlugin := csiPluginServer{
182 KubeletDirectory: &s.c.Root.Data.Kubernetes.Kubelet,
183 VolumesDirectory: &s.c.Root.Data.Volumes,
184 }
185
186 csiProvisioner := csiProvisionerServer{
187 NodeName: s.c.NodeID,
188 Kubernetes: clients["csi"].client,
189 InformerFactory: clients["csi"].informers,
190 VolumesDirectory: &s.c.Root.Data.Volumes,
191 }
192
193 clusternet := clusternet.Service{
194 NodeName: s.c.NodeID,
195 Kubernetes: clients["netserv"].client,
196 Prefixes: s.c.PodNetwork,
197 }
198
199 nfproxy := nfproxy.Service{
200 ClusterCIDR: s.c.ClusterNet,
201 ClientSet: clients["netserv"].client,
202 }
203
204 kvmDevicePlugin := kvmdevice.Plugin{
205 KubeletDirectory: &s.c.Root.Data.Kubernetes.Kubelet,
206 }
207
208 for _, sub := range []struct {
209 name string
210 runnable supervisor.Runnable
211 }{
212 {"csi-plugin", csiPlugin.Run},
213 {"csi-provisioner", csiProvisioner.Run},
214 {"clusternet", clusternet.Run},
215 {"nfproxy", nfproxy.Run},
216 {"kvmdeviceplugin", kvmDevicePlugin.Run},
Lorenz Brun0e291a12023-06-01 12:22:45 +0200217 {"kubelet", kubelet.Run},
Serge Bazanski2cfafc92023-03-21 16:42:47 +0100218 } {
219 err := supervisor.Run(ctx, sub.name, sub.runnable)
220 if err != nil {
221 return fmt.Errorf("could not run sub-service %q: %w", sub.name, err)
222 }
223 }
224
225 supervisor.Logger(ctx).Info("Registering K8s CoreDNS")
226 clusterDNSDirective := dns.NewKubernetesDirective(s.c.ClusterDomain, clients["netserv"].kubeconfig)
227 s.c.Network.ConfigureDNS(clusterDNSDirective)
228
Serge Bazanski6fdca3f2023-03-20 17:47:07 +0100229 supervisor.Signal(ctx, supervisor.SignalHealthy)
230 <-ctx.Done()
Serge Bazanski2cfafc92023-03-21 16:42:47 +0100231 s.c.Network.ConfigureDNS(dns.CancelDirective(clusterDNSDirective))
Serge Bazanski6fdca3f2023-03-20 17:47:07 +0100232 return nil
233}
Serge Bazanski2cfafc92023-03-21 16:42:47 +0100234
235func connectByKubeconfig(kubeconfig []byte) (*kubernetes.Clientset, informers.SharedInformerFactory, error) {
236 rawClientConfig, err := clientcmd.NewClientConfigFromBytes(kubeconfig)
237 if err != nil {
238 return nil, nil, fmt.Errorf("could not generate kubernetes client config: %w", err)
239 }
240 clientConfig, err := rawClientConfig.ClientConfig()
241 clientSet, err := kubernetes.NewForConfig(clientConfig)
242 if err != nil {
243 return nil, nil, fmt.Errorf("could not generate kubernetes client: %w", err)
244 }
245 informerFactory := informers.NewSharedInformerFactory(clientSet, 5*time.Minute)
246 return clientSet, informerFactory, nil
247}