logbuffer: split out LineBuffer

We want to be able to use similar line-oriented buffering in LogTree.
Rather than repeat ourselves, let's fact this out into a nice little
library.

Test Plan: Covered by existing logbuffer tests, added some extra linebuffer-specific ones.

X-Origin-Diff: phab/D636
GitOrigin-RevId: 38e832d323ed9f1723feaa9f9169caad18619e55
diff --git a/core/pkg/logbuffer/BUILD.bazel b/core/pkg/logbuffer/BUILD.bazel
index a53cb7a..fb7512a 100644
--- a/core/pkg/logbuffer/BUILD.bazel
+++ b/core/pkg/logbuffer/BUILD.bazel
@@ -2,14 +2,20 @@
 
 go_library(
     name = "go_default_library",
-    srcs = ["logbuffer.go"],
+    srcs = [
+        "linebuffer.go",
+        "logbuffer.go",
+    ],
     importpath = "git.monogon.dev/source/nexantic.git/core/pkg/logbuffer",
     visibility = ["//visibility:public"],
 )
 
 go_test(
     name = "go_default_test",
-    srcs = ["logbuffer_test.go"],
+    srcs = [
+        "linebuffer_test.go",
+        "logbuffer_test.go",
+    ],
     embed = [":go_default_library"],
     deps = ["@com_github_stretchr_testify//require:go_default_library"],
 )
diff --git a/core/pkg/logbuffer/linebuffer.go b/core/pkg/logbuffer/linebuffer.go
new file mode 100644
index 0000000..6ee7d6b
--- /dev/null
+++ b/core/pkg/logbuffer/linebuffer.go
@@ -0,0 +1,135 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logbuffer
+
+import (
+	"bytes"
+	"fmt"
+	"strings"
+	"sync"
+)
+
+// Line is a line stored in the log buffer - a string, that has been perhaps truncated (due to exceeded limits).
+type Line struct {
+	Data           string
+	OriginalLength int
+}
+
+// Truncated returns whether this line has been truncated to fit limits.
+func (l *Line) Truncated() bool {
+	return l.OriginalLength > len(l.Data)
+}
+
+// String returns the line with an ellipsis at the end (...) if the line has been truncated, or the original line
+// otherwise.
+func (l *Line) String() string {
+	if l.Truncated() {
+		return l.Data + "..."
+	}
+	return l.Data
+}
+
+// LineBuffer is a io.WriteCloser that will call a given callback every time a line is completed.
+type LineBuffer struct {
+	maxLineLength int
+	cb            LineBufferCallback
+
+	mu  sync.Mutex
+	cur strings.Builder
+	// length is the length of the line currently being written - this will continue to increase, even if the string
+	// exceeds maxLineLength.
+	length int
+	closed bool
+}
+
+// LineBufferCallback is a callback that will get called any time the line is completed. The function must not cause another
+// write to the LineBuffer, or the program will deadlock.
+type LineBufferCallback func(*Line)
+
+// NewLineBuffer creates a new LineBuffer with a given line length limit and callback.
+func NewLineBuffer(maxLineLength int, cb LineBufferCallback) *LineBuffer {
+	return &LineBuffer{
+		maxLineLength: maxLineLength,
+		cb:            cb,
+	}
+}
+
+// writeLimited writes to the internal buffer, making sure that its size does not exceed the maxLineLength.
+func (l *LineBuffer) writeLimited(data []byte) {
+	l.length += len(data)
+	if l.cur.Len()+len(data) > l.maxLineLength {
+		data = data[:l.maxLineLength-l.cur.Len()]
+	}
+	l.cur.Write(data)
+}
+
+// comitLine calls the callback and resets the builder.
+func (l *LineBuffer) commitLine() {
+	l.cb(&Line{
+		Data:           l.cur.String(),
+		OriginalLength: l.length,
+	})
+	l.cur.Reset()
+	l.length = 0
+}
+
+func (l *LineBuffer) Write(data []byte) (int, error) {
+	var pos = 0
+
+	l.mu.Lock()
+	defer l.mu.Unlock()
+
+	if l.closed {
+		return 0, fmt.Errorf("closed")
+	}
+
+	for {
+		nextNewline := bytes.IndexRune(data[pos:], '\n')
+
+		// No newline in the data, write everything to the current line
+		if nextNewline == -1 {
+			l.writeLimited(data[pos:])
+			break
+		}
+
+		// Write this line and update position
+		l.writeLimited(data[pos : pos+nextNewline])
+		l.commitLine()
+		pos += nextNewline + 1
+
+		// Data ends with a newline, stop now without writing an empty line
+		if nextNewline == len(data)-1 {
+			break
+		}
+	}
+	return len(data), nil
+}
+
+// Close will emit any leftover data in the buffer to the callback. Subsequent calls to Write will fail. Subsequent calls to Close
+// will also fail.
+func (l *LineBuffer) Close() error {
+	if l.closed {
+		return fmt.Errorf("already closed")
+	}
+	l.mu.Lock()
+	defer l.mu.Unlock()
+	l.closed = true
+	if l.length > 0 {
+		l.commitLine()
+	}
+	return nil
+}
diff --git a/core/pkg/logbuffer/linebuffer_test.go b/core/pkg/logbuffer/linebuffer_test.go
new file mode 100644
index 0000000..c821a4b
--- /dev/null
+++ b/core/pkg/logbuffer/linebuffer_test.go
@@ -0,0 +1,75 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logbuffer
+
+import (
+	"fmt"
+	"testing"
+)
+
+func TestLineBuffer(t *testing.T) {
+	var lines []*Line
+	lb := NewLineBuffer(1024, func(l *Line) {
+		lines = append(lines, l)
+	})
+
+	compare := func(a []*Line, b ...string) string {
+		msg := fmt.Sprintf("want %v, got %v", a, b)
+		if len(a) != len(b) {
+			return msg
+		}
+		for i, _ := range a {
+			if a[i].String() != b[i] {
+				return msg
+			}
+		}
+		return ""
+	}
+
+	// Write some data.
+	fmt.Fprintf(lb, "foo ")
+	if diff := compare(lines); diff != "" {
+		t.Fatal(diff)
+	}
+	fmt.Fprintf(lb, "bar\n")
+	if diff := compare(lines, "foo bar"); diff != "" {
+		t.Fatal(diff)
+	}
+	fmt.Fprintf(lb, "baz")
+	if diff := compare(lines, "foo bar"); diff != "" {
+		t.Fatal(diff)
+	}
+	fmt.Fprintf(lb, " baz")
+	if diff := compare(lines, "foo bar"); diff != "" {
+		t.Fatal(diff)
+	}
+	// Close and expect flush.
+	if err := lb.Close(); err != nil {
+		t.Fatalf("Close: %v", err)
+	}
+	if diff := compare(lines, "foo bar", "baz baz"); diff != "" {
+		t.Fatal(diff)
+	}
+
+	// Check behaviour after close
+	if _, err := fmt.Fprintf(lb, "nope"); err == nil {
+		t.Fatalf("Write after Close: wanted  error, got nil")
+	}
+	if err := lb.Close(); err == nil {
+		t.Fatalf("second Close: wanted error, got nil")
+	}
+}
diff --git a/core/pkg/logbuffer/logbuffer.go b/core/pkg/logbuffer/logbuffer.go
index 8298deb..ce47816 100644
--- a/core/pkg/logbuffer/logbuffer.go
+++ b/core/pkg/logbuffer/logbuffer.go
@@ -21,79 +21,32 @@
 package logbuffer
 
 import (
-	"bytes"
-	"strings"
 	"sync"
 )
 
 // LogBuffer implements a fixed-size in-memory ring buffer for line-separated logs
 type LogBuffer struct {
-	mu            sync.RWMutex
-	maxLineLength int
-	content       []Line
-	length        int
-
-	currentLineBuilder       strings.Builder
-	currentLineWrittenLength int
+	mu      sync.RWMutex
+	content []Line
+	length  int
+	*LineBuffer
 }
 
-type Line struct {
-	Data           string
-	OriginalLength int
-}
-
+// New creates a new LogBuffer with a given ringbuffer size and maximum line length.
 func New(size, maxLineLength int) *LogBuffer {
-	return &LogBuffer{
-		content:       make([]Line, size),
-		maxLineLength: maxLineLength,
+	lb := &LogBuffer{
+		content: make([]Line, size),
 	}
+	lb.LineBuffer = NewLineBuffer(maxLineLength, lb.lineCallback)
+	return lb
 }
 
-func (b *LogBuffer) writeLimited(newData []byte) {
-	builder := &b.currentLineBuilder
-	b.currentLineWrittenLength += len(newData)
-	if builder.Len()+len(newData) > b.maxLineLength {
-		builder.Write(newData[:b.maxLineLength-builder.Len()])
-	} else {
-		builder.Write(newData)
-	}
-}
-
-func (b *LogBuffer) commitLine() {
-	b.content[b.length%len(b.content)] = Line{
-		Data:           b.currentLineBuilder.String(),
-		OriginalLength: b.currentLineWrittenLength}
-	b.length++
-	b.currentLineBuilder.Reset()
-	b.currentLineWrittenLength = 0
-}
-
-func (b *LogBuffer) Write(data []byte) (int, error) {
-	var pos int = 0
-
+func (b *LogBuffer) lineCallback(line *Line) {
 	b.mu.Lock()
 	defer b.mu.Unlock()
 
-	for {
-		nextNewline := bytes.IndexRune(data[pos:], '\n')
-
-		// No newline in the data, write everything to the current line
-		if nextNewline == -1 {
-			b.writeLimited(data[pos:])
-			break
-		}
-
-		// Write this line and update position
-		b.writeLimited(data[pos : pos+nextNewline])
-		b.commitLine()
-		pos += nextNewline + 1
-
-		// Data ends with a newline, stop now without writing an empty line
-		if nextNewline == len(data)-1 {
-			break
-		}
-	}
-	return len(data), nil
+	b.content[b.length%len(b.content)] = *line
+	b.length++
 }
 
 // capToContentLength caps the number of requested lines to what is actually available
@@ -128,6 +81,8 @@
 // ReadLinesTruncated works exactly the same as ReadLines, but adds an ellipsis at the end of every
 // line that was truncated because it was over MaxLineLength
 func (b *LogBuffer) ReadLinesTruncated(n int, ellipsis string) []string {
+	b.mu.RLock()
+	defer b.mu.RUnlock()
 	// This does not use ReadLines() to prevent excessive reference copying and associated GC pressure
 	// since it could process a lot of lines.
 
@@ -136,11 +91,7 @@
 	outArray := make([]string, n)
 	for i := 1; i <= n; i++ {
 		line := b.content[(b.length-i)%len(b.content)]
-		if line.OriginalLength > b.maxLineLength {
-			outArray[n-i] = line.Data + ellipsis
-		} else {
-			outArray[n-i] = line.Data
-		}
+		outArray[n-i] = line.String()
 	}
 	return outArray
 }