blob: 037d3b00c6459d0173f7af8e9e44c31ef51d92f1 [file] [log] [blame]
Tim Windelschmidtf64f1972023-07-28 00:00:50 +00001package metrics
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "net/http"
8 "sync"
9
10 apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
11 ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
12
13 "source.monogon.dev/metropolis/pkg/supervisor"
14)
15
// Discovery serves a cached list of cluster nodes as scrape targets in the
// JSON shape expected by Prometheus-style HTTP service discovery (a list of
// {targets, labels} groups). The cache is maintained by Run and served by
// ServeHTTP.
type Discovery struct {
	Curator ipb.CuratorClient

	// sdResp will contain the cached sdResponse. A nil value means no data
	// is available (not running, still booting) and makes ServeHTTP reply
	// with 503 Service Unavailable.
	sdResp sdResponse
	// sdRespMtx is the mutex for sdResp to allow usage inside the http handler.
	sdRespMtx sync.RWMutex
}
24
// sdResponse is the full service discovery response: one entry per target
// group, encoded as a JSON array.
type sdResponse []sdTarget

// sdTarget is a single target group: a set of scrape addresses and the
// labels attached to all of them.
type sdTarget struct {
	Targets []string          `json:"targets"`
	Labels  map[string]string `json:"labels"`
}
31
32// Run is the sub-runnable responsible for fetching and serving node updates.
33func (s *Discovery) Run(ctx context.Context) error {
34 supervisor.Signal(ctx, supervisor.SignalHealthy)
35
36 srv, err := s.Curator.Watch(ctx, &apb.WatchRequest{
37 Kind: &apb.WatchRequest_NodesInCluster_{
38 NodesInCluster: &apb.WatchRequest_NodesInCluster{},
39 },
40 })
41 if err != nil {
42 return fmt.Errorf("curator watch failed: %w", err)
43 }
44 defer srv.CloseSend()
45
46 defer func() {
47 s.sdRespMtx.Lock()
48 // disable the metrics endpoint until the new routine takes over
49 s.sdResp = nil
50 s.sdRespMtx.Unlock()
51 }()
52
53 nodes := make(map[string]*apb.Node)
54 for {
55 ev, err := srv.Recv()
56 if err != nil {
57 // The watcher wont return a properly wrapped error which confuses
58 // our testing harness. Lets just return the context error directly
59 // if it exists.
60 if ctx.Err() != nil {
61 return ctx.Err()
62 }
63
64 return fmt.Errorf("curator watch recv failed: %w", err)
65 }
66
67 for _, n := range ev.Nodes {
68 nodes[n.Id] = n
69 }
70
71 for _, t := range ev.NodeTombstones {
72 n, ok := nodes[t.NodeId]
73 if !ok {
74 // This is an indication of us losing data somehow. If this happens, it likely
75 // means a Curator bug.
76 supervisor.Logger(ctx).Warningf("Node %s: tombstone for unknown node", t.NodeId)
77 continue
78 }
79 delete(nodes, n.Id)
80 }
81
82 s.sdRespMtx.Lock()
83
84 s.sdResp = nil
85 for _, n := range nodes {
86 // Only care about nodes that have all required configuration set.
87 if n.Status == nil || n.Status.ExternalAddress == "" || n.Roles == nil {
88 continue
89 }
90
91 s.sdResp = append(s.sdResp, sdTarget{
92 Targets: []string{n.Status.ExternalAddress},
93 Labels: map[string]string{
94 "__meta_metropolis_role_kubernetes_worker": fmt.Sprintf("%t", n.Roles.KubernetesWorker != nil),
95 "__meta_metropolis_role_kubernetes_controller": fmt.Sprintf("%t", n.Roles.KubernetesController != nil),
96 "__meta_metropolis_role_consensus_member": fmt.Sprintf("%t", n.Roles.ConsensusMember != nil),
97 },
98 })
99 }
100
101 s.sdRespMtx.Unlock()
102 }
103}
104
105func (s *Discovery) ServeHTTP(w http.ResponseWriter, r *http.Request) {
106 if r.Method != http.MethodGet {
107 http.Error(w, fmt.Sprintf("method %q not allowed", r.Method), http.StatusMethodNotAllowed)
108 return
109 }
110
111 s.sdRespMtx.RLock()
112 defer s.sdRespMtx.RUnlock()
113
114 // If sdResp is nil, which only happens if we are not a master node
115 // or we are still booting, we respond with NotImplemented.
116 if s.sdResp == nil {
117 w.WriteHeader(http.StatusServiceUnavailable)
118 return
119 }
120
121 w.Header().Set("Content-Type", "application/json")
122 w.WriteHeader(http.StatusOK)
123
124 if err := json.NewEncoder(w).Encode(s.sdResp); err != nil {
125 // If the encoder fails its mostly because of closed connections
126 // so lets just ignore these errors.
127 return
128 }
129}