blob: 2e6e1908a86dafcaa0180417aa2cab207a9c0e59 [file] [log] [blame]
Serge Bazanski6fdca3f2023-03-20 17:47:07 +01001package kubernetes
2
3import (
4 "context"
5 "fmt"
6 "net"
7
8 "source.monogon.dev/go/net/tinylb"
9 "source.monogon.dev/metropolis/node"
Serge Bazanskib565cc62023-03-30 18:43:51 +020010 oclusternet "source.monogon.dev/metropolis/node/core/clusternet"
Serge Bazanski6fdca3f2023-03-20 17:47:07 +010011 "source.monogon.dev/metropolis/node/core/localstorage"
12 "source.monogon.dev/metropolis/node/core/network"
Serge Bazanskib565cc62023-03-30 18:43:51 +020013 "source.monogon.dev/metropolis/pkg/event"
Serge Bazanski6fdca3f2023-03-20 17:47:07 +010014 "source.monogon.dev/metropolis/pkg/event/memory"
15 "source.monogon.dev/metropolis/pkg/supervisor"
16
17 ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
18)
19
20type ConfigWorker struct {
21 ServiceIPRange net.IPNet
22 ClusterNet net.IPNet
23 ClusterDomain string
24
25 Root *localstorage.Root
26 Network *network.Service
27 NodeID string
28 CuratorClient ipb.CuratorClient
Serge Bazanskib565cc62023-03-30 18:43:51 +020029 PodNetwork event.Value[*oclusternet.Prefixes]
Serge Bazanski6fdca3f2023-03-20 17:47:07 +010030}
31
32type Worker struct {
33 c ConfigWorker
34}
35
36func NewWorker(c ConfigWorker) *Worker {
37 s := &Worker{
38 c: c,
39 }
40 return s
41}
42
43func (s *Worker) Run(ctx context.Context) error {
44 // Run apiproxy, which load-balances connections from worker components to this
45 // cluster's api servers. This is necessary as we want to round-robin across all
46 // available apiservers, and Kubernetes components do not implement client-side
47 // load-balancing.
48 err := supervisor.Run(ctx, "apiproxy", func(ctx context.Context) error {
49 lis, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", node.KubernetesWorkerLocalAPIPort))
50 if err != nil {
51 return fmt.Errorf("failed to listen: %w", err)
52 }
53 defer lis.Close()
54
55 v := memory.Value[tinylb.BackendSet]{}
56 srv := tinylb.Server{
57 Provider: &v,
58 Listener: lis,
59 }
60 err = supervisor.Run(ctx, "updater", func(ctx context.Context) error {
61 return updateLoadbalancerAPIServers(ctx, &v, s.c.CuratorClient)
62 })
63 if err != nil {
64 return err
65 }
66
67 supervisor.Logger(ctx).Infof("Starting proxy...")
68 return srv.Run(ctx)
69 })
70 if err != nil {
71 return err
72 }
Serge Bazanskib565cc62023-03-30 18:43:51 +020073
Serge Bazanski6fdca3f2023-03-20 17:47:07 +010074 supervisor.Signal(ctx, supervisor.SignalHealthy)
75 <-ctx.Done()
76 return nil
77}