Serge Bazanski | 6f59951 | 2023-04-26 19:08:19 +0200 | [diff] [blame] | 1 | package scruffy |
| 2 | |
| 3 | import ( |
| 4 | "context" |
| 5 | "errors" |
| 6 | "fmt" |
| 7 | "time" |
| 8 | |
| 9 | "github.com/prometheus/client_golang/prometheus" |
| 10 | "k8s.io/klog/v2" |
| 11 | |
| 12 | "source.monogon.dev/cloud/bmaas/bmdb/model" |
| 13 | ) |
| 14 | |
// bmdbStatsRunner collects metrics from the BMDB and exposes them as Prometheus
// metrics via a registry passed to newBMDBStatsRunner.
type bmdbStatsRunner struct {
	// s is the Server whose BMDB session (obtained per run via s.session) and
	// Config.StatsRunnerRate (the collection interval) the runner uses.
	s *Server
	// collectors holds one instantiated collector per entry in collectorDefs.
	collectors []*statsCollector
}
| 21 | |
// A statsCollectorDefinition describes how to gather a given metric via a BMDB
// SQL query.
type statsCollectorDefinition struct {
	// name of the metric. Used in actual metric name, prefixed with 'bmdb_stats_'.
	// It is also the key under which query results are stored during a run, so
	// it must be unique across all definitions.
	name string
	// help string emitted in prometheus endpoint.
	help string
	// labels is the label 'type definition', containing information about the
	// dimensions of this metric. Left empty for metrics without dimensions.
	labels labelDefinitions
	// query used to retrieve the metric data. All collectors' queries are run
	// within a single BMDB transaction.
	// NOTE(review): the context is the second parameter here, contrary to the
	// usual Go convention; the model.Wrap*Metric helpers assigned to this field
	// must match it, so changing it requires changing those helpers too.
	query func(*model.Queries, context.Context) ([]model.MetricValue, error)
}
| 35 | |
| 36 | // labelProcess is the type definition of the 'process' label 'type', which is a |
| 37 | // fixed-cardinality representation of the database Process enum. |
| 38 | var labelProcess = labelDefinition{ |
| 39 | name: "process", |
| 40 | initialValues: []string{ |
| 41 | string(model.ProcessShepherdAccess), |
| 42 | string(model.ProcessShepherdAgentStart), |
| 43 | string(model.ProcessShepherdRecovery), |
| 44 | }, |
| 45 | } |
| 46 | |
| 47 | var collectorDefs = []statsCollectorDefinition{ |
| 48 | { |
| 49 | name: "active_backoffs", |
| 50 | help: "Number of active backoffs, partitioned by process. There may be more than one active backoff per machine.", |
| 51 | query: model.WrapLabeledMetric((*model.Queries).CountActiveBackoffs), |
| 52 | labels: []labelDefinition{labelProcess}, |
| 53 | }, |
| 54 | { |
| 55 | name: "active_work", |
| 56 | help: "Number of active work, partitioned by process. There may be more than one active work item per machine.", |
| 57 | query: model.WrapLabeledMetric((*model.Queries).CountActiveWork), |
| 58 | labels: []labelDefinition{labelProcess}, |
| 59 | }, |
| 60 | { |
| 61 | name: "machines", |
| 62 | help: "Number of machines in the BMDB.", |
| 63 | query: model.WrapSimpleMetric((*model.Queries).CountMachines), |
| 64 | }, |
| 65 | { |
| 66 | name: "machines_provided", |
| 67 | help: "Number of provided machines in the BMDB.", |
| 68 | query: model.WrapSimpleMetric((*model.Queries).CountMachinesProvided), |
| 69 | }, |
| 70 | { |
| 71 | name: "machines_heartbeating", |
| 72 | help: "Number of machines with a currently heartbeating agent.", |
| 73 | query: model.WrapSimpleMetric((*model.Queries).CountMachinesAgentHeartbeating), |
| 74 | }, |
| 75 | { |
| 76 | name: "machines_pending_installation", |
| 77 | help: "Number of machines pending installation.", |
| 78 | query: model.WrapSimpleMetric((*model.Queries).CountMachinesInstallationPending), |
| 79 | }, |
| 80 | { |
| 81 | name: "machines_installed", |
| 82 | help: "Number of machines succesfully installed.", |
| 83 | query: model.WrapSimpleMetric((*model.Queries).CountMachinesInstallationComplete), |
| 84 | }, |
| 85 | { |
| 86 | name: "machines_pending_agent_start", |
| 87 | help: "Number of machines pending the agent start workflow.", |
| 88 | query: model.WrapSimpleMetric((*model.Queries).CountMachinesForAgentStart), |
| 89 | }, |
| 90 | { |
| 91 | name: "machines_pending_agent_recovery", |
| 92 | help: "Number of machines pending the agent recovery workflow.", |
| 93 | query: model.WrapSimpleMetric((*model.Queries).CountMachinesForAgentRecovery), |
| 94 | }, |
| 95 | } |
| 96 | |
// A statsCollector is an instantiated statsCollectorDefinition which carries the
// actual prometheus gauge backing the metric.
type statsCollector struct {
	// gauge is the registered gauge vector, named 'bmdb_stats_' + def.name and
	// dimensioned by the label names of def.labels.
	gauge *prometheus.GaugeVec
	// def is the (copied) definition this collector was built from.
	def *statsCollectorDefinition
}
| 103 | |
| 104 | // setDefaults emits gauges with zero values for all metrics of the runner, using |
| 105 | // the initialLabel data gathered from each metric definition. |
| 106 | func (b *bmdbStatsRunner) setDefaults() { |
| 107 | for _, collector := range b.collectors { |
| 108 | info := collector.def |
| 109 | initial := info.labels.initialLabels() |
| 110 | if len(initial) == 0 { |
| 111 | collector.gauge.With(nil).Set(0.0) |
| 112 | } else { |
| 113 | for _, labels := range initial { |
| 114 | collector.gauge.With(labels).Set(0.0) |
| 115 | } |
| 116 | } |
| 117 | } |
| 118 | } |
| 119 | |
| 120 | // newBMDBStatsRunner builds a bmdbStatsRunner from the collectorDefs above. The |
| 121 | // bmdbStatsRunner then has the given's Server BMDB connection bound to it and |
| 122 | // can perform actual database statistic gathering. |
| 123 | func newBMDBStatsRunner(s *Server, reg *prometheus.Registry) *bmdbStatsRunner { |
| 124 | var collectors []*statsCollector |
| 125 | |
| 126 | for _, info := range collectorDefs { |
| 127 | info := info |
| 128 | gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{ |
| 129 | Name: "bmdb_stats_" + info.name, |
| 130 | Help: info.help, |
| 131 | }, info.labels.names()) |
| 132 | reg.MustRegister(gauge) |
| 133 | |
| 134 | collectors = append(collectors, &statsCollector{ |
| 135 | gauge: gauge, |
| 136 | def: &info, |
| 137 | }) |
| 138 | } |
| 139 | |
| 140 | res := &bmdbStatsRunner{ |
| 141 | s: s, |
| 142 | collectors: collectors, |
| 143 | } |
| 144 | res.setDefaults() |
| 145 | return res |
| 146 | } |
| 147 | |
| 148 | func (b *bmdbStatsRunner) run(ctx context.Context) { |
| 149 | klog.Infof("Starting stats runner...") |
| 150 | |
| 151 | ti := time.NewTicker(b.s.Config.StatsRunnerRate) |
| 152 | |
| 153 | for { |
| 154 | err := b.runOnce(ctx) |
| 155 | if err != nil { |
| 156 | if errors.Is(err, ctx.Err()) { |
| 157 | return |
| 158 | } |
| 159 | klog.Errorf("Stats run failed: %v", err) |
| 160 | } |
| 161 | select { |
| 162 | case <-ti.C: |
| 163 | case <-ctx.Done(): |
| 164 | klog.Infof("Exiting stats runner (%v)...", ctx.Err()) |
| 165 | return |
| 166 | } |
| 167 | } |
| 168 | } |
| 169 | |
| 170 | func (b *bmdbStatsRunner) runOnce(ctx context.Context) error { |
| 171 | sess, err := b.s.session(ctx) |
| 172 | if err != nil { |
| 173 | return err |
| 174 | } |
| 175 | |
| 176 | results := make(map[string][]model.MetricValue) |
| 177 | // TODO(q3k): don't fail entire run if we can't collect just one metric. |
| 178 | err = sess.Transact(ctx, func(q *model.Queries) error { |
| 179 | for _, c := range b.collectors { |
| 180 | res, err := c.def.query(q, ctx) |
| 181 | if err != nil { |
| 182 | return fmt.Errorf("collecting %s failed: %v", c.def.name, err) |
| 183 | } else { |
| 184 | results[c.def.name] = res |
| 185 | } |
| 186 | } |
| 187 | return nil |
| 188 | }) |
| 189 | if err != nil { |
| 190 | return err |
| 191 | } |
| 192 | |
| 193 | b.setDefaults() |
| 194 | for _, c := range b.collectors { |
| 195 | for _, m := range results[c.def.name] { |
| 196 | klog.Infof("Setting %s (%v) to %d", c.def.name, m.Labels, m.Count) |
| 197 | c.gauge.With(m.Labels).Set(float64(m.Count)) |
| 198 | } |
| 199 | } |
| 200 | |
| 201 | return nil |
| 202 | } |