m/p/event/etcd: handle spurious watch updates

With the recent etcd updates, we started seeing some failures in tests
for the etcd-backed Event Value library.

This seems to be due to etcd now sometimes returning 'spurious' watch
updates, in which a keyvalue is returned twice, with two separate
revision numbers, even though the underlying value has not been
updated.

We elect to deduplicate these within the event value library itself, if
only to make it less work for downstream users to do the same. This is
done by keeping a cross-watcher.Get map of key->values, and filtering
out updates which effectively do not update the data underneath.

We had one test relying on a 1:1 correspondence between etcd puts and
Event Value backlogged Gets. However, the rest of our codebase does not
make this assumption, and it seems fair that this assumption doesn't
make sense alongside the intended use of the Event Value system in which
we deliberately and arbitrarily drop intermediate updates within a
single Get call.

Change-Id: I731a15b2d15ab6807bb95cb6c777c176dde22f0b
Reviewed-on: https://review.monogon.dev/c/monogon/+/654
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/pkg/event/etcd/etcd.go b/metropolis/pkg/event/etcd/etcd.go
index cb93325..f5ccd27 100644
--- a/metropolis/pkg/event/etcd/etcd.go
+++ b/metropolis/pkg/event/etcd/etcd.go
@@ -1,6 +1,7 @@
 package etcd
 
 import (
+	"bytes"
 	"context"
 	"errors"
 	"fmt"
@@ -108,17 +109,12 @@
 		ctx:  ctx,
 		ctxC: ctxC,
 
+		current: make(map[string][]byte),
+
 		getSem: make(chan struct{}, 1),
 	}
 }
 
-// keyValue is an intermediate type used to keep etcd values in the watcher's
-// backlog.
-type keyValue struct {
-	key   []byte
-	value []byte
-}
-
 type watcher struct {
 	// Value copy, used to configure the behaviour of this watcher.
 	Value
@@ -132,13 +128,23 @@
 	// error if concurrent access is attempted.
 	getSem chan struct{}
 
-	// backlogged is a list of (key, value) pairs retrieved from etcd but not yet
-	// returned via Get. These items are not a replay of all the updates from etcd,
-	// but are already compacted to deduplicate updates to the same object (ie., if
-	// the update stream from etcd is for keys A, B, and A, the backlogged list will
+	// backlogged is a list of keys retrieved from etcd but not yet returned via
+	// Get. These items are not a replay of all the updates from etcd, but are
+	// already compacted to deduplicate updates to the same object (ie., if the
+	// update stream from etcd is for keys A, B, and A, the backlogged list will
 	// only contain one update for A and B each, with the first update for A being
 	// discarded upon arrival of the second update).
-	backlogged []*keyValue
+	//
+	// The keys are an index into the current map, which contains the values
+	// retrieved, including ones that have already been returned via Get. This
+	// persistence allows us to deduplicate spurious updates to the user, in which
+	// etcd returned a new revision of a key, but the data stayed the same.
+	backlogged [][]byte
+	// current map, keyed from etcd key into etcd value at said key. This map
+	// persists alongside an etcd connection, permitting deduplication of spurious
+	// etcd updates even across multiple Get calls.
+	current map[string][]byte
+
 	// prev is the etcd store revision of a previously completed etcd Get/Watch
 	// call, used to resume a Watch call in case of failures.
 	prev *int64
@@ -193,11 +199,10 @@
 		w.prev = &get.Header.Revision
 
 		w.backlogged = nil
+		w.current = make(map[string][]byte)
 		for _, kv := range get.Kvs {
-			w.backlogged = append(w.backlogged, &keyValue{
-				key:   kv.Key,
-				value: kv.Value,
-			})
+			w.backlogged = append(w.backlogged, kv.Key)
+			w.current[string(kv.Key)] = kv.Value
 		}
 		return nil
 
@@ -273,11 +278,6 @@
 		//
 		// TODO(q3k): this could be stored in the watcher state to not waste time on
 		// each update, but it's good enough for now.
-		lastUpdate := make(map[string]*keyValue)
-		for _, kv := range w.backlogged {
-			kv := kv
-			lastUpdate[string(kv.key)] = kv
-		}
 		for _, ev := range resp.Events {
 			var value []byte
 			switch ev.Type {
@@ -289,15 +289,11 @@
 			}
 
 			keyS := string(ev.Kv.Key)
-			prev := lastUpdate[keyS]
-			if prev == nil {
-				kv := &keyValue{
-					key: ev.Kv.Key,
-				}
-				w.backlogged = append(w.backlogged, kv)
-				prev = kv
+			prev := w.current[keyS]
+			if !bytes.Equal(prev, value) {
+				w.backlogged = append(w.backlogged, ev.Kv.Key)
+				w.current[keyS] = value
 			}
-			prev.value = value
 		}
 
 		// Still nothing in backlog? Keep trying.
@@ -413,14 +409,16 @@
 		if len(w.backlogged) != 1 {
 			panic("multiple keys in nonranged value")
 		}
-		kv := w.backlogged[0]
+		k := w.backlogged[0]
+		v := w.current[string(k)]
 		w.backlogged = nil
-		return w.decoder(kv.key, kv.value)
+		return w.decoder(k, v)
 	} else {
 		// For ranged queries, pop one ranged query off the backlog.
-		kv := w.backlogged[0]
+		k := w.backlogged[0]
+		v := w.current[string(k)]
 		w.backlogged = w.backlogged[1:]
-		return w.decoder(kv.key, kv.value)
+		return w.decoder(k, v)
 	}
 }
 
diff --git a/metropolis/pkg/event/etcd/etcd_test.go b/metropolis/pkg/event/etcd/etcd_test.go
index faa9629..d91a6b9 100644
--- a/metropolis/pkg/event/etcd/etcd_test.go
+++ b/metropolis/pkg/event/etcd/etcd_test.go
@@ -654,17 +654,16 @@
 	wantError("foo")
 	wantValue("18", 18)
 	wantError("10")
-	wantError("10")
 	wantValue("27", 27)
 	wantValue("36", 36)
 
 	for i, want := range wantList {
 		q := <-queue
 		if want == nil && q.err == nil {
-			t.Errorf("Case %d: wanted error, got no error and value %d", i, q.val)
+			t.Fatalf("Case %d: wanted error, got no error and value %d", i, q.val)
 		}
 		if want != nil && (*want) != q.val {
-			t.Errorf("Case %d: wanted value %d, got error %v and value %d", i, *want, q.err, q.val)
+			t.Fatalf("Case %d: wanted value %d, got error %v and value %d", i, *want, q.err, q.val)
 		}
 	}
 }