blob: 0eb643558db972d95bd06b70274a930bb41abda9 [file] [log] [blame]
Tim Windelschmidt6d33a432025-02-04 14:34:25 +01001// Copyright The Monogon Project Authors.
2// SPDX-License-Identifier: Apache-2.0
3
Serge Bazanski6fdca3f2023-03-20 17:47:07 +01004package kubernetes
5
6import (
7 "context"
Serge Bazanski2cfafc92023-03-21 16:42:47 +01008 "crypto/ed25519"
Serge Bazanski6fdca3f2023-03-20 17:47:07 +01009 "fmt"
10 "net"
Jan Schär91bf1c82024-07-29 17:31:33 +020011 "net/netip"
Serge Bazanski2cfafc92023-03-21 16:42:47 +010012 "time"
13
14 "k8s.io/client-go/informers"
15 "k8s.io/client-go/kubernetes"
16 "k8s.io/client-go/tools/clientcmd"
Serge Bazanski6fdca3f2023-03-20 17:47:07 +010017
18 "source.monogon.dev/go/net/tinylb"
Jan Schär0f8ce4c2025-09-04 13:27:50 +020019 "source.monogon.dev/metropolis/node/allocs"
Serge Bazanski6fdca3f2023-03-20 17:47:07 +010020 "source.monogon.dev/metropolis/node/core/localstorage"
Lorenz Brune8beaed2025-02-05 22:03:50 +010021 "source.monogon.dev/metropolis/node/core/metrics"
Serge Bazanski6fdca3f2023-03-20 17:47:07 +010022 "source.monogon.dev/metropolis/node/core/network"
Lorenz Bruncb76c842025-08-11 12:54:28 +020023 "source.monogon.dev/metropolis/node/core/network/ipam"
Serge Bazanski2cfafc92023-03-21 16:42:47 +010024 "source.monogon.dev/metropolis/node/kubernetes/clusternet"
Lorenz Brune8beaed2025-02-05 22:03:50 +010025 "source.monogon.dev/metropolis/node/kubernetes/metricsprovider"
Lorenz Brun52700ae2025-01-28 15:07:08 +010026 "source.monogon.dev/metropolis/node/kubernetes/networkpolicy"
Serge Bazanski2cfafc92023-03-21 16:42:47 +010027 "source.monogon.dev/metropolis/node/kubernetes/nfproxy"
28 kpki "source.monogon.dev/metropolis/node/kubernetes/pki"
29 "source.monogon.dev/metropolis/node/kubernetes/plugins/kvmdevice"
Tim Windelschmidt9f21f532024-05-07 15:14:20 +020030 "source.monogon.dev/osbase/event"
31 "source.monogon.dev/osbase/event/memory"
Jan Schär91bf1c82024-07-29 17:31:33 +020032 kubernetesDNS "source.monogon.dev/osbase/net/dns/kubernetes"
Tim Windelschmidt9f21f532024-05-07 15:14:20 +020033 "source.monogon.dev/osbase/supervisor"
Serge Bazanski6fdca3f2023-03-20 17:47:07 +010034
35 ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
36)
37
38type ConfigWorker struct {
39 ServiceIPRange net.IPNet
40 ClusterNet net.IPNet
41 ClusterDomain string
42
43 Root *localstorage.Root
44 Network *network.Service
45 NodeID string
46 CuratorClient ipb.CuratorClient
Lorenz Bruncb76c842025-08-11 12:54:28 +020047 PodNetwork event.Value[*ipam.Prefixes]
Serge Bazanski6fdca3f2023-03-20 17:47:07 +010048}
49
50type Worker struct {
51 c ConfigWorker
52}
53
54func NewWorker(c ConfigWorker) *Worker {
55 s := &Worker{
56 c: c,
57 }
58 return s
59}
60
61func (s *Worker) Run(ctx context.Context) error {
Lorenz Brune8beaed2025-02-05 22:03:50 +010062 metrics.CoreRegistry.MustRegister(metricsprovider.Registry)
63 defer metrics.CoreRegistry.Unregister(metricsprovider.Registry)
Serge Bazanski6fdca3f2023-03-20 17:47:07 +010064 // Run apiproxy, which load-balances connections from worker components to this
65 // cluster's api servers. This is necessary as we want to round-robin across all
66 // available apiservers, and Kubernetes components do not implement client-side
67 // load-balancing.
68 err := supervisor.Run(ctx, "apiproxy", func(ctx context.Context) error {
Jan Schär0f8ce4c2025-09-04 13:27:50 +020069 lis, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", allocs.PortKubernetesWorkerLocalAPI))
Serge Bazanski6fdca3f2023-03-20 17:47:07 +010070 if err != nil {
71 return fmt.Errorf("failed to listen: %w", err)
72 }
73 defer lis.Close()
74
75 v := memory.Value[tinylb.BackendSet]{}
76 srv := tinylb.Server{
77 Provider: &v,
78 Listener: lis,
79 }
80 err = supervisor.Run(ctx, "updater", func(ctx context.Context) error {
81 return updateLoadbalancerAPIServers(ctx, &v, s.c.CuratorClient)
82 })
83 if err != nil {
84 return err
85 }
86
87 supervisor.Logger(ctx).Infof("Starting proxy...")
88 return srv.Run(ctx)
89 })
90 if err != nil {
91 return err
92 }
Serge Bazanskib565cc62023-03-30 18:43:51 +020093
Serge Bazanski2cfafc92023-03-21 16:42:47 +010094 kubelet := kubeletService{
95 ClusterDomain: s.c.ClusterDomain,
96 KubeletDirectory: &s.c.Root.Data.Kubernetes.Kubelet,
97 EphemeralDirectory: &s.c.Root.Ephemeral,
Jan Schär0f8ce4c2025-09-04 13:27:50 +020098 ClusterDNS: []net.IP{allocs.IPContainerDNS},
Serge Bazanski2cfafc92023-03-21 16:42:47 +010099 }
100
101 // Gather all required material to send over for certficiate issuance to the
102 // curator...
103 kwr := &ipb.IssueCertificateRequest_KubernetesWorker{}
104
105 kubeletPK, err := kubelet.getPubkey(ctx)
106 if err != nil {
107 return fmt.Errorf("when getting kubelet pubkey: %w", err)
108 }
109 kwr.KubeletPubkey = kubeletPK
110
111 clients := map[string]*struct {
112 dir *localstorage.PKIDirectory
113
114 sk ed25519.PrivateKey
115 pk ed25519.PublicKey
116
117 client *kubernetes.Clientset
118 informers informers.SharedInformerFactory
119 kubeconfig []byte
120
121 certFrom func(kw *ipb.IssueCertificateResponse_KubernetesWorker) []byte
122 }{
123 "csi": {
124 dir: &s.c.Root.Data.Kubernetes.CSIProvisioner.PKI,
125 certFrom: func(kw *ipb.IssueCertificateResponse_KubernetesWorker) []byte {
126 return kw.CsiProvisionerCertificate
127 },
128 },
129 "netserv": {
130 dir: &s.c.Root.Data.Kubernetes.Netservices.PKI,
131 certFrom: func(kw *ipb.IssueCertificateResponse_KubernetesWorker) []byte {
132 return kw.NetservicesCertificate
133 },
134 },
135 }
136
137 for name, c := range clients {
138 if err := c.dir.GeneratePrivateKey(); err != nil {
139 return fmt.Errorf("generating %s key: %w", name, err)
140 }
141 k, err := c.dir.ReadPrivateKey()
142 if err != nil {
143 return fmt.Errorf("reading %s key: %w", name, err)
144 }
145 c.sk = k
146 c.pk = c.sk.Public().(ed25519.PublicKey)
147 }
148 kwr.CsiProvisionerPubkey = clients["csi"].pk
149 kwr.NetservicesPubkey = clients["netserv"].pk
150
151 // ...issue certificates...
152 res, err := s.c.CuratorClient.IssueCertificate(ctx, &ipb.IssueCertificateRequest{
153 Kind: &ipb.IssueCertificateRequest_KubernetesWorker_{
154 KubernetesWorker: kwr,
155 },
156 })
157 if err != nil {
158 return fmt.Errorf("failed to get certificates from curator: %w", err)
159 }
160 kw := res.Kind.(*ipb.IssueCertificateResponse_KubernetesWorker_).KubernetesWorker
161
162 // ...write them...
163 if err := kubelet.setCertificates(kw); err != nil {
164 return fmt.Errorf("failed to write kubelet certs: %w", err)
165 }
166 for name, c := range clients {
167 if c.dir == nil {
168 continue
169 }
170 if err := c.dir.WriteCertificates(kw.IdentityCaCertificate, c.certFrom(kw)); err != nil {
171 return fmt.Errorf("failed to write %s certs: %w", name, err)
172 }
173 }
174
175 // ... and set up connections.
176 for name, c := range clients {
177 kubeconf, err := kpki.KubeconfigRaw(kw.IdentityCaCertificate, c.certFrom(kw), c.sk, kpki.KubernetesAPIEndpointForWorker)
178 if err != nil {
179 return fmt.Errorf("failed to make %s kubeconfig: %w", name, err)
180 }
181 c.kubeconfig = kubeconf
182 cs, informers, err := connectByKubeconfig(kubeconf)
183 if err != nil {
184 return fmt.Errorf("failed to connect with %s: %w", name, err)
185 }
186 c.client = cs
187 c.informers = informers
188 }
189
Serge Bazanski2cfafc92023-03-21 16:42:47 +0100190 csiPlugin := csiPluginServer{
191 KubeletDirectory: &s.c.Root.Data.Kubernetes.Kubelet,
192 VolumesDirectory: &s.c.Root.Data.Volumes,
193 }
194
195 csiProvisioner := csiProvisionerServer{
196 NodeName: s.c.NodeID,
197 Kubernetes: clients["csi"].client,
198 InformerFactory: clients["csi"].informers,
199 VolumesDirectory: &s.c.Root.Data.Volumes,
200 }
201
202 clusternet := clusternet.Service{
203 NodeName: s.c.NodeID,
204 Kubernetes: clients["netserv"].client,
205 Prefixes: s.c.PodNetwork,
206 }
207
208 nfproxy := nfproxy.Service{
209 ClusterCIDR: s.c.ClusterNet,
210 ClientSet: clients["netserv"].client,
211 }
212
Lorenz Brun52700ae2025-01-28 15:07:08 +0100213 npc := networkpolicy.Service{
214 Kubernetes: clients["netserv"].client,
215 }
216
Jan Schär91bf1c82024-07-29 17:31:33 +0200217 var dnsIPRanges []netip.Prefix
218 for _, ipNet := range []net.IPNet{s.c.ServiceIPRange, s.c.ClusterNet} {
219 ipPrefix, err := netip.ParsePrefix(ipNet.String())
220 if err != nil {
221 return fmt.Errorf("invalid IP range %s", ipNet)
222 }
223 dnsIPRanges = append(dnsIPRanges, ipPrefix)
224 }
225 dnsService := kubernetesDNS.New(s.c.ClusterDomain, dnsIPRanges)
226 dnsService.ClientSet = clients["netserv"].client
227 // Set the DNS handler. When the node has no Kubernetes Worker role,
228 // or the role is removed, this is set to an empty handler in
229 // //metropolis/node/core/roleserve/worker_kubernetes.go.
230 s.c.Network.DNS.SetHandler("kubernetes", dnsService)
231
Jan Schär0f8ce4c2025-09-04 13:27:50 +0200232 if err := s.c.Network.AddLoopbackIP(allocs.IPContainerDNS); err != nil {
Jan Schär91bf1c82024-07-29 17:31:33 +0200233 return fmt.Errorf("failed to add local IP for container DNS: %w", err)
234 }
235 defer func() {
Jan Schär0f8ce4c2025-09-04 13:27:50 +0200236 if err := s.c.Network.ReleaseLoopbackIP(allocs.IPContainerDNS); err != nil {
Jan Schär91bf1c82024-07-29 17:31:33 +0200237 supervisor.Logger(ctx).Errorf("Failed to release local IP for container DNS: %v", err)
238 }
239 }()
240 runDNSListener := func(ctx context.Context) error {
Jan Schär0f8ce4c2025-09-04 13:27:50 +0200241 return s.c.Network.DNS.RunListenerAddr(ctx, net.JoinHostPort(allocs.IPContainerDNS.String(), "53"))
Jan Schär91bf1c82024-07-29 17:31:33 +0200242 }
243
Serge Bazanski2cfafc92023-03-21 16:42:47 +0100244 kvmDevicePlugin := kvmdevice.Plugin{
245 KubeletDirectory: &s.c.Root.Data.Kubernetes.Kubelet,
246 }
247
248 for _, sub := range []struct {
249 name string
250 runnable supervisor.Runnable
251 }{
252 {"csi-plugin", csiPlugin.Run},
253 {"csi-provisioner", csiProvisioner.Run},
254 {"clusternet", clusternet.Run},
255 {"nfproxy", nfproxy.Run},
Jan Schär91bf1c82024-07-29 17:31:33 +0200256 {"dns-service", dnsService.Run},
257 {"dns-listener", runDNSListener},
Lorenz Brun52700ae2025-01-28 15:07:08 +0100258 {"npc", npc.Run},
Serge Bazanski2cfafc92023-03-21 16:42:47 +0100259 {"kvmdeviceplugin", kvmDevicePlugin.Run},
Lorenz Brun0e291a12023-06-01 12:22:45 +0200260 {"kubelet", kubelet.Run},
Serge Bazanski2cfafc92023-03-21 16:42:47 +0100261 } {
262 err := supervisor.Run(ctx, sub.name, sub.runnable)
263 if err != nil {
264 return fmt.Errorf("could not run sub-service %q: %w", sub.name, err)
265 }
266 }
267
Serge Bazanski6fdca3f2023-03-20 17:47:07 +0100268 supervisor.Signal(ctx, supervisor.SignalHealthy)
269 <-ctx.Done()
270 return nil
271}
Serge Bazanski2cfafc92023-03-21 16:42:47 +0100272
273func connectByKubeconfig(kubeconfig []byte) (*kubernetes.Clientset, informers.SharedInformerFactory, error) {
274 rawClientConfig, err := clientcmd.NewClientConfigFromBytes(kubeconfig)
275 if err != nil {
276 return nil, nil, fmt.Errorf("could not generate kubernetes client config: %w", err)
277 }
278 clientConfig, err := rawClientConfig.ClientConfig()
Tim Windelschmidt096654a2024-04-18 23:10:19 +0200279 if err != nil {
280 return nil, nil, fmt.Errorf("could not fetch generate client config: %w", err)
281 }
Serge Bazanski2cfafc92023-03-21 16:42:47 +0100282 clientSet, err := kubernetes.NewForConfig(clientConfig)
283 if err != nil {
284 return nil, nil, fmt.Errorf("could not generate kubernetes client: %w", err)
285 }
286 informerFactory := informers.NewSharedInformerFactory(clientSet, 5*time.Minute)
287 return clientSet, informerFactory, nil
288}