blob: 68de363440381cafd28ba5d1799a526ce6669da2 [file] [log] [blame]
package scruffy
import (
"context"
"errors"
"fmt"
"time"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/klog/v2"
"source.monogon.dev/cloud/bmaas/bmdb/model"
)
// bmdbStatsRunner collects metrics from the BMDB and exposes them as Prometheus
// metrics via a registry passed to newBMDBStatsRunner.
type bmdbStatsRunner struct {
// s is the Server whose BMDB session (s.session) and configuration
// (s.Config.StatsRunnerRate) are used when gathering statistics.
s *Server
// collectors holds one instantiated collector per entry of collectorDefs, in
// the same order.
collectors []*statsCollector
}
// A statsCollectorDefinition describes how to gather a given metric via a BMDB
// SQL query. newBMDBStatsRunner instantiates each definition into a
// statsCollector backed by one Prometheus GaugeVec.
type statsCollectorDefinition struct {
// name of the metric. Used in actual metric name, prefixed with 'bmdb_stats_'.
// Also used as the key for query results and in error/log messages.
name string
// help string emitted in prometheus endpoint.
help string
// labels is the label 'type definition', containing information about the
// dimensions of this metric. Its names() become the GaugeVec label names and
// its initialLabels() are the label sets zeroed by setDefaults. Left unset
// (zero value) for unlabeled metrics.
labels labelDefinitions
// query used to retrieve the metric data. runOnce calls it inside a BMDB
// transaction; every returned MetricValue is published as one gauge series
// (Labels -> Count). Note that the context is the second argument here.
query func(*model.Queries, context.Context) ([]model.MetricValue, error)
}
// labelProcess is the type definition of the 'process' label 'type', which is a
// fixed-cardinality representation of the database Process enum.
//
// initialValues lists the process values that are pre-populated with a zero
// gauge (presumably via labelDefinitions.initialLabels, which setDefaults
// consumes), so that they are exported even when a query returns no row for
// them. NOTE(review): this presumably should list every model.Process value;
// values missing here only appear once a query reports them — confirm when
// adding new processes.
var labelProcess = labelDefinition{
name: "process",
initialValues: []string{
string(model.ProcessShepherdAccess),
string(model.ProcessShepherdAgentStart),
string(model.ProcessShepherdRecovery),
},
}
// collectorDefs lists every BMDB statistic exported by the stats runner. Each
// entry becomes a gauge named 'bmdb_stats_<name>'; labeled entries are
// partitioned by the given label definitions, unlabeled entries export a
// single series.
var collectorDefs = []statsCollectorDefinition{
	{
		name:   "active_backoffs",
		help:   "Number of active backoffs, partitioned by process. There may be more than one active backoff per machine.",
		query:  model.WrapLabeledMetric((*model.Queries).CountActiveBackoffs),
		labels: []labelDefinition{labelProcess},
	},
	{
		name:   "active_work",
		help:   "Number of active work items, partitioned by process. There may be more than one active work item per machine.",
		query:  model.WrapLabeledMetric((*model.Queries).CountActiveWork),
		labels: []labelDefinition{labelProcess},
	},
	{
		name:  "machines",
		help:  "Number of machines in the BMDB.",
		query: model.WrapSimpleMetric((*model.Queries).CountMachines),
	},
	{
		name:  "machines_provided",
		help:  "Number of provided machines in the BMDB.",
		query: model.WrapSimpleMetric((*model.Queries).CountMachinesProvided),
	},
	{
		name:  "machines_heartbeating",
		help:  "Number of machines with a currently heartbeating agent.",
		query: model.WrapSimpleMetric((*model.Queries).CountMachinesAgentHeartbeating),
	},
	{
		name:  "machines_pending_installation",
		help:  "Number of machines pending installation.",
		query: model.WrapSimpleMetric((*model.Queries).CountMachinesInstallationPending),
	},
	{
		name:  "machines_installed",
		help:  "Number of machines successfully installed.",
		query: model.WrapSimpleMetric((*model.Queries).CountMachinesInstallationComplete),
	},
	{
		name:  "machines_pending_agent_start",
		help:  "Number of machines pending the agent start workflow.",
		query: model.WrapSimpleMetric((*model.Queries).CountMachinesForAgentStart),
	},
	{
		name:  "machines_pending_agent_recovery",
		help:  "Number of machines pending the agent recovery workflow.",
		query: model.WrapSimpleMetric((*model.Queries).CountMachinesForAgentRecovery),
	},
}
// A statsCollector is an instantiated statsCollectorDefinition which carries the
// actual prometheus gauge backing the metric.
type statsCollector struct {
// gauge is the GaugeVec registered as 'bmdb_stats_<def.name>', using the
// label names from def.labels.
gauge *prometheus.GaugeVec
// def points to this collector's own copy of its entry in collectorDefs.
def *statsCollectorDefinition
}
// setDefaults sets every collector's gauge to zero for each label set returned
// by its definition's initialLabels(). A collector whose definition yields no
// initial label sets is treated as unlabeled and gets a single zeroed series.
func (b *bmdbStatsRunner) setDefaults() {
	for _, c := range b.collectors {
		labelSets := c.def.labels.initialLabels()
		if len(labelSets) == 0 {
			// Unlabeled metric: one series with no label values.
			c.gauge.With(nil).Set(0.0)
			continue
		}
		for _, ls := range labelSets {
			c.gauge.With(ls).Set(0.0)
		}
	}
}
// newBMDBStatsRunner builds a bmdbStatsRunner from collectorDefs, registering
// one 'bmdb_stats_<name>' GaugeVec per definition in reg (panicking, via
// MustRegister, if registration fails). The runner is bound to the given
// Server's BMDB connection and can then perform actual database statistic
// gathering. All initial series are zeroed before returning.
func newBMDBStatsRunner(s *Server, reg *prometheus.Registry) *bmdbStatsRunner {
	runner := &bmdbStatsRunner{s: s}
	for i := range collectorDefs {
		// Take a per-iteration copy so each collector owns a distinct,
		// stable definition pointer.
		def := collectorDefs[i]
		opts := prometheus.GaugeOpts{
			Name: "bmdb_stats_" + def.name,
			Help: def.help,
		}
		gauge := prometheus.NewGaugeVec(opts, def.labels.names())
		reg.MustRegister(gauge)
		runner.collectors = append(runner.collectors, &statsCollector{
			gauge: gauge,
			def:   &def,
		})
	}
	runner.setDefaults()
	return runner
}
// run gathers BMDB statistics immediately and then once per
// b.s.Config.StatsRunnerRate, until ctx is done. Failed runs are logged and do
// not stop the loop; an error caused by ctx being done ends the runner.
func (b *bmdbStatsRunner) run(ctx context.Context) {
	klog.Infof("Starting stats runner...")
	ti := time.NewTicker(b.s.Config.StatsRunnerRate)
	// Release the ticker's resources when the runner exits; otherwise it is
	// never stopped.
	defer ti.Stop()
	for {
		err := b.runOnce(ctx)
		if err != nil {
			// When ctx is not done, ctx.Err() is nil and this never matches.
			if errors.Is(err, ctx.Err()) {
				klog.Infof("Exiting stats runner (%v)...", ctx.Err())
				return
			}
			klog.Errorf("Stats run failed: %v", err)
		}
		select {
		case <-ti.C:
		case <-ctx.Done():
			klog.Infof("Exiting stats runner (%v)...", ctx.Err())
			return
		}
	}
}
// runOnce runs every collector's query within a single sess.Transact call and,
// only if all of them succeed, publishes the results to the collectors'
// gauges. Before publishing, each GaugeVec is reset and re-zeroed for its
// initial label sets, so label combinations that no longer appear in the query
// results stop reporting stale values.
//
// Query errors are wrapped with %w so callers can detect context cancellation
// with errors.Is.
func (b *bmdbStatsRunner) runOnce(ctx context.Context) error {
	sess, err := b.s.session(ctx)
	if err != nil {
		return err
	}

	results := make(map[string][]model.MetricValue, len(b.collectors))
	// TODO(q3k): don't fail entire run if we can't collect just one metric.
	err = sess.Transact(ctx, func(q *model.Queries) error {
		for _, c := range b.collectors {
			res, err := c.def.query(q, ctx)
			if err != nil {
				return fmt.Errorf("collecting %s failed: %w", c.def.name, err)
			}
			results[c.def.name] = res
		}
		return nil
	})
	if err != nil {
		return err
	}

	// Drop every previously exported series before re-applying defaults and
	// fresh results; setDefaults alone only touches the initial label sets.
	// A scrape racing with this update may briefly see missing series, much
	// as it could previously see zeroed ones.
	for _, c := range b.collectors {
		c.gauge.Reset()
	}
	b.setDefaults()
	for _, c := range b.collectors {
		for _, m := range results[c.def.name] {
			klog.Infof("Setting %s (%v) to %d", c.def.name, m.Labels, m.Count)
			c.gauge.With(m.Labels).Set(float64(m.Count))
		}
	}
	return nil
}