metropolis: implement Metrics Service
This is the first pass at a Metrics Service. It currently consists of an
HTTP reverse proxy which authenticates incoming connections using the
Cluster CA and certificates, and passes these connections over to a
locally running node_exporter.
In the future more exporters will be added, and we will likely also run
our own exporter for Metropolis-specific metrics.
Change-Id: Ibab52aa303965dd7d975f5035f411d1c56ad73e6
Reviewed-on: https://review.monogon.dev/c/monogon/+/1816
Tested-by: Jenkins CI
Reviewed-by: Leopold Schabel <leo@monogon.tech>
diff --git a/metropolis/node/core/metrics/BUILD.bazel b/metropolis/node/core/metrics/BUILD.bazel
new file mode 100644
index 0000000..da69068
--- /dev/null
+++ b/metropolis/node/core/metrics/BUILD.bazel
@@ -0,0 +1,34 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(
+ name = "metrics",
+ srcs = [
+ "exporters.go",
+ "metrics.go",
+ ],
+ importpath = "source.monogon.dev/metropolis/node/core/metrics",
+ visibility = ["//visibility:public"],
+ deps = [
+ "//metropolis/node",
+ "//metropolis/node/core/identity",
+ "//metropolis/pkg/logtree",
+ "//metropolis/pkg/supervisor",
+ ],
+)
+
+go_test(
+ name = "metrics_test",
+ srcs = ["metrics_test.go"],
+ data = [
+ # keep
+ "//metropolis/node/core/metrics/fake_exporter",
+ ],
+ embed = [":metrics"],
+ deps = [
+ "//metropolis/cli/pkg/datafile",
+ "//metropolis/node",
+ "//metropolis/node/core/rpc",
+ "//metropolis/pkg/supervisor",
+ "//metropolis/test/util",
+ ],
+)
diff --git a/metropolis/node/core/metrics/exporters.go b/metropolis/node/core/metrics/exporters.go
new file mode 100644
index 0000000..bef545c
--- /dev/null
+++ b/metropolis/node/core/metrics/exporters.go
@@ -0,0 +1,89 @@
+package metrics
+
+import (
+ "fmt"
+ "io"
+ "net"
+ "net/http"
+ "net/url"
+
+ "source.monogon.dev/metropolis/node"
+ "source.monogon.dev/metropolis/pkg/logtree"
+)
+
+// An Exporter is a Prometheus exporter binary running under the Metrics Service
+// which collects some metrics and exposes them on a locally bound TCP port.
+//
+// The Metrics Service forwards requests for /metrics/<name> to the /metrics
+// endpoint of the exporter.
+type Exporter struct {
+ // Name of the exporter; it becomes part of the metrics URL for this exporter.
+ Name string
+ // Port on 127.0.0.1 to which requests for this exporter are forwarded.
+ Port node.Port
+ // Executable to run to start the exporter.
+ Executable string
+ // Arguments to start the exporter. The exporter should listen at 127.0.0.1 and
+ // the port specified by Port, and serve its metrics on /metrics.
+ Arguments []string
+}
+
+// DefaultExporters are the exporters a Service runs when its Exporters are nil.
+var DefaultExporters = []Exporter{
+ {
+ Name: "node",
+ Port: node.MetricsNodeListenerPort,
+ Executable: "/metrics/bin/node_exporter",
+ Arguments: []string{
+ "--web.listen-address=127.0.0.1:" + node.MetricsNodeListenerPort.PortString(),
+ },
+ },
+}
+
+// forward proxies the incoming HTTP request r to this exporter's local
+// /metrics endpoint and streams the exporter's response back through w.
+func (e *Exporter) forward(logger logtree.LeveledLogger, w http.ResponseWriter, r *http.Request) {
+	outreq := r.Clone(r.Context())
+	outreq.URL = &url.URL{
+		Scheme: "http",
+		Host:   net.JoinHostPort("127.0.0.1", e.Port.PortString()),
+		Path:   "/metrics",
+	}
+	logger.V(1).Infof("%s: forwarding %s to %s", r.RemoteAddr, r.URL.String(), outreq.URL.String())
+
+	// For outgoing (client) requests a nil Body means that there is no body.
+	if r.ContentLength == 0 {
+		outreq.Body = nil
+	}
+	if outreq.Body != nil {
+		defer outreq.Body.Close()
+	}
+	res, err := http.DefaultTransport.RoundTrip(outreq)
+	if err != nil {
+		logger.Errorf("%s: forwarding to %q failed: %v", r.RemoteAddr, e.Name, err)
+		w.WriteHeader(http.StatusBadGateway)
+		fmt.Fprintf(w, "could not reach exporter")
+		return
+	}
+	// Close the exporter's response body on every return path, including a
+	// failed copy below, so that the underlying connection is not leaked.
+	defer res.Body.Close()
+
+	copyHeader(w.Header(), res.Header)
+	w.WriteHeader(res.StatusCode)
+	if _, err := io.Copy(w, res.Body); err != nil {
+		logger.Errorf("%s: copying response from %q failed: %v", r.RemoteAddr, e.Name, err)
+	}
+}
+
+func copyHeader(dst, src http.Header) {
+	for name, values := range src {
+		for i := range values {
+			dst.Add(name, values[i]) // Add appends, keeping what dst already holds.
+		}
+	}
+}
+
+func (e *Exporter) externalPath() string {
+	return fmt.Sprintf("/metrics/%s", e.Name) // Path this exporter is served on.
+}
diff --git a/metropolis/node/core/metrics/fake_exporter/BUILD.bazel b/metropolis/node/core/metrics/fake_exporter/BUILD.bazel
new file mode 100644
index 0000000..e606be2
--- /dev/null
+++ b/metropolis/node/core/metrics/fake_exporter/BUILD.bazel
@@ -0,0 +1,14 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library")
+
+go_library(
+ name = "fake_exporter_lib",
+ srcs = ["fake_exporter.go"],
+ importpath = "source.monogon.dev/metropolis/node/core/metrics/fake_exporter",
+ visibility = ["//visibility:private"],
+)
+
+go_binary(
+ name = "fake_exporter",
+ embed = [":fake_exporter_lib"],
+ visibility = ["//visibility:public"],
+)
diff --git a/metropolis/node/core/metrics/fake_exporter/fake_exporter.go b/metropolis/node/core/metrics/fake_exporter/fake_exporter.go
new file mode 100644
index 0000000..919851e
--- /dev/null
+++ b/metropolis/node/core/metrics/fake_exporter/fake_exporter.go
@@ -0,0 +1,28 @@
+// fake_exporter is a tiny Prometheus-compatible metrics exporter which exports a
+// single metric with a value configured at startup. It is used to test the
+// metrics service.
+package main
+
+import (
+ "flag"
+ "fmt"
+ "log"
+ "net/http"
+)
+
+var (
+ flagListen string // Address to listen on, set by -listen.
+ flagValue int // Value of the 'test' metric to serve, set by -value.
+)
+
+func main() {
+	flag.StringVar(&flagListen, "listen", ":8080", "address to listen on")
+	flag.IntVar(&flagValue, "value", 1234, "value of 'test' metric to serve")
+	flag.Parse()
+	http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
+		fmt.Fprintf(w, "test %d\n", flagValue)
+	})
+	log.Printf("Listening on %s", flagListen)
+	// ListenAndServe only returns on failure: report it and exit non-zero.
+	log.Fatalf("ListenAndServe: %v", http.ListenAndServe(flagListen, nil))
+}
diff --git a/metropolis/node/core/metrics/metrics.go b/metropolis/node/core/metrics/metrics.go
new file mode 100644
index 0000000..2f2da73
--- /dev/null
+++ b/metropolis/node/core/metrics/metrics.go
@@ -0,0 +1,143 @@
+package metrics
+
+import (
+ "context"
+ "crypto/tls"
+ "crypto/x509"
+ "fmt"
+ "net"
+ "net/http"
+ "os/exec"
+
+ "source.monogon.dev/metropolis/node"
+ "source.monogon.dev/metropolis/node/core/identity"
+ "source.monogon.dev/metropolis/pkg/supervisor"
+)
+
+// Service is the Metropolis Metrics Service.
+//
+// Currently, metrics means Prometheus metrics.
+//
+// It runs a forwarding proxy from a public HTTPS listener to a number of
+// locally-running exporters, themselves listening over HTTP. The listener uses
+// the main cluster CA and the node's main certificate, authenticating incoming
+// connections with the same CA.
+//
+// Each exporter is exposed on a separate path, /metrics/<name>, where <name> is
+// the name of the exporter.
+//
+// The HTTPS listener is bound to node.MetricsPort.
+type Service struct {
+ // Credentials used to run the TLS/HTTPS listener and verify incoming
+ // connections.
+ Credentials *identity.NodeCredentials
+ // List of Exporters to run and to forward HTTP requests to. If not set, defaults
+ // to DefaultExporters.
+ Exporters []Exporter
+
+ // enableDynamicAddr enables listening on a dynamically chosen TCP port. This is
+ // used by tests to make sure we don't fail due to the default port being already
+ // in use.
+ enableDynamicAddr bool
+ // dynamicAddr is sent the picked listen address by Run if enableDynamicAddr is
+ // set. On an unbuffered channel, Run blocks there until the address is received.
+ dynamicAddr chan string
+}
+
+// listen opens the public TLS listener of the service. It presents the node's
+// certificate and requires clients to present one that verifies against the
+// cluster CA.
+func (s *Service) listen() (net.Listener, error) {
+	nodeCert := s.Credentials.TLSCredentials()
+
+	clientCAs := x509.NewCertPool()
+	clientCAs.AddCert(s.Credentials.ClusterCA())
+
+	// Tests ask for an empty address so that a port gets picked dynamically.
+	addr := ""
+	if !s.enableDynamicAddr {
+		addr = net.JoinHostPort("", node.MetricsPort.PortString())
+	}
+
+	return tls.Listen("tcp", addr, &tls.Config{
+		Certificates: []tls.Certificate{nodeCert},
+		ClientAuth:   tls.RequireAndVerifyClientCert,
+		ClientCAs:    clientCAs,
+		// TODO(q3k): use VerifyPeerCertificate/VerifyConnection to check that the
+		// incoming client is allowed to access metrics. Currently we allow
+		// anyone/anything with a valid cluster certificate to access them.
+	})
+}
+
+// Run starts all exporters as sub-runnables and then serves the forwarding
+// proxy until it fails or ctx is canceled. It always returns a non-nil error.
+func (s *Service) Run(ctx context.Context) error {
+	lis, err := s.listen()
+	if err != nil {
+		return fmt.Errorf("listen failed: %w", err)
+	}
+	defer lis.Close() // Also releases the port on the early error returns below.
+	if s.enableDynamicAddr {
+		s.dynamicAddr <- lis.Addr().String()
+	}
+
+	if s.Exporters == nil {
+		s.Exporters = DefaultExporters
+	}
+
+	// First, make sure we don't have duplicate exporters.
+	seenNames := make(map[string]bool)
+	for _, exporter := range s.Exporters {
+		if seenNames[exporter.Name] {
+			return fmt.Errorf("duplicate exporter name: %q", exporter.Name)
+		}
+		seenNames[exporter.Name] = true
+	}
+
+	// Start all exporters as sub-runnables.
+	for _, exporter := range s.Exporters {
+		exporter := exporter
+		err := supervisor.Run(ctx, exporter.Name, func(ctx context.Context) error {
+			cmd := exec.CommandContext(ctx, exporter.Executable, exporter.Arguments...) // Fresh per run: a Cmd cannot be reused.
+			return supervisor.RunCommand(ctx, cmd)
+		})
+		if err != nil {
+			return fmt.Errorf("running %s failed: %w", exporter.Name, err)
+		}
+	}
+
+	// And register all exporter forwarding functions on a mux.
+	mux := http.NewServeMux()
+	logger := supervisor.Logger(ctx)
+	for _, exporter := range s.Exporters {
+		exporter := exporter
+		mux.HandleFunc(exporter.externalPath(), func(w http.ResponseWriter, r *http.Request) {
+			exporter.forward(logger, w, r)
+		})
+		logger.Infof("Registered exporter %q", exporter.Name)
+	}
+
+	supervisor.Signal(ctx, supervisor.SignalHealthy)
+
+	// Start forwarding server.
+	srv := http.Server{
+		Handler: mux,
+		BaseContext: func(_ net.Listener) context.Context {
+			return ctx
+		},
+	}
+
+	go func() {
+		<-ctx.Done()
+		srv.Close()
+	}()
+	err = srv.Serve(lis)
+	if err != nil && ctx.Err() != nil {
+		return ctx.Err()
+	}
+	return fmt.Errorf("Serve: %w", err)
+}
+
+type sdTarget struct { // NOTE(review): not referenced anywhere in the visible change.
+ Targets []string `json:"target"` // NOTE(review): if meant for Prometheus HTTP SD, its key is "targets" - confirm.
+}
diff --git a/metropolis/node/core/metrics/metrics_test.go b/metropolis/node/core/metrics/metrics_test.go
new file mode 100644
index 0000000..5452a90
--- /dev/null
+++ b/metropolis/node/core/metrics/metrics_test.go
@@ -0,0 +1,123 @@
+package metrics
+
+import (
+ "context"
+ "crypto/tls"
+ "crypto/x509"
+ "fmt"
+ "io"
+ "net/http"
+ "net/url"
+ "strings"
+ "testing"
+ "time"
+
+ "source.monogon.dev/metropolis/cli/pkg/datafile"
+ "source.monogon.dev/metropolis/node"
+ "source.monogon.dev/metropolis/node/core/rpc"
+ "source.monogon.dev/metropolis/pkg/supervisor"
+ "source.monogon.dev/metropolis/test/util"
+)
+
+// TestMetricsForwarder exercises the metrics forwarding functionality of the
+// metrics service. That is, it makes sure that the service starts some fake
+// exporters and then forwards HTTP traffic to them.
+func TestMetricsForwarder(t *testing.T) {
+	path, err := datafile.ResolveRunfile("metropolis/node/core/metrics/fake_exporter/fake_exporter_/fake_exporter")
+	if err != nil {
+		t.Fatalf("Could not resolve fake_exporter runfile: %v", err)
+	}
+
+	exporters := []Exporter{
+		{
+			Name:       "test1",
+			Port:       node.Port(8081),
+			Executable: path,
+			Arguments: []string{
+				"-listen", "127.0.0.1:8081",
+				"-value", "100",
+			},
+		},
+		{
+			Name:       "test2",
+			Port:       node.Port(8082),
+			Executable: path,
+			Arguments: []string{
+				"-listen", "127.0.0.1:8082",
+				"-value", "200",
+			},
+		},
+	}
+
+	eph := rpc.NewEphemeralClusterCredentials(t, 1)
+
+	svc := Service{
+		Credentials: eph.Nodes[0],
+		Exporters:   exporters,
+
+		enableDynamicAddr: true,
+		dynamicAddr:       make(chan string),
+	}
+
+	supervisor.TestHarness(t, svc.Run)
+	// Run reports the dynamically picked listen address once it is listening.
+	addr := <-svc.dynamicAddr
+
+	pool := x509.NewCertPool()
+	pool.AddCert(eph.CA)
+
+	// The client authenticates with the ephemeral manager certificate and
+	// verifies the node's certificate against the ephemeral CA.
+	cl := http.Client{
+		Transport: &http.Transport{
+			TLSClientConfig: &tls.Config{
+				ServerName: eph.Nodes[0].ID(),
+				RootCAs:    pool,
+
+				Certificates: []tls.Certificate{eph.Manager},
+			},
+		},
+	}
+
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+
+	// Every exporter must be reachable under its own path on the forwarder and
+	// serve the value it was started with.
+	for _, tc := range []struct {
+		name string
+		want string
+	}{
+		{name: "test1", want: "test 100"},
+		{name: "test2", want: "test 200"},
+	} {
+		tc := tc
+		util.TestEventual(t, "retrieve-"+tc.name, ctx, 10*time.Second, func(ctx context.Context) error {
+			target := (&url.URL{
+				Scheme: "https",
+				Host:   addr,
+				Path:   "/metrics/" + tc.name,
+			}).String()
+			req, err := http.NewRequestWithContext(ctx, "GET", target, nil)
+			if err != nil {
+				return util.Permanent(fmt.Errorf("NewRequest(%q): %w", target, err))
+			}
+			res, err := cl.Do(req)
+			if err != nil {
+				return fmt.Errorf("Get(%q): %v", target, err)
+			}
+			defer res.Body.Close()
+			if res.StatusCode != 200 {
+				return fmt.Errorf("Get(%q): code %d", target, res.StatusCode)
+			}
+			body, err := io.ReadAll(res.Body)
+			if err != nil {
+				return fmt.Errorf("Get(%q): reading body: %v", target, err)
+			}
+			if !strings.Contains(string(body), tc.want) {
+				return util.Permanent(fmt.Errorf("did not find expected value %q in %q", tc.want, string(body)))
+			}
+			return nil
+		})
+	}
+}
diff --git a/metropolis/node/core/roleserve/BUILD.bazel b/metropolis/node/core/roleserve/BUILD.bazel
index ab84395..3e68818 100644
--- a/metropolis/node/core/roleserve/BUILD.bazel
+++ b/metropolis/node/core/roleserve/BUILD.bazel
@@ -10,6 +10,7 @@
"worker_heartbeat.go",
"worker_hostsfile.go",
"worker_kubernetes.go",
+ "worker_metrics.go",
"worker_nodemgmt.go",
"worker_rolefetch.go",
"worker_statuspush.go",
@@ -24,6 +25,7 @@
"//metropolis/node/core/curator/proto/api",
"//metropolis/node/core/identity",
"//metropolis/node/core/localstorage",
+ "//metropolis/node/core/metrics",
"//metropolis/node/core/mgmt",
"//metropolis/node/core/network",
"//metropolis/node/core/network/hostsfile",
diff --git a/metropolis/node/core/roleserve/roleserve.go b/metropolis/node/core/roleserve/roleserve.go
index 68b9f59..f0fa273 100644
--- a/metropolis/node/core/roleserve/roleserve.go
+++ b/metropolis/node/core/roleserve/roleserve.go
@@ -93,6 +93,7 @@
nodeMgmt *workerNodeMgmt
clusternet *workerClusternet
hostsfile *workerHostsfile
+ metrics *workerMetrics
}
// New creates a Role Server services from a Config.
@@ -163,6 +164,10 @@
clusterDirectorySaved: &s.clusterDirectorySaved,
}
+ s.metrics = &workerMetrics{
+ curatorConnection: &s.CuratorConnection,
+ }
+
return s
}
@@ -232,6 +237,7 @@
supervisor.Run(ctx, "nodemgmt", s.nodeMgmt.run)
supervisor.Run(ctx, "clusternet", s.clusternet.run)
supervisor.Run(ctx, "hostsfile", s.hostsfile.run)
+ supervisor.Run(ctx, "metrics", s.metrics.run)
supervisor.Signal(ctx, supervisor.SignalHealthy)
<-ctx.Done()
diff --git a/metropolis/node/core/roleserve/worker_metrics.go b/metropolis/node/core/roleserve/worker_metrics.go
new file mode 100644
index 0000000..78c62d6
--- /dev/null
+++ b/metropolis/node/core/roleserve/worker_metrics.go
@@ -0,0 +1,34 @@
+package roleserve
+
+import (
+ "context"
+
+ "source.monogon.dev/metropolis/node/core/metrics"
+ "source.monogon.dev/metropolis/pkg/event/memory"
+ "source.monogon.dev/metropolis/pkg/supervisor"
+)
+
+// workerMetrics runs the Metrics Service, which runs local Prometheus collectors
+// (usually existing Prometheus Exporters running as sub-processes) and a
+// forwarding service that lets external users access them over HTTPS using the
+// Cluster CA. It starts once curatorConnection yields the credentials to use.
+type workerMetrics struct {
+ curatorConnection *memory.Value[*curatorConnection]
+}
+
+// run waits for a curator connection to become available and then runs the
+// Metrics Service with the credentials of that connection.
+func (s *workerMetrics) run(ctx context.Context) error {
+	watcher := s.curatorConnection.Watch()
+	defer watcher.Close()
+	logger := supervisor.Logger(ctx)
+
+	logger.Infof("Waiting for curator connection")
+	conn, err := watcher.Get(ctx)
+	if err != nil {
+		return err
+	}
+	logger.Infof("Got curator connection, starting...")
+
+	return (&metrics.Service{Credentials: conn.credentials}).Run(ctx)
+}
diff --git a/metropolis/node/ports.go b/metropolis/node/ports.go
index 440f127..50e9e9a 100644
--- a/metropolis/node/ports.go
+++ b/metropolis/node/ports.go
@@ -37,6 +37,14 @@
// NodeManagement is the TCP port on which the node-local management service
// serves gRPC traffic for NodeManagement.
NodeManagement Port = 7839
+ // MetricsPort is the TCP port on which the Metrics Service exports
+ // Prometheus-compatible metrics for this node, secured using TLS and the
+ // Cluster/Node certificates.
+ MetricsPort Port = 7840
+ // MetricsNodeListenerPort is the TCP port on which the Prometheus node_exporter
+ // runs, bound to 127.0.0.1. The Metrics Service proxies traffic to it from the
+ // public MetricsPort.
+ MetricsNodeListenerPort Port = 7841
// KubernetesAPIPort is the TCP port on which the Kubernetes API is
// exposed.
KubernetesAPIPort Port = 6443