Serge Bazanski | 424e201 | 2023-02-15 23:31:49 +0100 | [diff] [blame] | 1 | // Package reflection implements facilities to retrieve information about the |
| 2 | // implemented Tags and their types from a plain CockroachDB SQL connection, |
| 3 | // bypassing the queries/types defined in models. Then, the retrieved Schema can |
| 4 | // be used to retrieve information about machines. |
| 5 | // |
| 6 | // This is designed to be used in debugging facilities to allow arbitrary machine |
| 7 | // introspection. It must _not_ be used in the user path, as the schema |
| 8 | // extraction functionality is implemented best-effort. |
| 9 | package reflection |
| 10 | |
| 11 | import ( |
| 12 | "context" |
| 13 | "database/sql" |
| 14 | "encoding/hex" |
| 15 | "fmt" |
| 16 | "sort" |
| 17 | "strings" |
| 18 | "time" |
| 19 | |
Serge Bazanski | 10b2154 | 2023-04-13 12:12:05 +0200 | [diff] [blame^] | 20 | "k8s.io/klog/v2" |
| 21 | |
Serge Bazanski | 424e201 | 2023-02-15 23:31:49 +0100 | [diff] [blame] | 22 | "github.com/google/uuid" |
Serge Bazanski | 10b2154 | 2023-04-13 12:12:05 +0200 | [diff] [blame^] | 23 | "google.golang.org/protobuf/encoding/prototext" |
| 24 | "google.golang.org/protobuf/proto" |
Serge Bazanski | 424e201 | 2023-02-15 23:31:49 +0100 | [diff] [blame] | 25 | ) |
| 26 | |
// GetMachinesOpts influences the behaviour of GetMachines.
type GetMachinesOpts struct {
	// FilterMachine, if set, will only retrieve information about the machine with
	// the given UUID. In case the given machine UUID does not exist in the database,
	// an empty result will be returned and _no_ error will be set.
	FilterMachine *uuid.UUID
	// Strict enables strict consistency. This is not recommended for use when
	// retrieving all machines, as such queries will compete against all currently
	// running operations. When not enabled, the retrieval will be executed AS OF
	// SYSTEM TIME follower_read_timestamp(), meaning the data might be a few
	// seconds out of date. Regardless of the option, the returned machine data
	// will be internally consistent, even across machines - but when not enabled
	// the data might be stale.
	Strict bool
	// ExpiredBackoffs enables the retrieval of information about all machine
	// backoffs, including expired backoffs. Note that expired backoffs might be
	// garbage collected in the future, and their long-term storage is not
	// guaranteed.
	ExpiredBackoffs bool
}
| 47 | |
| 48 | // GetMachines retrieves all available BMDB data about one or more machines. The |
| 49 | // Schema's embedded SQL connection is used to performed the retrieval. |
| 50 | // |
| 51 | // Options can be specified to influenced the exact operation performed. By |
| 52 | // default (with a zeroed structure or nil pointer), all machines with active |
| 53 | // backoffs are retrieved with weak consistency. See GetMachineOpts to influence |
| 54 | // this behaviour. |
| 55 | func (r *Schema) GetMachines(ctx context.Context, opts *GetMachinesOpts) (*Reflected[[]*Machine], error) { |
| 56 | if opts == nil { |
| 57 | opts = &GetMachinesOpts{} |
| 58 | } |
| 59 | |
| 60 | // We're about to build a pretty big SELECT query with a ton of joins. |
| 61 | // |
| 62 | // First, we join against work_backoff and work to get information about active |
| 63 | // work and backoffs on the machines we're retrieving. |
| 64 | // |
| 65 | // Second, we join against all the tags that are declared in the schema. |
| 66 | |
| 67 | // These are the colums we'll SELECT <...> FROM |
| 68 | columns := []string{ |
| 69 | "machines.machine_id", |
| 70 | "machines.machine_created_at", |
| 71 | "work_backoff.process", |
| 72 | "work_backoff.cause", |
| 73 | "work_backoff.until", |
| 74 | "work.process", |
| 75 | "work.session_id", |
| 76 | // ... tag columns will come after this. |
| 77 | } |
| 78 | // These are tha args we'll pass to the query. |
| 79 | var args []any |
| 80 | |
| 81 | // Start building joins. First, against work_backoff and work. |
| 82 | backoffFilter := " AND work_backoff.until > now()" |
| 83 | if opts.ExpiredBackoffs { |
| 84 | backoffFilter = "" |
| 85 | } |
| 86 | joins := []string{ |
| 87 | "LEFT JOIN work_backoff ON machines.machine_id = work_backoff.machine_id" + backoffFilter, |
| 88 | "LEFT JOIN work ON machines.machine_id = work.machine_id", |
| 89 | } |
| 90 | |
| 91 | // Then, against tags. Also populate columns as we go along. |
| 92 | for _, tagType := range r.TagTypes { |
| 93 | joins = append(joins, fmt.Sprintf("LEFT JOIN %s ON machines.machine_id = %s.machine_id", tagType.NativeName, tagType.NativeName)) |
| 94 | columns = append(columns, fmt.Sprintf("%s.machine_id", tagType.NativeName)) |
| 95 | for _, fieldType := range tagType.Fields { |
| 96 | columns = append(columns, fmt.Sprintf("%s.%s", tagType.NativeName, fieldType.NativeName)) |
| 97 | } |
| 98 | } |
| 99 | |
| 100 | // Finalize query. |
| 101 | q := []string{ |
| 102 | "SELECT", |
| 103 | strings.Join(columns, ", "), |
| 104 | "FROM machines", |
| 105 | } |
| 106 | q = append(q, joins...) |
| 107 | if !opts.Strict { |
| 108 | q = append(q, "AS OF SYSTEM TIME follower_read_timestamp()") |
| 109 | } |
| 110 | if opts.FilterMachine != nil { |
| 111 | q = append(q, "WHERE machines.machine_id = $1") |
| 112 | args = append(args, *opts.FilterMachine) |
| 113 | } |
| 114 | |
| 115 | rows, err := r.db.QueryContext(ctx, strings.Join(q, "\n"), args...) |
| 116 | if err != nil { |
| 117 | return nil, fmt.Errorf("query failed: %w", err) |
| 118 | } |
| 119 | defer rows.Close() |
| 120 | |
| 121 | // Okay, we can start scanning the result rows. |
| 122 | // |
| 123 | // As this is a complex join, we need to merge some rows together and discard |
| 124 | // some NULLs. We do merging/deduplication using machine_id values for the |
| 125 | // machine data, and abuse UNIQUE constraints in the work_backoff/work tables to |
| 126 | // deduplicate these. |
| 127 | // |
| 128 | // The alternative would be to rewrite this query to use array_agg, and we might |
| 129 | // do that at some point. This is only really a problem if we |
| 130 | // have _a lot_ of active work/backoffs (as that effectively duplicates all |
| 131 | // machine/tag data), which isn't the case yet. But we should keep an eye out for |
| 132 | // this. |
| 133 | |
| 134 | var machines []*Machine |
| 135 | for rows.Next() { |
| 136 | |
| 137 | // We need to scan this row back into columns. For constant columns we'll just |
| 138 | // create the data here and refer to it later. |
| 139 | var dests []any |
| 140 | |
| 141 | // Add non-tag always-retrieved constants. |
| 142 | var mid uuid.UUID |
| 143 | var machineCreated time.Time |
| 144 | var workSession uuid.NullUUID |
| 145 | var backoffProcess, backoffCause, workProcess sql.NullString |
| 146 | var backoffUntil sql.NullTime |
| 147 | |
| 148 | dests = append(dests, &mid, &machineCreated, &backoffProcess, &backoffCause, &backoffUntil, &workProcess, &workSession) |
| 149 | |
| 150 | // For dynamic data, we need to keep a reference to a list of columns that are |
| 151 | // part of tags, and then refer to them later. We can't just refer back to dests |
| 152 | // as the types are erased into `any`. scannedTags is that data storage. |
| 153 | type scannedTag struct { |
| 154 | ty *TagType |
| 155 | id uuid.NullUUID |
| 156 | fields []*TagField |
| 157 | } |
| 158 | var scannedTags []*scannedTag |
| 159 | for _, tagType := range r.TagTypes { |
| 160 | tagType := tagType |
| 161 | st := scannedTag{ |
| 162 | ty: &tagType, |
| 163 | } |
| 164 | scannedTags = append(scannedTags, &st) |
| 165 | dests = append(dests, &st.id) |
| 166 | for _, fieldType := range tagType.Fields { |
| 167 | fieldType := fieldType |
| 168 | field := TagField{ |
| 169 | Type: &fieldType, |
| 170 | } |
| 171 | dests = append(dests, &field) |
| 172 | st.fields = append(st.fields, &field) |
| 173 | |
| 174 | } |
| 175 | } |
| 176 | |
| 177 | if err := rows.Scan(dests...); err != nil { |
| 178 | return nil, fmt.Errorf("scan failed: %w", err) |
| 179 | } |
| 180 | |
| 181 | // Now comes the merging/deduplication. |
| 182 | |
| 183 | // First, check if we are processing a new machine. If so, create a new |
| 184 | // Machine. Otherwise, pick up the previous one. |
| 185 | var machine *Machine |
| 186 | if len(machines) == 0 || machines[len(machines)-1].ID.String() != mid.String() { |
| 187 | // New machine or no machine yet. |
| 188 | machine = &Machine{ |
| 189 | ID: mid, |
| 190 | Created: machineCreated, |
Serge Bazanski | 9e7875c | 2023-02-20 13:55:58 +0100 | [diff] [blame] | 191 | Tags: make(map[string]*Tag), |
Serge Bazanski | 424e201 | 2023-02-15 23:31:49 +0100 | [diff] [blame] | 192 | Backoffs: make(map[string]Backoff), |
| 193 | Work: make(map[string]Work), |
| 194 | } |
| 195 | |
| 196 | // Collect tags into machine. |
| 197 | for _, st := range scannedTags { |
| 198 | if !st.id.Valid { |
| 199 | continue |
| 200 | } |
| 201 | var fields []TagField |
| 202 | for _, f := range st.fields { |
| 203 | fields = append(fields, *f) |
| 204 | } |
Serge Bazanski | 9e7875c | 2023-02-20 13:55:58 +0100 | [diff] [blame] | 205 | machine.Tags[st.ty.Name()] = &Tag{ |
Serge Bazanski | 424e201 | 2023-02-15 23:31:49 +0100 | [diff] [blame] | 206 | Type: st.ty, |
| 207 | Fields: fields, |
| 208 | } |
| 209 | } |
| 210 | machines = append(machines, machine) |
| 211 | } else { |
| 212 | // Continue previous machine. |
| 213 | machine = machines[len(machines)-1] |
| 214 | } |
| 215 | |
| 216 | // Do we have a backoff? Upsert it to the machine. This works because there's a |
| 217 | // UNIQUE(machine_id, process) constraint on the work_backoff table, and we're |
| 218 | // effectively rebuilding that keyspace here by indexing first by machine then by |
| 219 | // process. |
| 220 | if backoffCause.Valid && backoffProcess.Valid && backoffUntil.Valid { |
| 221 | process := backoffProcess.String |
| 222 | machine.Backoffs[process] = Backoff{ |
| 223 | Cause: backoffCause.String, |
| 224 | Process: process, |
| 225 | Until: backoffUntil.Time, |
| 226 | } |
| 227 | } |
| 228 | |
| 229 | // Do we have an active work item? Upsert it to the machine. Same UNIQUE |
| 230 | // constraint abuse happening here. |
| 231 | if workProcess.Valid && workSession.Valid { |
| 232 | process := workProcess.String |
| 233 | machine.Work[process] = Work{ |
| 234 | SessionID: workSession.UUID, |
| 235 | Process: process, |
| 236 | } |
| 237 | } |
| 238 | } |
| 239 | |
| 240 | return &Reflected[[]*Machine]{ |
| 241 | Data: machines, |
| 242 | Query: strings.Join(q, " "), |
| 243 | }, nil |
| 244 | } |
| 245 | |
// Reflected wraps data retrieved by reflection (T) with metadata about the
// retrieval.
type Reflected[T any] struct {
	// Data retrieved from the database.
	Data T
	// Query is the effective SQL query performed on the database, with its
	// clauses joined by single spaces.
	Query string
}
| 253 | |
// Machine retrieved from BMDB.
type Machine struct {
	// ID is the machine's UUID as stored in the machines table.
	ID uuid.UUID
	// Created is the machine's creation timestamp (machine_created_at).
	Created time.Time

	// Tags on this machine, keyed by Tag type name (canonical, not native).
	Tags map[string]*Tag

	// Backoffs on this machine, keyed by process name. By default these are only
	// active backoffs, unless ExpiredBackoffs was set on GetMachinesOpts.
	Backoffs map[string]Backoff

	// Work active on this machine, keyed by process name.
	Work map[string]Work
}
| 269 | |
| 270 | // ActiveBackoffs retrieves a copy of a Machine's active backoffs. Note: the |
| 271 | // expiration check is performed according tu current system time, so it might |
| 272 | // not be consistent with the data snapshot retrieved from the database. |
| 273 | func (r *Machine) ActiveBackoffs() []*Backoff { |
| 274 | var res []*Backoff |
| 275 | for _, bo := range r.Backoffs { |
| 276 | bo := bo |
| 277 | if !bo.Active() { |
| 278 | continue |
| 279 | } |
| 280 | res = append(res, &bo) |
| 281 | } |
| 282 | sort.Slice(res, func(i, j int) bool { return res[i].Process < res[j].Process }) |
| 283 | return res |
| 284 | } |
| 285 | |
| 286 | // ExpiredBackoffs retrieves a copy of a Machine's expired backoffs. Note: the |
| 287 | // expiration check is performed according tu current system time, so it might |
| 288 | // not be consistent with the data snapshot retrieved from the database. |
| 289 | func (r *Machine) ExpiredBackoffs() []*Backoff { |
| 290 | var res []*Backoff |
| 291 | for _, bo := range r.Backoffs { |
| 292 | bo := bo |
| 293 | if bo.Active() { |
| 294 | continue |
| 295 | } |
| 296 | res = append(res, &bo) |
| 297 | } |
| 298 | sort.Slice(res, func(i, j int) bool { return res[i].Process < res[j].Process }) |
| 299 | return res |
| 300 | } |
| 301 | |
// Tag value set on a Machine, as assembled by GetMachines from the tag's SQL
// table columns.
type Tag struct {
	// Type describing this tag.
	Type *TagType
	// Field data contained in this tag, sorted alphabetically by name.
	Fields []TagField
}
| 309 | |
| 310 | // Field is a shorthand for returning a TagField by its name. |
| 311 | func (r *Tag) Field(name string) *TagField { |
| 312 | for _, f := range r.Fields { |
| 313 | if f.Type.NativeName == name { |
| 314 | return &f |
| 315 | } |
| 316 | } |
| 317 | return nil |
| 318 | } |
| 319 | |
// TagField value which is part of a Tag set on a Machine.
type TagField struct {
	// Type describing this field.
	Type *TagFieldType

	// Decoded value storage, populated by Scan according to the field's native
	// SQL type. For a NULL column all of these remain nil. For bytea fields
	// with a declared proto type, both bytes and proto may be set.
	text  *string
	bytes *[]byte
	time  *time.Time
	proto proto.Message
}
| 330 | |
| 331 | // HumanValue returns a human-readable (best effort) representation of the field |
| 332 | // value. |
| 333 | func (r *TagField) HumanValue() string { |
| 334 | switch { |
Serge Bazanski | 10b2154 | 2023-04-13 12:12:05 +0200 | [diff] [blame^] | 335 | case r.proto != nil: |
| 336 | opts := prototext.MarshalOptions{ |
| 337 | Multiline: true, |
| 338 | Indent: "\t", |
| 339 | } |
| 340 | return opts.Format(r.proto) |
Serge Bazanski | 424e201 | 2023-02-15 23:31:49 +0100 | [diff] [blame] | 341 | case r.text != nil: |
| 342 | return *r.text |
| 343 | case r.bytes != nil: |
| 344 | return hex.EncodeToString(*r.bytes) |
| 345 | case r.time != nil: |
| 346 | return r.time.String() |
| 347 | default: |
| 348 | return "<unknown>" |
| 349 | } |
| 350 | } |
| 351 | |
| 352 | // Backoff on a Machine. |
| 353 | type Backoff struct { |
| 354 | // Process which established Backoff. |
| 355 | Process string |
| 356 | // Time when Backoff expires. |
| 357 | Until time.Time |
| 358 | // Cause for the Backoff as emitted by worker. |
| 359 | Cause string |
| 360 | } |
| 361 | |
| 362 | // Active returns whether this Backoff is _currently_ active per the _local_ time. |
| 363 | func (r Backoff) Active() bool { |
| 364 | return time.Now().Before(r.Until) |
| 365 | } |
| 366 | |
// Work being actively performed on a Machine, as retrieved from the work
// table.
type Work struct {
	// SessionID of the worker performing this Work.
	SessionID uuid.UUID
	// Process name of this Work.
	Process string
}
| 374 | |
// Scan implements sql.Scanner for direct scanning of query results into a
// reflected tag value. This method is not meant to be used outside the
// reflection package.
func (r *TagField) Scan(src any) error {
	// NULL column: leave all value slots nil; HumanValue will then report
	// "<unknown>".
	if src == nil {
		return nil
	}

	// Dispatch on the field's native SQL type and store the value in the
	// matching slot. Type mismatches between schema and driver value are
	// reported as errors.
	switch r.Type.NativeType {
	case "text":
		src2, ok := src.(string)
		if !ok {
			return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src)
		}
		r.text = &src2
	case "bytea":
		src2, ok := src.([]byte)
		if !ok {
			return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src)
		}
		// Copy the bytes, as they are otherwise going to be reused by the pq library.
		copied := make([]byte, len(src2))
		copy(copied[:], src2)
		r.bytes = &copied

		// If the schema declared a proto type for this field, additionally
		// decode the bytes into a message for pretty-printing. Decode failures
		// are only logged (best-effort); the raw bytes remain as fallback.
		if r.Type.ProtoType != nil {
			msg := r.Type.ProtoType.New().Interface()
			err := proto.Unmarshal(*r.bytes, msg)
			if err != nil {
				klog.Warningf("Could not unmarshal %s: %v", r.Type.NativeName, err)
			} else {
				r.proto = msg
			}
		}
	case "USER-DEFINED":
		// NOTE(review): UDT names other than "provider" fall through silently
		// (no error, field left unset), unlike the unknown-native-type default
		// below which errors — confirm this asymmetry is intended best-effort
		// behaviour.
		switch r.Type.NativeUDTName {
		case "provider":
			src2, ok := src.([]byte)
			if !ok {
				return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src)
			}
			// Enum values arrive as bytes; store them as text.
			src3 := string(src2)
			r.text = &src3
		}
	case "timestamp with time zone":
		src2, ok := src.(time.Time)
		if !ok {
			return fmt.Errorf("SQL type %q, but got %+v", r.Type.NativeType, src)
		}
		r.time = &src2
	default:
		return fmt.Errorf("unimplemented SQL type %q", r.Type.NativeType)
	}

	return nil
}