blob: d9f333e526d2531942c0a20ff15743832dbc36ce [file] [log] [blame]
Serge Bazanski6fdca3f2023-03-20 17:47:07 +01001package kubernetes
2
3import (
4 "context"
Serge Bazanski2cfafc92023-03-21 16:42:47 +01005 "crypto/ed25519"
Serge Bazanski6fdca3f2023-03-20 17:47:07 +01006 "fmt"
7 "net"
Serge Bazanski2cfafc92023-03-21 16:42:47 +01008 "time"
9
10 "k8s.io/client-go/informers"
11 "k8s.io/client-go/kubernetes"
12 "k8s.io/client-go/tools/clientcmd"
Serge Bazanski6fdca3f2023-03-20 17:47:07 +010013
14 "source.monogon.dev/go/net/tinylb"
15 "source.monogon.dev/metropolis/node"
Serge Bazanskib565cc62023-03-30 18:43:51 +020016 oclusternet "source.monogon.dev/metropolis/node/core/clusternet"
Serge Bazanski6fdca3f2023-03-20 17:47:07 +010017 "source.monogon.dev/metropolis/node/core/localstorage"
18 "source.monogon.dev/metropolis/node/core/network"
Serge Bazanski2cfafc92023-03-21 16:42:47 +010019 "source.monogon.dev/metropolis/node/core/network/dns"
20 "source.monogon.dev/metropolis/node/kubernetes/clusternet"
21 "source.monogon.dev/metropolis/node/kubernetes/nfproxy"
22 kpki "source.monogon.dev/metropolis/node/kubernetes/pki"
23 "source.monogon.dev/metropolis/node/kubernetes/plugins/kvmdevice"
Serge Bazanskib565cc62023-03-30 18:43:51 +020024 "source.monogon.dev/metropolis/pkg/event"
Serge Bazanski6fdca3f2023-03-20 17:47:07 +010025 "source.monogon.dev/metropolis/pkg/event/memory"
26 "source.monogon.dev/metropolis/pkg/supervisor"
27
28 ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
29)
30
// ConfigWorker is the configuration of the Kubernetes worker service. All
// fields must be set before the Worker is started.
type ConfigWorker struct {
	// ServiceIPRange is the Kubernetes service IP range.
	// NOTE(review): not referenced anywhere in this file — presumably consumed
	// by a component elsewhere; confirm before removing.
	ServiceIPRange net.IPNet
	// ClusterNet is the pod network CIDR; it is handed to nfproxy as its
	// ClusterCIDR.
	ClusterNet net.IPNet
	// ClusterDomain is the cluster's internal DNS domain. It is passed to the
	// kubelet and used to register the Kubernetes CoreDNS directive.
	ClusterDomain string

	// Root is the node's local storage root, used for kubelet, CSI and
	// netservices data/PKI directories.
	Root *localstorage.Root
	// Network is the node network service, used to watch for the node's
	// external address and to configure cluster DNS.
	Network *network.Service
	// NodeID is this node's identifier, used as the Kubernetes node name.
	NodeID string
	// CuratorClient is used to obtain certificates and the current apiserver
	// set from the cluster curator.
	CuratorClient ipb.CuratorClient
	// PodNetwork provides the node's pod network prefixes to clusternet.
	PodNetwork event.Value[*oclusternet.Prefixes]
}
42
// Worker is the Kubernetes worker service. It runs the node-local Kubernetes
// components (kubelet, CSI plugin/provisioner, clusternet, nfproxy, KVM
// device plugin) under the supervisor, driven by its ConfigWorker.
type Worker struct {
	// c is the configuration this Worker was constructed with; it is read
	// but never modified by Run.
	c ConfigWorker
}
46
47func NewWorker(c ConfigWorker) *Worker {
48 s := &Worker{
49 c: c,
50 }
51 return s
52}
53
54func (s *Worker) Run(ctx context.Context) error {
55 // Run apiproxy, which load-balances connections from worker components to this
56 // cluster's api servers. This is necessary as we want to round-robin across all
57 // available apiservers, and Kubernetes components do not implement client-side
58 // load-balancing.
59 err := supervisor.Run(ctx, "apiproxy", func(ctx context.Context) error {
60 lis, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", node.KubernetesWorkerLocalAPIPort))
61 if err != nil {
62 return fmt.Errorf("failed to listen: %w", err)
63 }
64 defer lis.Close()
65
66 v := memory.Value[tinylb.BackendSet]{}
67 srv := tinylb.Server{
68 Provider: &v,
69 Listener: lis,
70 }
71 err = supervisor.Run(ctx, "updater", func(ctx context.Context) error {
72 return updateLoadbalancerAPIServers(ctx, &v, s.c.CuratorClient)
73 })
74 if err != nil {
75 return err
76 }
77
78 supervisor.Logger(ctx).Infof("Starting proxy...")
79 return srv.Run(ctx)
80 })
81 if err != nil {
82 return err
83 }
Serge Bazanskib565cc62023-03-30 18:43:51 +020084
Serge Bazanski2cfafc92023-03-21 16:42:47 +010085 kubelet := kubeletService{
86 ClusterDomain: s.c.ClusterDomain,
87 KubeletDirectory: &s.c.Root.Data.Kubernetes.Kubelet,
88 EphemeralDirectory: &s.c.Root.Ephemeral,
89 }
90
91 // Gather all required material to send over for certficiate issuance to the
92 // curator...
93 kwr := &ipb.IssueCertificateRequest_KubernetesWorker{}
94
95 kubeletPK, err := kubelet.getPubkey(ctx)
96 if err != nil {
97 return fmt.Errorf("when getting kubelet pubkey: %w", err)
98 }
99 kwr.KubeletPubkey = kubeletPK
100
101 clients := map[string]*struct {
102 dir *localstorage.PKIDirectory
103
104 sk ed25519.PrivateKey
105 pk ed25519.PublicKey
106
107 client *kubernetes.Clientset
108 informers informers.SharedInformerFactory
109 kubeconfig []byte
110
111 certFrom func(kw *ipb.IssueCertificateResponse_KubernetesWorker) []byte
112 }{
113 "csi": {
114 dir: &s.c.Root.Data.Kubernetes.CSIProvisioner.PKI,
115 certFrom: func(kw *ipb.IssueCertificateResponse_KubernetesWorker) []byte {
116 return kw.CsiProvisionerCertificate
117 },
118 },
119 "netserv": {
120 dir: &s.c.Root.Data.Kubernetes.Netservices.PKI,
121 certFrom: func(kw *ipb.IssueCertificateResponse_KubernetesWorker) []byte {
122 return kw.NetservicesCertificate
123 },
124 },
125 }
126
127 for name, c := range clients {
128 if err := c.dir.GeneratePrivateKey(); err != nil {
129 return fmt.Errorf("generating %s key: %w", name, err)
130 }
131 k, err := c.dir.ReadPrivateKey()
132 if err != nil {
133 return fmt.Errorf("reading %s key: %w", name, err)
134 }
135 c.sk = k
136 c.pk = c.sk.Public().(ed25519.PublicKey)
137 }
138 kwr.CsiProvisionerPubkey = clients["csi"].pk
139 kwr.NetservicesPubkey = clients["netserv"].pk
140
141 // ...issue certificates...
142 res, err := s.c.CuratorClient.IssueCertificate(ctx, &ipb.IssueCertificateRequest{
143 Kind: &ipb.IssueCertificateRequest_KubernetesWorker_{
144 KubernetesWorker: kwr,
145 },
146 })
147 if err != nil {
148 return fmt.Errorf("failed to get certificates from curator: %w", err)
149 }
150 kw := res.Kind.(*ipb.IssueCertificateResponse_KubernetesWorker_).KubernetesWorker
151
152 // ...write them...
153 if err := kubelet.setCertificates(kw); err != nil {
154 return fmt.Errorf("failed to write kubelet certs: %w", err)
155 }
156 for name, c := range clients {
157 if c.dir == nil {
158 continue
159 }
160 if err := c.dir.WriteCertificates(kw.IdentityCaCertificate, c.certFrom(kw)); err != nil {
161 return fmt.Errorf("failed to write %s certs: %w", name, err)
162 }
163 }
164
165 // ... and set up connections.
166 for name, c := range clients {
167 kubeconf, err := kpki.KubeconfigRaw(kw.IdentityCaCertificate, c.certFrom(kw), c.sk, kpki.KubernetesAPIEndpointForWorker)
168 if err != nil {
169 return fmt.Errorf("failed to make %s kubeconfig: %w", name, err)
170 }
171 c.kubeconfig = kubeconf
172 cs, informers, err := connectByKubeconfig(kubeconf)
173 if err != nil {
174 return fmt.Errorf("failed to connect with %s: %w", name, err)
175 }
176 c.client = cs
177 c.informers = informers
178 }
179
180 // Sub-runnable which starts all parts of Kubernetes that depend on the
181 // machine's external IP address. If it changes, the runnable will exit.
182 // TODO(q3k): test this
183 supervisor.Run(ctx, "networked", func(ctx context.Context) error {
184 networkWatch := s.c.Network.Watch()
185 defer networkWatch.Close()
186
187 var status *network.Status
188
189 supervisor.Logger(ctx).Info("Waiting for node networking...")
190 for status == nil || status.ExternalAddress == nil {
191 status, err = networkWatch.Get(ctx)
192 if err != nil {
193 return fmt.Errorf("failed to get network status: %w", err)
194 }
195 }
196 address := status.ExternalAddress
197 supervisor.Logger(ctx).Info("Node has active networking, starting apiserver/kubelet")
198 kubelet.ClusterDNS = []net.IP{address}
199 err := supervisor.RunGroup(ctx, map[string]supervisor.Runnable{
200 "kubelet": kubelet.Run,
201 })
202 if err != nil {
203 return fmt.Errorf("when starting kubelet: %w", err)
204 }
205
206 supervisor.Signal(ctx, supervisor.SignalHealthy)
207
208 for status.ExternalAddress.Equal(address) {
209 status, err = networkWatch.Get(ctx)
210 if err != nil {
211 return fmt.Errorf("when watching for network changes: %w", err)
212 }
213 }
214 return fmt.Errorf("network configuration changed (%s -> %s)", address.String(), status.ExternalAddress.String())
215 })
216
217 csiPlugin := csiPluginServer{
218 KubeletDirectory: &s.c.Root.Data.Kubernetes.Kubelet,
219 VolumesDirectory: &s.c.Root.Data.Volumes,
220 }
221
222 csiProvisioner := csiProvisionerServer{
223 NodeName: s.c.NodeID,
224 Kubernetes: clients["csi"].client,
225 InformerFactory: clients["csi"].informers,
226 VolumesDirectory: &s.c.Root.Data.Volumes,
227 }
228
229 clusternet := clusternet.Service{
230 NodeName: s.c.NodeID,
231 Kubernetes: clients["netserv"].client,
232 Prefixes: s.c.PodNetwork,
233 }
234
235 nfproxy := nfproxy.Service{
236 ClusterCIDR: s.c.ClusterNet,
237 ClientSet: clients["netserv"].client,
238 }
239
240 kvmDevicePlugin := kvmdevice.Plugin{
241 KubeletDirectory: &s.c.Root.Data.Kubernetes.Kubelet,
242 }
243
244 for _, sub := range []struct {
245 name string
246 runnable supervisor.Runnable
247 }{
248 {"csi-plugin", csiPlugin.Run},
249 {"csi-provisioner", csiProvisioner.Run},
250 {"clusternet", clusternet.Run},
251 {"nfproxy", nfproxy.Run},
252 {"kvmdeviceplugin", kvmDevicePlugin.Run},
253 } {
254 err := supervisor.Run(ctx, sub.name, sub.runnable)
255 if err != nil {
256 return fmt.Errorf("could not run sub-service %q: %w", sub.name, err)
257 }
258 }
259
260 supervisor.Logger(ctx).Info("Registering K8s CoreDNS")
261 clusterDNSDirective := dns.NewKubernetesDirective(s.c.ClusterDomain, clients["netserv"].kubeconfig)
262 s.c.Network.ConfigureDNS(clusterDNSDirective)
263
Serge Bazanski6fdca3f2023-03-20 17:47:07 +0100264 supervisor.Signal(ctx, supervisor.SignalHealthy)
265 <-ctx.Done()
Serge Bazanski2cfafc92023-03-21 16:42:47 +0100266 s.c.Network.ConfigureDNS(dns.CancelDirective(clusterDNSDirective))
Serge Bazanski6fdca3f2023-03-20 17:47:07 +0100267 return nil
268}
Serge Bazanski2cfafc92023-03-21 16:42:47 +0100269
270func connectByKubeconfig(kubeconfig []byte) (*kubernetes.Clientset, informers.SharedInformerFactory, error) {
271 rawClientConfig, err := clientcmd.NewClientConfigFromBytes(kubeconfig)
272 if err != nil {
273 return nil, nil, fmt.Errorf("could not generate kubernetes client config: %w", err)
274 }
275 clientConfig, err := rawClientConfig.ClientConfig()
276 clientSet, err := kubernetes.NewForConfig(clientConfig)
277 if err != nil {
278 return nil, nil, fmt.Errorf("could not generate kubernetes client: %w", err)
279 }
280 informerFactory := informers.NewSharedInformerFactory(clientSet, 5*time.Minute)
281 return clientSet, informerFactory, nil
282}