blob: c098facd1eee1a3c3d4962dac4863bef8b09a82d [file] [log] [blame]
Tim Windelschmidt6d33a432025-02-04 14:34:25 +01001// Copyright The Monogon Project Authors.
2// SPDX-License-Identifier: Apache-2.0
3
Tim Windelschmidtf64f1972023-07-28 00:00:50 +00004// Package metricsproxy implements an authenticating proxy in front of the K8s
5// controller-manager and scheduler providing unauthenticated access to the
6// metrics via local ports
7package metricsproxy
8
9import (
10 "context"
11 "crypto/tls"
12 "crypto/x509"
13 "fmt"
14 "io"
15 "net"
16 "net/http"
17
18 "k8s.io/kubernetes/cmd/kubeadm/app/constants"
19
Jan Schär0f8ce4c2025-09-04 13:27:50 +020020 "source.monogon.dev/metropolis/node/allocs"
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000021 "source.monogon.dev/metropolis/node/kubernetes/pki"
Tim Windelschmidt9f21f532024-05-07 15:14:20 +020022 "source.monogon.dev/osbase/supervisor"
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000023)
24
25type Service struct {
26 // KPKI is a reference to the Kubernetes PKI
27 KPKI *pki.PKI
28}
29
30type kubernetesExporter struct {
31 Name string
32 // TargetPort on which this exporter is running.
Jan Schär0f8ce4c2025-09-04 13:27:50 +020033 TargetPort allocs.Port
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000034 // TargetPort on which the unauthenticated exporter should run.
Jan Schär0f8ce4c2025-09-04 13:27:50 +020035 ListenPort allocs.Port
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000036 // ServerName used to verify the tls connection.
37 ServerName string
38}
39
40// services are the kubernetes services which are exposed via this proxy.
41var services = []*kubernetesExporter{
42 {
43 Name: "kubernetes-scheduler",
44 TargetPort: constants.KubeSchedulerPort,
Jan Schär0f8ce4c2025-09-04 13:27:50 +020045 ListenPort: allocs.PortMetricsKubeSchedulerListener,
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000046 ServerName: "kube-scheduler.local",
47 },
48 {
49 Name: "kubernetes-controller-manager",
50 TargetPort: constants.KubeControllerManagerPort,
Jan Schär0f8ce4c2025-09-04 13:27:50 +020051 ListenPort: allocs.PortMetricsKubeControllerManagerListener,
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000052 ServerName: "kube-controller-manager.local",
53 },
Lorenz Brun4b42c8a2023-11-19 07:02:51 +010054 {
55 Name: "kubernetes-apiserver",
Jan Schär0f8ce4c2025-09-04 13:27:50 +020056 TargetPort: allocs.PortKubernetesAPI,
57 ListenPort: allocs.PortMetricsKubeAPIServerListener,
Lorenz Brun4b42c8a2023-11-19 07:02:51 +010058 ServerName: "kubernetes",
59 },
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000060}
61
62type metricsService struct {
63 *kubernetesExporter
64 transport *http.Transport
65}
66
67func (s *metricsService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
68 if r.Method != http.MethodGet {
69 http.Error(w, fmt.Sprintf("method %q not allowed", r.Method), http.StatusMethodNotAllowed)
70 return
71 }
72
73 ctx := r.Context()
74
75 // We are supplying the http.Server with a BaseContext that contains the
76 // context from our runnable which contains the logger
77 logger := supervisor.Logger(ctx)
78
79 url := "https://127.0.0.1:" + s.TargetPort.PortString() + "/metrics"
80 outReq, err := http.NewRequestWithContext(ctx, "GET", url, nil)
81 if err != nil {
82 logger.Errorf("%s: forwarding to %q failed: %v", r.RemoteAddr, s.Name, err)
83 http.Error(w, "internal server error", http.StatusInternalServerError)
84 return
85 }
86
87 res, err := s.transport.RoundTrip(outReq)
88 if err != nil {
89 logger.Errorf("%s: forwarding to %q failed: %v", r.RemoteAddr, s.Name, err)
Tim Windelschmidt22a71c12024-04-03 04:06:08 +020090 http.Error(w, "could not reach endpoint", http.StatusBadGateway)
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000091 return
92 }
93 defer res.Body.Close()
94
95 copyHeader(w.Header(), res.Header)
96 w.WriteHeader(res.StatusCode)
97
98 if _, err := io.Copy(w, res.Body); err != nil {
99 logger.Errorf("%s: copying response from %q failed: %v", r.RemoteAddr, s.Name, err)
100 return
101 }
102}
103
104func (s *metricsService) Run(ctx context.Context) error {
105 supervisor.Signal(ctx, supervisor.SignalHealthy)
106
107 srv := http.Server{
108 BaseContext: func(_ net.Listener) context.Context {
109 return ctx
110 },
111 Addr: net.JoinHostPort("127.0.0.1", s.ListenPort.PortString()),
112 Handler: s,
113 }
114
115 go func() {
116 <-ctx.Done()
117 srv.Close()
118 }()
119
120 err := srv.ListenAndServe()
121 if err != nil && ctx.Err() != nil {
122 return ctx.Err()
123 }
124 return fmt.Errorf("ListenAndServe: %w", err)
125}
126
127func (s *Service) Run(ctx context.Context) error {
128 // TODO(q3k): move this to IssueCertificates and replace with dedicated certificate
129 cert, key, err := s.KPKI.Certificate(ctx, pki.Master)
130 if err != nil {
131 return fmt.Errorf("could not load certificate %q from PKI: %w", pki.Master, err)
132 }
133 parsedKey, err := x509.ParsePKCS8PrivateKey(key)
134 if err != nil {
135 return fmt.Errorf("failed to parse key for cert %q: %w", pki.Master, err)
136 }
137
138 caCert, _, err := s.KPKI.Certificate(ctx, pki.IdCA)
139 if err != nil {
140 return fmt.Errorf("could not load certificate %q from PKI: %w", pki.IdCA, err)
141 }
142 parsedCACert, err := x509.ParseCertificate(caCert)
143 if err != nil {
144 return fmt.Errorf("failed to parse cert %q: %w", pki.IdCA, err)
145 }
146
147 rootCAs := x509.NewCertPool()
148 rootCAs.AddCert(parsedCACert)
149
150 tlsConfig := &tls.Config{
151 RootCAs: rootCAs,
152 Certificates: []tls.Certificate{{
153 Certificate: [][]byte{cert},
154 PrivateKey: parsedKey,
155 }},
156 }
157
158 for _, svc := range services {
159 tlsConfig := tlsConfig.Clone()
160 tlsConfig.ServerName = svc.ServerName
161
162 transport := http.DefaultTransport.(*http.Transport).Clone()
163 transport.TLSClientConfig = tlsConfig
164 transport.TLSClientConfig.ServerName = svc.ServerName
165
166 err := supervisor.Run(ctx, svc.Name, (&metricsService{
167 kubernetesExporter: svc,
168 transport: transport,
169 }).Run)
170 if err != nil {
171 return fmt.Errorf("could not run sub-service %q: %w", svc.Name, err)
172 }
173 }
174
175 supervisor.Signal(ctx, supervisor.SignalHealthy)
176
177 <-ctx.Done()
178 return ctx.Err()
179}
180
181func copyHeader(dst, src http.Header) {
182 for k, vv := range src {
183 for _, v := range vv {
184 dst.Add(k, v)
185 }
186 }
187}