blob: 95b8be540fcbc3520909243449788762e024005a [file] [log] [blame]
Serge Bazanski35e8d792022-10-11 11:32:30 +02001package bmdb
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "time"
9
10 "github.com/cockroachdb/cockroach-go/v2/crdb"
11 "github.com/google/uuid"
12 "github.com/lib/pq"
13 "k8s.io/klog/v2"
14
Serge Bazanskic50f6942023-04-24 18:27:22 +020015 "source.monogon.dev/cloud/bmaas/bmdb/metrics"
Serge Bazanski35e8d792022-10-11 11:32:30 +020016 "source.monogon.dev/cloud/bmaas/bmdb/model"
Serge Bazanski35e8d792022-10-11 11:32:30 +020017)
18
Serge Bazanski35e8d792022-10-11 11:32:30 +020019// StartSession creates a new BMDB session which will be maintained in a
20// background goroutine as long as the given context is valid. Each Session is
21// represented by an entry in a sessions table within the BMDB, and subsequent
22// Transact calls emit SQL transactions which depend on that entry still being
23// present and up to date. A garbage collection system (to be implemented) will
24// remove expired sessions from the BMDB, but this mechanism is not necessary
25// for the session expiry mechanism to work.
26//
27// When the session becomes invalid (for example due to network partition),
28// subsequent attempts to call Transact will fail with ErrSessionExpired. This
29// means that the caller within the component is responsible for recreating a
30// new Session if a previously used one expires.
Serge Bazanskic50f6942023-04-24 18:27:22 +020031func (c *Connection) StartSession(ctx context.Context, opts ...SessionOption) (*Session, error) {
Serge Bazanski35e8d792022-10-11 11:32:30 +020032 intervalSeconds := 5
33
34 res, err := model.New(c.db).NewSession(ctx, model.NewSessionParams{
35 SessionComponentName: c.bmdb.ComponentName,
36 SessionRuntimeInfo: c.bmdb.RuntimeInfo,
37 SessionIntervalSeconds: int64(intervalSeconds),
38 })
39 if err != nil {
40 return nil, fmt.Errorf("creating session failed: %w", err)
41 }
42
43 klog.Infof("Started session %s", res.SessionID)
44
45 ctx2, ctxC := context.WithCancel(ctx)
46
Serge Bazanskic50f6942023-04-24 18:27:22 +020047 var processor metrics.Processor
48 for _, opt := range opts {
49 if opt.Processor != "" {
50 processor = opt.Processor
51 }
52 }
53
Serge Bazanski35e8d792022-10-11 11:32:30 +020054 s := &Session{
55 connection: c,
56 interval: time.Duration(intervalSeconds) * time.Second,
57
58 UUID: res.SessionID,
59
60 ctx: ctx2,
61 ctxC: ctxC,
Serge Bazanskic50f6942023-04-24 18:27:22 +020062 m: c.bmdb.metrics.Recorder(processor),
Serge Bazanski35e8d792022-10-11 11:32:30 +020063 }
Serge Bazanskic50f6942023-04-24 18:27:22 +020064 s.m.OnSessionStarted()
Serge Bazanski35e8d792022-10-11 11:32:30 +020065 go s.maintainHeartbeat(ctx2)
66 return s, nil
67}
68
// SessionOption customizes a Session created by Connection.StartSession.
type SessionOption struct {
	// Processor, when non-empty, is passed to the connection's metrics registry
	// to select the recorder used for the session. If several options set it,
	// the last non-empty value wins.
	Processor metrics.Processor
}
72
// Session is a session (identified by UUID) that has been started in the BMDB.
// Its liveness is maintained by a background goroutine, and as long as that
// session is alive, it can perform transactions and work on the BMDB.
type Session struct {
	// connection is the BMDB connection this session was started on. Its
	// database handle is used to run all of the session's transactions.
	connection *Connection
	// interval is the heartbeat interval configured for this session.
	interval time.Duration

	// UUID is the session ID as returned by the BMDB when the session was
	// created.
	UUID uuid.UUID

	// ctx is the session's internal context. It is canceled through ctxC when
	// the session expires; Expired reports on its state.
	ctx  context.Context
	ctxC context.CancelFunc

	// m records metrics for this session.
	m *metrics.ProcessorRecorder
}
87
Serge Bazanski42f13462023-04-19 15:00:06 +020088// Expired returns true if this session is expired and will fail all subsequent
89// transactions/work.
90func (s *Session) Expired() bool {
91 return s.ctx.Err() != nil
92}
93
94// expire is a helper which marks this session as expired and returns
95// ErrSessionExpired.
96func (s *Session) expire() error {
97 s.ctxC()
98 return ErrSessionExpired
99}
100
var (
	// ErrSessionExpired is returned when attempting to Transact or Work on a
	// Session that has expired or been canceled. Once a Session starts returning
	// this error, it must be re-created by another StartSession call, as no
	// other calls on it will succeed. Compare using errors.Is, as the error
	// may be wrapped.
	ErrSessionExpired = errors.New("session expired")
	// ErrWorkConflict is returned when attempting to Work on a Session with a
	// process name that's already performing some work, concurrently, on the
	// requested machine.
	ErrWorkConflict = errors.New("conflicting work on machine")
)
112
113// maintainHeartbeat will attempt to repeatedly poke the session at a frequency
114// twice of that of the minimum frequency mandated by the configured 5-second
115// interval. It will exit if it detects that the session cannot be maintained
116// anymore, canceling the session's internal context and causing future
117// Transact/Work calls to fail.
118func (s *Session) maintainHeartbeat(ctx context.Context) {
119 // Internal deadline, used to check whether we haven't dropped the ball on
120 // performing the updates due to a lot of transient errors.
121 deadline := time.Now().Add(s.interval)
122 for {
123 if ctx.Err() != nil {
124 klog.Infof("Session %s: context over, exiting: %v", s.UUID, ctx.Err())
125 return
126 }
127
128 err := s.Transact(ctx, func(q *model.Queries) error {
129 sessions, err := q.SessionCheck(ctx, s.UUID)
130 if err != nil {
131 return fmt.Errorf("when retrieving session: %w", err)
132 }
133 if len(sessions) < 1 {
Serge Bazanski42f13462023-04-19 15:00:06 +0200134 return s.expire()
Serge Bazanski35e8d792022-10-11 11:32:30 +0200135 }
136 err = q.SessionPoke(ctx, s.UUID)
137 if err != nil {
138 return fmt.Errorf("when poking session: %w", err)
139 }
140 return nil
141 })
142 if err != nil {
143 klog.Errorf("Session %s: update failed: %v", s.UUID, err)
144 if errors.Is(err, ErrSessionExpired) || deadline.After(time.Now()) {
145 // No way to recover.
146 klog.Errorf("Session %s: exiting", s.UUID)
147 s.ctxC()
148 return
149 }
150 // Just retry in a bit. One second seems about right for a 5 second interval.
151 //
152 // TODO(q3k): calculate this based on the configured interval.
153 time.Sleep(time.Second)
154 }
155 // Success. Keep going.
156 deadline = time.Now().Add(s.interval)
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100157 select {
158 case <-ctx.Done():
159 // Do nothing, next loop iteration will exit.
160 case <-time.After(s.interval / 2):
161 // Do nothing, next loop iteration will heartbeat.
162 }
Serge Bazanski35e8d792022-10-11 11:32:30 +0200163 }
164}
165
166// Transact runs a given function in the context of both a CockroachDB and BMDB
167// transaction, retrying as necessary.
168//
169// Most pure (meaning without side effects outside the database itself) BMDB
170// transactions should be run this way.
171func (s *Session) Transact(ctx context.Context, fn func(q *model.Queries) error) error {
Serge Bazanskic50f6942023-04-24 18:27:22 +0200172 var attempts int64
173
174 err := crdb.ExecuteTx(ctx, s.connection.db, nil, func(tx *sql.Tx) error {
175 attempts += 1
176 s.m.OnTransactionStarted(attempts)
177
Serge Bazanski35e8d792022-10-11 11:32:30 +0200178 qtx := model.New(tx)
179 sessions, err := qtx.SessionCheck(ctx, s.UUID)
180 if err != nil {
181 return fmt.Errorf("when retrieving session: %w", err)
182 }
183 if len(sessions) < 1 {
Serge Bazanski42f13462023-04-19 15:00:06 +0200184 return s.expire()
Serge Bazanski35e8d792022-10-11 11:32:30 +0200185 }
186
187 if err := fn(qtx); err != nil {
188 return err
189 }
190
191 return nil
192 })
Serge Bazanskic50f6942023-04-24 18:27:22 +0200193 if err != nil {
194 s.m.OnTransactionFailed()
195 }
196 return err
Serge Bazanski35e8d792022-10-11 11:32:30 +0200197}
198
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100199var (
200 ErrNothingToDo = errors.New("nothing to do")
201 // PostgresUniqueViolation is returned by the lib/pq driver when a mutation
202 // cannot be performed due to a UNIQUE constraint being violated as a result of
203 // the query.
204 postgresUniqueViolation = pq.ErrorCode("23505")
205)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200206
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100207// Work starts work on a machine. Full work execution is performed in three
208// phases:
209//
210// 1. Retrieval phase. This is performed by 'fn' given to this function.
211// The retrieval function must return zero or more machines that some work
212// should be performed on per the BMDB. The first returned machine will be
213// locked for work under the given process and made available in the Work
214// structure returned by this call. The function may be called multiple times,
215// as it's run within a CockroachDB transaction which may be retried an
216// arbitrary number of times. Thus, it should be side-effect free, ideally only
217// performing read queries to the database.
218// 2. Work phase. This is performed by user code while holding on to the Work
219// structure instance.
220// 3. Commit phase. This is performed by the function passed to Work.Finish. See
221// that method's documentation for more details.
222//
223// Important: after retrieving Work successfully, either Finish or Cancel must be
224// called, otherwise the machine will be locked until the parent session expires
225// or is closed! It's safe and recommended to `defer work.Close()` after calling
226// Work().
227//
228// If no machine is eligible for work, ErrNothingToDo should be returned by the
229// retrieval function, and the same error (wrapped) will be returned by Work. In
230// case the retrieval function returns no machines and no error, that error will
231// also be returned.
232//
233// The returned Work object is _not_ goroutine safe.
234func (s *Session) Work(ctx context.Context, process model.Process, fn func(q *model.Queries) ([]uuid.UUID, error)) (*Work, error) {
235 var mid *uuid.UUID
Serge Bazanski20312b42023-04-19 13:49:47 +0200236 var exisingingBackoff *existingBackoff
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100237 err := s.Transact(ctx, func(q *model.Queries) error {
238 mids, err := fn(q)
239 if err != nil {
240 return fmt.Errorf("could not retrieve machines for work: %w", err)
241 }
242 if len(mids) < 1 {
243 return ErrNothingToDo
244 }
245 mid = &mids[0]
246 err = q.StartWork(ctx, model.StartWorkParams{
247 MachineID: mids[0],
Serge Bazanski35e8d792022-10-11 11:32:30 +0200248 SessionID: s.UUID,
249 Process: process,
250 })
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100251 if err != nil {
252 var perr *pq.Error
253 if errors.As(err, &perr) && perr.Code == postgresUniqueViolation {
254 return ErrWorkConflict
255 }
256 return fmt.Errorf("could not start work on %q: %w", mids[0], err)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200257 }
Serge Bazanskia9580a72023-01-12 14:44:35 +0100258 err = q.WorkHistoryInsert(ctx, model.WorkHistoryInsertParams{
259 MachineID: mids[0],
260 Event: model.WorkHistoryEventStarted,
261 Process: process,
262 })
263 if err != nil {
264 return fmt.Errorf("could not insert history event: %w", err)
265 }
Serge Bazanski20312b42023-04-19 13:49:47 +0200266 backoffs, err := q.WorkBackoffOf(ctx, model.WorkBackoffOfParams{
267 MachineID: mids[0],
268 Process: process,
269 })
270 if err != nil {
271 return fmt.Errorf("could not get backoffs: %w", err)
272 }
273 if len(backoffs) > 0 {
274 // If the backoff exists but the last interval is null (e.g. is from a previous
275 // version of the schema when backoffs had no interval data) pretend it doesn't
276 // exist. Then the backoff mechanism can restart from a clean slate and populate
277 // a new, full backoff row.
278 if backoff := backoffs[0]; backoff.LastIntervalSeconds.Valid {
279 klog.Infof("Existing backoff: %d seconds", backoff.LastIntervalSeconds.Int64)
280 exisingingBackoff = &existingBackoff{
281 lastInterval: time.Second * time.Duration(backoff.LastIntervalSeconds.Int64),
282 }
283 }
284 }
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100285 return nil
286 })
287 if err != nil {
288 return nil, err
289 }
Serge Bazanskic50f6942023-04-24 18:27:22 +0200290 w := &Work{
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100291 Machine: *mid,
292 s: s,
293 process: process,
Serge Bazanski20312b42023-04-19 13:49:47 +0200294 backoff: exisingingBackoff,
Serge Bazanskic50f6942023-04-24 18:27:22 +0200295 m: s.m.WithProcess(process),
296 }
297 w.m.OnWorkStarted()
298 klog.Infof("Started work %q on machine %q (sess %q)", process, *mid, s.UUID)
299 return w, nil
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100300}
Serge Bazanski35e8d792022-10-11 11:32:30 +0200301
// existingBackoff contains backoff information retrieved from a work item that
// has previously failed with a backoff. It is populated by Session.Work from
// the persisted backoff of the machine/process pair, if there is one.
type existingBackoff struct {
	// lastInterval is the last interval as stored in the backoff table.
	lastInterval time.Duration
}
308
// Backoff describes the configuration of backoff for a failed work item. It can
// be passed to Work.Fail to cause an item to not be processed again (to be 'in
// backoff') for a given period of time. Exponential backoff can be configured so
// that subsequent failures of a process will have exponentially increasing
// backoff periods, up to some maximum length.
//
// The underlying unit of backoff period length in the database is one second.
// What that means is that all effective calculated backoff periods must be an
// integer number of seconds. This is performed by always rounding up this period
// to the nearest second. A side effect of this is that with exponential backoff,
// non-integer exponents will be less precisely applied for small backoff values,
// e.g. an exponent of 1.1 with initial backoff of 1s will generate the following
// sequence of backoff periods:
//
//	1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 13, 15, 17
//
// Which corresponds to the following approximate multipliers in between periods:
//
//	2.00, 1.50, 1.33, 1.25, 1.20, 1.17, 1.14, 1.12, 1.11, 1.10, 1.18, 1.15, 1.13
//
// Thus, the exponent value should be treated more as a limit that the sequence
// of periods will approach than a hard rule for calculating the periods.
// However, if the exponent is larger than 1 (i.e. any time exponential backoff
// is requested), this guarantees that the backoff won't get 'stuck' on a
// repeated period value due to a rounding error.
//
// A zero backoff structure is valid and represents a non-exponential backoff of
// one second.
//
// A partially filled structure is also valid. See the field comments for more
// information about how fields are capped if not set. The described behaviour
// allows for two useful shorthands:
//
//  1. If only Initial is set, then the backoff is non-exponential and will always
//     be of value Initial, regardless of the previous period already persisted
//     in the database.
//  2. If only Maximum and Exponent are set, the backoff will be exponential,
//     starting at one second, and exponentially increasing to Maximum.
//
// It is recommended to define Backoff structures once, as package-level
// values, and treat them as read-only 'descriptors', one per work kind /
// process.
//
// One feature currently missing from the Backoff implementation is jitter. This
// might be introduced in the future if deemed necessary.
type Backoff struct {
	// Initial backoff period, used for the backoff if this item failed for the first
	// time (i.e. has not had a Finish call in between two Fail calls).
	//
	// Subsequent calls will ignore this field if the backoff is exponential. If
	// non-exponential, the initial time will always override whatever was previously
	// persisted in the database, i.e. the backoff will always be of value 'Initial'.
	//
	// Cannot be lower than one second. If it is, it will be raised to one second.
	Initial time.Duration `u:"initial"`

	// Maximum time for backoff. If the calculation of the next backoff period
	// (based on the Exponent and last backoff value) exceeds this maximum, it will
	// be capped to it.
	//
	// Maximum is not persisted in the database. Instead, it is always read from this
	// structure.
	//
	// Cannot be lower than Initial. If it is, it will be raised to Initial.
	Maximum time.Duration `u:"maximum"`

	// Exponent used for next backoff calculation. Any time a work item fails
	// directly after another failure, the previous backoff period will be multiplied
	// by the exponent to yield the new backoff period. The new period will then be
	// capped to Maximum.
	//
	// Exponent is not persisted in the database. Instead, it is always read from
	// this structure.
	//
	// Cannot be lower than 1.0. If it is, it will be raised to 1.0.
	Exponent float64 `u:"exponent"`
}
385
386// normalized copies the given backoff and returns a 'normalized' version of it,
387// with the 'when zero/unset' rules described in the Backoff documentation
388// strings.
389func (b *Backoff) normalized() *Backoff {
390 c := *b
391
392 if c.Exponent < 1.0 {
393 c.Exponent = 1.0
394 }
395 if c.Initial < time.Second {
396 c.Initial = time.Second
397 }
398 if c.Maximum < c.Initial {
399 c.Maximum = c.Initial
400 }
401 return &c
402}
403
404func (b *Backoff) simple() bool {
405 // Non-normalized simple backoffs will have a zero exponent.
406 if b.Exponent == 0.0 {
407 return true
408 }
409 // Normalized simple backoffs will have a 1.0 exponent.
410 if b.Exponent == 1.0 {
411 return true
412 }
413 return false
414}
415
416// next calculates the backoff period based on a backoff descriptor and previous
417// existing backoff information. Both or either can be nil.
418func (b *Backoff) next(e *existingBackoff) int64 {
419 second := time.Second.Nanoseconds()
420
421 // Minimum interval is one second. Start with that.
422 last := second
423 // Then, if we have a previous interval, and it's greater than a second, use that
424 // as the last interval.
425 if e != nil {
426 if previous := e.lastInterval.Nanoseconds(); previous > second {
427 last = previous
428 }
429 }
430
431 // If no backoff is configured, go with either the minimum of one second, or
432 // whatever the last previous interval was.
433 if b == nil {
434 return last / second
435 }
436
437 // Make a copy of the backoff descriptor, normalizing as necessary.
438 c := b.normalized()
439
440 // Simple backoffs always return Initial.
441 if b.simple() {
442 return c.Initial.Nanoseconds() / second
443 }
444
445 // If there is no existing backoff, return the initial backoff value directly.
446 if e == nil {
447 return c.Initial.Nanoseconds() / second
448 }
449
450 // Start out with the persisted interval.
451 next := last
452 // If by any chance we persisted an interval less than one second, clamp it.
453 if next < second {
454 next = second
455 }
456
457 // Multiply by exponent from descriptor.
458 next = int64(float64(next) * c.Exponent)
459
460 // Handle overflows. If multiplying by a positive number resulted in a lower
461 // value than what we started with, it means we overflowed and wrapped around. If
462 // so, clamp to maximum.
463 if next < last {
464 next = c.Maximum.Nanoseconds()
465 }
466
467 // Clamp to maximum.
468 if next > c.Maximum.Nanoseconds() {
469 next = c.Maximum.Nanoseconds()
470 }
471 // Round up to the nearest second.
472 if next%second == 0 {
473 return next / second
474 } else {
475 return next/second + 1
476 }
477}
478
// Work being performed on a machine.
type Work struct {
	// Machine that this work is being performed on, as retrieved by the retrieval
	// function passed to the Work method.
	Machine uuid.UUID
	// s is the parent session.
	s *Session
	// done marks that this work has already been canceled, finished or failed.
	done bool
	// process that this work performs.
	process model.Process

	// backoff is the backoff that was already persisted for this machine and
	// process when the work was started, or nil if there was none. It is used
	// by Fail to calculate the next backoff period.
	backoff *existingBackoff

	// m records metrics for this work's process.
	m *metrics.ProcessRecorder
}
495
496// Cancel the Work started on a machine. If the work has already been finished
497// or canceled, this is a no-op. In case of error, a log line will be emitted.
498func (w *Work) Cancel(ctx context.Context) {
499 if w.done {
500 return
501 }
502 w.done = true
Serge Bazanskic50f6942023-04-24 18:27:22 +0200503 w.m.OnWorkFinished(metrics.WorkResultCanceled)
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100504
505 klog.Infof("Canceling work %q on machine %q (sess %q)", w.process, w.Machine, w.s.UUID)
506 // Eat error and log. There's nothing we can do if this fails, and if it does, it's
507 // probably because our connectivity to the BMDB has failed. If so, our session
508 // will be invalidated soon and so will the work being performed on this
509 // machine.
510 err := w.s.Transact(ctx, func(q *model.Queries) error {
Serge Bazanskia9580a72023-01-12 14:44:35 +0100511 err := q.FinishWork(ctx, model.FinishWorkParams{
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100512 MachineID: w.Machine,
513 SessionID: w.s.UUID,
514 Process: w.process,
515 })
Serge Bazanskia9580a72023-01-12 14:44:35 +0100516 if err != nil {
517 return err
518 }
519 return q.WorkHistoryInsert(ctx, model.WorkHistoryInsertParams{
520 MachineID: w.Machine,
521 Process: w.process,
522 Event: model.WorkHistoryEventCanceled,
523 })
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100524 })
525 if err != nil {
526 klog.Errorf("Failed to cancel work %q on %q (sess %q): %v", w.process, w.Machine, w.s.UUID, err)
527 }
528}
529
530// Finish work by executing a commit function 'fn' and releasing the machine
531// from the work performed. The function given should apply tags to the
532// processed machine in a way that causes it to not be eligible for retrieval
533// again. As with the retriever function, the commit function might be called an
534// arbitrary number of times as part of cockroachdb transaction retries.
535//
536// This may be called only once.
537func (w *Work) Finish(ctx context.Context, fn func(q *model.Queries) error) error {
538 if w.done {
539 return fmt.Errorf("already finished")
540 }
541 w.done = true
Serge Bazanskic50f6942023-04-24 18:27:22 +0200542 w.m.OnWorkFinished(metrics.WorkResultFinished)
543
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100544 klog.Infof("Finishing work %q on machine %q (sess %q)", w.process, w.Machine, w.s.UUID)
545 return w.s.Transact(ctx, func(q *model.Queries) error {
546 err := q.FinishWork(ctx, model.FinishWorkParams{
547 MachineID: w.Machine,
548 SessionID: w.s.UUID,
549 Process: w.process,
550 })
551 if err != nil {
552 return err
553 }
Serge Bazanski20312b42023-04-19 13:49:47 +0200554 err = q.WorkBackoffDelete(ctx, model.WorkBackoffDeleteParams{
555 MachineID: w.Machine,
556 Process: w.process,
557 })
558 if err != nil {
559 return err
560 }
Serge Bazanskia9580a72023-01-12 14:44:35 +0100561 err = q.WorkHistoryInsert(ctx, model.WorkHistoryInsertParams{
562 MachineID: w.Machine,
563 Process: w.process,
564 Event: model.WorkHistoryEventFinished,
565 })
566 if err != nil {
567 return err
568 }
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100569 return fn(q)
570 })
Serge Bazanski35e8d792022-10-11 11:32:30 +0200571}
Serge Bazanskia9580a72023-01-12 14:44:35 +0100572
Serge Bazanski20312b42023-04-19 13:49:47 +0200573// Fail work and introduce backoff. The given cause is an operator-readable
Serge Bazanskia9580a72023-01-12 14:44:35 +0100574// string that will be persisted alongside the backoff and the work history/audit
575// table.
Serge Bazanski20312b42023-04-19 13:49:47 +0200576//
577// The backoff describes a period during which the same process will not be
578// retried on this machine until its expiration.
579//
580// The given backoff is a structure which describes both the initial backoff
581// period if the work failed for the first time, and a mechanism to exponentially
582// increase the backoff period if that work failed repeatedly. The work is
583// defined to have failed repeatedly if it only resulted in Cancel/Fail calls
584// without any Finish calls in the meantime.
585//
586// Only the last backoff period is persisted in the database. The exponential
587// backoff behaviour (including its maximum time) is always calculated based on
588// the given backoff structure.
589//
590// If nil, the backoff defaults to a non-exponential, one second backoff. This is
591// the minimum designed to keep the system chugging along without repeatedly
592// trying a failed job in a loop. However, the backoff should generally be set to
593// some well engineered value to prevent spurious retries.
594func (w *Work) Fail(ctx context.Context, backoff *Backoff, cause string) error {
Serge Bazanskia9580a72023-01-12 14:44:35 +0100595 if w.done {
596 return fmt.Errorf("already finished")
597 }
598 w.done = true
Serge Bazanskic50f6942023-04-24 18:27:22 +0200599 w.m.OnWorkFinished(metrics.WorkResultFailed)
Serge Bazanskia9580a72023-01-12 14:44:35 +0100600
601 return w.s.Transact(ctx, func(q *model.Queries) error {
602 err := q.FinishWork(ctx, model.FinishWorkParams{
603 MachineID: w.Machine,
604 SessionID: w.s.UUID,
605 Process: w.process,
606 })
607 if err != nil {
608 return err
609 }
610 err = q.WorkHistoryInsert(ctx, model.WorkHistoryInsertParams{
611 MachineID: w.Machine,
612 Process: w.process,
613 Event: model.WorkHistoryEventFailed,
614 FailedCause: sql.NullString{
615 String: cause,
616 Valid: true,
617 },
618 })
619 if err != nil {
620 return err
621 }
Serge Bazanski20312b42023-04-19 13:49:47 +0200622 if backoff == nil {
623 klog.Warningf("Nil backoff for %q on machine %q: defaulting to one second non-exponential.", w.process, w.Machine)
Serge Bazanskia9580a72023-01-12 14:44:35 +0100624 }
Serge Bazanski20312b42023-04-19 13:49:47 +0200625 seconds := backoff.next(w.backoff)
626 klog.Infof("Adding backoff for %q on machine %q: %d seconds", w.process, w.Machine, seconds)
627 return q.WorkBackoffInsert(ctx, model.WorkBackoffInsertParams{
628 MachineID: w.Machine,
629 Process: w.process,
630 Cause: cause,
631 Seconds: seconds,
632 })
Serge Bazanskia9580a72023-01-12 14:44:35 +0100633 })
634}