cloud/bmaas/bmdb/scruffy: initialize, implement BMDB metrics
This creates a new BMaaS component, Scruffy the Janitor.
Scruffy will run a bunch of housekeeping jobs that aren't tied to a
particular provider or even region. Currently Scruffy just collects BMDB
metrics by periodically polling the BMDB SQL database.
Change-Id: Icafa714811757eaaf31fed43184ded8512bde067
Reviewed-on: https://review.monogon.dev/c/monogon/+/1819
Tested-by: Jenkins CI
Reviewed-by: Tim Windelschmidt <tim@monogon.tech>
diff --git a/cloud/bmaas/scruffy/BUILD.bazel b/cloud/bmaas/scruffy/BUILD.bazel
new file mode 100644
index 0000000..4934ad2
--- /dev/null
+++ b/cloud/bmaas/scruffy/BUILD.bazel
@@ -0,0 +1,48 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(
+ name = "scruffy",
+ srcs = [
+ "bmdb_stats.go",
+ "hw_stats.go",
+ "labels.go",
+ "server.go",
+ ],
+ importpath = "source.monogon.dev/cloud/bmaas/scruffy",
+ visibility = ["//visibility:public"],
+ deps = [
+ "//cloud/bmaas/bmdb",
+ "//cloud/bmaas/bmdb/metrics",
+ "//cloud/bmaas/bmdb/model",
+ "//cloud/bmaas/bmdb/webug",
+ "//cloud/bmaas/server/api",
+ "//cloud/lib/component",
+ "//go/algorithm/cartesian",
+ "@com_github_cenkalti_backoff_v4//:backoff",
+ "@com_github_google_uuid//:uuid",
+ "@com_github_prometheus_client_golang//prometheus",
+ "@io_k8s_klog_v2//:klog",
+ "@org_golang_google_protobuf//proto",
+ ],
+)
+
+go_test(
+ name = "scruffy_test",
+ srcs = [
+ "bmdb_stats_test.go",
+ "hw_stats_test.go",
+ ],
+ data = [
+ "@cockroach",
+ ],
+ embed = [":scruffy"],
+ deps = [
+ "//cloud/agent/api",
+ "//cloud/bmaas/bmdb",
+ "//cloud/bmaas/bmdb/model",
+ "//cloud/bmaas/server/api",
+ "//cloud/lib/component",
+ "@com_github_prometheus_client_golang//prometheus",
+ "@org_golang_google_protobuf//proto",
+ ],
+)
diff --git a/cloud/bmaas/scruffy/bmdb_stats.go b/cloud/bmaas/scruffy/bmdb_stats.go
new file mode 100644
index 0000000..68de363
--- /dev/null
+++ b/cloud/bmaas/scruffy/bmdb_stats.go
@@ -0,0 +1,202 @@
+package scruffy
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "time"
+
+ "github.com/prometheus/client_golang/prometheus"
+ "k8s.io/klog/v2"
+
+ "source.monogon.dev/cloud/bmaas/bmdb/model"
+)
+
+// bmdbStatsRunner collects metrics from the BMDB and exposes them as Prometheus
+// metrics via a registry passed to newBMDBStatsRunner.
+type bmdbStatsRunner struct {
+ s *Server
+ collectors []*statsCollector
+}
+
+// A statsCollectorDefinition describes how to gather a given metric via a BMDB
+// SQL query.
+type statsCollectorDefinition struct {
+ // name of the metric. Used in the actual metric name, prefixed with 'bmdb_stats_'.
+ name string
+ // help string emitted in prometheus endpoint.
+ help string
+ // labels is the label 'type definition', containing information about the
+ // dimensions of this metric.
+ labels labelDefinitions
+ // query used to retrieve the metric data.
+ query func(*model.Queries, context.Context) ([]model.MetricValue, error)
+}
+
+// labelProcess is the definition of the 'process' label, a fixed-cardinality
+// representation of the database Process enum.
+var labelProcess = labelDefinition{
+ name: "process",
+ initialValues: []string{
+ string(model.ProcessShepherdAccess),
+ string(model.ProcessShepherdAgentStart),
+ string(model.ProcessShepherdRecovery),
+ },
+}
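+
+// For example, a metric labeled with labelProcess is pre-initialized with one
+// zero-valued series per enum value, as also asserted by the tests:
+//
+//  bmdb_stats_active_backoffs{process="ShepherdAccess"} 0
+//  bmdb_stats_active_backoffs{process="ShepherdAgentStart"} 0
+//  bmdb_stats_active_backoffs{process="ShepherdRecovery"} 0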
+
+var collectorDefs = []statsCollectorDefinition{
+ {
+ name: "active_backoffs",
+ help: "Number of active backoffs, partitioned by process. There may be more than one active backoff per machine.",
+ query: model.WrapLabeledMetric((*model.Queries).CountActiveBackoffs),
+ labels: []labelDefinition{labelProcess},
+ },
+ {
+ name: "active_work",
+ help: "Number of active work, partitioned by process. There may be more than one active work item per machine.",
+ query: model.WrapLabeledMetric((*model.Queries).CountActiveWork),
+ labels: []labelDefinition{labelProcess},
+ },
+ {
+ name: "machines",
+ help: "Number of machines in the BMDB.",
+ query: model.WrapSimpleMetric((*model.Queries).CountMachines),
+ },
+ {
+ name: "machines_provided",
+ help: "Number of provided machines in the BMDB.",
+ query: model.WrapSimpleMetric((*model.Queries).CountMachinesProvided),
+ },
+ {
+ name: "machines_heartbeating",
+ help: "Number of machines with a currently heartbeating agent.",
+ query: model.WrapSimpleMetric((*model.Queries).CountMachinesAgentHeartbeating),
+ },
+ {
+ name: "machines_pending_installation",
+ help: "Number of machines pending installation.",
+ query: model.WrapSimpleMetric((*model.Queries).CountMachinesInstallationPending),
+ },
+ {
+ name: "machines_installed",
+ help: "Number of machines succesfully installed.",
+ query: model.WrapSimpleMetric((*model.Queries).CountMachinesInstallationComplete),
+ },
+ {
+ name: "machines_pending_agent_start",
+ help: "Number of machines pending the agent start workflow.",
+ query: model.WrapSimpleMetric((*model.Queries).CountMachinesForAgentStart),
+ },
+ {
+ name: "machines_pending_agent_recovery",
+ help: "Number of machines pending the agent recovery workflow.",
+ query: model.WrapSimpleMetric((*model.Queries).CountMachinesForAgentRecovery),
+ },
+}
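+
+// Registered and scraped, these definitions render in the Prometheus
+// exposition format roughly as follows (a sketch for an empty database):
+//
+//  # HELP bmdb_stats_machines Number of machines in the BMDB.
+//  # TYPE bmdb_stats_machines gauge
+//  bmdb_stats_machines 0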
+
+// A statsCollector is an instantiated statsCollectorDefinition which carries the
+// actual prometheus gauge backing the metric.
+type statsCollector struct {
+ gauge *prometheus.GaugeVec
+ def *statsCollectorDefinition
+}
+
+// setDefaults emits gauges with zero values for all metrics of the runner,
+// using the initial label sets generated from each metric definition.
+func (b *bmdbStatsRunner) setDefaults() {
+ for _, collector := range b.collectors {
+ info := collector.def
+ initial := info.labels.initialLabels()
+ if len(initial) == 0 {
+ collector.gauge.With(nil).Set(0.0)
+ } else {
+ for _, labels := range initial {
+ collector.gauge.With(labels).Set(0.0)
+ }
+ }
+ }
+}
+
+// newBMDBStatsRunner builds a bmdbStatsRunner from the collectorDefs above. The
+// bmdbStatsRunner then has the given Server's BMDB connection bound to it and
+// can perform actual database statistic gathering.
+func newBMDBStatsRunner(s *Server, reg *prometheus.Registry) *bmdbStatsRunner {
+ var collectors []*statsCollector
+
+ for _, info := range collectorDefs {
+ info := info
+ gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+ Name: "bmdb_stats_" + info.name,
+ Help: info.help,
+ }, info.labels.names())
+ reg.MustRegister(gauge)
+
+ collectors = append(collectors, &statsCollector{
+ gauge: gauge,
+ def: &info,
+ })
+ }
+
+ res := &bmdbStatsRunner{
+ s: s,
+ collectors: collectors,
+ }
+ res.setDefaults()
+ return res
+}
+
+func (b *bmdbStatsRunner) run(ctx context.Context) {
+ klog.Infof("Starting stats runner...")
+
+ ti := time.NewTicker(b.s.Config.StatsRunnerRate)
+
+ for {
+ err := b.runOnce(ctx)
+ if err != nil {
+ if errors.Is(err, ctx.Err()) {
+ return
+ }
+ klog.Errorf("Stats run failed: %v", err)
+ }
+ select {
+ case <-ti.C:
+ case <-ctx.Done():
+ klog.Infof("Exiting stats runner (%v)...", ctx.Err())
+ return
+ }
+ }
+}
+
+func (b *bmdbStatsRunner) runOnce(ctx context.Context) error {
+ sess, err := b.s.session(ctx)
+ if err != nil {
+ return err
+ }
+
+ results := make(map[string][]model.MetricValue)
+ // TODO(q3k): don't fail entire run if we can't collect just one metric.
+ err = sess.Transact(ctx, func(q *model.Queries) error {
+ for _, c := range b.collectors {
+ res, err := c.def.query(q, ctx)
+ if err != nil {
+ return fmt.Errorf("collecting %s failed: %v", c.def.name, err)
+ }
+ results[c.def.name] = res
+ }
+ return nil
+ })
+ if err != nil {
+ return err
+ }
+
+ b.setDefaults()
+ for _, c := range b.collectors {
+ for _, m := range results[c.def.name] {
+ klog.Infof("Setting %s (%v) to %d", c.def.name, m.Labels, m.Count)
+ c.gauge.With(m.Labels).Set(float64(m.Count))
+ }
+ }
+
+ return nil
+}
diff --git a/cloud/bmaas/scruffy/bmdb_stats_test.go b/cloud/bmaas/scruffy/bmdb_stats_test.go
new file mode 100644
index 0000000..89d5b5e
--- /dev/null
+++ b/cloud/bmaas/scruffy/bmdb_stats_test.go
@@ -0,0 +1,169 @@
+package scruffy
+
+import (
+ "context"
+ "fmt"
+ "sort"
+ "strings"
+ "testing"
+
+ "github.com/prometheus/client_golang/prometheus"
+
+ "source.monogon.dev/cloud/bmaas/bmdb"
+ "source.monogon.dev/cloud/bmaas/bmdb/model"
+ "source.monogon.dev/cloud/lib/component"
+)
+
+func TestBMDBStats(t *testing.T) {
+ s := Server{
+ Config: Config{
+ BMDB: bmdb.BMDB{
+ Config: bmdb.Config{
+ Database: component.CockroachConfig{
+ InMemory: true,
+ },
+ },
+ },
+ },
+ }
+
+ registry := prometheus.NewRegistry()
+ runner := newBMDBStatsRunner(&s, registry)
+
+ ctx, ctxC := context.WithCancel(context.Background())
+ defer ctxC()
+
+ expect := func(wantValues map[string]int64) {
+ t.Helper()
+ res, err := registry.Gather()
+ if err != nil {
+ t.Fatalf("Gather: %v", err)
+ }
+ gotValues := make(map[string]bool)
+ for _, mf := range res {
+ if len(mf.Metric) != 1 {
+ for _, m := range mf.Metric {
+ var lvs []string
+ for _, lp := range m.Label {
+ lvs = append(lvs, fmt.Sprintf("%s=%s", *lp.Name, *lp.Value))
+ }
+ sort.Strings(lvs)
+ name := fmt.Sprintf("%s[%s]", *mf.Name, strings.Join(lvs, ","))
+ gotValues[name] = true
+ if _, ok := wantValues[name]; !ok {
+ t.Errorf("MetricFamily %s: unexpected", name)
+ }
+ if want, got := wantValues[name], int64(*m.Gauge.Value); want != got {
+ t.Errorf("MetricFamily %s: wanted %d, got %d", *mf.Name, want, got)
+ }
+ }
+ } else {
+ m := mf.Metric[0]
+ gotValues[*mf.Name] = true
+ if want, got := wantValues[*mf.Name], int64(*m.Gauge.Value); want != got {
+ t.Errorf("MetricFamily %s: wanted %d, got %d", *mf.Name, want, got)
+ }
+ if _, ok := wantValues[*mf.Name]; !ok {
+ t.Errorf("MetricFamily %s: unexpected", *mf.Name)
+ }
+ }
+ }
+ for mf := range wantValues {
+ if !gotValues[mf] {
+ t.Errorf("MetricFamily %s: missing", mf)
+ }
+ }
+ }
+
+ expect(map[string]int64{
+ "bmdb_stats_machines": 0,
+ "bmdb_stats_machines_provided": 0,
+ "bmdb_stats_machines_heartbeating": 0,
+ "bmdb_stats_machines_pending_installation": 0,
+ "bmdb_stats_machines_installed": 0,
+ "bmdb_stats_machines_pending_agent_start": 0,
+ "bmdb_stats_machines_pending_agent_recovery": 0,
+ "bmdb_stats_active_backoffs[process=ShepherdAccess]": 0,
+ "bmdb_stats_active_backoffs[process=ShepherdAgentStart]": 0,
+ "bmdb_stats_active_backoffs[process=ShepherdRecovery]": 0,
+ "bmdb_stats_active_work[process=ShepherdAccess]": 0,
+ "bmdb_stats_active_work[process=ShepherdAgentStart]": 0,
+ "bmdb_stats_active_work[process=ShepherdRecovery]": 0,
+ })
+
+ conn, err := s.Config.BMDB.Open(true)
+ if err != nil {
+ t.Fatalf("Open: %v", err)
+ }
+ sess, err := conn.StartSession(ctx)
+ if err != nil {
+ t.Fatalf("StartSession: %v", err)
+ }
+
+ s.bmdb = conn
+ s.sessionC = make(chan *bmdb.Session)
+ go s.sessionWorker(ctx)
+ if err := runner.runOnce(ctx); err != nil {
+ t.Fatal(err)
+ }
+
+ expect(map[string]int64{
+ "bmdb_stats_machines": 0,
+ "bmdb_stats_machines_provided": 0,
+ "bmdb_stats_machines_heartbeating": 0,
+ "bmdb_stats_machines_pending_installation": 0,
+ "bmdb_stats_machines_installed": 0,
+ "bmdb_stats_machines_pending_agent_start": 0,
+ "bmdb_stats_machines_pending_agent_recovery": 0,
+ "bmdb_stats_active_backoffs[process=ShepherdAccess]": 0,
+ "bmdb_stats_active_backoffs[process=ShepherdAgentStart]": 0,
+ "bmdb_stats_active_backoffs[process=ShepherdRecovery]": 0,
+ "bmdb_stats_active_work[process=ShepherdAccess]": 0,
+ "bmdb_stats_active_work[process=ShepherdAgentStart]": 0,
+ "bmdb_stats_active_work[process=ShepherdRecovery]": 0,
+ })
+
+ f := fill().
+ // Provided, needs installation.
+ machine().providedE("1").build().
+ // Three machines needing recovery.
+ machine().providedE("2").agentNeverHeartbeat().build().
+ machine().providedE("3").agentNeverHeartbeat().build().
+ machine().providedE("4").agentNeverHeartbeat().build().
+ // One machine correctly heartbeating.
+ machine().providedE("5").agentHealthy().build().
+ // Two machines heartbeating and pending installation.
+ machine().providedE("6").agentHealthy().installRequested(10).build().
+ machine().providedE("7").agentHealthy().installRequested(10).installReported(9).build().
+ // Machine which is pending installation _and_ recovery.
+ machine().providedE("8").agentNeverHeartbeat().installRequested(10).build().
+ // Machine which has been successfully installed.
+ machine().providedE("9").agentStoppedHeartbeating().installRequested(10).installReported(10).build()
+
+ err = sess.Transact(ctx, func(q *model.Queries) error {
+ return f(ctx, q)
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if err := runner.runOnce(ctx); err != nil {
+ t.Fatal(err)
+ }
+
+ expect(map[string]int64{
+ "bmdb_stats_machines": 9,
+ "bmdb_stats_machines_provided": 9,
+ "bmdb_stats_machines_heartbeating": 3,
+ "bmdb_stats_machines_pending_installation": 3,
+ "bmdb_stats_machines_installed": 1,
+ "bmdb_stats_machines_pending_agent_start": 1,
+ "bmdb_stats_machines_pending_agent_recovery": 4,
+ "bmdb_stats_active_backoffs[process=ShepherdAccess]": 0,
+ "bmdb_stats_active_backoffs[process=ShepherdAgentStart]": 0,
+ "bmdb_stats_active_backoffs[process=ShepherdRecovery]": 0,
+ "bmdb_stats_active_work[process=ShepherdAccess]": 0,
+ "bmdb_stats_active_work[process=ShepherdAgentStart]": 0,
+ "bmdb_stats_active_work[process=ShepherdRecovery]": 0,
+ })
+}
diff --git a/cloud/bmaas/scruffy/cmd/BUILD.bazel b/cloud/bmaas/scruffy/cmd/BUILD.bazel
new file mode 100644
index 0000000..5e050bf
--- /dev/null
+++ b/cloud/bmaas/scruffy/cmd/BUILD.bazel
@@ -0,0 +1,18 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library")
+
+go_library(
+ name = "cmd_lib",
+ srcs = ["main.go"],
+ importpath = "source.monogon.dev/cloud/bmaas/scruffy/cmd",
+ visibility = ["//visibility:private"],
+ deps = [
+ "//cloud/bmaas/scruffy",
+ "//metropolis/cli/pkg/context",
+ ],
+)
+
+go_binary(
+ name = "cmd",
+ embed = [":cmd_lib"],
+ visibility = ["//visibility:public"],
+)
diff --git a/cloud/bmaas/scruffy/cmd/main.go b/cloud/bmaas/scruffy/cmd/main.go
new file mode 100644
index 0000000..e838834
--- /dev/null
+++ b/cloud/bmaas/scruffy/cmd/main.go
@@ -0,0 +1,19 @@
+package main
+
+import (
+ "context"
+ "flag"
+
+ "source.monogon.dev/cloud/bmaas/scruffy"
+ clicontext "source.monogon.dev/metropolis/cli/pkg/context"
+)
+
+func main() {
+ s := &scruffy.Server{}
+ s.Config.RegisterFlags()
+ flag.Parse()
+
+ ctx := clicontext.WithInterrupt(context.Background())
+ s.Start(ctx)
+ <-ctx.Done()
+}
diff --git a/cloud/bmaas/scruffy/hw_stats.go b/cloud/bmaas/scruffy/hw_stats.go
new file mode 100644
index 0000000..22dd247
--- /dev/null
+++ b/cloud/bmaas/scruffy/hw_stats.go
@@ -0,0 +1,156 @@
+package scruffy
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "time"
+
+ "github.com/google/uuid"
+ "github.com/prometheus/client_golang/prometheus"
+ "google.golang.org/protobuf/proto"
+ "k8s.io/klog/v2"
+
+ "source.monogon.dev/cloud/bmaas/bmdb/model"
+ "source.monogon.dev/cloud/bmaas/server/api"
+)
+
+// hwStatsRunner collects metrics from the machine hardware inventory in BMDB and
+// exposes them as Prometheus metrics via a registry passed to newHWStatsRunner.
+type hwStatsRunner struct {
+ s *Server
+
+ memoryPerRegion *prometheus.GaugeVec
+ cpuThreadsPerRegion *prometheus.GaugeVec
+}
+
+// newHWStatsRunner builds a hwStatsRunner. The hwStatsRunner then has the
+// given Server's BMDB connection bound to it and can perform actual database
+// statistic gathering.
+func newHWStatsRunner(s *Server, reg *prometheus.Registry) *hwStatsRunner {
+ hwsr := &hwStatsRunner{
+ s: s,
+
+ memoryPerRegion: prometheus.NewGaugeVec(prometheus.GaugeOpts{
+ Name: "bmdb_hwstats_region_ram_bytes",
+ }, []string{"provider", "location"}),
+
+ cpuThreadsPerRegion: prometheus.NewGaugeVec(prometheus.GaugeOpts{
+ Name: "bmdb_hwstats_region_cpu_threads",
+ }, []string{"provider", "location"}),
+ }
+ reg.MustRegister(hwsr.memoryPerRegion, hwsr.cpuThreadsPerRegion)
+ return hwsr
+}
+
+func (h *hwStatsRunner) run(ctx context.Context) {
+ klog.Infof("Starting stats runner...")
+
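+ // Note: this loop runs at a fixed one-minute interval, independent of the
+ // -scruffy_stats_collection_rate flag, which only drives the BMDB stats
+ // runner.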
+ ti := time.NewTicker(time.Minute)
+
+ for {
+ err := h.runOnce(ctx)
+ if err != nil {
+ if errors.Is(err, ctx.Err()) {
+ return
+ }
+ klog.Errorf("Stats run failed: %v", err)
+ }
+ select {
+ case <-ti.C:
+ case <-ctx.Done():
+ klog.Infof("Exiting stats runner (%v)...", ctx.Err())
+ return
+ }
+ }
+}
+
+// statsPerRegion holds statistics gathered and aggregated (summed) per region.
+type statsPerRegion struct {
+ ramBytes uint64
+ numThreads uint64
+}
+
+// add a given AgentHardwareReport to this region's data.
+func (s *statsPerRegion) add(hwrep *api.AgentHardwareReport) {
+ s.ramBytes += uint64(hwrep.Report.MemoryInstalledBytes)
+ for _, cpu := range hwrep.Report.Cpu {
+ s.numThreads += uint64(cpu.HardwareThreads)
+ }
+}
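+
+// For example, three machines in one region each reporting 32 hardware threads
+// and 256 GiB of RAM aggregate to 96 threads and 768 GiB for that region, as
+// exercised by the tests.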
+
+// regionKey uniquely identifies a region within a provider.
+type regionKey struct {
+ provider model.Provider
+ location string
+}
+
+func (r *regionKey) String() string {
+ return fmt.Sprintf("%s/%s", r.provider, r.location)
+}
+
+func (h *hwStatsRunner) runOnce(ctx context.Context) error {
+ sess, err := h.s.session(ctx)
+ if err != nil {
+ return err
+ }
+
+ var start uuid.UUID
+
+ perRegion := make(map[regionKey]*statsPerRegion)
+ var total statsPerRegion
+
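+ // Page through all machines in chunks of 100, using the last seen machine
+ // ID as a keyset pagination cursor.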
+ for {
+ var res []model.ListMachineHardwareRow
+ err = sess.Transact(ctx, func(q *model.Queries) error {
+ res, err = q.ListMachineHardware(ctx, model.ListMachineHardwareParams{
+ Limit: 100,
+ MachineID: start,
+ })
+ return err
+ })
+ if err != nil {
+ return err
+ }
+ klog.Infof("Machines: %d chunk", len(res))
+ if len(res) == 0 {
+ break
+ }
+ for _, row := range res {
+ var hwrep api.AgentHardwareReport
+ err = proto.Unmarshal(row.HardwareReportRaw.([]byte), &hwrep)
+ if err != nil {
+ klog.Warningf("Could not decode hardware report from %s: %v", row.MachineID, err)
+ continue
+ }
+
+ if !row.ProviderLocation.Valid {
+ klog.Warningf("%s has no provider location, skipping", row.MachineID)
+ continue
+ }
+
+ key := regionKey{
+ provider: row.Provider,
+ location: row.ProviderLocation.String,
+ }
+ if _, ok := perRegion[key]; !ok {
+ perRegion[key] = &statsPerRegion{}
+ }
+ perRegion[key].add(&hwrep)
+ total.add(&hwrep)
+
+ start = row.MachineID
+ }
+ }
+
+ for k, st := range perRegion {
+ labels := prometheus.Labels{
+ "provider": string(k.provider),
+ "location": k.location,
+ }
+
+ h.memoryPerRegion.With(labels).Set(float64(st.ramBytes))
+ h.cpuThreadsPerRegion.With(labels).Set(float64(st.numThreads))
+ }
+ return nil
+}
diff --git a/cloud/bmaas/scruffy/hw_stats_test.go b/cloud/bmaas/scruffy/hw_stats_test.go
new file mode 100644
index 0000000..ffb572c
--- /dev/null
+++ b/cloud/bmaas/scruffy/hw_stats_test.go
@@ -0,0 +1,319 @@
+package scruffy
+
+import (
+ "context"
+ "database/sql"
+ "testing"
+ "time"
+
+ "github.com/prometheus/client_golang/prometheus"
+ "google.golang.org/protobuf/proto"
+
+ aapi "source.monogon.dev/cloud/agent/api"
+ "source.monogon.dev/cloud/bmaas/bmdb"
+ "source.monogon.dev/cloud/bmaas/bmdb/model"
+ "source.monogon.dev/cloud/bmaas/server/api"
+ "source.monogon.dev/cloud/lib/component"
+)
+
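+// filler is a chainable function that populates the BMDB with test fixtures.
+// Start a chain with fill(), add machines through the fillerMachine builder,
+// then run the resulting function inside a BMDB transaction.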
+type filler func(ctx context.Context, q *model.Queries) error
+
+func fill() filler {
+ return func(ctx context.Context, q *model.Queries) error {
+ return nil
+ }
+}
+
+func (f filler) chain(n func(ctx context.Context, q *model.Queries) error) filler {
+ return func(ctx context.Context, q *model.Queries) error {
+ if err := f(ctx, q); err != nil {
+ return err
+ }
+ return n(ctx, q)
+ }
+}
+
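+// fillerMachine is a builder for a single test machine. Each setter records an
+// aspect of the machine to be created, and build() appends the corresponding
+// insertion logic to the underlying filler chain.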
+type fillerMachine struct {
+ f filler
+
+ provider *model.Provider
+ providerID *string
+
+ location *string
+
+ threads *int32
+ ramgb *int64
+
+ agentStartedAt *time.Time
+
+ agentHeartbeatAt *time.Time
+
+ installationRequestGeneration *int64
+
+ installationReportGeneration *int64
+}
+
+func (f filler) machine() *fillerMachine {
+ return &fillerMachine{
+ f: f,
+ }
+}
+
+func (m *fillerMachine) provided(p model.Provider, pid string) *fillerMachine {
+ m.provider = &p
+ m.providerID = &pid
+ return m
+}
+
+func (m *fillerMachine) providedE(pid string) *fillerMachine {
+ return m.provided(model.ProviderEquinix, pid)
+}
+
+func (m *fillerMachine) located(location string) *fillerMachine {
+ m.location = &location
+ return m
+}
+
+func (m *fillerMachine) hardware(threads int32, ramgb int64) *fillerMachine {
+ m.threads = &threads
+ m.ramgb = &ramgb
+ return m
+}
+
+func (m *fillerMachine) agentStarted(t time.Time) *fillerMachine {
+ m.agentStartedAt = &t
+ return m
+}
+
+func (m *fillerMachine) agentHeartbeat(t time.Time) *fillerMachine {
+ m.agentHeartbeatAt = &t
+ return m
+}
+
+func (m *fillerMachine) agentHealthy() *fillerMachine {
+ now := time.Now()
+ return m.agentStarted(now.Add(-30 * time.Minute)).agentHeartbeat(now.Add(-1 * time.Minute))
+}
+
+func (m *fillerMachine) agentStoppedHeartbeating() *fillerMachine {
+ now := time.Now()
+ return m.agentStarted(now.Add(-30 * time.Minute)).agentHeartbeat(now.Add(-20 * time.Minute))
+}
+
+func (m *fillerMachine) agentNeverHeartbeat() *fillerMachine {
+ now := time.Now()
+ return m.agentStarted(now.Add(-30 * time.Minute))
+}
+
+func (m *fillerMachine) installRequested(gen int64) *fillerMachine {
+ m.installationRequestGeneration = &gen
+ return m
+}
+
+func (m *fillerMachine) installReported(gen int64) *fillerMachine {
+ m.installationReportGeneration = &gen
+ return m
+}
+
+func (m *fillerMachine) build() filler {
+ return m.f.chain(func(ctx context.Context, q *model.Queries) error {
+ mach, err := q.NewMachine(ctx)
+ if err != nil {
+ return err
+ }
+ if m.providerID != nil {
+ err = q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
+ MachineID: mach.MachineID,
+ Provider: *m.provider,
+ ProviderID: *m.providerID,
+ })
+ if err != nil {
+ return err
+ }
+ if m.location != nil {
+ err = q.MachineUpdateProviderStatus(ctx, model.MachineUpdateProviderStatusParams{
+ ProviderID: *m.providerID,
+ Provider: *m.provider,
+ ProviderLocation: sql.NullString{Valid: true, String: *m.location},
+ })
+ if err != nil {
+ return err
+ }
+ }
+ }
+ if m.threads != nil {
+ report := api.AgentHardwareReport{
+ Report: &aapi.Node{
+ MemoryInstalledBytes: *m.ramgb << 30,
+ MemoryUsableRatio: 1.0,
+ Cpu: []*aapi.CPU{
+ {
+ HardwareThreads: *m.threads,
+ Cores: *m.threads,
+ },
+ },
+ },
+ Warning: nil,
+ }
+ raw, err := proto.Marshal(&report)
+ if err != nil {
+ return err
+ }
+ err = q.MachineSetHardwareReport(ctx, model.MachineSetHardwareReportParams{
+ MachineID: mach.MachineID,
+ HardwareReportRaw: raw,
+ })
+ if err != nil {
+ return err
+ }
+ }
+ if m.agentStartedAt != nil {
+ err = q.MachineSetAgentStarted(ctx, model.MachineSetAgentStartedParams{
+ MachineID: mach.MachineID,
+ AgentStartedAt: *m.agentStartedAt,
+ AgentPublicKey: []byte("fakefakefake"),
+ })
+ if err != nil {
+ return err
+ }
+ }
+ if m.agentHeartbeatAt != nil {
+ err = q.MachineSetAgentHeartbeat(ctx, model.MachineSetAgentHeartbeatParams{
+ MachineID: mach.MachineID,
+ AgentHeartbeatAt: *m.agentHeartbeatAt,
+ })
+ if err != nil {
+ return err
+ }
+ }
+ if m.installationRequestGeneration != nil {
+ err = q.MachineSetOSInstallationRequest(ctx, model.MachineSetOSInstallationRequestParams{
+ MachineID: mach.MachineID,
+ Generation: *m.installationRequestGeneration,
+ })
+ if err != nil {
+ return err
+ }
+ }
+ if m.installationReportGeneration != nil {
+ err = q.MachineSetOSInstallationReport(ctx, model.MachineSetOSInstallationReportParams{
+ MachineID: mach.MachineID,
+ Generation: *m.installationReportGeneration,
+ })
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+ })
+}
+
+func TestHWStats(t *testing.T) {
+ s := Server{
+ Config: Config{
+ BMDB: bmdb.BMDB{
+ Config: bmdb.Config{
+ Database: component.CockroachConfig{
+ InMemory: true,
+ },
+ },
+ },
+ },
+ }
+
+ registry := prometheus.NewRegistry()
+ runner := newHWStatsRunner(&s, registry)
+
+ ctx, ctxC := context.WithCancel(context.Background())
+ defer ctxC()
+
+ res, err := registry.Gather()
+ if err != nil {
+ t.Fatalf("Gather: %v", err)
+ }
+ if want, got := 0, len(res); want != got {
+ t.Fatalf("Expected no metrics with empty database, got %d", got)
+ }
+
+ conn, err := s.Config.BMDB.Open(true)
+ if err != nil {
+ t.Fatalf("Open: %v", err)
+ }
+ sess, err := conn.StartSession(ctx)
+ if err != nil {
+ t.Fatalf("StartSession: %v", err)
+ }
+ // Populate database with some test data.
+ err = sess.Transact(ctx, func(q *model.Queries) error {
+ f := fill().
+ machine().provided(model.ProviderEquinix, "1").hardware(32, 256).located("dark-bramble").build().
+ machine().provided(model.ProviderEquinix, "2").hardware(32, 256).located("dark-bramble").build().
+ machine().provided(model.ProviderEquinix, "3").hardware(32, 256).located("dark-bramble").build().
+ machine().provided(model.ProviderEquinix, "4").hardware(32, 256).located("brittle-hollow").build().
+ machine().provided(model.ProviderEquinix, "5").hardware(32, 256).located("timber-hearth").build().
+ machine().provided(model.ProviderEquinix, "6").hardware(32, 256).located("timber-hearth").build()
+ return f(ctx, q)
+ })
+ if err != nil {
+ t.Fatalf("Transact: %v", err)
+ }
+
+ s.bmdb = conn
+ s.sessionC = make(chan *bmdb.Session)
+ go s.sessionWorker(ctx)
+
+ // Do a statistics run and check results.
+ if err := runner.runOnce(ctx); err != nil {
+ t.Fatalf("runOnce: %v", err)
+ }
+
+ mfs, err := registry.Gather()
+ if err != nil {
+ t.Fatalf("Gatcher: %v", err)
+ }
+
+ // metric name -> provider -> location -> value
+ values := make(map[string]map[string]map[string]float64)
+ for _, mf := range mfs {
+ values[*mf.Name] = make(map[string]map[string]float64)
+ for _, m := range mf.Metric {
+ var provider, location string
+ for _, pair := range m.Label {
+ switch *pair.Name {
+ case "location":
+ location = *pair.Value
+ case "provider":
+ provider = *pair.Value
+ }
+ }
+ if _, ok := values[*mf.Name][provider]; !ok {
+ values[*mf.Name][provider] = make(map[string]float64)
+ }
+ switch {
+ case m.Gauge != nil && m.Gauge.Value != nil:
+ values[*mf.Name][provider][location] = *m.Gauge.Value
+ }
+ }
+ }
+
+ for _, te := range []struct {
+ provider model.Provider
+ location string
+ threads int32
+ ramgb int64
+ }{
+ {model.ProviderEquinix, "dark-bramble", 96, 768},
+ {model.ProviderEquinix, "brittle-hollow", 32, 256},
+ {model.ProviderEquinix, "timber-hearth", 64, 512},
+ } {
+ threads := values["bmdb_hwstats_region_cpu_threads"][string(te.provider)][te.location]
+ bytes := values["bmdb_hwstats_region_ram_bytes"][string(te.provider)][te.location]
+
+ if want, got := te.threads, int32(threads); want != got {
+ t.Errorf("Wanted %d threads in %s/%s, got %d", want, te.provider, te.location, got)
+ }
+ if want, got := te.ramgb, int64(bytes)>>30; want != got {
+ t.Errorf("Wanted %d GB RAM in %s/%s, got %d", want, te.provider, te.location, got)
+ }
+ }
+}
diff --git a/cloud/bmaas/scruffy/labels.go b/cloud/bmaas/scruffy/labels.go
new file mode 100644
index 0000000..b88f799
--- /dev/null
+++ b/cloud/bmaas/scruffy/labels.go
@@ -0,0 +1,94 @@
+package scruffy
+
+import (
+ "github.com/prometheus/client_golang/prometheus"
+
+ "source.monogon.dev/go/algorithm/cartesian"
+)
+
+// A labelDefinition describes a key/value pair that's a metric dimension. It
+// consists of the label key/name (a string), and a list of possible values of
+// this key. The list of values will be used to initialize the metrics at startup
+// with zero values.
+//
+// The initialValues system is intended to be used with labels that are
+// low-cardinality enums, e.g. the name of a subsystem.
+//
+// All labelDefinitions for a single metric will then create a cartesian product
+// of all initialValues.
+type labelDefinition struct {
+ // name/key of the label.
+ name string
+ // initialValues defines the default values for this label key/name that will be
+ // used to generate a list of initial zero-filled metrics.
+ initialValues []string
+}
+
+// labelDefinitions is a list of labelDefinition which together define the
+// label dimensions of a metric. The initialValues of the respective
+// labelDefinitions form a cartesian product of default zero-filled metric
+// values when the metric subsystem gets initialized. These zero values then
+// get overridden by real data as it is collected.
+type labelDefinitions []labelDefinition
+
+// initialLabels generates the list of initial labels key/values that should be
+// used to generate zero-filled metrics on startup. This is a cartesian product
+// of all initialValues of all labelDefinitions.
+func (l labelDefinitions) initialLabels() []prometheus.Labels {
+ // Nothing to do if this is an empty labelDefinitions.
+ if len(l) == 0 {
+ return nil
+ }
+
+ // Given:
+ //
+ // labelDefinitions := []labelDefinition{
+ // { name: "a", initialValues: []string{"foo", "bar"}},
+ // { name: "b", initialValues: []string{"baz", "barf"}},
+ // }
+ //
+ // This creates:
+ //
+ // values := [][]string{
+ // { "foo", "bar" }, // label 'a'
+ // { "baz", "barf" }, // label 'b'
+ // }
+ var values [][]string
+ for _, ld := range l {
+ values = append(values, ld.initialValues)
+ }
+
+ // Given the above:
+ //
+ // valuesProduct := [][]string{
+ // // a b
+ // { "foo", "baz" },
+ // { "foo", "barf" },
+ // { "bar", "baz" },
+ // { "bar", "barf" },
+ // }
+ valuesProduct := cartesian.Product[string](values...)
+
+ // This converts valuesProduct into an actual prometheus-compatible type,
+ // re-attaching the label names back into the columns as seen above.
+ var res []prometheus.Labels
+ for _, vp := range valuesProduct {
+ labels := make(prometheus.Labels)
+ for i, lv := range vp {
+ labelDef := l[i]
+ labels[labelDef.name] = lv
+ }
+ res = append(res, labels)
+ }
+ return res
+}
+
+// names returns the keys/names of all the metric labels from these
+// labelDefinitions.
+func (l labelDefinitions) names() []string {
+ var res []string
+ for _, ld := range l {
+ res = append(res, ld.name)
+ }
+ return res
+}
diff --git a/cloud/bmaas/scruffy/server.go b/cloud/bmaas/scruffy/server.go
new file mode 100644
index 0000000..58e0f51
--- /dev/null
+++ b/cloud/bmaas/scruffy/server.go
@@ -0,0 +1,127 @@
+// Package scruffy implements Scruffy, The Janitor.
+//
+// Scruffy is a BMaaS component which runs a bunch of important, housekeeping-ish
+// processes that aren't tied to any particular provider and are mostly
+// batch-oriented.
+//
+// Currently Scruffy just collects metrics from the BMDB.
+package scruffy
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "os"
+ "time"
+
+ "github.com/cenkalti/backoff/v4"
+ "k8s.io/klog/v2"
+
+ "source.monogon.dev/cloud/bmaas/bmdb"
+ "source.monogon.dev/cloud/bmaas/bmdb/metrics"
+ "source.monogon.dev/cloud/bmaas/bmdb/webug"
+ "source.monogon.dev/cloud/lib/component"
+)
+
+type Config struct {
+ Component component.ComponentConfig
+ BMDB bmdb.BMDB
+ Webug webug.Config
+
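+ // StatsRunnerRate is how often the BMDB statistics collection loop polls
+ // the database. Set via the -scruffy_stats_collection_rate flag.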
+ StatsRunnerRate time.Duration
+}
+
+// TODO(q3k): factor this out to BMDB library?
+func runtimeInfo() string {
+ hostname, _ := os.Hostname()
+ if hostname == "" {
+ hostname = "UNKNOWN"
+ }
+ return fmt.Sprintf("host %s", hostname)
+}
+
+func (c *Config) RegisterFlags() {
+ c.Component.RegisterFlags("scruffy")
+ c.BMDB.ComponentName = "scruffy"
+ c.BMDB.RuntimeInfo = runtimeInfo()
+ c.BMDB.Database.RegisterFlags("bmdb")
+ c.Webug.RegisterFlags()
+
+ flag.DurationVar(&c.StatsRunnerRate, "scruffy_stats_collection_rate", time.Minute, "How often the stats collection loop will run against BMDB")
+}
+
+type Server struct {
+ Config Config
+
+ bmdb *bmdb.Connection
+ sessionC chan *bmdb.Session
+}
+
+func (s *Server) Start(ctx context.Context) {
+ reg := s.Config.Component.PrometheusRegistry()
+ s.Config.BMDB.EnableMetrics(reg)
+ s.Config.Component.StartPrometheus(ctx)
+
+ conn, err := s.Config.BMDB.Open(true)
+ if err != nil {
+ klog.Exitf("Failed to connect to BMDB: %v", err)
+ }
+ s.bmdb = conn
+ s.sessionC = make(chan *bmdb.Session)
+ go s.sessionWorker(ctx)
+
+ bsr := newBMDBStatsRunner(s, reg)
+ go bsr.run(ctx)
+
+ hwr := newHWStatsRunner(s, reg)
+ go hwr.run(ctx)
+
+ go func() {
+ if err := s.Config.Webug.Start(ctx, conn); err != nil && err != ctx.Err() {
+ klog.Exitf("Failed to start webug: %v", err)
+ }
+ }()
+}
+
+// sessionWorker emits a valid BMDB session to sessionC for as long as ctx is
+// active, transparently starting a new session (with exponential backoff)
+// whenever the current one expires.
+//
+// TODO(q3k): factor out into bmdb client lib
+func (s *Server) sessionWorker(ctx context.Context) {
+ var session *bmdb.Session
+ for {
+ if session == nil || session.Expired() {
+ klog.Infof("Starting new session...")
+ bo := backoff.NewExponentialBackOff()
+ err := backoff.Retry(func() error {
+ var err error
+ session, err = s.bmdb.StartSession(ctx, bmdb.SessionOption{Processor: metrics.ProcessorScruffyStats})
+ if err != nil {
+ klog.Errorf("Failed to start session: %v", err)
+ return err
+ }
+ return nil
+ }, backoff.WithContext(bo, ctx))
+ if err != nil {
+ // If something's really wrong just crash.
+ klog.Exitf("Gave up on starting session: %v", err)
+ }
+ klog.Infof("New session: %s", session.UUID)
+ }
+
+ select {
+ case <-ctx.Done():
+ return
+ case s.sessionC <- session:
+ }
+ }
+}
+
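+// session returns a currently valid BMDB session from the session worker,
+// blocking until one is available or ctx gets canceled.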
+func (s *Server) session(ctx context.Context) (*bmdb.Session, error) {
+ select {
+ case sess := <-s.sessionC:
+ return sess, nil
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ }
+}