cloud/bmaas/bmdb: init
This adds the initial Bare Metal Database structure. This change focuses
on a session/work mechanism which is the foundation on which we will
build worker components. It allows lease-like mechanics on machines,
letting us not have to use 'standard' work queues in the BMaaS project.
Change-Id: I42c3f4384c64fd90dbeab8ff9652a6f611be81d4
Reviewed-on: https://review.monogon.dev/c/monogon/+/953
Tested-by: Jenkins CI
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/cloud/bmaas/bmdb/sessions_test.go b/cloud/bmaas/bmdb/sessions_test.go
new file mode 100644
index 0000000..0018109
--- /dev/null
+++ b/cloud/bmaas/bmdb/sessions_test.go
@@ -0,0 +1,172 @@
+package bmdb
+
+import (
+ "context"
+ "errors"
+ "testing"
+ "time"
+
+ "source.monogon.dev/cloud/bmaas/bmdb/model"
+ "source.monogon.dev/cloud/lib/component"
+)
+
+func dut() *BMDB {
+ return &BMDB{
+ Config: Config{
+ Database: component.CockroachConfig{
+ InMemory: true,
+ },
+ },
+ }
+}
+
+// TestSessionExpiry exercises the session heartbeat logic, making sure that if
+// a session stops being maintained subsequent Transact calls will fail.
+func TestSessionExpiry(t *testing.T) {
+	b := dut()
+	conn, err := b.Open(true)
+	if err != nil {
+		t.Fatalf("Open failed: %v", err)
+	}
+
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+
+	session, err := conn.StartSession(ctx)
+	if err != nil {
+		t.Fatalf("Starting session failed: %v", err)
+	}
+
+	// A transaction in a brand-new session should work.
+	var machine model.Machine
+	err = session.Transact(ctx, func(q *model.Queries) error {
+		machine, err = q.NewMachine(ctx)
+		return err
+	})
+	if err != nil {
+		t.Fatalf("First transaction failed: %v", err)
+	}
+
+	time.Sleep(6 * time.Second)
+
+	// A transaction after the 5-second session interval should continue to
+	// work, because the session is still being maintained at this point.
+	err = session.Transact(ctx, func(q *model.Queries) error {
+		_, err := q.NewMachine(ctx)
+		return err
+	})
+	if err != nil {
+		t.Fatalf("Second transaction failed: %v", err)
+	}
+
+	// Cancel the session's own context so its heartbeat is no longer maintained;
+	// a transaction after the 5-second session interval should then fail.
+	session.ctxC()
+	time.Sleep(6 * time.Second)
+
+	err = session.Transact(ctx, func(q *model.Queries) error {
+		return q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
+			MachineID:  machine.MachineID,
+			Provider:   "foo",
+			ProviderID: "bar",
+		})
+	})
+	if !errors.Is(err, ErrSessionExpired) {
+		t.Fatalf("Third transaction should've failed due to expired session, got %v", err)
+	}
+}
+
+// TestWork exercises the per-{process,machine} mutual exclusion mechanism of
+// Work items.
+func TestWork(t *testing.T) {
+	b := dut()
+	conn, err := b.Open(true)
+	if err != nil {
+		t.Fatalf("Open failed: %v", err)
+	}
+
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+
+	// Start two sessions for testing.
+	session1, err := conn.StartSession(ctx)
+	if err != nil {
+		t.Fatalf("Starting session failed: %v", err)
+	}
+	session2, err := conn.StartSession(ctx)
+	if err != nil {
+		t.Fatalf("Starting session failed: %v", err)
+	}
+
+	var machine model.Machine
+	err = session1.Transact(ctx, func(q *model.Queries) error {
+		machine, err = q.NewMachine(ctx)
+		return err
+	})
+	if err != nil {
+		t.Fatalf("Creating machine failed: %v", err)
+	}
+
+	// Create a subcontext for a long-term work request. We'll cancel it later as
+	// part of the test.
+	ctxB, ctxBC := context.WithCancel(ctx)
+	defer ctxBC()
+	// Start work which blocks until ctxB is canceled. The started channel tells
+	// us when the work function is running (nil) or that Work failed outright;
+	// it is buffered so the goroutine's last send cannot block and leak it.
+	started := make(chan error, 1)
+	done := make(chan error, 1)
+	go func() {
+		err := session1.Work(ctxB, machine.MachineID, model.ProcessUnitTest1, func() error {
+			started <- nil
+			<-ctxB.Done()
+			return ctxB.Err()
+		})
+		done <- err
+		if err != nil {
+			started <- err
+		}
+	}()
+	if err := <-started; err != nil {
+		t.Fatalf("Starting first work failed: %v", err)
+	}
+
+	// Starting more work on the same machine but a different process should still
+	// be allowed.
+	for _, session := range []*Session{session1, session2} {
+		err = session.Work(ctxB, machine.MachineID, model.ProcessUnitTest2, func() error {
+			return nil
+		})
+		if err != nil {
+			t.Errorf("Could not run concurrent process on machine: %v", err)
+		}
+	}
+
+	// However, starting work with the same process on the same machine should
+	// fail.
+	for _, session := range []*Session{session1, session2} {
+		err = session.Work(ctxB, machine.MachineID, model.ProcessUnitTest1, func() error {
+			return nil
+		})
+		if !errors.Is(err, ErrWorkConflict) {
+			t.Errorf("Concurrent work with same process should've been forbidden, got %v", err)
+		}
+	}
+
+	// Now, cancel the first long-running request and wait for it to return.
+	ctxBC()
+	err = <-done
+	if !errors.Is(err, ctxB.Err()) {
+		t.Fatalf("First work item should've failed with %v, got %v", ctxB.Err(), err)
+	}
+
+	// We should now be able to perform 'test1' work again against this machine.
+	for _, session := range []*Session{session1, session2} {
+		err = session.Work(ctx, machine.MachineID, model.ProcessUnitTest1, func() error {
+			return nil
+		})
+		if err != nil {
+			t.Errorf("Could not run work against machine: %v", err)
+		}
+	}
+}