curator: remove dispatch system

This vastly simplifies the curator by removing the dispatch and per-RPC
switch logic, instead replacing it with the gRPC server stopping
whenever the leadership status of a curator switches.

The downside is technically the possibility of some 'stale' RPC
handling, where a leader/follower accepts some new RPC even though it's
in the process of switching its leadership.

The rationale for this change is:

   1. Leadership-exclusive actions are guarded by the etcd leadership
      lock being held, so there's no chance a long pending RPC to a
      leader that just stepped down will cause split brain scenarios.

   2. We're moving away from follower proxying, and followers will
      instead just serve a 'who's the leader' RPC. These are okay to
      serve stale data (ie. when the contacted follower should've
      switched to be a leader, or a follower of another leader), as
      during leadership failover we expect clients to perform retry
      loops until a new leadership connection is established.

Another downside (or perhaps upside) is that we don't start the listener
until we're ready to serve data, either the full API as a leader or a
reduced API as a follower. The downside is that clients will have to
retry connections until the leader is running, and that it might be
difficult to tell apart a node which isn't yet running the curator from
a broken node, or one that will not run the curator at all. On the other
hand, succesfully establishing a connections means that we are sure to
get a gRPC response instead of a hang because the curator isn't yet ready
to serve.

Change-Id: I2ec35f00bce72f0f337e8e25e8c71f5265a7d8bb
Reviewed-on: https://review.monogon.dev/c/monogon/+/685
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/node/core/curator/BUILD.bazel b/metropolis/node/core/curator/BUILD.bazel
index 16bfbfe..681b637 100644
--- a/metropolis/node/core/curator/BUILD.bazel
+++ b/metropolis/node/core/curator/BUILD.bazel
@@ -26,7 +26,6 @@
         "//metropolis/node/core/curator/proto/private",
         "//metropolis/node/core/identity",
         "//metropolis/node/core/rpc",
-        "//metropolis/pkg/combinectx",
         "//metropolis/pkg/event",
         "//metropolis/pkg/event/etcd",
         "//metropolis/pkg/event/memory",
@@ -36,7 +35,6 @@
         "//metropolis/proto/common",
         "@io_etcd_go_etcd_client_v3//:client",
         "@io_etcd_go_etcd_client_v3//concurrency",
-        "@org_golang_google_grpc//:go_default_library",
         "@org_golang_google_grpc//codes",
         "@org_golang_google_grpc//status",
         "@org_golang_google_protobuf//proto",
@@ -48,7 +46,6 @@
     srcs = [
         "curator_test.go",
         "impl_leader_test.go",
-        "listener_test.go",
         "state_test.go",
     ],
     embed = [":curator"],
@@ -59,7 +56,6 @@
         "//metropolis/node/core/curator/proto/private",
         "//metropolis/node/core/identity",
         "//metropolis/node/core/rpc",
-        "//metropolis/pkg/event/memory",
         "//metropolis/pkg/pki",
         "//metropolis/pkg/supervisor",
         "//metropolis/proto/api",
@@ -67,8 +63,6 @@
         "@io_etcd_go_etcd_client_v3//:client",
         "@io_etcd_go_etcd_tests_v3//integration",
         "@org_golang_google_grpc//:go_default_library",
-        "@org_golang_google_grpc//codes",
-        "@org_golang_google_grpc//status",
         "@org_golang_google_grpc//test/bufconn",
         "@org_golang_google_protobuf//proto",
     ],
diff --git a/metropolis/node/core/curator/curator.go b/metropolis/node/core/curator/curator.go
index d6ec579..9ad88a8 100644
--- a/metropolis/node/core/curator/curator.go
+++ b/metropolis/node/core/curator/curator.go
@@ -280,7 +280,6 @@
 	lis := listener{
 		node:          s.config.NodeCredentials,
 		electionWatch: s.electionWatch,
-		dispatchC:     make(chan dispatchRequest),
 		consensus:     s.config.Consensus,
 		etcd:          etcd,
 	}
diff --git a/metropolis/node/core/curator/listener.go b/metropolis/node/core/curator/listener.go
index 9715c91..5af644b 100644
--- a/metropolis/node/core/curator/listener.go
+++ b/metropolis/node/core/curator/listener.go
@@ -2,23 +2,15 @@
 
 import (
 	"context"
-	"errors"
 	"fmt"
 	"net"
 
-	"google.golang.org/grpc"
-	"google.golang.org/grpc/codes"
-	"google.golang.org/grpc/status"
-
 	"source.monogon.dev/metropolis/node"
 	"source.monogon.dev/metropolis/node/core/consensus"
 	"source.monogon.dev/metropolis/node/core/consensus/client"
-	cpb "source.monogon.dev/metropolis/node/core/curator/proto/api"
 	"source.monogon.dev/metropolis/node/core/identity"
 	"source.monogon.dev/metropolis/node/core/rpc"
-	"source.monogon.dev/metropolis/pkg/combinectx"
 	"source.monogon.dev/metropolis/pkg/supervisor"
-	apb "source.monogon.dev/metropolis/proto/api"
 )
 
 // listener is the curator runnable responsible for listening for gRPC
@@ -50,118 +42,6 @@
 	electionWatch func() electionWatcher
 
 	consensus consensus.ServiceHandle
-
-	dispatchC chan dispatchRequest
-}
-
-// dispatcher is the 'listener dispatcher', the listener's runnable responsible
-// for keeping track of the currently selected curator implementation and
-// switching over when necessary.
-//
-// It listens for 'dispatch' requests from the listener's RPC handlers and
-// returns a Curator implementation that should be used to handle this request,
-// alongside a context expressing the lifespan of this implementation.
-func (l *listener) dispatcher(ctx context.Context) error {
-	supervisor.Logger(ctx).Info("Dispatcher starting...")
-
-	// Start with an empty 'active target'. This will be populated before the
-	// first dispatch request is served.
-	t := activeTarget{}
-	w := l.electionWatch()
-
-	supervisor.Signal(ctx, supervisor.SignalHealthy)
-
-	// Channel containing electionStatus updates from value.
-	c := make(chan *electionStatus)
-	defer close(c)
-
-	go func() {
-		// Wait for initial status.
-		s, ok := <-c
-		if !ok {
-			return
-		}
-		t.switchTo(ctx, l, s)
-
-		// Respond to requests and status updates.
-		for {
-			select {
-			case <-ctx.Done():
-				return
-			case r := <-l.dispatchC:
-				// Handle request.
-				r.resC <- listenerTarget{
-					ctx:  *t.ctx,
-					impl: t.impl,
-				}
-			case s, ok := <-c:
-				// Handle status update, or quit on  status update error.
-				if !ok {
-					return
-				}
-				t.switchTo(ctx, l, s)
-			}
-		}
-	}()
-
-	// Convert event electionStatus updates to channel sends. If we cannot retrieve
-	// the newest electionStatus, we kill the dispatcher runner.
-	for {
-		s, err := w.get(ctx)
-		if err != nil {
-			return fmt.Errorf("could not get newest electionStatus: %w", err)
-		}
-		c <- s
-	}
-}
-
-// activeTarget is the active implementation used by the listener dispatcher, or
-// nil if none is active yet.
-type activeTarget struct {
-	// context describing the lifecycle of the active implementation, or nil if the
-	// impl is nil.
-	ctx *context.Context
-	// context cancel function for ctx, or nil if ctx is nil.
-	ctxC *context.CancelFunc
-	// active Curator implementation, or nil if not yet set up.
-	impl rpc.ClusterServices
-}
-
-// switchTo switches the activeTarget over to a Curator implementation as per
-// the electionStatus and leader configuration. If the activeTarget already had
-// an implementation set, its associated context is canceled.
-func (t *activeTarget) switchTo(ctx context.Context, l *listener, status *electionStatus) {
-	if t.ctxC != nil {
-		(*t.ctxC)()
-	}
-	implCtx, implCtxC := context.WithCancel(ctx)
-	t.ctx = &implCtx
-	t.ctxC = &implCtxC
-	if leader := status.leader; leader != nil {
-		supervisor.Logger(ctx).Info("Dispatcher switching over to local leader")
-		// Create a new leadership and pass it to all leader service instances.
-		//
-		// This shares the leadership locks across all of them. Each time we regain
-		// leadership, a new set of locks is created - this is fine, as even if we
-		// happen to have to instances of the leader running (one old hanging on a lock
-		// and a new one with the lock freed) the previous leader will fail on
-		// txnAsLeader due to the leadership being outdated.
-		t.impl = newCuratorLeader(&leadership{
-			lockKey:   leader.lockKey,
-			lockRev:   leader.lockRev,
-			etcd:      l.etcd,
-			consensus: l.consensus,
-		}, &l.node.Node)
-	} else {
-		supervisor.Logger(ctx).Info("Dispatcher switching over to follower")
-		t.impl = &curatorFollower{}
-	}
-}
-
-// dispatchRequest is a request sent to the dispatcher by the listener when it
-// needs an up to date listenerTarget to run RPC calls against.
-type dispatchRequest struct {
-	resC chan listenerTarget
 }
 
 // listenerTarget is where the listener should forward a given curator RPC. This
@@ -175,255 +55,72 @@
 	impl rpc.ClusterServices
 }
 
-// dispatch contacts the dispatcher to retrieve an up-to-date listenerTarget.
-// This target is then used to serve RPCs. The given context is only used to
-// time out the dispatch action and does not influence the listenerTarget
-// returned.
-func (l *listener) dispatch(ctx context.Context) (*listenerTarget, error) {
-	req := dispatchRequest{
-		// resC is non-blocking to ensure slow dispatch requests do not further cascade
-		// into blocking the dispatcher.
-		resC: make(chan listenerTarget, 1),
-	}
-	select {
-	case <-ctx.Done():
-		return nil, ctx.Err()
-	case l.dispatchC <- req:
-	}
-
-	select {
-	case <-ctx.Done():
-		return nil, ctx.Err()
-	case res := <-req.resC:
-		return &res, nil
-	}
-}
-
 // run is the listener runnable. It listens on gRPC sockets and serves RPCs.
 func (l *listener) run(ctx context.Context) error {
-	supervisor.Logger(ctx).Info("Listeners starting...")
-	if err := supervisor.Run(ctx, "dispatcher", l.dispatcher); err != nil {
-		return fmt.Errorf("when starting dispatcher: %w", err)
+	w := l.electionWatch()
+	supervisor.Logger(ctx).Infof("Waiting for election status...")
+	st, err := w.get(ctx)
+	if err != nil {
+		return fmt.Errorf("could not get election status: %w", err)
 	}
 
 	sec := rpc.ServerSecurity{
 		NodeCredentials: l.node,
 	}
 
-	err := supervisor.Run(ctx, "external", func(ctx context.Context) error {
-		lisExternal, err := net.Listen("tcp", fmt.Sprintf(":%d", node.CuratorServicePort))
+	switch {
+	case st.leader != nil:
+		supervisor.Logger(ctx).Infof("This curator is a leader, starting listener.")
+		lis, err := net.Listen("tcp", fmt.Sprintf(":%d", node.CuratorServicePort))
 		if err != nil {
-			return fmt.Errorf("failed to listen on external curator socket: %w", err)
+			return fmt.Errorf("failed to listen on curator socket: %w", err)
 		}
-		defer lisExternal.Close()
+		defer lis.Close()
 
-		runnable := supervisor.GRPCServer(sec.SetupExternalGRPC(supervisor.MustSubLogger(ctx, "rpc"), l), lisExternal, true)
-		return runnable(ctx)
-	})
-	if err != nil {
-		return fmt.Errorf("while starting external gRPC listener: %w", err)
-	}
-
-	supervisor.Logger(ctx).Info("Listeners started.")
-	supervisor.Signal(ctx, supervisor.SignalHealthy)
-
-	// Keep the listener running, as its a parent to the gRPC listener.
-	<-ctx.Done()
-	return ctx.Err()
-}
-
-// implOperation is a function passed to callImpl by a listener RPC shim. It
-// sets up and calls the appropriate RPC for the shim that is it's being used in.
-//
-// Each gRPC service exposed by the Curator is implemented directly on the
-// listener as a shim, and that shim uses callImpl to execute the correct,
-// current (leader or follower) RPC call. The implOperation is defined inline in
-// each shim to perform that call, and received the context and implementation
-// reflecting the current active implementation (leader/follower). Errors
-// returned are either returned directly or converted to an UNAVAILABLE status
-// if the error is as a result of the context being canceled due to the
-// implementation switching.
-type implOperation func(ctx context.Context, impl rpc.ClusterServices) error
-
-// callImpl gets the newest listenerTarget from the dispatcher, combines the
-// given context with the context of the listenerTarget implementation and calls
-// the given function with the combined context and implementation.
-//
-// It's called by listener RPC shims.
-func (l *listener) callImpl(ctx context.Context, op implOperation) error {
-	lt, err := l.dispatch(ctx)
-	// dispatch will only return errors on context cancellations.
-	if err != nil {
-		return err
-	}
-
-	ctxCombined := combinectx.Combine(ctx, lt.ctx)
-	err = op(ctxCombined, lt.impl)
-
-	// No error occurred? Nothing else to do.
-	if err == nil {
-		return nil
-	}
-	cerr := &combinectx.Error{}
-	// An error occurred. Was it a context error?
-	if errors.As(err, &cerr) {
-		if cerr.First() {
-			// Request context got canceled. Return inner context error.
-			return cerr.Unwrap()
-		} else {
-			// Leadership changed. Return an UNAVAILABLE so that the request gets retried by
-			// the caller if needed.
-			return status.Error(codes.Unavailable, "curator backend switched, request can be retried")
+		leader := newCuratorLeader(&leadership{
+			lockKey:   st.leader.lockKey,
+			lockRev:   st.leader.lockRev,
+			etcd:      l.etcd,
+			consensus: l.consensus,
+		}, &l.node.Node)
+		runnable := supervisor.GRPCServer(sec.SetupExternalGRPC(supervisor.MustSubLogger(ctx, "rpc"), leader), lis, true)
+		if err := supervisor.Run(ctx, "server", runnable); err != nil {
+			return fmt.Errorf("could not run server: %w", err)
 		}
-	} else {
-		// Not a context error, return verbatim.
-		return err
+		supervisor.Signal(ctx, supervisor.SignalHealthy)
+		for {
+			st, err := w.get(ctx)
+			if err != nil {
+				return fmt.Errorf("getting election status after starting listener failed, bailing just in case: %w", err)
+			}
+			if st.leader == nil {
+				return fmt.Errorf("this curator stopped being a leader, quitting")
+			}
+		}
+	case st.follower != nil && st.follower.lock != nil:
+		supervisor.Logger(ctx).Infof("This curator is a follower (leader is %q), starting proxy.", st.follower.lock.NodeId)
+		lis, err := net.Listen("tcp", fmt.Sprintf(":%d", node.CuratorServicePort))
+		if err != nil {
+			return fmt.Errorf("failed to listen on curator socket: %w", err)
+		}
+		defer lis.Close()
+
+		follower := &curatorFollower{}
+		runnable := supervisor.GRPCServer(sec.SetupExternalGRPC(supervisor.MustSubLogger(ctx, "rpc"), follower), lis, true)
+		if err := supervisor.Run(ctx, "server", runnable); err != nil {
+			return fmt.Errorf("could not run server: %w", err)
+		}
+		supervisor.Signal(ctx, supervisor.SignalHealthy)
+		for {
+			st, err := w.get(ctx)
+			if err != nil {
+				return fmt.Errorf("getting election status after starting listener failed, bailing just in case: %w", err)
+			}
+			if st.follower == nil {
+				return fmt.Errorf("this curator stopped being a follower, quitting")
+			}
+		}
+	default:
+		return fmt.Errorf("curator is neither leader nor follower - this is likely transient, restarting listener now")
 	}
 }
-
-// RPC shims start here. Each method defined below is a gRPC RPC handler which
-// uses callImpl to forward the incoming RPC into the current implementation of
-// the curator (leader or follower).
-//
-// TODO(q3k): once Go 1.18 lands, simplify this using type arguments (Generics).
-
-// curatorWatchServer is a Curator_WatchServer but shimmed to use an expiring
-// context.
-type curatorWatchServer struct {
-	grpc.ServerStream
-	ctx context.Context
-}
-
-func (c *curatorWatchServer) Context() context.Context {
-	return c.ctx
-}
-
-func (c *curatorWatchServer) Send(m *cpb.WatchEvent) error {
-	return c.ServerStream.SendMsg(m)
-}
-
-func (l *listener) Watch(req *cpb.WatchRequest, srv cpb.Curator_WatchServer) error {
-	proxy := func(ctx context.Context, impl rpc.ClusterServices) error {
-		return impl.Watch(req, &curatorWatchServer{
-			ServerStream: srv,
-			ctx:          ctx,
-		})
-	}
-	return l.callImpl(srv.Context(), proxy)
-}
-
-type aaaEscrowServer struct {
-	grpc.ServerStream
-	ctx context.Context
-}
-
-func (m *aaaEscrowServer) Context() context.Context {
-	return m.ctx
-}
-
-func (m *aaaEscrowServer) Send(r *apb.EscrowFromServer) error {
-	return m.ServerStream.SendMsg(r)
-}
-
-func (m *aaaEscrowServer) Recv() (*apb.EscrowFromClient, error) {
-	var res apb.EscrowFromClient
-	if err := m.ServerStream.RecvMsg(&res); err != nil {
-		return nil, err
-	}
-	return &res, nil
-}
-
-func (l *listener) Escrow(srv apb.AAA_EscrowServer) error {
-	return l.callImpl(srv.Context(), func(ctx context.Context, impl rpc.ClusterServices) error {
-		return impl.Escrow(&aaaEscrowServer{
-			ServerStream: srv,
-			ctx:          ctx,
-		})
-	})
-}
-
-func (l *listener) GetRegisterTicket(ctx context.Context, req *apb.GetRegisterTicketRequest) (res *apb.GetRegisterTicketResponse, err error) {
-	err = l.callImpl(ctx, func(ctx context.Context, impl rpc.ClusterServices) error {
-		var err2 error
-		res, err2 = impl.GetRegisterTicket(ctx, req)
-		return err2
-	})
-	return
-}
-
-func (l *listener) UpdateNodeStatus(ctx context.Context, req *cpb.UpdateNodeStatusRequest) (res *cpb.UpdateNodeStatusResponse, err error) {
-	err = l.callImpl(ctx, func(ctx context.Context, impl rpc.ClusterServices) error {
-		var err2 error
-		res, err2 = impl.UpdateNodeStatus(ctx, req)
-		return err2
-	})
-	return
-}
-
-func (l *listener) GetClusterInfo(ctx context.Context, req *apb.GetClusterInfoRequest) (res *apb.GetClusterInfoResponse, err error) {
-	err = l.callImpl(ctx, func(ctx context.Context, impl rpc.ClusterServices) error {
-		var err2 error
-		res, err2 = impl.GetClusterInfo(ctx, req)
-		return err2
-	})
-	return
-}
-
-func (l *listener) RegisterNode(ctx context.Context, req *cpb.RegisterNodeRequest) (res *cpb.RegisterNodeResponse, err error) {
-	err = l.callImpl(ctx, func(ctx context.Context, impl rpc.ClusterServices) error {
-		var err2 error
-		res, err2 = impl.RegisterNode(ctx, req)
-		return err2
-	})
-	return
-}
-
-func (l *listener) CommitNode(ctx context.Context, req *cpb.CommitNodeRequest) (res *cpb.CommitNodeResponse, err error) {
-	err = l.callImpl(ctx, func(ctx context.Context, impl rpc.ClusterServices) error {
-		var err2 error
-		res, err2 = impl.CommitNode(ctx, req)
-		return err2
-	})
-	return
-}
-
-func (l *listener) JoinNode(ctx context.Context, req *cpb.JoinNodeRequest) (res *cpb.JoinNodeResponse, err error) {
-	err = l.callImpl(ctx, func(ctx context.Context, impl rpc.ClusterServices) error {
-		var err2 error
-		res, err2 = impl.JoinNode(ctx, req)
-		return err2
-	})
-	return
-}
-
-type managementGetNodesServer struct {
-	grpc.ServerStream
-	ctx context.Context
-}
-
-func (s *managementGetNodesServer) Context() context.Context {
-	return s.ctx
-}
-
-func (s *managementGetNodesServer) Send(m *apb.Node) error {
-	return s.ServerStream.SendMsg(m)
-}
-
-func (l *listener) GetNodes(req *apb.GetNodesRequest, srv apb.Management_GetNodesServer) error {
-	proxy := func(ctx context.Context, impl rpc.ClusterServices) error {
-		return impl.GetNodes(req, &managementGetNodesServer{
-			ServerStream: srv,
-			ctx:          ctx,
-		})
-	}
-	return l.callImpl(srv.Context(), proxy)
-}
-
-func (l *listener) ApproveNode(ctx context.Context, req *apb.ApproveNodeRequest) (res *apb.ApproveNodeResponse, err error) {
-	err = l.callImpl(ctx, func(ctx context.Context, impl rpc.ClusterServices) error {
-		var err2 error
-		res, err2 = impl.ApproveNode(ctx, req)
-		return err2
-	})
-	return
-}
diff --git a/metropolis/node/core/curator/listener_test.go b/metropolis/node/core/curator/listener_test.go
deleted file mode 100644
index 3961bf8..0000000
--- a/metropolis/node/core/curator/listener_test.go
+++ /dev/null
@@ -1,94 +0,0 @@
-package curator
-
-import (
-	"context"
-	"errors"
-	"testing"
-
-	"google.golang.org/grpc/codes"
-	"google.golang.org/grpc/status"
-
-	"source.monogon.dev/metropolis/node/core/rpc"
-	"source.monogon.dev/metropolis/pkg/event/memory"
-	"source.monogon.dev/metropolis/pkg/supervisor"
-)
-
-// TestListenerSwitch exercises the curator listener's
-// switch-to-different-implementation functionality, notably ensuring that the
-// correct implementation is called and that the context is canceled accordingly
-// on implementation switch.
-//
-// It does not test the gRPC listener socket itself and the actual
-// implementations - that is deferred to curator functionality tests.
-func TestListenerSwitch(t *testing.T) {
-	// Create test event value.
-	var val memory.Value
-
-	eph := rpc.NewEphemeralClusterCredentials(t, 1)
-	creds := eph.Nodes[0]
-
-	// Create DUT listener.
-	l := &listener{
-		etcd: nil,
-		electionWatch: func() electionWatcher {
-			return electionWatcher{
-				Watcher: val.Watch(),
-			}
-		},
-		dispatchC: make(chan dispatchRequest),
-		node:      creds,
-	}
-
-	// Start listener under supervisor.
-	supervisor.TestHarness(t, l.run)
-
-	// Begin with a follower.
-	val.Set(electionStatus{
-		follower: &electionStatusFollower{},
-	})
-
-	// Context for this test.
-	ctx, ctxC := context.WithCancel(context.Background())
-	defer ctxC()
-
-	// Simulate a request context.
-	ctxR, ctxRC := context.WithCancel(ctx)
-
-	// Check that canceling the request unblocks a pending dispatched call.
-	errC := make(chan error)
-	go func() {
-		errC <- l.callImpl(ctxR, func(ctx context.Context, impl rpc.ClusterServices) error {
-			<-ctx.Done()
-			return ctx.Err()
-		})
-	}()
-	ctxRC()
-	err := <-errC
-	if err == nil || !errors.Is(err, context.Canceled) {
-		t.Fatalf("callImpl context should have returned context error, got %v", err)
-	}
-
-	// Check that switching implementations unblocks a pending dispatched call.
-	scheduledC := make(chan struct{})
-	go func() {
-		errC <- l.callImpl(ctx, func(ctx context.Context, impl rpc.ClusterServices) error {
-			close(scheduledC)
-			<-ctx.Done()
-			return ctx.Err()
-		})
-	}()
-	// Block until we actually start executing on the follower listener.
-	<-scheduledC
-	// Switch over to leader listener.
-	val.Set(electionStatus{
-		leader: &electionStatusLeader{},
-	})
-	// Check returned error.
-	err = <-errC
-	if err == nil {
-		t.Fatalf("callImpl context should have returned error, got nil")
-	}
-	if serr, ok := status.FromError(err); !ok || serr.Code() != codes.Unavailable {
-		t.Fatalf("callImpl context should have returned unavailable, got %v", err)
-	}
-}