m/p/event/etcd: better handle update coalescing

I wasn't able to reproduce this in a test, but etcd sometimes delivers
multiple events for the same key within a single entry on the watch
channel. I assume this happens when the etcd watcher client isn't
reading from the server fast enough.

When that happened, we'd panic about duplicate keys in a non-ranged
call. The cause was a logic bug in the entry coalescing: we only
skipped updates whose value matched the one already stored for the key,
and did not coalesce multiple differing updates to the same key, so the
key ended up in the backlog more than once. Now we coalesce these too,
and the bug seems to be gone.
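
For illustration, the coalescing described above can be sketched in
isolation roughly as follows. This is a simplified stand-in, not the
actual watcher: the event and watcher types, newWatcher and coalesce
are hypothetical names, and only the current and backlogged fields
mirror the real code.

```go
package main

import "bytes"

// event is a hypothetical stand-in for an etcd watch event: a key and its
// new value, with a nil value standing for a deletion.
type event struct {
	key   []byte
	value []byte
}

// watcher holds only the two fields that the coalescing logic touches.
type watcher struct {
	current    map[string][]byte // newest known value per key
	backlogged [][]byte          // keys with undelivered updates, in arrival order
}

func newWatcher() *watcher {
	return &watcher{current: make(map[string][]byte)}
}

// coalesce folds one batch of events into the watcher state. Each key is
// added to the backlog at most once, while current always ends up holding
// the newest value for that key.
func (w *watcher) coalesce(events []event) {
	// Keys already waiting in the backlog must not be appended again.
	seen := make(map[string]bool, len(w.backlogged))
	for _, k := range w.backlogged {
		seen[string(k)] = true
	}

	for _, ev := range events {
		keyS := string(ev.key)
		// Skip updates that do not change the stored value.
		if bytes.Equal(w.current[keyS], ev.value) {
			continue
		}
		// Append the key only on its first appearance, preserving order.
		if !seen[keyS] {
			w.backlogged = append(w.backlogged, ev.key)
			seen[keyS] = true
		}
		// Always record the newest value, backlogged or not.
		w.current[keyS] = ev.value
	}
}
```

Feeding this sketch a batch that sets /a to 1, then /a to 2, then /b
to x leaves the backlog as [/a, /b] with /a mapped to 2, instead of
listing /a twice.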

Change-Id: I36234cc1104ec96a38ad1566b9df75532df44bba
Reviewed-on: https://review.monogon.dev/c/monogon/+/800
Tested-by: Jenkins CI
Reviewed-by: Leopold Schabel <leo@monogon.tech>
diff --git a/metropolis/pkg/event/etcd/etcd.go b/metropolis/pkg/event/etcd/etcd.go
index 0f7f453..d49a592 100644
--- a/metropolis/pkg/event/etcd/etcd.go
+++ b/metropolis/pkg/event/etcd/etcd.go
@@ -278,6 +278,15 @@
 		//
 		// TODO(q3k): this could be stored in the watcher state to not waste time on
 		// each update, but it's good enough for now.
+
+		// Prepare a set of keys that already exist in the backlog. This will be used
+		// to make sure we don't duplicate backlog entries while maintaining a stable
+		// backlog order.
+		seen := make(map[string]bool)
+		for _, k := range w.backlogged {
+			seen[string(k)] = true
+		}
+
 		for _, ev := range resp.Events {
 			var value []byte
 			switch ev.Type {
@@ -290,10 +299,19 @@
 
 			keyS := string(ev.Kv.Key)
 			prev := w.current[keyS]
-			if !bytes.Equal(prev, value) {
-				w.backlogged = append(w.backlogged, ev.Kv.Key)
-				w.current[keyS] = value
+			// Short-circuit and skip updates with the same content as already present.
+			// These are sometimes emitted by etcd.
+			if bytes.Equal(prev, value) {
+				continue
 			}
+
+			// Only insert to backlog if not yet present, but maintain order.
+			if !seen[string(ev.Kv.Key)] {
+				w.backlogged = append(w.backlogged, ev.Kv.Key)
+				seen[string(ev.Kv.Key)] = true
+			}
+			// Regardless of backlog list, always update the key to its newest value.
+			w.current[keyS] = value
 		}
 
 		// Still nothing in backlog? Keep trying.