blob: 03dffa6c01d8ff6f7b648baeb4ae98b72b612fa2 [file] [log] [blame]
Serge Bazanskicaa12082023-02-16 14:54:04 +01001package manager
2
3import (
4 "context"
5 "errors"
6 "flag"
7 "fmt"
8 "sort"
9 "time"
10
11 "github.com/google/uuid"
12 "github.com/packethost/packngo"
13 "golang.org/x/time/rate"
14 "k8s.io/klog/v2"
15
16 "source.monogon.dev/cloud/bmaas/bmdb"
Serge Bazanskic50f6942023-04-24 18:27:22 +020017 "source.monogon.dev/cloud/bmaas/bmdb/metrics"
Serge Bazanskicaa12082023-02-16 14:54:04 +010018 "source.monogon.dev/cloud/bmaas/bmdb/model"
19 "source.monogon.dev/cloud/lib/sinbin"
20 ecl "source.monogon.dev/cloud/shepherd/equinix/wrapngo"
21)
22
// ProvisionerConfig configures the provisioning process. It is either filled
// in from command line flags (see RegisterFlags) or populated directly by code
// using this package as a library, and is validated by New.
type ProvisionerConfig struct {
	// OS defines the operating system new devices are created with. Its format
	// is specified by Equinix API. Must not be empty.
	OS string
	// MaxCount is the maximum count of managed servers. No new devices will be
	// created after reaching the limit. No attempt will be made to reduce the
	// server count. Zero means no limit.
	MaxCount uint

	// ReconcileLoopLimiter limits the rate of the main reconciliation loop
	// iterating. As new machines are being provisioned, each loop will cause one
	// 'long' ListHardwareReservations call to Equinix. Must not be nil.
	ReconcileLoopLimiter *rate.Limiter

	// DeviceCreationLimiter limits the rate at which devices are created within
	// Equinix through use of appropriate API calls. Must not be nil.
	DeviceCreationLimiter *rate.Limiter

	// Assimilate Equinix machines that match the configured device prefix into the
	// BMDB as Provided. This should only be used for manual testing with
	// -bmdb_eat_my_data.
	Assimilate bool

	// ReservationChunkSize is how many Equinix machines will try to be spawned in a
	// single reconciliation loop. Higher numbers allow for faster initial
	// provisioning, but lower numbers decrease potential raciness with other systems
	// and make sure that other parts of the reconciliation logic are run regularly.
	// Must not be zero.
	//
	// 20 is a decent starting point.
	ReservationChunkSize uint

	// UseProjectKeys defines if the provisioner adds all ssh keys defined inside
	// the used project to every new machine, instead of only the provisioner's
	// own key. This is only used for debug purposes.
	UseProjectKeys bool
}
59
60func (p *ProvisionerConfig) RegisterFlags() {
61 flag.StringVar(&p.OS, "provisioner_os", "ubuntu_20_04", "OS that provisioner will deploy on Equinix machines. Not the target OS for cluster customers.")
62 flag.UintVar(&p.MaxCount, "provisioner_max_machines", 50, "Limit of machines that the provisioner will attempt to pull into the BMDB. Zero for no limit.")
63 flagLimiter(&p.ReconcileLoopLimiter, "provisioner_reconciler_rate", "1m,1", "Rate limiting for main provisioner reconciliation loop")
64 flagLimiter(&p.DeviceCreationLimiter, "provisioner_device_creation_rate", "5s,1", "Rate limiting for Equinix device/machine creation")
65 flag.BoolVar(&p.Assimilate, "provisioner_assimilate", false, "Assimilate matching machines in Equinix project into BMDB as Provided. Only to be used when manually testing.")
66 flag.UintVar(&p.ReservationChunkSize, "provisioner_reservation_chunk_size", 20, "How many machines will the provisioner attempt to create in a single reconciliation loop iteration")
Tim Windelschmidt28fcddc2023-04-19 15:34:25 +020067 flag.BoolVar(&p.UseProjectKeys, "provisioner_use_project_keys", false, "Add all Equinix project keys to newly provisioned machines, not just the provisioner's managed key. Debug/development only.")
Serge Bazanskicaa12082023-02-16 14:54:04 +010068}
69
// Provisioner implements the server provisioning logic. Provisioning entails
// bringing all available hardware reservations (subject to limits) into BMDB as
// machines provided by Equinix.
//
// Instances are created with ProvisionerConfig.New and driven by Run.
type Provisioner struct {
	// config is the provisioner-specific configuration this instance was
	// created from.
	config *ProvisionerConfig
	// sharedConfig carries settings such as the Equinix project ID and the
	// device hostname prefix.
	sharedConfig *SharedConfig

	// cl is the wrapngo client instance used.
	cl ecl.Client

	// badReservations is a holiday resort for Equinix hardware reservations which
	// failed to be provisioned for some reason or another. We keep a list of them in
	// memory just so that we don't repeatedly try to provision the same known bad
	// machines. Entries are keyed by hardware reservation ID.
	badReservations sinbin.Sinbin[string]
}
86
87// New creates a Provisioner instance, checking ProvisionerConfig and
88// SharedConfig for errors.
89func (c *ProvisionerConfig) New(cl ecl.Client, sc *SharedConfig) (*Provisioner, error) {
90 // If these are unset, it's probably because someone is using us as a library.
91 // Provide error messages useful to code users instead of flag names.
92 if c.OS == "" {
93 return nil, fmt.Errorf("OS must be set")
94 }
95 if c.ReconcileLoopLimiter == nil {
96 return nil, fmt.Errorf("ReconcileLoopLimiter must be set")
97 }
98 if c.DeviceCreationLimiter == nil {
99 return nil, fmt.Errorf("DeviceCreationLimiter must be set")
100 }
101 if c.ReservationChunkSize == 0 {
102 return nil, fmt.Errorf("ReservationChunkSize must be set")
103 }
104 return &Provisioner{
105 config: c,
106 sharedConfig: sc,
107
108 cl: cl,
109 }, nil
110}
111
112// Run the provisioner blocking the current goroutine until the given context
113// expires.
114func (p *Provisioner) Run(ctx context.Context, conn *bmdb.Connection) error {
115
116 var sess *bmdb.Session
117 var err error
118 for {
119 if sess == nil {
Serge Bazanskic50f6942023-04-24 18:27:22 +0200120 sess, err = conn.StartSession(ctx, bmdb.SessionOption{Processor: metrics.ProcessorShepherdProvisioner})
Serge Bazanskicaa12082023-02-16 14:54:04 +0100121 if err != nil {
122 return fmt.Errorf("could not start BMDB session: %w", err)
123 }
124 }
125 err = p.runInSession(ctx, sess)
126
127 switch {
128 case err == nil:
129 case errors.Is(err, ctx.Err()):
130 return err
131 case errors.Is(err, bmdb.ErrSessionExpired):
132 klog.Errorf("Session expired, restarting...")
133 sess = nil
134 time.Sleep(time.Second)
135 case err != nil:
136 klog.Errorf("Processing failed: %v", err)
137 // TODO(q3k): close session
138 time.Sleep(time.Second)
139 }
140 }
141}
142
// machineListing is the outcome of listing machines from one source (the
// provider or the BMDB). It is used to pass the result of a listing goroutine
// back to runInSession over a channel.
type machineListing struct {
	// machines are the listed device IDs.
	machines []uuid.UUID
	// err is the error returned by the listing call, if any.
	err error
}
147
148// runInSession executes one iteration of the provisioner's control loop within a
149// BMDB session. This control loop attempts to bring all Equinix hardware
150// reservations into machines in the BMDB, subject to limits.
151func (p *Provisioner) runInSession(ctx context.Context, sess *bmdb.Session) error {
152 if err := p.config.ReconcileLoopLimiter.Wait(ctx); err != nil {
153 return err
154 }
155
156 providerC := make(chan *machineListing, 1)
157 bmdbC := make(chan *machineListing, 1)
158
159 klog.Infof("Getting provider and bmdb machines...")
160
161 // Make sub-context for two parallel operations, and so that we can cancel one
162 // immediately if the other fails.
163 subCtx, subCtxC := context.WithCancel(ctx)
164 defer subCtxC()
165
166 go func() {
167 machines, err := p.listInProvider(subCtx)
168 providerC <- &machineListing{
169 machines: machines,
170 err: err,
171 }
172 }()
173 go func() {
174 machines, err := p.listInBMDB(subCtx, sess)
175 bmdbC <- &machineListing{
176 machines: machines,
177 err: err,
178 }
179 }()
180 var inProvider, inBMDB *machineListing
181 for {
182 select {
183 case inProvider = <-providerC:
184 if err := inProvider.err; err != nil {
185 return fmt.Errorf("listing provider machines failed: %w", err)
186 }
187 klog.Infof("Got %d machines managed in provider.", len(inProvider.machines))
188 case inBMDB = <-bmdbC:
189 if err := inBMDB.err; err != nil {
190 return fmt.Errorf("listing BMDB machines failed: %w", err)
191 }
192 klog.Infof("Got %d machines in BMDB.", len(inBMDB.machines))
193 }
194 if inProvider != nil && inBMDB != nil {
195 break
196 }
197 }
198
199 subCtxC()
200 if err := p.reconcile(ctx, sess, inProvider.machines, inBMDB.machines); err != nil {
201 return fmt.Errorf("reconciliation failed: %w", err)
202 }
203 return nil
204}
205
206// listInProviders returns all machines that the provider thinks we should be
207// managing.
208func (p *Provisioner) listInProvider(ctx context.Context) ([]uuid.UUID, error) {
209 devices, err := p.sharedConfig.managedDevices(ctx, p.cl)
210 if err != nil {
211 return nil, fmt.Errorf("while fetching managed machines: %w", err)
212 }
213 var pvr []uuid.UUID
214 for _, dev := range devices {
215 id, err := uuid.Parse(dev.ID)
216 if err != nil {
217 klog.Errorf("Device ID %q is not UUID, skipping", dev.ID)
218 } else {
219 pvr = append(pvr, id)
220 }
221 }
222 sort.Slice(pvr, func(i, j int) bool {
223 return pvr[i].String() < pvr[j].String()
224 })
225 return pvr, nil
226}
227
228// listInBMDB returns all the machines that the BMDB thinks we should be managing.
229func (p *Provisioner) listInBMDB(ctx context.Context, sess *bmdb.Session) ([]uuid.UUID, error) {
230 var res []uuid.UUID
231 err := sess.Transact(ctx, func(q *model.Queries) error {
232 machines, err := q.GetProvidedMachines(ctx, model.ProviderEquinix)
233 if err != nil {
234 return err
235 }
236 res = make([]uuid.UUID, len(machines))
237 for i, machine := range machines {
238 id, err := uuid.Parse(machine.ProviderID)
239 if err != nil {
240 klog.Errorf("BMDB machine %s has unparseable provider ID %q", machine.MachineID, machine.ProviderID)
241 } else {
242 res[i] = id
243 }
244 }
245 return nil
246 })
247 if err != nil {
248 return nil, err
249 }
250 sort.Slice(res, func(i, j int) bool {
251 return res[i].String() < res[j].String()
252 })
253 return res, nil
254}
255
256// reconcile takes a list of machines that the provider thinks we should be
257// managing and that the BMDB thinks we should be managing, and tries to make
258// sense of that. First, some checks are performed across the two lists to make
259// sure we haven't dropped anything. Then, additional machines are deployed from
260// hardware reservations as needed.
261func (p *Provisioner) reconcile(ctx context.Context, sess *bmdb.Session, inProvider, inBMDB []uuid.UUID) error {
262 klog.Infof("Reconciling...")
263
264 bmdb := make(map[string]bool)
265 provider := make(map[string]bool)
266 for _, machine := range inProvider {
267 provider[machine.String()] = true
268 }
269 for _, machine := range inBMDB {
270 bmdb[machine.String()] = true
271 }
272
273 managed := make(map[string]bool)
274
275 // Some desynchronization between the BMDB and Provider point of view might be so
276 // bad we shouldn't attempt to do any work, at least not any time soon.
277 badbadnotgood := false
278
279 // Find any machines supposedly managed by us in the provider, but not in the
280 // BMDB, and assimilate them if so configured.
281 for machine, _ := range provider {
282 if bmdb[machine] {
283 managed[machine] = true
284 continue
285 }
286 if p.config.Assimilate {
287 klog.Warningf("Provider machine %s has no corresponding machine in BMDB. Assimilating it.", machine)
288 if err := p.assimilate(ctx, sess, machine); err != nil {
289 klog.Errorf("Failed to assimilate: %v", err)
290 } else {
291 managed[machine] = true
292 }
293 } else {
294 klog.Errorf("Provider machine %s has no corresponding machine in BMDB.", machine)
295 badbadnotgood = true
296 }
297 }
298
299 // Find any machines in the BMDB but not in the provider.
300 for machine, _ := range bmdb {
301 if !provider[machine] {
302 klog.Errorf("Provider device ID %s referred to in BMDB (from TODO) but missing in provider.", machine)
303 badbadnotgood = true
304 }
305 }
306
307 // Bail if things are weird.
308 if badbadnotgood {
309 klog.Errorf("Something's very wrong. Bailing early and refusing to do any work.")
310 return fmt.Errorf("fatal discrepency between BMDB and provider")
311 }
312
313 // Summarize all managed machines, which is the intersection of BMDB and
314 // Provisioner machines, usually both of these sets being equal.
315 nmanaged := len(managed)
316 klog.Infof("Total managed machines: %d", nmanaged)
317
318 if p.config.MaxCount != 0 && p.config.MaxCount <= uint(nmanaged) {
319 klog.Infof("Not bringing up more machines (at limit of %d machines)", p.config.MaxCount)
320 return nil
321 }
322
323 limitName := "no limit"
324 if p.config.MaxCount != 0 {
325 limitName = fmt.Sprintf("%d", p.config.MaxCount)
326 }
327 klog.Infof("Below managed machine limit (%s), bringing up more...", limitName)
328 klog.Infof("Retrieving hardware reservations, this will take a while...")
329 reservations, err := p.cl.ListReservations(ctx, p.sharedConfig.ProjectId)
330 if err != nil {
331 return fmt.Errorf("failed to list reservations: %w", err)
332 }
333
334 // Collect all reservations.
335 var toProvision []packngo.HardwareReservation
Tim Windelschmidtea946632023-04-18 03:46:58 +0200336 var inUse, notProvisionable, penalized int
Serge Bazanskicaa12082023-02-16 14:54:04 +0100337 for _, reservation := range reservations {
Tim Windelschmidtea946632023-04-18 03:46:58 +0200338 if reservation.Device != nil {
339 inUse++
Serge Bazanskicaa12082023-02-16 14:54:04 +0100340 continue
341 }
Tim Windelschmidtea946632023-04-18 03:46:58 +0200342 if !reservation.Provisionable {
343 notProvisionable++
344 continue
Serge Bazanskicaa12082023-02-16 14:54:04 +0100345 }
346 if p.badReservations.Penalized(reservation.ID) {
Tim Windelschmidtea946632023-04-18 03:46:58 +0200347 penalized++
Serge Bazanskicaa12082023-02-16 14:54:04 +0100348 continue
349 }
350 toProvision = append(toProvision, reservation)
351 }
Tim Windelschmidtea946632023-04-18 03:46:58 +0200352 klog.Infof("Retrieved hardware reservations: %d (total), %d (available), %d (in use), %d (not provisionable), %d (penalized)", len(reservations), len(toProvision), inUse, notProvisionable, penalized)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100353
354 // Limit them to MaxCount, if applicable.
355 if p.config.MaxCount != 0 {
356 needed := int(p.config.MaxCount) - nmanaged
357 if len(toProvision) < needed {
358 needed = len(toProvision)
359 }
360 toProvision = toProvision[:needed]
361 }
362
363 // Limit them to an arbitrary 'chunk' size so that we don't do too many things in
364 // a single reconciliation operation.
365 if uint(len(toProvision)) > p.config.ReservationChunkSize {
366 toProvision = toProvision[:p.config.ReservationChunkSize]
367 }
368
369 if len(toProvision) == 0 {
370 klog.Infof("No more hardware reservations available, or all filtered out.")
371 return nil
372 }
373
374 klog.Infof("Bringing up %d machines...", len(toProvision))
375 for _, res := range toProvision {
376 p.config.DeviceCreationLimiter.Wait(ctx)
377 if err := p.provision(ctx, sess, res); err != nil {
378 klog.Errorf("Failed to provision reservation %s: %v", res.ID, err)
379 until := time.Now().Add(time.Hour)
380 klog.Errorf("Adding hardware reservation %s to sinbin until %s", res.ID, until)
381 p.badReservations.Add(res.ID, until)
382 }
383 }
384
385 return nil
386}
387
388// provision attempts to create a device within Equinix using given Hardware
389// Reservation rsv. The resulting device is registered with BMDB, and tagged as
390// "provided" in the process.
391func (pr *Provisioner) provision(ctx context.Context, sess *bmdb.Session, rsv packngo.HardwareReservation) error {
392 klog.Infof("Creating a new device using reservation ID %s.", rsv.ID)
393 hostname := pr.sharedConfig.DevicePrefix + rsv.ID[:18]
394 kid, err := pr.sharedConfig.sshEquinixId(ctx, pr.cl)
395 if err != nil {
396 return err
397 }
398 req := &packngo.DeviceCreateRequest{
399 Hostname: hostname,
400 OS: pr.config.OS,
401 Plan: rsv.Plan.Slug,
402 ProjectID: pr.sharedConfig.ProjectId,
403 HardwareReservationID: rsv.ID,
404 ProjectSSHKeys: []string{kid},
405 }
Tim Windelschmidt28fcddc2023-04-19 15:34:25 +0200406 if pr.config.UseProjectKeys {
407 klog.Warningf("INSECURE: Machines will be created with ALL PROJECT SSH KEYS!")
408 req.ProjectSSHKeys = nil
409 }
410
Serge Bazanskicaa12082023-02-16 14:54:04 +0100411 nd, err := pr.cl.CreateDevice(ctx, req)
412 if err != nil {
413 return fmt.Errorf("while creating new device within Equinix: %w", err)
414 }
Tim Windelschmidtff619352023-04-19 15:46:08 +0200415 klog.Infof("Created a new device within Equinix (RID: %s, PID: %s, HOST: %s)", rsv.ID, nd.ID, hostname)
Serge Bazanskicaa12082023-02-16 14:54:04 +0100416
417 err = pr.assimilate(ctx, sess, nd.ID)
418 if err != nil {
419 // TODO(mateusz@monogon.tech) at this point the device at Equinix isn't
420 // matched by a BMDB record. Schedule device deletion or make sure this
421 // case is being handled elsewhere.
422 return err
423 }
424 return nil
425}
426
427// assimilate brings in an already existing machine from Equinix into the BMDB.
428// This is only used in manual testing.
429func (pr *Provisioner) assimilate(ctx context.Context, sess *bmdb.Session, deviceID string) error {
430 return sess.Transact(ctx, func(q *model.Queries) error {
431 // Create a new machine record within BMDB.
432 m, err := q.NewMachine(ctx)
433 if err != nil {
434 return fmt.Errorf("while creating a new machine record in BMDB: %w", err)
435 }
436
437 // Link the new machine with the Equinix device, and tag it "provided".
438 p := model.MachineAddProvidedParams{
439 MachineID: m.MachineID,
440 ProviderID: deviceID,
441 Provider: model.ProviderEquinix,
442 }
443 klog.Infof("Setting \"provided\" tag (ID: %s, PID: %s, Provider: %s).", p.MachineID, p.ProviderID, p.Provider)
444 if err := q.MachineAddProvided(ctx, p); err != nil {
445 return fmt.Errorf("while tagging machine active: %w", err)
446 }
447 return nil
448 })
449}