metropolis/node/core/metrics: implement http_sd discovery endpoint
We provide Prometheus metrics, but don't have a way to discover all nodes.
This change implements a new HTTP endpoint, /discovery, which implements
the http_sd API and returns all current cluster nodes, including their
roles as labels.
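
The response follows the HTTP SD format consumed by Prometheus'
http_sd_configs: a JSON list of target groups, each carrying targets and
labels. For illustration (the address is made up):

  [
    {
      "targets": ["203.0.113.7"],
      "labels": {
        "consensus_member": "true",
        "kubernetes_controller": "true",
        "kubernetes_worker": "false"
      }
    }
  ]
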
Change-Id: I931a88e2afb285482d122dd059c96f9ebfab4052
Reviewed-on: https://review.monogon.dev/c/monogon/+/1934
Tested-by: Jenkins CI
Reviewed-by: Serge Bazanski <serge@monogon.tech>
diff --git a/metropolis/node/core/metrics/metrics.go b/metropolis/node/core/metrics/metrics.go
index 2f2da73..631a9d1 100644
--- a/metropolis/node/core/metrics/metrics.go
+++ b/metropolis/node/core/metrics/metrics.go
@@ -4,13 +4,20 @@
"context"
"crypto/tls"
"crypto/x509"
+ "encoding/json"
"fmt"
"net"
"net/http"
"os/exec"
+ "sync"
+
+	apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+ cpb "source.monogon.dev/metropolis/proto/common"
"source.monogon.dev/metropolis/node"
"source.monogon.dev/metropolis/node/core/identity"
+ "source.monogon.dev/metropolis/pkg/event/memory"
"source.monogon.dev/metropolis/pkg/supervisor"
)
@@ -31,17 +38,27 @@
// Credentials used to run the TLS/HTTPS listener and verify incoming
// connections.
Credentials *identity.NodeCredentials
+ // Curator is the gRPC client that the service will use to reach the cluster's
+ // Curator, for pulling a list of all nodes.
+	Curator apb.CuratorClient
+	// LocalRoles is an event value containing the local node's roles. It is
+	// watched to decide whether or not to start the discovery routine.
+	LocalRoles *memory.Value[*cpb.NodeRoles]
// List of Exporters to run and to forward HTTP requests to. If not set, defaults
// to DefaultExporters.
Exporters []Exporter
-
// enableDynamicAddr enables listening on a dynamically chosen TCP port. This is
// used by tests to make sure we don't fail due to the default port being already
// in use.
enableDynamicAddr bool
+
// dynamicAddr will contain the picked dynamic listen address after the service
// starts, if enableDynamicAddr is set.
dynamicAddr chan string
+	// sdResp contains the cached http_sd response served by handleDiscovery.
+	sdResp sdResponse
+	// sdRespMtx guards sdResp against concurrent access from the HTTP handler.
+	sdRespMtx sync.RWMutex
}
// listen starts the public TLS listener for the service.
@@ -116,6 +133,12 @@
logger.Infof("Registered exporter %q", exporter.Name)
}
+	// And register an http_sd discovery endpoint.
+ mux.HandleFunc("/discovery", s.handleDiscovery)
+
+ if err := supervisor.Run(ctx, "watch-roles", s.watchRoles); err != nil {
+ return err
+ }
supervisor.Signal(ctx, supervisor.SignalHealthy)
// Start forwarding server.
@@ -138,6 +161,129 @@
return fmt.Errorf("Serve: %w", err)
}
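+
+// shouldStartDiscovery reports whether the discovery endpoint should be
+// served, which is only the case on consensus member nodes.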
+func shouldStartDiscovery(nr *cpb.NodeRoles) bool {
+ return nr.ConsensusMember != nil
+}
+
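+// watchRoles watches the local node's roles and runs the discovery routine
+// while the node is a consensus member. Whenever the relevant role changes,
+// it returns an error so that the supervisor restarts it with the new
+// configuration.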
+func (s *Service) watchRoles(ctx context.Context) error {
+ w := s.LocalRoles.Watch()
+ defer w.Close()
+
+ r, err := w.Get(ctx)
+ if err != nil {
+ return err
+ }
+
+ if shouldStartDiscovery(r) {
+ supervisor.Logger(ctx).Infof("Starting discovery endpoint")
+ if err := supervisor.Run(ctx, "watch", s.watch); err != nil {
+ return err
+ }
+ }
+
+ for {
+ nr, err := w.Get(ctx)
+ if err != nil {
+ return err
+ }
+
+ if shouldStartDiscovery(r) != shouldStartDiscovery(nr) {
+ s.sdRespMtx.Lock()
+			// Disable the discovery endpoint until the new routine takes over.
+ s.sdResp = nil
+ s.sdRespMtx.Unlock()
+
+ supervisor.Logger(ctx).Infof("Discovery endpoint config changed, restarting")
+ return fmt.Errorf("restarting")
+ }
+ }
+}
+
+// watch is the sub-runnable responsible for fetching node updates.
+func (s *Service) watch(ctx context.Context) error {
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+
+ srv, err := s.Curator.Watch(ctx, &apb.WatchRequest{
+ Kind: &apb.WatchRequest_NodesInCluster_{
+ NodesInCluster: &apb.WatchRequest_NodesInCluster{},
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("curator watch failed: %w", err)
+ }
+ defer srv.CloseSend()
+
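+	// nodes maps node ID to the latest state received from the Curator. The
+	// watch streams incremental updates, so a full view is accumulated here.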
+ nodes := make(map[string]*apb.Node)
+ for {
+ ev, err := srv.Recv()
+ if err != nil {
+ return fmt.Errorf("curator watch recv failed: %w", err)
+ }
+
+ for _, n := range ev.Nodes {
+ nodes[n.Id] = n
+ }
+
+ for _, t := range ev.NodeTombstones {
+ n, ok := nodes[t.NodeId]
+ if !ok {
+				// A tombstone for an unknown node means we somehow lost
+				// data, which likely indicates a Curator bug.
+ supervisor.Logger(ctx).Warningf("Node %s: tombstone for unknown node", t.NodeId)
+ continue
+ }
+ delete(nodes, n.Id)
+ }
+
+ s.sdRespMtx.Lock()
+
+		// Reset the cached response, reusing its backing array.
+ s.sdResp = s.sdResp[:0]
+ for _, n := range nodes {
+ // Only care about nodes that have all required configuration set.
+ if n.Status == nil || n.Status.ExternalAddress == "" || n.Roles == nil {
+ continue
+ }
+
+ s.sdResp = append(s.sdResp, sdTarget{
+ Targets: []string{n.Status.ExternalAddress},
+ Labels: map[string]string{
+ "kubernetes_worker": fmt.Sprintf("%t", n.Roles.KubernetesWorker != nil),
+ "consensus_member": fmt.Sprintf("%t", n.Roles.ConsensusMember != nil),
+ "kubernetes_controller": fmt.Sprintf("%t", n.Roles.KubernetesController != nil),
+ },
+ })
+ }
+
+ s.sdRespMtx.Unlock()
+ }
+}
+
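+// handleDiscovery serves the cached http_sd response assembled by watch.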
+func (s *Service) handleDiscovery(w http.ResponseWriter, _ *http.Request) {
+ s.sdRespMtx.RLock()
+ defer s.sdRespMtx.RUnlock()
+
+	// If sdResp is nil, which only happens if this node is not a consensus
+	// member or is still booting, respond with NotImplemented.
+ if s.sdResp == nil {
+ w.WriteHeader(http.StatusNotImplemented)
+ return
+ }
+
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(http.StatusOK)
+
+ if err := json.NewEncoder(w).Encode(s.sdResp); err != nil {
+		// If the encoder fails, it's mostly because of closed connections,
+		// so just ignore these errors.
+ return
+ }
+}
+
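+// sdResponse is the JSON shape expected by Prometheus HTTP service
+// discovery: a list of target groups, each with its targets and labels.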
+type sdResponse []sdTarget
+
type sdTarget struct {
- Targets []string `json:"target"`
+ Targets []string `json:"targets"`
+ Labels map[string]string `json:"labels"`
}