m/n/kubernetes: start splitting, run apiproxy

This begins the process of splitting the Kubernetes service into a
controller and a worker service.

First, we rename the existing service to Controller, create a Worker
service, and make the Worker service run our new tinylb-based apiserver
load balancer.

We also make the roleserver aware of this change by making it spawn both
the controller and worker services according to roles.

We will move services to the Worker in follow-up change requests.

Change-Id: I76e98baa0603ad5df30b5892dd69154b895b35fa
Reviewed-on: https://review.monogon.dev/c/monogon/+/1374
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/node/kubernetes/BUILD.bazel b/metropolis/node/kubernetes/BUILD.bazel
index c92ca06..a05d4ae 100644
--- a/metropolis/node/kubernetes/BUILD.bazel
+++ b/metropolis/node/kubernetes/BUILD.bazel
@@ -3,18 +3,22 @@
 go_library(
     name = "kubernetes",
     srcs = [
+        "apiproxy.go",
         "apiserver.go",
         "controller-manager.go",
         "csi.go",
         "kubelet.go",
         "provisioner.go",
         "scheduler.go",
-        "service.go",
+        "service_controller.go",
+        "service_worker.go",
     ],
     importpath = "source.monogon.dev/metropolis/node/kubernetes",
     visibility = ["//metropolis/node:__subpackages__"],
     deps = [
+        "//go/net/tinylb",
         "//metropolis/node",
+        "//metropolis/node/core/curator/proto/api",
         "//metropolis/node/core/identity",
         "//metropolis/node/core/localstorage",
         "//metropolis/node/core/network",
@@ -25,6 +29,7 @@
         "//metropolis/node/kubernetes/pki",
         "//metropolis/node/kubernetes/plugins/kvmdevice",
         "//metropolis/node/kubernetes/reconciler",
+        "//metropolis/pkg/event/memory",
         "//metropolis/pkg/fileargs",
         "//metropolis/pkg/fsquota",
         "//metropolis/pkg/logtree",
diff --git a/metropolis/node/kubernetes/apiproxy.go b/metropolis/node/kubernetes/apiproxy.go
new file mode 100644
index 0000000..9f9c851
--- /dev/null
+++ b/metropolis/node/kubernetes/apiproxy.go
@@ -0,0 +1,57 @@
+package kubernetes
+
+import (
+	"context"
+	"fmt"
+	"net"
+
+	"source.monogon.dev/go/net/tinylb"
+	"source.monogon.dev/metropolis/node"
+	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+	"source.monogon.dev/metropolis/pkg/event/memory"
+)
+
+// updateLoadbalancerAPIServers keeps a tinylb BackendSet memory value up to
+// date with the currently known nodes running a Kubernetes apiserver, as
+// retrieved from the given curator client.
+//
+// An empty BackendSet is published before the first watch event is processed.
+// The function only returns with an error: when the watch cannot be started or
+// receiving from it fails.
+func updateLoadbalancerAPIServers(ctx context.Context, val *memory.Value[tinylb.BackendSet], cur ipb.CuratorClient) error {
+	w, err := cur.Watch(ctx, &ipb.WatchRequest{
+		Kind: &ipb.WatchRequest_NodesInCluster_{
+			NodesInCluster: &ipb.WatchRequest_NodesInCluster{},
+		},
+	})
+	if err != nil {
+		return fmt.Errorf("watch failed: %w", err)
+	}
+	defer w.CloseSend()
+
+	set := &tinylb.BackendSet{}
+	// set is mutated in place below, so only clones of it are ever published.
+	val.Set(set.Clone())
+	for {
+		ev, err := w.Recv()
+		if err != nil {
+			return fmt.Errorf("receive failed: %w", err)
+		}
+
+		for _, n := range ev.Nodes {
+			// Use the nil-safe protobuf getters: Status may be unset, and Roles
+			// is guarded the same way instead of being dereferenced directly.
+			addr := n.GetStatus().GetExternalAddress()
+			if addr == "" || n.GetRoles().GetKubernetesController() == nil {
+				// Not a usable backend (no address or no KubernetesController role).
+				set.Delete(n.Id)
+				continue
+			}
+			set.Insert(n.Id, &tinylb.SimpleTCPBackend{Remote: net.JoinHostPort(addr, node.KubernetesAPIPort.PortString())})
+		}
+		for _, t := range ev.NodeTombstones {
+			set.Delete(t.NodeId)
+		}
+		val.Set(set.Clone())
+	}
+}
diff --git a/metropolis/node/kubernetes/pki/BUILD.bazel b/metropolis/node/kubernetes/pki/BUILD.bazel
index f2e4e3c..7bcc531 100644
--- a/metropolis/node/kubernetes/pki/BUILD.bazel
+++ b/metropolis/node/kubernetes/pki/BUILD.bazel
@@ -7,7 +7,6 @@
     visibility = ["//metropolis/node:__subpackages__"],
     deps = [
         "//metropolis/node",
-        "//metropolis/pkg/logtree",
         "//metropolis/pkg/pki",
         "@io_etcd_go_etcd_client_v3//:client",
         "@io_k8s_client_go//tools/clientcmd",
diff --git a/metropolis/node/kubernetes/pki/kubernetes.go b/metropolis/node/kubernetes/pki/kubernetes.go
index ef046a2..24c9c52 100644
--- a/metropolis/node/kubernetes/pki/kubernetes.go
+++ b/metropolis/node/kubernetes/pki/kubernetes.go
@@ -37,7 +37,6 @@
 	configapi "k8s.io/client-go/tools/clientcmd/api"
 
 	common "source.monogon.dev/metropolis/node"
-	"source.monogon.dev/metropolis/pkg/logtree"
 	opki "source.monogon.dev/metropolis/pkg/pki"
 )
 
@@ -95,15 +94,13 @@
 // generate Kubeconfigs from.
 type PKI struct {
 	namespace    opki.Namespace
-	logger       logtree.LeveledLogger
 	KV           clientv3.KV
 	Certificates map[KubeCertificateName]*opki.Certificate
 }
 
-func New(l logtree.LeveledLogger, kv clientv3.KV, clusterDomain string) *PKI {
+func New(kv clientv3.KV, clusterDomain string) *PKI {
 	pki := PKI{
 		namespace:    opki.Namespaced(etcdPrefix),
-		logger:       l,
 		KV:           kv,
 		Certificates: make(map[KubeCertificateName]*opki.Certificate),
 	}
diff --git a/metropolis/node/kubernetes/service.go b/metropolis/node/kubernetes/service_controller.go
similarity index 95%
rename from metropolis/node/kubernetes/service.go
rename to metropolis/node/kubernetes/service_controller.go
index e989507..d17b330 100644
--- a/metropolis/node/kubernetes/service.go
+++ b/metropolis/node/kubernetes/service_controller.go
@@ -39,10 +39,11 @@
 	"source.monogon.dev/metropolis/node/kubernetes/plugins/kvmdevice"
 	"source.monogon.dev/metropolis/node/kubernetes/reconciler"
 	"source.monogon.dev/metropolis/pkg/supervisor"
+
 	apb "source.monogon.dev/metropolis/proto/api"
 )
 
-type Config struct {
+type ConfigController struct {
 	ServiceIPRange net.IPNet
 	ClusterNet     net.IPNet
 	ClusterDomain  string
@@ -53,18 +54,18 @@
 	Node    *identity.Node
 }
 
-type Service struct {
-	c Config
+type Controller struct {
+	c ConfigController
 }
 
-func New(c Config) *Service {
-	s := &Service{
+func NewController(c ConfigController) *Controller {
+	s := &Controller{
 		c: c,
 	}
 	return s
 }
 
-func (s *Service) Run(ctx context.Context) error {
+func (s *Controller) Run(ctx context.Context) error {
 	controllerManagerConfig, err := getPKIControllerManagerConfig(ctx, s.c.KPKI)
 	if err != nil {
 		return fmt.Errorf("could not generate controller manager pki config: %w", err)
@@ -236,7 +237,7 @@
 
 // GetDebugKubeconfig issues a kubeconfig for an arbitrary given identity.
 // Useful for debugging and testing.
-func (s *Service) GetDebugKubeconfig(ctx context.Context, request *apb.GetDebugKubeconfigRequest) (*apb.GetDebugKubeconfigResponse, error) {
+func (s *Controller) GetDebugKubeconfig(ctx context.Context, request *apb.GetDebugKubeconfigRequest) (*apb.GetDebugKubeconfigResponse, error) {
 	client, err := s.c.KPKI.VolatileClient(ctx, request.Id, request.Groups)
 	if err != nil {
 		return nil, status.Errorf(codes.Unavailable, "Failed to get volatile client certificate: %v", err)
diff --git a/metropolis/node/kubernetes/service_worker.go b/metropolis/node/kubernetes/service_worker.go
new file mode 100644
index 0000000..5ddc32d
--- /dev/null
+++ b/metropolis/node/kubernetes/service_worker.go
@@ -0,0 +1,87 @@
+package kubernetes
+
+import (
+	"context"
+	"fmt"
+	"net"
+
+	"source.monogon.dev/go/net/tinylb"
+	"source.monogon.dev/metropolis/node"
+	"source.monogon.dev/metropolis/node/core/localstorage"
+	"source.monogon.dev/metropolis/node/core/network"
+	"source.monogon.dev/metropolis/pkg/event/memory"
+	"source.monogon.dev/metropolis/pkg/supervisor"
+
+	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+)
+
+// ConfigWorker carries the configuration and dependencies of a Worker.
+//
+// Of these, Run currently only reads CuratorClient, which it uses to discover
+// the apiservers that apiproxy balances across.
+type ConfigWorker struct {
+	ServiceIPRange net.IPNet
+	ClusterNet     net.IPNet
+	ClusterDomain  string
+
+	Root          *localstorage.Root
+	Network       *network.Service
+	NodeID        string
+	CuratorClient ipb.CuratorClient
+}
+
+// Worker is the worker half of the Kubernetes service, the counterpart to
+// Controller. For now it only runs apiproxy, a local load balancer in front
+// of the cluster's Kubernetes apiservers.
+type Worker struct {
+	c ConfigWorker
+}
+
+// NewWorker returns a Worker for the given configuration. Nothing is started
+// until Run is called.
+func NewWorker(c ConfigWorker) *Worker {
+	return &Worker{
+		c: c,
+	}
+}
+
+// Run must be started as a supervisor runnable. It starts the apiproxy
+// sub-runnable, signals healthy and then blocks until ctx is canceled.
+func (s *Worker) Run(ctx context.Context) error {
+	// Run apiproxy, which load-balances connections from worker components to this
+	// cluster's api servers. This is necessary as we want to round-robin across all
+	// available apiservers, and Kubernetes components do not implement client-side
+	// load-balancing.
+	err := supervisor.Run(ctx, "apiproxy", func(ctx context.Context) error {
+		// Bind to loopback only: the proxy is meant for components on this node.
+		lis, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", node.KubernetesWorkerLocalAPIPort))
+		if err != nil {
+			return fmt.Errorf("failed to listen: %w", err)
+		}
+		defer lis.Close()
+
+		// v holds the current set of apiserver backends. The updater
+		// sub-runnable writes it; srv reads it as its Provider.
+		v := memory.Value[tinylb.BackendSet]{}
+		srv := tinylb.Server{
+			Provider: &v,
+			Listener: lis,
+		}
+		err = supervisor.Run(ctx, "updater", func(ctx context.Context) error {
+			return updateLoadbalancerAPIServers(ctx, &v, s.c.CuratorClient)
+		})
+		if err != nil {
+			return err
+		}
+
+		supervisor.Logger(ctx).Infof("Starting proxy...")
+		return srv.Run(ctx)
+	})
+	if err != nil {
+		return err
+	}
+	supervisor.Signal(ctx, supervisor.SignalHealthy)
+	<-ctx.Done()
+	// Report why we stopped rather than a bare nil.
+	return ctx.Err()
+}