blob: 7bd4522b2208674c9e3029e48b8f155167307a9e [file] [log] [blame]
Mateusz Zalega6a058e72022-11-30 18:03:07 +01001// Package wrapngo wraps packngo methods providing the following usability
2// enhancements:
3// - API call rate limiting
4// - resource-aware call retries
5// - use of a configurable back-off algorithm implementation
6// - context awareness
7//
8// The implementation is provided with the following caveats:
9//
// By default there can be only one call in flight; Opts.Parallelism raises
// this limit. Calls to API-related methods of the same client made while the
// limit is reached will block until a slot frees up. Calls returning packngo
// structs will return nil data when a non-nil error value is returned. An
// os.ErrDeadlineExceeded will be returned after the underlying API calls time
// out beyond the chosen back-off algorithm implementation's maximum allowed
// retry interval. Other errors, excluding context.Canceled and
// context.DeadlineExceeded, indicate either an error originating at Equinix's
// API endpoint (which may still stem from invalid call inputs), or a network
// error.
19//
// Packngo wrappers included below may return timeout errors even after the
// wrapped calls succeed, in the event that the server's reply could not be
// received.
23//
24// This implies that effects of mutating calls can't always be verified
25// atomically, requiring explicit synchronization between API users, regardless
26// of the retry/recovery logic used.
27//
// With that in mind, some call wrappers exposed by this package will attempt
// to recover from these kinds of situations by requesting information on any
// resources created, and retrying the call if needed. This approach assumes
// any concurrent mutating API users will be synchronized, as they should be in
// any case.
33//
// Another way of handling this problem would be to leave it up to the user to
// retry calls if needed, though this would leak Equinix Metal API details and
// complicate implementations depending on this package. Due to that, the
// former approach was chosen.
38package wrapngo
39
40import (
41 "context"
42 "errors"
43 "flag"
44 "fmt"
45 "net/http"
Serge Bazanskidea7cd02023-04-26 13:58:17 +020046 "sync/atomic"
Mateusz Zalega6a058e72022-11-30 18:03:07 +010047 "time"
48
49 "github.com/cenkalti/backoff/v4"
50 "github.com/google/uuid"
51 "github.com/packethost/packngo"
Serge Bazanskidea7cd02023-04-26 13:58:17 +020052 "github.com/prometheus/client_golang/prometheus"
Mateusz Zalega6a058e72022-11-30 18:03:07 +010053)
54
55// Opts conveys configurable Client parameters.
56type Opts struct {
57 // User and APIKey are the credentials used to authenticate with
58 // Metal API.
59
60 User string
61 APIKey string
62
63 // Optional parameters:
64
65 // BackOff controls the client's behavior in the event of API calls failing
66 // due to IO timeouts by adjusting the lower bound on time taken between
67 // subsequent calls.
68 BackOff func() backoff.BackOff
69
70 // APIRate is the minimum time taken between subsequent API calls.
71 APIRate time.Duration
Serge Bazanski7448f282023-02-20 14:15:51 +010072
73 // Parallelism defines how many calls to the Equinix API will be issued in
74 // parallel. When this limit is reached, subsequent attmepts to call the API will
75 // block. The order of serving of pending calls is currently undefined.
76 //
77 // If not defined (ie. 0), defaults to 1.
78 Parallelism int
Serge Bazanskidea7cd02023-04-26 13:58:17 +020079
80 MetricsRegistry *prometheus.Registry
Mateusz Zalega6a058e72022-11-30 18:03:07 +010081}
82
83func (o *Opts) RegisterFlags() {
84 flag.StringVar(&o.User, "equinix_api_username", "", "Username for Equinix API")
85 flag.StringVar(&o.APIKey, "equinix_api_key", "", "Key/token/password for Equinix API")
Serge Bazanskiafd3cf82023-04-19 17:43:46 +020086 flag.IntVar(&o.Parallelism, "equinix_parallelism", 3, "How many parallel connections to the Equinix API will be allowed")
Mateusz Zalega6a058e72022-11-30 18:03:07 +010087}
88
89// Client is a limited interface of methods that the Shepherd uses on Equinix. It
90// is provided to allow for dependency injection of a fake equinix API for tests.
91type Client interface {
92 // GetDevice wraps packngo's cl.Devices.Get.
Serge Bazanski4969fd72023-04-19 17:43:12 +020093 //
94 // TODO(q3k): remove unused pid parameter.
95 GetDevice(ctx context.Context, pid, did string, opts *packngo.ListOptions) (*packngo.Device, error)
Mateusz Zalega6a058e72022-11-30 18:03:07 +010096 // ListDevices wraps packngo's cl.Device.List.
97 ListDevices(ctx context.Context, pid string) ([]packngo.Device, error)
98 // CreateDevice attempts to create a new device according to the provided
99 // request. The request _must_ configure a HardwareReservationID. This call
100 // attempts to be as idempotent as possible, and will return ErrRaceLost if a
101 // retry was needed but in the meantime the requested hardware reservation from
102 // which this machine was requested got lost.
103 CreateDevice(ctx context.Context, request *packngo.DeviceCreateRequest) (*packngo.Device, error)
Tim Windelschmidt72a903f2023-06-27 15:49:36 +0200104
105 UpdateDevice(ctx context.Context, id string, request *packngo.DeviceUpdateRequest) (*packngo.Device, error)
Tim Windelschmidt8180de92023-05-11 19:45:37 +0200106 RebootDevice(ctx context.Context, did string) error
107 DeleteDevice(ctx context.Context, id string) error
Mateusz Zalega6a058e72022-11-30 18:03:07 +0100108
109 // ListReservations returns a complete list of hardware reservations associated
110 // with project pid. This is an expensive method that takes a while to execute,
111 // handle with care.
112 ListReservations(ctx context.Context, pid string) ([]packngo.HardwareReservation, error)
Tim Windelschmidt8180de92023-05-11 19:45:37 +0200113 // MoveReservation moves a reserved device to the given project.
114 MoveReservation(ctx context.Context, hardwareReservationDID, projectID string) (*packngo.HardwareReservation, error)
Mateusz Zalega6a058e72022-11-30 18:03:07 +0100115
116 // ListSSHKeys wraps packngo's cl.Keys.List.
117 ListSSHKeys(ctx context.Context) ([]packngo.SSHKey, error)
118 // CreateSSHKey is idempotent - the key label can be used only once. Further
119 // calls referring to the same label and key will not yield errors. See the
120 // package comment for more info on this method's behavior and returned error
121 // values.
122 CreateSSHKey(ctx context.Context, req *packngo.SSHKeyCreateRequest) (*packngo.SSHKey, error)
123 // UpdateSSHKey is idempotent - values included in r can be applied only once,
124 // while subsequent updates using the same data don't produce errors. See the
125 // package comment for information on this method's behavior and returned error
126 // values.
127 UpdateSSHKey(ctx context.Context, kid string, req *packngo.SSHKeyUpdateRequest) (*packngo.SSHKey, error)
128
129 Close()
130}
131
132// client implements the Client interface.
133type client struct {
134 username string
135 token string
136 o *Opts
137 rlt *time.Ticker
138
Serge Bazanskidea7cd02023-04-26 13:58:17 +0200139 serializer *serializer
140 metrics *metricsSet
141}
142
143// serializer is an N-semaphore channel (configured by opts.Parallelism) which is
144// used to limit the number of concurrent calls to the Equinix API.
145//
146// In addition, it implements some simple waiting/usage statistics for
147// metrics/introspection.
148type serializer struct {
149 sem chan struct{}
150 usage int64
151 waiting int64
152}
153
154// up blocks until the serializer has at least one available concurrent call
155// slot. If the given context expires before such a slot is available, the
156// context error is returned.
157func (s *serializer) up(ctx context.Context) error {
158 atomic.AddInt64(&s.waiting, 1)
159 select {
160 case s.sem <- struct{}{}:
161 atomic.AddInt64(&s.waiting, -1)
162 atomic.AddInt64(&s.usage, 1)
163 return nil
164 case <-ctx.Done():
165 atomic.AddInt64(&s.waiting, -1)
166 return ctx.Err()
167 }
168}
169
170// down releases a previously acquire concurrent call slot.
171func (s *serializer) down() {
172 atomic.AddInt64(&s.usage, -1)
173 <-s.sem
174}
175
176// stats returns the number of in-flight and waiting-for-semaphore requests.
177func (s *serializer) stats() (usage, waiting int64) {
178 usage = atomic.LoadInt64(&s.usage)
179 waiting = atomic.LoadInt64(&s.waiting)
180 return
Mateusz Zalega6a058e72022-11-30 18:03:07 +0100181}
182
183// New creates a Client instance based on Opts. PACKNGO_DEBUG environment
184// variable can be set prior to the below call to enable verbose packngo
185// debug logs.
186func New(opts *Opts) Client {
187 return new(opts)
188}
189
// new is the implementation of New. It returns the concrete *client so that
// package-internal callers can reach methods that are not part of Client.
//
// Defaults are written into opts itself, so the caller's Opts is modified.
// Non-positive APIRate and Parallelism values are treated as unset: passing
// them through would make time.NewTicker and make(chan, n) panic.
//
// If opts.MetricsRegistry is set, the client's metrics are registered with it
// via MustRegister, which panics on registration failure.
func new(opts *Opts) *client {
	// Apply the defaults.
	if opts.APIRate <= 0 {
		opts.APIRate = 2 * time.Second
	}
	if opts.BackOff == nil {
		opts.BackOff = func() backoff.BackOff {
			return backoff.NewExponentialBackOff()
		}
	}
	if opts.Parallelism <= 0 {
		opts.Parallelism = 1
	}

	cl := &client{
		username: opts.User,
		token:    opts.APIKey,
		o:        opts,
		rlt:      time.NewTicker(opts.APIRate),

		serializer: &serializer{
			sem: make(chan struct{}, opts.Parallelism),
		},
	}
	if opts.MetricsRegistry != nil {
		ms := newMetricsSet(cl.serializer)
		opts.MetricsRegistry.MustRegister(ms.inFlight, ms.waiting, ms.requestLatencies)
		cl.metrics = ms
	}
	return cl
}
221
222func (c *client) Close() {
223 c.rlt.Stop()
224}
225
226var (
227 ErrRaceLost = errors.New("race lost with another API user")
228 ErrNoReservationProvided = errors.New("hardware reservation must be set")
229)
230
// PowerOffDevice powers off the device with ID did through packngo's
// cl.Devices.PowerOff. Errors from packngo are wrapped with the call name.
//
// The parameter was previously named pid, although it is a device ID; pid
// denotes a project ID everywhere else in this package.
func (e *client) PowerOffDevice(ctx context.Context, did string) error {
	_, err := wrap(ctx, e, func(p *packngo.Client) (*packngo.Response, error) {
		r, err := p.Devices.PowerOff(did)
		if err != nil {
			return nil, fmt.Errorf("Devices.PowerOff: %w", err)
		}
		return r, nil
	})
	return err
}
241
// PowerOnDevice powers on the device with ID did through packngo's
// cl.Devices.PowerOn. Errors from packngo are wrapped with the call name.
//
// The parameter was previously named pid, although it is a device ID; pid
// denotes a project ID everywhere else in this package.
func (e *client) PowerOnDevice(ctx context.Context, did string) error {
	_, err := wrap(ctx, e, func(p *packngo.Client) (*packngo.Response, error) {
		r, err := p.Devices.PowerOn(did)
		if err != nil {
			return nil, fmt.Errorf("Devices.PowerOn: %w", err)
		}
		return r, nil
	})
	return err
}
252
// DeleteDevice deletes the device with the given id through packngo's
// cl.Devices.Delete, without forcing the deletion.
//
// A 404 response is treated as success. As the package comment explains, a
// call may be retried after an earlier attempt already succeeded but its reply
// was lost. A retried delete then finds the device gone, which must not be
// reported as a failure. This matches the handling in deleteDevice. As a
// consequence, deleting a device ID that never existed also succeeds.
func (e *client) DeleteDevice(ctx context.Context, id string) error {
	_, err := wrap(ctx, e, func(p *packngo.Client) (*packngo.Response, error) {
		r, err := p.Devices.Delete(id, false)
		if err != nil {
			if httpStatusCode(err) == http.StatusNotFound {
				return nil, nil
			}
			return nil, fmt.Errorf("Devices.Delete: %w", err)
		}
		return r, nil
	})
	return err
}
263
// CreateDevice implements Client.CreateDevice.
//
// A unique witness tag is attached to the device being created. Every attempt,
// including retries of the whole operation, first looks up the hardware
// reservation:
//   - if it holds a device carrying the witness tag, an earlier attempt
//     succeeded (possibly with its reply lost) and that device is returned;
//   - if it holds a device without the tag, another API user claimed the
//     reservation and ErrRaceLost is returned;
//   - otherwise, the device is created.
//
// The caller's request is left unmodified. The witness tag is added to a
// shallow copy of r whose Tags slice is freshly allocated. Tags therefore
// neither accumulate across calls reusing the same request, nor get appended
// into a backing array the caller may share.
func (e *client) CreateDevice(ctx context.Context, r *packngo.DeviceCreateRequest) (*packngo.Device, error) {
	if r.HardwareReservationID == "" {
		return nil, ErrNoReservationProvided
	}
	// Add a tag to the request to detect if someone snatches a hardware
	// reservation from under us.
	witnessTag := fmt.Sprintf("wrapngo-idempotency-%s", uuid.New().String())
	req := *r
	req.Tags = make([]string, 0, len(r.Tags)+1)
	req.Tags = append(req.Tags, r.Tags...)
	req.Tags = append(req.Tags, witnessTag)

	return wrap(ctx, e, func(cl *packngo.Client) (*packngo.Device, error) {
		// Does the device already exist?
		// NOTE(review): ListReservations explicitly requests Includes "device";
		// confirm that Get without options populates res.Device.Tags, otherwise
		// a retry after a successful create would report ErrRaceLost.
		res, _, err := cl.HardwareReservations.Get(req.HardwareReservationID, nil)
		if err != nil {
			return nil, fmt.Errorf("couldn't check if device already exists: %w", err)
		}
		if res == nil {
			return nil, fmt.Errorf("unexpected nil response")
		}
		if res.Device != nil {
			// The device is ours only if it carries our witness tag; otherwise
			// we lost the race for this hardware reservation.
			for _, tag := range res.Device.Tags {
				if tag == witnessTag {
					return res.Device, nil
				}
			}
			return nil, ErrRaceLost
		}

		// No device yet. Try to create it.
		dev, _, err := cl.Devices.Create(&req)
		if err == nil {
			return dev, nil
		}
		// In case of a transient failure (eg. network issue), we retry the whole
		// operation, which means we first check again if the device already exists. If
		// it's a permanent error from the API, the backoff logic will fail immediately.
		return nil, fmt.Errorf("couldn't create device: %w", err)
	})
}
305
// UpdateDevice applies request r to the device id via packngo's
// cl.Devices.Update. The packngo result and error are returned as-is.
func (e *client) UpdateDevice(ctx context.Context, id string, r *packngo.DeviceUpdateRequest) (*packngo.Device, error) {
	update := func(cl *packngo.Client) (*packngo.Device, error) {
		updated, _, err := cl.Devices.Update(id, r)
		return updated, err
	}
	return wrap(ctx, e, update)
}
312
// ListDevices lists all devices of project pid via packngo's cl.Devices.List.
// The packngo result and error are returned as-is.
func (e *client) ListDevices(ctx context.Context, pid string) ([]packngo.Device, error) {
	list := func(cl *packngo.Client) ([]packngo.Device, error) {
		// Sorting by hostname increases the chances of a stable pagination.
		// The options are rebuilt on every attempt.
		opts := &packngo.GetOptions{SortBy: "hostname"}
		devices, _, err := cl.Devices.List(pid, opts)
		return devices, err
	}
	return wrap(ctx, e, list)
}
320
// GetDevice fetches the device did via packngo's cl.Devices.Get, passing opts
// through. pid is currently unused. The packngo result and error are returned
// as-is.
func (e *client) GetDevice(ctx context.Context, pid, did string, opts *packngo.ListOptions) (*packngo.Device, error) {
	get := func(cl *packngo.Client) (*packngo.Device, error) {
		device, _, err := cl.Devices.Get(did, opts)
		return device, err
	}
	return wrap(ctx, e, get)
}
327
// deleteDevice deletes the device did (without forcing), treating a 404 as
// success.
//
// Currently unexported, only used in tests.
func (e *client) deleteDevice(ctx context.Context, did string) error {
	_, err := wrap(ctx, e, func(cl *packngo.Client) (*struct{}, error) {
		_, delErr := cl.Devices.Delete(did, false)
		switch {
		case httpStatusCode(delErr) == http.StatusNotFound:
			// 404s may pop up as an after effect of running the back-off
			// algorithm, and as such should not be propagated.
			return nil, nil
		default:
			return nil, delErr
		}
	})
	return err
}
341
// ListReservations lists all hardware reservations of project pid via
// packngo's cl.HardwareReservations.List. The packngo result and error are
// returned as-is.
func (e *client) ListReservations(ctx context.Context, pid string) ([]packngo.HardwareReservation, error) {
	list := func(cl *packngo.Client) ([]packngo.HardwareReservation, error) {
		// Ask for each reservation's facility and device to be included.
		// The options are rebuilt on every attempt.
		opts := &packngo.ListOptions{Includes: []string{"facility", "device"}}
		reservations, _, err := cl.HardwareReservations.List(pid, opts)
		return reservations, err
	}
	return wrap(ctx, e, list)
}
348
Tim Windelschmidt8180de92023-05-11 19:45:37 +0200349func (e *client) MoveReservation(ctx context.Context, hardwareReservationDID, projectID string) (*packngo.HardwareReservation, error) {
350 return wrap(ctx, e, func(cl *packngo.Client) (*packngo.HardwareReservation, error) {
351 hr, _, err := cl.HardwareReservations.Move(hardwareReservationDID, projectID)
352 if err != nil {
353 return nil, fmt.Errorf("HardwareReservations.Move: %w", err)
354 }
355 return hr, err
356 })
357}
358
Mateusz Zalega6a058e72022-11-30 18:03:07 +0100359func (e *client) CreateSSHKey(ctx context.Context, r *packngo.SSHKeyCreateRequest) (*packngo.SSHKey, error) {
360 return wrap(ctx, e, func(cl *packngo.Client) (*packngo.SSHKey, error) {
361 // Does the key already exist?
362 ks, _, err := cl.SSHKeys.List()
363 if err != nil {
364 return nil, fmt.Errorf("SSHKeys.List: %w", err)
365 }
366 for _, k := range ks {
367 if k.Label == r.Label {
368 if k.Key != r.Key {
369 return nil, fmt.Errorf("key label already in use for a different key")
370 }
371 return &k, nil
372 }
373 }
374
375 // No key yet. Try to create it.
376 k, _, err := cl.SSHKeys.Create(r)
377 if err != nil {
378 return nil, fmt.Errorf("SSHKeys.Create: %w", err)
379 }
380 return k, nil
381 })
382}
383
// UpdateSSHKey applies r to the SSH key id via packngo's cl.SSHKeys.Update.
// Errors are wrapped with the call name.
func (e *client) UpdateSSHKey(ctx context.Context, id string, r *packngo.SSHKeyUpdateRequest) (*packngo.SSHKey, error) {
	return wrap(ctx, e, func(cl *packngo.Client) (*packngo.SSHKey, error) {
		updated, _, err := cl.SSHKeys.Update(id, r)
		if err == nil {
			return updated, nil
		}
		return nil, fmt.Errorf("SSHKeys.Update: %w", err)
	})
}
393
394// Currently unexported, only used in tests.
395func (e *client) deleteSSHKey(ctx context.Context, id string) error {
396 _, err := wrap(ctx, e, func(cl *packngo.Client) (struct{}, error) {
397 _, err := cl.SSHKeys.Delete(id)
398 if err != nil {
399 return struct{}{}, fmt.Errorf("SSHKeys.Delete: %w", err)
400 }
401 return struct{}{}, err
402 })
403 return err
404}
405
// ListSSHKeys lists the SSH keys visible to the client via packngo's
// cl.SSHKeys.List. Errors are wrapped with the call name.
func (e *client) ListSSHKeys(ctx context.Context) ([]packngo.SSHKey, error) {
	return wrap(ctx, e, func(cl *packngo.Client) ([]packngo.SSHKey, error) {
		keys, _, err := cl.SSHKeys.List()
		if err == nil {
			return keys, nil
		}
		return nil, fmt.Errorf("SSHKeys.List: %w", err)
	})
}
415
// getSSHKey fetches the SSH key id via packngo's cl.SSHKeys.Get.
//
// Currently unexported, only used in tests.
func (e *client) getSSHKey(ctx context.Context, id string) (*packngo.SSHKey, error) {
	return wrap(ctx, e, func(cl *packngo.Client) (*packngo.SSHKey, error) {
		key, _, err := cl.SSHKeys.Get(id, nil)
		if err == nil {
			return key, nil
		}
		return nil, fmt.Errorf("SSHKeys.Get: %w", err)
	})
}
Serge Bazanskiae004682023-04-18 13:28:48 +0200426
// RebootDevice reboots the device did via packngo's cl.Devices.Reboot. The
// packngo error, if any, is returned unwrapped.
func (e *client) RebootDevice(ctx context.Context, did string) error {
	reboot := func(cl *packngo.Client) (struct{}, error) {
		_, err := cl.Devices.Reboot(did)
		return struct{}{}, err
	}
	_, err := wrap(ctx, e, reboot)
	return err
}