blob: f8fdecf5ab54f680af0481da6d94e19b2819779c [file] [log] [blame]
Girish Kumar93e91742020-07-27 16:43:19 +00001// Copyright 2017 Michal Witkowski. All Rights Reserved.
2// See LICENSE for licensing terms.
3
4package grpc_opentracing
5
6import (
7 "io"
8 "sync"
9
10 "github.com/grpc-ecosystem/go-grpc-middleware/util/metautils"
11 opentracing "github.com/opentracing/opentracing-go"
12 "github.com/opentracing/opentracing-go/ext"
13 "github.com/opentracing/opentracing-go/log"
14 "golang.org/x/net/context"
15 "google.golang.org/grpc"
16 "google.golang.org/grpc/grpclog"
17 "google.golang.org/grpc/metadata"
18)
19
20// UnaryClientInterceptor returns a new unary client interceptor for OpenTracing.
21func UnaryClientInterceptor(opts ...Option) grpc.UnaryClientInterceptor {
22 o := evaluateOptions(opts)
23 return func(parentCtx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
24 if o.filterOutFunc != nil && !o.filterOutFunc(parentCtx, method) {
25 return invoker(parentCtx, method, req, reply, cc, opts...)
26 }
27 newCtx, clientSpan := newClientSpanFromContext(parentCtx, o.tracer, method)
28 err := invoker(newCtx, method, req, reply, cc, opts...)
29 finishClientSpan(clientSpan, err)
30 return err
31 }
32}
33
34// StreamClientInterceptor returns a new streaming client interceptor for OpenTracing.
35func StreamClientInterceptor(opts ...Option) grpc.StreamClientInterceptor {
36 o := evaluateOptions(opts)
37 return func(parentCtx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
38 if o.filterOutFunc != nil && !o.filterOutFunc(parentCtx, method) {
39 return streamer(parentCtx, desc, cc, method, opts...)
40 }
41 newCtx, clientSpan := newClientSpanFromContext(parentCtx, o.tracer, method)
42 clientStream, err := streamer(newCtx, desc, cc, method, opts...)
43 if err != nil {
44 finishClientSpan(clientSpan, err)
45 return nil, err
46 }
47 return &tracedClientStream{ClientStream: clientStream, clientSpan: clientSpan}, nil
48 }
49}
50
51// type serverStreamingRetryingStream is the implementation of grpc.ClientStream that acts as a
52// proxy to the underlying call. If any of the RecvMsg() calls fail, it will try to reestablish
53// a new ClientStream according to the retry policy.
54type tracedClientStream struct {
55 grpc.ClientStream
56 mu sync.Mutex
57 alreadyFinished bool
58 clientSpan opentracing.Span
59}
60
61func (s *tracedClientStream) Header() (metadata.MD, error) {
62 h, err := s.ClientStream.Header()
63 if err != nil {
64 s.finishClientSpan(err)
65 }
66 return h, err
67}
68
69func (s *tracedClientStream) SendMsg(m interface{}) error {
70 err := s.ClientStream.SendMsg(m)
71 if err != nil {
72 s.finishClientSpan(err)
73 }
74 return err
75}
76
77func (s *tracedClientStream) CloseSend() error {
78 err := s.ClientStream.CloseSend()
79 if err != nil {
80 s.finishClientSpan(err)
81 }
82 return err
83}
84
85func (s *tracedClientStream) RecvMsg(m interface{}) error {
86 err := s.ClientStream.RecvMsg(m)
87 if err != nil {
88 s.finishClientSpan(err)
89 }
90 return err
91}
92
93func (s *tracedClientStream) finishClientSpan(err error) {
94 s.mu.Lock()
95 defer s.mu.Unlock()
96 if !s.alreadyFinished {
97 finishClientSpan(s.clientSpan, err)
98 s.alreadyFinished = true
99 }
100}
101
102// ClientAddContextTags returns a context with specified opentracing tags, which
103// are used by UnaryClientInterceptor/StreamClientInterceptor when creating a
104// new span.
105func ClientAddContextTags(ctx context.Context, tags opentracing.Tags) context.Context {
106 return context.WithValue(ctx, clientSpanTagKey{}, tags)
107}
108
109type clientSpanTagKey struct{}
110
111func newClientSpanFromContext(ctx context.Context, tracer opentracing.Tracer, fullMethodName string) (context.Context, opentracing.Span) {
112 var parentSpanCtx opentracing.SpanContext
113 if parent := opentracing.SpanFromContext(ctx); parent != nil {
114 parentSpanCtx = parent.Context()
115 }
116 opts := []opentracing.StartSpanOption{
117 opentracing.ChildOf(parentSpanCtx),
118 ext.SpanKindRPCClient,
119 grpcTag,
120 }
121 if tagx := ctx.Value(clientSpanTagKey{}); tagx != nil {
122 if opt, ok := tagx.(opentracing.StartSpanOption); ok {
123 opts = append(opts, opt)
124 }
125 }
126 clientSpan := tracer.StartSpan(fullMethodName, opts...)
127 // Make sure we add this to the metadata of the call, so it gets propagated:
128 md := metautils.ExtractOutgoing(ctx).Clone()
129 if err := tracer.Inject(clientSpan.Context(), opentracing.HTTPHeaders, metadataTextMap(md)); err != nil {
130 grpclog.Printf("grpc_opentracing: failed serializing trace information: %v", err)
131 }
132 ctxWithMetadata := md.ToOutgoing(ctx)
133 return opentracing.ContextWithSpan(ctxWithMetadata, clientSpan), clientSpan
134}
135
136func finishClientSpan(clientSpan opentracing.Span, err error) {
137 if err != nil && err != io.EOF {
138 ext.Error.Set(clientSpan, true)
139 clientSpan.LogFields(log.String("event", "error"), log.String("message", err.Error()))
140 }
141 clientSpan.Finish()
142}