| Tim Windelschmidt | c2290c2 | 2024-08-15 19:56:00 +0200 | [diff] [blame] | 1 | // Package unraw implements a facility to convert raw logs from external sources |
| 2 | // into leveled logs. |
| Serge Bazanski | ebe0259 | 2021-07-07 14:23:26 +0200 | [diff] [blame] | 3 | // |
| 4 | // This is not the same as raw logging inside the logtree, which exists to |
| 5 | // ingest logs that are either fully arbitrary or do not map cleanly to the |
| 6 | // leveled logging concept. The unraw library is instead made to parse logs |
| 7 | // from systems that also use leveled logs internally, but emit them to a |
| 8 | // serialized byte stream that then needs to be turned back into something |
| 9 | // leveled inside metropolis. |
| 10 | // |
| 11 | // Logs converted this way are unfortunately lossy and do not come with the |
| 12 | // same guarantees as logs directly emitted via logtree. For example, there's |
| 13 | // no built-in protection against systems emitting fudged timestamps or file |
| 14 | // locations. Thus, this functionality should be used to interact with trusted |
| 15 | // systems, not fully arbitrary logs. |
| 16 | package unraw |
| 17 | |
| 18 | import ( |
| 19 | "context" |
| 20 | "fmt" |
| 21 | "io" |
| 22 | "os" |
| 23 | "sync" |
| 24 | "syscall" |
| 25 | "time" |
| 26 | |
| Serge Bazanski | 3c5d063 | 2024-09-12 10:49:12 +0000 | [diff] [blame] | 27 | "source.monogon.dev/go/logging" |
| Tim Windelschmidt | 9f21f53 | 2024-05-07 15:14:20 +0200 | [diff] [blame] | 28 | "source.monogon.dev/osbase/logbuffer" |
| 29 | "source.monogon.dev/osbase/logtree" |
| 30 | "source.monogon.dev/osbase/supervisor" |
| Serge Bazanski | ebe0259 | 2021-07-07 14:23:26 +0200 | [diff] [blame] | 31 | ) |
| 32 | |
| 33 | // Parser is a user-defined function for converting a log line received from an |
| 34 | // external system into a leveled logging payload. |
| 35 | // The given LeveledWriter should be called for every leveled log entry that |
| 36 | // results from this line. This means that a parser might skip some lines, or |
| 37 | // emit multiple leveled payloads per line. |
| 38 | type Parser func(*logbuffer.Line, LeveledWriter) |
| 39 | |
| 40 | // Converter is the main entrypoint of the unraw library. It wraps a |
| 41 | // LeveledLogger in combination with a Parser to create an io.Writer that can |
| 42 | // be sent raw log data. |
| 43 | type Converter struct { |
| 44 | // Parser is the user-defined parsing function for converting log lines |
| 45 | // into leveled logging payloads. This must be set. |
| 46 | Parser Parser |
| 47 | // MaximumLineLength is the maximum length of a single log line when |
| 48 | // splitting incoming writes into lines. If a line is longer than this, it |
| 49 | // will be truncated (and will be sent to the Parser regardless). |
| 50 | // |
| 51 | // If not set, this defaults to 1024 bytes. |
| 52 | MaximumLineLength int |
| 53 | // LeveledLogger is the logtree leveled logger into which events from the |
| 54 | // Parser will be sent. |
| Serge Bazanski | 3c5d063 | 2024-09-12 10:49:12 +0000 | [diff] [blame] | 55 | LeveledLogger logging.Leveled |
| Serge Bazanski | ebe0259 | 2021-07-07 14:23:26 +0200 | [diff] [blame] | 56 | |
| 57 | // mu guards lb. |
| 58 | mu sync.Mutex |
| 59 | // lb is the underlying line buffer used to split incoming data into lines. |
| 60 | // It will be initialized on first Write. |
| 61 | lb *logbuffer.LineBuffer |
| 62 | } |
| 63 | |
| 64 | // LeveledWriter is called by a Parser for every ExternelLeveledPayload it |
| 65 | // wishes to emit into a backing LeveledLogger. If the payload is missing some |
| 66 | // fields, these will default to some sensible values - see the |
| 67 | // ExternalLeveledPayload structure definition for more information. |
| 68 | type LeveledWriter func(*logtree.ExternalLeveledPayload) |
| 69 | |
| 70 | // Write implements io.Writer. Any write performed into the Converter will |
| 71 | // populate the converter's internal buffer, and any time that buffer contains |
| 72 | // a full line it will be sent over to the Parser for processing. |
| 73 | func (e *Converter) Write(p []byte) (int, error) { |
| 74 | e.mu.Lock() |
| 75 | defer e.mu.Unlock() |
| 76 | |
| 77 | if e.MaximumLineLength <= 0 { |
| 78 | e.MaximumLineLength = 1024 |
| 79 | } |
| 80 | if e.lb == nil { |
| 81 | e.lb = logbuffer.NewLineBuffer(e.MaximumLineLength, func(l *logbuffer.Line) { |
| 82 | e.Parser(l, e.insert) |
| 83 | }) |
| 84 | } |
| 85 | return e.lb.Write(p) |
| 86 | } |
| 87 | |
| 88 | // insert implements LeveledWriter. |
| 89 | func (e *Converter) insert(d *logtree.ExternalLeveledPayload) { |
| 90 | if err := logtree.LogExternalLeveled(e.LeveledLogger, d); err != nil { |
| 91 | e.LeveledLogger.Fatal("Could not insert unrawed entry: %v", err) |
| 92 | } |
| 93 | } |
| 94 | |
| 95 | // NamedPipeReader returns a supervisor runnable that continously reads logs |
| 96 | // from the given path and attempts to parse them into leveled logs using this |
| 97 | // Converter. |
| 98 | // |
| 99 | // If the given path doesn't exist, a named pipe will be created there before |
| 100 | // the function exits. This guarantee means that as long as any writing process |
| 101 | // is not started before NamedPipeReader returns ther is no need to |
| 102 | // remove/recreate the named pipe. |
| 103 | // |
| 104 | // TODO(q3k): defer the creation of the FIFO to localstorage so this doesn't |
| 105 | // need to be taken care of in the first place. |
| 106 | func (e *Converter) NamedPipeReader(path string) (supervisor.Runnable, error) { |
| 107 | if _, err := os.Stat(path); os.IsNotExist(err) { |
| 108 | if err := syscall.Mkfifo(path, 0666); err != nil { |
| 109 | return nil, fmt.Errorf("when creating named pipe: %w", err) |
| 110 | } |
| 111 | } |
| 112 | return func(ctx context.Context) error { |
| 113 | fifo, err := os.OpenFile(path, os.O_RDONLY, os.ModeNamedPipe) |
| 114 | if err != nil { |
| 115 | return fmt.Errorf("when opening named pipe: %w", err) |
| 116 | } |
| Serge Bazanski | 1fd64a2 | 2021-10-29 16:59:40 +0200 | [diff] [blame] | 117 | go func() { |
| 118 | <-ctx.Done() |
| 119 | fifo.Close() |
| 120 | }() |
| Serge Bazanski | 826a9e9 | 2021-10-05 21:23:48 +0200 | [diff] [blame] | 121 | defer fifo.Close() |
| Serge Bazanski | ebe0259 | 2021-07-07 14:23:26 +0200 | [diff] [blame] | 122 | supervisor.Signal(ctx, supervisor.SignalHealthy) |
| 123 | for { |
| 124 | // Quit if requested. |
| 125 | if ctx.Err() != nil { |
| 126 | return ctx.Err() |
| 127 | } |
| 128 | |
| 129 | n, err := io.Copy(e, fifo) |
| 130 | if n == 0 && err == nil { |
| 131 | // Hack because pipes/FIFOs can return zero reads when nobody |
| 132 | // is writing. To avoid busy-looping, sleep a bit before |
| 133 | // retrying. This does not loose data since the FIFO internal |
| 134 | // buffer will stall writes when it becomes full. 10ms maximum |
| 135 | // stall in a non-latency critical process (reading debug logs) |
| 136 | // is not an issue for us. |
| 137 | time.Sleep(10 * time.Millisecond) |
| 138 | } else if err != nil { |
| Serge Bazanski | 1fd64a2 | 2021-10-29 16:59:40 +0200 | [diff] [blame] | 139 | // Since we close fifo on context cancel, we'll get a 'file is already closed' |
| 140 | // io error here. Translate that over to the context error that caused it. |
| 141 | if ctx.Err() != nil { |
| 142 | return ctx.Err() |
| 143 | } |
| 144 | return fmt.Errorf("log pump failed: %w", err) |
| Serge Bazanski | ebe0259 | 2021-07-07 14:23:26 +0200 | [diff] [blame] | 145 | } |
| 146 | |
| 147 | } |
| 148 | }, nil |
| 149 | } |