Tim Windelschmidt | f64f197 | 2023-07-28 00:00:50 +0000 | [diff] [blame] | 1 | package metrics |
| 2 | |
| 3 | import ( |
| 4 | "context" |
| 5 | "encoding/json" |
| 6 | "fmt" |
| 7 | "net/http" |
| 8 | "sync" |
| 9 | |
Serge Bazanski | 60461b2 | 2023-10-26 19:16:59 +0200 | [diff] [blame] | 10 | "source.monogon.dev/go/types/mapsets" |
| 11 | "source.monogon.dev/metropolis/node/core/curator/watcher" |
Tim Windelschmidt | f64f197 | 2023-07-28 00:00:50 +0000 | [diff] [blame] | 12 | "source.monogon.dev/metropolis/pkg/supervisor" |
Serge Bazanski | 60461b2 | 2023-10-26 19:16:59 +0200 | [diff] [blame] | 13 | |
| 14 | ipb "source.monogon.dev/metropolis/node/core/curator/proto/api" |
Tim Windelschmidt | f64f197 | 2023-07-28 00:00:50 +0000 | [diff] [blame] | 15 | ) |
| 16 | |
| 17 | type Discovery struct { |
| 18 | Curator ipb.CuratorClient |
| 19 | |
| 20 | // sdResp will contain the cached sdResponse |
Serge Bazanski | 60461b2 | 2023-10-26 19:16:59 +0200 | [diff] [blame] | 21 | sdResp mapsets.OrderedMap[string, sdTarget] |
Tim Windelschmidt | f64f197 | 2023-07-28 00:00:50 +0000 | [diff] [blame] | 22 | // sdRespMtx is the mutex for sdResp to allow usage inside the http handler. |
| 23 | sdRespMtx sync.RWMutex |
| 24 | } |
| 25 | |
// sdTarget is a single entry of the service discovery response: a list of
// target addresses together with the labels attached to them.
type sdTarget struct {
	// Targets holds the addresses of this entry. Run stores a node's external
	// address here.
	Targets []string `json:"targets"`
	// Labels holds the string labels attached to Targets. Run stores the
	// node's role flags here.
	Labels map[string]string `json:"labels"`
}
| 30 | |
| 31 | // Run is the sub-runnable responsible for fetching and serving node updates. |
| 32 | func (s *Discovery) Run(ctx context.Context) error { |
| 33 | supervisor.Signal(ctx, supervisor.SignalHealthy) |
| 34 | |
Tim Windelschmidt | f64f197 | 2023-07-28 00:00:50 +0000 | [diff] [blame] | 35 | defer func() { |
| 36 | s.sdRespMtx.Lock() |
| 37 | // disable the metrics endpoint until the new routine takes over |
Serge Bazanski | 60461b2 | 2023-10-26 19:16:59 +0200 | [diff] [blame] | 38 | s.sdResp.Clear() |
Tim Windelschmidt | f64f197 | 2023-07-28 00:00:50 +0000 | [diff] [blame] | 39 | s.sdRespMtx.Unlock() |
| 40 | }() |
| 41 | |
Serge Bazanski | 60461b2 | 2023-10-26 19:16:59 +0200 | [diff] [blame] | 42 | return watcher.WatchNodes(ctx, s.Curator, watcher.SimpleFollower{ |
| 43 | FilterFn: func(a *ipb.Node) bool { |
| 44 | if a.Status == nil { |
| 45 | return false |
Tim Windelschmidt | f64f197 | 2023-07-28 00:00:50 +0000 | [diff] [blame] | 46 | } |
Serge Bazanski | 60461b2 | 2023-10-26 19:16:59 +0200 | [diff] [blame] | 47 | if a.Status.ExternalAddress == "" { |
| 48 | return false |
Tim Windelschmidt | f64f197 | 2023-07-28 00:00:50 +0000 | [diff] [blame] | 49 | } |
Serge Bazanski | 60461b2 | 2023-10-26 19:16:59 +0200 | [diff] [blame] | 50 | if a.Roles == nil { |
| 51 | return false |
Tim Windelschmidt | f64f197 | 2023-07-28 00:00:50 +0000 | [diff] [blame] | 52 | } |
Serge Bazanski | 60461b2 | 2023-10-26 19:16:59 +0200 | [diff] [blame] | 53 | return true |
| 54 | }, |
| 55 | EqualsFn: func(a *ipb.Node, b *ipb.Node) bool { |
| 56 | if (a.Roles.ConsensusMember == nil) != (b.Roles.ConsensusMember == nil) { |
| 57 | return false |
| 58 | } |
| 59 | if (a.Roles.KubernetesController == nil) != (b.Roles.KubernetesController == nil) { |
| 60 | return false |
| 61 | } |
| 62 | if (a.Roles.ConsensusMember == nil) != (b.Roles.ConsensusMember == nil) { |
| 63 | return false |
| 64 | } |
| 65 | if a.Status.ExternalAddress != b.Status.ExternalAddress { |
| 66 | return false |
| 67 | } |
| 68 | return true |
| 69 | }, |
| 70 | OnNewUpdated: func(new *ipb.Node) error { |
| 71 | s.sdRespMtx.Lock() |
| 72 | defer s.sdRespMtx.Unlock() |
Tim Windelschmidt | f64f197 | 2023-07-28 00:00:50 +0000 | [diff] [blame] | 73 | |
Serge Bazanski | 60461b2 | 2023-10-26 19:16:59 +0200 | [diff] [blame] | 74 | s.sdResp.Insert(new.Id, sdTarget{ |
| 75 | Targets: []string{new.Status.ExternalAddress}, |
Tim Windelschmidt | f64f197 | 2023-07-28 00:00:50 +0000 | [diff] [blame] | 76 | Labels: map[string]string{ |
Serge Bazanski | 60461b2 | 2023-10-26 19:16:59 +0200 | [diff] [blame] | 77 | "__meta_metropolis_role_kubernetes_worker": fmt.Sprintf("%t", new.Roles.KubernetesWorker != nil), |
| 78 | "__meta_metropolis_role_kubernetes_controller": fmt.Sprintf("%t", new.Roles.KubernetesController != nil), |
| 79 | "__meta_metropolis_role_consensus_member": fmt.Sprintf("%t", new.Roles.ConsensusMember != nil), |
Tim Windelschmidt | f64f197 | 2023-07-28 00:00:50 +0000 | [diff] [blame] | 80 | }, |
| 81 | }) |
Serge Bazanski | 60461b2 | 2023-10-26 19:16:59 +0200 | [diff] [blame] | 82 | return nil |
| 83 | }, |
| 84 | OnDeleted: func(prev *ipb.Node) error { |
| 85 | s.sdRespMtx.Lock() |
| 86 | defer s.sdRespMtx.Unlock() |
Tim Windelschmidt | f64f197 | 2023-07-28 00:00:50 +0000 | [diff] [blame] | 87 | |
Serge Bazanski | 60461b2 | 2023-10-26 19:16:59 +0200 | [diff] [blame] | 88 | s.sdResp.Delete(prev.Id) |
| 89 | return nil |
| 90 | }, |
| 91 | }) |
Tim Windelschmidt | f64f197 | 2023-07-28 00:00:50 +0000 | [diff] [blame] | 92 | } |
| 93 | |
| 94 | func (s *Discovery) ServeHTTP(w http.ResponseWriter, r *http.Request) { |
| 95 | if r.Method != http.MethodGet { |
| 96 | http.Error(w, fmt.Sprintf("method %q not allowed", r.Method), http.StatusMethodNotAllowed) |
| 97 | return |
| 98 | } |
| 99 | |
| 100 | s.sdRespMtx.RLock() |
| 101 | defer s.sdRespMtx.RUnlock() |
| 102 | |
Serge Bazanski | 60461b2 | 2023-10-26 19:16:59 +0200 | [diff] [blame] | 103 | // If sdResp is empty, respond with Service Unavailable. This will only happen |
| 104 | // early enough in the lifecycle of a control plane node that it doesn't know |
| 105 | // about itself, or if this is not a control plane node: |
| 106 | if s.sdResp.Count() == 0 { |
Tim Windelschmidt | f64f197 | 2023-07-28 00:00:50 +0000 | [diff] [blame] | 107 | w.WriteHeader(http.StatusServiceUnavailable) |
| 108 | return |
| 109 | } |
| 110 | |
| 111 | w.Header().Set("Content-Type", "application/json") |
| 112 | w.WriteHeader(http.StatusOK) |
| 113 | |
Serge Bazanski | 60461b2 | 2023-10-26 19:16:59 +0200 | [diff] [blame] | 114 | // Turn into a plain array as expected by the service discovery API. |
| 115 | var res []sdTarget |
| 116 | for _, v := range s.sdResp.Values() { |
| 117 | res = append(res, v.Value) |
| 118 | } |
| 119 | if err := json.NewEncoder(w).Encode(res); err != nil { |
Tim Windelschmidt | f64f197 | 2023-07-28 00:00:50 +0000 | [diff] [blame] | 120 | // If the encoder fails its mostly because of closed connections |
| 121 | // so lets just ignore these errors. |
| 122 | return |
| 123 | } |
| 124 | } |