blob: 30ceccf8333718e4a217620503ba056a968eaf5a [file] [log] [blame]
Serge Bazanski5faa2fc2020-09-07 14:09:30 +02001// Copyright 2020 The Monogon Project Authors.
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17package logtree
18
Serge Bazanskif68153c2020-10-26 13:54:37 +010019import (
Serge Bazanskib0272182020-11-02 18:39:44 +010020 "errors"
Serge Bazanskif68153c2020-10-26 13:54:37 +010021 "sync/atomic"
Serge Bazanski3c5d0632024-09-12 10:49:12 +000022
23 "source.monogon.dev/go/logging"
Serge Bazanskif68153c2020-10-26 13:54:37 +010024)
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020025
26// LogReadOption describes options for the LogTree.Read call.
27type LogReadOption struct {
Serge Bazanskif68153c2020-10-26 13:54:37 +010028 withChildren bool
29 withStream bool
30 withBacklog int
31 onlyLeveled bool
32 onlyRaw bool
Serge Bazanski3c5d0632024-09-12 10:49:12 +000033 leveledWithMinimumSeverity logging.Severity
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020034}
35
Serge Bazanski216fe7b2021-05-21 18:36:16 +020036// WithChildren makes Read return/stream data for both a given DN and all its
37// children.
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020038func WithChildren() LogReadOption { return LogReadOption{withChildren: true} }
39
Serge Bazanski216fe7b2021-05-21 18:36:16 +020040// WithStream makes Read return a stream of data. This works alongside WithBacklog
41// to create a read-and-stream construct.
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020042func WithStream() LogReadOption { return LogReadOption{withStream: true} }
43
Serge Bazanski216fe7b2021-05-21 18:36:16 +020044// WithBacklog makes Read return already recorded log entries, up to count
45// elements.
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020046func WithBacklog(count int) LogReadOption { return LogReadOption{withBacklog: count} }
47
Serge Bazanski216fe7b2021-05-21 18:36:16 +020048// BacklogAllAvailable makes WithBacklog return all backlogged log data that
49// logtree possesses.
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020050const BacklogAllAvailable int = -1
51
Serge Bazanskif68153c2020-10-26 13:54:37 +010052func OnlyRaw() LogReadOption { return LogReadOption{onlyRaw: true} }
53
54func OnlyLeveled() LogReadOption { return LogReadOption{onlyLeveled: true} }
55
Serge Bazanski216fe7b2021-05-21 18:36:16 +020056// LeveledWithMinimumSeverity makes Read return only log entries that are at least
57// at a given Severity. If only leveled entries are needed, OnlyLeveled must be
58// used. This is a no-op when OnlyRaw is used.
Serge Bazanski3c5d0632024-09-12 10:49:12 +000059func LeveledWithMinimumSeverity(s logging.Severity) LogReadOption {
Serge Bazanskif68153c2020-10-26 13:54:37 +010060 return LogReadOption{leveledWithMinimumSeverity: s}
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020061}
62
Serge Bazanski216fe7b2021-05-21 18:36:16 +020063// LogReader permits reading an already existing backlog of log entries and to
64// stream further ones.
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020065type LogReader struct {
Serge Bazanski216fe7b2021-05-21 18:36:16 +020066 // Backlog are the log entries already logged by LogTree. This will only be set if
67 // WithBacklog has been passed to Read.
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020068 Backlog []*LogEntry
Serge Bazanski216fe7b2021-05-21 18:36:16 +020069 // Stream is a channel of new entries as received live by LogTree. This will only
70 // be set if WithStream has been passed to Read. In this case, entries from this
71 // channel must be read as fast as possible by the consumer in order to prevent
72 // missing entries.
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020073 Stream <-chan *LogEntry
Serge Bazanski216fe7b2021-05-21 18:36:16 +020074 // done is channel used to signal (by closing) that the log consumer is not
75 // interested in more Stream data.
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020076 done chan<- struct{}
Serge Bazanski216fe7b2021-05-21 18:36:16 +020077 // missed is an atomic integer pointer that tells the subscriber how many messages
78 // in Stream they missed. This pointer is nil if no streaming has been requested.
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020079 missed *uint64
80}
81
Serge Bazanski216fe7b2021-05-21 18:36:16 +020082// Missed returns the amount of entries that were missed from Stream (as the
83// channel was not drained fast enough).
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020084func (l *LogReader) Missed() uint64 {
85 // No Stream.
86 if l.missed == nil {
87 return 0
88 }
89 return atomic.LoadUint64(l.missed)
90}
91
Serge Bazanski216fe7b2021-05-21 18:36:16 +020092// Close closes the LogReader's Stream. This must be called once the Reader does
93// not wish to receive streaming messages anymore.
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020094func (l *LogReader) Close() {
95 if l.done != nil {
96 close(l.done)
97 }
98}
99
Serge Bazanskib0272182020-11-02 18:39:44 +0100100var (
101 ErrRawAndLeveled = errors.New("cannot return logs that are simultaneously OnlyRaw and OnlyLeveled")
102)
103
Serge Bazanski216fe7b2021-05-21 18:36:16 +0200104// Read and/or stream entries from a LogTree. The returned LogReader is influenced
105// by the LogReadOptions passed, which influence whether the Read will return
106// existing entries, a stream, or both. In addition the options also dictate
107// whether only entries for that particular DN are returned, or for all sub-DNs as
108// well.
Serge Bazanskif68153c2020-10-26 13:54:37 +0100109func (l *LogTree) Read(dn DN, opts ...LogReadOption) (*LogReader, error) {
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200110 l.journal.mu.RLock()
111 defer l.journal.mu.RUnlock()
112
113 var backlog int
114 var stream bool
115 var recursive bool
Serge Bazanski3c5d0632024-09-12 10:49:12 +0000116 var leveledSeverity logging.Severity
Serge Bazanskif68153c2020-10-26 13:54:37 +0100117 var onlyRaw, onlyLeveled bool
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200118
119 for _, opt := range opts {
120 if opt.withBacklog > 0 || opt.withBacklog == BacklogAllAvailable {
121 backlog = opt.withBacklog
122 }
123 if opt.withStream {
124 stream = true
125 }
126 if opt.withChildren {
127 recursive = true
128 }
Serge Bazanskif68153c2020-10-26 13:54:37 +0100129 if opt.leveledWithMinimumSeverity != "" {
130 leveledSeverity = opt.leveledWithMinimumSeverity
131 }
132 if opt.onlyLeveled {
133 onlyLeveled = true
134 }
135 if opt.onlyRaw {
136 onlyRaw = true
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200137 }
138 }
139
Serge Bazanskif68153c2020-10-26 13:54:37 +0100140 if onlyLeveled && onlyRaw {
Serge Bazanskib0272182020-11-02 18:39:44 +0100141 return nil, ErrRawAndLeveled
Serge Bazanskif68153c2020-10-26 13:54:37 +0100142 }
143
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200144 var filters []filter
Serge Bazanskif68153c2020-10-26 13:54:37 +0100145 if onlyLeveled {
146 filters = append(filters, filterOnlyLeveled)
147 }
148 if onlyRaw {
149 filters = append(filters, filterOnlyRaw)
150 }
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200151 if recursive {
152 filters = append(filters, filterSubtree(dn))
153 } else {
154 filters = append(filters, filterExact(dn))
155 }
Serge Bazanskif68153c2020-10-26 13:54:37 +0100156 if leveledSeverity != "" {
157 filters = append(filters, filterSeverity(leveledSeverity))
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200158 }
159
160 var entries []*entry
161 if backlog > 0 || backlog == BacklogAllAvailable {
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200162 if recursive {
Serge Bazanski8fab0142023-03-29 16:48:16 +0200163 entries = l.journal.scanEntries(backlog, filters...)
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200164 } else {
Serge Bazanski8fab0142023-03-29 16:48:16 +0200165 entries = l.journal.getEntries(backlog, dn, filters...)
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200166 }
167 }
168
169 var sub *subscriber
170 if stream {
171 sub = &subscriber{
172 // TODO(q3k): make buffer size configurable
173 dataC: make(chan *LogEntry, 128),
174 doneC: make(chan struct{}),
175 filters: filters,
176 }
177 l.journal.subscribe(sub)
178 }
179
180 lr := &LogReader{}
181 lr.Backlog = make([]*LogEntry, len(entries))
182 for i, entry := range entries {
Serge Bazanskif68153c2020-10-26 13:54:37 +0100183 lr.Backlog[i] = entry.external()
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200184 }
185 if stream {
186 lr.Stream = sub.dataC
187 lr.done = sub.doneC
188 lr.missed = &sub.missed
189 }
Serge Bazanskif68153c2020-10-26 13:54:37 +0100190 return lr, nil
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200191}