metropolis: unify utility packages
One last sweeping rename / reshuffle.
We get rid of //metropolis/node/common and //golibs, unifying them into
a single //metropolis/pkg meta-package.
This is to be documented somewhere properly, but here's the new logic
behind selecting where to place a new library package:
- if it's specific to k8s-on-metropolis, put it in
//metropolis/node/kubernetes/*. This is a self-contained tree that
other paths cannot import from.
- if it's a big new subsystem of the metropolis core, put it in
//metropolis/node/core. This can be imported by anything in
//m/n (e.g. the Kubernetes code at //m/n/kubernetes).
- otherwise, treat it as a generic library that's part of the metropolis
project, and put it in //metropolis/pkg. This can be imported by
anything within //metropolis.
This will be followed up by a diff that updates visibility rules.
Test Plan: Pure refactor, CI only.
X-Origin-Diff: phab/D683
GitOrigin-RevId: 883e7f09a7d22d64e966d07bbe839454ed081c79
diff --git a/metropolis/pkg/logbuffer/BUILD.bazel b/metropolis/pkg/logbuffer/BUILD.bazel
new file mode 100644
index 0000000..57a85d8
--- /dev/null
+++ b/metropolis/pkg/logbuffer/BUILD.bazel
@@ -0,0 +1,22 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+# Library target for the logbuffer package, built from linebuffer.go and logbuffer.go.
+# It is publicly visible and depends on the metropolis API protobuf library.
+go_library(
+ name = "go_default_library",
+ srcs = [
+ "linebuffer.go",
+ "logbuffer.go",
+ ],
+ importpath = "git.monogon.dev/source/nexantic.git/metropolis/pkg/logbuffer",
+ visibility = ["//visibility:public"],
+ deps = ["//metropolis/proto/api:go_default_library"],
+)
+
+# Test target for the logbuffer package. It embeds the library target above and
+# additionally depends on testify's require package.
+go_test(
+ name = "go_default_test",
+ srcs = [
+ "linebuffer_test.go",
+ "logbuffer_test.go",
+ ],
+ embed = [":go_default_library"],
+ deps = ["@com_github_stretchr_testify//require:go_default_library"],
+)
diff --git a/metropolis/pkg/logbuffer/linebuffer.go b/metropolis/pkg/logbuffer/linebuffer.go
new file mode 100644
index 0000000..246a91b
--- /dev/null
+++ b/metropolis/pkg/logbuffer/linebuffer.go
@@ -0,0 +1,160 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logbuffer
+
+import (
+ "bytes"
+ "fmt"
+ "strings"
+ "sync"
+
+ apb "git.monogon.dev/source/nexantic.git/metropolis/proto/api"
+)
+
+// Line is a line stored in the log buffer - a string, that has been perhaps truncated (due to exceeded limits).
+type Line struct {
+ // Data is the stored text of the line. It may be shorter than what was originally written.
+ Data string
+ // OriginalLength is the length of the line as originally written, before any truncation. Truncated compares it
+ // against len(Data).
+ OriginalLength int
+}
+
+// Truncated reports whether the stored Data is shorter than the line that was originally written, i.e. whether
+// the line has been cut down to fit a length limit.
+func (l *Line) Truncated() bool {
+	return len(l.Data) < l.OriginalLength
+}
+
+// String renders the line for display: Data as-is when the line is intact, or Data followed by an ellipsis (...)
+// when the line has been truncated.
+func (l *Line) String() string {
+	if !l.Truncated() {
+		return l.Data
+	}
+	return l.Data + "..."
+}
+
+// ProtoLog converts the line into its Logging-specific protobuf structure, copying Data verbatim and widening
+// OriginalLength to int64.
+func (l *Line) ProtoLog() *apb.LogEntry_Raw {
+	raw := new(apb.LogEntry_Raw)
+	raw.Data = l.Data
+	raw.OriginalLength = int64(l.OriginalLength)
+	return raw
+}
+
+// LineFromLogProto converts a Logging-specific protobuf message back into a Line.
+//
+// It returns a nil Line and an error if raw is nil, if raw.OriginalLength is smaller than the length of raw.Data,
+// or if raw.OriginalLength does not fit into the platform's native int.
+func LineFromLogProto(raw *apb.LogEntry_Raw) (*Line, error) {
+	// Reject a nil message up front; the field accesses below would otherwise dereference a nil pointer.
+	if raw == nil {
+		return nil, fmt.Errorf("nil log entry")
+	}
+	if raw.OriginalLength < int64(len(raw.Data)) {
+		return nil, fmt.Errorf("original_length smaller than length of data")
+	}
+	// Round-trip through int: if the value changes, it did not fit into the native int.
+	originalLength := int(raw.OriginalLength)
+	if int64(originalLength) != raw.OriginalLength {
+		return nil, fmt.Errorf("original_length larger than native int size")
+	}
+	return &Line{
+		Data:           raw.Data,
+		OriginalLength: originalLength,
+	}, nil
+}
+
+// LineBuffer is a io.WriteCloser that will call a given callback every time a line is completed.
+type LineBuffer struct {
+ // maxLineLength is the maximum number of bytes of a single line that writeLimited keeps in cur.
+ maxLineLength int
+ // cb is invoked by commitLine with every completed line.
+ cb LineBufferCallback
+
+ // mu is locked by Write and Close while they modify the fields below and while cb is being invoked.
+ mu sync.Mutex
+ // cur accumulates the (possibly truncated) contents of the line currently being written.
+ cur strings.Builder
+ // length is the length of the line currently being written - this will continue to increase, even if the string
+ // exceeds maxLineLength.
+ length int
+ // closed is set by Close; once set, Write returns an error.
+ closed bool
+}
+
+// LineBufferCallback is a callback that will get called any time the line is completed. It is invoked while the
+// LineBuffer's mutex is held, so the function must not cause another write to (or a Close of) the same LineBuffer,
+// or the program will deadlock.
+type LineBufferCallback func(*Line)
+
+// NewLineBuffer returns an empty, open LineBuffer that keeps at most maxLineLength bytes per line and reports
+// every completed line to cb.
+func NewLineBuffer(maxLineLength int, cb LineBufferCallback) *LineBuffer {
+	lb := new(LineBuffer)
+	lb.maxLineLength = maxLineLength
+	lb.cb = cb
+	return lb
+}
+
+// writeLimited appends data to the line under construction. The full length of data is always added to l.length,
+// but only as many bytes as still fit under maxLineLength are stored in the internal buffer.
+func (l *LineBuffer) writeLimited(data []byte) {
+	l.length += len(data)
+	if room := l.maxLineLength - l.cur.Len(); len(data) > room {
+		data = data[:room]
+	}
+	l.cur.Write(data)
+}
+
+// commitLine hands the line accumulated so far to the callback, then resets the builder and the length counter
+// so that the next line starts from scratch.
+func (l *LineBuffer) commitLine() {
+	line := &Line{Data: l.cur.String(), OriginalLength: l.length}
+	l.cb(line)
+	l.cur.Reset()
+	l.length = 0
+}
+
+// Write implements io.Writer. It splits data on newline characters ('\n') and calls the callback once for every
+// line completed by this call; the newline itself is not part of the emitted line. Bytes after the last newline
+// are kept (subject to maxLineLength) until a later Write completes the line or Close flushes it.
+//
+// On success it returns len(data) and a nil error. If the LineBuffer has been closed it returns 0 and an error.
+func (l *LineBuffer) Write(data []byte) (int, error) {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+
+	if l.closed {
+		return 0, fmt.Errorf("closed")
+	}
+
+	// pos is the offset into data of the first byte that has not been consumed yet. Looping on it (rather than
+	// comparing a pos-relative newline index against len(data)) stops right after a trailing newline.
+	pos := 0
+	for pos < len(data) {
+		nextNewline := bytes.IndexByte(data[pos:], '\n')
+
+		// No newline in the remaining data, write everything to the current (still incomplete) line.
+		if nextNewline == -1 {
+			l.writeLimited(data[pos:])
+			break
+		}
+
+		// nextNewline is relative to pos: write up to the newline, emit the line and skip the newline itself.
+		l.writeLimited(data[pos : pos+nextNewline])
+		l.commitLine()
+		pos += nextNewline + 1
+	}
+	return len(data), nil
+}
+
+// Close will emit any leftover data in the buffer to the callback. Subsequent calls to Write will fail. Subsequent calls to Close
+// will also fail.
+func (l *LineBuffer) Close() error {
+	// Take the lock before looking at l.closed: reading it unlocked races with concurrent Write/Close calls and
+	// would allow two concurrent Close calls to both pass the check.
+	l.mu.Lock()
+	defer l.mu.Unlock()
+
+	if l.closed {
+		return fmt.Errorf("already closed")
+	}
+	l.closed = true
+	// Flush a partially written line, if any.
+	if l.length > 0 {
+		l.commitLine()
+	}
+	return nil
+}
diff --git a/metropolis/pkg/logbuffer/linebuffer_test.go b/metropolis/pkg/logbuffer/linebuffer_test.go
new file mode 100644
index 0000000..c821a4b
--- /dev/null
+++ b/metropolis/pkg/logbuffer/linebuffer_test.go
@@ -0,0 +1,75 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logbuffer
+
+import (
+ "fmt"
+ "testing"
+)
+
+// TestLineBuffer checks that a LineBuffer only emits complete lines, flushes a partial line on Close, and
+// rejects Write and Close once closed.
+func TestLineBuffer(t *testing.T) {
+	var lines []*Line
+	lb := NewLineBuffer(1024, func(l *Line) {
+		lines = append(lines, l)
+	})
+
+	// compare returns an empty string if got renders to exactly the strings in want, or a failure message
+	// otherwise. The lines are rendered to strings first so that the message shows text rather than pointers.
+	compare := func(got []*Line, want ...string) string {
+		gotStrings := make([]string, len(got))
+		for i := range got {
+			gotStrings[i] = got[i].String()
+		}
+		msg := fmt.Sprintf("want %q, got %q", want, gotStrings)
+		if len(gotStrings) != len(want) {
+			return msg
+		}
+		for i := range gotStrings {
+			if gotStrings[i] != want[i] {
+				return msg
+			}
+		}
+		return ""
+	}
+
+	// Write some data.
+	fmt.Fprintf(lb, "foo ")
+	if diff := compare(lines); diff != "" {
+		t.Fatal(diff)
+	}
+	fmt.Fprintf(lb, "bar\n")
+	if diff := compare(lines, "foo bar"); diff != "" {
+		t.Fatal(diff)
+	}
+	fmt.Fprintf(lb, "baz")
+	if diff := compare(lines, "foo bar"); diff != "" {
+		t.Fatal(diff)
+	}
+	fmt.Fprintf(lb, " baz")
+	if diff := compare(lines, "foo bar"); diff != "" {
+		t.Fatal(diff)
+	}
+	// Close and expect flush.
+	if err := lb.Close(); err != nil {
+		t.Fatalf("Close: %v", err)
+	}
+	if diff := compare(lines, "foo bar", "baz baz"); diff != "" {
+		t.Fatal(diff)
+	}
+
+	// Check behaviour after close
+	if _, err := fmt.Fprintf(lb, "nope"); err == nil {
+		t.Fatalf("Write after Close: wanted error, got nil")
+	}
+	if err := lb.Close(); err == nil {
+		t.Fatalf("second Close: wanted error, got nil")
+	}
+}
diff --git a/metropolis/pkg/logbuffer/logbuffer.go b/metropolis/pkg/logbuffer/logbuffer.go
new file mode 100644
index 0000000..ce47816
--- /dev/null
+++ b/metropolis/pkg/logbuffer/logbuffer.go
@@ -0,0 +1,97 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package logbuffer implements a fixed-size in-memory ring buffer for line-separated logs.
+// It implements io.Writer and splits the data into lines. The lines are kept in a ring where the
+// oldest are overwritten once it's full. It allows retrieval of the last n lines. There is a built-in
+// line length limit to bound the memory usage at maxLineLength * size.
+package logbuffer
+
+import (
+ "sync"
+)
+
+// LogBuffer implements a fixed-size in-memory ring buffer for line-separated logs.
+type LogBuffer struct {
+ // mu guards content and length.
+ mu sync.RWMutex
+ // content is the ring storage; it is allocated once by New.
+ content []Line
+ // length is the total number of lines stored so far; the next line is stored at content[length%len(content)].
+ length int
+ // LineBuffer is embedded to provide Write and Close; New wires it to call lineCallback for every completed line.
+ *LineBuffer
+}
+
+// New returns a LogBuffer whose ring has room for size lines and whose embedded LineBuffer limits lines to
+// maxLineLength, feeding every completed line into the ring.
+func New(size, maxLineLength int) *LogBuffer {
+	lb := new(LogBuffer)
+	lb.content = make([]Line, size)
+	lb.LineBuffer = NewLineBuffer(maxLineLength, lb.lineCallback)
+	return lb
+}
+
+// lineCallback stores a completed line in the ring, overwriting the oldest stored line once the ring is full.
+// New installs it as the callback of the embedded LineBuffer.
+func (b *LogBuffer) lineCallback(line *Line) {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	// A ring without any slots cannot hold the line; drop it instead of dividing by zero below.
+	if len(b.content) == 0 {
+		return
+	}
+	b.content[b.length%len(b.content)] = *line
+	b.length++
+}
+
+// capToContentLength caps the number of requested lines to what is actually available: the result is never
+// negative, never more than the number of lines stored so far, and never more than the size of the ring.
+// It reads b.length and b.content without locking, so callers must hold b.mu.
+func (b *LogBuffer) capToContentLength(n int) int {
+	// A negative request would otherwise make callers allocate a slice of negative length, which panics.
+	if n < 0 {
+		return 0
+	}
+	// If there aren't enough lines to read, reduce the request size
+	if n > b.length {
+		n = b.length
+	}
+	// If there isn't enough ringbuffer space, reduce the request size
+	if n > len(b.content) {
+		n = len(b.content)
+	}
+	return n
+}
+
+// ReadLines returns the last n lines from the buffer, oldest first. If n exceeds the size of the ring buffer or
+// the number of lines stored so far, only the lines that are available are returned.
+func (b *LogBuffer) ReadLines(n int) []Line {
+	b.mu.RLock()
+	defer b.mu.RUnlock()
+
+	n = b.capToContentLength(n)
+
+	// Lines are copied out by value, so the result is unaffected by later writes to the ring.
+	result := make([]Line, n)
+	// oldest is the sequence number of the first line to return; the ring slot is that number modulo the ring size.
+	oldest := b.length - n
+	for i := range result {
+		result[i] = b.content[(oldest+i)%len(b.content)]
+	}
+	return result
+}
+
+// ReadLinesTruncated works like ReadLines, but returns the lines as strings and appends the given ellipsis to
+// every line that was truncated because it exceeded the maximum line length.
+func (b *LogBuffer) ReadLinesTruncated(n int, ellipsis string) []string {
+	b.mu.RLock()
+	defer b.mu.RUnlock()
+	// This does not use ReadLines() to prevent excessive copying and associated GC pressure
+	// since it could process a lot of lines.
+
+	n = b.capToContentLength(n)
+
+	outArray := make([]string, n)
+	for i := 1; i <= n; i++ {
+		line := &b.content[(b.length-i)%len(b.content)]
+		// Use the caller's ellipsis rather than Line.String(), which always appends "...".
+		text := line.Data
+		if line.Truncated() {
+			text += ellipsis
+		}
+		outArray[n-i] = text
+	}
+	return outArray
+}
diff --git a/metropolis/pkg/logbuffer/logbuffer_test.go b/metropolis/pkg/logbuffer/logbuffer_test.go
new file mode 100644
index 0000000..c38d7a6
--- /dev/null
+++ b/metropolis/pkg/logbuffer/logbuffer_test.go
@@ -0,0 +1,94 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logbuffer
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/require"
+)
+
+// TestSingleLine writes one newline-terminated line and expects to read it back without the newline.
+func TestSingleLine(t *testing.T) {
+	lb := New(1, 16000)
+	lb.Write([]byte("Hello World\n"))
+
+	lines := lb.ReadLines(1)
+	require.Len(t, lines, 1, "Invalid number of lines read")
+
+	first := lines[0]
+	require.Equal(t, "Hello World", first.Data, "Read bad log line")
+	require.Equal(t, 11, first.OriginalLength, "Invalid line length")
+}
+
+// TestPartialWritesAndReads feeds two lines in chunks that do not line up with the line boundaries, then reads
+// back first only the newest line and then both lines.
+func TestPartialWritesAndReads(t *testing.T) {
+	lb := New(2, 16000)
+	for _, chunk := range []string{"Hello ", "World\nTest ", "2\n"} {
+		lb.Write([]byte(chunk))
+	}
+
+	newest := lb.ReadLines(1)
+	require.Len(t, newest, 1, "Invalid number of lines for partial read")
+	require.Equal(t, "Test 2", newest[0].Data, "Read bad log line")
+
+	both := lb.ReadLines(2)
+	require.Len(t, both, 2, "Invalid number of lines read")
+	require.Equal(t, "Hello World", both[0].Data, "Read bad log line")
+	require.Equal(t, "Test 2", both[1].Data, "Read bad log line")
+}
+
+// TestBufferOverwrite writes four lines into a ring of three and expects the oldest line to be overwritten.
+func TestBufferOverwrite(t *testing.T) {
+	buf := New(3, 16000)
+	buf.Write([]byte("Test1\nTest2\nTest3\nTest4\n"))
+
+	out := buf.ReadLines(3)
+	// Check the length first so that a short result fails the test instead of panicking on the indexing below.
+	require.Len(t, out, 3, "Invalid number of lines read")
+	// require.Equal takes the expected value first, then the actual one.
+	require.Equal(t, "Test2", out[0].Data, "Read bad log line")
+	require.Equal(t, "Test3", out[1].Data, "Read bad log line")
+	require.Equal(t, "Test4", out[2].Data, "Overwritten data is invalid")
+}
+
+// TestTooLargeRequests asks for more lines than are available: first from an empty buffer, then from a ring of
+// size one into which two lines have been written.
+func TestTooLargeRequests(t *testing.T) {
+	lb := New(1, 16000)
+	require.Len(t, lb.ReadLines(1), 0, "Returned more data than there is")
+
+	lb.Write([]byte("1\n2\n"))
+	require.Len(t, lb.ReadLines(2), 1, "Returned more data than the ring buffer can hold")
+}
+
+// TestSpecialCases covers a write that begins with the newline terminating the previously written partial line.
+func TestSpecialCases(t *testing.T) {
+	buf := New(2, 16000)
+	buf.Write([]byte("Test1"))
+	buf.Write([]byte("\nTest2\n"))
+	out := buf.ReadLines(2)
+	require.Len(t, out, 2, "Too many lines written")
+	// require.Equal takes the expected value first, then the actual one.
+	require.Equal(t, "Test1", out[0].Data, "Read bad log line")
+	require.Equal(t, "Test2", out[1].Data, "Read bad log line")
+}
+
+// TestLineLengthLimit writes one line longer than the 6-byte limit and one shorter line, then checks that the
+// long line is truncated (while still reporting its original length) and that only it receives an ellipsis.
+func TestLineLengthLimit(t *testing.T) {
+	buf := New(2, 6)
+
+	testStr := "Just Testing"
+
+	buf.Write([]byte(testStr + "\nShort\n"))
+
+	out := buf.ReadLines(2)
+	// Check lengths before indexing so that a short result fails the test instead of panicking.
+	require.Len(t, out, 2, "Invalid number of lines read")
+	require.Equal(t, len(testStr), out[0].OriginalLength, "Line is over length limit")
+	require.Equal(t, "Just T", out[0].Data, "Log line not properly truncated")
+
+	out2 := buf.ReadLinesTruncated(2, "...")
+	require.Len(t, out2, 2, "Invalid number of lines read")
+	// require.Equal takes the expected value first, then the actual one.
+	require.Equal(t, "Just T...", out2[0], "Line is over length limit")
+	require.Equal(t, "Short", out2[1], "Truncated small enough line")
+}