package manager
2
3import (
4 "context"
5 "errors"
6 "flag"
7 "fmt"
8 "strconv"
9 "strings"
10 "time"
11
12 "github.com/google/uuid"
13 "golang.org/x/sync/errgroup"
14 "golang.org/x/time/rate"
15 "k8s.io/klog/v2"
16
17 "source.monogon.dev/cloud/bmaas/bmdb"
18 "source.monogon.dev/cloud/bmaas/bmdb/model"
19)
20
21// task describes a single server currently being processed by a control loop.
22type task struct {
23 // machine is the machine data (including provider and provider ID) retrieved
24 // from the BMDB.
25 machine *model.MachineProvided
26 // work is a machine lock facilitated by BMDB that prevents machines from
27 // being processed by multiple workers at the same time.
28 work *bmdb.Work
29}
30
31// controlLoop is implemented by any component which should act as a BMDB-based
32// control loop. Implementing these methods allows the given component to be
33// started using RunControlLoop.
34type controlLoop interface {
35 // getMachines must return the list of machines ready to be processed by the
36 // control loop for a given control loop implementation.
37 getMachines(ctx context.Context, q *model.Queries, limit int32) ([]model.MachineProvided, error)
38 // processMachine will be called within the scope of an active task/BMDB work by
39 // the control loop logic.
40 processMachine(ctx context.Context, t *task) error
41
42 // getControlLoopConfig is implemented by ControlLoopConfig which should be
43 // embedded by the control loop component. If not embedded, this method will have
44 // to be implemented, too.
45 getControlLoopConfig() *ControlLoopConfig
46}
47
48// ControlLoopConfig should be embedded the every component which acts as a
49// control loop. RegisterFlags should be called by the component whenever it is
50// registering its own flags. Check should be called whenever the component is
51// instantiated, after RegisterFlags has been called.
52type ControlLoopConfig struct {
53 // DBQueryLimiter limits the rate at which BMDB is queried for servers ready
54 // for BMaaS agent initialization. Must be set.
55 DBQueryLimiter *rate.Limiter
56
57 // Parallelism is how many instances of the Initializer will be allowed to run in
58 // parallel against the BMDB. This speeds up the process of starting/restarting
59 // agents significantly, as one initializer instance can handle at most one agent
60 // (re)starting process.
61 //
62 // If not set (ie. 0), default to 1. A good starting value for production
63 // deployments is 10 or so.
64 Parallelism int
65}
66
67func (c *ControlLoopConfig) getControlLoopConfig() *ControlLoopConfig {
68 return c
69}
70
71// flagLimiter configures a *rate.Limiter as a flag.
72func flagLimiter(l **rate.Limiter, name, defval, help string) {
73 syntax := "'duration,count' eg. '2m,10' for a 10-sized bucket refilled at one token every 2 minutes"
74 help = help + fmt.Sprintf(" (default: %q, syntax: %s)", defval, syntax)
75 flag.Func(name, help, func(val string) error {
76 if val == "" {
77 val = defval
78 }
79 parts := strings.Split(val, ",")
80 if len(parts) != 2 {
81 return fmt.Errorf("invalid syntax, want: %s", syntax)
82 }
83 duration, err := time.ParseDuration(parts[0])
84 if err != nil {
85 return fmt.Errorf("invalid duration: %w", err)
86 }
87 refill, err := strconv.ParseUint(parts[1], 10, 31)
88 if err != nil {
89 return fmt.Errorf("invalid refill rate: %w", err)
90 }
91 *l = rate.NewLimiter(rate.Every(duration), int(refill))
92 return nil
93 })
94 flag.Set(name, defval)
95}
96
97// RegisterFlags should be called on this configuration whenever the embeddeding
98// component/configuration is registering its own flags. The prefix should be the
99// name of the component.
100func (c *ControlLoopConfig) RegisterFlags(prefix string) {
101 flagLimiter(&c.DBQueryLimiter, prefix+"_db_query_rate", "250ms,8", "Rate limiting for BMDB queries")
102 flag.IntVar(&c.Parallelism, prefix+"_loop_parallelism", 1, "How many initializer instances to run in parallel, ie. how many agents to attempt to (re)start at once")
103}
104
105// Check should be called after RegisterFlags but before the control loop is ran.
106// If an error is returned, the control loop cannot start.
107func (c *ControlLoopConfig) Check() error {
108 if c.DBQueryLimiter == nil {
109 return fmt.Errorf("DBQueryLimiter must be configured")
110 }
111 if c.Parallelism == 0 {
112 c.Parallelism = 1
113 }
114 return nil
115}
116
117// RunControlLoop runs the given controlLoop implementation against the BMDB. The
118// loop will be run with the parallelism and rate configured by the
119// ControlLoopConfig embedded or otherwise returned by the controlLoop.
120func RunControlLoop(ctx context.Context, conn *bmdb.Connection, loop controlLoop) error {
121 clr := &controlLoopRunner{
122 loop: loop,
123 config: loop.getControlLoopConfig(),
124 }
125 return clr.run(ctx, conn)
126}
127
128// controlLoopRunner is a configured control loop with an underlying control loop
129// implementation.
130type controlLoopRunner struct {
131 config *ControlLoopConfig
132 loop controlLoop
133}
134
135// run the control loops(s) (depending on opts.Parallelism) blocking the current
136// goroutine until the given context expires and all provisioners quit.
137func (r *controlLoopRunner) run(ctx context.Context, conn *bmdb.Connection) error {
138 eg := errgroup.Group{}
139 for j := 0; j < r.config.Parallelism; j += 1 {
140 eg.Go(func() error {
141 return r.runOne(ctx, conn)
142 })
143 }
144 return eg.Wait()
145}
146
147// run the control loop blocking the current goroutine until the given context
148// expires.
149func (r *controlLoopRunner) runOne(ctx context.Context, conn *bmdb.Connection) error {
150 var err error
151
152 // Maintain a BMDB session as long as possible.
153 var sess *bmdb.Session
154 for {
155 if sess == nil {
156 sess, err = conn.StartSession(ctx)
157 if err != nil {
158 return fmt.Errorf("could not start BMDB session: %w", err)
159 }
160 }
161 // Inside that session, run the main logic.
162 err := r.runInSession(ctx, sess)
163
164 switch {
165 case err == nil:
166 case errors.Is(err, ctx.Err()):
167 return err
168 case errors.Is(err, bmdb.ErrSessionExpired):
169 klog.Errorf("Session expired, restarting...")
170 sess = nil
171 time.Sleep(time.Second)
172 case err != nil:
173 klog.Errorf("Processing failed: %v", err)
174 // TODO(q3k): close session
175 time.Sleep(time.Second)
176 }
177 }
178}
179
180// runInSession executes one iteration of the control loop within a BMDB session.
181// This control loop attempts to start or re-start the agent on any machines that
182// need this per the BMDB.
183func (r *controlLoopRunner) runInSession(ctx context.Context, sess *bmdb.Session) error {
184 t, err := r.source(ctx, sess)
185 if err != nil {
186 return fmt.Errorf("could not source machine: %w", err)
187 }
188 if t == nil {
189 return nil
190 }
191 defer t.work.Cancel(ctx)
192
193 if err := r.loop.processMachine(ctx, t); err != nil {
194 klog.Errorf("Failed to process machine %s: %v", t.machine.MachineID, err)
Serge Bazanski20312b42023-04-19 13:49:47 +0200195 backoff := bmdb.Backoff{
196 Initial: time.Minute,
197 Maximum: 2 * time.Hour,
198 Exponent: 1.1,
199 }
200 err = t.work.Fail(ctx, &backoff, fmt.Sprintf("failed to process: %v", err))
Serge Bazanski86a714d2023-04-17 15:54:21 +0200201 return err
202 }
203 return nil
204}
205
206// source supplies returns a BMDB-locked server ready for processing by the
207// control loop, locked by a work item. If both task and error are nil, then
208// there are no machines needed to be initialized. The returned work item in task
209// _must_ be canceled or finished by the caller.
210func (r *controlLoopRunner) source(ctx context.Context, sess *bmdb.Session) (*task, error) {
211 r.config.DBQueryLimiter.Wait(ctx)
212
213 var machine *model.MachineProvided
214 work, err := sess.Work(ctx, model.ProcessShepherdAccess, func(q *model.Queries) ([]uuid.UUID, error) {
215 machines, err := r.loop.getMachines(ctx, q, 1)
216 if err != nil {
217 return nil, err
218 }
219 if len(machines) < 1 {
220 return nil, bmdb.ErrNothingToDo
221 }
222 machine = &machines[0]
223 return []uuid.UUID{machines[0].MachineID}, nil
224 })
225
226 if errors.Is(err, bmdb.ErrNothingToDo) {
227 return nil, nil
228 }
229
230 if err != nil {
231 return nil, fmt.Errorf("while querying BMDB agent candidates: %w", err)
232 }
233
234 return &task{
235 machine: machine,
236 work: work,
237 }, nil
238}