m/n/core/curator: implement gRPC listener
This implements the Curator listener and listener dispatcher, two
runnables responsible for maintaining an active Curator RPC
implementation (either leader or follower) and switching over
appropriately as the election status changes.
This might be overengineered. The implementation switchover logic and
context joining could possibly be ripped out and replaced by plain gRPC
socket close and re-open on switchover.
Testing plan: unit tests for the switching/dispatching logic.
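
For illustration, a node-local consumer of the new listener might look
roughly like the sketch below. This is only a sketch: the package name,
the watchCluster helper, the empty WatchRequest payload and the
retry-on-UNAVAILABLE loop are assumptions for illustration, not part of
this change; only Service.DialCluster and the Curator.Watch RPC come
from the code added here.

    package example

    import (
        "context"
        "log"

        "google.golang.org/grpc/codes"
        "google.golang.org/grpc/status"

        "source.monogon.dev/metropolis/node/core/curator"
        apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
    )

    // watchCluster dials the node-local curator socket and keeps a Watch
    // stream open, re-issuing the call whenever the listener switches
    // implementations and cancels the in-flight RPC with UNAVAILABLE.
    func watchCluster(ctx context.Context, s *curator.Service) error {
        cur, err := s.DialCluster(ctx)
        if err != nil {
            return err
        }
        for {
            stream, err := cur.Watch(ctx, &apb.WatchRequest{})
            if err != nil {
                return err
            }
            for {
                ev, err := stream.Recv()
                if err != nil {
                    if status.Code(err) == codes.Unavailable {
                        // Backend switched over; re-issue the Watch.
                        break
                    }
                    return err
                }
                log.Printf("got watch event: %v", ev)
            }
        }
    }

As described above, the listener itself never retries on switchover, so
the caller decides when to re-issue a canceled call, as in this sketch.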
Change-Id: Ib62195b336d0754d99865d7a2a32ba2ffa3543ce
Reviewed-on: https://review.monogon.dev/c/monogon/+/185
Reviewed-by: Lorenz Brun <lorenz@nexantic.com>
diff --git a/metropolis/node/core/curator/BUILD.bazel b/metropolis/node/core/curator/BUILD.bazel
index 7ff9a0e..f52233d 100644
--- a/metropolis/node/core/curator/BUILD.bazel
+++ b/metropolis/node/core/curator/BUILD.bazel
@@ -2,31 +2,48 @@
go_library(
name = "go_default_library",
- srcs = ["curator.go"],
+ srcs = [
+ "curator.go",
+ "impl_follower.go",
+ "impl_leader.go",
+ "listener.go",
+ ],
importpath = "source.monogon.dev/metropolis/node/core/curator",
visibility = ["//visibility:public"],
deps = [
"//metropolis/node/core/consensus/client:go_default_library",
+ "//metropolis/node/core/curator/proto/api:go_default_library",
"//metropolis/node/core/curator/proto/private:go_default_library",
"//metropolis/node/core/localstorage:go_default_library",
+ "//metropolis/pkg/combinectx:go_default_library",
"//metropolis/pkg/event:go_default_library",
"//metropolis/pkg/event/memory:go_default_library",
"//metropolis/pkg/supervisor:go_default_library",
"@io_etcd_go_etcd//clientv3/concurrency:go_default_library",
+ "@org_golang_google_grpc//:go_default_library",
+ "@org_golang_google_grpc//codes:go_default_library",
+ "@org_golang_google_grpc//status:go_default_library",
"@org_golang_google_protobuf//proto:go_default_library",
],
)
go_test(
name = "go_default_test",
- srcs = ["curator_test.go"],
+ srcs = [
+ "curator_test.go",
+ "listener_test.go",
+ ],
embed = [":go_default_library"],
deps = [
"//metropolis/node/core/consensus/client:go_default_library",
+ "//metropolis/node/core/curator/proto/api:go_default_library",
"//metropolis/node/core/localstorage:go_default_library",
"//metropolis/node/core/localstorage/declarative:go_default_library",
+ "//metropolis/pkg/event/memory:go_default_library",
"//metropolis/pkg/supervisor:go_default_library",
"@io_etcd_go_etcd//clientv3:go_default_library",
"@io_etcd_go_etcd//integration:go_default_library",
+ "@org_golang_google_grpc//codes:go_default_library",
+ "@org_golang_google_grpc//status:go_default_library",
],
)
diff --git a/metropolis/node/core/curator/curator.go b/metropolis/node/core/curator/curator.go
index fe7aabf..2dc7556 100644
--- a/metropolis/node/core/curator/curator.go
+++ b/metropolis/node/core/curator/curator.go
@@ -18,9 +18,11 @@
"time"
"go.etcd.io/etcd/clientv3/concurrency"
+ "google.golang.org/grpc"
"google.golang.org/protobuf/proto"
"source.monogon.dev/metropolis/node/core/consensus/client"
+ apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
ppb "source.monogon.dev/metropolis/node/core/curator/proto/private"
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/pkg/event"
@@ -253,6 +255,19 @@
}
}()
+ // Start listener. This is a gRPC service listening on a local socket,
+ // providing the Curator API to node-local consumers and dispatching each
+ // call to a locally running leader or forwarding it to a remote leader.
+ lis := listener{
+ directory: s.config.Directory,
+ electionWatch: s.electionWatch,
+ etcd: s.config.Etcd,
+ dispatchC: make(chan dispatchRequest),
+ }
+ if err := supervisor.Run(ctx, "listener", lis.run); err != nil {
+ return fmt.Errorf("when starting listener: %w", err)
+ }
+
// Calculate effective TTL. This replicates the behaviour of clientv3's WithTTL,
// but allows us to explicitly log the used TTL.
s.ttl = int(s.config.LeaderTTL.Seconds())
@@ -274,3 +289,11 @@
supervisor.Logger(ctx).Info("Curator election restarting...")
}
}
+
+func (s *Service) DialCluster(ctx context.Context) (apb.CuratorClient, error) {
+ conn, err := grpc.DialContext(ctx, fmt.Sprintf("unix://%s", s.config.Directory.ClientSocket.FullPath()), grpc.WithInsecure())
+ if err != nil {
+ return nil, fmt.Errorf("failed to dial local curator socket: %w", err)
+ }
+ return apb.NewCuratorClient(conn), nil
+}
diff --git a/metropolis/node/core/curator/impl_follower.go b/metropolis/node/core/curator/impl_follower.go
new file mode 100644
index 0000000..85e6bf5
--- /dev/null
+++ b/metropolis/node/core/curator/impl_follower.go
@@ -0,0 +1,15 @@
+package curator
+
+import (
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+
+ apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+)
+
+type curatorFollower struct {
+}
+
+func (f *curatorFollower) Watch(req *apb.WatchRequest, srv apb.Curator_WatchServer) error {
+ return status.Error(codes.Unimplemented, "curator follower not implemented")
+}
diff --git a/metropolis/node/core/curator/impl_leader.go b/metropolis/node/core/curator/impl_leader.go
new file mode 100644
index 0000000..2f19fd5
--- /dev/null
+++ b/metropolis/node/core/curator/impl_leader.go
@@ -0,0 +1,19 @@
+package curator
+
+import (
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+
+ "source.monogon.dev/metropolis/node/core/consensus/client"
+ apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+)
+
+type curatorLeader struct {
+ lockKey string
+ lockRev int64
+ etcd client.Namespaced
+}
+
+func (l *curatorLeader) Watch(req *apb.WatchRequest, srv apb.Curator_WatchServer) error {
+ return status.Error(codes.Unimplemented, "curator leader not implemented")
+}
diff --git a/metropolis/node/core/curator/listener.go b/metropolis/node/core/curator/listener.go
new file mode 100644
index 0000000..29b1167
--- /dev/null
+++ b/metropolis/node/core/curator/listener.go
@@ -0,0 +1,274 @@
+package curator
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "net"
+
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "source.monogon.dev/metropolis/node/core/consensus/client"
+ apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+ "source.monogon.dev/metropolis/node/core/localstorage"
+ "source.monogon.dev/metropolis/pkg/combinectx"
+ "source.monogon.dev/metropolis/pkg/supervisor"
+)
+
+// listener is the curator runnable responsible for accepting gRPC
+// connections and dispatching them to one of two possible implementations:
+// - a local leader implementation backed by etcd
+// - a follower implementation that forwards the RPCs to a remote leader.
+//
+// Its goal is to make switchovers between leader and follower painless for
+// gRPC callers.
+// On switchover, pending calls are canceled with UNAVAILABLE and an error
+// message stating that the implementation has been switched over.
+// The gRPC socket always listens for connections; calls block until they can
+// be served (either locally or by forwarding).
+// No retries are attempted on switchover, as some calls might not be
+// idempotent and the caller is better equipped to know when to retry.
+type listener struct {
+ // etcd is a client to the locally running consensus (etcd) server which is used
+ // both for storing lock/leader election status and actual Curator data.
+ etcd client.Namespaced
+ // directory is the ephemeral directory in which the local gRPC socket will
+ // be available for node-local consumers.
+ directory *localstorage.EphemeralCuratorDirectory
+ // electionWatch is a function that returns an active electionWatcher for the
+ // listener to use when determining local leadership. As the listener may
+ // restart on error, this factory-function is used instead of an electionWatcher
+ // directly.
+ electionWatch func() electionWatcher
+
+ dispatchC chan dispatchRequest
+}
+
+// dispatcher is the 'listener dispatcher', the listener's runnable responsible
+// for keeping track of the currently selected curator implementation and
+// switching over when necessary.
+//
+// It listens for 'dispatch' requests from the listener's RPC handlers and
+// returns a Curator implementation that should be used to handle this request,
+// alongside a context expressing the lifespan of this implementation.
+func (l *listener) dispatcher(ctx context.Context) error {
+ supervisor.Logger(ctx).Info("Dispatcher starting...")
+
+ // Start with an empty 'active target'. This will be populated before the
+ // first dispatch request is served.
+ t := activeTarget{}
+ w := l.electionWatch()
+
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+
+ // Channel carrying electionStatus updates from the election watcher.
+ c := make(chan *electionStatus)
+ defer close(c)
+
+ go func() {
+ // Wait for initial status.
+ s, ok := <-c
+ if !ok {
+ return
+ }
+ t.switchTo(ctx, l, s)
+
+ // Respond to requests and status updates.
+ for {
+ select {
+ case r := <-l.dispatchC:
+ // Handle request.
+ r.resC <- listenerTarget{
+ ctx: *t.ctx,
+ impl: t.impl,
+ }
+ case s, ok := <-c:
+ // Handle status update, or quit on status update error.
+ if !ok {
+ return
+ }
+ t.switchTo(ctx, l, s)
+ }
+ }
+ }()
+
+ // Convert electionStatus updates from the watcher into channel sends. If we
+ // cannot retrieve the newest electionStatus, the dispatcher runnable exits.
+ for {
+ s, err := w.get(ctx)
+ if err != nil {
+ return fmt.Errorf("could not get newest electionStatus: %w", err)
+ }
+ c <- s
+ }
+}
+
+// activeTarget holds the Curator implementation currently used by the listener
+// dispatcher; its fields are nil until the first switchover.
+type activeTarget struct {
+ // context describing the lifecycle of the active implementation, or nil if the
+ // impl is nil.
+ ctx *context.Context
+ // context cancel function for ctx, or nil if ctx is nil.
+ ctxC *context.CancelFunc
+ // active Curator implementation, or nil if not yet set up.
+ impl apb.CuratorServer
+}
+
+// switchTo switches the activeTarget over to a Curator implementation as per
+// the electionStatus and leader configuration. If the activeTarget already had
+// an implementation set, its associated context is canceled.
+func (t *activeTarget) switchTo(ctx context.Context, l *listener, status *electionStatus) {
+ if t.ctxC != nil {
+ (*t.ctxC)()
+ }
+ implCtx, implCtxC := context.WithCancel(ctx)
+ t.ctx = &implCtx
+ t.ctxC = &implCtxC
+ if leader := status.leader; leader != nil {
+ supervisor.Logger(ctx).Info("Dispatcher switching over to local leader")
+ t.impl = &curatorLeader{
+ lockKey: leader.lockKey,
+ lockRev: leader.lockRev,
+ etcd: l.etcd,
+ }
+ } else {
+ supervisor.Logger(ctx).Info("Dispatcher switching over to follower")
+ t.impl = &curatorFollower{}
+ }
+}
+
+// dispatchRequest is a request sent to the dispatcher by the listener when it
+// needs an up-to-date listenerTarget to run RPC calls against.
+type dispatchRequest struct {
+ resC chan listenerTarget
+}
+
+// listenerTarget is where the listener should forward a given curator RPC. This
+// is provided by the listener dispatcher on request (on 'dispatch').
+type listenerTarget struct {
+ // ctx is the context representing the lifetime of the given impl. It will be
+ // canceled when that implementation switches over to a different one.
+ ctx context.Context
+ // impl is the CuratorServer implementation to which RPCs should be directed
+ // according to the dispatcher.
+ impl apb.CuratorServer
+}
+
+// dispatch contacts the dispatcher to retrieve an up-to-date listenerTarget.
+// This target is then used to serve RPCs. The given context is only used to
+// time out the dispatch action and does not influence the listenerTarget
+// returned.
+func (l *listener) dispatch(ctx context.Context) (*listenerTarget, error) {
+ req := dispatchRequest{
+ // resC is buffered (non-blocking) so that a slow requester does not
+ // cascade into blocking the dispatcher.
+ resC: make(chan listenerTarget, 1),
+ }
+ select {
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ case l.dispatchC <- req:
+ }
+
+ select {
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ case res := <-req.resC:
+ return &res, nil
+ }
+}
+
+// run is the listener runnable. It listens on gRPC sockets and serves RPCs.
+func (l *listener) run(ctx context.Context) error {
+ supervisor.Logger(ctx).Info("Listener starting...")
+ if err := supervisor.Run(ctx, "dispatcher", l.dispatcher); err != nil {
+ return fmt.Errorf("when starting dispatcher: %w", err)
+ }
+
+ // TODO(q3k): recreate socket if already exists? Is this needed?
+ lis, err := net.ListenUnix("unix", &net.UnixAddr{Name: l.directory.ClientSocket.FullPath(), Net: "unix"})
+ if err != nil {
+ return fmt.Errorf("failed to listen on curator listener socket: %w", err)
+ }
+
+ // TODO(q3k): run remote/public gRPC listener.
+
+ srv := grpc.NewServer()
+ apb.RegisterCuratorServer(srv, l)
+
+ if err := supervisor.Run(ctx, "local", supervisor.GRPCServer(srv, lis, true)); err != nil {
+ return fmt.Errorf("while starting local gRPC listener: %w", err)
+ }
+ supervisor.Logger(ctx).Info("Listener running.")
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+
+ // Keep the listener runnable alive, as it is the parent of the gRPC listener.
+ <-ctx.Done()
+ return ctx.Err()
+}
+
+// callImpl gets the newest listenerTarget from the dispatcher, combines the
+// given context with the context of the listenerTarget implementation and calls
+// the given function with the combined context and implementation.
+//
+// It is effectively a helper wrapper used by the Curator implementation of the
+// listener to run the RPC against the active listenerTarget.
+func (l *listener) callImpl(ctx context.Context, f func(ctx context.Context, impl apb.CuratorServer) error) error {
+ lt, err := l.dispatch(ctx)
+ // dispatch will only return errors on context cancellations.
+ if err != nil {
+ return err
+ }
+
+ ctxCombined := combinectx.Combine(ctx, lt.ctx)
+ err = f(ctxCombined, lt.impl)
+
+ // No error occurred? Nothing else to do.
+ if err == nil {
+ return nil
+ }
+ cerr := &combinectx.Error{}
+ // An error occurred. Was it a context error?
+ if errors.As(err, &cerr) {
+ if cerr.First() {
+ // Request context got canceled. Return inner context error.
+ return cerr.Unwrap()
+ } else {
+ // Leadership changed. Return an UNAVAILABLE so that the request gets retried by
+ // the caller if needed.
+ return status.Error(codes.Unavailable, "curator backend switched, request can be retried")
+ }
+ } else {
+ // Not a context error, return verbatim.
+ return err
+ }
+}
+
+// curatorWatchServer implements Curator_WatchServer but overrides the context
+// of the streaming RPC call with some other context (in this case, the combined
+// context from callImpl).
+type curatorWatchServer struct {
+ grpc.ServerStream
+ ctx context.Context
+}
+
+func (c *curatorWatchServer) Context() context.Context {
+ return c.ctx
+}
+
+func (c *curatorWatchServer) Send(m *apb.WatchEvent) error {
+ return c.ServerStream.SendMsg(m)
+}
+
+// Watch implements the Watch RPC from Curator by dispatching it against the
+// correct implementation for this curator instance.
+func (l *listener) Watch(req *apb.WatchRequest, srv apb.Curator_WatchServer) error {
+ return l.callImpl(srv.Context(), func(ctx context.Context, impl apb.CuratorServer) error {
+ return impl.Watch(req, &curatorWatchServer{
+ ServerStream: srv,
+ ctx: ctx,
+ })
+ })
+}
diff --git a/metropolis/node/core/curator/listener_test.go b/metropolis/node/core/curator/listener_test.go
new file mode 100644
index 0000000..0739d84
--- /dev/null
+++ b/metropolis/node/core/curator/listener_test.go
@@ -0,0 +1,106 @@
+package curator
+
+import (
+ "context"
+ "errors"
+ "io/ioutil"
+ "testing"
+
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+
+ apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+ "source.monogon.dev/metropolis/node/core/localstorage"
+ "source.monogon.dev/metropolis/node/core/localstorage/declarative"
+ "source.monogon.dev/metropolis/pkg/event/memory"
+ "source.monogon.dev/metropolis/pkg/supervisor"
+)
+
+// TestListenerSwitch exercises the curator listener's implementation
+// switchover functionality, notably ensuring that the correct implementation
+// is called and that the per-call context is canceled accordingly on
+// implementation switch.
+//
+// It does not test the gRPC listener socket itself or the actual
+// implementations - that is deferred to curator functionality tests.
+func TestListenerSwitch(t *testing.T) {
+ // Create ephemeral directory for curator and place it into /tmp.
+ dir := localstorage.EphemeralCuratorDirectory{}
+ // Force usage of /tmp as the temp directory root, as otherwise TMPDIR from
+ // Bazel yields a path long enough that socket binds in localstorage fail
+ // (socket paths are limited to 108 characters).
+ tmp, err := ioutil.TempDir("/tmp", "curator-test-*")
+ if err != nil {
+ t.Fatalf("TempDir: %v", err)
+ }
+ err = declarative.PlaceFS(&dir, tmp)
+ if err != nil {
+ t.Fatalf("PlaceFS: %v", err)
+ }
+
+ // Create test event value.
+ var val memory.Value
+
+ // Create DUT listener.
+ l := &listener{
+ etcd: nil,
+ directory: &dir,
+ electionWatch: func() electionWatcher {
+ return electionWatcher{
+ Watcher: val.Watch(),
+ }
+ },
+ dispatchC: make(chan dispatchRequest),
+ }
+
+ // Start listener under supervisor.
+ ctx, ctxC := context.WithCancel(context.Background())
+ defer ctxC()
+ supervisor.New(ctx, l.run)
+
+ // Begin with a follower.
+ val.Set(electionStatus{
+ follower: &electionStatusFollower{},
+ })
+
+ // Simulate a request context.
+ ctxR, ctxRC := context.WithCancel(ctx)
+
+ // Check that canceling the request unblocks a pending dispatched call.
+ errC := make(chan error)
+ go func() {
+ errC <- l.callImpl(ctxR, func(ctx context.Context, impl apb.CuratorServer) error {
+ <-ctx.Done()
+ return ctx.Err()
+ })
+ }()
+ ctxRC()
+ err = <-errC
+ if err == nil || !errors.Is(err, context.Canceled) {
+ t.Fatalf("callImpl context should have returned context error, got %v", err)
+ }
+
+ // Check that switching implementations unblocks a pending dispatched call.
+ scheduledC := make(chan struct{})
+ go func() {
+ errC <- l.callImpl(ctx, func(ctx context.Context, impl apb.CuratorServer) error {
+ close(scheduledC)
+ <-ctx.Done()
+ return ctx.Err()
+ })
+ }()
+ // Block until we actually start executing on the follower listener.
+ <-scheduledC
+ // Switch over to leader listener.
+ val.Set(electionStatus{
+ leader: &electionStatusLeader{},
+ })
+ // Check returned error.
+ err = <-errC
+ if err == nil {
+ t.Fatalf("callImpl context should have returned error, got nil")
+ }
+ if serr, ok := status.FromError(err); !ok || serr.Code() != codes.Unavailable {
+ t.Fatalf("callImpl context should have returned unavailable, got %v", err)
+ }
+}