blob: 2a66e290e72d82885cc105773c6fbbd025c950f6 [file] [log] [blame]
Serge Bazanski3c885de2021-06-17 17:21:00 +02001package curator
2
3import (
Serge Bazanskif0b4da52021-06-21 20:05:59 +02004 "context"
5 "errors"
6 "fmt"
Serge Bazanski5a637b02022-02-18 12:18:04 +01007 "strings"
Serge Bazanski3be48322021-10-05 17:24:26 +02008 "sync"
Mateusz Zalega32b19292022-05-17 13:26:55 +02009 "time"
Serge Bazanskif0b4da52021-06-21 20:05:59 +020010
Lorenz Brund13c1c62022-03-30 19:58:58 +020011 clientv3 "go.etcd.io/etcd/client/v3"
Serge Bazanski3c885de2021-06-17 17:21:00 +020012 "google.golang.org/grpc/codes"
13 "google.golang.org/grpc/status"
14
Serge Bazanski5839e972021-11-16 15:46:19 +010015 "source.monogon.dev/metropolis/node/core/consensus"
Serge Bazanski3c885de2021-06-17 17:21:00 +020016 "source.monogon.dev/metropolis/node/core/consensus/client"
Serge Bazanski2f58ac02021-10-05 11:47:20 +020017 "source.monogon.dev/metropolis/node/core/identity"
Serge Bazanski5a637b02022-02-18 12:18:04 +010018 "source.monogon.dev/metropolis/node/core/rpc"
Serge Bazanski3c885de2021-06-17 17:21:00 +020019)
20
// leaderState holds the transient, in-memory state of the Curator leader.
// None of it is persisted: everything kept here is discarded as soon as a
// different leader gets elected.
type leaderState struct {
	// heartbeatTimestamps associates node IDs with the monotonic clock
	// timestamps of the most recent heartbeat this Curator leader received
	// from each of those nodes.
	heartbeatTimestamps sync.Map

	// startTs is the local monotonic clock timestamp at which this node took
	// over Curator leadership.
	startTs time.Time

	// clusternetCache associates WireGuard public keys (in string form) with
	// node IDs. It exists so that possibly re-used WireGuard public keys can
	// be detected without fetching every node from etcd.
	clusternetCache map[string]string
}
38
Serge Bazanski99f47742021-08-04 20:21:42 +020039// leadership represents the curator leader's ability to perform actions as a
40// leader. It is available to all services implemented by the leader.
41type leadership struct {
Serge Bazanskif0b4da52021-06-21 20:05:59 +020042 // lockKey is the etcd key which backs this leader-elected instance.
Serge Bazanski3c885de2021-06-17 17:21:00 +020043 lockKey string
Serge Bazanskif0b4da52021-06-21 20:05:59 +020044 // lockRev is the revision at which lockKey was created. The leader will use it
45 // in combination with lockKey to ensure all mutations/reads performed to etcd
46 // succeed only if this leader election is still current.
Serge Bazanski3c885de2021-06-17 17:21:00 +020047 lockRev int64
Serge Bazanski268dd8c2022-06-22 12:50:44 +020048 // leaderID is the node ID of this curator's node, ie. the one acting as a
49 // curator leader.
50 leaderID string
Serge Bazanskif0b4da52021-06-21 20:05:59 +020051 // etcd is the etcd client in which curator data and leader election state is
52 // stored.
53 etcd client.Namespaced
Serge Bazanski3be48322021-10-05 17:24:26 +020054
55 // muNodes guards any changes to nodes, and prevents race conditions where the
56 // curator performs a read-modify-write operation to node data. The curator's
57 // leadership ensure no two curators run simultaneously, and this lock ensures
58 // no two parallel curator operations race eachother.
59 //
60 // This lock has to be taken any time such RMW operation takes place when not
61 // additionally guarded using etcd transactions.
62 muNodes sync.Mutex
Serge Bazanski516d3002021-10-01 00:05:41 +020063
Serge Bazanski5839e972021-11-16 15:46:19 +010064 consensus consensus.ServiceHandle
65
Serge Bazanski516d3002021-10-01 00:05:41 +020066 // muRegisterTicket guards changes to the register ticket. Its usage semantics
67 // are the same as for muNodes, as described above.
68 muRegisterTicket sync.Mutex
Mateusz Zalega32b19292022-05-17 13:26:55 +020069
70 // ls contains the current leader's non-persistent local state.
71 ls leaderState
Serge Bazanskif0b4da52021-06-21 20:05:59 +020072}
73
// lostLeadership is the sentinel error returned by txnAsLeader when a guarded
// transaction is rejected because this curator no longer holds leadership.
// Match it with errors.Is, as it may be wrapped by callers.
var lostLeadership = errors.New("lost leadership")
79
80// txnAsLeader performs an etcd transaction guarded by continued leadership.
81// lostLeadership will be returned as an error in case the leadership is lost.
Serge Bazanski99f47742021-08-04 20:21:42 +020082func (l *leadership) txnAsLeader(ctx context.Context, ops ...clientv3.Op) (*clientv3.TxnResponse, error) {
Serge Bazanski5a637b02022-02-18 12:18:04 +010083 var opsStr []string
84 for _, op := range ops {
85 opstr := "unk"
86 switch {
87 case op.IsGet():
88 opstr = "get"
89 case op.IsDelete():
90 opstr = "delete"
91 case op.IsPut():
92 opstr = "put"
93 }
94 opsStr = append(opsStr, fmt.Sprintf("%s: %s", opstr, op.KeyBytes()))
95 }
96 rpc.Trace(ctx).Printf("txnAsLeader(%s)...", strings.Join(opsStr, ","))
Serge Bazanski99f47742021-08-04 20:21:42 +020097 resp, err := l.etcd.Txn(ctx).If(
98 clientv3.Compare(clientv3.CreateRevision(l.lockKey), "=", l.lockRev),
Serge Bazanskif0b4da52021-06-21 20:05:59 +020099 ).Then(ops...).Commit()
100 if err != nil {
Serge Bazanski5a637b02022-02-18 12:18:04 +0100101 rpc.Trace(ctx).Printf("txnAsLeader(...): failed: %v", err)
Serge Bazanskif0b4da52021-06-21 20:05:59 +0200102 return nil, fmt.Errorf("when running leader transaction: %w", err)
103 }
104 if !resp.Succeeded {
Serge Bazanski5a637b02022-02-18 12:18:04 +0100105 rpc.Trace(ctx).Printf("txnAsLeader(...): rejected (lost leadership)")
Serge Bazanskif0b4da52021-06-21 20:05:59 +0200106 return nil, lostLeadership
107 }
Serge Bazanski5a637b02022-02-18 12:18:04 +0100108 rpc.Trace(ctx).Printf("txnAsLeader(...): ok")
Serge Bazanskif0b4da52021-06-21 20:05:59 +0200109 return resp, nil
110}
111
112// rpcError attempts to convert a given error to a high-level error that can be
113// directly exposed to RPC clients. If false is returned, the error was not
114// converted and is returned verbatim.
115func rpcError(err error) (error, bool) {
116 if errors.Is(err, lostLeadership) {
117 return status.Error(codes.Unavailable, "lost leadership"), true
118 }
Serge Bazanskibc7614e2021-09-09 13:07:09 +0200119 if errors.Is(err, context.DeadlineExceeded) {
120 return status.Error(codes.DeadlineExceeded, err.Error()), true
121 }
122 if errors.Is(err, context.Canceled) {
123 return status.Error(codes.Canceled, err.Error()), true
124 }
Serge Bazanskif0b4da52021-06-21 20:05:59 +0200125 return err, false
Serge Bazanski3c885de2021-06-17 17:21:00 +0200126}
127
Serge Bazanski99f47742021-08-04 20:21:42 +0200128// curatorLeader implements the curator acting as the elected leader of a
129// cluster. It performs direct reads/writes from/to etcd as long as it remains
130// leader.
131//
132// Its made up of different subcomponents implementing gRPC services, each of
133// which has access to the leadership structure.
134type curatorLeader struct {
135 leaderCurator
Serge Bazanski41d275a2021-08-17 13:09:43 +0200136 leaderAAA
Serge Bazanski6bd41592021-08-23 13:18:37 +0200137 leaderManagement
Serge Bazanski99f47742021-08-04 20:21:42 +0200138}
Serge Bazanskif0b4da52021-06-21 20:05:59 +0200139
Serge Bazanski2f58ac02021-10-05 11:47:20 +0200140func newCuratorLeader(l *leadership, node *identity.Node) *curatorLeader {
Mateusz Zalega32b19292022-05-17 13:26:55 +0200141 // Mark the start of this leader's tenure.
142 l.ls.startTs = time.Now()
143
Serge Bazanski99f47742021-08-04 20:21:42 +0200144 return &curatorLeader{
Serge Bazanski2893e982021-09-09 13:06:16 +0200145 leaderCurator{leadership: l},
146 leaderAAA{leadership: l},
Serge Bazanski2f58ac02021-10-05 11:47:20 +0200147 leaderManagement{leadership: l, node: node},
Serge Bazanskif0b4da52021-06-21 20:05:59 +0200148 }
Serge Bazanski3c885de2021-06-17 17:21:00 +0200149}