cloud/shepherd: add equinix API metrics

This adds the following signals to our interaction with the Equinix API:

 1. Latency
 2. Traffic
 3. Errors
 4. Saturation

Change-Id: Ic2d5e36a7a26ab906ac1c2fa6741ebf86b9e551a
Reviewed-on: https://review.monogon.dev/c/monogon/+/1606
Tested-by: Jenkins CI
Reviewed-by: Tim Windelschmidt <tim@monogon.tech>
diff --git a/cloud/shepherd/equinix/wrapngo/wrapn.go b/cloud/shepherd/equinix/wrapngo/wrapn.go
index 3a0b96d..6680d9b 100644
--- a/cloud/shepherd/equinix/wrapngo/wrapn.go
+++ b/cloud/shepherd/equinix/wrapngo/wrapn.go
@@ -43,11 +43,13 @@
 	"flag"
 	"fmt"
 	"net/http"
+	"sync/atomic"
 	"time"
 
 	"github.com/cenkalti/backoff/v4"
 	"github.com/google/uuid"
 	"github.com/packethost/packngo"
+	"github.com/prometheus/client_golang/prometheus"
 )
 
 // Opts conveys configurable Client parameters.
@@ -74,6 +76,8 @@
 	//
 	// If not defined (ie. 0), defaults to 1.
 	Parallelism int
+
+	MetricsRegistry *prometheus.Registry
 }
 
 func (o *Opts) RegisterFlags() {
@@ -127,9 +131,48 @@
 	o        *Opts
 	rlt      *time.Ticker
 
-	// serializer is a N-semaphore channel (configured by opts.Parallelism) which is
-	// used to limit the number of concurrent calls to the Equinix API.
-	serializer chan (struct{})
+	serializer *serializer
+	metrics    *metricsSet
+}
+
+// serializer wraps an N-semaphore channel (sized by opts.Parallelism) which is
+// used to limit the number of concurrent calls to the Equinix API.
+//
+// In addition, it implements some simple waiting/usage statistics for
+// metrics/introspection.
+type serializer struct {
+	sem     chan struct{}
+	usage   int64
+	waiting int64
+}
+
+// up blocks until the serializer has at least one available concurrent call
+// slot. If the given context expires before such a slot is available, the
+// context error is returned.
+func (s *serializer) up(ctx context.Context) error {
+	atomic.AddInt64(&s.waiting, 1)
+	select {
+	case s.sem <- struct{}{}:
+		atomic.AddInt64(&s.waiting, -1)
+		atomic.AddInt64(&s.usage, 1)
+		return nil
+	case <-ctx.Done():
+		atomic.AddInt64(&s.waiting, -1)
+		return ctx.Err()
+	}
+}
+
+// down releases a previously acquired concurrent call slot.
+func (s *serializer) down() {
+	atomic.AddInt64(&s.usage, -1)
+	<-s.sem
+}
+
+// stats returns the number of in-flight and waiting-for-semaphore requests.
+func (s *serializer) stats() (usage, waiting int64) {
+	usage = atomic.LoadInt64(&s.usage)
+	waiting = atomic.LoadInt64(&s.waiting)
+	return
 }
 
 // New creates a Client instance based on Opts. PACKNGO_DEBUG environment
@@ -153,14 +196,22 @@
 		opts.Parallelism = 1
 	}
 
-	return &client{
+	cl := &client{
 		username: opts.User,
 		token:    opts.APIKey,
 		o:        opts,
 		rlt:      time.NewTicker(opts.APIRate),
 
-		serializer: make(chan struct{}, opts.Parallelism),
+		serializer: &serializer{
+			sem: make(chan struct{}, opts.Parallelism),
+		},
 	}
+	if opts.MetricsRegistry != nil {
+		ms := newMetricsSet(cl.serializer)
+		opts.MetricsRegistry.MustRegister(ms.inFlight, ms.waiting, ms.requestLatencies)
+		cl.metrics = ms
+	}
+	return cl
 }
 
 func (c *client) Close() {