blob: d3c80bd00126ace5f3aed6a9f4d29aa018bd0b7f [file] [log] [blame]
Serge Bazanski35e8d792022-10-11 11:32:30 +02001package bmdb
2
3import (
4 "context"
5 "errors"
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +01006 "fmt"
Serge Bazanski35e8d792022-10-11 11:32:30 +02007 "testing"
8 "time"
9
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +010010 "github.com/google/uuid"
11
Serge Bazanski35e8d792022-10-11 11:32:30 +020012 "source.monogon.dev/cloud/bmaas/bmdb/model"
13 "source.monogon.dev/cloud/lib/component"
14)
15
16func dut() *BMDB {
17 return &BMDB{
18 Config: Config{
19 Database: component.CockroachConfig{
20 InMemory: true,
21 },
22 },
23 }
24}
25
26// TestSessionExpiry exercises the session heartbeat logic, making sure that if
27// a session stops being maintained subsequent Transact calls will fail.
28func TestSessionExpiry(t *testing.T) {
29 b := dut()
30 conn, err := b.Open(true)
31 if err != nil {
32 t.Fatalf("Open failed: %v", err)
33 }
34
35 ctx, ctxC := context.WithCancel(context.Background())
36 defer ctxC()
37
38 session, err := conn.StartSession(ctx)
39 if err != nil {
40 t.Fatalf("Starting session failed: %v", err)
41 }
42
43 // A transaction in a brand-new session should work.
44 var machine model.Machine
45 err = session.Transact(ctx, func(q *model.Queries) error {
46 machine, err = q.NewMachine(ctx)
47 return err
48 })
49 if err != nil {
50 t.Fatalf("First transaction failed: %v", err)
51 }
52
53 time.Sleep(6 * time.Second)
54
55 // A transaction after the 5-second session interval should continue to work.
56 err = session.Transact(ctx, func(q *model.Queries) error {
57 _, err = q.NewMachine(ctx)
58 return err
59 })
60 if err != nil {
61 t.Fatalf("Second transaction failed: %v", err)
62 }
63
64 // A transaction after the 5-second session interval should fail if we don't
65 // maintain its heartbeat.
66 session.ctxC()
67 time.Sleep(6 * time.Second)
68
69 err = session.Transact(ctx, func(q *model.Queries) error {
70 return q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
71 MachineID: machine.MachineID,
72 Provider: "foo",
73 ProviderID: "bar",
74 })
75 })
76 if !errors.Is(err, ErrSessionExpired) {
77 t.Fatalf("Second transaction should've failed due to expired session, got %v", err)
78 }
79
80}
81
82// TestWork exercises the per-{process,machine} mutual exclusion mechanism of
83// Work items.
84func TestWork(t *testing.T) {
85 b := dut()
86 conn, err := b.Open(true)
87 if err != nil {
88 t.Fatalf("Open failed: %v", err)
89 }
90
91 ctx, ctxC := context.WithCancel(context.Background())
92 defer ctxC()
93
94 // Start two session for testing.
95 session1, err := conn.StartSession(ctx)
96 if err != nil {
97 t.Fatalf("Starting session failed: %v", err)
98 }
99 session2, err := conn.StartSession(ctx)
100 if err != nil {
101 t.Fatalf("Starting session failed: %v", err)
102 }
103
104 var machine model.Machine
105 err = session1.Transact(ctx, func(q *model.Queries) error {
106 machine, err = q.NewMachine(ctx)
107 return err
108 })
109 if err != nil {
110 t.Fatalf("Creating machine failed: %v", err)
111 }
112
113 // Create a subcontext for a long-term work request. We'll cancel it later as
114 // part of the test.
115 ctxB, ctxBC := context.WithCancel(ctx)
116 defer ctxBC()
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100117
118 constantRetriever := func(_ *model.Queries) ([]uuid.UUID, error) {
119 return []uuid.UUID{machine.MachineID}, nil
120 }
121
122 // Start work on machine which we're not gonna finish for a while.
123 work1, err := session1.Work(ctxB, model.ProcessUnitTest1, constantRetriever)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200124 if err != nil {
125 t.Fatalf("Starting first work failed: %v", err)
126 }
127
128 // Starting more work on the same machine but a different process should still
129 // be allowed.
130 for _, session := range []*Session{session1, session2} {
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100131 work2, err := session.Work(ctxB, model.ProcessUnitTest2, constantRetriever)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200132 if err != nil {
133 t.Errorf("Could not run concurrent process on machine: %v", err)
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100134 } else {
135 work2.Cancel(ctxB)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200136 }
137 }
138
139 // However, starting work with the same process on the same machine should
140 // fail.
141 for _, session := range []*Session{session1, session2} {
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100142 work2, err := session.Work(ctxB, model.ProcessUnitTest1, constantRetriever)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200143 if !errors.Is(err, ErrWorkConflict) {
144 t.Errorf("Concurrent work with same process should've been forbidden, got %v", err)
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100145 work2.Cancel(ctxB)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200146 }
147 }
148
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100149 // Now, finish the long-running work.
150 work1.Cancel(ctx)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200151
152 // We should now be able to perform 'test1' work again against this machine.
153 for _, session := range []*Session{session1, session2} {
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100154 work1, err := session.Work(ctxB, model.ProcessUnitTest1, constantRetriever)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200155 if err != nil {
156 t.Errorf("Could not run work against machine: %v", err)
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100157 } else {
158 work1.Cancel(ctxB)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200159 }
160 }
161}
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100162
Serge Bazanski20312b42023-04-19 13:49:47 +0200163// TestWorkBackoff exercises the backoff functionality within the BMDB.
Serge Bazanskia9580a72023-01-12 14:44:35 +0100164func TestWorkBackoff(t *testing.T) {
165 b := dut()
166 conn, err := b.Open(true)
167 if err != nil {
168 t.Fatalf("Open failed: %v", err)
169 }
170
171 ctx, ctxC := context.WithCancel(context.Background())
172 defer ctxC()
173
174 session, err := conn.StartSession(ctx)
175 if err != nil {
176 t.Fatalf("Starting session failed: %v", err)
177 }
178
179 var machine model.Machine
180 // Create machine.
181 err = session.Transact(ctx, func(q *model.Queries) error {
182 machine, err = q.NewMachine(ctx)
183 if err != nil {
184 return err
185 }
186 return q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
187 MachineID: machine.MachineID,
188 Provider: model.ProviderEquinix,
189 ProviderID: "123",
190 })
191 })
192 if err != nil {
193 t.Fatalf("Creating machine failed: %v", err)
194 }
195
Serge Bazanski20312b42023-04-19 13:49:47 +0200196 waitMachine := func(nsec int64) *Work {
197 t.Helper()
198
199 deadline := time.Now().Add(time.Duration(nsec) * 2 * time.Second)
200 for {
201 if time.Now().After(deadline) {
202 t.Fatalf("Deadline expired")
203 }
Serge Bazanski00cf57d2023-04-20 11:19:00 +0200204 work, err := session.Work(ctx, model.ProcessShepherdAgentStart, func(q *model.Queries) ([]uuid.UUID, error) {
Tim Windelschmidt0e749612023-08-07 17:42:59 +0000205 machines, err := q.GetMachinesForAgentStart(ctx, model.GetMachinesForAgentStartParams{
206 Limit: 1,
207 Provider: model.ProviderEquinix,
208 })
Serge Bazanski20312b42023-04-19 13:49:47 +0200209 if err != nil {
210 return nil, err
211 }
212 if len(machines) < 1 {
213 return nil, ErrNothingToDo
214 }
215 return []uuid.UUID{machines[0].MachineID}, nil
216 })
217 if err == nil {
218 return work
219 }
220 if !errors.Is(err, ErrNothingToDo) {
221 t.Fatalf("Unexpected work error: %v", err)
222 }
223 time.Sleep(100 * time.Millisecond)
Serge Bazanskia9580a72023-01-12 14:44:35 +0100224 }
Serge Bazanskia9580a72023-01-12 14:44:35 +0100225 }
Serge Bazanski20312b42023-04-19 13:49:47 +0200226
227 // Work on machine, but fail it with a backoff.
228 work := waitMachine(1)
229 backoff := Backoff{
230 Initial: time.Second,
231 Maximum: 5 * time.Second,
232 Exponent: 2,
233 }
Serge Bazanskia9580a72023-01-12 14:44:35 +0100234 if err := work.Fail(ctx, &backoff, "test"); err != nil {
235 t.Fatalf("Failing work failed: %v", err)
236 }
237
Serge Bazanski20312b42023-04-19 13:49:47 +0200238 expect := func(count int) {
239 t.Helper()
Serge Bazanskia9580a72023-01-12 14:44:35 +0100240
Serge Bazanski20312b42023-04-19 13:49:47 +0200241 var machines []model.MachineProvided
242 var err error
243 err = session.Transact(ctx, func(q *model.Queries) error {
Tim Windelschmidt0e749612023-08-07 17:42:59 +0000244 machines, err = q.GetMachinesForAgentStart(ctx, model.GetMachinesForAgentStartParams{
245 Limit: 1,
246 Provider: model.ProviderEquinix,
247 })
Serge Bazanski20312b42023-04-19 13:49:47 +0200248 if err != nil {
249 return err
250 }
251 return nil
Serge Bazanskia9580a72023-01-12 14:44:35 +0100252 })
Serge Bazanski20312b42023-04-19 13:49:47 +0200253 if err != nil {
254 t.Errorf("Failed to retrieve machines for agent start: %v", err)
255 }
256 if want, got := count, len(machines); want != got {
257 t.Errorf("Expected %d machines, got %d", want, got)
258 }
Serge Bazanskia9580a72023-01-12 14:44:35 +0100259 }
260
Serge Bazanski20312b42023-04-19 13:49:47 +0200261 // The machine shouldn't be returned now.
262 expect(0)
263
264 // Wait for the backoff to expire.
265 time.Sleep(1100 * time.Millisecond)
Serge Bazanskia9580a72023-01-12 14:44:35 +0100266
267 // The machine should now be returned again.
Serge Bazanski20312b42023-04-19 13:49:47 +0200268 expect(1)
269
270 // Prepare helper for checking exponential backoffs.
271 failAndCheck := func(nsec int64) {
272 t.Helper()
273 work := waitMachine(nsec)
274 if err := work.Fail(ctx, &backoff, "test"); err != nil {
275 t.Fatalf("Failing work failed: %v", err)
276 }
277
278 var backoffs []model.WorkBackoff
279 err = session.Transact(ctx, func(q *model.Queries) error {
280 var err error
281 backoffs, err = q.WorkBackoffOf(ctx, model.WorkBackoffOfParams{
282 MachineID: machine.MachineID,
Serge Bazanski00cf57d2023-04-20 11:19:00 +0200283 Process: model.ProcessShepherdAgentStart,
Serge Bazanski20312b42023-04-19 13:49:47 +0200284 })
Serge Bazanskia9580a72023-01-12 14:44:35 +0100285 return err
Serge Bazanski20312b42023-04-19 13:49:47 +0200286 })
287 if err != nil {
288 t.Errorf("Failed to retrieve machines for agent start: %v", err)
Serge Bazanskia9580a72023-01-12 14:44:35 +0100289 }
Serge Bazanski20312b42023-04-19 13:49:47 +0200290 if len(backoffs) < 1 {
291 t.Errorf("No backoff")
292 } else {
293 backoff := backoffs[0]
294 if want, got := nsec, backoff.LastIntervalSeconds.Int64; want != got {
295 t.Fatalf("Wanted backoff of %d seconds, got %d", want, got)
296 }
Serge Bazanskia9580a72023-01-12 14:44:35 +0100297 }
Serge Bazanski20312b42023-04-19 13:49:47 +0200298 }
299
300 // Exercise exponential backoff functionality.
301 failAndCheck(2)
302 failAndCheck(4)
303 failAndCheck(5)
304 failAndCheck(5)
305
306 // If the job now succeeds, subsequent failures should start from 1 again.
307 work = waitMachine(5)
308 err = work.Finish(ctx, func(q *model.Queries) error {
309 // Not setting any tags that would cause subsequent queries to not return the
310 // machine anymore.
Serge Bazanskia9580a72023-01-12 14:44:35 +0100311 return nil
312 })
313 if err != nil {
Serge Bazanski20312b42023-04-19 13:49:47 +0200314 t.Fatalf("Could not finish work: %v", err)
Serge Bazanskia9580a72023-01-12 14:44:35 +0100315 }
Serge Bazanski20312b42023-04-19 13:49:47 +0200316
317 failAndCheck(1)
318 failAndCheck(2)
Serge Bazanskia9580a72023-01-12 14:44:35 +0100319}
320
Serge Bazanski10383132023-02-20 15:39:45 +0100321// TestAgentStartWorkflow exercises the agent start workflow within the BMDB.
322func TestAgentStartWorkflow(t *testing.T) {
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100323 b := dut()
324 conn, err := b.Open(true)
325 if err != nil {
326 t.Fatalf("Open failed: %v", err)
327 }
328
329 ctx, ctxC := context.WithCancel(context.Background())
330 defer ctxC()
331
332 session, err := conn.StartSession(ctx)
333 if err != nil {
334 t.Fatalf("Starting session failed: %v", err)
335 }
336
337 // Create machine. Drop its ID.
338 err = session.Transact(ctx, func(q *model.Queries) error {
339 machine, err := q.NewMachine(ctx)
340 if err != nil {
341 return err
342 }
343 return q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
344 MachineID: machine.MachineID,
345 Provider: model.ProviderEquinix,
346 ProviderID: "123",
347 })
348 })
349 if err != nil {
350 t.Fatalf("Creating machine failed: %v", err)
351 }
352
353 // Start working on a machine.
Serge Bazanskia9580a72023-01-12 14:44:35 +0100354 var machine uuid.UUID
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100355 startedC := make(chan struct{})
356 doneC := make(chan struct{})
357 errC := make(chan error)
358 go func() {
Serge Bazanski00cf57d2023-04-20 11:19:00 +0200359 work, err := session.Work(ctx, model.ProcessShepherdAgentStart, func(q *model.Queries) ([]uuid.UUID, error) {
Tim Windelschmidt0e749612023-08-07 17:42:59 +0000360 machines, err := q.GetMachinesForAgentStart(ctx, model.GetMachinesForAgentStartParams{
361 Limit: 1,
362 Provider: model.ProviderEquinix,
363 })
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100364 if err != nil {
365 return nil, err
366 }
367 if len(machines) < 1 {
368 return nil, ErrNothingToDo
369 }
Serge Bazanskia9580a72023-01-12 14:44:35 +0100370 machine = machines[0].MachineID
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100371 return []uuid.UUID{machines[0].MachineID}, nil
372 })
373 defer work.Cancel(ctx)
374
375 if err != nil {
376 close(startedC)
377 errC <- err
378 return
379 }
380
381 // Simulate work by blocking on a channel.
382 close(startedC)
Serge Bazanskia9580a72023-01-12 14:44:35 +0100383
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100384 <-doneC
385
386 err = work.Finish(ctx, func(q *model.Queries) error {
387 return q.MachineSetAgentStarted(ctx, model.MachineSetAgentStartedParams{
388 MachineID: work.Machine,
389 AgentStartedAt: time.Now(),
390 AgentPublicKey: []byte("fakefakefake"),
391 })
392 })
393 errC <- err
394 }()
395 <-startedC
Serge Bazanski00cf57d2023-04-20 11:19:00 +0200396
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100397 // Work on the machine has started. Attempting to get more machines now should
398 // return no machines.
Serge Bazanski00cf57d2023-04-20 11:19:00 +0200399
400 // Mutual exclusion with AgentStart:
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100401 err = session.Transact(ctx, func(q *model.Queries) error {
Tim Windelschmidt0e749612023-08-07 17:42:59 +0000402 machines, err := q.GetMachinesForAgentStart(ctx, model.GetMachinesForAgentStartParams{
403 Limit: 1,
404 Provider: model.ProviderEquinix,
405 })
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100406 if err != nil {
407 return err
408 }
409 if len(machines) > 0 {
Serge Bazanski10383132023-02-20 15:39:45 +0100410 t.Errorf("Expected no machines ready for agent start.")
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100411 }
412 return nil
413 })
414 if err != nil {
Serge Bazanski10383132023-02-20 15:39:45 +0100415 t.Fatalf("Failed to retrieve machines for start in parallel: %v", err)
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100416 }
Serge Bazanski00cf57d2023-04-20 11:19:00 +0200417
418 // Mutual exclusion with Recovery:
419 err = session.Transact(ctx, func(q *model.Queries) error {
Tim Windelschmidt0e749612023-08-07 17:42:59 +0000420 machines, err := q.GetMachineForAgentRecovery(ctx, model.GetMachineForAgentRecoveryParams{
421 Limit: 1,
422 Provider: model.ProviderEquinix,
423 })
Serge Bazanski00cf57d2023-04-20 11:19:00 +0200424 if err != nil {
425 return err
426 }
427 if len(machines) > 0 {
428 t.Errorf("Expected no machines ready for agent recovery.")
429 }
430 return nil
431 })
432 if err != nil {
433 t.Fatalf("Failed to retrieve machines for recovery in parallel: %v", err)
434 }
435
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100436 // Finish working on machine.
437 close(doneC)
438 err = <-errC
439 if err != nil {
Serge Bazanskia9580a72023-01-12 14:44:35 +0100440 t.Fatalf("Failed to finish work on machine: %v", err)
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100441 }
Serge Bazanski10383132023-02-20 15:39:45 +0100442 // That machine has its agent started, so we still expect no work to have to be
443 // done.
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100444 err = session.Transact(ctx, func(q *model.Queries) error {
Tim Windelschmidt0e749612023-08-07 17:42:59 +0000445 machines, err := q.GetMachinesForAgentStart(ctx, model.GetMachinesForAgentStartParams{
446 Limit: 1,
447 Provider: model.ProviderEquinix,
448 })
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100449 if err != nil {
450 return err
451 }
452 if len(machines) > 0 {
Serge Bazanski10383132023-02-20 15:39:45 +0100453 t.Errorf("Expected still no machines ready for agent start.")
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100454 }
455 return nil
456 })
457 if err != nil {
Serge Bazanski10383132023-02-20 15:39:45 +0100458 t.Fatalf("Failed to retrieve machines for agent start after work finished: %v", err)
Serge Bazanskia9580a72023-01-12 14:44:35 +0100459 }
460
461 // Check history has been recorded.
462 var history []model.WorkHistory
463 err = session.Transact(ctx, func(q *model.Queries) error {
464 history, err = q.ListHistoryOf(ctx, machine)
465 return err
466 })
467 if err != nil {
468 t.Fatalf("Failed to retrieve machine history: %v", err)
469 }
470 // Expect two history items: started and finished.
471 if want, got := 2, len(history); want != got {
472 t.Errorf("Wanted %d history items, got %d", want, got)
473 } else {
474 if want, got := model.WorkHistoryEventStarted, history[0].Event; want != got {
475 t.Errorf("Wanted first history event to be %s, got %s", want, got)
476 }
477 if want, got := model.WorkHistoryEventFinished, history[1].Event; want != got {
478 t.Errorf("Wanted second history event to be %s, got %s", want, got)
479 }
480 }
481 // Check all other history event data.
482 for i, el := range history {
483 if want, got := machine, el.MachineID; want.String() != got.String() {
484 t.Errorf("Wanted %d history event machine ID to be %s, got %s", i, want, got)
485 }
Serge Bazanski00cf57d2023-04-20 11:19:00 +0200486 if want, got := model.ProcessShepherdAgentStart, el.Process; want != got {
Serge Bazanskia9580a72023-01-12 14:44:35 +0100487 t.Errorf("Wanted %d history event process to be %s, got %s", i, want, got)
488 }
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100489 }
490}
491
Serge Bazanski10383132023-02-20 15:39:45 +0100492// TestAgentStartWorkflowParallel starts work on three machines by six workers
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100493// and makes sure that there are no scheduling conflicts between them.
Serge Bazanski10383132023-02-20 15:39:45 +0100494func TestAgentStartWorkflowParallel(t *testing.T) {
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100495 b := dut()
496 conn, err := b.Open(true)
497 if err != nil {
498 t.Fatalf("Open failed: %v", err)
499 }
500
501 ctx, ctxC := context.WithCancel(context.Background())
502 defer ctxC()
503
504 makeMachine := func(providerID string) {
505 ctxS, ctxC := context.WithCancel(ctx)
506 defer ctxC()
507 session, err := conn.StartSession(ctxS)
508 if err != nil {
509 t.Fatalf("Starting session failed: %v", err)
510 }
511 err = session.Transact(ctx, func(q *model.Queries) error {
512 machine, err := q.NewMachine(ctx)
513 if err != nil {
514 return err
515 }
516 return q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
517 MachineID: machine.MachineID,
518 Provider: model.ProviderEquinix,
519 ProviderID: providerID,
520 })
521 })
522 if err != nil {
523 t.Fatalf("Creating machine failed: %v", err)
524 }
525 }
526 // Make six machines for testing.
527 for i := 0; i < 6; i++ {
528 makeMachine(fmt.Sprintf("test%d", i))
529 }
530
531 workStarted := make(chan struct{})
532 workDone := make(chan struct {
533 machine uuid.UUID
534 workerID int
535 })
536
537 workOnce := func(ctx context.Context, workerID int, session *Session) error {
Serge Bazanski00cf57d2023-04-20 11:19:00 +0200538 work, err := session.Work(ctx, model.ProcessShepherdAgentStart, func(q *model.Queries) ([]uuid.UUID, error) {
Tim Windelschmidt0e749612023-08-07 17:42:59 +0000539 machines, err := q.GetMachinesForAgentStart(ctx, model.GetMachinesForAgentStartParams{
540 Limit: 1,
541 Provider: model.ProviderEquinix,
542 })
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100543 if err != nil {
544 return nil, err
545 }
546 if len(machines) < 1 {
547 return nil, ErrNothingToDo
548 }
549 return []uuid.UUID{machines[0].MachineID}, nil
550 })
551
552 if err != nil {
553 return err
554 }
555 defer work.Cancel(ctx)
556
557 select {
558 case <-workStarted:
559 case <-ctx.Done():
560 return ctx.Err()
561 }
562
563 select {
564 case workDone <- struct {
565 machine uuid.UUID
566 workerID int
567 }{
568 machine: work.Machine,
569 workerID: workerID,
570 }:
571 case <-ctx.Done():
572 return ctx.Err()
573 }
574
575 return work.Finish(ctx, func(q *model.Queries) error {
576 return q.MachineSetAgentStarted(ctx, model.MachineSetAgentStartedParams{
577 MachineID: work.Machine,
578 AgentStartedAt: time.Now(),
579 AgentPublicKey: []byte("fakefakefake"),
580 })
581 })
582 }
583
584 worker := func(workerID int) {
585 ctxS, ctxC := context.WithCancel(ctx)
586 defer ctxC()
587 session, err := conn.StartSession(ctxS)
588 if err != nil {
589 t.Fatalf("Starting session failed: %v", err)
590 }
591 for {
592 err := workOnce(ctxS, workerID, session)
593 if err != nil {
Serge Bazanskice4af2b2023-03-16 21:23:39 +0100594 if errors.Is(err, ErrNothingToDo) {
595 continue
596 }
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100597 if errors.Is(err, ctxS.Err()) {
598 return
599 }
600 t.Fatalf("worker failed: %v", err)
601 }
602 }
603 }
604 // Start three workers.
605 for i := 0; i < 3; i++ {
606 go worker(i)
607 }
608
609 // Wait for at least three workers to be alive.
610 for i := 0; i < 3; i++ {
611 workStarted <- struct{}{}
612 }
613
614 // Allow all workers to continue running from now on.
615 close(workStarted)
616
617 // Expect six machines to have been handled in parallel by three workers.
618 seenWorkers := make(map[int]bool)
619 seenMachines := make(map[string]bool)
620 for i := 0; i < 6; i++ {
621 res := <-workDone
622 seenWorkers[res.workerID] = true
623 seenMachines[res.machine.String()] = true
624 }
625
626 if want, got := 3, len(seenWorkers); want != got {
627 t.Errorf("Expected %d workers, got %d", want, got)
628 }
629 if want, got := 6, len(seenMachines); want != got {
630 t.Errorf("Expected %d machines, got %d", want, got)
631 }
632}