blob: 5b6e089513b9da32c4074a4d43e3465c76e34f6e [file] [log] [blame]
Tim Windelschmidt6d33a432025-02-04 14:34:25 +01001// Copyright The Monogon Project Authors.
2// SPDX-License-Identifier: Apache-2.0
3
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +02004package main
Serge Bazanskiafd3cf82023-04-19 17:43:46 +02005
6import (
7 "context"
8 "database/sql"
9 "errors"
10 "flag"
11 "fmt"
12 "time"
13
14 "github.com/packethost/packngo"
15 "k8s.io/klog/v2"
16
17 "source.monogon.dev/cloud/bmaas/bmdb"
Serge Bazanskic50f6942023-04-24 18:27:22 +020018 "source.monogon.dev/cloud/bmaas/bmdb/metrics"
Serge Bazanskiafd3cf82023-04-19 17:43:46 +020019 "source.monogon.dev/cloud/bmaas/bmdb/model"
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +020020 ecl "source.monogon.dev/cloud/equinix/wrapngo"
Serge Bazanskiafd3cf82023-04-19 17:43:46 +020021 "source.monogon.dev/cloud/lib/sinbin"
Serge Bazanskiafd3cf82023-04-19 17:43:46 +020022)
23
// UpdaterConfig configures the Updater. Its fields can be bound to command-line
// flags with RegisterFlags, and New turns it into an Updater.
type UpdaterConfig struct {
	// Enable starts the updater.
	Enable bool
	// IterationRate is the minimum time taken between the starts of subsequent
	// iterations of the updater.
	IterationRate time.Duration
}

// RegisterFlags registers the updater's command-line flags on the global flag
// set (flag.CommandLine), binding them to the fields of u and setting those
// fields to the flag defaults. Each flag name may only be registered once per
// flag set, so this must not be called twice in the same process.
func (u *UpdaterConfig) RegisterFlags() {
	flag.BoolVar(&u.Enable, "updater_enable", true, "Enable the updater, which periodically scans equinix machines and updates their status in the BMDB")
	flag.DurationVar(&u.IterationRate, "updater_iteration_rate", time.Minute, "Rate limiting for updater iteration loop")
}
36
37// The Updater periodically scans all machines backed by the equinix provider and
38// updaters their Provided status fields based on data retrieved from the Equinix
39// API.
40type Updater struct {
41 config *UpdaterConfig
42 sinbin sinbin.Sinbin[string]
43
44 cl ecl.Client
45}
46
Tim Windelschmidt0c57d342024-04-11 01:38:47 +020047func (u *UpdaterConfig) New(cl ecl.Client) (*Updater, error) {
Serge Bazanskiafd3cf82023-04-19 17:43:46 +020048 return &Updater{
Tim Windelschmidt0c57d342024-04-11 01:38:47 +020049 config: u,
Serge Bazanskiafd3cf82023-04-19 17:43:46 +020050 cl: cl,
51 }, nil
52}
53
54func (u *Updater) Run(ctx context.Context, conn *bmdb.Connection) error {
55 var sess *bmdb.Session
56 var err error
57
58 if !u.config.Enable {
59 return nil
60 }
61
62 for {
63 if sess == nil {
Serge Bazanskic50f6942023-04-24 18:27:22 +020064 sess, err = conn.StartSession(ctx, bmdb.SessionOption{Processor: metrics.ProcessorShepherdUpdater})
Serge Bazanskiafd3cf82023-04-19 17:43:46 +020065 if err != nil {
66 return fmt.Errorf("could not start BMDB session: %w", err)
67 }
68 }
69 limit := time.After(u.config.IterationRate)
70
71 err = u.runInSession(ctx, sess)
72 switch {
73 case err == nil:
74 <-limit
75 case errors.Is(err, ctx.Err()):
76 return err
77 case errors.Is(err, bmdb.ErrSessionExpired):
78 klog.Errorf("Session expired, restarting...")
79 sess = nil
80 time.Sleep(time.Second)
Tim Windelschmidt438ae2e2024-04-11 23:23:12 +020081 default:
Serge Bazanskiafd3cf82023-04-19 17:43:46 +020082 klog.Errorf("Processing failed: %v", err)
83 // TODO(q3k): close session
84 time.Sleep(time.Second)
85 }
86 }
87}
88
// applyNullStringUpdate reports whether the update value 'up' supersedes the
// stored value 'cur'. That is the case when 'up' is set and either 'cur' is
// unset or holds a different string. When it does not supersede, 'up' is reset
// to the invalid, empty NullString. 'cur' is never modified.
func applyNullStringUpdate(up, cur *sql.NullString) bool {
	changed := up.Valid && (!cur.Valid || cur.String != up.String)
	if !changed {
		*up = sql.NullString{}
	}
	return changed
}
104
105// applyNullProviderStatusUpdate returns true if 'up' supersedes 'cur'.
106// Otherwise, it returns false and zeroes out up.
107func applyNullProviderStatusUpdate(up, cur *model.NullProviderStatus) bool {
108 if up.Valid {
109 if !cur.Valid {
110 return true
111 }
112 if up.ProviderStatus != cur.ProviderStatus {
113 return true
114 }
115 }
116 up.ProviderStatus = model.ProviderStatusUnknown
117 up.Valid = false
118 return false
119}
120
121// applyUpdate returns true if 'up' supersedes 'cur'. Otherwise, it returns false
122// and zeroes out up.
123func applyUpdate(up *model.MachineUpdateProviderStatusParams, cur *model.MachineProvided) bool {
124 res := false
125 res = res || applyNullStringUpdate(&up.ProviderReservationID, &cur.ProviderReservationID)
126 res = res || applyNullStringUpdate(&up.ProviderIpAddress, &cur.ProviderIpAddress)
127 res = res || applyNullStringUpdate(&up.ProviderLocation, &cur.ProviderLocation)
128 res = res || applyNullProviderStatusUpdate(&up.ProviderStatus, &cur.ProviderStatus)
129 return res
130}
131
132// updateLog logs information about the given update as calculated by applyUpdate.
133func updateLog(up *model.MachineUpdateProviderStatusParams) {
134 if up.ProviderReservationID.Valid {
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200135 klog.Infof(" Machine %s: new reservation ID %s", up.ProviderID, up.ProviderReservationID.String)
Serge Bazanskiafd3cf82023-04-19 17:43:46 +0200136 }
137 if up.ProviderIpAddress.Valid {
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200138 klog.Infof(" Machine %s: new IP address %s", up.ProviderID, up.ProviderIpAddress.String)
Serge Bazanskiafd3cf82023-04-19 17:43:46 +0200139 }
140 if up.ProviderLocation.Valid {
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200141 klog.Infof(" Machine %s: new location %s", up.ProviderID, up.ProviderLocation.String)
Serge Bazanskiafd3cf82023-04-19 17:43:46 +0200142 }
143 if up.ProviderStatus.Valid {
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200144 klog.Infof(" Machine %s: new status %s", up.ProviderID, up.ProviderStatus.ProviderStatus)
Serge Bazanskiafd3cf82023-04-19 17:43:46 +0200145 }
146}
147
148func (u *Updater) runInSession(ctx context.Context, sess *bmdb.Session) error {
149 // Get all machines provided by us into the BMDB.
150 // TODO(q3k): do not load all machines into memory.
151
152 var machines []model.MachineProvided
153 err := sess.Transact(ctx, func(q *model.Queries) error {
154 var err error
155 machines, err = q.GetProvidedMachines(ctx, model.ProviderEquinix)
156 return err
157 })
158 if err != nil {
159 return fmt.Errorf("when fetching provided machines: %w", err)
160 }
161
162 // Limit how many machines we check by timing them out if they're likely to not
163 // get updated soon.
164 penalized := 0
165 var check []model.MachineProvided
166 for _, m := range machines {
167 if u.sinbin.Penalized(m.ProviderID) {
168 penalized += 1
169 } else {
170 check = append(check, m)
171 }
172 }
173
174 klog.Infof("Machines to check %d, skipping: %d", len(check), penalized)
175 for _, m := range check {
176 dev, err := u.cl.GetDevice(ctx, "", m.ProviderID, &packngo.ListOptions{
177 Includes: []string{
178 "hardware_reservation",
179 },
180 Excludes: []string{
181 "created_by", "customdata", "network_ports", "operating_system", "actions",
182 "plan", "provisioning_events", "ssh_keys", "tags", "volumes",
183 },
184 })
185 if err != nil {
186 klog.Warningf("Fetching device %s failed: %v", m.ProviderID, err)
187 continue
188 }
189
190 // nextCheck will be used to sinbin the machine for some given time if there is
191 // no difference between the current state and new state.
192 //
193 // Some conditions override this to be shorter (when the machine doesn't yet have
194 // all data available or is in an otherwise unstable state).
195 nextCheck := time.Minute * 30
196
197 up := model.MachineUpdateProviderStatusParams{
198 Provider: m.Provider,
199 ProviderID: m.ProviderID,
200 }
201
202 if dev.HardwareReservation != nil {
203 up.ProviderReservationID.Valid = true
204 up.ProviderReservationID.String = dev.HardwareReservation.ID
205 } else {
206 nextCheck = time.Minute
207 }
208
209 for _, addr := range dev.Network {
210 if !addr.Public {
211 continue
212 }
213 up.ProviderIpAddress.Valid = true
214 up.ProviderIpAddress.String = addr.Address
215 break
216 }
217 if !up.ProviderIpAddress.Valid {
218 nextCheck = time.Minute
219 }
220
221 if dev.Facility != nil {
222 up.ProviderLocation.Valid = true
223 up.ProviderLocation.String = dev.Facility.Code
224 } else {
225 nextCheck = time.Minute
226 }
227
228 up.ProviderStatus.Valid = true
229 switch dev.State {
230 case "active":
231 up.ProviderStatus.ProviderStatus = model.ProviderStatusRunning
232 case "deleted":
233 up.ProviderStatus.ProviderStatus = model.ProviderStatusMissing
234 case "failed":
235 up.ProviderStatus.ProviderStatus = model.ProviderStatusProvisioningFailedPermanent
236 case "inactive":
237 up.ProviderStatus.ProviderStatus = model.ProviderStatusStopped
238 case "powering_on", "powering_off":
239 nextCheck = time.Minute
240 up.ProviderStatus.ProviderStatus = model.ProviderStatusStopped
241 case "queued", "provisioning", "reinstalling", "post_provisioning":
242 nextCheck = time.Minute
243 up.ProviderStatus.ProviderStatus = model.ProviderStatusProvisioning
244 default:
245 klog.Warningf("Device %s has unexpected status: %q", m.ProviderID, dev.State)
246 nextCheck = time.Minute
247 up.ProviderStatus.ProviderStatus = model.ProviderStatusUnknown
248 }
249
250 if !applyUpdate(&up, &m) {
251 u.sinbin.Add(m.ProviderID, time.Now().Add(nextCheck))
252 continue
253 }
254
255 klog.Infof("Device %s has new data:", m.ProviderID)
256 updateLog(&up)
257 err = sess.Transact(ctx, func(q *model.Queries) error {
258 return q.MachineUpdateProviderStatus(ctx, up)
259 })
260 if err != nil {
261 klog.Warningf("Device %s failed to update: %v", m.ProviderID, err)
262 }
263 u.sinbin.Add(m.ProviderID, time.Now().Add(time.Minute))
264 }
265 return nil
266}