| package manager | 
 |  | 
 | import ( | 
 | 	"context" | 
 | 	"errors" | 
 | 	"flag" | 
 | 	"fmt" | 
 | 	"net/netip" | 
 | 	"sort" | 
 | 	"time" | 
 |  | 
 | 	"github.com/google/uuid" | 
 | 	"golang.org/x/time/rate" | 
 | 	"k8s.io/klog/v2" | 
 |  | 
 | 	"source.monogon.dev/cloud/bmaas/bmdb" | 
 | 	"source.monogon.dev/cloud/bmaas/bmdb/metrics" | 
 | 	"source.monogon.dev/cloud/bmaas/bmdb/model" | 
 | 	"source.monogon.dev/cloud/shepherd" | 
 | 	"source.monogon.dev/go/mflags" | 
 | ) | 
 |  | 
 | // Provisioner implements the server provisioning logic. Provisioning entails | 
 | // bringing all available machines (subject to limits) into BMDB. | 
 | type Provisioner struct { | 
 | 	ProvisionerConfig | 
 | 	p shepherd.Provider | 
 | } | 
 |  | 
 | // ProvisionerConfig configures the provisioning process. | 
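//
// When the provisioner is used as a library rather than configured through
// flags, the fields can be populated directly. A minimal sketch, with
// illustrative values mirroring the flag defaults:
//
//	pc := ProvisionerConfig{
//		MaxCount:              50,
//		ReconcileLoopLimiter:  rate.NewLimiter(rate.Every(time.Minute), 1),
//		DeviceCreationLimiter: rate.NewLimiter(rate.Every(5*time.Second), 1),
//		ChunkSize:             20,
//	}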
 | type ProvisionerConfig struct { | 
 | 	// MaxCount is the maximum count of managed servers. No new devices will be | 
 | 	// created after reaching the limit. No attempt will be made to reduce the | 
 | 	// server count. | 
 | 	MaxCount uint | 
 |  | 
 | 	// ReconcileLoopLimiter limits the rate of the main reconciliation loop | 
 | 	// iterating. | 
 | 	ReconcileLoopLimiter *rate.Limiter | 
 |  | 
 | 	// DeviceCreation limits the rate at which devices are created. | 
 | 	DeviceCreationLimiter *rate.Limiter | 
 |  | 
	// ChunkSize is the maximum number of machines that the provisioner will
	// attempt to spawn in a single reconciliation loop iteration. Higher numbers
	// allow for faster initial provisioning, but lower numbers decrease potential
	// raciness with other systems and make sure that other parts of the
	// reconciliation logic run regularly.
	//
	// 20 is a decent starting point.
 | 	ChunkSize uint | 
 | } | 
 |  | 
// RegisterFlags registers the ProvisionerConfig fields as command-line flags.
func (pc *ProvisionerConfig) RegisterFlags() {
 | 	flag.UintVar(&pc.MaxCount, "provisioner_max_machines", 50, "Limit of machines that the provisioner will attempt to pull into the BMDB. Zero for no limit.") | 
 | 	mflags.Limiter(&pc.ReconcileLoopLimiter, "provisioner_reconciler_rate", "1m,1", "Rate limiting for main provisioner reconciliation loop") | 
 | 	mflags.Limiter(&pc.DeviceCreationLimiter, "provisioner_device_creation_rate", "5s,1", "Rate limiting for machine creation") | 
 | 	flag.UintVar(&pc.ChunkSize, "provisioner_reservation_chunk_size", 20, "How many machines will the provisioner attempt to create in a single reconciliation loop iteration") | 
 | } | 
 |  | 
 | func (pc *ProvisionerConfig) check() error { | 
 | 	// If these are unset, it's probably because someone is using us as a library. | 
 | 	// Provide error messages useful to code users instead of flag names. | 
 | 	if pc.ReconcileLoopLimiter == nil { | 
 | 		return fmt.Errorf("ReconcileLoopLimiter must be set") | 
 | 	} | 
 | 	if pc.DeviceCreationLimiter == nil { | 
 | 		return fmt.Errorf("DeviceCreationLimiter must be set") | 
 | 	} | 
 | 	if pc.ChunkSize == 0 { | 
 | 		return fmt.Errorf("ChunkSize must be set") | 
 | 	} | 
 | 	return nil | 
 | } | 
 |  | 
// NewProvisioner creates a Provisioner instance, checking the given
// ProvisionerConfig for errors.
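//
// A minimal usage sketch (assuming an existing shepherd.Provider
// implementation "provider", an established *bmdb.Connection "conn", and a
// populated ProvisionerConfig "pc"; all three names are illustrative):
//
//	p, err := NewProvisioner(provider, pc)
//	if err != nil {
//		klog.Exitf("creating provisioner: %v", err)
//	}
//	klog.Error(p.Run(ctx, conn))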
 | func NewProvisioner(p shepherd.Provider, pc ProvisionerConfig) (*Provisioner, error) { | 
 | 	if err := pc.check(); err != nil { | 
 | 		return nil, err | 
 | 	} | 
 |  | 
 | 	return &Provisioner{ | 
 | 		ProvisionerConfig: pc, | 
 | 		p:                 p, | 
 | 	}, nil | 
 | } | 
 |  | 
// Run the provisioner, blocking the current goroutine until the given context
// expires.
func (p *Provisioner) Run(ctx context.Context, conn *bmdb.Connection) error {
	var sess *bmdb.Session
 | 	var err error | 
 | 	for { | 
 | 		if sess == nil { | 
 | 			sess, err = conn.StartSession(ctx, bmdb.SessionOption{Processor: metrics.ProcessorShepherdProvisioner}) | 
 | 			if err != nil { | 
 | 				return fmt.Errorf("could not start BMDB session: %w", err) | 
 | 			} | 
 | 		} | 
 | 		err = p.runInSession(ctx, sess) | 
 |  | 
 | 		switch { | 
 | 		case err == nil: | 
 | 		case errors.Is(err, ctx.Err()): | 
 | 			return err | 
 | 		case errors.Is(err, bmdb.ErrSessionExpired): | 
 | 			klog.Errorf("Session expired, restarting...") | 
 | 			sess = nil | 
 | 			time.Sleep(time.Second) | 
		default:
 | 			klog.Errorf("Processing failed: %v", err) | 
 | 			// TODO(q3k): close session | 
 | 			time.Sleep(time.Second) | 
 | 		} | 
 | 	} | 
 | } | 
 |  | 
 | type machineListing struct { | 
 | 	machines []shepherd.Machine | 
 | 	err      error | 
 | } | 
 |  | 
// runInSession executes one iteration of the provisioner's control loop within
// a BMDB session. This control loop attempts to bring all available provider
// capacity into the BMDB as machines, subject to limits.
 | func (p *Provisioner) runInSession(ctx context.Context, sess *bmdb.Session) error { | 
 | 	if err := p.ReconcileLoopLimiter.Wait(ctx); err != nil { | 
 | 		return err | 
 | 	} | 
 |  | 
 | 	providerC := make(chan *machineListing, 1) | 
 | 	bmdbC := make(chan *machineListing, 1) | 
 |  | 
 | 	klog.Infof("Getting provider and bmdb machines...") | 
 |  | 
	// Make a sub-context for the two parallel listing operations, so that we can
	// cancel one immediately if the other fails.
 | 	subCtx, subCtxC := context.WithCancel(ctx) | 
 | 	defer subCtxC() | 
 |  | 
 | 	go func() { | 
 | 		machines, err := p.listInProvider(subCtx) | 
 | 		providerC <- &machineListing{ | 
 | 			machines: machines, | 
 | 			err:      err, | 
 | 		} | 
 | 	}() | 
 | 	go func() { | 
 | 		machines, err := p.listInBMDB(subCtx, sess) | 
 | 		bmdbC <- &machineListing{ | 
 | 			machines: machines, | 
 | 			err:      err, | 
 | 		} | 
 | 	}() | 
 | 	var inProvider, inBMDB *machineListing | 
 | 	for { | 
 | 		select { | 
 | 		case inProvider = <-providerC: | 
 | 			if err := inProvider.err; err != nil { | 
 | 				return fmt.Errorf("listing provider machines failed: %w", err) | 
 | 			} | 
 | 			klog.Infof("Got %d machines in provider.", len(inProvider.machines)) | 
 | 		case inBMDB = <-bmdbC: | 
 | 			if err := inBMDB.err; err != nil { | 
 | 				return fmt.Errorf("listing BMDB machines failed: %w", err) | 
 | 			} | 
 | 			klog.Infof("Got %d machines in BMDB.", len(inBMDB.machines)) | 
 | 		} | 
 | 		if inProvider != nil && inBMDB != nil { | 
 | 			break | 
 | 		} | 
 | 	} | 
 |  | 
 | 	subCtxC() | 
 | 	if err := p.reconcile(ctx, sess, inProvider.machines, inBMDB.machines); err != nil { | 
 | 		return fmt.Errorf("reconciliation failed: %w", err) | 
 | 	} | 
 | 	return nil | 
 | } | 
 |  | 
// listInProvider returns all machines that the provider thinks we should be
// managing.
 | func (p *Provisioner) listInProvider(ctx context.Context) ([]shepherd.Machine, error) { | 
 | 	machines, err := p.p.ListMachines(ctx) | 
 | 	if err != nil { | 
 | 		return nil, fmt.Errorf("while fetching managed machines: %w", err) | 
 | 	} | 
 | 	sort.Slice(machines, func(i, j int) bool { | 
 | 		return machines[i].ID() < machines[j].ID() | 
 | 	}) | 
 | 	return machines, nil | 
 | } | 
 |  | 
// providedMachine wraps a BMDB model.MachineProvided row, adapting it to the
// shepherd.Machine interface.
type providedMachine struct {
 | 	model.MachineProvided | 
 | } | 
 |  | 
 | func (p providedMachine) ID() shepherd.ProviderID { | 
 | 	return shepherd.ProviderID(p.ProviderID) | 
 | } | 
 |  | 
func (p providedMachine) Addr() netip.Addr {
	// Return the zero netip.Addr if the BMDB row carries no parseable provider
	// IP address.
	if !p.ProviderIpAddress.Valid {
		return netip.Addr{}
	}
 |  | 
 | 	addr, err := netip.ParseAddr(p.ProviderIpAddress.String) | 
 | 	if err != nil { | 
 | 		return netip.Addr{} | 
 | 	} | 
 | 	return addr | 
 | } | 
 |  | 
func (p providedMachine) State() shepherd.State {
	// Machines coming from the BMDB are, by definition, already in use.
	return shepherd.StateKnownUsed
}
 |  | 
 | // listInBMDB returns all the machines that the BMDB thinks we should be managing. | 
 | func (p *Provisioner) listInBMDB(ctx context.Context, sess *bmdb.Session) ([]shepherd.Machine, error) { | 
 | 	var res []shepherd.Machine | 
 | 	err := sess.Transact(ctx, func(q *model.Queries) error { | 
 | 		machines, err := q.GetProvidedMachines(ctx, p.p.Type()) | 
 | 		if err != nil { | 
 | 			return err | 
 | 		} | 
 | 		res = make([]shepherd.Machine, 0, len(machines)) | 
 | 		for _, machine := range machines { | 
 | 			_, err := uuid.Parse(machine.ProviderID) | 
 | 			if err != nil { | 
 | 				klog.Errorf("BMDB machine %s has unparseable provider ID %q", machine.MachineID, machine.ProviderID) | 
 | 				continue | 
 | 			} | 
 |  | 
 | 			res = append(res, providedMachine{machine}) | 
 | 		} | 
 | 		return nil | 
 | 	}) | 
 | 	if err != nil { | 
 | 		return nil, err | 
 | 	} | 
 | 	sort.Slice(res, func(i, j int) bool { | 
 | 		return res[i].ID() < res[j].ID() | 
 | 	}) | 
 | 	return res, nil | 
 | } | 
 |  | 
// resolvePossiblyUsed resolves StatePossiblyUsed into the state that actually
// applies, based on whether the machine has a valid ID and is present in the
// BMDB. All other states are passed through unchanged.
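//
// In summary:
//
//	StatePossiblyUsed, invalid ID       -> StateKnownUnused
//	StatePossiblyUsed, not in BMDB      -> StateKnownUnused
//	StatePossiblyUsed, present in BMDB  -> StateKnownUsed
//	any other state                     -> returned unchanged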
 | func (p *Provisioner) resolvePossiblyUsed(machine shepherd.Machine, providedMachines map[shepherd.ProviderID]bool) shepherd.State { | 
 | 	state, id := machine.State(), machine.ID() | 
 |  | 
 | 	// Bail out if this isn't a possibly used state. | 
 | 	if state != shepherd.StatePossiblyUsed { | 
 | 		return state | 
 | 	} | 
 |  | 
	// If a machine does not have a valid ID, it's always seen as unused.
 | 	if !id.IsValid() { | 
 | 		return shepherd.StateKnownUnused | 
 | 	} | 
 |  | 
	// If the machine is not present in the BMDB, it's seen as unused.
 | 	if _, ok := providedMachines[id]; !ok { | 
 | 		return shepherd.StateKnownUnused | 
 | 	} | 
 |  | 
 | 	return shepherd.StateKnownUsed | 
 | } | 
 |  | 
 | // reconcile takes a list of machines that the provider thinks we should be | 
 | // managing and that the BMDB thinks we should be managing, and tries to make | 
 | // sense of that. First, some checks are performed across the two lists to make | 
 | // sure we haven't dropped anything. Then, additional machines are deployed from | 
 | // hardware reservations as needed. | 
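//
// In outline:
//
//  1. Index the BMDB machines and resolve each provider machine's state.
//  2. Cross-check the two sets, and bail out if either side has machines that
//     the other does not know about.
//  3. Spawn new machines from unused provider capacity, subject to MaxCount,
//     ChunkSize and the device creation rate limit.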
 | func (p *Provisioner) reconcile(ctx context.Context, sess *bmdb.Session, inProvider, bmdbMachines []shepherd.Machine) error { | 
 | 	klog.Infof("Reconciling...") | 
 |  | 
 | 	bmdb := make(map[shepherd.ProviderID]bool) | 
 | 	for _, machine := range bmdbMachines { | 
		// Don't check the state here, as it's hardcoded to be known used.
 | 		bmdb[machine.ID()] = true | 
 | 	} | 
 |  | 
 | 	var availableMachines []shepherd.Machine | 
 | 	provider := make(map[shepherd.ProviderID]bool) | 
 | 	for _, machine := range inProvider { | 
 | 		state := p.resolvePossiblyUsed(machine, bmdb) | 
 |  | 
 | 		switch state { | 
 | 		case shepherd.StateKnownUnused: | 
 | 			availableMachines = append(availableMachines, machine) | 
 |  | 
 | 		case shepherd.StateKnownUsed: | 
 | 			provider[machine.ID()] = true | 
 |  | 
 | 		default: | 
 | 			return fmt.Errorf("machine has invalid state (ID: %s, Addr: %s): %s", machine.ID(), machine.Addr(), state) | 
 | 		} | 
 | 	} | 
 |  | 
 | 	managed := make(map[shepherd.ProviderID]bool) | 
 |  | 
 | 	// Some desynchronization between the BMDB and Provider point of view might be so | 
 | 	// bad we shouldn't attempt to do any work, at least not any time soon. | 
 | 	badbadnotgood := false | 
 |  | 
 | 	// Find any machines supposedly managed by us in the provider, but not in the | 
 | 	// BMDB. | 
	for machine := range provider {
 | 		if bmdb[machine] { | 
 | 			managed[machine] = true | 
 | 			continue | 
 | 		} | 
 | 		klog.Errorf("Provider machine %s has no corresponding machine in BMDB.", machine) | 
 | 		badbadnotgood = true | 
 | 	} | 
 |  | 
 | 	// Find any machines in the BMDB but not in the provider. | 
	for machine := range bmdb {
 | 		if !provider[machine] { | 
 | 			klog.Errorf("Provider device ID %s referred to in BMDB (from TODO) but missing in provider.", machine) | 
 | 			badbadnotgood = true | 
 | 		} | 
 | 	} | 
 |  | 
 | 	// Bail if things are weird. | 
 | 	if badbadnotgood { | 
 | 		klog.Errorf("Something's very wrong. Bailing early and refusing to do any work.") | 
 | 		return fmt.Errorf("fatal discrepency between BMDB and provider") | 
 | 	} | 
 |  | 
	// Summarize all managed machines: the intersection of the BMDB and provider
	// machine sets. Usually these two sets are equal.
 | 	nmanaged := len(managed) | 
 | 	klog.Infof("Total managed machines: %d", nmanaged) | 
 |  | 
 | 	if p.MaxCount != 0 && p.MaxCount <= uint(nmanaged) { | 
 | 		klog.Infof("Not bringing up more machines (at limit of %d machines)", p.MaxCount) | 
 | 		return nil | 
 | 	} | 
 |  | 
 | 	limitName := "no limit" | 
 | 	if p.MaxCount != 0 { | 
 | 		limitName = fmt.Sprintf("%d", p.MaxCount) | 
 | 	} | 
 | 	klog.Infof("Below managed machine limit (%s), bringing up more...", limitName) | 
 |  | 
 | 	if len(availableMachines) == 0 { | 
 | 		klog.Infof("No more capacity available.") | 
 | 		return nil | 
 | 	} | 
 |  | 
 | 	toProvision := availableMachines | 
 | 	// Limit them to MaxCount, if applicable. | 
 | 	if p.MaxCount != 0 { | 
 | 		needed := int(p.MaxCount) - nmanaged | 
 | 		if len(toProvision) < needed { | 
 | 			needed = len(toProvision) | 
 | 		} | 
 | 		toProvision = toProvision[:needed] | 
 | 	} | 
 |  | 
 | 	// Limit them to an arbitrary 'chunk' size so that we don't do too many things in | 
 | 	// a single reconciliation operation. | 
 | 	if uint(len(toProvision)) > p.ChunkSize { | 
 | 		toProvision = toProvision[:p.ChunkSize] | 
 | 	} | 
 |  | 
 | 	if len(toProvision) == 0 { | 
 | 		klog.Infof("No more unused machines available, or all filtered out.") | 
 | 		return nil | 
 | 	} | 
 |  | 
 | 	klog.Infof("Bringing up %d machines...", len(toProvision)) | 
 | 	for _, machine := range toProvision { | 
 | 		if err := p.DeviceCreationLimiter.Wait(ctx); err != nil { | 
 | 			return err | 
 | 		} | 
 |  | 
 | 		nd, err := p.p.CreateMachine(ctx, sess, shepherd.CreateMachineRequest{ | 
 | 			UnusedMachine: machine, | 
 | 		}) | 
 | 		if err != nil { | 
 | 			klog.Errorf("while creating new device (ID: %s, Addr: %s, State: %s): %w", machine.ID(), machine.Addr(), machine.State(), err) | 
 | 			continue | 
 | 		} | 
 | 		klog.Infof("Created new machine with ID: %s", nd.ID()) | 
 | 	} | 
 |  | 
 | 	return nil | 
 | } |