curator: provisions for implementing multiple gRPC services
We want to run gRPC services other than just the Curator gRPC service on
the Curator leader/follower implementations.
This decouples the local types from implementing a particular gRPC
service (instead proxying through an interface) and splits out the
implementation of the Curator gRPC service from the main leader objects.
This should allow us to add an implementation of e.g. a Management gRPC
service in a testable manner (the only thing we have to dependency
inject is the leadership struct, and that's trivial to do with a simple
etcd test server).
Change-Id: Ia0ea65e40a775bf49661d0b99c0185aa83547ed0
Reviewed-on: https://review.monogon.dev/c/monogon/+/260
Reviewed-by: Mateusz Zalega <mateusz@monogon.tech>
diff --git a/metropolis/node/core/curator/BUILD.bazel b/metropolis/node/core/curator/BUILD.bazel
index aa9eb95..594989d 100644
--- a/metropolis/node/core/curator/BUILD.bazel
+++ b/metropolis/node/core/curator/BUILD.bazel
@@ -7,6 +7,7 @@
"curator.go",
"impl_follower.go",
"impl_leader.go",
+ "impl_leader_curator.go",
"listener.go",
"state_node.go",
"state_pki.go",
@@ -44,7 +45,6 @@
embed = [":go_default_library"],
deps = [
"//metropolis/node/core/consensus/client:go_default_library",
- "//metropolis/node/core/curator/proto/api:go_default_library",
"//metropolis/node/core/localstorage:go_default_library",
"//metropolis/node/core/localstorage/declarative:go_default_library",
"//metropolis/pkg/event/memory:go_default_library",
diff --git a/metropolis/node/core/curator/impl_leader.go b/metropolis/node/core/curator/impl_leader.go
index d90727b..f42da61 100644
--- a/metropolis/node/core/curator/impl_leader.go
+++ b/metropolis/node/core/curator/impl_leader.go
@@ -10,17 +10,11 @@
"google.golang.org/grpc/status"
"source.monogon.dev/metropolis/node/core/consensus/client"
- apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
- "source.monogon.dev/metropolis/pkg/event/etcd"
)
-// curatorLeader implements the curator acting as the elected leader of a
-// cluster. It performs direct reads/writes from/to etcd as long as it remains
-// leader.
-//
-// It effectively implements all the core management logic of a Metropolis
-// cluster.
-type curatorLeader struct {
+// leadership represents the curator leader's ability to perform actions as a
+// leader. It is available to all services implemented by the leader.
+type leadership struct {
// lockKey is the etcd key which backs this leader-elected instance.
lockKey string
// lockRev is the revision at which lockKey was created. The leader will use it
@@ -40,9 +34,9 @@
// txnAsLeader performs an etcd transaction guarded by continued leadership.
// lostLeadership will be returned as an error in case the leadership is lost.
-func (c *curatorLeader) txnAsLeader(ctx context.Context, ops ...clientv3.Op) (*clientv3.TxnResponse, error) {
- resp, err := c.etcd.Txn(ctx).If(
- clientv3.Compare(clientv3.CreateRevision(c.lockKey), "=", c.lockRev),
+func (l *leadership) txnAsLeader(ctx context.Context, ops ...clientv3.Op) (*clientv3.TxnResponse, error) {
+ resp, err := l.etcd.Txn(ctx).If(
+ clientv3.Compare(clientv3.CreateRevision(l.lockKey), "=", l.lockRev),
).Then(ops...).Commit()
if err != nil {
return nil, fmt.Errorf("when running leader transaction: %w", err)
@@ -63,44 +57,18 @@
return err, false
}
-func (l *curatorLeader) Watch(req *apb.WatchRequest, srv apb.Curator_WatchServer) error {
- nic, ok := req.Kind.(*apb.WatchRequest_NodeInCluster_)
- if !ok {
- return status.Error(codes.Unimplemented, "unsupported watch kind")
- }
- nodeID := nic.NodeInCluster.NodeId
- // Constructing arbitrary etcd path: this is okay, as we only have node objects
- // underneath the NodeEtcdPrefix. Worst case an attacker can do is request a node
- // that doesn't exist, and that will just hang . All access is privileged, so
- // there's also no need to filter anything.
- // TODO(q3k): formalize and strongly type etcd paths for cluster state?
- // Probably worth waiting for type parameters before attempting to do that.
- nodePath := nodeEtcdPath(nodeID)
+// curatorLeader implements the curator acting as the elected leader of a
+// cluster. It performs direct reads/writes from/to etcd as long as it remains
+// leader.
+//
+// It's made up of different subcomponents implementing gRPC services, each of
+// which has access to the leadership structure.
+type curatorLeader struct {
+ leaderCurator
+}
- value := etcd.NewValue(l.etcd, nodePath, func(data []byte) (interface{}, error) {
- return nodeUnmarshal(data)
- })
- w := value.Watch()
- defer w.Close()
-
- for {
- v, err := w.Get(srv.Context())
- if err != nil {
- if rpcErr, ok := rpcError(err); ok {
- return rpcErr
- }
- }
- node := v.(*Node)
- ev := &apb.WatchEvent{
- Nodes: []*apb.Node{
- {
- Id: node.ID(),
- Roles: node.proto().Roles,
- },
- },
- }
- if err := srv.Send(ev); err != nil {
- return err
- }
+func newCuratorLeader(l leadership) *curatorLeader {
+ return &curatorLeader{
+ leaderCurator{l},
}
}
diff --git a/metropolis/node/core/curator/impl_leader_curator.go b/metropolis/node/core/curator/impl_leader_curator.go
new file mode 100644
index 0000000..34dd62b
--- /dev/null
+++ b/metropolis/node/core/curator/impl_leader_curator.go
@@ -0,0 +1,70 @@
+package curator
+
+import (
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+
+	cpb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+	"source.monogon.dev/metropolis/pkg/event/etcd"
+)
+
+// leaderCurator implements the Curator gRPC API (cpb.Curator) as a curator
+// leader.
+type leaderCurator struct {
+	leadership
+}
+
+// Watch returns a stream of updates concerning some part of the cluster
+// managed by the curator.
+//
+// Only the NodeInCluster watch kind is handled; any other kind is rejected with
+// an Unimplemented status. Every value retrieved for the requested node is sent
+// to the client as a WatchEvent carrying the node's ID and roles. Once watching,
+// the call only returns on error: a failed retrieval, or a failed send.
+//
+// See metropolis.node.core.curator.proto.api.Curator for more information.
+func (l *leaderCurator) Watch(req *cpb.WatchRequest, srv cpb.Curator_WatchServer) error {
+	nic, ok := req.Kind.(*cpb.WatchRequest_NodeInCluster_)
+	if !ok {
+		return status.Error(codes.Unimplemented, "unsupported watch kind")
+	}
+	nodeID := nic.NodeInCluster.NodeId
+	// Constructing arbitrary etcd path: this is okay, as we only have node objects
+	// underneath the NodeEtcdPrefix. Worst case an attacker can do is request a node
+	// that doesn't exist, and that will just hang. All access is privileged, so
+	// there's also no need to filter anything.
+	// TODO(q3k): formalize and strongly type etcd paths for cluster state?
+	// Probably worth waiting for type parameters before attempting to do that.
+	nodePath := nodeEtcdPath(nodeID)
+
+	value := etcd.NewValue(l.etcd, nodePath, func(data []byte) (interface{}, error) {
+		return nodeUnmarshal(data)
+	})
+	w := value.Watch()
+	defer w.Close()
+
+	for {
+		v, err := w.Get(srv.Context())
+		if err != nil {
+			if rpcErr, ok := rpcError(err); ok {
+				return rpcErr
+			}
+			// The error has no RPC-level equivalent. Bail out regardless: v cannot be
+			// relied upon to hold a *Node when err is set, and carrying on would risk
+			// a panic on the type assertion below.
+			return status.Errorf(codes.Unavailable, "could not retrieve node: %v", err)
+		}
+		node := v.(*Node)
+		ev := &cpb.WatchEvent{
+			Nodes: []*cpb.Node{
+				{
+					Id:    node.ID(),
+					Roles: node.proto().Roles,
+				},
+			},
+		}
+		if err := srv.Send(ev); err != nil {
+			return err
+		}
+	}
+}
diff --git a/metropolis/node/core/curator/listener.go b/metropolis/node/core/curator/listener.go
index ae158b9..f2a76f5 100644
--- a/metropolis/node/core/curator/listener.go
+++ b/metropolis/node/core/curator/listener.go
@@ -11,7 +11,7 @@
"google.golang.org/grpc/status"
"source.monogon.dev/metropolis/node/core/consensus/client"
- apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+ cpb "source.monogon.dev/metropolis/node/core/curator/proto/api"
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/pkg/combinectx"
"source.monogon.dev/metropolis/pkg/supervisor"
@@ -23,13 +23,17 @@
// - a follower implementation that forwards the RPCs over to a remote leader.
//
// Its goal is to make any switches over between leader and follower painless to
-// the gRPC callers.
-// Any pending calls will be canceled with UNAVAILABLE and an error message
-// describing the fact that the implementation has been switched over.
-// The gRPC sockets will always be listening for connections, and block until
-// able to serve a request (either locally or by forwarding).
-// No retries will be attempted on switchover, as some calls might not be
-// idempotent and the caller is better equipped to know when to retry.
+// the gRPC callers. Each incoming RPC first goes into a shim defined directly
+// on the listener, then goes on to be passed into either implementation with a
+// context that is valid as long as that implementation is current.
+//
+// Any calls which are pending during a switchover will have their context
+// canceled with UNAVAILABLE and an error message describing the fact that the
+// implementation has been switched over. The gRPC sockets will always be
+// listening for connections, and block until able to serve a request (either
+// locally or by forwarding). No retries will be attempted on switchover, as
+// some calls might not be idempotent and the caller is better equipped to know
+// when to retry.
type listener struct {
// etcd is a client to the locally running consensus (etcd) server which is used
// both for storing lock/leader election status and actual Curator data.
@@ -105,6 +109,12 @@
}
}
+// services is the interface containing all gRPC services that a curator
+// must implement.
+type services interface {
+ cpb.CuratorServer
+}
+
// activeTarget is the active implementation used by the listener dispatcher, or
// nil if none is active yet.
type activeTarget struct {
@@ -114,7 +124,7 @@
// context cancel function for ctx, or nil if ctx is nil.
ctxC *context.CancelFunc
// active Curator implementation, or nil if not yet set up.
- impl apb.CuratorServer
+ impl services
}
// switchTo switches the activeTarget over to a Curator implementation as per
@@ -129,11 +139,11 @@
t.ctxC = &implCtxC
if leader := status.leader; leader != nil {
supervisor.Logger(ctx).Info("Dispatcher switching over to local leader")
- t.impl = &curatorLeader{
+ t.impl = newCuratorLeader(leadership{
lockKey: leader.lockKey,
lockRev: leader.lockRev,
etcd: l.etcd,
- }
+ })
} else {
supervisor.Logger(ctx).Info("Dispatcher switching over to follower")
t.impl = &curatorFollower{}
@@ -154,7 +164,7 @@
ctx context.Context
// impl is the CuratorServer implementation to which RPCs should be directed
// according to the dispatcher.
- impl apb.CuratorServer
+ impl services
}
// dispatch contacts the dispatcher to retrieve an up-to-date listenerTarget.
@@ -197,7 +207,7 @@
// TODO(q3k): run remote/public gRPC listener.
srv := grpc.NewServer()
- apb.RegisterCuratorServer(srv, l)
+ cpb.RegisterCuratorServer(srv, l)
if err := supervisor.Run(ctx, "local", supervisor.GRPCServer(srv, lis, true)); err != nil {
return fmt.Errorf("while starting local gRPC listener: %w", err)
@@ -210,13 +220,25 @@
return ctx.Err()
}
+// implOperation is a function passed to callImpl by a listener RPC shim. It
+// sets up and calls the appropriate RPC for the shim that it's being used in.
+//
+// Each gRPC service exposed by the Curator is implemented directly on the
+// listener as a shim, and that shim uses callImpl to execute the correct,
+// current (leader or follower) RPC call. The implOperation is defined inline in
+// each shim to perform that call, and receives the context and implementation
+// reflecting the current active implementation (leader/follower). Errors
+// returned are either returned directly or converted to an UNAVAILABLE status
+// if the error is as a result of the context being canceled due to the
+// implementation switching.
+type implOperation func(ctx context.Context, impl services) error
+
// callImpl gets the newest listenerTarget from the dispatcher, combines the
// given context with the context of the listenerTarget implementation and calls
// the given function with the combined context and implementation.
//
-// It is effectively a helper wrapper used by the Curator implementation of the
-// listener to run the RPC against the active listenerTarget.
-func (l *listener) callImpl(ctx context.Context, f func(ctx context.Context, impl apb.CuratorServer) error) error {
+// It's called by listener RPC shims.
+func (l *listener) callImpl(ctx context.Context, op implOperation) error {
lt, err := l.dispatch(ctx)
// dispatch will only return errors on context cancellations.
if err != nil {
@@ -224,7 +246,7 @@
}
ctxCombined := combinectx.Combine(ctx, lt.ctx)
- err = f(ctxCombined, lt.impl)
+ err = op(ctxCombined, lt.impl)
// No error occurred? Nothing else to do.
if err == nil {
@@ -247,9 +269,14 @@
}
}
-// curatorWatchServer implements Curator_WatchServer but overrides the context
-// of the streaming RPC call with some other context (in this case, the combined
-// context from callImpl).
+// RPC shims start here. Each method defined below is a gRPC RPC handler which
+// uses callImpl to forward the incoming RPC into the current implementation of
+// the curator (leader or follower).
+//
+// TODO(q3k): once Go 1.18 lands, simplify this using type arguments (Generics).
+
+// curatorWatchServer is a Curator_WatchServer but shimmed to use an expiring
+// context.
type curatorWatchServer struct {
grpc.ServerStream
ctx context.Context
@@ -259,17 +286,16 @@
return c.ctx
}
-func (c *curatorWatchServer) Send(m *apb.WatchEvent) error {
+func (c *curatorWatchServer) Send(m *cpb.WatchEvent) error {
return c.ServerStream.SendMsg(m)
}
-// Watch implements the Watch RPC from Curator by dispatching it against the
-// correct implementation for this curator instance.
-func (l *listener) Watch(req *apb.WatchRequest, srv apb.Curator_WatchServer) error {
- return l.callImpl(srv.Context(), func(ctx context.Context, impl apb.CuratorServer) error {
+func (l *listener) Watch(req *cpb.WatchRequest, srv cpb.Curator_WatchServer) error {
+ proxy := func(ctx context.Context, impl services) error {
return impl.Watch(req, &curatorWatchServer{
ServerStream: srv,
ctx: ctx,
})
- })
+ }
+ return l.callImpl(srv.Context(), proxy)
}
diff --git a/metropolis/node/core/curator/listener_test.go b/metropolis/node/core/curator/listener_test.go
index 7e998da..95afe85 100644
--- a/metropolis/node/core/curator/listener_test.go
+++ b/metropolis/node/core/curator/listener_test.go
@@ -9,7 +9,6 @@
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
- apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/node/core/localstorage/declarative"
"source.monogon.dev/metropolis/pkg/event/memory"
@@ -71,7 +70,7 @@
// Check that canceling the request unblocks a pending dispatched call.
errC := make(chan error)
go func() {
- errC <- l.callImpl(ctxR, func(ctx context.Context, impl apb.CuratorServer) error {
+ errC <- l.callImpl(ctxR, func(ctx context.Context, impl services) error {
<-ctx.Done()
return ctx.Err()
})
@@ -85,7 +84,7 @@
// Check that switching implementations unblocks a pending dispatched call.
scheduledC := make(chan struct{})
go func() {
- errC <- l.callImpl(ctx, func(ctx context.Context, impl apb.CuratorServer) error {
+ errC <- l.callImpl(ctx, func(ctx context.Context, impl services) error {
close(scheduledC)
<-ctx.Done()
return ctx.Err()
diff --git a/metropolis/node/core/main.go b/metropolis/node/core/main.go
index e15cd6c..106bc23 100644
--- a/metropolis/node/core/main.go
+++ b/metropolis/node/core/main.go
@@ -32,6 +32,7 @@
"golang.org/x/sys/unix"
"google.golang.org/grpc"
+
common "source.monogon.dev/metropolis/node"
"source.monogon.dev/metropolis/node/core/cluster"
"source.monogon.dev/metropolis/node/core/curator"