metropolis/node/core/metrics: fixup metrics authentication
Change-Id: I67643855ab61bfdea980211ffe01e50c2409882b
Reviewed-on: https://review.monogon.dev/c/monogon/+/1979
Reviewed-by: Serge Bazanski <serge@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/node/kubernetes/metricsproxy/metricsproxy.go b/metropolis/node/kubernetes/metricsproxy/metricsproxy.go
new file mode 100644
index 0000000..14f76a0
--- /dev/null
+++ b/metropolis/node/kubernetes/metricsproxy/metricsproxy.go
@@ -0,0 +1,178 @@
+// Package metricsproxy implements a proxy that authenticates to the K8s
+// controller-manager and scheduler and re-exposes their metrics without
+// authentication on local ports.
+package metricsproxy
+
+import (
+ "context"
+ "crypto/tls"
+ "crypto/x509"
+ "fmt"
+ "io"
+ "net"
+ "net/http"
+
+ "k8s.io/kubernetes/cmd/kubeadm/app/constants"
+
+ "source.monogon.dev/metropolis/node"
+ "source.monogon.dev/metropolis/node/kubernetes/pki"
+ "source.monogon.dev/metropolis/pkg/supervisor"
+)
+
+// Service proxies the metrics endpoints of the Kubernetes scheduler and
+// controller-manager to unauthenticated listeners on 127.0.0.1, using
+// credentials from KPKI to authenticate against them.
+type Service struct {
+	// KPKI is a reference to the Kubernetes PKI. It supplies the client
+	// certificate presented to the exporters and the CA used to verify them.
+	KPKI *pki.PKI
+}
+
+// kubernetesExporter describes a Kubernetes component whose TLS-protected
+// metrics endpoint is re-exposed by this proxy.
+type kubernetesExporter struct {
+	// Name of the exporter, used as the name of its supervisor runnable and
+	// in log messages.
+	Name string
+	// TargetPort on which this exporter is running.
+	TargetPort node.Port
+	// ListenPort on which the unauthenticated exporter should run.
+	ListenPort node.Port
+	// ServerName used to verify the TLS connection to the exporter.
+	ServerName string
+}
+
+// services lists the Kubernetes components whose metrics are re-exposed by
+// this proxy; Service.Run starts one sub-runnable for each entry.
+var services = []*kubernetesExporter{
+	{
+		Name:       "kubernetes-scheduler",
+		ServerName: "kube-scheduler.local",
+		TargetPort: constants.KubeSchedulerPort,
+		ListenPort: node.MetricsKubeSchedulerListenerPort,
+	},
+	{
+		Name:       "kubernetes-controller-manager",
+		ServerName: "kube-controller-manager.local",
+		TargetPort: constants.KubeControllerManagerPort,
+		ListenPort: node.MetricsKubeControllerManagerListenerPort,
+	},
+}
+
+// metricsService serves the metrics of a single kubernetesExporter on its
+// ListenPort, fetching them from the exporter's TargetPort.
+type metricsService struct {
+	*kubernetesExporter
+
+	// transport is used to reach the exporter and carries the TLS client
+	// configuration (client certificate, CA and ServerName) for it.
+	transport *http.Transport
+}
+
+// ServeHTTP forwards a GET request to the exporter's /metrics endpoint on
+// 127.0.0.1:TargetPort and streams the upstream status, headers and body back
+// to the client. Requests with any other method are rejected with 405.
+func (s *metricsService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	if r.Method != http.MethodGet {
+		http.Error(w, fmt.Sprintf("method %q not allowed", r.Method), http.StatusMethodNotAllowed)
+		return
+	}
+
+	// The request context is derived from the http.Server's BaseContext,
+	// which is the runnable's context, so the supervisor logger is reachable
+	// through it. Reusing it for the upstream request ties the fetch to the
+	// lifetime of the incoming request.
+	reqCtx := r.Context()
+	lg := supervisor.Logger(reqCtx)
+
+	target := fmt.Sprintf("https://127.0.0.1:%s/metrics", s.TargetPort.PortString())
+	upstreamReq, err := http.NewRequestWithContext(reqCtx, http.MethodGet, target, nil)
+	if err != nil {
+		lg.Errorf("%s: forwarding to %q failed: %v", r.RemoteAddr, s.Name, err)
+		http.Error(w, "internal server error", http.StatusInternalServerError)
+		return
+	}
+
+	upstreamRes, err := s.transport.RoundTrip(upstreamReq)
+	if err != nil {
+		lg.Errorf("%s: forwarding to %q failed: %v", r.RemoteAddr, s.Name, err)
+		http.Error(w, "could not reach exporter", http.StatusBadGateway)
+		return
+	}
+	defer upstreamRes.Body.Close()
+
+	copyHeader(w.Header(), upstreamRes.Header)
+	w.WriteHeader(upstreamRes.StatusCode)
+
+	if _, err = io.Copy(w, upstreamRes.Body); err != nil {
+		lg.Errorf("%s: copying response from %q failed: %v", r.RemoteAddr, s.Name, err)
+	}
+}
+
+// Run binds the unauthenticated listener on 127.0.0.1:ListenPort and serves
+// the exporter's metrics on it until ctx is canceled. The runnable is marked
+// healthy only after the listener is bound, so a port conflict surfaces as a
+// failed runnable rather than as a healthy one that exits right afterwards.
+func (s *metricsService) Run(ctx context.Context) error {
+	addr := net.JoinHostPort("127.0.0.1", s.ListenPort.PortString())
+	lis, err := net.Listen("tcp", addr)
+	if err != nil {
+		return fmt.Errorf("could not listen on %q: %w", addr, err)
+	}
+
+	srv := http.Server{
+		// Hand the runnable context to every request so that ServeHTTP can
+		// reach the supervisor logger through it.
+		BaseContext: func(_ net.Listener) context.Context {
+			return ctx
+		},
+		Handler: s,
+	}
+
+	supervisor.Signal(ctx, supervisor.SignalHealthy)
+
+	// Close the server when the runnable is canceled. serveDone also lets this
+	// goroutine exit (and drop lingering connections) if Serve returns for any
+	// other reason, instead of waiting on ctx indefinitely.
+	serveDone := make(chan struct{})
+	defer close(serveDone)
+	go func() {
+		select {
+		case <-ctx.Done():
+		case <-serveDone:
+		}
+		srv.Close()
+	}()
+
+	// Serve always returns a non-nil error and closes lis.
+	err = srv.Serve(lis)
+	if ctx.Err() != nil {
+		return ctx.Err()
+	}
+	return fmt.Errorf("Serve: %w", err)
+}
+
+// Run starts one metricsService sub-runnable per entry in services, each with a
+// TLS client configuration built from KPKI and the exporter's ServerName. It
+// then signals healthy and blocks until ctx is canceled.
+func (s *Service) Run(ctx context.Context) error {
+	tlsConfig, err := s.clientTLSConfig(ctx)
+	if err != nil {
+		return err
+	}
+
+	for _, svc := range services {
+		// ServerName differs per exporter and is used to verify its
+		// certificate, so each transport gets its own copy of the config.
+		svcTLSConfig := tlsConfig.Clone()
+		svcTLSConfig.ServerName = svc.ServerName
+
+		transport := http.DefaultTransport.(*http.Transport).Clone()
+		transport.TLSClientConfig = svcTLSConfig
+
+		ms := &metricsService{
+			kubernetesExporter: svc,
+			transport:          transport,
+		}
+		if err := supervisor.Run(ctx, svc.Name, ms.Run); err != nil {
+			return fmt.Errorf("could not run sub-service %q: %w", svc.Name, err)
+		}
+	}
+
+	supervisor.Signal(ctx, supervisor.SignalHealthy)
+
+	<-ctx.Done()
+	return ctx.Err()
+}
+
+// clientTLSConfig builds the TLS client configuration shared by all exporters:
+// it presents the pki.Master certificate and trusts only the pki.IdCA
+// certificate as root. ServerName is left unset for the caller to fill in.
+func (s *Service) clientTLSConfig(ctx context.Context) (*tls.Config, error) {
+	// TODO(q3k): move this to IssueCertificates and replace with dedicated certificate
+	cert, key, err := s.KPKI.Certificate(ctx, pki.Master)
+	if err != nil {
+		return nil, fmt.Errorf("could not load certificate %q from PKI: %w", pki.Master, err)
+	}
+	parsedKey, err := x509.ParsePKCS8PrivateKey(key)
+	if err != nil {
+		return nil, fmt.Errorf("failed to parse key for cert %q: %w", pki.Master, err)
+	}
+
+	caCert, _, err := s.KPKI.Certificate(ctx, pki.IdCA)
+	if err != nil {
+		return nil, fmt.Errorf("could not load certificate %q from PKI: %w", pki.IdCA, err)
+	}
+	parsedCACert, err := x509.ParseCertificate(caCert)
+	if err != nil {
+		return nil, fmt.Errorf("failed to parse cert %q: %w", pki.IdCA, err)
+	}
+
+	rootCAs := x509.NewCertPool()
+	rootCAs.AddCert(parsedCACert)
+
+	return &tls.Config{
+		RootCAs: rootCAs,
+		Certificates: []tls.Certificate{{
+			Certificate: [][]byte{cert},
+			PrivateKey:  parsedKey,
+		}},
+	}, nil
+}
+
+// copyHeader appends every value of every header in src to dst, keeping any
+// values dst already holds. Header names are canonicalized exactly as
+// http.Header.Add would, and names with no values are not created in dst.
+func copyHeader(dst, src http.Header) {
+	for name, values := range src {
+		if len(values) == 0 {
+			continue
+		}
+		key := http.CanonicalHeaderKey(name)
+		dst[key] = append(dst[key], values...)
+	}
+}