m/n/kubernetes: factor out generating KPKI, support multiple endpoints in Kubeconfig

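This factors the KPKI bootstrap out of the roleserver Kubernetes
worker into kpki.FromLocalConsensus, which waits for local consensus,
retrieves the Kubernetes-namespaced etcd client and runs an initial
EnsureAll to avoid two KPKI instances racing to initialize the PKI
data. Kubeconfig generation now takes an explicit
KubernetesAPIEndpoint: controller components keep connecting to the
locally-running apiserver, while worker components can instead be
pointed at a locally-running apiproxy. A new KubeconfigRaw emits a
kubeconfig from a raw CA certificate, client certificate and ed25519
private key, without relying on the rest of the (K)PKI
infrastructure. The apiserver certificate additionally carries the
hosts-file-only metropolis-kube-apiserver domain, and the roleserver
worker path now starts containerd as well.

A rough sketch of the resulting API (illustrative only; kubePKI and
consensusSvc are placeholder names, consensusSvc standing in for the
node's consensus.ServiceHandle):

    // Build a Kubernetes PKI client backed by local consensus. This
    // blocks until consensus is running and performs an initial
    // EnsureAll.
    kubePKI, err := kpki.FromLocalConsensus(ctx, consensusSvc)
    if err != nil {
        return fmt.Errorf("getting kubernetes PKI client: %w", err)
    }
    // Emit a kubeconfig for the scheduler, pointed at the local
    // apiserver endpoint used by controller components.
    kc, err := kubePKI.Kubeconfig(
        ctx, kpki.SchedulerClient, kpki.KubernetesAPIEndpointForController)
    if err != nil {
        return fmt.Errorf("generating scheduler kubeconfig: %w", err)
    }
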
Change-Id: I0e648c24ffa134314a03715575d1af1b925fd450
Reviewed-on: https://review.monogon.dev/c/monogon/+/1377
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/node/core/roleserve/worker_kubernetes.go b/metropolis/node/core/roleserve/worker_kubernetes.go
index b501475..f8e8438 100644
--- a/metropolis/node/core/roleserve/worker_kubernetes.go
+++ b/metropolis/node/core/roleserve/worker_kubernetes.go
@@ -5,7 +5,6 @@
 	"fmt"
 	"net"
 
-	"source.monogon.dev/metropolis/node/core/consensus"
 	"source.monogon.dev/metropolis/node/core/localstorage"
 	"source.monogon.dev/metropolis/node/core/network"
 	"source.monogon.dev/metropolis/node/kubernetes"
@@ -152,19 +151,13 @@
 
 			break
 		}
-		supervisor.Logger(ctx).Infof("Waiting for local consensus (%+v)...")
-		cstW := d.membership.localConsensus.Watch()
-		defer cstW.Close()
-		cst, err := cstW.Get(ctx, consensus.FilterRunning)
+		supervisor.Logger(ctx).Infof("Waiting for local consensus...")
+		pki, err := kpki.FromLocalConsensus(ctx, d.membership.localConsensus)
 		if err != nil {
-			return fmt.Errorf("waiting for local consensus: %w", err)
+			return fmt.Errorf("getting kubernetes PKI client: %w", err)
 		}
 
 		supervisor.Logger(ctx).Infof("Got data, starting Kubernetes...")
-		kkv, err := cst.KubernetesClient()
-		if err != nil {
-			return fmt.Errorf("retrieving kubernetes client: %w", err)
-		}
 
 		// Start containerd.
 		containerdSvc := &containerd.Service{
@@ -174,8 +167,5 @@
 			return fmt.Errorf("failed to start containerd service: %w", err)
 		}
 
-		// Start building Kubernetes service...
-		pki := kpki.New(kkv, clusterDomain)
-
 		controller := kubernetes.NewController(kubernetes.ConfigController{
 			Node:           &d.membership.credentials.Node,
@@ -240,6 +231,14 @@
 		}
 		ccli := ipb.NewCuratorClient(cur)
 
+		// Start containerd.
+		containerdSvc := &containerd.Service{
+			EphemeralVolume: &s.storageRoot.Ephemeral.Containerd,
+		}
+		if err := supervisor.Run(ctx, "containerd", containerdSvc.Run); err != nil {
+			return fmt.Errorf("failed to start containerd service: %w", err)
+		}
+
 		worker := kubernetes.NewWorker(kubernetes.ConfigWorker{
 			ServiceIPRange: serviceIPRange,
 			ClusterNet:     clusterIPRange,
diff --git a/metropolis/node/kubernetes/controller-manager.go b/metropolis/node/kubernetes/controller-manager.go
index d26a2af..a6c424b 100644
--- a/metropolis/node/kubernetes/controller-manager.go
+++ b/metropolis/node/kubernetes/controller-manager.go
@@ -53,7 +53,7 @@
 	if err != nil {
 		return nil, fmt.Errorf("failed to get serviceaccount privkey: %w", err)
 	}
-	config.kubeConfig, err = kpki.Kubeconfig(ctx, pki.ControllerManagerClient)
+	config.kubeConfig, err = kpki.Kubeconfig(ctx, pki.ControllerManagerClient, pki.KubernetesAPIEndpointForController)
 	if err != nil {
 		return nil, fmt.Errorf("failed to get controller-manager kubeconfig: %w", err)
 	}
diff --git a/metropolis/node/kubernetes/kubelet.go b/metropolis/node/kubernetes/kubelet.go
index c643e91..7a0d362 100644
--- a/metropolis/node/kubernetes/kubelet.go
+++ b/metropolis/node/kubernetes/kubelet.go
@@ -54,7 +54,7 @@
 		return fmt.Errorf("when generating local kubelet credentials: %w", err)
 	}
 
-	clientKubeconfig, err := pki.Kubeconfig(ctx, s.KPKI.KV, client)
+	clientKubeconfig, err := pki.Kubeconfig(ctx, s.KPKI.KV, client, pki.KubernetesAPIEndpointForController)
 	if err != nil {
 		return fmt.Errorf("when generating kubeconfig: %w", err)
 	}
diff --git a/metropolis/node/kubernetes/pki/BUILD.bazel b/metropolis/node/kubernetes/pki/BUILD.bazel
index 7bcc531..1471fd3 100644
--- a/metropolis/node/kubernetes/pki/BUILD.bazel
+++ b/metropolis/node/kubernetes/pki/BUILD.bazel
@@ -7,6 +7,7 @@
     visibility = ["//metropolis/node:__subpackages__"],
     deps = [
         "//metropolis/node",
+        "//metropolis/node/core/consensus",
         "//metropolis/pkg/pki",
         "@io_etcd_go_etcd_client_v3//:client",
         "@io_k8s_client_go//tools/clientcmd",
diff --git a/metropolis/node/kubernetes/pki/kubernetes.go b/metropolis/node/kubernetes/pki/kubernetes.go
index 24c9c52..dce019b 100644
--- a/metropolis/node/kubernetes/pki/kubernetes.go
+++ b/metropolis/node/kubernetes/pki/kubernetes.go
@@ -24,10 +24,13 @@
 package pki
 
 import (
+	"bytes"
 	"context"
+	"crypto/ed25519"
 	"crypto/rand"
 	"crypto/rsa"
 	"crypto/x509"
+	"encoding/hex"
 	"encoding/pem"
 	"fmt"
 	"net"
@@ -37,6 +40,7 @@
 	configapi "k8s.io/client-go/tools/clientcmd/api"
 
 	common "source.monogon.dev/metropolis/node"
+	"source.monogon.dev/metropolis/node/core/consensus"
 	opki "source.monogon.dev/metropolis/pkg/pki"
 )
 
@@ -129,6 +133,10 @@
 			"kubernetes.default.svc",
 			"kubernetes.default.svc." + clusterDomain,
 			"localhost",
+			// Domain used by Kubernetes components themselves to reach the
+			// apiserver without going over Kubernetes networking. This domain
+			// only exists as a set of entries in local hosts files.
+			"metropolis-kube-apiserver",
 		},
 		// TODO(q3k): add service network internal apiserver address
 		[]net.IP{{10, 0, 255, 1}, {127, 0, 0, 1}},
@@ -153,6 +161,31 @@
 	return &pki
 }
 
+// FromLocalConsensus returns a PKI backed by the given local consensus
+// instance, with its data stored in the correct etcd namespace.
+func FromLocalConsensus(ctx context.Context, svc consensus.ServiceHandle) (*PKI, error) {
+	// TODO(q3k): make this configurable
+	clusterDomain := "cluster.local"
+
+	cstW := svc.Watch()
+	defer cstW.Close()
+	cst, err := cstW.Get(ctx, consensus.FilterRunning)
+	if err != nil {
+		return nil, fmt.Errorf("waiting for local consensus: %w", err)
+	}
+	kkv, err := cst.KubernetesClient()
+	if err != nil {
+		return nil, fmt.Errorf("retrieving kubernetes client: %w", err)
+	}
+	pki := New(kkv, clusterDomain)
+	// Run EnsureAll ASAP to prevent race conditions between two kpki instances
+	// attempting to initialize the PKI data at the same time.
+	if err := pki.EnsureAll(ctx); err != nil {
+		return nil, fmt.Errorf("initial ensure failed: %w", err)
+	}
+	return pki, nil
+}
+
 // EnsureAll ensures that all static certificates (and the serviceaccount key)
 // are present on etcd.
 func (k *PKI) EnsureAll(ctx context.Context) error {
@@ -171,12 +204,12 @@
 
 // Kubeconfig generates a kubeconfig blob for a given certificate name. The
 // same lifetime semantics as in .Certificate apply.
-func (k *PKI) Kubeconfig(ctx context.Context, name KubeCertificateName) ([]byte, error) {
+func (k *PKI) Kubeconfig(ctx context.Context, name KubeCertificateName, endpoint KubernetesAPIEndpoint) ([]byte, error) {
 	c, ok := k.Certificates[name]
 	if !ok {
 		return nil, fmt.Errorf("no certificate %q", name)
 	}
-	return Kubeconfig(ctx, k.KV, c)
+	return Kubeconfig(ctx, k.KV, c, endpoint)
 }
 
 // Certificate retrieves an x509 DER-encoded (but not PEM-wrapped) key and
@@ -197,31 +230,56 @@
 	return
 }
 
-// Kubeconfig generates a kubeconfig blob for this certificate. The same
-// lifetime semantics as in .Ensure apply.
-func Kubeconfig(ctx context.Context, kv clientv3.KV, c *opki.Certificate) ([]byte, error) {
+// A KubernetesAPIEndpoint describes the address that a kubeconfig will point
+// clients at in order to reach the Kubernetes apiserver(s).
+type KubernetesAPIEndpoint string
 
-	cert, err := c.Ensure(ctx, kv)
-	if err != nil {
-		return nil, fmt.Errorf("could not ensure certificate exists: %w", err)
+var (
+	// KubernetesAPIEndpointForWorker points Kubernetes workers to connect to a
+	// locally-running apiproxy, which in turn loadbalances the connection to
+	// controller nodes running in the cluster.
+	KubernetesAPIEndpointForWorker = KubernetesAPIEndpoint(fmt.Sprintf("https://127.0.0.1:%d", common.KubernetesWorkerLocalAPIPort))
+	// KubernetesAPIEndpointForController points Kubernetes controllers to connect to
+	// the locally-running API server.
+	KubernetesAPIEndpointForController = KubernetesAPIEndpoint(fmt.Sprintf("https://127.0.0.1:%d", common.KubernetesAPIPort))
+)
+
+// KubeconfigRaw emits a kubeconfig for a given CA certificate, client
+// certificate, private key and KubernetesAPIEndpoint. It does not rely on the
+// rest of the (K)PKI infrastructure.
+func KubeconfigRaw(cacert, cert []byte, priv ed25519.PrivateKey, endpoint KubernetesAPIEndpoint) ([]byte, error) {
+	caX, err := x509.ParseCertificate(cacert)
+	if err != nil {
+		return nil, fmt.Errorf("could not parse ca certificate: %w", err)
+	}
+	certX, err := x509.ParseCertificate(cert)
+	if err != nil {
+		return nil, fmt.Errorf("could not parse certificate: %w", err)
+	}
+	if err := certX.CheckSignatureFrom(caX); err != nil {
+		return nil, fmt.Errorf("given ca does not sign given cert: %w", err)
 	}
-	key, err := c.PrivateKeyX509()
+	pub1 := priv.Public().(ed25519.PublicKey)
+	pub2, ok := certX.PublicKey.(ed25519.PublicKey)
+	if !ok {
+		return nil, fmt.Errorf("given cert does not use an ed25519 public key")
+	}
+	if !bytes.Equal(pub1, pub2) {
+		return nil, fmt.Errorf("given private key does not match given cert (cert: %s, key: %s)", hex.EncodeToString(pub2), hex.EncodeToString(pub1))
+	}
+
+	key, err := x509.MarshalPKCS8PrivateKey(priv)
 	if err != nil {
-		return nil, fmt.Errorf("could not get certificate's private key: %w", err)
+		return nil, fmt.Errorf("could not marshal private key: %w", err)
 	}
 
 	kubeconfig := configapi.NewConfig()
 
 	cluster := configapi.NewCluster()
-	cluster.Server = fmt.Sprintf("https://127.0.0.1:%d", common.KubernetesAPIPort)
 
-	ca, err := c.Issuer.CACertificate(ctx, kv)
-	if err != nil {
-		return nil, fmt.Errorf("could not get CA certificate: %w", err)
-	}
-	if ca != nil {
-		cluster.CertificateAuthorityData = pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: ca})
-	}
+	cluster.Server = string(endpoint)
+
+	cluster.CertificateAuthorityData = pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: cacert})
 	kubeconfig.Clusters["default"] = cluster
 
 	authInfo := configapi.NewAuthInfo()
@@ -238,6 +287,26 @@
 	return clientcmd.Write(*kubeconfig)
 }
 
+// Kubeconfig generates a kubeconfig blob for this certificate. The same
+// lifetime semantics as in .Ensure apply.
+func Kubeconfig(ctx context.Context, kv clientv3.KV, c *opki.Certificate, endpoint KubernetesAPIEndpoint) ([]byte, error) {
+	cert, err := c.Ensure(ctx, kv)
+	if err != nil {
+		return nil, fmt.Errorf("could not ensure certificate exists: %w", err)
+	}
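+	// A kubeconfig can only be generated for certificates whose ed25519
+	// private key is available locally.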
+	if len(c.PrivateKey) != ed25519.PrivateKeySize {
+		return nil, fmt.Errorf("certificate has no associated private key")
+	}
+	ca, err := c.Issuer.CACertificate(ctx, kv)
+	if err != nil {
+		return nil, fmt.Errorf("could not get CA certificate: %w", err)
+	}
+
+	return KubeconfigRaw(ca, cert, c.PrivateKey, endpoint)
+}
+
 // ServiceAccountKey retrieves (and possibly generates and stores on etcd) the
 // Kubernetes service account key. The returned data is ready to be used by
 // Kubernetes components (in PKIX form).
diff --git a/metropolis/node/kubernetes/scheduler.go b/metropolis/node/kubernetes/scheduler.go
index 6007142..5537dcc 100644
--- a/metropolis/node/kubernetes/scheduler.go
+++ b/metropolis/node/kubernetes/scheduler.go
@@ -40,7 +40,7 @@
 	if err != nil {
 		return nil, fmt.Errorf("failed to get scheduler serving certificate: %w", err)
 	}
-	config.kubeConfig, err = kpki.Kubeconfig(ctx, pki.SchedulerClient)
+	config.kubeConfig, err = kpki.Kubeconfig(ctx, pki.SchedulerClient, pki.KubernetesAPIEndpointForController)
 	if err != nil {
 		return nil, fmt.Errorf("failed to get scheduler kubeconfig: %w", err)
 	}
diff --git a/metropolis/node/kubernetes/service_controller.go b/metropolis/node/kubernetes/service_controller.go
index 5d9580f..da92b7f 100644
--- a/metropolis/node/kubernetes/service_controller.go
+++ b/metropolis/node/kubernetes/service_controller.go
@@ -76,7 +76,7 @@
 		return fmt.Errorf("could not generate scheduler pki config: %w", err)
 	}
 
-	masterKubeconfig, err := s.c.KPKI.Kubeconfig(ctx, pki.Master)
+	masterKubeconfig, err := s.c.KPKI.Kubeconfig(ctx, pki.Master, pki.KubernetesAPIEndpointForController)
 	if err != nil {
 		return fmt.Errorf("could not generate master kubeconfig: %w", err)
 	}
@@ -243,7 +243,7 @@
 	if err != nil {
 		return nil, status.Errorf(codes.Unavailable, "Failed to get volatile client certificate: %v", err)
 	}
-	kubeconfig, err := pki.Kubeconfig(ctx, s.c.KPKI.KV, client)
+	kubeconfig, err := pki.Kubeconfig(ctx, s.c.KPKI.KV, client, pki.KubernetesAPIEndpointForController)
 	if err != nil {
 		return nil, status.Errorf(codes.Unavailable, "Failed to generate kubeconfig: %v", err)
 	}