// Package tinylb implements a small and simple userland round-robin load
// balancer, mostly for TCP connections. However, it is entirely
// protocol-agnostic, and only expects net.Listener and net.Conn objects.
//
// Apart from the simple act of load-balancing across a set of backends, tinylb
// also automatically and immediately closes all open connections to backend
// targets that have been removed from the set. This is perhaps not the ideal
// behaviour for user-facing services, but it's the sort of behaviour that works
// very well for machine-to-machine communication.
| 10 | package tinylb |
| 11 | |
| 12 | import ( |
| 13 | "context" |
| 14 | "io" |
| 15 | "net" |
| 16 | "sync" |
| 17 | |
| 18 | "source.monogon.dev/go/types/mapsets" |
| 19 | "source.monogon.dev/metropolis/pkg/event" |
| 20 | "source.monogon.dev/metropolis/pkg/supervisor" |
| 21 | ) |
| 22 | |
// Backend is to be implemented by different kinds of loadbalancing backends,
// e.g. one per network protocol.
type Backend interface {
	// TargetName returns the 'target name' of the backend, which is _not_ the
	// same as its key in the BackendSet. Instead, the TargetName should uniquely
	// identify some backend address, and will be used to figure out that while a
	// backend might still exist, its target address has changed - and thus, all
	// existing connections to the previous target address should be closed.
	//
	// For simple load balancing backends, this could be the connection string
	// used to connect to the backend.
	TargetName() string
	// Dial returns a new connection to this backend, or an error if the
	// connection could not be established.
	Dial() (net.Conn, error)
}
| 38 | |
| 39 | // BackendSet is the main structure used to provide the current set of backends |
| 40 | // that should be targeted by tinylb. The key is some unique backend identifier. |
| 41 | type BackendSet = mapsets.OrderedMap[string, Backend] |
| 42 | |
// SimpleTCPBackend implements Backend for trivial TCP-based backends.
type SimpleTCPBackend struct {
	// Remote is the address passed to net.Dial("tcp", ...) when connecting to
	// the backend. It also doubles as the backend's target name.
	Remote string
}

// TargetName returns the Remote address, which identifies the dialed target.
func (t *SimpleTCPBackend) TargetName() string {
	return t.Remote
}

// Dial opens a new TCP connection to Remote.
func (t *SimpleTCPBackend) Dial() (net.Conn, error) {
	return net.Dial("tcp", t.Remote)
}
| 55 | |
| 56 | // Server is a tiny round-robin loadbalancer for net.Listener/net.Conn compatible |
| 57 | // protocols. |
| 58 | // |
| 59 | // All fields must be set before the loadbalancer can be run. |
| 60 | type Server struct { |
| 61 | // Provider is some event Value which provides the current BackendSet for the |
| 62 | // loadbalancer to use. As the BackendSet is updated, the internal loadbalancing |
| 63 | // algorithm will adjust to the updated set, and any connections to backend |
| 64 | // TargetNames that are not present in the set anymore will be closed. |
| 65 | Provider event.Value[BackendSet] |
| 66 | // Listener is where the loadbalancer will listen on. After the loadbalancer |
| 67 | // exits, this listener will be closed. |
| 68 | Listener net.Listener |
| 69 | } |
| 70 | |
| 71 | // Run the loadbalancer in a superervisor.Runnable and block until canceled. |
| 72 | // Because the Server's Listener will be closed after exit, it should be opened |
| 73 | // in the same runnable as this function is then started. |
| 74 | func (s *Server) Run(ctx context.Context) error { |
| 75 | // Connection pool used to track connections/backends. |
| 76 | pool := newConnectionPool() |
| 77 | |
| 78 | // Current backend set and its lock. |
| 79 | var curSetMu sync.RWMutex |
| 80 | var curSet BackendSet |
| 81 | |
| 82 | // Close listener on exit. |
| 83 | go func() { |
| 84 | <-ctx.Done() |
| 85 | s.Listener.Close() |
| 86 | }() |
| 87 | |
| 88 | // The acceptor is runs the main Accept() loop on the given Listener. |
| 89 | err := supervisor.Run(ctx, "acceptor", func(ctx context.Context) error { |
| 90 | // This doesn't need a lock, as it doesn't read any fields of curSet. |
| 91 | it := curSet.Cycle() |
| 92 | |
| 93 | supervisor.Signal(ctx, supervisor.SignalHealthy) |
| 94 | |
| 95 | for { |
| 96 | if ctx.Err() != nil { |
| 97 | return ctx.Err() |
| 98 | } |
| 99 | conn, err := s.Listener.Accept() |
| 100 | if err != nil { |
| 101 | return err |
| 102 | } |
| 103 | |
| 104 | // Get next backend. |
| 105 | curSetMu.RLock() |
| 106 | id, backend, ok := it.Next() |
| 107 | curSetMu.RUnlock() |
| 108 | |
| 109 | if !ok { |
| 110 | supervisor.Logger(ctx).Warningf("Balancing %s: failed due to empty backend set", conn.RemoteAddr().String()) |
| 111 | conn.Close() |
| 112 | continue |
| 113 | } |
| 114 | // Dial backend. |
| 115 | r, err := backend.Dial() |
| 116 | if err != nil { |
| 117 | supervisor.Logger(ctx).Warningf("Balancing %s: failed due to backend %s error: %v", conn.RemoteAddr(), id, err) |
| 118 | conn.Close() |
| 119 | continue |
| 120 | } |
| 121 | // Insert connection/target name into connectionPool. |
| 122 | target := backend.TargetName() |
| 123 | cid := pool.Insert(target, r) |
| 124 | |
| 125 | // Pipe data. Close both connections on any side failing. |
| 126 | go func() { |
| 127 | defer conn.Close() |
| 128 | defer pool.CloseConn(cid) |
| 129 | io.Copy(r, conn) |
| 130 | }() |
| 131 | go func() { |
| 132 | defer conn.Close() |
| 133 | defer pool.CloseConn(cid) |
| 134 | io.Copy(conn, r) |
| 135 | }() |
| 136 | } |
| 137 | }) |
| 138 | if err != nil { |
| 139 | return err |
| 140 | } |
| 141 | |
| 142 | supervisor.Signal(ctx, supervisor.SignalHealthy) |
| 143 | |
| 144 | // Update curSet from Provider. |
| 145 | w := s.Provider.Watch() |
| 146 | defer w.Close() |
| 147 | for { |
| 148 | set, err := w.Get(ctx) |
| 149 | if err != nil { |
| 150 | return err |
| 151 | } |
| 152 | |
| 153 | // Did we lose a backend? If so, kill all connections going through it. |
| 154 | |
| 155 | // First, gather a map from TargetName to backend ID for the current set. |
| 156 | curTargets := make(map[string]string) |
| 157 | curSetMu.Lock() |
| 158 | for _, kv := range curSet.Values() { |
| 159 | curTargets[kv.Value.TargetName()] = kv.Key |
| 160 | } |
| 161 | curSetMu.Unlock() |
| 162 | |
| 163 | // Then, gather it for the new set. |
| 164 | targets := make(map[string]string) |
| 165 | for _, kv := range set.Values() { |
| 166 | targets[kv.Value.TargetName()] = kv.Key |
| 167 | } |
| 168 | |
| 169 | // Then, if we have any target name in the connection pool that's not in the new |
| 170 | // set, close all of its connections. |
| 171 | for _, target := range pool.Targets() { |
| 172 | if _, ok := targets[target]; ok { |
| 173 | continue |
| 174 | } |
| 175 | // Use curTargets just for displaying the name of the backend that has changed. |
| 176 | supervisor.Logger(ctx).Infof("Backend %s / target %s removed, killing connections", curTargets[target], target) |
| 177 | pool.CloseTarget(target) |
| 178 | } |
| 179 | |
| 180 | // Tell about new backend set and actually replace it. |
| 181 | supervisor.Logger(ctx).Infof("New backend set (%d backends):", len(set.Keys())) |
| 182 | for _, kv := range set.Values() { |
| 183 | supervisor.Logger(ctx).Infof(" - %s, target %s", kv.Key, kv.Value.TargetName()) |
| 184 | } |
| 185 | curSetMu.Lock() |
| 186 | curSet.Replace(&set) |
| 187 | curSetMu.Unlock() |
| 188 | } |
| 189 | } |