blob: 6c00b9726cc04af2adb24837fdc67a53e977d838 [file] [log] [blame]
Serge Bazanskiafd3cf82023-04-19 17:43:46 +02001package manager
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "flag"
8 "fmt"
9 "time"
10
11 "github.com/packethost/packngo"
12 "k8s.io/klog/v2"
13
14 "source.monogon.dev/cloud/bmaas/bmdb"
15 "source.monogon.dev/cloud/bmaas/bmdb/model"
16 "source.monogon.dev/cloud/lib/sinbin"
17 ecl "source.monogon.dev/cloud/shepherd/equinix/wrapngo"
18)
19
// UpdaterConfig holds the settings of the Updater. Its fields can be bound to
// command-line flags with RegisterFlags.
type UpdaterConfig struct {
	// Enable starts the updater.
	Enable bool
	// IterationRate is the minimum time taken between subsequent iterations of
	// the updater.
	IterationRate time.Duration
}
27
28func (u *UpdaterConfig) RegisterFlags() {
29 flag.BoolVar(&u.Enable, "updater_enable", true, "Enable the updater, which periodically scans equinix machines and updates their status in the BMDB")
30 flag.DurationVar(&u.IterationRate, "updater_iteration_rate", time.Minute, "Rate limiting for updater iteration loop")
31}
32
33// The Updater periodically scans all machines backed by the equinix provider and
34// updaters their Provided status fields based on data retrieved from the Equinix
35// API.
36type Updater struct {
37 config *UpdaterConfig
38 sinbin sinbin.Sinbin[string]
39
40 cl ecl.Client
41}
42
43func (c *UpdaterConfig) New(cl ecl.Client) (*Updater, error) {
44 return &Updater{
45 config: c,
46 cl: cl,
47 }, nil
48}
49
50func (u *Updater) Run(ctx context.Context, conn *bmdb.Connection) error {
51 var sess *bmdb.Session
52 var err error
53
54 if !u.config.Enable {
55 return nil
56 }
57
58 for {
59 if sess == nil {
60 sess, err = conn.StartSession(ctx)
61 if err != nil {
62 return fmt.Errorf("could not start BMDB session: %w", err)
63 }
64 }
65 limit := time.After(u.config.IterationRate)
66
67 err = u.runInSession(ctx, sess)
68 switch {
69 case err == nil:
70 <-limit
71 case errors.Is(err, ctx.Err()):
72 return err
73 case errors.Is(err, bmdb.ErrSessionExpired):
74 klog.Errorf("Session expired, restarting...")
75 sess = nil
76 time.Sleep(time.Second)
77 case err != nil:
78 klog.Errorf("Processing failed: %v", err)
79 // TODO(q3k): close session
80 time.Sleep(time.Second)
81 }
82 }
83}
84
// applyNullStringUpdate reports whether the proposed value up supersedes the
// current value cur. That is the case when up is set and cur is either unset
// or holds a different string. When up does not supersede cur, up is reset to
// the zero (NULL) NullString and false is returned. cur is never modified.
func applyNullStringUpdate(up, cur *sql.NullString) bool {
	supersedes := up.Valid && (!cur.Valid || up.String != cur.String)
	if !supersedes {
		*up = sql.NullString{}
	}
	return supersedes
}
100
101// applyNullProviderStatusUpdate returns true if 'up' supersedes 'cur'.
102// Otherwise, it returns false and zeroes out up.
103func applyNullProviderStatusUpdate(up, cur *model.NullProviderStatus) bool {
104 if up.Valid {
105 if !cur.Valid {
106 return true
107 }
108 if up.ProviderStatus != cur.ProviderStatus {
109 return true
110 }
111 }
112 up.ProviderStatus = model.ProviderStatusUnknown
113 up.Valid = false
114 return false
115}
116
117// applyUpdate returns true if 'up' supersedes 'cur'. Otherwise, it returns false
118// and zeroes out up.
119func applyUpdate(up *model.MachineUpdateProviderStatusParams, cur *model.MachineProvided) bool {
120 res := false
121 res = res || applyNullStringUpdate(&up.ProviderReservationID, &cur.ProviderReservationID)
122 res = res || applyNullStringUpdate(&up.ProviderIpAddress, &cur.ProviderIpAddress)
123 res = res || applyNullStringUpdate(&up.ProviderLocation, &cur.ProviderLocation)
124 res = res || applyNullProviderStatusUpdate(&up.ProviderStatus, &cur.ProviderStatus)
125 return res
126}
127
128// updateLog logs information about the given update as calculated by applyUpdate.
129func updateLog(up *model.MachineUpdateProviderStatusParams) {
130 if up.ProviderReservationID.Valid {
131 klog.Infof(" Device %s: new reservation ID %s", up.ProviderID, up.ProviderReservationID.String)
132 }
133 if up.ProviderIpAddress.Valid {
134 klog.Infof(" Device %s: new IP address %s", up.ProviderID, up.ProviderIpAddress.String)
135 }
136 if up.ProviderLocation.Valid {
137 klog.Infof(" Device %s: new location %s", up.ProviderID, up.ProviderLocation.String)
138 }
139 if up.ProviderStatus.Valid {
140 klog.Infof(" Device %s: new status %s", up.ProviderID, up.ProviderStatus.ProviderStatus)
141 }
142}
143
144func (u *Updater) runInSession(ctx context.Context, sess *bmdb.Session) error {
145 // Get all machines provided by us into the BMDB.
146 // TODO(q3k): do not load all machines into memory.
147
148 var machines []model.MachineProvided
149 err := sess.Transact(ctx, func(q *model.Queries) error {
150 var err error
151 machines, err = q.GetProvidedMachines(ctx, model.ProviderEquinix)
152 return err
153 })
154 if err != nil {
155 return fmt.Errorf("when fetching provided machines: %w", err)
156 }
157
158 // Limit how many machines we check by timing them out if they're likely to not
159 // get updated soon.
160 penalized := 0
161 var check []model.MachineProvided
162 for _, m := range machines {
163 if u.sinbin.Penalized(m.ProviderID) {
164 penalized += 1
165 } else {
166 check = append(check, m)
167 }
168 }
169
170 klog.Infof("Machines to check %d, skipping: %d", len(check), penalized)
171 for _, m := range check {
172 dev, err := u.cl.GetDevice(ctx, "", m.ProviderID, &packngo.ListOptions{
173 Includes: []string{
174 "hardware_reservation",
175 },
176 Excludes: []string{
177 "created_by", "customdata", "network_ports", "operating_system", "actions",
178 "plan", "provisioning_events", "ssh_keys", "tags", "volumes",
179 },
180 })
181 if err != nil {
182 klog.Warningf("Fetching device %s failed: %v", m.ProviderID, err)
183 continue
184 }
185
186 // nextCheck will be used to sinbin the machine for some given time if there is
187 // no difference between the current state and new state.
188 //
189 // Some conditions override this to be shorter (when the machine doesn't yet have
190 // all data available or is in an otherwise unstable state).
191 nextCheck := time.Minute * 30
192
193 up := model.MachineUpdateProviderStatusParams{
194 Provider: m.Provider,
195 ProviderID: m.ProviderID,
196 }
197
198 if dev.HardwareReservation != nil {
199 up.ProviderReservationID.Valid = true
200 up.ProviderReservationID.String = dev.HardwareReservation.ID
201 } else {
202 nextCheck = time.Minute
203 }
204
205 for _, addr := range dev.Network {
206 if !addr.Public {
207 continue
208 }
209 up.ProviderIpAddress.Valid = true
210 up.ProviderIpAddress.String = addr.Address
211 break
212 }
213 if !up.ProviderIpAddress.Valid {
214 nextCheck = time.Minute
215 }
216
217 if dev.Facility != nil {
218 up.ProviderLocation.Valid = true
219 up.ProviderLocation.String = dev.Facility.Code
220 } else {
221 nextCheck = time.Minute
222 }
223
224 up.ProviderStatus.Valid = true
225 switch dev.State {
226 case "active":
227 up.ProviderStatus.ProviderStatus = model.ProviderStatusRunning
228 case "deleted":
229 up.ProviderStatus.ProviderStatus = model.ProviderStatusMissing
230 case "failed":
231 up.ProviderStatus.ProviderStatus = model.ProviderStatusProvisioningFailedPermanent
232 case "inactive":
233 up.ProviderStatus.ProviderStatus = model.ProviderStatusStopped
234 case "powering_on", "powering_off":
235 nextCheck = time.Minute
236 up.ProviderStatus.ProviderStatus = model.ProviderStatusStopped
237 case "queued", "provisioning", "reinstalling", "post_provisioning":
238 nextCheck = time.Minute
239 up.ProviderStatus.ProviderStatus = model.ProviderStatusProvisioning
240 default:
241 klog.Warningf("Device %s has unexpected status: %q", m.ProviderID, dev.State)
242 nextCheck = time.Minute
243 up.ProviderStatus.ProviderStatus = model.ProviderStatusUnknown
244 }
245
246 if !applyUpdate(&up, &m) {
247 u.sinbin.Add(m.ProviderID, time.Now().Add(nextCheck))
248 continue
249 }
250
251 klog.Infof("Device %s has new data:", m.ProviderID)
252 updateLog(&up)
253 err = sess.Transact(ctx, func(q *model.Queries) error {
254 return q.MachineUpdateProviderStatus(ctx, up)
255 })
256 if err != nil {
257 klog.Warningf("Device %s failed to update: %v", m.ProviderID, err)
258 }
259 u.sinbin.Add(m.ProviderID, time.Now().Add(time.Minute))
260 }
261 return nil
262}