package rpc

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/cenkalti/backoff/v4"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/resolver"

	cpb "source.monogon.dev/metropolis/node/core/curator/proto/api"
)

const (
	MetropolisControlAddress = "metropolis:///control"
)

// ClusterResolver is a gRPC resolver Builder that can be passed to
// grpc.WithResolvers() when dialing a gRPC endpoint.
//
// It's responsible for resolving the magic MetropolisControlAddress
// (metropolis:///control) into all Metropolis nodes running control plane
// services, ie. the Curator.
//
// To function, the ClusterResolver needs to be provided with at least one node
// address. Afterwards, it will continuously update an internal list of nodes
// which can be contacted for access to control plane services, and gRPC
// clients using this resolver will automatically try the available addresses
// for each RPC call in a round-robin fashion.
//
// The ClusterResolver is designed to be used as a long-running object which
// multiple gRPC client connections can use. Usually one ClusterResolver
// instance should be used per application.
type ClusterResolver struct {
	ctx  context.Context
	ctxC context.CancelFunc

	// logger, if set, will be called with fmt.Sprintf-like arguments containing
	// debug logs from the running ClusterResolver, subordinate watchers and
	// updaters.
	logger func(f string, args ...interface{})

	condCurators  *sync.Cond
	curators      map[string]string
	condTLSConfig *sync.Cond
	tlsConfig     credentials.TransportCredentials
}

// AddNode provides a given node ID at a given address as an initial (or
// additional) node for the ClusterResolver to update cluster information
// from.
func (b *ClusterResolver) AddNode(name, remote string) {
	b.condCurators.L.Lock()
	defer b.condCurators.L.Unlock()

	b.curators[name] = remote
	b.condCurators.Broadcast()
}

// NewClusterResolver creates an empty ClusterResolver. It must be populated
// with initial node information for any gRPC call that uses it to succeed.
func NewClusterResolver() *ClusterResolver {
	ctx, ctxC := context.WithCancel(context.Background())
	b := &ClusterResolver{
		ctx:           ctx,
		ctxC:          ctxC,
		logger:        func(f string, args ...interface{}) {},
		condCurators:  sync.NewCond(&sync.Mutex{}),
		curators:      make(map[string]string),
		condTLSConfig: sync.NewCond(&sync.Mutex{}),
	}

	go b.run(b.ctx)

	return b
}
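
// A minimal usage sketch (not taken from this package's callers; the node ID,
// address, and creds variable below are placeholders): construct a resolver,
// seed it with one known node, and dial MetropolisControlAddress with the
// resolver and TLS transport credentials attached.
//
//   r := NewClusterResolver()
//   defer r.Close()
//   r.AddNode("some-node-id", "192.0.2.10:1234")
//   conn, err := grpc.Dial(MetropolisControlAddress,
//       grpc.WithTransportCredentials(creds),
//       grpc.WithResolvers(r))
//   if err != nil {
//       // handle dial error
//   }
//   defer conn.Close()
//   cur := cpb.NewCuratorClient(conn)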

var (
	ResolverClosed = errors.New("cluster resolver closed")
)

// Close the ClusterResolver and clean up its background goroutines. The node
// address resolution process stops, and existing connections made via this
// ClusterResolver will keep using whatever node addresses were last known.
// New attempts to dial using this ClusterResolver will fail, however.
func (b *ClusterResolver) Close() {
	b.ctxC()
}

// run is the main loop of the ClusterResolver. Its job is to wait for a TLS
// config from a gRPC client, then iterate through available node addresses to
// start an updater on. The updater communicates back to this goroutine with
// up-to-date node information. If an updater cannot run anymore (eg. its node
// stopped working), the main loop of run restarts and another endpoint is
// picked.
func (b *ClusterResolver) run(ctx context.Context) {
	bo := backoff.NewExponentialBackOff()
	bo.MaxElapsedTime = 0

	// Helper to update internal node list and notify all gRPC clients of it.
	updateCurators := func(nodes map[string]string) {
		b.condCurators.L.Lock()
		b.curators = nodes
		b.condCurators.L.Unlock()
		b.condCurators.Broadcast()
	}

	// Helper to sleep for a given time, but with possible interruption by the
	// resolver being stopped. Returns true if the resolver got canceled while
	// waiting, false if the full wait elapsed.
	waitTimeout := func(t time.Duration) bool {
		select {
		case <-time.After(t):
			return false
		case <-ctx.Done():
			return true
		}
	}

	for {
		b.logger("RESOLVER: waiting for TLS config...")
		// Wait for a TLS config to be set.
		b.condTLSConfig.L.Lock()
		for b.tlsConfig == nil {
			b.condTLSConfig.Wait()
		}
		creds := b.tlsConfig
		b.condTLSConfig.L.Unlock()
		b.logger("RESOLVER: have TLS config...")

		// Iterate over endpoints to find a working one, and retrieve cluster-provided
		// node info from there.
		endpoints := b.addresses()
		if len(endpoints) == 0 {
			w := bo.NextBackOff()
			b.logger("RESOLVER: no endpoints, waiting %s...", w)
			if waitTimeout(w) {
				b.logger("RESOLVER: canceled")
				return
			}
			continue
		}

		b.logger("RESOLVER: starting endpoint loop with %v...", endpoints)
		for name, endpoint := range endpoints {
			upC := make(chan map[string]string)
			b.logger("RESOLVER: starting updater pointed at %s/%s", name, endpoint)

			// Start updater, which actually connects to the endpoint and provides back the
			// newest set of nodes via upC.
			go b.runUpdater(ctx, endpoint, creds, upC)

			// Keep using this updater as long as possible. If it fails, restart the main
			// loop.
			failed := false
			for {
				var newNodes map[string]string
				failed = false
				select {
				case newNodes = <-upC:
					if newNodes == nil {
						// Updater quit.
						failed = true
					}
				case <-ctx.Done():
					b.logger("RESOLVER: canceled")
					updateCurators(nil)
					return
				}

				if failed {
					w := bo.NextBackOff()
					b.logger("RESOLVER: updater failed, waiting %s...", w)
					if waitTimeout(w) {
						b.logger("RESOLVER: canceled")
						return
					}
					b.logger("RESOLVER: done waiting")
					break
				} else {
					bo.Reset()
					updateCurators(newNodes)
				}
			}
			// Restart entire ClusterResolver loop on failure.
			if failed {
				break
			}
		}
	}
}

// runUpdater runs the ClusterResolver's updater, which is a goroutine that
// connects to a Curator running on a given node and feeds back information
// about consensus members via updateC. If the endpoint fails (eg. because the
// node went down), updateC will be closed.
func (b *ClusterResolver) runUpdater(ctx context.Context, endpoint string, creds credentials.TransportCredentials, updateC chan map[string]string) {
	defer close(updateC)
	cl, err := grpc.Dial(endpoint, grpc.WithTransportCredentials(creds))
	if err != nil {
		b.logger("UPDATER: dial failed: %v", err)
		return
	}
	defer cl.Close()
	cur := cpb.NewCuratorClient(cl)
	w, err := cur.Watch(ctx, &cpb.WatchRequest{
		Kind: &cpb.WatchRequest_NodesInCluster_{
			NodesInCluster: &cpb.WatchRequest_NodesInCluster{},
		},
	})
	if err != nil {
		b.logger("UPDATER: watch failed: %v", err)
		return
	}

	// Maintain a long-term map from node ID to node external address, and populate
	// it from the Curator Watch stream above.
	nodes := make(map[string]string)
	for {
		ev, err := w.Recv()
		if err != nil {
			b.logger("UPDATER: recv failed: %v", err)
			return
		}
		for _, node := range ev.Nodes {
			if node.Roles.ConsensusMember == nil {
				delete(nodes, node.Id)
				continue
			}
			st := node.Status
			if st == nil || st.ExternalAddress == "" {
				delete(nodes, node.Id)
				continue
			}
			nodes[node.Id] = st.ExternalAddress
		}
		for _, node := range ev.NodeTombstones {
			delete(nodes, node.NodeId)
		}
		b.logger("UPDATER: new nodes: %v", nodes)
		// Send a copy of the node map, as this goroutine keeps mutating nodes on
		// subsequent events while the receiver hands the map out to gRPC clients.
		nodesCopy := make(map[string]string, len(nodes))
		for k, v := range nodes {
			nodesCopy[k] = v
		}
		updateC <- nodesCopy
	}
}

// addresses returns the current set of node addresses that the ClusterResolver
// considers as possible updater candidates.
func (b *ClusterResolver) addresses() map[string]string {
	b.condCurators.L.Lock()
	defer b.condCurators.L.Unlock()

	res := make(map[string]string)
	for k, v := range b.curators {
		res[k] = v
	}
	return res
}

// Build is called by gRPC on each Dial call. It spawns a new clientWatcher,
// whose goroutine receives information about currently available nodes from
// the parent ClusterResolver and updates the given gRPC client connection
// with the current set of node addresses.
func (b *ClusterResolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	// We can only connect to "metropolis:///control".
	if target.Scheme != "metropolis" || target.Authority != "" || target.Endpoint != "control" {
		return nil, fmt.Errorf("invalid target: must be %s, is: %s", MetropolisControlAddress, target.Endpoint)
	}

	if opts.DialCreds == nil {
		return nil, fmt.Errorf("can only be used with clients containing TransportCredentials")
	}

	if b.ctx.Err() != nil {
		return nil, ResolverClosed
	}

	b.condTLSConfig.L.Lock()
	// TODO(q3k): make sure we didn't receive different DialCreds for a different
	// cluster or something.
	b.tlsConfig = opts.DialCreds
	b.condTLSConfig.Broadcast()
	defer b.condTLSConfig.L.Unlock()

	ctx, ctxC := context.WithCancel(b.ctx)
	watcher := &clientWatcher{
		builder:    b,
		clientConn: cc,
		ctx:        ctx,
		ctxC:       ctxC,
	}
	go watcher.watch()
	return watcher, nil
}

func (b *ClusterResolver) Scheme() string {
	return "metropolis"
}
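
// Because Scheme returns "metropolis", a ClusterResolver could in principle
// also be registered process-wide with resolver.Register instead of being
// passed per dial via grpc.WithResolvers (a sketch under that assumption, not
// necessarily how callers of this package do it; creds is a placeholder):
//
//   r := NewClusterResolver()
//   resolver.Register(r)
//   conn, err := grpc.Dial(MetropolisControlAddress,
//       grpc.WithTransportCredentials(creds))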

// clientWatcher is a subordinate structure to a given ClusterResolver,
// updating a gRPC ClientConn with information about current endpoints.
type clientWatcher struct {
	builder    *ClusterResolver
	clientConn resolver.ClientConn

	ctx  context.Context
	ctxC context.CancelFunc
}

func (r *clientWatcher) watch() {
	// Craft a trivial gRPC service config which forces round-robin behaviour for
	// RPCs. This makes the gRPC client contact all curators in a round-robin
	// fashion. Ideally, we would prioritize contacting the leader, but this will do
	// for now.
	svcConfig := r.clientConn.ParseServiceConfig(`{ "loadBalancingConfig": [{"round_robin": {}}]}`)

	// Watch for condCurators being updated.
	r.builder.condCurators.L.Lock()
	// Release the lock when this watcher quits, otherwise every other user of
	// condCurators would block forever.
	defer r.builder.condCurators.L.Unlock()
	for {
		if r.ctx.Err() != nil {
			return
		}

		nodes := r.builder.curators
		var addresses []resolver.Address
		for n, addr := range nodes {
			addresses = append(addresses, resolver.Address{
				Addr:       addr,
				ServerName: n,
			})
		}
		r.builder.logger("WATCHER: new addresses: %v", addresses)
		r.clientConn.UpdateState(resolver.State{
			Addresses:     addresses,
			ServiceConfig: svcConfig,
		})
		r.builder.condCurators.Wait()
	}
}

func (r *clientWatcher) ResolveNow(_ resolver.ResolveNowOptions) {
	// No-op. The clientWatcher's watch loop already pushes updates as soon as new
	// node information is available.
}

func (r *clientWatcher) Close() {
	r.ctxC()
	// Spuriously interrupt all clientWatchers on this ClusterResolver so that this
	// clientWatcher gets to notice it should quit. This isn't ideal.
	r.builder.condCurators.Broadcast()
}