| // unraw implements a facility to convert raw logs from external sources into | 
 | // leveled logs. | 
 | // | 
 | // This is not the same as raw logging inside the logtree, which exists to | 
 | // ingest logs that are either fully arbitrary or do not map cleanly to the | 
 | // leveled logging concept. The unraw library is instead made to parse logs | 
 | // from systems that also use leveled logs internally, but emit them to a | 
 | // serialized byte stream that then needs to be turned back into something | 
 | // leveled inside metropolis. | 
 | // | 
 | // Logs converted this way are unfortunately lossy and do not come with the | 
 | // same guarantees as logs directly emitted via logtree. For example, there's | 
 | // no built-in protection against systems emitting fudged timestamps or file | 
 | // locations. Thus, this functionality should be used to interact with trusted | 
 | // systems, not fully arbitrary logs. | 
 | package unraw | 
 |  | 
 | import ( | 
 | 	"context" | 
 | 	"fmt" | 
 | 	"io" | 
 | 	"os" | 
 | 	"sync" | 
 | 	"syscall" | 
 | 	"time" | 
 |  | 
 | 	"source.monogon.dev/metropolis/pkg/logbuffer" | 
 | 	"source.monogon.dev/metropolis/pkg/logtree" | 
 | 	"source.monogon.dev/metropolis/pkg/supervisor" | 
 | ) | 
 |  | 
 | // Parser is a user-defined function for converting a log line received from an | 
 | // external system into a leveled logging payload. | 
 | // The given LeveledWriter should be called for every leveled log entry that | 
 | // results from this line. This means that a parser might skip some lines, or | 
 | // emit multiple leveled payloads per line. | 
 | type Parser func(*logbuffer.Line, LeveledWriter) | 
 |  | 
 | // Converter is the main entrypoint of the unraw library. It wraps a | 
 | // LeveledLogger in combination with a Parser to create an io.Writer that can | 
 | // be sent raw log data. | 
 | type Converter struct { | 
 | 	// Parser is the user-defined parsing function for converting log lines | 
 | 	// into leveled logging payloads. This must be set. | 
 | 	Parser Parser | 
 | 	// MaximumLineLength is the maximum length of a single log line when | 
 | 	// splitting incoming writes into lines. If a line is longer than this, it | 
 | 	// will be truncated (and will be sent to the Parser regardless). | 
 | 	// | 
 | 	// If not set, this defaults to 1024 bytes. | 
 | 	MaximumLineLength int | 
 | 	// LeveledLogger is the logtree leveled logger into which events from the | 
 | 	// Parser will be sent. | 
 | 	LeveledLogger logtree.LeveledLogger | 
 |  | 
 | 	// mu guards lb. | 
 | 	mu sync.Mutex | 
 | 	// lb is the underlying line buffer used to split incoming data into lines. | 
 | 	// It will be initialized on first Write. | 
 | 	lb *logbuffer.LineBuffer | 
 | } | 
 |  | 
 | // LeveledWriter is called by a Parser for every ExternelLeveledPayload it | 
 | // wishes to emit into a backing LeveledLogger. If the payload is missing some | 
 | // fields, these will default to some sensible values - see the | 
 | // ExternalLeveledPayload structure definition for more information. | 
 | type LeveledWriter func(*logtree.ExternalLeveledPayload) | 
 |  | 
 | // Write implements io.Writer. Any write performed into the Converter will | 
 | // populate the converter's internal buffer, and any time that buffer contains | 
 | // a full line it will be sent over to the Parser for processing. | 
 | func (e *Converter) Write(p []byte) (int, error) { | 
 | 	e.mu.Lock() | 
 | 	defer e.mu.Unlock() | 
 |  | 
 | 	if e.MaximumLineLength <= 0 { | 
 | 		e.MaximumLineLength = 1024 | 
 | 	} | 
 | 	if e.lb == nil { | 
 | 		e.lb = logbuffer.NewLineBuffer(e.MaximumLineLength, func(l *logbuffer.Line) { | 
 | 			e.Parser(l, e.insert) | 
 | 		}) | 
 | 	} | 
 | 	return e.lb.Write(p) | 
 | } | 
 |  | 
 | // insert implements LeveledWriter. | 
 | func (e *Converter) insert(d *logtree.ExternalLeveledPayload) { | 
 | 	if err := logtree.LogExternalLeveled(e.LeveledLogger, d); err != nil { | 
 | 		e.LeveledLogger.Fatal("Could not insert unrawed entry: %v", err) | 
 | 	} | 
 | } | 
 |  | 
 | // NamedPipeReader returns a supervisor runnable that continously reads logs | 
 | // from the given path and attempts to parse them into leveled logs using this | 
 | // Converter. | 
 | // | 
 | // If the given path doesn't exist, a named pipe will be created there before | 
 | // the function exits. This guarantee means that as long as any writing process | 
 | // is not started before NamedPipeReader returns ther is no need to | 
 | // remove/recreate the named pipe. | 
 | // | 
 | // TODO(q3k): defer the creation of the FIFO to localstorage so this doesn't | 
 | // need to be taken care of in the first place. | 
 | func (e *Converter) NamedPipeReader(path string) (supervisor.Runnable, error) { | 
 | 	if _, err := os.Stat(path); os.IsNotExist(err) { | 
 | 		if err := syscall.Mkfifo(path, 0666); err != nil { | 
 | 			return nil, fmt.Errorf("when creating named pipe: %w", err) | 
 | 		} | 
 | 	} | 
 | 	return func(ctx context.Context) error { | 
 | 		fifo, err := os.OpenFile(path, os.O_RDONLY, os.ModeNamedPipe) | 
 | 		if err != nil { | 
 | 			return fmt.Errorf("when opening named pipe: %w", err) | 
 | 		} | 
 | 		go func() { | 
 | 			<-ctx.Done() | 
 | 			fifo.Close() | 
 | 		}() | 
 | 		defer fifo.Close() | 
 | 		supervisor.Signal(ctx, supervisor.SignalHealthy) | 
 | 		for { | 
 | 			// Quit if requested. | 
 | 			if ctx.Err() != nil { | 
 | 				return ctx.Err() | 
 | 			} | 
 |  | 
 | 			n, err := io.Copy(e, fifo) | 
 | 			if n == 0 && err == nil { | 
 | 				// Hack because pipes/FIFOs can return zero reads when nobody | 
 | 				// is writing. To avoid busy-looping, sleep a bit before | 
 | 				// retrying. This does not loose data since the FIFO internal | 
 | 				// buffer will stall writes when it becomes full. 10ms maximum | 
 | 				// stall in a non-latency critical process (reading debug logs) | 
 | 				// is not an issue for us. | 
 | 				time.Sleep(10 * time.Millisecond) | 
 | 			} else if err != nil { | 
 | 				// Since we close fifo on context cancel, we'll get a 'file is already closed' | 
 | 				// io error here. Translate that over to the context error that caused it. | 
 | 				if ctx.Err() != nil { | 
 | 					return ctx.Err() | 
 | 				} | 
 | 				return fmt.Errorf("log pump failed: %w", err) | 
 | 			} | 
 |  | 
 | 		} | 
 | 	}, nil | 
 | } |