cloud: split shepherd up

Change-Id: I8e386d9eaaf17543743e1e8a37a8d71426910d59
Reviewed-on: https://review.monogon.dev/c/monogon/+/2213
Reviewed-by: Serge Bazanski <serge@monogon.tech>
Tested-by: Jenkins CI
diff --git a/cloud/shepherd/manager/provisioner.go b/cloud/shepherd/manager/provisioner.go
new file mode 100644
index 0000000..a77f241
--- /dev/null
+++ b/cloud/shepherd/manager/provisioner.go
@@ -0,0 +1,392 @@
+package manager
+
+import (
+	"context"
+	"errors"
+	"flag"
+	"fmt"
+	"net/netip"
+	"sort"
+	"time"
+
+	"github.com/google/uuid"
+	"golang.org/x/time/rate"
+	"k8s.io/klog/v2"
+
+	"source.monogon.dev/cloud/bmaas/bmdb"
+	"source.monogon.dev/cloud/bmaas/bmdb/metrics"
+	"source.monogon.dev/cloud/bmaas/bmdb/model"
+	"source.monogon.dev/cloud/shepherd"
+	"source.monogon.dev/go/mflags"
+)
+
+// Provisioner implements the server provisioning logic. Provisioning entails
+// bringing all available machines (subject to limits) into the BMDB.
+type Provisioner struct {
+	ProvisionerConfig
+	p shepherd.Provider
+}
+
+// ProvisionerConfig configures the provisioning process.
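+//
+// An illustrative configuration for code users, mirroring the flag defaults
+// registered in RegisterFlags (a sketch, not defaults enforced by this type):
+//
+//	pc := ProvisionerConfig{
+//		MaxCount:              50,
+//		ReconcileLoopLimiter:  rate.NewLimiter(rate.Every(time.Minute), 1),
+//		DeviceCreationLimiter: rate.NewLimiter(rate.Every(5*time.Second), 1),
+//		ChunkSize:             20,
+//	}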
+type ProvisionerConfig struct {
+	// MaxCount is the maximum count of managed servers. No new devices will be
+	// created after reaching the limit. No attempt will be made to reduce the
+	// server count.
+	MaxCount uint
+
+	// ReconcileLoopLimiter limits the rate at which the main reconciliation
+	// loop iterates.
+	ReconcileLoopLimiter *rate.Limiter
+
+	// DeviceCreationLimiter limits the rate at which devices are created.
+	DeviceCreationLimiter *rate.Limiter
+
+	// ChunkSize is how many machines the provisioner will attempt to spawn in a
+	// single reconciliation loop iteration. Higher numbers allow for faster
+	// initial provisioning, but lower numbers decrease potential raciness with
+	// other systems and make sure that other parts of the reconciliation logic
+	// are run regularly.
+	//
+	// 20 is a decent starting point.
+	ChunkSize uint
+}
+
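+// RegisterFlags registers the provisioner configuration on the global flag
+// set. Judging by the defaults below, the limiter flags take an
+// "interval,burst" value (e.g. "1m,1" for one iteration per minute).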
+func (pc *ProvisionerConfig) RegisterFlags() {
+	flag.UintVar(&pc.MaxCount, "provisioner_max_machines", 50, "Limit of machines that the provisioner will attempt to pull into the BMDB. Zero for no limit.")
+	mflags.Limiter(&pc.ReconcileLoopLimiter, "provisioner_reconciler_rate", "1m,1", "Rate limiting for main provisioner reconciliation loop")
+	mflags.Limiter(&pc.DeviceCreationLimiter, "provisioner_device_creation_rate", "5s,1", "Rate limiting for machine creation")
+	flag.UintVar(&pc.ChunkSize, "provisioner_reservation_chunk_size", 20, "How many machines will the provisioner attempt to create in a single reconciliation loop iteration")
+}
+
+func (pc *ProvisionerConfig) check() error {
+	// If these are unset, it's probably because someone is using us as a library.
+	// Provide error messages useful to code users instead of flag names.
+	if pc.ReconcileLoopLimiter == nil {
+		return fmt.Errorf("ReconcileLoopLimiter must be set")
+	}
+	if pc.DeviceCreationLimiter == nil {
+		return fmt.Errorf("DeviceCreationLimiter must be set")
+	}
+	if pc.ChunkSize == 0 {
+		return fmt.Errorf("ChunkSize must be set")
+	}
+	return nil
+}
+
+// NewProvisioner creates a Provisioner instance, checking the given
+// ProvisionerConfig for errors.
+func NewProvisioner(p shepherd.Provider, pc ProvisionerConfig) (*Provisioner, error) {
+	if err := pc.check(); err != nil {
+		return nil, err
+	}
+
+	return &Provisioner{
+		ProvisionerConfig: pc,
+		p:                 p,
+	}, nil
+}
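+
+// A minimal usage sketch for code users (provider construction and BMDB
+// connection setup are assumed to happen elsewhere):
+//
+//	var pc ProvisionerConfig
+//	pc.RegisterFlags()
+//	flag.Parse()
+//
+//	p, err := NewProvisioner(provider, pc)
+//	if err != nil {
+//		klog.Exitf("%v", err)
+//	}
+//	klog.Error(p.Run(ctx, conn))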
+
+// Run runs the provisioner, blocking the current goroutine until the given
+// context expires.
+func (p *Provisioner) Run(ctx context.Context, conn *bmdb.Connection) error {
+	var sess *bmdb.Session
+	var err error
+	for {
+		if sess == nil {
+			sess, err = conn.StartSession(ctx, bmdb.SessionOption{Processor: metrics.ProcessorShepherdProvisioner})
+			if err != nil {
+				return fmt.Errorf("could not start BMDB session: %w", err)
+			}
+		}
+		err = p.runInSession(ctx, sess)
+
+		switch {
+		case err == nil:
+		case errors.Is(err, ctx.Err()):
+			return err
+		case errors.Is(err, bmdb.ErrSessionExpired):
+			klog.Errorf("Session expired, restarting...")
+			sess = nil
+			time.Sleep(time.Second)
+		default:
+			klog.Errorf("Processing failed: %v", err)
+			// TODO(q3k): close session
+			time.Sleep(time.Second)
+		}
+	}
+}
+
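+// machineListing carries the result of one of the two machine listing
+// operations (provider-side and BMDB-side) that runInSession runs in parallel.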
+type machineListing struct {
+	machines []shepherd.Machine
+	err      error
+}
+
+// runInSession executes one iteration of the provisioner's control loop within
+// a BMDB session. This control loop attempts to bring all available provider
+// capacity into the BMDB as machines, subject to limits.
+func (p *Provisioner) runInSession(ctx context.Context, sess *bmdb.Session) error {
+	if err := p.ReconcileLoopLimiter.Wait(ctx); err != nil {
+		return err
+	}
+
+	providerC := make(chan *machineListing, 1)
+	bmdbC := make(chan *machineListing, 1)
+
+	klog.Infof("Getting provider and BMDB machines...")
+
+	// Make a sub-context for the two parallel operations, so that we can cancel
+	// one immediately if the other fails.
+	subCtx, subCtxC := context.WithCancel(ctx)
+	defer subCtxC()
+
+	go func() {
+		machines, err := p.listInProvider(subCtx)
+		providerC <- &machineListing{
+			machines: machines,
+			err:      err,
+		}
+	}()
+	go func() {
+		machines, err := p.listInBMDB(subCtx, sess)
+		bmdbC <- &machineListing{
+			machines: machines,
+			err:      err,
+		}
+	}()
+	var inProvider, inBMDB *machineListing
+	for {
+		select {
+		case inProvider = <-providerC:
+			if err := inProvider.err; err != nil {
+				return fmt.Errorf("listing provider machines failed: %w", err)
+			}
+			klog.Infof("Got %d machines in provider.", len(inProvider.machines))
+		case inBMDB = <-bmdbC:
+			if err := inBMDB.err; err != nil {
+				return fmt.Errorf("listing BMDB machines failed: %w", err)
+			}
+			klog.Infof("Got %d machines in BMDB.", len(inBMDB.machines))
+		}
+		if inProvider != nil && inBMDB != nil {
+			break
+		}
+	}
+
+	subCtxC()
+	if err := p.reconcile(ctx, sess, inProvider.machines, inBMDB.machines); err != nil {
+		return fmt.Errorf("reconciliation failed: %w", err)
+	}
+	return nil
+}
+
+// listInProvider returns all machines that the provider thinks we should be
+// managing.
+func (p *Provisioner) listInProvider(ctx context.Context) ([]shepherd.Machine, error) {
+	machines, err := p.p.ListMachines(ctx)
+	if err != nil {
+		return nil, fmt.Errorf("while fetching managed machines: %w", err)
+	}
+	sort.Slice(machines, func(i, j int) bool {
+		return machines[i].ID() < machines[j].ID()
+	})
+	return machines, nil
+}
+
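+// providedMachine adapts a BMDB model.MachineProvided row to the
+// shepherd.Machine interface. A machine already recorded in the BMDB is by
+// definition in use, so State always returns shepherd.StateKnownUsed.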
+type providedMachine struct {
+	model.MachineProvided
+}
+
+func (p providedMachine) ID() shepherd.ProviderID {
+	return shepherd.ProviderID(p.ProviderID)
+}
+
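+// Addr returns the machine's provider IP address, or the zero netip.Addr if
+// the address is unset or unparseable.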
+func (p providedMachine) Addr() netip.Addr {
+	if !p.ProviderIpAddress.Valid {
+		return netip.Addr{}
+	}
+
+	addr, err := netip.ParseAddr(p.ProviderIpAddress.String)
+	if err != nil {
+		return netip.Addr{}
+	}
+	return addr
+}
+
+func (p providedMachine) State() shepherd.State {
+	return shepherd.StateKnownUsed
+}
+
+// listInBMDB returns all the machines that the BMDB thinks we should be managing.
+func (p *Provisioner) listInBMDB(ctx context.Context, sess *bmdb.Session) ([]shepherd.Machine, error) {
+	var res []shepherd.Machine
+	err := sess.Transact(ctx, func(q *model.Queries) error {
+		machines, err := q.GetProvidedMachines(ctx, p.p.Type())
+		if err != nil {
+			return err
+		}
+		res = make([]shepherd.Machine, 0, len(machines))
+		for _, machine := range machines {
+			_, err := uuid.Parse(machine.ProviderID)
+			if err != nil {
+				klog.Errorf("BMDB machine %s has unparseable provider ID %q", machine.MachineID, machine.ProviderID)
+				continue
+			}
+
+			res = append(res, providedMachine{machine})
+		}
+		return nil
+	})
+	if err != nil {
+		return nil, err
+	}
+	sort.Slice(res, func(i, j int) bool {
+		return res[i].ID() < res[j].ID()
+	})
+	return res, nil
+}
+
+// resolvePossiblyUsed resolves shepherd.StatePossiblyUsed into either
+// StateKnownUsed or StateKnownUnused, depending on whether the machine is
+// already recorded in the BMDB. Any other state is returned unchanged.
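+//
+// The resolution, summarized:
+//
+//	state != StatePossiblyUsed   ->  state unchanged
+//	provider ID invalid          ->  StateKnownUnused
+//	provider ID not in BMDB      ->  StateKnownUnused
+//	otherwise                    ->  StateKnownUsed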
+func (p *Provisioner) resolvePossiblyUsed(machine shepherd.Machine, providedMachines map[shepherd.ProviderID]bool) shepherd.State {
+	state, id := machine.State(), machine.ID()
+
+	// Bail out if this isn't a possibly used state.
+	if state != shepherd.StatePossiblyUsed {
+		return state
+	}
+
+	// If a machine does not have a valid ID, it's always seen as unused.
+	if !id.IsValid() {
+		return shepherd.StateKnownUnused
+	}
+
+	// If the machine is not recorded in the BMDB, it's seen as unused.
+	if _, ok := providedMachines[id]; !ok {
+		return shepherd.StateKnownUnused
+	}
+
+	return shepherd.StateKnownUsed
+}
+
+// reconcile takes a list of machines that the provider thinks we should be
+// managing and a list of machines that the BMDB thinks we should be managing,
+// and tries to make sense of the two. First, some checks are performed across
+// the two lists to make sure we haven't dropped anything. Then, additional
+// machines are created from unused provider capacity as needed.
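+//
+// The steps, in order (as implemented below):
+//
+//  1. Build used/unused machine sets from the provider listing, resolving
+//     StatePossiblyUsed against the BMDB listing.
+//  2. Cross-check the two listings and bail out on any discrepancy.
+//  3. Create machines from unused capacity, subject to MaxCount, ChunkSize
+//     and the DeviceCreationLimiter.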
+func (p *Provisioner) reconcile(ctx context.Context, sess *bmdb.Session, inProvider, bmdbMachines []shepherd.Machine) error {
+	klog.Infof("Reconciling...")
+
+	bmdb := make(map[shepherd.ProviderID]bool)
+	for _, machine := range bmdbMachines {
+		// Don't check the state here, as it's hardcoded to be StateKnownUsed.
+		bmdb[machine.ID()] = true
+	}
+
+	var availableMachines []shepherd.Machine
+	provider := make(map[shepherd.ProviderID]bool)
+	for _, machine := range inProvider {
+		state := p.resolvePossiblyUsed(machine, bmdb)
+
+		switch state {
+		case shepherd.StateKnownUnused:
+			availableMachines = append(availableMachines, machine)
+
+		case shepherd.StateKnownUsed:
+			provider[machine.ID()] = true
+
+		default:
+			return fmt.Errorf("machine has invalid state (ID: %s, Addr: %s): %s", machine.ID(), machine.Addr(), state)
+		}
+	}
+
+	managed := make(map[shepherd.ProviderID]bool)
+
+	// Some desynchronization between the BMDB's and the provider's points of
+	// view might be so bad that we shouldn't attempt to do any work, at least
+	// not any time soon.
+	badbadnotgood := false
+
+	// Find any machines supposedly managed by us in the provider, but not in the
+	// BMDB.
+	for machine := range provider {
+		if bmdb[machine] {
+			managed[machine] = true
+			continue
+		}
+		klog.Errorf("Provider machine %s has no corresponding machine in BMDB.", machine)
+		badbadnotgood = true
+	}
+
+	// Find any machines in the BMDB but not in the provider.
+	for machine := range bmdb {
+		if !provider[machine] {
+			klog.Errorf("Provider device ID %s referred to in BMDB (from TODO) but missing in provider.", machine)
+			badbadnotgood = true
+		}
+	}
+
+	// Bail if things are weird.
+	if badbadnotgood {
+		klog.Errorf("Something's very wrong. Bailing early and refusing to do any work.")
+		return fmt.Errorf("fatal discrepancy between BMDB and provider")
+	}
+
+	// Summarize all managed machines, i.e. the intersection of the BMDB and
+	// provider machine sets (usually these two sets are equal).
+	nmanaged := len(managed)
+	klog.Infof("Total managed machines: %d", nmanaged)
+
+	if p.MaxCount != 0 && p.MaxCount <= uint(nmanaged) {
+		klog.Infof("Not bringing up more machines (at limit of %d machines)", p.MaxCount)
+		return nil
+	}
+
+	limitName := "no limit"
+	if p.MaxCount != 0 {
+		limitName = fmt.Sprintf("%d", p.MaxCount)
+	}
+	klog.Infof("Below managed machine limit (%s), bringing up more...", limitName)
+
+	if len(availableMachines) == 0 {
+		klog.Infof("No more capacity available.")
+		return nil
+	}
+
+	toProvision := availableMachines
+	// Limit them to MaxCount, if applicable.
+	if p.MaxCount != 0 {
+		needed := int(p.MaxCount) - nmanaged
+		if len(toProvision) < needed {
+			needed = len(toProvision)
+		}
+		toProvision = toProvision[:needed]
+	}
+
+	// Limit them to an arbitrary 'chunk' size so that we don't do too many things in
+	// a single reconciliation operation.
+	if uint(len(toProvision)) > p.ChunkSize {
+		toProvision = toProvision[:p.ChunkSize]
+	}
+
+	if len(toProvision) == 0 {
+		klog.Infof("No more unused machines available, or all filtered out.")
+		return nil
+	}
+
+	klog.Infof("Bringing up %d machines...", len(toProvision))
+	for _, machine := range toProvision {
+		if err := p.DeviceCreationLimiter.Wait(ctx); err != nil {
+			return err
+		}
+
+		nd, err := p.p.CreateMachine(ctx, sess, shepherd.CreateMachineRequest{
+			UnusedMachine: machine,
+		})
+		if err != nil {
+			klog.Errorf("While creating new device (ID: %s, Addr: %s, State: %s): %v", machine.ID(), machine.Addr(), machine.State(), err)
+			continue
+		}
+		klog.Infof("Created new machine with ID: %s", nd.ID())
+	}
+
+	return nil
+}