blob: c07217fa2396458902a9dfa6cff92f4fc4ccdbf2 [file] [log] [blame]
Serge Bazanski6fdca3f2023-03-20 17:47:07 +01001package kubernetes
2
3import (
4 "context"
Serge Bazanski2cfafc92023-03-21 16:42:47 +01005 "crypto/ed25519"
Serge Bazanski6fdca3f2023-03-20 17:47:07 +01006 "fmt"
7 "net"
Jan Schär91bf1c82024-07-29 17:31:33 +02008 "net/netip"
Serge Bazanski2cfafc92023-03-21 16:42:47 +01009 "time"
10
11 "k8s.io/client-go/informers"
12 "k8s.io/client-go/kubernetes"
13 "k8s.io/client-go/tools/clientcmd"
Serge Bazanski6fdca3f2023-03-20 17:47:07 +010014
15 "source.monogon.dev/go/net/tinylb"
16 "source.monogon.dev/metropolis/node"
Serge Bazanskib565cc62023-03-30 18:43:51 +020017 oclusternet "source.monogon.dev/metropolis/node/core/clusternet"
Serge Bazanski6fdca3f2023-03-20 17:47:07 +010018 "source.monogon.dev/metropolis/node/core/localstorage"
19 "source.monogon.dev/metropolis/node/core/network"
Serge Bazanski2cfafc92023-03-21 16:42:47 +010020 "source.monogon.dev/metropolis/node/kubernetes/clusternet"
21 "source.monogon.dev/metropolis/node/kubernetes/nfproxy"
22 kpki "source.monogon.dev/metropolis/node/kubernetes/pki"
23 "source.monogon.dev/metropolis/node/kubernetes/plugins/kvmdevice"
Tim Windelschmidt9f21f532024-05-07 15:14:20 +020024 "source.monogon.dev/osbase/event"
25 "source.monogon.dev/osbase/event/memory"
Jan Schär91bf1c82024-07-29 17:31:33 +020026 kubernetesDNS "source.monogon.dev/osbase/net/dns/kubernetes"
Tim Windelschmidt9f21f532024-05-07 15:14:20 +020027 "source.monogon.dev/osbase/supervisor"
Serge Bazanski6fdca3f2023-03-20 17:47:07 +010028
29 ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
30)
31
32type ConfigWorker struct {
33 ServiceIPRange net.IPNet
34 ClusterNet net.IPNet
35 ClusterDomain string
36
37 Root *localstorage.Root
38 Network *network.Service
39 NodeID string
40 CuratorClient ipb.CuratorClient
Serge Bazanskib565cc62023-03-30 18:43:51 +020041 PodNetwork event.Value[*oclusternet.Prefixes]
Serge Bazanski6fdca3f2023-03-20 17:47:07 +010042}
43
44type Worker struct {
45 c ConfigWorker
46}
47
48func NewWorker(c ConfigWorker) *Worker {
49 s := &Worker{
50 c: c,
51 }
52 return s
53}
54
55func (s *Worker) Run(ctx context.Context) error {
56 // Run apiproxy, which load-balances connections from worker components to this
57 // cluster's api servers. This is necessary as we want to round-robin across all
58 // available apiservers, and Kubernetes components do not implement client-side
59 // load-balancing.
60 err := supervisor.Run(ctx, "apiproxy", func(ctx context.Context) error {
61 lis, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", node.KubernetesWorkerLocalAPIPort))
62 if err != nil {
63 return fmt.Errorf("failed to listen: %w", err)
64 }
65 defer lis.Close()
66
67 v := memory.Value[tinylb.BackendSet]{}
68 srv := tinylb.Server{
69 Provider: &v,
70 Listener: lis,
71 }
72 err = supervisor.Run(ctx, "updater", func(ctx context.Context) error {
73 return updateLoadbalancerAPIServers(ctx, &v, s.c.CuratorClient)
74 })
75 if err != nil {
76 return err
77 }
78
79 supervisor.Logger(ctx).Infof("Starting proxy...")
80 return srv.Run(ctx)
81 })
82 if err != nil {
83 return err
84 }
Serge Bazanskib565cc62023-03-30 18:43:51 +020085
Serge Bazanski2cfafc92023-03-21 16:42:47 +010086 kubelet := kubeletService{
87 ClusterDomain: s.c.ClusterDomain,
88 KubeletDirectory: &s.c.Root.Data.Kubernetes.Kubelet,
89 EphemeralDirectory: &s.c.Root.Ephemeral,
Lorenz Brun0e291a12023-06-01 12:22:45 +020090 ClusterDNS: []net.IP{node.ContainerDNSIP},
Serge Bazanski2cfafc92023-03-21 16:42:47 +010091 }
92
93 // Gather all required material to send over for certficiate issuance to the
94 // curator...
95 kwr := &ipb.IssueCertificateRequest_KubernetesWorker{}
96
97 kubeletPK, err := kubelet.getPubkey(ctx)
98 if err != nil {
99 return fmt.Errorf("when getting kubelet pubkey: %w", err)
100 }
101 kwr.KubeletPubkey = kubeletPK
102
103 clients := map[string]*struct {
104 dir *localstorage.PKIDirectory
105
106 sk ed25519.PrivateKey
107 pk ed25519.PublicKey
108
109 client *kubernetes.Clientset
110 informers informers.SharedInformerFactory
111 kubeconfig []byte
112
113 certFrom func(kw *ipb.IssueCertificateResponse_KubernetesWorker) []byte
114 }{
115 "csi": {
116 dir: &s.c.Root.Data.Kubernetes.CSIProvisioner.PKI,
117 certFrom: func(kw *ipb.IssueCertificateResponse_KubernetesWorker) []byte {
118 return kw.CsiProvisionerCertificate
119 },
120 },
121 "netserv": {
122 dir: &s.c.Root.Data.Kubernetes.Netservices.PKI,
123 certFrom: func(kw *ipb.IssueCertificateResponse_KubernetesWorker) []byte {
124 return kw.NetservicesCertificate
125 },
126 },
127 }
128
129 for name, c := range clients {
130 if err := c.dir.GeneratePrivateKey(); err != nil {
131 return fmt.Errorf("generating %s key: %w", name, err)
132 }
133 k, err := c.dir.ReadPrivateKey()
134 if err != nil {
135 return fmt.Errorf("reading %s key: %w", name, err)
136 }
137 c.sk = k
138 c.pk = c.sk.Public().(ed25519.PublicKey)
139 }
140 kwr.CsiProvisionerPubkey = clients["csi"].pk
141 kwr.NetservicesPubkey = clients["netserv"].pk
142
143 // ...issue certificates...
144 res, err := s.c.CuratorClient.IssueCertificate(ctx, &ipb.IssueCertificateRequest{
145 Kind: &ipb.IssueCertificateRequest_KubernetesWorker_{
146 KubernetesWorker: kwr,
147 },
148 })
149 if err != nil {
150 return fmt.Errorf("failed to get certificates from curator: %w", err)
151 }
152 kw := res.Kind.(*ipb.IssueCertificateResponse_KubernetesWorker_).KubernetesWorker
153
154 // ...write them...
155 if err := kubelet.setCertificates(kw); err != nil {
156 return fmt.Errorf("failed to write kubelet certs: %w", err)
157 }
158 for name, c := range clients {
159 if c.dir == nil {
160 continue
161 }
162 if err := c.dir.WriteCertificates(kw.IdentityCaCertificate, c.certFrom(kw)); err != nil {
163 return fmt.Errorf("failed to write %s certs: %w", name, err)
164 }
165 }
166
167 // ... and set up connections.
168 for name, c := range clients {
169 kubeconf, err := kpki.KubeconfigRaw(kw.IdentityCaCertificate, c.certFrom(kw), c.sk, kpki.KubernetesAPIEndpointForWorker)
170 if err != nil {
171 return fmt.Errorf("failed to make %s kubeconfig: %w", name, err)
172 }
173 c.kubeconfig = kubeconf
174 cs, informers, err := connectByKubeconfig(kubeconf)
175 if err != nil {
176 return fmt.Errorf("failed to connect with %s: %w", name, err)
177 }
178 c.client = cs
179 c.informers = informers
180 }
181
Serge Bazanski2cfafc92023-03-21 16:42:47 +0100182 csiPlugin := csiPluginServer{
183 KubeletDirectory: &s.c.Root.Data.Kubernetes.Kubelet,
184 VolumesDirectory: &s.c.Root.Data.Volumes,
185 }
186
187 csiProvisioner := csiProvisionerServer{
188 NodeName: s.c.NodeID,
189 Kubernetes: clients["csi"].client,
190 InformerFactory: clients["csi"].informers,
191 VolumesDirectory: &s.c.Root.Data.Volumes,
192 }
193
194 clusternet := clusternet.Service{
195 NodeName: s.c.NodeID,
196 Kubernetes: clients["netserv"].client,
197 Prefixes: s.c.PodNetwork,
198 }
199
200 nfproxy := nfproxy.Service{
201 ClusterCIDR: s.c.ClusterNet,
202 ClientSet: clients["netserv"].client,
203 }
204
Jan Schär91bf1c82024-07-29 17:31:33 +0200205 var dnsIPRanges []netip.Prefix
206 for _, ipNet := range []net.IPNet{s.c.ServiceIPRange, s.c.ClusterNet} {
207 ipPrefix, err := netip.ParsePrefix(ipNet.String())
208 if err != nil {
209 return fmt.Errorf("invalid IP range %s", ipNet)
210 }
211 dnsIPRanges = append(dnsIPRanges, ipPrefix)
212 }
213 dnsService := kubernetesDNS.New(s.c.ClusterDomain, dnsIPRanges)
214 dnsService.ClientSet = clients["netserv"].client
215 // Set the DNS handler. When the node has no Kubernetes Worker role,
216 // or the role is removed, this is set to an empty handler in
217 // //metropolis/node/core/roleserve/worker_kubernetes.go.
218 s.c.Network.DNS.SetHandler("kubernetes", dnsService)
219
220 if err := s.c.Network.AddLoopbackIP(node.ContainerDNSIP); err != nil {
221 return fmt.Errorf("failed to add local IP for container DNS: %w", err)
222 }
223 defer func() {
224 if err := s.c.Network.ReleaseLoopbackIP(node.ContainerDNSIP); err != nil {
225 supervisor.Logger(ctx).Errorf("Failed to release local IP for container DNS: %v", err)
226 }
227 }()
228 runDNSListener := func(ctx context.Context) error {
229 return s.c.Network.DNS.RunListenerAddr(ctx, net.JoinHostPort(node.ContainerDNSIP.String(), "53"))
230 }
231
Serge Bazanski2cfafc92023-03-21 16:42:47 +0100232 kvmDevicePlugin := kvmdevice.Plugin{
233 KubeletDirectory: &s.c.Root.Data.Kubernetes.Kubelet,
234 }
235
236 for _, sub := range []struct {
237 name string
238 runnable supervisor.Runnable
239 }{
240 {"csi-plugin", csiPlugin.Run},
241 {"csi-provisioner", csiProvisioner.Run},
242 {"clusternet", clusternet.Run},
243 {"nfproxy", nfproxy.Run},
Jan Schär91bf1c82024-07-29 17:31:33 +0200244 {"dns-service", dnsService.Run},
245 {"dns-listener", runDNSListener},
Serge Bazanski2cfafc92023-03-21 16:42:47 +0100246 {"kvmdeviceplugin", kvmDevicePlugin.Run},
Lorenz Brun0e291a12023-06-01 12:22:45 +0200247 {"kubelet", kubelet.Run},
Serge Bazanski2cfafc92023-03-21 16:42:47 +0100248 } {
249 err := supervisor.Run(ctx, sub.name, sub.runnable)
250 if err != nil {
251 return fmt.Errorf("could not run sub-service %q: %w", sub.name, err)
252 }
253 }
254
Serge Bazanski6fdca3f2023-03-20 17:47:07 +0100255 supervisor.Signal(ctx, supervisor.SignalHealthy)
256 <-ctx.Done()
257 return nil
258}
Serge Bazanski2cfafc92023-03-21 16:42:47 +0100259
260func connectByKubeconfig(kubeconfig []byte) (*kubernetes.Clientset, informers.SharedInformerFactory, error) {
261 rawClientConfig, err := clientcmd.NewClientConfigFromBytes(kubeconfig)
262 if err != nil {
263 return nil, nil, fmt.Errorf("could not generate kubernetes client config: %w", err)
264 }
265 clientConfig, err := rawClientConfig.ClientConfig()
Tim Windelschmidt096654a2024-04-18 23:10:19 +0200266 if err != nil {
267 return nil, nil, fmt.Errorf("could not fetch generate client config: %w", err)
268 }
Serge Bazanski2cfafc92023-03-21 16:42:47 +0100269 clientSet, err := kubernetes.NewForConfig(clientConfig)
270 if err != nil {
271 return nil, nil, fmt.Errorf("could not generate kubernetes client: %w", err)
272 }
273 informerFactory := informers.NewSharedInformerFactory(clientSet, 5*time.Minute)
274 return clientSet, informerFactory, nil
275}