blob: 14f76a009a6471cc9dc7dbcde6a1842e0a79aba0 [file] [log] [blame]
// Package metricsproxy implements an authenticating proxy in front of the K8s
// controller-manager and scheduler, providing unauthenticated access to their
// metrics via local ports.
package metricsproxy
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"net"
"net/http"
"k8s.io/kubernetes/cmd/kubeadm/app/constants"
"source.monogon.dev/metropolis/node"
"source.monogon.dev/metropolis/node/kubernetes/pki"
"source.monogon.dev/metropolis/pkg/supervisor"
)
// Service is the metrics proxy. Its Run method starts one proxy per entry in
// services.
type Service struct {
// KPKI is a reference to the Kubernetes PKI. Run reads the pki.Master
// certificate and key and the pki.IdCA certificate from it.
KPKI *pki.PKI
}
// kubernetesExporter describes a single Kubernetes component whose metrics
// endpoint is exposed by this proxy.
type kubernetesExporter struct {
// Name identifies the exporter. It is used as the name of the supervisor
// runnable started for it and appears in log and error messages.
Name string
// TargetPort on which this exporter is running.
TargetPort node.Port
// ListenPort on which the unauthenticated exporter should run.
ListenPort node.Port
// ServerName used to verify the tls connection.
ServerName string
}
// services are the Kubernetes components which are exposed via this proxy.
// Service.Run starts one proxy per entry.
var services = []*kubernetesExporter{
{
Name: "kubernetes-scheduler",
TargetPort: constants.KubeSchedulerPort,
ListenPort: node.MetricsKubeSchedulerListenerPort,
ServerName: "kube-scheduler.local",
},
{
Name: "kubernetes-controller-manager",
TargetPort: constants.KubeControllerManagerPort,
ListenPort: node.MetricsKubeControllerManagerListenerPort,
ServerName: "kube-controller-manager.local",
},
}
// metricsService is the proxy for a single exporter. It serves plain HTTP on
// 127.0.0.1 at the exporter's ListenPort and forwards each request to the
// exporter's TargetPort over HTTPS.
type metricsService struct {
*kubernetesExporter
// transport performs the outgoing requests to the exporter. Service.Run
// sets its TLS client configuration for this exporter.
transport *http.Transport
}
// ServeHTTP answers a metrics scrape by fetching /metrics over HTTPS from the
// exporter on 127.0.0.1:TargetPort and relaying the result to the caller.
//
// Only GET is accepted; any other method is rejected with 405 and an Allow
// header naming the supported method. Path, headers and body of the incoming
// request are not forwarded: the upstream request is always a plain GET for
// /metrics. The upstream status code, headers and body are copied to the
// response. If the round trip to the exporter fails, 502 is returned.
func (s *metricsService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		// A 405 response has to tell the client which methods are supported
		// (RFC 9110, section 15.5.6).
		w.Header().Set("Allow", http.MethodGet)
		http.Error(w, fmt.Sprintf("method %q not allowed", r.Method), http.StatusMethodNotAllowed)
		return
	}

	ctx := r.Context()
	// We are supplying the http.Server with a BaseContext that contains the
	// context from our runnable which contains the logger
	logger := supervisor.Logger(ctx)

	url := "https://127.0.0.1:" + s.TargetPort.PortString() + "/metrics"
	outReq, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		logger.Errorf("%s: forwarding to %q failed: %v", r.RemoteAddr, s.Name, err)
		http.Error(w, "internal server error", http.StatusInternalServerError)
		return
	}

	// The transport is used directly rather than through an http.Client, so
	// redirects are not followed.
	res, err := s.transport.RoundTrip(outReq)
	if err != nil {
		logger.Errorf("%s: forwarding to %q failed: %v", r.RemoteAddr, s.Name, err)
		http.Error(w, "could not reach exporter", http.StatusBadGateway)
		return
	}
	defer res.Body.Close()

	copyHeader(w.Header(), res.Header)
	w.WriteHeader(res.StatusCode)

	// The status line has already been sent at this point, so a failed copy
	// can only be logged, not reported to the client.
	if _, err := io.Copy(w, res.Body); err != nil {
		logger.Errorf("%s: copying response from %q failed: %v", r.RemoteAddr, s.Name, err)
	}
}
// Run serves this proxy on 127.0.0.1:ListenPort until ctx is canceled or the
// HTTP server stops on its own.
//
// The runnable is signaled healthy before the listener is created. When the
// server stops and ctx has been canceled, the context error is returned;
// otherwise the ListenAndServe error is returned, wrapped.
func (s *metricsService) Run(ctx context.Context) error {
	supervisor.Signal(ctx, supervisor.SignalHealthy)

	listenAddr := net.JoinHostPort("127.0.0.1", s.ListenPort.PortString())
	server := http.Server{
		Addr:    listenAddr,
		Handler: s,
		// Every request context derives from the runnable's context, which
		// is where ServeHTTP obtains its logger from.
		BaseContext: func(net.Listener) context.Context { return ctx },
	}

	// Tear the server down as soon as the runnable's context is canceled so
	// that ListenAndServe below returns.
	go func() {
		<-ctx.Done()
		server.Close()
	}()

	serveErr := server.ListenAndServe()
	if serveErr == nil || ctx.Err() == nil {
		return fmt.Errorf("ListenAndServe: %w", serveErr)
	}
	return ctx.Err()
}
// Run loads the pki.Master certificate and key and the pki.IdCA certificate
// from the Kubernetes PKI, starts one proxy runnable per entry in services and
// then blocks until ctx is canceled.
//
// The Master certificate is presented as the TLS client certificate towards
// the exporters; the IdCA certificate is the only root used to verify them.
// Run always returns a non-nil error: either a setup failure or ctx.Err().
func (s *Service) Run(ctx context.Context) error {
	// TODO(q3k): move this to IssueCertificates and replace with dedicated certificate
	clientCert, clientKey, err := s.KPKI.Certificate(ctx, pki.Master)
	if err != nil {
		return fmt.Errorf("could not load certificate %q from PKI: %w", pki.Master, err)
	}
	parsedKey, err := x509.ParsePKCS8PrivateKey(clientKey)
	if err != nil {
		return fmt.Errorf("failed to parse key for cert %q: %w", pki.Master, err)
	}

	caBytes, _, err := s.KPKI.Certificate(ctx, pki.IdCA)
	if err != nil {
		return fmt.Errorf("could not load certificate %q from PKI: %w", pki.IdCA, err)
	}
	caParsed, err := x509.ParseCertificate(caBytes)
	if err != nil {
		return fmt.Errorf("failed to parse cert %q: %w", pki.IdCA, err)
	}

	pool := x509.NewCertPool()
	pool.AddCert(caParsed)

	clientPair := tls.Certificate{
		Certificate: [][]byte{clientCert},
		PrivateKey:  parsedKey,
	}
	baseTLS := &tls.Config{
		Certificates: []tls.Certificate{clientPair},
		RootCAs:      pool,
	}

	for _, exporter := range services {
		// Each exporter is verified under its own server name, so every
		// proxy gets a private copy of the TLS configuration and its own
		// transport.
		exporterTLS := baseTLS.Clone()
		exporterTLS.ServerName = exporter.ServerName

		transport := http.DefaultTransport.(*http.Transport).Clone()
		transport.TLSClientConfig = exporterTLS

		proxy := &metricsService{
			kubernetesExporter: exporter,
			transport:          transport,
		}
		if err := supervisor.Run(ctx, exporter.Name, proxy.Run); err != nil {
			return fmt.Errorf("could not run sub-service %q: %w", exporter.Name, err)
		}
	}

	supervisor.Signal(ctx, supervisor.SignalHealthy)

	<-ctx.Done()
	return ctx.Err()
}
// copyHeader appends every value found in src to dst. Keys are canonicalized
// on the way; values already present in dst are kept, and keys of src that
// carry no values are not created in dst.
func copyHeader(dst, src http.Header) {
	for name, values := range src {
		if len(values) == 0 {
			// Nothing to add; do not create an empty entry in dst.
			continue
		}
		key := http.CanonicalHeaderKey(name)
		dst[key] = append(dst[key], values...)
	}
}