package manager

import (
	"context"
	"errors"
	"flag"
	"fmt"
	"net/netip"
	"sort"
	"time"

	"github.com/google/uuid"
	"golang.org/x/time/rate"
	"k8s.io/klog/v2"

	"source.monogon.dev/cloud/bmaas/bmdb"
	"source.monogon.dev/cloud/bmaas/bmdb/metrics"
	"source.monogon.dev/cloud/bmaas/bmdb/model"
	"source.monogon.dev/cloud/shepherd"
	"source.monogon.dev/go/mflags"
)

// Provisioner implements the server provisioning logic. Provisioning entails
// bringing all available machines (subject to limits) into the BMDB.
type Provisioner struct {
	ProvisionerConfig
	p shepherd.Provider
}

// ProvisionerConfig configures the provisioning process.
type ProvisionerConfig struct {
	// MaxCount is the maximum count of managed servers. No new devices will be
	// created after reaching the limit. No attempt will be made to reduce the
	// server count.
	MaxCount uint

	// ReconcileLoopLimiter limits the rate at which the main reconciliation
	// loop iterates.
	ReconcileLoopLimiter *rate.Limiter

	// DeviceCreationLimiter limits the rate at which devices are created.
	DeviceCreationLimiter *rate.Limiter

	// ChunkSize is the maximum number of machines that will be spawned in a
	// single reconciliation loop iteration. Higher numbers allow for faster
	// initial provisioning, but lower numbers decrease potential raciness with
	// other systems and make sure that other parts of the reconciliation logic
	// are run regularly.
	//
	// 20 is a decent starting point.
	ChunkSize uint
}
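
// For library users configuring the provisioner programmatically (instead of
// through flags), a minimal sketch might look as follows. The exact limits and
// rates are illustrative, not recommendations:
//
//	pc := ProvisionerConfig{
//		MaxCount:              50,
//		ReconcileLoopLimiter:  rate.NewLimiter(rate.Every(time.Minute), 1),
//		DeviceCreationLimiter: rate.NewLimiter(rate.Every(5*time.Second), 1),
//		ChunkSize:             20,
//	}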

// RegisterFlags registers the provisioner configuration to be set from
// command line flags.
func (pc *ProvisionerConfig) RegisterFlags() {
	flag.UintVar(&pc.MaxCount, "provisioner_max_machines", 50, "Limit of machines that the provisioner will attempt to pull into the BMDB. Zero for no limit.")
	mflags.Limiter(&pc.ReconcileLoopLimiter, "provisioner_reconciler_rate", "1m,1", "Rate limiting for main provisioner reconciliation loop")
	mflags.Limiter(&pc.DeviceCreationLimiter, "provisioner_device_creation_rate", "5s,1", "Rate limiting for machine creation")
	flag.UintVar(&pc.ChunkSize, "provisioner_reservation_chunk_size", 20, "How many machines the provisioner will attempt to create in a single reconciliation loop iteration")
}
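
// The limiter flags take an "<interval>,<burst>" value, as suggested by the
// defaults above (see mflags.Limiter for the authoritative format). A
// hypothetical invocation of a binary embedding this package:
//
//	shepherd-provisioner \
//		-provisioner_max_machines=100 \
//		-provisioner_reconciler_rate=30s,1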

func (pc *ProvisionerConfig) check() error {
	// If these are unset, it's probably because someone is using us as a library.
	// Provide error messages useful to code users instead of flag names.
	if pc.ReconcileLoopLimiter == nil {
		return fmt.Errorf("ReconcileLoopLimiter must be set")
	}
	if pc.DeviceCreationLimiter == nil {
		return fmt.Errorf("DeviceCreationLimiter must be set")
	}
	if pc.ChunkSize == 0 {
		return fmt.Errorf("ChunkSize must be set")
	}
	return nil
}

// NewProvisioner creates a Provisioner instance, checking the
// ProvisionerConfig for errors.
func NewProvisioner(p shepherd.Provider, pc ProvisionerConfig) (*Provisioner, error) {
	if err := pc.check(); err != nil {
		return nil, err
	}

	return &Provisioner{
		ProvisionerConfig: pc,
		p:                 p,
	}, nil
}

// Run runs the provisioner, blocking the current goroutine until the given
// context expires.
func (p *Provisioner) Run(ctx context.Context, conn *bmdb.Connection) error {
	var sess *bmdb.Session
	var err error
	for {
		if sess == nil {
			sess, err = conn.StartSession(ctx, bmdb.SessionOption{Processor: metrics.ProcessorShepherdProvisioner})
			if err != nil {
				return fmt.Errorf("could not start BMDB session: %w", err)
			}
		}
		err = p.runInSession(ctx, sess)

		switch {
		case err == nil:
		case errors.Is(err, ctx.Err()):
			return err
		case errors.Is(err, bmdb.ErrSessionExpired):
			klog.Errorf("Session expired, restarting...")
			sess = nil
			time.Sleep(time.Second)
		default:
			klog.Errorf("Processing failed: %v", err)
			// TODO(q3k): close session
			time.Sleep(time.Second)
		}
	}
}
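
// A typical way to wire everything together (a sketch; `prov` stands for some
// shepherd.Provider implementation, `conn` for an already-dialed
// *bmdb.Connection, and `pc` for a ProvisionerConfig as built above, all
// assumed to exist):
//
//	p, err := NewProvisioner(prov, pc)
//	if err != nil {
//		klog.Exitf("%v", err)
//	}
//	if err := p.Run(ctx, conn); err != nil {
//		klog.Exitf("%v", err)
//	}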

type machineListing struct {
	machines []shepherd.Machine
	err      error
}

// runInSession executes one iteration of the provisioner's control loop within
// a BMDB session. This control loop attempts to bring all capacity into
// machines in the BMDB, subject to limits.
func (p *Provisioner) runInSession(ctx context.Context, sess *bmdb.Session) error {
	if err := p.ReconcileLoopLimiter.Wait(ctx); err != nil {
		return err
	}

	providerC := make(chan *machineListing, 1)
	bmdbC := make(chan *machineListing, 1)

	klog.Infof("Getting provider and BMDB machines...")

	// Make a sub-context for the two parallel list operations, so that we can
	// cancel one immediately if the other fails.
	subCtx, subCtxC := context.WithCancel(ctx)
	defer subCtxC()

	go func() {
		machines, err := p.listInProvider(subCtx)
		providerC <- &machineListing{
			machines: machines,
			err:      err,
		}
	}()
	go func() {
		machines, err := p.listInBMDB(subCtx, sess)
		bmdbC <- &machineListing{
			machines: machines,
			err:      err,
		}
	}()
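	// Wait for both listings to finish, failing fast on the first error. The
	// channels are buffered, so whichever goroutine outlives an early return
	// does not leak blocked on its send.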
	var inProvider, inBMDB *machineListing
	for {
		select {
		case inProvider = <-providerC:
			if err := inProvider.err; err != nil {
				return fmt.Errorf("listing provider machines failed: %w", err)
			}
			klog.Infof("Got %d machines in provider.", len(inProvider.machines))
		case inBMDB = <-bmdbC:
			if err := inBMDB.err; err != nil {
				return fmt.Errorf("listing BMDB machines failed: %w", err)
			}
			klog.Infof("Got %d machines in BMDB.", len(inBMDB.machines))
		}
		if inProvider != nil && inBMDB != nil {
			break
		}
	}

	subCtxC()
	if err := p.reconcile(ctx, sess, inProvider.machines, inBMDB.machines); err != nil {
		return fmt.Errorf("reconciliation failed: %w", err)
	}
	return nil
}

// listInProvider returns all machines that the provider thinks we should be
// managing.
func (p *Provisioner) listInProvider(ctx context.Context) ([]shepherd.Machine, error) {
	machines, err := p.p.ListMachines(ctx)
	if err != nil {
		return nil, fmt.Errorf("while fetching managed machines: %w", err)
	}
	sort.Slice(machines, func(i, j int) bool {
		return machines[i].ID() < machines[j].ID()
	})
	return machines, nil
}

// providedMachine wraps a BMDB-sourced model.MachineProvided so that it
// satisfies the shepherd.Machine interface.
type providedMachine struct {
	model.MachineProvided
}

func (p providedMachine) ID() shepherd.ProviderID {
	return shepherd.ProviderID(p.ProviderID)
}

func (p providedMachine) Addr() netip.Addr {
	if !p.ProviderIpAddress.Valid {
		return netip.Addr{}
	}

	addr, err := netip.ParseAddr(p.ProviderIpAddress.String)
	if err != nil {
		return netip.Addr{}
	}
	return addr
}
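// State always returns StateKnownUsed: a machine present in the BMDB has, by
// definition, already been provided.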
func (p providedMachine) State() shepherd.State {
	return shepherd.StateKnownUsed
}

// listInBMDB returns all the machines that the BMDB thinks we should be
// managing.
func (p *Provisioner) listInBMDB(ctx context.Context, sess *bmdb.Session) ([]shepherd.Machine, error) {
	var res []shepherd.Machine
	err := sess.Transact(ctx, func(q *model.Queries) error {
		machines, err := q.GetProvidedMachines(ctx, p.p.Type())
		if err != nil {
			return err
		}
		res = make([]shepherd.Machine, 0, len(machines))
		for _, machine := range machines {
			_, err := uuid.Parse(machine.ProviderID)
			if err != nil {
				klog.Errorf("BMDB machine %s has unparseable provider ID %q", machine.MachineID, machine.ProviderID)
				continue
			}

			res = append(res, providedMachine{machine})
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	sort.Slice(res, func(i, j int) bool {
		return res[i].ID() < res[j].ID()
	})
	return res, nil
}

// resolvePossiblyUsed resolves StatePossiblyUsed into either StateKnownUsed or
// StateKnownUnused, depending on whether the machine is already recorded in
// the BMDB.
func (p *Provisioner) resolvePossiblyUsed(machine shepherd.Machine, providedMachines map[shepherd.ProviderID]bool) shepherd.State {
	state, id := machine.State(), machine.ID()

	// Bail out if this isn't a possibly used state.
	if state != shepherd.StatePossiblyUsed {
		return state
	}

	// If a machine does not have a valid ID, it's always seen as unused.
	if !id.IsValid() {
		return shepherd.StateKnownUnused
	}

	// If the machine is not inside the BMDB, it's seen as unused.
	if _, ok := providedMachines[id]; !ok {
		return shepherd.StateKnownUnused
	}

	return shepherd.StateKnownUsed
}

// reconcile takes the list of machines that the provider thinks we should be
// managing and the list that the BMDB thinks we should be managing, and tries
// to make sense of them. First, some checks are performed across the two lists
// to make sure we haven't dropped anything. Then, additional machines are
// deployed from hardware reservations as needed.
func (p *Provisioner) reconcile(ctx context.Context, sess *bmdb.Session, inProvider, bmdbMachines []shepherd.Machine) error {
	klog.Infof("Reconciling...")

	bmdb := make(map[shepherd.ProviderID]bool)
	for _, machine := range bmdbMachines {
		// Don't check the state here, as it's hardcoded to be known used.
		bmdb[machine.ID()] = true
	}

	var availableMachines []shepherd.Machine
	provider := make(map[shepherd.ProviderID]bool)
	for _, machine := range inProvider {
		state := p.resolvePossiblyUsed(machine, bmdb)

		switch state {
		case shepherd.StateKnownUnused:
			availableMachines = append(availableMachines, machine)

		case shepherd.StateKnownUsed:
			provider[machine.ID()] = true

		default:
			return fmt.Errorf("machine has invalid state (ID: %s, Addr: %s): %s", machine.ID(), machine.Addr(), state)
		}
	}

	managed := make(map[shepherd.ProviderID]bool)

	// Some desynchronization between the BMDB and provider points of view might
	// be so bad that we shouldn't attempt to do any work, at least not any time
	// soon.
	badbadnotgood := false

	// Find any machines supposedly managed by us in the provider, but not in the
	// BMDB.
	for machine := range provider {
		if bmdb[machine] {
			managed[machine] = true
			continue
		}
		klog.Errorf("Provider machine %s has no corresponding machine in BMDB.", machine)
		badbadnotgood = true
	}

	// Find any machines in the BMDB but not in the provider.
	for machine := range bmdb {
		if !provider[machine] {
			klog.Errorf("Provider device ID %s referred to in BMDB (from TODO) but missing in provider.", machine)
			badbadnotgood = true
		}
	}

	// Bail if things are weird.
	if badbadnotgood {
		klog.Errorf("Something's very wrong. Bailing early and refusing to do any work.")
		return fmt.Errorf("fatal discrepancy between BMDB and provider")
	}

	// Summarize all managed machines: the intersection of the BMDB and provider
	// sets (usually both sets are equal).
	nmanaged := len(managed)
	klog.Infof("Total managed machines: %d", nmanaged)

	if p.MaxCount != 0 && p.MaxCount <= uint(nmanaged) {
		klog.Infof("Not bringing up more machines (at limit of %d machines)", p.MaxCount)
		return nil
	}

	limitName := "no limit"
	if p.MaxCount != 0 {
		limitName = fmt.Sprintf("%d", p.MaxCount)
	}
	klog.Infof("Below managed machine limit (%s), bringing up more...", limitName)

	if len(availableMachines) == 0 {
		klog.Infof("No more capacity available.")
		return nil
	}

	toProvision := availableMachines
	// Limit them to MaxCount, if applicable.
	if p.MaxCount != 0 {
		needed := int(p.MaxCount) - nmanaged
		if len(toProvision) < needed {
			needed = len(toProvision)
		}
		toProvision = toProvision[:needed]
	}

	// Limit them to an arbitrary 'chunk' size so that we don't do too many
	// things in a single reconciliation operation.
	if uint(len(toProvision)) > p.ChunkSize {
		toProvision = toProvision[:p.ChunkSize]
	}

	if len(toProvision) == 0 {
		klog.Infof("No more unused machines available, or all filtered out.")
		return nil
	}

	klog.Infof("Bringing up %d machines...", len(toProvision))
	for _, machine := range toProvision {
		if err := p.DeviceCreationLimiter.Wait(ctx); err != nil {
			return err
		}

		nd, err := p.p.CreateMachine(ctx, sess, shepherd.CreateMachineRequest{
			UnusedMachine: machine,
		})
		if err != nil {
			klog.Errorf("while creating new device (ID: %s, Addr: %s, State: %s): %v", machine.ID(), machine.Addr(), machine.State(), err)
			continue
		}
		klog.Infof("Created new machine with ID: %s", nd.ID())
	}

	return nil
}