blob: 9664c17ba31590627a1db9a769679b4800d13a6a [file] [log] [blame]
Serge Bazanski35e8d792022-10-11 11:32:30 +02001package bmdb
2
3import (
4 "context"
5 "errors"
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +01006 "fmt"
Serge Bazanski35e8d792022-10-11 11:32:30 +02007 "testing"
8 "time"
9
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +010010 "github.com/google/uuid"
11
Serge Bazanski35e8d792022-10-11 11:32:30 +020012 "source.monogon.dev/cloud/bmaas/bmdb/model"
13 "source.monogon.dev/cloud/lib/component"
14)
15
16func dut() *BMDB {
17 return &BMDB{
18 Config: Config{
19 Database: component.CockroachConfig{
20 InMemory: true,
21 },
22 },
23 }
24}
25
26// TestSessionExpiry exercises the session heartbeat logic, making sure that if
27// a session stops being maintained subsequent Transact calls will fail.
28func TestSessionExpiry(t *testing.T) {
29 b := dut()
30 conn, err := b.Open(true)
31 if err != nil {
32 t.Fatalf("Open failed: %v", err)
33 }
34
35 ctx, ctxC := context.WithCancel(context.Background())
36 defer ctxC()
37
38 session, err := conn.StartSession(ctx)
39 if err != nil {
40 t.Fatalf("Starting session failed: %v", err)
41 }
42
43 // A transaction in a brand-new session should work.
44 var machine model.Machine
45 err = session.Transact(ctx, func(q *model.Queries) error {
46 machine, err = q.NewMachine(ctx)
47 return err
48 })
49 if err != nil {
50 t.Fatalf("First transaction failed: %v", err)
51 }
52
53 time.Sleep(6 * time.Second)
54
55 // A transaction after the 5-second session interval should continue to work.
56 err = session.Transact(ctx, func(q *model.Queries) error {
57 _, err = q.NewMachine(ctx)
58 return err
59 })
60 if err != nil {
61 t.Fatalf("Second transaction failed: %v", err)
62 }
63
64 // A transaction after the 5-second session interval should fail if we don't
65 // maintain its heartbeat.
66 session.ctxC()
67 time.Sleep(6 * time.Second)
68
69 err = session.Transact(ctx, func(q *model.Queries) error {
70 return q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
71 MachineID: machine.MachineID,
72 Provider: "foo",
73 ProviderID: "bar",
74 })
75 })
76 if !errors.Is(err, ErrSessionExpired) {
77 t.Fatalf("Second transaction should've failed due to expired session, got %v", err)
78 }
79
80}
81
82// TestWork exercises the per-{process,machine} mutual exclusion mechanism of
83// Work items.
84func TestWork(t *testing.T) {
85 b := dut()
86 conn, err := b.Open(true)
87 if err != nil {
88 t.Fatalf("Open failed: %v", err)
89 }
90
91 ctx, ctxC := context.WithCancel(context.Background())
92 defer ctxC()
93
94 // Start two session for testing.
95 session1, err := conn.StartSession(ctx)
96 if err != nil {
97 t.Fatalf("Starting session failed: %v", err)
98 }
99 session2, err := conn.StartSession(ctx)
100 if err != nil {
101 t.Fatalf("Starting session failed: %v", err)
102 }
103
104 var machine model.Machine
105 err = session1.Transact(ctx, func(q *model.Queries) error {
106 machine, err = q.NewMachine(ctx)
107 return err
108 })
109 if err != nil {
110 t.Fatalf("Creating machine failed: %v", err)
111 }
112
113 // Create a subcontext for a long-term work request. We'll cancel it later as
114 // part of the test.
115 ctxB, ctxBC := context.WithCancel(ctx)
116 defer ctxBC()
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100117
118 constantRetriever := func(_ *model.Queries) ([]uuid.UUID, error) {
119 return []uuid.UUID{machine.MachineID}, nil
120 }
121
122 // Start work on machine which we're not gonna finish for a while.
123 work1, err := session1.Work(ctxB, model.ProcessUnitTest1, constantRetriever)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200124 if err != nil {
125 t.Fatalf("Starting first work failed: %v", err)
126 }
127
128 // Starting more work on the same machine but a different process should still
129 // be allowed.
130 for _, session := range []*Session{session1, session2} {
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100131 work2, err := session.Work(ctxB, model.ProcessUnitTest2, constantRetriever)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200132 if err != nil {
133 t.Errorf("Could not run concurrent process on machine: %v", err)
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100134 } else {
135 work2.Cancel(ctxB)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200136 }
137 }
138
139 // However, starting work with the same process on the same machine should
140 // fail.
141 for _, session := range []*Session{session1, session2} {
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100142 work2, err := session.Work(ctxB, model.ProcessUnitTest1, constantRetriever)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200143 if !errors.Is(err, ErrWorkConflict) {
144 t.Errorf("Concurrent work with same process should've been forbidden, got %v", err)
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100145 work2.Cancel(ctxB)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200146 }
147 }
148
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100149 // Now, finish the long-running work.
150 work1.Cancel(ctx)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200151
152 // We should now be able to perform 'test1' work again against this machine.
153 for _, session := range []*Session{session1, session2} {
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100154 work1, err := session.Work(ctxB, model.ProcessUnitTest1, constantRetriever)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200155 if err != nil {
156 t.Errorf("Could not run work against machine: %v", err)
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100157 } else {
158 work1.Cancel(ctxB)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200159 }
160 }
161}
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100162
163// TestInstallationWorkflow exercises the agent installation workflow within the
164// BMDB.
165func TestInstallationWorkflow(t *testing.T) {
166 b := dut()
167 conn, err := b.Open(true)
168 if err != nil {
169 t.Fatalf("Open failed: %v", err)
170 }
171
172 ctx, ctxC := context.WithCancel(context.Background())
173 defer ctxC()
174
175 session, err := conn.StartSession(ctx)
176 if err != nil {
177 t.Fatalf("Starting session failed: %v", err)
178 }
179
180 // Create machine. Drop its ID.
181 err = session.Transact(ctx, func(q *model.Queries) error {
182 machine, err := q.NewMachine(ctx)
183 if err != nil {
184 return err
185 }
186 return q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
187 MachineID: machine.MachineID,
188 Provider: model.ProviderEquinix,
189 ProviderID: "123",
190 })
191 })
192 if err != nil {
193 t.Fatalf("Creating machine failed: %v", err)
194 }
195
196 // Start working on a machine.
197 startedC := make(chan struct{})
198 doneC := make(chan struct{})
199 errC := make(chan error)
200 go func() {
201 work, err := session.Work(ctx, model.ProcessShepherdInstall, func(q *model.Queries) ([]uuid.UUID, error) {
202 machines, err := q.GetMachinesForAgentStart(ctx, 1)
203 if err != nil {
204 return nil, err
205 }
206 if len(machines) < 1 {
207 return nil, ErrNothingToDo
208 }
209 return []uuid.UUID{machines[0].MachineID}, nil
210 })
211 defer work.Cancel(ctx)
212
213 if err != nil {
214 close(startedC)
215 errC <- err
216 return
217 }
218
219 // Simulate work by blocking on a channel.
220 close(startedC)
221 <-doneC
222
223 err = work.Finish(ctx, func(q *model.Queries) error {
224 return q.MachineSetAgentStarted(ctx, model.MachineSetAgentStartedParams{
225 MachineID: work.Machine,
226 AgentStartedAt: time.Now(),
227 AgentPublicKey: []byte("fakefakefake"),
228 })
229 })
230 errC <- err
231 }()
232 <-startedC
233 // Work on the machine has started. Attempting to get more machines now should
234 // return no machines.
235 err = session.Transact(ctx, func(q *model.Queries) error {
236 machines, err := q.GetMachinesForAgentStart(ctx, 1)
237 if err != nil {
238 return err
239 }
240 if len(machines) > 0 {
241 t.Errorf("Expected no machines ready for installation.")
242 }
243 return nil
244 })
245 if err != nil {
246 t.Errorf("Failed to retrieve machines for installation in parallel: %v", err)
247 }
248 // Finish working on machine.
249 close(doneC)
250 err = <-errC
251 if err != nil {
252 t.Errorf("Failed to finish work on machine: %v", err)
253 }
254 // That machine is now installed, so we still expect no work to have to be done.
255 err = session.Transact(ctx, func(q *model.Queries) error {
256 machines, err := q.GetMachinesForAgentStart(ctx, 1)
257 if err != nil {
258 return err
259 }
260 if len(machines) > 0 {
261 t.Errorf("Expected still no machines ready for installation.")
262 }
263 return nil
264 })
265 if err != nil {
266 t.Errorf("Failed to retrieve machines for installation after work finished: %v", err)
267 }
268}
269
270// TestInstallationWorkflowParallel starts work on three machines by six workers
271// and makes sure that there are no scheduling conflicts between them.
272func TestInstallationWorkflowParallel(t *testing.T) {
273 b := dut()
274 conn, err := b.Open(true)
275 if err != nil {
276 t.Fatalf("Open failed: %v", err)
277 }
278
279 ctx, ctxC := context.WithCancel(context.Background())
280 defer ctxC()
281
282 makeMachine := func(providerID string) {
283 ctxS, ctxC := context.WithCancel(ctx)
284 defer ctxC()
285 session, err := conn.StartSession(ctxS)
286 if err != nil {
287 t.Fatalf("Starting session failed: %v", err)
288 }
289 err = session.Transact(ctx, func(q *model.Queries) error {
290 machine, err := q.NewMachine(ctx)
291 if err != nil {
292 return err
293 }
294 return q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
295 MachineID: machine.MachineID,
296 Provider: model.ProviderEquinix,
297 ProviderID: providerID,
298 })
299 })
300 if err != nil {
301 t.Fatalf("Creating machine failed: %v", err)
302 }
303 }
304 // Make six machines for testing.
305 for i := 0; i < 6; i++ {
306 makeMachine(fmt.Sprintf("test%d", i))
307 }
308
309 workStarted := make(chan struct{})
310 workDone := make(chan struct {
311 machine uuid.UUID
312 workerID int
313 })
314
315 workOnce := func(ctx context.Context, workerID int, session *Session) error {
316 work, err := session.Work(ctx, model.ProcessShepherdInstall, func(q *model.Queries) ([]uuid.UUID, error) {
317 machines, err := q.GetMachinesForAgentStart(ctx, 1)
318 if err != nil {
319 return nil, err
320 }
321 if len(machines) < 1 {
322 return nil, ErrNothingToDo
323 }
324 return []uuid.UUID{machines[0].MachineID}, nil
325 })
326
327 if err != nil {
328 return err
329 }
330 defer work.Cancel(ctx)
331
332 select {
333 case <-workStarted:
334 case <-ctx.Done():
335 return ctx.Err()
336 }
337
338 select {
339 case workDone <- struct {
340 machine uuid.UUID
341 workerID int
342 }{
343 machine: work.Machine,
344 workerID: workerID,
345 }:
346 case <-ctx.Done():
347 return ctx.Err()
348 }
349
350 return work.Finish(ctx, func(q *model.Queries) error {
351 return q.MachineSetAgentStarted(ctx, model.MachineSetAgentStartedParams{
352 MachineID: work.Machine,
353 AgentStartedAt: time.Now(),
354 AgentPublicKey: []byte("fakefakefake"),
355 })
356 })
357 }
358
359 worker := func(workerID int) {
360 ctxS, ctxC := context.WithCancel(ctx)
361 defer ctxC()
362 session, err := conn.StartSession(ctxS)
363 if err != nil {
364 t.Fatalf("Starting session failed: %v", err)
365 }
366 for {
367 err := workOnce(ctxS, workerID, session)
368 if err != nil {
369 if errors.Is(err, ctxS.Err()) {
370 return
371 }
372 t.Fatalf("worker failed: %v", err)
373 }
374 }
375 }
376 // Start three workers.
377 for i := 0; i < 3; i++ {
378 go worker(i)
379 }
380
381 // Wait for at least three workers to be alive.
382 for i := 0; i < 3; i++ {
383 workStarted <- struct{}{}
384 }
385
386 // Allow all workers to continue running from now on.
387 close(workStarted)
388
389 // Expect six machines to have been handled in parallel by three workers.
390 seenWorkers := make(map[int]bool)
391 seenMachines := make(map[string]bool)
392 for i := 0; i < 6; i++ {
393 res := <-workDone
394 seenWorkers[res.workerID] = true
395 seenMachines[res.machine.String()] = true
396 }
397
398 if want, got := 3, len(seenWorkers); want != got {
399 t.Errorf("Expected %d workers, got %d", want, got)
400 }
401 if want, got := 6, len(seenMachines); want != got {
402 t.Errorf("Expected %d machines, got %d", want, got)
403 }
404}