| package tinylb |
| |
| import ( |
| "bufio" |
| "fmt" |
| "io" |
| "net" |
| "strings" |
| "testing" |
| "time" |
| |
| "source.monogon.dev/metropolis/pkg/event/memory" |
| "source.monogon.dev/metropolis/pkg/supervisor" |
| ) |
| |
| func TestLoadbalancer(t *testing.T) { |
| v := memory.Value[BackendSet]{} |
| set := BackendSet{} |
| v.Set(set.Clone()) |
| |
| ln, err := net.Listen("tcp", ":0") |
| if err != nil { |
| t.Fatalf("Listen failed: %v", err) |
| } |
| s := Server{ |
| Provider: &v, |
| Listener: ln, |
| } |
| supervisor.TestHarness(t, s.Run) |
| |
| connect := func() net.Conn { |
| conn, err := net.Dial("tcp", ln.Addr().String()) |
| if err != nil { |
| t.Fatalf("Connection failed: %v", err) |
| } |
| return conn |
| } |
| |
| c := connect() |
| buf := make([]byte, 128) |
| if _, err := c.Read(buf); err == nil { |
| t.Fatalf("Expected error on read (no backends yet)") |
| } |
| |
| // Now add a backend and expect it to be served. |
| makeBackend := func(hello string) net.Listener { |
| aln, err := net.Listen("tcp", ":0") |
| if err != nil { |
| t.Fatalf("Failed to make backend listener: %v", err) |
| } |
| // Start backend. |
| go func() { |
| for { |
| c, err := aln.Accept() |
| if err != nil { |
| return |
| } |
| // For each connection, keep writing 'hello' over and over, newline-separated. |
| go func() { |
| defer c.Close() |
| for { |
| if _, err := fmt.Fprintf(c, "%s\n", hello); err != nil { |
| return |
| } |
| time.Sleep(100 * time.Millisecond) |
| } |
| }() |
| } |
| }() |
| addr := aln.Addr().(*net.TCPAddr) |
| set.Insert(hello, &SimpleTCPBackend{Remote: addr.AddrPort().String()}) |
| v.Set(set.Clone()) |
| return aln |
| } |
| |
| as1 := makeBackend("a") |
| defer as1.Close() |
| |
| for { |
| c = connect() |
| _, err := c.Read(buf) |
| c.Close() |
| if err == nil { |
| break |
| } |
| } |
| |
| measure := func() map[string]int { |
| res := make(map[string]int) |
| for { |
| count := 0 |
| for _, v := range res { |
| count += v |
| } |
| if count >= 20 { |
| return res |
| } |
| |
| c := connect() |
| b := bufio.NewScanner(c) |
| if !b.Scan() { |
| err := b.Err() |
| if err == nil { |
| err = io.EOF |
| } |
| t.Fatalf("Scan failed: %v", err) |
| } |
| v := b.Text() |
| res[v]++ |
| c.Close() |
| } |
| } |
| |
| m := measure() |
| if m["a"] < 20 { |
| t.Errorf("Expected only one backend, got: %v", m) |
| } |
| |
| as2 := makeBackend("b") |
| defer as2.Close() |
| |
| as3 := makeBackend("c") |
| defer as3.Close() |
| |
| as4 := makeBackend("d") |
| defer as4.Close() |
| |
| m = measure() |
| for _, id := range []string{"a", "b", "c", "d"} { |
| if want, got := 4, m[id]; got < want { |
| t.Errorf("Expected at least %d responses from %s, got %d", want, id, got) |
| } |
| } |
| |
| // Test killing backend connections on backend removal. |
| // Open a bunch of connections to 'a'. |
| var conns []*bufio.Scanner |
| for len(conns) < 5 { |
| c := connect() |
| b := bufio.NewScanner(c) |
| b.Scan() |
| if b.Text() != "a" { |
| c.Close() |
| } else { |
| conns = append(conns, b) |
| } |
| } |
| |
| // Now remove the 'a' backend. |
| set.Delete("a") |
| v.Set(set.Clone()) |
| // All open connections should now get killed. |
| for _, b := range conns { |
| start := time.Now().Add(time.Second) |
| for b.Scan() { |
| if time.Now().After(start) { |
| t.Errorf("Connection still alive") |
| break |
| } |
| } |
| } |
| } |
| |
| func BenchmarkLB(b *testing.B) { |
| v := memory.Value[BackendSet]{} |
| set := BackendSet{} |
| v.Set(set.Clone()) |
| |
| ln, err := net.Listen("tcp", ":0") |
| if err != nil { |
| b.Fatalf("Listen failed: %v", err) |
| } |
| s := Server{ |
| Provider: &v, |
| Listener: ln, |
| } |
| supervisor.TestHarness(b, s.Run) |
| |
| makeBackend := func(hello string) net.Listener { |
| aln, err := net.Listen("tcp", ":0") |
| if err != nil { |
| b.Fatalf("Failed to make backend listener: %v", err) |
| } |
| // Start backend. |
| go func() { |
| for { |
| c, err := aln.Accept() |
| if err != nil { |
| return |
| } |
| go func() { |
| fmt.Fprintf(c, "%s\n", hello) |
| c.Close() |
| }() |
| } |
| }() |
| addr := aln.Addr().(*net.TCPAddr) |
| set.Insert(hello, &SimpleTCPBackend{Remote: addr.AddrPort().String()}) |
| v.Set(set.Clone()) |
| return aln |
| } |
| var backends []net.Listener |
| for i := 0; i < 10; i++ { |
| b := makeBackend(fmt.Sprintf("backend%d", i)) |
| backends = append(backends, b) |
| } |
| |
| defer func() { |
| for _, b := range backends { |
| b.Close() |
| } |
| }() |
| |
| b.ResetTimer() |
| b.RunParallel(func(pb *testing.PB) { |
| for pb.Next() { |
| conn, err := net.Dial("tcp", ln.Addr().String()) |
| if err != nil { |
| b.Fatalf("Connection failed: %v", err) |
| } |
| buf := bufio.NewScanner(conn) |
| buf.Scan() |
| if !strings.HasPrefix(buf.Text(), "backend") { |
| b.Fatalf("Invalid backend response: %q", buf.Text()) |
| } |
| conn.Close() |
| } |
| }) |
| b.StopTimer() |
| } |