// Package reflection implements facilities to retrieve information about the
// implemented Tags and their types from a plain CockroachDB SQL connection,
// bypassing the queries/types defined in models. The retrieved Schema can then
// be used to inspect machines.
//
// This is designed to be used in debugging facilities to allow arbitrary
// machine introspection. It must _not_ be used in the user path, as the schema
// extraction functionality is implemented on a best-effort basis.
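//
// A minimal usage sketch (schema is assumed to be a *Schema constructed
// elsewhere in this package; the constructor is not shown here):
//
//	res, err := schema.GetMachines(ctx, nil)
//	if err != nil {
//		return err
//	}
//	for _, m := range res.Data {
//		fmt.Printf("%s: %d tags, %d backoffs\n", m.ID, len(m.Tags), len(m.Backoffs))
//	}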
 | package reflection | 
 |  | 
 | import ( | 
 | 	"context" | 
 | 	"database/sql" | 
 | 	"encoding/hex" | 
 | 	"fmt" | 
 | 	"sort" | 
 | 	"strings" | 
 | 	"time" | 
 |  | 
 | 	"k8s.io/klog/v2" | 
 |  | 
 | 	"github.com/google/uuid" | 
 | 	"google.golang.org/protobuf/encoding/prototext" | 
 | 	"google.golang.org/protobuf/proto" | 
 | 	"google.golang.org/protobuf/reflect/protopath" | 
 | 	"google.golang.org/protobuf/reflect/protorange" | 
 | ) | 
 |  | 
 | // GetMachinesOpts influences the behaviour of GetMachines. | 
 | type GetMachinesOpts struct { | 
	// FilterMachine, if set, limits the retrieval to the machine with the given
	// UUID. In case the given machine UUID does not exist in the database, an
	// empty result will be returned and _no_ error will be set.
 | 	FilterMachine *uuid.UUID | 
 | 	// Strict enables strict consistency. This is not recommended for use when | 
 | 	// retrieving all machines, as such queries will compete against all currently | 
	// running operations. When not enabled, the retrieval will be executed AS OF
	// SYSTEM TIME follower_read_timestamp(), meaning the data might be a few
	// seconds out of date. Regardless of the option, the returned machine data
	// will be internally consistent, even across machines - but when not enabled
	// the data might be stale.
 | 	Strict bool | 
	// ExpiredBackoffs enables the retrieval of information about all machine
	// backoffs, including expired ones. Note that expired backoffs might be
 | 	// garbage collected in the future, and their long-term storage is not | 
 | 	// guaranteed. | 
 | 	ExpiredBackoffs bool | 
 | } | 
 |  | 
// GetMachines retrieves all available BMDB data about one or more machines. The
// Schema's embedded SQL connection is used to perform the retrieval.
//
// Options can be specified to influence the exact operation performed. By
// default (with a zeroed structure or nil pointer), all machines are retrieved
// (carrying only their active backoffs) with weak consistency. See
// GetMachinesOpts to influence this behaviour.
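//
// For example, to fetch a single machine with strict consistency, including its
// expired backoffs (a sketch; id is assumed to be a uuid.UUID already in hand):
//
//	res, err := schema.GetMachines(ctx, &GetMachinesOpts{
//		FilterMachine:   &id,
//		Strict:          true,
//		ExpiredBackoffs: true,
//	})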
 | func (r *Schema) GetMachines(ctx context.Context, opts *GetMachinesOpts) (*Reflected[[]*Machine], error) { | 
 | 	if opts == nil { | 
 | 		opts = &GetMachinesOpts{} | 
 | 	} | 
 |  | 
 | 	// We're about to build a pretty big SELECT query with a ton of joins. | 
 | 	// | 
 | 	// First, we join against work_backoff and work to get information about active | 
 | 	// work and backoffs on the machines we're retrieving. | 
 | 	// | 
 | 	// Second, we join against all the tags that are declared in the schema. | 
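	//
	// For illustration, with a single declared tag `foo` carrying a single field
	// `bar` (hypothetical names), the assembled query has roughly this shape
	// (without FilterMachine, and with weak consistency):
	//
	//   SELECT
	//     machines.machine_id, machines.machine_created_at,
	//     work_backoff.process, work_backoff.cause, work_backoff.until,
	//     work.process, work.session_id,
	//     foo.machine_id, foo.bar
	//   FROM machines
	//   LEFT JOIN work_backoff ON machines.machine_id = work_backoff.machine_id AND work_backoff.until > now()
	//   LEFT JOIN work ON machines.machine_id = work.machine_id
	//   LEFT JOIN foo ON machines.machine_id = foo.machine_id
	//   AS OF SYSTEM TIME follower_read_timestamp()
	//   ORDER BY machines.machine_id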
 |  | 
	// These are the columns we'll SELECT <...> FROM
 | 	columns := []string{ | 
 | 		"machines.machine_id", | 
 | 		"machines.machine_created_at", | 
 | 		"work_backoff.process", | 
 | 		"work_backoff.cause", | 
 | 		"work_backoff.until", | 
 | 		"work.process", | 
 | 		"work.session_id", | 
 | 		// ... tag columns will come after this. | 
 | 	} | 
	// These are the arguments we'll pass to the query.
 | 	var args []any | 
 |  | 
 | 	// Start building joins. First, against work_backoff and work. | 
 | 	backoffFilter := " AND work_backoff.until > now()" | 
 | 	if opts.ExpiredBackoffs { | 
 | 		backoffFilter = "" | 
 | 	} | 
 | 	joins := []string{ | 
 | 		"LEFT JOIN work_backoff ON machines.machine_id = work_backoff.machine_id" + backoffFilter, | 
 | 		"LEFT JOIN work ON machines.machine_id = work.machine_id", | 
 | 	} | 
 |  | 
 | 	// Then, against tags. Also populate columns as we go along. | 
 | 	for _, tagType := range r.TagTypes { | 
 | 		joins = append(joins, fmt.Sprintf("LEFT JOIN %s ON machines.machine_id = %s.machine_id", tagType.NativeName, tagType.NativeName)) | 
 | 		columns = append(columns, fmt.Sprintf("%s.machine_id", tagType.NativeName)) | 
 | 		for _, fieldType := range tagType.Fields { | 
 | 			columns = append(columns, fmt.Sprintf("%s.%s", tagType.NativeName, fieldType.NativeName)) | 
 | 		} | 
 | 	} | 
 |  | 
 | 	// Finalize query. | 
 | 	q := []string{ | 
 | 		"SELECT", | 
 | 		strings.Join(columns, ", "), | 
 | 		"FROM machines", | 
 | 	} | 
 | 	q = append(q, joins...) | 
 | 	if !opts.Strict { | 
 | 		q = append(q, "AS OF SYSTEM TIME follower_read_timestamp()") | 
 | 	} | 
 | 	if opts.FilterMachine != nil { | 
 | 		q = append(q, "WHERE machines.machine_id = $1") | 
 | 		args = append(args, *opts.FilterMachine) | 
 | 	} | 
 | 	q = append(q, "ORDER BY machines.machine_id") | 
 |  | 
 | 	rows, err := r.db.QueryContext(ctx, strings.Join(q, "\n"), args...) | 
 | 	if err != nil { | 
 | 		return nil, fmt.Errorf("query failed: %w", err) | 
 | 	} | 
 | 	defer rows.Close() | 
 |  | 
 | 	// Okay, we can start scanning the result rows. | 
 | 	// | 
 | 	// As this is a complex join, we need to merge some rows together and discard | 
 | 	// some NULLs. We do merging/deduplication using machine_id values for the | 
 | 	// machine data, and abuse UNIQUE constraints in the work_backoff/work tables to | 
 | 	// deduplicate these. | 
 | 	// | 
	// The alternative would be to rewrite this query to use array_agg, and we
	// might do that at some point. This is only really a problem if we have _a
	// lot_ of active work/backoffs (as that effectively duplicates all
	// machine/tag data), which isn't the case yet. But we should keep an eye out
	// for this.
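	//
	// As a concrete example of the duplication: a machine with two backoffs and
	// one active work item comes back as two rows (one per backoff, each also
	// carrying the work item's columns), with the machine and tag columns
	// repeated in both. The loop below folds these back into a single Machine
	// with two Backoffs entries and one Work entry.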
 |  | 
 | 	var machines []*Machine | 
 | 	for rows.Next() { | 
 |  | 
		// We need to scan this row back into columns. For the fixed (non-tag)
		// columns we'll just create the data here and refer to it later.
 | 		var dests []any | 
 |  | 
		// Add the always-retrieved non-tag columns.
 | 		var mid uuid.UUID | 
 | 		var machineCreated time.Time | 
 | 		var workSession uuid.NullUUID | 
 | 		var backoffProcess, backoffCause, workProcess sql.NullString | 
 | 		var backoffUntil sql.NullTime | 
 |  | 
 | 		dests = append(dests, &mid, &machineCreated, &backoffProcess, &backoffCause, &backoffUntil, &workProcess, &workSession) | 
 |  | 
 | 		// For dynamic data, we need to keep a reference to a list of columns that are | 
 | 		// part of tags, and then refer to them later. We can't just refer back to dests | 
 | 		// as the types are erased into `any`. scannedTags is that data storage. | 
 | 		type scannedTag struct { | 
 | 			ty     *TagType | 
 | 			id     uuid.NullUUID | 
 | 			fields []*TagField | 
 | 		} | 
 | 		var scannedTags []*scannedTag | 
 | 		for _, tagType := range r.TagTypes { | 
 | 			tagType := tagType | 
 | 			st := scannedTag{ | 
 | 				ty: &tagType, | 
 | 			} | 
 | 			scannedTags = append(scannedTags, &st) | 
 | 			dests = append(dests, &st.id) | 
 | 			for _, fieldType := range tagType.Fields { | 
 | 				fieldType := fieldType | 
 | 				field := TagField{ | 
 | 					Type: &fieldType, | 
 | 				} | 
				dests = append(dests, &field)
				st.fields = append(st.fields, &field)
			}
 | 		} | 
 |  | 
 | 		if err := rows.Scan(dests...); err != nil { | 
 | 			return nil, fmt.Errorf("scan failed: %w", err) | 
 | 		} | 
 |  | 
 | 		// Now comes the merging/deduplication. | 
 |  | 
 | 		// First, check if we are processing a new machine. If so, create a new | 
 | 		// Machine. Otherwise, pick up the previous one. | 
 | 		var machine *Machine | 
 | 		if len(machines) == 0 || machines[len(machines)-1].ID.String() != mid.String() { | 
 | 			// New machine or no machine yet. | 
 | 			machine = &Machine{ | 
 | 				ID:       mid, | 
 | 				Created:  machineCreated, | 
 | 				Tags:     make(map[string]*Tag), | 
 | 				Backoffs: make(map[string]Backoff), | 
 | 				Work:     make(map[string]Work), | 
 | 			} | 
 |  | 
 | 			// Collect tags into machine. | 
 | 			for _, st := range scannedTags { | 
 | 				if !st.id.Valid { | 
 | 					continue | 
 | 				} | 
 | 				var fields []TagField | 
 | 				for _, f := range st.fields { | 
 | 					fields = append(fields, *f) | 
 | 				} | 
 | 				machine.Tags[st.ty.Name()] = &Tag{ | 
 | 					Type:   st.ty, | 
 | 					Fields: fields, | 
 | 				} | 
 | 			} | 
 | 			machines = append(machines, machine) | 
 | 		} else { | 
 | 			// Continue previous machine. | 
 | 			machine = machines[len(machines)-1] | 
 | 		} | 
 |  | 
 | 		// Do we have a backoff? Upsert it to the machine. This works because there's a | 
 | 		// UNIQUE(machine_id, process) constraint on the work_backoff table, and we're | 
 | 		// effectively rebuilding that keyspace here by indexing first by machine then by | 
 | 		// process. | 
 | 		if backoffCause.Valid && backoffProcess.Valid && backoffUntil.Valid { | 
 | 			process := backoffProcess.String | 
 | 			machine.Backoffs[process] = Backoff{ | 
 | 				Cause:   backoffCause.String, | 
 | 				Process: process, | 
 | 				Until:   backoffUntil.Time, | 
 | 			} | 
 | 		} | 
 |  | 
 | 		// Do we have an active work item? Upsert it to the machine. Same UNIQUE | 
 | 		// constraint abuse happening here. | 
 | 		if workProcess.Valid && workSession.Valid { | 
 | 			process := workProcess.String | 
 | 			machine.Work[process] = Work{ | 
 | 				SessionID: workSession.UUID, | 
 | 				Process:   process, | 
 | 			} | 
 | 		} | 
 | 	} | 
 |  | 
 | 	return &Reflected[[]*Machine]{ | 
 | 		Data:  machines, | 
 | 		Query: strings.Join(q, " "), | 
 | 	}, nil | 
 | } | 
 |  | 
 | // Reflected wraps data retrieved by reflection (T) with metadata about the | 
 | // retrieval. | 
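//
// The Query field can be surfaced in debugging output, e.g. (a sketch; schema
// and ctx as in the package example above):
//
//	res, err := schema.GetMachines(ctx, nil)
//	if err == nil {
//		klog.Infof("reflection query: %s", res.Query)
//	}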
 | type Reflected[T any] struct { | 
 | 	Data T | 
 | 	// Effective SQL query performed on the database. | 
 | 	Query string | 
 | } | 
 |  | 
 | // Machine retrieved from BMDB. | 
 | type Machine struct { | 
 | 	ID      uuid.UUID | 
 | 	Created time.Time | 
 |  | 
 | 	// Tags on this machine, keyed by Tag type name (canonical, not native). | 
 | 	Tags map[string]*Tag | 
 |  | 
	// Backoffs on this machine, keyed by process name. By default these are only
	// active backoffs, unless ExpiredBackoffs was set on GetMachinesOpts.
 | 	Backoffs map[string]Backoff | 
 |  | 
 | 	// Work active on this machine, keyed by process name. | 
 | 	Work map[string]Work | 
 | } | 
 |  | 
// ActiveBackoffs retrieves a copy of a Machine's active backoffs. Note: the
// expiration check is performed according to the current system time, so it
// might not be consistent with the data snapshot retrieved from the database.
 | func (r *Machine) ActiveBackoffs() []*Backoff { | 
 | 	var res []*Backoff | 
 | 	for _, bo := range r.Backoffs { | 
 | 		bo := bo | 
 | 		if !bo.Active() { | 
 | 			continue | 
 | 		} | 
 | 		res = append(res, &bo) | 
 | 	} | 
 | 	sort.Slice(res, func(i, j int) bool { return res[i].Process < res[j].Process }) | 
 | 	return res | 
 | } | 
 |  | 
// ExpiredBackoffs retrieves a copy of a Machine's expired backoffs. Note: the
// expiration check is performed according to the current system time, so it
// might not be consistent with the data snapshot retrieved from the database.
 | func (r *Machine) ExpiredBackoffs() []*Backoff { | 
 | 	var res []*Backoff | 
 | 	for _, bo := range r.Backoffs { | 
 | 		bo := bo | 
 | 		if bo.Active() { | 
 | 			continue | 
 | 		} | 
 | 		res = append(res, &bo) | 
 | 	} | 
 | 	sort.Slice(res, func(i, j int) bool { return res[i].Process < res[j].Process }) | 
 | 	return res | 
 | } | 
 |  | 
 | // Tag value set on a Machine. | 
 | type Tag struct { | 
 | 	// Type describing this tag. | 
 | 	Type *TagType | 
 | 	// Field data contained in this tag, sorted alphabetically by name. | 
 | 	Fields []TagField | 
 | } | 
 |  | 
// Field returns the TagField with the given native (column) name, or nil if the
// Tag has no such field.
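//
// For example (a sketch; tag is a *Tag and the field name is hypothetical):
//
//	if f := tag.Field("provider"); f != nil {
//		fmt.Println(f.HumanValue(DisplaySingleLine))
//	}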
 | func (r *Tag) Field(name string) *TagField { | 
 | 	for _, f := range r.Fields { | 
 | 		if f.Type.NativeName == name { | 
 | 			return &f | 
 | 		} | 
 | 	} | 
 | 	return nil | 
 | } | 
 |  | 
 | // DisplayOption is an opaque argument used to influence the display style of a | 
 | // tag value when returned from HumanValue. | 
 | type DisplayOption string | 
 |  | 
 | const ( | 
 | 	// DisplaySingleLine limits display to a single line (i.e. don't try to | 
 | 	// pretty-print long values by inserting newlines and indents). | 
 | 	DisplaySingleLine DisplayOption = "single-line" | 
 | ) | 
 |  | 
// HumanValue returns a best-effort human-readable rendering of the whole tag as
// "field: value" pairs joined by commas. Any DisplayOption values are forwarded
// to each field's HumanValue.
func (r *Tag) HumanValue(opts ...DisplayOption) string {
 | 	var kvs []string | 
 | 	for _, field := range r.Fields { | 
 | 		kvs = append(kvs, fmt.Sprintf("%s: %s", field.Type.NativeName, field.HumanValue(opts...))) | 
 | 	} | 
 | 	return strings.Join(kvs, ", ") | 
 | } | 
 |  | 
 | // TagField value which is part of a Tag set on a Machine. | 
 | type TagField struct { | 
 | 	// Type describing this field. | 
 | 	Type *TagFieldType | 
 |  | 
 | 	text  *string | 
 | 	bytes *[]byte | 
 | 	time  *time.Time | 
 | 	proto proto.Message | 
 | } | 
 |  | 
 | // HumanValue returns a human-readable (best effort) representation of the field | 
 | // value. | 
 | func (r *TagField) HumanValue(opts ...DisplayOption) string { | 
 | 	switch { | 
 | 	case r.proto != nil: | 
 | 		mopts := prototext.MarshalOptions{ | 
 | 			Multiline: true, | 
 | 			Indent:    "\t", | 
 | 		} | 
 | 		for _, opt := range opts { | 
 | 			if opt == DisplaySingleLine { | 
 | 				mopts.Multiline = false | 
 | 			} | 
 | 		} | 
 | 		return mopts.Format(r.proto) | 
 | 	case r.text != nil: | 
 | 		return *r.text | 
 | 	case r.bytes != nil: | 
 | 		return hex.EncodeToString(*r.bytes) | 
 | 	case r.time != nil: | 
 | 		return r.time.String() | 
 | 	default: | 
 | 		return "<unknown>" | 
 | 	} | 
 | } | 
 |  | 
 | // Index attempts to index into a structured tag field (currently only protobuf | 
 | // fields) by a 'field.subfield.subsubfield' selector. | 
 | // | 
 | // The selector for Protobuf fields follows the convention from 'protorange', | 
 | // which is a semi-standardized format used in the Protobuf ecosystem. See | 
 | // https://pkg.go.dev/google.golang.org/protobuf/reflect/protorange for more | 
 | // details. | 
 | // | 
 | // An error will be returned if the TagField is not a protobuf field or if the | 
 | // given selector does not point to a known message field. | 
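//
// For example, for a field holding a message of hypothetical type
// foo.HardwareReport with a submessage field "cpu" that in turn has a "cores"
// field, the selector "cpu.cores" is expanded to the protorange path
// "(foo.HardwareReport).cpu.cores" before matching.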
 | func (r *TagField) Index(k string) (string, error) { | 
 | 	if r.Type.ProtoType == nil { | 
 | 		return "", fmt.Errorf("can only index proto fields") | 
 | 	} | 
 | 	k = fmt.Sprintf("(%s).%s", r.Type.ProtoType.Descriptor().FullName(), k) | 
 |  | 
 | 	var res string | 
 | 	var found bool | 
 | 	ref := r.proto.ProtoReflect() | 
 | 	protorange.Range(ref, func(values protopath.Values) error { | 
 | 		if values.Path.String() == k { | 
 | 			res = values.Index(-1).Value.String() | 
 | 			found = true | 
 | 		} | 
 | 		return nil | 
 | 	}) | 
 |  | 
 | 	if !found { | 
 | 		return "", fmt.Errorf("protobuf field not found") | 
 | 	} | 
 | 	return res, nil | 
 | } | 
 |  | 
 | // Backoff on a Machine. | 
 | type Backoff struct { | 
	// Process which established the Backoff.
	Process string
	// Time when the Backoff expires.
	Until time.Time
	// Cause for the Backoff as emitted by the worker.
	Cause string
 | } | 
 |  | 
 | // Active returns whether this Backoff is _currently_ active per the _local_ time. | 
 | func (r Backoff) Active() bool { | 
 | 	return time.Now().Before(r.Until) | 
 | } | 
 |  | 
 | // Work being actively performed on a Machine. | 
 | type Work struct { | 
 | 	// SessionID of the worker performing this Work. | 
 | 	SessionID uuid.UUID | 
 | 	// Process name of this Work. | 
 | 	Process string | 
 | } | 
 |  | 
// Scan implements sql.Scanner for direct scanning of query results into a
// reflected tag value. This method is not meant to be used outside the
// reflection package.
 | func (r *TagField) Scan(src any) error { | 
 | 	if src == nil { | 
 | 		return nil | 
 | 	} | 
 |  | 
 | 	switch r.Type.NativeType { | 
 | 	case "text": | 
 | 		src2, ok := src.(string) | 
 | 		if !ok { | 
 | 			return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src) | 
 | 		} | 
 | 		r.text = &src2 | 
 | 	case "bytea": | 
 | 		src2, ok := src.([]byte) | 
 | 		if !ok { | 
 | 			return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src) | 
 | 		} | 
 | 		// Copy the bytes, as they are otherwise going to be reused by the pq library. | 
 | 		copied := make([]byte, len(src2)) | 
		copy(copied, src2)
 | 		r.bytes = &copied | 
 |  | 
 | 		if r.Type.ProtoType != nil { | 
 | 			msg := r.Type.ProtoType.New().Interface() | 
 | 			err := proto.Unmarshal(*r.bytes, msg) | 
 | 			if err != nil { | 
 | 				klog.Warningf("Could not unmarshal %s: %v", r.Type.NativeName, err) | 
 | 			} else { | 
 | 				r.proto = msg | 
 | 			} | 
 | 		} | 
 | 	case "USER-DEFINED": | 
 | 		switch r.Type.NativeUDTName { | 
 | 		case "provider", "provider_status": | 
 | 			src2, ok := src.([]byte) | 
 | 			if !ok { | 
 | 				return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src) | 
 | 			} | 
 | 			src3 := string(src2) | 
 | 			r.text = &src3 | 
 | 		} | 
 | 	case "timestamp with time zone": | 
 | 		src2, ok := src.(time.Time) | 
 | 		if !ok { | 
 | 			return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src) | 
 | 		} | 
 | 		r.time = &src2 | 
 | 	case "bigint": | 
 | 		src2, ok := src.(int64) | 
 | 		if !ok { | 
 | 			return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src) | 
 | 		} | 
 | 		src3 := fmt.Sprintf("%d", src2) | 
 | 		r.text = &src3 | 
 | 	default: | 
 | 		return fmt.Errorf("unimplemented SQL type %q", r.Type.NativeType) | 
 | 	} | 
 |  | 
 | 	return nil | 
 | } |