m/pkg/logtree: add Zap and gRPC facades

Change-Id: I37a861edeba9b916e17598da559bd378e494ec35
Reviewed-on: https://review.monogon.dev/c/monogon/+/1486
Reviewed-by: Leopold Schabel <leo@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/pkg/logbuffer/linebuffer.go b/metropolis/pkg/logbuffer/linebuffer.go
index 16f7b1a..92b70e9 100644
--- a/metropolis/pkg/logbuffer/linebuffer.go
+++ b/metropolis/pkg/logbuffer/linebuffer.go
@@ -163,3 +163,7 @@
 	}
 	return nil
 }
+
+func (l *LineBuffer) Sync() error {
+	return nil // No-op. NOTE(review): presumably lets *LineBuffer serve as a zapcore.WriteSyncer; confirm.
+}
diff --git a/metropolis/pkg/logtree/BUILD.bazel b/metropolis/pkg/logtree/BUILD.bazel
index 2e2f07e..f6c696c 100644
--- a/metropolis/pkg/logtree/BUILD.bazel
+++ b/metropolis/pkg/logtree/BUILD.bazel
@@ -4,6 +4,7 @@
     name = "logtree",
     srcs = [
         "doc.go",
+        "grpc.go",
         "journal.go",
         "journal_entry.go",
         "journal_subscriber.go",
@@ -15,6 +16,7 @@
         "logtree_entry.go",
         "logtree_publisher.go",
         "testhelpers.go",
+        "zap.go",
     ],
     importpath = "source.monogon.dev/metropolis/pkg/logtree",
     visibility = ["//metropolis:__subpackages__"],
@@ -22,7 +24,10 @@
         "//metropolis/pkg/logbuffer",
         "//metropolis/proto/common",
         "@com_github_mitchellh_go_wordwrap//:go-wordwrap",
+        "@org_golang_google_grpc//grpclog",
         "@org_golang_google_protobuf//types/known/timestamppb",
+        "@org_uber_go_zap//:zap",
+        "@org_uber_go_zap//zapcore",
     ],
 )
 
@@ -32,7 +37,11 @@
         "journal_test.go",
         "klog_test.go",
         "logtree_test.go",
+        "zap_test.go",
     ],
     embed = [":logtree"],
-    deps = ["@com_github_google_go_cmp//cmp"],
+    deps = [
+        "@com_github_google_go_cmp//cmp",
+        "@org_uber_go_zap//:zap",
+    ],
 )
diff --git a/metropolis/pkg/logtree/grpc.go b/metropolis/pkg/logtree/grpc.go
new file mode 100644
index 0000000..3b2594d
--- /dev/null
+++ b/metropolis/pkg/logtree/grpc.go
@@ -0,0 +1,75 @@
+package logtree
+
+import "google.golang.org/grpc/grpclog"
+
+// GRPCify returns a grpclog.LoggerV2 that forwards to logger. It panics if
+// logger is not backed by a *leveledPublisher.
+func GRPCify(logger LeveledLogger) grpclog.LoggerV2 {
+	pub, isPublisher := logger.(*leveledPublisher)
+	if !isPublisher {
+		// A foreign LeveledLogger implementation is a programming error: fail fast.
+		panic("Expected *leveledPublisher in LeveledLogger from supervisor")
+	}
+
+	// Adjust depth on a private copy so the caller's publisher is untouched.
+	// NOTE(review): the extra level presumably accounts for the adapter frame.
+	adapted := *pub
+	adapted.depth++
+	return &leveledGRPCV2{lp: &adapted}
+}
+
+// leveledGRPCV2 implements grpclog.LoggerV2 by forwarding every call to the
+// correspondingly named method of the wrapped leveledPublisher. Instances are
+// created by GRPCify.
+type leveledGRPCV2 struct {
+	// lp is the publisher that receives all forwarded calls.
+	lp *leveledPublisher
+}
+
+// Info forwards args to the publisher's Info.
+func (g *leveledGRPCV2) Info(args ...any) { g.lp.Info(args...) }
+
+// Infoln forwards args to the publisher's Info, exactly as Info does.
+//
+// NOTE(review): grpclog documents the *ln variants as handling arguments in
+// the manner of fmt.Println, while this (and Warningln, Errorln, Fatalln)
+// reuses the plain variant. Confirm the publisher's spacing is acceptable.
+func (g *leveledGRPCV2) Infoln(args ...any) { g.lp.Info(args...) }
+
+// Infof forwards format and args to the publisher's Infof.
+func (g *leveledGRPCV2) Infof(format string, args ...any) { g.lp.Infof(format, args...) }
+
+// Warning forwards args to the publisher's Warning.
+func (g *leveledGRPCV2) Warning(args ...any) { g.lp.Warning(args...) }
+
+// Warningln forwards args to the publisher's Warning, exactly as Warning does.
+func (g *leveledGRPCV2) Warningln(args ...any) { g.lp.Warning(args...) }
+
+// Warningf forwards format and args to the publisher's Warningf.
+func (g *leveledGRPCV2) Warningf(format string, args ...any) { g.lp.Warningf(format, args...) }
+
+// Error forwards args to the publisher's Error.
+func (g *leveledGRPCV2) Error(args ...any) { g.lp.Error(args...) }
+
+// Errorln forwards args to the publisher's Error, exactly as Error does.
+func (g *leveledGRPCV2) Errorln(args ...any) { g.lp.Error(args...) }
+
+// Errorf forwards format and args to the publisher's Errorf.
+func (g *leveledGRPCV2) Errorf(format string, args ...any) { g.lp.Errorf(format, args...) }
+
+// Fatal forwards args to the publisher's Fatal.
+//
+// NOTE(review): per grpclog's documentation gRPC itself exits after Fatal
+// logs; whether the publisher's Fatal also terminates is not visible here.
+func (g *leveledGRPCV2) Fatal(args ...any) { g.lp.Fatal(args...) }
+
+// Fatalln forwards args to the publisher's Fatal, exactly as Fatal does.
+func (g *leveledGRPCV2) Fatalln(args ...any) { g.lp.Fatal(args...) }
+
+// Fatalf forwards format and args to the publisher's Fatalf.
+func (g *leveledGRPCV2) Fatalf(format string, args ...any) { g.lp.Fatalf(format, args...) }
+
+// V reports whether verbose logging at level l is enabled, as answered by the
+// publisher's V(...).Enabled(). The grpclog level is converted to a
+// VerbosityLevel as-is, without any remapping.
+func (g *leveledGRPCV2) V(l int) bool { return g.lp.V(VerbosityLevel(l)).Enabled() }
diff --git a/metropolis/pkg/logtree/zap.go b/metropolis/pkg/logtree/zap.go
new file mode 100644
index 0000000..7790cd0
--- /dev/null
+++ b/metropolis/pkg/logtree/zap.go
@@ -0,0 +1,141 @@
+package logtree
+
+import (
+	"encoding/json"
+	"fmt"
+	"strconv"
+	"strings"
+	"time"
+
+	"go.uber.org/zap"
+	"go.uber.org/zap/zapcore"
+
+	"source.monogon.dev/metropolis/pkg/logbuffer"
+)
+
+// Zapify returns a zap.Logger whose entries at or above minimumLevel are
+// forwarded into logger, preserving message, severity and caller. Any extra
+// fields are appended to the log line as JSON. It panics if logger is not
+// backed by a *leveledPublisher.
+func Zapify(logger LeveledLogger, minimumLevel zapcore.Level) *zap.Logger {
+	pub, isPublisher := logger.(*leveledPublisher)
+	if !isPublisher {
+		// A foreign LeveledLogger implementation is a programming error: fail fast.
+		panic("Expected *leveledPublisher in LeveledLogger from supervisor")
+	}
+
+	// zap writes JSON into the sink's line buffer, which is constructed with
+	// consumeLine as its callback. Key names must match parseZapJSON.
+	sink := &zapSink{publisher: pub}
+	sink.buffer = logbuffer.NewLineBuffer(4096, sink.consumeLine)
+	encoder := zapcore.NewJSONEncoder(zapcore.EncoderConfig{
+		MessageKey:   "message",
+		LevelKey:     "level",
+		TimeKey:      "time",
+		CallerKey:    "caller",
+		EncodeLevel:  zapcore.LowercaseLevelEncoder,
+		EncodeTime:   zapcore.EpochTimeEncoder,
+		EncodeCaller: zapcore.ShortCallerEncoder,
+	})
+	return zap.New(zapcore.NewCore(encoder, sink.buffer, minimumLevel), zap.AddCaller())
+}
+
+// zapSink republishes JSON lines written by a zap core into a logtree node.
+type zapSink struct {
+	publisher *leveledPublisher
+	buffer    *logbuffer.LineBuffer
+}
+
+// consumeLine turns one zap JSON line into a journal entry, or a warning.
+func (z *zapSink) consumeLine(l *logbuffer.Line) {
+	pub := z.publisher
+	parsed, err := parseZapJSON(l.Data)
+	if err != nil {
+		pub.Warningf("failed to parse zap JSON: %v: %q", err, l.Data)
+		return
+	}
+	payload := &LeveledPayload{
+		timestamp: parsed.time,
+		severity:  parsed.severity,
+		messages:  []string{parsed.message},
+		file:      parsed.file,
+		line:      parsed.line,
+	}
+	if parsed.extra != "" {
+		payload.messages[0] += " " + parsed.extra
+	}
+	e := &entry{origin: pub.node.dn, leveled: payload}
+	pub.node.tree.journal.append(e)
+	pub.node.tree.journal.notify(e)
+}
+
+type zapEntry struct {
+	message  string    // value of the "message" key
+	severity Severity  // mapped from the "level" key by parseZapJSON
+	time     time.Time // built from the "time" key (float epoch seconds)
+	file     string    // last path element of the "caller" key's file part
+	line     int       // number after the colon in the "caller" key
+	extra    string    // remaining keys re-marshaled as JSON; "" if none
+}
+
+// parseZapJSON decodes one JSON log line produced by the encoder set up in
+// Zapify. The "message", "level", "time" and "caller" keys are required; all
+// other keys are re-marshaled into extra. Malformed input yields an error.
+func parseZapJSON(s string) (*zapEntry, error) {
+	fields := make(map[string]any)
+	if err := json.Unmarshal([]byte(s), &fields); err != nil {
+		return nil, fmt.Errorf("invalid JSON: %w", err)
+	}
+	message, ok := fields["message"].(string)
+	if !ok {
+		return nil, fmt.Errorf("no message field")
+	}
+	level, ok := fields["level"].(string)
+	if !ok {
+		return nil, fmt.Errorf("no level field")
+	}
+	t, ok := fields["time"].(float64)
+	if !ok {
+		return nil, fmt.Errorf("no time field")
+	}
+	caller, ok := fields["caller"].(string)
+	if !ok {
+		return nil, fmt.Errorf("no caller field")
+	}
+
+	callerParts := strings.Split(caller, ":") // expected form: "dir/file.go:line"
+	if len(callerParts) != 2 {
+		return nil, fmt.Errorf("invalid caller")
+	}
+	callerFile := callerParts[0][strings.LastIndex(callerParts[0], "/")+1:]
+	callerLine, err := strconv.Atoi(callerParts[1])
+	if err != nil {
+		return nil, fmt.Errorf("invalid caller line: %w", err)
+	}
+	var severity Severity
+	switch level {
+	case "warn":
+		severity = WARNING
+	case "error", "dpanic", "panic", "fatal":
+		severity = ERROR
+	default:
+		severity = INFO
+	}
+	secs := int64(t) // whole seconds of the fractional epoch timestamp
+	nsecs := int64((t - float64(secs)) * 1e9)
+	for _, key := range []string{"message", "level", "time", "caller"} {
+		delete(fields, key)
+	}
+	var extra []byte
+	if len(fields) > 0 {
+		extra, _ = json.Marshal(fields) // cannot fail: fields came from valid JSON
+	}
+	return &zapEntry{
+		message:  message,
+		severity: severity,
+		time:     time.Unix(secs, nsecs),
+		file:     callerFile,
+		line:     callerLine,
+		extra:    string(extra),
+	}, nil
+}
diff --git a/metropolis/pkg/logtree/zap_test.go b/metropolis/pkg/logtree/zap_test.go
new file mode 100644
index 0000000..3917cd8
--- /dev/null
+++ b/metropolis/pkg/logtree/zap_test.go
@@ -0,0 +1,42 @@
+package logtree
+
+import (
+	"testing"
+
+	"go.uber.org/zap"
+)
+
+// TestZapify checks message and severity of entries logged via Zapify.
+func TestZapify(t *testing.T) {
+	tree := New()
+	zl := Zapify(tree.MustLeveledFor("zap"), zap.InfoLevel)
+	zl.Info("foo", zap.String("strp", "strv"), zap.Int("intp", 42))
+	zl.Warn("foo!", zap.String("strp", "strv"), zap.Int("intp", 1337))
+	zl.Error("foo!!")
+
+	reader, err := tree.Read("zap", WithBacklog(BacklogAllAvailable))
+	if err != nil {
+		t.Fatalf("Read: %v", err)
+	}
+	defer reader.Close()
+	expected := []struct {
+		msg string
+		sev Severity
+	}{
+		{`foo {"intp":42,"strp":"strv"}`, INFO},
+		{`foo! {"intp":1337,"strp":"strv"}`, WARNING},
+		{`foo!!`, ERROR},
+	}
+	if want, got := 3, len(reader.Backlog); want != got {
+		t.Errorf("Wanted %d entries, got %d", want, got)
+		return
+	}
+	for i, te := range expected {
+		if want, got := te.msg, reader.Backlog[i].Leveled.messages[0]; want != got {
+			t.Errorf("Line %d: wanted message %q, got %q", i, want, got)
+		}
+		if want, got := te.sev, reader.Backlog[i].Leveled.severity; want != got {
+			t.Errorf("Line %d: wanted level %s, got %s", i, want, got)
+		}
+	}
+}