package kubernetes

import (
	"context"
	"strings"
	"time"

	kerrors "k8s.io/apimachinery/pkg/api/errors"
	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	applycorev1 "k8s.io/client-go/applyconfigurations/core/v1"
	"k8s.io/client-go/kubernetes"

	common "source.monogon.dev/metropolis/node"
	"source.monogon.dev/osbase/supervisor"

	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
)

// labelmaker is a service responsible for keeping Kubernetes node labels in sync
// with Metropolis data. Currently, it synchronizes Metropolis node roles into
// corresponding Kubernetes roles (implemented as labels).
//
// The labelmaker service runs on all Kubernetes controller nodes. This is safe,
// but it can cause spurious updates or update errors.
//
// TODO(q3k): leader elect a node responsible for this (and other Kubernetes
// control loops, too).
type labelmaker struct {
	clientSet kubernetes.Interface
	curator   ipb.CuratorClient
}

// managedLabelPrefixes are string prefixes to the keys of labels that we manage.
// This is used to filter out labels when doing a local diff of state vs. intent,
// before Kubernetes server-side apply is used.
var managedLabelPrefixes = []string{
	"node-role.kubernetes.io/",
}

// filterManaged filters out all labels not falling under managedLabelPrefixes.
func filterManaged(l common.Labels) common.Labels {
	res := make(map[string]string)
	for k, v := range l {
		for _, prefix := range managedLabelPrefixes {
			if strings.HasPrefix(k, prefix) {
				res[k] = v
				break
			}
		}
	}
	return res
}

// getIntendedLabelsPerNode returns the labels that a node should have according
// to Curator data.
func getIntendedLabelsPerNode(node *ipb.Node) common.Labels {
	labels := make(map[string]string)
	// Mark role labels with an empty string as content, following the convention
	// set by kubeadm et al.
	if node.Roles.ConsensusMember != nil {
		labels["node-role.kubernetes.io/ConsensusMember"] = ""
	}
	if node.Roles.KubernetesController != nil {
		labels["node-role.kubernetes.io/KubernetesController"] = ""
	}
	if node.Roles.KubernetesWorker != nil {
		labels["node-role.kubernetes.io/KubernetesWorker"] = ""
	}
	return labels
}

// getCurrentLabelsForNode returns the current labels in Kubernetes for a given
// node.
//
// If the given node does not exist in Kubernetes, a nil label map (and no error)
// is returned.
func getCurrentLabelsForNode(ctx context.Context, clientset kubernetes.Interface, nid string) (common.Labels, error) {
	node, err := clientset.CoreV1().Nodes().Get(ctx, nid, v1.GetOptions{})
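	// Not-found is not treated as an error: the node simply hasn't registered
	// with Kubernetes yet.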
	if kerrors.IsNotFound(err) {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}
	return node.Labels, nil
}

// run the labelmaker service.
func (l *labelmaker) run(ctx context.Context) error {
	for {
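		// Bound each Watch session to ten minutes so that nodes which failed to
		// process get another chance in a fresh session.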
		ctxT, ctxC := context.WithTimeout(ctx, time.Minute*10)
		err := l.runInner(ctxT)
		// runInner returns an error either on expiry of the inner context (which
		// should lead to a silent restart) or on any other failure (a processing
		// error or parent context timeout) which should be bubbled up.
		if err == nil {
			// This shouldn't happen, but let's handle it just in case.
			ctxC()
			continue
		}
		if ctxT.Err() != nil && ctx.Err() == nil {
			// We should check with errors.Is whether the returned error is influenced by
			// context expiry, but gRPC apparently doesn't (always?) wrap the context error
			// cause into the returned error. Skipping this check might lead us to
			// misidentify other errors as context expiry if these two race, but there's
			// not much we can do about this.
			ctxC()
			continue
		}
		// Probably some other kind of error. Return it.
		ctxC()
		return err
	}
}

// runInner runs the labelmaker service with one active curator Watch call. This
// will be interrupted by the given context timing out (as provided by run) in
// order to ensure nodes that couldn't get processed have another chance later
// on.
func (l *labelmaker) runInner(ctx context.Context) error {
	srv, err := l.curator.Watch(ctx, &ipb.WatchRequest{
		Kind: &ipb.WatchRequest_NodesInCluster_{
			NodesInCluster: &ipb.WatchRequest_NodesInCluster{},
		},
	})
	if err != nil {
		return err
	}
	defer srv.CloseSend()

	for {
		ev, err := srv.Recv()
		if err != nil {
			return err
		}

		supervisor.Logger(ctx).Infof("Processing %d nodes...", len(ev.Nodes))

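		// Process each node independently: a failure on one node is logged and
		// skipped so that the remaining nodes still get reconciled.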
		for _, node := range ev.Nodes {
			if err := l.processClusterNode(ctx, node); err != nil {
				supervisor.Logger(ctx).Warningf("Failed to process labels on node %s: %v", node.Id, err)
			}
		}

		if ev.Progress == ipb.WatchEvent_PROGRESS_LAST_BACKLOGGED {
			supervisor.Logger(ctx).Infof("Caught up with node backlog, now watching for updates.")
		}
	}
}

// processClusterNode runs the label reconciliation algorithm on a single node.
// It requests the current labels from Kubernetes and, if there is a diff, issues
// a server-side-apply operation to bring them into the requested state.
func (l *labelmaker) processClusterNode(ctx context.Context, node *ipb.Node) error {
	intent := getIntendedLabelsPerNode(node)
	state, err := getCurrentLabelsForNode(ctx, l.clientSet, node.Id)
	if err != nil {
		return err
	}
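	// A nil state means the node does not exist in Kubernetes (yet); skip it.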
	if state == nil {
		return nil
	}
	intent = filterManaged(intent)
	state = filterManaged(state)

	if intent.Equals(state) {
		return nil
	}
	supervisor.Logger(ctx).Infof("Updating labels on node %s... ", node.Id)
	for k, v := range state {
		if intent[k] != v {
			supervisor.Logger(ctx).Infof(" Removing %s=%q", k, v)
		}
	}
	for k, v := range intent {
		if state[k] != v {
			supervisor.Logger(ctx).Infof(" Adding %s=%q", k, v)
		}
	}

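	// Server-side apply as a dedicated field manager: label keys previously
	// applied by this manager but absent from the new intent are pruned, and
	// Force resolves conflicts by taking ownership of the listed fields.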
	cfg := applycorev1.Node(node.Id)
	cfg.Labels = intent
	_, err = l.clientSet.CoreV1().Nodes().Apply(ctx, cfg, v1.ApplyOptions{
		FieldManager: "metropolis-labelmaker",
		Force:        true,
	})
	return err
}