logbuffer: split out LineBuffer

We want to be able to use similar line-oriented buffering in LogTree.
Rather than repeat ourselves, let's factor this out into a nice little
library.

Test Plan: Covered by existing logbuffer tests; added some extra linebuffer-specific ones.

X-Origin-Diff: phab/D636
GitOrigin-RevId: 38e832d323ed9f1723feaa9f9169caad18619e55
diff --git a/core/pkg/logbuffer/logbuffer.go b/core/pkg/logbuffer/logbuffer.go
index 8298deb..ce47816 100644
--- a/core/pkg/logbuffer/logbuffer.go
+++ b/core/pkg/logbuffer/logbuffer.go
@@ -21,79 +21,32 @@
 package logbuffer
 
 import (
-	"bytes"
-	"strings"
 	"sync"
 )
 
 // LogBuffer implements a fixed-size in-memory ring buffer for line-separated logs
 type LogBuffer struct {
-	mu            sync.RWMutex
-	maxLineLength int
-	content       []Line
-	length        int
-
-	currentLineBuilder       strings.Builder
-	currentLineWrittenLength int
+	mu      sync.RWMutex
+	content []Line
+	length  int
+	*LineBuffer
 }
 
-type Line struct {
-	Data           string
-	OriginalLength int
-}
-
+// New creates a new LogBuffer with a given ringbuffer size and maximum line length.
 func New(size, maxLineLength int) *LogBuffer {
-	return &LogBuffer{
-		content:       make([]Line, size),
-		maxLineLength: maxLineLength,
+	lb := &LogBuffer{
+		content: make([]Line, size),
 	}
+	lb.LineBuffer = NewLineBuffer(maxLineLength, lb.lineCallback)
+	return lb
 }
 
-func (b *LogBuffer) writeLimited(newData []byte) {
-	builder := &b.currentLineBuilder
-	b.currentLineWrittenLength += len(newData)
-	if builder.Len()+len(newData) > b.maxLineLength {
-		builder.Write(newData[:b.maxLineLength-builder.Len()])
-	} else {
-		builder.Write(newData)
-	}
-}
-
-func (b *LogBuffer) commitLine() {
-	b.content[b.length%len(b.content)] = Line{
-		Data:           b.currentLineBuilder.String(),
-		OriginalLength: b.currentLineWrittenLength}
-	b.length++
-	b.currentLineBuilder.Reset()
-	b.currentLineWrittenLength = 0
-}
-
-func (b *LogBuffer) Write(data []byte) (int, error) {
-	var pos int = 0
-
+func (b *LogBuffer) lineCallback(line *Line) {
 	b.mu.Lock()
 	defer b.mu.Unlock()
 
-	for {
-		nextNewline := bytes.IndexRune(data[pos:], '\n')
-
-		// No newline in the data, write everything to the current line
-		if nextNewline == -1 {
-			b.writeLimited(data[pos:])
-			break
-		}
-
-		// Write this line and update position
-		b.writeLimited(data[pos : pos+nextNewline])
-		b.commitLine()
-		pos += nextNewline + 1
-
-		// Data ends with a newline, stop now without writing an empty line
-		if nextNewline == len(data)-1 {
-			break
-		}
-	}
-	return len(data), nil
+	b.content[b.length%len(b.content)] = *line
+	b.length++
 }
 
 // capToContentLength caps the number of requested lines to what is actually available
@@ -128,6 +81,8 @@
 // ReadLinesTruncated works exactly the same as ReadLines, but adds an ellipsis at the end of every
 // line that was truncated because it was over MaxLineLength
 func (b *LogBuffer) ReadLinesTruncated(n int, ellipsis string) []string {
+	b.mu.RLock()
+	defer b.mu.RUnlock()
 	// This does not use ReadLines() to prevent excessive reference copying and associated GC pressure
 	// since it could process a lot of lines.
 
@@ -136,11 +91,7 @@
 	outArray := make([]string, n)
 	for i := 1; i <= n; i++ {
 		line := b.content[(b.length-i)%len(b.content)]
-		if line.OriginalLength > b.maxLineLength {
-			outArray[n-i] = line.Data + ellipsis
-		} else {
-			outArray[n-i] = line.Data
-		}
+		outArray[n-i] = line.String()
 	}
 	return outArray
 }