metropolis/node/core/metrics: implement http_sd discovery endpoint

We provide Prometheus metrics but don't have a way to discover all nodes.
This change implements a new HTTP endpoint, /discovery, which implements
the http_sd API and returns all current cluster nodes, including their
roles as labels.
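
For example, a cluster with a single node holding only the consensus
member role would be described like this (shape follows the sdTarget
struct and the test expectation below; the address is illustrative):

  [{"targets":["1.2.3.4"],
    "labels":{"consensus_member":"true",
              "kubernetes_controller":"false",
              "kubernetes_worker":"false"}}]

This is the document format consumed by Prometheus' HTTP service
discovery (http_sd_configs).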

Change-Id: I931a88e2afb285482d122dd059c96f9ebfab4052
Reviewed-on: https://review.monogon.dev/c/monogon/+/1934
Tested-by: Jenkins CI
Reviewed-by: Serge Bazanski <serge@monogon.tech>
diff --git a/metropolis/node/core/metrics/BUILD.bazel b/metropolis/node/core/metrics/BUILD.bazel
index 3317719..9233ee2 100644
--- a/metropolis/node/core/metrics/BUILD.bazel
+++ b/metropolis/node/core/metrics/BUILD.bazel
@@ -10,9 +10,12 @@
     visibility = ["//visibility:public"],
     deps = [
         "//metropolis/node",
+        "//metropolis/node/core/curator/proto/api",
         "//metropolis/node/core/identity",
+        "//metropolis/pkg/event/memory",
         "//metropolis/pkg/logtree",
         "//metropolis/pkg/supervisor",
+        "//metropolis/proto/common",
     ],
 )
 
@@ -27,7 +30,11 @@
     deps = [
         "//metropolis/cli/pkg/datafile",
         "//metropolis/node",
+        "//metropolis/node/core/curator/proto/api",
+        "//metropolis/pkg/event/memory",
         "//metropolis/pkg/supervisor",
+        "//metropolis/proto/common",
         "//metropolis/test/util",
+        "@com_zx2c4_golang_wireguard_wgctrl//wgtypes",
     ],
 )
diff --git a/metropolis/node/core/metrics/metrics.go b/metropolis/node/core/metrics/metrics.go
index 2f2da73..631a9d1 100644
--- a/metropolis/node/core/metrics/metrics.go
+++ b/metropolis/node/core/metrics/metrics.go
@@ -4,13 +4,20 @@
 	"context"
 	"crypto/tls"
 	"crypto/x509"
+	"encoding/json"
 	"fmt"
 	"net"
 	"net/http"
 	"os/exec"
+	"sync"
+
+	apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+	cpb "source.monogon.dev/metropolis/proto/common"
 
 	"source.monogon.dev/metropolis/node"
 	"source.monogon.dev/metropolis/node/core/identity"
+	"source.monogon.dev/metropolis/pkg/event/memory"
 	"source.monogon.dev/metropolis/pkg/supervisor"
 )
 
@@ -31,17 +38,27 @@
 	// Credentials used to run the TLS/HTTPS listener and verify incoming
 	// connections.
 	Credentials *identity.NodeCredentials
+	// Curator is the gRPC client that the service will use to reach the cluster's
+	// Curator, for pulling a list of all nodes.
+	Curator ipb.CuratorClient
+	// LocalRoles is watched for changes to the local node's roles and is
+	// used to decide whether or not to start the discovery routine.
+	LocalRoles *memory.Value[*cpb.NodeRoles]
 	// List of Exporters to run and to forward HTTP requests to. If not set, defaults
 	// to DefaultExporters.
 	Exporters []Exporter
-
 	// enableDynamicAddr enables listening on a dynamically chosen TCP port. This is
 	// used by tests to make sure we don't fail due to the default port being already
 	// in use.
 	enableDynamicAddr bool
+
 	// dynamicAddr will contain the picked dynamic listen address after the service
 	// starts, if enableDynamicAddr is set.
 	dynamicAddr chan string
+	// sdResp contains the cached sdResponse served by the discovery endpoint.
+	sdResp sdResponse
+	// sdRespMtx guards sdResp against concurrent access from the HTTP handler.
+	sdRespMtx sync.RWMutex
 }
 
 // listen starts the public TLS listener for the service.
@@ -116,6 +133,12 @@
 		logger.Infof("Registered exporter %q", exporter.Name)
 	}
 
+	// And register an http_sd discovery endpoint.
+	mux.HandleFunc("/discovery", s.handleDiscovery)
+
+	if err := supervisor.Run(ctx, "watch-roles", s.watchRoles); err != nil {
+		return err
+	}
 	supervisor.Signal(ctx, supervisor.SignalHealthy)
 
 	// Start forwarding server.
@@ -138,6 +161,129 @@
 	return fmt.Errorf("Serve: %w", err)
 }
 
+func shouldStartDiscovery(nr *cpb.NodeRoles) bool {
+	return nr.ConsensusMember != nil
+}
+
+func (s *Service) watchRoles(ctx context.Context) error {
+	w := s.LocalRoles.Watch()
+	defer w.Close()
+
+	r, err := w.Get(ctx)
+	if err != nil {
+		return err
+	}
+
+	if shouldStartDiscovery(r) {
+		supervisor.Logger(ctx).Infof("Starting discovery endpoint")
+		if err := supervisor.Run(ctx, "watch", s.watch); err != nil {
+			return err
+		}
+	}
+
+	for {
+		nr, err := w.Get(ctx)
+		if err != nil {
+			return err
+		}
+
+		if shouldStartDiscovery(r) != shouldStartDiscovery(nr) {
+			s.sdRespMtx.Lock()
+			// Disable the discovery endpoint until the new routine takes over.
+			s.sdResp = nil
+			s.sdRespMtx.Unlock()
+
+			supervisor.Logger(ctx).Infof("Discovery endpoint config changed, restarting")
+			return fmt.Errorf("restarting")
+		}
+	}
+
+}
+
+// watch is the sub-runnable responsible for fetching node updates.
+func (s *Service) watch(ctx context.Context) error {
+	supervisor.Signal(ctx, supervisor.SignalHealthy)
+
+	srv, err := s.Curator.Watch(ctx, &apb.WatchRequest{
+		Kind: &apb.WatchRequest_NodesInCluster_{
+			NodesInCluster: &apb.WatchRequest_NodesInCluster{},
+		},
+	})
+	if err != nil {
+		return fmt.Errorf("curator watch failed: %w", err)
+	}
+	defer srv.CloseSend()
+
+	nodes := make(map[string]*apb.Node)
+	for {
+		ev, err := srv.Recv()
+		if err != nil {
+			return fmt.Errorf("curator watch recv failed: %w", err)
+		}
+
+		for _, n := range ev.Nodes {
+			nodes[n.Id] = n
+		}
+
+		for _, t := range ev.NodeTombstones {
+			n, ok := nodes[t.NodeId]
+			if !ok {
+				// A tombstone for an unknown node indicates that we lost data
+				// somehow; if this happens, it likely points at a Curator bug.
+				supervisor.Logger(ctx).Warningf("Node %s: tombstone for unknown node", t.NodeId)
+				continue
+			}
+			delete(nodes, n.Id)
+		}
+
+		s.sdRespMtx.Lock()
+
+		// reset the existing response slice
+		s.sdResp = s.sdResp[:0]
+		for _, n := range nodes {
+			// Only care about nodes that have all required configuration set.
+			if n.Status == nil || n.Status.ExternalAddress == "" || n.Roles == nil {
+				continue
+			}
+
+			s.sdResp = append(s.sdResp, sdTarget{
+				Targets: []string{n.Status.ExternalAddress},
+				Labels: map[string]string{
+					"kubernetes_worker":     fmt.Sprintf("%t", n.Roles.KubernetesWorker != nil),
+					"consensus_member":      fmt.Sprintf("%t", n.Roles.ConsensusMember != nil),
+					"kubernetes_controller": fmt.Sprintf("%t", n.Roles.KubernetesController != nil),
+				},
+			})
+		}
+
+		s.sdRespMtx.Unlock()
+	}
+}
+
+func (s *Service) handleDiscovery(w http.ResponseWriter, _ *http.Request) {
+	s.sdRespMtx.RLock()
+	defer s.sdRespMtx.RUnlock()
+
+	// If sdResp is nil, which only happens if this node is not a consensus
+	// member or the discovery watcher has not run yet, respond with NotImplemented.
+	if s.sdResp == nil {
+		w.WriteHeader(http.StatusNotImplemented)
+		return
+	}
+
+	w.Header().Set("Content-Type", "application/json")
+	w.WriteHeader(http.StatusOK)
+
+	if err := json.NewEncoder(w).Encode(s.sdResp); err != nil {
+		// If the encoder fails, it is most likely because the connection was
+		// closed, so just ignore the error.
+		return
+	}
+}
+
+type sdResponse []sdTarget
+
 type sdTarget struct {
-	Targets []string `json:"target"`
+	Targets []string          `json:"targets"`
+	Labels  map[string]string `json:"labels"`
 }
diff --git a/metropolis/node/core/metrics/metrics_test.go b/metropolis/node/core/metrics/metrics_test.go
index 03fabca..ebe9bd4 100644
--- a/metropolis/node/core/metrics/metrics_test.go
+++ b/metropolis/node/core/metrics/metrics_test.go
@@ -12,8 +12,14 @@
 	"testing"
 	"time"
 
+	"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
+
+	apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+	cpb "source.monogon.dev/metropolis/proto/common"
+
 	"source.monogon.dev/metropolis/cli/pkg/datafile"
 	"source.monogon.dev/metropolis/node"
+	"source.monogon.dev/metropolis/pkg/event/memory"
 	"source.monogon.dev/metropolis/pkg/supervisor"
 	"source.monogon.dev/metropolis/test/util"
 )
@@ -120,3 +126,85 @@
 		return nil
 	})
 }
+
+func TestDiscovery(t *testing.T) {
+	eph := util.NewEphemeralClusterCredentials(t, 1)
+
+	curator, ccl := util.MakeTestCurator(t)
+	defer ccl.Close()
+
+	svc := Service{
+		Credentials:       eph.Nodes[0],
+		Curator:           apb.NewCuratorClient(ccl),
+		LocalRoles:        &memory.Value[*cpb.NodeRoles]{},
+		Exporters:         []Exporter{},
+		enableDynamicAddr: true,
+		dynamicAddr:       make(chan string),
+	}
+
+	supervisor.TestHarness(t, svc.Run)
+	addr := <-svc.dynamicAddr
+
+	pool := x509.NewCertPool()
+	pool.AddCert(eph.CA)
+
+	cl := http.Client{
+		Transport: &http.Transport{
+			TLSClientConfig: &tls.Config{
+				ServerName: eph.Nodes[0].ID(),
+				RootCAs:    pool,
+
+				Certificates: []tls.Certificate{eph.Manager},
+			},
+		},
+	}
+
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+
+	util.TestEventual(t, "inactive-discovery", ctx, 10*time.Second, func(ctx context.Context) error {
+		url := (&url.URL{
+			Scheme: "https",
+			Host:   addr,
+			Path:   "/discovery",
+		}).String()
+		req, _ := http.NewRequest("GET", url, nil)
+		res, err := cl.Do(req)
+		if err != nil {
+			return fmt.Errorf("Get(%q): %v", url, err)
+		}
+		defer res.Body.Close()
+		if res.StatusCode != http.StatusNotImplemented {
+			return fmt.Errorf("Get(%q): code %d", url, res.StatusCode)
+		}
+		return nil
+	})
+
+	// First set the local roles to consensus member, which starts the watcher,
+	// then create a fake node after that.
+	svc.LocalRoles.Set(&cpb.NodeRoles{ConsensusMember: &cpb.NodeRoles_ConsensusMember{}})
+	curator.NodeWithPrefixes(wgtypes.Key{}, "metropolis-fake-1", "1.2.3.4")
+
+	util.TestEventual(t, "active-discovery", ctx, 10*time.Second, func(ctx context.Context) error {
+		url := (&url.URL{
+			Scheme: "https",
+			Host:   addr,
+			Path:   "/discovery",
+		}).String()
+		req, _ := http.NewRequest("GET", url, nil)
+		res, err := cl.Do(req)
+		if err != nil {
+			return fmt.Errorf("Get(%q): %v", url, err)
+		}
+		defer res.Body.Close()
+		if res.StatusCode != http.StatusOK {
+			return fmt.Errorf("Get(%q): code %d", url, res.StatusCode)
+		}
+		body, _ := io.ReadAll(res.Body)
+		want := `[{"targets":["1.2.3.4"],"labels":{"consensus_member":"true","kubernetes_controller":"false","kubernetes_worker":"false"}}]`
+		if !strings.Contains(string(body), want) {
+			return util.Permanent(fmt.Errorf("did not find expected value %q in %q", want, string(body)))
+		}
+		return nil
+	})
+}
diff --git a/metropolis/node/core/roleserve/roleserve.go b/metropolis/node/core/roleserve/roleserve.go
index ddc5811..02bc510 100644
--- a/metropolis/node/core/roleserve/roleserve.go
+++ b/metropolis/node/core/roleserve/roleserve.go
@@ -167,6 +167,7 @@
 
 	s.metrics = &workerMetrics{
 		curatorConnection: &s.CuratorConnection,
+		localRoles:        &s.localRoles,
 	}
 
 	return s
diff --git a/metropolis/node/core/roleserve/worker_metrics.go b/metropolis/node/core/roleserve/worker_metrics.go
index 78c62d6..a9add88 100644
--- a/metropolis/node/core/roleserve/worker_metrics.go
+++ b/metropolis/node/core/roleserve/worker_metrics.go
@@ -3,9 +3,13 @@
 import (
 	"context"
 
+	cpb "source.monogon.dev/metropolis/proto/common"
+
 	"source.monogon.dev/metropolis/node/core/metrics"
 	"source.monogon.dev/metropolis/pkg/event/memory"
 	"source.monogon.dev/metropolis/pkg/supervisor"
+
+	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
 )
 
 // workerMetrics runs the Metrics Service, which runs local Prometheus collectors
@@ -14,6 +18,7 @@
 // over HTTPS using the Cluster CA.
 type workerMetrics struct {
 	curatorConnection *memory.Value[*curatorConnection]
+	localRoles        *memory.Value[*cpb.NodeRoles]
 }
 
 func (s *workerMetrics) run(ctx context.Context) error {
@@ -29,6 +34,8 @@
 
 	svc := metrics.Service{
 		Credentials: cc.credentials,
+		Curator:     ipb.NewCuratorClient(cc.conn),
+		LocalRoles:  s.localRoles,
 	}
 	return svc.Run(ctx)
 }