blob: fed202ef5c9674fbf7ce33e5910ca20f3a563788 [file] [log] [blame]
// Copyright 2020 The Monogon Project Authors.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
16
17package logtree
18
Serge Bazanskif68153c2020-10-26 13:54:37 +010019import (
Serge Bazanskib0272182020-11-02 18:39:44 +010020 "errors"
Serge Bazanskif68153c2020-10-26 13:54:37 +010021 "sync/atomic"
Serge Bazanskif68153c2020-10-26 13:54:37 +010022)
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020023
24// LogReadOption describes options for the LogTree.Read call.
25type LogReadOption struct {
Serge Bazanskif68153c2020-10-26 13:54:37 +010026 withChildren bool
27 withStream bool
28 withBacklog int
29 onlyLeveled bool
30 onlyRaw bool
31 leveledWithMinimumSeverity Severity
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020032}
33
34// WithChildren makes Read return/stream data for both a given DN and all its children.
35func WithChildren() LogReadOption { return LogReadOption{withChildren: true} }
36
37// WithStream makes Read return a stream of data. This works alongside WithBacklog to create a read-and-stream
38// construct.
39func WithStream() LogReadOption { return LogReadOption{withStream: true} }
40
41// WithBacklog makes Read return already recorded log entries, up to count elements.
42func WithBacklog(count int) LogReadOption { return LogReadOption{withBacklog: count} }
43
// BacklogAllAvailable is the count to pass to WithBacklog in order to receive all backlogged log data that logtree
// possesses.
const BacklogAllAvailable = int(-1)
46
Serge Bazanskif68153c2020-10-26 13:54:37 +010047func OnlyRaw() LogReadOption { return LogReadOption{onlyRaw: true} }
48
49func OnlyLeveled() LogReadOption { return LogReadOption{onlyLeveled: true} }
50
51// LeveledWithMinimumSeverity makes Read return only log entries that are at least at a given Severity. If only leveled
52// entries are needed, OnlyLeveled must be used. This is a no-op when OnlyRaw is used.
53func LeveledWithMinimumSeverity(s Severity) LogReadOption {
54 return LogReadOption{leveledWithMinimumSeverity: s}
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020055}
56
57// LogReader permits reading an already existing backlog of log entries and to stream further ones.
58type LogReader struct {
59 // Backlog are the log entries already logged by LogTree. This will only be set if WithBacklog has been passed to
60 // Read.
61 Backlog []*LogEntry
62 // Stream is a channel of new entries as received live by LogTree. This will only be set if WithStream has been
63 // passed to Read. In this case, entries from this channel must be read as fast as possible by the consumer in order
64 // to prevent missing entries.
65 Stream <-chan *LogEntry
66 // done is channel used to signal (by closing) that the log consumer is not interested in more Stream data.
67 done chan<- struct{}
68 // missed is an atomic integer pointer that tells the subscriber how many messages in Stream they missed. This
69 // pointer is nil if no streaming has been requested.
70 missed *uint64
71}
72
73// Missed returns the amount of entries that were missed from Stream (as the channel was not drained fast enough).
74func (l *LogReader) Missed() uint64 {
75 // No Stream.
76 if l.missed == nil {
77 return 0
78 }
79 return atomic.LoadUint64(l.missed)
80}
81
82// Close closes the LogReader's Stream. This must be called once the Reader does not wish to receive streaming messages
83// anymore.
84func (l *LogReader) Close() {
85 if l.done != nil {
86 close(l.done)
87 }
88}
89
// ErrRawAndLeveled is returned by LogTree.Read when both OnlyRaw and OnlyLeveled have been requested.
var ErrRawAndLeveled = errors.New("cannot return logs that are simultaneously OnlyRaw and OnlyLeveled")
93
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020094// Read and/or stream entries from a LogTree. The returned LogReader is influenced by the LogReadOptions passed, which
95// influence whether the Read will return existing entries, a stream, or both. In addition the options also dictate
96// whether only entries for that particular DN are returned, or for all sub-DNs as well.
Serge Bazanskif68153c2020-10-26 13:54:37 +010097func (l *LogTree) Read(dn DN, opts ...LogReadOption) (*LogReader, error) {
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020098 l.journal.mu.RLock()
99 defer l.journal.mu.RUnlock()
100
101 var backlog int
102 var stream bool
103 var recursive bool
Serge Bazanskif68153c2020-10-26 13:54:37 +0100104 var leveledSeverity Severity
105 var onlyRaw, onlyLeveled bool
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200106
107 for _, opt := range opts {
108 if opt.withBacklog > 0 || opt.withBacklog == BacklogAllAvailable {
109 backlog = opt.withBacklog
110 }
111 if opt.withStream {
112 stream = true
113 }
114 if opt.withChildren {
115 recursive = true
116 }
Serge Bazanskif68153c2020-10-26 13:54:37 +0100117 if opt.leveledWithMinimumSeverity != "" {
118 leveledSeverity = opt.leveledWithMinimumSeverity
119 }
120 if opt.onlyLeveled {
121 onlyLeveled = true
122 }
123 if opt.onlyRaw {
124 onlyRaw = true
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200125 }
126 }
127
Serge Bazanskif68153c2020-10-26 13:54:37 +0100128 if onlyLeveled && onlyRaw {
Serge Bazanskib0272182020-11-02 18:39:44 +0100129 return nil, ErrRawAndLeveled
Serge Bazanskif68153c2020-10-26 13:54:37 +0100130 }
131
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200132 var filters []filter
Serge Bazanskif68153c2020-10-26 13:54:37 +0100133 if onlyLeveled {
134 filters = append(filters, filterOnlyLeveled)
135 }
136 if onlyRaw {
137 filters = append(filters, filterOnlyRaw)
138 }
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200139 if recursive {
140 filters = append(filters, filterSubtree(dn))
141 } else {
142 filters = append(filters, filterExact(dn))
143 }
Serge Bazanskif68153c2020-10-26 13:54:37 +0100144 if leveledSeverity != "" {
145 filters = append(filters, filterSeverity(leveledSeverity))
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200146 }
147
148 var entries []*entry
149 if backlog > 0 || backlog == BacklogAllAvailable {
150 // TODO(q3k): pass over the backlog count to scanEntries/getEntries, instead of discarding them here.
151 if recursive {
152 entries = l.journal.scanEntries(filters...)
153 } else {
154 entries = l.journal.getEntries(dn, filters...)
155 }
156 if backlog != BacklogAllAvailable && len(entries) > backlog {
157 entries = entries[:backlog]
158 }
159 }
160
161 var sub *subscriber
162 if stream {
163 sub = &subscriber{
164 // TODO(q3k): make buffer size configurable
165 dataC: make(chan *LogEntry, 128),
166 doneC: make(chan struct{}),
167 filters: filters,
168 }
169 l.journal.subscribe(sub)
170 }
171
172 lr := &LogReader{}
173 lr.Backlog = make([]*LogEntry, len(entries))
174 for i, entry := range entries {
Serge Bazanskif68153c2020-10-26 13:54:37 +0100175 lr.Backlog[i] = entry.external()
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200176 }
177 if stream {
178 lr.Stream = sub.dataC
179 lr.done = sub.doneC
180 lr.missed = &sub.missed
181 }
Serge Bazanskif68153c2020-10-26 13:54:37 +0100182 return lr, nil
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200183}