blob: 19bc40ac1f815de1d8053f77c422f09df7e81f4f [file] [log] [blame]
// Package reflection implements facilities to retrieve information about the
// implemented Tags and their types from a plain CockroachDB SQL connection,
// bypassing the queries/types defined in models. Then, the retrieved Schema can
// be used to retrieve information about machines.
//
// This is designed to be used in debugging facilities to allow arbitrary machine
// introspection. It must _not_ be used in the user path, as the schema
// extraction functionality is implemented best-effort.
9package reflection
10
11import (
12 "context"
13 "database/sql"
14 "encoding/hex"
15 "fmt"
16 "sort"
17 "strings"
18 "time"
19
20 "github.com/google/uuid"
21)
22
23// GetMachinesOpts influences the behaviour of GetMachines.
24type GetMachinesOpts struct {
25 // FilterMachine, if set, will only retrieve information about the machine with
26 // the given UUID. In case the given machine UUID does not exist in the database,
27 // an empty result will be returned and _no_ error will be set.
28 FilterMachine *uuid.UUID
29 // Strict enables strict consistency. This is not recommended for use when
30 // retrieving all machines, as such queries will compete against all currently
31 // running operations. When not enabled, the retrieval will be executed AS OF
32 // SYSTEM TIME follower_timestamp(), meaning the data might be a few seconds out
33 // of date. Regardless of the option, the returned machine data will be
34 // internally consistent, even across machines - but when not enabled the data
35 // might be stale.
36 Strict bool
37 // ExpiredBackoffs enables the retrieval of information about all machine
38 // backoffs, including expired backoff. Note that expired backoffs might be
39 // garbage collected in the future, and their long-term storage is not
40 // guaranteed.
41 ExpiredBackoffs bool
42}
43
44// GetMachines retrieves all available BMDB data about one or more machines. The
45// Schema's embedded SQL connection is used to performed the retrieval.
46//
47// Options can be specified to influenced the exact operation performed. By
48// default (with a zeroed structure or nil pointer), all machines with active
49// backoffs are retrieved with weak consistency. See GetMachineOpts to influence
50// this behaviour.
51func (r *Schema) GetMachines(ctx context.Context, opts *GetMachinesOpts) (*Reflected[[]*Machine], error) {
52 if opts == nil {
53 opts = &GetMachinesOpts{}
54 }
55
56 // We're about to build a pretty big SELECT query with a ton of joins.
57 //
58 // First, we join against work_backoff and work to get information about active
59 // work and backoffs on the machines we're retrieving.
60 //
61 // Second, we join against all the tags that are declared in the schema.
62
63 // These are the colums we'll SELECT <...> FROM
64 columns := []string{
65 "machines.machine_id",
66 "machines.machine_created_at",
67 "work_backoff.process",
68 "work_backoff.cause",
69 "work_backoff.until",
70 "work.process",
71 "work.session_id",
72 // ... tag columns will come after this.
73 }
74 // These are tha args we'll pass to the query.
75 var args []any
76
77 // Start building joins. First, against work_backoff and work.
78 backoffFilter := " AND work_backoff.until > now()"
79 if opts.ExpiredBackoffs {
80 backoffFilter = ""
81 }
82 joins := []string{
83 "LEFT JOIN work_backoff ON machines.machine_id = work_backoff.machine_id" + backoffFilter,
84 "LEFT JOIN work ON machines.machine_id = work.machine_id",
85 }
86
87 // Then, against tags. Also populate columns as we go along.
88 for _, tagType := range r.TagTypes {
89 joins = append(joins, fmt.Sprintf("LEFT JOIN %s ON machines.machine_id = %s.machine_id", tagType.NativeName, tagType.NativeName))
90 columns = append(columns, fmt.Sprintf("%s.machine_id", tagType.NativeName))
91 for _, fieldType := range tagType.Fields {
92 columns = append(columns, fmt.Sprintf("%s.%s", tagType.NativeName, fieldType.NativeName))
93 }
94 }
95
96 // Finalize query.
97 q := []string{
98 "SELECT",
99 strings.Join(columns, ", "),
100 "FROM machines",
101 }
102 q = append(q, joins...)
103 if !opts.Strict {
104 q = append(q, "AS OF SYSTEM TIME follower_read_timestamp()")
105 }
106 if opts.FilterMachine != nil {
107 q = append(q, "WHERE machines.machine_id = $1")
108 args = append(args, *opts.FilterMachine)
109 }
110
111 rows, err := r.db.QueryContext(ctx, strings.Join(q, "\n"), args...)
112 if err != nil {
113 return nil, fmt.Errorf("query failed: %w", err)
114 }
115 defer rows.Close()
116
117 // Okay, we can start scanning the result rows.
118 //
119 // As this is a complex join, we need to merge some rows together and discard
120 // some NULLs. We do merging/deduplication using machine_id values for the
121 // machine data, and abuse UNIQUE constraints in the work_backoff/work tables to
122 // deduplicate these.
123 //
124 // The alternative would be to rewrite this query to use array_agg, and we might
125 // do that at some point. This is only really a problem if we
126 // have _a lot_ of active work/backoffs (as that effectively duplicates all
127 // machine/tag data), which isn't the case yet. But we should keep an eye out for
128 // this.
129
130 var machines []*Machine
131 for rows.Next() {
132
133 // We need to scan this row back into columns. For constant columns we'll just
134 // create the data here and refer to it later.
135 var dests []any
136
137 // Add non-tag always-retrieved constants.
138 var mid uuid.UUID
139 var machineCreated time.Time
140 var workSession uuid.NullUUID
141 var backoffProcess, backoffCause, workProcess sql.NullString
142 var backoffUntil sql.NullTime
143
144 dests = append(dests, &mid, &machineCreated, &backoffProcess, &backoffCause, &backoffUntil, &workProcess, &workSession)
145
146 // For dynamic data, we need to keep a reference to a list of columns that are
147 // part of tags, and then refer to them later. We can't just refer back to dests
148 // as the types are erased into `any`. scannedTags is that data storage.
149 type scannedTag struct {
150 ty *TagType
151 id uuid.NullUUID
152 fields []*TagField
153 }
154 var scannedTags []*scannedTag
155 for _, tagType := range r.TagTypes {
156 tagType := tagType
157 st := scannedTag{
158 ty: &tagType,
159 }
160 scannedTags = append(scannedTags, &st)
161 dests = append(dests, &st.id)
162 for _, fieldType := range tagType.Fields {
163 fieldType := fieldType
164 field := TagField{
165 Type: &fieldType,
166 }
167 dests = append(dests, &field)
168 st.fields = append(st.fields, &field)
169
170 }
171 }
172
173 if err := rows.Scan(dests...); err != nil {
174 return nil, fmt.Errorf("scan failed: %w", err)
175 }
176
177 // Now comes the merging/deduplication.
178
179 // First, check if we are processing a new machine. If so, create a new
180 // Machine. Otherwise, pick up the previous one.
181 var machine *Machine
182 if len(machines) == 0 || machines[len(machines)-1].ID.String() != mid.String() {
183 // New machine or no machine yet.
184 machine = &Machine{
185 ID: mid,
186 Created: machineCreated,
187 Tags: make(map[string]Tag),
188 Backoffs: make(map[string]Backoff),
189 Work: make(map[string]Work),
190 }
191
192 // Collect tags into machine.
193 for _, st := range scannedTags {
194 if !st.id.Valid {
195 continue
196 }
197 var fields []TagField
198 for _, f := range st.fields {
199 fields = append(fields, *f)
200 }
201 machine.Tags[st.ty.Name()] = Tag{
202 Type: st.ty,
203 Fields: fields,
204 }
205 }
206 machines = append(machines, machine)
207 } else {
208 // Continue previous machine.
209 machine = machines[len(machines)-1]
210 }
211
212 // Do we have a backoff? Upsert it to the machine. This works because there's a
213 // UNIQUE(machine_id, process) constraint on the work_backoff table, and we're
214 // effectively rebuilding that keyspace here by indexing first by machine then by
215 // process.
216 if backoffCause.Valid && backoffProcess.Valid && backoffUntil.Valid {
217 process := backoffProcess.String
218 machine.Backoffs[process] = Backoff{
219 Cause: backoffCause.String,
220 Process: process,
221 Until: backoffUntil.Time,
222 }
223 }
224
225 // Do we have an active work item? Upsert it to the machine. Same UNIQUE
226 // constraint abuse happening here.
227 if workProcess.Valid && workSession.Valid {
228 process := workProcess.String
229 machine.Work[process] = Work{
230 SessionID: workSession.UUID,
231 Process: process,
232 }
233 }
234 }
235
236 return &Reflected[[]*Machine]{
237 Data: machines,
238 Query: strings.Join(q, " "),
239 }, nil
240}
241
// Reflected pairs a value of type T obtained through reflection with metadata
// describing how that value was retrieved.
type Reflected[T any] struct {
	// Data is the retrieved value.
	Data T
	// Query is the effective SQL query performed on the database.
	Query string
}
249
250// Machine retrieved from BMDB.
251type Machine struct {
252 ID uuid.UUID
253 Created time.Time
254
255 // Tags on this machine, keyed by Tag type name (canonical, not native).
256 Tags map[string]Tag
257
258 // Backoffs on this machine, keyed by process name. By default these are only
259 // active backoffs, unless ExpiredBackoffs was set on GetMachineOptions.
260 Backoffs map[string]Backoff
261
262 // Work active on this machine, keyed by process name.
263 Work map[string]Work
264}
265
266// ActiveBackoffs retrieves a copy of a Machine's active backoffs. Note: the
267// expiration check is performed according tu current system time, so it might
268// not be consistent with the data snapshot retrieved from the database.
269func (r *Machine) ActiveBackoffs() []*Backoff {
270 var res []*Backoff
271 for _, bo := range r.Backoffs {
272 bo := bo
273 if !bo.Active() {
274 continue
275 }
276 res = append(res, &bo)
277 }
278 sort.Slice(res, func(i, j int) bool { return res[i].Process < res[j].Process })
279 return res
280}
281
282// ExpiredBackoffs retrieves a copy of a Machine's expired backoffs. Note: the
283// expiration check is performed according tu current system time, so it might
284// not be consistent with the data snapshot retrieved from the database.
285func (r *Machine) ExpiredBackoffs() []*Backoff {
286 var res []*Backoff
287 for _, bo := range r.Backoffs {
288 bo := bo
289 if bo.Active() {
290 continue
291 }
292 res = append(res, &bo)
293 }
294 sort.Slice(res, func(i, j int) bool { return res[i].Process < res[j].Process })
295 return res
296}
297
298// Tag value set on a Machine.
299type Tag struct {
300 // Type describing this tag.
301 Type *TagType
302 // Field data contained in this tag, sorted alphabetically by name.
303 Fields []TagField
304}
305
306// Field is a shorthand for returning a TagField by its name.
307func (r *Tag) Field(name string) *TagField {
308 for _, f := range r.Fields {
309 if f.Type.NativeName == name {
310 return &f
311 }
312 }
313 return nil
314}
315
316// TagField value which is part of a Tag set on a Machine.
317type TagField struct {
318 // Type describing this field.
319 Type *TagFieldType
320
321 text *string
322 bytes *[]byte
323 time *time.Time
324}
325
326// HumanValue returns a human-readable (best effort) representation of the field
327// value.
328func (r *TagField) HumanValue() string {
329 switch {
330 case r.text != nil:
331 return *r.text
332 case r.bytes != nil:
333 return hex.EncodeToString(*r.bytes)
334 case r.time != nil:
335 return r.time.String()
336 default:
337 return "<unknown>"
338 }
339}
340
// Backoff describes a backoff established on a Machine by some process.
type Backoff struct {
	// Process which established Backoff.
	Process string
	// Until is the time when Backoff expires.
	Until time.Time
	// Cause for the Backoff as emitted by worker.
	Cause string
}
350
351// Active returns whether this Backoff is _currently_ active per the _local_ time.
352func (r Backoff) Active() bool {
353 return time.Now().Before(r.Until)
354}
355
356// Work being actively performed on a Machine.
357type Work struct {
358 // SessionID of the worker performing this Work.
359 SessionID uuid.UUID
360 // Process name of this Work.
361 Process string
362}
363
364// Scan implements sql.Scanner for direct scanning of query results into a
365// reflected tag value. This method is not meant to by used outside the
366// reflection package.
367func (r *TagField) Scan(src any) error {
368 if src == nil {
369 return nil
370 }
371
372 switch r.Type.NativeType {
373 case "text":
374 src2, ok := src.(string)
375 if !ok {
376 return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src)
377 }
378 r.text = &src2
379 case "bytea":
380 src2, ok := src.([]byte)
381 if !ok {
382 return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src)
383 }
384 r.bytes = &src2
385 case "USER-DEFINED":
386 switch r.Type.NativeUDTName {
387 case "provider":
388 src2, ok := src.([]byte)
389 if !ok {
390 return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src)
391 }
392 src3 := string(src2)
393 r.text = &src3
394 }
395 case "timestamp with time zone":
396 src2, ok := src.(time.Time)
397 if !ok {
398 return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src)
399 }
400 r.time = &src2
401 default:
402 return fmt.Errorf("unimplemented SQL type %q", r.Type.NativeType)
403 }
404
405 return nil
406}