blob: 3ca6b364c3dd50306c91bbce0c487b22fe3328cf [file] [log] [blame]
// Copyright The Monogon Project Authors.
// SPDX-License-Identifier: Apache-2.0

// Package tinylb implements a small and simple userland round-robin load
// balancer, mostly for TCP connections. However, it is entirely
// protocol-agnostic, and only expects net.Listener and net.Conn objects.
//
// Apart from the simple act of load-balancing across a set of backends, tinylb
// also automatically and immediately closes all open connections to backend
// targets that have been removed from the set. This is perhaps not the ideal
// behaviour for user-facing services, but it's the sort of behaviour that works
// very well for machine-to-machine communication.
13package tinylb
14
15import (
16 "context"
17 "io"
18 "net"
19 "sync"
20
21 "source.monogon.dev/go/types/mapsets"
Tim Windelschmidt9f21f532024-05-07 15:14:20 +020022 "source.monogon.dev/osbase/event"
23 "source.monogon.dev/osbase/supervisor"
Serge Bazanskic64ba1e2023-03-15 19:15:13 +010024)
25
// Backend abstracts a single load-balancing destination. Each kind of backend
// (eg. one per network protocol) is expected to provide its own implementation.
type Backend interface {
	// Dial opens and returns a fresh connection to this backend.
	Dial() (net.Conn, error)
	// TargetName returns the 'target name' of the backend. This is _not_ the
	// backend's key in the BackendSet: rather, it uniquely identifies the
	// address the backend points at. It is used to detect that a backend which
	// still exists under the same key now points somewhere else, in which case
	// all existing connections to the previous target address should be closed.
	//
	// For simple load balancing backends, the connection string used to reach
	// the backend is a reasonable target name.
	TargetName() string
}
41
42// BackendSet is the main structure used to provide the current set of backends
43// that should be targeted by tinylb. The key is some unique backend identifier.
44type BackendSet = mapsets.OrderedMap[string, Backend]
45
// SimpleTCPBackend is a Backend for plain TCP targets, identified solely by
// the address they are dialed at.
type SimpleTCPBackend struct {
	// Remote is the address handed to the TCP dialer. It also doubles as the
	// backend's target name.
	Remote string
}

// TargetName returns the Remote address of the backend.
func (b *SimpleTCPBackend) TargetName() string {
	return b.Remote
}

// Dial opens a new TCP connection to the Remote address using a default
// (zero-value) net.Dialer.
func (b *SimpleTCPBackend) Dial() (net.Conn, error) {
	var dialer net.Dialer
	return dialer.Dial("tcp", b.Remote)
}
58
59// Server is a tiny round-robin loadbalancer for net.Listener/net.Conn compatible
60// protocols.
61//
62// All fields must be set before the loadbalancer can be run.
63type Server struct {
64 // Provider is some event Value which provides the current BackendSet for the
65 // loadbalancer to use. As the BackendSet is updated, the internal loadbalancing
66 // algorithm will adjust to the updated set, and any connections to backend
67 // TargetNames that are not present in the set anymore will be closed.
68 Provider event.Value[BackendSet]
69 // Listener is where the loadbalancer will listen on. After the loadbalancer
70 // exits, this listener will be closed.
71 Listener net.Listener
72}
73
74// Run the loadbalancer in a superervisor.Runnable and block until canceled.
75// Because the Server's Listener will be closed after exit, it should be opened
76// in the same runnable as this function is then started.
77func (s *Server) Run(ctx context.Context) error {
78 // Connection pool used to track connections/backends.
79 pool := newConnectionPool()
80
81 // Current backend set and its lock.
82 var curSetMu sync.RWMutex
83 var curSet BackendSet
84
85 // Close listener on exit.
86 go func() {
87 <-ctx.Done()
88 s.Listener.Close()
89 }()
90
91 // The acceptor is runs the main Accept() loop on the given Listener.
92 err := supervisor.Run(ctx, "acceptor", func(ctx context.Context) error {
93 // This doesn't need a lock, as it doesn't read any fields of curSet.
94 it := curSet.Cycle()
95
96 supervisor.Signal(ctx, supervisor.SignalHealthy)
97
98 for {
99 if ctx.Err() != nil {
100 return ctx.Err()
101 }
102 conn, err := s.Listener.Accept()
103 if err != nil {
104 return err
105 }
106
107 // Get next backend.
108 curSetMu.RLock()
109 id, backend, ok := it.Next()
110 curSetMu.RUnlock()
111
112 if !ok {
113 supervisor.Logger(ctx).Warningf("Balancing %s: failed due to empty backend set", conn.RemoteAddr().String())
114 conn.Close()
115 continue
116 }
117 // Dial backend.
118 r, err := backend.Dial()
119 if err != nil {
120 supervisor.Logger(ctx).Warningf("Balancing %s: failed due to backend %s error: %v", conn.RemoteAddr(), id, err)
121 conn.Close()
122 continue
123 }
124 // Insert connection/target name into connectionPool.
125 target := backend.TargetName()
126 cid := pool.Insert(target, r)
127
128 // Pipe data. Close both connections on any side failing.
129 go func() {
130 defer conn.Close()
131 defer pool.CloseConn(cid)
132 io.Copy(r, conn)
133 }()
134 go func() {
135 defer conn.Close()
136 defer pool.CloseConn(cid)
137 io.Copy(conn, r)
138 }()
139 }
140 })
141 if err != nil {
142 return err
143 }
144
145 supervisor.Signal(ctx, supervisor.SignalHealthy)
146
147 // Update curSet from Provider.
148 w := s.Provider.Watch()
149 defer w.Close()
150 for {
151 set, err := w.Get(ctx)
152 if err != nil {
153 return err
154 }
155
156 // Did we lose a backend? If so, kill all connections going through it.
157
158 // First, gather a map from TargetName to backend ID for the current set.
159 curTargets := make(map[string]string)
160 curSetMu.Lock()
161 for _, kv := range curSet.Values() {
162 curTargets[kv.Value.TargetName()] = kv.Key
163 }
164 curSetMu.Unlock()
165
166 // Then, gather it for the new set.
167 targets := make(map[string]string)
168 for _, kv := range set.Values() {
169 targets[kv.Value.TargetName()] = kv.Key
170 }
171
172 // Then, if we have any target name in the connection pool that's not in the new
173 // set, close all of its connections.
174 for _, target := range pool.Targets() {
175 if _, ok := targets[target]; ok {
176 continue
177 }
178 // Use curTargets just for displaying the name of the backend that has changed.
179 supervisor.Logger(ctx).Infof("Backend %s / target %s removed, killing connections", curTargets[target], target)
180 pool.CloseTarget(target)
181 }
182
183 // Tell about new backend set and actually replace it.
184 supervisor.Logger(ctx).Infof("New backend set (%d backends):", len(set.Keys()))
185 for _, kv := range set.Values() {
186 supervisor.Logger(ctx).Infof(" - %s, target %s", kv.Key, kv.Value.TargetName())
187 }
188 curSetMu.Lock()
189 curSet.Replace(&set)
190 curSetMu.Unlock()
191 }
192}