metropolis/node/core/metrics: export (controller-manager|scheduler) metrics

Change-Id: Ie61551655cbf1130bb5f5beb2923dac1aa52f868
Reviewed-on: https://review.monogon.dev/c/monogon/+/1952
Tested-by: Jenkins CI
Reviewed-by: Serge Bazanski <serge@monogon.tech>
diff --git a/metropolis/node/core/metrics/BUILD.bazel b/metropolis/node/core/metrics/BUILD.bazel
index 9233ee2..f698f0e 100644
--- a/metropolis/node/core/metrics/BUILD.bazel
+++ b/metropolis/node/core/metrics/BUILD.bazel
@@ -16,6 +16,7 @@
         "//metropolis/pkg/logtree",
         "//metropolis/pkg/supervisor",
         "//metropolis/proto/common",
+        "@io_k8s_kubernetes//cmd/kubeadm/app/constants",
     ],
 )
 
diff --git a/metropolis/node/core/metrics/exporters.go b/metropolis/node/core/metrics/exporters.go
index 2cbe18c..5949eb4 100644
--- a/metropolis/node/core/metrics/exporters.go
+++ b/metropolis/node/core/metrics/exporters.go
@@ -1,12 +1,15 @@
 package metrics
 
 import (
+	"crypto/tls"
 	"fmt"
 	"io"
 	"net"
 	"net/http"
 	"net/url"
 
+	"k8s.io/kubernetes/cmd/kubeadm/app/constants"
+
 	"source.monogon.dev/metropolis/node"
 	"source.monogon.dev/metropolis/pkg/logtree"
 )
@@ -21,6 +24,10 @@
 	Name string
 	// Port on which this exporter will be running.
 	Port node.Port
+	// ServerName used to verify the tls connection.
+	ServerName string
+	// TLSConfigFunc is used to configure tls authentication
+	TLSConfigFunc func(*Service, *Exporter) *tls.Config
 	// Executable to run to start the exporter.
 	Executable string
 	// Arguments to start the exporter. The exporter should listen at 127.0.0.1 and
@@ -46,10 +53,28 @@
 		Name: "etcd",
 		Port: node.MetricsEtcdListenerPort,
 	},
+	{
+		Name:          "kubernetes-scheduler",
+		Port:          constants.KubeSchedulerPort,
+		ServerName:    "kube-scheduler.local",
+		TLSConfigFunc: (*Service).kubeTLSConfig,
+	},
+	{
+		Name:          "kubernetes-controller-manager",
+		Port:          constants.KubeControllerManagerPort,
+		ServerName:    "kube-controller-manager.local",
+		TLSConfigFunc: (*Service).kubeTLSConfig,
+	},
+}
+
+func (s *Service) kubeTLSConfig(e *Exporter) *tls.Config {
+	c := s.KubeTLSConfig.Clone()
+	c.ServerName = e.ServerName
+	return c
 }
 
 // forward a given HTTP request to this exporter.
-func (e *Exporter) forward(logger logtree.LeveledLogger, w http.ResponseWriter, r *http.Request) {
+func (e *Exporter) forward(s *Service, logger logtree.LeveledLogger, w http.ResponseWriter, r *http.Request) {
 	ctx := r.Context()
 	outreq := r.Clone(ctx)
 
@@ -58,6 +83,12 @@
 		Host:   net.JoinHostPort("127.0.0.1", e.Port.PortString()),
 		Path:   "/metrics",
 	}
+
+	transport := http.DefaultTransport.(*http.Transport).Clone()
+	if e.TLSConfigFunc != nil {
+		outreq.URL.Scheme = "https"
+		transport.TLSClientConfig = e.TLSConfigFunc(s, e)
+	}
 	logger.V(1).Infof("%s: forwarding %s to %s", r.RemoteAddr, r.URL.String(), outreq.URL.String())
 
 	if r.ContentLength == 0 {
@@ -66,7 +97,7 @@
 	if outreq.Body != nil {
 		defer outreq.Body.Close()
 	}
-	res, err := http.DefaultTransport.RoundTrip(outreq)
+	res, err := transport.RoundTrip(outreq)
 	if err != nil {
 		logger.Errorf("%s: forwarding to %q failed: %v", r.RemoteAddr, e.Name, err)
 		w.WriteHeader(502)
diff --git a/metropolis/node/core/metrics/metrics.go b/metropolis/node/core/metrics/metrics.go
index 7126459..e087ada 100644
--- a/metropolis/node/core/metrics/metrics.go
+++ b/metropolis/node/core/metrics/metrics.go
@@ -44,6 +44,10 @@
 	// LocalRoles contains the local node roles which gets listened on and
 	// is required to decide whether or not to start the discovery routine
 	LocalRoles *memory.Value[*cpb.NodeRoles]
+	// KubeTLSConfig provides the tls.Config for authenticating against kubernetes
+	// services.
+	KubeTLSConfig *tls.Config
+
 	// List of Exporters to run and to forward HTTP requests to. If not set, defaults
 	// to DefaultExporters.
 	Exporters []Exporter
@@ -130,7 +134,7 @@
 		exporter := exporter
 
 		mux.HandleFunc(exporter.externalPath(), func(w http.ResponseWriter, r *http.Request) {
-			exporter.forward(logger, w, r)
+			exporter.forward(s, logger, w, r)
 		})
 
 		logger.Infof("Registered exporter %q", exporter.Name)
diff --git a/metropolis/node/core/roleserve/roleserve.go b/metropolis/node/core/roleserve/roleserve.go
index 02bc510..0c486d1 100644
--- a/metropolis/node/core/roleserve/roleserve.go
+++ b/metropolis/node/core/roleserve/roleserve.go
@@ -168,6 +168,7 @@
 	s.metrics = &workerMetrics{
 		curatorConnection: &s.CuratorConnection,
 		localRoles:        &s.localRoles,
+		localControlplane: &s.localControlPlane,
 	}
 
 	return s
diff --git a/metropolis/node/core/roleserve/worker_metrics.go b/metropolis/node/core/roleserve/worker_metrics.go
index a9add88..d0c89a4 100644
--- a/metropolis/node/core/roleserve/worker_metrics.go
+++ b/metropolis/node/core/roleserve/worker_metrics.go
@@ -2,10 +2,14 @@
 
 import (
 	"context"
+	"crypto/tls"
+	"crypto/x509"
+	"fmt"
 
 	cpb "source.monogon.dev/metropolis/proto/common"
 
 	"source.monogon.dev/metropolis/node/core/metrics"
+	kpki "source.monogon.dev/metropolis/node/kubernetes/pki"
 	"source.monogon.dev/metropolis/pkg/event/memory"
 	"source.monogon.dev/metropolis/pkg/supervisor"
 
@@ -19,6 +23,7 @@
 type workerMetrics struct {
 	curatorConnection *memory.Value[*curatorConnection]
 	localRoles        *memory.Value[*cpb.NodeRoles]
+	localControlplane *memory.Value[*localControlPlane]
 }
 
 func (s *workerMetrics) run(ctx context.Context) error {
@@ -32,10 +37,53 @@
 	}
 	supervisor.Logger(ctx).Infof("Got curator connection, starting...")
 
+	lw := s.localControlplane.Watch()
+	defer lw.Close()
+	cp, err := lw.Get(ctx)
+	if err != nil {
+		return err
+	}
+
+	pki, err := kpki.FromLocalConsensus(ctx, cp.consensus)
+	if err != nil {
+		return err
+	}
+
+	// TODO(q3k): move this to IssueCertificates and replace with dedicated certificate
+	cert, key, err := pki.Certificate(ctx, kpki.Master)
+	if err != nil {
+		return fmt.Errorf("could not load certificate %q from PKI: %w", kpki.Master, err)
+	}
+	parsedKey, err := x509.ParsePKCS8PrivateKey(key)
+	if err != nil {
+		return fmt.Errorf("failed to parse key for cert %q: %w", kpki.Master, err)
+	}
+
+	caCert, _, err := pki.Certificate(ctx, kpki.IdCA)
+	if err != nil {
+		return fmt.Errorf("could not load certificate %q from PKI: %w", kpki.IdCA, err)
+	}
+	parsedCACert, err := x509.ParseCertificate(caCert)
+	if err != nil {
+		return fmt.Errorf("failed to parse cert %q: %w", kpki.IdCA, err)
+	}
+
+	rootCAs := x509.NewCertPool()
+	rootCAs.AddCert(parsedCACert)
+
+	kubeTLSConfig := &tls.Config{
+		RootCAs: rootCAs,
+		Certificates: []tls.Certificate{{
+			Certificate: [][]byte{cert},
+			PrivateKey:  parsedKey,
+		}},
+	}
+
 	svc := metrics.Service{
-		Credentials: cc.credentials,
-		Curator:     ipb.NewCuratorClient(cc.conn),
-		LocalRoles:  s.localRoles,
+		Credentials:   cc.credentials,
+		Curator:       ipb.NewCuratorClient(cc.conn),
+		LocalRoles:    s.localRoles,
+		KubeTLSConfig: kubeTLSConfig,
 	}
 	return svc.Run(ctx)
 }