Girish Kumar | 7424065 | 2020-07-10 11:54:28 +0000 | [diff] [blame] | 1 | // Copyright 2017 Michal Witkowski. All Rights Reserved. |
| 2 | // See LICENSE for licensing terms. |
| 3 | |
| 4 | package grpc_opentracing |
| 5 | |
| 6 | import ( |
khenaidoo | 2672188 | 2021-08-11 17:42:52 -0400 | [diff] [blame] | 7 | "context" |
Girish Kumar | 7424065 | 2020-07-10 11:54:28 +0000 | [diff] [blame] | 8 | "io" |
| 9 | "sync" |
| 10 | |
| 11 | "github.com/grpc-ecosystem/go-grpc-middleware/util/metautils" |
| 12 | opentracing "github.com/opentracing/opentracing-go" |
| 13 | "github.com/opentracing/opentracing-go/ext" |
| 14 | "github.com/opentracing/opentracing-go/log" |
Girish Kumar | 7424065 | 2020-07-10 11:54:28 +0000 | [diff] [blame] | 15 | "google.golang.org/grpc" |
| 16 | "google.golang.org/grpc/grpclog" |
| 17 | "google.golang.org/grpc/metadata" |
| 18 | ) |
| 19 | |
| 20 | // UnaryClientInterceptor returns a new unary client interceptor for OpenTracing. |
| 21 | func UnaryClientInterceptor(opts ...Option) grpc.UnaryClientInterceptor { |
| 22 | o := evaluateOptions(opts) |
| 23 | return func(parentCtx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { |
| 24 | if o.filterOutFunc != nil && !o.filterOutFunc(parentCtx, method) { |
| 25 | return invoker(parentCtx, method, req, reply, cc, opts...) |
| 26 | } |
| 27 | newCtx, clientSpan := newClientSpanFromContext(parentCtx, o.tracer, method) |
khenaidoo | 2672188 | 2021-08-11 17:42:52 -0400 | [diff] [blame] | 28 | if o.unaryRequestHandlerFunc != nil { |
| 29 | o.unaryRequestHandlerFunc(clientSpan, req) |
| 30 | } |
Girish Kumar | 7424065 | 2020-07-10 11:54:28 +0000 | [diff] [blame] | 31 | err := invoker(newCtx, method, req, reply, cc, opts...) |
| 32 | finishClientSpan(clientSpan, err) |
| 33 | return err |
| 34 | } |
| 35 | } |
| 36 | |
| 37 | // StreamClientInterceptor returns a new streaming client interceptor for OpenTracing. |
| 38 | func StreamClientInterceptor(opts ...Option) grpc.StreamClientInterceptor { |
| 39 | o := evaluateOptions(opts) |
| 40 | return func(parentCtx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { |
| 41 | if o.filterOutFunc != nil && !o.filterOutFunc(parentCtx, method) { |
| 42 | return streamer(parentCtx, desc, cc, method, opts...) |
| 43 | } |
| 44 | newCtx, clientSpan := newClientSpanFromContext(parentCtx, o.tracer, method) |
| 45 | clientStream, err := streamer(newCtx, desc, cc, method, opts...) |
| 46 | if err != nil { |
| 47 | finishClientSpan(clientSpan, err) |
| 48 | return nil, err |
| 49 | } |
| 50 | return &tracedClientStream{ClientStream: clientStream, clientSpan: clientSpan}, nil |
| 51 | } |
| 52 | } |
| 53 | |
| 54 | // type serverStreamingRetryingStream is the implementation of grpc.ClientStream that acts as a |
| 55 | // proxy to the underlying call. If any of the RecvMsg() calls fail, it will try to reestablish |
| 56 | // a new ClientStream according to the retry policy. |
| 57 | type tracedClientStream struct { |
| 58 | grpc.ClientStream |
| 59 | mu sync.Mutex |
| 60 | alreadyFinished bool |
| 61 | clientSpan opentracing.Span |
| 62 | } |
| 63 | |
| 64 | func (s *tracedClientStream) Header() (metadata.MD, error) { |
| 65 | h, err := s.ClientStream.Header() |
| 66 | if err != nil { |
| 67 | s.finishClientSpan(err) |
| 68 | } |
| 69 | return h, err |
| 70 | } |
| 71 | |
| 72 | func (s *tracedClientStream) SendMsg(m interface{}) error { |
| 73 | err := s.ClientStream.SendMsg(m) |
| 74 | if err != nil { |
| 75 | s.finishClientSpan(err) |
| 76 | } |
| 77 | return err |
| 78 | } |
| 79 | |
| 80 | func (s *tracedClientStream) CloseSend() error { |
| 81 | err := s.ClientStream.CloseSend() |
khenaidoo | 2672188 | 2021-08-11 17:42:52 -0400 | [diff] [blame] | 82 | s.finishClientSpan(err) |
Girish Kumar | 7424065 | 2020-07-10 11:54:28 +0000 | [diff] [blame] | 83 | return err |
| 84 | } |
| 85 | |
| 86 | func (s *tracedClientStream) RecvMsg(m interface{}) error { |
| 87 | err := s.ClientStream.RecvMsg(m) |
| 88 | if err != nil { |
| 89 | s.finishClientSpan(err) |
| 90 | } |
| 91 | return err |
| 92 | } |
| 93 | |
| 94 | func (s *tracedClientStream) finishClientSpan(err error) { |
| 95 | s.mu.Lock() |
| 96 | defer s.mu.Unlock() |
| 97 | if !s.alreadyFinished { |
| 98 | finishClientSpan(s.clientSpan, err) |
| 99 | s.alreadyFinished = true |
| 100 | } |
| 101 | } |
| 102 | |
| 103 | // ClientAddContextTags returns a context with specified opentracing tags, which |
| 104 | // are used by UnaryClientInterceptor/StreamClientInterceptor when creating a |
| 105 | // new span. |
| 106 | func ClientAddContextTags(ctx context.Context, tags opentracing.Tags) context.Context { |
| 107 | return context.WithValue(ctx, clientSpanTagKey{}, tags) |
| 108 | } |
| 109 | |
| 110 | type clientSpanTagKey struct{} |
| 111 | |
| 112 | func newClientSpanFromContext(ctx context.Context, tracer opentracing.Tracer, fullMethodName string) (context.Context, opentracing.Span) { |
| 113 | var parentSpanCtx opentracing.SpanContext |
| 114 | if parent := opentracing.SpanFromContext(ctx); parent != nil { |
| 115 | parentSpanCtx = parent.Context() |
| 116 | } |
| 117 | opts := []opentracing.StartSpanOption{ |
| 118 | opentracing.ChildOf(parentSpanCtx), |
| 119 | ext.SpanKindRPCClient, |
| 120 | grpcTag, |
| 121 | } |
| 122 | if tagx := ctx.Value(clientSpanTagKey{}); tagx != nil { |
| 123 | if opt, ok := tagx.(opentracing.StartSpanOption); ok { |
| 124 | opts = append(opts, opt) |
| 125 | } |
| 126 | } |
| 127 | clientSpan := tracer.StartSpan(fullMethodName, opts...) |
| 128 | // Make sure we add this to the metadata of the call, so it gets propagated: |
| 129 | md := metautils.ExtractOutgoing(ctx).Clone() |
| 130 | if err := tracer.Inject(clientSpan.Context(), opentracing.HTTPHeaders, metadataTextMap(md)); err != nil { |
khenaidoo | 2672188 | 2021-08-11 17:42:52 -0400 | [diff] [blame] | 131 | grpclog.Infof("grpc_opentracing: failed serializing trace information: %v", err) |
Girish Kumar | 7424065 | 2020-07-10 11:54:28 +0000 | [diff] [blame] | 132 | } |
| 133 | ctxWithMetadata := md.ToOutgoing(ctx) |
| 134 | return opentracing.ContextWithSpan(ctxWithMetadata, clientSpan), clientSpan |
| 135 | } |
| 136 | |
| 137 | func finishClientSpan(clientSpan opentracing.Span, err error) { |
| 138 | if err != nil && err != io.EOF { |
| 139 | ext.Error.Set(clientSpan, true) |
| 140 | clientSpan.LogFields(log.String("event", "error"), log.String("message", err.Error())) |
| 141 | } |
| 142 | clientSpan.Finish() |
| 143 | } |