| // Copyright 2020 The Monogon Project Authors. | 
 | // | 
 | // SPDX-License-Identifier: Apache-2.0 | 
 | // | 
 | // Licensed under the Apache License, Version 2.0 (the "License"); | 
 | // you may not use this file except in compliance with the License. | 
 | // You may obtain a copy of the License at | 
 | // | 
 | //     http://www.apache.org/licenses/LICENSE-2.0 | 
 | // | 
 | // Unless required by applicable law or agreed to in writing, software | 
 | // distributed under the License is distributed on an "AS IS" BASIS, | 
 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
 | // See the License for the specific language governing permissions and | 
 | // limitations under the License. | 
 |  | 
 | package logbuffer | 
 |  | 
 | import ( | 
 | 	"bytes" | 
 | 	"fmt" | 
 | 	"strings" | 
 | 	"sync" | 
 |  | 
 | 	apb "source.monogon.dev/metropolis/proto/api" | 
 | ) | 
 |  | 
 | // Line is a line stored in the log buffer - a string, that has been perhaps truncated (due to exceeded limits). | 
 | type Line struct { | 
 | 	Data           string | 
 | 	OriginalLength int | 
 | } | 
 |  | 
 | // Truncated returns whether this line has been truncated to fit limits. | 
 | func (l *Line) Truncated() bool { | 
 | 	return l.OriginalLength > len(l.Data) | 
 | } | 
 |  | 
 | // String returns the line with an ellipsis at the end (...) if the line has been truncated, or the original line | 
 | // otherwise. | 
 | func (l *Line) String() string { | 
 | 	if l.Truncated() { | 
 | 		return l.Data + "..." | 
 | 	} | 
 | 	return l.Data | 
 | } | 
 |  | 
 | // ProtoLog returns a Logging-specific protobuf structure. | 
 | func (l *Line) ProtoLog() *apb.LogEntry_Raw { | 
 | 	return &apb.LogEntry_Raw{ | 
 | 		Data:           l.Data, | 
 | 		OriginalLength: int64(l.OriginalLength), | 
 | 	} | 
 | } | 
 |  | 
 | // LineFromLogProto converts a Logging-specific protobuf message back into a Line. | 
 | func LineFromLogProto(raw *apb.LogEntry_Raw) (*Line, error) { | 
 | 	if raw.OriginalLength < int64(len(raw.Data)) { | 
 | 		return nil, fmt.Errorf("original_length smaller than length of data") | 
 | 	} | 
 | 	originalLength := int(raw.OriginalLength) | 
 | 	if int64(originalLength) < raw.OriginalLength { | 
 | 		return nil, fmt.Errorf("original_length larger than native int size") | 
 | 	} | 
 | 	return &Line{ | 
 | 		Data:           raw.Data, | 
 | 		OriginalLength: originalLength, | 
 | 	}, nil | 
 | } | 
 |  | 
 | // LineBuffer is a io.WriteCloser that will call a given callback every time a line is completed. | 
 | type LineBuffer struct { | 
 | 	maxLineLength int | 
 | 	cb            LineBufferCallback | 
 |  | 
 | 	mu  sync.Mutex | 
 | 	cur strings.Builder | 
 | 	// length is the length of the line currently being written - this will continue to increase, even if the string | 
 | 	// exceeds maxLineLength. | 
 | 	length int | 
 | 	closed bool | 
 | } | 
 |  | 
 | // LineBufferCallback is a callback that will get called any time the line is completed. The function must not cause another | 
 | // write to the LineBuffer, or the program will deadlock. | 
 | type LineBufferCallback func(*Line) | 
 |  | 
 | // NewLineBuffer creates a new LineBuffer with a given line length limit and callback. | 
 | func NewLineBuffer(maxLineLength int, cb LineBufferCallback) *LineBuffer { | 
 | 	return &LineBuffer{ | 
 | 		maxLineLength: maxLineLength, | 
 | 		cb:            cb, | 
 | 	} | 
 | } | 
 |  | 
 | // writeLimited writes to the internal buffer, making sure that its size does not exceed the maxLineLength. | 
 | func (l *LineBuffer) writeLimited(data []byte) { | 
 | 	l.length += len(data) | 
 | 	if l.cur.Len()+len(data) > l.maxLineLength { | 
 | 		data = data[:l.maxLineLength-l.cur.Len()] | 
 | 	} | 
 | 	l.cur.Write(data) | 
 | } | 
 |  | 
 | // comitLine calls the callback and resets the builder. | 
 | func (l *LineBuffer) commitLine() { | 
 | 	l.cb(&Line{ | 
 | 		Data:           l.cur.String(), | 
 | 		OriginalLength: l.length, | 
 | 	}) | 
 | 	l.cur.Reset() | 
 | 	l.length = 0 | 
 | } | 
 |  | 
 | func (l *LineBuffer) Write(data []byte) (int, error) { | 
 | 	var pos = 0 | 
 |  | 
 | 	l.mu.Lock() | 
 | 	defer l.mu.Unlock() | 
 |  | 
 | 	if l.closed { | 
 | 		return 0, fmt.Errorf("closed") | 
 | 	} | 
 |  | 
 | 	for { | 
 | 		nextNewline := bytes.IndexRune(data[pos:], '\n') | 
 |  | 
 | 		// No newline in the data, write everything to the current line | 
 | 		if nextNewline == -1 { | 
 | 			l.writeLimited(data[pos:]) | 
 | 			break | 
 | 		} | 
 |  | 
 | 		// Write this line and update position | 
 | 		l.writeLimited(data[pos : pos+nextNewline]) | 
 | 		l.commitLine() | 
 | 		pos += nextNewline + 1 | 
 |  | 
 | 		// Data ends with a newline, stop now without writing an empty line | 
 | 		if nextNewline == len(data)-1 { | 
 | 			break | 
 | 		} | 
 | 	} | 
 | 	return len(data), nil | 
 | } | 
 |  | 
 | // Close will emit any leftover data in the buffer to the callback. Subsequent calls to Write will fail. Subsequent calls to Close | 
 | // will also fail. | 
 | func (l *LineBuffer) Close() error { | 
 | 	if l.closed { | 
 | 		return fmt.Errorf("already closed") | 
 | 	} | 
 | 	l.mu.Lock() | 
 | 	defer l.mu.Unlock() | 
 | 	l.closed = true | 
 | 	if l.length > 0 { | 
 | 		l.commitLine() | 
 | 	} | 
 | 	return nil | 
 | } |