m/n/kubernetes: factor out generating KPKI, support multiple endpoints in Kubeconfig
Change-Id: I0e648c24ffa134314a03715575d1af1b925fd450
Reviewed-on: https://review.monogon.dev/c/monogon/+/1377
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/node/core/roleserve/worker_kubernetes.go b/metropolis/node/core/roleserve/worker_kubernetes.go
index b501475..f8e8438 100644
--- a/metropolis/node/core/roleserve/worker_kubernetes.go
+++ b/metropolis/node/core/roleserve/worker_kubernetes.go
@@ -5,7 +5,6 @@
"fmt"
"net"
- "source.monogon.dev/metropolis/node/core/consensus"
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/node/core/network"
"source.monogon.dev/metropolis/node/kubernetes"
@@ -152,19 +151,13 @@
break
}
- supervisor.Logger(ctx).Infof("Waiting for local consensus (%+v)...")
- cstW := d.membership.localConsensus.Watch()
- defer cstW.Close()
- cst, err := cstW.Get(ctx, consensus.FilterRunning)
+ supervisor.Logger(ctx).Infof("Waiting for local consensus...")
+ pki, err := kpki.FromLocalConsensus(ctx, d.membership.localConsensus)
if err != nil {
- return fmt.Errorf("waiting for local consensus: %w", err)
+ return fmt.Errorf("getting kubernetes PKI client: %w", err)
}
supervisor.Logger(ctx).Infof("Got data, starting Kubernetes...")
- kkv, err := cst.KubernetesClient()
- if err != nil {
- return fmt.Errorf("retrieving kubernetes client: %w", err)
- }
// Start containerd.
containerdSvc := &containerd.Service{
@@ -174,8 +167,6 @@
return fmt.Errorf("failed to start containerd service: %w", err)
}
- // Start building Kubernetes service...
- pki := kpki.New(kkv, clusterDomain)
controller := kubernetes.NewController(kubernetes.ConfigController{
Node: &d.membership.credentials.Node,
@@ -240,6 +231,14 @@
}
ccli := ipb.NewCuratorClient(cur)
+ // Start containerd.
+ containerdSvc := &containerd.Service{
+ EphemeralVolume: &s.storageRoot.Ephemeral.Containerd,
+ }
+ if err := supervisor.Run(ctx, "containerd", containerdSvc.Run); err != nil {
+ return fmt.Errorf("failed to start containerd service: %w", err)
+ }
+
worker := kubernetes.NewWorker(kubernetes.ConfigWorker{
ServiceIPRange: serviceIPRange,
ClusterNet: clusterIPRange,
diff --git a/metropolis/node/kubernetes/controller-manager.go b/metropolis/node/kubernetes/controller-manager.go
index d26a2af..a6c424b 100644
--- a/metropolis/node/kubernetes/controller-manager.go
+++ b/metropolis/node/kubernetes/controller-manager.go
@@ -53,7 +53,7 @@
if err != nil {
return nil, fmt.Errorf("failed to get serviceaccount privkey: %w", err)
}
- config.kubeConfig, err = kpki.Kubeconfig(ctx, pki.ControllerManagerClient)
+ config.kubeConfig, err = kpki.Kubeconfig(ctx, pki.ControllerManagerClient, pki.KubernetesAPIEndpointForController)
if err != nil {
return nil, fmt.Errorf("failed to get controller-manager kubeconfig: %w", err)
}
diff --git a/metropolis/node/kubernetes/kubelet.go b/metropolis/node/kubernetes/kubelet.go
index c643e91..7a0d362 100644
--- a/metropolis/node/kubernetes/kubelet.go
+++ b/metropolis/node/kubernetes/kubelet.go
@@ -54,7 +54,7 @@
return fmt.Errorf("when generating local kubelet credentials: %w", err)
}
- clientKubeconfig, err := pki.Kubeconfig(ctx, s.KPKI.KV, client)
+ clientKubeconfig, err := pki.Kubeconfig(ctx, s.KPKI.KV, client, pki.KubernetesAPIEndpointForController)
if err != nil {
return fmt.Errorf("when generating kubeconfig: %w", err)
}
diff --git a/metropolis/node/kubernetes/pki/BUILD.bazel b/metropolis/node/kubernetes/pki/BUILD.bazel
index 7bcc531..1471fd3 100644
--- a/metropolis/node/kubernetes/pki/BUILD.bazel
+++ b/metropolis/node/kubernetes/pki/BUILD.bazel
@@ -7,6 +7,7 @@
visibility = ["//metropolis/node:__subpackages__"],
deps = [
"//metropolis/node",
+ "//metropolis/node/core/consensus",
"//metropolis/pkg/pki",
"@io_etcd_go_etcd_client_v3//:client",
"@io_k8s_client_go//tools/clientcmd",
diff --git a/metropolis/node/kubernetes/pki/kubernetes.go b/metropolis/node/kubernetes/pki/kubernetes.go
index 24c9c52..dce019b 100644
--- a/metropolis/node/kubernetes/pki/kubernetes.go
+++ b/metropolis/node/kubernetes/pki/kubernetes.go
@@ -24,10 +24,13 @@
package pki
import (
+ "bytes"
"context"
+ "crypto/ed25519"
"crypto/rand"
"crypto/rsa"
"crypto/x509"
+ "encoding/hex"
"encoding/pem"
"fmt"
"net"
@@ -37,6 +40,7 @@
configapi "k8s.io/client-go/tools/clientcmd/api"
common "source.monogon.dev/metropolis/node"
+ "source.monogon.dev/metropolis/node/core/consensus"
opki "source.monogon.dev/metropolis/pkg/pki"
)
@@ -129,6 +133,10 @@
"kubernetes.default.svc",
"kubernetes.default.svc." + clusterDomain,
"localhost",
+ // Name used by Kubernetes components themselves to reach the apiserver
+ // without going over Kubernetes networking. This name only exists as
+ // entries in local hosts files.
+ "metropolis-kube-apiserver",
},
// TODO(q3k): add service network internal apiserver address
[]net.IP{{10, 0, 255, 1}, {127, 0, 0, 1}},
@@ -153,6 +161,31 @@
return &pki
}
+// FromLocalConsensus blocks until the given local consensus service reports
+// itself as running, then builds a PKI on top of that instance's Kubernetes
+// etcd client (ie. in the correct etcd namespace). All static certificates are
+// ensured before the PKI is returned.
+func FromLocalConsensus(ctx context.Context, svc consensus.ServiceHandle) (*PKI, error) {
+	watcher := svc.Watch()
+	defer watcher.Close()
+
+	status, err := watcher.Get(ctx, consensus.FilterRunning)
+	if err != nil {
+		return nil, fmt.Errorf("waiting for local consensus: %w", err)
+	}
+	kv, err := status.KubernetesClient()
+	if err != nil {
+		return nil, fmt.Errorf("retrieving kubernetes client: %w", err)
+	}
+
+	// TODO(q3k): make the cluster domain configurable.
+	k := New(kv, "cluster.local")
+	// Ensure everything immediately, so that two KPKI instances do not race
+	// each other when initializing the PKI data at the same time.
+	if err := k.EnsureAll(ctx); err != nil {
+		return nil, fmt.Errorf("initial ensure failed: %w", err)
+	}
+	return k, nil
+}
+
// EnsureAll ensures that all static certificates (and the serviceaccount key)
// are present on etcd.
func (k *PKI) EnsureAll(ctx context.Context) error {
@@ -171,12 +204,12 @@
// Kubeconfig generates a kubeconfig blob for a given certificate name. The
// same lifetime semantics as in .Certificate apply.
-func (k *PKI) Kubeconfig(ctx context.Context, name KubeCertificateName) ([]byte, error) {
+func (k *PKI) Kubeconfig(ctx context.Context, name KubeCertificateName, endpoint KubernetesAPIEndpoint) ([]byte, error) {
c, ok := k.Certificates[name]
if !ok {
return nil, fmt.Errorf("no certificate %q", name)
}
- return Kubeconfig(ctx, k.KV, c)
+ return Kubeconfig(ctx, k.KV, c, endpoint)
}
// Certificate retrieves an x509 DER-encoded (but not PEM-wrapped) key and
@@ -197,31 +230,47 @@
return
}
-// Kubeconfig generates a kubeconfig blob for this certificate. The same
-// lifetime semantics as in .Ensure apply.
-func Kubeconfig(ctx context.Context, kv clientv3.KV, c *opki.Certificate) ([]byte, error) {
+// A KubernetesAPIEndpoint describes where a Kubeconfig will make a client
+// attempt to connect to reach the Kubernetes apiserver(s). Its value is used
+// verbatim as the kubeconfig cluster's server URL.
+type KubernetesAPIEndpoint string
- cert, err := c.Ensure(ctx, kv)
- if err != nil {
- return nil, fmt.Errorf("could not ensure certificate exists: %w", err)
+var (
+	// KubernetesAPIEndpointForController makes clients on a controller node talk
+	// directly to the apiserver running locally on that same node.
+	KubernetesAPIEndpointForController = KubernetesAPIEndpoint(fmt.Sprintf("https://127.0.0.1:%d", common.KubernetesAPIPort))
+	// KubernetesAPIEndpointForWorker makes clients on a worker node talk to the
+	// locally-running apiproxy, which in turn load-balances each connection
+	// across the controller nodes of the cluster.
+	KubernetesAPIEndpointForWorker = KubernetesAPIEndpoint(fmt.Sprintf("https://127.0.0.1:%d", common.KubernetesWorkerLocalAPIPort))
+)
+
+// KubeconfigRaw emits a Kubeconfig for a given set of certificates, private key,
+// and a KubernetesAPIEndpoint. This function does not rely on the rest of the
+// (K)PKI infrastructure.
+func KubeconfigRaw(cacert, cert []byte, priv ed25519.PrivateKey, endpoint KubernetesAPIEndpoint) ([]byte, error) {
+ caX, _ := x509.ParseCertificate(cacert)
+ certX, _ := x509.ParseCertificate(cert)
+ if err := certX.CheckSignatureFrom(caX); err != nil {
+ return nil, fmt.Errorf("given ca does not sign given cert")
}
- key, err := c.PrivateKeyX509()
+ pub1 := priv.Public().(ed25519.PublicKey)
+ pub2 := certX.PublicKey.(ed25519.PublicKey)
+ if !bytes.Equal(pub1, pub2) {
+ return nil, fmt.Errorf("given private key does not match given cert (cert: %s, key: %s)", hex.EncodeToString(pub2), hex.EncodeToString(pub1))
+ }
+
+ key, err := x509.MarshalPKCS8PrivateKey(priv)
if err != nil {
- return nil, fmt.Errorf("could not get certificate's private key: %w", err)
+ return nil, fmt.Errorf("could not marshal private key: %w", err)
}
kubeconfig := configapi.NewConfig()
cluster := configapi.NewCluster()
- cluster.Server = fmt.Sprintf("https://127.0.0.1:%d", common.KubernetesAPIPort)
- ca, err := c.Issuer.CACertificate(ctx, kv)
- if err != nil {
- return nil, fmt.Errorf("could not get CA certificate: %w", err)
- }
- if ca != nil {
- cluster.CertificateAuthorityData = pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: ca})
- }
+ cluster.Server = string(endpoint)
+
+ cluster.CertificateAuthorityData = pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: cacert})
kubeconfig.Clusters["default"] = cluster
authInfo := configapi.NewAuthInfo()
@@ -238,6 +287,24 @@
return clientcmd.Write(*kubeconfig)
}
+// Kubeconfig generates a kubeconfig blob for this certificate, directing the
+// client to the given endpoint. The same lifetime semantics as in .Ensure apply.
+//
+// The certificate must carry an ed25519 private key, and its issuer must
+// provide a CA certificate: both end up embedded in the resulting kubeconfig.
+func Kubeconfig(ctx context.Context, kv clientv3.KV, c *opki.Certificate, endpoint KubernetesAPIEndpoint) ([]byte, error) {
+	cert, err := c.Ensure(ctx, kv)
+	if err != nil {
+		return nil, fmt.Errorf("could not ensure certificate exists: %w", err)
+	}
+	if len(c.PrivateKey) != ed25519.PrivateKeySize {
+		return nil, fmt.Errorf("certificate has no associated private key")
+	}
+	ca, err := c.Issuer.CACertificate(ctx, kv)
+	if err != nil {
+		return nil, fmt.Errorf("could not get CA certificate: %w", err)
+	}
+	// The code this replaces treated a nil CA certificate as possible. KubeconfigRaw
+	// needs a CA certificate both to embed it and to verify cert against it, so
+	// reject a missing one here with a clear error instead of failing inside it.
+	if len(ca) == 0 {
+		return nil, fmt.Errorf("certificate issuer has no CA certificate")
+	}
+
+	return KubeconfigRaw(ca, cert, c.PrivateKey, endpoint)
+}
+
// ServiceAccountKey retrieves (and possibly generates and stores on etcd) the
// Kubernetes service account key. The returned data is ready to be used by
// Kubernetes components (in PKIX form).
diff --git a/metropolis/node/kubernetes/scheduler.go b/metropolis/node/kubernetes/scheduler.go
index 6007142..5537dcc 100644
--- a/metropolis/node/kubernetes/scheduler.go
+++ b/metropolis/node/kubernetes/scheduler.go
@@ -40,7 +40,7 @@
if err != nil {
return nil, fmt.Errorf("failed to get scheduler serving certificate: %w", err)
}
- config.kubeConfig, err = kpki.Kubeconfig(ctx, pki.SchedulerClient)
+ config.kubeConfig, err = kpki.Kubeconfig(ctx, pki.SchedulerClient, pki.KubernetesAPIEndpointForController)
if err != nil {
return nil, fmt.Errorf("failed to get scheduler kubeconfig: %w", err)
}
diff --git a/metropolis/node/kubernetes/service_controller.go b/metropolis/node/kubernetes/service_controller.go
index 5d9580f..da92b7f 100644
--- a/metropolis/node/kubernetes/service_controller.go
+++ b/metropolis/node/kubernetes/service_controller.go
@@ -76,7 +76,7 @@
return fmt.Errorf("could not generate scheduler pki config: %w", err)
}
- masterKubeconfig, err := s.c.KPKI.Kubeconfig(ctx, pki.Master)
+ masterKubeconfig, err := s.c.KPKI.Kubeconfig(ctx, pki.Master, pki.KubernetesAPIEndpointForController)
if err != nil {
return fmt.Errorf("could not generate master kubeconfig: %w", err)
}
@@ -243,7 +243,7 @@
if err != nil {
return nil, status.Errorf(codes.Unavailable, "Failed to get volatile client certificate: %v", err)
}
- kubeconfig, err := pki.Kubeconfig(ctx, s.c.KPKI.KV, client)
+ kubeconfig, err := pki.Kubeconfig(ctx, s.c.KPKI.KV, client, pki.KubernetesAPIEndpointForController)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "Failed to generate kubeconfig: %v", err)
}