m/n/core/curator: implement leader election

This implements the leader election subset of the curator's functionality.
It does not yet implement any business logic, only the switchover between
acting as a leader and acting as a follower.

Test plan: adds an integration test for the leader election, run against an
in-memory etcd cluster.

Change-Id: Id77ecc35a9f2b18e716fffd3caf2de193982d676
Reviewed-on: https://review.monogon.dev/c/monogon/+/184
Reviewed-by: Lorenz Brun <lorenz@nexantic.com>
diff --git a/metropolis/node/core/BUILD.bazel b/metropolis/node/core/BUILD.bazel
index 24200d8..be34ea0 100644
--- a/metropolis/node/core/BUILD.bazel
+++ b/metropolis/node/core/BUILD.bazel
@@ -16,6 +16,7 @@
     deps = [
         "//metropolis/node:go_default_library",
         "//metropolis/node/core/cluster:go_default_library",
+        "//metropolis/node/core/curator:go_default_library",
         "//metropolis/node/core/localstorage:go_default_library",
         "//metropolis/node/core/localstorage/declarative:go_default_library",
         "//metropolis/node/core/network:go_default_library",
diff --git a/metropolis/node/core/cluster/manager.go b/metropolis/node/core/cluster/manager.go
index eede40f..cc98d8e 100644
--- a/metropolis/node/core/cluster/manager.go
+++ b/metropolis/node/core/cluster/manager.go
@@ -53,6 +53,7 @@
 
 const (
 	ConsensusUserKubernetesPKI ConsensusUser = "kubernetes-pki"
+	ConsensusUserCurator       ConsensusUser = "curator"
 )
 
 // ConsensusClient returns an etcd/consensus client for a given ConsensusUser.
@@ -73,6 +74,7 @@
 	// casting to ConsensusUser from an arbitrary string.
 	switch user {
 	case ConsensusUserKubernetesPKI:
+	case ConsensusUserCurator:
 	default:
 		return nil, fmt.Errorf("unknown ConsensusUser %q", user)
 	}
diff --git a/metropolis/node/core/curator/BUILD.bazel b/metropolis/node/core/curator/BUILD.bazel
new file mode 100644
index 0000000..7ff9a0e
--- /dev/null
+++ b/metropolis/node/core/curator/BUILD.bazel
@@ -0,0 +1,32 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(
+    name = "go_default_library",
+    srcs = ["curator.go"],
+    importpath = "source.monogon.dev/metropolis/node/core/curator",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//metropolis/node/core/consensus/client:go_default_library",
+        "//metropolis/node/core/curator/proto/private:go_default_library",
+        "//metropolis/node/core/localstorage:go_default_library",
+        "//metropolis/pkg/event:go_default_library",
+        "//metropolis/pkg/event/memory:go_default_library",
+        "//metropolis/pkg/supervisor:go_default_library",
+        "@io_etcd_go_etcd//clientv3/concurrency:go_default_library",
+        "@org_golang_google_protobuf//proto:go_default_library",
+    ],
+)
+
+go_test(
+    name = "go_default_test",
+    srcs = ["curator_test.go"],
+    embed = [":go_default_library"],
+    deps = [
+        "//metropolis/node/core/consensus/client:go_default_library",
+        "//metropolis/node/core/localstorage:go_default_library",
+        "//metropolis/node/core/localstorage/declarative:go_default_library",
+        "//metropolis/pkg/supervisor:go_default_library",
+        "@io_etcd_go_etcd//clientv3:go_default_library",
+        "@io_etcd_go_etcd//integration:go_default_library",
+    ],
+)
diff --git a/metropolis/node/core/curator/curator.go b/metropolis/node/core/curator/curator.go
new file mode 100644
index 0000000..fe7aabf
--- /dev/null
+++ b/metropolis/node/core/curator/curator.go
@@ -0,0 +1,276 @@
+// Package curator implements the Curator, a service responsible for management
+// of the Metropolis cluster that it is running on.
+//
+// The Curator is implemented as a leader-elected service. Instances of the
+// service are running colocated with all nodes that run a consensus (etcd)
+// server.
+// Each instance listens locally over gRPC for requests from code running on the
+// same node, and publicly over gRPC for traffic from other nodes (e.g. ones that
+// do not run an instance of the Curator) and external users.
+// The curator leader keeps its state fully in etcd. Followers forward all
+// requests to the active leader.
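+//
+// A curator instance is constructed with New and run as a supervisor runnable.
+// For example (with placeholder values standing in for the etcd client, node
+// ID and ephemeral directory):
+//
+//   c := curator.New(curator.Config{
+//       Etcd:      etcdClient,
+//       NodeID:    nodeID,
+//       LeaderTTL: 5 * time.Second,
+//       Directory: curatorDirectory,
+//   })
+//   err := supervisor.Run(ctx, "curator", c.Run)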
+package curator
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"time"
+
+	"go.etcd.io/etcd/clientv3/concurrency"
+	"google.golang.org/protobuf/proto"
+
+	"source.monogon.dev/metropolis/node/core/consensus/client"
+	ppb "source.monogon.dev/metropolis/node/core/curator/proto/private"
+	"source.monogon.dev/metropolis/node/core/localstorage"
+	"source.monogon.dev/metropolis/pkg/event"
+	"source.monogon.dev/metropolis/pkg/event/memory"
+	"source.monogon.dev/metropolis/pkg/supervisor"
+)
+
+// Config is the configuration of the curator.
+type Config struct {
+	// Etcd is an etcd client in which all curator storage and leader election
+	// will be kept.
+	Etcd client.Namespaced
+	// NodeID is the ID of the node that this curator will run on. It's used to
+	// populate the leader election lock.
+	NodeID string
+	// LeaderTTL is the timeout on the lease used to perform leader election.
+	// Any active leader must continue updating its lease at least this often,
+	// or the lease (and leadership) will be lost.
+	// Lower values allow for faster failovers. Higher values allow for higher
+	// resiliency against short network partitions.
+	// A value less than or equal to zero will default to 60 seconds.
+	LeaderTTL time.Duration
+	// Directory is the curator ephemeral directory in which the curator will
+	// store its local domain socket for connections from the node.
+	Directory *localstorage.EphemeralCuratorDirectory
+}
+
+// Service is the Curator service. See the package-level documentation for more
+// information.
+type Service struct {
+	// config is the configuration with which the service was started.
+	config *Config
+
+	// ttl is the effective TTL, in seconds, derived from Config.LeaderTTL (if
+	// LeaderTTL was given as <= 0, this is the default it has been fixed up to).
+	ttl int
+
+	// status is a memory Event Value for keeping the electionStatus of this
+	// instance. It is not exposed to users of the Curator.
+	status memory.Value
+}
+
+// New creates a new curator Service.
+func New(cfg Config) *Service {
+	return &Service{
+		config: &cfg,
+	}
+}
+
+// electionStatus represents the status of this curator's leader election
+// attempts within the cluster.
+type electionStatus struct {
+	// leader is set if this curator is a leader, nil otherwise. This cannot be set
+	// if follower is also set. However, both leader and follower might be nil to
+	// signify that this curator instance is not part of a quorum.
+	leader *electionStatusLeader
+	// follower is set if the curator is a follower for another leader, nil
+	// otherwise. This cannot be set if leader is also set. However, both leader and
+	// follower might be nil to signify that this curator instance is not part of a
+	// quorum.
+	follower *electionStatusFollower
+}
+
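+// electionStatusLeader contains the state of a curator instance that is
+// currently the elected leader.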
+type electionStatusLeader struct {
+	// lockKey is the etcd key for which a lease value is set with revision
+	// lockRev. This key/revision must be ensured to exist when any etcd access
+	// is performed by the curator to ensure that it is still the active leader
+	// according to the rest of the cluster.
+	lockKey string
+	lockRev int64
+}
+
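+// electionStatusFollower contains the state of a curator instance that is
+// following another instance's leadership. lock is the leader election value
+// observed in etcd for the current leader, carrying that leader's node ID and
+// lease TTL.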
+type electionStatusFollower struct {
+	lock *ppb.LeaderElectionValue
+}
+
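+// electionWatch returns an electionWatcher that can be used to follow this
+// instance's electionStatus as the election progresses.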
+func (s *Service) electionWatch() electionWatcher {
+	return electionWatcher{
+		Watcher: s.status.Watch(),
+	}
+}
+
+// electionWatcher is a type-safe wrapper around event.Watcher which provides
+// electionStatus values.
+type electionWatcher struct {
+	event.Watcher
+}
+
+// get retrieves an electionStatus from the electionWatcher.
+func (w *electionWatcher) get(ctx context.Context) (*electionStatus, error) {
+	val, err := w.Watcher.Get(ctx)
+	if err != nil {
+		return nil, err
+	}
+	status := val.(electionStatus)
+	return &status, nil
+}
+
+// buildLockValue returns a serialized etcd value that will be set by the
+// instance when it becomes a leader. This value is a serialized
+// LeaderElectionValue from proto/private/lock.proto.
+func (c *Config) buildLockValue(ttl int) ([]byte, error) {
+	v := &ppb.LeaderElectionValue{
+		NodeId: c.NodeID,
+		Ttl:    uint64(ttl),
+	}
+	bytes, err := proto.Marshal(v)
+	if err != nil {
+		return nil, fmt.Errorf("when marshaling value: %w", err)
+	}
+	return bytes, nil
+}
+
+var (
+	// electionPrefix is the prefix under which the curator instances will
+	// attempt to perform leader election.
+	//
+	// The trailing slash is omitted, as the etcd concurrency library appends one.
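+	// Each campaigning curator then creates its own key under this prefix,
+	// derived from its etcd session lease.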
+	electionPrefix = "/leader"
+)
+
+// elect runs a single leader election attempt. The status of the service will
+// be updated with electionStatus values as the election makes progress.
+func (s *Service) elect(ctx context.Context) error {
+	lv, err := s.config.buildLockValue(s.ttl)
+	if err != nil {
+		return fmt.Errorf("building lock value failed: %w", err)
+	}
+
+	// Establish a lease/session with etcd.
+	session, err := concurrency.NewSession(s.config.Etcd.ThinClient(ctx),
+		concurrency.WithContext(ctx),
+		concurrency.WithTTL(s.ttl))
+	if err != nil {
+		return fmt.Errorf("creating session failed: %w", err)
+	}
+
+	// Kill the session whenever we lose leadership or error out.
+	defer func() {
+		err := session.Close()
+		if err != nil {
+			supervisor.Logger(ctx).Warningf("Failed to close session: %v", err)
+		}
+	}()
+
+	supervisor.Logger(ctx).Infof("Curator established lease, ID: %d", session.Lease())
+	election := concurrency.NewElection(session, electionPrefix)
+
+	// Observer context; we need to cancel it so that we don't leak the observer
+	// goroutine/channel.
+	octx, octxC := context.WithCancel(ctx)
+	defer octxC()
+
+	// Channel that gets updates about the current leader in the cluster.
+	observerC := election.Observe(octx)
+	// Channel that gets updates about this instance becoming a leader.
+	campaignerC := make(chan error)
+
+	// Campaign to become leader. This blocks until leader election is successful
+	// and this instance is now the leader.
+	//
+	// The lock value is converted to string from raw binary bytes, but that's fine
+	// as that string is converted back to []byte within the etcd client library (in
+	// OpPut).
+	go func() {
+		campaignerC <- election.Campaign(ctx, string(lv))
+	}()
+
+	// While campaigning, update the electionStatus with information about the
+	// current leader.
+	for {
+		select {
+		case o := <-observerC:
+			var lock ppb.LeaderElectionValue
+			if err := proto.Unmarshal(o.Kvs[0].Value, &lock); err != nil {
+				return fmt.Errorf("parsing existing lock value failed: %w", err)
+			}
+			s.status.Set(electionStatus{
+				follower: &electionStatusFollower{
+					lock: &lock,
+				},
+			})
+		case err = <-campaignerC:
+			if err == nil {
+				goto campaigned
+			}
+			return fmt.Errorf("campaigning failed: %w", err)
+		}
+	}
+
+campaigned:
+	supervisor.Logger(ctx).Info("Curator became leader.")
+
+	// Update status, watchers will now know that this curator is the leader.
+	s.status.Set(electionStatus{
+		leader: &electionStatusLeader{
+			lockKey: election.Key(),
+			lockRev: election.Rev(),
+		},
+	})
+
+	// Wait until either we lose the lease/session or our context expires.
+	select {
+	case <-ctx.Done():
+		supervisor.Logger(ctx).Warningf("Context canceled, quitting.")
+		return fmt.Errorf("curator session canceled: %w", ctx.Err())
+	case <-session.Done():
+		supervisor.Logger(ctx).Warningf("Session done, quitting.")
+		return fmt.Errorf("curator session done")
+	}
+
+}
+
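+// Run is a supervisor runnable that runs the curator's leader election loop.
+// It logs the locally observed election status and re-runs elect whenever an
+// election round ends, returning only once its context is canceled.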
+func (s *Service) Run(ctx context.Context) error {
+	// Start local election watcher. This logs what this curator knows about its own
+	// leadership.
+	go func() {
+		w := s.electionWatch()
+		for {
+			st, err := w.get(ctx)
+			if err != nil {
+				supervisor.Logger(ctx).Warningf("Election watcher exiting: get(): %v", err)
+				return
+			}
+			if l := st.leader; l != nil {
+				supervisor.Logger(ctx).Infof("Election watcher: this node's curator is leader (lock key %q, rev %d)", l.lockKey, l.lockRev)
+			} else {
+				supervisor.Logger(ctx).Infof("Election watcher: this node's curator is not the leader")
+			}
+		}
+	}()
+
+	// Calculate effective TTL. This replicates the 60-second default of etcd
+	// concurrency sessions, but allows us to explicitly log the TTL used.
+	s.ttl = int(s.config.LeaderTTL.Seconds())
+	if s.ttl <= 0 {
+		s.ttl = 60
+	}
+	supervisor.Logger(ctx).Infof("Curator starting on prefix %q with lease TTL of %d seconds...", electionPrefix, s.ttl)
+
+	supervisor.Signal(ctx, supervisor.SignalHealthy)
+	for {
+		s.status.Set(electionStatus{})
+		err := s.elect(ctx)
+		s.status.Set(electionStatus{})
+
+		if err != nil && errors.Is(err, ctx.Err()) {
+			return fmt.Errorf("election round failed due to context cancelation, not attempting to re-elect: %w", err)
+		}
+		supervisor.Logger(ctx).Infof("Curator election round done: %v", err)
+		supervisor.Logger(ctx).Info("Curator election restarting...")
+	}
+}
diff --git a/metropolis/node/core/curator/curator_test.go b/metropolis/node/core/curator/curator_test.go
new file mode 100644
index 0000000..6ce489d
--- /dev/null
+++ b/metropolis/node/core/curator/curator_test.go
@@ -0,0 +1,346 @@
+package curator
+
+import (
+	"context"
+	"fmt"
+	"io/ioutil"
+	"os"
+	"testing"
+	"time"
+
+	"go.etcd.io/etcd/clientv3"
+	"go.etcd.io/etcd/integration"
+
+	"source.monogon.dev/metropolis/node/core/consensus/client"
+	"source.monogon.dev/metropolis/node/core/localstorage"
+	"source.monogon.dev/metropolis/node/core/localstorage/declarative"
+	"source.monogon.dev/metropolis/pkg/supervisor"
+)
+
+var (
+	// cluster is a 3-member in-memory etcd cluster for testing.
+	cluster *integration.ClusterV3
+	// endpoints is a list of client endpoints, one for each of the three etcd
+	// members that make up the cluster above.
+	endpoints []string
+)
+
+// TestMain brings up a 3 node etcd cluster for tests to use.
+func TestMain(m *testing.M) {
+	cfg := integration.ClusterConfig{
+		Size:                 3,
+		GRPCKeepAliveMinTime: time.Millisecond,
+	}
+	cluster = integration.NewClusterV3(nil, &cfg)
+	endpoints = make([]string, 3)
+	for i := range endpoints {
+		endpoints[i] = cluster.Client(i).Endpoints()[0]
+	}
+
+	v := m.Run()
+	cluster.Terminate(nil)
+	os.Exit(v)
+}
+
+// dut is the design under test harness - in this case, a curator instance.
+type dut struct {
+	// endpoint of the etcd server that this instance is connected to. Each instance
+	// connects to a different member of the etcd cluster so that we can easily
+	// inject partitions between curator instances.
+	endpoint string
+	// instance is the curator Service instance itself.
+	instance *Service
+
+	// temporary directory in which the Curator's ephemeral directory is placed.
+	// Needs to be cleaned up.
+	temporary string
+}
+
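+// cleanup removes the dut's temporary directory.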
+func (d *dut) cleanup() {
+	os.RemoveAll(d.temporary)
+}
+
+// newDut creates a new dut harness for a curator instance, connected to a given
+// etcd endpoint.
+func newDut(ctx context.Context, t *testing.T, endpoint string) *dut {
+	t.Helper()
+	// Create new etcd client to the given endpoint.
+	cli, err := clientv3.New(clientv3.Config{
+		Endpoints:            []string{endpoint},
+		DialTimeout:          1 * time.Second,
+		DialKeepAliveTime:    1 * time.Second,
+		DialKeepAliveTimeout: 1 * time.Second,
+		Context:              ctx,
+	})
+	if err != nil {
+		t.Fatalf("clientv3.New: %v", err)
+	}
+
+	// Create ephemeral directory for curator and place it into /tmp.
+	dir := localstorage.EphemeralCuratorDirectory{}
+	tmp, err := ioutil.TempDir("", "curator-test-*")
+	if err != nil {
+		t.Fatalf("TempDir: %v", err)
+	}
+	err = declarative.PlaceFS(&dir, tmp)
+	if err != nil {
+		t.Fatalf("PlaceFS: %v", err)
+	}
+
+	id := fmt.Sprintf("test-%s", endpoint)
+	svc := New(Config{
+		Etcd:      client.NewLocal(cli),
+		NodeID:    id,
+		LeaderTTL: time.Second,
+		Directory: &dir,
+	})
+	if err := supervisor.Run(ctx, id, svc.Run); err != nil {
+		t.Fatalf("Run %s: %v", id, err)
+	}
+	return &dut{
+		endpoint:  endpoint,
+		instance:  svc,
+		temporary: tmp,
+	}
+}
+
+// dutSet is a collection of duts keyed by endpoint to which they're connected.
+// Since each dut is connected to a different endpoint in these tests, the
+// endpoint is used as a unique identifier for each dut/instance.
+type dutSet map[string]*dut
+
+// dutUpdate is an update from a dut's Curator instance - either a new
+// electionStatus or an error while retrieving it.
+type dutUpdate struct {
+	// endpoint to which this dut's Curator instance is connected.
+	endpoint string
+	// status received from the dut's Curator instance, or nil if err is set.
+	status *electionStatus
+	err    error
+}
+
+// dutSetStatus is a point-in-time snapshot of the electionStatus of Curator
+// instances, keyed by endpoints in the same way as dutSet.
+type dutSetStatus map[string]*electionStatus
+
+// leaders returns a list of endpoints that currently see themselves as leaders.
+func (d dutSetStatus) leaders() []string {
+	var res []string
+	for e, s := range d {
+		if s.leader != nil {
+			res = append(res, e)
+		}
+	}
+	return res
+}
+
+// followers returns a list of endpoints that currently see themselves as
+// followers.
+func (d dutSetStatus) followers() []string {
+	var res []string
+	for e, s := range d {
+		if s.follower != nil {
+			res = append(res, e)
+		}
+	}
+	return res
+}
+
+// wait blocks until the dutSetStatus of a given dutSet reaches some state (as
+// implemented by predicate f).
+func (s dutSet) wait(ctx context.Context, f func(s dutSetStatus) bool) (dutSetStatus, error) {
+	ctx2, ctxC := context.WithCancel(ctx)
+	defer ctxC()
+
+	// dss is the dutSetStatus which we will keep updating with the electionStatus
+	// of each dut's Curator as long as predicate f returns false.
+	dss := make(dutSetStatus)
+
+	// updC is a channel of updates from all dut's electionStatus watchers. The
+	// dutUpdate type contains the endpoint to distinguish the source of each
+	// update.
+	updC := make(chan dutUpdate)
+
+	// Run a watcher for each dut which sends that dut's newest available
+	// electionStatus (or error) to updC.
+	for e, d := range s {
+		w := d.instance.electionWatch()
+		go func(e string, w electionWatcher) {
+			defer w.Close()
+			for {
+				s, err := w.get(ctx2)
+				if err != nil {
+					updC <- dutUpdate{
+						endpoint: e,
+						err:      err,
+					}
+					return
+				}
+				updC <- dutUpdate{
+					endpoint: e,
+					status:   s,
+				}
+			}
+		}(e, w)
+	}
+
+	// Keep updating dss with updates from updC and call f on every change.
+	for {
+		select {
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		case u := <-updC:
+			if u.err != nil {
+				return nil, fmt.Errorf("from %q: %w", u.endpoint, u.err)
+			}
+			dss[u.endpoint] = u.status
+		}
+
+		if f(dss) {
+			return dss, nil
+		}
+	}
+}
+
+// TestLeaderElectionStatus exercises the electionStatus watch/get functionality
+// from the Curator code. It spawns a cluster of three curators and ensures all
+// of them respond correctly to election, partitioning and subsequent
+// re-election.
+func TestLeaderElectionStatus(t *testing.T) {
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+
+	// Map from endpoint name to etcd member list index. Together with the
+	// endpoints list, this is used to quickly look up endpoint<->member_num. Since
+	// we only have one Curator instance per etcd member, we can use the instance's
+	// etcd endpoint as a unique key to identify it.
+	endpointToNum := map[string]int{
+		endpoints[0]: 0,
+		endpoints[1]: 1,
+		endpoints[2]: 2,
+	}
+
+	// Start a new supervisor in which we create all curator DUTs.
+	dutC := make(chan *dut)
+	supervisor.New(ctx, func(ctx context.Context) error {
+		for e := range endpointToNum {
+			dutC <- newDut(ctx, t, e)
+		}
+		close(dutC)
+		supervisor.Signal(ctx, supervisor.SignalHealthy)
+		supervisor.Signal(ctx, supervisor.SignalDone)
+		return nil
+	})
+
+	// Build dutSet, i.e. a map from endpoint to Curator DUT.
+	duts := make(dutSet)
+	for d := range dutC {
+		duts[d.endpoint] = d
+	}
+	// Schedule cleanup for all DUTs.
+	defer func() {
+		for _, dut := range duts {
+			dut.cleanup()
+		}
+	}()
+
+	// Wait until we have a Curator leader.
+	dss, err := duts.wait(ctx, func(dss dutSetStatus) bool {
+		return len(dss.leaders()) == 1 && len(dss.followers()) == 2
+	})
+	if err != nil {
+		t.Fatalf("waiting for dut set: %v", err)
+	}
+	leaderEndpoint := dss.leaders()[0]
+
+	// Retrieve key and rev from the Curator's leader. We will later check that
+	// these have changed once another instance has taken over leadership.
+	key := dss[leaderEndpoint].leader.lockKey
+	rev := dss[leaderEndpoint].leader.lockRev
+	leaderNodeID := duts[leaderEndpoint].instance.config.NodeID
+	leaderNum := endpointToNum[leaderEndpoint]
+
+	// Ensure the leader/follower data in the electionStatus are as expected.
+	for endpoint, status := range dss {
+		if endpoint == leaderEndpoint {
+			// The leader instance should not also be a follower.
+			if status.follower != nil {
+				t.Errorf("leader cannot also be a follower")
+			}
+		} else {
+			// The follower instances should also not be leaders.
+			if status.leader != nil {
+				t.Errorf("instance %q is leader", endpoint)
+			}
+			follower := status.follower
+			if follower == nil {
+				t.Errorf("instance %q is not a follower", endpoint)
+				continue
+			}
+			// The follower instances should point to the leader in their seen lock.
+			if want, got := leaderNodeID, follower.lock.NodeId; want != got {
+				t.Errorf("instance %q sees node id %q as follower, wanted %q", endpoint, want, got)
+			}
+		}
+	}
+
+	// Partition off leader's etcd instance from other instances.
+	for n, member := range cluster.Members {
+		if n == leaderNum {
+			continue
+		}
+		cluster.Members[leaderNum].InjectPartition(t, member)
+	}
+
+	// Wait until we switch leaders
+	dss, err = duts.wait(ctx, func(dss dutSetStatus) bool {
+		// Ensure we've lost leadership on the initial leader.
+		if i, ok := dss[leaderEndpoint]; ok && i.leader != nil {
+			return false
+		}
+		return len(dss.leaders()) == 1 && len(dss.followers()) == 1
+	})
+	if err != nil {
+		t.Fatalf("waiting for dut set: %v", err)
+	}
+	newLeaderEndpoint := dss.leaders()[0]
+
+	// Ensure the old instance is neither leader nor follower (signaling loss of
+	// quorum).
+	if want, got := false, dss[leaderEndpoint].leader != nil; want != got {
+		t.Errorf("old leader's leadership is %v, wanted %v", want, got)
+	}
+	if want, got := false, dss[leaderEndpoint].follower != nil; want != got {
+		t.Errorf("old leader's followership is %v, wanted %v", want, got)
+	}
+
+	// Get new leader's key and rev.
+	newKey := dss[newLeaderEndpoint].leader.lockKey
+	newRev := dss[newLeaderEndpoint].leader.lockRev
+	newLeaderNodeID := duts[newLeaderEndpoint].instance.config.NodeID
+
+	if leaderEndpoint == newLeaderEndpoint {
+		t.Errorf("leader endpoint didn't change (%q -> %q)", leaderEndpoint, newLeaderEndpoint)
+	}
+	if key == newKey {
+		t.Errorf("leader election key didn't change (%q -> %q)", key, newKey)
+	}
+	if rev == newRev {
+		t.Errorf("leader election rev didn't change (%d -> %d)", rev, newRev)
+	}
+
+	// Ensure the last node of the cluster (not the current leader and not the
+	// previous leader) is now a follower pointing at the new leader.
+	for endpoint, status := range dss {
+		if endpoint == leaderEndpoint || endpoint == newLeaderEndpoint {
+			continue
+		}
+		follower := status.follower
+		if follower == nil {
+			t.Errorf("instance %q is not a follower", endpoint)
+			continue
+		}
+		if want, got := newLeaderNodeID, follower.lock.NodeId; want != got {
+			t.Errorf("instance %q sees node id %q as follower, wanted %q", endpoint, want, got)
+		}
+	}
+}
diff --git a/metropolis/node/core/curator/proto/private/BUILD.bazel b/metropolis/node/core/curator/proto/private/BUILD.bazel
new file mode 100644
index 0000000..d926d56
--- /dev/null
+++ b/metropolis/node/core/curator/proto/private/BUILD.bazel
@@ -0,0 +1,23 @@
+load("@rules_proto//proto:defs.bzl", "proto_library")
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library")
+
+proto_library(
+    name = "private_proto",
+    srcs = ["lock.proto"],
+    visibility = ["//visibility:public"],
+)
+
+go_proto_library(
+    name = "private_go_proto",
+    importpath = "source.monogon.dev/metropolis/node/core/curator/proto/private",
+    proto = ":private_proto",
+    visibility = ["//visibility:public"],
+)
+
+go_library(
+    name = "go_default_library",
+    embed = [":private_go_proto"],
+    importpath = "source.monogon.dev/metropolis/node/core/curator/proto/private",
+    visibility = ["//visibility:public"],
+)
diff --git a/metropolis/node/core/curator/proto/private/lock.proto b/metropolis/node/core/curator/proto/private/lock.proto
new file mode 100644
index 0000000..e7d7c20
--- /dev/null
+++ b/metropolis/node/core/curator/proto/private/lock.proto
@@ -0,0 +1,16 @@
+syntax = "proto3";
+option go_package = "source.monogon.dev/metropolis/node/core/curator/proto/private";
+package metropolis.node.core.curator.proto.private;
+
+// LeaderElectionValue is the value set under an election key by the curator
+// leader. It is used by curator followers to be able to contact the current
+// leader and forward any client requests to it.
+message LeaderElectionValue {
+    // node_id is the ID of the node whose curator is acting as the leader.
+    string node_id = 1;
+    // ttl is the time-to-live set on the underlying session used for leader
+    // election. It is effectively an upper bound on how long the leader might
+    // be unavailable before another curator instance can be elected as a
+    // replacement leader.
+    uint64 ttl = 2;
+}
diff --git a/metropolis/node/core/main.go b/metropolis/node/core/main.go
index eb4c6c7..d9f408e 100644
--- a/metropolis/node/core/main.go
+++ b/metropolis/node/core/main.go
@@ -28,12 +28,14 @@
 	"os"
 	"os/signal"
 	"runtime/debug"
+	"time"
 
 	"golang.org/x/sys/unix"
 	"google.golang.org/grpc"
 
 	common "source.monogon.dev/metropolis/node"
 	"source.monogon.dev/metropolis/node/core/cluster"
+	"source.monogon.dev/metropolis/node/core/curator"
 	"source.monogon.dev/metropolis/node/core/localstorage"
 	"source.monogon.dev/metropolis/node/core/localstorage/declarative"
 	"source.monogon.dev/metropolis/node/core/network"
@@ -165,6 +167,22 @@
 			return fmt.Errorf("new couldn't find home in new cluster, aborting: %w", err)
 		}
 
+		// Start cluster curator.
+		kv, err := status.ConsensusClient(cluster.ConsensusUserCurator)
+		if err != nil {
+			return fmt.Errorf("failed to retrieve consensus curator client: %w", err)
+		}
+		c := curator.New(curator.Config{
+			Etcd:   kv,
+			NodeID: status.Node.ID(),
+			// TODO(q3k): make this configurable?
+			LeaderTTL: time.Second * 5,
+			Directory: &root.Ephemeral.Curator,
+		})
+		if err := supervisor.Run(ctx, "curator", c.Run); err != nil {
+			return fmt.Errorf("when starting curator: %w", err)
+		}
+
 		// We are now in a cluster. We can thus access our 'node' object and
 		// start all services that we should be running.