m/n/kubernetes: start splitting, run apiproxy
This begins the process of splitting the Kubernetes service into a
controller service and a worker service.
First, we rename the existing service to Controller, create a Worker
service, and make the Worker service run our new tinylb-based apiserver
loadbalancer.
We also make the roleserver aware of this change by having it spawn the
controller and worker services according to the node's roles.
We will move services to the Worker in follow-up change requests.
Change-Id: I76e98baa0603ad5df30b5892dd69154b895b35fa
Reviewed-on: https://review.monogon.dev/c/monogon/+/1374
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/node/core/debug_service_enabled.go b/metropolis/node/core/debug_service_enabled.go
index f12085a..fc77edb 100644
--- a/metropolis/node/core/debug_service_enabled.go
+++ b/metropolis/node/core/debug_service_enabled.go
@@ -89,10 +89,10 @@
if err != nil {
return nil, status.Errorf(codes.Unavailable, "could not get kubernetes status: %v", err)
}
- if v.Svc == nil {
+ if v.Controller == nil {
continue
}
- return v.Svc.GetDebugKubeconfig(ctx, req)
+ return v.Controller.GetDebugKubeconfig(ctx, req)
}
}
diff --git a/metropolis/node/core/roleserve/value_kubernetes.go b/metropolis/node/core/roleserve/value_kubernetes.go
index 88bdfeb..68580e7 100644
--- a/metropolis/node/core/roleserve/value_kubernetes.go
+++ b/metropolis/node/core/roleserve/value_kubernetes.go
@@ -8,5 +8,5 @@
// Kubernetes instance. It allows external services to access the Kubernetes
// Service whenever available (ie. enabled and started by the Role Server).
type KubernetesStatus struct {
- Svc *kubernetes.Service
+ Controller *kubernetes.Controller
}
diff --git a/metropolis/node/core/roleserve/worker_kubernetes.go b/metropolis/node/core/roleserve/worker_kubernetes.go
index 8e1030c..b501475 100644
--- a/metropolis/node/core/roleserve/worker_kubernetes.go
+++ b/metropolis/node/core/roleserve/worker_kubernetes.go
@@ -14,6 +14,8 @@
"source.monogon.dev/metropolis/pkg/event"
"source.monogon.dev/metropolis/pkg/event/memory"
"source.monogon.dev/metropolis/pkg/supervisor"
+
+ ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
cpb "source.monogon.dev/metropolis/proto/common"
)
@@ -44,7 +46,17 @@
-// changed informs the Kubernetes launcher whether two different
-// kubernetesStartups differ to the point where a restart of Kubernetes should
-// happen.
-func (k *kubernetesStartup) changed(o *kubernetesStartup) bool {
+// workerChanged informs the Kubernetes launcher whether two different
+// kubernetesStartups differ to the point where a restart of the Kubernetes
+// worker should happen.
+func (k *kubernetesStartup) workerChanged(o *kubernetesStartup) bool {
+ hasKubernetesA := k.roles.KubernetesWorker != nil
+ hasKubernetesB := o.roles.KubernetesWorker != nil
+ if hasKubernetesA != hasKubernetesB {
+ return true
+ }
+
+ return false
+}
+
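+// controllerChanged informs the Kubernetes launcher whether two different
+// kubernetesStartups differ to the point where a restart of the Kubernetes
+// controller should happen.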
+func (k *kubernetesStartup) controllerChanged(o *kubernetesStartup) bool {
hasKubernetesA := k.roles.KubernetesController != nil
hasKubernetesB := o.roles.KubernetesController != nil
if hasKubernetesA != hasKubernetesB {
@@ -98,7 +110,23 @@
},
})
- supervisor.Run(ctx, "run", func(ctx context.Context) error {
+ // TODO(q3k): make these configurable.
+ clusterIPRange := net.IPNet{
+ IP: net.IP{10, 0, 0, 0},
+ // That's a /16.
+ Mask: net.IPMask{0xff, 0xff, 0x00, 0x00},
+ }
+ serviceIPRange := net.IPNet{
+ IP: net.IP{10, 0, 255, 1},
+ // That's a /24.
+ Mask: net.IPMask{0xff, 0xff, 0xff, 0x00},
+ }
+
+ // TODO(q3k): remove this once the controller also uses curator-emitted PKI.
+ clusterDomain := "cluster.local"
+
+ // TODO(q3k): move worker services to worker.
+ supervisor.Run(ctx, "controller", func(ctx context.Context) error {
w := startupV.Watch()
defer w.Close()
supervisor.Logger(ctx).Infof("Waiting for startup data...")
@@ -114,7 +142,7 @@
}
supervisor.Logger(ctx).Infof("Got new startup data.")
if d.roles.KubernetesController == nil {
- supervisor.Logger(ctx).Infof("No Kubernetes role, not starting.")
+ supervisor.Logger(ctx).Infof("No Kubernetes controller role, not starting.")
continue
}
if d.membership.localConsensus == nil {
@@ -146,39 +174,26 @@
return fmt.Errorf("failed to start containerd service: %w", err)
}
- // TODO(lorenz): Align this with the global cluster domain once it
- // exists.
- clusterDomain := "cluster.local"
-
// Start building Kubernetes service...
- pki := kpki.New(supervisor.Logger(ctx), kkv, clusterDomain)
+ pki := kpki.New(kkv, clusterDomain)
- kubeSvc := kubernetes.New(kubernetes.Config{
- Node: &d.membership.credentials.Node,
- // TODO(q3k): make this configurable.
- ServiceIPRange: net.IPNet{
- IP: net.IP{10, 0, 255, 1},
- // That's a /24.
- Mask: net.IPMask{0xff, 0xff, 0xff, 0x00},
- },
- ClusterNet: net.IPNet{
- IP: net.IP{10, 0, 0, 0},
- // That's a /16.
- Mask: net.IPMask{0xff, 0xff, 0x00, 0x00},
- },
- ClusterDomain: clusterDomain,
- KPKI: pki,
- Root: s.storageRoot,
- Network: s.network,
+ controller := kubernetes.NewController(kubernetes.ConfigController{
+ Node: &d.membership.credentials.Node,
+ ServiceIPRange: serviceIPRange,
+ ClusterNet: clusterIPRange,
+ ClusterDomain: clusterDomain,
+ KPKI: pki,
+ Root: s.storageRoot,
+ Network: s.network,
})
// Start Kubernetes.
- if err := supervisor.Run(ctx, "kubernetes", kubeSvc.Run); err != nil {
- return fmt.Errorf("failed to start kubernetes service: %w", err)
+ if err := supervisor.Run(ctx, "run", controller.Run); err != nil {
+ return fmt.Errorf("failed to start kubernetes controller service: %w", err)
}
// Let downstream know that Kubernetes is running.
s.kubernetesStatus.Set(&KubernetesStatus{
- Svc: kubeSvc,
+ Controller: controller,
})
supervisor.Signal(ctx, supervisor.SignalHealthy)
@@ -190,7 +205,66 @@
if err != nil {
return err
}
- if nc.changed(d) {
+ if nc.controllerChanged(d) {
+ supervisor.Logger(ctx).Errorf("watcher got new config, restarting")
+ return fmt.Errorf("restarting")
+ }
+ }
+ })
+
+ supervisor.Run(ctx, "worker", func(ctx context.Context) error {
+ w := startupV.Watch()
+ defer w.Close()
+ supervisor.Logger(ctx).Infof("Waiting for startup data...")
+
+ // Acquire kubernetesStartup, waiting for it to contain a KubernetesWorker
+ // local role.
+ var d *kubernetesStartup
+ for {
+ var err error
+ d, err = w.Get(ctx)
+ if err != nil {
+ return err
+ }
+ supervisor.Logger(ctx).Infof("Got new startup data.")
+ if d.roles.KubernetesWorker == nil {
+ supervisor.Logger(ctx).Infof("No Kubernetes worker role, not starting.")
+ continue
+ }
+ break
+ }
+
+ cur, err := d.membership.DialCurator()
+ if err != nil {
+ return fmt.Errorf("could not dial curator: %w", err)
+ }
+ ccli := ipb.NewCuratorClient(cur)
+
+ worker := kubernetes.NewWorker(kubernetes.ConfigWorker{
+ ServiceIPRange: serviceIPRange,
+ ClusterNet: clusterIPRange,
+ ClusterDomain: clusterDomain,
+
+ Root: s.storageRoot,
+ Network: s.network,
+ NodeID: d.membership.NodeID(),
+ CuratorClient: ccli,
+ })
+ // Start Kubernetes.
+ if err := supervisor.Run(ctx, "run", worker.Run); err != nil {
+ return fmt.Errorf("failed to start kubernetes worker service: %w", err)
+ }
+
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+
+ // Restart everything if we get a significantly different config (i.e. a config
+ // whose change should either turn up or tear down Kubernetes).
+ for {
+ nc, err := w.Get(ctx)
+ if err != nil {
+ return err
+ }
+ if nc.workerChanged(d) {
supervisor.Logger(ctx).Errorf("watcher got new config, restarting")
return fmt.Errorf("restarting")
}
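Both runnables above follow the same role-gated pattern: block until the watched
startup data carries the relevant role, start the service, and return an error
(so the supervisor restarts the runnable) once a later update flips that role.
Below is a minimal stand-alone sketch of that pattern, using a plain channel in
place of the roleserver's event value; all names here are illustrative and not
part of this change.

package main

import (
	"context"
	"errors"
	"fmt"
)

// startup is a stand-in for kubernetesStartup: all we care about here is
// whether the node currently holds the worker role.
type startup struct{ hasWorkerRole bool }

// runRoleGated blocks until an update carries the role, "starts" the service,
// and returns an error (for a supervisor to restart it) when a later update
// drops the role again.
func runRoleGated(ctx context.Context, updates <-chan startup) error {
	var cur startup
	for !cur.hasWorkerRole {
		select {
		case cur = <-updates:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	fmt.Println("role present, starting service")
	for {
		select {
		case next := <-updates:
			if next.hasWorkerRole != cur.hasWorkerRole {
				return errors.New("restarting: role changed")
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func main() {
	updates := make(chan startup, 2)
	updates <- startup{hasWorkerRole: true}
	updates <- startup{hasWorkerRole: false}
	fmt.Println("exited:", runRoleGated(context.Background(), updates))
}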
diff --git a/metropolis/node/kubernetes/BUILD.bazel b/metropolis/node/kubernetes/BUILD.bazel
index c92ca06..a05d4ae 100644
--- a/metropolis/node/kubernetes/BUILD.bazel
+++ b/metropolis/node/kubernetes/BUILD.bazel
@@ -3,18 +3,22 @@
go_library(
name = "kubernetes",
srcs = [
+ "apiproxy.go",
"apiserver.go",
"controller-manager.go",
"csi.go",
"kubelet.go",
"provisioner.go",
"scheduler.go",
- "service.go",
+ "service_controller.go",
+ "service_worker.go",
],
importpath = "source.monogon.dev/metropolis/node/kubernetes",
visibility = ["//metropolis/node:__subpackages__"],
deps = [
+ "//go/net/tinylb",
"//metropolis/node",
+ "//metropolis/node/core/curator/proto/api",
"//metropolis/node/core/identity",
"//metropolis/node/core/localstorage",
"//metropolis/node/core/network",
@@ -25,6 +29,7 @@
"//metropolis/node/kubernetes/pki",
"//metropolis/node/kubernetes/plugins/kvmdevice",
"//metropolis/node/kubernetes/reconciler",
+ "//metropolis/pkg/event/memory",
"//metropolis/pkg/fileargs",
"//metropolis/pkg/fsquota",
"//metropolis/pkg/logtree",
diff --git a/metropolis/node/kubernetes/apiproxy.go b/metropolis/node/kubernetes/apiproxy.go
new file mode 100644
index 0000000..9f9c851
--- /dev/null
+++ b/metropolis/node/kubernetes/apiproxy.go
@@ -0,0 +1,52 @@
+package kubernetes
+
+import (
+ "context"
+ "fmt"
+ "net"
+
+ "source.monogon.dev/go/net/tinylb"
+ "source.monogon.dev/metropolis/node"
+ ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+ "source.monogon.dev/metropolis/pkg/event/memory"
+)
+
+// updateLoadbalancerAPIServers keeps a tinylb BackendSet memory value up to
+// date with the currently known nodes running a Kubernetes apiserver, as
+// retrieved from the given curator client.
+func updateLoadbalancerAPIServers(ctx context.Context, val *memory.Value[tinylb.BackendSet], cur ipb.CuratorClient) error {
+ w, err := cur.Watch(ctx, &ipb.WatchRequest{
+ Kind: &ipb.WatchRequest_NodesInCluster_{
+ NodesInCluster: &ipb.WatchRequest_NodesInCluster{},
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("watch failed: %w", err)
+ }
+ defer w.CloseSend()
+
+ set := &tinylb.BackendSet{}
+ val.Set(set.Clone())
+ for {
+ ev, err := w.Recv()
+ if err != nil {
+ return fmt.Errorf("receive failed: %w", err)
+ }
+
+ for _, n := range ev.Nodes {
+ if n.Status == nil || n.Status.ExternalAddress == "" {
+ set.Delete(n.Id)
+ continue
+ }
+ if n.Roles == nil || n.Roles.KubernetesController == nil {
+ set.Delete(n.Id)
+ continue
+ }
+ set.Insert(n.Id, &tinylb.SimpleTCPBackend{Remote: net.JoinHostPort(n.Status.ExternalAddress, node.KubernetesAPIPort.PortString())})
+ }
+ for _, t := range ev.NodeTombstones {
+ set.Delete(t.NodeId)
+ }
+ val.Set(set.Clone())
+ }
+}
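For readers unfamiliar with tinylb: the only API visible in this change is the
BackendSet/SimpleTCPBackend types and a Server fed from an event value, so the
sketch below is a generic stand-in rather than tinylb itself. It illustrates the
behaviour the apiproxy relies on: accept local TCP connections and round-robin
them across the current backend addresses (hardcoded here; in the real code they
come from the curator watch above).

package main

import (
	"io"
	"log"
	"net"
	"sync"
)

// proxy is a minimal round-robin TCP load balancer: it accepts connections on
// a local listener and forwards each one to the next backend in the list.
type proxy struct {
	mu       sync.Mutex
	backends []string
	next     int
}

// pick returns the next backend address, or false if none are known.
func (p *proxy) pick() (string, bool) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if len(p.backends) == 0 {
		return "", false
	}
	b := p.backends[p.next%len(p.backends)]
	p.next++
	return b, true
}

// serve accepts connections and pipes each one to a picked backend.
func (p *proxy) serve(lis net.Listener) error {
	for {
		conn, err := lis.Accept()
		if err != nil {
			return err
		}
		go func() {
			defer conn.Close()
			backend, ok := p.pick()
			if !ok {
				return
			}
			remote, err := net.Dial("tcp", backend)
			if err != nil {
				return
			}
			defer remote.Close()
			go io.Copy(remote, conn)
			io.Copy(conn, remote)
		}()
	}
}

func main() {
	// Illustrative addresses only: a local listener on the worker-local API
	// port, forwarding to two controller nodes' apiserver ports.
	lis, err := net.Listen("tcp", "127.0.0.1:6445")
	if err != nil {
		log.Fatal(err)
	}
	p := &proxy{backends: []string{"10.0.0.10:6443", "10.0.0.11:6443"}}
	log.Fatal(p.serve(lis))
}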
diff --git a/metropolis/node/kubernetes/pki/BUILD.bazel b/metropolis/node/kubernetes/pki/BUILD.bazel
index f2e4e3c..7bcc531 100644
--- a/metropolis/node/kubernetes/pki/BUILD.bazel
+++ b/metropolis/node/kubernetes/pki/BUILD.bazel
@@ -7,7 +7,6 @@
visibility = ["//metropolis/node:__subpackages__"],
deps = [
"//metropolis/node",
- "//metropolis/pkg/logtree",
"//metropolis/pkg/pki",
"@io_etcd_go_etcd_client_v3//:client",
"@io_k8s_client_go//tools/clientcmd",
diff --git a/metropolis/node/kubernetes/pki/kubernetes.go b/metropolis/node/kubernetes/pki/kubernetes.go
index ef046a2..24c9c52 100644
--- a/metropolis/node/kubernetes/pki/kubernetes.go
+++ b/metropolis/node/kubernetes/pki/kubernetes.go
@@ -37,7 +37,6 @@
configapi "k8s.io/client-go/tools/clientcmd/api"
common "source.monogon.dev/metropolis/node"
- "source.monogon.dev/metropolis/pkg/logtree"
opki "source.monogon.dev/metropolis/pkg/pki"
)
@@ -95,15 +94,13 @@
// generate Kubeconfigs from.
type PKI struct {
namespace opki.Namespace
- logger logtree.LeveledLogger
KV clientv3.KV
Certificates map[KubeCertificateName]*opki.Certificate
}
-func New(l logtree.LeveledLogger, kv clientv3.KV, clusterDomain string) *PKI {
+func New(kv clientv3.KV, clusterDomain string) *PKI {
pki := PKI{
namespace: opki.Namespaced(etcdPrefix),
- logger: l,
KV: kv,
Certificates: make(map[KubeCertificateName]*opki.Certificate),
}
diff --git a/metropolis/node/kubernetes/service.go b/metropolis/node/kubernetes/service_controller.go
similarity index 95%
rename from metropolis/node/kubernetes/service.go
rename to metropolis/node/kubernetes/service_controller.go
index e989507..d17b330 100644
--- a/metropolis/node/kubernetes/service.go
+++ b/metropolis/node/kubernetes/service_controller.go
@@ -39,10 +39,11 @@
"source.monogon.dev/metropolis/node/kubernetes/plugins/kvmdevice"
"source.monogon.dev/metropolis/node/kubernetes/reconciler"
"source.monogon.dev/metropolis/pkg/supervisor"
+
apb "source.monogon.dev/metropolis/proto/api"
)
-type Config struct {
+type ConfigController struct {
ServiceIPRange net.IPNet
ClusterNet net.IPNet
ClusterDomain string
@@ -53,18 +54,18 @@
Node *identity.Node
}
-type Service struct {
- c Config
+type Controller struct {
+ c ConfigController
}
-func New(c Config) *Service {
- s := &Service{
+func NewController(c ConfigController) *Controller {
+ s := &Controller{
c: c,
}
return s
}
-func (s *Service) Run(ctx context.Context) error {
+func (s *Controller) Run(ctx context.Context) error {
controllerManagerConfig, err := getPKIControllerManagerConfig(ctx, s.c.KPKI)
if err != nil {
return fmt.Errorf("could not generate controller manager pki config: %w", err)
@@ -236,7 +237,7 @@
// GetDebugKubeconfig issues a kubeconfig for an arbitrary given identity.
// Useful for debugging and testing.
-func (s *Service) GetDebugKubeconfig(ctx context.Context, request *apb.GetDebugKubeconfigRequest) (*apb.GetDebugKubeconfigResponse, error) {
+func (s *Controller) GetDebugKubeconfig(ctx context.Context, request *apb.GetDebugKubeconfigRequest) (*apb.GetDebugKubeconfigResponse, error) {
client, err := s.c.KPKI.VolatileClient(ctx, request.Id, request.Groups)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "Failed to get volatile client certificate: %v", err)
diff --git a/metropolis/node/kubernetes/service_worker.go b/metropolis/node/kubernetes/service_worker.go
new file mode 100644
index 0000000..5ddc32d
--- /dev/null
+++ b/metropolis/node/kubernetes/service_worker.go
@@ -0,0 +1,73 @@
+package kubernetes
+
+import (
+ "context"
+ "fmt"
+ "net"
+
+ "source.monogon.dev/go/net/tinylb"
+ "source.monogon.dev/metropolis/node"
+ "source.monogon.dev/metropolis/node/core/localstorage"
+ "source.monogon.dev/metropolis/node/core/network"
+ "source.monogon.dev/metropolis/pkg/event/memory"
+ "source.monogon.dev/metropolis/pkg/supervisor"
+
+ ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+)
+
+type ConfigWorker struct {
+ ServiceIPRange net.IPNet
+ ClusterNet net.IPNet
+ ClusterDomain string
+
+ Root *localstorage.Root
+ Network *network.Service
+ NodeID string
+ CuratorClient ipb.CuratorClient
+}
+
+type Worker struct {
+ c ConfigWorker
+}
+
+func NewWorker(c ConfigWorker) *Worker {
+ s := &Worker{
+ c: c,
+ }
+ return s
+}
+
+func (s *Worker) Run(ctx context.Context) error {
+ // Run apiproxy, which load-balances connections from worker components to this
+ // cluster's api servers. This is necessary as we want to round-robin across all
+ // available apiservers, and Kubernetes components do not implement client-side
+ // load-balancing.
+ err := supervisor.Run(ctx, "apiproxy", func(ctx context.Context) error {
+ lis, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", node.KubernetesWorkerLocalAPIPort))
+ if err != nil {
+ return fmt.Errorf("failed to listen: %w", err)
+ }
+ defer lis.Close()
+
+ v := memory.Value[tinylb.BackendSet]{}
+ srv := tinylb.Server{
+ Provider: &v,
+ Listener: lis,
+ }
+ err = supervisor.Run(ctx, "updater", func(ctx context.Context) error {
+ return updateLoadbalancerAPIServers(ctx, &v, s.c.CuratorClient)
+ })
+ if err != nil {
+ return err
+ }
+
+ supervisor.Logger(ctx).Infof("Starting proxy...")
+ return srv.Run(ctx)
+ })
+ if err != nil {
+ return err
+ }
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ <-ctx.Done()
+ return nil
+}
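The Worker currently runs only the proxy; per the commit message, kubelet and
the other worker components arrive in follow-up changes. Presumably they will
then be pointed at the node-local load balancer rather than at any single
apiserver. Below is a hedged sketch of what such a kubeconfig could look like,
using client-go's clientcmd; the helper name, the hardcoded port constant, and
the placeholder credentials are assumptions, not part of this change.

package main

import (
	"fmt"
	"log"

	"k8s.io/client-go/tools/clientcmd"
	configapi "k8s.io/client-go/tools/clientcmd/api"
)

// localAPIProxyPort mirrors node.KubernetesWorkerLocalAPIPort (6445) from this
// change; hardcoded here to keep the sketch self-contained.
const localAPIProxyPort = 6445

// workerKubeconfig builds a kubeconfig whose server is the node-local
// apiserver load balancer instead of any single apiserver. CA and client
// credentials are placeholders; the real ones would come from the Kubernetes
// PKI.
func workerKubeconfig(ca, cert, key []byte) *configapi.Config {
	cfg := configapi.NewConfig()
	cfg.Clusters["default"] = &configapi.Cluster{
		Server:                   fmt.Sprintf("https://127.0.0.1:%d", localAPIProxyPort),
		CertificateAuthorityData: ca,
	}
	cfg.AuthInfos["default"] = &configapi.AuthInfo{
		ClientCertificateData: cert,
		ClientKeyData:         key,
	}
	cfg.Contexts["default"] = &configapi.Context{Cluster: "default", AuthInfo: "default"}
	cfg.CurrentContext = "default"
	return cfg
}

func main() {
	cfg := workerKubeconfig(nil, nil, nil)
	out, err := clientcmd.Write(*cfg)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(out))
}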
diff --git a/metropolis/node/ports.go b/metropolis/node/ports.go
index c90e7dc..f4ffa0b 100644
--- a/metropolis/node/ports.go
+++ b/metropolis/node/ports.go
@@ -40,6 +40,10 @@
// KubernetesAPIWrappedPort is the TCP port on which the Metropolis
// authenticating proxy for the Kubernetes API is exposed.
KubernetesAPIWrappedPort Port = 6444
+ // KubernetesWorkerLocalAPIPort is the TCP port on which Kubernetes worker nodes
+ // run a loadbalancer to access the cluster's API servers before cluster
+ // networking is available. This port is only bound to 127.0.0.1.
+ KubernetesWorkerLocalAPIPort Port = 6445
// DebuggerPort is the port on which the delve debugger runs (on debug
// builds only). Not to be confused with DebugServicePort.
DebuggerPort Port = 2345
@@ -57,6 +61,8 @@
return "wireguard"
case KubernetesAPIPort:
return "kubernetes-api"
+ case KubernetesWorkerLocalAPIPort:
+ return "kubernetes-worker-local-api"
case KubernetesAPIWrappedPort:
return "kubernetes-api-wrapped"
case DebuggerPort: