c/bmaas/bmdb: implement backoff and history

This augments the existing Work mechanism with a Fail outcome/method
which allows inserting a per-machine, per-process backoff that blocks
further work until a deadline expires.
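
For illustration, reporting a failure with a one-hour backoff from a
worker could look roughly as sketched below (modelled on the new
TestWorkBackoff test; retrieve and workErr are placeholders and error
handling is elided):

    work, err := session.Work(ctx, model.ProcessShepherdInstall, retrieve)
    // ... if err is nil, attempt the actual work, yielding workErr ...
    if workErr != nil {
        // Record the failure and back off this machine/process pair for an
        // hour; the cause is persisted in work_backoff and work_history.
        backoff := time.Hour
        work.Fail(ctx, &backoff, workErr.Error())
    }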

We also add a history/audit table which records the work history of a
machine: when work on it started, finished, failed or was canceled.
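
To inspect the recorded audit trail, the generated ListHistoryOf query
can be used inside a session transaction, roughly as in this sketch
(machineID is a placeholder, the Timestamp field name assumes sqlc's
generated model, and imports/surrounding function are elided):

    var history []model.WorkHistory
    err := session.Transact(ctx, func(q *model.Queries) error {
        var err error
        // Returns all recorded events for the machine, oldest first.
        history, err = q.ListHistoryOf(ctx, machineID)
        return err
    })
    if err == nil {
        for _, ev := range history {
            fmt.Printf("%s: %s %s\n", ev.Timestamp, ev.Process, ev.Event)
        }
    }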

Change-Id: If890a412977c1d3c7ff3baa69987fb74932818a0
Reviewed-on: https://review.monogon.dev/c/monogon/+/1086
Tested-by: Jenkins CI
Reviewed-by: Leopold Schabel <leo@monogon.tech>
diff --git a/cloud/bmaas/bmdb/model/migrations/1672749980_backoff.down.sql b/cloud/bmaas/bmdb/model/migrations/1672749980_backoff.down.sql
new file mode 100644
index 0000000..2aa3ced
--- /dev/null
+++ b/cloud/bmaas/bmdb/model/migrations/1672749980_backoff.down.sql
@@ -0,0 +1,3 @@
+DROP TABLE work_backoff;
+DROP TABLE work_history;
+DROP TYPE work_history_event;
\ No newline at end of file
diff --git a/cloud/bmaas/bmdb/model/migrations/1672749980_backoff.up.sql b/cloud/bmaas/bmdb/model/migrations/1672749980_backoff.up.sql
new file mode 100644
index 0000000..24f90e5
--- /dev/null
+++ b/cloud/bmaas/bmdb/model/migrations/1672749980_backoff.up.sql
@@ -0,0 +1,53 @@
+CREATE TYPE work_history_event AS ENUM (
+    'Started',
+    'Finished',
+    'Failed',
+    'Canceled'
+);
+
+-- Audit trail of work history for a given machine.
+CREATE TABLE work_history(
+    -- The machine subject to this audit entry. As we want to allow keeping
+    -- information about deleted machines, this is not a foreign key.
+    machine_id UUID NOT NULL,
+
+    -- TODO(q3k): session history?
+
+    -- Process acting on this machine which caused this audit entry to be created.
+    process process NOT NULL,
+    -- Process lifecycle event (started, finished, etc) that caused this audit
+    -- entry to be created.
+    event work_history_event NOT NULL,
+    -- Time at which this entry was created.
+    timestamp TIMESTAMPTZ NOT NULL,
+
+    -- Failure cause, only set when event == Failed.
+    failed_cause STRING
+);
+
+CREATE INDEX ON work_history (machine_id);
+
+-- Backoff entries are created by failed work items, and effectively act as
+-- a lockout-tagout entry for a given machine and a given process.
+--
+-- Currently, there is no way to back off an entire machine, only individual
+-- processes of a given machine.
+--
+-- Backoff entries are only effective as long as 'until' lies in the future;
+-- after that they are ignored by workflow queries. Insertion queries act as
+-- upserts, so backoff entries do not need to be garbage collected: they do
+-- not grow unbounded (at most one entry per machine/process pair).
+CREATE TABLE work_backoff(
+    -- The machine affected by this backoff.
+    machine_id UUID NOT NULL REFERENCES machines(machine_id) ON DELETE CASCADE,
+    -- The process that this machine should not be subjected to.
+    process process NOT NULL,
+    -- Until when the backoff is enforced.
+    until TIMESTAMPTZ NOT NULL,
+
+    -- Error reported by process/work when this backoff was inserted.
+    -- Human-readable.
+    cause STRING NOT NULL,
+
+    UNIQUE(machine_id, process)
+);
\ No newline at end of file
diff --git a/cloud/bmaas/bmdb/model/queries_base.sql b/cloud/bmaas/bmdb/model/queries_base.sql
index 60d1737..8257fb8 100644
--- a/cloud/bmaas/bmdb/model/queries_base.sql
+++ b/cloud/bmaas/bmdb/model/queries_base.sql
@@ -42,3 +42,29 @@
   AND session_id = $2
   AND process = $3;
 
+
+-- name: WorkHistoryInsert :exec
+-- Insert an entry into the work_history audit table.
+INSERT INTO work_history (
+    machine_id, process, event, timestamp, failed_cause
+) VALUES (
+    $1, $2, $3, now(), $4
+);
+
+-- name: WorkBackoffInsert :exec
+-- Upsert a backoff for a given machine/process.
+INSERT INTO work_backoff (
+    machine_id, process, cause, until
+) VALUES (
+    $1, $2, $3, now() + (sqlc.arg(seconds)::int * interval '1 second')
+) ON CONFLICT (machine_id, process) DO UPDATE SET
+    cause = $3,
+    until = now() + (sqlc.arg(seconds)::int * interval '1 second')
+;
+
+-- name: ListHistoryOf :many
+-- Retrieve full audit history of a machine.
+SELECT *
+FROM work_history
+WHERE machine_id = $1
+ORDER BY timestamp ASC;
\ No newline at end of file
diff --git a/cloud/bmaas/bmdb/model/queries_workflows.sql b/cloud/bmaas/bmdb/model/queries_workflows.sql
index d2e9966..b7b60fe 100644
--- a/cloud/bmaas/bmdb/model/queries_workflows.sql
+++ b/cloud/bmaas/bmdb/model/queries_workflows.sql
@@ -6,12 +6,14 @@
 FROM machines
 INNER JOIN machine_provided ON machines.machine_id = machine_provided.machine_id
 LEFT JOIN work ON machines.machine_id = work.machine_id AND work.process = 'ShepherdInstall'
+LEFT JOIN work_backoff ON machines.machine_id = work_backoff.machine_id AND work_backoff.until > now() AND work_backoff.process = 'ShepherdInstall'
 LEFT JOIN machine_agent_started ON machines.machine_id = machine_agent_started.machine_id
 WHERE
   machine_agent_started.machine_id IS NULL
   -- TODO(q3k): exclude machines which are not expected to run the agent (eg.
   -- are already exposed to a user).
   AND work.machine_id IS NULL
+  AND work_backoff.machine_id IS NULL
 LIMIT $1;
 
 -- name: GetMachineForAgentRecovery :many
@@ -25,6 +27,7 @@
 FROM machines
 INNER JOIN machine_provided ON machines.machine_id = machine_provided.machine_id
 LEFT JOIN work ON machines.machine_id = work.machine_id AND work.process = 'ShepherdInstall'
+LEFT JOIN work_backoff ON machines.machine_id = work_backoff.machine_id AND work_backoff.until > now() AND work_backoff.process = 'ShepherdInstall'
 LEFT JOIN machine_agent_started ON machines.machine_id = machine_agent_started.machine_id
 LEFT JOIN machine_agent_heartbeat ON machines.machine_id = machine_agent_heartbeat.machine_id
 WHERE
@@ -43,6 +46,7 @@
     )
   )
   AND work.machine_id IS NULL
+  AND work_backoff.machine_id IS NULL
 LIMIT $1;
 
 -- name: AuthenticateAgentConnection :many
@@ -75,5 +79,4 @@
         machine_os_installation_report.machine_id IS NULL
         OR machine_os_installation_report.generation != machine_os_installation_request.generation
     )
-LIMIT $2;
-
+LIMIT $2;
\ No newline at end of file
diff --git a/cloud/bmaas/bmdb/sessions.go b/cloud/bmaas/bmdb/sessions.go
index ccb1510..4336121 100644
--- a/cloud/bmaas/bmdb/sessions.go
+++ b/cloud/bmaas/bmdb/sessions.go
@@ -289,6 +289,14 @@
 			}
 			return fmt.Errorf("could not start work on %q: %w", mids[0], err)
 		}
+		err = q.WorkHistoryInsert(ctx, model.WorkHistoryInsertParams{
+			MachineID: mids[0],
+			Event:     model.WorkHistoryEventStarted,
+			Process:   process,
+		})
+		if err != nil {
+			return fmt.Errorf("could not insert history event: %w", err)
+		}
 		return nil
 	})
 	if err != nil {
@@ -329,11 +337,19 @@
 	// will be invalidated soon and so will the work being performed on this
 	// machine.
 	err := w.s.Transact(ctx, func(q *model.Queries) error {
-		return q.FinishWork(ctx, model.FinishWorkParams{
+		err := q.FinishWork(ctx, model.FinishWorkParams{
 			MachineID: w.Machine,
 			SessionID: w.s.UUID,
 			Process:   w.process,
 		})
+		if err != nil {
+			return err
+		}
+		return q.WorkHistoryInsert(ctx, model.WorkHistoryInsertParams{
+			MachineID: w.Machine,
+			Process:   w.process,
+			Event:     model.WorkHistoryEventCanceled,
+		})
 	})
 	if err != nil {
 		klog.Errorf("Failed to cancel work %q on %q (sess %q): %v", w.process, w.Machine, w.s.UUID, err)
@@ -362,6 +378,61 @@
 		if err != nil {
 			return err
 		}
+		err = q.WorkHistoryInsert(ctx, model.WorkHistoryInsertParams{
+			MachineID: w.Machine,
+			Process:   w.process,
+			Event:     model.WorkHistoryEventFinished,
+		})
+		if err != nil {
+			return err
+		}
 		return fn(q)
 	})
 }
+
+// Fail marks this work as failed and optionally introduces a backoff for the
+// given duration (if backoff is non-nil). As long as that backoff is active,
+// no further work for this machine/process will be started. The given cause
+// is an operator-readable string that is persisted both in the backoff entry
+// and in the work history/audit table.
+func (w *Work) Fail(ctx context.Context, backoff *time.Duration, cause string) error {
+	if w.done {
+		return fmt.Errorf("already finished")
+	}
+	w.done = true
+
+	return w.s.Transact(ctx, func(q *model.Queries) error {
+		err := q.FinishWork(ctx, model.FinishWorkParams{
+			MachineID: w.Machine,
+			SessionID: w.s.UUID,
+			Process:   w.process,
+		})
+		if err != nil {
+			return err
+		}
+		err = q.WorkHistoryInsert(ctx, model.WorkHistoryInsertParams{
+			MachineID: w.Machine,
+			Process:   w.process,
+			Event:     model.WorkHistoryEventFailed,
+			FailedCause: sql.NullString{
+				String: cause,
+				Valid:  true,
+			},
+		})
+		if err != nil {
+			return err
+		}
+		if backoff != nil && backoff.Seconds() >= 1.0 {
+			seconds := int64(backoff.Seconds())
+			klog.Infof("Adding backoff for %q on machine %q (%d seconds)", w.process, w.Machine, seconds)
+			return q.WorkBackoffInsert(ctx, model.WorkBackoffInsertParams{
+				MachineID: w.Machine,
+				Process:   w.process,
+				Seconds:   seconds,
+				Cause:     cause,
+			})
+		} else {
+			return nil
+		}
+	})
+}
diff --git a/cloud/bmaas/bmdb/sessions_test.go b/cloud/bmaas/bmdb/sessions_test.go
index 9664c17..097c79e 100644
--- a/cloud/bmaas/bmdb/sessions_test.go
+++ b/cloud/bmaas/bmdb/sessions_test.go
@@ -160,6 +160,104 @@
 	}
 }
 
+func TestWorkBackoff(t *testing.T) {
+	b := dut()
+	conn, err := b.Open(true)
+	if err != nil {
+		t.Fatalf("Open failed: %v", err)
+	}
+
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+
+	session, err := conn.StartSession(ctx)
+	if err != nil {
+		t.Fatalf("Starting session failed: %v", err)
+	}
+
+	var machine model.Machine
+	// Create machine.
+	err = session.Transact(ctx, func(q *model.Queries) error {
+		machine, err = q.NewMachine(ctx)
+		if err != nil {
+			return err
+		}
+		return q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
+			MachineID:  machine.MachineID,
+			Provider:   model.ProviderEquinix,
+			ProviderID: "123",
+		})
+	})
+	if err != nil {
+		t.Fatalf("Creating machine failed: %v", err)
+	}
+
+	// Work on machine, but fail it with a backoff.
+	work, err := session.Work(ctx, model.ProcessShepherdInstall, func(q *model.Queries) ([]uuid.UUID, error) {
+		machines, err := q.GetMachinesForAgentStart(ctx, 1)
+		if err != nil {
+			return nil, err
+		}
+		if len(machines) < 1 {
+			return nil, ErrNothingToDo
+		}
+		return []uuid.UUID{machines[0].MachineID}, nil
+	})
+	if err != nil {
+		t.Fatalf("Starting work failed: %v", err)
+	}
+	backoff := time.Hour
+	if err := work.Fail(ctx, &backoff, "test"); err != nil {
+		t.Fatalf("Failing work failed: %v", err)
+	}
+
+	// The machine shouldn't be returned now.
+	err = session.Transact(ctx, func(q *model.Queries) error {
+		machines, err := q.GetMachinesForAgentStart(ctx, 1)
+		if err != nil {
+			return err
+		}
+		if len(machines) > 0 {
+			t.Errorf("Expected no machines ready for installation.")
+		}
+		return nil
+	})
+	if err != nil {
+		t.Errorf("Failed to retrieve machines for installation: %v", err)
+	}
+
+	// Instead of waiting for the backoff to expire, set it again, but this time
+	// make it expire immediately. This works because the backoff query is an upsert.
+	err = session.Transact(ctx, func(q *model.Queries) error {
+		return q.WorkBackoffInsert(ctx, model.WorkBackoffInsertParams{
+			MachineID: machine.MachineID,
+			Process:   model.ProcessShepherdInstall,
+			Seconds:   0,
+		})
+	})
+	if err != nil {
+		t.Errorf("Failed to update backoff: %v", err)
+	}
+
+	// Sleep briefly to make sure the zero-second backoff has definitely expired.
+	time.Sleep(100 * time.Millisecond)
+
+	// The machine should now be returned again.
+	err = session.Transact(ctx, func(q *model.Queries) error {
+		machines, err := q.GetMachinesForAgentStart(ctx, 1)
+		if err != nil {
+			return err
+		}
+		if len(machines) != 1 {
+			t.Errorf("Expected exactly one machine ready for installation.")
+		}
+		return nil
+	})
+	if err != nil {
+		t.Errorf("Failed to retrieve machines for installation: %v", err)
+	}
+}
+
 // TestInstallationWorkflow exercises the agent installation workflow within the
 // BMDB.
 func TestInstallationWorkflow(t *testing.T) {
@@ -194,6 +292,7 @@
 	}
 
 	// Start working on a machine.
+	var machine uuid.UUID
 	startedC := make(chan struct{})
 	doneC := make(chan struct{})
 	errC := make(chan error)
@@ -206,6 +305,7 @@
 			if len(machines) < 1 {
 				return nil, ErrNothingToDo
 			}
+			machine = machines[0].MachineID
 			return []uuid.UUID{machines[0].MachineID}, nil
 		})
 		defer work.Cancel(ctx)
@@ -218,6 +318,7 @@
 
 		// Simulate work by blocking on a channel.
 		close(startedC)
+
 		<-doneC
 
 		err = work.Finish(ctx, func(q *model.Queries) error {
@@ -243,13 +344,13 @@
 		return nil
 	})
 	if err != nil {
-		t.Errorf("Failed to retrieve machines for installation in parallel: %v", err)
+		t.Fatalf("Failed to retrieve machines for installation in parallel: %v", err)
 	}
 	// Finish working on machine.
 	close(doneC)
 	err = <-errC
 	if err != nil {
-		t.Errorf("Failed to finish work on machine: %v", err)
+		t.Fatalf("Failed to finish work on machine: %v", err)
 	}
 	// That machine is now installed, so we still expect no work to have to be done.
 	err = session.Transact(ctx, func(q *model.Queries) error {
@@ -263,7 +364,37 @@
 		return nil
 	})
 	if err != nil {
-		t.Errorf("Failed to retrieve machines for installation after work finished: %v", err)
+		t.Fatalf("Failed to retrieve machines for installation after work finished: %v", err)
+	}
+
+	// Check history has been recorded.
+	var history []model.WorkHistory
+	err = session.Transact(ctx, func(q *model.Queries) error {
+		history, err = q.ListHistoryOf(ctx, machine)
+		return err
+	})
+	if err != nil {
+		t.Fatalf("Failed to retrieve machine history: %v", err)
+	}
+	// Expect two history items: started and finished.
+	if want, got := 2, len(history); want != got {
+		t.Errorf("Wanted %d history items, got %d", want, got)
+	} else {
+		if want, got := model.WorkHistoryEventStarted, history[0].Event; want != got {
+			t.Errorf("Wanted first history event to be %s, got %s", want, got)
+		}
+		if want, got := model.WorkHistoryEventFinished, history[1].Event; want != got {
+			t.Errorf("Wanted second history event to be %s, got %s", want, got)
+		}
+	}
+	// Check all other history event data.
+	for i, el := range history {
+		if want, got := machine, el.MachineID; want.String() != got.String() {
+			t.Errorf("Wanted history event %d machine ID to be %s, got %s", i, want, got)
+		}
+		if want, got := model.ProcessShepherdInstall, el.Process; want != got {
+			t.Errorf("Wanted history event %d process to be %s, got %s", i, want, got)
+		}
 	}
 }