blob: 8cd513f96161d2c84c58ba844f54c0aaa11b2689 [file] [log] [blame]
Serge Bazanski6d1ff362024-09-30 15:15:31 +00001package kubernetes
2
3import (
4 "context"
Serge Bazanskie99638e2024-09-30 17:06:44 +00005 "fmt"
6 "regexp"
Serge Bazanski6d1ff362024-09-30 15:15:31 +00007 "time"
8
9 kerrors "k8s.io/apimachinery/pkg/api/errors"
Serge Bazanskie99638e2024-09-30 17:06:44 +000010 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Serge Bazanski6d1ff362024-09-30 15:15:31 +000011 applycorev1 "k8s.io/client-go/applyconfigurations/core/v1"
12 "k8s.io/client-go/kubernetes"
13
14 common "source.monogon.dev/metropolis/node"
15 "source.monogon.dev/osbase/supervisor"
16
17 ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
Serge Bazanskie99638e2024-09-30 17:06:44 +000018 apb "source.monogon.dev/metropolis/proto/api"
Serge Bazanski6d1ff362024-09-30 15:15:31 +000019)
20
// labelmaker is a service responsible for keeping Kubernetes node labels in sync
// with Metropolis data. Currently, it synchronizes Metropolis node roles into
// corresponding Kubernetes roles (implemented as labels), as well as a
// configurable subset of user-managed node labels.
//
// The labelmaker service runs on all Kubernetes controller nodes. This is safe,
// but it can cause spurious updates or update errors.
//
// TODO(q3k): leader elect a node responsible for this (and other Kubernetes
// control loops, too).
type labelmaker struct {
	// clientSet is used to read node objects from Kubernetes and to
	// server-side-apply label updates to them.
	clientSet kubernetes.Interface
	// curator is used to watch cluster nodes and their Metropolis roles/labels.
	curator ipb.CuratorClient
	// mgmt is used to fetch cluster configuration, notably the label
	// synchronization rules.
	mgmt apb.ManagementClient
}
35
Serge Bazanskie99638e2024-09-30 17:06:44 +000036const labelmakerFieldManager string = "metropolis-label"
Serge Bazanski6d1ff362024-09-30 15:15:31 +000037
38// getIntendedLabelsPerNode returns labels that a node should have according to
Serge Bazanskie99638e2024-09-30 17:06:44 +000039// Curator data and the given list of custom label regexes.
40func getIntendedLabelsPerNode(labelRegexes []*regexp.Regexp, node *ipb.Node) common.Labels {
Serge Bazanski6d1ff362024-09-30 15:15:31 +000041 labels := make(map[string]string)
Serge Bazanskie99638e2024-09-30 17:06:44 +000042 // Add role labels. Use empty string as content, following convention set by
Serge Bazanski6d1ff362024-09-30 15:15:31 +000043 // kubeadm et al.
44 if node.Roles.ConsensusMember != nil {
45 labels["node-role.kubernetes.io/ConsensusMember"] = ""
46 }
47 if node.Roles.KubernetesController != nil {
48 labels["node-role.kubernetes.io/KubernetesController"] = ""
49 }
50 if node.Roles.KubernetesWorker != nil {
51 labels["node-role.kubernetes.io/KubernetesWorker"] = ""
52 }
Serge Bazanskie99638e2024-09-30 17:06:44 +000053
54 // Add user managed labels.
55 for _, pair := range node.Labels.Pairs {
56 for _, regex := range labelRegexes {
57 if regex.MatchString(pair.Key) {
58 labels[pair.Key] = pair.Value
59 break
60 }
61 }
62 }
Serge Bazanski6d1ff362024-09-30 15:15:31 +000063 return labels
64}
65
66// getCurrentLabelsForNode returns the current labels in Kubernetes for a given
67// node.
68//
69// If the given node does not exist in Kubernetes, an empty label map is returned.
70func getCurrentLabelsForNode(ctx context.Context, clientset kubernetes.Interface, nid string) (common.Labels, error) {
Serge Bazanskie99638e2024-09-30 17:06:44 +000071 node, err := clientset.CoreV1().Nodes().Get(ctx, nid, metav1.GetOptions{})
Serge Bazanski6d1ff362024-09-30 15:15:31 +000072 if kerrors.IsNotFound(err) {
73 return nil, nil
74 }
75 if err != nil {
76 return nil, err
77 }
Serge Bazanskie99638e2024-09-30 17:06:44 +000078 nac, err := applycorev1.ExtractNode(node, labelmakerFieldManager)
79 if err != nil {
80 return nil, err
81 }
82 // If there are no managed labels in this node, the Labels field will be nil. We
83 // can't return that, as our caller would think it means that this node doesn't
84 // exist in Kubernetes.
85 labels := nac.Labels
86 if labels == nil {
87 labels = make(common.Labels)
88 }
89 return labels, nil
Serge Bazanski6d1ff362024-09-30 15:15:31 +000090}
91
92// run the labelmaker service.
93func (l *labelmaker) run(ctx context.Context) error {
94 for {
95 ctxT, ctxC := context.WithTimeout(ctx, time.Minute*10)
96 err := l.runInner(ctxT)
97 // runInner will return an error either on context expiry (which should lead to a
98 // silent restart) or any other error (either processing error or parent context
99 // timeout) which should be bubbled up.
100 if err == nil {
101 // This shouldn't happen, but let's handle it just in case.
102 ctxC()
103 continue
104 }
105 if ctxT.Err() != nil && ctx.Err() == nil {
106 // We should check with errors.Is whether the returned error is influenced by
107 // context expiry, but gRPC apparently doesn't (always?) wrap the context error
108 // cause into the returned error. Skipping this check might lead us to
109 // misidentifying other errors as context expiry if these two race, but there's
110 // not much we can do about this.
111 ctxC()
112 continue
113 }
114 // Probably some other kind of error. Return it.
115 ctxC()
116 return err
117 }
118}
119
120// runInner runs the labelmaker service with one active curator Watch call. This
121// will be interrupted by the given context timing out (as provided by run) in
122// order to ensure nodes that couldn't get processed have another chance later
123// on.
124func (l *labelmaker) runInner(ctx context.Context) error {
Serge Bazanskie99638e2024-09-30 17:06:44 +0000125 var labelRegexes []*regexp.Regexp
126 info, err := l.mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
127 if err != nil {
128 return fmt.Errorf("could not get cluster info: %w", err)
129 }
Serge Bazanski78567602024-10-31 13:42:04 +0000130 if info.ClusterConfiguration != nil && info.ClusterConfiguration.Kubernetes != nil {
131 for _, rule := range info.ClusterConfiguration.Kubernetes.NodeLabelsToSynchronize {
Serge Bazanskie99638e2024-09-30 17:06:44 +0000132 if rule.Regexp != "" {
133 re, err := regexp.Compile(rule.Regexp)
134 if err != nil {
135 supervisor.Logger(ctx).Warningf("Invalid regexp rule %q for node labels: %v", rule.Regexp, err)
136 continue
137 }
138 labelRegexes = append(labelRegexes, re)
139 }
140 }
141 }
142
Serge Bazanski6d1ff362024-09-30 15:15:31 +0000143 srv, err := l.curator.Watch(ctx, &ipb.WatchRequest{
144 Kind: &ipb.WatchRequest_NodesInCluster_{
145 NodesInCluster: &ipb.WatchRequest_NodesInCluster{},
146 },
147 })
148 if err != nil {
149 return err
150 }
151 defer srv.CloseSend()
152
Serge Bazanskie99638e2024-09-30 17:06:44 +0000153 supervisor.Logger(ctx).Infof("Starting node watch with %d regexes", len(labelRegexes))
154
Serge Bazanski6d1ff362024-09-30 15:15:31 +0000155 for {
156 ev, err := srv.Recv()
157 if err != nil {
158 return err
159 }
160
161 supervisor.Logger(ctx).Infof("Processing %d nodes...", len(ev.Nodes))
162
163 for _, node := range ev.Nodes {
Serge Bazanskie99638e2024-09-30 17:06:44 +0000164 if err := l.processClusterNode(ctx, node, labelRegexes); err != nil {
Serge Bazanski6d1ff362024-09-30 15:15:31 +0000165 supervisor.Logger(ctx).Warningf("Failed to process labels on node %s: %v", node.Id, err)
166 }
167 }
168
169 if ev.Progress == ipb.WatchEvent_PROGRESS_LAST_BACKLOGGED {
170 supervisor.Logger(ctx).Infof("Caught up with node backlog, now watching for updates.")
171 }
172 }
173}
174
175// processClusterNodes runs the label reconciliation algorithm on a single node.
176// It requests current labels from Kubernetes and issues a server-side-apply
177// operation to bring them into the requested state, if there is a diff.
Serge Bazanskie99638e2024-09-30 17:06:44 +0000178func (l *labelmaker) processClusterNode(ctx context.Context, node *ipb.Node, labelRegexes []*regexp.Regexp) error {
179 intent := getIntendedLabelsPerNode(labelRegexes, node)
Serge Bazanski6d1ff362024-09-30 15:15:31 +0000180 state, err := getCurrentLabelsForNode(ctx, l.clientSet, node.Id)
181 if err != nil {
182 return err
183 }
184 if state == nil {
Serge Bazanskie99638e2024-09-30 17:06:44 +0000185 supervisor.Logger(ctx).V(1).Infof("Node %s does not exist in Kubernetes, skipping", node.Id)
Serge Bazanski6d1ff362024-09-30 15:15:31 +0000186 return nil
187 }
Serge Bazanski6d1ff362024-09-30 15:15:31 +0000188
189 if intent.Equals(state) {
190 return nil
191 }
192 supervisor.Logger(ctx).Infof("Updating labels on node %s... ", node.Id)
193 for k, v := range state {
194 if intent[k] != v {
195 supervisor.Logger(ctx).Infof(" Removing %s=%q", k, v)
196 }
197 }
198 for k, v := range intent {
199 if state[k] != v {
200 supervisor.Logger(ctx).Infof(" Adding %s=%q", k, v)
201 }
202 }
203
204 cfg := applycorev1.Node(node.Id)
205 cfg.Labels = intent
Serge Bazanskie99638e2024-09-30 17:06:44 +0000206 _, err = l.clientSet.CoreV1().Nodes().Apply(ctx, cfg, metav1.ApplyOptions{
207 FieldManager: labelmakerFieldManager,
Serge Bazanski6d1ff362024-09-30 15:15:31 +0000208 Force: true,
209 })
210 return err
211}