blob: 6a45d27f0e10591aaa044a131520150ed34513a4 [file] [log] [blame]
// Package reflection implements facilities to retrieve information about the
// implemented Tags and their types from a plain CockroachDB SQL connection,
// bypassing the queries/types defined in models. Then, the retrieved Schema can
// be used to retrieve information about machines.
//
// This is designed to be used in debugging facilities to allow arbitrary machine
// introspection. It must _not_ be used in the user path, as the schema
// extraction functionality is implemented best-effort.
9package reflection
10
11import (
12 "context"
13 "database/sql"
14 "encoding/hex"
15 "fmt"
16 "sort"
17 "strings"
18 "time"
19
Serge Bazanski10b21542023-04-13 12:12:05 +020020 "k8s.io/klog/v2"
21
Serge Bazanski424e2012023-02-15 23:31:49 +010022 "github.com/google/uuid"
Serge Bazanski10b21542023-04-13 12:12:05 +020023 "google.golang.org/protobuf/encoding/prototext"
24 "google.golang.org/protobuf/proto"
Serge Bazanski424e2012023-02-15 23:31:49 +010025)
26
27// GetMachinesOpts influences the behaviour of GetMachines.
28type GetMachinesOpts struct {
29 // FilterMachine, if set, will only retrieve information about the machine with
30 // the given UUID. In case the given machine UUID does not exist in the database,
31 // an empty result will be returned and _no_ error will be set.
32 FilterMachine *uuid.UUID
33 // Strict enables strict consistency. This is not recommended for use when
34 // retrieving all machines, as such queries will compete against all currently
35 // running operations. When not enabled, the retrieval will be executed AS OF
36 // SYSTEM TIME follower_timestamp(), meaning the data might be a few seconds out
37 // of date. Regardless of the option, the returned machine data will be
38 // internally consistent, even across machines - but when not enabled the data
39 // might be stale.
40 Strict bool
41 // ExpiredBackoffs enables the retrieval of information about all machine
42 // backoffs, including expired backoff. Note that expired backoffs might be
43 // garbage collected in the future, and their long-term storage is not
44 // guaranteed.
45 ExpiredBackoffs bool
46}
47
48// GetMachines retrieves all available BMDB data about one or more machines. The
49// Schema's embedded SQL connection is used to performed the retrieval.
50//
51// Options can be specified to influenced the exact operation performed. By
52// default (with a zeroed structure or nil pointer), all machines with active
53// backoffs are retrieved with weak consistency. See GetMachineOpts to influence
54// this behaviour.
55func (r *Schema) GetMachines(ctx context.Context, opts *GetMachinesOpts) (*Reflected[[]*Machine], error) {
56 if opts == nil {
57 opts = &GetMachinesOpts{}
58 }
59
60 // We're about to build a pretty big SELECT query with a ton of joins.
61 //
62 // First, we join against work_backoff and work to get information about active
63 // work and backoffs on the machines we're retrieving.
64 //
65 // Second, we join against all the tags that are declared in the schema.
66
67 // These are the colums we'll SELECT <...> FROM
68 columns := []string{
69 "machines.machine_id",
70 "machines.machine_created_at",
71 "work_backoff.process",
72 "work_backoff.cause",
73 "work_backoff.until",
74 "work.process",
75 "work.session_id",
76 // ... tag columns will come after this.
77 }
78 // These are tha args we'll pass to the query.
79 var args []any
80
81 // Start building joins. First, against work_backoff and work.
82 backoffFilter := " AND work_backoff.until > now()"
83 if opts.ExpiredBackoffs {
84 backoffFilter = ""
85 }
86 joins := []string{
87 "LEFT JOIN work_backoff ON machines.machine_id = work_backoff.machine_id" + backoffFilter,
88 "LEFT JOIN work ON machines.machine_id = work.machine_id",
89 }
90
91 // Then, against tags. Also populate columns as we go along.
92 for _, tagType := range r.TagTypes {
93 joins = append(joins, fmt.Sprintf("LEFT JOIN %s ON machines.machine_id = %s.machine_id", tagType.NativeName, tagType.NativeName))
94 columns = append(columns, fmt.Sprintf("%s.machine_id", tagType.NativeName))
95 for _, fieldType := range tagType.Fields {
96 columns = append(columns, fmt.Sprintf("%s.%s", tagType.NativeName, fieldType.NativeName))
97 }
98 }
99
100 // Finalize query.
101 q := []string{
102 "SELECT",
103 strings.Join(columns, ", "),
104 "FROM machines",
105 }
106 q = append(q, joins...)
107 if !opts.Strict {
108 q = append(q, "AS OF SYSTEM TIME follower_read_timestamp()")
109 }
110 if opts.FilterMachine != nil {
111 q = append(q, "WHERE machines.machine_id = $1")
112 args = append(args, *opts.FilterMachine)
113 }
114
115 rows, err := r.db.QueryContext(ctx, strings.Join(q, "\n"), args...)
116 if err != nil {
117 return nil, fmt.Errorf("query failed: %w", err)
118 }
119 defer rows.Close()
120
121 // Okay, we can start scanning the result rows.
122 //
123 // As this is a complex join, we need to merge some rows together and discard
124 // some NULLs. We do merging/deduplication using machine_id values for the
125 // machine data, and abuse UNIQUE constraints in the work_backoff/work tables to
126 // deduplicate these.
127 //
128 // The alternative would be to rewrite this query to use array_agg, and we might
129 // do that at some point. This is only really a problem if we
130 // have _a lot_ of active work/backoffs (as that effectively duplicates all
131 // machine/tag data), which isn't the case yet. But we should keep an eye out for
132 // this.
133
134 var machines []*Machine
135 for rows.Next() {
136
137 // We need to scan this row back into columns. For constant columns we'll just
138 // create the data here and refer to it later.
139 var dests []any
140
141 // Add non-tag always-retrieved constants.
142 var mid uuid.UUID
143 var machineCreated time.Time
144 var workSession uuid.NullUUID
145 var backoffProcess, backoffCause, workProcess sql.NullString
146 var backoffUntil sql.NullTime
147
148 dests = append(dests, &mid, &machineCreated, &backoffProcess, &backoffCause, &backoffUntil, &workProcess, &workSession)
149
150 // For dynamic data, we need to keep a reference to a list of columns that are
151 // part of tags, and then refer to them later. We can't just refer back to dests
152 // as the types are erased into `any`. scannedTags is that data storage.
153 type scannedTag struct {
154 ty *TagType
155 id uuid.NullUUID
156 fields []*TagField
157 }
158 var scannedTags []*scannedTag
159 for _, tagType := range r.TagTypes {
160 tagType := tagType
161 st := scannedTag{
162 ty: &tagType,
163 }
164 scannedTags = append(scannedTags, &st)
165 dests = append(dests, &st.id)
166 for _, fieldType := range tagType.Fields {
167 fieldType := fieldType
168 field := TagField{
169 Type: &fieldType,
170 }
171 dests = append(dests, &field)
172 st.fields = append(st.fields, &field)
173
174 }
175 }
176
177 if err := rows.Scan(dests...); err != nil {
178 return nil, fmt.Errorf("scan failed: %w", err)
179 }
180
181 // Now comes the merging/deduplication.
182
183 // First, check if we are processing a new machine. If so, create a new
184 // Machine. Otherwise, pick up the previous one.
185 var machine *Machine
186 if len(machines) == 0 || machines[len(machines)-1].ID.String() != mid.String() {
187 // New machine or no machine yet.
188 machine = &Machine{
189 ID: mid,
190 Created: machineCreated,
Serge Bazanski9e7875c2023-02-20 13:55:58 +0100191 Tags: make(map[string]*Tag),
Serge Bazanski424e2012023-02-15 23:31:49 +0100192 Backoffs: make(map[string]Backoff),
193 Work: make(map[string]Work),
194 }
195
196 // Collect tags into machine.
197 for _, st := range scannedTags {
198 if !st.id.Valid {
199 continue
200 }
201 var fields []TagField
202 for _, f := range st.fields {
203 fields = append(fields, *f)
204 }
Serge Bazanski9e7875c2023-02-20 13:55:58 +0100205 machine.Tags[st.ty.Name()] = &Tag{
Serge Bazanski424e2012023-02-15 23:31:49 +0100206 Type: st.ty,
207 Fields: fields,
208 }
209 }
210 machines = append(machines, machine)
211 } else {
212 // Continue previous machine.
213 machine = machines[len(machines)-1]
214 }
215
216 // Do we have a backoff? Upsert it to the machine. This works because there's a
217 // UNIQUE(machine_id, process) constraint on the work_backoff table, and we're
218 // effectively rebuilding that keyspace here by indexing first by machine then by
219 // process.
220 if backoffCause.Valid && backoffProcess.Valid && backoffUntil.Valid {
221 process := backoffProcess.String
222 machine.Backoffs[process] = Backoff{
223 Cause: backoffCause.String,
224 Process: process,
225 Until: backoffUntil.Time,
226 }
227 }
228
229 // Do we have an active work item? Upsert it to the machine. Same UNIQUE
230 // constraint abuse happening here.
231 if workProcess.Valid && workSession.Valid {
232 process := workProcess.String
233 machine.Work[process] = Work{
234 SessionID: workSession.UUID,
235 Process: process,
236 }
237 }
238 }
239
240 return &Reflected[[]*Machine]{
241 Data: machines,
242 Query: strings.Join(q, " "),
243 }, nil
244}
245
// Reflected wraps data retrieved by reflection (T) with metadata about the
// retrieval.
type Reflected[T any] struct {
	// Data is the payload retrieved from the database.
	Data T
	// Query is the effective SQL query performed on the database.
	Query string
}
253
254// Machine retrieved from BMDB.
255type Machine struct {
256 ID uuid.UUID
257 Created time.Time
258
259 // Tags on this machine, keyed by Tag type name (canonical, not native).
Serge Bazanski9e7875c2023-02-20 13:55:58 +0100260 Tags map[string]*Tag
Serge Bazanski424e2012023-02-15 23:31:49 +0100261
262 // Backoffs on this machine, keyed by process name. By default these are only
263 // active backoffs, unless ExpiredBackoffs was set on GetMachineOptions.
264 Backoffs map[string]Backoff
265
266 // Work active on this machine, keyed by process name.
267 Work map[string]Work
268}
269
270// ActiveBackoffs retrieves a copy of a Machine's active backoffs. Note: the
271// expiration check is performed according tu current system time, so it might
272// not be consistent with the data snapshot retrieved from the database.
273func (r *Machine) ActiveBackoffs() []*Backoff {
274 var res []*Backoff
275 for _, bo := range r.Backoffs {
276 bo := bo
277 if !bo.Active() {
278 continue
279 }
280 res = append(res, &bo)
281 }
282 sort.Slice(res, func(i, j int) bool { return res[i].Process < res[j].Process })
283 return res
284}
285
286// ExpiredBackoffs retrieves a copy of a Machine's expired backoffs. Note: the
287// expiration check is performed according tu current system time, so it might
288// not be consistent with the data snapshot retrieved from the database.
289func (r *Machine) ExpiredBackoffs() []*Backoff {
290 var res []*Backoff
291 for _, bo := range r.Backoffs {
292 bo := bo
293 if bo.Active() {
294 continue
295 }
296 res = append(res, &bo)
297 }
298 sort.Slice(res, func(i, j int) bool { return res[i].Process < res[j].Process })
299 return res
300}
301
302// Tag value set on a Machine.
303type Tag struct {
304 // Type describing this tag.
305 Type *TagType
306 // Field data contained in this tag, sorted alphabetically by name.
307 Fields []TagField
308}
309
310// Field is a shorthand for returning a TagField by its name.
311func (r *Tag) Field(name string) *TagField {
312 for _, f := range r.Fields {
313 if f.Type.NativeName == name {
314 return &f
315 }
316 }
317 return nil
318}
319
320// TagField value which is part of a Tag set on a Machine.
321type TagField struct {
322 // Type describing this field.
323 Type *TagFieldType
324
325 text *string
326 bytes *[]byte
327 time *time.Time
Serge Bazanski10b21542023-04-13 12:12:05 +0200328 proto proto.Message
Serge Bazanski424e2012023-02-15 23:31:49 +0100329}
330
331// HumanValue returns a human-readable (best effort) representation of the field
332// value.
333func (r *TagField) HumanValue() string {
334 switch {
Serge Bazanski10b21542023-04-13 12:12:05 +0200335 case r.proto != nil:
336 opts := prototext.MarshalOptions{
337 Multiline: true,
338 Indent: "\t",
339 }
340 return opts.Format(r.proto)
Serge Bazanski424e2012023-02-15 23:31:49 +0100341 case r.text != nil:
342 return *r.text
343 case r.bytes != nil:
344 return hex.EncodeToString(*r.bytes)
345 case r.time != nil:
346 return r.time.String()
347 default:
348 return "<unknown>"
349 }
350}
351
// Backoff on a Machine.
type Backoff struct {
	// Process which established Backoff.
	Process string
	// Time when Backoff expires.
	Until time.Time
	// Cause for the Backoff as emitted by worker.
	Cause string
}

// Active returns whether this Backoff is _currently_ active per the _local_
// time, ie. whether Until still lies in the future of the local clock.
func (r Backoff) Active() bool {
	return r.Until.After(time.Now())
}
366
367// Work being actively performed on a Machine.
368type Work struct {
369 // SessionID of the worker performing this Work.
370 SessionID uuid.UUID
371 // Process name of this Work.
372 Process string
373}
374
375// Scan implements sql.Scanner for direct scanning of query results into a
376// reflected tag value. This method is not meant to by used outside the
377// reflection package.
378func (r *TagField) Scan(src any) error {
379 if src == nil {
380 return nil
381 }
382
383 switch r.Type.NativeType {
384 case "text":
385 src2, ok := src.(string)
386 if !ok {
387 return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src)
388 }
389 r.text = &src2
390 case "bytea":
391 src2, ok := src.([]byte)
392 if !ok {
393 return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src)
394 }
Serge Bazanski5cd7ddf2023-04-13 12:09:53 +0200395 // Copy the bytes, as they are otherwise going to be reused by the pq library.
396 copied := make([]byte, len(src2))
397 copy(copied[:], src2)
398 r.bytes = &copied
Serge Bazanski10b21542023-04-13 12:12:05 +0200399
400 if r.Type.ProtoType != nil {
401 msg := r.Type.ProtoType.New().Interface()
402 err := proto.Unmarshal(*r.bytes, msg)
403 if err != nil {
404 klog.Warningf("Could not unmarshal %s: %v", r.Type.NativeName, err)
405 } else {
406 r.proto = msg
407 }
408 }
Serge Bazanski424e2012023-02-15 23:31:49 +0100409 case "USER-DEFINED":
410 switch r.Type.NativeUDTName {
411 case "provider":
412 src2, ok := src.([]byte)
413 if !ok {
414 return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src)
415 }
416 src3 := string(src2)
417 r.text = &src3
418 }
419 case "timestamp with time zone":
420 src2, ok := src.(time.Time)
421 if !ok {
422 return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src)
423 }
424 r.time = &src2
425 default:
426 return fmt.Errorf("unimplemented SQL type %q", r.Type.NativeType)
427 }
428
429 return nil
430}