blob: 3c27272361e02b8c9e5cecbda8755712c1b5a685 [file] [log] [blame]
Tim Windelschmidt6d33a432025-02-04 14:34:25 +01001// Copyright The Monogon Project Authors.
Serge Bazanski5faa2fc2020-09-07 14:09:30 +02002// SPDX-License-Identifier: Apache-2.0
Serge Bazanski5faa2fc2020-09-07 14:09:30 +02003
4package logtree
5
6import (
7 "sync/atomic"
8)
9
10// subscriber is an observer for new entries that are appended to the journal.
11type subscriber struct {
12 // filters that entries need to pass through in order to be sent to the subscriber.
13 filters []filter
Serge Bazanski216fe7b2021-05-21 18:36:16 +020014 // dataC is the channel to which entries that pass filters will be sent. The
15 // channel must be drained regularly in order to prevent accumulation of goroutines
16 // and possible reordering of messages.
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020017 dataC chan *LogEntry
Serge Bazanski216fe7b2021-05-21 18:36:16 +020018 // doneC is a channel that is closed once the subscriber wishes to stop receiving
19 // notifications.
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020020 doneC chan struct{}
Serge Bazanski216fe7b2021-05-21 18:36:16 +020021 // missed is the amount of messages missed by the subscriber by not receiving from
22 // dataC fast enough
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020023 missed uint64
24}
25
26// subscribe attaches a subscriber to the journal.
27// mu must be taken in W mode
28func (j *journal) subscribe(sub *subscriber) {
29 j.subscribers = append(j.subscribers, sub)
30}
31
32// notify sends an entry to all subscribers that wish to receive it.
33func (j *journal) notify(e *entry) {
34 j.mu.Lock()
35 defer j.mu.Unlock()
36
37 newSub := make([]*subscriber, 0, len(j.subscribers))
38 for _, sub := range j.subscribers {
39 select {
40 case <-sub.doneC:
41 close(sub.dataC)
42 continue
43 default:
44 newSub = append(newSub, sub)
45 }
46
47 for _, filter := range sub.filters {
Serge Bazanskif68153c2020-10-26 13:54:37 +010048 if !filter(e) {
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020049 continue
50 }
51 }
52 select {
Serge Bazanskif68153c2020-10-26 13:54:37 +010053 case sub.dataC <- e.external():
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020054 default:
55 atomic.AddUint64(&sub.missed, 1)
56 }
57 }
58 j.subscribers = newSub
59}