blob: b8625a9ba74cafd4a4fa1d10fb42c1df59276306 [file] [log] [blame]
Serge Bazanski35e8d792022-10-11 11:32:30 +02001package bmdb
2
3import (
4 "context"
5 "errors"
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +01006 "fmt"
Serge Bazanski35e8d792022-10-11 11:32:30 +02007 "testing"
8 "time"
9
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +010010 "github.com/google/uuid"
11
Serge Bazanski35e8d792022-10-11 11:32:30 +020012 "source.monogon.dev/cloud/bmaas/bmdb/model"
13 "source.monogon.dev/cloud/lib/component"
14)
15
16func dut() *BMDB {
17 return &BMDB{
18 Config: Config{
19 Database: component.CockroachConfig{
20 InMemory: true,
21 },
22 },
23 }
24}
25
26// TestSessionExpiry exercises the session heartbeat logic, making sure that if
27// a session stops being maintained subsequent Transact calls will fail.
28func TestSessionExpiry(t *testing.T) {
29 b := dut()
30 conn, err := b.Open(true)
31 if err != nil {
32 t.Fatalf("Open failed: %v", err)
33 }
34
35 ctx, ctxC := context.WithCancel(context.Background())
36 defer ctxC()
37
38 session, err := conn.StartSession(ctx)
39 if err != nil {
40 t.Fatalf("Starting session failed: %v", err)
41 }
42
43 // A transaction in a brand-new session should work.
44 var machine model.Machine
45 err = session.Transact(ctx, func(q *model.Queries) error {
46 machine, err = q.NewMachine(ctx)
47 return err
48 })
49 if err != nil {
50 t.Fatalf("First transaction failed: %v", err)
51 }
52
53 time.Sleep(6 * time.Second)
54
55 // A transaction after the 5-second session interval should continue to work.
56 err = session.Transact(ctx, func(q *model.Queries) error {
57 _, err = q.NewMachine(ctx)
58 return err
59 })
60 if err != nil {
61 t.Fatalf("Second transaction failed: %v", err)
62 }
63
64 // A transaction after the 5-second session interval should fail if we don't
65 // maintain its heartbeat.
66 session.ctxC()
67 time.Sleep(6 * time.Second)
68
69 err = session.Transact(ctx, func(q *model.Queries) error {
70 return q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
71 MachineID: machine.MachineID,
72 Provider: "foo",
73 ProviderID: "bar",
74 })
75 })
76 if !errors.Is(err, ErrSessionExpired) {
77 t.Fatalf("Second transaction should've failed due to expired session, got %v", err)
78 }
79
80}
81
82// TestWork exercises the per-{process,machine} mutual exclusion mechanism of
83// Work items.
84func TestWork(t *testing.T) {
85 b := dut()
86 conn, err := b.Open(true)
87 if err != nil {
88 t.Fatalf("Open failed: %v", err)
89 }
90
91 ctx, ctxC := context.WithCancel(context.Background())
92 defer ctxC()
93
94 // Start two session for testing.
95 session1, err := conn.StartSession(ctx)
96 if err != nil {
97 t.Fatalf("Starting session failed: %v", err)
98 }
99 session2, err := conn.StartSession(ctx)
100 if err != nil {
101 t.Fatalf("Starting session failed: %v", err)
102 }
103
104 var machine model.Machine
105 err = session1.Transact(ctx, func(q *model.Queries) error {
106 machine, err = q.NewMachine(ctx)
107 return err
108 })
109 if err != nil {
110 t.Fatalf("Creating machine failed: %v", err)
111 }
112
113 // Create a subcontext for a long-term work request. We'll cancel it later as
114 // part of the test.
115 ctxB, ctxBC := context.WithCancel(ctx)
116 defer ctxBC()
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100117
118 constantRetriever := func(_ *model.Queries) ([]uuid.UUID, error) {
119 return []uuid.UUID{machine.MachineID}, nil
120 }
121
122 // Start work on machine which we're not gonna finish for a while.
123 work1, err := session1.Work(ctxB, model.ProcessUnitTest1, constantRetriever)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200124 if err != nil {
125 t.Fatalf("Starting first work failed: %v", err)
126 }
127
128 // Starting more work on the same machine but a different process should still
129 // be allowed.
130 for _, session := range []*Session{session1, session2} {
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100131 work2, err := session.Work(ctxB, model.ProcessUnitTest2, constantRetriever)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200132 if err != nil {
133 t.Errorf("Could not run concurrent process on machine: %v", err)
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100134 } else {
135 work2.Cancel(ctxB)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200136 }
137 }
138
139 // However, starting work with the same process on the same machine should
140 // fail.
141 for _, session := range []*Session{session1, session2} {
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100142 work2, err := session.Work(ctxB, model.ProcessUnitTest1, constantRetriever)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200143 if !errors.Is(err, ErrWorkConflict) {
144 t.Errorf("Concurrent work with same process should've been forbidden, got %v", err)
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100145 work2.Cancel(ctxB)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200146 }
147 }
148
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100149 // Now, finish the long-running work.
150 work1.Cancel(ctx)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200151
152 // We should now be able to perform 'test1' work again against this machine.
153 for _, session := range []*Session{session1, session2} {
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100154 work1, err := session.Work(ctxB, model.ProcessUnitTest1, constantRetriever)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200155 if err != nil {
156 t.Errorf("Could not run work against machine: %v", err)
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100157 } else {
158 work1.Cancel(ctxB)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200159 }
160 }
161}
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100162
Serge Bazanski20312b42023-04-19 13:49:47 +0200163// TestWorkBackoff exercises the backoff functionality within the BMDB.
Serge Bazanskia9580a72023-01-12 14:44:35 +0100164func TestWorkBackoff(t *testing.T) {
165 b := dut()
166 conn, err := b.Open(true)
167 if err != nil {
168 t.Fatalf("Open failed: %v", err)
169 }
170
171 ctx, ctxC := context.WithCancel(context.Background())
172 defer ctxC()
173
174 session, err := conn.StartSession(ctx)
175 if err != nil {
176 t.Fatalf("Starting session failed: %v", err)
177 }
178
179 var machine model.Machine
180 // Create machine.
181 err = session.Transact(ctx, func(q *model.Queries) error {
182 machine, err = q.NewMachine(ctx)
183 if err != nil {
184 return err
185 }
186 return q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
187 MachineID: machine.MachineID,
188 Provider: model.ProviderEquinix,
189 ProviderID: "123",
190 })
191 })
192 if err != nil {
193 t.Fatalf("Creating machine failed: %v", err)
194 }
195
Serge Bazanski20312b42023-04-19 13:49:47 +0200196 waitMachine := func(nsec int64) *Work {
197 t.Helper()
198
199 deadline := time.Now().Add(time.Duration(nsec) * 2 * time.Second)
200 for {
201 if time.Now().After(deadline) {
202 t.Fatalf("Deadline expired")
203 }
Serge Bazanski00cf57d2023-04-20 11:19:00 +0200204 work, err := session.Work(ctx, model.ProcessShepherdAgentStart, func(q *model.Queries) ([]uuid.UUID, error) {
Serge Bazanski20312b42023-04-19 13:49:47 +0200205 machines, err := q.GetMachinesForAgentStart(ctx, 1)
206 if err != nil {
207 return nil, err
208 }
209 if len(machines) < 1 {
210 return nil, ErrNothingToDo
211 }
212 return []uuid.UUID{machines[0].MachineID}, nil
213 })
214 if err == nil {
215 return work
216 }
217 if !errors.Is(err, ErrNothingToDo) {
218 t.Fatalf("Unexpected work error: %v", err)
219 }
220 time.Sleep(100 * time.Millisecond)
Serge Bazanskia9580a72023-01-12 14:44:35 +0100221 }
Serge Bazanskia9580a72023-01-12 14:44:35 +0100222 }
Serge Bazanski20312b42023-04-19 13:49:47 +0200223
224 // Work on machine, but fail it with a backoff.
225 work := waitMachine(1)
226 backoff := Backoff{
227 Initial: time.Second,
228 Maximum: 5 * time.Second,
229 Exponent: 2,
230 }
Serge Bazanskia9580a72023-01-12 14:44:35 +0100231 if err := work.Fail(ctx, &backoff, "test"); err != nil {
232 t.Fatalf("Failing work failed: %v", err)
233 }
234
Serge Bazanski20312b42023-04-19 13:49:47 +0200235 expect := func(count int) {
236 t.Helper()
Serge Bazanskia9580a72023-01-12 14:44:35 +0100237
Serge Bazanski20312b42023-04-19 13:49:47 +0200238 var machines []model.MachineProvided
239 var err error
240 err = session.Transact(ctx, func(q *model.Queries) error {
241 machines, err = q.GetMachinesForAgentStart(ctx, 1)
242 if err != nil {
243 return err
244 }
245 return nil
Serge Bazanskia9580a72023-01-12 14:44:35 +0100246 })
Serge Bazanski20312b42023-04-19 13:49:47 +0200247 if err != nil {
248 t.Errorf("Failed to retrieve machines for agent start: %v", err)
249 }
250 if want, got := count, len(machines); want != got {
251 t.Errorf("Expected %d machines, got %d", want, got)
252 }
Serge Bazanskia9580a72023-01-12 14:44:35 +0100253 }
254
Serge Bazanski20312b42023-04-19 13:49:47 +0200255 // The machine shouldn't be returned now.
256 expect(0)
257
258 // Wait for the backoff to expire.
259 time.Sleep(1100 * time.Millisecond)
Serge Bazanskia9580a72023-01-12 14:44:35 +0100260
261 // The machine should now be returned again.
Serge Bazanski20312b42023-04-19 13:49:47 +0200262 expect(1)
263
264 // Prepare helper for checking exponential backoffs.
265 failAndCheck := func(nsec int64) {
266 t.Helper()
267 work := waitMachine(nsec)
268 if err := work.Fail(ctx, &backoff, "test"); err != nil {
269 t.Fatalf("Failing work failed: %v", err)
270 }
271
272 var backoffs []model.WorkBackoff
273 err = session.Transact(ctx, func(q *model.Queries) error {
274 var err error
275 backoffs, err = q.WorkBackoffOf(ctx, model.WorkBackoffOfParams{
276 MachineID: machine.MachineID,
Serge Bazanski00cf57d2023-04-20 11:19:00 +0200277 Process: model.ProcessShepherdAgentStart,
Serge Bazanski20312b42023-04-19 13:49:47 +0200278 })
Serge Bazanskia9580a72023-01-12 14:44:35 +0100279 return err
Serge Bazanski20312b42023-04-19 13:49:47 +0200280 })
281 if err != nil {
282 t.Errorf("Failed to retrieve machines for agent start: %v", err)
Serge Bazanskia9580a72023-01-12 14:44:35 +0100283 }
Serge Bazanski20312b42023-04-19 13:49:47 +0200284 if len(backoffs) < 1 {
285 t.Errorf("No backoff")
286 } else {
287 backoff := backoffs[0]
288 if want, got := nsec, backoff.LastIntervalSeconds.Int64; want != got {
289 t.Fatalf("Wanted backoff of %d seconds, got %d", want, got)
290 }
Serge Bazanskia9580a72023-01-12 14:44:35 +0100291 }
Serge Bazanski20312b42023-04-19 13:49:47 +0200292 }
293
294 // Exercise exponential backoff functionality.
295 failAndCheck(2)
296 failAndCheck(4)
297 failAndCheck(5)
298 failAndCheck(5)
299
300 // If the job now succeeds, subsequent failures should start from 1 again.
301 work = waitMachine(5)
302 err = work.Finish(ctx, func(q *model.Queries) error {
303 // Not setting any tags that would cause subsequent queries to not return the
304 // machine anymore.
Serge Bazanskia9580a72023-01-12 14:44:35 +0100305 return nil
306 })
307 if err != nil {
Serge Bazanski20312b42023-04-19 13:49:47 +0200308 t.Fatalf("Could not finish work: %v", err)
Serge Bazanskia9580a72023-01-12 14:44:35 +0100309 }
Serge Bazanski20312b42023-04-19 13:49:47 +0200310
311 failAndCheck(1)
312 failAndCheck(2)
Serge Bazanskia9580a72023-01-12 14:44:35 +0100313}
314
Serge Bazanski10383132023-02-20 15:39:45 +0100315// TestAgentStartWorkflow exercises the agent start workflow within the BMDB.
316func TestAgentStartWorkflow(t *testing.T) {
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100317 b := dut()
318 conn, err := b.Open(true)
319 if err != nil {
320 t.Fatalf("Open failed: %v", err)
321 }
322
323 ctx, ctxC := context.WithCancel(context.Background())
324 defer ctxC()
325
326 session, err := conn.StartSession(ctx)
327 if err != nil {
328 t.Fatalf("Starting session failed: %v", err)
329 }
330
331 // Create machine. Drop its ID.
332 err = session.Transact(ctx, func(q *model.Queries) error {
333 machine, err := q.NewMachine(ctx)
334 if err != nil {
335 return err
336 }
337 return q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
338 MachineID: machine.MachineID,
339 Provider: model.ProviderEquinix,
340 ProviderID: "123",
341 })
342 })
343 if err != nil {
344 t.Fatalf("Creating machine failed: %v", err)
345 }
346
347 // Start working on a machine.
Serge Bazanskia9580a72023-01-12 14:44:35 +0100348 var machine uuid.UUID
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100349 startedC := make(chan struct{})
350 doneC := make(chan struct{})
351 errC := make(chan error)
352 go func() {
Serge Bazanski00cf57d2023-04-20 11:19:00 +0200353 work, err := session.Work(ctx, model.ProcessShepherdAgentStart, func(q *model.Queries) ([]uuid.UUID, error) {
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100354 machines, err := q.GetMachinesForAgentStart(ctx, 1)
355 if err != nil {
356 return nil, err
357 }
358 if len(machines) < 1 {
359 return nil, ErrNothingToDo
360 }
Serge Bazanskia9580a72023-01-12 14:44:35 +0100361 machine = machines[0].MachineID
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100362 return []uuid.UUID{machines[0].MachineID}, nil
363 })
364 defer work.Cancel(ctx)
365
366 if err != nil {
367 close(startedC)
368 errC <- err
369 return
370 }
371
372 // Simulate work by blocking on a channel.
373 close(startedC)
Serge Bazanskia9580a72023-01-12 14:44:35 +0100374
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100375 <-doneC
376
377 err = work.Finish(ctx, func(q *model.Queries) error {
378 return q.MachineSetAgentStarted(ctx, model.MachineSetAgentStartedParams{
379 MachineID: work.Machine,
380 AgentStartedAt: time.Now(),
381 AgentPublicKey: []byte("fakefakefake"),
382 })
383 })
384 errC <- err
385 }()
386 <-startedC
Serge Bazanski00cf57d2023-04-20 11:19:00 +0200387
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100388 // Work on the machine has started. Attempting to get more machines now should
389 // return no machines.
Serge Bazanski00cf57d2023-04-20 11:19:00 +0200390
391 // Mutual exclusion with AgentStart:
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100392 err = session.Transact(ctx, func(q *model.Queries) error {
393 machines, err := q.GetMachinesForAgentStart(ctx, 1)
394 if err != nil {
395 return err
396 }
397 if len(machines) > 0 {
Serge Bazanski10383132023-02-20 15:39:45 +0100398 t.Errorf("Expected no machines ready for agent start.")
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100399 }
400 return nil
401 })
402 if err != nil {
Serge Bazanski10383132023-02-20 15:39:45 +0100403 t.Fatalf("Failed to retrieve machines for start in parallel: %v", err)
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100404 }
Serge Bazanski00cf57d2023-04-20 11:19:00 +0200405
406 // Mutual exclusion with Recovery:
407 err = session.Transact(ctx, func(q *model.Queries) error {
408 machines, err := q.GetMachineForAgentRecovery(ctx, 1)
409 if err != nil {
410 return err
411 }
412 if len(machines) > 0 {
413 t.Errorf("Expected no machines ready for agent recovery.")
414 }
415 return nil
416 })
417 if err != nil {
418 t.Fatalf("Failed to retrieve machines for recovery in parallel: %v", err)
419 }
420
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100421 // Finish working on machine.
422 close(doneC)
423 err = <-errC
424 if err != nil {
Serge Bazanskia9580a72023-01-12 14:44:35 +0100425 t.Fatalf("Failed to finish work on machine: %v", err)
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100426 }
Serge Bazanski10383132023-02-20 15:39:45 +0100427 // That machine has its agent started, so we still expect no work to have to be
428 // done.
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100429 err = session.Transact(ctx, func(q *model.Queries) error {
430 machines, err := q.GetMachinesForAgentStart(ctx, 1)
431 if err != nil {
432 return err
433 }
434 if len(machines) > 0 {
Serge Bazanski10383132023-02-20 15:39:45 +0100435 t.Errorf("Expected still no machines ready for agent start.")
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100436 }
437 return nil
438 })
439 if err != nil {
Serge Bazanski10383132023-02-20 15:39:45 +0100440 t.Fatalf("Failed to retrieve machines for agent start after work finished: %v", err)
Serge Bazanskia9580a72023-01-12 14:44:35 +0100441 }
442
443 // Check history has been recorded.
444 var history []model.WorkHistory
445 err = session.Transact(ctx, func(q *model.Queries) error {
446 history, err = q.ListHistoryOf(ctx, machine)
447 return err
448 })
449 if err != nil {
450 t.Fatalf("Failed to retrieve machine history: %v", err)
451 }
452 // Expect two history items: started and finished.
453 if want, got := 2, len(history); want != got {
454 t.Errorf("Wanted %d history items, got %d", want, got)
455 } else {
456 if want, got := model.WorkHistoryEventStarted, history[0].Event; want != got {
457 t.Errorf("Wanted first history event to be %s, got %s", want, got)
458 }
459 if want, got := model.WorkHistoryEventFinished, history[1].Event; want != got {
460 t.Errorf("Wanted second history event to be %s, got %s", want, got)
461 }
462 }
463 // Check all other history event data.
464 for i, el := range history {
465 if want, got := machine, el.MachineID; want.String() != got.String() {
466 t.Errorf("Wanted %d history event machine ID to be %s, got %s", i, want, got)
467 }
Serge Bazanski00cf57d2023-04-20 11:19:00 +0200468 if want, got := model.ProcessShepherdAgentStart, el.Process; want != got {
Serge Bazanskia9580a72023-01-12 14:44:35 +0100469 t.Errorf("Wanted %d history event process to be %s, got %s", i, want, got)
470 }
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100471 }
472}
473
Serge Bazanski10383132023-02-20 15:39:45 +0100474// TestAgentStartWorkflowParallel starts work on three machines by six workers
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100475// and makes sure that there are no scheduling conflicts between them.
Serge Bazanski10383132023-02-20 15:39:45 +0100476func TestAgentStartWorkflowParallel(t *testing.T) {
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100477 b := dut()
478 conn, err := b.Open(true)
479 if err != nil {
480 t.Fatalf("Open failed: %v", err)
481 }
482
483 ctx, ctxC := context.WithCancel(context.Background())
484 defer ctxC()
485
486 makeMachine := func(providerID string) {
487 ctxS, ctxC := context.WithCancel(ctx)
488 defer ctxC()
489 session, err := conn.StartSession(ctxS)
490 if err != nil {
491 t.Fatalf("Starting session failed: %v", err)
492 }
493 err = session.Transact(ctx, func(q *model.Queries) error {
494 machine, err := q.NewMachine(ctx)
495 if err != nil {
496 return err
497 }
498 return q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
499 MachineID: machine.MachineID,
500 Provider: model.ProviderEquinix,
501 ProviderID: providerID,
502 })
503 })
504 if err != nil {
505 t.Fatalf("Creating machine failed: %v", err)
506 }
507 }
508 // Make six machines for testing.
509 for i := 0; i < 6; i++ {
510 makeMachine(fmt.Sprintf("test%d", i))
511 }
512
513 workStarted := make(chan struct{})
514 workDone := make(chan struct {
515 machine uuid.UUID
516 workerID int
517 })
518
519 workOnce := func(ctx context.Context, workerID int, session *Session) error {
Serge Bazanski00cf57d2023-04-20 11:19:00 +0200520 work, err := session.Work(ctx, model.ProcessShepherdAgentStart, func(q *model.Queries) ([]uuid.UUID, error) {
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100521 machines, err := q.GetMachinesForAgentStart(ctx, 1)
522 if err != nil {
523 return nil, err
524 }
525 if len(machines) < 1 {
526 return nil, ErrNothingToDo
527 }
528 return []uuid.UUID{machines[0].MachineID}, nil
529 })
530
531 if err != nil {
532 return err
533 }
534 defer work.Cancel(ctx)
535
536 select {
537 case <-workStarted:
538 case <-ctx.Done():
539 return ctx.Err()
540 }
541
542 select {
543 case workDone <- struct {
544 machine uuid.UUID
545 workerID int
546 }{
547 machine: work.Machine,
548 workerID: workerID,
549 }:
550 case <-ctx.Done():
551 return ctx.Err()
552 }
553
554 return work.Finish(ctx, func(q *model.Queries) error {
555 return q.MachineSetAgentStarted(ctx, model.MachineSetAgentStartedParams{
556 MachineID: work.Machine,
557 AgentStartedAt: time.Now(),
558 AgentPublicKey: []byte("fakefakefake"),
559 })
560 })
561 }
562
563 worker := func(workerID int) {
564 ctxS, ctxC := context.WithCancel(ctx)
565 defer ctxC()
566 session, err := conn.StartSession(ctxS)
567 if err != nil {
568 t.Fatalf("Starting session failed: %v", err)
569 }
570 for {
571 err := workOnce(ctxS, workerID, session)
572 if err != nil {
Serge Bazanskice4af2b2023-03-16 21:23:39 +0100573 if errors.Is(err, ErrNothingToDo) {
574 continue
575 }
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100576 if errors.Is(err, ctxS.Err()) {
577 return
578 }
579 t.Fatalf("worker failed: %v", err)
580 }
581 }
582 }
583 // Start three workers.
584 for i := 0; i < 3; i++ {
585 go worker(i)
586 }
587
588 // Wait for at least three workers to be alive.
589 for i := 0; i < 3; i++ {
590 workStarted <- struct{}{}
591 }
592
593 // Allow all workers to continue running from now on.
594 close(workStarted)
595
596 // Expect six machines to have been handled in parallel by three workers.
597 seenWorkers := make(map[int]bool)
598 seenMachines := make(map[string]bool)
599 for i := 0; i < 6; i++ {
600 res := <-workDone
601 seenWorkers[res.workerID] = true
602 seenMachines[res.machine.String()] = true
603 }
604
605 if want, got := 3, len(seenWorkers); want != got {
606 t.Errorf("Expected %d workers, got %d", want, got)
607 }
608 if want, got := 6, len(seenMachines); want != got {
609 t.Errorf("Expected %d machines, got %d", want, got)
610 }
611}