blob: dd8c6ff31961902196dda6b26bb55be08f019fae [file] [log] [blame]
package manager
import (
"context"
"database/sql"
"errors"
"flag"
"fmt"
"time"
"github.com/packethost/packngo"
"k8s.io/klog/v2"
"source.monogon.dev/cloud/bmaas/bmdb"
"source.monogon.dev/cloud/bmaas/bmdb/metrics"
"source.monogon.dev/cloud/bmaas/bmdb/model"
"source.monogon.dev/cloud/lib/sinbin"
ecl "source.monogon.dev/cloud/shepherd/equinix/wrapngo"
)
// UpdaterConfig holds the settings of the Updater. The fields can be populated
// from command line flags by calling RegisterFlags.
type UpdaterConfig struct {
// Enable starts the updater. When false, Updater.Run returns immediately
// without doing any work.
Enable bool
// IterationRate is the minimum time taken between subsequent iterations of the
// updater.
IterationRate time.Duration
}
// RegisterFlags binds the configuration fields to flags on the default
// command line flag set. It must be called before the flags are parsed.
func (c *UpdaterConfig) RegisterFlags() {
	flag.BoolVar(
		&c.Enable,
		"updater_enable",
		true,
		"Enable the updater, which periodically scans equinix machines and updates their status in the BMDB",
	)
	flag.DurationVar(
		&c.IterationRate,
		"updater_iteration_rate",
		time.Minute,
		"Rate limiting for updater iteration loop",
	)
}
// The Updater periodically scans all machines backed by the equinix provider and
// updates their Provided status fields based on data retrieved from the Equinix
// API.
type Updater struct {
// config is the configuration this Updater was created from.
config *UpdaterConfig
// sinbin tracks, keyed by provider ID, machines that should be skipped until a
// given deadline has passed.
sinbin sinbin.Sinbin[string]
// cl is the Equinix API client used to retrieve device data.
cl ecl.Client
}
// New builds an Updater which uses this configuration and talks to Equinix
// through the given client. The returned error is currently always nil.
func (c *UpdaterConfig) New(cl ecl.Client) (*Updater, error) {
	updater := new(Updater)
	updater.config = c
	updater.cl = cl
	return updater, nil
}
// Run executes the updater loop and blocks until the given context is canceled
// or a BMDB session cannot be started. If the updater is disabled in the
// configuration, Run returns nil immediately.
//
// Successful iterations are rate limited to one per IterationRate. After a
// failed iteration the loop backs off for one second before retrying; if the
// failure was an expired session, a new session is started first. All waits
// are aborted as soon as ctx is canceled, in which case ctx.Err() is returned.
func (u *Updater) Run(ctx context.Context, conn *bmdb.Connection) error {
	if !u.config.Enable {
		return nil
	}

	var sess *bmdb.Session
	for {
		if sess == nil {
			var err error
			sess, err = conn.StartSession(ctx, bmdb.SessionOption{Processor: metrics.ProcessorShepherdUpdater})
			if err != nil {
				return fmt.Errorf("could not start BMDB session: %w", err)
			}
		}

		// Start the rate limit timer before doing the work, so that the time spent
		// processing counts towards the iteration interval.
		limit := time.After(u.config.IterationRate)

		err := u.runInSession(ctx, sess)

		// wait is the channel that gates the start of the next iteration.
		var wait <-chan time.Time
		switch {
		case err == nil:
			wait = limit
		case errors.Is(err, ctx.Err()):
			return err
		case errors.Is(err, bmdb.ErrSessionExpired):
			klog.Errorf("Session expired, restarting...")
			sess = nil
			wait = time.After(time.Second)
		default:
			klog.Errorf("Processing failed: %v", err)
			// TODO(q3k): close session
			wait = time.After(time.Second)
		}

		// Never block without also watching the context, otherwise a shutdown could
		// be delayed by up to a full IterationRate.
		select {
		case <-wait:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
// applyNullStringUpdate reports whether 'up' supersedes 'cur', which is the
// case when 'up' is valid and 'cur' is either invalid or holds a different
// string. When 'up' does not supersede 'cur', 'up' is reset to its zero
// (invalid, empty) value and false is returned.
func applyNullStringUpdate(up, cur *sql.NullString) bool {
	if up.Valid && (!cur.Valid || up.String != cur.String) {
		return true
	}
	// Nothing new: clear the update so that it is neither logged nor written.
	*up = sql.NullString{}
	return false
}
// applyNullProviderStatusUpdate reports whether 'up' supersedes 'cur', which
// is the case when 'up' is valid and 'cur' is either invalid or holds a
// different status. When 'up' does not supersede 'cur', 'up' is reset to an
// invalid ProviderStatusUnknown and false is returned.
func applyNullProviderStatusUpdate(up, cur *model.NullProviderStatus) bool {
	changed := up.Valid && (!cur.Valid || up.ProviderStatus != cur.ProviderStatus)
	if !changed {
		// Nothing new: clear the update so that it is neither logged nor written.
		up.ProviderStatus = model.ProviderStatusUnknown
		up.Valid = false
	}
	return changed
}
// applyUpdate compares every field of 'up' against the corresponding field of
// 'cur'. It returns true if at least one field of 'up' supersedes its
// counterpart in 'cur'. Every field of 'up' which does not supersede its
// counterpart is zeroed out, so that after this call 'up' only carries the
// fields that actually changed.
func applyUpdate(up *model.MachineUpdateProviderStatusParams, cur *model.MachineProvided) bool {
	// Each helper must run unconditionally: besides reporting a difference it
	// also zeroes out unchanged fields. Chaining the calls with '||' would
	// short-circuit after the first changed field and leave later, unchanged
	// fields marked as valid.
	reservation := applyNullStringUpdate(&up.ProviderReservationID, &cur.ProviderReservationID)
	ipAddress := applyNullStringUpdate(&up.ProviderIpAddress, &cur.ProviderIpAddress)
	location := applyNullStringUpdate(&up.ProviderLocation, &cur.ProviderLocation)
	status := applyNullProviderStatusUpdate(&up.ProviderStatus, &cur.ProviderStatus)
	return reservation || ipAddress || location || status
}
// updateLog emits one info log line for every field of the given update that
// is marked valid, i.e. every field left set by applyUpdate.
func updateLog(up *model.MachineUpdateProviderStatusParams) {
	id := up.ProviderID
	if v := up.ProviderReservationID; v.Valid {
		klog.Infof(" Device %s: new reservation ID %s", id, v.String)
	}
	if v := up.ProviderIpAddress; v.Valid {
		klog.Infof(" Device %s: new IP address %s", id, v.String)
	}
	if v := up.ProviderLocation; v.Valid {
		klog.Infof(" Device %s: new location %s", id, v.String)
	}
	if v := up.ProviderStatus; v.Valid {
		klog.Infof(" Device %s: new status %s", id, v.ProviderStatus)
	}
}
// runInSession performs a single updater iteration using the given BMDB
// session. It loads all machines provided by Equinix from the BMDB, skips the
// ones currently penalized in the sinbin, fetches the remaining devices from
// the Equinix API and writes any changed provider data back to the BMDB.
//
// Only a failure to load the machine list is returned as an error. Failures
// to fetch or update an individual device are logged and do not abort the
// iteration.
func (u *Updater) runInSession(ctx context.Context, sess *bmdb.Session) error {
	// Get all machines provided by us into the BMDB.
	// TODO(q3k): do not load all machines into memory.
	var provided []model.MachineProvided
	fetch := func(q *model.Queries) (err error) {
		provided, err = q.GetProvidedMachines(ctx, model.ProviderEquinix)
		return err
	}
	if err := sess.Transact(ctx, fetch); err != nil {
		return fmt.Errorf("when fetching provided machines: %w", err)
	}

	// Limit how many machines we check by timing them out if they're likely to not
	// get updated soon.
	skipped := 0
	var candidates []model.MachineProvided
	for _, m := range provided {
		if u.sinbin.Penalized(m.ProviderID) {
			skipped++
			continue
		}
		candidates = append(candidates, m)
	}
	klog.Infof("Machines to check %d, skipping: %d", len(candidates), skipped)

	for _, m := range candidates {
		opts := &packngo.ListOptions{
			Includes: []string{
				"hardware_reservation",
			},
			Excludes: []string{
				"created_by", "customdata", "network_ports", "operating_system", "actions",
				"plan", "provisioning_events", "ssh_keys", "tags", "volumes",
			},
		}
		dev, err := u.cl.GetDevice(ctx, "", m.ProviderID, opts)
		if err != nil {
			klog.Warningf("Fetching device %s failed: %v", m.ProviderID, err)
			continue
		}

		// settled tracks whether the device has all data available and is in a stable
		// state. Unsettled devices are rechecked sooner even if nothing changed.
		settled := true

		up := model.MachineUpdateProviderStatusParams{
			Provider:   m.Provider,
			ProviderID: m.ProviderID,
		}

		if dev.HardwareReservation == nil {
			settled = false
		} else {
			up.ProviderReservationID.String = dev.HardwareReservation.ID
			up.ProviderReservationID.Valid = true
		}

		// Use the first public address, if any.
		for _, addr := range dev.Network {
			if addr.Public {
				up.ProviderIpAddress.String = addr.Address
				up.ProviderIpAddress.Valid = true
				break
			}
		}
		if !up.ProviderIpAddress.Valid {
			settled = false
		}

		if dev.Facility == nil {
			settled = false
		} else {
			up.ProviderLocation.String = dev.Facility.Code
			up.ProviderLocation.Valid = true
		}

		// Translate the Equinix device state into a BMDB provider status.
		// Transitional and unknown states mark the device as unsettled.
		up.ProviderStatus.Valid = true
		switch dev.State {
		case "active":
			up.ProviderStatus.ProviderStatus = model.ProviderStatusRunning
		case "deleted":
			up.ProviderStatus.ProviderStatus = model.ProviderStatusMissing
		case "failed":
			up.ProviderStatus.ProviderStatus = model.ProviderStatusProvisioningFailedPermanent
		case "inactive":
			up.ProviderStatus.ProviderStatus = model.ProviderStatusStopped
		case "powering_on", "powering_off":
			up.ProviderStatus.ProviderStatus = model.ProviderStatusStopped
			settled = false
		case "queued", "provisioning", "reinstalling", "post_provisioning":
			up.ProviderStatus.ProviderStatus = model.ProviderStatusProvisioning
			settled = false
		default:
			klog.Warningf("Device %s has unexpected status: %q", m.ProviderID, dev.State)
			up.ProviderStatus.ProviderStatus = model.ProviderStatusUnknown
			settled = false
		}

		// nextCheck is how long the machine is put into the sinbin if there is no
		// difference between the current state and the new state.
		nextCheck := 30 * time.Minute
		if !settled {
			nextCheck = time.Minute
		}

		if !applyUpdate(&up, &m) {
			u.sinbin.Add(m.ProviderID, time.Now().Add(nextCheck))
			continue
		}

		klog.Infof("Device %s has new data:", m.ProviderID)
		updateLog(&up)
		store := func(q *model.Queries) error {
			return q.MachineUpdateProviderStatus(ctx, up)
		}
		if err := sess.Transact(ctx, store); err != nil {
			klog.Warningf("Device %s failed to update: %v", m.ProviderID, err)
		}
		// A device which just changed is rechecked after a minute, regardless of
		// whether the update was stored successfully.
		u.sinbin.Add(m.ProviderID, time.Now().Add(time.Minute))
	}
	return nil
}