c/bmaas/bmdb: implement backoff and history

This augments the existing Work mechanism with a Fail outcome/method
which allows insertion of a machine & process backoff until a deadline
expires.

We also add a history/audit table which contains information about the
work history of a machine - when some work started, finished, failed or
got cancelled.

Change-Id: If890a412977c1d3c7ff3baa69987fb74932818a0
Reviewed-on: https://review.monogon.dev/c/monogon/+/1086
Tested-by: Jenkins CI
Reviewed-by: Leopold Schabel <leo@monogon.tech>
diff --git a/cloud/bmaas/bmdb/sessions_test.go b/cloud/bmaas/bmdb/sessions_test.go
index 9664c17..097c79e 100644
--- a/cloud/bmaas/bmdb/sessions_test.go
+++ b/cloud/bmaas/bmdb/sessions_test.go
@@ -160,6 +160,104 @@
 	}
 }
 
+func TestWorkBackoff(t *testing.T) {
+	b := dut()
+	conn, err := b.Open(true)
+	if err != nil {
+		t.Fatalf("Open failed: %v", err)
+	}
+
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+
+	session, err := conn.StartSession(ctx)
+	if err != nil {
+		t.Fatalf("Starting session failed: %v", err)
+	}
+
+	var machine model.Machine
+	// Create machine.
+	err = session.Transact(ctx, func(q *model.Queries) error {
+		machine, err = q.NewMachine(ctx)
+		if err != nil {
+			return err
+		}
+		return q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
+			MachineID:  machine.MachineID,
+			Provider:   model.ProviderEquinix,
+			ProviderID: "123",
+		})
+	})
+	if err != nil {
+		t.Fatalf("Creating machine failed: %v", err)
+	}
+
+	// Work on machine, but fail it with a backoff.
+	work, err := session.Work(ctx, model.ProcessShepherdInstall, func(q *model.Queries) ([]uuid.UUID, error) {
+		machines, err := q.GetMachinesForAgentStart(ctx, 1)
+		if err != nil {
+			return nil, err
+		}
+		if len(machines) < 1 {
+			return nil, ErrNothingToDo
+		}
+		return []uuid.UUID{machines[0].MachineID}, nil
+	})
+	if err != nil {
+		t.Fatalf("Starting work failed: %v", err)
+	}
+	backoff := time.Hour
+	if err := work.Fail(ctx, &backoff, "test"); err != nil {
+		t.Fatalf("Failing work failed: %v", err)
+	}
+
+	// The machine shouldn't be returned now.
+	err = session.Transact(ctx, func(q *model.Queries) error {
+		machines, err := q.GetMachinesForAgentStart(ctx, 1)
+		if err != nil {
+			return err
+		}
+		if len(machines) > 0 {
+			t.Errorf("Expected no machines ready for installation.")
+		}
+		return nil
+	})
+	if err != nil {
+		t.Errorf("Failed to retrieve machines for installation: %v", err)
+	}
+
+	// Instead of waiting for the backoff to expire, set it again, but this time
+	// make it immediate. This works because the backoff query acts as an upsert.
+	err = session.Transact(ctx, func(q *model.Queries) error {
+		return q.WorkBackoffInsert(ctx, model.WorkBackoffInsertParams{
+			MachineID: machine.MachineID,
+			Process:   model.ProcessShepherdInstall,
+			Seconds:   0,
+		})
+	})
+	if err != nil {
+		t.Errorf("Failed to update backoff: %v", err)
+	}
+
+	// Just in case.
+	time.Sleep(100 * time.Millisecond)
+
+	// The machine should now be returned again.
+	err = session.Transact(ctx, func(q *model.Queries) error {
+		machines, err := q.GetMachinesForAgentStart(ctx, 1)
+		if err != nil {
+			return err
+		}
+		if len(machines) != 1 {
+			t.Errorf("Expected exactly one machine ready for installation.")
+		}
+		return nil
+	})
+	if err != nil {
+		t.Errorf("Failed to retrieve machines for installation: %v", err)
+	}
+}
+
 // TestInstallationWorkflow exercises the agent installation workflow within the
 // BMDB.
 func TestInstallationWorkflow(t *testing.T) {
@@ -194,6 +292,7 @@
 	}
 
 	// Start working on a machine.
+	var machine uuid.UUID
 	startedC := make(chan struct{})
 	doneC := make(chan struct{})
 	errC := make(chan error)
@@ -206,6 +305,7 @@
 			if len(machines) < 1 {
 				return nil, ErrNothingToDo
 			}
+			machine = machines[0].MachineID
 			return []uuid.UUID{machines[0].MachineID}, nil
 		})
 		defer work.Cancel(ctx)
@@ -218,6 +318,7 @@
 
 		// Simulate work by blocking on a channel.
 		close(startedC)
+
 		<-doneC
 
 		err = work.Finish(ctx, func(q *model.Queries) error {
@@ -243,13 +344,13 @@
 		return nil
 	})
 	if err != nil {
-		t.Errorf("Failed to retrieve machines for installation in parallel: %v", err)
+		t.Fatalf("Failed to retrieve machines for installation in parallel: %v", err)
 	}
 	// Finish working on machine.
 	close(doneC)
 	err = <-errC
 	if err != nil {
-		t.Errorf("Failed to finish work on machine: %v", err)
+		t.Fatalf("Failed to finish work on machine: %v", err)
 	}
 	// That machine is now installed, so we still expect no work to have to be done.
 	err = session.Transact(ctx, func(q *model.Queries) error {
@@ -263,7 +364,37 @@
 		return nil
 	})
 	if err != nil {
-		t.Errorf("Failed to retrieve machines for installation after work finished: %v", err)
+		t.Fatalf("Failed to retrieve machines for installation after work finished: %v", err)
+	}
+
+	// Check history has been recorded.
+	var history []model.WorkHistory
+	err = session.Transact(ctx, func(q *model.Queries) error {
+		history, err = q.ListHistoryOf(ctx, machine)
+		return err
+	})
+	if err != nil {
+		t.Fatalf("Failed to retrieve machine history: %v", err)
+	}
+	// Expect two history items: started and finished.
+	if want, got := 2, len(history); want != got {
+		t.Errorf("Wanted %d history items, got %d", want, got)
+	} else {
+		if want, got := model.WorkHistoryEventStarted, history[0].Event; want != got {
+			t.Errorf("Wanted first history event to be %s, got %s", want, got)
+		}
+		if want, got := model.WorkHistoryEventFinished, history[1].Event; want != got {
+			t.Errorf("Wanted second history event to be %s, got %s", want, got)
+		}
+	}
+	// Check all other history event data.
+	for i, el := range history {
+		if want, got := machine, el.MachineID; want.String() != got.String() {
+			t.Errorf("Wanted %d history event machine ID to be %s, got %s", i, want, got)
+		}
+		if want, got := model.ProcessShepherdInstall, el.Process; want != got {
+			t.Errorf("Wanted %d history event process to be %s, got %s", i, want, got)
+		}
 	}
 }