blob: ff3fcf178f51da746cbfcc20ea6ece866140dde9 [file] [log] [blame]
Tim Windelschmidtf64f1972023-07-28 00:00:50 +00001package metrics
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "net/http"
8 "sync"
9
Serge Bazanski60461b22023-10-26 19:16:59 +020010 "source.monogon.dev/go/types/mapsets"
11 "source.monogon.dev/metropolis/node/core/curator/watcher"
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000012 "source.monogon.dev/metropolis/pkg/supervisor"
Serge Bazanski60461b22023-10-26 19:16:59 +020013
14 ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000015)
16
17type Discovery struct {
18 Curator ipb.CuratorClient
19
20 // sdResp will contain the cached sdResponse
Serge Bazanski60461b22023-10-26 19:16:59 +020021 sdResp mapsets.OrderedMap[string, sdTarget]
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000022 // sdRespMtx is the mutex for sdResp to allow usage inside the http handler.
23 sdRespMtx sync.RWMutex
24}
25
// sdTarget is one element of the JSON array returned by the service discovery
// endpoint: a group of target addresses plus the labels attached to them.
type sdTarget struct {
	// Targets holds the addresses belonging to this entry. It is emitted
	// under the "targets" JSON key.
	Targets []string `json:"targets"`
	// Labels holds the label name/value pairs describing the targets. It is
	// emitted under the "labels" JSON key.
	Labels map[string]string `json:"labels"`
}
30
31// Run is the sub-runnable responsible for fetching and serving node updates.
32func (s *Discovery) Run(ctx context.Context) error {
33 supervisor.Signal(ctx, supervisor.SignalHealthy)
34
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000035 defer func() {
36 s.sdRespMtx.Lock()
37 // disable the metrics endpoint until the new routine takes over
Serge Bazanski60461b22023-10-26 19:16:59 +020038 s.sdResp.Clear()
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000039 s.sdRespMtx.Unlock()
40 }()
41
Serge Bazanski60461b22023-10-26 19:16:59 +020042 return watcher.WatchNodes(ctx, s.Curator, watcher.SimpleFollower{
43 FilterFn: func(a *ipb.Node) bool {
44 if a.Status == nil {
45 return false
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000046 }
Serge Bazanski60461b22023-10-26 19:16:59 +020047 if a.Status.ExternalAddress == "" {
48 return false
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000049 }
Serge Bazanski60461b22023-10-26 19:16:59 +020050 if a.Roles == nil {
51 return false
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000052 }
Serge Bazanski60461b22023-10-26 19:16:59 +020053 return true
54 },
55 EqualsFn: func(a *ipb.Node, b *ipb.Node) bool {
56 if (a.Roles.ConsensusMember == nil) != (b.Roles.ConsensusMember == nil) {
57 return false
58 }
59 if (a.Roles.KubernetesController == nil) != (b.Roles.KubernetesController == nil) {
60 return false
61 }
62 if (a.Roles.ConsensusMember == nil) != (b.Roles.ConsensusMember == nil) {
63 return false
64 }
65 if a.Status.ExternalAddress != b.Status.ExternalAddress {
66 return false
67 }
68 return true
69 },
70 OnNewUpdated: func(new *ipb.Node) error {
71 s.sdRespMtx.Lock()
72 defer s.sdRespMtx.Unlock()
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000073
Serge Bazanski60461b22023-10-26 19:16:59 +020074 s.sdResp.Insert(new.Id, sdTarget{
75 Targets: []string{new.Status.ExternalAddress},
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000076 Labels: map[string]string{
Serge Bazanski60461b22023-10-26 19:16:59 +020077 "__meta_metropolis_role_kubernetes_worker": fmt.Sprintf("%t", new.Roles.KubernetesWorker != nil),
78 "__meta_metropolis_role_kubernetes_controller": fmt.Sprintf("%t", new.Roles.KubernetesController != nil),
79 "__meta_metropolis_role_consensus_member": fmt.Sprintf("%t", new.Roles.ConsensusMember != nil),
Lorenz Bruneb2520f2023-11-19 07:04:15 +010080 "__meta_metropolis_node": new.Id,
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000081 },
82 })
Serge Bazanski60461b22023-10-26 19:16:59 +020083 return nil
84 },
85 OnDeleted: func(prev *ipb.Node) error {
86 s.sdRespMtx.Lock()
87 defer s.sdRespMtx.Unlock()
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000088
Serge Bazanski60461b22023-10-26 19:16:59 +020089 s.sdResp.Delete(prev.Id)
90 return nil
91 },
92 })
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000093}
94
95func (s *Discovery) ServeHTTP(w http.ResponseWriter, r *http.Request) {
96 if r.Method != http.MethodGet {
97 http.Error(w, fmt.Sprintf("method %q not allowed", r.Method), http.StatusMethodNotAllowed)
98 return
99 }
100
101 s.sdRespMtx.RLock()
102 defer s.sdRespMtx.RUnlock()
103
Serge Bazanski60461b22023-10-26 19:16:59 +0200104 // If sdResp is empty, respond with Service Unavailable. This will only happen
105 // early enough in the lifecycle of a control plane node that it doesn't know
106 // about itself, or if this is not a control plane node:
107 if s.sdResp.Count() == 0 {
Tim Windelschmidtf64f1972023-07-28 00:00:50 +0000108 w.WriteHeader(http.StatusServiceUnavailable)
109 return
110 }
111
112 w.Header().Set("Content-Type", "application/json")
113 w.WriteHeader(http.StatusOK)
114
Serge Bazanski60461b22023-10-26 19:16:59 +0200115 // Turn into a plain array as expected by the service discovery API.
116 var res []sdTarget
117 for _, v := range s.sdResp.Values() {
118 res = append(res, v.Value)
119 }
120 if err := json.NewEncoder(w).Encode(res); err != nil {
Tim Windelschmidtf64f1972023-07-28 00:00:50 +0000121 // If the encoder fails its mostly because of closed connections
122 // so lets just ignore these errors.
123 return
124 }
125}