blob: f3977d576f952a2df5a4e5dcc5c214fa8fcc04f3 [file] [log] [blame]
Serge Bazanski35e8d792022-10-11 11:32:30 +02001package bmdb
2
3import (
4 "context"
5 "errors"
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +01006 "fmt"
Serge Bazanski35e8d792022-10-11 11:32:30 +02007 "testing"
8 "time"
9
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +010010 "github.com/google/uuid"
11
Serge Bazanski35e8d792022-10-11 11:32:30 +020012 "source.monogon.dev/cloud/bmaas/bmdb/model"
13 "source.monogon.dev/cloud/lib/component"
14)
15
16func dut() *BMDB {
17 return &BMDB{
18 Config: Config{
19 Database: component.CockroachConfig{
20 InMemory: true,
21 },
22 },
23 }
24}
25
26// TestSessionExpiry exercises the session heartbeat logic, making sure that if
27// a session stops being maintained subsequent Transact calls will fail.
28func TestSessionExpiry(t *testing.T) {
29 b := dut()
30 conn, err := b.Open(true)
31 if err != nil {
32 t.Fatalf("Open failed: %v", err)
33 }
34
35 ctx, ctxC := context.WithCancel(context.Background())
36 defer ctxC()
37
38 session, err := conn.StartSession(ctx)
39 if err != nil {
40 t.Fatalf("Starting session failed: %v", err)
41 }
42
43 // A transaction in a brand-new session should work.
44 var machine model.Machine
45 err = session.Transact(ctx, func(q *model.Queries) error {
46 machine, err = q.NewMachine(ctx)
47 return err
48 })
49 if err != nil {
50 t.Fatalf("First transaction failed: %v", err)
51 }
52
53 time.Sleep(6 * time.Second)
54
55 // A transaction after the 5-second session interval should continue to work.
56 err = session.Transact(ctx, func(q *model.Queries) error {
57 _, err = q.NewMachine(ctx)
58 return err
59 })
60 if err != nil {
61 t.Fatalf("Second transaction failed: %v", err)
62 }
63
64 // A transaction after the 5-second session interval should fail if we don't
65 // maintain its heartbeat.
66 session.ctxC()
67 time.Sleep(6 * time.Second)
68
69 err = session.Transact(ctx, func(q *model.Queries) error {
70 return q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
71 MachineID: machine.MachineID,
72 Provider: "foo",
73 ProviderID: "bar",
74 })
75 })
76 if !errors.Is(err, ErrSessionExpired) {
77 t.Fatalf("Second transaction should've failed due to expired session, got %v", err)
78 }
79
80}
81
82// TestWork exercises the per-{process,machine} mutual exclusion mechanism of
83// Work items.
84func TestWork(t *testing.T) {
85 b := dut()
86 conn, err := b.Open(true)
87 if err != nil {
88 t.Fatalf("Open failed: %v", err)
89 }
90
91 ctx, ctxC := context.WithCancel(context.Background())
92 defer ctxC()
93
94 // Start two session for testing.
95 session1, err := conn.StartSession(ctx)
96 if err != nil {
97 t.Fatalf("Starting session failed: %v", err)
98 }
99 session2, err := conn.StartSession(ctx)
100 if err != nil {
101 t.Fatalf("Starting session failed: %v", err)
102 }
103
104 var machine model.Machine
105 err = session1.Transact(ctx, func(q *model.Queries) error {
106 machine, err = q.NewMachine(ctx)
107 return err
108 })
109 if err != nil {
110 t.Fatalf("Creating machine failed: %v", err)
111 }
112
113 // Create a subcontext for a long-term work request. We'll cancel it later as
114 // part of the test.
115 ctxB, ctxBC := context.WithCancel(ctx)
116 defer ctxBC()
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100117
118 constantRetriever := func(_ *model.Queries) ([]uuid.UUID, error) {
119 return []uuid.UUID{machine.MachineID}, nil
120 }
121
122 // Start work on machine which we're not gonna finish for a while.
123 work1, err := session1.Work(ctxB, model.ProcessUnitTest1, constantRetriever)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200124 if err != nil {
125 t.Fatalf("Starting first work failed: %v", err)
126 }
127
128 // Starting more work on the same machine but a different process should still
129 // be allowed.
130 for _, session := range []*Session{session1, session2} {
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100131 work2, err := session.Work(ctxB, model.ProcessUnitTest2, constantRetriever)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200132 if err != nil {
133 t.Errorf("Could not run concurrent process on machine: %v", err)
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100134 } else {
135 work2.Cancel(ctxB)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200136 }
137 }
138
139 // However, starting work with the same process on the same machine should
140 // fail.
141 for _, session := range []*Session{session1, session2} {
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100142 work2, err := session.Work(ctxB, model.ProcessUnitTest1, constantRetriever)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200143 if !errors.Is(err, ErrWorkConflict) {
144 t.Errorf("Concurrent work with same process should've been forbidden, got %v", err)
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100145 work2.Cancel(ctxB)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200146 }
147 }
148
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100149 // Now, finish the long-running work.
150 work1.Cancel(ctx)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200151
152 // We should now be able to perform 'test1' work again against this machine.
153 for _, session := range []*Session{session1, session2} {
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100154 work1, err := session.Work(ctxB, model.ProcessUnitTest1, constantRetriever)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200155 if err != nil {
156 t.Errorf("Could not run work against machine: %v", err)
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100157 } else {
158 work1.Cancel(ctxB)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200159 }
160 }
161}
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100162
Serge Bazanskia9580a72023-01-12 14:44:35 +0100163func TestWorkBackoff(t *testing.T) {
164 b := dut()
165 conn, err := b.Open(true)
166 if err != nil {
167 t.Fatalf("Open failed: %v", err)
168 }
169
170 ctx, ctxC := context.WithCancel(context.Background())
171 defer ctxC()
172
173 session, err := conn.StartSession(ctx)
174 if err != nil {
175 t.Fatalf("Starting session failed: %v", err)
176 }
177
178 var machine model.Machine
179 // Create machine.
180 err = session.Transact(ctx, func(q *model.Queries) error {
181 machine, err = q.NewMachine(ctx)
182 if err != nil {
183 return err
184 }
185 return q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
186 MachineID: machine.MachineID,
187 Provider: model.ProviderEquinix,
188 ProviderID: "123",
189 })
190 })
191 if err != nil {
192 t.Fatalf("Creating machine failed: %v", err)
193 }
194
195 // Work on machine, but fail it with a backoff.
Serge Bazanski10383132023-02-20 15:39:45 +0100196 work, err := session.Work(ctx, model.ProcessShepherdAccess, func(q *model.Queries) ([]uuid.UUID, error) {
Serge Bazanskia9580a72023-01-12 14:44:35 +0100197 machines, err := q.GetMachinesForAgentStart(ctx, 1)
198 if err != nil {
199 return nil, err
200 }
201 if len(machines) < 1 {
202 return nil, ErrNothingToDo
203 }
204 return []uuid.UUID{machines[0].MachineID}, nil
205 })
206 if err != nil {
207 t.Fatalf("Starting work failed: %v", err)
208 }
209 backoff := time.Hour
210 if err := work.Fail(ctx, &backoff, "test"); err != nil {
211 t.Fatalf("Failing work failed: %v", err)
212 }
213
214 // The machine shouldn't be returned now.
215 err = session.Transact(ctx, func(q *model.Queries) error {
216 machines, err := q.GetMachinesForAgentStart(ctx, 1)
217 if err != nil {
218 return err
219 }
220 if len(machines) > 0 {
Serge Bazanski10383132023-02-20 15:39:45 +0100221 t.Errorf("Expected no machines ready for agent start.")
Serge Bazanskia9580a72023-01-12 14:44:35 +0100222 }
223 return nil
224 })
225 if err != nil {
Serge Bazanski10383132023-02-20 15:39:45 +0100226 t.Errorf("Failed to retrieve machines for agent start: %v", err)
Serge Bazanskia9580a72023-01-12 14:44:35 +0100227 }
228
229 // Instead of waiting for the backoff to expire, set it again, but this time
230 // make it immediate. This works because the backoff query acts as an upsert.
231 err = session.Transact(ctx, func(q *model.Queries) error {
232 return q.WorkBackoffInsert(ctx, model.WorkBackoffInsertParams{
233 MachineID: machine.MachineID,
Serge Bazanski10383132023-02-20 15:39:45 +0100234 Process: model.ProcessShepherdAccess,
Serge Bazanskia9580a72023-01-12 14:44:35 +0100235 Seconds: 0,
236 })
237 })
238 if err != nil {
239 t.Errorf("Failed to update backoff: %v", err)
240 }
241
242 // Just in case.
243 time.Sleep(100 * time.Millisecond)
244
245 // The machine should now be returned again.
246 err = session.Transact(ctx, func(q *model.Queries) error {
247 machines, err := q.GetMachinesForAgentStart(ctx, 1)
248 if err != nil {
249 return err
250 }
251 if len(machines) != 1 {
Serge Bazanski10383132023-02-20 15:39:45 +0100252 t.Errorf("Expected exactly one machine ready for agent start.")
Serge Bazanskia9580a72023-01-12 14:44:35 +0100253 }
254 return nil
255 })
256 if err != nil {
Serge Bazanski10383132023-02-20 15:39:45 +0100257 t.Errorf("Failed to retrieve machines for agent start: %v", err)
Serge Bazanskia9580a72023-01-12 14:44:35 +0100258 }
259}
260
Serge Bazanski10383132023-02-20 15:39:45 +0100261// TestAgentStartWorkflow exercises the agent start workflow within the BMDB.
262func TestAgentStartWorkflow(t *testing.T) {
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100263 b := dut()
264 conn, err := b.Open(true)
265 if err != nil {
266 t.Fatalf("Open failed: %v", err)
267 }
268
269 ctx, ctxC := context.WithCancel(context.Background())
270 defer ctxC()
271
272 session, err := conn.StartSession(ctx)
273 if err != nil {
274 t.Fatalf("Starting session failed: %v", err)
275 }
276
277 // Create machine. Drop its ID.
278 err = session.Transact(ctx, func(q *model.Queries) error {
279 machine, err := q.NewMachine(ctx)
280 if err != nil {
281 return err
282 }
283 return q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
284 MachineID: machine.MachineID,
285 Provider: model.ProviderEquinix,
286 ProviderID: "123",
287 })
288 })
289 if err != nil {
290 t.Fatalf("Creating machine failed: %v", err)
291 }
292
293 // Start working on a machine.
Serge Bazanskia9580a72023-01-12 14:44:35 +0100294 var machine uuid.UUID
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100295 startedC := make(chan struct{})
296 doneC := make(chan struct{})
297 errC := make(chan error)
298 go func() {
Serge Bazanski10383132023-02-20 15:39:45 +0100299 work, err := session.Work(ctx, model.ProcessShepherdAccess, func(q *model.Queries) ([]uuid.UUID, error) {
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100300 machines, err := q.GetMachinesForAgentStart(ctx, 1)
301 if err != nil {
302 return nil, err
303 }
304 if len(machines) < 1 {
305 return nil, ErrNothingToDo
306 }
Serge Bazanskia9580a72023-01-12 14:44:35 +0100307 machine = machines[0].MachineID
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100308 return []uuid.UUID{machines[0].MachineID}, nil
309 })
310 defer work.Cancel(ctx)
311
312 if err != nil {
313 close(startedC)
314 errC <- err
315 return
316 }
317
318 // Simulate work by blocking on a channel.
319 close(startedC)
Serge Bazanskia9580a72023-01-12 14:44:35 +0100320
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100321 <-doneC
322
323 err = work.Finish(ctx, func(q *model.Queries) error {
324 return q.MachineSetAgentStarted(ctx, model.MachineSetAgentStartedParams{
325 MachineID: work.Machine,
326 AgentStartedAt: time.Now(),
327 AgentPublicKey: []byte("fakefakefake"),
328 })
329 })
330 errC <- err
331 }()
332 <-startedC
333 // Work on the machine has started. Attempting to get more machines now should
334 // return no machines.
335 err = session.Transact(ctx, func(q *model.Queries) error {
336 machines, err := q.GetMachinesForAgentStart(ctx, 1)
337 if err != nil {
338 return err
339 }
340 if len(machines) > 0 {
Serge Bazanski10383132023-02-20 15:39:45 +0100341 t.Errorf("Expected no machines ready for agent start.")
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100342 }
343 return nil
344 })
345 if err != nil {
Serge Bazanski10383132023-02-20 15:39:45 +0100346 t.Fatalf("Failed to retrieve machines for start in parallel: %v", err)
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100347 }
348 // Finish working on machine.
349 close(doneC)
350 err = <-errC
351 if err != nil {
Serge Bazanskia9580a72023-01-12 14:44:35 +0100352 t.Fatalf("Failed to finish work on machine: %v", err)
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100353 }
Serge Bazanski10383132023-02-20 15:39:45 +0100354 // That machine has its agent started, so we still expect no work to have to be
355 // done.
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100356 err = session.Transact(ctx, func(q *model.Queries) error {
357 machines, err := q.GetMachinesForAgentStart(ctx, 1)
358 if err != nil {
359 return err
360 }
361 if len(machines) > 0 {
Serge Bazanski10383132023-02-20 15:39:45 +0100362 t.Errorf("Expected still no machines ready for agent start.")
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100363 }
364 return nil
365 })
366 if err != nil {
Serge Bazanski10383132023-02-20 15:39:45 +0100367 t.Fatalf("Failed to retrieve machines for agent start after work finished: %v", err)
Serge Bazanskia9580a72023-01-12 14:44:35 +0100368 }
369
370 // Check history has been recorded.
371 var history []model.WorkHistory
372 err = session.Transact(ctx, func(q *model.Queries) error {
373 history, err = q.ListHistoryOf(ctx, machine)
374 return err
375 })
376 if err != nil {
377 t.Fatalf("Failed to retrieve machine history: %v", err)
378 }
379 // Expect two history items: started and finished.
380 if want, got := 2, len(history); want != got {
381 t.Errorf("Wanted %d history items, got %d", want, got)
382 } else {
383 if want, got := model.WorkHistoryEventStarted, history[0].Event; want != got {
384 t.Errorf("Wanted first history event to be %s, got %s", want, got)
385 }
386 if want, got := model.WorkHistoryEventFinished, history[1].Event; want != got {
387 t.Errorf("Wanted second history event to be %s, got %s", want, got)
388 }
389 }
390 // Check all other history event data.
391 for i, el := range history {
392 if want, got := machine, el.MachineID; want.String() != got.String() {
393 t.Errorf("Wanted %d history event machine ID to be %s, got %s", i, want, got)
394 }
Serge Bazanski10383132023-02-20 15:39:45 +0100395 if want, got := model.ProcessShepherdAccess, el.Process; want != got {
Serge Bazanskia9580a72023-01-12 14:44:35 +0100396 t.Errorf("Wanted %d history event process to be %s, got %s", i, want, got)
397 }
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100398 }
399}
400
Serge Bazanski10383132023-02-20 15:39:45 +0100401// TestAgentStartWorkflowParallel starts work on three machines by six workers
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100402// and makes sure that there are no scheduling conflicts between them.
Serge Bazanski10383132023-02-20 15:39:45 +0100403func TestAgentStartWorkflowParallel(t *testing.T) {
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100404 b := dut()
405 conn, err := b.Open(true)
406 if err != nil {
407 t.Fatalf("Open failed: %v", err)
408 }
409
410 ctx, ctxC := context.WithCancel(context.Background())
411 defer ctxC()
412
413 makeMachine := func(providerID string) {
414 ctxS, ctxC := context.WithCancel(ctx)
415 defer ctxC()
416 session, err := conn.StartSession(ctxS)
417 if err != nil {
418 t.Fatalf("Starting session failed: %v", err)
419 }
420 err = session.Transact(ctx, func(q *model.Queries) error {
421 machine, err := q.NewMachine(ctx)
422 if err != nil {
423 return err
424 }
425 return q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
426 MachineID: machine.MachineID,
427 Provider: model.ProviderEquinix,
428 ProviderID: providerID,
429 })
430 })
431 if err != nil {
432 t.Fatalf("Creating machine failed: %v", err)
433 }
434 }
435 // Make six machines for testing.
436 for i := 0; i < 6; i++ {
437 makeMachine(fmt.Sprintf("test%d", i))
438 }
439
440 workStarted := make(chan struct{})
441 workDone := make(chan struct {
442 machine uuid.UUID
443 workerID int
444 })
445
446 workOnce := func(ctx context.Context, workerID int, session *Session) error {
Serge Bazanski10383132023-02-20 15:39:45 +0100447 work, err := session.Work(ctx, model.ProcessShepherdAccess, func(q *model.Queries) ([]uuid.UUID, error) {
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100448 machines, err := q.GetMachinesForAgentStart(ctx, 1)
449 if err != nil {
450 return nil, err
451 }
452 if len(machines) < 1 {
453 return nil, ErrNothingToDo
454 }
455 return []uuid.UUID{machines[0].MachineID}, nil
456 })
457
458 if err != nil {
459 return err
460 }
461 defer work.Cancel(ctx)
462
463 select {
464 case <-workStarted:
465 case <-ctx.Done():
466 return ctx.Err()
467 }
468
469 select {
470 case workDone <- struct {
471 machine uuid.UUID
472 workerID int
473 }{
474 machine: work.Machine,
475 workerID: workerID,
476 }:
477 case <-ctx.Done():
478 return ctx.Err()
479 }
480
481 return work.Finish(ctx, func(q *model.Queries) error {
482 return q.MachineSetAgentStarted(ctx, model.MachineSetAgentStartedParams{
483 MachineID: work.Machine,
484 AgentStartedAt: time.Now(),
485 AgentPublicKey: []byte("fakefakefake"),
486 })
487 })
488 }
489
490 worker := func(workerID int) {
491 ctxS, ctxC := context.WithCancel(ctx)
492 defer ctxC()
493 session, err := conn.StartSession(ctxS)
494 if err != nil {
495 t.Fatalf("Starting session failed: %v", err)
496 }
497 for {
498 err := workOnce(ctxS, workerID, session)
499 if err != nil {
Serge Bazanskice4af2b2023-03-16 21:23:39 +0100500 if errors.Is(err, ErrNothingToDo) {
501 continue
502 }
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100503 if errors.Is(err, ctxS.Err()) {
504 return
505 }
506 t.Fatalf("worker failed: %v", err)
507 }
508 }
509 }
510 // Start three workers.
511 for i := 0; i < 3; i++ {
512 go worker(i)
513 }
514
515 // Wait for at least three workers to be alive.
516 for i := 0; i < 3; i++ {
517 workStarted <- struct{}{}
518 }
519
520 // Allow all workers to continue running from now on.
521 close(workStarted)
522
523 // Expect six machines to have been handled in parallel by three workers.
524 seenWorkers := make(map[int]bool)
525 seenMachines := make(map[string]bool)
526 for i := 0; i < 6; i++ {
527 res := <-workDone
528 seenWorkers[res.workerID] = true
529 seenMachines[res.machine.String()] = true
530 }
531
532 if want, got := 3, len(seenWorkers); want != got {
533 t.Errorf("Expected %d workers, got %d", want, got)
534 }
535 if want, got := 6, len(seenMachines); want != got {
536 t.Errorf("Expected %d machines, got %d", want, got)
537 }
538}