metropolis/curator: report leader based on ledership election status

This makes GetCurrentLeader calls respond to cluster/leader changes
instead of relying on connections getting closed on time when a leader
changes, or when a node gets elected as a leader.

This leads to less churn when cluster leadership status changes (no need
for every RPC client to re-establish connectivity to get the new
leader). It should also be faster to respond to cluster leader changes,
as it doesn't rely on the the RPC client detecting that the node it
connected to has stopped responding / disconnected.

Change-Id: I9de12286530226b3832d2ae07cb7d943ca537d3f
Reviewed-on: https://review.monogon.dev/c/monogon/+/2069
Tested-by: Jenkins CI
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/node/core/curator/curator.go b/metropolis/node/core/curator/curator.go
index 758da27..bfaa90d 100644
--- a/metropolis/node/core/curator/curator.go
+++ b/metropolis/node/core/curator/curator.go
@@ -325,10 +325,10 @@
 	// the Curator API to consumers, dispatching to either a locally running leader,
 	// or forwarding to a remotely running leader.
 	lis := listener{
-		node:          s.config.NodeCredentials,
-		electionWatch: s.status.Watch,
-		consensus:     s.config.Consensus,
-		etcd:          etcd,
+		node:      s.config.NodeCredentials,
+		etcd:      etcd,
+		consensus: s.config.Consensus,
+		status:    &s.status,
 	}
 	if err := supervisor.Run(ctx, "listener", lis.run); err != nil {
 		return fmt.Errorf("when starting listener: %w", err)
diff --git a/metropolis/node/core/curator/impl_follower.go b/metropolis/node/core/curator/impl_follower.go
index c9cc06c..9d1c2be 100644
--- a/metropolis/node/core/curator/impl_follower.go
+++ b/metropolis/node/core/curator/impl_follower.go
@@ -7,61 +7,70 @@
 	common "source.monogon.dev/metropolis/node"
 	"source.monogon.dev/metropolis/node/core/consensus/client"
 	cpb "source.monogon.dev/metropolis/node/core/curator/proto/api"
-	ppb "source.monogon.dev/metropolis/node/core/curator/proto/private"
 	"source.monogon.dev/metropolis/node/core/rpc"
+	"source.monogon.dev/metropolis/pkg/event/memory"
 )
 
 type curatorFollower struct {
-	lock       *ppb.LeaderElectionValue
 	etcd       client.Namespaced
 	followerID string
+
+	status *memory.Value[*electionStatus]
 }
 
 func (f *curatorFollower) GetCurrentLeader(_ *cpb.GetCurrentLeaderRequest, srv cpb.CuratorLocal_GetCurrentLeaderServer) error {
 	ctx := srv.Context()
-	if f.lock == nil {
-		return status.Errorf(codes.Unavailable, "could not determine current leader")
-	}
-	nodeId := f.lock.NodeId
 
-	// Manually load node status data from etcd, even though we are not a leader.
-	// This is fine, as if we ever end up serving stale data, the client will
-	// realize and call us again.
-	key, err := nodeEtcdPrefix.Key(nodeId)
-	if err != nil {
-		rpc.Trace(ctx).Printf("invalid leader node id %q: %v", nodeId, err)
-		return status.Errorf(codes.Internal, "current leader has invalid node id")
-	}
-	res, err := f.etcd.Get(ctx, key)
-	if err != nil {
-		rpc.Trace(ctx).Printf("Get(%q) failed: %v", key, err)
-		return status.Errorf(codes.Unavailable, "could not retrieve leader node from etcd")
-	}
-	if len(res.Kvs) != 1 {
-		rpc.Trace(ctx).Printf("Get(%q) returned %d nodes", key, len(res.Kvs))
-		return status.Errorf(codes.Internal, "current leader not found in etcd")
-	}
-	node, err := nodeUnmarshal(res.Kvs[0].Value)
-	if err != nil {
-		rpc.Trace(ctx).Printf("could not unmarshal leader node %s: %v", nodeId, err)
-		return status.Errorf(codes.Unavailable, "could not unmarshal leader node")
-	}
-	if node.status == nil || node.status.ExternalAddress == "" {
-		rpc.Trace(ctx).Printf("leader node %s has no address in status", nodeId)
-		return status.Errorf(codes.Unavailable, "current leader has no reported address")
-	}
+	w := f.status.Watch()
+	defer w.Close()
 
-	err = srv.Send(&cpb.GetCurrentLeaderResponse{
-		LeaderNodeId: nodeId,
-		LeaderHost:   node.status.ExternalAddress,
-		LeaderPort:   int32(common.CuratorServicePort),
-		ThisNodeId:   f.followerID,
-	})
-	if err != nil {
-		return err
-	}
+	for {
+		st, err := w.Get(srv.Context())
+		if err != nil {
+			return err
+		}
 
-	<-ctx.Done()
-	rpc.Trace(ctx).Printf("Interrupting due to context cancellation")
-	return nil
+		if st.leader != nil {
+			return status.Errorf(codes.Unavailable, "node became leader, try again")
+		}
+
+		lock := st.follower.lock
+		// Manually load node status data from etcd, even though we are not a leader.
+		// This is fine, as if we ever end up serving stale data, the client will
+		// realize and call us again.
+		key, err := nodeEtcdPrefix.Key(lock.NodeId)
+		if err != nil {
+			rpc.Trace(ctx).Printf("invalid leader node id %q: %v", lock.NodeId, err)
+			return status.Errorf(codes.Internal, "current leader has invalid node id")
+		}
+		res, err := f.etcd.Get(ctx, key)
+		if err != nil {
+			rpc.Trace(ctx).Printf("could not get current leader's data: %v", err)
+			return status.Errorf(codes.Internal, "could not get current leader's data")
+		}
+		if len(res.Kvs) < 1 {
+			rpc.Trace(ctx).Printf("could not get current leader's data: 0 kvs")
+			return status.Errorf(codes.Internal, "could not get current leader's data")
+		}
+		node, err := nodeUnmarshal(res.Kvs[0].Value)
+		if err != nil {
+			rpc.Trace(ctx).Printf("could not unmarshal leader node %s: %v", lock.NodeId, err)
+			return status.Errorf(codes.Unavailable, "could not unmarshal leader node")
+		}
+		if node.status == nil || node.status.ExternalAddress == "" {
+			rpc.Trace(ctx).Printf("leader node %s has no address in status", lock.NodeId)
+			return status.Errorf(codes.Unavailable, "current leader has no reported address")
+		}
+
+		rpc.Trace(ctx).Printf("Sending leader: %s at %s", lock.NodeId, node.status.ExternalAddress)
+		err = srv.Send(&cpb.GetCurrentLeaderResponse{
+			LeaderNodeId: lock.NodeId,
+			LeaderHost:   node.status.ExternalAddress,
+			LeaderPort:   int32(common.CuratorServicePort),
+			ThisNodeId:   f.followerID,
+		})
+		if err != nil {
+			return err
+		}
+	}
 }
diff --git a/metropolis/node/core/curator/listener.go b/metropolis/node/core/curator/listener.go
index 2290ab9..77df684 100644
--- a/metropolis/node/core/curator/listener.go
+++ b/metropolis/node/core/curator/listener.go
@@ -15,7 +15,7 @@
 	cpb "source.monogon.dev/metropolis/node/core/curator/proto/api"
 	"source.monogon.dev/metropolis/node/core/identity"
 	"source.monogon.dev/metropolis/node/core/rpc"
-	"source.monogon.dev/metropolis/pkg/event"
+	"source.monogon.dev/metropolis/pkg/event/memory"
 	"source.monogon.dev/metropolis/pkg/supervisor"
 	apb "source.monogon.dev/metropolis/proto/api"
 )
@@ -42,21 +42,20 @@
 	// etcd is a client to the locally running consensus (etcd) server which is used
 	// both for storing lock/leader election status and actual Curator data.
 	etcd client.Namespaced
-	// electionWatch is a function that returns an active electionWatcher for the
-	// listener to use when determining local leadership. As the listener may
-	// restart on error, this factory-function is used instead of an electionWatcher
-	// directly.
-	electionWatch func() event.Watcher[*electionStatus]
 
 	consensus consensus.ServiceHandle
+	status    *memory.Value[*electionStatus]
 }
 
 // run is the listener runnable. It listens on the Curator's gRPC socket, either
 // by starting a leader or follower instance.
 func (l *listener) run(ctx context.Context) error {
+
 	// First, figure out what we're ought to be running by watching the election and
 	// waiting for a result.
-	w := l.electionWatch()
+	w := l.status.Watch()
+	defer w.Close()
+
 	supervisor.Logger(ctx).Infof("Waiting for election status...")
 	st, err := w.Get(ctx)
 	if err != nil {
@@ -116,9 +115,9 @@
 
 		// Create a follower instance and serve it over gRPC.
 		follower := &curatorFollower{
-			lock:       st.follower.lock,
 			etcd:       l.etcd,
 			followerID: l.node.ID(),
+			status:     l.status,
 		}
 		cpb.RegisterCuratorLocalServer(srv, follower)
 	}
@@ -160,10 +159,6 @@
 			if nst.follower == nil {
 				return fmt.Errorf("this curator stopped being a follower, quitting")
 			}
-			if nst.follower.lock.NodeId != st.follower.lock.NodeId {
-				// TODO(q3k): don't restart then, just update the server's lock
-				return fmt.Errorf("leader changed from %q to %q, quitting", st.follower.lock.NodeId, nst.follower.lock.NodeId)
-			}
 		}
 	default:
 		panic("unreachable")