blob: 281fc59230d4c9edf58ea66cb4c8f4de45cd8082 [file] [log] [blame]
package resolver
import (
"errors"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/resolver"
)
// clientWatcher is a subordinate structure to a given Resolver, updating a
// gRPC ClientConn with information about current endpoints. One is created by
// every successful call to Resolver.Build, and it is the value handed back to
// gRPC as the resolver.Resolver for that connection.
type clientWatcher struct {
// resolver is the parent Resolver this watcher was built by. Close sends its
// unsubscribe request over the parent's request channel.
resolver *Resolver
// clientConn is the gRPC client connection which watch pushes state updates
// and errors to.
clientConn resolver.ClientConn
// subscription is the response received from the parent's processor for the
// subscribe request issued in Build. Its subC channel delivers updates to
// watch, and its id identifies this watcher when unsubscribing in Close.
subscription *responseSubscribe
}
// ResolverClosed is the error surfaced to the gRPC machinery once a resolver
// has been closed and can therefore no longer be used: Build returns it, and
// watchers report it to their client connection.
var ResolverClosed = errors.New("cluster resolver closed")
// Build is called by gRPC on each Dial call. It spawns a new clientWatcher,
// whose goroutine receives information about currently available nodes from the
// parent Resolver and actually updates a given gRPC client connection with
// information about the current set of nodes.
//
// The target must use the "metropolis" scheme, an empty authority and the
// "control" endpoint, and opts must carry transport credentials; otherwise an
// error is returned. ResolverClosed is returned if the resolver gets canceled
// before a request could be handed over to its processor.
func (r *Resolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	// We can only connect to "metropolis://control".
	if target.Scheme != "metropolis" || target.Authority != "" || target.Endpoint != "control" {
		// Report the whole target rather than just its endpoint: any of the three
		// components checked above might be the one at fault.
		got := fmt.Sprintf("%s://%s/%s", target.Scheme, target.Authority, target.Endpoint)
		return nil, fmt.Errorf("invalid target: must be %s, is: %s", MetropolisControlAddress, got)
	}

	if opts.DialCreds == nil {
		return nil, errors.New("can only be used with clients containing TransportCredentials")
	}

	// Submit the dial options to the resolver's processor, quitting if the resolver
	// gets canceled in the meantime.
	options := []grpc.DialOption{
		grpc.WithTransportCredentials(opts.DialCreds),
		grpc.WithContextDialer(opts.Dialer),
	}
	select {
	case <-r.ctx.Done():
		return nil, ResolverClosed
	case r.reqC <- &request{
		ds: &requestDialOptionsSet{
			options: options,
		},
	}:
	}

	// Submit a subscription request to the resolver's processor, quitting if the
	// resolver gets canceled in the meantime.
	req := &request{
		sub: &requestSubscribe{resC: make(chan *responseSubscribe)},
	}
	select {
	case <-r.ctx.Done():
		return nil, ResolverClosed
	case r.reqC <- req:
	}
	// This receive is uninterruptible by contract - as it's also uninterruptible on
	// the processor side.
	subscription := <-req.sub.resC

	// Hand the subscription over to a watcher, which keeps feeding cc from its own
	// goroutine until the subscription ends.
	watcher := &clientWatcher{
		resolver:     r,
		clientConn:   cc,
		subscription: subscription,
	}
	go watcher.watch()

	return watcher, nil
}
// Scheme returns the target URI scheme served by this resolver. It is the same
// scheme that Build requires of the targets it is given.
func (r *Resolver) Scheme() string {
	const scheme = "metropolis"
	return scheme
}
// watch is run in its own goroutine by Build. It forwards every update
// received on the subscription's channel to the gRPC client connection as a
// resolver state holding a single address, and returns once a nil update is
// received, after reporting ResolverClosed to the client connection.
func (w *clientWatcher) watch() {
	// The service config is parsed once and attached to every state pushed below.
	// It forces round-robin behaviour, which is mostly a formality here, as each
	// state only ever carries a single address.
	svcConfig := w.clientConn.ParseServiceConfig(`{ "loadBalancingConfig": [{"round_robin": {}}]}`)

	// Push updates for as long as non-nil ones keep arriving.
	for update := <-w.subscription.subC; update != nil; update = <-w.subscription.subC {
		addr := resolver.Address{
			Addr:       update.endpoint.endpoint,
			ServerName: update.nodeID,
		}
		w.clientConn.UpdateState(resolver.State{
			Addresses:     []resolver.Address{addr},
			ServiceConfig: svcConfig,
		})
	}

	// A nil update means the channel got closed: either this watcher has been
	// closed or the resolver has been canceled. Let gRPC know before exiting.
	w.clientConn.ReportError(ResolverClosed)
}
// ResolveNow does nothing: the watch goroutine already forwards updates to the
// client connection as soon as they arrive, so there is nothing to hurry along.
func (w *clientWatcher) ResolveNow(_ resolver.ResolveNowOptions) {
}
// Close asks the parent resolver to drop this watcher's subscription by
// submitting an unsubscribe request carrying the subscription's id.
//
// Like the request submissions in Build, the send gives up as soon as the
// parent resolver's context is done. Without that escape, Close could block
// forever on the request channel once the resolver has been canceled
// (presumably nothing receives from it anymore at that point), hanging
// whoever is tearing down the client connection.
func (w *clientWatcher) Close() {
	req := &request{
		unsub: &requestUnsubscribe{
			id: w.subscription.id,
		},
	}
	select {
	case <-w.resolver.ctx.Done():
		// Resolver is gone; there is nothing left to unsubscribe from.
	case w.resolver.reqC <- req:
	}
}