blob: 1582a8f04984868efea5f4c04a8f954b36b0d89e [file] [log] [blame]
// Copyright The Monogon Project Authors.
// SPDX-License-Identifier: Apache-2.0
Serge Bazanski5faa2fc2020-09-07 14:09:30 +02003
4package logtree
5
Serge Bazanskif68153c2020-10-26 13:54:37 +01006import (
Serge Bazanskib0272182020-11-02 18:39:44 +01007 "errors"
Serge Bazanskif68153c2020-10-26 13:54:37 +01008 "sync/atomic"
Serge Bazanski3c5d0632024-09-12 10:49:12 +00009
10 "source.monogon.dev/go/logging"
Serge Bazanskif68153c2020-10-26 13:54:37 +010011)
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020012
13// LogReadOption describes options for the LogTree.Read call.
Tim Windelschmidt50093be2025-07-21 17:39:09 +020014type LogReadOption func(*logReaderOptions)
15
16type logReaderOptions struct {
Serge Bazanskif68153c2020-10-26 13:54:37 +010017 withChildren bool
18 withStream bool
19 withBacklog int
20 onlyLeveled bool
21 onlyRaw bool
Serge Bazanski3c5d0632024-09-12 10:49:12 +000022 leveledWithMinimumSeverity logging.Severity
Tim Windelschmidtc2635922025-07-21 18:01:23 +020023 withStreamBufferSize int
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020024}
25
Serge Bazanski216fe7b2021-05-21 18:36:16 +020026// WithChildren makes Read return/stream data for both a given DN and all its
27// children.
Tim Windelschmidt50093be2025-07-21 17:39:09 +020028func WithChildren() LogReadOption {
29 return func(lro *logReaderOptions) {
30 lro.withChildren = true
31 }
32}
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020033
Serge Bazanski216fe7b2021-05-21 18:36:16 +020034// WithStream makes Read return a stream of data. This works alongside WithBacklog
35// to create a read-and-stream construct.
Tim Windelschmidt50093be2025-07-21 17:39:09 +020036func WithStream() LogReadOption {
37 return func(lro *logReaderOptions) {
38 lro.withStream = true
39 }
40}
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020041
Tim Windelschmidtc2635922025-07-21 18:01:23 +020042// WithStreamBuffer applies WithStream and overrides the default stream buffer
43// size of 128.
44func WithStreamBuffer(size int) LogReadOption {
45 return func(lro *logReaderOptions) {
46 lro.withStreamBufferSize = size
47 lro.withStream = true
48 }
49}
50
Serge Bazanski216fe7b2021-05-21 18:36:16 +020051// WithBacklog makes Read return already recorded log entries, up to count
52// elements.
Tim Windelschmidt50093be2025-07-21 17:39:09 +020053func WithBacklog(count int) LogReadOption {
54 return func(lro *logReaderOptions) { lro.withBacklog = count }
55}
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020056
// BacklogAllAvailable, when passed as the count to WithBacklog, makes Read
// return all backlogged log data that logtree possesses.
const BacklogAllAvailable int = -1
60
Tim Windelschmidt50093be2025-07-21 17:39:09 +020061func OnlyRaw() LogReadOption { return func(lro *logReaderOptions) { lro.onlyRaw = true } }
Serge Bazanskif68153c2020-10-26 13:54:37 +010062
Tim Windelschmidt50093be2025-07-21 17:39:09 +020063func OnlyLeveled() LogReadOption { return func(lro *logReaderOptions) { lro.onlyLeveled = true } }
Serge Bazanskif68153c2020-10-26 13:54:37 +010064
Serge Bazanski216fe7b2021-05-21 18:36:16 +020065// LeveledWithMinimumSeverity makes Read return only log entries that are at least
66// at a given Severity. If only leveled entries are needed, OnlyLeveled must be
67// used. This is a no-op when OnlyRaw is used.
Serge Bazanski3c5d0632024-09-12 10:49:12 +000068func LeveledWithMinimumSeverity(s logging.Severity) LogReadOption {
Tim Windelschmidt50093be2025-07-21 17:39:09 +020069 return func(lro *logReaderOptions) { lro.leveledWithMinimumSeverity = s }
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020070}
71
Serge Bazanski216fe7b2021-05-21 18:36:16 +020072// LogReader permits reading an already existing backlog of log entries and to
73// stream further ones.
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020074type LogReader struct {
Serge Bazanski216fe7b2021-05-21 18:36:16 +020075 // Backlog are the log entries already logged by LogTree. This will only be set if
76 // WithBacklog has been passed to Read.
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020077 Backlog []*LogEntry
Serge Bazanski216fe7b2021-05-21 18:36:16 +020078 // Stream is a channel of new entries as received live by LogTree. This will only
79 // be set if WithStream has been passed to Read. In this case, entries from this
80 // channel must be read as fast as possible by the consumer in order to prevent
81 // missing entries.
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020082 Stream <-chan *LogEntry
Serge Bazanski216fe7b2021-05-21 18:36:16 +020083 // done is channel used to signal (by closing) that the log consumer is not
84 // interested in more Stream data.
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020085 done chan<- struct{}
Serge Bazanski216fe7b2021-05-21 18:36:16 +020086 // missed is an atomic integer pointer that tells the subscriber how many messages
87 // in Stream they missed. This pointer is nil if no streaming has been requested.
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020088 missed *uint64
89}
90
Serge Bazanski216fe7b2021-05-21 18:36:16 +020091// Missed returns the amount of entries that were missed from Stream (as the
92// channel was not drained fast enough).
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020093func (l *LogReader) Missed() uint64 {
94 // No Stream.
95 if l.missed == nil {
96 return 0
97 }
98 return atomic.LoadUint64(l.missed)
99}
100
Serge Bazanski216fe7b2021-05-21 18:36:16 +0200101// Close closes the LogReader's Stream. This must be called once the Reader does
102// not wish to receive streaming messages anymore.
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200103func (l *LogReader) Close() {
104 if l.done != nil {
105 close(l.done)
106 }
107}
108
Serge Bazanskib0272182020-11-02 18:39:44 +0100109var (
110 ErrRawAndLeveled = errors.New("cannot return logs that are simultaneously OnlyRaw and OnlyLeveled")
111)
112
Serge Bazanski216fe7b2021-05-21 18:36:16 +0200113// Read and/or stream entries from a LogTree. The returned LogReader is influenced
114// by the LogReadOptions passed, which influence whether the Read will return
115// existing entries, a stream, or both. In addition the options also dictate
116// whether only entries for that particular DN are returned, or for all sub-DNs as
117// well.
Serge Bazanskif68153c2020-10-26 13:54:37 +0100118func (l *LogTree) Read(dn DN, opts ...LogReadOption) (*LogReader, error) {
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200119 l.journal.mu.RLock()
120 defer l.journal.mu.RUnlock()
121
Tim Windelschmidtc2635922025-07-21 18:01:23 +0200122 lro := logReaderOptions{
123 withStreamBufferSize: 128,
124 }
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200125
126 for _, opt := range opts {
Tim Windelschmidt50093be2025-07-21 17:39:09 +0200127 opt(&lro)
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200128 }
129
Tim Windelschmidt50093be2025-07-21 17:39:09 +0200130 if lro.onlyLeveled && lro.onlyRaw {
Serge Bazanskib0272182020-11-02 18:39:44 +0100131 return nil, ErrRawAndLeveled
Serge Bazanskif68153c2020-10-26 13:54:37 +0100132 }
133
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200134 var filters []filter
Tim Windelschmidt50093be2025-07-21 17:39:09 +0200135 if lro.onlyLeveled {
Serge Bazanskif68153c2020-10-26 13:54:37 +0100136 filters = append(filters, filterOnlyLeveled)
137 }
Tim Windelschmidt50093be2025-07-21 17:39:09 +0200138 if lro.onlyRaw {
Serge Bazanskif68153c2020-10-26 13:54:37 +0100139 filters = append(filters, filterOnlyRaw)
140 }
Tim Windelschmidt50093be2025-07-21 17:39:09 +0200141 if lro.withChildren {
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200142 filters = append(filters, filterSubtree(dn))
143 } else {
144 filters = append(filters, filterExact(dn))
145 }
Tim Windelschmidt50093be2025-07-21 17:39:09 +0200146 if lro.leveledWithMinimumSeverity != "" {
147 filters = append(filters, filterSeverity(lro.leveledWithMinimumSeverity))
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200148 }
149
150 var entries []*entry
Tim Windelschmidt50093be2025-07-21 17:39:09 +0200151 if lro.withBacklog > 0 || lro.withBacklog == BacklogAllAvailable {
152 if lro.withChildren {
153 entries = l.journal.scanEntries(lro.withBacklog, filters...)
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200154 } else {
Tim Windelschmidt50093be2025-07-21 17:39:09 +0200155 entries = l.journal.getEntries(lro.withBacklog, dn, filters...)
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200156 }
157 }
158
Tim Windelschmidtce0ea8f2025-07-21 19:13:43 +0200159 lr := &LogReader{}
Tim Windelschmidt50093be2025-07-21 17:39:09 +0200160 if lro.withStream {
Tim Windelschmidtce0ea8f2025-07-21 19:13:43 +0200161 sub := &subscriber{
Tim Windelschmidtc2635922025-07-21 18:01:23 +0200162 dataC: make(chan *LogEntry, lro.withStreamBufferSize),
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200163 doneC: make(chan struct{}),
164 filters: filters,
165 }
166 l.journal.subscribe(sub)
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200167
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200168 lr.Stream = sub.dataC
169 lr.done = sub.doneC
170 lr.missed = &sub.missed
171 }
Tim Windelschmidtce0ea8f2025-07-21 19:13:43 +0200172
173 lr.Backlog = make([]*LogEntry, len(entries))
174 for i, entry := range entries {
175 lr.Backlog[i] = entry.external()
176 }
177
Serge Bazanskif68153c2020-10-26 13:54:37 +0100178 return lr, nil
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200179}