m/n/c/rpc: make resolver leader-aware

This is a fairly large change which makes the resolver contact only the
current leader of the control plane, instead of all nodes in a
round-robin fashion.

This resolver isn't yet used by the cluster, the tests, or metroctl;
that will come in upcoming CLs.

We also move the resolver to a subpackage of rpc, in preparation for
moving it into a package path designed to be depended upon by users.
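
Illustrative usage of the new resolver (a rough sketch; ctx, creds, the
address and curatorPort below are placeholders, and real call sites
will only land in the upcoming CLs):

    // One long-lived resolver per application.
    res := resolver.New(ctx)
    // Seed it with at least one known control plane endpoint.
    res.AddEndpoint(resolver.NodeByHostPort("198.51.100.10", curatorPort))
    // Channels dialed through it will follow the current control plane leader.
    conn, err := grpc.Dial(resolver.MetropolisControlAddress,
        grpc.WithTransportCredentials(creds), grpc.WithResolvers(res))
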
Change-Id: I230853bbf552fbde947de05f95de37cea93a168c
Reviewed-on: https://review.monogon.dev/c/monogon/+/795
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/node/core/rpc/BUILD.bazel b/metropolis/node/core/rpc/BUILD.bazel
index e4ff25f..c530a65 100644
--- a/metropolis/node/core/rpc/BUILD.bazel
+++ b/metropolis/node/core/rpc/BUILD.bazel
@@ -6,7 +6,6 @@
"client.go",
"methodinfo.go",
"peerinfo.go",
- "resolver.go",
"server.go",
"server_authentication.go",
"testhelpers.go",
@@ -15,18 +14,15 @@
importpath = "source.monogon.dev/metropolis/node/core/rpc",
visibility = ["//visibility:public"],
deps = [
- "//metropolis/node/core/curator/proto/api",
"//metropolis/node/core/identity",
"//metropolis/pkg/logtree",
"//metropolis/pkg/pki",
"//metropolis/proto/api",
"//metropolis/proto/ext",
- "@com_github_cenkalti_backoff_v4//:backoff",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//credentials",
"@org_golang_google_grpc//peer",
- "@org_golang_google_grpc//resolver",
"@org_golang_google_grpc//status",
"@org_golang_google_protobuf//encoding/prototext",
"@org_golang_google_protobuf//proto",
@@ -38,7 +34,6 @@
go_test(
name = "rpc_test",
srcs = [
- "resolver_test.go",
"server_authentication_test.go",
"trace_test.go",
],
@@ -47,11 +42,9 @@
"//metropolis/node/core/curator/proto/api",
"//metropolis/pkg/logtree",
"//metropolis/proto/api",
- "//metropolis/proto/common",
"//metropolis/proto/ext",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes",
- "@org_golang_google_grpc//credentials",
"@org_golang_google_grpc//status",
"@org_golang_google_grpc//test/bufconn",
],
diff --git a/metropolis/node/core/rpc/resolver.go b/metropolis/node/core/rpc/resolver.go
deleted file mode 100644
index d4e9dff..0000000
--- a/metropolis/node/core/rpc/resolver.go
+++ /dev/null
@@ -1,352 +0,0 @@
-package rpc
-
-import (
- "context"
- "errors"
- "fmt"
- "sync"
- "time"
-
- "github.com/cenkalti/backoff/v4"
- "google.golang.org/grpc"
- "google.golang.org/grpc/credentials"
- "google.golang.org/grpc/resolver"
-
- cpb "source.monogon.dev/metropolis/node/core/curator/proto/api"
-)
-
-const (
- MetropolisControlAddress = "metropolis:///control"
-)
-
-// ClusterResolver is a gRPC resolver Builder that can be passed to
-// grpc.WithResolvers() when dialing a gRPC endpoint.
-//
-// It's responsible for resolving the magic MetropolisControlAddress
-// (metropolis:///control) into all Metropolis nodes running control plane
-// services, ie. the Curator.
-//
-// To function, the ClusterResolver needs to be provided with at least one node
-// address. Afterwards, it will continuously update an internal list of nodes
-// which can be contacted for access to control planes services, and gRPC
-// clients using this resolver will automatically try the available addresses
-// for each RPC call in a round-robin fashion.
-//
-// The ClusterResolver is designed to be used as a long-running objects which
-// multiple gRPC client connections can use. Usually one ClusterResolver
-// instance should be used per application.
-type ClusterResolver struct {
- ctx context.Context
- ctxC context.CancelFunc
-
- // logger, if set, will be called with fmt.Sprintf-like arguments containing
- // debug logs from the running ClusterResolver, subordinate watchers and
- // updaters.
- logger func(f string, args ...interface{})
-
- condCurators *sync.Cond
- curators map[string]string
- condTLSConfig *sync.Cond
- tlsConfig credentials.TransportCredentials
-}
-
-// AddNode provides a given node ID at a given address as an initial (or
-// additional) node for the ClusterResolver to update cluster information
-// from.
-func (b *ClusterResolver) AddNode(name, remote string) {
- b.condCurators.L.Lock()
- defer b.condCurators.L.Unlock()
-
- b.curators[name] = remote
- b.condCurators.Broadcast()
-}
-
-// NewClusterResolver creates an empty ClusterResolver. It must be populated
-// with initial node information for any gRPC call that uses it to succeed.
-func NewClusterResolver() *ClusterResolver {
- ctx, ctxC := context.WithCancel(context.Background())
- b := &ClusterResolver{
- ctx: ctx,
- ctxC: ctxC,
- logger: func(f string, args ...interface{}) {},
- condCurators: sync.NewCond(&sync.Mutex{}),
- curators: make(map[string]string),
- condTLSConfig: sync.NewCond(&sync.Mutex{}),
- }
-
- go b.run(b.ctx)
-
- return b
-}
-
-var (
- ResolverClosed = errors.New("cluster resolver closed")
-)
-
-// Close the ClusterResolver to clean up background goroutines. The node address
-// resolution process stops and all future connections done via this
-// ClusterResolver will continue to use whatever node addresses were last known.
-// However, new attempts to dial using this ClusterResolver will fail.
-func (b *ClusterResolver) Close() {
- b.ctxC()
-}
-
-// run is the main loop of the ClusterResolver. Its job is to wait for a TLS
-// config from a gRPC client, and iterate through available node addresses to
-// start an updater on. The updater will then communicate back to this goroutine
-// with up to date node information. In case an updater cannot run anymore (eg.
-// a node stopped working), the main loop of run restarts and another endpoint
-// will be picked.
-func (b *ClusterResolver) run(ctx context.Context) {
- bo := backoff.NewExponentialBackOff()
- bo.MaxElapsedTime = 0
-
- // Helper to update internal node list and notify all gRPC clients of it.
- updateCurators := func(nodes map[string]string) {
- b.condCurators.L.Lock()
- b.curators = nodes
- b.condCurators.L.Unlock()
- b.condCurators.Broadcast()
- }
-
- // Helper to sleep for a given time, but with possible interruption by the
- // resolver being stopped.
- waitTimeout := func(t time.Duration) bool {
- select {
- case <-time.After(t):
- return true
- case <-ctx.Done():
- return false
- }
- }
-
- for {
- b.logger("RESOLVER: waiting for TLS config...")
- // Wait for a TLS config to be set.
- b.condTLSConfig.L.Lock()
- for b.tlsConfig == nil {
- b.condTLSConfig.Wait()
- }
- creds := b.tlsConfig
- b.condTLSConfig.L.Unlock()
- b.logger("RESOLVER: have TLS config...")
-
- // Iterate over endpoints to find a working one, and retrieve cluster-provided
- // node info from there.
- endpoints := b.addresses()
- if len(endpoints) == 0 {
- w := bo.NextBackOff()
- b.logger("RESOLVER: no endpoints, waiting %s...", w)
- if waitTimeout(w) {
- b.logger("RESOLVER: canceled")
- return
- }
- continue
- }
-
- b.logger("RESOLVER: starting endpoint loop with %v...", endpoints)
- for name, endpoint := range endpoints {
- upC := make(chan map[string]string)
- b.logger("RESOLVER: starting updater pointed at %s/%s", name, endpoint)
-
- // Start updater, which actually connects to the endpoint and provides back the
- // newest set of nodes via upC.
- go b.runUpdater(ctx, endpoint, creds, upC)
-
- // Keep using this updater as long as possible. If it fails, restart the main
- // loop.
- failed := false
- for {
- var newNodes map[string]string
- failed := false
- select {
- case newNodes = <-upC:
- if newNodes == nil {
- // Updater quit.
- failed = true
- }
- case <-ctx.Done():
- b.logger("RESOLVER: canceled")
- updateCurators(nil)
- return
- }
-
- if failed {
- w := bo.NextBackOff()
- b.logger("RESOLVER: updater failed, waiting %s...", w)
- if waitTimeout(w) {
- b.logger("RESOLVER: canceled")
- return
- }
- b.logger("RESOLVER: done waiting")
- break
- } else {
- bo.Reset()
- updateCurators(newNodes)
- }
-
- }
- // Restart entire ClusterResolver loop on failure.
- if failed {
- break
- }
- }
- }
-}
-
-// runUpdaters runs the ClusterResolver's updater, which is a goroutine that
-// connects to a Curator running on a given node and feeds back information
-// about consensus members via updateC. If the endpoints fails (eg. because the
-// node went down), updateC will be closed.
-func (b *ClusterResolver) runUpdater(ctx context.Context, endpoint string, creds credentials.TransportCredentials, updateC chan map[string]string) {
- defer close(updateC)
- cl, err := grpc.Dial(endpoint, grpc.WithTransportCredentials(creds))
- if err != nil {
- b.logger("UPDATER: dial failed: %v", err)
- return
- }
- defer cl.Close()
- cur := cpb.NewCuratorClient(cl)
- w, err := cur.Watch(ctx, &cpb.WatchRequest{
- Kind: &cpb.WatchRequest_NodesInCluster_{
- NodesInCluster: &cpb.WatchRequest_NodesInCluster{},
- },
- })
- if err != nil {
- b.logger("UPDATER: watch failed: %v", err)
- return
- }
-
- // Maintain a long-term set of node ID to node external address, and populate it
- // from the Curator Watcher above.
- nodes := make(map[string]string)
- for {
- ev, err := w.Recv()
- if err != nil {
- b.logger("UPDATER: recv failed: %v", err)
- return
- }
- for _, node := range ev.Nodes {
- if node.Roles.ConsensusMember == nil {
- delete(nodes, node.Id)
- continue
- }
- st := node.Status
- if st == nil || st.ExternalAddress == "" {
- delete(nodes, node.Id)
- continue
- }
- nodes[node.Id] = st.ExternalAddress
- }
- for _, node := range ev.NodeTombstones {
- delete(nodes, node.NodeId)
- }
- b.logger("UPDATER: new nodes: %v", nodes)
- updateC <- nodes
- }
-}
-
-// addresses returns the current set of node addresses that the ClusterResolver
-// considers as possible updater candidates.
-func (b *ClusterResolver) addresses() map[string]string {
- b.condCurators.L.Lock()
- defer b.condCurators.L.Unlock()
-
- res := make(map[string]string)
- for k, v := range b.curators {
- res[k] = v
- }
- return res
-}
-
-// Build is called by gRPC on each Dial call. It spawns a new clientWatcher,
-// whose goroutine receives information about currently available nodes from the
-// parent ClusterResolver and actually updates a given gRPC client connection
-// with information about the current set of nodes.
-func (b *ClusterResolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
- // We can only connect to "metropolis://control".
- if target.Scheme != "metropolis" || target.Authority != "" || target.Endpoint != "control" {
- return nil, fmt.Errorf("invalid target: must be %s, is: %s", MetropolisControlAddress, target.Endpoint)
- }
-
- if opts.DialCreds == nil {
- return nil, fmt.Errorf("can only be used with clients containing TransportCredentials")
- }
-
- if b.ctx.Err() != nil {
- return nil, ResolverClosed
- }
-
- b.condTLSConfig.L.Lock()
- // TODO(q3k): make sure we didn't receive different DialCreds for a different
- // cluster or something.
- b.tlsConfig = opts.DialCreds
- b.condTLSConfig.Broadcast()
- defer b.condTLSConfig.L.Unlock()
-
- ctx, ctxC := context.WithCancel(b.ctx)
- resolver := &clientWatcher{
- builder: b,
- clientConn: cc,
- ctx: ctx,
- ctxC: ctxC,
- }
- go resolver.watch()
- return resolver, nil
-}
-
-func (b *ClusterResolver) Scheme() string {
- return "metropolis"
-}
-
-// clientWatcher is a subordinate structure to a given ClusterResolver,
-// updating a gRPC ClientConn with information about current endpoints.
-type clientWatcher struct {
- builder *ClusterResolver
- clientConn resolver.ClientConn
-
- ctx context.Context
- ctxC context.CancelFunc
-}
-
-func (r *clientWatcher) watch() {
- // Craft a trivial gRPC service config which forces round-robin behaviour for
- // RPCs. This makes the gRPC client contact all curators in a round-robin
- // fashion. Ideally, we would prioritize contacting the leader, but this will do
- // for now.
- svcConfig := r.clientConn.ParseServiceConfig(`{ "loadBalancingConfig": [{"round_robin": {}}]}`)
-
- // Watch for condCurators being updated.
- r.builder.condCurators.L.Lock()
- for {
- if r.ctx.Err() != nil {
- return
- }
-
- nodes := r.builder.curators
- var addresses []resolver.Address
- for n, addr := range nodes {
- addresses = append(addresses, resolver.Address{
- Addr: addr,
- ServerName: n,
- })
- }
- r.builder.logger("WATCHER: new addresses: %v", addresses)
- r.clientConn.UpdateState(resolver.State{
- Addresses: addresses,
- ServiceConfig: svcConfig,
- })
- r.builder.condCurators.Wait()
- }
-}
-
-func (r *clientWatcher) ResolveNow(_ resolver.ResolveNowOptions) {
- // No-op. The clientWatcher's watcher runs as fast as possible.
-}
-
-func (r *clientWatcher) Close() {
- r.ctxC()
- // Spuriously interrupt all clientWatchers on this ClusterResolver so that this
- // clientWatcher gets to notice it should quit. This isn't ideal.
- r.builder.condCurators.Broadcast()
-}
diff --git a/metropolis/node/core/rpc/resolver/BUILD.bazel b/metropolis/node/core/rpc/resolver/BUILD.bazel
new file mode 100644
index 0000000..092ac19
--- /dev/null
+++ b/metropolis/node/core/rpc/resolver/BUILD.bazel
@@ -0,0 +1,35 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(
+ name = "resolver",
+ srcs = [
+ "processor.go",
+ "resolver.go",
+ "watcher.go",
+ ],
+ importpath = "source.monogon.dev/metropolis/node/core/rpc/resolver",
+ visibility = ["//visibility:public"],
+ deps = [
+ "//metropolis/node",
+ "//metropolis/node/core/curator/proto/api",
+ "//metropolis/proto/common",
+ "@com_github_cenkalti_backoff_v4//:backoff",
+ "@org_golang_google_grpc//:go_default_library",
+ "@org_golang_google_grpc//resolver",
+ ],
+)
+
+go_test(
+ name = "resolver_test",
+ srcs = ["resolver_test.go"],
+ embed = [":resolver"],
+ deps = [
+ "//metropolis/node/core/curator/proto/api",
+ "//metropolis/node/core/rpc",
+ "//metropolis/proto/api",
+ "//metropolis/proto/common",
+ "@com_github_cenkalti_backoff_v4//:backoff",
+ "@org_golang_google_grpc//:go_default_library",
+ "@org_golang_google_grpc//credentials",
+ ],
+)
diff --git a/metropolis/node/core/rpc/resolver/processor.go b/metropolis/node/core/rpc/resolver/processor.go
new file mode 100644
index 0000000..08f56b5
--- /dev/null
+++ b/metropolis/node/core/rpc/resolver/processor.go
@@ -0,0 +1,281 @@
+package resolver
+
+import (
+ "context"
+ "fmt"
+ "net"
+ "sort"
+
+ "google.golang.org/grpc"
+
+ cpb "source.monogon.dev/metropolis/proto/common"
+)
+
+// request contains all possible requests passed to the processor. Only one
+// field can be set at a time. See the documentation of member structures for
+// more information about the possible requests.
+type request struct {
+ cmg *requestCuratorMapGet
+ nu *requestNodesUpdate
+ sa *requestSeedAdd
+ oa *requestOverrideAdd
+ lu *requestLeaderUpdate
+ ds *requestDialOptionsSet
+ sub *requestSubscribe
+ unsub *requestUnsubscribe
+}
+
+// requestCuratorMapGet is received from any subsystem which wants to
+// immediately receive the current curatorMap as seen by the processor.
+type requestCuratorMapGet struct {
+ // resC carries the current curatorMap. It must be drained by the caller,
+ // otherwise the processor will get stuck.
+ resC chan *curatorMap
+}
+
+// requestNodesUpdate is received from the curator updater, and carries
+// information about the current curators as seen by the cluster control plane.
+type requestNodesUpdate struct {
+ // nodes is a map from node ID to received status
+ nodes map[string]*cpb.NodeStatus
+}
+
+// requestSeedAdd is received from AddEndpoint calls. It updates the processor's
+// curator map with the given seed.
+type requestSeedAdd struct {
+ endpoint *NodeEndpoint
+}
+
+// requestOverrideAdd is received from AddOverride calls. It updates the
+// processor's curator map with the given override.
+type requestOverrideAdd struct {
+ nodeID string
+ endpoint *NodeEndpoint
+}
+
+// requestLeaderUpdate is received from the leader watcher whenever a new leader
+// is found from any curator.
+type requestLeaderUpdate struct {
+ nodeID string
+ endpoint *NodeEndpoint
+}
+
+// requestDialOptionsSet is received from any subordinate watchers when a client
+// connects with the given dial options. The processor will use the first
+// options received this way to establish connectivity to curators.
+type requestDialOptionsSet struct {
+ options []grpc.DialOption
+}
+
+// requestSubscribe is received from subordinate watchers. The processor will
+// then create a subscription channel that will be populated with updates about
+// the current leader.
+type requestSubscribe struct {
+ resC chan *responseSubscribe
+}
+
+type responseSubscribe struct {
+ // id is the ID of the subscription, used to cancel the subscription by the
+ // subscriber.
+ id int64
+	// subC carries updates about the current leader. The subscriber must drain
+	// the updates as fast as possible, otherwise the processor's main loop will
+	// block.
+ subC chan *update
+}
+
+type update struct {
+ // node ID of the current leader.
+ nodeID string
+ // endpoint of the current leader.
+ endpoint *NodeEndpoint
+}
+
+// requestUnsubscribe is received from a subordinate watcher to cancel a given
+// subscription.
+type requestUnsubscribe struct {
+ id int64
+}
+
+// run the resolver's processor, which is the main processing loop. It receives
+// updates from users, watchers, the curator updater and the leader updater.
+func (r *Resolver) run(ctx context.Context) error {
+ // Current curator map.
+ curMap := newCuratorMap()
+
+ // Current leader.
+ var leader *requestLeaderUpdate
+
+ // Subscribers.
+ subscribers := make(map[int64]chan *update)
+ subscriberIDNext := int64(0)
+
+ // Whether the curator updater and leader updater have been started. This is
+ // only done once we receive dial options from a watcher.
+ running := false
+
+ for {
+ // Receive a request. Quit if our context gets canceled in the meantime.
+ var req *request
+ select {
+ case <-ctx.Done():
+ // Close all subscription channels, ensuring all the watchers get notified that
+ // the resolver has closed.
+ for _, subC := range subscribers {
+ close(subC)
+ }
+ return ctx.Err()
+ case req = <-r.reqC:
+ }
+
+ // Dispatch request.
+ switch {
+ case req.cmg != nil:
+ // Curator Map Get
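+			// Hand out a copy, so the caller can read it without racing against
+			// the processor's own mutations.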
+			req.cmg.resC <- curMap.copy()
+ case req.nu != nil:
+ // Nodes Update
+ for nid, status := range req.nu.nodes {
+ // Skip nodes which aren't running the curator right now.
+ if status == nil || status.RunningCurator == nil {
+ continue
+ }
+ addr := net.JoinHostPort(status.ExternalAddress, fmt.Sprintf("%d", status.RunningCurator.Port))
+ if a, ok := curMap.overrides[nid]; ok {
+ addr = a.endpoint
+ }
+
+ curMap.curators[nid] = &NodeEndpoint{
+ endpoint: addr,
+ }
+ }
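+			// Drop curators which no longer appear in the received node set.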
+ toDelete := make(map[string]bool)
+			for nid := range curMap.curators {
+ if req.nu.nodes[nid] == nil {
+ toDelete[nid] = true
+ }
+ }
+			for nid := range toDelete {
+ delete(curMap.curators, nid)
+ }
+ case req.sa != nil:
+ // Seed Add
+ curMap.seeds[req.sa.endpoint.endpoint] = true
+ case req.oa != nil:
+ // Override Add
+ curMap.overrides[req.oa.nodeID] = req.oa.endpoint
+ case req.lu != nil:
+ // Leader Update
+ leader = req.lu
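+			// Fan the update out to every subscribed watcher.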
+ for _, s := range subscribers {
+ s <- &update{
+ nodeID: leader.nodeID,
+ endpoint: leader.endpoint,
+ }
+ }
+ case req.ds != nil:
+ // Dial options Set
+ if !running {
+ go r.runCuratorUpdater(ctx, req.ds.options)
+ go r.runLeaderUpdater(ctx, req.ds.options)
+ }
+ running = true
+ case req.sub != nil:
+ // Subscribe
+ id := subscriberIDNext
+ subC := make(chan *update)
+ req.sub.resC <- &responseSubscribe{
+ id: id,
+ subC: subC,
+ }
+ subscriberIDNext++
+ subscribers[id] = subC
+
+			// Provide the current leader to the new subscriber, if one is already known.
+ if leader != nil {
+ subC <- &update{
+ nodeID: leader.nodeID,
+ endpoint: leader.endpoint,
+ }
+ }
+ case req.unsub != nil:
+ // Unsubscribe
+ if subscribers[req.unsub.id] != nil {
+ close(subscribers[req.unsub.id])
+ delete(subscribers, req.unsub.id)
+ }
+ default:
+ panic(fmt.Sprintf("unhandled request: %+v", req))
+ }
+ }
+}
+
+// curatorMap is the main state of the cluster as seen by the resolver's processor.
+type curatorMap struct {
+ // curators is a map from node ID to endpoint of nodes that are currently
+ // running the curator. This is updated by the processor through the curator
+ // updater.
+ curators map[string]*NodeEndpoint
+ // overrides are user-provided node ID to endpoint overrides. They are applied
+ // to the curators in the curator map (above) and the leader information as
+ // retrieved by the leader updater.
+ overrides map[string]*NodeEndpoint
+ // seeds are endpoints provided by the user that the leader updater will use to
+ // bootstrap initial cluster connectivity.
+ seeds map[string]bool
+}
+
+func newCuratorMap() *curatorMap {
+ return &curatorMap{
+ curators: make(map[string]*NodeEndpoint),
+ overrides: make(map[string]*NodeEndpoint),
+ seeds: make(map[string]bool),
+ }
+}
+
+func (m *curatorMap) copy() *curatorMap {
+ res := newCuratorMap()
+ for k, v := range m.curators {
+ res.curators[k] = v
+ }
+ for k, v := range m.overrides {
+ res.overrides[k] = v
+ }
+ for k, v := range m.seeds {
+ res.seeds[k] = v
+ }
+ return res
+}
+
+// candidates returns the curator endpoints that should be used to attempt to
+// retrieve the current leader from. This is a combination of the curators
+// received by the curator updater, and the seeds provided by the user.
+func (m *curatorMap) candidates() []string {
+ resMap := make(map[string]bool)
+	for ep := range m.seeds {
+ resMap[ep] = true
+ }
+ for nid, v := range m.curators {
+ if o, ok := m.overrides[nid]; ok {
+ resMap[o.endpoint] = true
+ } else {
+ resMap[v.endpoint] = true
+ }
+ }
+ var res []string
+	for ep := range resMap {
+ res = append(res, ep)
+ }
+ sort.Strings(res)
+ return res
+}
+
+// curatorMap returns the current curator map as seen by the resolver processor.
+func (r *Resolver) curatorMap() *curatorMap {
+ req := &request{
+ cmg: &requestCuratorMapGet{
+ resC: make(chan *curatorMap),
+ },
+ }
+ r.reqC <- req
+ return <-req.cmg.resC
+}
diff --git a/metropolis/node/core/rpc/resolver/resolver.go b/metropolis/node/core/rpc/resolver/resolver.go
new file mode 100644
index 0000000..39a0124
--- /dev/null
+++ b/metropolis/node/core/rpc/resolver/resolver.go
@@ -0,0 +1,341 @@
+package resolver
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "net"
+ "regexp"
+ "strings"
+ "time"
+
+ "github.com/cenkalti/backoff/v4"
+ "google.golang.org/grpc"
+
+ common "source.monogon.dev/metropolis/node"
+ apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+ cpb "source.monogon.dev/metropolis/proto/common"
+)
+
+const (
+ // MetropolisControlAddress is the address of the current Metropolis leader as
+ // accepted by the Resolver. Dialing a gRPC channel to this address while the
+ // Resolver is used will open the channel to the current leader of the
+ // Metropolis control plane.
+ MetropolisControlAddress = "metropolis:///control"
+)
+
+// Resolver is a gRPC resolver Builder that can be passed to
+// grpc.WithResolvers() when dialing a gRPC endpoint.
+//
+// It's responsible for resolving the magic MetropolisControlAddress
+// (metropolis:///control) into an address of the node that is currently the
+// leader of the cluster's control plane.
+//
+// To function, the Resolver needs to be provided with at least one
+// control plane node address. It will use these addresses to retrieve the
+// address of the node which is the current leader of the control plane.
+//
+// Then, having established communication with the leader, it will continuously
+// update an internal set of control plane node endpoints (the curator map) that
+// will be contacted in the future about the state of the leadership when the
+// current leader fails over.
+//
+// The resolver will wait for a first gRPC connection established through it to
+// extract the transport credentials used, then use these credentials to call
+// the Curator and CuratorLocal services on control plane nodes to perform its
+// logic.
+//
+// This resolver is designed to be used as a long-running object which multiple
+// gRPC client connections can use. Usually one Resolver instance should be
+// used per application.
+//
+// .------------------------. .--------------------------------------.
+// | Metropolis Cluster | | Resolver |
+// :------------------------: :--------------------------------------:
+// : : : :
+// : .--------------------. : : .----------------. :
+// : | curator (follower) |<---.---------| Leader Updater |------------. :
+// : '--------------------' : | : '----------------' | :
+// : .--------------------. : | : .------------------------. | :
+// : | curator (follower) |<---: : | Processor (CuratorMap) |<-.-'-. :
+// : '--------------------' : | : '------------------------' | | :
+// : .--------------------.<---' : .-----------------. | | :
+// : | curator (leader) |<-------------| Curator Updater |---------' | :
+// : '--------------------' : : '-----------------' | :
+// : : : | :
+// '------------------------' : .----------. | :
+// : | Watchers |-. | :
+// : '----------' |------------------' :
+// : '-^--------' :
+// : | ^ :
+// : | | :
+// .---------------.
+// | gRPC channels |
+// '---------------'
+type Resolver struct {
+ reqC chan *request
+ ctx context.Context
+
+ // logger, if set, will be called with fmt.Sprintf-like arguments containing
+	// debug logs from the running Resolver, subordinate watchers and
+ // updaters.
+ logger func(f string, args ...interface{})
+}
+
+// SetLogger configures a given function as the logger of the resolver. The
+// function should take a printf-style format string and arguments.
+func (r *Resolver) SetLogger(logger func(f string, args ...interface{})) {
+ r.logger = logger
+}
+
+// New starts a new Resolver, ready to be used as a gRPC resolver via
+// grpc.WithResolvers. However, it needs to be populated with at least one
+// endpoint first (via AddEndpoint).
+func New(ctx context.Context) *Resolver {
+ r := &Resolver{
+ reqC: make(chan *request),
+ ctx: ctx,
+ logger: func(string, ...interface{}) {},
+ }
+ go r.run(ctx)
+ return r
+}
+
+// NodeEndpoint is the gRPC endpoint (host+port) of a Metropolis control plane
+// node.
+type NodeEndpoint struct {
+ endpoint string
+}
+
+// NodeWithDefaultPort returns a NodeEndpoint referencing the default control
+// plane port (the Curator port) of a node resolved by its ID over DNS. This is
+// the easiest way to construct a NodeEndpoint provided DNS is fully set up.
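+//
+// For example (a sketch; the node ID below is made up):
+//
+//	ep, err := NodeWithDefaultPort("metropolis-0123456789abcdef")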
+func NodeWithDefaultPort(id string) (*NodeEndpoint, error) {
+	if m, _ := regexp.MatchString(`^metropolis-[a-f0-9]+$`, id); !m {
+ return nil, fmt.Errorf("invalid node ID")
+ }
+ return NodeByHostPort(id, uint16(common.CuratorServicePort)), nil
+}
+
+// NodeByHostPort returns a NodeEndpoint for a fully specified host + port pair.
+// The host can either be a hostname or an IP address.
+func NodeByHostPort(host string, port uint16) *NodeEndpoint {
+ return &NodeEndpoint{
+ endpoint: net.JoinHostPort(host, fmt.Sprintf("%d", port)),
+ }
+}
+
+// nodeAtListener is used in tests to connect to the address of a given listener.
+func nodeAtListener(lis net.Listener) *NodeEndpoint {
+ return &NodeEndpoint{
+ endpoint: lis.Addr().String(),
+ }
+}
+
+// AddEndpoint tells the resolver that it should attempt to reach the cluster
+// through a node available at the given NodeEndpoint.
+//
+// The resolver will make use of this endpoint the next time it attempts to
+// find the cluster leader, but it might later be superseded once the resolver
+// retrieves the newest set of curators from that leader.
+func (r *Resolver) AddEndpoint(endpoint *NodeEndpoint) {
+ select {
+ case <-r.ctx.Done():
+ return
+ case r.reqC <- &request{
+ sa: &requestSeedAdd{
+ endpoint: endpoint,
+ },
+ }:
+ }
+}
+
+// AddOverride adds a long-lived override which forces the resolver to assume
+// that a given node (by ID) is available at the given endpoint, instead of at
+// whatever endpoint is reported by the cluster. This should be used sparingly
+// outside the cluster, and is mostly designed so that nodes which connect to
+// themselves can do so over the loopback address instead of their (possibly
+// changing) external address.
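+//
+// For example, a node might register its own curator endpoint roughly like
+// this (an illustrative sketch; nodeID and curatorPort are placeholders):
+//
+//	r.AddOverride(nodeID, NodeByHostPort("127.0.0.1", curatorPort))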
+func (r *Resolver) AddOverride(id string, ep *NodeEndpoint) {
+ select {
+ case <-r.ctx.Done():
+ return
+ case r.reqC <- &request{
+ oa: &requestOverrideAdd{
+ nodeID: id,
+ endpoint: ep,
+ },
+ }:
+ }
+}
+
+// runCuratorUpdater runs the curator updater, noted in logs as CURUPDATE. It
+// uses the resolver itself to contact the current leader, retrieve all nodes
+// which are running a curator, and populate the processor's curator list in the
+// curatorMap. That curatorMap will then be used by the leader updater to find
+// the current leader.
+func (r *Resolver) runCuratorUpdater(ctx context.Context, opts []grpc.DialOption) error {
+ bo := backoff.NewExponentialBackOff()
+ bo.MaxElapsedTime = 0
+ bo.MaxInterval = 10 * time.Second
+
+ return backoff.RetryNotify(func() error {
+ opts = append(opts, grpc.WithResolvers(r))
+ cl, err := grpc.Dial(MetropolisControlAddress, opts...)
+ if err != nil {
+ // This generally shouldn't happen.
+ return fmt.Errorf("could not dial gRPC: %v", err)
+ }
+ defer cl.Close()
+
+ cur := apb.NewCuratorClient(cl)
+ w, err := cur.Watch(ctx, &apb.WatchRequest{
+ Kind: &apb.WatchRequest_NodesInCluster_{
+ NodesInCluster: &apb.WatchRequest_NodesInCluster{},
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("could not watch nodes: %v", err)
+ }
+
+ // Map from node ID to status.
+ nodes := make(map[string]*cpb.NodeStatus)
+
+ // Keep updating map from watcher.
+ for {
+ ev, err := w.Recv()
+ if err != nil {
+ return fmt.Errorf("when receiving node: %w", err)
+ }
+ bo.Reset()
+
+ // Update internal map.
+ for _, n := range ev.Nodes {
+ nodes[n.Id] = n.Status
+ }
+ for _, n := range ev.NodeTombstones {
+ delete(nodes, n.NodeId)
+ }
+
+ // Make a copy, this time only curators.
+ curators := make(map[string]*cpb.NodeStatus)
+ var curatorNames []string
+ for k, v := range nodes {
+ if v == nil || v.RunningCurator == nil {
+ continue
+ }
+ curators[k] = v
+ curatorNames = append(curatorNames, k)
+ }
+ r.logger("CURUPDATE: got new curators: %s", strings.Join(curatorNames, ", "))
+
+ select {
+ case r.reqC <- &request{nu: &requestNodesUpdate{nodes: curators}}:
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+ }
+ }, backoff.WithContext(bo, ctx), func(err error, t time.Duration) {
+ r.logger("CURUPDATE: error in loop: %v", err)
+ r.logger("CURUPDATE: retrying in %s...", t.String())
+ })
+}
+
+// runLeaderUpdater runs the leader updater, noted in logs as FINDLEADER and
+// WATCHLEADER. It uses the curator map from the resolver processor to find the
+// current leader.
+func (r *Resolver) runLeaderUpdater(ctx context.Context, opts []grpc.DialOption) error {
+ bo := backoff.NewExponentialBackOff()
+ bo.MaxElapsedTime = 0
+ bo.MaxInterval = 10 * time.Second
+
+ return backoff.RetryNotify(func() error {
+ curMap := r.curatorMap()
+ for _, endpoint := range curMap.candidates() {
+ ok := r.watchLeaderVia(ctx, endpoint, opts)
+ if ok {
+ bo.Reset()
+ }
+ }
+ return fmt.Errorf("out of endpoints")
+ }, backoff.WithContext(bo, ctx), func(err error, t time.Duration) {
+ r.logger("FINDLEADER: error in loop: %v, retrying in %s...", err, t.String())
+ })
+}
+
+// watchLeaderVia connects to the endpoint defined by 'via' and attempts to
+// continuously update the current leader based on data returned from it.
+// Whenever new information about a leader is available, it is submitted to the
+// resolver's processor as a leader update.
+//
+// A boolean value is returned indicating whether the update was at all
+// successful. This is used by retry logic to figure out whether to wait before
+// retrying or not.
+func (r *Resolver) watchLeaderVia(ctx context.Context, via string, opts []grpc.DialOption) bool {
+ cl, err := grpc.Dial(via, opts...)
+ if err != nil {
+ r.logger("WATCHLEADER: dialing %s failed: %v", via, err)
+ return false
+ }
+ defer cl.Close()
+ cpl := apb.NewCuratorLocalClient(cl)
+
+ cur, err := cpl.GetCurrentLeader(ctx, &apb.GetCurrentLeaderRequest{})
+ if err != nil {
+ r.logger("WATCHLEADER: failed to retrieve current leader from %s: %v", via, err)
+ return false
+ }
+ ok := false
+ for {
+ leaderInfo, err := cur.Recv()
+ if err == io.EOF {
+ r.logger("WATCHLEADER: connection with %s closed", via)
+ return ok
+ }
+ if err != nil {
+ r.logger("WATCHLEADER: connection with %s failed: %v", via, err)
+ return ok
+ }
+
+ curMap := r.curatorMap()
+
+ viaID := leaderInfo.ThisNodeId
+ if viaID == "" {
+			// This shouldn't happen, but let's handle it just in case.
+ viaID = fmt.Sprintf("UNKNOWN NODE ID (%s)", via)
+ }
+
+ if leaderInfo.LeaderNodeId == "" {
+ r.logger("WATCHLEADER: %s does not know the leader, trying next", viaID)
+ return false
+ }
+ endpoint := ""
+ if leaderInfo.LeaderHost == "" {
+ // This node knows the leader, but doesn't know its host. Perhaps we have an
+ // override for this?
+ if ep, ok := curMap.overrides[leaderInfo.LeaderNodeId]; ok {
+ endpoint = ep.endpoint
+ }
+ } else {
+ if leaderInfo.LeaderPort == 0 {
+			r.logger("WATCHLEADER: %s knows the leader's host (%s), but not its port", viaID, leaderInfo.LeaderHost)
+ return false
+ }
+ endpoint = net.JoinHostPort(leaderInfo.LeaderHost, fmt.Sprintf("%d", leaderInfo.LeaderPort))
+ }
+
+ r.logger("WATCHLEADER: got new leader: %s (%s) via %s", leaderInfo.LeaderNodeId, endpoint, viaID)
+
+ select {
+ case <-ctx.Done():
+ return ok
+ case r.reqC <- &request{lu: &requestLeaderUpdate{
+ nodeID: leaderInfo.LeaderNodeId,
+ endpoint: &NodeEndpoint{endpoint: endpoint},
+ }}:
+ }
+
+ ok = true
+ }
+}
diff --git a/metropolis/node/core/rpc/resolver/resolver_test.go b/metropolis/node/core/rpc/resolver/resolver_test.go
new file mode 100644
index 0000000..0de45e1
--- /dev/null
+++ b/metropolis/node/core/rpc/resolver/resolver_test.go
@@ -0,0 +1,244 @@
+package resolver
+
+import (
+ "context"
+ "crypto/tls"
+ "fmt"
+ "log"
+ "net"
+ "strconv"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/cenkalti/backoff/v4"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials"
+
+ ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+ "source.monogon.dev/metropolis/node/core/rpc"
+ apb "source.monogon.dev/metropolis/proto/api"
+ cpb "source.monogon.dev/metropolis/proto/common"
+)
+
+// fakeCuratorClusterAware is a fake curator implementation that has a vague
+// concept of a cluster and the current leader within a cluster. Every instance
+// of a fakeCuratorClusterAware is backed by a net.Listener, and is aware of
+// the other implementations' net.Listeners.
+type fakeCuratorClusterAware struct {
+ ipb.UnimplementedCuratorServer
+ apb.UnimplementedAAAServer
+ apb.UnimplementedManagementServer
+
+ // mu guards all the other fields of this struct.
+ mu sync.Mutex
+ // listeners is the collection of all listeners that make up this cluster, keyed
+ // by node ID.
+ listeners map[string]net.Listener
+ // thisNode is the node ID of this fake.
+ thisNode string
+ // leader is the node ID of the leader of the cluster.
+ leader string
+}
+
+// Watch implements a minimal Watch which just returns all nodes at once.
+func (t *fakeCuratorClusterAware) Watch(_ *ipb.WatchRequest, srv ipb.Curator_WatchServer) error {
+ var nodes []*ipb.Node
+ for name, listener := range t.listeners {
+ addr := listener.Addr().String()
+ host, port, _ := net.SplitHostPort(addr)
+ portNum, _ := strconv.ParseUint(port, 10, 16)
+
+ nodes = append(nodes, &ipb.Node{
+ Id: name,
+ Roles: &cpb.NodeRoles{ConsensusMember: &cpb.NodeRoles_ConsensusMember{}},
+ Status: &cpb.NodeStatus{
+ ExternalAddress: host,
+ RunningCurator: &cpb.NodeStatus_RunningCurator{
+ Port: int32(portNum),
+ },
+ },
+ })
+ }
+ err := srv.Send(&ipb.WatchEvent{
+ Nodes: nodes,
+ })
+ if err != nil {
+ return err
+ }
+ <-srv.Context().Done()
+ return srv.Context().Err()
+}
+
+// GetCurrentLeader returns this fake cluster's current leader, based on
+// thisNode and leader from the struct.
+func (t *fakeCuratorClusterAware) GetCurrentLeader(req *ipb.GetCurrentLeaderRequest, srv ipb.CuratorLocal_GetCurrentLeaderServer) error {
+ ctx := srv.Context()
+
+ t.mu.Lock()
+ leaderName := t.leader
+ thisNode := t.thisNode
+ t.mu.Unlock()
+ leader := t.listeners[leaderName]
+
+ host, port, _ := net.SplitHostPort(leader.Addr().String())
+ portNum, _ := strconv.ParseUint(port, 10, 16)
+ srv.Send(&ipb.GetCurrentLeaderResponse{
+ LeaderNodeId: leaderName,
+ LeaderHost: host,
+ LeaderPort: int32(portNum),
+ ThisNodeId: thisNode,
+ })
+ <-ctx.Done()
+ return ctx.Err()
+}
+
+// TestResolverSimple exercises the happy path of the Resolver, checking that a
+// single seed node can be used to bootstrap the resolver's curator map, and
+// that the resolver follows the control plane leader as leadership moves
+// between nodes.
+//
+// TODO(q3k): exercise node removal and re-dial of updater to another node.
+func TestResolverSimple(t *testing.T) {
+ ctx, ctxC := context.WithCancel(context.Background())
+ defer ctxC()
+
+	// Make three nodes for testing, each with its own TCP listener.
+ numCurators := 3
+ eph := rpc.NewEphemeralClusterCredentials(t, numCurators)
+
+ listeners := make([]net.Listener, numCurators)
+ for i := 0; i < numCurators; i++ {
+ lis, err := net.Listen("tcp", "")
+ if err != nil {
+ t.Fatalf("Listen failed: %v", err)
+ }
+ listeners[i] = lis
+ }
+
+ // Make fakeCuratorClusterAware implementations for every node.
+ listenerMap := make(map[string]net.Listener)
+ for i, lis := range listeners {
+ log.Printf("Test listener: %s", lis.Addr().String())
+ name := eph.Nodes[i].ID()
+ listenerMap[name] = lis
+ }
+ impls := make([]*fakeCuratorClusterAware, numCurators)
+
+ for i := 0; i < numCurators; i++ {
+ impls[i] = &fakeCuratorClusterAware{
+ listeners: listenerMap,
+ leader: eph.Nodes[0].ID(),
+ thisNode: eph.Nodes[i].ID(),
+ }
+ }
+
+ // Make gRPC servers for every node.
+ servers := make([]*grpc.Server, numCurators)
+ for i := 0; i < numCurators; i++ {
+ i := i
+ ss := rpc.ServerSecurity{
+ NodeCredentials: eph.Nodes[i],
+ }
+ servers[i] = grpc.NewServer(ss.GRPCOptions(nil)...)
+ ipb.RegisterCuratorServer(servers[i], impls[i])
+ apb.RegisterAAAServer(servers[i], impls[i])
+ apb.RegisterManagementServer(servers[i], impls[i])
+ ipb.RegisterCuratorLocalServer(servers[i], impls[i])
+ go func() {
+ if err := servers[i].Serve(listeners[i]); err != nil {
+ t.Fatalf("GRPC serve failed: %v", err)
+ }
+ }()
+
+ defer listeners[i].Close()
+ defer servers[i].Stop()
+ }
+
+ // Create our DUT resolver.
+ r := New(ctx)
+ r.logger = func(f string, args ...interface{}) {
+ log.Printf(f, args...)
+ }
+
+ creds := credentials.NewTLS(&tls.Config{
+ Certificates: []tls.Certificate{eph.Manager},
+ InsecureSkipVerify: true,
+ })
+ cl, err := grpc.Dial("metropolis:///control", grpc.WithTransportCredentials(creds), grpc.WithResolvers(r))
+ if err != nil {
+ t.Fatalf("Could not dial: %v", err)
+ }
+
+ // Test logic follows.
+
+ // Add first node to bootstrap node information from.
+ r.AddEndpoint(nodeAtListener(listeners[0]))
+
+ // The first node should be answering.
+ cpl := ipb.NewCuratorLocalClient(cl)
+ srv, err := cpl.GetCurrentLeader(ctx, &ipb.GetCurrentLeaderRequest{})
+ if err != nil {
+ t.Fatalf("GetCurrentLeader: %v", err)
+ }
+ leader, err := srv.Recv()
+ if err != nil {
+ t.Fatalf("GetCurrentLeader.Recv: %v", err)
+ }
+ if want, got := eph.Nodes[0].ID(), leader.ThisNodeId; want != got {
+ t.Fatalf("Expected node %q to answer (current leader), got answer from %q", want, got)
+ }
+
+ // Wait for all curators to be picked up by the resolver.
+ bo := backoff.NewExponentialBackOff()
+ bo.MaxInterval = time.Second
+ bo.MaxElapsedTime = 10 * time.Second
+ err = backoff.Retry(func() error {
+ req := &request{
+ cmg: &requestCuratorMapGet{
+ resC: make(chan *curatorMap),
+ },
+ }
+ r.reqC <- req
+ cm := <-req.cmg.resC
+ if len(cm.curators) == 3 {
+ return nil
+ }
+		return fmt.Errorf("have %d curators, wanted 3", len(cm.curators))
+ }, bo)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // Move leadership to second node, resolver should follow.
+ servers[0].Stop()
+ for _, impl := range impls {
+ impl.mu.Lock()
+ }
+ for _, impl := range impls {
+ impl.leader = eph.Nodes[1].ID()
+ }
+ for _, impl := range impls {
+ impl.mu.Unlock()
+ }
+
+	// Give it a few attempts. This isn't strictly time bound; we _expect_ the
+	// resolver to move over to the correct leader quickly.
+	for i := 0; i < 3; i++ {
+ srv, err = cpl.GetCurrentLeader(ctx, &ipb.GetCurrentLeaderRequest{})
+ if err != nil {
+ time.Sleep(time.Second)
+ continue
+ }
+ leader, err = srv.Recv()
+ if err != nil {
+ continue
+ }
+ if want, got := eph.Nodes[1].ID(), leader.ThisNodeId; want != got {
+ t.Fatalf("Expected node %q to answer (current leader), got answer from %q", want, got)
+ }
+ break
+ }
+ if err != nil {
+ t.Fatalf("GetCurrentLeader after leadership change: %v", err)
+ }
+}
diff --git a/metropolis/node/core/rpc/resolver/watcher.go b/metropolis/node/core/rpc/resolver/watcher.go
new file mode 100644
index 0000000..281fc59
--- /dev/null
+++ b/metropolis/node/core/rpc/resolver/watcher.go
@@ -0,0 +1,122 @@
+package resolver
+
+import (
+ "errors"
+ "fmt"
+
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/resolver"
+)
+
+// clientWatcher is a subordinate structure to a given Resolver, updating a
+// gRPC ClientConn with the address of the current cluster leader.
+type clientWatcher struct {
+ resolver *Resolver
+ clientConn resolver.ClientConn
+ subscription *responseSubscribe
+}
+
+var (
+ // ResolverClosed will be returned by the resolver to gRPC machinery whenever a
+ // resolver cannot be used anymore because it was Closed.
+ ResolverClosed = errors.New("cluster resolver closed")
+)
+
+// Build is called by gRPC on each Dial call. It spawns a new clientWatcher,
+// whose goroutine receives information about the current cluster leader from
+// the parent Resolver and updates the given gRPC client connection with that
+// leader's address.
+func (r *Resolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
+	// We can only connect to "metropolis:///control".
+ if target.Scheme != "metropolis" || target.Authority != "" || target.Endpoint != "control" {
+ return nil, fmt.Errorf("invalid target: must be %s, is: %s", MetropolisControlAddress, target.Endpoint)
+ }
+
+ if opts.DialCreds == nil {
+ return nil, fmt.Errorf("can only be used with clients containing TransportCredentials")
+ }
+
+ // Submit the dial options to the resolver's processor, quitting if the resolver
+ // gets canceled in the meantime.
+ options := []grpc.DialOption{
+ grpc.WithTransportCredentials(opts.DialCreds),
+ grpc.WithContextDialer(opts.Dialer),
+ }
+
+ select {
+ case <-r.ctx.Done():
+ return nil, ResolverClosed
+ case r.reqC <- &request{
+ ds: &requestDialOptionsSet{
+ options: options,
+ },
+ }:
+ }
+
+ // Submit a subscription request to the resolver's processor, quitting if the
+ // resolver gets canceled in the meantime.
+
+ req := &request{
+ sub: &requestSubscribe{resC: make(chan *responseSubscribe)},
+ }
+ select {
+ case <-r.ctx.Done():
+ return nil, ResolverClosed
+ case r.reqC <- req:
+ }
+	// This receive is uninterruptible by contract, as the corresponding send on
+	// the processor side is also uninterruptible.
+ subscription := <-req.sub.resC
+
+ watcher := &clientWatcher{
+ resolver: r,
+ clientConn: cc,
+ subscription: subscription,
+ }
+ go watcher.watch()
+
+ return watcher, nil
+}
+
+func (r *Resolver) Scheme() string {
+ return "metropolis"
+}
+
+func (w *clientWatcher) watch() {
+ // Craft a trivial gRPC service config which forces round-robin behaviour. This
+ // doesn't really matter for us, as we only ever submit the single leader as a
+ // connection endpoint.
+ svcConfig := w.clientConn.ParseServiceConfig(`{ "loadBalancingConfig": [{"round_robin": {}}]}`)
+
+ // Watch for leader to be updated.
+ for {
+ update := <-w.subscription.subC
+ if update == nil {
+			// A nil result means the channel is closed, which means either this watcher
+			// has been closed or the resolver has been canceled. Abort the loop.
+ w.clientConn.ReportError(ResolverClosed)
+ break
+ }
+ w.clientConn.UpdateState(resolver.State{
+ Addresses: []resolver.Address{
+ {
+ Addr: update.endpoint.endpoint,
+ ServerName: update.nodeID,
+ },
+ },
+ ServiceConfig: svcConfig,
+ })
+ }
+}
+
+func (r *clientWatcher) ResolveNow(_ resolver.ResolveNowOptions) {
+ // No-op. The clientWatcher's watcher runs as fast as possible.
+}
+
+func (r *clientWatcher) Close() {
+	select {
+	case r.resolver.reqC <- &request{
+		unsub: &requestUnsubscribe{
+			id: r.subscription.id,
+		},
+	}:
+	case <-r.resolver.ctx.Done():
+	}
+}
diff --git a/metropolis/node/core/rpc/resolver_test.go b/metropolis/node/core/rpc/resolver_test.go
deleted file mode 100644
index 8231273..0000000
--- a/metropolis/node/core/rpc/resolver_test.go
+++ /dev/null
@@ -1,172 +0,0 @@
-package rpc
-
-import (
- "context"
- "crypto/tls"
- "log"
- "net"
- "strings"
- "testing"
- "time"
-
- "google.golang.org/grpc"
- "google.golang.org/grpc/credentials"
-
- ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
- apb "source.monogon.dev/metropolis/proto/api"
- cpb "source.monogon.dev/metropolis/proto/common"
-)
-
-type testImplementationClusterAware struct {
- ipb.UnimplementedCuratorServer
- apb.UnimplementedAAAServer
- apb.UnimplementedManagementServer
-
- addresses map[string]string
-}
-
-func (t *testImplementationClusterAware) GetClusterInfo(ctx context.Context, req *apb.GetClusterInfoRequest) (*apb.GetClusterInfoResponse, error) {
- return &apb.GetClusterInfoResponse{}, nil
-}
-
-func (t *testImplementationClusterAware) Watch(_ *ipb.WatchRequest, srv ipb.Curator_WatchServer) error {
- var nodes []*ipb.Node
- for name, addr := range t.addresses {
- nodes = append(nodes, &ipb.Node{
- Id: name,
- Roles: &cpb.NodeRoles{ConsensusMember: &cpb.NodeRoles_ConsensusMember{}},
- Status: &cpb.NodeStatus{
- ExternalAddress: addr,
- },
- })
- }
- err := srv.Send(&ipb.WatchEvent{
- Nodes: nodes,
- })
- if err != nil {
- return err
- }
- <-srv.Context().Done()
- return srv.Context().Err()
-}
-
-// TestResolverSimple exercises the happy path of the gRPC ResolverBuilder,
-// checking that a single node can be used to bootstrap multiple nodes from, and
-// ensuring that nodes are being dialed in a round-robin fashion.
-//
-// TODO(q3k): exercise node removal and re-dial of updater to another node.
-func TestResolverSimple(t *testing.T) {
- ctx, ctxC := context.WithCancel(context.Background())
- defer ctxC()
-
- // Make three nodes for testing, each with its own bufconn listener.
- numCurators := 3
- eph := NewEphemeralClusterCredentials(t, numCurators)
-
- listeners := make([]net.Listener, numCurators)
- for i := 0; i < numCurators; i++ {
- lis, err := net.Listen("tcp", "")
- if err != nil {
- t.Fatalf("Listen failed: %v", err)
- }
- listeners[i] = lis
- }
-
- addresses := make(map[string]string)
- for i, lis := range listeners {
- name := eph.Nodes[i].ID()
- addresses[name] = lis.Addr().String()
- }
- impls := make([]*testImplementationClusterAware, numCurators)
- for i := 0; i < numCurators; i++ {
- impls[i] = &testImplementationClusterAware{
- addresses: addresses,
- }
- }
-
- servers := make([]*grpc.Server, numCurators)
- for i := 0; i < numCurators; i++ {
- i := i
- ss := ServerSecurity{
- NodeCredentials: eph.Nodes[i],
- }
- servers[i] = grpc.NewServer(ss.GRPCOptions(nil)...)
- ipb.RegisterCuratorServer(servers[i], impls[i])
- apb.RegisterAAAServer(servers[i], impls[i])
- apb.RegisterManagementServer(servers[i], impls[i])
- go func() {
- if err := servers[i].Serve(listeners[i]); err != nil {
- t.Fatalf("GRPC serve failed: %v", err)
- }
- }()
-
- defer listeners[i].Close()
- defer servers[i].Stop()
- }
-
- r := NewClusterResolver()
- r.logger = func(f string, args ...interface{}) {
- log.Printf(f, args...)
- }
- defer r.Close()
-
- creds := credentials.NewTLS(&tls.Config{
- Certificates: []tls.Certificate{eph.Manager},
- InsecureSkipVerify: true,
- VerifyPeerCertificate: verifyClusterCertificate(eph.CA),
- })
- cl, err := grpc.Dial("metropolis:///control", grpc.WithTransportCredentials(creds), grpc.WithResolvers(r))
- if err != nil {
- t.Fatalf("Could not dial: %v", err)
- }
-
- // Add first node to bootstrap node information from.
- r.AddNode(eph.Nodes[0].ID(), listeners[0].Addr().String())
-
- mgmt := apb.NewManagementClient(cl)
- _, err = mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
- if err != nil {
- t.Fatalf("Running initial GetClusterInfo failed: %v", err)
- }
-
- // Wait until client finds all three nodes.
- r.condCurators.L.Lock()
- for len(r.curators) < 3 {
- r.condCurators.Wait()
- }
- curators := r.curators
- r.condCurators.L.Unlock()
-
- // Ensure the three nodes as are expected.
- for i, node := range eph.Nodes {
- if got, want := curators[node.ID()], listeners[i].Addr().String(); want != got {
- t.Errorf("Node %s: wanted address %q, got %q", node.ID(), want, got)
- }
- }
-
- // Stop first node, make sure the call now reaches the other servers. This will
- // happen due to the resolver's round-robin behaviour, not because this node is
- // dropped from the active set of nodes.
- servers[0].Stop()
- listeners[0].Close()
-
- for i := 0; i < 10; i++ {
- _, err = mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
- if err == nil {
- break
- }
- time.Sleep(time.Second)
- }
- if err != nil {
- t.Errorf("Running GetClusterInfo after stopping first node failed: %v", err)
- }
-
- // Close the builder, new dials should fail.
- r.Close()
- _, err = grpc.Dial(MetropolisControlAddress, grpc.WithTransportCredentials(creds), grpc.WithResolvers(r), grpc.WithBlock())
- // String comparison required because the underlying gRPC code does not wrap the
- // error.
- if want, got := ResolverClosed, err; !strings.Contains(got.Error(), want.Error()) {
- t.Errorf("Unexpected dial error after closing builder, wanted %q, got %q", want, got)
- }
-}