blob: e13c63e01ee35dad2d57fd34e31946031e3b7399 [file] [log] [blame]
Tim Windelschmidt6d33a432025-02-04 14:34:25 +01001// Copyright The Monogon Project Authors.
2// SPDX-License-Identifier: Apache-2.0
3
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +02004package manager
5
6import (
7 "context"
8 "errors"
9 "flag"
10 "fmt"
11 "net/netip"
12 "sort"
13 "time"
14
15 "github.com/google/uuid"
16 "golang.org/x/time/rate"
17 "k8s.io/klog/v2"
18
19 "source.monogon.dev/cloud/bmaas/bmdb"
20 "source.monogon.dev/cloud/bmaas/bmdb/metrics"
21 "source.monogon.dev/cloud/bmaas/bmdb/model"
22 "source.monogon.dev/cloud/shepherd"
23 "source.monogon.dev/go/mflags"
24)
25
26// Provisioner implements the server provisioning logic. Provisioning entails
27// bringing all available machines (subject to limits) into BMDB.
28type Provisioner struct {
29 ProvisionerConfig
30 p shepherd.Provider
31}
32
33// ProvisionerConfig configures the provisioning process.
34type ProvisionerConfig struct {
35 // MaxCount is the maximum count of managed servers. No new devices will be
36 // created after reaching the limit. No attempt will be made to reduce the
37 // server count.
38 MaxCount uint
39
40 // ReconcileLoopLimiter limits the rate of the main reconciliation loop
41 // iterating.
42 ReconcileLoopLimiter *rate.Limiter
43
44 // DeviceCreation limits the rate at which devices are created.
45 DeviceCreationLimiter *rate.Limiter
46
47 // ChunkSize is how many machines will try to be spawned in a
48 // single reconciliation loop. Higher numbers allow for faster initial
49 // provisioning, but lower numbers decrease potential raciness with other systems
50 // and make sure that other parts of the reconciliation logic are ran regularly.
51 //
52 // 20 is decent starting point.
53 ChunkSize uint
54}
55
56func (pc *ProvisionerConfig) RegisterFlags() {
57 flag.UintVar(&pc.MaxCount, "provisioner_max_machines", 50, "Limit of machines that the provisioner will attempt to pull into the BMDB. Zero for no limit.")
58 mflags.Limiter(&pc.ReconcileLoopLimiter, "provisioner_reconciler_rate", "1m,1", "Rate limiting for main provisioner reconciliation loop")
59 mflags.Limiter(&pc.DeviceCreationLimiter, "provisioner_device_creation_rate", "5s,1", "Rate limiting for machine creation")
60 flag.UintVar(&pc.ChunkSize, "provisioner_reservation_chunk_size", 20, "How many machines will the provisioner attempt to create in a single reconciliation loop iteration")
61}
62
63func (pc *ProvisionerConfig) check() error {
64 // If these are unset, it's probably because someone is using us as a library.
65 // Provide error messages useful to code users instead of flag names.
66 if pc.ReconcileLoopLimiter == nil {
67 return fmt.Errorf("ReconcileLoopLimiter must be set")
68 }
69 if pc.DeviceCreationLimiter == nil {
70 return fmt.Errorf("DeviceCreationLimiter must be set")
71 }
72 if pc.ChunkSize == 0 {
73 return fmt.Errorf("ChunkSize must be set")
74 }
75 return nil
76}
77
78// NewProvisioner creates a Provisioner instance, checking ProvisionerConfig and
79// providerConfig for errors.
80func NewProvisioner(p shepherd.Provider, pc ProvisionerConfig) (*Provisioner, error) {
81 if err := pc.check(); err != nil {
82 return nil, err
83 }
84
85 return &Provisioner{
86 ProvisionerConfig: pc,
87 p: p,
88 }, nil
89}
90
91// Run the provisioner blocking the current goroutine until the given context
92// expires.
93func (p *Provisioner) Run(ctx context.Context, conn *bmdb.Connection) error {
94
95 var sess *bmdb.Session
96 var err error
97 for {
98 if sess == nil {
99 sess, err = conn.StartSession(ctx, bmdb.SessionOption{Processor: metrics.ProcessorShepherdProvisioner})
100 if err != nil {
101 return fmt.Errorf("could not start BMDB session: %w", err)
102 }
103 }
104 err = p.runInSession(ctx, sess)
105
106 switch {
107 case err == nil:
108 case errors.Is(err, ctx.Err()):
109 return err
110 case errors.Is(err, bmdb.ErrSessionExpired):
111 klog.Errorf("Session expired, restarting...")
112 sess = nil
113 time.Sleep(time.Second)
Tim Windelschmidt438ae2e2024-04-11 23:23:12 +0200114 default:
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200115 klog.Errorf("Processing failed: %v", err)
116 // TODO(q3k): close session
117 time.Sleep(time.Second)
118 }
119 }
120}
121
122type machineListing struct {
123 machines []shepherd.Machine
124 err error
125}
126
127// runInSession executes one iteration of the provisioner's control loop within a
128// BMDB session. This control loop attempts to bring all capacity into machines in
129// the BMDB, subject to limits.
130func (p *Provisioner) runInSession(ctx context.Context, sess *bmdb.Session) error {
131 if err := p.ReconcileLoopLimiter.Wait(ctx); err != nil {
132 return err
133 }
134
135 providerC := make(chan *machineListing, 1)
136 bmdbC := make(chan *machineListing, 1)
137
138 klog.Infof("Getting provider and bmdb machines...")
139
140 // Make sub-context for two parallel operations, and so that we can cancel one
141 // immediately if the other fails.
142 subCtx, subCtxC := context.WithCancel(ctx)
143 defer subCtxC()
144
145 go func() {
146 machines, err := p.listInProvider(subCtx)
147 providerC <- &machineListing{
148 machines: machines,
149 err: err,
150 }
151 }()
152 go func() {
153 machines, err := p.listInBMDB(subCtx, sess)
154 bmdbC <- &machineListing{
155 machines: machines,
156 err: err,
157 }
158 }()
159 var inProvider, inBMDB *machineListing
160 for {
161 select {
162 case inProvider = <-providerC:
163 if err := inProvider.err; err != nil {
164 return fmt.Errorf("listing provider machines failed: %w", err)
165 }
166 klog.Infof("Got %d machines in provider.", len(inProvider.machines))
167 case inBMDB = <-bmdbC:
168 if err := inBMDB.err; err != nil {
169 return fmt.Errorf("listing BMDB machines failed: %w", err)
170 }
171 klog.Infof("Got %d machines in BMDB.", len(inBMDB.machines))
172 }
173 if inProvider != nil && inBMDB != nil {
174 break
175 }
176 }
177
178 subCtxC()
179 if err := p.reconcile(ctx, sess, inProvider.machines, inBMDB.machines); err != nil {
180 return fmt.Errorf("reconciliation failed: %w", err)
181 }
182 return nil
183}
184
185// listInProviders returns all machines that the provider thinks we should be
186// managing.
187func (p *Provisioner) listInProvider(ctx context.Context) ([]shepherd.Machine, error) {
188 machines, err := p.p.ListMachines(ctx)
189 if err != nil {
190 return nil, fmt.Errorf("while fetching managed machines: %w", err)
191 }
192 sort.Slice(machines, func(i, j int) bool {
193 return machines[i].ID() < machines[j].ID()
194 })
195 return machines, nil
196}
197
198type providedMachine struct {
199 model.MachineProvided
200}
201
Tim Windelschmidtfdd87ab2023-12-07 18:03:21 +0100202func (p providedMachine) Failed() bool {
203 if !p.MachineProvided.ProviderStatus.Valid {
204 // If we don't have any ProviderStatus to check for, return false
205 // to trigger the validation inside the reconciler loop.
206 return false
207 }
208 switch p.MachineProvided.ProviderStatus.ProviderStatus {
209 case model.ProviderStatusProvisioningFailedPermanent:
210 return true
211 }
212 return false
213}
214
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200215func (p providedMachine) ID() shepherd.ProviderID {
216 return shepherd.ProviderID(p.ProviderID)
217}
218
219func (p providedMachine) Addr() netip.Addr {
220 if !p.ProviderIpAddress.Valid {
221 return netip.Addr{}
222 }
223
224 addr, err := netip.ParseAddr(p.ProviderIpAddress.String)
225 if err != nil {
226 return netip.Addr{}
227 }
228 return addr
229}
230
Tim Windelschmidtc4dd0032024-02-19 13:13:31 +0100231func (p providedMachine) Availability() shepherd.Availability {
232 return shepherd.AvailabilityKnownUsed
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200233}
234
235// listInBMDB returns all the machines that the BMDB thinks we should be managing.
236func (p *Provisioner) listInBMDB(ctx context.Context, sess *bmdb.Session) ([]shepherd.Machine, error) {
237 var res []shepherd.Machine
238 err := sess.Transact(ctx, func(q *model.Queries) error {
239 machines, err := q.GetProvidedMachines(ctx, p.p.Type())
240 if err != nil {
241 return err
242 }
243 res = make([]shepherd.Machine, 0, len(machines))
244 for _, machine := range machines {
245 _, err := uuid.Parse(machine.ProviderID)
246 if err != nil {
247 klog.Errorf("BMDB machine %s has unparseable provider ID %q", machine.MachineID, machine.ProviderID)
248 continue
249 }
250
251 res = append(res, providedMachine{machine})
252 }
253 return nil
254 })
255 if err != nil {
256 return nil, err
257 }
258 sort.Slice(res, func(i, j int) bool {
259 return res[i].ID() < res[j].ID()
260 })
261 return res, nil
262}
263
Tim Windelschmidtc4dd0032024-02-19 13:13:31 +0100264// resolvePossiblyUsed checks if the availability is set to possibly used and
265// resolves it to the correct one.
266func (p *Provisioner) resolvePossiblyUsed(machine shepherd.Machine, providedMachines map[shepherd.ProviderID]shepherd.Machine) shepherd.Availability {
267 state, id := machine.Availability(), machine.ID()
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200268
Tim Windelschmidtc4dd0032024-02-19 13:13:31 +0100269 // Bail out if this isn't possibly used.
270 if state != shepherd.AvailabilityPossiblyUsed {
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200271 return state
272 }
273
274 // If a machine does not have a valid id, its always seen as unused.
275 if !id.IsValid() {
Tim Windelschmidtc4dd0032024-02-19 13:13:31 +0100276 return shepherd.AvailabilityKnownUnused
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200277 }
278
279 // If the machine is not inside the bmdb, it's seen as unused.
280 if _, ok := providedMachines[id]; !ok {
Tim Windelschmidtc4dd0032024-02-19 13:13:31 +0100281 return shepherd.AvailabilityKnownUnused
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200282 }
283
Tim Windelschmidtc4dd0032024-02-19 13:13:31 +0100284 return shepherd.AvailabilityKnownUsed
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200285}
286
287// reconcile takes a list of machines that the provider thinks we should be
288// managing and that the BMDB thinks we should be managing, and tries to make
289// sense of that. First, some checks are performed across the two lists to make
290// sure we haven't dropped anything. Then, additional machines are deployed from
291// hardware reservations as needed.
292func (p *Provisioner) reconcile(ctx context.Context, sess *bmdb.Session, inProvider, bmdbMachines []shepherd.Machine) error {
293 klog.Infof("Reconciling...")
294
Tim Windelschmidtfdd87ab2023-12-07 18:03:21 +0100295 bmdb := make(map[shepherd.ProviderID]shepherd.Machine)
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200296 for _, machine := range bmdbMachines {
Tim Windelschmidtc4dd0032024-02-19 13:13:31 +0100297 // Dont check the availability here as its hardcoded to be known used.
Tim Windelschmidtfdd87ab2023-12-07 18:03:21 +0100298 bmdb[machine.ID()] = machine
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200299 }
300
301 var availableMachines []shepherd.Machine
Tim Windelschmidtfdd87ab2023-12-07 18:03:21 +0100302 provider := make(map[shepherd.ProviderID]shepherd.Machine)
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200303 for _, machine := range inProvider {
304 state := p.resolvePossiblyUsed(machine, bmdb)
305
306 switch state {
Tim Windelschmidtc4dd0032024-02-19 13:13:31 +0100307 case shepherd.AvailabilityKnownUnused:
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200308 availableMachines = append(availableMachines, machine)
309
Tim Windelschmidtc4dd0032024-02-19 13:13:31 +0100310 case shepherd.AvailabilityKnownUsed:
Tim Windelschmidtfdd87ab2023-12-07 18:03:21 +0100311 provider[machine.ID()] = machine
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200312
313 default:
Tim Windelschmidtc4dd0032024-02-19 13:13:31 +0100314 return fmt.Errorf("machine has invalid availability (ID: %s, Addr: %s): %s", machine.ID(), machine.Addr(), state)
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200315 }
316 }
317
318 managed := make(map[shepherd.ProviderID]bool)
319
Tim Windelschmidtfdd87ab2023-12-07 18:03:21 +0100320 // We discovered that a machine mostly fails either when provisioning or
321 // deprovisioning. A already deployed and running machine can only switch
322 // into failed state if any api interaction happend, e.g. rebooting the
323 // machine into recovery mode. If such a machine is returned to the
324 // reconciling loop, it will trigger the badbadnotgood safety switch and
325 // return with an error. To reduce the manual intervention required we
326 // filter out these machines on both sides (bmdb and provider).
327 isBadBadNotGood := func(known map[shepherd.ProviderID]shepherd.Machine, machine shepherd.Machine) bool {
328 // If the machine is missing and not failed, its a bad case.
329 if known[machine.ID()] == nil && !machine.Failed() {
330 return true
331 }
332 return false
333 }
334
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200335 // Some desynchronization between the BMDB and Provider point of view might be so
336 // bad we shouldn't attempt to do any work, at least not any time soon.
337 badbadnotgood := false
338
339 // Find any machines supposedly managed by us in the provider, but not in the
340 // BMDB.
Tim Windelschmidtfdd87ab2023-12-07 18:03:21 +0100341 for id, machine := range provider {
342 if isBadBadNotGood(bmdb, machine) {
343 klog.Errorf("Provider machine has no corresponding machine in BMDB. (PID: %s)", id)
344 badbadnotgood = true
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200345 continue
346 }
Tim Windelschmidtfdd87ab2023-12-07 18:03:21 +0100347
348 managed[id] = true
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200349 }
350
351 // Find any machines in the BMDB but not in the provider.
Tim Windelschmidtfdd87ab2023-12-07 18:03:21 +0100352 for id, machine := range bmdb {
353 if isBadBadNotGood(provider, machine) {
354 klog.Errorf("Provider machine referred to in BMDB but missing in provider. (PID: %s)", id)
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200355 badbadnotgood = true
356 }
357 }
358
359 // Bail if things are weird.
360 if badbadnotgood {
361 klog.Errorf("Something's very wrong. Bailing early and refusing to do any work.")
362 return fmt.Errorf("fatal discrepency between BMDB and provider")
363 }
364
365 // Summarize all managed machines, which is the intersection of BMDB and
366 // Provisioner machines, usually both of these sets being equal.
367 nmanaged := len(managed)
368 klog.Infof("Total managed machines: %d", nmanaged)
369
370 if p.MaxCount != 0 && p.MaxCount <= uint(nmanaged) {
371 klog.Infof("Not bringing up more machines (at limit of %d machines)", p.MaxCount)
372 return nil
373 }
374
375 limitName := "no limit"
376 if p.MaxCount != 0 {
377 limitName = fmt.Sprintf("%d", p.MaxCount)
378 }
379 klog.Infof("Below managed machine limit (%s), bringing up more...", limitName)
380
381 if len(availableMachines) == 0 {
382 klog.Infof("No more capacity available.")
383 return nil
384 }
385
386 toProvision := availableMachines
387 // Limit them to MaxCount, if applicable.
388 if p.MaxCount != 0 {
389 needed := int(p.MaxCount) - nmanaged
390 if len(toProvision) < needed {
391 needed = len(toProvision)
392 }
393 toProvision = toProvision[:needed]
394 }
395
396 // Limit them to an arbitrary 'chunk' size so that we don't do too many things in
397 // a single reconciliation operation.
398 if uint(len(toProvision)) > p.ChunkSize {
399 toProvision = toProvision[:p.ChunkSize]
400 }
401
402 if len(toProvision) == 0 {
403 klog.Infof("No more unused machines available, or all filtered out.")
404 return nil
405 }
406
407 klog.Infof("Bringing up %d machines...", len(toProvision))
408 for _, machine := range toProvision {
409 if err := p.DeviceCreationLimiter.Wait(ctx); err != nil {
410 return err
411 }
412
413 nd, err := p.p.CreateMachine(ctx, sess, shepherd.CreateMachineRequest{
414 UnusedMachine: machine,
415 })
416 if err != nil {
Tim Windelschmidt690511d2024-04-22 19:10:29 +0200417 klog.Errorf("while creating new device (ID: %s, Addr: %s, Availability: %s): %v", machine.ID(), machine.Addr(), machine.Availability(), err)
Tim Windelschmidtb6308cd2023-10-10 21:19:03 +0200418 continue
419 }
420 klog.Infof("Created new machine with ID: %s", nd.ID())
421 }
422
423 return nil
424}