package etcd

import (
	"context"
	"errors"
	"fmt"
	"sync"

	"github.com/cenkalti/backoff/v4"
	"go.etcd.io/etcd/clientv3"

	"source.monogon.dev/metropolis/node/core/consensus/client"
	"source.monogon.dev/metropolis/pkg/event"
)

var (
	// Type assert that *Value implements event.Value. We do this artificially,
	// as there currently is no code path that needs this to be strictly true.
	// However, users of this library might want to rely on the Value type
	// instead of particular Value implementations.
	_ event.Value = &Value{}
)

// Value is an 'Event Value' backed by an etcd cluster and accessed over an
// etcd client. This is a stateless handle and can be copied and shared across
// goroutines.
type Value struct {
	etcd    client.Namespaced
	key     string
	decoder BytesDecoder
}

// NewValue creates a new Value for a given key in an etcd client. The given
// decoder will be used to convert bytes retrieved from etcd into the
// interface{} value returned by Get calls on this Value's watchers.
func NewValue(etcd client.Namespaced, key string, decoder BytesDecoder) *Value {
	return &Value{
		etcd:    etcd,
		key:     key,
		decoder: decoder,
	}
}

// BytesDecoder is a function that converts bytes retrieved from etcd into an
// end-user-facing value. If it returns an error, the Get call performed on a
// watcher configured with this decoder will fail, swallowing that particular
// update, but the watcher will continue to work.
// Any provided BytesDecoder implementations must be safe to copy.
type BytesDecoder = func(data []byte) (interface{}, error)

// NoDecoder is a no-op decoder which passes through the retrieved bytes as a
// []byte type to the user.
func NoDecoder(data []byte) (interface{}, error) {
	return data, nil
}
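
// As an illustration of the BytesDecoder contract, a decoder parsing the
// stored bytes as JSON could look like the sketch below. This is not part of
// this package's API; exampleConfig is a hypothetical type, and the defining
// package would need to import encoding/json.
//
//   type exampleConfig struct {
//       Replicas int `json:"replicas"`
//   }
//
//   func jsonDecoder(data []byte) (interface{}, error) {
//       // Deletions surface as nil bytes; pass them through as a nil value.
//       if data == nil {
//           return nil, nil
//       }
//       var cfg exampleConfig
//       if err := json.Unmarshal(data, &cfg); err != nil {
//           // Failing here swallows this particular update, but the watcher
//           // stays alive and will deliver subsequent updates.
//           return nil, fmt.Errorf("invalid config: %w", err)
//       }
//       return &cfg, nil
//   }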

func (e *Value) Set(val interface{}) {
	// TODO(q3k): split up the Value interface into ValueReader/ValueWriter
	panic("Set is unimplemented on etcd values")
}

func (e *Value) Watch() event.Watcher {
	ctx, ctxC := context.WithCancel(context.Background())
	return &watcher{
		Value: *e,

		ctx:  ctx,
		ctxC: ctxC,

		getSem: make(chan struct{}, 1),
	}
}
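
// A typical consumer builds a Value, opens a watcher, and then loops on Get.
// A minimal sketch, assuming an existing context ctx, a client.Namespaced
// instance cl, and a hypothetical handle function:
//
//   v := NewValue(cl, "/config/cluster", NoDecoder)
//   w := v.Watch()
//   defer w.Close()
//   for {
//       data, err := w.Get(ctx)
//       if err != nil {
//           // Context cancellation or a decoder error.
//           return err
//       }
//       // With NoDecoder, data is a []byte; nil means the key was deleted.
//       handle(data.([]byte))
//   }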

type watcher struct {
	// Value copy, used to configure the behaviour of this watcher.
	Value

	// ctx is the context that expresses the liveness of this watcher. It is
	// canceled when the watcher is closed, and the etcd Watch hangs off of it.
	ctx  context.Context
	ctxC context.CancelFunc

	// getSem is a semaphore used to limit concurrent Get calls and throw an
	// error if concurrent access is attempted.
	getSem chan struct{}

	// backlogged is the value retrieved by an initial KV Get from etcd that
	// should be returned at the next opportunity, or nil if there isn't any.
	backlogged *[]byte
	// prev is the revision of a previously retrieved value within this
	// watcher, or nil if there hasn't been any.
	prev *int64
	// wc is the etcd watch channel, or nil if no channel is yet open.
	wc clientv3.WatchChan

	// testRaceWG is an optional WaitGroup that, if set, will be waited upon
	// after the initial KV value retrieval, but before the watch is created.
	// This is only used for testing.
	testRaceWG *sync.WaitGroup
	// testSetupWG is an optional WaitGroup that, if set, will be waited upon
	// after the etcd watch is created.
	// This is only used for testing.
	testSetupWG *sync.WaitGroup
}

func (w *watcher) setup(ctx context.Context) error {
	if w.wc != nil {
		return nil
	}

	// First, check if some data under this key already exists.
	// We use an exponential backoff and retry here as the initial Get can fail
	// if the cluster is unstable (eg. failing over). We only fail the retry if
	// the context expires.
	bo := backoff.NewExponentialBackOff()
	bo.MaxElapsedTime = 0
	err := backoff.Retry(func() error {
		get, err := w.etcd.Get(ctx, w.key)
		if err != nil {
			return fmt.Errorf("when retrieving initial value: %w", err)
		}
		w.prev = &get.Header.Revision
		if len(get.Kvs) != 0 {
			// Assert that the etcd API is behaving as expected.
			if len(get.Kvs) > 1 {
				panic("More than one key returned in unary GET response")
			}
			// If an existing value is present, backlog it so that it is
			// returned by the next Get call.
			kv := get.Kvs[0]
			w.backlogged = &kv.Value
		} else {
			w.backlogged = nil
		}
		return nil
	}, backoff.WithContext(bo, ctx))

	if w.testRaceWG != nil {
		w.testRaceWG.Wait()
	}
	if err != nil {
		return err
	}

	// Start the watch at the revision immediately following the one observed
	// by the initial Get, so that no updates are lost in between.
	var watchOpts []clientv3.OpOption
	if w.prev != nil {
		watchOpts = append(watchOpts, clientv3.WithRev(*w.prev+1))
	}
	w.wc = w.etcd.Watch(w.ctx, w.key, watchOpts...)

	if w.testSetupWG != nil {
		w.testSetupWG.Wait()
	}
	return nil
}

func (w *watcher) get(ctx context.Context) ([]byte, error) {
	// Return backlogged value, if present.
	if w.backlogged != nil {
		value := *w.backlogged
		w.backlogged = nil
		return value, nil
	}

	// Keep watching for a watch event.
	var event *clientv3.Event
	for {
		var resp *clientv3.WatchResponse
		select {
		case r := <-w.wc:
			resp = &r
		case <-ctx.Done():
			return nil, ctx.Err()
		}

		if resp.Canceled {
			// Only allow watches to be canceled by context cancellation. Any
			// other error is something we need to handle, eg. a client close
			// or compaction error.
			if errors.Is(resp.Err(), ctx.Err()) {
				return nil, fmt.Errorf("watch canceled: %w", resp.Err())
			}

			// Otherwise, attempt to reconnect.
			w.wc = nil
			if err := w.setup(ctx); err != nil {
				return nil, fmt.Errorf("when reconnecting watch: %w", err)
			}
			continue
		}

		if len(resp.Events) < 1 {
			continue
		}

		event = resp.Events[len(resp.Events)-1]
		break
	}

	w.prev = &event.Kv.ModRevision
	// Return deletions as nil, and new values as their content.
	switch event.Type {
	case clientv3.EventTypeDelete:
		return nil, nil
	case clientv3.EventTypePut:
		return event.Kv.Value, nil
	default:
		return nil, fmt.Errorf("invalid event type %v", event.Type)
	}
}

// Get implements the Get method of the Watcher interface.
// It can return an error in two cases:
//  - the given context is canceled (in which case the returned error will
//    wrap the context error)
//  - the watcher's BytesDecoder returned an error (in which case the error
//    returned by the BytesDecoder will be returned verbatim)
// Note that transient and permanent etcd errors are never returned, and the
// Get call will attempt to recover from these errors as much as possible.
// This also means that the user of the Watcher will not be notified if the
// underlying etcd client disconnects from the cluster, or if the cluster
// loses quorum.
// TODO(q3k): implement leases to allow clients to be notified when there are
// transient cluster/quorum/partition errors, if needed.
// TODO(q3k): implement internal, limited buffering for backlogged data not
// yet consumed by the client, as the etcd client library seems to use an
// unbounded buffer in case this happens (see watcherStream.buf in clientv3).
func (w *watcher) Get(ctx context.Context) (interface{}, error) {
	select {
	case w.getSem <- struct{}{}:
	default:
		return nil, fmt.Errorf("cannot Get() concurrently on a single watcher")
	}
	defer func() {
		<-w.getSem
	}()

	// Early check for context cancellation, preventing spurious contact with
	// etcd when there's no need for it.
	if w.ctx.Err() != nil {
		return nil, w.ctx.Err()
	}

	if err := w.setup(ctx); err != nil {
		return nil, fmt.Errorf("when setting up watcher: %w", err)
	}

	value, err := w.get(ctx)
	if err != nil {
		return nil, fmt.Errorf("when watching for new value: %w", err)
	}
	return w.decoder(value)
}
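
// Callers that need to tell the two failure modes of Get apart can unwrap
// with errors.Is, for example (a sketch; it assumes the decoder itself never
// wraps a context error):
//
//   v, err := w.Get(ctx)
//   switch {
//   case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
//       // The context given to Get (or the watcher itself) was canceled.
//   case err != nil:
//       // A decoder error, returned verbatim.
//   }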

func (w *watcher) Close() error {
	w.ctxC()
	return nil
}