blob: 88ab9d1a6dda01b2cdcb415dc1b42d0ffba05230 [file] [log] [blame]
// Package unraw implements a facility to convert raw logs from external sources
// into leveled logs.
//
// This is not the same as raw logging inside the logtree, which exists to
// ingest logs that are either fully arbitrary or do not map cleanly to the
// leveled logging concept. The unraw library is instead made to parse logs
// from systems that also use leveled logs internally, but emit them to a
// serialized byte stream that then needs to be turned back into something
// leveled inside metropolis.
//
// Logs converted this way are unfortunately lossy and do not come with the
// same guarantees as logs directly emitted via logtree. For example, there's
// no built-in protection against systems emitting fudged timestamps or file
// locations. Thus, this functionality should be used to interact with trusted
// systems, not fully arbitrary logs.
16package unraw
17
18import (
19 "context"
20 "fmt"
21 "io"
22 "os"
23 "sync"
24 "syscall"
25 "time"
26
Serge Bazanski3c5d0632024-09-12 10:49:12 +000027 "source.monogon.dev/go/logging"
Tim Windelschmidt9f21f532024-05-07 15:14:20 +020028 "source.monogon.dev/osbase/logbuffer"
29 "source.monogon.dev/osbase/logtree"
30 "source.monogon.dev/osbase/supervisor"
Serge Bazanskiebe02592021-07-07 14:23:26 +020031)
32
33// Parser is a user-defined function for converting a log line received from an
34// external system into a leveled logging payload.
35// The given LeveledWriter should be called for every leveled log entry that
36// results from this line. This means that a parser might skip some lines, or
37// emit multiple leveled payloads per line.
38type Parser func(*logbuffer.Line, LeveledWriter)
39
40// Converter is the main entrypoint of the unraw library. It wraps a
41// LeveledLogger in combination with a Parser to create an io.Writer that can
42// be sent raw log data.
43type Converter struct {
44 // Parser is the user-defined parsing function for converting log lines
45 // into leveled logging payloads. This must be set.
46 Parser Parser
47 // MaximumLineLength is the maximum length of a single log line when
48 // splitting incoming writes into lines. If a line is longer than this, it
49 // will be truncated (and will be sent to the Parser regardless).
50 //
51 // If not set, this defaults to 1024 bytes.
52 MaximumLineLength int
53 // LeveledLogger is the logtree leveled logger into which events from the
54 // Parser will be sent.
Serge Bazanski3c5d0632024-09-12 10:49:12 +000055 LeveledLogger logging.Leveled
Serge Bazanskiebe02592021-07-07 14:23:26 +020056
57 // mu guards lb.
58 mu sync.Mutex
59 // lb is the underlying line buffer used to split incoming data into lines.
60 // It will be initialized on first Write.
61 lb *logbuffer.LineBuffer
62}
63
64// LeveledWriter is called by a Parser for every ExternelLeveledPayload it
65// wishes to emit into a backing LeveledLogger. If the payload is missing some
66// fields, these will default to some sensible values - see the
67// ExternalLeveledPayload structure definition for more information.
68type LeveledWriter func(*logtree.ExternalLeveledPayload)
69
70// Write implements io.Writer. Any write performed into the Converter will
71// populate the converter's internal buffer, and any time that buffer contains
72// a full line it will be sent over to the Parser for processing.
73func (e *Converter) Write(p []byte) (int, error) {
74 e.mu.Lock()
75 defer e.mu.Unlock()
76
77 if e.MaximumLineLength <= 0 {
78 e.MaximumLineLength = 1024
79 }
80 if e.lb == nil {
81 e.lb = logbuffer.NewLineBuffer(e.MaximumLineLength, func(l *logbuffer.Line) {
82 e.Parser(l, e.insert)
83 })
84 }
85 return e.lb.Write(p)
86}
87
88// insert implements LeveledWriter.
89func (e *Converter) insert(d *logtree.ExternalLeveledPayload) {
90 if err := logtree.LogExternalLeveled(e.LeveledLogger, d); err != nil {
91 e.LeveledLogger.Fatal("Could not insert unrawed entry: %v", err)
92 }
93}
94
95// NamedPipeReader returns a supervisor runnable that continously reads logs
96// from the given path and attempts to parse them into leveled logs using this
97// Converter.
98//
99// If the given path doesn't exist, a named pipe will be created there before
100// the function exits. This guarantee means that as long as any writing process
101// is not started before NamedPipeReader returns ther is no need to
102// remove/recreate the named pipe.
103//
104// TODO(q3k): defer the creation of the FIFO to localstorage so this doesn't
105// need to be taken care of in the first place.
106func (e *Converter) NamedPipeReader(path string) (supervisor.Runnable, error) {
107 if _, err := os.Stat(path); os.IsNotExist(err) {
108 if err := syscall.Mkfifo(path, 0666); err != nil {
109 return nil, fmt.Errorf("when creating named pipe: %w", err)
110 }
111 }
112 return func(ctx context.Context) error {
113 fifo, err := os.OpenFile(path, os.O_RDONLY, os.ModeNamedPipe)
114 if err != nil {
115 return fmt.Errorf("when opening named pipe: %w", err)
116 }
Serge Bazanski1fd64a22021-10-29 16:59:40 +0200117 go func() {
118 <-ctx.Done()
119 fifo.Close()
120 }()
Serge Bazanski826a9e92021-10-05 21:23:48 +0200121 defer fifo.Close()
Serge Bazanskiebe02592021-07-07 14:23:26 +0200122 supervisor.Signal(ctx, supervisor.SignalHealthy)
123 for {
124 // Quit if requested.
125 if ctx.Err() != nil {
126 return ctx.Err()
127 }
128
129 n, err := io.Copy(e, fifo)
130 if n == 0 && err == nil {
131 // Hack because pipes/FIFOs can return zero reads when nobody
132 // is writing. To avoid busy-looping, sleep a bit before
133 // retrying. This does not loose data since the FIFO internal
134 // buffer will stall writes when it becomes full. 10ms maximum
135 // stall in a non-latency critical process (reading debug logs)
136 // is not an issue for us.
137 time.Sleep(10 * time.Millisecond)
138 } else if err != nil {
Serge Bazanski1fd64a22021-10-29 16:59:40 +0200139 // Since we close fifo on context cancel, we'll get a 'file is already closed'
140 // io error here. Translate that over to the context error that caused it.
141 if ctx.Err() != nil {
142 return ctx.Err()
143 }
144 return fmt.Errorf("log pump failed: %w", err)
Serge Bazanskiebe02592021-07-07 14:23:26 +0200145 }
146
147 }
148 }, nil
149}