m/test/e2e: Implement TestE2ECoreHA
This is a basic test which exercises a TPM-sealed three-node cluster by
performing a rolling restart of control plane nodes.
Change-Id: Ic6f46192d8ccba1ef7a767988cf5a216beb5a4c6
Reviewed-on: https://review.monogon.dev/c/monogon/+/2884
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/test/e2e/main_test.go b/metropolis/test/e2e/main_test.go
index 19df697..7004f7b 100644
--- a/metropolis/test/e2e/main_test.go
+++ b/metropolis/test/e2e/main_test.go
@@ -191,7 +191,72 @@
})
}
-// TestE2ECore exercisees the Kubernetes functionality of Metropolis.
+// TestE2ECoreHA exercises the basics of a high-availability control plane by
+// starting up a 3-node cluster, turning all nodes into ConsensusMembers, then
+// performing a rolling restart.
+func TestE2ECoreHA(t *testing.T) {
+ // Set a global timeout to make sure this terminates
+ ctx, cancel := context.WithTimeout(context.Background(), globalTestTimeout)
+ defer cancel()
+
+ rPath, err := runfiles.Rlocation("_main/metropolis/test/e2e/testimages_manifest.prototxt")
+ if err != nil {
+ t.Fatalf("Resolving registry manifest failed: %v", err)
+ }
+ df, err := os.ReadFile(rPath)
+ if err != nil {
+ t.Fatalf("Reading registry manifest failed: %v", err)
+ }
+ lr, err := localregistry.FromBazelManifest(df)
+ if err != nil {
+ t.Fatalf("Creating test image registry failed: %v", err)
+ }
+ // Launch cluster.
+ clusterOptions := cluster.ClusterOptions{
+ NumNodes: 3,
+ LocalRegistry: lr,
+ NodeLogsToFiles: true,
+ }
+ cluster, err := cluster.LaunchCluster(ctx, clusterOptions)
+ if err != nil {
+ t.Fatalf("LaunchCluster failed: %v", err)
+ }
+ defer func() {
+ err := cluster.Close()
+ if err != nil {
+ t.Fatalf("cluster Close failed: %v", err)
+ }
+ }()
+
+ launch.Log("E2E: Cluster running, starting tests...")
+
+ util.MustTestEventual(t, "Add ConsensusMember roles", ctx, smallTestTimeout, func(ctx context.Context) error {
+ // Make everything but the first node into ConsensusMember.
+ for i := 1; i < clusterOptions.NumNodes; i++ {
+ err := cluster.MakeConsensusMember(ctx, cluster.NodeIDs[i])
+ if err != nil {
+ return fmt.Errorf("MakeConsensusMember(%d/%s): %w", i, cluster.NodeIDs[i], err)
+ }
+ }
+ return nil
+ })
+ util.TestEventual(t, "Heartbeat test successful", ctx, 20*time.Second, cluster.AllNodesHealthy)
+
+ // Perform rolling restart of all nodes. When a node rejoins it must be able to
+ // contact the cluster, so this also exercises that the cluster is serving even
+ // with the node having rebooted.
+ for i := 0; i < clusterOptions.NumNodes; i++ {
+ util.MustTestEventual(t, fmt.Sprintf("Node %d rejoin successful", i), ctx, 60*time.Second, func(ctx context.Context) error {
+ // Ensure nodes rejoin the cluster after a reboot by reboting the 1st node.
+ if err := cluster.RebootNode(ctx, i); err != nil {
+ return fmt.Errorf("while rebooting a node: %w", err)
+ }
+ return nil
+ })
+ }
+}
+
+// TestE2EKubernetes exercises the Kubernetes functionality of Metropolis.
//
// The tests are performed against an in-memory cluster.
func TestE2EKubernetes(t *testing.T) {
diff --git a/metropolis/test/launch/cluster/BUILD.bazel b/metropolis/test/launch/cluster/BUILD.bazel
index 5d48f70..4b935f0 100644
--- a/metropolis/test/launch/cluster/BUILD.bazel
+++ b/metropolis/test/launch/cluster/BUILD.bazel
@@ -21,6 +21,7 @@
deps = [
"//metropolis/cli/metroctl/core",
"//metropolis/node",
+ "//metropolis/node/core/curator/proto/api",
"//metropolis/node/core/identity",
"//metropolis/node/core/rpc",
"//metropolis/node/core/rpc/resolver",
diff --git a/metropolis/test/launch/cluster/cluster.go b/metropolis/test/launch/cluster/cluster.go
index b1bb072..49c60dc 100644
--- a/metropolis/test/launch/cluster/cluster.go
+++ b/metropolis/test/launch/cluster/cluster.go
@@ -37,6 +37,7 @@
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
+ ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
apb "source.monogon.dev/metropolis/proto/api"
cpb "source.monogon.dev/metropolis/proto/common"
@@ -1310,3 +1311,60 @@
})
return err
}
+
+// MakeConsensusMember adds the ConsensusMember role to a node by ID.
+func (c *Cluster) MakeConsensusMember(ctx context.Context, id string) error {
+ curC, err := c.CuratorClient()
+ if err != nil {
+ return err
+ }
+ mgmt := apb.NewManagementClient(curC)
+ cur := ipb.NewCuratorClient(curC)
+
+ tr := true
+ launch.Log("Cluster: %s: adding ConsensusMember", id)
+ bo := backoff.NewExponentialBackOff()
+ bo.MaxElapsedTime = 10 * time.Second
+
+ backoff.Retry(func() error {
+ _, err = mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
+ Node: &apb.UpdateNodeRolesRequest_Id{
+ Id: id,
+ },
+ ConsensusMember: &tr,
+ })
+ if err != nil {
+ launch.Log("Cluster: %s: UpdateNodeRoles failed: %v", id, err)
+ }
+ return err
+ }, backoff.WithContext(bo, ctx))
+ if err != nil {
+ return err
+ }
+
+ launch.Log("Cluster: %s: waiting for learner/full members...", id)
+
+ learner := false
+ for {
+ res, err := cur.GetConsensusStatus(ctx, &ipb.GetConsensusStatusRequest{})
+ if err != nil {
+ return fmt.Errorf("GetConsensusStatus: %w", err)
+ }
+ for _, member := range res.EtcdMember {
+ if member.Id != id {
+ continue
+ }
+ switch member.Status {
+ case ipb.GetConsensusStatusResponse_EtcdMember_STATUS_LEARNER:
+ if !learner {
+ learner = true
+ launch.Log("Cluster: %s: became a learner, waiting for full member...", id)
+ }
+ case ipb.GetConsensusStatusResponse_EtcdMember_STATUS_FULL:
+ launch.Log("Cluster: %s: became a full member", id)
+ return nil
+ }
+ }
+ time.Sleep(100 * time.Millisecond)
+ }
+}