package reconciler

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
	"go.etcd.io/etcd/api/v3/mvccpb"
	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
	"google.golang.org/protobuf/proto"
	"k8s.io/client-go/kubernetes"

	"source.monogon.dev/metropolis/node/core/consensus/client"
	"source.monogon.dev/metropolis/node/core/curator"
	ppb "source.monogon.dev/metropolis/node/core/curator/proto/private"
	"source.monogon.dev/metropolis/pkg/event/etcd"
	"source.monogon.dev/metropolis/pkg/event/memory"
	"source.monogon.dev/metropolis/pkg/supervisor"
	mversion "source.monogon.dev/metropolis/version"
	"source.monogon.dev/version"
	vpb "source.monogon.dev/version/spec"
)

// This file contains the reconciler Service, whose purpose is to run
// reconcileAll in a controlled way (leader elected and only when other nodes
// are compatible) and to set the reconciler status in etcd.
// The file also contains WaitReady, which watches the status and returns
// when the apiserver can start serving.
// These two form the public interface of the reconciler.
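//
// As a rough sketch, a caller outside this package might wire the two
// together as follows (etcdClient, clientSet and nodeID are illustrative
// placeholders, not the actual plumbing):
//
//	svc := &reconciler.Service{Etcd: etcdClient, ClientSet: clientSet, NodeID: nodeID}
//	if err := supervisor.Run(ctx, "reconciler", svc.Run); err != nil {
//		return err
//	}
//	// Before the apiserver starts serving:
//	if err := reconciler.WaitReady(ctx, etcdClient); err != nil {
//		return err
//	}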

const (
	// statusKey is the key in the curator etcd namespace
	// under which the reconciler status is stored.
	// lead performs a transaction involving both this key and
	// the nodes prefix, so both must be in the same namespace.
	statusKey = "/kubernetes/reconciler/status"
	// electionPrefix is the etcd prefix where
	// a node is elected to run the reconciler.
	electionPrefix = "/kubernetes/reconciler/leader"
)

var (
	// minReconcilerRelease is the minimum Metropolis release which
	// the node last performing reconciliation must have
	// for the local node to be able to start serving.
	// Set this to the next release when making changes to reconciled state
	// which must be applied before starting to serve.
	minReconcilerRelease = &vpb.Version_Release{Major: 0, Minor: 1, Patch: 0}
	// minApiserverRelease is the minimum Metropolis release which all Kubernetes
	// controller nodes must have before the local node can reconcile.
	// This will be written to minimum_compatible_release in the reconciler status,
	// and thus block any reappearing apiservers with a lower release from serving,
	// until a reconciler of a lower release has run.
	// Increase this when making changes to reconciled state which are
	// incompatible with apiservers serving at the current minApiserverRelease.
	minApiserverRelease = &vpb.Version_Release{Major: 0, Minor: 1, Patch: 0}
)
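
// To make the two knobs above concrete with made-up release numbers: suppose
// release 0.2.0 changes reconciled state such that 0.1.x apiservers can no
// longer serve against it. That change would raise minApiserverRelease to
// 0.2.0: before restricting an existing, more permissive status, a 0.2.0
// reconciler waits until no 0.1.x controller nodes remain, and the status it
// writes blocks reappearing 0.1.x apiservers from serving. If 0.2.0
// apiservers additionally need the new state to exist before they can serve,
// minReconcilerRelease would be raised to 0.2.0 as well.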

// reconcileWait is the wait time between getting elected and
// starting to reconcile.
// It is a variable to allow changing it from tests.
var reconcileWait = 5 * time.Second

// WaitReady watches the reconciler status and returns once initial
// reconciliation is done and the reconciled state is compatible.
func WaitReady(ctx context.Context, etcdClient client.Namespaced) error {
	value := etcd.NewValue(etcdClient, statusKey, func(_, data []byte) (*ppb.KubernetesReconcilerStatus, error) {
		status := &ppb.KubernetesReconcilerStatus{}
		if err := proto.Unmarshal(data, status); err != nil {
			return nil, fmt.Errorf("could not unmarshal: %w", err)
		}
		return status, nil
	})

	w := value.Watch()
	defer w.Close()

	for {
		status, err := w.Get(ctx)
		if err != nil {
			return err
		}

		state := "unknown"
		switch status.State {
		case ppb.KubernetesReconcilerStatus_STATE_DONE:
			state = "done"
		case ppb.KubernetesReconcilerStatus_STATE_WORKING:
			state = "working"
		}
		supervisor.Logger(ctx).Infof("Reconciler status: %s, version: %s, minimum compatible release: %s. Local node version: %s, minimum reconciler release: %s.",
			state,
			version.Semver(status.Version),
			version.Release(status.MinimumCompatibleRelease),
			version.Semver(mversion.Version),
			version.Release(minReconcilerRelease),
		)

		if version.ReleaseLessThan(mversion.Version.Release, status.MinimumCompatibleRelease) {
			supervisor.Logger(ctx).Info("Not ready, because the local node release is below the reconciler minimum compatible release. Waiting for status change.")
			continue
		}

		if version.ReleaseLessThan(status.Version.Release, minReconcilerRelease) {
			supervisor.Logger(ctx).Info("Not ready, because the reconciler release is below the local required minimum. Waiting for status change.")
			continue
		}

		// Startup is intentionally not blocked by state=working.
		// As long as a node is compatible with both the before and after state,
		// it can continue running, and startup should also be allowed.
		// This way, disruption is minimized in case reconciliation fails
		// to complete and the status stays in working state.
		// For the initial reconcile, a status is only created after it is complete.

		return nil
	}
}

// Service is the reconciler service.
type Service struct {
	// Etcd is an etcd client for the curator namespace.
	Etcd client.Namespaced
	// ClientSet is what the reconciler uses to interact with the apiserver.
	ClientSet kubernetes.Interface
	// NodeID is the ID of the local node.
	NodeID string
	// releases is set by watchNodes and watched by other parts of the service.
	releases memory.Value[*nodeReleases]
}

// nodeReleases contains a summary of the releases of all
// Kubernetes controller nodes currently in the cluster.
type nodeReleases struct {
	minRelease *vpb.Version_Release
	maxRelease *vpb.Version_Release
	// revision is the etcd revision at which this info is valid.
	revision int64
}

// The reconciler service has a tree of runnables:
//
// - watch-nodes: Watches nodes in etcd and sets releases.
// - watch-releases: Watches releases and runs elect while the local node is
//   the latest release.
// - elect: Performs etcd leader election and starts lead once elected.
// - lead: Checks current status, watches releases until incompatible
//   nodes disappear, updates status, runs reconcileAll.

// Run is the root runnable of the reconciler service.
func (s *Service) Run(ctx context.Context) error {
	err := supervisor.Run(ctx, "watch-nodes", s.watchNodes)
	if err != nil {
		return fmt.Errorf("could not run watch-nodes: %w", err)
	}

	err = supervisor.Run(ctx, "watch-releases", s.watchReleases)
	if err != nil {
		return fmt.Errorf("could not run watch-releases: %w", err)
	}

	supervisor.Signal(ctx, supervisor.SignalHealthy)
	supervisor.Signal(ctx, supervisor.SignalDone)
	return nil
}

// watchNodes watches nodes in etcd, and publishes a summary of
// releases of Kubernetes controller nodes in s.releases.
func (s *Service) watchNodes(ctx context.Context) error {
	nodesStart, nodesEnd := curator.NodeEtcdPrefix.KeyRange()

	var revision int64
	nodeToRelease := make(map[string]string)
	releaseCount := make(map[string]int)
	releaseStruct := make(map[string]*vpb.Version_Release)

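	// updateNode updates the bookkeeping above for a single node key/value
	// pair from etcd, removing the node's previous contribution (if any)
	// before accounting for its current state.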
	updateNode := func(kv *mvccpb.KeyValue) {
		nodeKey := string(kv.Key)
		// Subtract the previous release of this node if any.
		if prevRelease, ok := nodeToRelease[nodeKey]; ok {
			delete(nodeToRelease, nodeKey)
			releaseCount[prevRelease] -= 1
			if releaseCount[prevRelease] == 0 {
				delete(releaseCount, prevRelease)
				delete(releaseStruct, prevRelease)
			}
		}

		// Parse the node release. Skip if the node was deleted, is not a
		// Kubernetes controller, or does not have a release.
		if len(kv.Value) == 0 {
			return
		}
		node := ppb.Node{}
		if err := proto.Unmarshal(kv.Value, &node); err != nil {
			supervisor.Logger(ctx).Errorf("Failed to unmarshal node %q: %v", nodeKey, err)
			return
		}
		if node.Roles.KubernetesController == nil {
			return
		}
		if node.Status == nil || node.Status.Version == nil {
			return
		}
		release := version.Release(node.Status.Version.Release)
		// Add the new release.
		nodeToRelease[nodeKey] = release
		if releaseCount[release] == 0 {
			releaseStruct[release] = node.Status.Version.Release
		}
		releaseCount[release] += 1
	}

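	// publish recomputes the minimum and maximum release across the known
	// Kubernetes controller nodes, seeded with the local node's own release,
	// and publishes the summary together with the etcd revision at which it
	// is valid.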
	publish := func() {
		minRelease := mversion.Version.Release
		maxRelease := mversion.Version.Release
		for _, release := range releaseStruct {
			if version.ReleaseLessThan(release, minRelease) {
				minRelease = release
			}
			if version.ReleaseLessThan(maxRelease, release) {
				maxRelease = release
			}
		}
		s.releases.Set(&nodeReleases{
			minRelease: minRelease,
			maxRelease: maxRelease,
			revision:   revision,
		})
	}

	// Get the initial nodes data.
	get, err := s.Etcd.Get(ctx, nodesStart, clientv3.WithRange(nodesEnd))
	if err != nil {
		return fmt.Errorf("when retrieving initial nodes: %w", err)
	}

	for _, kv := range get.Kvs {
		updateNode(kv)
	}
	revision = get.Header.Revision
	publish()

	supervisor.Signal(ctx, supervisor.SignalHealthy)

	// Watch for changes.
	wch := s.Etcd.Watch(ctx, nodesStart, clientv3.WithRange(nodesEnd), clientv3.WithRev(revision+1))
	for resp := range wch {
		if err := resp.Err(); err != nil {
			return fmt.Errorf("watch failed: %w", err)
		}
		for _, ev := range resp.Events {
			updateNode(ev.Kv)
		}
		revision = resp.Header.Revision
		publish()
	}
	return fmt.Errorf("channel closed: %w", ctx.Err())
}

// watchReleases watches s.releases, and runs elect for as long as
// the local node has the latest release.
func (s *Service) watchReleases(ctx context.Context) error {
	w := s.releases.Watch()
	defer w.Close()

	r, err := w.Get(ctx)
	if err != nil {
		return err
	}

	shouldRun := !version.ReleaseLessThan(mversion.Version.Release, r.maxRelease)
	if shouldRun {
		supervisor.Logger(ctx).Info("This Kubernetes controller node has the latest release, starting election.")
		err := supervisor.Run(ctx, "elect", s.elect)
		if err != nil {
			return fmt.Errorf("could not run elect: %w", err)
		}
	} else {
		supervisor.Logger(ctx).Infof("This Kubernetes controller node does not have the latest release, not starting election. Latest release: %s", version.Release(r.maxRelease))
	}

	supervisor.Signal(ctx, supervisor.SignalHealthy)

	for {
		r, err := w.Get(ctx)
		if err != nil {
			return err
		}
		shouldRunNow := !version.ReleaseLessThan(mversion.Version.Release, r.maxRelease)
		if shouldRunNow != shouldRun {
			return errors.New("latest release changed, restarting")
		}
	}
}

func (s *Service) elect(ctx context.Context) error {
	session, err := concurrency.NewSession(s.Etcd.ThinClient(ctx))
	if err != nil {
		return fmt.Errorf("creating session failed: %w", err)
	}

	defer func() {
		session.Orphan()
		// ctx may be canceled, but we still try to revoke with a short timeout.
		revokeCtx, cancel := context.WithTimeout(context.Background(), time.Second)
		_, err := s.Etcd.Revoke(revokeCtx, session.Lease())
		cancel()
		if err != nil {
			supervisor.Logger(ctx).Warningf("Failed to revoke lease: %v", err)
		}
	}()

	supervisor.Signal(ctx, supervisor.SignalHealthy)

	supervisor.Logger(ctx).Infof("Campaigning. Lease ID: %x", session.Lease())
	election := concurrency.NewElection(session, electionPrefix)

	// The election value is unused; we put the node ID there for manual inspection.
	err = election.Campaign(ctx, s.NodeID)
	if err != nil {
		return fmt.Errorf("campaigning failed: %w", err)
	}
	supervisor.Logger(ctx).Info("Elected.")

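	// Stop leading as soon as the election session ends (for example because
	// its lease expired); lead must not keep acting as leader past that point.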
	leadCtx, leadCancel := context.WithCancel(ctx)
	go func() {
		<-session.Done()
		leadCancel()
	}()

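	// isLeaderCmp only holds in an etcd transaction as long as the election
	// key created by our Campaign above still exists with the same creation
	// revision, i.e. as long as this node is still the elected leader.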
	isLeaderCmp := clientv3.Compare(clientv3.CreateRevision(election.Key()), "=", election.Rev())
	return s.lead(leadCtx, isLeaderCmp)
}

func (s *Service) lead(ctx context.Context, isLeaderCmp clientv3.Cmp) error {
	log := supervisor.Logger(ctx)

	// Retrieve the initial status.
	status := &ppb.KubernetesReconcilerStatus{}
	statusGet, err := s.Etcd.Get(ctx, statusKey)
	if err != nil {
		return fmt.Errorf("when getting status: %w", err)
	}
	if len(statusGet.Kvs) == 1 {
		err := proto.Unmarshal(statusGet.Kvs[0].Value, status)
		if err != nil {
			log.Warningf("Could not unmarshal status: %v", err)
			status = nil
		}
	} else {
		status = nil
	}

	doneStatus := &ppb.KubernetesReconcilerStatus{
		State:                    ppb.KubernetesReconcilerStatus_STATE_DONE,
		Version:                  mversion.Version,
		MinimumCompatibleRelease: minApiserverRelease,
	}
	doneStatusBytes, err := proto.Marshal(doneStatus)
	if err != nil {
		return fmt.Errorf("could not marshal status: %w", err)
	}

	if status == nil {
		// The status does not exist yet. Reconcile, then create the status.
		log.Info("Status does not exist yet.")
	} else if proto.Equal(status, doneStatus) {
		// The status is already what we would set, so leave it as is.
		log.Info("Status is already up to date.")
	} else if !version.ReleaseLessThan(mversion.Version.Release, status.Version.Release) &&
		!version.ReleaseLessThan(status.MinimumCompatibleRelease, minApiserverRelease) {
		// The status does not allow apiservers to start serving which would be
		// incompatible after we reconcile. So just set the state to working.
		log.Info("Status is compatible, setting state to working.")
		if status.State != ppb.KubernetesReconcilerStatus_STATE_WORKING {
			status.State = ppb.KubernetesReconcilerStatus_STATE_WORKING

			workingStatusBytes, err := proto.Marshal(status)
			if err != nil {
				return fmt.Errorf("could not marshal status: %w", err)
			}
			resp, err := s.Etcd.Txn(ctx).If(isLeaderCmp).Then(
				clientv3.OpPut(statusKey, string(workingStatusBytes)),
			).Commit()
			if err != nil {
				return fmt.Errorf("failed to update status: %w", err)
			}
			if !resp.Succeeded {
				return errors.New("lost leadership, could not update status")
			}
		}
	} else {
		// The status allows apiservers to start which would be incompatible after
		// we reconcile. We need to wait for any such nodes to disappear, then set
		// the status to disallow these nodes from starting before reconciling.
		// While reconciliation is ongoing, we are in an intermediate state
		// between the previous and the new reconciled state, and we only want
		// to allow nodes that are compatible with both. So we use the minimum of
		// the two versions and the maximum of the two MinimumCompatibleReleases,
		// which results in allowing the intersection of the two statuses.
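		// As an illustration with made-up numbers: an existing status of
		// {version: 0.3.0, minimum_compatible_release: 0.2.0} merged with a
		// local node at version 0.2.5 and minApiserverRelease 0.2.5 becomes
		// {version: 0.2.5, minimum_compatible_release: 0.2.5}, admitting only
		// nodes that are compatible with both the old and the new state.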
		log.Info("Status allows incompatible releases, need to restrict.")

		status.State = ppb.KubernetesReconcilerStatus_STATE_WORKING
		if !version.ReleaseLessThan(status.Version.Release, mversion.Version.Release) {
			status.Version = mversion.Version
		}
		if version.ReleaseLessThan(status.MinimumCompatibleRelease, minApiserverRelease) {
			status.MinimumCompatibleRelease = minApiserverRelease
		}
		restrictedStatusBytes, err := proto.Marshal(status)
		if err != nil {
			return fmt.Errorf("could not marshal status: %w", err)
		}

		releasesW := s.releases.Watch()
		defer releasesW.Close()

		lastLogRelease := ""
		for {
			releases, err := releasesW.Get(ctx)
			if err != nil {
				return err
			}
			if version.ReleaseLessThan(mversion.Version.Release, releases.maxRelease) {
				// We will likely get canceled soon by watchReleases restarting, unless
				// this is a very short transient that is not noticed by watchReleases.
				continue
			}
			if version.ReleaseLessThan(releases.minRelease, minApiserverRelease) {
				rel := version.Release(releases.minRelease)
				if rel != lastLogRelease {
					lastLogRelease = rel
					log.Infof("There are incompatible nodes, waiting for node changes. Minimum node release: %s, need at least: %s.", rel, version.Release(minApiserverRelease))
				}
				continue
			}

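			// Write the restricted status only if we are still the leader and
			// no node entry has been modified since the revision at which this
			// release summary was taken; otherwise, re-evaluate with fresher
			// data.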
			nodesStart, nodesEnd := curator.NodeEtcdPrefix.KeyRange()
			resp, err := s.Etcd.Txn(ctx).If(
				isLeaderCmp,
				clientv3.Compare(clientv3.ModRevision(nodesStart).WithRange(nodesEnd), "<", releases.revision+1),
			).Then(
				clientv3.OpPut(statusKey, string(restrictedStatusBytes)),
			).Commit()
			if err != nil {
				return fmt.Errorf("failed to update status: %w", err)
			}
			if !resp.Succeeded {
				// This could happen either if we lost leadership, or any node was
				// modified since we got the releases. If a node was modified, this
				// should be seen soon by the nodes watcher. If we lost leadership,
				// we will get canceled soon, and it's fine to go back to watching.
				log.Info("Transaction failed, retrying.")
				continue
			}
			break
		}
	}

	if status != nil {
		// A status exists, which means a reconciler has been running before.
		// Wait a bit for any still outstanding Kubernetes API requests by the
		// previous reconciler to be processed.
		// The Kubernetes API does not support making requests conditional on an
		// etcd lease, so requests can still be processed after leadership expired.
		// This is best effort, since requests could take arbitrarily long to be
		// processed. The periodic reconcile below ensures that we eventually
		// reach the desired state and stay there.
		select {
		case <-time.After(reconcileWait):
		case <-ctx.Done():
			return ctx.Err()
		}
	}

	log.Info("Performing initial resource reconciliation...")
	// If the apiserver was just started, reconciliation will fail until the
	// apiserver is ready. To keep the logs clean, retry with exponential
	// backoff and only start logging errors after some time has passed.
	startLogging := time.Now().Add(2 * time.Second)
	bo := backoff.NewExponentialBackOff()
	bo.InitialInterval = 100 * time.Millisecond
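	// A MaxElapsedTime of zero means the backoff never gives up on its own;
	// retries continue until reconciliation succeeds or ctx is canceled.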
	bo.MaxElapsedTime = 0
	err = backoff.Retry(func() error {
		err := reconcileAll(ctx, s.ClientSet)
		if err != nil && time.Now().After(startLogging) {
			log.Errorf("Still couldn't do initial reconciliation: %v", err)
			startLogging = time.Now().Add(10 * time.Second)
		}
		return err
	}, backoff.WithContext(bo, ctx))
	if err != nil {
		return err
	}
	log.Infof("Initial resource reconciliation succeeded.")

	// Update status.
	if !proto.Equal(status, doneStatus) {
		resp, err := s.Etcd.Txn(ctx).If(isLeaderCmp).Then(
			clientv3.OpPut(statusKey, string(doneStatusBytes)),
		).Commit()
		if err != nil {
			return fmt.Errorf("failed to update status: %w", err)
		}
		if !resp.Succeeded {
			return errors.New("lost leadership, could not update status")
		}
	}

	// Reconcile at a regular interval.
	t := time.NewTicker(30 * time.Second)
	defer t.Stop()
	for {
		select {
		case <-t.C:
			err := reconcileAll(ctx, s.ClientSet)
			if err != nil {
				log.Warning(err)
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}