[VOL-3196] Using the latest voltha-lib-go in the openolt adapter
Change-Id: I0c6d166f3dda8e9cca9e19a2f33aeee96a8cc6c3
diff --git a/VERSION b/VERSION
index 9080d9c..73462a5 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.5.1-dev
+2.5.1
diff --git a/cmd/openolt-adapter/main.go b/cmd/openolt-adapter/main.go
index 1da44c4..14c47d2 100644
--- a/cmd/openolt-adapter/main.go
+++ b/cmd/openolt-adapter/main.go
@@ -120,7 +120,7 @@
a.coreProxy = com.NewCoreProxy(ctx, a.kip, a.config.Topic, a.config.CoreTopic)
// Create the adaptor proxy to handle request between olt and onu
- a.adapterProxy = com.NewAdapterProxy(ctx, a.kip, "brcm_openomci_onu", a.config.CoreTopic, cm.Backend)
+ a.adapterProxy = com.NewAdapterProxy(ctx, a.kip, a.config.CoreTopic, cm.Backend)
// Create the event proxy to post events to KAFKA
a.eventProxy = com.NewEventProxy(com.MsgClient(a.kafkaClient), com.MsgTopic(kafka.Topic{Name: a.config.EventTopic}))
diff --git a/go.mod b/go.mod
index 3fd611a..a672acb 100644
--- a/go.mod
+++ b/go.mod
@@ -8,7 +8,7 @@
github.com/gogo/protobuf v1.3.1
github.com/golang/protobuf v1.3.2
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4
- github.com/opencord/voltha-lib-go/v3 v3.2.3
+ github.com/opencord/voltha-lib-go/v3 v3.2.5
github.com/opencord/voltha-protos/v3 v3.4.1
github.com/opentracing/opentracing-go v1.1.0
go.etcd.io/etcd v0.0.0-20190930204107-236ac2a90522
diff --git a/go.sum b/go.sum
index 0c98513..29f60a8 100644
--- a/go.sum
+++ b/go.sum
@@ -204,8 +204,8 @@
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.2 h1:3mYCb7aPxS/RU7TI1y4rkEn1oKmPRjNJLNEXgw7MH2I=
github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
-github.com/opencord/voltha-lib-go/v3 v3.2.3 h1:VN+PywEuABvSucQd2PS5828m/bqmQnTD/UNl9wR+pFs=
-github.com/opencord/voltha-lib-go/v3 v3.2.3/go.mod h1:7CnNtVE/O8y80Xw7JXtaWQ61HxL7yWXlB0/uH5R/X/Y=
+github.com/opencord/voltha-lib-go/v3 v3.2.5 h1:njPoUEcRu6gATMeKJqbQM4lnh+SKC1iRJm++muSBCQ8=
+github.com/opencord/voltha-lib-go/v3 v3.2.5/go.mod h1:MIWsmMGxIRLK8dG+CNVcE/cJCzOondyRbwJ1708BYgU=
github.com/opencord/voltha-protos/v3 v3.4.1 h1:Nrr0jKy7WiMiFlXYiK8z73RvI4K4L5b/top7d3htQQI=
github.com/opencord/voltha-protos/v3 v3.4.1/go.mod h1:nl1ETp5Iw3avxOaKD8BJlYY5wYI4KeV95aT1pL63nto=
github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU=
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/adapter_proxy.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/adapter_proxy.go
index ca44d0d..8588fe4 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/adapter_proxy.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/adapter_proxy.go
@@ -30,19 +30,17 @@
type AdapterProxy struct {
kafkaICProxy kafka.InterContainerProxy
- adapterTopic string
coreTopic string
endpointMgr kafka.EndpointManager
}
-func NewAdapterProxy(ctx context.Context, kafkaProxy kafka.InterContainerProxy, adapterTopic string, coreTopic string, backend *db.Backend) *AdapterProxy {
+func NewAdapterProxy(ctx context.Context, kafkaProxy kafka.InterContainerProxy, coreTopic string, backend *db.Backend) *AdapterProxy {
proxy := AdapterProxy{
kafkaICProxy: kafkaProxy,
- adapterTopic: adapterTopic,
coreTopic: coreTopic,
endpointMgr: kafka.NewEndpointManager(backend),
}
- logger.Debugw(ctx, "topics", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
+ logger.Debugw(ctx, "topics", log.Fields{"core": proxy.coreTopic})
return &proxy
}
@@ -65,11 +63,17 @@
return err
}
+ // Set up the required rpc arguments
+ endpoint, err := ap.endpointMgr.GetEndpoint(ctx, toDeviceId, toAdapter)
+ if err != nil {
+ return err
+ }
+
//Build the inter adapter message
header := &ic.InterAdapterHeader{
Type: msgType,
FromTopic: fromAdapter,
- ToTopic: toAdapter,
+ ToTopic: string(endpoint),
ToDeviceId: toDeviceId,
ProxyDeviceId: proxyDeviceId,
}
@@ -89,15 +93,12 @@
Value: iaMsg,
}
- // Set up the required rpc arguments
- endpoint, err := ap.endpointMgr.GetEndpoint(ctx, toDeviceId, toAdapter)
- if err != nil {
- return err
- }
topic := kafka.Topic{Name: string(endpoint)}
replyToTopic := kafka.Topic{Name: fromAdapter}
rpc := "process_inter_adapter_message"
+ // Add a indication in context to differentiate this Inter Adapter message during Span processing in Kafka IC proxy
+ ctx = context.WithValue(ctx, "inter-adapter-msg-type", msgType)
success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, proxyDeviceId, args...)
logger.Debugw(ctx, "inter-adapter-msg-response", log.Fields{"replyTopic": replyToTopic, "success": success})
return unPackResponse(ctx, rpc, "", success, result)
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/common.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/common.go
index ad8b11b..2f56e42 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/common.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/common.go
@@ -24,7 +24,7 @@
func init() {
// Setup this package so that it's log level can be modified at run time
var err error
- logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "common"})
+ logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{})
if err != nil {
panic(err)
}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/common.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/common.go
index 06b8b3c..a69e290 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/common.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/common.go
@@ -24,7 +24,7 @@
func init() {
// Setup this package so that it's log level can be modified at run time
var err error
- logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "config"})
+ logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{})
if err != nil {
panic(err)
}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/common.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/common.go
index fe84b46..9d50f24 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/common.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/common.go
@@ -24,7 +24,7 @@
func init() {
// Setup this package so that it's log level can be modified at run time
var err error
- logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "db"})
+ logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{})
if err != nil {
panic(err)
}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/common.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/common.go
index 0de395f..bb38a94 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/common.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/common.go
@@ -24,7 +24,7 @@
func init() {
// Setup this package so that it's log level can be modified at run time
var err error
- logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "kvstore"})
+ logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{})
if err != nil {
panic(err)
}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/flows/common.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/flows/common.go
index 0328d72..2d5904b 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/flows/common.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/flows/common.go
@@ -24,7 +24,7 @@
func init() {
// Setup this package so that it's log level can be modified at run time
var err error
- logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "flowsUtils"})
+ logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{})
if err != nil {
panic(err)
}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/common.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/common.go
index 99b4cdf..f229d46 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/common.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/common.go
@@ -24,7 +24,7 @@
func init() {
// Setup this package so that it's log level can be modified at run time
var err error
- logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "kafka"})
+ logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{})
if err != nil {
panic(err)
}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
index 368391e..92d2529 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
@@ -17,22 +17,23 @@
import (
"context"
+ "encoding/json"
"errors"
"fmt"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
"reflect"
"strings"
"sync"
"time"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
-
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/any"
"github.com/google/uuid"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
ic "github.com/opencord/voltha-protos/v3/go/inter_container"
+ "github.com/opentracing/opentracing-go"
)
const (
@@ -197,6 +198,13 @@
waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse {
logger.Debugw(ctx, "InvokeAsyncRPC", log.Fields{"rpc": rpc, "key": key})
+
+ spanArg, span, ctx := kp.embedSpanAsArg(ctx, rpc, !waitForResponse)
+ if spanArg != nil {
+ kvArgs = append(kvArgs, &spanArg[0])
+ }
+ defer span.Finish()
+
// If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
// typically the device ID.
responseTopic := replyToTopic
@@ -219,6 +227,7 @@
protoRequest, err = encodeRequest(ctx, rpc, toTopic, responseTopic, key, kvArgs...)
if err != nil {
logger.Warnw(ctx, "cannot-format-request", log.Fields{"rpc": rpc, "error": err})
+ log.MarkSpanError(ctx, errors.New("cannot-format-request"))
chnl <- NewResponse(RpcFormattingError, err, nil)
return
}
@@ -227,6 +236,7 @@
var ch <-chan *ic.InterContainerMessage
if ch, err = kp.subscribeForResponse(ctx, *responseTopic, protoRequest.Header.Id); err != nil {
logger.Errorw(ctx, "failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
+ log.MarkSpanError(ctx, errors.New("failed-to-subscribe-for-response"))
chnl <- NewResponse(RpcTransportError, err, nil)
return
}
@@ -260,6 +270,7 @@
case msg, ok := <-ch:
if !ok {
logger.Warnw(ctx, "channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
+ log.MarkSpanError(ctx, errors.New("channel-closed"))
chnl <- NewResponse(RpcTransportError, status.Error(codes.Aborted, "channel closed"), nil)
}
logger.Debugw(ctx, "received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
@@ -280,6 +291,7 @@
}
case <-ctx.Done():
logger.Errorw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
+ log.MarkSpanError(ctx, errors.New("context-cancelled"))
err := status.Error(codes.DeadlineExceeded, ctx.Err().Error())
chnl <- NewResponse(RpcTimeout, err, nil)
case <-kp.doneCh:
@@ -290,10 +302,72 @@
return chnl
}
+// Method to extract Open-tracing Span from Context and serialize it for transport over Kafka embedded as a additional argument.
+// Additional argument is injected using key as "span" and value as Span marshalled into a byte slice
+//
+// The span name is automatically constructed using the RPC name with following convention (<rpc-name> represents name of invoked method):
+// - RPC invoked in Sync manner (WaitForResponse=true) : kafka-rpc-<rpc-name>
+// - RPC invoked in Async manner (WaitForResponse=false) : kafka-async-rpc-<rpc-name>
+// - Inter Adapter RPC invoked in Sync manner (WaitForResponse=true) : kafka-inter-adapter-rpc-<rpc-name>
+// - Inter Adapter RPC invoked in Async manner (WaitForResponse=false) : kafka-inter-adapter-async-rpc-<rpc-name>
+func (kp *interContainerProxy) embedSpanAsArg(ctx context.Context, rpc string, isAsync bool) ([]KVArg, opentracing.Span, context.Context) {
+ var err error
+ var newCtx context.Context
+ var spanToInject opentracing.Span
+
+ var spanName strings.Builder
+ spanName.WriteString("kafka-")
+
+ // In case of inter adapter message, use Msg Type for constructing RPC name
+ if rpc == "process_inter_adapter_message" {
+ if msgType, ok := ctx.Value("inter-adapter-msg-type").(ic.InterAdapterMessageType_Types); ok {
+ spanName.WriteString("inter-adapter-")
+ rpc = msgType.String()
+ }
+ }
+
+ if isAsync {
+ spanName.WriteString("async-rpc-")
+ } else {
+ spanName.WriteString("rpc-")
+ }
+ spanName.WriteString(rpc)
+
+ if isAsync {
+ spanToInject, newCtx = log.CreateAsyncSpan(ctx, spanName.String())
+ } else {
+ spanToInject, newCtx = log.CreateChildSpan(ctx, spanName.String())
+ }
+
+ spanToInject.SetBaggageItem("rpc-span-name", spanName.String())
+
+ textMapCarrier := opentracing.TextMapCarrier(make(map[string]string))
+ if err = opentracing.GlobalTracer().Inject(spanToInject.Context(), opentracing.TextMap, textMapCarrier); err != nil {
+ logger.Warnw(ctx, "unable-to-serialize-span-to-textmap", log.Fields{"span": spanToInject, "error": err})
+ return nil, spanToInject, newCtx
+ }
+
+ var textMapJson []byte
+ if textMapJson, err = json.Marshal(textMapCarrier); err != nil {
+ logger.Warnw(ctx, "unable-to-marshal-textmap-to-json-string", log.Fields{"textMap": textMapCarrier, "error": err})
+ return nil, spanToInject, newCtx
+ }
+
+ spanArg := make([]KVArg, 1)
+ spanArg[0] = KVArg{Key: "span", Value: &ic.StrType{Val: string(textMapJson)}}
+ return spanArg, spanToInject, newCtx
+}
+
// InvokeRPC is used to send a request to a given topic
func (kp *interContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any) {
+ spanArg, span, ctx := kp.embedSpanAsArg(ctx, rpc, false)
+ if spanArg != nil {
+ kvArgs = append(kvArgs, &spanArg[0])
+ }
+ defer span.Finish()
+
// If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
// typically the device ID.
responseTopic := replyToTopic
@@ -305,6 +379,7 @@
protoRequest, err := encodeRequest(ctx, rpc, toTopic, responseTopic, key, kvArgs...)
if err != nil {
logger.Warnw(ctx, "cannot-format-request", log.Fields{"rpc": rpc, "error": err})
+ log.MarkSpanError(ctx, errors.New("cannot-format-request"))
return false, nil
}
@@ -324,6 +399,7 @@
logger.Debugw(ctx, "sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
go func() {
if err := kp.kafkaClient.Send(ctx, protoRequest, toTopic, key); err != nil {
+ log.MarkSpanError(ctx, errors.New("send-failed"))
logger.Errorw(ctx, "send-failed", log.Fields{
"topic": toTopic,
"key": key,
@@ -355,6 +431,7 @@
case msg, ok := <-ch:
if !ok {
logger.Warnw(ctx, "channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
+ log.MarkSpanError(ctx, errors.New("channel-closed"))
protoError := &ic.Error{Reason: "channel-closed"}
var marshalledArg *any.Any
if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
@@ -372,6 +449,7 @@
return responseBody.Success, responseBody.Result
case <-ctx.Done():
logger.Debugw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
+ log.MarkSpanError(ctx, errors.New("context-cancelled"))
// pack the error as proto any type
protoError := &ic.Error{Reason: ctx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
@@ -382,6 +460,7 @@
return false, marshalledArg
case <-childCtx.Done():
logger.Debugw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
+ log.MarkSpanError(ctx, errors.New("context-cancelled"))
// pack the error as proto any type
protoError := &ic.Error{Reason: childCtx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
@@ -720,6 +799,71 @@
return append(currentArgs, protoArg)
}
+// Method to extract the Span embedded in Kafka RPC request on the receiver side. If span is found embedded in the KV args (with key as "span"),
+// it is de-serialized and injected into the Context to be carried forward by the RPC request processor thread.
+// If no span is found embedded, even then a span is created with name as "kafka-rpc-<rpc-name>" to enrich the Context for RPC calls coming
+// from components currently not sending the span (e.g. openonu adapter)
+func (kp *interContainerProxy) enrichContextWithSpan(ctx context.Context, rpcName string, args []*ic.Argument) (opentracing.Span, context.Context) {
+
+ for _, arg := range args {
+ if arg.Key == "span" {
+ var err error
+ var textMapString ic.StrType
+ if err = ptypes.UnmarshalAny(arg.Value, &textMapString); err != nil {
+ logger.Warnw(ctx, "unable-to-unmarshal-kvarg-to-textmap-string", log.Fields{"value": arg.Value})
+ break
+ }
+
+ spanTextMap := make(map[string]string)
+ if err = json.Unmarshal([]byte(textMapString.Val), &spanTextMap); err != nil {
+ logger.Warnw(ctx, "unable-to-unmarshal-textmap-from-json-string", log.Fields{"textMapString": textMapString, "error": err})
+ break
+ }
+
+ var spanContext opentracing.SpanContext
+ if spanContext, err = opentracing.GlobalTracer().Extract(opentracing.TextMap, opentracing.TextMapCarrier(spanTextMap)); err != nil {
+ logger.Warnw(ctx, "unable-to-deserialize-textmap-to-span", log.Fields{"textMap": spanTextMap, "error": err})
+ break
+ }
+
+ var receivedRpcName string
+ extractBaggage := func(k, v string) bool {
+ if k == "rpc-span-name" {
+ receivedRpcName = v
+ return false
+ }
+ return true
+ }
+
+ spanContext.ForeachBaggageItem(extractBaggage)
+
+ return opentracing.StartSpanFromContext(ctx, receivedRpcName, opentracing.FollowsFrom(spanContext))
+ }
+ }
+
+ // Create new Child Span with rpc as name if no span details were received in kafka arguments
+ var spanName strings.Builder
+ spanName.WriteString("kafka-")
+
+ // In case of inter adapter message, use Msg Type for constructing RPC name
+ if rpcName == "process_inter_adapter_message" {
+ for _, arg := range args {
+ if arg.Key == "msg" {
+ iamsg := ic.InterAdapterMessage{}
+ if err := ptypes.UnmarshalAny(arg.Value, &iamsg); err == nil {
+ spanName.WriteString("inter-adapter-")
+ rpcName = iamsg.Header.Type.String()
+ }
+ }
+ }
+ }
+
+ spanName.WriteString("rpc-")
+ spanName.WriteString(rpcName)
+
+ return opentracing.StartSpanFromContext(ctx, spanName.String())
+}
+
func (kp *interContainerProxy) handleMessage(ctx context.Context, msg *ic.InterContainerMessage, targetInterface interface{}) {
// First extract the header to know whether this is a request - responses are handled by a different handler
@@ -732,6 +876,9 @@
if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
logger.Warnw(ctx, "cannot-unmarshal-request", log.Fields{"error": err})
} else {
+ span, ctx := kp.enrichContextWithSpan(ctx, requestBody.Rpc, requestBody.Args)
+ defer span.Finish()
+
logger.Debugw(ctx, "received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header})
// let the callee unpack the arguments as its the only one that knows the real proto type
// Augment the requestBody with the message Id as it will be used in scenarios where cores
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/log/utils.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/log/utils.go
index cdd0a4e..b22ca14 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/log/utils.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/log/utils.go
@@ -175,6 +175,15 @@
attrMap[k] = v
}
+
+ processBaggageItems := func(k, v string) bool {
+ if k != "rpc-span-name" {
+ attrMap[k] = v
+ }
+ return true
+ }
+
+ jspan.SpanContext().ForeachBaggageItem(processBaggageItems)
}
}
}
@@ -186,9 +195,23 @@
func EnrichSpan(ctx context.Context, keyAndValues ...Fields) {
span := opentracing.SpanFromContext(ctx)
if span != nil {
- for _, field := range keyAndValues {
- for k, v := range field {
- span.SetTag(k, v)
+ if jspan, ok := span.(*jtracing.Span); ok {
+ // Inject as a BaggageItem when the Span is the Root Span so that it propagates
+ // across the components along with Root Span (called as Trace)
+ // Else, inject as a Tag so that it is attached to the Child Task
+ isRootSpan := false
+ if jspan.SpanContext().TraceID().String() == jspan.SpanContext().SpanID().String() {
+ isRootSpan = true
+ }
+
+ for _, field := range keyAndValues {
+ for k, v := range field {
+ if isRootSpan {
+ span.SetBaggageItem(k, v.(string))
+ } else {
+ span.SetTag(k, v)
+ }
+ }
}
}
}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/ponresourcemanager/common.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/ponresourcemanager/common.go
index 113b39c..d266617 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/ponresourcemanager/common.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/ponresourcemanager/common.go
@@ -24,7 +24,7 @@
func init() {
// Setup this package so that it's log level can be modified at run time
var err error
- logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "ponresourcemanager"})
+ logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{})
if err != nil {
panic(err)
}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/probe/common.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/probe/common.go
index 14857ab..efd27a4 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/probe/common.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/probe/common.go
@@ -24,7 +24,7 @@
func init() {
// Setup this package so that it's log level can be modified at run time
var err error
- logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "probe"})
+ logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{})
if err != nil {
panic(err)
}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/techprofile/common.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/techprofile/common.go
index e7cd798..fe99fcd 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/techprofile/common.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/techprofile/common.go
@@ -24,7 +24,7 @@
func init() {
// Setup this package so that it's log level can be modified at run time
var err error
- logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "techprofile"})
+ logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{})
if err != nil {
panic(err)
}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index fb9827a..8950b43 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -72,7 +72,7 @@
github.com/mitchellh/go-homedir
# github.com/mitchellh/mapstructure v1.1.2
github.com/mitchellh/mapstructure
-# github.com/opencord/voltha-lib-go/v3 v3.2.3
+# github.com/opencord/voltha-lib-go/v3 v3.2.5
github.com/opencord/voltha-lib-go/v3/pkg/adapters
github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif
github.com/opencord/voltha-lib-go/v3/pkg/adapters/common