blob: d5a992b44c4adcdd5c060a0ee091a4120d6dfe36 [file] [log] [blame]
Tim Windelschmidtf64f1972023-07-28 00:00:50 +00001package metrics
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "net/http"
8 "sync"
9
Serge Bazanski60461b22023-10-26 19:16:59 +020010 "source.monogon.dev/go/types/mapsets"
11 "source.monogon.dev/metropolis/node/core/curator/watcher"
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000012 "source.monogon.dev/metropolis/pkg/supervisor"
Serge Bazanski60461b22023-10-26 19:16:59 +020013
14 ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000015)
16
17type Discovery struct {
18 Curator ipb.CuratorClient
19
20 // sdResp will contain the cached sdResponse
Serge Bazanski60461b22023-10-26 19:16:59 +020021 sdResp mapsets.OrderedMap[string, sdTarget]
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000022 // sdRespMtx is the mutex for sdResp to allow usage inside the http handler.
23 sdRespMtx sync.RWMutex
24}
25
// sdTarget is one element of the service discovery response: a group of
// scrape addresses that all share the same label set. The JSON field names
// form the wire format and must stay as they are.
type sdTarget struct {
	// Targets lists the addresses to scrape.
	Targets []string `json:"targets"`
	// Labels is applied to every address in Targets.
	Labels map[string]string `json:"labels"`
}
30
31// Run is the sub-runnable responsible for fetching and serving node updates.
32func (s *Discovery) Run(ctx context.Context) error {
33 supervisor.Signal(ctx, supervisor.SignalHealthy)
34
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000035 defer func() {
36 s.sdRespMtx.Lock()
37 // disable the metrics endpoint until the new routine takes over
Serge Bazanski60461b22023-10-26 19:16:59 +020038 s.sdResp.Clear()
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000039 s.sdRespMtx.Unlock()
40 }()
41
Serge Bazanski60461b22023-10-26 19:16:59 +020042 return watcher.WatchNodes(ctx, s.Curator, watcher.SimpleFollower{
43 FilterFn: func(a *ipb.Node) bool {
44 if a.Status == nil {
45 return false
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000046 }
Serge Bazanski60461b22023-10-26 19:16:59 +020047 if a.Status.ExternalAddress == "" {
48 return false
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000049 }
Serge Bazanski60461b22023-10-26 19:16:59 +020050 if a.Roles == nil {
51 return false
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000052 }
Serge Bazanski60461b22023-10-26 19:16:59 +020053 return true
54 },
55 EqualsFn: func(a *ipb.Node, b *ipb.Node) bool {
56 if (a.Roles.ConsensusMember == nil) != (b.Roles.ConsensusMember == nil) {
57 return false
58 }
59 if (a.Roles.KubernetesController == nil) != (b.Roles.KubernetesController == nil) {
60 return false
61 }
62 if (a.Roles.ConsensusMember == nil) != (b.Roles.ConsensusMember == nil) {
63 return false
64 }
65 if a.Status.ExternalAddress != b.Status.ExternalAddress {
66 return false
67 }
68 return true
69 },
70 OnNewUpdated: func(new *ipb.Node) error {
71 s.sdRespMtx.Lock()
72 defer s.sdRespMtx.Unlock()
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000073
Serge Bazanski60461b22023-10-26 19:16:59 +020074 s.sdResp.Insert(new.Id, sdTarget{
75 Targets: []string{new.Status.ExternalAddress},
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000076 Labels: map[string]string{
Serge Bazanski60461b22023-10-26 19:16:59 +020077 "__meta_metropolis_role_kubernetes_worker": fmt.Sprintf("%t", new.Roles.KubernetesWorker != nil),
78 "__meta_metropolis_role_kubernetes_controller": fmt.Sprintf("%t", new.Roles.KubernetesController != nil),
79 "__meta_metropolis_role_consensus_member": fmt.Sprintf("%t", new.Roles.ConsensusMember != nil),
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000080 },
81 })
Serge Bazanski60461b22023-10-26 19:16:59 +020082 return nil
83 },
84 OnDeleted: func(prev *ipb.Node) error {
85 s.sdRespMtx.Lock()
86 defer s.sdRespMtx.Unlock()
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000087
Serge Bazanski60461b22023-10-26 19:16:59 +020088 s.sdResp.Delete(prev.Id)
89 return nil
90 },
91 })
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000092}
93
94func (s *Discovery) ServeHTTP(w http.ResponseWriter, r *http.Request) {
95 if r.Method != http.MethodGet {
96 http.Error(w, fmt.Sprintf("method %q not allowed", r.Method), http.StatusMethodNotAllowed)
97 return
98 }
99
100 s.sdRespMtx.RLock()
101 defer s.sdRespMtx.RUnlock()
102
Serge Bazanski60461b22023-10-26 19:16:59 +0200103 // If sdResp is empty, respond with Service Unavailable. This will only happen
104 // early enough in the lifecycle of a control plane node that it doesn't know
105 // about itself, or if this is not a control plane node:
106 if s.sdResp.Count() == 0 {
Tim Windelschmidtf64f1972023-07-28 00:00:50 +0000107 w.WriteHeader(http.StatusServiceUnavailable)
108 return
109 }
110
111 w.Header().Set("Content-Type", "application/json")
112 w.WriteHeader(http.StatusOK)
113
Serge Bazanski60461b22023-10-26 19:16:59 +0200114 // Turn into a plain array as expected by the service discovery API.
115 var res []sdTarget
116 for _, v := range s.sdResp.Values() {
117 res = append(res, v.Value)
118 }
119 if err := json.NewEncoder(w).Encode(res); err != nil {
Tim Windelschmidtf64f1972023-07-28 00:00:50 +0000120 // If the encoder fails its mostly because of closed connections
121 // so lets just ignore these errors.
122 return
123 }
124}