m/pkg/logtree: add Zap and gRPC facades
Change-Id: I37a861edeba9b916e17598da559bd378e494ec35
Reviewed-on: https://review.monogon.dev/c/monogon/+/1486
Reviewed-by: Leopold Schabel <leo@monogon.tech>
Tested-by: Jenkins CI
diff --git a/go.mod b/go.mod
index f7a076c..619614d 100644
--- a/go.mod
+++ b/go.mod
@@ -384,7 +384,7 @@
go.opentelemetry.io/proto/otlp v0.11.0 // indirect
go.starlark.net v0.0.0-20210223155950-e043a3d3c984
go.uber.org/atomic v1.9.0 // indirect
- go.uber.org/zap v1.19.1 // indirect
+ go.uber.org/zap v1.19.1
golang.org/x/arch v0.0.0-20190927153633-4e8777c89be4 // indirect
golang.org/x/oauth2 v0.5.0 // indirect
golang.org/x/term v0.5.0 // indirect
diff --git a/metropolis/pkg/logbuffer/linebuffer.go b/metropolis/pkg/logbuffer/linebuffer.go
index 16f7b1a..92b70e9 100644
--- a/metropolis/pkg/logbuffer/linebuffer.go
+++ b/metropolis/pkg/logbuffer/linebuffer.go
@@ -163,3 +163,7 @@
}
return nil
}
+
+func (l *LineBuffer) Sync() error {
+ return nil
+}
diff --git a/metropolis/pkg/logtree/BUILD.bazel b/metropolis/pkg/logtree/BUILD.bazel
index 2e2f07e..f6c696c 100644
--- a/metropolis/pkg/logtree/BUILD.bazel
+++ b/metropolis/pkg/logtree/BUILD.bazel
@@ -4,6 +4,7 @@
name = "logtree",
srcs = [
"doc.go",
+ "grpc.go",
"journal.go",
"journal_entry.go",
"journal_subscriber.go",
@@ -15,6 +16,7 @@
"logtree_entry.go",
"logtree_publisher.go",
"testhelpers.go",
+ "zap.go",
],
importpath = "source.monogon.dev/metropolis/pkg/logtree",
visibility = ["//metropolis:__subpackages__"],
@@ -22,7 +24,10 @@
"//metropolis/pkg/logbuffer",
"//metropolis/proto/common",
"@com_github_mitchellh_go_wordwrap//:go-wordwrap",
+ "@org_golang_google_grpc//grpclog",
"@org_golang_google_protobuf//types/known/timestamppb",
+ "@org_uber_go_zap//:zap",
+ "@org_uber_go_zap//zapcore",
],
)
@@ -32,7 +37,11 @@
"journal_test.go",
"klog_test.go",
"logtree_test.go",
+ "zap_test.go",
],
embed = [":logtree"],
- deps = ["@com_github_google_go_cmp//cmp"],
+ deps = [
+ "@com_github_google_go_cmp//cmp",
+ "@org_uber_go_zap//:zap",
+ ],
)
diff --git a/metropolis/pkg/logtree/grpc.go b/metropolis/pkg/logtree/grpc.go
new file mode 100644
index 0000000..3b2594d
--- /dev/null
+++ b/metropolis/pkg/logtree/grpc.go
@@ -0,0 +1,75 @@
+package logtree
+
+import "google.golang.org/grpc/grpclog"
+
+// GRPCify turns a LeveledLogger into a go-grpc compatible logger.
+func GRPCify(logger LeveledLogger) grpclog.LoggerV2 {
+ lp, ok := logger.(*leveledPublisher)
+ if !ok {
+ // Fail fast, as this is a programming error.
+ panic("Expected *leveledPublisher in LeveledLogger from supervisor")
+ }
+
+ lp2 := *lp
+ lp2.depth += 1
+
+ return &leveledGRPCV2{
+ lp: &lp2,
+ }
+}
+
+type leveledGRPCV2 struct {
+ lp *leveledPublisher
+}
+
+func (g *leveledGRPCV2) Info(args ...interface{}) {
+ g.lp.Info(args...)
+}
+
+func (g *leveledGRPCV2) Infoln(args ...interface{}) {
+ g.lp.Info(args...)
+}
+
+func (g *leveledGRPCV2) Infof(format string, args ...interface{}) {
+ g.lp.Infof(format, args...)
+}
+
+func (g *leveledGRPCV2) Warning(args ...interface{}) {
+ g.lp.Warning(args...)
+}
+
+func (g *leveledGRPCV2) Warningln(args ...interface{}) {
+ g.lp.Warning(args...)
+}
+
+func (g *leveledGRPCV2) Warningf(format string, args ...interface{}) {
+ g.lp.Warningf(format, args...)
+}
+
+func (g *leveledGRPCV2) Error(args ...interface{}) {
+ g.lp.Error(args...)
+}
+
+func (g *leveledGRPCV2) Errorln(args ...interface{}) {
+ g.lp.Error(args...)
+}
+
+func (g *leveledGRPCV2) Errorf(format string, args ...interface{}) {
+ g.lp.Errorf(format, args...)
+}
+
+func (g *leveledGRPCV2) Fatal(args ...interface{}) {
+ g.lp.Fatal(args...)
+}
+
+func (g *leveledGRPCV2) Fatalln(args ...interface{}) {
+ g.lp.Fatal(args...)
+}
+
+func (g *leveledGRPCV2) Fatalf(format string, args ...interface{}) {
+ g.lp.Fatalf(format, args...)
+}
+
+func (g *leveledGRPCV2) V(l int) bool {
+ return g.lp.V(VerbosityLevel(l)).Enabled()
+}
diff --git a/metropolis/pkg/logtree/zap.go b/metropolis/pkg/logtree/zap.go
new file mode 100644
index 0000000..7790cd0
--- /dev/null
+++ b/metropolis/pkg/logtree/zap.go
@@ -0,0 +1,141 @@
+package logtree
+
+import (
+ "encoding/json"
+ "fmt"
+ "strconv"
+ "strings"
+ "time"
+
+ "go.uber.org/zap"
+ "go.uber.org/zap/zapcore"
+
+ "source.monogon.dev/metropolis/pkg/logbuffer"
+)
+
+// Zapify turns a LeveledLogger into a zap.Logger which pipes its output into the
+// LeveledLogger. The message, severity and caller are carried over. Extra fields
+// are appended as JSON to the end of the log line.
+func Zapify(logger LeveledLogger, minimumLevel zapcore.Level) *zap.Logger {
+ p, ok := logger.(*leveledPublisher)
+ if !ok {
+ // Fail fast, as this is a programming error.
+ panic("Expected *leveledPublisher in LeveledLogger from supervisor")
+ }
+
+ ec := zapcore.EncoderConfig{
+ MessageKey: "message",
+ LevelKey: "level",
+ TimeKey: "time",
+ CallerKey: "caller",
+ EncodeLevel: zapcore.LowercaseLevelEncoder,
+ EncodeTime: zapcore.EpochTimeEncoder,
+ EncodeCaller: zapcore.ShortCallerEncoder,
+ }
+ s := zapSink{
+ publisher: p,
+ }
+ s.buffer = logbuffer.NewLineBuffer(4096, s.consumeLine)
+ zc := zapcore.NewCore(zapcore.NewJSONEncoder(ec), s.buffer, minimumLevel)
+ return zap.New(zc, zap.AddCaller())
+}
+
+type zapSink struct {
+ publisher *leveledPublisher
+ buffer *logbuffer.LineBuffer
+}
+
+func (z *zapSink) consumeLine(l *logbuffer.Line) {
+ ze, err := parseZapJSON(l.Data)
+ if err != nil {
+ z.publisher.Warningf("failed to parse zap JSON: %v: %q", err, l.Data)
+ return
+ }
+ message := ze.message
+ if len(ze.extra) > 0 {
+ message += " " + ze.extra
+ }
+ e := &entry{
+ origin: z.publisher.node.dn,
+ leveled: &LeveledPayload{
+ timestamp: ze.time,
+ severity: ze.severity,
+ messages: []string{message},
+ file: ze.file,
+ line: ze.line,
+ },
+ }
+ z.publisher.node.tree.journal.append(e)
+ z.publisher.node.tree.journal.notify(e)
+}
+
+type zapEntry struct {
+ message string
+ severity Severity
+ time time.Time
+ file string
+ line int
+ extra string
+}
+
+func parseZapJSON(s string) (*zapEntry, error) {
+ entry := make(map[string]any)
+ if err := json.Unmarshal([]byte(s), &entry); err != nil {
+ return nil, fmt.Errorf("invalid JSON: %v", err)
+ }
+ message, ok := entry["message"].(string)
+ if !ok {
+ return nil, fmt.Errorf("no message field")
+ }
+ level, ok := entry["level"].(string)
+ if !ok {
+ return nil, fmt.Errorf("no level field")
+ }
+ t, ok := entry["time"].(float64)
+ if !ok {
+ return nil, fmt.Errorf("no time field")
+ }
+ caller, ok := entry["caller"].(string)
+ if !ok {
+ return nil, fmt.Errorf("no caller field")
+ }
+
+ callerParts := strings.Split(caller, ":")
+ if len(callerParts) != 2 {
+ return nil, fmt.Errorf("invalid caller")
+ }
+ callerDirFile := strings.Split(callerParts[0], "/")
+ callerFile := callerDirFile[len(callerDirFile)-1]
+ callerLineS := callerParts[1]
+ callerLine, _ := strconv.Atoi(callerLineS)
+
+ var severity Severity
+ switch level {
+ case "warn":
+ severity = WARNING
+ case "error", "dpanic", "panic", "fatal":
+ severity = ERROR
+ default:
+ severity = INFO
+ }
+
+ secs := int64(t)
+ nsecs := int64((t - float64(secs)) * 1e9)
+
+ delete(entry, "message")
+ delete(entry, "level")
+ delete(entry, "time")
+ delete(entry, "caller")
+ extra := []byte{}
+ if len(entry) > 0 {
+ extra, _ = json.Marshal(entry)
+ }
+ return &zapEntry{
+ message: message,
+ severity: severity,
+ time: time.Unix(secs, nsecs),
+ file: callerFile,
+ line: callerLine,
+ extra: string(extra),
+ }, nil
+}
diff --git a/metropolis/pkg/logtree/zap_test.go b/metropolis/pkg/logtree/zap_test.go
new file mode 100644
index 0000000..3917cd8
--- /dev/null
+++ b/metropolis/pkg/logtree/zap_test.go
@@ -0,0 +1,42 @@
+package logtree
+
+import (
+ "testing"
+
+ "go.uber.org/zap"
+)
+
+func TestZapify(t *testing.T) {
+ lt := New()
+
+ z := Zapify(lt.MustLeveledFor("zap"), zap.InfoLevel)
+ z.Info("foo", zap.String("strp", "strv"), zap.Int("intp", 42))
+ z.Warn("foo!", zap.String("strp", "strv"), zap.Int("intp", 1337))
+ z.Error("foo!!")
+
+ res, err := lt.Read("zap", WithBacklog(BacklogAllAvailable))
+ if err != nil {
+ t.Fatalf("Read: %v", err)
+ }
+ defer res.Close()
+
+ if want, got := 3, len(res.Backlog); want != got {
+ t.Errorf("Wanted %d entries, got %d", want, got)
+ } else {
+ for i, te := range []struct {
+ msg string
+ sev Severity
+ }{
+ {`foo {"intp":42,"strp":"strv"}`, INFO},
+ {`foo! {"intp":1337,"strp":"strv"}`, WARNING},
+ {`foo!!`, ERROR},
+ } {
+ if want, got := te.msg, res.Backlog[i].Leveled.messages[0]; want != got {
+ t.Errorf("Line %d: wanted message %q, got %q", i, want, got)
+ }
+ if want, got := te.sev, res.Backlog[i].Leveled.severity; want != got {
+ t.Errorf("Line %d: wanted level %s, got %s", i, want, got)
+ }
+ }
+ }
+}