go/net/tinylb: init

This implements tinylb, a tiny round-robin load balancer for
net.Conn/net.Listener protocols.

This will be used to loadbalance connections to Kubernetes apiservers
before cluster networking is available.

Change-Id: I48892e1fe03e0648df60c674e7394ca69b32932d
Reviewed-on: https://review.monogon.dev/c/monogon/+/1369
Tested-by: Jenkins CI
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/go/net/tinylb/tinylb.go b/go/net/tinylb/tinylb.go
new file mode 100644
index 0000000..67a639e
--- /dev/null
+++ b/go/net/tinylb/tinylb.go
@@ -0,0 +1,189 @@
+// Package tinylb implements a small and simple userland round-robin load
+// balancer, mostly for TCP connections. However, it is entirely
+// protocol-agnostic, and only expects net.Listener and net.Conn objects.
+//
+// Apart from the simple act of load-balancing across a set of backends, tinylb
+// also automatically and immediately closes all open connections to backend
+// targets that have been removed from the set. This is perhaps not the ideal
+// behaviour for user-facing services, but it's the sort of behaviour that works
+// very well for machine-to-machine communication.
+package tinylb
+
+import (
+	"context"
+	"io"
+	"net"
+	"sync"
+
+	"source.monogon.dev/go/types/mapsets"
+	"source.monogon.dev/metropolis/pkg/event"
+	"source.monogon.dev/metropolis/pkg/supervisor"
+)
+
+// Backend is to be implemented by different kinds of loadbalancing backends, eg.
+// one per network protocol.
+type Backend interface {
+	// TargetName returns the 'target name' of the backend, which is _not_ the same
+	// as its key in the BackendSet. Instead, the TargetName should uniquely identify
+	// some backend address, and will be used to figure out that while a backend
+	// might still exist, its target address has changed - and thus, all existing
+	// connections to the previous target address should be closed.
+	//
+	// For simple load balancing backends, this could be the connection string used
+	// to connect to the backend.
+	TargetName() string
+	// Dial returns a new connection to this backend.
+	Dial() (net.Conn, error)
+}
+
+// BackendSet is the main structure used to provide the current set of backends
+// that should be targeted by tinylb. The key is some unique backend identifier.
+type BackendSet = mapsets.OrderedMap[string, Backend]
+
+// SimpleTCPBackend implements Backend for trivial TCP-based backends.
+type SimpleTCPBackend struct {
+	Remote string
+}
+
+func (t *SimpleTCPBackend) TargetName() string {
+	return t.Remote
+}
+
+func (t *SimpleTCPBackend) Dial() (net.Conn, error) {
+	return net.Dial("tcp", t.Remote)
+}
+
+// Server is a tiny round-robin loadbalancer for net.Listener/net.Conn compatible
+// protocols.
+//
+// All fields must be set before the loadbalancer can be run.
+type Server struct {
+	// Provider is some event Value which provides the current BackendSet for the
+	// loadbalancer to use. As the BackendSet is updated, the internal loadbalancing
+	// algorithm will adjust to the updated set, and any connections to backend
+	// TargetNames that are not present in the set anymore will be closed.
+	Provider event.Value[BackendSet]
+	// Listener is where the loadbalancer will listen on. After the loadbalancer
+	// exits, this listener will be closed.
+	Listener net.Listener
+}
+
+// Run the loadbalancer in a superervisor.Runnable and block until canceled.
+// Because the Server's Listener will be closed after exit, it should be opened
+// in the same runnable as this function is then started.
+func (s *Server) Run(ctx context.Context) error {
+	// Connection pool used to track connections/backends.
+	pool := newConnectionPool()
+
+	// Current backend set and its lock.
+	var curSetMu sync.RWMutex
+	var curSet BackendSet
+
+	// Close listener on exit.
+	go func() {
+		<-ctx.Done()
+		s.Listener.Close()
+	}()
+
+	// The acceptor is runs the main Accept() loop on the given Listener.
+	err := supervisor.Run(ctx, "acceptor", func(ctx context.Context) error {
+		// This doesn't need a lock, as it doesn't read any fields of curSet.
+		it := curSet.Cycle()
+
+		supervisor.Signal(ctx, supervisor.SignalHealthy)
+
+		for {
+			if ctx.Err() != nil {
+				return ctx.Err()
+			}
+			conn, err := s.Listener.Accept()
+			if err != nil {
+				return err
+			}
+
+			// Get next backend.
+			curSetMu.RLock()
+			id, backend, ok := it.Next()
+			curSetMu.RUnlock()
+
+			if !ok {
+				supervisor.Logger(ctx).Warningf("Balancing %s: failed due to empty backend set", conn.RemoteAddr().String())
+				conn.Close()
+				continue
+			}
+			// Dial backend.
+			r, err := backend.Dial()
+			if err != nil {
+				supervisor.Logger(ctx).Warningf("Balancing %s: failed due to backend %s error: %v", conn.RemoteAddr(), id, err)
+				conn.Close()
+				continue
+			}
+			// Insert connection/target name into connectionPool.
+			target := backend.TargetName()
+			cid := pool.Insert(target, r)
+
+			// Pipe data. Close both connections on any side failing.
+			go func() {
+				defer conn.Close()
+				defer pool.CloseConn(cid)
+				io.Copy(r, conn)
+			}()
+			go func() {
+				defer conn.Close()
+				defer pool.CloseConn(cid)
+				io.Copy(conn, r)
+			}()
+		}
+	})
+	if err != nil {
+		return err
+	}
+
+	supervisor.Signal(ctx, supervisor.SignalHealthy)
+
+	// Update curSet from Provider.
+	w := s.Provider.Watch()
+	defer w.Close()
+	for {
+		set, err := w.Get(ctx)
+		if err != nil {
+			return err
+		}
+
+		// Did we lose a backend? If so, kill all connections going through it.
+
+		// First, gather a map from TargetName to backend ID for the current set.
+		curTargets := make(map[string]string)
+		curSetMu.Lock()
+		for _, kv := range curSet.Values() {
+			curTargets[kv.Value.TargetName()] = kv.Key
+		}
+		curSetMu.Unlock()
+
+		// Then, gather it for the new set.
+		targets := make(map[string]string)
+		for _, kv := range set.Values() {
+			targets[kv.Value.TargetName()] = kv.Key
+		}
+
+		// Then, if we have any target name in the connection pool that's not in the new
+		// set, close all of its connections.
+		for _, target := range pool.Targets() {
+			if _, ok := targets[target]; ok {
+				continue
+			}
+			// Use curTargets just for displaying the name of the backend that has changed.
+			supervisor.Logger(ctx).Infof("Backend %s / target %s removed, killing connections", curTargets[target], target)
+			pool.CloseTarget(target)
+		}
+
+		// Tell about new backend set and actually replace it.
+		supervisor.Logger(ctx).Infof("New backend set (%d backends):", len(set.Keys()))
+		for _, kv := range set.Values() {
+			supervisor.Logger(ctx).Infof(" - %s, target %s", kv.Key, kv.Value.TargetName())
+		}
+		curSetMu.Lock()
+		curSet.Replace(&set)
+		curSetMu.Unlock()
+	}
+}