metropolis: move curator client watches to curator/watcher
This replaces all the ad-hoc code to watch Curator node(s) with calls
through the new curator/watcher library.
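
For reference, the consumer-side pattern looks roughly as follows (a
sketch based on the usage in this change; runFollower is a hypothetical
example consumer, and only the SimpleFollower fields exercised below
are shown):

    // runFollower follows cluster nodes via the curator/watcher
    // library; cur is any ipb.CuratorClient.
    func runFollower(ctx context.Context, cur ipb.CuratorClient) error {
        return watcher.WatchNodes(ctx, cur, watcher.SimpleFollower{
            // FilterFn drops nodes the follower does not care about.
            FilterFn: func(n *ipb.Node) bool {
                return n.Status != nil && n.Status.ExternalAddress != ""
            },
            // EqualsFn decides whether an update is a meaningful change
            // compared to the previously seen version of the node.
            EqualsFn: func(a, b *ipb.Node) bool {
                return a.Status.ExternalAddress == b.Status.ExternalAddress
            },
            // OnNewUpdated runs for newly seen and changed nodes,
            // OnDeleted for nodes removed from the cluster.
            OnNewUpdated: func(n *ipb.Node) error { return nil },
            OnDeleted:    func(n *ipb.Node) error { return nil },
        })
    }
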
Change-Id: Ie2a82b330e4108b9b725515cb10595916c38b323
Reviewed-on: https://review.monogon.dev/c/monogon/+/2263
Tested-by: Jenkins CI
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/node/core/metrics/discovery.go b/metropolis/node/core/metrics/discovery.go
index 037d3b0..d5a992b 100644
--- a/metropolis/node/core/metrics/discovery.go
+++ b/metropolis/node/core/metrics/discovery.go
@@ -7,23 +7,22 @@
"net/http"
"sync"
- apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
- ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
-
+ "source.monogon.dev/go/types/mapsets"
+ "source.monogon.dev/metropolis/node/core/curator/watcher"
"source.monogon.dev/metropolis/pkg/supervisor"
+
+ ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
)
type Discovery struct {
Curator ipb.CuratorClient
- // sdResp will contain the cached sdResponse
- sdResp sdResponse
+ // sdResp contains the cached service discovery targets, keyed by node ID.
+ sdResp mapsets.OrderedMap[string, sdTarget]
// sdRespMtx is the mutex for sdResp to allow usage inside the http handler.
sdRespMtx sync.RWMutex
}
-type sdResponse []sdTarget
-
type sdTarget struct {
Targets []string `json:"targets"`
Labels map[string]string `json:"labels"`
@@ -33,73 +32,63 @@
func (s *Discovery) Run(ctx context.Context) error {
supervisor.Signal(ctx, supervisor.SignalHealthy)
- srv, err := s.Curator.Watch(ctx, &apb.WatchRequest{
- Kind: &apb.WatchRequest_NodesInCluster_{
- NodesInCluster: &apb.WatchRequest_NodesInCluster{},
- },
- })
- if err != nil {
- return fmt.Errorf("curator watch failed: %w", err)
- }
- defer srv.CloseSend()
-
defer func() {
s.sdRespMtx.Lock()
// disable the metrics endpoint until the new routine takes over
- s.sdResp = nil
+ s.sdResp.Clear()
s.sdRespMtx.Unlock()
}()
- nodes := make(map[string]*apb.Node)
- for {
- ev, err := srv.Recv()
- if err != nil {
- // The watcher wont return a properly wrapped error which confuses
- // our testing harness. Lets just return the context error directly
- // if it exists.
- if ctx.Err() != nil {
- return ctx.Err()
+ return watcher.WatchNodes(ctx, s.Curator, watcher.SimpleFollower{
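+ // Only care about nodes that have all required configuration set.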
+ FilterFn: func(a *ipb.Node) bool {
+ if a.Status == nil {
+ return false
}
-
- return fmt.Errorf("curator watch recv failed: %w", err)
- }
-
- for _, n := range ev.Nodes {
- nodes[n.Id] = n
- }
-
- for _, t := range ev.NodeTombstones {
- n, ok := nodes[t.NodeId]
- if !ok {
- // This is an indication of us losing data somehow. If this happens, it likely
- // means a Curator bug.
- supervisor.Logger(ctx).Warningf("Node %s: tombstone for unknown node", t.NodeId)
- continue
+ if a.Status.ExternalAddress == "" {
+ return false
}
- delete(nodes, n.Id)
- }
-
- s.sdRespMtx.Lock()
-
- s.sdResp = nil
- for _, n := range nodes {
- // Only care about nodes that have all required configuration set.
- if n.Status == nil || n.Status.ExternalAddress == "" || n.Roles == nil {
- continue
+ if a.Roles == nil {
+ return false
}
+ return true
+ },
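+ // Only treat a node as updated when a field that feeds into the emitted
+ // target or labels changes.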
+ EqualsFn: func(a *ipb.Node, b *ipb.Node) bool {
+ if (a.Roles.ConsensusMember == nil) != (b.Roles.ConsensusMember == nil) {
+ return false
+ }
+ if (a.Roles.KubernetesController == nil) != (b.Roles.KubernetesController == nil) {
+ return false
+ }
+ if (a.Roles.KubernetesWorker == nil) != (b.Roles.KubernetesWorker == nil) {
+ return false
+ }
+ if a.Status.ExternalAddress != b.Status.ExternalAddress {
+ return false
+ }
+ return true
+ },
+ OnNewUpdated: func(new *ipb.Node) error {
+ s.sdRespMtx.Lock()
+ defer s.sdRespMtx.Unlock()
- s.sdResp = append(s.sdResp, sdTarget{
- Targets: []string{n.Status.ExternalAddress},
+ s.sdResp.Insert(new.Id, sdTarget{
+ Targets: []string{new.Status.ExternalAddress},
Labels: map[string]string{
- "__meta_metropolis_role_kubernetes_worker": fmt.Sprintf("%t", n.Roles.KubernetesWorker != nil),
- "__meta_metropolis_role_kubernetes_controller": fmt.Sprintf("%t", n.Roles.KubernetesController != nil),
- "__meta_metropolis_role_consensus_member": fmt.Sprintf("%t", n.Roles.ConsensusMember != nil),
+ "__meta_metropolis_role_kubernetes_worker": fmt.Sprintf("%t", new.Roles.KubernetesWorker != nil),
+ "__meta_metropolis_role_kubernetes_controller": fmt.Sprintf("%t", new.Roles.KubernetesController != nil),
+ "__meta_metropolis_role_consensus_member": fmt.Sprintf("%t", new.Roles.ConsensusMember != nil),
},
})
- }
+ return nil
+ },
+ OnDeleted: func(prev *ipb.Node) error {
+ s.sdRespMtx.Lock()
+ defer s.sdRespMtx.Unlock()
- s.sdRespMtx.Unlock()
- }
+ s.sdResp.Delete(prev.Id)
+ return nil
+ },
+ })
}
func (s *Discovery) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@@ -111,9 +100,10 @@
s.sdRespMtx.RLock()
defer s.sdRespMtx.RUnlock()
- // If sdResp is nil, which only happens if we are not a master node
- // or we are still booting, we respond with NotImplemented.
- if s.sdResp == nil {
+ // If sdResp is empty, respond with Service Unavailable. This only happens so
+ // early in the lifecycle of a control plane node that it doesn't yet know
+ // about itself, or when this is not a control plane node:
+ if s.sdResp.Count() == 0 {
w.WriteHeader(http.StatusServiceUnavailable)
return
}
@@ -121,7 +111,12 @@
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
- if err := json.NewEncoder(w).Encode(s.sdResp); err != nil {
+ // Turn into a plain array as expected by the service discovery API.
+ var res []sdTarget
+ for _, v := range s.sdResp.Values() {
+ res = append(res, v.Value)
+ }
+ if err := json.NewEncoder(w).Encode(res); err != nil {
// If the encoder fails its mostly because of closed connections
// so lets just ignore these errors.
return