m/n/k/reconciler: implement leader election
Before this change, the reconciler ran on all Kubernetes controller
nodes. During a rolling upgrade of the cluster in which a reconciled
object changes, the old and new versions of the reconciler would fight
each other, constantly updating the object back and forth.
Now, the reconciler is elected among nodes of the latest release. The
status of the reconciliation is communicated to all Kubernetes
controllers through a new key-value in etcd.
Additionally, compatibility constraints can be expressed by changing the
constants minReconcilerRelease and minApiserverRelease, allowing
reconciliation to happen in a controlled way that ensures compatibility
even during rolling upgrades.
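For example, a hypothetical later release whose reconciled state is
incompatible with apiservers of earlier releases would raise the
constant in reconciler_status.go before shipping (the 0.2.0 value below
is purely illustrative):

  // Hypothetical: state reconciled by 0.2.0 cannot be served by older
  // apiservers, so they must wait until a 0.2.0 reconciler has run.
  minApiserverRelease = &vpb.Version_Release{Major: 0, Minor: 2, Patch: 0}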
Change-Id: Iaf7c27702bd9809a13d47bcf041b71438353bef2
Reviewed-on: https://review.monogon.dev/c/monogon/+/3062
Tested-by: Jenkins CI
Reviewed-by: Serge Bazanski <serge@monogon.tech>
diff --git a/metropolis/node/core/curator/impl_follower.go b/metropolis/node/core/curator/impl_follower.go
index 8690540..5963737 100644
--- a/metropolis/node/core/curator/impl_follower.go
+++ b/metropolis/node/core/curator/impl_follower.go
@@ -8,11 +8,10 @@
common "source.monogon.dev/metropolis/node"
"source.monogon.dev/metropolis/node/core/consensus/client"
+ cpb "source.monogon.dev/metropolis/node/core/curator/proto/api"
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/rpc"
"source.monogon.dev/metropolis/pkg/event/memory"
-
- cpb "source.monogon.dev/metropolis/node/core/curator/proto/api"
)
type curatorFollower struct {
@@ -43,7 +42,7 @@
// Manually load node status data from etcd, even though we are not a leader.
// This is fine, as if we ever end up serving stale data, the client will
// realize and call us again.
- key, err := nodeEtcdPrefix.Key(lock.NodeId)
+ key, err := NodeEtcdPrefix.Key(lock.NodeId)
if err != nil {
rpc.Trace(ctx).Printf("invalid leader node id %q: %v", lock.NodeId, err)
return status.Errorf(codes.Internal, "current leader has invalid node id")
diff --git a/metropolis/node/core/curator/impl_leader_cluster_networking.go b/metropolis/node/core/curator/impl_leader_cluster_networking.go
index 6bfe82d..52d8c12 100644
--- a/metropolis/node/core/curator/impl_leader_cluster_networking.go
+++ b/metropolis/node/core/curator/impl_leader_cluster_networking.go
@@ -9,12 +9,11 @@
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
+ ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/rpc"
"source.monogon.dev/metropolis/pkg/event"
"source.monogon.dev/metropolis/pkg/event/etcd"
-
- ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
)
// preapreClusternetCacheUnlocked makes sure the leader's clusternetCache exists,
@@ -27,7 +26,7 @@
cache := make(map[string]string)
// Get all nodes.
- start, end := nodeEtcdPrefix.KeyRange()
+ start, end := NodeEtcdPrefix.KeyRange()
value := etcd.NewValue[*nodeAtID](l.etcd, start, nodeValueConverter, etcd.Range(end))
w := value.Watch()
defer w.Close()
diff --git a/metropolis/node/core/curator/impl_leader_curator.go b/metropolis/node/core/curator/impl_leader_curator.go
index 9833410..259677b 100644
--- a/metropolis/node/core/curator/impl_leader_curator.go
+++ b/metropolis/node/core/curator/impl_leader_curator.go
@@ -66,7 +66,7 @@
// underneath the NodeEtcdPrefix. Worst case an attacker can do is request a node
// that doesn't exist, and that will just hang . All access is privileged, so
// there's also no need to filter anything.
- nodePath, err := nodeEtcdPrefix.Key(nic.NodeId)
+ nodePath, err := NodeEtcdPrefix.Key(nic.NodeId)
if err != nil {
return status.Errorf(codes.InvalidArgument, "invalid node name: %v", err)
}
@@ -99,7 +99,7 @@
func (l *leaderCurator) watchNodesInCluster(_ *ipb.WatchRequest_NodesInCluster, srv ipb.Curator_WatchServer) error {
ctx := srv.Context()
- start, end := nodeEtcdPrefix.KeyRange()
+ start, end := NodeEtcdPrefix.KeyRange()
value := etcd.NewValue[*nodeAtID](l.etcd, start, nodeValueConverter, etcd.Range(end))
w := value.Watch()
@@ -170,7 +170,7 @@
// invariants.
func nodeValueConverter(key, value []byte) (*nodeAtID, error) {
res := nodeAtID{
- id: nodeEtcdPrefix.ExtractID(string(key)),
+ id: NodeEtcdPrefix.ExtractID(string(key)),
}
if len(value) > 0 {
node, err := nodeUnmarshal(value)
diff --git a/metropolis/node/core/curator/impl_leader_management.go b/metropolis/node/core/curator/impl_leader_management.go
index 2fe5e6c..ff18f72 100644
--- a/metropolis/node/core/curator/impl_leader_management.go
+++ b/metropolis/node/core/curator/impl_leader_management.go
@@ -70,7 +70,7 @@
// GetClusterInfo implements Management.GetClusterInfo, which returns summary
// information about the Metropolis cluster.
func (l *leaderManagement) GetClusterInfo(ctx context.Context, req *apb.GetClusterInfoRequest) (*apb.GetClusterInfoResponse, error) {
- res, err := l.txnAsLeader(ctx, nodeEtcdPrefix.Range())
+ res, err := l.txnAsLeader(ctx, NodeEtcdPrefix.Range())
if err != nil {
return nil, status.Errorf(codes.Unavailable, "could not retrieve list of nodes: %v", err)
}
@@ -183,7 +183,7 @@
defer l.muNodes.Unlock()
// Retrieve all nodes from etcd in a single Get call.
- res, err := l.txnAsLeader(ctx, nodeEtcdPrefix.Range())
+ res, err := l.txnAsLeader(ctx, NodeEtcdPrefix.Range())
if err != nil {
return status.Errorf(codes.Unavailable, "could not retrieve list of nodes: %v", err)
}
diff --git a/metropolis/node/core/curator/impl_leader_test.go b/metropolis/node/core/curator/impl_leader_test.go
index 0b39410..29f1f58 100644
--- a/metropolis/node/core/curator/impl_leader_test.go
+++ b/metropolis/node/core/curator/impl_leader_test.go
@@ -354,7 +354,7 @@
t.Fatalf("GenerateKey: %v", err)
}
fakeNodeID := identity.NodeID(fakeNodePub)
- fakeNodeKey, _ := nodeEtcdPrefix.Key(fakeNodeID)
+ fakeNodeKey, _ := NodeEtcdPrefix.Key(fakeNodeID)
w, err := cur.Watch(ctx, &ipb.WatchRequest{
Kind: &ipb.WatchRequest_NodeInCluster_{
@@ -446,7 +446,7 @@
}
// Remove node. This should trigger an update from the watcher.
- k, _ := nodeEtcdPrefix.Key(fakeNodeID)
+ k, _ := NodeEtcdPrefix.Key(fakeNodeID)
if _, err := cl.etcd.Delete(ctx, k); err != nil {
t.Fatalf("could not delete node from etcd: %v", err)
}
@@ -541,7 +541,7 @@
t.Fatalf("GenerateKey: %v", err)
}
fakeNodeID := identity.NodeID(fakeNodePub)
- fakeNodeKey, _ := nodeEtcdPrefix.Key(fakeNodeID)
+ fakeNodeKey, _ := NodeEtcdPrefix.Key(fakeNodeID)
fakeNode := &ppb.Node{
PublicKey: fakeNodePub,
@@ -634,7 +634,7 @@
}
// Remove fake node, expect it to be removed from synced map.
- k, _ := nodeEtcdPrefix.Key(fakeNodeID)
+ k, _ := NodeEtcdPrefix.Key(fakeNodeID)
if _, err := cl.etcd.Delete(ctx, k); err != nil {
t.Fatalf("could not delete node from etcd: %v", err)
}
diff --git a/metropolis/node/core/curator/proto/private/BUILD.bazel b/metropolis/node/core/curator/proto/private/BUILD.bazel
index a731e06..035f007 100644
--- a/metropolis/node/core/curator/proto/private/BUILD.bazel
+++ b/metropolis/node/core/curator/proto/private/BUILD.bazel
@@ -9,7 +9,10 @@
"storage.proto",
],
visibility = ["//visibility:public"],
- deps = ["//metropolis/proto/common:common_proto"],
+ deps = [
+ "//metropolis/proto/common:common_proto",
+ "//version/spec:spec_proto",
+ ],
)
go_proto_library(
@@ -17,7 +20,10 @@
importpath = "source.monogon.dev/metropolis/node/core/curator/proto/private",
proto = ":private_proto",
visibility = ["//visibility:public"],
- deps = ["//metropolis/proto/common"],
+ deps = [
+ "//metropolis/proto/common",
+ "//version/spec",
+ ],
)
go_library(
diff --git a/metropolis/node/core/curator/proto/private/storage.proto b/metropolis/node/core/curator/proto/private/storage.proto
index d279a28..23b60ac 100644
--- a/metropolis/node/core/curator/proto/private/storage.proto
+++ b/metropolis/node/core/curator/proto/private/storage.proto
@@ -3,6 +3,7 @@
package metropolis.node.core.curator.proto.private;
import "metropolis/proto/common/common.proto";
+import "version/spec/spec.proto";
// Node describes a single node's state in etcd. This is only ever visible to
// the curator, and fully managed by the curator.
@@ -67,3 +68,28 @@
bytes opaque = 1;
}
+// KubernetesReconcilerStatus contains status reported by the reconciler.
+// This is used by the reconciler itself, and it is used by the Kubernetes
+// controller service to wait for reconciliation to be complete with a
+// compatible version.
+//
+// Stored under /kubernetes/reconciler/status
+message KubernetesReconcilerStatus {
+ enum State {
+ STATE_UNKNOWN = 0;
+ STATE_DONE = 1;
+ STATE_WORKING = 2;
+ };
+ // state tells whether reconciliation is in progress or complete.
+ // This is shown in logs, but not otherwise used.
+ State state = 1;
+
+ // version is the Metropolis version of the node that last set the status.
+ version.spec.Version version = 2;
+ // minimum_compatible_release is the minimum Metropolis release which is
+ // compatible with the changes made by the reconciler.
+ // A Kubernetes controller must not start serving as long as this is higher
+ // than its own release. This may not be increased if there are
+ // Kubernetes controllers with a lower release than the new value.
+ version.spec.Version.Release minimum_compatible_release = 3;
+}
diff --git a/metropolis/node/core/curator/state_node.go b/metropolis/node/core/curator/state_node.go
index bd11f6a..0c2f53b 100644
--- a/metropolis/node/core/curator/state_node.go
+++ b/metropolis/node/core/curator/state_node.go
@@ -218,9 +218,9 @@
}
var (
- // nodeEtcdPrefix is an etcd key prefix preceding cluster member node IDs,
+ // NodeEtcdPrefix is an etcd key prefix preceding cluster member node IDs,
// mapping to ppb.Node values.
- nodeEtcdPrefix = mustNewEtcdPrefix("/nodes/")
+ NodeEtcdPrefix = mustNewEtcdPrefix("/nodes/")
// joinCredPrefix is an etcd key prefix preceding hex-encoded cluster member
// node join keys, mapping to node IDs.
joinCredPrefix = mustNewEtcdPrefix("/join_keys/")
@@ -229,7 +229,7 @@
// etcdNodePath builds the etcd path in which this node's protobuf-serialized
// state is stored in etcd.
func (n *Node) etcdNodePath() (string, error) {
- return nodeEtcdPrefix.Key(n.ID())
+ return NodeEtcdPrefix.Key(n.ID())
}
func (n *Node) etcdJoinKeyPath() (string, error) {
@@ -361,7 +361,7 @@
// returned.
func nodeLoad(ctx context.Context, l *leadership, id string) (*Node, error) {
rpc.Trace(ctx).Printf("loadNode(%s)...", id)
- key, err := nodeEtcdPrefix.Key(id)
+ key, err := NodeEtcdPrefix.Key(id)
if err != nil {
rpc.Trace(ctx).Printf("invalid node id: %v", err)
return nil, status.Errorf(codes.InvalidArgument, "invalid node id")
@@ -394,7 +394,7 @@
// Build an etcd operation to save the node with a key based on its ID.
id := n.ID()
rpc.Trace(ctx).Printf("nodeSave(%s)...", id)
- nkey, err := nodeEtcdPrefix.Key(id)
+ nkey, err := NodeEtcdPrefix.Key(id)
if err != nil {
rpc.Trace(ctx).Printf("invalid node id: %v", err)
return status.Errorf(codes.InvalidArgument, "invalid node id")
@@ -440,7 +440,7 @@
rpc.Trace(ctx).Printf("nodeDestroy(%s)...", id)
// Get paths for node data and join key.
- nkey, err := nodeEtcdPrefix.Key(id)
+ nkey, err := NodeEtcdPrefix.Key(id)
if err != nil {
rpc.Trace(ctx).Printf("invalid node id: %v", err)
return status.Errorf(codes.InvalidArgument, "invalid node id")
diff --git a/metropolis/node/core/roleserve/worker_kubernetes.go b/metropolis/node/core/roleserve/worker_kubernetes.go
index e8c96c6..c3fcfb3 100644
--- a/metropolis/node/core/roleserve/worker_kubernetes.go
+++ b/metropolis/node/core/roleserve/worker_kubernetes.go
@@ -6,6 +6,7 @@
"net"
"source.monogon.dev/metropolis/node/core/clusternet"
+ ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/node/core/network"
@@ -15,8 +16,6 @@
"source.monogon.dev/metropolis/pkg/event"
"source.monogon.dev/metropolis/pkg/event/memory"
"source.monogon.dev/metropolis/pkg/supervisor"
-
- ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
cpb "source.monogon.dev/metropolis/proto/common"
)
@@ -161,6 +160,7 @@
ClusterDomain: clusterDomain,
KPKI: pki,
Root: s.storageRoot,
+ Consensus: d.lcp.consensus,
Network: s.network,
})
// Start Kubernetes.
diff --git a/metropolis/node/kubernetes/BUILD.bazel b/metropolis/node/kubernetes/BUILD.bazel
index d775c50..854f6c3 100644
--- a/metropolis/node/kubernetes/BUILD.bazel
+++ b/metropolis/node/kubernetes/BUILD.bazel
@@ -19,6 +19,7 @@
"//go/net/tinylb",
"//metropolis/node",
"//metropolis/node/core/clusternet",
+ "//metropolis/node/core/consensus",
"//metropolis/node/core/curator/proto/api",
"//metropolis/node/core/curator/watcher",
"//metropolis/node/core/identity",
diff --git a/metropolis/node/kubernetes/reconciler/BUILD.bazel b/metropolis/node/kubernetes/reconciler/BUILD.bazel
index 306c273..caa239a 100644
--- a/metropolis/node/kubernetes/reconciler/BUILD.bazel
+++ b/metropolis/node/kubernetes/reconciler/BUILD.bazel
@@ -4,6 +4,7 @@
name = "reconciler",
srcs = [
"reconciler.go",
+ "reconciler_status.go",
"resources_csi.go",
"resources_rbac.go",
"resources_runtimeclass.go",
@@ -12,19 +13,48 @@
importpath = "source.monogon.dev/metropolis/node/kubernetes/reconciler",
visibility = ["//metropolis/node:__subpackages__"],
deps = [
+ "//metropolis/node/core/consensus/client",
+ "//metropolis/node/core/curator",
+ "//metropolis/node/core/curator/proto/private",
+ "//metropolis/pkg/event/etcd",
+ "//metropolis/pkg/event/memory",
"//metropolis/pkg/supervisor",
+ "//metropolis/version",
+ "//version",
+ "//version/spec",
+ "@com_github_cenkalti_backoff_v4//:backoff",
+ "@io_etcd_go_etcd_api_v3//mvccpb",
+ "@io_etcd_go_etcd_client_v3//:client",
+ "@io_etcd_go_etcd_client_v3//concurrency",
"@io_k8s_api//core/v1:core",
"@io_k8s_api//node/v1:node",
"@io_k8s_api//rbac/v1:rbac",
"@io_k8s_api//storage/v1:storage",
"@io_k8s_apimachinery//pkg/apis/meta/v1:meta",
"@io_k8s_client_go//kubernetes",
+ "@org_golang_google_protobuf//proto",
],
)
go_test(
name = "reconciler_test",
- srcs = ["reconciler_test.go"],
+ srcs = [
+ "reconciler_status_test.go",
+ "reconciler_test.go",
+ ],
embed = [":reconciler"],
- deps = ["@io_k8s_apimachinery//pkg/apis/meta/v1:meta"],
+ deps = [
+ "//metropolis/node/core/consensus/client",
+ "//metropolis/node/core/curator",
+ "//metropolis/node/core/curator/proto/private",
+ "//metropolis/pkg/supervisor",
+ "//metropolis/proto/common",
+ "//metropolis/version",
+ "//version",
+ "//version/spec",
+ "@io_etcd_go_etcd_tests_v3//integration",
+ "@io_k8s_apimachinery//pkg/apis/meta/v1:meta",
+ "@io_k8s_client_go//kubernetes/fake",
+ "@org_golang_google_protobuf//proto",
+ ],
)
diff --git a/metropolis/node/kubernetes/reconciler/reconciler.go b/metropolis/node/kubernetes/reconciler/reconciler.go
index 2d2bfda..4ea2d84 100644
--- a/metropolis/node/kubernetes/reconciler/reconciler.go
+++ b/metropolis/node/kubernetes/reconciler/reconciler.go
@@ -29,12 +29,9 @@
import (
"context"
"fmt"
- "time"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
-
- "source.monogon.dev/metropolis/pkg/supervisor"
)
// True is a sad workaround for all the pointer booleans in K8s specs
@@ -120,7 +117,7 @@
}
}
-func ReconcileAll(ctx context.Context, clientSet kubernetes.Interface) error {
+func reconcileAll(ctx context.Context, clientSet kubernetes.Interface) error {
resources := allResources(clientSet)
for name, resource := range resources {
err := reconcile(ctx, resource)
@@ -131,26 +128,6 @@
return nil
}
-func Maintain(clientSet kubernetes.Interface) supervisor.Runnable {
- return func(ctx context.Context) error {
- log := supervisor.Logger(ctx)
- supervisor.Signal(ctx, supervisor.SignalHealthy)
- t := time.NewTicker(10 * time.Second)
- defer t.Stop()
- for {
- select {
- case <-t.C:
- err := ReconcileAll(ctx, clientSet)
- if err != nil {
- log.Warning(err)
- }
- case <-ctx.Done():
- return nil
- }
- }
- }
-}
-
func reconcile(ctx context.Context, r resource) error {
present, err := r.List(ctx)
if err != nil {
diff --git a/metropolis/node/kubernetes/reconciler/reconciler_status.go b/metropolis/node/kubernetes/reconciler/reconciler_status.go
new file mode 100644
index 0000000..f3ad06b
--- /dev/null
+++ b/metropolis/node/kubernetes/reconciler/reconciler_status.go
@@ -0,0 +1,528 @@
+package reconciler
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "time"
+
+ "github.com/cenkalti/backoff/v4"
+ "go.etcd.io/etcd/api/v3/mvccpb"
+ clientv3 "go.etcd.io/etcd/client/v3"
+ "go.etcd.io/etcd/client/v3/concurrency"
+ "google.golang.org/protobuf/proto"
+ "k8s.io/client-go/kubernetes"
+
+ "source.monogon.dev/metropolis/node/core/consensus/client"
+ "source.monogon.dev/metropolis/node/core/curator"
+ ppb "source.monogon.dev/metropolis/node/core/curator/proto/private"
+ "source.monogon.dev/metropolis/pkg/event/etcd"
+ "source.monogon.dev/metropolis/pkg/event/memory"
+ "source.monogon.dev/metropolis/pkg/supervisor"
+ mversion "source.monogon.dev/metropolis/version"
+ "source.monogon.dev/version"
+ vpb "source.monogon.dev/version/spec"
+)
+
+// This file contains the reconciler Service, whose purpose is to run
+// reconcileAll in a controlled way (leader elected and only when other nodes
+// are compatible) and to set the reconciler status in etcd.
+// The file also contains WaitReady, which watches the status and returns
+// when the apiserver can start serving.
+// These two form the public interface of the reconciler.
+
+const (
+ // statusKey is the key in the curator etcd namespace
+ // under which the reconciler status is stored.
+ // At some point, we do a transaction involving both this key and
+ // the nodes prefix, so both must be in the same namespace.
+ statusKey = "/kubernetes/reconciler/status"
+ // electionPrefix is the etcd prefix where
+ // a node is elected to run the reconciler.
+ electionPrefix = "/kubernetes/reconciler/leader"
+)
+
+var (
+ // minReconcilerRelease is the minimum Metropolis release which
+ // the node last performing reconciliation must have
+ // for the local node to be able to start serving.
+ // Set this to the next release when making changes to reconciled state
+ // which must be applied before starting to serve.
+ minReconcilerRelease = &vpb.Version_Release{Major: 0, Minor: 1, Patch: 0}
+ // minApiserverRelease is the minimum Metropolis release which all Kubernetes
+ // controller nodes must have before the local node can reconcile.
+ // This will be written to minimum_compatible_release in the reconciler status,
+ // and thus block any reappearing apiservers with a lower release from serving,
+ // until a reconciler of a lower release has run.
+ // Increase this when making changes to reconciled state which are
+ // incompatible with apiservers serving at the current minApiserverRelease.
+ minApiserverRelease = &vpb.Version_Release{Major: 0, Minor: 1, Patch: 0}
+)
+
+// reconcileWait is the wait time between getting elected and
+// starting to reconcile.
+// It is a variable to allow changing it from tests.
+var reconcileWait = 5 * time.Second
+
+// WaitReady watches the reconciler status and returns once initial
+// reconciliation is done and the reconciled state is compatible.
+func WaitReady(ctx context.Context, etcdClient client.Namespaced) error {
+ value := etcd.NewValue(etcdClient, statusKey, func(_, data []byte) (*ppb.KubernetesReconcilerStatus, error) {
+ status := &ppb.KubernetesReconcilerStatus{}
+ if err := proto.Unmarshal(data, status); err != nil {
+ return nil, fmt.Errorf("could not unmarshal: %w", err)
+ }
+ return status, nil
+ })
+
+ w := value.Watch()
+ defer w.Close()
+
+ for {
+ status, err := w.Get(ctx)
+ if err != nil {
+ return err
+ }
+
+ state := "unknown"
+ switch status.State {
+ case ppb.KubernetesReconcilerStatus_STATE_DONE:
+ state = "done"
+ case ppb.KubernetesReconcilerStatus_STATE_WORKING:
+ state = "working"
+ }
+ supervisor.Logger(ctx).Infof("Reconciler status: %s, version: %s, minimum compatible release: %s. Local node version: %s, minimum reconciler release: %s.",
+ state,
+ version.Semver(status.Version),
+ version.Release(status.MinimumCompatibleRelease),
+ version.Semver(mversion.Version),
+ version.Release(minReconcilerRelease),
+ )
+
+ if version.ReleaseLessThan(mversion.Version.Release, status.MinimumCompatibleRelease) {
+ supervisor.Logger(ctx).Info("Not ready, because the local node release is below the reconciler minimum compatible release. Waiting for status change.")
+ continue
+ }
+
+ if version.ReleaseLessThan(status.Version.Release, minReconcilerRelease) {
+ supervisor.Logger(ctx).Info("Not ready, because the reconciler release is below the local required minimum. Waiting for status change.")
+ continue
+ }
+
+ // Startup is intentionally not blocked by state=working.
+ // As long as a node is compatible with both the before and after state,
+ // it can continue running, and startup should also be allowed.
+ // This way, disruption is minimized in case reconciliation fails
+ // to complete and the status stays in working state.
+ // For the initial reconcile, a status is only created after it is complete.
+
+ return nil
+ }
+}
+
+// Service is the reconciler service.
+type Service struct {
+ // Etcd is an etcd client for the curator namespace.
+ Etcd client.Namespaced
+ // ClientSet is what the reconciler uses to interact with the apiserver.
+ ClientSet kubernetes.Interface
+ // NodeID is the ID of the local node.
+ NodeID string
+ // releases is set by watchNodes and watched by other parts of the service.
+ releases memory.Value[*nodeReleases]
+}
+
+// nodeReleases contains a summary of the releases of all
+// Kubernetes controller nodes currently in the cluster.
+type nodeReleases struct {
+ minRelease *vpb.Version_Release
+ maxRelease *vpb.Version_Release
+ // revision is the etcd revision at which this info is valid.
+ revision int64
+}
+
+// The reconciler service has a tree of runnables:
+//
+// - watch-nodes: Watches nodes in etcd and sets releases.
+// - watch-releases: Watches releases and runs elect while the local node is
+// the latest release.
+// - elect: Performs etcd leader election and starts lead once elected.
+// - lead: Checks current status, watches releases until incompatible
+// nodes disappear, updates status, runs reconcileAll.
+
+// Run is the root runnable of the reconciler service.
+func (s *Service) Run(ctx context.Context) error {
+ err := supervisor.Run(ctx, "watch-nodes", s.watchNodes)
+ if err != nil {
+ return fmt.Errorf("could not run watch-nodes: %w", err)
+ }
+
+ err = supervisor.Run(ctx, "watch-releases", s.watchReleases)
+ if err != nil {
+ return fmt.Errorf("could not run watch-releases: %w", err)
+ }
+
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ supervisor.Signal(ctx, supervisor.SignalDone)
+ return nil
+}
+
+// watchNodes watches nodes in etcd, and publishes a summary of
+// releases of Kubernetes controller nodes in s.releases.
+func (s *Service) watchNodes(ctx context.Context) error {
+ nodesStart, nodesEnd := curator.NodeEtcdPrefix.KeyRange()
+
+ var revision int64
+ nodeToRelease := make(map[string]string)
+ releaseCount := make(map[string]int)
+ releaseStruct := make(map[string]*vpb.Version_Release)
+
+ updateNode := func(kv *mvccpb.KeyValue) {
+ nodeKey := string(kv.Key)
+ // Subtract the previous release of this node if any.
+ if prevRelease, ok := nodeToRelease[nodeKey]; ok {
+ delete(nodeToRelease, nodeKey)
+ releaseCount[prevRelease] -= 1
+ if releaseCount[prevRelease] == 0 {
+ delete(releaseCount, prevRelease)
+ delete(releaseStruct, prevRelease)
+ }
+ }
+
+ // Parse the node release. Skip if the node was deleted, is not a
+ // Kubernetes controller, or does not have a release.
+ if len(kv.Value) == 0 {
+ return
+ }
+ node := ppb.Node{}
+ if err := proto.Unmarshal(kv.Value, &node); err != nil {
+ supervisor.Logger(ctx).Errorf("Failed to unmarshal node %q: %v", nodeKey, err)
+ return
+ }
+ if node.Roles.KubernetesController == nil {
+ return
+ }
+ if node.Status == nil || node.Status.Version == nil {
+ return
+ }
+ release := version.Release(node.Status.Version.Release)
+ // Add the new release.
+ nodeToRelease[nodeKey] = release
+ if releaseCount[release] == 0 {
+ releaseStruct[release] = node.Status.Version.Release
+ }
+ releaseCount[release] += 1
+ }
+
+ publish := func() {
+ minRelease := mversion.Version.Release
+ maxRelease := mversion.Version.Release
+ for _, release := range releaseStruct {
+ if version.ReleaseLessThan(release, minRelease) {
+ minRelease = release
+ }
+ if version.ReleaseLessThan(maxRelease, release) {
+ maxRelease = release
+ }
+ }
+ s.releases.Set(&nodeReleases{
+ minRelease: minRelease,
+ maxRelease: maxRelease,
+ revision: revision,
+ })
+ }
+
+ // Get the initial nodes data.
+ get, err := s.Etcd.Get(ctx, nodesStart, clientv3.WithRange(nodesEnd))
+ if err != nil {
+ return fmt.Errorf("when retrieving initial nodes: %w", err)
+ }
+
+ for _, kv := range get.Kvs {
+ updateNode(kv)
+ }
+ revision = get.Header.Revision
+ publish()
+
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+
+ // Watch for changes.
+ wch := s.Etcd.Watch(ctx, nodesStart, clientv3.WithRange(nodesEnd), clientv3.WithRev(revision+1))
+ for resp := range wch {
+ if err := resp.Err(); err != nil {
+ return fmt.Errorf("watch failed: %w", err)
+ }
+ for _, ev := range resp.Events {
+ updateNode(ev.Kv)
+ }
+ revision = resp.Header.Revision
+ publish()
+ }
+ return fmt.Errorf("channel closed: %w", ctx.Err())
+}
+
+// watchReleases watches s.releases, and runs elect for as long as
+// the local node has the latest release.
+func (s *Service) watchReleases(ctx context.Context) error {
+ w := s.releases.Watch()
+ defer w.Close()
+
+ r, err := w.Get(ctx)
+ if err != nil {
+ return err
+ }
+
+ shouldRun := !version.ReleaseLessThan(mversion.Version.Release, r.maxRelease)
+ if shouldRun {
+ supervisor.Logger(ctx).Info("This Kubernetes controller node has the latest release, starting election.")
+ err := supervisor.Run(ctx, "elect", s.elect)
+ if err != nil {
+ return fmt.Errorf("could not run elect: %w", err)
+ }
+ } else {
+ supervisor.Logger(ctx).Infof("This Kubernetes controller node does not have the latest release, not starting election. Latest release: %s", version.Release(r.maxRelease))
+ }
+
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+
+ for {
+ r, err := w.Get(ctx)
+ if err != nil {
+ return err
+ }
+ shouldRunNow := !version.ReleaseLessThan(mversion.Version.Release, r.maxRelease)
+ if shouldRunNow != shouldRun {
+ return errors.New("latest release changed, restarting")
+ }
+ }
+}
+
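+// elect creates an etcd session and campaigns in the reconciler election.
+// Once elected, it calls lead with a context that is canceled when the
+// session ends, and revokes the session's lease on exit.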
+func (s *Service) elect(ctx context.Context) error {
+ session, err := concurrency.NewSession(s.Etcd.ThinClient(ctx))
+ if err != nil {
+ return fmt.Errorf("creating session failed: %w", err)
+ }
+
+ defer func() {
+ session.Orphan()
+ // ctx may be canceled, but we still try to revoke with a short timeout.
+ revokeCtx, cancel := context.WithTimeout(context.Background(), time.Second)
+ _, err := s.Etcd.Revoke(revokeCtx, session.Lease())
+ cancel()
+ if err != nil {
+ supervisor.Logger(ctx).Warningf("Failed to revoke lease: %v", err)
+ }
+ }()
+
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+
+ supervisor.Logger(ctx).Infof("Campaigning. Lease ID: %x", session.Lease())
+ election := concurrency.NewElection(session, electionPrefix)
+
+ // The election value is unused; we put the node ID there for manual inspection.
+ err = election.Campaign(ctx, s.NodeID)
+ if err != nil {
+ return fmt.Errorf("campaigning failed: %w", err)
+ }
+ supervisor.Logger(ctx).Info("Elected.")
+
+ leadCtx, leadCancel := context.WithCancel(ctx)
+ go func() {
+ <-session.Done()
+ leadCancel()
+ }()
+
+ isLeaderCmp := clientv3.Compare(clientv3.CreateRevision(election.Key()), "=", election.Rev())
+ return s.lead(leadCtx, isLeaderCmp)
+}
+
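+// lead runs while this node holds leadership. It first brings the status key
+// into a state that does not let incompatible apiservers start serving, then
+// runs reconcileAll (retrying until the apiserver is ready), publishes the
+// done status, and keeps reconciling periodically. Every status write is
+// guarded by isLeaderCmp so that a stale leader cannot overwrite the status.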
+func (s *Service) lead(ctx context.Context, isLeaderCmp clientv3.Cmp) error {
+ log := supervisor.Logger(ctx)
+
+ // Retrieve the initial status.
+ status := &ppb.KubernetesReconcilerStatus{}
+ statusGet, err := s.Etcd.Get(ctx, statusKey)
+ if err != nil {
+ return fmt.Errorf("when getting status: %w", err)
+ }
+ if len(statusGet.Kvs) == 1 {
+ err := proto.Unmarshal(statusGet.Kvs[0].Value, status)
+ if err != nil {
+ log.Warningf("Could not unmarshal status: %v", err)
+ status = nil
+ }
+ } else {
+ status = nil
+ }
+
+ doneStatus := &ppb.KubernetesReconcilerStatus{
+ State: ppb.KubernetesReconcilerStatus_STATE_DONE,
+ Version: mversion.Version,
+ MinimumCompatibleRelease: minApiserverRelease,
+ }
+ doneStatusBytes, err := proto.Marshal(doneStatus)
+ if err != nil {
+ return fmt.Errorf("could not marshal status: %w", err)
+ }
+
+ if status == nil {
+ // The status does not exist yet. Reconcile, then create the status.
+ log.Info("Status does not exist yet.")
+ } else if proto.Equal(status, doneStatus) {
+ // The status is already what we would set, so leave it as is.
+ log.Info("Status is already up to date.")
+ } else if !version.ReleaseLessThan(mversion.Version.Release, status.Version.Release) &&
+ !version.ReleaseLessThan(status.MinimumCompatibleRelease, minApiserverRelease) {
+ // The status does not allow apiservers to start serving which would be
+ // incompatible after we reconcile. So just set the state to working.
+ log.Info("Status is compatible, setting state to working.")
+ if status.State != ppb.KubernetesReconcilerStatus_STATE_WORKING {
+ status.State = ppb.KubernetesReconcilerStatus_STATE_WORKING
+
+ workingStatusBytes, err := proto.Marshal(status)
+ if err != nil {
+ return fmt.Errorf("could not marshal status: %w", err)
+ }
+ resp, err := s.Etcd.Txn(ctx).If(isLeaderCmp).Then(
+ clientv3.OpPut(statusKey, string(workingStatusBytes)),
+ ).Commit()
+ if err != nil {
+ return fmt.Errorf("failed to update status: %w", err)
+ }
+ if !resp.Succeeded {
+ return errors.New("lost leadership, could not update status")
+ }
+ }
+ } else {
+ // The status allows apiservers to start which would be incompatible after
+ // we reconcile. We need to wait for any such nodes to disappear, then set
+ // the status to disallow these nodes from starting before reconciling.
+ // While reconciliation is ongoing, we are in an intermediate state
+ // between the previous and the new reconciled state, and we only want
+ // to allow nodes that are compatible with both. So we use the minimum of
+ // the two versions and the maximum of the two MinimumCompatibleReleases,
+ // which results in allowing the intersection of the two statuses.
+ log.Info("Status allows incompatible releases, need to restrict.")
+
+ status.State = ppb.KubernetesReconcilerStatus_STATE_WORKING
+ if !version.ReleaseLessThan(status.Version.Release, mversion.Version.Release) {
+ status.Version = mversion.Version
+ }
+ if version.ReleaseLessThan(status.MinimumCompatibleRelease, minApiserverRelease) {
+ status.MinimumCompatibleRelease = minApiserverRelease
+ }
+ restrictedStatusBytes, err := proto.Marshal(status)
+ if err != nil {
+ return fmt.Errorf("could not marshal status: %w", err)
+ }
+
+ releasesW := s.releases.Watch()
+ defer releasesW.Close()
+
+ lastLogRelease := ""
+ for {
+ releases, err := releasesW.Get(ctx)
+ if err != nil {
+ return err
+ }
+ if version.ReleaseLessThan(mversion.Version.Release, releases.maxRelease) {
+ // We will likely get canceled soon by watchReleases restarting, unless
+ // this is a very short transient that is not noticed by watchReleases.
+ continue
+ }
+ if version.ReleaseLessThan(releases.minRelease, minApiserverRelease) {
+ rel := version.Release(releases.minRelease)
+ if rel != lastLogRelease {
+ lastLogRelease = rel
+ log.Infof("There are incompatible nodes, waiting for node changes. Minimum node release: %s Need at least: %s", rel, version.Release(minApiserverRelease))
+ }
+ continue
+ }
+
+ nodesStart, nodesEnd := curator.NodeEtcdPrefix.KeyRange()
+ resp, err := s.Etcd.Txn(ctx).If(
+ isLeaderCmp,
+ clientv3.Compare(clientv3.ModRevision(nodesStart).WithRange(nodesEnd), "<", releases.revision+1),
+ ).Then(
+ clientv3.OpPut(statusKey, string(restrictedStatusBytes)),
+ ).Commit()
+ if err != nil {
+ return fmt.Errorf("failed to update status: %w", err)
+ }
+ if !resp.Succeeded {
+ // This could happen either if we lost leadership, or any node was
+ // modified since we got the releases. If a node was modified, this
+ // should be seen soon by the nodes watcher. If we lost leadership,
+ // we will get canceled soon, and it's fine to go back to watching.
+ log.Info("Transaction failed, retrying.")
+ continue
+ }
+ break
+ }
+ }
+
+ if status != nil {
+ // A status exists, which means a reconciler has been running before.
+ // Wait a bit for any still outstanding Kubernetes API requests by the
+ // previous reconciler to be processed.
+ // The Kubernetes API does not support making requests conditional on an
+ // etcd lease, so requests can still be processed after leadership expired.
+ // This is best effort, since requests could take arbitrarily long to be
+ // processed. The periodic reconcile below ensures that we eventually
+ // reach the desired state and stay there.
+ select {
+ case <-time.After(reconcileWait):
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+ }
+
+ log.Info("Performing initial resource reconciliation...")
+ // If the apiserver was just started, reconciliation will fail until the
+ // apiserver is ready. To keep the logs clean, retry with exponential
+ // backoff and only start logging errors after some time has passed.
+ startLogging := time.Now().Add(2 * time.Second)
+ bo := backoff.NewExponentialBackOff()
+ bo.InitialInterval = 100 * time.Millisecond
+ bo.MaxElapsedTime = 0
+ err = backoff.Retry(func() error {
+ err := reconcileAll(ctx, s.ClientSet)
+ if err != nil && time.Now().After(startLogging) {
+ log.Errorf("Still couldn't do initial reconciliation: %v", err)
+ startLogging = time.Now().Add(10 * time.Second)
+ }
+ return err
+ }, backoff.WithContext(bo, ctx))
+ if err != nil {
+ return err
+ }
+ log.Infof("Initial resource reconciliation succeeded.")
+
+ // Update status.
+ if !proto.Equal(status, doneStatus) {
+ resp, err := s.Etcd.Txn(ctx).If(isLeaderCmp).Then(
+ clientv3.OpPut(statusKey, string(doneStatusBytes)),
+ ).Commit()
+ if err != nil {
+ return fmt.Errorf("failed to update status: %w", err)
+ }
+ if !resp.Succeeded {
+ return errors.New("lost leadership, could not update status")
+ }
+ }
+
+ // Reconcile at a regular interval.
+ t := time.NewTicker(30 * time.Second)
+ defer t.Stop()
+ for {
+ select {
+ case <-t.C:
+ err := reconcileAll(ctx, s.ClientSet)
+ if err != nil {
+ log.Warning(err)
+ }
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+ }
+}
diff --git a/metropolis/node/kubernetes/reconciler/reconciler_status_test.go b/metropolis/node/kubernetes/reconciler/reconciler_status_test.go
new file mode 100644
index 0000000..bd627a2
--- /dev/null
+++ b/metropolis/node/kubernetes/reconciler/reconciler_status_test.go
@@ -0,0 +1,305 @@
+package reconciler
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ "go.etcd.io/etcd/tests/v3/integration"
+ "google.golang.org/protobuf/proto"
+ "k8s.io/client-go/kubernetes/fake"
+
+ "source.monogon.dev/metropolis/node/core/consensus/client"
+ "source.monogon.dev/metropolis/node/core/curator"
+ ppb "source.monogon.dev/metropolis/node/core/curator/proto/private"
+ "source.monogon.dev/metropolis/pkg/supervisor"
+ cpb "source.monogon.dev/metropolis/proto/common"
+ mversion "source.monogon.dev/metropolis/version"
+ "source.monogon.dev/version"
+ vpb "source.monogon.dev/version/spec"
+)
+
+// TestMinimumReleasesNotAboveMetropolisRelease tests that minimum releases
+// are not above the Metropolis release itself, because that would cause
+// things to get stuck.
+func TestMinimumReleasesNotAboveMetropolisRelease(t *testing.T) {
+ if version.ReleaseLessThan(mversion.Version.Release, minReconcilerRelease) {
+ t.Errorf("Metropolis release %s is below the minimum reconciler release %s",
+ version.Semver(mversion.Version),
+ version.Release(minReconcilerRelease),
+ )
+ }
+ if version.ReleaseLessThan(mversion.Version.Release, minApiserverRelease) {
+ t.Errorf("Metropolis release %s is below the minimum apiserver release %s",
+ version.Semver(mversion.Version),
+ version.Release(minApiserverRelease),
+ )
+ }
+}
+
+// startEtcd creates an etcd cluster and client for testing.
+func startEtcd(t *testing.T) client.Namespaced {
+ t.Helper()
+ // Start a single-node etcd cluster.
+ integration.BeforeTestExternal(t)
+ cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+ t.Cleanup(func() {
+ cluster.Terminate(t)
+ })
+ // Create etcd client to test cluster.
+ curEtcd, _ := client.NewLocal(cluster.Client(0)).Sub("curator")
+ return curEtcd
+}
+
+func setStatus(t *testing.T, cl client.Namespaced, status *ppb.KubernetesReconcilerStatus) {
+ t.Helper()
+ ctx := context.Background()
+
+ statusBytes, err := proto.Marshal(status)
+ if err != nil {
+ t.Fatalf("Failed to marshal status: %v", err)
+ }
+
+ _, err = cl.Put(ctx, statusKey, string(statusBytes))
+ if err != nil {
+ t.Fatalf("Put: %v", err)
+ }
+}
+
+func makeNode(isController bool, release *vpb.Version_Release) *ppb.Node {
+ node := &ppb.Node{
+ Roles: &cpb.NodeRoles{},
+ Status: &cpb.NodeStatus{
+ Version: &vpb.Version{Release: release},
+ },
+ }
+ if isController {
+ node.Roles.KubernetesController = &cpb.NodeRoles_KubernetesController{}
+ }
+ return node
+}
+
+// putNode puts the node into etcd, or deletes if nil.
+// It returns the etcd revision of the operation.
+func putNode(t *testing.T, cl client.Namespaced, id string, node *ppb.Node) int64 {
+ t.Helper()
+ ctx := context.Background()
+
+ nkey, err := curator.NodeEtcdPrefix.Key(id)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if node != nil {
+ nodeBytes, err := proto.Marshal(node)
+ if err != nil {
+ t.Fatalf("Failed to marshal node: %v", err)
+ }
+ resp, err := cl.Put(ctx, nkey, string(nodeBytes))
+ if err != nil {
+ t.Fatalf("Put: %v", err)
+ }
+ return resp.Header.Revision
+ } else {
+ resp, err := cl.Delete(ctx, nkey)
+ if err != nil {
+ t.Fatalf("Delete: %v", err)
+ }
+ return resp.Header.Revision
+ }
+}
+
+// TestWaitReady tests that WaitReady does not return too early, and the test
+// will time out if WaitReady fails to return when it is supposed to.
+func TestWaitReady(t *testing.T) {
+ cl := startEtcd(t)
+
+ isReady := make(chan struct{})
+ supervisor.TestHarness(t, func(ctx context.Context) error {
+ err := WaitReady(ctx, cl)
+ if err != nil {
+ t.Error(err)
+ }
+ close(isReady)
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ supervisor.Signal(ctx, supervisor.SignalDone)
+ return nil
+ })
+
+ // status does not exist.
+ time.Sleep(10 * time.Millisecond)
+
+ // Version is too old.
+ setStatus(t, cl, &ppb.KubernetesReconcilerStatus{
+ State: ppb.KubernetesReconcilerStatus_STATE_DONE,
+ Version: &vpb.Version{
+ Release: &vpb.Version_Release{Major: 0, Minor: 0, Patch: 0},
+ },
+ MinimumCompatibleRelease: &vpb.Version_Release{Major: 0, Minor: 0, Patch: 0},
+ })
+ time.Sleep(10 * time.Millisecond)
+
+ // MinimumCompatibleRelease is too new.
+ setStatus(t, cl, &ppb.KubernetesReconcilerStatus{
+ State: ppb.KubernetesReconcilerStatus_STATE_DONE,
+ Version: &vpb.Version{
+ Release: &vpb.Version_Release{Major: 10000, Minor: 0, Patch: 0},
+ },
+ MinimumCompatibleRelease: &vpb.Version_Release{Major: 10000, Minor: 0, Patch: 0},
+ })
+ time.Sleep(10 * time.Millisecond)
+
+ select {
+ case <-isReady:
+ t.Fatal("WaitReady returned too early.")
+ default:
+ }
+
+ // Now set the status to something compatible.
+ setStatus(t, cl, &ppb.KubernetesReconcilerStatus{
+ State: ppb.KubernetesReconcilerStatus_STATE_DONE,
+ Version: &vpb.Version{
+ Release: &vpb.Version_Release{Major: 10000, Minor: 0, Patch: 0},
+ },
+ MinimumCompatibleRelease: mversion.Version.Release,
+ })
+
+ <-isReady
+}
+
+// TestWatchNodes ensures that watchNodes always updates releases correctly
+// as nodes are changed in various ways.
+func TestWatchNodes(t *testing.T) {
+ ctx := context.Background()
+ cl := startEtcd(t)
+ s := Service{
+ Etcd: cl,
+ }
+ w := s.releases.Watch()
+ defer w.Close()
+
+ expectReleases := func(expectMin, expectMax string, expectRev int64) {
+ t.Helper()
+ releases, err := w.Get(ctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if actualMin := version.Release(releases.minRelease); actualMin != expectMin {
+ t.Fatalf("Expected minimum release %s, got %s", expectMin, actualMin)
+ }
+ if actualMax := version.Release(releases.maxRelease); actualMax != expectMax {
+ t.Fatalf("Expected maximum release %s, got %s", expectMax, actualMax)
+ }
+ if releases.revision != expectRev {
+ t.Fatalf("Expected revision %v, got %v", expectRev, releases.revision)
+ }
+ }
+
+ putNode(t, cl, "a1", makeNode(true, &vpb.Version_Release{Major: 0, Minor: 0, Patch: 2}))
+ putNode(t, cl, "a2", makeNode(true, &vpb.Version_Release{Major: 0, Minor: 0, Patch: 2}))
+ putNode(t, cl, "a3", makeNode(true, &vpb.Version_Release{Major: 0, Minor: 0, Patch: 2}))
+ putNode(t, cl, "b", makeNode(true, &vpb.Version_Release{Major: 0, Minor: 0, Patch: 3}))
+ rev := putNode(t, cl, "c", makeNode(true, &vpb.Version_Release{Major: 10000, Minor: 0, Patch: 0}))
+
+ supervisor.TestHarness(t, s.watchNodes)
+ expectReleases("0.0.2", "10000.0.0", rev)
+ // Node a1 is no longer a Kubernetes controller.
+ rev = putNode(t, cl, "a1", makeNode(false, &vpb.Version_Release{Major: 0, Minor: 0, Patch: 2}))
+ expectReleases("0.0.2", "10000.0.0", rev)
+ // Node a2 is deleted.
+ rev = putNode(t, cl, "a2", nil)
+ expectReleases("0.0.2", "10000.0.0", rev)
+ // Node a3 changes release. Now, the minimum should change.
+ rev = putNode(t, cl, "a3", makeNode(true, &vpb.Version_Release{Major: 0, Minor: 0, Patch: 4}))
+ expectReleases("0.0.3", "10000.0.0", rev)
+}
+
+// TestService tests the entire service, checking that it reconciles
+// only in situations where it should.
+func TestService(t *testing.T) {
+ reconcileWait = 10 * time.Millisecond
+ cl := startEtcd(t)
+ clientset := fake.NewSimpleClientset()
+ s := Service{
+ Etcd: cl,
+ ClientSet: clientset,
+ NodeID: "testnode",
+ }
+
+ // This node is newer than the local node, election should not start.
+ putNode(t, cl, "a", makeNode(true, &vpb.Version_Release{Major: 10000, Minor: 0, Patch: 0}))
+
+ cancelService, _ := supervisor.TestHarness(t, s.Run)
+
+ time.Sleep(50 * time.Millisecond)
+ if len(clientset.Actions()) != 0 {
+ t.Fatal("Actions shouldn't have been performed yet.")
+ }
+
+ // The status allows a too old node to start.
+ setStatus(t, cl, &ppb.KubernetesReconcilerStatus{
+ State: ppb.KubernetesReconcilerStatus_STATE_DONE,
+ Version: &vpb.Version{
+ Release: &vpb.Version_Release{Major: 0, Minor: 0, Patch: 2},
+ },
+ MinimumCompatibleRelease: &vpb.Version_Release{Major: 0, Minor: 0, Patch: 2},
+ })
+
+ // This node is too old, before minApiserverRelease.
+ putNode(t, cl, "a", makeNode(true, &vpb.Version_Release{Major: 0, Minor: 0, Patch: 2}))
+
+ // watch-releases restarts with 500 ms backoff + randomization, so wait 1s.
+ time.Sleep(time.Second)
+ if len(clientset.Actions()) != 0 {
+ t.Fatal("Actions shouldn't have been performed yet.")
+ }
+
+ // Upgrade the node.
+ putNode(t, cl, "a", makeNode(true, minApiserverRelease))
+
+ // Wait for status to be set.
+ waitForActions := func() {
+ isReady := make(chan struct{})
+ supervisor.TestHarness(t, func(ctx context.Context) error {
+ err := WaitReady(ctx, cl)
+ if err != nil {
+ t.Error(err)
+ }
+ close(isReady)
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ supervisor.Signal(ctx, supervisor.SignalDone)
+ return nil
+ })
+ <-isReady
+
+ if len(clientset.Actions()) == 0 {
+ t.Fatal("Actions should have been performed.")
+ }
+ clientset.ClearActions()
+ }
+ waitForActions()
+
+ // The status does not allow a too old node to start.
+ setStatus(t, cl, &ppb.KubernetesReconcilerStatus{
+ State: ppb.KubernetesReconcilerStatus_STATE_DONE,
+ Version: &vpb.Version{
+ Release: &vpb.Version_Release{Major: 0, Minor: 0, Patch: 2},
+ },
+ MinimumCompatibleRelease: &vpb.Version_Release{Major: 10000, Minor: 0, Patch: 0},
+ })
+
+ // This node is too old, before minApiserverRelease. But because it is not
+ // allowed to start, the reconciler is not blocked.
+ putNode(t, cl, "a", makeNode(true, &vpb.Version_Release{Major: 0, Minor: 0, Patch: 2}))
+
+ // Start another instance. The old node is still leader.
+ supervisor.TestHarness(t, s.Run)
+
+ time.Sleep(50 * time.Millisecond)
+ if len(clientset.Actions()) != 0 {
+ t.Fatal("Actions shouldn't have been performed yet.")
+ }
+
+ // Stop the first instance. Now the second instance should get elected.
+ cancelService()
+ waitForActions()
+}
diff --git a/metropolis/node/kubernetes/service_controller.go b/metropolis/node/kubernetes/service_controller.go
index c1cec27..6b4360b 100644
--- a/metropolis/node/kubernetes/service_controller.go
+++ b/metropolis/node/kubernetes/service_controller.go
@@ -20,13 +20,13 @@
"context"
"fmt"
"net"
- "time"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
+ "source.monogon.dev/metropolis/node/core/consensus"
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/node/core/network"
@@ -36,7 +36,6 @@
"source.monogon.dev/metropolis/node/kubernetes/pki"
"source.monogon.dev/metropolis/node/kubernetes/reconciler"
"source.monogon.dev/metropolis/pkg/supervisor"
-
apb "source.monogon.dev/metropolis/proto/api"
)
@@ -45,10 +44,11 @@
ClusterNet net.IPNet
ClusterDomain string
- KPKI *pki.PKI
- Root *localstorage.Root
- Network *network.Service
- Node *identity.NodeCredentials
+ KPKI *pki.PKI
+ Root *localstorage.Root
+ Consensus consensus.ServiceHandle
+ Network *network.Service
+ Node *identity.NodeCredentials
}
type Controller struct {
@@ -93,6 +93,18 @@
return fmt.Errorf("could not generate kubernetes client: %w", err)
}
+ supervisor.Logger(ctx).Infof("Waiting for consensus...")
+ w := s.c.Consensus.Watch()
+ defer w.Close()
+ st, err := w.Get(ctx, consensus.FilterRunning)
+ if err != nil {
+ return fmt.Errorf("while waiting for consensus: %w", err)
+ }
+ etcd, err := st.CuratorClient()
+ if err != nil {
+ return fmt.Errorf("while retrieving consensus client: %w", err)
+ }
+
// Sub-runnable which starts all parts of Kubernetes that depend on the
// machine's external IP address. If it changes, the runnable will exit.
// TODO(q3k): test this
@@ -137,24 +149,26 @@
return fmt.Errorf("network configuration changed (%s -> %s)", address.String(), status.ExternalAddress.String())
})
+ reconcilerService := &reconciler.Service{
+ Etcd: etcd,
+ ClientSet: clientSet,
+ NodeID: s.c.Node.ID(),
+ }
+ err = supervisor.Run(ctx, "reconciler", reconcilerService.Run)
+ if err != nil {
+ return fmt.Errorf("could not run sub-service reconciler: %w", err)
+ }
+
// Before we start anything else, make sure reconciliation passes at least once.
// This makes the initial startup of a cluster much cleaner as we don't end up
// starting the scheduler/controller-manager/etc just to get them to immediately
// fail and back off with 'unauthorized'.
- startLogging := time.Now().Add(2 * time.Second)
- supervisor.Logger(ctx).Infof("Performing initial resource reconciliation...")
- for {
- err := reconciler.ReconcileAll(ctx, clientSet)
- if err == nil {
- supervisor.Logger(ctx).Infof("Initial resource reconciliation succeeded.")
- break
- }
- if time.Now().After(startLogging) {
- supervisor.Logger(ctx).Errorf("Still couldn't do initial reconciliation: %v", err)
- startLogging = time.Now().Add(10 * time.Second)
- }
- time.Sleep(100 * time.Millisecond)
+ supervisor.Logger(ctx).Info("Waiting for reconciler...")
+ err = reconciler.WaitReady(ctx, etcd)
+ if err != nil {
+ return fmt.Errorf("while waiting for reconciler: %w", err)
}
+ supervisor.Logger(ctx).Info("Reconciler is done.")
authProxy := authproxy.Service{
KPKI: s.c.KPKI,
@@ -171,7 +185,6 @@
}{
{"controller-manager", runControllerManager(*controllerManagerConfig)},
{"scheduler", runScheduler(*schedulerConfig)},
- {"reconciler", reconciler.Maintain(clientSet)},
{"authproxy", authProxy.Run},
{"metricsproxy", metricsProxy.Run},
} {
diff --git a/version/version.go b/version/version.go
index b495528..668e0fa 100644
--- a/version/version.go
+++ b/version/version.go
@@ -10,18 +10,17 @@
-// Release converts a spec.Version's Release field into a SemVer 2.0.0 compatible
+// Release converts a spec.Version_Release into a SemVer 2.0.0 compatible
// string in the X.Y.Z form.
-func Release(v *spec.Version) string {
- if v == nil || v.Release == nil {
+func Release(rel *spec.Version_Release) string {
+ if rel == nil {
return "0.0.0"
}
- rel := v.Release
return fmt.Sprintf("%d.%d.%d", rel.Major, rel.Minor, rel.Patch)
}
// Semver converts a spec.Version proto message into a SemVer 2.0.0 compatible
// string.
func Semver(v *spec.Version) string {
- ver := "v" + Release(v)
+ ver := "v" + Release(v.Release)
var prerelease []string
if git := v.GitInformation; git != nil {
if n := git.CommitsSinceRelease; n != 0 {
@@ -38,3 +37,17 @@
}
return ver
}
+
+// ReleaseLessThan returns true if Release a is lexicographically smaller than b.
+func ReleaseLessThan(a, b *spec.Version_Release) bool {
+ if a.Major != b.Major {
+ return a.Major < b.Major
+ }
+ if a.Minor != b.Minor {
+ return a.Minor < b.Minor
+ }
+ if a.Patch != b.Patch {
+ return a.Patch < b.Patch
+ }
+ return false
+}