m/n/c/rpc: make resolver leader-aware

This is a fairly large change that makes the resolver contact only the
current leader of the control plane, rather than all nodes in a
round-robin fashion.

This resolver isn't yet used by the cluster, the tests, or metroctl -
but that will come in upcoming CLs.

We also move the resolver to a subpackage of rpc, in preparation for
moving it into a package path designed to be depended upon by users.

Change-Id: I230853bbf552fbde947de05f95de37cea93a168c
Reviewed-on: https://review.monogon.dev/c/monogon/+/795
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/node/core/rpc/BUILD.bazel b/metropolis/node/core/rpc/BUILD.bazel
index e4ff25f..c530a65 100644
--- a/metropolis/node/core/rpc/BUILD.bazel
+++ b/metropolis/node/core/rpc/BUILD.bazel
@@ -6,7 +6,6 @@
         "client.go",
         "methodinfo.go",
         "peerinfo.go",
-        "resolver.go",
         "server.go",
         "server_authentication.go",
         "testhelpers.go",
@@ -15,18 +14,15 @@
     importpath = "source.monogon.dev/metropolis/node/core/rpc",
     visibility = ["//visibility:public"],
     deps = [
-        "//metropolis/node/core/curator/proto/api",
         "//metropolis/node/core/identity",
         "//metropolis/pkg/logtree",
         "//metropolis/pkg/pki",
         "//metropolis/proto/api",
         "//metropolis/proto/ext",
-        "@com_github_cenkalti_backoff_v4//:backoff",
         "@org_golang_google_grpc//:go_default_library",
         "@org_golang_google_grpc//codes",
         "@org_golang_google_grpc//credentials",
         "@org_golang_google_grpc//peer",
-        "@org_golang_google_grpc//resolver",
         "@org_golang_google_grpc//status",
         "@org_golang_google_protobuf//encoding/prototext",
         "@org_golang_google_protobuf//proto",
@@ -38,7 +34,6 @@
 go_test(
     name = "rpc_test",
     srcs = [
-        "resolver_test.go",
         "server_authentication_test.go",
         "trace_test.go",
     ],
@@ -47,11 +42,9 @@
         "//metropolis/node/core/curator/proto/api",
         "//metropolis/pkg/logtree",
         "//metropolis/proto/api",
-        "//metropolis/proto/common",
         "//metropolis/proto/ext",
         "@org_golang_google_grpc//:go_default_library",
         "@org_golang_google_grpc//codes",
-        "@org_golang_google_grpc//credentials",
         "@org_golang_google_grpc//status",
         "@org_golang_google_grpc//test/bufconn",
     ],
diff --git a/metropolis/node/core/rpc/resolver.go b/metropolis/node/core/rpc/resolver.go
deleted file mode 100644
index d4e9dff..0000000
--- a/metropolis/node/core/rpc/resolver.go
+++ /dev/null
@@ -1,352 +0,0 @@
-package rpc
-
-import (
-	"context"
-	"errors"
-	"fmt"
-	"sync"
-	"time"
-
-	"github.com/cenkalti/backoff/v4"
-	"google.golang.org/grpc"
-	"google.golang.org/grpc/credentials"
-	"google.golang.org/grpc/resolver"
-
-	cpb "source.monogon.dev/metropolis/node/core/curator/proto/api"
-)
-
-const (
-	MetropolisControlAddress = "metropolis:///control"
-)
-
-// ClusterResolver is a gRPC resolver Builder that can be passed to
-// grpc.WithResolvers() when dialing a gRPC endpoint.
-//
-// It's responsible for resolving the magic MetropolisControlAddress
-// (metropolis:///control) into all Metropolis nodes running control plane
-// services, ie. the Curator.
-//
-// To function, the ClusterResolver needs to be provided with at least one node
-// address. Afterwards, it will continuously update an internal list of nodes
-// which can be contacted for access to control planes services, and gRPC
-// clients using this resolver will automatically try the available addresses
-// for each RPC call in a round-robin fashion.
-//
-// The ClusterResolver is designed to be used as a long-running objects which
-// multiple gRPC client connections can use. Usually one ClusterResolver
-// instance should be used per application.
-type ClusterResolver struct {
-	ctx  context.Context
-	ctxC context.CancelFunc
-
-	// logger, if set, will be called with fmt.Sprintf-like arguments containing
-	// debug logs from the running ClusterResolver, subordinate watchers and
-	// updaters.
-	logger func(f string, args ...interface{})
-
-	condCurators  *sync.Cond
-	curators      map[string]string
-	condTLSConfig *sync.Cond
-	tlsConfig     credentials.TransportCredentials
-}
-
-// AddNode provides a given node ID at a given address as an initial (or
-// additional) node for the ClusterResolver to update cluster information
-// from.
-func (b *ClusterResolver) AddNode(name, remote string) {
-	b.condCurators.L.Lock()
-	defer b.condCurators.L.Unlock()
-
-	b.curators[name] = remote
-	b.condCurators.Broadcast()
-}
-
-// NewClusterResolver creates an empty ClusterResolver. It must be populated
-// with initial node information for any gRPC call that uses it to succeed.
-func NewClusterResolver() *ClusterResolver {
-	ctx, ctxC := context.WithCancel(context.Background())
-	b := &ClusterResolver{
-		ctx:           ctx,
-		ctxC:          ctxC,
-		logger:        func(f string, args ...interface{}) {},
-		condCurators:  sync.NewCond(&sync.Mutex{}),
-		curators:      make(map[string]string),
-		condTLSConfig: sync.NewCond(&sync.Mutex{}),
-	}
-
-	go b.run(b.ctx)
-
-	return b
-}
-
-var (
-	ResolverClosed = errors.New("cluster resolver closed")
-)
-
-// Close the ClusterResolver to clean up background goroutines. The node address
-// resolution process stops and all future connections done via this
-// ClusterResolver will continue to use whatever node addresses were last known.
-// However, new attempts to dial using this ClusterResolver will fail.
-func (b *ClusterResolver) Close() {
-	b.ctxC()
-}
-
-// run is the main loop of the ClusterResolver. Its job is to wait for a TLS
-// config from a gRPC client, and iterate through available node addresses to
-// start an updater on. The updater will then communicate back to this goroutine
-// with up to date node information. In case an updater cannot run anymore (eg.
-// a node stopped working), the main loop of run restarts and another endpoint
-// will be picked.
-func (b *ClusterResolver) run(ctx context.Context) {
-	bo := backoff.NewExponentialBackOff()
-	bo.MaxElapsedTime = 0
-
-	// Helper to update internal node list and notify all gRPC clients of it.
-	updateCurators := func(nodes map[string]string) {
-		b.condCurators.L.Lock()
-		b.curators = nodes
-		b.condCurators.L.Unlock()
-		b.condCurators.Broadcast()
-	}
-
-	// Helper to sleep for a given time, but with possible interruption by the
-	// resolver being stopped.
-	waitTimeout := func(t time.Duration) bool {
-		select {
-		case <-time.After(t):
-			return true
-		case <-ctx.Done():
-			return false
-		}
-	}
-
-	for {
-		b.logger("RESOLVER: waiting for TLS config...")
-		// Wait for a TLS config to be set.
-		b.condTLSConfig.L.Lock()
-		for b.tlsConfig == nil {
-			b.condTLSConfig.Wait()
-		}
-		creds := b.tlsConfig
-		b.condTLSConfig.L.Unlock()
-		b.logger("RESOLVER: have TLS config...")
-
-		// Iterate over endpoints to find a working one, and retrieve cluster-provided
-		// node info from there.
-		endpoints := b.addresses()
-		if len(endpoints) == 0 {
-			w := bo.NextBackOff()
-			b.logger("RESOLVER: no endpoints, waiting %s...", w)
-			if waitTimeout(w) {
-				b.logger("RESOLVER: canceled")
-				return
-			}
-			continue
-		}
-
-		b.logger("RESOLVER: starting endpoint loop with %v...", endpoints)
-		for name, endpoint := range endpoints {
-			upC := make(chan map[string]string)
-			b.logger("RESOLVER: starting updater pointed at %s/%s", name, endpoint)
-
-			// Start updater, which actually connects to the endpoint and provides back the
-			// newest set of nodes via upC.
-			go b.runUpdater(ctx, endpoint, creds, upC)
-
-			// Keep using this updater as long as possible. If it fails, restart the main
-			// loop.
-			failed := false
-			for {
-				var newNodes map[string]string
-				failed := false
-				select {
-				case newNodes = <-upC:
-					if newNodes == nil {
-						// Updater quit.
-						failed = true
-					}
-				case <-ctx.Done():
-					b.logger("RESOLVER: canceled")
-					updateCurators(nil)
-					return
-				}
-
-				if failed {
-					w := bo.NextBackOff()
-					b.logger("RESOLVER: updater failed, waiting %s...", w)
-					if waitTimeout(w) {
-						b.logger("RESOLVER: canceled")
-						return
-					}
-					b.logger("RESOLVER: done waiting")
-					break
-				} else {
-					bo.Reset()
-					updateCurators(newNodes)
-				}
-
-			}
-			// Restart entire ClusterResolver loop on failure.
-			if failed {
-				break
-			}
-		}
-	}
-}
-
-// runUpdaters runs the ClusterResolver's updater, which is a goroutine that
-// connects to a Curator running on a given node and feeds back information
-// about consensus members via updateC. If the endpoints fails (eg. because the
-// node went down), updateC will be closed.
-func (b *ClusterResolver) runUpdater(ctx context.Context, endpoint string, creds credentials.TransportCredentials, updateC chan map[string]string) {
-	defer close(updateC)
-	cl, err := grpc.Dial(endpoint, grpc.WithTransportCredentials(creds))
-	if err != nil {
-		b.logger("UPDATER: dial failed: %v", err)
-		return
-	}
-	defer cl.Close()
-	cur := cpb.NewCuratorClient(cl)
-	w, err := cur.Watch(ctx, &cpb.WatchRequest{
-		Kind: &cpb.WatchRequest_NodesInCluster_{
-			NodesInCluster: &cpb.WatchRequest_NodesInCluster{},
-		},
-	})
-	if err != nil {
-		b.logger("UPDATER: watch failed: %v", err)
-		return
-	}
-
-	// Maintain a long-term set of node ID to node external address, and populate it
-	// from the Curator Watcher above.
-	nodes := make(map[string]string)
-	for {
-		ev, err := w.Recv()
-		if err != nil {
-			b.logger("UPDATER: recv failed: %v", err)
-			return
-		}
-		for _, node := range ev.Nodes {
-			if node.Roles.ConsensusMember == nil {
-				delete(nodes, node.Id)
-				continue
-			}
-			st := node.Status
-			if st == nil || st.ExternalAddress == "" {
-				delete(nodes, node.Id)
-				continue
-			}
-			nodes[node.Id] = st.ExternalAddress
-		}
-		for _, node := range ev.NodeTombstones {
-			delete(nodes, node.NodeId)
-		}
-		b.logger("UPDATER: new nodes: %v", nodes)
-		updateC <- nodes
-	}
-}
-
-// addresses returns the current set of node addresses that the ClusterResolver
-// considers as possible updater candidates.
-func (b *ClusterResolver) addresses() map[string]string {
-	b.condCurators.L.Lock()
-	defer b.condCurators.L.Unlock()
-
-	res := make(map[string]string)
-	for k, v := range b.curators {
-		res[k] = v
-	}
-	return res
-}
-
-// Build is called by gRPC on each Dial call. It spawns a new clientWatcher,
-// whose goroutine receives information about currently available nodes from the
-// parent ClusterResolver and actually updates a given gRPC client connection
-// with information about the current set of nodes.
-func (b *ClusterResolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
-	// We can only connect to "metropolis://control".
-	if target.Scheme != "metropolis" || target.Authority != "" || target.Endpoint != "control" {
-		return nil, fmt.Errorf("invalid target: must be %s, is: %s", MetropolisControlAddress, target.Endpoint)
-	}
-
-	if opts.DialCreds == nil {
-		return nil, fmt.Errorf("can only be used with clients containing TransportCredentials")
-	}
-
-	if b.ctx.Err() != nil {
-		return nil, ResolverClosed
-	}
-
-	b.condTLSConfig.L.Lock()
-	// TODO(q3k): make sure we didn't receive different DialCreds for a different
-	// cluster or something.
-	b.tlsConfig = opts.DialCreds
-	b.condTLSConfig.Broadcast()
-	defer b.condTLSConfig.L.Unlock()
-
-	ctx, ctxC := context.WithCancel(b.ctx)
-	resolver := &clientWatcher{
-		builder:    b,
-		clientConn: cc,
-		ctx:        ctx,
-		ctxC:       ctxC,
-	}
-	go resolver.watch()
-	return resolver, nil
-}
-
-func (b *ClusterResolver) Scheme() string {
-	return "metropolis"
-}
-
-// clientWatcher is a subordinate structure to a given ClusterResolver,
-// updating a gRPC ClientConn with information about current endpoints.
-type clientWatcher struct {
-	builder    *ClusterResolver
-	clientConn resolver.ClientConn
-
-	ctx  context.Context
-	ctxC context.CancelFunc
-}
-
-func (r *clientWatcher) watch() {
-	// Craft a trivial gRPC service config which forces round-robin behaviour for
-	// RPCs. This makes the gRPC client contact all curators in a round-robin
-	// fashion. Ideally, we would prioritize contacting the leader, but this will do
-	// for now.
-	svcConfig := r.clientConn.ParseServiceConfig(`{ "loadBalancingConfig": [{"round_robin": {}}]}`)
-
-	// Watch for condCurators being updated.
-	r.builder.condCurators.L.Lock()
-	for {
-		if r.ctx.Err() != nil {
-			return
-		}
-
-		nodes := r.builder.curators
-		var addresses []resolver.Address
-		for n, addr := range nodes {
-			addresses = append(addresses, resolver.Address{
-				Addr:       addr,
-				ServerName: n,
-			})
-		}
-		r.builder.logger("WATCHER: new addresses: %v", addresses)
-		r.clientConn.UpdateState(resolver.State{
-			Addresses:     addresses,
-			ServiceConfig: svcConfig,
-		})
-		r.builder.condCurators.Wait()
-	}
-}
-
-func (r *clientWatcher) ResolveNow(_ resolver.ResolveNowOptions) {
-	// No-op. The clientWatcher's watcher runs as fast as possible.
-}
-
-func (r *clientWatcher) Close() {
-	r.ctxC()
-	// Spuriously interrupt all clientWatchers on this ClusterResolver so that this
-	// clientWatcher gets to notice it should quit. This isn't ideal.
-	r.builder.condCurators.Broadcast()
-}
diff --git a/metropolis/node/core/rpc/resolver/BUILD.bazel b/metropolis/node/core/rpc/resolver/BUILD.bazel
new file mode 100644
index 0000000..092ac19
--- /dev/null
+++ b/metropolis/node/core/rpc/resolver/BUILD.bazel
@@ -0,0 +1,35 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(
+    name = "resolver",
+    srcs = [
+        "processor.go",
+        "resolver.go",
+        "watcher.go",
+    ],
+    importpath = "source.monogon.dev/metropolis/node/core/rpc/resolver",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//metropolis/node",
+        "//metropolis/node/core/curator/proto/api",
+        "//metropolis/proto/common",
+        "@com_github_cenkalti_backoff_v4//:backoff",
+        "@org_golang_google_grpc//:go_default_library",
+        "@org_golang_google_grpc//resolver",
+    ],
+)
+
+go_test(
+    name = "resolver_test",
+    srcs = ["resolver_test.go"],
+    embed = [":resolver"],
+    deps = [
+        "//metropolis/node/core/curator/proto/api",
+        "//metropolis/node/core/rpc",
+        "//metropolis/proto/api",
+        "//metropolis/proto/common",
+        "@com_github_cenkalti_backoff_v4//:backoff",
+        "@org_golang_google_grpc//:go_default_library",
+        "@org_golang_google_grpc//credentials",
+    ],
+)
diff --git a/metropolis/node/core/rpc/resolver/processor.go b/metropolis/node/core/rpc/resolver/processor.go
new file mode 100644
index 0000000..08f56b5
--- /dev/null
+++ b/metropolis/node/core/rpc/resolver/processor.go
@@ -0,0 +1,281 @@
+package resolver
+
+import (
+	"context"
+	"fmt"
+	"net"
+	"sort"
+
+	"google.golang.org/grpc"
+
+	cpb "source.monogon.dev/metropolis/proto/common"
+)
+
+// request contains all possible requests passed to the processor. Only one
+// field can be set at a time. See the documentation of member structures for
+// more information about the possible requests.
+type request struct {
+	cmg   *requestCuratorMapGet
+	nu    *requestNodesUpdate
+	sa    *requestSeedAdd
+	oa    *requestOverrideAdd
+	lu    *requestLeaderUpdate
+	ds    *requestDialOptionsSet
+	sub   *requestSubscribe
+	unsub *requestUnsubscribe
+}
+
+// requestCuratorMapGet is received from any subsystem which wants to
+// immediately receive the current curatorMap as seen by the processor.
+type requestCuratorMapGet struct {
+	// resC carries the current curatorMap. It must be drained by the caller,
+	// otherwise the processor will get stuck.
+	resC chan *curatorMap
+}
+
+// requestNodesUpdate is received from the curator updater, and carries
+// information about the current curators as seen by the cluster control plane.
+type requestNodesUpdate struct {
+	// nodes is a map from node ID to received status
+	nodes map[string]*cpb.NodeStatus
+}
+
+// requestSeedAdd is received from AddEndpoint calls. It updates the processor's
+// curator map with the given seed.
+type requestSeedAdd struct {
+	endpoint *NodeEndpoint
+}
+
+// requestOverrideAdd is received from AddOverride calls. It updates the
+// processor's curator map with the given override.
+type requestOverrideAdd struct {
+	nodeID   string
+	endpoint *NodeEndpoint
+}
+
+// requestLeaderUpdate is received from the leader watcher whenever a new leader
+// is found from any curator.
+type requestLeaderUpdate struct {
+	nodeID   string
+	endpoint *NodeEndpoint
+}
+
+// requestDialOptionsSet is received from a subordinate watcher when a client
+// connects with the given dial options. The processor will use the first
+// options received this way to establish connectivity to curators.
+type requestDialOptionsSet struct {
+	options []grpc.DialOption
+}
+
+// requestSubscribe is received from subordinate watchers. The processor will
+// then create a subscription channel that will be populated with updates about
+// the current leader.
+type requestSubscribe struct {
+	resC chan *responseSubscribe
+}
+
+type responseSubscribe struct {
+	// id is the ID of the subscription, used to cancel the subscription by the
+	// subscriber.
+	id int64
+	// subC carries updates about the current leader. The subscriber must drain the
+	// updates as fast as possible, otherwise the processor's main loop will block.
+	subC chan *update
+}
+
+type update struct {
+	// node ID of the current leader.
+	nodeID string
+	// endpoint of the current leader.
+	endpoint *NodeEndpoint
+}
+
+// requestUnsubscribe is received from a subordinate watcher to cancel a given
+// subscription.
+type requestUnsubscribe struct {
+	id int64
+}
+
+// run the resolver's processor, which is the main processing loop. It receives
+// updates from users, watchers, the curator updater and the leader updater.
+func (r *Resolver) run(ctx context.Context) error {
+	// Current curator map.
+	curMap := newCuratorMap()
+
+	// Current leader.
+	var leader *requestLeaderUpdate
+
+	// Subscribers.
+	subscribers := make(map[int64]chan *update)
+	subscriberIDNext := int64(0)
+
+	// Whether the curator updater and leader updater have been started. This is
+	// only done once we receive dial options from a watcher.
+	running := false
+
+	for {
+		// Receive a request. Quit if our context gets canceled in the meantime.
+		var req *request
+		select {
+		case <-ctx.Done():
+			// Close all subscription channels, ensuring all the watchers get notified that
+			// the resolver has closed.
+			for _, subC := range subscribers {
+				close(subC)
+			}
+			return ctx.Err()
+		case req = <-r.reqC:
+		}
+
+		// Dispatch request.
+		switch {
+		case req.cmg != nil:
+			// Curator Map Get
+			req.cmg.resC <- curMap.copy()
+		case req.nu != nil:
+			// Nodes Update
+			for nid, status := range req.nu.nodes {
+				// Skip nodes which aren't running the curator right now.
+				if status == nil || status.RunningCurator == nil {
+					continue
+				}
+				addr := net.JoinHostPort(status.ExternalAddress, fmt.Sprintf("%d", status.RunningCurator.Port))
+				if a, ok := curMap.overrides[nid]; ok {
+					addr = a.endpoint
+				}
+
+				curMap.curators[nid] = &NodeEndpoint{
+					endpoint: addr,
+				}
+			}
+			toDelete := make(map[string]bool)
+			for nid, _ := range curMap.curators {
+				if req.nu.nodes[nid] == nil {
+					toDelete[nid] = true
+				}
+			}
+			for nid, _ := range toDelete {
+				delete(curMap.curators, nid)
+			}
+		case req.sa != nil:
+			// Seed Add
+			curMap.seeds[req.sa.endpoint.endpoint] = true
+		case req.oa != nil:
+			// Override Add
+			curMap.overrides[req.oa.nodeID] = req.oa.endpoint
+		case req.lu != nil:
+			// Leader Update
+			leader = req.lu
+			for _, s := range subscribers {
+				s <- &update{
+					nodeID:   leader.nodeID,
+					endpoint: leader.endpoint,
+				}
+			}
+		case req.ds != nil:
+			// Dial options Set
+			if !running {
+				go r.runCuratorUpdater(ctx, req.ds.options)
+				go r.runLeaderUpdater(ctx, req.ds.options)
+			}
+			running = true
+		case req.sub != nil:
+			// Subscribe
+			id := subscriberIDNext
+			subC := make(chan *update)
+			req.sub.resC <- &responseSubscribe{
+				id:   id,
+				subC: subC,
+			}
+			subscriberIDNext++
+			subscribers[id] = subC
+
+			// Provide the current leader to the new subscriber, if one is already known.
+			if leader != nil {
+				subC <- &update{
+					nodeID:   leader.nodeID,
+					endpoint: leader.endpoint,
+				}
+			}
+		case req.unsub != nil:
+			// Unsubscribe
+			if subscribers[req.unsub.id] != nil {
+				close(subscribers[req.unsub.id])
+				delete(subscribers, req.unsub.id)
+			}
+		default:
+			panic(fmt.Sprintf("unhandled request: %+v", req))
+		}
+	}
+}
+
+// curatorMap is the main state of the cluster as seen by the resolver's processor.
+type curatorMap struct {
+	// curators is a map from node ID to endpoint of nodes that are currently
+	// running the curator. This is updated by the processor through the curator
+	// updater.
+	curators map[string]*NodeEndpoint
+	// overrides are user-provided node ID to endpoint overrides. They are applied
+	// to the curators in the curator map (above) and the leader information as
+	// retrieved by the leader updater.
+	overrides map[string]*NodeEndpoint
+	// seeds are endpoints provided by the user that the leader updater will use to
+	// bootstrap initial cluster connectivity.
+	seeds map[string]bool
+}
+
+func newCuratorMap() *curatorMap {
+	return &curatorMap{
+		curators:  make(map[string]*NodeEndpoint),
+		overrides: make(map[string]*NodeEndpoint),
+		seeds:     make(map[string]bool),
+	}
+}
+
+func (m *curatorMap) copy() *curatorMap {
+	res := newCuratorMap()
+	for k, v := range m.curators {
+		res.curators[k] = v
+	}
+	for k, v := range m.overrides {
+		res.overrides[k] = v
+	}
+	for k, v := range m.seeds {
+		res.seeds[k] = v
+	}
+	return res
+}
+
+// candidates returns the curator endpoints that should be used to attempt to
+// retrieve the current leader from. This is a combination of the curators
+// received by the curator updater, and the seeds provided by the user.
+func (m *curatorMap) candidates() []string {
+	resMap := make(map[string]bool)
+	for ep, _ := range m.seeds {
+		resMap[ep] = true
+	}
+	for nid, v := range m.curators {
+		if o, ok := m.overrides[nid]; ok {
+			resMap[o.endpoint] = true
+		} else {
+			resMap[v.endpoint] = true
+		}
+	}
+	var res []string
+	for ep, _ := range resMap {
+		res = append(res, ep)
+	}
+	sort.Strings(res)
+	return res
+}
+
+// curatorMap returns a copy of the current curator map as seen by the resolver
+// processor.
+func (r *Resolver) curatorMap() *curatorMap {
+	req := &request{
+		cmg: &requestCuratorMapGet{
+			resC: make(chan *curatorMap),
+		},
+	}
+	r.reqC <- req
+	return <-req.cmg.resC
+}
diff --git a/metropolis/node/core/rpc/resolver/resolver.go b/metropolis/node/core/rpc/resolver/resolver.go
new file mode 100644
index 0000000..39a0124
--- /dev/null
+++ b/metropolis/node/core/rpc/resolver/resolver.go
@@ -0,0 +1,341 @@
+package resolver
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"net"
+	"regexp"
+	"strings"
+	"time"
+
+	"github.com/cenkalti/backoff/v4"
+	"google.golang.org/grpc"
+
+	common "source.monogon.dev/metropolis/node"
+	apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+	cpb "source.monogon.dev/metropolis/proto/common"
+)
+
+const (
+	// MetropolisControlAddress is the magic address accepted by the Resolver.
+	// Dialing a gRPC channel to this address with the Resolver installed will
+	// direct the channel at the current leader of the Metropolis control plane.
+	MetropolisControlAddress = "metropolis:///control"
+)
+
+// Resolver is a gRPC resolver Builder that can be passed to
+// grpc.WithResolvers() when dialing a gRPC endpoint.
+//
+// It's responsible for resolving the magic MetropolisControlAddress
+// (metropolis:///control) into an address of the node that is currently the
+// leader of the cluster's control plane.
+//
+// To function, the Resolver needs to be provided with at least one
+// control plane node address. It will use these addresses to retrieve the
+// address of the node which is the current leader of the control plane.
+//
+// Then, having established communication with the leader, it will continuously
+// update an internal set of control plane node endpoints (the curator map) that
+// will be contacted in the future about the state of the leadership when the
+// current leader fails over.
+//
+// The resolver will wait for a first gRPC connection established through it to
+// extract the transport credentials used, then use these credentials to call
+// the Curator and CuratorLocal services on control plane nodes to perform its
+// logic.
+//
+// This resolver is designed to be used as a long-running object which multiple
+// gRPC client connections can use. Usually one Resolver instance should
+// be used per application.
+//
+// .------------------------.        .--------------------------------------.
+// | Metropolis Cluster     |        | Resolver                             |
+// :------------------------:        :--------------------------------------:
+// :                        :        :                                      :
+// : .--------------------. :        :   .----------------.                 :
+// : | curator (follower) |<---.---------| Leader Updater |------------.    :
+// : '--------------------' :  |     :   '----------------'            |    :
+// : .--------------------. :  |     :   .------------------------.    |    :
+// : | curator (follower) |<---:     :   | Processor (CuratorMap) |<-.-'-.  :
+// : '--------------------' :  |     :   '------------------------'  |   |  :
+// : .--------------------.<---'     :   .-----------------.         |   |  :
+// : | curator (leader)   |<-------------| Curator Updater |---------'   |  :
+// : '--------------------' :        :   '-----------------'             |  :
+// :                        :        :                                   |  :
+// '------------------------'        :   .----------.                    |  :
+//                                   :   | Watchers |-.                  |  :
+//                                   :   '----------' |------------------'  :
+//                                   :     '-^--------'                     :
+//                                   :       |  ^                           :
+//                                   :       |  |                           :
+//                                        .---------------.
+//                                        | gRPC channels |
+//                                        '---------------'
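+//
+// A minimal usage sketch (ctx, host, port and creds are illustrative and
+// assumed to be provided by the caller):
+//
+//   r := resolver.New(ctx)
+//   r.AddEndpoint(resolver.NodeByHostPort(host, port))
+//   // Resolution only starts once the first channel is dialed through the
+//   // resolver, as that call provides the transport credentials.
+//   cc, err := grpc.Dial(resolver.MetropolisControlAddress,
+//       grpc.WithTransportCredentials(creds), grpc.WithResolvers(r))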
+type Resolver struct {
+	reqC chan *request
+	ctx  context.Context
+
+	// logger, if set, will be called with fmt.Sprintf-like arguments containing
+	// debug logs from the running Resolver, subordinate watchers and
+	// updaters.
+	logger func(f string, args ...interface{})
+}
+
+// SetLogger configures a given function as the logger of the resolver. The
+// function should take a printf-style format string and arguments.
+func (r *Resolver) SetLogger(logger func(f string, args ...interface{})) {
+	r.logger = logger
+}
+
+// New starts a new Resolver, ready to be used as a gRPC resolver via
+// grpc.WithResolvers. However, it needs to be populated with at least one
+// endpoint first (via AddEndpoint).
+func New(ctx context.Context) *Resolver {
+	r := &Resolver{
+		reqC:   make(chan *request),
+		ctx:    ctx,
+		logger: func(string, ...interface{}) {},
+	}
+	go r.run(ctx)
+	return r
+}
+
+// NodeEndpoint is the gRPC endpoint (host+port) of a Metropolis control plane
+// node.
+type NodeEndpoint struct {
+	endpoint string
+}
+
+// NodeWithDefaultPort returns a NodeEndpoint referencing the default control
+// plane port (the Curator port) of a node resolved by its ID over DNS. This is
+// the easiest way to construct a NodeEndpoint provided DNS is fully set up.
+func NodeWithDefaultPort(id string) (*NodeEndpoint, error) {
+	if m, _ := regexp.MatchString(`^metropolis-[a-f0-9]+$`, id); !m {
+		return nil, fmt.Errorf("invalid node ID")
+	}
+	return NodeByHostPort(id, uint16(common.CuratorServicePort)), nil
+}
+
+// NodeByHostPort returns a NodeEndpoint for a fully specified host + port pair.
+// The host can either be a hostname or an IP address.
+func NodeByHostPort(host string, port uint16) *NodeEndpoint {
+	return &NodeEndpoint{
+		endpoint: net.JoinHostPort(host, fmt.Sprintf("%d", port)),
+	}
+}
+
+// nodeAtListener is used in tests to connect to the address of a given listener.
+func nodeAtListener(lis net.Listener) *NodeEndpoint {
+	return &NodeEndpoint{
+		endpoint: lis.Addr().String(),
+	}
+}
+
+// AddEndpoint tells the resolver that it should attempt to reach the cluster
+// through a node available at the given NodeEndpoint.
+//
+// The resolver will use it during its next attempt to find the cluster leader,
+// but the endpoint might later be superseded by the set of curators retrieved
+// from that leader.
+func (r *Resolver) AddEndpoint(endpoint *NodeEndpoint) {
+	select {
+	case <-r.ctx.Done():
+		return
+	case r.reqC <- &request{
+		sa: &requestSeedAdd{
+			endpoint: endpoint,
+		},
+	}:
+	}
+}
+
+// AddOverride adds a long-lived override which forces the resolver to assume
+// that a given node (by ID) is available at the given endpoint, instead of at
+// whatever endpoint is reported by the cluster. This should be used sparingly
+// outside the cluster, and is mostly designed so that nodes which connect to
+// themselves can do so over the loopback address instead of their (possibly
+// changing) external address.
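+//
+// For example (with illustrative values), a node could direct connections to
+// its own ID at the locally running curator:
+//
+//   r.AddOverride(nodeID, resolver.NodeByHostPort("127.0.0.1", curatorPort))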
+func (r *Resolver) AddOverride(id string, ep *NodeEndpoint) {
+	select {
+	case <-r.ctx.Done():
+		return
+	case r.reqC <- &request{
+		oa: &requestOverrideAdd{
+			nodeID:   id,
+			endpoint: ep,
+		},
+	}:
+	}
+}
+
+// runCuratorUpdater runs the curator updater, noted in logs as CURUPDATE. It
+// uses the resolver itself to contact the current leader, retrieve all nodes
+// which are running a curator, and populate the processor's curator list in the
+// curatorMap. That curatorMap will then be used by the leader updater to find
+// the current leader.
+func (r *Resolver) runCuratorUpdater(ctx context.Context, opts []grpc.DialOption) error {
+	bo := backoff.NewExponentialBackOff()
+	bo.MaxElapsedTime = 0
+	bo.MaxInterval = 10 * time.Second
+
+	// Dial through the resolver itself so this client always targets the current
+	// leader. The option is appended once, outside the retried function.
+	opts = append(opts, grpc.WithResolvers(r))
+	return backoff.RetryNotify(func() error {
+		cl, err := grpc.Dial(MetropolisControlAddress, opts...)
+		if err != nil {
+			// This generally shouldn't happen.
+			return fmt.Errorf("could not dial gRPC: %v", err)
+		}
+		defer cl.Close()
+
+		cur := apb.NewCuratorClient(cl)
+		w, err := cur.Watch(ctx, &apb.WatchRequest{
+			Kind: &apb.WatchRequest_NodesInCluster_{
+				NodesInCluster: &apb.WatchRequest_NodesInCluster{},
+			},
+		})
+		if err != nil {
+			return fmt.Errorf("could not watch nodes: %v", err)
+		}
+
+		// Map from node ID to status.
+		nodes := make(map[string]*cpb.NodeStatus)
+
+		// Keep updating map from watcher.
+		for {
+			ev, err := w.Recv()
+			if err != nil {
+				return fmt.Errorf("when receiving node: %w", err)
+			}
+			bo.Reset()
+
+			// Update internal map.
+			for _, n := range ev.Nodes {
+				nodes[n.Id] = n.Status
+			}
+			for _, n := range ev.NodeTombstones {
+				delete(nodes, n.NodeId)
+			}
+
+			// Make a copy, this time only curators.
+			curators := make(map[string]*cpb.NodeStatus)
+			var curatorNames []string
+			for k, v := range nodes {
+				if v == nil || v.RunningCurator == nil {
+					continue
+				}
+				curators[k] = v
+				curatorNames = append(curatorNames, k)
+			}
+			r.logger("CURUPDATE: got new curators: %s", strings.Join(curatorNames, ", "))
+
+			select {
+			case r.reqC <- &request{nu: &requestNodesUpdate{nodes: curators}}:
+			case <-ctx.Done():
+				return ctx.Err()
+			}
+		}
+	}, backoff.WithContext(bo, ctx), func(err error, t time.Duration) {
+		r.logger("CURUPDATE: error in loop: %v", err)
+		r.logger("CURUPDATE: retrying in %s...", t.String())
+	})
+}
+
+// runLeaderUpdater runs the leader updater, noted in logs as FINDLEADER and
+// WATCHLEADER. It uses the curator map from the resolver processor to find the
+// current leader.
+func (r *Resolver) runLeaderUpdater(ctx context.Context, opts []grpc.DialOption) error {
+	bo := backoff.NewExponentialBackOff()
+	bo.MaxElapsedTime = 0
+	bo.MaxInterval = 10 * time.Second
+
+	return backoff.RetryNotify(func() error {
+		curMap := r.curatorMap()
+		for _, endpoint := range curMap.candidates() {
+			ok := r.watchLeaderVia(ctx, endpoint, opts)
+			if ok {
+				bo.Reset()
+			}
+		}
+		return fmt.Errorf("out of endpoints")
+	}, backoff.WithContext(bo, ctx), func(err error, t time.Duration) {
+		r.logger("FINDLEADER: error in loop: %v, retrying in %s...", err, t.String())
+	})
+}
+
+// watchLeaderVia connects to the endpoint defined by 'via' and attempts to
+// continuously update the processor's view of the current leader based on data
+// returned from it. Whenever new information about a leader is available, a
+// leader update request is sent to the processor.
+//
+// A boolean value is returned indicating whether the update was at all
+// successful. This is used by retry logic to figure out whether to wait before
+// retrying or not.
+func (r *Resolver) watchLeaderVia(ctx context.Context, via string, opts []grpc.DialOption) bool {
+	cl, err := grpc.Dial(via, opts...)
+	if err != nil {
+		r.logger("WATCHLEADER: dialing %s failed: %v", via, err)
+		return false
+	}
+	defer cl.Close()
+	cpl := apb.NewCuratorLocalClient(cl)
+
+	cur, err := cpl.GetCurrentLeader(ctx, &apb.GetCurrentLeaderRequest{})
+	if err != nil {
+		r.logger("WATCHLEADER: failed to retrieve current leader from %s: %v", via, err)
+		return false
+	}
+	ok := false
+	for {
+		leaderInfo, err := cur.Recv()
+		if err == io.EOF {
+			r.logger("WATCHLEADER: connection with %s closed", via)
+			return ok
+		}
+		if err != nil {
+			r.logger("WATCHLEADER: connection with %s failed: %v", via, err)
+			return ok
+		}
+
+		curMap := r.curatorMap()
+
+		viaID := leaderInfo.ThisNodeId
+		if viaID == "" {
+			// This shouldn't happen, but let's handle this just in case
+			viaID = fmt.Sprintf("UNKNOWN NODE ID (%s)", via)
+		}
+
+		if leaderInfo.LeaderNodeId == "" {
+			r.logger("WATCHLEADER: %s does not know the leader, trying next", viaID)
+			return false
+		}
+		endpoint := ""
+		if leaderInfo.LeaderHost == "" {
+			// This node knows the leader, but doesn't know its host. Perhaps we have an
+			// override for this?
+			if ep, ok := curMap.overrides[leaderInfo.LeaderNodeId]; ok {
+				endpoint = ep.endpoint
+			}
+		} else {
+			if leaderInfo.LeaderPort == 0 {
+				r.logger("WATCHLEADER: %s knows the leader's host (%s), but not its port", viaID, leaderInfo.LeaderHost)
+				return false
+			}
+			endpoint = net.JoinHostPort(leaderInfo.LeaderHost, fmt.Sprintf("%d", leaderInfo.LeaderPort))
+		}
+
+		r.logger("WATCHLEADER: got new leader: %s (%s) via %s", leaderInfo.LeaderNodeId, endpoint, viaID)
+
+		select {
+		case <-ctx.Done():
+			return ok
+		case r.reqC <- &request{lu: &requestLeaderUpdate{
+			nodeID:   leaderInfo.LeaderNodeId,
+			endpoint: &NodeEndpoint{endpoint: endpoint},
+		}}:
+		}
+
+		ok = true
+	}
+}
diff --git a/metropolis/node/core/rpc/resolver/resolver_test.go b/metropolis/node/core/rpc/resolver/resolver_test.go
new file mode 100644
index 0000000..0de45e1
--- /dev/null
+++ b/metropolis/node/core/rpc/resolver/resolver_test.go
@@ -0,0 +1,244 @@
+package resolver
+
+import (
+	"context"
+	"crypto/tls"
+	"fmt"
+	"log"
+	"net"
+	"strconv"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/cenkalti/backoff/v4"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials"
+
+	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+	"source.monogon.dev/metropolis/node/core/rpc"
+	apb "source.monogon.dev/metropolis/proto/api"
+	cpb "source.monogon.dev/metropolis/proto/common"
+)
+
+// fakeCuratorClusterAware is a fake curator implementation that has a vague
+// concept of a cluster and the current leader within a cluster. Every instance
+// of a fakeCuratorClusterAware is backed by a net.Listener, and is aware of
+// the other implementations' net.Listeners.
+type fakeCuratorClusterAware struct {
+	ipb.UnimplementedCuratorServer
+	apb.UnimplementedAAAServer
+	apb.UnimplementedManagementServer
+
+	// mu guards all the other fields of this struct.
+	mu sync.Mutex
+	// listeners is the collection of all listeners that make up this cluster, keyed
+	// by node ID.
+	listeners map[string]net.Listener
+	// thisNode is the node ID of this fake.
+	thisNode string
+	// leader is the node ID of the leader of the cluster.
+	leader string
+}
+
+// Watch implements a minimum Watch which just returns all nodes at once.
+func (t *fakeCuratorClusterAware) Watch(_ *ipb.WatchRequest, srv ipb.Curator_WatchServer) error {
+	var nodes []*ipb.Node
+	for name, listener := range t.listeners {
+		addr := listener.Addr().String()
+		host, port, _ := net.SplitHostPort(addr)
+		portNum, _ := strconv.ParseUint(port, 10, 16)
+
+		nodes = append(nodes, &ipb.Node{
+			Id:    name,
+			Roles: &cpb.NodeRoles{ConsensusMember: &cpb.NodeRoles_ConsensusMember{}},
+			Status: &cpb.NodeStatus{
+				ExternalAddress: host,
+				RunningCurator: &cpb.NodeStatus_RunningCurator{
+					Port: int32(portNum),
+				},
+			},
+		})
+	}
+	err := srv.Send(&ipb.WatchEvent{
+		Nodes: nodes,
+	})
+	if err != nil {
+		return err
+	}
+	<-srv.Context().Done()
+	return srv.Context().Err()
+}
+
+// GetCurrentLeader returns this fake cluster's current leader, based on
+// thisNode and leader from the struct.
+func (t *fakeCuratorClusterAware) GetCurrentLeader(req *ipb.GetCurrentLeaderRequest, srv ipb.CuratorLocal_GetCurrentLeaderServer) error {
+	ctx := srv.Context()
+
+	t.mu.Lock()
+	leaderName := t.leader
+	thisNode := t.thisNode
+	t.mu.Unlock()
+	leader := t.listeners[leaderName]
+
+	host, port, _ := net.SplitHostPort(leader.Addr().String())
+	portNum, _ := strconv.ParseUint(port, 10, 16)
+	srv.Send(&ipb.GetCurrentLeaderResponse{
+		LeaderNodeId: leaderName,
+		LeaderHost:   host,
+		LeaderPort:   int32(portNum),
+		ThisNodeId:   thisNode,
+	})
+	<-ctx.Done()
+	return ctx.Err()
+}
+
+// TestResolverSimple exercises the happy path of the gRPC ResolverBuilder,
+// checking that a single node can be used to bootstrap the full set of
+// curators, and that the resolver follows control plane leadership as it moves
+// between nodes.
+//
+// TODO(q3k): exercise node removal and re-dial of updater to another node.
+func TestResolverSimple(t *testing.T) {
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+
+	// Make three nodes for testing, each with its own TCP listener.
+	numCurators := 3
+	eph := rpc.NewEphemeralClusterCredentials(t, numCurators)
+
+	listeners := make([]net.Listener, numCurators)
+	for i := 0; i < numCurators; i++ {
+		lis, err := net.Listen("tcp", "")
+		if err != nil {
+			t.Fatalf("Listen failed: %v", err)
+		}
+		listeners[i] = lis
+	}
+
+	// Make fakeCuratorClusterAware implementations for every node.
+	listenerMap := make(map[string]net.Listener)
+	for i, lis := range listeners {
+		log.Printf("Test listener: %s", lis.Addr().String())
+		name := eph.Nodes[i].ID()
+		listenerMap[name] = lis
+	}
+	impls := make([]*fakeCuratorClusterAware, numCurators)
+
+	for i := 0; i < numCurators; i++ {
+		impls[i] = &fakeCuratorClusterAware{
+			listeners: listenerMap,
+			leader:    eph.Nodes[0].ID(),
+			thisNode:  eph.Nodes[i].ID(),
+		}
+	}
+
+	// Make gRPC servers for every node.
+	servers := make([]*grpc.Server, numCurators)
+	for i := 0; i < numCurators; i++ {
+		i := i
+		ss := rpc.ServerSecurity{
+			NodeCredentials: eph.Nodes[i],
+		}
+		servers[i] = grpc.NewServer(ss.GRPCOptions(nil)...)
+		ipb.RegisterCuratorServer(servers[i], impls[i])
+		apb.RegisterAAAServer(servers[i], impls[i])
+		apb.RegisterManagementServer(servers[i], impls[i])
+		ipb.RegisterCuratorLocalServer(servers[i], impls[i])
+		go func() {
+			if err := servers[i].Serve(listeners[i]); err != nil {
+				t.Fatalf("GRPC serve failed: %v", err)
+			}
+		}()
+
+		defer listeners[i].Close()
+		defer servers[i].Stop()
+	}
+
+	// Create our DUT resolver.
+	r := New(ctx)
+	r.logger = func(f string, args ...interface{}) {
+		log.Printf(f, args...)
+	}
+
+	creds := credentials.NewTLS(&tls.Config{
+		Certificates:       []tls.Certificate{eph.Manager},
+		InsecureSkipVerify: true,
+	})
+	cl, err := grpc.Dial("metropolis:///control", grpc.WithTransportCredentials(creds), grpc.WithResolvers(r))
+	if err != nil {
+		t.Fatalf("Could not dial: %v", err)
+	}
+
+	// Test logic follows.
+
+	// Add first node to bootstrap node information from.
+	r.AddEndpoint(nodeAtListener(listeners[0]))
+
+	// The first node should be answering.
+	cpl := ipb.NewCuratorLocalClient(cl)
+	srv, err := cpl.GetCurrentLeader(ctx, &ipb.GetCurrentLeaderRequest{})
+	if err != nil {
+		t.Fatalf("GetCurrentLeader: %v", err)
+	}
+	leader, err := srv.Recv()
+	if err != nil {
+		t.Fatalf("GetCurrentLeader.Recv: %v", err)
+	}
+	if want, got := eph.Nodes[0].ID(), leader.ThisNodeId; want != got {
+		t.Fatalf("Expected node %q to answer (current leader), got answer from %q", want, got)
+	}
+
+	// Wait for all curators to be picked up by the resolver.
+	bo := backoff.NewExponentialBackOff()
+	bo.MaxInterval = time.Second
+	bo.MaxElapsedTime = 10 * time.Second
+	err = backoff.Retry(func() error {
+		req := &request{
+			cmg: &requestCuratorMapGet{
+				resC: make(chan *curatorMap),
+			},
+		}
+		r.reqC <- req
+		cm := <-req.cmg.resC
+		if len(cm.curators) == 3 {
+			return nil
+		}
+		return fmt.Errorf("have %d curators, wanted 3", len(cm.curators))
+	}, bo)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Move leadership to second node, resolver should follow.
+	servers[0].Stop()
+	for _, impl := range impls {
+		impl.mu.Lock()
+	}
+	for _, impl := range impls {
+		impl.leader = eph.Nodes[1].ID()
+	}
+	for _, impl := range impls {
+		impl.mu.Unlock()
+	}
+
+	// Give it a few attempts. This isn't tightly time-bound; we _expect_ the
+	// resolver to move over to the correct leader quickly.
+	for i := 0; i < 3; i += 1 {
+		srv, err = cpl.GetCurrentLeader(ctx, &ipb.GetCurrentLeaderRequest{})
+		if err != nil {
+			time.Sleep(time.Second)
+			continue
+		}
+		leader, err = srv.Recv()
+		if err != nil {
+			continue
+		}
+		if want, got := eph.Nodes[1].ID(), leader.ThisNodeId; want != got {
+			t.Fatalf("Expected node %q to answer (current leader), got answer from %q", want, got)
+		}
+		break
+	}
+	if err != nil {
+		t.Fatalf("GetCurrentLeader after leadership change: %v", err)
+	}
+}
diff --git a/metropolis/node/core/rpc/resolver/watcher.go b/metropolis/node/core/rpc/resolver/watcher.go
new file mode 100644
index 0000000..281fc59
--- /dev/null
+++ b/metropolis/node/core/rpc/resolver/watcher.go
@@ -0,0 +1,122 @@
+package resolver
+
+import (
+	"errors"
+	"fmt"
+
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/resolver"
+)
+
+// clientWatcher is a subordinate structure to a given Resolver, updating a
+// gRPC ClientConn with information about the current leader endpoint.
+type clientWatcher struct {
+	resolver     *Resolver
+	clientConn   resolver.ClientConn
+	subscription *responseSubscribe
+}
+
+var (
+	// ResolverClosed will be returned by the resolver to gRPC machinery whenever a
+	// resolver cannot be used anymore because it was Closed.
+	ResolverClosed = errors.New("cluster resolver closed")
+)
+
+// Build is called by gRPC on each Dial call. It spawns a new clientWatcher,
+// whose goroutine receives information about the current control plane leader
+// from the parent Resolver and updates the given gRPC client connection with
+// that leader's address.
+func (r *Resolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
+	// We can only connect to "metropolis:///control".
+	if target.Scheme != "metropolis" || target.Authority != "" || target.Endpoint != "control" {
+		return nil, fmt.Errorf("invalid target: must be %s, is: %s", MetropolisControlAddress, target.Endpoint)
+	}
+
+	if opts.DialCreds == nil {
+		return nil, fmt.Errorf("can only be used with clients containing TransportCredentials")
+	}
+
+	// Submit the dial options to the resolver's processor, quitting if the resolver
+	// gets canceled in the meantime.
+	options := []grpc.DialOption{
+		grpc.WithTransportCredentials(opts.DialCreds),
+		grpc.WithContextDialer(opts.Dialer),
+	}
+
+	select {
+	case <-r.ctx.Done():
+		return nil, ResolverClosed
+	case r.reqC <- &request{
+		ds: &requestDialOptionsSet{
+			options: options,
+		},
+	}:
+	}
+
+	// Submit a subscription request to the resolver's processor, quitting if the
+	// resolver gets canceled in the meantime.
+
+	req := &request{
+		sub: &requestSubscribe{resC: make(chan *responseSubscribe)},
+	}
+	select {
+	case <-r.ctx.Done():
+		return nil, ResolverClosed
+	case r.reqC <- req:
+	}
+	// This receive is uninterruptible by contract - as it's also uninterruptible on
+	// the processor side.
+	subscription := <-req.sub.resC
+
+	watcher := &clientWatcher{
+		resolver:     r,
+		clientConn:   cc,
+		subscription: subscription,
+	}
+	go watcher.watch()
+
+	return watcher, nil
+}
+
+func (r *Resolver) Scheme() string {
+	return "metropolis"
+}
+
+func (w *clientWatcher) watch() {
+	// Craft a trivial gRPC service config which forces round-robin behaviour. This
+	// doesn't really matter for us, as we only ever submit the single leader as a
+	// connection endpoint.
+	svcConfig := w.clientConn.ParseServiceConfig(`{ "loadBalancingConfig": [{"round_robin": {}}]}`)
+
+	// Watch for leader to be updated.
+	for {
+		update := <-w.subscription.subC
+		if update == nil {
+			// A nil result means the channel is closed, which means this watcher has either
+			// closed or the resolver has been canceled. Abort loop.
+			w.clientConn.ReportError(ResolverClosed)
+			break
+		}
+		w.clientConn.UpdateState(resolver.State{
+			Addresses: []resolver.Address{
+				{
+					Addr:       update.endpoint.endpoint,
+					ServerName: update.nodeID,
+				},
+			},
+			ServiceConfig: svcConfig,
+		})
+	}
+}
+
+func (r *clientWatcher) ResolveNow(_ resolver.ResolveNowOptions) {
+	// No-op. The clientWatcher's watcher runs as fast as possible.
+}
+
+func (r *clientWatcher) Close() {
+	// Don't block if the resolver has already been canceled and its processor is
+	// no longer receiving requests.
+	select {
+	case <-r.resolver.ctx.Done():
+	case r.resolver.reqC <- &request{
+		unsub: &requestUnsubscribe{
+			id: r.subscription.id,
+		},
+	}:
+	}
+}
diff --git a/metropolis/node/core/rpc/resolver_test.go b/metropolis/node/core/rpc/resolver_test.go
deleted file mode 100644
index 8231273..0000000
--- a/metropolis/node/core/rpc/resolver_test.go
+++ /dev/null
@@ -1,172 +0,0 @@
-package rpc
-
-import (
-	"context"
-	"crypto/tls"
-	"log"
-	"net"
-	"strings"
-	"testing"
-	"time"
-
-	"google.golang.org/grpc"
-	"google.golang.org/grpc/credentials"
-
-	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
-	apb "source.monogon.dev/metropolis/proto/api"
-	cpb "source.monogon.dev/metropolis/proto/common"
-)
-
-type testImplementationClusterAware struct {
-	ipb.UnimplementedCuratorServer
-	apb.UnimplementedAAAServer
-	apb.UnimplementedManagementServer
-
-	addresses map[string]string
-}
-
-func (t *testImplementationClusterAware) GetClusterInfo(ctx context.Context, req *apb.GetClusterInfoRequest) (*apb.GetClusterInfoResponse, error) {
-	return &apb.GetClusterInfoResponse{}, nil
-}
-
-func (t *testImplementationClusterAware) Watch(_ *ipb.WatchRequest, srv ipb.Curator_WatchServer) error {
-	var nodes []*ipb.Node
-	for name, addr := range t.addresses {
-		nodes = append(nodes, &ipb.Node{
-			Id:    name,
-			Roles: &cpb.NodeRoles{ConsensusMember: &cpb.NodeRoles_ConsensusMember{}},
-			Status: &cpb.NodeStatus{
-				ExternalAddress: addr,
-			},
-		})
-	}
-	err := srv.Send(&ipb.WatchEvent{
-		Nodes: nodes,
-	})
-	if err != nil {
-		return err
-	}
-	<-srv.Context().Done()
-	return srv.Context().Err()
-}
-
-// TestResolverSimple exercises the happy path of the gRPC ResolverBuilder,
-// checking that a single node can be used to bootstrap multiple nodes from, and
-// ensuring that nodes are being dialed in a round-robin fashion.
-//
-// TODO(q3k): exercise node removal and re-dial of updater to another node.
-func TestResolverSimple(t *testing.T) {
-	ctx, ctxC := context.WithCancel(context.Background())
-	defer ctxC()
-
-	// Make three nodes for testing, each with its own bufconn listener.
-	numCurators := 3
-	eph := NewEphemeralClusterCredentials(t, numCurators)
-
-	listeners := make([]net.Listener, numCurators)
-	for i := 0; i < numCurators; i++ {
-		lis, err := net.Listen("tcp", "")
-		if err != nil {
-			t.Fatalf("Listen failed: %v", err)
-		}
-		listeners[i] = lis
-	}
-
-	addresses := make(map[string]string)
-	for i, lis := range listeners {
-		name := eph.Nodes[i].ID()
-		addresses[name] = lis.Addr().String()
-	}
-	impls := make([]*testImplementationClusterAware, numCurators)
-	for i := 0; i < numCurators; i++ {
-		impls[i] = &testImplementationClusterAware{
-			addresses: addresses,
-		}
-	}
-
-	servers := make([]*grpc.Server, numCurators)
-	for i := 0; i < numCurators; i++ {
-		i := i
-		ss := ServerSecurity{
-			NodeCredentials: eph.Nodes[i],
-		}
-		servers[i] = grpc.NewServer(ss.GRPCOptions(nil)...)
-		ipb.RegisterCuratorServer(servers[i], impls[i])
-		apb.RegisterAAAServer(servers[i], impls[i])
-		apb.RegisterManagementServer(servers[i], impls[i])
-		go func() {
-			if err := servers[i].Serve(listeners[i]); err != nil {
-				t.Fatalf("GRPC serve failed: %v", err)
-			}
-		}()
-
-		defer listeners[i].Close()
-		defer servers[i].Stop()
-	}
-
-	r := NewClusterResolver()
-	r.logger = func(f string, args ...interface{}) {
-		log.Printf(f, args...)
-	}
-	defer r.Close()
-
-	creds := credentials.NewTLS(&tls.Config{
-		Certificates:          []tls.Certificate{eph.Manager},
-		InsecureSkipVerify:    true,
-		VerifyPeerCertificate: verifyClusterCertificate(eph.CA),
-	})
-	cl, err := grpc.Dial("metropolis:///control", grpc.WithTransportCredentials(creds), grpc.WithResolvers(r))
-	if err != nil {
-		t.Fatalf("Could not dial: %v", err)
-	}
-
-	// Add first node to bootstrap node information from.
-	r.AddNode(eph.Nodes[0].ID(), listeners[0].Addr().String())
-
-	mgmt := apb.NewManagementClient(cl)
-	_, err = mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
-	if err != nil {
-		t.Fatalf("Running initial GetClusterInfo failed: %v", err)
-	}
-
-	// Wait until client finds all three nodes.
-	r.condCurators.L.Lock()
-	for len(r.curators) < 3 {
-		r.condCurators.Wait()
-	}
-	curators := r.curators
-	r.condCurators.L.Unlock()
-
-	// Ensure the three nodes as are expected.
-	for i, node := range eph.Nodes {
-		if got, want := curators[node.ID()], listeners[i].Addr().String(); want != got {
-			t.Errorf("Node %s: wanted address %q, got %q", node.ID(), want, got)
-		}
-	}
-
-	// Stop first node, make sure the call now reaches the other servers. This will
-	// happen due to the resolver's round-robin behaviour, not because this node is
-	// dropped from the active set of nodes.
-	servers[0].Stop()
-	listeners[0].Close()
-
-	for i := 0; i < 10; i++ {
-		_, err = mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
-		if err == nil {
-			break
-		}
-		time.Sleep(time.Second)
-	}
-	if err != nil {
-		t.Errorf("Running GetClusterInfo after stopping first node failed: %v", err)
-	}
-
-	// Close the builder, new dials should fail.
-	r.Close()
-	_, err = grpc.Dial(MetropolisControlAddress, grpc.WithTransportCredentials(creds), grpc.WithResolvers(r), grpc.WithBlock())
-	// String comparison required because the underlying gRPC code does not wrap the
-	// error.
-	if want, got := ResolverClosed, err; !strings.Contains(got.Error(), want.Error()) {
-		t.Errorf("Unexpected dial error after closing builder, wanted %q, got %q", want, got)
-	}
-}