metropolis: implement Metrics Service

This is the first pass at a Metrics Service. It currently consists of an
HTTPS reverse proxy which serves the node's certificate, authenticates
incoming connections against the Cluster CA, and forwards requests to a
locally running node_exporter.

In the future, more exporters will be added, and we will likely also run
our own exporter for Metropolis-specific metrics.
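
As a sketch of how the endpoint is meant to be consumed (not part of
this change; the helper below and how credentials are obtained are
assumptions), a client holding the cluster CA certificate and a
cluster-issued client certificate can scrape a node roughly like this:

  package metricsclient

  import (
      "crypto/tls"
      "crypto/x509"
      "fmt"
      "io"
      "net/http"
  )

  // scrapeNode fetches node_exporter metrics from a node's Metrics Service
  // over mutual TLS. nodeID is the node's ID (used as the expected TLS
  // server name), addr is "<node address>:7840" (node.MetricsPort).
  func scrapeNode(nodeID, addr string, ca *x509.Certificate, cert tls.Certificate) (string, error) {
      pool := x509.NewCertPool()
      pool.AddCert(ca)
      cl := http.Client{
          Transport: &http.Transport{
              TLSClientConfig: &tls.Config{
                  ServerName:   nodeID,
                  RootCAs:      pool,
                  Certificates: []tls.Certificate{cert},
              },
          },
      }
      res, err := cl.Get("https://" + addr + "/metrics/node")
      if err != nil {
          return "", err
      }
      defer res.Body.Close()
      if res.StatusCode != 200 {
          return "", fmt.Errorf("unexpected status %d", res.StatusCode)
      }
      body, err := io.ReadAll(res.Body)
      if err != nil {
          return "", err
      }
      return string(body), nil
  }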

Change-Id: Ibab52aa303965dd7d975f5035f411d1c56ad73e6
Reviewed-on: https://review.monogon.dev/c/monogon/+/1816
Tested-by: Jenkins CI
Reviewed-by: Leopold Schabel <leo@monogon.tech>
diff --git a/metropolis/node/core/metrics/BUILD.bazel b/metropolis/node/core/metrics/BUILD.bazel
new file mode 100644
index 0000000..da69068
--- /dev/null
+++ b/metropolis/node/core/metrics/BUILD.bazel
@@ -0,0 +1,34 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(
+    name = "metrics",
+    srcs = [
+        "exporters.go",
+        "metrics.go",
+    ],
+    importpath = "source.monogon.dev/metropolis/node/core/metrics",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//metropolis/node",
+        "//metropolis/node/core/identity",
+        "//metropolis/pkg/logtree",
+        "//metropolis/pkg/supervisor",
+    ],
+)
+
+go_test(
+    name = "metrics_test",
+    srcs = ["metrics_test.go"],
+    data = [
+        # keep
+        "//metropolis/node/core/metrics/fake_exporter",
+    ],
+    embed = [":metrics"],
+    deps = [
+        "//metropolis/cli/pkg/datafile",
+        "//metropolis/node",
+        "//metropolis/node/core/rpc",
+        "//metropolis/pkg/supervisor",
+        "//metropolis/test/util",
+    ],
+)
diff --git a/metropolis/node/core/metrics/exporters.go b/metropolis/node/core/metrics/exporters.go
new file mode 100644
index 0000000..bef545c
--- /dev/null
+++ b/metropolis/node/core/metrics/exporters.go
@@ -0,0 +1,89 @@
+package metrics
+
+import (
+	"fmt"
+	"io"
+	"net"
+	"net/http"
+	"net/url"
+
+	"source.monogon.dev/metropolis/node"
+	"source.monogon.dev/metropolis/pkg/logtree"
+)
+
+// An Exporter is a Prometheus exporter binary run under the Metrics Service
+// which collects metrics and exposes them on a locally bound TCP port.
+//
+// The Metrics Service will forward requests from /metrics/<name> to the
+// exporter.
+type Exporter struct {
+	// Name of the exporter, which becomes part of the metrics URL for this exporter.
+	Name string
+	// Port on which this exporter will be running.
+	Port node.Port
+	// Executable to run to start the exporter.
+	Executable string
+	// Arguments used to start the exporter. The exporter should listen on
+	// 127.0.0.1 at the port specified by Port, and serve its metrics on /metrics.
+	Arguments []string
+}
+
+// DefaultExporters are the exporters which we run by default in Metropolis.
+var DefaultExporters = []Exporter{
+	{
+		Name:       "node",
+		Port:       node.MetricsNodeListenerPort,
+		Executable: "/metrics/bin/node_exporter",
+		Arguments: []string{
+			"--web.listen-address=127.0.0.1:" + node.MetricsNodeListenerPort.PortString(),
+		},
+	},
+}
+
+// forward proxies the given HTTP request to this exporter.
+func (e *Exporter) forward(logger logtree.LeveledLogger, w http.ResponseWriter, r *http.Request) {
+	ctx := r.Context()
+	outreq := r.Clone(ctx)
+
+	outreq.URL = &url.URL{
+		Scheme: "http",
+		Host:   net.JoinHostPort("127.0.0.1", e.Port.PortString()),
+		Path:   "/metrics",
+	}
+	logger.V(1).Infof("%s: forwarding %s to %s", r.RemoteAddr, r.URL.String(), outreq.URL.String())
+
+	if r.ContentLength == 0 {
+		outreq.Body = nil
+	}
+	if outreq.Body != nil {
+		defer outreq.Body.Close()
+	}
+	res, err := http.DefaultTransport.RoundTrip(outreq)
+	if err != nil {
+		logger.Errorf("%s: forwarding to %q failed: %v", r.RemoteAddr, e.Name, err)
+		w.WriteHeader(502)
+		fmt.Fprintf(w, "could not reach exporter")
+		return
+	}
+	// Always close the upstream body, even if copying it to the client fails.
+	defer res.Body.Close()
+
+	copyHeader(w.Header(), res.Header)
+	w.WriteHeader(res.StatusCode)
+
+	if _, err := io.Copy(w, res.Body); err != nil {
+		logger.Errorf("%s: copying response from %q failed: %v", r.RemoteAddr, e.Name, err)
+	}
+}
+
+func copyHeader(dst, src http.Header) {
+	for k, vv := range src {
+		for _, v := range vv {
+			dst.Add(k, v)
+		}
+	}
+}
+
+func (e *Exporter) externalPath() string {
+	return "/metrics/" + e.Name
+}
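
As a hedged illustration of the Exporter API above (the binary, port and
flags below are invented for the example and are not part of this change),
plugging in an additional exporter amounts to appending another entry to the
Exporters list handed to the Service; it would then be served at
/metrics/demo:

  // Hypothetical wiring; assumes the metrics and node packages are imported
  // and creds (*identity.NodeCredentials) is available.
  extra := metrics.Exporter{
      Name:       "demo",
      Port:       node.Port(7850), // example value, not an allocated Metropolis port
      Executable: "/metrics/bin/demo_exporter",
      Arguments:  []string{"--web.listen-address=127.0.0.1:7850"},
  }
  svc := metrics.Service{
      Credentials: creds,
      Exporters:   append(metrics.DefaultExporters, extra),
  }
  // svc.Run is then started under the supervisor, as in worker_metrics.go below.
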
diff --git a/metropolis/node/core/metrics/fake_exporter/BUILD.bazel b/metropolis/node/core/metrics/fake_exporter/BUILD.bazel
new file mode 100644
index 0000000..e606be2
--- /dev/null
+++ b/metropolis/node/core/metrics/fake_exporter/BUILD.bazel
@@ -0,0 +1,14 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library")
+
+go_library(
+    name = "fake_exporter_lib",
+    srcs = ["fake_exporter.go"],
+    importpath = "source.monogon.dev/metropolis/node/core/metrics/fake_exporter",
+    visibility = ["//visibility:private"],
+)
+
+go_binary(
+    name = "fake_exporter",
+    embed = [":fake_exporter_lib"],
+    visibility = ["//visibility:public"],
+)
diff --git a/metropolis/node/core/metrics/fake_exporter/fake_exporter.go b/metropolis/node/core/metrics/fake_exporter/fake_exporter.go
new file mode 100644
index 0000000..919851e
--- /dev/null
+++ b/metropolis/node/core/metrics/fake_exporter/fake_exporter.go
@@ -0,0 +1,28 @@
+// fake_exporter is a tiny Prometheus-compatible metrics exporter which exports a
+// single metric with a value configured at startup. It is used to test the
+// metrics service.
+package main
+
+import (
+	"flag"
+	"fmt"
+	"log"
+	"net/http"
+)
+
+var (
+	flagListen string
+	flagValue  int
+)
+
+func main() {
+	flag.StringVar(&flagListen, "listen", ":8080", "address to listen on")
+	flag.IntVar(&flagValue, "value", 1234, "value of 'test' metric to serve")
+	flag.Parse()
+
+	http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
+		fmt.Fprintf(w, "test %d\n", flagValue)
+	})
+	log.Printf("Listening on %s", flagListen)
+	log.Fatal(http.ListenAndServe(flagListen, nil))
+}
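
With the default flags, a GET of /metrics on the fake exporter returns a
single sample in the Prometheus text exposition format:

  test 1234
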
diff --git a/metropolis/node/core/metrics/metrics.go b/metropolis/node/core/metrics/metrics.go
new file mode 100644
index 0000000..2f2da73
--- /dev/null
+++ b/metropolis/node/core/metrics/metrics.go
@@ -0,0 +1,143 @@
+package metrics
+
+import (
+	"context"
+	"crypto/tls"
+	"crypto/x509"
+	"fmt"
+	"net"
+	"net/http"
+	"os/exec"
+
+	"source.monogon.dev/metropolis/node"
+	"source.monogon.dev/metropolis/node/core/identity"
+	"source.monogon.dev/metropolis/pkg/supervisor"
+)
+
+// Service is the Metropolis Metrics Service.
+//
+// Currently, metrics means Prometheus metrics.
+//
+// It runs a forwarding proxy from a public HTTPS listener to a number of
+// locally-running exporters, themselves listening over HTTP. The listener uses
+// the main cluster CA and the node's main certificate, authenticating incoming
+// connections with the same CA.
+//
+// Each exporter is exposed on a separate path, /metrics/<name>, where <name> is
+// the name of the exporter.
+//
+// The HTTPS listener is bound to node.MetricsPort.
+type Service struct {
+	// Credentials used to run the TLS/HTTPS listener and verify incoming
+	// connections.
+	Credentials *identity.NodeCredentials
+	// List of Exporters to run and to forward HTTP requests to. If not set, defaults
+	// to DefaultExporters.
+	Exporters []Exporter
+
+	// enableDynamicAddr enables listening on a dynamically chosen TCP port. This is
+	// used by tests to make sure we don't fail due to the default port being already
+	// in use.
+	enableDynamicAddr bool
+	// dynamicAddr will contain the picked dynamic listen address after the service
+	// starts, if enableDynamicAddr is set.
+	dynamicAddr chan string
+}
+
+// listen starts the public TLS listener for the service.
+func (s *Service) listen() (net.Listener, error) {
+	cert := s.Credentials.TLSCredentials()
+
+	pool := x509.NewCertPool()
+	pool.AddCert(s.Credentials.ClusterCA())
+
+	tlsc := tls.Config{
+		Certificates: []tls.Certificate{
+			cert,
+		},
+		ClientAuth: tls.RequireAndVerifyClientCert,
+		ClientCAs:  pool,
+		// TODO(q3k): use VerifyPeerCertificate/VerifyConnection to check that the
+		// incoming client is allowed to access metrics. Currently we allow
+		// anyone/anything with a valid cluster certificate to access them.
+	}
+
+	addr := net.JoinHostPort("", node.MetricsPort.PortString())
+	if s.enableDynamicAddr {
+		addr = ""
+	}
+	return tls.Listen("tcp", addr, &tlsc)
+}
+
+func (s *Service) Run(ctx context.Context) error {
+	lis, err := s.listen()
+	if err != nil {
+		return fmt.Errorf("listen failed: %w", err)
+	}
+	if s.enableDynamicAddr {
+		s.dynamicAddr <- lis.Addr().String()
+	}
+
+	if s.Exporters == nil {
+		s.Exporters = DefaultExporters
+	}
+
+	// First, make sure we don't have duplicate exporters.
+	seenNames := make(map[string]bool)
+	for _, exporter := range s.Exporters {
+		if seenNames[exporter.Name] {
+			return fmt.Errorf("duplicate exporter name: %q", exporter.Name)
+		}
+		seenNames[exporter.Name] = true
+	}
+
+	// Start all exporters as sub-runnables.
+	for _, exporter := range s.Exporters {
+		cmd := exec.CommandContext(ctx, exporter.Executable, exporter.Arguments...)
+		err := supervisor.Run(ctx, exporter.Name, func(ctx context.Context) error {
+			return supervisor.RunCommand(ctx, cmd)
+		})
+		if err != nil {
+			return fmt.Errorf("running %s failed: %w", exporter.Name, err)
+		}
+
+	}
+
+	// And register all exporter forwarding functions on a mux.
+	mux := http.NewServeMux()
+	logger := supervisor.Logger(ctx)
+	for _, exporter := range s.Exporters {
+		exporter := exporter
+
+		mux.HandleFunc(exporter.externalPath(), func(w http.ResponseWriter, r *http.Request) {
+			exporter.forward(logger, w, r)
+		})
+
+		logger.Infof("Registered exporter %q", exporter.Name)
+	}
+
+	supervisor.Signal(ctx, supervisor.SignalHealthy)
+
+	// Start forwarding server.
+	srv := http.Server{
+		Handler: mux,
+		BaseContext: func(_ net.Listener) context.Context {
+			return ctx
+		},
+	}
+
+	go func() {
+		<-ctx.Done()
+		srv.Close()
+	}()
+
+	err = srv.Serve(lis)
+	if err != nil && ctx.Err() != nil {
+		return ctx.Err()
+	}
+	return fmt.Errorf("Serve: %w", err)
+}
+
+type sdTarget struct {
+	Targets []string `json:"targets"`
+}
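
The sdTarget type is not referenced yet in this change; its shape matches one
entry of a Prometheus HTTP service discovery response. A later change could
serve such a response roughly as follows (a sketch only; the handler path and
the way the node's own address is obtained are assumptions, and encoding/json
would additionally need to be imported):

  mux.HandleFunc("/discovery", func(w http.ResponseWriter, r *http.Request) {
      targets := []sdTarget{
          {Targets: []string{net.JoinHostPort(nodeAddress, node.MetricsPort.PortString())}},
      }
      w.Header().Set("Content-Type", "application/json")
      if err := json.NewEncoder(w).Encode(targets); err != nil {
          logger.Errorf("writing service discovery response failed: %v", err)
      }
  })
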
diff --git a/metropolis/node/core/metrics/metrics_test.go b/metropolis/node/core/metrics/metrics_test.go
new file mode 100644
index 0000000..5452a90
--- /dev/null
+++ b/metropolis/node/core/metrics/metrics_test.go
@@ -0,0 +1,123 @@
+package metrics
+
+import (
+	"context"
+	"crypto/tls"
+	"crypto/x509"
+	"fmt"
+	"io"
+	"net/http"
+	"net/url"
+	"strings"
+	"testing"
+	"time"
+
+	"source.monogon.dev/metropolis/cli/pkg/datafile"
+	"source.monogon.dev/metropolis/node"
+	"source.monogon.dev/metropolis/node/core/rpc"
+	"source.monogon.dev/metropolis/pkg/supervisor"
+	"source.monogon.dev/metropolis/test/util"
+)
+
+// TestMetricsForwarder exercises the metrics forwarding functionality of the
+// metrics service. That is, it makes sure that the service starts some fake
+// exporters and then forwards HTTP traffic to them.
+func TestMetricsForwarder(t *testing.T) {
+	path, _ := datafile.ResolveRunfile("metropolis/node/core/metrics/fake_exporter/fake_exporter_/fake_exporter")
+
+	exporters := []Exporter{
+		{
+			Name:       "test1",
+			Port:       node.Port(8081),
+			Executable: path,
+			Arguments: []string{
+				"-listen", "127.0.0.1:8081",
+				"-value", "100",
+			},
+		},
+		{
+			Name:       "test2",
+			Port:       node.Port(8082),
+			Executable: path,
+			Arguments: []string{
+				"-listen", "127.0.0.1:8082",
+				"-value", "200",
+			},
+		},
+	}
+
+	eph := rpc.NewEphemeralClusterCredentials(t, 1)
+
+	svc := Service{
+		Credentials: eph.Nodes[0],
+		Exporters:   exporters,
+
+		enableDynamicAddr: true,
+		dynamicAddr:       make(chan string),
+	}
+
+	supervisor.TestHarness(t, svc.Run)
+	addr := <-svc.dynamicAddr
+
+	pool := x509.NewCertPool()
+	pool.AddCert(eph.CA)
+
+	cl := http.Client{
+		Transport: &http.Transport{
+			TLSClientConfig: &tls.Config{
+				ServerName: eph.Nodes[0].ID(),
+				RootCAs:    pool,
+
+				Certificates: []tls.Certificate{eph.Manager},
+			},
+		},
+	}
+
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+
+	util.TestEventual(t, "retrieve-test1", ctx, 10*time.Second, func(ctx context.Context) error {
+		url := (&url.URL{
+			Scheme: "https",
+			Host:   addr,
+			Path:   "/metrics/test1",
+		}).String()
+		req, _ := http.NewRequest("GET", url, nil)
+		res, err := cl.Do(req)
+		if err != nil {
+			return fmt.Errorf("Get(%q): %v", url, err)
+		}
+		defer res.Body.Close()
+		if res.StatusCode != 200 {
+			return fmt.Errorf("Get(%q): code %d", url, res.StatusCode)
+		}
+		body, _ := io.ReadAll(res.Body)
+		want := "test 100"
+		if !strings.Contains(string(body), want) {
+			return util.Permanent(fmt.Errorf("did not find expected value %q in %q", want, string(body)))
+		}
+		return nil
+	})
+	util.TestEventual(t, "retrieve-test2", ctx, 10*time.Second, func(ctx context.Context) error {
+		url := (&url.URL{
+			Scheme: "https",
+			Host:   addr,
+			Path:   "/metrics/test2",
+		}).String()
+		req, _ := http.NewRequest("GET", url, nil)
+		res, err := cl.Do(req)
+		if err != nil {
+			return fmt.Errorf("Get(%q): %v", url, err)
+		}
+		defer res.Body.Close()
+		if res.StatusCode != 200 {
+			return fmt.Errorf("Get(%q): code %d", url, res.StatusCode)
+		}
+		body, _ := io.ReadAll(res.Body)
+		want := "test 200"
+		if !strings.Contains(string(body), want) {
+			return util.Permanent(fmt.Errorf("did not find expected value %q in %q", want, string(body)))
+		}
+		return nil
+	})
+}
diff --git a/metropolis/node/core/roleserve/BUILD.bazel b/metropolis/node/core/roleserve/BUILD.bazel
index ab84395..3e68818 100644
--- a/metropolis/node/core/roleserve/BUILD.bazel
+++ b/metropolis/node/core/roleserve/BUILD.bazel
@@ -10,6 +10,7 @@
         "worker_heartbeat.go",
         "worker_hostsfile.go",
         "worker_kubernetes.go",
+        "worker_metrics.go",
         "worker_nodemgmt.go",
         "worker_rolefetch.go",
         "worker_statuspush.go",
@@ -24,6 +25,7 @@
         "//metropolis/node/core/curator/proto/api",
         "//metropolis/node/core/identity",
         "//metropolis/node/core/localstorage",
+        "//metropolis/node/core/metrics",
         "//metropolis/node/core/mgmt",
         "//metropolis/node/core/network",
         "//metropolis/node/core/network/hostsfile",
diff --git a/metropolis/node/core/roleserve/roleserve.go b/metropolis/node/core/roleserve/roleserve.go
index 68b9f59..f0fa273 100644
--- a/metropolis/node/core/roleserve/roleserve.go
+++ b/metropolis/node/core/roleserve/roleserve.go
@@ -93,6 +93,7 @@
 	nodeMgmt     *workerNodeMgmt
 	clusternet   *workerClusternet
 	hostsfile    *workerHostsfile
+	metrics      *workerMetrics
 }
 
 // New creates a Role Server services from a Config.
@@ -163,6 +164,10 @@
 		clusterDirectorySaved: &s.clusterDirectorySaved,
 	}
 
+	s.metrics = &workerMetrics{
+		curatorConnection: &s.CuratorConnection,
+	}
+
 	return s
 }
 
@@ -232,6 +237,7 @@
 	supervisor.Run(ctx, "nodemgmt", s.nodeMgmt.run)
 	supervisor.Run(ctx, "clusternet", s.clusternet.run)
 	supervisor.Run(ctx, "hostsfile", s.hostsfile.run)
+	supervisor.Run(ctx, "metrics", s.metrics.run)
 	supervisor.Signal(ctx, supervisor.SignalHealthy)
 
 	<-ctx.Done()
diff --git a/metropolis/node/core/roleserve/worker_metrics.go b/metropolis/node/core/roleserve/worker_metrics.go
new file mode 100644
index 0000000..78c62d6
--- /dev/null
+++ b/metropolis/node/core/roleserve/worker_metrics.go
@@ -0,0 +1,34 @@
+package roleserve
+
+import (
+	"context"
+
+	"source.monogon.dev/metropolis/node/core/metrics"
+	"source.monogon.dev/metropolis/pkg/event/memory"
+	"source.monogon.dev/metropolis/pkg/supervisor"
+)
+
+// workerMetrics runs the Metrics Service, which runs local Prometheus collectors
+// (themselves usually instances of existing Prometheus Exporters running as
+// sub-processes), and a forwarding service that lets external users access them
+// over HTTPS using the Cluster CA.
+type workerMetrics struct {
+	curatorConnection *memory.Value[*curatorConnection]
+}
+
+func (s *workerMetrics) run(ctx context.Context) error {
+	w := s.curatorConnection.Watch()
+	defer w.Close()
+
+	supervisor.Logger(ctx).Infof("Waiting for curator connection")
+	cc, err := w.Get(ctx)
+	if err != nil {
+		return err
+	}
+	supervisor.Logger(ctx).Infof("Got curator connection, starting...")
+
+	svc := metrics.Service{
+		Credentials: cc.credentials,
+	}
+	return svc.Run(ctx)
+}
diff --git a/metropolis/node/ports.go b/metropolis/node/ports.go
index 440f127..50e9e9a 100644
--- a/metropolis/node/ports.go
+++ b/metropolis/node/ports.go
@@ -37,6 +37,14 @@
 	// NodeManagement is the TCP port on which the node-local management service
 	// serves gRPC traffic for NodeManagement.
 	NodeManagement Port = 7839
+	// MetricsPort is the TCP port on which the Metrics Service exports
+	// Prometheus-compatible metrics for this node, secured using TLS and the
+	// Cluster/Node certificates.
+	MetricsPort Port = 7840
+	// MetricsNodeListenerPort is the TCP port on which the Prometheus node_exporter
+	// runs, bound to 127.0.0.1. The Metrics Service proxies traffic to it from the
+	// public MetricsPort.
+	MetricsNodeListenerPort Port = 7841
 	// KubernetesAPIPort is the TCP port on which the Kubernetes API is
 	// exposed.
 	KubernetesAPIPort Port = 6443
diff --git a/metropolis/test/e2e/main_test.go b/metropolis/test/e2e/main_test.go
index 4a54d2f..1e51332 100644
--- a/metropolis/test/e2e/main_test.go
+++ b/metropolis/test/e2e/main_test.go
@@ -18,6 +18,8 @@
 
 import (
 	"context"
+	"crypto/tls"
+	"crypto/x509"
 	"errors"
 	"fmt"
 	"io"
@@ -25,6 +27,7 @@
 	"net/http"
 	_ "net/http"
 	_ "net/http/pprof"
+	"net/url"
 	"os"
 	"strings"
 	"testing"
@@ -357,6 +360,44 @@
 				}
 				return fmt.Errorf("job still running")
 			})
+			util.TestEventual(t, "Prometheus node metrics retrieved", ctx, smallTestTimeout, func(ctx context.Context) error {
+				pool := x509.NewCertPool()
+				pool.AddCert(cluster.CACertificate)
+				cl := http.Client{
+					Transport: &http.Transport{
+						TLSClientConfig: &tls.Config{
+							Certificates: []tls.Certificate{cluster.Owner},
+							RootCAs:      pool,
+						},
+						DialContext: func(ctx context.Context, _, addr string) (net.Conn, error) {
+							return cluster.DialNode(ctx, addr)
+						},
+					},
+				}
+				u := url.URL{
+					Scheme: "https",
+					Host:   net.JoinHostPort(cluster.NodeIDs[0], common.MetricsPort.PortString()),
+					Path:   "/metrics/node",
+				}
+				res, err := cl.Get(u.String())
+				if err != nil {
+					return err
+				}
+				defer res.Body.Close()
+				if res.StatusCode != 200 {
+					return fmt.Errorf("status code %d", res.StatusCode)
+				}
+
+				body, err := io.ReadAll(res.Body)
+				if err != nil {
+					return err
+				}
+				needle := "node_uname_info"
+				if !strings.Contains(string(body), needle) {
+					return util.Permanent(fmt.Errorf("could not find %q in returned response", needle))
+				}
+				return nil
+			})
 			if os.Getenv("HAVE_NESTED_KVM") != "" {
 				util.TestEventual(t, "Pod for KVM/QEMU smoke test", ctx, smallTestTimeout, func(ctx context.Context) error {
 					runcRuntimeClass := "runc"
diff --git a/metropolis/test/launch/cluster/cluster.go b/metropolis/test/launch/cluster/cluster.go
index 0efd08b..615a9cc 100644
--- a/metropolis/test/launch/cluster/cluster.go
+++ b/metropolis/test/launch/cluster/cluster.go
@@ -10,6 +10,7 @@
 	"crypto/ed25519"
 	"crypto/rand"
 	"crypto/tls"
+	"crypto/x509"
 	"errors"
 	"fmt"
 	"io"
@@ -504,6 +505,9 @@
 	// creation.
 	NodeIDs []string
 
+	// CACertificate is the cluster's CA certificate.
+	CACertificate *x509.Certificate
+
 	// nodesDone is a list of channels populated with the return codes from all the
 	// nodes' qemu instances. It's used by Close to ensure all nodes have
 	// successfully been stopped.
@@ -829,6 +833,12 @@
 		ctxC()
 		return nil, fmt.Errorf("GetClusterInfo: %w", err)
 	}
+	caCert, err := x509.ParseCertificate(resI.CaCertificate)
+	if err != nil {
+		ctxC()
+		return nil, fmt.Errorf("ParseCertificate: %w", err)
+	}
+	cluster.CACertificate = caCert
 
 	// Use the retrieved information to configure the rest of the node options.
 	for i := 1; i < opts.NumNodes; i++ {