core -> metropolis

Smalltown is now called Metropolis!

This is the first commit in a series of cleanup commits that prepare us
for an open source release. This one just some Bazel packages around to
follow a stricter directory layout.

All of Metropolis now lives in `//metropolis`.

All of Metropolis Node code now lives in `//metropolis/node`.

All of the main /init now lives in `//m/n/core`.

All of the Kubernetes functionality/glue now lives in `//m/n/kubernetes`.

Next steps:
     - hunt down all references to Smalltown and replace them appropriately
     - narrow down visibility rules
     - document new code organization
     - move `//build/toolchain` to `//monogon/build/toolchain`
     - do another cleanup pass between `//golibs` and
       `//monogon/node/{core,common}`.
     - remove `//delta` and `//anubis`

Fixes T799.

Test Plan: Just a very large refactor. CI should help us out here.

Bug: T799

X-Origin-Diff: phab/D667
GitOrigin-RevId: 6029b8d4edc42325d50042596b639e8b122d0ded
diff --git a/metropolis/node/common/logbuffer/linebuffer.go b/metropolis/node/common/logbuffer/linebuffer.go
new file mode 100644
index 0000000..246a91b
--- /dev/null
+++ b/metropolis/node/common/logbuffer/linebuffer.go
@@ -0,0 +1,160 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logbuffer
+
+import (
+	"bytes"
+	"fmt"
+	"strings"
+	"sync"
+
+	apb "git.monogon.dev/source/nexantic.git/metropolis/proto/api"
+)
+
+// Line is a line stored in the log buffer - a string, that has been perhaps truncated (due to exceeded limits).
+type Line struct {
+	Data           string
+	OriginalLength int
+}
+
+// Truncated returns whether this line has been truncated to fit limits.
+func (l *Line) Truncated() bool {
+	return l.OriginalLength > len(l.Data)
+}
+
+// String returns the line with an ellipsis at the end (...) if the line has been truncated, or the original line
+// otherwise.
+func (l *Line) String() string {
+	if l.Truncated() {
+		return l.Data + "..."
+	}
+	return l.Data
+}
+
+// ProtoLog returns a Logging-specific protobuf structure.
+func (l *Line) ProtoLog() *apb.LogEntry_Raw {
+	return &apb.LogEntry_Raw{
+		Data:           l.Data,
+		OriginalLength: int64(l.OriginalLength),
+	}
+}
+
+// LineFromLogProto converts a Logging-specific protobuf message back into a Line.
+func LineFromLogProto(raw *apb.LogEntry_Raw) (*Line, error) {
+	if raw.OriginalLength < int64(len(raw.Data)) {
+		return nil, fmt.Errorf("original_length smaller than length of data")
+	}
+	originalLength := int(raw.OriginalLength)
+	if int64(originalLength) < raw.OriginalLength {
+		return nil, fmt.Errorf("original_length larger than native int size")
+	}
+	return &Line{
+		Data:           raw.Data,
+		OriginalLength: originalLength,
+	}, nil
+}
+
+// LineBuffer is a io.WriteCloser that will call a given callback every time a line is completed.
+type LineBuffer struct {
+	maxLineLength int
+	cb            LineBufferCallback
+
+	mu  sync.Mutex
+	cur strings.Builder
+	// length is the length of the line currently being written - this will continue to increase, even if the string
+	// exceeds maxLineLength.
+	length int
+	closed bool
+}
+
+// LineBufferCallback is a callback that will get called any time the line is completed. The function must not cause another
+// write to the LineBuffer, or the program will deadlock.
+type LineBufferCallback func(*Line)
+
+// NewLineBuffer creates a new LineBuffer with a given line length limit and callback.
+func NewLineBuffer(maxLineLength int, cb LineBufferCallback) *LineBuffer {
+	return &LineBuffer{
+		maxLineLength: maxLineLength,
+		cb:            cb,
+	}
+}
+
+// writeLimited writes to the internal buffer, making sure that its size does not exceed the maxLineLength.
+func (l *LineBuffer) writeLimited(data []byte) {
+	l.length += len(data)
+	if l.cur.Len()+len(data) > l.maxLineLength {
+		data = data[:l.maxLineLength-l.cur.Len()]
+	}
+	l.cur.Write(data)
+}
+
+// comitLine calls the callback and resets the builder.
+func (l *LineBuffer) commitLine() {
+	l.cb(&Line{
+		Data:           l.cur.String(),
+		OriginalLength: l.length,
+	})
+	l.cur.Reset()
+	l.length = 0
+}
+
+func (l *LineBuffer) Write(data []byte) (int, error) {
+	var pos = 0
+
+	l.mu.Lock()
+	defer l.mu.Unlock()
+
+	if l.closed {
+		return 0, fmt.Errorf("closed")
+	}
+
+	for {
+		nextNewline := bytes.IndexRune(data[pos:], '\n')
+
+		// No newline in the data, write everything to the current line
+		if nextNewline == -1 {
+			l.writeLimited(data[pos:])
+			break
+		}
+
+		// Write this line and update position
+		l.writeLimited(data[pos : pos+nextNewline])
+		l.commitLine()
+		pos += nextNewline + 1
+
+		// Data ends with a newline, stop now without writing an empty line
+		if nextNewline == len(data)-1 {
+			break
+		}
+	}
+	return len(data), nil
+}
+
+// Close will emit any leftover data in the buffer to the callback. Subsequent calls to Write will fail. Subsequent calls to Close
+// will also fail.
+func (l *LineBuffer) Close() error {
+	if l.closed {
+		return fmt.Errorf("already closed")
+	}
+	l.mu.Lock()
+	defer l.mu.Unlock()
+	l.closed = true
+	if l.length > 0 {
+		l.commitLine()
+	}
+	return nil
+}