blob: 8839c171d455014d35a9eb7517de8de1a6e125f0 [file] [log] [blame]
Tim Windelschmidt6d33a432025-02-04 14:34:25 +01001// Copyright The Monogon Project Authors.
2// SPDX-License-Identifier: Apache-2.0
3
Serge Bazanski35e8d792022-10-11 11:32:30 +02004package bmdb
5
6import (
7 "context"
8 "errors"
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +01009 "fmt"
Serge Bazanski35e8d792022-10-11 11:32:30 +020010 "testing"
11 "time"
12
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +010013 "github.com/google/uuid"
14
Serge Bazanski35e8d792022-10-11 11:32:30 +020015 "source.monogon.dev/cloud/bmaas/bmdb/model"
16 "source.monogon.dev/cloud/lib/component"
17)
18
19func dut() *BMDB {
20 return &BMDB{
21 Config: Config{
22 Database: component.CockroachConfig{
23 InMemory: true,
24 },
25 },
26 }
27}
28
29// TestSessionExpiry exercises the session heartbeat logic, making sure that if
30// a session stops being maintained subsequent Transact calls will fail.
31func TestSessionExpiry(t *testing.T) {
32 b := dut()
33 conn, err := b.Open(true)
34 if err != nil {
35 t.Fatalf("Open failed: %v", err)
36 }
37
38 ctx, ctxC := context.WithCancel(context.Background())
39 defer ctxC()
40
41 session, err := conn.StartSession(ctx)
42 if err != nil {
43 t.Fatalf("Starting session failed: %v", err)
44 }
45
46 // A transaction in a brand-new session should work.
47 var machine model.Machine
48 err = session.Transact(ctx, func(q *model.Queries) error {
49 machine, err = q.NewMachine(ctx)
50 return err
51 })
52 if err != nil {
53 t.Fatalf("First transaction failed: %v", err)
54 }
55
56 time.Sleep(6 * time.Second)
57
58 // A transaction after the 5-second session interval should continue to work.
59 err = session.Transact(ctx, func(q *model.Queries) error {
60 _, err = q.NewMachine(ctx)
61 return err
62 })
63 if err != nil {
64 t.Fatalf("Second transaction failed: %v", err)
65 }
66
67 // A transaction after the 5-second session interval should fail if we don't
68 // maintain its heartbeat.
69 session.ctxC()
70 time.Sleep(6 * time.Second)
71
72 err = session.Transact(ctx, func(q *model.Queries) error {
73 return q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
74 MachineID: machine.MachineID,
75 Provider: "foo",
76 ProviderID: "bar",
77 })
78 })
79 if !errors.Is(err, ErrSessionExpired) {
80 t.Fatalf("Second transaction should've failed due to expired session, got %v", err)
81 }
82
83}
84
85// TestWork exercises the per-{process,machine} mutual exclusion mechanism of
86// Work items.
87func TestWork(t *testing.T) {
88 b := dut()
89 conn, err := b.Open(true)
90 if err != nil {
91 t.Fatalf("Open failed: %v", err)
92 }
93
94 ctx, ctxC := context.WithCancel(context.Background())
95 defer ctxC()
96
97 // Start two session for testing.
98 session1, err := conn.StartSession(ctx)
99 if err != nil {
100 t.Fatalf("Starting session failed: %v", err)
101 }
102 session2, err := conn.StartSession(ctx)
103 if err != nil {
104 t.Fatalf("Starting session failed: %v", err)
105 }
106
107 var machine model.Machine
108 err = session1.Transact(ctx, func(q *model.Queries) error {
109 machine, err = q.NewMachine(ctx)
110 return err
111 })
112 if err != nil {
113 t.Fatalf("Creating machine failed: %v", err)
114 }
115
116 // Create a subcontext for a long-term work request. We'll cancel it later as
117 // part of the test.
118 ctxB, ctxBC := context.WithCancel(ctx)
119 defer ctxBC()
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100120
121 constantRetriever := func(_ *model.Queries) ([]uuid.UUID, error) {
122 return []uuid.UUID{machine.MachineID}, nil
123 }
124
125 // Start work on machine which we're not gonna finish for a while.
126 work1, err := session1.Work(ctxB, model.ProcessUnitTest1, constantRetriever)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200127 if err != nil {
128 t.Fatalf("Starting first work failed: %v", err)
129 }
130
131 // Starting more work on the same machine but a different process should still
132 // be allowed.
133 for _, session := range []*Session{session1, session2} {
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100134 work2, err := session.Work(ctxB, model.ProcessUnitTest2, constantRetriever)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200135 if err != nil {
136 t.Errorf("Could not run concurrent process on machine: %v", err)
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100137 } else {
138 work2.Cancel(ctxB)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200139 }
140 }
141
142 // However, starting work with the same process on the same machine should
143 // fail.
144 for _, session := range []*Session{session1, session2} {
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100145 work2, err := session.Work(ctxB, model.ProcessUnitTest1, constantRetriever)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200146 if !errors.Is(err, ErrWorkConflict) {
147 t.Errorf("Concurrent work with same process should've been forbidden, got %v", err)
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100148 work2.Cancel(ctxB)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200149 }
150 }
151
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100152 // Now, finish the long-running work.
153 work1.Cancel(ctx)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200154
155 // We should now be able to perform 'test1' work again against this machine.
156 for _, session := range []*Session{session1, session2} {
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100157 work1, err := session.Work(ctxB, model.ProcessUnitTest1, constantRetriever)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200158 if err != nil {
159 t.Errorf("Could not run work against machine: %v", err)
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100160 } else {
161 work1.Cancel(ctxB)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200162 }
163 }
164}
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100165
Serge Bazanski20312b42023-04-19 13:49:47 +0200166// TestWorkBackoff exercises the backoff functionality within the BMDB.
Serge Bazanskia9580a72023-01-12 14:44:35 +0100167func TestWorkBackoff(t *testing.T) {
168 b := dut()
169 conn, err := b.Open(true)
170 if err != nil {
171 t.Fatalf("Open failed: %v", err)
172 }
173
174 ctx, ctxC := context.WithCancel(context.Background())
175 defer ctxC()
176
177 session, err := conn.StartSession(ctx)
178 if err != nil {
179 t.Fatalf("Starting session failed: %v", err)
180 }
181
182 var machine model.Machine
183 // Create machine.
184 err = session.Transact(ctx, func(q *model.Queries) error {
185 machine, err = q.NewMachine(ctx)
186 if err != nil {
187 return err
188 }
189 return q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
190 MachineID: machine.MachineID,
191 Provider: model.ProviderEquinix,
192 ProviderID: "123",
193 })
194 })
195 if err != nil {
196 t.Fatalf("Creating machine failed: %v", err)
197 }
198
Serge Bazanski20312b42023-04-19 13:49:47 +0200199 waitMachine := func(nsec int64) *Work {
200 t.Helper()
201
202 deadline := time.Now().Add(time.Duration(nsec) * 2 * time.Second)
203 for {
204 if time.Now().After(deadline) {
205 t.Fatalf("Deadline expired")
206 }
Serge Bazanski00cf57d2023-04-20 11:19:00 +0200207 work, err := session.Work(ctx, model.ProcessShepherdAgentStart, func(q *model.Queries) ([]uuid.UUID, error) {
Tim Windelschmidt0e749612023-08-07 17:42:59 +0000208 machines, err := q.GetMachinesForAgentStart(ctx, model.GetMachinesForAgentStartParams{
209 Limit: 1,
210 Provider: model.ProviderEquinix,
211 })
Serge Bazanski20312b42023-04-19 13:49:47 +0200212 if err != nil {
213 return nil, err
214 }
215 if len(machines) < 1 {
216 return nil, ErrNothingToDo
217 }
218 return []uuid.UUID{machines[0].MachineID}, nil
219 })
220 if err == nil {
221 return work
222 }
223 if !errors.Is(err, ErrNothingToDo) {
224 t.Fatalf("Unexpected work error: %v", err)
225 }
226 time.Sleep(100 * time.Millisecond)
Serge Bazanskia9580a72023-01-12 14:44:35 +0100227 }
Serge Bazanskia9580a72023-01-12 14:44:35 +0100228 }
Serge Bazanski20312b42023-04-19 13:49:47 +0200229
230 // Work on machine, but fail it with a backoff.
231 work := waitMachine(1)
232 backoff := Backoff{
233 Initial: time.Second,
234 Maximum: 5 * time.Second,
235 Exponent: 2,
236 }
Serge Bazanskia9580a72023-01-12 14:44:35 +0100237 if err := work.Fail(ctx, &backoff, "test"); err != nil {
238 t.Fatalf("Failing work failed: %v", err)
239 }
240
Serge Bazanski20312b42023-04-19 13:49:47 +0200241 expect := func(count int) {
242 t.Helper()
Serge Bazanskia9580a72023-01-12 14:44:35 +0100243
Serge Bazanski20312b42023-04-19 13:49:47 +0200244 var machines []model.MachineProvided
245 var err error
246 err = session.Transact(ctx, func(q *model.Queries) error {
Tim Windelschmidt0e749612023-08-07 17:42:59 +0000247 machines, err = q.GetMachinesForAgentStart(ctx, model.GetMachinesForAgentStartParams{
248 Limit: 1,
249 Provider: model.ProviderEquinix,
250 })
Serge Bazanski20312b42023-04-19 13:49:47 +0200251 if err != nil {
252 return err
253 }
254 return nil
Serge Bazanskia9580a72023-01-12 14:44:35 +0100255 })
Serge Bazanski20312b42023-04-19 13:49:47 +0200256 if err != nil {
257 t.Errorf("Failed to retrieve machines for agent start: %v", err)
258 }
259 if want, got := count, len(machines); want != got {
260 t.Errorf("Expected %d machines, got %d", want, got)
261 }
Serge Bazanskia9580a72023-01-12 14:44:35 +0100262 }
263
Serge Bazanski20312b42023-04-19 13:49:47 +0200264 // The machine shouldn't be returned now.
265 expect(0)
266
267 // Wait for the backoff to expire.
268 time.Sleep(1100 * time.Millisecond)
Serge Bazanskia9580a72023-01-12 14:44:35 +0100269
270 // The machine should now be returned again.
Serge Bazanski20312b42023-04-19 13:49:47 +0200271 expect(1)
272
273 // Prepare helper for checking exponential backoffs.
274 failAndCheck := func(nsec int64) {
275 t.Helper()
276 work := waitMachine(nsec)
277 if err := work.Fail(ctx, &backoff, "test"); err != nil {
278 t.Fatalf("Failing work failed: %v", err)
279 }
280
281 var backoffs []model.WorkBackoff
282 err = session.Transact(ctx, func(q *model.Queries) error {
283 var err error
284 backoffs, err = q.WorkBackoffOf(ctx, model.WorkBackoffOfParams{
285 MachineID: machine.MachineID,
Serge Bazanski00cf57d2023-04-20 11:19:00 +0200286 Process: model.ProcessShepherdAgentStart,
Serge Bazanski20312b42023-04-19 13:49:47 +0200287 })
Serge Bazanskia9580a72023-01-12 14:44:35 +0100288 return err
Serge Bazanski20312b42023-04-19 13:49:47 +0200289 })
290 if err != nil {
291 t.Errorf("Failed to retrieve machines for agent start: %v", err)
Serge Bazanskia9580a72023-01-12 14:44:35 +0100292 }
Serge Bazanski20312b42023-04-19 13:49:47 +0200293 if len(backoffs) < 1 {
294 t.Errorf("No backoff")
295 } else {
296 backoff := backoffs[0]
297 if want, got := nsec, backoff.LastIntervalSeconds.Int64; want != got {
298 t.Fatalf("Wanted backoff of %d seconds, got %d", want, got)
299 }
Serge Bazanskia9580a72023-01-12 14:44:35 +0100300 }
Serge Bazanski20312b42023-04-19 13:49:47 +0200301 }
302
303 // Exercise exponential backoff functionality.
304 failAndCheck(2)
305 failAndCheck(4)
306 failAndCheck(5)
307 failAndCheck(5)
308
309 // If the job now succeeds, subsequent failures should start from 1 again.
310 work = waitMachine(5)
311 err = work.Finish(ctx, func(q *model.Queries) error {
312 // Not setting any tags that would cause subsequent queries to not return the
313 // machine anymore.
Serge Bazanskia9580a72023-01-12 14:44:35 +0100314 return nil
315 })
316 if err != nil {
Serge Bazanski20312b42023-04-19 13:49:47 +0200317 t.Fatalf("Could not finish work: %v", err)
Serge Bazanskia9580a72023-01-12 14:44:35 +0100318 }
Serge Bazanski20312b42023-04-19 13:49:47 +0200319
320 failAndCheck(1)
321 failAndCheck(2)
Serge Bazanskia9580a72023-01-12 14:44:35 +0100322}
323
Serge Bazanski10383132023-02-20 15:39:45 +0100324// TestAgentStartWorkflow exercises the agent start workflow within the BMDB.
325func TestAgentStartWorkflow(t *testing.T) {
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100326 b := dut()
327 conn, err := b.Open(true)
328 if err != nil {
329 t.Fatalf("Open failed: %v", err)
330 }
331
332 ctx, ctxC := context.WithCancel(context.Background())
333 defer ctxC()
334
335 session, err := conn.StartSession(ctx)
336 if err != nil {
337 t.Fatalf("Starting session failed: %v", err)
338 }
339
340 // Create machine. Drop its ID.
341 err = session.Transact(ctx, func(q *model.Queries) error {
342 machine, err := q.NewMachine(ctx)
343 if err != nil {
344 return err
345 }
346 return q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
347 MachineID: machine.MachineID,
348 Provider: model.ProviderEquinix,
349 ProviderID: "123",
350 })
351 })
352 if err != nil {
353 t.Fatalf("Creating machine failed: %v", err)
354 }
355
356 // Start working on a machine.
Serge Bazanskia9580a72023-01-12 14:44:35 +0100357 var machine uuid.UUID
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100358 startedC := make(chan struct{})
359 doneC := make(chan struct{})
360 errC := make(chan error)
361 go func() {
Serge Bazanski00cf57d2023-04-20 11:19:00 +0200362 work, err := session.Work(ctx, model.ProcessShepherdAgentStart, func(q *model.Queries) ([]uuid.UUID, error) {
Tim Windelschmidt0e749612023-08-07 17:42:59 +0000363 machines, err := q.GetMachinesForAgentStart(ctx, model.GetMachinesForAgentStartParams{
364 Limit: 1,
365 Provider: model.ProviderEquinix,
366 })
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100367 if err != nil {
368 return nil, err
369 }
370 if len(machines) < 1 {
371 return nil, ErrNothingToDo
372 }
Serge Bazanskia9580a72023-01-12 14:44:35 +0100373 machine = machines[0].MachineID
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100374 return []uuid.UUID{machines[0].MachineID}, nil
375 })
376 defer work.Cancel(ctx)
377
378 if err != nil {
379 close(startedC)
380 errC <- err
381 return
382 }
383
384 // Simulate work by blocking on a channel.
385 close(startedC)
Serge Bazanskia9580a72023-01-12 14:44:35 +0100386
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100387 <-doneC
388
389 err = work.Finish(ctx, func(q *model.Queries) error {
390 return q.MachineSetAgentStarted(ctx, model.MachineSetAgentStartedParams{
391 MachineID: work.Machine,
392 AgentStartedAt: time.Now(),
393 AgentPublicKey: []byte("fakefakefake"),
394 })
395 })
396 errC <- err
397 }()
398 <-startedC
Serge Bazanski00cf57d2023-04-20 11:19:00 +0200399
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100400 // Work on the machine has started. Attempting to get more machines now should
401 // return no machines.
Serge Bazanski00cf57d2023-04-20 11:19:00 +0200402
403 // Mutual exclusion with AgentStart:
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100404 err = session.Transact(ctx, func(q *model.Queries) error {
Tim Windelschmidt0e749612023-08-07 17:42:59 +0000405 machines, err := q.GetMachinesForAgentStart(ctx, model.GetMachinesForAgentStartParams{
406 Limit: 1,
407 Provider: model.ProviderEquinix,
408 })
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100409 if err != nil {
410 return err
411 }
412 if len(machines) > 0 {
Serge Bazanski10383132023-02-20 15:39:45 +0100413 t.Errorf("Expected no machines ready for agent start.")
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100414 }
415 return nil
416 })
417 if err != nil {
Serge Bazanski10383132023-02-20 15:39:45 +0100418 t.Fatalf("Failed to retrieve machines for start in parallel: %v", err)
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100419 }
Serge Bazanski00cf57d2023-04-20 11:19:00 +0200420
421 // Mutual exclusion with Recovery:
422 err = session.Transact(ctx, func(q *model.Queries) error {
Tim Windelschmidt0e749612023-08-07 17:42:59 +0000423 machines, err := q.GetMachineForAgentRecovery(ctx, model.GetMachineForAgentRecoveryParams{
424 Limit: 1,
425 Provider: model.ProviderEquinix,
426 })
Serge Bazanski00cf57d2023-04-20 11:19:00 +0200427 if err != nil {
428 return err
429 }
430 if len(machines) > 0 {
431 t.Errorf("Expected no machines ready for agent recovery.")
432 }
433 return nil
434 })
435 if err != nil {
436 t.Fatalf("Failed to retrieve machines for recovery in parallel: %v", err)
437 }
438
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100439 // Finish working on machine.
440 close(doneC)
441 err = <-errC
442 if err != nil {
Serge Bazanskia9580a72023-01-12 14:44:35 +0100443 t.Fatalf("Failed to finish work on machine: %v", err)
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100444 }
Serge Bazanski10383132023-02-20 15:39:45 +0100445 // That machine has its agent started, so we still expect no work to have to be
446 // done.
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100447 err = session.Transact(ctx, func(q *model.Queries) error {
Tim Windelschmidt0e749612023-08-07 17:42:59 +0000448 machines, err := q.GetMachinesForAgentStart(ctx, model.GetMachinesForAgentStartParams{
449 Limit: 1,
450 Provider: model.ProviderEquinix,
451 })
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100452 if err != nil {
453 return err
454 }
455 if len(machines) > 0 {
Serge Bazanski10383132023-02-20 15:39:45 +0100456 t.Errorf("Expected still no machines ready for agent start.")
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100457 }
458 return nil
459 })
460 if err != nil {
Serge Bazanski10383132023-02-20 15:39:45 +0100461 t.Fatalf("Failed to retrieve machines for agent start after work finished: %v", err)
Serge Bazanskia9580a72023-01-12 14:44:35 +0100462 }
463
464 // Check history has been recorded.
465 var history []model.WorkHistory
466 err = session.Transact(ctx, func(q *model.Queries) error {
467 history, err = q.ListHistoryOf(ctx, machine)
468 return err
469 })
470 if err != nil {
471 t.Fatalf("Failed to retrieve machine history: %v", err)
472 }
473 // Expect two history items: started and finished.
474 if want, got := 2, len(history); want != got {
475 t.Errorf("Wanted %d history items, got %d", want, got)
476 } else {
477 if want, got := model.WorkHistoryEventStarted, history[0].Event; want != got {
478 t.Errorf("Wanted first history event to be %s, got %s", want, got)
479 }
480 if want, got := model.WorkHistoryEventFinished, history[1].Event; want != got {
481 t.Errorf("Wanted second history event to be %s, got %s", want, got)
482 }
483 }
484 // Check all other history event data.
485 for i, el := range history {
486 if want, got := machine, el.MachineID; want.String() != got.String() {
487 t.Errorf("Wanted %d history event machine ID to be %s, got %s", i, want, got)
488 }
Serge Bazanski00cf57d2023-04-20 11:19:00 +0200489 if want, got := model.ProcessShepherdAgentStart, el.Process; want != got {
Serge Bazanskia9580a72023-01-12 14:44:35 +0100490 t.Errorf("Wanted %d history event process to be %s, got %s", i, want, got)
491 }
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100492 }
493}
494
Serge Bazanski10383132023-02-20 15:39:45 +0100495// TestAgentStartWorkflowParallel starts work on three machines by six workers
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100496// and makes sure that there are no scheduling conflicts between them.
Serge Bazanski10383132023-02-20 15:39:45 +0100497func TestAgentStartWorkflowParallel(t *testing.T) {
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100498 b := dut()
499 conn, err := b.Open(true)
500 if err != nil {
501 t.Fatalf("Open failed: %v", err)
502 }
503
504 ctx, ctxC := context.WithCancel(context.Background())
505 defer ctxC()
506
507 makeMachine := func(providerID string) {
508 ctxS, ctxC := context.WithCancel(ctx)
509 defer ctxC()
510 session, err := conn.StartSession(ctxS)
511 if err != nil {
512 t.Fatalf("Starting session failed: %v", err)
513 }
514 err = session.Transact(ctx, func(q *model.Queries) error {
515 machine, err := q.NewMachine(ctx)
516 if err != nil {
517 return err
518 }
519 return q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
520 MachineID: machine.MachineID,
521 Provider: model.ProviderEquinix,
522 ProviderID: providerID,
523 })
524 })
525 if err != nil {
526 t.Fatalf("Creating machine failed: %v", err)
527 }
528 }
529 // Make six machines for testing.
530 for i := 0; i < 6; i++ {
531 makeMachine(fmt.Sprintf("test%d", i))
532 }
533
534 workStarted := make(chan struct{})
535 workDone := make(chan struct {
536 machine uuid.UUID
537 workerID int
538 })
539
540 workOnce := func(ctx context.Context, workerID int, session *Session) error {
Serge Bazanski00cf57d2023-04-20 11:19:00 +0200541 work, err := session.Work(ctx, model.ProcessShepherdAgentStart, func(q *model.Queries) ([]uuid.UUID, error) {
Tim Windelschmidt0e749612023-08-07 17:42:59 +0000542 machines, err := q.GetMachinesForAgentStart(ctx, model.GetMachinesForAgentStartParams{
543 Limit: 1,
544 Provider: model.ProviderEquinix,
545 })
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100546 if err != nil {
547 return nil, err
548 }
549 if len(machines) < 1 {
550 return nil, ErrNothingToDo
551 }
552 return []uuid.UUID{machines[0].MachineID}, nil
553 })
554
555 if err != nil {
556 return err
557 }
558 defer work.Cancel(ctx)
559
560 select {
561 case <-workStarted:
562 case <-ctx.Done():
563 return ctx.Err()
564 }
565
566 select {
567 case workDone <- struct {
568 machine uuid.UUID
569 workerID int
570 }{
571 machine: work.Machine,
572 workerID: workerID,
573 }:
574 case <-ctx.Done():
575 return ctx.Err()
576 }
577
578 return work.Finish(ctx, func(q *model.Queries) error {
579 return q.MachineSetAgentStarted(ctx, model.MachineSetAgentStartedParams{
580 MachineID: work.Machine,
581 AgentStartedAt: time.Now(),
582 AgentPublicKey: []byte("fakefakefake"),
583 })
584 })
585 }
586
587 worker := func(workerID int) {
Tim Windelschmidt88049722024-04-11 23:09:23 +0200588 ctxS, ctxSC := context.WithCancel(ctx)
589 defer ctxSC()
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100590 session, err := conn.StartSession(ctxS)
591 if err != nil {
Tim Windelschmidt88049722024-04-11 23:09:23 +0200592 t.Errorf("Starting session failed: %v", err)
593 ctxC()
594 return
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100595 }
596 for {
597 err := workOnce(ctxS, workerID, session)
598 if err != nil {
Serge Bazanskice4af2b2023-03-16 21:23:39 +0100599 if errors.Is(err, ErrNothingToDo) {
600 continue
601 }
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100602 if errors.Is(err, ctxS.Err()) {
603 return
604 }
Tim Windelschmidt88049722024-04-11 23:09:23 +0200605 t.Errorf("worker failed: %v", err)
606 ctxC()
607 return
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100608 }
609 }
610 }
611 // Start three workers.
612 for i := 0; i < 3; i++ {
613 go worker(i)
614 }
615
616 // Wait for at least three workers to be alive.
617 for i := 0; i < 3; i++ {
Tim Windelschmidt88049722024-04-11 23:09:23 +0200618 select {
Tim Windelschmidt99e15112025-02-05 17:38:16 +0100619 case workStarted <- struct{}{}:
620 case <-ctx.Done():
621 t.FailNow()
Tim Windelschmidt88049722024-04-11 23:09:23 +0200622 }
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100623 }
624
625 // Allow all workers to continue running from now on.
626 close(workStarted)
627
628 // Expect six machines to have been handled in parallel by three workers.
629 seenWorkers := make(map[int]bool)
630 seenMachines := make(map[string]bool)
631 for i := 0; i < 6; i++ {
632 res := <-workDone
633 seenWorkers[res.workerID] = true
634 seenMachines[res.machine.String()] = true
635 }
636
637 if want, got := 3, len(seenWorkers); want != got {
638 t.Errorf("Expected %d workers, got %d", want, got)
639 }
640 if want, got := 6, len(seenMachines); want != got {
641 t.Errorf("Expected %d machines, got %d", want, got)
642 }
643}