metropolis/node/core/metrics: export (controller-manager|scheduler) metrics
Change-Id: Ie61551655cbf1130bb5f5beb2923dac1aa52f868
Reviewed-on: https://review.monogon.dev/c/monogon/+/1952
Tested-by: Jenkins CI
Reviewed-by: Serge Bazanski <serge@monogon.tech>
diff --git a/metropolis/node/core/metrics/BUILD.bazel b/metropolis/node/core/metrics/BUILD.bazel
index 9233ee2..f698f0e 100644
--- a/metropolis/node/core/metrics/BUILD.bazel
+++ b/metropolis/node/core/metrics/BUILD.bazel
@@ -16,6 +16,7 @@
"//metropolis/pkg/logtree",
"//metropolis/pkg/supervisor",
"//metropolis/proto/common",
+ "@io_k8s_kubernetes//cmd/kubeadm/app/constants",
],
)
diff --git a/metropolis/node/core/metrics/exporters.go b/metropolis/node/core/metrics/exporters.go
index 2cbe18c..5949eb4 100644
--- a/metropolis/node/core/metrics/exporters.go
+++ b/metropolis/node/core/metrics/exporters.go
@@ -1,12 +1,15 @@
package metrics
import (
+ "crypto/tls"
"fmt"
"io"
"net"
"net/http"
"net/url"
+ "k8s.io/kubernetes/cmd/kubeadm/app/constants"
+
"source.monogon.dev/metropolis/node"
"source.monogon.dev/metropolis/pkg/logtree"
)
@@ -21,6 +24,10 @@
Name string
// Port on which this exporter will be running.
Port node.Port
+ // ServerName used to verify the tls connection.
+ ServerName string
+ // TLSConfigFunc is used to configure tls authentication
+ TLSConfigFunc func(*Service, *Exporter) *tls.Config
// Executable to run to start the exporter.
Executable string
// Arguments to start the exporter. The exporter should listen at 127.0.0.1 and
@@ -46,10 +53,28 @@
Name: "etcd",
Port: node.MetricsEtcdListenerPort,
},
+ {
+ Name: "kubernetes-scheduler",
+ Port: constants.KubeSchedulerPort,
+ ServerName: "kube-scheduler.local",
+ TLSConfigFunc: (*Service).kubeTLSConfig,
+ },
+ {
+ Name: "kubernetes-controller-manager",
+ Port: constants.KubeControllerManagerPort,
+ ServerName: "kube-controller-manager.local",
+ TLSConfigFunc: (*Service).kubeTLSConfig,
+ },
+}
+
+func (s *Service) kubeTLSConfig(e *Exporter) *tls.Config {
+ c := s.KubeTLSConfig.Clone()
+ c.ServerName = e.ServerName
+ return c
}
// forward a given HTTP request to this exporter.
-func (e *Exporter) forward(logger logtree.LeveledLogger, w http.ResponseWriter, r *http.Request) {
+func (e *Exporter) forward(s *Service, logger logtree.LeveledLogger, w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
outreq := r.Clone(ctx)
@@ -58,6 +83,12 @@
Host: net.JoinHostPort("127.0.0.1", e.Port.PortString()),
Path: "/metrics",
}
+
+ transport := http.DefaultTransport.(*http.Transport).Clone()
+ if e.TLSConfigFunc != nil {
+ outreq.URL.Scheme = "https"
+ transport.TLSClientConfig = e.TLSConfigFunc(s, e)
+ }
logger.V(1).Infof("%s: forwarding %s to %s", r.RemoteAddr, r.URL.String(), outreq.URL.String())
if r.ContentLength == 0 {
@@ -66,7 +97,7 @@
if outreq.Body != nil {
defer outreq.Body.Close()
}
- res, err := http.DefaultTransport.RoundTrip(outreq)
+ res, err := transport.RoundTrip(outreq)
if err != nil {
logger.Errorf("%s: forwarding to %q failed: %v", r.RemoteAddr, e.Name, err)
w.WriteHeader(502)
diff --git a/metropolis/node/core/metrics/metrics.go b/metropolis/node/core/metrics/metrics.go
index 7126459..e087ada 100644
--- a/metropolis/node/core/metrics/metrics.go
+++ b/metropolis/node/core/metrics/metrics.go
@@ -44,6 +44,10 @@
// LocalRoles contains the local node roles which gets listened on and
// is required to decide whether or not to start the discovery routine
LocalRoles *memory.Value[*cpb.NodeRoles]
+ // KubeTLSConfig provides the tls.Config for authenticating against kubernetes
+ // services.
+ KubeTLSConfig *tls.Config
+
// List of Exporters to run and to forward HTTP requests to. If not set, defaults
// to DefaultExporters.
Exporters []Exporter
@@ -130,7 +134,7 @@
exporter := exporter
mux.HandleFunc(exporter.externalPath(), func(w http.ResponseWriter, r *http.Request) {
- exporter.forward(logger, w, r)
+ exporter.forward(s, logger, w, r)
})
logger.Infof("Registered exporter %q", exporter.Name)
diff --git a/metropolis/node/core/roleserve/roleserve.go b/metropolis/node/core/roleserve/roleserve.go
index 02bc510..0c486d1 100644
--- a/metropolis/node/core/roleserve/roleserve.go
+++ b/metropolis/node/core/roleserve/roleserve.go
@@ -168,6 +168,7 @@
s.metrics = &workerMetrics{
curatorConnection: &s.CuratorConnection,
localRoles: &s.localRoles,
+ localControlplane: &s.localControlPlane,
}
return s
diff --git a/metropolis/node/core/roleserve/worker_metrics.go b/metropolis/node/core/roleserve/worker_metrics.go
index a9add88..d0c89a4 100644
--- a/metropolis/node/core/roleserve/worker_metrics.go
+++ b/metropolis/node/core/roleserve/worker_metrics.go
@@ -2,10 +2,14 @@
import (
"context"
+ "crypto/tls"
+ "crypto/x509"
+ "fmt"
cpb "source.monogon.dev/metropolis/proto/common"
"source.monogon.dev/metropolis/node/core/metrics"
+ kpki "source.monogon.dev/metropolis/node/kubernetes/pki"
"source.monogon.dev/metropolis/pkg/event/memory"
"source.monogon.dev/metropolis/pkg/supervisor"
@@ -19,6 +23,7 @@
type workerMetrics struct {
curatorConnection *memory.Value[*curatorConnection]
localRoles *memory.Value[*cpb.NodeRoles]
+ localControlplane *memory.Value[*localControlPlane]
}
func (s *workerMetrics) run(ctx context.Context) error {
@@ -32,10 +37,53 @@
}
supervisor.Logger(ctx).Infof("Got curator connection, starting...")
+ lw := s.localControlplane.Watch()
+ defer lw.Close()
+ cp, err := lw.Get(ctx)
+ if err != nil {
+ return err
+ }
+
+ pki, err := kpki.FromLocalConsensus(ctx, cp.consensus)
+ if err != nil {
+ return err
+ }
+
+ // TODO(q3k): move this to IssueCertificates and replace with dedicated certificate
+ cert, key, err := pki.Certificate(ctx, kpki.Master)
+ if err != nil {
+ return fmt.Errorf("could not load certificate %q from PKI: %w", kpki.Master, err)
+ }
+ parsedKey, err := x509.ParsePKCS8PrivateKey(key)
+ if err != nil {
+ return fmt.Errorf("failed to parse key for cert %q: %w", kpki.Master, err)
+ }
+
+ caCert, _, err := pki.Certificate(ctx, kpki.IdCA)
+ if err != nil {
+ return fmt.Errorf("could not load certificate %q from PKI: %w", kpki.IdCA, err)
+ }
+ parsedCACert, err := x509.ParseCertificate(caCert)
+ if err != nil {
+ return fmt.Errorf("failed to parse cert %q: %w", kpki.IdCA, err)
+ }
+
+ rootCAs := x509.NewCertPool()
+ rootCAs.AddCert(parsedCACert)
+
+ kubeTLSConfig := &tls.Config{
+ RootCAs: rootCAs,
+ Certificates: []tls.Certificate{{
+ Certificate: [][]byte{cert},
+ PrivateKey: parsedKey,
+ }},
+ }
+
svc := metrics.Service{
- Credentials: cc.credentials,
- Curator: ipb.NewCuratorClient(cc.conn),
- LocalRoles: s.localRoles,
+ Credentials: cc.credentials,
+ Curator: ipb.NewCuratorClient(cc.conn),
+ LocalRoles: s.localRoles,
+ KubeTLSConfig: kubeTLSConfig,
}
return svc.Run(ctx)
}