blob: dc9750fe1ab49ef24973f814b04231593970babe [file] [log] [blame]
Serge Bazanski5faa2fc2020-09-07 14:09:30 +02001// Copyright 2020 The Monogon Project Authors.
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17package logtree
18
19import (
20 "sync/atomic"
21)
22
23// subscriber is an observer for new entries that are appended to the journal.
24type subscriber struct {
25 // filters that entries need to pass through in order to be sent to the subscriber.
26 filters []filter
Serge Bazanski216fe7b2021-05-21 18:36:16 +020027 // dataC is the channel to which entries that pass filters will be sent. The
28 // channel must be drained regularly in order to prevent accumulation of goroutines
29 // and possible reordering of messages.
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020030 dataC chan *LogEntry
Serge Bazanski216fe7b2021-05-21 18:36:16 +020031 // doneC is a channel that is closed once the subscriber wishes to stop receiving
32 // notifications.
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020033 doneC chan struct{}
Serge Bazanski216fe7b2021-05-21 18:36:16 +020034 // missed is the amount of messages missed by the subscriber by not receiving from
35 // dataC fast enough
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020036 missed uint64
37}
38
39// subscribe attaches a subscriber to the journal.
40// mu must be taken in W mode
41func (j *journal) subscribe(sub *subscriber) {
42 j.subscribers = append(j.subscribers, sub)
43}
44
45// notify sends an entry to all subscribers that wish to receive it.
46func (j *journal) notify(e *entry) {
47 j.mu.Lock()
48 defer j.mu.Unlock()
49
50 newSub := make([]*subscriber, 0, len(j.subscribers))
51 for _, sub := range j.subscribers {
52 select {
53 case <-sub.doneC:
54 close(sub.dataC)
55 continue
56 default:
57 newSub = append(newSub, sub)
58 }
59
60 for _, filter := range sub.filters {
Serge Bazanskif68153c2020-10-26 13:54:37 +010061 if !filter(e) {
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020062 continue
63 }
64 }
65 select {
Serge Bazanskif68153c2020-10-26 13:54:37 +010066 case sub.dataC <- e.external():
Serge Bazanski5faa2fc2020-09-07 14:09:30 +020067 default:
68 atomic.AddUint64(&sub.missed, 1)
69 }
70 }
71 j.subscribers = newSub
72}