// blob: 00181098dc582713a8d8073758a6091314173ffb [file] [log] [blame]
package bmdb
import (
"context"
"errors"
"testing"
"time"
"source.monogon.dev/cloud/bmaas/bmdb/model"
"source.monogon.dev/cloud/lib/component"
)
// dut returns a fresh BMDB whose database configuration has InMemory set, so
// every test in this file gets its own instance to call Open on.
func dut() *BMDB {
	db := component.CockroachConfig{InMemory: true}
	return &BMDB{Config: Config{Database: db}}
}
// TestSessionExpiry exercises the session heartbeat logic, making sure that if
// a session stops being maintained subsequent Transact calls will fail.
//
// The test runs three transactions against a single session:
//  1. immediately after the session is started (must succeed),
//  2. after sleeping past the session interval while the session is still
//     being maintained (must succeed),
//  3. after session.ctxC() has been called and another sleep past the session
//     interval (must fail with ErrSessionExpired).
//
// Because of the two six-second sleeps this test takes at least twelve seconds.
func TestSessionExpiry(t *testing.T) {
	b := dut()
	conn, err := b.Open(true)
	if err != nil {
		t.Fatalf("Open failed: %v", err)
	}

	ctx, ctxC := context.WithCancel(context.Background())
	defer ctxC()

	session, err := conn.StartSession(ctx)
	if err != nil {
		t.Fatalf("Starting session failed: %v", err)
	}

	// A transaction in a brand-new session should work.
	var machine model.Machine
	err = session.Transact(ctx, func(q *model.Queries) error {
		// Use a closure-local err so the callback does not write to the test's
		// err variable behind Transact's back.
		var err error
		machine, err = q.NewMachine(ctx)
		return err
	})
	if err != nil {
		t.Fatalf("First transaction failed: %v", err)
	}

	time.Sleep(6 * time.Second)

	// A transaction after the 5-second session interval should continue to work.
	err = session.Transact(ctx, func(q *model.Queries) error {
		_, err := q.NewMachine(ctx)
		return err
	})
	if err != nil {
		t.Fatalf("Second transaction failed: %v", err)
	}

	// A transaction after the 5-second session interval should fail if we don't
	// maintain its heartbeat. Calling the session's own ctxC is what stops the
	// session from being maintained here (presumably by cancelling the context
	// its heartbeat runs under - confirm against Session).
	session.ctxC()
	time.Sleep(6 * time.Second)

	err = session.Transact(ctx, func(q *model.Queries) error {
		return q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
			MachineID:  machine.MachineID,
			Provider:   "foo",
			ProviderID: "bar",
		})
	})
	if !errors.Is(err, ErrSessionExpired) {
		t.Fatalf("Third transaction should've failed due to expired session, got %v", err)
	}
}
// TestWork exercises the per-{process,machine} mutual exclusion mechanism of
// Work items.
//
// It starts one long-running Work item for (machine, ProcessUnitTest1) and
// then checks, from two different sessions, that:
//   - work for a different process on the same machine is still allowed,
//   - work for the same process on the same machine yields ErrWorkConflict,
//   - once the long-running item has been cancelled, work for the same process
//     is allowed again.
func TestWork(t *testing.T) {
	b := dut()
	conn, err := b.Open(true)
	if err != nil {
		t.Fatalf("Open failed: %v", err)
	}

	ctx, ctxC := context.WithCancel(context.Background())
	defer ctxC()

	// Start two sessions for testing.
	session1, err := conn.StartSession(ctx)
	if err != nil {
		t.Fatalf("Starting session failed: %v", err)
	}
	session2, err := conn.StartSession(ctx)
	if err != nil {
		t.Fatalf("Starting session failed: %v", err)
	}

	var machine model.Machine
	err = session1.Transact(ctx, func(q *model.Queries) error {
		// Use a closure-local err so the callback does not write to the test's
		// err variable behind Transact's back.
		var err error
		machine, err = q.NewMachine(ctx)
		return err
	})
	if err != nil {
		t.Fatalf("Creating machine failed: %v", err)
	}

	// Create a subcontext for a long-term work request. We'll cancel it later as
	// part of the test.
	ctxB, ctxBC := context.WithCancel(ctx)
	defer ctxBC()

	// Start work which will block until ctxB is cancelled. We have to go
	// rendezvous through a channel to make sure the work actually starts.
	//
	// The goroutine below can send on started up to twice: nil once the work
	// callback runs, and the error returned by Work afterwards (which is always
	// non-nil after cancellation, as the callback returns ctxB.Err()). The test
	// only receives once, so the channel is buffered for both sends; otherwise
	// the goroutine would block forever on the second send and leak.
	started := make(chan error, 2)
	done := make(chan error, 1)
	go func() {
		err := session1.Work(ctxB, machine.MachineID, model.ProcessUnitTest1, func() error {
			started <- nil
			<-ctxB.Done()
			return ctxB.Err()
		})
		done <- err
		if err != nil {
			// Unblocks the test if Work failed before ever running the callback.
			started <- err
		}
	}()
	err = <-started
	if err != nil {
		t.Fatalf("Starting first work failed: %v", err)
	}

	// Starting more work on the same machine but a different process should still
	// be allowed.
	for _, session := range []*Session{session1, session2} {
		err = session.Work(ctxB, machine.MachineID, model.ProcessUnitTest2, func() error {
			return nil
		})
		if err != nil {
			t.Errorf("Could not run concurrent process on machine: %v", err)
		}
	}

	// However, starting work with the same process on the same machine should
	// fail.
	for _, session := range []*Session{session1, session2} {
		err = session.Work(ctxB, machine.MachineID, model.ProcessUnitTest1, func() error {
			return nil
		})
		if !errors.Is(err, ErrWorkConflict) {
			t.Errorf("Concurrent work with same process should've been forbidden, got %v", err)
		}
	}

	// Now, cancel the first long-running request and wait for it to return.
	ctxBC()
	err = <-done
	if !errors.Is(err, ctxB.Err()) {
		t.Fatalf("First work item should've failed with %v, got %v", ctxB.Err(), err)
	}

	// We should now be able to perform 'test1' work again against this machine.
	// ctxB is cancelled at this point, so the parent ctx is used instead.
	for _, session := range []*Session{session1, session2} {
		err = session.Work(ctx, machine.MachineID, model.ProcessUnitTest1, func() error {
			return nil
		})
		if err != nil {
			t.Errorf("Could not run work against machine: %v", err)
		}
	}
}