blob: 14f76a009a6471cc9dc7dbcde6a1842e0a79aba0 [file] [log] [blame]
// Package metricsproxy implements an authenticating proxy in front of the K8s
// controller-manager and scheduler providing unauthenticated access to the
// metrics via local ports.
4package metricsproxy
5
6import (
7 "context"
8 "crypto/tls"
9 "crypto/x509"
10 "fmt"
11 "io"
12 "net"
13 "net/http"
14
15 "k8s.io/kubernetes/cmd/kubeadm/app/constants"
16
17 "source.monogon.dev/metropolis/node"
18 "source.monogon.dev/metropolis/node/kubernetes/pki"
19 "source.monogon.dev/metropolis/pkg/supervisor"
20)
21
22type Service struct {
23 // KPKI is a reference to the Kubernetes PKI
24 KPKI *pki.PKI
25}
26
27type kubernetesExporter struct {
28 Name string
29 // TargetPort on which this exporter is running.
30 TargetPort node.Port
31 // TargetPort on which the unauthenticated exporter should run.
32 ListenPort node.Port
33 // ServerName used to verify the tls connection.
34 ServerName string
35}
36
37// services are the kubernetes services which are exposed via this proxy.
38var services = []*kubernetesExporter{
39 {
40 Name: "kubernetes-scheduler",
41 TargetPort: constants.KubeSchedulerPort,
42 ListenPort: node.MetricsKubeSchedulerListenerPort,
43 ServerName: "kube-scheduler.local",
44 },
45 {
46 Name: "kubernetes-controller-manager",
47 TargetPort: constants.KubeControllerManagerPort,
48 ListenPort: node.MetricsKubeControllerManagerListenerPort,
49 ServerName: "kube-controller-manager.local",
50 },
51}
52
53type metricsService struct {
54 *kubernetesExporter
55 transport *http.Transport
56}
57
58func (s *metricsService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
59 if r.Method != http.MethodGet {
60 http.Error(w, fmt.Sprintf("method %q not allowed", r.Method), http.StatusMethodNotAllowed)
61 return
62 }
63
64 ctx := r.Context()
65
66 // We are supplying the http.Server with a BaseContext that contains the
67 // context from our runnable which contains the logger
68 logger := supervisor.Logger(ctx)
69
70 url := "https://127.0.0.1:" + s.TargetPort.PortString() + "/metrics"
71 outReq, err := http.NewRequestWithContext(ctx, "GET", url, nil)
72 if err != nil {
73 logger.Errorf("%s: forwarding to %q failed: %v", r.RemoteAddr, s.Name, err)
74 http.Error(w, "internal server error", http.StatusInternalServerError)
75 return
76 }
77
78 res, err := s.transport.RoundTrip(outReq)
79 if err != nil {
80 logger.Errorf("%s: forwarding to %q failed: %v", r.RemoteAddr, s.Name, err)
81 http.Error(w, "could not reach exporter", http.StatusBadGateway)
82 return
83 }
84 defer res.Body.Close()
85
86 copyHeader(w.Header(), res.Header)
87 w.WriteHeader(res.StatusCode)
88
89 if _, err := io.Copy(w, res.Body); err != nil {
90 logger.Errorf("%s: copying response from %q failed: %v", r.RemoteAddr, s.Name, err)
91 return
92 }
93}
94
95func (s *metricsService) Run(ctx context.Context) error {
96 supervisor.Signal(ctx, supervisor.SignalHealthy)
97
98 srv := http.Server{
99 BaseContext: func(_ net.Listener) context.Context {
100 return ctx
101 },
102 Addr: net.JoinHostPort("127.0.0.1", s.ListenPort.PortString()),
103 Handler: s,
104 }
105
106 go func() {
107 <-ctx.Done()
108 srv.Close()
109 }()
110
111 err := srv.ListenAndServe()
112 if err != nil && ctx.Err() != nil {
113 return ctx.Err()
114 }
115 return fmt.Errorf("ListenAndServe: %w", err)
116}
117
118func (s *Service) Run(ctx context.Context) error {
119 // TODO(q3k): move this to IssueCertificates and replace with dedicated certificate
120 cert, key, err := s.KPKI.Certificate(ctx, pki.Master)
121 if err != nil {
122 return fmt.Errorf("could not load certificate %q from PKI: %w", pki.Master, err)
123 }
124 parsedKey, err := x509.ParsePKCS8PrivateKey(key)
125 if err != nil {
126 return fmt.Errorf("failed to parse key for cert %q: %w", pki.Master, err)
127 }
128
129 caCert, _, err := s.KPKI.Certificate(ctx, pki.IdCA)
130 if err != nil {
131 return fmt.Errorf("could not load certificate %q from PKI: %w", pki.IdCA, err)
132 }
133 parsedCACert, err := x509.ParseCertificate(caCert)
134 if err != nil {
135 return fmt.Errorf("failed to parse cert %q: %w", pki.IdCA, err)
136 }
137
138 rootCAs := x509.NewCertPool()
139 rootCAs.AddCert(parsedCACert)
140
141 tlsConfig := &tls.Config{
142 RootCAs: rootCAs,
143 Certificates: []tls.Certificate{{
144 Certificate: [][]byte{cert},
145 PrivateKey: parsedKey,
146 }},
147 }
148
149 for _, svc := range services {
150 tlsConfig := tlsConfig.Clone()
151 tlsConfig.ServerName = svc.ServerName
152
153 transport := http.DefaultTransport.(*http.Transport).Clone()
154 transport.TLSClientConfig = tlsConfig
155 transport.TLSClientConfig.ServerName = svc.ServerName
156
157 err := supervisor.Run(ctx, svc.Name, (&metricsService{
158 kubernetesExporter: svc,
159 transport: transport,
160 }).Run)
161 if err != nil {
162 return fmt.Errorf("could not run sub-service %q: %w", svc.Name, err)
163 }
164 }
165
166 supervisor.Signal(ctx, supervisor.SignalHealthy)
167
168 <-ctx.Done()
169 return ctx.Err()
170}
171
// copyHeader appends every value in src to dst under the canonical form of
// its header name. Values already present in dst are kept, and names in src
// that carry no values do not create an entry in dst.
func copyHeader(dst, src http.Header) {
	for name, values := range src {
		if len(values) == 0 {
			// Nothing to append; avoid creating an empty entry in dst.
			continue
		}
		key := http.CanonicalHeaderKey(name)
		dst[key] = append(dst[key], values...)
	}
}
178}