cloud/bmaas/bmdb: implement BMDB client metrics

This implements some basic BMDB client metrics, exported by any client
code that uses the BMDB.

It also enables their use in the Shepherd and BMSRV.

Change-Id: I1d5e82fd2c34a7bfd42f37fad540d69f7b23f347
Reviewed-on: https://review.monogon.dev/c/monogon/+/1600
Tested-by: Jenkins CI
Reviewed-by: Tim Windelschmidt <tim@monogon.tech>
diff --git a/cloud/bmaas/bmdb/BUILD.bazel b/cloud/bmaas/bmdb/BUILD.bazel
index 761baef..c4eb226 100644
--- a/cloud/bmaas/bmdb/BUILD.bazel
+++ b/cloud/bmaas/bmdb/BUILD.bazel
@@ -10,12 +10,14 @@
     importpath = "source.monogon.dev/cloud/bmaas/bmdb",
     visibility = ["//visibility:public"],
     deps = [
+        "//cloud/bmaas/bmdb/metrics",
         "//cloud/bmaas/bmdb/model",
         "//cloud/bmaas/bmdb/reflection",
         "//cloud/lib/component",
         "@com_github_cockroachdb_cockroach_go_v2//crdb",
         "@com_github_google_uuid//:uuid",
         "@com_github_lib_pq//:pq",
+        "@com_github_prometheus_client_golang//prometheus",
         "@io_k8s_klog_v2//:klog",
     ],
 )
diff --git a/cloud/bmaas/bmdb/bmdb.go b/cloud/bmaas/bmdb/bmdb.go
index 5699381..242c3d9 100644
--- a/cloud/bmaas/bmdb/bmdb.go
+++ b/cloud/bmaas/bmdb/bmdb.go
@@ -7,7 +7,12 @@
 // service over gRPC.
 package bmdb
 
-import "source.monogon.dev/cloud/lib/component"
+import (
+	"github.com/prometheus/client_golang/prometheus"
+
+	"source.monogon.dev/cloud/bmaas/bmdb/metrics"
+	"source.monogon.dev/cloud/lib/component"
+)
 
 // BMDB is the Bare Metal Database, a common schema to store information about
 // bare metal machines in CockroachDB. This struct is supposed to be
@@ -30,6 +35,8 @@
 //     machines with some active work being performed on them.
 type BMDB struct {
 	Config
+	// Set by EnableMetrics. A nil MetricsSet is valid and acts as a no-op.
+	metrics *metrics.MetricsSet
 }
 
 // Config is the configuration of the BMDB connector.
@@ -44,3 +51,11 @@
 	// ComponentName in active Sessions.
 	RuntimeInfo string
 }
+
+// EnableMetrics registers BMDB metrics on the given registry and enables their
+// collection. Only the first call has any effect. It is not goroutine safe.
+func (b *BMDB) EnableMetrics(registry *prometheus.Registry) {
+	if b.metrics == nil {
+		b.metrics = metrics.New(registry)
+	}
+}
diff --git a/cloud/bmaas/bmdb/metrics/BUILD.bazel b/cloud/bmaas/bmdb/metrics/BUILD.bazel
new file mode 100644
index 0000000..d5eb8a5
--- /dev/null
+++ b/cloud/bmaas/bmdb/metrics/BUILD.bazel
@@ -0,0 +1,12 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+    name = "metrics",
+    srcs = ["metrics.go"],
+    importpath = "source.monogon.dev/cloud/bmaas/bmdb/metrics",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//cloud/bmaas/bmdb/model",
+        "@com_github_prometheus_client_golang//prometheus",
+    ],
+)
diff --git a/cloud/bmaas/bmdb/metrics/metrics.go b/cloud/bmaas/bmdb/metrics/metrics.go
new file mode 100644
index 0000000..2dbad29
--- /dev/null
+++ b/cloud/bmaas/bmdb/metrics/metrics.go
@@ -0,0 +1,212 @@
+// Package metrics implements a Prometheus metrics submission interface for BMDB
+// client components. A MetricsSet object can be attached to a BMDB object (see
+// BMDB.EnableMetrics), causing all BMDB session, transaction and work
+// statistics to be submitted to that MetricsSet object.
+package metrics
+
+import (
+	"github.com/prometheus/client_golang/prometheus"
+
+	"source.monogon.dev/cloud/bmaas/bmdb/model"
+)
+
+// Processor identifies the cloud component (and possibly sub-component) acting
+// upon the BMDB. It can be provided when starting a BMDB Session to
+// contextualize the metrics emitted by that Session. Because the Processor ends
+// up directly as the value of the 'processor' Prometheus label, it must be
+// low-cardinality - thus all possible values are defined as an enum here. If a
+// Session is not configured with a Processor, the default (ProcessorUnknown,
+// exported as "unknown") is used.
+type Processor string
+
+const (
+	ProcessorUnknown             Processor = ""
+	ProcessorShepherdInitializer Processor = "shepherd-initializer"
+	ProcessorShepherdProvisioner Processor = "shepherd-provisioner"
+	ProcessorShepherdRecoverer   Processor = "shepherd-recoverer"
+	ProcessorShepherdUpdater     Processor = "shepherd-updater"
+	ProcessorBMSRV               Processor = "bmsrv"
+)
+
+// String returns the value used for the 'processor' Prometheus label key. The
+// empty ProcessorUnknown is mapped to "unknown".
+func (p Processor) String() string {
+	if p != ProcessorUnknown {
+		return string(p)
+	}
+	// An empty label value is equivalent to an absent label in Prometheus, so
+	// use an explicit placeholder instead.
+	return "unknown"
+}
+
+// MetricsSet contains all the Prometheus metrics objects related to a BMDB
+// client. All counters are partitioned by the 'processor' label, and the work
+// counters are additionally partitioned by 'process' (and 'result').
+//
+// The MetricsSet object is goroutine-safe. An empty MetricsSet object is not
+// valid, and should be instead constructed using New.
+//
+// A nil MetricsSet object is valid and represents a no-op metrics recorder
+// that's never collected: calling Recorder on it returns a nil (no-op)
+// ProcessorRecorder.
+type MetricsSet struct {
+	sessionStarted      *prometheus.CounterVec
+	transactionExecuted *prometheus.CounterVec
+	transactionRetried  *prometheus.CounterVec
+	transactionFailed   *prometheus.CounterVec
+	workStarted         *prometheus.CounterVec
+	workFinished        *prometheus.CounterVec
+}
+
+// processorCounter creates a CounterVec whose first label is always 'processor',
+// followed by any additional labels given.
+func processorCounter(name, help string, labels ...string) *prometheus.CounterVec {
+	opts := prometheus.CounterOpts{Name: name, Help: help}
+	// Build a fresh slice so that the caller's variadic slice is never mutated.
+	keys := make([]string, 0, len(labels)+1)
+	keys = append(keys, "processor")
+	keys = append(keys, labels...)
+	return prometheus.NewCounterVec(opts, keys)
+}
+
+// New creates a new BMDB MetricsSet object. BMDB.EnableMetrics calls New and
+// attaches the resulting MetricsSet to a BMDB object.
+//
+// The given registry must be a valid Prometheus registry, and all metrics
+// contained in this MetricsSet object will be registered into it. Registration
+// uses MustRegister, so New panics if the metrics are already registered there.
+//
+// The MetricsSet object can be shared between multiple BMDB objects, and is
+// goroutine-safe.
+func New(registry *prometheus.Registry) *MetricsSet {
+	m := &MetricsSet{
+		sessionStarted:      processorCounter("bmdb_session_started", "How many sessions this worker started"),
+		transactionExecuted: processorCounter("bmdb_transaction_executed", "How many transactions were performed by this worker"),
+		transactionRetried:  processorCounter("bmdb_transaction_retried", "How many transaction retries were performed by this worker"),
+		transactionFailed:   processorCounter("bmdb_transaction_failed", "How many transactions failed permanently on this worker"),
+		workStarted:         processorCounter("bmdb_work_started", "How many work items were started by this worker, partitioned by process", "process"),
+		workFinished:        processorCounter("bmdb_work_finished", "How many work items were finished by this worker, partitioned by process and result", "process", "result"),
+	}
+	registry.MustRegister(
+		m.sessionStarted,
+		m.transactionExecuted,
+		m.transactionRetried,
+		m.transactionFailed,
+		m.workStarted,
+		m.workFinished,
+	)
+	return m
+}
+
+// ProcessorRecorder wraps a MetricsSet object with the context of some
+// Processor. It exposes methods that record specific events into the managed
+// MetricsSet.
+//
+// The ProcessorRecorder object is goroutine safe.
+//
+// An empty ProcessorRecorder object is not valid, and should be instead
+// constructed using MetricsSet.Recorder.
+//
+// A nil ProcessorRecorder object is valid and represents a no-op metrics
+// recorder.
+type ProcessorRecorder struct {
+	m      *MetricsSet
+	labels prometheus.Labels
+}
+
+// Recorder builds a ProcessorRecorder for the given MetricsSet and Processor.
+// Calling Recorder on a nil MetricsSet returns a nil (no-op) ProcessorRecorder.
+func (m *MetricsSet) Recorder(p Processor) *ProcessorRecorder {
+	if m == nil {
+		return nil
+	}
+	return &ProcessorRecorder{
+		m: m,
+		labels: prometheus.Labels{
+			"processor": p.String(),
+		},
+	}
+}
+
+// OnTransactionStarted should be called every time a BMDB client starts or
+// re-starts a BMDB Transaction. attempt is 1 for the first attempt, and larger
+// than 1 for every subsequent attempt (i.e. retry) of the same transaction.
+func (r *ProcessorRecorder) OnTransactionStarted(attempt int64) {
+	if r == nil {
+		return
+	}
+	// Anything other than the first attempt is accounted for as a retry.
+	counter := r.m.transactionRetried
+	if attempt == 1 {
+		counter = r.m.transactionExecuted
+	}
+	counter.With(r.labels).Inc()
+}
+
+// OnTransactionFailed should be called any time a BMDB Transaction executed by
+// a BMDB client fails permanently. Calling it on a nil recorder is a no-op.
+func (r *ProcessorRecorder) OnTransactionFailed() {
+	if r != nil {
+		counter := r.m.transactionFailed.With(r.labels)
+		counter.Inc()
+	}
+}
+
+// OnSessionStarted should be called any time a BMDB client opens a new BMDB
+// Session. Calling it on a nil recorder is a no-op.
+func (r *ProcessorRecorder) OnSessionStarted() {
+	if r != nil {
+		counter := r.m.sessionStarted.With(r.labels)
+		counter.Inc()
+	}
+}
+
+// ProcessRecorder wraps a ProcessorRecorder with an additional model.Process,
+// and can record work-specific events. It is goroutine-safe. OnWorkStarted and
+// OnWorkFinished are no-ops when called on a nil ProcessRecorder.
+type ProcessRecorder struct {
+	*ProcessorRecorder
+	// labels holds both the 'processor' and the 'process' label values.
+	labels prometheus.Labels
+}
+
+// WithProcess wraps this ProcessorRecorder with a Process, returning a
+// goroutine-safe ProcessRecorder. Calling it on a nil ProcessorRecorder returns
+// a nil (no-op) ProcessRecorder.
+func (r *ProcessorRecorder) WithProcess(process model.Process) *ProcessRecorder {
+	if r == nil {
+		return nil
+	}
+	return &ProcessRecorder{
+		ProcessorRecorder: r,
+		labels: prometheus.Labels{
+			"processor": r.labels["processor"],
+			"process":   string(process),
+		},
+	}
+}
+
+// OnWorkStarted should be called whenever a BMDB client starts a new Work item.
+func (r *ProcessRecorder) OnWorkStarted() {
+	if r != nil {
+		counter := r.m.workStarted.With(r.labels)
+		counter.Inc()
+	}
+}
+
+// WorkResult is the outcome of a Work item, used as the 'result' metric label.
+type WorkResult string
+const (
+	WorkResultFinished WorkResult = "finished"
+	WorkResultCanceled WorkResult = "canceled"
+	WorkResultFailed   WorkResult = "failed"
+)
+
+// OnWorkFinished should be called any time a BMDB client finishes, cancels or
+// fails a Work item, with the matching WorkResult. No-op on a nil recorder.
+func (r *ProcessRecorder) OnWorkFinished(result WorkResult) {
+	if r != nil {
+		curried := r.m.workFinished.MustCurryWith(r.labels)
+		curried.With(prometheus.Labels{"result": string(result)}).Inc()
+	}
+}
diff --git a/cloud/bmaas/bmdb/sessions.go b/cloud/bmaas/bmdb/sessions.go
index 144e6da..95b8be5 100644
--- a/cloud/bmaas/bmdb/sessions.go
+++ b/cloud/bmaas/bmdb/sessions.go
@@ -12,6 +12,7 @@
 	"github.com/lib/pq"
 	"k8s.io/klog/v2"
 
+	"source.monogon.dev/cloud/bmaas/bmdb/metrics"
 	"source.monogon.dev/cloud/bmaas/bmdb/model"
 )
 
@@ -27,7 +28,7 @@
 // subsequent attempts to call Transact will fail with ErrSessionExpired. This
 // means that the caller within the component is responsible for recreating a
 // new Session if a previously used one expires.
-func (c *Connection) StartSession(ctx context.Context) (*Session, error) {
+func (c *Connection) StartSession(ctx context.Context, opts ...SessionOption) (*Session, error) {
 	intervalSeconds := 5
 
 	res, err := model.New(c.db).NewSession(ctx, model.NewSessionParams{
@@ -43,6 +44,13 @@
 
 	ctx2, ctxC := context.WithCancel(ctx)
 
+	var processor metrics.Processor
+	for _, opt := range opts {
+		if opt.Processor != "" {
+			processor = opt.Processor
+		}
+	}
+
 	s := &Session{
 		connection: c,
 		interval:   time.Duration(intervalSeconds) * time.Second,
@@ -51,11 +59,17 @@
 
 		ctx:  ctx2,
 		ctxC: ctxC,
+		m:    c.bmdb.metrics.Recorder(processor),
 	}
+	s.m.OnSessionStarted()
 	go s.maintainHeartbeat(ctx2)
 	return s, nil
 }
 
+// SessionOption configures StartSession. A non-empty Processor (the last one
+// given wins) labels all metrics emitted by the started Session.
+type SessionOption struct{ Processor metrics.Processor }
+
 // Session is a session (identified by UUID) that has been started in the BMDB.
 // Its liveness is maintained by a background goroutine, and as long as that
 // session is alive, it can perform transactions and work on the BMDB.
@@ -67,6 +81,8 @@
 
 	ctx  context.Context
 	ctxC context.CancelFunc
+
+	m *metrics.ProcessorRecorder
 }
 
 // Expired returns true if this session is expired and will fail all subsequent
@@ -153,7 +169,12 @@
 // Most pure (meaning without side effects outside the database itself) BMDB
 // transactions should be run this way.
 func (s *Session) Transact(ctx context.Context, fn func(q *model.Queries) error) error {
-	return crdb.ExecuteTx(ctx, s.connection.db, nil, func(tx *sql.Tx) error {
+	var attempts int64
+
+	err := crdb.ExecuteTx(ctx, s.connection.db, nil, func(tx *sql.Tx) error {
+		attempts += 1
+		s.m.OnTransactionStarted(attempts)
+
 		qtx := model.New(tx)
 		sessions, err := qtx.SessionCheck(ctx, s.UUID)
 		if err != nil {
@@ -169,6 +190,10 @@
 
 		return nil
 	})
+	if err != nil {
+		s.m.OnTransactionFailed()
+	}
+	return err
 }
 
 var (
@@ -262,13 +287,16 @@
 	if err != nil {
 		return nil, err
 	}
-	klog.Infof("Started work %q on machine %q (sess %q)", process, *mid, s.UUID)
-	return &Work{
+	w := &Work{
 		Machine: *mid,
 		s:       s,
 		process: process,
 		backoff: exisingingBackoff,
-	}, nil
+		m:       s.m.WithProcess(process),
+	}
+	w.m.OnWorkStarted()
+	klog.Infof("Started work %q on machine %q (sess %q)", process, *mid, s.UUID)
+	return w, nil
 }
 
 // existingBackoff contains backoff information retrieved from a work item that
@@ -461,6 +489,8 @@
 	process model.Process
 
 	backoff *existingBackoff
+
+	m *metrics.ProcessRecorder
 }
 
 // Cancel the Work started on a machine. If the work has already been finished
@@ -470,6 +500,7 @@
 		return
 	}
 	w.done = true
+	w.m.OnWorkFinished(metrics.WorkResultCanceled)
 
 	klog.Infof("Canceling work %q on machine %q (sess %q)", w.process, w.Machine, w.s.UUID)
 	// Eat error and log. There's nothing we can do if this fails, and if it does, it's
@@ -508,6 +539,8 @@
 		return fmt.Errorf("already finished")
 	}
 	w.done = true
+	w.m.OnWorkFinished(metrics.WorkResultFinished)
+
 	klog.Infof("Finishing work %q on machine %q (sess %q)", w.process, w.Machine, w.s.UUID)
 	return w.s.Transact(ctx, func(q *model.Queries) error {
 		err := q.FinishWork(ctx, model.FinishWorkParams{
@@ -563,6 +596,7 @@
 		return fmt.Errorf("already finished")
 	}
 	w.done = true
+	w.m.OnWorkFinished(metrics.WorkResultFailed)
 
 	return w.s.Transact(ctx, func(q *model.Queries) error {
 		err := q.FinishWork(ctx, model.FinishWorkParams{
diff --git a/cloud/bmaas/server/BUILD.bazel b/cloud/bmaas/server/BUILD.bazel
index 4303d62..18e72b5 100644
--- a/cloud/bmaas/server/BUILD.bazel
+++ b/cloud/bmaas/server/BUILD.bazel
@@ -10,6 +10,7 @@
     visibility = ["//visibility:public"],
     deps = [
         "//cloud/bmaas/bmdb",
+        "//cloud/bmaas/bmdb/metrics",
         "//cloud/bmaas/bmdb/model",
         "//cloud/bmaas/bmdb/webug",
         "//cloud/bmaas/server/api",
diff --git a/cloud/bmaas/server/server.go b/cloud/bmaas/server/server.go
index 8867e4f..00972de 100644
--- a/cloud/bmaas/server/server.go
+++ b/cloud/bmaas/server/server.go
@@ -13,6 +13,7 @@
 	"k8s.io/klog/v2"
 
 	"source.monogon.dev/cloud/bmaas/bmdb"
+	"source.monogon.dev/cloud/bmaas/bmdb/metrics"
 	"source.monogon.dev/cloud/bmaas/bmdb/webug"
 	apb "source.monogon.dev/cloud/bmaas/server/api"
 	"source.monogon.dev/cloud/lib/component"
@@ -73,7 +74,7 @@
 			bo := backoff.NewExponentialBackOff()
 			err := backoff.Retry(func() error {
 				var err error
-				session, err = s.bmdb.StartSession(ctx)
+				session, err = s.bmdb.StartSession(ctx, bmdb.SessionOption{Processor: metrics.ProcessorBMSRV})
 				if err != nil {
 					klog.Errorf("Failed to start session: %v", err)
 					return err
@@ -145,6 +146,8 @@
 // Start the BMaaS Server in background goroutines. This should only be called
 // once. The process will exit with debug logs if starting the server failed.
 func (s *Server) Start(ctx context.Context) {
+	reg := s.Config.Component.PrometheusRegistry()
+	s.Config.BMDB.EnableMetrics(reg)
 	s.Config.Component.StartPrometheus(ctx)
 
 	conn, err := s.Config.BMDB.Open(true)