blob: 0976905527110696709ad473025b5b9a7ba9769a [file] [log] [blame]
Tim Windelschmidt6d33a432025-02-04 14:34:25 +01001// Copyright The Monogon Project Authors.
2// SPDX-License-Identifier: Apache-2.0
3
Serge Bazanski424e2012023-02-15 23:31:49 +01004// Package reflection implements facilities to retrieve information about the
5// implemented Tags and their types from a plain CockroachDB SQL connection,
6// bypassing the queries/types defined in models. Then, the retrieved Schema can
7// be used to retrieve information about machines.
8//
9// This is designed to be used in debugging facilities to allow arbitrary machine
10// introspection. It must _not_ be used in the user path, as the schema
11// extraction functionality is implemented best-effort.
12package reflection
13
14import (
15 "context"
16 "database/sql"
17 "encoding/hex"
18 "fmt"
19 "sort"
20 "strings"
21 "time"
22
Serge Bazanski10b21542023-04-13 12:12:05 +020023 "k8s.io/klog/v2"
24
Serge Bazanski424e2012023-02-15 23:31:49 +010025 "github.com/google/uuid"
Serge Bazanski10b21542023-04-13 12:12:05 +020026 "google.golang.org/protobuf/encoding/prototext"
27 "google.golang.org/protobuf/proto"
Serge Bazanski3c6306b2023-09-19 11:48:44 +000028 "google.golang.org/protobuf/reflect/protopath"
29 "google.golang.org/protobuf/reflect/protorange"
Serge Bazanski424e2012023-02-15 23:31:49 +010030)
31
32// GetMachinesOpts influences the behaviour of GetMachines.
33type GetMachinesOpts struct {
34 // FilterMachine, if set, will only retrieve information about the machine with
35 // the given UUID. In case the given machine UUID does not exist in the database,
36 // an empty result will be returned and _no_ error will be set.
37 FilterMachine *uuid.UUID
38 // Strict enables strict consistency. This is not recommended for use when
39 // retrieving all machines, as such queries will compete against all currently
40 // running operations. When not enabled, the retrieval will be executed AS OF
41 // SYSTEM TIME follower_timestamp(), meaning the data might be a few seconds out
42 // of date. Regardless of the option, the returned machine data will be
43 // internally consistent, even across machines - but when not enabled the data
44 // might be stale.
45 Strict bool
46 // ExpiredBackoffs enables the retrieval of information about all machine
47 // backoffs, including expired backoff. Note that expired backoffs might be
48 // garbage collected in the future, and their long-term storage is not
49 // guaranteed.
50 ExpiredBackoffs bool
51}
52
53// GetMachines retrieves all available BMDB data about one or more machines. The
54// Schema's embedded SQL connection is used to performed the retrieval.
55//
56// Options can be specified to influenced the exact operation performed. By
57// default (with a zeroed structure or nil pointer), all machines with active
58// backoffs are retrieved with weak consistency. See GetMachineOpts to influence
59// this behaviour.
60func (r *Schema) GetMachines(ctx context.Context, opts *GetMachinesOpts) (*Reflected[[]*Machine], error) {
61 if opts == nil {
62 opts = &GetMachinesOpts{}
63 }
64
65 // We're about to build a pretty big SELECT query with a ton of joins.
66 //
67 // First, we join against work_backoff and work to get information about active
68 // work and backoffs on the machines we're retrieving.
69 //
70 // Second, we join against all the tags that are declared in the schema.
71
72 // These are the colums we'll SELECT <...> FROM
73 columns := []string{
74 "machines.machine_id",
75 "machines.machine_created_at",
76 "work_backoff.process",
77 "work_backoff.cause",
78 "work_backoff.until",
79 "work.process",
80 "work.session_id",
81 // ... tag columns will come after this.
82 }
83 // These are tha args we'll pass to the query.
84 var args []any
85
86 // Start building joins. First, against work_backoff and work.
87 backoffFilter := " AND work_backoff.until > now()"
88 if opts.ExpiredBackoffs {
89 backoffFilter = ""
90 }
91 joins := []string{
92 "LEFT JOIN work_backoff ON machines.machine_id = work_backoff.machine_id" + backoffFilter,
93 "LEFT JOIN work ON machines.machine_id = work.machine_id",
94 }
95
96 // Then, against tags. Also populate columns as we go along.
97 for _, tagType := range r.TagTypes {
98 joins = append(joins, fmt.Sprintf("LEFT JOIN %s ON machines.machine_id = %s.machine_id", tagType.NativeName, tagType.NativeName))
99 columns = append(columns, fmt.Sprintf("%s.machine_id", tagType.NativeName))
100 for _, fieldType := range tagType.Fields {
101 columns = append(columns, fmt.Sprintf("%s.%s", tagType.NativeName, fieldType.NativeName))
102 }
103 }
104
105 // Finalize query.
106 q := []string{
107 "SELECT",
108 strings.Join(columns, ", "),
109 "FROM machines",
110 }
111 q = append(q, joins...)
112 if !opts.Strict {
113 q = append(q, "AS OF SYSTEM TIME follower_read_timestamp()")
114 }
115 if opts.FilterMachine != nil {
116 q = append(q, "WHERE machines.machine_id = $1")
117 args = append(args, *opts.FilterMachine)
118 }
Tim Windelschmidtae7e3ed2023-04-17 23:15:39 +0200119 q = append(q, "ORDER BY machines.machine_id")
Serge Bazanski424e2012023-02-15 23:31:49 +0100120
121 rows, err := r.db.QueryContext(ctx, strings.Join(q, "\n"), args...)
122 if err != nil {
123 return nil, fmt.Errorf("query failed: %w", err)
124 }
125 defer rows.Close()
126
127 // Okay, we can start scanning the result rows.
128 //
129 // As this is a complex join, we need to merge some rows together and discard
130 // some NULLs. We do merging/deduplication using machine_id values for the
131 // machine data, and abuse UNIQUE constraints in the work_backoff/work tables to
132 // deduplicate these.
133 //
134 // The alternative would be to rewrite this query to use array_agg, and we might
135 // do that at some point. This is only really a problem if we
136 // have _a lot_ of active work/backoffs (as that effectively duplicates all
137 // machine/tag data), which isn't the case yet. But we should keep an eye out for
138 // this.
139
140 var machines []*Machine
141 for rows.Next() {
142
143 // We need to scan this row back into columns. For constant columns we'll just
144 // create the data here and refer to it later.
145 var dests []any
146
147 // Add non-tag always-retrieved constants.
148 var mid uuid.UUID
149 var machineCreated time.Time
150 var workSession uuid.NullUUID
151 var backoffProcess, backoffCause, workProcess sql.NullString
152 var backoffUntil sql.NullTime
153
154 dests = append(dests, &mid, &machineCreated, &backoffProcess, &backoffCause, &backoffUntil, &workProcess, &workSession)
155
156 // For dynamic data, we need to keep a reference to a list of columns that are
157 // part of tags, and then refer to them later. We can't just refer back to dests
158 // as the types are erased into `any`. scannedTags is that data storage.
159 type scannedTag struct {
160 ty *TagType
161 id uuid.NullUUID
162 fields []*TagField
163 }
164 var scannedTags []*scannedTag
165 for _, tagType := range r.TagTypes {
166 tagType := tagType
167 st := scannedTag{
168 ty: &tagType,
169 }
170 scannedTags = append(scannedTags, &st)
171 dests = append(dests, &st.id)
172 for _, fieldType := range tagType.Fields {
173 fieldType := fieldType
174 field := TagField{
175 Type: &fieldType,
176 }
177 dests = append(dests, &field)
178 st.fields = append(st.fields, &field)
179
180 }
181 }
182
183 if err := rows.Scan(dests...); err != nil {
184 return nil, fmt.Errorf("scan failed: %w", err)
185 }
186
187 // Now comes the merging/deduplication.
188
189 // First, check if we are processing a new machine. If so, create a new
190 // Machine. Otherwise, pick up the previous one.
191 var machine *Machine
192 if len(machines) == 0 || machines[len(machines)-1].ID.String() != mid.String() {
193 // New machine or no machine yet.
194 machine = &Machine{
195 ID: mid,
196 Created: machineCreated,
Serge Bazanski9e7875c2023-02-20 13:55:58 +0100197 Tags: make(map[string]*Tag),
Serge Bazanski424e2012023-02-15 23:31:49 +0100198 Backoffs: make(map[string]Backoff),
199 Work: make(map[string]Work),
200 }
201
202 // Collect tags into machine.
203 for _, st := range scannedTags {
204 if !st.id.Valid {
205 continue
206 }
207 var fields []TagField
208 for _, f := range st.fields {
209 fields = append(fields, *f)
210 }
Serge Bazanski9e7875c2023-02-20 13:55:58 +0100211 machine.Tags[st.ty.Name()] = &Tag{
Serge Bazanski424e2012023-02-15 23:31:49 +0100212 Type: st.ty,
213 Fields: fields,
214 }
215 }
216 machines = append(machines, machine)
217 } else {
218 // Continue previous machine.
219 machine = machines[len(machines)-1]
220 }
221
222 // Do we have a backoff? Upsert it to the machine. This works because there's a
223 // UNIQUE(machine_id, process) constraint on the work_backoff table, and we're
224 // effectively rebuilding that keyspace here by indexing first by machine then by
225 // process.
226 if backoffCause.Valid && backoffProcess.Valid && backoffUntil.Valid {
227 process := backoffProcess.String
228 machine.Backoffs[process] = Backoff{
229 Cause: backoffCause.String,
230 Process: process,
231 Until: backoffUntil.Time,
232 }
233 }
234
235 // Do we have an active work item? Upsert it to the machine. Same UNIQUE
236 // constraint abuse happening here.
237 if workProcess.Valid && workSession.Valid {
238 process := workProcess.String
239 machine.Work[process] = Work{
240 SessionID: workSession.UUID,
241 Process: process,
242 }
243 }
244 }
245
246 return &Reflected[[]*Machine]{
247 Data: machines,
248 Query: strings.Join(q, " "),
249 }, nil
250}
251
// Reflected bundles data of type T, retrieved through reflection, together with
// metadata describing how that data was retrieved.
type Reflected[T any] struct {
	// Data is the retrieved payload.
	Data T
	// Query is the effective SQL query that was performed on the database to
	// retrieve Data.
	Query string
}
259
260// Machine retrieved from BMDB.
261type Machine struct {
262 ID uuid.UUID
263 Created time.Time
264
265 // Tags on this machine, keyed by Tag type name (canonical, not native).
Serge Bazanski9e7875c2023-02-20 13:55:58 +0100266 Tags map[string]*Tag
Serge Bazanski424e2012023-02-15 23:31:49 +0100267
268 // Backoffs on this machine, keyed by process name. By default these are only
269 // active backoffs, unless ExpiredBackoffs was set on GetMachineOptions.
270 Backoffs map[string]Backoff
271
272 // Work active on this machine, keyed by process name.
273 Work map[string]Work
274}
275
276// ActiveBackoffs retrieves a copy of a Machine's active backoffs. Note: the
277// expiration check is performed according tu current system time, so it might
278// not be consistent with the data snapshot retrieved from the database.
279func (r *Machine) ActiveBackoffs() []*Backoff {
280 var res []*Backoff
281 for _, bo := range r.Backoffs {
282 bo := bo
283 if !bo.Active() {
284 continue
285 }
286 res = append(res, &bo)
287 }
288 sort.Slice(res, func(i, j int) bool { return res[i].Process < res[j].Process })
289 return res
290}
291
292// ExpiredBackoffs retrieves a copy of a Machine's expired backoffs. Note: the
293// expiration check is performed according tu current system time, so it might
294// not be consistent with the data snapshot retrieved from the database.
295func (r *Machine) ExpiredBackoffs() []*Backoff {
296 var res []*Backoff
297 for _, bo := range r.Backoffs {
298 bo := bo
299 if bo.Active() {
300 continue
301 }
302 res = append(res, &bo)
303 }
304 sort.Slice(res, func(i, j int) bool { return res[i].Process < res[j].Process })
305 return res
306}
307
308// Tag value set on a Machine.
309type Tag struct {
310 // Type describing this tag.
311 Type *TagType
312 // Field data contained in this tag, sorted alphabetically by name.
313 Fields []TagField
314}
315
316// Field is a shorthand for returning a TagField by its name.
317func (r *Tag) Field(name string) *TagField {
318 for _, f := range r.Fields {
319 if f.Type.NativeName == name {
320 return &f
321 }
322 }
323 return nil
324}
325
// DisplayOption is an opaque argument which can be passed to HumanValue to
// influence how a tag value is rendered.
type DisplayOption string

// DisplaySingleLine requests values to be rendered on a single line, instead of
// being pretty-printed over multiple lines with inserted newlines and indents.
const DisplaySingleLine DisplayOption = "single-line"
335
336func (r *Tag) HumanValue(opts ...DisplayOption) string {
337 var kvs []string
338 for _, field := range r.Fields {
339 kvs = append(kvs, fmt.Sprintf("%s: %s", field.Type.NativeName, field.HumanValue(opts...)))
340 }
341 return strings.Join(kvs, ", ")
342}
343
Serge Bazanski424e2012023-02-15 23:31:49 +0100344// TagField value which is part of a Tag set on a Machine.
345type TagField struct {
346 // Type describing this field.
347 Type *TagFieldType
348
349 text *string
350 bytes *[]byte
351 time *time.Time
Serge Bazanski10b21542023-04-13 12:12:05 +0200352 proto proto.Message
Serge Bazanski424e2012023-02-15 23:31:49 +0100353}
354
355// HumanValue returns a human-readable (best effort) representation of the field
356// value.
Serge Bazanski3c6306b2023-09-19 11:48:44 +0000357func (r *TagField) HumanValue(opts ...DisplayOption) string {
Serge Bazanski424e2012023-02-15 23:31:49 +0100358 switch {
Serge Bazanski10b21542023-04-13 12:12:05 +0200359 case r.proto != nil:
Serge Bazanski3c6306b2023-09-19 11:48:44 +0000360 mopts := prototext.MarshalOptions{
Serge Bazanski10b21542023-04-13 12:12:05 +0200361 Multiline: true,
362 Indent: "\t",
363 }
Serge Bazanski3c6306b2023-09-19 11:48:44 +0000364 for _, opt := range opts {
365 if opt == DisplaySingleLine {
366 mopts.Multiline = false
367 }
368 }
369 return mopts.Format(r.proto)
Serge Bazanski424e2012023-02-15 23:31:49 +0100370 case r.text != nil:
371 return *r.text
372 case r.bytes != nil:
373 return hex.EncodeToString(*r.bytes)
374 case r.time != nil:
375 return r.time.String()
376 default:
377 return "<unknown>"
378 }
379}
380
Serge Bazanski3c6306b2023-09-19 11:48:44 +0000381// Index attempts to index into a structured tag field (currently only protobuf
382// fields) by a 'field.subfield.subsubfield' selector.
383//
384// The selector for Protobuf fields follows the convention from 'protorange',
385// which is a semi-standardized format used in the Protobuf ecosystem. See
386// https://pkg.go.dev/google.golang.org/protobuf/reflect/protorange for more
387// details.
388//
389// An error will be returned if the TagField is not a protobuf field or if the
390// given selector does not point to a known message field.
391func (r *TagField) Index(k string) (string, error) {
392 if r.Type.ProtoType == nil {
393 return "", fmt.Errorf("can only index proto fields")
394 }
395 k = fmt.Sprintf("(%s).%s", r.Type.ProtoType.Descriptor().FullName(), k)
396
397 var res string
398 var found bool
399 ref := r.proto.ProtoReflect()
400 protorange.Range(ref, func(values protopath.Values) error {
401 if values.Path.String() == k {
402 res = values.Index(-1).Value.String()
403 found = true
404 }
405 return nil
406 })
407
408 if !found {
409 return "", fmt.Errorf("protobuf field not found")
410 }
411 return res, nil
412}
413
// Backoff on a Machine.
type Backoff struct {
	// Process is the name of the process which established this Backoff.
	Process string
	// Until is the point in time at which this Backoff expires.
	Until time.Time
	// Cause for the Backoff, as emitted by the worker.
	Cause string
}

// Active reports whether this Backoff is _currently_ active, i.e. whether Until
// still lies in the future according to the _local_ clock.
func (r Backoff) Active() bool {
	return r.Until.After(time.Now())
}
428
429// Work being actively performed on a Machine.
430type Work struct {
431 // SessionID of the worker performing this Work.
432 SessionID uuid.UUID
433 // Process name of this Work.
434 Process string
435}
436
437// Scan implements sql.Scanner for direct scanning of query results into a
438// reflected tag value. This method is not meant to by used outside the
439// reflection package.
440func (r *TagField) Scan(src any) error {
441 if src == nil {
442 return nil
443 }
444
445 switch r.Type.NativeType {
446 case "text":
447 src2, ok := src.(string)
448 if !ok {
449 return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src)
450 }
451 r.text = &src2
452 case "bytea":
453 src2, ok := src.([]byte)
454 if !ok {
455 return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src)
456 }
Serge Bazanski5cd7ddf2023-04-13 12:09:53 +0200457 // Copy the bytes, as they are otherwise going to be reused by the pq library.
458 copied := make([]byte, len(src2))
459 copy(copied[:], src2)
460 r.bytes = &copied
Serge Bazanski10b21542023-04-13 12:12:05 +0200461
462 if r.Type.ProtoType != nil {
463 msg := r.Type.ProtoType.New().Interface()
464 err := proto.Unmarshal(*r.bytes, msg)
465 if err != nil {
466 klog.Warningf("Could not unmarshal %s: %v", r.Type.NativeName, err)
467 } else {
468 r.proto = msg
469 }
470 }
Serge Bazanski424e2012023-02-15 23:31:49 +0100471 case "USER-DEFINED":
472 switch r.Type.NativeUDTName {
Serge Bazanskiafd3cf82023-04-19 17:43:46 +0200473 case "provider", "provider_status":
Serge Bazanski424e2012023-02-15 23:31:49 +0100474 src2, ok := src.([]byte)
475 if !ok {
476 return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src)
477 }
478 src3 := string(src2)
479 r.text = &src3
480 }
481 case "timestamp with time zone":
482 src2, ok := src.(time.Time)
483 if !ok {
484 return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src)
485 }
486 r.time = &src2
Tim Windelschmidt2bffb6f2023-04-24 19:06:10 +0200487 case "bigint":
488 src2, ok := src.(int64)
489 if !ok {
490 return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src)
491 }
492 src3 := fmt.Sprintf("%d", src2)
493 r.text = &src3
Serge Bazanski424e2012023-02-15 23:31:49 +0100494 default:
495 return fmt.Errorf("unimplemented SQL type %q", r.Type.NativeType)
496 }
497
498 return nil
499}