blob: e65d39baf2cc669f40373b3e3624db1532e55dc0 [file] [log] [blame]
// Copyright The Monogon Project Authors.
// SPDX-License-Identifier: Apache-2.0
3
Serge Bazanski6fdca3f2023-03-20 17:47:07 +01004package kubernetes
5
6import (
7 "context"
Serge Bazanski2cfafc92023-03-21 16:42:47 +01008 "crypto/ed25519"
Serge Bazanski6fdca3f2023-03-20 17:47:07 +01009 "fmt"
10 "net"
Jan Schär91bf1c82024-07-29 17:31:33 +020011 "net/netip"
Serge Bazanski2cfafc92023-03-21 16:42:47 +010012 "time"
13
14 "k8s.io/client-go/informers"
15 "k8s.io/client-go/kubernetes"
16 "k8s.io/client-go/tools/clientcmd"
Serge Bazanski6fdca3f2023-03-20 17:47:07 +010017
18 "source.monogon.dev/go/net/tinylb"
19 "source.monogon.dev/metropolis/node"
Serge Bazanskib565cc62023-03-30 18:43:51 +020020 oclusternet "source.monogon.dev/metropolis/node/core/clusternet"
Serge Bazanski6fdca3f2023-03-20 17:47:07 +010021 "source.monogon.dev/metropolis/node/core/localstorage"
22 "source.monogon.dev/metropolis/node/core/network"
Serge Bazanski2cfafc92023-03-21 16:42:47 +010023 "source.monogon.dev/metropolis/node/kubernetes/clusternet"
24 "source.monogon.dev/metropolis/node/kubernetes/nfproxy"
25 kpki "source.monogon.dev/metropolis/node/kubernetes/pki"
26 "source.monogon.dev/metropolis/node/kubernetes/plugins/kvmdevice"
Tim Windelschmidt9f21f532024-05-07 15:14:20 +020027 "source.monogon.dev/osbase/event"
28 "source.monogon.dev/osbase/event/memory"
Jan Schär91bf1c82024-07-29 17:31:33 +020029 kubernetesDNS "source.monogon.dev/osbase/net/dns/kubernetes"
Tim Windelschmidt9f21f532024-05-07 15:14:20 +020030 "source.monogon.dev/osbase/supervisor"
Serge Bazanski6fdca3f2023-03-20 17:47:07 +010031
32 ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
33)
34
35type ConfigWorker struct {
36 ServiceIPRange net.IPNet
37 ClusterNet net.IPNet
38 ClusterDomain string
39
40 Root *localstorage.Root
41 Network *network.Service
42 NodeID string
43 CuratorClient ipb.CuratorClient
Serge Bazanskib565cc62023-03-30 18:43:51 +020044 PodNetwork event.Value[*oclusternet.Prefixes]
Serge Bazanski6fdca3f2023-03-20 17:47:07 +010045}
46
47type Worker struct {
48 c ConfigWorker
49}
50
51func NewWorker(c ConfigWorker) *Worker {
52 s := &Worker{
53 c: c,
54 }
55 return s
56}
57
58func (s *Worker) Run(ctx context.Context) error {
59 // Run apiproxy, which load-balances connections from worker components to this
60 // cluster's api servers. This is necessary as we want to round-robin across all
61 // available apiservers, and Kubernetes components do not implement client-side
62 // load-balancing.
63 err := supervisor.Run(ctx, "apiproxy", func(ctx context.Context) error {
64 lis, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", node.KubernetesWorkerLocalAPIPort))
65 if err != nil {
66 return fmt.Errorf("failed to listen: %w", err)
67 }
68 defer lis.Close()
69
70 v := memory.Value[tinylb.BackendSet]{}
71 srv := tinylb.Server{
72 Provider: &v,
73 Listener: lis,
74 }
75 err = supervisor.Run(ctx, "updater", func(ctx context.Context) error {
76 return updateLoadbalancerAPIServers(ctx, &v, s.c.CuratorClient)
77 })
78 if err != nil {
79 return err
80 }
81
82 supervisor.Logger(ctx).Infof("Starting proxy...")
83 return srv.Run(ctx)
84 })
85 if err != nil {
86 return err
87 }
Serge Bazanskib565cc62023-03-30 18:43:51 +020088
Serge Bazanski2cfafc92023-03-21 16:42:47 +010089 kubelet := kubeletService{
90 ClusterDomain: s.c.ClusterDomain,
91 KubeletDirectory: &s.c.Root.Data.Kubernetes.Kubelet,
92 EphemeralDirectory: &s.c.Root.Ephemeral,
Lorenz Brun0e291a12023-06-01 12:22:45 +020093 ClusterDNS: []net.IP{node.ContainerDNSIP},
Serge Bazanski2cfafc92023-03-21 16:42:47 +010094 }
95
96 // Gather all required material to send over for certficiate issuance to the
97 // curator...
98 kwr := &ipb.IssueCertificateRequest_KubernetesWorker{}
99
100 kubeletPK, err := kubelet.getPubkey(ctx)
101 if err != nil {
102 return fmt.Errorf("when getting kubelet pubkey: %w", err)
103 }
104 kwr.KubeletPubkey = kubeletPK
105
106 clients := map[string]*struct {
107 dir *localstorage.PKIDirectory
108
109 sk ed25519.PrivateKey
110 pk ed25519.PublicKey
111
112 client *kubernetes.Clientset
113 informers informers.SharedInformerFactory
114 kubeconfig []byte
115
116 certFrom func(kw *ipb.IssueCertificateResponse_KubernetesWorker) []byte
117 }{
118 "csi": {
119 dir: &s.c.Root.Data.Kubernetes.CSIProvisioner.PKI,
120 certFrom: func(kw *ipb.IssueCertificateResponse_KubernetesWorker) []byte {
121 return kw.CsiProvisionerCertificate
122 },
123 },
124 "netserv": {
125 dir: &s.c.Root.Data.Kubernetes.Netservices.PKI,
126 certFrom: func(kw *ipb.IssueCertificateResponse_KubernetesWorker) []byte {
127 return kw.NetservicesCertificate
128 },
129 },
130 }
131
132 for name, c := range clients {
133 if err := c.dir.GeneratePrivateKey(); err != nil {
134 return fmt.Errorf("generating %s key: %w", name, err)
135 }
136 k, err := c.dir.ReadPrivateKey()
137 if err != nil {
138 return fmt.Errorf("reading %s key: %w", name, err)
139 }
140 c.sk = k
141 c.pk = c.sk.Public().(ed25519.PublicKey)
142 }
143 kwr.CsiProvisionerPubkey = clients["csi"].pk
144 kwr.NetservicesPubkey = clients["netserv"].pk
145
146 // ...issue certificates...
147 res, err := s.c.CuratorClient.IssueCertificate(ctx, &ipb.IssueCertificateRequest{
148 Kind: &ipb.IssueCertificateRequest_KubernetesWorker_{
149 KubernetesWorker: kwr,
150 },
151 })
152 if err != nil {
153 return fmt.Errorf("failed to get certificates from curator: %w", err)
154 }
155 kw := res.Kind.(*ipb.IssueCertificateResponse_KubernetesWorker_).KubernetesWorker
156
157 // ...write them...
158 if err := kubelet.setCertificates(kw); err != nil {
159 return fmt.Errorf("failed to write kubelet certs: %w", err)
160 }
161 for name, c := range clients {
162 if c.dir == nil {
163 continue
164 }
165 if err := c.dir.WriteCertificates(kw.IdentityCaCertificate, c.certFrom(kw)); err != nil {
166 return fmt.Errorf("failed to write %s certs: %w", name, err)
167 }
168 }
169
170 // ... and set up connections.
171 for name, c := range clients {
172 kubeconf, err := kpki.KubeconfigRaw(kw.IdentityCaCertificate, c.certFrom(kw), c.sk, kpki.KubernetesAPIEndpointForWorker)
173 if err != nil {
174 return fmt.Errorf("failed to make %s kubeconfig: %w", name, err)
175 }
176 c.kubeconfig = kubeconf
177 cs, informers, err := connectByKubeconfig(kubeconf)
178 if err != nil {
179 return fmt.Errorf("failed to connect with %s: %w", name, err)
180 }
181 c.client = cs
182 c.informers = informers
183 }
184
Serge Bazanski2cfafc92023-03-21 16:42:47 +0100185 csiPlugin := csiPluginServer{
186 KubeletDirectory: &s.c.Root.Data.Kubernetes.Kubelet,
187 VolumesDirectory: &s.c.Root.Data.Volumes,
188 }
189
190 csiProvisioner := csiProvisionerServer{
191 NodeName: s.c.NodeID,
192 Kubernetes: clients["csi"].client,
193 InformerFactory: clients["csi"].informers,
194 VolumesDirectory: &s.c.Root.Data.Volumes,
195 }
196
197 clusternet := clusternet.Service{
198 NodeName: s.c.NodeID,
199 Kubernetes: clients["netserv"].client,
200 Prefixes: s.c.PodNetwork,
201 }
202
203 nfproxy := nfproxy.Service{
204 ClusterCIDR: s.c.ClusterNet,
205 ClientSet: clients["netserv"].client,
206 }
207
Jan Schär91bf1c82024-07-29 17:31:33 +0200208 var dnsIPRanges []netip.Prefix
209 for _, ipNet := range []net.IPNet{s.c.ServiceIPRange, s.c.ClusterNet} {
210 ipPrefix, err := netip.ParsePrefix(ipNet.String())
211 if err != nil {
212 return fmt.Errorf("invalid IP range %s", ipNet)
213 }
214 dnsIPRanges = append(dnsIPRanges, ipPrefix)
215 }
216 dnsService := kubernetesDNS.New(s.c.ClusterDomain, dnsIPRanges)
217 dnsService.ClientSet = clients["netserv"].client
218 // Set the DNS handler. When the node has no Kubernetes Worker role,
219 // or the role is removed, this is set to an empty handler in
220 // //metropolis/node/core/roleserve/worker_kubernetes.go.
221 s.c.Network.DNS.SetHandler("kubernetes", dnsService)
222
223 if err := s.c.Network.AddLoopbackIP(node.ContainerDNSIP); err != nil {
224 return fmt.Errorf("failed to add local IP for container DNS: %w", err)
225 }
226 defer func() {
227 if err := s.c.Network.ReleaseLoopbackIP(node.ContainerDNSIP); err != nil {
228 supervisor.Logger(ctx).Errorf("Failed to release local IP for container DNS: %v", err)
229 }
230 }()
231 runDNSListener := func(ctx context.Context) error {
232 return s.c.Network.DNS.RunListenerAddr(ctx, net.JoinHostPort(node.ContainerDNSIP.String(), "53"))
233 }
234
Serge Bazanski2cfafc92023-03-21 16:42:47 +0100235 kvmDevicePlugin := kvmdevice.Plugin{
236 KubeletDirectory: &s.c.Root.Data.Kubernetes.Kubelet,
237 }
238
239 for _, sub := range []struct {
240 name string
241 runnable supervisor.Runnable
242 }{
243 {"csi-plugin", csiPlugin.Run},
244 {"csi-provisioner", csiProvisioner.Run},
245 {"clusternet", clusternet.Run},
246 {"nfproxy", nfproxy.Run},
Jan Schär91bf1c82024-07-29 17:31:33 +0200247 {"dns-service", dnsService.Run},
248 {"dns-listener", runDNSListener},
Serge Bazanski2cfafc92023-03-21 16:42:47 +0100249 {"kvmdeviceplugin", kvmDevicePlugin.Run},
Lorenz Brun0e291a12023-06-01 12:22:45 +0200250 {"kubelet", kubelet.Run},
Serge Bazanski2cfafc92023-03-21 16:42:47 +0100251 } {
252 err := supervisor.Run(ctx, sub.name, sub.runnable)
253 if err != nil {
254 return fmt.Errorf("could not run sub-service %q: %w", sub.name, err)
255 }
256 }
257
Serge Bazanski6fdca3f2023-03-20 17:47:07 +0100258 supervisor.Signal(ctx, supervisor.SignalHealthy)
259 <-ctx.Done()
260 return nil
261}
Serge Bazanski2cfafc92023-03-21 16:42:47 +0100262
263func connectByKubeconfig(kubeconfig []byte) (*kubernetes.Clientset, informers.SharedInformerFactory, error) {
264 rawClientConfig, err := clientcmd.NewClientConfigFromBytes(kubeconfig)
265 if err != nil {
266 return nil, nil, fmt.Errorf("could not generate kubernetes client config: %w", err)
267 }
268 clientConfig, err := rawClientConfig.ClientConfig()
Tim Windelschmidt096654a2024-04-18 23:10:19 +0200269 if err != nil {
270 return nil, nil, fmt.Errorf("could not fetch generate client config: %w", err)
271 }
Serge Bazanski2cfafc92023-03-21 16:42:47 +0100272 clientSet, err := kubernetes.NewForConfig(clientConfig)
273 if err != nil {
274 return nil, nil, fmt.Errorf("could not generate kubernetes client: %w", err)
275 }
276 informerFactory := informers.NewSharedInformerFactory(clientSet, 5*time.Minute)
277 return clientSet, informerFactory, nil
278}