m/n/c/rpc/resolver: dampen curator updates
This makes the resolver only process node updates if some curator data
was actually changed.
Fixes https://github.com/monogon-dev/monogon/issues/233
Change-Id: I790adfc4aa3562864faf807d32ac00d9e3bd0bea
Reviewed-on: https://review.monogon.dev/c/monogon/+/1851
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/node/core/rpc/resolver/processor.go b/metropolis/node/core/rpc/resolver/processor.go
index c281e57..ef9d4f6 100644
--- a/metropolis/node/core/rpc/resolver/processor.go
+++ b/metropolis/node/core/rpc/resolver/processor.go
@@ -33,11 +33,47 @@
resC chan *curatorMap
}
+type nodeStatusMap map[string]*cpb.NodeStatus
+
+func (n nodeStatusMap) equals(o nodeStatusMap) bool {
+ // Check that we have the same keys on both maps.
+ for k, _ := range n {
+ _, ok := o[k]
+ if !ok {
+ return false
+ }
+ }
+ for k, _ := range o {
+ _, ok := n[k]
+ if !ok {
+ return false
+ }
+ }
+ // Keys are equal, compare values.
+ for k, v1 := range n {
+ v2 := o[k]
+
+ cur1 := v1.RunningCurator != nil
+ cur2 := v2.RunningCurator != nil
+ if cur1 != cur2 {
+ return false
+ }
+ if v1.ExternalAddress != v2.ExternalAddress {
+ return false
+ }
+
+ if cur1 && cur2 && v1.RunningCurator.Port != v2.RunningCurator.Port {
+ return false
+ }
+ }
+ return true
+}
+
// requestNodesUpdate is received from the curator updater, and carries
// information about the current curators as seen by the cluster control plane.
type requestNodesUpdate struct {
// nodes is a map from node ID to received status
- nodes map[string]*cpb.NodeStatus
+ nodes nodeStatusMap
}
// requestSeedAdd is received from AddEndpoint calls. It updates the processor's
diff --git a/metropolis/node/core/rpc/resolver/resolver.go b/metropolis/node/core/rpc/resolver/resolver.go
index dfac477..8b695b0 100644
--- a/metropolis/node/core/rpc/resolver/resolver.go
+++ b/metropolis/node/core/rpc/resolver/resolver.go
@@ -51,29 +51,29 @@
// gRPC client connections can use. Usually one ClusterResolver instance should
// be used per application.
//
-// .------------------------. .--------------------------------------.
-// | Metropolis Cluster | | Resolver |
-// :------------------------: :--------------------------------------:
-// : : : :
-// : .--------------------. : : .----------------. :
-// : | curator (follower) |<---.---------| Leader Updater |------------. :
-// : '--------------------' : | : '----------------' | :
-// : .--------------------. : | : .------------------------. | :
-// : | curator (follower) |<---: : | Processor (CuratorMap) |<-.-'-. :
-// : '--------------------' : | : '------------------------' | | :
-// : .--------------------.<---' : .-----------------. | | :
-// : | curator (leader) |<-------------| Curator Updater |---------' | :
-// : '--------------------' : : '-----------------' | :
-// : : : | :
-// '------------------------' : .----------. | :
-// : | Watchers |-. | :
-// : '----------' |------------------' :
-// : '-^--------' :
-// : | ^ :
-// : | | :
-// .---------------.
-// | gRPC channels |
-// '---------------'
+// .------------------------. .--------------------------------------.
+// | Metropolis Cluster | | Resolver |
+// :------------------------: :--------------------------------------:
+// : : : :
+// : .--------------------. : : .----------------. :
+// : | curator (follower) |<---.---------| Leader Updater |------------. :
+// : '--------------------' : | : '----------------' | :
+// : .--------------------. : | : .------------------------. | :
+// : | curator (follower) |<---: : | Processor (CuratorMap) |<-.-'-. :
+// : '--------------------' : | : '------------------------' | | :
+// : .--------------------.<---' : .-----------------. | | :
+// : | curator (leader) |<-------------| Curator Updater |---------' | :
+// : '--------------------' : : '-----------------' | :
+// : : : | :
+// '------------------------' : .----------. | :
+// : | Watchers |-. | :
+// : '----------' |------------------' :
+// : '-^--------' :
+// : | ^ :
+// : | | :
+// .---------------.
+// | gRPC channels |
+// '---------------'
type Resolver struct {
reqC chan *request
ctx context.Context
@@ -223,6 +223,7 @@
defer cl.Close()
cur := apb.NewCuratorClient(cl)
+ prevCurators := make(nodeStatusMap)
return backoff.RetryNotify(func() error {
w, err := cur.Watch(ctx, &apb.WatchRequest{
@@ -245,24 +246,35 @@
}
bo.Reset()
- // Update internal map.
+ // Update internal map but only care about curators.
for _, n := range ev.Nodes {
+ if n.Status == nil || n.Status.RunningCurator == nil {
+ delete(nodes, n.Id)
+ continue
+ }
nodes[n.Id] = n.Status
}
for _, n := range ev.NodeTombstones {
delete(nodes, n.NodeId)
}
- // Make a copy, this time only curators.
- curators := make(map[string]*cpb.NodeStatus)
+ // Make a copy to submit to client.
+ curators := make(nodeStatusMap)
var curatorNames []string
for k, v := range nodes {
- if v == nil || v.RunningCurator == nil {
+ if v == nil {
continue
}
curators[k] = v
curatorNames = append(curatorNames, k)
}
+
+ // Only submit an update (and log) if the effective curator map actually changed.
+ if prevCurators.equals(curators) {
+ continue
+ }
+ prevCurators = curators
+
r.logger("CURUPDATE: got new curators: %s", strings.Join(curatorNames, ", "))
select {