m/test/e2e: deflake

Now that nodes don't heartbeat before they have critical ESP data
persisted, we can simply make sure they heartbeat before any disruptive
reboot. That removes our biggest source of flakiness in E2E tests.
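
A condensed sketch of the resulting test sequence in main_test.go
(setup and error wrapping elided; names as in the diff below):

    // Make sure every node has heartbeated before we disrupt the cluster.
    util.TestEventual(t, "Heartbeat test successful", ctx, 20*time.Second, cluster.AllNodesHealthy)
    // Only then reboot a node and expect it to rejoin.
    util.TestEventual(t, "Node rejoin successful", ctx, 60*time.Second, func(ctx context.Context) error {
    	return cluster.RebootNode(ctx, 1)
    })
    // Finally, wait for all nodes to report healthy again.
    util.TestEventual(t, "Heartbeat test successful", ctx, 20*time.Second, cluster.AllNodesHealthy)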

Change-Id: I9d4483015341157af6b27c8bd98f5df64da229d2
Reviewed-on: https://review.monogon.dev/c/monogon/+/1499
Tested-by: Jenkins CI
Reviewed-by: Leopold Schabel <leo@monogon.tech>
diff --git a/metropolis/test/e2e/BUILD.bazel b/metropolis/test/e2e/BUILD.bazel
index 8853554..3db8a33 100644
--- a/metropolis/test/e2e/BUILD.bazel
+++ b/metropolis/test/e2e/BUILD.bazel
@@ -27,8 +27,6 @@
         "//third_party/edk2:firmware",
     ],
     embed = [":e2e"],
-    # TODO: https://github.com/monogon-dev/monogon/issues/170
-    flaky = True,
     rundir = ".",
     deps = [
         "//metropolis/node",
diff --git a/metropolis/test/e2e/main_test.go b/metropolis/test/e2e/main_test.go
index b57f3d3..7a72db2 100644
--- a/metropolis/test/e2e/main_test.go
+++ b/metropolis/test/e2e/main_test.go
@@ -20,7 +20,6 @@
 	"context"
 	"errors"
 	"fmt"
-	"io"
 	"net"
 	"net/http"
 	_ "net/http"
@@ -144,6 +143,7 @@
 				}
 				return nil
 			})
+			util.TestEventual(t, "Heartbeat test successful", ctx, 20*time.Second, cluster.AllNodesHealthy)
 			util.TestEventual(t, "Node rejoin successful", ctx, 60*time.Second, func(ctx context.Context) error {
 				// Ensure nodes rejoin the cluster after a reboot by rebooting the first node.
 				if err := cluster.RebootNode(ctx, 1); err != nil {
@@ -151,40 +151,7 @@
 				}
 				return nil
 			})
-			util.TestEventual(t, "Heartbeat test successful", ctx, 60*time.Second, func(ctx context.Context) error {
-				// Ensure all cluster nodes are capable of sending heartbeat updates.
-				// This test assumes the expected count of nodes is already present in
-				// the cluster.
-				for {
-					srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
-					if err != nil {
-						return fmt.Errorf("GetNodes: %w", err)
-					}
-
-					// Count the unhealthy nodes.
-					var unhealthy int
-					for {
-						node, err := srvN.Recv()
-						if err == io.EOF {
-							break
-						}
-						if err != nil {
-							return fmt.Errorf("GetNodes.Recv: %w", err)
-						}
-
-						if node.Health != apb.Node_HEALTHY {
-							unhealthy++
-						}
-					}
-
-					// If all nodes tested in this iteration are healthy, the test has
-					// been passed.
-					if unhealthy == 0 {
-						break
-					}
-				}
-				return nil
-			})
+			util.TestEventual(t, "Heartbeat test successful", ctx, 20*time.Second, cluster.AllNodesHealthy)
 		})
 		t.Run("Kubernetes", func(t *testing.T) {
 			t.Parallel()
diff --git a/metropolis/test/launch/cluster/cluster.go b/metropolis/test/launch/cluster/cluster.go
index c433780..0efd08b 100644
--- a/metropolis/test/launch/cluster/cluster.go
+++ b/metropolis/test/launch/cluster/cluster.go
@@ -18,6 +18,7 @@
 	"os/exec"
 	"path"
 	"path/filepath"
+	"strings"
 	"syscall"
 	"time"
 
@@ -1099,3 +1100,30 @@
 	}
 	return res, nil
 }
+
+// AllNodesHealthy returns nil if every node in the cluster currently
+// reports as HEALTHY, and an error listing the unhealthy node IDs otherwise.
+func (c *Cluster) AllNodesHealthy(ctx context.Context) error {
+	// Get an authenticated owner client within the cluster.
+	curC, err := c.CuratorClient()
+	if err != nil {
+		return err
+	}
+	mgmt := apb.NewManagementClient(curC)
+	nodes, err := getNodes(ctx, mgmt)
+	if err != nil {
+		return err
+	}
+
+	var unhealthy []string
+	for _, node := range nodes {
+		if node.Health == apb.Node_HEALTHY {
+			continue
+		}
+		unhealthy = append(unhealthy, node.Id)
+	}
+	if len(unhealthy) == 0 {
+		return nil
+	}
+	return fmt.Errorf("nodes unhealthy: %s", strings.Join(unhealthy, ", "))
+}
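
Returning the joined list of unhealthy node IDs means every failed attempt
logged by util.TestEventual names the nodes that are still lagging. Outside
of TestEventual the helper can also be polled directly; a minimal sketch,
where waitAllHealthy is a hypothetical wrapper (not part of this change,
imports elided):

    // waitAllHealthy polls AllNodesHealthy once per second until it
    // succeeds or ctx expires.
    func waitAllHealthy(ctx context.Context, c *cluster.Cluster) error {
    	for {
    		if err := c.AllNodesHealthy(ctx); err == nil {
    			return nil
    		}
    		select {
    		case <-ctx.Done():
    			return ctx.Err()
    		case <-time.After(time.Second):
    		}
    	}
    }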