blob: 2bd1e972710f90398de4dcce28d0d379a0453e6b [file] [log] [blame]
Serge Bazanskicaa12082023-02-16 14:54:04 +01001package manager
2
3import (
4 "context"
5 "errors"
6 "flag"
7 "fmt"
8 "sort"
9 "time"
10
11 "github.com/google/uuid"
12 "github.com/packethost/packngo"
13 "golang.org/x/time/rate"
14 "k8s.io/klog/v2"
15
16 "source.monogon.dev/cloud/bmaas/bmdb"
17 "source.monogon.dev/cloud/bmaas/bmdb/model"
18 "source.monogon.dev/cloud/lib/sinbin"
19 ecl "source.monogon.dev/cloud/shepherd/equinix/wrapngo"
20)
21
22// ProvisionerConfig configures the provisioning process.
23type ProvisionerConfig struct {
24 // OS defines the operating system new devices are created with. Its format
25 // is specified by Equinix API.
26 OS string
27 // MaxCount is the maximum count of managed servers. No new devices will be
28 // created after reaching the limit. No attempt will be made to reduce the
29 // server count.
30 MaxCount uint
31
32 // ReconcileLoopLimiter limits the rate of the main reconciliation loop
33 // iterating. As new machines are being provisioned, each loop will cause one
34 // 'long' ListHardwareReservations call to Equinix.
35 ReconcileLoopLimiter *rate.Limiter
36
37 // DeviceCreation limits the rate at which devices are created within
38 // Equinix through use of appropriate API calls.
39 DeviceCreationLimiter *rate.Limiter
40
41 // Assimilate Equinix machines that match the configured device prefix into the
42 // BMDB as Provided. This should only be used for manual testing with
43 // -bmdb_eat_my_data.
44 Assimilate bool
45
46 // ReservationChunkSize is how many Equinix machines will try to be spawned in a
47 // single reconciliation loop. Higher numbers allow for faster initial
48 // provisioning, but lower numbers decrease potential raciness with other systems
49 // and make sure that other parts of the reconciliation logic are ran regularly.
50 //
51 // 20 is decent starting point.
52 ReservationChunkSize uint
53}
54
55func (p *ProvisionerConfig) RegisterFlags() {
56 flag.StringVar(&p.OS, "provisioner_os", "ubuntu_20_04", "OS that provisioner will deploy on Equinix machines. Not the target OS for cluster customers.")
57 flag.UintVar(&p.MaxCount, "provisioner_max_machines", 50, "Limit of machines that the provisioner will attempt to pull into the BMDB. Zero for no limit.")
58 flagLimiter(&p.ReconcileLoopLimiter, "provisioner_reconciler_rate", "1m,1", "Rate limiting for main provisioner reconciliation loop")
59 flagLimiter(&p.DeviceCreationLimiter, "provisioner_device_creation_rate", "5s,1", "Rate limiting for Equinix device/machine creation")
60 flag.BoolVar(&p.Assimilate, "provisioner_assimilate", false, "Assimilate matching machines in Equinix project into BMDB as Provided. Only to be used when manually testing.")
61 flag.UintVar(&p.ReservationChunkSize, "provisioner_reservation_chunk_size", 20, "How many machines will the provisioner attempt to create in a single reconciliation loop iteration")
62}
63
64// Provisioner implements the server provisioning logic. Provisioning entails
65// bringing all available hardware reservations (subject to limits) into BMDB as
66// machines provided by Equinix.
67type Provisioner struct {
68 config *ProvisionerConfig
69 sharedConfig *SharedConfig
70
71 // cl is the wrapngo client instance used.
72 cl ecl.Client
73
74 // badReservations is a holiday resort for Equinix hardware reservations which
75 // failed to be provisioned for some reason or another. We keep a list of them in
76 // memory just so that we don't repeatedly try to provision the same known bad
77 // machines.
78 badReservations sinbin.Sinbin[string]
79}
80
81// New creates a Provisioner instance, checking ProvisionerConfig and
82// SharedConfig for errors.
83func (c *ProvisionerConfig) New(cl ecl.Client, sc *SharedConfig) (*Provisioner, error) {
84 // If these are unset, it's probably because someone is using us as a library.
85 // Provide error messages useful to code users instead of flag names.
86 if c.OS == "" {
87 return nil, fmt.Errorf("OS must be set")
88 }
89 if c.ReconcileLoopLimiter == nil {
90 return nil, fmt.Errorf("ReconcileLoopLimiter must be set")
91 }
92 if c.DeviceCreationLimiter == nil {
93 return nil, fmt.Errorf("DeviceCreationLimiter must be set")
94 }
95 if c.ReservationChunkSize == 0 {
96 return nil, fmt.Errorf("ReservationChunkSize must be set")
97 }
98 return &Provisioner{
99 config: c,
100 sharedConfig: sc,
101
102 cl: cl,
103 }, nil
104}
105
106// Run the provisioner blocking the current goroutine until the given context
107// expires.
108func (p *Provisioner) Run(ctx context.Context, conn *bmdb.Connection) error {
109
110 var sess *bmdb.Session
111 var err error
112 for {
113 if sess == nil {
114 sess, err = conn.StartSession(ctx)
115 if err != nil {
116 return fmt.Errorf("could not start BMDB session: %w", err)
117 }
118 }
119 err = p.runInSession(ctx, sess)
120
121 switch {
122 case err == nil:
123 case errors.Is(err, ctx.Err()):
124 return err
125 case errors.Is(err, bmdb.ErrSessionExpired):
126 klog.Errorf("Session expired, restarting...")
127 sess = nil
128 time.Sleep(time.Second)
129 case err != nil:
130 klog.Errorf("Processing failed: %v", err)
131 // TODO(q3k): close session
132 time.Sleep(time.Second)
133 }
134 }
135}
136
137type machineListing struct {
138 machines []uuid.UUID
139 err error
140}
141
142// runInSession executes one iteration of the provisioner's control loop within a
143// BMDB session. This control loop attempts to bring all Equinix hardware
144// reservations into machines in the BMDB, subject to limits.
145func (p *Provisioner) runInSession(ctx context.Context, sess *bmdb.Session) error {
146 if err := p.config.ReconcileLoopLimiter.Wait(ctx); err != nil {
147 return err
148 }
149
150 providerC := make(chan *machineListing, 1)
151 bmdbC := make(chan *machineListing, 1)
152
153 klog.Infof("Getting provider and bmdb machines...")
154
155 // Make sub-context for two parallel operations, and so that we can cancel one
156 // immediately if the other fails.
157 subCtx, subCtxC := context.WithCancel(ctx)
158 defer subCtxC()
159
160 go func() {
161 machines, err := p.listInProvider(subCtx)
162 providerC <- &machineListing{
163 machines: machines,
164 err: err,
165 }
166 }()
167 go func() {
168 machines, err := p.listInBMDB(subCtx, sess)
169 bmdbC <- &machineListing{
170 machines: machines,
171 err: err,
172 }
173 }()
174 var inProvider, inBMDB *machineListing
175 for {
176 select {
177 case inProvider = <-providerC:
178 if err := inProvider.err; err != nil {
179 return fmt.Errorf("listing provider machines failed: %w", err)
180 }
181 klog.Infof("Got %d machines managed in provider.", len(inProvider.machines))
182 case inBMDB = <-bmdbC:
183 if err := inBMDB.err; err != nil {
184 return fmt.Errorf("listing BMDB machines failed: %w", err)
185 }
186 klog.Infof("Got %d machines in BMDB.", len(inBMDB.machines))
187 }
188 if inProvider != nil && inBMDB != nil {
189 break
190 }
191 }
192
193 subCtxC()
194 if err := p.reconcile(ctx, sess, inProvider.machines, inBMDB.machines); err != nil {
195 return fmt.Errorf("reconciliation failed: %w", err)
196 }
197 return nil
198}
199
200// listInProviders returns all machines that the provider thinks we should be
201// managing.
202func (p *Provisioner) listInProvider(ctx context.Context) ([]uuid.UUID, error) {
203 devices, err := p.sharedConfig.managedDevices(ctx, p.cl)
204 if err != nil {
205 return nil, fmt.Errorf("while fetching managed machines: %w", err)
206 }
207 var pvr []uuid.UUID
208 for _, dev := range devices {
209 id, err := uuid.Parse(dev.ID)
210 if err != nil {
211 klog.Errorf("Device ID %q is not UUID, skipping", dev.ID)
212 } else {
213 pvr = append(pvr, id)
214 }
215 }
216 sort.Slice(pvr, func(i, j int) bool {
217 return pvr[i].String() < pvr[j].String()
218 })
219 return pvr, nil
220}
221
222// listInBMDB returns all the machines that the BMDB thinks we should be managing.
223func (p *Provisioner) listInBMDB(ctx context.Context, sess *bmdb.Session) ([]uuid.UUID, error) {
224 var res []uuid.UUID
225 err := sess.Transact(ctx, func(q *model.Queries) error {
226 machines, err := q.GetProvidedMachines(ctx, model.ProviderEquinix)
227 if err != nil {
228 return err
229 }
230 res = make([]uuid.UUID, len(machines))
231 for i, machine := range machines {
232 id, err := uuid.Parse(machine.ProviderID)
233 if err != nil {
234 klog.Errorf("BMDB machine %s has unparseable provider ID %q", machine.MachineID, machine.ProviderID)
235 } else {
236 res[i] = id
237 }
238 }
239 return nil
240 })
241 if err != nil {
242 return nil, err
243 }
244 sort.Slice(res, func(i, j int) bool {
245 return res[i].String() < res[j].String()
246 })
247 return res, nil
248}
249
250// reconcile takes a list of machines that the provider thinks we should be
251// managing and that the BMDB thinks we should be managing, and tries to make
252// sense of that. First, some checks are performed across the two lists to make
253// sure we haven't dropped anything. Then, additional machines are deployed from
254// hardware reservations as needed.
255func (p *Provisioner) reconcile(ctx context.Context, sess *bmdb.Session, inProvider, inBMDB []uuid.UUID) error {
256 klog.Infof("Reconciling...")
257
258 bmdb := make(map[string]bool)
259 provider := make(map[string]bool)
260 for _, machine := range inProvider {
261 provider[machine.String()] = true
262 }
263 for _, machine := range inBMDB {
264 bmdb[machine.String()] = true
265 }
266
267 managed := make(map[string]bool)
268
269 // Some desynchronization between the BMDB and Provider point of view might be so
270 // bad we shouldn't attempt to do any work, at least not any time soon.
271 badbadnotgood := false
272
273 // Find any machines supposedly managed by us in the provider, but not in the
274 // BMDB, and assimilate them if so configured.
275 for machine, _ := range provider {
276 if bmdb[machine] {
277 managed[machine] = true
278 continue
279 }
280 if p.config.Assimilate {
281 klog.Warningf("Provider machine %s has no corresponding machine in BMDB. Assimilating it.", machine)
282 if err := p.assimilate(ctx, sess, machine); err != nil {
283 klog.Errorf("Failed to assimilate: %v", err)
284 } else {
285 managed[machine] = true
286 }
287 } else {
288 klog.Errorf("Provider machine %s has no corresponding machine in BMDB.", machine)
289 badbadnotgood = true
290 }
291 }
292
293 // Find any machines in the BMDB but not in the provider.
294 for machine, _ := range bmdb {
295 if !provider[machine] {
296 klog.Errorf("Provider device ID %s referred to in BMDB (from TODO) but missing in provider.", machine)
297 badbadnotgood = true
298 }
299 }
300
301 // Bail if things are weird.
302 if badbadnotgood {
303 klog.Errorf("Something's very wrong. Bailing early and refusing to do any work.")
304 return fmt.Errorf("fatal discrepency between BMDB and provider")
305 }
306
307 // Summarize all managed machines, which is the intersection of BMDB and
308 // Provisioner machines, usually both of these sets being equal.
309 nmanaged := len(managed)
310 klog.Infof("Total managed machines: %d", nmanaged)
311
312 if p.config.MaxCount != 0 && p.config.MaxCount <= uint(nmanaged) {
313 klog.Infof("Not bringing up more machines (at limit of %d machines)", p.config.MaxCount)
314 return nil
315 }
316
317 limitName := "no limit"
318 if p.config.MaxCount != 0 {
319 limitName = fmt.Sprintf("%d", p.config.MaxCount)
320 }
321 klog.Infof("Below managed machine limit (%s), bringing up more...", limitName)
322 klog.Infof("Retrieving hardware reservations, this will take a while...")
323 reservations, err := p.cl.ListReservations(ctx, p.sharedConfig.ProjectId)
324 if err != nil {
325 return fmt.Errorf("failed to list reservations: %w", err)
326 }
327
328 // Collect all reservations.
329 var toProvision []packngo.HardwareReservation
Tim Windelschmidtea946632023-04-18 03:46:58 +0200330 var inUse, notProvisionable, penalized int
Serge Bazanskicaa12082023-02-16 14:54:04 +0100331 for _, reservation := range reservations {
Tim Windelschmidtea946632023-04-18 03:46:58 +0200332 if reservation.Device != nil {
333 inUse++
Serge Bazanskicaa12082023-02-16 14:54:04 +0100334 continue
335 }
Tim Windelschmidtea946632023-04-18 03:46:58 +0200336 if !reservation.Provisionable {
337 notProvisionable++
338 continue
Serge Bazanskicaa12082023-02-16 14:54:04 +0100339 }
340 if p.badReservations.Penalized(reservation.ID) {
Tim Windelschmidtea946632023-04-18 03:46:58 +0200341 penalized++
Serge Bazanskicaa12082023-02-16 14:54:04 +0100342 continue
343 }
344 toProvision = append(toProvision, reservation)
345 }
Tim Windelschmidtea946632023-04-18 03:46:58 +0200346 klog.Infof("Retrieved hardware reservations: %d (total), %d (available), %d (in use), %d (not provisionable), %d (penalized)", len(reservations), len(toProvision), inUse, notProvisionable, penalized)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100347
348 // Limit them to MaxCount, if applicable.
349 if p.config.MaxCount != 0 {
350 needed := int(p.config.MaxCount) - nmanaged
351 if len(toProvision) < needed {
352 needed = len(toProvision)
353 }
354 toProvision = toProvision[:needed]
355 }
356
357 // Limit them to an arbitrary 'chunk' size so that we don't do too many things in
358 // a single reconciliation operation.
359 if uint(len(toProvision)) > p.config.ReservationChunkSize {
360 toProvision = toProvision[:p.config.ReservationChunkSize]
361 }
362
363 if len(toProvision) == 0 {
364 klog.Infof("No more hardware reservations available, or all filtered out.")
365 return nil
366 }
367
368 klog.Infof("Bringing up %d machines...", len(toProvision))
369 for _, res := range toProvision {
370 p.config.DeviceCreationLimiter.Wait(ctx)
371 if err := p.provision(ctx, sess, res); err != nil {
372 klog.Errorf("Failed to provision reservation %s: %v", res.ID, err)
373 until := time.Now().Add(time.Hour)
374 klog.Errorf("Adding hardware reservation %s to sinbin until %s", res.ID, until)
375 p.badReservations.Add(res.ID, until)
376 }
377 }
378
379 return nil
380}
381
382// provision attempts to create a device within Equinix using given Hardware
383// Reservation rsv. The resulting device is registered with BMDB, and tagged as
384// "provided" in the process.
385func (pr *Provisioner) provision(ctx context.Context, sess *bmdb.Session, rsv packngo.HardwareReservation) error {
386 klog.Infof("Creating a new device using reservation ID %s.", rsv.ID)
387 hostname := pr.sharedConfig.DevicePrefix + rsv.ID[:18]
388 kid, err := pr.sharedConfig.sshEquinixId(ctx, pr.cl)
389 if err != nil {
390 return err
391 }
392 req := &packngo.DeviceCreateRequest{
393 Hostname: hostname,
394 OS: pr.config.OS,
395 Plan: rsv.Plan.Slug,
396 ProjectID: pr.sharedConfig.ProjectId,
397 HardwareReservationID: rsv.ID,
398 ProjectSSHKeys: []string{kid},
399 }
400 nd, err := pr.cl.CreateDevice(ctx, req)
401 if err != nil {
402 return fmt.Errorf("while creating new device within Equinix: %w", err)
403 }
404 klog.Infof("Created a new device within Equinix (PID: %s).", nd.ID)
405
406 err = pr.assimilate(ctx, sess, nd.ID)
407 if err != nil {
408 // TODO(mateusz@monogon.tech) at this point the device at Equinix isn't
409 // matched by a BMDB record. Schedule device deletion or make sure this
410 // case is being handled elsewhere.
411 return err
412 }
413 return nil
414}
415
416// assimilate brings in an already existing machine from Equinix into the BMDB.
417// This is only used in manual testing.
418func (pr *Provisioner) assimilate(ctx context.Context, sess *bmdb.Session, deviceID string) error {
419 return sess.Transact(ctx, func(q *model.Queries) error {
420 // Create a new machine record within BMDB.
421 m, err := q.NewMachine(ctx)
422 if err != nil {
423 return fmt.Errorf("while creating a new machine record in BMDB: %w", err)
424 }
425
426 // Link the new machine with the Equinix device, and tag it "provided".
427 p := model.MachineAddProvidedParams{
428 MachineID: m.MachineID,
429 ProviderID: deviceID,
430 Provider: model.ProviderEquinix,
431 }
432 klog.Infof("Setting \"provided\" tag (ID: %s, PID: %s, Provider: %s).", p.MachineID, p.ProviderID, p.Provider)
433 if err := q.MachineAddProvided(ctx, p); err != nil {
434 return fmt.Errorf("while tagging machine active: %w", err)
435 }
436 return nil
437 })
438}