m/n/core/rpc: add ClusterResolver
This is a first-pass implementation of a basic, functional, but not yet
fully featured gRPC resolver builder that connects to a given Metropolis
cluster starting from just a single working node.
This is planned to be extended to be aware of node health, and possibly
curator leadership. It will then replace the main roleserver client and
allow metroctl to connect to a cluster given just a single node.
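A rough sketch of the intended consumer-side usage (node ID, address and
credentials below are placeholders, not real cluster values):

    r := rpc.NewClusterResolver()
    defer r.Close()
    r.AddNode("some-node-id", "192.0.2.10:1234")
    cc, err := grpc.Dial(rpc.MetropolisControlAddress,
        grpc.WithTransportCredentials(creds), grpc.WithResolvers(r))
    // ... handle err ...
    mgmt := apb.NewManagementClient(cc)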
Change-Id: I8321a6ce19bdaead35b5f266dd9774ce1b78f075
Reviewed-on: https://review.monogon.dev/c/monogon/+/637
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/node/core/rpc/BUILD.bazel b/metropolis/node/core/rpc/BUILD.bazel
index 9b171e3..e4ff25f 100644
--- a/metropolis/node/core/rpc/BUILD.bazel
+++ b/metropolis/node/core/rpc/BUILD.bazel
@@ -6,6 +6,7 @@
"client.go",
"methodinfo.go",
"peerinfo.go",
+ "resolver.go",
"server.go",
"server_authentication.go",
"testhelpers.go",
@@ -20,10 +21,12 @@
"//metropolis/pkg/pki",
"//metropolis/proto/api",
"//metropolis/proto/ext",
+ "@com_github_cenkalti_backoff_v4//:backoff",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//credentials",
"@org_golang_google_grpc//peer",
+ "@org_golang_google_grpc//resolver",
"@org_golang_google_grpc//status",
"@org_golang_google_protobuf//encoding/prototext",
"@org_golang_google_protobuf//proto",
@@ -35,6 +38,7 @@
go_test(
name = "rpc_test",
srcs = [
+ "resolver_test.go",
"server_authentication_test.go",
"trace_test.go",
],
@@ -43,9 +47,11 @@
"//metropolis/node/core/curator/proto/api",
"//metropolis/pkg/logtree",
"//metropolis/proto/api",
+ "//metropolis/proto/common",
"//metropolis/proto/ext",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes",
+ "@org_golang_google_grpc//credentials",
"@org_golang_google_grpc//status",
"@org_golang_google_grpc//test/bufconn",
],
diff --git a/metropolis/node/core/rpc/resolver.go b/metropolis/node/core/rpc/resolver.go
new file mode 100644
index 0000000..d4e9dff
--- /dev/null
+++ b/metropolis/node/core/rpc/resolver.go
@@ -0,0 +1,352 @@
+package rpc
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/cenkalti/backoff/v4"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/resolver"
+
+ cpb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+)
+
+const (
+ MetropolisControlAddress = "metropolis:///control"
+)
+
+// ClusterResolver is a gRPC resolver Builder that can be passed to
+// grpc.WithResolvers() when dialing a gRPC endpoint.
+//
+// It's responsible for resolving the magic MetropolisControlAddress
+// (metropolis:///control) into all Metropolis nodes running control plane
+// services, i.e. the Curator.
+//
+// To function, the ClusterResolver needs to be provided with at least one node
+// address. Afterwards, it will continuously update an internal list of nodes
+// which can be contacted for access to control plane services, and gRPC
+// clients using this resolver will automatically try the available addresses
+// for each RPC call in a round-robin fashion.
+//
+// The ClusterResolver is designed to be used as a long-running object which
+// multiple gRPC client connections can share. Usually one ClusterResolver
+// instance should be used per application.
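+//
+// A rough usage sketch (node ID, address and creds are placeholders rather
+// than real cluster values):
+//
+//   r := rpc.NewClusterResolver()
+//   defer r.Close()
+//   r.AddNode("some-node-id", "192.0.2.10:1234")
+//   // creds must be TransportCredentials valid for this cluster.
+//   cc, err := grpc.Dial(rpc.MetropolisControlAddress,
+//       grpc.WithTransportCredentials(creds), grpc.WithResolvers(r))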
+type ClusterResolver struct {
+ ctx context.Context
+ ctxC context.CancelFunc
+
+ // logger, if set, will be called with fmt.Sprintf-like arguments containing
+ // debug logs from the running ClusterResolver, subordinate watchers and
+ // updaters.
+ logger func(f string, args ...interface{})
+
+ condCurators *sync.Cond
+ curators map[string]string
+ condTLSConfig *sync.Cond
+ tlsConfig credentials.TransportCredentials
+}
+
+// AddNode provides a given node ID at a given address as an initial (or
+// additional) node for the ClusterResolver to update cluster information
+// from.
+func (b *ClusterResolver) AddNode(name, remote string) {
+ b.condCurators.L.Lock()
+ defer b.condCurators.L.Unlock()
+
+ b.curators[name] = remote
+ b.condCurators.Broadcast()
+}
+
+// NewClusterResolver creates an empty ClusterResolver. It must be populated
+// with initial node information for any gRPC call that uses it to succeed.
+func NewClusterResolver() *ClusterResolver {
+ ctx, ctxC := context.WithCancel(context.Background())
+ b := &ClusterResolver{
+ ctx: ctx,
+ ctxC: ctxC,
+ logger: func(f string, args ...interface{}) {},
+ condCurators: sync.NewCond(&sync.Mutex{}),
+ curators: make(map[string]string),
+ condTLSConfig: sync.NewCond(&sync.Mutex{}),
+ }
+
+ go b.run(b.ctx)
+
+ return b
+}
+
+var (
+ ResolverClosed = errors.New("cluster resolver closed")
+)
+
+// Close the ClusterResolver, cleaning up its background goroutines. The node
+// address resolution process stops, and gRPC connections already established
+// via this ClusterResolver will keep using whatever node addresses were last
+// known. New attempts to dial using this ClusterResolver will fail.
+func (b *ClusterResolver) Close() {
+ b.ctxC()
+}
+
+// run is the main loop of the ClusterResolver. Its job is to wait for a TLS
+// config from a gRPC client and to iterate through available node addresses,
+// starting an updater on one of them. The updater then communicates up-to-date
+// node information back to this goroutine. If an updater cannot run anymore
+// (e.g. because its node stopped working), the main loop restarts and another
+// endpoint is picked.
+func (b *ClusterResolver) run(ctx context.Context) {
+ bo := backoff.NewExponentialBackOff()
+ bo.MaxElapsedTime = 0
+
+ // Helper to update internal node list and notify all gRPC clients of it.
+ updateCurators := func(nodes map[string]string) {
+ b.condCurators.L.Lock()
+ b.curators = nodes
+ b.condCurators.L.Unlock()
+ b.condCurators.Broadcast()
+ }
+
+ // Helper to sleep for a given time, but with possible interruption by the
+ // resolver being stopped.
+ waitTimeout := func(t time.Duration) bool {
+ select {
+ case <-time.After(t):
+ return true
+ case <-ctx.Done():
+ return false
+ }
+ }
+
+ for {
+ b.logger("RESOLVER: waiting for TLS config...")
+ // Wait for a TLS config to be set.
+ b.condTLSConfig.L.Lock()
+ for b.tlsConfig == nil {
+ b.condTLSConfig.Wait()
+ }
+ creds := b.tlsConfig
+ b.condTLSConfig.L.Unlock()
+ b.logger("RESOLVER: have TLS config...")
+
+ // Iterate over endpoints to find a working one, and retrieve cluster-provided
+ // node info from there.
+ endpoints := b.addresses()
+ if len(endpoints) == 0 {
+ w := bo.NextBackOff()
+ b.logger("RESOLVER: no endpoints, waiting %s...", w)
+ if waitTimeout(w) {
+ b.logger("RESOLVER: canceled")
+ return
+ }
+ continue
+ }
+
+ b.logger("RESOLVER: starting endpoint loop with %v...", endpoints)
+ for name, endpoint := range endpoints {
+ upC := make(chan map[string]string)
+ b.logger("RESOLVER: starting updater pointed at %s/%s", name, endpoint)
+
+ // Start updater, which actually connects to the endpoint and provides back the
+ // newest set of nodes via upC.
+ go b.runUpdater(ctx, endpoint, creds, upC)
+
+ // Keep using this updater as long as possible. If it fails, restart the main
+ // loop.
+ failed := false
+ for {
+ var newNodes map[string]string
+ failed = false
+ select {
+ case newNodes = <-upC:
+ if newNodes == nil {
+ // Updater quit.
+ failed = true
+ }
+ case <-ctx.Done():
+ b.logger("RESOLVER: canceled")
+ updateCurators(nil)
+ return
+ }
+
+ if failed {
+ w := bo.NextBackOff()
+ b.logger("RESOLVER: updater failed, waiting %s...", w)
+ if waitTimeout(w) {
+ b.logger("RESOLVER: canceled")
+ return
+ }
+ b.logger("RESOLVER: done waiting")
+ break
+ } else {
+ bo.Reset()
+ updateCurators(newNodes)
+ }
+
+ }
+ // Restart entire ClusterResolver loop on failure.
+ if failed {
+ break
+ }
+ }
+ }
+}
+
+// runUpdater runs the ClusterResolver's updater, a goroutine that connects to
+// the Curator running on a given node and feeds back information about
+// consensus members via updateC. If the endpoint fails (e.g. because the node
+// went down), updateC will be closed.
+func (b *ClusterResolver) runUpdater(ctx context.Context, endpoint string, creds credentials.TransportCredentials, updateC chan map[string]string) {
+ defer close(updateC)
+ cl, err := grpc.Dial(endpoint, grpc.WithTransportCredentials(creds))
+ if err != nil {
+ b.logger("UPDATER: dial failed: %v", err)
+ return
+ }
+ defer cl.Close()
+ cur := cpb.NewCuratorClient(cl)
+ w, err := cur.Watch(ctx, &cpb.WatchRequest{
+ Kind: &cpb.WatchRequest_NodesInCluster_{
+ NodesInCluster: &cpb.WatchRequest_NodesInCluster{},
+ },
+ })
+ if err != nil {
+ b.logger("UPDATER: watch failed: %v", err)
+ return
+ }
+
+ // Maintain a long-term mapping from node ID to node external address, and
+ // populate it from the Curator watch stream above.
+ nodes := make(map[string]string)
+ for {
+ ev, err := w.Recv()
+ if err != nil {
+ b.logger("UPDATER: recv failed: %v", err)
+ return
+ }
+ for _, node := range ev.Nodes {
+ if node.Roles.ConsensusMember == nil {
+ delete(nodes, node.Id)
+ continue
+ }
+ st := node.Status
+ if st == nil || st.ExternalAddress == "" {
+ delete(nodes, node.Id)
+ continue
+ }
+ nodes[node.Id] = st.ExternalAddress
+ }
+ for _, node := range ev.NodeTombstones {
+ delete(nodes, node.NodeId)
+ }
+ b.logger("UPDATER: new nodes: %v", nodes)
+ updateC <- nodes
+ }
+}
+
+// addresses returns the current set of node addresses that the ClusterResolver
+// considers as possible updater candidates.
+func (b *ClusterResolver) addresses() map[string]string {
+ b.condCurators.L.Lock()
+ defer b.condCurators.L.Unlock()
+
+ res := make(map[string]string)
+ for k, v := range b.curators {
+ res[k] = v
+ }
+ return res
+}
+
+// Build is called by gRPC on each Dial call. It spawns a new clientWatcher,
+// whose goroutine receives information about currently available nodes from the
+// parent ClusterResolver and actually updates a given gRPC client connection
+// with information about the current set of nodes.
+func (b *ClusterResolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
+ // We can only connect to the magic MetropolisControlAddress ("metropolis:///control").
+ if target.Scheme != "metropolis" || target.Authority != "" || target.Endpoint != "control" {
+ return nil, fmt.Errorf("invalid target: must be %s, is: %s", MetropolisControlAddress, target.Endpoint)
+ }
+
+ if opts.DialCreds == nil {
+ return nil, fmt.Errorf("can only be used with clients containing TransportCredentials")
+ }
+
+ if b.ctx.Err() != nil {
+ return nil, ResolverClosed
+ }
+
+ b.condTLSConfig.L.Lock()
+ // TODO(q3k): make sure we didn't receive different DialCreds for a different
+ // cluster or something.
+ b.tlsConfig = opts.DialCreds
+ b.condTLSConfig.Broadcast()
+ defer b.condTLSConfig.L.Unlock()
+
+ ctx, ctxC := context.WithCancel(b.ctx)
+ w := &clientWatcher{
+ builder: b,
+ clientConn: cc,
+ ctx: ctx,
+ ctxC: ctxC,
+ }
+ go w.watch()
+ return w, nil
+}
+
+func (b *ClusterResolver) Scheme() string {
+ return "metropolis"
+}
+
+// clientWatcher is a subordinate structure to a given ClusterResolver,
+// updating a gRPC ClientConn with information about current endpoints.
+type clientWatcher struct {
+ builder *ClusterResolver
+ clientConn resolver.ClientConn
+
+ ctx context.Context
+ ctxC context.CancelFunc
+}
+
+func (r *clientWatcher) watch() {
+ // Craft a trivial gRPC service config which forces round-robin behaviour for
+ // RPCs. This makes the gRPC client contact all curators in a round-robin
+ // fashion. Ideally, we would prioritize contacting the leader, but this will do
+ // for now.
+ svcConfig := r.clientConn.ParseServiceConfig(`{ "loadBalancingConfig": [{"round_robin": {}}]}`)
+
+ // Watch for condCurators being updated.
+ r.builder.condCurators.L.Lock()
+ for {
+ if r.ctx.Err() != nil {
+ return
+ }
+
+ nodes := r.builder.curators
+ var addresses []resolver.Address
+ for n, addr := range nodes {
+ addresses = append(addresses, resolver.Address{
+ Addr: addr,
+ ServerName: n,
+ })
+ }
+ r.builder.logger("WATCHER: new addresses: %v", addresses)
+ r.clientConn.UpdateState(resolver.State{
+ Addresses: addresses,
+ ServiceConfig: svcConfig,
+ })
+ r.builder.condCurators.Wait()
+ }
+}
+
+func (r *clientWatcher) ResolveNow(_ resolver.ResolveNowOptions) {
+ // No-op. The clientWatcher's watcher runs as fast as possible.
+}
+
+func (r *clientWatcher) Close() {
+ r.ctxC()
+ // Spuriously interrupt all clientWatchers on this ClusterResolver so that this
+ // clientWatcher gets to notice it should quit. This isn't ideal.
+ r.builder.condCurators.Broadcast()
+}
diff --git a/metropolis/node/core/rpc/resolver_test.go b/metropolis/node/core/rpc/resolver_test.go
new file mode 100644
index 0000000..5acca74
--- /dev/null
+++ b/metropolis/node/core/rpc/resolver_test.go
@@ -0,0 +1,169 @@
+package rpc
+
+import (
+ "context"
+ "crypto/tls"
+ "log"
+ "net"
+ "strings"
+ "testing"
+ "time"
+
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials"
+
+ ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+ apb "source.monogon.dev/metropolis/proto/api"
+ cpb "source.monogon.dev/metropolis/proto/common"
+)
+
+type testImplementationClusterAware struct {
+ ipb.UnimplementedCuratorServer
+ apb.UnimplementedAAAServer
+ apb.UnimplementedManagementServer
+
+ addresses map[string]string
+}
+
+func (t *testImplementationClusterAware) GetClusterInfo(ctx context.Context, req *apb.GetClusterInfoRequest) (*apb.GetClusterInfoResponse, error) {
+ return &apb.GetClusterInfoResponse{}, nil
+}
+
+func (t *testImplementationClusterAware) Watch(_ *ipb.WatchRequest, srv ipb.Curator_WatchServer) error {
+ var nodes []*ipb.Node
+ for name, addr := range t.addresses {
+ nodes = append(nodes, &ipb.Node{
+ Id: name,
+ Roles: &cpb.NodeRoles{ConsensusMember: &cpb.NodeRoles_ConsensusMember{}},
+ Status: &cpb.NodeStatus{
+ ExternalAddress: addr,
+ },
+ })
+ }
+ err := srv.Send(&ipb.WatchEvent{
+ Nodes: nodes,
+ })
+ if err != nil {
+ return err
+ }
+ <-srv.Context().Done()
+ return srv.Context().Err()
+}
+
+// TestResolverSimple exercises the happy path of the ClusterResolver, checking
+// that information about all cluster nodes can be bootstrapped from a single
+// node, and ensuring that nodes are dialed in a round-robin fashion.
+//
+// TODO(q3k): exercise node removal and re-dial of updater to another node.
+func TestResolverSimple(t *testing.T) {
+ ctx, ctxC := context.WithCancel(context.Background())
+ defer ctxC()
+
+ // Make three nodes for testing, each with its own TCP listener.
+ numCurators := 3
+ eph := NewEphemeralClusterCredentials(t, numCurators)
+
+ listeners := make([]net.Listener, numCurators)
+ for i := 0; i < numCurators; i++ {
+ lis, err := net.Listen("tcp", "")
+ if err != nil {
+ t.Fatalf("Listen failed: %v", err)
+ }
+ listeners[i] = lis
+ }
+
+ addresses := make(map[string]string)
+ for i, lis := range listeners {
+ name := eph.Nodes[i].ID()
+ addresses[name] = lis.Addr().String()
+ }
+ impls := make([]*testImplementationClusterAware, numCurators)
+ for i := 0; i < numCurators; i++ {
+ impls[i] = &testImplementationClusterAware{
+ addresses: addresses,
+ }
+ }
+
+ servers := make([]*grpc.Server, numCurators)
+ for i := 0; i < numCurators; i++ {
+ i := i
+ ss := ServerSecurity{
+ NodeCredentials: eph.Nodes[i],
+ }
+ servers[i] = ss.SetupExternalGRPC(nil, impls[i])
+ go func() {
+ if err := servers[i].Serve(listeners[i]); err != nil {
+ // Errorf, not Fatalf: Serve runs outside the test goroutine.
+ t.Errorf("GRPC serve failed: %v", err)
+ }
+ }()
+
+ defer listeners[i].Close()
+ defer servers[i].Stop()
+ }
+
+ r := NewClusterResolver()
+ r.logger = func(f string, args ...interface{}) {
+ log.Printf(f, args...)
+ }
+ defer r.Close()
+
+ creds := credentials.NewTLS(&tls.Config{
+ Certificates: []tls.Certificate{eph.Manager},
+ InsecureSkipVerify: true,
+ VerifyPeerCertificate: verifyClusterCertificate(eph.CA),
+ })
+ cl, err := grpc.Dial("metropolis:///control", grpc.WithTransportCredentials(creds), grpc.WithResolvers(r))
+ if err != nil {
+ t.Fatalf("Could not dial: %v", err)
+ }
+
+ // Add first node to bootstrap node information from.
+ r.AddNode(eph.Nodes[0].ID(), listeners[0].Addr().String())
+
+ mgmt := apb.NewManagementClient(cl)
+ _, err = mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
+ if err != nil {
+ t.Fatalf("Running initial GetClusterInfo failed: %v", err)
+ }
+
+ // Wait until client finds all three nodes.
+ r.condCurators.L.Lock()
+ for len(r.curators) < 3 {
+ r.condCurators.Wait()
+ }
+ curators := r.curators
+ r.condCurators.L.Unlock()
+
+ // Ensure the three nodes are as expected.
+ for i, node := range eph.Nodes {
+ if got, want := curators[node.ID()], listeners[i].Addr().String(); want != got {
+ t.Errorf("Node %s: wanted address %q, got %q", node.ID(), want, got)
+ }
+ }
+
+ // Stop first node, make sure the call now reaches the other servers. This will
+ // happen due to the resolver's round-robin behaviour, not because this node is
+ // dropped from the active set of nodes.
+ servers[0].Stop()
+ listeners[0].Close()
+
+ for i := 0; i < 10; i++ {
+ _, err = mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
+ if err == nil {
+ break
+ }
+ time.Sleep(time.Second)
+ }
+ if err != nil {
+ t.Errorf("Running GetClusterInfo after stopping first node failed: %v", err)
+ }
+
+ // Close the builder; new dials should fail.
+ r.Close()
+ _, err = grpc.Dial(MetropolisControlAddress, grpc.WithTransportCredentials(creds), grpc.WithResolvers(r), grpc.WithBlock())
+ // String comparison required because the underlying gRPC code does not wrap the
+ // error.
+ if want, got := ResolverClosed, err; !strings.Contains(got.Error(), want.Error()) {
+ t.Errorf("Unexpected dial error after closing builder, wanted %q, got %q", want, got)
+ }
+}