[VOL-3197][VOL-3196] Enhanced Kafka RPC and gRPC interfaces to propagate Span context for log correlation
Also made some corrections to the helper method in log/utils, based on testing.
Change-Id: Ic0fec935dd8996b3c6c17116586c5bd307e7bebb
diff --git a/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing/client_interceptors.go b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing/client_interceptors.go
new file mode 100644
index 0000000..f8fdecf
--- /dev/null
+++ b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing/client_interceptors.go
@@ -0,0 +1,142 @@
+// Copyright 2017 Michal Witkowski. All Rights Reserved.
+// See LICENSE for licensing terms.
+
+package grpc_opentracing
+
+import (
+ "io"
+ "sync"
+
+ "github.com/grpc-ecosystem/go-grpc-middleware/util/metautils"
+ opentracing "github.com/opentracing/opentracing-go"
+ "github.com/opentracing/opentracing-go/ext"
+ "github.com/opentracing/opentracing-go/log"
+ "golang.org/x/net/context"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/grpclog"
+ "google.golang.org/grpc/metadata"
+)
+
+// UnaryClientInterceptor builds a unary client interceptor that records each traced call as an OpenTracing client span.
+func UnaryClientInterceptor(opts ...Option) grpc.UnaryClientInterceptor {
+	cfg := evaluateOptions(opts)
+	return func(parentCtx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, callOpts ...grpc.CallOption) error {
+		if traced := cfg.filterOutFunc == nil || cfg.filterOutFunc(parentCtx, method); !traced {
+			return invoker(parentCtx, method, req, reply, cc, callOpts...) // filtered out: invoke without a span
+		}
+		spanCtx, span := newClientSpanFromContext(parentCtx, cfg.tracer, method)
+		callErr := invoker(spanCtx, method, req, reply, cc, callOpts...)
+		finishClientSpan(span, callErr)
+		return callErr
+	}
+}
+
+// StreamClientInterceptor builds a streaming client interceptor that records each traced stream as an OpenTracing client span.
+func StreamClientInterceptor(opts ...Option) grpc.StreamClientInterceptor {
+	cfg := evaluateOptions(opts)
+	return func(parentCtx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, callOpts ...grpc.CallOption) (grpc.ClientStream, error) {
+		if traced := cfg.filterOutFunc == nil || cfg.filterOutFunc(parentCtx, method); !traced {
+			return streamer(parentCtx, desc, cc, method, callOpts...) // filtered out: open the stream without a span
+		}
+		spanCtx, span := newClientSpanFromContext(parentCtx, cfg.tracer, method)
+		stream, openErr := streamer(spanCtx, desc, cc, method, callOpts...)
+		if openErr != nil {
+			finishClientSpan(span, openErr)
+			return nil, openErr
+		}
+		return &tracedClientStream{ClientStream: stream, clientSpan: span}, nil // the wrapper finishes the span on a later stream error
+	}
+}
+
+// tracedClientStream is a grpc.ClientStream wrapper that proxies calls to the
+// embedded stream and finishes clientSpan the first time one of its wrapped
+// methods returns an error; mu guards alreadyFinished so that happens only once.
+type tracedClientStream struct {
+ grpc.ClientStream
+ mu sync.Mutex
+ alreadyFinished bool
+ clientSpan opentracing.Span
+}
+
+func (s *tracedClientStream) Header() (metadata.MD, error) {
+ h, err := s.ClientStream.Header() // delegate to the wrapped stream
+ if err != nil {
+ s.finishClientSpan(err) // a failed Header call finishes the span with the error
+ }
+ return h, err
+}
+
+func (s *tracedClientStream) SendMsg(m interface{}) error {
+ err := s.ClientStream.SendMsg(m) // delegate to the wrapped stream
+ if err != nil {
+ s.finishClientSpan(err) // a failed send finishes the span with the error
+ }
+ return err
+}
+
+func (s *tracedClientStream) CloseSend() error {
+ err := s.ClientStream.CloseSend() // delegate to the wrapped stream
+ if err != nil {
+ s.finishClientSpan(err) // a failed CloseSend finishes the span with the error
+ }
+ return err
+}
+
+func (s *tracedClientStream) RecvMsg(m interface{}) error {
+ err := s.ClientStream.RecvMsg(m) // delegate to the wrapped stream
+ if err != nil {
+ s.finishClientSpan(err) // any non-nil error, io.EOF included, finishes the span
+ }
+ return err
+}
+
+func (s *tracedClientStream) finishClientSpan(err error) {
+ s.mu.Lock() // serialize access to alreadyFinished
+ defer s.mu.Unlock()
+ if !s.alreadyFinished { // only the first call finishes the span
+ finishClientSpan(s.clientSpan, err)
+ s.alreadyFinished = true
+ }
+}
+
+// ClientAddContextTags returns a copy of ctx that carries the given opentracing
+// tags; UnaryClientInterceptor and StreamClientInterceptor apply them to the
+// client span they create for calls made with that context.
+func ClientAddContextTags(ctx context.Context, tags opentracing.Tags) context.Context {
+ return context.WithValue(ctx, clientSpanTagKey{}, tags)
+}
+
+type clientSpanTagKey struct{} // context key under which ClientAddContextTags stores its tags
+
+// newClientSpanFromContext starts a client span named fullMethodName, as a child of the
+// span found in ctx (if any) and with any tags stored by ClientAddContextTags applied.
+// It injects the span context into a clone of the outgoing gRPC metadata and returns a
+// context carrying both that metadata and the new span, together with the span itself.
+func newClientSpanFromContext(ctx context.Context, tracer opentracing.Tracer, fullMethodName string) (context.Context, opentracing.Span) {
+	var parentSpanCtx opentracing.SpanContext
+	if parent := opentracing.SpanFromContext(ctx); parent != nil {
+		parentSpanCtx = parent.Context()
+	}
+	opts := []opentracing.StartSpanOption{opentracing.ChildOf(parentSpanCtx), ext.SpanKindRPCClient, grpcTag}
+	// A missing or differently typed context value fails the assertion and adds nothing.
+	if opt, ok := ctx.Value(clientSpanTagKey{}).(opentracing.StartSpanOption); ok {
+		opts = append(opts, opt)
+	}
+	clientSpan := tracer.StartSpan(fullMethodName, opts...)
+	// Make sure we add this to the metadata of the call, so it gets propagated:
+	md := metautils.ExtractOutgoing(ctx).Clone()
+	if err := tracer.Inject(clientSpan.Context(), opentracing.HTTPHeaders, metadataTextMap(md)); err != nil {
+		// Injection is best effort: a failure is logged and the call continues.
+		grpclog.Infof("grpc_opentracing: failed serializing trace information: %v", err)
+	}
+	ctxWithMetadata := md.ToOutgoing(ctx)
+	return opentracing.ContextWithSpan(ctxWithMetadata, clientSpan), clientSpan
+}
+
+func finishClientSpan(clientSpan opentracing.Span, err error) {
+ if err != nil && err != io.EOF { // io.EOF is not recorded as a failure
+ ext.Error.Set(clientSpan, true)
+ clientSpan.LogFields(log.String("event", "error"), log.String("message", err.Error()))
+ }
+ clientSpan.Finish() // the span is finished whether or not an error was recorded
+}
diff --git a/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing/doc.go b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing/doc.go
new file mode 100644
index 0000000..7a58efc
--- /dev/null
+++ b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing/doc.go
@@ -0,0 +1,22 @@
+// Copyright 2017 Michal Witkowski. All Rights Reserved.
+// See LICENSE for licensing terms.
+
+/*
+`grpc_opentracing` adds OpenTracing support to gRPC clients and servers.
+
+OpenTracing Interceptors
+
+These are both client-side and server-side interceptors for OpenTracing. They are provider-agnostic and work with
+backends such as Zipkin or Google Stackdriver Trace.
+
+For a service that both sends and receives requests, you *need* to use both; otherwise the trace context will not be
+propagated to downstream requests.
+
+All server-side spans are tagged with grpc_ctxtags information.
+
+For more information see:
+http://opentracing.io/documentation/
+https://github.com/opentracing/specification/blob/master/semantic_conventions.md
+
+*/
+package grpc_opentracing
diff --git a/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing/id_extract.go b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing/id_extract.go
new file mode 100644
index 0000000..d19f3c8
--- /dev/null
+++ b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing/id_extract.go
@@ -0,0 +1,67 @@
+package grpc_opentracing
+
+import (
+ "strings"
+
+ grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
+ opentracing "github.com/opentracing/opentracing-go"
+ "google.golang.org/grpc/grpclog"
+)
+
+const (
+ TagTraceId = "trace.traceid" // tag key under which the trace ID is stored
+ TagSpanId = "trace.spanid" // tag key under which the span ID is stored
+ TagSampled = "trace.sampled" // tag key under which "true" or "false" is stored
+ jaegerNotSampledFlag = "0" // uber-trace-id flags value reported as not sampled
+)
+
+// injectOpentracingIdsToTags writes trace data to ctxtags.
+// This is done in an incredibly hacky way, because the public-facing interface of opentracing doesn't give access to
+// the TraceId and SpanId of the SpanContext. Only the Tracer's Inject/Extract methods know what these are.
+// Most tracers have them encoded as keys containing 'traceid' and 'spanid':
+// https://github.com/openzipkin/zipkin-go-opentracing/blob/594640b9ef7e5c994e8d9499359d693c032d738c/propagation_ot.go#L29
+// https://github.com/opentracing/basictracer-go/blob/1b32af207119a14b1b231d451df3ed04a72efebf/propagation_ot.go#L26
+// Jaeger from Uber uses a single-key schema with the format '{trace-id}:{span-id}:{parent-span-id}:{flags}':
+// https://www.jaegertracing.io/docs/client-libraries/#trace-span-identity
+func injectOpentracingIdsToTags(span opentracing.Span, tags grpc_ctxtags.Tags) {
+ if err := span.Tracer().Inject(span.Context(), opentracing.HTTPHeaders, &tagsCarrier{tags}); err != nil {
+ grpclog.Infof("grpc_opentracing: failed extracting trace info into ctx %v", err) // best effort: a failed Inject is only logged
+ }
+}
+
+// tagsCarrier is a really hacky carrier: it is handed to Tracer.Inject so that its Set method can copy trace IDs into the wrapped Tags.
+type tagsCarrier struct {
+ grpc_ctxtags.Tags
+}
+
+// Set copies trace ID, span ID and sampling information found in an injected key/value pair into the wrapped tags.
+func (t *tagsCarrier) Set(key, val string) {
+	lowered := strings.ToLower(key)
+	if strings.Contains(lowered, "traceid") {
+		t.Tags.Set(TagTraceId, val) // this will most likely be base-16 (hex) encoded
+	}
+
+	if strings.Contains(lowered, "spanid") && !strings.Contains(lowered, "parent") {
+		t.Tags.Set(TagSpanId, val) // this will most likely be base-16 (hex) encoded
+	}
+
+	if strings.Contains(lowered, "sampled") && (val == "true" || val == "false") {
+		t.Tags.Set(TagSampled, val)
+	}
+
+	// Jaeger uses one key holding '{trace-id}:{span-id}:{parent-span-id}:{flags}'.
+	if lowered != "uber-trace-id" {
+		return
+	}
+	parts := strings.Split(val, ":")
+	if len(parts) != 4 {
+		return
+	}
+	t.Tags.Set(TagTraceId, parts[0])
+	t.Tags.Set(TagSpanId, parts[1])
+	sampled := "true"
+	if parts[3] == jaegerNotSampledFlag {
+		sampled = "false"
+	}
+	t.Tags.Set(TagSampled, sampled)
+}
diff --git a/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing/metadata.go b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing/metadata.go
new file mode 100644
index 0000000..38f251d
--- /dev/null
+++ b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing/metadata.go
@@ -0,0 +1,56 @@
+// Copyright 2017 Michal Witkowski. All Rights Reserved.
+// See LICENSE for licensing terms.
+
+package grpc_opentracing
+
+import (
+ "encoding/base64"
+ "strings"
+
+ "fmt"
+
+ "google.golang.org/grpc/metadata"
+)
+
+const (
+ binHdrSuffix = "-bin" // keys ending in this suffix get their values base64-encoded by encodeKeyValue
+)
+
+// metadataTextMap adapts a metadata.MD so that it can be used as an opentracing text-map carrier.
+type metadataTextMap metadata.MD
+
+// Set implements the opentracing.TextMapWriter interface by storing the given key/value pair in the metadata.
+func (m metadataTextMap) Set(key, val string) {
+ // gRPC allows for complex binary values to be written.
+ encodedKey, encodedVal := encodeKeyValue(key, val)
+ // The metadata object is a multimap, and previous values may exist, but for opentracing headers we do not append;
+ // we just override.
+ m[encodedKey] = []string{encodedVal}
+}
+
+// ForeachKey implements the opentracing.TextMapReader interface: it decodes every metadata entry and passes it to callback, stopping at the first error.
+func (m metadataTextMap) ForeachKey(callback func(key, val string) error) error {
+	for rawKey, rawVals := range m {
+		for _, rawVal := range rawVals {
+			decodedKey, decodedVal, err := metadata.DecodeKeyValue(rawKey, rawVal)
+			if err != nil {
+				return fmt.Errorf("failed decoding opentracing from gRPC metadata: %v", err)
+			}
+			if err = callback(decodedKey, decodedVal); err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+// encodeKeyValue lower-cases the key and, for keys ending in binHdrSuffix, base64-encodes the value for transmission via gRPC.
+// note: copy pasted from private values of grpc.metadata
+func encodeKeyValue(k, v string) (string, string) {
+	key := strings.ToLower(k)
+	if !strings.HasSuffix(key, binHdrSuffix) {
+		return key, v
+	}
+	// Binary-valued headers are sent base64-encoded (standard encoding).
+	return key, base64.StdEncoding.EncodeToString([]byte(v))
+}
diff --git a/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing/options.go b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing/options.go
new file mode 100644
index 0000000..e75102b
--- /dev/null
+++ b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing/options.go
@@ -0,0 +1,55 @@
+// Copyright 2017 Michal Witkowski. All Rights Reserved.
+// See LICENSE for licensing terms.
+
+package grpc_opentracing
+
+import (
+ "context"
+
+ "github.com/opentracing/opentracing-go"
+)
+
+var (
+ defaultOptions = &options{ // baseline that evaluateOptions copies before applying user options
+ filterOutFunc: nil, // nil means no call is filtered out
+ tracer: nil, // nil makes evaluateOptions fall back to opentracing.GlobalTracer()
+ }
+)
+
+// FilterFunc allows users to provide a function that decides, per call, whether a given method is traced.
+//
+// If it returns false, the given request will not be traced.
+type FilterFunc func(ctx context.Context, fullMethodName string) bool
+
+type options struct {
+ filterOutFunc FilterFunc // optional; a call is traced unless this returns false
+ tracer opentracing.Tracer // tracer used by the interceptors to start spans
+}
+
+// evaluateOptions applies opts to a copy of defaultOptions, falling back to the global tracer when none was set.
+func evaluateOptions(opts []Option) *options {
+	resolved := *defaultOptions
+	for _, apply := range opts {
+		apply(&resolved)
+	}
+	if resolved.tracer == nil {
+		resolved.tracer = opentracing.GlobalTracer()
+	}
+	return &resolved
+}
+
+type Option func(*options) // Option mutates the interceptor options; see WithFilterFunc and WithTracer.
+
+// WithFilterFunc sets the function used to decide whether a given call is traced; calls for which f returns false are not traced.
+func WithFilterFunc(f FilterFunc) Option {
+ return func(o *options) {
+ o.filterOutFunc = f
+ }
+}
+
+// WithTracer sets a custom tracer to be used by this middleware; when no tracer is set, opentracing.GlobalTracer() is used.
+func WithTracer(tracer opentracing.Tracer) Option {
+ return func(o *options) {
+ o.tracer = tracer
+ }
+}
diff --git a/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing/server_interceptors.go b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing/server_interceptors.go
new file mode 100644
index 0000000..53764a0
--- /dev/null
+++ b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing/server_interceptors.go
@@ -0,0 +1,87 @@
+// Copyright 2017 Michal Witkowski. All Rights Reserved.
+// See LICENSE for licensing terms.
+
+package grpc_opentracing
+
+import (
+ "github.com/grpc-ecosystem/go-grpc-middleware"
+ "github.com/grpc-ecosystem/go-grpc-middleware/tags"
+ "github.com/grpc-ecosystem/go-grpc-middleware/util/metautils"
+ "github.com/opentracing/opentracing-go"
+ "github.com/opentracing/opentracing-go/ext"
+ "github.com/opentracing/opentracing-go/log"
+ "golang.org/x/net/context"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/grpclog"
+)
+
+var (
+ grpcTag = opentracing.Tag{Key: string(ext.Component), Value: "gRPC"} // component tag passed as a start option for every span
+)
+
+// UnaryServerInterceptor builds a unary server interceptor that records each traced request as an OpenTracing server span.
+func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
+	cfg := evaluateOptions(opts)
+	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
+		if traced := cfg.filterOutFunc == nil || cfg.filterOutFunc(ctx, info.FullMethod); !traced {
+			return handler(ctx, req) // filtered out: handle without a span
+		}
+		spanCtx, span := newServerSpanFromInbound(ctx, cfg.tracer, info.FullMethod)
+		resp, handlerErr := handler(spanCtx, req)
+		finishServerSpan(ctx, span, handlerErr)
+		return resp, handlerErr
+	}
+}
+
+// StreamServerInterceptor builds a streaming server interceptor that records each traced stream as an OpenTracing server span.
+func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor {
+	cfg := evaluateOptions(opts)
+	return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+		if traced := cfg.filterOutFunc == nil || cfg.filterOutFunc(stream.Context(), info.FullMethod); !traced {
+			return handler(srv, stream) // filtered out: handle without a span
+		}
+		spanCtx, span := newServerSpanFromInbound(stream.Context(), cfg.tracer, info.FullMethod)
+		wrapped := grpc_middleware.WrapServerStream(stream)
+		wrapped.WrappedContext = spanCtx
+		handlerErr := handler(srv, wrapped)
+		finishServerSpan(spanCtx, span, handlerErr)
+		return handlerErr
+	}
+}
+
+// newServerSpanFromInbound starts a server span named fullMethodName, parented to any span context
+// extracted from the incoming gRPC metadata, and returns ctx extended with that span plus the span.
+func newServerSpanFromInbound(ctx context.Context, tracer opentracing.Tracer, fullMethodName string) (context.Context, opentracing.Span) {
+	md := metautils.ExtractIncoming(ctx)
+	parentSpanContext, err := tracer.Extract(opentracing.HTTPHeaders, metadataTextMap(md))
+	if err != nil && err != opentracing.ErrSpanContextNotFound { // a missing parent is not logged
+		grpclog.Infof("grpc_opentracing: failed parsing trace information: %v", err)
+	}
+	serverSpan := tracer.StartSpan(
+		fullMethodName,
+		// this is magical, it attaches the new span to the parent parentSpanContext, and creates an unparented one if empty.
+		ext.RPCServerOption(parentSpanContext),
+		grpcTag,
+	)
+	injectOpentracingIdsToTags(serverSpan, grpc_ctxtags.Extract(ctx)) // copy trace/span IDs into the ctxtags of ctx
+	return opentracing.ContextWithSpan(ctx, serverSpan), serverSpan
+}
+
+// finishServerSpan copies the ctxtags values of ctx onto serverSpan, marks the span as
+// failed when err is non-nil, and finishes it.
+func finishServerSpan(ctx context.Context, serverSpan opentracing.Span, err error) {
+	for name, value := range grpc_ctxtags.Extract(ctx).Values() {
+		// Don't tag errors, log them instead.
+		switch typed := value.(type) {
+		case error:
+			serverSpan.LogKV(name, typed.Error())
+		default:
+			serverSpan.SetTag(name, value)
+		}
+	}
+	if err != nil {
+		ext.Error.Set(serverSpan, true)
+		serverSpan.LogFields(log.String("event", "error"), log.String("message", err.Error()))
+	}
+	serverSpan.Finish()
+}