cloud/bmaas/bmdb/scruffy: initialize, implement BMDB metrics

This creates a new BMaaS component, Scruffy the Janitor.

Scruffy will run a bunch of housekeeping jobs that aren't tied to a
particular provider or even region. Currently Scruffy just collects BMDB
metrics by periodically polling the BMDB SQL database.

Change-Id: Icafa714811757eaaf31fed43184ded8512bde067
Reviewed-on: https://review.monogon.dev/c/monogon/+/1819
Tested-by: Jenkins CI
Reviewed-by: Tim Windelschmidt <tim@monogon.tech>
diff --git a/cloud/bmaas/scruffy/BUILD.bazel b/cloud/bmaas/scruffy/BUILD.bazel
new file mode 100644
index 0000000..4934ad2
--- /dev/null
+++ b/cloud/bmaas/scruffy/BUILD.bazel
@@ -0,0 +1,48 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(
+    name = "scruffy",
+    srcs = [
+        "bmdb_stats.go",
+        "hw_stats.go",
+        "labels.go",
+        "server.go",
+    ],
+    importpath = "source.monogon.dev/cloud/bmaas/scruffy",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//cloud/bmaas/bmdb",
+        "//cloud/bmaas/bmdb/metrics",
+        "//cloud/bmaas/bmdb/model",
+        "//cloud/bmaas/bmdb/webug",
+        "//cloud/bmaas/server/api",
+        "//cloud/lib/component",
+        "//go/algorithm/cartesian",
+        "@com_github_cenkalti_backoff_v4//:backoff",
+        "@com_github_google_uuid//:uuid",
+        "@com_github_prometheus_client_golang//prometheus",
+        "@io_k8s_klog_v2//:klog",
+        "@org_golang_google_protobuf//proto",
+    ],
+)
+
+go_test(
+    name = "scruffy_test",
+    srcs = [
+        "bmdb_stats_test.go",
+        "hw_stats_test.go",
+    ],
+    data = [
+        "@cockroach",
+    ],
+    embed = [":scruffy"],
+    deps = [
+        "//cloud/agent/api",
+        "//cloud/bmaas/bmdb",
+        "//cloud/bmaas/bmdb/model",
+        "//cloud/bmaas/server/api",
+        "//cloud/lib/component",
+        "@com_github_prometheus_client_golang//prometheus",
+        "@org_golang_google_protobuf//proto",
+    ],
+)
diff --git a/cloud/bmaas/scruffy/bmdb_stats.go b/cloud/bmaas/scruffy/bmdb_stats.go
new file mode 100644
index 0000000..68de363
--- /dev/null
+++ b/cloud/bmaas/scruffy/bmdb_stats.go
@@ -0,0 +1,202 @@
+package scruffy
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"k8s.io/klog/v2"
+
+	"source.monogon.dev/cloud/bmaas/bmdb/model"
+)
+
+// bmdbStatsRunner collects metrics from the BMDB and exposes them as Prometheus
+// metrics via a registry passed to newBMDBStatsRunner.
+type bmdbStatsRunner struct {
+	s          *Server
+	collectors []*statsCollector
+}
+
+// A statsCollectorDefinition describes how to gather a given metric via a BMDB
+// SQL query.
+type statsCollectorDefinition struct {
+	// name of the metric. Used in actual metric name, prefixed with 'bmdb_stats_'.
+	name string
+	// help string emitted in prometheus endpoint.
+	help string
+	// labels is the label 'type definition', containing information about the
+	// dimensions of this metric.
+	labels labelDefinitions
+	// query used to retrieve the metric data.
+	query func(*model.Queries, context.Context) ([]model.MetricValue, error)
+}
+
+// labelProcess is the type definition of the 'process' label 'type', which is a
+// fixed-cardinality representation of the database Process enum.
+var labelProcess = labelDefinition{
+	name: "process",
+	initialValues: []string{
+		string(model.ProcessShepherdAccess),
+		string(model.ProcessShepherdAgentStart),
+		string(model.ProcessShepherdRecovery),
+	},
+}
+
+var collectorDefs = []statsCollectorDefinition{
+	{
+		name:   "active_backoffs",
+		help:   "Number of active backoffs, partitioned by process. There may be more than one active backoff per machine.",
+		query:  model.WrapLabeledMetric((*model.Queries).CountActiveBackoffs),
+		labels: []labelDefinition{labelProcess},
+	},
+	{
+		name:   "active_work",
+		help:   "Number of active work, partitioned by process. There may be more than one active work item per machine.",
+		query:  model.WrapLabeledMetric((*model.Queries).CountActiveWork),
+		labels: []labelDefinition{labelProcess},
+	},
+	{
+		name:  "machines",
+		help:  "Number of machines in the BMDB.",
+		query: model.WrapSimpleMetric((*model.Queries).CountMachines),
+	},
+	{
+		name:  "machines_provided",
+		help:  "Number of provided machines in the BMDB.",
+		query: model.WrapSimpleMetric((*model.Queries).CountMachinesProvided),
+	},
+	{
+		name:  "machines_heartbeating",
+		help:  "Number of machines with a currently heartbeating agent.",
+		query: model.WrapSimpleMetric((*model.Queries).CountMachinesAgentHeartbeating),
+	},
+	{
+		name:  "machines_pending_installation",
+		help:  "Number of machines pending installation.",
+		query: model.WrapSimpleMetric((*model.Queries).CountMachinesInstallationPending),
+	},
+	{
+		name:  "machines_installed",
+		help:  "Number of machines succesfully installed.",
+		query: model.WrapSimpleMetric((*model.Queries).CountMachinesInstallationComplete),
+	},
+	{
+		name:  "machines_pending_agent_start",
+		help:  "Number of machines pending the agent start workflow.",
+		query: model.WrapSimpleMetric((*model.Queries).CountMachinesForAgentStart),
+	},
+	{
+		name:  "machines_pending_agent_recovery",
+		help:  "Number of machines pending the agent recovery workflow.",
+		query: model.WrapSimpleMetric((*model.Queries).CountMachinesForAgentRecovery),
+	},
+}
+
+// A statsCollector is an instantiated statsCollectorDefinition which carries the
+// actual prometheus gauge backing the metric.
+type statsCollector struct {
+	gauge *prometheus.GaugeVec
+	def   *statsCollectorDefinition
+}
+
+// setDefaults emits gauges with zero values for all metrics of the runner, using
+// the initialLabel data gathered from each metric definition.
+func (b *bmdbStatsRunner) setDefaults() {
+	for _, collector := range b.collectors {
+		info := collector.def
+		initial := info.labels.initialLabels()
+		if len(initial) == 0 {
+			collector.gauge.With(nil).Set(0.0)
+		} else {
+			for _, labels := range initial {
+				collector.gauge.With(labels).Set(0.0)
+			}
+		}
+	}
+}
+
+// newBMDBStatsRunner builds a bmdbStatsRunner from the collectorDefs above. The
+// bmdbStatsRunner then has the given's Server BMDB connection bound to it and
+// can perform actual database statistic gathering.
+func newBMDBStatsRunner(s *Server, reg *prometheus.Registry) *bmdbStatsRunner {
+	var collectors []*statsCollector
+
+	for _, info := range collectorDefs {
+		info := info
+		gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+			Name: "bmdb_stats_" + info.name,
+			Help: info.help,
+		}, info.labels.names())
+		reg.MustRegister(gauge)
+
+		collectors = append(collectors, &statsCollector{
+			gauge: gauge,
+			def:   &info,
+		})
+	}
+
+	res := &bmdbStatsRunner{
+		s:          s,
+		collectors: collectors,
+	}
+	res.setDefaults()
+	return res
+}
+
+func (b *bmdbStatsRunner) run(ctx context.Context) {
+	klog.Infof("Starting stats runner...")
+
+	ti := time.NewTicker(b.s.Config.StatsRunnerRate)
+
+	for {
+		err := b.runOnce(ctx)
+		if err != nil {
+			if errors.Is(err, ctx.Err()) {
+				return
+			}
+			klog.Errorf("Stats run failed: %v", err)
+		}
+		select {
+		case <-ti.C:
+		case <-ctx.Done():
+			klog.Infof("Exiting stats runner (%v)...", ctx.Err())
+			return
+		}
+	}
+}
+
+func (b *bmdbStatsRunner) runOnce(ctx context.Context) error {
+	sess, err := b.s.session(ctx)
+	if err != nil {
+		return err
+	}
+
+	results := make(map[string][]model.MetricValue)
+	// TODO(q3k): don't fail entire run if we can't collect just one metric.
+	err = sess.Transact(ctx, func(q *model.Queries) error {
+		for _, c := range b.collectors {
+			res, err := c.def.query(q, ctx)
+			if err != nil {
+				return fmt.Errorf("collecting %s failed: %v", c.def.name, err)
+			} else {
+				results[c.def.name] = res
+			}
+		}
+		return nil
+	})
+	if err != nil {
+		return err
+	}
+
+	b.setDefaults()
+	for _, c := range b.collectors {
+		for _, m := range results[c.def.name] {
+			klog.Infof("Setting %s (%v) to %d", c.def.name, m.Labels, m.Count)
+			c.gauge.With(m.Labels).Set(float64(m.Count))
+		}
+	}
+
+	return nil
+}
diff --git a/cloud/bmaas/scruffy/bmdb_stats_test.go b/cloud/bmaas/scruffy/bmdb_stats_test.go
new file mode 100644
index 0000000..89d5b5e
--- /dev/null
+++ b/cloud/bmaas/scruffy/bmdb_stats_test.go
@@ -0,0 +1,169 @@
+package scruffy
+
+import (
+	"context"
+	"fmt"
+	"sort"
+	"strings"
+	"testing"
+
+	"github.com/prometheus/client_golang/prometheus"
+
+	"source.monogon.dev/cloud/bmaas/bmdb"
+	"source.monogon.dev/cloud/bmaas/bmdb/model"
+	"source.monogon.dev/cloud/lib/component"
+)
+
+func TestBMDBStats(t *testing.T) {
+	s := Server{
+		Config: Config{
+			BMDB: bmdb.BMDB{
+				Config: bmdb.Config{
+					Database: component.CockroachConfig{
+						InMemory: true,
+					},
+				},
+			},
+		},
+	}
+
+	registry := prometheus.NewRegistry()
+	runner := newBMDBStatsRunner(&s, registry)
+
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+
+	expect := func(wantValues map[string]int64) {
+		t.Helper()
+		res, err := registry.Gather()
+		if err != nil {
+			t.Fatalf("Gather: %v", err)
+		}
+		gotValues := make(map[string]bool)
+		for _, mf := range res {
+			if len(mf.Metric) != 1 {
+				for _, m := range mf.Metric {
+					var lvs []string
+					for _, lp := range m.Label {
+						lvs = append(lvs, fmt.Sprintf("%s=%s", *lp.Name, *lp.Value))
+					}
+					sort.Strings(lvs)
+					name := fmt.Sprintf("%s[%s]", *mf.Name, strings.Join(lvs, ","))
+					gotValues[name] = true
+					if _, ok := wantValues[name]; !ok {
+						t.Errorf("MetricFamily %s: unexpected", name)
+					}
+					if want, got := wantValues[name], int64(*m.Gauge.Value); want != got {
+						t.Errorf("MetricFamily %s: wanted %d, got %d", *mf.Name, want, got)
+					}
+				}
+			} else {
+				m := mf.Metric[0]
+				gotValues[*mf.Name] = true
+				if want, got := wantValues[*mf.Name], int64(*m.Gauge.Value); want != got {
+					t.Errorf("MetricFamily %s: wanted %d, got %d", *mf.Name, want, got)
+				}
+				if _, ok := wantValues[*mf.Name]; !ok {
+					t.Errorf("MetricFamily %s: unexpected", *mf.Name)
+				}
+			}
+		}
+		for mf, _ := range wantValues {
+			if !gotValues[mf] {
+				t.Errorf("MetricFamily %s: missing", mf)
+			}
+		}
+	}
+
+	expect(map[string]int64{
+		"bmdb_stats_machines":                                    0,
+		"bmdb_stats_machines_provided":                           0,
+		"bmdb_stats_machines_heartbeating":                       0,
+		"bmdb_stats_machines_pending_installation":               0,
+		"bmdb_stats_machines_installed":                          0,
+		"bmdb_stats_machines_pending_agent_start":                0,
+		"bmdb_stats_machines_pending_agent_recovery":             0,
+		"bmdb_stats_active_backoffs[process=ShepherdAccess]":     0,
+		"bmdb_stats_active_backoffs[process=ShepherdAgentStart]": 0,
+		"bmdb_stats_active_backoffs[process=ShepherdRecovery]":   0,
+		"bmdb_stats_active_work[process=ShepherdAccess]":         0,
+		"bmdb_stats_active_work[process=ShepherdAgentStart]":     0,
+		"bmdb_stats_active_work[process=ShepherdRecovery]":       0,
+	})
+
+	conn, err := s.Config.BMDB.Open(true)
+	if err != nil {
+		t.Fatalf("Open: %v", err)
+	}
+	sess, err := conn.StartSession(ctx)
+	if err != nil {
+		t.Fatalf("StartSession: %v", err)
+	}
+
+	s.bmdb = conn
+	s.sessionC = make(chan *bmdb.Session)
+	go s.sessionWorker(ctx)
+	if err := runner.runOnce(ctx); err != nil {
+		t.Fatal(err)
+	}
+
+	expect(map[string]int64{
+		"bmdb_stats_machines":                                    0,
+		"bmdb_stats_machines_provided":                           0,
+		"bmdb_stats_machines_heartbeating":                       0,
+		"bmdb_stats_machines_pending_installation":               0,
+		"bmdb_stats_machines_installed":                          0,
+		"bmdb_stats_machines_pending_agent_start":                0,
+		"bmdb_stats_machines_pending_agent_recovery":             0,
+		"bmdb_stats_active_backoffs[process=ShepherdAccess]":     0,
+		"bmdb_stats_active_backoffs[process=ShepherdAgentStart]": 0,
+		"bmdb_stats_active_backoffs[process=ShepherdRecovery]":   0,
+		"bmdb_stats_active_work[process=ShepherdAccess]":         0,
+		"bmdb_stats_active_work[process=ShepherdAgentStart]":     0,
+		"bmdb_stats_active_work[process=ShepherdRecovery]":       0,
+	})
+
+	f := fill().
+		// Provided, needs installation.
+		machine().providedE("1").build().
+		// Three machines needing recovery.
+		machine().providedE("2").agentNeverHeartbeat().build().
+		machine().providedE("3").agentNeverHeartbeat().build().
+		machine().providedE("4").agentNeverHeartbeat().build().
+		// One machine correctly heartbeating.
+		machine().providedE("5").agentHealthy().build().
+		// Two machines heartbeating and pending installation.
+		machine().providedE("6").agentHealthy().installRequested(10).build().
+		machine().providedE("7").agentHealthy().installRequested(10).installReported(9).build().
+		// Machine which is pending installation _and_ recovery.
+		machine().providedE("8").agentNeverHeartbeat().installRequested(10).build().
+		// Machine which has been successfully installed.
+		machine().providedE("9").agentStoppedHeartbeating().installRequested(10).installReported(10).build()
+
+	err = sess.Transact(ctx, func(q *model.Queries) error {
+		return f(ctx, q)
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if err := runner.runOnce(ctx); err != nil {
+		t.Fatal(err)
+	}
+
+	expect(map[string]int64{
+		"bmdb_stats_machines":                                    9,
+		"bmdb_stats_machines_provided":                           9,
+		"bmdb_stats_machines_heartbeating":                       3,
+		"bmdb_stats_machines_pending_installation":               3,
+		"bmdb_stats_machines_installed":                          1,
+		"bmdb_stats_machines_pending_agent_start":                1,
+		"bmdb_stats_machines_pending_agent_recovery":             4,
+		"bmdb_stats_active_backoffs[process=ShepherdAccess]":     0,
+		"bmdb_stats_active_backoffs[process=ShepherdAgentStart]": 0,
+		"bmdb_stats_active_backoffs[process=ShepherdRecovery]":   0,
+		"bmdb_stats_active_work[process=ShepherdAccess]":         0,
+		"bmdb_stats_active_work[process=ShepherdAgentStart]":     0,
+		"bmdb_stats_active_work[process=ShepherdRecovery]":       0,
+	})
+}
diff --git a/cloud/bmaas/scruffy/cmd/BUILD.bazel b/cloud/bmaas/scruffy/cmd/BUILD.bazel
new file mode 100644
index 0000000..5e050bf
--- /dev/null
+++ b/cloud/bmaas/scruffy/cmd/BUILD.bazel
@@ -0,0 +1,18 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library")
+
+go_library(
+    name = "cmd_lib",
+    srcs = ["main.go"],
+    importpath = "source.monogon.dev/cloud/bmaas/scruffy/cmd",
+    visibility = ["//visibility:private"],
+    deps = [
+        "//cloud/bmaas/scruffy",
+        "//metropolis/cli/pkg/context",
+    ],
+)
+
+go_binary(
+    name = "cmd",
+    embed = [":cmd_lib"],
+    visibility = ["//visibility:public"],
+)
diff --git a/cloud/bmaas/scruffy/cmd/main.go b/cloud/bmaas/scruffy/cmd/main.go
new file mode 100644
index 0000000..e838834
--- /dev/null
+++ b/cloud/bmaas/scruffy/cmd/main.go
@@ -0,0 +1,19 @@
+package main
+
+import (
+	"context"
+	"flag"
+
+	"source.monogon.dev/cloud/bmaas/scruffy"
+	clicontext "source.monogon.dev/metropolis/cli/pkg/context"
+)
+
+func main() {
+	s := &scruffy.Server{}
+	s.Config.RegisterFlags()
+	flag.Parse()
+
+	ctx := clicontext.WithInterrupt(context.Background())
+	s.Start(ctx)
+	<-ctx.Done()
+}
diff --git a/cloud/bmaas/scruffy/hw_stats.go b/cloud/bmaas/scruffy/hw_stats.go
new file mode 100644
index 0000000..22dd247
--- /dev/null
+++ b/cloud/bmaas/scruffy/hw_stats.go
@@ -0,0 +1,156 @@
+package scruffy
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"time"
+
+	"github.com/google/uuid"
+	"github.com/prometheus/client_golang/prometheus"
+	"google.golang.org/protobuf/proto"
+	"k8s.io/klog/v2"
+
+	"source.monogon.dev/cloud/bmaas/bmdb/model"
+	"source.monogon.dev/cloud/bmaas/server/api"
+)
+
+// hwStatsRunner collects metrics from the machine hardware inventory in BMDB and
+// exposes them as Prometheus metrics via a registry passed to newHWStatsRunner.
+type hwStatsRunner struct {
+	s *Server
+
+	memoryPerRegion     *prometheus.GaugeVec
+	cpuThreadsPerRegion *prometheus.GaugeVec
+}
+
+// newHWStatsRunner builds a hwStatsRunner. The hwStatsRunner then has the
+// given's Server BMDB connection bound to it and can perform actual database
+// statistic gathering.
+func newHWStatsRunner(s *Server, reg *prometheus.Registry) *hwStatsRunner {
+	hwsr := &hwStatsRunner{
+		s: s,
+
+		memoryPerRegion: prometheus.NewGaugeVec(prometheus.GaugeOpts{
+			Name: "bmdb_hwstats_region_ram_bytes",
+		}, []string{"provider", "location"}),
+
+		cpuThreadsPerRegion: prometheus.NewGaugeVec(prometheus.GaugeOpts{
+			Name: "bmdb_hwstats_region_cpu_threads",
+		}, []string{"provider", "location"}),
+	}
+	reg.MustRegister(hwsr.memoryPerRegion, hwsr.cpuThreadsPerRegion)
+	return hwsr
+}
+
+func (h *hwStatsRunner) run(ctx context.Context) {
+	klog.Infof("Starting stats runner...")
+
+	ti := time.NewTicker(time.Minute)
+
+	for {
+		err := h.runOnce(ctx)
+		if err != nil {
+			if errors.Is(err, ctx.Err()) {
+				return
+			}
+			klog.Errorf("Stats run failed: %v", err)
+		}
+		select {
+		case <-ti.C:
+		case <-ctx.Done():
+			klog.Infof("Exiting stats runner (%v)...", ctx.Err())
+			return
+		}
+	}
+}
+
+// statsPerRegion are gathered and aggregated (summed) per region.
+type statsPerRegion struct {
+	ramBytes   uint64
+	numThreads uint64
+}
+
+// add a given AgentHardwareReport to this region's data.
+func (s *statsPerRegion) add(hwrep *api.AgentHardwareReport) {
+	s.ramBytes += uint64(hwrep.Report.MemoryInstalledBytes)
+	for _, cpu := range hwrep.Report.Cpu {
+		s.numThreads += uint64(cpu.HardwareThreads)
+	}
+}
+
+// regionKey is used to uniquely identify each region per each provider.
+type regionKey struct {
+	provider model.Provider
+	location string
+}
+
+func (r *regionKey) String() string {
+	return fmt.Sprintf("%s/%s", r.provider, r.location)
+}
+
+func (h *hwStatsRunner) runOnce(ctx context.Context) error {
+	sess, err := h.s.session(ctx)
+	if err != nil {
+		return err
+	}
+
+	var start uuid.UUID
+
+	perRegion := make(map[regionKey]*statsPerRegion)
+	var total statsPerRegion
+
+	for {
+		var res []model.ListMachineHardwareRow
+		err = sess.Transact(ctx, func(q *model.Queries) error {
+			res, err = q.ListMachineHardware(ctx, model.ListMachineHardwareParams{
+				Limit:     100,
+				MachineID: start,
+			})
+			return err
+		})
+		if err != nil {
+			return err
+		}
+		klog.Infof("Machines: %d chunk", len(res))
+		if len(res) == 0 {
+			break
+		}
+		for _, row := range res {
+			var hwrep api.AgentHardwareReport
+			err = proto.Unmarshal(row.HardwareReportRaw.([]byte), &hwrep)
+			if err != nil {
+				klog.Warningf("Could not decode hardware report from %s: %v", row.MachineID, err)
+				continue
+			}
+
+			if !row.ProviderLocation.Valid {
+				klog.Warningf("%s has no provider location, skipping", row.MachineID)
+				continue
+			}
+
+			key := regionKey{
+				provider: row.Provider,
+				location: row.ProviderLocation.String,
+			}
+			if _, ok := perRegion[key]; !ok {
+				perRegion[key] = &statsPerRegion{}
+			}
+			perRegion[key].add(&hwrep)
+			total.add(&hwrep)
+
+			start = row.MachineID
+		}
+	}
+
+	for k, st := range perRegion {
+		labels := prometheus.Labels{
+			"provider": string(k.provider),
+			"location": k.location,
+		}
+
+		h.memoryPerRegion.With(labels).Set(float64(st.ramBytes))
+		h.cpuThreadsPerRegion.With(labels).Set(float64(st.numThreads))
+	}
+	return nil
+}
diff --git a/cloud/bmaas/scruffy/hw_stats_test.go b/cloud/bmaas/scruffy/hw_stats_test.go
new file mode 100644
index 0000000..ffb572c
--- /dev/null
+++ b/cloud/bmaas/scruffy/hw_stats_test.go
@@ -0,0 +1,319 @@
+package scruffy
+
+import (
+	"context"
+	"database/sql"
+	"testing"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"google.golang.org/protobuf/proto"
+
+	aapi "source.monogon.dev/cloud/agent/api"
+	"source.monogon.dev/cloud/bmaas/bmdb"
+	"source.monogon.dev/cloud/bmaas/bmdb/model"
+	"source.monogon.dev/cloud/bmaas/server/api"
+	"source.monogon.dev/cloud/lib/component"
+)
+
+type filler func(ctx context.Context, q *model.Queries) error
+
+func fill() filler {
+	return func(ctx context.Context, q *model.Queries) error {
+		return nil
+	}
+}
+
+func (f filler) chain(n func(ctx context.Context, q *model.Queries) error) filler {
+	return func(ctx context.Context, q *model.Queries) error {
+		if err := f(ctx, q); err != nil {
+			return err
+		}
+		return n(ctx, q)
+	}
+}
+
+type fillerMachine struct {
+	f filler
+
+	provider   *model.Provider
+	providerID *string
+
+	location *string
+
+	threads *int32
+	ramgb   *int64
+
+	agentStartedAt *time.Time
+
+	agentHeartbeatAt *time.Time
+
+	installationRequestGeneration *int64
+
+	installationReportGeneration *int64
+}
+
+func (f filler) machine() *fillerMachine {
+	return &fillerMachine{
+		f: f,
+	}
+}
+
+func (m *fillerMachine) provided(p model.Provider, pid string) *fillerMachine {
+	m.provider = &p
+	m.providerID = &pid
+	return m
+}
+
+func (m *fillerMachine) providedE(pid string) *fillerMachine {
+	return m.provided(model.ProviderEquinix, pid)
+}
+
+func (m *fillerMachine) located(location string) *fillerMachine {
+	m.location = &location
+	return m
+}
+
+func (m *fillerMachine) hardware(threads int32, ramgb int64) *fillerMachine {
+	m.threads = &threads
+	m.ramgb = &ramgb
+	return m
+}
+
+func (m *fillerMachine) agentStarted(t time.Time) *fillerMachine {
+	m.agentStartedAt = &t
+	return m
+}
+
+func (m *fillerMachine) agentHeartbeat(t time.Time) *fillerMachine {
+	m.agentHeartbeatAt = &t
+	return m
+}
+
+func (m *fillerMachine) agentHealthy() *fillerMachine {
+	now := time.Now()
+	return m.agentStarted(now.Add(-30 * time.Minute)).agentHeartbeat(now.Add(-1 * time.Minute))
+}
+
+func (m *fillerMachine) agentStoppedHeartbeating() *fillerMachine {
+	now := time.Now()
+	return m.agentStarted(now.Add(-30 * time.Minute)).agentHeartbeat(now.Add(-20 * time.Minute))
+}
+
+func (m *fillerMachine) agentNeverHeartbeat() *fillerMachine {
+	now := time.Now()
+	return m.agentStarted(now.Add(-30 * time.Minute))
+}
+
+func (m *fillerMachine) installRequested(gen int64) *fillerMachine {
+	m.installationRequestGeneration = &gen
+	return m
+}
+
+func (m *fillerMachine) installReported(gen int64) *fillerMachine {
+	m.installationReportGeneration = &gen
+	return m
+}
+
+func (m *fillerMachine) build() filler {
+	return m.f.chain(func(ctx context.Context, q *model.Queries) error {
+		mach, err := q.NewMachine(ctx)
+		if err != nil {
+			return err
+		}
+		if m.providerID != nil {
+			err = q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
+				MachineID:  mach.MachineID,
+				Provider:   *m.provider,
+				ProviderID: *m.providerID,
+			})
+			if err != nil {
+				return err
+			}
+			if m.location != nil {
+				err = q.MachineUpdateProviderStatus(ctx, model.MachineUpdateProviderStatusParams{
+					ProviderID:       *m.providerID,
+					Provider:         *m.provider,
+					ProviderLocation: sql.NullString{Valid: true, String: *m.location},
+				})
+				if err != nil {
+					return err
+				}
+			}
+		}
+		if m.threads != nil {
+			report := api.AgentHardwareReport{
+				Report: &aapi.Node{
+					MemoryInstalledBytes: *m.ramgb << 30,
+					MemoryUsableRatio:    1.0,
+					Cpu: []*aapi.CPU{
+						{
+							HardwareThreads: *m.threads,
+							Cores:           *m.threads,
+						},
+					},
+				},
+				Warning: nil,
+			}
+			raw, err := proto.Marshal(&report)
+			if err != nil {
+				return err
+			}
+			err = q.MachineSetHardwareReport(ctx, model.MachineSetHardwareReportParams{
+				MachineID:         mach.MachineID,
+				HardwareReportRaw: raw,
+			})
+			if err != nil {
+				return err
+			}
+		}
+		if m.agentStartedAt != nil {
+			err = q.MachineSetAgentStarted(ctx, model.MachineSetAgentStartedParams{
+				MachineID:      mach.MachineID,
+				AgentStartedAt: *m.agentStartedAt,
+				AgentPublicKey: []byte("fakefakefake"),
+			})
+			if err != nil {
+				return err
+			}
+		}
+		if m.agentHeartbeatAt != nil {
+			err = q.MachineSetAgentHeartbeat(ctx, model.MachineSetAgentHeartbeatParams{
+				MachineID:        mach.MachineID,
+				AgentHeartbeatAt: *m.agentHeartbeatAt,
+			})
+			if err != nil {
+				return err
+			}
+		}
+		if m.installationRequestGeneration != nil {
+			err = q.MachineSetOSInstallationRequest(ctx, model.MachineSetOSInstallationRequestParams{
+				MachineID:  mach.MachineID,
+				Generation: *m.installationRequestGeneration,
+			})
+			if err != nil {
+				return err
+			}
+		}
+		if m.installationReportGeneration != nil {
+			err = q.MachineSetOSInstallationReport(ctx, model.MachineSetOSInstallationReportParams{
+				MachineID:  mach.MachineID,
+				Generation: *m.installationReportGeneration,
+			})
+			if err != nil {
+				return err
+			}
+		}
+		return nil
+	})
+}
+
+func TestHWStats(t *testing.T) {
+	s := Server{
+		Config: Config{
+			BMDB: bmdb.BMDB{
+				Config: bmdb.Config{
+					Database: component.CockroachConfig{
+						InMemory: true,
+					},
+				},
+			},
+		},
+	}
+
+	registry := prometheus.NewRegistry()
+	runner := newHWStatsRunner(&s, registry)
+
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+
+	res, err := registry.Gather()
+	if err != nil {
+		t.Fatalf("Gather: %v", err)
+	}
+	if want, got := 0, len(res); want != got {
+		t.Fatalf("Expected no metrics with empty database, got %d", got)
+	}
+
+	conn, err := s.Config.BMDB.Open(true)
+	if err != nil {
+		t.Fatalf("Open: %v", err)
+	}
+	sess, err := conn.StartSession(ctx)
+	if err != nil {
+		t.Fatalf("StartSession: %v", err)
+	}
+	// Populate database with some test data.
+	err = sess.Transact(ctx, func(q *model.Queries) error {
+		f := fill().
+			machine().provided(model.ProviderEquinix, "1").hardware(32, 256).located("dark-bramble").build().
+			machine().provided(model.ProviderEquinix, "2").hardware(32, 256).located("dark-bramble").build().
+			machine().provided(model.ProviderEquinix, "3").hardware(32, 256).located("dark-bramble").build().
+			machine().provided(model.ProviderEquinix, "4").hardware(32, 256).located("brittle-hollow").build().
+			machine().provided(model.ProviderEquinix, "5").hardware(32, 256).located("timber-hearth").build().
+			machine().provided(model.ProviderEquinix, "6").hardware(32, 256).located("timber-hearth").build()
+		return f(ctx, q)
+	})
+	if err != nil {
+		t.Fatalf("Transact: %v", err)
+	}
+
+	s.bmdb = conn
+	s.sessionC = make(chan *bmdb.Session)
+	go s.sessionWorker(ctx)
+
+	// Do a statistics run and check results.
+	if err := runner.runOnce(ctx); err != nil {
+		t.Fatalf("runOnce: %v", err)
+	}
+
+	mfs, err := registry.Gather()
+	if err != nil {
+		t.Fatalf("Gatcher: %v", err)
+	}
+
+	// metric name -> provider -> location -> value
+	values := make(map[string]map[string]map[string]float64)
+	for _, mf := range mfs {
+		values[*mf.Name] = make(map[string]map[string]float64)
+		for _, m := range mf.Metric {
+			var provider, location string
+			for _, pair := range m.Label {
+				switch *pair.Name {
+				case "location":
+					location = *pair.Value
+				case "provider":
+					provider = *pair.Value
+				}
+			}
+			if _, ok := values[*mf.Name][provider]; !ok {
+				values[*mf.Name][provider] = make(map[string]float64)
+			}
+			switch {
+			case m.Gauge != nil && m.Gauge.Value != nil:
+				values[*mf.Name][provider][location] = *m.Gauge.Value
+			}
+		}
+	}
+
+	for _, te := range []struct {
+		provider model.Provider
+		location string
+		threads  int32
+		ramgb    int64
+	}{
+		{model.ProviderEquinix, "dark-bramble", 96, 768},
+		{model.ProviderEquinix, "brittle-hollow", 32, 256},
+		{model.ProviderEquinix, "timber-hearth", 64, 512},
+	} {
+		threads := values["bmdb_hwstats_region_cpu_threads"][string(te.provider)][te.location]
+		bytes := values["bmdb_hwstats_region_ram_bytes"][string(te.provider)][te.location]
+
+		if want, got := te.threads, int32(threads); want != got {
+			t.Errorf("Wanted %d threads in %s/%s, got %d", want, te.provider, te.location, got)
+		}
+		if want, got := te.ramgb, int64(bytes)>>30; want != got {
+			t.Errorf("Wanted %d GB RAM in %s/%s, got %d", want, te.provider, te.location, got)
+		}
+	}
+}
diff --git a/cloud/bmaas/scruffy/labels.go b/cloud/bmaas/scruffy/labels.go
new file mode 100644
index 0000000..b88f799
--- /dev/null
+++ b/cloud/bmaas/scruffy/labels.go
@@ -0,0 +1,94 @@
+package scruffy
+
+import (
+	"github.com/prometheus/client_golang/prometheus"
+
+	"source.monogon.dev/go/algorithm/cartesian"
+)
+
+// A labelDefinition describes a key/value pair that's a metric dimension. It
+// consists of the label key/name (a string), and a list of possible values of
+// this key. The list of values will be used to initialize the metrics at startup
+// with zero values.
+//
+// The initialValues system is intended to be used with labels that are
+// low-cardinality enums, e.g. the name of a subsystem.
+//
+// All labelDefinitions for a single metric will then create a cartesian product
+// of all initialValues.
+type labelDefinition struct {
+	// name/key of the label.
+	name string
+	// initialValues defines the default values for this label key/name that will be
+	// used to generate a list of initial zero-filled metrics.
+	initialValues []string
+}
+
+// labelDefinitions is a list of labelDefinition which define the label
+// dimensions of a metric. All the initialValues of the respective
+// labelDefinitions will create a cartesian set of default zero-filled metric
+// values when the metric susbsystem gets initialized. These zero values will
+// then get overridden by real data as it is collected.
+type labelDefinitions []labelDefinition
+
+// initialLabels generates the list of initial labels key/values that should be
+// used to generate zero-filled metrics on startup. This is a cartesian product
+// of all initialValues of all labelDefinitions.
+func (l labelDefinitions) initialLabels() []prometheus.Labels {
+	// Nothing to do if this is an empty labelDefinitions.
+	if len(l) == 0 {
+		return nil
+	}
+
+	// Given:
+	//
+	// labelDefinitions := []labelDefinition{
+	//    { name: "a", initialValues: []string{"foo", "bar"}},
+	//    { name: "b", initialValues: []string{"baz", "barf"}},
+	// }
+	//
+	// This creates:
+	//
+	// values := []string{
+	//    { "foo", "bar" }, // label 'a'
+	//    { "baz", "barf" }, // label 'b'
+	// }
+	var values [][]string
+	for _, ld := range l {
+		values = append(values, ld.initialValues)
+	}
+
+	// Given the above:
+	//
+	// valuesProduct := []string{
+	//    //  a      b
+	//    { "foo", "baz" },
+	//    { "foo", "barf" },
+	//    { "bar", "baz" },
+	//    { "bar", "barf" },
+	// }
+	valuesProduct := cartesian.Product[string](values...)
+
+	// This converts valuesProduct into an actual prometheus-compatible type,
+	// re-attaching the label names back into the columns as seen above.
+	var res []prometheus.Labels
+	for _, vp := range valuesProduct {
+		labels := make(prometheus.Labels)
+		for i, lv := range vp {
+			labelDef := l[i]
+			labels[labelDef.name] = lv
+		}
+		res = append(res, labels)
+	}
+	return res
+}
+
+// names returns the keys/names of all the metric labels from these
+// labelDefinitions.
+func (l labelDefinitions) names() []string {
+	var res []string
+	for _, ld := range l {
+		res = append(res, ld.name)
+	}
+	return res
+}
diff --git a/cloud/bmaas/scruffy/server.go b/cloud/bmaas/scruffy/server.go
new file mode 100644
index 0000000..58e0f51
--- /dev/null
+++ b/cloud/bmaas/scruffy/server.go
@@ -0,0 +1,127 @@
+// Package scruffy implements Scruffy, The Janitor.
+//
+// Scruffy is a BMaaS component which runs a bunch of important, housekeeping-ish
+// processes that aren't tied to any particular provider and are mostly
+// batch-oriented.
+//
+// Currently Scruffy just collects metrics from the BMDB.
+package scruffy
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"os"
+	"time"
+
+	"github.com/cenkalti/backoff/v4"
+	"k8s.io/klog/v2"
+
+	"source.monogon.dev/cloud/bmaas/bmdb"
+	"source.monogon.dev/cloud/bmaas/bmdb/metrics"
+	"source.monogon.dev/cloud/bmaas/bmdb/webug"
+	"source.monogon.dev/cloud/lib/component"
+)
+
+type Config struct {
+	Component component.ComponentConfig
+	BMDB      bmdb.BMDB
+	Webug     webug.Config
+
+	StatsRunnerRate time.Duration
+}
+
+// TODO(q3k): factor this out to BMDB library?
+func runtimeInfo() string {
+	hostname, _ := os.Hostname()
+	if hostname == "" {
+		hostname = "UNKNOWN"
+	}
+	return fmt.Sprintf("host %s", hostname)
+}
+
+func (c *Config) RegisterFlags() {
+	c.Component.RegisterFlags("scruffy")
+	c.BMDB.ComponentName = "scruffy"
+	c.BMDB.RuntimeInfo = runtimeInfo()
+	c.BMDB.Database.RegisterFlags("bmdb")
+	c.Webug.RegisterFlags()
+
+	flag.DurationVar(&c.StatsRunnerRate, "scruffy_stats_collection_rate", time.Minute, "How often the stats collection loop will run against BMDB")
+}
+
+type Server struct {
+	Config Config
+
+	bmdb     *bmdb.Connection
+	sessionC chan *bmdb.Session
+}
+
+func (s *Server) Start(ctx context.Context) {
+	reg := s.Config.Component.PrometheusRegistry()
+	s.Config.BMDB.EnableMetrics(reg)
+	s.Config.Component.StartPrometheus(ctx)
+
+	conn, err := s.Config.BMDB.Open(true)
+	if err != nil {
+		klog.Exitf("Failed to connect to BMDB: %v", err)
+	}
+	s.bmdb = conn
+	s.sessionC = make(chan *bmdb.Session)
+	go s.sessionWorker(ctx)
+
+	bsr := newBMDBStatsRunner(s, reg)
+	go bsr.run(ctx)
+
+	hwr := newHWStatsRunner(s, reg)
+	go hwr.run(ctx)
+
+	go func() {
+		if err := s.Config.Webug.Start(ctx, conn); err != nil && err != ctx.Err() {
+			klog.Exitf("Failed to start webug: %v", err)
+		}
+	}()
+}
+
+// sessionWorker emits a valid BMDB session to sessionC as long as ctx is active.
+//
+// TODO(q3k): factor out into bmdb client lib
+func (s *Server) sessionWorker(ctx context.Context) {
+	var session *bmdb.Session
+	for {
+		if session == nil || session.Expired() {
+			klog.Infof("Starting new session...")
+			bo := backoff.NewExponentialBackOff()
+			err := backoff.Retry(func() error {
+				var err error
+				session, err = s.bmdb.StartSession(ctx, bmdb.SessionOption{Processor: metrics.ProcessorScruffyStats})
+				if err != nil {
+					klog.Errorf("Failed to start session: %v", err)
+					return err
+				} else {
+					return nil
+				}
+			}, backoff.WithContext(bo, ctx))
+			if err != nil {
+				// If something's really wrong just crash.
+				klog.Exitf("Gave up on starting session: %v", err)
+			}
+			klog.Infof("New session: %s", session.UUID)
+		}
+
+		select {
+		case <-ctx.Done():
+			return
+		case s.sessionC <- session:
+		}
+	}
+}
+
+func (s *Server) session(ctx context.Context) (*bmdb.Session, error) {
+	select {
+	case sess := <-s.sessionC:
+		return sess, nil
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	}
+}