curator: remove dispatch system
This vastly simplifies the curator by removing the dispatch and
per-RPC switching logic, replacing it with a gRPC server that is
stopped and restarted whenever the curator's leadership status
changes.
The downside is the theoretical possibility of some 'stale' RPC
handling, where a leader/follower accepts a new RPC even though it is
in the process of switching its leadership role.
The rationale for this change is:
1. Leadership-exclusive actions are guarded by the etcd leadership
   lock being held, so there is no chance that a long-pending RPC to
   a leader that just stepped down will cause a split-brain scenario
   (see the sketch after this list).
2. We're moving away from follower proxying, and followers will
   instead just serve a 'who is the leader' RPC. It is acceptable for
   these to serve stale data (i.e. when the contacted follower should
   have already switched to being a leader, or to being a follower of
   another leader), as during leadership failover we expect clients
   to perform retry loops until a new leadership connection is
   established.
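As a rough illustration of point 1, the sketch below (not the actual
txnAsLeader implementation; everything other than the lockKey/lockRev
idea is made up for illustration) shows how an etcd transaction
guarded by the leadership lock revision rejects writes from a curator
that has lost the lock:

    package main

    import (
        "context"
        "fmt"

        clientv3 "go.etcd.io/etcd/client/v3"
    )

    // Sketch only: commit the given ops if and only if our leadership
    // lock entry still exists at the revision we acquired it at. A
    // leader that has since stepped down fails the comparison, so its
    // transaction is rejected instead of causing split brain.
    func guardedTxn(ctx context.Context, etcd *clientv3.Client, lockKey string, lockRev int64, ops ...clientv3.Op) error {
        resp, err := etcd.Txn(ctx).
            If(clientv3.Compare(clientv3.CreateRevision(lockKey), "=", lockRev)).
            Then(ops...).
            Commit()
        if err != nil {
            return fmt.Errorf("etcd transaction failed: %w", err)
        }
        if !resp.Succeeded {
            return fmt.Errorf("leadership lost, transaction rejected")
        }
        return nil
    }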
Another downside (or perhaps upside) is that we don't start the
listener until we're ready to serve data, either the full API as a
leader or a reduced API as a follower. This means that clients will
have to retry connections until the leader is running, and that it
might be difficult to tell apart a node which isn't yet running the
curator from a broken node, or from one that will never run the
curator at all. On the other hand, successfully establishing a
connection means that we are sure to get a gRPC response instead of a
hang because the curator isn't yet ready to serve.
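The client-side behaviour described above could look roughly like the
following sketch; dialCurator, addr and the insecure transport are
illustrative only (real clients authenticate with cluster TLS
credentials):

    package main

    import (
        "context"
        "time"

        "google.golang.org/grpc"
        "google.golang.org/grpc/credentials/insecure"
    )

    // dialCurator is a sketch of the retry loop a client might run
    // during leadership failover: since the curator only listens once
    // it is ready to serve, we keep redialing until a connection is
    // established or the caller's context expires.
    func dialCurator(ctx context.Context, addr string) (*grpc.ClientConn, error) {
        for {
            dialCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
            conn, err := grpc.DialContext(dialCtx, addr,
                // Hypothetical: a real client would use cluster TLS
                // credentials instead of an insecure transport.
                grpc.WithTransportCredentials(insecure.NewCredentials()),
                grpc.WithBlock())
            cancel()
            if err == nil {
                return conn, nil
            }
            if ctx.Err() != nil {
                return nil, ctx.Err()
            }
            time.Sleep(time.Second)
        }
    }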
Change-Id: I2ec35f00bce72f0f337e8e25e8c71f5265a7d8bb
Reviewed-on: https://review.monogon.dev/c/monogon/+/685
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/node/core/curator/BUILD.bazel b/metropolis/node/core/curator/BUILD.bazel
index 16bfbfe..681b637 100644
--- a/metropolis/node/core/curator/BUILD.bazel
+++ b/metropolis/node/core/curator/BUILD.bazel
@@ -26,7 +26,6 @@
"//metropolis/node/core/curator/proto/private",
"//metropolis/node/core/identity",
"//metropolis/node/core/rpc",
- "//metropolis/pkg/combinectx",
"//metropolis/pkg/event",
"//metropolis/pkg/event/etcd",
"//metropolis/pkg/event/memory",
@@ -36,7 +35,6 @@
"//metropolis/proto/common",
"@io_etcd_go_etcd_client_v3//:client",
"@io_etcd_go_etcd_client_v3//concurrency",
- "@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
"@org_golang_google_protobuf//proto",
@@ -48,7 +46,6 @@
srcs = [
"curator_test.go",
"impl_leader_test.go",
- "listener_test.go",
"state_test.go",
],
embed = [":curator"],
@@ -59,7 +56,6 @@
"//metropolis/node/core/curator/proto/private",
"//metropolis/node/core/identity",
"//metropolis/node/core/rpc",
- "//metropolis/pkg/event/memory",
"//metropolis/pkg/pki",
"//metropolis/pkg/supervisor",
"//metropolis/proto/api",
@@ -67,8 +63,6 @@
"@io_etcd_go_etcd_client_v3//:client",
"@io_etcd_go_etcd_tests_v3//integration",
"@org_golang_google_grpc//:go_default_library",
- "@org_golang_google_grpc//codes",
- "@org_golang_google_grpc//status",
"@org_golang_google_grpc//test/bufconn",
"@org_golang_google_protobuf//proto",
],
diff --git a/metropolis/node/core/curator/curator.go b/metropolis/node/core/curator/curator.go
index d6ec579..9ad88a8 100644
--- a/metropolis/node/core/curator/curator.go
+++ b/metropolis/node/core/curator/curator.go
@@ -280,7 +280,6 @@
lis := listener{
node: s.config.NodeCredentials,
electionWatch: s.electionWatch,
- dispatchC: make(chan dispatchRequest),
consensus: s.config.Consensus,
etcd: etcd,
}
diff --git a/metropolis/node/core/curator/listener.go b/metropolis/node/core/curator/listener.go
index 9715c91..5af644b 100644
--- a/metropolis/node/core/curator/listener.go
+++ b/metropolis/node/core/curator/listener.go
@@ -2,23 +2,15 @@
import (
"context"
- "errors"
"fmt"
"net"
- "google.golang.org/grpc"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
-
"source.monogon.dev/metropolis/node"
"source.monogon.dev/metropolis/node/core/consensus"
"source.monogon.dev/metropolis/node/core/consensus/client"
- cpb "source.monogon.dev/metropolis/node/core/curator/proto/api"
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/rpc"
- "source.monogon.dev/metropolis/pkg/combinectx"
"source.monogon.dev/metropolis/pkg/supervisor"
- apb "source.monogon.dev/metropolis/proto/api"
)
// listener is the curator runnable responsible for listening for gRPC
@@ -50,118 +42,6 @@
electionWatch func() electionWatcher
consensus consensus.ServiceHandle
-
- dispatchC chan dispatchRequest
-}
-
-// dispatcher is the 'listener dispatcher', the listener's runnable responsible
-// for keeping track of the currently selected curator implementation and
-// switching over when necessary.
-//
-// It listens for 'dispatch' requests from the listener's RPC handlers and
-// returns a Curator implementation that should be used to handle this request,
-// alongside a context expressing the lifespan of this implementation.
-func (l *listener) dispatcher(ctx context.Context) error {
- supervisor.Logger(ctx).Info("Dispatcher starting...")
-
- // Start with an empty 'active target'. This will be populated before the
- // first dispatch request is served.
- t := activeTarget{}
- w := l.electionWatch()
-
- supervisor.Signal(ctx, supervisor.SignalHealthy)
-
- // Channel containing electionStatus updates from value.
- c := make(chan *electionStatus)
- defer close(c)
-
- go func() {
- // Wait for initial status.
- s, ok := <-c
- if !ok {
- return
- }
- t.switchTo(ctx, l, s)
-
- // Respond to requests and status updates.
- for {
- select {
- case <-ctx.Done():
- return
- case r := <-l.dispatchC:
- // Handle request.
- r.resC <- listenerTarget{
- ctx: *t.ctx,
- impl: t.impl,
- }
- case s, ok := <-c:
- // Handle status update, or quit on status update error.
- if !ok {
- return
- }
- t.switchTo(ctx, l, s)
- }
- }
- }()
-
- // Convert event electionStatus updates to channel sends. If we cannot retrieve
- // the newest electionStatus, we kill the dispatcher runner.
- for {
- s, err := w.get(ctx)
- if err != nil {
- return fmt.Errorf("could not get newest electionStatus: %w", err)
- }
- c <- s
- }
-}
-
-// activeTarget is the active implementation used by the listener dispatcher, or
-// nil if none is active yet.
-type activeTarget struct {
- // context describing the lifecycle of the active implementation, or nil if the
- // impl is nil.
- ctx *context.Context
- // context cancel function for ctx, or nil if ctx is nil.
- ctxC *context.CancelFunc
- // active Curator implementation, or nil if not yet set up.
- impl rpc.ClusterServices
-}
-
-// switchTo switches the activeTarget over to a Curator implementation as per
-// the electionStatus and leader configuration. If the activeTarget already had
-// an implementation set, its associated context is canceled.
-func (t *activeTarget) switchTo(ctx context.Context, l *listener, status *electionStatus) {
- if t.ctxC != nil {
- (*t.ctxC)()
- }
- implCtx, implCtxC := context.WithCancel(ctx)
- t.ctx = &implCtx
- t.ctxC = &implCtxC
- if leader := status.leader; leader != nil {
- supervisor.Logger(ctx).Info("Dispatcher switching over to local leader")
- // Create a new leadership and pass it to all leader service instances.
- //
- // This shares the leadership locks across all of them. Each time we regain
- // leadership, a new set of locks is created - this is fine, as even if we
- // happen to have to instances of the leader running (one old hanging on a lock
- // and a new one with the lock freed) the previous leader will fail on
- // txnAsLeader due to the leadership being outdated.
- t.impl = newCuratorLeader(&leadership{
- lockKey: leader.lockKey,
- lockRev: leader.lockRev,
- etcd: l.etcd,
- consensus: l.consensus,
- }, &l.node.Node)
- } else {
- supervisor.Logger(ctx).Info("Dispatcher switching over to follower")
- t.impl = &curatorFollower{}
- }
-}
-
-// dispatchRequest is a request sent to the dispatcher by the listener when it
-// needs an up to date listenerTarget to run RPC calls against.
-type dispatchRequest struct {
- resC chan listenerTarget
}
// listenerTarget is where the listener should forward a given curator RPC. This
@@ -175,255 +55,72 @@
impl rpc.ClusterServices
}
-// dispatch contacts the dispatcher to retrieve an up-to-date listenerTarget.
-// This target is then used to serve RPCs. The given context is only used to
-// time out the dispatch action and does not influence the listenerTarget
-// returned.
-func (l *listener) dispatch(ctx context.Context) (*listenerTarget, error) {
- req := dispatchRequest{
- // resC is non-blocking to ensure slow dispatch requests do not further cascade
- // into blocking the dispatcher.
- resC: make(chan listenerTarget, 1),
- }
- select {
- case <-ctx.Done():
- return nil, ctx.Err()
- case l.dispatchC <- req:
- }
-
- select {
- case <-ctx.Done():
- return nil, ctx.Err()
- case res := <-req.resC:
- return &res, nil
- }
-}
-
// run is the listener runnable. It listens on gRPC sockets and serves RPCs.
func (l *listener) run(ctx context.Context) error {
- supervisor.Logger(ctx).Info("Listeners starting...")
- if err := supervisor.Run(ctx, "dispatcher", l.dispatcher); err != nil {
- return fmt.Errorf("when starting dispatcher: %w", err)
+ w := l.electionWatch()
+ supervisor.Logger(ctx).Infof("Waiting for election status...")
+ st, err := w.get(ctx)
+ if err != nil {
+ return fmt.Errorf("could not get election status: %w", err)
}
sec := rpc.ServerSecurity{
NodeCredentials: l.node,
}
- err := supervisor.Run(ctx, "external", func(ctx context.Context) error {
- lisExternal, err := net.Listen("tcp", fmt.Sprintf(":%d", node.CuratorServicePort))
+ switch {
+ case st.leader != nil:
+ supervisor.Logger(ctx).Infof("This curator is a leader, starting listener.")
+ lis, err := net.Listen("tcp", fmt.Sprintf(":%d", node.CuratorServicePort))
if err != nil {
- return fmt.Errorf("failed to listen on external curator socket: %w", err)
+ return fmt.Errorf("failed to listen on curator socket: %w", err)
}
- defer lisExternal.Close()
+ defer lis.Close()
- runnable := supervisor.GRPCServer(sec.SetupExternalGRPC(supervisor.MustSubLogger(ctx, "rpc"), l), lisExternal, true)
- return runnable(ctx)
- })
- if err != nil {
- return fmt.Errorf("while starting external gRPC listener: %w", err)
- }
-
- supervisor.Logger(ctx).Info("Listeners started.")
- supervisor.Signal(ctx, supervisor.SignalHealthy)
-
- // Keep the listener running, as its a parent to the gRPC listener.
- <-ctx.Done()
- return ctx.Err()
-}
-
-// implOperation is a function passed to callImpl by a listener RPC shim. It
-// sets up and calls the appropriate RPC for the shim that is it's being used in.
-//
-// Each gRPC service exposed by the Curator is implemented directly on the
-// listener as a shim, and that shim uses callImpl to execute the correct,
-// current (leader or follower) RPC call. The implOperation is defined inline in
-// each shim to perform that call, and received the context and implementation
-// reflecting the current active implementation (leader/follower). Errors
-// returned are either returned directly or converted to an UNAVAILABLE status
-// if the error is as a result of the context being canceled due to the
-// implementation switching.
-type implOperation func(ctx context.Context, impl rpc.ClusterServices) error
-
-// callImpl gets the newest listenerTarget from the dispatcher, combines the
-// given context with the context of the listenerTarget implementation and calls
-// the given function with the combined context and implementation.
-//
-// It's called by listener RPC shims.
-func (l *listener) callImpl(ctx context.Context, op implOperation) error {
- lt, err := l.dispatch(ctx)
- // dispatch will only return errors on context cancellations.
- if err != nil {
- return err
- }
-
- ctxCombined := combinectx.Combine(ctx, lt.ctx)
- err = op(ctxCombined, lt.impl)
-
- // No error occurred? Nothing else to do.
- if err == nil {
- return nil
- }
- cerr := &combinectx.Error{}
- // An error occurred. Was it a context error?
- if errors.As(err, &cerr) {
- if cerr.First() {
- // Request context got canceled. Return inner context error.
- return cerr.Unwrap()
- } else {
- // Leadership changed. Return an UNAVAILABLE so that the request gets retried by
- // the caller if needed.
- return status.Error(codes.Unavailable, "curator backend switched, request can be retried")
+ leader := newCuratorLeader(&leadership{
+ lockKey: st.leader.lockKey,
+ lockRev: st.leader.lockRev,
+ etcd: l.etcd,
+ consensus: l.consensus,
+ }, &l.node.Node)
+ runnable := supervisor.GRPCServer(sec.SetupExternalGRPC(supervisor.MustSubLogger(ctx, "rpc"), leader), lis, true)
+ if err := supervisor.Run(ctx, "server", runnable); err != nil {
+ return fmt.Errorf("could not run server: %w", err)
}
- } else {
- // Not a context error, return verbatim.
- return err
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ for {
+ st, err := w.get(ctx)
+ if err != nil {
+ return fmt.Errorf("getting election status after starting listener failed, bailing just in case: %w", err)
+ }
+ if st.leader == nil {
+ return fmt.Errorf("this curator stopped being a leader, quitting")
+ }
+ }
+ case st.follower != nil && st.follower.lock != nil:
+ supervisor.Logger(ctx).Infof("This curator is a follower (leader is %q), starting proxy.", st.follower.lock.NodeId)
+ lis, err := net.Listen("tcp", fmt.Sprintf(":%d", node.CuratorServicePort))
+ if err != nil {
+ return fmt.Errorf("failed to listen on curator socket: %w", err)
+ }
+ defer lis.Close()
+
+ follower := &curatorFollower{}
+ runnable := supervisor.GRPCServer(sec.SetupExternalGRPC(supervisor.MustSubLogger(ctx, "rpc"), follower), lis, true)
+ if err := supervisor.Run(ctx, "server", runnable); err != nil {
+ return fmt.Errorf("could not run server: %w", err)
+ }
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ for {
+ st, err := w.get(ctx)
+ if err != nil {
+ return fmt.Errorf("getting election status after starting listener failed, bailing just in case: %w", err)
+ }
+ if st.follower == nil {
+ return fmt.Errorf("this curator stopped being a follower, quitting")
+ }
+ }
+ default:
+ return fmt.Errorf("curator is neither leader nor follower - this is likely transient, restarting listener now")
}
}
-
-// RPC shims start here. Each method defined below is a gRPC RPC handler which
-// uses callImpl to forward the incoming RPC into the current implementation of
-// the curator (leader or follower).
-//
-// TODO(q3k): once Go 1.18 lands, simplify this using type arguments (Generics).
-
-// curatorWatchServer is a Curator_WatchServer but shimmed to use an expiring
-// context.
-type curatorWatchServer struct {
- grpc.ServerStream
- ctx context.Context
-}
-
-func (c *curatorWatchServer) Context() context.Context {
- return c.ctx
-}
-
-func (c *curatorWatchServer) Send(m *cpb.WatchEvent) error {
- return c.ServerStream.SendMsg(m)
-}
-
-func (l *listener) Watch(req *cpb.WatchRequest, srv cpb.Curator_WatchServer) error {
- proxy := func(ctx context.Context, impl rpc.ClusterServices) error {
- return impl.Watch(req, &curatorWatchServer{
- ServerStream: srv,
- ctx: ctx,
- })
- }
- return l.callImpl(srv.Context(), proxy)
-}
-
-type aaaEscrowServer struct {
- grpc.ServerStream
- ctx context.Context
-}
-
-func (m *aaaEscrowServer) Context() context.Context {
- return m.ctx
-}
-
-func (m *aaaEscrowServer) Send(r *apb.EscrowFromServer) error {
- return m.ServerStream.SendMsg(r)
-}
-
-func (m *aaaEscrowServer) Recv() (*apb.EscrowFromClient, error) {
- var res apb.EscrowFromClient
- if err := m.ServerStream.RecvMsg(&res); err != nil {
- return nil, err
- }
- return &res, nil
-}
-
-func (l *listener) Escrow(srv apb.AAA_EscrowServer) error {
- return l.callImpl(srv.Context(), func(ctx context.Context, impl rpc.ClusterServices) error {
- return impl.Escrow(&aaaEscrowServer{
- ServerStream: srv,
- ctx: ctx,
- })
- })
-}
-
-func (l *listener) GetRegisterTicket(ctx context.Context, req *apb.GetRegisterTicketRequest) (res *apb.GetRegisterTicketResponse, err error) {
- err = l.callImpl(ctx, func(ctx context.Context, impl rpc.ClusterServices) error {
- var err2 error
- res, err2 = impl.GetRegisterTicket(ctx, req)
- return err2
- })
- return
-}
-
-func (l *listener) UpdateNodeStatus(ctx context.Context, req *cpb.UpdateNodeStatusRequest) (res *cpb.UpdateNodeStatusResponse, err error) {
- err = l.callImpl(ctx, func(ctx context.Context, impl rpc.ClusterServices) error {
- var err2 error
- res, err2 = impl.UpdateNodeStatus(ctx, req)
- return err2
- })
- return
-}
-
-func (l *listener) GetClusterInfo(ctx context.Context, req *apb.GetClusterInfoRequest) (res *apb.GetClusterInfoResponse, err error) {
- err = l.callImpl(ctx, func(ctx context.Context, impl rpc.ClusterServices) error {
- var err2 error
- res, err2 = impl.GetClusterInfo(ctx, req)
- return err2
- })
- return
-}
-
-func (l *listener) RegisterNode(ctx context.Context, req *cpb.RegisterNodeRequest) (res *cpb.RegisterNodeResponse, err error) {
- err = l.callImpl(ctx, func(ctx context.Context, impl rpc.ClusterServices) error {
- var err2 error
- res, err2 = impl.RegisterNode(ctx, req)
- return err2
- })
- return
-}
-
-func (l *listener) CommitNode(ctx context.Context, req *cpb.CommitNodeRequest) (res *cpb.CommitNodeResponse, err error) {
- err = l.callImpl(ctx, func(ctx context.Context, impl rpc.ClusterServices) error {
- var err2 error
- res, err2 = impl.CommitNode(ctx, req)
- return err2
- })
- return
-}
-
-func (l *listener) JoinNode(ctx context.Context, req *cpb.JoinNodeRequest) (res *cpb.JoinNodeResponse, err error) {
- err = l.callImpl(ctx, func(ctx context.Context, impl rpc.ClusterServices) error {
- var err2 error
- res, err2 = impl.JoinNode(ctx, req)
- return err2
- })
- return
-}
-
-type managementGetNodesServer struct {
- grpc.ServerStream
- ctx context.Context
-}
-
-func (s *managementGetNodesServer) Context() context.Context {
- return s.ctx
-}
-
-func (s *managementGetNodesServer) Send(m *apb.Node) error {
- return s.ServerStream.SendMsg(m)
-}
-
-func (l *listener) GetNodes(req *apb.GetNodesRequest, srv apb.Management_GetNodesServer) error {
- proxy := func(ctx context.Context, impl rpc.ClusterServices) error {
- return impl.GetNodes(req, &managementGetNodesServer{
- ServerStream: srv,
- ctx: ctx,
- })
- }
- return l.callImpl(srv.Context(), proxy)
-}
-
-func (l *listener) ApproveNode(ctx context.Context, req *apb.ApproveNodeRequest) (res *apb.ApproveNodeResponse, err error) {
- err = l.callImpl(ctx, func(ctx context.Context, impl rpc.ClusterServices) error {
- var err2 error
- res, err2 = impl.ApproveNode(ctx, req)
- return err2
- })
- return
-}
diff --git a/metropolis/node/core/curator/listener_test.go b/metropolis/node/core/curator/listener_test.go
deleted file mode 100644
index 3961bf8..0000000
--- a/metropolis/node/core/curator/listener_test.go
+++ /dev/null
@@ -1,94 +0,0 @@
-package curator
-
-import (
- "context"
- "errors"
- "testing"
-
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
-
- "source.monogon.dev/metropolis/node/core/rpc"
- "source.monogon.dev/metropolis/pkg/event/memory"
- "source.monogon.dev/metropolis/pkg/supervisor"
-)
-
-// TestListenerSwitch exercises the curator listener's
-// switch-to-different-implementation functionality, notably ensuring that the
-// correct implementation is called and that the context is canceled accordingly
-// on implementation switch.
-//
-// It does not test the gRPC listener socket itself and the actual
-// implementations - that is deferred to curator functionality tests.
-func TestListenerSwitch(t *testing.T) {
- // Create test event value.
- var val memory.Value
-
- eph := rpc.NewEphemeralClusterCredentials(t, 1)
- creds := eph.Nodes[0]
-
- // Create DUT listener.
- l := &listener{
- etcd: nil,
- electionWatch: func() electionWatcher {
- return electionWatcher{
- Watcher: val.Watch(),
- }
- },
- dispatchC: make(chan dispatchRequest),
- node: creds,
- }
-
- // Start listener under supervisor.
- supervisor.TestHarness(t, l.run)
-
- // Begin with a follower.
- val.Set(electionStatus{
- follower: &electionStatusFollower{},
- })
-
- // Context for this test.
- ctx, ctxC := context.WithCancel(context.Background())
- defer ctxC()
-
- // Simulate a request context.
- ctxR, ctxRC := context.WithCancel(ctx)
-
- // Check that canceling the request unblocks a pending dispatched call.
- errC := make(chan error)
- go func() {
- errC <- l.callImpl(ctxR, func(ctx context.Context, impl rpc.ClusterServices) error {
- <-ctx.Done()
- return ctx.Err()
- })
- }()
- ctxRC()
- err := <-errC
- if err == nil || !errors.Is(err, context.Canceled) {
- t.Fatalf("callImpl context should have returned context error, got %v", err)
- }
-
- // Check that switching implementations unblocks a pending dispatched call.
- scheduledC := make(chan struct{})
- go func() {
- errC <- l.callImpl(ctx, func(ctx context.Context, impl rpc.ClusterServices) error {
- close(scheduledC)
- <-ctx.Done()
- return ctx.Err()
- })
- }()
- // Block until we actually start executing on the follower listener.
- <-scheduledC
- // Switch over to leader listener.
- val.Set(electionStatus{
- leader: &electionStatusLeader{},
- })
- // Check returned error.
- err = <-errC
- if err == nil {
- t.Fatalf("callImpl context should have returned error, got nil")
- }
- if serr, ok := status.FromError(err); !ok || serr.Code() != codes.Unavailable {
- t.Fatalf("callImpl context should have returned unavailable, got %v", err)
- }
-}