logbuffer: split out LineBuffer

We want to be able to use similar line-oriented buffering in LogTree.
Rather than repeat ourselves, let's fact this out into a nice little
library.

Test Plan: Covered by existing logbuffer tests, added some extra linebuffer-specific ones.

X-Origin-Diff: phab/D636
GitOrigin-RevId: 38e832d323ed9f1723feaa9f9169caad18619e55
diff --git a/core/pkg/logbuffer/linebuffer.go b/core/pkg/logbuffer/linebuffer.go
new file mode 100644
index 0000000..6ee7d6b
--- /dev/null
+++ b/core/pkg/logbuffer/linebuffer.go
@@ -0,0 +1,135 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logbuffer
+
+import (
+	"bytes"
+	"fmt"
+	"strings"
+	"sync"
+)
+
+// Line is a line stored in the log buffer - a string, that has been perhaps truncated (due to exceeded limits).
+type Line struct {
+	Data           string
+	OriginalLength int
+}
+
+// Truncated returns whether this line has been truncated to fit limits.
+func (l *Line) Truncated() bool {
+	return l.OriginalLength > len(l.Data)
+}
+
+// String returns the line with an ellipsis at the end (...) if the line has been truncated, or the original line
+// otherwise.
+func (l *Line) String() string {
+	if l.Truncated() {
+		return l.Data + "..."
+	}
+	return l.Data
+}
+
+// LineBuffer is a io.WriteCloser that will call a given callback every time a line is completed.
+type LineBuffer struct {
+	maxLineLength int
+	cb            LineBufferCallback
+
+	mu  sync.Mutex
+	cur strings.Builder
+	// length is the length of the line currently being written - this will continue to increase, even if the string
+	// exceeds maxLineLength.
+	length int
+	closed bool
+}
+
+// LineBufferCallback is a callback that will get called any time the line is completed. The function must not cause another
+// write to the LineBuffer, or the program will deadlock.
+type LineBufferCallback func(*Line)
+
+// NewLineBuffer creates a new LineBuffer with a given line length limit and callback.
+func NewLineBuffer(maxLineLength int, cb LineBufferCallback) *LineBuffer {
+	return &LineBuffer{
+		maxLineLength: maxLineLength,
+		cb:            cb,
+	}
+}
+
+// writeLimited writes to the internal buffer, making sure that its size does not exceed the maxLineLength.
+func (l *LineBuffer) writeLimited(data []byte) {
+	l.length += len(data)
+	if l.cur.Len()+len(data) > l.maxLineLength {
+		data = data[:l.maxLineLength-l.cur.Len()]
+	}
+	l.cur.Write(data)
+}
+
+// comitLine calls the callback and resets the builder.
+func (l *LineBuffer) commitLine() {
+	l.cb(&Line{
+		Data:           l.cur.String(),
+		OriginalLength: l.length,
+	})
+	l.cur.Reset()
+	l.length = 0
+}
+
+func (l *LineBuffer) Write(data []byte) (int, error) {
+	var pos = 0
+
+	l.mu.Lock()
+	defer l.mu.Unlock()
+
+	if l.closed {
+		return 0, fmt.Errorf("closed")
+	}
+
+	for {
+		nextNewline := bytes.IndexRune(data[pos:], '\n')
+
+		// No newline in the data, write everything to the current line
+		if nextNewline == -1 {
+			l.writeLimited(data[pos:])
+			break
+		}
+
+		// Write this line and update position
+		l.writeLimited(data[pos : pos+nextNewline])
+		l.commitLine()
+		pos += nextNewline + 1
+
+		// Data ends with a newline, stop now without writing an empty line
+		if nextNewline == len(data)-1 {
+			break
+		}
+	}
+	return len(data), nil
+}
+
+// Close will emit any leftover data in the buffer to the callback. Subsequent calls to Write will fail. Subsequent calls to Close
+// will also fail.
+func (l *LineBuffer) Close() error {
+	if l.closed {
+		return fmt.Errorf("already closed")
+	}
+	l.mu.Lock()
+	defer l.mu.Unlock()
+	l.closed = true
+	if l.length > 0 {
+		l.commitLine()
+	}
+	return nil
+}