blob: bb8a524312e44b11d350cf1bbf663e7d2c17d44b [file] [log] [blame]
Serge Bazanski5faa2fc2020-09-07 14:09:30 +02001// Copyright 2020 The Monogon Project Authors.
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17package logtree
18
Serge Bazanskif68153c2020-10-26 13:54:37 +010019import (
Serge Bazanskib0272182020-11-02 18:39:44 +010020 "errors"
Serge Bazanskif68153c2020-10-26 13:54:37 +010021 "fmt"
Serge Bazanskif68153c2020-10-26 13:54:37 +010022 "sync/atomic"
23
24 "git.monogon.dev/source/nexantic.git/core/pkg/logbuffer"
Serge Bazanskib0272182020-11-02 18:39:44 +010025 apb "git.monogon.dev/source/nexantic.git/core/proto/api"
Serge Bazanskif68153c2020-10-26 13:54:37 +010026)
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020027
28// LogReadOption describes options for the LogTree.Read call.
29type LogReadOption struct {
Serge Bazanskif68153c2020-10-26 13:54:37 +010030 withChildren bool
31 withStream bool
32 withBacklog int
33 onlyLeveled bool
34 onlyRaw bool
35 leveledWithMinimumSeverity Severity
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020036}
37
38// WithChildren makes Read return/stream data for both a given DN and all its children.
39func WithChildren() LogReadOption { return LogReadOption{withChildren: true} }
40
41// WithStream makes Read return a stream of data. This works alongside WithBacklog to create a read-and-stream
42// construct.
43func WithStream() LogReadOption { return LogReadOption{withStream: true} }
44
45// WithBacklog makes Read return already recorded log entries, up to count elements.
46func WithBacklog(count int) LogReadOption { return LogReadOption{withBacklog: count} }
47
// BacklogAllAvailable is the special count that, when passed to WithBacklog, makes Read return all backlogged log data
// that logtree possesses instead of limiting the backlog to a fixed number of entries.
const BacklogAllAvailable = int(-1)
50
Serge Bazanskif68153c2020-10-26 13:54:37 +010051func OnlyRaw() LogReadOption { return LogReadOption{onlyRaw: true} }
52
53func OnlyLeveled() LogReadOption { return LogReadOption{onlyLeveled: true} }
54
55// LeveledWithMinimumSeverity makes Read return only log entries that are at least at a given Severity. If only leveled
56// entries are needed, OnlyLeveled must be used. This is a no-op when OnlyRaw is used.
57func LeveledWithMinimumSeverity(s Severity) LogReadOption {
58 return LogReadOption{leveledWithMinimumSeverity: s}
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020059}
60
61// LogReader permits reading an already existing backlog of log entries and to stream further ones.
62type LogReader struct {
63 // Backlog are the log entries already logged by LogTree. This will only be set if WithBacklog has been passed to
64 // Read.
65 Backlog []*LogEntry
66 // Stream is a channel of new entries as received live by LogTree. This will only be set if WithStream has been
67 // passed to Read. In this case, entries from this channel must be read as fast as possible by the consumer in order
68 // to prevent missing entries.
69 Stream <-chan *LogEntry
70 // done is channel used to signal (by closing) that the log consumer is not interested in more Stream data.
71 done chan<- struct{}
72 // missed is an atomic integer pointer that tells the subscriber how many messages in Stream they missed. This
73 // pointer is nil if no streaming has been requested.
74 missed *uint64
75}
76
77// Missed returns the amount of entries that were missed from Stream (as the channel was not drained fast enough).
78func (l *LogReader) Missed() uint64 {
79 // No Stream.
80 if l.missed == nil {
81 return 0
82 }
83 return atomic.LoadUint64(l.missed)
84}
85
86// Close closes the LogReader's Stream. This must be called once the Reader does not wish to receive streaming messages
87// anymore.
88func (l *LogReader) Close() {
89 if l.done != nil {
90 close(l.done)
91 }
92}
93
Serge Bazanskif68153c2020-10-26 13:54:37 +010094// LogEntry contains a log entry, combining both leveled and raw logging into a single stream of events. A LogEntry
95// will contain exactly one of either LeveledPayload or RawPayload.
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020096type LogEntry struct {
Serge Bazanskif68153c2020-10-26 13:54:37 +010097 // If non-nil, this is a leveled logging entry.
98 Leveled *LeveledPayload
99 // If non-nil, this is a raw logging entry line.
100 Raw *logbuffer.Line
101 // DN from which this entry was logged.
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200102 DN DN
103}
104
Serge Bazanskib0272182020-11-02 18:39:44 +0100105// Convert this LogEntry to proto. Returned value may be nil if given LogEntry is invalid, eg. contains neither a Raw
106// nor Leveled entry.
107func (l *LogEntry) Proto() *apb.LogEntry {
108 p := &apb.LogEntry{
109 Dn: string(l.DN),
110 }
111 switch {
112 case l.Leveled != nil:
113 leveled := l.Leveled
114 p.Kind = &apb.LogEntry_Leveled_{
115 Leveled: leveled.Proto(),
116 }
117 case l.Raw != nil:
118 raw := l.Raw
119 p.Kind = &apb.LogEntry_Raw_{
120 Raw: raw.ProtoLog(),
121 }
122 default:
123 return nil
124 }
125 return p
126}
127
128// String returns a standardized human-readable representation of either underlying raw or leveled entry. The returned
129// data is pre-formatted to be displayed in a fixed-width font.
130func (l *LogEntry) String() string {
131 if l.Leveled != nil {
132 // Use glog-like layout, but with supervisor DN instead of filename.
133 timestamp := l.Leveled.Timestamp()
134 _, month, day := timestamp.Date()
135 hour, minute, second := timestamp.Clock()
136 nsec := timestamp.Nanosecond() / 1000
137 return fmt.Sprintf("%s%02d%02d %02d:%02d:%02d.%06d %s] %s", l.Leveled.Severity(), month, day, hour, minute, second, nsec, l.DN, l.Leveled.Message())
138 }
139 if l.Raw != nil {
140 return fmt.Sprintf("%-32s R %s", l.DN, l.Raw)
141 }
142 return "INVALID"
143}
144
145// Parse a proto LogEntry back into internal structure. This can be used in log proto API consumers to easily print
146// received log entries.
147func LogEntryFromProto(l *apb.LogEntry) (*LogEntry, error) {
148 dn := DN(l.Dn)
149 if _, err := dn.Path(); err != nil {
150 return nil, fmt.Errorf("could not convert DN: %w", err)
151 }
152 res := &LogEntry{
153 DN: dn,
154 }
155 switch inner := l.Kind.(type) {
156 case *apb.LogEntry_Leveled_:
157 leveled, err := LeveledPayloadFromProto(inner.Leveled)
158 if err != nil {
159 return nil, fmt.Errorf("could not convert leveled entry: %w", err)
160 }
161 res.Leveled = leveled
162 case *apb.LogEntry_Raw_:
163 line, err := logbuffer.LineFromLogProto(inner.Raw)
164 if err != nil {
165 return nil, fmt.Errorf("could not convert raw entry: %w", err)
166 }
167 res.Raw = line
168 default:
169 return nil, fmt.Errorf("proto has neither Leveled nor Raw set")
170 }
171 return res, nil
172}
173
// ErrRawAndLeveled is returned by LogTree.Read when both the OnlyRaw and OnlyLeveled options have been passed, as no
// entry can satisfy both at the same time.
var ErrRawAndLeveled = errors.New("cannot return logs that are simultaneously OnlyRaw and OnlyLeveled")
177
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200178// Read and/or stream entries from a LogTree. The returned LogReader is influenced by the LogReadOptions passed, which
179// influence whether the Read will return existing entries, a stream, or both. In addition the options also dictate
180// whether only entries for that particular DN are returned, or for all sub-DNs as well.
Serge Bazanskif68153c2020-10-26 13:54:37 +0100181func (l *LogTree) Read(dn DN, opts ...LogReadOption) (*LogReader, error) {
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200182 l.journal.mu.RLock()
183 defer l.journal.mu.RUnlock()
184
185 var backlog int
186 var stream bool
187 var recursive bool
Serge Bazanskif68153c2020-10-26 13:54:37 +0100188 var leveledSeverity Severity
189 var onlyRaw, onlyLeveled bool
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200190
191 for _, opt := range opts {
192 if opt.withBacklog > 0 || opt.withBacklog == BacklogAllAvailable {
193 backlog = opt.withBacklog
194 }
195 if opt.withStream {
196 stream = true
197 }
198 if opt.withChildren {
199 recursive = true
200 }
Serge Bazanskif68153c2020-10-26 13:54:37 +0100201 if opt.leveledWithMinimumSeverity != "" {
202 leveledSeverity = opt.leveledWithMinimumSeverity
203 }
204 if opt.onlyLeveled {
205 onlyLeveled = true
206 }
207 if opt.onlyRaw {
208 onlyRaw = true
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200209 }
210 }
211
Serge Bazanskif68153c2020-10-26 13:54:37 +0100212 if onlyLeveled && onlyRaw {
Serge Bazanskib0272182020-11-02 18:39:44 +0100213 return nil, ErrRawAndLeveled
Serge Bazanskif68153c2020-10-26 13:54:37 +0100214 }
215
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200216 var filters []filter
Serge Bazanskif68153c2020-10-26 13:54:37 +0100217 if onlyLeveled {
218 filters = append(filters, filterOnlyLeveled)
219 }
220 if onlyRaw {
221 filters = append(filters, filterOnlyRaw)
222 }
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200223 if recursive {
224 filters = append(filters, filterSubtree(dn))
225 } else {
226 filters = append(filters, filterExact(dn))
227 }
Serge Bazanskif68153c2020-10-26 13:54:37 +0100228 if leveledSeverity != "" {
229 filters = append(filters, filterSeverity(leveledSeverity))
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200230 }
231
232 var entries []*entry
233 if backlog > 0 || backlog == BacklogAllAvailable {
234 // TODO(q3k): pass over the backlog count to scanEntries/getEntries, instead of discarding them here.
235 if recursive {
236 entries = l.journal.scanEntries(filters...)
237 } else {
238 entries = l.journal.getEntries(dn, filters...)
239 }
240 if backlog != BacklogAllAvailable && len(entries) > backlog {
241 entries = entries[:backlog]
242 }
243 }
244
245 var sub *subscriber
246 if stream {
247 sub = &subscriber{
248 // TODO(q3k): make buffer size configurable
249 dataC: make(chan *LogEntry, 128),
250 doneC: make(chan struct{}),
251 filters: filters,
252 }
253 l.journal.subscribe(sub)
254 }
255
256 lr := &LogReader{}
257 lr.Backlog = make([]*LogEntry, len(entries))
258 for i, entry := range entries {
Serge Bazanskif68153c2020-10-26 13:54:37 +0100259 lr.Backlog[i] = entry.external()
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200260 }
261 if stream {
262 lr.Stream = sub.dataC
263 lr.done = sub.doneC
264 lr.missed = &sub.missed
265 }
Serge Bazanskif68153c2020-10-26 13:54:37 +0100266 return lr, nil
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200267}