metropolis/node/core/metrics: fixup metrics authentication
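
Split the Prometheus service discovery endpoint out of the metrics
Service into its own Discovery runnable, which owns the Curator watch
and the cached response. The service no longer watches local node
roles; the caller decides whether to run discovery. The
kubernetes-scheduler and kubernetes-controller-manager exporters are
now scraped via node.MetricsKubeSchedulerListenerPort and
node.MetricsKubeControllerManagerListenerPort instead of over TLS
against the Kubernetes components, dropping the KubeTLSConfig plumbing.
Exporter forwarding builds a plain GET with http.NewRequestWithContext,
/discovery answers 503 instead of 501 while no data is available, and
the tests allocate exporter ports via freeport.

Callers are now expected to wire the discovery runnable themselves,
roughly as the updated test does (sketch only; creds and curatorClient
are placeholders, error handling omitted):

    svc := metrics.Service{
        Credentials: creds, // *identity.NodeCredentials (placeholder)
        Discovery:   metrics.Discovery{Curator: curatorClient},
    }
    supervisor.Run(ctx, "metrics", svc.Run)
    // Start discovery only where it should be served, e.g. on
    // consensus members.
    supervisor.Run(ctx, "discovery", svc.Discovery.Run)
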
Change-Id: I67643855ab61bfdea980211ffe01e50c2409882b
Reviewed-on: https://review.monogon.dev/c/monogon/+/1979
Reviewed-by: Serge Bazanski <serge@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/node/core/metrics/BUILD.bazel b/metropolis/node/core/metrics/BUILD.bazel
index f698f0e..88cdc1b 100644
--- a/metropolis/node/core/metrics/BUILD.bazel
+++ b/metropolis/node/core/metrics/BUILD.bazel
@@ -3,6 +3,7 @@
go_library(
name = "metrics",
srcs = [
+ "discovery.go",
"exporters.go",
"metrics.go",
],
@@ -12,11 +13,7 @@
"//metropolis/node",
"//metropolis/node/core/curator/proto/api",
"//metropolis/node/core/identity",
- "//metropolis/pkg/event/memory",
- "//metropolis/pkg/logtree",
"//metropolis/pkg/supervisor",
- "//metropolis/proto/common",
- "@io_k8s_kubernetes//cmd/kubeadm/app/constants",
],
)
@@ -32,9 +29,8 @@
"//metropolis/cli/pkg/datafile",
"//metropolis/node",
"//metropolis/node/core/curator/proto/api",
- "//metropolis/pkg/event/memory",
+ "//metropolis/pkg/freeport",
"//metropolis/pkg/supervisor",
- "//metropolis/proto/common",
"//metropolis/test/util",
"@com_zx2c4_golang_wireguard_wgctrl//wgtypes",
],
diff --git a/metropolis/node/core/metrics/discovery.go b/metropolis/node/core/metrics/discovery.go
new file mode 100644
index 0000000..037d3b0
--- /dev/null
+++ b/metropolis/node/core/metrics/discovery.go
@@ -0,0 +1,129 @@
+package metrics
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "net/http"
+ "sync"
+
+ apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+ ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+
+ "source.monogon.dev/metropolis/pkg/supervisor"
+)
+
+type Discovery struct {
+ Curator ipb.CuratorClient
+
+ // sdResp holds the cached sdResponse served on the discovery endpoint.
+ sdResp sdResponse
+ // sdRespMtx guards sdResp so it can be used from the HTTP handler.
+ sdRespMtx sync.RWMutex
+}
+
+type sdResponse []sdTarget
+
+type sdTarget struct {
+ Targets []string `json:"targets"`
+ Labels map[string]string `json:"labels"`
+}
+
+// Run is the sub-runnable responsible for fetching and serving node updates.
+func (s *Discovery) Run(ctx context.Context) error {
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+
+ srv, err := s.Curator.Watch(ctx, &apb.WatchRequest{
+ Kind: &apb.WatchRequest_NodesInCluster_{
+ NodesInCluster: &apb.WatchRequest_NodesInCluster{},
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("curator watch failed: %w", err)
+ }
+ defer srv.CloseSend()
+
+ defer func() {
+ s.sdRespMtx.Lock()
+ // Disable the discovery endpoint until a new discovery routine takes over.
+ s.sdResp = nil
+ s.sdRespMtx.Unlock()
+ }()
+
+ nodes := make(map[string]*apb.Node)
+ for {
+ ev, err := srv.Recv()
+ if err != nil {
+ // The watcher won't return a properly wrapped error, which confuses
+ // our testing harness. Just return the context error directly if it
+ // is set.
+ if ctx.Err() != nil {
+ return ctx.Err()
+ }
+
+ return fmt.Errorf("curator watch recv failed: %w", err)
+ }
+
+ for _, n := range ev.Nodes {
+ nodes[n.Id] = n
+ }
+
+ for _, t := range ev.NodeTombstones {
+ n, ok := nodes[t.NodeId]
+ if !ok {
+ // This is an indication that we somehow lost data. If this happens, it
+ // most likely means a Curator bug.
+ supervisor.Logger(ctx).Warningf("Node %s: tombstone for unknown node", t.NodeId)
+ continue
+ }
+ delete(nodes, n.Id)
+ }
+
+ s.sdRespMtx.Lock()
+
+ s.sdResp = nil
+ for _, n := range nodes {
+ // Only care about nodes that have all required configuration set.
+ if n.Status == nil || n.Status.ExternalAddress == "" || n.Roles == nil {
+ continue
+ }
+
+ s.sdResp = append(s.sdResp, sdTarget{
+ Targets: []string{n.Status.ExternalAddress},
+ Labels: map[string]string{
+ "__meta_metropolis_role_kubernetes_worker": fmt.Sprintf("%t", n.Roles.KubernetesWorker != nil),
+ "__meta_metropolis_role_kubernetes_controller": fmt.Sprintf("%t", n.Roles.KubernetesController != nil),
+ "__meta_metropolis_role_consensus_member": fmt.Sprintf("%t", n.Roles.ConsensusMember != nil),
+ },
+ })
+ }
+
+ s.sdRespMtx.Unlock()
+ }
+}
+
+func (s *Discovery) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ if r.Method != http.MethodGet {
+ http.Error(w, fmt.Sprintf("method %q not allowed", r.Method), http.StatusMethodNotAllowed)
+ return
+ }
+
+ s.sdRespMtx.RLock()
+ defer s.sdRespMtx.RUnlock()
+
+ // If sdResp is nil, which only happens when discovery isn't running on
+ // this node or we are still booting, respond with ServiceUnavailable.
+ if s.sdResp == nil {
+ w.WriteHeader(http.StatusServiceUnavailable)
+ return
+ }
+
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(http.StatusOK)
+
+ if err := json.NewEncoder(w).Encode(s.sdResp); err != nil {
+ // If the encoder fails, it's mostly because of closed connections,
+ // so just ignore these errors.
+ return
+ }
+}
diff --git a/metropolis/node/core/metrics/exporters.go b/metropolis/node/core/metrics/exporters.go
index 5949eb4..a70e0cb 100644
--- a/metropolis/node/core/metrics/exporters.go
+++ b/metropolis/node/core/metrics/exporters.go
@@ -1,17 +1,12 @@
package metrics
import (
- "crypto/tls"
"fmt"
"io"
- "net"
"net/http"
- "net/url"
-
- "k8s.io/kubernetes/cmd/kubeadm/app/constants"
"source.monogon.dev/metropolis/node"
- "source.monogon.dev/metropolis/pkg/logtree"
+ "source.monogon.dev/metropolis/pkg/supervisor"
)
// An Exporter is a Prometheus binary running under the Metrics service which
@@ -24,10 +19,6 @@
Name string
// Port on which this exporter will be running.
Port node.Port
- // ServerName used to verify the tls connection.
- ServerName string
- // TLSConfigFunc is used to configure tls authentication
- TLSConfigFunc func(*Service, *Exporter) *tls.Config
// Executable to run to start the exporter.
Executable string
// Arguments to start the exporter. The exporter should listen at 127.0.0.1 and
@@ -36,7 +27,7 @@
}
// DefaultExporters are the exporters which we run by default in Metropolis.
-var DefaultExporters = []Exporter{
+var DefaultExporters = []*Exporter{
{
Name: "node",
Port: node.MetricsNodeListenerPort,
@@ -54,57 +45,43 @@
Port: node.MetricsEtcdListenerPort,
},
{
- Name: "kubernetes-scheduler",
- Port: constants.KubeSchedulerPort,
- ServerName: "kube-scheduler.local",
- TLSConfigFunc: (*Service).kubeTLSConfig,
+ Name: "kubernetes-scheduler",
+ Port: node.MetricsKubeSchedulerListenerPort,
},
{
- Name: "kubernetes-controller-manager",
- Port: constants.KubeControllerManagerPort,
- ServerName: "kube-controller-manager.local",
- TLSConfigFunc: (*Service).kubeTLSConfig,
+ Name: "kubernetes-controller-manager",
+ Port: node.MetricsKubeControllerManagerListenerPort,
},
}
-func (s *Service) kubeTLSConfig(e *Exporter) *tls.Config {
- c := s.KubeTLSConfig.Clone()
- c.ServerName = e.ServerName
- return c
-}
-
-// forward a given HTTP request to this exporter.
-func (e *Exporter) forward(s *Service, logger logtree.LeveledLogger, w http.ResponseWriter, r *http.Request) {
- ctx := r.Context()
- outreq := r.Clone(ctx)
-
- outreq.URL = &url.URL{
- Scheme: "http",
- Host: net.JoinHostPort("127.0.0.1", e.Port.PortString()),
- Path: "/metrics",
- }
-
- transport := http.DefaultTransport.(*http.Transport).Clone()
- if e.TLSConfigFunc != nil {
- outreq.URL.Scheme = "https"
- transport.TLSClientConfig = e.TLSConfigFunc(s, e)
- }
- logger.V(1).Infof("%s: forwarding %s to %s", r.RemoteAddr, r.URL.String(), outreq.URL.String())
-
- if r.ContentLength == 0 {
- outreq.Body = nil
- }
- if outreq.Body != nil {
- defer outreq.Body.Close()
- }
- res, err := transport.RoundTrip(outreq)
- if err != nil {
- logger.Errorf("%s: forwarding to %q failed: %v", r.RemoteAddr, e.Name, err)
- w.WriteHeader(502)
- fmt.Fprintf(w, "could not reach exporter")
+func (e *Exporter) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ if r.Method != http.MethodGet {
+ http.Error(w, fmt.Sprintf("method %q not allowed", r.Method), http.StatusMethodNotAllowed)
return
}
+ ctx := r.Context()
+
+ // The http.Server is set up with a BaseContext carrying our runnable's
+ // context, which is where this logger comes from.
+ logger := supervisor.Logger(ctx)
+
+ url := "http://127.0.0.1:" + e.Port.PortString() + "/metrics"
+ outReq, err := http.NewRequestWithContext(ctx, "GET", url, nil)
+ if err != nil {
+ logger.Errorf("%s: forwarding to %q failed: %v", r.RemoteAddr, e.Name, err)
+ http.Error(w, "internal server error", http.StatusInternalServerError)
+ return
+ }
+
+ res, err := http.DefaultTransport.RoundTrip(outReq)
+ if err != nil {
+ logger.Errorf("%s: forwarding to %q failed: %v", r.RemoteAddr, e.Name, err)
+ http.Error(w, "could not reach exporter", http.StatusBadGateway)
+ return
+ }
+ defer res.Body.Close()
+
copyHeader(w.Header(), res.Header)
w.WriteHeader(res.StatusCode)
@@ -112,7 +89,6 @@
logger.Errorf("%s: copying response from %q failed: %v", r.RemoteAddr, e.Name, err)
return
}
- res.Body.Close()
}
func copyHeader(dst, src http.Header) {
diff --git a/metropolis/node/core/metrics/metrics.go b/metropolis/node/core/metrics/metrics.go
index e087ada..4aa0872 100644
--- a/metropolis/node/core/metrics/metrics.go
+++ b/metropolis/node/core/metrics/metrics.go
@@ -4,20 +4,13 @@
"context"
"crypto/tls"
"crypto/x509"
- "encoding/json"
"fmt"
"net"
"net/http"
"os/exec"
- "sync"
-
- apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
- ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
- cpb "source.monogon.dev/metropolis/proto/common"
"source.monogon.dev/metropolis/node"
"source.monogon.dev/metropolis/node/core/identity"
- "source.monogon.dev/metropolis/pkg/event/memory"
"source.monogon.dev/metropolis/pkg/supervisor"
)
@@ -38,19 +31,11 @@
// Credentials used to run the TLS/HTTPS listener and verify incoming
// connections.
Credentials *identity.NodeCredentials
- // Curator is the gRPC client that the service will use to reach the cluster's
- // Curator, for pulling a list of all nodes.
- Curator ipb.CuratorClient
- // LocalRoles contains the local node roles which gets listened on and
- // is required to decide whether or not to start the discovery routine
- LocalRoles *memory.Value[*cpb.NodeRoles]
- // KubeTLSConfig provides the tls.Config for authenticating against kubernetes
- // services.
- KubeTLSConfig *tls.Config
+ Discovery Discovery
// List of Exporters to run and to forward HTTP requests to. If not set, defaults
// to DefaultExporters.
- Exporters []Exporter
+ Exporters []*Exporter
// enableDynamicAddr enables listening on a dynamically chosen TCP port. This is
// used by tests to make sure we don't fail due to the default port being already
// in use.
@@ -59,10 +44,6 @@
// dynamicAddr will contain the picked dynamic listen address after the service
// starts, if enableDynamicAddr is set.
dynamicAddr chan string
- // sdResp will contain the cached sdResponse
- sdResp sdResponse
- // sdRespMtx is the mutex for sdResp to allow usage inside the http handler.
- sdRespMtx sync.RWMutex
}
// listen starts the public TLS listener for the service.
@@ -131,21 +112,14 @@
mux := http.NewServeMux()
logger := supervisor.Logger(ctx)
for _, exporter := range s.Exporters {
- exporter := exporter
-
- mux.HandleFunc(exporter.externalPath(), func(w http.ResponseWriter, r *http.Request) {
- exporter.forward(s, logger, w, r)
- })
+ mux.HandleFunc(exporter.externalPath(), exporter.ServeHTTP)
logger.Infof("Registered exporter %q", exporter.Name)
}
// And register a http_sd discovery endpoint.
- mux.HandleFunc("/discovery", s.handleDiscovery)
+ mux.Handle("/discovery", &s.Discovery)
- if err := supervisor.Run(ctx, "watch-roles", s.watchRoles); err != nil {
- return err
- }
supervisor.Signal(ctx, supervisor.SignalHealthy)
// Start forwarding server.
@@ -167,130 +141,3 @@
}
return fmt.Errorf("Serve: %w", err)
}
-
-func shouldStartDiscovery(nr *cpb.NodeRoles) bool {
- return nr.ConsensusMember != nil
-}
-
-func (s *Service) watchRoles(ctx context.Context) error {
- w := s.LocalRoles.Watch()
- defer w.Close()
-
- r, err := w.Get(ctx)
- if err != nil {
- return err
- }
-
- if shouldStartDiscovery(r) {
- supervisor.Logger(ctx).Infof("Starting discovery endpoint")
- if err := supervisor.Run(ctx, "watch", s.watch); err != nil {
- return err
- }
- }
-
- for {
- nr, err := w.Get(ctx)
- if err != nil {
- return err
- }
-
- if shouldStartDiscovery(r) != shouldStartDiscovery(nr) {
- s.sdRespMtx.Lock()
- // disable the metrics endpoint until the new routine takes over
- s.sdResp = nil
- s.sdRespMtx.Unlock()
-
- supervisor.Logger(ctx).Infof("Discovery endpoint config changed, restarting")
- return fmt.Errorf("restarting")
- }
- }
-
-}
-
-// watch is the sub-runnable responsible for fetching node updates.
-func (s *Service) watch(ctx context.Context) error {
- supervisor.Signal(ctx, supervisor.SignalHealthy)
-
- srv, err := s.Curator.Watch(ctx, &apb.WatchRequest{
- Kind: &apb.WatchRequest_NodesInCluster_{
- NodesInCluster: &apb.WatchRequest_NodesInCluster{},
- },
- })
- if err != nil {
- return fmt.Errorf("curator watch failed: %w", err)
- }
- defer srv.CloseSend()
-
- nodes := make(map[string]*apb.Node)
- for {
- ev, err := srv.Recv()
- if err != nil {
- return fmt.Errorf("curator watch recv failed: %w", err)
- }
-
- for _, n := range ev.Nodes {
- nodes[n.Id] = n
- }
-
- for _, t := range ev.NodeTombstones {
- n, ok := nodes[t.NodeId]
- if !ok {
- // This is an indication of us losing data somehow. If this happens, it likely
- // means a Curator bug.
- supervisor.Logger(ctx).Warningf("Node %s: tombstone for unknown node", t.NodeId)
- continue
- }
- delete(nodes, n.Id)
- }
-
- s.sdRespMtx.Lock()
-
- // reset the existing response slice
- s.sdResp = s.sdResp[:0]
- for _, n := range nodes {
- // Only care about nodes that have all required configuration set.
- if n.Status == nil || n.Status.ExternalAddress == "" || n.Roles == nil {
- continue
- }
-
- s.sdResp = append(s.sdResp, sdTarget{
- Targets: []string{n.Status.ExternalAddress},
- Labels: map[string]string{
- "__meta_metropolis_role_kubernetes_worker": fmt.Sprintf("%t", n.Roles.KubernetesWorker != nil),
- "__meta_metropolis_role_kubernetes_controller": fmt.Sprintf("%t", n.Roles.KubernetesController != nil),
- "__meta_metropolis_role_consensus_member": fmt.Sprintf("%t", n.Roles.ConsensusMember != nil),
- },
- })
- }
-
- s.sdRespMtx.Unlock()
- }
-}
-
-func (s *Service) handleDiscovery(w http.ResponseWriter, _ *http.Request) {
- s.sdRespMtx.RLock()
- defer s.sdRespMtx.RUnlock()
-
- // If sdResp is nil, which only happens if we are not a master node
- // or we are still booting, we respond with NotImplemented.
- if s.sdResp == nil {
- w.WriteHeader(http.StatusNotImplemented)
- return
- }
-
- w.Header().Set("Content-Type", "application/json")
- w.WriteHeader(http.StatusOK)
-
- if err := json.NewEncoder(w).Encode(s.sdResp); err != nil {
- // If the encoder fails its mostly because of closed connections
- // so lets just ignore these errors.
- return
- }
-}
-
-type sdResponse []sdTarget
-
-type sdTarget struct {
- Targets []string `json:"targets"`
- Labels map[string]string `json:"labels"`
-}
diff --git a/metropolis/node/core/metrics/metrics_test.go b/metropolis/node/core/metrics/metrics_test.go
index c583f3a..70762e1 100644
--- a/metropolis/node/core/metrics/metrics_test.go
+++ b/metropolis/node/core/metrics/metrics_test.go
@@ -15,40 +15,42 @@
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
- cpb "source.monogon.dev/metropolis/proto/common"
"source.monogon.dev/metropolis/cli/pkg/datafile"
"source.monogon.dev/metropolis/node"
- "source.monogon.dev/metropolis/pkg/event/memory"
+ "source.monogon.dev/metropolis/pkg/freeport"
"source.monogon.dev/metropolis/pkg/supervisor"
"source.monogon.dev/metropolis/test/util"
)
+func fakeExporter(name, value string) *Exporter {
+ path, _ := datafile.ResolveRunfile("metropolis/node/core/metrics/fake_exporter/fake_exporter_/fake_exporter")
+
+ p, closer, err := freeport.AllocateTCPPort()
+ if err != nil {
+ panic(err)
+ }
+ defer closer.Close()
+ port := node.Port(p)
+
+ return &Exporter{
+ Name: name,
+ Port: port,
+ Executable: path,
+ Arguments: []string{
+ "-listen", "127.0.0.1:" + port.PortString(),
+ "-value", value,
+ },
+ }
+}
+
// TestMetricsForwarder exercises the metrics forwarding functionality of the
// metrics service. That is, it makes sure that the service starts some fake
// exporters and then forwards HTTP traffic to them.
func TestMetricsForwarder(t *testing.T) {
- path, _ := datafile.ResolveRunfile("metropolis/node/core/metrics/fake_exporter/fake_exporter_/fake_exporter")
-
- exporters := []Exporter{
- {
- Name: "test1",
- Port: node.Port(8081),
- Executable: path,
- Arguments: []string{
- "-listen", "127.0.0.1:8081",
- "-value", "100",
- },
- },
- {
- Name: "test2",
- Port: node.Port(8082),
- Executable: path,
- Arguments: []string{
- "-listen", "127.0.0.1:8082",
- "-value", "200",
- },
- },
+ exporters := []*Exporter{
+ fakeExporter("test1", "100"),
+ fakeExporter("test2", "200"),
}
eph := util.NewEphemeralClusterCredentials(t, 1)
@@ -135,14 +137,30 @@
svc := Service{
Credentials: eph.Nodes[0],
- Curator: apb.NewCuratorClient(ccl),
- LocalRoles: &memory.Value[*cpb.NodeRoles]{},
- Exporters: []Exporter{},
+ Discovery: Discovery{Curator: apb.NewCuratorClient(ccl)},
enableDynamicAddr: true,
dynamicAddr: make(chan string),
}
- supervisor.TestHarness(t, svc.Run)
+ enableDiscovery := make(chan bool)
+ supervisor.TestHarness(t, func(ctx context.Context) error {
+ if err := supervisor.Run(ctx, "metrics", svc.Run); err != nil {
+ return err
+ }
+
+ err := supervisor.Run(ctx, "discovery", func(ctx context.Context) error {
+ <-enableDiscovery
+ return svc.Discovery.Run(ctx)
+ })
+ if err != nil {
+ return err
+ }
+
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ supervisor.Signal(ctx, supervisor.SignalDone)
+ return nil
+ })
+
addr := <-svc.dynamicAddr
pool := x509.NewCertPool()
@@ -174,15 +192,14 @@
return fmt.Errorf("Get(%q): %v", url, err)
}
defer res.Body.Close()
- if res.StatusCode != http.StatusNotImplemented {
+ if res.StatusCode != http.StatusServiceUnavailable {
return fmt.Errorf("Get(%q): code %d", url, res.StatusCode)
}
return nil
})
- // First set the local roles to be a consensus member which starts a watcher,
- // create a fake node after that
- svc.LocalRoles.Set(&cpb.NodeRoles{ConsensusMember: &cpb.NodeRoles_ConsensusMember{}})
+ // First start the discovery watcher, then create a fake node.
+ enableDiscovery <- true
curator.NodeWithPrefixes(wgtypes.Key{}, "metropolis-fake-1", "1.2.3.4")
util.TestEventual(t, "active-discovery", ctx, 10*time.Second, func(ctx context.Context) error {