m/test/e2e: Implement TestE2ECoreHA

This is a basic test which exercises a TPM-sealed three-node cluster by
performing a rolling restart of control plane nodes.

Change-Id: Ic6f46192d8ccba1ef7a767988cf5a216beb5a4c6
Reviewed-on: https://review.monogon.dev/c/monogon/+/2884
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/test/e2e/main_test.go b/metropolis/test/e2e/main_test.go
index 19df697..7004f7b 100644
--- a/metropolis/test/e2e/main_test.go
+++ b/metropolis/test/e2e/main_test.go
@@ -191,7 +191,72 @@
 	})
 }
 
-// TestE2ECore exercisees the Kubernetes functionality of Metropolis.
+// TestE2ECoreHA exercises the basics of a high-availability control plane by
+// starting up a 3-node cluster, turning all nodes into ConsensusMembers, then
+// performing a rolling restart.
+func TestE2ECoreHA(t *testing.T) {
+	// Set a global timeout to make sure this terminates.
+	ctx, cancel := context.WithTimeout(context.Background(), globalTestTimeout)
+	defer cancel()
+
+	rPath, err := runfiles.Rlocation("_main/metropolis/test/e2e/testimages_manifest.prototxt")
+	if err != nil {
+		t.Fatalf("Resolving registry manifest failed: %v", err)
+	}
+	df, err := os.ReadFile(rPath)
+	if err != nil {
+		t.Fatalf("Reading registry manifest failed: %v", err)
+	}
+	lr, err := localregistry.FromBazelManifest(df)
+	if err != nil {
+		t.Fatalf("Creating test image registry failed: %v", err)
+	}
+	// Launch cluster.
+	clusterOptions := cluster.ClusterOptions{
+		NumNodes:        3,
+		LocalRegistry:   lr,
+		NodeLogsToFiles: true,
+	}
+	cluster, err := cluster.LaunchCluster(ctx, clusterOptions)
+	if err != nil {
+		t.Fatalf("LaunchCluster failed: %v", err)
+	}
+	defer func() {
+		err := cluster.Close()
+		if err != nil {
+			t.Fatalf("cluster Close failed: %v", err)
+		}
+	}()
+
+	launch.Log("E2E: Cluster running, starting tests...")
+
+	util.MustTestEventual(t, "Add ConsensusMember roles", ctx, smallTestTimeout, func(ctx context.Context) error {
+		// Make everything but the first node into ConsensusMember.
+		for i := 1; i < clusterOptions.NumNodes; i++ {
+			err := cluster.MakeConsensusMember(ctx, cluster.NodeIDs[i])
+			if err != nil {
+				return fmt.Errorf("MakeConsensusMember(%d/%s): %w", i, cluster.NodeIDs[i], err)
+			}
+		}
+		return nil
+	})
+	util.TestEventual(t, "Heartbeat test successful", ctx, 20*time.Second, cluster.AllNodesHealthy)
+
+	// Perform rolling restart of all nodes. When a node rejoins it must be able to
+	// contact the cluster, so this also exercises that the cluster is serving even
+	// with the node having rebooted.
+	for i := 0; i < clusterOptions.NumNodes; i++ {
+		util.MustTestEventual(t, fmt.Sprintf("Node %d rejoin successful", i), ctx, 60*time.Second, func(ctx context.Context) error {
+			// Reboot node i and expect it to rejoin the cluster afterwards.
+			if err := cluster.RebootNode(ctx, i); err != nil {
+				return fmt.Errorf("while rebooting a node: %w", err)
+			}
+			return nil
+		})
+	}
+}
+
+// TestE2EKubernetes exercises the Kubernetes functionality of Metropolis.
 //
 // The tests are performed against an in-memory cluster.
 func TestE2EKubernetes(t *testing.T) {
diff --git a/metropolis/test/launch/cluster/BUILD.bazel b/metropolis/test/launch/cluster/BUILD.bazel
index 5d48f70..4b935f0 100644
--- a/metropolis/test/launch/cluster/BUILD.bazel
+++ b/metropolis/test/launch/cluster/BUILD.bazel
@@ -21,6 +21,7 @@
     deps = [
         "//metropolis/cli/metroctl/core",
         "//metropolis/node",
+        "//metropolis/node/core/curator/proto/api",
         "//metropolis/node/core/identity",
         "//metropolis/node/core/rpc",
         "//metropolis/node/core/rpc/resolver",
diff --git a/metropolis/test/launch/cluster/cluster.go b/metropolis/test/launch/cluster/cluster.go
index b1bb072..49c60dc 100644
--- a/metropolis/test/launch/cluster/cluster.go
+++ b/metropolis/test/launch/cluster/cluster.go
@@ -37,6 +37,7 @@
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
 
+	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
 	apb "source.monogon.dev/metropolis/proto/api"
 	cpb "source.monogon.dev/metropolis/proto/common"
 
@@ -1310,3 +1311,60 @@
 	})
 	return err
 }
+
+// MakeConsensusMember adds the ConsensusMember role to a node by ID.
+func (c *Cluster) MakeConsensusMember(ctx context.Context, id string) error {
+	curC, err := c.CuratorClient()
+	if err != nil {
+		return err
+	}
+	mgmt := apb.NewManagementClient(curC)
+	cur := ipb.NewCuratorClient(curC)
+
+	tr := true
+	launch.Log("Cluster: %s: adding ConsensusMember", id)
+	bo := backoff.NewExponentialBackOff()
+	bo.MaxElapsedTime = 10 * time.Second
+
+	err = backoff.Retry(func() error {
+		_, err := mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
+			Node: &apb.UpdateNodeRolesRequest_Id{
+				Id: id,
+			},
+			ConsensusMember: &tr,
+		})
+		if err != nil {
+			launch.Log("Cluster: %s: UpdateNodeRoles failed: %v", id, err)
+		}
+		return err
+	}, backoff.WithContext(bo, ctx))
+	if err != nil {
+		return err
+	}
+
+	launch.Log("Cluster: %s: waiting for learner/full members...", id)
+
+	learner := false
+	for {
+		res, err := cur.GetConsensusStatus(ctx, &ipb.GetConsensusStatusRequest{})
+		if err != nil {
+			return fmt.Errorf("GetConsensusStatus: %w", err)
+		}
+		for _, member := range res.EtcdMember {
+			if member.Id != id {
+				continue
+			}
+			switch member.Status {
+			case ipb.GetConsensusStatusResponse_EtcdMember_STATUS_LEARNER:
+				if !learner {
+					learner = true
+					launch.Log("Cluster: %s: became a learner, waiting for full member...", id)
+				}
+			case ipb.GetConsensusStatusResponse_EtcdMember_STATUS_FULL:
+				launch.Log("Cluster: %s: became a full member", id)
+				return nil
+			}
+		}
+		time.Sleep(100 * time.Millisecond)
+	}
+}