m/n/kubernetes: start splitting, run apiproxy
This begins the process of splitting the Kubernetes service into a
controller service and a worker service.
First, we rename the existing service to Controller, create a Worker
service, and make the Worker service run our new tinylb-based apiserver
load balancer.
We also make the roleserver aware of this change by having it spawn both
the controller and worker services according to the node's roles.
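As an illustration, here is a minimal sketch of how the roleserver side
could start the worker under the supervisor; only kubernetes.NewWorker
and Worker.Run come from this change, while the function itself, its
caller and the config wiring are assumptions:

    // Hypothetical sketch (not part of this change): spawn the Kubernetes
    // worker as a supervised sub-runnable once the roleserver decides that
    // this node should act as a worker.
    func runKubernetesWorker(ctx context.Context, cfg kubernetes.ConfigWorker) error {
        worker := kubernetes.NewWorker(cfg)
        return supervisor.Run(ctx, "kubernetes-worker", worker.Run)
    }
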
We will move services to the Worker in follow-up change requests.
Change-Id: I76e98baa0603ad5df30b5892dd69154b895b35fa
Reviewed-on: https://review.monogon.dev/c/monogon/+/1374
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/node/kubernetes/service_worker.go b/metropolis/node/kubernetes/service_worker.go
new file mode 100644
index 0000000..5ddc32d
--- /dev/null
+++ b/metropolis/node/kubernetes/service_worker.go
@@ -0,0 +1,73 @@
+package kubernetes
+
+import (
+	"context"
+	"fmt"
+	"net"
+
+	"source.monogon.dev/go/net/tinylb"
+	"source.monogon.dev/metropolis/node"
+	"source.monogon.dev/metropolis/node/core/localstorage"
+	"source.monogon.dev/metropolis/node/core/network"
+	"source.monogon.dev/metropolis/pkg/event/memory"
+	"source.monogon.dev/metropolis/pkg/supervisor"
+
+	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+)
+
+type ConfigWorker struct {
+	ServiceIPRange net.IPNet
+	ClusterNet     net.IPNet
+	ClusterDomain  string
+
+	Root          *localstorage.Root
+	Network       *network.Service
+	NodeID        string
+	CuratorClient ipb.CuratorClient
+}
+
+type Worker struct {
+	c ConfigWorker
+}
+
+func NewWorker(c ConfigWorker) *Worker {
+	s := &Worker{
+		c: c,
+	}
+	return s
+}
+
+func (s *Worker) Run(ctx context.Context) error {
+	// Run apiproxy, which load-balances connections from worker components to this
+	// cluster's api servers. This is necessary as we want to round-robin across all
+	// available apiservers, and Kubernetes components do not implement client-side
+	// load-balancing.
+	err := supervisor.Run(ctx, "apiproxy", func(ctx context.Context) error {
+		lis, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", node.KubernetesWorkerLocalAPIPort))
+		if err != nil {
+			return fmt.Errorf("failed to listen: %w", err)
+		}
+		defer lis.Close()
+
+		v := memory.Value[tinylb.BackendSet]{}
+		srv := tinylb.Server{
+			Provider: &v,
+			Listener: lis,
+		}
+		err = supervisor.Run(ctx, "updater", func(ctx context.Context) error {
+			return updateLoadbalancerAPIServers(ctx, &v, s.c.CuratorClient)
+		})
+		if err != nil {
+			return err
+		}
+
+		supervisor.Logger(ctx).Infof("Starting proxy...")
+		return srv.Run(ctx)
+	})
+	if err != nil {
+		return err
+	}
+	supervisor.Signal(ctx, supervisor.SignalHealthy)
+	<-ctx.Done()
+	return nil
+}
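
As a usage sketch (assumed, not part of this change), worker components
can now be pointed at the fixed local proxy address instead of any single
apiserver; client-go's rest.Config is used purely for illustration, with
credentials and TLS wiring omitted:

    package kubernetes

    import (
        "fmt"

        "k8s.io/client-go/rest"

        "source.monogon.dev/metropolis/node"
    )

    // localAPIServerConfig is a hypothetical helper showing how a worker
    // component could reach the cluster's apiservers through the local
    // apiproxy, which round-robins the underlying TCP connections.
    // Authentication and TLS settings are assumed to be filled in by the
    // caller.
    func localAPIServerConfig() *rest.Config {
        return &rest.Config{
            Host: fmt.Sprintf("https://127.0.0.1:%d", node.KubernetesWorkerLocalAPIPort),
        }
    }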