m/n/k/reconciler: implement leader election

Before this change, the reconciler runs on all Kubernetes controllers. 
When we are in a rolling upgrade of the cluster where a reconciled 
object changes, this will cause the old and new versions of the 
reconciler to fight each other, constantly updating the object back and 
forth.

Now, the reconciler is elected among nodes of the latest release. The 
status of the reconciliation is communicated to all Kubernetes 
controllers through a new key-value in etcd.

Additionally, compatibility constraints can be expressed by changing the 
constants minReconcilerRelease and minApiserverRelease, allowing 
reconciliation to happen in a controlled way that ensures compatibility 
even during rolling upgrades.

Change-Id: Iaf7c27702bd9809a13d47bcf041b71438353bef2
Reviewed-on: https://review.monogon.dev/c/monogon/+/3062
Tested-by: Jenkins CI
Reviewed-by: Serge Bazanski <serge@monogon.tech>
diff --git a/metropolis/node/kubernetes/reconciler/reconciler_status_test.go b/metropolis/node/kubernetes/reconciler/reconciler_status_test.go
new file mode 100644
index 0000000..bd627a2
--- /dev/null
+++ b/metropolis/node/kubernetes/reconciler/reconciler_status_test.go
@@ -0,0 +1,305 @@
+package reconciler
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"go.etcd.io/etcd/tests/v3/integration"
+	"google.golang.org/protobuf/proto"
+	"k8s.io/client-go/kubernetes/fake"
+
+	"source.monogon.dev/metropolis/node/core/consensus/client"
+	"source.monogon.dev/metropolis/node/core/curator"
+	ppb "source.monogon.dev/metropolis/node/core/curator/proto/private"
+	"source.monogon.dev/metropolis/pkg/supervisor"
+	cpb "source.monogon.dev/metropolis/proto/common"
+	mversion "source.monogon.dev/metropolis/version"
+	"source.monogon.dev/version"
+	vpb "source.monogon.dev/version/spec"
+)
+
+// TestMinimumReleasesNotAboveMetropolisRelease tests that minimum releases
+// are not above the metropolis release itself, because that would cause
+// things to get stuck.
+func TestMinimumReleasesNotAboveMetropolisRelease(t *testing.T) {
+	// A minReconcilerRelease above the current release would mean no node of
+	// this build could ever satisfy the reconciler constraint.
+	if version.ReleaseLessThan(mversion.Version.Release, minReconcilerRelease) {
+		t.Errorf("Metropolis release %s is below the minimum reconciler release %s",
+			version.Semver(mversion.Version),
+			version.Release(minReconcilerRelease),
+		)
+	}
+	// Same argument for the apiserver-side constraint.
+	if version.ReleaseLessThan(mversion.Version.Release, minApiserverRelease) {
+		t.Errorf("Metropolis release %s is below the minimum apiserver release %s",
+			version.Semver(mversion.Version),
+			version.Release(minApiserverRelease),
+		)
+	}
+}
+
+// startEtcd creates an etcd cluster and client for testing.
+// The cluster is terminated automatically via t.Cleanup when the test ends,
+// and the returned client is namespaced under the "curator" prefix (matching
+// the prefix used by curator.NodeEtcdPrefix keys written by the tests below).
+func startEtcd(t *testing.T) client.Namespaced {
+	t.Helper()
+	// Start a single-node etcd cluster.
+	integration.BeforeTestExternal(t)
+	cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+	t.Cleanup(func() {
+		cluster.Terminate(t)
+	})
+	// Create etcd client to test cluster.
+	curEtcd, _ := client.NewLocal(cluster.Client(0)).Sub("curator")
+	return curEtcd
+}
+
+// setStatus marshals the given reconciler status and writes it to statusKey
+// in etcd, failing the test on any marshal or etcd error.
+func setStatus(t *testing.T, cl client.Namespaced, status *ppb.KubernetesReconcilerStatus) {
+	t.Helper()
+	ctx := context.Background()
+
+	statusBytes, err := proto.Marshal(status)
+	if err != nil {
+		t.Fatalf("Failed to marshal status: %v", err)
+	}
+
+	_, err = cl.Put(ctx, statusKey, string(statusBytes))
+	if err != nil {
+		t.Fatalf("Put: %v", err)
+	}
+}
+
+// makeNode returns a minimal node record reporting the given release and, if
+// isController is set, carrying the KubernetesController role.
+func makeNode(isController bool, release *vpb.Version_Release) *ppb.Node {
+	node := &ppb.Node{
+		Roles: &cpb.NodeRoles{},
+		Status: &cpb.NodeStatus{
+			Version: &vpb.Version{Release: release},
+		},
+	}
+	if isController {
+		node.Roles.KubernetesController = &cpb.NodeRoles_KubernetesController{}
+	}
+	return node
+}
+
+// putNode puts the node into etcd, or deletes if nil.
+// It returns the etcd revision of the operation, which callers use to
+// correlate watcher output with the write that produced it.
+func putNode(t *testing.T, cl client.Namespaced, id string, node *ppb.Node) int64 {
+	t.Helper()
+	ctx := context.Background()
+
+	// Build the canonical etcd key for this node ID.
+	nkey, err := curator.NodeEtcdPrefix.Key(id)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if node != nil {
+		nodeBytes, err := proto.Marshal(node)
+		if err != nil {
+			t.Fatalf("Failed to marshal node: %v", err)
+		}
+		resp, err := cl.Put(ctx, nkey, string(nodeBytes))
+		if err != nil {
+			t.Fatalf("Put: %v", err)
+		}
+		return resp.Header.Revision
+	} else {
+		resp, err := cl.Delete(ctx, nkey)
+		if err != nil {
+			t.Fatalf("Delete: %v", err)
+		}
+		return resp.Header.Revision
+	}
+}
+
+// TestWaitReady tests that WaitReady does not return too early, and the test
+// will time out if WaitReady fails to return when it is supposed to.
+func TestWaitReady(t *testing.T) {
+	cl := startEtcd(t)
+
+	// Run WaitReady in a supervised runnable; isReady is closed the moment it
+	// returns, letting the test observe exactly when that happens.
+	isReady := make(chan struct{})
+	supervisor.TestHarness(t, func(ctx context.Context) error {
+		err := WaitReady(ctx, cl)
+		if err != nil {
+			t.Error(err)
+		}
+		close(isReady)
+		supervisor.Signal(ctx, supervisor.SignalHealthy)
+		supervisor.Signal(ctx, supervisor.SignalDone)
+		return nil
+	})
+
+	// Each sleep below gives WaitReady a window in which it could
+	// (incorrectly) return for a status it must reject.
+
+	// status does not exist.
+	time.Sleep(10 * time.Millisecond)
+
+	// Version is too old.
+	setStatus(t, cl, &ppb.KubernetesReconcilerStatus{
+		State: ppb.KubernetesReconcilerStatus_STATE_DONE,
+		Version: &vpb.Version{
+			Release: &vpb.Version_Release{Major: 0, Minor: 0, Patch: 0},
+		},
+		MinimumCompatibleRelease: &vpb.Version_Release{Major: 0, Minor: 0, Patch: 0},
+	})
+	time.Sleep(10 * time.Millisecond)
+
+	// MinimumCompatibleRelease is too new.
+	setStatus(t, cl, &ppb.KubernetesReconcilerStatus{
+		State: ppb.KubernetesReconcilerStatus_STATE_DONE,
+		Version: &vpb.Version{
+			Release: &vpb.Version_Release{Major: 10000, Minor: 0, Patch: 0},
+		},
+		MinimumCompatibleRelease: &vpb.Version_Release{Major: 10000, Minor: 0, Patch: 0},
+	})
+	time.Sleep(10 * time.Millisecond)
+
+	// Non-blocking check: WaitReady must not have returned for any of the
+	// incompatible statuses above.
+	select {
+	case <-isReady:
+		t.Fatal("WaitReady returned too early.")
+	default:
+	}
+
+	// Now set the status to something compatible.
+	setStatus(t, cl, &ppb.KubernetesReconcilerStatus{
+		State: ppb.KubernetesReconcilerStatus_STATE_DONE,
+		Version: &vpb.Version{
+			Release: &vpb.Version_Release{Major: 10000, Minor: 0, Patch: 0},
+		},
+		MinimumCompatibleRelease: mversion.Version.Release,
+	})
+
+	// Blocks until WaitReady returns; the test times out if it never does.
+	<-isReady
+}
+
+// TestWatchNodes ensures that WatchNodes always updates releases correctly
+// as nodes are changed in various ways.
+func TestWatchNodes(t *testing.T) {
+	ctx := context.Background()
+	cl := startEtcd(t)
+	s := Service{
+		Etcd: cl,
+	}
+	// Subscribe to the releases value before the watcher runnable starts.
+	w := s.releases.Watch()
+	defer w.Close()
+
+	// expectReleases fetches the next value from the watcher and verifies the
+	// computed min/max controller releases and the etcd revision they
+	// correspond to.
+	expectReleases := func(expectMin, expectMax string, expectRev int64) {
+		t.Helper()
+		releases, err := w.Get(ctx)
+		if err != nil {
+			t.Fatal(err)
+		}
+		if actualMin := version.Release(releases.minRelease); actualMin != expectMin {
+			t.Fatalf("Expected minimum release %s, got %s", expectMin, actualMin)
+		}
+		if actualMax := version.Release(releases.maxRelease); actualMax != expectMax {
+			t.Fatalf("Expected maximum release %s, got %s", expectMax, actualMax)
+		}
+		if releases.revision != expectRev {
+			t.Fatalf("Expected revision %v, got %v", expectRev, releases.revision)
+		}
+	}
+
+	// Seed etcd with controllers before the watcher starts, so the initial
+	// load path is exercised too.
+	putNode(t, cl, "a1", makeNode(true, &vpb.Version_Release{Major: 0, Minor: 0, Patch: 2}))
+	putNode(t, cl, "a2", makeNode(true, &vpb.Version_Release{Major: 0, Minor: 0, Patch: 2}))
+	putNode(t, cl, "a3", makeNode(true, &vpb.Version_Release{Major: 0, Minor: 0, Patch: 2}))
+	putNode(t, cl, "b", makeNode(true, &vpb.Version_Release{Major: 0, Minor: 0, Patch: 3}))
+	rev := putNode(t, cl, "c", makeNode(true, &vpb.Version_Release{Major: 10000, Minor: 0, Patch: 0}))
+
+	supervisor.TestHarness(t, s.watchNodes)
+	expectReleases("0.0.2", "10000.0.0", rev)
+	// Node a1 is no longer a Kubernetes controller.
+	rev = putNode(t, cl, "a1", makeNode(false, &vpb.Version_Release{Major: 0, Minor: 0, Patch: 2}))
+	expectReleases("0.0.2", "10000.0.0", rev)
+	// Node a2 is deleted.
+	rev = putNode(t, cl, "a2", nil)
+	expectReleases("0.0.2", "10000.0.0", rev)
+	// Node a3 changes release. Now, the minimum should change.
+	rev = putNode(t, cl, "a3", makeNode(true, &vpb.Version_Release{Major: 0, Minor: 0, Patch: 4}))
+	expectReleases("0.0.3", "10000.0.0", rev)
+}
+
+// TestService tests the entire service, checking that it reconciles
+// only in situations where it should.
+func TestService(t *testing.T) {
+	// Speed up the test by shortening the wait between reconciliations.
+	// NOTE(review): this mutates a package-level variable and is never
+	// restored; fine as long as no other test in this package depends on the
+	// default value — confirm if more tests are added.
+	reconcileWait = 10 * time.Millisecond
+	cl := startEtcd(t)
+	clientset := fake.NewSimpleClientset()
+	s := Service{
+		Etcd:      cl,
+		ClientSet: clientset,
+		NodeID:    "testnode",
+	}
+
+	// This node is newer than the local node, election should not start.
+	putNode(t, cl, "a", makeNode(true, &vpb.Version_Release{Major: 10000, Minor: 0, Patch: 0}))
+
+	cancelService, _ := supervisor.TestHarness(t, s.Run)
+
+	// Reconciliation is observed through actions recorded by the fake
+	// clientset; none should have happened while a newer node exists.
+	time.Sleep(50 * time.Millisecond)
+	if len(clientset.Actions()) != 0 {
+		t.Fatal("Actions shouldn't have been performed yet.")
+	}
+
+	// The status allows a too old node to start.
+	setStatus(t, cl, &ppb.KubernetesReconcilerStatus{
+		State: ppb.KubernetesReconcilerStatus_STATE_DONE,
+		Version: &vpb.Version{
+			Release: &vpb.Version_Release{Major: 0, Minor: 0, Patch: 2},
+		},
+		MinimumCompatibleRelease: &vpb.Version_Release{Major: 0, Minor: 0, Patch: 2},
+	})
+
+	// This node is too old, before minApiserverRelease.
+	putNode(t, cl, "a", makeNode(true, &vpb.Version_Release{Major: 0, Minor: 0, Patch: 2}))
+
+	// watch-releases restarts with 500 ms backoff + randomization, so wait 1s.
+	time.Sleep(time.Second)
+	if len(clientset.Actions()) != 0 {
+		t.Fatal("Actions shouldn't have been performed yet.")
+	}
+
+	// Upgrade the node to exactly the minimum allowed apiserver release,
+	// unblocking reconciliation.
+	putNode(t, cl, "a", makeNode(true, minApiserverRelease))
+
+	// Wait for status to be set.
+	waitForActions := func() {
+		// Reuse WaitReady as the signal that the elected reconciler has
+		// published a compatible DONE status.
+		isReady := make(chan struct{})
+		supervisor.TestHarness(t, func(ctx context.Context) error {
+			err := WaitReady(ctx, cl)
+			if err != nil {
+				t.Error(err)
+			}
+			close(isReady)
+			supervisor.Signal(ctx, supervisor.SignalHealthy)
+			supervisor.Signal(ctx, supervisor.SignalDone)
+			return nil
+		})
+		<-isReady
+
+		if len(clientset.Actions()) == 0 {
+			t.Fatal("Actions should have been performed.")
+		}
+		// Reset recorded actions so later assertions start from a clean slate.
+		clientset.ClearActions()
+	}
+	waitForActions()
+
+	// The status does not allow a too old node to start.
+	setStatus(t, cl, &ppb.KubernetesReconcilerStatus{
+		State: ppb.KubernetesReconcilerStatus_STATE_DONE,
+		Version: &vpb.Version{
+			Release: &vpb.Version_Release{Major: 0, Minor: 0, Patch: 2},
+		},
+		MinimumCompatibleRelease: &vpb.Version_Release{Major: 10000, Minor: 0, Patch: 0},
+	})
+
+	// This node is too old, before minApiserverRelease. But because it is not
+	// allowed to start, the reconciler is not blocked.
+	putNode(t, cl, "a", makeNode(true, &vpb.Version_Release{Major: 0, Minor: 0, Patch: 2}))
+
+	// Start another instance. The old node is still leader.
+	supervisor.TestHarness(t, s.Run)
+
+	time.Sleep(50 * time.Millisecond)
+	if len(clientset.Actions()) != 0 {
+		t.Fatal("Actions shouldn't have been performed yet.")
+	}
+
+	// Stop the first instance. Now the second instance should get elected.
+	cancelService()
+	waitForActions()
+}