cloud/bmaas/bmdb: implement BMDB client metrics

This implements some basic BMDB metrics exported by any client code
using the BMDB.

It also enables their use in the Shepherd and BMSRV.

Change-Id: I1d5e82fd2c34a7bfd42f37fad540d69f7b23f347
Reviewed-on: https://review.monogon.dev/c/monogon/+/1600
Tested-by: Jenkins CI
Reviewed-by: Tim Windelschmidt <tim@monogon.tech>
diff --git a/cloud/bmaas/bmdb/sessions.go b/cloud/bmaas/bmdb/sessions.go
index 144e6da..95b8be5 100644
--- a/cloud/bmaas/bmdb/sessions.go
+++ b/cloud/bmaas/bmdb/sessions.go
@@ -12,6 +12,7 @@
 	"github.com/lib/pq"
 	"k8s.io/klog/v2"
 
+	"source.monogon.dev/cloud/bmaas/bmdb/metrics"
 	"source.monogon.dev/cloud/bmaas/bmdb/model"
 )
 
@@ -27,7 +28,7 @@
 // subsequent attempts to call Transact will fail with ErrSessionExpired. This
 // means that the caller within the component is responsible for recreating a
 // new Session if a previously used one expires.
-func (c *Connection) StartSession(ctx context.Context) (*Session, error) {
+func (c *Connection) StartSession(ctx context.Context, opts ...SessionOption) (*Session, error) {
 	intervalSeconds := 5
 
 	res, err := model.New(c.db).NewSession(ctx, model.NewSessionParams{
@@ -43,6 +44,13 @@
 
 	ctx2, ctxC := context.WithCancel(ctx)
 
+	var processor metrics.Processor
+	for _, opt := range opts {
+		if opt.Processor != "" {
+			processor = opt.Processor
+		}
+	}
+
 	s := &Session{
 		connection: c,
 		interval:   time.Duration(intervalSeconds) * time.Second,
@@ -51,11 +59,17 @@
 
 		ctx:  ctx2,
 		ctxC: ctxC,
+		m:    c.bmdb.metrics.Recorder(processor),
 	}
+	s.m.OnSessionStarted()
 	go s.maintainHeartbeat(ctx2)
 	return s, nil
 }
 
+type SessionOption struct {
+	Processor metrics.Processor
+}
+
 // Session is a session (identified by UUID) that has been started in the BMDB.
 // Its liveness is maintained by a background goroutine, and as long as that
 // session is alive, it can perform transactions and work on the BMDB.
@@ -67,6 +81,8 @@
 
 	ctx  context.Context
 	ctxC context.CancelFunc
+
+	m *metrics.ProcessorRecorder
 }
 
 // Expired returns true if this session is expired and will fail all subsequent
@@ -153,7 +169,12 @@
 // Most pure (meaning without side effects outside the database itself) BMDB
 // transactions should be run this way.
 func (s *Session) Transact(ctx context.Context, fn func(q *model.Queries) error) error {
-	return crdb.ExecuteTx(ctx, s.connection.db, nil, func(tx *sql.Tx) error {
+	var attempts int64
+
+	err := crdb.ExecuteTx(ctx, s.connection.db, nil, func(tx *sql.Tx) error {
+		attempts += 1
+		s.m.OnTransactionStarted(attempts)
+
 		qtx := model.New(tx)
 		sessions, err := qtx.SessionCheck(ctx, s.UUID)
 		if err != nil {
@@ -169,6 +190,10 @@
 
 		return nil
 	})
+	if err != nil {
+		s.m.OnTransactionFailed()
+	}
+	return err
 }
 
 var (
@@ -262,13 +287,16 @@
 	if err != nil {
 		return nil, err
 	}
-	klog.Infof("Started work %q on machine %q (sess %q)", process, *mid, s.UUID)
-	return &Work{
+	w := &Work{
 		Machine: *mid,
 		s:       s,
 		process: process,
 		backoff: exisingingBackoff,
-	}, nil
+		m:       s.m.WithProcess(process),
+	}
+	w.m.OnWorkStarted()
+	klog.Infof("Started work %q on machine %q (sess %q)", process, *mid, s.UUID)
+	return w, nil
 }
 
 // existingBackoff contains backoff information retrieved from a work item that
@@ -461,6 +489,8 @@
 	process model.Process
 
 	backoff *existingBackoff
+
+	m *metrics.ProcessRecorder
 }
 
 // Cancel the Work started on a machine. If the work has already been finished
@@ -470,6 +500,7 @@
 		return
 	}
 	w.done = true
+	w.m.OnWorkFinished(metrics.WorkResultCanceled)
 
 	klog.Infof("Canceling work %q on machine %q (sess %q)", w.process, w.Machine, w.s.UUID)
 	// Eat error and log. There's nothing we can do if this fails, and if it does, it's
@@ -508,6 +539,8 @@
 		return fmt.Errorf("already finished")
 	}
 	w.done = true
+	w.m.OnWorkFinished(metrics.WorkResultFinished)
+
 	klog.Infof("Finishing work %q on machine %q (sess %q)", w.process, w.Machine, w.s.UUID)
 	return w.s.Transact(ctx, func(q *model.Queries) error {
 		err := q.FinishWork(ctx, model.FinishWorkParams{
@@ -563,6 +596,7 @@
 		return fmt.Errorf("already finished")
 	}
 	w.done = true
+	w.m.OnWorkFinished(metrics.WorkResultFailed)
 
 	return w.s.Transact(ctx, func(q *model.Queries) error {
 		err := q.FinishWork(ctx, model.FinishWorkParams{