| package manager |
| |
| import ( |
| "context" |
| "errors" |
| "flag" |
| "fmt" |
| "sort" |
| "time" |
| |
| "github.com/google/uuid" |
| "github.com/packethost/packngo" |
| "golang.org/x/time/rate" |
| "k8s.io/klog/v2" |
| |
| "source.monogon.dev/cloud/bmaas/bmdb" |
| "source.monogon.dev/cloud/bmaas/bmdb/metrics" |
| "source.monogon.dev/cloud/bmaas/bmdb/model" |
| "source.monogon.dev/cloud/lib/sinbin" |
| ecl "source.monogon.dev/cloud/shepherd/equinix/wrapngo" |
| ) |
| |
| // ProvisionerConfig configures the provisioning process. |
| type ProvisionerConfig struct { |
| // OS defines the operating system new devices are created with. Its format |
| // is specified by the Equinix API. |
| OS string |
| // MaxCount is the maximum count of managed servers. No new devices will be |
| // created after reaching the limit. No attempt will be made to reduce the |
| // server count. |
| MaxCount uint |
| |
| // ReconcileLoopLimiter limits the rate at which the main reconciliation loop |
| // iterates. As new machines are being provisioned, each loop iteration will |
| // cause one 'long' ListHardwareReservations call to Equinix. |
| ReconcileLoopLimiter *rate.Limiter |
| |
| // DeviceCreationLimiter limits the rate at which devices are created within |
| // Equinix through use of appropriate API calls. |
| DeviceCreationLimiter *rate.Limiter |
| |
| // Assimilate, if set, brings Equinix machines that match the configured device |
| // prefix into the BMDB as Provided. This should only be used for manual testing |
| // with -bmdb_eat_my_data. |
| Assimilate bool |
| |
| // ReservationChunkSize is how many Equinix machines the provisioner will try |
| // to spawn in a single reconciliation loop iteration. Higher numbers allow for |
| // faster initial provisioning, but lower numbers decrease potential raciness |
| // with other systems and ensure that other parts of the reconciliation logic |
| // are run regularly. |
| // |
| // 20 is a decent starting point. |
| ReservationChunkSize uint |
| |
| // UseProjectKeys defines whether the provisioner adds all SSH keys defined |
| // inside the used project to every new machine. This is only used for debug |
| // purposes. |
| UseProjectKeys bool |
| } |
| |
| func (p *ProvisionerConfig) RegisterFlags() { |
| flag.StringVar(&p.OS, "provisioner_os", "ubuntu_20_04", "OS that provisioner will deploy on Equinix machines. Not the target OS for cluster customers.") |
| flag.UintVar(&p.MaxCount, "provisioner_max_machines", 50, "Limit of machines that the provisioner will attempt to pull into the BMDB. Zero for no limit.") |
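| // flagLimiter (defined elsewhere in this package) is assumed to parse |
| // "duration,burst" rate descriptions, e.g. "1m,1" meaning one event per |
| // minute with a burst of one. |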
| flagLimiter(&p.ReconcileLoopLimiter, "provisioner_reconciler_rate", "1m,1", "Rate limiting for main provisioner reconciliation loop") |
| flagLimiter(&p.DeviceCreationLimiter, "provisioner_device_creation_rate", "5s,1", "Rate limiting for Equinix device/machine creation") |
| flag.BoolVar(&p.Assimilate, "provisioner_assimilate", false, "Assimilate matching machines in Equinix project into BMDB as Provided. Only to be used when manually testing.") |
| flag.UintVar(&p.ReservationChunkSize, "provisioner_reservation_chunk_size", 20, "How many machines will the provisioner attempt to create in a single reconciliation loop iteration") |
| flag.BoolVar(&p.UseProjectKeys, "provisioner_use_project_keys", false, "Add all Equinix project keys to newly provisioned machines, not just the provisioner's managed key. Debug/development only.") |
| } |
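| |
| // A minimal sketch of configuring the provisioner as a library user, without |
| // going through flag registration. The limiter values mirror the flag defaults |
| // above; cl and sc are assumed to be a wrapngo client and a populated |
| // SharedConfig supplied by the caller: |
| // |
| //	cfg := ProvisionerConfig{ |
| //		OS:                    "ubuntu_20_04", |
| //		MaxCount:              50, |
| //		ReconcileLoopLimiter:  rate.NewLimiter(rate.Every(time.Minute), 1), |
| //		DeviceCreationLimiter: rate.NewLimiter(rate.Every(5*time.Second), 1), |
| //		ReservationChunkSize:  20, |
| //	} |
| //	p, err := cfg.New(cl, sc) |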
| |
| // Provisioner implements the server provisioning logic. Provisioning entails |
| // bringing all available hardware reservations (subject to limits) into BMDB as |
| // machines provided by Equinix. |
| type Provisioner struct { |
| config *ProvisionerConfig |
| sharedConfig *SharedConfig |
| |
| // cl is the wrapngo client instance used. |
| cl ecl.Client |
| |
| // badReservations is a holiday resort for Equinix hardware reservations which |
| // failed to be provisioned for some reason or another. We keep a list of them in |
| // memory just so that we don't repeatedly try to provision the same known bad |
| // machines. |
| badReservations sinbin.Sinbin[string] |
| } |
| |
| // New creates a Provisioner instance, checking ProvisionerConfig and |
| // SharedConfig for errors. |
| func (c *ProvisionerConfig) New(cl ecl.Client, sc *SharedConfig) (*Provisioner, error) { |
| // If these are unset, it's probably because someone is using us as a library. |
| // Provide error messages useful to code users instead of flag names. |
| if c.OS == "" { |
| return nil, fmt.Errorf("OS must be set") |
| } |
| if c.ReconcileLoopLimiter == nil { |
| return nil, fmt.Errorf("ReconcileLoopLimiter must be set") |
| } |
| if c.DeviceCreationLimiter == nil { |
| return nil, fmt.Errorf("DeviceCreationLimiter must be set") |
| } |
| if c.ReservationChunkSize == 0 { |
| return nil, fmt.Errorf("ReservationChunkSize must be set") |
| } |
| return &Provisioner{ |
| config: c, |
| sharedConfig: sc, |
| |
| cl: cl, |
| }, nil |
| } |
| |
| // Run runs the provisioner, blocking the current goroutine until the given |
| // context expires. |
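| // |
| // A minimal usage sketch (conn is assumed to be an already established BMDB |
| // connection owned by the caller): |
| // |
| //	if err := p.Run(ctx, conn); err != nil { |
| //		klog.Errorf("Provisioner exited: %v", err) |
| //	} |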
| func (p *Provisioner) Run(ctx context.Context, conn *bmdb.Connection) error { |
| var sess *bmdb.Session |
| var err error |
| for { |
| if sess == nil { |
| sess, err = conn.StartSession(ctx, bmdb.SessionOption{Processor: metrics.ProcessorShepherdProvisioner}) |
| if err != nil { |
| return fmt.Errorf("could not start BMDB session: %w", err) |
| } |
| } |
| err = p.runInSession(ctx, sess) |
| |
| switch { |
| case err == nil: |
| case errors.Is(err, ctx.Err()): |
| return err |
| case errors.Is(err, bmdb.ErrSessionExpired): |
| klog.Errorf("Session expired, restarting...") |
| sess = nil |
| time.Sleep(time.Second) |
| case err != nil: |
| klog.Errorf("Processing failed: %v", err) |
| // TODO(q3k): close session |
| time.Sleep(time.Second) |
| } |
| } |
| } |
| |
| type machineListing struct { |
| machines []uuid.UUID |
| err error |
| } |
| |
| // runInSession executes one iteration of the provisioner's control loop within a |
| // BMDB session. This control loop attempts to bring all Equinix hardware |
| // reservations into machines in the BMDB, subject to limits. |
| func (p *Provisioner) runInSession(ctx context.Context, sess *bmdb.Session) error { |
| if err := p.config.ReconcileLoopLimiter.Wait(ctx); err != nil { |
| return err |
| } |
| |
| providerC := make(chan *machineListing, 1) |
| bmdbC := make(chan *machineListing, 1) |
| |
| klog.Infof("Getting provider and bmdb machines...") |
| |
| // Make a sub-context for the two parallel operations, so that we can cancel |
| // one immediately if the other fails. |
| subCtx, subCtxC := context.WithCancel(ctx) |
| defer subCtxC() |
| |
| go func() { |
| machines, err := p.listInProvider(subCtx) |
| providerC <- &machineListing{ |
| machines: machines, |
| err: err, |
| } |
| }() |
| go func() { |
| machines, err := p.listInBMDB(subCtx, sess) |
| bmdbC <- &machineListing{ |
| machines: machines, |
| err: err, |
| } |
| }() |
| var inProvider, inBMDB *machineListing |
| for { |
| select { |
| case inProvider = <-providerC: |
| if err := inProvider.err; err != nil { |
| return fmt.Errorf("listing provider machines failed: %w", err) |
| } |
| klog.Infof("Got %d machines managed in provider.", len(inProvider.machines)) |
| case inBMDB = <-bmdbC: |
| if err := inBMDB.err; err != nil { |
| return fmt.Errorf("listing BMDB machines failed: %w", err) |
| } |
| klog.Infof("Got %d machines in BMDB.", len(inBMDB.machines)) |
| } |
| if inProvider != nil && inBMDB != nil { |
| break |
| } |
| } |
| |
| subCtxC() |
| if err := p.reconcile(ctx, sess, inProvider.machines, inBMDB.machines); err != nil { |
| return fmt.Errorf("reconciliation failed: %w", err) |
| } |
| return nil |
| } |
| |
| // listInProvider returns all machines that the provider thinks we should be |
| // managing. |
| func (p *Provisioner) listInProvider(ctx context.Context) ([]uuid.UUID, error) { |
| devices, err := p.sharedConfig.managedDevices(ctx, p.cl) |
| if err != nil { |
| return nil, fmt.Errorf("while fetching managed machines: %w", err) |
| } |
| var pvr []uuid.UUID |
| for _, dev := range devices { |
| id, err := uuid.Parse(dev.ID) |
| if err != nil { |
| klog.Errorf("Device ID %q is not UUID, skipping", dev.ID) |
| } else { |
| pvr = append(pvr, id) |
| } |
| } |
| sort.Slice(pvr, func(i, j int) bool { |
| return pvr[i].String() < pvr[j].String() |
| }) |
| return pvr, nil |
| } |
| |
| // listInBMDB returns all the machines that the BMDB thinks we should be managing. |
| func (p *Provisioner) listInBMDB(ctx context.Context, sess *bmdb.Session) ([]uuid.UUID, error) { |
| var res []uuid.UUID |
| err := sess.Transact(ctx, func(q *model.Queries) error { |
| machines, err := q.GetProvidedMachines(ctx, model.ProviderEquinix) |
| if err != nil { |
| return err |
| } |
| res = make([]uuid.UUID, 0, len(machines)) |
| for _, machine := range machines { |
| id, err := uuid.Parse(machine.ProviderID) |
| if err != nil { |
| klog.Errorf("BMDB machine %s has unparseable provider ID %q, skipping", machine.MachineID, machine.ProviderID) |
| continue |
| } |
| res = append(res, id) |
| } |
| return nil |
| }) |
| if err != nil { |
| return nil, err |
| } |
| sort.Slice(res, func(i, j int) bool { |
| return res[i].String() < res[j].String() |
| }) |
| return res, nil |
| } |
| |
| // reconcile takes a list of machines that the provider thinks we should be |
| // managing and that the BMDB thinks we should be managing, and tries to make |
| // sense of that. First, some checks are performed across the two lists to make |
| // sure we haven't dropped anything. Then, additional machines are deployed from |
| // hardware reservations as needed. |
| func (p *Provisioner) reconcile(ctx context.Context, sess *bmdb.Session, inProvider, inBMDB []uuid.UUID) error { |
| klog.Infof("Reconciling...") |
| |
| bmdb := make(map[string]bool) |
| provider := make(map[string]bool) |
| for _, machine := range inProvider { |
| provider[machine.String()] = true |
| } |
| for _, machine := range inBMDB { |
| bmdb[machine.String()] = true |
| } |
| |
| managed := make(map[string]bool) |
| |
| // Some desynchronization between the BMDB and Provider point of view might be so |
| // bad we shouldn't attempt to do any work, at least not any time soon. |
| badbadnotgood := false |
| |
| // Find any machines supposedly managed by us in the provider, but not in the |
| // BMDB, and assimilate them if so configured. |
| for machine := range provider { |
| if bmdb[machine] { |
| managed[machine] = true |
| continue |
| } |
| if p.config.Assimilate { |
| klog.Warningf("Provider machine %s has no corresponding machine in BMDB. Assimilating it.", machine) |
| if err := p.assimilate(ctx, sess, machine); err != nil { |
| klog.Errorf("Failed to assimilate: %v", err) |
| } else { |
| managed[machine] = true |
| } |
| } else { |
| klog.Errorf("Provider machine %s has no corresponding machine in BMDB.", machine) |
| badbadnotgood = true |
| } |
| } |
| |
| // Find any machines in the BMDB but not in the provider. |
| for machine := range bmdb { |
| if !provider[machine] { |
| klog.Errorf("Provider device ID %s referred to in BMDB (from TODO) but missing in provider.", machine) |
| badbadnotgood = true |
| } |
| } |
| |
| // Bail if things are weird. |
| if badbadnotgood { |
| klog.Errorf("Something's very wrong. Bailing early and refusing to do any work.") |
| return fmt.Errorf("fatal discrepency between BMDB and provider") |
| } |
| |
| // Summarize all managed machines, i.e. the intersection of the BMDB and |
| // provider machine sets. Usually these two sets are equal. |
| nmanaged := len(managed) |
| klog.Infof("Total managed machines: %d", nmanaged) |
| |
| if p.config.MaxCount != 0 && p.config.MaxCount <= uint(nmanaged) { |
| klog.Infof("Not bringing up more machines (at limit of %d machines)", p.config.MaxCount) |
| return nil |
| } |
| |
| limitName := "no limit" |
| if p.config.MaxCount != 0 { |
| limitName = fmt.Sprintf("%d", p.config.MaxCount) |
| } |
| klog.Infof("Below managed machine limit (%s), bringing up more...", limitName) |
| klog.Infof("Retrieving hardware reservations, this will take a while...") |
| reservations, err := p.cl.ListReservations(ctx, p.sharedConfig.ProjectId) |
| if err != nil { |
| return fmt.Errorf("failed to list reservations: %w", err) |
| } |
| |
| // Collect all reservations. |
| var toProvision []packngo.HardwareReservation |
| var inUse, notProvisionable, penalized int |
| for _, reservation := range reservations { |
| if reservation.Device != nil { |
| inUse++ |
| continue |
| } |
| if !reservation.Provisionable { |
| notProvisionable++ |
| continue |
| } |
| if p.badReservations.Penalized(reservation.ID) { |
| penalized++ |
| continue |
| } |
| toProvision = append(toProvision, reservation) |
| } |
| klog.Infof("Retrieved hardware reservations: %d (total), %d (available), %d (in use), %d (not provisionable), %d (penalized)", len(reservations), len(toProvision), inUse, notProvisionable, penalized) |
| |
| // Limit them to MaxCount, if applicable. |
| if p.config.MaxCount != 0 { |
| needed := int(p.config.MaxCount) - nmanaged |
| if len(toProvision) < needed { |
| needed = len(toProvision) |
| } |
| toProvision = toProvision[:needed] |
| } |
| |
| // Limit them to an arbitrary 'chunk' size so that we don't do too many things in |
| // a single reconciliation operation. |
| if uint(len(toProvision)) > p.config.ReservationChunkSize { |
| toProvision = toProvision[:p.config.ReservationChunkSize] |
| } |
| |
| if len(toProvision) == 0 { |
| klog.Infof("No more hardware reservations available, or all filtered out.") |
| return nil |
| } |
| |
| klog.Infof("Bringing up %d machines...", len(toProvision)) |
| for _, res := range toProvision { |
| if err := p.config.DeviceCreationLimiter.Wait(ctx); err != nil { |
| return err |
| } |
| if err := p.provision(ctx, sess, res); err != nil { |
| klog.Errorf("Failed to provision reservation %s: %v", res.ID, err) |
| until := time.Now().Add(time.Hour) |
| klog.Errorf("Adding hardware reservation %s to sinbin until %s", res.ID, until) |
| p.badReservations.Add(res.ID, until) |
| } |
| } |
| |
| return nil |
| } |
| |
| // provision attempts to create a device within Equinix using the given |
| // hardware reservation rsv. The resulting device is registered with the BMDB |
| // and tagged as "provided" in the process. |
| func (pr *Provisioner) provision(ctx context.Context, sess *bmdb.Session, rsv packngo.HardwareReservation) error { |
| klog.Infof("Creating a new device using reservation ID %s.", rsv.ID) |
| hostname := pr.sharedConfig.DevicePrefix + rsv.ID[:18] |
| kid, err := pr.sharedConfig.sshEquinixId(ctx, pr.cl) |
| if err != nil { |
| return err |
| } |
| req := &packngo.DeviceCreateRequest{ |
| Hostname: hostname, |
| OS: pr.config.OS, |
| Plan: rsv.Plan.Slug, |
| ProjectID: pr.sharedConfig.ProjectId, |
| HardwareReservationID: rsv.ID, |
| ProjectSSHKeys: []string{kid}, |
| } |
| if pr.config.UseProjectKeys { |
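| // With no explicit key list, the Equinix API is assumed to fall back to |
| // adding all keys available in the project. |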
| klog.Warningf("INSECURE: Machines will be created with ALL PROJECT SSH KEYS!") |
| req.ProjectSSHKeys = nil |
| } |
| |
| nd, err := pr.cl.CreateDevice(ctx, req) |
| if err != nil { |
| return fmt.Errorf("while creating new device within Equinix: %w", err) |
| } |
| klog.Infof("Created a new device within Equinix (RID: %s, PID: %s, HOST: %s)", rsv.ID, nd.ID, hostname) |
| |
| err = pr.assimilate(ctx, sess, nd.ID) |
| if err != nil { |
| // TODO(mateusz@monogon.tech) at this point the device at Equinix isn't |
| // matched by a BMDB record. Schedule device deletion or make sure this |
| // case is being handled elsewhere. |
| return err |
| } |
| return nil |
| } |
| |
| // assimilate brings an already existing Equinix device into the BMDB as a |
| // provided machine. It is used by provision for newly created devices, and, |
| // when Assimilate is set, by the reconciliation loop for manual testing. |
| func (pr *Provisioner) assimilate(ctx context.Context, sess *bmdb.Session, deviceID string) error { |
| return sess.Transact(ctx, func(q *model.Queries) error { |
| // Create a new machine record within BMDB. |
| m, err := q.NewMachine(ctx) |
| if err != nil { |
| return fmt.Errorf("while creating a new machine record in BMDB: %w", err) |
| } |
| |
| // Link the new machine with the Equinix device, and tag it "provided". |
| p := model.MachineAddProvidedParams{ |
| MachineID: m.MachineID, |
| ProviderID: deviceID, |
| Provider: model.ProviderEquinix, |
| } |
| klog.Infof("Setting \"provided\" tag (ID: %s, PID: %s, Provider: %s).", p.MachineID, p.ProviderID, p.Provider) |
| if err := q.MachineAddProvided(ctx, p); err != nil { |
| return fmt.Errorf("while tagging machine active: %w", err) |
| } |
| return nil |
| }) |
| } |