blob: 186b1084fd3bc33fb1f226c5f3fb998645074ec8 [file] [log] [blame]
// Copyright 2017 Michal Witkowski. All Rights Reserved.
// See LICENSE for licensing terms.
package grpc_opentracing
import (
"context"
"github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/grpc-ecosystem/go-grpc-middleware/tags"
"github.com/grpc-ecosystem/go-grpc-middleware/util/metautils"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/opentracing/opentracing-go/log"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
)
var (
grpcTag = opentracing.Tag{Key: string(ext.Component), Value: "gRPC"}
)
// UnaryServerInterceptor returns a new unary server interceptor for OpenTracing.
func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
o := evaluateOptions(opts)
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
if o.filterOutFunc != nil && !o.filterOutFunc(ctx, info.FullMethod) {
return handler(ctx, req)
}
opName := info.FullMethod
if o.opNameFunc != nil {
opName = o.opNameFunc(info.FullMethod)
}
newCtx, serverSpan := newServerSpanFromInbound(ctx, o.tracer, o.traceHeaderName, opName)
if o.unaryRequestHandlerFunc != nil {
o.unaryRequestHandlerFunc(serverSpan, req)
}
resp, err := handler(newCtx, req)
finishServerSpan(ctx, serverSpan, err)
return resp, err
}
}
// StreamServerInterceptor returns a new streaming server interceptor for OpenTracing.
func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor {
o := evaluateOptions(opts)
return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
if o.filterOutFunc != nil && !o.filterOutFunc(stream.Context(), info.FullMethod) {
return handler(srv, stream)
}
opName := info.FullMethod
if o.opNameFunc != nil {
opName = o.opNameFunc(info.FullMethod)
}
newCtx, serverSpan := newServerSpanFromInbound(stream.Context(), o.tracer, o.traceHeaderName, opName)
wrappedStream := grpc_middleware.WrapServerStream(stream)
wrappedStream.WrappedContext = newCtx
err := handler(srv, wrappedStream)
finishServerSpan(newCtx, serverSpan, err)
return err
}
}
func newServerSpanFromInbound(ctx context.Context, tracer opentracing.Tracer, traceHeaderName, opName string) (context.Context, opentracing.Span) {
md := metautils.ExtractIncoming(ctx)
parentSpanContext, err := tracer.Extract(opentracing.HTTPHeaders, metadataTextMap(md))
if err != nil && err != opentracing.ErrSpanContextNotFound {
grpclog.Infof("grpc_opentracing: failed parsing trace information: %v", err)
}
serverSpan := tracer.StartSpan(
opName,
// this is magical, it attaches the new span to the parent parentSpanContext, and creates an unparented one if empty.
ext.RPCServerOption(parentSpanContext),
grpcTag,
)
injectOpentracingIdsToTags(traceHeaderName, serverSpan, grpc_ctxtags.Extract(ctx))
return opentracing.ContextWithSpan(ctx, serverSpan), serverSpan
}
func finishServerSpan(ctx context.Context, serverSpan opentracing.Span, err error) {
// Log context information
tags := grpc_ctxtags.Extract(ctx)
for k, v := range tags.Values() {
// Don't tag errors, log them instead.
if vErr, ok := v.(error); ok {
serverSpan.LogKV(k, vErr.Error())
} else {
serverSpan.SetTag(k, v)
}
}
if err != nil {
ext.Error.Set(serverSpan, true)
serverSpan.LogFields(log.String("event", "error"), log.String("message", err.Error()))
}
serverSpan.Finish()
}