[VOL-3196] Enhanced gRPC interfaces to create and propagate Span for log correlation
Change-Id: Id9f8e549aaa192f83b8ac66d4252f71da6d11449
diff --git a/VERSION b/VERSION
index 499bd1e..f225a78 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.5.2-dev
+2.5.2
diff --git a/go.mod b/go.mod
index f0513f2..b7aabf1 100644
--- a/go.mod
+++ b/go.mod
@@ -6,7 +6,7 @@
github.com/gogo/protobuf v1.3.0
github.com/golang/protobuf v1.3.2
github.com/google/uuid v1.1.1
- github.com/opencord/voltha-lib-go/v3 v3.2.2
+ github.com/opencord/voltha-lib-go/v3 v3.2.5
github.com/opencord/voltha-protos/v3 v3.4.1
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2
github.com/stretchr/testify v1.4.0
diff --git a/go.sum b/go.sum
index b8ad939..07f65f1 100644
--- a/go.sum
+++ b/go.sum
@@ -198,8 +198,8 @@
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.2 h1:3mYCb7aPxS/RU7TI1y4rkEn1oKmPRjNJLNEXgw7MH2I=
github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
-github.com/opencord/voltha-lib-go/v3 v3.2.2 h1:MqVkzoEuAztV2SjexpTzDmuHTlDKK+gN+slcZRe7Wyc=
-github.com/opencord/voltha-lib-go/v3 v3.2.2/go.mod h1:7CnNtVE/O8y80Xw7JXtaWQ61HxL7yWXlB0/uH5R/X/Y=
+github.com/opencord/voltha-lib-go/v3 v3.2.5 h1:njPoUEcRu6gATMeKJqbQM4lnh+SKC1iRJm++muSBCQ8=
+github.com/opencord/voltha-lib-go/v3 v3.2.5/go.mod h1:MIWsmMGxIRLK8dG+CNVcE/cJCzOondyRbwJ1708BYgU=
github.com/opencord/voltha-protos/v3 v3.4.1 h1:Nrr0jKy7WiMiFlXYiK8z73RvI4K4L5b/top7d3htQQI=
github.com/opencord/voltha-protos/v3 v3.4.1/go.mod h1:nl1ETp5Iw3avxOaKD8BJlYY5wYI4KeV95aT1pL63nto=
github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU=
diff --git a/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tags/context.go b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tags/context.go
new file mode 100644
index 0000000..583025c
--- /dev/null
+++ b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tags/context.go
@@ -0,0 +1,78 @@
+package grpc_ctxtags
+
+import (
+ "context"
+)
+
+type ctxMarker struct{}
+
+var (
+ // ctxMarkerKey is the Context value marker used by *all* logging middleware.
+ // The logging middleware object must interf
+ ctxMarkerKey = &ctxMarker{}
+ // NoopTags is a trivial, minimum overhead implementation of Tags for which all operations are no-ops.
+ NoopTags = &noopTags{}
+)
+
+// Tags is the interface used for storing request tags between Context calls.
+// The default implementation is *not* thread safe, and should be handled only in the context of the request.
+type Tags interface {
+ // Set sets the given key in the metadata tags.
+ Set(key string, value interface{}) Tags
+ // Has checks if the given key exists.
+ Has(key string) bool
+ // Values returns a map of key to values.
+ // Do not modify the underlying map, please use Set instead.
+ Values() map[string]interface{}
+}
+
+type mapTags struct {
+ values map[string]interface{}
+}
+
+func (t *mapTags) Set(key string, value interface{}) Tags {
+ t.values[key] = value
+ return t
+}
+
+func (t *mapTags) Has(key string) bool {
+ _, ok := t.values[key]
+ return ok
+}
+
+func (t *mapTags) Values() map[string]interface{} {
+ return t.values
+}
+
+type noopTags struct{}
+
+func (t *noopTags) Set(key string, value interface{}) Tags {
+ return t
+}
+
+func (t *noopTags) Has(key string) bool {
+ return false
+}
+
+func (t *noopTags) Values() map[string]interface{} {
+ return nil
+}
+
+// Extracts returns a pre-existing Tags object in the Context.
+// If the context wasn't set in a tag interceptor, a no-op Tag storage is returned that will *not* be propagated in context.
+func Extract(ctx context.Context) Tags {
+ t, ok := ctx.Value(ctxMarkerKey).(Tags)
+ if !ok {
+ return NoopTags
+ }
+
+ return t
+}
+
+func setInContext(ctx context.Context, tags Tags) context.Context {
+ return context.WithValue(ctx, ctxMarkerKey, tags)
+}
+
+func newTags() Tags {
+ return &mapTags{values: make(map[string]interface{})}
+}
diff --git a/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tags/doc.go b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tags/doc.go
new file mode 100644
index 0000000..960638d
--- /dev/null
+++ b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tags/doc.go
@@ -0,0 +1,22 @@
+/*
+`grpc_ctxtags` adds a Tag object to the context that can be used by other middleware to add context about a request.
+
+Request Context Tags
+
+Tags describe information about the request, and can be set and used by other middleware, or handlers. Tags are used
+for logging and tracing of requests. Tags are populated both upwards, *and* downwards in the interceptor-handler stack.
+
+You can automatically extract tags (in `grpc.request.<field_name>`) from request payloads.
+
+For unary and server-streaming methods, pass in the `WithFieldExtractor` option. For client-streams and bidirectional-streams, you can
+use `WithFieldExtractorForInitialReq` which will extract the tags from the first message passed from client to server.
+Note the tags will not be modified for subsequent requests, so this option only makes sense when the initial message
+establishes the meta-data for the stream.
+
+If a user doesn't use the interceptors that initialize the `Tags` object, all operations following from an `Extract(ctx)`
+will be no-ops. This is to ensure that code doesn't panic if the interceptors weren't used.
+
+Tags fields are typed, and shallow and should follow the OpenTracing semantics convention:
+https://github.com/opentracing/specification/blob/master/semantic_conventions.md
+*/
+package grpc_ctxtags
diff --git a/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tags/fieldextractor.go b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tags/fieldextractor.go
new file mode 100644
index 0000000..549ff48
--- /dev/null
+++ b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tags/fieldextractor.go
@@ -0,0 +1,85 @@
+// Copyright 2017 Michal Witkowski. All Rights Reserved.
+// See LICENSE for licensing terms.
+
+package grpc_ctxtags
+
+import (
+ "reflect"
+)
+
+// RequestFieldExtractorFunc is a user-provided function that extracts field information from a gRPC request.
+// It is called from tags middleware on arrival of unary request or a server-stream request.
+// Keys and values will be added to the context tags of the request. If there are no fields, you should return a nil.
+type RequestFieldExtractorFunc func(fullMethod string, req interface{}) map[string]interface{}
+
+type requestFieldsExtractor interface {
+ // ExtractRequestFields is a method declared on a Protobuf message that extracts fields from the interface.
+ // The values from the extracted fields should be set in the appendToMap, in order to avoid allocations.
+ ExtractRequestFields(appendToMap map[string]interface{})
+}
+
+// CodeGenRequestFieldExtractor is a function that relies on code-generated functions that export log fields from requests.
+// These are usually coming from a protoc-plugin that generates additional information based on custom field options.
+func CodeGenRequestFieldExtractor(fullMethod string, req interface{}) map[string]interface{} {
+ if ext, ok := req.(requestFieldsExtractor); ok {
+ retMap := make(map[string]interface{})
+ ext.ExtractRequestFields(retMap)
+ if len(retMap) == 0 {
+ return nil
+ }
+ return retMap
+ }
+ return nil
+}
+
+// TagBasedRequestFieldExtractor is a function that relies on Go struct tags to export log fields from requests.
+// These are usually coming from a protoc-plugin, such as Gogo protobuf.
+//
+// message Metadata {
+// repeated string tags = 1 [ (gogoproto.moretags) = "log_field:\"meta_tags\"" ];
+// }
+//
+// The tagName is configurable using the tagName variable. Here it would be "log_field".
+func TagBasedRequestFieldExtractor(tagName string) RequestFieldExtractorFunc {
+ return func(fullMethod string, req interface{}) map[string]interface{} {
+ retMap := make(map[string]interface{})
+ reflectMessageTags(req, retMap, tagName)
+ if len(retMap) == 0 {
+ return nil
+ }
+ return retMap
+ }
+}
+
+func reflectMessageTags(msg interface{}, existingMap map[string]interface{}, tagName string) {
+ v := reflect.ValueOf(msg)
+ // Only deal with pointers to structs.
+ if v.Kind() != reflect.Ptr || v.Elem().Kind() != reflect.Struct {
+ return
+ }
+ // Deref the pointer get to the struct.
+ v = v.Elem()
+ t := v.Type()
+ for i := 0; i < v.NumField(); i++ {
+ field := v.Field(i)
+ kind := field.Kind()
+ // Only recurse down direct pointers, which should only be to nested structs.
+ if kind == reflect.Ptr {
+ reflectMessageTags(field.Interface(), existingMap, tagName)
+ }
+ // In case of arrays/splices (repeated fields) go down to the concrete type.
+ if kind == reflect.Array || kind == reflect.Slice {
+ if field.Len() == 0 {
+ continue
+ }
+ kind = field.Index(0).Kind()
+ }
+ // Only be interested in
+ if (kind >= reflect.Bool && kind <= reflect.Float64) || kind == reflect.String {
+ if tag := t.Field(i).Tag.Get(tagName); tag != "" {
+ existingMap[tag] = field.Interface()
+ }
+ }
+ }
+ return
+}
diff --git a/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tags/interceptors.go b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tags/interceptors.go
new file mode 100644
index 0000000..038afd2
--- /dev/null
+++ b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tags/interceptors.go
@@ -0,0 +1,83 @@
+// Copyright 2017 Michal Witkowski. All Rights Reserved.
+// See LICENSE for licensing terms.
+
+package grpc_ctxtags
+
+import (
+ "github.com/grpc-ecosystem/go-grpc-middleware"
+ "golang.org/x/net/context"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/peer"
+)
+
+// UnaryServerInterceptor returns a new unary server interceptors that sets the values for request tags.
+func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
+ o := evaluateOptions(opts)
+ return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
+ newCtx := newTagsForCtx(ctx)
+ if o.requestFieldsFunc != nil {
+ setRequestFieldTags(newCtx, o.requestFieldsFunc, info.FullMethod, req)
+ }
+ return handler(newCtx, req)
+ }
+}
+
+// StreamServerInterceptor returns a new streaming server interceptor that sets the values for request tags.
+func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor {
+ o := evaluateOptions(opts)
+ return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+ newCtx := newTagsForCtx(stream.Context())
+ if o.requestFieldsFunc == nil {
+ // Short-circuit, don't do the expensive bit of allocating a wrappedStream.
+ wrappedStream := grpc_middleware.WrapServerStream(stream)
+ wrappedStream.WrappedContext = newCtx
+ return handler(srv, wrappedStream)
+ }
+ wrapped := &wrappedStream{stream, info, o, newCtx, true}
+ err := handler(srv, wrapped)
+ return err
+ }
+}
+
+// wrappedStream is a thin wrapper around grpc.ServerStream that allows modifying context and extracts log fields from the initial message.
+type wrappedStream struct {
+ grpc.ServerStream
+ info *grpc.StreamServerInfo
+ opts *options
+ // WrappedContext is the wrapper's own Context. You can assign it.
+ WrappedContext context.Context
+ initial bool
+}
+
+// Context returns the wrapper's WrappedContext, overwriting the nested grpc.ServerStream.Context()
+func (w *wrappedStream) Context() context.Context {
+ return w.WrappedContext
+}
+
+func (w *wrappedStream) RecvMsg(m interface{}) error {
+ err := w.ServerStream.RecvMsg(m)
+ // We only do log fields extraction on the single-request of a server-side stream.
+ if !w.info.IsClientStream || w.opts.requestFieldsFromInitial && w.initial {
+ w.initial = false
+
+ setRequestFieldTags(w.Context(), w.opts.requestFieldsFunc, w.info.FullMethod, m)
+ }
+ return err
+}
+
+func newTagsForCtx(ctx context.Context) context.Context {
+ t := newTags()
+ if peer, ok := peer.FromContext(ctx); ok {
+ t.Set("peer.address", peer.Addr.String())
+ }
+ return setInContext(ctx, t)
+}
+
+func setRequestFieldTags(ctx context.Context, f RequestFieldExtractorFunc, fullMethodName string, req interface{}) {
+ if valMap := f(fullMethodName, req); valMap != nil {
+ t := Extract(ctx)
+ for k, v := range valMap {
+ t.Set("grpc.request."+k, v)
+ }
+ }
+}
diff --git a/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tags/options.go b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tags/options.go
new file mode 100644
index 0000000..952775f
--- /dev/null
+++ b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tags/options.go
@@ -0,0 +1,44 @@
+// Copyright 2017 Michal Witkowski. All Rights Reserved.
+// See LICENSE for licensing terms.
+
+package grpc_ctxtags
+
+var (
+ defaultOptions = &options{
+ requestFieldsFunc: nil,
+ }
+)
+
+type options struct {
+ requestFieldsFunc RequestFieldExtractorFunc
+ requestFieldsFromInitial bool
+}
+
+func evaluateOptions(opts []Option) *options {
+ optCopy := &options{}
+ *optCopy = *defaultOptions
+ for _, o := range opts {
+ o(optCopy)
+ }
+ return optCopy
+}
+
+type Option func(*options)
+
+// WithFieldExtractor customizes the function for extracting log fields from protobuf messages, for
+// unary and server-streamed methods only.
+func WithFieldExtractor(f RequestFieldExtractorFunc) Option {
+ return func(o *options) {
+ o.requestFieldsFunc = f
+ }
+}
+
+// WithFieldExtractorForInitialReq customizes the function for extracting log fields from protobuf messages,
+// for all unary and streaming methods. For client-streams and bidirectional-streams, the tags will be
+// extracted from the first message from the client.
+func WithFieldExtractorForInitialReq(f RequestFieldExtractorFunc) Option {
+ return func(o *options) {
+ o.requestFieldsFunc = f
+ o.requestFieldsFromInitial = true
+ }
+}
diff --git a/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing/client_interceptors.go b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing/client_interceptors.go
new file mode 100644
index 0000000..f8fdecf
--- /dev/null
+++ b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing/client_interceptors.go
@@ -0,0 +1,142 @@
+// Copyright 2017 Michal Witkowski. All Rights Reserved.
+// See LICENSE for licensing terms.
+
+package grpc_opentracing
+
+import (
+ "io"
+ "sync"
+
+ "github.com/grpc-ecosystem/go-grpc-middleware/util/metautils"
+ opentracing "github.com/opentracing/opentracing-go"
+ "github.com/opentracing/opentracing-go/ext"
+ "github.com/opentracing/opentracing-go/log"
+ "golang.org/x/net/context"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/grpclog"
+ "google.golang.org/grpc/metadata"
+)
+
+// UnaryClientInterceptor returns a new unary client interceptor for OpenTracing.
+func UnaryClientInterceptor(opts ...Option) grpc.UnaryClientInterceptor {
+ o := evaluateOptions(opts)
+ return func(parentCtx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
+ if o.filterOutFunc != nil && !o.filterOutFunc(parentCtx, method) {
+ return invoker(parentCtx, method, req, reply, cc, opts...)
+ }
+ newCtx, clientSpan := newClientSpanFromContext(parentCtx, o.tracer, method)
+ err := invoker(newCtx, method, req, reply, cc, opts...)
+ finishClientSpan(clientSpan, err)
+ return err
+ }
+}
+
+// StreamClientInterceptor returns a new streaming client interceptor for OpenTracing.
+func StreamClientInterceptor(opts ...Option) grpc.StreamClientInterceptor {
+ o := evaluateOptions(opts)
+ return func(parentCtx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
+ if o.filterOutFunc != nil && !o.filterOutFunc(parentCtx, method) {
+ return streamer(parentCtx, desc, cc, method, opts...)
+ }
+ newCtx, clientSpan := newClientSpanFromContext(parentCtx, o.tracer, method)
+ clientStream, err := streamer(newCtx, desc, cc, method, opts...)
+ if err != nil {
+ finishClientSpan(clientSpan, err)
+ return nil, err
+ }
+ return &tracedClientStream{ClientStream: clientStream, clientSpan: clientSpan}, nil
+ }
+}
+
+// type serverStreamingRetryingStream is the implementation of grpc.ClientStream that acts as a
+// proxy to the underlying call. If any of the RecvMsg() calls fail, it will try to reestablish
+// a new ClientStream according to the retry policy.
+type tracedClientStream struct {
+ grpc.ClientStream
+ mu sync.Mutex
+ alreadyFinished bool
+ clientSpan opentracing.Span
+}
+
+func (s *tracedClientStream) Header() (metadata.MD, error) {
+ h, err := s.ClientStream.Header()
+ if err != nil {
+ s.finishClientSpan(err)
+ }
+ return h, err
+}
+
+func (s *tracedClientStream) SendMsg(m interface{}) error {
+ err := s.ClientStream.SendMsg(m)
+ if err != nil {
+ s.finishClientSpan(err)
+ }
+ return err
+}
+
+func (s *tracedClientStream) CloseSend() error {
+ err := s.ClientStream.CloseSend()
+ if err != nil {
+ s.finishClientSpan(err)
+ }
+ return err
+}
+
+func (s *tracedClientStream) RecvMsg(m interface{}) error {
+ err := s.ClientStream.RecvMsg(m)
+ if err != nil {
+ s.finishClientSpan(err)
+ }
+ return err
+}
+
+func (s *tracedClientStream) finishClientSpan(err error) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if !s.alreadyFinished {
+ finishClientSpan(s.clientSpan, err)
+ s.alreadyFinished = true
+ }
+}
+
+// ClientAddContextTags returns a context with specified opentracing tags, which
+// are used by UnaryClientInterceptor/StreamClientInterceptor when creating a
+// new span.
+func ClientAddContextTags(ctx context.Context, tags opentracing.Tags) context.Context {
+ return context.WithValue(ctx, clientSpanTagKey{}, tags)
+}
+
+type clientSpanTagKey struct{}
+
+func newClientSpanFromContext(ctx context.Context, tracer opentracing.Tracer, fullMethodName string) (context.Context, opentracing.Span) {
+ var parentSpanCtx opentracing.SpanContext
+ if parent := opentracing.SpanFromContext(ctx); parent != nil {
+ parentSpanCtx = parent.Context()
+ }
+ opts := []opentracing.StartSpanOption{
+ opentracing.ChildOf(parentSpanCtx),
+ ext.SpanKindRPCClient,
+ grpcTag,
+ }
+ if tagx := ctx.Value(clientSpanTagKey{}); tagx != nil {
+ if opt, ok := tagx.(opentracing.StartSpanOption); ok {
+ opts = append(opts, opt)
+ }
+ }
+ clientSpan := tracer.StartSpan(fullMethodName, opts...)
+ // Make sure we add this to the metadata of the call, so it gets propagated:
+ md := metautils.ExtractOutgoing(ctx).Clone()
+ if err := tracer.Inject(clientSpan.Context(), opentracing.HTTPHeaders, metadataTextMap(md)); err != nil {
+ grpclog.Printf("grpc_opentracing: failed serializing trace information: %v", err)
+ }
+ ctxWithMetadata := md.ToOutgoing(ctx)
+ return opentracing.ContextWithSpan(ctxWithMetadata, clientSpan), clientSpan
+}
+
+func finishClientSpan(clientSpan opentracing.Span, err error) {
+ if err != nil && err != io.EOF {
+ ext.Error.Set(clientSpan, true)
+ clientSpan.LogFields(log.String("event", "error"), log.String("message", err.Error()))
+ }
+ clientSpan.Finish()
+}
diff --git a/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing/doc.go b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing/doc.go
new file mode 100644
index 0000000..7a58efc
--- /dev/null
+++ b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing/doc.go
@@ -0,0 +1,22 @@
+// Copyright 2017 Michal Witkowski. All Rights Reserved.
+// See LICENSE for licensing terms.
+
+/*
+`grpc_opentracing` adds OpenTracing
+
+OpenTracing Interceptors
+
+These are both client-side and server-side interceptors for OpenTracing. They are a provider-agnostic, with backends
+such as Zipkin, or Google Stackdriver Trace.
+
+For a service that sends out requests and receives requests, you *need* to use both, otherwise downstream requests will
+not have the appropriate requests propagated.
+
+All server-side spans are tagged with grpc_ctxtags information.
+
+For more information see:
+http://opentracing.io/documentation/
+https://github.com/opentracing/specification/blob/master/semantic_conventions.md
+
+*/
+package grpc_opentracing
diff --git a/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing/id_extract.go b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing/id_extract.go
new file mode 100644
index 0000000..d19f3c8
--- /dev/null
+++ b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing/id_extract.go
@@ -0,0 +1,67 @@
+package grpc_opentracing
+
+import (
+ "strings"
+
+ grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
+ opentracing "github.com/opentracing/opentracing-go"
+ "google.golang.org/grpc/grpclog"
+)
+
+const (
+ TagTraceId = "trace.traceid"
+ TagSpanId = "trace.spanid"
+ TagSampled = "trace.sampled"
+ jaegerNotSampledFlag = "0"
+)
+
+// injectOpentracingIdsToTags writes trace data to ctxtags.
+// This is done in an incredibly hacky way, because the public-facing interface of opentracing doesn't give access to
+// the TraceId and SpanId of the SpanContext. Only the Tracer's Inject/Extract methods know what these are.
+// Most tracers have them encoded as keys with 'traceid' and 'spanid':
+// https://github.com/openzipkin/zipkin-go-opentracing/blob/594640b9ef7e5c994e8d9499359d693c032d738c/propagation_ot.go#L29
+// https://github.com/opentracing/basictracer-go/blob/1b32af207119a14b1b231d451df3ed04a72efebf/propagation_ot.go#L26
+// Jaeger from Uber use one-key schema with next format '{trace-id}:{span-id}:{parent-span-id}:{flags}'
+// https://www.jaegertracing.io/docs/client-libraries/#trace-span-identity
+func injectOpentracingIdsToTags(span opentracing.Span, tags grpc_ctxtags.Tags) {
+ if err := span.Tracer().Inject(span.Context(), opentracing.HTTPHeaders, &tagsCarrier{tags}); err != nil {
+ grpclog.Infof("grpc_opentracing: failed extracting trace info into ctx %v", err)
+ }
+}
+
+// tagsCarrier is a really hacky way of
+type tagsCarrier struct {
+ grpc_ctxtags.Tags
+}
+
+func (t *tagsCarrier) Set(key, val string) {
+ key = strings.ToLower(key)
+ if strings.Contains(key, "traceid") {
+ t.Tags.Set(TagTraceId, val) // this will most likely be base-16 (hex) encoded
+ }
+
+ if strings.Contains(key, "spanid") && !strings.Contains(strings.ToLower(key), "parent") {
+ t.Tags.Set(TagSpanId, val) // this will most likely be base-16 (hex) encoded
+ }
+
+ if strings.Contains(key, "sampled") {
+ switch val {
+ case "true", "false":
+ t.Tags.Set(TagSampled, val)
+ }
+ }
+
+ if key == "uber-trace-id" {
+ parts := strings.Split(val, ":")
+ if len(parts) == 4 {
+ t.Tags.Set(TagTraceId, parts[0])
+ t.Tags.Set(TagSpanId, parts[1])
+
+ if parts[3] != jaegerNotSampledFlag {
+ t.Tags.Set(TagSampled, "true")
+ } else {
+ t.Tags.Set(TagSampled, "false")
+ }
+ }
+ }
+}
diff --git a/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing/metadata.go b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing/metadata.go
new file mode 100644
index 0000000..38f251d
--- /dev/null
+++ b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing/metadata.go
@@ -0,0 +1,56 @@
+// Copyright 2017 Michal Witkowski. All Rights Reserved.
+// See LICENSE for licensing terms.
+
+package grpc_opentracing
+
+import (
+ "encoding/base64"
+ "strings"
+
+ "fmt"
+
+ "google.golang.org/grpc/metadata"
+)
+
+const (
+ binHdrSuffix = "-bin"
+)
+
+// metadataTextMap extends a metadata.MD to be an opentracing textmap
+type metadataTextMap metadata.MD
+
+// Set is a opentracing.TextMapReader interface that extracts values.
+func (m metadataTextMap) Set(key, val string) {
+ // gRPC allows for complex binary values to be written.
+ encodedKey, encodedVal := encodeKeyValue(key, val)
+ // The metadata object is a multimap, and previous values may exist, but for opentracing headers, we do not append
+ // we just override.
+ m[encodedKey] = []string{encodedVal}
+}
+
+// ForeachKey is a opentracing.TextMapReader interface that extracts values.
+func (m metadataTextMap) ForeachKey(callback func(key, val string) error) error {
+ for k, vv := range m {
+ for _, v := range vv {
+ if decodedKey, decodedVal, err := metadata.DecodeKeyValue(k, v); err == nil {
+ if err = callback(decodedKey, decodedVal); err != nil {
+ return err
+ }
+ } else {
+ return fmt.Errorf("failed decoding opentracing from gRPC metadata: %v", err)
+ }
+ }
+ }
+ return nil
+}
+
+// encodeKeyValue encodes key and value qualified for transmission via gRPC.
+// note: copy pasted from private values of grpc.metadata
+func encodeKeyValue(k, v string) (string, string) {
+ k = strings.ToLower(k)
+ if strings.HasSuffix(k, binHdrSuffix) {
+ val := base64.StdEncoding.EncodeToString([]byte(v))
+ v = string(val)
+ }
+ return k, v
+}
diff --git a/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing/options.go b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing/options.go
new file mode 100644
index 0000000..e75102b
--- /dev/null
+++ b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing/options.go
@@ -0,0 +1,55 @@
+// Copyright 2017 Michal Witkowski. All Rights Reserved.
+// See LICENSE for licensing terms.
+
+package grpc_opentracing
+
+import (
+ "context"
+
+ "github.com/opentracing/opentracing-go"
+)
+
+var (
+ defaultOptions = &options{
+ filterOutFunc: nil,
+ tracer: nil,
+ }
+)
+
+// FilterFunc allows users to provide a function that filters out certain methods from being traced.
+//
+// If it returns false, the given request will not be traced.
+type FilterFunc func(ctx context.Context, fullMethodName string) bool
+
+type options struct {
+ filterOutFunc FilterFunc
+ tracer opentracing.Tracer
+}
+
+func evaluateOptions(opts []Option) *options {
+ optCopy := &options{}
+ *optCopy = *defaultOptions
+ for _, o := range opts {
+ o(optCopy)
+ }
+ if optCopy.tracer == nil {
+ optCopy.tracer = opentracing.GlobalTracer()
+ }
+ return optCopy
+}
+
+type Option func(*options)
+
+// WithFilterFunc customizes the function used for deciding whether a given call is traced or not.
+func WithFilterFunc(f FilterFunc) Option {
+ return func(o *options) {
+ o.filterOutFunc = f
+ }
+}
+
+// WithTracer sets a custom tracer to be used for this middleware, otherwise the opentracing.GlobalTracer is used.
+func WithTracer(tracer opentracing.Tracer) Option {
+ return func(o *options) {
+ o.tracer = tracer
+ }
+}
diff --git a/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing/server_interceptors.go b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing/server_interceptors.go
new file mode 100644
index 0000000..53764a0
--- /dev/null
+++ b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing/server_interceptors.go
@@ -0,0 +1,87 @@
+// Copyright 2017 Michal Witkowski. All Rights Reserved.
+// See LICENSE for licensing terms.
+
+package grpc_opentracing
+
+import (
+ "github.com/grpc-ecosystem/go-grpc-middleware"
+ "github.com/grpc-ecosystem/go-grpc-middleware/tags"
+ "github.com/grpc-ecosystem/go-grpc-middleware/util/metautils"
+ "github.com/opentracing/opentracing-go"
+ "github.com/opentracing/opentracing-go/ext"
+ "github.com/opentracing/opentracing-go/log"
+ "golang.org/x/net/context"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/grpclog"
+)
+
+var (
+ grpcTag = opentracing.Tag{Key: string(ext.Component), Value: "gRPC"}
+)
+
+// UnaryServerInterceptor returns a new unary server interceptor for OpenTracing.
+func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
+ o := evaluateOptions(opts)
+ return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
+ if o.filterOutFunc != nil && !o.filterOutFunc(ctx, info.FullMethod) {
+ return handler(ctx, req)
+ }
+ newCtx, serverSpan := newServerSpanFromInbound(ctx, o.tracer, info.FullMethod)
+ resp, err := handler(newCtx, req)
+ finishServerSpan(ctx, serverSpan, err)
+ return resp, err
+ }
+}
+
+// StreamServerInterceptor returns a new streaming server interceptor for OpenTracing.
+func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor {
+ o := evaluateOptions(opts)
+ return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+ if o.filterOutFunc != nil && !o.filterOutFunc(stream.Context(), info.FullMethod) {
+ return handler(srv, stream)
+ }
+ newCtx, serverSpan := newServerSpanFromInbound(stream.Context(), o.tracer, info.FullMethod)
+ wrappedStream := grpc_middleware.WrapServerStream(stream)
+ wrappedStream.WrappedContext = newCtx
+ err := handler(srv, wrappedStream)
+ finishServerSpan(newCtx, serverSpan, err)
+ return err
+ }
+}
+
+func newServerSpanFromInbound(ctx context.Context, tracer opentracing.Tracer, fullMethodName string) (context.Context, opentracing.Span) {
+ md := metautils.ExtractIncoming(ctx)
+ parentSpanContext, err := tracer.Extract(opentracing.HTTPHeaders, metadataTextMap(md))
+ if err != nil && err != opentracing.ErrSpanContextNotFound {
+ grpclog.Printf("grpc_opentracing: failed parsing trace information: %v", err)
+ }
+
+ serverSpan := tracer.StartSpan(
+ fullMethodName,
+ // this is magical, it attaches the new span to the parent parentSpanContext, and creates an unparented one if empty.
+ ext.RPCServerOption(parentSpanContext),
+ grpcTag,
+ )
+
+ injectOpentracingIdsToTags(serverSpan, grpc_ctxtags.Extract(ctx))
+ return opentracing.ContextWithSpan(ctx, serverSpan), serverSpan
+}
+
+func finishServerSpan(ctx context.Context, serverSpan opentracing.Span, err error) {
+ // Log context information
+ tags := grpc_ctxtags.Extract(ctx)
+ for k, v := range tags.Values() {
+ // Don't tag errors, log them instead.
+ if vErr, ok := v.(error); ok {
+ serverSpan.LogKV(k, vErr.Error())
+
+ } else {
+ serverSpan.SetTag(k, v)
+ }
+ }
+ if err != nil {
+ ext.Error.Set(serverSpan, true)
+ serverSpan.LogFields(log.String("event", "error"), log.String("message", err.Error()))
+ }
+ serverSpan.Finish()
+}
diff --git a/vendor/github.com/grpc-ecosystem/go-grpc-middleware/util/metautils/doc.go b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/util/metautils/doc.go
new file mode 100644
index 0000000..1ed9bb4
--- /dev/null
+++ b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/util/metautils/doc.go
@@ -0,0 +1,19 @@
+// Copyright 2016 Michal Witkowski. All Rights Reserved.
+// See LICENSE for licensing terms.
+
+/*
+Package `metautils` provides convenience functions for dealing with gRPC metadata.MD objects inside
+Context handlers.
+
+While the upstream grpc-go package contains decent functionality (see https://github.com/grpc/grpc-go/blob/master/Documentation/grpc-metadata.md)
+they are hard to use.
+
+The majority of functions center around the NiceMD, which is a convenience wrapper around metadata.MD. For example
+the following code allows you to easily extract incoming metadata (server handler) and put it into a new client context
+metadata.
+
+ nmd := metautils.ExtractIncoming(serverCtx).Clone(":authorization", ":custom")
+ clientCtx := nmd.Set("x-client-header", "2").Set("x-another", "3").ToOutgoing(ctx)
+*/
+
+package metautils
diff --git a/vendor/github.com/grpc-ecosystem/go-grpc-middleware/util/metautils/nicemd.go b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/util/metautils/nicemd.go
new file mode 100644
index 0000000..a277bee
--- /dev/null
+++ b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/util/metautils/nicemd.go
@@ -0,0 +1,126 @@
+// Copyright 2016 Michal Witkowski. All Rights Reserved.
+// See LICENSE for licensing terms.
+
+package metautils
+
+import (
+ "strings"
+
+ "golang.org/x/net/context"
+ "google.golang.org/grpc/metadata"
+)
+
+// NiceMD is a convenience wrapper definiting extra functions on the metadata.
+type NiceMD metadata.MD
+
+// ExtractIncoming extracts an inbound metadata from the server-side context.
+//
+// This function always returns a NiceMD wrapper of the metadata.MD, in case the context doesn't have metadata it returns
+// a new empty NiceMD.
+func ExtractIncoming(ctx context.Context) NiceMD {
+ md, ok := metadata.FromIncomingContext(ctx)
+ if !ok {
+ return NiceMD(metadata.Pairs())
+ }
+ return NiceMD(md)
+}
+
+// ExtractOutgoing extracts an outbound metadata from the client-side context.
+//
+// This function always returns a NiceMD wrapper of the metadata.MD, in case the context doesn't have metadata it returns
+// a new empty NiceMD.
+func ExtractOutgoing(ctx context.Context) NiceMD {
+ md, ok := metadata.FromOutgoingContext(ctx)
+ if !ok {
+ return NiceMD(metadata.Pairs())
+ }
+ return NiceMD(md)
+}
+
+// Clone performs a *deep* copy of the metadata.MD.
+//
+// You can specify the lower-case copiedKeys to only copy certain whitelisted keys. If no keys are explicitly whitelisted
+// all keys get copied.
+func (m NiceMD) Clone(copiedKeys ...string) NiceMD {
+ newMd := NiceMD(metadata.Pairs())
+ for k, vv := range m {
+ found := false
+ if len(copiedKeys) == 0 {
+ found = true
+ } else {
+ for _, allowedKey := range copiedKeys {
+ if strings.ToLower(allowedKey) == strings.ToLower(k) {
+ found = true
+ break
+ }
+ }
+ }
+ if !found {
+ continue
+ }
+ newMd[k] = make([]string, len(vv))
+ copy(newMd[k], vv)
+ }
+ return NiceMD(newMd)
+}
+
+// ToOutgoing sets the given NiceMD as a client-side context for dispatching.
+func (m NiceMD) ToOutgoing(ctx context.Context) context.Context {
+ return metadata.NewOutgoingContext(ctx, metadata.MD(m))
+}
+
+// ToIncoming sets the given NiceMD as a server-side context for dispatching.
+//
+// This is mostly useful in ServerInterceptors..
+func (m NiceMD) ToIncoming(ctx context.Context) context.Context {
+ return metadata.NewIncomingContext(ctx, metadata.MD(m))
+}
+
+// Get retrieves a single value from the metadata.
+//
+// It works analogously to http.Header.Get, returning the first value if there are many set. If the value is not set,
+// an empty string is returned.
+//
+// The function is binary-key safe.
+func (m NiceMD) Get(key string) string {
+ k, _ := encodeKeyValue(key, "")
+ vv, ok := m[k]
+ if !ok {
+ return ""
+ }
+ return vv[0]
+}
+
+// Del retrieves a single value from the metadata.
+//
+// It works analogously to http.Header.Del, deleting all values if they exist.
+//
+// The function is binary-key safe.
+
+func (m NiceMD) Del(key string) NiceMD {
+ k, _ := encodeKeyValue(key, "")
+ delete(m, k)
+ return m
+}
+
+// Set sets the given value in a metadata.
+//
+// It works analogously to http.Header.Set, overwriting all previous metadata values.
+//
+// The function is binary-key safe.
+func (m NiceMD) Set(key string, value string) NiceMD {
+ k, v := encodeKeyValue(key, value)
+ m[k] = []string{v}
+ return m
+}
+
+// Add retrieves a single value from the metadata.
+//
+// It works analogously to http.Header.Add, as it appends to any existing values associated with key.
+//
+// The function is binary-key safe.
+func (m NiceMD) Add(key string, value string) NiceMD {
+ k, v := encodeKeyValue(key, value)
+ m[k] = append(m[k], v)
+ return m
+}
diff --git a/vendor/github.com/grpc-ecosystem/go-grpc-middleware/util/metautils/single_key.go b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/util/metautils/single_key.go
new file mode 100644
index 0000000..8a53871
--- /dev/null
+++ b/vendor/github.com/grpc-ecosystem/go-grpc-middleware/util/metautils/single_key.go
@@ -0,0 +1,22 @@
+// Copyright 2016 Michal Witkowski. All Rights Reserved.
+// See LICENSE for licensing terms.
+
+package metautils
+
+import (
+ "encoding/base64"
+ "strings"
+)
+
+const (
+ binHdrSuffix = "-bin"
+)
+
+func encodeKeyValue(k, v string) (string, string) {
+ k = strings.ToLower(k)
+ if strings.HasSuffix(k, binHdrSuffix) {
+ val := base64.StdEncoding.EncodeToString([]byte(v))
+ v = string(val)
+ }
+ return k, v
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/adapter_proxy.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/adapter_proxy.go
index ca44d0d..8588fe4 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/adapter_proxy.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/adapter_proxy.go
@@ -30,19 +30,17 @@
type AdapterProxy struct {
kafkaICProxy kafka.InterContainerProxy
- adapterTopic string
coreTopic string
endpointMgr kafka.EndpointManager
}
-func NewAdapterProxy(ctx context.Context, kafkaProxy kafka.InterContainerProxy, adapterTopic string, coreTopic string, backend *db.Backend) *AdapterProxy {
+func NewAdapterProxy(ctx context.Context, kafkaProxy kafka.InterContainerProxy, coreTopic string, backend *db.Backend) *AdapterProxy {
proxy := AdapterProxy{
kafkaICProxy: kafkaProxy,
- adapterTopic: adapterTopic,
coreTopic: coreTopic,
endpointMgr: kafka.NewEndpointManager(backend),
}
- logger.Debugw(ctx, "topics", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
+ logger.Debugw(ctx, "topics", log.Fields{"core": proxy.coreTopic})
return &proxy
}
@@ -65,11 +63,17 @@
return err
}
+ // Set up the required rpc arguments
+ endpoint, err := ap.endpointMgr.GetEndpoint(ctx, toDeviceId, toAdapter)
+ if err != nil {
+ return err
+ }
+
//Build the inter adapter message
header := &ic.InterAdapterHeader{
Type: msgType,
FromTopic: fromAdapter,
- ToTopic: toAdapter,
+ ToTopic: string(endpoint),
ToDeviceId: toDeviceId,
ProxyDeviceId: proxyDeviceId,
}
@@ -89,15 +93,12 @@
Value: iaMsg,
}
- // Set up the required rpc arguments
- endpoint, err := ap.endpointMgr.GetEndpoint(ctx, toDeviceId, toAdapter)
- if err != nil {
- return err
- }
topic := kafka.Topic{Name: string(endpoint)}
replyToTopic := kafka.Topic{Name: fromAdapter}
rpc := "process_inter_adapter_message"
+ // Add a indication in context to differentiate this Inter Adapter message during Span processing in Kafka IC proxy
+ ctx = context.WithValue(ctx, "inter-adapter-msg-type", msgType)
success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, proxyDeviceId, args...)
logger.Debugw(ctx, "inter-adapter-msg-response", log.Fields{"replyTopic": replyToTopic, "success": success})
return unPackResponse(ctx, rpc, "", success, result)
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/common.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/common.go
index ad8b11b..2f56e42 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/common.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/common.go
@@ -24,7 +24,7 @@
func init() {
// Setup this package so that it's log level can be modified at run time
var err error
- logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "common"})
+ logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{})
if err != nil {
panic(err)
}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/common.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/common.go
index 06b8b3c..a69e290 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/common.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/common.go
@@ -24,7 +24,7 @@
func init() {
// Setup this package so that it's log level can be modified at run time
var err error
- logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "config"})
+ logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{})
if err != nil {
panic(err)
}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/common.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/common.go
index fe84b46..9d50f24 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/common.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/common.go
@@ -24,7 +24,7 @@
func init() {
// Setup this package so that it's log level can be modified at run time
var err error
- logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "db"})
+ logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{})
if err != nil {
panic(err)
}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/common.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/common.go
index 0de395f..bb38a94 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/common.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/common.go
@@ -24,7 +24,7 @@
func init() {
// Setup this package so that it's log level can be modified at run time
var err error
- logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "kvstore"})
+ logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{})
if err != nil {
panic(err)
}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/flows/common.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/flows/common.go
index 0328d72..2d5904b 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/flows/common.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/flows/common.go
@@ -24,7 +24,7 @@
func init() {
// Setup this package so that it's log level can be modified at run time
var err error
- logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "flowsUtils"})
+ logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{})
if err != nil {
panic(err)
}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/grpc/common.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/grpc/common.go
index fa53542..d873069 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/grpc/common.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/grpc/common.go
@@ -24,7 +24,7 @@
func init() {
// Setup this package so that it's log level can be modified at run time
var err error
- logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "grpc"})
+ logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{})
if err != nil {
panic(err)
}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/grpc/server.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/grpc/server.go
index 2bf7696..9f7c785 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/grpc/server.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/grpc/server.go
@@ -17,6 +17,9 @@
import (
"context"
+ grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
+ grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
+ "github.com/opentracing/opentracing-go"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
@@ -96,17 +99,27 @@
logger.Fatalf(ctx, "failed to listen: %v", err)
}
+ // Use Intercepters to automatically inject and publish Open Tracing Spans by this GRPC server
+ serverOptions := []grpc.ServerOption{
+ grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
+ grpc_opentracing.StreamServerInterceptor(grpc_opentracing.WithTracer(opentracing.GlobalTracer())),
+ )),
+ grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
+ grpc_opentracing.UnaryServerInterceptor(grpc_opentracing.WithTracer(opentracing.GlobalTracer())),
+ mkServerInterceptor(s),
+ ))}
+
if s.secure && s.GrpcSecurity != nil {
creds, err := credentials.NewServerTLSFromFile(s.CertFile, s.KeyFile)
if err != nil {
logger.Fatalf(ctx, "could not load TLS keys: %s", err)
}
- s.gs = grpc.NewServer(grpc.Creds(creds),
- withServerUnaryInterceptor(s))
+ serverOptions = append(serverOptions, grpc.Creds(creds))
+ s.gs = grpc.NewServer(serverOptions...)
} else {
logger.Info(ctx, "starting-insecure-grpc-server")
- s.gs = grpc.NewServer(withServerUnaryInterceptor(s))
+ s.gs = grpc.NewServer(serverOptions...)
}
// Register all required services
@@ -119,10 +132,6 @@
}
}
-func withServerUnaryInterceptor(s *GrpcServer) grpc.ServerOption {
- return grpc.UnaryInterceptor(mkServerInterceptor(s))
-}
-
// Make a serverInterceptor for the given GrpcServer
// This interceptor will check whether there is an attached probe,
// and if that probe indicates NotReady, then an UNAVAILABLE
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/common.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/common.go
index 99b4cdf..f229d46 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/common.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/common.go
@@ -24,7 +24,7 @@
func init() {
// Setup this package so that it's log level can be modified at run time
var err error
- logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "kafka"})
+ logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{})
if err != nil {
panic(err)
}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
index 368391e..92d2529 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
@@ -17,22 +17,23 @@
import (
"context"
+ "encoding/json"
"errors"
"fmt"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
"reflect"
"strings"
"sync"
"time"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
-
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/any"
"github.com/google/uuid"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
ic "github.com/opencord/voltha-protos/v3/go/inter_container"
+ "github.com/opentracing/opentracing-go"
)
const (
@@ -197,6 +198,13 @@
waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse {
logger.Debugw(ctx, "InvokeAsyncRPC", log.Fields{"rpc": rpc, "key": key})
+
+ spanArg, span, ctx := kp.embedSpanAsArg(ctx, rpc, !waitForResponse)
+ if spanArg != nil {
+ kvArgs = append(kvArgs, &spanArg[0])
+ }
+ defer span.Finish()
+
// If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
// typically the device ID.
responseTopic := replyToTopic
@@ -219,6 +227,7 @@
protoRequest, err = encodeRequest(ctx, rpc, toTopic, responseTopic, key, kvArgs...)
if err != nil {
logger.Warnw(ctx, "cannot-format-request", log.Fields{"rpc": rpc, "error": err})
+ log.MarkSpanError(ctx, errors.New("cannot-format-request"))
chnl <- NewResponse(RpcFormattingError, err, nil)
return
}
@@ -227,6 +236,7 @@
var ch <-chan *ic.InterContainerMessage
if ch, err = kp.subscribeForResponse(ctx, *responseTopic, protoRequest.Header.Id); err != nil {
logger.Errorw(ctx, "failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
+ log.MarkSpanError(ctx, errors.New("failed-to-subscribe-for-response"))
chnl <- NewResponse(RpcTransportError, err, nil)
return
}
@@ -260,6 +270,7 @@
case msg, ok := <-ch:
if !ok {
logger.Warnw(ctx, "channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
+ log.MarkSpanError(ctx, errors.New("channel-closed"))
chnl <- NewResponse(RpcTransportError, status.Error(codes.Aborted, "channel closed"), nil)
}
logger.Debugw(ctx, "received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
@@ -280,6 +291,7 @@
}
case <-ctx.Done():
logger.Errorw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
+ log.MarkSpanError(ctx, errors.New("context-cancelled"))
err := status.Error(codes.DeadlineExceeded, ctx.Err().Error())
chnl <- NewResponse(RpcTimeout, err, nil)
case <-kp.doneCh:
@@ -290,10 +302,72 @@
return chnl
}
+// Method to extract Open-tracing Span from Context and serialize it for transport over Kafka embedded as a additional argument.
+// Additional argument is injected using key as "span" and value as Span marshalled into a byte slice
+//
+// The span name is automatically constructed using the RPC name with following convention (<rpc-name> represents name of invoked method):
+// - RPC invoked in Sync manner (WaitForResponse=true) : kafka-rpc-<rpc-name>
+// - RPC invoked in Async manner (WaitForResponse=false) : kafka-async-rpc-<rpc-name>
+// - Inter Adapter RPC invoked in Sync manner (WaitForResponse=true) : kafka-inter-adapter-rpc-<rpc-name>
+// - Inter Adapter RPC invoked in Async manner (WaitForResponse=false) : kafka-inter-adapter-async-rpc-<rpc-name>
+func (kp *interContainerProxy) embedSpanAsArg(ctx context.Context, rpc string, isAsync bool) ([]KVArg, opentracing.Span, context.Context) {
+ var err error
+ var newCtx context.Context
+ var spanToInject opentracing.Span
+
+ var spanName strings.Builder
+ spanName.WriteString("kafka-")
+
+ // In case of inter adapter message, use Msg Type for constructing RPC name
+ if rpc == "process_inter_adapter_message" {
+ if msgType, ok := ctx.Value("inter-adapter-msg-type").(ic.InterAdapterMessageType_Types); ok {
+ spanName.WriteString("inter-adapter-")
+ rpc = msgType.String()
+ }
+ }
+
+ if isAsync {
+ spanName.WriteString("async-rpc-")
+ } else {
+ spanName.WriteString("rpc-")
+ }
+ spanName.WriteString(rpc)
+
+ if isAsync {
+ spanToInject, newCtx = log.CreateAsyncSpan(ctx, spanName.String())
+ } else {
+ spanToInject, newCtx = log.CreateChildSpan(ctx, spanName.String())
+ }
+
+ spanToInject.SetBaggageItem("rpc-span-name", spanName.String())
+
+ textMapCarrier := opentracing.TextMapCarrier(make(map[string]string))
+ if err = opentracing.GlobalTracer().Inject(spanToInject.Context(), opentracing.TextMap, textMapCarrier); err != nil {
+ logger.Warnw(ctx, "unable-to-serialize-span-to-textmap", log.Fields{"span": spanToInject, "error": err})
+ return nil, spanToInject, newCtx
+ }
+
+ var textMapJson []byte
+ if textMapJson, err = json.Marshal(textMapCarrier); err != nil {
+ logger.Warnw(ctx, "unable-to-marshal-textmap-to-json-string", log.Fields{"textMap": textMapCarrier, "error": err})
+ return nil, spanToInject, newCtx
+ }
+
+ spanArg := make([]KVArg, 1)
+ spanArg[0] = KVArg{Key: "span", Value: &ic.StrType{Val: string(textMapJson)}}
+ return spanArg, spanToInject, newCtx
+}
+
// InvokeRPC is used to send a request to a given topic
func (kp *interContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any) {
+ spanArg, span, ctx := kp.embedSpanAsArg(ctx, rpc, false)
+ if spanArg != nil {
+ kvArgs = append(kvArgs, &spanArg[0])
+ }
+ defer span.Finish()
+
// If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
// typically the device ID.
responseTopic := replyToTopic
@@ -305,6 +379,7 @@
protoRequest, err := encodeRequest(ctx, rpc, toTopic, responseTopic, key, kvArgs...)
if err != nil {
logger.Warnw(ctx, "cannot-format-request", log.Fields{"rpc": rpc, "error": err})
+ log.MarkSpanError(ctx, errors.New("cannot-format-request"))
return false, nil
}
@@ -324,6 +399,7 @@
logger.Debugw(ctx, "sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
go func() {
if err := kp.kafkaClient.Send(ctx, protoRequest, toTopic, key); err != nil {
+ log.MarkSpanError(ctx, errors.New("send-failed"))
logger.Errorw(ctx, "send-failed", log.Fields{
"topic": toTopic,
"key": key,
@@ -355,6 +431,7 @@
case msg, ok := <-ch:
if !ok {
logger.Warnw(ctx, "channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
+ log.MarkSpanError(ctx, errors.New("channel-closed"))
protoError := &ic.Error{Reason: "channel-closed"}
var marshalledArg *any.Any
if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
@@ -372,6 +449,7 @@
return responseBody.Success, responseBody.Result
case <-ctx.Done():
logger.Debugw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
+ log.MarkSpanError(ctx, errors.New("context-cancelled"))
// pack the error as proto any type
protoError := &ic.Error{Reason: ctx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
@@ -382,6 +460,7 @@
return false, marshalledArg
case <-childCtx.Done():
logger.Debugw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
+ log.MarkSpanError(ctx, errors.New("context-cancelled"))
// pack the error as proto any type
protoError := &ic.Error{Reason: childCtx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
@@ -720,6 +799,71 @@
return append(currentArgs, protoArg)
}
+// Method to extract the Span embedded in Kafka RPC request on the receiver side. If span is found embedded in the KV args (with key as "span"),
+// it is de-serialized and injected into the Context to be carried forward by the RPC request processor thread.
+// If no span is found embedded, even then a span is created with name as "kafka-rpc-<rpc-name>" to enrich the Context for RPC calls coming
+// from components currently not sending the span (e.g. openonu adapter)
+func (kp *interContainerProxy) enrichContextWithSpan(ctx context.Context, rpcName string, args []*ic.Argument) (opentracing.Span, context.Context) {
+
+ for _, arg := range args {
+ if arg.Key == "span" {
+ var err error
+ var textMapString ic.StrType
+ if err = ptypes.UnmarshalAny(arg.Value, &textMapString); err != nil {
+ logger.Warnw(ctx, "unable-to-unmarshal-kvarg-to-textmap-string", log.Fields{"value": arg.Value})
+ break
+ }
+
+ spanTextMap := make(map[string]string)
+ if err = json.Unmarshal([]byte(textMapString.Val), &spanTextMap); err != nil {
+ logger.Warnw(ctx, "unable-to-unmarshal-textmap-from-json-string", log.Fields{"textMapString": textMapString, "error": err})
+ break
+ }
+
+ var spanContext opentracing.SpanContext
+ if spanContext, err = opentracing.GlobalTracer().Extract(opentracing.TextMap, opentracing.TextMapCarrier(spanTextMap)); err != nil {
+ logger.Warnw(ctx, "unable-to-deserialize-textmap-to-span", log.Fields{"textMap": spanTextMap, "error": err})
+ break
+ }
+
+ var receivedRpcName string
+ extractBaggage := func(k, v string) bool {
+ if k == "rpc-span-name" {
+ receivedRpcName = v
+ return false
+ }
+ return true
+ }
+
+ spanContext.ForeachBaggageItem(extractBaggage)
+
+ return opentracing.StartSpanFromContext(ctx, receivedRpcName, opentracing.FollowsFrom(spanContext))
+ }
+ }
+
+ // Create new Child Span with rpc as name if no span details were received in kafka arguments
+ var spanName strings.Builder
+ spanName.WriteString("kafka-")
+
+ // In case of inter adapter message, use Msg Type for constructing RPC name
+ if rpcName == "process_inter_adapter_message" {
+ for _, arg := range args {
+ if arg.Key == "msg" {
+ iamsg := ic.InterAdapterMessage{}
+ if err := ptypes.UnmarshalAny(arg.Value, &iamsg); err == nil {
+ spanName.WriteString("inter-adapter-")
+ rpcName = iamsg.Header.Type.String()
+ }
+ }
+ }
+ }
+
+ spanName.WriteString("rpc-")
+ spanName.WriteString(rpcName)
+
+ return opentracing.StartSpanFromContext(ctx, spanName.String())
+}
+
func (kp *interContainerProxy) handleMessage(ctx context.Context, msg *ic.InterContainerMessage, targetInterface interface{}) {
// First extract the header to know whether this is a request - responses are handled by a different handler
@@ -732,6 +876,9 @@
if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
logger.Warnw(ctx, "cannot-unmarshal-request", log.Fields{"error": err})
} else {
+ span, ctx := kp.enrichContextWithSpan(ctx, requestBody.Rpc, requestBody.Args)
+ defer span.Finish()
+
logger.Debugw(ctx, "received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header})
// let the callee unpack the arguments as its the only one that knows the real proto type
// Augment the requestBody with the message Id as it will be used in scenarios where cores
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/log/utils.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/log/utils.go
index a947185..b22ca14 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/log/utils.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/log/utils.go
@@ -120,6 +120,13 @@
return cfg.InitGlobalTracer(componentName, jcfg.Reporter(jReporterCfgOption), jcfg.Sampler(jSamplerCfgOption))
}
+func TerminateTracing(c io.Closer) {
+ err := c.Close()
+ if err != nil {
+ defaultLogger.Error(context.Background(), "error-while-closing-jaeger-tracer", Fields{"err": err})
+ }
+}
+
// Extracts details of Execution Context as log fields from the Tracing Span injected into the
// context instance. Following log fields are extracted:
// 1. Operation Name : key as 'op-name' and value as Span operation name
@@ -168,6 +175,15 @@
attrMap[k] = v
}
+
+ processBaggageItems := func(k, v string) bool {
+ if k != "rpc-span-name" {
+ attrMap[k] = v
+ }
+ return true
+ }
+
+ jspan.SpanContext().ForeachBaggageItem(processBaggageItems)
}
}
}
@@ -179,9 +195,23 @@
func EnrichSpan(ctx context.Context, keyAndValues ...Fields) {
span := opentracing.SpanFromContext(ctx)
if span != nil {
- for _, field := range keyAndValues {
- for k, v := range field {
- span.SetTag(k, v)
+ if jspan, ok := span.(*jtracing.Span); ok {
+ // Inject as a BaggageItem when the Span is the Root Span so that it propagates
+ // across the components along with Root Span (called as Trace)
+ // Else, inject as a Tag so that it is attached to the Child Task
+ isRootSpan := false
+ if jspan.SpanContext().TraceID().String() == jspan.SpanContext().SpanID().String() {
+ isRootSpan = true
+ }
+
+ for _, field := range keyAndValues {
+ for k, v := range field {
+ if isRootSpan {
+ span.SetBaggageItem(k, v.(string))
+ } else {
+ span.SetTag(k, v)
+ }
+ }
}
}
}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd/common.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd/common.go
index 63d4ab0..f74c3ea 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd/common.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd/common.go
@@ -24,7 +24,7 @@
func init() {
// Setup this package so that it's log level can be modified at run time
var err error
- logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "mocks"})
+ logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{})
if err != nil {
panic(err)
}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka/common.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka/common.go
index e980b05..07ac9ec 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka/common.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka/common.go
@@ -24,7 +24,7 @@
func init() {
// Setup this package so that it's log level can be modified at run time
var err error
- logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "mocks"})
+ logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{})
if err != nil {
panic(err)
}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/probe/common.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/probe/common.go
index 14857ab..efd27a4 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/probe/common.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/probe/common.go
@@ -24,7 +24,7 @@
func init() {
// Setup this package so that it's log level can be modified at run time
var err error
- logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "probe"})
+ logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{})
if err != nil {
panic(err)
}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 8646644..9c5f761 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -60,6 +60,9 @@
github.com/gorilla/websocket
# github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4
github.com/grpc-ecosystem/go-grpc-middleware
+github.com/grpc-ecosystem/go-grpc-middleware/tags
+github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing
+github.com/grpc-ecosystem/go-grpc-middleware/util/metautils
# github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/grpc-ecosystem/go-grpc-prometheus
# github.com/grpc-ecosystem/grpc-gateway v1.9.5
@@ -99,7 +102,7 @@
github.com/modern-go/concurrent
# github.com/modern-go/reflect2 v1.0.1
github.com/modern-go/reflect2
-# github.com/opencord/voltha-lib-go/v3 v3.2.2
+# github.com/opencord/voltha-lib-go/v3 v3.2.5
github.com/opencord/voltha-lib-go/v3/pkg/adapters
github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif
github.com/opencord/voltha-lib-go/v3/pkg/adapters/common