cloud: split shepherd up
Change-Id: I8e386d9eaaf17543743e1e8a37a8d71426910d59
Reviewed-on: https://review.monogon.dev/c/monogon/+/2213
Reviewed-by: Serge Bazanski <serge@monogon.tech>
Tested-by: Jenkins CI
diff --git a/cloud/shepherd/provider/equinix/updater.go b/cloud/shepherd/provider/equinix/updater.go
new file mode 100644
index 0000000..b053f26
--- /dev/null
+++ b/cloud/shepherd/provider/equinix/updater.go
@@ -0,0 +1,263 @@
+package main
+
+import (
+ "context"
+ "database/sql"
+ "errors"
+ "flag"
+ "fmt"
+ "time"
+
+ "github.com/packethost/packngo"
+ "k8s.io/klog/v2"
+
+ "source.monogon.dev/cloud/bmaas/bmdb"
+ "source.monogon.dev/cloud/bmaas/bmdb/metrics"
+ "source.monogon.dev/cloud/bmaas/bmdb/model"
+ ecl "source.monogon.dev/cloud/equinix/wrapngo"
+ "source.monogon.dev/cloud/lib/sinbin"
+)
+
+// UpdaterConfig holds the flag-configurable settings of the Updater.
+type UpdaterConfig struct {
+	// Enable starts the updater; when false, Updater.Run returns immediately.
+	Enable bool
+	// IterationRate is the minimum time taken between subsequent iterations of the updater.
+	IterationRate time.Duration
+}
+
+func (c *UpdaterConfig) RegisterFlags() {
+	flag.BoolVar(&c.Enable, "updater_enable", true, "Enable the updater, which periodically scans equinix machines and updates their status in the BMDB")
+	flag.DurationVar(&c.IterationRate, "updater_iteration_rate", time.Minute, "Rate limiting for updater iteration loop")
+}
+
+// Updater periodically scans all machines backed by the Equinix provider and
+// updates their Provided status fields based on data retrieved from the Equinix API.
+type Updater struct {
+	config *UpdaterConfig
+	// sinbin tracks machines (by provider ID) that are skipped until their penalty expires.
+	sinbin sinbin.Sinbin[string]
+
+	cl ecl.Client
+}
+
+// New creates an Updater that uses this config and talks to the Equinix API
+// through cl. The returned error is currently always nil.
+func (c *UpdaterConfig) New(cl ecl.Client) (*Updater, error) {
+	u := &Updater{config: c, cl: cl}
+	return u, nil
+}
+
+// Run repeatedly runs updater iterations until ctx is done or a BMDB session cannot be
+// started. It returns nil right away if the updater is disabled.
+func (u *Updater) Run(ctx context.Context, conn *bmdb.Connection) error {
+	if !u.config.Enable {
+		return nil
+	}
+	var sess *bmdb.Session
+	var err error
+	for {
+		if sess == nil {
+			if sess, err = conn.StartSession(ctx, bmdb.SessionOption{Processor: metrics.ProcessorShepherdUpdater}); err != nil {
+				return fmt.Errorf("could not start BMDB session: %w", err)
+			}
+		}
+		limit := time.After(u.config.IterationRate)
+		err = u.runInSession(ctx, sess)
+		switch {
+		case err == nil: // Wait out the rest of IterationRate; failures back off for a second.
+		case errors.Is(err, ctx.Err()):
+			return err
+		case errors.Is(err, bmdb.ErrSessionExpired):
+			klog.Errorf("Session expired, restarting...")
+			sess, limit = nil, time.After(time.Second)
+		default:
+			klog.Errorf("Processing failed: %v", err)
+			limit = time.After(time.Second) // TODO(q3k): close session
+		}
+		select { // Unlike a bare receive or time.Sleep, this also honors ctx cancellation.
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-limit:
+		}
+	}
+}
+
+// applyNullStringUpdate returns true if 'up' supersedes 'cur', that is when 'up'
+// is valid and 'cur' is either invalid or holds a different string; 'up' is then
+// left untouched. Otherwise it returns false and resets 'up' to the zero
+// NullString (invalid, empty string). 'cur' is never modified.
+func applyNullStringUpdate(up, cur *sql.NullString) bool {
+	switch {
+	case !up.Valid:
+		// Nothing was observed for this field.
+	case !cur.Valid, cur.String != up.String:
+		return true
+	}
+	// Nothing observed, or the observed value matches the current one.
+	*up = sql.NullString{}
+	return false
+}
+
+// applyNullProviderStatusUpdate returns true if 'up' supersedes 'cur', that is
+// when 'up' is valid and 'cur' is either invalid or holds a different status;
+// 'up' is then left untouched. Otherwise it returns false and resets 'up' to an
+// invalid value carrying model.ProviderStatusUnknown. 'cur' is never modified.
+func applyNullProviderStatusUpdate(up, cur *model.NullProviderStatus) bool {
+	switch {
+	case !up.Valid: // Nothing was observed for this status.
+	case !cur.Valid, cur.ProviderStatus != up.ProviderStatus:
+		return true
+	}
+	// Nothing observed, or the observed status matches the current one.
+	up.Valid = false
+	up.ProviderStatus = model.ProviderStatusUnknown
+	return false
+}
+
+// applyUpdate returns true if any field of 'up' supersedes the matching field of 'cur'.
+// Every field of 'up' that does not is zeroed out, even when another field does: each
+// helper is called unconditionally, as `res || apply(...)` would skip the zeroing.
+func applyUpdate(up *model.MachineUpdateProviderStatusParams, cur *model.MachineProvided) bool {
+	res := applyNullStringUpdate(&up.ProviderReservationID, &cur.ProviderReservationID)
+	res = applyNullStringUpdate(&up.ProviderIpAddress, &cur.ProviderIpAddress) || res
+	res = applyNullStringUpdate(&up.ProviderLocation, &cur.ProviderLocation) || res
+	res = applyNullProviderStatusUpdate(&up.ProviderStatus, &cur.ProviderStatus) || res
+	return res
+}
+
+// updateLog logs each field of 'up' that is still set (Valid), as left by applyUpdate.
+func updateLog(up *model.MachineUpdateProviderStatusParams) {
+	if r := up.ProviderReservationID; r.Valid {
+		klog.Infof(" Machine %s: new reservation ID %s", up.ProviderID, r.String)
+	}
+	if a := up.ProviderIpAddress; a.Valid {
+		klog.Infof(" Machine %s: new IP address %s", up.ProviderID, a.String)
+	}
+	if l := up.ProviderLocation; l.Valid {
+		klog.Infof(" Machine %s: new location %s", up.ProviderID, l.String)
+	}
+	if s := up.ProviderStatus; s.Valid {
+		klog.Infof(" Machine %s: new status %s", up.ProviderID, s.ProviderStatus)
+	}
+}
+
+// runInSession runs one updater iteration: every Equinix-provided BMDB machine that is
+// not sinbinned is re-fetched from the Equinix API and changed data is written back.
+// It returns ctx.Err() as soon as ctx is done instead of failing every remaining call.
+func (u *Updater) runInSession(ctx context.Context, sess *bmdb.Session) error {
+	// Get all machines in the BMDB that are provided by Equinix.
+	// TODO(q3k): do not load all machines into memory.
+	var machines []model.MachineProvided
+	err := sess.Transact(ctx, func(q *model.Queries) error {
+		var err error
+		machines, err = q.GetProvidedMachines(ctx, model.ProviderEquinix)
+		return err
+	})
+	if err != nil {
+		return fmt.Errorf("when fetching provided machines: %w", err)
+	}
+
+	// Limit how many machines we check by timing them out if they're likely to not
+	// get updated soon.
+	check := make([]model.MachineProvided, 0, len(machines))
+	for _, m := range machines {
+		if !u.sinbin.Penalized(m.ProviderID) {
+			check = append(check, m)
+		}
+	}
+
+	klog.Infof("Machines to check %d, skipping: %d", len(check), len(machines)-len(check))
+	for _, m := range check {
+		if err := ctx.Err(); err != nil {
+			return err
+		}
+		dev, err := u.cl.GetDevice(ctx, "", m.ProviderID, &packngo.ListOptions{
+			Includes: []string{"hardware_reservation"},
+			Excludes: []string{
+				"created_by", "customdata", "network_ports", "operating_system", "actions",
+				"plan", "provisioning_events", "ssh_keys", "tags", "volumes",
+			},
+		})
+		if err != nil {
+			klog.Warningf("Fetching device %s failed: %v", m.ProviderID, err)
+			continue
+		}
+
+		// nextCheck will be used to sinbin the machine for some given time if there is
+		// no difference between the current state and new state.
+		//
+		// Some conditions override this to be shorter (when the machine doesn't yet have
+		// all data available or is in an otherwise unstable state).
+		nextCheck := time.Minute * 30
+
+		up := model.MachineUpdateProviderStatusParams{
+			Provider:   m.Provider,
+			ProviderID: m.ProviderID,
+		}
+
+		if dev.HardwareReservation != nil {
+			up.ProviderReservationID.Valid = true
+			up.ProviderReservationID.String = dev.HardwareReservation.ID
+		} else {
+			nextCheck = time.Minute
+		}
+
+		for _, addr := range dev.Network {
+			if !addr.Public {
+				continue
+			}
+			up.ProviderIpAddress.Valid = true
+			up.ProviderIpAddress.String = addr.Address
+			break
+		}
+		if !up.ProviderIpAddress.Valid {
+			nextCheck = time.Minute
+		}
+
+		if dev.Facility != nil {
+			up.ProviderLocation.Valid = true
+			up.ProviderLocation.String = dev.Facility.Code
+		} else {
+			nextCheck = time.Minute
+		}
+
+		up.ProviderStatus.Valid = true
+		switch dev.State {
+		case "active":
+			up.ProviderStatus.ProviderStatus = model.ProviderStatusRunning
+		case "deleted":
+			up.ProviderStatus.ProviderStatus = model.ProviderStatusMissing
+		case "failed":
+			up.ProviderStatus.ProviderStatus = model.ProviderStatusProvisioningFailedPermanent
+		case "inactive":
+			up.ProviderStatus.ProviderStatus = model.ProviderStatusStopped
+		case "powering_on", "powering_off":
+			nextCheck = time.Minute
+			up.ProviderStatus.ProviderStatus = model.ProviderStatusStopped
+		case "queued", "provisioning", "reinstalling", "post_provisioning":
+			nextCheck = time.Minute
+			up.ProviderStatus.ProviderStatus = model.ProviderStatusProvisioning
+		default:
+			klog.Warningf("Device %s has unexpected status: %q", m.ProviderID, dev.State)
+			nextCheck = time.Minute
+			up.ProviderStatus.ProviderStatus = model.ProviderStatusUnknown
+		}
+
+		if !applyUpdate(&up, &m) {
+			u.sinbin.Add(m.ProviderID, time.Now().Add(nextCheck))
+			continue
+		}
+
+		klog.Infof("Device %s has new data:", m.ProviderID)
+		updateLog(&up)
+		err = sess.Transact(ctx, func(q *model.Queries) error {
+			return q.MachineUpdateProviderStatus(ctx, up)
+		})
+		if err != nil {
+			klog.Warningf("Device %s failed to update: %v", m.ProviderID, err)
+		}
+		u.sinbin.Add(m.ProviderID, time.Now().Add(time.Minute))
+	}
+	return nil
+}