cloud/bmaas: implement BMDB reflection
This is the foundation for runtime introspection of BMDBs, to be used in
debug and operator tooling.
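
For example, debug tooling is expected to use this roughly as follows (a
sketch only; conn is an existing *bmdb.Connection, ctx a caller-provided
context and machineID a uuid.UUID of interest):

  schema, err := conn.Reflect(ctx)
  if err != nil {
      return err
  }
  res, err := schema.GetMachines(ctx, &reflection.GetMachinesOpts{
      FilterMachine: &machineID,
  })
  if err != nil {
      return err
  }
  for _, m := range res.Data {
      for _, tag := range m.Tags {
          for _, f := range tag.Fields {
              fmt.Printf("%s %s.%s = %s\n", m.ID, tag.Type.Name(), f.Type.NativeName, f.HumanValue())
          }
      }
  }
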
Change-Id: Id1eb0cd1dfd94c5d4dafde82448695497525e24f
Reviewed-on: https://review.monogon.dev/c/monogon/+/1131
Tested-by: Jenkins CI
Reviewed-by: Leopold Schabel <leo@monogon.tech>
diff --git a/cloud/bmaas/bmdb/BUILD.bazel b/cloud/bmaas/bmdb/BUILD.bazel
index b89f777..8c67b9c 100644
--- a/cloud/bmaas/bmdb/BUILD.bazel
+++ b/cloud/bmaas/bmdb/BUILD.bazel
@@ -11,6 +11,7 @@
visibility = ["//visibility:public"],
deps = [
"//cloud/bmaas/bmdb/model",
+ "//cloud/bmaas/bmdb/reflection",
"//cloud/lib/component",
"@com_github_cockroachdb_cockroach_go_v2//crdb",
"@com_github_google_uuid//:uuid",
@@ -23,6 +24,7 @@
name = "bmdb_test",
srcs = [
"queries_test.go",
+ "reflection_test.go",
"sessions_test.go",
],
data = [
@@ -31,6 +33,7 @@
embed = [":bmdb"],
deps = [
"//cloud/bmaas/bmdb/model",
+ "//cloud/bmaas/bmdb/reflection",
"//cloud/lib/component",
"@com_github_google_uuid//:uuid",
],
diff --git a/cloud/bmaas/bmdb/connection.go b/cloud/bmaas/bmdb/connection.go
index 3db869d..c07bf15 100644
--- a/cloud/bmaas/bmdb/connection.go
+++ b/cloud/bmaas/bmdb/connection.go
@@ -1,12 +1,14 @@
package bmdb
import (
+ "context"
"database/sql"
"fmt"
"k8s.io/klog/v2"
"source.monogon.dev/cloud/bmaas/bmdb/model"
+ "source.monogon.dev/cloud/bmaas/bmdb/reflection"
)
// Open creates a new Connection to the BMDB for the calling component. Multiple
@@ -58,3 +60,21 @@
// still be false.
InMemory bool
}
+
+// Reflect returns a reflection.Schema as detected by inspecting the table
+// information of this connection to the BMDB. The Schema can then be used to
+// retrieve arbitrary tag/machine information without going through the
+// concurrency/ordering mechanism of the BMDB.
+//
+// This should only be used to implement debugging tooling and should absolutely
+// not be in the path of any user requests.
+//
+// This Connection will be used not only to query the Schema information, but
+// also for all subsequent data retrieval operations on it. Please ensure that
+// the Schema is rebuilt in the event of a database connection failure. Ideally,
+// you should be rebuilding the schema often, to follow what is currently
+// available on the production database - but not for every request. Use a cache
+// or something.
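+//
+// One possible caching pattern (a sketch only - cachedSchema, lastReflect and
+// the refresh interval are caller-side choices, not part of this API):
+//
+// if cachedSchema == nil || time.Since(lastReflect) > time.Minute {
+// s, err := c.Reflect(ctx)
+// if err == nil {
+// cachedSchema, lastReflect = s, time.Now()
+// }
+// }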
+func (c *Connection) Reflect(ctx context.Context) (*reflection.Schema, error) {
+ return reflection.Reflect(ctx, c.db)
+}
diff --git a/cloud/bmaas/bmdb/reflection/BUILD.bazel b/cloud/bmaas/bmdb/reflection/BUILD.bazel
new file mode 100644
index 0000000..f685b98
--- /dev/null
+++ b/cloud/bmaas/bmdb/reflection/BUILD.bazel
@@ -0,0 +1,16 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+ name = "reflection",
+ srcs = [
+ "reflection.go",
+ "schema.go",
+ ],
+ importpath = "source.monogon.dev/cloud/bmaas/bmdb/reflection",
+ visibility = ["//visibility:public"],
+ deps = [
+ "@com_github_google_uuid//:uuid",
+ "@com_github_iancoleman_strcase//:strcase",
+ "@io_k8s_klog_v2//:klog",
+ ],
+)
diff --git a/cloud/bmaas/bmdb/reflection/reflection.go b/cloud/bmaas/bmdb/reflection/reflection.go
new file mode 100644
index 0000000..19bc40a
--- /dev/null
+++ b/cloud/bmaas/bmdb/reflection/reflection.go
@@ -0,0 +1,406 @@
+// Package reflection implements facilities to retrieve information about the
+// implemented Tags and their types from a plain CockroachDB SQL connection,
+// bypassing the queries/types defined in the model package. Then, the retrieved Schema can
+// be used to retrieve information about machines.
+//
+// This is designed to be used in debugging facilities to allow arbitrary machine
+// introspection. It must _not_ be used in the user path, as the schema
+// extraction functionality is implemented best-effort.
+package reflection
+
+import (
+ "context"
+ "database/sql"
+ "encoding/hex"
+ "fmt"
+ "sort"
+ "strings"
+ "time"
+
+ "github.com/google/uuid"
+)
+
+// GetMachinesOpts influences the behaviour of GetMachines.
+type GetMachinesOpts struct {
+ // FilterMachine, if set, will only retrieve information about the machine with
+ // the given UUID. In case the given machine UUID does not exist in the database,
+ // an empty result will be returned and _no_ error will be set.
+ FilterMachine *uuid.UUID
+ // Strict enables strict consistency. This is not recommended for use when
+ // retrieving all machines, as such queries will compete against all currently
+ // running operations. When not enabled, the retrieval will be executed AS OF
+ // SYSTEM TIME follower_read_timestamp(), meaning the data might be a few seconds out
+ // of date. Regardless of the option, the returned machine data will be
+ // internally consistent, even across machines - but when not enabled the data
+ // might be stale.
+ Strict bool
+ // ExpiredBackoffs enables the retrieval of information about all machine
+ // backoffs, including expired ones. Note that expired backoffs might be
+ // garbage collected in the future, and their long-term storage is not
+ // guaranteed.
+ ExpiredBackoffs bool
+}
+
+// GetMachines retrieves all available BMDB data about one or more machines. The
+// Schema's embedded SQL connection is used to perform the retrieval.
+//
+// Options can be specified to influence the exact operation performed. By
+// default (with a zeroed structure or nil pointer), all machines are retrieved
+// with weak consistency, including only their active backoffs. See
+// GetMachinesOpts to influence this behaviour.
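+//
+// For example, to retrieve a single machine including its expired backoffs
+// (a sketch; ctx and mid are assumed to be provided by the caller):
+//
+// res, err := schema.GetMachines(ctx, &GetMachinesOpts{
+// FilterMachine: &mid,
+// ExpiredBackoffs: true,
+// })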
+func (r *Schema) GetMachines(ctx context.Context, opts *GetMachinesOpts) (*Reflected[[]*Machine], error) {
+ if opts == nil {
+ opts = &GetMachinesOpts{}
+ }
+
+ // We're about to build a pretty big SELECT query with a ton of joins.
+ //
+ // First, we join against work_backoff and work to get information about active
+ // work and backoffs on the machines we're retrieving.
+ //
+ // Second, we join against all the tags that are declared in the schema.
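+ //
+ // For a schema with a single tag table (say machine_provided), the resulting
+ // query has roughly this shape (illustrative only - the real query is
+ // assembled below from the reflected schema):
+ //
+ // SELECT machines.machine_id, machines.machine_created_at, ...,
+ // machine_provided.machine_id, machine_provided.provider, ...
+ // FROM machines
+ // LEFT JOIN work_backoff ON machines.machine_id = work_backoff.machine_id AND work_backoff.until > now()
+ // LEFT JOIN work ON machines.machine_id = work.machine_id
+ // LEFT JOIN machine_provided ON machines.machine_id = machine_provided.machine_id
+ // AS OF SYSTEM TIME follower_read_timestamp()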
+
+ // These are the columns we'll SELECT <...> FROM
+ columns := []string{
+ "machines.machine_id",
+ "machines.machine_created_at",
+ "work_backoff.process",
+ "work_backoff.cause",
+ "work_backoff.until",
+ "work.process",
+ "work.session_id",
+ // ... tag columns will come after this.
+ }
+ // These are the args we'll pass to the query.
+ var args []any
+
+ // Start building joins. First, against work_backoff and work.
+ backoffFilter := " AND work_backoff.until > now()"
+ if opts.ExpiredBackoffs {
+ backoffFilter = ""
+ }
+ joins := []string{
+ "LEFT JOIN work_backoff ON machines.machine_id = work_backoff.machine_id" + backoffFilter,
+ "LEFT JOIN work ON machines.machine_id = work.machine_id",
+ }
+
+ // Then, against tags. Also populate columns as we go along.
+ for _, tagType := range r.TagTypes {
+ joins = append(joins, fmt.Sprintf("LEFT JOIN %s ON machines.machine_id = %s.machine_id", tagType.NativeName, tagType.NativeName))
+ columns = append(columns, fmt.Sprintf("%s.machine_id", tagType.NativeName))
+ for _, fieldType := range tagType.Fields {
+ columns = append(columns, fmt.Sprintf("%s.%s", tagType.NativeName, fieldType.NativeName))
+ }
+ }
+
+ // Finalize query.
+ q := []string{
+ "SELECT",
+ strings.Join(columns, ", "),
+ "FROM machines",
+ }
+ q = append(q, joins...)
+ if !opts.Strict {
+ q = append(q, "AS OF SYSTEM TIME follower_read_timestamp()")
+ }
+ if opts.FilterMachine != nil {
+ q = append(q, "WHERE machines.machine_id = $1")
+ args = append(args, *opts.FilterMachine)
+ }
+
+ rows, err := r.db.QueryContext(ctx, strings.Join(q, "\n"), args...)
+ if err != nil {
+ return nil, fmt.Errorf("query failed: %w", err)
+ }
+ defer rows.Close()
+
+ // Okay, we can start scanning the result rows.
+ //
+ // As this is a complex join, we need to merge some rows together and discard
+ // some NULLs. We do merging/deduplication using machine_id values for the
+ // machine data, and abuse UNIQUE constraints in the work_backoff/work tables to
+ // deduplicate these.
+ //
+ // The alternative would be to rewrite this query to use array_agg, and we might
+ // do that at some point. This is only really a problem if we
+ // have _a lot_ of active work/backoffs (as that effectively duplicates all
+ // machine/tag data), which isn't the case yet. But we should keep an eye out for
+ // this.
+
+ var machines []*Machine
+ for rows.Next() {
+
+ // We need to scan this row back into columns. For the fixed (non-tag) columns
+ // we just declare the destination variables here and refer to them later.
+ var dests []any
+
+ // Add destinations for the non-tag, always-retrieved columns.
+ var mid uuid.UUID
+ var machineCreated time.Time
+ var workSession uuid.NullUUID
+ var backoffProcess, backoffCause, workProcess sql.NullString
+ var backoffUntil sql.NullTime
+
+ dests = append(dests, &mid, &machineCreated, &backoffProcess, &backoffCause, &backoffUntil, &workProcess, &workSession)
+
+ // For dynamic data, we need to keep a reference to a list of columns that are
+ // part of tags, and then refer to them later. We can't just refer back to dests
+ // as the types are erased into `any`. scannedTags is that data storage.
+ type scannedTag struct {
+ ty *TagType
+ id uuid.NullUUID
+ fields []*TagField
+ }
+ var scannedTags []*scannedTag
+ for _, tagType := range r.TagTypes {
+ tagType := tagType
+ st := scannedTag{
+ ty: &tagType,
+ }
+ scannedTags = append(scannedTags, &st)
+ dests = append(dests, &st.id)
+ for _, fieldType := range tagType.Fields {
+ fieldType := fieldType
+ field := TagField{
+ Type: &fieldType,
+ }
+ dests = append(dests, &field)
+ st.fields = append(st.fields, &field)
+
+ }
+ }
+
+ if err := rows.Scan(dests...); err != nil {
+ return nil, fmt.Errorf("scan failed: %w", err)
+ }
+
+ // Now comes the merging/deduplication.
+
+ // First, check if we are processing a new machine. If so, create a new
+ // Machine. Otherwise, pick up the previous one.
+ var machine *Machine
+ if len(machines) == 0 || machines[len(machines)-1].ID.String() != mid.String() {
+ // New machine or no machine yet.
+ machine = &Machine{
+ ID: mid,
+ Created: machineCreated,
+ Tags: make(map[string]Tag),
+ Backoffs: make(map[string]Backoff),
+ Work: make(map[string]Work),
+ }
+
+ // Collect tags into machine.
+ for _, st := range scannedTags {
+ if !st.id.Valid {
+ continue
+ }
+ var fields []TagField
+ for _, f := range st.fields {
+ fields = append(fields, *f)
+ }
+ machine.Tags[st.ty.Name()] = Tag{
+ Type: st.ty,
+ Fields: fields,
+ }
+ }
+ machines = append(machines, machine)
+ } else {
+ // Continue previous machine.
+ machine = machines[len(machines)-1]
+ }
+
+ // Do we have a backoff? Upsert it to the machine. This works because there's a
+ // UNIQUE(machine_id, process) constraint on the work_backoff table, and we're
+ // effectively rebuilding that keyspace here by indexing first by machine then by
+ // process.
+ if backoffCause.Valid && backoffProcess.Valid && backoffUntil.Valid {
+ process := backoffProcess.String
+ machine.Backoffs[process] = Backoff{
+ Cause: backoffCause.String,
+ Process: process,
+ Until: backoffUntil.Time,
+ }
+ }
+
+ // Do we have an active work item? Upsert it to the machine. Same UNIQUE
+ // constraint abuse happening here.
+ if workProcess.Valid && workSession.Valid {
+ process := workProcess.String
+ machine.Work[process] = Work{
+ SessionID: workSession.UUID,
+ Process: process,
+ }
+ }
+ }
+
+ return &Reflected[[]*Machine]{
+ Data: machines,
+ Query: strings.Join(q, " "),
+ }, nil
+}
+
+// Reflected wraps data retrieved by reflection (T) with metadata about the
+// retrieval.
+type Reflected[T any] struct {
+ Data T
+ // Effective SQL query performed on the database.
+ Query string
+}
+
+// Machine retrieved from BMDB.
+type Machine struct {
+ ID uuid.UUID
+ Created time.Time
+
+ // Tags on this machine, keyed by Tag type name (canonical, not native).
+ Tags map[string]Tag
+
+ // Backoffs on this machine, keyed by process name. By default these are only
+ // active backoffs, unless ExpiredBackoffs was set on GetMachinesOpts.
+ Backoffs map[string]Backoff
+
+ // Work active on this machine, keyed by process name.
+ Work map[string]Work
+}
+
+// ActiveBackoffs retrieves a copy of a Machine's active backoffs. Note: the
+// expiration check is performed according to the current system time, so it might
+// not be consistent with the data snapshot retrieved from the database.
+func (r *Machine) ActiveBackoffs() []*Backoff {
+ var res []*Backoff
+ for _, bo := range r.Backoffs {
+ bo := bo
+ if !bo.Active() {
+ continue
+ }
+ res = append(res, &bo)
+ }
+ sort.Slice(res, func(i, j int) bool { return res[i].Process < res[j].Process })
+ return res
+}
+
+// ExpiredBackoffs retrieves a copy of a Machine's expired backoffs. Note: the
+// expiration check is performed according to the current system time, so it might
+// not be consistent with the data snapshot retrieved from the database.
+func (r *Machine) ExpiredBackoffs() []*Backoff {
+ var res []*Backoff
+ for _, bo := range r.Backoffs {
+ bo := bo
+ if bo.Active() {
+ continue
+ }
+ res = append(res, &bo)
+ }
+ sort.Slice(res, func(i, j int) bool { return res[i].Process < res[j].Process })
+ return res
+}
+
+// Tag value set on a Machine.
+type Tag struct {
+ // Type describing this tag.
+ Type *TagType
+ // Field data contained in this tag, sorted alphabetically by name.
+ Fields []TagField
+}
+
+// Field is a shorthand for returning a TagField by its name.
+func (r *Tag) Field(name string) *TagField {
+ for _, f := range r.Fields {
+ if f.Type.NativeName == name {
+ return &f
+ }
+ }
+ return nil
+}
+
+// TagField value which is part of a Tag set on a Machine.
+type TagField struct {
+ // Type describing this field.
+ Type *TagFieldType
+
+ text *string
+ bytes *[]byte
+ time *time.Time
+}
+
+// HumanValue returns a human-readable (best effort) representation of the field
+// value.
+func (r *TagField) HumanValue() string {
+ switch {
+ case r.text != nil:
+ return *r.text
+ case r.bytes != nil:
+ return hex.EncodeToString(*r.bytes)
+ case r.time != nil:
+ return r.time.String()
+ default:
+ return "<unknown>"
+ }
+}
+
+// Backoff on a Machine.
+type Backoff struct {
+ // Process which established Backoff.
+ Process string
+ // Time when Backoff expires.
+ Until time.Time
+ // Cause for the Backoff as emitted by worker.
+ Cause string
+}
+
+// Active returns whether this Backoff is _currently_ active per the _local_ time.
+func (r Backoff) Active() bool {
+ return time.Now().Before(r.Until)
+}
+
+// Work being actively performed on a Machine.
+type Work struct {
+ // SessionID of the worker performing this Work.
+ SessionID uuid.UUID
+ // Process name of this Work.
+ Process string
+}
+
+// Scan implements sql.Scanner for direct scanning of query results into a
+// reflected tag value. This method is not meant to be used outside the
+// reflection package.
+func (r *TagField) Scan(src any) error {
+ if src == nil {
+ return nil
+ }
+
+ switch r.Type.NativeType {
+ case "text":
+ src2, ok := src.(string)
+ if !ok {
+ return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src)
+ }
+ r.text = &src2
+ case "bytea":
+ src2, ok := src.([]byte)
+ if !ok {
+ return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src)
+ }
+ r.bytes = &src2
+ case "USER-DEFINED":
+ switch r.Type.NativeUDTName {
+ case "provider":
+ src2, ok := src.([]byte)
+ if !ok {
+ return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src)
+ }
+ src3 := string(src2)
+ r.text = &src3
+ }
+ case "timestamp with time zone":
+ src2, ok := src.(time.Time)
+ if !ok {
+ return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src)
+ }
+ r.time = &src2
+ default:
+ return fmt.Errorf("unimplemented SQL type %q", r.Type.NativeType)
+ }
+
+ return nil
+}
diff --git a/cloud/bmaas/bmdb/reflection/schema.go b/cloud/bmaas/bmdb/reflection/schema.go
new file mode 100644
index 0000000..a29c16b
--- /dev/null
+++ b/cloud/bmaas/bmdb/reflection/schema.go
@@ -0,0 +1,169 @@
+package reflection
+
+import (
+ "context"
+ "database/sql"
+ "fmt"
+ "strings"
+
+ "github.com/iancoleman/strcase"
+ "k8s.io/klog/v2"
+)
+
+// Schema contains information about the tag types in a BMDB.
+//
+// It also contains an embedded connection to the CockroachDB database backing
+// this BMDB, which is then used to retrieve data described by this schema.
+type Schema struct {
+ // TagTypes is the list of tag types extracted from the BMDB.
+ TagTypes []TagType
+ // Version is the go-migrate schema version of the BMDB this schema was extracted
+ // from. By convention, it is a stringified base-10 number representing the number
+ // of seconds since UNIX epoch of when the migration version was created, but
+ // this is not guaranteed.
+ Version string
+
+ db *sql.DB
+}
+
+// TagType describes the type of a BMDB Tag. Each tag in turn corresponds to a
+// CockroachDB table.
+type TagType struct {
+ // NativeName is the name of the table that holds tags of this type.
+ NativeName string
+ // Fields are the types of fields contained in this tag type.
+ Fields []TagFieldType
+}
+
+// Name returns the canonical name of this tag type. For example, a table named
+// machine_agent_started will have a canonical name AgentStarted.
+func (r *TagType) Name() string {
+ tableSuffix := strings.TrimPrefix(r.NativeName, "machine_")
+ parts := strings.Split(tableSuffix, "_")
+ // Capitalize some known acronyms.
+ for i, p := range parts {
+ parts[i] = strings.ReplaceAll(p, "os", "OS")
+ }
+ return strcase.ToCamel(strings.Join(parts, "_"))
+}
+
+// TagFieldType is the type of a field within a BMDB Tag. Each tag field in turn
+// corresponds to a column inside its Tag table.
+type TagFieldType struct {
+ // NativeName is the name of the column that holds this field type. It is also
+ // the canonical name of the field type.
+ NativeName string
+ // NativeType is the CockroachDB type name of this field.
+ NativeType string
+ // NativeUDTName is the CockroachDB user-defined-type name of this field. This is
+ // only valid if NativeType is 'USER-DEFINED'.
+ NativeUDTName string
+}
+
+// HumanType returns a human-readable representation of the field's type. This is
+// not well-defined, and should be used only informatively.
+func (r *TagFieldType) HumanType() string {
+ switch r.NativeType {
+ case "USER-DEFINED":
+ return r.NativeUDTName
+ case "timestamp with time zone":
+ return "timestamp"
+ case "bytea":
+ return "bytes"
+ default:
+ return r.NativeType
+ }
+}
+
+// Reflect builds a runtime BMDB schema from a raw SQL connection to the BMDB
+// database. You're probably looking for bmdb.Connection.Reflect.
+func Reflect(ctx context.Context, db *sql.DB) (*Schema, error) {
+ // Get all tag tables (machine_*) in the currently connected database.
+ rows, err := db.QueryContext(ctx, `
+ SELECT table_name
+ FROM information_schema.tables
+ WHERE table_catalog = current_database()
+ AND table_schema = 'public'
+ AND table_name LIKE 'machine\_%'
+ `)
+ if err != nil {
+ return nil, fmt.Errorf("could not query table names: %w", err)
+ }
+ defer rows.Close()
+
+ // Collect all table names for further processing.
+ var tableNames []string
+ for rows.Next() {
+ var name string
+ if err := rows.Scan(&name); err != nil {
+ return nil, fmt.Errorf("table name scan failed: %w", err)
+ }
+ tableNames = append(tableNames, name)
+ }
+
+ // Start processing each table into a TagType.
+ tags := make([]TagType, 0, len(tableNames))
+ for _, tagName := range tableNames {
+ // Get all columns of the table.
+ rows, err := db.QueryContext(ctx, `
+ SELECT column_name, data_type, udt_name
+ FROM information_schema.columns
+ WHERE table_catalog = current_database()
+ AND table_schema = 'public'
+ AND table_name = $1
+ `, tagName)
+ if err != nil {
+ return nil, fmt.Errorf("could not query columns: %w", err)
+ }
+
+ tag := TagType{
+ NativeName: tagName,
+ }
+
+ // Build field types from columns.
+ foundMachineID := false
+ for rows.Next() {
+ var column_name, data_type, udt_name string
+ if err := rows.Scan(&column_name, &data_type, &udt_name); err != nil {
+ rows.Close()
+ return nil, fmt.Errorf("column scan failed: %w", err)
+ }
+ if column_name == "machine_id" {
+ foundMachineID = true
+ continue
+ }
+ tag.Fields = append(tag.Fields, TagFieldType{
+ NativeName: column_name,
+ NativeType: data_type,
+ NativeUDTName: udt_name,
+ })
+ }
+
+ // Skip tables without a machine_id column; the column itself was already skipped when collecting fields above.
+ if !foundMachineID {
+ klog.Warningf("Table %q has no machine_id column, skipping", tag.NativeName)
+ continue
+ }
+
+ tags = append(tags, tag)
+ }
+
+ // Retrieve version information from go-migrate's schema_migrations table.
+ var version string
+ var dirty bool
+ if err := db.QueryRowContext(ctx, "SELECT version, dirty FROM schema_migrations").Scan(&version, &dirty); err != nil {
+ return nil, fmt.Errorf("could not select schema version: %w", err)
+ }
+ if dirty {
+ version += " DIRTY!!!"
+ }
+
+ return &Schema{
+ TagTypes: tags,
+ Version: version,
+
+ db: db,
+ }, nil
+}
diff --git a/cloud/bmaas/bmdb/reflection_test.go b/cloud/bmaas/bmdb/reflection_test.go
new file mode 100644
index 0000000..73aa397
--- /dev/null
+++ b/cloud/bmaas/bmdb/reflection_test.go
@@ -0,0 +1,209 @@
+package bmdb
+
+import (
+ "context"
+ "fmt"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/google/uuid"
+
+ "source.monogon.dev/cloud/bmaas/bmdb/model"
+ "source.monogon.dev/cloud/bmaas/bmdb/reflection"
+)
+
+// TestReflection exercises the BMDB schema reflection and data
+// retrieval code. Ideally this code would live in //cloud/bmaas/bmdb/reflection,
+// but due to namespacing issues it lives here.
+func TestReflection(t *testing.T) {
+ b := dut()
+ conn, err := b.Open(true)
+ if err != nil {
+ t.Fatalf("Open failed: %v", err)
+ }
+
+ ctx, ctxC := context.WithCancel(context.Background())
+ defer ctxC()
+
+ sess, err := conn.StartSession(ctx)
+ if err != nil {
+ t.Fatalf("StartSession: %v", err)
+ }
+
+ // Create 10 test machines.
+ var mids []uuid.UUID
+ err = sess.Transact(ctx, func(q *model.Queries) error {
+ for i := 0; i < 10; i += 1 {
+ mach, err := q.NewMachine(ctx)
+ if err != nil {
+ return err
+ }
+ err = q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
+ MachineID: mach.MachineID,
+ Provider: model.ProviderEquinix,
+ ProviderID: fmt.Sprintf("test-%d", i),
+ })
+ if err != nil {
+ return err
+ }
+ mids = append(mids, mach.MachineID)
+ }
+ return nil
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ // Start and fail work on one of the machines with an hour-long backoff.
+ w, err := sess.Work(ctx, model.ProcessUnitTest1, func(q *model.Queries) ([]uuid.UUID, error) {
+ return mids[0:1], nil
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ to := time.Hour
+ w.Fail(ctx, &to, "failure test")
+
+ // On another machine, create a failure with a 1 second backoff.
+ w, err = sess.Work(ctx, model.ProcessUnitTest1, func(q *model.Queries) ([]uuid.UUID, error) {
+ return mids[1:2], nil
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ to = time.Second
+ w.Fail(ctx, &to, "failure test")
+ // Later on in the test we must wait for this backoff to actually elapse. Start
+ // counting now.
+ elapsed := time.NewTicker(to * 1)
+ defer elapsed.Stop()
+
+ // On another machine, create work and don't finish it yet.
+ _, err = sess.Work(ctx, model.ProcessUnitTest1, func(q *model.Queries) ([]uuid.UUID, error) {
+ return mids[2:3], nil
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ schema, err := conn.Reflect(ctx)
+ if err != nil {
+ t.Fatalf("ReflectTagTypes: %v", err)
+ }
+
+ // Dump all in strict mode.
+ opts := &reflection.GetMachinesOpts{
+ Strict: true,
+ }
+ res, err := schema.GetMachines(ctx, opts)
+ if err != nil {
+ t.Fatalf("Dump failed: %v", err)
+ }
+ if res.Query == "" {
+ t.Errorf("Query not set on result")
+ }
+ machines := res.Data
+ if want, got := 10, len(machines); want != got {
+ t.Fatalf("Expected %d machines in dump, got %d", want, got)
+ }
+
+ // Expect Provided tag on all machines. Do a detailed check on fields, too.
+ for _, machine := range machines {
+ tag, ok := machine.Tags["Provided"]
+ if !ok {
+ t.Errorf("No Provided tag on machine.")
+ continue
+ }
+ if want, got := "Provided", tag.Type.Name(); want != got {
+ t.Errorf("Provided tag should have type %q, got %q", want, got)
+ }
+ if provider := tag.Field("provider"); provider != nil {
+ if want, got := "Equinix", provider.HumanValue(); want != got {
+ t.Errorf("Wanted Provided.provider value %q, got %q", want, got)
+ }
+ } else {
+ t.Errorf("Provider tag has no provider field")
+ }
+ if providerId := tag.Field("provider_id"); providerId != nil {
+ if !strings.HasPrefix(providerId.HumanValue(), "test-") {
+ t.Errorf("Unexpected provider_id value %q", providerId.HumanValue())
+ }
+ } else {
+ t.Errorf("Provider tag has no provider_id field")
+ }
+ }
+
+ // Now just dump one machine.
+ opts.FilterMachine = &mids[0]
+ res, err = schema.GetMachines(ctx, opts)
+ if err != nil {
+ t.Fatalf("Dump failed: %v", err)
+ }
+ machines = res.Data
+ if want, got := 1, len(machines); want != got {
+ t.Fatalf("Expected %d machines in dump, got %d", want, got)
+ }
+ if want, got := mids[0].String(), machines[0].ID.String(); want != got {
+ t.Fatalf("Expected machine %s, got %s", want, got)
+ }
+
+ // Now dump a machine that doesn't exist. That should just return an empty list.
+ fakeMid := uuid.New()
+ opts.FilterMachine = &fakeMid
+ res, err = schema.GetMachines(ctx, opts)
+ if err != nil {
+ t.Fatalf("Dump failed: %v", err)
+ }
+ machines = res.Data
+ if want, got := 0, len(machines); want != got {
+ t.Fatalf("Expected %d machines in dump, got %d", want, got)
+ }
+
+ // Now check the special-case machines. The first one should have an active
+ // backoff.
+ opts.FilterMachine = &mids[0]
+ res, err = schema.GetMachines(ctx, opts)
+ if err != nil {
+ t.Errorf("Dump failed: %v", err)
+ } else {
+ machine := res.Data[0]
+ if _, ok := machine.Backoffs["UnitTest1"]; !ok {
+ t.Errorf("Expected UnitTest1 backoff on machine")
+ }
+ }
+ // The second one should have an expired backoff that shouldn't be reported in a
+ // normal call.
+ <-elapsed.C
+ opts.FilterMachine = &mids[1]
+ res, err = schema.GetMachines(ctx, opts)
+ if err != nil {
+ t.Errorf("Dump failed: %v", err)
+ } else {
+ machine := res.Data[0]
+ if _, ok := machine.Backoffs["UnitTest1"]; ok {
+ t.Errorf("Expected no UnitTest1 backoff on machine")
+ }
+ }
+ // But if we ask for expired backoffs, we should get it.
+ opts.ExpiredBackoffs = true
+ res, err = schema.GetMachines(ctx, opts)
+ if err != nil {
+ t.Errorf("Dump failed: %v", err)
+ } else {
+ machine := res.Data[0]
+ if _, ok := machine.Backoffs["UnitTest1"]; !ok {
+ t.Errorf("Expected UnitTest1 backoff on machine")
+ }
+ }
+ // Finally, the third machine should have an active Work item.
+ opts.FilterMachine = &mids[2]
+ res, err = schema.GetMachines(ctx, opts)
+ if err != nil {
+ t.Errorf("Dump failed: %v", err)
+ } else {
+ machine := res.Data[0]
+ if _, ok := machine.Work["UnitTest1"]; !ok {
+ t.Errorf("Expected UnitTest1 work item on machine")
+ }
+ }
+}
diff --git a/go.mod b/go.mod
index 44db341..4c48b0e 100644
--- a/go.mod
+++ b/go.mod
@@ -88,6 +88,7 @@
github.com/google/gopacket v1.1.19
github.com/google/nftables v0.0.0-20220221214239-211824995dcb
github.com/google/uuid v1.3.0
+ github.com/iancoleman/strcase v0.2.0
github.com/improbable-eng/grpc-web v0.15.0
github.com/insomniacslk/dhcp v0.0.0-20220119180841-3c283ff8b7dd
github.com/joho/godotenv v1.4.0
diff --git a/go.sum b/go.sum
index f5c166d..1cefa75 100644
--- a/go.sum
+++ b/go.sum
@@ -1229,6 +1229,7 @@
github.com/huandu/xstrings v1.2.0/go.mod h1:DvyZB1rfVYsBIigL8HwpZgxHwXozlTgGqn63UyNX5k4=
github.com/hudl/fargo v1.3.0/go.mod h1:y3CKSmjA+wD2gak7sUSXTAoopbhU08POFhmITJgmKTg=
github.com/hugelgupf/socketpair v0.0.0-20190730060125-05d35a94e714/go.mod h1:2Goc3h8EklBH5mspfHFxBnEoURQCGzQQH1ga9Myjvis=
+github.com/iancoleman/strcase v0.2.0 h1:05I4QRnGpI0m37iZQRuskXh+w77mr6Z41lwQzuHLwW0=
github.com/iancoleman/strcase v0.2.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=