blob: 00181098dc582713a8d8073758a6091314173ffb [file] [log] [blame]
Serge Bazanski35e8d792022-10-11 11:32:30 +02001package bmdb
2
3import (
4 "context"
5 "errors"
6 "testing"
7 "time"
8
9 "source.monogon.dev/cloud/bmaas/bmdb/model"
10 "source.monogon.dev/cloud/lib/component"
11)
12
13func dut() *BMDB {
14 return &BMDB{
15 Config: Config{
16 Database: component.CockroachConfig{
17 InMemory: true,
18 },
19 },
20 }
21}
22
23// TestSessionExpiry exercises the session heartbeat logic, making sure that if
24// a session stops being maintained subsequent Transact calls will fail.
25func TestSessionExpiry(t *testing.T) {
26 b := dut()
27 conn, err := b.Open(true)
28 if err != nil {
29 t.Fatalf("Open failed: %v", err)
30 }
31
32 ctx, ctxC := context.WithCancel(context.Background())
33 defer ctxC()
34
35 session, err := conn.StartSession(ctx)
36 if err != nil {
37 t.Fatalf("Starting session failed: %v", err)
38 }
39
40 // A transaction in a brand-new session should work.
41 var machine model.Machine
42 err = session.Transact(ctx, func(q *model.Queries) error {
43 machine, err = q.NewMachine(ctx)
44 return err
45 })
46 if err != nil {
47 t.Fatalf("First transaction failed: %v", err)
48 }
49
50 time.Sleep(6 * time.Second)
51
52 // A transaction after the 5-second session interval should continue to work.
53 err = session.Transact(ctx, func(q *model.Queries) error {
54 _, err = q.NewMachine(ctx)
55 return err
56 })
57 if err != nil {
58 t.Fatalf("Second transaction failed: %v", err)
59 }
60
61 // A transaction after the 5-second session interval should fail if we don't
62 // maintain its heartbeat.
63 session.ctxC()
64 time.Sleep(6 * time.Second)
65
66 err = session.Transact(ctx, func(q *model.Queries) error {
67 return q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
68 MachineID: machine.MachineID,
69 Provider: "foo",
70 ProviderID: "bar",
71 })
72 })
73 if !errors.Is(err, ErrSessionExpired) {
74 t.Fatalf("Second transaction should've failed due to expired session, got %v", err)
75 }
76
77}
78
79// TestWork exercises the per-{process,machine} mutual exclusion mechanism of
80// Work items.
81func TestWork(t *testing.T) {
82 b := dut()
83 conn, err := b.Open(true)
84 if err != nil {
85 t.Fatalf("Open failed: %v", err)
86 }
87
88 ctx, ctxC := context.WithCancel(context.Background())
89 defer ctxC()
90
91 // Start two session for testing.
92 session1, err := conn.StartSession(ctx)
93 if err != nil {
94 t.Fatalf("Starting session failed: %v", err)
95 }
96 session2, err := conn.StartSession(ctx)
97 if err != nil {
98 t.Fatalf("Starting session failed: %v", err)
99 }
100
101 var machine model.Machine
102 err = session1.Transact(ctx, func(q *model.Queries) error {
103 machine, err = q.NewMachine(ctx)
104 return err
105 })
106 if err != nil {
107 t.Fatalf("Creating machine failed: %v", err)
108 }
109
110 // Create a subcontext for a long-term work request. We'll cancel it later as
111 // part of the test.
112 ctxB, ctxBC := context.WithCancel(ctx)
113 defer ctxBC()
114 // Start work which will block forever. We have to go rendezvous through a
115 // channel to make sure the work actually starts.
116 started := make(chan error)
117 done := make(chan error, 1)
118 go func() {
119 err := session1.Work(ctxB, machine.MachineID, model.ProcessUnitTest1, func() error {
120 started <- nil
121 <-ctxB.Done()
122 return ctxB.Err()
123 })
124 done <- err
125 if err != nil {
126 started <- err
127 }
128 }()
129 err = <-started
130 if err != nil {
131 t.Fatalf("Starting first work failed: %v", err)
132 }
133
134 // Starting more work on the same machine but a different process should still
135 // be allowed.
136 for _, session := range []*Session{session1, session2} {
137 err = session.Work(ctxB, machine.MachineID, model.ProcessUnitTest2, func() error {
138 return nil
139 })
140 if err != nil {
141 t.Errorf("Could not run concurrent process on machine: %v", err)
142 }
143 }
144
145 // However, starting work with the same process on the same machine should
146 // fail.
147 for _, session := range []*Session{session1, session2} {
148 err = session.Work(ctxB, machine.MachineID, model.ProcessUnitTest1, func() error {
149 return nil
150 })
151 if !errors.Is(err, ErrWorkConflict) {
152 t.Errorf("Concurrent work with same process should've been forbidden, got %v", err)
153 }
154 }
155
156 // Now, cancel the first long-running request and wait for it to return.
157 ctxBC()
158 err = <-done
159 if !errors.Is(err, ctxB.Err()) {
160 t.Fatalf("First work item should've failed with %v, got %v", ctxB.Err(), err)
161 }
162
163 // We should now be able to perform 'test1' work again against this machine.
164 for _, session := range []*Session{session1, session2} {
165 err = session.Work(ctx, machine.MachineID, model.ProcessUnitTest1, func() error {
166 return nil
167 })
168 if err != nil {
169 t.Errorf("Could not run work against machine: %v", err)
170 }
171 }
172}