// Copyright The Monogon Project Authors.
// SPDX-License-Identifier: Apache-2.0

package kubernetes

import (
	"context"
	"fmt"
	"regexp"
	"time"

	kerrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	applycorev1 "k8s.io/client-go/applyconfigurations/core/v1"
	"k8s.io/client-go/kubernetes"

	common "source.monogon.dev/metropolis/node"
	"source.monogon.dev/osbase/supervisor"

	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
	apb "source.monogon.dev/metropolis/proto/api"
)

// labelmaker is a service responsible for keeping Kubernetes node labels in sync
// with Metropolis data. Currently, it synchronizes Metropolis node roles into
// corresponding Kubernetes roles (implemented as labels).
//
// The labelmaker service runs on all Kubernetes controller nodes. This is safe,
// but it can cause spurious updates or update errors.
//
// TODO(q3k): leader elect a node responsible for this (and other Kubernetes
// control loops, too).
type labelmaker struct {
	clientSet kubernetes.Interface
	curator   ipb.CuratorClient
	mgmt      apb.ManagementClient
}

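// labelmakerFieldManager is the field manager name under which this service
// performs server-side apply on Kubernetes Node objects, and under which it
// extracts the labels it already owns when computing diffs.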
const labelmakerFieldManager string = "metropolis-label"

// getIntendedLabelsPerNode returns labels that a node should have according to
// Curator data and the given list of custom label regexes.
func getIntendedLabelsPerNode(labelRegexes []*regexp.Regexp, node *ipb.Node) common.Labels {
	labels := make(map[string]string)
	// Add role labels. Use empty string as content, following convention set by
	// kubeadm et al.
	if node.Roles.ConsensusMember != nil {
		labels["node-role.kubernetes.io/ConsensusMember"] = ""
	}
	if node.Roles.KubernetesController != nil {
		labels["node-role.kubernetes.io/KubernetesController"] = ""
	}
	if node.Roles.KubernetesWorker != nil {
		labels["node-role.kubernetes.io/KubernetesWorker"] = ""
	}

	// Add user managed labels.
	for _, pair := range node.Labels.Pairs {
		for _, regex := range labelRegexes {
			if regex.MatchString(pair.Key) {
				labels[pair.Key] = pair.Value
				break
			}
		}
	}
	return labels
}

// getCurrentLabelsForNode returns the current labels in Kubernetes for a given
// node, limited to the labels managed by the labelmaker's field manager.
//
// If the given node does not exist in Kubernetes, a nil map is returned.
func getCurrentLabelsForNode(ctx context.Context, clientset kubernetes.Interface, nid string) (common.Labels, error) {
	node, err := clientset.CoreV1().Nodes().Get(ctx, nid, metav1.GetOptions{})
	if kerrors.IsNotFound(err) {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}
	nac, err := applycorev1.ExtractNode(node, labelmakerFieldManager)
	if err != nil {
		return nil, err
	}
	// If there are no managed labels in this node, the Labels field will be nil. We
	// can't return that, as our caller would think it means that this node doesn't
	// exist in Kubernetes.
	labels := nac.Labels
	if labels == nil {
		labels = make(common.Labels)
	}
	return labels, nil
}

// run the labelmaker service.
func (l *labelmaker) run(ctx context.Context) error {
	for {
		ctxT, ctxC := context.WithTimeout(ctx, time.Minute*10)
		err := l.runInner(ctxT)
		// runInner will return an error either on context expiry (which should lead to a
		// silent restart) or any other error (either processing error or parent context
		// timeout) which should be bubbled up.
		if err == nil {
			// This shouldn't happen, but let's handle it just in case.
			ctxC()
			continue
		}
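		// The inner context expired while the parent is still alive: this is the
		// expected periodic restart, so silently start a new iteration.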
		if ctxT.Err() != nil && ctx.Err() == nil {
			// We should check with errors.Is whether the returned error is influenced by
			// context expiry, but gRPC apparently doesn't (always?) wrap the context error
			// cause into the returned error. Skipping this check might lead us to
			// misidentify other errors as context expiry if the two race, but there's not
			// much we can do about that.
			ctxC()
			continue
		}
		// Probably some other kind of error. Return it.
		ctxC()
		return err
	}
}

// runInner runs the labelmaker service with one active curator Watch call. This
// will be interrupted by the given context timing out (as provided by run) in
// order to ensure nodes that couldn't get processed have another chance later
// on.
func (l *labelmaker) runInner(ctx context.Context) error {
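	// Fetch the cluster configuration and compile the user-configured label
	// synchronization rules; invalid regexes are logged and skipped.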
	var labelRegexes []*regexp.Regexp
	info, err := l.mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
	if err != nil {
		return fmt.Errorf("could not get cluster info: %w", err)
	}
	if info.ClusterConfiguration != nil && info.ClusterConfiguration.Kubernetes != nil {
		for _, rule := range info.ClusterConfiguration.Kubernetes.NodeLabelsToSynchronize {
			if rule.Regexp != "" {
				re, err := regexp.Compile(rule.Regexp)
				if err != nil {
					supervisor.Logger(ctx).Warningf("Invalid regexp rule %q for node labels: %v", rule.Regexp, err)
					continue
				}
				labelRegexes = append(labelRegexes, re)
			}
		}
	}

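	// Watch all nodes in the cluster. The Curator first streams the current set
	// of nodes (the backlog), then follows up with incremental updates.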
	srv, err := l.curator.Watch(ctx, &ipb.WatchRequest{
		Kind: &ipb.WatchRequest_NodesInCluster_{
			NodesInCluster: &ipb.WatchRequest_NodesInCluster{},
		},
	})
	if err != nil {
		return err
	}
	defer srv.CloseSend()

	supervisor.Logger(ctx).Infof("Starting node watch with %d regexes", len(labelRegexes))

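	// Process watch events as they arrive. Per-node failures are only logged;
	// affected nodes get another chance when the watch is restarted by run.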
	for {
		ev, err := srv.Recv()
		if err != nil {
			return err
		}

		supervisor.Logger(ctx).Infof("Processing %d nodes...", len(ev.Nodes))

		for _, node := range ev.Nodes {
			if err := l.processClusterNode(ctx, node, labelRegexes); err != nil {
				supervisor.Logger(ctx).Warningf("Failed to process labels on node %s: %v", node.Id, err)
			}
		}

		if ev.Progress == ipb.WatchEvent_PROGRESS_LAST_BACKLOGGED {
			supervisor.Logger(ctx).Infof("Caught up with node backlog, now watching for updates.")
		}
	}
}

// processClusterNode runs the label reconciliation algorithm on a single node.
// It requests current labels from Kubernetes and issues a server-side apply
// operation to bring them into the intended state, if there is a diff.
func (l *labelmaker) processClusterNode(ctx context.Context, node *ipb.Node, labelRegexes []*regexp.Regexp) error {
	intent := getIntendedLabelsPerNode(labelRegexes, node)
	state, err := getCurrentLabelsForNode(ctx, l.clientSet, node.Id)
	if err != nil {
		return err
	}
	if state == nil {
		supervisor.Logger(ctx).V(1).Infof("Node %s does not exist in Kubernetes, skipping", node.Id)
		return nil
	}

	if intent.Equals(state) {
		return nil
	}
	supervisor.Logger(ctx).Infof("Updating labels on node %s... ", node.Id)
	for k, v := range state {
		if intent[k] != v {
			supervisor.Logger(ctx).Infof(" Removing %s=%q", k, v)
		}
	}
	for k, v := range intent {
		if state[k] != v {
			supervisor.Logger(ctx).Infof(" Adding %s=%q", k, v)
		}
	}

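	// Server-side apply an object containing only the intended labels under our
	// field manager. Force takes ownership of these label fields even if another
	// manager previously set them.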
	cfg := applycorev1.Node(node.Id)
	cfg.Labels = intent
	_, err = l.clientSet.CoreV1().Nodes().Apply(ctx, cfg, metav1.ApplyOptions{
		FieldManager: labelmakerFieldManager,
		Force:        true,
	})
	return err
}