| package scruffy |
| |
| import ( |
| "context" |
| "errors" |
| "fmt" |
| "time" |
| |
| "github.com/prometheus/client_golang/prometheus" |
| "k8s.io/klog/v2" |
| |
| "source.monogon.dev/cloud/bmaas/bmdb/model" |
| ) |
| |
| // bmdbStatsRunner collects metrics from the BMDB and exposes them as Prometheus |
| // metrics via a registry passed to newBMDBStatsRunner. |
| type bmdbStatsRunner struct { |
| s *Server |
| collectors []*statsCollector |
| } |
| |
| // A statsCollectorDefinition describes how to gather a given metric via a BMDB |
| // SQL query. |
| type statsCollectorDefinition struct { |
| // name of the metric. Used in actual metric name, prefixed with 'bmdb_stats_'. |
| name string |
| // help string emitted in prometheus endpoint. |
| help string |
| // labels is the label 'type definition', containing information about the |
| // dimensions of this metric. |
| labels labelDefinitions |
| // query used to retrieve the metric data. |
| query func(*model.Queries, context.Context) ([]model.MetricValue, error) |
| } |
| |
| // labelProcess is the type definition of the 'process' label 'type', which is a |
| // fixed-cardinality representation of the database Process enum. |
| var labelProcess = labelDefinition{ |
| name: "process", |
| initialValues: []string{ |
| string(model.ProcessShepherdAccess), |
| string(model.ProcessShepherdAgentStart), |
| string(model.ProcessShepherdRecovery), |
| }, |
| } |
| |
| var collectorDefs = []statsCollectorDefinition{ |
| { |
| name: "active_backoffs", |
| help: "Number of active backoffs, partitioned by process. There may be more than one active backoff per machine.", |
| query: model.WrapLabeledMetric((*model.Queries).CountActiveBackoffs), |
| labels: []labelDefinition{labelProcess}, |
| }, |
| { |
| name: "active_work", |
| help: "Number of active work, partitioned by process. There may be more than one active work item per machine.", |
| query: model.WrapLabeledMetric((*model.Queries).CountActiveWork), |
| labels: []labelDefinition{labelProcess}, |
| }, |
| { |
| name: "machines", |
| help: "Number of machines in the BMDB.", |
| query: model.WrapSimpleMetric((*model.Queries).CountMachines), |
| }, |
| { |
| name: "machines_provided", |
| help: "Number of provided machines in the BMDB.", |
| query: model.WrapSimpleMetric((*model.Queries).CountMachinesProvided), |
| }, |
| { |
| name: "machines_heartbeating", |
| help: "Number of machines with a currently heartbeating agent.", |
| query: model.WrapSimpleMetric((*model.Queries).CountMachinesAgentHeartbeating), |
| }, |
| { |
| name: "machines_pending_installation", |
| help: "Number of machines pending installation.", |
| query: model.WrapSimpleMetric((*model.Queries).CountMachinesInstallationPending), |
| }, |
| { |
| name: "machines_installed", |
| help: "Number of machines succesfully installed.", |
| query: model.WrapSimpleMetric((*model.Queries).CountMachinesInstallationComplete), |
| }, |
| { |
| name: "machines_pending_agent_start", |
| help: "Number of machines pending the agent start workflow.", |
| query: model.WrapSimpleMetric((*model.Queries).CountMachinesForAgentStart), |
| }, |
| { |
| name: "machines_pending_agent_recovery", |
| help: "Number of machines pending the agent recovery workflow.", |
| query: model.WrapSimpleMetric((*model.Queries).CountMachinesForAgentRecovery), |
| }, |
| } |
| |
| // A statsCollector is an instantiated statsCollectorDefinition which carries the |
| // actual prometheus gauge backing the metric. |
| type statsCollector struct { |
| gauge *prometheus.GaugeVec |
| def *statsCollectorDefinition |
| } |
| |
| // setDefaults emits gauges with zero values for all metrics of the runner, using |
| // the initialLabel data gathered from each metric definition. |
| func (b *bmdbStatsRunner) setDefaults() { |
| for _, collector := range b.collectors { |
| info := collector.def |
| initial := info.labels.initialLabels() |
| if len(initial) == 0 { |
| collector.gauge.With(nil).Set(0.0) |
| } else { |
| for _, labels := range initial { |
| collector.gauge.With(labels).Set(0.0) |
| } |
| } |
| } |
| } |
| |
| // newBMDBStatsRunner builds a bmdbStatsRunner from the collectorDefs above. The |
| // bmdbStatsRunner then has the given's Server BMDB connection bound to it and |
| // can perform actual database statistic gathering. |
| func newBMDBStatsRunner(s *Server, reg *prometheus.Registry) *bmdbStatsRunner { |
| var collectors []*statsCollector |
| |
| for _, info := range collectorDefs { |
| info := info |
| gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{ |
| Name: "bmdb_stats_" + info.name, |
| Help: info.help, |
| }, info.labels.names()) |
| reg.MustRegister(gauge) |
| |
| collectors = append(collectors, &statsCollector{ |
| gauge: gauge, |
| def: &info, |
| }) |
| } |
| |
| res := &bmdbStatsRunner{ |
| s: s, |
| collectors: collectors, |
| } |
| res.setDefaults() |
| return res |
| } |
| |
| func (b *bmdbStatsRunner) run(ctx context.Context) { |
| klog.Infof("Starting stats runner...") |
| |
| ti := time.NewTicker(b.s.Config.StatsRunnerRate) |
| |
| for { |
| err := b.runOnce(ctx) |
| if err != nil { |
| if errors.Is(err, ctx.Err()) { |
| return |
| } |
| klog.Errorf("Stats run failed: %v", err) |
| } |
| select { |
| case <-ti.C: |
| case <-ctx.Done(): |
| klog.Infof("Exiting stats runner (%v)...", ctx.Err()) |
| return |
| } |
| } |
| } |
| |
| func (b *bmdbStatsRunner) runOnce(ctx context.Context) error { |
| sess, err := b.s.session(ctx) |
| if err != nil { |
| return err |
| } |
| |
| results := make(map[string][]model.MetricValue) |
| // TODO(q3k): don't fail entire run if we can't collect just one metric. |
| err = sess.Transact(ctx, func(q *model.Queries) error { |
| for _, c := range b.collectors { |
| res, err := c.def.query(q, ctx) |
| if err != nil { |
| return fmt.Errorf("collecting %s failed: %v", c.def.name, err) |
| } else { |
| results[c.def.name] = res |
| } |
| } |
| return nil |
| }) |
| if err != nil { |
| return err |
| } |
| |
| b.setDefaults() |
| for _, c := range b.collectors { |
| for _, m := range results[c.def.name] { |
| klog.Infof("Setting %s (%v) to %d", c.def.name, m.Labels, m.Count) |
| c.gauge.With(m.Labels).Set(float64(m.Count)) |
| } |
| } |
| |
| return nil |
| } |