blob: b9f6d115048f8262bb5830270c4d92c8897a4ebe [file] [log] [blame]
Serge Bazanski424e2012023-02-15 23:31:49 +01001// Package reflection implements facilities to retrieve information about the
2// implemented Tags and their types from a plain CockroachDB SQL connection,
3// bypassing the queries/types defined in models. Then, the retrieved Schema can
4// be used to retrieve information about machines.
5//
6// This is designed to be used in debugging facilities to allow arbitrary machine
7// introspection. It must _not_ be used in the user path, as the schema
8// extraction functionality is implemented best-effort.
9package reflection
10
11import (
12 "context"
13 "database/sql"
14 "encoding/hex"
15 "fmt"
16 "sort"
17 "strings"
18 "time"
19
Serge Bazanski10b21542023-04-13 12:12:05 +020020 "k8s.io/klog/v2"
21
Serge Bazanski424e2012023-02-15 23:31:49 +010022 "github.com/google/uuid"
Serge Bazanski10b21542023-04-13 12:12:05 +020023 "google.golang.org/protobuf/encoding/prototext"
24 "google.golang.org/protobuf/proto"
Serge Bazanski3c6306b2023-09-19 11:48:44 +000025 "google.golang.org/protobuf/reflect/protopath"
26 "google.golang.org/protobuf/reflect/protorange"
Serge Bazanski424e2012023-02-15 23:31:49 +010027)
28
29// GetMachinesOpts influences the behaviour of GetMachines.
30type GetMachinesOpts struct {
31 // FilterMachine, if set, will only retrieve information about the machine with
32 // the given UUID. In case the given machine UUID does not exist in the database,
33 // an empty result will be returned and _no_ error will be set.
34 FilterMachine *uuid.UUID
35 // Strict enables strict consistency. This is not recommended for use when
36 // retrieving all machines, as such queries will compete against all currently
37 // running operations. When not enabled, the retrieval will be executed AS OF
38 // SYSTEM TIME follower_timestamp(), meaning the data might be a few seconds out
39 // of date. Regardless of the option, the returned machine data will be
40 // internally consistent, even across machines - but when not enabled the data
41 // might be stale.
42 Strict bool
43 // ExpiredBackoffs enables the retrieval of information about all machine
44 // backoffs, including expired backoff. Note that expired backoffs might be
45 // garbage collected in the future, and their long-term storage is not
46 // guaranteed.
47 ExpiredBackoffs bool
48}
49
50// GetMachines retrieves all available BMDB data about one or more machines. The
51// Schema's embedded SQL connection is used to performed the retrieval.
52//
53// Options can be specified to influenced the exact operation performed. By
54// default (with a zeroed structure or nil pointer), all machines with active
55// backoffs are retrieved with weak consistency. See GetMachineOpts to influence
56// this behaviour.
57func (r *Schema) GetMachines(ctx context.Context, opts *GetMachinesOpts) (*Reflected[[]*Machine], error) {
58 if opts == nil {
59 opts = &GetMachinesOpts{}
60 }
61
62 // We're about to build a pretty big SELECT query with a ton of joins.
63 //
64 // First, we join against work_backoff and work to get information about active
65 // work and backoffs on the machines we're retrieving.
66 //
67 // Second, we join against all the tags that are declared in the schema.
68
69 // These are the colums we'll SELECT <...> FROM
70 columns := []string{
71 "machines.machine_id",
72 "machines.machine_created_at",
73 "work_backoff.process",
74 "work_backoff.cause",
75 "work_backoff.until",
76 "work.process",
77 "work.session_id",
78 // ... tag columns will come after this.
79 }
80 // These are tha args we'll pass to the query.
81 var args []any
82
83 // Start building joins. First, against work_backoff and work.
84 backoffFilter := " AND work_backoff.until > now()"
85 if opts.ExpiredBackoffs {
86 backoffFilter = ""
87 }
88 joins := []string{
89 "LEFT JOIN work_backoff ON machines.machine_id = work_backoff.machine_id" + backoffFilter,
90 "LEFT JOIN work ON machines.machine_id = work.machine_id",
91 }
92
93 // Then, against tags. Also populate columns as we go along.
94 for _, tagType := range r.TagTypes {
95 joins = append(joins, fmt.Sprintf("LEFT JOIN %s ON machines.machine_id = %s.machine_id", tagType.NativeName, tagType.NativeName))
96 columns = append(columns, fmt.Sprintf("%s.machine_id", tagType.NativeName))
97 for _, fieldType := range tagType.Fields {
98 columns = append(columns, fmt.Sprintf("%s.%s", tagType.NativeName, fieldType.NativeName))
99 }
100 }
101
102 // Finalize query.
103 q := []string{
104 "SELECT",
105 strings.Join(columns, ", "),
106 "FROM machines",
107 }
108 q = append(q, joins...)
109 if !opts.Strict {
110 q = append(q, "AS OF SYSTEM TIME follower_read_timestamp()")
111 }
112 if opts.FilterMachine != nil {
113 q = append(q, "WHERE machines.machine_id = $1")
114 args = append(args, *opts.FilterMachine)
115 }
Tim Windelschmidtae7e3ed2023-04-17 23:15:39 +0200116 q = append(q, "ORDER BY machines.machine_id")
Serge Bazanski424e2012023-02-15 23:31:49 +0100117
118 rows, err := r.db.QueryContext(ctx, strings.Join(q, "\n"), args...)
119 if err != nil {
120 return nil, fmt.Errorf("query failed: %w", err)
121 }
122 defer rows.Close()
123
124 // Okay, we can start scanning the result rows.
125 //
126 // As this is a complex join, we need to merge some rows together and discard
127 // some NULLs. We do merging/deduplication using machine_id values for the
128 // machine data, and abuse UNIQUE constraints in the work_backoff/work tables to
129 // deduplicate these.
130 //
131 // The alternative would be to rewrite this query to use array_agg, and we might
132 // do that at some point. This is only really a problem if we
133 // have _a lot_ of active work/backoffs (as that effectively duplicates all
134 // machine/tag data), which isn't the case yet. But we should keep an eye out for
135 // this.
136
137 var machines []*Machine
138 for rows.Next() {
139
140 // We need to scan this row back into columns. For constant columns we'll just
141 // create the data here and refer to it later.
142 var dests []any
143
144 // Add non-tag always-retrieved constants.
145 var mid uuid.UUID
146 var machineCreated time.Time
147 var workSession uuid.NullUUID
148 var backoffProcess, backoffCause, workProcess sql.NullString
149 var backoffUntil sql.NullTime
150
151 dests = append(dests, &mid, &machineCreated, &backoffProcess, &backoffCause, &backoffUntil, &workProcess, &workSession)
152
153 // For dynamic data, we need to keep a reference to a list of columns that are
154 // part of tags, and then refer to them later. We can't just refer back to dests
155 // as the types are erased into `any`. scannedTags is that data storage.
156 type scannedTag struct {
157 ty *TagType
158 id uuid.NullUUID
159 fields []*TagField
160 }
161 var scannedTags []*scannedTag
162 for _, tagType := range r.TagTypes {
163 tagType := tagType
164 st := scannedTag{
165 ty: &tagType,
166 }
167 scannedTags = append(scannedTags, &st)
168 dests = append(dests, &st.id)
169 for _, fieldType := range tagType.Fields {
170 fieldType := fieldType
171 field := TagField{
172 Type: &fieldType,
173 }
174 dests = append(dests, &field)
175 st.fields = append(st.fields, &field)
176
177 }
178 }
179
180 if err := rows.Scan(dests...); err != nil {
181 return nil, fmt.Errorf("scan failed: %w", err)
182 }
183
184 // Now comes the merging/deduplication.
185
186 // First, check if we are processing a new machine. If so, create a new
187 // Machine. Otherwise, pick up the previous one.
188 var machine *Machine
189 if len(machines) == 0 || machines[len(machines)-1].ID.String() != mid.String() {
190 // New machine or no machine yet.
191 machine = &Machine{
192 ID: mid,
193 Created: machineCreated,
Serge Bazanski9e7875c2023-02-20 13:55:58 +0100194 Tags: make(map[string]*Tag),
Serge Bazanski424e2012023-02-15 23:31:49 +0100195 Backoffs: make(map[string]Backoff),
196 Work: make(map[string]Work),
197 }
198
199 // Collect tags into machine.
200 for _, st := range scannedTags {
201 if !st.id.Valid {
202 continue
203 }
204 var fields []TagField
205 for _, f := range st.fields {
206 fields = append(fields, *f)
207 }
Serge Bazanski9e7875c2023-02-20 13:55:58 +0100208 machine.Tags[st.ty.Name()] = &Tag{
Serge Bazanski424e2012023-02-15 23:31:49 +0100209 Type: st.ty,
210 Fields: fields,
211 }
212 }
213 machines = append(machines, machine)
214 } else {
215 // Continue previous machine.
216 machine = machines[len(machines)-1]
217 }
218
219 // Do we have a backoff? Upsert it to the machine. This works because there's a
220 // UNIQUE(machine_id, process) constraint on the work_backoff table, and we're
221 // effectively rebuilding that keyspace here by indexing first by machine then by
222 // process.
223 if backoffCause.Valid && backoffProcess.Valid && backoffUntil.Valid {
224 process := backoffProcess.String
225 machine.Backoffs[process] = Backoff{
226 Cause: backoffCause.String,
227 Process: process,
228 Until: backoffUntil.Time,
229 }
230 }
231
232 // Do we have an active work item? Upsert it to the machine. Same UNIQUE
233 // constraint abuse happening here.
234 if workProcess.Valid && workSession.Valid {
235 process := workProcess.String
236 machine.Work[process] = Work{
237 SessionID: workSession.UUID,
238 Process: process,
239 }
240 }
241 }
242
243 return &Reflected[[]*Machine]{
244 Data: machines,
245 Query: strings.Join(q, " "),
246 }, nil
247}
248
// Reflected pairs a value of type T obtained through reflection with metadata
// describing how it was obtained.
type Reflected[T any] struct {
	// Data is the retrieved value itself.
	Data T
	// Query is the effective SQL query that was run against the database to
	// obtain Data.
	Query string
}
256
257// Machine retrieved from BMDB.
258type Machine struct {
259 ID uuid.UUID
260 Created time.Time
261
262 // Tags on this machine, keyed by Tag type name (canonical, not native).
Serge Bazanski9e7875c2023-02-20 13:55:58 +0100263 Tags map[string]*Tag
Serge Bazanski424e2012023-02-15 23:31:49 +0100264
265 // Backoffs on this machine, keyed by process name. By default these are only
266 // active backoffs, unless ExpiredBackoffs was set on GetMachineOptions.
267 Backoffs map[string]Backoff
268
269 // Work active on this machine, keyed by process name.
270 Work map[string]Work
271}
272
273// ActiveBackoffs retrieves a copy of a Machine's active backoffs. Note: the
274// expiration check is performed according tu current system time, so it might
275// not be consistent with the data snapshot retrieved from the database.
276func (r *Machine) ActiveBackoffs() []*Backoff {
277 var res []*Backoff
278 for _, bo := range r.Backoffs {
279 bo := bo
280 if !bo.Active() {
281 continue
282 }
283 res = append(res, &bo)
284 }
285 sort.Slice(res, func(i, j int) bool { return res[i].Process < res[j].Process })
286 return res
287}
288
289// ExpiredBackoffs retrieves a copy of a Machine's expired backoffs. Note: the
290// expiration check is performed according tu current system time, so it might
291// not be consistent with the data snapshot retrieved from the database.
292func (r *Machine) ExpiredBackoffs() []*Backoff {
293 var res []*Backoff
294 for _, bo := range r.Backoffs {
295 bo := bo
296 if bo.Active() {
297 continue
298 }
299 res = append(res, &bo)
300 }
301 sort.Slice(res, func(i, j int) bool { return res[i].Process < res[j].Process })
302 return res
303}
304
305// Tag value set on a Machine.
306type Tag struct {
307 // Type describing this tag.
308 Type *TagType
309 // Field data contained in this tag, sorted alphabetically by name.
310 Fields []TagField
311}
312
313// Field is a shorthand for returning a TagField by its name.
314func (r *Tag) Field(name string) *TagField {
315 for _, f := range r.Fields {
316 if f.Type.NativeName == name {
317 return &f
318 }
319 }
320 return nil
321}
322
// DisplayOption is an opaque argument passed to HumanValue to influence how a
// tag value is rendered.
type DisplayOption string

// DisplaySingleLine asks for a value to be rendered on a single line, i.e.
// without inserting newlines and indentation to pretty-print long values.
const DisplaySingleLine DisplayOption = "single-line"
332
333func (r *Tag) HumanValue(opts ...DisplayOption) string {
334 var kvs []string
335 for _, field := range r.Fields {
336 kvs = append(kvs, fmt.Sprintf("%s: %s", field.Type.NativeName, field.HumanValue(opts...)))
337 }
338 return strings.Join(kvs, ", ")
339}
340
Serge Bazanski424e2012023-02-15 23:31:49 +0100341// TagField value which is part of a Tag set on a Machine.
342type TagField struct {
343 // Type describing this field.
344 Type *TagFieldType
345
346 text *string
347 bytes *[]byte
348 time *time.Time
Serge Bazanski10b21542023-04-13 12:12:05 +0200349 proto proto.Message
Serge Bazanski424e2012023-02-15 23:31:49 +0100350}
351
352// HumanValue returns a human-readable (best effort) representation of the field
353// value.
Serge Bazanski3c6306b2023-09-19 11:48:44 +0000354func (r *TagField) HumanValue(opts ...DisplayOption) string {
Serge Bazanski424e2012023-02-15 23:31:49 +0100355 switch {
Serge Bazanski10b21542023-04-13 12:12:05 +0200356 case r.proto != nil:
Serge Bazanski3c6306b2023-09-19 11:48:44 +0000357 mopts := prototext.MarshalOptions{
Serge Bazanski10b21542023-04-13 12:12:05 +0200358 Multiline: true,
359 Indent: "\t",
360 }
Serge Bazanski3c6306b2023-09-19 11:48:44 +0000361 for _, opt := range opts {
362 if opt == DisplaySingleLine {
363 mopts.Multiline = false
364 }
365 }
366 return mopts.Format(r.proto)
Serge Bazanski424e2012023-02-15 23:31:49 +0100367 case r.text != nil:
368 return *r.text
369 case r.bytes != nil:
370 return hex.EncodeToString(*r.bytes)
371 case r.time != nil:
372 return r.time.String()
373 default:
374 return "<unknown>"
375 }
376}
377
Serge Bazanski3c6306b2023-09-19 11:48:44 +0000378// Index attempts to index into a structured tag field (currently only protobuf
379// fields) by a 'field.subfield.subsubfield' selector.
380//
381// The selector for Protobuf fields follows the convention from 'protorange',
382// which is a semi-standardized format used in the Protobuf ecosystem. See
383// https://pkg.go.dev/google.golang.org/protobuf/reflect/protorange for more
384// details.
385//
386// An error will be returned if the TagField is not a protobuf field or if the
387// given selector does not point to a known message field.
388func (r *TagField) Index(k string) (string, error) {
389 if r.Type.ProtoType == nil {
390 return "", fmt.Errorf("can only index proto fields")
391 }
392 k = fmt.Sprintf("(%s).%s", r.Type.ProtoType.Descriptor().FullName(), k)
393
394 var res string
395 var found bool
396 ref := r.proto.ProtoReflect()
397 protorange.Range(ref, func(values protopath.Values) error {
398 if values.Path.String() == k {
399 res = values.Index(-1).Value.String()
400 found = true
401 }
402 return nil
403 })
404
405 if !found {
406 return "", fmt.Errorf("protobuf field not found")
407 }
408 return res, nil
409}
410
// Backoff is a backoff set on a Machine by a process.
type Backoff struct {
	// Process is the name of the process which established this Backoff.
	Process string
	// Until is the point in time at which this Backoff expires.
	Until time.Time
	// Cause is the reason for this Backoff, as reported by the worker.
	Cause string
}

// Active reports whether this Backoff has not yet expired, as judged by the
// _local_ clock at the time of the call.
func (r Backoff) Active() bool {
	return r.Until.After(time.Now())
}
425
426// Work being actively performed on a Machine.
427type Work struct {
428 // SessionID of the worker performing this Work.
429 SessionID uuid.UUID
430 // Process name of this Work.
431 Process string
432}
433
434// Scan implements sql.Scanner for direct scanning of query results into a
435// reflected tag value. This method is not meant to by used outside the
436// reflection package.
437func (r *TagField) Scan(src any) error {
438 if src == nil {
439 return nil
440 }
441
442 switch r.Type.NativeType {
443 case "text":
444 src2, ok := src.(string)
445 if !ok {
446 return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src)
447 }
448 r.text = &src2
449 case "bytea":
450 src2, ok := src.([]byte)
451 if !ok {
452 return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src)
453 }
Serge Bazanski5cd7ddf2023-04-13 12:09:53 +0200454 // Copy the bytes, as they are otherwise going to be reused by the pq library.
455 copied := make([]byte, len(src2))
456 copy(copied[:], src2)
457 r.bytes = &copied
Serge Bazanski10b21542023-04-13 12:12:05 +0200458
459 if r.Type.ProtoType != nil {
460 msg := r.Type.ProtoType.New().Interface()
461 err := proto.Unmarshal(*r.bytes, msg)
462 if err != nil {
463 klog.Warningf("Could not unmarshal %s: %v", r.Type.NativeName, err)
464 } else {
465 r.proto = msg
466 }
467 }
Serge Bazanski424e2012023-02-15 23:31:49 +0100468 case "USER-DEFINED":
469 switch r.Type.NativeUDTName {
Serge Bazanskiafd3cf82023-04-19 17:43:46 +0200470 case "provider", "provider_status":
Serge Bazanski424e2012023-02-15 23:31:49 +0100471 src2, ok := src.([]byte)
472 if !ok {
473 return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src)
474 }
475 src3 := string(src2)
476 r.text = &src3
477 }
478 case "timestamp with time zone":
479 src2, ok := src.(time.Time)
480 if !ok {
481 return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src)
482 }
483 r.time = &src2
Tim Windelschmidt2bffb6f2023-04-24 19:06:10 +0200484 case "bigint":
485 src2, ok := src.(int64)
486 if !ok {
487 return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src)
488 }
489 src3 := fmt.Sprintf("%d", src2)
490 r.text = &src3
Serge Bazanski424e2012023-02-15 23:31:49 +0100491 default:
492 return fmt.Errorf("unimplemented SQL type %q", r.Type.NativeType)
493 }
494
495 return nil
496}