blob: e697150681e92f0cdf884e2dd6984676c3718156 [file] [log] [blame]
Serge Bazanski35e8d792022-10-11 11:32:30 +02001package bmdb
2
3import (
4 "context"
5 "errors"
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +01006 "fmt"
Serge Bazanski35e8d792022-10-11 11:32:30 +02007 "testing"
8 "time"
9
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +010010 "github.com/google/uuid"
11
Serge Bazanski35e8d792022-10-11 11:32:30 +020012 "source.monogon.dev/cloud/bmaas/bmdb/model"
13 "source.monogon.dev/cloud/lib/component"
14)
15
16func dut() *BMDB {
17 return &BMDB{
18 Config: Config{
19 Database: component.CockroachConfig{
20 InMemory: true,
21 },
22 },
23 }
24}
25
26// TestSessionExpiry exercises the session heartbeat logic, making sure that if
27// a session stops being maintained subsequent Transact calls will fail.
28func TestSessionExpiry(t *testing.T) {
29 b := dut()
30 conn, err := b.Open(true)
31 if err != nil {
32 t.Fatalf("Open failed: %v", err)
33 }
34
35 ctx, ctxC := context.WithCancel(context.Background())
36 defer ctxC()
37
38 session, err := conn.StartSession(ctx)
39 if err != nil {
40 t.Fatalf("Starting session failed: %v", err)
41 }
42
43 // A transaction in a brand-new session should work.
44 var machine model.Machine
45 err = session.Transact(ctx, func(q *model.Queries) error {
46 machine, err = q.NewMachine(ctx)
47 return err
48 })
49 if err != nil {
50 t.Fatalf("First transaction failed: %v", err)
51 }
52
53 time.Sleep(6 * time.Second)
54
55 // A transaction after the 5-second session interval should continue to work.
56 err = session.Transact(ctx, func(q *model.Queries) error {
57 _, err = q.NewMachine(ctx)
58 return err
59 })
60 if err != nil {
61 t.Fatalf("Second transaction failed: %v", err)
62 }
63
64 // A transaction after the 5-second session interval should fail if we don't
65 // maintain its heartbeat.
66 session.ctxC()
67 time.Sleep(6 * time.Second)
68
69 err = session.Transact(ctx, func(q *model.Queries) error {
70 return q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
71 MachineID: machine.MachineID,
72 Provider: "foo",
73 ProviderID: "bar",
74 })
75 })
76 if !errors.Is(err, ErrSessionExpired) {
77 t.Fatalf("Second transaction should've failed due to expired session, got %v", err)
78 }
79
80}
81
82// TestWork exercises the per-{process,machine} mutual exclusion mechanism of
83// Work items.
84func TestWork(t *testing.T) {
85 b := dut()
86 conn, err := b.Open(true)
87 if err != nil {
88 t.Fatalf("Open failed: %v", err)
89 }
90
91 ctx, ctxC := context.WithCancel(context.Background())
92 defer ctxC()
93
94 // Start two session for testing.
95 session1, err := conn.StartSession(ctx)
96 if err != nil {
97 t.Fatalf("Starting session failed: %v", err)
98 }
99 session2, err := conn.StartSession(ctx)
100 if err != nil {
101 t.Fatalf("Starting session failed: %v", err)
102 }
103
104 var machine model.Machine
105 err = session1.Transact(ctx, func(q *model.Queries) error {
106 machine, err = q.NewMachine(ctx)
107 return err
108 })
109 if err != nil {
110 t.Fatalf("Creating machine failed: %v", err)
111 }
112
113 // Create a subcontext for a long-term work request. We'll cancel it later as
114 // part of the test.
115 ctxB, ctxBC := context.WithCancel(ctx)
116 defer ctxBC()
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100117
118 constantRetriever := func(_ *model.Queries) ([]uuid.UUID, error) {
119 return []uuid.UUID{machine.MachineID}, nil
120 }
121
122 // Start work on machine which we're not gonna finish for a while.
123 work1, err := session1.Work(ctxB, model.ProcessUnitTest1, constantRetriever)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200124 if err != nil {
125 t.Fatalf("Starting first work failed: %v", err)
126 }
127
128 // Starting more work on the same machine but a different process should still
129 // be allowed.
130 for _, session := range []*Session{session1, session2} {
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100131 work2, err := session.Work(ctxB, model.ProcessUnitTest2, constantRetriever)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200132 if err != nil {
133 t.Errorf("Could not run concurrent process on machine: %v", err)
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100134 } else {
135 work2.Cancel(ctxB)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200136 }
137 }
138
139 // However, starting work with the same process on the same machine should
140 // fail.
141 for _, session := range []*Session{session1, session2} {
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100142 work2, err := session.Work(ctxB, model.ProcessUnitTest1, constantRetriever)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200143 if !errors.Is(err, ErrWorkConflict) {
144 t.Errorf("Concurrent work with same process should've been forbidden, got %v", err)
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100145 work2.Cancel(ctxB)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200146 }
147 }
148
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100149 // Now, finish the long-running work.
150 work1.Cancel(ctx)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200151
152 // We should now be able to perform 'test1' work again against this machine.
153 for _, session := range []*Session{session1, session2} {
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100154 work1, err := session.Work(ctxB, model.ProcessUnitTest1, constantRetriever)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200155 if err != nil {
156 t.Errorf("Could not run work against machine: %v", err)
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100157 } else {
158 work1.Cancel(ctxB)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200159 }
160 }
161}
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100162
Serge Bazanski20312b42023-04-19 13:49:47 +0200163// TestWorkBackoff exercises the backoff functionality within the BMDB.
Serge Bazanskia9580a72023-01-12 14:44:35 +0100164func TestWorkBackoff(t *testing.T) {
165 b := dut()
166 conn, err := b.Open(true)
167 if err != nil {
168 t.Fatalf("Open failed: %v", err)
169 }
170
171 ctx, ctxC := context.WithCancel(context.Background())
172 defer ctxC()
173
174 session, err := conn.StartSession(ctx)
175 if err != nil {
176 t.Fatalf("Starting session failed: %v", err)
177 }
178
179 var machine model.Machine
180 // Create machine.
181 err = session.Transact(ctx, func(q *model.Queries) error {
182 machine, err = q.NewMachine(ctx)
183 if err != nil {
184 return err
185 }
186 return q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
187 MachineID: machine.MachineID,
188 Provider: model.ProviderEquinix,
189 ProviderID: "123",
190 })
191 })
192 if err != nil {
193 t.Fatalf("Creating machine failed: %v", err)
194 }
195
Serge Bazanski20312b42023-04-19 13:49:47 +0200196 waitMachine := func(nsec int64) *Work {
197 t.Helper()
198
199 deadline := time.Now().Add(time.Duration(nsec) * 2 * time.Second)
200 for {
201 if time.Now().After(deadline) {
202 t.Fatalf("Deadline expired")
203 }
204 work, err := session.Work(ctx, model.ProcessShepherdAccess, func(q *model.Queries) ([]uuid.UUID, error) {
205 machines, err := q.GetMachinesForAgentStart(ctx, 1)
206 if err != nil {
207 return nil, err
208 }
209 if len(machines) < 1 {
210 return nil, ErrNothingToDo
211 }
212 return []uuid.UUID{machines[0].MachineID}, nil
213 })
214 if err == nil {
215 return work
216 }
217 if !errors.Is(err, ErrNothingToDo) {
218 t.Fatalf("Unexpected work error: %v", err)
219 }
220 time.Sleep(100 * time.Millisecond)
Serge Bazanskia9580a72023-01-12 14:44:35 +0100221 }
Serge Bazanskia9580a72023-01-12 14:44:35 +0100222 }
Serge Bazanski20312b42023-04-19 13:49:47 +0200223
224 // Work on machine, but fail it with a backoff.
225 work := waitMachine(1)
226 backoff := Backoff{
227 Initial: time.Second,
228 Maximum: 5 * time.Second,
229 Exponent: 2,
230 }
Serge Bazanskia9580a72023-01-12 14:44:35 +0100231 if err := work.Fail(ctx, &backoff, "test"); err != nil {
232 t.Fatalf("Failing work failed: %v", err)
233 }
234
Serge Bazanski20312b42023-04-19 13:49:47 +0200235 expect := func(count int) {
236 t.Helper()
Serge Bazanskia9580a72023-01-12 14:44:35 +0100237
Serge Bazanski20312b42023-04-19 13:49:47 +0200238 var machines []model.MachineProvided
239 var err error
240 err = session.Transact(ctx, func(q *model.Queries) error {
241 machines, err = q.GetMachinesForAgentStart(ctx, 1)
242 if err != nil {
243 return err
244 }
245 return nil
Serge Bazanskia9580a72023-01-12 14:44:35 +0100246 })
Serge Bazanski20312b42023-04-19 13:49:47 +0200247 if err != nil {
248 t.Errorf("Failed to retrieve machines for agent start: %v", err)
249 }
250 if want, got := count, len(machines); want != got {
251 t.Errorf("Expected %d machines, got %d", want, got)
252 }
Serge Bazanskia9580a72023-01-12 14:44:35 +0100253 }
254
Serge Bazanski20312b42023-04-19 13:49:47 +0200255 // The machine shouldn't be returned now.
256 expect(0)
257
258 // Wait for the backoff to expire.
259 time.Sleep(1100 * time.Millisecond)
Serge Bazanskia9580a72023-01-12 14:44:35 +0100260
261 // The machine should now be returned again.
Serge Bazanski20312b42023-04-19 13:49:47 +0200262 expect(1)
263
264 // Prepare helper for checking exponential backoffs.
265 failAndCheck := func(nsec int64) {
266 t.Helper()
267 work := waitMachine(nsec)
268 if err := work.Fail(ctx, &backoff, "test"); err != nil {
269 t.Fatalf("Failing work failed: %v", err)
270 }
271
272 var backoffs []model.WorkBackoff
273 err = session.Transact(ctx, func(q *model.Queries) error {
274 var err error
275 backoffs, err = q.WorkBackoffOf(ctx, model.WorkBackoffOfParams{
276 MachineID: machine.MachineID,
277 Process: model.ProcessShepherdAccess,
278 })
Serge Bazanskia9580a72023-01-12 14:44:35 +0100279 return err
Serge Bazanski20312b42023-04-19 13:49:47 +0200280 })
281 if err != nil {
282 t.Errorf("Failed to retrieve machines for agent start: %v", err)
Serge Bazanskia9580a72023-01-12 14:44:35 +0100283 }
Serge Bazanski20312b42023-04-19 13:49:47 +0200284 if len(backoffs) < 1 {
285 t.Errorf("No backoff")
286 } else {
287 backoff := backoffs[0]
288 if want, got := nsec, backoff.LastIntervalSeconds.Int64; want != got {
289 t.Fatalf("Wanted backoff of %d seconds, got %d", want, got)
290 }
Serge Bazanskia9580a72023-01-12 14:44:35 +0100291 }
Serge Bazanski20312b42023-04-19 13:49:47 +0200292 }
293
294 // Exercise exponential backoff functionality.
295 failAndCheck(2)
296 failAndCheck(4)
297 failAndCheck(5)
298 failAndCheck(5)
299
300 // If the job now succeeds, subsequent failures should start from 1 again.
301 work = waitMachine(5)
302 err = work.Finish(ctx, func(q *model.Queries) error {
303 // Not setting any tags that would cause subsequent queries to not return the
304 // machine anymore.
Serge Bazanskia9580a72023-01-12 14:44:35 +0100305 return nil
306 })
307 if err != nil {
Serge Bazanski20312b42023-04-19 13:49:47 +0200308 t.Fatalf("Could not finish work: %v", err)
Serge Bazanskia9580a72023-01-12 14:44:35 +0100309 }
Serge Bazanski20312b42023-04-19 13:49:47 +0200310
311 failAndCheck(1)
312 failAndCheck(2)
Serge Bazanskia9580a72023-01-12 14:44:35 +0100313}
314
Serge Bazanski10383132023-02-20 15:39:45 +0100315// TestAgentStartWorkflow exercises the agent start workflow within the BMDB.
316func TestAgentStartWorkflow(t *testing.T) {
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100317 b := dut()
318 conn, err := b.Open(true)
319 if err != nil {
320 t.Fatalf("Open failed: %v", err)
321 }
322
323 ctx, ctxC := context.WithCancel(context.Background())
324 defer ctxC()
325
326 session, err := conn.StartSession(ctx)
327 if err != nil {
328 t.Fatalf("Starting session failed: %v", err)
329 }
330
331 // Create machine. Drop its ID.
332 err = session.Transact(ctx, func(q *model.Queries) error {
333 machine, err := q.NewMachine(ctx)
334 if err != nil {
335 return err
336 }
337 return q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
338 MachineID: machine.MachineID,
339 Provider: model.ProviderEquinix,
340 ProviderID: "123",
341 })
342 })
343 if err != nil {
344 t.Fatalf("Creating machine failed: %v", err)
345 }
346
347 // Start working on a machine.
Serge Bazanskia9580a72023-01-12 14:44:35 +0100348 var machine uuid.UUID
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100349 startedC := make(chan struct{})
350 doneC := make(chan struct{})
351 errC := make(chan error)
352 go func() {
Serge Bazanski10383132023-02-20 15:39:45 +0100353 work, err := session.Work(ctx, model.ProcessShepherdAccess, func(q *model.Queries) ([]uuid.UUID, error) {
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100354 machines, err := q.GetMachinesForAgentStart(ctx, 1)
355 if err != nil {
356 return nil, err
357 }
358 if len(machines) < 1 {
359 return nil, ErrNothingToDo
360 }
Serge Bazanskia9580a72023-01-12 14:44:35 +0100361 machine = machines[0].MachineID
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100362 return []uuid.UUID{machines[0].MachineID}, nil
363 })
364 defer work.Cancel(ctx)
365
366 if err != nil {
367 close(startedC)
368 errC <- err
369 return
370 }
371
372 // Simulate work by blocking on a channel.
373 close(startedC)
Serge Bazanskia9580a72023-01-12 14:44:35 +0100374
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100375 <-doneC
376
377 err = work.Finish(ctx, func(q *model.Queries) error {
378 return q.MachineSetAgentStarted(ctx, model.MachineSetAgentStartedParams{
379 MachineID: work.Machine,
380 AgentStartedAt: time.Now(),
381 AgentPublicKey: []byte("fakefakefake"),
382 })
383 })
384 errC <- err
385 }()
386 <-startedC
387 // Work on the machine has started. Attempting to get more machines now should
388 // return no machines.
389 err = session.Transact(ctx, func(q *model.Queries) error {
390 machines, err := q.GetMachinesForAgentStart(ctx, 1)
391 if err != nil {
392 return err
393 }
394 if len(machines) > 0 {
Serge Bazanski10383132023-02-20 15:39:45 +0100395 t.Errorf("Expected no machines ready for agent start.")
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100396 }
397 return nil
398 })
399 if err != nil {
Serge Bazanski10383132023-02-20 15:39:45 +0100400 t.Fatalf("Failed to retrieve machines for start in parallel: %v", err)
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100401 }
402 // Finish working on machine.
403 close(doneC)
404 err = <-errC
405 if err != nil {
Serge Bazanskia9580a72023-01-12 14:44:35 +0100406 t.Fatalf("Failed to finish work on machine: %v", err)
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100407 }
Serge Bazanski10383132023-02-20 15:39:45 +0100408 // That machine has its agent started, so we still expect no work to have to be
409 // done.
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100410 err = session.Transact(ctx, func(q *model.Queries) error {
411 machines, err := q.GetMachinesForAgentStart(ctx, 1)
412 if err != nil {
413 return err
414 }
415 if len(machines) > 0 {
Serge Bazanski10383132023-02-20 15:39:45 +0100416 t.Errorf("Expected still no machines ready for agent start.")
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100417 }
418 return nil
419 })
420 if err != nil {
Serge Bazanski10383132023-02-20 15:39:45 +0100421 t.Fatalf("Failed to retrieve machines for agent start after work finished: %v", err)
Serge Bazanskia9580a72023-01-12 14:44:35 +0100422 }
423
424 // Check history has been recorded.
425 var history []model.WorkHistory
426 err = session.Transact(ctx, func(q *model.Queries) error {
427 history, err = q.ListHistoryOf(ctx, machine)
428 return err
429 })
430 if err != nil {
431 t.Fatalf("Failed to retrieve machine history: %v", err)
432 }
433 // Expect two history items: started and finished.
434 if want, got := 2, len(history); want != got {
435 t.Errorf("Wanted %d history items, got %d", want, got)
436 } else {
437 if want, got := model.WorkHistoryEventStarted, history[0].Event; want != got {
438 t.Errorf("Wanted first history event to be %s, got %s", want, got)
439 }
440 if want, got := model.WorkHistoryEventFinished, history[1].Event; want != got {
441 t.Errorf("Wanted second history event to be %s, got %s", want, got)
442 }
443 }
444 // Check all other history event data.
445 for i, el := range history {
446 if want, got := machine, el.MachineID; want.String() != got.String() {
447 t.Errorf("Wanted %d history event machine ID to be %s, got %s", i, want, got)
448 }
Serge Bazanski10383132023-02-20 15:39:45 +0100449 if want, got := model.ProcessShepherdAccess, el.Process; want != got {
Serge Bazanskia9580a72023-01-12 14:44:35 +0100450 t.Errorf("Wanted %d history event process to be %s, got %s", i, want, got)
451 }
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100452 }
453}
454
Serge Bazanski10383132023-02-20 15:39:45 +0100455// TestAgentStartWorkflowParallel starts work on three machines by six workers
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100456// and makes sure that there are no scheduling conflicts between them.
Serge Bazanski10383132023-02-20 15:39:45 +0100457func TestAgentStartWorkflowParallel(t *testing.T) {
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100458 b := dut()
459 conn, err := b.Open(true)
460 if err != nil {
461 t.Fatalf("Open failed: %v", err)
462 }
463
464 ctx, ctxC := context.WithCancel(context.Background())
465 defer ctxC()
466
467 makeMachine := func(providerID string) {
468 ctxS, ctxC := context.WithCancel(ctx)
469 defer ctxC()
470 session, err := conn.StartSession(ctxS)
471 if err != nil {
472 t.Fatalf("Starting session failed: %v", err)
473 }
474 err = session.Transact(ctx, func(q *model.Queries) error {
475 machine, err := q.NewMachine(ctx)
476 if err != nil {
477 return err
478 }
479 return q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
480 MachineID: machine.MachineID,
481 Provider: model.ProviderEquinix,
482 ProviderID: providerID,
483 })
484 })
485 if err != nil {
486 t.Fatalf("Creating machine failed: %v", err)
487 }
488 }
489 // Make six machines for testing.
490 for i := 0; i < 6; i++ {
491 makeMachine(fmt.Sprintf("test%d", i))
492 }
493
494 workStarted := make(chan struct{})
495 workDone := make(chan struct {
496 machine uuid.UUID
497 workerID int
498 })
499
500 workOnce := func(ctx context.Context, workerID int, session *Session) error {
Serge Bazanski10383132023-02-20 15:39:45 +0100501 work, err := session.Work(ctx, model.ProcessShepherdAccess, func(q *model.Queries) ([]uuid.UUID, error) {
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100502 machines, err := q.GetMachinesForAgentStart(ctx, 1)
503 if err != nil {
504 return nil, err
505 }
506 if len(machines) < 1 {
507 return nil, ErrNothingToDo
508 }
509 return []uuid.UUID{machines[0].MachineID}, nil
510 })
511
512 if err != nil {
513 return err
514 }
515 defer work.Cancel(ctx)
516
517 select {
518 case <-workStarted:
519 case <-ctx.Done():
520 return ctx.Err()
521 }
522
523 select {
524 case workDone <- struct {
525 machine uuid.UUID
526 workerID int
527 }{
528 machine: work.Machine,
529 workerID: workerID,
530 }:
531 case <-ctx.Done():
532 return ctx.Err()
533 }
534
535 return work.Finish(ctx, func(q *model.Queries) error {
536 return q.MachineSetAgentStarted(ctx, model.MachineSetAgentStartedParams{
537 MachineID: work.Machine,
538 AgentStartedAt: time.Now(),
539 AgentPublicKey: []byte("fakefakefake"),
540 })
541 })
542 }
543
544 worker := func(workerID int) {
545 ctxS, ctxC := context.WithCancel(ctx)
546 defer ctxC()
547 session, err := conn.StartSession(ctxS)
548 if err != nil {
549 t.Fatalf("Starting session failed: %v", err)
550 }
551 for {
552 err := workOnce(ctxS, workerID, session)
553 if err != nil {
Serge Bazanskice4af2b2023-03-16 21:23:39 +0100554 if errors.Is(err, ErrNothingToDo) {
555 continue
556 }
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100557 if errors.Is(err, ctxS.Err()) {
558 return
559 }
560 t.Fatalf("worker failed: %v", err)
561 }
562 }
563 }
564 // Start three workers.
565 for i := 0; i < 3; i++ {
566 go worker(i)
567 }
568
569 // Wait for at least three workers to be alive.
570 for i := 0; i < 3; i++ {
571 workStarted <- struct{}{}
572 }
573
574 // Allow all workers to continue running from now on.
575 close(workStarted)
576
577 // Expect six machines to have been handled in parallel by three workers.
578 seenWorkers := make(map[int]bool)
579 seenMachines := make(map[string]bool)
580 for i := 0; i < 6; i++ {
581 res := <-workDone
582 seenWorkers[res.workerID] = true
583 seenMachines[res.machine.String()] = true
584 }
585
586 if want, got := 3, len(seenWorkers); want != got {
587 t.Errorf("Expected %d workers, got %d", want, got)
588 }
589 if want, got := 6, len(seenMachines); want != got {
590 t.Errorf("Expected %d machines, got %d", want, got)
591 }
592}