[VOL-3070]Create Child Span for the third party library call to etcd and propagate context to carry span information.
Change-Id: Idb1105181a2b485928421c337b95bab0124813e8
diff --git a/pkg/adapters/common/core_proxy.go b/pkg/adapters/common/core_proxy.go
index 505dc79..188bbbd 100644
--- a/pkg/adapters/common/core_proxy.go
+++ b/pkg/adapters/common/core_proxy.go
@@ -146,7 +146,7 @@
}
// Use a device specific topic as we are the only adaptercore handling requests for this device
replyToTopic := ap.getAdapterTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, device.Id, args...)
logger.Debugw(ctx, "DeviceUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
return unPackResponse(ctx, rpc, device.Id, success, result)
}
@@ -170,7 +170,7 @@
// Use a device specific topic as we are the only adaptercore handling requests for this device
replyToTopic := ap.getAdapterTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
logger.Debugw(ctx, "PortCreated-response", log.Fields{"deviceId": deviceId, "success": success})
return unPackResponse(ctx, rpc, deviceId, success, result)
}
@@ -194,7 +194,7 @@
// Use a device specific topic as we are the only adaptercore handling requests for this device
replyToTopic := ap.getAdapterTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
logger.Debugw(ctx, "PortsStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
return unPackResponse(ctx, rpc, deviceId, success, result)
}
@@ -215,7 +215,7 @@
// Use a device specific topic as we are the only adaptercore handling requests for this device
replyToTopic := ap.getAdapterTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
logger.Debugw(ctx, "DeleteAllPorts-response", log.Fields{"deviceId": deviceId, "success": success})
return unPackResponse(ctx, rpc, deviceId, success, result)
}
@@ -235,7 +235,7 @@
Value: &ic.IntType{Val: int64(portNo)},
}}
- success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceID, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, deviceID, args...)
logger.Debugw(ctx, "GetDevicePort-response", log.Fields{"device-id": deviceID, "success": success})
if success {
@@ -269,7 +269,7 @@
Value: &voltha.ID{Id: deviceID},
}}
- success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceID, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, deviceID, args...)
logger.Debugw(ctx, "ListDevicePorts-response", log.Fields{"device-id": deviceID, "success": success})
if success {
@@ -317,7 +317,7 @@
}
// Use a device specific topic as we are the only adaptercore handling requests for this device
replyToTopic := ap.getAdapterTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
logger.Debugw(ctx, "DeviceStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
return unPackResponse(ctx, rpc, deviceId, success, result)
}
@@ -368,7 +368,7 @@
Value: oId,
}
- success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
logger.Debugw(ctx, "ChildDeviceDetected-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
if success {
@@ -406,7 +406,7 @@
Value: id,
}
- success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
logger.Debugw(ctx, "ChildDevicesLost-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
return unPackResponse(ctx, rpc, parentDeviceId, success, result)
}
@@ -426,7 +426,7 @@
Value: id,
}
- success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
logger.Debugw(ctx, "ChildDevicesDetected-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
return unPackResponse(ctx, rpc, parentDeviceId, success, result)
}
@@ -445,7 +445,7 @@
Value: id,
}
- success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
logger.Debugw(ctx, "GetDevice-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
if success {
@@ -505,7 +505,7 @@
}
}
- success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
logger.Debugw(ctx, "GetChildDevice-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
if success {
@@ -541,7 +541,7 @@
Value: id,
}
- success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
logger.Debugw(ctx, "GetChildDevices-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
if success {
@@ -587,7 +587,7 @@
Key: "packet",
Value: pkt,
}
- success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
logger.Debugw(ctx, "SendPacketIn-response", log.Fields{"pDeviceId": deviceId, "success": success})
return unPackResponse(ctx, rpc, deviceId, success, result)
}
@@ -611,7 +611,7 @@
Key: "device_reason",
Value: reason,
}
- success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
logger.Debugw(ctx, "DeviceReason-response", log.Fields{"pDeviceId": deviceId, "success": success})
return unPackResponse(ctx, rpc, deviceId, success, result)
}
@@ -629,7 +629,7 @@
Key: "device_pm_config",
Value: pmConfigs,
}
- success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, pmConfigs.Id, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, pmConfigs.Id, args...)
logger.Debugw(ctx, "DevicePMConfigUpdate-response", log.Fields{"pDeviceId": pmConfigs.Id, "success": success})
return unPackResponse(ctx, rpc, pmConfigs.Id, success, result)
}
@@ -646,7 +646,7 @@
{Key: "parent_device_id", Value: &voltha.ID{Id: parentDeviceId}},
}
- success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
logger.Debugw(ctx, "ReconcileChildDevices-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
return unPackResponse(ctx, rpc, parentDeviceId, success, result)
}
@@ -683,7 +683,7 @@
// Use a device specific topic as we are the only adaptercore handling requests for this device
replyToTopic := ap.getAdapterTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
logger.Debugw(ctx, "PortStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
return unPackResponse(ctx, rpc, deviceId, success, result)
}
diff --git a/pkg/db/backend.go b/pkg/db/backend.go
index 60afe72..efc0953 100644
--- a/pkg/db/backend.go
+++ b/pkg/db/backend.go
@@ -181,6 +181,9 @@
// List retrieves one or more items that match the specified key
func (b *Backend) List(ctx context.Context, key string) (map[string]*kvstore.KVPair, error) {
+ span, ctx := log.CreateChildSpan(ctx, "etcd-list")
+ defer span.Finish()
+
formattedPath := b.makePath(ctx, key)
logger.Debugw(ctx, "listing-key", log.Fields{"key": key, "path": formattedPath})
@@ -193,6 +196,9 @@
// Get retrieves an item that matches the specified key
func (b *Backend) Get(ctx context.Context, key string) (*kvstore.KVPair, error) {
+ span, ctx := log.CreateChildSpan(ctx, "etcd-get")
+ defer span.Finish()
+
formattedPath := b.makePath(ctx, key)
logger.Debugw(ctx, "getting-key", log.Fields{"key": key, "path": formattedPath})
@@ -205,6 +211,9 @@
// Put stores an item value under the specifed key
func (b *Backend) Put(ctx context.Context, key string, value interface{}) error {
+ span, ctx := log.CreateChildSpan(ctx, "etcd-put")
+ defer span.Finish()
+
formattedPath := b.makePath(ctx, key)
logger.Debugw(ctx, "putting-key", log.Fields{"key": key, "path": formattedPath})
@@ -217,6 +226,9 @@
// Delete removes an item under the specified key
func (b *Backend) Delete(ctx context.Context, key string) error {
+ span, ctx := log.CreateChildSpan(ctx, "etcd-delete")
+ defer span.Finish()
+
formattedPath := b.makePath(ctx, key)
logger.Debugw(ctx, "deleting-key", log.Fields{"key": key, "path": formattedPath})
@@ -229,6 +241,9 @@
// CreateWatch starts watching events for the specified key
func (b *Backend) CreateWatch(ctx context.Context, key string, withPrefix bool) chan *kvstore.Event {
+ span, ctx := log.CreateChildSpan(ctx, "etcd-create-watch")
+ defer span.Finish()
+
formattedPath := b.makePath(ctx, key)
logger.Debugw(ctx, "creating-key-watch", log.Fields{"key": key, "path": formattedPath})
@@ -237,6 +252,9 @@
// DeleteWatch stops watching events for the specified key
func (b *Backend) DeleteWatch(ctx context.Context, key string, ch chan *kvstore.Event) {
+ span, ctx := log.CreateChildSpan(ctx, "etcd-delete-watch")
+ defer span.Finish()
+
formattedPath := b.makePath(ctx, key)
logger.Debugw(ctx, "deleting-key-watch", log.Fields{"key": key, "path": formattedPath})
diff --git a/pkg/kafka/endpoint_manager.go b/pkg/kafka/endpoint_manager.go
index a876c09..266f6c1 100644
--- a/pkg/kafka/endpoint_manager.go
+++ b/pkg/kafka/endpoint_manager.go
@@ -227,7 +227,7 @@
ep.deviceTypeServiceMap = make(map[string]string)
// Load the adapters
- blobs, err := ep.backend.List(context.Background(), "adapters")
+ blobs, err := ep.backend.List(log.WithSpanFromContext(context.Background(), ctx), "adapters")
if err != nil {
return err
}
@@ -257,7 +257,7 @@
}
}
// Load the device types
- blobs, err = ep.backend.List(context.Background(), "device_types")
+ blobs, err = ep.backend.List(log.WithSpanFromContext(context.Background(), ctx), "device_types")
if err != nil {
return err
}