treewide: introduce osbase package and move things around
All except localregistry moved from metropolis/pkg to osbase,
localregistry moved to metropolis/test as it's only used there anyway.
Change-Id: If1a4bf377364bef0ac23169e1b90379c71b06d72
Reviewed-on: https://review.monogon.dev/c/monogon/+/3079
Tested-by: Jenkins CI
Reviewed-by: Serge Bazanski <serge@monogon.tech>
diff --git a/osbase/logbuffer/linebuffer.go b/osbase/logbuffer/linebuffer.go
new file mode 100644
index 0000000..2195ec3
--- /dev/null
+++ b/osbase/logbuffer/linebuffer.go
@@ -0,0 +1,169 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logbuffer
+
+import (
+ "bytes"
+ "fmt"
+ "strings"
+ "sync"
+
+ lpb "source.monogon.dev/osbase/logtree/proto"
+)
+
+// Line is a single line stored in the log buffer. Data holds the (possibly
+// truncated) contents, OriginalLength the byte length before any truncation.
+type Line struct {
+ Data string
+ OriginalLength int
+}
+
+// Truncated reports whether Data is shorter than the line originally was.
+func (l *Line) Truncated() bool {
+	return len(l.Data) < l.OriginalLength
+}
+
+// String renders the line for display: the stored data as-is when complete,
+// or the stored data followed by "..." when it was truncated.
+func (l *Line) String() string {
+	if !l.Truncated() {
+		return l.Data
+	}
+	return l.Data + "..."
+}
+
+// ProtoLog converts the line into its raw log entry protobuf representation.
+func (l *Line) ProtoLog() *lpb.LogEntry_Raw {
+	raw := new(lpb.LogEntry_Raw)
+	raw.Data = l.Data
+	raw.OriginalLength = int64(l.OriginalLength)
+	return raw
+}
+
+// LineFromLogProto converts a raw log entry protobuf back into a Line. It
+// returns an error if raw is nil, if original_length is smaller than the data
+// or if original_length does not fit into the platform's int.
+func LineFromLogProto(raw *lpb.LogEntry_Raw) (*Line, error) {
+	switch {
+	case raw == nil:
+		return nil, fmt.Errorf("nil raw log entry")
+	case raw.OriginalLength < int64(len(raw.Data)):
+		return nil, fmt.Errorf("original_length smaller than length of data")
+	case int64(int(raw.OriginalLength)) != raw.OriginalLength:
+		return nil, fmt.Errorf("original_length larger than native int size")
+	}
+	return &Line{Data: raw.Data, OriginalLength: int(raw.OriginalLength)}, nil
+}
+
+// LineBuffer is an io.WriteCloser that splits written bytes on newlines and
+// calls cb with every completed line, capped at maxLineLength bytes of data.
+type LineBuffer struct {
+ maxLineLength int
+ cb LineBufferCallback
+
+ mu sync.Mutex
+ cur strings.Builder
+ // length is the length of the line currently being written - this will continue to
+ // increase, even if the string exceeds maxLineLength.
+ length int
+ closed bool
+}
+
+// LineBufferCallback is called with each completed line. It runs while the
+// LineBuffer's lock is held, so it must not call Write or Close on the same
+// LineBuffer, or the program will deadlock.
+type LineBufferCallback func(*Line)
+
+// NewLineBuffer returns a LineBuffer that hands every completed line to cb,
+// keeping at most maxLineLength bytes of each line.
+func NewLineBuffer(maxLineLength int, cb LineBufferCallback) *LineBuffer {
+	lb := new(LineBuffer)
+	lb.maxLineLength = maxLineLength
+	lb.cb = cb
+	return lb
+}
+
+// writeLimited appends data to the current line, dropping whatever would push
+// the stored contents past maxLineLength. length always grows by len(data).
+func (l *LineBuffer) writeLimited(data []byte) {
+	l.length += len(data)
+	if room := l.maxLineLength - l.cur.Len(); len(data) > room {
+		data = data[:room]
+	}
+	l.cur.Write(data)
+}
+
+// commitLine hands the current line to the callback, then clears the builder
+// and the length counter so that the next line starts from scratch. Both
+// callers (Write and Close) invoke it with mu held.
+func (l *LineBuffer) commitLine() {
+	line := &Line{Data: l.cur.String(), OriginalLength: l.length}
+	l.cb(line)
+	l.cur.Reset()
+	l.length = 0
+}
+
+// Write implements io.Writer. It splits data on newline characters and passes
+// every completed line to the callback; bytes after the last newline stay
+// buffered until a later Write completes the line or Close flushes it. Lines
+// longer than maxLineLength are truncated, but their full length is recorded.
+//
+// Write always consumes all of data, returning len(data) and a nil error,
+// unless the LineBuffer has been closed, in which case it returns an error.
+func (l *LineBuffer) Write(data []byte) (int, error) {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+
+	if l.closed {
+		return 0, fmt.Errorf("closed")
+	}
+
+	rest := data
+	for {
+		nl := bytes.IndexByte(rest, '\n')
+		if nl < 0 {
+			break
+		}
+		// Everything up to the newline completes the current line; the newline
+		// itself is not part of the line.
+		l.writeLimited(rest[:nl])
+		l.commitLine()
+		rest = rest[nl+1:]
+	}
+	// Whatever is left has no newline (and may be empty): keep it buffered.
+	l.writeLimited(rest)
+	return len(data), nil
+}
+
+// Close emits any leftover data in the buffer to the callback. Subsequent
+// calls to Write or Close fail. The closed flag is only checked with mu held.
+func (l *LineBuffer) Close() error {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+	if l.closed {
+		return fmt.Errorf("already closed")
+	}
+	l.closed = true
+	if l.length > 0 {
+		l.commitLine()
+	}
+	return nil
+}
+
+// Sync is a no-op and always returns nil; a partial line stays buffered until
+// it is completed by Write or flushed by Close.
+func (l *LineBuffer) Sync() error { return nil }