blob: 2e9cafd2b2ebfd0fe7e8e9eaddc67a4e6bb94675 [file] [log] [blame]
Girish Kumar2ed051b2020-07-28 16:35:25 +00001// Copyright 2017 Michal Witkowski. All Rights Reserved.
2// See LICENSE for licensing terms.
3
4package grpc_opentracing
5
6import (
David K. Bainbridgee05cf0c2021-08-19 03:16:50 +00007 "context"
Girish Kumar2ed051b2020-07-28 16:35:25 +00008 "io"
9 "sync"
10
11 "github.com/grpc-ecosystem/go-grpc-middleware/util/metautils"
12 opentracing "github.com/opentracing/opentracing-go"
13 "github.com/opentracing/opentracing-go/ext"
14 "github.com/opentracing/opentracing-go/log"
Girish Kumar2ed051b2020-07-28 16:35:25 +000015 "google.golang.org/grpc"
16 "google.golang.org/grpc/grpclog"
17 "google.golang.org/grpc/metadata"
18)
19
20// UnaryClientInterceptor returns a new unary client interceptor for OpenTracing.
21func UnaryClientInterceptor(opts ...Option) grpc.UnaryClientInterceptor {
22 o := evaluateOptions(opts)
23 return func(parentCtx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
24 if o.filterOutFunc != nil && !o.filterOutFunc(parentCtx, method) {
25 return invoker(parentCtx, method, req, reply, cc, opts...)
26 }
27 newCtx, clientSpan := newClientSpanFromContext(parentCtx, o.tracer, method)
David K. Bainbridgee05cf0c2021-08-19 03:16:50 +000028 if o.unaryRequestHandlerFunc != nil {
29 o.unaryRequestHandlerFunc(clientSpan, req)
30 }
Girish Kumar2ed051b2020-07-28 16:35:25 +000031 err := invoker(newCtx, method, req, reply, cc, opts...)
32 finishClientSpan(clientSpan, err)
33 return err
34 }
35}
36
37// StreamClientInterceptor returns a new streaming client interceptor for OpenTracing.
38func StreamClientInterceptor(opts ...Option) grpc.StreamClientInterceptor {
39 o := evaluateOptions(opts)
40 return func(parentCtx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
41 if o.filterOutFunc != nil && !o.filterOutFunc(parentCtx, method) {
42 return streamer(parentCtx, desc, cc, method, opts...)
43 }
44 newCtx, clientSpan := newClientSpanFromContext(parentCtx, o.tracer, method)
45 clientStream, err := streamer(newCtx, desc, cc, method, opts...)
46 if err != nil {
47 finishClientSpan(clientSpan, err)
48 return nil, err
49 }
50 return &tracedClientStream{ClientStream: clientStream, clientSpan: clientSpan}, nil
51 }
52}
53
54// type serverStreamingRetryingStream is the implementation of grpc.ClientStream that acts as a
55// proxy to the underlying call. If any of the RecvMsg() calls fail, it will try to reestablish
56// a new ClientStream according to the retry policy.
57type tracedClientStream struct {
58 grpc.ClientStream
59 mu sync.Mutex
60 alreadyFinished bool
61 clientSpan opentracing.Span
62}
63
64func (s *tracedClientStream) Header() (metadata.MD, error) {
65 h, err := s.ClientStream.Header()
66 if err != nil {
67 s.finishClientSpan(err)
68 }
69 return h, err
70}
71
72func (s *tracedClientStream) SendMsg(m interface{}) error {
73 err := s.ClientStream.SendMsg(m)
74 if err != nil {
75 s.finishClientSpan(err)
76 }
77 return err
78}
79
80func (s *tracedClientStream) CloseSend() error {
81 err := s.ClientStream.CloseSend()
David K. Bainbridgee05cf0c2021-08-19 03:16:50 +000082 s.finishClientSpan(err)
Girish Kumar2ed051b2020-07-28 16:35:25 +000083 return err
84}
85
86func (s *tracedClientStream) RecvMsg(m interface{}) error {
87 err := s.ClientStream.RecvMsg(m)
88 if err != nil {
89 s.finishClientSpan(err)
90 }
91 return err
92}
93
94func (s *tracedClientStream) finishClientSpan(err error) {
95 s.mu.Lock()
96 defer s.mu.Unlock()
97 if !s.alreadyFinished {
98 finishClientSpan(s.clientSpan, err)
99 s.alreadyFinished = true
100 }
101}
102
103// ClientAddContextTags returns a context with specified opentracing tags, which
104// are used by UnaryClientInterceptor/StreamClientInterceptor when creating a
105// new span.
106func ClientAddContextTags(ctx context.Context, tags opentracing.Tags) context.Context {
107 return context.WithValue(ctx, clientSpanTagKey{}, tags)
108}
109
110type clientSpanTagKey struct{}
111
112func newClientSpanFromContext(ctx context.Context, tracer opentracing.Tracer, fullMethodName string) (context.Context, opentracing.Span) {
113 var parentSpanCtx opentracing.SpanContext
114 if parent := opentracing.SpanFromContext(ctx); parent != nil {
115 parentSpanCtx = parent.Context()
116 }
117 opts := []opentracing.StartSpanOption{
118 opentracing.ChildOf(parentSpanCtx),
119 ext.SpanKindRPCClient,
120 grpcTag,
121 }
122 if tagx := ctx.Value(clientSpanTagKey{}); tagx != nil {
123 if opt, ok := tagx.(opentracing.StartSpanOption); ok {
124 opts = append(opts, opt)
125 }
126 }
127 clientSpan := tracer.StartSpan(fullMethodName, opts...)
128 // Make sure we add this to the metadata of the call, so it gets propagated:
129 md := metautils.ExtractOutgoing(ctx).Clone()
130 if err := tracer.Inject(clientSpan.Context(), opentracing.HTTPHeaders, metadataTextMap(md)); err != nil {
David K. Bainbridgee05cf0c2021-08-19 03:16:50 +0000131 grpclog.Infof("grpc_opentracing: failed serializing trace information: %v", err)
Girish Kumar2ed051b2020-07-28 16:35:25 +0000132 }
133 ctxWithMetadata := md.ToOutgoing(ctx)
134 return opentracing.ContextWithSpan(ctxWithMetadata, clientSpan), clientSpan
135}
136
137func finishClientSpan(clientSpan opentracing.Span, err error) {
138 if err != nil && err != io.EOF {
139 ext.Error.Set(clientSpan, true)
140 clientSpan.LogFields(log.String("event", "error"), log.String("message", err.Error()))
141 }
142 clientSpan.Finish()
143}