cloud/bmaas/bmdb/scruffy: initialize, implement BMDB metrics
This creates a new BMaaS component, Scruffy the Janitor.
Scruffy will run a bunch of housekeeping jobs that aren't tied to a
particular provider or even region. Currently Scruffy just collects BMDB
metrics by periodically polling the BMDB SQL database.
Change-Id: Icafa714811757eaaf31fed43184ded8512bde067
Reviewed-on: https://review.monogon.dev/c/monogon/+/1819
Tested-by: Jenkins CI
Reviewed-by: Tim Windelschmidt <tim@monogon.tech>
diff --git a/cloud/bmaas/scruffy/bmdb_stats.go b/cloud/bmaas/scruffy/bmdb_stats.go
new file mode 100644
index 0000000..68de363
--- /dev/null
+++ b/cloud/bmaas/scruffy/bmdb_stats.go
@@ -0,0 +1,202 @@
+package scruffy
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "time"
+
+ "github.com/prometheus/client_golang/prometheus"
+ "k8s.io/klog/v2"
+
+ "source.monogon.dev/cloud/bmaas/bmdb/model"
+)
+
+// bmdbStatsRunner collects metrics from the BMDB and exposes them as Prometheus
+// gauges registered on the registry passed to newBMDBStatsRunner.
+type bmdbStatsRunner struct {
+ s *Server // server providing the BMDB session and the Config.StatsRunnerRate poll interval
+ collectors []*statsCollector // one instantiated collector per collectorDefs entry
+}
+
+// A statsCollectorDefinition describes how to gather a given metric via a BMDB
+// SQL query.
+type statsCollectorDefinition struct {
+ // name of the metric. Used in actual metric name, prefixed with 'bmdb_stats_'.
+ name string
+ // help string emitted in prometheus endpoint.
+ help string
+ // labels describes the dimensions of this metric: the label names and the
+ // values each label is initialized with. Left unset for unlabeled metrics.
+ labels labelDefinitions
+ // query retrieves the metric data as MetricValues (Labels plus Count).
+ query func(*model.Queries, context.Context) ([]model.MetricValue, error)
+}
+
+// labelProcess defines the 'process' label: a fixed-cardinality representation
+// of the database Process enum, initialized with the values listed below.
+var labelProcess = labelDefinition{
+ name: "process",
+ initialValues: []string{
+ string(model.ProcessShepherdAccess),
+ string(model.ProcessShepherdAgentStart),
+ string(model.ProcessShepherdRecovery),
+ },
+}
+
+var collectorDefs = []statsCollectorDefinition{ // all metrics the stats runner collects
+	{
+		name:   "active_backoffs",
+		help:   "Number of active backoffs, partitioned by process. There may be more than one active backoff per machine.",
+		query:  model.WrapLabeledMetric((*model.Queries).CountActiveBackoffs),
+		labels: []labelDefinition{labelProcess},
+	},
+	{
+		name:   "active_work",
+		help:   "Number of active work, partitioned by process. There may be more than one active work item per machine.",
+		query:  model.WrapLabeledMetric((*model.Queries).CountActiveWork),
+		labels: []labelDefinition{labelProcess},
+	},
+	{
+		name:  "machines",
+		help:  "Number of machines in the BMDB.",
+		query: model.WrapSimpleMetric((*model.Queries).CountMachines),
+	},
+	{
+		name:  "machines_provided",
+		help:  "Number of provided machines in the BMDB.",
+		query: model.WrapSimpleMetric((*model.Queries).CountMachinesProvided),
+	},
+	{
+		name:  "machines_heartbeating",
+		help:  "Number of machines with a currently heartbeating agent.",
+		query: model.WrapSimpleMetric((*model.Queries).CountMachinesAgentHeartbeating),
+	},
+	{
+		name:  "machines_pending_installation",
+		help:  "Number of machines pending installation.",
+		query: model.WrapSimpleMetric((*model.Queries).CountMachinesInstallationPending),
+	},
+	{
+		name:  "machines_installed",
+		help:  "Number of machines successfully installed.",
+		query: model.WrapSimpleMetric((*model.Queries).CountMachinesInstallationComplete),
+	},
+	{
+		name:  "machines_pending_agent_start",
+		help:  "Number of machines pending the agent start workflow.",
+		query: model.WrapSimpleMetric((*model.Queries).CountMachinesForAgentStart),
+	},
+	{
+		name:  "machines_pending_agent_recovery",
+		help:  "Number of machines pending the agent recovery workflow.",
+		query: model.WrapSimpleMetric((*model.Queries).CountMachinesForAgentRecovery),
+	},
+}
+
+// A statsCollector is an instantiated statsCollectorDefinition which carries the
+// actual prometheus gauge backing the metric.
+type statsCollector struct {
+ gauge *prometheus.GaugeVec // gauge vector registered as "bmdb_stats_" + def.name
+ def *statsCollectorDefinition // definition this collector was instantiated from
+}
+
+// setDefaults writes a zero sample for each collector of the runner. A
+// collector whose definition yields no initial label sets gets one unlabeled
+// zero sample; otherwise one zero sample is written per initial label set.
+func (b *bmdbStatsRunner) setDefaults() {
+	for _, c := range b.collectors {
+		seeds := c.def.labels.initialLabels()
+		if len(seeds) == 0 {
+			c.gauge.With(nil).Set(0)
+			continue
+		}
+		for _, seed := range seeds {
+			c.gauge.With(seed).Set(0)
+		}
+	}
+}
+
+// newBMDBStatsRunner builds a bmdbStatsRunner from the collectorDefs above. The
+// bmdbStatsRunner then has the given's Server BMDB connection bound to it and
+// can perform actual database statistic gathering.
+func newBMDBStatsRunner(s *Server, reg *prometheus.Registry) *bmdbStatsRunner {
+ var collectors []*statsCollector
+
+ for _, info := range collectorDefs {
+ info := info
+ gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+ Name: "bmdb_stats_" + info.name,
+ Help: info.help,
+ }, info.labels.names())
+ reg.MustRegister(gauge)
+
+ collectors = append(collectors, &statsCollector{
+ gauge: gauge,
+ def: &info,
+ })
+ }
+
+ res := &bmdbStatsRunner{
+ s: s,
+ collectors: collectors,
+ }
+ res.setDefaults()
+ return res
+}
+
+// run collects BMDB statistics immediately and then once per tick of
+// Config.StatsRunnerRate. It returns once ctx is done; other failures are logged.
+func (b *bmdbStatsRunner) run(ctx context.Context) {
+	klog.Infof("Starting stats runner...")
+	ti := time.NewTicker(b.s.Config.StatsRunnerRate)
+	defer ti.Stop() // release the ticker once the runner exits
+	for {
+		if err := b.runOnce(ctx); err != nil {
+			if errors.Is(err, ctx.Err()) {
+				return
+			}
+			klog.Errorf("Stats run failed: %v", err)
+		}
+		select {
+		case <-ti.C:
+		case <-ctx.Done():
+			klog.Infof("Exiting stats runner (%v)...", ctx.Err())
+			return
+		}
+	}
+}
+
+// runOnce executes every collector query inside a single sess.Transact call
+// and, only if all succeed, resets the gauges to their defaults and applies the
+// results. Query errors are wrapped with %w so errors.Is can match the cause.
+func (b *bmdbStatsRunner) runOnce(ctx context.Context) error {
+	sess, err := b.s.session(ctx)
+	if err != nil {
+		return err
+	}
+	results := make(map[string][]model.MetricValue)
+	// TODO(q3k): don't fail entire run if we can't collect just one metric.
+	err = sess.Transact(ctx, func(q *model.Queries) error {
+		for _, c := range b.collectors {
+			res, err := c.def.query(q, ctx)
+			if err != nil {
+				return fmt.Errorf("collecting %s failed: %w", c.def.name, err)
+			}
+			results[c.def.name] = res
+		}
+		return nil
+	})
+	if err != nil {
+		return err
+	}
+
+	b.setDefaults()
+	for _, c := range b.collectors {
+		for _, m := range results[c.def.name] {
+			klog.Infof("Setting %s (%v) to %d", c.def.name, m.Labels, m.Count)
+			c.gauge.With(m.Labels).Set(float64(m.Count))
+		}
+	}
+	return nil
+}