blob: fea45824aeb44583ee567e19ddce690c04c99b73 [file] [log] [blame]
// Copyright The Monogon Project Authors.
// SPDX-License-Identifier: Apache-2.0

Tim Windelschmidtf64f1972023-07-28 00:00:50 +00004package metrics
5
6import (
7 "context"
8 "encoding/json"
9 "fmt"
10 "net/http"
11 "sync"
12
Serge Bazanski60461b22023-10-26 19:16:59 +020013 "source.monogon.dev/go/types/mapsets"
14 "source.monogon.dev/metropolis/node/core/curator/watcher"
Tim Windelschmidt9f21f532024-05-07 15:14:20 +020015 "source.monogon.dev/osbase/supervisor"
Serge Bazanski60461b22023-10-26 19:16:59 +020016
17 ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000018)
19
20type Discovery struct {
21 Curator ipb.CuratorClient
22
23 // sdResp will contain the cached sdResponse
Serge Bazanski60461b22023-10-26 19:16:59 +020024 sdResp mapsets.OrderedMap[string, sdTarget]
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000025 // sdRespMtx is the mutex for sdResp to allow usage inside the http handler.
26 sdRespMtx sync.RWMutex
27}
28
// sdTarget is one entry of the service discovery response: a group of targets
// sharing a single set of labels. ServeHTTP encodes a list of these as JSON,
// so field order and tags define the wire format.
type sdTarget struct {
	// Targets lists the addresses of this entry; Run fills in the node's
	// external address.
	Targets []string `json:"targets"`
	// Labels are attached to every address in Targets.
	Labels map[string]string `json:"labels"`
}
33
34// Run is the sub-runnable responsible for fetching and serving node updates.
35func (s *Discovery) Run(ctx context.Context) error {
36 supervisor.Signal(ctx, supervisor.SignalHealthy)
37
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000038 defer func() {
39 s.sdRespMtx.Lock()
40 // disable the metrics endpoint until the new routine takes over
Serge Bazanski60461b22023-10-26 19:16:59 +020041 s.sdResp.Clear()
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000042 s.sdRespMtx.Unlock()
43 }()
44
Serge Bazanski60461b22023-10-26 19:16:59 +020045 return watcher.WatchNodes(ctx, s.Curator, watcher.SimpleFollower{
46 FilterFn: func(a *ipb.Node) bool {
47 if a.Status == nil {
48 return false
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000049 }
Serge Bazanski60461b22023-10-26 19:16:59 +020050 if a.Status.ExternalAddress == "" {
51 return false
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000052 }
Serge Bazanski60461b22023-10-26 19:16:59 +020053 if a.Roles == nil {
54 return false
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000055 }
Serge Bazanski60461b22023-10-26 19:16:59 +020056 return true
57 },
58 EqualsFn: func(a *ipb.Node, b *ipb.Node) bool {
59 if (a.Roles.ConsensusMember == nil) != (b.Roles.ConsensusMember == nil) {
60 return false
61 }
62 if (a.Roles.KubernetesController == nil) != (b.Roles.KubernetesController == nil) {
63 return false
64 }
65 if (a.Roles.ConsensusMember == nil) != (b.Roles.ConsensusMember == nil) {
66 return false
67 }
68 if a.Status.ExternalAddress != b.Status.ExternalAddress {
69 return false
70 }
71 return true
72 },
73 OnNewUpdated: func(new *ipb.Node) error {
74 s.sdRespMtx.Lock()
75 defer s.sdRespMtx.Unlock()
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000076
Serge Bazanski60461b22023-10-26 19:16:59 +020077 s.sdResp.Insert(new.Id, sdTarget{
78 Targets: []string{new.Status.ExternalAddress},
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000079 Labels: map[string]string{
Serge Bazanski60461b22023-10-26 19:16:59 +020080 "__meta_metropolis_role_kubernetes_worker": fmt.Sprintf("%t", new.Roles.KubernetesWorker != nil),
81 "__meta_metropolis_role_kubernetes_controller": fmt.Sprintf("%t", new.Roles.KubernetesController != nil),
82 "__meta_metropolis_role_consensus_member": fmt.Sprintf("%t", new.Roles.ConsensusMember != nil),
Lorenz Bruneb2520f2023-11-19 07:04:15 +010083 "__meta_metropolis_node": new.Id,
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000084 },
85 })
Serge Bazanski60461b22023-10-26 19:16:59 +020086 return nil
87 },
88 OnDeleted: func(prev *ipb.Node) error {
89 s.sdRespMtx.Lock()
90 defer s.sdRespMtx.Unlock()
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000091
Serge Bazanski60461b22023-10-26 19:16:59 +020092 s.sdResp.Delete(prev.Id)
93 return nil
94 },
95 })
Tim Windelschmidtf64f1972023-07-28 00:00:50 +000096}
97
98func (s *Discovery) ServeHTTP(w http.ResponseWriter, r *http.Request) {
99 if r.Method != http.MethodGet {
100 http.Error(w, fmt.Sprintf("method %q not allowed", r.Method), http.StatusMethodNotAllowed)
101 return
102 }
103
104 s.sdRespMtx.RLock()
105 defer s.sdRespMtx.RUnlock()
106
Serge Bazanski60461b22023-10-26 19:16:59 +0200107 // If sdResp is empty, respond with Service Unavailable. This will only happen
108 // early enough in the lifecycle of a control plane node that it doesn't know
109 // about itself, or if this is not a control plane node:
110 if s.sdResp.Count() == 0 {
Tim Windelschmidtf64f1972023-07-28 00:00:50 +0000111 w.WriteHeader(http.StatusServiceUnavailable)
112 return
113 }
114
115 w.Header().Set("Content-Type", "application/json")
116 w.WriteHeader(http.StatusOK)
117
Serge Bazanski60461b22023-10-26 19:16:59 +0200118 // Turn into a plain array as expected by the service discovery API.
119 var res []sdTarget
120 for _, v := range s.sdResp.Values() {
121 res = append(res, v.Value)
122 }
123 if err := json.NewEncoder(w).Encode(res); err != nil {
Tim Windelschmidtf64f1972023-07-28 00:00:50 +0000124 // If the encoder fails its mostly because of closed connections
125 // so lets just ignore these errors.
126 return
127 }
128}