logbuffer: split out LineBuffer
We want to be able to use similar line-oriented buffering in LogTree.
Rather than repeat ourselves, let's fact this out into a nice little
library.
Test Plan: Covered by existing logbuffer tests, added some extra linebuffer-specific ones.
X-Origin-Diff: phab/D636
GitOrigin-RevId: 38e832d323ed9f1723feaa9f9169caad18619e55
diff --git a/core/pkg/logbuffer/BUILD.bazel b/core/pkg/logbuffer/BUILD.bazel
index a53cb7a..fb7512a 100644
--- a/core/pkg/logbuffer/BUILD.bazel
+++ b/core/pkg/logbuffer/BUILD.bazel
@@ -2,14 +2,20 @@
go_library(
name = "go_default_library",
- srcs = ["logbuffer.go"],
+ srcs = [
+ "linebuffer.go",
+ "logbuffer.go",
+ ],
importpath = "git.monogon.dev/source/nexantic.git/core/pkg/logbuffer",
visibility = ["//visibility:public"],
)
go_test(
name = "go_default_test",
- srcs = ["logbuffer_test.go"],
+ srcs = [
+ "linebuffer_test.go",
+ "logbuffer_test.go",
+ ],
embed = [":go_default_library"],
deps = ["@com_github_stretchr_testify//require:go_default_library"],
)
diff --git a/core/pkg/logbuffer/linebuffer.go b/core/pkg/logbuffer/linebuffer.go
new file mode 100644
index 0000000..6ee7d6b
--- /dev/null
+++ b/core/pkg/logbuffer/linebuffer.go
@@ -0,0 +1,135 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logbuffer
+
+import (
+ "bytes"
+ "fmt"
+ "strings"
+ "sync"
+)
+
+// Line is a line stored in the log buffer - a string, that has been perhaps truncated (due to exceeded limits).
+type Line struct {
+ Data string
+ OriginalLength int
+}
+
+// Truncated returns whether this line has been truncated to fit limits.
+func (l *Line) Truncated() bool {
+ return l.OriginalLength > len(l.Data)
+}
+
+// String returns the line with an ellipsis at the end (...) if the line has been truncated, or the original line
+// otherwise.
+func (l *Line) String() string {
+ if l.Truncated() {
+ return l.Data + "..."
+ }
+ return l.Data
+}
+
+// LineBuffer is a io.WriteCloser that will call a given callback every time a line is completed.
+type LineBuffer struct {
+ maxLineLength int
+ cb LineBufferCallback
+
+ mu sync.Mutex
+ cur strings.Builder
+ // length is the length of the line currently being written - this will continue to increase, even if the string
+ // exceeds maxLineLength.
+ length int
+ closed bool
+}
+
+// LineBufferCallback is a callback that will get called any time the line is completed. The function must not cause another
+// write to the LineBuffer, or the program will deadlock.
+type LineBufferCallback func(*Line)
+
+// NewLineBuffer creates a new LineBuffer with a given line length limit and callback.
+func NewLineBuffer(maxLineLength int, cb LineBufferCallback) *LineBuffer {
+ return &LineBuffer{
+ maxLineLength: maxLineLength,
+ cb: cb,
+ }
+}
+
+// writeLimited writes to the internal buffer, making sure that its size does not exceed the maxLineLength.
+func (l *LineBuffer) writeLimited(data []byte) {
+ l.length += len(data)
+ if l.cur.Len()+len(data) > l.maxLineLength {
+ data = data[:l.maxLineLength-l.cur.Len()]
+ }
+ l.cur.Write(data)
+}
+
+// comitLine calls the callback and resets the builder.
+func (l *LineBuffer) commitLine() {
+ l.cb(&Line{
+ Data: l.cur.String(),
+ OriginalLength: l.length,
+ })
+ l.cur.Reset()
+ l.length = 0
+}
+
+func (l *LineBuffer) Write(data []byte) (int, error) {
+ var pos = 0
+
+ l.mu.Lock()
+ defer l.mu.Unlock()
+
+ if l.closed {
+ return 0, fmt.Errorf("closed")
+ }
+
+ for {
+ nextNewline := bytes.IndexRune(data[pos:], '\n')
+
+ // No newline in the data, write everything to the current line
+ if nextNewline == -1 {
+ l.writeLimited(data[pos:])
+ break
+ }
+
+ // Write this line and update position
+ l.writeLimited(data[pos : pos+nextNewline])
+ l.commitLine()
+ pos += nextNewline + 1
+
+ // Data ends with a newline, stop now without writing an empty line
+ if nextNewline == len(data)-1 {
+ break
+ }
+ }
+ return len(data), nil
+}
+
+// Close will emit any leftover data in the buffer to the callback. Subsequent calls to Write will fail. Subsequent calls to Close
+// will also fail.
+func (l *LineBuffer) Close() error {
+ if l.closed {
+ return fmt.Errorf("already closed")
+ }
+ l.mu.Lock()
+ defer l.mu.Unlock()
+ l.closed = true
+ if l.length > 0 {
+ l.commitLine()
+ }
+ return nil
+}
diff --git a/core/pkg/logbuffer/linebuffer_test.go b/core/pkg/logbuffer/linebuffer_test.go
new file mode 100644
index 0000000..c821a4b
--- /dev/null
+++ b/core/pkg/logbuffer/linebuffer_test.go
@@ -0,0 +1,75 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logbuffer
+
+import (
+ "fmt"
+ "testing"
+)
+
+func TestLineBuffer(t *testing.T) {
+ var lines []*Line
+ lb := NewLineBuffer(1024, func(l *Line) {
+ lines = append(lines, l)
+ })
+
+ compare := func(a []*Line, b ...string) string {
+ msg := fmt.Sprintf("want %v, got %v", a, b)
+ if len(a) != len(b) {
+ return msg
+ }
+ for i, _ := range a {
+ if a[i].String() != b[i] {
+ return msg
+ }
+ }
+ return ""
+ }
+
+ // Write some data.
+ fmt.Fprintf(lb, "foo ")
+ if diff := compare(lines); diff != "" {
+ t.Fatal(diff)
+ }
+ fmt.Fprintf(lb, "bar\n")
+ if diff := compare(lines, "foo bar"); diff != "" {
+ t.Fatal(diff)
+ }
+ fmt.Fprintf(lb, "baz")
+ if diff := compare(lines, "foo bar"); diff != "" {
+ t.Fatal(diff)
+ }
+ fmt.Fprintf(lb, " baz")
+ if diff := compare(lines, "foo bar"); diff != "" {
+ t.Fatal(diff)
+ }
+ // Close and expect flush.
+ if err := lb.Close(); err != nil {
+ t.Fatalf("Close: %v", err)
+ }
+ if diff := compare(lines, "foo bar", "baz baz"); diff != "" {
+ t.Fatal(diff)
+ }
+
+ // Check behaviour after close
+ if _, err := fmt.Fprintf(lb, "nope"); err == nil {
+ t.Fatalf("Write after Close: wanted error, got nil")
+ }
+ if err := lb.Close(); err == nil {
+ t.Fatalf("second Close: wanted error, got nil")
+ }
+}
diff --git a/core/pkg/logbuffer/logbuffer.go b/core/pkg/logbuffer/logbuffer.go
index 8298deb..ce47816 100644
--- a/core/pkg/logbuffer/logbuffer.go
+++ b/core/pkg/logbuffer/logbuffer.go
@@ -21,79 +21,32 @@
package logbuffer
import (
- "bytes"
- "strings"
"sync"
)
// LogBuffer implements a fixed-size in-memory ring buffer for line-separated logs
type LogBuffer struct {
- mu sync.RWMutex
- maxLineLength int
- content []Line
- length int
-
- currentLineBuilder strings.Builder
- currentLineWrittenLength int
+ mu sync.RWMutex
+ content []Line
+ length int
+ *LineBuffer
}
-type Line struct {
- Data string
- OriginalLength int
-}
-
+// New creates a new LogBuffer with a given ringbuffer size and maximum line length.
func New(size, maxLineLength int) *LogBuffer {
- return &LogBuffer{
- content: make([]Line, size),
- maxLineLength: maxLineLength,
+ lb := &LogBuffer{
+ content: make([]Line, size),
}
+ lb.LineBuffer = NewLineBuffer(maxLineLength, lb.lineCallback)
+ return lb
}
-func (b *LogBuffer) writeLimited(newData []byte) {
- builder := &b.currentLineBuilder
- b.currentLineWrittenLength += len(newData)
- if builder.Len()+len(newData) > b.maxLineLength {
- builder.Write(newData[:b.maxLineLength-builder.Len()])
- } else {
- builder.Write(newData)
- }
-}
-
-func (b *LogBuffer) commitLine() {
- b.content[b.length%len(b.content)] = Line{
- Data: b.currentLineBuilder.String(),
- OriginalLength: b.currentLineWrittenLength}
- b.length++
- b.currentLineBuilder.Reset()
- b.currentLineWrittenLength = 0
-}
-
-func (b *LogBuffer) Write(data []byte) (int, error) {
- var pos int = 0
-
+func (b *LogBuffer) lineCallback(line *Line) {
b.mu.Lock()
defer b.mu.Unlock()
- for {
- nextNewline := bytes.IndexRune(data[pos:], '\n')
-
- // No newline in the data, write everything to the current line
- if nextNewline == -1 {
- b.writeLimited(data[pos:])
- break
- }
-
- // Write this line and update position
- b.writeLimited(data[pos : pos+nextNewline])
- b.commitLine()
- pos += nextNewline + 1
-
- // Data ends with a newline, stop now without writing an empty line
- if nextNewline == len(data)-1 {
- break
- }
- }
- return len(data), nil
+ b.content[b.length%len(b.content)] = *line
+ b.length++
}
// capToContentLength caps the number of requested lines to what is actually available
@@ -128,6 +81,8 @@
// ReadLinesTruncated works exactly the same as ReadLines, but adds an ellipsis at the end of every
// line that was truncated because it was over MaxLineLength
func (b *LogBuffer) ReadLinesTruncated(n int, ellipsis string) []string {
+ b.mu.RLock()
+ defer b.mu.RUnlock()
// This does not use ReadLines() to prevent excessive reference copying and associated GC pressure
// since it could process a lot of lines.
@@ -136,11 +91,7 @@
outArray := make([]string, n)
for i := 1; i <= n; i++ {
line := b.content[(b.length-i)%len(b.content)]
- if line.OriginalLength > b.maxLineLength {
- outArray[n-i] = line.Data + ellipsis
- } else {
- outArray[n-i] = line.Data
- }
+ outArray[n-i] = line.String()
}
return outArray
}