m/n/c/rpc: implement Span/Trace

This is a first pass at implementing basic support for
Dapper/OpenTracing/OpenTelemetry-style tracing within Metropolis RPCs.

More precisely, this implements an API to expose an RPC-local Span to
RPC handlers (unary and streaming). These Spans are currently backed by
a logtree logger, and aren't processed further (ie. there's no support
for child spans and carrying span information over the wire when
performing remote calls from an active Span). However, this allows us to
at least start emitting Span Events and use them for debugging purposes.

Since we don't yet have OpenTelemetry in our GOPATH, we reimplement a
minimum subset of the Span type that should still be compatible with
real OpenTelemetry types. Once OpenTelemetry lands in our GOPATH (by way
of it landing in k8s, for example), we'll move over to using the real
type instead. Then, we can also begin integrating with OpenTelemetry
proper, ie. start sending traces over to collectors, start
injecting/extracting span information over gRPC, etc.

Another change on top of this one actually uses the Trace(ctx)
functionality within the curator - this is just the library
implementation.

Change-Id: I85506303538aacc137a28828ab39ccfd9ff72924
Reviewed-on: https://review.monogon.dev/c/monogon/+/541
Reviewed-by: Leopold Schabel <leo@nexantic.com>
diff --git a/metropolis/node/core/curator/impl_leader_test.go b/metropolis/node/core/curator/impl_leader_test.go
index b716e19..a893b87 100644
--- a/metropolis/node/core/curator/impl_leader_test.go
+++ b/metropolis/node/core/curator/impl_leader_test.go
@@ -89,8 +89,8 @@
 	}
 	// Create a curator gRPC server which performs authentication as per the created
 	// listenerSecurity and is backed by the created leader.
-	externalSrv := externalSec.SetupExternalGRPC(leader)
-	localSrv := localSec.SetupLocalGRPC(leader)
+	externalSrv := externalSec.SetupExternalGRPC(nil, leader)
+	localSrv := localSec.SetupLocalGRPC(nil, leader)
 	// The gRPC server will listen on an internal 'loopback' buffer.
 	externalLis := bufconn.Listen(1024 * 1024)
 	localLis := bufconn.Listen(1024 * 1024)
diff --git a/metropolis/node/core/curator/listener.go b/metropolis/node/core/curator/listener.go
index ab013bb..3298d12 100644
--- a/metropolis/node/core/curator/listener.go
+++ b/metropolis/node/core/curator/listener.go
@@ -218,7 +218,7 @@
 		}
 		defer lisLocal.Close()
 
-		runnable := supervisor.GRPCServer(ls.SetupLocalGRPC(l), lisLocal, true)
+		runnable := supervisor.GRPCServer(ls.SetupLocalGRPC(nil, l), lisLocal, true)
 		return runnable(ctx)
 	})
 	if err != nil {
@@ -232,7 +232,7 @@
 		}
 		defer lisExternal.Close()
 
-		runnable := supervisor.GRPCServer(es.SetupExternalGRPC(l), lisExternal, true)
+		runnable := supervisor.GRPCServer(es.SetupExternalGRPC(nil, l), lisExternal, true)
 		return runnable(ctx)
 	})
 	if err != nil {
diff --git a/metropolis/node/core/rpc/BUILD.bazel b/metropolis/node/core/rpc/BUILD.bazel
index d281a92..e749968 100644
--- a/metropolis/node/core/rpc/BUILD.bazel
+++ b/metropolis/node/core/rpc/BUILD.bazel
@@ -8,13 +8,16 @@
         "peerinfo.go",
         "server.go",
         "server_authentication.go",
+        "server_interceptors.go",
         "testhelpers.go",
+        "trace.go",
     ],
     importpath = "source.monogon.dev/metropolis/node/core/rpc",
     visibility = ["//visibility:public"],
     deps = [
         "//metropolis/node/core/curator/proto/api:go_default_library",
         "//metropolis/node/core/identity:go_default_library",
+        "//metropolis/pkg/logtree:go_default_library",
         "//metropolis/pkg/pki:go_default_library",
         "//metropolis/proto/api:go_default_library",
         "//metropolis/proto/ext:go_default_library",
@@ -24,6 +27,7 @@
         "@org_golang_google_grpc//peer:go_default_library",
         "@org_golang_google_grpc//status:go_default_library",
         "@org_golang_google_grpc//test/bufconn:go_default_library",
+        "@org_golang_google_protobuf//encoding/prototext:go_default_library",
         "@org_golang_google_protobuf//proto:go_default_library",
         "@org_golang_google_protobuf//reflect/protoreflect:go_default_library",
         "@org_golang_google_protobuf//reflect/protoregistry:go_default_library",
@@ -32,10 +36,14 @@
 
 go_test(
     name = "go_default_test",
-    srcs = ["server_authentication_test.go"],
+    srcs = [
+        "server_authentication_test.go",
+        "trace_test.go",
+    ],
     embed = [":go_default_library"],
     deps = [
         "//metropolis/node/core/curator/proto/api:go_default_library",
+        "//metropolis/pkg/logtree:go_default_library",
         "//metropolis/proto/api:go_default_library",
         "//metropolis/proto/ext:go_default_library",
         "@org_golang_google_grpc//codes:go_default_library",
diff --git a/metropolis/node/core/rpc/server_authentication.go b/metropolis/node/core/rpc/server_authentication.go
index 6ae2618..50cf36a 100644
--- a/metropolis/node/core/rpc/server_authentication.go
+++ b/metropolis/node/core/rpc/server_authentication.go
@@ -14,6 +14,7 @@
 
 	cpb "source.monogon.dev/metropolis/node/core/curator/proto/api"
 	"source.monogon.dev/metropolis/node/core/identity"
+	"source.monogon.dev/metropolis/pkg/logtree"
 	apb "source.monogon.dev/metropolis/proto/api"
 )
 
@@ -41,34 +42,10 @@
 	getPeerInfoUnauthenticated(ctx context.Context) (*PeerInfo, error)
 }
 
-// stream implements the gRPC StreamInterceptor interface for use with
-// grpc.NewServer, based on an authenticationStrategy.
-func streamInterceptor(a authenticationStrategy) grpc.StreamServerInterceptor {
-	return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
-		pi, err := check(ss.Context(), a, info.FullMethod)
-		if err != nil {
-			return err
-		}
-		return handler(srv, pi.serverStream(ss))
-	}
-}
-
-// unaryInterceptor implements the gRPC UnaryInterceptor interface for use with
-// grpc.NewServer, based on an authenticationStrategy.
-func unaryInterceptor(a authenticationStrategy) grpc.UnaryServerInterceptor {
-	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
-		pi, err := check(ctx, a, info.FullMethod)
-		if err != nil {
-			return nil, err
-		}
-		return handler(pi.apply(ctx), req)
-	}
-}
-
-// check is called by the unary and server interceptors to perform
+// authenticationCheck is called by the unary and stream interceptors to perform
 // authentication and authorization checks for a given RPC, calling the
 // serverInterceptors' authenticate function if needed.
-func check(ctx context.Context, a authenticationStrategy, methodName string) (*PeerInfo, error) {
+func authenticationCheck(ctx context.Context, a authenticationStrategy, methodName string) (*PeerInfo, error) {
 	mi, err := getMethodInfo(methodName)
 	if err != nil {
 		return nil, err
@@ -113,7 +90,7 @@
 // metropolis.proto.ext.authorization options and authenticate/authorize
 // incoming connections. It also runs the gRPC server with the correct TLS
 // settings for authenticating itself to callers.
-func (l *ExternalServerSecurity) SetupExternalGRPC(impls ClusterExternalServices) *grpc.Server {
+func (l *ExternalServerSecurity) SetupExternalGRPC(logger logtree.LeveledLogger, impls ClusterExternalServices) *grpc.Server {
 	externalCreds := credentials.NewTLS(&tls.Config{
 		Certificates: []tls.Certificate{l.NodeCredentials.TLSCredentials()},
 		ClientAuth:   tls.RequestClientCert,
@@ -121,8 +98,8 @@
 
 	s := grpc.NewServer(
 		grpc.Creds(externalCreds),
-		grpc.UnaryInterceptor(unaryInterceptor(l)),
-		grpc.StreamInterceptor(streamInterceptor(l)),
+		grpc.UnaryInterceptor(unaryInterceptor(logger, l)),
+		grpc.StreamInterceptor(streamInterceptor(logger, l)),
 	)
 	cpb.RegisterCuratorServer(s, impls)
 	apb.RegisterAAAServer(s, impls)
@@ -237,10 +214,10 @@
 // SetupLocalGRPC returns a grpc.Server ready to listen on a local domain socket
 // and serve the Curator service. All incoming RPCs will be authenticated as
 // originating from the node for which LocalServerSecurity has been configured.
-func (l *LocalServerSecurity) SetupLocalGRPC(impls ClusterInternalServices) *grpc.Server {
+func (l *LocalServerSecurity) SetupLocalGRPC(logger logtree.LeveledLogger, impls ClusterInternalServices) *grpc.Server {
 	s := grpc.NewServer(
-		grpc.UnaryInterceptor(unaryInterceptor(l)),
-		grpc.StreamInterceptor(streamInterceptor(l)),
+		grpc.UnaryInterceptor(unaryInterceptor(logger, l)),
+		grpc.StreamInterceptor(streamInterceptor(logger, l)),
 	)
 	cpb.RegisterCuratorServer(s, impls)
 	return s
diff --git a/metropolis/node/core/rpc/server_authentication_test.go b/metropolis/node/core/rpc/server_authentication_test.go
index d383150..795b460 100644
--- a/metropolis/node/core/rpc/server_authentication_test.go
+++ b/metropolis/node/core/rpc/server_authentication_test.go
@@ -41,7 +41,7 @@
 	}
 
 	impl := &testImplementation{}
-	srv := ss.SetupExternalGRPC(impl)
+	srv := ss.SetupExternalGRPC(nil, impl)
 	lis := bufconn.Listen(1024 * 1024)
 	go func() {
 		if err := srv.Serve(lis); err != nil {
@@ -123,7 +123,7 @@
 	}
 
 	impl := &testImplementation{}
-	srv := ls.SetupLocalGRPC(impl)
+	srv := ls.SetupLocalGRPC(nil, impl)
 	lis := bufconn.Listen(1024 * 1024)
 	go func() {
 		if err := srv.Serve(lis); err != nil {
diff --git a/metropolis/node/core/rpc/server_interceptors.go b/metropolis/node/core/rpc/server_interceptors.go
new file mode 100644
index 0000000..2bc11c2
--- /dev/null
+++ b/metropolis/node/core/rpc/server_interceptors.go
@@ -0,0 +1,59 @@
+package rpc
+
+import (
+	"context"
+
+	"google.golang.org/grpc"
+
+	"source.monogon.dev/metropolis/pkg/logtree"
+)
+
+// streamInterceptor implements the gRPC StreamInterceptor interface for use
+// with grpc.NewServer, based on an authenticationStrategy. It's applied to
+// gRPC servers started within Metropolis, notably to the Curator.
+func streamInterceptor(logger logtree.LeveledLogger, a authenticationStrategy) grpc.StreamServerInterceptor {
+	return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+		pi, err := authenticationCheck(ss.Context(), a, info.FullMethod)
+		if err != nil {
+			return err
+		}
+		// Tracing is optional: the stream only gets a span if a logger was given.
+		if logger != nil {
+			s := newLogtreeSpan(logger)
+			s.Printf("RPC received: streaming request: %s", info.FullMethod)
+			ss = &spanServerStream{
+				ServerStream: ss,
+				span:         s,
+			}
+		}
+		return handler(srv, pi.serverStream(ss))
+	}
+}
+
+// unaryInterceptor builds the gRPC unary server interceptor used by Metropolis
+// servers, notably the Curator. Calls are authenticated using a; if logger is
+// non-nil, the handler context also carries a Span and the result is logged.
+func unaryInterceptor(logger logtree.LeveledLogger, a authenticationStrategy) grpc.UnaryServerInterceptor {
+	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
+		pi, err := authenticationCheck(ctx, a, info.FullMethod)
+		if err != nil {
+			return nil, err
+		}
+		ctx = pi.apply(ctx)
+		var s *logtreeSpan
+		if logger != nil {
+			s = newLogtreeSpan(logger)
+			s.Printf("RPC received: unary request: %s", info.FullMethod)
+			ctx = contextWithSpan(ctx, s)
+		}
+		resp, err = handler(ctx, req) // ctx already has pi applied above; don't apply it twice.
+		if s != nil {
+			if err != nil {
+				s.Printf("RPC send: error: %v", err)
+			} else {
+				s.Printf("RPC send: ok, %s", protoMessagePretty(resp))
+			}
+		}
+		return
+	}
+}
diff --git a/metropolis/node/core/rpc/trace.go b/metropolis/node/core/rpc/trace.go
new file mode 100644
index 0000000..65802cd
--- /dev/null
+++ b/metropolis/node/core/rpc/trace.go
@@ -0,0 +1,168 @@
+package rpc
+
+import (
+	"context"
+	"fmt"
+	"strings"
+	"time"
+
+	"google.golang.org/grpc"
+	"google.golang.org/protobuf/encoding/prototext"
+	"google.golang.org/protobuf/proto"
+
+	"source.monogon.dev/metropolis/pkg/logtree"
+)
+
+// Span implements a compatible subset of
+// go.opentelemetry.io/otel/trace.Span.
+//
+// It is used in place of trace.Span until opentelemetry support
+// is fully implemented and thus the library is pulled in. Once
+// that happens, all relevant methods will be replaced with an
+// embedding of the trace.Span interface.
+type Span interface {
+	// End() not implemented.
+
+	// AddEvent adds an event with the provided name.
+	//
+	// Changed from otel/trace.Span: no options.
+	AddEvent(name string)
+
+	// IsRecording returns the recording state of the Span. It will return true if
+	// the Span is active and events can be recorded.
+	IsRecording() bool
+
+	// RecordError() not implemented.
+
+	// SpanContext() not implemented.
+
+	// SetStatus() not implemented.
+
+	// SetName() not implemented.
+
+	// SetAttributes() not implemented.
+
+	// TracerProvider() not implemented.
+
+	// Monogon extensions follow. These call into standard otel.Span methods
+	// (and effectively the underlying model), but provide a tighter API for
+	// Metropolis.
+
+	// Printf adds an event via AddEvent after performing a string format expansion
+	// via fmt.Sprintf. The formatting is performed during the call if the span is
+	// recording, or never if it isn't.
+	Printf(format string, a ...interface{})
+}
+
+// logtreeSpan is an implementation of Span which just forwards events into a
+// local logtree LeveledLogger. A span is recording iff its logger is non-nil.
+//
+// This is a stop-gap implementation to introduce gRPC trace-based
+// logging/metrics into Metropolis which can then be evolved into a full-blown
+// opentelemetry implementation.
+type logtreeSpan struct {
+	// logger is the logtree LeveledLogger backing this span. All Events added into
+	// the Span will go straight into that logger. If the logger is nil, all events
+	// will be dropped instead.
+	logger logtree.LeveledLogger
+	// uid is the span ID of this logtreeSpan. newLogtreeSpan currently sets it to
+	// the current time in nanoseconds since the Unix epoch, but this might change
+	// in the future. This field is ignored if logger is nil.
+	uid uint64
+}
+
+// newLogtreeSpan returns a span logging to l, with an ID based on the clock.
+func newLogtreeSpan(l logtree.LeveledLogger) *logtreeSpan {
+	return &logtreeSpan{
+		logger: l,
+		uid:    uint64(time.Now().UnixNano()),
+	}
+}
+
+// AddEvent logs name to the backing logger, or drops it if there is none.
+func (l *logtreeSpan) AddEvent(name string) {
+	if l.logger != nil {
+		l.logger.WithAddedStackDepth(1).Infof("Span %x: %s", l.uid, name)
+	}
+}
+
+// Printf formats an event like fmt.Sprintf and logs it, if there is a logger.
+func (l *logtreeSpan) Printf(format string, a ...interface{}) {
+	if l.logger != nil {
+		l.logger.WithAddedStackDepth(1).Infof("Span %x: %s", l.uid, fmt.Sprintf(format, a...))
+	}
+}
+
+// IsRecording reports whether this span has a logger to record events into.
+func (l *logtreeSpan) IsRecording() bool {
+	return l.logger != nil
+}
+
+type spanKey string
+// spanKeyValue is the context key under which the active span is stored.
+var spanKeyValue spanKey = "metropolis-trace-span"
+
+// contextWithSpan wraps a given context with a given logtreeSpan. This
+// logtreeSpan will be returned by Trace() calls on the returned context.
+func contextWithSpan(ctx context.Context, s *logtreeSpan) context.Context {
+	return context.WithValue(ctx, spanKeyValue, s)
+}
+
+// Trace looks up the Span attached to ctx. When ctx carries no Span, it
+// returns an empty, non-recording span on which every operation is a no-op.
+//
+// The result is never a nil interface: it is either the stored *logtreeSpan
+// or a freshly allocated empty one.
+func Trace(ctx context.Context) Span {
+	// A failed assertion covers both a missing value and one of the wrong type.
+	if s, ok := ctx.Value(spanKeyValue).(*logtreeSpan); ok {
+		return s
+	}
+	return &logtreeSpan{}
+}
+
+
+// spanServerStream is a grpc.ServerStream wrapper which exposes a logtreeSpan
+// through Context() and logs messages to it: SendMsg before sending, RecvMsg
+// after receiving. NOTE: RecvMsg logs even if the underlying call failed.
+type spanServerStream struct {
+	grpc.ServerStream
+	span *logtreeSpan
+}
+
+func (s *spanServerStream) Context() context.Context {
+	return contextWithSpan(s.ServerStream.Context(), s.span)
+}
+
+func (s *spanServerStream) SendMsg(m interface{}) error {
+	s.span.Printf("RPC send: %s", protoMessagePretty(m))
+	return s.ServerStream.SendMsg(m)
+}
+
+func (s *spanServerStream) RecvMsg(m interface{}) error {
+	err := s.ServerStream.RecvMsg(m)
+	s.span.Printf("RPC recv: %s", protoMessagePretty(m))
+	return err
+}
+
+// protoMessagePretty attempts to pretty-print a given proto message into a
+// one-line string. The returned format is not guaranteed to be stable, and is
+// only intended to be used for debug purposes by operators.
+//
+// TODO(q3k): make this not print any confidential fields (once we have any),
+// eg. via extensions/annotations.
+func protoMessagePretty(m interface{}) string {
+	if m == nil {
+		return "nil"
+	}
+	v, ok := m.(proto.Message)
+	if !ok {
+		return "invalid"
+	}
+	name := string(v.ProtoReflect().Type().Descriptor().Name())
+	bytes, err := prototext.Marshal(v)
+	if err != nil {
+		return name
+	}
+	return fmt.Sprintf("%s: %s", name, strings.ReplaceAll(string(bytes), "\n", " "))
+}
diff --git a/metropolis/node/core/rpc/trace_test.go b/metropolis/node/core/rpc/trace_test.go
new file mode 100644
index 0000000..750ffe8
--- /dev/null
+++ b/metropolis/node/core/rpc/trace_test.go
@@ -0,0 +1,77 @@
+package rpc
+
+import (
+	"context"
+	"fmt"
+	"strings"
+	"testing"
+
+	"source.monogon.dev/metropolis/pkg/logtree"
+)
+
+// TestSpanRecording checks that an event emitted on a logtree-backed span ends
+// up in the logtree, attributed to the caller's source location.
+func TestSpanRecording(t *testing.T) {
+	lt := logtree.New()
+	span := newLogtreeSpan(lt.MustLeveledFor("test"))
+	span.Printf("hello world")
+
+	r, err := lt.Read("test", logtree.WithBacklog(logtree.BacklogAllAvailable))
+	if err != nil {
+		t.Fatalf("logtree read failed: %v", err)
+	}
+	defer r.Close()
+
+	// Scan the backlog for the exact line the span should have produced.
+	want := fmt.Sprintf("Span %x: hello world", span.uid)
+	seen := false
+	for _, entry := range r.Backlog {
+		lv := entry.Leveled
+		if entry.DN != "test" || lv == nil || lv.MessagesJoined() != want {
+			continue
+		}
+		// The span adds stack depth so that the log location points at the code
+		// emitting the event (this file) rather than at the span implementation.
+		loc := lv.Location()
+		if file := strings.Split(loc, ":")[0]; file != "trace_test.go" {
+			t.Errorf("Trace/log location is %s, wanted something in trace_test.go", loc)
+		}
+		seen = true
+		break
+	}
+	if !seen {
+		t.Fatalf("did not find expected logline")
+	}
+}
+
+// TestSpanContext round-trips a span through a context via Trace.
+func TestSpanContext(t *testing.T) {
+	lt := logtree.New()
+	injected := newLogtreeSpan(lt.MustLeveledFor("test"))
+	ctx := contextWithSpan(context.Background(), injected)
+
+	got := Trace(ctx)
+	if !got.IsRecording() {
+		t.Errorf("Expected span to be active")
+	}
+
+	// Trace must hand back the very same object, not a copy or a wrapper.
+	extracted, ok := got.(*logtreeSpan)
+	if !ok {
+		t.Fatalf("Retrieved span is not *logtreeSpan")
+	}
+	if extracted != injected {
+		t.Fatalf("Retrieved span differs from injected span")
+	}
+}
+
+// TestSpanContextFallback checks that Trace on a context without any span
+// yields an inactive span on which events are silently dropped.
+func TestSpanContextFallback(t *testing.T) {
+	empty := context.Background()
+	// This must never panic; the event is simply dropped.
+	Trace(empty).Printf("plonk")
+	if active := Trace(empty).IsRecording(); active {
+		t.Errorf("Expected span to be inactive")
+	}
+}