blob: 02bce4b3bbfd5facba2ce9552f3f1c32c1ccc9ed [file] [log] [blame]
Tim Windelschmidtf64f1972023-07-28 00:00:50 +00001// Package metricsproxy implements an authenticating proxy in front of the K8s
2// controller-manager and scheduler providing unauthenticated access to the
3// metrics via local ports
4package metricsproxy
5
6import (
7 "context"
8 "crypto/tls"
9 "crypto/x509"
10 "fmt"
11 "io"
12 "net"
13 "net/http"
14
15 "k8s.io/kubernetes/cmd/kubeadm/app/constants"
16
17 "source.monogon.dev/metropolis/node"
18 "source.monogon.dev/metropolis/node/kubernetes/pki"
19 "source.monogon.dev/metropolis/pkg/supervisor"
20)
21
22type Service struct {
23 // KPKI is a reference to the Kubernetes PKI
24 KPKI *pki.PKI
25}
26
27type kubernetesExporter struct {
28 Name string
29 // TargetPort on which this exporter is running.
30 TargetPort node.Port
31 // TargetPort on which the unauthenticated exporter should run.
32 ListenPort node.Port
33 // ServerName used to verify the tls connection.
34 ServerName string
35}
36
37// services are the kubernetes services which are exposed via this proxy.
38var services = []*kubernetesExporter{
39 {
40 Name: "kubernetes-scheduler",
41 TargetPort: constants.KubeSchedulerPort,
42 ListenPort: node.MetricsKubeSchedulerListenerPort,
43 ServerName: "kube-scheduler.local",
44 },
45 {
46 Name: "kubernetes-controller-manager",
47 TargetPort: constants.KubeControllerManagerPort,
48 ListenPort: node.MetricsKubeControllerManagerListenerPort,
49 ServerName: "kube-controller-manager.local",
50 },
Lorenz Brun4b42c8a2023-11-19 07:02:51 +010051 {
52 Name: "kubernetes-apiserver",
53 TargetPort: node.KubernetesAPIPort,
54 ListenPort: node.MetricsKubeAPIServerListenerPort,
55 ServerName: "kubernetes",
56 },
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000057}
58
59type metricsService struct {
60 *kubernetesExporter
61 transport *http.Transport
62}
63
64func (s *metricsService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
65 if r.Method != http.MethodGet {
66 http.Error(w, fmt.Sprintf("method %q not allowed", r.Method), http.StatusMethodNotAllowed)
67 return
68 }
69
70 ctx := r.Context()
71
72 // We are supplying the http.Server with a BaseContext that contains the
73 // context from our runnable which contains the logger
74 logger := supervisor.Logger(ctx)
75
76 url := "https://127.0.0.1:" + s.TargetPort.PortString() + "/metrics"
77 outReq, err := http.NewRequestWithContext(ctx, "GET", url, nil)
78 if err != nil {
79 logger.Errorf("%s: forwarding to %q failed: %v", r.RemoteAddr, s.Name, err)
80 http.Error(w, "internal server error", http.StatusInternalServerError)
81 return
82 }
83
84 res, err := s.transport.RoundTrip(outReq)
85 if err != nil {
86 logger.Errorf("%s: forwarding to %q failed: %v", r.RemoteAddr, s.Name, err)
87 http.Error(w, "could not reach exporter", http.StatusBadGateway)
88 return
89 }
90 defer res.Body.Close()
91
92 copyHeader(w.Header(), res.Header)
93 w.WriteHeader(res.StatusCode)
94
95 if _, err := io.Copy(w, res.Body); err != nil {
96 logger.Errorf("%s: copying response from %q failed: %v", r.RemoteAddr, s.Name, err)
97 return
98 }
99}
100
101func (s *metricsService) Run(ctx context.Context) error {
102 supervisor.Signal(ctx, supervisor.SignalHealthy)
103
104 srv := http.Server{
105 BaseContext: func(_ net.Listener) context.Context {
106 return ctx
107 },
108 Addr: net.JoinHostPort("127.0.0.1", s.ListenPort.PortString()),
109 Handler: s,
110 }
111
112 go func() {
113 <-ctx.Done()
114 srv.Close()
115 }()
116
117 err := srv.ListenAndServe()
118 if err != nil && ctx.Err() != nil {
119 return ctx.Err()
120 }
121 return fmt.Errorf("ListenAndServe: %w", err)
122}
123
124func (s *Service) Run(ctx context.Context) error {
125 // TODO(q3k): move this to IssueCertificates and replace with dedicated certificate
126 cert, key, err := s.KPKI.Certificate(ctx, pki.Master)
127 if err != nil {
128 return fmt.Errorf("could not load certificate %q from PKI: %w", pki.Master, err)
129 }
130 parsedKey, err := x509.ParsePKCS8PrivateKey(key)
131 if err != nil {
132 return fmt.Errorf("failed to parse key for cert %q: %w", pki.Master, err)
133 }
134
135 caCert, _, err := s.KPKI.Certificate(ctx, pki.IdCA)
136 if err != nil {
137 return fmt.Errorf("could not load certificate %q from PKI: %w", pki.IdCA, err)
138 }
139 parsedCACert, err := x509.ParseCertificate(caCert)
140 if err != nil {
141 return fmt.Errorf("failed to parse cert %q: %w", pki.IdCA, err)
142 }
143
144 rootCAs := x509.NewCertPool()
145 rootCAs.AddCert(parsedCACert)
146
147 tlsConfig := &tls.Config{
148 RootCAs: rootCAs,
149 Certificates: []tls.Certificate{{
150 Certificate: [][]byte{cert},
151 PrivateKey: parsedKey,
152 }},
153 }
154
155 for _, svc := range services {
156 tlsConfig := tlsConfig.Clone()
157 tlsConfig.ServerName = svc.ServerName
158
159 transport := http.DefaultTransport.(*http.Transport).Clone()
160 transport.TLSClientConfig = tlsConfig
161 transport.TLSClientConfig.ServerName = svc.ServerName
162
163 err := supervisor.Run(ctx, svc.Name, (&metricsService{
164 kubernetesExporter: svc,
165 transport: transport,
166 }).Run)
167 if err != nil {
168 return fmt.Errorf("could not run sub-service %q: %w", svc.Name, err)
169 }
170 }
171
172 supervisor.Signal(ctx, supervisor.SignalHealthy)
173
174 <-ctx.Done()
175 return ctx.Err()
176}
177
178func copyHeader(dst, src http.Header) {
179 for k, vv := range src {
180 for _, v := range vv {
181 dst.Add(k, v)
182 }
183 }
184}