blob: 6f84e5a916d2d070fef23562bd301941a5a7e0df [file] [log] [blame]
Tim Windelschmidt6d33a432025-02-04 14:34:25 +01001// Copyright The Monogon Project Authors.
2// SPDX-License-Identifier: Apache-2.0
3
Tim Windelschmidtc2290c22024-08-15 19:56:00 +02004// Package unraw implements a facility to convert raw logs from external sources
5// into leveled logs.
Serge Bazanskiebe02592021-07-07 14:23:26 +02006//
7// This is not the same as raw logging inside the logtree, which exists to
8// ingest logs that are either fully arbitrary or do not map cleanly to the
9// leveled logging concept. The unraw library is instead made to parse logs
10// from systems that also use leveled logs internally, but emit them to a
11// serialized byte stream that then needs to be turned back into something
12// leveled inside metropolis.
13//
14// Logs converted this way are unfortunately lossy and do not come with the
15// same guarantees as logs directly emitted via logtree. For example, there's
16// no built-in protection against systems emiting fudged timestamps or file
17// locations. Thus, this functionality should be used to interact with trusted
18// systems, not fully arbitrary logs.
19package unraw
20
21import (
22 "context"
23 "fmt"
24 "io"
25 "os"
26 "sync"
27 "syscall"
28 "time"
29
Serge Bazanski3c5d0632024-09-12 10:49:12 +000030 "source.monogon.dev/go/logging"
Tim Windelschmidt9f21f532024-05-07 15:14:20 +020031 "source.monogon.dev/osbase/logbuffer"
32 "source.monogon.dev/osbase/logtree"
33 "source.monogon.dev/osbase/supervisor"
Serge Bazanskiebe02592021-07-07 14:23:26 +020034)
35
36// Parser is a user-defined function for converting a log line received from an
37// external system into a leveled logging payload.
38// The given LeveledWriter should be called for every leveled log entry that
39// results from this line. This means that a parser might skip some lines, or
40// emit multiple leveled payloads per line.
41type Parser func(*logbuffer.Line, LeveledWriter)
42
43// Converter is the main entrypoint of the unraw library. It wraps a
44// LeveledLogger in combination with a Parser to create an io.Writer that can
45// be sent raw log data.
46type Converter struct {
47 // Parser is the user-defined parsing function for converting log lines
48 // into leveled logging payloads. This must be set.
49 Parser Parser
50 // MaximumLineLength is the maximum length of a single log line when
51 // splitting incoming writes into lines. If a line is longer than this, it
52 // will be truncated (and will be sent to the Parser regardless).
53 //
54 // If not set, this defaults to 1024 bytes.
55 MaximumLineLength int
56 // LeveledLogger is the logtree leveled logger into which events from the
57 // Parser will be sent.
Serge Bazanski3c5d0632024-09-12 10:49:12 +000058 LeveledLogger logging.Leveled
Serge Bazanskiebe02592021-07-07 14:23:26 +020059
60 // mu guards lb.
61 mu sync.Mutex
62 // lb is the underlying line buffer used to split incoming data into lines.
63 // It will be initialized on first Write.
64 lb *logbuffer.LineBuffer
65}
66
67// LeveledWriter is called by a Parser for every ExternelLeveledPayload it
68// wishes to emit into a backing LeveledLogger. If the payload is missing some
69// fields, these will default to some sensible values - see the
70// ExternalLeveledPayload structure definition for more information.
71type LeveledWriter func(*logtree.ExternalLeveledPayload)
72
73// Write implements io.Writer. Any write performed into the Converter will
74// populate the converter's internal buffer, and any time that buffer contains
75// a full line it will be sent over to the Parser for processing.
76func (e *Converter) Write(p []byte) (int, error) {
77 e.mu.Lock()
78 defer e.mu.Unlock()
79
80 if e.MaximumLineLength <= 0 {
81 e.MaximumLineLength = 1024
82 }
83 if e.lb == nil {
84 e.lb = logbuffer.NewLineBuffer(e.MaximumLineLength, func(l *logbuffer.Line) {
85 e.Parser(l, e.insert)
86 })
87 }
88 return e.lb.Write(p)
89}
90
91// insert implements LeveledWriter.
92func (e *Converter) insert(d *logtree.ExternalLeveledPayload) {
93 if err := logtree.LogExternalLeveled(e.LeveledLogger, d); err != nil {
94 e.LeveledLogger.Fatal("Could not insert unrawed entry: %v", err)
95 }
96}
97
98// NamedPipeReader returns a supervisor runnable that continously reads logs
99// from the given path and attempts to parse them into leveled logs using this
100// Converter.
101//
102// If the given path doesn't exist, a named pipe will be created there before
103// the function exits. This guarantee means that as long as any writing process
104// is not started before NamedPipeReader returns ther is no need to
105// remove/recreate the named pipe.
106//
107// TODO(q3k): defer the creation of the FIFO to localstorage so this doesn't
108// need to be taken care of in the first place.
109func (e *Converter) NamedPipeReader(path string) (supervisor.Runnable, error) {
110 if _, err := os.Stat(path); os.IsNotExist(err) {
111 if err := syscall.Mkfifo(path, 0666); err != nil {
112 return nil, fmt.Errorf("when creating named pipe: %w", err)
113 }
114 }
115 return func(ctx context.Context) error {
116 fifo, err := os.OpenFile(path, os.O_RDONLY, os.ModeNamedPipe)
117 if err != nil {
118 return fmt.Errorf("when opening named pipe: %w", err)
119 }
Serge Bazanski1fd64a22021-10-29 16:59:40 +0200120 go func() {
121 <-ctx.Done()
122 fifo.Close()
123 }()
Serge Bazanski826a9e92021-10-05 21:23:48 +0200124 defer fifo.Close()
Serge Bazanskiebe02592021-07-07 14:23:26 +0200125 supervisor.Signal(ctx, supervisor.SignalHealthy)
126 for {
127 // Quit if requested.
128 if ctx.Err() != nil {
129 return ctx.Err()
130 }
131
132 n, err := io.Copy(e, fifo)
133 if n == 0 && err == nil {
134 // Hack because pipes/FIFOs can return zero reads when nobody
135 // is writing. To avoid busy-looping, sleep a bit before
136 // retrying. This does not loose data since the FIFO internal
137 // buffer will stall writes when it becomes full. 10ms maximum
138 // stall in a non-latency critical process (reading debug logs)
139 // is not an issue for us.
140 time.Sleep(10 * time.Millisecond)
141 } else if err != nil {
Serge Bazanski1fd64a22021-10-29 16:59:40 +0200142 // Since we close fifo on context cancel, we'll get a 'file is already closed'
143 // io error here. Translate that over to the context error that caused it.
144 if ctx.Err() != nil {
145 return ctx.Err()
146 }
147 return fmt.Errorf("log pump failed: %w", err)
Serge Bazanskiebe02592021-07-07 14:23:26 +0200148 }
149
150 }
151 }, nil
152}