Serge Bazanski | fb0fb6d | 2022-02-18 12:11:28 +0100 | [diff] [blame] | 1 | package rpc |
| 2 | |
| 3 | import ( |
| 4 | "context" |
| 5 | "fmt" |
| 6 | "strings" |
| 7 | "time" |
| 8 | |
| 9 | "google.golang.org/grpc" |
| 10 | "google.golang.org/protobuf/encoding/prototext" |
| 11 | "google.golang.org/protobuf/proto" |
| 12 | |
| 13 | "source.monogon.dev/metropolis/pkg/logtree" |
| 14 | ) |
| 15 | |
// Span implements a compatible subset of
// go.opentelemetry.io/otel/trace.Span.
//
// It is used in place of trace.Span until opentelemetry support
// is fully implemented and thus the library is pulled in. Once
// that happens, all relevant methods will be replaced with an
// embedding of the trace.Span interface.
type Span interface {
	// End() not implemented.

	// AddEvent adds an event with the provided name.
	//
	// Changed from otel/trace.Span: no options.
	AddEvent(name string)

	// IsRecording returns the recording state of the Span. It will return true if
	// the Span is active and events can be recorded.
	IsRecording() bool

	// RecordError() not implemented.

	// SpanContext() not implemented.

	// SetStatus() not implemented.

	// SetName() not implemented.

	// SetAttributes() not implemented.

	// TraceProvider() not implemented.

	// Monogon extensions follow. These call into standard otel.Span methods
	// (and effectively the underlying model), but provide a tighter API for
	// Metropolis.

	// Printf adds an event via AddEvent after performing a string format expansion
	// via fmt.Sprintf. The formatting is performed during the call if the span is
	// recording, or never if it isn't.
	Printf(format string, a ...interface{})
}
| 56 | |
| 57 | // logtreeSpan is an implementation of Span which just forwards events into a |
| 58 | // local logtree LeveledLogger. All spans are always recording. |
| 59 | // |
| 60 | // This is a stop-gap implementation to introduce gRPC trace-based |
| 61 | // logging/metrics into Metropolis which can then be evolved into a full-blown |
| 62 | // opentelemetry implementation. |
| 63 | type logtreeSpan struct { |
| 64 | // logger is the logtree LeveledLogger backing this span. All Events added into |
| 65 | // the Span will go straight into that logger. If the logger is nil, all events |
| 66 | // will be dropped instead. |
| 67 | logger logtree.LeveledLogger |
| 68 | // uid is the span ID of this logtreeSpan. Currently this is a monotonic counter |
| 69 | // based on the current nanosecond epoch, but this might change in the future. |
| 70 | // This field is ignored if logger is nil. |
| 71 | uid uint64 |
| 72 | } |
| 73 | |
| 74 | func newLogtreeSpan(l logtree.LeveledLogger) *logtreeSpan { |
| 75 | uid := uint64(time.Now().UnixNano()) |
| 76 | return &logtreeSpan{ |
| 77 | logger: l, |
| 78 | uid: uid, |
| 79 | } |
| 80 | } |
| 81 | |
| 82 | func (l *logtreeSpan) AddEvent(name string) { |
| 83 | if l.logger == nil { |
| 84 | return |
| 85 | } |
| 86 | l.logger.WithAddedStackDepth(1).Infof("Span %x: %s", l.uid, name) |
| 87 | } |
| 88 | |
| 89 | func (l *logtreeSpan) Printf(format string, a ...interface{}) { |
| 90 | if l.logger == nil { |
| 91 | return |
| 92 | } |
| 93 | msg := fmt.Sprintf(format, a...) |
| 94 | l.logger.WithAddedStackDepth(1).Infof("Span %x: %s", l.uid, msg) |
| 95 | } |
| 96 | |
| 97 | func (l *logtreeSpan) IsRecording() bool { |
| 98 | return l.logger != nil |
| 99 | } |
| 100 | |
// spanKey is the unexported type of the context key under which the active
// span is stored. Using a dedicated type keeps the key from colliding with
// plain string keys set by other code.
type spanKey string

// spanKeyValue is the context key used by contextWithSpan and Trace.
var spanKeyValue = spanKey("metropolis-trace-span")
| 104 | |
| 105 | // contextWithSpan wraps a given context with a given logtreeSpan. This |
| 106 | // logtreeSpan will be returned by Trace() calls on the returned context. |
| 107 | func contextWithSpan(ctx context.Context, s *logtreeSpan) context.Context { |
| 108 | return context.WithValue(ctx, spanKeyValue, s) |
| 109 | } |
| 110 | |
| 111 | // Trace returns the active Span for the current Go context. If no Span was set |
| 112 | // up for this context, an inactive/empty span object is returned, on which |
| 113 | // every operation is a no-op. |
| 114 | func Trace(ctx context.Context) Span { |
| 115 | v := ctx.Value(spanKeyValue) |
| 116 | if v == nil { |
| 117 | return &logtreeSpan{} |
| 118 | } |
| 119 | if s, ok := v.(*logtreeSpan); ok { |
| 120 | return s |
| 121 | } |
| 122 | return &logtreeSpan{} |
| 123 | } |
| 124 | |
| 125 | // spanServerStream is a grpc.ServerStream wrapper which contains some |
| 126 | // logtreeSpan, and returns it as part of the Context() of the ServerStream. It |
| 127 | // also intercepts SendMsg/RecvMsg and logs them to the same span. |
| 128 | type spanServerStream struct { |
| 129 | grpc.ServerStream |
| 130 | span *logtreeSpan |
| 131 | } |
| 132 | |
| 133 | func (s *spanServerStream) Context() context.Context { |
| 134 | return contextWithSpan(s.ServerStream.Context(), s.span) |
| 135 | } |
| 136 | |
| 137 | func (s *spanServerStream) SendMsg(m interface{}) error { |
| 138 | s.span.Printf("RPC send: %s", protoMessagePretty(m)) |
| 139 | return s.ServerStream.SendMsg(m) |
| 140 | } |
| 141 | |
| 142 | func (s *spanServerStream) RecvMsg(m interface{}) error { |
| 143 | err := s.ServerStream.RecvMsg(m) |
| 144 | s.span.Printf("RPC recv: %s", protoMessagePretty(m)) |
| 145 | return err |
| 146 | } |
| 147 | |
| 148 | // protoMessagePretty attempts to pretty-print a given proto message into a |
| 149 | // one-line string. The returned format is not guaranteed to be stable, and is |
| 150 | // only intended to be used for debug purposes by operators. |
| 151 | // |
| 152 | // TODO(q3k): make this not print any confidential fields (once we have any), |
| 153 | // eg. via extensions/annotations. |
| 154 | func protoMessagePretty(m interface{}) string { |
| 155 | if m == nil { |
| 156 | return "nil" |
| 157 | } |
| 158 | v, ok := m.(proto.Message) |
| 159 | if !ok { |
| 160 | return "invalid" |
| 161 | } |
| 162 | name := string(v.ProtoReflect().Type().Descriptor().Name()) |
| 163 | bytes, err := prototext.Marshal(v) |
| 164 | if err != nil { |
| 165 | return name |
| 166 | } |
| 167 | return fmt.Sprintf("%s: %s", name, strings.ReplaceAll(string(bytes), "\n", " ")) |
| 168 | } |