cloud/bmaas: implement BMDB reflection

This is the foundation for runtime introspection of BMDBs, to be used in
debug and operator tooling.

Change-Id: Id1eb0cd1dfd94c5d4dafde82448695497525e24f
Reviewed-on: https://review.monogon.dev/c/monogon/+/1131
Tested-by: Jenkins CI
Reviewed-by: Leopold Schabel <leo@monogon.tech>
diff --git a/cloud/bmaas/bmdb/reflection/BUILD.bazel b/cloud/bmaas/bmdb/reflection/BUILD.bazel
new file mode 100644
index 0000000..f685b98
--- /dev/null
+++ b/cloud/bmaas/bmdb/reflection/BUILD.bazel
@@ -0,0 +1,16 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+    name = "reflection",
+    srcs = [
+        "reflection.go",
+        "schema.go",
+    ],
+    importpath = "source.monogon.dev/cloud/bmaas/bmdb/reflection",
+    visibility = ["//visibility:public"],
+    deps = [
+        "@com_github_google_uuid//:uuid",
+        "@com_github_iancoleman_strcase//:strcase",
+        "@io_k8s_klog_v2//:klog",
+    ],
+)
diff --git a/cloud/bmaas/bmdb/reflection/reflection.go b/cloud/bmaas/bmdb/reflection/reflection.go
new file mode 100644
index 0000000..19bc40a
--- /dev/null
+++ b/cloud/bmaas/bmdb/reflection/reflection.go
@@ -0,0 +1,406 @@
+// Package reflection implements facilities to retrieve information about the
+// implemented Tags and their types from a plain CockroachDB SQL connection,
+// bypassing the queries/types defined in models. Then, the retrieved Schema can
+// be used to retrieve information about machines.
+//
+// This is designed to be used in debugging facilities to allow arbitrary machine
+// introspection. It must _not_ be used in the user path, as the schema
+// extraction functionality is implemented best-effort.
+package reflection
+
+import (
+	"context"
+	"database/sql"
+	"encoding/hex"
+	"fmt"
+	"sort"
+	"strings"
+	"time"
+
+	"github.com/google/uuid"
+)
+
+// GetMachinesOpts influences the behaviour of GetMachines.
+type GetMachinesOpts struct {
+	// FilterMachine, if set, will only retrieve information about the machine with
+	// the given UUID. In case the given machine UUID does not exist in the database,
+	// an empty result will be returned and _no_ error will be set.
+	FilterMachine *uuid.UUID
+	// Strict enables strict consistency. This is not recommended for use when
+	// retrieving all machines, as such queries will compete against all currently
+	// running operations. When not enabled, the retrieval will be executed AS OF
+	// SYSTEM TIME follower_timestamp(), meaning the data might be a few seconds out
+	// of date. Regardless of the option, the returned machine data will be
+	// internally consistent, even across machines - but when not enabled the data
+	// might be stale.
+	Strict bool
+	// ExpiredBackoffs enables the retrieval of information about all machine
+	// backoffs, including expired backoff. Note that expired backoffs might be
+	// garbage collected in the future, and their long-term storage is not
+	// guaranteed.
+	ExpiredBackoffs bool
+}
+
+// GetMachines retrieves all available BMDB data about one or more machines. The
+// Schema's embedded SQL connection is used to performed the retrieval.
+//
+// Options can be specified to influenced the exact operation performed. By
+// default (with a zeroed structure or nil pointer), all machines with active
+// backoffs are retrieved with weak consistency. See GetMachineOpts to influence
+// this behaviour.
+func (r *Schema) GetMachines(ctx context.Context, opts *GetMachinesOpts) (*Reflected[[]*Machine], error) {
+	if opts == nil {
+		opts = &GetMachinesOpts{}
+	}
+
+	// We're about to build a pretty big SELECT query with a ton of joins.
+	//
+	// First, we join against work_backoff and work to get information about active
+	// work and backoffs on the machines we're retrieving.
+	//
+	// Second, we join against all the tags that are declared in the schema.
+
+	// These are the colums we'll SELECT <...> FROM
+	columns := []string{
+		"machines.machine_id",
+		"machines.machine_created_at",
+		"work_backoff.process",
+		"work_backoff.cause",
+		"work_backoff.until",
+		"work.process",
+		"work.session_id",
+		// ... tag columns will come after this.
+	}
+	// These are tha args we'll pass to the query.
+	var args []any
+
+	// Start building joins. First, against work_backoff and work.
+	backoffFilter := " AND work_backoff.until > now()"
+	if opts.ExpiredBackoffs {
+		backoffFilter = ""
+	}
+	joins := []string{
+		"LEFT JOIN work_backoff ON machines.machine_id = work_backoff.machine_id" + backoffFilter,
+		"LEFT JOIN work ON machines.machine_id = work.machine_id",
+	}
+
+	// Then, against tags. Also populate columns as we go along.
+	for _, tagType := range r.TagTypes {
+		joins = append(joins, fmt.Sprintf("LEFT JOIN %s ON machines.machine_id = %s.machine_id", tagType.NativeName, tagType.NativeName))
+		columns = append(columns, fmt.Sprintf("%s.machine_id", tagType.NativeName))
+		for _, fieldType := range tagType.Fields {
+			columns = append(columns, fmt.Sprintf("%s.%s", tagType.NativeName, fieldType.NativeName))
+		}
+	}
+
+	// Finalize query.
+	q := []string{
+		"SELECT",
+		strings.Join(columns, ", "),
+		"FROM machines",
+	}
+	q = append(q, joins...)
+	if !opts.Strict {
+		q = append(q, "AS OF SYSTEM TIME follower_read_timestamp()")
+	}
+	if opts.FilterMachine != nil {
+		q = append(q, "WHERE machines.machine_id = $1")
+		args = append(args, *opts.FilterMachine)
+	}
+
+	rows, err := r.db.QueryContext(ctx, strings.Join(q, "\n"), args...)
+	if err != nil {
+		return nil, fmt.Errorf("query failed: %w", err)
+	}
+	defer rows.Close()
+
+	// Okay, we can start scanning the result rows.
+	//
+	// As this is a complex join, we need to merge some rows together and discard
+	// some NULLs. We do merging/deduplication using machine_id values for the
+	// machine data, and abuse UNIQUE constraints in the work_backoff/work tables to
+	// deduplicate these.
+	//
+	// The alternative would be to rewrite this query to use array_agg, and we might
+	// do that at some point. This is only really a problem if we
+	// have _a lot_ of active work/backoffs (as that effectively duplicates all
+	// machine/tag data), which isn't the case yet. But we should keep an eye out for
+	// this.
+
+	var machines []*Machine
+	for rows.Next() {
+
+		// We need to scan this row back into columns. For constant columns we'll just
+		// create the data here and refer to it later.
+		var dests []any
+
+		// Add non-tag always-retrieved constants.
+		var mid uuid.UUID
+		var machineCreated time.Time
+		var workSession uuid.NullUUID
+		var backoffProcess, backoffCause, workProcess sql.NullString
+		var backoffUntil sql.NullTime
+
+		dests = append(dests, &mid, &machineCreated, &backoffProcess, &backoffCause, &backoffUntil, &workProcess, &workSession)
+
+		// For dynamic data, we need to keep a reference to a list of columns that are
+		// part of tags, and then refer to them later. We can't just refer back to dests
+		// as the types are erased into `any`. scannedTags is that data storage.
+		type scannedTag struct {
+			ty     *TagType
+			id     uuid.NullUUID
+			fields []*TagField
+		}
+		var scannedTags []*scannedTag
+		for _, tagType := range r.TagTypes {
+			tagType := tagType
+			st := scannedTag{
+				ty: &tagType,
+			}
+			scannedTags = append(scannedTags, &st)
+			dests = append(dests, &st.id)
+			for _, fieldType := range tagType.Fields {
+				fieldType := fieldType
+				field := TagField{
+					Type: &fieldType,
+				}
+				dests = append(dests, &field)
+				st.fields = append(st.fields, &field)
+
+			}
+		}
+
+		if err := rows.Scan(dests...); err != nil {
+			return nil, fmt.Errorf("scan failed: %w", err)
+		}
+
+		// Now comes the merging/deduplication.
+
+		// First, check if we are processing a new machine. If so, create a new
+		// Machine. Otherwise, pick up the previous one.
+		var machine *Machine
+		if len(machines) == 0 || machines[len(machines)-1].ID.String() != mid.String() {
+			// New machine or no machine yet.
+			machine = &Machine{
+				ID:       mid,
+				Created:  machineCreated,
+				Tags:     make(map[string]Tag),
+				Backoffs: make(map[string]Backoff),
+				Work:     make(map[string]Work),
+			}
+
+			// Collect tags into machine.
+			for _, st := range scannedTags {
+				if !st.id.Valid {
+					continue
+				}
+				var fields []TagField
+				for _, f := range st.fields {
+					fields = append(fields, *f)
+				}
+				machine.Tags[st.ty.Name()] = Tag{
+					Type:   st.ty,
+					Fields: fields,
+				}
+			}
+			machines = append(machines, machine)
+		} else {
+			// Continue previous machine.
+			machine = machines[len(machines)-1]
+		}
+
+		// Do we have a backoff? Upsert it to the machine. This works because there's a
+		// UNIQUE(machine_id, process) constraint on the work_backoff table, and we're
+		// effectively rebuilding that keyspace here by indexing first by machine then by
+		// process.
+		if backoffCause.Valid && backoffProcess.Valid && backoffUntil.Valid {
+			process := backoffProcess.String
+			machine.Backoffs[process] = Backoff{
+				Cause:   backoffCause.String,
+				Process: process,
+				Until:   backoffUntil.Time,
+			}
+		}
+
+		// Do we have an active work item? Upsert it to the machine. Same UNIQUE
+		// constraint abuse happening here.
+		if workProcess.Valid && workSession.Valid {
+			process := workProcess.String
+			machine.Work[process] = Work{
+				SessionID: workSession.UUID,
+				Process:   process,
+			}
+		}
+	}
+
+	return &Reflected[[]*Machine]{
+		Data:  machines,
+		Query: strings.Join(q, " "),
+	}, nil
+}
+
+// Reflected wraps data retrieved by reflection (T) with metadata about the
+// retrieval.
+type Reflected[T any] struct {
+	Data T
+	// Effective SQL query performed on the database.
+	Query string
+}
+
+// Machine retrieved from BMDB.
+type Machine struct {
+	ID      uuid.UUID
+	Created time.Time
+
+	// Tags on this machine, keyed by Tag type name (canonical, not native).
+	Tags map[string]Tag
+
+	// Backoffs on this machine, keyed by process name. By default these are only
+	// active backoffs, unless ExpiredBackoffs was set on GetMachineOptions.
+	Backoffs map[string]Backoff
+
+	// Work active on this machine, keyed by process name.
+	Work map[string]Work
+}
+
+// ActiveBackoffs retrieves a copy of a Machine's active backoffs. Note: the
+// expiration check is performed according tu current system time, so it might
+// not be consistent with the data snapshot retrieved from the database.
+func (r *Machine) ActiveBackoffs() []*Backoff {
+	var res []*Backoff
+	for _, bo := range r.Backoffs {
+		bo := bo
+		if !bo.Active() {
+			continue
+		}
+		res = append(res, &bo)
+	}
+	sort.Slice(res, func(i, j int) bool { return res[i].Process < res[j].Process })
+	return res
+}
+
+// ExpiredBackoffs retrieves a copy of a Machine's expired backoffs. Note: the
+// expiration check is performed according tu current system time, so it might
+// not be consistent with the data snapshot retrieved from the database.
+func (r *Machine) ExpiredBackoffs() []*Backoff {
+	var res []*Backoff
+	for _, bo := range r.Backoffs {
+		bo := bo
+		if bo.Active() {
+			continue
+		}
+		res = append(res, &bo)
+	}
+	sort.Slice(res, func(i, j int) bool { return res[i].Process < res[j].Process })
+	return res
+}
+
+// Tag value set on a Machine.
+type Tag struct {
+	// Type describing this tag.
+	Type *TagType
+	// Field data contained in this tag, sorted alphabetically by name.
+	Fields []TagField
+}
+
+// Field is a shorthand for returning a TagField by its name.
+func (r *Tag) Field(name string) *TagField {
+	for _, f := range r.Fields {
+		if f.Type.NativeName == name {
+			return &f
+		}
+	}
+	return nil
+}
+
+// TagField value which is part of a Tag set on a Machine.
+type TagField struct {
+	// Type describing this field.
+	Type *TagFieldType
+
+	text  *string
+	bytes *[]byte
+	time  *time.Time
+}
+
+// HumanValue returns a human-readable (best effort) representation of the field
+// value.
+func (r *TagField) HumanValue() string {
+	switch {
+	case r.text != nil:
+		return *r.text
+	case r.bytes != nil:
+		return hex.EncodeToString(*r.bytes)
+	case r.time != nil:
+		return r.time.String()
+	default:
+		return "<unknown>"
+	}
+}
+
+// Backoff on a Machine.
+type Backoff struct {
+	// Process which established Backoff.
+	Process string
+	// Time when Backoff expires.
+	Until time.Time
+	// Cause for the Backoff as emitted by worker.
+	Cause string
+}
+
+// Active returns whether this Backoff is _currently_ active per the _local_ time.
+func (r Backoff) Active() bool {
+	return time.Now().Before(r.Until)
+}
+
+// Work being actively performed on a Machine.
+type Work struct {
+	// SessionID of the worker performing this Work.
+	SessionID uuid.UUID
+	// Process name of this Work.
+	Process string
+}
+
+// Scan implements sql.Scanner for direct scanning of query results into a
+// reflected tag value. This method is not meant to by used outside the
+// reflection package.
+func (r *TagField) Scan(src any) error {
+	if src == nil {
+		return nil
+	}
+
+	switch r.Type.NativeType {
+	case "text":
+		src2, ok := src.(string)
+		if !ok {
+			return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src)
+		}
+		r.text = &src2
+	case "bytea":
+		src2, ok := src.([]byte)
+		if !ok {
+			return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src)
+		}
+		r.bytes = &src2
+	case "USER-DEFINED":
+		switch r.Type.NativeUDTName {
+		case "provider":
+			src2, ok := src.([]byte)
+			if !ok {
+				return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src)
+			}
+			src3 := string(src2)
+			r.text = &src3
+		}
+	case "timestamp with time zone":
+		src2, ok := src.(time.Time)
+		if !ok {
+			return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src)
+		}
+		r.time = &src2
+	default:
+		return fmt.Errorf("unimplemented SQL type %q", r.Type.NativeType)
+	}
+
+	return nil
+}
diff --git a/cloud/bmaas/bmdb/reflection/schema.go b/cloud/bmaas/bmdb/reflection/schema.go
new file mode 100644
index 0000000..a29c16b
--- /dev/null
+++ b/cloud/bmaas/bmdb/reflection/schema.go
@@ -0,0 +1,169 @@
+package reflection
+
+import (
+	"context"
+	"database/sql"
+	"fmt"
+	"strings"
+
+	"github.com/iancoleman/strcase"
+	"k8s.io/klog/v2"
+)
+
+// Schema contains information about the tag types in a BMDB. It also contains an
+// active connection to the BMDB, allowing retrieval of data based on the
+// detected schema.
+//
+// It also contains an embedded connection to the CockroachDB database backing
+// this BMDB which is then used to retrieve data described by this schema.
+type Schema struct {
+	// TagTypes is the list of tag types extracted from the BMDB.
+	TagTypes []TagType
+	// Version is the go-migrate schema version of the BMDB this schema was extracted
+	// from. By convention, it is a stringified base-10 number representing the number
+	// of seconds since UNIX epoch of when the migration version was created, but
+	// this is not guaranteed.
+	Version string
+
+	db *sql.DB
+}
+
+// TagType describes the type of a BMDB Tag. Each tag in turn corresponds to a
+// CockroachDB database.
+type TagType struct {
+	// NativeName is the name of the table that holds tags of this type.
+	NativeName string
+	// Fields are the types of fields contained in this tag type.
+	Fields []TagFieldType
+}
+
+// Name returns the canonical name of this tag type. For example, a table named
+// machine_agent_started will have a canonical name AgentStarted.
+func (r *TagType) Name() string {
+	tableSuffix := strings.TrimPrefix(r.NativeName, "machine_")
+	parts := strings.Split(tableSuffix, "_")
+	// Capitalize some known acronyms.
+	for i, p := range parts {
+		parts[i] = strings.ReplaceAll(p, "os", "OS")
+	}
+	return strcase.ToCamel(strings.Join(parts, "_"))
+}
+
+// TagFieldType is the type of a field within a BMDB Tag. Each tag field in turn
+// corresponds to a column inside its Tag table.
+type TagFieldType struct {
+	// NativeName is the name of the column that holds this field type. It is also
+	// the canonical name of the field type.
+	NativeName string
+	// NativeType is the CockroachDB type name of this field.
+	NativeType string
+	// NativeUDTName is the CockroachDB user-defined-type name of this field. This is
+	// only valid if NativeType is 'USER-DEFINED'.
+	NativeUDTName string
+}
+
+// HumanType returns a human-readable representation of the field's type. This is
+// not well-defined, and should be used only informatively.
+func (r *TagFieldType) HumanType() string {
+	switch r.NativeType {
+	case "USER-DEFINED":
+		return r.NativeUDTName
+	case "timestamp with time zone":
+		return "timestamp"
+	case "bytea":
+		return "bytes"
+	default:
+		return r.NativeType
+	}
+}
+
+// Reflect builds a runtime BMDB schema from a raw SQL connection to the BMDB
+// database. You're probably looking for bmdb.Connection.Reflect.
+func Reflect(ctx context.Context, db *sql.DB) (*Schema, error) {
+	// Get all tables in the currently connected to database.
+	rows, err := db.QueryContext(ctx, `
+		SELECT table_name
+		FROM information_schema.tables
+		WHERE table_catalog = current_database()
+          AND table_schema = 'public'
+          AND table_name LIKE 'machine\_%'
+	`)
+	if err != nil {
+		return nil, fmt.Errorf("could not query table names: %w", err)
+	}
+	defer rows.Close()
+
+	// Collect all table names for further processing.
+	var tableNames []string
+	for rows.Next() {
+		var name string
+		if err := rows.Scan(&name); err != nil {
+			return nil, fmt.Errorf("table name scan failed: %w", err)
+		}
+		tableNames = append(tableNames, name)
+	}
+
+	// Start processing each table into a TagType.
+	tags := make([]TagType, 0, len(tableNames))
+	for _, tagName := range tableNames {
+		// Get all columns of the table.
+		rows, err := db.QueryContext(ctx, `
+			SELECT column_name, data_type, udt_name
+			FROM information_schema.columns
+		    WHERE table_catalog = current_database()
+			  AND table_schema = 'public'
+		      AND table_name = $1
+		`, tagName)
+		if err != nil {
+			return nil, fmt.Errorf("could not query columns: %w", err)
+		}
+
+		tag := TagType{
+			NativeName: tagName,
+		}
+
+		// Build field types from columns.
+		foundMachineID := false
+		for rows.Next() {
+			var column_name, data_type, udt_name string
+			if err := rows.Scan(&column_name, &data_type, &udt_name); err != nil {
+				rows.Close()
+				return nil, fmt.Errorf("column scan failed: %w", err)
+			}
+			if column_name == "machine_id" {
+				foundMachineID = true
+				continue
+			}
+			tag.Fields = append(tag.Fields, TagFieldType{
+				NativeName:    column_name,
+				NativeType:    data_type,
+				NativeUDTName: udt_name,
+			})
+		}
+
+		// Make sure there's a machine_id key in the table, then remove it.
+		if !foundMachineID {
+			klog.Warningf("Table %q has no machine_id column, skipping", tag.NativeName)
+			continue
+		}
+
+		tags = append(tags, tag)
+	}
+
+	// Retrieve version information from go-migrate's schema_migrations table.
+	var version string
+	var dirty bool
+	if err := db.QueryRowContext(ctx, "SELECT version, dirty FROM schema_migrations").Scan(&version, &dirty); err != nil {
+		return nil, fmt.Errorf("could not select schema version: %w", err)
+	}
+	if dirty {
+		version += " DIRTY!!!"
+	}
+
+	return &Schema{
+		TagTypes: tags,
+		Version:  version,
+
+		db: db,
+	}, nil
+}