blob: e6c7c6219a642f55dcf69de55bbd14281c67b7de [file] [log] [blame]
Serge Bazanski5faa2fc2020-09-07 14:09:30 +02001// Copyright 2020 The Monogon Project Authors.
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17package logtree
18
19import (
20 "sync/atomic"
21)
22
23// subscriber is an observer for new entries that are appended to the journal.
24type subscriber struct {
25 // filters that entries need to pass through in order to be sent to the subscriber.
26 filters []filter
27 // dataC is the channel to which entries that pass filters will be sent. The channel must be drained regularly in
28 // order to prevent accumulation of goroutines and possible reordering of messages.
29 dataC chan *LogEntry
30 // doneC is a channel that is closed once the subscriber wishes to stop receiving notifications.
31 doneC chan struct{}
32 // missed is the amount of messages missed by the subscriber by not receiving from dataC fast enough
33 missed uint64
34}
35
36// subscribe attaches a subscriber to the journal.
37// mu must be taken in W mode
38func (j *journal) subscribe(sub *subscriber) {
39 j.subscribers = append(j.subscribers, sub)
40}
41
42// notify sends an entry to all subscribers that wish to receive it.
43func (j *journal) notify(e *entry) {
44 j.mu.Lock()
45 defer j.mu.Unlock()
46
47 newSub := make([]*subscriber, 0, len(j.subscribers))
48 for _, sub := range j.subscribers {
49 select {
50 case <-sub.doneC:
51 close(sub.dataC)
52 continue
53 default:
54 newSub = append(newSub, sub)
55 }
56
57 for _, filter := range sub.filters {
Serge Bazanskif68153c2020-10-26 13:54:37 +010058 if !filter(e) {
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020059 continue
60 }
61 }
62 select {
Serge Bazanskif68153c2020-10-26 13:54:37 +010063 case sub.dataC <- e.external():
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020064 default:
65 atomic.AddUint64(&sub.missed, 1)
66 }
67 }
68 j.subscribers = newSub
69}