m/p/logtree: add kmsg pipe
This allows ingesting Linux kernel (kmsg) logs into logtree with
the original metadata (timestamp, severity) preserved.
Change-Id: Ibb6e3a7a0ae4a008b8e9c98beccb3a95c067cb75
Reviewed-on: https://review.monogon.dev/c/monogon/+/2044
Reviewed-by: Leopold Schabel <leo@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/pkg/logtree/kmsg.go b/metropolis/pkg/logtree/kmsg.go
new file mode 100644
index 0000000..03bb6ff
--- /dev/null
+++ b/metropolis/pkg/logtree/kmsg.go
@@ -0,0 +1,141 @@
+//go:build linux
+// +build linux
+
+package logtree
+
+import (
+ "bytes"
+ "context"
+ "errors"
+ "fmt"
+ "os"
+ "strconv"
+ "strings"
+ "time"
+
+ "golang.org/x/sys/unix"
+)
+
+const (
+	loglevelEmergency = iota // 0, KERN_EMERG
+	loglevelAlert            // 1, KERN_ALERT
+	loglevelCritical         // 2, KERN_CRIT
+	loglevelError            // 3, KERN_ERR
+	loglevelWarning          // 4, KERN_WARNING
+	loglevelNotice           // 5, KERN_NOTICE
+	loglevelInfo             // 6, KERN_INFO
+	loglevelDebug            // 7, KERN_DEBUG
+)
+
+// KmsgPipe pipes logs from the kernel kmsg interface at /dev/kmsg into the
+// given logger (a *leveledPublisher) until ctx is canceled or reading fails.
+func KmsgPipe(ctx context.Context, lt LeveledLogger) error {
+	publisher, ok := lt.(*leveledPublisher)
+	if !ok {
+		// Fail fast, as this is a programming error.
+		panic("Expected *leveledPublisher in LeveledLogger from supervisor")
+	}
+	kmsgFile, err := os.Open("/dev/kmsg")
+	if err != nil {
+		return err
+	}
+	defer kmsgFile.Close()
+	// On cancellation, expire the read deadline to unblock a pending Read. If the
+	// fd is not pollable this fails and cancellation is noticed after the next read.
+	stop := make(chan struct{})
+	defer close(stop)
+	go func() {
+		select {
+		case <-ctx.Done():
+			_ = kmsgFile.SetReadDeadline(time.Now())
+		case <-stop:
+		}
+	}()
+	var lastOverflow time.Time
+	linebuf := make([]byte, 2048) // PRINTK_MESSAGE_MAX in @linux//kernel/printk:internal.h
+	for {
+		n, err := kmsgFile.Read(linebuf)
+		if ctx.Err() != nil {
+			return ctx.Err()
+		}
+		if errors.Is(err, unix.EPIPE) {
+			if now := time.Now(); now.Sub(lastOverflow) > time.Second { // Rate-limit to 1 per second
+				lt.Warning("Lost messages due to kernel ring buffer overflow")
+				lastOverflow = now
+			}
+			continue
+		}
+		if err != nil {
+			return fmt.Errorf("while reading from kmsg: %w", err)
+		}
+		var monotonicRaw unix.Timespec
+		if err := unix.ClockGettime(unix.CLOCK_MONOTONIC_RAW, &monotonicRaw); err != nil {
+			return fmt.Errorf("while getting monotonic timestamp: %w", err)
+		}
+		if p := parseKmsg(time.Now(), time.Duration(monotonicRaw.Nano()), linebuf[:n]); p != nil {
+			e := &entry{origin: publisher.node.dn, leveled: p}
+			publisher.node.tree.journal.append(e)
+			publisher.node.tree.journal.notify(e)
+		}
+	}
+}
+
+// parseKmsg parses one /dev/kmsg record (format documented at
+// https://www.kernel.org/doc/Documentation/ABI/testing/dev-kmsg). now and
+// monotonicSinceBoot must be sampled together to anchor the record's timestamp
+// to wall-clock time. Only the first message line is kept; nil means unparseable.
+func parseKmsg(now time.Time, monotonicSinceBoot time.Duration, data []byte) *LeveledPayload {
+	meta, message, ok := bytes.Cut(data, []byte(";"))
+	if !ok {
+		// Unknown message format
+		return nil
+	}
+	if message, _, ok = bytes.Cut(message, []byte("\n")); !ok {
+		return nil
+	}
+	// Split (unlike FieldsFunc) keeps empty fields, so field positions are stable.
+	metaFields := strings.Split(string(meta), ",")
+	if len(metaFields) < 4 {
+		return nil
+	}
+	prefix, err := strconv.ParseUint(metaFields[0], 10, 64)
+	if err != nil {
+		return nil
+	}
+	// The prefix is the syslog priority: level in the low 3 bits, facility
+	// above. Records injected from userspace never have facility 0, so mask.
+	loglevel := prefix & 7
+	monotonicMicro, err := strconv.ParseUint(metaFields[2], 10, 64)
+	if err != nil {
+		return nil
+	}
+
+	// Kmsg entries are timestamped with CLOCK_MONOTONIC_RAW, a clock which does
+	// not have a direct correspondence with civil time (UTC). To assign best-
+	// effort timestamps, use the current monotonic clock reading to determine
+	// the elapsed time between the kmsg entry and now on the monotonic clock.
+	// This does not correspond well to elapsed UTC time on longer timescales as
+	// CLOCK_MONOTONIC_RAW is not trimmed to run true to UTC, but on timescales
+	// of up to hours the error is small. As the pipe generally processes
+	// messages very close to their creation time, the elapsed time and thus
+	// the accrued error is extremely small.
+	monotonicFromNow := time.Duration(monotonicMicro)*time.Microsecond - monotonicSinceBoot
+
+	var severity Severity
+	switch loglevel {
+	case loglevelEmergency, loglevelAlert:
+		severity = FATAL
+	case loglevelCritical, loglevelError:
+		severity = ERROR
+	case loglevelWarning:
+		severity = WARNING
+	default: // loglevelNotice, loglevelInfo, loglevelDebug
+		severity = INFO
+	}
+
+	return &LeveledPayload{
+		timestamp: now.Add(monotonicFromNow),
+		severity:  severity,
+		messages:  []string{string(message)},
+	}
+}