blob: 807d79b583e1b8fedfcdc14346a545aed57c5d47 [file] [log] [blame]
Serge Bazanskiebe02592021-07-07 14:23:26 +02001// unraw implements a facility to convert raw logs from external sources into
2// leveled logs.
3//
4// This is not the same as raw logging inside the logtree, which exists to
5// ingest logs that are either fully arbitrary or do not map cleanly to the
6// leveled logging concept. The unraw library is instead made to parse logs
7// from systems that also use leveled logs internally, but emit them to a
8// serialized byte stream that then needs to be turned back into something
9// leveled inside metropolis.
10//
11// Logs converted this way are unfortunately lossy and do not come with the
12// same guarantees as logs directly emitted via logtree. For example, there's
13// no built-in protection against systems emitting fudged timestamps or file
14// locations. Thus, this functionality should be used to interact with trusted
15// systems, not fully arbitrary logs.
16package unraw
17
18import (
19 "context"
20 "fmt"
21 "io"
22 "os"
23 "sync"
24 "syscall"
25 "time"
26
27 "source.monogon.dev/metropolis/pkg/logbuffer"
28 "source.monogon.dev/metropolis/pkg/logtree"
29 "source.monogon.dev/metropolis/pkg/supervisor"
30)
31
32// Parser is a user-defined function for converting a log line received from an
33// external system into a leveled logging payload.
34// The given LeveledWriter should be called for every leveled log entry that
35// results from this line. This means that a parser might skip some lines, or
36// emit multiple leveled payloads per line.
37type Parser func(*logbuffer.Line, LeveledWriter)
38
39// Converter is the main entrypoint of the unraw library. It wraps a
40// LeveledLogger in combination with a Parser to create an io.Writer that can
41// be sent raw log data.
42type Converter struct {
43 // Parser is the user-defined parsing function for converting log lines
44 // into leveled logging payloads. This must be set.
45 Parser Parser
46 // MaximumLineLength is the maximum length of a single log line when
47 // splitting incoming writes into lines. If a line is longer than this, it
48 // will be truncated (and will be sent to the Parser regardless).
49 //
50 // If not set, this defaults to 1024 bytes.
51 MaximumLineLength int
52 // LeveledLogger is the logtree leveled logger into which events from the
53 // Parser will be sent.
54 LeveledLogger logtree.LeveledLogger
55
56 // mu guards lb.
57 mu sync.Mutex
58 // lb is the underlying line buffer used to split incoming data into lines.
59 // It will be initialized on first Write.
60 lb *logbuffer.LineBuffer
61}
62
63// LeveledWriter is called by a Parser for every ExternelLeveledPayload it
64// wishes to emit into a backing LeveledLogger. If the payload is missing some
65// fields, these will default to some sensible values - see the
66// ExternalLeveledPayload structure definition for more information.
67type LeveledWriter func(*logtree.ExternalLeveledPayload)
68
69// Write implements io.Writer. Any write performed into the Converter will
70// populate the converter's internal buffer, and any time that buffer contains
71// a full line it will be sent over to the Parser for processing.
72func (e *Converter) Write(p []byte) (int, error) {
73 e.mu.Lock()
74 defer e.mu.Unlock()
75
76 if e.MaximumLineLength <= 0 {
77 e.MaximumLineLength = 1024
78 }
79 if e.lb == nil {
80 e.lb = logbuffer.NewLineBuffer(e.MaximumLineLength, func(l *logbuffer.Line) {
81 e.Parser(l, e.insert)
82 })
83 }
84 return e.lb.Write(p)
85}
86
87// insert implements LeveledWriter.
88func (e *Converter) insert(d *logtree.ExternalLeveledPayload) {
89 if err := logtree.LogExternalLeveled(e.LeveledLogger, d); err != nil {
90 e.LeveledLogger.Fatal("Could not insert unrawed entry: %v", err)
91 }
92}
93
94// NamedPipeReader returns a supervisor runnable that continously reads logs
95// from the given path and attempts to parse them into leveled logs using this
96// Converter.
97//
98// If the given path doesn't exist, a named pipe will be created there before
99// the function exits. This guarantee means that as long as any writing process
100// is not started before NamedPipeReader returns ther is no need to
101// remove/recreate the named pipe.
102//
103// TODO(q3k): defer the creation of the FIFO to localstorage so this doesn't
104// need to be taken care of in the first place.
105func (e *Converter) NamedPipeReader(path string) (supervisor.Runnable, error) {
106 if _, err := os.Stat(path); os.IsNotExist(err) {
107 if err := syscall.Mkfifo(path, 0666); err != nil {
108 return nil, fmt.Errorf("when creating named pipe: %w", err)
109 }
110 }
111 return func(ctx context.Context) error {
112 fifo, err := os.OpenFile(path, os.O_RDONLY, os.ModeNamedPipe)
113 if err != nil {
114 return fmt.Errorf("when opening named pipe: %w", err)
115 }
116 supervisor.Signal(ctx, supervisor.SignalHealthy)
117 for {
118 // Quit if requested.
119 if ctx.Err() != nil {
120 return ctx.Err()
121 }
122
123 n, err := io.Copy(e, fifo)
124 if n == 0 && err == nil {
125 // Hack because pipes/FIFOs can return zero reads when nobody
126 // is writing. To avoid busy-looping, sleep a bit before
127 // retrying. This does not loose data since the FIFO internal
128 // buffer will stall writes when it becomes full. 10ms maximum
129 // stall in a non-latency critical process (reading debug logs)
130 // is not an issue for us.
131 time.Sleep(10 * time.Millisecond)
132 } else if err != nil {
133 return fmt.Errorf("log pump failed: %v", err)
134 }
135
136 }
137 }, nil
138}