blob: 336b7d6ea43245d881494697894cac8f4a6973e7 [file] [log] [blame]
// Package curator implements the Curator, a service responsible for management
// of the Metropolis cluster that it is running on.
//
// The Curator is implemented as a leader-elected service. Instances of the
// service are running colocated with all nodes that run a consensus (etcd)
// server.
//
// Each instance listens on all network interfaces, for requests from code
// running on the same node, from other nodes (eg. ones that do not run an
// instance of the Curator) and from external users.
//
// The curator leader keeps its state fully in etcd. Followers forward all
// requests to the active leader.
package curator
import (
"context"
"errors"
"fmt"
"time"
"go.etcd.io/etcd/client/v3/concurrency"
"google.golang.org/protobuf/proto"
"source.monogon.dev/metropolis/node/core/consensus"
ppb "source.monogon.dev/metropolis/node/core/curator/proto/private"
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/pkg/event/memory"
"source.monogon.dev/metropolis/pkg/supervisor"
)
// Config is the configuration of the curator.
type Config struct {
// NodeCredentials are the identity credentials for the node that is running
// this curator.
NodeCredentials *identity.NodeCredentials
// Consensus is the handle to the consensus service. The curator watches it
// for status updates and retrieves its etcd client (CuratorClient) from it.
Consensus consensus.ServiceHandle
// LeaderTTL is the timeout on the lease used to perform leader election.
// Any active leader must continue updating its lease at least this often,
// or the lease (and leadership) will be lost.
// Lower values allow for faster failovers. Higher values allow for higher
// resiliency against short network partitions.
// The value is truncated to whole seconds before use. A value less than or
// equal to zero (after truncation) will default to 60 seconds.
LeaderTTL time.Duration
}
// Service is the Curator service. See the package-level documentation for more
// information.
type Service struct {
// config is the configuration with which the service was started.
config *Config
// ttl is the effective TTL value, in whole seconds, derived from
// Config.LeaderTTL (if given as <= 0, this is the value that has been fixed
// up to some default). It is populated by Run before the first election.
ttl int
// status is a memory Event Value for keeping the electionStatus of this
// instance. It is not exposed to users of the Curator.
status memory.Value[*electionStatus]
}
// New creates a new curator Service from the given configuration. The Config is
// received by value, and the returned Service keeps a pointer to that private
// copy.
func New(cfg Config) *Service {
	svc := new(Service)
	svc.config = &cfg
	return svc
}
// electionStatus represents the status of this curator's leader election
// attempts within the cluster.
//
// At most one of leader and follower is set. Both being nil signifies that
// this curator instance is not part of a quorum.
type electionStatus struct {
// leader is set if this curator is a leader, nil otherwise. It is never set
// together with follower.
leader *electionStatusLeader
// follower is set if the curator is a follower for another leader, nil
// otherwise. It is never set together with leader.
follower *electionStatusFollower
}
// electionStatusLeader is the part of electionStatus that is populated when
// this curator instance has won the leader election.
type electionStatusLeader struct {
// lockKey is the etcd key for which a lease value is set with revision
// lockRev. This key/revision must be ensured to exist when any etcd access
// is performed by the curator to ensure that it is still the active leader
// according to the rest of the cluster.
lockKey string
// lockRev is the revision reported by the election for lockKey at the time
// this instance became leader.
lockRev int64
}
// electionStatusFollower is the part of electionStatus that is populated while
// this curator instance is campaigning and observing some current leader.
type electionStatusFollower struct {
// lock is the LeaderElectionValue parsed from the election key value that
// was last observed for the current leader.
lock *ppb.LeaderElectionValue
}
// buildLockValue produces the etcd value that this instance writes when it
// campaigns for leadership: a wire-encoded LeaderElectionValue (from
// private/storage.proto) carrying this node's ID and the given TTL.
func (c *Config) buildLockValue(ttl int) ([]byte, error) {
	msg := new(ppb.LeaderElectionValue)
	msg.NodeId = c.NodeCredentials.ID()
	msg.Ttl = uint64(ttl)

	raw, err := proto.Marshal(msg)
	if err != nil {
		return nil, fmt.Errorf("when marshaling value: %w", err)
	}
	return raw, nil
}
// electionPrefix is the etcd key prefix under which all curator instances
// perform their leader election.
//
// It carries no trailing slash because the etcd concurrency library appends
// one itself.
var electionPrefix = "/leader"
// elect runs a single leader election attempt. The status of the service will
// be updated with electionStatus values as the election makes progress.
//
// The call blocks for the whole election round: first while campaigning (during
// which follower status is published), then, once leadership is acquired, until
// either ctx is canceled or the etcd session ends. It always returns a non-nil
// error describing why the round ended.
func (s *Service) elect(ctx context.Context) error {
	lv, err := s.config.buildLockValue(s.ttl)
	if err != nil {
		return fmt.Errorf("building lock value failed: %w", err)
	}

	// Only proceed once consensus is actually running, same as Run does.
	// Otherwise we might pick up a not-yet-running status, fail to get a client
	// from it and spin through election rounds.
	w := s.config.Consensus.Watch()
	defer w.Close()
	st, err := w.Get(ctx, consensus.FilterRunning)
	if err != nil {
		return fmt.Errorf("getting consensus status failed: %w", err)
	}
	cl, err := st.CuratorClient()
	if err != nil {
		return fmt.Errorf("getting consensus client failed: %w", err)
	}

	// Establish a lease/session with etcd.
	session, err := concurrency.NewSession(cl.ThinClient(ctx),
		concurrency.WithContext(ctx),
		concurrency.WithTTL(s.ttl))
	if err != nil {
		return fmt.Errorf("creating session failed: %w", err)
	}
	// Kill the session whenever we lose leadership or error out.
	defer func() {
		if err := session.Close(); err != nil {
			supervisor.Logger(ctx).Warningf("Failed to close session: %v", err)
		}
	}()

	supervisor.Logger(ctx).Infof("Curator established lease, ID: %d", session.Lease())
	election := concurrency.NewElection(session, electionPrefix)

	// Election context, shared by the observer and the campaigner. It is
	// canceled when this function returns so that neither of their goroutines
	// outlives this election round.
	ectx, ectxC := context.WithCancel(ctx)
	defer ectxC()

	// Channel that gets updates about the current leader in the cluster.
	observerC := election.Observe(ectx)

	// Channel that gets the result of this instance's campaign. It is buffered
	// so that the campaigning goroutine can always deliver its result and exit,
	// even if this function has already returned for another reason.
	campaignerC := make(chan error, 1)

	// Campaign to become leader. This blocks until leader election is successful
	// and this instance is now the leader.
	//
	// The lock value is converted to string from raw binary bytes, but that's fine
	// as that string is converted back to []byte within the etcd client library (in
	// OpPut).
	go func() {
		campaignerC <- election.Campaign(ectx, string(lv))
	}()

	// While campaigning, update the electionStatus with information about the
	// current leader.
campaign:
	for {
		select {
		case o, ok := <-observerC:
			if !ok {
				// The observer closes its channel when it gives up (context
				// canceled or etcd error). Reading further would only yield zero
				// values, so end this round instead.
				if cerr := ctx.Err(); cerr != nil {
					return fmt.Errorf("election observer canceled: %w", cerr)
				}
				return errors.New("election observer channel closed")
			}
			if len(o.Kvs) == 0 {
				// Nothing to parse, wait for the next update.
				continue
			}
			var lock ppb.LeaderElectionValue
			if err := proto.Unmarshal(o.Kvs[0].Value, &lock); err != nil {
				return fmt.Errorf("parsing existing lock value failed: %w", err)
			}
			s.status.Set(&electionStatus{
				follower: &electionStatusFollower{
					lock: &lock,
				},
			})
		case err := <-campaignerC:
			if err != nil {
				return fmt.Errorf("campaigning failed: %w", err)
			}
			break campaign
		}
	}

	supervisor.Logger(ctx).Info("Curator became leader.")

	// Update status, watchers will now know that this curator is the leader.
	s.status.Set(&electionStatus{
		leader: &electionStatusLeader{
			lockKey: election.Key(),
			lockRev: election.Rev(),
		},
	})

	// Wait until either we lose the lease/session or our context expires.
	select {
	case <-ctx.Done():
		supervisor.Logger(ctx).Warningf("Context canceled, quitting.")
		return fmt.Errorf("curator session canceled: %w", ctx.Err())
	case <-session.Done():
		supervisor.Logger(ctx).Warningf("Session done, quitting.")
		return fmt.Errorf("curator session done")
	}
}
// Run is the main entry point of the curator Service. It is written to run under
// the supervisor: it logs through the supervisor logger, starts the listener as
// a supervisor runnable and signals health through it.
//
// Run waits for consensus to be running, starts the listener, computes the
// effective lease TTL and then performs leader election rounds in a loop. It
// only returns on failure: when waiting for consensus, retrieving the etcd
// client or starting the listener fails, or when an election round ends because
// ctx was canceled.
func (s *Service) Run(ctx context.Context) error {
	// Start local election watcher. This logs what this curator knows about its own
	// leadership.
	go func() {
		w := s.status.Watch()
		// Release the watcher once this goroutine is done with it.
		defer w.Close()
		for {
			status, err := w.Get(ctx)
			if err != nil {
				supervisor.Logger(ctx).Warningf("Election watcher exiting: get(): %v", err)
				return
			}
			if l := status.leader; l != nil {
				supervisor.Logger(ctx).Infof("Election watcher: this node's curator is leader (lock key %q, rev %d)", l.lockKey, l.lockRev)
			} else {
				supervisor.Logger(ctx).Infof("Election watcher: this node's curator is a follower")
			}
		}
	}()

	supervisor.Logger(ctx).Infof("Waiting for consensus...")
	w := s.config.Consensus.Watch()
	defer w.Close()
	st, err := w.Get(ctx, consensus.FilterRunning)
	if err != nil {
		return fmt.Errorf("while waiting for consensus: %w", err)
	}
	supervisor.Logger(ctx).Infof("Got consensus, starting up...")
	etcd, err := st.CuratorClient()
	if err != nil {
		return fmt.Errorf("while retrieving consensus client: %w", err)
	}

	// Start listener. This is a gRPC service listening on all interfaces, providing
	// the Curator API to consumers, dispatching to either a locally running leader,
	// or forwarding to a remotely running leader.
	lis := listener{
		node:          s.config.NodeCredentials,
		electionWatch: s.status.Watch,
		consensus:     s.config.Consensus,
		etcd:          etcd,
	}
	if err := supervisor.Run(ctx, "listener", lis.run); err != nil {
		return fmt.Errorf("when starting listener: %w", err)
	}

	// Calculate effective TTL. This replicates the behaviour of clientv3's WithTTL
	// (which falls back to 60 seconds), but allows us to explicitly log the used
	// TTL. The fallback must match what Config.LeaderTTL documents.
	s.ttl = int(s.config.LeaderTTL.Seconds())
	if s.ttl <= 0 {
		s.ttl = 60
	}
	supervisor.Logger(ctx).Infof("Curator starting on prefix %q with lease TTL of %d seconds...", electionPrefix, s.ttl)

	supervisor.Signal(ctx, supervisor.SignalHealthy)
	for {
		// Publish an empty status (neither leader nor follower) around every
		// election round, so watchers never see stale leadership information.
		s.status.Set(&electionStatus{})
		err := s.elect(ctx)
		s.status.Set(&electionStatus{})

		// A round that ended because our own context was canceled is final, any
		// other failure just triggers another round.
		if err != nil && errors.Is(err, ctx.Err()) {
			return fmt.Errorf("election round failed due to context cancelation, not attempting to re-elect: %w", err)
		}
		supervisor.Logger(ctx).Infof("Curator election round done: %v", err)
		supervisor.Logger(ctx).Info("Curator election restarting...")
	}
}