package curator

import (
	"context"
	"errors"
	"fmt"
	"strings"
	"sync"

	clientv3 "go.etcd.io/etcd/client/v3"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	"source.monogon.dev/metropolis/node/core/consensus"
	"source.monogon.dev/metropolis/node/core/consensus/client"
	"source.monogon.dev/metropolis/node/core/identity"
	"source.monogon.dev/metropolis/node/core/rpc"
)

// leadership represents the curator leader's ability to perform actions as a
// leader. It is available to all services implemented by the leader.
type leadership struct {
	// lockKey is the etcd key which backs this leader-elected instance.
	lockKey string
	// lockRev is the revision at which lockKey was created. The leader will use it
	// in combination with lockKey to ensure all mutations/reads performed to etcd
	// succeed only if this leader election is still current.
	lockRev int64
	// etcd is the etcd client in which curator data and leader election state
	// are stored.
	etcd client.Namespaced
	// muNodes guards any changes to nodes, and prevents race conditions where the
	// curator performs a read-modify-write operation to node data. The curator's
	// leadership ensures no two curators run simultaneously, and this lock ensures
	// no two parallel curator operations race each other.
	//
	// This lock has to be taken any time such an RMW operation takes place when
	// it is not additionally guarded using etcd transactions.
	muNodes sync.Mutex
	// consensus is a handle to the cluster's consensus service.
	consensus consensus.ServiceHandle
	// muRegisterTicket guards changes to the register ticket. Its usage semantics
	// are the same as for muNodes, as described above.
	muRegisterTicket sync.Mutex
}
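
// A hypothetical usage sketch (not part of the original file): a leader-side
// read-modify-write of node data takes muNodes for the entire operation and
// commits through txnAsLeader, so the write is rejected if this instance lost
// leadership in the meantime. The key layout and value handling below are
// assumed purely for illustration:
//
//	l.muNodes.Lock()
//	defer l.muNodes.Unlock()
//	res, err := l.txnAsLeader(ctx, clientv3.OpGet("/nodes/"+id))
//	// ... decode res, mutate the node ...
//	_, err = l.txnAsLeader(ctx, clientv3.OpPut("/nodes/"+id, string(updated)))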

var (
	// lostLeadership is returned by txnAsLeader if the transaction got canceled
	// because leadership was lost.
	lostLeadership = errors.New("lost leadership")
)

// txnAsLeader performs an etcd transaction guarded by continued leadership.
// lostLeadership will be returned as an error in case the leadership is lost.
func (l *leadership) txnAsLeader(ctx context.Context, ops ...clientv3.Op) (*clientv3.TxnResponse, error) {
	var opsStr []string
	for _, op := range ops {
		opstr := "unk"
		switch {
		case op.IsGet():
			opstr = "get"
		case op.IsDelete():
			opstr = "delete"
		case op.IsPut():
			opstr = "put"
		}
		opsStr = append(opsStr, fmt.Sprintf("%s: %s", opstr, op.KeyBytes()))
	}
	rpc.Trace(ctx).Printf("txnAsLeader(%s)...", strings.Join(opsStr, ","))
	resp, err := l.etcd.Txn(ctx).If(
		clientv3.Compare(clientv3.CreateRevision(l.lockKey), "=", l.lockRev),
	).Then(ops...).Commit()
	if err != nil {
		rpc.Trace(ctx).Printf("txnAsLeader(...): failed: %v", err)
		return nil, fmt.Errorf("when running leader transaction: %w", err)
	}
	if !resp.Succeeded {
		rpc.Trace(ctx).Printf("txnAsLeader(...): rejected (lost leadership)")
		return nil, lostLeadership
	}
	rpc.Trace(ctx).Printf("txnAsLeader(...): ok")
	return resp, nil
}
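
// Because lostLeadership means the guarding lockKey/lockRev comparison failed,
// callers generally cannot retry under this instance. A minimal usage sketch
// (hypothetical, key name assumed for illustration):
//
//	_, err := l.txnAsLeader(ctx, clientv3.OpDelete("/nodes/"+id))
//	switch {
//	case errors.Is(err, lostLeadership):
//	    // This instance is stale; stop acting and let the new leader take over.
//	case err != nil:
//	    // Transient etcd/transport error; surface it to the caller.
//	}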

// rpcError attempts to convert a given error to a high-level error that can be
// directly exposed to RPC clients. If false is returned, the error was not
// converted and is returned verbatim.
func rpcError(err error) (error, bool) {
	if errors.Is(err, lostLeadership) {
		return status.Error(codes.Unavailable, "lost leadership"), true
	}
	if errors.Is(err, context.DeadlineExceeded) {
		return status.Error(codes.DeadlineExceeded, err.Error()), true
	}
	if errors.Is(err, context.Canceled) {
		return status.Error(codes.Canceled, err.Error()), true
	}
	return err, false
}
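
// A sketch of how an RPC handler might combine the two helpers (hypothetical,
// for illustration only; the fallback status code is an assumption):
//
//	if _, err := l.txnAsLeader(ctx, ops...); err != nil {
//	    if converted, ok := rpcError(err); ok {
//	        return nil, converted
//	    }
//	    return nil, status.Errorf(codes.Unavailable, "transaction failed: %v", err)
//	}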

// curatorLeader implements the curator acting as the elected leader of a
// cluster. It performs direct reads/writes from/to etcd as long as it remains
// leader.
//
// It's made up of different subcomponents implementing gRPC services, each of
// which has access to the leadership structure.
type curatorLeader struct {
	leaderCurator
	leaderAAA
	leaderManagement
}

func newCuratorLeader(l *leadership, node *identity.Node) *curatorLeader {
	return &curatorLeader{
		leaderCurator{leadership: l},
		leaderAAA{leadership: l},
		leaderManagement{leadership: l, node: node},
	}
}
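
// Construction sketch (hypothetical wiring, variable names assumed): after
// winning the etcd election, the lock key and its creation revision from the
// election seed the leadership value shared by all leader services:
//
//	ldr := newCuratorLeader(&leadership{
//	    lockKey:   lockKey,
//	    lockRev:   lockRev,
//	    etcd:      etcdClient,
//	    consensus: consensusHandle,
//	}, &nodeIdentity)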