blob: 5f19c6d4e7c68c3d83996ca9d696a25b7f931b0a [file] [log] [blame]
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +02001package manager
2
3import (
4 "context"
5 "errors"
6 "flag"
7 "fmt"
8 "net/netip"
9 "sort"
10 "time"
11
12 "github.com/google/uuid"
13 "golang.org/x/time/rate"
14 "k8s.io/klog/v2"
15
16 "source.monogon.dev/cloud/bmaas/bmdb"
17 "source.monogon.dev/cloud/bmaas/bmdb/metrics"
18 "source.monogon.dev/cloud/bmaas/bmdb/model"
19 "source.monogon.dev/cloud/shepherd"
20 "source.monogon.dev/go/mflags"
21)
22
23// Provisioner implements the server provisioning logic. Provisioning entails
24// bringing all available machines (subject to limits) into BMDB.
25type Provisioner struct {
26 ProvisionerConfig
27 p shepherd.Provider
28}
29
30// ProvisionerConfig configures the provisioning process.
31type ProvisionerConfig struct {
32 // MaxCount is the maximum count of managed servers. No new devices will be
33 // created after reaching the limit. No attempt will be made to reduce the
34 // server count.
35 MaxCount uint
36
37 // ReconcileLoopLimiter limits the rate of the main reconciliation loop
38 // iterating.
39 ReconcileLoopLimiter *rate.Limiter
40
41 // DeviceCreation limits the rate at which devices are created.
42 DeviceCreationLimiter *rate.Limiter
43
44 // ChunkSize is how many machines will try to be spawned in a
45 // single reconciliation loop. Higher numbers allow for faster initial
46 // provisioning, but lower numbers decrease potential raciness with other systems
47 // and make sure that other parts of the reconciliation logic are ran regularly.
48 //
49 // 20 is decent starting point.
50 ChunkSize uint
51}
52
53func (pc *ProvisionerConfig) RegisterFlags() {
54 flag.UintVar(&pc.MaxCount, "provisioner_max_machines", 50, "Limit of machines that the provisioner will attempt to pull into the BMDB. Zero for no limit.")
55 mflags.Limiter(&pc.ReconcileLoopLimiter, "provisioner_reconciler_rate", "1m,1", "Rate limiting for main provisioner reconciliation loop")
56 mflags.Limiter(&pc.DeviceCreationLimiter, "provisioner_device_creation_rate", "5s,1", "Rate limiting for machine creation")
57 flag.UintVar(&pc.ChunkSize, "provisioner_reservation_chunk_size", 20, "How many machines will the provisioner attempt to create in a single reconciliation loop iteration")
58}
59
60func (pc *ProvisionerConfig) check() error {
61 // If these are unset, it's probably because someone is using us as a library.
62 // Provide error messages useful to code users instead of flag names.
63 if pc.ReconcileLoopLimiter == nil {
64 return fmt.Errorf("ReconcileLoopLimiter must be set")
65 }
66 if pc.DeviceCreationLimiter == nil {
67 return fmt.Errorf("DeviceCreationLimiter must be set")
68 }
69 if pc.ChunkSize == 0 {
70 return fmt.Errorf("ChunkSize must be set")
71 }
72 return nil
73}
74
75// NewProvisioner creates a Provisioner instance, checking ProvisionerConfig and
76// providerConfig for errors.
77func NewProvisioner(p shepherd.Provider, pc ProvisionerConfig) (*Provisioner, error) {
78 if err := pc.check(); err != nil {
79 return nil, err
80 }
81
82 return &Provisioner{
83 ProvisionerConfig: pc,
84 p: p,
85 }, nil
86}
87
88// Run the provisioner blocking the current goroutine until the given context
89// expires.
90func (p *Provisioner) Run(ctx context.Context, conn *bmdb.Connection) error {
91
92 var sess *bmdb.Session
93 var err error
94 for {
95 if sess == nil {
96 sess, err = conn.StartSession(ctx, bmdb.SessionOption{Processor: metrics.ProcessorShepherdProvisioner})
97 if err != nil {
98 return fmt.Errorf("could not start BMDB session: %w", err)
99 }
100 }
101 err = p.runInSession(ctx, sess)
102
103 switch {
104 case err == nil:
105 case errors.Is(err, ctx.Err()):
106 return err
107 case errors.Is(err, bmdb.ErrSessionExpired):
108 klog.Errorf("Session expired, restarting...")
109 sess = nil
110 time.Sleep(time.Second)
111 case err != nil:
112 klog.Errorf("Processing failed: %v", err)
113 // TODO(q3k): close session
114 time.Sleep(time.Second)
115 }
116 }
117}
118
119type machineListing struct {
120 machines []shepherd.Machine
121 err error
122}
123
124// runInSession executes one iteration of the provisioner's control loop within a
125// BMDB session. This control loop attempts to bring all capacity into machines in
126// the BMDB, subject to limits.
127func (p *Provisioner) runInSession(ctx context.Context, sess *bmdb.Session) error {
128 if err := p.ReconcileLoopLimiter.Wait(ctx); err != nil {
129 return err
130 }
131
132 providerC := make(chan *machineListing, 1)
133 bmdbC := make(chan *machineListing, 1)
134
135 klog.Infof("Getting provider and bmdb machines...")
136
137 // Make sub-context for two parallel operations, and so that we can cancel one
138 // immediately if the other fails.
139 subCtx, subCtxC := context.WithCancel(ctx)
140 defer subCtxC()
141
142 go func() {
143 machines, err := p.listInProvider(subCtx)
144 providerC <- &machineListing{
145 machines: machines,
146 err: err,
147 }
148 }()
149 go func() {
150 machines, err := p.listInBMDB(subCtx, sess)
151 bmdbC <- &machineListing{
152 machines: machines,
153 err: err,
154 }
155 }()
156 var inProvider, inBMDB *machineListing
157 for {
158 select {
159 case inProvider = <-providerC:
160 if err := inProvider.err; err != nil {
161 return fmt.Errorf("listing provider machines failed: %w", err)
162 }
163 klog.Infof("Got %d machines in provider.", len(inProvider.machines))
164 case inBMDB = <-bmdbC:
165 if err := inBMDB.err; err != nil {
166 return fmt.Errorf("listing BMDB machines failed: %w", err)
167 }
168 klog.Infof("Got %d machines in BMDB.", len(inBMDB.machines))
169 }
170 if inProvider != nil && inBMDB != nil {
171 break
172 }
173 }
174
175 subCtxC()
176 if err := p.reconcile(ctx, sess, inProvider.machines, inBMDB.machines); err != nil {
177 return fmt.Errorf("reconciliation failed: %w", err)
178 }
179 return nil
180}
181
182// listInProviders returns all machines that the provider thinks we should be
183// managing.
184func (p *Provisioner) listInProvider(ctx context.Context) ([]shepherd.Machine, error) {
185 machines, err := p.p.ListMachines(ctx)
186 if err != nil {
187 return nil, fmt.Errorf("while fetching managed machines: %w", err)
188 }
189 sort.Slice(machines, func(i, j int) bool {
190 return machines[i].ID() < machines[j].ID()
191 })
192 return machines, nil
193}
194
195type providedMachine struct {
196 model.MachineProvided
197}
198
Tim Windelschmidtfdd87ab2023-12-07 18:03:21 +0100199func (p providedMachine) Failed() bool {
200 if !p.MachineProvided.ProviderStatus.Valid {
201 // If we don't have any ProviderStatus to check for, return false
202 // to trigger the validation inside the reconciler loop.
203 return false
204 }
205 switch p.MachineProvided.ProviderStatus.ProviderStatus {
206 case model.ProviderStatusProvisioningFailedPermanent:
207 return true
208 }
209 return false
210}
211
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200212func (p providedMachine) ID() shepherd.ProviderID {
213 return shepherd.ProviderID(p.ProviderID)
214}
215
216func (p providedMachine) Addr() netip.Addr {
217 if !p.ProviderIpAddress.Valid {
218 return netip.Addr{}
219 }
220
221 addr, err := netip.ParseAddr(p.ProviderIpAddress.String)
222 if err != nil {
223 return netip.Addr{}
224 }
225 return addr
226}
227
228func (p providedMachine) State() shepherd.State {
229 return shepherd.StateKnownUsed
230}
231
232// listInBMDB returns all the machines that the BMDB thinks we should be managing.
233func (p *Provisioner) listInBMDB(ctx context.Context, sess *bmdb.Session) ([]shepherd.Machine, error) {
234 var res []shepherd.Machine
235 err := sess.Transact(ctx, func(q *model.Queries) error {
236 machines, err := q.GetProvidedMachines(ctx, p.p.Type())
237 if err != nil {
238 return err
239 }
240 res = make([]shepherd.Machine, 0, len(machines))
241 for _, machine := range machines {
242 _, err := uuid.Parse(machine.ProviderID)
243 if err != nil {
244 klog.Errorf("BMDB machine %s has unparseable provider ID %q", machine.MachineID, machine.ProviderID)
245 continue
246 }
247
248 res = append(res, providedMachine{machine})
249 }
250 return nil
251 })
252 if err != nil {
253 return nil, err
254 }
255 sort.Slice(res, func(i, j int) bool {
256 return res[i].ID() < res[j].ID()
257 })
258 return res, nil
259}
260
261// resolvePossiblyUsed checks if the state is set to possibly used and finds out
262// which state is the correct one.
Tim Windelschmidtfdd87ab2023-12-07 18:03:21 +0100263func (p *Provisioner) resolvePossiblyUsed(machine shepherd.Machine, providedMachines map[shepherd.ProviderID]shepherd.Machine) shepherd.State {
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200264 state, id := machine.State(), machine.ID()
265
266 // Bail out if this isn't a possibly used state.
267 if state != shepherd.StatePossiblyUsed {
268 return state
269 }
270
271 // If a machine does not have a valid id, its always seen as unused.
272 if !id.IsValid() {
273 return shepherd.StateKnownUnused
274 }
275
276 // If the machine is not inside the bmdb, it's seen as unused.
277 if _, ok := providedMachines[id]; !ok {
278 return shepherd.StateKnownUnused
279 }
280
281 return shepherd.StateKnownUsed
282}
283
284// reconcile takes a list of machines that the provider thinks we should be
285// managing and that the BMDB thinks we should be managing, and tries to make
286// sense of that. First, some checks are performed across the two lists to make
287// sure we haven't dropped anything. Then, additional machines are deployed from
288// hardware reservations as needed.
289func (p *Provisioner) reconcile(ctx context.Context, sess *bmdb.Session, inProvider, bmdbMachines []shepherd.Machine) error {
290 klog.Infof("Reconciling...")
291
Tim Windelschmidtfdd87ab2023-12-07 18:03:21 +0100292 bmdb := make(map[shepherd.ProviderID]shepherd.Machine)
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200293 for _, machine := range bmdbMachines {
294 // Dont check the state here as its hardcoded to be known used.
Tim Windelschmidtfdd87ab2023-12-07 18:03:21 +0100295 bmdb[machine.ID()] = machine
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200296 }
297
298 var availableMachines []shepherd.Machine
Tim Windelschmidtfdd87ab2023-12-07 18:03:21 +0100299 provider := make(map[shepherd.ProviderID]shepherd.Machine)
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200300 for _, machine := range inProvider {
301 state := p.resolvePossiblyUsed(machine, bmdb)
302
303 switch state {
304 case shepherd.StateKnownUnused:
305 availableMachines = append(availableMachines, machine)
306
307 case shepherd.StateKnownUsed:
Tim Windelschmidtfdd87ab2023-12-07 18:03:21 +0100308 provider[machine.ID()] = machine
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200309
310 default:
311 return fmt.Errorf("machine has invalid state (ID: %s, Addr: %s): %s", machine.ID(), machine.Addr(), state)
312 }
313 }
314
315 managed := make(map[shepherd.ProviderID]bool)
316
Tim Windelschmidtfdd87ab2023-12-07 18:03:21 +0100317 // We discovered that a machine mostly fails either when provisioning or
318 // deprovisioning. A already deployed and running machine can only switch
319 // into failed state if any api interaction happend, e.g. rebooting the
320 // machine into recovery mode. If such a machine is returned to the
321 // reconciling loop, it will trigger the badbadnotgood safety switch and
322 // return with an error. To reduce the manual intervention required we
323 // filter out these machines on both sides (bmdb and provider).
324 isBadBadNotGood := func(known map[shepherd.ProviderID]shepherd.Machine, machine shepherd.Machine) bool {
325 // If the machine is missing and not failed, its a bad case.
326 if known[machine.ID()] == nil && !machine.Failed() {
327 return true
328 }
329 return false
330 }
331
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200332 // Some desynchronization between the BMDB and Provider point of view might be so
333 // bad we shouldn't attempt to do any work, at least not any time soon.
334 badbadnotgood := false
335
336 // Find any machines supposedly managed by us in the provider, but not in the
337 // BMDB.
Tim Windelschmidtfdd87ab2023-12-07 18:03:21 +0100338 for id, machine := range provider {
339 if isBadBadNotGood(bmdb, machine) {
340 klog.Errorf("Provider machine has no corresponding machine in BMDB. (PID: %s)", id)
341 badbadnotgood = true
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200342 continue
343 }
Tim Windelschmidtfdd87ab2023-12-07 18:03:21 +0100344
345 managed[id] = true
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200346 }
347
348 // Find any machines in the BMDB but not in the provider.
Tim Windelschmidtfdd87ab2023-12-07 18:03:21 +0100349 for id, machine := range bmdb {
350 if isBadBadNotGood(provider, machine) {
351 klog.Errorf("Provider machine referred to in BMDB but missing in provider. (PID: %s)", id)
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200352 badbadnotgood = true
353 }
354 }
355
356 // Bail if things are weird.
357 if badbadnotgood {
358 klog.Errorf("Something's very wrong. Bailing early and refusing to do any work.")
359 return fmt.Errorf("fatal discrepency between BMDB and provider")
360 }
361
362 // Summarize all managed machines, which is the intersection of BMDB and
363 // Provisioner machines, usually both of these sets being equal.
364 nmanaged := len(managed)
365 klog.Infof("Total managed machines: %d", nmanaged)
366
367 if p.MaxCount != 0 && p.MaxCount <= uint(nmanaged) {
368 klog.Infof("Not bringing up more machines (at limit of %d machines)", p.MaxCount)
369 return nil
370 }
371
372 limitName := "no limit"
373 if p.MaxCount != 0 {
374 limitName = fmt.Sprintf("%d", p.MaxCount)
375 }
376 klog.Infof("Below managed machine limit (%s), bringing up more...", limitName)
377
378 if len(availableMachines) == 0 {
379 klog.Infof("No more capacity available.")
380 return nil
381 }
382
383 toProvision := availableMachines
384 // Limit them to MaxCount, if applicable.
385 if p.MaxCount != 0 {
386 needed := int(p.MaxCount) - nmanaged
387 if len(toProvision) < needed {
388 needed = len(toProvision)
389 }
390 toProvision = toProvision[:needed]
391 }
392
393 // Limit them to an arbitrary 'chunk' size so that we don't do too many things in
394 // a single reconciliation operation.
395 if uint(len(toProvision)) > p.ChunkSize {
396 toProvision = toProvision[:p.ChunkSize]
397 }
398
399 if len(toProvision) == 0 {
400 klog.Infof("No more unused machines available, or all filtered out.")
401 return nil
402 }
403
404 klog.Infof("Bringing up %d machines...", len(toProvision))
405 for _, machine := range toProvision {
406 if err := p.DeviceCreationLimiter.Wait(ctx); err != nil {
407 return err
408 }
409
410 nd, err := p.p.CreateMachine(ctx, sess, shepherd.CreateMachineRequest{
411 UnusedMachine: machine,
412 })
413 if err != nil {
414 klog.Errorf("while creating new device (ID: %s, Addr: %s, State: %s): %w", machine.ID(), machine.Addr(), machine.State(), err)
415 continue
416 }
417 klog.Infof("Created new machine with ID: %s", nd.ID())
418 }
419
420 return nil
421}