[VOL-3070] Enrich span with 'device-id' and propagate the span-carrying context into goroutines
Change-Id: I6509de7542942dbcc29a090a47ff0a2732507860
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index 516cc91..727f64d 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -150,6 +150,7 @@
agent.device = device
}
startSucceeded = true
+ log.EnrichSpan(ctx, log.Fields{"device-id": agent.deviceID})
logger.Debugw(ctx, "device-agent-started", log.Fields{"device-id": agent.deviceID})
return agent.getDeviceReadOnly(ctx)
@@ -303,7 +304,7 @@
// Adopt the device if it was in pre-provision state. In all other cases, try to re-enable it.
var ch chan *kafka.RpcResponse
- subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
if oldDevice.AdminState == voltha.AdminState_PREPROVISIONED {
ch, err = agent.adapterProxy.AdoptDevice(subCtx, newDevice)
} else {
@@ -416,7 +417,7 @@
return err
}
- subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
ch, err := agent.adapterProxy.DisableDevice(subCtx, cloned)
if err != nil {
cancel()
@@ -435,7 +436,7 @@
logger.Debugw(ctx, "rebootDevice", log.Fields{"device-id": agent.deviceID})
device := agent.getDeviceReadOnlyWithoutLock()
- subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
ch, err := agent.adapterProxy.RebootDevice(subCtx, device)
if err != nil {
cancel()
@@ -464,7 +465,7 @@
// If the device was in pre-prov state (only parent device are in that state) then do not send the request to the
// adapter
if previousState != ic.AdminState_PREPROVISIONED {
- subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
ch, err := agent.adapterProxy.DeleteDevice(subCtx, cloned)
if err != nil {
cancel()
@@ -541,7 +542,7 @@
agent.reconcileWithKVStore(ctx)
}
// Send packet to adapter
- subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
ch, err := agent.adapterProxy.PacketOut(subCtx, agent.deviceType, agent.deviceID, outPort, packet)
if err != nil {
cancel()
@@ -636,7 +637,7 @@
device := agent.getDeviceReadOnlyWithoutLock()
- subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
ch, err := agent.adapterProxy.SimulateAlarm(subCtx, device, simulateReq)
if err != nil {
cancel()
@@ -669,7 +670,7 @@
// release lock before processing transition
agent.requestQueue.RequestComplete()
- if err := agent.deviceMgr.processTransition(context.Background(), device, previousState); err != nil {
+ if err := agent.deviceMgr.processTransition(log.WithSpanFromContext(context.Background(), ctx), device, previousState); err != nil {
log.Errorw("failed-process-transition", log.Fields{"deviceId": device.Id, "previousAdminState": previousState.Admin, "currentAdminState": device.AdminState})
}
return nil
@@ -710,7 +711,7 @@
}
//send request to adapter
- subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
ch, err := agent.adapterProxy.ChildDeviceLost(ctx, agent.deviceType, agent.deviceID, device.ParentPortNo, device.ProxyAddress.OnuId)
if err != nil {
cancel()
diff --git a/rw_core/core/device/agent_flow.go b/rw_core/core/device/agent_flow.go
index 94376d1..56b09da 100644
--- a/rw_core/core/device/agent_flow.go
+++ b/rw_core/core/device/agent_flow.go
@@ -91,7 +91,7 @@
}
// Send update to adapters
- subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
response := coreutils.NewResponse()
if !dType.AcceptsAddRemoveFlowUpdates {
@@ -150,7 +150,7 @@
}
// Send update to adapters
- subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
response := coreutils.NewResponse()
if !dType.AcceptsAddRemoveFlowUpdates {
@@ -217,7 +217,7 @@
}
}
- subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
response := coreutils.NewResponse()
// Process bulk flow update differently than incremental update
if !dType.AcceptsAddRemoveFlowUpdates {
diff --git a/rw_core/core/device/agent_group.go b/rw_core/core/device/agent_group.go
index 96254e4..720e240 100644
--- a/rw_core/core/device/agent_group.go
+++ b/rw_core/core/device/agent_group.go
@@ -94,7 +94,7 @@
}
// Send update to adapters
- subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
response := coreutils.NewResponse()
if !dType.AcceptsAddRemoveFlowUpdates {
updatedAllGroups := agent.listDeviceGroups()
@@ -152,7 +152,7 @@
}
// Send update to adapters
- subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
response := coreutils.NewResponse()
if !dType.AcceptsAddRemoveFlowUpdates {
updatedAllGroups := agent.listDeviceGroups()
@@ -215,7 +215,7 @@
}
}
- subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
response := coreutils.NewResponse()
// Process bulk flow update differently than incremental update
if !dType.AcceptsAddRemoveFlowUpdates {
diff --git a/rw_core/core/device/agent_image.go b/rw_core/core/device/agent_image.go
index 0dc56bf..0f9c75e 100644
--- a/rw_core/core/device/agent_image.go
+++ b/rw_core/core/device/agent_image.go
@@ -56,7 +56,7 @@
}
// Send the request to the adapter
- subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
ch, err := agent.adapterProxy.DownloadImage(ctx, cloned, clonedImg)
if err != nil {
cancel()
@@ -106,7 +106,7 @@
if err := agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
return nil, err
}
- subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
ch, err := agent.adapterProxy.CancelImageDownload(subCtx, cloned, img)
if err != nil {
cancel()
@@ -147,7 +147,7 @@
return nil, err
}
- subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
ch, err := agent.adapterProxy.ActivateImageUpdate(subCtx, cloned, img)
if err != nil {
cancel()
@@ -188,7 +188,7 @@
return nil, err
}
- subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
ch, err := agent.adapterProxy.RevertImageUpdate(subCtx, cloned, img)
if err != nil {
cancel()
diff --git a/rw_core/core/device/agent_pm_config.go b/rw_core/core/device/agent_pm_config.go
index 8677029..d9df968 100644
--- a/rw_core/core/device/agent_pm_config.go
+++ b/rw_core/core/device/agent_pm_config.go
@@ -39,7 +39,7 @@
return err
}
// Send the request to the adapter
- subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
ch, err := agent.adapterProxy.UpdatePmConfigs(subCtx, cloned, pmConfigs)
if err != nil {
cancel()
diff --git a/rw_core/core/device/agent_port.go b/rw_core/core/device/agent_port.go
index 2840235..8e68318 100644
--- a/rw_core/core/device/agent_port.go
+++ b/rw_core/core/device/agent_port.go
@@ -84,7 +84,7 @@
// TODO: VOL-2707
logger.Warnw(ctx, "unable-to-update-logical-port-state", log.Fields{"error": err})
}
- }(portID, context.Background())
+ }(portID, log.WithSpanFromContext(context.Background(), ctx))
}
}
portHandle.Unlock()
@@ -242,7 +242,7 @@
if err != nil {
return err
}
- subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
ch, err := agent.adapterProxy.DisablePort(ctx, device, &newPort)
if err != nil {
cancel()
@@ -278,7 +278,7 @@
if err != nil {
return err
}
- subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
ch, err := agent.adapterProxy.EnablePort(ctx, device, &newPort)
if err != nil {
cancel()
diff --git a/rw_core/core/device/logical_agent.go b/rw_core/core/device/logical_agent.go
index a581c5b..7bfa264 100644
--- a/rw_core/core/device/logical_agent.go
+++ b/rw_core/core/device/logical_agent.go
@@ -134,7 +134,7 @@
// Setup the logicalports - internal processing, no need to propagate the client context
go func() {
- err := agent.setupLogicalPorts(context.Background())
+ err := agent.setupLogicalPorts(log.WithSpanFromContext(context.Background(), ctx))
if err != nil {
logger.Errorw(ctx, "unable-to-setup-logical-ports", log.Fields{"error": err})
}
@@ -169,7 +169,7 @@
// Setup the device routes. Building routes may fail if the pre-conditions are not satisfied (e.g. no PON ports present)
if loadFromDB {
go func() {
- if err := agent.buildRoutes(context.Background()); err != nil {
+ if err := agent.buildRoutes(log.WithSpanFromContext(context.Background(), ctx)); err != nil {
logger.Warn(ctx, "routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
}
}()
@@ -225,7 +225,7 @@
response := coreutils.NewResponse()
responses = append(responses, response)
go func(deviceId string, value *fu.FlowsAndGroups) {
- subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
defer cancel()
start := time.Now()
if err := agent.deviceMgr.addFlowsAndGroups(subCtx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
@@ -247,7 +247,7 @@
response := coreutils.NewResponse()
responses = append(responses, response)
go func(deviceId string, value *fu.FlowsAndGroups) {
- subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
defer cancel()
start := time.Now()
if err := agent.deviceMgr.deleteFlowsAndGroups(subCtx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
@@ -273,7 +273,7 @@
response := coreutils.NewResponse()
responses = append(responses, response)
go func(deviceId string, value *fu.FlowsAndGroups) {
- subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
defer cancel()
if err := agent.deviceMgr.updateFlowsAndGroups(subCtx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
logger.Errorw(ctx, "flow-update-failed", log.Fields{"deviceID": deviceId, "error": err})
@@ -300,7 +300,7 @@
}
logger.Debugw(ctx, "uni-port", log.Fields{"flows": flows, "uni-port": uniPort})
go func(uniPort uint32, metadata *voltha.FlowMetadata) {
- subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
defer cancel()
if err := agent.deviceMgr.deleteParentFlows(subCtx, agent.rootDeviceID, uniPort, metadata); err != nil {
logger.Error(ctx, "flow-delete-failed-from-parent-device", log.Fields{
diff --git a/rw_core/core/device/logical_agent_flow.go b/rw_core/core/device/logical_agent_flow.go
index 1f8137d..eb3e880 100644
--- a/rw_core/core/device/logical_agent_flow.go
+++ b/rw_core/core/device/logical_agent_flow.go
@@ -170,7 +170,7 @@
if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChannels...); res != nil {
logger.Infow(ctx, "failed-to-add-flows-will-attempt-deletion", log.Fields{"errors": res, "logical-device-id": agent.logicalDeviceID})
// Revert added flows
- if err := agent.revertAddedFlows(context.Background(), mod, flow, flowToReplace, deviceRules, toMetadata(flowMeterConfig)); err != nil {
+ if err := agent.revertAddedFlows(log.WithSpanFromContext(context.Background(), ctx), mod, flow, flowToReplace, deviceRules, toMetadata(flowMeterConfig)); err != nil {
logger.Errorw(ctx, "failure-to-delete-flows-after-failed-addition", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
}
}
diff --git a/rw_core/core/device/logical_agent_port.go b/rw_core/core/device/logical_agent_port.go
index 3ae6534..b8bec82 100644
--- a/rw_core/core/device/logical_agent_port.go
+++ b/rw_core/core/device/logical_agent_port.go
@@ -58,7 +58,7 @@
case voltha.Port_PON_OLT:
// Rebuilt the routes on Parent PON port addition
go func() {
- if err := agent.buildRoutes(context.Background()); err != nil {
+ if err := agent.buildRoutes(log.WithSpanFromContext(context.Background(), ctx)); err != nil {
// Not an error - temporary state
logger.Infow(ctx, "failed-to-update-routes-after-adding-parent-pon-port", log.Fields{"device-id": device.Id, "port": port, "ports-count": len(devicePorts), "error": err})
}
@@ -67,7 +67,7 @@
case voltha.Port_PON_ONU:
// Add the routes corresponding to that child device
go func() {
- if err := agent.updateAllRoutes(context.Background(), device.Id, devicePorts); err != nil {
+ if err := agent.updateAllRoutes(log.WithSpanFromContext(context.Background(), ctx), device.Id, devicePorts); err != nil {
// Not an error - temporary state
logger.Infow(ctx, "failed-to-update-routes-after-adding-child-pon-port", log.Fields{"device-id": device.Id, "port": port, "ports-count": len(devicePorts), "error": err})
}
@@ -113,7 +113,7 @@
logger.Error(ctx, "setting-up-UNI-ports-failed", log.Fields{"deviceID": child.Id})
response.Error(status.Errorf(codes.Internal, "UNI-ports-setup-failed: %s", child.Id))
}
- }(context.Background(), child)
+ }(log.WithSpanFromContext(context.Background(), ctx), child)
}
// Wait for completion
if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, responses...); res != nil {
@@ -215,7 +215,7 @@
// Reset the logical device routes
go func() {
- if err := agent.buildRoutes(context.Background()); err != nil {
+ if err := agent.buildRoutes(log.WithSpanFromContext(context.Background(), ctx)); err != nil {
logger.Warnw(ctx, "device-routes-not-ready", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
}
}()
@@ -245,7 +245,7 @@
// Reset the logical device routes
go func() {
- if err := agent.buildRoutes(context.Background()); err != nil {
+ if err := agent.buildRoutes(log.WithSpanFromContext(context.Background(), ctx)); err != nil {
logger.Warnw(ctx, "routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
}
}()
@@ -334,7 +334,7 @@
// Setup the routes for this device and then send the port update event to the OF Controller
go func() {
// First setup the routes
- if err := agent.updateRoutes(context.Background(), deviceID, devicePorts, nniPort, agent.listLogicalDevicePorts(ctx)); err != nil {
+ if err := agent.updateRoutes(log.WithSpanFromContext(context.Background(), ctx), deviceID, devicePorts, nniPort, agent.listLogicalDevicePorts(ctx)); err != nil {
// This is not an error as we may not have enough logical ports to set up routes or some PON ports have not been
// created yet.
logger.Infow(ctx, "routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "logical-port": nniPort.OfpPort.PortNo, "error": err})
@@ -384,15 +384,16 @@
// Setup the routes for this device and then send the port update event to the OF Controller
go func() {
+ ctx = log.WithSpanFromContext(context.Background(), ctx)
// First setup the routes
- if err := agent.updateRoutes(context.Background(), deviceID, devicePorts, uniPort, agent.listLogicalDevicePorts(ctx)); err != nil {
+ if err := agent.updateRoutes(ctx, deviceID, devicePorts, uniPort, agent.listLogicalDevicePorts(ctx)); err != nil {
// This is not an error as we may not have enough logical ports to set up routes or some PON ports have not been
// created yet.
logger.Infow(ctx, "routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "logical-port": uniPort.OfpPort.PortNo, "error": err})
}
// send event, and allow any queued events to be sent as well
- queuePosition.send(context.Background(), agent, agent.logicalDeviceID, ofp.OfpPortReason_OFPPR_ADD, uniPort.OfpPort)
+ queuePosition.send(ctx, agent, agent.logicalDeviceID, ofp.OfpPortReason_OFPPR_ADD, uniPort.OfpPort)
}()
return nil
}
@@ -400,7 +401,7 @@
// send is a convenience to avoid calling both assignQueuePosition and qp.send
func (e *orderedEvents) send(ctx context.Context, agent *LogicalAgent, deviceID string, reason ofp.OfpPortReason, desc *ofp.OfpPort) {
qp := e.assignQueuePosition()
- go qp.send(context.Background(), agent, deviceID, reason, desc)
+ go qp.send(log.WithSpanFromContext(context.Background(), ctx), agent, deviceID, reason, desc)
}
// TODO: shouldn't need to guarantee event ordering like this
diff --git a/rw_core/core/device/logical_manager.go b/rw_core/core/device/logical_manager.go
index ec4a528..457b5eb 100644
--- a/rw_core/core/device/logical_manager.go
+++ b/rw_core/core/device/logical_manager.go
@@ -139,7 +139,7 @@
go func() {
//TODO: either wait for the agent to be started before returning, or
// implement locks in the agent to ensure request are not processed before start() is complete
- err := agent.start(context.Background(), false)
+ err := agent.start(log.WithSpanFromContext(context.Background(), ctx), false)
if err != nil {
logger.Errorw(ctx, "unable-to-create-the-logical-device", log.Fields{"error": err})
ldMgr.deleteLogicalDeviceAgent(id)
diff --git a/rw_core/core/device/manager.go b/rw_core/core/device/manager.go
index 19dab3d..148d67a 100755
--- a/rw_core/core/device/manager.go
+++ b/rw_core/core/device/manager.go
@@ -173,6 +173,8 @@
// EnableDevice activates a device by invoking the adopt_device API on the appropriate adapter
func (dMgr *Manager) EnableDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
+ log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
+
logger.Debugw(ctx, "EnableDevice", log.Fields{"device-id": id.Id})
agent := dMgr.getDeviceAgent(ctx, id.Id)
if agent == nil {
@@ -183,6 +185,8 @@
// DisableDevice disables a device along with any child device it may have
func (dMgr *Manager) DisableDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
+ log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
+
logger.Debugw(ctx, "DisableDevice", log.Fields{"device-id": id.Id})
agent := dMgr.getDeviceAgent(ctx, id.Id)
if agent == nil {
@@ -193,6 +197,8 @@
//RebootDevice invoked the reboot API to the corresponding adapter
func (dMgr *Manager) RebootDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
+ log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
+
logger.Debugw(ctx, "RebootDevice", log.Fields{"device-id": id.Id})
agent := dMgr.getDeviceAgent(ctx, id.Id)
if agent == nil {
@@ -203,6 +209,8 @@
// DeleteDevice removes a device from the data model
func (dMgr *Manager) DeleteDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
+ log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
+
logger.Debugw(ctx, "DeleteDevice", log.Fields{"device-id": id.Id})
agent := dMgr.getDeviceAgent(ctx, id.Id)
if agent == nil {
@@ -223,6 +231,8 @@
// ListDevicePorts returns the ports details for a specific device entry
func (dMgr *Manager) ListDevicePorts(ctx context.Context, id *voltha.ID) (*voltha.Ports, error) {
+ log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
+
logger.Debugw(ctx, "ListDevicePorts", log.Fields{"device-id": id.Id})
agent := dMgr.getDeviceAgent(ctx, id.Id)
if agent == nil {
@@ -240,6 +250,8 @@
// ListDeviceFlows returns the flow details for a specific device entry
func (dMgr *Manager) ListDeviceFlows(ctx context.Context, id *voltha.ID) (*ofp.Flows, error) {
+ log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
+
logger.Debugw(ctx, "ListDeviceFlows", log.Fields{"device-id": id.Id})
agent := dMgr.getDeviceAgent(ctx, id.Id)
if agent == nil {
@@ -257,6 +269,8 @@
// ListDeviceFlowGroups returns the flow group details for a specific device entry
func (dMgr *Manager) ListDeviceFlowGroups(ctx context.Context, id *voltha.ID) (*voltha.FlowGroups, error) {
+ log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
+
logger.Debugw(ctx, "ListDeviceFlowGroups", log.Fields{"device-id": id.Id})
agent := dMgr.getDeviceAgent(ctx, id.Id)
if agent == nil {
@@ -300,6 +314,7 @@
// GetDevice exists primarily to implement the gRPC interface.
// Internal functions should call getDeviceReadOnly instead.
func (dMgr *Manager) GetDevice(ctx context.Context, id *voltha.ID) (*voltha.Device, error) {
+ log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
return dMgr.getDeviceReadOnly(ctx, id.Id)
}
@@ -782,7 +797,7 @@
if err != nil {
return err
}
- if err = dMgr.logicalDeviceMgr.updateLogicalPort(context.Background(), device, ports, port); err != nil {
+ if err = dMgr.logicalDeviceMgr.updateLogicalPort(log.WithSpanFromContext(context.Background(), ctx), device, ports, port); err != nil {
return err
}
return nil
@@ -796,7 +811,7 @@
}
// Setup peer ports in its own routine
go func() {
- if err := dMgr.addPeerPort(context.Background(), deviceID, port); err != nil {
+ if err := dMgr.addPeerPort(log.WithSpanFromContext(context.Background(), ctx), deviceID, port); err != nil {
logger.Errorw(ctx, "unable-to-add-peer-port", log.Fields{"error": err, "device-id": deviceID})
}
}()
@@ -844,6 +859,8 @@
// UpdateDevicePmConfigs updates the PM configs. This is executed when the northbound gRPC API is invoked, typically
// following a user action
func (dMgr *Manager) UpdateDevicePmConfigs(ctx context.Context, configs *voltha.PmConfigs) (*empty.Empty, error) {
+ log.EnrichSpan(ctx, log.Fields{"device-id": configs.Id})
+
if configs.Id == "" {
return nil, status.Error(codes.FailedPrecondition, "invalid-device-Id")
}
@@ -867,6 +884,8 @@
// ListDevicePmConfigs returns pm configs of device
func (dMgr *Manager) ListDevicePmConfigs(ctx context.Context, id *voltha.ID) (*voltha.PmConfigs, error) {
+ log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
+
agent := dMgr.getDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -926,7 +945,7 @@
// Do this for NNI and UNIs only. PON ports are not known by logical device
if portType == voltha.Port_ETHERNET_NNI || portType == voltha.Port_ETHERNET_UNI {
go func() {
- err := dMgr.logicalDeviceMgr.updatePortState(context.Background(), deviceID, portNo, operStatus)
+ err := dMgr.logicalDeviceMgr.updatePortState(log.WithSpanFromContext(context.Background(), ctx), deviceID, portNo, operStatus)
if err != nil {
// While we want to handle (catch) and log when
// an update to a port was not able to be
@@ -954,7 +973,7 @@
// typically is part of a device deletion phase.
if device, err := dMgr.getDeviceReadOnly(ctx, deviceID); err == nil {
go func() {
- if err := dMgr.logicalDeviceMgr.deleteAllLogicalPorts(context.Background(), device); err != nil {
+ if err := dMgr.logicalDeviceMgr.deleteAllLogicalPorts(log.WithSpanFromContext(context.Background(), ctx), device); err != nil {
logger.Errorw(ctx, "unable-to-delete-logical-ports", log.Fields{"error": err})
}
}()
@@ -1047,7 +1066,7 @@
// Activate the child device
if agent = dMgr.getDeviceAgent(ctx, agent.deviceID); agent != nil {
go func() {
- err := agent.enableDevice(context.Background())
+ err := agent.enableDevice(log.WithSpanFromContext(context.Background(), ctx))
if err != nil {
logger.Errorw(ctx, "unable-to-enable-device", log.Fields{"error": err})
}
@@ -1205,7 +1224,7 @@
if err != nil {
logger.Errorw(ctx, "unable-to-enable-device", log.Fields{"error": err})
}
- }(context.Background())
+ }(log.WithSpanFromContext(context.Background(), ctx))
} else {
err = status.Errorf(codes.Unavailable, "no agent for child device %s", childDeviceID)
logger.Errorw(ctx, "no-child-device-agent", log.Fields{"parentDeviceId": parentDeviceID, "childId": childDeviceID})
@@ -1324,6 +1343,8 @@
// DownloadImage execute an image download request
func (dMgr *Manager) DownloadImage(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
+ log.EnrichSpan(ctx, log.Fields{"device-id": img.Id})
+
logger.Debugw(ctx, "DownloadImage", log.Fields{"device-id": img.Id, "imageName": img.Name})
agent := dMgr.getDeviceAgent(ctx, img.Id)
if agent == nil {
@@ -1338,6 +1359,8 @@
// CancelImageDownload cancels image download request
func (dMgr *Manager) CancelImageDownload(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
+ log.EnrichSpan(ctx, log.Fields{"device-id": img.Id})
+
logger.Debugw(ctx, "CancelImageDownload", log.Fields{"device-id": img.Id, "imageName": img.Name})
agent := dMgr.getDeviceAgent(ctx, img.Id)
if agent == nil {
@@ -1352,6 +1375,8 @@
// ActivateImageUpdate activates image update request
func (dMgr *Manager) ActivateImageUpdate(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
+ log.EnrichSpan(ctx, log.Fields{"device-id": img.Id})
+
logger.Debugw(ctx, "ActivateImageUpdate", log.Fields{"device-id": img.Id, "imageName": img.Name})
agent := dMgr.getDeviceAgent(ctx, img.Id)
if agent == nil {
@@ -1366,6 +1391,8 @@
// RevertImageUpdate reverts image update
func (dMgr *Manager) RevertImageUpdate(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
+ log.EnrichSpan(ctx, log.Fields{"device-id": img.Id})
+
logger.Debugw(ctx, "RevertImageUpdate", log.Fields{"device-id": img.Id, "imageName": img.Name})
agent := dMgr.getDeviceAgent(ctx, img.Id)
if agent == nil {
@@ -1383,6 +1410,8 @@
// GetImageDownloadStatus returns status of image download
func (dMgr *Manager) GetImageDownloadStatus(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+ log.EnrichSpan(ctx, log.Fields{"device-id": img.Id})
+
logger.Debugw(ctx, "GetImageDownloadStatus", log.Fields{"device-id": img.Id, "imageName": img.Name})
agent := dMgr.getDeviceAgent(ctx, img.Id)
if agent == nil {
@@ -1396,6 +1425,8 @@
}
func (dMgr *Manager) UpdateImageDownload(ctx context.Context, deviceID string, img *voltha.ImageDownload) error {
+ log.EnrichSpan(ctx, log.Fields{"device-id": img.Id})
+
logger.Debugw(ctx, "UpdateImageDownload", log.Fields{"device-id": img.Id, "imageName": img.Name})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
if err := agent.updateImageDownload(ctx, img); err != nil {
@@ -1410,6 +1441,8 @@
// GetImageDownload returns image download
func (dMgr *Manager) GetImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+ log.EnrichSpan(ctx, log.Fields{"device-id": img.Id})
+
logger.Debugw(ctx, "GetImageDownload", log.Fields{"device-id": img.Id, "imageName": img.Name})
agent := dMgr.getDeviceAgent(ctx, img.Id)
if agent == nil {
@@ -1424,6 +1457,8 @@
// ListImageDownloads returns image downloads
func (dMgr *Manager) ListImageDownloads(ctx context.Context, id *voltha.ID) (*voltha.ImageDownloads, error) {
+ log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
+
logger.Debugw(ctx, "ListImageDownloads", log.Fields{"device-id": id.Id})
agent := dMgr.getDeviceAgent(ctx, id.Id)
if agent == nil {
@@ -1438,6 +1473,8 @@
// GetImages returns all images for a specific device entry
func (dMgr *Manager) GetImages(ctx context.Context, id *voltha.ID) (*voltha.Images, error) {
+ log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
+
logger.Debugw(ctx, "GetImages", log.Fields{"device-id": id.Id})
device, err := dMgr.getDeviceReadOnly(ctx, id.Id)
if err != nil {
@@ -1502,6 +1539,8 @@
}
func (dMgr *Manager) EnablePort(ctx context.Context, port *voltha.Port) (*empty.Empty, error) {
+ log.EnrichSpan(ctx, log.Fields{"device-id": port.DeviceId})
+
logger.Debugw(ctx, "EnablePort", log.Fields{"device-id": port.DeviceId, "port-no": port.PortNo})
agent := dMgr.getDeviceAgent(ctx, port.DeviceId)
if agent == nil {
@@ -1511,6 +1550,8 @@
}
func (dMgr *Manager) DisablePort(ctx context.Context, port *voltha.Port) (*empty.Empty, error) {
+ log.EnrichSpan(ctx, log.Fields{"device-id": port.DeviceId})
+
logger.Debugw(ctx, "DisablePort", log.Fields{"device-id": port.DeviceId, "port-no": port.PortNo})
agent := dMgr.getDeviceAgent(ctx, port.DeviceId)
if agent == nil {
@@ -1533,6 +1574,8 @@
}
func (dMgr *Manager) StartOmciTestAction(ctx context.Context, request *voltha.OmciTestRequest) (*voltha.TestResponse, error) {
+ log.EnrichSpan(ctx, log.Fields{"device-id": request.Id})
+
logger.Debugw(ctx, "StartOmciTestAction", log.Fields{"device-id": request.Id, "uuid": request.Uuid})
agent := dMgr.getDeviceAgent(ctx, request.Id)
if agent == nil {
@@ -1542,6 +1585,8 @@
}
func (dMgr *Manager) GetExtValue(ctx context.Context, value *voltha.ValueSpecifier) (*voltha.ReturnValues, error) {
+ log.EnrichSpan(ctx, log.Fields{"device-id": value.Id})
+
log.Debugw("getExtValue", log.Fields{"onu-id": value.Id})
cDevice, err := dMgr.getDeviceReadOnly(ctx, value.Id)
if err != nil {
diff --git a/rw_core/core/device/remote/adapter_proxy.go b/rw_core/core/device/remote/adapter_proxy.go
index 6654faf..3d25a43 100755
--- a/rw_core/core/device/remote/adapter_proxy.go
+++ b/rw_core/core/device/remote/adapter_proxy.go
@@ -303,7 +303,7 @@
{Key: "flow_metadata", Value: flowMetadata},
}
replyToTopic := ap.getCoreTopic()
- return ap.sendRPC(context.TODO(), rpc, toTopic, &replyToTopic, true, device.Id, args...)
+ return ap.sendRPC(log.WithSpanFromContext(context.TODO(), ctx), rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
// UpdateFlowsIncremental invokes update flows incremental rpc
@@ -329,7 +329,7 @@
{Key: "flow_metadata", Value: flowMetadata},
}
replyToTopic := ap.getCoreTopic()
- return ap.sendRPC(context.TODO(), rpc, toTopic, &replyToTopic, true, device.Id, args...)
+ return ap.sendRPC(log.WithSpanFromContext(context.TODO(), ctx), rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
// UpdatePmConfigs invokes update pm configs rpc