m/pkg/logtree: implement klog parsing
This adds logtree.KLogParser, a shim which parses klog/glog-formatted
lines into logtree leveled logging.
This will be used to consume logs from external components (like
Kubernetes services) into leveled logging inside logtree.
An alternative would be to switch all Kubernetes components to
'structured' (JSON) logging - but that seems to still be experimental,
and does not exactly map into something that we can log further. Maybe
in the future we can switch over, and also copy these over into our own
binary/structured logging.
Test Plan: Adds unit tests for parsing, which is the trickiest part.
X-Origin-Diff: phab/D715
GitOrigin-RevId: 9994d819f15c9542800d488f57c83ab945a35d34
diff --git a/metropolis/pkg/logtree/BUILD.bazel b/metropolis/pkg/logtree/BUILD.bazel
index 5573807..d325e42 100644
--- a/metropolis/pkg/logtree/BUILD.bazel
+++ b/metropolis/pkg/logtree/BUILD.bazel
@@ -7,6 +7,7 @@
"journal.go",
"journal_entry.go",
"journal_subscriber.go",
+ "klog.go",
"leveled.go",
"leveled_payload.go",
"logtree.go",
@@ -26,7 +27,9 @@
name = "go_default_test",
srcs = [
"journal_test.go",
+ "klog_test.go",
"logtree_test.go",
],
embed = [":go_default_library"],
+ deps = ["@com_github_google_go_cmp//cmp:go_default_library"],
)
diff --git a/metropolis/pkg/logtree/klog.go b/metropolis/pkg/logtree/klog.go
new file mode 100644
index 0000000..8755286
--- /dev/null
+++ b/metropolis/pkg/logtree/klog.go
@@ -0,0 +1,211 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logtree
+
+import (
+ "fmt"
+ "io"
+ "regexp"
+ "source.monogon.dev/metropolis/pkg/logbuffer"
+ "strconv"
+ "strings"
+ "time"
+)
+
+// KLogParser returns an io.WriteCloser to which raw logging from a klog emitter
+// can be piped. It will attempt to parse all lines from this log as
+// glog/klog-style entries, and pass them over to a LeveledLogger as if they were
+// emitted locally.
+//
+// This allows for piping in external processes that emit klog logging into a
+// logtree, leading to niceties like transparently exposing the log severity or
+// source file/line.
+//
+// One caveat, however, is that V-leveled logs will not be translated
+// appropriately - anything that the klog-emitter pumps out as Info will be
+// directly ingested as Info logging. There is no way to work around this.
+//
+// Another important limitation is that any written line is interpreted as having
+// happened recently (ie. within one hour of the time of execution of this
+// function). This is important as klog/glog-formatted loglines don't have a year
+// attached, so we have to infer it based on the current timestamp (note: parsed
+// lines do not necessarily have their year aleays equal to the current year, as
+// the code handles the edge case of parsing a line from the end of a previous
+// year at the beginning of the next).
+func KLogParser(logger LeveledLogger) io.WriteCloser {
+ n, ok := logger.(*node)
+ if !ok {
+ // Fail fast, as this is a programming error.
+ panic("Expected *node in LeveledLogger from supervisor")
+ }
+
+ k := &klogParser{
+ n: n,
+ }
+ // klog seems to have no line length limit. Let's assume some sane sort of default.
+ k.buffer = logbuffer.NewLineBuffer(1024, k.consumeLine)
+ return k
+}
+
+
+type klogParser struct {
+ n *node
+ buffer *logbuffer.LineBuffer
+}
+
+func (k *klogParser) Write(p []byte) (n int, err error) {
+ return k.buffer.Write(p)
+}
+
+// Close must be called exactly once after the parser is done being used. It will
+// pipe any leftover data in its write buffer as one last line to parse.
+func (k *klogParser) Close() error {
+ return k.buffer.Close()
+}
+
+// consumeLine is called by the internal LineBuffer any time a new line is fully
+// written.
+func (k *klogParser) consumeLine(l *logbuffer.Line) {
+ p := parse(time.Now(), l.Data)
+ if p == nil {
+ // We could instead emit that line as a raw log - however, this would lead to
+ // interleaving raw logging and leveled logging.
+ k.n.Errorf("Invalid klog line: %s", l.Data)
+ }
+ // TODO(q3k): should this be exposed as an API on LeveledLogger? How much should
+ // we permit library users to 'fake' logs? This would also permit us to get rid
+ // of the type assertion in KLogParser().
+ e := &entry{
+ origin: k.n.dn,
+ leveled: p,
+ }
+ k.n.tree.journal.append(e)
+ k.n.tree.journal.notify(e)
+}
+
// reKLog matches and parses a single klog/glog-formatted log line, e.g.:
//
//	I0312 14:20:04.240540     204 shared_informer.go:247] Caches are synced for attach detach
//
// Submatches, in order: severity, MMDD date, time of day (its optional
// fractional part is additionally captured on its own), PID, source file,
// source line, message.
var reKLog = regexp.MustCompile(`^([IEWF])(\d{4})\s+` + // severity, month and day
	`(\d{2}:\d{2}:\d{2}(\.\d+)?)\s+` + // hh:mm:ss with optional .fraction
	`(\d+)\s+` + // pid
	`([^:]+):(\d+)]\s+` + // file:line]
	`(.+)$`) // message
+
+// parse attempts to parse a klog-formatted line. Returns nil if the line
+// couldn't have been parsed successfully.
+func parse(now time.Time, s string) (*LeveledPayload) {
+ parts := reKLog.FindStringSubmatch(s)
+ if parts == nil {
+ return nil
+ }
+
+ severityS := parts[1]
+ date := parts[2]
+ timestamp := parts[3]
+ pid := parts[5]
+ file := parts[6]
+ lineS := parts[7]
+ message := parts[8]
+
+ var severity Severity
+ switch severityS {
+ case "I":
+ severity = INFO
+ case "W":
+ severity = WARNING
+ case "E":
+ severity = ERROR
+ case "F":
+ severity = FATAL
+ default:
+ return nil
+ }
+
+ // Possible race due to klog's/glog's format not containing a year.
+ // On 2020/12/31 at 23:59:59.99999 a klog logger emits this line:
+ //
+ // I1231 23:59:59.99999 1 example.go:10] It's almost 2021! Hooray.
+ //
+ // Then, if this library parses that line at 2021/01/01 00:00:00.00001, the
+ // time will be interpreted as:
+ //
+ // 2021/12/31 23:59:59
+ //
+ // So around one year in the future. We attempt to fix this case further down in
+ // this function.
+ year := now.Year()
+ ts, err := parseKLogTime(year, date, timestamp)
+ if err != nil {
+ return nil
+ }
+
+ // Attempt to fix the aforementioned year-in-the-future issue.
+ if ts.After(now) && ts.Sub(now) > time.Hour {
+ // Parsed timestamp is in the future. How close is it to One-Year-From-Now?
+ oyfn := now.Add(time.Hour * 24 * 365)
+ dOyfn := ts.Sub(oyfn)
+ // Let's make sure Duration-To-One-Year-From-Now is always positive. This
+ // simplifies the rest of the checks and papers over some possible edge cases.
+ if dOyfn < 0 {
+ dOyfn = -dOyfn
+ }
+
+ // Okay, is that very close? Then the issue above happened and we should
+ // attempt to reparse it with last year. We can't just manipulate the date we
+ // already have, as it's difficult to 'subtract one year'.
+ if dOyfn < (time.Hour * 24 * 2) {
+ ts, err = parseKLogTime(year-1, date, timestamp)
+ if err != nil {
+ return nil
+ }
+ } else {
+ // Otherwise, we received some seriously time traveling log entry. Abort.
+ return nil
+ }
+ }
+
+ line, err := strconv.Atoi(lineS)
+ if err != nil {
+ return nil
+ }
+
+ // The PID is discarded.
+ _ = pid
+
+ // Finally we have extracted all the data from the line. Inject into the log publisher.
+ return &LeveledPayload{
+ timestamp: ts,
+ severity: severity,
+ messages: []string{message},
+ file: file,
+ line: line,
+ }
+}
+
+// parseKLogTime parses a klog date and time (eg. "0314", "12:13:14.12345") into
+// a time.Time happening at a given year.
+func parseKLogTime(year int, d, t string) (time.Time, error) {
+ var layout string
+ if strings.Contains(t, ".") {
+ layout = "2006 0102 15:04:05.000000"
+ } else {
+ layout = "2006 0102 15:04:05"
+ }
+ // Make up a string that contains the current year. This permits us to parse
+ // fully into an actual timestamp.
+ // TODO(q3k): add a timezone? This currently behaves as UTC, which is probably
+ // what we want, but we should formalize this.
+ return time.Parse(layout, fmt.Sprintf("%d %s %s", year, d, t))
+}
diff --git a/metropolis/pkg/logtree/klog_test.go b/metropolis/pkg/logtree/klog_test.go
new file mode 100644
index 0000000..f163f16
--- /dev/null
+++ b/metropolis/pkg/logtree/klog_test.go
@@ -0,0 +1,82 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logtree
+
+import (
+ "testing"
+ "time"
+
+ "github.com/google/go-cmp/cmp"
+)
+
+func TestParse(t *testing.T) {
+ // Injected 'now'. Used to make these tests reproducible and to allow for
+ // testing the log-across-year edgecase.
+ // Fri 12 Mar 2021 03:46:26 PM UTC
+ now := time.Unix(1615563986, 123456789)
+ // Sat 01 Jan 2000 12:00:01 AM UTC
+ nowNewYear := time.Unix(946684801, 0)
+
+ for i, te := range []struct{
+ now time.Time
+ line string
+ want *LeveledPayload
+ }{
+ // 0: Simple case: everything should parse correctly.
+ { now, "E0312 14:20:04.240540 204 shared_informer.go:247] Caches are synced for attach detach", &LeveledPayload{
+ messages: []string{"Caches are synced for attach detach"},
+ timestamp: time.Date(2021, 03, 12, 14, 20, 4, 240540000, time.UTC),
+ severity: ERROR,
+ file: "shared_informer.go",
+ line: 247,
+ } },
+ // 1: Mumbling line, should fail.
+ { now, "Application starting up...", nil},
+ // 2: Empty line, should fail.
+ { now, "", nil},
+ // 3: Line from the future, should fail.
+ { now, "I1224 14:20:04.240540 204 john_titor.go:247] I'm sorry, what day is it today? Uuuh, and what year?", nil },
+ // 4: Log-across-year edge case. The log was emitted right before a year
+ // rollover, and parsed right after it. It should be attributed to the
+ // previous year.
+ { nowNewYear, "I1231 23:59:43.123456 123 fry.go:123] Here's to another lousy millenium!", &LeveledPayload{
+ messages: []string{"Here's to another lousy millenium!"},
+ timestamp: time.Date(1999, 12, 31, 23, 59, 43, 123456000, time.UTC),
+ severity: INFO,
+ file: "fry.go",
+ line: 123,
+ }},
+ // 5: Invalid severity, should fail.
+ { now, "D0312 14:20:04.240540 204 shared_informer.go:247] Caches are synced for attach detach", nil },
+ // 6: Invalid time, should fail.
+ { now, "D0312 25:20:04.240540 204 shared_informer.go:247] Caches are synced for attach detach", nil },
+ // 7: Simple case without sub-second timing: everything should parse correctly
+ { now, "E0312 14:20:04 204 shared_informer.go:247] Caches are synced for attach detach", &LeveledPayload{
+ messages: []string{"Caches are synced for attach detach"},
+ timestamp: time.Date(2021, 03, 12, 14, 20, 4, 0, time.UTC),
+ severity: ERROR,
+ file: "shared_informer.go",
+ line: 247,
+ } },
+ } {
+ got := parse(te.now, te.line)
+ if diff := cmp.Diff(te.want, got, cmp.AllowUnexported(LeveledPayload{})); diff != "" {
+ t.Errorf("%d: mismatch (-want +got):\n%s", i, diff)
+ }
+ }
+}
+