blob: 6f6633bda08ee3df6dbddde31f05936b2a2876c7 [file] [log] [blame]
Tim Windelschmidt6d33a432025-02-04 14:34:25 +01001// Copyright The Monogon Project Authors.
2// SPDX-License-Identifier: Apache-2.0
3
Serge Bazanski6fdca3f2023-03-20 17:47:07 +01004package kubernetes
5
6import (
7 "context"
Serge Bazanski2cfafc92023-03-21 16:42:47 +01008 "crypto/ed25519"
Serge Bazanski6fdca3f2023-03-20 17:47:07 +01009 "fmt"
10 "net"
Jan Schär91bf1c82024-07-29 17:31:33 +020011 "net/netip"
Serge Bazanski2cfafc92023-03-21 16:42:47 +010012 "time"
13
14 "k8s.io/client-go/informers"
15 "k8s.io/client-go/kubernetes"
16 "k8s.io/client-go/tools/clientcmd"
Serge Bazanski6fdca3f2023-03-20 17:47:07 +010017
18 "source.monogon.dev/go/net/tinylb"
19 "source.monogon.dev/metropolis/node"
Serge Bazanskib565cc62023-03-30 18:43:51 +020020 oclusternet "source.monogon.dev/metropolis/node/core/clusternet"
Serge Bazanski6fdca3f2023-03-20 17:47:07 +010021 "source.monogon.dev/metropolis/node/core/localstorage"
Lorenz Brune8beaed2025-02-05 22:03:50 +010022 "source.monogon.dev/metropolis/node/core/metrics"
Serge Bazanski6fdca3f2023-03-20 17:47:07 +010023 "source.monogon.dev/metropolis/node/core/network"
Serge Bazanski2cfafc92023-03-21 16:42:47 +010024 "source.monogon.dev/metropolis/node/kubernetes/clusternet"
Lorenz Brune8beaed2025-02-05 22:03:50 +010025 "source.monogon.dev/metropolis/node/kubernetes/metricsprovider"
Serge Bazanski2cfafc92023-03-21 16:42:47 +010026 "source.monogon.dev/metropolis/node/kubernetes/nfproxy"
27 kpki "source.monogon.dev/metropolis/node/kubernetes/pki"
28 "source.monogon.dev/metropolis/node/kubernetes/plugins/kvmdevice"
Tim Windelschmidt9f21f532024-05-07 15:14:20 +020029 "source.monogon.dev/osbase/event"
30 "source.monogon.dev/osbase/event/memory"
Jan Schär91bf1c82024-07-29 17:31:33 +020031 kubernetesDNS "source.monogon.dev/osbase/net/dns/kubernetes"
Tim Windelschmidt9f21f532024-05-07 15:14:20 +020032 "source.monogon.dev/osbase/supervisor"
Serge Bazanski6fdca3f2023-03-20 17:47:07 +010033
34 ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
35)
36
37type ConfigWorker struct {
38 ServiceIPRange net.IPNet
39 ClusterNet net.IPNet
40 ClusterDomain string
41
42 Root *localstorage.Root
43 Network *network.Service
44 NodeID string
45 CuratorClient ipb.CuratorClient
Serge Bazanskib565cc62023-03-30 18:43:51 +020046 PodNetwork event.Value[*oclusternet.Prefixes]
Serge Bazanski6fdca3f2023-03-20 17:47:07 +010047}
48
49type Worker struct {
50 c ConfigWorker
51}
52
53func NewWorker(c ConfigWorker) *Worker {
54 s := &Worker{
55 c: c,
56 }
57 return s
58}
59
60func (s *Worker) Run(ctx context.Context) error {
Lorenz Brune8beaed2025-02-05 22:03:50 +010061 metrics.CoreRegistry.MustRegister(metricsprovider.Registry)
62 defer metrics.CoreRegistry.Unregister(metricsprovider.Registry)
Serge Bazanski6fdca3f2023-03-20 17:47:07 +010063 // Run apiproxy, which load-balances connections from worker components to this
64 // cluster's api servers. This is necessary as we want to round-robin across all
65 // available apiservers, and Kubernetes components do not implement client-side
66 // load-balancing.
67 err := supervisor.Run(ctx, "apiproxy", func(ctx context.Context) error {
68 lis, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", node.KubernetesWorkerLocalAPIPort))
69 if err != nil {
70 return fmt.Errorf("failed to listen: %w", err)
71 }
72 defer lis.Close()
73
74 v := memory.Value[tinylb.BackendSet]{}
75 srv := tinylb.Server{
76 Provider: &v,
77 Listener: lis,
78 }
79 err = supervisor.Run(ctx, "updater", func(ctx context.Context) error {
80 return updateLoadbalancerAPIServers(ctx, &v, s.c.CuratorClient)
81 })
82 if err != nil {
83 return err
84 }
85
86 supervisor.Logger(ctx).Infof("Starting proxy...")
87 return srv.Run(ctx)
88 })
89 if err != nil {
90 return err
91 }
Serge Bazanskib565cc62023-03-30 18:43:51 +020092
Serge Bazanski2cfafc92023-03-21 16:42:47 +010093 kubelet := kubeletService{
94 ClusterDomain: s.c.ClusterDomain,
95 KubeletDirectory: &s.c.Root.Data.Kubernetes.Kubelet,
96 EphemeralDirectory: &s.c.Root.Ephemeral,
Lorenz Brun0e291a12023-06-01 12:22:45 +020097 ClusterDNS: []net.IP{node.ContainerDNSIP},
Serge Bazanski2cfafc92023-03-21 16:42:47 +010098 }
99
100 // Gather all required material to send over for certficiate issuance to the
101 // curator...
102 kwr := &ipb.IssueCertificateRequest_KubernetesWorker{}
103
104 kubeletPK, err := kubelet.getPubkey(ctx)
105 if err != nil {
106 return fmt.Errorf("when getting kubelet pubkey: %w", err)
107 }
108 kwr.KubeletPubkey = kubeletPK
109
110 clients := map[string]*struct {
111 dir *localstorage.PKIDirectory
112
113 sk ed25519.PrivateKey
114 pk ed25519.PublicKey
115
116 client *kubernetes.Clientset
117 informers informers.SharedInformerFactory
118 kubeconfig []byte
119
120 certFrom func(kw *ipb.IssueCertificateResponse_KubernetesWorker) []byte
121 }{
122 "csi": {
123 dir: &s.c.Root.Data.Kubernetes.CSIProvisioner.PKI,
124 certFrom: func(kw *ipb.IssueCertificateResponse_KubernetesWorker) []byte {
125 return kw.CsiProvisionerCertificate
126 },
127 },
128 "netserv": {
129 dir: &s.c.Root.Data.Kubernetes.Netservices.PKI,
130 certFrom: func(kw *ipb.IssueCertificateResponse_KubernetesWorker) []byte {
131 return kw.NetservicesCertificate
132 },
133 },
134 }
135
136 for name, c := range clients {
137 if err := c.dir.GeneratePrivateKey(); err != nil {
138 return fmt.Errorf("generating %s key: %w", name, err)
139 }
140 k, err := c.dir.ReadPrivateKey()
141 if err != nil {
142 return fmt.Errorf("reading %s key: %w", name, err)
143 }
144 c.sk = k
145 c.pk = c.sk.Public().(ed25519.PublicKey)
146 }
147 kwr.CsiProvisionerPubkey = clients["csi"].pk
148 kwr.NetservicesPubkey = clients["netserv"].pk
149
150 // ...issue certificates...
151 res, err := s.c.CuratorClient.IssueCertificate(ctx, &ipb.IssueCertificateRequest{
152 Kind: &ipb.IssueCertificateRequest_KubernetesWorker_{
153 KubernetesWorker: kwr,
154 },
155 })
156 if err != nil {
157 return fmt.Errorf("failed to get certificates from curator: %w", err)
158 }
159 kw := res.Kind.(*ipb.IssueCertificateResponse_KubernetesWorker_).KubernetesWorker
160
161 // ...write them...
162 if err := kubelet.setCertificates(kw); err != nil {
163 return fmt.Errorf("failed to write kubelet certs: %w", err)
164 }
165 for name, c := range clients {
166 if c.dir == nil {
167 continue
168 }
169 if err := c.dir.WriteCertificates(kw.IdentityCaCertificate, c.certFrom(kw)); err != nil {
170 return fmt.Errorf("failed to write %s certs: %w", name, err)
171 }
172 }
173
174 // ... and set up connections.
175 for name, c := range clients {
176 kubeconf, err := kpki.KubeconfigRaw(kw.IdentityCaCertificate, c.certFrom(kw), c.sk, kpki.KubernetesAPIEndpointForWorker)
177 if err != nil {
178 return fmt.Errorf("failed to make %s kubeconfig: %w", name, err)
179 }
180 c.kubeconfig = kubeconf
181 cs, informers, err := connectByKubeconfig(kubeconf)
182 if err != nil {
183 return fmt.Errorf("failed to connect with %s: %w", name, err)
184 }
185 c.client = cs
186 c.informers = informers
187 }
188
Serge Bazanski2cfafc92023-03-21 16:42:47 +0100189 csiPlugin := csiPluginServer{
190 KubeletDirectory: &s.c.Root.Data.Kubernetes.Kubelet,
191 VolumesDirectory: &s.c.Root.Data.Volumes,
192 }
193
194 csiProvisioner := csiProvisionerServer{
195 NodeName: s.c.NodeID,
196 Kubernetes: clients["csi"].client,
197 InformerFactory: clients["csi"].informers,
198 VolumesDirectory: &s.c.Root.Data.Volumes,
199 }
200
201 clusternet := clusternet.Service{
202 NodeName: s.c.NodeID,
203 Kubernetes: clients["netserv"].client,
204 Prefixes: s.c.PodNetwork,
205 }
206
207 nfproxy := nfproxy.Service{
208 ClusterCIDR: s.c.ClusterNet,
209 ClientSet: clients["netserv"].client,
210 }
211
Jan Schär91bf1c82024-07-29 17:31:33 +0200212 var dnsIPRanges []netip.Prefix
213 for _, ipNet := range []net.IPNet{s.c.ServiceIPRange, s.c.ClusterNet} {
214 ipPrefix, err := netip.ParsePrefix(ipNet.String())
215 if err != nil {
216 return fmt.Errorf("invalid IP range %s", ipNet)
217 }
218 dnsIPRanges = append(dnsIPRanges, ipPrefix)
219 }
220 dnsService := kubernetesDNS.New(s.c.ClusterDomain, dnsIPRanges)
221 dnsService.ClientSet = clients["netserv"].client
222 // Set the DNS handler. When the node has no Kubernetes Worker role,
223 // or the role is removed, this is set to an empty handler in
224 // //metropolis/node/core/roleserve/worker_kubernetes.go.
225 s.c.Network.DNS.SetHandler("kubernetes", dnsService)
226
227 if err := s.c.Network.AddLoopbackIP(node.ContainerDNSIP); err != nil {
228 return fmt.Errorf("failed to add local IP for container DNS: %w", err)
229 }
230 defer func() {
231 if err := s.c.Network.ReleaseLoopbackIP(node.ContainerDNSIP); err != nil {
232 supervisor.Logger(ctx).Errorf("Failed to release local IP for container DNS: %v", err)
233 }
234 }()
235 runDNSListener := func(ctx context.Context) error {
236 return s.c.Network.DNS.RunListenerAddr(ctx, net.JoinHostPort(node.ContainerDNSIP.String(), "53"))
237 }
238
Serge Bazanski2cfafc92023-03-21 16:42:47 +0100239 kvmDevicePlugin := kvmdevice.Plugin{
240 KubeletDirectory: &s.c.Root.Data.Kubernetes.Kubelet,
241 }
242
243 for _, sub := range []struct {
244 name string
245 runnable supervisor.Runnable
246 }{
247 {"csi-plugin", csiPlugin.Run},
248 {"csi-provisioner", csiProvisioner.Run},
249 {"clusternet", clusternet.Run},
250 {"nfproxy", nfproxy.Run},
Jan Schär91bf1c82024-07-29 17:31:33 +0200251 {"dns-service", dnsService.Run},
252 {"dns-listener", runDNSListener},
Serge Bazanski2cfafc92023-03-21 16:42:47 +0100253 {"kvmdeviceplugin", kvmDevicePlugin.Run},
Lorenz Brun0e291a12023-06-01 12:22:45 +0200254 {"kubelet", kubelet.Run},
Serge Bazanski2cfafc92023-03-21 16:42:47 +0100255 } {
256 err := supervisor.Run(ctx, sub.name, sub.runnable)
257 if err != nil {
258 return fmt.Errorf("could not run sub-service %q: %w", sub.name, err)
259 }
260 }
261
Serge Bazanski6fdca3f2023-03-20 17:47:07 +0100262 supervisor.Signal(ctx, supervisor.SignalHealthy)
263 <-ctx.Done()
264 return nil
265}
Serge Bazanski2cfafc92023-03-21 16:42:47 +0100266
267func connectByKubeconfig(kubeconfig []byte) (*kubernetes.Clientset, informers.SharedInformerFactory, error) {
268 rawClientConfig, err := clientcmd.NewClientConfigFromBytes(kubeconfig)
269 if err != nil {
270 return nil, nil, fmt.Errorf("could not generate kubernetes client config: %w", err)
271 }
272 clientConfig, err := rawClientConfig.ClientConfig()
Tim Windelschmidt096654a2024-04-18 23:10:19 +0200273 if err != nil {
274 return nil, nil, fmt.Errorf("could not fetch generate client config: %w", err)
275 }
Serge Bazanski2cfafc92023-03-21 16:42:47 +0100276 clientSet, err := kubernetes.NewForConfig(clientConfig)
277 if err != nil {
278 return nil, nil, fmt.Errorf("could not generate kubernetes client: %w", err)
279 }
280 informerFactory := informers.NewSharedInformerFactory(clientSet, 5*time.Minute)
281 return clientSet, informerFactory, nil
282}