package etcd

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"sync"

	"github.com/cenkalti/backoff/v4"
	clientv3 "go.etcd.io/etcd/client/v3"

	"source.monogon.dev/metropolis/node/core/consensus/client"
	"source.monogon.dev/metropolis/pkg/event"
)

var (
	// Type assert that *Value implements event.ValueWatch. We do this
	// artificially, as there currently is no code path that needs this to be
	// strictly true. However, users of this library might want to rely on the
	// Value type instead of particular Value implementations.
	_ event.ValueWatch = &Value{}
)

// Value is an 'Event Value' backed in an etcd cluster, accessed over an
// etcd client. This is a stateless handle and can be copied and shared across
// goroutines.
type Value struct {
	etcd    client.Namespaced
	key     string
	keyEnd  string
	decoder BytesDecoder
}

type Option struct {
	rangeEnd string
}

// Range creates a Value that is backed by a range of etcd key/value pairs,
// from 'key' passed to NewValue to 'end' passed to Range.
//
// The key range semantics (ie. lexicographic ordering) are the same as in etcd
// ranges, so for example to retrieve all keys prefixed by `foo/`, key should be
// `foo/` and end should be `foo0`.
//
// For any update in the given range, the decoder will be called and its result
// will trigger the return of a Get() call. The decoder should return a type
// that lets the user distinguish which of the multiple objects in the range got
// updated, as the Get() call by itself returns no additional information about
// the location of the retrieved object.
//
// The order of values retrieved by Get() is currently fully arbitrary and must
// not be relied on. It's possible that in the future the order of updates and
// the blocking behaviour of Get will be formalized, but this is not yet the
// case. Instead, the data returned should be treated as eventually consistent
// with the etcd state.
//
// For some uses, it might be necessary to first retrieve all the objects
// contained within the range before starting to block on updates - in this
// case, the BacklogOnly option should be used when calling Get.
func Range(end string) *Option {
	return &Option{
		rangeEnd: end,
	}
}

// NewValue creates a new Value for a given key (or key range) in an etcd
// client. The given decoder will be used to convert bytes retrieved from etcd
// into the interface{} value returned by Get on this Value's watchers.
func NewValue(etcd client.Namespaced, key string, decoder BytesDecoder, options ...*Option) *Value {
	res := &Value{
		etcd:    etcd,
		key:     key,
		keyEnd:  key,
		decoder: decoder,
	}

	for _, opt := range options {
		if end := opt.rangeEnd; end != "" {
			res.keyEnd = end
		}
	}

	return res
}
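
// exampleRangedValue is an illustrative sketch of how a ranged Value might be
// constructed and watched; the function name, the "nodes/" prefix and its
// range end "nodes0" are hypothetical, not part of the package API.
func exampleRangedValue(ctx context.Context, etcd client.Namespaced) error {
	// Watch all keys prefixed with "nodes/"; per etcd range semantics, the
	// range end for that prefix is "nodes0".
	value := NewValue(etcd, "nodes/", NoDecoder, Range("nodes0"))
	w := value.Watch()
	defer w.Close()

	for {
		// Get blocks until some key in the range is created, updated or
		// deleted.
		data, err := w.Get(ctx)
		if err != nil {
			return fmt.Errorf("watch failed: %w", err)
		}
		// With NoDecoder, data is the raw []byte value of whichever key in
		// the range changed.
		_ = data
	}
}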

// BytesDecoder is a function that converts bytes retrieved from etcd into an
// end-user facing value. Additionally, a key is available so that returned
// values can be augmented with the location they were retrieved from. This is
// especially useful when returning values resulting from an etcd range.
//
// If an error is returned, the Get call performed on a watcher configured with
// this decoder will fail, swallowing that particular update, but the watcher
// will continue to work. Any provided BytesDecoder implementations must be safe
// to copy.
type BytesDecoder = func(key []byte, data []byte) (interface{}, error)

// NoDecoder is a no-op decoder which passes through the retrieved bytes as a
// []byte type to the user.
func NoDecoder(key []byte, data []byte) (interface{}, error) {
	return data, nil
}
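
// keyedBytes pairs a value with the key it was retrieved from. It is a
// hypothetical helper used only by the keyedDecoder sketch below.
type keyedBytes struct {
	Key  string
	Data []byte
}

// keyedDecoder is a minimal BytesDecoder sketch: for ranged Values it wraps
// each update together with its key, so callers of Get can tell which object
// in the range was updated (or, when Data is nil, deleted).
func keyedDecoder(key []byte, data []byte) (interface{}, error) {
	return &keyedBytes{
		Key:  string(key),
		Data: data,
	}, nil
}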

func (e *Value) Watch() event.Watcher {
	ctx, ctxC := context.WithCancel(context.Background())
	return &watcher{
		Value: *e,

		ctx:  ctx,
		ctxC: ctxC,

		current: make(map[string][]byte),

		getSem: make(chan struct{}, 1),
	}
}

type watcher struct {
	// Value copy, used to configure the behaviour of this watcher.
	Value

	// ctx is the context that expresses the liveness of this watcher. It is
	// canceled when the watcher is closed, and the etcd Watch hangs off of it.
	ctx  context.Context
	ctxC context.CancelFunc

	// getSem is a semaphore used to limit concurrent Get calls and return an
	// error if concurrent access is attempted.
	getSem chan struct{}

	// backlogged is a list of keys retrieved from etcd but not yet returned via
	// Get. These items are not a replay of all the updates from etcd, but are
	// already compacted to deduplicate updates to the same object (ie. if the
	// update stream from etcd is for keys A, B, and A, the backlogged list will
	// only contain one update each for A and B, with the first update for A being
	// discarded upon arrival of the second update).
	//
	// The keys are an index into the current map, which contains the values
	// retrieved, including ones that have already been returned via Get. This
	// persistence allows us to deduplicate spurious updates to the user, in which
	// etcd returned a new revision of a key, but the data stayed the same.
	backlogged [][]byte
	// current is a map from etcd key to the etcd value at said key. This map
	// persists alongside an etcd connection, permitting deduplication of spurious
	// etcd updates even across multiple Get calls.
	current map[string][]byte

	// prev is the etcd store revision of a previously completed etcd Get/Watch
	// call, used to resume a Watch call in case of failures.
	prev *int64
	// wc is the etcd watch channel, or nil if no channel is yet open.
	wc clientv3.WatchChan

	// testRaceWG is an optional WaitGroup that, if set, will be waited upon
	// after the initial KV value retrieval, but before the watch is created.
	// This is only used for testing.
	testRaceWG *sync.WaitGroup
	// testSetupWG is an optional WaitGroup that, if set, will be waited upon
	// after the etcd watch is created.
	// This is only used for testing.
	testSetupWG *sync.WaitGroup
}

// setup initializes wc (the watch channel from etcd) after retrieving the
// initial value(s) with a get operation.
func (w *watcher) setup(ctx context.Context) error {
	if w.wc != nil {
		return nil
	}
	ranged := w.key != w.keyEnd

	// First, check if some data under this key/range already exists.

	// We use an exponential backoff and retry here as the initial Get can fail
	// if the cluster is unstable (eg. failing over). We only fail the retry if
	// the context expires.
	bo := backoff.NewExponentialBackOff()
	bo.MaxElapsedTime = 0

	err := backoff.Retry(func() error {
		var getOpts []clientv3.OpOption
		if ranged {
			getOpts = append(getOpts, clientv3.WithRange(w.keyEnd))
		}
		get, err := w.etcd.Get(ctx, w.key, getOpts...)
		if err != nil {
			return fmt.Errorf("when retrieving initial value: %w", err)
		}

		// Assert that the etcd API is behaving as expected.
		if !ranged && len(get.Kvs) > 1 {
			panic("More than one key returned in unary GET response")
		}

		// After a successful Get, save the revision to watch from and re-build the
		// backlog from scratch based on what was available in the etcd store at that
		// time.
		w.prev = &get.Header.Revision

		w.backlogged = nil
		w.current = make(map[string][]byte)
		for _, kv := range get.Kvs {
			w.backlogged = append(w.backlogged, kv.Key)
			w.current[string(kv.Key)] = kv.Value
		}
		return nil
	}, backoff.WithContext(bo, ctx))

	if w.testRaceWG != nil {
		w.testRaceWG.Wait()
	}
	if err != nil {
		return err
	}

	watchOpts := []clientv3.OpOption{
		clientv3.WithRev(*w.prev + 1),
	}
	if ranged {
		watchOpts = append(watchOpts, clientv3.WithRange(w.keyEnd))
	}
	w.wc = w.etcd.Watch(w.ctx, w.key, watchOpts...)

	if w.testSetupWG != nil {
		w.testSetupWG.Wait()
	}
	return nil
}

// backfill blocks until a backlog of items is available. An error is returned
// if the context is canceled.
func (w *watcher) backfill(ctx context.Context) error {
	// Keep watching for watch events.
	for {
		var resp *clientv3.WatchResponse
		select {
		case r := <-w.wc:
			resp = &r
		case <-ctx.Done():
			return ctx.Err()
		}

		if resp.Canceled {
			// Only allow watches to be canceled due to context cancellations.
			// Any other error is something we need to handle, eg. a client
			// close or compaction error.
			if errors.Is(resp.Err(), ctx.Err()) {
				return fmt.Errorf("watch canceled: %w", resp.Err())
			}

			// Attempt to reconnect.
			if w.wc != nil {
				// If a wc already exists, close it. This forces a reconnection
				// by the next setup call.
				w.ctxC()
				w.ctx, w.ctxC = context.WithCancel(context.Background())
				w.wc = nil
			}
			if err := w.setup(ctx); err != nil {
				return fmt.Errorf("failed to setup watcher: %w", err)
			}
			continue
		}

		w.prev = &resp.Header.Revision
		// Spurious watch event with no update? Keep trying.
		if len(resp.Events) == 0 {
			continue
		}

		// Process updates into the compacted backlog, transforming deletions into
		// nil values. Updates are compacted into the smallest possible backlog by
		// overriding the previously stored value for a key whenever that key is
		// encountered again.
		//
		// TODO(q3k): the 'seen' set below could be stored in the watcher state to
		// not waste time on each update, but it's good enough for now.

		// Prepare a set of keys that already exist in the backlog. This will be used
		// to make sure we don't duplicate backlog entries while maintaining a stable
		// backlog order.
		seen := make(map[string]bool)
		for _, k := range w.backlogged {
			seen[string(k)] = true
		}

		for _, ev := range resp.Events {
			var value []byte
			switch ev.Type {
			case clientv3.EventTypeDelete:
				// Deletions are represented by a nil value.
			case clientv3.EventTypePut:
				value = ev.Kv.Value
			default:
				return fmt.Errorf("invalid event type %v", ev.Type)
			}

			keyS := string(ev.Kv.Key)
			prev := w.current[keyS]
			// Short-circuit and skip updates with the same content as already present.
			// These are sometimes emitted by etcd.
			if bytes.Equal(prev, value) {
				continue
			}

			// Only insert into the backlog if not yet present, but maintain order.
			if !seen[keyS] {
				w.backlogged = append(w.backlogged, ev.Kv.Key)
				seen[keyS] = true
			}
			// Regardless of the backlog list, always update the key to its newest value.
			w.current[keyS] = value
		}

		// Still nothing in backlog? Keep trying.
		if len(w.backlogged) == 0 {
			continue
		}

		return nil
	}
}

type GetOption struct {
	backlogOnly bool
}

var (
	// BacklogOnly prevents Get from blocking on waiting for more updates from
	// etcd, instead returning BacklogDone whenever no more data is currently
	// available locally. This is, however, different from establishing that there
	// are no more pending updates from the etcd cluster - the only way to ensure
	// the local client is up to date is to perform Get calls without this option
	// set.
	//
	// This mode of retrieval should only be used for the retrieval of the existing
	// data in the etcd cluster on the initial creation of the Watcher (by
	// repeatedly calling Get until BacklogDone is returned), and shouldn't be set
	// for any subsequent call. Any use of this option after that initial fetch is
	// undefined behaviour that exposes the internals of the Get implementation, and
	// must not be relied on. However, in the future, this behaviour might be
	// formalized.
	//
	// This mode is particularly useful for ranged watchers. Non-ranged watchers can
	// still use this option to distinguish between blocking because of the
	// nonexistence of an object vs. blocking because of networking issues. However,
	// non-ranged retrieval semantics generally will rarely need to make this
	// distinction.
	BacklogOnly = GetOption{backlogOnly: true}

	// BacklogDone is returned by Get when BacklogOnly is set and there is no more
	// event data stored in the Watcher client, ie. when the initial cluster state
	// of the requested key has been retrieved.
	BacklogDone = errors.New("no more backlogged data")
)
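
// exampleDrainBacklog is a minimal sketch of the BacklogOnly pattern described
// above, assuming a hypothetical "nodes/" key range: drain everything already
// present in etcd before blocking on further updates.
func exampleDrainBacklog(ctx context.Context, etcd client.Namespaced) error {
	w := NewValue(etcd, "nodes/", NoDecoder, Range("nodes0")).Watch()
	defer w.Close()

	// First, retrieve all objects currently stored under the range.
	for {
		data, err := w.Get(ctx, BacklogOnly)
		if errors.Is(err, BacklogDone) {
			break
		}
		if err != nil {
			return err
		}
		_ = data
	}

	// From here on, Get without BacklogOnly blocks until a new update arrives.
	for {
		data, err := w.Get(ctx)
		if err != nil {
			return err
		}
		_ = data
	}
}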

// Get implements the Get method of the Watcher interface.
//
// It can return an error in three cases:
//  - the given context is canceled (in which case, the returned error will wrap
//    the context error)
//  - the watcher's BytesDecoder returned an error (in which case the error
//    returned by the BytesDecoder will be returned verbatim)
//  - it has been called with BacklogOnly and the Watcher has no more local
//    event data to return (see BacklogOnly for more information on the
//    semantics of this mode of operation)
//
// Note that transient and permanent etcd errors are never returned, and the
// Get call will attempt to recover from these errors as much as possible. This
// also means that the user of the Watcher will not be notified if the
// underlying etcd client disconnects from the cluster, or if the cluster loses
// quorum.
//
// TODO(q3k): implement leases to allow clients to be notified when there are
// transient cluster/quorum/partition errors, if needed.
//
// TODO(q3k): implement internal, limited buffering for backlogged data not yet
// consumed by the client, as the etcd client library seems to use an unbounded
// buffer in case this happens (see: watcherStream.buf in clientv3).
func (w *watcher) Get(ctx context.Context, opts ...event.GetOption) (interface{}, error) {
	select {
	case w.getSem <- struct{}{}:
	default:
		return nil, fmt.Errorf("cannot Get() concurrently on a single watcher")
	}
	defer func() {
		<-w.getSem
	}()

	backlogOnly := false
	for _, optI := range opts {
		opt, ok := optI.(GetOption)
		if !ok {
			return nil, fmt.Errorf("get options must be of type etcd.GetOption")
		}
		if opt.backlogOnly {
			backlogOnly = true
		}
	}

	// Early check for context cancellations, preventing spurious contact with etcd
	// if there's no need to.
	if w.ctx.Err() != nil {
		return nil, w.ctx.Err()
	}

	if err := w.setup(ctx); err != nil {
		return nil, fmt.Errorf("when setting up watcher: %w", err)
	}

	if backlogOnly && len(w.backlogged) == 0 {
		return nil, BacklogDone
	}

	// Update backlog from etcd if needed.
	if len(w.backlogged) == 0 {
		err := w.backfill(ctx)
		if err != nil {
			return nil, fmt.Errorf("when watching for new value: %w", err)
		}
	}
	// Backlog is now guaranteed to contain at least one element.

	ranged := w.key != w.keyEnd
	if !ranged {
		// For non-ranged queries, drain the backlog fully.
		if len(w.backlogged) != 1 {
			panic(fmt.Sprintf("multiple keys in nonranged value: %v", w.backlogged))
		}
		k := w.backlogged[0]
		v := w.current[string(k)]
		w.backlogged = nil
		return w.decoder(k, v)
	} else {
		// For ranged queries, pop one key off the backlog.
		k := w.backlogged[0]
		v := w.current[string(k)]
		w.backlogged = w.backlogged[1:]
		return w.decoder(k, v)
	}
}
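
// exampleGetErrors is a hypothetical consumer sketch for a single, non-ranged
// key, distinguishing the error cases documented on Get above. The
// "cluster/config" key is illustrative only.
func exampleGetErrors(ctx context.Context, etcd client.Namespaced) {
	w := NewValue(etcd, "cluster/config", NoDecoder).Watch()
	defer w.Close()

	for {
		data, err := w.Get(ctx)
		switch {
		case err == nil:
			// A new value for the key was retrieved; process it.
			_ = data
		case ctx.Err() != nil && errors.Is(err, ctx.Err()):
			// The context was canceled or timed out; stop watching.
			return
		default:
			// A decoder error (possible with decoders that can fail, unlike
			// NoDecoder): that particular update was swallowed, but the
			// watcher keeps working, so keep retrieving.
		}
	}
}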

func (w *watcher) Close() error {
	w.ctxC()
	return nil
}