m/n/c/curator: maintain consistency between roles and etcd members

When updating the consensus role, both etcd membership and the role need
to be updated. It is possible that the etcd membership change is applied
but the role update fails, resulting in an inconsistency. This change
adds a background process that cleans up this inconsistency by updating
roles to match etcd membership.

This is partially based on previous work by Serge Bazanski, where this
background sync was performed in the opposite direction: etcd membership
is removed if the role is missing. Here, I instead update the role based
on etcd membership. This has the benefit that we finish partially
applied management operations, instead of fighting them.

Co-authored-by: Serge Bazanski <serge@monogon.tech>
Change-Id: I8871b068d1d20c65bcbea5289eafe54676906819
Reviewed-on: https://review.monogon.dev/c/monogon/+/3438
Tested-by: Jenkins CI
Reviewed-by: Serge Bazanski <serge@monogon.tech>
diff --git a/metropolis/node/core/curator/BUILD.bazel b/metropolis/node/core/curator/BUILD.bazel
index fb8336a..eb6dbc8 100644
--- a/metropolis/node/core/curator/BUILD.bazel
+++ b/metropolis/node/core/curator/BUILD.bazel
@@ -9,6 +9,7 @@
         "impl_follower.go",
         "impl_leader.go",
         "impl_leader_aaa.go",
+        "impl_leader_background.go",
         "impl_leader_certificates.go",
         "impl_leader_cluster_networking.go",
         "impl_leader_curator.go",
diff --git a/metropolis/node/core/curator/impl_leader.go b/metropolis/node/core/curator/impl_leader.go
index fe60550..6d5be0f 100644
--- a/metropolis/node/core/curator/impl_leader.go
+++ b/metropolis/node/core/curator/impl_leader.go
@@ -150,6 +150,7 @@
 	leaderCurator
 	leaderAAA
 	leaderManagement
+	leaderBackground
 }
 
 func newCuratorLeader(l *leadership, node *identity.Node) *curatorLeader {
@@ -160,5 +161,6 @@
 		leaderCurator{leadership: l},
 		leaderAAA{leadership: l},
 		leaderManagement{leadership: l, node: node},
+		leaderBackground{leadership: l},
 	}
 }
diff --git a/metropolis/node/core/curator/impl_leader_background.go b/metropolis/node/core/curator/impl_leader_background.go
new file mode 100644
index 0000000..46f78ec
--- /dev/null
+++ b/metropolis/node/core/curator/impl_leader_background.go
@@ -0,0 +1,160 @@
+package curator
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"source.monogon.dev/metropolis/node/core/consensus"
+	"source.monogon.dev/osbase/supervisor"
+)
+
+// leaderBackground holds runnables which perform background processing on the
+// curator leader.
+type leaderBackground struct {
+	*leadership
+}
+
+func (l *leaderBackground) background(ctx context.Context) error {
+	if err := supervisor.Run(ctx, "sync-etcd", l.backgroundSyncEtcd); err != nil {
+		return err
+	}
+
+	supervisor.Signal(ctx, supervisor.SignalHealthy)
+	<-ctx.Done()
+	return ctx.Err()
+}
+
+// backgroundSyncEtcd ensures consistency between the set of nodes with
+// ConsensusMember role, and the set of etcd members.
+//
+// When updating the ConsensusMember role, etcd membership is always added or
+// removed first. Only after etcd accepts the change, the role is updated. If
+// the role update fails, then roles and membership are inconsistent. To resolve
+// the inconsistency, we take etcd membership as the source of truth, and update
+// roles to match membership. That way, partially applied membership changes are
+// made complete.
+//
+// Another way to end up in an inconsistent state is by deleting a node from the
+// cluster which still has etcd membership, either by setting
+// SafetyBypassHasRoles, or if the role has not been synced to match membership
+// yet. In this case, we remove etcd membership.
+func (l *leaderBackground) backgroundSyncEtcd(ctx context.Context) error {
+	supervisor.Signal(ctx, supervisor.SignalHealthy)
+	for {
+		// Process every 5 seconds.
+		select {
+		case <-time.After(5 * time.Second):
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+		err := l.doSyncEtcd(ctx)
+		if err != nil {
+			return err
+		}
+	}
+}
+
+func (l *leaderBackground) doSyncEtcd(ctx context.Context) error {
+	// Take muNodes to prevent concurrent role updates.
+	l.muNodes.Lock()
+	defer l.muNodes.Unlock()
+
+	// Get etcd members.
+	members, err := l.consensusStatus.ClusterClient().MemberList(ctx)
+	if err != nil {
+		return fmt.Errorf("could not get etcd members: %w", err)
+	}
+	isEtcdMember := make(map[string]bool)
+	// etcdMemberIDs is a map from etcd member ID (uint64) to node ID (string).
+	etcdMemberIDs := make(map[uint64]string)
+	for _, member := range members.Members {
+		nodeID := consensus.GetEtcdMemberNodeId(member)
+		isEtcdMember[nodeID] = true
+		etcdMemberIDs[member.ID] = nodeID
+	}
+
+	// Get cluster nodes.
+	res, err := l.txnAsLeader(ctx, NodeEtcdPrefix.Range())
+	if err != nil {
+		return fmt.Errorf("could not get nodes: %w", err)
+	}
+	isClusterNode := make(map[string]bool)
+	var clusterNodes []*Node
+	for _, kv := range res.Responses[0].GetResponseRange().Kvs {
+		node, err := nodeUnmarshal(kv.Value)
+		if err != nil {
+			return fmt.Errorf("could not unmarshal node %q: %w", kv.Key, err)
+		}
+		isClusterNode[node.ID()] = true
+		clusterNodes = append(clusterNodes, node)
+	}
+
+	// Removing ConsensusMember roles is a potentially dangerous operation. If
+	// something goes wrong and we fail to match an etcd member to the
+	// corresponding cluster node, we could end up removing both the etcd
+	// membership and the role. As a safety measure, refuse to perform any changes
+	// if we do not find the local node ID in both etcd and cluster members.
+	// Additionally, we first remove etcd members before removing roles, which
+	// should prevent breaking consensus even if we fail to match cluster nodes to
+	// corresponding etcd members.
+	if !isEtcdMember[l.leaderID] {
+		return fmt.Errorf("did not find local node ID in etcd members; refusing to do anything")
+	}
+	if !isClusterNode[l.leaderID] {
+		return fmt.Errorf("did not find local node ID in cluster nodes; refusing to do anything")
+	}
+
+	// Remove etcd members that don't exist as nodes.
+	//
+	// Note that etcd membership operations are not guarded by curator leadership,
+	// so we cannot assume that we are still leader. If we do not find a node,
+	// this could be either because it has been deleted, or because it has been
+	// created after we retrieved the list of nodes (assuming node IDs are never
+	// reused). The second case would be bad, but it cannot occur because we get
+	// the list of etcd members before the list of nodes.
+	for memberID, nodeID := range etcdMemberIDs {
+		if !isClusterNode[nodeID] {
+			supervisor.Logger(ctx).Infof("Removing etcd member of non-existent node: %x / %s...", memberID, nodeID)
+			_, err := l.consensusStatus.ClusterClient().MemberRemove(ctx, memberID)
+			if err != nil {
+				return fmt.Errorf("failed to remove etcd member: %w", err)
+			}
+			// Perform at most one change per call to doSyncEtcd.
+			return nil
+		}
+	}
+
+	// Update consensus roles to match etcd membership.
+	for _, node := range clusterNodes {
+		nodeID := node.ID()
+		switch {
+		case isEtcdMember[nodeID] && node.consensusMember == nil:
+			supervisor.Logger(ctx).Infof("Adding ConsensusMember role to node which is etcd member: %s...", nodeID)
+			// The node is already etcd member. We only call AddNode to obtain the
+			// join parameters.
+			join, err := l.consensusStatus.AddNode(ctx, node.pubkey)
+			if err != nil {
+				return fmt.Errorf("failed to obtain consensus join parameters: %w", err)
+			}
+			node.EnableConsensusMember(join)
+		case !isEtcdMember[nodeID] && node.consensusMember != nil:
+			if node.kubernetesController != nil {
+				// The KubernetesController role requires the ConsensusMember role,
+				// so we need to remove it first.
+				supervisor.Logger(ctx).Infof("Removing KubernetesController role from node which is not etcd member: %s...", nodeID)
+				node.DisableKubernetesController()
+			}
+			supervisor.Logger(ctx).Infof("Removing ConsensusMember role from node which is not etcd member: %s...", nodeID)
+			node.DisableConsensusMember()
+		default:
+			continue
+		}
+		if err := nodeSave(ctx, l.leadership, node); err != nil {
+			return fmt.Errorf("could not save node with updated roles: %w", err)
+		}
+		return nil
+	}
+
+	return nil
+}
diff --git a/metropolis/node/core/curator/impl_leader_test.go b/metropolis/node/core/curator/impl_leader_test.go
index 917ba66..a6ad457 100644
--- a/metropolis/node/core/curator/impl_leader_test.go
+++ b/metropolis/node/core/curator/impl_leader_test.go
@@ -10,12 +10,14 @@
 	"encoding/hex"
 	"fmt"
 	"io"
+	"maps"
 	"net"
 	"strings"
 	"testing"
 	"time"
 
 	"github.com/google/go-cmp/cmp"
+	clientv3 "go.etcd.io/etcd/client/v3"
 	"go.etcd.io/etcd/tests/v3/integration"
 	"go.uber.org/zap"
 	"google.golang.org/grpc"
@@ -35,6 +37,7 @@
 	cpb "source.monogon.dev/metropolis/proto/common"
 	"source.monogon.dev/osbase/logtree"
 	"source.monogon.dev/osbase/pki"
+	"source.monogon.dev/osbase/supervisor"
 )
 
 // fakeLeader creates a curatorLeader without any underlying leader election, in
@@ -107,6 +110,7 @@
 	if err != nil {
 		t.Fatalf("could not generate node keypair: %v", err)
 	}
+	nodeID := identity.NodeID(nodePub)
 	cNode := NewNodeForBootstrap(&NewNodeData{
 		CUK:      nil,
 		Pubkey:   nodePub,
@@ -172,7 +176,19 @@
 		NodeCredentials: nodeCredentials,
 	}
 
-	consensusService := consensus.TestServiceHandle(t, cluster.Client(0))
+	// NewClusterV3 assigns generated names to etcd members, which are not
+	// configurable, but we need the name to match the node ID. To make this work,
+	// we patch the MemberList call to replace the cluster name.
+	cli := clientv3.NewCtxClient(ctx)
+	memberNameMap := map[string]string{cluster.Members[0].Name: nodeID}
+	cli.Cluster = etcdClusterWrap{cluster.Client(0).Cluster, memberNameMap}
+	cli.KV = cluster.Client(0).KV
+	cli.Lease = cluster.Client(0).Lease
+	cli.Watcher = cluster.Client(0).Watcher
+	cli.Auth = cluster.Client(0).Auth
+	cli.Maintenance = cluster.Client(0).Maintenance
+
+	consensusService := consensus.TestServiceHandle(t, cli)
 	watcher := consensusService.Watch()
 	defer watcher.Close()
 	consensusStatus, err := watcher.Get(ctx, consensus.FilterRunning)
@@ -185,7 +201,7 @@
 	leadership := &leadership{
 		lockKey:         lockKey,
 		lockRev:         lockRev,
-		leaderID:        identity.NodeID(nodePub),
+		leaderID:        nodeID,
 		etcd:            curEtcd,
 		consensus:       consensusService,
 		consensusStatus: consensusStatus,
@@ -305,6 +321,25 @@
 	etcd client.Namespaced
 }
 
+// etcdClusterWrap replaces the cluster member names which clients see.
+type etcdClusterWrap struct {
+	clientv3.Cluster
+	nameMap map[string]string
+}
+
+func (c etcdClusterWrap) MemberList(ctx context.Context) (*clientv3.MemberListResponse, error) {
+	list, err := c.Cluster.MemberList(ctx)
+	if list == nil {
+		return list, err
+	}
+	for _, member := range list.Members {
+		if name, ok := c.nameMap[member.Name]; ok {
+			member.Name = name
+		}
+	}
+	return list, err
+}
+
 // putNode is a helper function that creates a new node within the cluster. The
 // new node will have its Cluster Unlock Key, Public Key and Join Key set. A
 // non-nil mut can further mutate the node before it's saved.
@@ -1889,3 +1924,121 @@
 		})
 	}
 }
+
+// TestBackgroundSyncEtcd tests that backgroundSyncEtcd behaves as expected.
+func TestBackgroundSyncEtcd(t *testing.T) {
+	// Obtain a supervisor context.
+	ctxChannel := make(chan context.Context)
+	supervisor.TestHarness(t, func(ctx context.Context) error {
+		ctxChannel <- ctx
+		supervisor.Signal(ctx, supervisor.SignalHealthy)
+		supervisor.Signal(ctx, supervisor.SignalDone)
+		return nil
+	})
+	ctx := <-ctxChannel
+
+	cl := fakeLeader(t)
+	mgmt := apb.NewManagementClient(cl.mgmtConn)
+	background := leaderBackground{leadership: cl.l}
+
+	assertState := func(expected map[string]ConsensusNodeState) {
+		t.Helper()
+		actual := map[string]ConsensusNodeState{}
+		for _, node := range getNodes(t, ctx, mgmt, "") {
+			if _, ok := actual[node.Id]; ok {
+				t.Fatalf("duplicate node ID")
+			}
+			actual[node.Id] = ConsensusNodeState{
+				node:                 true,
+				consensusMember:      node.Roles.ConsensusMember != nil,
+				kubernetesController: node.Roles.KubernetesController != nil,
+			}
+		}
+		members, err := cl.l.consensusStatus.ClusterClient().MemberList(ctx)
+		if err != nil {
+			t.Fatalf("failed to get etcd members: %v", err)
+		}
+		for _, member := range members.Members {
+			nodeID := consensus.GetEtcdMemberNodeId(member)
+			nodeState := actual[nodeID]
+			if nodeState.etcdMember {
+				t.Fatalf("duplicate etcd member")
+			}
+			nodeState.etcdMember = true
+			actual[nodeID] = nodeState
+		}
+
+		if !maps.Equal(expected, actual) {
+			t.Fatalf("Expected node state %v, got %v", expected, actual)
+		}
+	}
+
+	assertState(map[string]ConsensusNodeState{
+		cl.l.leaderID: {node: true, etcdMember: true},
+	})
+
+	// This should add the missing ConsensusMember role.
+	err := background.doSyncEtcd(ctx)
+	if err != nil {
+		t.Fatalf("doSyncEtcd: %v", err)
+	}
+	assertState(map[string]ConsensusNodeState{
+		cl.l.leaderID: {node: true, consensusMember: true, etcdMember: true},
+	})
+
+	// Add a cluster node with ConsensusMember role, and an etcd member with a
+	// different name.
+	newNode := putNode(t, ctx, cl.l, func(n *Node) {
+		join, err := cl.l.consensusStatus.AddNode(ctx, n.pubkey)
+		if err != nil {
+			t.Fatalf("failed to obtain consensus join parameters: %v", err)
+		}
+		n.EnableConsensusMember(join)
+		n.EnableKubernetesController()
+	})
+	if _, err := cl.l.consensusStatus.ClusterClient().MemberAddAsLearner(ctx, []string{"https://foo:1234"}); err != nil {
+		t.Fatalf("could not add new member as learner: %v", err)
+	}
+	assertState(map[string]ConsensusNodeState{
+		cl.l.leaderID: {node: true, consensusMember: true, etcdMember: true},
+		newNode.ID():  {node: true, consensusMember: true, kubernetesController: true},
+		"foo":         {etcdMember: true},
+	})
+
+	// This should remove the extra etcd member, but not change roles yet.
+	err = background.doSyncEtcd(ctx)
+	if err != nil {
+		t.Fatalf("doSyncEtcd: %v", err)
+	}
+	assertState(map[string]ConsensusNodeState{
+		cl.l.leaderID: {node: true, consensusMember: true, etcdMember: true},
+		newNode.ID():  {node: true, consensusMember: true, kubernetesController: true},
+	})
+
+	// This should remove the extra roles.
+	err = background.doSyncEtcd(ctx)
+	if err != nil {
+		t.Fatalf("doSyncEtcd: %v", err)
+	}
+	assertState(map[string]ConsensusNodeState{
+		cl.l.leaderID: {node: true, consensusMember: true, etcdMember: true},
+		newNode.ID():  {node: true},
+	})
+
+	// This should change nothing.
+	err = background.doSyncEtcd(ctx)
+	if err != nil {
+		t.Fatalf("doSyncEtcd: %v", err)
+	}
+	assertState(map[string]ConsensusNodeState{
+		cl.l.leaderID: {node: true, consensusMember: true, etcdMember: true},
+		newNode.ID():  {node: true},
+	})
+}
+
+type ConsensusNodeState struct {
+	node                 bool
+	consensusMember      bool
+	kubernetesController bool
+	etcdMember           bool
+}
diff --git a/metropolis/node/core/curator/listener.go b/metropolis/node/core/curator/listener.go
index d099583..40d1f19 100644
--- a/metropolis/node/core/curator/listener.go
+++ b/metropolis/node/core/curator/listener.go
@@ -112,6 +112,10 @@
 		cpb.RegisterCuratorLocalServer(srv, leader)
 		apb.RegisterAAAServer(srv, leader)
 		apb.RegisterManagementServer(srv, leader)
+
+		if err := supervisor.Run(ctx, "background", leader.background); err != nil {
+			return fmt.Errorf("could not run leader background processor: %w", err)
+		}
 	case st.follower != nil:
 		supervisor.Logger(ctx).Infof("This curator is a follower (leader is %q), starting minimal implementation.", st.follower.lock.NodeId)