blob: 1b4f90d6bb49b6a86e60fd34bc2734b0aec8a38d [file] [log] [blame]
Tim Windelschmidt6d33a432025-02-04 14:34:25 +01001// Copyright The Monogon Project Authors.
Serge Bazanski5faa2fc2020-09-07 14:09:30 +02002// SPDX-License-Identifier: Apache-2.0
Serge Bazanski5faa2fc2020-09-07 14:09:30 +02003
4package logtree
5
Serge Bazanskif68153c2020-10-26 13:54:37 +01006import (
Serge Bazanskib0272182020-11-02 18:39:44 +01007 "errors"
Serge Bazanskif68153c2020-10-26 13:54:37 +01008 "sync/atomic"
Serge Bazanski3c5d0632024-09-12 10:49:12 +00009
10 "source.monogon.dev/go/logging"
Serge Bazanskif68153c2020-10-26 13:54:37 +010011)
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020012
Tim Windelschmidt01491a72025-07-23 21:49:11 +020013type ReadDirection int
14
15const (
16 ReadDirectionAfter ReadDirection = iota
17 ReadDirectionBefore
18)
19
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020020// LogReadOption describes options for the LogTree.Read call.
Tim Windelschmidt50093be2025-07-21 17:39:09 +020021type LogReadOption func(*logReaderOptions)
22
23type logReaderOptions struct {
Serge Bazanskif68153c2020-10-26 13:54:37 +010024 withChildren bool
25 withStream bool
26 withBacklog int
27 onlyLeveled bool
28 onlyRaw bool
Serge Bazanski3c5d0632024-09-12 10:49:12 +000029 leveledWithMinimumSeverity logging.Severity
Tim Windelschmidtc2635922025-07-21 18:01:23 +020030 withStreamBufferSize int
Tim Windelschmidt01491a72025-07-23 21:49:11 +020031 withStartPosition int
32 startPositionReadDirection ReadDirection
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020033}
34
Serge Bazanski216fe7b2021-05-21 18:36:16 +020035// WithChildren makes Read return/stream data for both a given DN and all its
36// children.
Tim Windelschmidt50093be2025-07-21 17:39:09 +020037func WithChildren() LogReadOption {
38 return func(lro *logReaderOptions) {
39 lro.withChildren = true
40 }
41}
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020042
Serge Bazanski216fe7b2021-05-21 18:36:16 +020043// WithStream makes Read return a stream of data. This works alongside WithBacklog
44// to create a read-and-stream construct.
Tim Windelschmidt50093be2025-07-21 17:39:09 +020045func WithStream() LogReadOption {
46 return func(lro *logReaderOptions) {
47 lro.withStream = true
48 }
49}
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020050
Tim Windelschmidtc2635922025-07-21 18:01:23 +020051// WithStreamBuffer applies WithStream and overrides the default stream buffer
52// size of 128.
53func WithStreamBuffer(size int) LogReadOption {
54 return func(lro *logReaderOptions) {
55 lro.withStreamBufferSize = size
56 lro.withStream = true
57 }
58}
59
Serge Bazanski216fe7b2021-05-21 18:36:16 +020060// WithBacklog makes Read return already recorded log entries, up to count
61// elements.
Tim Windelschmidt50093be2025-07-21 17:39:09 +020062func WithBacklog(count int) LogReadOption {
63 return func(lro *logReaderOptions) { lro.withBacklog = count }
64}
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020065
Tim Windelschmidt01491a72025-07-23 21:49:11 +020066// WithStartPosition makes Read return log entries from the given position.
67// It requires WithBacklog to be provided.
68//
69// The Journal keeps a global counter for all logs, starting at 0 for the
70// first message. Based on this the user can read entries
71// (based on the ReadDirection option) either after or before the given
72// position.
73func WithStartPosition(pos int, direction ReadDirection) LogReadOption {
74 return func(lro *logReaderOptions) {
75 lro.withStartPosition = pos
76 lro.startPositionReadDirection = direction
77 }
78}
79
Serge Bazanski216fe7b2021-05-21 18:36:16 +020080// BacklogAllAvailable makes WithBacklog return all backlogged log data that
81// logtree possesses.
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020082const BacklogAllAvailable int = -1
83
Tim Windelschmidt50093be2025-07-21 17:39:09 +020084func OnlyRaw() LogReadOption { return func(lro *logReaderOptions) { lro.onlyRaw = true } }
Serge Bazanskif68153c2020-10-26 13:54:37 +010085
Tim Windelschmidt50093be2025-07-21 17:39:09 +020086func OnlyLeveled() LogReadOption { return func(lro *logReaderOptions) { lro.onlyLeveled = true } }
Serge Bazanskif68153c2020-10-26 13:54:37 +010087
Serge Bazanski216fe7b2021-05-21 18:36:16 +020088// LeveledWithMinimumSeverity makes Read return only log entries that are at least
89// at a given Severity. If only leveled entries are needed, OnlyLeveled must be
90// used. This is a no-op when OnlyRaw is used.
Serge Bazanski3c5d0632024-09-12 10:49:12 +000091func LeveledWithMinimumSeverity(s logging.Severity) LogReadOption {
Tim Windelschmidt50093be2025-07-21 17:39:09 +020092 return func(lro *logReaderOptions) { lro.leveledWithMinimumSeverity = s }
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020093}
94
Serge Bazanski216fe7b2021-05-21 18:36:16 +020095// LogReader permits reading an already existing backlog of log entries and to
96// stream further ones.
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020097type LogReader struct {
Serge Bazanski216fe7b2021-05-21 18:36:16 +020098 // Backlog are the log entries already logged by LogTree. This will only be set if
99 // WithBacklog has been passed to Read.
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200100 Backlog []*LogEntry
Serge Bazanski216fe7b2021-05-21 18:36:16 +0200101 // Stream is a channel of new entries as received live by LogTree. This will only
102 // be set if WithStream has been passed to Read. In this case, entries from this
103 // channel must be read as fast as possible by the consumer in order to prevent
104 // missing entries.
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200105 Stream <-chan *LogEntry
Serge Bazanski216fe7b2021-05-21 18:36:16 +0200106 // done is channel used to signal (by closing) that the log consumer is not
107 // interested in more Stream data.
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200108 done chan<- struct{}
Serge Bazanski216fe7b2021-05-21 18:36:16 +0200109 // missed is an atomic integer pointer that tells the subscriber how many messages
110 // in Stream they missed. This pointer is nil if no streaming has been requested.
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200111 missed *uint64
112}
113
Serge Bazanski216fe7b2021-05-21 18:36:16 +0200114// Missed returns the amount of entries that were missed from Stream (as the
115// channel was not drained fast enough).
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200116func (l *LogReader) Missed() uint64 {
117 // No Stream.
118 if l.missed == nil {
119 return 0
120 }
121 return atomic.LoadUint64(l.missed)
122}
123
Serge Bazanski216fe7b2021-05-21 18:36:16 +0200124// Close closes the LogReader's Stream. This must be called once the Reader does
125// not wish to receive streaming messages anymore.
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200126func (l *LogReader) Close() {
127 if l.done != nil {
128 close(l.done)
129 }
130}
131
Serge Bazanskib0272182020-11-02 18:39:44 +0100132var (
Tim Windelschmidt01491a72025-07-23 21:49:11 +0200133 ErrRawAndLeveled = errors.New("cannot return logs that are simultaneously OnlyRaw and OnlyLeveled")
134 ErrStartPositionWithoutBacklog = errors.New("cannot return logs that are WithStartingPosition and missing WithBacklog")
Serge Bazanskib0272182020-11-02 18:39:44 +0100135)
136
Serge Bazanski216fe7b2021-05-21 18:36:16 +0200137// Read and/or stream entries from a LogTree. The returned LogReader is influenced
138// by the LogReadOptions passed, which influence whether the Read will return
139// existing entries, a stream, or both. In addition the options also dictate
140// whether only entries for that particular DN are returned, or for all sub-DNs as
141// well.
Serge Bazanskif68153c2020-10-26 13:54:37 +0100142func (l *LogTree) Read(dn DN, opts ...LogReadOption) (*LogReader, error) {
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200143 l.journal.mu.RLock()
144 defer l.journal.mu.RUnlock()
145
Tim Windelschmidtc2635922025-07-21 18:01:23 +0200146 lro := logReaderOptions{
147 withStreamBufferSize: 128,
Tim Windelschmidt01491a72025-07-23 21:49:11 +0200148 withStartPosition: -1,
Tim Windelschmidtc2635922025-07-21 18:01:23 +0200149 }
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200150
151 for _, opt := range opts {
Tim Windelschmidt50093be2025-07-21 17:39:09 +0200152 opt(&lro)
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200153 }
154
Tim Windelschmidt50093be2025-07-21 17:39:09 +0200155 if lro.onlyLeveled && lro.onlyRaw {
Serge Bazanskib0272182020-11-02 18:39:44 +0100156 return nil, ErrRawAndLeveled
Serge Bazanskif68153c2020-10-26 13:54:37 +0100157 }
158
Tim Windelschmidt01491a72025-07-23 21:49:11 +0200159 isWithBacklog := lro.withBacklog > 0 || lro.withBacklog == BacklogAllAvailable
160 if lro.withStartPosition != -1 && !isWithBacklog {
161 return nil, ErrStartPositionWithoutBacklog
162 }
163
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200164 var filters []filter
Tim Windelschmidt01491a72025-07-23 21:49:11 +0200165 if lro.withStartPosition != -1 {
166 filters = append(filters, filterStartPosition(lro.withBacklog, lro.withStartPosition, lro.startPositionReadDirection))
167 }
Tim Windelschmidt50093be2025-07-21 17:39:09 +0200168 if lro.onlyLeveled {
Serge Bazanskif68153c2020-10-26 13:54:37 +0100169 filters = append(filters, filterOnlyLeveled)
170 }
Tim Windelschmidt50093be2025-07-21 17:39:09 +0200171 if lro.onlyRaw {
Serge Bazanskif68153c2020-10-26 13:54:37 +0100172 filters = append(filters, filterOnlyRaw)
173 }
Tim Windelschmidt50093be2025-07-21 17:39:09 +0200174 if lro.withChildren {
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200175 filters = append(filters, filterSubtree(dn))
176 } else {
177 filters = append(filters, filterExact(dn))
178 }
Tim Windelschmidt50093be2025-07-21 17:39:09 +0200179 if lro.leveledWithMinimumSeverity != "" {
180 filters = append(filters, filterSeverity(lro.leveledWithMinimumSeverity))
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200181 }
182
183 var entries []*entry
Tim Windelschmidt01491a72025-07-23 21:49:11 +0200184 if isWithBacklog {
Tim Windelschmidt50093be2025-07-21 17:39:09 +0200185 if lro.withChildren {
186 entries = l.journal.scanEntries(lro.withBacklog, filters...)
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200187 } else {
Tim Windelschmidt50093be2025-07-21 17:39:09 +0200188 entries = l.journal.getEntries(lro.withBacklog, dn, filters...)
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200189 }
190 }
191
Tim Windelschmidtce0ea8f2025-07-21 19:13:43 +0200192 lr := &LogReader{}
Tim Windelschmidt50093be2025-07-21 17:39:09 +0200193 if lro.withStream {
Tim Windelschmidtce0ea8f2025-07-21 19:13:43 +0200194 sub := &subscriber{
Tim Windelschmidtc2635922025-07-21 18:01:23 +0200195 dataC: make(chan *LogEntry, lro.withStreamBufferSize),
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200196 doneC: make(chan struct{}),
197 filters: filters,
198 }
199 l.journal.subscribe(sub)
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200200
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200201 lr.Stream = sub.dataC
202 lr.done = sub.doneC
203 lr.missed = &sub.missed
204 }
Tim Windelschmidtce0ea8f2025-07-21 19:13:43 +0200205
206 lr.Backlog = make([]*LogEntry, len(entries))
207 for i, entry := range entries {
208 lr.Backlog[i] = entry.external()
209 }
210
Serge Bazanskif68153c2020-10-26 13:54:37 +0100211 return lr, nil
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200212}