blob: 68de363440381cafd28ba5d1799a526ce6669da2 [file] [log] [blame]
Serge Bazanski6f599512023-04-26 19:08:19 +02001package scruffy
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "time"
8
9 "github.com/prometheus/client_golang/prometheus"
10 "k8s.io/klog/v2"
11
12 "source.monogon.dev/cloud/bmaas/bmdb/model"
13)
14
15// bmdbStatsRunner collects metrics from the BMDB and exposes them as Prometheus
16// metrics via a registry passed to newBMDBStatsRunner.
17type bmdbStatsRunner struct {
18 s *Server
19 collectors []*statsCollector
20}
21
22// A statsCollectorDefinition describes how to gather a given metric via a BMDB
23// SQL query.
24type statsCollectorDefinition struct {
25 // name of the metric. Used in actual metric name, prefixed with 'bmdb_stats_'.
26 name string
27 // help string emitted in prometheus endpoint.
28 help string
29 // labels is the label 'type definition', containing information about the
30 // dimensions of this metric.
31 labels labelDefinitions
32 // query used to retrieve the metric data.
33 query func(*model.Queries, context.Context) ([]model.MetricValue, error)
34}
35
36// labelProcess is the type definition of the 'process' label 'type', which is a
37// fixed-cardinality representation of the database Process enum.
38var labelProcess = labelDefinition{
39 name: "process",
40 initialValues: []string{
41 string(model.ProcessShepherdAccess),
42 string(model.ProcessShepherdAgentStart),
43 string(model.ProcessShepherdRecovery),
44 },
45}
46
47var collectorDefs = []statsCollectorDefinition{
48 {
49 name: "active_backoffs",
50 help: "Number of active backoffs, partitioned by process. There may be more than one active backoff per machine.",
51 query: model.WrapLabeledMetric((*model.Queries).CountActiveBackoffs),
52 labels: []labelDefinition{labelProcess},
53 },
54 {
55 name: "active_work",
56 help: "Number of active work, partitioned by process. There may be more than one active work item per machine.",
57 query: model.WrapLabeledMetric((*model.Queries).CountActiveWork),
58 labels: []labelDefinition{labelProcess},
59 },
60 {
61 name: "machines",
62 help: "Number of machines in the BMDB.",
63 query: model.WrapSimpleMetric((*model.Queries).CountMachines),
64 },
65 {
66 name: "machines_provided",
67 help: "Number of provided machines in the BMDB.",
68 query: model.WrapSimpleMetric((*model.Queries).CountMachinesProvided),
69 },
70 {
71 name: "machines_heartbeating",
72 help: "Number of machines with a currently heartbeating agent.",
73 query: model.WrapSimpleMetric((*model.Queries).CountMachinesAgentHeartbeating),
74 },
75 {
76 name: "machines_pending_installation",
77 help: "Number of machines pending installation.",
78 query: model.WrapSimpleMetric((*model.Queries).CountMachinesInstallationPending),
79 },
80 {
81 name: "machines_installed",
82 help: "Number of machines succesfully installed.",
83 query: model.WrapSimpleMetric((*model.Queries).CountMachinesInstallationComplete),
84 },
85 {
86 name: "machines_pending_agent_start",
87 help: "Number of machines pending the agent start workflow.",
88 query: model.WrapSimpleMetric((*model.Queries).CountMachinesForAgentStart),
89 },
90 {
91 name: "machines_pending_agent_recovery",
92 help: "Number of machines pending the agent recovery workflow.",
93 query: model.WrapSimpleMetric((*model.Queries).CountMachinesForAgentRecovery),
94 },
95}
96
97// A statsCollector is an instantiated statsCollectorDefinition which carries the
98// actual prometheus gauge backing the metric.
99type statsCollector struct {
100 gauge *prometheus.GaugeVec
101 def *statsCollectorDefinition
102}
103
104// setDefaults emits gauges with zero values for all metrics of the runner, using
105// the initialLabel data gathered from each metric definition.
106func (b *bmdbStatsRunner) setDefaults() {
107 for _, collector := range b.collectors {
108 info := collector.def
109 initial := info.labels.initialLabels()
110 if len(initial) == 0 {
111 collector.gauge.With(nil).Set(0.0)
112 } else {
113 for _, labels := range initial {
114 collector.gauge.With(labels).Set(0.0)
115 }
116 }
117 }
118}
119
120// newBMDBStatsRunner builds a bmdbStatsRunner from the collectorDefs above. The
121// bmdbStatsRunner then has the given's Server BMDB connection bound to it and
122// can perform actual database statistic gathering.
123func newBMDBStatsRunner(s *Server, reg *prometheus.Registry) *bmdbStatsRunner {
124 var collectors []*statsCollector
125
126 for _, info := range collectorDefs {
127 info := info
128 gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{
129 Name: "bmdb_stats_" + info.name,
130 Help: info.help,
131 }, info.labels.names())
132 reg.MustRegister(gauge)
133
134 collectors = append(collectors, &statsCollector{
135 gauge: gauge,
136 def: &info,
137 })
138 }
139
140 res := &bmdbStatsRunner{
141 s: s,
142 collectors: collectors,
143 }
144 res.setDefaults()
145 return res
146}
147
148func (b *bmdbStatsRunner) run(ctx context.Context) {
149 klog.Infof("Starting stats runner...")
150
151 ti := time.NewTicker(b.s.Config.StatsRunnerRate)
152
153 for {
154 err := b.runOnce(ctx)
155 if err != nil {
156 if errors.Is(err, ctx.Err()) {
157 return
158 }
159 klog.Errorf("Stats run failed: %v", err)
160 }
161 select {
162 case <-ti.C:
163 case <-ctx.Done():
164 klog.Infof("Exiting stats runner (%v)...", ctx.Err())
165 return
166 }
167 }
168}
169
170func (b *bmdbStatsRunner) runOnce(ctx context.Context) error {
171 sess, err := b.s.session(ctx)
172 if err != nil {
173 return err
174 }
175
176 results := make(map[string][]model.MetricValue)
177 // TODO(q3k): don't fail entire run if we can't collect just one metric.
178 err = sess.Transact(ctx, func(q *model.Queries) error {
179 for _, c := range b.collectors {
180 res, err := c.def.query(q, ctx)
181 if err != nil {
182 return fmt.Errorf("collecting %s failed: %v", c.def.name, err)
183 } else {
184 results[c.def.name] = res
185 }
186 }
187 return nil
188 })
189 if err != nil {
190 return err
191 }
192
193 b.setDefaults()
194 for _, c := range b.collectors {
195 for _, m := range results[c.def.name] {
196 klog.Infof("Setting %s (%v) to %d", c.def.name, m.Labels, m.Count)
197 c.gauge.With(m.Labels).Set(float64(m.Count))
198 }
199 }
200
201 return nil
202}