blob: 1babe1ef2a755ac72bdfa282d3b46a78e45e27f7 [file] [log] [blame]
Serge Bazanski5faa2fc2020-09-07 14:09:30 +02001// Copyright 2020 The Monogon Project Authors.
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17package logtree
18
Serge Bazanskif68153c2020-10-26 13:54:37 +010019import (
Serge Bazanskib0272182020-11-02 18:39:44 +010020 "errors"
Serge Bazanskif68153c2020-10-26 13:54:37 +010021 "sync/atomic"
Serge Bazanskif68153c2020-10-26 13:54:37 +010022)
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020023
24// LogReadOption describes options for the LogTree.Read call.
25type LogReadOption struct {
Serge Bazanskif68153c2020-10-26 13:54:37 +010026 withChildren bool
27 withStream bool
28 withBacklog int
29 onlyLeveled bool
30 onlyRaw bool
31 leveledWithMinimumSeverity Severity
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020032}
33
Serge Bazanski216fe7b2021-05-21 18:36:16 +020034// WithChildren makes Read return/stream data for both a given DN and all its
35// children.
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020036func WithChildren() LogReadOption { return LogReadOption{withChildren: true} }
37
Serge Bazanski216fe7b2021-05-21 18:36:16 +020038// WithStream makes Read return a stream of data. This works alongside WithBacklog
39// to create a read-and-stream construct.
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020040func WithStream() LogReadOption { return LogReadOption{withStream: true} }
41
Serge Bazanski216fe7b2021-05-21 18:36:16 +020042// WithBacklog makes Read return already recorded log entries, up to count
43// elements.
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020044func WithBacklog(count int) LogReadOption { return LogReadOption{withBacklog: count} }
45
Serge Bazanski216fe7b2021-05-21 18:36:16 +020046// BacklogAllAvailable makes WithBacklog return all backlogged log data that
47// logtree possesses.
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020048const BacklogAllAvailable int = -1
49
Serge Bazanskif68153c2020-10-26 13:54:37 +010050func OnlyRaw() LogReadOption { return LogReadOption{onlyRaw: true} }
51
52func OnlyLeveled() LogReadOption { return LogReadOption{onlyLeveled: true} }
53
Serge Bazanski216fe7b2021-05-21 18:36:16 +020054// LeveledWithMinimumSeverity makes Read return only log entries that are at least
55// at a given Severity. If only leveled entries are needed, OnlyLeveled must be
56// used. This is a no-op when OnlyRaw is used.
Serge Bazanskif68153c2020-10-26 13:54:37 +010057func LeveledWithMinimumSeverity(s Severity) LogReadOption {
58 return LogReadOption{leveledWithMinimumSeverity: s}
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020059}
60
Serge Bazanski216fe7b2021-05-21 18:36:16 +020061// LogReader permits reading an already existing backlog of log entries and to
62// stream further ones.
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020063type LogReader struct {
Serge Bazanski216fe7b2021-05-21 18:36:16 +020064 // Backlog are the log entries already logged by LogTree. This will only be set if
65 // WithBacklog has been passed to Read.
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020066 Backlog []*LogEntry
Serge Bazanski216fe7b2021-05-21 18:36:16 +020067 // Stream is a channel of new entries as received live by LogTree. This will only
68 // be set if WithStream has been passed to Read. In this case, entries from this
69 // channel must be read as fast as possible by the consumer in order to prevent
70 // missing entries.
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020071 Stream <-chan *LogEntry
Serge Bazanski216fe7b2021-05-21 18:36:16 +020072 // done is channel used to signal (by closing) that the log consumer is not
73 // interested in more Stream data.
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020074 done chan<- struct{}
Serge Bazanski216fe7b2021-05-21 18:36:16 +020075 // missed is an atomic integer pointer that tells the subscriber how many messages
76 // in Stream they missed. This pointer is nil if no streaming has been requested.
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020077 missed *uint64
78}
79
Serge Bazanski216fe7b2021-05-21 18:36:16 +020080// Missed returns the amount of entries that were missed from Stream (as the
81// channel was not drained fast enough).
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020082func (l *LogReader) Missed() uint64 {
83 // No Stream.
84 if l.missed == nil {
85 return 0
86 }
87 return atomic.LoadUint64(l.missed)
88}
89
Serge Bazanski216fe7b2021-05-21 18:36:16 +020090// Close closes the LogReader's Stream. This must be called once the Reader does
91// not wish to receive streaming messages anymore.
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020092func (l *LogReader) Close() {
93 if l.done != nil {
94 close(l.done)
95 }
96}
97
Serge Bazanskib0272182020-11-02 18:39:44 +010098var (
99 ErrRawAndLeveled = errors.New("cannot return logs that are simultaneously OnlyRaw and OnlyLeveled")
100)
101
Serge Bazanski216fe7b2021-05-21 18:36:16 +0200102// Read and/or stream entries from a LogTree. The returned LogReader is influenced
103// by the LogReadOptions passed, which influence whether the Read will return
104// existing entries, a stream, or both. In addition the options also dictate
105// whether only entries for that particular DN are returned, or for all sub-DNs as
106// well.
Serge Bazanskif68153c2020-10-26 13:54:37 +0100107func (l *LogTree) Read(dn DN, opts ...LogReadOption) (*LogReader, error) {
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200108 l.journal.mu.RLock()
109 defer l.journal.mu.RUnlock()
110
111 var backlog int
112 var stream bool
113 var recursive bool
Serge Bazanskif68153c2020-10-26 13:54:37 +0100114 var leveledSeverity Severity
115 var onlyRaw, onlyLeveled bool
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200116
117 for _, opt := range opts {
118 if opt.withBacklog > 0 || opt.withBacklog == BacklogAllAvailable {
119 backlog = opt.withBacklog
120 }
121 if opt.withStream {
122 stream = true
123 }
124 if opt.withChildren {
125 recursive = true
126 }
Serge Bazanskif68153c2020-10-26 13:54:37 +0100127 if opt.leveledWithMinimumSeverity != "" {
128 leveledSeverity = opt.leveledWithMinimumSeverity
129 }
130 if opt.onlyLeveled {
131 onlyLeveled = true
132 }
133 if opt.onlyRaw {
134 onlyRaw = true
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200135 }
136 }
137
Serge Bazanskif68153c2020-10-26 13:54:37 +0100138 if onlyLeveled && onlyRaw {
Serge Bazanskib0272182020-11-02 18:39:44 +0100139 return nil, ErrRawAndLeveled
Serge Bazanskif68153c2020-10-26 13:54:37 +0100140 }
141
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200142 var filters []filter
Serge Bazanskif68153c2020-10-26 13:54:37 +0100143 if onlyLeveled {
144 filters = append(filters, filterOnlyLeveled)
145 }
146 if onlyRaw {
147 filters = append(filters, filterOnlyRaw)
148 }
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200149 if recursive {
150 filters = append(filters, filterSubtree(dn))
151 } else {
152 filters = append(filters, filterExact(dn))
153 }
Serge Bazanskif68153c2020-10-26 13:54:37 +0100154 if leveledSeverity != "" {
155 filters = append(filters, filterSeverity(leveledSeverity))
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200156 }
157
158 var entries []*entry
159 if backlog > 0 || backlog == BacklogAllAvailable {
160 // TODO(q3k): pass over the backlog count to scanEntries/getEntries, instead of discarding them here.
161 if recursive {
162 entries = l.journal.scanEntries(filters...)
163 } else {
164 entries = l.journal.getEntries(dn, filters...)
165 }
166 if backlog != BacklogAllAvailable && len(entries) > backlog {
167 entries = entries[:backlog]
168 }
169 }
170
171 var sub *subscriber
172 if stream {
173 sub = &subscriber{
174 // TODO(q3k): make buffer size configurable
175 dataC: make(chan *LogEntry, 128),
176 doneC: make(chan struct{}),
177 filters: filters,
178 }
179 l.journal.subscribe(sub)
180 }
181
182 lr := &LogReader{}
183 lr.Backlog = make([]*LogEntry, len(entries))
184 for i, entry := range entries {
Serge Bazanskif68153c2020-10-26 13:54:37 +0100185 lr.Backlog[i] = entry.external()
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200186 }
187 if stream {
188 lr.Stream = sub.dataC
189 lr.done = sub.doneC
190 lr.missed = &sub.missed
191 }
Serge Bazanskif68153c2020-10-26 13:54:37 +0100192 return lr, nil
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200193}