blob: 0557dbf6d02b423ad2966df24b647f9f0b25fff2 [file] [log] [blame]
Serge Bazanskicaa12082023-02-16 14:54:04 +01001package manager
2
3import (
4 "context"
5 "errors"
6 "flag"
7 "fmt"
8 "sort"
9 "time"
10
11 "github.com/google/uuid"
12 "github.com/packethost/packngo"
13 "golang.org/x/time/rate"
14 "k8s.io/klog/v2"
15
16 "source.monogon.dev/cloud/bmaas/bmdb"
17 "source.monogon.dev/cloud/bmaas/bmdb/model"
18 "source.monogon.dev/cloud/lib/sinbin"
19 ecl "source.monogon.dev/cloud/shepherd/equinix/wrapngo"
20)
21
// ProvisionerConfig configures the provisioning process.
type ProvisionerConfig struct {
	// OS defines the operating system new devices are created with. Its format
	// is specified by the Equinix API.
	OS string
	// MaxCount is the maximum count of managed servers. No new devices will be
	// created after reaching the limit. No attempt will be made to reduce the
	// server count. Zero means no limit.
	MaxCount uint

	// ReconcileLoopLimiter limits the rate of the main reconciliation loop
	// iterating. As new machines are being provisioned, each loop will cause one
	// 'long' ListHardwareReservations call to Equinix.
	ReconcileLoopLimiter *rate.Limiter

	// DeviceCreationLimiter limits the rate at which devices are created within
	// Equinix through use of appropriate API calls.
	DeviceCreationLimiter *rate.Limiter

	// Assimilate Equinix machines that match the configured device prefix into the
	// BMDB as Provided. This should only be used for manual testing with
	// -bmdb_eat_my_data.
	Assimilate bool

	// ReservationChunkSize is how many Equinix machines will try to be spawned in a
	// single reconciliation loop. Higher numbers allow for faster initial
	// provisioning, but lower numbers decrease potential raciness with other systems
	// and make sure that other parts of the reconciliation logic are run regularly.
	// It must be non-zero.
	//
	// 20 is a decent starting point.
	ReservationChunkSize uint

	// UseProjectKeys defines if the provisioner adds all ssh keys defined inside
	// the used project to every new machine. This is only used for debug purposes.
	UseProjectKeys bool
}
58
59func (p *ProvisionerConfig) RegisterFlags() {
60 flag.StringVar(&p.OS, "provisioner_os", "ubuntu_20_04", "OS that provisioner will deploy on Equinix machines. Not the target OS for cluster customers.")
61 flag.UintVar(&p.MaxCount, "provisioner_max_machines", 50, "Limit of machines that the provisioner will attempt to pull into the BMDB. Zero for no limit.")
62 flagLimiter(&p.ReconcileLoopLimiter, "provisioner_reconciler_rate", "1m,1", "Rate limiting for main provisioner reconciliation loop")
63 flagLimiter(&p.DeviceCreationLimiter, "provisioner_device_creation_rate", "5s,1", "Rate limiting for Equinix device/machine creation")
64 flag.BoolVar(&p.Assimilate, "provisioner_assimilate", false, "Assimilate matching machines in Equinix project into BMDB as Provided. Only to be used when manually testing.")
65 flag.UintVar(&p.ReservationChunkSize, "provisioner_reservation_chunk_size", 20, "How many machines will the provisioner attempt to create in a single reconciliation loop iteration")
Tim Windelschmidt28fcddc2023-04-19 15:34:25 +020066 flag.BoolVar(&p.UseProjectKeys, "provisioner_use_project_keys", false, "Add all Equinix project keys to newly provisioned machines, not just the provisioner's managed key. Debug/development only.")
Serge Bazanskicaa12082023-02-16 14:54:04 +010067}
68
// Provisioner implements the server provisioning logic. Provisioning entails
// bringing all available hardware reservations (subject to limits) into BMDB as
// machines provided by Equinix.
type Provisioner struct {
	// config is the ProvisionerConfig this instance was created from by New.
	config *ProvisionerConfig
	// sharedConfig is the SharedConfig given to New. The provisioner reads the
	// project ID and device prefix from it, and uses it to list managed devices
	// and to look up the SSH key ID installed on new devices.
	sharedConfig *SharedConfig

	// cl is the wrapngo client instance used.
	cl ecl.Client

	// badReservations is a holiday resort for Equinix hardware reservations which
	// failed to be provisioned for some reason or another. We keep a list of them in
	// memory just so that we don't repeatedly try to provision the same known bad
	// machines.
	badReservations sinbin.Sinbin[string]
}
85
86// New creates a Provisioner instance, checking ProvisionerConfig and
87// SharedConfig for errors.
88func (c *ProvisionerConfig) New(cl ecl.Client, sc *SharedConfig) (*Provisioner, error) {
89 // If these are unset, it's probably because someone is using us as a library.
90 // Provide error messages useful to code users instead of flag names.
91 if c.OS == "" {
92 return nil, fmt.Errorf("OS must be set")
93 }
94 if c.ReconcileLoopLimiter == nil {
95 return nil, fmt.Errorf("ReconcileLoopLimiter must be set")
96 }
97 if c.DeviceCreationLimiter == nil {
98 return nil, fmt.Errorf("DeviceCreationLimiter must be set")
99 }
100 if c.ReservationChunkSize == 0 {
101 return nil, fmt.Errorf("ReservationChunkSize must be set")
102 }
103 return &Provisioner{
104 config: c,
105 sharedConfig: sc,
106
107 cl: cl,
108 }, nil
109}
110
111// Run the provisioner blocking the current goroutine until the given context
112// expires.
113func (p *Provisioner) Run(ctx context.Context, conn *bmdb.Connection) error {
114
115 var sess *bmdb.Session
116 var err error
117 for {
118 if sess == nil {
119 sess, err = conn.StartSession(ctx)
120 if err != nil {
121 return fmt.Errorf("could not start BMDB session: %w", err)
122 }
123 }
124 err = p.runInSession(ctx, sess)
125
126 switch {
127 case err == nil:
128 case errors.Is(err, ctx.Err()):
129 return err
130 case errors.Is(err, bmdb.ErrSessionExpired):
131 klog.Errorf("Session expired, restarting...")
132 sess = nil
133 time.Sleep(time.Second)
134 case err != nil:
135 klog.Errorf("Processing failed: %v", err)
136 // TODO(q3k): close session
137 time.Sleep(time.Second)
138 }
139 }
140}
141
// machineListing is the outcome of one machine listing operation (from the
// provider or from the BMDB). runInSession uses it to pass the result of a
// listing goroutine back over a channel.
type machineListing struct {
	// machines are the IDs returned by the listing operation.
	machines []uuid.UUID
	// err is the error returned by the listing operation, nil on success.
	err error
}
146
147// runInSession executes one iteration of the provisioner's control loop within a
148// BMDB session. This control loop attempts to bring all Equinix hardware
149// reservations into machines in the BMDB, subject to limits.
150func (p *Provisioner) runInSession(ctx context.Context, sess *bmdb.Session) error {
151 if err := p.config.ReconcileLoopLimiter.Wait(ctx); err != nil {
152 return err
153 }
154
155 providerC := make(chan *machineListing, 1)
156 bmdbC := make(chan *machineListing, 1)
157
158 klog.Infof("Getting provider and bmdb machines...")
159
160 // Make sub-context for two parallel operations, and so that we can cancel one
161 // immediately if the other fails.
162 subCtx, subCtxC := context.WithCancel(ctx)
163 defer subCtxC()
164
165 go func() {
166 machines, err := p.listInProvider(subCtx)
167 providerC <- &machineListing{
168 machines: machines,
169 err: err,
170 }
171 }()
172 go func() {
173 machines, err := p.listInBMDB(subCtx, sess)
174 bmdbC <- &machineListing{
175 machines: machines,
176 err: err,
177 }
178 }()
179 var inProvider, inBMDB *machineListing
180 for {
181 select {
182 case inProvider = <-providerC:
183 if err := inProvider.err; err != nil {
184 return fmt.Errorf("listing provider machines failed: %w", err)
185 }
186 klog.Infof("Got %d machines managed in provider.", len(inProvider.machines))
187 case inBMDB = <-bmdbC:
188 if err := inBMDB.err; err != nil {
189 return fmt.Errorf("listing BMDB machines failed: %w", err)
190 }
191 klog.Infof("Got %d machines in BMDB.", len(inBMDB.machines))
192 }
193 if inProvider != nil && inBMDB != nil {
194 break
195 }
196 }
197
198 subCtxC()
199 if err := p.reconcile(ctx, sess, inProvider.machines, inBMDB.machines); err != nil {
200 return fmt.Errorf("reconciliation failed: %w", err)
201 }
202 return nil
203}
204
205// listInProviders returns all machines that the provider thinks we should be
206// managing.
207func (p *Provisioner) listInProvider(ctx context.Context) ([]uuid.UUID, error) {
208 devices, err := p.sharedConfig.managedDevices(ctx, p.cl)
209 if err != nil {
210 return nil, fmt.Errorf("while fetching managed machines: %w", err)
211 }
212 var pvr []uuid.UUID
213 for _, dev := range devices {
214 id, err := uuid.Parse(dev.ID)
215 if err != nil {
216 klog.Errorf("Device ID %q is not UUID, skipping", dev.ID)
217 } else {
218 pvr = append(pvr, id)
219 }
220 }
221 sort.Slice(pvr, func(i, j int) bool {
222 return pvr[i].String() < pvr[j].String()
223 })
224 return pvr, nil
225}
226
227// listInBMDB returns all the machines that the BMDB thinks we should be managing.
228func (p *Provisioner) listInBMDB(ctx context.Context, sess *bmdb.Session) ([]uuid.UUID, error) {
229 var res []uuid.UUID
230 err := sess.Transact(ctx, func(q *model.Queries) error {
231 machines, err := q.GetProvidedMachines(ctx, model.ProviderEquinix)
232 if err != nil {
233 return err
234 }
235 res = make([]uuid.UUID, len(machines))
236 for i, machine := range machines {
237 id, err := uuid.Parse(machine.ProviderID)
238 if err != nil {
239 klog.Errorf("BMDB machine %s has unparseable provider ID %q", machine.MachineID, machine.ProviderID)
240 } else {
241 res[i] = id
242 }
243 }
244 return nil
245 })
246 if err != nil {
247 return nil, err
248 }
249 sort.Slice(res, func(i, j int) bool {
250 return res[i].String() < res[j].String()
251 })
252 return res, nil
253}
254
255// reconcile takes a list of machines that the provider thinks we should be
256// managing and that the BMDB thinks we should be managing, and tries to make
257// sense of that. First, some checks are performed across the two lists to make
258// sure we haven't dropped anything. Then, additional machines are deployed from
259// hardware reservations as needed.
260func (p *Provisioner) reconcile(ctx context.Context, sess *bmdb.Session, inProvider, inBMDB []uuid.UUID) error {
261 klog.Infof("Reconciling...")
262
263 bmdb := make(map[string]bool)
264 provider := make(map[string]bool)
265 for _, machine := range inProvider {
266 provider[machine.String()] = true
267 }
268 for _, machine := range inBMDB {
269 bmdb[machine.String()] = true
270 }
271
272 managed := make(map[string]bool)
273
274 // Some desynchronization between the BMDB and Provider point of view might be so
275 // bad we shouldn't attempt to do any work, at least not any time soon.
276 badbadnotgood := false
277
278 // Find any machines supposedly managed by us in the provider, but not in the
279 // BMDB, and assimilate them if so configured.
280 for machine, _ := range provider {
281 if bmdb[machine] {
282 managed[machine] = true
283 continue
284 }
285 if p.config.Assimilate {
286 klog.Warningf("Provider machine %s has no corresponding machine in BMDB. Assimilating it.", machine)
287 if err := p.assimilate(ctx, sess, machine); err != nil {
288 klog.Errorf("Failed to assimilate: %v", err)
289 } else {
290 managed[machine] = true
291 }
292 } else {
293 klog.Errorf("Provider machine %s has no corresponding machine in BMDB.", machine)
294 badbadnotgood = true
295 }
296 }
297
298 // Find any machines in the BMDB but not in the provider.
299 for machine, _ := range bmdb {
300 if !provider[machine] {
301 klog.Errorf("Provider device ID %s referred to in BMDB (from TODO) but missing in provider.", machine)
302 badbadnotgood = true
303 }
304 }
305
306 // Bail if things are weird.
307 if badbadnotgood {
308 klog.Errorf("Something's very wrong. Bailing early and refusing to do any work.")
309 return fmt.Errorf("fatal discrepency between BMDB and provider")
310 }
311
312 // Summarize all managed machines, which is the intersection of BMDB and
313 // Provisioner machines, usually both of these sets being equal.
314 nmanaged := len(managed)
315 klog.Infof("Total managed machines: %d", nmanaged)
316
317 if p.config.MaxCount != 0 && p.config.MaxCount <= uint(nmanaged) {
318 klog.Infof("Not bringing up more machines (at limit of %d machines)", p.config.MaxCount)
319 return nil
320 }
321
322 limitName := "no limit"
323 if p.config.MaxCount != 0 {
324 limitName = fmt.Sprintf("%d", p.config.MaxCount)
325 }
326 klog.Infof("Below managed machine limit (%s), bringing up more...", limitName)
327 klog.Infof("Retrieving hardware reservations, this will take a while...")
328 reservations, err := p.cl.ListReservations(ctx, p.sharedConfig.ProjectId)
329 if err != nil {
330 return fmt.Errorf("failed to list reservations: %w", err)
331 }
332
333 // Collect all reservations.
334 var toProvision []packngo.HardwareReservation
Tim Windelschmidtea946632023-04-18 03:46:58 +0200335 var inUse, notProvisionable, penalized int
Serge Bazanskicaa12082023-02-16 14:54:04 +0100336 for _, reservation := range reservations {
Tim Windelschmidtea946632023-04-18 03:46:58 +0200337 if reservation.Device != nil {
338 inUse++
Serge Bazanskicaa12082023-02-16 14:54:04 +0100339 continue
340 }
Tim Windelschmidtea946632023-04-18 03:46:58 +0200341 if !reservation.Provisionable {
342 notProvisionable++
343 continue
Serge Bazanskicaa12082023-02-16 14:54:04 +0100344 }
345 if p.badReservations.Penalized(reservation.ID) {
Tim Windelschmidtea946632023-04-18 03:46:58 +0200346 penalized++
Serge Bazanskicaa12082023-02-16 14:54:04 +0100347 continue
348 }
349 toProvision = append(toProvision, reservation)
350 }
Tim Windelschmidtea946632023-04-18 03:46:58 +0200351 klog.Infof("Retrieved hardware reservations: %d (total), %d (available), %d (in use), %d (not provisionable), %d (penalized)", len(reservations), len(toProvision), inUse, notProvisionable, penalized)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100352
353 // Limit them to MaxCount, if applicable.
354 if p.config.MaxCount != 0 {
355 needed := int(p.config.MaxCount) - nmanaged
356 if len(toProvision) < needed {
357 needed = len(toProvision)
358 }
359 toProvision = toProvision[:needed]
360 }
361
362 // Limit them to an arbitrary 'chunk' size so that we don't do too many things in
363 // a single reconciliation operation.
364 if uint(len(toProvision)) > p.config.ReservationChunkSize {
365 toProvision = toProvision[:p.config.ReservationChunkSize]
366 }
367
368 if len(toProvision) == 0 {
369 klog.Infof("No more hardware reservations available, or all filtered out.")
370 return nil
371 }
372
373 klog.Infof("Bringing up %d machines...", len(toProvision))
374 for _, res := range toProvision {
375 p.config.DeviceCreationLimiter.Wait(ctx)
376 if err := p.provision(ctx, sess, res); err != nil {
377 klog.Errorf("Failed to provision reservation %s: %v", res.ID, err)
378 until := time.Now().Add(time.Hour)
379 klog.Errorf("Adding hardware reservation %s to sinbin until %s", res.ID, until)
380 p.badReservations.Add(res.ID, until)
381 }
382 }
383
384 return nil
385}
386
387// provision attempts to create a device within Equinix using given Hardware
388// Reservation rsv. The resulting device is registered with BMDB, and tagged as
389// "provided" in the process.
390func (pr *Provisioner) provision(ctx context.Context, sess *bmdb.Session, rsv packngo.HardwareReservation) error {
391 klog.Infof("Creating a new device using reservation ID %s.", rsv.ID)
392 hostname := pr.sharedConfig.DevicePrefix + rsv.ID[:18]
393 kid, err := pr.sharedConfig.sshEquinixId(ctx, pr.cl)
394 if err != nil {
395 return err
396 }
397 req := &packngo.DeviceCreateRequest{
398 Hostname: hostname,
399 OS: pr.config.OS,
400 Plan: rsv.Plan.Slug,
401 ProjectID: pr.sharedConfig.ProjectId,
402 HardwareReservationID: rsv.ID,
403 ProjectSSHKeys: []string{kid},
404 }
Tim Windelschmidt28fcddc2023-04-19 15:34:25 +0200405 if pr.config.UseProjectKeys {
406 klog.Warningf("INSECURE: Machines will be created with ALL PROJECT SSH KEYS!")
407 req.ProjectSSHKeys = nil
408 }
409
Serge Bazanskicaa12082023-02-16 14:54:04 +0100410 nd, err := pr.cl.CreateDevice(ctx, req)
411 if err != nil {
412 return fmt.Errorf("while creating new device within Equinix: %w", err)
413 }
414 klog.Infof("Created a new device within Equinix (PID: %s).", nd.ID)
415
416 err = pr.assimilate(ctx, sess, nd.ID)
417 if err != nil {
418 // TODO(mateusz@monogon.tech) at this point the device at Equinix isn't
419 // matched by a BMDB record. Schedule device deletion or make sure this
420 // case is being handled elsewhere.
421 return err
422 }
423 return nil
424}
425
426// assimilate brings in an already existing machine from Equinix into the BMDB.
427// This is only used in manual testing.
428func (pr *Provisioner) assimilate(ctx context.Context, sess *bmdb.Session, deviceID string) error {
429 return sess.Transact(ctx, func(q *model.Queries) error {
430 // Create a new machine record within BMDB.
431 m, err := q.NewMachine(ctx)
432 if err != nil {
433 return fmt.Errorf("while creating a new machine record in BMDB: %w", err)
434 }
435
436 // Link the new machine with the Equinix device, and tag it "provided".
437 p := model.MachineAddProvidedParams{
438 MachineID: m.MachineID,
439 ProviderID: deviceID,
440 Provider: model.ProviderEquinix,
441 }
442 klog.Infof("Setting \"provided\" tag (ID: %s, PID: %s, Provider: %s).", p.MachineID, p.ProviderID, p.Provider)
443 if err := q.MachineAddProvided(ctx, p); err != nil {
444 return fmt.Errorf("while tagging machine active: %w", err)
445 }
446 return nil
447 })
448}