// Copyright 2020 The Monogon Project Authors.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package logtree

import "sync/atomic"

21// LogReadOption describes options for the LogTree.Read call.
22type LogReadOption struct {
23 withChildren bool
24 withStream bool
25 withBacklog int
26 withMinimumSeverity Severity
27}
28
29// WithChildren makes Read return/stream data for both a given DN and all its children.
30func WithChildren() LogReadOption { return LogReadOption{withChildren: true} }
31
32// WithStream makes Read return a stream of data. This works alongside WithBacklog to create a read-and-stream
33// construct.
34func WithStream() LogReadOption { return LogReadOption{withStream: true} }
35
36// WithBacklog makes Read return already recorded log entries, up to count elements.
37func WithBacklog(count int) LogReadOption { return LogReadOption{withBacklog: count} }
38
// BacklogAllAvailable makes WithBacklog return all backlogged log data that
// logtree possesses.
const BacklogAllAvailable int = -1

42// WithMinimumSeverity makes Read return only log entries that are at least at a given Severity.
43func WithMinimumSeverity(s Severity) LogReadOption {
44 return LogReadOption{withMinimumSeverity: s}
45}
46
47// LogReader permits reading an already existing backlog of log entries and to stream further ones.
48type LogReader struct {
49 // Backlog are the log entries already logged by LogTree. This will only be set if WithBacklog has been passed to
50 // Read.
51 Backlog []*LogEntry
52 // Stream is a channel of new entries as received live by LogTree. This will only be set if WithStream has been
53 // passed to Read. In this case, entries from this channel must be read as fast as possible by the consumer in order
54 // to prevent missing entries.
55 Stream <-chan *LogEntry
56 // done is channel used to signal (by closing) that the log consumer is not interested in more Stream data.
57 done chan<- struct{}
58 // missed is an atomic integer pointer that tells the subscriber how many messages in Stream they missed. This
59 // pointer is nil if no streaming has been requested.
60 missed *uint64
61}
62
63// Missed returns the amount of entries that were missed from Stream (as the channel was not drained fast enough).
64func (l *LogReader) Missed() uint64 {
65 // No Stream.
66 if l.missed == nil {
67 return 0
68 }
69 return atomic.LoadUint64(l.missed)
70}
71
72// Close closes the LogReader's Stream. This must be called once the Reader does not wish to receive streaming messages
73// anymore.
74func (l *LogReader) Close() {
75 if l.done != nil {
76 close(l.done)
77 }
78}
79
80type LogEntry struct {
Serge Bazanski1bfa0c22020-10-14 16:45:07 +020081 *LeveledPayload
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020082 DN DN
83}
84
85// Read and/or stream entries from a LogTree. The returned LogReader is influenced by the LogReadOptions passed, which
86// influence whether the Read will return existing entries, a stream, or both. In addition the options also dictate
87// whether only entries for that particular DN are returned, or for all sub-DNs as well.
88func (l *LogTree) Read(dn DN, opts ...LogReadOption) *LogReader {
89 l.journal.mu.RLock()
90 defer l.journal.mu.RUnlock()
91
92 var backlog int
93 var stream bool
94 var recursive bool
95 var severity Severity
96
97 for _, opt := range opts {
98 if opt.withBacklog > 0 || opt.withBacklog == BacklogAllAvailable {
99 backlog = opt.withBacklog
100 }
101 if opt.withStream {
102 stream = true
103 }
104 if opt.withChildren {
105 recursive = true
106 }
107 if opt.withMinimumSeverity != "" {
108 severity = opt.withMinimumSeverity
109 }
110 }
111
112 var filters []filter
113 if recursive {
114 filters = append(filters, filterSubtree(dn))
115 } else {
116 filters = append(filters, filterExact(dn))
117 }
118 if severity != "" {
119 filters = append(filters, filterSeverity(severity))
120 }
121
122 var entries []*entry
123 if backlog > 0 || backlog == BacklogAllAvailable {
124 // TODO(q3k): pass over the backlog count to scanEntries/getEntries, instead of discarding them here.
125 if recursive {
126 entries = l.journal.scanEntries(filters...)
127 } else {
128 entries = l.journal.getEntries(dn, filters...)
129 }
130 if backlog != BacklogAllAvailable && len(entries) > backlog {
131 entries = entries[:backlog]
132 }
133 }
134
135 var sub *subscriber
136 if stream {
137 sub = &subscriber{
138 // TODO(q3k): make buffer size configurable
139 dataC: make(chan *LogEntry, 128),
140 doneC: make(chan struct{}),
141 filters: filters,
142 }
143 l.journal.subscribe(sub)
144 }
145
146 lr := &LogReader{}
147 lr.Backlog = make([]*LogEntry, len(entries))
148 for i, entry := range entries {
Serge Bazanski1bfa0c22020-10-14 16:45:07 +0200149 lr.Backlog[i] = &LogEntry{LeveledPayload: entry.leveled, DN: entry.origin}
Serge Bazanski5faa2fc2020-09-07 14:09:30 +0200150 }
151 if stream {
152 lr.Stream = sub.dataC
153 lr.done = sub.doneC
154 lr.missed = &sub.missed
155 }
156 return lr
157}