go/net/tinylb: init
This implements tinylb, a tiny round-robin load balancer for
net.Conn/net.Listener protocols.
This will be used to load-balance connections to Kubernetes apiservers
before cluster networking is available.
Change-Id: I48892e1fe03e0648df60c674e7394ca69b32932d
Reviewed-on: https://review.monogon.dev/c/monogon/+/1369
Tested-by: Jenkins CI
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/go/net/tinylb/tinylb_test.go b/go/net/tinylb/tinylb_test.go
new file mode 100644
index 0000000..acf2dda
--- /dev/null
+++ b/go/net/tinylb/tinylb_test.go
@@ -0,0 +1,230 @@
+package tinylb
+
+import (
+ "bufio"
+ "fmt"
+ "io"
+ "net"
+ "strings"
+ "testing"
+ "time"
+
+ "source.monogon.dev/metropolis/pkg/event/memory"
+ "source.monogon.dev/metropolis/pkg/supervisor"
+)
+
+// TestLoadbalancer runs a Server against real TCP listeners and checks that:
+//   - a connection made while the backend set is empty fails on its first read,
+//   - with a single backend "a", 20 consecutive connections are all served by it,
+//   - with backends "a" through "d", each serves at least 4 out of 20 connections,
+//   - connections served by "a" are terminated once "a" is removed from the set.
+func TestLoadbalancer(t *testing.T) {
+	const (
+		// backendWait bounds how long the test waits for a newly inserted
+		// backend to start being served before giving up.
+		backendWait = 10 * time.Second
+		// retryDelay is the pause between attempts while waiting, so that the
+		// wait loop does not spin on connect/close.
+		retryDelay = 10 * time.Millisecond
+	)
+
+	v := memory.Value[BackendSet]{}
+	set := BackendSet{}
+	v.Set(set.Clone())
+
+	ln, err := net.Listen("tcp", ":0")
+	if err != nil {
+		t.Fatalf("Listen failed: %v", err)
+	}
+	s := Server{
+		Provider: &v,
+		Listener: ln,
+	}
+	supervisor.TestHarness(t, s.Run)
+
+	// connect dials the load balancer's listener and fails the test if the
+	// dial itself does not succeed. The caller owns the returned connection.
+	connect := func() net.Conn {
+		conn, err := net.Dial("tcp", ln.Addr().String())
+		if err != nil {
+			t.Fatalf("Connection failed: %v", err)
+		}
+		return conn
+	}
+
+	// firstLine opens a fresh connection, reads one newline-terminated line
+	// from it and closes it again. A connection that ends before a full line
+	// arrives is reported as io.EOF.
+	firstLine := func() (string, error) {
+		c := connect()
+		defer c.Close()
+		sc := bufio.NewScanner(c)
+		if !sc.Scan() {
+			if err := sc.Err(); err != nil {
+				return "", err
+			}
+			return "", io.EOF
+		}
+		return sc.Text(), nil
+	}
+
+	// No backends are configured yet, so the first read must fail. Close the
+	// probe connection before asserting so it is not leaked.
+	buf := make([]byte, 128)
+	probe := connect()
+	_, err = probe.Read(buf)
+	probe.Close()
+	if err == nil {
+		t.Fatalf("Expected error on read (no backends yet)")
+	}
+
+	// makeBackend starts a TCP server that writes hello followed by a newline
+	// to every accepted connection every 100ms, registers it in the backend
+	// set under the key hello, and publishes the updated set.
+	makeBackend := func(hello string) net.Listener {
+		aln, err := net.Listen("tcp", ":0")
+		if err != nil {
+			t.Fatalf("Failed to make backend listener: %v", err)
+		}
+		// Accept loop; ends when the listener is closed.
+		go func() {
+			for {
+				bc, err := aln.Accept()
+				if err != nil {
+					return
+				}
+				// Keep writing 'hello' until the peer goes away.
+				go func() {
+					defer bc.Close()
+					for {
+						if _, err := fmt.Fprintf(bc, "%s\n", hello); err != nil {
+							return
+						}
+						time.Sleep(100 * time.Millisecond)
+					}
+				}()
+			}
+		}()
+		addr := aln.Addr().(*net.TCPAddr)
+		set.Insert(hello, &SimpleTCPBackend{Remote: addr.AddrPort().String()})
+		v.Set(set.Clone())
+		return aln
+	}
+
+	// Now add a backend and expect it to be served.
+	as1 := makeBackend("a")
+	defer as1.Close()
+
+	// Retry until a connection is actually served; presumably the server picks
+	// up the new backend set asynchronously. The wait is bounded so that a
+	// broken server fails the test instead of hanging it.
+	waitUntil := time.Now().Add(backendWait)
+	for {
+		c := connect()
+		_, err := c.Read(buf)
+		c.Close()
+		if err == nil {
+			break
+		}
+		if time.Now().After(waitUntil) {
+			t.Fatalf("Backend not served within %v: %v", backendWait, err)
+		}
+		time.Sleep(retryDelay)
+	}
+
+	// measure makes 20 connections and counts how many times each backend
+	// identifier was received as the first line.
+	measure := func() map[string]int {
+		res := make(map[string]int)
+		for i := 0; i < 20; i++ {
+			id, err := firstLine()
+			if err != nil {
+				t.Fatalf("Scan failed: %v", err)
+			}
+			res[id]++
+		}
+		return res
+	}
+
+	m := measure()
+	if m["a"] < 20 {
+		t.Errorf("Expected only one backend, got: %v", m)
+	}
+
+	as2 := makeBackend("b")
+	defer as2.Close()
+
+	as3 := makeBackend("c")
+	defer as3.Close()
+
+	as4 := makeBackend("d")
+	defer as4.Close()
+
+	m = measure()
+	for _, id := range []string{"a", "b", "c", "d"} {
+		if want, got := 4, m[id]; got < want {
+			t.Errorf("Expected at least %d responses from %s, got %d", want, id, got)
+		}
+	}
+
+	// Test killing backend connections on backend removal.
+	// Open a bunch of connections to 'a'. The raw connection is kept next to
+	// its scanner so that it can be given a read deadline and closed later.
+	type liveConn struct {
+		conn net.Conn
+		sc   *bufio.Scanner
+	}
+	var conns []liveConn
+	defer func() {
+		for _, lc := range conns {
+			lc.conn.Close()
+		}
+	}()
+	for len(conns) < 5 {
+		c := connect()
+		sc := bufio.NewScanner(c)
+		if !sc.Scan() || sc.Text() != "a" {
+			// Served by another backend (or not at all): discard and retry.
+			c.Close()
+			continue
+		}
+		conns = append(conns, liveConn{conn: c, sc: sc})
+	}
+
+	// Now remove the 'a' backend.
+	set.Delete("a")
+	v.Set(set.Clone())
+	// All open connections should now get killed. Each one gets one second:
+	// the read deadline turns both a connection that keeps delivering data
+	// and one that stays open but silent into a timeout error, rather than
+	// blocking the test indefinitely.
+	for _, lc := range conns {
+		if err := lc.conn.SetReadDeadline(time.Now().Add(time.Second)); err != nil {
+			t.Fatalf("SetReadDeadline failed: %v", err)
+		}
+		for lc.sc.Scan() {
+			// Drain lines until the connection ends or the deadline hits.
+		}
+		if ne, ok := lc.sc.Err().(net.Error); ok && ne.Timeout() {
+			t.Errorf("Connection still alive")
+		}
+	}
+}
+
+// BenchmarkLB measures full round trips through a Server with ten backends:
+// each iteration dials the load balancer, reads the one line the backend
+// sends, and closes the connection. Iterations run in parallel via
+// b.RunParallel.
+func BenchmarkLB(b *testing.B) {
+	v := memory.Value[BackendSet]{}
+	set := BackendSet{}
+	v.Set(set.Clone())
+
+	ln, err := net.Listen("tcp", ":0")
+	if err != nil {
+		b.Fatalf("Listen failed: %v", err)
+	}
+	s := Server{
+		Provider: &v,
+		Listener: ln,
+	}
+	supervisor.TestHarness(b, s.Run)
+
+	// makeBackend starts a TCP server that writes hello plus a newline to each
+	// accepted connection and then closes it, registers it in the backend set
+	// under the key hello, and publishes the updated set.
+	makeBackend := func(hello string) net.Listener {
+		aln, err := net.Listen("tcp", ":0")
+		if err != nil {
+			b.Fatalf("Failed to make backend listener: %v", err)
+		}
+		// Accept loop; ends when the listener is closed.
+		go func() {
+			for {
+				bc, err := aln.Accept()
+				if err != nil {
+					return
+				}
+				go func() {
+					// Best effort: a failed write shows up on the client
+					// side as an invalid (empty) response.
+					fmt.Fprintf(bc, "%s\n", hello)
+					bc.Close()
+				}()
+			}
+		}()
+		addr := aln.Addr().(*net.TCPAddr)
+		set.Insert(hello, &SimpleTCPBackend{Remote: addr.AddrPort().String()})
+		v.Set(set.Clone())
+		return aln
+	}
+
+	var backends []net.Listener
+	// Registered before the backends are created so that listeners already
+	// started are still closed if a later makeBackend call aborts.
+	defer func() {
+		for _, bl := range backends {
+			bl.Close()
+		}
+	}()
+	for i := 0; i < 10; i++ {
+		backends = append(backends, makeBackend(fmt.Sprintf("backend%d", i)))
+	}
+
+	// roundTrip performs one connection through the load balancer and returns
+	// the first line received, or "" if the connection ended before one
+	// arrived. The connection is always closed. A non-nil error means the
+	// dial itself failed.
+	roundTrip := func() (string, error) {
+		conn, err := net.Dial("tcp", ln.Addr().String())
+		if err != nil {
+			return "", err
+		}
+		defer conn.Close()
+		sc := bufio.NewScanner(conn)
+		sc.Scan()
+		return sc.Text(), nil
+	}
+
+	// Wait, outside the timed region, until a backend is actually being
+	// served; presumably the server applies backend set updates
+	// asynchronously, so early connections could otherwise come back empty
+	// and fail the benchmark spuriously.
+	const warmupWait = 10 * time.Second
+	warmupUntil := time.Now().Add(warmupWait)
+	for {
+		line, err := roundTrip()
+		if err != nil {
+			b.Fatalf("Connection failed: %v", err)
+		}
+		if strings.HasPrefix(line, "backend") {
+			break
+		}
+		if time.Now().After(warmupUntil) {
+			b.Fatalf("No backend served within %v, last response: %q", warmupWait, line)
+		}
+		time.Sleep(10 * time.Millisecond)
+	}
+
+	b.ResetTimer()
+	b.RunParallel(func(pb *testing.PB) {
+		// This body runs on worker goroutines, where FailNow (and therefore
+		// Fatalf) must not be called: report with Errorf and stop this worker.
+		for pb.Next() {
+			line, err := roundTrip()
+			if err != nil {
+				b.Errorf("Connection failed: %v", err)
+				return
+			}
+			if !strings.HasPrefix(line, "backend") {
+				b.Errorf("Invalid backend response: %q", line)
+				return
+			}
+		}
+	})
+	b.StopTimer()
+}