cloud/shepherd/equinix: update provider extra data

This adds a new sub-component to the Equinix Shepherd, the Updater.

The Updater periodically scans all machines backed by the Equinix
provider in the BMDB and queries the Equinix API for their status. The
status then populates the new 'Provided' row data.

Change-Id: I99657545aabfb13d71e165d36ce549c852feaf49
Reviewed-on: https://review.monogon.dev/c/monogon/+/1578
Tested-by: Jenkins CI
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/cloud/shepherd/equinix/manager/updater.go b/cloud/shepherd/equinix/manager/updater.go
new file mode 100644
index 0000000..6c00b97
--- /dev/null
+++ b/cloud/shepherd/equinix/manager/updater.go
@@ -0,0 +1,262 @@
+package manager
+
+import (
+	"context"
+	"database/sql"
+	"errors"
+	"flag"
+	"fmt"
+	"time"
+
+	"github.com/packethost/packngo"
+	"k8s.io/klog/v2"
+
+	"source.monogon.dev/cloud/bmaas/bmdb"
+	"source.monogon.dev/cloud/bmaas/bmdb/model"
+	"source.monogon.dev/cloud/lib/sinbin"
+	ecl "source.monogon.dev/cloud/shepherd/equinix/wrapngo"
+)
+
+type UpdaterConfig struct {
+	// Enable starts the updater. If false, Updater.Run returns nil immediately.
+	Enable bool
+	// IterationRate is the minimum time taken between subsequent iterations of the
+	// updater.
+	IterationRate time.Duration
+}
+
+func (c *UpdaterConfig) RegisterFlags() {
+	flag.BoolVar(&c.Enable, "updater_enable", true, "Enable the updater, which periodically scans equinix machines and updates their status in the BMDB")
+	flag.DurationVar(&c.IterationRate, "updater_iteration_rate", time.Minute, "Rate limiting for updater iteration loop")
+}
+
+// The Updater periodically scans all machines backed by the equinix provider and
+// updates their Provided status fields based on data retrieved from the Equinix
+// API.
+type Updater struct {
+	config *UpdaterConfig
+	sinbin sinbin.Sinbin[string]
+
+	cl ecl.Client
+}
+
+// New returns an Updater that uses this configuration and the given client.
+func (c *UpdaterConfig) New(cl ecl.Client) (*Updater, error) {
+	// Only config and cl are set; the returned error is always nil here.
+	updater := &Updater{config: c, cl: cl}
+	return updater, nil
+}
+
+// Run executes update iterations until ctx is canceled or a BMDB session cannot
+// be started; it returns nil right away if the updater is disabled by config.
+func (u *Updater) Run(ctx context.Context, conn *bmdb.Connection) error {
+	var sess *bmdb.Session
+	var err error
+	if !u.config.Enable {
+		return nil
+	}
+	for {
+		if sess == nil {
+			if sess, err = conn.StartSession(ctx); err != nil {
+				return fmt.Errorf("could not start BMDB session: %w", err)
+			}
+		}
+		wait := time.After(u.config.IterationRate)
+		switch err = u.runInSession(ctx, sess); {
+		case err == nil: // Success: wait out the remainder of IterationRate.
+		case errors.Is(err, ctx.Err()):
+			return err
+		case errors.Is(err, bmdb.ErrSessionExpired):
+			klog.Errorf("Session expired, restarting...")
+			sess, wait = nil, time.After(time.Second)
+		default:
+			klog.Errorf("Processing failed: %v", err)
+			// TODO(q3k): close session
+			wait = time.After(time.Second)
+		}
+		select {
+		case <-wait:
+		case <-ctx.Done(): // Do not block shutdown while rate limiting or backing off.
+			return ctx.Err()
+		}
+	}
+}
+
+// applyNullStringUpdate reports whether 'up' supersedes 'cur': up must be valid,
+// and cur must be either invalid or hold a different string.
+//
+// When up does not supersede cur, up is reset to the zero NullString (empty and
+// invalid) and false is returned.
+func applyNullStringUpdate(up, cur *sql.NullString) bool {
+	differs := !cur.Valid || up.String != cur.String
+	if up.Valid && differs {
+		return true
+	}
+
+	// Nothing new in up: clear it so that it no longer reads as set.
+	*up = sql.NullString{}
+	return false
+}
+
+// applyNullProviderStatusUpdate reports whether 'up' supersedes 'cur': up must
+// be valid, and cur must be either invalid or hold a different status.
+//
+// When up does not supersede cur, up is reset to an invalid
+// ProviderStatusUnknown and false is returned.
+func applyNullProviderStatusUpdate(up, cur *model.NullProviderStatus) bool {
+	differs := !cur.Valid || up.ProviderStatus != cur.ProviderStatus
+	if up.Valid && differs {
+		return true
+	}
+
+	// Nothing new in up: clear it so that it no longer reads as set.
+	up.ProviderStatus, up.Valid = model.ProviderStatusUnknown, false
+	return false
+}
+
+// applyUpdate returns true if any field of 'up' supersedes its counterpart in
+// 'cur'. Every field of up that does not supersede cur is zeroed out.
+func applyUpdate(up *model.MachineUpdateProviderStatusParams, cur *model.MachineProvided) bool {
+	// Call each helper before OR-ing: 'res || f()' would skip f and its zeroing.
+	res := applyNullStringUpdate(&up.ProviderReservationID, &cur.ProviderReservationID)
+	res = applyNullStringUpdate(&up.ProviderIpAddress, &cur.ProviderIpAddress) || res
+	res = applyNullStringUpdate(&up.ProviderLocation, &cur.ProviderLocation) || res
+	res = applyNullProviderStatusUpdate(&up.ProviderStatus, &cur.ProviderStatus) || res
+	return res
+}
+
+// updateLog logs every field of up that is still marked valid as a new value.
+func updateLog(up *model.MachineUpdateProviderStatusParams) {
+	if rid := up.ProviderReservationID; rid.Valid {
+		klog.Infof("   Device %s: new reservation ID %s", up.ProviderID, rid.String)
+	}
+	if ip := up.ProviderIpAddress; ip.Valid {
+		klog.Infof("   Device %s: new IP address %s", up.ProviderID, ip.String)
+	}
+	if loc := up.ProviderLocation; loc.Valid {
+		klog.Infof("   Device %s: new location %s", up.ProviderID, loc.String)
+	}
+	if st := up.ProviderStatus; st.Valid {
+		klog.Infof("   Device %s: new status %s", up.ProviderID, st.ProviderStatus)
+	}
+}
+
+func (u *Updater) runInSession(ctx context.Context, sess *bmdb.Session) error {
+	// One update pass: fetch every Equinix-provided machine from the BMDB, then
+	// query the Equinix API for each one not in the sinbin and store changed fields.
+	// TODO(q3k): do not load all machines into memory.
+	var machines []model.MachineProvided
+	err := sess.Transact(ctx, func(q *model.Queries) error {
+		var err error
+		machines, err = q.GetProvidedMachines(ctx, model.ProviderEquinix)
+		return err
+	})
+	if err != nil {
+		return fmt.Errorf("when fetching provided machines: %w", err)
+	}
+
+	// Skip machines that are currently in the sinbin, so that ones unlikely to
+	// change soon are not queried on every pass.
+	penalized := 0
+	var check []model.MachineProvided
+	for _, m := range machines {
+		if u.sinbin.Penalized(m.ProviderID) {
+			penalized += 1
+		} else {
+			check = append(check, m)
+		}
+	}
+
+	klog.Infof("Machines to check %d, skipping: %d", len(check), penalized)
+	for _, m := range check {
+		dev, err := u.cl.GetDevice(ctx, "", m.ProviderID, &packngo.ListOptions{
+			Includes: []string{
+				"hardware_reservation",
+			},
+			Excludes: []string{
+				"created_by", "customdata", "network_ports", "operating_system", "actions",
+				"plan", "provisioning_events", "ssh_keys", "tags", "volumes",
+			},
+		})
+		if err != nil {
+			klog.Warningf("Fetching device %s failed: %v", m.ProviderID, err)
+			continue
+		}
+
+		// nextCheck will be used to sinbin the machine for some given time if there is
+		// no difference between the current state and new state.
+		//
+		// Some conditions override this to be shorter (when the machine doesn't yet have
+		// all data available or is in an otherwise unstable state).
+		nextCheck := time.Minute * 30
+
+		up := model.MachineUpdateProviderStatusParams{
+			Provider:   m.Provider,
+			ProviderID: m.ProviderID,
+		}
+
+		if dev.HardwareReservation != nil {
+			up.ProviderReservationID.Valid = true
+			up.ProviderReservationID.String = dev.HardwareReservation.ID
+		} else {
+			nextCheck = time.Minute
+		}
+
+		for _, addr := range dev.Network {
+			if !addr.Public {
+				continue
+			}
+			up.ProviderIpAddress.Valid = true
+			up.ProviderIpAddress.String = addr.Address
+			break
+		}
+		if !up.ProviderIpAddress.Valid {
+			nextCheck = time.Minute
+		}
+
+		if dev.Facility != nil {
+			up.ProviderLocation.Valid = true
+			up.ProviderLocation.String = dev.Facility.Code
+		} else {
+			nextCheck = time.Minute
+		}
+
+		up.ProviderStatus.Valid = true
+		switch dev.State {
+		case "active":
+			up.ProviderStatus.ProviderStatus = model.ProviderStatusRunning
+		case "deleted":
+			up.ProviderStatus.ProviderStatus = model.ProviderStatusMissing
+		case "failed":
+			up.ProviderStatus.ProviderStatus = model.ProviderStatusProvisioningFailedPermanent
+		case "inactive":
+			up.ProviderStatus.ProviderStatus = model.ProviderStatusStopped
+		case "powering_on", "powering_off":
+			nextCheck = time.Minute
+			up.ProviderStatus.ProviderStatus = model.ProviderStatusStopped
+		case "queued", "provisioning", "reinstalling", "post_provisioning":
+			nextCheck = time.Minute
+			up.ProviderStatus.ProviderStatus = model.ProviderStatusProvisioning
+		default:
+			klog.Warningf("Device %s has unexpected status: %q", m.ProviderID, dev.State)
+			nextCheck = time.Minute
+			up.ProviderStatus.ProviderStatus = model.ProviderStatusUnknown
+		}
+
+		if !applyUpdate(&up, &m) {
+			u.sinbin.Add(m.ProviderID, time.Now().Add(nextCheck))
+			continue
+		}
+
+		klog.Infof("Device %s has new data:", m.ProviderID)
+		updateLog(&up)
+		err = sess.Transact(ctx, func(q *model.Queries) error {
+			return q.MachineUpdateProviderStatus(ctx, up)
+		})
+		if err != nil {
+			klog.Warningf("Device %s failed to update: %v", m.ProviderID, err)
+		}
+		u.sinbin.Add(m.ProviderID, time.Now().Add(time.Minute)) // Reached whether or not the write succeeded.
+	}
+	return nil // Per-device failures above are only logged, never returned.
+}