// Package reflection implements facilities to retrieve information about the
// implemented Tags and their types from a plain CockroachDB SQL connection,
// bypassing the queries/types defined in models. Then, the retrieved Schema can
// be used to retrieve information about machines.
//
// This is designed to be used in debugging facilities to allow arbitrary machine
// introspection. It must _not_ be used in the user path, as the schema
// extraction functionality is implemented best-effort.
package reflection

import (
	"context"
	"database/sql"
	"encoding/hex"
	"fmt"
	"sort"
	"strings"
	"time"

	"k8s.io/klog/v2"

	"github.com/google/uuid"
	"google.golang.org/protobuf/encoding/prototext"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protopath"
	"google.golang.org/protobuf/reflect/protorange"
)

// GetMachinesOpts influences the behaviour of GetMachines.
type GetMachinesOpts struct {
	// FilterMachine, if set, will only retrieve information about the machine with
	// the given UUID. In case the given machine UUID does not exist in the database,
	// an empty result will be returned and _no_ error will be set.
	FilterMachine *uuid.UUID
	// Strict enables strict consistency. This is not recommended for use when
	// retrieving all machines, as such queries will compete against all currently
	// running operations. When not enabled, the retrieval will be executed AS OF
	// SYSTEM TIME follower_timestamp(), meaning the data might be a few seconds out
	// of date. Regardless of the option, the returned machine data will be
	// internally consistent, even across machines - but when not enabled the data
	// might be stale.
	Strict bool
	// ExpiredBackoffs enables the retrieval of information about all machine
	// backoffs, including expired backoff. Note that expired backoffs might be
	// garbage collected in the future, and their long-term storage is not
	// guaranteed.
	ExpiredBackoffs bool
}

// GetMachines retrieves all available BMDB data about one or more machines. The
// Schema's embedded SQL connection is used to performed the retrieval.
//
// Options can be specified to influenced the exact operation performed. By
// default (with a zeroed structure or nil pointer), all machines with active
// backoffs are retrieved with weak consistency. See GetMachineOpts to influence
// this behaviour.
func (r *Schema) GetMachines(ctx context.Context, opts *GetMachinesOpts) (*Reflected[[]*Machine], error) {
	if opts == nil {
		opts = &GetMachinesOpts{}
	}

	// We're about to build a pretty big SELECT query with a ton of joins.
	//
	// First, we join against work_backoff and work to get information about active
	// work and backoffs on the machines we're retrieving.
	//
	// Second, we join against all the tags that are declared in the schema.

	// These are the colums we'll SELECT <...> FROM
	columns := []string{
		"machines.machine_id",
		"machines.machine_created_at",
		"work_backoff.process",
		"work_backoff.cause",
		"work_backoff.until",
		"work.process",
		"work.session_id",
		// ... tag columns will come after this.
	}
	// These are tha args we'll pass to the query.
	var args []any

	// Start building joins. First, against work_backoff and work.
	backoffFilter := " AND work_backoff.until > now()"
	if opts.ExpiredBackoffs {
		backoffFilter = ""
	}
	joins := []string{
		"LEFT JOIN work_backoff ON machines.machine_id = work_backoff.machine_id" + backoffFilter,
		"LEFT JOIN work ON machines.machine_id = work.machine_id",
	}

	// Then, against tags. Also populate columns as we go along.
	for _, tagType := range r.TagTypes {
		joins = append(joins, fmt.Sprintf("LEFT JOIN %s ON machines.machine_id = %s.machine_id", tagType.NativeName, tagType.NativeName))
		columns = append(columns, fmt.Sprintf("%s.machine_id", tagType.NativeName))
		for _, fieldType := range tagType.Fields {
			columns = append(columns, fmt.Sprintf("%s.%s", tagType.NativeName, fieldType.NativeName))
		}
	}

	// Finalize query.
	q := []string{
		"SELECT",
		strings.Join(columns, ", "),
		"FROM machines",
	}
	q = append(q, joins...)
	if !opts.Strict {
		q = append(q, "AS OF SYSTEM TIME follower_read_timestamp()")
	}
	if opts.FilterMachine != nil {
		q = append(q, "WHERE machines.machine_id = $1")
		args = append(args, *opts.FilterMachine)
	}
	q = append(q, "ORDER BY machines.machine_id")

	rows, err := r.db.QueryContext(ctx, strings.Join(q, "\n"), args...)
	if err != nil {
		return nil, fmt.Errorf("query failed: %w", err)
	}
	defer rows.Close()

	// Okay, we can start scanning the result rows.
	//
	// As this is a complex join, we need to merge some rows together and discard
	// some NULLs. We do merging/deduplication using machine_id values for the
	// machine data, and abuse UNIQUE constraints in the work_backoff/work tables to
	// deduplicate these.
	//
	// The alternative would be to rewrite this query to use array_agg, and we might
	// do that at some point. This is only really a problem if we
	// have _a lot_ of active work/backoffs (as that effectively duplicates all
	// machine/tag data), which isn't the case yet. But we should keep an eye out for
	// this.

	var machines []*Machine
	for rows.Next() {

		// We need to scan this row back into columns. For constant columns we'll just
		// create the data here and refer to it later.
		var dests []any

		// Add non-tag always-retrieved constants.
		var mid uuid.UUID
		var machineCreated time.Time
		var workSession uuid.NullUUID
		var backoffProcess, backoffCause, workProcess sql.NullString
		var backoffUntil sql.NullTime

		dests = append(dests, &mid, &machineCreated, &backoffProcess, &backoffCause, &backoffUntil, &workProcess, &workSession)

		// For dynamic data, we need to keep a reference to a list of columns that are
		// part of tags, and then refer to them later. We can't just refer back to dests
		// as the types are erased into `any`. scannedTags is that data storage.
		type scannedTag struct {
			ty     *TagType
			id     uuid.NullUUID
			fields []*TagField
		}
		var scannedTags []*scannedTag
		for _, tagType := range r.TagTypes {
			tagType := tagType
			st := scannedTag{
				ty: &tagType,
			}
			scannedTags = append(scannedTags, &st)
			dests = append(dests, &st.id)
			for _, fieldType := range tagType.Fields {
				fieldType := fieldType
				field := TagField{
					Type: &fieldType,
				}
				dests = append(dests, &field)
				st.fields = append(st.fields, &field)

			}
		}

		if err := rows.Scan(dests...); err != nil {
			return nil, fmt.Errorf("scan failed: %w", err)
		}

		// Now comes the merging/deduplication.

		// First, check if we are processing a new machine. If so, create a new
		// Machine. Otherwise, pick up the previous one.
		var machine *Machine
		if len(machines) == 0 || machines[len(machines)-1].ID.String() != mid.String() {
			// New machine or no machine yet.
			machine = &Machine{
				ID:       mid,
				Created:  machineCreated,
				Tags:     make(map[string]*Tag),
				Backoffs: make(map[string]Backoff),
				Work:     make(map[string]Work),
			}

			// Collect tags into machine.
			for _, st := range scannedTags {
				if !st.id.Valid {
					continue
				}
				var fields []TagField
				for _, f := range st.fields {
					fields = append(fields, *f)
				}
				machine.Tags[st.ty.Name()] = &Tag{
					Type:   st.ty,
					Fields: fields,
				}
			}
			machines = append(machines, machine)
		} else {
			// Continue previous machine.
			machine = machines[len(machines)-1]
		}

		// Do we have a backoff? Upsert it to the machine. This works because there's a
		// UNIQUE(machine_id, process) constraint on the work_backoff table, and we're
		// effectively rebuilding that keyspace here by indexing first by machine then by
		// process.
		if backoffCause.Valid && backoffProcess.Valid && backoffUntil.Valid {
			process := backoffProcess.String
			machine.Backoffs[process] = Backoff{
				Cause:   backoffCause.String,
				Process: process,
				Until:   backoffUntil.Time,
			}
		}

		// Do we have an active work item? Upsert it to the machine. Same UNIQUE
		// constraint abuse happening here.
		if workProcess.Valid && workSession.Valid {
			process := workProcess.String
			machine.Work[process] = Work{
				SessionID: workSession.UUID,
				Process:   process,
			}
		}
	}

	return &Reflected[[]*Machine]{
		Data:  machines,
		Query: strings.Join(q, " "),
	}, nil
}

// Reflected wraps data retrieved by reflection (T) with metadata about the
// retrieval.
type Reflected[T any] struct {
	Data T
	// Effective SQL query performed on the database.
	Query string
}

// Machine retrieved from BMDB.
type Machine struct {
	ID      uuid.UUID
	Created time.Time

	// Tags on this machine, keyed by Tag type name (canonical, not native).
	Tags map[string]*Tag

	// Backoffs on this machine, keyed by process name. By default these are only
	// active backoffs, unless ExpiredBackoffs was set on GetMachineOptions.
	Backoffs map[string]Backoff

	// Work active on this machine, keyed by process name.
	Work map[string]Work
}

// ActiveBackoffs retrieves a copy of a Machine's active backoffs. Note: the
// expiration check is performed according tu current system time, so it might
// not be consistent with the data snapshot retrieved from the database.
func (r *Machine) ActiveBackoffs() []*Backoff {
	var res []*Backoff
	for _, bo := range r.Backoffs {
		bo := bo
		if !bo.Active() {
			continue
		}
		res = append(res, &bo)
	}
	sort.Slice(res, func(i, j int) bool { return res[i].Process < res[j].Process })
	return res
}

// ExpiredBackoffs retrieves a copy of a Machine's expired backoffs. Note: the
// expiration check is performed according tu current system time, so it might
// not be consistent with the data snapshot retrieved from the database.
func (r *Machine) ExpiredBackoffs() []*Backoff {
	var res []*Backoff
	for _, bo := range r.Backoffs {
		bo := bo
		if bo.Active() {
			continue
		}
		res = append(res, &bo)
	}
	sort.Slice(res, func(i, j int) bool { return res[i].Process < res[j].Process })
	return res
}

// Tag value set on a Machine.
type Tag struct {
	// Type describing this tag.
	Type *TagType
	// Field data contained in this tag, sorted alphabetically by name.
	Fields []TagField
}

// Field is a shorthand for returning a TagField by its name.
func (r *Tag) Field(name string) *TagField {
	for _, f := range r.Fields {
		if f.Type.NativeName == name {
			return &f
		}
	}
	return nil
}

// DisplayOption is an opaque argument used to influence the display style of a
// tag value when returned from HumanValue.
type DisplayOption string

const (
	// DisplaySingleLine limits display to a single line (i.e. don't try to
	// pretty-print long values by inserting newlines and indents).
	DisplaySingleLine DisplayOption = "single-line"
)

func (r *Tag) HumanValue(opts ...DisplayOption) string {
	var kvs []string
	for _, field := range r.Fields {
		kvs = append(kvs, fmt.Sprintf("%s: %s", field.Type.NativeName, field.HumanValue(opts...)))
	}
	return strings.Join(kvs, ", ")
}

// TagField value which is part of a Tag set on a Machine.
type TagField struct {
	// Type describing this field.
	Type *TagFieldType

	text  *string
	bytes *[]byte
	time  *time.Time
	proto proto.Message
}

// HumanValue returns a human-readable (best effort) representation of the field
// value.
func (r *TagField) HumanValue(opts ...DisplayOption) string {
	switch {
	case r.proto != nil:
		mopts := prototext.MarshalOptions{
			Multiline: true,
			Indent:    "\t",
		}
		for _, opt := range opts {
			if opt == DisplaySingleLine {
				mopts.Multiline = false
			}
		}
		return mopts.Format(r.proto)
	case r.text != nil:
		return *r.text
	case r.bytes != nil:
		return hex.EncodeToString(*r.bytes)
	case r.time != nil:
		return r.time.String()
	default:
		return "<unknown>"
	}
}

// Index attempts to index into a structured tag field (currently only protobuf
// fields) by a 'field.subfield.subsubfield' selector.
//
// The selector for Protobuf fields follows the convention from 'protorange',
// which is a semi-standardized format used in the Protobuf ecosystem. See
// https://pkg.go.dev/google.golang.org/protobuf/reflect/protorange for more
// details.
//
// An error will be returned if the TagField is not a protobuf field or if the
// given selector does not point to a known message field.
func (r *TagField) Index(k string) (string, error) {
	if r.Type.ProtoType == nil {
		return "", fmt.Errorf("can only index proto fields")
	}
	k = fmt.Sprintf("(%s).%s", r.Type.ProtoType.Descriptor().FullName(), k)

	var res string
	var found bool
	ref := r.proto.ProtoReflect()
	protorange.Range(ref, func(values protopath.Values) error {
		if values.Path.String() == k {
			res = values.Index(-1).Value.String()
			found = true
		}
		return nil
	})

	if !found {
		return "", fmt.Errorf("protobuf field not found")
	}
	return res, nil
}

// Backoff on a Machine.
type Backoff struct {
	// Process which established Backoff.
	Process string
	// Time when Backoff expires.
	Until time.Time
	// Cause for the Backoff as emitted by worker.
	Cause string
}

// Active returns whether this Backoff is _currently_ active per the _local_ time.
func (r Backoff) Active() bool {
	return time.Now().Before(r.Until)
}

// Work being actively performed on a Machine.
type Work struct {
	// SessionID of the worker performing this Work.
	SessionID uuid.UUID
	// Process name of this Work.
	Process string
}

// Scan implements sql.Scanner for direct scanning of query results into a
// reflected tag value. This method is not meant to by used outside the
// reflection package.
func (r *TagField) Scan(src any) error {
	if src == nil {
		return nil
	}

	switch r.Type.NativeType {
	case "text":
		src2, ok := src.(string)
		if !ok {
			return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src)
		}
		r.text = &src2
	case "bytea":
		src2, ok := src.([]byte)
		if !ok {
			return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src)
		}
		// Copy the bytes, as they are otherwise going to be reused by the pq library.
		copied := make([]byte, len(src2))
		copy(copied[:], src2)
		r.bytes = &copied

		if r.Type.ProtoType != nil {
			msg := r.Type.ProtoType.New().Interface()
			err := proto.Unmarshal(*r.bytes, msg)
			if err != nil {
				klog.Warningf("Could not unmarshal %s: %v", r.Type.NativeName, err)
			} else {
				r.proto = msg
			}
		}
	case "USER-DEFINED":
		switch r.Type.NativeUDTName {
		case "provider", "provider_status":
			src2, ok := src.([]byte)
			if !ok {
				return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src)
			}
			src3 := string(src2)
			r.text = &src3
		}
	case "timestamp with time zone":
		src2, ok := src.(time.Time)
		if !ok {
			return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src)
		}
		r.time = &src2
	case "bigint":
		src2, ok := src.(int64)
		if !ok {
			return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src)
		}
		src3 := fmt.Sprintf("%d", src2)
		r.text = &src3
	default:
		return fmt.Errorf("unimplemented SQL type %q", r.Type.NativeType)
	}

	return nil
}
