blob: 63a8d60da3b6f2055ec93135ccc535165793ff10 [file] [log] [blame]
Tim Windelschmidt6d33a432025-02-04 14:34:25 +01001// Copyright The Monogon Project Authors.
2// SPDX-License-Identifier: Apache-2.0
3
Serge Bazanskiebe02592021-07-07 14:23:26 +02004package unraw
5
6import (
7 "context"
Serge Bazanski826a9e92021-10-05 21:23:48 +02008 "errors"
Serge Bazanskiebe02592021-07-07 14:23:26 +02009 "fmt"
Serge Bazanskiebe02592021-07-07 14:23:26 +020010 "os"
Serge Bazanski826a9e92021-10-05 21:23:48 +020011 "syscall"
Serge Bazanskiebe02592021-07-07 14:23:26 +020012 "testing"
13
Tim Windelschmidt9f21f532024-05-07 15:14:20 +020014 "source.monogon.dev/osbase/logbuffer"
15 "source.monogon.dev/osbase/logtree"
16 "source.monogon.dev/osbase/supervisor"
Serge Bazanskiebe02592021-07-07 14:23:26 +020017)
18
19func testParser(l *logbuffer.Line, w LeveledWriter) {
20 w(&logtree.ExternalLeveledPayload{
21 Message: l.Data,
22 })
23}
24
25func TestNamedPipeReader(t *testing.T) {
Lorenz Brun764a2de2021-11-22 16:26:36 +010026 dir, err := os.MkdirTemp("/tmp", "metropolis-test-named-pipe-reader")
Serge Bazanskiebe02592021-07-07 14:23:26 +020027 if err != nil {
28 t.Fatalf("could not create tempdir: %v", err)
29 }
Serge Bazanski826a9e92021-10-05 21:23:48 +020030 defer os.RemoveAll(dir)
Serge Bazanskiebe02592021-07-07 14:23:26 +020031 fifoPath := dir + "/fifo"
32
33 // Start named pipe reader.
34 started := make(chan struct{})
35 stop, lt := supervisor.TestHarness(t, func(ctx context.Context) error {
36 converter := Converter{
37 Parser: testParser,
38 LeveledLogger: supervisor.Logger(ctx),
39 }
40
41 r, err := converter.NamedPipeReader(fifoPath)
42 if err != nil {
43 return fmt.Errorf("could not create pipe reader: %w", err)
44 }
45 close(started)
46 return r(ctx)
47 })
48
Serge Bazanskiebe02592021-07-07 14:23:26 +020049 <-started
50
Serge Bazanski826a9e92021-10-05 21:23:48 +020051 // Open FIFO...
52 f, err := os.OpenFile(fifoPath, os.O_WRONLY, 0)
53 if err != nil {
54 t.Fatalf("could not open fifo: %v", err)
55 }
56
Serge Bazanskiebe02592021-07-07 14:23:26 +020057 // Start reading all logs.
58 reader, err := lt.Read("root", logtree.WithChildren(), logtree.WithStream())
59 if err != nil {
60 t.Fatalf("could not get logtree reader: %v", err)
61 }
62 defer reader.Close()
63
64 // Write two lines to the fifo.
Serge Bazanskiebe02592021-07-07 14:23:26 +020065 fmt.Fprintf(f, "foo\nbar\n")
66 f.Close()
67
68 // Expect lines to end up in logtree.
69 if got, want := (<-reader.Stream).Leveled.MessagesJoined(), "foo"; want != got {
70 t.Errorf("expected first message to be %q, got %q", want, got)
71 }
72 if got, want := (<-reader.Stream).Leveled.MessagesJoined(), "bar"; want != got {
73 t.Errorf("expected second message to be %q, got %q", want, got)
74 }
75
Serge Bazanski826a9e92021-10-05 21:23:48 +020076 // Fully restart the entire supervisor and pipe reader, redo test, things
Serge Bazanskiebe02592021-07-07 14:23:26 +020077 // should continue to work.
78 stop()
79
Serge Bazanski826a9e92021-10-05 21:23:48 +020080 // Block until FIFO isn't being read anymore. This ensures that the
81 // NamedPipeReader actually stopped running, otherwise the following write to
82 // the fifo can race by writing to the old NamedPipeReader and making the test
83 // time out. This can also happen in production, but that will just cause us to
84 // lose piped data in the very small race window when this can happen
85 // (statistically in this test, <0.1%).
86 //
87 // The check is being done by opening the FIFO in 'non-blocking mode', which
88 // returns ENXIO immediately if the FIFO has no corresponding writer, and
89 // succeeds otherwise.
90 for {
91 ft, err := os.OpenFile(fifoPath, os.O_WRONLY|syscall.O_NONBLOCK, 0)
92 if err == nil {
93 // There's still a writer, keep trying.
94 ft.Close()
95 } else if errors.Is(err, syscall.ENXIO) {
96 // No writer, break.
97 break
98 } else {
99 // Something else?
100 t.Fatalf("OpenFile(%q): %v", fifoPath, err)
101 }
102 }
103
Serge Bazanskiebe02592021-07-07 14:23:26 +0200104 started = make(chan struct{})
105 stop, lt = supervisor.TestHarness(t, func(ctx context.Context) error {
106 converter := Converter{
107 Parser: testParser,
108 LeveledLogger: supervisor.Logger(ctx),
109 }
110
111 r, err := converter.NamedPipeReader(fifoPath)
112 if err != nil {
113 return fmt.Errorf("could not create pipe reader: %w", err)
114 }
115 close(started)
116 return r(ctx)
117 })
Tim Windelschmidt096654a2024-04-18 23:10:19 +0200118 defer stop()
Serge Bazanskiebe02592021-07-07 14:23:26 +0200119
Serge Bazanski826a9e92021-10-05 21:23:48 +0200120 <-started
121
Serge Bazanskiebe02592021-07-07 14:23:26 +0200122 // Start reading all logs.
123 reader, err = lt.Read("root", logtree.WithChildren(), logtree.WithStream())
124 if err != nil {
125 t.Fatalf("could not get logtree reader: %v", err)
126 }
127 defer reader.Close()
128
Serge Bazanskiebe02592021-07-07 14:23:26 +0200129 // Write line to the fifo.
Serge Bazanski826a9e92021-10-05 21:23:48 +0200130 f, err = os.OpenFile(fifoPath, os.O_WRONLY, 0)
Serge Bazanskiebe02592021-07-07 14:23:26 +0200131 if err != nil {
132 t.Fatalf("could not open fifo: %v", err)
133 }
134 fmt.Fprintf(f, "baz\n")
135 f.Close()
136
137 // Expect lines to end up in logtree.
138 if got, want := (<-reader.Stream).Leveled.MessagesJoined(), "baz"; want != got {
139 t.Errorf("expected first message to be %q, got %q", want, got)
140 }
141}