blob: 22ea3fd08cf481ecd46cf4878e3c21523e70a876 [file] [log] [blame]
Tim Windelschmidt6d33a432025-02-04 14:34:25 +01001// Copyright The Monogon Project Authors.
2// SPDX-License-Identifier: Apache-2.0
3
Serge Bazanski35e8d792022-10-11 11:32:30 +02004package bmdb
5
6import (
7 "context"
8 "database/sql"
9 "errors"
10 "fmt"
11 "time"
12
13 "github.com/cockroachdb/cockroach-go/v2/crdb"
14 "github.com/google/uuid"
15 "github.com/lib/pq"
16 "k8s.io/klog/v2"
17
Serge Bazanskic50f6942023-04-24 18:27:22 +020018 "source.monogon.dev/cloud/bmaas/bmdb/metrics"
Serge Bazanski35e8d792022-10-11 11:32:30 +020019 "source.monogon.dev/cloud/bmaas/bmdb/model"
Serge Bazanski35e8d792022-10-11 11:32:30 +020020)
21
Serge Bazanski35e8d792022-10-11 11:32:30 +020022// StartSession creates a new BMDB session which will be maintained in a
23// background goroutine as long as the given context is valid. Each Session is
24// represented by an entry in a sessions table within the BMDB, and subsequent
25// Transact calls emit SQL transactions which depend on that entry still being
26// present and up to date. A garbage collection system (to be implemented) will
27// remove expired sessions from the BMDB, but this mechanism is not necessary
28// for the session expiry mechanism to work.
29//
30// When the session becomes invalid (for example due to network partition),
31// subsequent attempts to call Transact will fail with ErrSessionExpired. This
32// means that the caller within the component is responsible for recreating a
33// new Session if a previously used one expires.
Serge Bazanskic50f6942023-04-24 18:27:22 +020034func (c *Connection) StartSession(ctx context.Context, opts ...SessionOption) (*Session, error) {
Serge Bazanski35e8d792022-10-11 11:32:30 +020035 intervalSeconds := 5
36
37 res, err := model.New(c.db).NewSession(ctx, model.NewSessionParams{
38 SessionComponentName: c.bmdb.ComponentName,
39 SessionRuntimeInfo: c.bmdb.RuntimeInfo,
40 SessionIntervalSeconds: int64(intervalSeconds),
41 })
42 if err != nil {
43 return nil, fmt.Errorf("creating session failed: %w", err)
44 }
45
46 klog.Infof("Started session %s", res.SessionID)
47
48 ctx2, ctxC := context.WithCancel(ctx)
49
Serge Bazanskic50f6942023-04-24 18:27:22 +020050 var processor metrics.Processor
51 for _, opt := range opts {
52 if opt.Processor != "" {
53 processor = opt.Processor
54 }
55 }
56
Serge Bazanski35e8d792022-10-11 11:32:30 +020057 s := &Session{
58 connection: c,
59 interval: time.Duration(intervalSeconds) * time.Second,
60
61 UUID: res.SessionID,
62
63 ctx: ctx2,
64 ctxC: ctxC,
Serge Bazanskic50f6942023-04-24 18:27:22 +020065 m: c.bmdb.metrics.Recorder(processor),
Serge Bazanski35e8d792022-10-11 11:32:30 +020066 }
Serge Bazanskic50f6942023-04-24 18:27:22 +020067 s.m.OnSessionStarted()
Serge Bazanski35e8d792022-10-11 11:32:30 +020068 go s.maintainHeartbeat(ctx2)
69 return s, nil
70}
71
Serge Bazanskic50f6942023-04-24 18:27:22 +020072type SessionOption struct {
73 Processor metrics.Processor
74}
75
Serge Bazanski35e8d792022-10-11 11:32:30 +020076// Session is a session (identified by UUID) that has been started in the BMDB.
77// Its liveness is maintained by a background goroutine, and as long as that
78// session is alive, it can perform transactions and work on the BMDB.
79type Session struct {
80 connection *Connection
81 interval time.Duration
82
83 UUID uuid.UUID
84
85 ctx context.Context
86 ctxC context.CancelFunc
Serge Bazanskic50f6942023-04-24 18:27:22 +020087
88 m *metrics.ProcessorRecorder
Serge Bazanski35e8d792022-10-11 11:32:30 +020089}
90
Serge Bazanski42f13462023-04-19 15:00:06 +020091// Expired returns true if this session is expired and will fail all subsequent
92// transactions/work.
93func (s *Session) Expired() bool {
94 return s.ctx.Err() != nil
95}
96
97// expire is a helper which marks this session as expired and returns
98// ErrSessionExpired.
99func (s *Session) expire() error {
100 s.ctxC()
101 return ErrSessionExpired
102}
103
// ErrSessionExpired is returned when attempting to Transact or Work on a
// Session that has expired or been canceled. Once a Session starts returning
// this error it must be replaced by a new one from StartSession, as no further
// calls on it will succeed.
var ErrSessionExpired = errors.New("session expired")

// ErrWorkConflict is returned when attempting to Work on a Session with a
// process name that is already performing some work, concurrently, on the
// requested machine.
var ErrWorkConflict = errors.New("conflicting work on machine")
115
116// maintainHeartbeat will attempt to repeatedly poke the session at a frequency
117// twice of that of the minimum frequency mandated by the configured 5-second
118// interval. It will exit if it detects that the session cannot be maintained
119// anymore, canceling the session's internal context and causing future
120// Transact/Work calls to fail.
121func (s *Session) maintainHeartbeat(ctx context.Context) {
122 // Internal deadline, used to check whether we haven't dropped the ball on
123 // performing the updates due to a lot of transient errors.
124 deadline := time.Now().Add(s.interval)
125 for {
126 if ctx.Err() != nil {
127 klog.Infof("Session %s: context over, exiting: %v", s.UUID, ctx.Err())
128 return
129 }
130
131 err := s.Transact(ctx, func(q *model.Queries) error {
132 sessions, err := q.SessionCheck(ctx, s.UUID)
133 if err != nil {
134 return fmt.Errorf("when retrieving session: %w", err)
135 }
136 if len(sessions) < 1 {
Serge Bazanski42f13462023-04-19 15:00:06 +0200137 return s.expire()
Serge Bazanski35e8d792022-10-11 11:32:30 +0200138 }
139 err = q.SessionPoke(ctx, s.UUID)
140 if err != nil {
141 return fmt.Errorf("when poking session: %w", err)
142 }
143 return nil
144 })
145 if err != nil {
146 klog.Errorf("Session %s: update failed: %v", s.UUID, err)
147 if errors.Is(err, ErrSessionExpired) || deadline.After(time.Now()) {
148 // No way to recover.
149 klog.Errorf("Session %s: exiting", s.UUID)
150 s.ctxC()
151 return
152 }
153 // Just retry in a bit. One second seems about right for a 5 second interval.
154 //
155 // TODO(q3k): calculate this based on the configured interval.
156 time.Sleep(time.Second)
157 }
158 // Success. Keep going.
159 deadline = time.Now().Add(s.interval)
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100160 select {
161 case <-ctx.Done():
162 // Do nothing, next loop iteration will exit.
163 case <-time.After(s.interval / 2):
164 // Do nothing, next loop iteration will heartbeat.
165 }
Serge Bazanski35e8d792022-10-11 11:32:30 +0200166 }
167}
168
169// Transact runs a given function in the context of both a CockroachDB and BMDB
170// transaction, retrying as necessary.
171//
172// Most pure (meaning without side effects outside the database itself) BMDB
173// transactions should be run this way.
174func (s *Session) Transact(ctx context.Context, fn func(q *model.Queries) error) error {
Serge Bazanskic50f6942023-04-24 18:27:22 +0200175 var attempts int64
176
177 err := crdb.ExecuteTx(ctx, s.connection.db, nil, func(tx *sql.Tx) error {
178 attempts += 1
179 s.m.OnTransactionStarted(attempts)
180
Serge Bazanski35e8d792022-10-11 11:32:30 +0200181 qtx := model.New(tx)
182 sessions, err := qtx.SessionCheck(ctx, s.UUID)
183 if err != nil {
184 return fmt.Errorf("when retrieving session: %w", err)
185 }
186 if len(sessions) < 1 {
Serge Bazanski42f13462023-04-19 15:00:06 +0200187 return s.expire()
Serge Bazanski35e8d792022-10-11 11:32:30 +0200188 }
189
190 if err := fn(qtx); err != nil {
191 return err
192 }
193
194 return nil
195 })
Serge Bazanskic50f6942023-04-24 18:27:22 +0200196 if err != nil {
197 s.m.OnTransactionFailed()
198 }
199 return err
Serge Bazanski35e8d792022-10-11 11:32:30 +0200200}
201
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100202var (
203 ErrNothingToDo = errors.New("nothing to do")
204 // PostgresUniqueViolation is returned by the lib/pq driver when a mutation
205 // cannot be performed due to a UNIQUE constraint being violated as a result of
206 // the query.
207 postgresUniqueViolation = pq.ErrorCode("23505")
208)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200209
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100210// Work starts work on a machine. Full work execution is performed in three
211// phases:
212//
213// 1. Retrieval phase. This is performed by 'fn' given to this function.
214// The retrieval function must return zero or more machines that some work
215// should be performed on per the BMDB. The first returned machine will be
216// locked for work under the given process and made available in the Work
217// structure returned by this call. The function may be called multiple times,
218// as it's run within a CockroachDB transaction which may be retried an
219// arbitrary number of times. Thus, it should be side-effect free, ideally only
220// performing read queries to the database.
221// 2. Work phase. This is performed by user code while holding on to the Work
222// structure instance.
223// 3. Commit phase. This is performed by the function passed to Work.Finish. See
224// that method's documentation for more details.
225//
226// Important: after retrieving Work successfully, either Finish or Cancel must be
227// called, otherwise the machine will be locked until the parent session expires
228// or is closed! It's safe and recommended to `defer work.Close()` after calling
229// Work().
230//
231// If no machine is eligible for work, ErrNothingToDo should be returned by the
232// retrieval function, and the same error (wrapped) will be returned by Work. In
233// case the retrieval function returns no machines and no error, that error will
234// also be returned.
235//
236// The returned Work object is _not_ goroutine safe.
237func (s *Session) Work(ctx context.Context, process model.Process, fn func(q *model.Queries) ([]uuid.UUID, error)) (*Work, error) {
238 var mid *uuid.UUID
Serge Bazanski20312b42023-04-19 13:49:47 +0200239 var exisingingBackoff *existingBackoff
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100240 err := s.Transact(ctx, func(q *model.Queries) error {
241 mids, err := fn(q)
242 if err != nil {
243 return fmt.Errorf("could not retrieve machines for work: %w", err)
244 }
245 if len(mids) < 1 {
246 return ErrNothingToDo
247 }
248 mid = &mids[0]
249 err = q.StartWork(ctx, model.StartWorkParams{
250 MachineID: mids[0],
Serge Bazanski35e8d792022-10-11 11:32:30 +0200251 SessionID: s.UUID,
252 Process: process,
253 })
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100254 if err != nil {
255 var perr *pq.Error
256 if errors.As(err, &perr) && perr.Code == postgresUniqueViolation {
257 return ErrWorkConflict
258 }
259 return fmt.Errorf("could not start work on %q: %w", mids[0], err)
Serge Bazanski35e8d792022-10-11 11:32:30 +0200260 }
Serge Bazanskia9580a72023-01-12 14:44:35 +0100261 err = q.WorkHistoryInsert(ctx, model.WorkHistoryInsertParams{
262 MachineID: mids[0],
263 Event: model.WorkHistoryEventStarted,
264 Process: process,
265 })
266 if err != nil {
267 return fmt.Errorf("could not insert history event: %w", err)
268 }
Serge Bazanski20312b42023-04-19 13:49:47 +0200269 backoffs, err := q.WorkBackoffOf(ctx, model.WorkBackoffOfParams{
270 MachineID: mids[0],
271 Process: process,
272 })
273 if err != nil {
274 return fmt.Errorf("could not get backoffs: %w", err)
275 }
276 if len(backoffs) > 0 {
277 // If the backoff exists but the last interval is null (e.g. is from a previous
278 // version of the schema when backoffs had no interval data) pretend it doesn't
279 // exist. Then the backoff mechanism can restart from a clean slate and populate
280 // a new, full backoff row.
281 if backoff := backoffs[0]; backoff.LastIntervalSeconds.Valid {
282 klog.Infof("Existing backoff: %d seconds", backoff.LastIntervalSeconds.Int64)
283 exisingingBackoff = &existingBackoff{
284 lastInterval: time.Second * time.Duration(backoff.LastIntervalSeconds.Int64),
285 }
286 }
287 }
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100288 return nil
289 })
290 if err != nil {
291 return nil, err
292 }
Serge Bazanskic50f6942023-04-24 18:27:22 +0200293 w := &Work{
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100294 Machine: *mid,
295 s: s,
296 process: process,
Serge Bazanski20312b42023-04-19 13:49:47 +0200297 backoff: exisingingBackoff,
Serge Bazanskic50f6942023-04-24 18:27:22 +0200298 m: s.m.WithProcess(process),
299 }
300 w.m.OnWorkStarted()
301 klog.Infof("Started work %q on machine %q (sess %q)", process, *mid, s.UUID)
302 return w, nil
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100303}
Serge Bazanski35e8d792022-10-11 11:32:30 +0200304
// existingBackoff contains backoff information retrieved from a work item that
// has previously failed with a backoff.
type existingBackoff struct {
	// lastInterval is the last interval as stored in the backoff table.
	lastInterval time.Duration
}

// Backoff describes the configuration of backoff for a failed work item. It can
// be passed to Work.Fail to cause an item to not be processed again (to be 'in
// backoff') for a given period of time. Exponential backoff can be configured so
// that subsequent failures of a process will have exponentially increasing
// backoff periods, up to some maximum length.
//
// The underlying unit of backoff period length in the database is one second.
// What that means is that all effective calculated backoff periods must be an
// integer number of seconds. This is performed by always rounding up this period
// to the nearest second. A side effect of this is that with exponential backoff,
// non-integer exponents will be less precisely applied for small backoff values,
// e.g. an exponent of 1.1 with initial backoff of 1s will generate the following
// sequence of backoff periods:
//
//	1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 13, 15, 17
//
// Which corresponds to the following approximate multipliers in between periods:
//
//	2.00, 1.50, 1.33, 1.25, 1.20, 1.17, 1.14, 1.12, 1.11, 1.10, 1.18, 1.15, 1.13
//
// Thus, the exponent value should be treated more as a limit that the sequence
// of periods will approach than a hard rule for calculating the periods.
// However, if the exponent is larger than 1 (i.e. any time exponential backoff
// is requested), this guarantees that the backoff won't get 'stuck' on a
// repeated period value due to a rounding error.
//
// A zero backoff structure is valid and represents a non-exponential backoff of
// one second.
//
// A partially filled structure is also valid. See the field comments for more
// information about how fields are capped if not set. The described behaviour
// allows for two useful shorthands:
//
//  1. If only Initial is set, then the backoff is non-exponential and will always
//     be of value Initial.
//  2. If only Maximum and Exponent are set, the backoff will be exponential,
//     starting at one second, and exponentially increasing to Maximum.
//
// It is recommended to construct Backoff structures once and treat them as
// read-only 'descriptors', one per work kind / process.
//
// One feature currently missing from the Backoff implementation is jitter. This
// might be introduced in the future if deemed necessary.
type Backoff struct {
	// Initial backoff period, used for the backoff if this item failed for the first
	// time (i.e. has not had a Finish call in between two Fail calls).
	//
	// Subsequent calls will ignore this field if the backoff is exponential. If
	// non-exponential, the initial time will always override whatever was previously
	// persisted in the database, i.e. the backoff will always be of value 'Initial'.
	//
	// Cannot be lower than one second. If it is, it will be capped to it.
	Initial time.Duration `u:"initial"`

	// Maximum time for backoff. If the calculation of the next back off period
	// (based on the Exponent and last backoff value) exceeds this maximum, it will
	// be capped to it.
	//
	// Maximum is not persisted in the database. Instead, it is always read from this
	// structure.
	//
	// Cannot be lower than Initial. If it is, it will be capped to it.
	Maximum time.Duration `u:"maximum"`

	// Exponent used for next backoff calculation. Any time a work item fails
	// directly after another failure, the previous backoff period will be multiplied
	// by the exponent to yield the new backoff period. The new period will then be
	// capped to Maximum.
	//
	// Exponent is not persisted in the database. Instead, it is always read from
	// this structure.
	//
	// Cannot be lower than 1.0. If it is, it will be capped to it.
	Exponent float64 `u:"exponent"`
}

// normalized copies the given backoff and returns a 'normalized' version of it,
// with the 'when zero/unset' rules described in the Backoff documentation
// strings applied: Exponent is raised to at least 1.0, Initial to at least one
// second and Maximum to at least Initial. The receiver is not modified.
func (b *Backoff) normalized() *Backoff {
	c := *b

	if c.Exponent < 1.0 {
		c.Exponent = 1.0
	}
	if c.Initial < time.Second {
		c.Initial = time.Second
	}
	if c.Maximum < c.Initial {
		c.Maximum = c.Initial
	}
	return &c
}

// simple returns true if this backoff is non-exponential, i.e. if its exponent
// is either unset (zero, for non-normalized descriptors) or exactly 1.0 (for
// normalized descriptors).
func (b *Backoff) simple() bool {
	switch b.Exponent {
	case 0.0, 1.0:
		return true
	default:
		return false
	}
}

// ceilSeconds converts a positive duration into a whole number of seconds,
// rounding up any fractional second.
func ceilSeconds(d time.Duration) int64 {
	secs := int64(d / time.Second)
	if d%time.Second != 0 {
		secs++
	}
	return secs
}

// next calculates the backoff period, in seconds, based on a backoff descriptor
// and previous existing backoff information. Both or either can be nil.
//
// The result is always at least one and is always rounded up to a full second.
func (b *Backoff) next(e *existingBackoff) int64 {
	// Minimum interval is one second. If we have a previous interval and it's
	// greater than a second, use that as the last interval instead.
	last := time.Second
	if e != nil && e.lastInterval > time.Second {
		last = e.lastInterval
	}

	// If no backoff is configured, go with either the minimum of one second, or
	// whatever the last previous interval was.
	if b == nil {
		return ceilSeconds(last)
	}

	// Make a copy of the backoff descriptor, normalizing as necessary.
	c := b.normalized()

	// Simple backoffs always return Initial. So do exponential backoffs which
	// have no existing backoff to build upon.
	if b.simple() || e == nil {
		return ceilSeconds(c.Initial)
	}

	// Multiply the persisted interval by the exponent from the descriptor. The
	// comparison against Maximum is performed in floating point, before
	// converting back to an integer: converting an out-of-range (or NaN) float
	// to an integer gives an implementation-specific result, so it cannot be
	// relied upon to detect overflow after the fact.
	scaled := float64(last) * c.Exponent
	next := c.Maximum
	if scaled < float64(c.Maximum) {
		next = time.Duration(scaled)
	}

	// Round up to the nearest second.
	return ceilSeconds(next)
}
481
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100482// Work being performed on a machine.
483type Work struct {
484 // Machine that this work is being performed on, as retrieved by the retrieval
485 // function passed to the Work method.
486 Machine uuid.UUID
487 // s is the parent session.
488 s *Session
489 // done marks that this work has already been canceled or finished.
490 done bool
491 // process that this work performs.
492 process model.Process
Serge Bazanski20312b42023-04-19 13:49:47 +0200493
494 backoff *existingBackoff
Serge Bazanskic50f6942023-04-24 18:27:22 +0200495
496 m *metrics.ProcessRecorder
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100497}
498
499// Cancel the Work started on a machine. If the work has already been finished
500// or canceled, this is a no-op. In case of error, a log line will be emitted.
501func (w *Work) Cancel(ctx context.Context) {
502 if w.done {
503 return
504 }
505 w.done = true
Serge Bazanskic50f6942023-04-24 18:27:22 +0200506 w.m.OnWorkFinished(metrics.WorkResultCanceled)
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100507
508 klog.Infof("Canceling work %q on machine %q (sess %q)", w.process, w.Machine, w.s.UUID)
509 // Eat error and log. There's nothing we can do if this fails, and if it does, it's
510 // probably because our connectivity to the BMDB has failed. If so, our session
511 // will be invalidated soon and so will the work being performed on this
512 // machine.
513 err := w.s.Transact(ctx, func(q *model.Queries) error {
Serge Bazanskia9580a72023-01-12 14:44:35 +0100514 err := q.FinishWork(ctx, model.FinishWorkParams{
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100515 MachineID: w.Machine,
516 SessionID: w.s.UUID,
517 Process: w.process,
518 })
Serge Bazanskia9580a72023-01-12 14:44:35 +0100519 if err != nil {
520 return err
521 }
522 return q.WorkHistoryInsert(ctx, model.WorkHistoryInsertParams{
523 MachineID: w.Machine,
524 Process: w.process,
525 Event: model.WorkHistoryEventCanceled,
526 })
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100527 })
528 if err != nil {
529 klog.Errorf("Failed to cancel work %q on %q (sess %q): %v", w.process, w.Machine, w.s.UUID, err)
530 }
531}
532
533// Finish work by executing a commit function 'fn' and releasing the machine
534// from the work performed. The function given should apply tags to the
535// processed machine in a way that causes it to not be eligible for retrieval
536// again. As with the retriever function, the commit function might be called an
537// arbitrary number of times as part of cockroachdb transaction retries.
538//
539// This may be called only once.
540func (w *Work) Finish(ctx context.Context, fn func(q *model.Queries) error) error {
541 if w.done {
542 return fmt.Errorf("already finished")
543 }
544 w.done = true
Serge Bazanskic50f6942023-04-24 18:27:22 +0200545 w.m.OnWorkFinished(metrics.WorkResultFinished)
546
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100547 klog.Infof("Finishing work %q on machine %q (sess %q)", w.process, w.Machine, w.s.UUID)
548 return w.s.Transact(ctx, func(q *model.Queries) error {
549 err := q.FinishWork(ctx, model.FinishWorkParams{
550 MachineID: w.Machine,
551 SessionID: w.s.UUID,
552 Process: w.process,
553 })
554 if err != nil {
555 return err
556 }
Serge Bazanski20312b42023-04-19 13:49:47 +0200557 err = q.WorkBackoffDelete(ctx, model.WorkBackoffDeleteParams{
558 MachineID: w.Machine,
559 Process: w.process,
560 })
561 if err != nil {
562 return err
563 }
Serge Bazanskia9580a72023-01-12 14:44:35 +0100564 err = q.WorkHistoryInsert(ctx, model.WorkHistoryInsertParams{
565 MachineID: w.Machine,
566 Process: w.process,
567 Event: model.WorkHistoryEventFinished,
568 })
569 if err != nil {
570 return err
571 }
Serge Bazanskibe6c3ad2022-12-12 15:11:39 +0100572 return fn(q)
573 })
Serge Bazanski35e8d792022-10-11 11:32:30 +0200574}
Serge Bazanskia9580a72023-01-12 14:44:35 +0100575
Serge Bazanski20312b42023-04-19 13:49:47 +0200576// Fail work and introduce backoff. The given cause is an operator-readable
Serge Bazanskia9580a72023-01-12 14:44:35 +0100577// string that will be persisted alongside the backoff and the work history/audit
578// table.
Serge Bazanski20312b42023-04-19 13:49:47 +0200579//
580// The backoff describes a period during which the same process will not be
581// retried on this machine until its expiration.
582//
583// The given backoff is a structure which describes both the initial backoff
584// period if the work failed for the first time, and a mechanism to exponentially
585// increase the backoff period if that work failed repeatedly. The work is
586// defined to have failed repeatedly if it only resulted in Cancel/Fail calls
587// without any Finish calls in the meantime.
588//
589// Only the last backoff period is persisted in the database. The exponential
590// backoff behaviour (including its maximum time) is always calculated based on
591// the given backoff structure.
592//
593// If nil, the backoff defaults to a non-exponential, one second backoff. This is
594// the minimum designed to keep the system chugging along without repeatedly
595// trying a failed job in a loop. However, the backoff should generally be set to
596// some well engineered value to prevent spurious retries.
597func (w *Work) Fail(ctx context.Context, backoff *Backoff, cause string) error {
Serge Bazanskia9580a72023-01-12 14:44:35 +0100598 if w.done {
599 return fmt.Errorf("already finished")
600 }
601 w.done = true
Serge Bazanskic50f6942023-04-24 18:27:22 +0200602 w.m.OnWorkFinished(metrics.WorkResultFailed)
Serge Bazanskia9580a72023-01-12 14:44:35 +0100603
604 return w.s.Transact(ctx, func(q *model.Queries) error {
605 err := q.FinishWork(ctx, model.FinishWorkParams{
606 MachineID: w.Machine,
607 SessionID: w.s.UUID,
608 Process: w.process,
609 })
610 if err != nil {
611 return err
612 }
613 err = q.WorkHistoryInsert(ctx, model.WorkHistoryInsertParams{
614 MachineID: w.Machine,
615 Process: w.process,
616 Event: model.WorkHistoryEventFailed,
617 FailedCause: sql.NullString{
618 String: cause,
619 Valid: true,
620 },
621 })
622 if err != nil {
623 return err
624 }
Serge Bazanski20312b42023-04-19 13:49:47 +0200625 if backoff == nil {
626 klog.Warningf("Nil backoff for %q on machine %q: defaulting to one second non-exponential.", w.process, w.Machine)
Serge Bazanskia9580a72023-01-12 14:44:35 +0100627 }
Serge Bazanski20312b42023-04-19 13:49:47 +0200628 seconds := backoff.next(w.backoff)
629 klog.Infof("Adding backoff for %q on machine %q: %d seconds", w.process, w.Machine, seconds)
630 return q.WorkBackoffInsert(ctx, model.WorkBackoffInsertParams{
631 MachineID: w.Machine,
632 Process: w.process,
633 Cause: cause,
634 Seconds: seconds,
635 })
Serge Bazanskia9580a72023-01-12 14:44:35 +0100636 })
637}