cloud/bmaas: implement exponential backoffs

This lets work fail with a proper exponential backoff. This is important
not only to avoid hammering external systems, but also to avoid
extremely long and verbose historical logs for repeatedly failing
processes.

To implement these, we have to slightly alter our model: instead of
always persisting backoffs for a given machine, we persist them only as
long as the last work for a given process has failed, deleting the
pertinent backoffs (if any) on success. The existence of a backoff item
is then used to calculate the value of the next backoff.
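
As a sketch (not the exact implementation in this change), the next
interval can be derived from the last persisted one roughly as follows,
using the Backoff configuration shape exercised in the updated test:

    import "time"

    // Backoff mirrors the configuration used in the test below; the
    // computation here is illustrative only.
    type Backoff struct {
        Initial  time.Duration // interval after the first failure
        Maximum  time.Duration // upper bound on the interval
        Exponent int           // growth factor per consecutive failure
    }

    // next returns the interval to persist after a failure, given the
    // last persisted interval (zero if no backoff item exists yet).
    func next(b Backoff, last time.Duration) time.Duration {
        if last == 0 {
            return b.Initial
        }
        d := last * time.Duration(b.Exponent)
        if d > b.Maximum {
            d = b.Maximum
        }
        return d
    }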

The change also introduces an explicit period, in seconds, on the
backoff item. It is currently implemented as a nullable field, but a
future migration/update might make it non-nullable (and delete any
straggling backoffs that still don't have the period set).
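
A hypothetical reader of that field (assuming, as the test suggests,
that the generated model exposes it as a sql.NullInt64 called
LastIntervalSeconds) could handle the missing period defensively:

    // lastInterval is a hypothetical helper, not part of this change.
    func lastInterval(row model.WorkBackoff, fallback time.Duration) time.Duration {
        if !row.LastIntervalSeconds.Valid {
            // Straggling backoff from before the period was introduced.
            return fallback
        }
        return time.Duration(row.LastIntervalSeconds.Int64) * time.Second
    }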

Change-Id: I958fcd957dae1156349224f07fb8d4836955d375
Reviewed-on: https://review.monogon.dev/c/monogon/+/1565
Tested-by: Jenkins CI
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/cloud/bmaas/bmdb/sessions_test.go b/cloud/bmaas/bmdb/sessions_test.go
index f3977d5..e697150 100644
--- a/cloud/bmaas/bmdb/sessions_test.go
+++ b/cloud/bmaas/bmdb/sessions_test.go
@@ -160,6 +160,7 @@
 	}
 }
 
+// TestWorkBackoff exercises the backoff functionality within the BMDB.
 func TestWorkBackoff(t *testing.T) {
 	b := dut()
 	conn, err := b.Open(true)
@@ -192,70 +193,123 @@
 		t.Fatalf("Creating machine failed: %v", err)
 	}
 
-	// Work on machine, but fail it with a backoff.
-	work, err := session.Work(ctx, model.ProcessShepherdAccess, func(q *model.Queries) ([]uuid.UUID, error) {
-		machines, err := q.GetMachinesForAgentStart(ctx, 1)
-		if err != nil {
-			return nil, err
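+	// waitMachine polls until work can be started on the test machine,
+	// failing the test if that takes longer than nsec * 2 seconds.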
+	waitMachine := func(nsec int64) *Work {
+		t.Helper()
+
+		deadline := time.Now().Add(time.Duration(nsec) * 2 * time.Second)
+		for {
+			if time.Now().After(deadline) {
+				t.Fatalf("Deadline expired")
+			}
+			work, err := session.Work(ctx, model.ProcessShepherdAccess, func(q *model.Queries) ([]uuid.UUID, error) {
+				machines, err := q.GetMachinesForAgentStart(ctx, 1)
+				if err != nil {
+					return nil, err
+				}
+				if len(machines) < 1 {
+					return nil, ErrNothingToDo
+				}
+				return []uuid.UUID{machines[0].MachineID}, nil
+			})
+			if err == nil {
+				return work
+			}
+			if !errors.Is(err, ErrNothingToDo) {
+				t.Fatalf("Unexpected work error: %v", err)
+			}
+			time.Sleep(100 * time.Millisecond)
 		}
-		if len(machines) < 1 {
-			return nil, ErrNothingToDo
-		}
-		return []uuid.UUID{machines[0].MachineID}, nil
-	})
-	if err != nil {
-		t.Fatalf("Starting work failed: %v", err)
 	}
-	backoff := time.Hour
+
+	// Work on machine, but fail it with a backoff.
+	work := waitMachine(1)
+	backoff := Backoff{
+		Initial:  time.Second,
+		Maximum:  5 * time.Second,
+		Exponent: 2,
+	}
 	if err := work.Fail(ctx, &backoff, "test"); err != nil {
 		t.Fatalf("Failing work failed: %v", err)
 	}
 
-	// The machine shouldn't be returned now.
-	err = session.Transact(ctx, func(q *model.Queries) error {
-		machines, err := q.GetMachinesForAgentStart(ctx, 1)
-		if err != nil {
-			return err
-		}
-		if len(machines) > 0 {
-			t.Errorf("Expected no machines ready for agent start.")
-		}
-		return nil
-	})
-	if err != nil {
-		t.Errorf("Failed to retrieve machines for agent start: %v", err)
-	}
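+	// expect asserts that exactly count machines are currently available for
+	// agent start.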
+	expect := func(count int) {
+		t.Helper()
 
-	// Instead of waiting for the backoff to expire, set it again, but this time
-	// make it immediate. This works because the backoff query acts as an upsert.
-	err = session.Transact(ctx, func(q *model.Queries) error {
-		return q.WorkBackoffInsert(ctx, model.WorkBackoffInsertParams{
-			MachineID: machine.MachineID,
-			Process:   model.ProcessShepherdAccess,
-			Seconds:   0,
+		var machines []model.MachineProvided
+		var err error
+		err = session.Transact(ctx, func(q *model.Queries) error {
+			machines, err = q.GetMachinesForAgentStart(ctx, 1)
+			if err != nil {
+				return err
+			}
+			return nil
 		})
-	})
-	if err != nil {
-		t.Errorf("Failed to update backoff: %v", err)
+		if err != nil {
+			t.Errorf("Failed to retrieve machines for agent start: %v", err)
+		}
+		if want, got := count, len(machines); want != got {
+			t.Errorf("Expected %d machines, got %d", want, got)
+		}
 	}
 
-	// Just in case.
-	time.Sleep(100 * time.Millisecond)
+	// The machine shouldn't be returned now.
+	expect(0)
+
+	// Wait for the backoff to expire.
+	time.Sleep(1100 * time.Millisecond)
 
 	// The machine should now be returned again.
-	err = session.Transact(ctx, func(q *model.Queries) error {
-		machines, err := q.GetMachinesForAgentStart(ctx, 1)
-		if err != nil {
+	expect(1)
+
+	// failAndCheck waits until the machine is available again, fails work on
+	// it, and verifies that the newly persisted backoff interval is nsec
+	// seconds.
+	failAndCheck := func(nsec int64) {
+		t.Helper()
+		work := waitMachine(nsec)
+		if err := work.Fail(ctx, &backoff, "test"); err != nil {
+			t.Fatalf("Failing work failed: %v", err)
+		}
+
+		var backoffs []model.WorkBackoff
+		err = session.Transact(ctx, func(q *model.Queries) error {
+			var err error
+			backoffs, err = q.WorkBackoffOf(ctx, model.WorkBackoffOfParams{
+				MachineID: machine.MachineID,
+				Process:   model.ProcessShepherdAccess,
+			})
 			return err
+		})
+		if err != nil {
+			t.Errorf("Failed to retrieve backoffs: %v", err)
 		}
-		if len(machines) != 1 {
-			t.Errorf("Expected exactly one machine ready for agent start.")
+		if len(backoffs) < 1 {
+			t.Errorf("No backoff recorded for the machine")
+		} else {
+			backoff := backoffs[0]
+			if want, got := nsec, backoff.LastIntervalSeconds.Int64; want != got {
+				t.Fatalf("Wanted backoff of %d seconds, got %d", want, got)
+			}
 		}
+	}
+
+	// Exercise exponential backoff functionality.
+	failAndCheck(2)
+	failAndCheck(4)
+	failAndCheck(5)
+	failAndCheck(5)
+
+	// If the job now succeeds, subsequent failures should start from 1 again.
+	work = waitMachine(5)
+	err = work.Finish(ctx, func(q *model.Queries) error {
+		// Deliberately set no tags, so that subsequent queries still return
+		// the machine.
 		return nil
 	})
 	if err != nil {
-		t.Errorf("Failed to retrieve machines for agent start: %v", err)
+		t.Fatalf("Could not finish work: %v", err)
 	}
+
+	failAndCheck(1)
+	failAndCheck(2)
 }
 
 // TestAgentStartWorkflow exercises the agent start workflow within the BMDB.