logtree: implement
This implements logtree, as per go/logtree .
Some API changes are present; the design doc will be updated to reflect
these and the rationale behind the changes.
This implementation is missing 'raw' log functionality; this will be
added in a diff on top (the implementation is trivial, but we want
to keep this diff as simple as possible).
Test Plan: covered by tests
X-Origin-Diff: phab/D624
GitOrigin-RevId: 6d1e0fb16f47e4b0dc9a18765cecb9314bbcb441
diff --git a/core/pkg/logtree/journal_subscriber.go b/core/pkg/logtree/journal_subscriber.go
new file mode 100644
index 0000000..18b8124
--- /dev/null
+++ b/core/pkg/logtree/journal_subscriber.go
@@ -0,0 +1,69 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logtree
+
+import (
+ "sync/atomic"
+)
+
+// subscriber is an observer for new entries that are appended to the journal.
+type subscriber struct {
+	// missed is the number of messages missed by the subscriber by not receiving from dataC fast enough.
+	missed uint64 // updated via sync/atomic; kept as the first field so it is 64-bit aligned on 32-bit platforms
+	// filters that entries need to pass through in order to be sent to the subscriber.
+	filters []filter
+	// dataC is the channel to which entries that pass filters will be sent. The channel must be drained regularly:
+	// notify never blocks on it, and instead drops the entry and increments missed when it cannot be sent.
+	dataC chan *LogEntry
+	// doneC is a channel that is closed once the subscriber wishes to stop receiving notifications.
+	doneC chan struct{}
+}
+
+// subscribe attaches a subscriber to the journal by appending it to j.subscribers, which notify iterates over.
+// mu must be taken in W mode by the caller; subscribe itself does not lock.
+func (j *journal) subscribe(sub *subscriber) {
+ j.subscribers = append(j.subscribers, sub)
+}
+
+// notify offers an entry to every live subscriber whose filters all accept it. Subscribers whose doneC is closed get
+// their dataC closed and are removed from the journal. j.mu is taken in W mode for the duration of the call.
+func (j *journal) notify(e *entry) {
+	j.mu.Lock()
+	defer j.mu.Unlock()
+	newSub := make([]*subscriber, 0, len(j.subscribers))
+subscribers:
+	for _, sub := range j.subscribers {
+		select {
+		case <-sub.doneC:
+			close(sub.dataC)
+			continue
+		default:
+			newSub = append(newSub, sub)
+		}
+		for _, filter := range sub.filters {
+			if !filter(e.origin, e.payload.severity) {
+				continue subscribers // rejected: skip the subscriber, not just this filter
+			}
+		}
+		select {
+		case sub.dataC <- &LogEntry{Payload: e.payload, DN: e.origin}:
+		default: // subscriber is not keeping up; count the drop rather than block while holding j.mu
+			atomic.AddUint64(&sub.missed, 1)
+		}
+	}
+	j.subscribers = newSub
+}