cloud/shepherd: add equinix API metrics
This adds metrics for the four golden signals of our calls to the Equinix API:
1. Latency
2. Traffic
3. Errors
4. Saturation
Change-Id: Ic2d5e36a7a26ab906ac1c2fa6741ebf86b9e551a
Reviewed-on: https://review.monogon.dev/c/monogon/+/1606
Tested-by: Jenkins CI
Reviewed-by: Tim Windelschmidt <tim@monogon.tech>
diff --git a/cloud/shepherd/equinix/wrapngo/wrapn.go b/cloud/shepherd/equinix/wrapngo/wrapn.go
index 3a0b96d..6680d9b 100644
--- a/cloud/shepherd/equinix/wrapngo/wrapn.go
+++ b/cloud/shepherd/equinix/wrapngo/wrapn.go
@@ -43,11 +43,13 @@
"flag"
"fmt"
"net/http"
+ "sync/atomic"
"time"
"github.com/cenkalti/backoff/v4"
"github.com/google/uuid"
"github.com/packethost/packngo"
+ "github.com/prometheus/client_golang/prometheus"
)
// Opts conveys configurable Client parameters.
@@ -74,6 +76,8 @@
//
// If not defined (ie. 0), defaults to 1.
Parallelism int
+
+ MetricsRegistry *prometheus.Registry
}
func (o *Opts) RegisterFlags() {
@@ -127,9 +131,48 @@
o *Opts
rlt *time.Ticker
- // serializer is a N-semaphore channel (configured by opts.Parallelism) which is
- // used to limit the number of concurrent calls to the Equinix API.
- serializer chan (struct{})
+ serializer *serializer
+ metrics *metricsSet
+}
+
+// serializer is an N-slot semaphore, implemented as a buffered channel whose
+// capacity N is configured by opts.Parallelism, which is used to limit the
+// number of concurrent calls to the Equinix API.
+//
+// In addition, it implements some simple waiting/usage statistics for
+// metrics/introspection.
+type serializer struct {
+ // usage and waiting are accessed via sync/atomic's 64-bit functions, which on
+ // 386, ARM and 32-bit MIPS require 64-bit alignment. Only the first word of
+ // an allocated struct is guaranteed to be aligned, so they must stay first.
+ usage int64 // number of currently held slots
+ waiting int64 // number of callers currently blocked in up
+ sem chan struct{} // holds one element per held slot; capacity is N
+}
+
+// up blocks until the serializer has at least one available concurrent call
+// slot and then acquires it. If the given context expires before such a slot
+// is available, the context error is returned and no slot is acquired. If ctx
+// is already done while a slot is also free, either outcome is possible, as
+// select chooses randomly among ready cases.
+//
+// Every successful call must be balanced by exactly one call to down. While
+// blocked, the caller is counted in waiting; once it holds a slot it is
+// counted in usage instead.
+func (s *serializer) up(ctx context.Context) error {
+ atomic.AddInt64(&s.waiting, 1)
+ select {
+ case s.sem <- struct{}{}:
+ // Leave waiting before entering usage, so that the caller is never
+ // counted in both counters at the same time.
+ atomic.AddInt64(&s.waiting, -1)
+ atomic.AddInt64(&s.usage, 1)
+ return nil
+ case <-ctx.Done():
+ atomic.AddInt64(&s.waiting, -1)
+ return ctx.Err()
+ }
+}
+
+// down releases a previously acquired concurrent call slot. It must only be
+// called by a caller that currently holds a slot obtained from a successful up.
+func (s *serializer) down() {
+ // Decrement usage before freeing the slot: otherwise another caller could
+ // acquire the freed slot and increment usage first, making usage briefly
+ // exceed the number of slots.
+ atomic.AddInt64(&s.usage, -1)
+ <-s.sem
+}
+
+// stats returns the number of in-flight requests (those holding a slot) and
+// the number of requests blocked in up waiting for a slot. The two counters
+// are loaded separately, so the returned pair is not an atomic snapshot; it is
+// meant for metrics/introspection only.
+func (s *serializer) stats() (usage, waiting int64) {
+ usage = atomic.LoadInt64(&s.usage)
+ waiting = atomic.LoadInt64(&s.waiting)
+ return
+}
// New creates a Client instance based on Opts. PACKNGO_DEBUG environment
@@ -153,14 +196,22 @@
opts.Parallelism = 1
}
- return &client{
+ cl := &client{
username: opts.User,
token: opts.APIKey,
o: opts,
rlt: time.NewTicker(opts.APIRate),
- serializer: make(chan struct{}, opts.Parallelism),
+ serializer: &serializer{
+ sem: make(chan struct{}, opts.Parallelism),
+ },
}
+ if opts.MetricsRegistry != nil {
+ ms := newMetricsSet(cl.serializer)
+ opts.MetricsRegistry.MustRegister(ms.inFlight, ms.waiting, ms.requestLatencies)
+ cl.metrics = ms
+ }
+ return cl
}
func (c *client) Close() {