m/n/core/curator: implement leader election
This implements the leader election subset of the curator's functionality.
It does not yet implement any business logic, just the switchover
between acting as a leader and a follower.
Test plan: adds an integration test for the leader election with
an in-memory etcd cluster.
Change-Id: Id77ecc35a9f2b18e716fffd3caf2de193982d676
Reviewed-on: https://review.monogon.dev/c/monogon/+/184
Reviewed-by: Lorenz Brun <lorenz@nexantic.com>
diff --git a/metropolis/node/core/BUILD.bazel b/metropolis/node/core/BUILD.bazel
index 24200d8..be34ea0 100644
--- a/metropolis/node/core/BUILD.bazel
+++ b/metropolis/node/core/BUILD.bazel
@@ -16,6 +16,7 @@
deps = [
"//metropolis/node:go_default_library",
"//metropolis/node/core/cluster:go_default_library",
+ "//metropolis/node/core/curator:go_default_library",
"//metropolis/node/core/localstorage:go_default_library",
"//metropolis/node/core/localstorage/declarative:go_default_library",
"//metropolis/node/core/network:go_default_library",
diff --git a/metropolis/node/core/cluster/manager.go b/metropolis/node/core/cluster/manager.go
index eede40f..cc98d8e 100644
--- a/metropolis/node/core/cluster/manager.go
+++ b/metropolis/node/core/cluster/manager.go
@@ -53,6 +53,7 @@
const (
ConsensusUserKubernetesPKI ConsensusUser = "kubernetes-pki"
+ ConsensusUserCurator ConsensusUser = "curator"
)
// ConsensusClient returns an etcd/consensus client for a given ConsensusUser.
@@ -73,6 +74,7 @@
// casting to ConsensusUser from an arbitrary string.
switch user {
case ConsensusUserKubernetesPKI:
+ case ConsensusUserCurator:
default:
return nil, fmt.Errorf("unknown ConsensusUser %q", user)
}
diff --git a/metropolis/node/core/curator/BUILD.bazel b/metropolis/node/core/curator/BUILD.bazel
new file mode 100644
index 0000000..7ff9a0e
--- /dev/null
+++ b/metropolis/node/core/curator/BUILD.bazel
@@ -0,0 +1,32 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(
+ name = "go_default_library",
+ srcs = ["curator.go"],
+ importpath = "source.monogon.dev/metropolis/node/core/curator",
+ visibility = ["//visibility:public"],
+ deps = [
+ "//metropolis/node/core/consensus/client:go_default_library",
+ "//metropolis/node/core/curator/proto/private:go_default_library",
+ "//metropolis/node/core/localstorage:go_default_library",
+ "//metropolis/pkg/event:go_default_library",
+ "//metropolis/pkg/event/memory:go_default_library",
+ "//metropolis/pkg/supervisor:go_default_library",
+ "@io_etcd_go_etcd//clientv3/concurrency:go_default_library",
+ "@org_golang_google_protobuf//proto:go_default_library",
+ ],
+)
+
+go_test(
+ name = "go_default_test",
+ srcs = ["curator_test.go"],
+ embed = [":go_default_library"],
+ deps = [
+ "//metropolis/node/core/consensus/client:go_default_library",
+ "//metropolis/node/core/localstorage:go_default_library",
+ "//metropolis/node/core/localstorage/declarative:go_default_library",
+ "//metropolis/pkg/supervisor:go_default_library",
+ "@io_etcd_go_etcd//clientv3:go_default_library",
+ "@io_etcd_go_etcd//integration:go_default_library",
+ ],
+)
diff --git a/metropolis/node/core/curator/curator.go b/metropolis/node/core/curator/curator.go
new file mode 100644
index 0000000..fe7aabf
--- /dev/null
+++ b/metropolis/node/core/curator/curator.go
@@ -0,0 +1,276 @@
+// Package curator implements the Curator, a service responsible for management
+// of the Metropolis cluster that it is running on.
+//
+// The Curator is implemented as a leader-elected service. An instance of the
+// service runs on every node that also runs a consensus (etcd) server.
+// Each instance listens locally over gRPC for requests from code running on the
+// same node, and publicly over gRPC for traffic from other nodes (e.g. ones that
+// do not run an instance of the Curator) and external users.
+// The curator leader keeps its state fully in etcd. Followers forward all
+// requests to the active leader.
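+//
+// A minimal sketch of how the service is expected to be wired up (see the
+// main.go change in this commit for the actual call site):
+//
+//   c := curator.New(curator.Config{
+//       Etcd:      kv, // a client.Namespaced for ConsensusUserCurator
+//       NodeID:    status.Node.ID(),
+//       LeaderTTL: time.Second * 5,
+//       Directory: &root.Ephemeral.Curator,
+//   })
+//   supervisor.Run(ctx, "curator", c.Run)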
+package curator
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "time"
+
+ "go.etcd.io/etcd/clientv3/concurrency"
+ "google.golang.org/protobuf/proto"
+
+ "source.monogon.dev/metropolis/node/core/consensus/client"
+ ppb "source.monogon.dev/metropolis/node/core/curator/proto/private"
+ "source.monogon.dev/metropolis/node/core/localstorage"
+ "source.monogon.dev/metropolis/pkg/event"
+ "source.monogon.dev/metropolis/pkg/event/memory"
+ "source.monogon.dev/metropolis/pkg/supervisor"
+)
+
+// Config is the configuration of the curator.
+type Config struct {
+ // Etcd is an etcd client in which all curator storage and leader election
+ // will be kept.
+ Etcd client.Namespaced
+ // NodeID is the ID of the node that this curator will run on. It's used to
+ // populate the leader election lock.
+ NodeID string
+ // LeaderTTL is the timeout on the lease used to perform leader election.
+ // Any active leader must continue updating its lease at least this often,
+ // or the lease (and leadership) will be lost.
+ // Lower values allow for faster failovers. Higher values allow for higher
+ // resiliency against short network partitions.
+ // A value less or equal to zero will default to 60 seconds.
+ LeaderTTL time.Duration
+ // Directory is the curator ephemeral directory in which the curator will
+ // store its local domain socket for connections from the node.
+ Directory *localstorage.EphemeralCuratorDirectory
+}
+
+// Service is the Curator service. See the package-level documentation for more
+// information.
+type Service struct {
+ // config is the configuration with which the service was started.
+ config *Config
+
+ // ttl is the effective TTL value derived from Config.LeaderTTL (if that was
+ // given as <= 0, this has been fixed up to the default of 60 seconds).
+ ttl int
+
+ // status is a memory Event Value for keeping the electionStatus of this
+ // instance. It is not exposed to users of the Curator.
+ status memory.Value
+}
+
+// New creates a new curator Service.
+func New(cfg Config) *Service {
+ return &Service{
+ config: &cfg,
+ }
+}
+
+// electionStatus represents the status of this curator's leader election
+// attempts within the cluster.
+type electionStatus struct {
+ // leader is set if this curator is a leader, nil otherwise. This cannot be set
+ // if follower is also set. However, both leader and follower might be nil to
+ // signify that this curator instance is not part of a quorum.
+ leader *electionStatusLeader
+ // follower is set if the curator is a follower for another leader, nil
+ // otherwise. This cannot be set if leader is also set. However, both leader and
+ // follower might be nil to signify that this curator instance is not part of a
+ // quorum.
+ follower *electionStatusFollower
+}
+
+type electionStatusLeader struct {
+ // lockKey is the etcd key under which this instance's leader election lock
+ // value was written, at revision lockRev. Whenever the leader accesses etcd
+ // it must verify that this key still exists at this revision, to ensure
+ // that it is still the active leader according to the rest of the cluster.
+ lockKey string
+ lockRev int64
+}
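+
+// For illustration only (not implemented by this change): a leader could use
+// lockKey/lockRev to guard its etcd writes with a transaction that only
+// succeeds while the lock is still held, roughly (kv being an etcd clientv3
+// KV and "/some/state/key" a placeholder):
+//
+//   resp, err := kv.Txn(ctx).
+//       If(clientv3.Compare(clientv3.CreateRevision(l.lockKey), "=", l.lockRev)).
+//       Then(clientv3.OpPut("/some/state/key", "...")).
+//       Commit()
+//   // err != nil: etcd was unreachable; !resp.Succeeded: leadership was lost.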
+
+type electionStatusFollower struct {
+ lock *ppb.LeaderElectionValue
+}
+
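+// electionWatch returns an electionWatcher that can be used to observe this
+// instance's electionStatus as the election makes progress.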
+func (s *Service) electionWatch() electionWatcher {
+ return electionWatcher{
+ Watcher: s.status.Watch(),
+ }
+}
+
+// electionWatcher is a type-safe wrapper around event.Watcher which provides
+// electionStatus values.
+type electionWatcher struct {
+ event.Watcher
+}
+
+// get retrieves an electionStatus from the electionWatcher.
+func (w *electionWatcher) get(ctx context.Context) (*electionStatus, error) {
+ val, err := w.Watcher.Get(ctx)
+ if err != nil {
+ return nil, err
+ }
+ status := val.(electionStatus)
+ return &status, nil
+}
+
+// buildLockValue returns a serialized etcd value that will be set by the
+// instance when it becomes a leader. This value is a serialized
+// LeaderElectionValue from private/lock.proto.
+func (c *Config) buildLockValue(ttl int) ([]byte, error) {
+ v := &ppb.LeaderElectionValue{
+ NodeId: c.NodeID,
+ Ttl: uint64(ttl),
+ }
+ bytes, err := proto.Marshal(v)
+ if err != nil {
+ return nil, fmt.Errorf("when marshaling value: %w", err)
+ }
+ return bytes, nil
+}
+
+var (
+ // electionPrefix is the prefix under which the curator instances will
+ // attempt to perform leader election.
+ //
+ // The trailing slash is omitted, as the etcd concurrency library appends one.
+ electionPrefix = "/leader"
+)
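+
+// Note that concurrency.Election creates a per-candidate key under this
+// prefix (of the form electionPrefix + "/" + <lease ID in hex>), with the
+// serialized LeaderElectionValue passed to Campaign as its value.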
+
+// elect runs a single leader election attempt. The status of the service will
+// be updated with electionStatus values as the election makes progress.
+func (s *Service) elect(ctx context.Context) error {
+ lv, err := s.config.buildLockValue(s.ttl)
+ if err != nil {
+ return fmt.Errorf("building lock value failed: %w", err)
+ }
+
+ // Establish a lease/session with etcd.
+ session, err := concurrency.NewSession(s.config.Etcd.ThinClient(ctx),
+ concurrency.WithContext(ctx),
+ concurrency.WithTTL(s.ttl))
+ if err != nil {
+ return fmt.Errorf("creating session failed: %w", err)
+ }
+
+ // Kill the session whenever we lose leadership or error out.
+ defer func() {
+ err := session.Close()
+ if err != nil {
+ supervisor.Logger(ctx).Warningf("Failed to close session: %v", err)
+ }
+ }()
+
+ supervisor.Logger(ctx).Infof("Curator established lease, ID: %d", session.Lease())
+ election := concurrency.NewElection(session, electionPrefix)
+
+ // Observer context; we need to cancel it to not leak the observer
+ // goroutine/channel.
+ octx, octxC := context.WithCancel(ctx)
+ defer octxC()
+
+ // Channel that gets updates about the current leader in the cluster.
+ observerC := election.Observe(octx)
+ // Channel that gets updates about this instance becoming a leader.
+ campaignerC := make(chan error)
+
+ // Campaign to become leader. This blocks until leader election is successful
+ // and this instance is now the leader.
+ //
+ // The lock value is converted to string from raw binary bytes, but that's fine
+ // as that string is converted back to []byte within the etcd client library (in
+ // OpPut).
+ go func() {
+ campaignerC <- election.Campaign(ctx, string(lv))
+ }()
+
+ // While campaigning, update the electionStatus with information about the
+ // current leader.
+ for {
+ select {
+ case o := <-observerC:
+ var lock ppb.LeaderElectionValue
+ if err := proto.Unmarshal(o.Kvs[0].Value, &lock); err != nil {
+ return fmt.Errorf("parsing existing lock value failed: %w", err)
+ }
+ s.status.Set(electionStatus{
+ follower: &electionStatusFollower{
+ lock: &lock,
+ },
+ })
+ case err = <-campaignerC:
+ if err == nil {
+ goto campaigned
+ }
+ return fmt.Errorf("campaigning failed: %w", err)
+ }
+ }
+
+campaigned:
+ supervisor.Logger(ctx).Info("Curator became leader.")
+
+ // Update status, watchers will now know that this curator is the leader.
+ s.status.Set(electionStatus{
+ leader: &electionStatusLeader{
+ lockKey: election.Key(),
+ lockRev: election.Rev(),
+ },
+ })
+
+ // Wait until either we lose the lease/session or our context expires.
+ select {
+ case <-ctx.Done():
+ supervisor.Logger(ctx).Warningf("Context canceled, quitting.")
+ return fmt.Errorf("curator session canceled: %w", ctx.Err())
+ case <-session.Done():
+ supervisor.Logger(ctx).Warningf("Session done, quitting.")
+ return fmt.Errorf("curator session done")
+ }
+
+}
+
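+// Run is the main runnable of the curator Service. It runs the election
+// loop: computing the effective TTL, logging election progress, and
+// repeatedly attempting (re-)election until its context is canceled.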
+func (s *Service) Run(ctx context.Context) error {
+ // Start local election watcher. This logs what this curator knows about its own
+ // leadership.
+ go func() {
+ w := s.electionWatch()
+ for {
+ s, err := w.get(ctx)
+ if err != nil {
+ supervisor.Logger(ctx).Warningf("Election watcher existing: get(): %w", err)
+ return
+ }
+ if l := s.leader; l != nil {
+ supervisor.Logger(ctx).Infof("Election watcher: this node's curator is leader (lock key %q, rev %d)", l.lockKey, l.lockRev)
+ } else {
+ supervisor.Logger(ctx).Infof("Election watcher: this node's curator is a follower")
+ }
+ }
+ }()
+
+ // Calculate effective TTL. This replicates the default behaviour of
+ // concurrency.WithTTL, but allows us to explicitly log the TTL used.
+ s.ttl = int(s.config.LeaderTTL.Seconds())
+ if s.ttl <= 0 {
+ s.ttl = 60
+ }
+ supervisor.Logger(ctx).Infof("Curator starting on prefix %q with lease TTL of %d seconds...", electionPrefix, s.ttl)
+
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ for {
+ s.status.Set(electionStatus{})
+ err := s.elect(ctx)
+ s.status.Set(electionStatus{})
+
+ if err != nil && errors.Is(err, ctx.Err()) {
+ return fmt.Errorf("election round failed due to context cancelation, not attempting to re-elect: %w", err)
+ }
+ supervisor.Logger(ctx).Infof("Curator election round done: %v", err)
+ supervisor.Logger(ctx).Info("Curator election restarting...")
+ }
+}
diff --git a/metropolis/node/core/curator/curator_test.go b/metropolis/node/core/curator/curator_test.go
new file mode 100644
index 0000000..6ce489d
--- /dev/null
+++ b/metropolis/node/core/curator/curator_test.go
@@ -0,0 +1,346 @@
+package curator
+
+import (
+ "context"
+ "fmt"
+ "io/ioutil"
+ "os"
+ "testing"
+ "time"
+
+ "go.etcd.io/etcd/clientv3"
+ "go.etcd.io/etcd/integration"
+
+ "source.monogon.dev/metropolis/node/core/consensus/client"
+ "source.monogon.dev/metropolis/node/core/localstorage"
+ "source.monogon.dev/metropolis/node/core/localstorage/declarative"
+ "source.monogon.dev/metropolis/pkg/supervisor"
+)
+
+var (
+ // cluster is a 3-member in-memory etcd cluster for testing.
+ cluster *integration.ClusterV3
+ // endpoints is a list of client endpoints, one for each of the three etcd members above.
+ endpoints []string
+)
+
+// TestMain brings up a 3 node etcd cluster for tests to use.
+func TestMain(m *testing.M) {
+ cfg := integration.ClusterConfig{
+ Size: 3,
+ GRPCKeepAliveMinTime: time.Millisecond,
+ }
+ cluster = integration.NewClusterV3(nil, &cfg)
+ endpoints = make([]string, 3)
+ for i := range endpoints {
+ endpoints[i] = cluster.Client(i).Endpoints()[0]
+ }
+
+ v := m.Run()
+ cluster.Terminate(nil)
+ os.Exit(v)
+}
+
+// dut is the design under test harness - in this case, a curator instance.
+type dut struct {
+ // endpoint of the etcd server that this instance is connected to. Each instance
+ // connects to a different member of the etcd cluster so that we can easily
+ // inject partitions between curator instances.
+ endpoint string
+ // instance is the curator Service instance itself.
+ instance *Service
+
+ // temporary directory in which the Curator's ephemeral directory is placed.
+ // Needs to be cleaned up.
+ temporary string
+}
+
+func (d *dut) cleanup() {
+ os.RemoveAll(d.temporary)
+}
+
+// newDut creates a new dut harness for a curator instance, connected to a given
+// etcd endpoint.
+func newDut(ctx context.Context, t *testing.T, endpoint string) *dut {
+ t.Helper()
+ // Create new etcd client to the given endpoint.
+ cli, err := clientv3.New(clientv3.Config{
+ Endpoints: []string{endpoint},
+ DialTimeout: 1 * time.Second,
+ DialKeepAliveTime: 1 * time.Second,
+ DialKeepAliveTimeout: 1 * time.Second,
+ Context: ctx,
+ })
+ if err != nil {
+ t.Fatalf("clientv3.New: %v", err)
+ }
+
+ // Create ephemeral directory for curator and place it into /tmp.
+ dir := localstorage.EphemeralCuratorDirectory{}
+ tmp, err := ioutil.TempDir("", "curator-test-*")
+ if err != nil {
+ t.Fatalf("TempDir: %v", err)
+ }
+ err = declarative.PlaceFS(&dir, tmp)
+ if err != nil {
+ t.Fatalf("PlaceFS: %v", err)
+ }
+
+ id := fmt.Sprintf("test-%s", endpoint)
+ svc := New(Config{
+ Etcd: client.NewLocal(cli),
+ NodeID: id,
+ LeaderTTL: time.Second,
+ Directory: &dir,
+ })
+ if err := supervisor.Run(ctx, id, svc.Run); err != nil {
+ t.Fatalf("Run %s: %v", id, err)
+ }
+ return &dut{
+ endpoint: endpoint,
+ instance: svc,
+ temporary: tmp,
+ }
+}
+
+// dutSet is a collection of duts keyed by endpoint to which they're connected.
+// Since each dut is connected to a different endpoint in these tests, the
+// endpoint is used as a unique identifier for each dut/instance.
+type dutSet map[string]*dut
+
+// dutUpdate is an update from a dut's Curator instance - either a new
+// electionStatus or an error while retrieving it.
+type dutUpdate struct {
+ // endpoint to which this dut's Curator instance is connected.
+ endpoint string
+ // status received from the dut's Curator instance, or nil if err is set.
+ status *electionStatus
+ err error
+}
+
+// dutSetStatus is a point-in-time snapshot of the electionStatus of Curator
+// instances, keyed by endpoints in the same way as dutSet.
+type dutSetStatus map[string]*electionStatus
+
+// leaders returns a list of endpoints that currently see themselves as leaders.
+func (d dutSetStatus) leaders() []string {
+ var res []string
+ for e, s := range d {
+ if s.leader != nil {
+ res = append(res, e)
+ }
+ }
+ return res
+}
+
+// followers returns a list of endpoints that currently see themselves as
+// followers.
+func (d dutSetStatus) followers() []string {
+ var res []string
+ for e, s := range d {
+ if s.follower != nil {
+ res = append(res, e)
+ }
+ }
+ return res
+}
+
+// wait blocks until the dutSetStatus of a given dutSet reaches some state (as
+// implemented by predicate f).
+func (s dutSet) wait(ctx context.Context, f func(s dutSetStatus) bool) (dutSetStatus, error) {
+ ctx2, ctxC := context.WithCancel(ctx)
+ defer ctxC()
+
+ // dss is the dutSetStatus which we will keep updating with the electionStatus
+ // of each dut's Curator as long as predicate f returns false.
+ dss := make(dutSetStatus)
+
+ // updC is a channel of updates from all dut's electionStatus watchers. The
+ // dutUpdate type contains the endpoint to distinguish the source of each
+ // update.
+ updC := make(chan dutUpdate)
+
+ // Run a watcher for each dut which sends that dut's newest available
+ // electionStatus (or error) to updC.
+ for e, d := range s {
+ w := d.instance.electionWatch()
+ go func(e string, w electionWatcher) {
+ defer w.Close()
+ for {
+ s, err := w.get(ctx2)
+ if err != nil {
+ updC <- dutUpdate{
+ endpoint: e,
+ err: err,
+ }
+ return
+ }
+ updC <- dutUpdate{
+ endpoint: e,
+ status: s,
+ }
+ }
+ }(e, w)
+ }
+
+ // Keep updating dss with updates from updC and call f on every change.
+ for {
+ select {
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ case u := <-updC:
+ if u.err != nil {
+ return nil, fmt.Errorf("from %q: %w", u.endpoint, u.err)
+ }
+ dss[u.endpoint] = u.status
+ }
+
+ if f(dss) {
+ return dss, nil
+ }
+ }
+}
+
+// TestLeaderElectionStatus exercises the electionStatus watch/get functionality
+// from the Curator code. It spawns a cluster of three curators and ensures all
+// of them respond correctly to election, partitioning and subsequent
+// re-election.
+func TestLeaderElectionStatus(t *testing.T) {
+ ctx, ctxC := context.WithCancel(context.Background())
+ defer ctxC()
+
+ // Map from endpoint name to etcd member list index. Alongside the
+ // endpoints list, this is used to quickly look up endpoint<->member_num. Since
+ // we only have one Curator instance per etcd member, we can use the instance's
+ // etcd endpoint as a unique key to identify it.
+ endpointToNum := map[string]int{
+ endpoints[0]: 0,
+ endpoints[1]: 1,
+ endpoints[2]: 2,
+ }
+
+ // Start a new supervisor in which we create all curator DUTs.
+ dutC := make(chan *dut)
+ supervisor.New(ctx, func(ctx context.Context) error {
+ for e := range endpointToNum {
+ dutC <- newDut(ctx, t, e)
+ }
+ close(dutC)
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ supervisor.Signal(ctx, supervisor.SignalDone)
+ return nil
+ })
+
+ // Build dutSet, i.e. a map from endpoint to Curator DUT.
+ duts := make(dutSet)
+ for d := range dutC {
+ duts[d.endpoint] = d
+ }
+ // Schedule cleanup for all DUTs.
+ defer func() {
+ for _, dut := range duts {
+ dut.cleanup()
+ }
+ }()
+
+ // Wait until we have a Curator leader.
+ dss, err := duts.wait(ctx, func(dss dutSetStatus) bool {
+ return len(dss.leaders()) == 1 && len(dss.followers()) == 2
+ })
+ if err != nil {
+ t.Fatalf("waiting for dut set: %v", err)
+ }
+ leaderEndpoint := dss.leaders()[0]
+
+ // Retrieve key and rev from the Curator's leader. We will later check that
+ // these have changed once another leader has been elected.
+ key := dss[leaderEndpoint].leader.lockKey
+ rev := dss[leaderEndpoint].leader.lockRev
+ leaderNodeID := duts[leaderEndpoint].instance.config.NodeID
+ leaderNum := endpointToNum[leaderEndpoint]
+
+ // Ensure the leader/follower data in the electionStatus are as expected.
+ for endpoint, status := range dss {
+ if endpoint == leaderEndpoint {
+ // The leader instance should not also be a follower.
+ if status.follower != nil {
+ t.Errorf("leader cannot also be a follower")
+ }
+ } else {
+ // The follower instances should also not be leaders.
+ if status.leader != nil {
+ t.Errorf("instance %q is leader", endpoint)
+ }
+ follower := status.follower
+ if follower == nil {
+ t.Errorf("instance %q is not a follower", endpoint)
+ continue
+ }
+ // The follower instances should point to the leader in their seen lock.
+ if want, got := leaderNodeID, follower.lock.NodeId; want != got {
+ t.Errorf("instance %q sees node id %q as follower, wanted %q", endpoint, want, got)
+ }
+ }
+ }
+
+ // Partition off leader's etcd instance from other instances.
+ for n, member := range cluster.Members {
+ if n == leaderNum {
+ continue
+ }
+ cluster.Members[leaderNum].InjectPartition(t, member)
+ }
+
+ // Wait until we switch leaders.
+ dss, err = duts.wait(ctx, func(dss dutSetStatus) bool {
+ // Ensure we've lost leadership on the initial leader.
+ if i, ok := dss[leaderEndpoint]; ok && i.leader != nil {
+ return false
+ }
+ return len(dss.leaders()) == 1 && len(dss.followers()) == 1
+ })
+ if err != nil {
+ t.Fatalf("waiting for dut set: %v", err)
+ }
+ newLeaderEndpoint := dss.leaders()[0]
+
+ // Ensure the old instance is neither leader nor follower (signaling loss of
+ // quorum).
+ if want, got := false, dss[leaderEndpoint].leader != nil; want != got {
+ t.Errorf("old leader's leadership is %v, wanted %v", want, got)
+ }
+ if want, got := false, dss[leaderEndpoint].follower != nil; want != got {
+ t.Errorf("old leader's followership is %v, wanted %v", want, got)
+ }
+
+ // Get new leader's key and rev.
+ newKey := dss[newLeaderEndpoint].leader.lockKey
+ newRev := dss[newLeaderEndpoint].leader.lockRev
+ newLeaderNodeID := duts[newLeaderEndpoint].instance.config.NodeID
+
+ if leaderEndpoint == newLeaderEndpoint {
+ t.Errorf("leader endpoint didn't change (%q -> %q)", leaderEndpoint, newLeaderEndpoint)
+ }
+ if key == newKey {
+ t.Errorf("leader election key didn't change (%q -> %q)", key, newKey)
+ }
+ if rev == newRev {
+ t.Errorf("leader election rev didn't change (%d -> %d)", rev, newRev)
+ }
+
+ // Ensure the last node of the cluster (not the current leader and not the
+ // previous leader) is now a follower pointing at the new leader.
+ for endpoint, status := range dss {
+ if endpoint == leaderEndpoint || endpoint == newLeaderEndpoint {
+ continue
+ }
+ follower := status.follower
+ if follower == nil {
+ t.Errorf("instance %q is not a follower", endpoint)
+ continue
+ }
+ if want, got := newLeaderNodeID, follower.lock.NodeId; want != got {
+ t.Errorf("instance %q sees node id %q as follower, wanted %q", endpoint, want, got)
+ }
+ }
+}
diff --git a/metropolis/node/core/curator/proto/private/BUILD.bazel b/metropolis/node/core/curator/proto/private/BUILD.bazel
new file mode 100644
index 0000000..d926d56
--- /dev/null
+++ b/metropolis/node/core/curator/proto/private/BUILD.bazel
@@ -0,0 +1,23 @@
+load("@rules_proto//proto:defs.bzl", "proto_library")
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library")
+
+proto_library(
+ name = "private_proto",
+ srcs = ["lock.proto"],
+ visibility = ["//visibility:public"],
+)
+
+go_proto_library(
+ name = "private_go_proto",
+ importpath = "source.monogon.dev/metropolis/node/core/curator/proto/private",
+ proto = ":private_proto",
+ visibility = ["//visibility:public"],
+)
+
+go_library(
+ name = "go_default_library",
+ embed = [":private_go_proto"],
+ importpath = "source.monogon.dev/metropolis/node/core/curator/proto/private",
+ visibility = ["//visibility:public"],
+)
diff --git a/metropolis/node/core/curator/proto/private/lock.proto b/metropolis/node/core/curator/proto/private/lock.proto
new file mode 100644
index 0000000..e7d7c20
--- /dev/null
+++ b/metropolis/node/core/curator/proto/private/lock.proto
@@ -0,0 +1,16 @@
+syntax = "proto3";
+option go_package = "source.monogon.dev/metropolis/node/core/curator/proto/private";
+package metropolis.node.core.curator.proto.private;
+
+// LeaderElectionValue is the value set under an election key by the curator
+// leader. It is used by curator followers to contact the current leader and
+// forward any client requests to it.
+message LeaderElectionValue {
+ // node_id is the ID of the node whose curator is acting as the leader.
+ string node_id = 1;
+ // ttl is the time-to-live set on the underlying session used for leader
+ // election. It is effectively an upper bound on how long the leader might
+ // remain unavailable before another curator instance can be elected as a
+ // replacement.
+ uint64 ttl = 2;
+}
diff --git a/metropolis/node/core/main.go b/metropolis/node/core/main.go
index eb4c6c7..d9f408e 100644
--- a/metropolis/node/core/main.go
+++ b/metropolis/node/core/main.go
@@ -28,12 +28,14 @@
"os"
"os/signal"
"runtime/debug"
+ "time"
"golang.org/x/sys/unix"
"google.golang.org/grpc"
common "source.monogon.dev/metropolis/node"
"source.monogon.dev/metropolis/node/core/cluster"
+ "source.monogon.dev/metropolis/node/core/curator"
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/node/core/localstorage/declarative"
"source.monogon.dev/metropolis/node/core/network"
@@ -165,6 +167,22 @@
return fmt.Errorf("new couldn't find home in new cluster, aborting: %w", err)
}
+ // Start cluster curator.
+ kv, err := status.ConsensusClient(cluster.ConsensusUserCurator)
+ if err != nil {
+ return fmt.Errorf("failed to retrieve consensus curator client: %w", err)
+ }
+ c := curator.New(curator.Config{
+ Etcd: kv,
+ NodeID: status.Node.ID(),
+ // TODO(q3k): make this configurable?
+ LeaderTTL: time.Second * 5,
+ Directory: &root.Ephemeral.Curator,
+ })
+ if err := supervisor.Run(ctx, "curator", c.Run); err != nil {
+ return fmt.Errorf("when starting curator: %w", err)
+ }
+
// We are now in a cluster. We can thus access our 'node' object and
// start all services that we should be running.