package scruffy

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/google/uuid"
	"github.com/prometheus/client_golang/prometheus"
	"google.golang.org/protobuf/proto"
	"k8s.io/klog/v2"

	"source.monogon.dev/cloud/bmaas/bmdb/model"
	"source.monogon.dev/cloud/bmaas/server/api"
)

// hwStatsRunner collects metrics from the machine hardware inventory in BMDB and
// exposes them as Prometheus metrics via a registry passed to newHWStatsRunner.
type hwStatsRunner struct {
	s *Server

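	// Gauges keyed by (provider, location), exposing the summed RAM bytes and
	// CPU hardware thread counts reported by machines in that region.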
	memoryPerRegion     *prometheus.GaugeVec
	cpuThreadsPerRegion *prometheus.GaugeVec
}

// newHWStatsRunner builds a hwStatsRunner. The hwStatsRunner has the given
// Server's BMDB connection bound to it and can then perform the actual
// database statistics gathering.
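//
// A minimal wiring sketch (hypothetical names: srv is a *Server, reg is the
// Prometheus registry backing its metrics endpoint, ctx is a long-lived
// context):
//
//	hwsr := newHWStatsRunner(srv, reg)
//	go hwsr.run(ctx)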
func newHWStatsRunner(s *Server, reg *prometheus.Registry) *hwStatsRunner {
	hwsr := &hwStatsRunner{
		s: s,

		memoryPerRegion: prometheus.NewGaugeVec(prometheus.GaugeOpts{
			Name: "bmdb_hwstats_region_ram_bytes",
		}, []string{"provider", "location"}),

		cpuThreadsPerRegion: prometheus.NewGaugeVec(prometheus.GaugeOpts{
			Name: "bmdb_hwstats_region_cpu_threads",
		}, []string{"provider", "location"}),
	}
	reg.MustRegister(hwsr.memoryPerRegion, hwsr.cpuThreadsPerRegion)
	return hwsr
}
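// run blocks and periodically (once per minute) collects hardware statistics
// from the BMDB, updating the Prometheus gauges. It returns when ctx is
// canceled.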
func (h *hwStatsRunner) run(ctx context.Context) {
	klog.Infof("Starting stats runner...")

	ti := time.NewTicker(time.Minute)
	defer ti.Stop()

	for {
		err := h.runOnce(ctx)
		if err != nil {
			if errors.Is(err, ctx.Err()) {
				return
			}
			klog.Errorf("Stats run failed: %v", err)
		}
		select {
		case <-ti.C:
		case <-ctx.Done():
			klog.Infof("Exiting stats runner (%v)...", ctx.Err())
			return
		}
	}
}

// statsPerRegion holds hardware statistics gathered and aggregated (summed) per
// region.
type statsPerRegion struct {
	ramBytes   uint64
	numThreads uint64
}

// add a given AgentHardwareReport to this region's data.
func (s *statsPerRegion) add(hwrep *api.AgentHardwareReport) {
	s.ramBytes += uint64(hwrep.Report.MemoryInstalledBytes)
	for _, cpu := range hwrep.Report.Cpu {
		s.numThreads += uint64(cpu.HardwareThreads)
	}
}

// regionKey uniquely identifies a region within a provider.
type regionKey struct {
	provider model.Provider
	location string
}

func (r *regionKey) String() string {
	return fmt.Sprintf("%s/%s", r.provider, r.location)
}

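// runOnce performs a single statistics gathering pass: it pages through all
// machine hardware reports in the BMDB, aggregates them per (provider,
// location) region, and updates the Prometheus gauges accordingly.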
func (h *hwStatsRunner) runOnce(ctx context.Context) error {
	sess, err := h.s.session(ctx)
	if err != nil {
		return err
	}

	var start uuid.UUID

	perRegion := make(map[regionKey]*statsPerRegion)
	var total statsPerRegion

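	// Page through machine hardware reports in chunks of 100, using the machine
	// ID as the pagination cursor. An empty chunk means everything has been
	// visited.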
	for {
		var res []model.ListMachineHardwareRow
		err = sess.Transact(ctx, func(q *model.Queries) error {
			res, err = q.ListMachineHardware(ctx, model.ListMachineHardwareParams{
				Limit:     100,
				MachineID: start,
			})
			return err
		})
		if err != nil {
			return err
		}
		klog.Infof("Processing chunk of %d machine(s)...", len(res))
		if len(res) == 0 {
			break
		}
		for _, row := range res {
			var hwrep api.AgentHardwareReport
			err = proto.Unmarshal(row.HardwareReportRaw.([]byte), &hwrep)
			if err != nil {
				klog.Warningf("Could not decode hardware report from %s: %v", row.MachineID, err)
				continue
			}

			if !row.ProviderLocation.Valid {
				klog.Warningf("%s has no provider location, skipping", row.MachineID)
				continue
			}

			key := regionKey{
				provider: row.Provider,
				location: row.ProviderLocation.String,
			}
			if _, ok := perRegion[key]; !ok {
				perRegion[key] = &statsPerRegion{}
			}
			perRegion[key].add(&hwrep)
			total.add(&hwrep)

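			// Advance the pagination cursor past this machine.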
			start = row.MachineID
		}
	}

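	// Export the aggregated per-region statistics as gauge values, labeled by
	// provider and location.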
	for k, st := range perRegion {
		labels := prometheus.Labels{
			"provider": string(k.provider),
			"location": k.location,
		}

		h.memoryPerRegion.With(labels).Set(float64(st.ramBytes))
		h.cpuThreadsPerRegion.With(labels).Set(float64(st.numThreads))
	}
	return nil
}