m/n/core/rpc: remove leftover local/external listener abstractions
This continues cleanup work after review.monogon.dev/624.
Change-Id: Ic38f4547627d382a4405cf4b3336aa7cac80849b
Reviewed-on: https://review.monogon.dev/c/monogon/+/629
Reviewed-by: Mateusz Zalega <mateusz@monogon.tech>
diff --git a/metropolis/node/core/curator/impl_leader_test.go b/metropolis/node/core/curator/impl_leader_test.go
index 174b53c..60b1d70 100644
--- a/metropolis/node/core/curator/impl_leader_test.go
+++ b/metropolis/node/core/curator/impl_leader_test.go
@@ -111,7 +111,7 @@
}
// Create security interceptors for gRPC listener.
- externalSec := &rpc.ExternalServerSecurity{
+ sec := &rpc.ServerSecurity{
NodeCredentials: nodeCredentials,
}
@@ -125,8 +125,8 @@
}, &nodeCredentials.Node)
// Create a curator gRPC server which performs authentication as per the created
- // listenerSecurity and is backed by the created leader.
- externalSrv := externalSec.SetupExternalGRPC(nil, leader)
+ // ServerSecurity and is backed by the created leader.
+ externalSrv := sec.SetupExternalGRPC(nil, leader)
// The gRPC server will listen on an internal 'loopback' buffer.
externalLis := bufconn.Listen(1024 * 1024)
go func() {
diff --git a/metropolis/node/core/curator/listener.go b/metropolis/node/core/curator/listener.go
index a8d7e42..705512d 100644
--- a/metropolis/node/core/curator/listener.go
+++ b/metropolis/node/core/curator/listener.go
@@ -124,7 +124,7 @@
// context cancel function for ctx, or nil if ctx is nil.
ctxC *context.CancelFunc
// active Curator implementation, or nil if not yet set up.
- impl rpc.ClusterExternalServices
+ impl rpc.ClusterServices
}
// switchTo switches the activeTarget over to a Curator implementation as per
@@ -172,7 +172,7 @@
ctx context.Context
// impl is the CuratorServer implementation to which RPCs should be directed
// according to the dispatcher.
- impl rpc.ClusterExternalServices
+ impl rpc.ClusterServices
}
// dispatch contacts the dispatcher to retrieve an up-to-date listenerTarget.
@@ -206,7 +206,7 @@
return fmt.Errorf("when starting dispatcher: %w", err)
}
- es := rpc.ExternalServerSecurity{
+ sec := rpc.ServerSecurity{
NodeCredentials: l.node,
}
@@ -217,7 +217,7 @@
}
defer lisExternal.Close()
- runnable := supervisor.GRPCServer(es.SetupExternalGRPC(supervisor.MustSubLogger(ctx, "rpc"), l), lisExternal, true)
+ runnable := supervisor.GRPCServer(sec.SetupExternalGRPC(supervisor.MustSubLogger(ctx, "rpc"), l), lisExternal, true)
return runnable(ctx)
})
if err != nil {
@@ -243,7 +243,7 @@
// returned are either returned directly or converted to an UNAVAILABLE status
// if the error is as a result of the context being canceled due to the
// implementation switching.
-type implOperation func(ctx context.Context, impl rpc.ClusterExternalServices) error
+type implOperation func(ctx context.Context, impl rpc.ClusterServices) error
// callImpl gets the newest listenerTarget from the dispatcher, combines the
// given context with the context of the listenerTarget implementation and calls
@@ -303,7 +303,7 @@
}
func (l *listener) Watch(req *cpb.WatchRequest, srv cpb.Curator_WatchServer) error {
- proxy := func(ctx context.Context, impl rpc.ClusterExternalServices) error {
+ proxy := func(ctx context.Context, impl rpc.ClusterServices) error {
return impl.Watch(req, &curatorWatchServer{
ServerStream: srv,
ctx: ctx,
@@ -334,7 +334,7 @@
}
func (l *listener) Escrow(srv apb.AAA_EscrowServer) error {
- return l.callImpl(srv.Context(), func(ctx context.Context, impl rpc.ClusterExternalServices) error {
+ return l.callImpl(srv.Context(), func(ctx context.Context, impl rpc.ClusterServices) error {
return impl.Escrow(&aaaEscrowServer{
ServerStream: srv,
ctx: ctx,
@@ -343,7 +343,7 @@
}
func (l *listener) GetRegisterTicket(ctx context.Context, req *apb.GetRegisterTicketRequest) (res *apb.GetRegisterTicketResponse, err error) {
- err = l.callImpl(ctx, func(ctx context.Context, impl rpc.ClusterExternalServices) error {
+ err = l.callImpl(ctx, func(ctx context.Context, impl rpc.ClusterServices) error {
var err2 error
res, err2 = impl.GetRegisterTicket(ctx, req)
return err2
@@ -352,7 +352,7 @@
}
func (l *listener) UpdateNodeStatus(ctx context.Context, req *cpb.UpdateNodeStatusRequest) (res *cpb.UpdateNodeStatusResponse, err error) {
- err = l.callImpl(ctx, func(ctx context.Context, impl rpc.ClusterExternalServices) error {
+ err = l.callImpl(ctx, func(ctx context.Context, impl rpc.ClusterServices) error {
var err2 error
res, err2 = impl.UpdateNodeStatus(ctx, req)
return err2
@@ -361,7 +361,7 @@
}
func (l *listener) GetClusterInfo(ctx context.Context, req *apb.GetClusterInfoRequest) (res *apb.GetClusterInfoResponse, err error) {
- err = l.callImpl(ctx, func(ctx context.Context, impl rpc.ClusterExternalServices) error {
+ err = l.callImpl(ctx, func(ctx context.Context, impl rpc.ClusterServices) error {
var err2 error
res, err2 = impl.GetClusterInfo(ctx, req)
return err2
@@ -370,7 +370,7 @@
}
func (l *listener) RegisterNode(ctx context.Context, req *cpb.RegisterNodeRequest) (res *cpb.RegisterNodeResponse, err error) {
- err = l.callImpl(ctx, func(ctx context.Context, impl rpc.ClusterExternalServices) error {
+ err = l.callImpl(ctx, func(ctx context.Context, impl rpc.ClusterServices) error {
var err2 error
res, err2 = impl.RegisterNode(ctx, req)
return err2
@@ -379,7 +379,7 @@
}
func (l *listener) CommitNode(ctx context.Context, req *cpb.CommitNodeRequest) (res *cpb.CommitNodeResponse, err error) {
- err = l.callImpl(ctx, func(ctx context.Context, impl rpc.ClusterExternalServices) error {
+ err = l.callImpl(ctx, func(ctx context.Context, impl rpc.ClusterServices) error {
var err2 error
res, err2 = impl.CommitNode(ctx, req)
return err2
@@ -401,7 +401,7 @@
}
func (l *listener) GetNodes(req *apb.GetNodesRequest, srv apb.Management_GetNodesServer) error {
- proxy := func(ctx context.Context, impl rpc.ClusterExternalServices) error {
+ proxy := func(ctx context.Context, impl rpc.ClusterServices) error {
return impl.GetNodes(req, &managementGetNodesServer{
ServerStream: srv,
ctx: ctx,
@@ -411,7 +411,7 @@
}
func (l *listener) ApproveNode(ctx context.Context, req *apb.ApproveNodeRequest) (res *apb.ApproveNodeResponse, err error) {
- err = l.callImpl(ctx, func(ctx context.Context, impl rpc.ClusterExternalServices) error {
+ err = l.callImpl(ctx, func(ctx context.Context, impl rpc.ClusterServices) error {
var err2 error
res, err2 = impl.ApproveNode(ctx, req)
return err2
diff --git a/metropolis/node/core/curator/listener_test.go b/metropolis/node/core/curator/listener_test.go
index ccd094c..3961bf8 100644
--- a/metropolis/node/core/curator/listener_test.go
+++ b/metropolis/node/core/curator/listener_test.go
@@ -57,7 +57,7 @@
// Check that canceling the request unblocks a pending dispatched call.
errC := make(chan error)
go func() {
- errC <- l.callImpl(ctxR, func(ctx context.Context, impl rpc.ClusterExternalServices) error {
+ errC <- l.callImpl(ctxR, func(ctx context.Context, impl rpc.ClusterServices) error {
<-ctx.Done()
return ctx.Err()
})
@@ -71,7 +71,7 @@
// Check that switching implementations unblocks a pending dispatched call.
scheduledC := make(chan struct{})
go func() {
- errC <- l.callImpl(ctx, func(ctx context.Context, impl rpc.ClusterExternalServices) error {
+ errC <- l.callImpl(ctx, func(ctx context.Context, impl rpc.ClusterServices) error {
close(scheduledC)
<-ctx.Done()
return ctx.Err()
diff --git a/metropolis/node/core/rpc/BUILD.bazel b/metropolis/node/core/rpc/BUILD.bazel
index e749968..4d83dfe 100644
--- a/metropolis/node/core/rpc/BUILD.bazel
+++ b/metropolis/node/core/rpc/BUILD.bazel
@@ -8,7 +8,6 @@
"peerinfo.go",
"server.go",
"server_authentication.go",
- "server_interceptors.go",
"testhelpers.go",
"trace.go",
],
diff --git a/metropolis/node/core/rpc/client.go b/metropolis/node/core/rpc/client.go
index 6f113e2..5ff71be 100644
--- a/metropolis/node/core/rpc/client.go
+++ b/metropolis/node/core/rpc/client.go
@@ -132,19 +132,8 @@
return grpc.Dial(remote, opts...)
}
-func NewNodeClient(remote string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
- opts = append(opts, grpc.WithInsecure())
- return grpc.Dial(remote, opts...)
-}
-
func NewAuthenticatedClientTest(listener *bufconn.Listener, cert tls.Certificate, ca *x509.Certificate) (*grpc.ClientConn, error) {
return NewAuthenticatedClient("local", cert, ca, grpc.WithContextDialer(func(_ context.Context, _ string) (net.Conn, error) {
return listener.Dial()
}))
}
-
-func NewNodeClientTest(listener *bufconn.Listener) (*grpc.ClientConn, error) {
- return NewNodeClient("local", grpc.WithContextDialer(func(_ context.Context, _ string) (net.Conn, error) {
- return listener.Dial()
- }))
-}
diff --git a/metropolis/node/core/rpc/server.go b/metropolis/node/core/rpc/server.go
index 0e823f4..97cb3ec 100644
--- a/metropolis/node/core/rpc/server.go
+++ b/metropolis/node/core/rpc/server.go
@@ -16,23 +16,11 @@
}
)
-// ClusterExternalServices is the interface containing all gRPC services that a
-// Metropolis Cluster implements on its external interface. With the current
-// implementation of Metropolis, this is all implemented by the Curator.
-type ClusterExternalServices interface {
+// ClusterServices is the interface containing all gRPC services that a
+// Metropolis Cluster implements. With the current implementation of Metropolis,
+// this is all implemented by the Curator.
+type ClusterServices interface {
cpb.CuratorServer
apb.AAAServer
apb.ManagementServer
}
-
-// ClusterInternalServices is the interface containing all gRPC services that a
-// Metropolis Cluster implements on its internal interface. Currently this is
-// just the Curator service.
-type ClusterInternalServices interface {
- cpb.CuratorServer
-}
-
-type ClusterServices interface {
- ClusterExternalServices
- ClusterInternalServices
-}
diff --git a/metropolis/node/core/rpc/server_authentication.go b/metropolis/node/core/rpc/server_authentication.go
index 82cdf9c..eccf006 100644
--- a/metropolis/node/core/rpc/server_authentication.go
+++ b/metropolis/node/core/rpc/server_authentication.go
@@ -18,64 +18,11 @@
apb "source.monogon.dev/metropolis/proto/api"
)
-// authenticationStrategy is implemented by ExternalServerSecurity. Historically
-// it has also been implemented by LocalServerSecurity (listening on a local
-// domain socket), but this implementation has since been removed.
-//
-// TODO(q3k): simplify this code and remove this interface now that there's only
-// ExternalServerSecurity.
-type authenticationStrategy interface {
- // getPeerInfo will be called by the stream and unary gRPC server interceptors
- // to authenticate incoming gRPC calls. It's given the gRPC context of the call
- // (therefore allowing access to information about the underlying gRPC
- // transport), and should return a PeerInfo structure describing the
- // authenticated other end of the connection, or a gRPC status if the other
- // side could not be successfully authenticated.
- //
- // The returned PeerInfo will then be used to perform authorization checks based
- // on the configured authentication of a given gRPC method, as described by the
- // metropolis.proto.ext.authorization extension. The same PeerInfo will then be
- // available to the gRPC handler for this method by retrieving it from the
- // context (via GetPeerInfo).
- getPeerInfo(ctx context.Context) (*PeerInfo, error)
-
- // getPeerInfoUnauthenticated is an equivalent to getPeerInfo, but called by the
- // interceptors when a method is marked as 'unauthenticated'. The implementation
- // should return a PeerInfo containing Unauthenticated, potentially populating
- // it with UnauthenticatedPublicKey if such a public key could be retrieved.
- getPeerInfoUnauthenticated(ctx context.Context) (*PeerInfo, error)
-}
-
-// authenticationCheck is called by the unary and server interceptors to perform
-// authentication and authorization checks for a given RPC, calling the
-// serverInterceptors' authenticate function if needed.
-func authenticationCheck(ctx context.Context, a authenticationStrategy, methodName string) (*PeerInfo, error) {
- mi, err := getMethodInfo(methodName)
- if err != nil {
- return nil, err
- }
-
- if mi.unauthenticated {
- return a.getPeerInfoUnauthenticated(ctx)
- }
-
- pi, err := a.getPeerInfo(ctx)
- if err != nil {
- return nil, err
- }
- if err := pi.CheckPermissions(mi.need); err != nil {
- return nil, err
- }
- return pi, nil
-}
-
// ServerSecurity are the security options of a RPC server that will run
// ClusterServices on a Metropolis node. It contains all the data for the
// server implementation to authenticate itself to the clients and authenticate
// and authorize clients connecting to it.
-//
-// It implements authenticationStrategy.
-type ExternalServerSecurity struct {
+type ServerSecurity struct {
// NodeCredentials which will be used to run the gRPC server, and whose CA
// certificate will be used to authenticate incoming requests.
NodeCredentials *identity.NodeCredentials
@@ -85,47 +32,143 @@
nodePermissions Permissions
}
-// SetupExternalGRPC returns a grpc.Server ready to listen and serve all external
-// gRPC APIs that the cluster server implementation should run, with all calls
-// being authenticated and authorized based on the data in ServerSecurity. The
+// SetupExternalGRPC returns a grpc.Server ready to listen and serve all gRPC
+// services that the cluster server implementation should run, with all calls
+// authenticated and authorized based on the data in ServerSecurity. The
// argument 'impls' is the object implementing the gRPC APIs.
//
-// This effectively configures gRPC interceptors that verify
+// Under the hood, this configures gRPC interceptors that verify
// metropolis.proto.ext.authorization options and authenticate/authorize
// incoming connections. It also runs the gRPC server with the correct TLS
// settings for authenticating itself to callers.
-func (l *ExternalServerSecurity) SetupExternalGRPC(logger logtree.LeveledLogger, impls ClusterExternalServices) *grpc.Server {
+func (s *ServerSecurity) SetupExternalGRPC(logger logtree.LeveledLogger, impls ClusterServices) *grpc.Server {
externalCreds := credentials.NewTLS(&tls.Config{
- Certificates: []tls.Certificate{l.NodeCredentials.TLSCredentials()},
+ Certificates: []tls.Certificate{s.NodeCredentials.TLSCredentials()},
ClientAuth: tls.RequestClientCert,
})
- s := grpc.NewServer(
+ srv := grpc.NewServer(
grpc.Creds(externalCreds),
- grpc.UnaryInterceptor(unaryInterceptor(logger, l)),
- grpc.StreamInterceptor(streamInterceptor(logger, l)),
+ grpc.UnaryInterceptor(s.unaryInterceptor(logger)),
+ grpc.StreamInterceptor(s.streamInterceptor(logger)),
)
- cpb.RegisterCuratorServer(s, impls)
- apb.RegisterAAAServer(s, impls)
- apb.RegisterManagementServer(s, impls)
- return s
+ cpb.RegisterCuratorServer(srv, impls)
+ apb.RegisterAAAServer(srv, impls)
+ apb.RegisterManagementServer(srv, impls)
+ return srv
}
-func (l *ExternalServerSecurity) getPeerInfo(ctx context.Context) (*PeerInfo, error) {
+// streamInterceptor returns a gRPC StreamInterceptor interface for use with
+// grpc.NewServer. It's applied to gRPC servers started within Metropolis,
+// notably to the Curator.
+func (s *ServerSecurity) streamInterceptor(logger logtree.LeveledLogger) grpc.StreamServerInterceptor {
+ return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+ var span *logtreeSpan
+ if logger != nil {
+ span = newLogtreeSpan(logger)
+ span.Printf("RPC invoked: streaming request: %s", info.FullMethod)
+ ss = &spanServerStream{
+ ServerStream: ss,
+ span: span,
+ }
+ }
+
+ pi, err := s.authenticationCheck(ss.Context(), info.FullMethod)
+ if err != nil {
+ if span != nil {
+ span.Printf("RPC send: authentication failed: %v", err)
+ }
+ return err
+ }
+ if span != nil {
+ span.Printf("RPC peerInfo: %s", pi.String())
+ }
+
+ return handler(srv, pi.serverStream(ss))
+ }
+}
+
+// unaryInterceptor returns a gRPC UnaryInterceptor interface for use with
+// grpc.NewServer. It's applied to gRPC servers started within Metropolis,
+// notably to the Curator.
+func (s *ServerSecurity) unaryInterceptor(logger logtree.LeveledLogger) grpc.UnaryServerInterceptor {
+ return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
+ // Inject span if we have a logger.
+ if logger != nil {
+ ctx = contextWithSpan(ctx, newLogtreeSpan(logger))
+ }
+
+ Trace(ctx).Printf("RPC invoked: unary request: %s", info.FullMethod)
+
+ // Perform authentication check and inject PeerInfo.
+ pi, err := s.authenticationCheck(ctx, info.FullMethod)
+ if err != nil {
+ Trace(ctx).Printf("RPC send: authentication failed: %v", err)
+ return nil, err
+ }
+ ctx = pi.apply(ctx)
+
+ // Log authentication information.
+ Trace(ctx).Printf("RPC peerInfo: %s", pi.String())
+
+ // Call underlying handler.
+ resp, err = handler(ctx, req)
+
+ // Log result into span.
+ if err != nil {
+ Trace(ctx).Printf("RPC send: error: %v", err)
+ } else {
+ Trace(ctx).Printf("RPC send: ok, %s", protoMessagePretty(resp))
+ }
+ return
+ }
+}
+
+// authenticationCheck is called by the unary and stream interceptors to perform
+// authentication and authorization checks for a given RPC.
+func (s *ServerSecurity) authenticationCheck(ctx context.Context, methodName string) (*PeerInfo, error) {
+ mi, err := getMethodInfo(methodName)
+ if err != nil {
+ return nil, err
+ }
+
+ if mi.unauthenticated {
+ return s.getPeerInfoUnauthenticated(ctx)
+ }
+
+ pi, err := s.getPeerInfo(ctx)
+ if err != nil {
+ return nil, err
+ }
+ if err := pi.CheckPermissions(mi.need); err != nil {
+ return nil, err
+ }
+ return pi, nil
+}
+
+// getPeerInfo is called by authenticationCheck to authenticate incoming gRPC
+// calls. It returns a PeerInfo structure describing the authenticated other end
+// of the connection, or a gRPC status if the other side could not be
+// successfully authenticated.
+//
+// The returned PeerInfo can then be used to perform authorization checks based
+// on the configured authentication of a given gRPC method, as described by the
+// metropolis.proto.ext.authorization extension.
+func (s *ServerSecurity) getPeerInfo(ctx context.Context) (*PeerInfo, error) {
cert, err := getPeerCertificate(ctx)
if err != nil {
return nil, err
}
// Ensure that the certificate is signed by the cluster CA.
- if err := cert.CheckSignatureFrom(l.NodeCredentials.ClusterCA()); err != nil {
+ if err := cert.CheckSignatureFrom(s.NodeCredentials.ClusterCA()); err != nil {
return nil, status.Errorf(codes.Unauthenticated, "certificate not signed by cluster CA: %v", err)
}
- nodepk, errNode := identity.VerifyNodeInCluster(cert, l.NodeCredentials.ClusterCA())
+ nodepk, errNode := identity.VerifyNodeInCluster(cert, s.NodeCredentials.ClusterCA())
if errNode == nil {
// This is a Metropolis node.
- np := l.nodePermissions
+ np := s.nodePermissions
if np == nil {
np = nodePermissions
}
@@ -137,7 +180,7 @@
}, nil
}
- userid, errUser := identity.VerifyUserInCluster(cert, l.NodeCredentials.ClusterCA())
+ userid, errUser := identity.VerifyUserInCluster(cert, s.NodeCredentials.ClusterCA())
if errUser == nil {
// This is a Metropolis user/manager.
return &PeerInfo{
@@ -151,7 +194,11 @@
return nil, status.Errorf(codes.Unauthenticated, "presented certificate is neither user certificate (%v) nor node certificate (%v)", errUser, errNode)
}
-func (l *ExternalServerSecurity) getPeerInfoUnauthenticated(ctx context.Context) (*PeerInfo, error) {
+// getPeerInfoUnauthenticated is an equivalent to getPeerInfo, but called when a
+// method is marked as 'unauthenticated'. The implementation should return a
+// PeerInfo containing Unauthenticated, potentially populating it with
+// UnauthenticatedPublicKey if such a public key could be retrieved.
+func (s *ServerSecurity) getPeerInfoUnauthenticated(ctx context.Context) (*PeerInfo, error) {
res := PeerInfo{
Unauthenticated: &PeerInfoUnauthenticated{},
}
diff --git a/metropolis/node/core/rpc/server_authentication_test.go b/metropolis/node/core/rpc/server_authentication_test.go
index f0ae6a2..559d4fa 100644
--- a/metropolis/node/core/rpc/server_authentication_test.go
+++ b/metropolis/node/core/rpc/server_authentication_test.go
@@ -35,7 +35,7 @@
for k, v := range nodePermissions {
permissions[k] = v
}
- ss := ExternalServerSecurity{
+ ss := ServerSecurity{
NodeCredentials: eph.Nodes[0],
nodePermissions: permissions,
}
diff --git a/metropolis/node/core/rpc/server_interceptors.go b/metropolis/node/core/rpc/server_interceptors.go
deleted file mode 100644
index 1df8e0a..0000000
--- a/metropolis/node/core/rpc/server_interceptors.go
+++ /dev/null
@@ -1,75 +0,0 @@
-package rpc
-
-import (
- "context"
-
- "google.golang.org/grpc"
-
- "source.monogon.dev/metropolis/pkg/logtree"
-)
-
-// stream implements the gRPC StreamInterceptor interface for use with
-// grpc.NewServer, based on an authenticationStrategy. It's applied to gRPC
-// servers started within Metropolis, notably to the Curator.
-func streamInterceptor(logger logtree.LeveledLogger, a authenticationStrategy) grpc.StreamServerInterceptor {
- return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
- var s *logtreeSpan
- if logger != nil {
- s = newLogtreeSpan(logger)
- s.Printf("RPC invoked: streaming request: %s", info.FullMethod)
- ss = &spanServerStream{
- ServerStream: ss,
- span: s,
- }
- }
-
- pi, err := authenticationCheck(ss.Context(), a, info.FullMethod)
- if err != nil {
- if s != nil {
- s.Printf("RPC send: authentication failed: %v", err)
- }
- return err
- }
- if s != nil {
- s.Printf("RPC peerInfo: %s", pi.String())
- }
-
- return handler(srv, pi.serverStream(ss))
- }
-}
-
-// unaryInterceptor implements the gRPC UnaryInterceptor interface for use with
-// grpc.NewServer, based on an authenticationStrategy. It's applied to gRPC
-// servers started within Metropolis, notably to the Curator.
-func unaryInterceptor(logger logtree.LeveledLogger, a authenticationStrategy) grpc.UnaryServerInterceptor {
- return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
- // Inject span if we have a logger.
- if logger != nil {
- ctx = contextWithSpan(ctx, newLogtreeSpan(logger))
- }
-
- Trace(ctx).Printf("RPC invoked: unary request: %s", info.FullMethod)
-
- // Perform authentication check and inject PeerInfo.
- pi, err := authenticationCheck(ctx, a, info.FullMethod)
- if err != nil {
- Trace(ctx).Printf("RPC send: authentication failed: %v", err)
- return nil, err
- }
- ctx = pi.apply(ctx)
-
- // Log authentication information.
- Trace(ctx).Printf("RPC peerInfo: %s", pi.String())
-
- // Call underlying handler.
- resp, err = handler(ctx, req)
-
- // Log result into span.
- if err != nil {
- Trace(ctx).Printf("RPC send: error: %v", err)
- } else {
- Trace(ctx).Printf("RPC send: ok, %s", protoMessagePretty(resp))
- }
- return
- }
-}