m/n/c/curator: add CuratorLocal.GetCurrentLeader

This adds a service (CuratorLocal) which runs on both leader and
follower curators. It has one RPC, GetCurrentLeader, which returns
information about the leader election status from the point of view of
the callee.
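
For illustration only (not part of this change), a rough sketch of how
a client of this RPC might look; the leaderwatch package name, the
WatchLeader wrapper and the pre-established, authenticated
grpc.ClientConn are assumptions, and error handling is minimal:

    package leaderwatch

    import (
        "context"
        "log"

        "google.golang.org/grpc"

        cpb "source.monogon.dev/metropolis/node/core/curator/proto/api"
    )

    // WatchLeader logs leader information reported by the curator behind
    // conn until the stream is closed, eg. on a leadership change.
    func WatchLeader(ctx context.Context, conn *grpc.ClientConn) error {
        cur := cpb.NewCuratorLocalClient(conn)
        srv, err := cur.GetCurrentLeader(ctx, &cpb.GetCurrentLeaderRequest{})
        if err != nil {
            return err
        }
        for {
            res, err := srv.Recv()
            if err != nil {
                // The stream is closed once the serving curator becomes
                // aware of a leadership change (or restarts); callers
                // should re-dial and retry.
                return err
            }
            log.Printf("leader %s at %s:%d (reported by %s)",
                res.LeaderNodeId, res.LeaderHost, res.LeaderPort, res.ThisNodeId)
        }
    }

Keeping the stream open until a leadership change lets the server
proactively signal clients that they should re-resolve the leader,
instead of relying on clients to poll.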

We add a test to make sure the leader returns correct data about
itself, but we don't yet have a test for a follower (that would require
a significant test harness). An upcoming CL will, however, exercise
this in an end-to-end test.

Change-Id: I4dea780953bdc196bbc5a744f49ee688327c3269
Reviewed-on: https://review.monogon.dev/c/monogon/+/784
Tested-by: Jenkins CI
Reviewed-by: Mateusz Zalega <mateusz@monogon.tech>
diff --git a/metropolis/node/core/curator/BUILD.bazel b/metropolis/node/core/curator/BUILD.bazel
index 8d150b4..b8ff16a 100644
--- a/metropolis/node/core/curator/BUILD.bazel
+++ b/metropolis/node/core/curator/BUILD.bazel
@@ -56,12 +56,14 @@
     ],
     embed = [":curator"],
     deps = [
+        "//metropolis/node",
         "//metropolis/node/core/consensus",
         "//metropolis/node/core/consensus/client",
         "//metropolis/node/core/curator/proto/api",
         "//metropolis/node/core/curator/proto/private",
         "//metropolis/node/core/identity",
         "//metropolis/node/core/rpc",
+        "//metropolis/pkg/logtree",
         "//metropolis/pkg/pki",
         "//metropolis/pkg/supervisor",
         "//metropolis/proto/api",
diff --git a/metropolis/node/core/curator/impl_follower.go b/metropolis/node/core/curator/impl_follower.go
index c9a50d6..c9cc06c 100644
--- a/metropolis/node/core/curator/impl_follower.go
+++ b/metropolis/node/core/curator/impl_follower.go
@@ -1,12 +1,67 @@
 package curator
 
 import (
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+
+	common "source.monogon.dev/metropolis/node"
+	"source.monogon.dev/metropolis/node/core/consensus/client"
 	cpb "source.monogon.dev/metropolis/node/core/curator/proto/api"
-	apb "source.monogon.dev/metropolis/proto/api"
+	ppb "source.monogon.dev/metropolis/node/core/curator/proto/private"
+	"source.monogon.dev/metropolis/node/core/rpc"
 )
 
 type curatorFollower struct {
-	apb.UnimplementedAAAServer
-	apb.UnimplementedManagementServer
-	cpb.UnimplementedCuratorServer
+	lock       *ppb.LeaderElectionValue
+	etcd       client.Namespaced
+	followerID string
+}
+
+func (f *curatorFollower) GetCurrentLeader(_ *cpb.GetCurrentLeaderRequest, srv cpb.CuratorLocal_GetCurrentLeaderServer) error {
+	ctx := srv.Context()
+	if f.lock == nil {
+		return status.Errorf(codes.Unavailable, "could not determine current leader")
+	}
+	nodeId := f.lock.NodeId
+
+	// Manually load node status data from etcd, even though we are not a leader.
+	// This is fine, as if we ever end up serving stale data, the client will
+	// realize and call us again.
+	key, err := nodeEtcdPrefix.Key(nodeId)
+	if err != nil {
+		rpc.Trace(ctx).Printf("invalid leader node id %q: %v", nodeId, err)
+		return status.Errorf(codes.Internal, "current leader has invalid node id")
+	}
+	res, err := f.etcd.Get(ctx, key)
+	if err != nil {
+		rpc.Trace(ctx).Printf("Get(%q) failed: %v", key, err)
+		return status.Errorf(codes.Unavailable, "could not retrieve leader node from etcd")
+	}
+	if len(res.Kvs) != 1 {
+		rpc.Trace(ctx).Printf("Get(%q) returned %d nodes", key, len(res.Kvs))
+		return status.Errorf(codes.Internal, "current leader not found in etcd")
+	}
+	node, err := nodeUnmarshal(res.Kvs[0].Value)
+	if err != nil {
+		rpc.Trace(ctx).Printf("could not unmarshal leader node %s: %v", nodeId, err)
+		return status.Errorf(codes.Unavailable, "could not unmarshal leader node")
+	}
+	if node.status == nil || node.status.ExternalAddress == "" {
+		rpc.Trace(ctx).Printf("leader node %s has no address in status", nodeId)
+		return status.Errorf(codes.Unavailable, "current leader has no reported address")
+	}
+
+	err = srv.Send(&cpb.GetCurrentLeaderResponse{
+		LeaderNodeId: nodeId,
+		LeaderHost:   node.status.ExternalAddress,
+		LeaderPort:   int32(common.CuratorServicePort),
+		ThisNodeId:   f.followerID,
+	})
+	if err != nil {
+		return err
+	}
+
+	<-ctx.Done()
+	rpc.Trace(ctx).Printf("Interrupting due to context cancellation")
+	return nil
 }
diff --git a/metropolis/node/core/curator/impl_leader.go b/metropolis/node/core/curator/impl_leader.go
index 095ee89..ffb927b 100644
--- a/metropolis/node/core/curator/impl_leader.go
+++ b/metropolis/node/core/curator/impl_leader.go
@@ -40,6 +40,9 @@
 	// in combination with lockKey to ensure all mutations/reads performed to etcd
 	// succeed only if this leader election is still current.
 	lockRev int64
+	// leaderID is the node ID of this curator's node, i.e. the node acting as
+	// the curator leader.
+	leaderID string
 	// etcd is the etcd client in which curator data and leader election state is
 	// stored.
 	etcd client.Namespaced
diff --git a/metropolis/node/core/curator/impl_leader_curator.go b/metropolis/node/core/curator/impl_leader_curator.go
index 296e856..792c951 100644
--- a/metropolis/node/core/curator/impl_leader_curator.go
+++ b/metropolis/node/core/curator/impl_leader_curator.go
@@ -13,6 +13,7 @@
 	"google.golang.org/grpc/status"
 	"google.golang.org/protobuf/proto"
 
+	common "source.monogon.dev/metropolis/node"
 	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
 	"source.monogon.dev/metropolis/node/core/identity"
 	"source.monogon.dev/metropolis/node/core/rpc"
@@ -489,3 +490,32 @@
 		ClusterUnlockKey: node.clusterUnlockKey,
 	}, nil
 }
+
+func (l *leaderCurator) GetCurrentLeader(_ *ipb.GetCurrentLeaderRequest, srv ipb.CuratorLocal_GetCurrentLeaderServer) error {
+	ctx := srv.Context()
+
+	// We're the leader.
+	node, err := nodeLoad(ctx, l.leadership, l.leaderID)
+	if err != nil {
+		rpc.Trace(ctx).Printf("nodeLoad(%q) failed: %v", l.leaderID, err)
+		return status.Errorf(codes.Unavailable, "failed to load leader node")
+	}
+	host := ""
+	if node.status != nil && node.status.ExternalAddress != "" {
+		host = node.status.ExternalAddress
+	}
+
+	err = srv.Send(&ipb.GetCurrentLeaderResponse{
+		LeaderNodeId: l.leaderID,
+		LeaderHost:   host,
+		LeaderPort:   int32(common.CuratorServicePort),
+		ThisNodeId:   l.leaderID,
+	})
+	if err != nil {
+		return err
+	}
+
+	<-ctx.Done()
+	rpc.Trace(ctx).Printf("Interrupting due to context cancellation")
+	return nil
+}
diff --git a/metropolis/node/core/curator/impl_leader_test.go b/metropolis/node/core/curator/impl_leader_test.go
index 15c05dc..ce88ec6 100644
--- a/metropolis/node/core/curator/impl_leader_test.go
+++ b/metropolis/node/core/curator/impl_leader_test.go
@@ -18,12 +18,14 @@
 	"google.golang.org/grpc/test/bufconn"
 	"google.golang.org/protobuf/proto"
 
+	common "source.monogon.dev/metropolis/node"
 	"source.monogon.dev/metropolis/node/core/consensus"
 	"source.monogon.dev/metropolis/node/core/consensus/client"
 	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
 	ppb "source.monogon.dev/metropolis/node/core/curator/proto/private"
 	"source.monogon.dev/metropolis/node/core/identity"
 	"source.monogon.dev/metropolis/node/core/rpc"
+	"source.monogon.dev/metropolis/pkg/logtree"
 	"source.monogon.dev/metropolis/pkg/pki"
 	apb "source.monogon.dev/metropolis/proto/api"
 	cpb "source.monogon.dev/metropolis/proto/common"
@@ -83,6 +85,10 @@
 		t.Fatalf("could not generate node keypair: %v", err)
 	}
 	cNode := NewNodeForBootstrap(nil, nodePub, nodeJoinPub)
+
+	// This is where the leader node's roles would be enabled; for tests, we
+	// don't enable any.
+
 	caCertBytes, nodeCertBytes, err := BootstrapNodeFinish(ctx, curEtcd, &cNode, nil)
 	if err != nil {
 		t.Fatalf("could not finish node bootstrap: %v", err)
@@ -132,15 +138,20 @@
 	leadership := &leadership{
 		lockKey:   lockKey,
 		lockRev:   lockRev,
+		leaderID:  identity.NodeID(nodePub),
 		etcd:      curEtcd,
 		consensus: consensus.TestServiceHandle(t, cluster.Client(0)),
 	}
 	leader := newCuratorLeader(leadership, &nodeCredentials.Node)
 
+	lt := logtree.New()
+	logtree.PipeAllToStderr(t, lt)
+
 	// Create a curator gRPC server which performs authentication as per the created
 	// ServerSecurity and is backed by the created leader.
-	srv := grpc.NewServer(sec.GRPCOptions(nil)...)
+	srv := grpc.NewServer(sec.GRPCOptions(lt.MustLeveledFor("leader"))...)
 	ipb.RegisterCuratorServer(srv, leader)
+	ipb.RegisterCuratorLocalServer(srv, leader)
 	apb.RegisterAAAServer(srv, leader)
 	apb.RegisterManagementServer(srv, leader)
 	// The gRPC server will listen on an internal 'loopback' buffer.
@@ -1075,3 +1086,30 @@
 		}
 	}
 }
+
+// TestGetCurrentLeader ensures that a leader responds with its own information
+// when asked for information about the current leader.
+func TestGetCurrentLeader(t *testing.T) {
+	cl := fakeLeader(t)
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+
+	curl := ipb.NewCuratorLocalClient(cl.localNodeConn)
+	srv, err := curl.GetCurrentLeader(ctx, &ipb.GetCurrentLeaderRequest{})
+	if err != nil {
+		t.Fatalf("GetCurrentLeader: %v", err)
+	}
+	res, err := srv.Recv()
+	if err != nil {
+		t.Fatalf("GetCurrentLeader.Recv: %v", err)
+	}
+	if want, got := cl.localNodeID, res.LeaderNodeId; want != got {
+		t.Errorf("Wanted leader node ID %q, got %q", want, got)
+	}
+	if want, got := cl.localNodeID, res.ThisNodeId; want != got {
+		t.Errorf("Wanted local node ID %q, got %q", want, got)
+	}
+	if want, got := int32(common.CuratorServicePort), res.LeaderPort; want != got {
+		t.Errorf("Wanted leader port %d, got %d", want, got)
+	}
+}
diff --git a/metropolis/node/core/curator/listener.go b/metropolis/node/core/curator/listener.go
index 8718efd..4bd0d96 100644
--- a/metropolis/node/core/curator/listener.go
+++ b/metropolis/node/core/curator/listener.go
@@ -59,8 +59,11 @@
 	impl rpc.ClusterServices
 }
 
-// run is the listener runnable. It listens on gRPC sockets and serves RPCs.
+// run is the listener runnable. It listens on the Curator's gRPC socket and
+// serves either a leader or a follower instance.
 func (l *listener) run(ctx context.Context) error {
+	// First, figure out what we ought to be running by watching the election and
+	// waiting for a result.
 	w := l.electionWatch()
 	supervisor.Logger(ctx).Infof("Waiting for election status...")
 	st, err := w.get(ctx)
@@ -68,72 +71,104 @@
 		return fmt.Errorf("could not get election status: %w", err)
 	}
 
+	// Short circuit a possible situation in which we're a follower of an unknown
+	// leader, or neither a follower nor a leader.
+	if (st.leader == nil && st.follower == nil) || (st.follower != nil && st.follower.lock == nil) {
+		return fmt.Errorf("curator is neither leader nor follower - this is likely transient, restarting listener now")
+	}
+
+	if st.leader != nil && st.follower != nil {
+		// This indicates a serious programming error. Let's catch it explicitly.
+		panic("curator listener is supposed to run both as leader and follower")
+	}
+
 	sec := rpc.ServerSecurity{
 		NodeCredentials: l.node,
 	}
 
+	// Prepare a gRPC server and listener.
+	logger := supervisor.MustSubLogger(ctx, "rpc")
+	srv := grpc.NewServer(sec.GRPCOptions(logger)...)
+	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", node.CuratorServicePort))
+	if err != nil {
+		return fmt.Errorf("failed to listen on curator socket: %w", err)
+	}
+	defer lis.Close()
+
+	// Depending on the election status, register either a leader or a follower
+	// implementation with the gRPC server.
 	switch {
 	case st.leader != nil:
-		supervisor.Logger(ctx).Infof("This curator is a leader, starting listener.")
-		lis, err := net.Listen("tcp", fmt.Sprintf(":%d", node.CuratorServicePort))
-		if err != nil {
-			return fmt.Errorf("failed to listen on curator socket: %w", err)
-		}
-		defer lis.Close()
+		supervisor.Logger(ctx).Infof("This curator is a leader.")
 
+		// Create a leader instance and serve it over gRPC.
 		leader := newCuratorLeader(&leadership{
 			lockKey:   st.leader.lockKey,
 			lockRev:   st.leader.lockRev,
+			leaderID:  l.node.ID(),
 			etcd:      l.etcd,
 			consensus: l.consensus,
 		}, &l.node.Node)
-		logger := supervisor.MustSubLogger(ctx, "rpc")
-		srv := grpc.NewServer(sec.GRPCOptions(logger)...)
+
 		cpb.RegisterCuratorServer(srv, leader)
+		cpb.RegisterCuratorLocalServer(srv, leader)
 		apb.RegisterAAAServer(srv, leader)
 		apb.RegisterManagementServer(srv, leader)
-		runnable := supervisor.GRPCServer(srv, lis, true)
+	case st.follower != nil:
+		supervisor.Logger(ctx).Infof("This curator is a follower (leader is %q), starting minimal implementation.", st.follower.lock.NodeId)
 
-		if err := supervisor.Run(ctx, "server", runnable); err != nil {
-			return fmt.Errorf("could not run server: %w", err)
+		// Create a follower instance and serve it over gRPC.
+		follower := &curatorFollower{
+			lock:       st.follower.lock,
+			etcd:       l.etcd,
+			followerID: l.node.ID(),
 		}
-		supervisor.Signal(ctx, supervisor.SignalHealthy)
+		cpb.RegisterCuratorLocalServer(srv, follower)
+	}
+
+	// Start running the server as a runnable, stopping whenever this runnable exits
+	// (on leadership change) or crashes. It's set to not be terminated gracefully
+	// because:
+	//  1. Followers should notify (by closing) clients about a leadership change as
+	//     early as possible,
+	//  2. Any long-running leadership calls will start failing anyway as the
+	//     leadership has been lost.
+	runnable := supervisor.GRPCServer(srv, lis, false)
+	if err := supervisor.Run(ctx, "server", runnable); err != nil {
+		return fmt.Errorf("could not run server: %w", err)
+	}
+	supervisor.Signal(ctx, supervisor.SignalHealthy)
+
+	// Act upon any leadership changes. This depends on whether we were running a
+	// leader or a follower.
+	switch {
+	case st.leader != nil:
+		supervisor.Logger(ctx).Infof("Leader running until leadership lost.")
 		for {
-			st, err := w.get(ctx)
+			nst, err := w.get(ctx)
 			if err != nil {
 				return fmt.Errorf("getting election status after starting listener failed, bailing just in case: %w", err)
 			}
-			if st.leader == nil {
+			if nst.leader == nil {
 				return fmt.Errorf("this curator stopped being a leader, quitting")
 			}
 		}
-	case st.follower != nil && st.follower.lock != nil:
-		supervisor.Logger(ctx).Infof("This curator is a follower (leader is %q), starting minimal implementation.", st.follower.lock.NodeId)
-		lis, err := net.Listen("tcp", fmt.Sprintf(":%d", node.CuratorServicePort))
-		if err != nil {
-			return fmt.Errorf("failed to listen on curator socket: %w", err)
-		}
-		defer lis.Close()
-
-		logger := supervisor.MustSubLogger(ctx, "rpc")
-		srv := grpc.NewServer(sec.GRPCOptions(logger)...)
-		// Note: curatorFollower not created nor registered. All RPCs will respond with
-		// 'Unimplemented'.
-		runnable := supervisor.GRPCServer(srv, lis, true)
-		if err := supervisor.Run(ctx, "server", runnable); err != nil {
-			return fmt.Errorf("could not run server: %w", err)
-		}
-		supervisor.Signal(ctx, supervisor.SignalHealthy)
+	case st.follower != nil:
+		supervisor.Logger(ctx).Infof("Follower running until leadership change.")
 		for {
-			st, err := w.get(ctx)
+			nst, err := w.get(ctx)
 			if err != nil {
 				return fmt.Errorf("getting election status after starting listener failed, bailing just in case: %w", err)
 			}
-			if st.follower == nil {
+			if nst.follower == nil {
 				return fmt.Errorf("this curator stopped being a follower, quitting")
 			}
+			if nst.follower.lock.NodeId != st.follower.lock.NodeId {
+				// TODO(q3k): don't restart then, just update the server's lock
+				return fmt.Errorf("leader changed from %q to %q, quitting", st.follower.lock.NodeId, nst.follower.lock.NodeId)
+			}
 		}
 	default:
-		return fmt.Errorf("curator is neither leader nor follower - this is likely transient, restarting listener now")
+		panic("unreachable")
 	}
 }
diff --git a/metropolis/node/core/curator/proto/api/api.proto b/metropolis/node/core/curator/proto/api/api.proto
index 01998f9..8791ad8 100644
--- a/metropolis/node/core/curator/proto/api/api.proto
+++ b/metropolis/node/core/curator/proto/api/api.proto
@@ -11,9 +11,9 @@
 // and internal services to get cluster state and and get/mutate cluster
 // configuration.
 // It is currently implemented as a leader-elected service running on all nodes
-// that run a consensus server (etcd). Every instance either serves traffic
-// directly (if it is the leader) or passes all RPCs over to the current
-// leader.
+// that run a consensus server (etcd). Only the elected leader will serve this
+// service.
+//
 // The curator listens on gRPC on all network interfaces at a well known port,
 // with access encrypted and authenticated by TLS using certificates issued by
 // the Cluster CA.
@@ -21,10 +21,6 @@
 // The curator is a privileged service, and performs per-RPC authorization based
 // on the identity of the client, which is determined by the client certificate
 // supplied over TLS.
-//
-// TODO(q3k): implement and document public Cluster gRPC.
-// TODO(q3k): implement and document cluster auth for nodes and escrowed user
-// keys.
 service Curator {
     // Watch returns a stream of updates concerning some part of the cluster
     // managed by the curator, and is the main way in which node code responds
@@ -256,3 +252,47 @@
     // with Join Credentials.
     bytes cluster_unlock_key = 1;
 }
+
+// CuratorLocal is served by both the Curator leader and followers, and returns
+// data pertinent to the local node or the leader election status of the
+// Curator. Most importantly, it can be used to retrieve the current Curator
+// leader.
+service CuratorLocal {
+    // GetCurrentLeader returns the leader known to the contacted curator.
+    // An error will be returned if no leader could be determined.
+    //
+    // This is a streaming call so that clients can wait for changes instead
+    // of polling repeatedly. As soon as the server becomes aware of a
+    // leadership change, it will either reply with new leader information
+    // (if available) or close the stream (if not).
+    rpc GetCurrentLeader(GetCurrentLeaderRequest) returns (stream GetCurrentLeaderResponse) {
+        option (metropolis.proto.ext.authorization) = {
+            // This call effectively needs to be public, as it's used in early
+            // connections to figure out which curator to connect to. The
+            // caller might be a node which hasn't yet joined a cluster (and
+            // thus has no cluster credentials), or a user which hasn't yet
+            // fully authenticated into the cluster.
+            allow_unauthenticated: true
+        };
+    }
+}
+
+message GetCurrentLeaderRequest {
+}
+
+message GetCurrentLeaderResponse {
+    // leader_node_id is the Node ID of the leader, as seen by the responding node.
+    string leader_node_id = 1;
+    // leader_host is the host/IP address at which the leader node's curator
+    // is listening.
+    //
+    // This can be zero/empty if the leader has not yet reported its external
+    // address to the cluster.
+    //
+    // TODO(q3k): guarantee this is always non-zero
+    string leader_host = 2;
+    // leader_port is the port at which the leader node's curator is listening.
+    int32 leader_port = 3;
+    // this_node_id is the Node ID of the node which sent this response.
+    string this_node_id = 4;
+}