blob: 8048604aae7b3cc5f567aacd94550d72590039a9 [file] [log] [blame]
Serge Bazanski248b2ec2020-10-26 15:55:51 +01001// Copyright 2020 The Monogon Project Authors.
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17package logbuffer
18
19import (
20 "bytes"
21 "fmt"
22 "strings"
23 "sync"
Serge Bazanskib0272182020-11-02 18:39:44 +010024
Serge Bazanski31370b02021-01-07 16:31:14 +010025 apb "source.monogon.dev/metropolis/proto/api"
Serge Bazanski248b2ec2020-10-26 15:55:51 +010026)
27
// Line is a line stored in the log buffer - a string, that has been perhaps truncated (due to exceeded limits).
type Line struct {
	// Data is the stored content of the line. It may be shorter than the line that was originally written.
	Data string

	// OriginalLength is the length, in bytes, of the line as it was originally written, before any truncation.
	OriginalLength int
}
33
34// Truncated returns whether this line has been truncated to fit limits.
35func (l *Line) Truncated() bool {
36 return l.OriginalLength > len(l.Data)
37}
38
39// String returns the line with an ellipsis at the end (...) if the line has been truncated, or the original line
40// otherwise.
41func (l *Line) String() string {
42 if l.Truncated() {
43 return l.Data + "..."
44 }
45 return l.Data
46}
47
Serge Bazanskib0272182020-11-02 18:39:44 +010048// ProtoLog returns a Logging-specific protobuf structure.
49func (l *Line) ProtoLog() *apb.LogEntry_Raw {
50 return &apb.LogEntry_Raw{
51 Data: l.Data,
52 OriginalLength: int64(l.OriginalLength),
53 }
54}
55
56// LineFromLogProto converts a Logging-specific protobuf message back into a Line.
57func LineFromLogProto(raw *apb.LogEntry_Raw) (*Line, error) {
58 if raw.OriginalLength < int64(len(raw.Data)) {
59 return nil, fmt.Errorf("original_length smaller than length of data")
60 }
61 originalLength := int(raw.OriginalLength)
62 if int64(originalLength) < raw.OriginalLength {
63 return nil, fmt.Errorf("original_length larger than native int size")
64 }
65 return &Line{
66 Data: raw.Data,
67 OriginalLength: originalLength,
68 }, nil
69}
70
// LineBuffer is an io.WriteCloser that will call a given callback every time a line is completed.
type LineBuffer struct {
	// maxLineLength is the maximum number of bytes of a single line that are kept in cur.
	maxLineLength int

	// cb is the callback invoked with every completed line.
	cb LineBufferCallback

	// mu is taken by Write and Close; it is intended to guard the fields below.
	mu sync.Mutex

	// cur accumulates the (possibly truncated) content of the line currently being written.
	cur strings.Builder

	// length is the length of the line currently being written - this will continue to increase, even if the string
	// exceeds maxLineLength.
	length int

	// closed is set by Close; once set, Write returns an error.
	closed bool
}
83
// LineBufferCallback is a callback that will get called any time a line is completed. It is invoked while the
// LineBuffer's lock is held, so the function must not cause another write to the LineBuffer, or the program will
// deadlock.
type LineBufferCallback func(*Line)
87
88// NewLineBuffer creates a new LineBuffer with a given line length limit and callback.
89func NewLineBuffer(maxLineLength int, cb LineBufferCallback) *LineBuffer {
90 return &LineBuffer{
91 maxLineLength: maxLineLength,
92 cb: cb,
93 }
94}
95
96// writeLimited writes to the internal buffer, making sure that its size does not exceed the maxLineLength.
97func (l *LineBuffer) writeLimited(data []byte) {
98 l.length += len(data)
99 if l.cur.Len()+len(data) > l.maxLineLength {
100 data = data[:l.maxLineLength-l.cur.Len()]
101 }
102 l.cur.Write(data)
103}
104
105// comitLine calls the callback and resets the builder.
106func (l *LineBuffer) commitLine() {
107 l.cb(&Line{
108 Data: l.cur.String(),
109 OriginalLength: l.length,
110 })
111 l.cur.Reset()
112 l.length = 0
113}
114
115func (l *LineBuffer) Write(data []byte) (int, error) {
116 var pos = 0
117
118 l.mu.Lock()
119 defer l.mu.Unlock()
120
121 if l.closed {
122 return 0, fmt.Errorf("closed")
123 }
124
125 for {
126 nextNewline := bytes.IndexRune(data[pos:], '\n')
127
128 // No newline in the data, write everything to the current line
129 if nextNewline == -1 {
130 l.writeLimited(data[pos:])
131 break
132 }
133
134 // Write this line and update position
135 l.writeLimited(data[pos : pos+nextNewline])
136 l.commitLine()
137 pos += nextNewline + 1
138
139 // Data ends with a newline, stop now without writing an empty line
140 if nextNewline == len(data)-1 {
141 break
142 }
143 }
144 return len(data), nil
145}
146
147// Close will emit any leftover data in the buffer to the callback. Subsequent calls to Write will fail. Subsequent calls to Close
148// will also fail.
149func (l *LineBuffer) Close() error {
150 if l.closed {
151 return fmt.Errorf("already closed")
152 }
153 l.mu.Lock()
154 defer l.mu.Unlock()
155 l.closed = true
156 if l.length > 0 {
157 l.commitLine()
158 }
159 return nil
160}