blob: 770952622e23165e69d29a6bb7781d60f8b83b66 [file] [log] [blame]
Serge Bazanski5faa2fc2020-09-07 14:09:30 +02001// Copyright 2020 The Monogon Project Authors.
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17package logtree
18
Serge Bazanskif68153c2020-10-26 13:54:37 +010019import (
Serge Bazanskib0272182020-11-02 18:39:44 +010020 "errors"
Serge Bazanskif68153c2020-10-26 13:54:37 +010021 "fmt"
Serge Bazanski12971d62020-11-17 12:12:58 +010022 "strings"
Serge Bazanskif68153c2020-10-26 13:54:37 +010023 "sync/atomic"
24
25 "git.monogon.dev/source/nexantic.git/core/pkg/logbuffer"
Serge Bazanskib0272182020-11-02 18:39:44 +010026 apb "git.monogon.dev/source/nexantic.git/core/proto/api"
Serge Bazanskif68153c2020-10-26 13:54:37 +010027)
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020028
29// LogReadOption describes options for the LogTree.Read call.
30type LogReadOption struct {
Serge Bazanskif68153c2020-10-26 13:54:37 +010031 withChildren bool
32 withStream bool
33 withBacklog int
34 onlyLeveled bool
35 onlyRaw bool
36 leveledWithMinimumSeverity Severity
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020037}
38
39// WithChildren makes Read return/stream data for both a given DN and all its children.
40func WithChildren() LogReadOption { return LogReadOption{withChildren: true} }
41
42// WithStream makes Read return a stream of data. This works alongside WithBacklog to create a read-and-stream
43// construct.
44func WithStream() LogReadOption { return LogReadOption{withStream: true} }
45
46// WithBacklog makes Read return already recorded log entries, up to count elements.
47func WithBacklog(count int) LogReadOption { return LogReadOption{withBacklog: count} }
48
49// BacklogAllAvailable makes WithBacklog return all backlogged log data that logtree possesses.
50const BacklogAllAvailable int = -1
51
Serge Bazanskif68153c2020-10-26 13:54:37 +010052func OnlyRaw() LogReadOption { return LogReadOption{onlyRaw: true} }
53
54func OnlyLeveled() LogReadOption { return LogReadOption{onlyLeveled: true} }
55
56// LeveledWithMinimumSeverity makes Read return only log entries that are at least at a given Severity. If only leveled
57// entries are needed, OnlyLeveled must be used. This is a no-op when OnlyRaw is used.
58func LeveledWithMinimumSeverity(s Severity) LogReadOption {
59 return LogReadOption{leveledWithMinimumSeverity: s}
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020060}
61
62// LogReader permits reading an already existing backlog of log entries and to stream further ones.
63type LogReader struct {
64 // Backlog are the log entries already logged by LogTree. This will only be set if WithBacklog has been passed to
65 // Read.
66 Backlog []*LogEntry
67 // Stream is a channel of new entries as received live by LogTree. This will only be set if WithStream has been
68 // passed to Read. In this case, entries from this channel must be read as fast as possible by the consumer in order
69 // to prevent missing entries.
70 Stream <-chan *LogEntry
71 // done is channel used to signal (by closing) that the log consumer is not interested in more Stream data.
72 done chan<- struct{}
73 // missed is an atomic integer pointer that tells the subscriber how many messages in Stream they missed. This
74 // pointer is nil if no streaming has been requested.
75 missed *uint64
76}
77
78// Missed returns the amount of entries that were missed from Stream (as the channel was not drained fast enough).
79func (l *LogReader) Missed() uint64 {
80 // No Stream.
81 if l.missed == nil {
82 return 0
83 }
84 return atomic.LoadUint64(l.missed)
85}
86
87// Close closes the LogReader's Stream. This must be called once the Reader does not wish to receive streaming messages
88// anymore.
89func (l *LogReader) Close() {
90 if l.done != nil {
91 close(l.done)
92 }
93}
94
Serge Bazanskif68153c2020-10-26 13:54:37 +010095// LogEntry contains a log entry, combining both leveled and raw logging into a single stream of events. A LogEntry
96// will contain exactly one of either LeveledPayload or RawPayload.
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020097type LogEntry struct {
Serge Bazanskif68153c2020-10-26 13:54:37 +010098 // If non-nil, this is a leveled logging entry.
99 Leveled *LeveledPayload
100 // If non-nil, this is a raw logging entry line.
101 Raw *logbuffer.Line
102 // DN from which this entry was logged.
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200103 DN DN
104}
105
Serge Bazanski12971d62020-11-17 12:12:58 +0100106// String returns a canonical representation of this payload as a single string prefixed with metadata. If the entry is
107// a leveled log entry that originally was logged with newlines this representation will also contain newlines, with
108// each original message part prefixed by the metadata.
109// For an alternative call that will instead return a canonical prefix and a list of lines in the message, see Strings().
110func (l *LogEntry) String() string {
111 if l.Leveled != nil {
112 prefix, messages := l.Leveled.Strings()
113 res := make([]string, len(messages))
114 for i, m := range messages {
115 res[i] = fmt.Sprintf("%-32s %s%s", l.DN, prefix, m)
116 }
117 return strings.Join(res, "\n")
118 }
119 if l.Raw != nil {
120 return fmt.Sprintf("%-32s R %s", l.DN, l.Raw)
121 }
122 return "INVALID"
123}
124
125// Strings returns the canonical representation of this payload split into a prefix and all lines that were contained in
126// the original message. This is meant to be displayed to the user by showing the prefix before each line, concatenated
127// together - possibly in a table form with the prefixes all unified with a rowspan-like mechanism.
128//
129// For example, this function can return:
130// prefix = "root.foo.bar I1102 17:20:06.921395 0 foo.go:42] "
131// lines = []string{"current tags:", " - one", " - two"}
132//
133// With this data, the result should be presented to users this way in text form:
134// root.foo.bar I1102 17:20:06.921395 foo.go:42] current tags:
135// root.foo.bar I1102 17:20:06.921395 foo.go:42] - one
136// root.foo.bar I1102 17:20:06.921395 foo.go:42] - two
137//
138// Or, in a table layout:
139// .-------------------------------------------------------------------------------------.
140// | root.foo.bar I1102 17:20:06.921395 foo.go:42] : current tags: |
141// | :------------------|
142// | : - one |
143// | :------------------|
144// | : - two |
145// '-------------------------------------------------------------------------------------'
146//
147func (l *LogEntry) Strings() (prefix string, lines []string) {
148 if l.Leveled != nil {
149 prefix, messages := l.Leveled.Strings()
150 prefix = fmt.Sprintf("%-32s %s", l.DN, prefix)
151 return prefix, messages
152 }
153 if l.Raw != nil {
154 return fmt.Sprintf("%-32s R ", l.DN), []string{l.Raw.Data}
155 }
156 return "INVALID ", []string{"INVALID"}
157}
158
Serge Bazanskib0272182020-11-02 18:39:44 +0100159// Convert this LogEntry to proto. Returned value may be nil if given LogEntry is invalid, eg. contains neither a Raw
160// nor Leveled entry.
161func (l *LogEntry) Proto() *apb.LogEntry {
162 p := &apb.LogEntry{
163 Dn: string(l.DN),
164 }
165 switch {
166 case l.Leveled != nil:
167 leveled := l.Leveled
168 p.Kind = &apb.LogEntry_Leveled_{
169 Leveled: leveled.Proto(),
170 }
171 case l.Raw != nil:
172 raw := l.Raw
173 p.Kind = &apb.LogEntry_Raw_{
174 Raw: raw.ProtoLog(),
175 }
176 default:
177 return nil
178 }
179 return p
180}
181
Serge Bazanskib0272182020-11-02 18:39:44 +0100182// Parse a proto LogEntry back into internal structure. This can be used in log proto API consumers to easily print
183// received log entries.
184func LogEntryFromProto(l *apb.LogEntry) (*LogEntry, error) {
185 dn := DN(l.Dn)
186 if _, err := dn.Path(); err != nil {
187 return nil, fmt.Errorf("could not convert DN: %w", err)
188 }
189 res := &LogEntry{
190 DN: dn,
191 }
192 switch inner := l.Kind.(type) {
193 case *apb.LogEntry_Leveled_:
194 leveled, err := LeveledPayloadFromProto(inner.Leveled)
195 if err != nil {
196 return nil, fmt.Errorf("could not convert leveled entry: %w", err)
197 }
198 res.Leveled = leveled
199 case *apb.LogEntry_Raw_:
200 line, err := logbuffer.LineFromLogProto(inner.Raw)
201 if err != nil {
202 return nil, fmt.Errorf("could not convert raw entry: %w", err)
203 }
204 res.Raw = line
205 default:
206 return nil, fmt.Errorf("proto has neither Leveled nor Raw set")
207 }
208 return res, nil
209}
210
211var (
212 ErrRawAndLeveled = errors.New("cannot return logs that are simultaneously OnlyRaw and OnlyLeveled")
213)
214
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200215// Read and/or stream entries from a LogTree. The returned LogReader is influenced by the LogReadOptions passed, which
216// influence whether the Read will return existing entries, a stream, or both. In addition the options also dictate
217// whether only entries for that particular DN are returned, or for all sub-DNs as well.
Serge Bazanskif68153c2020-10-26 13:54:37 +0100218func (l *LogTree) Read(dn DN, opts ...LogReadOption) (*LogReader, error) {
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200219 l.journal.mu.RLock()
220 defer l.journal.mu.RUnlock()
221
222 var backlog int
223 var stream bool
224 var recursive bool
Serge Bazanskif68153c2020-10-26 13:54:37 +0100225 var leveledSeverity Severity
226 var onlyRaw, onlyLeveled bool
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200227
228 for _, opt := range opts {
229 if opt.withBacklog > 0 || opt.withBacklog == BacklogAllAvailable {
230 backlog = opt.withBacklog
231 }
232 if opt.withStream {
233 stream = true
234 }
235 if opt.withChildren {
236 recursive = true
237 }
Serge Bazanskif68153c2020-10-26 13:54:37 +0100238 if opt.leveledWithMinimumSeverity != "" {
239 leveledSeverity = opt.leveledWithMinimumSeverity
240 }
241 if opt.onlyLeveled {
242 onlyLeveled = true
243 }
244 if opt.onlyRaw {
245 onlyRaw = true
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200246 }
247 }
248
Serge Bazanskif68153c2020-10-26 13:54:37 +0100249 if onlyLeveled && onlyRaw {
Serge Bazanskib0272182020-11-02 18:39:44 +0100250 return nil, ErrRawAndLeveled
Serge Bazanskif68153c2020-10-26 13:54:37 +0100251 }
252
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200253 var filters []filter
Serge Bazanskif68153c2020-10-26 13:54:37 +0100254 if onlyLeveled {
255 filters = append(filters, filterOnlyLeveled)
256 }
257 if onlyRaw {
258 filters = append(filters, filterOnlyRaw)
259 }
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200260 if recursive {
261 filters = append(filters, filterSubtree(dn))
262 } else {
263 filters = append(filters, filterExact(dn))
264 }
Serge Bazanskif68153c2020-10-26 13:54:37 +0100265 if leveledSeverity != "" {
266 filters = append(filters, filterSeverity(leveledSeverity))
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200267 }
268
269 var entries []*entry
270 if backlog > 0 || backlog == BacklogAllAvailable {
271 // TODO(q3k): pass over the backlog count to scanEntries/getEntries, instead of discarding them here.
272 if recursive {
273 entries = l.journal.scanEntries(filters...)
274 } else {
275 entries = l.journal.getEntries(dn, filters...)
276 }
277 if backlog != BacklogAllAvailable && len(entries) > backlog {
278 entries = entries[:backlog]
279 }
280 }
281
282 var sub *subscriber
283 if stream {
284 sub = &subscriber{
285 // TODO(q3k): make buffer size configurable
286 dataC: make(chan *LogEntry, 128),
287 doneC: make(chan struct{}),
288 filters: filters,
289 }
290 l.journal.subscribe(sub)
291 }
292
293 lr := &LogReader{}
294 lr.Backlog = make([]*LogEntry, len(entries))
295 for i, entry := range entries {
Serge Bazanskif68153c2020-10-26 13:54:37 +0100296 lr.Backlog[i] = entry.external()
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200297 }
298 if stream {
299 lr.Stream = sub.dataC
300 lr.done = sub.doneC
301 lr.missed = &sub.missed
302 }
Serge Bazanskif68153c2020-10-26 13:54:37 +0100303 return lr, nil
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200304}