metropolis/node/core/metrics: fixup metrics authentication

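Instead of teaching the node metrics service about the Kubernetes PKI,
the kube-scheduler and kube-controller-manager metrics are now scraped
through a new authenticating proxy, metropolis/node/kubernetes/metricsproxy,
which runs under the Kubernetes controller service. The proxy exposes
the metrics unauthenticated on the local ports 7843 and 7844, and the
metrics service forwards to them like any other exporter.

The http_sd discovery endpoint is split into its own Discovery runnable
with its own Curator client. The roleserver starts it only on consensus
members, and /discovery now returns 503 Service Unavailable instead of
501 Not Implemented while discovery is not running.
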
Change-Id: I67643855ab61bfdea980211ffe01e50c2409882b
Reviewed-on: https://review.monogon.dev/c/monogon/+/1979
Reviewed-by: Serge Bazanski <serge@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/node/core/metrics/BUILD.bazel b/metropolis/node/core/metrics/BUILD.bazel
index f698f0e..88cdc1b 100644
--- a/metropolis/node/core/metrics/BUILD.bazel
+++ b/metropolis/node/core/metrics/BUILD.bazel
@@ -3,6 +3,7 @@
 go_library(
     name = "metrics",
     srcs = [
+        "discovery.go",
         "exporters.go",
         "metrics.go",
     ],
@@ -12,11 +13,7 @@
         "//metropolis/node",
         "//metropolis/node/core/curator/proto/api",
         "//metropolis/node/core/identity",
-        "//metropolis/pkg/event/memory",
-        "//metropolis/pkg/logtree",
         "//metropolis/pkg/supervisor",
-        "//metropolis/proto/common",
-        "@io_k8s_kubernetes//cmd/kubeadm/app/constants",
     ],
 )
 
@@ -32,9 +29,8 @@
         "//metropolis/cli/pkg/datafile",
         "//metropolis/node",
         "//metropolis/node/core/curator/proto/api",
-        "//metropolis/pkg/event/memory",
+        "//metropolis/pkg/freeport",
         "//metropolis/pkg/supervisor",
-        "//metropolis/proto/common",
         "//metropolis/test/util",
         "@com_zx2c4_golang_wireguard_wgctrl//wgtypes",
     ],
diff --git a/metropolis/node/core/metrics/discovery.go b/metropolis/node/core/metrics/discovery.go
new file mode 100644
index 0000000..037d3b0
--- /dev/null
+++ b/metropolis/node/core/metrics/discovery.go
@@ -0,0 +1,129 @@
+package metrics
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"net/http"
+	"sync"
+
+	apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+
+	"source.monogon.dev/metropolis/pkg/supervisor"
+)
+
+// Discovery serves a Prometheus http_sd-compatible discovery endpoint built
+// from the cluster's node list as reported by the Curator.
+type Discovery struct {
+	Curator ipb.CuratorClient
+
+	// sdResp contains the cached sdResponse.
+	sdResp sdResponse
+	// sdRespMtx guards sdResp for use from the HTTP handler.
+	sdRespMtx sync.RWMutex
+}
+
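+// sdResponse is the JSON document served to Prometheus http_sd_configs. An
+// illustrative (not real) response for a single node looks like:
+//
+//	[{"targets":["203.0.113.7"],"labels":{"__meta_metropolis_role_consensus_member":"true"}}]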
+type sdResponse []sdTarget
+
+type sdTarget struct {
+	Targets []string          `json:"targets"`
+	Labels  map[string]string `json:"labels"`
+}
+
+// Run is the sub-runnable responsible for fetching and serving node updates.
+func (s *Discovery) Run(ctx context.Context) error {
+	supervisor.Signal(ctx, supervisor.SignalHealthy)
+
+	srv, err := s.Curator.Watch(ctx, &apb.WatchRequest{
+		Kind: &apb.WatchRequest_NodesInCluster_{
+			NodesInCluster: &apb.WatchRequest_NodesInCluster{},
+		},
+	})
+	if err != nil {
+		return fmt.Errorf("curator watch failed: %w", err)
+	}
+	defer srv.CloseSend()
+
+	defer func() {
+		s.sdRespMtx.Lock()
+		// Disable the discovery endpoint until a new routine takes over.
+		s.sdResp = nil
+		s.sdRespMtx.Unlock()
+	}()
+
+	nodes := make(map[string]*apb.Node)
+	for {
+		ev, err := srv.Recv()
+		if err != nil {
+			// The watcher won't return a properly wrapped error, which confuses
+			// our testing harness. Let's just return the context error directly
+			// if it exists.
+			if ctx.Err() != nil {
+				return ctx.Err()
+			}
+
+			return fmt.Errorf("curator watch recv failed: %w", err)
+		}
+
+		for _, n := range ev.Nodes {
+			nodes[n.Id] = n
+		}
+
+		for _, t := range ev.NodeTombstones {
+			n, ok := nodes[t.NodeId]
+			if !ok {
+				// This is an indication of us losing data somehow. If this happens, it likely
+				// means a Curator bug.
+				supervisor.Logger(ctx).Warningf("Node %s: tombstone for unknown node", t.NodeId)
+				continue
+			}
+			delete(nodes, n.Id)
+		}
+
+		s.sdRespMtx.Lock()
+
+		s.sdResp = nil
+		for _, n := range nodes {
+			// Only care about nodes that have all required configuration set.
+			if n.Status == nil || n.Status.ExternalAddress == "" || n.Roles == nil {
+				continue
+			}
+
+			s.sdResp = append(s.sdResp, sdTarget{
+				Targets: []string{n.Status.ExternalAddress},
+				Labels: map[string]string{
+					"__meta_metropolis_role_kubernetes_worker":     fmt.Sprintf("%t", n.Roles.KubernetesWorker != nil),
+					"__meta_metropolis_role_kubernetes_controller": fmt.Sprintf("%t", n.Roles.KubernetesController != nil),
+					"__meta_metropolis_role_consensus_member":      fmt.Sprintf("%t", n.Roles.ConsensusMember != nil),
+				},
+			})
+		}
+
+		s.sdRespMtx.Unlock()
+	}
+}
+
+// ServeHTTP serves the cached discovery response as Prometheus http_sd JSON.
+func (s *Discovery) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	if r.Method != http.MethodGet {
+		http.Error(w, fmt.Sprintf("method %q not allowed", r.Method), http.StatusMethodNotAllowed)
+		return
+	}
+
+	s.sdRespMtx.RLock()
+	defer s.sdRespMtx.RUnlock()
+
+	// If sdResp is nil, which only happens if we are not a consensus member
+	// or we are still booting, we respond with ServiceUnavailable.
+	if s.sdResp == nil {
+		w.WriteHeader(http.StatusServiceUnavailable)
+		return
+	}
+
+	w.Header().Set("Content-Type", "application/json")
+	w.WriteHeader(http.StatusOK)
+
+	if err := json.NewEncoder(w).Encode(s.sdResp); err != nil {
+		// If the encoder fails it's mostly because of closed connections,
+		// so let's just ignore these errors.
+		return
+	}
+}
diff --git a/metropolis/node/core/metrics/exporters.go b/metropolis/node/core/metrics/exporters.go
index 5949eb4..a70e0cb 100644
--- a/metropolis/node/core/metrics/exporters.go
+++ b/metropolis/node/core/metrics/exporters.go
@@ -1,17 +1,12 @@
 package metrics
 
 import (
-	"crypto/tls"
 	"fmt"
 	"io"
-	"net"
 	"net/http"
-	"net/url"
-
-	"k8s.io/kubernetes/cmd/kubeadm/app/constants"
 
 	"source.monogon.dev/metropolis/node"
-	"source.monogon.dev/metropolis/pkg/logtree"
+	"source.monogon.dev/metropolis/pkg/supervisor"
 )
 
 // An Exporter is a Prometheus binary running under the Metrics service which
@@ -24,10 +19,6 @@
 	Name string
 	// Port on which this exporter will be running.
 	Port node.Port
-	// ServerName used to verify the tls connection.
-	ServerName string
-	// TLSConfigFunc is used to configure tls authentication
-	TLSConfigFunc func(*Service, *Exporter) *tls.Config
 	// Executable to run to start the exporter.
 	Executable string
 	// Arguments to start the exporter. The exporter should listen at 127.0.0.1 and
@@ -36,7 +27,7 @@
 }
 
 // DefaultExporters are the exporters which we run by default in Metropolis.
-var DefaultExporters = []Exporter{
+var DefaultExporters = []*Exporter{
 	{
 		Name:       "node",
 		Port:       node.MetricsNodeListenerPort,
@@ -54,57 +45,43 @@
 		Port: node.MetricsEtcdListenerPort,
 	},
 	{
-		Name:          "kubernetes-scheduler",
-		Port:          constants.KubeSchedulerPort,
-		ServerName:    "kube-scheduler.local",
-		TLSConfigFunc: (*Service).kubeTLSConfig,
+		Name: "kubernetes-scheduler",
+		Port: node.MetricsKubeSchedulerListenerPort,
 	},
 	{
-		Name:          "kubernetes-controller-manager",
-		Port:          constants.KubeControllerManagerPort,
-		ServerName:    "kube-controller-manager.local",
-		TLSConfigFunc: (*Service).kubeTLSConfig,
+		Name: "kubernetes-controller-manager",
+		Port: node.MetricsKubeControllerManagerListenerPort,
 	},
 }
 
-func (s *Service) kubeTLSConfig(e *Exporter) *tls.Config {
-	c := s.KubeTLSConfig.Clone()
-	c.ServerName = e.ServerName
-	return c
-}
-
-// forward a given HTTP request to this exporter.
-func (e *Exporter) forward(s *Service, logger logtree.LeveledLogger, w http.ResponseWriter, r *http.Request) {
-	ctx := r.Context()
-	outreq := r.Clone(ctx)
-
-	outreq.URL = &url.URL{
-		Scheme: "http",
-		Host:   net.JoinHostPort("127.0.0.1", e.Port.PortString()),
-		Path:   "/metrics",
-	}
-
-	transport := http.DefaultTransport.(*http.Transport).Clone()
-	if e.TLSConfigFunc != nil {
-		outreq.URL.Scheme = "https"
-		transport.TLSClientConfig = e.TLSConfigFunc(s, e)
-	}
-	logger.V(1).Infof("%s: forwarding %s to %s", r.RemoteAddr, r.URL.String(), outreq.URL.String())
-
-	if r.ContentLength == 0 {
-		outreq.Body = nil
-	}
-	if outreq.Body != nil {
-		defer outreq.Body.Close()
-	}
-	res, err := transport.RoundTrip(outreq)
-	if err != nil {
-		logger.Errorf("%s: forwarding to %q failed: %v", r.RemoteAddr, e.Name, err)
-		w.WriteHeader(502)
-		fmt.Fprintf(w, "could not reach exporter")
+// ServeHTTP forwards a metrics scrape request to this exporter's local
+// endpoint and copies back the response.
+func (e *Exporter) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	if r.Method != http.MethodGet {
+		http.Error(w, fmt.Sprintf("method %q not allowed", r.Method), http.StatusMethodNotAllowed)
 		return
 	}
 
+	ctx := r.Context()
+
+	// We are supplying the http.Server with a BaseContext that contains the
+	// context from our runnable, which carries the logger.
+	logger := supervisor.Logger(ctx)
+
+	url := "http://127.0.0.1:" + e.Port.PortString() + "/metrics"
+	outReq, err := http.NewRequestWithContext(ctx, "GET", url, nil)
+	if err != nil {
+		logger.Errorf("%s: forwarding to %q failed: %v", r.RemoteAddr, e.Name, err)
+		http.Error(w, "internal server error", http.StatusInternalServerError)
+		return
+	}
+
+	res, err := http.DefaultTransport.RoundTrip(outReq)
+	if err != nil {
+		logger.Errorf("%s: forwarding to %q failed: %v", r.RemoteAddr, e.Name, err)
+		http.Error(w, "could not reach exporter", http.StatusBadGateway)
+		return
+	}
+	defer res.Body.Close()
+
 	copyHeader(w.Header(), res.Header)
 	w.WriteHeader(res.StatusCode)
 
@@ -112,7 +89,6 @@
 		logger.Errorf("%s: copying response from %q failed: %v", r.RemoteAddr, e.Name, err)
 		return
 	}
-	res.Body.Close()
 }
 
 func copyHeader(dst, src http.Header) {
diff --git a/metropolis/node/core/metrics/metrics.go b/metropolis/node/core/metrics/metrics.go
index e087ada..4aa0872 100644
--- a/metropolis/node/core/metrics/metrics.go
+++ b/metropolis/node/core/metrics/metrics.go
@@ -4,20 +4,13 @@
 	"context"
 	"crypto/tls"
 	"crypto/x509"
-	"encoding/json"
 	"fmt"
 	"net"
 	"net/http"
 	"os/exec"
-	"sync"
-
-	apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
-	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
-	cpb "source.monogon.dev/metropolis/proto/common"
 
 	"source.monogon.dev/metropolis/node"
 	"source.monogon.dev/metropolis/node/core/identity"
-	"source.monogon.dev/metropolis/pkg/event/memory"
 	"source.monogon.dev/metropolis/pkg/supervisor"
 )
 
@@ -38,19 +31,11 @@
 	// Credentials used to run the TLS/HTTPS listener and verify incoming
 	// connections.
 	Credentials *identity.NodeCredentials
-	// Curator is the gRPC client that the service will use to reach the cluster's
-	// Curator, for pulling a list of all nodes.
-	Curator ipb.CuratorClient
-	// LocalRoles contains the local node roles which gets listened on and
-	// is required to decide whether or not to start the discovery routine
-	LocalRoles *memory.Value[*cpb.NodeRoles]
-	// KubeTLSConfig provides the tls.Config for authenticating against kubernetes
-	// services.
-	KubeTLSConfig *tls.Config
+	// Discovery serves the Prometheus http_sd discovery endpoint; its Run
+	// method is started as a separate runnable only on consensus members.
+	Discovery   Discovery
 
 	// List of Exporters to run and to forward HTTP requests to. If not set, defaults
 	// to DefaultExporters.
-	Exporters []Exporter
+	Exporters []*Exporter
 	// enableDynamicAddr enables listening on a dynamically chosen TCP port. This is
 	// used by tests to make sure we don't fail due to the default port being already
 	// in use.
@@ -59,10 +44,6 @@
 	// dynamicAddr will contain the picked dynamic listen address after the service
 	// starts, if enableDynamicAddr is set.
 	dynamicAddr chan string
-	// sdResp will contain the cached sdResponse
-	sdResp sdResponse
-	// sdRespMtx is the mutex for sdResp to allow usage inside the http handler.
-	sdRespMtx sync.RWMutex
 }
 
 // listen starts the public TLS listener for the service.
@@ -131,21 +112,14 @@
 	mux := http.NewServeMux()
 	logger := supervisor.Logger(ctx)
 	for _, exporter := range s.Exporters {
-		exporter := exporter
-
-		mux.HandleFunc(exporter.externalPath(), func(w http.ResponseWriter, r *http.Request) {
-			exporter.forward(s, logger, w, r)
-		})
+		mux.HandleFunc(exporter.externalPath(), exporter.ServeHTTP)
 
 		logger.Infof("Registered exporter %q", exporter.Name)
 	}
 
 	// And register a http_sd discovery endpoint.
-	mux.HandleFunc("/discovery", s.handleDiscovery)
+	mux.Handle("/discovery", &s.Discovery)
 
-	if err := supervisor.Run(ctx, "watch-roles", s.watchRoles); err != nil {
-		return err
-	}
 	supervisor.Signal(ctx, supervisor.SignalHealthy)
 
 	// Start forwarding server.
@@ -167,130 +141,3 @@
 	}
 	return fmt.Errorf("Serve: %w", err)
 }
-
-func shouldStartDiscovery(nr *cpb.NodeRoles) bool {
-	return nr.ConsensusMember != nil
-}
-
-func (s *Service) watchRoles(ctx context.Context) error {
-	w := s.LocalRoles.Watch()
-	defer w.Close()
-
-	r, err := w.Get(ctx)
-	if err != nil {
-		return err
-	}
-
-	if shouldStartDiscovery(r) {
-		supervisor.Logger(ctx).Infof("Starting discovery endpoint")
-		if err := supervisor.Run(ctx, "watch", s.watch); err != nil {
-			return err
-		}
-	}
-
-	for {
-		nr, err := w.Get(ctx)
-		if err != nil {
-			return err
-		}
-
-		if shouldStartDiscovery(r) != shouldStartDiscovery(nr) {
-			s.sdRespMtx.Lock()
-			// disable the metrics endpoint until the new routine takes over
-			s.sdResp = nil
-			s.sdRespMtx.Unlock()
-
-			supervisor.Logger(ctx).Infof("Discovery endpoint config changed, restarting")
-			return fmt.Errorf("restarting")
-		}
-	}
-
-}
-
-// watch is the sub-runnable responsible for fetching node updates.
-func (s *Service) watch(ctx context.Context) error {
-	supervisor.Signal(ctx, supervisor.SignalHealthy)
-
-	srv, err := s.Curator.Watch(ctx, &apb.WatchRequest{
-		Kind: &apb.WatchRequest_NodesInCluster_{
-			NodesInCluster: &apb.WatchRequest_NodesInCluster{},
-		},
-	})
-	if err != nil {
-		return fmt.Errorf("curator watch failed: %w", err)
-	}
-	defer srv.CloseSend()
-
-	nodes := make(map[string]*apb.Node)
-	for {
-		ev, err := srv.Recv()
-		if err != nil {
-			return fmt.Errorf("curator watch recv failed: %w", err)
-		}
-
-		for _, n := range ev.Nodes {
-			nodes[n.Id] = n
-		}
-
-		for _, t := range ev.NodeTombstones {
-			n, ok := nodes[t.NodeId]
-			if !ok {
-				// This is an indication of us losing data somehow. If this happens, it likely
-				// means a Curator bug.
-				supervisor.Logger(ctx).Warningf("Node %s: tombstone for unknown node", t.NodeId)
-				continue
-			}
-			delete(nodes, n.Id)
-		}
-
-		s.sdRespMtx.Lock()
-
-		// reset the existing response slice
-		s.sdResp = s.sdResp[:0]
-		for _, n := range nodes {
-			// Only care about nodes that have all required configuration set.
-			if n.Status == nil || n.Status.ExternalAddress == "" || n.Roles == nil {
-				continue
-			}
-
-			s.sdResp = append(s.sdResp, sdTarget{
-				Targets: []string{n.Status.ExternalAddress},
-				Labels: map[string]string{
-					"__meta_metropolis_role_kubernetes_worker":     fmt.Sprintf("%t", n.Roles.KubernetesWorker != nil),
-					"__meta_metropolis_role_kubernetes_controller": fmt.Sprintf("%t", n.Roles.KubernetesController != nil),
-					"__meta_metropolis_role_consensus_member":      fmt.Sprintf("%t", n.Roles.ConsensusMember != nil),
-				},
-			})
-		}
-
-		s.sdRespMtx.Unlock()
-	}
-}
-
-func (s *Service) handleDiscovery(w http.ResponseWriter, _ *http.Request) {
-	s.sdRespMtx.RLock()
-	defer s.sdRespMtx.RUnlock()
-
-	// If sdResp is nil, which only happens if we are not a master node
-	// or we are still booting, we respond with NotImplemented.
-	if s.sdResp == nil {
-		w.WriteHeader(http.StatusNotImplemented)
-		return
-	}
-
-	w.Header().Set("Content-Type", "application/json")
-	w.WriteHeader(http.StatusOK)
-
-	if err := json.NewEncoder(w).Encode(s.sdResp); err != nil {
-		// If the encoder fails its mostly because of closed connections
-		// so lets just ignore these errors.
-		return
-	}
-}
-
-type sdResponse []sdTarget
-
-type sdTarget struct {
-	Targets []string          `json:"targets"`
-	Labels  map[string]string `json:"labels"`
-}
diff --git a/metropolis/node/core/metrics/metrics_test.go b/metropolis/node/core/metrics/metrics_test.go
index c583f3a..70762e1 100644
--- a/metropolis/node/core/metrics/metrics_test.go
+++ b/metropolis/node/core/metrics/metrics_test.go
@@ -15,40 +15,42 @@
 	"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
 
 	apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
-	cpb "source.monogon.dev/metropolis/proto/common"
 
 	"source.monogon.dev/metropolis/cli/pkg/datafile"
 	"source.monogon.dev/metropolis/node"
-	"source.monogon.dev/metropolis/pkg/event/memory"
+	"source.monogon.dev/metropolis/pkg/freeport"
 	"source.monogon.dev/metropolis/pkg/supervisor"
 	"source.monogon.dev/metropolis/test/util"
 )
 
+func fakeExporter(name, value string) *Exporter {
+	path, _ := datafile.ResolveRunfile("metropolis/node/core/metrics/fake_exporter/fake_exporter_/fake_exporter")
+
+	p, closer, err := freeport.AllocateTCPPort()
+	if err != nil {
+		panic(err)
+	}
+	defer closer.Close()
+	port := node.Port(p)
+
+	return &Exporter{
+		Name:       name,
+		Port:       port,
+		Executable: path,
+		Arguments: []string{
+			"-listen", "127.0.0.1:" + port.PortString(),
+			"-value", value,
+		},
+	}
+}
+
 // TestMetricsForwarder exercises the metrics forwarding functionality of the
 // metrics service. That is, it makes sure that the service starts some fake
 // exporters and then forwards HTTP traffic to them.
 func TestMetricsForwarder(t *testing.T) {
-	path, _ := datafile.ResolveRunfile("metropolis/node/core/metrics/fake_exporter/fake_exporter_/fake_exporter")
-
-	exporters := []Exporter{
-		{
-			Name:       "test1",
-			Port:       node.Port(8081),
-			Executable: path,
-			Arguments: []string{
-				"-listen", "127.0.0.1:8081",
-				"-value", "100",
-			},
-		},
-		{
-			Name:       "test2",
-			Port:       node.Port(8082),
-			Executable: path,
-			Arguments: []string{
-				"-listen", "127.0.0.1:8082",
-				"-value", "200",
-			},
-		},
+	exporters := []*Exporter{
+		fakeExporter("test1", "100"),
+		fakeExporter("test2", "200"),
 	}
 
 	eph := util.NewEphemeralClusterCredentials(t, 1)
@@ -135,14 +137,30 @@
 
 	svc := Service{
 		Credentials:       eph.Nodes[0],
-		Curator:           apb.NewCuratorClient(ccl),
-		LocalRoles:        &memory.Value[*cpb.NodeRoles]{},
-		Exporters:         []Exporter{},
+		Discovery:         Discovery{Curator: apb.NewCuratorClient(ccl)},
 		enableDynamicAddr: true,
 		dynamicAddr:       make(chan string),
 	}
 
-	supervisor.TestHarness(t, svc.Run)
+	enableDiscovery := make(chan bool)
+	supervisor.TestHarness(t, func(ctx context.Context) error {
+		if err := supervisor.Run(ctx, "metrics", svc.Run); err != nil {
+			return err
+		}
+
+		err := supervisor.Run(ctx, "discovery", func(ctx context.Context) error {
+			<-enableDiscovery
+			return svc.Discovery.Run(ctx)
+		})
+		if err != nil {
+			return err
+		}
+
+		supervisor.Signal(ctx, supervisor.SignalHealthy)
+		supervisor.Signal(ctx, supervisor.SignalDone)
+		return nil
+	})
+
 	addr := <-svc.dynamicAddr
 
 	pool := x509.NewCertPool()
@@ -174,15 +192,14 @@
 			return fmt.Errorf("Get(%q): %v", url, err)
 		}
 		defer res.Body.Close()
-		if res.StatusCode != http.StatusNotImplemented {
+		if res.StatusCode != http.StatusServiceUnavailable {
 			return fmt.Errorf("Get(%q): code %d", url, res.StatusCode)
 		}
 		return nil
 	})
 
-	// First set the local roles to be a consensus member which starts a watcher,
-	// create a fake node after that
-	svc.LocalRoles.Set(&cpb.NodeRoles{ConsensusMember: &cpb.NodeRoles_ConsensusMember{}})
+	// First start the discovery watcher, then create a fake node.
+	enableDiscovery <- true
 	curator.NodeWithPrefixes(wgtypes.Key{}, "metropolis-fake-1", "1.2.3.4")
 
 	util.TestEventual(t, "active-discovery", ctx, 10*time.Second, func(ctx context.Context) error {
diff --git a/metropolis/node/core/roleserve/worker_metrics.go b/metropolis/node/core/roleserve/worker_metrics.go
index d0c89a4..26ec940 100644
--- a/metropolis/node/core/roleserve/worker_metrics.go
+++ b/metropolis/node/core/roleserve/worker_metrics.go
@@ -2,18 +2,14 @@
 
 import (
 	"context"
-	"crypto/tls"
-	"crypto/x509"
 	"fmt"
 
+	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
 	cpb "source.monogon.dev/metropolis/proto/common"
 
 	"source.monogon.dev/metropolis/node/core/metrics"
-	kpki "source.monogon.dev/metropolis/node/kubernetes/pki"
 	"source.monogon.dev/metropolis/pkg/event/memory"
 	"source.monogon.dev/metropolis/pkg/supervisor"
-
-	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
 )
 
 // workerMetrics runs the Metrics Service, which runs local Prometheus collectors
@@ -37,53 +33,47 @@
 	}
 	supervisor.Logger(ctx).Infof("Got curator connection, starting...")
 
-	lw := s.localControlplane.Watch()
-	defer lw.Close()
-	cp, err := lw.Get(ctx)
-	if err != nil {
-		return err
-	}
-
-	pki, err := kpki.FromLocalConsensus(ctx, cp.consensus)
-	if err != nil {
-		return err
-	}
-
-	// TODO(q3k): move this to IssueCertificates and replace with dedicated certificate
-	cert, key, err := pki.Certificate(ctx, kpki.Master)
-	if err != nil {
-		return fmt.Errorf("could not load certificate %q from PKI: %w", kpki.Master, err)
-	}
-	parsedKey, err := x509.ParsePKCS8PrivateKey(key)
-	if err != nil {
-		return fmt.Errorf("failed to parse key for cert %q: %w", kpki.Master, err)
-	}
-
-	caCert, _, err := pki.Certificate(ctx, kpki.IdCA)
-	if err != nil {
-		return fmt.Errorf("could not load certificate %q from PKI: %w", kpki.IdCA, err)
-	}
-	parsedCACert, err := x509.ParseCertificate(caCert)
-	if err != nil {
-		return fmt.Errorf("failed to parse cert %q: %w", kpki.IdCA, err)
-	}
-
-	rootCAs := x509.NewCertPool()
-	rootCAs.AddCert(parsedCACert)
-
-	kubeTLSConfig := &tls.Config{
-		RootCAs: rootCAs,
-		Certificates: []tls.Certificate{{
-			Certificate: [][]byte{cert},
-			PrivateKey:  parsedKey,
-		}},
-	}
-
 	svc := metrics.Service{
-		Credentials:   cc.credentials,
-		Curator:       ipb.NewCuratorClient(cc.conn),
-		LocalRoles:    s.localRoles,
-		KubeTLSConfig: kubeTLSConfig,
+		Credentials: cc.credentials,
+		Discovery: metrics.Discovery{
+			Curator: ipb.NewCuratorClient(cc.conn),
+		},
 	}
+
+	err = supervisor.Run(ctx, "watch-consensus", func(ctx context.Context) error {
+		isConsensusMember := func(roles *cpb.NodeRoles) bool {
+			return roles.ConsensusMember != nil
+		}
+
+		w := s.localRoles.Watch()
+		defer w.Close()
+
+		r, err := w.Get(ctx)
+		if err != nil {
+			return err
+		}
+
+		if isConsensusMember(r) {
+			if err := supervisor.Run(ctx, "discovery", svc.Discovery.Run); err != nil {
+				return err
+			}
+		}
+
+		for {
+			nr, err := w.Get(ctx)
+			if err != nil {
+				return err
+			}
+
+			changed := isConsensusMember(r) != isConsensusMember(nr)
+			if changed {
+				return fmt.Errorf("restarting")
+			}
+		}
+	})
+	if err != nil {
+		return err
+	}
+
 	return svc.Run(ctx)
 }
diff --git a/metropolis/node/kubernetes/BUILD.bazel b/metropolis/node/kubernetes/BUILD.bazel
index f888a57..a88b4b3 100644
--- a/metropolis/node/kubernetes/BUILD.bazel
+++ b/metropolis/node/kubernetes/BUILD.bazel
@@ -26,6 +26,7 @@
         "//metropolis/node/core/network/dns",
         "//metropolis/node/kubernetes/authproxy",
         "//metropolis/node/kubernetes/clusternet",
+        "//metropolis/node/kubernetes/metricsproxy",
         "//metropolis/node/kubernetes/nfproxy",
         "//metropolis/node/kubernetes/pki",
         "//metropolis/node/kubernetes/plugins/kvmdevice",
diff --git a/metropolis/node/kubernetes/metricsproxy/BUILD.bazel b/metropolis/node/kubernetes/metricsproxy/BUILD.bazel
new file mode 100644
index 0000000..95f8b2c
--- /dev/null
+++ b/metropolis/node/kubernetes/metricsproxy/BUILD.bazel
@@ -0,0 +1,14 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+    name = "metricsproxy",
+    srcs = ["metricsproxy.go"],
+    importpath = "source.monogon.dev/metropolis/node/kubernetes/metricsproxy",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//metropolis/node",
+        "//metropolis/node/kubernetes/pki",
+        "//metropolis/pkg/supervisor",
+        "@io_k8s_kubernetes//cmd/kubeadm/app/constants",
+    ],
+)
diff --git a/metropolis/node/kubernetes/metricsproxy/metricsproxy.go b/metropolis/node/kubernetes/metricsproxy/metricsproxy.go
new file mode 100644
index 0000000..14f76a0
--- /dev/null
+++ b/metropolis/node/kubernetes/metricsproxy/metricsproxy.go
@@ -0,0 +1,178 @@
+// Package metricsproxy implements an authenticating proxy in front of the K8s
+// controller-manager and scheduler, providing unauthenticated access to their
+// metrics via local ports.
+package metricsproxy
+
+import (
+	"context"
+	"crypto/tls"
+	"crypto/x509"
+	"fmt"
+	"io"
+	"net"
+	"net/http"
+
+	"k8s.io/kubernetes/cmd/kubeadm/app/constants"
+
+	"source.monogon.dev/metropolis/node"
+	"source.monogon.dev/metropolis/node/kubernetes/pki"
+	"source.monogon.dev/metropolis/pkg/supervisor"
+)
+
+// Service is the Kubernetes metrics proxy service, run by the Kubernetes
+// controller service.
+type Service struct {
+	// KPKI is a reference to the Kubernetes PKI.
+	KPKI *pki.PKI
+}
+
+type kubernetesExporter struct {
+	Name string
+	// TargetPort on which this exporter is running.
+	TargetPort node.Port
+	// ListenPort on which the unauthenticated proxy listens, bound to 127.0.0.1.
+	ListenPort node.Port
+	// ServerName used to verify the TLS connection.
+	ServerName string
+}
+
+// services are the Kubernetes components exposed via this proxy.
+var services = []*kubernetesExporter{
+	{
+		Name:       "kubernetes-scheduler",
+		TargetPort: constants.KubeSchedulerPort,
+		ListenPort: node.MetricsKubeSchedulerListenerPort,
+		ServerName: "kube-scheduler.local",
+	},
+	{
+		Name:       "kubernetes-controller-manager",
+		TargetPort: constants.KubeControllerManagerPort,
+		ListenPort: node.MetricsKubeControllerManagerListenerPort,
+		ServerName: "kube-controller-manager.local",
+	},
+}
+
+// metricsService proxies /metrics requests for a single Kubernetes component
+// to its local HTTPS endpoint over authenticated TLS.
+type metricsService struct {
+	*kubernetesExporter
+	transport *http.Transport
+}
+
+func (s *metricsService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	if r.Method != http.MethodGet {
+		http.Error(w, fmt.Sprintf("method %q not allowed", r.Method), http.StatusMethodNotAllowed)
+		return
+	}
+
+	ctx := r.Context()
+
+	// We are supplying the http.Server with a BaseContext that contains the
+	// context from our runnable, which carries the logger.
+	logger := supervisor.Logger(ctx)
+
+	url := "https://127.0.0.1:" + s.TargetPort.PortString() + "/metrics"
+	outReq, err := http.NewRequestWithContext(ctx, "GET", url, nil)
+	if err != nil {
+		logger.Errorf("%s: forwarding to %q failed: %v", r.RemoteAddr, s.Name, err)
+		http.Error(w, "internal server error", http.StatusInternalServerError)
+		return
+	}
+
+	res, err := s.transport.RoundTrip(outReq)
+	if err != nil {
+		logger.Errorf("%s: forwarding to %q failed: %v", r.RemoteAddr, s.Name, err)
+		http.Error(w, "could not reach exporter", http.StatusBadGateway)
+		return
+	}
+	defer res.Body.Close()
+
+	copyHeader(w.Header(), res.Header)
+	w.WriteHeader(res.StatusCode)
+
+	if _, err := io.Copy(w, res.Body); err != nil {
+		logger.Errorf("%s: copying response from %q failed: %v", r.RemoteAddr, s.Name, err)
+		return
+	}
+}
+
+func (s *metricsService) Run(ctx context.Context) error {
+	supervisor.Signal(ctx, supervisor.SignalHealthy)
+
+	srv := http.Server{
+		BaseContext: func(_ net.Listener) context.Context {
+			return ctx
+		},
+		Addr:    net.JoinHostPort("127.0.0.1", s.ListenPort.PortString()),
+		Handler: s,
+	}
+
+	go func() {
+		<-ctx.Done()
+		srv.Close()
+	}()
+
+	err := srv.ListenAndServe()
+	if err != nil && ctx.Err() != nil {
+		return ctx.Err()
+	}
+	return fmt.Errorf("ListenAndServe: %w", err)
+}
+
+// Run builds the client TLS configuration from the Kubernetes PKI and starts
+// one proxy sub-service per entry in services.
+func (s *Service) Run(ctx context.Context) error {
+	// TODO(q3k): move this to IssueCertificates and replace with dedicated certificate
+	cert, key, err := s.KPKI.Certificate(ctx, pki.Master)
+	if err != nil {
+		return fmt.Errorf("could not load certificate %q from PKI: %w", pki.Master, err)
+	}
+	parsedKey, err := x509.ParsePKCS8PrivateKey(key)
+	if err != nil {
+		return fmt.Errorf("failed to parse key for cert %q: %w", pki.Master, err)
+	}
+
+	caCert, _, err := s.KPKI.Certificate(ctx, pki.IdCA)
+	if err != nil {
+		return fmt.Errorf("could not load certificate %q from PKI: %w", pki.IdCA, err)
+	}
+	parsedCACert, err := x509.ParseCertificate(caCert)
+	if err != nil {
+		return fmt.Errorf("failed to parse cert %q: %w", pki.IdCA, err)
+	}
+
+	rootCAs := x509.NewCertPool()
+	rootCAs.AddCert(parsedCACert)
+
+	tlsConfig := &tls.Config{
+		RootCAs: rootCAs,
+		Certificates: []tls.Certificate{{
+			Certificate: [][]byte{cert},
+			PrivateKey:  parsedKey,
+		}},
+	}
+
+	for _, svc := range services {
+		tlsConfig := tlsConfig.Clone()
+		tlsConfig.ServerName = svc.ServerName
+
+		transport := http.DefaultTransport.(*http.Transport).Clone()
+		transport.TLSClientConfig = tlsConfig
+
+		err := supervisor.Run(ctx, svc.Name, (&metricsService{
+			kubernetesExporter: svc,
+			transport:          transport,
+		}).Run)
+		if err != nil {
+			return fmt.Errorf("could not run sub-service %q: %w", svc.Name, err)
+		}
+	}
+
+	supervisor.Signal(ctx, supervisor.SignalHealthy)
+
+	<-ctx.Done()
+	return ctx.Err()
+}
+
+func copyHeader(dst, src http.Header) {
+	for k, vv := range src {
+		for _, v := range vv {
+			dst.Add(k, v)
+		}
+	}
+}
diff --git a/metropolis/node/kubernetes/service_controller.go b/metropolis/node/kubernetes/service_controller.go
index a662666..fbeabca 100644
--- a/metropolis/node/kubernetes/service_controller.go
+++ b/metropolis/node/kubernetes/service_controller.go
@@ -32,6 +32,7 @@
 	"source.monogon.dev/metropolis/node/core/network"
 	"source.monogon.dev/metropolis/node/core/network/dns"
 	"source.monogon.dev/metropolis/node/kubernetes/authproxy"
+	"source.monogon.dev/metropolis/node/kubernetes/metricsproxy"
 	"source.monogon.dev/metropolis/node/kubernetes/pki"
 	"source.monogon.dev/metropolis/node/kubernetes/reconciler"
 	"source.monogon.dev/metropolis/pkg/supervisor"
@@ -156,6 +157,10 @@
 		Node: s.c.Node,
 	}
 
+	metricsProxy := metricsproxy.Service{
+		KPKI: s.c.KPKI,
+	}
+
 	for _, sub := range []struct {
 		name     string
 		runnable supervisor.Runnable
@@ -164,6 +169,7 @@
 		{"scheduler", runScheduler(*schedulerConfig)},
 		{"reconciler", reconciler.Maintain(clientSet)},
 		{"authproxy", authProxy.Run},
+		{"metricsproxy", metricsProxy.Run},
 	} {
 		err := supervisor.Run(ctx, sub.name, sub.runnable)
 		if err != nil {
diff --git a/metropolis/node/ports.go b/metropolis/node/ports.go
index 40c106f..cf9b0e0 100644
--- a/metropolis/node/ports.go
+++ b/metropolis/node/ports.go
@@ -48,9 +48,17 @@
 	// public MetricsPort.
 	MetricsNodeListenerPort Port = 7841
 	// MetricsEtcdListenerPort is the TCP port on which the etcd exporter
-	// runs, bound to 127.0.0.1. The Metrics Service proxies traffic to it from the
+	// runs, bound to 127.0.0.1. The metrics service proxies traffic to it from the
 	// public MetricsPort.
 	MetricsEtcdListenerPort Port = 7842
+	// MetricsKubeSchedulerListenerPort is the TCP port on which the proxy for
+	// the kube-scheduler runs, bound to 127.0.0.1. The metrics service proxies
+	// traffic to it from the public MetricsPort.
+	MetricsKubeSchedulerListenerPort Port = 7843
+	// MetricsKubeControllerManagerListenerPort is the TCP port on which the
+	// proxy for the controller-manager runs, bound to 127.0.0.1. The metrics
+	// service proxies traffic to it from the public MetricsPort.
+	MetricsKubeControllerManagerListenerPort Port = 7844
 	// KubernetesAPIPort is the TCP port on which the Kubernetes API is
 	// exposed.
 	KubernetesAPIPort Port = 6443