cloud/bmaas/bmdb/scruffy: initialize, implement BMDB metrics
This creates a new BMaaS component, Scruffy the Janitor.
Scruffy will run a bunch of housekeeping jobs that aren't tied to a
particular provider or even region. Currently Scruffy just collects BMDB
metrics by periodically polling the BMDB SQL database.
Change-Id: Icafa714811757eaaf31fed43184ded8512bde067
Reviewed-on: https://review.monogon.dev/c/monogon/+/1819
Tested-by: Jenkins CI
Reviewed-by: Tim Windelschmidt <tim@monogon.tech>
diff --git a/cloud/bmaas/scruffy/hw_stats_test.go b/cloud/bmaas/scruffy/hw_stats_test.go
new file mode 100644
index 0000000..ffb572c
--- /dev/null
+++ b/cloud/bmaas/scruffy/hw_stats_test.go
@@ -0,0 +1,319 @@
+package scruffy
+
+import (
+ "context"
+ "database/sql"
+ "testing"
+ "time"
+
+ "github.com/prometheus/client_golang/prometheus"
+ "google.golang.org/protobuf/proto"
+
+ aapi "source.monogon.dev/cloud/agent/api"
+ "source.monogon.dev/cloud/bmaas/bmdb"
+ "source.monogon.dev/cloud/bmaas/bmdb/model"
+ "source.monogon.dev/cloud/bmaas/server/api"
+ "source.monogon.dev/cloud/lib/component"
+)
+
+type filler func(ctx context.Context, q *model.Queries) error // filler is one database-populating step run against q; steps are composed with chain.
+
+// fill returns the empty filler: it ignores its arguments and always reports
+// success, which makes it the starting point onto which machines are chained.
+func fill() filler {
+	return func(context.Context, *model.Queries) error { return nil }
+}
+
+func (f filler) chain(n func(ctx context.Context, q *model.Queries) error) filler { // chain returns a filler that runs f and then n with the same ctx and q.
+ return func(ctx context.Context, q *model.Queries) error {
+ if err := f(ctx, q); err != nil {
+ return err // f failed: its error is returned as-is and n is never called.
+ }
+ return n(ctx, q) // the combined result is whatever n returns.
+ }
+}
+
+type fillerMachine struct { // fillerMachine collects the optional attributes of one test machine; a nil pointer means "not set".
+ f filler // the filler that build chains this machine's creation onto.
+ // provider and providerID are set together by provided; build only nil-checks providerID.
+ provider *model.Provider
+ providerID *string
+ // location is written by build as ProviderLocation, and only when providerID is also set.
+ location *string
+ // threads and ramgb are set together by hardware; build only nil-checks threads.
+ threads *int32
+ ramgb *int64
+ // agentStartedAt, when set, is passed by build to MachineSetAgentStarted.
+ agentStartedAt *time.Time
+ // agentHeartbeatAt, when set, is passed by build to MachineSetAgentHeartbeat.
+ agentHeartbeatAt *time.Time
+ // installationRequestGeneration, when set, is passed to MachineSetOSInstallationRequest.
+ installationRequestGeneration *int64
+ // installationReportGeneration, when set, is passed to MachineSetOSInstallationReport.
+ installationReportGeneration *int64
+}
+
+// machine starts the description of a new machine with no attributes set;
+// calling build on the returned builder chains its creation after f.
+func (f filler) machine() *fillerMachine {
+	return &fillerMachine{f: f}
+}
+
+// provided records provider p and provider-side ID pid for build to register, and returns m for chaining.
+func (m *fillerMachine) provided(p model.Provider, pid string) *fillerMachine {
+	m.provider, m.providerID = &p, &pid
+	return m
+}
+
+func (m *fillerMachine) providedE(pid string) *fillerMachine { // providedE is provided with the provider fixed to model.ProviderEquinix.
+ return m.provided(model.ProviderEquinix, pid)
+}
+
+func (m *fillerMachine) located(location string) *fillerMachine { // located sets the provider location that build should record.
+ m.location = &location // build only uses this when provided has also been called.
+ return m // returned so that builder calls can be chained.
+}
+
+// hardware sets the CPU thread count and the RAM size in GiB that build puts into the hardware report.
+func (m *fillerMachine) hardware(threads int32, ramgb int64) *fillerMachine {
+	m.threads, m.ramgb = &threads, &ramgb
+	return m
+}
+
+func (m *fillerMachine) agentStarted(t time.Time) *fillerMachine { // agentStarted sets the agent start time that build should record.
+ m.agentStartedAt = &t
+ return m // returned so that builder calls can be chained.
+}
+
+func (m *fillerMachine) agentHeartbeat(t time.Time) *fillerMachine { // agentHeartbeat sets the agent heartbeat time that build should record.
+ m.agentHeartbeatAt = &t
+ return m // returned so that builder calls can be chained.
+}
+
+func (m *fillerMachine) agentHealthy() *fillerMachine { // agentHealthy: agent started 30 minutes ago, last heartbeat 1 minute ago.
+ now := time.Now() // taken once so both offsets are relative to the same instant.
+ return m.agentStarted(now.Add(-30 * time.Minute)).agentHeartbeat(now.Add(-1 * time.Minute))
+}
+
+func (m *fillerMachine) agentStoppedHeartbeating() *fillerMachine { // agent started 30 minutes ago, last heartbeat 20 minutes ago.
+ now := time.Now() // taken once so both offsets are relative to the same instant.
+ return m.agentStarted(now.Add(-30 * time.Minute)).agentHeartbeat(now.Add(-20 * time.Minute))
+}
+
+// agentNeverHeartbeat marks the agent as started 30 minutes ago and leaves the heartbeat unset.
+func (m *fillerMachine) agentNeverHeartbeat() *fillerMachine {
+	return m.agentStarted(time.Now().Add(-30 * time.Minute))
+}
+
+func (m *fillerMachine) installRequested(gen int64) *fillerMachine { // installRequested sets the generation build passes to MachineSetOSInstallationRequest.
+ m.installationRequestGeneration = &gen
+ return m // returned so that builder calls can be chained.
+}
+
+func (m *fillerMachine) installReported(gen int64) *fillerMachine { // installReported sets the generation build passes to MachineSetOSInstallationReport.
+ m.installationReportGeneration = &gen
+ return m // returned so that builder calls can be chained.
+}
+
+func (m *fillerMachine) build() filler { // build returns m.f chained with a step that creates this machine and applies every attribute that was set.
+ return m.f.chain(func(ctx context.Context, q *model.Queries) error {
+ mach, err := q.NewMachine(ctx) // later calls are keyed on mach.MachineID or on the provider ID.
+ if err != nil {
+ return err
+ }
+ if m.providerID != nil { // provider is not checked separately: provided sets both pointers together.
+ err = q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
+ MachineID: mach.MachineID,
+ Provider: *m.provider,
+ ProviderID: *m.providerID,
+ })
+ if err != nil {
+ return err
+ }
+ if m.location != nil { // nested: a location is only recorded for machines that have a provider ID.
+ err = q.MachineUpdateProviderStatus(ctx, model.MachineUpdateProviderStatusParams{
+ ProviderID: *m.providerID,
+ Provider: *m.provider,
+ ProviderLocation: sql.NullString{Valid: true, String: *m.location},
+ })
+ if err != nil {
+ return err
+ }
+ }
+ }
+ if m.threads != nil { // ramgb is dereferenced below without its own check: hardware sets both pointers together.
+ report := api.AgentHardwareReport{
+ Report: &aapi.Node{
+ MemoryInstalledBytes: *m.ramgb << 30, // ramgb is in GiB; shifting by 30 converts it to bytes.
+ MemoryUsableRatio: 1.0,
+ Cpu: []*aapi.CPU{ // a single CPU entry; the thread count doubles as the core count.
+ {
+ HardwareThreads: *m.threads,
+ Cores: *m.threads,
+ },
+ },
+ },
+ Warning: nil,
+ }
+ raw, err := proto.Marshal(&report) // this err shadows the outer one for the rest of this if block.
+ if err != nil {
+ return err
+ }
+ err = q.MachineSetHardwareReport(ctx, model.MachineSetHardwareReportParams{
+ MachineID: mach.MachineID,
+ HardwareReportRaw: raw,
+ })
+ if err != nil {
+ return err
+ }
+ }
+ if m.agentStartedAt != nil {
+ err = q.MachineSetAgentStarted(ctx, model.MachineSetAgentStartedParams{
+ MachineID: mach.MachineID,
+ AgentStartedAt: *m.agentStartedAt,
+ AgentPublicKey: []byte("fakefakefake"), // fixed placeholder bytes; the builder has no way to set a key.
+ })
+ if err != nil {
+ return err
+ }
+ }
+ if m.agentHeartbeatAt != nil {
+ err = q.MachineSetAgentHeartbeat(ctx, model.MachineSetAgentHeartbeatParams{
+ MachineID: mach.MachineID,
+ AgentHeartbeatAt: *m.agentHeartbeatAt,
+ })
+ if err != nil {
+ return err
+ }
+ }
+ if m.installationRequestGeneration != nil {
+ err = q.MachineSetOSInstallationRequest(ctx, model.MachineSetOSInstallationRequestParams{
+ MachineID: mach.MachineID,
+ Generation: *m.installationRequestGeneration,
+ })
+ if err != nil {
+ return err
+ }
+ }
+ if m.installationReportGeneration != nil {
+ err = q.MachineSetOSInstallationReport(ctx, model.MachineSetOSInstallationReportParams{
+ MachineID: mach.MachineID,
+ Generation: *m.installationReportGeneration,
+ })
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+ })
+}
+
+// TestHWStats fills an in-memory BMDB with machines and checks the per-location hardware gauges.
+func TestHWStats(t *testing.T) {
+	s := Server{
+		Config: Config{
+			BMDB: bmdb.BMDB{
+				Config: bmdb.Config{
+					Database: component.CockroachConfig{
+						InMemory: true,
+					},
+				},
+			},
+		},
+	}
+
+	registry := prometheus.NewRegistry()
+	runner := newHWStatsRunner(&s, registry)
+
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+
+	res, err := registry.Gather() // runOnce has not been called yet, so nothing should be exported.
+	if err != nil {
+		t.Fatalf("Gather: %v", err)
+	}
+	if want, got := 0, len(res); want != got {
+		t.Fatalf("Expected no metrics with empty database, got %d", got)
+	}
+
+	conn, err := s.Config.BMDB.Open(true)
+	if err != nil {
+		t.Fatalf("Open: %v", err)
+	}
+	sess, err := conn.StartSession(ctx)
+	if err != nil {
+		t.Fatalf("StartSession: %v", err)
+	}
+	// Populate the database: six machines with 32 threads / 256 GiB each, spread over three locations.
+	err = sess.Transact(ctx, func(q *model.Queries) error {
+		f := fill().
+			machine().provided(model.ProviderEquinix, "1").hardware(32, 256).located("dark-bramble").build().
+			machine().provided(model.ProviderEquinix, "2").hardware(32, 256).located("dark-bramble").build().
+			machine().provided(model.ProviderEquinix, "3").hardware(32, 256).located("dark-bramble").build().
+			machine().provided(model.ProviderEquinix, "4").hardware(32, 256).located("brittle-hollow").build().
+			machine().provided(model.ProviderEquinix, "5").hardware(32, 256).located("timber-hearth").build().
+			machine().provided(model.ProviderEquinix, "6").hardware(32, 256).located("timber-hearth").build()
+		return f(ctx, q)
+	})
+	if err != nil {
+		t.Fatalf("Transact: %v", err)
+	}
+
+	s.bmdb = conn
+	s.sessionC = make(chan *bmdb.Session)
+	go s.sessionWorker(ctx) // NOTE(review): presumably exits once the deferred ctxC cancels ctx; confirm.
+
+	// Do a statistics run and check results.
+	if err := runner.runOnce(ctx); err != nil {
+		t.Fatalf("runOnce: %v", err)
+	}
+
+	mfs, err := registry.Gather()
+	if err != nil {
+		t.Fatalf("Gather: %v", err)
+	}
+
+	// metric name -> provider -> location -> value
+	values := make(map[string]map[string]map[string]float64)
+	for _, mf := range mfs {
+		values[*mf.Name] = make(map[string]map[string]float64)
+		for _, m := range mf.Metric {
+			var provider, location string
+			for _, pair := range m.Label {
+				switch *pair.Name {
+				case "location":
+					location = *pair.Value
+				case "provider":
+					provider = *pair.Value
+				}
+			}
+			if _, ok := values[*mf.Name][provider]; !ok {
+				values[*mf.Name][provider] = make(map[string]float64)
+			}
+			if m.Gauge != nil && m.Gauge.Value != nil { // only gauge values are collected; anything else is skipped.
+				values[*mf.Name][provider][location] = *m.Gauge.Value
+			}
+		}
+	}
+
+	for _, te := range []struct {
+		provider model.Provider
+		location string
+		threads  int32
+		ramgb    int64
+	}{
+		{model.ProviderEquinix, "dark-bramble", 96, 768},
+		{model.ProviderEquinix, "brittle-hollow", 32, 256},
+		{model.ProviderEquinix, "timber-hearth", 64, 512},
+	} {
+		threads := values["bmdb_hwstats_region_cpu_threads"][string(te.provider)][te.location] // a missing series reads as 0 and fails below.
+		bytes := values["bmdb_hwstats_region_ram_bytes"][string(te.provider)][te.location]
+
+		if want, got := te.threads, int32(threads); want != got {
+			t.Errorf("Wanted %d threads in %s/%s, got %d", want, te.provider, te.location, got)
+		}
+		if want, got := te.ramgb, int64(bytes)>>30; want != got {
+			t.Errorf("Wanted %d GB RAM in %s/%s, got %d", want, te.provider, te.location, got)
+		}
+	}
+}