core -> metropolis

Smalltown is now called Metropolis!

This is the first commit in a series of cleanup commits that prepare us
for an open source release. This one just moves some Bazel packages around
to follow a stricter directory layout.

All of Metropolis now lives in `//metropolis`.

All of Metropolis Node code now lives in `//metropolis/node`.

All of the main /init now lives in `//m/n/core`.

All of the Kubernetes functionality/glue now lives in `//m/n/kubernetes`.

Next steps:
     - hunt down all references to Smalltown and replace them appropriately
     - narrow down visibility rules
     - document new code organization
     - move `//build/toolchain` to `//monogon/build/toolchain`
     - do another cleanup pass between `//golibs` and
       `//monogon/node/{core,common}`.
     - remove `//delta` and `//anubis`

Fixes T799.

Test Plan: Just a very large refactor. CI should help us out here.

Bug: T799

X-Origin-Diff: phab/D667
GitOrigin-RevId: 6029b8d4edc42325d50042596b639e8b122d0ded
diff --git a/metropolis/node/common/logbuffer/BUILD.bazel b/metropolis/node/common/logbuffer/BUILD.bazel
new file mode 100644
index 0000000..2d4650d
--- /dev/null
+++ b/metropolis/node/common/logbuffer/BUILD.bazel
@@ -0,0 +1,22 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(  # Library target for the logbuffer Go package.
+    name = "go_default_library",
+    srcs = [
+        "linebuffer.go",
+        "logbuffer.go",
+    ],
+    importpath = "git.monogon.dev/source/nexantic.git/metropolis/node/common/logbuffer",
+    visibility = ["//visibility:public"],  # Usable from any package.
+    deps = ["//metropolis/proto/api:go_default_library"],  # linebuffer.go imports the proto API package.
+)
+
+go_test(  # Unit tests for the logbuffer package.
+    name = "go_default_test",
+    srcs = [
+        "linebuffer_test.go",
+        "logbuffer_test.go",
+    ],
+    embed = [":go_default_library"],  # The tests are in package logbuffer itself, so they are built with the library sources.
+    deps = ["@com_github_stretchr_testify//require:go_default_library"],  # logbuffer_test.go uses testify's require.
+)
diff --git a/metropolis/node/common/logbuffer/linebuffer.go b/metropolis/node/common/logbuffer/linebuffer.go
new file mode 100644
index 0000000..246a91b
--- /dev/null
+++ b/metropolis/node/common/logbuffer/linebuffer.go
@@ -0,0 +1,160 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logbuffer
+
+import (
+	"bytes"
+	"fmt"
+	"strings"
+	"sync"
+
+	apb "git.monogon.dev/source/nexantic.git/metropolis/proto/api"
+)
+
+// Line is a line stored in the log buffer - a string that may have been truncated (due to exceeded limits).
+type Line struct {
+	Data           string // stored text; the line counts as truncated when this is shorter than OriginalLength
+	OriginalLength int    // length of the line as it was originally written, before any truncation
+}
+
+// Truncated reports whether the stored data is shorter than the line originally was, ie. it was cut to fit limits.
+func (l *Line) Truncated() bool {
+	return len(l.Data) < l.OriginalLength
+}
+
+// String returns the line's data, followed by an ellipsis (...) if the line has been truncated. Lines that
+// have not been truncated are returned unchanged.
+func (l *Line) String() string {
+	if !l.Truncated() {
+		return l.Data
+	}
+	return l.Data + "..."
+}
+
+// ProtoLog converts the line into its Logging-specific protobuf representation.
+func (l *Line) ProtoLog() *apb.LogEntry_Raw {
+	raw := new(apb.LogEntry_Raw)
+	raw.Data = l.Data
+	raw.OriginalLength = int64(l.OriginalLength)
+	return raw
+}
+
+// LineFromLogProto builds a Line from a Logging-specific protobuf message, rejecting inconsistent lengths.
+func LineFromLogProto(raw *apb.LogEntry_Raw) (*Line, error) {
+	if int64(len(raw.Data)) > raw.OriginalLength {
+		return nil, fmt.Errorf("original_length smaller than length of data")
+	}
+	n := int(raw.OriginalLength)
+	if raw.OriginalLength > int64(n) {
+		return nil, fmt.Errorf("original_length larger than native int size")
+	}
+	return &Line{
+		OriginalLength: n,
+		Data:           raw.Data,
+	}, nil
+}
+
+// LineBuffer is an io.WriteCloser that will call a given callback every time a line is completed.
+type LineBuffer struct {
+	maxLineLength int                // maximum number of bytes kept per line; data beyond it is dropped
+	cb            LineBufferCallback // called with every completed line
+
+	mu  sync.Mutex      // locked by Write and Close
+	cur strings.Builder // kept data of the line currently being written, capped at maxLineLength
+	// length is the length of the line currently being written - this will continue to increase, even if the string
+	// exceeds maxLineLength.
+	length int
+	closed bool // set by Close; Write fails once it is set
+}
+
+// LineBufferCallback is a callback that gets called every time a line is completed. It runs while the LineBuffer's
+// lock is held, so the function must not cause another write to the LineBuffer, or the program will deadlock.
+type LineBufferCallback func(*Line)
+
+// NewLineBuffer returns an empty LineBuffer that keeps at most maxLineLength bytes per line and reports lines to cb.
+func NewLineBuffer(maxLineLength int, cb LineBufferCallback) *LineBuffer {
+	lb := new(LineBuffer)
+	lb.maxLineLength = maxLineLength
+	lb.cb = cb
+	return lb
+}
+
+// writeLimited appends data to the current line, keeping the buffer within maxLineLength but counting all of data in length.
+func (l *LineBuffer) writeLimited(data []byte) {
+	l.length += len(data)
+	if room := l.maxLineLength - l.cur.Len(); len(data) > room {
+		data = data[:room]
+	}
+	l.cur.Write(data)
+}
+
+// commitLine calls the callback with the current line, then resets the builder and the length counter.
+func (l *LineBuffer) commitLine() {
+	l.cb(&Line{
+		Data:           l.cur.String(),
+		OriginalLength: l.length,
+	})
+	l.cur.Reset()
+	l.length = 0
+}
+
+// Write implements io.Writer. Data is split on newlines: every completed line is passed to the callback (without
+// its newline), and a trailing partial line is buffered until a later Write or Close completes it. Write always
+// consumes all of data and only fails if the LineBuffer has already been closed.
+func (l *LineBuffer) Write(data []byte) (int, error) {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+
+	if l.closed {
+		return 0, fmt.Errorf("closed")
+	}
+
+	// pos is the offset of the first byte of data that has not been consumed yet. Once it reaches len(data) -
+	// ie. data ended with a newline - the loop stops without writing an empty line.
+	pos := 0
+	for pos < len(data) {
+		// nextNewline is relative to pos, not to the start of data.
+		nextNewline := bytes.IndexByte(data[pos:], '\n')
+
+		// No newline in the remaining data, write everything to the current line
+		if nextNewline == -1 {
+			l.writeLimited(data[pos:])
+			break
+		}
+
+		// Write this line and update position
+		l.writeLimited(data[pos : pos+nextNewline])
+		l.commitLine()
+		pos += nextNewline + 1
+	}
+	return len(data), nil
+}
+
+// Close will emit any leftover data in the buffer to the callback. Subsequent calls to Write will fail. Subsequent calls to Close
+// will also fail. The closed flag is checked under the lock, so of several concurrent calls exactly one succeeds.
+func (l *LineBuffer) Close() error {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+	if l.closed {
+		return fmt.Errorf("already closed")
+	}
+	l.closed = true
+	if l.length > 0 {
+		l.commitLine()
+	}
+	return nil
+}
diff --git a/metropolis/node/common/logbuffer/linebuffer_test.go b/metropolis/node/common/logbuffer/linebuffer_test.go
new file mode 100644
index 0000000..c821a4b
--- /dev/null
+++ b/metropolis/node/common/logbuffer/linebuffer_test.go
@@ -0,0 +1,75 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logbuffer
+
+import (
+	"fmt"
+	"testing"
+)
+
+// TestLineBuffer exercises line splitting across writes, the flush performed by Close, and errors after Close.
+func TestLineBuffer(t *testing.T) {
+	var lines []*Line
+	lb := NewLineBuffer(1024, func(l *Line) {
+		lines = append(lines, l)
+	})
+	compare := func(got []*Line, want ...string) string {
+		msg := fmt.Sprintf("want %v, got %v", want, got)
+		if len(got) != len(want) {
+			return msg
+		}
+		for i := range got {
+			if got[i].String() != want[i] {
+				return msg
+			}
+		}
+		return ""
+	}
+
+	// Write some data.
+	fmt.Fprintf(lb, "foo ")
+	if diff := compare(lines); diff != "" {
+		t.Fatal(diff)
+	}
+	fmt.Fprintf(lb, "bar\n")
+	if diff := compare(lines, "foo bar"); diff != "" {
+		t.Fatal(diff)
+	}
+	fmt.Fprintf(lb, "baz")
+	if diff := compare(lines, "foo bar"); diff != "" {
+		t.Fatal(diff)
+	}
+	fmt.Fprintf(lb, " baz")
+	if diff := compare(lines, "foo bar"); diff != "" {
+		t.Fatal(diff)
+	}
+	// Close and expect flush.
+	if err := lb.Close(); err != nil {
+		t.Fatalf("Close: %v", err)
+	}
+	if diff := compare(lines, "foo bar", "baz baz"); diff != "" {
+		t.Fatal(diff)
+	}
+
+	// Check behaviour after close
+	if _, err := fmt.Fprintf(lb, "nope"); err == nil {
+		t.Fatalf("Write after Close: wanted error, got nil")
+	}
+	if err := lb.Close(); err == nil {
+		t.Fatalf("second Close: wanted error, got nil")
+	}
+}
diff --git a/metropolis/node/common/logbuffer/logbuffer.go b/metropolis/node/common/logbuffer/logbuffer.go
new file mode 100644
index 0000000..ce47816
--- /dev/null
+++ b/metropolis/node/common/logbuffer/logbuffer.go
@@ -0,0 +1,97 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package logbuffer implements a fixed-size in-memory ring buffer for line-separated logs.
+// It implements io.Writer and splits the data into lines. The lines are kept in a ring where the
+// oldest are overwritten once it's full. It allows retrieval of the last n lines. There is a built-in
+// line length limit to bound the memory usage at maxLineLength * size.
+package logbuffer
+
+import (
+	"sync"
+)
+
+// LogBuffer implements a fixed-size in-memory ring buffer for line-separated logs.
+type LogBuffer struct {
+	mu      sync.RWMutex // write-locked by lineCallback, read-locked by ReadLines and ReadLinesTruncated
+	content []Line       // ring storage; slot length%len(content) is written next
+	length  int          // number of lines stored so far, including ones already overwritten
+	*LineBuffer
+}
+
+// New creates a new LogBuffer holding up to size lines, each limited to maxLineLength bytes of data.
+func New(size, maxLineLength int) *LogBuffer {
+	lb := new(LogBuffer)
+	lb.content = make([]Line, size)
+	// Completed lines from the embedded LineBuffer land in the ring via lineCallback.
+	lb.LineBuffer = NewLineBuffer(maxLineLength, lb.lineCallback)
+	return lb
+}
+
+// lineCallback stores a completed line in the next ring slot, overwriting the oldest line once the ring is full.
+func (b *LogBuffer) lineCallback(line *Line) {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	b.content[b.length%len(b.content)] = *line
+	b.length++
+}
+
+// capToContentLength clamps a requested number of lines to the number of lines that can actually be read.
+func (b *LogBuffer) capToContentLength(n int) int {
+	// At most len(b.content) lines are retained, and fewer than that while fewer lines have been stored.
+	available := len(b.content)
+	if b.length < available {
+		available = b.length
+	}
+	if n > available {
+		return available
+	}
+	return n
+}
+
+// ReadLines returns the last n lines from the buffer in chronological order. If n is larger than the ring
+// buffer or than the number of lines available, only the lines actually stored are returned.
+func (b *LogBuffer) ReadLines(n int) []Line {
+	b.mu.RLock()
+	defer b.mu.RUnlock()
+
+	n = b.capToContentLength(n)
+
+	// Copy the lines out so the result stays valid after the ring slots get overwritten.
+	result := make([]Line, n)
+	for i := range result {
+		result[i] = b.content[(b.length-n+i)%len(b.content)]
+	}
+	return result
+}
+
+// ReadLinesTruncated works like ReadLines, but returns strings and appends the given ellipsis to every
+// line that was truncated because it was over the maximum line length.
+func (b *LogBuffer) ReadLinesTruncated(n int, ellipsis string) []string {
+	b.mu.RLock()
+	defer b.mu.RUnlock()
+	// ReadLines() is not used here, to avoid an extra copy of every line (this could process a lot of lines).
+	n = b.capToContentLength(n)
+	outArray := make([]string, n)
+	for i := 1; i <= n; i++ {
+		line := &b.content[(b.length-i)%len(b.content)]
+		outArray[n-i] = line.Data
+		if line.Truncated() {
+			outArray[n-i] += ellipsis
+		}
+	}
+	return outArray
+}
diff --git a/metropolis/node/common/logbuffer/logbuffer_test.go b/metropolis/node/common/logbuffer/logbuffer_test.go
new file mode 100644
index 0000000..c38d7a6
--- /dev/null
+++ b/metropolis/node/common/logbuffer/logbuffer_test.go
@@ -0,0 +1,94 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logbuffer
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestSingleLine(t *testing.T) { // one complete line is written and read back
+	lb := New(1, 16000)
+	lb.Write([]byte("Hello World\n"))
+	got := lb.ReadLines(1)
+	require.Len(t, got, 1, "Invalid number of lines read")
+	require.Equal(t, "Hello World", got[0].Data, "Read bad log line")
+	require.Equal(t, 11, got[0].OriginalLength, "Invalid line length")
+}
+
+func TestPartialWritesAndReads(t *testing.T) {
+	lb := New(2, 16000)
+	lb.Write([]byte("Hello "))
+	lb.Write([]byte("World\nTest "))
+	lb.Write([]byte("2\n"))
+	// Asking for fewer lines than are stored returns only the newest one.
+	newest := lb.ReadLines(1)
+	require.Len(t, newest, 1, "Invalid number of lines for partial read")
+	require.Equal(t, "Test 2", newest[0].Data, "Read bad log line")
+	// Asking for everything returns both lines, oldest first.
+	all := lb.ReadLines(2)
+	require.Len(t, all, 2, "Invalid number of lines read")
+	require.Equal(t, "Hello World", all[0].Data, "Read bad log line")
+	require.Equal(t, "Test 2", all[1].Data, "Read bad log line")
+}
+
+// TestBufferOverwrite checks that the oldest line is overwritten once more lines are written than the ring holds.
+func TestBufferOverwrite(t *testing.T) {
+	buf := New(3, 16000)
+	buf.Write([]byte("Test1\nTest2\nTest3\nTest4\n"))
+	out := buf.ReadLines(3)
+	require.Equal(t, "Test2", out[0].Data, "Read bad log line")
+	require.Equal(t, "Test3", out[1].Data, "Read bad log line")
+	require.Equal(t, "Test4", out[2].Data, "Overwritten data is invalid")
+}
+
+func TestTooLargeRequests(t *testing.T) {
+	lb := New(1, 16000)
+	none := lb.ReadLines(1)
+	require.Len(t, none, 0, "Returned more data than there is")
+	// Two lines go into a one-line ring, so only one can come back.
+	lb.Write([]byte("1\n2\n"))
+	got := lb.ReadLines(2)
+	require.Len(t, got, 1, "Returned more data than the ring buffer can hold")
+}
+
+func TestSpecialCases(t *testing.T) { // a newline at the start of a later write must still end the pending line
+	buf := New(2, 16000)
+	buf.Write([]byte("Test1"))
+	buf.Write([]byte("\nTest2\n"))
+	out := buf.ReadLines(2)
+	require.Len(t, out, 2, "Too many lines written")
+	require.Equal(t, "Test1", out[0].Data, "Read bad log line")
+	require.Equal(t, "Test2", out[1].Data, "Read bad log line")
+}
+
+// TestLineLengthLimit checks that over-long lines are cut to the limit but keep their original length.
+func TestLineLengthLimit(t *testing.T) {
+	buf := New(2, 6)
+
+	testStr := "Just Testing"
+	buf.Write([]byte(testStr + "\nShort\n"))
+
+	out := buf.ReadLines(2)
+	require.Equal(t, len(testStr), out[0].OriginalLength, "Line is over length limit")
+	require.Equal(t, "Just T", out[0].Data, "Log line not properly truncated")
+
+	out2 := buf.ReadLinesTruncated(2, "...")
+	require.Equal(t, "Just T...", out2[0], "Line is over length limit")
+	require.Equal(t, "Short", out2[1], "Truncated small enough line")
+}