blob: afd1da93aee2be9a0b13cbb23eb5f29297886c2d [file] [log] [blame]
Serge Bazanskiebe02592021-07-07 14:23:26 +02001package unraw
2
3import (
4 "context"
Serge Bazanski826a9e92021-10-05 21:23:48 +02005 "errors"
Serge Bazanskiebe02592021-07-07 14:23:26 +02006 "fmt"
7 "io/ioutil"
8 "os"
Serge Bazanski826a9e92021-10-05 21:23:48 +02009 "syscall"
Serge Bazanskiebe02592021-07-07 14:23:26 +020010 "testing"
11
12 "source.monogon.dev/metropolis/pkg/logbuffer"
13 "source.monogon.dev/metropolis/pkg/logtree"
14 "source.monogon.dev/metropolis/pkg/supervisor"
15)
16
17func testParser(l *logbuffer.Line, w LeveledWriter) {
18 w(&logtree.ExternalLeveledPayload{
19 Message: l.Data,
20 })
21}
22
23func TestNamedPipeReader(t *testing.T) {
Serge Bazanski826a9e92021-10-05 21:23:48 +020024 dir, err := ioutil.TempDir("/tmp", "metropolis-test-named-pipe-reader")
Serge Bazanskiebe02592021-07-07 14:23:26 +020025 if err != nil {
26 t.Fatalf("could not create tempdir: %v", err)
27 }
Serge Bazanski826a9e92021-10-05 21:23:48 +020028 defer os.RemoveAll(dir)
Serge Bazanskiebe02592021-07-07 14:23:26 +020029 fifoPath := dir + "/fifo"
30
31 // Start named pipe reader.
32 started := make(chan struct{})
33 stop, lt := supervisor.TestHarness(t, func(ctx context.Context) error {
34 converter := Converter{
35 Parser: testParser,
36 LeveledLogger: supervisor.Logger(ctx),
37 }
38
39 r, err := converter.NamedPipeReader(fifoPath)
40 if err != nil {
41 return fmt.Errorf("could not create pipe reader: %w", err)
42 }
43 close(started)
44 return r(ctx)
45 })
46
Serge Bazanskiebe02592021-07-07 14:23:26 +020047 <-started
48
Serge Bazanski826a9e92021-10-05 21:23:48 +020049 // Open FIFO...
50 f, err := os.OpenFile(fifoPath, os.O_WRONLY, 0)
51 if err != nil {
52 t.Fatalf("could not open fifo: %v", err)
53 }
54
Serge Bazanskiebe02592021-07-07 14:23:26 +020055 // Start reading all logs.
56 reader, err := lt.Read("root", logtree.WithChildren(), logtree.WithStream())
57 if err != nil {
58 t.Fatalf("could not get logtree reader: %v", err)
59 }
60 defer reader.Close()
61
62 // Write two lines to the fifo.
Serge Bazanskiebe02592021-07-07 14:23:26 +020063 fmt.Fprintf(f, "foo\nbar\n")
64 f.Close()
65
66 // Expect lines to end up in logtree.
67 if got, want := (<-reader.Stream).Leveled.MessagesJoined(), "foo"; want != got {
68 t.Errorf("expected first message to be %q, got %q", want, got)
69 }
70 if got, want := (<-reader.Stream).Leveled.MessagesJoined(), "bar"; want != got {
71 t.Errorf("expected second message to be %q, got %q", want, got)
72 }
73
Serge Bazanski826a9e92021-10-05 21:23:48 +020074 // Fully restart the entire supervisor and pipe reader, redo test, things
Serge Bazanskiebe02592021-07-07 14:23:26 +020075 // should continue to work.
76 stop()
77
Serge Bazanski826a9e92021-10-05 21:23:48 +020078 // Block until FIFO isn't being read anymore. This ensures that the
79 // NamedPipeReader actually stopped running, otherwise the following write to
80 // the fifo can race by writing to the old NamedPipeReader and making the test
81 // time out. This can also happen in production, but that will just cause us to
82 // lose piped data in the very small race window when this can happen
83 // (statistically in this test, <0.1%).
84 //
85 // The check is being done by opening the FIFO in 'non-blocking mode', which
86 // returns ENXIO immediately if the FIFO has no corresponding writer, and
87 // succeeds otherwise.
88 for {
89 ft, err := os.OpenFile(fifoPath, os.O_WRONLY|syscall.O_NONBLOCK, 0)
90 if err == nil {
91 // There's still a writer, keep trying.
92 ft.Close()
93 } else if errors.Is(err, syscall.ENXIO) {
94 // No writer, break.
95 break
96 } else {
97 // Something else?
98 t.Fatalf("OpenFile(%q): %v", fifoPath, err)
99 }
100 }
101
Serge Bazanskiebe02592021-07-07 14:23:26 +0200102 started = make(chan struct{})
103 stop, lt = supervisor.TestHarness(t, func(ctx context.Context) error {
104 converter := Converter{
105 Parser: testParser,
106 LeveledLogger: supervisor.Logger(ctx),
107 }
108
109 r, err := converter.NamedPipeReader(fifoPath)
110 if err != nil {
111 return fmt.Errorf("could not create pipe reader: %w", err)
112 }
113 close(started)
114 return r(ctx)
115 })
116
Serge Bazanski826a9e92021-10-05 21:23:48 +0200117 <-started
118
Serge Bazanskiebe02592021-07-07 14:23:26 +0200119 // Start reading all logs.
120 reader, err = lt.Read("root", logtree.WithChildren(), logtree.WithStream())
121 if err != nil {
122 t.Fatalf("could not get logtree reader: %v", err)
123 }
124 defer reader.Close()
125
Serge Bazanskiebe02592021-07-07 14:23:26 +0200126 // Write line to the fifo.
Serge Bazanski826a9e92021-10-05 21:23:48 +0200127 f, err = os.OpenFile(fifoPath, os.O_WRONLY, 0)
Serge Bazanskiebe02592021-07-07 14:23:26 +0200128 if err != nil {
129 t.Fatalf("could not open fifo: %v", err)
130 }
131 fmt.Fprintf(f, "baz\n")
132 f.Close()
133
134 // Expect lines to end up in logtree.
135 if got, want := (<-reader.Stream).Leveled.MessagesJoined(), "baz"; want != got {
136 t.Errorf("expected first message to be %q, got %q", want, got)
137 }
138}