Serge Bazanski | ebe0259 | 2021-07-07 14:23:26 +0200 | [diff] [blame] | 1 | // unraw implements a facility to convert raw logs from external sources into |
| 2 | // leveled logs. |
| 3 | // |
| 4 | // This is not the same as raw logging inside the logtree, which exists to |
| 5 | // ingest logs that are either fully arbitrary or do not map cleanly to the |
| 6 | // leveled logging concept. The unraw library is instead made to parse logs |
| 7 | // from systems that also use leveled logs internally, but emit them to a |
| 8 | // serialized byte stream that then needs to be turned back into something |
| 9 | // leveled inside metropolis. |
| 10 | // |
// Logs converted this way are unfortunately lossy and do not come with the
// same guarantees as logs directly emitted via logtree. For example, there's
// no built-in protection against systems emitting fudged timestamps or file
// locations. Thus, this functionality should be used to interact with trusted
// systems, not fully arbitrary logs.
| 16 | package unraw |
| 17 | |
| 18 | import ( |
| 19 | "context" |
| 20 | "fmt" |
| 21 | "io" |
| 22 | "os" |
| 23 | "sync" |
| 24 | "syscall" |
| 25 | "time" |
| 26 | |
| 27 | "source.monogon.dev/metropolis/pkg/logbuffer" |
| 28 | "source.monogon.dev/metropolis/pkg/logtree" |
| 29 | "source.monogon.dev/metropolis/pkg/supervisor" |
| 30 | ) |
| 31 | |
| 32 | // Parser is a user-defined function for converting a log line received from an |
| 33 | // external system into a leveled logging payload. |
| 34 | // The given LeveledWriter should be called for every leveled log entry that |
| 35 | // results from this line. This means that a parser might skip some lines, or |
| 36 | // emit multiple leveled payloads per line. |
| 37 | type Parser func(*logbuffer.Line, LeveledWriter) |
| 38 | |
| 39 | // Converter is the main entrypoint of the unraw library. It wraps a |
| 40 | // LeveledLogger in combination with a Parser to create an io.Writer that can |
| 41 | // be sent raw log data. |
| 42 | type Converter struct { |
| 43 | // Parser is the user-defined parsing function for converting log lines |
| 44 | // into leveled logging payloads. This must be set. |
| 45 | Parser Parser |
| 46 | // MaximumLineLength is the maximum length of a single log line when |
| 47 | // splitting incoming writes into lines. If a line is longer than this, it |
| 48 | // will be truncated (and will be sent to the Parser regardless). |
| 49 | // |
| 50 | // If not set, this defaults to 1024 bytes. |
| 51 | MaximumLineLength int |
| 52 | // LeveledLogger is the logtree leveled logger into which events from the |
| 53 | // Parser will be sent. |
| 54 | LeveledLogger logtree.LeveledLogger |
| 55 | |
| 56 | // mu guards lb. |
| 57 | mu sync.Mutex |
| 58 | // lb is the underlying line buffer used to split incoming data into lines. |
| 59 | // It will be initialized on first Write. |
| 60 | lb *logbuffer.LineBuffer |
| 61 | } |
| 62 | |
| 63 | // LeveledWriter is called by a Parser for every ExternelLeveledPayload it |
| 64 | // wishes to emit into a backing LeveledLogger. If the payload is missing some |
| 65 | // fields, these will default to some sensible values - see the |
| 66 | // ExternalLeveledPayload structure definition for more information. |
| 67 | type LeveledWriter func(*logtree.ExternalLeveledPayload) |
| 68 | |
| 69 | // Write implements io.Writer. Any write performed into the Converter will |
| 70 | // populate the converter's internal buffer, and any time that buffer contains |
| 71 | // a full line it will be sent over to the Parser for processing. |
| 72 | func (e *Converter) Write(p []byte) (int, error) { |
| 73 | e.mu.Lock() |
| 74 | defer e.mu.Unlock() |
| 75 | |
| 76 | if e.MaximumLineLength <= 0 { |
| 77 | e.MaximumLineLength = 1024 |
| 78 | } |
| 79 | if e.lb == nil { |
| 80 | e.lb = logbuffer.NewLineBuffer(e.MaximumLineLength, func(l *logbuffer.Line) { |
| 81 | e.Parser(l, e.insert) |
| 82 | }) |
| 83 | } |
| 84 | return e.lb.Write(p) |
| 85 | } |
| 86 | |
| 87 | // insert implements LeveledWriter. |
| 88 | func (e *Converter) insert(d *logtree.ExternalLeveledPayload) { |
| 89 | if err := logtree.LogExternalLeveled(e.LeveledLogger, d); err != nil { |
| 90 | e.LeveledLogger.Fatal("Could not insert unrawed entry: %v", err) |
| 91 | } |
| 92 | } |
| 93 | |
| 94 | // NamedPipeReader returns a supervisor runnable that continously reads logs |
| 95 | // from the given path and attempts to parse them into leveled logs using this |
| 96 | // Converter. |
| 97 | // |
| 98 | // If the given path doesn't exist, a named pipe will be created there before |
| 99 | // the function exits. This guarantee means that as long as any writing process |
| 100 | // is not started before NamedPipeReader returns ther is no need to |
| 101 | // remove/recreate the named pipe. |
| 102 | // |
| 103 | // TODO(q3k): defer the creation of the FIFO to localstorage so this doesn't |
| 104 | // need to be taken care of in the first place. |
| 105 | func (e *Converter) NamedPipeReader(path string) (supervisor.Runnable, error) { |
| 106 | if _, err := os.Stat(path); os.IsNotExist(err) { |
| 107 | if err := syscall.Mkfifo(path, 0666); err != nil { |
| 108 | return nil, fmt.Errorf("when creating named pipe: %w", err) |
| 109 | } |
| 110 | } |
| 111 | return func(ctx context.Context) error { |
| 112 | fifo, err := os.OpenFile(path, os.O_RDONLY, os.ModeNamedPipe) |
| 113 | if err != nil { |
| 114 | return fmt.Errorf("when opening named pipe: %w", err) |
| 115 | } |
| 116 | supervisor.Signal(ctx, supervisor.SignalHealthy) |
| 117 | for { |
| 118 | // Quit if requested. |
| 119 | if ctx.Err() != nil { |
| 120 | return ctx.Err() |
| 121 | } |
| 122 | |
| 123 | n, err := io.Copy(e, fifo) |
| 124 | if n == 0 && err == nil { |
| 125 | // Hack because pipes/FIFOs can return zero reads when nobody |
| 126 | // is writing. To avoid busy-looping, sleep a bit before |
| 127 | // retrying. This does not loose data since the FIFO internal |
| 128 | // buffer will stall writes when it becomes full. 10ms maximum |
| 129 | // stall in a non-latency critical process (reading debug logs) |
| 130 | // is not an issue for us. |
| 131 | time.Sleep(10 * time.Millisecond) |
| 132 | } else if err != nil { |
| 133 | return fmt.Errorf("log pump failed: %v", err) |
| 134 | } |
| 135 | |
| 136 | } |
| 137 | }, nil |
| 138 | } |