c/bmaas/bmdb: implement backoff and history
This augments the existing Work mechanism with a Fail outcome/method
which allows insertion of a machine & process backoff until a deadline
expires.
We also add a history/audit table which contains information about the
work history of a machine - when some work started, finished, failed or
got cancelled.
Change-Id: If890a412977c1d3c7ff3baa69987fb74932818a0
Reviewed-on: https://review.monogon.dev/c/monogon/+/1086
Tested-by: Jenkins CI
Reviewed-by: Leopold Schabel <leo@monogon.tech>
diff --git a/cloud/bmaas/bmdb/model/migrations/1672749980_backoff.down.sql b/cloud/bmaas/bmdb/model/migrations/1672749980_backoff.down.sql
new file mode 100644
index 0000000..2aa3ced
--- /dev/null
+++ b/cloud/bmaas/bmdb/model/migrations/1672749980_backoff.down.sql
@@ -0,0 +1,3 @@
+DROP TABLE work_backoff;
+DROP TABLE work_history;
+DROP TYPE work_history_event;
\ No newline at end of file
diff --git a/cloud/bmaas/bmdb/model/migrations/1672749980_backoff.up.sql b/cloud/bmaas/bmdb/model/migrations/1672749980_backoff.up.sql
new file mode 100644
index 0000000..24f90e5
--- /dev/null
+++ b/cloud/bmaas/bmdb/model/migrations/1672749980_backoff.up.sql
@@ -0,0 +1,53 @@
+CREATE TYPE work_history_event AS ENUM (
+ 'Started',
+ 'Finished',
+ 'Failed',
+ 'Canceled'
+);
+
+-- Audit trail of work history for a given machine.
+CREATE TABLE work_history(
+ -- The machine subject to this audit entry. As we want to allow keeping
+ -- information about deleted machines, this is not a foreign key.
+ machine_id UUID NOT NULL,
+
+ -- TODO(q3k): session history?
+
+ -- Process acting on this machine which caused an audit entry to be created.
+ process process NOT NULL,
+ -- Process lifecycle event (started, finished, etc) that caused this audit
+ -- entry to be created.
+ event work_history_event NOT NULL,
+ -- Time at which this entry was created.
+ timestamp TIMESTAMPTZ NOT NULL,
+
+ -- Failure cause, only set when event == Failed.
+ failed_cause STRING
+);
+
+CREATE INDEX ON work_history (machine_id);
+
+-- Backoff entries are created by failed work items, and effectively act as
+-- a Lockout-tagout entry for a given machine and a given process.
+--
+-- Currently, there is no way to fully backoff an entire machine, just
+-- individual processes from a given machine.
+--
+-- Backoff entries are only valid as long as 'until' is before now(), after that
+-- they are ignored by workflow queries. Insertion queries act as upserts,
+-- and thus the backoff entries do not need to be garbage collected, as they do
+-- not grow unbounded (maximumum one entry per process/machine).
+CREATE TABLE work_backoff(
+ -- The machine affected by this backoff.
+ machine_id UUID NOT NULL REFERENCES machines(machine_id) ON DELETE CASCADE,
+ -- The process that this machine should not be subjected to.
+ process process NOT NULL,
+ -- Until when the backoff is enforced.
+ until TIMESTAMPTZ NOT NULL,
+
+ -- Error reported by process/work when this backoff was inserted.
+ -- Human-readable.
+ cause STRING NOT NULL,
+
+ UNIQUE(machine_id, process)
+);
\ No newline at end of file
diff --git a/cloud/bmaas/bmdb/model/queries_base.sql b/cloud/bmaas/bmdb/model/queries_base.sql
index 60d1737..8257fb8 100644
--- a/cloud/bmaas/bmdb/model/queries_base.sql
+++ b/cloud/bmaas/bmdb/model/queries_base.sql
@@ -42,3 +42,29 @@
AND session_id = $2
AND process = $3;
+
+-- name: WorkHistoryInsert :exec
+-- Insert an entry into the work_history audit table.
+INSERT INTO work_history (
+ machine_id, process, event, timestamp, failed_cause
+) VALUES (
+ $1, $2, $3, now(), $4
+);
+
+-- name: WorkBackoffInsert :exec
+-- Upsert a backoff for a given machine/process.
+INSERT INTO work_backoff (
+ machine_id, process, cause, until
+) VALUES (
+ $1, $2, $3, now() + (sqlc.arg(seconds)::int * interval '1 second')
+) ON CONFLICT (machine_id, process) DO UPDATE SET
+ cause = $3,
+ until = now() + (sqlc.arg(seconds)::int * interval '1 second')
+;
+
+-- name: ListHistoryOf :many
+-- Retrieve full audit history of a machine.
+SELECT *
+FROM work_history
+WHERE machine_id = $1
+ORDER BY timestamp ASC;
\ No newline at end of file
diff --git a/cloud/bmaas/bmdb/model/queries_workflows.sql b/cloud/bmaas/bmdb/model/queries_workflows.sql
index d2e9966..b7b60fe 100644
--- a/cloud/bmaas/bmdb/model/queries_workflows.sql
+++ b/cloud/bmaas/bmdb/model/queries_workflows.sql
@@ -6,12 +6,14 @@
FROM machines
INNER JOIN machine_provided ON machines.machine_id = machine_provided.machine_id
LEFT JOIN work ON machines.machine_id = work.machine_id AND work.process = 'ShepherdInstall'
+LEFT JOIN work_backoff ON machines.machine_id = work_backoff.machine_id AND work_backoff.until > now() AND work_backoff.process = 'ShepherdInstall'
LEFT JOIN machine_agent_started ON machines.machine_id = machine_agent_started.machine_id
WHERE
machine_agent_started.machine_id IS NULL
-- TODO(q3k): exclude machines which are not expected to run the agent (eg.
-- are already exposed to a user).
AND work.machine_id IS NULL
+ AND work_backoff.machine_id IS NULL
LIMIT $1;
-- name: GetMachineForAgentRecovery :many
@@ -25,6 +27,7 @@
FROM machines
INNER JOIN machine_provided ON machines.machine_id = machine_provided.machine_id
LEFT JOIN work ON machines.machine_id = work.machine_id AND work.process = 'ShepherdInstall'
+LEFT JOIN work_backoff ON machines.machine_id = work_backoff.machine_id AND work_backoff.until > now() AND work_backoff.process = 'ShepherdInstall'
LEFT JOIN machine_agent_started ON machines.machine_id = machine_agent_started.machine_id
LEFT JOIN machine_agent_heartbeat ON machines.machine_id = machine_agent_heartbeat.machine_id
WHERE
@@ -43,6 +46,7 @@
)
)
AND work.machine_id IS NULL
+ AND work_backoff.machine_id IS NULL
LIMIT $1;
-- name: AuthenticateAgentConnection :many
@@ -75,5 +79,4 @@
machine_os_installation_report.machine_id IS NULL
OR machine_os_installation_report.generation != machine_os_installation_request.generation
)
-LIMIT $2;
-
+LIMIT $2;
\ No newline at end of file
diff --git a/cloud/bmaas/bmdb/sessions.go b/cloud/bmaas/bmdb/sessions.go
index ccb1510..4336121 100644
--- a/cloud/bmaas/bmdb/sessions.go
+++ b/cloud/bmaas/bmdb/sessions.go
@@ -289,6 +289,14 @@
}
return fmt.Errorf("could not start work on %q: %w", mids[0], err)
}
+ err = q.WorkHistoryInsert(ctx, model.WorkHistoryInsertParams{
+ MachineID: mids[0],
+ Event: model.WorkHistoryEventStarted,
+ Process: process,
+ })
+ if err != nil {
+ return fmt.Errorf("could not insert history event: %w", err)
+ }
return nil
})
if err != nil {
@@ -329,11 +337,19 @@
// will be invalidated soon and so will the work being performed on this
// machine.
err := w.s.Transact(ctx, func(q *model.Queries) error {
- return q.FinishWork(ctx, model.FinishWorkParams{
+ err := q.FinishWork(ctx, model.FinishWorkParams{
MachineID: w.Machine,
SessionID: w.s.UUID,
Process: w.process,
})
+ if err != nil {
+ return err
+ }
+ return q.WorkHistoryInsert(ctx, model.WorkHistoryInsertParams{
+ MachineID: w.Machine,
+ Process: w.process,
+ Event: model.WorkHistoryEventCanceled,
+ })
})
if err != nil {
klog.Errorf("Failed to cancel work %q on %q (sess %q): %v", w.process, w.Machine, w.s.UUID, err)
@@ -362,6 +378,61 @@
if err != nil {
return err
}
+ err = q.WorkHistoryInsert(ctx, model.WorkHistoryInsertParams{
+ MachineID: w.Machine,
+ Process: w.process,
+ Event: model.WorkHistoryEventFinished,
+ })
+ if err != nil {
+ return err
+ }
return fn(q)
})
}
+
+// Fail work and introduce backoff for a given duration (if given backoff is
+// non-nil). As long as that backoff is active, no further work for this
+// machine/process will be started. The given cause is an operator-readable
+// string that will be persisted alongside the backoff and the work history/audit
+// table.
+func (w *Work) Fail(ctx context.Context, backoff *time.Duration, cause string) error {
+ if w.done {
+ return fmt.Errorf("already finished")
+ }
+ w.done = true
+
+ return w.s.Transact(ctx, func(q *model.Queries) error {
+ err := q.FinishWork(ctx, model.FinishWorkParams{
+ MachineID: w.Machine,
+ SessionID: w.s.UUID,
+ Process: w.process,
+ })
+ if err != nil {
+ return err
+ }
+ err = q.WorkHistoryInsert(ctx, model.WorkHistoryInsertParams{
+ MachineID: w.Machine,
+ Process: w.process,
+ Event: model.WorkHistoryEventFailed,
+ FailedCause: sql.NullString{
+ String: cause,
+ Valid: true,
+ },
+ })
+ if err != nil {
+ return err
+ }
+ if backoff != nil && backoff.Seconds() >= 1.0 {
+ seconds := int64(backoff.Seconds())
+ klog.Infof("Adding backoff for %q on machine %q (%d seconds)", w.process, w.Machine, seconds)
+ return q.WorkBackoffInsert(ctx, model.WorkBackoffInsertParams{
+ MachineID: w.Machine,
+ Process: w.process,
+ Seconds: seconds,
+ Cause: cause,
+ })
+ } else {
+ return nil
+ }
+ })
+}
diff --git a/cloud/bmaas/bmdb/sessions_test.go b/cloud/bmaas/bmdb/sessions_test.go
index 9664c17..097c79e 100644
--- a/cloud/bmaas/bmdb/sessions_test.go
+++ b/cloud/bmaas/bmdb/sessions_test.go
@@ -160,6 +160,104 @@
}
}
+func TestWorkBackoff(t *testing.T) {
+ b := dut()
+ conn, err := b.Open(true)
+ if err != nil {
+ t.Fatalf("Open failed: %v", err)
+ }
+
+ ctx, ctxC := context.WithCancel(context.Background())
+ defer ctxC()
+
+ session, err := conn.StartSession(ctx)
+ if err != nil {
+ t.Fatalf("Starting session failed: %v", err)
+ }
+
+ var machine model.Machine
+ // Create machine.
+ err = session.Transact(ctx, func(q *model.Queries) error {
+ machine, err = q.NewMachine(ctx)
+ if err != nil {
+ return err
+ }
+ return q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
+ MachineID: machine.MachineID,
+ Provider: model.ProviderEquinix,
+ ProviderID: "123",
+ })
+ })
+ if err != nil {
+ t.Fatalf("Creating machine failed: %v", err)
+ }
+
+ // Work on machine, but fail it with a backoff.
+ work, err := session.Work(ctx, model.ProcessShepherdInstall, func(q *model.Queries) ([]uuid.UUID, error) {
+ machines, err := q.GetMachinesForAgentStart(ctx, 1)
+ if err != nil {
+ return nil, err
+ }
+ if len(machines) < 1 {
+ return nil, ErrNothingToDo
+ }
+ return []uuid.UUID{machines[0].MachineID}, nil
+ })
+ if err != nil {
+ t.Fatalf("Starting work failed: %v", err)
+ }
+ backoff := time.Hour
+ if err := work.Fail(ctx, &backoff, "test"); err != nil {
+ t.Fatalf("Failing work failed: %v", err)
+ }
+
+ // The machine shouldn't be returned now.
+ err = session.Transact(ctx, func(q *model.Queries) error {
+ machines, err := q.GetMachinesForAgentStart(ctx, 1)
+ if err != nil {
+ return err
+ }
+ if len(machines) > 0 {
+ t.Errorf("Expected no machines ready for installation.")
+ }
+ return nil
+ })
+ if err != nil {
+ t.Errorf("Failed to retrieve machines for installation: %v", err)
+ }
+
+ // Instead of waiting for the backoff to expire, set it again, but this time
+ // make it immediate. This works because the backoff query acts as an upsert.
+ err = session.Transact(ctx, func(q *model.Queries) error {
+ return q.WorkBackoffInsert(ctx, model.WorkBackoffInsertParams{
+ MachineID: machine.MachineID,
+ Process: model.ProcessShepherdInstall,
+ Seconds: 0,
+ })
+ })
+ if err != nil {
+ t.Errorf("Failed to update backoff: %v", err)
+ }
+
+ // Just in case.
+ time.Sleep(100 * time.Millisecond)
+
+ // The machine should now be returned again.
+ err = session.Transact(ctx, func(q *model.Queries) error {
+ machines, err := q.GetMachinesForAgentStart(ctx, 1)
+ if err != nil {
+ return err
+ }
+ if len(machines) != 1 {
+ t.Errorf("Expected exactly one machine ready for installation.")
+ }
+ return nil
+ })
+ if err != nil {
+ t.Errorf("Failed to retrieve machines for installation: %v", err)
+ }
+}
+
// TestInstallationWorkflow exercises the agent installation workflow within the
// BMDB.
func TestInstallationWorkflow(t *testing.T) {
@@ -194,6 +292,7 @@
}
// Start working on a machine.
+ var machine uuid.UUID
startedC := make(chan struct{})
doneC := make(chan struct{})
errC := make(chan error)
@@ -206,6 +305,7 @@
if len(machines) < 1 {
return nil, ErrNothingToDo
}
+ machine = machines[0].MachineID
return []uuid.UUID{machines[0].MachineID}, nil
})
defer work.Cancel(ctx)
@@ -218,6 +318,7 @@
// Simulate work by blocking on a channel.
close(startedC)
+
<-doneC
err = work.Finish(ctx, func(q *model.Queries) error {
@@ -243,13 +344,13 @@
return nil
})
if err != nil {
- t.Errorf("Failed to retrieve machines for installation in parallel: %v", err)
+ t.Fatalf("Failed to retrieve machines for installation in parallel: %v", err)
}
// Finish working on machine.
close(doneC)
err = <-errC
if err != nil {
- t.Errorf("Failed to finish work on machine: %v", err)
+ t.Fatalf("Failed to finish work on machine: %v", err)
}
// That machine is now installed, so we still expect no work to have to be done.
err = session.Transact(ctx, func(q *model.Queries) error {
@@ -263,7 +364,37 @@
return nil
})
if err != nil {
- t.Errorf("Failed to retrieve machines for installation after work finished: %v", err)
+ t.Fatalf("Failed to retrieve machines for installation after work finished: %v", err)
+ }
+
+ // Check history has been recorded.
+ var history []model.WorkHistory
+ err = session.Transact(ctx, func(q *model.Queries) error {
+ history, err = q.ListHistoryOf(ctx, machine)
+ return err
+ })
+ if err != nil {
+ t.Fatalf("Failed to retrieve machine history: %v", err)
+ }
+ // Expect two history items: started and finished.
+ if want, got := 2, len(history); want != got {
+ t.Errorf("Wanted %d history items, got %d", want, got)
+ } else {
+ if want, got := model.WorkHistoryEventStarted, history[0].Event; want != got {
+ t.Errorf("Wanted first history event to be %s, got %s", want, got)
+ }
+ if want, got := model.WorkHistoryEventFinished, history[1].Event; want != got {
+ t.Errorf("Wanted second history event to be %s, got %s", want, got)
+ }
+ }
+ // Check all other history event data.
+ for i, el := range history {
+ if want, got := machine, el.MachineID; want.String() != got.String() {
+ t.Errorf("Wanted %d history event machine ID to be %s, got %s", i, want, got)
+ }
+ if want, got := model.ProcessShepherdInstall, el.Process; want != got {
+ t.Errorf("Wanted %d history event process to be %s, got %s", i, want, got)
+ }
}
}