blob: 5ddc32d2bb1a90bac355aa808b0a3c90cfa65265 [file] [log] [blame]
Serge Bazanski6fdca3f2023-03-20 17:47:07 +01001package kubernetes
2
3import (
4 "context"
5 "fmt"
6 "net"
7
8 "source.monogon.dev/go/net/tinylb"
9 "source.monogon.dev/metropolis/node"
10 "source.monogon.dev/metropolis/node/core/localstorage"
11 "source.monogon.dev/metropolis/node/core/network"
12 "source.monogon.dev/metropolis/pkg/event/memory"
13 "source.monogon.dev/metropolis/pkg/supervisor"
14
15 ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
16)
17
18type ConfigWorker struct {
19 ServiceIPRange net.IPNet
20 ClusterNet net.IPNet
21 ClusterDomain string
22
23 Root *localstorage.Root
24 Network *network.Service
25 NodeID string
26 CuratorClient ipb.CuratorClient
27}
28
29type Worker struct {
30 c ConfigWorker
31}
32
33func NewWorker(c ConfigWorker) *Worker {
34 s := &Worker{
35 c: c,
36 }
37 return s
38}
39
40func (s *Worker) Run(ctx context.Context) error {
41 // Run apiproxy, which load-balances connections from worker components to this
42 // cluster's api servers. This is necessary as we want to round-robin across all
43 // available apiservers, and Kubernetes components do not implement client-side
44 // load-balancing.
45 err := supervisor.Run(ctx, "apiproxy", func(ctx context.Context) error {
46 lis, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", node.KubernetesWorkerLocalAPIPort))
47 if err != nil {
48 return fmt.Errorf("failed to listen: %w", err)
49 }
50 defer lis.Close()
51
52 v := memory.Value[tinylb.BackendSet]{}
53 srv := tinylb.Server{
54 Provider: &v,
55 Listener: lis,
56 }
57 err = supervisor.Run(ctx, "updater", func(ctx context.Context) error {
58 return updateLoadbalancerAPIServers(ctx, &v, s.c.CuratorClient)
59 })
60 if err != nil {
61 return err
62 }
63
64 supervisor.Logger(ctx).Infof("Starting proxy...")
65 return srv.Run(ctx)
66 })
67 if err != nil {
68 return err
69 }
70 supervisor.Signal(ctx, supervisor.SignalHealthy)
71 <-ctx.Done()
72 return nil
73}