m/n/c/curator: refactor consensus status access
We already obtain the consensus status when starting the curator, and it
seems strange to do it again in one RPC handler.
Change-Id: I3dd9d93b16180011392f8b64c94b0267ec30815f
Reviewed-on: https://review.monogon.dev/c/monogon/+/3383
Tested-by: Jenkins CI
Reviewed-by: Serge Bazanski <serge@monogon.tech>
diff --git a/metropolis/node/core/curator/curator.go b/metropolis/node/core/curator/curator.go
index 665fea1..237a50c 100644
--- a/metropolis/node/core/curator/curator.go
+++ b/metropolis/node/core/curator/curator.go
@@ -348,11 +348,11 @@
// the Curator API to consumers, dispatching to either a locally running leader,
// or forwarding to a remotely running leader.
lis := listener{
- node: s.config.NodeCredentials,
- etcd: etcd,
- etcdCluster: st.ClusterClient(),
- consensus: s.config.Consensus,
- status: &s.status,
+ node: s.config.NodeCredentials,
+ etcd: etcd,
+ consensusStatus: st,
+ consensus: s.config.Consensus,
+ status: &s.status,
}
if err := supervisor.Run(ctx, "listener", lis.run); err != nil {
return fmt.Errorf("when starting listener: %w", err)
diff --git a/metropolis/node/core/curator/impl_leader.go b/metropolis/node/core/curator/impl_leader.go
index ad557fc..fe60550 100644
--- a/metropolis/node/core/curator/impl_leader.go
+++ b/metropolis/node/core/curator/impl_leader.go
@@ -50,8 +50,7 @@
leaderID string
// etcd is the etcd client in which curator data and leader election state is
// stored.
- etcd client.Namespaced
- etcdCluster clientv3.Cluster
+ etcd client.Namespaced
// muNodes guards any changes to nodes, and prevents race conditions where the
// curator performs a read-modify-write operation to node data. The curator's
@@ -62,7 +61,8 @@
// additionally guarded using etcd transactions.
muNodes sync.Mutex
- consensus consensus.ServiceHandle
+ consensusStatus *consensus.Status
+ consensus consensus.ServiceHandle
// muRegisterTicket guards changes to the register ticket. Its usage semantics
// are the same as for muNodes, as described above.
diff --git a/metropolis/node/core/curator/impl_leader_curator.go b/metropolis/node/core/curator/impl_leader_curator.go
index 126ba49..cd6fed2 100644
--- a/metropolis/node/core/curator/impl_leader_curator.go
+++ b/metropolis/node/core/curator/impl_leader_curator.go
@@ -609,7 +609,7 @@
func (l *leaderCurator) GetConsensusStatus(ctx context.Context, _ *ipb.GetConsensusStatusRequest) (*ipb.GetConsensusStatusResponse, error) {
var res ipb.GetConsensusStatusResponse
- members, err := l.etcdCluster.MemberList(ctx)
+ members, err := l.consensusStatus.ClusterClient().MemberList(ctx)
if err != nil {
rpc.Trace(ctx).Printf("Could not get etcd members: %v", err)
return nil, status.Errorf(codes.Internal, "could not get etcd members")
diff --git a/metropolis/node/core/curator/impl_leader_management.go b/metropolis/node/core/curator/impl_leader_management.go
index c0bab97..6315c14 100644
--- a/metropolis/node/core/curator/impl_leader_management.go
+++ b/metropolis/node/core/curator/impl_leader_management.go
@@ -13,7 +13,6 @@
dpb "google.golang.org/protobuf/types/known/durationpb"
common "source.monogon.dev/metropolis/node"
- "source.monogon.dev/metropolis/node/core/consensus"
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/rpc"
apb "source.monogon.dev/metropolis/proto/api"
@@ -344,15 +343,7 @@
if req.ConsensusMember != nil {
if *req.ConsensusMember {
// Add a new etcd learner node.
- w := l.consensus.Watch()
- defer w.Close()
-
- st, err := w.Get(ctx, consensus.FilterRunning)
- if err != nil {
- return nil, status.Errorf(codes.Unavailable, "could not get running consensus: %v", err)
- }
-
- join, err := st.AddNode(ctx, node.pubkey)
+ join, err := l.consensusStatus.AddNode(ctx, node.pubkey)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "could not add node: %v", err)
}
diff --git a/metropolis/node/core/curator/impl_leader_test.go b/metropolis/node/core/curator/impl_leader_test.go
index a5d1ea1..917ba66 100644
--- a/metropolis/node/core/curator/impl_leader_test.go
+++ b/metropolis/node/core/curator/impl_leader_test.go
@@ -172,14 +172,23 @@
NodeCredentials: nodeCredentials,
}
+ consensusService := consensus.TestServiceHandle(t, cluster.Client(0))
+ watcher := consensusService.Watch()
+ defer watcher.Close()
+ consensusStatus, err := watcher.Get(ctx, consensus.FilterRunning)
+ if err != nil {
+ t.Fatalf("could not get consensus status: %v", err)
+ }
+
// Build a curator leader object. This implements methods that will be
// exercised by tests.
leadership := &leadership{
- lockKey: lockKey,
- lockRev: lockRev,
- leaderID: identity.NodeID(nodePub),
- etcd: curEtcd,
- consensus: consensus.TestServiceHandle(t, cluster.Client(0)),
+ lockKey: lockKey,
+ lockRev: lockRev,
+ leaderID: identity.NodeID(nodePub),
+ etcd: curEtcd,
+ consensus: consensusService,
+ consensusStatus: consensusStatus,
}
leader := newCuratorLeader(leadership, &nodeCredentials.Node)
diff --git a/metropolis/node/core/curator/listener.go b/metropolis/node/core/curator/listener.go
index ede28bb..d099583 100644
--- a/metropolis/node/core/curator/listener.go
+++ b/metropolis/node/core/curator/listener.go
@@ -6,7 +6,6 @@
"net"
"time"
- clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
@@ -42,8 +41,8 @@
node *identity.NodeCredentials
// etcd is a client to the locally running consensus (etcd) server which is used
// both for storing lock/leader election status and actual Curator data.
- etcd client.Namespaced
- etcdCluster clientv3.Cluster
+ etcd client.Namespaced
+ consensusStatus *consensus.Status
consensus consensus.ServiceHandle
status *memory.Value[*electionStatus]
@@ -101,12 +100,12 @@
// Create a leader instance and serve it over gRPC.
leader := newCuratorLeader(&leadership{
- lockKey: st.leader.lockKey,
- lockRev: st.leader.lockRev,
- leaderID: l.node.ID(),
- etcd: l.etcd,
- etcdCluster: l.etcdCluster,
- consensus: l.consensus,
+ lockKey: st.leader.lockKey,
+ lockRev: st.leader.lockRev,
+ leaderID: l.node.ID(),
+ etcd: l.etcd,
+ consensusStatus: l.consensusStatus,
+ consensus: l.consensus,
}, &l.node.Node)
cpb.RegisterCuratorServer(srv, leader)