// Copyright The Monogon Project Authors.
// SPDX-License-Identifier: Apache-2.0

package reconciler

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
	"go.etcd.io/etcd/api/v3/mvccpb"
	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
	"google.golang.org/protobuf/proto"
	"k8s.io/client-go/kubernetes"

	"source.monogon.dev/metropolis/node/core/consensus/client"
	"source.monogon.dev/metropolis/node/core/curator"
	ppb "source.monogon.dev/metropolis/node/core/curator/proto/private"
	"source.monogon.dev/metropolis/node/core/productinfo"
	"source.monogon.dev/osbase/event/etcd"
	"source.monogon.dev/osbase/event/memory"
	"source.monogon.dev/osbase/supervisor"
	"source.monogon.dev/version"
	vpb "source.monogon.dev/version/spec"
)

// This file contains the reconciler Service, whose purpose is to run
// reconcileAll in a controlled way (leader elected and only when other nodes
// are compatible) and to set the reconciler status in etcd.
// The file also contains WaitReady, which watches the status and returns
// when the apiserver can start serving.
// These two form the public interface of the reconciler.

const (
	// statusKey is the key in the curator etcd namespace
	// under which the reconciler status is stored.
	// lead performs a transaction involving both this key and
	// the nodes prefix, so both must be in the same namespace.
	statusKey = "/kubernetes/reconciler/status"
	// electionPrefix is the etcd prefix where
	// a node is elected to run the reconciler.
	electionPrefix = "/kubernetes/reconciler/leader"
)

var (
	// minReconcilerRelease is the minimum Metropolis release which
	// the node last performing reconciliation must have
	// for the local node to be able to start serving.
	// Set this to the next release when making changes to reconciled state
	// which must be applied before starting to serve.
	minReconcilerRelease = &vpb.Version_Release{Major: 0, Minor: 1, Patch: 0}
	// minApiserverRelease is the minimum Metropolis release which all Kubernetes
	// controller nodes must have before the local node can reconcile.
	// This will be written to minimum_compatible_release in the reconciler status,
	// and thus block any reappearing apiservers with a lower release from serving,
	// until a reconciler of a lower release has run.
	// Increase this when making changes to reconciled state which are
	// incompatible with apiservers serving at the current minApiserverRelease.
	minApiserverRelease = &vpb.Version_Release{Major: 0, Minor: 1, Patch: 0}
)

// reconcileWait is the wait time between getting elected and
// starting to reconcile.
// It is a variable to allow changing it from tests.
var reconcileWait = 5 * time.Second

// WaitReady watches the reconciler status and returns once initial
// reconciliation is done and the reconciled state is compatible.
func (s *Service) WaitReady(ctx context.Context) error {
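	// value decodes the reconciler status protobuf stored at statusKey.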
	value := etcd.NewValue(s.Etcd, statusKey, func(_, data []byte) (*ppb.KubernetesReconcilerStatus, error) {
		status := &ppb.KubernetesReconcilerStatus{}
		if err := proto.Unmarshal(data, status); err != nil {
			return nil, fmt.Errorf("could not unmarshal: %w", err)
		}
		return status, nil
	})

	w := value.Watch()
	defer w.Close()

	for {
		status, err := w.Get(ctx)
		if err != nil {
			return err
		}

		state := "unknown"
		switch status.State {
		case ppb.KubernetesReconcilerStatus_STATE_DONE:
			state = "done"
		case ppb.KubernetesReconcilerStatus_STATE_WORKING:
			state = "working"
		}
		supervisor.Logger(ctx).Infof("Reconciler status: %s, version: %s, minimum compatible release: %s. Local node version: %s, minimum reconciler release: %s.",
			state,
			version.Semver(status.Version),
			version.Release(status.MinimumCompatibleRelease),
			version.Semver(productinfo.Get().Version),
			version.Release(minReconcilerRelease),
		)

		if version.ReleaseLessThan(productinfo.Get().Version.Release, status.MinimumCompatibleRelease) {
			supervisor.Logger(ctx).Info("Not ready, because the local node release is below the reconciler minimum compatible release. Waiting for status change.")
			continue
		}

		if version.ReleaseLessThan(status.Version.Release, minReconcilerRelease) {
			supervisor.Logger(ctx).Info("Not ready, because the reconciler release is below the local required minimum. Waiting for status change.")
			continue
		}

		// Startup is intentionally not blocked by state=working.
		// As long as a node is compatible with both the before and after state,
		// it can continue running, and startup should also be allowed.
		// This way, disruption is minimized in case reconciliation fails
		// to complete and the status stays in working state.
		// For the initial reconcile, a status is only created after it is complete.

		return nil
	}
}

// Service is the reconciler service.
type Service struct {
	// Etcd is an etcd client for the curator namespace.
	Etcd client.Namespaced
	// ClientSet is what the reconciler uses to interact with the apiserver.
	ClientSet kubernetes.Interface
	// NodeID is the ID of the local node.
	NodeID string
	// releases is set by watchNodes and watched by other parts of the service.
	releases memory.Value[*nodeReleases]
}

// nodeReleases contains a summary of the releases of all
// Kubernetes controller nodes currently in the cluster.
type nodeReleases struct {
	minRelease *vpb.Version_Release
	maxRelease *vpb.Version_Release
	// revision is the etcd revision at which this info is valid.
	revision int64
}

// The reconciler service has a tree of runnables:
//
//   - watch-nodes: Watches nodes in etcd and sets releases.
//   - watch-releases: Watches releases and runs elect while the local node is
//     the latest release.
//   - elect: Performs etcd leader election and starts lead once elected.
//   - lead: Checks current status, watches releases until incompatible
//     nodes disappear, updates status, runs reconcileAll.

// Run is the root runnable of the reconciler service.
func (s *Service) Run(ctx context.Context) error {
	err := supervisor.Run(ctx, "watch-nodes", s.watchNodes)
	if err != nil {
		return fmt.Errorf("could not run watch-nodes: %w", err)
	}

	err = supervisor.Run(ctx, "watch-releases", s.watchReleases)
	if err != nil {
		return fmt.Errorf("could not run watch-releases: %w", err)
	}

	supervisor.Signal(ctx, supervisor.SignalHealthy)
	supervisor.Signal(ctx, supervisor.SignalDone)
	return nil
}

// watchNodes watches nodes in etcd, and publishes a summary of
// releases of Kubernetes controller nodes in s.releases.
func (s *Service) watchNodes(ctx context.Context) error {
	nodesStart, nodesEnd := curator.NodeEtcdPrefix.KeyRange()

	var revision int64
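	// nodeToRelease maps a node's etcd key to its release string,
	// releaseCount counts how many controller nodes run each release, and
	// releaseStruct keeps one parsed release per release string so that
	// publish can compute the minimum and maximum.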
	nodeToRelease := make(map[string]string)
	releaseCount := make(map[string]int)
	releaseStruct := make(map[string]*vpb.Version_Release)

	updateNode := func(kv *mvccpb.KeyValue) {
		nodeKey := string(kv.Key)
		// Subtract the previous release of this node if any.
		if prevRelease, ok := nodeToRelease[nodeKey]; ok {
			delete(nodeToRelease, nodeKey)
			releaseCount[prevRelease] -= 1
			if releaseCount[prevRelease] == 0 {
				delete(releaseCount, prevRelease)
				delete(releaseStruct, prevRelease)
			}
		}

		// Parse the node release. Skip if the node was deleted, is not a
		// Kubernetes controller, or does not have a release.
		if len(kv.Value) == 0 {
			return
		}
		var node ppb.Node
		if err := proto.Unmarshal(kv.Value, &node); err != nil {
			supervisor.Logger(ctx).Errorf("Failed to unmarshal node %q: %v", nodeKey, err)
			return
		}
		if node.Roles.KubernetesController == nil {
			return
		}
		if node.Status == nil || node.Status.Version == nil {
			return
		}
		release := version.Release(node.Status.Version.Release)
		// Add the new release.
		nodeToRelease[nodeKey] = release
		if releaseCount[release] == 0 {
			releaseStruct[release] = node.Status.Version.Release
		}
		releaseCount[release] += 1
	}

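	// publish computes the minimum and maximum release across all known
	// Kubernetes controller nodes, starting from the local node's own
	// release, and publishes the result together with the etcd revision.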
	publish := func() {
		minRelease := productinfo.Get().Version.Release
		maxRelease := productinfo.Get().Version.Release
		for _, release := range releaseStruct {
			if version.ReleaseLessThan(release, minRelease) {
				minRelease = release
			}
			if version.ReleaseLessThan(maxRelease, release) {
				maxRelease = release
			}
		}
		s.releases.Set(&nodeReleases{
			minRelease: minRelease,
			maxRelease: maxRelease,
			revision:   revision,
		})
	}

	// Get the initial nodes data.
	get, err := s.Etcd.Get(ctx, nodesStart, clientv3.WithRange(nodesEnd))
	if err != nil {
		return fmt.Errorf("when retrieving initial nodes: %w", err)
	}

	for _, kv := range get.Kvs {
		updateNode(kv)
	}
	revision = get.Header.Revision
	publish()

	supervisor.Signal(ctx, supervisor.SignalHealthy)

	// Watch for changes.
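	// WithRev(revision+1) resumes the watch right after the snapshot
	// revision of the initial Get, so no updates are missed in between.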
	wch := s.Etcd.Watch(ctx, nodesStart, clientv3.WithRange(nodesEnd), clientv3.WithRev(revision+1))
	for resp := range wch {
		if err := resp.Err(); err != nil {
			return fmt.Errorf("watch failed: %w", err)
		}
		for _, ev := range resp.Events {
			updateNode(ev.Kv)
		}
		revision = resp.Header.Revision
		publish()
	}
	return fmt.Errorf("channel closed: %w", ctx.Err())
}

// watchReleases watches s.releases, and runs elect for as long as
// the local node has the latest release.
func (s *Service) watchReleases(ctx context.Context) error {
	w := s.releases.Watch()
	defer w.Close()

	r, err := w.Get(ctx)
	if err != nil {
		return err
	}

	shouldRun := !version.ReleaseLessThan(productinfo.Get().Version.Release, r.maxRelease)
	if shouldRun {
		supervisor.Logger(ctx).Info("This Kubernetes controller node has the latest release, starting election.")
		err := supervisor.Run(ctx, "elect", s.elect)
		if err != nil {
			return fmt.Errorf("could not run elect: %w", err)
		}
	} else {
		supervisor.Logger(ctx).Infof("This Kubernetes controller node does not have the latest release, not starting election. Latest release: %s", version.Release(r.maxRelease))
	}

	supervisor.Signal(ctx, supervisor.SignalHealthy)

	for {
		r, err := w.Get(ctx)
		if err != nil {
			return err
		}
		shouldRunNow := !version.ReleaseLessThan(productinfo.Get().Version.Release, r.maxRelease)
		if shouldRunNow != shouldRun {
			return errors.New("latest release changed, restarting")
		}
	}
}

func (s *Service) elect(ctx context.Context) error {
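	// The concurrency session creates an etcd lease which is kept alive in
	// the background; the leader election below is tied to this lease.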
	session, err := concurrency.NewSession(s.Etcd.ThinClient(ctx))
	if err != nil {
		return fmt.Errorf("creating session failed: %w", err)
	}

	defer func() {
		session.Orphan()
		// ctx may be canceled, but we still try to revoke with a short timeout.
		revokeCtx, cancel := context.WithTimeout(context.Background(), time.Second)
		_, err := s.Etcd.Revoke(revokeCtx, session.Lease())
		cancel()
		if err != nil {
			supervisor.Logger(ctx).Warningf("Failed to revoke lease: %v", err)
		}
	}()

	supervisor.Signal(ctx, supervisor.SignalHealthy)

	supervisor.Logger(ctx).Infof("Campaigning. Lease ID: %x", session.Lease())
	election := concurrency.NewElection(session, electionPrefix)

	// The election value is unused; we put the node ID there for manual inspection.
	err = election.Campaign(ctx, s.NodeID)
	if err != nil {
		return fmt.Errorf("campaigning failed: %w", err)
	}
	supervisor.Logger(ctx).Info("Elected.")
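	// Once the session ends, the lease may have expired and leadership may
	// have been lost, so cancel lead.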
	leadCtx, leadCancel := context.WithCancel(ctx)
	go func() {
		<-session.Done()
		leadCancel()
	}()

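	// isLeaderCmp compares the create revision of our election key with the
	// revision at which we won the election. Adding it to a transaction's If
	// clause makes the transaction apply only while we still hold leadership.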
	isLeaderCmp := clientv3.Compare(clientv3.CreateRevision(election.Key()), "=", election.Rev())
	return s.lead(leadCtx, isLeaderCmp)
}

func (s *Service) lead(ctx context.Context, isLeaderCmp clientv3.Cmp) error {
	log := supervisor.Logger(ctx)

	// Retrieve the initial status.
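	// A nil status afterwards means that no valid status exists yet, and
	// that a full reconcile must happen before the status is written.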
	status := &ppb.KubernetesReconcilerStatus{}
	statusGet, err := s.Etcd.Get(ctx, statusKey)
	if err != nil {
		return fmt.Errorf("when getting status: %w", err)
	}
	if len(statusGet.Kvs) == 1 {
		err := proto.Unmarshal(statusGet.Kvs[0].Value, status)
		if err != nil {
			log.Warningf("Could not unmarshal status: %v", err)
			status = nil
		}
	} else {
		status = nil
	}

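	// doneStatus is the status we will store once reconciliation completes.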
	doneStatus := &ppb.KubernetesReconcilerStatus{
		State:                    ppb.KubernetesReconcilerStatus_STATE_DONE,
		Version:                  productinfo.Get().Version,
		MinimumCompatibleRelease: minApiserverRelease,
	}
	doneStatusBytes, err := proto.Marshal(doneStatus)
	if err != nil {
		return fmt.Errorf("could not marshal status: %w", err)
	}

	if status == nil {
		// The status does not exist yet. Reconcile, then create the status.
		log.Info("Status does not exist yet.")
	} else if proto.Equal(status, doneStatus) {
		// The status is already what we would set, so leave it as is.
		log.Info("Status is already up to date.")
	} else if !version.ReleaseLessThan(productinfo.Get().Version.Release, status.Version.Release) &&
		!version.ReleaseLessThan(status.MinimumCompatibleRelease, minApiserverRelease) {
		// The status does not allow apiservers to start serving which would be
		// incompatible after we reconcile. So just set the state to working.
		log.Info("Status is compatible, setting state to working.")
		if status.State != ppb.KubernetesReconcilerStatus_STATE_WORKING {
			status.State = ppb.KubernetesReconcilerStatus_STATE_WORKING

			workingStatusBytes, err := proto.Marshal(status)
			if err != nil {
				return fmt.Errorf("could not marshal status: %w", err)
			}
			resp, err := s.Etcd.Txn(ctx).If(isLeaderCmp).Then(
				clientv3.OpPut(statusKey, string(workingStatusBytes)),
			).Commit()
			if err != nil {
				return fmt.Errorf("failed to update status: %w", err)
			}
			if !resp.Succeeded {
				return errors.New("lost leadership, could not update status")
			}
		}
	} else {
		// The status allows apiservers to start which would be incompatible after
		// we reconcile. We need to wait for any such nodes to disappear, then set
		// the status to disallow these nodes from starting before reconciling.
		// While reconciliation is ongoing, we are in an intermediate state
		// between the previous and the new reconciled state, and we only want
		// to allow nodes that are compatible with both. So we use the minimum of
		// the two versions and the maximum of the two MinimumCompatibleReleases,
		// which results in allowing the intersection of the two statuses.
		log.Info("Status allows incompatible releases, need to restrict.")

		status.State = ppb.KubernetesReconcilerStatus_STATE_WORKING
		if !version.ReleaseLessThan(status.Version.Release, productinfo.Get().Version.Release) {
			status.Version = productinfo.Get().Version
		}
		if version.ReleaseLessThan(status.MinimumCompatibleRelease, minApiserverRelease) {
			status.MinimumCompatibleRelease = minApiserverRelease
		}
		restrictedStatusBytes, err := proto.Marshal(status)
		if err != nil {
			return fmt.Errorf("could not marshal status: %w", err)
		}

		releasesW := s.releases.Watch()
		defer releasesW.Close()

		lastLogRelease := ""
		for {
			releases, err := releasesW.Get(ctx)
			if err != nil {
				return err
			}
			if version.ReleaseLessThan(productinfo.Get().Version.Release, releases.maxRelease) {
				// We will likely get canceled soon by watchReleases restarting, unless
				// this is a very short transient that is not noticed by watchReleases.
				continue
			}
			if version.ReleaseLessThan(releases.minRelease, minApiserverRelease) {
				rel := version.Release(releases.minRelease)
				if rel != lastLogRelease {
					lastLogRelease = rel
					log.Infof("There are incompatible nodes, waiting for node changes. Minimum node release: %s. Need at least: %s", rel, version.Release(minApiserverRelease))
				}
				continue
			}

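			// Only write the restricted status if no node has been modified
			// since the revision at which releases was computed; otherwise
			// retry with fresh data from the nodes watcher.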
			nodesStart, nodesEnd := curator.NodeEtcdPrefix.KeyRange()
			resp, err := s.Etcd.Txn(ctx).If(
				isLeaderCmp,
				clientv3.Compare(clientv3.ModRevision(nodesStart).WithRange(nodesEnd), "<", releases.revision+1),
			).Then(
				clientv3.OpPut(statusKey, string(restrictedStatusBytes)),
			).Commit()
			if err != nil {
				return fmt.Errorf("failed to update status: %w", err)
			}
			if !resp.Succeeded {
				// This could happen either if we lost leadership, or any node was
				// modified since we got the releases. If a node was modified, this
				// should be seen soon by the nodes watcher. If we lost leadership,
				// we will get canceled soon, and it's fine to go back to watching.
				log.Info("Transaction failed, retrying.")
				continue
			}
			break
		}
	}

	if status != nil {
		// A status exists, which means a reconciler has been running before.
		// Wait a bit for any still outstanding Kubernetes API requests by the
		// previous reconciler to be processed.
		// The Kubernetes API does not support making requests conditional on an
		// etcd lease, so requests can still be processed after leadership expired.
		// This is best effort, since requests could take arbitrarily long to be
		// processed. The periodic reconcile below ensures that we eventually
		// reach the desired state and stay there.
		select {
		case <-time.After(reconcileWait):
		case <-ctx.Done():
			return ctx.Err()
		}
	}

	log.Info("Performing initial resource reconciliation...")
	// If the apiserver was just started, reconciliation will fail until the
	// apiserver is ready. To keep the logs clean, retry with exponential
	// backoff and only start logging errors after some time has passed.
	startLogging := time.Now().Add(2 * time.Second)
	bo := backoff.NewExponentialBackOff()
	bo.InitialInterval = 100 * time.Millisecond
	bo.MaxElapsedTime = 0
	err = backoff.Retry(func() error {
		err := reconcileAll(ctx, s.ClientSet)
		if err != nil && time.Now().After(startLogging) {
			log.Errorf("Still couldn't do initial reconciliation: %v", err)
			startLogging = time.Now().Add(10 * time.Second)
		}
		return err
	}, backoff.WithContext(bo, ctx))
	if err != nil {
		return err
	}
	log.Infof("Initial resource reconciliation succeeded.")

	// Update status.
	if !proto.Equal(status, doneStatus) {
		resp, err := s.Etcd.Txn(ctx).If(isLeaderCmp).Then(
			clientv3.OpPut(statusKey, string(doneStatusBytes)),
		).Commit()
		if err != nil {
			return fmt.Errorf("failed to update status: %w", err)
		}
		if !resp.Succeeded {
			return errors.New("lost leadership, could not update status")
		}
	}

	// Reconcile at a regular interval.
	t := time.NewTicker(30 * time.Second)
	defer t.Stop()
	for {
		select {
		case <-t.C:
			err := reconcileAll(ctx, s.ClientSet)
			if err != nil {
				log.Warning(err)
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}