blob: 67a639e0b43fddb6cfcdc4834f9567dc9441a2c6 [file] [log] [blame]
Serge Bazanskic64ba1e2023-03-15 19:15:13 +01001// Package tinylb implements a small and simple userland round-robin load
2// balancer, mostly for TCP connections. However, it is entirely
3// protocol-agnostic, and only expects net.Listener and net.Conn objects.
4//
5// Apart from the simple act of load-balancing across a set of backends, tinylb
6// also automatically and immediately closes all open connections to backend
7// targets that have been removed from the set. This is perhaps not the ideal
8// behaviour for user-facing services, but it's the sort of behaviour that works
9// very well for machine-to-machine communication.
10package tinylb
11
12import (
13 "context"
14 "io"
15 "net"
16 "sync"
17
18 "source.monogon.dev/go/types/mapsets"
19 "source.monogon.dev/metropolis/pkg/event"
20 "source.monogon.dev/metropolis/pkg/supervisor"
21)
22
23// Backend is to be implemented by different kinds of loadbalancing backends, eg.
24// one per network protocol.
25type Backend interface {
26 // TargetName returns the 'target name' of the backend, which is _not_ the same
27 // as its key in the BackendSet. Instead, the TargetName should uniquely identify
28 // some backend address, and will be used to figure out that while a backend
29 // might still exist, its target address has changed - and thus, all existing
30 // connections to the previous target address should be closed.
31 //
32 // For simple load balancing backends, this could be the connection string used
33 // to connect to the backend.
34 TargetName() string
35 // Dial returns a new connection to this backend.
36 Dial() (net.Conn, error)
37}
38
39// BackendSet is the main structure used to provide the current set of backends
40// that should be targeted by tinylb. The key is some unique backend identifier.
41type BackendSet = mapsets.OrderedMap[string, Backend]
42
43// SimpleTCPBackend implements Backend for trivial TCP-based backends.
44type SimpleTCPBackend struct {
45 Remote string
46}
47
48func (t *SimpleTCPBackend) TargetName() string {
49 return t.Remote
50}
51
52func (t *SimpleTCPBackend) Dial() (net.Conn, error) {
53 return net.Dial("tcp", t.Remote)
54}
55
56// Server is a tiny round-robin loadbalancer for net.Listener/net.Conn compatible
57// protocols.
58//
59// All fields must be set before the loadbalancer can be run.
60type Server struct {
61 // Provider is some event Value which provides the current BackendSet for the
62 // loadbalancer to use. As the BackendSet is updated, the internal loadbalancing
63 // algorithm will adjust to the updated set, and any connections to backend
64 // TargetNames that are not present in the set anymore will be closed.
65 Provider event.Value[BackendSet]
66 // Listener is where the loadbalancer will listen on. After the loadbalancer
67 // exits, this listener will be closed.
68 Listener net.Listener
69}
70
71// Run the loadbalancer in a superervisor.Runnable and block until canceled.
72// Because the Server's Listener will be closed after exit, it should be opened
73// in the same runnable as this function is then started.
74func (s *Server) Run(ctx context.Context) error {
75 // Connection pool used to track connections/backends.
76 pool := newConnectionPool()
77
78 // Current backend set and its lock.
79 var curSetMu sync.RWMutex
80 var curSet BackendSet
81
82 // Close listener on exit.
83 go func() {
84 <-ctx.Done()
85 s.Listener.Close()
86 }()
87
88 // The acceptor is runs the main Accept() loop on the given Listener.
89 err := supervisor.Run(ctx, "acceptor", func(ctx context.Context) error {
90 // This doesn't need a lock, as it doesn't read any fields of curSet.
91 it := curSet.Cycle()
92
93 supervisor.Signal(ctx, supervisor.SignalHealthy)
94
95 for {
96 if ctx.Err() != nil {
97 return ctx.Err()
98 }
99 conn, err := s.Listener.Accept()
100 if err != nil {
101 return err
102 }
103
104 // Get next backend.
105 curSetMu.RLock()
106 id, backend, ok := it.Next()
107 curSetMu.RUnlock()
108
109 if !ok {
110 supervisor.Logger(ctx).Warningf("Balancing %s: failed due to empty backend set", conn.RemoteAddr().String())
111 conn.Close()
112 continue
113 }
114 // Dial backend.
115 r, err := backend.Dial()
116 if err != nil {
117 supervisor.Logger(ctx).Warningf("Balancing %s: failed due to backend %s error: %v", conn.RemoteAddr(), id, err)
118 conn.Close()
119 continue
120 }
121 // Insert connection/target name into connectionPool.
122 target := backend.TargetName()
123 cid := pool.Insert(target, r)
124
125 // Pipe data. Close both connections on any side failing.
126 go func() {
127 defer conn.Close()
128 defer pool.CloseConn(cid)
129 io.Copy(r, conn)
130 }()
131 go func() {
132 defer conn.Close()
133 defer pool.CloseConn(cid)
134 io.Copy(conn, r)
135 }()
136 }
137 })
138 if err != nil {
139 return err
140 }
141
142 supervisor.Signal(ctx, supervisor.SignalHealthy)
143
144 // Update curSet from Provider.
145 w := s.Provider.Watch()
146 defer w.Close()
147 for {
148 set, err := w.Get(ctx)
149 if err != nil {
150 return err
151 }
152
153 // Did we lose a backend? If so, kill all connections going through it.
154
155 // First, gather a map from TargetName to backend ID for the current set.
156 curTargets := make(map[string]string)
157 curSetMu.Lock()
158 for _, kv := range curSet.Values() {
159 curTargets[kv.Value.TargetName()] = kv.Key
160 }
161 curSetMu.Unlock()
162
163 // Then, gather it for the new set.
164 targets := make(map[string]string)
165 for _, kv := range set.Values() {
166 targets[kv.Value.TargetName()] = kv.Key
167 }
168
169 // Then, if we have any target name in the connection pool that's not in the new
170 // set, close all of its connections.
171 for _, target := range pool.Targets() {
172 if _, ok := targets[target]; ok {
173 continue
174 }
175 // Use curTargets just for displaying the name of the backend that has changed.
176 supervisor.Logger(ctx).Infof("Backend %s / target %s removed, killing connections", curTargets[target], target)
177 pool.CloseTarget(target)
178 }
179
180 // Tell about new backend set and actually replace it.
181 supervisor.Logger(ctx).Infof("New backend set (%d backends):", len(set.Keys()))
182 for _, kv := range set.Values() {
183 supervisor.Logger(ctx).Infof(" - %s, target %s", kv.Key, kv.Value.TargetName())
184 }
185 curSetMu.Lock()
186 curSet.Replace(&set)
187 curSetMu.Unlock()
188 }
189}