cloud/shepherd: add Equinix API metrics

This adds the following signals to our interaction with the Equinix API:

 1. Latency
 2. Traffic
 3. Errors
 4. Saturation
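
For reference, metric collection is opt-in: callers pass a Prometheus
registry via the new Opts.MetricsRegistry field, and wrapngo registers
equinix_api_latency, equinix_api_waiting and equinix_api_in_flight against
it. A minimal consumer sketch follows (the promhttp exposition and listen
address are illustrative assumptions, not part of this change):

    package main

    import (
        "flag"
        "net/http"

        "github.com/prometheus/client_golang/prometheus"
        "github.com/prometheus/client_golang/prometheus/promhttp"

        "source.monogon.dev/cloud/shepherd/equinix/wrapngo"
    )

    func main() {
        // Same Opts wiring as cloud/shepherd/equinix/manager/server:
        // credentials, rate and parallelism come from flags.
        var o wrapngo.Opts
        o.RegisterFlags()
        flag.Parse()

        // Passing a registry is what enables the new metrics; leaving
        // MetricsRegistry nil keeps metric collection disabled.
        registry := prometheus.NewRegistry()
        o.MetricsRegistry = registry
        api := wrapngo.New(&o)
        _ = api // use for Equinix API calls exactly as before this change

        // Exposing the registry over HTTP is up to the caller; promhttp
        // and the address below are illustrative only.
        http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
        _ = http.ListenAndServe(":9090", nil)
    }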

Change-Id: Ic2d5e36a7a26ab906ac1c2fa6741ebf86b9e551a
Reviewed-on: https://review.monogon.dev/c/monogon/+/1606
Tested-by: Jenkins CI
Reviewed-by: Tim Windelschmidt <tim@monogon.tech>
diff --git a/cloud/shepherd/equinix/manager/server/main.go b/cloud/shepherd/equinix/manager/server/main.go
index 21e7e5a..984d96f 100644
--- a/cloud/shepherd/equinix/manager/server/main.go
+++ b/cloud/shepherd/equinix/manager/server/main.go
@@ -68,6 +68,7 @@
 	if c.API.APIKey == "" || c.API.User == "" {
 		klog.Exitf("-equinix_api_username and -equinix_api_key must be set")
 	}
+	c.API.MetricsRegistry = registry
 	api := wrapngo.New(&c.API)
 
 	// These variables are _very_ important to configure correctly, otherwise someone
diff --git a/cloud/shepherd/equinix/wrapngo/BUILD.bazel b/cloud/shepherd/equinix/wrapngo/BUILD.bazel
index 166ee76..521e1ca 100644
--- a/cloud/shepherd/equinix/wrapngo/BUILD.bazel
+++ b/cloud/shepherd/equinix/wrapngo/BUILD.bazel
@@ -4,6 +4,7 @@
     name = "wrapngo",
     srcs = [
         "duct_tape.go",
+        "metrics.go",
         "wrapn.go",
     ],
     importpath = "source.monogon.dev/cloud/shepherd/equinix/wrapngo",
@@ -12,6 +13,7 @@
         "@com_github_cenkalti_backoff_v4//:backoff",
         "@com_github_google_uuid//:uuid",
         "@com_github_packethost_packngo//:packngo",
+        "@com_github_prometheus_client_golang//prometheus",
         "@io_k8s_klog_v2//:klog",
     ],
 )
diff --git a/cloud/shepherd/equinix/wrapngo/duct_tape.go b/cloud/shepherd/equinix/wrapngo/duct_tape.go
index f58cf7b..d5dab7c 100644
--- a/cloud/shepherd/equinix/wrapngo/duct_tape.go
+++ b/cloud/shepherd/equinix/wrapngo/duct_tape.go
@@ -5,6 +5,7 @@
 	"errors"
 	"fmt"
 	"net/http"
+	"time"
 
 	"github.com/cenkalti/backoff/v4"
 	"github.com/packethost/packngo"
@@ -26,14 +27,10 @@
 // idempotent logic, as long as it cooperates with the above contract.
 func wrap[U any](ctx context.Context, cl *client, fn func(*packngo.Client) (U, error)) (U, error) {
 	var zero U
-	select {
-	case cl.serializer <- struct{}{}:
-	case <-ctx.Done():
-		return zero, ctx.Err()
+	if err := cl.serializer.up(ctx); err != nil {
+		return zero, err
 	}
-	defer func() {
-		<-cl.serializer
-	}()
+	defer cl.serializer.down()
 
 	bc := backoff.WithContext(cl.o.BackOff(), ctx)
 	pngo, err := cl.clientForContext(ctx)
@@ -60,11 +57,16 @@
 type injectContextRoundTripper struct {
 	ctx      context.Context
 	original http.RoundTripper
+	metrics  *metricsSet
 }
 
 func (r *injectContextRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
 	klog.V(5).Infof("Request -> %v", req.URL.String())
+	start := time.Now()
 	res, err := r.original.RoundTrip(req.WithContext(r.ctx))
+	latency := time.Since(start)
+	r.metrics.onAPIRequestDone(req, res, err, latency)
+
 	if err != nil {
 		klog.V(5).Infof("HTTP error <- %v", err)
 	} else {
@@ -78,6 +80,7 @@
 		Transport: &injectContextRoundTripper{
 			ctx:      ctx,
 			original: http.DefaultTransport,
+			metrics:  c.metrics,
 		},
 	}
 	return packngo.NewClient(packngo.WithAuth(c.username, c.token), packngo.WithHTTPClient(httpcl))
diff --git a/cloud/shepherd/equinix/wrapngo/metrics.go b/cloud/shepherd/equinix/wrapngo/metrics.go
new file mode 100644
index 0000000..0f4cc94
--- /dev/null
+++ b/cloud/shepherd/equinix/wrapngo/metrics.go
@@ -0,0 +1,129 @@
+package wrapngo
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"net/http"
+	"regexp"
+	"strings"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"k8s.io/klog/v2"
+)
+
+// metricsSet contains all the Prometheus metrics collected by wrapngo.
+type metricsSet struct {
+	requestLatencies *prometheus.HistogramVec
+	waiting          prometheus.GaugeFunc
+	inFlight         prometheus.GaugeFunc
+}
+
+func newMetricsSet(ser *serializer) *metricsSet {
+	return &metricsSet{
+		requestLatencies: prometheus.NewHistogramVec(
+			prometheus.HistogramOpts{
+				Name: "equinix_api_latency",
+				Help: "Equinix API request latency in seconds, partitioned by endpoint and status code",
+			},
+			[]string{"endpoint", "status_code"},
+		),
+		waiting: prometheus.NewGaugeFunc(
+			prometheus.GaugeOpts{
+				Name: "equinix_api_waiting",
+				Help: "Number of API requests waiting on the concurrency semaphore before being sent to Equinix",
+			},
+			func() float64 {
+				_, waiting := ser.stats()
+				return float64(waiting)
+			},
+		),
+		inFlight: prometheus.NewGaugeFunc(
+			prometheus.GaugeOpts{
+				Name: "equinix_api_in_flight",
+				Help: "Number of API requests currently being processed by Equinix",
+			},
+			func() float64 {
+				inFlight, _ := ser.stats()
+				return float64(inFlight)
+			},
+		),
+	}
+}
+
+// getEndpointForPath converts from an Equinix API method and path (eg.
+// /metal/v1/devices/deadbeef) into an 'endpoint' name, which is an imaginary,
+// Monogon-specific name for the API endpoint accessed by this call.
+//
+// If the given path is unknown and thus cannot be converted to an endpoint name,
+// 'Unknown' is returned and a warning is logged.
+//
+// We use this function to partition request statistics per API 'endpoint'. An
+// alternative to this would be to record high-level packngo function names, but
+// one packngo function call might actually emit multiple HTTP API requests - so
+// we're stuck recording the low-level requests and gathering statistics from
+// there instead.
+func getEndpointForPath(method, path string) string {
+	path = strings.TrimPrefix(path, "/metal/v1")
+	for name, match := range endpointNames {
+		if match.matches(method, path) {
+			return name
+		}
+	}
+	klog.Warningf("Unknown Equinix API %s %s - cannot determine metric endpoint name", method, path)
+	return "Unknown"
+}
+
+// requestMatch is used to match an HTTP request method/path.
+type requestMatch struct {
+	method string
+	regexp *regexp.Regexp
+}
+
+func (r *requestMatch) matches(method, path string) bool {
+	if r.method != method {
+		return false
+	}
+	return r.regexp.MatchString(path)
+}
+
+var (
+	endpointNames = map[string]requestMatch{
+		"GetDevice":           {"GET", regexp.MustCompile(`^/devices/[^/]+$`)},
+		"ListDevices":         {"GET", regexp.MustCompile(`^/(organizations|projects)/[^/]+/devices$`)},
+		"CreateDevice":        {"POST", regexp.MustCompile(`^/projects/[^/]+/devices$`)},
+		"ListReservations":    {"GET", regexp.MustCompile(`^/projects/[^/]+/hardware-reservations$`)},
+		"ListSSHKeys":         {"GET", regexp.MustCompile(`^/ssh-keys$`)},
+		"CreateSSHKey":        {"POST", regexp.MustCompile(`^/projects/[^/]+/ssh-keys$`)},
+		"GetSSHKey":           {"GET", regexp.MustCompile(`^/ssh-keys/[^/]+$`)},
+		"UpdateSSHKey":        {"PATCH", regexp.MustCompile(`^/ssh-keys/[^/]+$`)},
+		"PerformDeviceAction": {"POST", regexp.MustCompile(`^/devices/[^/]+/actions$`)},
+	}
+)
+
+// onAPIRequestDone is called by the wrapngo code on every API response from
+// Equinix, and records the given parameters into metrics.
+func (m *metricsSet) onAPIRequestDone(req *http.Request, res *http.Response, err error, latency time.Duration) {
+	if m == nil {
+		return
+	}
+
+	code := "unknown"
+	if err == nil {
+		code = fmt.Sprintf("%d", res.StatusCode)
+	} else {
+		switch {
+		case errors.Is(err, context.Canceled):
+			code = "ctx canceled"
+		case errors.Is(err, context.DeadlineExceeded):
+			code = "deadline exceeded"
+		}
+	}
+	if code == "unknown" {
+		klog.Warningf("Unexpected HTTP result: req %s %s, error: %v", req.Method, req.URL.Path, err)
+	}
+
+	endpoint := getEndpointForPath(req.Method, req.URL.Path)
+	m.requestLatencies.With(prometheus.Labels{"endpoint": endpoint, "status_code": code}).Observe(latency.Seconds())
+}
diff --git a/cloud/shepherd/equinix/wrapngo/wrapn.go b/cloud/shepherd/equinix/wrapngo/wrapn.go
index 3a0b96d..6680d9b 100644
--- a/cloud/shepherd/equinix/wrapngo/wrapn.go
+++ b/cloud/shepherd/equinix/wrapngo/wrapn.go
@@ -43,11 +43,13 @@
 	"flag"
 	"fmt"
 	"net/http"
+	"sync/atomic"
 	"time"
 
 	"github.com/cenkalti/backoff/v4"
 	"github.com/google/uuid"
 	"github.com/packethost/packngo"
+	"github.com/prometheus/client_golang/prometheus"
 )
 
 // Opts conveys configurable Client parameters.
@@ -74,6 +76,8 @@
 	//
 	// If not defined (ie. 0), defaults to 1.
 	Parallelism int
+
+	MetricsRegistry *prometheus.Registry // optional; if set, Equinix API call metrics are registered and collected here
 }
 
 func (o *Opts) RegisterFlags() {
@@ -127,9 +131,48 @@
 	o        *Opts
 	rlt      *time.Ticker
 
-	// serializer is a N-semaphore channel (configured by opts.Parallelism) which is
-	// used to limit the number of concurrent calls to the Equinix API.
-	serializer chan (struct{})
+	serializer *serializer
+	metrics    *metricsSet
+}
+
+// serializer wraps an N-slot semaphore channel (N configured by opts.Parallelism)
+// which is used to limit the number of concurrent calls to the Equinix API.
+//
+// In addition, it implements some simple waiting/usage statistics for
+// metrics/introspection.
+type serializer struct {
+	sem     chan struct{}
+	usage   int64
+	waiting int64
+}
+
+// up blocks until the serializer has at least one available concurrent call
+// slot. If the given context expires before such a slot is available, the
+// context error is returned.
+func (s *serializer) up(ctx context.Context) error {
+	atomic.AddInt64(&s.waiting, 1)
+	select {
+	case s.sem <- struct{}{}:
+		atomic.AddInt64(&s.waiting, -1)
+		atomic.AddInt64(&s.usage, 1)
+		return nil
+	case <-ctx.Done():
+		atomic.AddInt64(&s.waiting, -1)
+		return ctx.Err()
+	}
+}
+
+// down releases a previously acquired concurrent call slot.
+func (s *serializer) down() {
+	atomic.AddInt64(&s.usage, -1)
+	<-s.sem
+}
+
+// stats returns the number of in-flight and waiting-for-semaphore requests.
+func (s *serializer) stats() (usage, waiting int64) {
+	usage = atomic.LoadInt64(&s.usage)
+	waiting = atomic.LoadInt64(&s.waiting)
+	return
 }
 
 // New creates a Client instance based on Opts. PACKNGO_DEBUG environment
@@ -153,14 +196,22 @@
 		opts.Parallelism = 1
 	}
 
-	return &client{
+	cl := &client{
 		username: opts.User,
 		token:    opts.APIKey,
 		o:        opts,
 		rlt:      time.NewTicker(opts.APIRate),
 
-		serializer: make(chan struct{}, opts.Parallelism),
+		serializer: &serializer{
+			sem: make(chan struct{}, opts.Parallelism),
+		},
 	}
+	if opts.MetricsRegistry != nil {
+		ms := newMetricsSet(cl.serializer)
+		opts.MetricsRegistry.MustRegister(ms.inFlight, ms.waiting, ms.requestLatencies)
+		cl.metrics = ms
+	}
+	return cl
 }
 
 func (c *client) Close() {