| Serge Bazanski | 3c885de | 2021-06-17 17:21:00 +0200 | [diff] [blame] | 1 | package curator |
| 2 | |
| 3 | import ( |
| Serge Bazanski | f0b4da5 | 2021-06-21 20:05:59 +0200 | [diff] [blame] | 4 | "context" |
| 5 | "errors" |
| 6 | "fmt" |
| Serge Bazanski | 5a637b0 | 2022-02-18 12:18:04 +0100 | [diff] [blame] | 7 | "strings" |
| Serge Bazanski | 3be4832 | 2021-10-05 17:24:26 +0200 | [diff] [blame] | 8 | "sync" |
| Mateusz Zalega | 32b1929 | 2022-05-17 13:26:55 +0200 | [diff] [blame] | 9 | "time" |
| Serge Bazanski | f0b4da5 | 2021-06-21 20:05:59 +0200 | [diff] [blame] | 10 | |
| Lorenz Brun | d13c1c6 | 2022-03-30 19:58:58 +0200 | [diff] [blame] | 11 | clientv3 "go.etcd.io/etcd/client/v3" |
| Serge Bazanski | 3c885de | 2021-06-17 17:21:00 +0200 | [diff] [blame] | 12 | "google.golang.org/grpc/codes" |
| 13 | "google.golang.org/grpc/status" |
| 14 | |
| Serge Bazanski | 5839e97 | 2021-11-16 15:46:19 +0100 | [diff] [blame] | 15 | "source.monogon.dev/metropolis/node/core/consensus" |
| Serge Bazanski | 3c885de | 2021-06-17 17:21:00 +0200 | [diff] [blame] | 16 | "source.monogon.dev/metropolis/node/core/consensus/client" |
| Serge Bazanski | 2f58ac0 | 2021-10-05 11:47:20 +0200 | [diff] [blame] | 17 | "source.monogon.dev/metropolis/node/core/identity" |
| Serge Bazanski | 5a637b0 | 2022-02-18 12:18:04 +0100 | [diff] [blame] | 18 | "source.monogon.dev/metropolis/node/core/rpc" |
| Serge Bazanski | 3c885de | 2021-06-17 17:21:00 +0200 | [diff] [blame] | 19 | ) |
| 20 | |
// leaderState holds data that exists only for the duration of a single Curator
// leader's tenure. Nothing in here is persisted: once a different leader gets
// elected, all of it is gone.
type leaderState struct {
	// heartbeatTimestamps is keyed by node ID. Each value is the monotonic
	// clock timestamp of the most recent heartbeat that the current Curator
	// leader received from that node.
	heartbeatTimestamps sync.Map

	// startTs is the local monotonic clock timestamp taken when this node
	// assumed Curator leadership.
	startTs time.Time

	// clusternetCache is keyed by WireGuard public key (in string form) and
	// yields the ID of the node using that key. It allows detecting public
	// keys that might have been re-used without retrieving every node from
	// etcd.
	clusternetCache map[string]string
}
| 38 | |
| Serge Bazanski | 99f4774 | 2021-08-04 20:21:42 +0200 | [diff] [blame] | 39 | // leadership represents the curator leader's ability to perform actions as a |
| 40 | // leader. It is available to all services implemented by the leader. |
| 41 | type leadership struct { |
| Serge Bazanski | f0b4da5 | 2021-06-21 20:05:59 +0200 | [diff] [blame] | 42 | // lockKey is the etcd key which backs this leader-elected instance. |
| Serge Bazanski | 3c885de | 2021-06-17 17:21:00 +0200 | [diff] [blame] | 43 | lockKey string |
| Serge Bazanski | f0b4da5 | 2021-06-21 20:05:59 +0200 | [diff] [blame] | 44 | // lockRev is the revision at which lockKey was created. The leader will use it |
| 45 | // in combination with lockKey to ensure all mutations/reads performed to etcd |
| 46 | // succeed only if this leader election is still current. |
| Serge Bazanski | 3c885de | 2021-06-17 17:21:00 +0200 | [diff] [blame] | 47 | lockRev int64 |
| Serge Bazanski | 268dd8c | 2022-06-22 12:50:44 +0200 | [diff] [blame] | 48 | // leaderID is the node ID of this curator's node, ie. the one acting as a |
| 49 | // curator leader. |
| 50 | leaderID string |
| Serge Bazanski | f0b4da5 | 2021-06-21 20:05:59 +0200 | [diff] [blame] | 51 | // etcd is the etcd client in which curator data and leader election state is |
| 52 | // stored. |
| 53 | etcd client.Namespaced |
| Serge Bazanski | 3be4832 | 2021-10-05 17:24:26 +0200 | [diff] [blame] | 54 | |
| 55 | // muNodes guards any changes to nodes, and prevents race conditions where the |
| 56 | // curator performs a read-modify-write operation to node data. The curator's |
| 57 | // leadership ensure no two curators run simultaneously, and this lock ensures |
| 58 | // no two parallel curator operations race eachother. |
| 59 | // |
| 60 | // This lock has to be taken any time such RMW operation takes place when not |
| 61 | // additionally guarded using etcd transactions. |
| 62 | muNodes sync.Mutex |
| Serge Bazanski | 516d300 | 2021-10-01 00:05:41 +0200 | [diff] [blame] | 63 | |
| Serge Bazanski | 5839e97 | 2021-11-16 15:46:19 +0100 | [diff] [blame] | 64 | consensus consensus.ServiceHandle |
| 65 | |
| Serge Bazanski | 516d300 | 2021-10-01 00:05:41 +0200 | [diff] [blame] | 66 | // muRegisterTicket guards changes to the register ticket. Its usage semantics |
| 67 | // are the same as for muNodes, as described above. |
| 68 | muRegisterTicket sync.Mutex |
| Mateusz Zalega | 32b1929 | 2022-05-17 13:26:55 +0200 | [diff] [blame] | 69 | |
| 70 | // ls contains the current leader's non-persistent local state. |
| 71 | ls leaderState |
| Serge Bazanski | f0b4da5 | 2021-06-21 20:05:59 +0200 | [diff] [blame] | 72 | } |
| 73 | |
| 74 | var ( |
| 75 | // lostLeadership is returned by txnAsLeader if the transaction got canceled |
| 76 | // because leadership was lost. |
| 77 | lostLeadership = errors.New("lost leadership") |
| 78 | ) |
| 79 | |
| 80 | // txnAsLeader performs an etcd transaction guarded by continued leadership. |
| 81 | // lostLeadership will be returned as an error in case the leadership is lost. |
| Serge Bazanski | 99f4774 | 2021-08-04 20:21:42 +0200 | [diff] [blame] | 82 | func (l *leadership) txnAsLeader(ctx context.Context, ops ...clientv3.Op) (*clientv3.TxnResponse, error) { |
| Serge Bazanski | 5a637b0 | 2022-02-18 12:18:04 +0100 | [diff] [blame] | 83 | var opsStr []string |
| 84 | for _, op := range ops { |
| 85 | opstr := "unk" |
| 86 | switch { |
| 87 | case op.IsGet(): |
| 88 | opstr = "get" |
| 89 | case op.IsDelete(): |
| 90 | opstr = "delete" |
| 91 | case op.IsPut(): |
| 92 | opstr = "put" |
| 93 | } |
| 94 | opsStr = append(opsStr, fmt.Sprintf("%s: %s", opstr, op.KeyBytes())) |
| 95 | } |
| 96 | rpc.Trace(ctx).Printf("txnAsLeader(%s)...", strings.Join(opsStr, ",")) |
| Serge Bazanski | 99f4774 | 2021-08-04 20:21:42 +0200 | [diff] [blame] | 97 | resp, err := l.etcd.Txn(ctx).If( |
| 98 | clientv3.Compare(clientv3.CreateRevision(l.lockKey), "=", l.lockRev), |
| Serge Bazanski | f0b4da5 | 2021-06-21 20:05:59 +0200 | [diff] [blame] | 99 | ).Then(ops...).Commit() |
| 100 | if err != nil { |
| Serge Bazanski | 5a637b0 | 2022-02-18 12:18:04 +0100 | [diff] [blame] | 101 | rpc.Trace(ctx).Printf("txnAsLeader(...): failed: %v", err) |
| Serge Bazanski | f0b4da5 | 2021-06-21 20:05:59 +0200 | [diff] [blame] | 102 | return nil, fmt.Errorf("when running leader transaction: %w", err) |
| 103 | } |
| 104 | if !resp.Succeeded { |
| Serge Bazanski | 5a637b0 | 2022-02-18 12:18:04 +0100 | [diff] [blame] | 105 | rpc.Trace(ctx).Printf("txnAsLeader(...): rejected (lost leadership)") |
| Serge Bazanski | f0b4da5 | 2021-06-21 20:05:59 +0200 | [diff] [blame] | 106 | return nil, lostLeadership |
| 107 | } |
| Serge Bazanski | 5a637b0 | 2022-02-18 12:18:04 +0100 | [diff] [blame] | 108 | rpc.Trace(ctx).Printf("txnAsLeader(...): ok") |
| Serge Bazanski | f0b4da5 | 2021-06-21 20:05:59 +0200 | [diff] [blame] | 109 | return resp, nil |
| 110 | } |
| 111 | |
| 112 | // rpcError attempts to convert a given error to a high-level error that can be |
| 113 | // directly exposed to RPC clients. If false is returned, the error was not |
| 114 | // converted and is returned verbatim. |
| 115 | func rpcError(err error) (error, bool) { |
| 116 | if errors.Is(err, lostLeadership) { |
| 117 | return status.Error(codes.Unavailable, "lost leadership"), true |
| 118 | } |
| Serge Bazanski | bc7614e | 2021-09-09 13:07:09 +0200 | [diff] [blame] | 119 | if errors.Is(err, context.DeadlineExceeded) { |
| 120 | return status.Error(codes.DeadlineExceeded, err.Error()), true |
| 121 | } |
| 122 | if errors.Is(err, context.Canceled) { |
| 123 | return status.Error(codes.Canceled, err.Error()), true |
| 124 | } |
| Serge Bazanski | f0b4da5 | 2021-06-21 20:05:59 +0200 | [diff] [blame] | 125 | return err, false |
| Serge Bazanski | 3c885de | 2021-06-17 17:21:00 +0200 | [diff] [blame] | 126 | } |
| 127 | |
| Serge Bazanski | 99f4774 | 2021-08-04 20:21:42 +0200 | [diff] [blame] | 128 | // curatorLeader implements the curator acting as the elected leader of a |
| 129 | // cluster. It performs direct reads/writes from/to etcd as long as it remains |
| 130 | // leader. |
| 131 | // |
| 132 | // Its made up of different subcomponents implementing gRPC services, each of |
| 133 | // which has access to the leadership structure. |
| 134 | type curatorLeader struct { |
| 135 | leaderCurator |
| Serge Bazanski | 41d275a | 2021-08-17 13:09:43 +0200 | [diff] [blame] | 136 | leaderAAA |
| Serge Bazanski | 6bd4159 | 2021-08-23 13:18:37 +0200 | [diff] [blame] | 137 | leaderManagement |
| Serge Bazanski | 99f4774 | 2021-08-04 20:21:42 +0200 | [diff] [blame] | 138 | } |
| Serge Bazanski | f0b4da5 | 2021-06-21 20:05:59 +0200 | [diff] [blame] | 139 | |
| Serge Bazanski | 2f58ac0 | 2021-10-05 11:47:20 +0200 | [diff] [blame] | 140 | func newCuratorLeader(l *leadership, node *identity.Node) *curatorLeader { |
| Mateusz Zalega | 32b1929 | 2022-05-17 13:26:55 +0200 | [diff] [blame] | 141 | // Mark the start of this leader's tenure. |
| 142 | l.ls.startTs = time.Now() |
| 143 | |
| Serge Bazanski | 99f4774 | 2021-08-04 20:21:42 +0200 | [diff] [blame] | 144 | return &curatorLeader{ |
| Serge Bazanski | 2893e98 | 2021-09-09 13:06:16 +0200 | [diff] [blame] | 145 | leaderCurator{leadership: l}, |
| 146 | leaderAAA{leadership: l}, |
| Serge Bazanski | 2f58ac0 | 2021-10-05 11:47:20 +0200 | [diff] [blame] | 147 | leaderManagement{leadership: l, node: node}, |
| Serge Bazanski | f0b4da5 | 2021-06-21 20:05:59 +0200 | [diff] [blame] | 148 | } |
| Serge Bazanski | 3c885de | 2021-06-17 17:21:00 +0200 | [diff] [blame] | 149 | } |