| package unraw |
| |
import (
	"context"
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"syscall"
	"testing"
	"time"

	"source.monogon.dev/metropolis/pkg/logbuffer"
	"source.monogon.dev/metropolis/pkg/logtree"
	"source.monogon.dev/metropolis/pkg/supervisor"
)
| |
| func testParser(l *logbuffer.Line, w LeveledWriter) { |
| w(&logtree.ExternalLeveledPayload{ |
| Message: l.Data, |
| }) |
| } |
| |
| func TestNamedPipeReader(t *testing.T) { |
| dir, err := os.MkdirTemp("/tmp", "metropolis-test-named-pipe-reader") |
| if err != nil { |
| t.Fatalf("could not create tempdir: %v", err) |
| } |
| defer os.RemoveAll(dir) |
| fifoPath := dir + "/fifo" |
| |
| // Start named pipe reader. |
| started := make(chan struct{}) |
| stop, lt := supervisor.TestHarness(t, func(ctx context.Context) error { |
| converter := Converter{ |
| Parser: testParser, |
| LeveledLogger: supervisor.Logger(ctx), |
| } |
| |
| r, err := converter.NamedPipeReader(fifoPath) |
| if err != nil { |
| return fmt.Errorf("could not create pipe reader: %w", err) |
| } |
| close(started) |
| return r(ctx) |
| }) |
| |
| <-started |
| |
| // Open FIFO... |
| f, err := os.OpenFile(fifoPath, os.O_WRONLY, 0) |
| if err != nil { |
| t.Fatalf("could not open fifo: %v", err) |
| } |
| |
| // Start reading all logs. |
| reader, err := lt.Read("root", logtree.WithChildren(), logtree.WithStream()) |
| if err != nil { |
| t.Fatalf("could not get logtree reader: %v", err) |
| } |
| defer reader.Close() |
| |
| // Write two lines to the fifo. |
| fmt.Fprintf(f, "foo\nbar\n") |
| f.Close() |
| |
| // Expect lines to end up in logtree. |
| if got, want := (<-reader.Stream).Leveled.MessagesJoined(), "foo"; want != got { |
| t.Errorf("expected first message to be %q, got %q", want, got) |
| } |
| if got, want := (<-reader.Stream).Leveled.MessagesJoined(), "bar"; want != got { |
| t.Errorf("expected second message to be %q, got %q", want, got) |
| } |
| |
| // Fully restart the entire supervisor and pipe reader, redo test, things |
| // should continue to work. |
| stop() |
| |
| // Block until FIFO isn't being read anymore. This ensures that the |
| // NamedPipeReader actually stopped running, otherwise the following write to |
| // the fifo can race by writing to the old NamedPipeReader and making the test |
| // time out. This can also happen in production, but that will just cause us to |
| // lose piped data in the very small race window when this can happen |
| // (statistically in this test, <0.1%). |
| // |
| // The check is being done by opening the FIFO in 'non-blocking mode', which |
| // returns ENXIO immediately if the FIFO has no corresponding writer, and |
| // succeeds otherwise. |
| for { |
| ft, err := os.OpenFile(fifoPath, os.O_WRONLY|syscall.O_NONBLOCK, 0) |
| if err == nil { |
| // There's still a writer, keep trying. |
| ft.Close() |
| } else if errors.Is(err, syscall.ENXIO) { |
| // No writer, break. |
| break |
| } else { |
| // Something else? |
| t.Fatalf("OpenFile(%q): %v", fifoPath, err) |
| } |
| } |
| |
| started = make(chan struct{}) |
| stop, lt = supervisor.TestHarness(t, func(ctx context.Context) error { |
| converter := Converter{ |
| Parser: testParser, |
| LeveledLogger: supervisor.Logger(ctx), |
| } |
| |
| r, err := converter.NamedPipeReader(fifoPath) |
| if err != nil { |
| return fmt.Errorf("could not create pipe reader: %w", err) |
| } |
| close(started) |
| return r(ctx) |
| }) |
| |
| <-started |
| |
| // Start reading all logs. |
| reader, err = lt.Read("root", logtree.WithChildren(), logtree.WithStream()) |
| if err != nil { |
| t.Fatalf("could not get logtree reader: %v", err) |
| } |
| defer reader.Close() |
| |
| // Write line to the fifo. |
| f, err = os.OpenFile(fifoPath, os.O_WRONLY, 0) |
| if err != nil { |
| t.Fatalf("could not open fifo: %v", err) |
| } |
| fmt.Fprintf(f, "baz\n") |
| f.Close() |
| |
| // Expect lines to end up in logtree. |
| if got, want := (<-reader.Stream).Leveled.MessagesJoined(), "baz"; want != got { |
| t.Errorf("expected first message to be %q, got %q", want, got) |
| } |
| } |