blob: 53764a083c177bf13e598b071d78df59f881d1ec [file] [log] [blame]
Girish Kumar2ed051b2020-07-28 16:35:25 +00001// Copyright 2017 Michal Witkowski. All Rights Reserved.
2// See LICENSE for licensing terms.
3
4package grpc_opentracing
5
6import (
7 "github.com/grpc-ecosystem/go-grpc-middleware"
8 "github.com/grpc-ecosystem/go-grpc-middleware/tags"
9 "github.com/grpc-ecosystem/go-grpc-middleware/util/metautils"
10 "github.com/opentracing/opentracing-go"
11 "github.com/opentracing/opentracing-go/ext"
12 "github.com/opentracing/opentracing-go/log"
13 "golang.org/x/net/context"
14 "google.golang.org/grpc"
15 "google.golang.org/grpc/grpclog"
16)
17
18var (
19 grpcTag = opentracing.Tag{Key: string(ext.Component), Value: "gRPC"}
20)
21
22// UnaryServerInterceptor returns a new unary server interceptor for OpenTracing.
23func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
24 o := evaluateOptions(opts)
25 return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
26 if o.filterOutFunc != nil && !o.filterOutFunc(ctx, info.FullMethod) {
27 return handler(ctx, req)
28 }
29 newCtx, serverSpan := newServerSpanFromInbound(ctx, o.tracer, info.FullMethod)
30 resp, err := handler(newCtx, req)
31 finishServerSpan(ctx, serverSpan, err)
32 return resp, err
33 }
34}
35
36// StreamServerInterceptor returns a new streaming server interceptor for OpenTracing.
37func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor {
38 o := evaluateOptions(opts)
39 return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
40 if o.filterOutFunc != nil && !o.filterOutFunc(stream.Context(), info.FullMethod) {
41 return handler(srv, stream)
42 }
43 newCtx, serverSpan := newServerSpanFromInbound(stream.Context(), o.tracer, info.FullMethod)
44 wrappedStream := grpc_middleware.WrapServerStream(stream)
45 wrappedStream.WrappedContext = newCtx
46 err := handler(srv, wrappedStream)
47 finishServerSpan(newCtx, serverSpan, err)
48 return err
49 }
50}
51
52func newServerSpanFromInbound(ctx context.Context, tracer opentracing.Tracer, fullMethodName string) (context.Context, opentracing.Span) {
53 md := metautils.ExtractIncoming(ctx)
54 parentSpanContext, err := tracer.Extract(opentracing.HTTPHeaders, metadataTextMap(md))
55 if err != nil && err != opentracing.ErrSpanContextNotFound {
56 grpclog.Printf("grpc_opentracing: failed parsing trace information: %v", err)
57 }
58
59 serverSpan := tracer.StartSpan(
60 fullMethodName,
61 // this is magical, it attaches the new span to the parent parentSpanContext, and creates an unparented one if empty.
62 ext.RPCServerOption(parentSpanContext),
63 grpcTag,
64 )
65
66 injectOpentracingIdsToTags(serverSpan, grpc_ctxtags.Extract(ctx))
67 return opentracing.ContextWithSpan(ctx, serverSpan), serverSpan
68}
69
70func finishServerSpan(ctx context.Context, serverSpan opentracing.Span, err error) {
71 // Log context information
72 tags := grpc_ctxtags.Extract(ctx)
73 for k, v := range tags.Values() {
74 // Don't tag errors, log them instead.
75 if vErr, ok := v.(error); ok {
76 serverSpan.LogKV(k, vErr.Error())
77
78 } else {
79 serverSpan.SetTag(k, v)
80 }
81 }
82 if err != nil {
83 ext.Error.Set(serverSpan, true)
84 serverSpan.LogFields(log.String("event", "error"), log.String("message", err.Error()))
85 }
86 serverSpan.Finish()
87}