blob: 186b1084fd3bc33fb1f226c5f3fb998645074ec8 [file] [log] [blame]
khenaidoo7d3c5582021-08-11 18:09:44 -04001// Copyright 2017 Michal Witkowski. All Rights Reserved.
2// See LICENSE for licensing terms.
3
4package grpc_opentracing
5
6import (
7 "context"
8
9 "github.com/grpc-ecosystem/go-grpc-middleware"
10 "github.com/grpc-ecosystem/go-grpc-middleware/tags"
11 "github.com/grpc-ecosystem/go-grpc-middleware/util/metautils"
12 "github.com/opentracing/opentracing-go"
13 "github.com/opentracing/opentracing-go/ext"
14 "github.com/opentracing/opentracing-go/log"
15 "google.golang.org/grpc"
16 "google.golang.org/grpc/grpclog"
17)
18
19var (
20 grpcTag = opentracing.Tag{Key: string(ext.Component), Value: "gRPC"}
21)
22
23// UnaryServerInterceptor returns a new unary server interceptor for OpenTracing.
24func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
25 o := evaluateOptions(opts)
26 return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
27 if o.filterOutFunc != nil && !o.filterOutFunc(ctx, info.FullMethod) {
28 return handler(ctx, req)
29 }
30 opName := info.FullMethod
31 if o.opNameFunc != nil {
32 opName = o.opNameFunc(info.FullMethod)
33 }
34 newCtx, serverSpan := newServerSpanFromInbound(ctx, o.tracer, o.traceHeaderName, opName)
35 if o.unaryRequestHandlerFunc != nil {
36 o.unaryRequestHandlerFunc(serverSpan, req)
37 }
38 resp, err := handler(newCtx, req)
39 finishServerSpan(ctx, serverSpan, err)
40 return resp, err
41 }
42}
43
44// StreamServerInterceptor returns a new streaming server interceptor for OpenTracing.
45func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor {
46 o := evaluateOptions(opts)
47 return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
48 if o.filterOutFunc != nil && !o.filterOutFunc(stream.Context(), info.FullMethod) {
49 return handler(srv, stream)
50 }
51 opName := info.FullMethod
52 if o.opNameFunc != nil {
53 opName = o.opNameFunc(info.FullMethod)
54 }
55 newCtx, serverSpan := newServerSpanFromInbound(stream.Context(), o.tracer, o.traceHeaderName, opName)
56 wrappedStream := grpc_middleware.WrapServerStream(stream)
57 wrappedStream.WrappedContext = newCtx
58 err := handler(srv, wrappedStream)
59 finishServerSpan(newCtx, serverSpan, err)
60 return err
61 }
62}
63
64func newServerSpanFromInbound(ctx context.Context, tracer opentracing.Tracer, traceHeaderName, opName string) (context.Context, opentracing.Span) {
65 md := metautils.ExtractIncoming(ctx)
66 parentSpanContext, err := tracer.Extract(opentracing.HTTPHeaders, metadataTextMap(md))
67 if err != nil && err != opentracing.ErrSpanContextNotFound {
68 grpclog.Infof("grpc_opentracing: failed parsing trace information: %v", err)
69 }
70
71 serverSpan := tracer.StartSpan(
72 opName,
73 // this is magical, it attaches the new span to the parent parentSpanContext, and creates an unparented one if empty.
74 ext.RPCServerOption(parentSpanContext),
75 grpcTag,
76 )
77
78 injectOpentracingIdsToTags(traceHeaderName, serverSpan, grpc_ctxtags.Extract(ctx))
79 return opentracing.ContextWithSpan(ctx, serverSpan), serverSpan
80}
81
82func finishServerSpan(ctx context.Context, serverSpan opentracing.Span, err error) {
83 // Log context information
84 tags := grpc_ctxtags.Extract(ctx)
85 for k, v := range tags.Values() {
86 // Don't tag errors, log them instead.
87 if vErr, ok := v.(error); ok {
88 serverSpan.LogKV(k, vErr.Error())
89 } else {
90 serverSpan.SetTag(k, v)
91 }
92 }
93 if err != nil {
94 ext.Error.Set(serverSpan, true)
95 serverSpan.LogFields(log.String("event", "error"), log.String("message", err.Error()))
96 }
97 serverSpan.Finish()
98}