package manager

import (
	"context"
	"flag"
	"fmt"
	"time"

	"k8s.io/klog/v2"

	"source.monogon.dev/cloud/bmaas/bmdb"
	"source.monogon.dev/cloud/bmaas/bmdb/model"
	ecl "source.monogon.dev/cloud/shepherd/equinix/wrapngo"
)

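// RecovererConfig configures the Recoverer. It embeds the common
// ControlLoopConfig and adds a Recoverer-specific reboot wait time.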
type RecovererConfig struct {
	ControlLoopConfig
	RebootWaitSeconds int
}

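// RegisterFlags registers the control loop flags (under the "recoverer"
// prefix) and the Recoverer-specific reboot wait flag.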
func (r *RecovererConfig) RegisterFlags() {
	r.ControlLoopConfig.RegisterFlags("recoverer")
	flag.IntVar(&r.RebootWaitSeconds, "recoverer_reboot_wait_seconds", 30, "How many seconds to sleep to ensure a reboot happened")
}

// The Recoverer reboots machines whose agent has stopped sending heartbeats or
// has not sent any heartbeats at all.
type Recoverer struct {
	RecovererConfig

	cl ecl.Client
}

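// NewRecoverer builds a Recoverer from the given Equinix client and
// configuration, validating the embedded control loop configuration first.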
func NewRecoverer(cl ecl.Client, rc RecovererConfig) (*Recoverer, error) {
	if err := rc.ControlLoopConfig.Check(); err != nil {
		return nil, err
	}
	return &Recoverer{
		RecovererConfig: rc,
		cl:              cl,
	}, nil
}

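// getProcessInfo identifies this control loop as the shepherd recovery
// process in the BMDB and sets its default retry backoff.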
func (r *Recoverer) getProcessInfo() processInfo {
	return processInfo{
		process: model.ProcessShepherdRecovery,
		defaultBackoff: bmdb.Backoff{
			Initial:  1 * time.Minute,
			Maximum:  1 * time.Hour,
			Exponent: 1.2,
		},
	}
}

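// getMachines returns up to limit machines that are eligible for agent
// recovery, as determined by the GetMachineForAgentRecovery query.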
func (r *Recoverer) getMachines(ctx context.Context, q *model.Queries, limit int32) ([]model.MachineProvided, error) {
	return q.GetMachineForAgentRecovery(ctx, limit)
}

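// processMachine recovers a single machine: it reboots the device through the
// Equinix API, waits for the reboot to take effect, and then removes the
// machine's AgentStarted/AgentHeartbeat tags from the BMDB.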
func (r *Recoverer) processMachine(ctx context.Context, t *task) error {
	klog.Infof("Starting recovery of device (ID: %s, PID: %s)", t.machine.MachineID, t.machine.ProviderID)

	if err := r.cl.RebootDevice(ctx, t.machine.ProviderID); err != nil {
		return fmt.Errorf("failed to reboot device: %w", err)
	}

	// TODO(issue/215): replace this
	// This wait is required because Equinix does not reboot machines
	// synchronously during the API call.
	select {
	case <-time.After(time.Duration(r.RebootWaitSeconds) * time.Second):
	case <-ctx.Done():
		return fmt.Errorf("while waiting for reboot: %w", ctx.Err())
	}

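	// The agent tags are removed as part of finishing the BMDB work item.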
	klog.Infof("Removing AgentStarted/AgentHeartbeat (ID: %s, PID: %s)...", t.machine.MachineID, t.machine.ProviderID)
	err := t.work.Finish(ctx, func(q *model.Queries) error {
		if err := q.MachineDeleteAgentStarted(ctx, t.machine.MachineID); err != nil {
			return fmt.Errorf("while deleting AgentStarted: %w", err)
		}
		if err := q.MachineDeleteAgentHeartbeat(ctx, t.machine.MachineID); err != nil {
			return fmt.Errorf("while deleting AgentHeartbeat: %w", err)
		}
		return nil
	})
	if err != nil {
		return fmt.Errorf("while deleting AgentStarted/AgentHeartbeat tags: %w", err)
	}
	return nil
}