// blob: afd1da93aee2be9a0b13cbb23eb5f29297886c2d [file] [log] [blame]
package unraw
import (
	"context"
	"errors"
	"fmt"
	"io/ioutil"
	"os"
	"syscall"
	"testing"
	"time"

	"source.monogon.dev/metropolis/pkg/logbuffer"
	"source.monogon.dev/metropolis/pkg/logtree"
	"source.monogon.dev/metropolis/pkg/supervisor"
)
// testParser is the parser used by the tests in this file. It forwards the
// data of every line unchanged as the Message of an external leveled payload,
// leaving all other payload fields at their zero value.
func testParser(l *logbuffer.Line, w LeveledWriter) {
	payload := &logtree.ExternalLeveledPayload{Message: l.Data}
	w(payload)
}
func TestNamedPipeReader(t *testing.T) {
dir, err := ioutil.TempDir("/tmp", "metropolis-test-named-pipe-reader")
if err != nil {
t.Fatalf("could not create tempdir: %v", err)
}
defer os.RemoveAll(dir)
fifoPath := dir + "/fifo"
// Start named pipe reader.
started := make(chan struct{})
stop, lt := supervisor.TestHarness(t, func(ctx context.Context) error {
converter := Converter{
Parser: testParser,
LeveledLogger: supervisor.Logger(ctx),
}
r, err := converter.NamedPipeReader(fifoPath)
if err != nil {
return fmt.Errorf("could not create pipe reader: %w", err)
}
close(started)
return r(ctx)
})
<-started
// Open FIFO...
f, err := os.OpenFile(fifoPath, os.O_WRONLY, 0)
if err != nil {
t.Fatalf("could not open fifo: %v", err)
}
// Start reading all logs.
reader, err := lt.Read("root", logtree.WithChildren(), logtree.WithStream())
if err != nil {
t.Fatalf("could not get logtree reader: %v", err)
}
defer reader.Close()
// Write two lines to the fifo.
fmt.Fprintf(f, "foo\nbar\n")
f.Close()
// Expect lines to end up in logtree.
if got, want := (<-reader.Stream).Leveled.MessagesJoined(), "foo"; want != got {
t.Errorf("expected first message to be %q, got %q", want, got)
}
if got, want := (<-reader.Stream).Leveled.MessagesJoined(), "bar"; want != got {
t.Errorf("expected second message to be %q, got %q", want, got)
}
// Fully restart the entire supervisor and pipe reader, redo test, things
// should continue to work.
stop()
// Block until FIFO isn't being read anymore. This ensures that the
// NamedPipeReader actually stopped running, otherwise the following write to
// the fifo can race by writing to the old NamedPipeReader and making the test
// time out. This can also happen in production, but that will just cause us to
// lose piped data in the very small race window when this can happen
// (statistically in this test, <0.1%).
//
// The check is being done by opening the FIFO in 'non-blocking mode', which
// returns ENXIO immediately if the FIFO has no corresponding writer, and
// succeeds otherwise.
for {
ft, err := os.OpenFile(fifoPath, os.O_WRONLY|syscall.O_NONBLOCK, 0)
if err == nil {
// There's still a writer, keep trying.
ft.Close()
} else if errors.Is(err, syscall.ENXIO) {
// No writer, break.
break
} else {
// Something else?
t.Fatalf("OpenFile(%q): %v", fifoPath, err)
}
}
started = make(chan struct{})
stop, lt = supervisor.TestHarness(t, func(ctx context.Context) error {
converter := Converter{
Parser: testParser,
LeveledLogger: supervisor.Logger(ctx),
}
r, err := converter.NamedPipeReader(fifoPath)
if err != nil {
return fmt.Errorf("could not create pipe reader: %w", err)
}
close(started)
return r(ctx)
})
<-started
// Start reading all logs.
reader, err = lt.Read("root", logtree.WithChildren(), logtree.WithStream())
if err != nil {
t.Fatalf("could not get logtree reader: %v", err)
}
defer reader.Close()
// Write line to the fifo.
f, err = os.OpenFile(fifoPath, os.O_WRONLY, 0)
if err != nil {
t.Fatalf("could not open fifo: %v", err)
}
fmt.Fprintf(f, "baz\n")
f.Close()
// Expect lines to end up in logtree.
if got, want := (<-reader.Stream).Leveled.MessagesJoined(), "baz"; want != got {
t.Errorf("expected first message to be %q, got %q", want, got)
}
}