blob: 6fd9a628296c807d582d2ea75ff638d12488f64e [file] [log] [blame]
// Copyright 2020 The Monogon Project Authors.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package logbuffer
import (
"bytes"
"fmt"
"strings"
"sync"
apb "source.monogon.dev/metropolis/proto/api"
)
// Line is a single entry of the log buffer: the text that was retained for a
// line, together with the length the line had before any truncation was
// applied to make it fit within limits.
type Line struct {
	// Data is the retained (possibly shortened) content of the line.
	Data string
	// OriginalLength is the length of the line before truncation. It is larger
	// than len(Data) if, and only if, the line has been truncated.
	OriginalLength int
}

// Truncated reports whether part of the line was dropped to fit limits, ie.
// whether Data is shorter than the line originally was.
func (l *Line) Truncated() bool {
	return len(l.Data) < l.OriginalLength
}

// String returns the stored data verbatim for a line that has not been
// truncated, and the stored data followed by an ellipsis (...) for a line that
// has.
func (l *Line) String() string {
	if !l.Truncated() {
		return l.Data
	}
	return l.Data + "..."
}
// ProtoLog converts the Line into its Logging-specific protobuf form: a newly
// allocated LogEntry_Raw carrying the line's data and its original length
// (widened to int64).
func (l *Line) ProtoLog() *apb.LogEntry_Raw {
	raw := new(apb.LogEntry_Raw)
	raw.Data = l.Data
	raw.OriginalLength = int64(l.OriginalLength)
	return raw
}
// LineFromLogProto converts a Logging-specific protobuf message back into a
// Line.
//
// An error is returned, and no Line is produced, if:
//   - raw is nil,
//   - raw.OriginalLength is smaller than the length of raw.Data, which cannot
//     be the result of truncating a line,
//   - raw.OriginalLength does not fit into the native int type.
func LineFromLogProto(raw *apb.LogEntry_Raw) (*Line, error) {
	// Reject a nil message explicitly instead of panicking on the field accesses
	// below.
	if raw == nil {
		return nil, fmt.Errorf("raw log entry is nil")
	}
	if raw.OriginalLength < int64(len(raw.Data)) {
		return nil, fmt.Errorf("original_length smaller than length of data")
	}
	// The check above guarantees a non-negative value, so the only way the
	// conversion to int can fail to round-trip is by overflowing a native int
	// that is narrower than 64 bits.
	originalLength := int(raw.OriginalLength)
	if int64(originalLength) != raw.OriginalLength {
		return nil, fmt.Errorf("original_length larger than native int size")
	}
	return &Line{
		Data:           raw.Data,
		OriginalLength: originalLength,
	}, nil
}
// LineBuffer is an io.WriteCloser that will call a given callback every time a
// line is completed.
//
// Lines are delimited by '\n' bytes, which are not part of the emitted Line. At
// most maxLineLength bytes of every line are retained; anything past that is
// dropped, but still counted in the emitted Line's OriginalLength. Write and
// Close serialize on an internal mutex, and the callback is invoked with that
// mutex held.
type LineBuffer struct {
	// maxLineLength is the maximum number of bytes of a single line kept in cur.
	maxLineLength int
	// cb is called once for every completed line.
	cb LineBufferCallback

	// mu guards all the fields below.
	mu sync.Mutex
	// cur accumulates the (possibly truncated) content of the line currently
	// being written.
	cur strings.Builder
	// length is the length of the line currently being written - this will
	// continue to increase, even if the string exceeds maxLineLength.
	length int
	// closed is set by Close; once set, Write and Close fail.
	closed bool
}

// LineBufferCallback is a callback that will get called any time the line is
// completed. It is invoked while the LineBuffer's internal lock is held, so the
// function must not cause another write to (or a close of) the LineBuffer, or
// the program will deadlock.
type LineBufferCallback func(*Line)

// NewLineBuffer creates a new LineBuffer with a given line length limit (in
// bytes) and callback.
func NewLineBuffer(maxLineLength int, cb LineBufferCallback) *LineBuffer {
	return &LineBuffer{
		maxLineLength: maxLineLength,
		cb:            cb,
	}
}

// writeLimited appends data to the line currently being built, making sure that
// the internal buffer does not grow past maxLineLength. The full length of data
// is always added to l.length, even for the part that gets dropped.
//
// Truncation is done on a byte boundary, so a multi-byte UTF-8 sequence that
// straddles the limit is cut in the middle.
//
// Must be called with l.mu held.
func (l *LineBuffer) writeLimited(data []byte) {
	l.length += len(data)
	if room := l.maxLineLength - l.cur.Len(); len(data) > room {
		data = data[:room]
	}
	l.cur.Write(data)
}

// commitLine calls the callback with the line built so far and resets the
// builder and length counter for the next line.
//
// Must be called with l.mu held.
func (l *LineBuffer) commitLine() {
	l.cb(&Line{
		Data:           l.cur.String(),
		OriginalLength: l.length,
	})
	l.cur.Reset()
	l.length = 0
}

// Write implements io.Writer. It splits data on '\n', emitting one Line to the
// callback per newline encountered (including empty lines), and keeps any
// trailing data that is not yet terminated by a newline buffered until a later
// Write or Close completes it.
//
// On success it always reports all of data as written, even if parts of it were
// dropped because of the line length limit. After Close, it returns 0 and an
// error.
func (l *LineBuffer) Write(data []byte) (int, error) {
	l.mu.Lock()
	defer l.mu.Unlock()

	if l.closed {
		return 0, fmt.Errorf("closed")
	}

	// rest is the part of data that has not been consumed yet.
	rest := data
	for len(rest) > 0 {
		nl := bytes.IndexByte(rest, '\n')
		if nl == -1 {
			// No newline left: everything remaining belongs to a line that is not
			// complete yet.
			l.writeLimited(rest)
			break
		}
		// Finish the current line and continue right after the newline. If the
		// newline was the last byte, rest becomes empty and the loop ends
		// without emitting a spurious empty line.
		l.writeLimited(rest[:nl])
		l.commitLine()
		rest = rest[nl+1:]
	}
	return len(data), nil
}

// Close will emit any leftover data in the buffer to the callback. Subsequent
// calls to Write will fail. Subsequent calls to Close will also fail.
func (l *LineBuffer) Close() error {
	// The lock must be taken before looking at l.closed: the flag is shared with
	// Write and with other Close calls, and checking it unlocked would let two
	// concurrent Close calls both succeed.
	l.mu.Lock()
	defer l.mu.Unlock()

	if l.closed {
		return fmt.Errorf("already closed")
	}
	l.closed = true
	if l.length > 0 {
		l.commitLine()
	}
	return nil
}