package bmdb
import (
"context"
"errors"
"fmt"
"testing"
"time"
"github.com/google/uuid"
"source.monogon.dev/cloud/bmaas/bmdb/model"
"source.monogon.dev/cloud/lib/component"
)
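// dut returns the BMDB instance used as the device under test by the tests in
// this file, backed by an in-memory CockroachDB.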
func dut() *BMDB {
return &BMDB{
Config: Config{
Database: component.CockroachConfig{
InMemory: true,
},
},
}
}
// TestSessionExpiry exercises the session heartbeat logic, making sure that if
// a session stops being maintained, subsequent Transact calls will fail.
func TestSessionExpiry(t *testing.T) {
b := dut()
conn, err := b.Open(true)
if err != nil {
t.Fatalf("Open failed: %v", err)
}
ctx, ctxC := context.WithCancel(context.Background())
defer ctxC()
session, err := conn.StartSession(ctx)
if err != nil {
t.Fatalf("Starting session failed: %v", err)
}
// A transaction in a brand-new session should work.
var machine model.Machine
err = session.Transact(ctx, func(q *model.Queries) error {
machine, err = q.NewMachine(ctx)
return err
})
if err != nil {
t.Fatalf("First transaction failed: %v", err)
}
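// Sleep for longer than the 5-second session interval; the session's heartbeat
// should keep it alive in the meantime.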
time.Sleep(6 * time.Second)
// A transaction after the 5-second session interval should continue to work.
err = session.Transact(ctx, func(q *model.Queries) error {
_, err = q.NewMachine(ctx)
return err
})
if err != nil {
t.Fatalf("Second transaction failed: %v", err)
}
// A transaction after the 5-second session interval should fail if we don't
// maintain its heartbeat.
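// Cancel the session's internal context, which stops its heartbeat.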
session.ctxC()
time.Sleep(6 * time.Second)
err = session.Transact(ctx, func(q *model.Queries) error {
return q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
MachineID: machine.MachineID,
Provider: "foo",
ProviderID: "bar",
})
})
if !errors.Is(err, ErrSessionExpired) {
t.Fatalf("Second transaction should've failed due to expired session, got %v", err)
}
}
// TestWork exercises the per-{process,machine} mutual exclusion mechanism of
// Work items.
func TestWork(t *testing.T) {
b := dut()
conn, err := b.Open(true)
if err != nil {
t.Fatalf("Open failed: %v", err)
}
ctx, ctxC := context.WithCancel(context.Background())
defer ctxC()
// Start two sessions for testing.
session1, err := conn.StartSession(ctx)
if err != nil {
t.Fatalf("Starting session failed: %v", err)
}
session2, err := conn.StartSession(ctx)
if err != nil {
t.Fatalf("Starting session failed: %v", err)
}
var machine model.Machine
err = session1.Transact(ctx, func(q *model.Queries) error {
machine, err = q.NewMachine(ctx)
return err
})
if err != nil {
t.Fatalf("Creating machine failed: %v", err)
}
// Create a subcontext for a long-term work request. We'll cancel it later as
// part of the test.
ctxB, ctxBC := context.WithCancel(ctx)
defer ctxBC()
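// constantRetriever always returns the single test machine, so every Work call
// below contends for the same machine.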
constantRetriever := func(_ *model.Queries) ([]uuid.UUID, error) {
return []uuid.UUID{machine.MachineID}, nil
}
// Start work on the machine which we won't finish for a while.
work1, err := session1.Work(ctxB, model.ProcessUnitTest1, constantRetriever)
if err != nil {
t.Fatalf("Starting first work failed: %v", err)
}
// Starting more work on the same machine but with a different process should
// still be allowed.
for _, session := range []*Session{session1, session2} {
work2, err := session.Work(ctxB, model.ProcessUnitTest2, constantRetriever)
if err != nil {
t.Errorf("Could not run concurrent process on machine: %v", err)
} else {
work2.Cancel(ctxB)
}
}
// However, starting work with the same process on the same machine should
// fail.
for _, session := range []*Session{session1, session2} {
work2, err := session.Work(ctxB, model.ProcessUnitTest1, constantRetriever)
if !errors.Is(err, ErrWorkConflict) {
t.Errorf("Concurrent work with same process should've been forbidden, got %v", err)
if work2 != nil {
work2.Cancel(ctxB)
}
}
}
// Now release the long-running work by canceling it.
work1.Cancel(ctx)
// We should now be able to perform 'test1' work again against this machine.
for _, session := range []*Session{session1, session2} {
work1, err := session.Work(ctxB, model.ProcessUnitTest1, constantRetriever)
if err != nil {
t.Errorf("Could not run work against machine: %v", err)
} else {
work1.Cancel(ctxB)
}
}
}
// TestWorkBackoff exercises the backoff functionality within the BMDB.
func TestWorkBackoff(t *testing.T) {
b := dut()
conn, err := b.Open(true)
if err != nil {
t.Fatalf("Open failed: %v", err)
}
ctx, ctxC := context.WithCancel(context.Background())
defer ctxC()
session, err := conn.StartSession(ctx)
if err != nil {
t.Fatalf("Starting session failed: %v", err)
}
var machine model.Machine
// Create a machine and mark it as provided.
err = session.Transact(ctx, func(q *model.Queries) error {
machine, err = q.NewMachine(ctx)
if err != nil {
return err
}
return q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
MachineID: machine.MachineID,
Provider: model.ProviderEquinix,
ProviderID: "123",
})
})
if err != nil {
t.Fatalf("Creating machine failed: %v", err)
}
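// waitMachine polls until the machine becomes eligible for agent start again,
// giving up after twice nsec seconds.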
waitMachine := func(nsec int64) *Work {
t.Helper()
deadline := time.Now().Add(time.Duration(nsec) * 2 * time.Second)
for {
if time.Now().After(deadline) {
t.Fatalf("Deadline expired")
}
work, err := session.Work(ctx, model.ProcessShepherdAgentStart, func(q *model.Queries) ([]uuid.UUID, error) {
machines, err := q.GetMachinesForAgentStart(ctx, 1)
if err != nil {
return nil, err
}
if len(machines) < 1 {
return nil, ErrNothingToDo
}
return []uuid.UUID{machines[0].MachineID}, nil
})
if err == nil {
return work
}
if !errors.Is(err, ErrNothingToDo) {
t.Fatalf("Unexpected work error: %v", err)
}
time.Sleep(100 * time.Millisecond)
}
}
// Work on the machine, but fail it with a backoff.
work := waitMachine(1)
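// The backoff starts at 1 second, is capped at 5 seconds, and (with Exponent 2)
// doubles on each consecutive failure, so the expected interval sequence is
// 1, 2, 4, 5, 5, ... seconds.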
backoff := Backoff{
Initial: time.Second,
Maximum: 5 * time.Second,
Exponent: 2,
}
if err := work.Fail(ctx, &backoff, "test"); err != nil {
t.Fatalf("Failing work failed: %v", err)
}
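// expect asserts how many machines GetMachinesForAgentStart currently returns
// (the query is limited to a single result).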
expect := func(count int) {
t.Helper()
var machines []model.MachineProvided
var err error
err = session.Transact(ctx, func(q *model.Queries) error {
machines, err = q.GetMachinesForAgentStart(ctx, 1)
if err != nil {
return err
}
return nil
})
if err != nil {
t.Errorf("Failed to retrieve machines for agent start: %v", err)
}
if want, got := count, len(machines); want != got {
t.Errorf("Expected %d machines, got %d", want, got)
}
}
// The machine shouldn't be returned now.
expect(0)
// Wait for the backoff to expire.
time.Sleep(1100 * time.Millisecond)
// The machine should now be returned again.
expect(1)
// Prepare helper for checking exponential backoffs.
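// failAndCheck waits until the machine is schedulable again, fails the work
// once more, and verifies that the most recently recorded backoff interval is
// nsec seconds.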
failAndCheck := func(nsec int64) {
t.Helper()
work := waitMachine(nsec)
if err := work.Fail(ctx, &backoff, "test"); err != nil {
t.Fatalf("Failing work failed: %v", err)
}
var backoffs []model.WorkBackoff
err = session.Transact(ctx, func(q *model.Queries) error {
var err error
backoffs, err = q.WorkBackoffOf(ctx, model.WorkBackoffOfParams{
MachineID: machine.MachineID,
Process: model.ProcessShepherdAgentStart,
})
return err
})
if err != nil {
t.Errorf("Failed to retrieve machines for agent start: %v", err)
}
if len(backoffs) < 1 {
t.Errorf("No backoff")
} else {
backoff := backoffs[0]
if want, got := nsec, backoff.LastIntervalSeconds.Int64; want != got {
t.Fatalf("Wanted backoff of %d seconds, got %d", want, got)
}
}
}
// Exercise exponential backoff functionality.
failAndCheck(2)
failAndCheck(4)
failAndCheck(5)
failAndCheck(5)
// If the job now succeeds, subsequent failures should start from 1 again.
work = waitMachine(5)
err = work.Finish(ctx, func(q *model.Queries) error {
// Don't set any tags here, so subsequent queries keep returning the machine.
return nil
})
if err != nil {
t.Fatalf("Could not finish work: %v", err)
}
failAndCheck(1)
failAndCheck(2)
}
// TestAgentStartWorkflow exercises the agent start workflow within the BMDB.
func TestAgentStartWorkflow(t *testing.T) {
b := dut()
conn, err := b.Open(true)
if err != nil {
t.Fatalf("Open failed: %v", err)
}
ctx, ctxC := context.WithCancel(context.Background())
defer ctxC()
session, err := conn.StartSession(ctx)
if err != nil {
t.Fatalf("Starting session failed: %v", err)
}
// Create a machine, discarding its ID; it is recovered later via GetMachinesForAgentStart.
err = session.Transact(ctx, func(q *model.Queries) error {
machine, err := q.NewMachine(ctx)
if err != nil {
return err
}
return q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
MachineID: machine.MachineID,
Provider: model.ProviderEquinix,
ProviderID: "123",
})
})
if err != nil {
t.Fatalf("Creating machine failed: %v", err)
}
// Start working on a machine.
var machine uuid.UUID
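// startedC signals that the goroutine below has picked up work, doneC lets it
// finish, and errC carries its final error back to the test.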
startedC := make(chan struct{})
doneC := make(chan struct{})
errC := make(chan error)
go func() {
work, err := session.Work(ctx, model.ProcessShepherdAgentStart, func(q *model.Queries) ([]uuid.UUID, error) {
machines, err := q.GetMachinesForAgentStart(ctx, 1)
if err != nil {
return nil, err
}
if len(machines) < 1 {
return nil, ErrNothingToDo
}
machine = machines[0].MachineID
return []uuid.UUID{machines[0].MachineID}, nil
})
if err != nil {
close(startedC)
errC <- err
return
}
// Only register the cancel once we know Work succeeded; work would be nil on error.
defer work.Cancel(ctx)
// Simulate work by blocking on a channel.
close(startedC)
<-doneC
err = work.Finish(ctx, func(q *model.Queries) error {
return q.MachineSetAgentStarted(ctx, model.MachineSetAgentStartedParams{
MachineID: work.Machine,
AgentStartedAt: time.Now(),
AgentPublicKey: []byte("fakefakefake"),
})
})
errC <- err
}()
<-startedC
// Work on the machine has started. Attempting to get more machines now should
// return no machines.
// Mutual exclusion with AgentStart:
err = session.Transact(ctx, func(q *model.Queries) error {
machines, err := q.GetMachinesForAgentStart(ctx, 1)
if err != nil {
return err
}
if len(machines) > 0 {
t.Errorf("Expected no machines ready for agent start.")
}
return nil
})
if err != nil {
t.Fatalf("Failed to retrieve machines for start in parallel: %v", err)
}
// Mutual exclusion with Recovery:
err = session.Transact(ctx, func(q *model.Queries) error {
machines, err := q.GetMachineForAgentRecovery(ctx, 1)
if err != nil {
return err
}
if len(machines) > 0 {
t.Errorf("Expected no machines ready for agent recovery.")
}
return nil
})
if err != nil {
t.Fatalf("Failed to retrieve machines for recovery in parallel: %v", err)
}
// Finish working on machine.
close(doneC)
err = <-errC
if err != nil {
t.Fatalf("Failed to finish work on machine: %v", err)
}
// That machine now has its agent started, so we still expect no agent start
// work to be available.
err = session.Transact(ctx, func(q *model.Queries) error {
machines, err := q.GetMachinesForAgentStart(ctx, 1)
if err != nil {
return err
}
if len(machines) > 0 {
t.Errorf("Expected still no machines ready for agent start.")
}
return nil
})
if err != nil {
t.Fatalf("Failed to retrieve machines for agent start after work finished: %v", err)
}
// Check history has been recorded.
var history []model.WorkHistory
err = session.Transact(ctx, func(q *model.Queries) error {
history, err = q.ListHistoryOf(ctx, machine)
return err
})
if err != nil {
t.Fatalf("Failed to retrieve machine history: %v", err)
}
// Expect two history items: started and finished.
if want, got := 2, len(history); want != got {
t.Errorf("Wanted %d history items, got %d", want, got)
} else {
if want, got := model.WorkHistoryEventStarted, history[0].Event; want != got {
t.Errorf("Wanted first history event to be %s, got %s", want, got)
}
if want, got := model.WorkHistoryEventFinished, history[1].Event; want != got {
t.Errorf("Wanted second history event to be %s, got %s", want, got)
}
}
// Check all other history event data.
for i, el := range history {
if want, got := machine, el.MachineID; want.String() != got.String() {
t.Errorf("Wanted %d history event machine ID to be %s, got %s", i, want, got)
}
if want, got := model.ProcessShepherdAgentStart, el.Process; want != got {
t.Errorf("Wanted %d history event process to be %s, got %s", i, want, got)
}
}
}
// TestAgentStartWorkflowParallel starts work on six machines by three workers
// and makes sure that there are no scheduling conflicts between them.
func TestAgentStartWorkflowParallel(t *testing.T) {
b := dut()
conn, err := b.Open(true)
if err != nil {
t.Fatalf("Open failed: %v", err)
}
ctx, ctxC := context.WithCancel(context.Background())
defer ctxC()
makeMachine := func(providerID string) {
ctxS, ctxC := context.WithCancel(ctx)
defer ctxC()
session, err := conn.StartSession(ctxS)
if err != nil {
t.Fatalf("Starting session failed: %v", err)
}
err = session.Transact(ctx, func(q *model.Queries) error {
machine, err := q.NewMachine(ctx)
if err != nil {
return err
}
return q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
MachineID: machine.MachineID,
Provider: model.ProviderEquinix,
ProviderID: providerID,
})
})
if err != nil {
t.Fatalf("Creating machine failed: %v", err)
}
}
// Make six machines for testing.
for i := 0; i < 6; i++ {
makeMachine(fmt.Sprintf("test%d", i))
}
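// workStarted gates the workers: the test first lets three in-flight work items
// proceed one at a time, then closes the channel so later iterations run freely.
// workDone reports each handled machine together with the handling worker's ID.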
workStarted := make(chan struct{})
workDone := make(chan struct {
machine uuid.UUID
workerID int
})
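// workOnce picks up one machine eligible for agent start, waits to be released
// via workStarted, reports the machine and worker ID on workDone, and finishes
// the work by marking the agent as started.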
workOnce := func(ctx context.Context, workerID int, session *Session) error {
work, err := session.Work(ctx, model.ProcessShepherdAgentStart, func(q *model.Queries) ([]uuid.UUID, error) {
machines, err := q.GetMachinesForAgentStart(ctx, 1)
if err != nil {
return nil, err
}
if len(machines) < 1 {
return nil, ErrNothingToDo
}
return []uuid.UUID{machines[0].MachineID}, nil
})
if err != nil {
return err
}
defer work.Cancel(ctx)
select {
case <-workStarted:
case <-ctx.Done():
return ctx.Err()
}
select {
case workDone <- struct {
machine uuid.UUID
workerID int
}{
machine: work.Machine,
workerID: workerID,
}:
case <-ctx.Done():
return ctx.Err()
}
return work.Finish(ctx, func(q *model.Queries) error {
return q.MachineSetAgentStarted(ctx, model.MachineSetAgentStartedParams{
MachineID: work.Machine,
AgentStartedAt: time.Now(),
AgentPublicKey: []byte("fakefakefake"),
})
})
}
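// worker runs workOnce in a loop on its own session until its context is
// canceled, retrying whenever there is nothing to do.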
worker := func(workerID int) {
ctxS, ctxC := context.WithCancel(ctx)
defer ctxC()
session, err := conn.StartSession(ctxS)
if err != nil {
t.Fatalf("Starting session failed: %v", err)
}
for {
err := workOnce(ctxS, workerID, session)
if err != nil {
if errors.Is(err, ErrNothingToDo) {
continue
}
if errors.Is(err, ctxS.Err()) {
return
}
t.Fatalf("worker failed: %v", err)
}
}
}
// Start three workers.
for i := 0; i < 3; i++ {
go worker(i)
}
// Wait until all three workers have picked up work.
for i := 0; i < 3; i++ {
workStarted <- struct{}{}
}
// Allow all workers to continue running from now on.
close(workStarted)
// Expect six machines to have been handled in parallel by three workers.
seenWorkers := make(map[int]bool)
seenMachines := make(map[string]bool)
for i := 0; i < 6; i++ {
res := <-workDone
seenWorkers[res.workerID] = true
seenMachines[res.machine.String()] = true
}
if want, got := 3, len(seenWorkers); want != got {
t.Errorf("Expected %d workers, got %d", want, got)
}
if want, got := 6, len(seenMachines); want != got {
t.Errorf("Expected %d machines, got %d", want, got)
}
}