m/p/logtree: add kmsg pipe

This allows ingesting Linux kernel (kmsg) logs into logtree with
the original metadata (timestamp, severity) preserved.

Change-Id: Ibb6e3a7a0ae4a008b8e9c98beccb3a95c067cb75
Reviewed-on: https://review.monogon.dev/c/monogon/+/2044
Reviewed-by: Leopold Schabel <leo@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/pkg/logtree/BUILD.bazel b/metropolis/pkg/logtree/BUILD.bazel
index a520e8f..595e8cf 100644
--- a/metropolis/pkg/logtree/BUILD.bazel
+++ b/metropolis/pkg/logtree/BUILD.bazel
@@ -9,6 +9,7 @@
         "journal_entry.go",
         "journal_subscriber.go",
         "klog.go",
+        "kmsg.go",
         "leveled.go",
         "leveled_payload.go",
         "logtree.go",
@@ -29,7 +30,15 @@
         "@org_golang_google_protobuf//types/known/timestamppb",
         "@org_uber_go_zap//:zap",
         "@org_uber_go_zap//zapcore",
-    ],
+    ] + select({
+        "@io_bazel_rules_go//go/platform:android": [
+            "@org_golang_x_sys//unix",
+        ],
+        "@io_bazel_rules_go//go/platform:linux": [
+            "@org_golang_x_sys//unix",
+        ],
+        "//conditions:default": [],
+    }),
 )
 
 go_test(
@@ -37,6 +46,7 @@
     srcs = [
         "journal_test.go",
         "klog_test.go",
+        "kmsg_test.go",
         "logtree_test.go",
         "zap_test.go",
     ],
diff --git a/metropolis/pkg/logtree/kmsg.go b/metropolis/pkg/logtree/kmsg.go
new file mode 100644
index 0000000..03bb6ff
--- /dev/null
+++ b/metropolis/pkg/logtree/kmsg.go
@@ -0,0 +1,141 @@
+//go:build linux
+// +build linux
+
+package logtree
+
+import (
+	"bytes"
+	"context"
+	"errors"
+	"fmt"
+	"os"
+	"strconv"
+	"strings"
+	"time"
+
+	"golang.org/x/sys/unix"
+)
+
+const (
+	loglevelEmergency = 0 // mapped to FATAL by parseKmsg
+	loglevelAlert     = 1 // mapped to FATAL by parseKmsg
+	loglevelCritical  = 2 // mapped to ERROR by parseKmsg
+	loglevelError     = 3 // mapped to ERROR by parseKmsg
+	loglevelWarning   = 4 // mapped to WARNING by parseKmsg
+	loglevelNotice    = 5 // mapped to INFO by parseKmsg
+	loglevelInfo      = 6 // mapped to INFO by parseKmsg
+	loglevelDebug     = 7 // mapped to INFO by parseKmsg
+)
+
+// KmsgPipe reads records from /dev/kmsg until ctx is canceled or a read fails,
+// appending each parsed record to the journal behind the given logger.
+func KmsgPipe(ctx context.Context, lt LeveledLogger) error {
+	publisher, ok := lt.(*leveledPublisher)
+	if !ok {
+		// Fail fast, as passing any other LeveledLogger is a programming error.
+		panic("Expected *leveledPublisher in LeveledLogger from supervisor")
+	}
+	kmsgFile, err := os.Open("/dev/kmsg")
+	if err != nil {
+		return err
+	}
+	defer kmsgFile.Close()
+	var lastOverflow time.Time
+	// Read buffer sized per PRINTK_MESSAGE_MAX in @linux//kernel/printk:internal.h
+	linebuf := make([]byte, 2048)
+	for {
+		n, err := kmsgFile.Read(linebuf)
+		// Best-effort: cancellation is only noticed once the blocking Read returns.
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		default:
+		}
+		if errors.Is(err, unix.EPIPE) {
+			now := time.Now()
+			// Rate-limit the warning to at most one per second.
+			if lastOverflow.Add(1 * time.Second).Before(now) {
+				lt.Warning("Lost messages due to kernel ring buffer overflow")
+				lastOverflow = now
+			}
+			continue
+		}
+		if err != nil {
+			return fmt.Errorf("while reading from kmsg: %w", err)
+		}
+		var monotonicRaw unix.Timespec
+		if err := unix.ClockGettime(unix.CLOCK_MONOTONIC_RAW, &monotonicRaw); err != nil {
+			return fmt.Errorf("while getting monotonic timestamp: %w", err)
+		}
+		p := parseKmsg(time.Now(), time.Duration(monotonicRaw.Nano())*time.Nanosecond, linebuf[:n])
+		if p == nil {
+			continue // Not a parseable record, skip it.
+		}
+		e := &entry{ // Appended directly so the parsed timestamp and severity are kept.
+			origin:  publisher.node.dn,
+			leveled: p,
+		}
+		publisher.node.tree.journal.append(e)
+		publisher.node.tree.journal.notify(e)
+	}
+}
+
+// parseKmsg parses one raw /dev/kmsg record into a LeveledPayload, returning
+// nil if it is malformed. now and monotonicSinceBoot describe the same instant.
+// See https://www.kernel.org/doc/Documentation/ABI/testing/dev-kmsg for format.
+func parseKmsg(now time.Time, monotonicSinceBoot time.Duration, data []byte) *LeveledPayload {
+	meta, message, ok := bytes.Cut(data, []byte(";"))
+	if !ok {
+		// Unknown message format
+		return nil
+	}
+	// Keep only the first line; any lines after it are not part of the message.
+	endOfMsgIdx := bytes.IndexByte(message, '\n')
+	if endOfMsgIdx == -1 {
+		return nil
+	}
+	message = message[:endOfMsgIdx]
+	metaFields := strings.FieldsFunc(string(meta), func(r rune) bool { return r == ',' })
+	if len(metaFields) < 4 {
+		return nil
+	}
+	// The first field is the syslog priority: facility<<3 | level.
+	prio, err := strconv.ParseUint(metaFields[0], 10, 64)
+	if err != nil {
+		return nil
+	}
+	monotonicMicro, err := strconv.ParseUint(metaFields[2], 10, 64)
+	if err != nil {
+		return nil
+	}
+
+	// Kmsg entries are timestamped with CLOCK_MONOTONIC_RAW, a clock which does
+	// not have a direct correspondence with civil time (UTC). To assign best-
+	// effort timestamps, use the current monotonic clock reading to determine
+	// the elapsed time between the kmsg entry and now on the monotonic clock.
+	// This does not track elapsed UTC time well over long timescales, as
+	// CLOCK_MONOTONIC_RAW is not trimmed to run true to UTC, but over spans of
+	// up to the order of hours the two stay close. As the pipe generally
+	// processes messages very soon after they are created, the elapsed time and
+	// thus the accrued error is extremely small.
+	monotonic := time.Duration(monotonicMicro) * time.Microsecond
+	monotonicFromNow := monotonic - monotonicSinceBoot
+
+	var severity Severity
+	switch prio & 7 { // Low three bits are the level; all eight values are handled.
+	case loglevelEmergency, loglevelAlert:
+		severity = FATAL
+	case loglevelCritical, loglevelError:
+		severity = ERROR
+	case loglevelWarning:
+		severity = WARNING
+	case loglevelNotice, loglevelInfo, loglevelDebug:
+		severity = INFO
+	}
+
+	return &LeveledPayload{
+		timestamp: now.Add(monotonicFromNow),
+		severity:  severity,
+		messages:  []string{string(message)},
+	}
+}
diff --git a/metropolis/pkg/logtree/kmsg_test.go b/metropolis/pkg/logtree/kmsg_test.go
new file mode 100644
index 0000000..e2faf82
--- /dev/null
+++ b/metropolis/pkg/logtree/kmsg_test.go
@@ -0,0 +1,43 @@
+//go:build linux
+// +build linux
+
+package logtree
+
+import (
+	"testing"
+	"time"
+
+	"github.com/google/go-cmp/cmp"
+)
+
+func TestParseKmsg(t *testing.T) {
+	now := time.Unix(1691593045, 128027944)
+	nowMonotonic := time.Duration(1501096434537722)
+
+	for i, te := range []struct {
+		line string
+		want *LeveledPayload
+	}{
+		// Empty input: no ';' separator, so nothing is parsed
+		{"", nil},
+		// No ';' separating the header from the message, so nothing is parsed
+		{"Not a valid line", nil},
+		// Well-formed entry at level 6, stamped about 2.09s before now
+		{"6,30962,1501094342185,-;test\n", &LeveledPayload{
+			messages:  []string{"test"},
+			timestamp: time.Date(2023, 8, 9, 14, 57, 23, 35675222, time.UTC),
+			severity:  INFO,
+		}},
+		// Lines after the first (key=value metadata) are dropped; level 4 is WARNING
+		{"4,30951,1486884175312,-;nvme nvme2: starting error recovery\n SUBSYSTEM=nvme\n DEVICE=c239:2\n", &LeveledPayload{
+			messages:  []string{"nvme nvme2: starting error recovery"},
+			timestamp: time.Date(2023, 8, 9, 11, 00, 32, 868802222, time.UTC),
+			severity:  WARNING,
+		}},
+	} {
+		got := parseKmsg(now, nowMonotonic, []byte(te.line))
+		if diff := cmp.Diff(te.want, got, cmp.AllowUnexported(LeveledPayload{})); diff != "" {
+			t.Errorf("%d: mismatch (-want +got):\n%s", i, diff)
+		}
+	}
+}