| package main | 
 |  | 
 | import ( | 
 | 	"context" | 
 | 	"database/sql" | 
 | 	"errors" | 
 | 	"flag" | 
 | 	"fmt" | 
 | 	"time" | 
 |  | 
 | 	"github.com/packethost/packngo" | 
 | 	"k8s.io/klog/v2" | 
 |  | 
 | 	"source.monogon.dev/cloud/bmaas/bmdb" | 
 | 	"source.monogon.dev/cloud/bmaas/bmdb/metrics" | 
 | 	"source.monogon.dev/cloud/bmaas/bmdb/model" | 
 | 	ecl "source.monogon.dev/cloud/equinix/wrapngo" | 
 | 	"source.monogon.dev/cloud/lib/sinbin" | 
 | ) | 
 |  | 
 | type UpdaterConfig struct { | 
 | 	// Enable starts the updater. | 
 | 	Enable bool | 
 | 	// IterationRate is the minimu mtime taken between subsequent iterations of the | 
 | 	// updater. | 
 | 	IterationRate time.Duration | 
 | } | 
 |  | 
 | func (u *UpdaterConfig) RegisterFlags() { | 
 | 	flag.BoolVar(&u.Enable, "updater_enable", true, "Enable the updater, which periodically scans equinix machines and updates their status in the BMDB") | 
 | 	flag.DurationVar(&u.IterationRate, "updater_iteration_rate", time.Minute, "Rate limiting for updater iteration loop") | 
 | } | 
 |  | 
 | // The Updater periodically scans all machines backed by the equinix provider and | 
 | // updaters their Provided status fields based on data retrieved from the Equinix | 
 | // API. | 
 | type Updater struct { | 
 | 	config *UpdaterConfig | 
 | 	sinbin sinbin.Sinbin[string] | 
 |  | 
 | 	cl ecl.Client | 
 | } | 
 |  | 
 | func (c *UpdaterConfig) New(cl ecl.Client) (*Updater, error) { | 
 | 	return &Updater{ | 
 | 		config: c, | 
 | 		cl:     cl, | 
 | 	}, nil | 
 | } | 
 |  | 
 | func (u *Updater) Run(ctx context.Context, conn *bmdb.Connection) error { | 
 | 	var sess *bmdb.Session | 
 | 	var err error | 
 |  | 
 | 	if !u.config.Enable { | 
 | 		return nil | 
 | 	} | 
 |  | 
 | 	for { | 
 | 		if sess == nil { | 
 | 			sess, err = conn.StartSession(ctx, bmdb.SessionOption{Processor: metrics.ProcessorShepherdUpdater}) | 
 | 			if err != nil { | 
 | 				return fmt.Errorf("could not start BMDB session: %w", err) | 
 | 			} | 
 | 		} | 
 | 		limit := time.After(u.config.IterationRate) | 
 |  | 
 | 		err = u.runInSession(ctx, sess) | 
 | 		switch { | 
 | 		case err == nil: | 
 | 			<-limit | 
 | 		case errors.Is(err, ctx.Err()): | 
 | 			return err | 
 | 		case errors.Is(err, bmdb.ErrSessionExpired): | 
 | 			klog.Errorf("Session expired, restarting...") | 
 | 			sess = nil | 
 | 			time.Sleep(time.Second) | 
 | 		case err != nil: | 
 | 			klog.Errorf("Processing failed: %v", err) | 
 | 			// TODO(q3k): close session | 
 | 			time.Sleep(time.Second) | 
 | 		} | 
 | 	} | 
 | } | 
 |  | 
 | // applyNullStringUpdate returns true if 'up' supersedes 'cur'. Otherwise, it | 
 | // returns false and zeroes out up. | 
 | func applyNullStringUpdate(up, cur *sql.NullString) bool { | 
 | 	if up.Valid { | 
 | 		if !cur.Valid { | 
 | 			return true | 
 | 		} | 
 | 		if up.String != cur.String { | 
 | 			return true | 
 | 		} | 
 | 	} | 
 | 	up.String = "" | 
 | 	up.Valid = false | 
 | 	return false | 
 | } | 
 |  | 
 | // applyNullProviderStatusUpdate returns true if 'up' supersedes 'cur'. | 
 | // Otherwise, it returns false and zeroes out up. | 
 | func applyNullProviderStatusUpdate(up, cur *model.NullProviderStatus) bool { | 
 | 	if up.Valid { | 
 | 		if !cur.Valid { | 
 | 			return true | 
 | 		} | 
 | 		if up.ProviderStatus != cur.ProviderStatus { | 
 | 			return true | 
 | 		} | 
 | 	} | 
 | 	up.ProviderStatus = model.ProviderStatusUnknown | 
 | 	up.Valid = false | 
 | 	return false | 
 | } | 
 |  | 
 | // applyUpdate returns true if 'up' supersedes 'cur'. Otherwise, it returns false | 
 | // and zeroes out up. | 
 | func applyUpdate(up *model.MachineUpdateProviderStatusParams, cur *model.MachineProvided) bool { | 
 | 	res := false | 
 | 	res = res || applyNullStringUpdate(&up.ProviderReservationID, &cur.ProviderReservationID) | 
 | 	res = res || applyNullStringUpdate(&up.ProviderIpAddress, &cur.ProviderIpAddress) | 
 | 	res = res || applyNullStringUpdate(&up.ProviderLocation, &cur.ProviderLocation) | 
 | 	res = res || applyNullProviderStatusUpdate(&up.ProviderStatus, &cur.ProviderStatus) | 
 | 	return res | 
 | } | 
 |  | 
 | // updateLog logs information about the given update as calculated by applyUpdate. | 
 | func updateLog(up *model.MachineUpdateProviderStatusParams) { | 
 | 	if up.ProviderReservationID.Valid { | 
 | 		klog.Infof("   Machine %s: new reservation ID %s", up.ProviderID, up.ProviderReservationID.String) | 
 | 	} | 
 | 	if up.ProviderIpAddress.Valid { | 
 | 		klog.Infof("   Machine %s: new IP address %s", up.ProviderID, up.ProviderIpAddress.String) | 
 | 	} | 
 | 	if up.ProviderLocation.Valid { | 
 | 		klog.Infof("   Machine %s: new location %s", up.ProviderID, up.ProviderLocation.String) | 
 | 	} | 
 | 	if up.ProviderStatus.Valid { | 
 | 		klog.Infof("   Machine %s: new status %s", up.ProviderID, up.ProviderStatus.ProviderStatus) | 
 | 	} | 
 | } | 
 |  | 
 | func (u *Updater) runInSession(ctx context.Context, sess *bmdb.Session) error { | 
 | 	// Get all machines provided by us into the BMDB. | 
 | 	// TODO(q3k): do not load all machines into memory. | 
 |  | 
 | 	var machines []model.MachineProvided | 
 | 	err := sess.Transact(ctx, func(q *model.Queries) error { | 
 | 		var err error | 
 | 		machines, err = q.GetProvidedMachines(ctx, model.ProviderEquinix) | 
 | 		return err | 
 | 	}) | 
 | 	if err != nil { | 
 | 		return fmt.Errorf("when fetching provided machines: %w", err) | 
 | 	} | 
 |  | 
 | 	// Limit how many machines we check by timing them out if they're likely to not | 
 | 	// get updated soon. | 
 | 	penalized := 0 | 
 | 	var check []model.MachineProvided | 
 | 	for _, m := range machines { | 
 | 		if u.sinbin.Penalized(m.ProviderID) { | 
 | 			penalized += 1 | 
 | 		} else { | 
 | 			check = append(check, m) | 
 | 		} | 
 | 	} | 
 |  | 
 | 	klog.Infof("Machines to check %d, skipping: %d", len(check), penalized) | 
 | 	for _, m := range check { | 
 | 		dev, err := u.cl.GetDevice(ctx, "", m.ProviderID, &packngo.ListOptions{ | 
 | 			Includes: []string{ | 
 | 				"hardware_reservation", | 
 | 			}, | 
 | 			Excludes: []string{ | 
 | 				"created_by", "customdata", "network_ports", "operating_system", "actions", | 
 | 				"plan", "provisioning_events", "ssh_keys", "tags", "volumes", | 
 | 			}, | 
 | 		}) | 
 | 		if err != nil { | 
 | 			klog.Warningf("Fetching device %s failed: %v", m.ProviderID, err) | 
 | 			continue | 
 | 		} | 
 |  | 
 | 		// nextCheck will be used to sinbin the machine for some given time if there is | 
 | 		// no difference between the current state and new state. | 
 | 		// | 
 | 		// Some conditions override this to be shorter (when the machine doesn't yet have | 
 | 		// all data available or is in an otherwise unstable state). | 
 | 		nextCheck := time.Minute * 30 | 
 |  | 
 | 		up := model.MachineUpdateProviderStatusParams{ | 
 | 			Provider:   m.Provider, | 
 | 			ProviderID: m.ProviderID, | 
 | 		} | 
 |  | 
 | 		if dev.HardwareReservation != nil { | 
 | 			up.ProviderReservationID.Valid = true | 
 | 			up.ProviderReservationID.String = dev.HardwareReservation.ID | 
 | 		} else { | 
 | 			nextCheck = time.Minute | 
 | 		} | 
 |  | 
 | 		for _, addr := range dev.Network { | 
 | 			if !addr.Public { | 
 | 				continue | 
 | 			} | 
 | 			up.ProviderIpAddress.Valid = true | 
 | 			up.ProviderIpAddress.String = addr.Address | 
 | 			break | 
 | 		} | 
 | 		if !up.ProviderIpAddress.Valid { | 
 | 			nextCheck = time.Minute | 
 | 		} | 
 |  | 
 | 		if dev.Facility != nil { | 
 | 			up.ProviderLocation.Valid = true | 
 | 			up.ProviderLocation.String = dev.Facility.Code | 
 | 		} else { | 
 | 			nextCheck = time.Minute | 
 | 		} | 
 |  | 
 | 		up.ProviderStatus.Valid = true | 
 | 		switch dev.State { | 
 | 		case "active": | 
 | 			up.ProviderStatus.ProviderStatus = model.ProviderStatusRunning | 
 | 		case "deleted": | 
 | 			up.ProviderStatus.ProviderStatus = model.ProviderStatusMissing | 
 | 		case "failed": | 
 | 			up.ProviderStatus.ProviderStatus = model.ProviderStatusProvisioningFailedPermanent | 
 | 		case "inactive": | 
 | 			up.ProviderStatus.ProviderStatus = model.ProviderStatusStopped | 
 | 		case "powering_on", "powering_off": | 
 | 			nextCheck = time.Minute | 
 | 			up.ProviderStatus.ProviderStatus = model.ProviderStatusStopped | 
 | 		case "queued", "provisioning", "reinstalling", "post_provisioning": | 
 | 			nextCheck = time.Minute | 
 | 			up.ProviderStatus.ProviderStatus = model.ProviderStatusProvisioning | 
 | 		default: | 
 | 			klog.Warningf("Device %s has unexpected status: %q", m.ProviderID, dev.State) | 
 | 			nextCheck = time.Minute | 
 | 			up.ProviderStatus.ProviderStatus = model.ProviderStatusUnknown | 
 | 		} | 
 |  | 
 | 		if !applyUpdate(&up, &m) { | 
 | 			u.sinbin.Add(m.ProviderID, time.Now().Add(nextCheck)) | 
 | 			continue | 
 | 		} | 
 |  | 
 | 		klog.Infof("Device %s has new data:", m.ProviderID) | 
 | 		updateLog(&up) | 
 | 		err = sess.Transact(ctx, func(q *model.Queries) error { | 
 | 			return q.MachineUpdateProviderStatus(ctx, up) | 
 | 		}) | 
 | 		if err != nil { | 
 | 			klog.Warningf("Device %s failed to update: %v", m.ProviderID, err) | 
 | 		} | 
 | 		u.sinbin.Add(m.ProviderID, time.Now().Add(time.Minute)) | 
 | 	} | 
 | 	return nil | 
 | } |