blob: 7790cd0466cd3ccb6d1a2497b19e911f3931ff21 [file] [log] [blame]
Serge Bazanski6c8ee0b2023-04-05 12:29:57 +02001package logtree
2
3import (
4 "encoding/json"
5 "fmt"
6 "strconv"
7 "strings"
8 "time"
9
10 "go.uber.org/zap"
11 "go.uber.org/zap/zapcore"
12
13 "source.monogon.dev/metropolis/pkg/logbuffer"
14)
15
16// Zapify turns a LeveledLogger into a zap.Logger which pipes its output into the
17// LeveledLogger. The message, severity and caller are carried over. Extra fields
18// are appended as JSON to the end of the log line.
19func Zapify(logger LeveledLogger, minimumLevel zapcore.Level) *zap.Logger {
20 p, ok := logger.(*leveledPublisher)
21 if !ok {
22 // Fail fast, as this is a programming error.
23 panic("Expected *leveledPublisher in LeveledLogger from supervisor")
24 }
25
26 ec := zapcore.EncoderConfig{
27 MessageKey: "message",
28 LevelKey: "level",
29 TimeKey: "time",
30 CallerKey: "caller",
31 EncodeLevel: zapcore.LowercaseLevelEncoder,
32 EncodeTime: zapcore.EpochTimeEncoder,
33 EncodeCaller: zapcore.ShortCallerEncoder,
34 }
35 s := zapSink{
36 publisher: p,
37 }
38 s.buffer = logbuffer.NewLineBuffer(4096, s.consumeLine)
39 zc := zapcore.NewCore(zapcore.NewJSONEncoder(ec), s.buffer, minimumLevel)
40 return zap.New(zc, zap.AddCaller())
41}
42
43type zapSink struct {
44 publisher *leveledPublisher
45 buffer *logbuffer.LineBuffer
46}
47
48func (z *zapSink) consumeLine(l *logbuffer.Line) {
49 ze, err := parseZapJSON(l.Data)
50 if err != nil {
51 z.publisher.Warningf("failed to parse zap JSON: %v: %q", err, l.Data)
52 return
53 }
54 message := ze.message
55 if len(ze.extra) > 0 {
56 message += " " + ze.extra
57 }
58 e := &entry{
59 origin: z.publisher.node.dn,
60 leveled: &LeveledPayload{
61 timestamp: ze.time,
62 severity: ze.severity,
63 messages: []string{message},
64 file: ze.file,
65 line: ze.line,
66 },
67 }
68 z.publisher.node.tree.journal.append(e)
69 z.publisher.node.tree.journal.notify(e)
70}
71
72type zapEntry struct {
73 message string
74 severity Severity
75 time time.Time
76 file string
77 line int
78 extra string
79}
80
81func parseZapJSON(s string) (*zapEntry, error) {
82 entry := make(map[string]any)
83 if err := json.Unmarshal([]byte(s), &entry); err != nil {
84 return nil, fmt.Errorf("invalid JSON: %v", err)
85 }
86 message, ok := entry["message"].(string)
87 if !ok {
88 return nil, fmt.Errorf("no message field")
89 }
90 level, ok := entry["level"].(string)
91 if !ok {
92 return nil, fmt.Errorf("no level field")
93 }
94 t, ok := entry["time"].(float64)
95 if !ok {
96 return nil, fmt.Errorf("no time field")
97 }
98 caller, ok := entry["caller"].(string)
99 if !ok {
100 return nil, fmt.Errorf("no caller field")
101 }
102
103 callerParts := strings.Split(caller, ":")
104 if len(callerParts) != 2 {
105 return nil, fmt.Errorf("invalid caller")
106 }
107 callerDirFile := strings.Split(callerParts[0], "/")
108 callerFile := callerDirFile[len(callerDirFile)-1]
109 callerLineS := callerParts[1]
110 callerLine, _ := strconv.Atoi(callerLineS)
111
112 var severity Severity
113 switch level {
114 case "warn":
115 severity = WARNING
116 case "error", "dpanic", "panic", "fatal":
117 severity = ERROR
118 default:
119 severity = INFO
120 }
121
122 secs := int64(t)
123 nsecs := int64((t - float64(secs)) * 1e9)
124
125 delete(entry, "message")
126 delete(entry, "level")
127 delete(entry, "time")
128 delete(entry, "caller")
129 extra := []byte{}
130 if len(entry) > 0 {
131 extra, _ = json.Marshal(entry)
132 }
133 return &zapEntry{
134 message: message,
135 severity: severity,
136 time: time.Unix(secs, nsecs),
137 file: callerFile,
138 line: callerLine,
139 extra: string(extra),
140 }, nil
141}