package manager
2
3import (
4 "context"
5 "errors"
6 "flag"
7 "fmt"
8 "net/netip"
9 "sort"
10 "time"
11
12 "github.com/google/uuid"
13 "golang.org/x/time/rate"
14 "k8s.io/klog/v2"
15
16 "source.monogon.dev/cloud/bmaas/bmdb"
17 "source.monogon.dev/cloud/bmaas/bmdb/metrics"
18 "source.monogon.dev/cloud/bmaas/bmdb/model"
19 "source.monogon.dev/cloud/shepherd"
20 "source.monogon.dev/go/mflags"
21)
22
// Provisioner implements the server provisioning logic. Provisioning entails
// bringing all available machines (subject to limits) into BMDB.
type Provisioner struct {
	ProvisionerConfig
	// p is the provider backend used to list existing machines and create new
	// ones.
	p shepherd.Provider
}
29
// ProvisionerConfig configures the provisioning process.
type ProvisionerConfig struct {
	// MaxCount is the maximum count of managed servers. No new devices will be
	// created after reaching the limit. No attempt will be made to reduce the
	// server count.
	MaxCount uint

	// ReconcileLoopLimiter limits the rate of the main reconciliation loop
	// iterating.
	ReconcileLoopLimiter *rate.Limiter

	// DeviceCreationLimiter limits the rate at which devices are created.
	DeviceCreationLimiter *rate.Limiter

	// ChunkSize is how many machines will try to be spawned in a
	// single reconciliation loop. Higher numbers allow for faster initial
	// provisioning, but lower numbers decrease potential raciness with other systems
	// and make sure that other parts of the reconciliation logic run regularly.
	//
	// 20 is a decent starting point.
	ChunkSize uint
}
52
// RegisterFlags registers the provisioner configuration on the default flag
// set. The defaults mirror the documented recommendations on
// ProvisionerConfig (e.g. a chunk size of 20).
func (pc *ProvisionerConfig) RegisterFlags() {
	flag.UintVar(&pc.MaxCount, "provisioner_max_machines", 50, "Limit of machines that the provisioner will attempt to pull into the BMDB. Zero for no limit.")
	mflags.Limiter(&pc.ReconcileLoopLimiter, "provisioner_reconciler_rate", "1m,1", "Rate limiting for main provisioner reconciliation loop")
	mflags.Limiter(&pc.DeviceCreationLimiter, "provisioner_device_creation_rate", "5s,1", "Rate limiting for machine creation")
	flag.UintVar(&pc.ChunkSize, "provisioner_reservation_chunk_size", 20, "How many machines will the provisioner attempt to create in a single reconciliation loop iteration")
}
59
60func (pc *ProvisionerConfig) check() error {
61 // If these are unset, it's probably because someone is using us as a library.
62 // Provide error messages useful to code users instead of flag names.
63 if pc.ReconcileLoopLimiter == nil {
64 return fmt.Errorf("ReconcileLoopLimiter must be set")
65 }
66 if pc.DeviceCreationLimiter == nil {
67 return fmt.Errorf("DeviceCreationLimiter must be set")
68 }
69 if pc.ChunkSize == 0 {
70 return fmt.Errorf("ChunkSize must be set")
71 }
72 return nil
73}
74
75// NewProvisioner creates a Provisioner instance, checking ProvisionerConfig and
76// providerConfig for errors.
77func NewProvisioner(p shepherd.Provider, pc ProvisionerConfig) (*Provisioner, error) {
78 if err := pc.check(); err != nil {
79 return nil, err
80 }
81
82 return &Provisioner{
83 ProvisionerConfig: pc,
84 p: p,
85 }, nil
86}
87
88// Run the provisioner blocking the current goroutine until the given context
89// expires.
90func (p *Provisioner) Run(ctx context.Context, conn *bmdb.Connection) error {
91
92 var sess *bmdb.Session
93 var err error
94 for {
95 if sess == nil {
96 sess, err = conn.StartSession(ctx, bmdb.SessionOption{Processor: metrics.ProcessorShepherdProvisioner})
97 if err != nil {
98 return fmt.Errorf("could not start BMDB session: %w", err)
99 }
100 }
101 err = p.runInSession(ctx, sess)
102
103 switch {
104 case err == nil:
105 case errors.Is(err, ctx.Err()):
106 return err
107 case errors.Is(err, bmdb.ErrSessionExpired):
108 klog.Errorf("Session expired, restarting...")
109 sess = nil
110 time.Sleep(time.Second)
111 case err != nil:
112 klog.Errorf("Processing failed: %v", err)
113 // TODO(q3k): close session
114 time.Sleep(time.Second)
115 }
116 }
117}
118
// machineListing carries the result of one asynchronous machine listing,
// either from the provider or from the BMDB (see runInSession).
type machineListing struct {
	machines []shepherd.Machine
	err      error
}
123
// runInSession executes one iteration of the provisioner's control loop within a
// BMDB session. This control loop attempts to bring all capacity into machines in
// the BMDB, subject to limits.
//
// The provider and BMDB listings are fetched concurrently; the first listing
// error aborts the iteration (canceling the other fetch via a sub-context).
func (p *Provisioner) runInSession(ctx context.Context, sess *bmdb.Session) error {
	// Rate-limit the whole reconciliation loop.
	if err := p.ReconcileLoopLimiter.Wait(ctx); err != nil {
		return err
	}

	// Buffered (capacity 1) so each goroutine below can deliver its result and
	// exit even if this function already returned due to the other listing's
	// error.
	providerC := make(chan *machineListing, 1)
	bmdbC := make(chan *machineListing, 1)

	klog.Infof("Getting provider and bmdb machines...")

	// Make sub-context for two parallel operations, and so that we can cancel one
	// immediately if the other fails.
	subCtx, subCtxC := context.WithCancel(ctx)
	defer subCtxC()

	go func() {
		machines, err := p.listInProvider(subCtx)
		providerC <- &machineListing{
			machines: machines,
			err:      err,
		}
	}()
	go func() {
		machines, err := p.listInBMDB(subCtx, sess)
		bmdbC <- &machineListing{
			machines: machines,
			err:      err,
		}
	}()
	// Wait until both listings arrive, returning early on the first error seen.
	var inProvider, inBMDB *machineListing
	for {
		select {
		case inProvider = <-providerC:
			if err := inProvider.err; err != nil {
				return fmt.Errorf("listing provider machines failed: %w", err)
			}
			klog.Infof("Got %d machines in provider.", len(inProvider.machines))
		case inBMDB = <-bmdbC:
			if err := inBMDB.err; err != nil {
				return fmt.Errorf("listing BMDB machines failed: %w", err)
			}
			klog.Infof("Got %d machines in BMDB.", len(inBMDB.machines))
		}
		if inProvider != nil && inBMDB != nil {
			break
		}
	}

	// Both listings done; release the sub-context early (the defer above would
	// also do this, but only at return).
	subCtxC()
	if err := p.reconcile(ctx, sess, inProvider.machines, inBMDB.machines); err != nil {
		return fmt.Errorf("reconciliation failed: %w", err)
	}
	return nil
}
181
182// listInProviders returns all machines that the provider thinks we should be
183// managing.
184func (p *Provisioner) listInProvider(ctx context.Context) ([]shepherd.Machine, error) {
185 machines, err := p.p.ListMachines(ctx)
186 if err != nil {
187 return nil, fmt.Errorf("while fetching managed machines: %w", err)
188 }
189 sort.Slice(machines, func(i, j int) bool {
190 return machines[i].ID() < machines[j].ID()
191 })
192 return machines, nil
193}
194
// providedMachine adapts a BMDB model.MachineProvided row to the
// shepherd.Machine interface.
type providedMachine struct {
	model.MachineProvided
}
198
// ID returns the provider ID recorded for this machine in the BMDB.
func (p providedMachine) ID() shepherd.ProviderID {
	return shepherd.ProviderID(p.ProviderID)
}
202
203func (p providedMachine) Addr() netip.Addr {
204 if !p.ProviderIpAddress.Valid {
205 return netip.Addr{}
206 }
207
208 addr, err := netip.ParseAddr(p.ProviderIpAddress.String)
209 if err != nil {
210 return netip.Addr{}
211 }
212 return addr
213}
214
// State always reports StateKnownUsed: a machine backed by a BMDB record is
// by definition in use.
func (p providedMachine) State() shepherd.State {
	return shepherd.StateKnownUsed
}
218
// listInBMDB returns all the machines that the BMDB thinks we should be managing.
//
// Rows whose provider ID does not parse as a UUID are logged and skipped
// (NOTE(review): this assumes provider IDs are UUIDs — confirm this holds for
// every provider type). The result is sorted by provider ID.
func (p *Provisioner) listInBMDB(ctx context.Context, sess *bmdb.Session) ([]shepherd.Machine, error) {
	var res []shepherd.Machine
	err := sess.Transact(ctx, func(q *model.Queries) error {
		// Only machines belonging to this provider's type are relevant.
		machines, err := q.GetProvidedMachines(ctx, p.p.Type())
		if err != nil {
			return err
		}
		res = make([]shepherd.Machine, 0, len(machines))
		for _, machine := range machines {
			// Filter out (but log) rows with unparseable provider IDs.
			_, err := uuid.Parse(machine.ProviderID)
			if err != nil {
				klog.Errorf("BMDB machine %s has unparseable provider ID %q", machine.MachineID, machine.ProviderID)
				continue
			}

			res = append(res, providedMachine{machine})
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	// Deterministic ordering, mirroring listInProvider.
	sort.Slice(res, func(i, j int) bool {
		return res[i].ID() < res[j].ID()
	})
	return res, nil
}
247
248// resolvePossiblyUsed checks if the state is set to possibly used and finds out
249// which state is the correct one.
250func (p *Provisioner) resolvePossiblyUsed(machine shepherd.Machine, providedMachines map[shepherd.ProviderID]bool) shepherd.State {
251 state, id := machine.State(), machine.ID()
252
253 // Bail out if this isn't a possibly used state.
254 if state != shepherd.StatePossiblyUsed {
255 return state
256 }
257
258 // If a machine does not have a valid id, its always seen as unused.
259 if !id.IsValid() {
260 return shepherd.StateKnownUnused
261 }
262
263 // If the machine is not inside the bmdb, it's seen as unused.
264 if _, ok := providedMachines[id]; !ok {
265 return shepherd.StateKnownUnused
266 }
267
268 return shepherd.StateKnownUsed
269}
270
271// reconcile takes a list of machines that the provider thinks we should be
272// managing and that the BMDB thinks we should be managing, and tries to make
273// sense of that. First, some checks are performed across the two lists to make
274// sure we haven't dropped anything. Then, additional machines are deployed from
275// hardware reservations as needed.
276func (p *Provisioner) reconcile(ctx context.Context, sess *bmdb.Session, inProvider, bmdbMachines []shepherd.Machine) error {
277 klog.Infof("Reconciling...")
278
279 bmdb := make(map[shepherd.ProviderID]bool)
280 for _, machine := range bmdbMachines {
281 // Dont check the state here as its hardcoded to be known used.
282 bmdb[machine.ID()] = true
283 }
284
285 var availableMachines []shepherd.Machine
286 provider := make(map[shepherd.ProviderID]bool)
287 for _, machine := range inProvider {
288 state := p.resolvePossiblyUsed(machine, bmdb)
289
290 switch state {
291 case shepherd.StateKnownUnused:
292 availableMachines = append(availableMachines, machine)
293
294 case shepherd.StateKnownUsed:
295 provider[machine.ID()] = true
296
297 default:
298 return fmt.Errorf("machine has invalid state (ID: %s, Addr: %s): %s", machine.ID(), machine.Addr(), state)
299 }
300 }
301
302 managed := make(map[shepherd.ProviderID]bool)
303
304 // Some desynchronization between the BMDB and Provider point of view might be so
305 // bad we shouldn't attempt to do any work, at least not any time soon.
306 badbadnotgood := false
307
308 // Find any machines supposedly managed by us in the provider, but not in the
309 // BMDB.
310 for machine, _ := range provider {
311 if bmdb[machine] {
312 managed[machine] = true
313 continue
314 }
315 klog.Errorf("Provider machine %s has no corresponding machine in BMDB.", machine)
316 badbadnotgood = true
317 }
318
319 // Find any machines in the BMDB but not in the provider.
320 for machine, _ := range bmdb {
321 if !provider[machine] {
322 klog.Errorf("Provider device ID %s referred to in BMDB (from TODO) but missing in provider.", machine)
323 badbadnotgood = true
324 }
325 }
326
327 // Bail if things are weird.
328 if badbadnotgood {
329 klog.Errorf("Something's very wrong. Bailing early and refusing to do any work.")
330 return fmt.Errorf("fatal discrepency between BMDB and provider")
331 }
332
333 // Summarize all managed machines, which is the intersection of BMDB and
334 // Provisioner machines, usually both of these sets being equal.
335 nmanaged := len(managed)
336 klog.Infof("Total managed machines: %d", nmanaged)
337
338 if p.MaxCount != 0 && p.MaxCount <= uint(nmanaged) {
339 klog.Infof("Not bringing up more machines (at limit of %d machines)", p.MaxCount)
340 return nil
341 }
342
343 limitName := "no limit"
344 if p.MaxCount != 0 {
345 limitName = fmt.Sprintf("%d", p.MaxCount)
346 }
347 klog.Infof("Below managed machine limit (%s), bringing up more...", limitName)
348
349 if len(availableMachines) == 0 {
350 klog.Infof("No more capacity available.")
351 return nil
352 }
353
354 toProvision := availableMachines
355 // Limit them to MaxCount, if applicable.
356 if p.MaxCount != 0 {
357 needed := int(p.MaxCount) - nmanaged
358 if len(toProvision) < needed {
359 needed = len(toProvision)
360 }
361 toProvision = toProvision[:needed]
362 }
363
364 // Limit them to an arbitrary 'chunk' size so that we don't do too many things in
365 // a single reconciliation operation.
366 if uint(len(toProvision)) > p.ChunkSize {
367 toProvision = toProvision[:p.ChunkSize]
368 }
369
370 if len(toProvision) == 0 {
371 klog.Infof("No more unused machines available, or all filtered out.")
372 return nil
373 }
374
375 klog.Infof("Bringing up %d machines...", len(toProvision))
376 for _, machine := range toProvision {
377 if err := p.DeviceCreationLimiter.Wait(ctx); err != nil {
378 return err
379 }
380
381 nd, err := p.p.CreateMachine(ctx, sess, shepherd.CreateMachineRequest{
382 UnusedMachine: machine,
383 })
384 if err != nil {
385 klog.Errorf("while creating new device (ID: %s, Addr: %s, State: %s): %w", machine.ID(), machine.Addr(), machine.State(), err)
386 continue
387 }
388 klog.Infof("Created new machine with ID: %s", nd.ID())
389 }
390
391 return nil
392}