| // unraw implements a facility to convert raw logs from external sources into |
| // leveled logs. |
| // |
| // This is not the same as raw logging inside the logtree, which exists to |
| // ingest logs that are either fully arbitrary or do not map cleanly to the |
| // leveled logging concept. The unraw library is instead made to parse logs |
| // from systems that also use leveled logs internally, but emit them to a |
| // serialized byte stream that then needs to be turned back into something |
| // leveled inside metropolis. |
| // |
| // Logs converted this way are unfortunately lossy and do not come with the |
| // same guarantees as logs directly emitted via logtree. For example, there's |
| // no built-in protection against systems emiting fudged timestamps or file |
| // locations. Thus, this functionality should be used to interact with trusted |
| // systems, not fully arbitrary logs. |
| package unraw |
| |
| import ( |
| "context" |
| "fmt" |
| "io" |
| "os" |
| "sync" |
| "syscall" |
| "time" |
| |
| "source.monogon.dev/metropolis/pkg/logbuffer" |
| "source.monogon.dev/metropolis/pkg/logtree" |
| "source.monogon.dev/metropolis/pkg/supervisor" |
| ) |
| |
| // Parser is a user-defined function for converting a log line received from an |
| // external system into a leveled logging payload. |
| // The given LeveledWriter should be called for every leveled log entry that |
| // results from this line. This means that a parser might skip some lines, or |
| // emit multiple leveled payloads per line. |
| type Parser func(*logbuffer.Line, LeveledWriter) |
| |
| // Converter is the main entrypoint of the unraw library. It wraps a |
| // LeveledLogger in combination with a Parser to create an io.Writer that can |
| // be sent raw log data. |
| type Converter struct { |
| // Parser is the user-defined parsing function for converting log lines |
| // into leveled logging payloads. This must be set. |
| Parser Parser |
| // MaximumLineLength is the maximum length of a single log line when |
| // splitting incoming writes into lines. If a line is longer than this, it |
| // will be truncated (and will be sent to the Parser regardless). |
| // |
| // If not set, this defaults to 1024 bytes. |
| MaximumLineLength int |
| // LeveledLogger is the logtree leveled logger into which events from the |
| // Parser will be sent. |
| LeveledLogger logtree.LeveledLogger |
| |
| // mu guards lb. |
| mu sync.Mutex |
| // lb is the underlying line buffer used to split incoming data into lines. |
| // It will be initialized on first Write. |
| lb *logbuffer.LineBuffer |
| } |
| |
| // LeveledWriter is called by a Parser for every ExternelLeveledPayload it |
| // wishes to emit into a backing LeveledLogger. If the payload is missing some |
| // fields, these will default to some sensible values - see the |
| // ExternalLeveledPayload structure definition for more information. |
| type LeveledWriter func(*logtree.ExternalLeveledPayload) |
| |
| // Write implements io.Writer. Any write performed into the Converter will |
| // populate the converter's internal buffer, and any time that buffer contains |
| // a full line it will be sent over to the Parser for processing. |
| func (e *Converter) Write(p []byte) (int, error) { |
| e.mu.Lock() |
| defer e.mu.Unlock() |
| |
| if e.MaximumLineLength <= 0 { |
| e.MaximumLineLength = 1024 |
| } |
| if e.lb == nil { |
| e.lb = logbuffer.NewLineBuffer(e.MaximumLineLength, func(l *logbuffer.Line) { |
| e.Parser(l, e.insert) |
| }) |
| } |
| return e.lb.Write(p) |
| } |
| |
| // insert implements LeveledWriter. |
| func (e *Converter) insert(d *logtree.ExternalLeveledPayload) { |
| if err := logtree.LogExternalLeveled(e.LeveledLogger, d); err != nil { |
| e.LeveledLogger.Fatal("Could not insert unrawed entry: %v", err) |
| } |
| } |
| |
| // NamedPipeReader returns a supervisor runnable that continously reads logs |
| // from the given path and attempts to parse them into leveled logs using this |
| // Converter. |
| // |
| // If the given path doesn't exist, a named pipe will be created there before |
| // the function exits. This guarantee means that as long as any writing process |
| // is not started before NamedPipeReader returns ther is no need to |
| // remove/recreate the named pipe. |
| // |
| // TODO(q3k): defer the creation of the FIFO to localstorage so this doesn't |
| // need to be taken care of in the first place. |
| func (e *Converter) NamedPipeReader(path string) (supervisor.Runnable, error) { |
| if _, err := os.Stat(path); os.IsNotExist(err) { |
| if err := syscall.Mkfifo(path, 0666); err != nil { |
| return nil, fmt.Errorf("when creating named pipe: %w", err) |
| } |
| } |
| return func(ctx context.Context) error { |
| fifo, err := os.OpenFile(path, os.O_RDONLY, os.ModeNamedPipe) |
| if err != nil { |
| return fmt.Errorf("when opening named pipe: %w", err) |
| } |
| supervisor.Signal(ctx, supervisor.SignalHealthy) |
| for { |
| // Quit if requested. |
| if ctx.Err() != nil { |
| return ctx.Err() |
| } |
| |
| n, err := io.Copy(e, fifo) |
| if n == 0 && err == nil { |
| // Hack because pipes/FIFOs can return zero reads when nobody |
| // is writing. To avoid busy-looping, sleep a bit before |
| // retrying. This does not loose data since the FIFO internal |
| // buffer will stall writes when it becomes full. 10ms maximum |
| // stall in a non-latency critical process (reading debug logs) |
| // is not an issue for us. |
| time.Sleep(10 * time.Millisecond) |
| } else if err != nil { |
| return fmt.Errorf("log pump failed: %v", err) |
| } |
| |
| } |
| }, nil |
| } |