blob: 7790cd0466cd3ccb6d1a2497b19e911f3931ff21 [file] [log] [blame] [edit]
package logtree
import (
"encoding/json"
"fmt"
"strconv"
"strings"
"time"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"source.monogon.dev/metropolis/pkg/logbuffer"
)
// Zapify turns a LeveledLogger into a zap.Logger which pipes its output into the
// LeveledLogger. The message, severity and caller are carried over. Extra fields
// are appended as JSON to the end of the log line.
func Zapify(logger LeveledLogger, minimumLevel zapcore.Level) *zap.Logger {
p, ok := logger.(*leveledPublisher)
if !ok {
// Fail fast, as this is a programming error.
panic("Expected *leveledPublisher in LeveledLogger from supervisor")
}
ec := zapcore.EncoderConfig{
MessageKey: "message",
LevelKey: "level",
TimeKey: "time",
CallerKey: "caller",
EncodeLevel: zapcore.LowercaseLevelEncoder,
EncodeTime: zapcore.EpochTimeEncoder,
EncodeCaller: zapcore.ShortCallerEncoder,
}
s := zapSink{
publisher: p,
}
s.buffer = logbuffer.NewLineBuffer(4096, s.consumeLine)
zc := zapcore.NewCore(zapcore.NewJSONEncoder(ec), s.buffer, minimumLevel)
return zap.New(zc, zap.AddCaller())
}
type zapSink struct {
publisher *leveledPublisher
buffer *logbuffer.LineBuffer
}
func (z *zapSink) consumeLine(l *logbuffer.Line) {
ze, err := parseZapJSON(l.Data)
if err != nil {
z.publisher.Warningf("failed to parse zap JSON: %v: %q", err, l.Data)
return
}
message := ze.message
if len(ze.extra) > 0 {
message += " " + ze.extra
}
e := &entry{
origin: z.publisher.node.dn,
leveled: &LeveledPayload{
timestamp: ze.time,
severity: ze.severity,
messages: []string{message},
file: ze.file,
line: ze.line,
},
}
z.publisher.node.tree.journal.append(e)
z.publisher.node.tree.journal.notify(e)
}
type zapEntry struct {
message string
severity Severity
time time.Time
file string
line int
extra string
}
func parseZapJSON(s string) (*zapEntry, error) {
entry := make(map[string]any)
if err := json.Unmarshal([]byte(s), &entry); err != nil {
return nil, fmt.Errorf("invalid JSON: %v", err)
}
message, ok := entry["message"].(string)
if !ok {
return nil, fmt.Errorf("no message field")
}
level, ok := entry["level"].(string)
if !ok {
return nil, fmt.Errorf("no level field")
}
t, ok := entry["time"].(float64)
if !ok {
return nil, fmt.Errorf("no time field")
}
caller, ok := entry["caller"].(string)
if !ok {
return nil, fmt.Errorf("no caller field")
}
callerParts := strings.Split(caller, ":")
if len(callerParts) != 2 {
return nil, fmt.Errorf("invalid caller")
}
callerDirFile := strings.Split(callerParts[0], "/")
callerFile := callerDirFile[len(callerDirFile)-1]
callerLineS := callerParts[1]
callerLine, _ := strconv.Atoi(callerLineS)
var severity Severity
switch level {
case "warn":
severity = WARNING
case "error", "dpanic", "panic", "fatal":
severity = ERROR
default:
severity = INFO
}
secs := int64(t)
nsecs := int64((t - float64(secs)) * 1e9)
delete(entry, "message")
delete(entry, "level")
delete(entry, "time")
delete(entry, "caller")
extra := []byte{}
if len(entry) > 0 {
extra, _ = json.Marshal(entry)
}
return &zapEntry{
message: message,
severity: severity,
time: time.Unix(secs, nsecs),
file: callerFile,
line: callerLine,
extra: string(extra),
}, nil
}