treewide: introduce osbase package and move things around
All except localregistry moved from metropolis/pkg to osbase;
localregistry moved to metropolis/test, as it's only used there anyway.
Change-Id: If1a4bf377364bef0ac23169e1b90379c71b06d72
Reviewed-on: https://review.monogon.dev/c/monogon/+/3079
Tested-by: Jenkins CI
Reviewed-by: Serge Bazanski <serge@monogon.tech>
diff --git a/osbase/logtree/BUILD.bazel b/osbase/logtree/BUILD.bazel
new file mode 100644
index 0000000..7c13aeb
--- /dev/null
+++ b/osbase/logtree/BUILD.bazel
@@ -0,0 +1,58 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(
+ name = "logtree",
+ srcs = [
+ "doc.go",
+ "grpc.go",
+ "journal.go",
+ "journal_entry.go",
+ "journal_subscriber.go",
+ "klog.go",
+ "kmsg.go",
+ "leveled.go",
+ "leveled_payload.go",
+ "logtree.go",
+ "logtree_access.go",
+ "logtree_entry.go",
+ "logtree_publisher.go",
+ "testhelpers.go",
+ "zap.go",
+ ],
+ # TODO(#189): move logtree to //go
+ importpath = "source.monogon.dev/osbase/logtree",
+ visibility = ["//visibility:public"],
+ deps = [
+ "//osbase/logbuffer",
+ "//osbase/logtree/proto",
+ "@com_github_mitchellh_go_wordwrap//:go-wordwrap",
+ "@org_golang_google_grpc//grpclog",
+ "@org_golang_google_protobuf//types/known/timestamppb",
+ "@org_uber_go_zap//:zap",
+ "@org_uber_go_zap//zapcore",
+ ] + select({
+ "@io_bazel_rules_go//go/platform:android": [
+ "@org_golang_x_sys//unix",
+ ],
+ "@io_bazel_rules_go//go/platform:linux": [
+ "@org_golang_x_sys//unix",
+ ],
+ "//conditions:default": [],
+ }),
+)
+
+go_test(
+ name = "logtree_test",
+ srcs = [
+ "journal_test.go",
+ "klog_test.go",
+ "kmsg_test.go",
+ "logtree_test.go",
+ "zap_test.go",
+ ],
+ embed = [":logtree"],
+ deps = [
+ "@com_github_google_go_cmp//cmp",
+ "@org_uber_go_zap//:zap",
+ ],
+)
diff --git a/osbase/logtree/doc.go b/osbase/logtree/doc.go
new file mode 100644
index 0000000..ab3c537
--- /dev/null
+++ b/osbase/logtree/doc.go
@@ -0,0 +1,116 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+/*
+Package logtree implements a tree-shaped logger for debug events. It provides log publishers (ie. Go code) with a
+glog-like API and io.Writer API, with loggers placed in a hierarchical structure defined by a dot-delimited path
+(called a DN, short for Distinguished Name).
+
+ tree.MustLeveledFor("foo.bar.baz").Warningf("Houston, we have a problem: %v", err)
+ fmt.Fprintf(tree.MustRawFor("foo.bar.baz"), "some\nunstructured\ndata\n")
+
+Logs in this context are unstructured, operational, developer-centric human-readable text messages presented as lines
+of text to consumers, with some attached metadata. Logtree does not deal with 'structured' logs as some parts of the
+industry do, and instead defers any machine-readable logs to either be handled by metrics systems like Prometheus or
+event sourcing systems like Kafka.
+
+Tree Structure
+
+As an example, consider an application that produces logs with the following DNs:
+
+ listener.http
+ listener.grpc
+ svc
+ svc.cache
+ svc.cache.gc
+
+This would correspond to a tree as follows:
+
+ .------.
+ | "" |
+ | (root) |
+ '------'
+ .----------------' '------.
+ .--------------. .---------------.
+ | svc | | listener |
+ '--------------' '---------------'
+ | .----' '----.
+ .--------------. .---------------. .---------------.
+ | svc.cache | | listener.http | | listener.grpc |
+ '--------------' '---------------' '---------------'
+ |
+ .--------------.
+ | svc.cache.gc |
+ '--------------'
+
+In this setup, every DN acts as a separate logging target, each with its own retention policy and quota. Logging to a DN
+under foo.bar does NOT automatically log to foo - all tree mechanisms are applied on log access by consumers. Loggers
+are automatically created on first use, at any time; parent loggers are also created automatically whenever a sub-DN
+requires them to exist first. Note, for instance, that a `listener` logging node was
+created even though the example application only logged to `listener.http` and `listener.grpc`.
+
+An implicit root node is always present in the tree, accessed by DN "" (an empty string). All other logger nodes are
+children (or transitive children) of the root node.
+
+Log consumers (application code that reads the logs and passes them on to operators, or ships them off for aggregation
+in other systems) can select subtrees of logs for readout. In the example tree, a consumer could choose to either read
+all logs of the entire tree, just a single DN (like svc), or a subtree (like everything under listener, ie. messages
+emitted to listener.http and listener.grpc).
+
+Leveled Log Producer API
+
+As part of the glog-like logging API available to producers, the following metadata is attached to emitted logs in
+addition to the DN of the logger to which the log entry was emitted:
+
+ - timestamp at which the entry was emitted
+ - a severity level (one of FATAL, ERROR, WARNING or INFO)
+ - a source of the message (file name and line number)
+
+In addition, the logger mechanism supports a variable verbosity level (so-called 'V-logging') that can be set at every
+node of the tree. For more information about the producer-facing logging API, see the documentation of the LeveledLogger
+interface, which is the main interface exposed to log producers.
+
+If the submitted message contains newlines, it will be split accordingly into a single log entry that contains multiple
+string lines. This allows log producers to submit long, multi-line messages that are guaranteed to be non-interleaved
+with other entries, and allows access API consumers to maintain semantic linking between multiple lines being emitted
+as a single atomic entry.
+
+Raw Log Producer API
+
+In addition to leveled, glog-like logging, LogTree supports 'raw logging'. This is implemented as an io.Writer that will
+split incoming bytes into newline-delimited lines, and log them into that logtree's DN. This mechanism is primarily
+intended to support storage of unstructured log data from external processes - for example binaries running with redirected
+stdout/stderr.
+
+Log Access API
+
+The Log Access API is mostly exposed via a single function on the LogTree struct: Read. It allows access to log entries
+that have been already buffered inside LogTree and to subscribe to receive future entries over a channel. As outlined
+earlier, any access can specify whether it is just interested in a single logger (addressed by DN), or a subtree of
+loggers.
+
+Due to the current implementation of the logtree, subtree accesses of backlogged data are significantly slower than
+accessing data of just one DN, or the whole tree (as every subtree backlog access performs a scan on all logged data).
+Thus, log consumers should be aware that it is much better to stream and buffer logs specific to some long-standing
+logging request on their own, rather than repeatedly perform reads of a subtree backlog.
+
+The data returned from the log access API is a LogEntry, which itself can contain either a raw logging entry, or a leveled
+logging entry. Helper functions are available on LogEntry that allow canonical string representations to be returned, for
+easy use in consuming tools/interfaces. Alternatively, the consumer can itself access the internal raw/leveled entries and
+print them according to their own preferred format.
+
+*/
+package logtree
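To make the producer and consumer APIs described above concrete, here is a minimal sketch. MustLeveledFor and MustRawFor are taken from the package documentation above; New, Read, WithChildren, WithBacklog, the LogReader fields and LogEntry.String are assumed names based on this package's access API (logtree.go, logtree_access.go) and may differ in detail:

    package main

    import (
    	"fmt"

    	"source.monogon.dev/osbase/logtree"
    )

    func main() {
    	lt := logtree.New()

    	// Leveled, glog-like producer API.
    	lt.MustLeveledFor("listener.http").Infof("listening on :%d", 80)
    	// Raw io.Writer producer API.
    	fmt.Fprintf(lt.MustRawFor("svc.cache"), "some\nunstructured\ndata\n")

    	// Consumer API: read back the backlog of the whole "listener" subtree.
    	r, err := lt.Read("listener", logtree.WithChildren(), logtree.WithBacklog(logtree.BacklogAllAvailable))
    	if err != nil {
    		panic(err)
    	}
    	defer r.Close()
    	for _, e := range r.Backlog {
    		fmt.Println(e.String())
    	}
    }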
diff --git a/osbase/logtree/grpc.go b/osbase/logtree/grpc.go
new file mode 100644
index 0000000..3b2594d
--- /dev/null
+++ b/osbase/logtree/grpc.go
@@ -0,0 +1,75 @@
+package logtree
+
+import "google.golang.org/grpc/grpclog"
+
+// GRPCify turns a LeveledLogger into a go-grpc compatible logger.
+func GRPCify(logger LeveledLogger) grpclog.LoggerV2 {
+ lp, ok := logger.(*leveledPublisher)
+ if !ok {
+ // Fail fast, as this is a programming error.
+ panic("Expected *leveledPublisher in LeveledLogger from supervisor")
+ }
+
+ lp2 := *lp
+ lp2.depth += 1
+
+ return &leveledGRPCV2{
+ lp: &lp2,
+ }
+}
+
+type leveledGRPCV2 struct {
+ lp *leveledPublisher
+}
+
+func (g *leveledGRPCV2) Info(args ...interface{}) {
+ g.lp.Info(args...)
+}
+
+func (g *leveledGRPCV2) Infoln(args ...interface{}) {
+ g.lp.Info(args...)
+}
+
+func (g *leveledGRPCV2) Infof(format string, args ...interface{}) {
+ g.lp.Infof(format, args...)
+}
+
+func (g *leveledGRPCV2) Warning(args ...interface{}) {
+ g.lp.Warning(args...)
+}
+
+func (g *leveledGRPCV2) Warningln(args ...interface{}) {
+ g.lp.Warning(args...)
+}
+
+func (g *leveledGRPCV2) Warningf(format string, args ...interface{}) {
+ g.lp.Warningf(format, args...)
+}
+
+func (g *leveledGRPCV2) Error(args ...interface{}) {
+ g.lp.Error(args...)
+}
+
+func (g *leveledGRPCV2) Errorln(args ...interface{}) {
+ g.lp.Error(args...)
+}
+
+func (g *leveledGRPCV2) Errorf(format string, args ...interface{}) {
+ g.lp.Errorf(format, args...)
+}
+
+func (g *leveledGRPCV2) Fatal(args ...interface{}) {
+ g.lp.Fatal(args...)
+}
+
+func (g *leveledGRPCV2) Fatalln(args ...interface{}) {
+ g.lp.Fatal(args...)
+}
+
+func (g *leveledGRPCV2) Fatalf(format string, args ...interface{}) {
+ g.lp.Fatalf(format, args...)
+}
+
+func (g *leveledGRPCV2) V(l int) bool {
+ return g.lp.V(VerbosityLevel(l)).Enabled()
+}
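A typical use of GRPCify is registering the adapted logger as gRPC's global logger, so that gRPC-internal messages land in the logtree under the caller's DN. grpclog.SetLoggerV2 is the standard grpc-go entry point; supervisor.Logger and the osbase/supervisor import path are assumptions suggested by the panic message above:

    package example

    import (
    	"context"

    	"google.golang.org/grpc/grpclog"

    	"source.monogon.dev/osbase/logtree"
    	"source.monogon.dev/osbase/supervisor"
    )

    // runnable routes gRPC's internal logging into the logtree before any gRPC
    // components start.
    func runnable(ctx context.Context) error {
    	grpclog.SetLoggerV2(logtree.GRPCify(supervisor.Logger(ctx)))
    	// ... build and serve a grpc.Server here ...
    	<-ctx.Done()
    	return ctx.Err()
    }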
diff --git a/osbase/logtree/journal.go b/osbase/logtree/journal.go
new file mode 100644
index 0000000..412c042
--- /dev/null
+++ b/osbase/logtree/journal.go
@@ -0,0 +1,319 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logtree
+
+import (
+ "errors"
+ "sort"
+ "strings"
+ "sync"
+)
+
+// DN is the Distinguished Name, a dot-delimited path used to address loggers
+// within a LogTree. For example, "foo.bar" designates the 'bar' logger node
+// under the 'foo' logger node under the root node of the logger. An empty
+// string is the root node of the tree.
+type DN string
+
+var (
+ ErrInvalidDN = errors.New("invalid DN")
+)
+
+// Path returns the parts of a DN, ie. all the elements of the dot-delimited DN
+// path. For the root node, an empty list will be returned. An error will be
+// returned if the DN is invalid (contains empty parts, eg. `foo..bar`, `.foo`
+// or `foo.`).
+func (d DN) Path() ([]string, error) {
+ if d == "" {
+ return nil, nil
+ }
+ parts := strings.Split(string(d), ".")
+ for _, p := range parts {
+ if p == "" {
+ return nil, ErrInvalidDN
+ }
+ }
+ return parts, nil
+}
+
+// journal is the main log recording structure of logtree. It manages linked lists
+// containing the actual log entries, and implements scans across them. It does not
+// understand the hierarchical nature of logtree, and instead sees all entries as
+// part of a global linked list and a local linked list for a given DN.
+//
+// The global linked list is represented by the head/tail pointers in journal and
+// nextGlobal/prevGlobal pointers in entries. The local linked lists are
+// represented by heads[DN]/tails[DN] pointers in journal and nextLocal/prevLocal
+// pointers in entries:
+//
+// .------------. .------------. .------------.
+// | dn: A.B | | dn: Z | | dn: A.B |
+// | time: 1 | | time: 2 | | time: 3 |
+// |------------| |------------| |------------|
+// | nextGlobal :------->| nextGlobal :------->| nextGlobal :--> nil
+// nil <-: prevGlobal |<-------: prevGlobal |<-------| prevGlobal |
+// |------------| |------------| n |------------|
+// | nextLocal :---. n | nextLocal :->i .-->| nextLocal :--> nil
+// nil <-: prevLocal |<--: i<-: prevLocal | l :---| prevLocal |
+// '------------' | l '------------' | '------------'
+// ^ '----------------------' ^
+// | ^ |
+// | | |
+// ( head ) ( tails[Z] ) ( tail )
+// ( heads[A.B] ) ( heads[Z] ) ( tails[A.B] )
+type journal struct {
+ // mu locks the rest of the structure. It must be taken during any operation on the
+ // journal.
+ mu sync.RWMutex
+
+ // tail is the side of the global linked list that contains the newest log entry,
+ // ie. the one that has been pushed the most recently. It can be nil when no log
+ // entry has yet been pushed. The global linked list contains all log entries
+ // pushed to the journal.
+ tail *entry
+ // head is the side of the global linked list that contains the oldest log entry.
+ // It can be nil when no log entry has yet been pushed.
+ head *entry
+
+ // tails are the tail sides of a local linked list for a given DN, ie. the sides
+ // that contain the newest entry. They are nil if there are no log entries for that
+ // DN.
+ tails map[DN]*entry
+ // heads are the head sides of a local linked list for a given DN, ie. the sides
+ // that contain the oldest entry. They are nil if there are no log entries for that
+ // DN.
+ heads map[DN]*entry
+
+ // quota is a map from DN to quota structure, representing the quota policy of a
+ // particular DN-designated logger.
+ quota map[DN]*quota
+
+ // subscribers are observers of logs. New log entries get emitted to channels
+ // present in the subscriber structure, after filtering them through subscriber-
+ // provided filters (eg. to limit events to subtrees that interest that particular
+ // subscriber).
+ subscribers []*subscriber
+}
+
+// newJournal creates a new empty journal. All journals are independent from
+// each other, and as such, all LogTrees are also independent.
+func newJournal() *journal {
+ return &journal{
+ tails: make(map[DN]*entry),
+ heads: make(map[DN]*entry),
+
+ quota: make(map[DN]*quota),
+ }
+}
+
+// filter is a predicate that returns true if a log subscriber or reader is
+// interested in a given log entry.
+type filter func(*entry) bool
+
+// filterAll returns a filter that accepts all log entries.
+func filterAll() filter {
+ return func(*entry) bool { return true }
+}
+
+// filterExact returns a filter that accepts only log entries at a given exact
+// DN. This filter should not be used in conjunction with journal.scanEntries
+// - instead, journal.getEntries should be used, as it is much faster.
+func filterExact(dn DN) filter {
+ return func(e *entry) bool {
+ return e.origin == dn
+ }
+}
+
+// filterSubtree returns a filter that accepts all log entries at a given DN and
+// sub-DNs. For example, filterSubtree at "foo.bar" would allow entries at
+// "foo.bar", "foo.bar.baz", but not "foo" or "foo.barr".
+func filterSubtree(root DN) filter {
+ if root == "" {
+ return filterAll()
+ }
+
+ rootParts := strings.Split(string(root), ".")
+ return func(e *entry) bool {
+ parts := strings.Split(string(e.origin), ".")
+ if len(parts) < len(rootParts) {
+ return false
+ }
+
+ for i, p := range rootParts {
+ if parts[i] != p {
+ return false
+ }
+ }
+
+ return true
+ }
+}
+
+// filterSeverity returns a filter that accepts log entries at a given severity
+// level or above. See the Severity type for more information about severity
+// levels.
+func filterSeverity(atLeast Severity) filter {
+ return func(e *entry) bool {
+ return e.leveled != nil && e.leveled.severity.AtLeast(atLeast)
+ }
+}
+
+func filterOnlyRaw(e *entry) bool {
+ return e.raw != nil
+}
+
+func filterOnlyLeveled(e *entry) bool {
+ return e.leveled != nil
+}
+
+// scanEntries does a linear scan through the global entry list and returns all
+// entries that match the given filters. If retrieving entries for an exact DN,
+// getEntries should be used instead, as it will leverage DN-local linked lists to
+// retrieve them faster. journal.mu must be taken at R or RW level when calling
+// this function.
+func (j *journal) scanEntries(count int, filters ...filter) (res []*entry) {
+ cur := j.tail
+ for {
+ if cur == nil {
+ break
+ }
+
+ passed := true
+ for _, filter := range filters {
+ if !filter(cur) {
+ passed = false
+ break
+ }
+ }
+ if passed {
+ res = append(res, cur)
+ }
+ if count != BacklogAllAvailable && len(res) >= count {
+ break
+ }
+ cur = cur.prevGlobal
+ }
+
+ // Reverse entries back into chronological order.
+ sort.SliceStable(res, func(i, j int) bool {
+ return i > j
+ })
+ return
+}
+
+// getEntries returns all entries at a given DN. This is faster than a
+// scanEntries(filterExact), as it uses the special local linked list pointers to
+// traverse the journal. Additional filters can be passed to further limit the
+// entries returned, but a scan through this DN's local linked list will be
+// performed regardless. journal.mu must be taken at R or RW level when calling
+// this function.
+func (j *journal) getEntries(count int, exact DN, filters ...filter) (res []*entry) {
+ cur := j.tails[exact]
+ for {
+ if cur == nil {
+ break
+ }
+
+ passed := true
+ for _, filter := range filters {
+ if !filter(cur) {
+ passed = false
+ break
+ }
+ }
+ if passed {
+ res = append(res, cur)
+ }
+ if count != BacklogAllAvailable && len(res) >= count {
+ break
+ }
+ cur = cur.prevLocal
+ }
+
+ // Reverse entries back into chronological order.
+ sort.SliceStable(res, func(i, j int) bool {
+ return i > j
+ })
+ return
+}
+
+// Shorten returns a shortened version of this DN for constrained logging
+// environments like tty0 logging.
+//
+// If ShortenDictionary is given, it will be used to replace DN parts with
+// shorter equivalents. For example, with the dictionary:
+//
+// { "foobar": "foo", "manager": "mgr" }
+//
+// The DN some.foobar.logger will be turned into some.foo.logger before further
+// being processed by the shortening mechanism.
+//
+// The shortening rules applied are Metropolis-specific.
+func (d DN) Shorten(dict ShortenDictionary, maxLen int) string {
+ path, _ := d.Path()
+ // Apply DN part shortening rules.
+ if dict != nil {
+ for i, p := range path {
+ if sh, ok := dict[p]; ok {
+ path[i] = sh
+ }
+ }
+ }
+
+ // This generally shouldn't happen.
+ if len(path) == 0 {
+ return "?"
+ }
+
+ // Strip 'root.' prefix.
+ if len(path) > 1 && path[0] == "root" {
+ path = path[1:]
+ }
+
+ // Replace role.xxx.yyy.zzz with xxx.zzz - stripping everything between the role
+ // name and the last element of the path.
+ if path[0] == "role" && len(path) > 1 {
+ if len(path) == 2 {
+ path = path[1:]
+ } else {
+ path = []string{
+ path[1],
+ path[len(path)-1],
+ }
+ }
+ }
+
+ // Join back to be ' '-delimited, and ellipsize if too long.
+ s := strings.Join(path, " ")
+ if overflow := len(s) - maxLen; overflow > 0 {
+ s = "..." + s[overflow+3:]
+ }
+ return s
+}
+
+type ShortenDictionary map[string]string
+
+var MetropolisShortenDict = ShortenDictionary{
+ "controlplane": "cplane",
+ "map-cluster-membership": "map-membership",
+ "cluster-membership": "cluster",
+ "controller-manager": "controllers",
+ "networking": "net",
+ "network": "net",
+ "interfaces": "ifaces",
+ "kubernetes": "k8s",
+}
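The shortening rules above compose in order: dictionary substitution, `root.` stripping, role-path collapsing, then byte-level ellipsizing. A small sketch of the resulting behavior, with inputs and expected outputs taken from TestDN_Shorten in journal_test.go below:

    package main

    import (
    	"fmt"

    	"source.monogon.dev/osbase/logtree"
    )

    func main() {
    	d := logtree.MetropolisShortenDict
    	// Dictionary substitution only:
    	fmt.Println(logtree.DN("networking.interfaces").Shorten(d, 20))
    	// -> "net ifaces"
    	// "root." stripping, plus collapsing role.xxx.(...).zzz to "xxx zzz":
    	fmt.Println(logtree.DN("root.role.kubernetes.run.kubernetes.apiserver").Shorten(d, 20))
    	// -> "k8s apiserver"
    	// Still too long after shortening, so ellipsized down to maxLen bytes:
    	fmt.Println(logtree.DN("some.very.long.dn.that.cant.be.shortened").Shorten(d, 20))
    	// -> "...cant be shortened"
    }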
diff --git a/osbase/logtree/journal_entry.go b/osbase/logtree/journal_entry.go
new file mode 100644
index 0000000..547a711
--- /dev/null
+++ b/osbase/logtree/journal_entry.go
@@ -0,0 +1,176 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logtree
+
+import "source.monogon.dev/osbase/logbuffer"
+
+// entry is a journal entry, representing a single log event (encompassed in a
+// Payload) at a given DN. See the journal struct for more information about the
+// global/local linked lists.
+type entry struct {
+ // origin is the DN at which the log entry was recorded and, conversely, the
+ // DN at which it will be available.
+ origin DN
+ // journal is the parent journal of this entry. An entry can belong only to a
+ // single journal. This pointer is used to mutate the journal's head/tail pointers
+ // when unlinking an entry.
+ journal *journal
+ // leveled is the leveled log entry for this entry, if this log entry was emitted
+ // by leveled logging. Otherwise it is nil.
+ leveled *LeveledPayload
+ // raw is the raw log entry for this entry, if this log entry was emitted by raw
+ // logging. Otherwise it is nil.
+ raw *logbuffer.Line
+
+ // prevGlobal is the previous entry in the global linked list, or nil if this entry
+ // is the oldest entry in the global linked list.
+ prevGlobal *entry
+ // nextGlobal is the next entry in the global linked list, or nil if this entry is
+ // the newest entry in the global linked list.
+ nextGlobal *entry
+
+ // prevLocal is the previous entry in this entry DN's local linked list, or nil if
+ // this entry is the oldest entry in this local linked list.
+ prevLocal *entry
+ // nextLocal is the next entry in this entry DN's local linked list, or nil if this
+ // entry is the newest entry in this local linked list.
+ nextLocal *entry
+
+ // seqLocal is a counter within a local linked list that increases by one each time
+ // a new log entry is added. It is used to quickly establish local linked list
+ // sizes (by subtracting seqLocal from both ends). This setup allows for O(1)
+ // length calculation for local linked lists as long as entries are only unlinked
+ // from the head or tail (which is the case in the current implementation).
+ seqLocal uint64
+}
+
+// external returns a LogEntry object for this entry, ie. the public version of
+// this object, without fields relating to the parent journal, linked lists,
+// sequences, etc. These objects are visible to library consumers.
+func (e *entry) external() *LogEntry {
+ return &LogEntry{
+ DN: e.origin,
+ Leveled: e.leveled,
+ Raw: e.raw,
+ }
+}
+
+// unlink removes this entry from both global and local linked lists, updating the
+// journal's head/tail pointers if needed. journal.mu must be taken as RW.
+func (e *entry) unlink() {
+ // Unlink from the global linked list.
+ if e.prevGlobal != nil {
+ e.prevGlobal.nextGlobal = e.nextGlobal
+ }
+ if e.nextGlobal != nil {
+ e.nextGlobal.prevGlobal = e.prevGlobal
+ }
+ // Update journal head/tail pointers.
+ if e.journal.head == e {
+ e.journal.head = e.nextGlobal
+ }
+ if e.journal.tail == e {
+ e.journal.tail = e.prevGlobal
+ }
+
+ // Unlink from the local linked list.
+ if e.prevLocal != nil {
+ e.prevLocal.nextLocal = e.nextLocal
+ }
+ if e.nextLocal != nil {
+ e.nextLocal.prevLocal = e.prevLocal
+ }
+ // Update journal per-DN (local) head/tail pointers.
+ if e.journal.heads[e.origin] == e {
+ e.journal.heads[e.origin] = e.nextLocal
+ }
+ if e.journal.tails[e.origin] == e {
+ e.journal.tails[e.origin] = e.prevLocal
+ }
+}
+
+// quota describes the quota policy for logging at a given DN.
+type quota struct {
+ // origin is the exact DN that this quota applies to.
+ origin DN
+ // max is the maximum count of log entries permitted for this DN - ie, the maximum
+ // size of the local linked list.
+ max uint64
+}
+
+// append adds an entry at the tail (newest end) of the global and local linked lists.
+func (j *journal) append(e *entry) {
+ j.mu.Lock()
+ defer j.mu.Unlock()
+
+ e.journal = j
+
+ // Insert at tail in global linked list, set pointers.
+ e.nextGlobal = nil
+ e.prevGlobal = j.tail
+ if j.tail != nil {
+ j.tail.nextGlobal = e
+ }
+ j.tail = e
+ if j.head == nil {
+ j.head = e
+ }
+
+ // Create quota if necessary.
+ if _, ok := j.quota[e.origin]; !ok {
+ j.quota[e.origin] = "a{origin: e.origin, max: 8192}
+ }
+
+ // Insert at tail in local linked list, calculate seqLocal, set pointers.
+ e.nextLocal = nil
+ e.prevLocal = j.tails[e.origin]
+ if j.tails[e.origin] != nil {
+ j.tails[e.origin].nextLocal = e
+ e.seqLocal = e.prevLocal.seqLocal + 1
+ } else {
+ e.seqLocal = 0
+ }
+ j.tails[e.origin] = e
+ if j.heads[e.origin] == nil {
+ j.heads[e.origin] = e
+ }
+
+ // Apply quota to the local linked list that this entry got inserted to, ie. remove
+ // elements in excess of the quota.max count.
+ quota := j.quota[e.origin]
+ count := (j.tails[e.origin].seqLocal - j.heads[e.origin].seqLocal) + 1
+ if count > quota.max {
+ // Keep popping elements off the head of the local linked list until quota is not
+ // violated.
+ left := count - quota.max
+ cur := j.heads[e.origin]
+ for {
+ // This shouldn't happen if quota.max >= 1.
+ if cur == nil {
+ break
+ }
+ if left == 0 {
+ break
+ }
+ el := cur
+ cur = el.nextLocal
+ // Unlinking the entry unlinks it from both the global and local linked lists.
+ el.unlink()
+ left -= 1
+ }
+ }
+}
diff --git a/osbase/logtree/journal_subscriber.go b/osbase/logtree/journal_subscriber.go
new file mode 100644
index 0000000..dc9750f
--- /dev/null
+++ b/osbase/logtree/journal_subscriber.go
@@ -0,0 +1,72 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logtree
+
+import (
+ "sync/atomic"
+)
+
+// subscriber is an observer for new entries that are appended to the journal.
+type subscriber struct {
+ // filters that entries need to pass through in order to be sent to the subscriber.
+ filters []filter
+ // dataC is the channel to which entries that pass the filters will be sent.
+ // The channel must be drained regularly, otherwise new entries are dropped
+ // and counted in missed.
+ dataC chan *LogEntry
+ // doneC is a channel that is closed once the subscriber wishes to stop receiving
+ // notifications.
+ doneC chan struct{}
+ // missed is the number of messages missed by the subscriber by not receiving
+ // from dataC fast enough.
+ missed uint64
+}
+
+// subscribe attaches a subscriber to the journal.
+// journal.mu must be taken in W mode.
+func (j *journal) subscribe(sub *subscriber) {
+ j.subscribers = append(j.subscribers, sub)
+}
+
+// notify sends an entry to all subscribers that wish to receive it.
+func (j *journal) notify(e *entry) {
+ j.mu.Lock()
+ defer j.mu.Unlock()
+
+ newSub := make([]*subscriber, 0, len(j.subscribers))
+ for _, sub := range j.subscribers {
+ select {
+ case <-sub.doneC:
+ close(sub.dataC)
+ continue
+ default:
+ newSub = append(newSub, sub)
+ }
+
+ // Only deliver the entry if it passes all of the subscriber's filters.
+ passed := true
+ for _, filter := range sub.filters {
+ if !filter(e) {
+ passed = false
+ break
+ }
+ }
+ if !passed {
+ continue
+ }
+ select {
+ case sub.dataC <- e.external():
+ default:
+ atomic.AddUint64(&sub.missed, 1)
+ }
+ }
+ j.subscribers = newSub
+}
diff --git a/osbase/logtree/journal_test.go b/osbase/logtree/journal_test.go
new file mode 100644
index 0000000..e9fc3b4
--- /dev/null
+++ b/osbase/logtree/journal_test.go
@@ -0,0 +1,173 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logtree
+
+import (
+ "fmt"
+ "strings"
+ "testing"
+ "time"
+)
+
+func testPayload(msg string) *LeveledPayload {
+ return &LeveledPayload{
+ messages: []string{msg},
+ timestamp: time.Now(),
+ severity: INFO,
+ file: "main.go",
+ line: 1337,
+ }
+}
+
+func TestJournalRetention(t *testing.T) {
+ j := newJournal()
+
+ for i := 0; i < 9000; i += 1 {
+ e := &entry{
+ origin: "main",
+ leveled: testPayload(fmt.Sprintf("test %d", i)),
+ }
+ j.append(e)
+ }
+
+ entries := j.getEntries(BacklogAllAvailable, "main")
+ if want, got := 8192, len(entries); want != got {
+ t.Fatalf("wanted %d entries, got %d", want, got)
+ }
+ for i, entry := range entries {
+ want := fmt.Sprintf("test %d", (9000-8192)+i)
+ got := strings.Join(entry.leveled.messages, "\n")
+ if want != got {
+ t.Fatalf("wanted entry %q, got %q", want, got)
+ }
+ }
+}
+
+func TestJournalQuota(t *testing.T) {
+ j := newJournal()
+
+ for i := 0; i < 9000; i += 1 {
+ j.append(&entry{
+ origin: "chatty",
+ leveled: testPayload(fmt.Sprintf("chatty %d", i)),
+ })
+ if i%10 == 0 {
+ j.append(&entry{
+ origin: "solemn",
+ leveled: testPayload(fmt.Sprintf("solemn %d", i)),
+ })
+ }
+ }
+
+ entries := j.getEntries(BacklogAllAvailable, "chatty")
+ if want, got := 8192, len(entries); want != got {
+ t.Fatalf("wanted %d chatty entries, got %d", want, got)
+ }
+ entries = j.getEntries(BacklogAllAvailable, "solemn")
+ if want, got := 900, len(entries); want != got {
+ t.Fatalf("wanted %d solemn entries, got %d", want, got)
+ }
+ entries = j.getEntries(BacklogAllAvailable, "absent")
+ if want, got := 0, len(entries); want != got {
+ t.Fatalf("wanted %d absent entries, got %d", want, got)
+ }
+
+ entries = j.scanEntries(BacklogAllAvailable, filterAll())
+ if want, got := 8192+900, len(entries); want != got {
+ t.Fatalf("wanted %d total entries, got %d", want, got)
+ }
+ setMessages := make(map[string]bool)
+ for _, entry := range entries {
+ setMessages[strings.Join(entry.leveled.messages, "\n")] = true
+ }
+
+ for i := 0; i < 900; i += 1 {
+ want := fmt.Sprintf("solemn %d", i*10)
+ if !setMessages[want] {
+ t.Fatalf("could not find entry %q in journal", want)
+ }
+ }
+ for i := 0; i < 8192; i += 1 {
+ want := fmt.Sprintf("chatty %d", i+(9000-8192))
+ if !setMessages[want] {
+ t.Fatalf("could not find entry %q in journal", want)
+ }
+ }
+}
+
+func TestJournalSubtree(t *testing.T) {
+ j := newJournal()
+ j.append(&entry{origin: "a", leveled: testPayload("a")})
+ j.append(&entry{origin: "a.b", leveled: testPayload("a.b")})
+ j.append(&entry{origin: "a.b.c", leveled: testPayload("a.b.c")})
+ j.append(&entry{origin: "a.b.d", leveled: testPayload("a.b.d")})
+ j.append(&entry{origin: "e.f", leveled: testPayload("e.f")})
+ j.append(&entry{origin: "e.g", leveled: testPayload("e.g")})
+
+ expect := func(f filter, msgs ...string) string {
+ res := j.scanEntries(BacklogAllAvailable, f)
+ set := make(map[string]bool)
+ for _, entry := range res {
+ set[strings.Join(entry.leveled.messages, "\n")] = true
+ }
+
+ for _, want := range msgs {
+ if !set[want] {
+ return fmt.Sprintf("missing entry %q", want)
+ }
+ }
+ return ""
+ }
+
+ if res := expect(filterAll(), "a", "a.b", "a.b.c", "a.b.d", "e.f", "e.g"); res != "" {
+ t.Fatalf("All: %s", res)
+ }
+ if res := expect(filterSubtree("a"), "a", "a.b", "a.b.c", "a.b.d"); res != "" {
+ t.Fatalf("Subtree(a): %s", res)
+ }
+ if res := expect(filterSubtree("a.b"), "a.b", "a.b.c", "a.b.d"); res != "" {
+ t.Fatalf("Subtree(a.b): %s", res)
+ }
+ if res := expect(filterSubtree("e"), "e.f", "e.g"); res != "" {
+ t.Fatalf("Subtree(a.b): %s", res)
+ }
+}
+
+func TestDN_Shorten(t *testing.T) {
+ for i, te := range []struct {
+ input string
+ maxLen int
+ want string
+ }{
+ {"root.role.controlplane.launcher.consensus.autopromoter", 20, "cplane autopromoter"},
+ {"networking.interfaces", 20, "net ifaces"},
+ {"hostsfile", 20, "hostsfile"},
+ {"root.dhcp-server", 20, "dhcp-server"},
+ {"root.role.kubernetes.run.kubernetes.apiserver", 20, "k8s apiserver"},
+ {"some.very.long.dn.that.cant.be.shortened", 20, "...cant be shortened"},
+ {"network.interfaces.dhcp", 20, "net ifaces dhcp"},
+ } {
+ got := DN(te.input).Shorten(MetropolisShortenDict, te.maxLen)
+ if len(got) > te.maxLen {
+ t.Errorf("case %d: output %q too long, got %d bytes, wanted %d", i, got, len(got), te.maxLen)
+ } else {
+ if te.want != got {
+ t.Errorf("case %d: wanted %q, got %q", i, te.want, got)
+ }
+ }
+ }
+}
diff --git a/osbase/logtree/klog.go b/osbase/logtree/klog.go
new file mode 100644
index 0000000..ad7e162
--- /dev/null
+++ b/osbase/logtree/klog.go
@@ -0,0 +1,214 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logtree
+
+import (
+ "fmt"
+ "io"
+ "regexp"
+ "strconv"
+ "strings"
+ "time"
+
+ "source.monogon.dev/osbase/logbuffer"
+)
+
+// KLogParser returns an io.WriteCloser to which raw logging from a klog emitter
+// can be piped. It will attempt to parse all lines from this log as
+// glog/klog-style entries, and pass them over to a LeveledLogger as if they were
+// emitted locally.
+//
+// This allows for piping in external processes that emit klog logging into a
+// logtree, leading to niceties like transparently exposing the log severity or
+// source file/line.
+//
+// One caveat, however, is that V-leveled logs will not be translated
+// appropriately - anything that the klog-emitter pumps out as Info will be
+// directly ingested as Info logging. There is no way to work around this.
+//
+// Another important limitation is that any written line is interpreted as having
+// happened recently (ie. within one hour of the time of execution of this
+// function). This is important as klog/glog-formatted loglines don't have a year
+// attached, so we have to infer it based on the current timestamp (note: parsed
+// lines do not necessarily have their year aleays equal to the current year, as
+// the code handles the edge case of parsing a line from the end of a previous
+// year at the beginning of the next).
+func KLogParser(logger LeveledLogger) io.WriteCloser {
+ p, ok := logger.(*leveledPublisher)
+ if !ok {
+ // Fail fast, as this is a programming error.
+ panic("Expected *leveledPublisher in LeveledLogger from supervisor")
+ }
+
+ k := &klogParser{
+ publisher: p,
+ }
+ // klog seems to have no line length limit. Let's assume some sane sort of default.
+ k.buffer = logbuffer.NewLineBuffer(1024, k.consumeLine)
+ return k
+}
+
+type klogParser struct {
+ publisher *leveledPublisher
+ buffer *logbuffer.LineBuffer
+}
+
+func (k *klogParser) Write(p []byte) (n int, err error) {
+ return k.buffer.Write(p)
+}
+
+// Close must be called exactly once after the parser is done being used. It will
+// pipe any leftover data in its write buffer as one last line to parse.
+func (k *klogParser) Close() error {
+ return k.buffer.Close()
+}
+
+// consumeLine is called by the internal LineBuffer any time a new line is fully
+// written.
+func (k *klogParser) consumeLine(l *logbuffer.Line) {
+ p := parse(time.Now(), l.Data)
+ if p == nil {
+ // We could instead emit that line as a raw log - however, this would lead to
+ // interleaving raw logging and leveled logging.
+ k.publisher.Errorf("Invalid klog line: %s", l.Data)
+ return
+ }
+ // TODO(q3k): should this be exposed as an API on LeveledLogger? How much should
+ // we permit library users to 'fake' logs? This would also permit us to get rid
+ // of the type assertion in KLogParser().
+ e := &entry{
+ origin: k.publisher.node.dn,
+ leveled: p,
+ }
+ k.publisher.node.tree.journal.append(e)
+ k.publisher.node.tree.journal.notify(e)
+}
+
+var (
+ // reKLog matches and parses klog/glog-formatted log lines. Format: I0312
+ // 14:20:04.240540 204 shared_informer.go:247] Caches are synced for attach
+ // detach
+ reKLog = regexp.MustCompile(`^([IEWF])(\d{4})\s+(\d{2}:\d{2}:\d{2}(\.\d+)?)\s+(\d+)\s+([^:]+):(\d+)]\s+(.+)$`)
+)
+
+// parse attempts to parse a klog-formatted line. It returns nil if the line
+// could not be parsed.
+func parse(now time.Time, s string) *LeveledPayload {
+ parts := reKLog.FindStringSubmatch(s)
+ if parts == nil {
+ return nil
+ }
+
+ severityS := parts[1]
+ date := parts[2]
+ timestamp := parts[3]
+ pid := parts[5]
+ file := parts[6]
+ lineS := parts[7]
+ message := parts[8]
+
+ var severity Severity
+ switch severityS {
+ case "I":
+ severity = INFO
+ case "W":
+ severity = WARNING
+ case "E":
+ severity = ERROR
+ case "F":
+ severity = FATAL
+ default:
+ return nil
+ }
+
+ // Possible race due to klog's/glog's format not containing a year.
+ // On 2020/12/31 at 23:59:59.99999 a klog logger emits this line:
+ //
+ // I1231 23:59:59.99999 1 example.go:10] It's almost 2021! Hooray.
+ //
+ // Then, if this library parses that line at 2021/01/01 00:00:00.00001, the
+ // time will be interpreted as:
+ //
+ // 2021/12/31 23:59:59
+ //
+ // So around one year in the future. We attempt to fix this case further down in
+ // this function.
+ year := now.Year()
+ ts, err := parseKLogTime(year, date, timestamp)
+ if err != nil {
+ return nil
+ }
+
+ // Attempt to fix the aforementioned year-in-the-future issue.
+ if ts.After(now) && ts.Sub(now) > time.Hour {
+ // Parsed timestamp is in the future. How close is it to One-Year-From-Now?
+ oyfn := now.Add(time.Hour * 24 * 365)
+ dOyfn := ts.Sub(oyfn)
+ // Let's make sure Duration-To-One-Year-From-Now is always positive. This
+ // simplifies the rest of the checks and papers over some possible edge cases.
+ if dOyfn < 0 {
+ dOyfn = -dOyfn
+ }
+
+ // Okay, is that very close? Then the issue above happened and we should
+ // attempt to reparse it with last year. We can't just manipulate the date we
+ // already have, as it's difficult to 'subtract one year'.
+ if dOyfn < (time.Hour * 24 * 2) {
+ ts, err = parseKLogTime(year-1, date, timestamp)
+ if err != nil {
+ return nil
+ }
+ } else {
+ // Otherwise, we received some seriously time traveling log entry. Abort.
+ return nil
+ }
+ }
+
+ line, err := strconv.Atoi(lineS)
+ if err != nil {
+ return nil
+ }
+
+ // The PID is discarded.
+ _ = pid
+
+ // Finally we have extracted all the data from the line. Inject into the log
+ // publisher.
+ return &LeveledPayload{
+ timestamp: ts,
+ severity: severity,
+ messages: []string{message},
+ file: file,
+ line: line,
+ }
+}
+
+// parseKLogTime parses a klog date and time (eg. "0314", "12:13:14.12345") into
+// a time.Time happening at a given year.
+func parseKLogTime(year int, d, t string) (time.Time, error) {
+ var layout string
+ if strings.Contains(t, ".") {
+ layout = "2006 0102 15:04:05.000000"
+ } else {
+ layout = "2006 0102 15:04:05"
+ }
+ // Make up a string that contains the current year. This permits us to parse
+ // fully into an actual timestamp.
+ // TODO(q3k): add a timezone? This currently behaves as UTC, which is probably
+ // what we want, but we should formalize this.
+ return time.Parse(layout, fmt.Sprintf("%d %s %s", year, d, t))
+}
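Since KLogParser returns an io.WriteCloser, it can be attached directly to the stdout/stderr of an external klog-emitting process. A hedged sketch: the kubelet invocation is purely illustrative, and supervisor.Logger plus the osbase/supervisor import path are assumptions:

    package example

    import (
    	"context"
    	"os/exec"

    	"source.monogon.dev/osbase/logtree"
    	"source.monogon.dev/osbase/supervisor"
    )

    // runKubelet pipes a klog-emitting child process into the local logtree,
    // preserving the severity and file:line metadata of each parsed line.
    func runKubelet(ctx context.Context) error {
    	w := logtree.KLogParser(supervisor.Logger(ctx))
    	// Close flushes any unterminated trailing line as a final entry.
    	defer w.Close()

    	cmd := exec.CommandContext(ctx, "kubelet", "--logtostderr")
    	cmd.Stdout = w
    	cmd.Stderr = w
    	return cmd.Run()
    }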
diff --git a/osbase/logtree/klog_test.go b/osbase/logtree/klog_test.go
new file mode 100644
index 0000000..d53df3f
--- /dev/null
+++ b/osbase/logtree/klog_test.go
@@ -0,0 +1,81 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logtree
+
+import (
+ "testing"
+ "time"
+
+ "github.com/google/go-cmp/cmp"
+)
+
+func TestParse(t *testing.T) {
+ // Injected 'now'. Used to make these tests reproducible and to allow for
+ // testing the log-across-year edge case.
+ // Fri 12 Mar 2021 03:46:26 PM UTC
+ now := time.Unix(1615563986, 123456789)
+ // Sat 01 Jan 2000 12:00:01 AM UTC
+ nowNewYear := time.Unix(946684801, 0)
+
+ for i, te := range []struct {
+ now time.Time
+ line string
+ want *LeveledPayload
+ }{
+ // 0: Simple case: everything should parse correctly.
+ {now, "E0312 14:20:04.240540 204 shared_informer.go:247] Caches are synced for attach detach", &LeveledPayload{
+ messages: []string{"Caches are synced for attach detach"},
+ timestamp: time.Date(2021, 03, 12, 14, 20, 4, 240540000, time.UTC),
+ severity: ERROR,
+ file: "shared_informer.go",
+ line: 247,
+ }},
+ // 1: Mumbling line, should fail.
+ {now, "Application starting up...", nil},
+ // 2: Empty line, should fail.
+ {now, "", nil},
+ // 3: Line from the future, should fail.
+ {now, "I1224 14:20:04.240540 204 john_titor.go:247] I'm sorry, what day is it today? Uuuh, and what year?", nil},
+ // 4: Log-across-year edge case. The log was emitted right before a year
+ // rollover, and parsed right after it. It should be attributed to the
+ // previous year.
+ {nowNewYear, "I1231 23:59:43.123456 123 fry.go:123] Here's to another lousy millenium!", &LeveledPayload{
+ messages: []string{"Here's to another lousy millenium!"},
+ timestamp: time.Date(1999, 12, 31, 23, 59, 43, 123456000, time.UTC),
+ severity: INFO,
+ file: "fry.go",
+ line: 123,
+ }},
+ // 5: Invalid severity, should fail.
+ {now, "D0312 14:20:04.240540 204 shared_informer.go:247] Caches are synced for attach detach", nil},
+ // 6: Invalid time, should fail.
+ {now, "D0312 25:20:04.240540 204 shared_informer.go:247] Caches are synced for attach detach", nil},
+ // 7: Simple case without sub-second timing: everything should parse correctly
+ {now, "E0312 14:20:04 204 shared_informer.go:247] Caches are synced for attach detach", &LeveledPayload{
+ messages: []string{"Caches are synced for attach detach"},
+ timestamp: time.Date(2021, 03, 12, 14, 20, 4, 0, time.UTC),
+ severity: ERROR,
+ file: "shared_informer.go",
+ line: 247,
+ }},
+ } {
+ got := parse(te.now, te.line)
+ if diff := cmp.Diff(te.want, got, cmp.AllowUnexported(LeveledPayload{})); diff != "" {
+ t.Errorf("%d: mismatch (-want +got):\n%s", i, diff)
+ }
+ }
+}
diff --git a/osbase/logtree/kmsg.go b/osbase/logtree/kmsg.go
new file mode 100644
index 0000000..03bb6ff
--- /dev/null
+++ b/osbase/logtree/kmsg.go
@@ -0,0 +1,141 @@
+//go:build linux
+// +build linux
+
+package logtree
+
+import (
+ "bytes"
+ "context"
+ "errors"
+ "fmt"
+ "os"
+ "strconv"
+ "strings"
+ "time"
+
+ "golang.org/x/sys/unix"
+)
+
+const (
+ loglevelEmergency = 0
+ loglevelAlert = 1
+ loglevelCritical = 2
+ loglevelError = 3
+ loglevelWarning = 4
+ loglevelNotice = 5
+ loglevelInfo = 6
+ loglevelDebug = 7
+)
+
+// KmsgPipe pipes logs from the kernel kmsg interface at /dev/kmsg into the
+// given logger.
+func KmsgPipe(ctx context.Context, lt LeveledLogger) error {
+ publisher, ok := lt.(*leveledPublisher)
+ if !ok {
+ // Fail fast, as this is a programming error.
+ panic("Expected *leveledPublisher in LeveledLogger from supervisor")
+ }
+ kmsgFile, err := os.Open("/dev/kmsg")
+ if err != nil {
+ return err
+ }
+ defer kmsgFile.Close()
+ var lastOverflow time.Time
+ // PRINTK_MESSAGE_MAX in @linux//kernel/printk:internal.h
+ linebuf := make([]byte, 2048)
+ for {
+ n, err := kmsgFile.Read(linebuf)
+ // Best-effort: in Go it is not possible to cancel a blocking Read on demand.
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ }
+ if errors.Is(err, unix.EPIPE) {
+ now := time.Now()
+ // Rate-limit to 1 per second
+ if lastOverflow.Add(1 * time.Second).Before(now) {
+ lt.Warning("Lost messages due to kernel ring buffer overflow")
+ lastOverflow = now
+ }
+ continue
+ }
+ if err != nil {
+ return fmt.Errorf("while reading from kmsg: %w", err)
+ }
+ var monotonicRaw unix.Timespec
+ if err := unix.ClockGettime(unix.CLOCK_MONOTONIC_RAW, &monotonicRaw); err != nil {
+ return fmt.Errorf("while getting monotonic timestamp: %w", err)
+ }
+ p := parseKmsg(time.Now(), time.Duration(monotonicRaw.Nano())*time.Nanosecond, linebuf[:n])
+ if p == nil {
+ continue
+ }
+ e := &entry{
+ origin: publisher.node.dn,
+ leveled: p,
+ }
+ publisher.node.tree.journal.append(e)
+ publisher.node.tree.journal.notify(e)
+ }
+}
+
+// See https://www.kernel.org/doc/Documentation/ABI/testing/dev-kmsg for format.
+func parseKmsg(now time.Time, monotonicSinceBoot time.Duration, data []byte) *LeveledPayload {
+ meta, message, ok := bytes.Cut(data, []byte(";"))
+ if !ok {
+ // Unknown message format
+ return nil
+ }
+ endOfMsgIdx := bytes.IndexByte(message, '\n')
+ if endOfMsgIdx == -1 {
+ return nil
+ }
+ message = message[:endOfMsgIdx]
+ metaFields := strings.FieldsFunc(string(meta), func(r rune) bool { return r == ',' })
+ if len(metaFields) < 4 {
+ return nil
+ }
+ loglevel, err := strconv.ParseUint(metaFields[0], 10, 64)
+ if err != nil {
+ return nil
+ }
+
+ monotonicMicro, err := strconv.ParseUint(metaFields[2], 10, 64)
+ if err != nil {
+ return nil
+ }
+
+ // Kmsg entries are timestamped with CLOCK_MONOTONIC_RAW, a clock which does
+ // not have a direct correspondence with civil time (UTC). To assign best-
+ // effort timestamps, use the current monotonic clock reading to determine
+ // the elapsed time between the kmsg entry and now on the monotonic clock.
+ // This does not correspond well to elapsed UTC time on longer timescales, as
+ // CLOCK_MONOTONIC_RAW is not trimmed to run true to UTC, but on the order of
+ // hours the two stay close. As the pipe generally processes messages
+ // very close to their creation date, the elapsed time and thus the accrued
+ // error is extremely small.
+ monotonic := time.Duration(monotonicMicro) * time.Microsecond
+
+ monotonicFromNow := monotonic - monotonicSinceBoot
+
+ var severity Severity
+ switch loglevel {
+ case loglevelEmergency, loglevelAlert:
+ severity = FATAL
+ case loglevelCritical, loglevelError:
+ severity = ERROR
+ case loglevelWarning:
+ severity = WARNING
+ case loglevelNotice, loglevelInfo, loglevelDebug:
+ severity = INFO
+ default:
+ severity = INFO
+ }
+
+ return &LeveledPayload{
+ timestamp: now.Add(monotonicFromNow),
+ severity: severity,
+ messages: []string{string(message)},
+ }
+}
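KmsgPipe blocks until the context is cancelled or reading fails, so it is meant to run as a long-lived task. A minimal sketch, assuming a supervisor-style runnable and the osbase/supervisor import path (reading /dev/kmsg additionally requires Linux and sufficient privileges):

    package example

    import (
    	"context"

    	"source.monogon.dev/osbase/logtree"
    	"source.monogon.dev/osbase/supervisor"
    )

    // kmsgRunnable surfaces kernel messages under this runnable's DN, with
    // kernel log levels mapped to logtree severities as in parseKmsg above.
    func kmsgRunnable(ctx context.Context) error {
    	return logtree.KmsgPipe(ctx, supervisor.Logger(ctx))
    }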
diff --git a/osbase/logtree/kmsg_test.go b/osbase/logtree/kmsg_test.go
new file mode 100644
index 0000000..e2faf82
--- /dev/null
+++ b/osbase/logtree/kmsg_test.go
@@ -0,0 +1,43 @@
+//go:build linux
+// +build linux
+
+package logtree
+
+import (
+ "testing"
+ "time"
+
+ "github.com/google/go-cmp/cmp"
+)
+
+func TestParseKmsg(t *testing.T) {
+ now := time.Unix(1691593045, 128027944)
+ nowMonotonic := time.Duration(1501096434537722)
+
+ for i, te := range []struct {
+ line string
+ want *LeveledPayload
+ }{
+ // Empty line
+ {"", nil},
+ // Unknown format
+ {"Not a valid line", nil},
+ // Normal entry
+ {"6,30962,1501094342185,-;test\n", &LeveledPayload{
+ messages: []string{"test"},
+ timestamp: time.Date(2023, 8, 9, 14, 57, 23, 35675222, time.UTC),
+ severity: INFO,
+ }},
+ // With metadata and different severity
+ {"4,30951,1486884175312,-;nvme nvme2: starting error recovery\n SUBSYSTEM=nvme\n DEVICE=c239:2\n", &LeveledPayload{
+ messages: []string{"nvme nvme2: starting error recovery"},
+ timestamp: time.Date(2023, 8, 9, 11, 00, 32, 868802222, time.UTC),
+ severity: WARNING,
+ }},
+ } {
+ got := parseKmsg(now, nowMonotonic, []byte(te.line))
+ if diff := cmp.Diff(te.want, got, cmp.AllowUnexported(LeveledPayload{})); diff != "" {
+ t.Errorf("%d: mismatch (-want +got):\n%s", i, diff)
+ }
+ }
+}
diff --git a/osbase/logtree/leveled.go b/osbase/logtree/leveled.go
new file mode 100644
index 0000000..98699b8
--- /dev/null
+++ b/osbase/logtree/leveled.go
@@ -0,0 +1,175 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logtree
+
+import (
+ "fmt"
+
+ lpb "source.monogon.dev/osbase/logtree/proto"
+)
+
+// LeveledLogger is a generic interface for glog-style logging. There are four
+// hardcoded log severities, in increasing order: INFO, WARNING, ERROR, FATAL.
+// Logging at a certain severity level logs not only to consumers expecting data at
+// that severity level, but also all lower severity levels. For example, an ERROR
+// log will also be passed to consumers looking at INFO or WARNING logs.
+type LeveledLogger interface {
+ // Info logs at the INFO severity. Arguments are handled in the manner of
+ // fmt.Print, a terminating newline is added if missing.
+ Info(args ...interface{})
+ // Infof logs at the INFO severity. Arguments are handled in the manner of
+ // fmt.Printf, a terminating newline is added if missing.
+ Infof(format string, args ...interface{})
+
+ // Warning logs at the WARNING severity. Arguments are handled in the manner of
+ // fmt.Print, a terminating newline is added if missing.
+ Warning(args ...interface{})
+ // Warningf logs at the WARNING severity. Arguments are handled in the manner of
+ // fmt.Printf, a terminating newline is added if missing.
+ Warningf(format string, args ...interface{})
+
+ // Error logs at the ERROR severity. Arguments are handled in the manner of
+ // fmt.Print, a terminating newline is added if missing.
+ Error(args ...interface{})
+ // Errorf logs at the ERROR severity. Arguments are handled in the manner of
+ // fmt.Printf, a terminating newline is added if missing.
+ Errorf(format string, args ...interface{})
+
+ // Fatal logs at the FATAL severity and aborts the current program. Arguments are
+ // handled in the manner of fmt.Print, a terminating newline is added if missing.
+ Fatal(args ...interface{})
+ // Fatalf logs at the FATAL severity and aborts the current program. Arguments are
+ // handled in the manner of fmt.Printf, a terminating newline is added if missing.
+ Fatalf(format string, args ...interface{})
+
+ // V returns a VerboseLeveledLogger at a given verbosity level. These verbosity
+ // levels can be dynamically set and unset on a package-granular level by consumers
+ // of the LeveledLogger logs. The returned value represents whether logging at the
+ // given verbosity level was active at that time, and as such should not be a long-
+ // lived object in programs. This construct is further referred to as 'V-logs'.
+ V(level VerbosityLevel) VerboseLeveledLogger
+
+ // WithAddedStackDepth returns the same LeveledLogger, but adjusted with an
+ // additional 'extra stack depth' which will be used to skip a given number of
+ // stack/call frames when determining the location where the error originated.
+ // For example, WithStackDepth(1) will return a logger that will skip one
+ // stack/call frame. Then, with function foo() calling function helper() which
+ // in turns call l.Infof(), the log line will be emitted with the call site of
+ // helper() within foo(), instead of the default behaviour of logging the
+ // call site of Infof() within helper().
+ //
+ // This is useful for functions which somehow wrap loggers in helper functions,
+ // for example to expose a slightly different API.
+ WithAddedStackDepth(depth int) LeveledLogger
+}
+
+// VerbosityLevel is a verbosity level defined for V-logs. This can be changed
+// programmatically per Go package. When logging at a given VerbosityLevel V, the
+// current level must be equal to or higher than V for the logs to be recorded.
+// Conversely, enabling a V-logging at a VerbosityLevel V also enables all logging
+// at lower levels [Int32Min .. (V-1)].
+type VerbosityLevel int32
+
+type VerboseLeveledLogger interface {
+ // Enabled returns whether this level is enabled. If not enabled, all logging into this
+ // logger will be discarded immediately. Thus, Enabled() can be used to check the
+ // verbosity level before performing any logging:
+ // if l.V(3).Enabled() { l.Info("V3 is enabled") }
+ // or, in simple cases, the convenience function .Info can be used:
+ // l.V(3).Info("V3 is enabled")
+ // The second form is shorter and more convenient, but more expensive, as its
+ // arguments are always evaluated.
+ Enabled() bool
+ // Info is the equivalent of a LeveledLogger's Info call, guarded by whether this
+ // VerboseLeveledLogger is enabled.
+ Info(args ...interface{})
+ // Infof is the equivalent of a LeveledLogger's Infof call, guarded by whether this
+ // VerboseLeveledLogger is enabled.
+ Infof(format string, args ...interface{})
+}
+
+// Severity is one of the severities as described in LeveledLogger.
+type Severity string
+
+const (
+ INFO Severity = "I"
+ WARNING Severity = "W"
+ ERROR Severity = "E"
+ FATAL Severity = "F"
+)
+
+var (
+ // SeverityAtLeast maps a given severity to a list of severities that are at that
+ // severity or higher. In other words, SeverityAtLeast[X] returns a list of
+ // severities that might be seen in a log at severity X.
+ SeverityAtLeast = map[Severity][]Severity{
+ INFO: {INFO, WARNING, ERROR, FATAL},
+ WARNING: {WARNING, ERROR, FATAL},
+ ERROR: {ERROR, FATAL},
+ FATAL: {FATAL},
+ }
+)
+
+func (s Severity) AtLeast(other Severity) bool {
+ for _, el := range SeverityAtLeast[other] {
+ if el == s {
+ return true
+ }
+ }
+ return false
+}
+
+// Valid returns true if this severity is one of the known levels
+// (INFO, WARNING, ERROR or FATAL), false otherwise.
+func (s Severity) Valid() bool {
+ switch s {
+ case INFO, WARNING, ERROR, FATAL:
+ return true
+ default:
+ return false
+ }
+}
+
+func SeverityFromProto(s lpb.LeveledLogSeverity) (Severity, error) {
+ switch s {
+ case lpb.LeveledLogSeverity_INFO:
+ return INFO, nil
+ case lpb.LeveledLogSeverity_WARNING:
+ return WARNING, nil
+ case lpb.LeveledLogSeverity_ERROR:
+ return ERROR, nil
+ case lpb.LeveledLogSeverity_FATAL:
+ return FATAL, nil
+ default:
+ return "", fmt.Errorf("unknown severity value %d", s)
+ }
+}
+
+func (s Severity) ToProto() lpb.LeveledLogSeverity {
+ switch s {
+ case INFO:
+ return lpb.LeveledLogSeverity_INFO
+ case WARNING:
+ return lpb.LeveledLogSeverity_WARNING
+ case ERROR:
+ return lpb.LeveledLogSeverity_ERROR
+ case FATAL:
+ return lpb.LeveledLogSeverity_FATAL
+ default:
+ return lpb.LeveledLogSeverity_INVALID
+ }
+}
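Two parts of this interface benefit from a worked example: V-logs and WithAddedStackDepth. A sketch; logf and expensiveState are hypothetical helpers, not part of this package:

    package example

    import (
    	"time"

    	"source.monogon.dev/osbase/logtree"
    )

    // logf wraps a LeveledLogger with a component prefix. Without
    // WithAddedStackDepth(1), every line would carry logf's own file:line
    // instead of its caller's.
    func logf(l logtree.LeveledLogger, component, format string, args ...interface{}) {
    	l.WithAddedStackDepth(1).Infof("[%s] "+format, append([]interface{}{component}, args...)...)
    }

    func work(l logtree.LeveledLogger) {
    	logf(l, "db", "connected in %v", 42*time.Millisecond)

    	// V-logs: guard an expensive dump behind Enabled() so its arguments are
    	// only evaluated when verbosity level 3 is active for this package.
    	if l.V(3).Enabled() {
    		l.V(3).Infof("state dump: %+v", expensiveState())
    	}
    }

    func expensiveState() string { return "..." }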
diff --git a/osbase/logtree/leveled_payload.go b/osbase/logtree/leveled_payload.go
new file mode 100644
index 0000000..95b9d5c
--- /dev/null
+++ b/osbase/logtree/leveled_payload.go
@@ -0,0 +1,200 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logtree
+
+import (
+ "fmt"
+ "strconv"
+ "strings"
+ "time"
+
+ tpb "google.golang.org/protobuf/types/known/timestamppb"
+
+ lpb "source.monogon.dev/osbase/logtree/proto"
+)
+
+// LeveledPayload is a log entry for leveled logs (as per leveled.go). It contains
+// the input to these calls (severity and message split into newline-delimited
+// messages) and additional metadata that would be usually seen in a text
+// representation of a leveled log entry.
+type LeveledPayload struct {
+ // messages is the list of messages contained in this payload. This list is built
+ // from splitting up the given message from the user by newline.
+ messages []string
+ // timestamp is the time at which this message was emitted.
+ timestamp time.Time
+ // severity is the leveled Severity at which this message was emitted.
+ severity Severity
+ // file is the filename of the caller that emitted this message.
+ file string
+ // line is the line number within the file of the caller that emitted this message.
+ line int
+}
+
+// String returns a canonical representation of this payload as a single string
+// prefixed with metadata. If the original message was logged with newlines, this
+// representation will also contain newlines, with each original message part
+// prefixed by the metadata. For an alternative call that will instead return a
+// canonical prefix and a list of lines in the message, see Strings().
+func (p *LeveledPayload) String() string {
+ prefix, lines := p.Strings()
+ res := make([]string, len(p.messages))
+ for i, line := range lines {
+ res[i] = fmt.Sprintf("%s%s", prefix, line)
+ }
+ return strings.Join(res, "\n")
+}
+
+// Strings returns the canonical representation of this payload split into a
+// prefix and all lines that were contained in the original message. This is
+// meant to be displayed to the user by showing the prefix before each line,
+// concatenated together - possibly in a table form with the prefixes all
+// unified with a rowspan-like mechanism.
+//
+// For example, this function can return:
+//
+// prefix = "I1102 17:20:06.921395 foo.go:42] "
+// lines = []string{"current tags:", " - one", " - two"}
+//
+// With this data, the result should be presented to users this way in text form:
+// I1102 17:20:06.921395 foo.go:42] current tags:
+// I1102 17:20:06.921395 foo.go:42] - one
+// I1102 17:20:06.921395 foo.go:42] - two
+//
+// Or, in a table layout:
+// .-----------------------------------------------------------.
+// | I1102 17:20:06.921395 0 foo.go:42] : current tags: |
+// | :------------------|
+// | : - one |
+// | :------------------|
+// | : - two |
+// '-----------------------------------------------------------'
+func (p *LeveledPayload) Strings() (prefix string, lines []string) {
+ _, month, day := p.timestamp.Date()
+ hour, minute, second := p.timestamp.Clock()
+ nsec := p.timestamp.Nanosecond() / 1000
+
+ // Same format as in glog, but without threadid.
+ // Lmmdd hh:mm:ss.uuuuuu file:line]
+ // TODO(q3k): rewrite this to printf-less code.
+ prefix = fmt.Sprintf("%s%02d%02d %02d:%02d:%02d.%06d %s:%d] ", p.severity, month, day, hour, minute, second, nsec, p.file, p.line)
+
+ lines = p.messages
+ return
+}
+
+// Messages returns the inner message lines of this entry, ie. what was passed
+// to the actual logging method, but split by newlines.
+func (p *LeveledPayload) Messages() []string { return p.messages }
+
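+// MessagesJoined returns the inner message lines of this entry joined with
+// newlines, ie. what was passed to the actual logging method.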
+func (p *LeveledPayload) MessagesJoined() string { return strings.Join(p.messages, "\n") }
+
+// Timestamp returns the time at which this entry was logged.
+func (p *LeveledPayload) Timestamp() time.Time { return p.timestamp }
+
+// Location returns a string in the form of file_name:line_number that shows the
+// origin of the log entry in the program source.
+func (p *LeveledPayload) Location() string { return fmt.Sprintf("%s:%d", p.file, p.line) }
+
+// Severity returns the Severity with which this entry was logged.
+func (p *LeveledPayload) Severity() Severity { return p.severity }
+
+// Proto converts a LeveledPayload to protobuf format.
+func (p *LeveledPayload) Proto() *lpb.LogEntry_Leveled {
+ return &lpb.LogEntry_Leveled{
+ Lines: p.Messages(),
+ Timestamp: tpb.New(p.Timestamp()),
+ Severity: p.Severity().ToProto(),
+ Location: p.Location(),
+ }
+}
+
+// LeveledPayloadFromProto parses a protobuf message into the internal format.
+func LeveledPayloadFromProto(p *lpb.LogEntry_Leveled) (*LeveledPayload, error) {
+ severity, err := SeverityFromProto(p.Severity)
+ if err != nil {
+ return nil, fmt.Errorf("could not convert severity: %w", err)
+ }
+ parts := strings.Split(p.Location, ":")
+ if len(parts) != 2 {
+ return nil, fmt.Errorf("invalid location, must be two :-delimited parts, is %d parts", len(parts))
+ }
+ file := parts[0]
+ line, err := strconv.Atoi(parts[1])
+ if err != nil {
+ return nil, fmt.Errorf("invalid location line number: %w", err)
+ }
+ return &LeveledPayload{
+ messages: p.Lines,
+ timestamp: p.Timestamp.AsTime(),
+ severity: severity,
+ file: file,
+ line: line,
+ }, nil
+}
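+
+// As an illustrative sketch, a payload serialized with Proto can be
+// reconstructed on the receiving end of, eg., a gRPC log stream (the
+// identifiers below are assumptions):
+//
+//   msg := payload.Proto()
+//   // ... transmit msg ...
+//   back, err := LeveledPayloadFromProto(msg)
+//   if err != nil {
+//       // Handle a malformed payload.
+//   }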
+
+// ExternalLeveledPayload is a LeveledPayload received from an external source,
+// eg. from parsing the logging output of third-party programs. It can be
+// converted into a LeveledPayload and inserted into a leveled logger, but will
+// be sanitized before that, ensuring that potentially buggy
+// emitters/converters do not end up polluting the leveled logger data.
+//
+// This type should be used only when inserting data from external systems, not
+// by code that just wishes to log things. In the future, data inserted this
+// way might be explicitly marked as tainted so operators can understand that
+// parts of this data might not give the same guarantees as the log entries
+// emitted by the native LeveledLogger API.
+type ExternalLeveledPayload struct {
+ // Log line. If any newlines are found, they will split the message into
+ // multiple messages within LeveledPayload. Empty messages are accepted
+ // verbatim.
+ Message string
+ // Timestamp when this payload was emitted according to its source. If not
+ // given, will default to the time of conversion to LeveledPayload.
+ Timestamp time.Time
+ // Log severity. If invalid or unset will default to INFO.
+ Severity Severity
+ // File name of originating code. Defaults to "unknown" if not set.
+ File string
+ // Line in File. Zero indicates the line is not known.
+ Line int
+}
+
+// sanitize the given ExternalLeveledPayload by creating a corresponding
+// LeveledPayload. The original object is unaltered.
+func (e *ExternalLeveledPayload) sanitize() *LeveledPayload {
+ l := &LeveledPayload{
+ messages: strings.Split(e.Message, "\n"),
+ timestamp: e.Timestamp,
+ severity: e.Severity,
+ file: e.File,
+ line: e.Line,
+ }
+ if l.timestamp.IsZero() {
+ l.timestamp = time.Now()
+ }
+ if !l.severity.Valid() {
+ l.severity = INFO
+ }
+ if l.file == "" {
+ l.file = "unknown"
+ }
+ if l.line < 0 {
+ l.line = 0
+ }
+ return l
+}
diff --git a/osbase/logtree/logtree.go b/osbase/logtree/logtree.go
new file mode 100644
index 0000000..c20681d
--- /dev/null
+++ b/osbase/logtree/logtree.go
@@ -0,0 +1,158 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logtree
+
+import (
+ "fmt"
+ "strings"
+ "sync"
+
+ "source.monogon.dev/osbase/logbuffer"
+)
+
+// LogTree is a tree-shaped logging system. For more information, see the package-
+// level documentation.
+type LogTree struct {
+ // journal is the tree's journal, storing all log data and managing subscribers.
+ journal *journal
+ // root is the root node of the actual tree structure of the log tree. The
+ // nodes contain per-DN configuration options, notably the current verbosity
+ // level of that DN.
+ root *node
+}
+
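+// New returns a fresh, empty LogTree.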
+func New() *LogTree {
+ lt := &LogTree{
+ journal: newJournal(),
+ }
+ lt.root = newNode(lt, "")
+ return lt
+}
+
+// node represents a given DN as a discrete 'logger'. It implements the
+// LeveledLogger interface for log publishing, entries from which it passes over to
+// the logtree's journal.
+type node struct {
+ // dn is the DN which this node represents (or "" if this is the root node).
+ dn DN
+ // tree is the LogTree to which this node belongs.
+ tree *LogTree
+ // verbosity is the current verbosity level of this DN/node, affecting .V(n)
+ // LeveledLogger calls.
+ verbosity VerbosityLevel
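+ // rawLineBuffer accumulates raw log writes for this DN and emits complete
+ // lines into logRaw.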
+ rawLineBuffer *logbuffer.LineBuffer
+
+ // mu guards children.
+ mu sync.Mutex
+ // children is a map of DN-part to a child node in the logtree. A DN-part is a
+ // string representing a part of the DN between the delimiting dots, as returned
+ // by DN.Path.
+ children map[string]*node
+}
+
+// newNode returns a node at a given DN in the LogTree - but doesn't set up the
+// LogTree to insert it accordingly.
+func newNode(tree *LogTree, dn DN) *node {
+ n := &node{
+ dn: dn,
+ tree: tree,
+ children: make(map[string]*node),
+ }
+ // TODO(q3k): make this limit configurable. If this happens, or the default (1024) gets changed, max chunk size
+ // calculations when serving the logs (eg. in NodeDebugService) must reflect this.
+ n.rawLineBuffer = logbuffer.NewLineBuffer(1024, n.logRaw)
+ return n
+}
+
+// nodeByDN returns the LogTree node corresponding to a given DN. If either the
+// node or some of its parents do not exist they will be created as needed.
+func (l *LogTree) nodeByDN(dn DN) (*node, error) {
+ traversal, err := newTraversal(dn)
+ if err != nil {
+ return nil, fmt.Errorf("traversal failed: %w", err)
+ }
+ return traversal.execute(l.root), nil
+}
+
+// nodeTraversal represents a request to traverse the LogTree in search of a given
+// node by DN.
+type nodeTraversal struct {
+ // want is the DN of the node that is requested to be found.
+ want DN
+ // current is the path already taken to find the node, in the form of DN parts. It
+ // starts out empty and progresses to become want.Parts() as the traversal
+ // continues.
+ current []string
+ // left is the path that still needs to be taken in order to find the node, in
+ // the form of DN parts. It starts out as want.Parts() and progresses to become
+ // empty as the traversal continues.
+ left []string
+}
+
+// next adjusts the traversal's current/left slices to the next element of the
+// traversal, returns the part that's now being looked for (or "" if the traversal
+// is done) and the full DN of the element that's being looked for.
+//
+// For example, a traversal of foo.bar.baz will cause .next() to return the
+// following on each invocation:
+// - part: foo, full: foo
+// - part: bar, full: foo.bar
+// - part: baz, full: foo.bar.baz
+// - part: "", full: foo.bar.baz
+func (t *nodeTraversal) next() (part string, full DN) {
+ if len(t.left) == 0 {
+ return "", t.want
+ }
+ part = t.left[0]
+ t.current = append(t.current, part)
+ t.left = t.left[1:]
+ full = DN(strings.Join(t.current, "."))
+ return
+}
+
+// newTraversal returns a nodeTraversal for a given wanted DN.
+func newTraversal(dn DN) (*nodeTraversal, error) {
+ parts, err := dn.Path()
+ if err != nil {
+ return nil, err
+ }
+ return &nodeTraversal{
+ want: dn,
+ left: parts,
+ }, nil
+}
+
+// execute the traversal in order to find the node. This can only be called once
+// per traversal. Nodes will be created within the tree until the target node is
+// reached. Existing nodes will be reused. This is effectively an idempotent way of
+// accessing a node in the tree based on a traversal.
+func (t *nodeTraversal) execute(n *node) *node {
+ cur := n
+ for {
+ part, full := t.next()
+ if part == "" {
+ return cur
+ }
+
+ mu := &cur.mu
+ mu.Lock()
+ if _, ok := cur.children[part]; !ok {
+ cur.children[part] = newNode(n.tree, full)
+ }
+ cur = cur.children[part]
+ mu.Unlock()
+ }
+}
diff --git a/osbase/logtree/logtree_access.go b/osbase/logtree/logtree_access.go
new file mode 100644
index 0000000..b601ea4
--- /dev/null
+++ b/osbase/logtree/logtree_access.go
@@ -0,0 +1,189 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logtree
+
+import (
+ "errors"
+ "sync/atomic"
+)
+
+// LogReadOption describes options for the LogTree.Read call.
+type LogReadOption struct {
+ withChildren bool
+ withStream bool
+ withBacklog int
+ onlyLeveled bool
+ onlyRaw bool
+ leveledWithMinimumSeverity Severity
+}
+
+// WithChildren makes Read return/stream data for both a given DN and all its
+// children.
+func WithChildren() LogReadOption { return LogReadOption{withChildren: true} }
+
+// WithStream makes Read return a stream of data. This works alongside WithBacklog
+// to create a read-and-stream construct.
+func WithStream() LogReadOption { return LogReadOption{withStream: true} }
+
+// WithBacklog makes Read return already recorded log entries, up to count
+// elements.
+func WithBacklog(count int) LogReadOption { return LogReadOption{withBacklog: count} }
+
+// BacklogAllAvailable makes WithBacklog return all backlogged log data that
+// logtree possesses.
+const BacklogAllAvailable int = -1
+
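+// OnlyRaw makes Read return only raw logging entries.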
+func OnlyRaw() LogReadOption { return LogReadOption{onlyRaw: true} }
+
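+// OnlyLeveled makes Read return only leveled logging entries.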
+func OnlyLeveled() LogReadOption { return LogReadOption{onlyLeveled: true} }
+
+// LeveledWithMinimumSeverity makes Read return only log entries that are at least
+// at a given Severity. If only leveled entries are needed, OnlyLeveled must be
+// used. This is a no-op when OnlyRaw is used.
+func LeveledWithMinimumSeverity(s Severity) LogReadOption {
+ return LogReadOption{leveledWithMinimumSeverity: s}
+}
+
+// LogReader permits reading an already existing backlog of log entries and to
+// stream further ones.
+type LogReader struct {
+ // Backlog are the log entries already logged by LogTree. This will only be set if
+ // WithBacklog has been passed to Read.
+ Backlog []*LogEntry
+ // Stream is a channel of new entries as received live by LogTree. This will only
+ // be set if WithStream has been passed to Read. In this case, entries from this
+ // channel must be read as fast as possible by the consumer in order to prevent
+ // missing entries.
+ Stream <-chan *LogEntry
+ // done is a channel used to signal (by closing) that the log consumer is not
+ // interested in more Stream data.
+ done chan<- struct{}
+ // missed is an atomic integer pointer that tells the subscriber how many messages
+ // in Stream they missed. This pointer is nil if no streaming has been requested.
+ missed *uint64
+}
+
+// Missed returns the number of entries that were missed from Stream (as the
+// channel was not drained fast enough).
+func (l *LogReader) Missed() uint64 {
+ // No Stream.
+ if l.missed == nil {
+ return 0
+ }
+ return atomic.LoadUint64(l.missed)
+}
+
+// Close closes the LogReader's Stream. This must be called once the Reader does
+// not wish to receive streaming messages anymore.
+func (l *LogReader) Close() {
+ if l.done != nil {
+ close(l.done)
+ }
+}
+
+var (
+ ErrRawAndLeveled = errors.New("cannot return logs that are simultaneously OnlyRaw and OnlyLeveled")
+)
+
+// Read and/or stream entries from a LogTree. The returned LogReader is
+// configured by the LogReadOptions passed, which determine whether the Read
+// will return existing entries, a stream, or both. In addition, the options
+// dictate whether only entries for that particular DN are returned, or for all
+// sub-DNs as well.
+func (l *LogTree) Read(dn DN, opts ...LogReadOption) (*LogReader, error) {
+ l.journal.mu.RLock()
+ defer l.journal.mu.RUnlock()
+
+ var backlog int
+ var stream bool
+ var recursive bool
+ var leveledSeverity Severity
+ var onlyRaw, onlyLeveled bool
+
+ for _, opt := range opts {
+ if opt.withBacklog > 0 || opt.withBacklog == BacklogAllAvailable {
+ backlog = opt.withBacklog
+ }
+ if opt.withStream {
+ stream = true
+ }
+ if opt.withChildren {
+ recursive = true
+ }
+ if opt.leveledWithMinimumSeverity != "" {
+ leveledSeverity = opt.leveledWithMinimumSeverity
+ }
+ if opt.onlyLeveled {
+ onlyLeveled = true
+ }
+ if opt.onlyRaw {
+ onlyRaw = true
+ }
+ }
+
+ if onlyLeveled && onlyRaw {
+ return nil, ErrRawAndLeveled
+ }
+
+ var filters []filter
+ if onlyLeveled {
+ filters = append(filters, filterOnlyLeveled)
+ }
+ if onlyRaw {
+ filters = append(filters, filterOnlyRaw)
+ }
+ if recursive {
+ filters = append(filters, filterSubtree(dn))
+ } else {
+ filters = append(filters, filterExact(dn))
+ }
+ if leveledSeverity != "" {
+ filters = append(filters, filterSeverity(leveledSeverity))
+ }
+
+ var entries []*entry
+ if backlog > 0 || backlog == BacklogAllAvailable {
+ if recursive {
+ entries = l.journal.scanEntries(backlog, filters...)
+ } else {
+ entries = l.journal.getEntries(backlog, dn, filters...)
+ }
+ }
+
+ var sub *subscriber
+ if stream {
+ sub = &subscriber{
+ // TODO(q3k): make buffer size configurable
+ dataC: make(chan *LogEntry, 128),
+ doneC: make(chan struct{}),
+ filters: filters,
+ }
+ l.journal.subscribe(sub)
+ }
+
+ lr := &LogReader{}
+ lr.Backlog = make([]*LogEntry, len(entries))
+ for i, entry := range entries {
+ lr.Backlog[i] = entry.external()
+ }
+ if stream {
+ lr.Stream = sub.dataC
+ lr.done = sub.doneC
+ lr.missed = &sub.missed
+ }
+ return lr, nil
+}
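+
+// As an illustrative sketch, a combined backlog read and stream follow might
+// look as follows (tree and the error handling are assumptions):
+//
+//   reader, err := tree.Read("foo.bar", WithChildren(), WithBacklog(BacklogAllAvailable), WithStream())
+//   if err != nil { /* ... */ }
+//   defer reader.Close()
+//   for _, e := range reader.Backlog {
+//       fmt.Println(e.String())
+//   }
+//   for e := range reader.Stream {
+//       fmt.Println(e.String())
+//   }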
diff --git a/osbase/logtree/logtree_entry.go b/osbase/logtree/logtree_entry.go
new file mode 100644
index 0000000..5fcb392
--- /dev/null
+++ b/osbase/logtree/logtree_entry.go
@@ -0,0 +1,259 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logtree
+
+import (
+ "fmt"
+ "strings"
+
+ "github.com/mitchellh/go-wordwrap"
+
+ "source.monogon.dev/osbase/logbuffer"
+ lpb "source.monogon.dev/osbase/logtree/proto"
+)
+
+// LogEntry contains a log entry, combining both leveled and raw logging into a
+// single stream of events. A LogEntry will contain exactly one of either
+// LeveledPayload or RawPayload.
+type LogEntry struct {
+ // If non-nil, this is a leveled logging entry.
+ Leveled *LeveledPayload
+ // If non-nil, this is a raw logging entry line.
+ Raw *logbuffer.Line
+ // DN from which this entry was logged.
+ DN DN
+}
+
+// String returns a canonical representation of this payload as a single string
+// prefixed with metadata. If the entry is a leveled log entry that originally was
+// logged with newlines this representation will also contain newlines, with each
+// original message part prefixed by the metadata. For an alternative call that
+// will instead return a canonical prefix and a list of lines in the message, see
+// Strings().
+func (l *LogEntry) String() string {
+ if l.Leveled != nil {
+ prefix, messages := l.Leveled.Strings()
+ res := make([]string, len(messages))
+ for i, m := range messages {
+ res[i] = fmt.Sprintf("%-32s %s%s", l.DN, prefix, m)
+ }
+ return strings.Join(res, "\n")
+ }
+ if l.Raw != nil {
+ return fmt.Sprintf("%-32s R %s", l.DN, l.Raw)
+ }
+ return "INVALID"
+}
+
+// ConciseString returns a concise representation of this log entry for
+// constrained environments, like TTY consoles.
+//
+// The output format is as follows:
+//
+// shortened dn I Hello there
+// some component W Something went wrong
+// shortened dn I Goodbye there
+// external stuff R I am an external process using raw logging.
+//
+// The above output is the result of calling ConciseString on three different
+// LogEntries.
+//
+// If maxWidth is greater than zero, word wrapping will be applied. For example,
+// with maxWidth set to 40:
+//
+// shortened I Hello there
+// some component W Something went wrong and here are the very long details that
+// | describe this particular issue: according to all known laws of
+// | aviation, there is no way a bee should be able to fly.
+// shortened dn I Goodbye there
+// external stuff R I am an external process using raw logging.
+//
+// The above output is also the result of calling ConciseString on three
+// different LogEntries.
+//
+// Multi-line log entries will emit 'continuation' lines (with '|') in the same
+// way as word wrapping does. That means that even with word wrapping disabled,
+// the result of this function might be multiline.
+//
+// The width of the first column (the 'shortened DN' column) is automatically
+// selected based on maxWidth. If maxWidth is less than 60, the column will be
+// omitted. For example, with maxWidth set to 20:
+//
+// I Hello there
+// W Something went wrong and here are the very long details that
+// | describe this particular issue: according to all known laws of
+// | aviation, there is no way a bee should be able to fly.
+// I Goodbye there
+// R I am an external process using raw logging.
+//
+// The given `dict` implements simple replacement rules for shortening the DN
+// parts of a log entry's DN. Some rules are hardcoded for Metropolis' DN tree.
+// If no extra shortening rules should be applied, dict can be set to nil.
+func (l *LogEntry) ConciseString(dict ShortenDictionary, maxWidth int) string {
+ // Decide on a dnWidth.
+ dnWidth := 0
+ switch {
+ case maxWidth >= 80:
+ dnWidth = 20
+ case maxWidth >= 60:
+ dnWidth = 16
+ case maxWidth <= 0:
+ // No word wrapping.
+ dnWidth = 20
+ }
+
+ // Compute shortened DN, if needed.
+ sh := ""
+ if dnWidth > 0 {
+ sh = l.DN.Shorten(dict, dnWidth)
+ sh = fmt.Sprintf("%*s ", dnWidth, sh)
+ }
+
+ // Prefix of the first line emitted.
+ var prefix string
+ switch {
+ case l.Leveled != nil:
+ prefix = sh + string(l.Leveled.Severity()) + " "
+ case l.Raw != nil:
+ prefix = sh + "R "
+ }
+ // Prefix of rest of lines emitted.
+ continuationPrefix := strings.Repeat(" ", len(sh)) + "| "
+
+ // Collect lines based on the type of LogEntry.
+ var lines []string
+ collect := func(message string) {
+ if maxWidth > 0 {
+ message = wordwrap.WrapString(message, uint(maxWidth-len(prefix)))
+ }
+ for _, m2 := range strings.Split(message, "\n") {
+ if len(m2) == 0 {
+ continue
+ }
+ if len(lines) == 0 {
+ lines = append(lines, prefix+m2)
+ } else {
+ lines = append(lines, continuationPrefix+m2)
+ }
+ }
+ }
+ switch {
+ case l.Leveled != nil:
+ _, messages := l.Leveled.Strings()
+ for _, m := range messages {
+ collect(m)
+ }
+ case l.Raw != nil:
+ collect(l.Raw.String())
+ default:
+ return ""
+ }
+
+ return strings.Join(lines, "\n")
+}
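+
+// As an illustrative sketch, rendering an entry for an 80-column console
+// using the package's Metropolis shortening rules:
+//
+//   fmt.Println(entry.ConciseString(MetropolisShortenDict, 80))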
+
+// Strings returns the canonical representation of this payload split into a
+// prefix and all lines that were contained in the original message. This is
+// meant to be displayed to the user by showing the prefix before each line,
+// concatenated together - possibly in a table form with the prefixes all
+// unified with a rowspan-like mechanism.
+//
+// For example, this function can return:
+// prefix = "root.foo.bar I1102 17:20:06.921395 0 foo.go:42] "
+// lines = []string{"current tags:", " - one", " - two"}
+//
+// With this data, the result should be presented to users this way in text form:
+// root.foo.bar I1102 17:20:06.921395 foo.go:42] current tags:
+// root.foo.bar I1102 17:20:06.921395 foo.go:42] - one
+// root.foo.bar I1102 17:20:06.921395 foo.go:42] - two
+//
+// Or, in a table layout:
+// .----------------------------------------------------------------------.
+// | root.foo.bar I1102 17:20:06.921395 foo.go:42] : current tags: |
+// | :------------------|
+// | : - one |
+// | :------------------|
+// | : - two |
+// '----------------------------------------------------------------------'
+func (l *LogEntry) Strings() (prefix string, lines []string) {
+ if l.Leveled != nil {
+ prefix, messages := l.Leveled.Strings()
+ prefix = fmt.Sprintf("%-32s %s", l.DN, prefix)
+ return prefix, messages
+ }
+ if l.Raw != nil {
+ return fmt.Sprintf("%-32s R ", l.DN), []string{l.Raw.Data}
+ }
+ return "INVALID ", []string{"INVALID"}
+}
+
+// Proto converts this LogEntry to proto. Returned value may be nil if given
+// LogEntry is invalid, eg. contains neither a Raw nor Leveled entry.
+func (l *LogEntry) Proto() *lpb.LogEntry {
+ p := &lpb.LogEntry{
+ Dn: string(l.DN),
+ }
+ switch {
+ case l.Leveled != nil:
+ leveled := l.Leveled
+ p.Kind = &lpb.LogEntry_Leveled_{
+ Leveled: leveled.Proto(),
+ }
+ case l.Raw != nil:
+ raw := l.Raw
+ p.Kind = &lpb.LogEntry_Raw_{
+ Raw: raw.ProtoLog(),
+ }
+ default:
+ return nil
+ }
+ return p
+}
+
+// LogEntryFromProto parses a proto LogEntry back into internal structure.
+// This can be used in log proto API consumers to easily print received log
+// entries.
+func LogEntryFromProto(l *lpb.LogEntry) (*LogEntry, error) {
+ dn := DN(l.Dn)
+ if _, err := dn.Path(); err != nil {
+ return nil, fmt.Errorf("could not convert DN: %w", err)
+ }
+ res := &LogEntry{
+ DN: dn,
+ }
+ switch inner := l.Kind.(type) {
+ case *lpb.LogEntry_Leveled_:
+ leveled, err := LeveledPayloadFromProto(inner.Leveled)
+ if err != nil {
+ return nil, fmt.Errorf("could not convert leveled entry: %w", err)
+ }
+ res.Leveled = leveled
+ case *lpb.LogEntry_Raw_:
+ line, err := logbuffer.LineFromLogProto(inner.Raw)
+ if err != nil {
+ return nil, fmt.Errorf("could not convert raw entry: %w", err)
+ }
+ res.Raw = line
+ default:
+ return nil, fmt.Errorf("proto has neither Leveled nor Raw set")
+ }
+ return res, nil
+}
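+
+// As an illustrative sketch, a log API consumer might print received entries
+// like this (protoEntry is an assumption):
+//
+//   entry, err := LogEntryFromProto(protoEntry)
+//   if err != nil {
+//       // Skip malformed entries.
+//   } else {
+//       fmt.Println(entry.String())
+//   }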
diff --git a/osbase/logtree/logtree_publisher.go b/osbase/logtree/logtree_publisher.go
new file mode 100644
index 0000000..6c4120a
--- /dev/null
+++ b/osbase/logtree/logtree_publisher.go
@@ -0,0 +1,229 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logtree
+
+import (
+ "fmt"
+ "io"
+ "runtime"
+ "strings"
+ "time"
+
+ "source.monogon.dev/osbase/logbuffer"
+)
+
+type leveledPublisher struct {
+ node *node
+ depth int
+}
+
+// LeveledFor returns a LeveledLogger publishing interface for a given DN. An error
+// may be returned if the DN is malformed.
+func (l *LogTree) LeveledFor(dn DN) (LeveledLogger, error) {
+ node, err := l.nodeByDN(dn)
+ if err != nil {
+ return nil, err
+ }
+ return &leveledPublisher{
+ node: node,
+ depth: 0,
+ }, nil
+}
+
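+// RawFor returns an io.Writer for publishing raw log data at a given DN. An
+// error may be returned if the DN is malformed.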
+func (l *LogTree) RawFor(dn DN) (io.Writer, error) {
+ node, err := l.nodeByDN(dn)
+ if err != nil {
+ return nil, fmt.Errorf("could not retrieve raw logger: %w", err)
+ }
+ return node.rawLineBuffer, nil
+}
+
+// MustLeveledFor returns a LeveledLogger publishing interface for a given DN, or
+// panics if the given DN is invalid.
+func (l *LogTree) MustLeveledFor(dn DN) LeveledLogger {
+ leveled, err := l.LeveledFor(dn)
+ if err != nil {
+ panic(fmt.Errorf("LeveledFor returned: %w", err))
+ }
+ return leveled
+}
+
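+// MustRawFor returns an io.Writer for publishing raw log data at a given DN,
+// or panics if the given DN is invalid.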
+func (l *LogTree) MustRawFor(dn DN) io.Writer {
+ raw, err := l.RawFor(dn)
+ if err != nil {
+ panic(fmt.Errorf("RawFor returned: %w", err))
+ }
+ return raw
+}
+
+// SetVerbosity sets the verbosity for a given DN (non-recursively, ie. for that DN
+// only, not its children).
+func (l *LogTree) SetVerbosity(dn DN, level VerbosityLevel) error {
+ node, err := l.nodeByDN(dn)
+ if err != nil {
+ return err
+ }
+ node.verbosity = level
+ return nil
+}
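+
+// As an illustrative sketch, raising the verbosity of a DN enables its V(n)
+// logs up to that level:
+//
+//   tree.SetVerbosity("foo.bar", 3)
+//   tree.MustLeveledFor("foo.bar").V(3).Info("now recorded")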
+
+// logRaw is called by this node's LineBuffer any time a raw log line is completed.
+// It will create a new entry, append it to the journal, and notify all pertinent
+// subscribers.
+func (n *node) logRaw(line *logbuffer.Line) {
+ e := &entry{
+ origin: n.dn,
+ raw: line,
+ }
+ n.tree.journal.append(e)
+ n.tree.journal.notify(e)
+}
+
+// LogExternalLeveled injects an ExternalLeveledPayload into a given
+// LeveledLogger. This should only be used by systems which translate external
+// data sources into leveled logging - see ExternalLeveledPayload for more
+// information.
+func LogExternalLeveled(l LeveledLogger, e *ExternalLeveledPayload) error {
+ publisher, ok := l.(*leveledPublisher)
+ if !ok {
+ return fmt.Errorf("the given LeveledLogger is not a *leveledPublisher")
+ }
+ p := e.sanitize()
+ entry := &entry{
+ origin: publisher.node.dn,
+ leveled: p,
+ }
+ publisher.node.tree.journal.append(entry)
+ publisher.node.tree.journal.notify(entry)
+ return nil
+}
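+
+// As an illustrative sketch, a translator for an external log format might
+// inject entries as follows (the field values are assumptions):
+//
+//   err := LogExternalLeveled(tree.MustLeveledFor("external.foo"), &ExternalLeveledPayload{
+//       Message:  "something happened",
+//       Severity: WARNING,
+//   })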
+
+// logLeveled builds a LeveledPayload and entry for a given message, including all
+// related metadata. It will create a new entry, append it to the journal, and notify all
+// pertinent subscribers.
+func (l *leveledPublisher) logLeveled(depth int, severity Severity, msg string) {
+ _, file, line, ok := runtime.Caller(2 + depth)
+ if !ok {
+ file = "???"
+ line = 1
+ } else {
+ slash := strings.LastIndex(file, "/")
+ if slash >= 0 {
+ file = file[slash+1:]
+ }
+ }
+
+ // Remove leading/trailing newlines and split.
+ messages := strings.Split(strings.Trim(msg, "\n"), "\n")
+
+ p := &LeveledPayload{
+ timestamp: time.Now(),
+ severity: severity,
+ messages: messages,
+ file: file,
+ line: line,
+ }
+ e := &entry{
+ origin: l.node.dn,
+ leveled: p,
+ }
+ l.node.tree.journal.append(e)
+ l.node.tree.journal.notify(e)
+}
+
+// Info implements the LeveledLogger interface.
+func (l *leveledPublisher) Info(args ...interface{}) {
+ l.logLeveled(l.depth, INFO, fmt.Sprint(args...))
+}
+
+// Infof implements the LeveledLogger interface.
+func (l *leveledPublisher) Infof(format string, args ...interface{}) {
+ l.logLeveled(l.depth, INFO, fmt.Sprintf(format, args...))
+}
+
+// Warning implements the LeveledLogger interface.
+func (l *leveledPublisher) Warning(args ...interface{}) {
+ l.logLeveled(l.depth, WARNING, fmt.Sprint(args...))
+}
+
+// Warningf implements the LeveledLogger interface.
+func (l *leveledPublisher) Warningf(format string, args ...interface{}) {
+ l.logLeveled(l.depth, WARNING, fmt.Sprintf(format, args...))
+}
+
+// Error implements the LeveledLogger interface.
+func (l *leveledPublisher) Error(args ...interface{}) {
+ l.logLeveled(l.depth, ERROR, fmt.Sprint(args...))
+}
+
+// Errorf implements the LeveledLogger interface.
+func (l *leveledPublisher) Errorf(format string, args ...interface{}) {
+ l.logLeveled(l.depth, ERROR, fmt.Sprintf(format, args...))
+}
+
+// Fatal implements the LeveledLogger interface.
+func (l *leveledPublisher) Fatal(args ...interface{}) {
+ l.logLeveled(l.depth, FATAL, fmt.Sprint(args...))
+}
+
+// Fatalf implements the LeveledLogger interface.
+func (l *leveledPublisher) Fatalf(format string, args ...interface{}) {
+ l.logLeveled(l.depth, FATAL, fmt.Sprintf(format, args...))
+}
+
+// WithAddedStackDepth implements the LeveledLogger interface.
+func (l *leveledPublisher) WithAddedStackDepth(depth int) LeveledLogger {
+ l2 := *l
+ l2.depth += depth
+ return &l2
+}
+
+// V implements the LeveledLogger interface.
+func (l *leveledPublisher) V(v VerbosityLevel) VerboseLeveledLogger {
+ return &verbose{
+ publisher: l,
+ enabled: l.node.verbosity >= v,
+ }
+}
+
+// verbose implements the VerboseLeveledLogger interface. It is a thin wrapper
+// around a leveledPublisher, with an 'enabled' bool. This means that V(n)-returned
+// VerboseLeveledLoggers must be short lived, as a change in verbosity will not
+// affect all already existing VerboseLeveledLoggers.
+type verbose struct {
+ publisher *leveledPublisher
+ node *node
+ enabled bool
+}
+
+func (v *verbose) Enabled() bool {
+ return v.enabled
+}
+
+func (v *verbose) Info(args ...interface{}) {
+ if !v.enabled {
+ return
+ }
+ v.publisher.logLeveled(v.publisher.depth, INFO, fmt.Sprint(args...))
+}
+
+func (v *verbose) Infof(format string, args ...interface{}) {
+ if !v.enabled {
+ return
+ }
+ v.publisher.logLeveled(v.publisher.depth, INFO, fmt.Sprintf(format, args...))
+}
diff --git a/osbase/logtree/logtree_test.go b/osbase/logtree/logtree_test.go
new file mode 100644
index 0000000..54eabb7
--- /dev/null
+++ b/osbase/logtree/logtree_test.go
@@ -0,0 +1,386 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logtree
+
+import (
+ "fmt"
+ "strings"
+ "testing"
+ "time"
+)
+
+func expect(tree *LogTree, t *testing.T, dn DN, entries ...string) string {
+ t.Helper()
+ res, err := tree.Read(dn, WithChildren(), WithBacklog(BacklogAllAvailable))
+ if err != nil {
+ t.Fatalf("Read: %v", err)
+ }
+ defer res.Close()
+ if want, got := len(entries), len(res.Backlog); want != got {
+ t.Fatalf("wanted %v backlog entries, got %v", want, got)
+ }
+ got := make(map[string]bool)
+ for _, entry := range res.Backlog {
+ if entry.Leveled != nil {
+ got[entry.Leveled.MessagesJoined()] = true
+ }
+ if entry.Raw != nil {
+ got[entry.Raw.Data] = true
+ }
+ }
+ for _, entry := range entries {
+ if !got[entry] {
+ return fmt.Sprintf("missing entry %q", entry)
+ }
+ }
+ return ""
+}
+
+func readBacklog(tree *LogTree, t *testing.T, dn DN, backlog int, recursive bool) []string {
+ t.Helper()
+ opts := []LogReadOption{
+ WithBacklog(backlog),
+ }
+ if recursive {
+ opts = append(opts, WithChildren())
+ }
+ res, err := tree.Read(dn, opts...)
+ if err != nil {
+ t.Fatalf("Read: %v", err)
+ }
+ defer res.Close()
+
+ var lines []string
+ for _, e := range res.Backlog {
+ lines = append(lines, e.Leveled.Messages()...)
+ }
+ return lines
+}
+
+func TestMultiline(t *testing.T) {
+ tree := New()
+ // Two lines in a single message.
+ tree.MustLeveledFor("main").Info("foo\nbar")
+ // Two lines in a single message with a hanging newline that should get stripped.
+ tree.MustLeveledFor("main").Info("one\ntwo\n")
+
+ if res := expect(tree, t, "main", "foo\nbar", "one\ntwo"); res != "" {
+ t.Errorf("retrieval at main failed: %s", res)
+ }
+}
+
+func TestBacklogAll(t *testing.T) {
+ tree := New()
+ tree.MustLeveledFor("main").Info("hello, main!")
+ tree.MustLeveledFor("main.foo").Info("hello, main.foo!")
+ tree.MustLeveledFor("main.bar").Info("hello, main.bar!")
+ tree.MustLeveledFor("aux").Info("hello, aux!")
+ // No newline at the last entry - shouldn't get propagated to the backlog.
+ fmt.Fprintf(tree.MustRawFor("aux.process"), "processing foo\nprocessing bar\nbaz")
+
+ if res := expect(tree, t, "main", "hello, main!", "hello, main.foo!", "hello, main.bar!"); res != "" {
+ t.Errorf("retrieval at main failed: %s", res)
+ }
+ if res := expect(tree, t, "", "hello, main!", "hello, main.foo!", "hello, main.bar!", "hello, aux!", "processing foo", "processing bar"); res != "" {
+ t.Errorf("retrieval at root failed: %s", res)
+ }
+ if res := expect(tree, t, "aux", "hello, aux!", "processing foo", "processing bar"); res != "" {
+ t.Errorf("retrieval at aux failed: %s", res)
+ }
+}
+
+func TestBacklogExact(t *testing.T) {
+ tree := New()
+ tree.MustLeveledFor("main").Info("hello, main!")
+ tree.MustLeveledFor("main.foo").Info("hello, main.foo!")
+ tree.MustLeveledFor("main.bar").Info("hello, main.bar!")
+ tree.MustLeveledFor("main.bar.chatty").Info("hey there how are you")
+ tree.MustLeveledFor("main.bar.quiet").Info("fine how are you")
+ tree.MustLeveledFor("main.bar.chatty").Info("i've been alright myself")
+ tree.MustLeveledFor("main.bar.chatty").Info("but to tell you honestly...")
+ tree.MustLeveledFor("main.bar.chatty").Info("i feel like i'm stuck?")
+ tree.MustLeveledFor("main.bar.quiet").Info("mhm")
+ tree.MustLeveledFor("main.bar.chatty").Info("like you know what i'm saying, stuck in like")
+ tree.MustLeveledFor("main.bar.chatty").Info("like a go test?")
+ tree.MustLeveledFor("main.bar.quiet").Info("yeah totally")
+ tree.MustLeveledFor("main.bar.chatty").Info("it's hard to put my finger on it")
+ tree.MustLeveledFor("main.bar.chatty").Info("anyway, how's the wife doing?")
+
+ check := func(a []string, b ...string) {
+ t.Helper()
+ if len(a) != len(b) {
+ t.Errorf("Legth mismatch: wanted %d, got %d", len(b), len(a))
+ }
+ count := len(a)
+ if len(b) < count {
+ count = len(b)
+ }
+ for i := 0; i < count; i++ {
+ if want, got := b[i], a[i]; want != got {
+ t.Errorf("Message %d: wanted %q, got %q", i, want, got)
+ }
+ }
+ }
+
+ check(readBacklog(tree, t, "main", 3, true), "yeah totally", "it's hard to put my finger on it", "anyway, how's the wife doing?")
+ check(readBacklog(tree, t, "main.foo", 3, false), "hello, main.foo!")
+ check(readBacklog(tree, t, "main.bar.quiet", 2, true), "mhm", "yeah totally")
+}
+
+func TestStream(t *testing.T) {
+ tree := New()
+ tree.MustLeveledFor("main").Info("hello, backlog")
+ fmt.Fprintf(tree.MustRawFor("main.process"), "hello, raw backlog\n")
+
+ res, err := tree.Read("", WithBacklog(BacklogAllAvailable), WithChildren(), WithStream())
+ if err != nil {
+ t.Fatalf("Read: %v", err)
+ }
+ defer res.Close()
+ if want, got := 2, len(res.Backlog); want != got {
+ t.Errorf("wanted %d backlog item, got %d", want, got)
+ }
+
+ tree.MustLeveledFor("main").Info("hello, stream")
+ fmt.Fprintf(tree.MustRawFor("main.raw"), "hello, raw stream\n")
+
+ entries := make(map[string]bool)
+ timeout := time.After(time.Second * 1)
+ for {
+ done := false
+ select {
+ case <-timeout:
+ done = true
+ case p := <-res.Stream:
+ if p.Leveled != nil {
+ entries[p.Leveled.MessagesJoined()] = true
+ }
+ if p.Raw != nil {
+ entries[p.Raw.Data] = true
+ }
+ }
+ if done {
+ break
+ }
+ }
+ if entry := "hello, stream"; !entries[entry] {
+ t.Errorf("Missing entry %q", entry)
+ }
+ if entry := "hello, raw stream"; !entries[entry] {
+ t.Errorf("Missing entry %q", entry)
+ }
+}
+
+func TestVerbose(t *testing.T) {
+ tree := New()
+
+ tree.MustLeveledFor("main").V(10).Info("this shouldn't get logged")
+
+ reader, err := tree.Read("", WithBacklog(BacklogAllAvailable), WithChildren())
+ if err != nil {
+ t.Fatalf("Read: %v", err)
+ }
+ if want, got := 0, len(reader.Backlog); want != got {
+ t.Fatalf("expected nothing to be logged, got %+v", reader.Backlog)
+ }
+
+ tree.SetVerbosity("main", 10)
+ tree.MustLeveledFor("main").V(10).Info("this should get logged")
+
+ reader, err = tree.Read("", WithBacklog(BacklogAllAvailable), WithChildren())
+ if err != nil {
+ t.Fatalf("Read: %v", err)
+ }
+ if want, got := 1, len(reader.Backlog); want != got {
+ t.Fatalf("expected %d entries to get logged, got %d", want, got)
+ }
+}
+
+func TestMetadata(t *testing.T) {
+ tree := New()
+ tree.MustLeveledFor("main").Error("i am an error")
+ tree.MustLeveledFor("main").Warning("i am a warning")
+ tree.MustLeveledFor("main").Info("i am informative")
+ tree.MustLeveledFor("main").V(0).Info("i am a zero-level debug")
+
+ reader, err := tree.Read("", WithChildren(), WithBacklog(BacklogAllAvailable))
+ if err != nil {
+ t.Fatalf("Read: %v", err)
+ }
+ if want, got := 4, len(reader.Backlog); want != got {
+ t.Fatalf("expected %d entries, got %d", want, got)
+ }
+
+ for _, te := range []struct {
+ ix int
+ severity Severity
+ message string
+ }{
+ {0, ERROR, "i am an error"},
+ {1, WARNING, "i am a warning"},
+ {2, INFO, "i am informative"},
+ {3, INFO, "i am a zero-level debug"},
+ } {
+ p := reader.Backlog[te.ix]
+ if want, got := te.severity, p.Leveled.Severity(); want != got {
+ t.Errorf("wanted element %d to have severity %s, got %s", te.ix, want, got)
+ }
+ if want, got := te.message, p.Leveled.MessagesJoined(); want != got {
+ t.Errorf("wanted element %d to have message %q, got %q", te.ix, want, got)
+ }
+ if want, got := "logtree_test.go", strings.Split(p.Leveled.Location(), ":")[0]; want != got {
+ t.Errorf("wanted element %d to have file %q, got %q", te.ix, want, got)
+ }
+ }
+}
+
+func TestSeverity(t *testing.T) {
+ tree := New()
+ tree.MustLeveledFor("main").Error("i am an error")
+ tree.MustLeveledFor("main").Warning("i am a warning")
+ tree.MustLeveledFor("main").Info("i am informative")
+ tree.MustLeveledFor("main").V(0).Info("i am a zero-level debug")
+
+ reader, err := tree.Read("main", WithBacklog(BacklogAllAvailable), LeveledWithMinimumSeverity(WARNING))
+ if err != nil {
+ t.Fatalf("Read: %v", err)
+ }
+ if want, got := 2, len(reader.Backlog); want != got {
+ t.Fatalf("wanted %d entries, got %d", want, got)
+ }
+ if want, got := "i am an error", reader.Backlog[0].Leveled.MessagesJoined(); want != got {
+ t.Fatalf("wanted entry %q, got %q", want, got)
+ }
+ if want, got := "i am a warning", reader.Backlog[1].Leveled.MessagesJoined(); want != got {
+ t.Fatalf("wanted entry %q, got %q", want, got)
+ }
+}
+
+func TestAddedStackDepth(t *testing.T) {
+ tree := New()
+ helper := func(msg string) {
+ tree.MustLeveledFor("main").WithAddedStackDepth(1).Infof("oh no: %s", msg)
+ }
+
+ // The next two lines are tested to be next to each other.
+ helper("it failed")
+ tree.MustLeveledFor("main").Infof("something else")
+
+ reader, err := tree.Read("main", WithBacklog(BacklogAllAvailable))
+ if err != nil {
+ t.Fatalf("Read: %v", err)
+ }
+ if want, got := 2, len(reader.Backlog); want != got {
+ t.Fatalf("wanted %d entries, got %d", want, got)
+ }
+ if want, got := "oh no: it failed", reader.Backlog[0].Leveled.MessagesJoined(); want != got {
+ t.Errorf("wanted entry %q, got %q", want, got)
+ }
+ if want, got := "something else", reader.Backlog[1].Leveled.MessagesJoined(); want != got {
+ t.Errorf("wanted entry %q, got %q", want, got)
+ }
+ if first, second := reader.Backlog[0].Leveled.line, reader.Backlog[1].Leveled.line; first+1 != second {
+ t.Errorf("first entry at %d, second at %d, wanted one after the other", first, second)
+ }
+}
+
+func TestLogEntry_ConciseString(t *testing.T) {
+ trim := func(s string) string {
+ return strings.Trim(s, "\n")
+ }
+ for i, te := range []struct {
+ entry *LogEntry
+ maxWidth int
+ want string
+ }{
+ {
+ &LogEntry{
+ Leveled: &LeveledPayload{
+ messages: []string{"Hello there!"},
+ severity: WARNING,
+ },
+ DN: "root.role.kubernetes.run.kubernetes.apiserver",
+ },
+ 120,
+ " k8s apiserver W Hello there!",
+ },
+ {
+ &LogEntry{
+ Leveled: &LeveledPayload{
+ messages: []string{"Hello there!", "I am multiline."},
+ severity: WARNING,
+ },
+ DN: "root.role.kubernetes.run.kubernetes.apiserver",
+ },
+ 120,
+ trim(`
+ k8s apiserver W Hello there!
+ | I am multiline.
+`),
+ },
+ {
+ &LogEntry{
+ Leveled: &LeveledPayload{
+ messages: []string{"Hello there! I am a very long string, and I will get wrapped to 120 columns because that's just how life is for long strings."},
+ severity: WARNING,
+ },
+ DN: "root.role.kubernetes.run.kubernetes.apiserver",
+ },
+ 120,
+ trim(`
+ k8s apiserver W Hello there! I am a very long string, and I will get wrapped to 120 columns because that's just
+ | how life is for long strings.
+`),
+ },
+ {
+ &LogEntry{
+ Leveled: &LeveledPayload{
+ messages: []string{"Hello there!"},
+ severity: WARNING,
+ },
+ DN: "root.role.kubernetes.run.kubernetes.apiserver",
+ },
+ 60,
+ trim(`
+ k8s apiserver W Hello there!
+`),
+ },
+ {
+ &LogEntry{
+ Leveled: &LeveledPayload{
+ messages: []string{"Hello there!"},
+ severity: WARNING,
+ },
+ DN: "root.role.kubernetes.run.kubernetes.apiserver",
+ },
+ 40,
+ "W Hello there!",
+ },
+ } {
+ got := te.entry.ConciseString(MetropolisShortenDict, te.maxWidth)
+ for _, line := range strings.Split(got, "\n") {
+ if want, got := te.maxWidth, len(line); got > want {
+ t.Errorf("Case %d, line %q too long (%d bytes, wanted at most %d)", i, line, got, want)
+ }
+ }
+ if te.want != got {
+ t.Errorf("Case %d, message diff", i)
+ t.Logf("Wanted:\n%s", te.want)
+ t.Logf("Got:\n%s", got)
+ }
+ }
+}
diff --git a/osbase/logtree/proto/BUILD.bazel b/osbase/logtree/proto/BUILD.bazel
new file mode 100644
index 0000000..92381f1
--- /dev/null
+++ b/osbase/logtree/proto/BUILD.bazel
@@ -0,0 +1,24 @@
+load("@rules_proto//proto:defs.bzl", "proto_library")
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library")
+
+proto_library(
+ name = "proto_proto",
+ srcs = ["logtree.proto"],
+ visibility = ["//visibility:public"],
+ deps = ["@com_google_protobuf//:timestamp_proto"],
+)
+
+go_proto_library(
+ name = "proto_go_proto",
+ importpath = "source.monogon.dev/osbase/logtree/proto",
+ proto = ":proto_proto",
+ visibility = ["//visibility:public"],
+)
+
+go_library(
+ name = "proto",
+ embed = [":proto_go_proto"],
+ importpath = "source.monogon.dev/osbase/logtree/proto",
+ visibility = ["//visibility:public"],
+)
diff --git a/osbase/logtree/proto/gomod-generated-placeholder.go b/osbase/logtree/proto/gomod-generated-placeholder.go
new file mode 100644
index 0000000..92256db
--- /dev/null
+++ b/osbase/logtree/proto/gomod-generated-placeholder.go
@@ -0,0 +1 @@
+package proto
diff --git a/osbase/logtree/proto/logtree.proto b/osbase/logtree/proto/logtree.proto
new file mode 100644
index 0000000..bf341d4
--- /dev/null
+++ b/osbase/logtree/proto/logtree.proto
@@ -0,0 +1,59 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+package osbase.pkg.logtree.proto;
+option go_package = "source.monogon.dev/osbase/logtree/proto";
+
+import "google/protobuf/timestamp.proto";
+
+// Severity level corresponding to //osbase/logtree.Severity.
+enum LeveledLogSeverity {
+ INVALID = 0;
+ INFO = 1;
+ WARNING = 2;
+ ERROR = 3;
+ FATAL = 4;
+}
+
+// LogEntry corresponding to logtree.LogEntry in //osbase/logtree.
+message LogEntry {
+ // A leveled log entry emitted from a compatible system, eg. Metropolis code
+ // or a klog-parsed line.
+ message Leveled {
+ repeated string lines = 1;
+ google.protobuf.Timestamp timestamp = 2;
+ LeveledLogSeverity severity = 3;
+ // Source of the log entry, expressed as file:line.
+ string location = 4;
+ }
+ // Raw log entry, captured from an external system without parsing. Might
+ // contain some timestamp/level/origin information embedded in data. Data
+ // contained within should be treated as unsanitized external data.
+ message Raw {
+ string data = 1;
+ // Original length of line, set if data was truncated.
+ int64 original_length = 2;
+ }
+
+ // Origin DN (Distinguished Name), a unique identifier which is provided by
+ // the supervisor system.
+ string dn = 1;
+ oneof kind {
+ Leveled leveled = 2;
+ Raw raw = 3;
+ }
+}
diff --git a/osbase/logtree/testhelpers.go b/osbase/logtree/testhelpers.go
new file mode 100644
index 0000000..45bcaf2
--- /dev/null
+++ b/osbase/logtree/testhelpers.go
@@ -0,0 +1,37 @@
+package logtree
+
+import (
+ "context"
+ "testing"
+)
+
+// PipeAllToTest starts a goroutine that will forward all logtree entries to
+// t.Logf(), in the canonical logtree payload representation.
+//
+// It's designed to be used in tests, and will automatically stop when the
+// test/benchmark it's running in exits.
+func PipeAllToTest(t testing.TB, lt *LogTree) {
+ t.Helper()
+
+ reader, err := lt.Read("", WithChildren(), WithStream())
+ if err != nil {
+ t.Fatalf("Failed to set up logtree reader: %v", err)
+ }
+
+ // Internal context used to cancel the goroutine. This could also be
+ // implemented via a channel.
+ ctx, ctxC := context.WithCancel(context.Background())
+ t.Cleanup(ctxC)
+
+ go func() {
+ t.Helper()
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case p := <-reader.Stream:
+ t.Logf("%s", p.String())
+ }
+ }
+ }()
+}
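+
+// As an illustrative sketch of its intended use in a test:
+//
+//   func TestSomething(t *testing.T) {
+//       lt := New()
+//       PipeAllToTest(t, lt)
+//       // ... exercise code logging into lt; entries show up via t.Logf ...
+//   }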
diff --git a/osbase/logtree/unraw/BUILD.bazel b/osbase/logtree/unraw/BUILD.bazel
new file mode 100644
index 0000000..3ae4da1
--- /dev/null
+++ b/osbase/logtree/unraw/BUILD.bazel
@@ -0,0 +1,24 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(
+ name = "unraw",
+ srcs = ["unraw.go"],
+ importpath = "source.monogon.dev/osbase/logtree/unraw",
+ visibility = ["//visibility:public"],
+ deps = [
+ "//osbase/logbuffer",
+ "//osbase/logtree",
+ "//osbase/supervisor",
+ ],
+)
+
+go_test(
+ name = "unraw_test",
+ srcs = ["unraw_test.go"],
+ embed = [":unraw"],
+ deps = [
+ "//osbase/logbuffer",
+ "//osbase/logtree",
+ "//osbase/supervisor",
+ ],
+)
diff --git a/osbase/logtree/unraw/unraw.go b/osbase/logtree/unraw/unraw.go
new file mode 100644
index 0000000..5c4e8e9
--- /dev/null
+++ b/osbase/logtree/unraw/unraw.go
@@ -0,0 +1,148 @@
+// Package unraw implements a facility to convert raw logs from external
+// sources into leveled logs.
+//
+// This is not the same as raw logging inside the logtree, which exists to
+// ingest logs that are either fully arbitrary or do not map cleanly to the
+// leveled logging concept. The unraw library is instead made to parse logs
+// from systems that also use leveled logs internally, but emit them to a
+// serialized byte stream that then needs to be turned back into something
+// leveled inside Metropolis.
+//
+// Logs converted this way are unfortunately lossy and do not come with the
+// same guarantees as logs directly emitted via logtree. For example, there's
+// no built-in protection against systems emitting fudged timestamps or file
+// locations. Thus, this functionality should be used to interact with trusted
+// systems, not fully arbitrary logs.
+package unraw
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "os"
+ "sync"
+ "syscall"
+ "time"
+
+ "source.monogon.dev/osbase/logbuffer"
+ "source.monogon.dev/osbase/logtree"
+ "source.monogon.dev/osbase/supervisor"
+)
+
+// Parser is a user-defined function for converting a log line received from an
+// external system into a leveled logging payload.
+// The given LeveledWriter should be called for every leveled log entry that
+// results from this line. This means that a parser might skip some lines, or
+// emit multiple leveled payloads per line.
+type Parser func(*logbuffer.Line, LeveledWriter)
+
+// Converter is the main entrypoint of the unraw library. It wraps a
+// LeveledLogger in combination with a Parser to create an io.Writer that can
+// be sent raw log data.
+type Converter struct {
+ // Parser is the user-defined parsing function for converting log lines
+ // into leveled logging payloads. This must be set.
+ Parser Parser
+ // MaximumLineLength is the maximum length of a single log line when
+ // splitting incoming writes into lines. If a line is longer than this, it
+ // will be truncated (and will be sent to the Parser regardless).
+ //
+ // If not set, this defaults to 1024 bytes.
+ MaximumLineLength int
+ // LeveledLogger is the logtree leveled logger into which events from the
+ // Parser will be sent.
+ LeveledLogger logtree.LeveledLogger
+
+ // mu guards lb.
+ mu sync.Mutex
+ // lb is the underlying line buffer used to split incoming data into lines.
+ // It will be initialized on first Write.
+ lb *logbuffer.LineBuffer
+}
+
+// LeveledWriter is called by a Parser for every ExternalLeveledPayload it
+// wishes to emit into a backing LeveledLogger. If the payload is missing some
+// fields, these will default to some sensible values - see the
+// ExternalLeveledPayload structure definition for more information.
+type LeveledWriter func(*logtree.ExternalLeveledPayload)
+
+// Write implements io.Writer. Any write performed into the Converter will
+// populate the converter's internal buffer, and any time that buffer contains
+// a full line it will be sent over to the Parser for processing.
+func (e *Converter) Write(p []byte) (int, error) {
+ e.mu.Lock()
+ defer e.mu.Unlock()
+
+ if e.MaximumLineLength <= 0 {
+ e.MaximumLineLength = 1024
+ }
+ if e.lb == nil {
+ e.lb = logbuffer.NewLineBuffer(e.MaximumLineLength, func(l *logbuffer.Line) {
+ e.Parser(l, e.insert)
+ })
+ }
+ return e.lb.Write(p)
+}
+
+// insert implements LeveledWriter.
+func (e *Converter) insert(d *logtree.ExternalLeveledPayload) {
+ if err := logtree.LogExternalLeveled(e.LeveledLogger, d); err != nil {
+ e.LeveledLogger.Fatalf("Could not insert unrawed entry: %v", err)
+ }
+}
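+
+// As an illustrative sketch, wiring a Converter to a raw log source (the
+// parser shown just forwards lines verbatim; logger and src are assumptions):
+//
+//   conv := &Converter{
+//       Parser: func(l *logbuffer.Line, w LeveledWriter) {
+//           w(&logtree.ExternalLeveledPayload{Message: l.Data})
+//       },
+//       LeveledLogger: logger,
+//   }
+//   io.Copy(conv, src)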
+
+// NamedPipeReader returns a supervisor runnable that continuously reads logs
+// from the given path and attempts to parse them into leveled logs using this
+// Converter.
+//
+// If the given path doesn't exist, a named pipe will be created there before
+// the function exits. This guarantee means that as long as any writing process
+// is not started before NamedPipeReader returns ther is no need to
+// remove/recreate the named pipe.
+//
+// TODO(q3k): defer the creation of the FIFO to localstorage so this doesn't
+// need to be taken care of in the first place.
+func (e *Converter) NamedPipeReader(path string) (supervisor.Runnable, error) {
+ if _, err := os.Stat(path); os.IsNotExist(err) {
+ if err := syscall.Mkfifo(path, 0666); err != nil {
+ return nil, fmt.Errorf("when creating named pipe: %w", err)
+ }
+ }
+ return func(ctx context.Context) error {
+ fifo, err := os.OpenFile(path, os.O_RDONLY, os.ModeNamedPipe)
+ if err != nil {
+ return fmt.Errorf("when opening named pipe: %w", err)
+ }
+ go func() {
+ <-ctx.Done()
+ fifo.Close()
+ }()
+ defer fifo.Close()
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ for {
+ // Quit if requested.
+ if ctx.Err() != nil {
+ return ctx.Err()
+ }
+
+ n, err := io.Copy(e, fifo)
+ if n == 0 && err == nil {
+ // Hack because pipes/FIFOs can return zero reads when nobody
+ // is writing. To avoid busy-looping, sleep a bit before
+ // retrying. This does not lose data since the FIFO internal
+ // buffer will stall writes when it becomes full. 10ms maximum
+ // stall in a non-latency critical process (reading debug logs)
+ // is not an issue for us.
+ time.Sleep(10 * time.Millisecond)
+ } else if err != nil {
+ // Since we close fifo on context cancel, we'll get a 'file is already closed'
+ // io error here. Translate that over to the context error that caused it.
+ if ctx.Err() != nil {
+ return ctx.Err()
+ }
+ return fmt.Errorf("log pump failed: %w", err)
+ }
+ }
+ }, nil
+}
diff --git a/osbase/logtree/unraw/unraw_test.go b/osbase/logtree/unraw/unraw_test.go
new file mode 100644
index 0000000..994e55e
--- /dev/null
+++ b/osbase/logtree/unraw/unraw_test.go
@@ -0,0 +1,138 @@
+package unraw
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "os"
+ "syscall"
+ "testing"
+
+ "source.monogon.dev/osbase/logbuffer"
+ "source.monogon.dev/osbase/logtree"
+ "source.monogon.dev/osbase/supervisor"
+)
+
+func testParser(l *logbuffer.Line, w LeveledWriter) {
+ w(&logtree.ExternalLeveledPayload{
+ Message: l.Data,
+ })
+}
+
+func TestNamedPipeReader(t *testing.T) {
+ dir, err := os.MkdirTemp("/tmp", "metropolis-test-named-pipe-reader")
+ if err != nil {
+ t.Fatalf("could not create tempdir: %v", err)
+ }
+ defer os.RemoveAll(dir)
+ fifoPath := dir + "/fifo"
+
+ // Start named pipe reader.
+ started := make(chan struct{})
+ stop, lt := supervisor.TestHarness(t, func(ctx context.Context) error {
+ converter := Converter{
+ Parser: testParser,
+ LeveledLogger: supervisor.Logger(ctx),
+ }
+
+ r, err := converter.NamedPipeReader(fifoPath)
+ if err != nil {
+ return fmt.Errorf("could not create pipe reader: %w", err)
+ }
+ close(started)
+ return r(ctx)
+ })
+
+ <-started
+
+ // Open FIFO...
+ f, err := os.OpenFile(fifoPath, os.O_WRONLY, 0)
+ if err != nil {
+ t.Fatalf("could not open fifo: %v", err)
+ }
+
+ // Start reading all logs.
+ reader, err := lt.Read("root", logtree.WithChildren(), logtree.WithStream())
+ if err != nil {
+ t.Fatalf("could not get logtree reader: %v", err)
+ }
+ defer reader.Close()
+
+ // Write two lines to the fifo.
+ fmt.Fprintf(f, "foo\nbar\n")
+ f.Close()
+
+ // Expect lines to end up in logtree.
+ if got, want := (<-reader.Stream).Leveled.MessagesJoined(), "foo"; want != got {
+ t.Errorf("expected first message to be %q, got %q", want, got)
+ }
+ if got, want := (<-reader.Stream).Leveled.MessagesJoined(), "bar"; want != got {
+ t.Errorf("expected second message to be %q, got %q", want, got)
+ }
+
+ // Fully restart the entire supervisor and pipe reader, then redo the
+ // test; things should continue to work.
+ stop()
+
+ // Block until the FIFO isn't being read anymore. This ensures that the
+ // old NamedPipeReader actually stopped running; otherwise the following
+ // write to the fifo could race by going to the old NamedPipeReader and
+ // making the test time out. This can also happen in production, but there
+ // it just causes us to lose piped data in the very small race window when
+ // this can happen (statistically in this test, <0.1%).
+ //
+ // The check is done by opening the FIFO in 'non-blocking write mode',
+ // which fails immediately with ENXIO if the FIFO has no reader, and
+ // succeeds otherwise.
+ for {
+ ft, err := os.OpenFile(fifoPath, os.O_WRONLY|syscall.O_NONBLOCK, 0)
+ if err == nil {
+ // The FIFO still has a reader, keep trying.
+ ft.Close()
+ } else if errors.Is(err, syscall.ENXIO) {
+ // No reader anymore, break.
+ break
+ } else {
+ // Something else?
+ t.Fatalf("OpenFile(%q): %v", fifoPath, err)
+ }
+ }
+
+ started = make(chan struct{})
+ stop, lt = supervisor.TestHarness(t, func(ctx context.Context) error {
+ converter := Converter{
+ Parser: testParser,
+ LeveledLogger: supervisor.Logger(ctx),
+ }
+
+ r, err := converter.NamedPipeReader(fifoPath)
+ if err != nil {
+ return fmt.Errorf("could not create pipe reader: %w", err)
+ }
+ close(started)
+ return r(ctx)
+ })
+ defer stop()
+
+ <-started
+
+ // Start reading all logs.
+ reader, err = lt.Read("root", logtree.WithChildren(), logtree.WithStream())
+ if err != nil {
+ t.Fatalf("could not get logtree reader: %v", err)
+ }
+ defer reader.Close()
+
+ // Write line to the fifo.
+ f, err = os.OpenFile(fifoPath, os.O_WRONLY, 0)
+ if err != nil {
+ t.Fatalf("could not open fifo: %v", err)
+ }
+ fmt.Fprintf(f, "baz\n")
+ f.Close()
+
+ // Expect the line to end up in logtree.
+ if got, want := (<-reader.Stream).Leveled.MessagesJoined(), "baz"; want != got {
+ t.Errorf("expected message to be %q, got %q", want, got)
+ }
+}
diff --git a/osbase/logtree/zap.go b/osbase/logtree/zap.go
new file mode 100644
index 0000000..f3ae6e3
--- /dev/null
+++ b/osbase/logtree/zap.go
@@ -0,0 +1,141 @@
+package logtree
+
+import (
+ "encoding/json"
+ "fmt"
+ "strconv"
+ "strings"
+ "time"
+
+ "go.uber.org/zap"
+ "go.uber.org/zap/zapcore"
+
+ "source.monogon.dev/osbase/logbuffer"
+)
+
+// Zapify turns a LeveledLogger into a zap.Logger which pipes its output into the
+// LeveledLogger. The message, severity and caller are carried over. Extra fields
+// are appended as JSON to the end of the log line.
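+//
+// For example (mirroring this package's test):
+//
+//	lt := logtree.New()
+//	z := logtree.Zapify(lt.MustLeveledFor("zap"), zap.InfoLevel)
+//	z.Info("hello", zap.String("key", "value"), zap.Int("answer", 42))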
+func Zapify(logger LeveledLogger, minimumLevel zapcore.Level) *zap.Logger {
+ p, ok := logger.(*leveledPublisher)
+ if !ok {
+ // Fail fast, as this is a programming error.
+ panic("Expected *leveledPublisher in LeveledLogger from supervisor")
+ }
+
+ ec := zapcore.EncoderConfig{
+ MessageKey: "message",
+ LevelKey: "level",
+ TimeKey: "time",
+ CallerKey: "caller",
+ EncodeLevel: zapcore.LowercaseLevelEncoder,
+ EncodeTime: zapcore.EpochTimeEncoder,
+ EncodeCaller: zapcore.ShortCallerEncoder,
+ }
+ s := zapSink{
+ publisher: p,
+ }
+ s.buffer = logbuffer.NewLineBuffer(4096, s.consumeLine)
+ zc := zapcore.NewCore(zapcore.NewJSONEncoder(ec), s.buffer, minimumLevel)
+ return zap.New(zc, zap.AddCaller())
+}
+
+type zapSink struct {
+ publisher *leveledPublisher
+ buffer *logbuffer.LineBuffer
+}
+
+func (z *zapSink) consumeLine(l *logbuffer.Line) {
+ ze, err := parseZapJSON(l.Data)
+ if err != nil {
+ z.publisher.Warningf("failed to parse zap JSON: %v: %q", err, l.Data)
+ return
+ }
+ message := ze.message
+ if len(ze.extra) > 0 {
+ message += " " + ze.extra
+ }
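+ // Construct the journal entry directly instead of going through the
+ // publisher's logging methods, so that the timestamp and caller parsed
+ // from the zap entry are preserved.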
+ e := &entry{
+ origin: z.publisher.node.dn,
+ leveled: &LeveledPayload{
+ timestamp: ze.time,
+ severity: ze.severity,
+ messages: []string{message},
+ file: ze.file,
+ line: ze.line,
+ },
+ }
+ z.publisher.node.tree.journal.append(e)
+ z.publisher.node.tree.journal.notify(e)
+}
+
+type zapEntry struct {
+ message string
+ severity Severity
+ time time.Time
+ file string
+ line int
+ extra string
+}
+
+func parseZapJSON(s string) (*zapEntry, error) {
+ entry := make(map[string]any)
+ if err := json.Unmarshal([]byte(s), &entry); err != nil {
+ return nil, fmt.Errorf("invalid JSON: %v", err)
+ }
+ message, ok := entry["message"].(string)
+ if !ok {
+ return nil, fmt.Errorf("no message field")
+ }
+ level, ok := entry["level"].(string)
+ if !ok {
+ return nil, fmt.Errorf("no level field")
+ }
+ t, ok := entry["time"].(float64)
+ if !ok {
+ return nil, fmt.Errorf("no time field")
+ }
+ caller, ok := entry["caller"].(string)
+ if !ok {
+ return nil, fmt.Errorf("no caller field")
+ }
+
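+ // ShortCallerEncoder emits the caller as "pkg/file.go:123"; keep only the
+ // base file name and the line number (a non-numeric line parses as 0).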
+ callerParts := strings.Split(caller, ":")
+ if len(callerParts) != 2 {
+ return nil, fmt.Errorf("invalid caller")
+ }
+ callerDirFile := strings.Split(callerParts[0], "/")
+ callerFile := callerDirFile[len(callerDirFile)-1]
+ callerLineS := callerParts[1]
+ callerLine, _ := strconv.Atoi(callerLineS)
+
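+ // zap has more levels than logtree's severities, so collapse them: dpanic,
+ // panic and fatal all map to ERROR, and debug falls through to INFO.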
+ var severity Severity
+ switch level {
+ case "warn":
+ severity = WARNING
+ case "error", "dpanic", "panic", "fatal":
+ severity = ERROR
+ default:
+ severity = INFO
+ }
+
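+ // EpochTimeEncoder encodes the timestamp as fractional seconds since the
+ // Unix epoch; split it back into whole seconds and nanoseconds.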
+ secs := int64(t)
+ nsecs := int64((t - float64(secs)) * 1e9)
+
+ delete(entry, "message")
+ delete(entry, "level")
+ delete(entry, "time")
+ delete(entry, "caller")
+ var extra []byte
+ if len(entry) > 0 {
+ extra, _ = json.Marshal(entry)
+ }
+ return &zapEntry{
+ message: message,
+ severity: severity,
+ time: time.Unix(secs, nsecs),
+ file: callerFile,
+ line: callerLine,
+ extra: string(extra),
+ }, nil
+}
diff --git a/osbase/logtree/zap_test.go b/osbase/logtree/zap_test.go
new file mode 100644
index 0000000..3917cd8
--- /dev/null
+++ b/osbase/logtree/zap_test.go
@@ -0,0 +1,42 @@
+package logtree
+
+import (
+ "testing"
+
+ "go.uber.org/zap"
+)
+
+func TestZapify(t *testing.T) {
+ lt := New()
+
+ z := Zapify(lt.MustLeveledFor("zap"), zap.InfoLevel)
+ z.Info("foo", zap.String("strp", "strv"), zap.Int("intp", 42))
+ z.Warn("foo!", zap.String("strp", "strv"), zap.Int("intp", 1337))
+ z.Error("foo!!")
+
+ res, err := lt.Read("zap", WithBacklog(BacklogAllAvailable))
+ if err != nil {
+ t.Fatalf("Read: %v", err)
+ }
+ defer res.Close()
+
+ if want, got := 3, len(res.Backlog); want != got {
+ t.Errorf("Wanted %d entries, got %d", want, got)
+ } else {
+ for i, te := range []struct {
+ msg string
+ sev Severity
+ }{
+ {`foo {"intp":42,"strp":"strv"}`, INFO},
+ {`foo! {"intp":1337,"strp":"strv"}`, WARNING},
+ {`foo!!`, ERROR},
+ } {
+ if want, got := te.msg, res.Backlog[i].Leveled.messages[0]; want != got {
+ t.Errorf("Line %d: wanted message %q, got %q", i, want, got)
+ }
+ if want, got := te.sev, res.Backlog[i].Leveled.severity; want != got {
+ t.Errorf("Line %d: wanted level %s, got %s", i, want, got)
+ }
+ }
+ }
+}