treewide: introduce osbase package and move things around

All except localregistry moved from metropolis/pkg to osbase,
localregistry moved to metropolis/test as its only used there anyway.

Change-Id: If1a4bf377364bef0ac23169e1b90379c71b06d72
Reviewed-on: https://review.monogon.dev/c/monogon/+/3079
Tested-by: Jenkins CI
Reviewed-by: Serge Bazanski <serge@monogon.tech>
diff --git a/osbase/logbuffer/linebuffer.go b/osbase/logbuffer/linebuffer.go
new file mode 100644
index 0000000..2195ec3
--- /dev/null
+++ b/osbase/logbuffer/linebuffer.go
@@ -0,0 +1,169 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logbuffer
+
+import (
+	"bytes"
+	"fmt"
+	"strings"
+	"sync"
+
+	lpb "source.monogon.dev/osbase/logtree/proto"
+)
+
+// Line is a line stored in the log buffer - a string, that has been perhaps
+// truncated (due to exceeded limits).
+type Line struct {
+	Data           string
+	OriginalLength int
+}
+
+// Truncated returns whether this line has been truncated to fit limits.
+func (l *Line) Truncated() bool {
+	return l.OriginalLength > len(l.Data)
+}
+
+// String returns the line with an ellipsis at the end (...) if the line has been
+// truncated, or the original line otherwise.
+func (l *Line) String() string {
+	if l.Truncated() {
+		return l.Data + "..."
+	}
+	return l.Data
+}
+
+// ProtoLog returns a Logging-specific protobuf structure.
+func (l *Line) ProtoLog() *lpb.LogEntry_Raw {
+	return &lpb.LogEntry_Raw{
+		Data:           l.Data,
+		OriginalLength: int64(l.OriginalLength),
+	}
+}
+
+// LineFromLogProto converts a Logging-specific protobuf message back into a Line.
+func LineFromLogProto(raw *lpb.LogEntry_Raw) (*Line, error) {
+	if raw.OriginalLength < int64(len(raw.Data)) {
+		return nil, fmt.Errorf("original_length smaller than length of data")
+	}
+	originalLength := int(raw.OriginalLength)
+	if int64(originalLength) < raw.OriginalLength {
+		return nil, fmt.Errorf("original_length larger than native int size")
+	}
+	return &Line{
+		Data:           raw.Data,
+		OriginalLength: originalLength,
+	}, nil
+}
+
+// LineBuffer is a io.WriteCloser that will call a given callback every time a line
+// is completed.
+type LineBuffer struct {
+	maxLineLength int
+	cb            LineBufferCallback
+
+	mu  sync.Mutex
+	cur strings.Builder
+	// length is the length of the line currently being written - this will continue to
+	// increase, even if the string exceeds maxLineLength.
+	length int
+	closed bool
+}
+
+// LineBufferCallback is a callback that will get called any time the line is
+// completed. The function must not cause another write to the LineBuffer, or the
+// program will deadlock.
+type LineBufferCallback func(*Line)
+
+// NewLineBuffer creates a new LineBuffer with a given line length limit and
+// callback.
+func NewLineBuffer(maxLineLength int, cb LineBufferCallback) *LineBuffer {
+	return &LineBuffer{
+		maxLineLength: maxLineLength,
+		cb:            cb,
+	}
+}
+
+// writeLimited writes to the internal buffer, making sure that its size does not
+// exceed the maxLineLength.
+func (l *LineBuffer) writeLimited(data []byte) {
+	l.length += len(data)
+	if l.cur.Len()+len(data) > l.maxLineLength {
+		data = data[:l.maxLineLength-l.cur.Len()]
+	}
+	l.cur.Write(data)
+}
+
+// comitLine calls the callback and resets the builder.
+func (l *LineBuffer) commitLine() {
+	l.cb(&Line{
+		Data:           l.cur.String(),
+		OriginalLength: l.length,
+	})
+	l.cur.Reset()
+	l.length = 0
+}
+
+func (l *LineBuffer) Write(data []byte) (int, error) {
+	var pos = 0
+
+	l.mu.Lock()
+	defer l.mu.Unlock()
+
+	if l.closed {
+		return 0, fmt.Errorf("closed")
+	}
+
+	for {
+		nextNewline := bytes.IndexRune(data[pos:], '\n')
+
+		// No newline in the data, write everything to the current line
+		if nextNewline == -1 {
+			l.writeLimited(data[pos:])
+			break
+		}
+
+		// Write this line and update position
+		l.writeLimited(data[pos : pos+nextNewline])
+		l.commitLine()
+		pos += nextNewline + 1
+
+		// Data ends with a newline, stop now without writing an empty line
+		if nextNewline == len(data)-1 {
+			break
+		}
+	}
+	return len(data), nil
+}
+
+// Close will emit any leftover data in the buffer to the callback. Subsequent
+// calls to Write will fail. Subsequent calls to Close will also fail.
+func (l *LineBuffer) Close() error {
+	if l.closed {
+		return fmt.Errorf("already closed")
+	}
+	l.mu.Lock()
+	defer l.mu.Unlock()
+	l.closed = true
+	if l.length > 0 {
+		l.commitLine()
+	}
+	return nil
+}
+
+func (l *LineBuffer) Sync() error {
+	return nil
+}