Serge Bazanski | 35e8d79 | 2022-10-11 11:32:30 +0200 | [diff] [blame^] | 1 | package bmdb |
| 2 | |
| 3 | import ( |
| 4 | "context" |
| 5 | "errors" |
| 6 | "testing" |
| 7 | "time" |
| 8 | |
| 9 | "source.monogon.dev/cloud/bmaas/bmdb/model" |
| 10 | "source.monogon.dev/cloud/lib/component" |
| 11 | ) |
| 12 | |
| 13 | func dut() *BMDB { |
| 14 | return &BMDB{ |
| 15 | Config: Config{ |
| 16 | Database: component.CockroachConfig{ |
| 17 | InMemory: true, |
| 18 | }, |
| 19 | }, |
| 20 | } |
| 21 | } |
| 22 | |
| 23 | // TestSessionExpiry exercises the session heartbeat logic, making sure that if |
| 24 | // a session stops being maintained subsequent Transact calls will fail. |
| 25 | func TestSessionExpiry(t *testing.T) { |
| 26 | b := dut() |
| 27 | conn, err := b.Open(true) |
| 28 | if err != nil { |
| 29 | t.Fatalf("Open failed: %v", err) |
| 30 | } |
| 31 | |
| 32 | ctx, ctxC := context.WithCancel(context.Background()) |
| 33 | defer ctxC() |
| 34 | |
| 35 | session, err := conn.StartSession(ctx) |
| 36 | if err != nil { |
| 37 | t.Fatalf("Starting session failed: %v", err) |
| 38 | } |
| 39 | |
| 40 | // A transaction in a brand-new session should work. |
| 41 | var machine model.Machine |
| 42 | err = session.Transact(ctx, func(q *model.Queries) error { |
| 43 | machine, err = q.NewMachine(ctx) |
| 44 | return err |
| 45 | }) |
| 46 | if err != nil { |
| 47 | t.Fatalf("First transaction failed: %v", err) |
| 48 | } |
| 49 | |
| 50 | time.Sleep(6 * time.Second) |
| 51 | |
| 52 | // A transaction after the 5-second session interval should continue to work. |
| 53 | err = session.Transact(ctx, func(q *model.Queries) error { |
| 54 | _, err = q.NewMachine(ctx) |
| 55 | return err |
| 56 | }) |
| 57 | if err != nil { |
| 58 | t.Fatalf("Second transaction failed: %v", err) |
| 59 | } |
| 60 | |
| 61 | // A transaction after the 5-second session interval should fail if we don't |
| 62 | // maintain its heartbeat. |
| 63 | session.ctxC() |
| 64 | time.Sleep(6 * time.Second) |
| 65 | |
| 66 | err = session.Transact(ctx, func(q *model.Queries) error { |
| 67 | return q.MachineAddProvided(ctx, model.MachineAddProvidedParams{ |
| 68 | MachineID: machine.MachineID, |
| 69 | Provider: "foo", |
| 70 | ProviderID: "bar", |
| 71 | }) |
| 72 | }) |
| 73 | if !errors.Is(err, ErrSessionExpired) { |
| 74 | t.Fatalf("Second transaction should've failed due to expired session, got %v", err) |
| 75 | } |
| 76 | |
| 77 | } |
| 78 | |
| 79 | // TestWork exercises the per-{process,machine} mutual exclusion mechanism of |
| 80 | // Work items. |
| 81 | func TestWork(t *testing.T) { |
| 82 | b := dut() |
| 83 | conn, err := b.Open(true) |
| 84 | if err != nil { |
| 85 | t.Fatalf("Open failed: %v", err) |
| 86 | } |
| 87 | |
| 88 | ctx, ctxC := context.WithCancel(context.Background()) |
| 89 | defer ctxC() |
| 90 | |
| 91 | // Start two session for testing. |
| 92 | session1, err := conn.StartSession(ctx) |
| 93 | if err != nil { |
| 94 | t.Fatalf("Starting session failed: %v", err) |
| 95 | } |
| 96 | session2, err := conn.StartSession(ctx) |
| 97 | if err != nil { |
| 98 | t.Fatalf("Starting session failed: %v", err) |
| 99 | } |
| 100 | |
| 101 | var machine model.Machine |
| 102 | err = session1.Transact(ctx, func(q *model.Queries) error { |
| 103 | machine, err = q.NewMachine(ctx) |
| 104 | return err |
| 105 | }) |
| 106 | if err != nil { |
| 107 | t.Fatalf("Creating machine failed: %v", err) |
| 108 | } |
| 109 | |
| 110 | // Create a subcontext for a long-term work request. We'll cancel it later as |
| 111 | // part of the test. |
| 112 | ctxB, ctxBC := context.WithCancel(ctx) |
| 113 | defer ctxBC() |
| 114 | // Start work which will block forever. We have to go rendezvous through a |
| 115 | // channel to make sure the work actually starts. |
| 116 | started := make(chan error) |
| 117 | done := make(chan error, 1) |
| 118 | go func() { |
| 119 | err := session1.Work(ctxB, machine.MachineID, model.ProcessUnitTest1, func() error { |
| 120 | started <- nil |
| 121 | <-ctxB.Done() |
| 122 | return ctxB.Err() |
| 123 | }) |
| 124 | done <- err |
| 125 | if err != nil { |
| 126 | started <- err |
| 127 | } |
| 128 | }() |
| 129 | err = <-started |
| 130 | if err != nil { |
| 131 | t.Fatalf("Starting first work failed: %v", err) |
| 132 | } |
| 133 | |
| 134 | // Starting more work on the same machine but a different process should still |
| 135 | // be allowed. |
| 136 | for _, session := range []*Session{session1, session2} { |
| 137 | err = session.Work(ctxB, machine.MachineID, model.ProcessUnitTest2, func() error { |
| 138 | return nil |
| 139 | }) |
| 140 | if err != nil { |
| 141 | t.Errorf("Could not run concurrent process on machine: %v", err) |
| 142 | } |
| 143 | } |
| 144 | |
| 145 | // However, starting work with the same process on the same machine should |
| 146 | // fail. |
| 147 | for _, session := range []*Session{session1, session2} { |
| 148 | err = session.Work(ctxB, machine.MachineID, model.ProcessUnitTest1, func() error { |
| 149 | return nil |
| 150 | }) |
| 151 | if !errors.Is(err, ErrWorkConflict) { |
| 152 | t.Errorf("Concurrent work with same process should've been forbidden, got %v", err) |
| 153 | } |
| 154 | } |
| 155 | |
| 156 | // Now, cancel the first long-running request and wait for it to return. |
| 157 | ctxBC() |
| 158 | err = <-done |
| 159 | if !errors.Is(err, ctxB.Err()) { |
| 160 | t.Fatalf("First work item should've failed with %v, got %v", ctxB.Err(), err) |
| 161 | } |
| 162 | |
| 163 | // We should now be able to perform 'test1' work again against this machine. |
| 164 | for _, session := range []*Session{session1, session2} { |
| 165 | err = session.Work(ctx, machine.MachineID, model.ProcessUnitTest1, func() error { |
| 166 | return nil |
| 167 | }) |
| 168 | if err != nil { |
| 169 | t.Errorf("Could not run work against machine: %v", err) |
| 170 | } |
| 171 | } |
| 172 | } |