// Package reflection implements facilities to retrieve information about the
// implemented Tags and their types from a plain CockroachDB SQL connection,
// bypassing the queries/types defined in models. The retrieved Schema can then
// be used to inspect machines.
//
// This is designed to be used in debugging facilities to allow arbitrary
// machine introspection. It must _not_ be used in the user path, as the schema
// extraction functionality is implemented best-effort.
package reflection

import (
	"context"
	"database/sql"
	"encoding/hex"
	"fmt"
	"sort"
	"strings"
	"time"

	"github.com/google/uuid"
)

// GetMachinesOpts influences the behaviour of GetMachines.
type GetMachinesOpts struct {
	// FilterMachine, if set, will only retrieve information about the machine with
	// the given UUID. In case the given machine UUID does not exist in the database,
	// an empty result will be returned and _no_ error will be set.
	FilterMachine *uuid.UUID
	// Strict enables strict consistency. This is not recommended for use when
	// retrieving all machines, as such queries will compete against all currently
	// running operations. When not enabled, the retrieval will be executed AS OF
	// SYSTEM TIME follower_read_timestamp(), meaning the data might be a few
	// seconds out of date. Regardless of the option, the returned machine data
	// will be internally consistent, even across machines - but when not enabled
	// the data might be stale.
	Strict bool
	// ExpiredBackoffs enables the retrieval of information about all machine
	// backoffs, including expired backoffs. Note that expired backoffs might be
	// garbage collected in the future, and their long-term storage is not
	// guaranteed.
	ExpiredBackoffs bool
}
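
// A minimal usage sketch (hypothetical: schema is an already-constructed
// *Schema, and the UUID is a placeholder):
//
//	id := uuid.MustParse("00000000-0000-0000-0000-000000000000")
//	res, err := schema.GetMachines(ctx, &GetMachinesOpts{
//		FilterMachine: &id,
//	})
//	if err != nil {
//		// Handle query error.
//	}
//	for _, m := range res.Data {
//		fmt.Printf("machine %s created at %s\n", m.ID, m.Created)
//	}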

// GetMachines retrieves all available BMDB data about one or more machines. The
// Schema's embedded SQL connection is used to perform the retrieval.
//
// Options can be specified to influence the exact operation performed. By
// default (with a zeroed structure or nil pointer), all machines with active
// backoffs are retrieved with weak consistency. See GetMachinesOpts to influence
// this behaviour.
func (r *Schema) GetMachines(ctx context.Context, opts *GetMachinesOpts) (*Reflected[[]*Machine], error) {
	if opts == nil {
		opts = &GetMachinesOpts{}
	}

	// We're about to build a pretty big SELECT query with a ton of joins.
	//
	// First, we join against work_backoff and work to get information about active
	// work and backoffs on the machines we're retrieving.
	//
	// Second, we join against all the tags that are declared in the schema.

	// These are the columns we'll SELECT <...> FROM.
	columns := []string{
		"machines.machine_id",
		"machines.machine_created_at",
		"work_backoff.process",
		"work_backoff.cause",
		"work_backoff.until",
		"work.process",
		"work.session_id",
		// ... tag columns will come after this.
	}
	// These are the args we'll pass to the query.
	var args []any

	// Start building joins. First, against work_backoff and work.
	backoffFilter := " AND work_backoff.until > now()"
	if opts.ExpiredBackoffs {
		backoffFilter = ""
	}
	joins := []string{
		"LEFT JOIN work_backoff ON machines.machine_id = work_backoff.machine_id" + backoffFilter,
		"LEFT JOIN work ON machines.machine_id = work.machine_id",
	}

	// Then, against tags. Also populate columns as we go along.
	for _, tagType := range r.TagTypes {
		joins = append(joins, fmt.Sprintf("LEFT JOIN %s ON machines.machine_id = %s.machine_id", tagType.NativeName, tagType.NativeName))
		columns = append(columns, fmt.Sprintf("%s.machine_id", tagType.NativeName))
		for _, fieldType := range tagType.Fields {
			columns = append(columns, fmt.Sprintf("%s.%s", tagType.NativeName, fieldType.NativeName))
		}
	}
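
	// For illustration: with a single hypothetical tag table tag_foo carrying one
	// field bar, the query assembled below comes out roughly as (plus the optional
	// WHERE filter added further down):
	//
	//	SELECT machines.machine_id, machines.machine_created_at,
	//	       work_backoff.process, work_backoff.cause, work_backoff.until,
	//	       work.process, work.session_id, tag_foo.machine_id, tag_foo.bar
	//	FROM machines
	//	LEFT JOIN work_backoff ON machines.machine_id = work_backoff.machine_id AND work_backoff.until > now()
	//	LEFT JOIN work ON machines.machine_id = work.machine_id
	//	LEFT JOIN tag_foo ON machines.machine_id = tag_foo.machine_id
	//	AS OF SYSTEM TIME follower_read_timestamp()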

	// Finalize query.
	q := []string{
		"SELECT",
		strings.Join(columns, ", "),
		"FROM machines",
	}
	q = append(q, joins...)
	if !opts.Strict {
		q = append(q, "AS OF SYSTEM TIME follower_read_timestamp()")
	}
	if opts.FilterMachine != nil {
		q = append(q, "WHERE machines.machine_id = $1")
		args = append(args, *opts.FilterMachine)
	}

	rows, err := r.db.QueryContext(ctx, strings.Join(q, "\n"), args...)
	if err != nil {
		return nil, fmt.Errorf("query failed: %w", err)
	}
	defer rows.Close()

	// Okay, we can start scanning the result rows.
	//
	// As this is a complex join, we need to merge some rows together and discard
	// some NULLs. We do merging/deduplication using machine_id values for the
	// machine data, and abuse UNIQUE constraints in the work_backoff/work tables to
	// deduplicate these.
	//
	// The alternative would be to rewrite this query to use array_agg, and we might
	// do that at some point. This is only really a problem if we have _a lot_ of
	// active work/backoffs (as that effectively duplicates all machine/tag data),
	// which isn't the case yet. But we should keep an eye out for this.
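	//
	// A sketch of what such an array_agg rewrite could look like (hypothetical,
	// not what this function executes - backoff/work rows would collapse into
	// per-machine arrays instead of multiplying result rows):
	//
	//	SELECT machines.machine_id,
	//	       array_agg(work_backoff.process), array_agg(work_backoff.cause),
	//	       array_agg(work.process)
	//	FROM machines
	//	LEFT JOIN work_backoff ON machines.machine_id = work_backoff.machine_id
	//	LEFT JOIN work ON machines.machine_id = work.machine_id
	//	GROUP BY machines.machine_id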

	var machines []*Machine
	for rows.Next() {
		// We need to scan this row back into columns. For constant columns we'll just
		// create the data here and refer to it later.
		var dests []any

		// Add non-tag always-retrieved constants.
		var mid uuid.UUID
		var machineCreated time.Time
		var workSession uuid.NullUUID
		var backoffProcess, backoffCause, workProcess sql.NullString
		var backoffUntil sql.NullTime

		dests = append(dests, &mid, &machineCreated, &backoffProcess, &backoffCause, &backoffUntil, &workProcess, &workSession)

		// For dynamic data, we need to keep a reference to a list of columns that are
		// part of tags, and then refer to them later. We can't just refer back to dests
		// as the types are erased into `any`. scannedTags is that data storage.
		type scannedTag struct {
			ty     *TagType
			id     uuid.NullUUID
			fields []*TagField
		}
		var scannedTags []*scannedTag
		for _, tagType := range r.TagTypes {
			tagType := tagType
			st := scannedTag{
				ty: &tagType,
			}
			scannedTags = append(scannedTags, &st)
			dests = append(dests, &st.id)
			for _, fieldType := range tagType.Fields {
				fieldType := fieldType
				field := TagField{
					Type: &fieldType,
				}
				dests = append(dests, &field)
				st.fields = append(st.fields, &field)
			}
		}

		if err := rows.Scan(dests...); err != nil {
			return nil, fmt.Errorf("scan failed: %w", err)
		}

		// Now comes the merging/deduplication.

		// First, check if we are processing a new machine. If so, create a new
		// Machine. Otherwise, pick up the previous one.
		var machine *Machine
		if len(machines) == 0 || machines[len(machines)-1].ID.String() != mid.String() {
			// New machine or no machine yet.
			machine = &Machine{
				ID:       mid,
				Created:  machineCreated,
				Tags:     make(map[string]*Tag),
				Backoffs: make(map[string]Backoff),
				Work:     make(map[string]Work),
			}

			// Collect tags into machine.
			for _, st := range scannedTags {
				if !st.id.Valid {
					continue
				}
				var fields []TagField
				for _, f := range st.fields {
					fields = append(fields, *f)
				}
				machine.Tags[st.ty.Name()] = &Tag{
					Type:   st.ty,
					Fields: fields,
				}
			}
			machines = append(machines, machine)
		} else {
			// Continue previous machine.
			machine = machines[len(machines)-1]
		}

		// Do we have a backoff? Upsert it to the machine. This works because there's a
		// UNIQUE(machine_id, process) constraint on the work_backoff table, and we're
		// effectively rebuilding that keyspace here by indexing first by machine then by
		// process.
		if backoffCause.Valid && backoffProcess.Valid && backoffUntil.Valid {
			process := backoffProcess.String
			machine.Backoffs[process] = Backoff{
				Cause:   backoffCause.String,
				Process: process,
				Until:   backoffUntil.Time,
			}
		}

		// Do we have an active work item? Upsert it to the machine. Same UNIQUE
		// constraint abuse happening here.
		if workProcess.Valid && workSession.Valid {
			process := workProcess.String
			machine.Work[process] = Work{
				SessionID: workSession.UUID,
				Process:   process,
			}
		}
	}

	// Surface any error that interrupted the row iteration.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("reading rows failed: %w", err)
	}

	return &Reflected[[]*Machine]{
		Data:  machines,
		Query: strings.Join(q, " "),
	}, nil
}

// Reflected wraps data retrieved by reflection (T) with metadata about the
// retrieval.
type Reflected[T any] struct {
	Data T
	// Effective SQL query performed on the database.
	Query string
}

// Machine retrieved from BMDB.
type Machine struct {
	ID      uuid.UUID
	Created time.Time

	// Tags on this machine, keyed by Tag type name (canonical, not native).
	Tags map[string]*Tag

	// Backoffs on this machine, keyed by process name. By default these are only
	// active backoffs, unless ExpiredBackoffs was set on GetMachinesOpts.
	Backoffs map[string]Backoff

	// Work active on this machine, keyed by process name.
	Work map[string]Work
}
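
// Tags, Backoffs and Work are maps, so their iteration order is randomized by
// the Go runtime. A sketch of deterministic traversal for display purposes
// (m is a *Machine obtained from GetMachines):
//
//	names := make([]string, 0, len(m.Tags))
//	for name := range m.Tags {
//		names = append(names, name)
//	}
//	sort.Strings(names)
//	for _, name := range names {
//		for _, f := range m.Tags[name].Fields {
//			fmt.Printf("%s.%s = %s\n", name, f.Type.NativeName, f.HumanValue())
//		}
//	}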

// ActiveBackoffs retrieves a copy of a Machine's active backoffs. Note: the
// expiration check is performed according to the current system time, so it
// might not be consistent with the data snapshot retrieved from the database.
func (r *Machine) ActiveBackoffs() []*Backoff {
	var res []*Backoff
	for _, bo := range r.Backoffs {
		bo := bo
		if !bo.Active() {
			continue
		}
		res = append(res, &bo)
	}
	sort.Slice(res, func(i, j int) bool { return res[i].Process < res[j].Process })
	return res
}

// ExpiredBackoffs retrieves a copy of a Machine's expired backoffs. Note: the
// expiration check is performed according to the current system time, so it
// might not be consistent with the data snapshot retrieved from the database.
func (r *Machine) ExpiredBackoffs() []*Backoff {
	var res []*Backoff
	for _, bo := range r.Backoffs {
		bo := bo
		if bo.Active() {
			continue
		}
		res = append(res, &bo)
	}
	sort.Slice(res, func(i, j int) bool { return res[i].Process < res[j].Process })
	return res
}

// Tag value set on a Machine.
type Tag struct {
	// Type describing this tag.
	Type *TagType
	// Field data contained in this tag, sorted alphabetically by name.
	Fields []TagField
}

// Field is a shorthand for returning a TagField by its native name. It returns
// a pointer to a copy of the field, or nil if no such field exists.
func (r *Tag) Field(name string) *TagField {
	for _, f := range r.Fields {
		if f.Type.NativeName == name {
			return &f
		}
	}
	return nil
}

// TagField value which is part of a Tag set on a Machine.
type TagField struct {
	// Type describing this field.
	Type *TagFieldType

	text  *string
	bytes *[]byte
	time  *time.Time
}

// HumanValue returns a human-readable (best-effort) representation of the field
// value.
func (r *TagField) HumanValue() string {
	switch {
	case r.text != nil:
		return *r.text
	case r.bytes != nil:
		return hex.EncodeToString(*r.bytes)
	case r.time != nil:
		return r.time.String()
	default:
		return "<unknown>"
	}
}
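
// For example, a bytea field scanned from []byte{0xde, 0xad} renders as "dead"
// above, and a timestamp field renders via time.Time's default String format.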

// Backoff on a Machine.
type Backoff struct {
	// Process which established this Backoff.
	Process string
	// Time when this Backoff expires.
	Until time.Time
	// Cause for this Backoff, as emitted by the worker.
	Cause string
}

// Active returns whether this Backoff is _currently_ active per the _local_ time.
func (r Backoff) Active() bool {
	return time.Now().Before(r.Until)
}

// Work being actively performed on a Machine.
type Work struct {
	// SessionID of the worker performing this Work.
	SessionID uuid.UUID
	// Process name of this Work.
	Process string
}

// Scan implements sql.Scanner for direct scanning of query results into a
// reflected tag value. This method is not meant to be used outside the
// reflection package.
func (r *TagField) Scan(src any) error {
	if src == nil {
		return nil
	}

	switch r.Type.NativeType {
	case "text":
		src2, ok := src.(string)
		if !ok {
			return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src)
		}
		r.text = &src2
	case "bytea":
		src2, ok := src.([]byte)
		if !ok {
			return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src)
		}
		r.bytes = &src2
	case "USER-DEFINED":
		switch r.Type.NativeUDTName {
		case "provider":
			src2, ok := src.([]byte)
			if !ok {
				return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src)
			}
			src3 := string(src2)
			r.text = &src3
		default:
			return fmt.Errorf("unimplemented user-defined SQL type %q", r.Type.NativeUDTName)
		}
	case "timestamp with time zone":
		src2, ok := src.(time.Time)
		if !ok {
			return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src)
		}
		r.time = &src2
	default:
		return fmt.Errorf("unimplemented SQL type %q", r.Type.NativeType)
	}

	return nil
}