cloud/bmaas: split ShepherdAccess into Shepherd{AgentStart,Recovery}

This effectively undoes our previous attempted consolidation of all
Shepherd accesses under one tag. We now use two separate tags for the
two main Shepherd work processes, and mutually exclude them in SQL.

We do this so that we can see more clearly in work history (and in
general when processing machines) what the Shepherd is actually trying
to do to a machine.

The downside of this implementation is that we now extend the BMDB/ETP
model to allow mutually excluding different processes. This is easy
enough to express in SQL, but might make future generic modelling more
difficult.

An alternative would be to add an extra field to work/work history that
informs operators about the details of a work item. We might still want
to do that in the future. However, since that field would be freeform,
we could not really rely on it for automated (machine) parsing.

Change-Id: I9578ac000f6112514fe587e9fddf7e85671c6437
Reviewed-on: https://review.monogon.dev/c/monogon/+/1584
Reviewed-by: Leopold Schabel <leo@monogon.tech>
Tested-by: Jenkins CI
diff --git a/cloud/bmaas/bmdb/model/migrations/1681980576_extra_shepherd_processes.down.sql b/cloud/bmaas/bmdb/model/migrations/1681980576_extra_shepherd_processes.down.sql
new file mode 100644
index 0000000..e01997f
--- /dev/null
+++ b/cloud/bmaas/bmdb/model/migrations/1681980576_extra_shepherd_processes.down.sql
@@ -0,0 +1,2 @@
+-- Not removing added enum values, as the 'up' migration has 'if not exists',
+-- and there is no harm in keeping unused enum values around.
\ No newline at end of file
diff --git a/cloud/bmaas/bmdb/model/migrations/1681980576_extra_shepherd_processes.up.sql b/cloud/bmaas/bmdb/model/migrations/1681980576_extra_shepherd_processes.up.sql
new file mode 100644
index 0000000..6e7c630
--- /dev/null
+++ b/cloud/bmaas/bmdb/model/migrations/1681980576_extra_shepherd_processes.up.sql
@@ -0,0 +1,6 @@
+-- Add two more process kinds, ShepherdAgentStart and ShepherdRecovery, for agent
+-- start and recovery by the shepherd respectively. These deprecate the previous
+-- ShepherdAccess process. The two processes mutually exclude each other.
+
+ALTER TYPE process ADD VALUE IF NOT EXISTS 'ShepherdAgentStart';
+ALTER TYPE process ADD VALUE IF NOT EXISTS 'ShepherdRecovery';
\ No newline at end of file
diff --git a/cloud/bmaas/bmdb/model/queries_workflows.sql b/cloud/bmaas/bmdb/model/queries_workflows.sql
index 8f22f41..07ebf77 100644
--- a/cloud/bmaas/bmdb/model/queries_workflows.sql
+++ b/cloud/bmaas/bmdb/model/queries_workflows.sql
@@ -12,8 +12,8 @@
     machine_provided.*
 FROM machines
 INNER JOIN machine_provided ON machines.machine_id = machine_provided.machine_id
-LEFT JOIN work ON machines.machine_id = work.machine_id AND work.process = 'ShepherdAccess'
-LEFT JOIN work_backoff ON machines.machine_id = work_backoff.machine_id AND work_backoff.until > now() AND work_backoff.process = 'ShepherdAccess'
+LEFT JOIN work ON machines.machine_id = work.machine_id AND work.process IN ('ShepherdAccess', 'ShepherdAgentStart', 'ShepherdRecovery')
+LEFT JOIN work_backoff ON machines.machine_id = work_backoff.machine_id AND work_backoff.until > now() AND work_backoff.process = 'ShepherdAgentStart'
 LEFT JOIN machine_agent_started ON machines.machine_id = machine_agent_started.machine_id
 WHERE
   machine_agent_started.machine_id IS NULL
@@ -33,8 +33,8 @@
     machine_provided.*
 FROM machines
 INNER JOIN machine_provided ON machines.machine_id = machine_provided.machine_id
-LEFT JOIN work ON machines.machine_id = work.machine_id AND work.process = 'ShepherdAccess'
-LEFT JOIN work_backoff ON machines.machine_id = work_backoff.machine_id AND work_backoff.until > now() AND work_backoff.process = 'ShepherdAccess'
+LEFT JOIN work ON machines.machine_id = work.machine_id AND work.process IN ('ShepherdAccess', 'ShepherdAgentStart', 'ShepherdRecovery')
+LEFT JOIN work_backoff ON machines.machine_id = work_backoff.machine_id AND work_backoff.until > now() AND work_backoff.process = 'ShepherdRecovery'
 LEFT JOIN machine_agent_started ON machines.machine_id = machine_agent_started.machine_id
 LEFT JOIN machine_agent_heartbeat ON machines.machine_id = machine_agent_heartbeat.machine_id
 WHERE
diff --git a/cloud/bmaas/bmdb/sessions_test.go b/cloud/bmaas/bmdb/sessions_test.go
index e697150..b8625a9 100644
--- a/cloud/bmaas/bmdb/sessions_test.go
+++ b/cloud/bmaas/bmdb/sessions_test.go
@@ -201,7 +201,7 @@
 			if time.Now().After(deadline) {
 				t.Fatalf("Deadline expired")
 			}
-			work, err := session.Work(ctx, model.ProcessShepherdAccess, func(q *model.Queries) ([]uuid.UUID, error) {
+			work, err := session.Work(ctx, model.ProcessShepherdAgentStart, func(q *model.Queries) ([]uuid.UUID, error) {
 				machines, err := q.GetMachinesForAgentStart(ctx, 1)
 				if err != nil {
 					return nil, err
@@ -274,7 +274,7 @@
 			var err error
 			backoffs, err = q.WorkBackoffOf(ctx, model.WorkBackoffOfParams{
 				MachineID: machine.MachineID,
-				Process:   model.ProcessShepherdAccess,
+				Process:   model.ProcessShepherdAgentStart,
 			})
 			return err
 		})
@@ -350,7 +350,7 @@
 	doneC := make(chan struct{})
 	errC := make(chan error)
 	go func() {
-		work, err := session.Work(ctx, model.ProcessShepherdAccess, func(q *model.Queries) ([]uuid.UUID, error) {
+		work, err := session.Work(ctx, model.ProcessShepherdAgentStart, func(q *model.Queries) ([]uuid.UUID, error) {
 			machines, err := q.GetMachinesForAgentStart(ctx, 1)
 			if err != nil {
 				return nil, err
@@ -384,8 +384,11 @@
 		errC <- err
 	}()
 	<-startedC
+
 	// Work on the machine has started. Attempting to get more machines now should
 	// return no machines.
+
+	// Mutual exclusion with AgentStart:
 	err = session.Transact(ctx, func(q *model.Queries) error {
 		machines, err := q.GetMachinesForAgentStart(ctx, 1)
 		if err != nil {
@@ -399,6 +402,22 @@
 	if err != nil {
 		t.Fatalf("Failed to retrieve machines for start in parallel: %v", err)
 	}
+
+	// Mutual exclusion with Recovery:
+	err = session.Transact(ctx, func(q *model.Queries) error {
+		machines, err := q.GetMachineForAgentRecovery(ctx, 1)
+		if err != nil {
+			return err
+		}
+		if len(machines) > 0 {
+			t.Errorf("Expected no machines ready for agent recovery.")
+		}
+		return nil
+	})
+	if err != nil {
+		t.Fatalf("Failed to retrieve machines for recovery in parallel: %v", err)
+	}
+
 	// Finish working on machine.
 	close(doneC)
 	err = <-errC
@@ -446,7 +465,7 @@
 		if want, got := machine, el.MachineID; want.String() != got.String() {
 			t.Errorf("Wanted %d history event machine ID to be %s, got %s", i, want, got)
 		}
-		if want, got := model.ProcessShepherdAccess, el.Process; want != got {
+		if want, got := model.ProcessShepherdAgentStart, el.Process; want != got {
 			t.Errorf("Wanted %d history event process to be %s, got %s", i, want, got)
 		}
 	}
@@ -498,7 +517,7 @@
 	})
 
 	workOnce := func(ctx context.Context, workerID int, session *Session) error {
-		work, err := session.Work(ctx, model.ProcessShepherdAccess, func(q *model.Queries) ([]uuid.UUID, error) {
+		work, err := session.Work(ctx, model.ProcessShepherdAgentStart, func(q *model.Queries) ([]uuid.UUID, error) {
 			machines, err := q.GetMachinesForAgentStart(ctx, 1)
 			if err != nil {
 				return nil, err
diff --git a/cloud/shepherd/equinix/manager/control_loop.go b/cloud/shepherd/equinix/manager/control_loop.go
index aa4138e..4896c15 100644
--- a/cloud/shepherd/equinix/manager/control_loop.go
+++ b/cloud/shepherd/equinix/manager/control_loop.go
@@ -26,12 +26,17 @@
 	// work is a machine lock facilitated by BMDB that prevents machines from
 	// being processed by multiple workers at the same time.
 	work *bmdb.Work
+	// backoff is configured from processInfo.defaultBackoff but can be overridden by
+	// processMachine to set a different backoff policy for specific failure modes.
+	backoff bmdb.Backoff
 }
 
 // controlLoop is implemented by any component which should act as a BMDB-based
 // control loop. Implementing these methods allows the given component to be
 // started using RunControlLoop.
 type controlLoop interface {
+	getProcessInfo() processInfo
+
 	// getMachines must return the list of machines ready to be processed by the
 	// control loop for a given control loop implementation.
 	getMachines(ctx context.Context, q *model.Queries, limit int32) ([]model.MachineProvided, error)
@@ -45,6 +50,11 @@
 	getControlLoopConfig() *ControlLoopConfig
 }
 
+type processInfo struct {
+	process        model.Process
+	defaultBackoff bmdb.Backoff
+}
+
 // ControlLoopConfig should be embedded the every component which acts as a
 // control loop. RegisterFlags should be called by the component whenever it is
 // registering its own flags. Check should be called whenever the component is
@@ -135,10 +145,12 @@
 // run the control loops(s) (depending on opts.Parallelism) blocking the current
 // goroutine until the given context expires and all provisioners quit.
 func (r *controlLoopRunner) run(ctx context.Context, conn *bmdb.Connection) error {
+	pinfo := r.loop.getProcessInfo()
+
 	eg := errgroup.Group{}
 	for j := 0; j < r.config.Parallelism; j += 1 {
 		eg.Go(func() error {
-			return r.runOne(ctx, conn)
+			return r.runOne(ctx, conn, &pinfo)
 		})
 	}
 	return eg.Wait()
@@ -146,7 +158,7 @@
 
 // run the control loop blocking the current goroutine until the given context
 // expires.
-func (r *controlLoopRunner) runOne(ctx context.Context, conn *bmdb.Connection) error {
+func (r *controlLoopRunner) runOne(ctx context.Context, conn *bmdb.Connection, pinfo *processInfo) error {
 	var err error
 
 	// Maintain a BMDB session as long as possible.
@@ -159,7 +171,7 @@
 			}
 		}
 		// Inside that session, run the main logic.
-		err := r.runInSession(ctx, sess)
+		err := r.runInSession(ctx, sess, pinfo)
 
 		switch {
 		case err == nil:
@@ -180,8 +192,8 @@
 // runInSession executes one iteration of the control loop within a BMDB session.
 // This control loop attempts to start or re-start the agent on any machines that
 // need this per the BMDB.
-func (r *controlLoopRunner) runInSession(ctx context.Context, sess *bmdb.Session) error {
-	t, err := r.source(ctx, sess)
+func (r *controlLoopRunner) runInSession(ctx context.Context, sess *bmdb.Session, pinfo *processInfo) error {
+	t, err := r.source(ctx, sess, pinfo)
 	if err != nil {
 		return fmt.Errorf("could not source machine: %w", err)
 	}
@@ -192,12 +204,7 @@
 
 	if err := r.loop.processMachine(ctx, t); err != nil {
 		klog.Errorf("Failed to process machine %s: %v", t.machine.MachineID, err)
-		backoff := bmdb.Backoff{
-			Initial:  time.Minute,
-			Maximum:  2 * time.Hour,
-			Exponent: 1.1,
-		}
-		err = t.work.Fail(ctx, &backoff, fmt.Sprintf("failed to process: %v", err))
+		err = t.work.Fail(ctx, &t.backoff, fmt.Sprintf("failed to process: %v", err))
 		return err
 	}
 	return nil
@@ -207,11 +214,11 @@
 // control loop, locked by a work item. If both task and error are nil, then
 // there are no machines needed to be initialized. The returned work item in task
 // _must_ be canceled or finished by the caller.
-func (r *controlLoopRunner) source(ctx context.Context, sess *bmdb.Session) (*task, error) {
+func (r *controlLoopRunner) source(ctx context.Context, sess *bmdb.Session, pinfo *processInfo) (*task, error) {
 	r.config.DBQueryLimiter.Wait(ctx)
 
 	var machine *model.MachineProvided
-	work, err := sess.Work(ctx, model.ProcessShepherdAccess, func(q *model.Queries) ([]uuid.UUID, error) {
+	work, err := sess.Work(ctx, pinfo.process, func(q *model.Queries) ([]uuid.UUID, error) {
 		machines, err := r.loop.getMachines(ctx, q, 1)
 		if err != nil {
 			return nil, err
@@ -234,5 +241,6 @@
 	return &task{
 		machine: machine,
 		work:    work,
+		backoff: pinfo.defaultBackoff,
 	}, nil
 }
diff --git a/cloud/shepherd/equinix/manager/initializer.go b/cloud/shepherd/equinix/manager/initializer.go
index 95f9d39..c90f8d8 100644
--- a/cloud/shepherd/equinix/manager/initializer.go
+++ b/cloud/shepherd/equinix/manager/initializer.go
@@ -20,6 +20,7 @@
 	"k8s.io/klog/v2"
 
 	apb "source.monogon.dev/cloud/agent/api"
+	"source.monogon.dev/cloud/bmaas/bmdb"
 	"source.monogon.dev/cloud/bmaas/bmdb/model"
 	ecl "source.monogon.dev/cloud/shepherd/equinix/wrapngo"
 )
@@ -147,6 +148,17 @@
 	}, nil
 }
 
+func (c *Initializer) getProcessInfo() processInfo {
+	return processInfo{
+		process: model.ProcessShepherdAgentStart,
+		defaultBackoff: bmdb.Backoff{
+			Initial:  5 * time.Minute,
+			Maximum:  4 * time.Hour,
+			Exponent: 1.2,
+		},
+	}
+}
+
 func (c *Initializer) getMachines(ctx context.Context, q *model.Queries, limit int32) ([]model.MachineProvided, error) {
 	return q.GetMachinesForAgentStart(ctx, limit)
 }
diff --git a/cloud/shepherd/equinix/manager/recoverer.go b/cloud/shepherd/equinix/manager/recoverer.go
index 4ec73af..85ec440 100644
--- a/cloud/shepherd/equinix/manager/recoverer.go
+++ b/cloud/shepherd/equinix/manager/recoverer.go
@@ -8,6 +8,7 @@
 
 	"k8s.io/klog/v2"
 
+	"source.monogon.dev/cloud/bmaas/bmdb"
 	"source.monogon.dev/cloud/bmaas/bmdb/model"
 	ecl "source.monogon.dev/cloud/shepherd/equinix/wrapngo"
 )
@@ -40,6 +41,17 @@
 	}, nil
 }
 
+func (r *Recoverer) getProcessInfo() processInfo {
+	return processInfo{
+		process: model.ProcessShepherdRecovery,
+		defaultBackoff: bmdb.Backoff{
+			Initial:  1 * time.Minute,
+			Maximum:  1 * time.Hour,
+			Exponent: 1.2,
+		},
+	}
+}
+
 func (r *Recoverer) getMachines(ctx context.Context, q *model.Queries, limit int32) ([]model.MachineProvided, error) {
 	return q.GetMachineForAgentRecovery(ctx, limit)
 }