blob: dd8c6ff31961902196dda6b26bb55be08f019fae [file] [log] [blame]
Serge Bazanskiafd3cf82023-04-19 17:43:46 +02001package manager
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "flag"
8 "fmt"
9 "time"
10
11 "github.com/packethost/packngo"
12 "k8s.io/klog/v2"
13
14 "source.monogon.dev/cloud/bmaas/bmdb"
Serge Bazanskic50f6942023-04-24 18:27:22 +020015 "source.monogon.dev/cloud/bmaas/bmdb/metrics"
Serge Bazanskiafd3cf82023-04-19 17:43:46 +020016 "source.monogon.dev/cloud/bmaas/bmdb/model"
17 "source.monogon.dev/cloud/lib/sinbin"
18 ecl "source.monogon.dev/cloud/shepherd/equinix/wrapngo"
19)
20
// UpdaterConfig carries the settings of an Updater. Its fields can be
// populated from command line flags via RegisterFlags.
type UpdaterConfig struct {
	// Enable starts the updater. When false, Updater.Run returns immediately
	// without doing any work.
	Enable bool
	// IterationRate is the minimum time taken between subsequent iterations of the
	// updater.
	IterationRate time.Duration
}
28
29func (u *UpdaterConfig) RegisterFlags() {
30 flag.BoolVar(&u.Enable, "updater_enable", true, "Enable the updater, which periodically scans equinix machines and updates their status in the BMDB")
31 flag.DurationVar(&u.IterationRate, "updater_iteration_rate", time.Minute, "Rate limiting for updater iteration loop")
32}
33
// The Updater periodically scans all machines backed by the equinix provider and
// updates their Provided status fields based on data retrieved from the Equinix
// API.
type Updater struct {
	// config is the configuration this Updater was created from.
	config *UpdaterConfig
	// sinbin is keyed by provider ID and is used to skip machines which were
	// checked recently, until their penalty expires.
	sinbin sinbin.Sinbin[string]

	// cl is the Equinix API client used to fetch device data.
	cl ecl.Client
}
43
44func (c *UpdaterConfig) New(cl ecl.Client) (*Updater, error) {
45 return &Updater{
46 config: c,
47 cl: cl,
48 }, nil
49}
50
51func (u *Updater) Run(ctx context.Context, conn *bmdb.Connection) error {
52 var sess *bmdb.Session
53 var err error
54
55 if !u.config.Enable {
56 return nil
57 }
58
59 for {
60 if sess == nil {
Serge Bazanskic50f6942023-04-24 18:27:22 +020061 sess, err = conn.StartSession(ctx, bmdb.SessionOption{Processor: metrics.ProcessorShepherdUpdater})
Serge Bazanskiafd3cf82023-04-19 17:43:46 +020062 if err != nil {
63 return fmt.Errorf("could not start BMDB session: %w", err)
64 }
65 }
66 limit := time.After(u.config.IterationRate)
67
68 err = u.runInSession(ctx, sess)
69 switch {
70 case err == nil:
71 <-limit
72 case errors.Is(err, ctx.Err()):
73 return err
74 case errors.Is(err, bmdb.ErrSessionExpired):
75 klog.Errorf("Session expired, restarting...")
76 sess = nil
77 time.Sleep(time.Second)
78 case err != nil:
79 klog.Errorf("Processing failed: %v", err)
80 // TODO(q3k): close session
81 time.Sleep(time.Second)
82 }
83 }
84}
85
// applyNullStringUpdate reports whether 'up' supersedes 'cur', which is the
// case when 'up' is valid and 'cur' is either invalid or holds a different
// string. When 'up' does not supersede 'cur', 'up' is reset to an invalid,
// empty value. 'cur' is never modified.
func applyNullStringUpdate(up, cur *sql.NullString) bool {
	supersedes := up.Valid && (!cur.Valid || up.String != cur.String)
	if !supersedes {
		*up = sql.NullString{}
	}
	return supersedes
}
101
102// applyNullProviderStatusUpdate returns true if 'up' supersedes 'cur'.
103// Otherwise, it returns false and zeroes out up.
104func applyNullProviderStatusUpdate(up, cur *model.NullProviderStatus) bool {
105 if up.Valid {
106 if !cur.Valid {
107 return true
108 }
109 if up.ProviderStatus != cur.ProviderStatus {
110 return true
111 }
112 }
113 up.ProviderStatus = model.ProviderStatusUnknown
114 up.Valid = false
115 return false
116}
117
118// applyUpdate returns true if 'up' supersedes 'cur'. Otherwise, it returns false
119// and zeroes out up.
120func applyUpdate(up *model.MachineUpdateProviderStatusParams, cur *model.MachineProvided) bool {
121 res := false
122 res = res || applyNullStringUpdate(&up.ProviderReservationID, &cur.ProviderReservationID)
123 res = res || applyNullStringUpdate(&up.ProviderIpAddress, &cur.ProviderIpAddress)
124 res = res || applyNullStringUpdate(&up.ProviderLocation, &cur.ProviderLocation)
125 res = res || applyNullProviderStatusUpdate(&up.ProviderStatus, &cur.ProviderStatus)
126 return res
127}
128
129// updateLog logs information about the given update as calculated by applyUpdate.
130func updateLog(up *model.MachineUpdateProviderStatusParams) {
131 if up.ProviderReservationID.Valid {
132 klog.Infof(" Device %s: new reservation ID %s", up.ProviderID, up.ProviderReservationID.String)
133 }
134 if up.ProviderIpAddress.Valid {
135 klog.Infof(" Device %s: new IP address %s", up.ProviderID, up.ProviderIpAddress.String)
136 }
137 if up.ProviderLocation.Valid {
138 klog.Infof(" Device %s: new location %s", up.ProviderID, up.ProviderLocation.String)
139 }
140 if up.ProviderStatus.Valid {
141 klog.Infof(" Device %s: new status %s", up.ProviderID, up.ProviderStatus.ProviderStatus)
142 }
143}
144
145func (u *Updater) runInSession(ctx context.Context, sess *bmdb.Session) error {
146 // Get all machines provided by us into the BMDB.
147 // TODO(q3k): do not load all machines into memory.
148
149 var machines []model.MachineProvided
150 err := sess.Transact(ctx, func(q *model.Queries) error {
151 var err error
152 machines, err = q.GetProvidedMachines(ctx, model.ProviderEquinix)
153 return err
154 })
155 if err != nil {
156 return fmt.Errorf("when fetching provided machines: %w", err)
157 }
158
159 // Limit how many machines we check by timing them out if they're likely to not
160 // get updated soon.
161 penalized := 0
162 var check []model.MachineProvided
163 for _, m := range machines {
164 if u.sinbin.Penalized(m.ProviderID) {
165 penalized += 1
166 } else {
167 check = append(check, m)
168 }
169 }
170
171 klog.Infof("Machines to check %d, skipping: %d", len(check), penalized)
172 for _, m := range check {
173 dev, err := u.cl.GetDevice(ctx, "", m.ProviderID, &packngo.ListOptions{
174 Includes: []string{
175 "hardware_reservation",
176 },
177 Excludes: []string{
178 "created_by", "customdata", "network_ports", "operating_system", "actions",
179 "plan", "provisioning_events", "ssh_keys", "tags", "volumes",
180 },
181 })
182 if err != nil {
183 klog.Warningf("Fetching device %s failed: %v", m.ProviderID, err)
184 continue
185 }
186
187 // nextCheck will be used to sinbin the machine for some given time if there is
188 // no difference between the current state and new state.
189 //
190 // Some conditions override this to be shorter (when the machine doesn't yet have
191 // all data available or is in an otherwise unstable state).
192 nextCheck := time.Minute * 30
193
194 up := model.MachineUpdateProviderStatusParams{
195 Provider: m.Provider,
196 ProviderID: m.ProviderID,
197 }
198
199 if dev.HardwareReservation != nil {
200 up.ProviderReservationID.Valid = true
201 up.ProviderReservationID.String = dev.HardwareReservation.ID
202 } else {
203 nextCheck = time.Minute
204 }
205
206 for _, addr := range dev.Network {
207 if !addr.Public {
208 continue
209 }
210 up.ProviderIpAddress.Valid = true
211 up.ProviderIpAddress.String = addr.Address
212 break
213 }
214 if !up.ProviderIpAddress.Valid {
215 nextCheck = time.Minute
216 }
217
218 if dev.Facility != nil {
219 up.ProviderLocation.Valid = true
220 up.ProviderLocation.String = dev.Facility.Code
221 } else {
222 nextCheck = time.Minute
223 }
224
225 up.ProviderStatus.Valid = true
226 switch dev.State {
227 case "active":
228 up.ProviderStatus.ProviderStatus = model.ProviderStatusRunning
229 case "deleted":
230 up.ProviderStatus.ProviderStatus = model.ProviderStatusMissing
231 case "failed":
232 up.ProviderStatus.ProviderStatus = model.ProviderStatusProvisioningFailedPermanent
233 case "inactive":
234 up.ProviderStatus.ProviderStatus = model.ProviderStatusStopped
235 case "powering_on", "powering_off":
236 nextCheck = time.Minute
237 up.ProviderStatus.ProviderStatus = model.ProviderStatusStopped
238 case "queued", "provisioning", "reinstalling", "post_provisioning":
239 nextCheck = time.Minute
240 up.ProviderStatus.ProviderStatus = model.ProviderStatusProvisioning
241 default:
242 klog.Warningf("Device %s has unexpected status: %q", m.ProviderID, dev.State)
243 nextCheck = time.Minute
244 up.ProviderStatus.ProviderStatus = model.ProviderStatusUnknown
245 }
246
247 if !applyUpdate(&up, &m) {
248 u.sinbin.Add(m.ProviderID, time.Now().Add(nextCheck))
249 continue
250 }
251
252 klog.Infof("Device %s has new data:", m.ProviderID)
253 updateLog(&up)
254 err = sess.Transact(ctx, func(q *model.Queries) error {
255 return q.MachineUpdateProviderStatus(ctx, up)
256 })
257 if err != nil {
258 klog.Warningf("Device %s failed to update: %v", m.ProviderID, err)
259 }
260 u.sinbin.Add(m.ProviderID, time.Now().Add(time.Minute))
261 }
262 return nil
263}