cloud/bmaas/bmdb/scruffy: initialize, implement BMDB metrics

This creates a new BMaaS component, Scruffy the Janitor.

Scruffy will run a bunch of housekeeping jobs that aren't tied to a
particular provider or even region. Currently Scruffy just collects BMDB
metrics by periodically polling the BMDB SQL database.

Change-Id: Icafa714811757eaaf31fed43184ded8512bde067
Reviewed-on: https://review.monogon.dev/c/monogon/+/1819
Tested-by: Jenkins CI
Reviewed-by: Tim Windelschmidt <tim@monogon.tech>
diff --git a/cloud/bmaas/scruffy/bmdb_stats.go b/cloud/bmaas/scruffy/bmdb_stats.go
new file mode 100644
index 0000000..68de363
--- /dev/null
+++ b/cloud/bmaas/scruffy/bmdb_stats.go
@@ -0,0 +1,202 @@
+package scruffy
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"k8s.io/klog/v2"
+
+	"source.monogon.dev/cloud/bmaas/bmdb/model"
+)
+
+// bmdbStatsRunner periodically queries the BMDB and publishes the results as
+// Prometheus gauges, registered on the registry passed to newBMDBStatsRunner.
+type bmdbStatsRunner struct {
+	s          *Server           // provides the BMDB session and StatsRunnerRate
+	collectors []*statsCollector // one per entry of collectorDefs
+}
+
+// A statsCollectorDefinition describes a single BMDB-backed gauge: its name,
+// help text, label dimensions and the SQL query that produces its values.
+type statsCollectorDefinition struct {
+	// name is prefixed with 'bmdb_stats_' to form the metric name; must be unique.
+	name string
+	// help is the HELP text exposed on the Prometheus endpoint.
+	help string
+	// labels describes the label dimensions of the metric and their initial
+	// values. Left empty for unlabeled metrics.
+	labels labelDefinitions
+	// query retrieves the current metric values within a BMDB transaction.
+	query func(*model.Queries, context.Context) ([]model.MetricValue, error)
+}
+
+// labelProcess defines the 'process' label. It is meant to have fixed
+// cardinality, mirroring the values of the BMDB Process enum.
+var labelProcess = labelDefinition{
+	name: "process",
+	initialValues: []string{ // NOTE(review): keep in sync with the model.Process* constants.
+		string(model.ProcessShepherdAccess),
+		string(model.ProcessShepherdAgentStart),
+		string(model.ProcessShepherdRecovery),
+	},
+}
+
+var collectorDefs = []statsCollectorDefinition{ // each becomes a bmdb_stats_<name> gauge
+	{
+		name:   "active_backoffs",
+		help:   "Number of active backoffs, partitioned by process. There may be more than one active backoff per machine.",
+		query:  model.WrapLabeledMetric((*model.Queries).CountActiveBackoffs),
+		labels: []labelDefinition{labelProcess},
+	},
+	{
+		name:   "active_work",
+		help:   "Number of active work items, partitioned by process. There may be more than one active work item per machine.",
+		query:  model.WrapLabeledMetric((*model.Queries).CountActiveWork),
+		labels: []labelDefinition{labelProcess},
+	},
+	{
+		name:  "machines",
+		help:  "Number of machines in the BMDB.",
+		query: model.WrapSimpleMetric((*model.Queries).CountMachines),
+	},
+	{
+		name:  "machines_provided",
+		help:  "Number of provided machines in the BMDB.",
+		query: model.WrapSimpleMetric((*model.Queries).CountMachinesProvided),
+	},
+	{
+		name:  "machines_heartbeating",
+		help:  "Number of machines with a currently heartbeating agent.",
+		query: model.WrapSimpleMetric((*model.Queries).CountMachinesAgentHeartbeating),
+	},
+	{
+		name:  "machines_pending_installation",
+		help:  "Number of machines pending installation.",
+		query: model.WrapSimpleMetric((*model.Queries).CountMachinesInstallationPending),
+	},
+	{
+		name:  "machines_installed",
+		help:  "Number of machines successfully installed.",
+		query: model.WrapSimpleMetric((*model.Queries).CountMachinesInstallationComplete),
+	},
+	{
+		name:  "machines_pending_agent_start",
+		help:  "Number of machines pending the agent start workflow.",
+		query: model.WrapSimpleMetric((*model.Queries).CountMachinesForAgentStart),
+	},
+	{
+		name:  "machines_pending_agent_recovery",
+		help:  "Number of machines pending the agent recovery workflow.",
+		query: model.WrapSimpleMetric((*model.Queries).CountMachinesForAgentRecovery),
+	},
+}
+
+// A statsCollector pairs a statsCollectorDefinition with the Prometheus
+// GaugeVec that publishes its values.
+type statsCollector struct {
+	gauge *prometheus.GaugeVec      // registered on the registry given to newBMDBStatsRunner
+	def   *statsCollectorDefinition // a per-collector copy of a collectorDefs entry
+}
+
+// setDefaults zeroes the default series of every collector: one per initial
+// label combination, or a single unlabeled series when there are none.
+func (b *bmdbStatsRunner) setDefaults() {
+	for _, c := range b.collectors {
+		combos := c.def.labels.initialLabels()
+		if len(combos) != 0 {
+			for _, l := range combos {
+				c.gauge.With(l).Set(0)
+			}
+			continue
+		}
+		// Unlabeled metric: emit the single series that carries no labels.
+		c.gauge.With(nil).Set(0)
+	}
+}
+
+// newBMDBStatsRunner builds a bmdbStatsRunner from the collectorDefs above. The
+// bmdbStatsRunner then has the given's Server BMDB connection bound to it and
+// can perform actual database statistic gathering.
+func newBMDBStatsRunner(s *Server, reg *prometheus.Registry) *bmdbStatsRunner {
+	var collectors []*statsCollector
+
+	for _, info := range collectorDefs {
+		info := info
+		gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+			Name: "bmdb_stats_" + info.name,
+			Help: info.help,
+		}, info.labels.names())
+		reg.MustRegister(gauge)
+
+		collectors = append(collectors, &statsCollector{
+			gauge: gauge,
+			def:   &info,
+		})
+	}
+
+	res := &bmdbStatsRunner{
+		s:          s,
+		collectors: collectors,
+	}
+	res.setDefaults()
+	return res
+}
+
+// run gathers statistics immediately and then on every StatsRunnerRate tick
+// until ctx is canceled; failed runs are logged and retried on the next tick.
+func (b *bmdbStatsRunner) run(ctx context.Context) {
+	klog.Infof("Starting stats runner...")
+
+	ti := time.NewTicker(b.s.Config.StatsRunnerRate)
+	defer ti.Stop()
+	for {
+		// Check ctx instead of err: runOnce may not wrap cancellation errors, and
+		// failures caused by shutdown are not worth reporting.
+		if err := b.runOnce(ctx); err != nil && ctx.Err() == nil {
+			klog.Errorf("Stats run failed: %v", err)
+		}
+		select {
+		case <-ti.C:
+		case <-ctx.Done():
+			klog.Infof("Exiting stats runner (%v)...", ctx.Err())
+			return
+		}
+	}
+}
+
+// runOnce collects all metrics in one transaction and publishes them on success.
+func (b *bmdbStatsRunner) runOnce(ctx context.Context) error {
+	sess, err := b.s.session(ctx)
+	if err != nil {
+		return err
+	}
+	results := make(map[string][]model.MetricValue, len(b.collectors))
+	// TODO(q3k): don't fail entire run if we can't collect just one metric.
+	err = sess.Transact(ctx, func(q *model.Queries) error {
+		for _, c := range b.collectors {
+			res, err := c.def.query(q, ctx)
+			if err != nil {
+				return fmt.Errorf("collecting %s failed: %w", c.def.name, err)
+			}
+			results[c.def.name] = res
+		}
+		return nil
+	})
+	if err != nil {
+		return err
+	}
+	for _, c := range b.collectors {
+		c.gauge.Reset() // drop series outside the initial labels that may have gone stale
+	}
+	b.setDefaults()
+	for _, c := range b.collectors {
+		for _, m := range results[c.def.name] {
+			klog.Infof("Setting %s (%v) to %d", c.def.name, m.Labels, m.Count)
+			c.gauge.With(m.Labels).Set(float64(m.Count))
+		}
+	}
+	return nil
+}