blob: 5526feb0daba51aed01230c5ef024fdd14605133 [file] [log] [blame]
Serge Bazanskiebe02592021-07-07 14:23:26 +02001package unraw
2
3import (
4 "context"
Serge Bazanski826a9e92021-10-05 21:23:48 +02005 "errors"
Serge Bazanskiebe02592021-07-07 14:23:26 +02006 "fmt"
Serge Bazanskiebe02592021-07-07 14:23:26 +02007 "os"
Serge Bazanski826a9e92021-10-05 21:23:48 +02008 "syscall"
Serge Bazanskiebe02592021-07-07 14:23:26 +02009 "testing"
10
11 "source.monogon.dev/metropolis/pkg/logbuffer"
12 "source.monogon.dev/metropolis/pkg/logtree"
13 "source.monogon.dev/metropolis/pkg/supervisor"
14)
15
16func testParser(l *logbuffer.Line, w LeveledWriter) {
17 w(&logtree.ExternalLeveledPayload{
18 Message: l.Data,
19 })
20}
21
22func TestNamedPipeReader(t *testing.T) {
Lorenz Brun764a2de2021-11-22 16:26:36 +010023 dir, err := os.MkdirTemp("/tmp", "metropolis-test-named-pipe-reader")
Serge Bazanskiebe02592021-07-07 14:23:26 +020024 if err != nil {
25 t.Fatalf("could not create tempdir: %v", err)
26 }
Serge Bazanski826a9e92021-10-05 21:23:48 +020027 defer os.RemoveAll(dir)
Serge Bazanskiebe02592021-07-07 14:23:26 +020028 fifoPath := dir + "/fifo"
29
30 // Start named pipe reader.
31 started := make(chan struct{})
32 stop, lt := supervisor.TestHarness(t, func(ctx context.Context) error {
33 converter := Converter{
34 Parser: testParser,
35 LeveledLogger: supervisor.Logger(ctx),
36 }
37
38 r, err := converter.NamedPipeReader(fifoPath)
39 if err != nil {
40 return fmt.Errorf("could not create pipe reader: %w", err)
41 }
42 close(started)
43 return r(ctx)
44 })
45
Serge Bazanskiebe02592021-07-07 14:23:26 +020046 <-started
47
Serge Bazanski826a9e92021-10-05 21:23:48 +020048 // Open FIFO...
49 f, err := os.OpenFile(fifoPath, os.O_WRONLY, 0)
50 if err != nil {
51 t.Fatalf("could not open fifo: %v", err)
52 }
53
Serge Bazanskiebe02592021-07-07 14:23:26 +020054 // Start reading all logs.
55 reader, err := lt.Read("root", logtree.WithChildren(), logtree.WithStream())
56 if err != nil {
57 t.Fatalf("could not get logtree reader: %v", err)
58 }
59 defer reader.Close()
60
61 // Write two lines to the fifo.
Serge Bazanskiebe02592021-07-07 14:23:26 +020062 fmt.Fprintf(f, "foo\nbar\n")
63 f.Close()
64
65 // Expect lines to end up in logtree.
66 if got, want := (<-reader.Stream).Leveled.MessagesJoined(), "foo"; want != got {
67 t.Errorf("expected first message to be %q, got %q", want, got)
68 }
69 if got, want := (<-reader.Stream).Leveled.MessagesJoined(), "bar"; want != got {
70 t.Errorf("expected second message to be %q, got %q", want, got)
71 }
72
Serge Bazanski826a9e92021-10-05 21:23:48 +020073 // Fully restart the entire supervisor and pipe reader, redo test, things
Serge Bazanskiebe02592021-07-07 14:23:26 +020074 // should continue to work.
75 stop()
76
Serge Bazanski826a9e92021-10-05 21:23:48 +020077 // Block until FIFO isn't being read anymore. This ensures that the
78 // NamedPipeReader actually stopped running, otherwise the following write to
79 // the fifo can race by writing to the old NamedPipeReader and making the test
80 // time out. This can also happen in production, but that will just cause us to
81 // lose piped data in the very small race window when this can happen
82 // (statistically in this test, <0.1%).
83 //
84 // The check is being done by opening the FIFO in 'non-blocking mode', which
85 // returns ENXIO immediately if the FIFO has no corresponding writer, and
86 // succeeds otherwise.
87 for {
88 ft, err := os.OpenFile(fifoPath, os.O_WRONLY|syscall.O_NONBLOCK, 0)
89 if err == nil {
90 // There's still a writer, keep trying.
91 ft.Close()
92 } else if errors.Is(err, syscall.ENXIO) {
93 // No writer, break.
94 break
95 } else {
96 // Something else?
97 t.Fatalf("OpenFile(%q): %v", fifoPath, err)
98 }
99 }
100
Serge Bazanskiebe02592021-07-07 14:23:26 +0200101 started = make(chan struct{})
102 stop, lt = supervisor.TestHarness(t, func(ctx context.Context) error {
103 converter := Converter{
104 Parser: testParser,
105 LeveledLogger: supervisor.Logger(ctx),
106 }
107
108 r, err := converter.NamedPipeReader(fifoPath)
109 if err != nil {
110 return fmt.Errorf("could not create pipe reader: %w", err)
111 }
112 close(started)
113 return r(ctx)
114 })
115
Serge Bazanski826a9e92021-10-05 21:23:48 +0200116 <-started
117
Serge Bazanskiebe02592021-07-07 14:23:26 +0200118 // Start reading all logs.
119 reader, err = lt.Read("root", logtree.WithChildren(), logtree.WithStream())
120 if err != nil {
121 t.Fatalf("could not get logtree reader: %v", err)
122 }
123 defer reader.Close()
124
Serge Bazanskiebe02592021-07-07 14:23:26 +0200125 // Write line to the fifo.
Serge Bazanski826a9e92021-10-05 21:23:48 +0200126 f, err = os.OpenFile(fifoPath, os.O_WRONLY, 0)
Serge Bazanskiebe02592021-07-07 14:23:26 +0200127 if err != nil {
128 t.Fatalf("could not open fifo: %v", err)
129 }
130 fmt.Fprintf(f, "baz\n")
131 f.Close()
132
133 // Expect lines to end up in logtree.
134 if got, want := (<-reader.Stream).Leveled.MessagesJoined(), "baz"; want != got {
135 t.Errorf("expected first message to be %q, got %q", want, got)
136 }
137}