Mateusz Zalega | 6a058e7 | 2022-11-30 18:03:07 +0100 | [diff] [blame] | 1 | // Package wrapngo wraps packngo methods providing the following usability |
| 2 | // enhancements: |
| 3 | // - API call rate limiting |
| 4 | // - resource-aware call retries |
| 5 | // - use of a configurable back-off algorithm implementation |
| 6 | // - context awareness |
| 7 | // |
| 8 | // The implementation is provided with the following caveats: |
| 9 | // |
// The number of calls in flight is bounded by Opts.Parallelism (one by
// default). Concurrent calls to API-related methods of the same client beyond
// that limit will block. Calls returning packngo structs will return nil data
// when a non-nil error value is returned. An
| 13 | // os.ErrDeadlineExceeded will be returned after the underlying API calls time |
| 14 | // out beyond the chosen back-off algorithm implementation's maximum allowed |
| 15 | // retry interval. Other errors, excluding context.Canceled and |
| 16 | // context.DeadlineExceeded, indicate either an error originating at Equinix' |
| 17 | // API endpoint (which may still stem from invalid call inputs), or a network |
| 18 | // error. |
| 19 | // |
// Packngo wrappers included below may return timeout errors even after the
// wrapped calls succeed, in the event that the server reply could not be
// received.
| 23 | // |
| 24 | // This implies that effects of mutating calls can't always be verified |
| 25 | // atomically, requiring explicit synchronization between API users, regardless |
| 26 | // of the retry/recovery logic used. |
| 27 | // |
// Having that in mind, some call wrappers exposed by this package will attempt
// to recover from this kind of situation by requesting information on any
// resources created, and retrying the call if needed. This approach assumes
// any concurrent mutating API users will be synchronized, as they should be in
// any case.
| 33 | // |
// Another way of handling this problem would be to leave it up to the user to
// retry calls if needed, though this would leak Equinix Metal API details, and
// complicate implementations depending on this package. Due to that, the
// former approach was chosen.
| 38 | package wrapngo |
| 39 | |
| 40 | import ( |
| 41 | "context" |
| 42 | "errors" |
| 43 | "flag" |
| 44 | "fmt" |
| 45 | "net/http" |
Serge Bazanski | dea7cd0 | 2023-04-26 13:58:17 +0200 | [diff] [blame^] | 46 | "sync/atomic" |
Mateusz Zalega | 6a058e7 | 2022-11-30 18:03:07 +0100 | [diff] [blame] | 47 | "time" |
| 48 | |
| 49 | "github.com/cenkalti/backoff/v4" |
| 50 | "github.com/google/uuid" |
| 51 | "github.com/packethost/packngo" |
Serge Bazanski | dea7cd0 | 2023-04-26 13:58:17 +0200 | [diff] [blame^] | 52 | "github.com/prometheus/client_golang/prometheus" |
Mateusz Zalega | 6a058e7 | 2022-11-30 18:03:07 +0100 | [diff] [blame] | 53 | ) |
| 54 | |
// Opts conveys configurable Client parameters.
type Opts struct {
	// User and APIKey are the credentials used to authenticate with
	// Metal API.

	User   string
	APIKey string

	// Optional parameters:

	// BackOff controls the client's behavior in the event of API calls failing
	// due to IO timeouts by adjusting the lower bound on time taken between
	// subsequent calls.
	//
	// If nil, defaults to a factory returning backoff.NewExponentialBackOff().
	BackOff func() backoff.BackOff

	// APIRate is the minimum time taken between subsequent API calls.
	//
	// If not defined (ie. 0), defaults to two seconds.
	APIRate time.Duration

	// Parallelism defines how many calls to the Equinix API will be issued in
	// parallel. When this limit is reached, subsequent attempts to call the API will
	// block. The order of serving of pending calls is currently undefined.
	//
	// If not defined (ie. 0), defaults to 1. Note that RegisterFlags registers
	// the corresponding command line flag with a default of 3.
	Parallelism int

	// MetricsRegistry is optional. When non-nil, the client registers its
	// in-flight, waiting and request latency metrics with it on construction.
	MetricsRegistry *prometheus.Registry
}
| 82 | |
| 83 | func (o *Opts) RegisterFlags() { |
| 84 | flag.StringVar(&o.User, "equinix_api_username", "", "Username for Equinix API") |
| 85 | flag.StringVar(&o.APIKey, "equinix_api_key", "", "Key/token/password for Equinix API") |
Serge Bazanski | afd3cf8 | 2023-04-19 17:43:46 +0200 | [diff] [blame] | 86 | flag.IntVar(&o.Parallelism, "equinix_parallelism", 3, "How many parallel connections to the Equinix API will be allowed") |
Mateusz Zalega | 6a058e7 | 2022-11-30 18:03:07 +0100 | [diff] [blame] | 87 | } |
| 88 | |
// Client is a limited interface of methods that the Shepherd uses on Equinix. It
// is provided to allow for dependency injection of a fake equinix API for tests.
type Client interface {
	// GetDevice wraps packngo's cl.Devices.Get.
	//
	// TODO(q3k): remove unused pid parameter.
	GetDevice(ctx context.Context, pid, did string, opts *packngo.ListOptions) (*packngo.Device, error)
	// ListDevices wraps packngo's cl.Devices.List for project pid.
	ListDevices(ctx context.Context, pid string) ([]packngo.Device, error)
	// CreateDevice attempts to create a new device according to the provided
	// request. The request _must_ configure a HardwareReservationID. This call
	// attempts to be as idempotent as possible, and will return ErrRaceLost if a
	// retry was needed but in the meantime the requested hardware reservation from
	// which this machine was requested got lost.
	CreateDevice(ctx context.Context, request *packngo.DeviceCreateRequest) (*packngo.Device, error)

	// ListReservations returns a complete list of hardware reservations associated
	// with project pid. This is an expensive method that takes a while to execute,
	// handle with care.
	ListReservations(ctx context.Context, pid string) ([]packngo.HardwareReservation, error)

	// ListSSHKeys wraps packngo's cl.SSHKeys.List.
	ListSSHKeys(ctx context.Context) ([]packngo.SSHKey, error)
	// CreateSSHKey is idempotent - the key label can be used only once. Further
	// calls referring to the same label and key will not yield errors. See the
	// package comment for more info on this method's behavior and returned error
	// values.
	CreateSSHKey(ctx context.Context, req *packngo.SSHKeyCreateRequest) (*packngo.SSHKey, error)
	// UpdateSSHKey is idempotent - values included in req can be applied only once,
	// while subsequent updates using the same data don't produce errors. See the
	// package comment for information on this method's behavior and returned error
	// values.
	UpdateSSHKey(ctx context.Context, kid string, req *packngo.SSHKeyUpdateRequest) (*packngo.SSHKey, error)
	// RebootDevice wraps packngo's cl.Devices.Reboot for the device identified
	// by did.
	RebootDevice(ctx context.Context, did string) error

	// Close releases resources held by the client. The implementation in this
	// package stops its internal ticker.
	Close()
}
| 126 | |
// client implements the Client interface.
type client struct {
	// username and token are copied from Opts.User and Opts.APIKey when the
	// client is constructed.
	username string
	token    string
	// o points at the Opts the client was built from, after defaults have been
	// written into it.
	o *Opts
	// rlt is a ticker firing every Opts.APIRate. It is stopped by Close.
	// NOTE(review): presumably used to pace API calls - confirm against wrap.
	rlt *time.Ticker

	// serializer holds the semaphore sized by Opts.Parallelism, along with its
	// usage statistics.
	serializer *serializer
	// metrics is only populated when Opts.MetricsRegistry is non-nil.
	metrics *metricsSet
}
| 137 | |
// serializer is an N-semaphore channel (configured by opts.Parallelism) which is
// used to limit the number of concurrent calls to the Equinix API.
//
// In addition, it implements some simple waiting/usage statistics for
// metrics/introspection.
type serializer struct {
	// usage and waiting are accessed with the sync/atomic primitives and must
	// therefore be 64-bit aligned. On 32-bit platforms only the first word of
	// an allocated struct is guaranteed to be, so these two fields have to
	// stay at the very top of the struct; do not move pointer-sized fields
	// ahead of them.

	// usage counts the slots currently held (acquired by up, not yet released
	// by down).
	usage int64
	// waiting counts the callers currently blocked in up.
	waiting int64

	// sem is the counting semaphore itself: every buffered element is one
	// held slot, and its capacity is the concurrency limit.
	sem chan struct{}
}
| 148 | |
| 149 | // up blocks until the serializer has at least one available concurrent call |
| 150 | // slot. If the given context expires before such a slot is available, the |
| 151 | // context error is returned. |
| 152 | func (s *serializer) up(ctx context.Context) error { |
| 153 | atomic.AddInt64(&s.waiting, 1) |
| 154 | select { |
| 155 | case s.sem <- struct{}{}: |
| 156 | atomic.AddInt64(&s.waiting, -1) |
| 157 | atomic.AddInt64(&s.usage, 1) |
| 158 | return nil |
| 159 | case <-ctx.Done(): |
| 160 | atomic.AddInt64(&s.waiting, -1) |
| 161 | return ctx.Err() |
| 162 | } |
| 163 | } |
| 164 | |
| 165 | // down releases a previously acquire concurrent call slot. |
| 166 | func (s *serializer) down() { |
| 167 | atomic.AddInt64(&s.usage, -1) |
| 168 | <-s.sem |
| 169 | } |
| 170 | |
| 171 | // stats returns the number of in-flight and waiting-for-semaphore requests. |
| 172 | func (s *serializer) stats() (usage, waiting int64) { |
| 173 | usage = atomic.LoadInt64(&s.usage) |
| 174 | waiting = atomic.LoadInt64(&s.waiting) |
| 175 | return |
Mateusz Zalega | 6a058e7 | 2022-11-30 18:03:07 +0100 | [diff] [blame] | 176 | } |
| 177 | |
| 178 | // New creates a Client instance based on Opts. PACKNGO_DEBUG environment |
| 179 | // variable can be set prior to the below call to enable verbose packngo |
| 180 | // debug logs. |
| 181 | func New(opts *Opts) Client { |
| 182 | return new(opts) |
| 183 | } |
| 184 | |
| 185 | func new(opts *Opts) *client { |
| 186 | // Apply the defaults. |
| 187 | if opts.APIRate == 0 { |
| 188 | opts.APIRate = 2 * time.Second |
| 189 | } |
| 190 | if opts.BackOff == nil { |
| 191 | opts.BackOff = func() backoff.BackOff { |
| 192 | return backoff.NewExponentialBackOff() |
| 193 | } |
| 194 | } |
Serge Bazanski | 7448f28 | 2023-02-20 14:15:51 +0100 | [diff] [blame] | 195 | if opts.Parallelism == 0 { |
| 196 | opts.Parallelism = 1 |
| 197 | } |
Mateusz Zalega | 6a058e7 | 2022-11-30 18:03:07 +0100 | [diff] [blame] | 198 | |
Serge Bazanski | dea7cd0 | 2023-04-26 13:58:17 +0200 | [diff] [blame^] | 199 | cl := &client{ |
Mateusz Zalega | 6a058e7 | 2022-11-30 18:03:07 +0100 | [diff] [blame] | 200 | username: opts.User, |
| 201 | token: opts.APIKey, |
| 202 | o: opts, |
| 203 | rlt: time.NewTicker(opts.APIRate), |
| 204 | |
Serge Bazanski | dea7cd0 | 2023-04-26 13:58:17 +0200 | [diff] [blame^] | 205 | serializer: &serializer{ |
| 206 | sem: make(chan struct{}, opts.Parallelism), |
| 207 | }, |
Mateusz Zalega | 6a058e7 | 2022-11-30 18:03:07 +0100 | [diff] [blame] | 208 | } |
Serge Bazanski | dea7cd0 | 2023-04-26 13:58:17 +0200 | [diff] [blame^] | 209 | if opts.MetricsRegistry != nil { |
| 210 | ms := newMetricsSet(cl.serializer) |
| 211 | opts.MetricsRegistry.MustRegister(ms.inFlight, ms.waiting, ms.requestLatencies) |
| 212 | cl.metrics = ms |
| 213 | } |
| 214 | return cl |
Mateusz Zalega | 6a058e7 | 2022-11-30 18:03:07 +0100 | [diff] [blame] | 215 | } |
| 216 | |
| 217 | func (c *client) Close() { |
| 218 | c.rlt.Stop() |
| 219 | } |
| 220 | |
var (
	// ErrRaceLost is returned by CreateDevice when the requested hardware
	// reservation already has a device that does not carry this call's
	// witness tag, ie. it was provisioned by someone else.
	ErrRaceLost = errors.New("race lost with another API user")
	// ErrNoReservationProvided is returned by CreateDevice when the request
	// has an empty HardwareReservationID.
	ErrNoReservationProvided = errors.New("hardware reservation must be set")
)
| 225 | |
| 226 | func (e *client) CreateDevice(ctx context.Context, r *packngo.DeviceCreateRequest) (*packngo.Device, error) { |
| 227 | if r.HardwareReservationID == "" { |
| 228 | return nil, ErrNoReservationProvided |
| 229 | } |
| 230 | // Add a tag to the request to detect if someone snatches a hardware reservation |
| 231 | // from under us. |
| 232 | witnessTag := fmt.Sprintf("wrapngo-idempotency-%s", uuid.New().String()) |
| 233 | r.Tags = append(r.Tags, witnessTag) |
| 234 | |
| 235 | return wrap(ctx, e, func(cl *packngo.Client) (*packngo.Device, error) { |
| 236 | //Does the device already exist? |
| 237 | res, _, err := cl.HardwareReservations.Get(r.HardwareReservationID, nil) |
| 238 | if err != nil { |
| 239 | return nil, fmt.Errorf("couldn't check if device already exists: %w", err) |
| 240 | } |
| 241 | if res == nil { |
| 242 | return nil, fmt.Errorf("unexpected nil response") |
| 243 | } |
| 244 | if res.Device != nil { |
| 245 | // Check if we lost the race for this hardware reservation. |
| 246 | tags := make(map[string]bool) |
| 247 | for _, tag := range res.Device.Tags { |
| 248 | tags[tag] = true |
| 249 | } |
| 250 | if !tags[witnessTag] { |
| 251 | return nil, ErrRaceLost |
| 252 | } |
| 253 | return res.Device, nil |
| 254 | } |
| 255 | |
| 256 | // No device yet. Try to create it. |
| 257 | dev, _, err := cl.Devices.Create(r) |
| 258 | if err == nil { |
| 259 | return dev, nil |
| 260 | } |
| 261 | // In case of a transient failure (eg. network issue), we retry the whole |
| 262 | // operation, which means we first check again if the device already exists. If |
| 263 | // it's a permanent error from the API, the backoff logic will fail immediately. |
| 264 | return nil, fmt.Errorf("couldn't create device: %w", err) |
| 265 | }) |
| 266 | } |
| 267 | |
| 268 | func (e *client) ListDevices(ctx context.Context, pid string) ([]packngo.Device, error) { |
| 269 | return wrap(ctx, e, func(cl *packngo.Client) ([]packngo.Device, error) { |
Tim Windelschmidt | d1b1747 | 2023-04-18 03:49:12 +0200 | [diff] [blame] | 270 | // to increase the chances of a stable pagination, we sort the devices by hostname |
| 271 | res, _, err := cl.Devices.List(pid, &packngo.GetOptions{SortBy: "hostname"}) |
Mateusz Zalega | 6a058e7 | 2022-11-30 18:03:07 +0100 | [diff] [blame] | 272 | return res, err |
| 273 | }) |
| 274 | } |
| 275 | |
Serge Bazanski | 4969fd7 | 2023-04-19 17:43:12 +0200 | [diff] [blame] | 276 | func (e *client) GetDevice(ctx context.Context, pid, did string, opts *packngo.ListOptions) (*packngo.Device, error) { |
Mateusz Zalega | 6a058e7 | 2022-11-30 18:03:07 +0100 | [diff] [blame] | 277 | return wrap(ctx, e, func(cl *packngo.Client) (*packngo.Device, error) { |
Serge Bazanski | 4969fd7 | 2023-04-19 17:43:12 +0200 | [diff] [blame] | 278 | d, _, err := cl.Devices.Get(did, opts) |
Mateusz Zalega | 6a058e7 | 2022-11-30 18:03:07 +0100 | [diff] [blame] | 279 | return d, err |
| 280 | }) |
| 281 | } |
| 282 | |
| 283 | // Currently unexported, only used in tests. |
| 284 | func (e *client) deleteDevice(ctx context.Context, did string) error { |
| 285 | _, err := wrap(ctx, e, func(cl *packngo.Client) (*struct{}, error) { |
| 286 | _, err := cl.Devices.Delete(did, false) |
| 287 | if httpStatusCode(err) == http.StatusNotFound { |
| 288 | // 404s may pop up as an after effect of running the back-off |
| 289 | // algorithm, and as such should not be propagated. |
| 290 | return nil, nil |
| 291 | } |
| 292 | return nil, err |
| 293 | }) |
| 294 | return err |
| 295 | } |
| 296 | |
| 297 | func (e *client) ListReservations(ctx context.Context, pid string) ([]packngo.HardwareReservation, error) { |
| 298 | return wrap(ctx, e, func(cl *packngo.Client) ([]packngo.HardwareReservation, error) { |
| 299 | res, _, err := cl.HardwareReservations.List(pid, nil) |
| 300 | return res, err |
| 301 | }) |
| 302 | } |
| 303 | |
| 304 | func (e *client) CreateSSHKey(ctx context.Context, r *packngo.SSHKeyCreateRequest) (*packngo.SSHKey, error) { |
| 305 | return wrap(ctx, e, func(cl *packngo.Client) (*packngo.SSHKey, error) { |
| 306 | // Does the key already exist? |
| 307 | ks, _, err := cl.SSHKeys.List() |
| 308 | if err != nil { |
| 309 | return nil, fmt.Errorf("SSHKeys.List: %w", err) |
| 310 | } |
| 311 | for _, k := range ks { |
| 312 | if k.Label == r.Label { |
| 313 | if k.Key != r.Key { |
| 314 | return nil, fmt.Errorf("key label already in use for a different key") |
| 315 | } |
| 316 | return &k, nil |
| 317 | } |
| 318 | } |
| 319 | |
| 320 | // No key yet. Try to create it. |
| 321 | k, _, err := cl.SSHKeys.Create(r) |
| 322 | if err != nil { |
| 323 | return nil, fmt.Errorf("SSHKeys.Create: %w", err) |
| 324 | } |
| 325 | return k, nil |
| 326 | }) |
| 327 | } |
| 328 | |
| 329 | func (e *client) UpdateSSHKey(ctx context.Context, id string, r *packngo.SSHKeyUpdateRequest) (*packngo.SSHKey, error) { |
| 330 | return wrap(ctx, e, func(cl *packngo.Client) (*packngo.SSHKey, error) { |
| 331 | k, _, err := cl.SSHKeys.Update(id, r) |
| 332 | if err != nil { |
| 333 | return nil, fmt.Errorf("SSHKeys.Update: %w", err) |
| 334 | } |
| 335 | return k, err |
| 336 | }) |
| 337 | } |
| 338 | |
| 339 | // Currently unexported, only used in tests. |
| 340 | func (e *client) deleteSSHKey(ctx context.Context, id string) error { |
| 341 | _, err := wrap(ctx, e, func(cl *packngo.Client) (struct{}, error) { |
| 342 | _, err := cl.SSHKeys.Delete(id) |
| 343 | if err != nil { |
| 344 | return struct{}{}, fmt.Errorf("SSHKeys.Delete: %w", err) |
| 345 | } |
| 346 | return struct{}{}, err |
| 347 | }) |
| 348 | return err |
| 349 | } |
| 350 | |
| 351 | func (e *client) ListSSHKeys(ctx context.Context) ([]packngo.SSHKey, error) { |
| 352 | return wrap(ctx, e, func(cl *packngo.Client) ([]packngo.SSHKey, error) { |
| 353 | ks, _, err := cl.SSHKeys.List() |
| 354 | if err != nil { |
| 355 | return nil, fmt.Errorf("SSHKeys.List: %w", err) |
| 356 | } |
| 357 | return ks, nil |
| 358 | }) |
| 359 | } |
| 360 | |
| 361 | // Currently unexported, only used in tests. |
| 362 | func (e *client) getSSHKey(ctx context.Context, id string) (*packngo.SSHKey, error) { |
| 363 | return wrap(ctx, e, func(cl *packngo.Client) (*packngo.SSHKey, error) { |
| 364 | k, _, err := cl.SSHKeys.Get(id, nil) |
| 365 | if err != nil { |
| 366 | return nil, fmt.Errorf("SSHKeys.Get: %w", err) |
| 367 | } |
| 368 | return k, nil |
| 369 | }) |
| 370 | } |
Serge Bazanski | ae00468 | 2023-04-18 13:28:48 +0200 | [diff] [blame] | 371 | |
| 372 | func (e *client) RebootDevice(ctx context.Context, did string) error { |
| 373 | _, err := wrap(ctx, e, func(cl *packngo.Client) (struct{}, error) { |
| 374 | _, err := cl.Devices.Reboot(did) |
| 375 | return struct{}{}, err |
| 376 | }) |
| 377 | return err |
| 378 | } |