blob: ef9d913d8fb25e10155c8efcf9f0434126636fde [file] [log] [blame]
// Package unraw implements a facility to convert raw logs from external
// sources into leveled logs.
//
// This is not the same as raw logging inside the logtree, which exists to
// ingest logs that are either fully arbitrary or do not map cleanly to the
// leveled logging concept. The unraw library is instead made to parse logs
// from systems that also use leveled logs internally, but emit them to a
// serialized byte stream that then needs to be turned back into something
// leveled inside metropolis.
//
// Logs converted this way are unfortunately lossy and do not come with the
// same guarantees as logs directly emitted via logtree. For example, there's
// no built-in protection against systems emitting fudged timestamps or file
// locations. Thus, this functionality should be used to interact with trusted
// systems, not fully arbitrary logs.
16package unraw
17
18import (
19 "context"
20 "fmt"
21 "io"
22 "os"
23 "sync"
24 "syscall"
25 "time"
26
27 "source.monogon.dev/metropolis/pkg/logbuffer"
28 "source.monogon.dev/metropolis/pkg/logtree"
29 "source.monogon.dev/metropolis/pkg/supervisor"
30)
31
32// Parser is a user-defined function for converting a log line received from an
33// external system into a leveled logging payload.
34// The given LeveledWriter should be called for every leveled log entry that
35// results from this line. This means that a parser might skip some lines, or
36// emit multiple leveled payloads per line.
37type Parser func(*logbuffer.Line, LeveledWriter)
38
39// Converter is the main entrypoint of the unraw library. It wraps a
40// LeveledLogger in combination with a Parser to create an io.Writer that can
41// be sent raw log data.
42type Converter struct {
43 // Parser is the user-defined parsing function for converting log lines
44 // into leveled logging payloads. This must be set.
45 Parser Parser
46 // MaximumLineLength is the maximum length of a single log line when
47 // splitting incoming writes into lines. If a line is longer than this, it
48 // will be truncated (and will be sent to the Parser regardless).
49 //
50 // If not set, this defaults to 1024 bytes.
51 MaximumLineLength int
52 // LeveledLogger is the logtree leveled logger into which events from the
53 // Parser will be sent.
54 LeveledLogger logtree.LeveledLogger
55
56 // mu guards lb.
57 mu sync.Mutex
58 // lb is the underlying line buffer used to split incoming data into lines.
59 // It will be initialized on first Write.
60 lb *logbuffer.LineBuffer
61}
62
63// LeveledWriter is called by a Parser for every ExternelLeveledPayload it
64// wishes to emit into a backing LeveledLogger. If the payload is missing some
65// fields, these will default to some sensible values - see the
66// ExternalLeveledPayload structure definition for more information.
67type LeveledWriter func(*logtree.ExternalLeveledPayload)
68
69// Write implements io.Writer. Any write performed into the Converter will
70// populate the converter's internal buffer, and any time that buffer contains
71// a full line it will be sent over to the Parser for processing.
72func (e *Converter) Write(p []byte) (int, error) {
73 e.mu.Lock()
74 defer e.mu.Unlock()
75
76 if e.MaximumLineLength <= 0 {
77 e.MaximumLineLength = 1024
78 }
79 if e.lb == nil {
80 e.lb = logbuffer.NewLineBuffer(e.MaximumLineLength, func(l *logbuffer.Line) {
81 e.Parser(l, e.insert)
82 })
83 }
84 return e.lb.Write(p)
85}
86
87// insert implements LeveledWriter.
88func (e *Converter) insert(d *logtree.ExternalLeveledPayload) {
89 if err := logtree.LogExternalLeveled(e.LeveledLogger, d); err != nil {
90 e.LeveledLogger.Fatal("Could not insert unrawed entry: %v", err)
91 }
92}
93
94// NamedPipeReader returns a supervisor runnable that continously reads logs
95// from the given path and attempts to parse them into leveled logs using this
96// Converter.
97//
98// If the given path doesn't exist, a named pipe will be created there before
99// the function exits. This guarantee means that as long as any writing process
100// is not started before NamedPipeReader returns ther is no need to
101// remove/recreate the named pipe.
102//
103// TODO(q3k): defer the creation of the FIFO to localstorage so this doesn't
104// need to be taken care of in the first place.
105func (e *Converter) NamedPipeReader(path string) (supervisor.Runnable, error) {
106 if _, err := os.Stat(path); os.IsNotExist(err) {
107 if err := syscall.Mkfifo(path, 0666); err != nil {
108 return nil, fmt.Errorf("when creating named pipe: %w", err)
109 }
110 }
111 return func(ctx context.Context) error {
112 fifo, err := os.OpenFile(path, os.O_RDONLY, os.ModeNamedPipe)
113 if err != nil {
114 return fmt.Errorf("when opening named pipe: %w", err)
115 }
Serge Bazanski1fd64a22021-10-29 16:59:40 +0200116 go func() {
117 <-ctx.Done()
118 fifo.Close()
119 }()
Serge Bazanski826a9e92021-10-05 21:23:48 +0200120 defer fifo.Close()
Serge Bazanskiebe02592021-07-07 14:23:26 +0200121 supervisor.Signal(ctx, supervisor.SignalHealthy)
122 for {
123 // Quit if requested.
124 if ctx.Err() != nil {
125 return ctx.Err()
126 }
127
128 n, err := io.Copy(e, fifo)
129 if n == 0 && err == nil {
130 // Hack because pipes/FIFOs can return zero reads when nobody
131 // is writing. To avoid busy-looping, sleep a bit before
132 // retrying. This does not loose data since the FIFO internal
133 // buffer will stall writes when it becomes full. 10ms maximum
134 // stall in a non-latency critical process (reading debug logs)
135 // is not an issue for us.
136 time.Sleep(10 * time.Millisecond)
137 } else if err != nil {
Serge Bazanski1fd64a22021-10-29 16:59:40 +0200138 // Since we close fifo on context cancel, we'll get a 'file is already closed'
139 // io error here. Translate that over to the context error that caused it.
140 if ctx.Err() != nil {
141 return ctx.Err()
142 }
143 return fmt.Errorf("log pump failed: %w", err)
Serge Bazanskiebe02592021-07-07 14:23:26 +0200144 }
145
146 }
147 }, nil
148}