m/n/c/rpc: replace SetupExternalGRPC with an option generator
This is one step closer to making interactions with gRPC not magic.
We've done a similar cleanup on the client side, now we do it on server
side too.
Change-Id: I6b7d7767044db47ab6b0660fd985723a91607f71
Reviewed-on: https://review.monogon.dev/c/monogon/+/687
Tested-by: Jenkins CI
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/node/core/curator/BUILD.bazel b/metropolis/node/core/curator/BUILD.bazel
index af0e11f..8d150b4 100644
--- a/metropolis/node/core/curator/BUILD.bazel
+++ b/metropolis/node/core/curator/BUILD.bazel
@@ -40,6 +40,7 @@
"@go_googleapis//google/api/expr/v1alpha1:expr_go_proto",
"@io_etcd_go_etcd_client_v3//:client",
"@io_etcd_go_etcd_client_v3//concurrency",
+ "@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
"@org_golang_google_protobuf//proto",
diff --git a/metropolis/node/core/curator/impl_leader_test.go b/metropolis/node/core/curator/impl_leader_test.go
index cf16af8..15c05dc 100644
--- a/metropolis/node/core/curator/impl_leader_test.go
+++ b/metropolis/node/core/curator/impl_leader_test.go
@@ -139,11 +139,14 @@
// Create a curator gRPC server which performs authentication as per the created
// ServerSecurity and is backed by the created leader.
- externalSrv := sec.SetupExternalGRPC(nil, leader)
+ srv := grpc.NewServer(sec.GRPCOptions(nil)...)
+ ipb.RegisterCuratorServer(srv, leader)
+ apb.RegisterAAAServer(srv, leader)
+ apb.RegisterManagementServer(srv, leader)
// The gRPC server will listen on an internal 'loopback' buffer.
externalLis := bufconn.Listen(1024 * 1024)
go func() {
- if err := externalSrv.Serve(externalLis); err != nil {
+ if err := srv.Serve(externalLis); err != nil {
t.Fatalf("GRPC serve failed: %v", err)
}
}()
@@ -151,7 +154,7 @@
// Stop the gRPC server on context cancel.
go func() {
<-ctx.Done()
- externalSrv.Stop()
+ srv.Stop()
}()
withLocalDialer := grpc.WithContextDialer(func(_ context.Context, _ string) (net.Conn, error) {
diff --git a/metropolis/node/core/curator/listener.go b/metropolis/node/core/curator/listener.go
index 5af644b..8718efd 100644
--- a/metropolis/node/core/curator/listener.go
+++ b/metropolis/node/core/curator/listener.go
@@ -5,12 +5,16 @@
"fmt"
"net"
+ "google.golang.org/grpc"
+
"source.monogon.dev/metropolis/node"
"source.monogon.dev/metropolis/node/core/consensus"
"source.monogon.dev/metropolis/node/core/consensus/client"
+ cpb "source.monogon.dev/metropolis/node/core/curator/proto/api"
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/rpc"
"source.monogon.dev/metropolis/pkg/supervisor"
+ apb "source.monogon.dev/metropolis/proto/api"
)
// listener is the curator runnable responsible for listening for gRPC
@@ -83,7 +87,13 @@
etcd: l.etcd,
consensus: l.consensus,
}, &l.node.Node)
- runnable := supervisor.GRPCServer(sec.SetupExternalGRPC(supervisor.MustSubLogger(ctx, "rpc"), leader), lis, true)
+ logger := supervisor.MustSubLogger(ctx, "rpc")
+ srv := grpc.NewServer(sec.GRPCOptions(logger)...)
+ cpb.RegisterCuratorServer(srv, leader)
+ apb.RegisterAAAServer(srv, leader)
+ apb.RegisterManagementServer(srv, leader)
+ runnable := supervisor.GRPCServer(srv, lis, true)
+
if err := supervisor.Run(ctx, "server", runnable); err != nil {
return fmt.Errorf("could not run server: %w", err)
}
@@ -98,15 +108,18 @@
}
}
case st.follower != nil && st.follower.lock != nil:
- supervisor.Logger(ctx).Infof("This curator is a follower (leader is %q), starting proxy.", st.follower.lock.NodeId)
+ supervisor.Logger(ctx).Infof("This curator is a follower (leader is %q), starting minimal implementation.", st.follower.lock.NodeId)
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", node.CuratorServicePort))
if err != nil {
return fmt.Errorf("failed to listen on curator socket: %w", err)
}
defer lis.Close()
- follower := &curatorFollower{}
- runnable := supervisor.GRPCServer(sec.SetupExternalGRPC(supervisor.MustSubLogger(ctx, "rpc"), follower), lis, true)
+ logger := supervisor.MustSubLogger(ctx, "rpc")
+ srv := grpc.NewServer(sec.GRPCOptions(logger)...)
+ // Note: curatorFollower not created nor registered. All RPCs will respond with
+ // 'Unimplemented'.
+ runnable := supervisor.GRPCServer(srv, lis, true)
if err := supervisor.Run(ctx, "server", runnable); err != nil {
return fmt.Errorf("could not run server: %w", err)
}
diff --git a/metropolis/node/core/rpc/resolver_test.go b/metropolis/node/core/rpc/resolver_test.go
index 5acca74..8231273 100644
--- a/metropolis/node/core/rpc/resolver_test.go
+++ b/metropolis/node/core/rpc/resolver_test.go
@@ -90,7 +90,10 @@
ss := ServerSecurity{
NodeCredentials: eph.Nodes[i],
}
- servers[i] = ss.SetupExternalGRPC(nil, impls[i])
+ servers[i] = grpc.NewServer(ss.GRPCOptions(nil)...)
+ ipb.RegisterCuratorServer(servers[i], impls[i])
+ apb.RegisterAAAServer(servers[i], impls[i])
+ apb.RegisterManagementServer(servers[i], impls[i])
go func() {
if err := servers[i].Serve(listeners[i]); err != nil {
t.Fatalf("GRPC serve failed: %v", err)
diff --git a/metropolis/node/core/rpc/server_authentication.go b/metropolis/node/core/rpc/server_authentication.go
index eccf006..83afad2 100644
--- a/metropolis/node/core/rpc/server_authentication.go
+++ b/metropolis/node/core/rpc/server_authentication.go
@@ -12,10 +12,8 @@
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
- cpb "source.monogon.dev/metropolis/node/core/curator/proto/api"
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/pkg/logtree"
- apb "source.monogon.dev/metropolis/proto/api"
)
// ServerSecurity are the security options of a RPC server that will run
@@ -32,30 +30,24 @@
nodePermissions Permissions
}
-// SetupExternalGRPC returns a grpc.Server ready to listen and serve all gRPC
-// services that the cluster server implementation should run, with all calls
-// authenticated and authorized based on the data in ServerSecurity. The
-// argument 'impls' is the object implementing the gRPC APIs.
+// GRPCOptions returns a list of gRPC ServerOptions used to run a Metropolis
+// gRPC server with security and logging middleware enabled.
//
// Under the hood, this configures gRPC interceptors that verify
// metropolis.proto.ext.authorization options and authenticate/authorize
// incoming connections. It also runs the gRPC server with the correct TLS
// settings for authenticating itself to callers.
-func (s *ServerSecurity) SetupExternalGRPC(logger logtree.LeveledLogger, impls ClusterServices) *grpc.Server {
+func (s *ServerSecurity) GRPCOptions(logger logtree.LeveledLogger) []grpc.ServerOption {
externalCreds := credentials.NewTLS(&tls.Config{
Certificates: []tls.Certificate{s.NodeCredentials.TLSCredentials()},
ClientAuth: tls.RequestClientCert,
})
- srv := grpc.NewServer(
+ return []grpc.ServerOption{
grpc.Creds(externalCreds),
grpc.UnaryInterceptor(s.unaryInterceptor(logger)),
grpc.StreamInterceptor(s.streamInterceptor(logger)),
- )
- cpb.RegisterCuratorServer(srv, impls)
- apb.RegisterAAAServer(srv, impls)
- apb.RegisterManagementServer(srv, impls)
- return srv
+ }
}
// streamInterceptor returns a gRPC StreamInterceptor interface for use with
diff --git a/metropolis/node/core/rpc/server_authentication_test.go b/metropolis/node/core/rpc/server_authentication_test.go
index 247479e..a229927 100644
--- a/metropolis/node/core/rpc/server_authentication_test.go
+++ b/metropolis/node/core/rpc/server_authentication_test.go
@@ -43,7 +43,10 @@
}
impl := &testImplementation{}
- srv := ss.SetupExternalGRPC(nil, impl)
+ srv := grpc.NewServer(ss.GRPCOptions(nil)...)
+ cpb.RegisterCuratorServer(srv, impl)
+ apb.RegisterManagementServer(srv, impl)
+ apb.RegisterAAAServer(srv, impl)
lis := bufconn.Listen(1024 * 1024)
go func() {
if err := srv.Serve(lis); err != nil {