c/bmaas/bmdb: implement backoff and history
This augments the existing Work mechanism with a Fail outcome/method,
which inserts a per-machine, per-process backoff that keeps the machine
from being handed out for that process again until a deadline expires.
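
For illustration, failing a work item with a one-hour backoff looks
roughly like this (a minimal sketch based on the test below; the cause
string is only an example):

    backoff := time.Hour
    if err := work.Fail(ctx, &backoff, "provisioning timed out"); err != nil {
        // handle error
    }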
We also add a history/audit table which records the work history of a
machine: when a given piece of work started, finished, failed or was
cancelled.
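
The recorded history can then be listed per machine, e.g. (sketch using
the generated ListHistoryOf query exercised in the tests below;
machineID is the machine's UUID):

    history, err := q.ListHistoryOf(ctx, machineID)
    if err != nil {
        // handle error
    }
    for _, ev := range history {
        // ev.Process and ev.Event describe what happened, e.g.
        // model.WorkHistoryEventStarted or model.WorkHistoryEventFinished.
    }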
Change-Id: If890a412977c1d3c7ff3baa69987fb74932818a0
Reviewed-on: https://review.monogon.dev/c/monogon/+/1086
Tested-by: Jenkins CI
Reviewed-by: Leopold Schabel <leo@monogon.tech>
diff --git a/cloud/bmaas/bmdb/sessions_test.go b/cloud/bmaas/bmdb/sessions_test.go
index 9664c17..097c79e 100644
--- a/cloud/bmaas/bmdb/sessions_test.go
+++ b/cloud/bmaas/bmdb/sessions_test.go
@@ -160,6 +160,104 @@
}
}
+func TestWorkBackoff(t *testing.T) {
+ b := dut()
+ conn, err := b.Open(true)
+ if err != nil {
+ t.Fatalf("Open failed: %v", err)
+ }
+
+ ctx, ctxC := context.WithCancel(context.Background())
+ defer ctxC()
+
+ session, err := conn.StartSession(ctx)
+ if err != nil {
+ t.Fatalf("Starting session failed: %v", err)
+ }
+
+ var machine model.Machine
+ // Create machine.
+ err = session.Transact(ctx, func(q *model.Queries) error {
+ machine, err = q.NewMachine(ctx)
+ if err != nil {
+ return err
+ }
+ return q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
+ MachineID: machine.MachineID,
+ Provider: model.ProviderEquinix,
+ ProviderID: "123",
+ })
+ })
+ if err != nil {
+ t.Fatalf("Creating machine failed: %v", err)
+ }
+
+ // Work on machine, but fail it with a backoff.
+ work, err := session.Work(ctx, model.ProcessShepherdInstall, func(q *model.Queries) ([]uuid.UUID, error) {
+ machines, err := q.GetMachinesForAgentStart(ctx, 1)
+ if err != nil {
+ return nil, err
+ }
+ if len(machines) < 1 {
+ return nil, ErrNothingToDo
+ }
+ return []uuid.UUID{machines[0].MachineID}, nil
+ })
+ if err != nil {
+ t.Fatalf("Starting work failed: %v", err)
+ }
+ backoff := time.Hour
+ if err := work.Fail(ctx, &backoff, "test"); err != nil {
+ t.Fatalf("Failing work failed: %v", err)
+ }
+
+ // The machine shouldn't be returned now.
+ err = session.Transact(ctx, func(q *model.Queries) error {
+ machines, err := q.GetMachinesForAgentStart(ctx, 1)
+ if err != nil {
+ return err
+ }
+ if len(machines) > 0 {
+ t.Errorf("Expected no machines ready for installation.")
+ }
+ return nil
+ })
+ if err != nil {
+ t.Errorf("Failed to retrieve machines for installation: %v", err)
+ }
+
+ // Instead of waiting for the backoff to expire, set it again, but this time
+ // make it immediate. This works because the backoff query acts as an upsert.
+ err = session.Transact(ctx, func(q *model.Queries) error {
+ return q.WorkBackoffInsert(ctx, model.WorkBackoffInsertParams{
+ MachineID: machine.MachineID,
+ Process: model.ProcessShepherdInstall,
+ Seconds: 0,
+ })
+ })
+ if err != nil {
+ t.Errorf("Failed to update backoff: %v", err)
+ }
+
+ // Just in case, give the zero-second backoff a moment to take effect.
+ time.Sleep(100 * time.Millisecond)
+
+ // The machine should now be returned again.
+ err = session.Transact(ctx, func(q *model.Queries) error {
+ machines, err := q.GetMachinesForAgentStart(ctx, 1)
+ if err != nil {
+ return err
+ }
+ if len(machines) != 1 {
+ t.Errorf("Expected exactly one machine ready for installation.")
+ }
+ return nil
+ })
+ if err != nil {
+ t.Errorf("Failed to retrieve machines for installation: %v", err)
+ }
+}
+
// TestInstallationWorkflow exercises the agent installation workflow within the
// BMDB.
func TestInstallationWorkflow(t *testing.T) {
@@ -194,6 +292,7 @@
}
// Start working on a machine.
+ var machine uuid.UUID
startedC := make(chan struct{})
doneC := make(chan struct{})
errC := make(chan error)
@@ -206,6 +305,7 @@
if len(machines) < 1 {
return nil, ErrNothingToDo
}
+ machine = machines[0].MachineID
return []uuid.UUID{machines[0].MachineID}, nil
})
defer work.Cancel(ctx)
@@ -218,6 +318,7 @@
// Simulate work by blocking on a channel.
close(startedC)
+
<-doneC
err = work.Finish(ctx, func(q *model.Queries) error {
@@ -243,13 +344,13 @@
return nil
})
if err != nil {
- t.Errorf("Failed to retrieve machines for installation in parallel: %v", err)
+ t.Fatalf("Failed to retrieve machines for installation in parallel: %v", err)
}
// Finish working on machine.
close(doneC)
err = <-errC
if err != nil {
- t.Errorf("Failed to finish work on machine: %v", err)
+ t.Fatalf("Failed to finish work on machine: %v", err)
}
// That machine is now installed, so we still expect no work to have to be done.
err = session.Transact(ctx, func(q *model.Queries) error {
@@ -263,7 +364,37 @@
return nil
})
if err != nil {
- t.Errorf("Failed to retrieve machines for installation after work finished: %v", err)
+ t.Fatalf("Failed to retrieve machines for installation after work finished: %v", err)
+ }
+
+ // Check history has been recorded.
+ var history []model.WorkHistory
+ err = session.Transact(ctx, func(q *model.Queries) error {
+ history, err = q.ListHistoryOf(ctx, machine)
+ return err
+ })
+ if err != nil {
+ t.Fatalf("Failed to retrieve machine history: %v", err)
+ }
+ // Expect two history items: started and finished.
+ if want, got := 2, len(history); want != got {
+ t.Errorf("Wanted %d history items, got %d", want, got)
+ } else {
+ if want, got := model.WorkHistoryEventStarted, history[0].Event; want != got {
+ t.Errorf("Wanted first history event to be %s, got %s", want, got)
+ }
+ if want, got := model.WorkHistoryEventFinished, history[1].Event; want != got {
+ t.Errorf("Wanted second history event to be %s, got %s", want, got)
+ }
+ }
+ // Check all other history event data.
+ for i, el := range history {
+ if want, got := machine, el.MachineID; want.String() != got.String() {
+ t.Errorf("Wanted %d history event machine ID to be %s, got %s", i, want, got)
+ }
+ if want, got := model.ProcessShepherdInstall, el.Process; want != got {
+ t.Errorf("Wanted %d history event process to be %s, got %s", i, want, got)
+ }
}
}