m/n/c/rpc: implement Span/Trace

This is a first pass at implementing basic support for
Dapper/OpenTracing/OpenTelemetry-style tracing within Metropolis RPCs.

More precisely, this implements an API to expose an RPC-local Span to
RPC handlers (unary and streaming). These Spans are currently backed by
a logtree logger, and aren't processed further (ie. there's no support
for child spans and carrying span information over the wire when
performing remote calls from an active Span). However, this allows us to
at least start emitting Span Events and use them for debugging purposes.

Since we don't yet have OpenTelemetry in our GOPATH, we reimplement a
minimum subset of the Span type that should still be compatible with
real OpenTelemetry types. Once OpenTelemetry lands in our GOPATH (by way
of it landing in k8s, for example), we'll move over to using the real
type instead. Then, we can also begin integrating with OpenTelemetry
proper, ie. start sending traces over to collectors, start
injecting/extracing span information over gRPC, etc.

Another change on top of this one actually uses the Trace(ctx)
functionality within the curator - this is just the library
implementation.

Change-Id: I85506303538aacc137a28828ab39ccfd9ff72924
Reviewed-on: https://review.monogon.dev/c/monogon/+/541
Reviewed-by: Leopold Schabel <leo@nexantic.com>
diff --git a/metropolis/node/core/rpc/trace.go b/metropolis/node/core/rpc/trace.go
new file mode 100644
index 0000000..65802cd
--- /dev/null
+++ b/metropolis/node/core/rpc/trace.go
@@ -0,0 +1,168 @@
+package rpc
+
+import (
+	"context"
+	"fmt"
+	"strings"
+	"time"
+
+	"google.golang.org/grpc"
+	"google.golang.org/protobuf/encoding/prototext"
+	"google.golang.org/protobuf/proto"
+
+	"source.monogon.dev/metropolis/pkg/logtree"
+)
+
+// Span implements a compatible subset of
+// go.opentelemetry.io/otel/trace.Span.
+
+// It is used in place of trace.Span until opentelemetry support
+// is fully implemented and thus the library is pulled in. Once
+// that happens, all relevant methods will be replace with an
+// embedding of the trace.Span interface.
+type Span interface {
+	// End() not implemented.
+
+	// AddEvent adds an event with the provided name.
+	//
+	// Changed from otel/trace.Span: no options.
+	AddEvent(name string)
+
+	// IsRecording returns the recording state of the Span. It will return true if
+	// the Span is active and events can be recorded.
+	IsRecording() bool
+
+	// RecordError() not implemented.
+
+	// SpanContext() not implemented.
+
+	// SetStatus() not implemented.
+
+	// SetName() not implemented.
+
+	// SetAttributes() not implemented.
+
+	// TraceProvider() not implemented.
+
+	// Monogon extensions follow. These call into standard otel.Span methods
+	// (and effectively underlying model), but provide tighter API for
+	// Metropolis.
+
+	// Printf adds an event via AddEvent after performing a string format expansion
+	// via fmt.Sprintf. The formatting is performed during the call if the span is
+	// recording, or never if it isn't.
+	Printf(format string, a ...interface{})
+}
+
+// logtreeSpan is an implementation of Span which just forwards events into a
+// local logtree LeveledLogger. All spans are always recording.
+//
+// This is a stop-gap implementation to introduce gRPC trace-based
+// logging/metrics into Metropolis which can then be evolved into a full-blown
+// opentelemetry implementation.
+type logtreeSpan struct {
+	// logger is the logtree LeveledLogger backing this span. All Events added into
+	// the Span will go straight into that logger. If the logger is nil, all events
+	// will be dropped instead.
+	logger logtree.LeveledLogger
+	// uid is the span ID of this logtreeSpan. Currently this is a monotonic counter
+	// based on the current nanosecond epoch, but this might change in the future.
+	// This field is ignored if logger is nil.
+	uid uint64
+}
+
+func newLogtreeSpan(l logtree.LeveledLogger) *logtreeSpan {
+	uid := uint64(time.Now().UnixNano())
+	return &logtreeSpan{
+		logger: l,
+		uid:    uid,
+	}
+}
+
+func (l *logtreeSpan) AddEvent(name string) {
+	if l.logger == nil {
+		return
+	}
+	l.logger.WithAddedStackDepth(1).Infof("Span %x: %s", l.uid, name)
+}
+
+func (l *logtreeSpan) Printf(format string, a ...interface{}) {
+	if l.logger == nil {
+		return
+	}
+	msg := fmt.Sprintf(format, a...)
+	l.logger.WithAddedStackDepth(1).Infof("Span %x: %s", l.uid, msg)
+}
+
+func (l *logtreeSpan) IsRecording() bool {
+	return l.logger != nil
+}
+
+type spanKey string
+
+var spanKeyValue spanKey = "metropolis-trace-span"
+
+// contextWithSpan wraps a given context with a given logtreeSpan. This
+// logtreeSpan will be returned by Trace() calls on the returned context.
+func contextWithSpan(ctx context.Context, s *logtreeSpan) context.Context {
+	return context.WithValue(ctx, spanKeyValue, s)
+}
+
+// Trace returns the active Span for the current Go context. If no Span was set
+// up for this context, an inactive/empty span object is returned, on which
+// every operation is a no-op.
+func Trace(ctx context.Context) Span {
+	v := ctx.Value(spanKeyValue)
+	if v == nil {
+		return &logtreeSpan{}
+	}
+	if s, ok := v.(*logtreeSpan); ok {
+		return s
+	}
+	return &logtreeSpan{}
+}
+
+// spanServerStream is a grpc.ServerStream wrapper which contains some
+// logtreeSpan, and returns it as part of the Context() of the ServerStream. It
+// also intercepts SendMsg/RecvMsg and logs them to the same span.
+type spanServerStream struct {
+	grpc.ServerStream
+	span *logtreeSpan
+}
+
+func (s *spanServerStream) Context() context.Context {
+	return contextWithSpan(s.ServerStream.Context(), s.span)
+}
+
+func (s *spanServerStream) SendMsg(m interface{}) error {
+	s.span.Printf("RPC send: %s", protoMessagePretty(m))
+	return s.ServerStream.SendMsg(m)
+}
+
+func (s *spanServerStream) RecvMsg(m interface{}) error {
+	err := s.ServerStream.RecvMsg(m)
+	s.span.Printf("RPC recv: %s", protoMessagePretty(m))
+	return err
+}
+
+// protoMessagePretty attempts to pretty-print a given proto message into a
+// one-line string. The returned format is not guaranteed to be stable, and is
+// only intended to be used for debug purposes by operators.
+//
+// TODO(q3k): make this not print any confidential fields (once we have any),
+// eg. via extensions/annotations.
+func protoMessagePretty(m interface{}) string {
+	if m == nil {
+		return "nil"
+	}
+	v, ok := m.(proto.Message)
+	if !ok {
+		return "invalid"
+	}
+	name := string(v.ProtoReflect().Type().Descriptor().Name())
+	bytes, err := prototext.Marshal(v)
+	if err != nil {
+		return name
+	}
+	return fmt.Sprintf("%s: %s", name, strings.ReplaceAll(string(bytes), "\n", " "))
+}