blob: 5942a305b0f89d2d37be393f050bceb6c757dc9c [file] [log] [blame]
// Package reflection implements facilities to retrieve information about the
// implemented Tags and their types from a plain CockroachDB SQL connection,
// bypassing the queries/types defined in models. Then, the retrieved Schema can
// be used to retrieve information about machines.
//
// This is designed to be used in debugging facilities to allow arbitrary machine
// introspection. It must _not_ be used in the user path, as the schema
// extraction functionality is implemented best-effort.
9package reflection
10
11import (
12 "context"
13 "database/sql"
14 "encoding/hex"
15 "fmt"
16 "sort"
17 "strings"
18 "time"
19
Serge Bazanski10b21542023-04-13 12:12:05 +020020 "k8s.io/klog/v2"
21
Serge Bazanski424e2012023-02-15 23:31:49 +010022 "github.com/google/uuid"
Serge Bazanski10b21542023-04-13 12:12:05 +020023 "google.golang.org/protobuf/encoding/prototext"
24 "google.golang.org/protobuf/proto"
Serge Bazanski424e2012023-02-15 23:31:49 +010025)
26
27// GetMachinesOpts influences the behaviour of GetMachines.
28type GetMachinesOpts struct {
29 // FilterMachine, if set, will only retrieve information about the machine with
30 // the given UUID. In case the given machine UUID does not exist in the database,
31 // an empty result will be returned and _no_ error will be set.
32 FilterMachine *uuid.UUID
33 // Strict enables strict consistency. This is not recommended for use when
34 // retrieving all machines, as such queries will compete against all currently
35 // running operations. When not enabled, the retrieval will be executed AS OF
36 // SYSTEM TIME follower_timestamp(), meaning the data might be a few seconds out
37 // of date. Regardless of the option, the returned machine data will be
38 // internally consistent, even across machines - but when not enabled the data
39 // might be stale.
40 Strict bool
41 // ExpiredBackoffs enables the retrieval of information about all machine
42 // backoffs, including expired backoff. Note that expired backoffs might be
43 // garbage collected in the future, and their long-term storage is not
44 // guaranteed.
45 ExpiredBackoffs bool
46}
47
48// GetMachines retrieves all available BMDB data about one or more machines. The
49// Schema's embedded SQL connection is used to performed the retrieval.
50//
51// Options can be specified to influenced the exact operation performed. By
52// default (with a zeroed structure or nil pointer), all machines with active
53// backoffs are retrieved with weak consistency. See GetMachineOpts to influence
54// this behaviour.
55func (r *Schema) GetMachines(ctx context.Context, opts *GetMachinesOpts) (*Reflected[[]*Machine], error) {
56 if opts == nil {
57 opts = &GetMachinesOpts{}
58 }
59
60 // We're about to build a pretty big SELECT query with a ton of joins.
61 //
62 // First, we join against work_backoff and work to get information about active
63 // work and backoffs on the machines we're retrieving.
64 //
65 // Second, we join against all the tags that are declared in the schema.
66
67 // These are the colums we'll SELECT <...> FROM
68 columns := []string{
69 "machines.machine_id",
70 "machines.machine_created_at",
71 "work_backoff.process",
72 "work_backoff.cause",
73 "work_backoff.until",
74 "work.process",
75 "work.session_id",
76 // ... tag columns will come after this.
77 }
78 // These are tha args we'll pass to the query.
79 var args []any
80
81 // Start building joins. First, against work_backoff and work.
82 backoffFilter := " AND work_backoff.until > now()"
83 if opts.ExpiredBackoffs {
84 backoffFilter = ""
85 }
86 joins := []string{
87 "LEFT JOIN work_backoff ON machines.machine_id = work_backoff.machine_id" + backoffFilter,
88 "LEFT JOIN work ON machines.machine_id = work.machine_id",
89 }
90
91 // Then, against tags. Also populate columns as we go along.
92 for _, tagType := range r.TagTypes {
93 joins = append(joins, fmt.Sprintf("LEFT JOIN %s ON machines.machine_id = %s.machine_id", tagType.NativeName, tagType.NativeName))
94 columns = append(columns, fmt.Sprintf("%s.machine_id", tagType.NativeName))
95 for _, fieldType := range tagType.Fields {
96 columns = append(columns, fmt.Sprintf("%s.%s", tagType.NativeName, fieldType.NativeName))
97 }
98 }
99
100 // Finalize query.
101 q := []string{
102 "SELECT",
103 strings.Join(columns, ", "),
104 "FROM machines",
105 }
106 q = append(q, joins...)
107 if !opts.Strict {
108 q = append(q, "AS OF SYSTEM TIME follower_read_timestamp()")
109 }
110 if opts.FilterMachine != nil {
111 q = append(q, "WHERE machines.machine_id = $1")
112 args = append(args, *opts.FilterMachine)
113 }
Tim Windelschmidtae7e3ed2023-04-17 23:15:39 +0200114 q = append(q, "ORDER BY machines.machine_id")
Serge Bazanski424e2012023-02-15 23:31:49 +0100115
116 rows, err := r.db.QueryContext(ctx, strings.Join(q, "\n"), args...)
117 if err != nil {
118 return nil, fmt.Errorf("query failed: %w", err)
119 }
120 defer rows.Close()
121
122 // Okay, we can start scanning the result rows.
123 //
124 // As this is a complex join, we need to merge some rows together and discard
125 // some NULLs. We do merging/deduplication using machine_id values for the
126 // machine data, and abuse UNIQUE constraints in the work_backoff/work tables to
127 // deduplicate these.
128 //
129 // The alternative would be to rewrite this query to use array_agg, and we might
130 // do that at some point. This is only really a problem if we
131 // have _a lot_ of active work/backoffs (as that effectively duplicates all
132 // machine/tag data), which isn't the case yet. But we should keep an eye out for
133 // this.
134
135 var machines []*Machine
136 for rows.Next() {
137
138 // We need to scan this row back into columns. For constant columns we'll just
139 // create the data here and refer to it later.
140 var dests []any
141
142 // Add non-tag always-retrieved constants.
143 var mid uuid.UUID
144 var machineCreated time.Time
145 var workSession uuid.NullUUID
146 var backoffProcess, backoffCause, workProcess sql.NullString
147 var backoffUntil sql.NullTime
148
149 dests = append(dests, &mid, &machineCreated, &backoffProcess, &backoffCause, &backoffUntil, &workProcess, &workSession)
150
151 // For dynamic data, we need to keep a reference to a list of columns that are
152 // part of tags, and then refer to them later. We can't just refer back to dests
153 // as the types are erased into `any`. scannedTags is that data storage.
154 type scannedTag struct {
155 ty *TagType
156 id uuid.NullUUID
157 fields []*TagField
158 }
159 var scannedTags []*scannedTag
160 for _, tagType := range r.TagTypes {
161 tagType := tagType
162 st := scannedTag{
163 ty: &tagType,
164 }
165 scannedTags = append(scannedTags, &st)
166 dests = append(dests, &st.id)
167 for _, fieldType := range tagType.Fields {
168 fieldType := fieldType
169 field := TagField{
170 Type: &fieldType,
171 }
172 dests = append(dests, &field)
173 st.fields = append(st.fields, &field)
174
175 }
176 }
177
178 if err := rows.Scan(dests...); err != nil {
179 return nil, fmt.Errorf("scan failed: %w", err)
180 }
181
182 // Now comes the merging/deduplication.
183
184 // First, check if we are processing a new machine. If so, create a new
185 // Machine. Otherwise, pick up the previous one.
186 var machine *Machine
187 if len(machines) == 0 || machines[len(machines)-1].ID.String() != mid.String() {
188 // New machine or no machine yet.
189 machine = &Machine{
190 ID: mid,
191 Created: machineCreated,
Serge Bazanski9e7875c2023-02-20 13:55:58 +0100192 Tags: make(map[string]*Tag),
Serge Bazanski424e2012023-02-15 23:31:49 +0100193 Backoffs: make(map[string]Backoff),
194 Work: make(map[string]Work),
195 }
196
197 // Collect tags into machine.
198 for _, st := range scannedTags {
199 if !st.id.Valid {
200 continue
201 }
202 var fields []TagField
203 for _, f := range st.fields {
204 fields = append(fields, *f)
205 }
Serge Bazanski9e7875c2023-02-20 13:55:58 +0100206 machine.Tags[st.ty.Name()] = &Tag{
Serge Bazanski424e2012023-02-15 23:31:49 +0100207 Type: st.ty,
208 Fields: fields,
209 }
210 }
211 machines = append(machines, machine)
212 } else {
213 // Continue previous machine.
214 machine = machines[len(machines)-1]
215 }
216
217 // Do we have a backoff? Upsert it to the machine. This works because there's a
218 // UNIQUE(machine_id, process) constraint on the work_backoff table, and we're
219 // effectively rebuilding that keyspace here by indexing first by machine then by
220 // process.
221 if backoffCause.Valid && backoffProcess.Valid && backoffUntil.Valid {
222 process := backoffProcess.String
223 machine.Backoffs[process] = Backoff{
224 Cause: backoffCause.String,
225 Process: process,
226 Until: backoffUntil.Time,
227 }
228 }
229
230 // Do we have an active work item? Upsert it to the machine. Same UNIQUE
231 // constraint abuse happening here.
232 if workProcess.Valid && workSession.Valid {
233 process := workProcess.String
234 machine.Work[process] = Work{
235 SessionID: workSession.UUID,
236 Process: process,
237 }
238 }
239 }
240
241 return &Reflected[[]*Machine]{
242 Data: machines,
243 Query: strings.Join(q, " "),
244 }, nil
245}
246
// Reflected pairs a value of type T obtained through reflection with metadata
// describing how it was retrieved.
type Reflected[T any] struct {
	// Data is the retrieved payload.
	Data T
	// Query is the effective SQL query performed on the database.
	Query string
}
254
255// Machine retrieved from BMDB.
256type Machine struct {
257 ID uuid.UUID
258 Created time.Time
259
260 // Tags on this machine, keyed by Tag type name (canonical, not native).
Serge Bazanski9e7875c2023-02-20 13:55:58 +0100261 Tags map[string]*Tag
Serge Bazanski424e2012023-02-15 23:31:49 +0100262
263 // Backoffs on this machine, keyed by process name. By default these are only
264 // active backoffs, unless ExpiredBackoffs was set on GetMachineOptions.
265 Backoffs map[string]Backoff
266
267 // Work active on this machine, keyed by process name.
268 Work map[string]Work
269}
270
271// ActiveBackoffs retrieves a copy of a Machine's active backoffs. Note: the
272// expiration check is performed according tu current system time, so it might
273// not be consistent with the data snapshot retrieved from the database.
274func (r *Machine) ActiveBackoffs() []*Backoff {
275 var res []*Backoff
276 for _, bo := range r.Backoffs {
277 bo := bo
278 if !bo.Active() {
279 continue
280 }
281 res = append(res, &bo)
282 }
283 sort.Slice(res, func(i, j int) bool { return res[i].Process < res[j].Process })
284 return res
285}
286
287// ExpiredBackoffs retrieves a copy of a Machine's expired backoffs. Note: the
288// expiration check is performed according tu current system time, so it might
289// not be consistent with the data snapshot retrieved from the database.
290func (r *Machine) ExpiredBackoffs() []*Backoff {
291 var res []*Backoff
292 for _, bo := range r.Backoffs {
293 bo := bo
294 if bo.Active() {
295 continue
296 }
297 res = append(res, &bo)
298 }
299 sort.Slice(res, func(i, j int) bool { return res[i].Process < res[j].Process })
300 return res
301}
302
303// Tag value set on a Machine.
304type Tag struct {
305 // Type describing this tag.
306 Type *TagType
307 // Field data contained in this tag, sorted alphabetically by name.
308 Fields []TagField
309}
310
311// Field is a shorthand for returning a TagField by its name.
312func (r *Tag) Field(name string) *TagField {
313 for _, f := range r.Fields {
314 if f.Type.NativeName == name {
315 return &f
316 }
317 }
318 return nil
319}
320
321// TagField value which is part of a Tag set on a Machine.
322type TagField struct {
323 // Type describing this field.
324 Type *TagFieldType
325
326 text *string
327 bytes *[]byte
328 time *time.Time
Serge Bazanski10b21542023-04-13 12:12:05 +0200329 proto proto.Message
Serge Bazanski424e2012023-02-15 23:31:49 +0100330}
331
332// HumanValue returns a human-readable (best effort) representation of the field
333// value.
334func (r *TagField) HumanValue() string {
335 switch {
Serge Bazanski10b21542023-04-13 12:12:05 +0200336 case r.proto != nil:
337 opts := prototext.MarshalOptions{
338 Multiline: true,
339 Indent: "\t",
340 }
341 return opts.Format(r.proto)
Serge Bazanski424e2012023-02-15 23:31:49 +0100342 case r.text != nil:
343 return *r.text
344 case r.bytes != nil:
345 return hex.EncodeToString(*r.bytes)
346 case r.time != nil:
347 return r.time.String()
348 default:
349 return "<unknown>"
350 }
351}
352
// Backoff describes a backoff that a process has established on a Machine.
type Backoff struct {
	// Process which established Backoff.
	Process string
	// Until is the time at which the Backoff expires.
	Until time.Time
	// Cause for the Backoff as emitted by worker.
	Cause string
}

// Active reports whether this Backoff has not expired yet, ie. whether Until
// still lies in the future. The check is made against the _local_ clock at the
// time of the call.
func (r Backoff) Active() bool {
	return r.Until.After(time.Now())
}
367
368// Work being actively performed on a Machine.
369type Work struct {
370 // SessionID of the worker performing this Work.
371 SessionID uuid.UUID
372 // Process name of this Work.
373 Process string
374}
375
376// Scan implements sql.Scanner for direct scanning of query results into a
377// reflected tag value. This method is not meant to by used outside the
378// reflection package.
379func (r *TagField) Scan(src any) error {
380 if src == nil {
381 return nil
382 }
383
384 switch r.Type.NativeType {
385 case "text":
386 src2, ok := src.(string)
387 if !ok {
388 return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src)
389 }
390 r.text = &src2
391 case "bytea":
392 src2, ok := src.([]byte)
393 if !ok {
394 return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src)
395 }
Serge Bazanski5cd7ddf2023-04-13 12:09:53 +0200396 // Copy the bytes, as they are otherwise going to be reused by the pq library.
397 copied := make([]byte, len(src2))
398 copy(copied[:], src2)
399 r.bytes = &copied
Serge Bazanski10b21542023-04-13 12:12:05 +0200400
401 if r.Type.ProtoType != nil {
402 msg := r.Type.ProtoType.New().Interface()
403 err := proto.Unmarshal(*r.bytes, msg)
404 if err != nil {
405 klog.Warningf("Could not unmarshal %s: %v", r.Type.NativeName, err)
406 } else {
407 r.proto = msg
408 }
409 }
Serge Bazanski424e2012023-02-15 23:31:49 +0100410 case "USER-DEFINED":
411 switch r.Type.NativeUDTName {
Serge Bazanskiafd3cf82023-04-19 17:43:46 +0200412 case "provider", "provider_status":
Serge Bazanski424e2012023-02-15 23:31:49 +0100413 src2, ok := src.([]byte)
414 if !ok {
415 return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src)
416 }
417 src3 := string(src2)
418 r.text = &src3
419 }
420 case "timestamp with time zone":
421 src2, ok := src.(time.Time)
422 if !ok {
423 return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src)
424 }
425 r.time = &src2
Tim Windelschmidt2bffb6f2023-04-24 19:06:10 +0200426 case "bigint":
427 src2, ok := src.(int64)
428 if !ok {
429 return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src)
430 }
431 src3 := fmt.Sprintf("%d", src2)
432 r.text = &src3
Serge Bazanski424e2012023-02-15 23:31:49 +0100433 default:
434 return fmt.Errorf("unimplemented SQL type %q", r.Type.NativeType)
435 }
436
437 return nil
438}