blob: acf2dda85a2fda2633c182153d5e5d9c9a129463 [file] [log] [blame]
Serge Bazanskic64ba1e2023-03-15 19:15:13 +01001package tinylb
2
3import (
4 "bufio"
5 "fmt"
6 "io"
7 "net"
8 "strings"
9 "testing"
10 "time"
11
12 "source.monogon.dev/metropolis/pkg/event/memory"
13 "source.monogon.dev/metropolis/pkg/supervisor"
14)
15
16func TestLoadbalancer(t *testing.T) {
17 v := memory.Value[BackendSet]{}
18 set := BackendSet{}
19 v.Set(set.Clone())
20
21 ln, err := net.Listen("tcp", ":0")
22 if err != nil {
23 t.Fatalf("Listen failed: %v", err)
24 }
25 s := Server{
26 Provider: &v,
27 Listener: ln,
28 }
29 supervisor.TestHarness(t, s.Run)
30
31 connect := func() net.Conn {
32 conn, err := net.Dial("tcp", ln.Addr().String())
33 if err != nil {
34 t.Fatalf("Connection failed: %v", err)
35 }
36 return conn
37 }
38
39 c := connect()
40 buf := make([]byte, 128)
41 if _, err := c.Read(buf); err == nil {
42 t.Fatalf("Expected error on read (no backends yet)")
43 }
44
45 // Now add a backend and expect it to be served.
46 makeBackend := func(hello string) net.Listener {
47 aln, err := net.Listen("tcp", ":0")
48 if err != nil {
49 t.Fatalf("Failed to make backend listener: %v", err)
50 }
51 // Start backend.
52 go func() {
53 for {
54 c, err := aln.Accept()
55 if err != nil {
56 return
57 }
58 // For each connection, keep writing 'hello' over and over, newline-separated.
59 go func() {
60 defer c.Close()
61 for {
62 if _, err := fmt.Fprintf(c, "%s\n", hello); err != nil {
63 return
64 }
65 time.Sleep(100 * time.Millisecond)
66 }
67 }()
68 }
69 }()
70 addr := aln.Addr().(*net.TCPAddr)
71 set.Insert(hello, &SimpleTCPBackend{Remote: addr.AddrPort().String()})
72 v.Set(set.Clone())
73 return aln
74 }
75
76 as1 := makeBackend("a")
77 defer as1.Close()
78
79 for {
80 c = connect()
81 _, err := c.Read(buf)
82 c.Close()
83 if err == nil {
84 break
85 }
86 }
87
88 measure := func() map[string]int {
89 res := make(map[string]int)
90 for {
91 count := 0
92 for _, v := range res {
93 count += v
94 }
95 if count >= 20 {
96 return res
97 }
98
99 c := connect()
100 b := bufio.NewScanner(c)
101 if !b.Scan() {
102 err := b.Err()
103 if err == nil {
104 err = io.EOF
105 }
106 t.Fatalf("Scan failed: %v", err)
107 }
108 v := b.Text()
109 res[v]++
110 c.Close()
111 }
112 }
113
114 m := measure()
115 if m["a"] < 20 {
116 t.Errorf("Expected only one backend, got: %v", m)
117 }
118
119 as2 := makeBackend("b")
120 defer as2.Close()
121
122 as3 := makeBackend("c")
123 defer as3.Close()
124
125 as4 := makeBackend("d")
126 defer as4.Close()
127
128 m = measure()
129 for _, id := range []string{"a", "b", "c", "d"} {
130 if want, got := 4, m[id]; got < want {
131 t.Errorf("Expected at least %d responses from %s, got %d", want, id, got)
132 }
133 }
134
135 // Test killing backend connections on backend removal.
136 // Open a bunch of connections to 'a'.
137 var conns []*bufio.Scanner
138 for len(conns) < 5 {
139 c := connect()
140 b := bufio.NewScanner(c)
141 b.Scan()
142 if b.Text() != "a" {
143 c.Close()
144 } else {
145 conns = append(conns, b)
146 }
147 }
148
149 // Now remove the 'a' backend.
150 set.Delete("a")
151 v.Set(set.Clone())
152 // All open connections should now get killed.
153 for _, b := range conns {
154 start := time.Now().Add(time.Second)
155 for b.Scan() {
156 if time.Now().After(start) {
157 t.Errorf("Connection still alive")
158 break
159 }
160 }
161 }
162}
163
164func BenchmarkLB(b *testing.B) {
165 v := memory.Value[BackendSet]{}
166 set := BackendSet{}
167 v.Set(set.Clone())
168
169 ln, err := net.Listen("tcp", ":0")
170 if err != nil {
171 b.Fatalf("Listen failed: %v", err)
172 }
173 s := Server{
174 Provider: &v,
175 Listener: ln,
176 }
177 supervisor.TestHarness(b, s.Run)
178
179 makeBackend := func(hello string) net.Listener {
180 aln, err := net.Listen("tcp", ":0")
181 if err != nil {
182 b.Fatalf("Failed to make backend listener: %v", err)
183 }
184 // Start backend.
185 go func() {
186 for {
187 c, err := aln.Accept()
188 if err != nil {
189 return
190 }
191 go func() {
192 fmt.Fprintf(c, "%s\n", hello)
193 c.Close()
194 }()
195 }
196 }()
197 addr := aln.Addr().(*net.TCPAddr)
198 set.Insert(hello, &SimpleTCPBackend{Remote: addr.AddrPort().String()})
199 v.Set(set.Clone())
200 return aln
201 }
202 var backends []net.Listener
203 for i := 0; i < 10; i++ {
204 b := makeBackend(fmt.Sprintf("backend%d", i))
205 backends = append(backends, b)
206 }
207
208 defer func() {
209 for _, b := range backends {
210 b.Close()
211 }
212 }()
213
214 b.ResetTimer()
215 b.RunParallel(func(pb *testing.PB) {
216 for pb.Next() {
217 conn, err := net.Dial("tcp", ln.Addr().String())
218 if err != nil {
219 b.Fatalf("Connection failed: %v", err)
220 }
221 buf := bufio.NewScanner(conn)
222 buf.Scan()
223 if !strings.HasPrefix(buf.Text(), "backend") {
224 b.Fatalf("Invalid backend response: %q", buf.Text())
225 }
226 conn.Close()
227 }
228 })
229 b.StopTimer()
230}