cloud/shepherd: add equinix API metrics
This adds the following signals to our interaction with the Equinix API:
1. Latency
2. Traffic
3. Errors
4. Saturation
Change-Id: Ic2d5e36a7a26ab906ac1c2fa6741ebf86b9e551a
Reviewed-on: https://review.monogon.dev/c/monogon/+/1606
Tested-by: Jenkins CI
Reviewed-by: Tim Windelschmidt <tim@monogon.tech>
diff --git a/cloud/shepherd/equinix/manager/server/main.go b/cloud/shepherd/equinix/manager/server/main.go
index 21e7e5a..984d96f 100644
--- a/cloud/shepherd/equinix/manager/server/main.go
+++ b/cloud/shepherd/equinix/manager/server/main.go
@@ -68,6 +68,7 @@
if c.API.APIKey == "" || c.API.User == "" {
klog.Exitf("-equinix_api_username and -equinix_api_key must be set")
}
+ c.API.MetricsRegistry = registry
api := wrapngo.New(&c.API)
// These variables are _very_ important to configure correctly, otherwise someone
diff --git a/cloud/shepherd/equinix/wrapngo/BUILD.bazel b/cloud/shepherd/equinix/wrapngo/BUILD.bazel
index 166ee76..521e1ca 100644
--- a/cloud/shepherd/equinix/wrapngo/BUILD.bazel
+++ b/cloud/shepherd/equinix/wrapngo/BUILD.bazel
@@ -4,6 +4,7 @@
name = "wrapngo",
srcs = [
"duct_tape.go",
+ "metrics.go",
"wrapn.go",
],
importpath = "source.monogon.dev/cloud/shepherd/equinix/wrapngo",
@@ -12,6 +13,7 @@
"@com_github_cenkalti_backoff_v4//:backoff",
"@com_github_google_uuid//:uuid",
"@com_github_packethost_packngo//:packngo",
+ "@com_github_prometheus_client_golang//prometheus",
"@io_k8s_klog_v2//:klog",
],
)
diff --git a/cloud/shepherd/equinix/wrapngo/duct_tape.go b/cloud/shepherd/equinix/wrapngo/duct_tape.go
index f58cf7b..d5dab7c 100644
--- a/cloud/shepherd/equinix/wrapngo/duct_tape.go
+++ b/cloud/shepherd/equinix/wrapngo/duct_tape.go
@@ -5,6 +5,7 @@
"errors"
"fmt"
"net/http"
+ "time"
"github.com/cenkalti/backoff/v4"
"github.com/packethost/packngo"
@@ -26,14 +27,10 @@
// idempotent logic, as long as it cooperates with the above contract.
func wrap[U any](ctx context.Context, cl *client, fn func(*packngo.Client) (U, error)) (U, error) {
var zero U
- select {
- case cl.serializer <- struct{}{}:
- case <-ctx.Done():
- return zero, ctx.Err()
+ if err := cl.serializer.up(ctx); err != nil {
+ return zero, err
}
- defer func() {
- <-cl.serializer
- }()
+ defer cl.serializer.down()
bc := backoff.WithContext(cl.o.BackOff(), ctx)
pngo, err := cl.clientForContext(ctx)
@@ -60,11 +57,16 @@
type injectContextRoundTripper struct {
ctx context.Context // context attached to every outgoing request in RoundTrip
original http.RoundTripper // underlying transport that actually performs the request
+ metrics *metricsSet // receives the outcome and latency of each request in RoundTrip
}
func (r *injectContextRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
klog.V(5).Infof("Request -> %v", req.URL.String())
+ start := time.Now()
res, err := r.original.RoundTrip(req.WithContext(r.ctx))
+ latency := time.Since(start)
+ r.metrics.onAPIRequestDone(req, res, err, latency)
+
if err != nil {
klog.V(5).Infof("HTTP error <- %v", err)
} else {
@@ -78,6 +80,7 @@
Transport: &injectContextRoundTripper{
ctx: ctx,
original: http.DefaultTransport,
+ metrics: c.metrics,
},
}
return packngo.NewClient(packngo.WithAuth(c.username, c.token), packngo.WithHTTPClient(httpcl))
diff --git a/cloud/shepherd/equinix/wrapngo/metrics.go b/cloud/shepherd/equinix/wrapngo/metrics.go
new file mode 100644
index 0000000..0f4cc94
--- /dev/null
+++ b/cloud/shepherd/equinix/wrapngo/metrics.go
@@ -0,0 +1,129 @@
+package wrapngo
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "net/http"
+ "regexp"
+ "strings"
+ "time"
+
+ "github.com/prometheus/client_golang/prometheus"
+ "k8s.io/klog/v2"
+)
+
+// metricsSet contains all the Prometheus metrics collected by wrapngo.
+type metricsSet struct {
+ requestLatencies *prometheus.HistogramVec // request latency in seconds, labeled by endpoint and status_code
+ waiting prometheus.GaugeFunc // reports the 'waiting' value of serializer.stats()
+ inFlight prometheus.GaugeFunc // reports the in-flight ('usage') value of serializer.stats()
+}
+
+func newMetricsSet(ser *serializer) *metricsSet { // builds (but does not register) the metrics; both gauges read live values from ser
+ return &metricsSet{
+ requestLatencies: prometheus.NewHistogramVec(
+ prometheus.HistogramOpts{
+ Name: "equinix_api_latency",
+ Help: "Equinix API request latency in seconds, partitioned by endpoint status code", // NOTE(review): probably meant "endpoint and status code"
+ }, // no Buckets given - presumably the client library defaults apply; TODO confirm
+ []string{"endpoint", "status_code"}, // label names; must match the keys used in onAPIRequestDone
+ ),
+ waiting: prometheus.NewGaugeFunc(
+ prometheus.GaugeOpts{
+ Name: "equinix_api_waiting",
+ Help: "Number of API requests pending to be sent to Equinix but waiting on semaphore",
+ },
+ func() float64 {
+ _, waiting := ser.stats() // stats returns (in-flight, waiting); only the second value is used here
+ return float64(waiting)
+ },
+ ),
+ inFlight: prometheus.NewGaugeFunc(
+ prometheus.GaugeOpts{
+ Name: "equinix_api_in_flight",
+ Help: "Number of API requests currently being processed by Equinix",
+ },
+ func() float64 {
+ inFlight, _ := ser.stats() // stats returns (in-flight, waiting); only the first value is used here
+ return float64(inFlight)
+ },
+ ),
+ }
+}
+
+// getEndpointForPath converts from an Equinix API method and path (eg.
+// /metal/v1/devices/deadbeef) into an 'endpoint' name, which is an imaginary,
+// Monogon-specific name for the API endpoint accessed by this call.
+//
+// If the given path is unknown and thus cannot be converted to an endpoint name,
+// 'Unknown' is returned and a warning is logged.
+//
+// We use this function to partition request statistics per API 'endpoint'. An
+// alternative to this would be to record high-level packngo function names, but
+// one packngo function call might actually emit multiple HTTP API requests - so
+// we're stuck recording the low-level requests and gathering statistics from
+// there instead.
+func getEndpointForPath(method, path string) string {
+ path = strings.TrimPrefix(path, "/metal/v1") // the patterns in endpointNames are written without this prefix
+ for name, match := range endpointNames { // map iteration order is unspecified; the first matching entry wins
+ if match.matches(method, path) {
+ return name
+ }
+ }
+ klog.Warningf("Unknown Equinix API %s %s - cannot determine metric endpoint name", method, path)
+ return "Unknown"
+}
+
+// requestMatch is used to match a HTTP request method/path.
+type requestMatch struct {
+ method string // HTTP method, compared for exact equality in matches
+ regexp *regexp.Regexp // pattern tested against the request path in matches
+}
+
+func (r *requestMatch) matches(method, path string) bool { // true only if both the method and the path pattern match
+ if r.method != method { // exact string comparison against the stored method
+ return false
+ }
+ return r.regexp.MatchString(path)
+}
+
+var (
+ endpointNames = map[string]requestMatch{ // endpoint name -> method and path pattern (path with the /metal/v1 prefix already stripped)
+ "GetDevice": {"GET", regexp.MustCompile(`^/devices/[^/]+$`)},
+ "ListDevices": {"GET", regexp.MustCompile(`^/(organizations|projects)/[^/]+/devices$`)},
+ "CreateDevice": {"POST", regexp.MustCompile(`^/projects/[^/]+/devices$`)},
+ "ListReservations": {"GET", regexp.MustCompile(`^/projects?/[^/]+/hardware-reservations$`)}, // API path is /projects/...; singular still accepted
+ "ListSSHKeys": {"GET", regexp.MustCompile(`^/ssh-keys$`)},
+ "CreateSSHKey": {"POST", regexp.MustCompile(`^/projects?/[^/]+/ssh-keys$`)}, // API path is /projects/...; singular still accepted
+ "GetSSHKey": {"GET", regexp.MustCompile(`^/ssh-keys/[^/]+$`)},
+ "UpdateSSHKey": {"PATCH", regexp.MustCompile(`^/ssh-keys/[^/]+$`)},
+ "PerformDeviceAction": {"POST", regexp.MustCompile(`^/devices/[^/]+/actions$`)},
+ } // entries are looked up by iterating the map, so no two entries may match the same method and path
+)
+
+// onAPIRequestDone is called by the wrapngo code on every API response from
+// Equinix, and records the given parameters into metrics. No-op if m is nil.
+func (m *metricsSet) onAPIRequestDone(req *http.Request, res *http.Response, err error, latency time.Duration) {
+ if m == nil {
+ return
+ }
+
+ code := "unknown" // status_code label; stays "unknown" for errors other than the two context errors below
+ if err == nil {
+ code = fmt.Sprintf("%d", res.StatusCode) // res is only dereferenced when the round trip returned no error
+ } else {
+ switch {
+ case errors.Is(err, context.Canceled):
+ code = "ctx canceled"
+ case errors.Is(err, context.DeadlineExceeded):
+ code = "deadline exceeded"
+ }
+ }
+ if code == "unknown" {
+ klog.Warningf("Unexpected HTTP result: req %s %s, error: %v", req.Method, req.URL.Path, err) // log the error itself, not the response
+ }
+
+ endpoint := getEndpointForPath(req.Method, req.URL.Path)
+ m.requestLatencies.With(prometheus.Labels{"endpoint": endpoint, "status_code": code}).Observe(latency.Seconds())
+}
diff --git a/cloud/shepherd/equinix/wrapngo/wrapn.go b/cloud/shepherd/equinix/wrapngo/wrapn.go
index 3a0b96d..6680d9b 100644
--- a/cloud/shepherd/equinix/wrapngo/wrapn.go
+++ b/cloud/shepherd/equinix/wrapngo/wrapn.go
@@ -43,11 +43,13 @@
"flag"
"fmt"
"net/http"
+ "sync/atomic"
"time"
"github.com/cenkalti/backoff/v4"
"github.com/google/uuid"
"github.com/packethost/packngo"
+ "github.com/prometheus/client_golang/prometheus"
)
// Opts conveys configurable Client parameters.
@@ -74,6 +76,8 @@
//
// If not defined (ie. 0), defaults to 1.
Parallelism int
+
+ MetricsRegistry *prometheus.Registry
}
func (o *Opts) RegisterFlags() {
@@ -127,9 +131,48 @@
o *Opts
rlt *time.Ticker
- // serializer is a N-semaphore channel (configured by opts.Parallelism) which is
- // used to limit the number of concurrent calls to the Equinix API.
- serializer chan (struct{})
+ serializer *serializer
+ metrics *metricsSet
+}
+
+// serializer is an N-semaphore channel (configured by opts.Parallelism) which is
+// used to limit the number of concurrent calls to the Equinix API.
+//
+// In addition, it implements some simple waiting/usage statistics for
+// metrics/introspection.
+type serializer struct {
+ sem chan struct{} // buffered channel used as the semaphore: a send takes a slot, a receive frees one
+ usage int64 // number of slots currently held; only accessed through sync/atomic
+ waiting int64 // number of callers currently inside up(); only accessed through sync/atomic
+}
+
+// up blocks until the serializer has at least one available concurrent call
+// slot. If the given context expires before such a slot is available, the
+// context error is returned.
+func (s *serializer) up(ctx context.Context) error {
+ atomic.AddInt64(&s.waiting, 1) // counted as waiting for as long as the select below has not completed
+ select {
+ case s.sem <- struct{}{}: // slot acquired: move this caller from 'waiting' to 'usage'
+ atomic.AddInt64(&s.waiting, -1)
+ atomic.AddInt64(&s.usage, 1)
+ return nil
+ case <-ctx.Done(): // gave up without a slot: the caller must not call down()
+ atomic.AddInt64(&s.waiting, -1)
+ return ctx.Err()
+ }
+}
+
+// down releases a previously acquired concurrent call slot.
+func (s *serializer) down() {
+ atomic.AddInt64(&s.usage, -1) // the counter is decremented before the slot itself is handed back
+ <-s.sem // receiving one element frees a slot for a sender blocked in up()
+}
+
+// stats returns the number of in-flight and waiting-for-semaphore requests.
+func (s *serializer) stats() (usage, waiting int64) {
+ usage = atomic.LoadInt64(&s.usage) // the two loads are separate atomic reads, not one combined snapshot
+ waiting = atomic.LoadInt64(&s.waiting)
+ return
}
// New creates a Client instance based on Opts. PACKNGO_DEBUG environment
@@ -153,14 +196,22 @@
opts.Parallelism = 1
}
- return &client{
+ cl := &client{
username: opts.User,
token: opts.APIKey,
o: opts,
rlt: time.NewTicker(opts.APIRate),
- serializer: make(chan struct{}, opts.Parallelism),
+ serializer: &serializer{
+ sem: make(chan struct{}, opts.Parallelism),
+ },
}
+ if opts.MetricsRegistry != nil {
+ ms := newMetricsSet(cl.serializer)
+ opts.MetricsRegistry.MustRegister(ms.inFlight, ms.waiting, ms.requestLatencies)
+ cl.metrics = ms
+ }
+ return cl
}
func (c *client) Close() {