m/n/k/reconciler: implement leader election
Before this change, the reconciler runs on all Kubernetes controllers.
During a rolling upgrade of the cluster in which a reconciled object
changes, this causes the old and new versions of the reconciler to
fight each other, constantly updating the object back and forth.
Now, the reconciler is elected among nodes of the latest release. The
status of the reconciliation is communicated to all Kubernetes
controllers through a new key-value pair in etcd.
Additionally, compatibility constraints can be expressed by changing the
constants minReconcilerRelease and minApiserverRelease, allowing
reconciliation to happen in a controlled way that ensures compatibility
even during rolling upgrades.
Change-Id: Iaf7c27702bd9809a13d47bcf041b71438353bef2
Reviewed-on: https://review.monogon.dev/c/monogon/+/3062
Tested-by: Jenkins CI
Reviewed-by: Serge Bazanski <serge@monogon.tech>
diff --git a/metropolis/node/kubernetes/service_controller.go b/metropolis/node/kubernetes/service_controller.go
index c1cec27..6b4360b 100644
--- a/metropolis/node/kubernetes/service_controller.go
+++ b/metropolis/node/kubernetes/service_controller.go
@@ -20,13 +20,13 @@
"context"
"fmt"
"net"
- "time"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
+ "source.monogon.dev/metropolis/node/core/consensus"
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/node/core/network"
@@ -36,7 +36,6 @@
"source.monogon.dev/metropolis/node/kubernetes/pki"
"source.monogon.dev/metropolis/node/kubernetes/reconciler"
"source.monogon.dev/metropolis/pkg/supervisor"
-
apb "source.monogon.dev/metropolis/proto/api"
)
@@ -45,10 +44,11 @@
ClusterNet net.IPNet
ClusterDomain string
- KPKI *pki.PKI
- Root *localstorage.Root
- Network *network.Service
- Node *identity.NodeCredentials
+ KPKI *pki.PKI
+ Root *localstorage.Root
+ Consensus consensus.ServiceHandle
+ Network *network.Service
+ Node *identity.NodeCredentials
}
type Controller struct {
@@ -93,6 +93,18 @@
return fmt.Errorf("could not generate kubernetes client: %w", err)
}
+ supervisor.Logger(ctx).Infof("Waiting for consensus...")
+ w := s.c.Consensus.Watch()
+ defer w.Close()
+ st, err := w.Get(ctx, consensus.FilterRunning)
+ if err != nil {
+ return fmt.Errorf("while waiting for consensus: %w", err)
+ }
+ etcd, err := st.CuratorClient()
+ if err != nil {
+ return fmt.Errorf("while retrieving consensus client: %w", err)
+ }
+
// Sub-runnable which starts all parts of Kubernetes that depend on the
// machine's external IP address. If it changes, the runnable will exit.
// TODO(q3k): test this
@@ -137,24 +149,26 @@
return fmt.Errorf("network configuration changed (%s -> %s)", address.String(), status.ExternalAddress.String())
})
+ reconcilerService := &reconciler.Service{
+ Etcd: etcd,
+ ClientSet: clientSet,
+ NodeID: s.c.Node.ID(),
+ }
+ err = supervisor.Run(ctx, "reconciler", reconcilerService.Run)
+ if err != nil {
+ return fmt.Errorf("could not run sub-service reconciler: %w", err)
+ }
+
// Before we start anything else, make sure reconciliation passes at least once.
// This makes the initial startup of a cluster much cleaner as we don't end up
// starting the scheduler/controller-manager/etc just to get them to immediately
// fail and back off with 'unauthorized'.
- startLogging := time.Now().Add(2 * time.Second)
- supervisor.Logger(ctx).Infof("Performing initial resource reconciliation...")
- for {
- err := reconciler.ReconcileAll(ctx, clientSet)
- if err == nil {
- supervisor.Logger(ctx).Infof("Initial resource reconciliation succeeded.")
- break
- }
- if time.Now().After(startLogging) {
- supervisor.Logger(ctx).Errorf("Still couldn't do initial reconciliation: %v", err)
- startLogging = time.Now().Add(10 * time.Second)
- }
- time.Sleep(100 * time.Millisecond)
+ supervisor.Logger(ctx).Info("Waiting for reconciler...")
+ err = reconciler.WaitReady(ctx, etcd)
+ if err != nil {
+ return fmt.Errorf("while waiting for reconciler: %w", err)
}
+ supervisor.Logger(ctx).Info("Reconciler is done.")
authProxy := authproxy.Service{
KPKI: s.c.KPKI,
@@ -171,7 +185,6 @@
}{
{"controller-manager", runControllerManager(*controllerManagerConfig)},
{"scheduler", runScheduler(*schedulerConfig)},
- {"reconciler", reconciler.Maintain(clientSet)},
{"authproxy", authProxy.Run},
{"metricsproxy", metricsProxy.Run},
} {