VOL-3501 Code changes to support rpc event

Change-Id: I2536c0c03faa5fb026349c906ebef46323398e9a
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index c122574..84f765f 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -133,7 +133,7 @@
 		agent.portLoader.Load(ctx)
 		agent.transientStateLoader.Load(ctx)
 
-		logger.Infow(ctx, "device-loaded-from-dB", log.Fields{"device-id": agent.deviceID})
+		logger.Infow(ctx, "device-loaded-from-db", log.Fields{"device-id": agent.deviceID})
 	} else {
 		// Create a new device
 		// Assumption is that AdminState, FlowGroups, and Flows are uninitialized since this
@@ -172,7 +172,7 @@
 	}
 	defer agent.requestQueue.RequestComplete()
 
-	logger.Infow(ctx, "stopping-device-agent", log.Fields{"device-id": agent.deviceID, "parentId": agent.parentID})
+	logger.Infow(ctx, "stopping-device-agent", log.Fields{"device-id": agent.deviceID, "parent-id": agent.parentID})
 	// Remove the device transient loader
 	if err := agent.deleteTransientState(ctx); err != nil {
 		return err
@@ -236,7 +236,7 @@
 	// TODO: Post failure message onto kafka
 }
 
-func (agent *Agent) waitForAdapterResponse(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse,
+func (agent *Agent) waitForAdapterForceDeleteResponse(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse,
 	onSuccess coreutils.ResponseCallback, onFailure coreutils.ResponseCallback, reqArgs ...interface{}) {
 	defer cancel()
 	select {
@@ -283,19 +283,31 @@
 
 }
 
-func (agent *Agent) waitForAdapterDeleteResponse(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse,
+func (agent *Agent) waitForAdapterResponse(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse,
 	onSuccess coreutils.ResponseCallback, onFailure coreutils.ResponseCallback, reqArgs ...interface{}) {
 	defer cancel()
+	var rpce *voltha.RPCEvent
+	defer func() {
+		if rpce != nil {
+			go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
+				voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
+		}
+	}()
 	select {
 	case rpcResponse, ok := <-ch:
 		if !ok {
+			rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, "Response Channel Closed", nil)
 			onFailure(ctx, rpc, status.Errorf(codes.Aborted, "channel-closed"), reqArgs)
+			// rpce is set above; the deferred function publishes it as an RPC error event
 		} else if rpcResponse.Err != nil {
+			rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, rpcResponse.Err.Error(), nil)
 			onFailure(ctx, rpc, rpcResponse.Err, reqArgs)
+			// rpce is set above; the deferred function publishes it as an RPC error event
 		} else {
 			onSuccess(ctx, rpc, rpcResponse.Reply, reqArgs)
 		}
 	case <-ctx.Done():
+		rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, ctx.Err().Error(), nil)
 		onFailure(ctx, rpc, ctx.Err(), reqArgs)
 	}
 }
@@ -326,7 +338,7 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	logger.Debugw(ctx, "enableDevice", log.Fields{"device-id": agent.deviceID})
+	logger.Debugw(ctx, "enable-device", log.Fields{"device-id": agent.deviceID})
 
 	oldDevice := agent.getDeviceReadOnlyWithoutLock()
 	if oldDevice.AdminState == voltha.AdminState_ENABLED {
@@ -360,6 +372,8 @@
 	// Adopt the device if it was in pre-provision state.  In all other cases, try to re-enable it.
 	var ch chan *kafka.RpcResponse
 	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
 	if oldDevice.AdminState == voltha.AdminState_PREPROVISIONED {
 		ch, err = agent.adapterProxy.AdoptDevice(subCtx, newDevice)
 	} else {
@@ -376,16 +390,28 @@
 
 func (agent *Agent) waitForAdapterFlowResponse(ctx context.Context, cancel context.CancelFunc, ch chan *kafka.RpcResponse, response coreutils.Response) {
 	defer cancel()
+	var rpce *voltha.RPCEvent
+	defer func() {
+		if rpce != nil {
+			go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
+				voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
+		}
+	}()
 	select {
 	case rpcResponse, ok := <-ch:
 		if !ok {
+			// Record the failure; the deferred function publishes it as an RPC error event
+			rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, "Response Channel Closed", nil)
 			response.Error(status.Errorf(codes.Aborted, "channel-closed"))
 		} else if rpcResponse.Err != nil {
+			// Record the failure; the deferred function publishes it as an RPC error event
+			rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, rpcResponse.Err.Error(), nil)
 			response.Error(rpcResponse.Err)
 		} else {
 			response.Done()
 		}
 	case <-ctx.Done():
+		rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, ctx.Err().Error(), nil)
 		response.Error(ctx.Err())
 	}
 }
@@ -451,7 +477,7 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	logger.Debugw(ctx, "disableDevice", log.Fields{"device-id": agent.deviceID})
+	logger.Debugw(ctx, "disable-device", log.Fields{"device-id": agent.deviceID})
 
 	cloned := agent.cloneDeviceWithoutLock()
 
@@ -476,6 +502,8 @@
 	}
 
 	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
 	ch, err := agent.adapterProxy.DisableDevice(subCtx, cloned)
 	if err != nil {
 		cancel()
@@ -491,13 +519,15 @@
 		return err
 	}
 	defer agent.requestQueue.RequestComplete()
-	logger.Debugw(ctx, "rebootDevice", log.Fields{"device-id": agent.deviceID})
+	logger.Debugw(ctx, "reboot-device", log.Fields{"device-id": agent.deviceID})
 
 	device := agent.getDeviceReadOnlyWithoutLock()
 	if agent.isDeletionInProgress() {
 		return status.Errorf(codes.FailedPrecondition, "deviceId:%s, Device deletion is in progress.", agent.deviceID)
 	}
 	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
 	ch, err := agent.adapterProxy.RebootDevice(subCtx, device)
 	if err != nil {
 		cancel()
@@ -508,7 +538,7 @@
 }
 
 func (agent *Agent) deleteDeviceForce(ctx context.Context) error {
-	logger.Debugw(ctx, "deleteDeviceForce", log.Fields{"device-id": agent.deviceID})
+	logger.Debugw(ctx, "delete-device-force", log.Fields{"device-id": agent.deviceID})
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
@@ -521,27 +551,29 @@
 			agent.deviceID)
 	}
 	device := agent.cloneDeviceWithoutLock()
-	if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device, voltha.DeviceTransientState_FORCE_DELETING,
-		previousDeviceTransientState); err != nil {
+	if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device,
+		voltha.DeviceTransientState_FORCE_DELETING, previousDeviceTransientState); err != nil {
 		return err
 	}
 	previousAdminState := device.AdminState
 	if previousAdminState != ic.AdminState_PREPROVISIONED {
 		subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+		subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
 		ch, err := agent.adapterProxy.DeleteDevice(subCtx, device)
 		if err != nil {
 			cancel()
 			return err
 		}
 		// Since it is a case of force delete, nothing needs to be done on adapter responses.
-		go agent.waitForAdapterResponse(subCtx, cancel, "deleteDeviceForce", ch, agent.onSuccess,
+		go agent.waitForAdapterForceDeleteResponse(subCtx, cancel, "deleteDeviceForce", ch, agent.onSuccess,
 			agent.onFailure)
 	}
 	return nil
 }
 
 func (agent *Agent) deleteDevice(ctx context.Context) error {
-	logger.Debugw(ctx, "deleteDevice", log.Fields{"device-id": agent.deviceID})
+	logger.Debugw(ctx, "delete-device", log.Fields{"device-id": agent.deviceID})
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
@@ -561,14 +593,16 @@
 		// Change the state to DELETING POST ADAPTER RESPONSE directly as adapters have no info of the device.
 		currentDeviceTransientState = voltha.DeviceTransientState_DELETING_POST_ADAPTER_RESPONSE
 	}
-	if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device, currentDeviceTransientState,
-		previousDeviceTransientState); err != nil {
+	if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device,
+		currentDeviceTransientState, previousDeviceTransientState); err != nil {
 		return err
 	}
 	// If the device was in pre-prov state (only parent device are in that state) then do not send the request to the
 	// adapter
 	if previousAdminState != ic.AdminState_PREPROVISIONED {
 		subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+		subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
 		ch, err := agent.adapterProxy.DeleteDevice(subCtx, device)
 		if err != nil {
 			cancel()
@@ -578,7 +612,7 @@
 			}
 			return err
 		}
-		go agent.waitForAdapterDeleteResponse(subCtx, cancel, "deleteDevice", ch, agent.onDeleteSuccess,
+		go agent.waitForAdapterResponse(subCtx, cancel, "deleteDevice", ch, agent.onDeleteSuccess,
 			agent.onDeleteFailure)
 	}
 	return nil
@@ -588,7 +622,7 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	logger.Debugw(ctx, "setParentId", log.Fields{"device-id": device.Id, "parent-id": parentID})
+	logger.Debugw(ctx, "set-parent-id", log.Fields{"device-id": device.Id, "parent-id": parentID})
 
 	cloned := agent.cloneDeviceWithoutLock()
 	cloned.ParentId = parentID
@@ -597,7 +631,7 @@
 
 // getSwitchCapability retrieves the switch capability of a parent device
 func (agent *Agent) getSwitchCapability(ctx context.Context) (*ic.SwitchCapability, error) {
-	logger.Debugw(ctx, "getSwitchCapability", log.Fields{"device-id": agent.deviceID})
+	logger.Debugw(ctx, "get-switch-capability", log.Fields{"device-id": agent.deviceID})
 
 	device, err := agent.getDeviceReadOnly(ctx)
 	if err != nil {
@@ -651,6 +685,8 @@
 	}
 	//	Send packet to adapter
 	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
 	ch, err := agent.adapterProxy.PacketOut(subCtx, agent.deviceType, agent.deviceID, outPort, packet)
 	if err != nil {
 		cancel()
@@ -664,7 +700,7 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	logger.Debugw(ctx, "updateDeviceUsingAdapterData", log.Fields{"device-id": device.Id})
+	logger.Debugw(ctx, "update-device-using-adapter-data", log.Fields{"device-id": device.Id})
 
 	cloned := agent.cloneDeviceWithoutLock()
 	cloned.Root = device.Root
@@ -686,14 +722,14 @@
 	cloned := agent.cloneDeviceWithoutLock()
 	// Ensure the enums passed in are valid - they will be invalid if they are not set when this function is invoked
 	if s, ok := voltha.ConnectStatus_Types_name[int32(connStatus)]; ok {
-		logger.Debugw(ctx, "updateDeviceStatus-conn", log.Fields{"ok": ok, "val": s})
+		logger.Debugw(ctx, "update-device-status-conn", log.Fields{"ok": ok, "val": s})
 		cloned.ConnectStatus = connStatus
 	}
 	if s, ok := voltha.OperStatus_Types_name[int32(operStatus)]; ok {
-		logger.Debugw(ctx, "updateDeviceStatus-oper", log.Fields{"ok": ok, "val": s})
+		logger.Debugw(ctx, "update-device-status-oper", log.Fields{"ok": ok, "val": s})
 		cloned.OperStatus = operStatus
 	}
-	logger.Debugw(ctx, "updateDeviceStatus", log.Fields{"device-id": cloned.Id, "operStatus": cloned.OperStatus, "connectStatus": cloned.ConnectStatus})
+	logger.Debugw(ctx, "update-device-status", log.Fields{"device-id": cloned.Id, "oper-status": cloned.OperStatus, "connect-status": cloned.ConnectStatus})
 	// Store the device
 	return agent.updateDeviceAndReleaseLock(ctx, cloned)
 }
@@ -742,11 +778,13 @@
 		return err
 	}
 	defer agent.requestQueue.RequestComplete()
-	logger.Debugw(ctx, "simulateAlarm", log.Fields{"device-id": agent.deviceID})
+	logger.Debugw(ctx, "simulate-alarm", log.Fields{"device-id": agent.deviceID})
 
 	device := agent.getDeviceReadOnlyWithoutLock()
 
 	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
 	ch, err := agent.adapterProxy.SimulateAlarm(subCtx, device, simulateReq)
 	if err != nil {
 		cancel()
@@ -778,10 +816,16 @@
 
 	// release lock before processing transition
 	agent.requestQueue.RequestComplete()
+	subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
 
-	if err := agent.deviceMgr.stateTransitions.ProcessTransition(log.WithSpanFromContext(context.Background(), ctx),
+	if err := agent.deviceMgr.stateTransitions.ProcessTransition(subCtx,
 		device, prevDevice, voltha.DeviceTransientState_NONE, voltha.DeviceTransientState_NONE); err != nil {
-		logger.Errorw(ctx, "failed-process-transition", log.Fields{"device-id": device.Id, "previousAdminState": prevDevice.AdminState, "currentAdminState": device.AdminState})
+		logger.Errorw(ctx, "failed-process-transition", log.Fields{"device-id": device.Id, "previous-admin-state": prevDevice.AdminState, "current-admin-state": device.AdminState})
+		// Sending RPC EVENT here
+		rpce := agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil)
+		go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce, voltha.EventCategory_COMMUNICATION,
+			nil, time.Now().UnixNano())
+
 	}
 	return nil
 }
@@ -805,7 +849,7 @@
 		//Reverting TransientState update
 		err := agent.updateTransientState(ctx, prevTransientState)
 		logger.Errorw(ctx, "failed-to-revert-transient-state-update-on-error", log.Fields{"device-id": device.Id,
-			"previousTransientState": prevTransientState, "currentTransientState": transientState})
+			"previous-transient-state": prevTransientState, "current-transient-state": transientState})
 		agent.requestQueue.RequestComplete()
 		return status.Errorf(codes.Internal, "failed-update-device:%s: %s", agent.deviceID, err)
 	}
@@ -818,10 +862,14 @@
 
 	// release lock before processing transition
 	agent.requestQueue.RequestComplete()
-
-	if err := agent.deviceMgr.stateTransitions.ProcessTransition(log.WithSpanFromContext(context.Background(), ctx),
+	subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
+	if err := agent.deviceMgr.stateTransitions.ProcessTransition(subCtx,
 		device, prevDevice, transientState, prevTransientState); err != nil {
-		logger.Errorw(ctx, "failed-process-transition", log.Fields{"device-id": device.Id, "previousAdminState": prevDevice.AdminState, "currentAdminState": device.AdminState})
+		logger.Errorw(ctx, "failed-process-transition", log.Fields{"device-id": device.Id, "previous-admin-state": prevDevice.AdminState, "current-admin-state": device.AdminState})
+		// Sending RPC EVENT here
+		rpce := agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil)
+		go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce, voltha.EventCategory_COMMUNICATION,
+			nil, time.Now().UnixNano())
 	}
 	return nil
 }
@@ -829,7 +877,7 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	logger.Debugw(ctx, "updateDeviceReason", log.Fields{"device-id": agent.deviceID, "reason": reason})
+	logger.Debugw(ctx, "update-device-reason", log.Fields{"device-id": agent.deviceID, "reason": reason})
 
 	cloned := agent.cloneDeviceWithoutLock()
 	cloned.Reason = reason
@@ -837,7 +885,7 @@
 }
 
 func (agent *Agent) ChildDeviceLost(ctx context.Context, device *voltha.Device) error {
-	logger.Debugw(ctx, "childDeviceLost", log.Fields{"child-device-id": device.Id, "parent-device-id": agent.deviceID})
+	logger.Debugw(ctx, "child-device-lost", log.Fields{"child-device-id": device.Id, "parent-device-id": agent.deviceID})
 
 	// Remove the associated peer ports on the parent device
 	for portID := range agent.portLoader.ListIDs() {
@@ -861,6 +909,8 @@
 
 	//send request to adapter
 	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
 	ch, err := agent.adapterProxy.ChildDeviceLost(ctx, agent.deviceType, agent.deviceID, device.ParentPortNo, device.ProxyAddress.OnuId)
 	if err != nil {
 		cancel()
@@ -907,12 +957,12 @@
 	if err := ptypes.UnmarshalAny(rpcResponse.Reply, testResp); err != nil {
 		return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
 	}
-	logger.Debugw(ctx, "Omci_test_Request-Success-device-agent", log.Fields{"testResp": testResp})
+	logger.Debugw(ctx, "omci-test-request-success-device-agent", log.Fields{"test-resp": testResp})
 	return testResp, nil
 }
 
 func (agent *Agent) getExtValue(ctx context.Context, pdevice *voltha.Device, cdevice *voltha.Device, valueparam *voltha.ValueSpecifier) (*voltha.ReturnValues, error) {
-	logger.Debugw(ctx, "getExtValue", log.Fields{"device-id": agent.deviceID, "onuid": valueparam.Id, "valuetype": valueparam.Value})
+	logger.Debugw(ctx, "get-ext-value", log.Fields{"device-id": agent.deviceID, "onu-id": valueparam.Id, "value-type": valueparam.Value})
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return nil, err
 	}
@@ -938,12 +988,12 @@
 	if err := ptypes.UnmarshalAny(rpcResponse.Reply, Resp); err != nil {
 		return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
 	}
-	logger.Debugw(ctx, "getExtValue-Success-device-agent", log.Fields{"Resp": Resp})
+	logger.Debugw(ctx, "get-ext-value-success-device-agent", log.Fields{"resp": Resp})
 	return Resp, nil
 }
 
 func (agent *Agent) setExtValue(ctx context.Context, device *voltha.Device, value *voltha.ValueSet) (*empty.Empty, error) {
-	logger.Debugw(ctx, "setExtValue", log.Fields{"device-id": value.Id})
+	logger.Debugw(ctx, "set-ext-value", log.Fields{"device-id": value.Id})
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return nil, err
 	}
@@ -965,12 +1015,12 @@
 	}
 
 	// Unmarshal and return the response
-	logger.Debug(ctx, "setExtValue-Success-device-agent")
+	logger.Debug(ctx, "set-ext-value-success-device-agent")
 	return &empty.Empty{}, nil
 }
 
 func (agent *Agent) getSingleValue(ctx context.Context, request *extension.SingleGetValueRequest) (*extension.SingleGetValueResponse, error) {
-	logger.Debugw(ctx, "getSingleValue", log.Fields{"device-id": request.TargetId})
+	logger.Debugw(ctx, "get-single-value", log.Fields{"device-id": request.TargetId})
 
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return nil, err
@@ -1004,7 +1054,7 @@
 }
 
 func (agent *Agent) setSingleValue(ctx context.Context, request *extension.SingleSetValueRequest) (*extension.SingleSetValueResponse, error) {
-	logger.Debugw(ctx, "setSingleValue", log.Fields{"device-id": request.TargetId})
+	logger.Debugw(ctx, "set-single-value", log.Fields{"device-id": request.TargetId})
 
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return nil, err
diff --git a/rw_core/core/device/agent_flow.go b/rw_core/core/device/agent_flow.go
index 8b8e1a7..f2fd10a 100644
--- a/rw_core/core/device/agent_flow.go
+++ b/rw_core/core/device/agent_flow.go
@@ -78,7 +78,7 @@
 				flowsToAdd = append(flowsToAdd, flow)
 			} else {
 				//No need to change the flow. It is already exist.
-				logger.Debugw(ctx, "No-need-to-change-already-existing-flow", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "flow-metadata": flowMetadata})
+				logger.Debugw(ctx, "no-need-to-change-already-existing-flow", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "flow-metadata": flowMetadata})
 			}
 		}
 		flowHandle.Unlock()
@@ -92,6 +92,8 @@
 
 	// Send update to adapters
 	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
 	response := coreutils.NewResponse()
 	if !dType.AcceptsAddRemoveFlowUpdates {
 
@@ -151,6 +153,8 @@
 
 	// Send update to adapters
 	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
 	response := coreutils.NewResponse()
 	if !dType.AcceptsAddRemoveFlowUpdates {
 
@@ -182,7 +186,7 @@
 }
 
 func (agent *Agent) updateFlowsToAdapter(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
-	logger.Debugw(ctx, "updateFlowsToAdapter", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
+	logger.Debugw(ctx, "update-flows-to-adapter", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
 
 	if (len(updatedFlows)) == 0 {
 		logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
@@ -218,6 +222,8 @@
 	}
 
 	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
 	response := coreutils.NewResponse()
 	// Process bulk flow update differently than incremental update
 	if !dType.AcceptsAddRemoveFlowUpdates {
diff --git a/rw_core/core/device/agent_group.go b/rw_core/core/device/agent_group.go
index e450972..9552b78 100644
--- a/rw_core/core/device/agent_group.go
+++ b/rw_core/core/device/agent_group.go
@@ -81,7 +81,7 @@
 				groupsToAdd = append(groupsToAdd, group)
 			} else {
 				//No need to change the group. It is already exist.
-				logger.Debugw(ctx, "No-need-to-change-already-existing-group", log.Fields{"device-id": agent.deviceID, "group": newGroups, "flow-metadata": flowMetadata})
+				logger.Debugw(ctx, "no-need-to-change-already-existing-group", log.Fields{"device-id": agent.deviceID, "group": newGroups, "flow-metadata": flowMetadata})
 			}
 		}
 
@@ -95,6 +95,8 @@
 
 	// Send update to adapters
 	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
 	response := coreutils.NewResponse()
 	if !dType.AcceptsAddRemoveFlowUpdates {
 		updatedAllGroups := agent.listDeviceGroups()
@@ -153,6 +155,8 @@
 
 	// Send update to adapters
 	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
 	response := coreutils.NewResponse()
 	if !dType.AcceptsAddRemoveFlowUpdates {
 		updatedAllGroups := agent.listDeviceGroups()
@@ -183,7 +187,7 @@
 }
 
 func (agent *Agent) updateGroupsToAdapter(ctx context.Context, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
-	logger.Debugw(ctx, "updateGroupsToAdapter", log.Fields{"device-id": agent.deviceID, "groups": updatedGroups})
+	logger.Debugw(ctx, "update-groups-to-adapter", log.Fields{"device-id": agent.deviceID, "groups": updatedGroups})
 
 	if (len(updatedGroups)) == 0 {
 		logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": updatedGroups})
@@ -216,6 +220,8 @@
 	}
 
 	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
 	response := coreutils.NewResponse()
 	// Process bulk flow update differently than incremental update
 	if !dType.AcceptsAddRemoveFlowUpdates {
diff --git a/rw_core/core/device/agent_image.go b/rw_core/core/device/agent_image.go
index acebaca..eed1879 100644
--- a/rw_core/core/device/agent_image.go
+++ b/rw_core/core/device/agent_image.go
@@ -22,6 +22,7 @@
 
 	"github.com/gogo/protobuf/proto"
 	"github.com/golang/protobuf/ptypes"
+	coreutils "github.com/opencord/voltha-go/rw_core/utils"
 	"github.com/opencord/voltha-lib-go/v4/pkg/log"
 	"github.com/opencord/voltha-protos/v4/go/voltha"
 	"google.golang.org/grpc/codes"
@@ -32,7 +33,7 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return nil, err
 	}
-	logger.Debugw(ctx, "downloadImage", log.Fields{"device-id": agent.deviceID})
+	logger.Debugw(ctx, "download-image", log.Fields{"device-id": agent.deviceID})
 
 	if agent.device.Root {
 		return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, is an OLT. Image update "+
@@ -62,6 +63,8 @@
 
 	// Send the request to the adapter
 	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
 	ch, err := agent.adapterProxy.DownloadImage(subCtx, cloned, clonedImg)
 	if err != nil {
 		cancel()
@@ -87,7 +90,7 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return nil, err
 	}
-	logger.Debugw(ctx, "cancelImageDownload", log.Fields{"device-id": agent.deviceID})
+	logger.Debugw(ctx, "cancel-image-download", log.Fields{"device-id": agent.deviceID})
 
 	// Update image download state
 	cloned := agent.cloneDeviceWithoutLock()
@@ -108,6 +111,8 @@
 			return nil, err
 		}
 		subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+		subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
 		ch, err := agent.adapterProxy.CancelImageDownload(subCtx, cloned, img)
 		if err != nil {
 			cancel()
@@ -123,7 +128,7 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return nil, err
 	}
-	logger.Debugw(ctx, "activateImage", log.Fields{"device-id": agent.deviceID})
+	logger.Debugw(ctx, "activate-image", log.Fields{"device-id": agent.deviceID})
 
 	// Update image download state
 	cloned := agent.cloneDeviceWithoutLock()
@@ -158,6 +163,8 @@
 	}
 
 	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
 	ch, err := agent.adapterProxy.ActivateImageUpdate(subCtx, cloned, img)
 	if err != nil {
 		cancel()
@@ -174,7 +181,7 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return nil, err
 	}
-	logger.Debugw(ctx, "revertImage", log.Fields{"device-id": agent.deviceID})
+	logger.Debugw(ctx, "revert-image", log.Fields{"device-id": agent.deviceID})
 
 	// Update image download state
 	cloned := agent.cloneDeviceWithoutLock()
@@ -195,6 +202,8 @@
 	}
 
 	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
 	ch, err := agent.adapterProxy.RevertImageUpdate(subCtx, cloned, img)
 	if err != nil {
 		cancel()
@@ -206,7 +215,7 @@
 }
 
 func (agent *Agent) getImageDownloadStatus(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
-	logger.Debugw(ctx, "getImageDownloadStatus", log.Fields{"device-id": agent.deviceID})
+	logger.Debugw(ctx, "get-image-download-status", log.Fields{"device-id": agent.deviceID})
 
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return nil, err
@@ -261,7 +270,7 @@
 }
 
 func (agent *Agent) getImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
-	logger.Debugw(ctx, "getImageDownload", log.Fields{"device-id": agent.deviceID})
+	logger.Debugw(ctx, "get-image-download", log.Fields{"device-id": agent.deviceID})
 
 	device, err := agent.getDeviceReadOnly(ctx)
 	if err != nil {
@@ -276,7 +285,7 @@
 }
 
 func (agent *Agent) listImageDownloads(ctx context.Context, deviceID string) (*voltha.ImageDownloads, error) {
-	logger.Debugw(ctx, "listImageDownloads", log.Fields{"device-id": agent.deviceID})
+	logger.Debugw(ctx, "list-image-downloads", log.Fields{"device-id": agent.deviceID})
 
 	device, err := agent.getDeviceReadOnly(ctx)
 	if err != nil {
@@ -288,7 +297,7 @@
 // onImageFailure brings back the device to Enabled state and sets the image to image download_failed.
 func (agent *Agent) onImageFailure(ctx context.Context, rpc string, response interface{}, reqArgs ...interface{}) {
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		logger.Errorw(ctx, "can't obtain lock", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": err, "args": reqArgs})
+		logger.Errorw(ctx, "cannot-obtain-lock", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": err, "args": reqArgs})
 		return
 	}
 	if res, ok := response.(error); ok {
@@ -338,7 +347,7 @@
 // onImageSuccess brings back the device to Enabled state and sets the image to image download_failed.
 func (agent *Agent) onImageSuccess(ctx context.Context, rpc string, response interface{}, reqArgs ...interface{}) {
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		logger.Errorw(ctx, "can't obtain lock", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": err, "args": reqArgs})
+		logger.Errorw(ctx, "cannot-obtain-lock", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": err, "args": reqArgs})
 		return
 	}
 	logger.Errorw(ctx, "rpc-successful", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "response": response, "args": reqArgs})
diff --git a/rw_core/core/device/agent_pm_config.go b/rw_core/core/device/agent_pm_config.go
index 0640788..d726115 100644
--- a/rw_core/core/device/agent_pm_config.go
+++ b/rw_core/core/device/agent_pm_config.go
@@ -20,6 +20,7 @@
 	"context"
 
 	"github.com/gogo/protobuf/proto"
+	coreutils "github.com/opencord/voltha-go/rw_core/utils"
 	"github.com/opencord/voltha-lib-go/v4/pkg/log"
 	"github.com/opencord/voltha-protos/v4/go/voltha"
 	"google.golang.org/grpc/codes"
@@ -30,7 +31,7 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	logger.Debugw(ctx, "updatePmConfigs", log.Fields{"device-id": pmConfigs.Id})
+	logger.Debugw(ctx, "update-pm-configs", log.Fields{"device-id": pmConfigs.Id})
 
 	cloned := agent.cloneDeviceWithoutLock()
 	cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
@@ -40,6 +41,8 @@
 	}
 	// Send the request to the adapter
 	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
 	ch, err := agent.adapterProxy.UpdatePmConfigs(subCtx, cloned, pmConfigs)
 	if err != nil {
 		cancel()
@@ -53,7 +56,7 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	logger.Debugw(ctx, "initPmConfigs", log.Fields{"device-id": pmConfigs.Id})
+	logger.Debugw(ctx, "init-pm-configs", log.Fields{"device-id": pmConfigs.Id})
 
 	cloned := agent.cloneDeviceWithoutLock()
 	cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
@@ -61,7 +64,7 @@
 }
 
 func (agent *Agent) listPmConfigs(ctx context.Context) (*voltha.PmConfigs, error) {
-	logger.Debugw(ctx, "listPmConfigs", log.Fields{"device-id": agent.deviceID})
+	logger.Debugw(ctx, "list-pm-configs", log.Fields{"device-id": agent.deviceID})
 
 	device, err := agent.getDeviceReadOnly(ctx)
 	if err != nil {
diff --git a/rw_core/core/device/agent_port.go b/rw_core/core/device/agent_port.go
index a3489fc..6e53d16 100644
--- a/rw_core/core/device/agent_port.go
+++ b/rw_core/core/device/agent_port.go
@@ -22,6 +22,7 @@
 
 	"github.com/gogo/protobuf/proto"
 	"github.com/opencord/voltha-go/rw_core/core/device/port"
+	coreutils "github.com/opencord/voltha-go/rw_core/utils"
 	"github.com/opencord/voltha-lib-go/v4/pkg/log"
 	"github.com/opencord/voltha-protos/v4/go/voltha"
 	"google.golang.org/grpc/codes"
@@ -43,7 +44,7 @@
 
 // getPorts retrieves the ports information of the device based on the port type.
 func (agent *Agent) getPorts(ctx context.Context, portType voltha.Port_PortType) *voltha.Ports {
-	logger.Debugw(ctx, "getPorts", log.Fields{"device-id": agent.deviceID, "port-type": portType})
+	logger.Debugw(ctx, "get-ports", log.Fields{"device-id": agent.deviceID, "port-type": portType})
 	ports := &voltha.Ports{}
 	for _, port := range agent.listDevicePorts() {
 		if port.Type == portType {
@@ -63,7 +64,7 @@
 }
 
 func (agent *Agent) updatePortsOperState(ctx context.Context, portTypeFilter uint32, operStatus voltha.OperStatus_Types) error {
-	logger.Debugw(ctx, "updatePortsOperState", log.Fields{"device-id": agent.deviceID})
+	logger.Debugw(ctx, "update-ports-oper-state", log.Fields{"device-id": agent.deviceID})
 
 	for portID := range agent.portLoader.ListIDs() {
 		if portHandle, have := agent.portLoader.Lock(portID); have {
@@ -79,12 +80,14 @@
 				// Notify the logical device manager to change the port state
 				// Do this for NNI and UNIs only. PON ports are not known by logical device
 				if newPort.Type == voltha.Port_ETHERNET_NNI || newPort.Type == voltha.Port_ETHERNET_UNI {
+					subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
+
 					go func(portID uint32, ctx context.Context) {
 						if err := agent.deviceMgr.logicalDeviceMgr.updatePortState(ctx, agent.deviceID, portID, operStatus); err != nil {
 							// TODO: VOL-2707
 							logger.Warnw(ctx, "unable-to-update-logical-port-state", log.Fields{"error": err})
 						}
-					}(portID, log.WithSpanFromContext(context.Background(), ctx))
+					}(portID, subCtx)
 				}
 			}
 			portHandle.Unlock()
@@ -116,7 +119,7 @@
 }
 
 func (agent *Agent) deleteAllPorts(ctx context.Context) error {
-	logger.Debugw(ctx, "deleteAllPorts", log.Fields{"device-id": agent.deviceID})
+	logger.Debugw(ctx, "delete-all-ports", log.Fields{"device-id": agent.deviceID})
 
 	device, err := agent.getDeviceReadOnly(ctx)
 	if err != nil {
@@ -159,7 +162,7 @@
 
 	oldPort := portHandle.GetReadOnly()
 	if oldPort.Label != "" || oldPort.Type != voltha.Port_PON_OLT {
-		logger.Debugw(ctx, "port already exists", log.Fields{"port": port})
+		logger.Debugw(ctx, "port-already-exists", log.Fields{"port": port})
 		return nil
 	}
 
@@ -218,7 +221,7 @@
 }
 
 func (agent *Agent) disablePort(ctx context.Context, portID uint32) error {
-	logger.Debugw(ctx, "disablePort", log.Fields{"device-id": agent.deviceID, "port-no": portID})
+	logger.Debugw(ctx, "disable-port", log.Fields{"device-id": agent.deviceID, "port-no": portID})
 
 	portHandle, have := agent.portLoader.Lock(portID)
 	if !have {
@@ -244,6 +247,8 @@
 		return err
 	}
 	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
 	ch, err := agent.adapterProxy.DisablePort(ctx, device, &newPort)
 	if err != nil {
 		cancel()
@@ -254,7 +259,7 @@
 }
 
 func (agent *Agent) enablePort(ctx context.Context, portID uint32) error {
-	logger.Debugw(ctx, "enablePort", log.Fields{"device-id": agent.deviceID, "port-no": portID})
+	logger.Debugw(ctx, "enable-port", log.Fields{"device-id": agent.deviceID, "port-no": portID})
 
 	portHandle, have := agent.portLoader.Lock(portID)
 	if !have {
@@ -280,6 +285,8 @@
 		return err
 	}
 	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
 	ch, err := agent.adapterProxy.EnablePort(ctx, device, &newPort)
 	if err != nil {
 		cancel()
diff --git a/rw_core/core/device/agent_test.go b/rw_core/core/device/agent_test.go
index 3d06a68..6947447 100755
--- a/rw_core/core/device/agent_test.go
+++ b/rw_core/core/device/agent_test.go
@@ -35,6 +35,7 @@
 	tst "github.com/opencord/voltha-go/rw_core/test"
 	com "github.com/opencord/voltha-lib-go/v4/pkg/adapters/common"
 	"github.com/opencord/voltha-lib-go/v4/pkg/db"
+	"github.com/opencord/voltha-lib-go/v4/pkg/events"
 	"github.com/opencord/voltha-lib-go/v4/pkg/kafka"
 	"github.com/opencord/voltha-lib-go/v4/pkg/log"
 	mock_etcd "github.com/opencord/voltha-lib-go/v4/pkg/mocks/etcd"
@@ -52,6 +53,7 @@
 	adapterMgr       *adapter.Manager
 	kmp              kafka.InterContainerProxy
 	kClient          kafka.Client
+	kEventClient     kafka.Client
 	kvClientPort     int
 	oltAdapter       *cm.OLTAdapter
 	onuAdapter       *cm.ONUAdapter
@@ -75,6 +77,7 @@
 	}
 	// Create the kafka client
 	test.kClient = mock_kafka.NewKafkaClient()
+	test.kEventClient = mock_kafka.NewKafkaClient()
 	test.oltAdapterName = "olt_adapter_mock"
 	test.onuAdapterName = "onu_adapter_mock"
 	test.coreInstanceID = "rw-da-test"
@@ -116,6 +119,7 @@
 func (dat *DATest) startCore(ctx context.Context) {
 	cfg := config.NewRWCoreFlags()
 	cfg.CoreTopic = "rw_core"
+	cfg.EventTopic = "voltha.events"
 	cfg.DefaultRequestTimeout = dat.defaultTimeout
 	cfg.KVStoreAddress = "127.0.0.1" + ":" + strconv.Itoa(dat.kvClientPort)
 	grpcPort, err := freeport.GetFreePort()
@@ -138,8 +142,8 @@
 	endpointMgr := kafka.NewEndpointManager(backend)
 	proxy := model.NewDBPath(backend)
 	dat.adapterMgr = adapter.NewAdapterManager(ctx, proxy, dat.coreInstanceID, dat.kClient)
-
-	dat.deviceMgr, dat.logicalDeviceMgr = NewManagers(proxy, dat.adapterMgr, dat.kmp, endpointMgr, cfg.CoreTopic, dat.coreInstanceID, cfg.DefaultCoreTimeout)
+	eventProxy := events.NewEventProxy(events.MsgClient(dat.kEventClient), events.MsgTopic(kafka.Topic{Name: cfg.EventTopic}))
+	dat.deviceMgr, dat.logicalDeviceMgr = NewManagers(proxy, dat.adapterMgr, dat.kmp, endpointMgr, cfg.CoreTopic, dat.coreInstanceID, cfg.DefaultCoreTimeout, eventProxy)
 	dat.adapterMgr.Start(context.Background())
 	if err = dat.kmp.Start(ctx); err != nil {
 		logger.Fatal(ctx, "Cannot start InterContainerProxy")
@@ -161,6 +165,9 @@
 	if dat.etcdServer != nil {
 		tst.StopEmbeddedEtcdServer(ctx, dat.etcdServer)
 	}
+	if dat.kEventClient != nil {
+		dat.kEventClient.Stop(ctx)
+	}
 }
 
 func (dat *DATest) createDeviceAgent(t *testing.T) *Agent {
diff --git a/rw_core/core/device/event/event.go b/rw_core/core/device/event/event.go
index 21d535f..4489196 100644
--- a/rw_core/core/device/event/event.go
+++ b/rw_core/core/device/event/event.go
@@ -20,12 +20,18 @@
 	"context"
 	"encoding/binary"
 	"encoding/hex"
-	"sync"
-
+	"fmt"
 	"github.com/golang/protobuf/ptypes/empty"
+	"github.com/opencord/voltha-go/rw_core/utils"
+	"github.com/opencord/voltha-lib-go/v4/pkg/events/eventif"
 	"github.com/opencord/voltha-lib-go/v4/pkg/log"
+	"github.com/opencord/voltha-protos/v4/go/common"
 	"github.com/opencord/voltha-protos/v4/go/openflow_13"
 	"github.com/opencord/voltha-protos/v4/go/voltha"
+	"github.com/opentracing/opentracing-go"
+	jtracing "github.com/uber/jaeger-client-go"
+	"sync"
+	"time"
 )
 
 type Manager struct {
@@ -33,21 +39,34 @@
 	packetInQueueDone    chan bool
 	changeEventQueue     chan openflow_13.ChangeEvent
 	changeEventQueueDone chan bool
+	RPCEventManager      *RPCEventManager
 }
 
-func NewManager() *Manager {
+type RPCEventManager struct {
+	eventProxy     eventif.EventProxy
+	coreInstanceID string
+}
+
+func NewManager(proxyForRPCEvents eventif.EventProxy, instanceID string) *Manager {
 	return &Manager{
 		packetInQueue:        make(chan openflow_13.PacketIn, 100),
 		packetInQueueDone:    make(chan bool, 1),
 		changeEventQueue:     make(chan openflow_13.ChangeEvent, 100),
 		changeEventQueueDone: make(chan bool, 1),
+		RPCEventManager:      NewRPCEventManager(proxyForRPCEvents, instanceID),
 	}
 }
 
+func NewRPCEventManager(proxyForRPCEvents eventif.EventProxy, instanceID string) *RPCEventManager {
+	return &RPCEventManager{
+		eventProxy:     proxyForRPCEvents,
+		coreInstanceID: instanceID,
+	}
+}
 func (q *Manager) SendPacketIn(ctx context.Context, deviceID string, transationID string, packet *openflow_13.OfpPacketIn) {
 	// TODO: Augment the OF PacketIn to include the transactionId
 	packetIn := openflow_13.PacketIn{Id: deviceID, PacketIn: packet}
-	logger.Debugw(ctx, "SendPacketIn", log.Fields{"packetIn": packetIn})
+	logger.Debugw(ctx, "send-packet-in", log.Fields{"packet-in": packetIn})
 	q.packetInQueue <- packetIn
 }
 
@@ -66,9 +85,9 @@
 	defer streamingTracker.Unlock()
 	if _, ok := streamingTracker.calls[method]; ok {
 		// bail out the other packet in thread
-		logger.Debugf(ctx, "%s streaming call already running. Exiting it", method)
+		logger.Debugf(ctx, "%s-streaming-call-already-running-exiting-it", method)
 		done <- true
-		logger.Debugf(ctx, "Last %s exited. Continuing ...", method)
+		logger.Debugf(ctx, "last-%s-exited-continuing", method)
 	} else {
 		streamingTracker.calls[method] = &callTracker{failedPacket: nil}
 	}
@@ -79,10 +98,10 @@
 	if tracker.failedPacket != nil {
 		switch tracker.failedPacket.(type) {
 		case openflow_13.PacketIn:
-			logger.Debug(ctx, "Enqueueing last failed packetIn")
+			logger.Debug(ctx, "enqueueing-last-failed-packet-in")
 			q.packetInQueue <- tracker.failedPacket.(openflow_13.PacketIn)
 		case openflow_13.ChangeEvent:
-			logger.Debug(ctx, "Enqueueing last failed changeEvent")
+			logger.Debug(ctx, "enqueueing-last-failed-change-event")
 			q.changeEventQueue <- tracker.failedPacket.(openflow_13.ChangeEvent)
 		}
 	}
@@ -92,8 +111,9 @@
 // ReceivePacketsIn receives packets from adapter
 func (q *Manager) ReceivePacketsIn(_ *empty.Empty, packetsIn voltha.VolthaService_ReceivePacketsInServer) error {
 	ctx := context.Background()
+	ctx = utils.WithRPCMetadataContext(ctx, "ReceivePacketsIn")
 	var streamingTracker = q.getStreamingTracker(ctx, "ReceivePacketsIn", q.packetInQueueDone)
-	logger.Debugw(ctx, "ReceivePacketsIn-request", log.Fields{"packetsIn": packetsIn})
+	logger.Debugw(ctx, "receive-packets-in-request", log.Fields{"packets-in": packetsIn})
 
 	err := q.flushFailedPackets(ctx, streamingTracker)
 	if err != nil {
@@ -109,6 +129,9 @@
 			})
 			if err := packetsIn.Send(&packet); err != nil {
 				logger.Errorw(ctx, "failed-to-send-packet", log.Fields{"error": err})
+				go q.RPCEventManager.GetAndSendRPCEvent(ctx, packet.Id, err.Error(),
+					nil, "RPC_ERROR_RAISE_EVENT", voltha.EventCategory_COMMUNICATION,
+					nil, time.Now().UnixNano())
 				// save the last failed packet in
 				streamingTracker.failedPacket = packet
 			} else {
@@ -118,7 +141,7 @@
 				}
 			}
 		case <-q.packetInQueueDone:
-			logger.Debug(ctx, "Another ReceivePacketsIn running. Bailing out ...")
+			logger.Debug(ctx, "another-receive-packets-in-running-bailing-out")
 			break loop
 		}
 	}
@@ -128,7 +151,7 @@
 }
 
 func (q *Manager) SendChangeEvent(ctx context.Context, deviceID string, reason openflow_13.OfpPortReason, desc *openflow_13.OfpPort) {
-	logger.Debugw(ctx, "SendChangeEvent", log.Fields{"device-id": deviceID, "reason": reason, "desc": desc})
+	logger.Debugw(ctx, "send-change-event", log.Fields{"device-id": deviceID, "reason": reason, "desc": desc})
 	q.changeEventQueue <- openflow_13.ChangeEvent{
 		Id: deviceID,
 		Event: &openflow_13.ChangeEvent_PortStatus{
@@ -141,8 +164,8 @@
 }
 
 func (q *Manager) SendFlowChangeEvent(ctx context.Context, deviceID string, res []error, xid uint32, flowCookie uint64) {
-	logger.Debugw(ctx, "SendChangeEvent", log.Fields{"device-id": deviceID,
-		"flowId": xid, "flowCookie": flowCookie, "errors": res})
+	logger.Debugw(ctx, "send-change-event", log.Fields{"device-id": deviceID,
+		"flow-id": xid, "flow-cookie": flowCookie, "errors": res})
 	errorType := openflow_13.OfpErrorType_OFPET_FLOW_MOD_FAILED
 	//Manually creating the data payload for the flow error message
 	bs := make([]byte, 2)
@@ -179,8 +202,9 @@
 // ReceiveChangeEvents receives change in events
 func (q *Manager) ReceiveChangeEvents(_ *empty.Empty, changeEvents voltha.VolthaService_ReceiveChangeEventsServer) error {
 	ctx := context.Background()
+	ctx = utils.WithRPCMetadataContext(ctx, "ReceiveChangeEvents")
 	var streamingTracker = q.getStreamingTracker(ctx, "ReceiveChangeEvents", q.changeEventQueueDone)
-	logger.Debugw(ctx, "ReceiveChangeEvents-request", log.Fields{"changeEvents": changeEvents})
+	logger.Debugw(ctx, "receive-change-events-request", log.Fields{"change-events": changeEvents})
 
 	err := q.flushFailedPackets(ctx, streamingTracker)
 	if err != nil {
@@ -195,7 +219,10 @@
 			logger.Debugw(ctx, "sending-change-event", log.Fields{"event": event})
 			if err := changeEvents.Send(&event); err != nil {
 				logger.Errorw(ctx, "failed-to-send-change-event", log.Fields{"error": err})
-				// save last failed changeevent
+				go q.RPCEventManager.GetAndSendRPCEvent(ctx, event.Id, err.Error(),
+					nil, "RPC_ERROR_RAISE_EVENT", voltha.EventCategory_COMMUNICATION, nil,
+					time.Now().UnixNano())
+				// save last failed change event
 				streamingTracker.failedPacket = event
 			} else {
 				if streamingTracker.failedPacket != nil {
@@ -204,7 +231,7 @@
 				}
 			}
 		case <-q.changeEventQueueDone:
-			logger.Debug(ctx, "Another ReceiveChangeEvents already running. Bailing out ...")
+			logger.Debug(ctx, "another-receive-change-events-already-running-bailing-out")
 			break loop
 		}
 	}
@@ -215,3 +242,48 @@
 func (q *Manager) GetChangeEventsQueueForTest() <-chan openflow_13.ChangeEvent {
 	return q.changeEventQueue
 }
+
+// NewRPCEvent builds a voltha.RPCEvent describing a failed operation on resourceID.
+// The RPC name is read from the RPC metadata carried by ctx and the operation ID is
+// the low 64 bits of the Jaeger trace ID when ctx carries a Jaeger span; otherwise
+// the operation ID is left empty. The status is always OPERATION_FAILURE.
+func (q *RPCEventManager) NewRPCEvent(ctx context.Context, resourceID, desc string, context map[string]string) *voltha.RPCEvent {
+	logger.Debugw(ctx, "new-rpc-event", log.Fields{"resource-id": resourceID})
+	var opID string
+
+	if span := opentracing.SpanFromContext(ctx); span != nil {
+		if jSpan, ok := span.(*jtracing.Span); ok {
+			opID = fmt.Sprintf("%016x", jSpan.SpanContext().TraceID().Low) // Using Sprintf to avoid removal of leading 0s
+		}
+	}
+	return &voltha.RPCEvent{
+		Rpc:         utils.GetRPCMetadataFromContext(ctx),
+		OperationId: opID,
+		ResourceId:  resourceID,
+		Service:     q.coreInstanceID,
+		Status: &common.OperationResp{
+			Code: common.OperationResp_OPERATION_FAILURE,
+		},
+		Description: desc,
+		Context:     context,
+	}
+}
+
+// SendRPCEvent publishes rpcEvent through the event proxy. Events that are nil or
+// carry no RPC name are dropped. A publish failure is logged rather than returned,
+// since this method has no return value.
+func (q *RPCEventManager) SendRPCEvent(ctx context.Context, id string, rpcEvent *voltha.RPCEvent, category voltha.EventCategory_Types, subCategory *voltha.EventSubCategory_Types, raisedTs int64) {
+	if rpcEvent == nil || rpcEvent.Rpc == "" {
+		return
+	}
+	//TODO Instead of directly sending to the kafka bus, queue the message and send it asynchronously
+	if err := q.eventProxy.SendRPCEvent(ctx, id, rpcEvent, category, subCategory, raisedTs); err != nil {
+		logger.Warnw(ctx, "failed-to-send-rpc-event", log.Fields{"id": id, "rpc": rpcEvent.Rpc, "resource-id": rpcEvent.ResourceId, "error": err})
+	}
+}
+
+// GetAndSendRPCEvent builds an RPC event with NewRPCEvent and publishes it with SendRPCEvent.
+func (q *RPCEventManager) GetAndSendRPCEvent(ctx context.Context, resourceID, desc string, context map[string]string,
+	id string, category voltha.EventCategory_Types, subCategory *voltha.EventSubCategory_Types, raisedTs int64) {
+	q.SendRPCEvent(ctx, id, q.NewRPCEvent(ctx, resourceID, desc, context), category, subCategory, raisedTs)
+}
diff --git a/rw_core/core/device/logical_agent.go b/rw_core/core/device/logical_agent.go
index 6b0b6f7..7ab00da 100644
--- a/rw_core/core/device/logical_agent.go
+++ b/rw_core/core/device/logical_agent.go
@@ -92,7 +92,7 @@
 		return nil
 	}
 
-	logger.Infow(ctx, "starting-logical_device-agent", log.Fields{"logical-device-id": agent.logicalDeviceID, "load-from-db": loadFromDB})
+	logger.Infow(ctx, "starting-logical-device-agent", log.Fields{"logical-device-id": agent.logicalDeviceID, "load-from-db": loadFromDB})
 
 	var startSucceeded bool
 	defer func() {
@@ -128,13 +128,14 @@
 			logger.Errorw(ctx, "failed-to-add-logical-device", log.Fields{"logical-device-id": agent.logicalDeviceID})
 			return err
 		}
-		logger.Debugw(ctx, "logicaldevice-created", log.Fields{"logical-device-id": agent.logicalDeviceID, "root-id": ld.RootDeviceId})
+		logger.Debugw(ctx, "logical-device-created", log.Fields{"logical-device-id": agent.logicalDeviceID, "root-id": ld.RootDeviceId})
 
 		agent.logicalDevice = ld
 
 		// Setup the logicalports - internal processing, no need to propagate the client context
 		go func() {
-			err := agent.setupLogicalPorts(log.WithSpanFromContext(context.Background(), ctx))
+			subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
+			err := agent.setupLogicalPorts(subCtx)
 			if err != nil {
 				logger.Errorw(ctx, "unable-to-setup-logical-ports", log.Fields{"error": err})
 			}
@@ -169,7 +170,8 @@
 	// Setup the device routes. Building routes may fail if the pre-conditions are not satisfied (e.g. no PON ports present)
 	if loadFromDB {
 		go func() {
-			if err := agent.buildRoutes(log.WithSpanFromContext(context.Background(), ctx)); err != nil {
+			subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
+			if err := agent.buildRoutes(subCtx); err != nil {
 				logger.Warn(ctx, "routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
 			}
 		}()
@@ -183,7 +185,7 @@
 func (agent *LogicalAgent) stop(ctx context.Context) error {
 	var returnErr error
 	agent.stopOnce.Do(func() {
-		logger.Info(ctx, "stopping-logical_device-agent")
+		logger.Info(ctx, "stopping-logical-device-agent")
 
 		if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 			// This should never happen - an error is returned only if the agent is stopped and an agent is only stopped once.
@@ -202,14 +204,14 @@
 		if err := agent.ldProxy.Remove(ctx, agent.logicalDeviceID); err != nil {
 			returnErr = err
 		} else {
-			logger.Debugw(ctx, "logicaldevice-removed", log.Fields{"logical-device-id": agent.logicalDeviceID})
+			logger.Debugw(ctx, "logical-device-removed", log.Fields{"logical-device-id": agent.logicalDeviceID})
 		}
 		// TODO: remove all entries from all loaders
 		// TODO: don't allow any more modifications to flows/groups/meters/ports or to any logical device field
 
 		agent.stopped = true
 
-		logger.Info(ctx, "logical_device-agent-stopped")
+		logger.Info(ctx, "logical-device-agent-stopped")
 	})
 	return returnErr
 }
@@ -224,7 +226,7 @@
 }
 
 func (agent *LogicalAgent) addFlowsAndGroupsToDevices(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) []coreutils.Response {
-	logger.Debugw(ctx, "send-add-flows-to-device-manager", log.Fields{"logical-device-id": agent.logicalDeviceID, "deviceRules": deviceRules, "flowMetadata": flowMetadata})
+	logger.Debugw(ctx, "send-add-flows-to-device-manager", log.Fields{"logical-device-id": agent.logicalDeviceID, "device-rules": deviceRules, "flow-metadata": flowMetadata})
 
 	responses := make([]coreutils.Response, 0)
 	for deviceID, value := range deviceRules.GetRules() {
@@ -232,6 +234,8 @@
 		responses = append(responses, response)
 		go func(deviceId string, value *fu.FlowsAndGroups) {
 			subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+			subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
 			defer cancel()
 			start := time.Now()
 			if err := agent.deviceMgr.addFlowsAndGroups(subCtx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
@@ -260,6 +264,8 @@
 		responses = append(responses, response)
 		go func(deviceId string, value *fu.FlowsAndGroups) {
 			subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+			subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
 			defer cancel()
 			start := time.Now()
 			if err := agent.deviceMgr.deleteFlowsAndGroups(subCtx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
@@ -286,6 +292,8 @@
 		responses = append(responses, response)
 		go func(deviceId string, value *fu.FlowsAndGroups) {
 			subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+			subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
 			defer cancel()
 			if err := agent.deviceMgr.updateFlowsAndGroups(subCtx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
 				logger.Errorw(ctx, "flow-update-failed", log.Fields{"device-id": deviceId, "error": err})
@@ -313,6 +321,8 @@
 		logger.Debugw(ctx, "uni-port", log.Fields{"flows": flows, "uni-port": uniPort})
 		go func(uniPort uint32, metadata *voltha.FlowMetadata) {
 			subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+			subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
 			defer cancel()
 			if err := agent.deviceMgr.deleteParentFlows(subCtx, agent.rootDeviceID, uniPort, metadata); err != nil {
 				logger.Error(ctx, "flow-delete-failed-from-parent-device", log.Fields{
@@ -331,15 +341,15 @@
 func (agent *LogicalAgent) packetOut(ctx context.Context, packet *ofp.OfpPacketOut) {
 	if logger.V(log.InfoLevel) {
 		logger.Infow(ctx, "packet-out", log.Fields{
-			"packet": hex.EncodeToString(packet.Data),
-			"inPort": packet.GetInPort(),
+			"packet":  hex.EncodeToString(packet.Data),
+			"in-port": packet.GetInPort(),
 		})
 	}
 	outPort := fu.GetPacketOutPort(packet)
 	//frame := packet.GetData()
 	//TODO: Use a channel between the logical agent and the device agent
 	if err := agent.deviceMgr.packetOut(ctx, agent.rootDeviceID, outPort, packet); err != nil {
-		logger.Error(ctx, "packetout-failed", log.Fields{"logical-device-id": agent.rootDeviceID})
+		logger.Errorw(ctx, "packet-out-failed", log.Fields{"logical-device-id": agent.logicalDeviceID, "device-id": agent.rootDeviceID, "error": err})
 	}
 }
 
diff --git a/rw_core/core/device/logical_agent_flow.go b/rw_core/core/device/logical_agent_flow.go
index 6b8abef..2b88abc 100644
--- a/rw_core/core/device/logical_agent_flow.go
+++ b/rw_core/core/device/logical_agent_flow.go
@@ -21,6 +21,7 @@
 	"errors"
 	"fmt"
 	"strconv"
+	"time"
 
 	"github.com/gogo/protobuf/proto"
 	"github.com/opencord/voltha-go/rw_core/route"
@@ -48,7 +49,7 @@
 
 //updateFlowTable updates the flow table of that logical device
 func (agent *LogicalAgent) updateFlowTable(ctx context.Context, flow *ofp.FlowTableUpdate) error {
-	logger.Debug(ctx, "UpdateFlowTable")
+	logger.Debug(ctx, "update-flow-table")
 	if flow == nil {
 		return nil
 	}
@@ -72,19 +73,19 @@
 //flowAdd adds a flow to the flow table of that logical device
 func (agent *LogicalAgent) flowAdd(ctx context.Context, flowUpdate *ofp.FlowTableUpdate) error {
 	mod := flowUpdate.FlowMod
-	logger.Debugw(ctx, "flowAdd", log.Fields{"flow": mod})
+	logger.Debugw(ctx, "flow-add", log.Fields{"flow": mod})
 	if mod == nil {
 		return nil
 	}
 	flow, err := fu.FlowStatsEntryFromFlowModMessage(mod)
 	if err != nil {
-		logger.Errorw(ctx, "flowAdd-failed", log.Fields{"flowMod": mod, "err": err})
+		logger.Errorw(ctx, "flow-add-failed", log.Fields{"flow-mod": mod, "err": err})
 		return err
 	}
 	var updated bool
 	var changed bool
 	if changed, updated, err = agent.decomposeAndAdd(ctx, flow, flowUpdate); err != nil {
-		logger.Errorw(ctx, "flow-decompose-and-add-failed ", log.Fields{"flowMod": mod, "err": err})
+		logger.Errorw(ctx, "flow-decompose-and-add-failed", log.Fields{"flow-mod": mod, "err": err})
 		return err
 	}
 	if changed && !updated {
@@ -115,7 +116,7 @@
 		// TODO: this currently does nothing
 		if overlapped := fu.FindOverlappingFlows(flows, mod); len(overlapped) != 0 {
 			// TODO: should this error be notified other than being logged?
-			logger.Warnw(ctx, "overlapped-flows", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
+			logger.Warnw(ctx, "overlapped-flows", log.Fields{"logical-device-id": agent.logicalDeviceID})
 		} else {
 			//	Add flow
 			changed = true
@@ -141,7 +142,7 @@
 
 		flowMeterConfig, err := agent.GetMeterConfig(ctx, updatedFlows)
 		if err != nil {
-			logger.Error(ctx, "Meter-referred-in-flow-not-present")
+			logger.Error(ctx, "meter-referred-in-flow-not-present")
 			return changed, updated, err
 		}
 
@@ -178,8 +179,10 @@
 					"flow":              flow,
 					"groups":            groups,
 				})
+				subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
+
 				// Revert added flows
-				if err := agent.revertAddedFlows(log.WithSpanFromContext(context.Background(), ctx), mod, flow, flowToReplace, deviceRules, toMetadata(flowMeterConfig)); err != nil {
+				if err := agent.revertAddedFlows(subCtx, mod, flow, flowToReplace, deviceRules, toMetadata(flowMeterConfig)); err != nil {
 					logger.Errorw(ctx, "failure-to-delete-flow-after-failed-addition", log.Fields{
 						"error":             err,
 						"logical-device-id": agent.logicalDeviceID,
@@ -189,6 +192,13 @@
 				}
 				// send event
 				agent.ldeviceMgr.SendFlowChangeEvent(ctx, agent.logicalDeviceID, res, flowUpdate.Xid, flowUpdate.FlowMod.Cookie)
+				context := make(map[string]string)
+				context["rpc"] = coreutils.GetRPCMetadataFromContext(ctx)
+				context["flow-id"] = strconv.FormatUint(flow.Id, 10) // decimal text; string(<integer>) would yield a single rune
+				context["device-rules"] = deviceRules.String()
+				go agent.ldeviceMgr.SendRPCEvent(ctx,
+					agent.logicalDeviceID, "failed-to-add-flow", context, "RPC_ERROR_RAISE_EVENT",
+					voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
 			}
 		}()
 	}
@@ -198,7 +208,7 @@
 // revertAddedFlows reverts flows after the flowAdd request has failed.  All flows corresponding to that flowAdd request
 // will be reverted, both from the logical devices and the devices.
 func (agent *LogicalAgent) revertAddedFlows(ctx context.Context, mod *ofp.OfpFlowMod, addedFlow *ofp.OfpFlowStats, replacedFlow *ofp.OfpFlowStats, deviceRules *fu.DeviceRules, metadata *voltha.FlowMetadata) error {
-	logger.Debugw(ctx, "revertFlowAdd", log.Fields{"added-flow": addedFlow, "replaced-flow": replacedFlow, "device-rules": deviceRules, "metadata": metadata})
+	logger.Debugw(ctx, "revert-flow-add", log.Fields{"added-flow": addedFlow, "replaced-flow": replacedFlow, "device-rules": deviceRules, "metadata": metadata})
 
 	flowHandle, have := agent.flowLoader.Lock(addedFlow.Id)
 	if !have {
@@ -243,7 +253,7 @@
 
 //flowDelete deletes a flow from the flow table of that logical device
 func (agent *LogicalAgent) flowDelete(ctx context.Context, flowUpdate *ofp.FlowTableUpdate) error {
-	logger.Debug(ctx, "flowDelete")
+	logger.Debug(ctx, "flow-delete")
 	mod := flowUpdate.FlowMod
 	if mod == nil {
 		return nil
@@ -274,7 +284,7 @@
 
 	//Delete the matched flows
 	if len(toDelete) > 0 {
-		logger.Debugw(ctx, "flowDelete", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "toDelete": len(toDelete)})
+		logger.Debugw(ctx, "flow-delete", log.Fields{"logical-device-id": agent.logicalDeviceID, "to-delete": len(toDelete)})
 
 		for _, flow := range toDelete {
 			if flowHandle, have := agent.flowLoader.Lock(flow.Id); have {
@@ -299,7 +309,7 @@
 
 		metersConfig, err := agent.GetMeterConfig(ctx, toDelete)
 		if err != nil { // This should never happen
-			logger.Error(ctx, "Meter-referred-in-flows-not-present")
+			logger.Error(ctx, "meter-referred-in-flows-not-present")
 			return err
 		}
 
@@ -336,7 +346,13 @@
 		go func() {
 			// Wait for completion
 			if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
-				logger.Errorw(ctx, "failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
+				logger.Errorw(ctx, "failure-updating-device-flows", log.Fields{"logical-device-id": agent.logicalDeviceID, "errors": res})
+				context := make(map[string]string)
+				context["rpc"] = coreutils.GetRPCMetadataFromContext(ctx)
+				context["device-rules"] = deviceRules.String()
+				go agent.ldeviceMgr.SendRPCEvent(ctx,
+					agent.logicalDeviceID, "failed-to-update-device-flows", context, "RPC_ERROR_RAISE_EVENT",
+					voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
 				// TODO: Revert the flow deletion
 				// send event, and allow any queued events to be sent as well
 				agent.ldeviceMgr.SendFlowChangeEvent(ctx, agent.logicalDeviceID, res, flowUpdate.Xid, flowUpdate.FlowMod.Cookie)
@@ -350,7 +366,7 @@
 //flowDeleteStrict deletes a flow from the flow table of that logical device
 func (agent *LogicalAgent) flowDeleteStrict(ctx context.Context, flowUpdate *ofp.FlowTableUpdate) error {
 	mod := flowUpdate.FlowMod
-	logger.Debugw(ctx, "flowDeleteStrict", log.Fields{"mod": mod})
+	logger.Debugw(ctx, "flow-delete-strict", log.Fields{"mod": mod})
 	if mod == nil {
 		return nil
 	}
@@ -359,10 +375,10 @@
 	if err != nil {
 		return err
 	}
-	logger.Debugw(ctx, "flow-id-in-flow-delete-strict", log.Fields{"flowID": flow.Id})
+	logger.Debugw(ctx, "flow-id-in-flow-delete-strict", log.Fields{"flow-id": flow.Id})
 	flowHandle, have := agent.flowLoader.Lock(flow.Id)
 	if !have {
-		logger.Debugw(ctx, "Skipping-flow-delete-strict-request. No-flow-found", log.Fields{"flowMod": mod})
+		logger.Debugw(ctx, "skipping-flow-delete-strict-request-no-flow-found", log.Fields{"flow-mod": mod})
 		return nil
 	}
 	defer flowHandle.Unlock()
@@ -421,6 +437,14 @@
 			// TODO: Revert flow changes
 			// send event, and allow any queued events to be sent as well
 			agent.ldeviceMgr.SendFlowChangeEvent(ctx, agent.logicalDeviceID, res, flowUpdate.Xid, flowUpdate.FlowMod.Cookie)
+			context := make(map[string]string)
+			context["rpc"] = coreutils.GetRPCMetadataFromContext(ctx)
+			context["flow-id"] = strconv.FormatUint(flow.Id, 10)
+			context["device-rules"] = deviceRules.String()
+			// The flow ID is attached as decimal text; string(<integer>) would yield a single rune.
+			go agent.ldeviceMgr.SendRPCEvent(ctx,
+				agent.logicalDeviceID, "failed-to-delete-device-flows", context, "RPC_ERROR_RAISE_EVENT",
+				voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
 		}
 	}()
 
@@ -448,7 +472,7 @@
 }
 
 func (agent *LogicalAgent) deleteFlowsHavingMeter(ctx context.Context, meterID uint32) error {
-	logger.Infow(ctx, "Delete-flows-matching-meter", log.Fields{"meter": meterID})
+	logger.Infow(ctx, "delete-flows-matching-meter", log.Fields{"meter": meterID})
 	for flowID := range agent.flowLoader.ListIDs() {
 		if flowHandle, have := agent.flowLoader.Lock(flowID); have {
 			if flowMeterID := fu.GetMeterIdFromFlow(flowHandle.GetReadOnly()); flowMeterID != 0 && flowMeterID == meterID {
@@ -466,7 +490,7 @@
 }
 
 func (agent *LogicalAgent) deleteFlowsHavingGroup(ctx context.Context, groupID uint32) (map[uint64]*ofp.OfpFlowStats, error) {
-	logger.Infow(ctx, "Delete-flows-matching-group", log.Fields{"groupID": groupID})
+	logger.Infow(ctx, "delete-flows-matching-group", log.Fields{"group-id": groupID})
 	flowsRemoved := make(map[uint64]*ofp.OfpFlowStats)
 	for flowID := range agent.flowLoader.ListIDs() {
 		if flowHandle, have := agent.flowLoader.Lock(flowID); have {
diff --git a/rw_core/core/device/logical_agent_group.go b/rw_core/core/device/logical_agent_group.go
index 7a9f91c..b7bf1b6 100644
--- a/rw_core/core/device/logical_agent_group.go
+++ b/rw_core/core/device/logical_agent_group.go
@@ -19,6 +19,7 @@
 import (
 	"context"
 	"fmt"
+	"time"
 
 	coreutils "github.com/opencord/voltha-go/rw_core/utils"
 	fu "github.com/opencord/voltha-lib-go/v4/pkg/flows"
@@ -44,7 +45,7 @@
 
 //updateGroupTable updates the group table of that logical device
 func (agent *LogicalAgent) updateGroupTable(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
-	logger.Debug(ctx, "updateGroupTable")
+	logger.Debug(ctx, "update-group-table")
 	if groupMod == nil {
 		return nil
 	}
@@ -64,7 +65,7 @@
 	if groupMod == nil {
 		return nil
 	}
-	logger.Debugw(ctx, "groupAdd", log.Fields{"GroupId": groupMod.GroupId})
+	logger.Debugw(ctx, "group-add", log.Fields{"group-id": groupMod.GroupId})
 
 	groupEntry := fu.GroupEntryFromGroupMod(groupMod)
 
@@ -83,7 +84,7 @@
 	deviceRules := fu.NewDeviceRules()
 	deviceRules.AddFlowsAndGroup(agent.rootDeviceID, fg)
 
-	logger.Debugw(ctx, "rules", log.Fields{"rules for group-add": deviceRules.String()})
+	logger.Debugw(ctx, "rules", log.Fields{"rules-for-group-add": deviceRules.String()})
 
 	// Update the devices
 	respChnls := agent.addFlowsAndGroupsToDevices(ctx, deviceRules, &voltha.FlowMetadata{})
@@ -92,6 +93,13 @@
 	go func() {
 		if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
 			logger.Warnw(ctx, "failure-updating-device-flows-groups", log.Fields{"logical-device-id": agent.logicalDeviceID, "errors": res})
+			context := make(map[string]string)
+			context["rpc"] = coreutils.GetRPCMetadataFromContext(ctx)
+			context["group-id"] = fmt.Sprint(groupMod.GroupId) // decimal text; string(<integer>) would yield a single rune
+			context["device-rules"] = deviceRules.String()
+			go agent.ldeviceMgr.SendRPCEvent(ctx,
+				agent.logicalDeviceID, "failed-to-update-device-flows-groups", context, "RPC_ERROR_RAISE_EVENT",
+				voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
 			//TODO: Revert flow changes
 		}
 	}()
@@ -99,7 +107,7 @@
 }
 
 func (agent *LogicalAgent) groupDelete(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
-	logger.Debugw(ctx, "groupDelete", log.Fields{"groupMod": groupMod})
+	logger.Debugw(ctx, "group-delete", log.Fields{"group-mod": groupMod})
 	if groupMod == nil {
 		return nil
 	}
@@ -135,7 +143,7 @@
 	}
 
 	if len(affectedGroups) == 0 {
-		logger.Debugw(ctx, "no-group-to-delete", log.Fields{"groupId": groupMod.GroupId})
+		logger.Debugw(ctx, "no-group-to-delete", log.Fields{"group-id": groupMod.GroupId})
 		return nil
 	}
 
@@ -167,6 +175,13 @@
 	go func() {
 		if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
 			logger.Warnw(ctx, "failure-updating-device-flows-groups", log.Fields{"logical-device-id": agent.logicalDeviceID, "errors": res})
+			evtCtx := make(map[string]string) // not named "context", to avoid shadowing the package
+			evtCtx["rpc"] = coreutils.GetRPCMetadataFromContext(ctx)
+			evtCtx["group-id"] = fmt.Sprint(groupMod.GroupId) // decimal text; string(<integer>) would produce a rune
+			evtCtx["device-rules"] = deviceRules.String()
+			go agent.ldeviceMgr.SendRPCEvent(ctx,
+				agent.logicalDeviceID, "failed-to-update-device-flows-groups", evtCtx, "RPC_ERROR_RAISE_EVENT",
+				voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
 			//TODO: Revert flow changes
 		}
 	}()
@@ -174,7 +189,7 @@
 }
 
 func (agent *LogicalAgent) groupModify(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
-	logger.Debug(ctx, "groupModify")
+	logger.Debug(ctx, "group-modify")
 	if groupMod == nil {
 		return nil
 	}
@@ -199,7 +214,7 @@
 
 	//update KV
 	if err := groupHandle.Update(ctx, groupEntry); err != nil {
-		logger.Errorw(ctx, "Cannot-update-logical-group", log.Fields{"logical-device-id": agent.logicalDeviceID})
+		logger.Errorw(ctx, "cannot-update-logical-group", log.Fields{"logical-device-id": agent.logicalDeviceID})
 		return err
 	}
 
@@ -210,6 +225,13 @@
 	go func() {
 		if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
 			logger.Warnw(ctx, "failure-updating-device-flows-groups", log.Fields{"logical-device-id": agent.logicalDeviceID, "errors": res})
+			evtCtx := make(map[string]string) // not named "context", to avoid shadowing the package
+			evtCtx["rpc"] = coreutils.GetRPCMetadataFromContext(ctx)
+			evtCtx["group-id"] = fmt.Sprint(groupMod.GroupId) // decimal text; string(<integer>) would produce a rune
+			evtCtx["device-rules"] = deviceRules.String()
+			go agent.ldeviceMgr.SendRPCEvent(ctx,
+				agent.logicalDeviceID, "failed-to-update-device-flows-groups", evtCtx, "RPC_ERROR_RAISE_EVENT",
+				voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
 			//TODO: Revert flow changes
 		}
 	}()
diff --git a/rw_core/core/device/logical_agent_port.go b/rw_core/core/device/logical_agent_port.go
index ab1b82b..05a5108 100644
--- a/rw_core/core/device/logical_agent_port.go
+++ b/rw_core/core/device/logical_agent_port.go
@@ -32,7 +32,7 @@
 
 // listLogicalDevicePorts returns logical device ports
 func (agent *LogicalAgent) listLogicalDevicePorts(ctx context.Context) map[uint32]*voltha.LogicalPort {
-	logger.Debug(ctx, "listLogicalDevicePorts")
+	logger.Debug(ctx, "list-logical-device-ports")
 	portIDs := agent.portLoader.ListIDs()
 	ret := make(map[uint32]*voltha.LogicalPort, len(portIDs))
 	for portID := range portIDs {
@@ -45,7 +45,7 @@
 }
 
 func (agent *LogicalAgent) updateLogicalPort(ctx context.Context, device *voltha.Device, devicePorts map[uint32]*voltha.Port, port *voltha.Port) error {
-	logger.Debugw(ctx, "updateLogicalPort", log.Fields{"device-id": device.Id, "port": port})
+	logger.Debugw(ctx, "update-logical-port", log.Fields{"device-id": device.Id, "port": port})
 	switch port.Type {
 	case voltha.Port_ETHERNET_NNI:
 		if err := agent.addNNILogicalPort(ctx, device.Id, devicePorts, port); err != nil {
@@ -58,7 +58,9 @@
 	case voltha.Port_PON_OLT:
 		// Rebuilt the routes on Parent PON port addition
 		go func() {
-			if err := agent.buildRoutes(log.WithSpanFromContext(context.Background(), ctx)); err != nil {
+			subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
+
+			if err := agent.buildRoutes(subCtx); err != nil {
 				// Not an error - temporary state
 				logger.Infow(ctx, "failed-to-update-routes-after-adding-parent-pon-port", log.Fields{"device-id": device.Id, "port": port, "ports-count": len(devicePorts), "error": err})
 			}
@@ -67,7 +69,8 @@
 	case voltha.Port_PON_ONU:
 		// Add the routes corresponding to that child device
 		go func() {
-			if err := agent.updateAllRoutes(log.WithSpanFromContext(context.Background(), ctx), device.Id, devicePorts); err != nil {
+			subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
+			if err := agent.updateAllRoutes(subCtx, device.Id, devicePorts); err != nil {
 				// Not an error - temporary state
 				logger.Infow(ctx, "failed-to-update-routes-after-adding-child-pon-port", log.Fields{"device-id": device.Id, "port": port, "ports-count": len(devicePorts), "error": err})
 			}
@@ -82,10 +85,10 @@
 // added to it.  While the logical device was being created we could have received requests to add
 // NNI and UNI ports which were discarded.  Now is the time to add them if needed
 func (agent *LogicalAgent) setupLogicalPorts(ctx context.Context) error {
-	logger.Infow(ctx, "setupLogicalPorts", log.Fields{"logical-device-id": agent.logicalDeviceID})
+	logger.Infow(ctx, "setup-logical-ports", log.Fields{"logical-device-id": agent.logicalDeviceID})
 	// First add any NNI ports which could have been missing
 	if err := agent.setupNNILogicalPorts(ctx, agent.rootDeviceID); err != nil {
-		logger.Errorw(ctx, "error-setting-up-NNI-ports", log.Fields{"error": err, "device-id": agent.rootDeviceID})
+		logger.Errorw(ctx, "error-setting-up-nni-ports", log.Fields{"error": err, "device-id": agent.rootDeviceID})
 		return err
 	}
 
@@ -99,21 +102,22 @@
 	for _, child := range children.Items {
 		response := coreutils.NewResponse()
 		responses = append(responses, response)
+		subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
 		go func(ctx context.Context, child *voltha.Device) {
 			defer response.Done()
 
 			childPorts, err := agent.deviceMgr.listDevicePorts(ctx, child.Id)
 			if err != nil {
-				logger.Error(ctx, "setting-up-UNI-ports-failed", log.Fields{"device-id": child.Id})
+				logger.Errorw(ctx, "setting-up-uni-ports-failed", log.Fields{"device-id": child.Id, "error": err})
 				response.Error(status.Errorf(codes.Internal, "UNI-ports-setup-failed: %s", child.Id))
 				return
 			}
 
 			if err = agent.setupUNILogicalPorts(ctx, child, childPorts); err != nil {
-				logger.Error(ctx, "setting-up-UNI-ports-failed", log.Fields{"device-id": child.Id})
+				logger.Errorw(ctx, "setting-up-uni-ports-failed", log.Fields{"device-id": child.Id, "error": err})
 				response.Error(status.Errorf(codes.Internal, "UNI-ports-setup-failed: %s", child.Id))
 			}
-		}(log.WithSpanFromContext(context.Background(), ctx), child)
+		}(subCtx, child)
 	}
 	// Wait for completion
 	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, responses...); res != nil {
@@ -124,7 +128,7 @@
 
 // setupNNILogicalPorts creates an NNI port on the logical device that represents an NNI interface on a root device
 func (agent *LogicalAgent) setupNNILogicalPorts(ctx context.Context, deviceID string) error {
-	logger.Infow(ctx, "setupNNILogicalPorts-start", log.Fields{"logical-device-id": agent.logicalDeviceID})
+	logger.Infow(ctx, "setup-nni-logical-ports-start", log.Fields{"logical-device-id": agent.logicalDeviceID})
 	// Build the logical device based on information retrieved from the device adapter
 
 	devicePorts, err := agent.deviceMgr.listDevicePorts(ctx, deviceID)
@@ -137,7 +141,7 @@
 	for _, port := range devicePorts {
 		if port.Type == voltha.Port_ETHERNET_NNI {
 			if err = agent.addNNILogicalPort(ctx, deviceID, devicePorts, port); err != nil {
-				logger.Errorw(ctx, "error-adding-NNI-port", log.Fields{"error": err})
+				logger.Errorw(ctx, "error-adding-nni-port", log.Fields{"error": err})
 			}
 		}
 	}
@@ -146,7 +150,7 @@
 
 // updatePortState updates the port state of the device
 func (agent *LogicalAgent) updatePortState(ctx context.Context, portNo uint32, operStatus voltha.OperStatus_Types) error {
-	logger.Infow(ctx, "updatePortState-start", log.Fields{"logical-device-id": agent.logicalDeviceID, "portNo": portNo, "state": operStatus})
+	logger.Infow(ctx, "update-port-state-start", log.Fields{"logical-device-id": agent.logicalDeviceID, "port-no": portNo, "state": operStatus})
 
 	portHandle, have := agent.portLoader.Lock(portNo)
 	if !have {
@@ -179,14 +183,14 @@
 
 // setupUNILogicalPorts creates a UNI port on the logical device that represents a child UNI interface
 func (agent *LogicalAgent) setupUNILogicalPorts(ctx context.Context, childDevice *voltha.Device, childDevicePorts map[uint32]*voltha.Port) error {
-	logger.Infow(ctx, "setupUNILogicalPorts", log.Fields{"logical-device-id": agent.logicalDeviceID})
+	logger.Infow(ctx, "setup-uni-logical-ports", log.Fields{"logical-device-id": agent.logicalDeviceID})
 	// Build the logical device based on information retrieved from the device adapter
 	var err error
 	//Get UNI port number
 	for _, port := range childDevicePorts {
 		if port.Type == voltha.Port_ETHERNET_UNI {
 			if err = agent.addUNILogicalPort(ctx, childDevice.Id, childDevice.AdminState, childDevice.OperStatus, childDevicePorts, port); err != nil {
-				logger.Errorw(ctx, "error-adding-UNI-port", log.Fields{"error": err})
+				logger.Errorw(ctx, "error-adding-uni-port", log.Fields{"error": err})
 			}
 		}
 	}
@@ -195,7 +199,7 @@
 
 // deleteAllLogicalPorts deletes all logical ports associated with this logical device
 func (agent *LogicalAgent) deleteAllLogicalPorts(ctx context.Context) error {
-	logger.Infow(ctx, "updatePortsState-start", log.Fields{"logical-device-id": agent.logicalDeviceID})
+	logger.Infow(ctx, "delete-all-logical-ports-start", log.Fields{"logical-device-id": agent.logicalDeviceID})
 
 	// for each port
 	for portID := range agent.portLoader.ListIDs() {
@@ -215,7 +219,8 @@
 
 	// Reset the logical device routes
 	go func() {
-		if err := agent.buildRoutes(log.WithSpanFromContext(context.Background(), ctx)); err != nil {
+		subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
+		if err := agent.buildRoutes(subCtx); err != nil {
 			logger.Warnw(ctx, "device-routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
 		}
 	}()
@@ -245,7 +250,8 @@
 
 	// Reset the logical device routes
 	go func() {
-		if err := agent.buildRoutes(log.WithSpanFromContext(context.Background(), ctx)); err != nil {
+		subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
+		if err := agent.buildRoutes(subCtx); err != nil {
 			logger.Warnw(ctx, "routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
 		}
 	}()
@@ -301,7 +307,7 @@
 // (true, nil).   If the device is not in the correct state it will return (false, nil) as this is a valid
 // scenario. This also applies to the case where the port was already added.
 func (agent *LogicalAgent) addNNILogicalPort(ctx context.Context, deviceID string, devicePorts map[uint32]*voltha.Port, port *voltha.Port) error {
-	logger.Debugw(ctx, "addNNILogicalPort", log.Fields{"logical-device-id": agent.logicalDeviceID, "nni-port": port})
+	logger.Debugw(ctx, "add-nni-logical-port", log.Fields{"logical-device-id": agent.logicalDeviceID, "nni-port": port})
 
 	label := fmt.Sprintf("nni-%d", port.PortNo)
 	ofpPort := *port.OfpPort
@@ -334,7 +340,8 @@
 	// Setup the routes for this device and then send the port update event to the OF Controller
 	go func() {
 		// First setup the routes
-		if err := agent.updateRoutes(log.WithSpanFromContext(context.Background(), ctx), deviceID, devicePorts, nniPort, agent.listLogicalDevicePorts(ctx)); err != nil {
+		subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
+		if err := agent.updateRoutes(subCtx, deviceID, devicePorts, nniPort, agent.listLogicalDevicePorts(ctx)); err != nil {
 			// This is not an error as we may not have enough logical ports to set up routes or some PON ports have not been
 			// created yet.
 			logger.Infow(ctx, "routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "logical-port": nniPort.OfpPort.PortNo, "error": err})
@@ -351,7 +358,7 @@
 // (true, nil).   If the device is not in the correct state it will return (false, nil) as this is a valid
 // scenario. This also applies to the case where the port was already added.
 func (agent *LogicalAgent) addUNILogicalPort(ctx context.Context, deviceID string, deviceAdminState voltha.AdminState_Types, deviceOperStatus voltha.OperStatus_Types, devicePorts map[uint32]*voltha.Port, port *voltha.Port) error {
-	logger.Debugw(ctx, "addUNILogicalPort", log.Fields{"port": port})
+	logger.Debugw(ctx, "add-uni-logical-port", log.Fields{"port": port})
 	if deviceAdminState != voltha.AdminState_ENABLED || deviceOperStatus != voltha.OperStatus_ACTIVE {
 		logger.Infow(ctx, "device-not-ready", log.Fields{"device-id": deviceID, "admin": deviceAdminState, "oper": deviceOperStatus})
 		return nil
@@ -384,16 +391,16 @@
 
 	// Setup the routes for this device and then send the port update event to the OF Controller
 	go func() {
-		ctx = log.WithSpanFromContext(context.Background(), ctx)
+		subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
 		// First setup the routes
-		if err := agent.updateRoutes(ctx, deviceID, devicePorts, uniPort, agent.listLogicalDevicePorts(ctx)); err != nil {
+		if err := agent.updateRoutes(subCtx, deviceID, devicePorts, uniPort, agent.listLogicalDevicePorts(ctx)); err != nil {
 			// This is not an error as we may not have enough logical ports to set up routes or some PON ports have not been
 			// created yet.
 			logger.Infow(ctx, "routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "logical-port": uniPort.OfpPort.PortNo, "error": err})
 		}
 
 		// send event, and allow any queued events to be sent as well
-		queuePosition.send(ctx, agent, agent.logicalDeviceID, ofp.OfpPortReason_OFPPR_ADD, uniPort.OfpPort)
+		queuePosition.send(subCtx, agent, agent.logicalDeviceID, ofp.OfpPortReason_OFPPR_ADD, uniPort.OfpPort)
 	}()
 	return nil
 }
@@ -414,7 +421,8 @@
 // send is a convenience to avoid calling both assignQueuePosition and qp.send
 func (e *orderedEvents) send(ctx context.Context, agent *LogicalAgent, deviceID string, reason ofp.OfpPortReason, desc *ofp.OfpPort) {
 	qp := e.assignQueuePosition()
-	go qp.send(log.WithSpanFromContext(context.Background(), ctx), agent, deviceID, reason, desc)
+	subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
+	go qp.send(subCtx, agent, deviceID, reason, desc)
 }
 
 // sendCompletion will make sure that given channel is notified when queue is empty
diff --git a/rw_core/core/device/logical_agent_test.go b/rw_core/core/device/logical_agent_test.go
index dcd6691..71ecf26 100644
--- a/rw_core/core/device/logical_agent_test.go
+++ b/rw_core/core/device/logical_agent_test.go
@@ -30,6 +30,7 @@
 	tst "github.com/opencord/voltha-go/rw_core/test"
 	com "github.com/opencord/voltha-lib-go/v4/pkg/adapters/common"
 	"github.com/opencord/voltha-lib-go/v4/pkg/db"
+	"github.com/opencord/voltha-lib-go/v4/pkg/events"
 	fu "github.com/opencord/voltha-lib-go/v4/pkg/flows"
 	"github.com/opencord/voltha-lib-go/v4/pkg/kafka"
 	mock_etcd "github.com/opencord/voltha-lib-go/v4/pkg/mocks/etcd"
@@ -46,6 +47,7 @@
 	kmp              kafka.InterContainerProxy
 	logicalDeviceMgr *LogicalManager
 	kClient          kafka.Client
+	kEventClient     kafka.Client
 	kvClientPort     int
 	oltAdapterName   string
 	onuAdapterName   string
@@ -68,6 +70,7 @@
 	}
 	// Create the kafka client
 	test.kClient = mock_kafka.NewKafkaClient()
+	test.kEventClient = mock_kafka.NewKafkaClient()
 	test.oltAdapterName = "olt_adapter_mock"
 	test.onuAdapterName = "onu_adapter_mock"
 	test.coreInstanceID = "rw-da-test"
@@ -135,6 +138,7 @@
 func (lda *LDATest) startCore(ctx context.Context, inCompeteMode bool) {
 	cfg := config.NewRWCoreFlags()
 	cfg.CoreTopic = "rw_core"
+	cfg.EventTopic = "voltha.events"
 	cfg.DefaultRequestTimeout = lda.defaultTimeout
 	cfg.KVStoreAddress = "127.0.0.1" + ":" + strconv.Itoa(lda.kvClientPort)
 	grpcPort, err := freeport.GetFreePort()
@@ -157,8 +161,8 @@
 	endpointMgr := kafka.NewEndpointManager(backend)
 	proxy := model.NewDBPath(backend)
 	adapterMgr := adapter.NewAdapterManager(ctx, proxy, lda.coreInstanceID, lda.kClient)
-
-	lda.deviceMgr, lda.logicalDeviceMgr = NewManagers(proxy, adapterMgr, lda.kmp, endpointMgr, cfg.CoreTopic, lda.coreInstanceID, cfg.DefaultCoreTimeout)
+	eventProxy := events.NewEventProxy(events.MsgClient(lda.kEventClient), events.MsgTopic(kafka.Topic{Name: cfg.EventTopic}))
+	lda.deviceMgr, lda.logicalDeviceMgr = NewManagers(proxy, adapterMgr, lda.kmp, endpointMgr, cfg.CoreTopic, lda.coreInstanceID, cfg.DefaultCoreTimeout, eventProxy)
 	if err = lda.kmp.Start(ctx); err != nil {
 		logger.Fatal(ctx, "Cannot start InterContainerProxy")
 	}
@@ -175,6 +179,9 @@
 	if lda.etcdServer != nil {
 		tst.StopEmbeddedEtcdServer(ctx, lda.etcdServer)
 	}
+	if lda.kEventClient != nil {
+		lda.kEventClient.Stop(ctx)
+	}
 }
 
 func (lda *LDATest) createLogicalDeviceAgent(t *testing.T) *LogicalAgent {
diff --git a/rw_core/core/device/logical_manager.go b/rw_core/core/device/logical_manager.go
index 2fca7c5..9ea12fa 100644
--- a/rw_core/core/device/logical_manager.go
+++ b/rw_core/core/device/logical_manager.go
@@ -67,7 +67,7 @@
 			// This can happen when an agent for the logical device has been created but the logical device
 			// itself is not ready for action as it is waiting for switch and port capabilities from the
 			// relevant adapter.  In such a case prevent any request aimed at that logical device.
-			logger.Debugf(ctx, "Logical device %s is not ready to serve requests", logicalDeviceID)
+			logger.Debugf(ctx, "logical-device-%s-is-not-ready-to-serve-requests", logicalDeviceID)
 			return nil
 		}
 		return lda
@@ -88,7 +88,8 @@
 // GetLogicalDevice provides a cloned most up to date logical device.  If device is not in memory
 // it will be fetched from the dB
 func (ldMgr *LogicalManager) GetLogicalDevice(ctx context.Context, id *voltha.ID) (*voltha.LogicalDevice, error) {
-	logger.Debugw(ctx, "getlogicalDevice", log.Fields{"logical-device-id": id})
+	ctx = utils.WithRPCMetadataContext(ctx, "GetLogicalDevice")
+	logger.Debugw(ctx, "get-logical-device", log.Fields{"logical-device-id": id})
 	if agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id); agent != nil {
 		return agent.GetLogicalDeviceReadOnly(ctx)
 	}
@@ -97,7 +98,8 @@
 
 //ListLogicalDevices returns the list of all logical devices
 func (ldMgr *LogicalManager) ListLogicalDevices(ctx context.Context, _ *empty.Empty) (*voltha.LogicalDevices, error) {
-	logger.Debug(ctx, "ListAllLogicalDevices")
+	ctx = utils.WithRPCMetadataContext(ctx, "ListLogicalDevices")
+	logger.Debug(ctx, "list-all-logical-devices")
 
 	var logicalDevices []*voltha.LogicalDevice
 	if err := ldMgr.ldProxy.List(ctx, &logicalDevices); err != nil {
@@ -139,7 +141,8 @@
 	go func() {
 		//TODO: either wait for the agent to be started before returning, or
 		//      implement locks in the agent to ensure request are not processed before start() is complete
-		err := agent.start(log.WithSpanFromContext(context.Background(), ctx), false)
+		ldCtx := utils.WithSpanAndRPCMetadataFromContext(ctx)
+		err := agent.start(ldCtx, false)
 		if err != nil {
 			logger.Errorw(ctx, "unable-to-create-the-logical-device", log.Fields{"error": err})
 			ldMgr.deleteLogicalDeviceAgent(id)
@@ -205,7 +208,7 @@
 				}
 				ldMgr.logicalDeviceAgents.Store(agent.logicalDeviceID, agent)
 			} else {
-				logger.Debugw(ctx, "logicalDevice not in model", log.Fields{"lDeviceId": lDeviceID})
+				logger.Debugw(ctx, "logical-device-not-in-model", log.Fields{"logical-device-id": lDeviceID})
 			}
 			// announce completion of task to any number of waiting channels
 			ldMgr.logicalDevicesLoadingLock.Lock()
@@ -281,7 +284,8 @@
 
 // ListLogicalDeviceFlows returns the flows of logical device
 func (ldMgr *LogicalManager) ListLogicalDeviceFlows(ctx context.Context, id *voltha.ID) (*openflow_13.Flows, error) {
-	logger.Debugw(ctx, "ListLogicalDeviceFlows", log.Fields{"logical-device-id": id.Id})
+	ctx = utils.WithRPCMetadataContext(ctx, "ListLogicalDeviceFlows")
+	logger.Debugw(ctx, "list-logical-device-flows", log.Fields{"logical-device-id": id.Id})
 	agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id)
 	if agent == nil {
 		return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -298,7 +302,8 @@
 
 // ListLogicalDeviceFlowGroups returns logical device flow groups
 func (ldMgr *LogicalManager) ListLogicalDeviceFlowGroups(ctx context.Context, id *voltha.ID) (*openflow_13.FlowGroups, error) {
-	logger.Debugw(ctx, "ListLogicalDeviceFlowGroups", log.Fields{"logical-device-id": id.Id})
+	ctx = utils.WithRPCMetadataContext(ctx, "ListLogicalDeviceFlowGroups")
+	logger.Debugw(ctx, "list-logical-device-flow-groups", log.Fields{"logical-device-id": id.Id})
 	agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id)
 	if agent == nil {
 		return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -315,7 +320,8 @@
 
 // ListLogicalDevicePorts returns logical device ports
 func (ldMgr *LogicalManager) ListLogicalDevicePorts(ctx context.Context, id *voltha.ID) (*voltha.LogicalPorts, error) {
-	logger.Debugw(ctx, "ListLogicalDevicePorts", log.Fields{"logical-device-id": id.Id})
+	ctx = utils.WithRPCMetadataContext(ctx, "ListLogicalDevicePorts")
+	logger.Debugw(ctx, "list-logical-device-ports", log.Fields{"logical-device-id": id.Id})
 	agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id)
 	if agent == nil {
 		return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -332,6 +338,7 @@
 
 // GetLogicalDevicePort returns logical device port details
 func (ldMgr *LogicalManager) GetLogicalDevicePort(ctx context.Context, lPortID *voltha.LogicalPortId) (*voltha.LogicalPort, error) {
+	ctx = utils.WithRPCMetadataContext(ctx, "GetLogicalDevicePort")
 	// Get the logical device where this port is attached
 	agent := ldMgr.getLogicalDeviceAgent(ctx, lPortID.Id)
 	if agent == nil {
@@ -382,7 +389,7 @@
 }
 
 func (ldMgr *LogicalManager) setupUNILogicalPorts(ctx context.Context, childDevice *voltha.Device, childDevicePorts map[uint32]*voltha.Port) error {
-	logger.Debugw(ctx, "setupUNILogicalPorts", log.Fields{"childDeviceId": childDevice.Id, "parentDeviceId": childDevice.ParentId, "current-data": childDevice})
+	logger.Debugw(ctx, "setup-uni-logical-ports", log.Fields{"child-device-id": childDevice.Id, "parent-device-id": childDevice.ParentId, "current-data": childDevice})
 	// Sanity check
 	if childDevice.Root {
 		return errors.New("Device-root")
@@ -392,7 +399,7 @@
 	parentID := childDevice.ParentId
 	logDeviceID := ldMgr.deviceMgr.GetParentDeviceID(ctx, parentID)
 
-	logger.Debugw(ctx, "setupUNILogicalPorts", log.Fields{"logDeviceId": logDeviceID, "parentId": parentID})
+	logger.Debugw(ctx, "setup-uni-logical-ports", log.Fields{"logical-device-id": logDeviceID, "parent-id": parentID})
 
 	if parentID == "" || logDeviceID == "" {
 		return errors.New("device-in-invalid-state")
@@ -407,7 +414,7 @@
 }
 
 func (ldMgr *LogicalManager) deleteAllLogicalPorts(ctx context.Context, device *voltha.Device) error {
-	logger.Debugw(ctx, "deleteAllLogicalPorts", log.Fields{"device-id": device.Id})
+	logger.Debugw(ctx, "delete-all-logical-ports", log.Fields{"device-id": device.Id})
 
 	var ldID *string
 	var err error
@@ -425,7 +432,7 @@
 }
 
 func (ldMgr *LogicalManager) updatePortState(ctx context.Context, deviceID string, portNo uint32, state voltha.OperStatus_Types) error {
-	logger.Debugw(ctx, "updatePortState", log.Fields{"device-id": deviceID, "state": state, "portNo": portNo})
+	logger.Debugw(ctx, "update-port-state", log.Fields{"device-id": deviceID, "state": state, "port-no": portNo})
 
 	var ldID *string
 	var err error
@@ -444,7 +451,8 @@
 
 // UpdateLogicalDeviceFlowTable updates logical device flow table
 func (ldMgr *LogicalManager) UpdateLogicalDeviceFlowTable(ctx context.Context, flow *openflow_13.FlowTableUpdate) (*empty.Empty, error) {
-	logger.Debugw(ctx, "UpdateLogicalDeviceFlowTable", log.Fields{"logical-device-id": flow.Id})
+	ctx = utils.WithRPCMetadataContext(ctx, "UpdateLogicalDeviceFlowTable")
+	logger.Debugw(ctx, "update-logical-device-flow-table", log.Fields{"logical-device-id": flow.Id})
 	agent := ldMgr.getLogicalDeviceAgent(ctx, flow.Id)
 	if agent == nil {
 		return nil, status.Errorf(codes.NotFound, "%s", flow.Id)
@@ -454,7 +462,8 @@
 
 // UpdateLogicalDeviceMeterTable - This function sends meter mod request to logical device manager and waits for response
 func (ldMgr *LogicalManager) UpdateLogicalDeviceMeterTable(ctx context.Context, meter *openflow_13.MeterModUpdate) (*empty.Empty, error) {
-	logger.Debugw(ctx, "UpdateLogicalDeviceMeterTable", log.Fields{"logical-device-id": meter.Id})
+	ctx = utils.WithRPCMetadataContext(ctx, "UpdateLogicalDeviceMeterTable")
+	logger.Debugw(ctx, "update-logical-device-meter-table", log.Fields{"logical-device-id": meter.Id})
 	agent := ldMgr.getLogicalDeviceAgent(ctx, meter.Id)
 	if agent == nil {
 		return nil, status.Errorf(codes.NotFound, "%s", meter.Id)
@@ -464,7 +473,8 @@
 
 // ListLogicalDeviceMeters returns logical device meters
 func (ldMgr *LogicalManager) ListLogicalDeviceMeters(ctx context.Context, id *voltha.ID) (*openflow_13.Meters, error) {
-	logger.Debugw(ctx, "ListLogicalDeviceMeters", log.Fields{"logical-device-id": id.Id})
+	ctx = utils.WithRPCMetadataContext(ctx, "ListLogicalDeviceMeters")
+	logger.Debugw(ctx, "list-logical-device-meters", log.Fields{"logical-device-id": id.Id})
 	agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id)
 	if agent == nil {
 		return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -480,7 +490,8 @@
 
 // UpdateLogicalDeviceFlowGroupTable updates logical device flow group table
 func (ldMgr *LogicalManager) UpdateLogicalDeviceFlowGroupTable(ctx context.Context, flow *openflow_13.FlowGroupTableUpdate) (*empty.Empty, error) {
-	logger.Debugw(ctx, "UpdateGroupTable", log.Fields{"logical-device-id": flow.Id})
+	ctx = utils.WithRPCMetadataContext(ctx, "UpdateLogicalDeviceFlowGroupTable")
+	logger.Debugw(ctx, "update-group-table", log.Fields{"logical-device-id": flow.Id})
 	agent := ldMgr.getLogicalDeviceAgent(ctx, flow.Id)
 	if agent == nil {
 		return nil, status.Errorf(codes.NotFound, "%s", flow.Id)
@@ -490,7 +501,8 @@
 
 // EnableLogicalDevicePort enables logical device port
 func (ldMgr *LogicalManager) EnableLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*empty.Empty, error) {
-	logger.Debugw(ctx, "EnableLogicalDevicePort", log.Fields{"logical-device-id": id})
+	ctx = utils.WithRPCMetadataContext(ctx, "EnableLogicalDevicePort")
+	logger.Debugw(ctx, "enable-logical-device-port", log.Fields{"logical-device-id": id})
 	agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id)
 	if agent == nil {
 		return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -504,7 +516,8 @@
 
 // DisableLogicalDevicePort disables logical device port
 func (ldMgr *LogicalManager) DisableLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*empty.Empty, error) {
-	logger.Debugw(ctx, "DisableLogicalDevicePort", log.Fields{"logical-device-id": id})
+	ctx = utils.WithRPCMetadataContext(ctx, "DisableLogicalDevicePort")
+	logger.Debugw(ctx, "disable-logical-device-port", log.Fields{"logical-device-id": id})
 	agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id)
 	if agent == nil {
 		return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -517,7 +530,7 @@
 }
 
 func (ldMgr *LogicalManager) packetIn(ctx context.Context, logicalDeviceID string, port uint32, transactionID string, packet []byte) error {
-	logger.Debugw(ctx, "packetIn", log.Fields{"logical-device-id": logicalDeviceID, "port": port})
+	logger.Debugw(ctx, "packet-in", log.Fields{"logical-device-id": logicalDeviceID, "port": port})
 	if agent := ldMgr.getLogicalDeviceAgent(ctx, logicalDeviceID); agent != nil {
 		agent.packetIn(ctx, port, transactionID, packet)
 	} else {
@@ -529,35 +542,46 @@
 // StreamPacketsOut sends packets to adapter
 func (ldMgr *LogicalManager) StreamPacketsOut(packets voltha.VolthaService_StreamPacketsOutServer) error {
 	ctx := context.Background()
-	logger.Debugw(ctx, "StreamPacketsOut-request", log.Fields{"packets": packets})
+	logger.Debugw(ctx, "stream-packets-out-request", log.Fields{"packets": packets})
+
 loop:
 	for {
 		select {
 		case <-packets.Context().Done():
-			logger.Infow(ctx, "StreamPacketsOut-context-done", log.Fields{"packets": packets, "error": packets.Context().Err()})
+			logger.Infow(ctx, "stream-packets-out-context-done", log.Fields{"packets": packets, "error": packets.Context().Err()})
 			break loop
 		default:
 		}
 
 		packet, err := packets.Recv()
 
+		pktCtx := utils.WithRPCMetadataContext(packets.Context(), "StreamPacketsOut")
+
 		if err == io.EOF {
-			logger.Debugw(ctx, "Received-EOF", log.Fields{"packets": packets})
+			logger.Debugw(ctx, "received-eof", log.Fields{"packets": packets})
 			break loop
 		}
-
 		if err != nil {
-			logger.Errorw(ctx, "Failed to receive packet out", log.Fields{"error": err})
+			logger.Errorw(ctx, "failed-to-receive-packet-out", log.Fields{"error": err})
+			go ldMgr.SendRPCEvent(pktCtx, packet.GetId(), err.Error(), nil, // packet may be nil on error; use the nil-tolerant getter instead of packet.Id
+				"RPC_ERROR_RAISE_EVENT", voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
 			continue
 		}
 
-		if agent := ldMgr.getLogicalDeviceAgent(packets.Context(), packet.Id); agent != nil {
-			agent.packetOut(packets.Context(), packet.PacketOut)
+		if agent := ldMgr.getLogicalDeviceAgent(pktCtx, packet.Id); agent != nil {
+			agent.packetOut(pktCtx, packet.PacketOut)
 		} else {
-			logger.Errorf(ctx, "No logical device agent present", log.Fields{"logical-device-id": packet.Id})
+			logger.Errorw(ctx, "no-logical-device-agent-present", log.Fields{"logical-device-id": packet.Id})
 		}
 	}
 
-	logger.Debugw(ctx, "StreamPacketsOut-request-done", log.Fields{"packets": packets})
+	logger.Debugw(ctx, "stream-packets-out-request-done", log.Fields{"packets": packets})
 	return nil
 }
+
+// SendRPCEvent forwards the given RPC event details to the embedded event manager's RPCEventManager.GetAndSendRPCEvent.
+func (ldMgr *LogicalManager) SendRPCEvent(ctx context.Context, resourceID, desc string, eventCtx map[string]string,
+	id string, category voltha.EventCategory_Types, subCategory *voltha.EventSubCategory_Types, raisedTs int64) {
+	ldMgr.Manager.RPCEventManager.GetAndSendRPCEvent(ctx, resourceID, desc, eventCtx, id,
+		category, subCategory, raisedTs)
+}
diff --git a/rw_core/core/device/manager.go b/rw_core/core/device/manager.go
index f92f2e5..cbc6bb9 100755
--- a/rw_core/core/device/manager.go
+++ b/rw_core/core/device/manager.go
@@ -29,6 +29,7 @@
 	"github.com/opencord/voltha-go/rw_core/core/device/remote"
 	"github.com/opencord/voltha-go/rw_core/core/device/state"
 	"github.com/opencord/voltha-go/rw_core/utils"
+	"github.com/opencord/voltha-lib-go/v4/pkg/events"
 	"github.com/opencord/voltha-lib-go/v4/pkg/kafka"
 	"github.com/opencord/voltha-lib-go/v4/pkg/log"
 	"github.com/opencord/voltha-protos/v4/go/common"
@@ -42,10 +43,11 @@
 
 // Manager represent device manager attributes
 type Manager struct {
-	deviceAgents            sync.Map
-	rootDevices             map[string]bool
-	lockRootDeviceMap       sync.RWMutex
-	adapterProxy            *remote.AdapterProxy
+	deviceAgents      sync.Map
+	rootDevices       map[string]bool
+	lockRootDeviceMap sync.RWMutex
+	adapterProxy      *remote.AdapterProxy
+	*event.RPCEventManager
 	adapterMgr              *adapter.Manager
 	logicalDeviceMgr        *LogicalManager
 	kafkaICProxy            kafka.InterContainerProxy
@@ -59,7 +61,7 @@
 }
 
 //NewManagers creates the Manager and the Logical Manager.
-func NewManagers(dbPath *model.Path, adapterMgr *adapter.Manager, kmp kafka.InterContainerProxy, endpointMgr kafka.EndpointManager, coreTopic, coreInstanceID string, defaultCoreTimeout time.Duration) (*Manager, *LogicalManager) {
+func NewManagers(dbPath *model.Path, adapterMgr *adapter.Manager, kmp kafka.InterContainerProxy, endpointMgr kafka.EndpointManager, coreTopic, coreInstanceID string, defaultCoreTimeout time.Duration, eventProxy *events.EventProxy) (*Manager, *LogicalManager) {
 	deviceMgr := &Manager{
 		rootDevices:             make(map[string]bool),
 		kafkaICProxy:            kmp,
@@ -69,12 +71,13 @@
 		dProxy:                  dbPath.Proxy("devices"),
 		adapterMgr:              adapterMgr,
 		defaultTimeout:          defaultCoreTimeout,
+		RPCEventManager:         event.NewRPCEventManager(eventProxy, coreInstanceID),
 		deviceLoadingInProgress: make(map[string][]chan int),
 	}
 	deviceMgr.stateTransitions = state.NewTransitionMap(deviceMgr)
 
 	logicalDeviceMgr := &LogicalManager{
-		Manager:                        event.NewManager(),
+		Manager:                        event.NewManager(eventProxy, coreInstanceID),
 		deviceMgr:                      deviceMgr,
 		kafkaICProxy:                   kmp,
 		dbPath:                         dbPath,
@@ -141,21 +144,22 @@
 // CreateDevice creates a new parent device in the data model
 func (dMgr *Manager) CreateDevice(ctx context.Context, device *voltha.Device) (*voltha.Device, error) {
 	if device.MacAddress == "" && device.GetHostAndPort() == "" {
-		logger.Errorf(ctx, "No Device Info Present")
+		logger.Errorf(ctx, "no-device-info-present")
 		return &voltha.Device{}, errors.New("no-device-info-present; MAC or HOSTIP&PORT")
 	}
+	ctx = utils.WithRPCMetadataContext(ctx, "CreateDevice")
 	logger.Debugw(ctx, "create-device", log.Fields{"device": *device})
 
 	deviceExist, err := dMgr.isParentDeviceExist(ctx, device)
 	if err != nil {
-		logger.Errorf(ctx, "Failed to fetch parent device info")
+		logger.Errorf(ctx, "failed-to-fetch-parent-device-info")
 		return nil, err
 	}
 	if deviceExist {
-		logger.Errorf(ctx, "Device is Pre-provisioned already with same IP-Port or MAC Address")
+		logger.Errorf(ctx, "device-is-pre-provisioned-already-with-same-ip-port-or-mac-address")
 		return nil, errors.New("device is already pre-provisioned")
 	}
-	logger.Debugw(ctx, "CreateDevice", log.Fields{"device": device, "aproxy": dMgr.adapterProxy})
+	logger.Debugw(ctx, "create-device", log.Fields{"device": device, "aproxy": dMgr.adapterProxy})
 
 	// Ensure this device is set as root
 	device.Root = true
@@ -163,7 +167,7 @@
 	agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.dbPath, dMgr.dProxy, dMgr.defaultTimeout)
 	device, err = agent.start(ctx, device)
 	if err != nil {
-		logger.Errorw(ctx, "Fail-to-start-device", log.Fields{"device-id": agent.deviceID, "error": err})
+		logger.Errorw(ctx, "fail-to-start-device", log.Fields{"device-id": agent.deviceID, "error": err})
 		return nil, err
 	}
 	dMgr.addDeviceAgentToMap(agent)
@@ -172,9 +176,9 @@
 
 // EnableDevice activates a device by invoking the adopt_device API on the appropriate adapter
 func (dMgr *Manager) EnableDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
+	ctx = utils.WithRPCMetadataContext(ctx, "EnableDevice")
 	log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
-
-	logger.Debugw(ctx, "EnableDevice", log.Fields{"device-id": id.Id})
+	logger.Debugw(ctx, "enable-device", log.Fields{"device-id": id.Id})
 	agent := dMgr.getDeviceAgent(ctx, id.Id)
 	if agent == nil {
 		return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -184,9 +188,9 @@
 
 // DisableDevice disables a device along with any child device it may have
 func (dMgr *Manager) DisableDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
+	ctx = utils.WithRPCMetadataContext(ctx, "DisableDevice")
 	log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
-
-	logger.Debugw(ctx, "DisableDevice", log.Fields{"device-id": id.Id})
+	logger.Debugw(ctx, "disable-device", log.Fields{"device-id": id.Id})
 	agent := dMgr.getDeviceAgent(ctx, id.Id)
 	if agent == nil {
 		return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -196,9 +200,9 @@
 
 //RebootDevice invoked the reboot API to the corresponding adapter
 func (dMgr *Manager) RebootDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
+	ctx = utils.WithRPCMetadataContext(ctx, "RebootDevice")
 	log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
-
-	logger.Debugw(ctx, "RebootDevice", log.Fields{"device-id": id.Id})
+	logger.Debugw(ctx, "reboot-device", log.Fields{"device-id": id.Id})
 	agent := dMgr.getDeviceAgent(ctx, id.Id)
 	if agent == nil {
 		return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -208,8 +212,9 @@
 
 // DeleteDevice removes a device from the data model
 func (dMgr *Manager) DeleteDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
+	ctx = utils.WithRPCMetadataContext(ctx, "DeleteDevice")
 	log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
-	logger.Debugw(ctx, "DeleteDevice", log.Fields{"device-id": id.Id})
+	logger.Debugw(ctx, "delete-device", log.Fields{"device-id": id.Id})
 	agent := dMgr.getDeviceAgent(ctx, id.Id)
 	if agent == nil {
 		return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -219,8 +224,9 @@
 
 // ForceDeleteDevice removes a device from the data model forcefully without successfully waiting for the adapters.
 func (dMgr *Manager) ForceDeleteDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
+	ctx = utils.WithRPCMetadataContext(ctx, "ForceDeleteDevice")
 	log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
-	logger.Debugw(ctx, "ForceDeleteDevice", log.Fields{"device-id": id.Id})
+	logger.Debugw(ctx, "force-delete-device", log.Fields{"device-id": id.Id})
 	agent := dMgr.getDeviceAgent(ctx, id.Id)
 	if agent == nil {
 		return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -230,7 +236,7 @@
 
 // GetDevicePort returns the port details for a specific device port entry
 func (dMgr *Manager) GetDevicePort(ctx context.Context, deviceID string, portID uint32) (*voltha.Port, error) {
-	logger.Debugw(ctx, "ListDevicePorts", log.Fields{"device-id": deviceID})
+	logger.Debugw(ctx, "get-device-port", log.Fields{"device-id": deviceID})
 	agent := dMgr.getDeviceAgent(ctx, deviceID)
 	if agent == nil {
 		return nil, status.Errorf(codes.NotFound, "device-%s", deviceID)
@@ -240,9 +246,9 @@
 
 // ListDevicePorts returns the ports details for a specific device entry
 func (dMgr *Manager) ListDevicePorts(ctx context.Context, id *voltha.ID) (*voltha.Ports, error) {
+	ctx = utils.WithRPCMetadataContext(ctx, "ListDevicePorts")
 	log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
-
-	logger.Debugw(ctx, "ListDevicePorts", log.Fields{"device-id": id.Id})
+	logger.Debugw(ctx, "list-device-ports", log.Fields{"device-id": id.Id})
 	agent := dMgr.getDeviceAgent(ctx, id.Id)
 	if agent == nil {
 		return nil, status.Errorf(codes.NotFound, "device-%s", id.Id)
@@ -259,9 +265,9 @@
 
 // ListDeviceFlows returns the flow details for a specific device entry
 func (dMgr *Manager) ListDeviceFlows(ctx context.Context, id *voltha.ID) (*ofp.Flows, error) {
+	ctx = utils.WithRPCMetadataContext(ctx, "ListDeviceFlows")
 	log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
-
-	logger.Debugw(ctx, "ListDeviceFlows", log.Fields{"device-id": id.Id})
+	logger.Debugw(ctx, "list-device-flows", log.Fields{"device-id": id.Id})
 	agent := dMgr.getDeviceAgent(ctx, id.Id)
 	if agent == nil {
 		return nil, status.Errorf(codes.NotFound, "device-%s", id.Id)
@@ -278,9 +284,9 @@
 
 // ListDeviceFlowGroups returns the flow group details for a specific device entry
 func (dMgr *Manager) ListDeviceFlowGroups(ctx context.Context, id *voltha.ID) (*voltha.FlowGroups, error) {
+	ctx = utils.WithRPCMetadataContext(ctx, "ListDeviceFlowGroups")
 	log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
-
-	logger.Debugw(ctx, "ListDeviceFlowGroups", log.Fields{"device-id": id.Id})
+	logger.Debugw(ctx, "list-device-flow-groups", log.Fields{"device-id": id.Id})
 	agent := dMgr.getDeviceAgent(ctx, id.Id)
 	if agent == nil {
 		return nil, status.Errorf(codes.NotFound, "device-%s", id.Id)
@@ -298,7 +304,7 @@
 // This function is called only in the Core that does not own this device.  In the Core that owns this device then a
 // deletion deletion also includes removal of any reference of this device.
 func (dMgr *Manager) stopManagingDevice(ctx context.Context, id string) {
-	logger.Infow(ctx, "stopManagingDevice", log.Fields{"device-id": id})
+	logger.Infow(ctx, "stop-managing-device", log.Fields{"device-id": id})
 	if dMgr.IsDeviceInCache(id) { // Proceed only if an agent is present for this device
 		if device, err := dMgr.getDeviceReadOnly(ctx, id); err == nil && device.Root {
 			// stop managing the logical device
@@ -315,7 +321,7 @@
 
 // RunPostDeviceDelete removes any reference of this device
 func (dMgr *Manager) RunPostDeviceDelete(ctx context.Context, cDevice *voltha.Device) error {
-	logger.Infow(ctx, "RunPostDeviceDelete", log.Fields{"device-id": cDevice.Id})
+	logger.Infow(ctx, "run-post-device-delete", log.Fields{"device-id": cDevice.Id})
 	dMgr.stopManagingDevice(ctx, cDevice.Id)
 	return nil
 }
@@ -323,13 +329,14 @@
 // GetDevice exists primarily to implement the gRPC interface.
 // Internal functions should call getDeviceReadOnly instead.
 func (dMgr *Manager) GetDevice(ctx context.Context, id *voltha.ID) (*voltha.Device, error) {
+	ctx = utils.WithRPCMetadataContext(ctx, "GetDevice")
 	log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
 	return dMgr.getDeviceReadOnly(ctx, id.Id)
 }
 
 // getDeviceReadOnly will returns a device, either from memory or from the dB, if present
 func (dMgr *Manager) getDeviceReadOnly(ctx context.Context, id string) (*voltha.Device, error) {
-	logger.Debugw(ctx, "getDeviceReadOnly", log.Fields{"device-id": id})
+	logger.Debugw(ctx, "get-device-read-only", log.Fields{"device-id": id})
 	if agent := dMgr.getDeviceAgent(ctx, id); agent != nil {
 		return agent.getDeviceReadOnly(ctx)
 	}
@@ -337,7 +344,7 @@
 }
 
 func (dMgr *Manager) listDevicePorts(ctx context.Context, id string) (map[uint32]*voltha.Port, error) {
-	logger.Debugw(ctx, "listDevicePorts", log.Fields{"device-id": id})
+	logger.Debugw(ctx, "list-device-ports", log.Fields{"device-id": id})
 	agent := dMgr.getDeviceAgent(ctx, id)
 	if agent == nil {
 		return nil, status.Errorf(codes.NotFound, "%s", id)
@@ -347,8 +354,8 @@
 
 // GetChildDevice will return a device, either from memory or from the dB, if present
 func (dMgr *Manager) GetChildDevice(ctx context.Context, parentDeviceID string, serialNumber string, onuID int64, parentPortNo int64) (*voltha.Device, error) {
-	logger.Debugw(ctx, "GetChildDevice", log.Fields{"parent-device-id": parentDeviceID, "serialNumber": serialNumber,
-		"parentPortNo": parentPortNo, "onuId": onuID})
+	logger.Debugw(ctx, "get-child-device", log.Fields{"parent-device-id": parentDeviceID, "serial-number": serialNumber,
+		"parent-port-no": parentPortNo, "onu-id": onuID})
 
 	parentDevicePorts, err := dMgr.listDevicePorts(ctx, parentDeviceID)
 	if err != nil {
@@ -356,7 +363,7 @@
 	}
 	childDeviceIds := dMgr.getAllChildDeviceIds(ctx, parentDevicePorts)
 	if len(childDeviceIds) == 0 {
-		logger.Debugw(ctx, "no-child-devices", log.Fields{"parent-device-id": parentDeviceID, "serialNumber": serialNumber, "onuId": onuID})
+		logger.Debugw(ctx, "no-child-devices", log.Fields{"parent-device-id": parentDeviceID, "serial-number": serialNumber, "onu-id": onuID})
 		return nil, status.Errorf(codes.NotFound, "%s", parentDeviceID)
 	}
 
@@ -368,14 +375,14 @@
 			foundOnuID := false
 			if searchDevice.ProxyAddress.OnuId == uint32(onuID) {
 				if searchDevice.ParentPortNo == uint32(parentPortNo) {
-					logger.Debugw(ctx, "found-child-by-onuid", log.Fields{"parent-device-id": parentDeviceID, "onuId": onuID})
+					logger.Debugw(ctx, "found-child-by-onuid", log.Fields{"parent-device-id": parentDeviceID, "onu-id": onuID})
 					foundOnuID = true
 				}
 			}
 
 			foundSerialNumber := false
 			if searchDevice.SerialNumber == serialNumber {
-				logger.Debugw(ctx, "found-child-by-serialnumber", log.Fields{"parent-device-id": parentDeviceID, "serialNumber": serialNumber})
+				logger.Debugw(ctx, "found-child-by-serial-number", log.Fields{"parent-device-id": parentDeviceID, "serial-number": serialNumber})
 				foundSerialNumber = true
 			}
 
@@ -395,18 +402,18 @@
 	}
 
 	if foundChildDevice != nil {
-		logger.Debugw(ctx, "child-device-found", log.Fields{"parent-device-id": parentDeviceID, "foundChildDevice": foundChildDevice})
+		logger.Debugw(ctx, "child-device-found", log.Fields{"parent-device-id": parentDeviceID, "found-child-device": foundChildDevice})
 		return foundChildDevice, nil
 	}
 
 	logger.Debugw(ctx, "child-device-not-found", log.Fields{"parent-device-id": parentDeviceID,
-		"serialNumber": serialNumber, "onuId": onuID, "parentPortNo": parentPortNo})
+		"serial-number": serialNumber, "onu-id": onuID, "parent-port-no": parentPortNo})
 	return nil, status.Errorf(codes.NotFound, "%s", parentDeviceID)
 }
 
 // GetChildDeviceWithProxyAddress will return a device based on proxy address
 func (dMgr *Manager) GetChildDeviceWithProxyAddress(ctx context.Context, proxyAddress *voltha.Device_ProxyAddress) (*voltha.Device, error) {
-	logger.Debugw(ctx, "GetChildDeviceWithProxyAddress", log.Fields{"proxyAddress": proxyAddress})
+	logger.Debugw(ctx, "get-child-device-with-proxy-address", log.Fields{"proxy-address": proxyAddress})
 
 	parentDevicePorts, err := dMgr.listDevicePorts(ctx, proxyAddress.DeviceId)
 	if err != nil {
@@ -429,11 +436,11 @@
 	}
 
 	if foundChildDevice != nil {
-		logger.Debugw(ctx, "child-device-found", log.Fields{"proxyAddress": proxyAddress})
+		logger.Debugw(ctx, "child-device-found", log.Fields{"proxy-address": proxyAddress})
 		return foundChildDevice, nil
 	}
 
-	logger.Warnw(ctx, "child-device-not-found", log.Fields{"proxyAddress": proxyAddress})
+	logger.Warnw(ctx, "child-device-not-found", log.Fields{"proxy-address": proxyAddress})
 	return nil, status.Errorf(codes.NotFound, "%s", proxyAddress)
 }
 
@@ -445,7 +452,8 @@
 
 // ListDevices retrieves the latest devices from the data model
 func (dMgr *Manager) ListDevices(ctx context.Context, _ *empty.Empty) (*voltha.Devices, error) {
-	logger.Debug(ctx, "ListDevices")
+	ctx = utils.WithRPCMetadataContext(ctx, "ListDevices")
+	logger.Debug(ctx, "list-devices")
 	result := &voltha.Devices{}
 
 	var devices []*voltha.Device
@@ -467,7 +475,7 @@
 		}
 		result.Items = append(result.Items, device)
 	}
-	logger.Debugw(ctx, "ListDevices-end", log.Fields{"len": len(result.Items)})
+	logger.Debugw(ctx, "list-devices-end", log.Fields{"len": len(result.Items)})
 	return result, nil
 }
 
@@ -476,7 +484,7 @@
 	hostPort := newDevice.GetHostAndPort()
 	var devices []*voltha.Device
 	if err := dMgr.dProxy.List(ctx, &devices); err != nil {
-		logger.Errorw(ctx, "Failed to list devices from cluster data proxy", log.Fields{"error": err})
+		logger.Errorw(ctx, "failed-to-list-devices-from-cluster-data-proxy", log.Fields{"error": err})
 		return false, err
 	}
 	for _, device := range devices {
@@ -524,12 +532,12 @@
 				logger.Debugw(ctx, "loading-device", log.Fields{"device-id": deviceID})
 				agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.dbPath, dMgr.dProxy, dMgr.defaultTimeout)
 				if _, err = agent.start(ctx, nil); err != nil {
-					logger.Warnw(ctx, "Failure loading device", log.Fields{"device-id": deviceID, "error": err})
+					logger.Warnw(ctx, "failure-loading-device", log.Fields{"device-id": deviceID, "error": err})
 				} else {
 					dMgr.addDeviceAgentToMap(agent)
 				}
 			} else {
-				logger.Debugw(ctx, "Device not in model", log.Fields{"device-id": deviceID})
+				logger.Debugw(ctx, "device-is-not-in-model", log.Fields{"device-id": deviceID})
 			}
 			// announce completion of task to any number of waiting channels
 			dMgr.devicesLoadingLock.Lock()
@@ -577,7 +585,7 @@
 				return err
 			}
 		}
-		logger.Debugw(ctx, "loaded-children", log.Fields{"device-id": device.Id, "numChildren": len(childDeviceIds)})
+		logger.Debugw(ctx, "loaded-children", log.Fields{"device-id": device.Id, "num-children": len(childDeviceIds)})
 	}
 	return nil
 }
@@ -626,7 +634,8 @@
 
 // ListDeviceIds retrieves the latest device IDs information from the data model (memory data only)
 func (dMgr *Manager) ListDeviceIds(ctx context.Context, _ *empty.Empty) (*voltha.IDs, error) {
-	logger.Debug(ctx, "ListDeviceIDs")
+	ctx = utils.WithRPCMetadataContext(ctx, "ListDeviceIds")
+	logger.Debug(ctx, "list-device-ids")
 	// Report only device IDs that are in the device agent map
 	return dMgr.listDeviceIdsFromMap(), nil
 }
@@ -634,7 +643,8 @@
 // ReconcileDevices is a request to a voltha core to update its list of managed devices.  This will
 // trigger loading the devices along with their children and parent in memory
 func (dMgr *Manager) ReconcileDevices(ctx context.Context, ids *voltha.IDs) (*empty.Empty, error) {
-	logger.Debugw(ctx, "ReconcileDevices", log.Fields{"numDevices": len(ids.Items)})
+	ctx = utils.WithRPCMetadataContext(ctx, "ReconcileDevices")
+	logger.Debugw(ctx, "reconcile-devices", log.Fields{"num-devices": len(ids.Items)})
 	if ids != nil && len(ids.Items) != 0 {
 		toReconcile := len(ids.Items)
 		reconciled := 0
@@ -668,12 +678,12 @@
 
 // adapterRestarted is invoked whenever an adapter is restarted
 func (dMgr *Manager) adapterRestarted(ctx context.Context, adapter *voltha.Adapter) error {
-	logger.Debugw(ctx, "adapter-restarted", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
-		"currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint})
+	logger.Debugw(ctx, "adapter-restarted", log.Fields{"adapter-id": adapter.Id, "vendor": adapter.Vendor,
+		"current-replica": adapter.CurrentReplica, "total-replicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint})
 
 	// Let's reconcile the device managed by this Core only
 	if len(dMgr.rootDevices) == 0 {
-		logger.Debugw(ctx, "nothing-to-reconcile", log.Fields{"adapterId": adapter.Id})
+		logger.Debugw(ctx, "nothing-to-reconcile", log.Fields{"adapter-id": adapter.Id})
 		return nil
 	}
 
@@ -683,7 +693,7 @@
 		if dAgent == nil {
 			continue
 		}
-		logger.Debugw(ctx, "checking-adapter-type", log.Fields{"agentType": dAgent.deviceType, "adapterType": adapter.Type})
+		logger.Debugw(ctx, "checking-adapter-type", log.Fields{"agent-type": dAgent.deviceType, "adapter-type": adapter.Type})
 		if dAgent.deviceType == adapter.Type {
 			rootDevice, _ := dAgent.getDeviceReadOnly(ctx)
 			if rootDevice == nil {
@@ -691,7 +701,7 @@
 			}
 			isDeviceOwnedByService, err := dMgr.adapterProxy.IsDeviceOwnedByService(ctx, rootDeviceID, adapter.Type, adapter.CurrentReplica)
 			if err != nil {
-				logger.Warnw(ctx, "is-device-owned-by-service", log.Fields{"error": err, "root-device-id": rootDeviceID, "adapterType": adapter.Type, "replica-number": adapter.CurrentReplica})
+				logger.Warnw(ctx, "is-device-owned-by-service", log.Fields{"error": err, "root-device-id": rootDeviceID, "adapter-type": adapter.Type, "replica-number": adapter.CurrentReplica})
 				continue
 			}
 			if isDeviceOwnedByService {
@@ -709,7 +719,7 @@
 						if childDevice, _ := dMgr.getDeviceFromModel(ctx, peer.DeviceId); childDevice != nil {
 							isDeviceOwnedByService, err := dMgr.adapterProxy.IsDeviceOwnedByService(ctx, childDevice.Id, adapter.Type, adapter.CurrentReplica)
 							if err != nil {
-								logger.Warnw(ctx, "is-device-owned-by-service", log.Fields{"error": err, "child-device-id": childDevice.Id, "adapterType": adapter.Type, "replica-number": adapter.CurrentReplica})
+								logger.Warnw(ctx, "is-device-owned-by-service", log.Fields{"error": err, "child-device-id": childDevice.Id, "adapter-type": adapter.Type, "replica-number": adapter.CurrentReplica})
 							}
 							if isDeviceOwnedByService {
 								if dMgr.isOkToReconcile(ctx, childDevice) {
@@ -735,7 +745,7 @@
 			return status.Errorf(codes.Aborted, "errors-%s", res)
 		}
 	} else {
-		logger.Debugw(ctx, "no-managed-device-to-reconcile", log.Fields{"adapterId": adapter.Id})
+		logger.Debugw(ctx, "no-managed-device-to-reconcile", log.Fields{"adapter-id": adapter.Id})
 	}
 	return nil
 }
@@ -782,7 +792,7 @@
 }
 
 func (dMgr *Manager) UpdateDeviceUsingAdapterData(ctx context.Context, device *voltha.Device) error {
-	logger.Debugw(ctx, "UpdateDeviceUsingAdapterData", log.Fields{"device-id": device.Id, "device": device})
+	logger.Debugw(ctx, "update-device-using-adapter-data", log.Fields{"device-id": device.Id, "device": device})
 	if agent := dMgr.getDeviceAgent(ctx, device.Id); agent != nil {
 		return agent.updateDeviceUsingAdapterData(ctx, device)
 	}
@@ -809,7 +819,9 @@
 	if err != nil {
 		return err
 	}
-	if err = dMgr.logicalDeviceMgr.updateLogicalPort(log.WithSpanFromContext(context.Background(), ctx), device, ports, port); err != nil {
+	subCtx := utils.WithSpanAndRPCMetadataFromContext(ctx)
+
+	if err = dMgr.logicalDeviceMgr.updateLogicalPort(subCtx, device, ports, port); err != nil {
 		return err
 	}
 	return nil
@@ -823,7 +835,8 @@
 		}
 		//	Setup peer ports in its own routine
 		go func() {
-			if err := dMgr.addPeerPort(log.WithSpanFromContext(context.Background(), ctx), deviceID, port); err != nil {
+			subCtx := utils.WithSpanAndRPCMetadataFromContext(ctx)
+			if err := dMgr.addPeerPort(subCtx, deviceID, port); err != nil {
 				logger.Errorw(ctx, "unable-to-add-peer-port", log.Fields{"error": err, "device-id": deviceID})
 			}
 		}()
@@ -833,7 +846,7 @@
 }
 
 func (dMgr *Manager) addFlowsAndGroups(ctx context.Context, deviceID string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
-	logger.Debugw(ctx, "addFlowsAndGroups", log.Fields{"device-id": deviceID, "groups:": groups, "flowMetadata": flowMetadata})
+	logger.Debugw(ctx, "add-flows-and-groups", log.Fields{"device-id": deviceID, "groups": groups, "flow-metadata": flowMetadata})
 	if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
 		return agent.addFlowsAndGroups(ctx, flows, groups, flowMetadata)
 	}
@@ -842,7 +855,7 @@
 
 // deleteParentFlows removes flows from the parent device based on  specific attributes
 func (dMgr *Manager) deleteParentFlows(ctx context.Context, deviceID string, uniPort uint32, metadata *voltha.FlowMetadata) error {
-	logger.Debugw(ctx, "deleteParentFlows", log.Fields{"device-id": deviceID, "uni-port": uniPort, "metadata": metadata})
+	logger.Debugw(ctx, "delete-parent-flows", log.Fields{"device-id": deviceID, "uni-port": uniPort, "metadata": metadata})
 	if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
 		if !agent.isRootDevice {
 			return status.Errorf(codes.FailedPrecondition, "not-a-parent-device-%s", deviceID)
@@ -853,7 +866,7 @@
 }
 
 func (dMgr *Manager) deleteFlowsAndGroups(ctx context.Context, deviceID string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
-	logger.Debugw(ctx, "deleteFlowsAndGroups", log.Fields{"device-id": deviceID})
+	logger.Debugw(ctx, "delete-flows-and-groups", log.Fields{"device-id": deviceID})
 	if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
 		return agent.deleteFlowsAndGroups(ctx, flows, groups, flowMetadata)
 	}
@@ -861,7 +874,7 @@
 }
 
 func (dMgr *Manager) updateFlowsAndGroups(ctx context.Context, deviceID string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
-	logger.Debugw(ctx, "updateFlowsAndGroups", log.Fields{"device-id": deviceID})
+	logger.Debugw(ctx, "update-flows-and-groups", log.Fields{"device-id": deviceID})
 	if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
 		return agent.updateFlowsAndGroups(ctx, flows, groups, flowMetadata)
 	}
@@ -871,8 +884,8 @@
 // UpdateDevicePmConfigs updates the PM configs.  This is executed when the northbound gRPC API is invoked, typically
 // following a user action
 func (dMgr *Manager) UpdateDevicePmConfigs(ctx context.Context, configs *voltha.PmConfigs) (*empty.Empty, error) {
+	ctx = utils.WithRPCMetadataContext(ctx, "UpdateDevicePmConfigs")
 	log.EnrichSpan(ctx, log.Fields{"device-id": configs.Id})
-
 	if configs.Id == "" {
 		return nil, status.Error(codes.FailedPrecondition, "invalid-device-Id")
 	}
@@ -896,8 +909,8 @@
 
 // ListDevicePmConfigs returns pm configs of device
 func (dMgr *Manager) ListDevicePmConfigs(ctx context.Context, id *voltha.ID) (*voltha.PmConfigs, error) {
+	ctx = utils.WithRPCMetadataContext(ctx, "ListDevicePmConfigs")
 	log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
-
 	agent := dMgr.getDeviceAgent(ctx, id.Id)
 	if agent == nil {
 		return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -906,7 +919,7 @@
 }
 
 func (dMgr *Manager) getSwitchCapability(ctx context.Context, deviceID string) (*ic.SwitchCapability, error) {
-	logger.Debugw(ctx, "getSwitchCapability", log.Fields{"device-id": deviceID})
+	logger.Debugw(ctx, "get-switch-capability", log.Fields{"device-id": deviceID})
 	if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
 		return agent.getSwitchCapability(ctx)
 	}
@@ -914,7 +927,7 @@
 }
 
 func (dMgr *Manager) GetPorts(ctx context.Context, deviceID string, portType voltha.Port_PortType) (*voltha.Ports, error) {
-	logger.Debugw(ctx, "GetPorts", log.Fields{"device-id": deviceID, "portType": portType})
+	logger.Debugw(ctx, "get-ports", log.Fields{"device-id": deviceID, "port-type": portType})
 	agent := dMgr.getDeviceAgent(ctx, deviceID)
 	if agent == nil {
 		return nil, status.Errorf(codes.NotFound, "%s", deviceID)
@@ -923,7 +936,7 @@
 }
 
 func (dMgr *Manager) UpdateDeviceStatus(ctx context.Context, deviceID string, operStatus voltha.OperStatus_Types, connStatus voltha.ConnectStatus_Types) error {
-	logger.Debugw(ctx, "UpdateDeviceStatus", log.Fields{"device-id": deviceID, "operStatus": operStatus, "connStatus": connStatus})
+	logger.Debugw(ctx, "update-device-status", log.Fields{"device-id": deviceID, "oper-status": operStatus, "conn-status": connStatus})
 	if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
 		return agent.updateDeviceStatus(ctx, operStatus, connStatus)
 	}
@@ -931,7 +944,7 @@
 }
 
 func (dMgr *Manager) UpdateChildrenStatus(ctx context.Context, deviceID string, operStatus voltha.OperStatus_Types, connStatus voltha.ConnectStatus_Types) error {
-	logger.Debugw(ctx, "UpdateChildrenStatus", log.Fields{"parent-device-id": deviceID, "operStatus": operStatus, "connStatus": connStatus})
+	logger.Debugw(ctx, "update-children-status", log.Fields{"parent-device-id": deviceID, "oper-status": operStatus, "conn-status": connStatus})
 	parentDevicePorts, err := dMgr.listDevicePorts(ctx, deviceID)
 	if err != nil {
 		return status.Errorf(codes.Aborted, "%s", err.Error())
@@ -947,17 +960,18 @@
 }
 
 func (dMgr *Manager) UpdatePortState(ctx context.Context, deviceID string, portType voltha.Port_PortType, portNo uint32, operStatus voltha.OperStatus_Types) error {
-	logger.Debugw(ctx, "UpdatePortState", log.Fields{"device-id": deviceID, "portType": portType, "portNo": portNo, "operStatus": operStatus})
+	logger.Debugw(ctx, "update-port-state", log.Fields{"device-id": deviceID, "port-type": portType, "port-no": portNo, "oper-status": operStatus})
 	if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
 		if err := agent.updatePortState(ctx, portType, portNo, operStatus); err != nil {
-			logger.Errorw(ctx, "updating-port-state-failed", log.Fields{"device-id": deviceID, "portNo": portNo, "error": err})
+			logger.Errorw(ctx, "updating-port-state-failed", log.Fields{"device-id": deviceID, "port-no": portNo, "error": err})
 			return err
 		}
 		// Notify the logical device manager to change the port state
 		// Do this for NNI and UNIs only. PON ports are not known by logical device
 		if portType == voltha.Port_ETHERNET_NNI || portType == voltha.Port_ETHERNET_UNI {
 			go func() {
-				err := dMgr.logicalDeviceMgr.updatePortState(log.WithSpanFromContext(context.Background(), ctx), deviceID, portNo, operStatus)
+				subCtx := utils.WithSpanAndRPCMetadataFromContext(ctx)
+				err := dMgr.logicalDeviceMgr.updatePortState(subCtx, deviceID, portNo, operStatus)
 				if err != nil {
 					// While we want to handle (catch) and log when
 					// an update to a port was not able to be
@@ -975,7 +989,7 @@
 }
 
 func (dMgr *Manager) DeleteAllPorts(ctx context.Context, deviceID string) error {
-	logger.Debugw(ctx, "DeleteAllPorts", log.Fields{"device-id": deviceID})
+	logger.Debugw(ctx, "delete-all-ports", log.Fields{"device-id": deviceID})
 	if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
 		if err := agent.deleteAllPorts(ctx); err != nil {
 			return err
@@ -985,7 +999,8 @@
 		// typically is part of a device deletion phase.
 		if device, err := dMgr.getDeviceReadOnly(ctx, deviceID); err == nil {
 			go func() {
-				if err := dMgr.logicalDeviceMgr.deleteAllLogicalPorts(log.WithSpanFromContext(context.Background(), ctx), device); err != nil {
+				subCtx := utils.WithSpanAndRPCMetadataFromContext(ctx)
+				if err := dMgr.logicalDeviceMgr.deleteAllLogicalPorts(subCtx, device); err != nil {
 					logger.Errorw(ctx, "unable-to-delete-logical-ports", log.Fields{"error": err})
 				}
 			}()
@@ -1000,7 +1015,7 @@
 
 //UpdatePortsState updates all ports on the device
 func (dMgr *Manager) UpdatePortsState(ctx context.Context, deviceID string, portTypeFilter uint32, state voltha.OperStatus_Types) error {
-	logger.Debugw(ctx, "UpdatePortsState", log.Fields{"device-id": deviceID})
+	logger.Debugw(ctx, "update-ports-state", log.Fields{"device-id": deviceID})
 	agent := dMgr.getDeviceAgent(ctx, deviceID)
 	if agent == nil {
 		return status.Errorf(codes.NotFound, "%s", deviceID)
@@ -1009,7 +1024,7 @@
 		return status.Error(codes.Unimplemented, "state-change-not-implemented")
 	}
 	if err := agent.updatePortsOperState(ctx, portTypeFilter, state); err != nil {
-		logger.Warnw(ctx, "updatePortsOperState-failed", log.Fields{"device-id": deviceID, "error": err})
+		logger.Warnw(ctx, "update-ports-state-failed", log.Fields{"device-id": deviceID, "error": err})
 		return err
 	}
 	return nil
@@ -1017,7 +1032,7 @@
 
 func (dMgr *Manager) ChildDeviceDetected(ctx context.Context, parentDeviceID string, parentPortNo int64, deviceType string,
 	channelID int64, vendorID string, serialNumber string, onuID int64) (*voltha.Device, error) {
-	logger.Debugw(ctx, "ChildDeviceDetected", log.Fields{"parent-device-id": parentDeviceID, "parentPortNo": parentPortNo, "deviceType": deviceType, "channelId": channelID, "vendorId": vendorID, "serialNumber": serialNumber, "onuId": onuID})
+	logger.Debugw(ctx, "child-device-detected", log.Fields{"parent-device-id": parentDeviceID, "parent-port-no": parentPortNo, "device-type": deviceType, "channel-id": channelID, "vendor-id": vendorID, "serial-number": serialNumber, "onu-id": onuID})
 
 	if deviceType == "" && vendorID != "" {
 		logger.Debug(ctx, "device-type-is-nil-fetching-device-type")
@@ -1037,7 +1052,7 @@
 	}
 	//if no match found for the vendorid,report adapter with the custom error message
 	if deviceType == "" {
-		logger.Errorw(ctx, "failed-to-fetch-adapter-name ", log.Fields{"vendorId": vendorID})
+		logger.Errorw(ctx, "failed-to-fetch-adapter-name", log.Fields{"vendor-id": vendorID})
 		return nil, status.Errorf(codes.NotFound, "%s", vendorID)
 	}
 
@@ -1060,7 +1075,7 @@
 	}
 
 	if device, err := dMgr.GetChildDevice(ctx, parentDeviceID, serialNumber, onuID, parentPortNo); err == nil {
-		logger.Warnw(ctx, "child-device-exists", log.Fields{"parent-device-id": parentDeviceID, "serialNumber": serialNumber})
+		logger.Warnw(ctx, "child-device-exists", log.Fields{"parent-device-id": parentDeviceID, "serial-number": serialNumber})
 		return device, status.Errorf(codes.AlreadyExists, "%s", serialNumber)
 	}
 
@@ -1078,7 +1093,8 @@
 	// Activate the child device
 	if agent = dMgr.getDeviceAgent(ctx, agent.deviceID); agent != nil {
 		go func() {
-			err := agent.enableDevice(log.WithSpanFromContext(context.Background(), ctx))
+			subCtx := utils.WithSpanAndRPCMetadataFromContext(ctx)
+			err := agent.enableDevice(subCtx)
 			if err != nil {
 				logger.Errorw(ctx, "unable-to-enable-device", log.Fields{"error": err})
 			}
@@ -1089,7 +1105,7 @@
 }
 
 func (dMgr *Manager) packetOut(ctx context.Context, deviceID string, outPort uint32, packet *ofp.OfpPacketOut) error {
-	logger.Debugw(ctx, "packetOut", log.Fields{"device-id": deviceID, "outPort": outPort})
+	logger.Debugw(ctx, "packet-out", log.Fields{"device-id": deviceID, "out-port": outPort})
 	if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
 		return agent.packetOut(ctx, outPort, packet)
 	}
@@ -1098,7 +1114,7 @@
 
 // PacketIn receives packet from adapter
 func (dMgr *Manager) PacketIn(ctx context.Context, deviceID string, port uint32, transactionID string, packet []byte) error {
-	logger.Debugw(ctx, "PacketIn", log.Fields{"device-id": deviceID, "port": port})
+	logger.Debugw(ctx, "packet-in", log.Fields{"device-id": deviceID, "port": port})
 	// Get the logical device Id based on the deviceId
 	var device *voltha.Device
 	var err error
@@ -1118,24 +1134,25 @@
 }
 
 func (dMgr *Manager) setParentID(ctx context.Context, device *voltha.Device, parentID string) error {
-	logger.Debugw(ctx, "setParentId", log.Fields{"device-id": device.Id, "parentId": parentID})
+	logger.Debugw(ctx, "set-parent-id", log.Fields{"device-id": device.Id, "parent-id": parentID})
 	if agent := dMgr.getDeviceAgent(ctx, device.Id); agent != nil {
 		return agent.setParentID(ctx, device, parentID)
 	}
 	return status.Errorf(codes.NotFound, "%s", device.Id)
 }
 
-// CreateLogicalDevice creates logical device in core
+// CreateLogicalDevice creates the logical device in core for cDevice. A device whose
+// ParentId is already set is treated as having a logical device, so nothing is created.
 func (dMgr *Manager) CreateLogicalDevice(ctx context.Context, cDevice *voltha.Device) error {
-	logger.Info(ctx, "CreateLogicalDevice")
+	logger.Info(ctx, "create-logical-device")
 	// Verify whether the logical device has already been created
 	if cDevice.ParentId != "" {
-		logger.Debugw(ctx, "Parent device already exist.", log.Fields{"device-id": cDevice.Id, "logical-device-id": cDevice.Id})
+		logger.Debugw(ctx, "parent-device-already-exist", log.Fields{"device-id": cDevice.Id, "logical-device-id": cDevice.ParentId})
 		return nil
 	}
 	var err error
 	if _, err = dMgr.logicalDeviceMgr.createLogicalDevice(ctx, cDevice); err != nil {
-		logger.Warnw(ctx, "createlogical-device-error", log.Fields{"device": cDevice})
+		logger.Warnw(ctx, "create-logical-device-error", log.Fields{"device": cDevice})
 		return err
 	}
 	return nil
@@ -1143,10 +1160,10 @@
 
 // DeleteLogicalDevice deletes logical device from core
 func (dMgr *Manager) DeleteLogicalDevice(ctx context.Context, cDevice *voltha.Device) error {
-	logger.Info(ctx, "DeleteLogicalDevice")
+	logger.Info(ctx, "delete-logical-device")
 	var err error
 	if err = dMgr.logicalDeviceMgr.deleteLogicalDevice(ctx, cDevice); err != nil {
-		logger.Warnw(ctx, "deleteLogical-device-error", log.Fields{"device-id": cDevice.Id})
+		logger.Warnw(ctx, "delete-logical-device-error", log.Fields{"device-id": cDevice.Id})
 		return err
 	}
 	// Remove the logical device Id from the parent device
@@ -1160,7 +1177,7 @@
 	logger.Debugw(ctx, "delete-all-logical-ports", log.Fields{"device-id": cDevice.Id})
 	if err := dMgr.logicalDeviceMgr.deleteLogicalPorts(ctx, cDevice.Id); err != nil {
 		// Just log the error.   The logical device or port may already have been deleted before this callback is invoked.
-		logger.Warnw(ctx, "deleteLogical-ports-error", log.Fields{"device-id": cDevice.Id, "error": err})
+		logger.Warnw(ctx, "delete-logical-ports-error", log.Fields{"device-id": cDevice.Id, "error": err})
 	}
 	return nil
 }
@@ -1178,7 +1195,7 @@
 //ChildDevicesLost is invoked by an adapter to indicate that a parent device is in a state (Disabled) where it
 //cannot manage the child devices.  This will trigger the Core to disable all the child devices.
 func (dMgr *Manager) ChildDevicesLost(ctx context.Context, parentDeviceID string) error {
-	logger.Debug(ctx, "ChildDevicesLost")
+	logger.Debug(ctx, "child-devices-lost")
 	parentDevice, err := dMgr.getDeviceReadOnly(ctx, parentDeviceID)
 	if err != nil {
 		logger.Warnw(ctx, "failed-getting-device", log.Fields{"parent-device-id": parentDeviceID, "error": err})
@@ -1190,7 +1207,7 @@
 //ChildDevicesDetected is invoked by an adapter when child devices are found, typically after after a
 // disable/enable sequence.  This will trigger the Core to Enable all the child devices of that parent.
 func (dMgr *Manager) ChildDevicesDetected(ctx context.Context, parentDeviceID string) error {
-	logger.Debug(ctx, "ChildDevicesDetected")
+	logger.Debug(ctx, "child-devices-detected")
 	parentDevicePorts, err := dMgr.listDevicePorts(ctx, parentDeviceID)
 	if err != nil {
 		logger.Warnw(ctx, "failed-getting-device", log.Fields{"device-id": parentDeviceID, "error": err})
@@ -1203,16 +1220,17 @@
 	allChildEnableRequestSent := true
 	for childDeviceID := range childDeviceIds {
 		if agent := dMgr.getDeviceAgent(ctx, childDeviceID); agent != nil {
+			subCtx := utils.WithSpanAndRPCMetadataFromContext(ctx)
 			// Run the children re-registration in its own routine
 			go func(ctx context.Context) {
-				err = agent.enableDevice(ctx)
-				if err != nil {
+				// Keep the error local: assigning the enclosing err from this goroutine races with the loop.
+				if err := agent.enableDevice(ctx); err != nil {
 					logger.Errorw(ctx, "unable-to-enable-device", log.Fields{"error": err})
 				}
-			}(log.WithSpanFromContext(context.Background(), ctx))
+			}(subCtx)
 		} else {
 			err = status.Errorf(codes.Unavailable, "no agent for child device %s", childDeviceID)
-			logger.Errorw(ctx, "no-child-device-agent", log.Fields{"parent-device-id": parentDeviceID, "childId": childDeviceID})
+			logger.Errorw(ctx, "no-child-device-agent", log.Fields{"parent-device-id": parentDeviceID, "child-id": childDeviceID})
 			allChildEnableRequestSent = false
 		}
 	}
@@ -1229,7 +1247,7 @@
 
 //DisableAllChildDevices is invoked as a callback when the parent device is disabled
 func (dMgr *Manager) DisableAllChildDevices(ctx context.Context, parentCurrDevice *voltha.Device) error {
-	logger.Debug(ctx, "DisableAllChildDevices")
+	logger.Debug(ctx, "disable-all-child-devices")
 	ports, _ := dMgr.listDevicePorts(ctx, parentCurrDevice.Id)
 	for childDeviceID := range dMgr.getAllChildDeviceIds(ctx, ports) {
 		if agent := dMgr.getDeviceAgent(ctx, childDeviceID); agent != nil {
@@ -1244,7 +1262,7 @@
 
 //DeleteAllChildDevices is invoked as a callback when the parent device is deleted
 func (dMgr *Manager) DeleteAllChildDevices(ctx context.Context, parentCurrDevice *voltha.Device) error {
-	logger.Debug(ctx, "DeleteAllChildDevices")
+	logger.Debug(ctx, "delete-all-child-devices")
 	force := false
 	// Get the parent device Transient state, if its FORCE_DELETED(go for force delete for child devices)
 	// So in cases when this handler is getting called other than DELETE operation, no force option would be used.
@@ -1301,7 +1319,7 @@
 
 //getAllChildDeviceIds is a helper method to get all the child device IDs from the device passed as parameter
 func (dMgr *Manager) getAllChildDeviceIds(ctx context.Context, parentDevicePorts map[uint32]*voltha.Port) map[string]struct{} {
-	logger.Debug(ctx, "getAllChildDeviceIds")
+	logger.Debug(ctx, "get-all-child-device-ids")
 	childDeviceIds := make(map[string]struct{}, len(parentDevicePorts))
 	for _, port := range parentDevicePorts {
 		for _, peer := range port.Peers {
@@ -1314,7 +1332,7 @@
 
 //GetAllChildDevices is a helper method to get all the child device IDs from the device passed as parameter
 func (dMgr *Manager) GetAllChildDevices(ctx context.Context, parentDeviceID string) (*voltha.Devices, error) {
-	logger.Debugw(ctx, "GetAllChildDevices", log.Fields{"parent-device-id": parentDeviceID})
+	logger.Debugw(ctx, "get-all-child-devices", log.Fields{"parent-device-id": parentDeviceID})
 	if parentDevicePorts, err := dMgr.listDevicePorts(ctx, parentDeviceID); err == nil {
 		childDevices := make([]*voltha.Device, 0)
 		for deviceID := range dMgr.getAllChildDeviceIds(ctx, parentDevicePorts) {
@@ -1329,13 +1347,13 @@
 
 // SetupUNILogicalPorts creates UNI ports on the logical device that represents a child UNI interface
 func (dMgr *Manager) SetupUNILogicalPorts(ctx context.Context, cDevice *voltha.Device) error {
-	logger.Info(ctx, "SetupUNILogicalPorts")
+	logger.Info(ctx, "setup-uni-logical-ports")
 	cDevicePorts, err := dMgr.listDevicePorts(ctx, cDevice.Id)
 	if err != nil {
 		return err
 	}
 	if err := dMgr.logicalDeviceMgr.setupUNILogicalPorts(ctx, cDevice, cDevicePorts); err != nil {
-		logger.Warnw(ctx, "setupUNILogicalPorts-error", log.Fields{"device": cDevice, "err": err})
+		logger.Warnw(ctx, "setup-uni-logical-ports-error", log.Fields{"device": cDevice, "err": err})
 		return err
 	}
 	return nil
@@ -1346,9 +1364,9 @@
 
 // DownloadImage execute an image download request
 func (dMgr *Manager) DownloadImage(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
+	ctx = utils.WithRPCMetadataContext(ctx, "DownloadImage")
 	log.EnrichSpan(ctx, log.Fields{"device-id": img.Id})
-
-	logger.Debugw(ctx, "DownloadImage", log.Fields{"device-id": img.Id, "imageName": img.Name})
+	logger.Debugw(ctx, "download-image", log.Fields{"device-id": img.Id, "image-name": img.Name})
 	agent := dMgr.getDeviceAgent(ctx, img.Id)
 	if agent == nil {
 		return operationFailureResp, status.Errorf(codes.NotFound, "%s", img.Id)
@@ -1362,9 +1380,9 @@
 
 // CancelImageDownload cancels image download request
 func (dMgr *Manager) CancelImageDownload(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
+	ctx = utils.WithRPCMetadataContext(ctx, "CancelImageDownload")
 	log.EnrichSpan(ctx, log.Fields{"device-id": img.Id})
-
-	logger.Debugw(ctx, "CancelImageDownload", log.Fields{"device-id": img.Id, "imageName": img.Name})
+	logger.Debugw(ctx, "cancel-image-download", log.Fields{"device-id": img.Id, "image-name": img.Name})
 	agent := dMgr.getDeviceAgent(ctx, img.Id)
 	if agent == nil {
 		return operationFailureResp, status.Errorf(codes.NotFound, "%s", img.Id)
@@ -1378,9 +1396,9 @@
 
 // ActivateImageUpdate activates image update request
 func (dMgr *Manager) ActivateImageUpdate(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
+	ctx = utils.WithRPCMetadataContext(ctx, "ActivateImageUpdate")
 	log.EnrichSpan(ctx, log.Fields{"device-id": img.Id})
-
-	logger.Debugw(ctx, "ActivateImageUpdate", log.Fields{"device-id": img.Id, "imageName": img.Name})
+	logger.Debugw(ctx, "activate-image-update", log.Fields{"device-id": img.Id, "image-name": img.Name})
 	agent := dMgr.getDeviceAgent(ctx, img.Id)
 	if agent == nil {
 		return operationFailureResp, status.Errorf(codes.NotFound, "%s", img.Id)
@@ -1394,9 +1412,9 @@
 
 // RevertImageUpdate reverts image update
 func (dMgr *Manager) RevertImageUpdate(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
+	ctx = utils.WithRPCMetadataContext(ctx, "RevertImageUpdate")
 	log.EnrichSpan(ctx, log.Fields{"device-id": img.Id})
-
-	logger.Debugw(ctx, "RevertImageUpdate", log.Fields{"device-id": img.Id, "imageName": img.Name})
+	logger.Debugw(ctx, "revert-image-update", log.Fields{"device-id": img.Id, "image-name": img.Name})
 	agent := dMgr.getDeviceAgent(ctx, img.Id)
 	if agent == nil {
 		return operationFailureResp, status.Errorf(codes.NotFound, "%s", img.Id)
@@ -1413,9 +1431,9 @@
 
 // GetImageDownloadStatus returns status of image download
 func (dMgr *Manager) GetImageDownloadStatus(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+	ctx = utils.WithRPCMetadataContext(ctx, "GetImageDownloadStatus")
 	log.EnrichSpan(ctx, log.Fields{"device-id": img.Id})
-
-	logger.Debugw(ctx, "GetImageDownloadStatus", log.Fields{"device-id": img.Id, "imageName": img.Name})
+	logger.Debugw(ctx, "get-image-download-status", log.Fields{"device-id": img.Id, "image-name": img.Name})
 	agent := dMgr.getDeviceAgent(ctx, img.Id)
 	if agent == nil {
 		return imageDownloadFailureResp, status.Errorf(codes.NotFound, "%s", img.Id)
@@ -1428,12 +1446,12 @@
 }
 
 func (dMgr *Manager) UpdateImageDownload(ctx context.Context, deviceID string, img *voltha.ImageDownload) error {
+	ctx = utils.WithRPCMetadataContext(ctx, "UpdateImageDownload")
 	log.EnrichSpan(ctx, log.Fields{"device-id": img.Id})
-
-	logger.Debugw(ctx, "UpdateImageDownload", log.Fields{"device-id": img.Id, "imageName": img.Name})
+	logger.Debugw(ctx, "update-image-download", log.Fields{"device-id": img.Id, "image-name": img.Name})
 	if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
 		if err := agent.updateImageDownload(ctx, img); err != nil {
-			logger.Debugw(ctx, "UpdateImageDownload-failed", log.Fields{"err": err, "imageName": img.Name})
+			logger.Debugw(ctx, "update-image-download-failed", log.Fields{"err": err, "image-name": img.Name})
 			return err
 		}
 	} else {
@@ -1444,9 +1462,9 @@
 
 // GetImageDownload returns image download
 func (dMgr *Manager) GetImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+	ctx = utils.WithRPCMetadataContext(ctx, "GetImageDownload")
 	log.EnrichSpan(ctx, log.Fields{"device-id": img.Id})
-
-	logger.Debugw(ctx, "GetImageDownload", log.Fields{"device-id": img.Id, "imageName": img.Name})
+	logger.Debugw(ctx, "get-image-download", log.Fields{"device-id": img.Id, "image-name": img.Name})
 	agent := dMgr.getDeviceAgent(ctx, img.Id)
 	if agent == nil {
 		return imageDownloadFailureResp, status.Errorf(codes.NotFound, "%s", img.Id)
@@ -1460,9 +1478,9 @@
 
 // ListImageDownloads returns image downloads
 func (dMgr *Manager) ListImageDownloads(ctx context.Context, id *voltha.ID) (*voltha.ImageDownloads, error) {
+	ctx = utils.WithRPCMetadataContext(ctx, "ListImageDownloads")
 	log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
-
-	logger.Debugw(ctx, "ListImageDownloads", log.Fields{"device-id": id.Id})
+	logger.Debugw(ctx, "list-image-downloads", log.Fields{"device-id": id.Id})
 	agent := dMgr.getDeviceAgent(ctx, id.Id)
 	if agent == nil {
 		return &voltha.ImageDownloads{Items: []*voltha.ImageDownload{imageDownloadFailureResp}}, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -1476,9 +1494,9 @@
 
 // GetImages returns all images for a specific device entry
 func (dMgr *Manager) GetImages(ctx context.Context, id *voltha.ID) (*voltha.Images, error) {
+	ctx = utils.WithRPCMetadataContext(ctx, "GetImages")
 	log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
-
-	logger.Debugw(ctx, "GetImages", log.Fields{"device-id": id.Id})
+	logger.Debugw(ctx, "get-images", log.Fields{"device-id": id.Id})
 	device, err := dMgr.getDeviceReadOnly(ctx, id.Id)
 	if err != nil {
 		return nil, err
@@ -1487,7 +1505,7 @@
 }
 
 func (dMgr *Manager) NotifyInvalidTransition(ctx context.Context, device *voltha.Device) error {
-	logger.Errorw(ctx, "NotifyInvalidTransition", log.Fields{
+	logger.Errorw(ctx, "notify-invalid-transition", log.Fields{
 		"device":           device.Id,
 		"curr-admin-state": device.AdminState,
 		"curr-oper-state":  device.OperStatus,
@@ -1507,16 +1525,17 @@
 // GetParentDeviceID returns parent device id, either from memory or from the dB, if present
 func (dMgr *Manager) GetParentDeviceID(ctx context.Context, deviceID string) string {
 	if device, _ := dMgr.getDeviceReadOnly(ctx, deviceID); device != nil {
-		logger.Infow(ctx, "GetParentDeviceId", log.Fields{"device-id": device.Id, "parentId": device.ParentId})
+		logger.Infow(ctx, "get-parent-device-id", log.Fields{"device-id": device.Id, "parent-id": device.ParentId})
 		return device.ParentId
 	}
 	return ""
 }
 
 func (dMgr *Manager) SimulateAlarm(ctx context.Context, simulateReq *voltha.SimulateAlarmRequest) (*common.OperationResp, error) {
-	logger.Debugw(ctx, "SimulateAlarm", log.Fields{"id": simulateReq.Id, "Indicator": simulateReq.Indicator, "IntfId": simulateReq.IntfId,
-		"PortTypeName": simulateReq.PortTypeName, "OnuDeviceId": simulateReq.OnuDeviceId, "InverseBitErrorRate": simulateReq.InverseBitErrorRate,
-		"Drift": simulateReq.Drift, "NewEqd": simulateReq.NewEqd, "OnuSerialNumber": simulateReq.OnuSerialNumber, "Operation": simulateReq.Operation})
+	ctx = utils.WithRPCMetadataContext(ctx, "SimulateAlarm")
+	logger.Debugw(ctx, "simulate-alarm", log.Fields{"id": simulateReq.Id, "indicator": simulateReq.Indicator, "intf-id": simulateReq.IntfId,
+		"port-type-name": simulateReq.PortTypeName, "onu-device-id": simulateReq.OnuDeviceId, "inverse-bit-error-rate": simulateReq.InverseBitErrorRate,
+		"drift": simulateReq.Drift, "new-eqd": simulateReq.NewEqd, "onu-serial-number": simulateReq.OnuSerialNumber, "operation": simulateReq.Operation})
 	agent := dMgr.getDeviceAgent(ctx, simulateReq.Id)
 	if agent == nil {
 		return nil, status.Errorf(codes.NotFound, "%s", simulateReq.Id)
@@ -1528,7 +1547,7 @@
 }
 
 func (dMgr *Manager) UpdateDeviceReason(ctx context.Context, deviceID string, reason string) error {
-	logger.Debugw(ctx, "UpdateDeviceReason", log.Fields{"device-id": deviceID, "reason": reason})
+	logger.Debugw(ctx, "update-device-reason", log.Fields{"device-id": deviceID, "reason": reason})
 	if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
 		return agent.updateDeviceReason(ctx, reason)
 	}
@@ -1536,9 +1555,9 @@
 }
 
 func (dMgr *Manager) EnablePort(ctx context.Context, port *voltha.Port) (*empty.Empty, error) {
+	ctx = utils.WithRPCMetadataContext(ctx, "EnablePort")
 	log.EnrichSpan(ctx, log.Fields{"device-id": port.DeviceId})
-
-	logger.Debugw(ctx, "EnablePort", log.Fields{"device-id": port.DeviceId, "port-no": port.PortNo})
+	logger.Debugw(ctx, "enable-port", log.Fields{"device-id": port.DeviceId, "port-no": port.PortNo})
 	agent := dMgr.getDeviceAgent(ctx, port.DeviceId)
 	if agent == nil {
 		return nil, status.Errorf(codes.NotFound, "%s", port.DeviceId)
@@ -1547,9 +1566,9 @@
 }
 
 func (dMgr *Manager) DisablePort(ctx context.Context, port *voltha.Port) (*empty.Empty, error) {
+	ctx = utils.WithRPCMetadataContext(ctx, "DisablePort")
 	log.EnrichSpan(ctx, log.Fields{"device-id": port.DeviceId})
-
-	logger.Debugw(ctx, "DisablePort", log.Fields{"device-id": port.DeviceId, "port-no": port.PortNo})
+	logger.Debugw(ctx, "disable-port", log.Fields{"device-id": port.DeviceId, "port-no": port.PortNo})
 	agent := dMgr.getDeviceAgent(ctx, port.DeviceId)
 	if agent == nil {
 		return nil, status.Errorf(codes.NotFound, "%s", port.DeviceId)
@@ -1559,7 +1578,7 @@
 
 // ChildDeviceLost  calls parent adapter to delete child device and all its references
 func (dMgr *Manager) ChildDeviceLost(ctx context.Context, curr *voltha.Device) error {
-	logger.Debugw(ctx, "childDeviceLost", log.Fields{"child-device-id": curr.Id, "parent-device-id": curr.ParentId})
+	logger.Debugw(ctx, "child-device-lost", log.Fields{"child-device-id": curr.Id, "parent-device-id": curr.ParentId})
 	if parentAgent := dMgr.getDeviceAgent(ctx, curr.ParentId); parentAgent != nil {
 		if err := parentAgent.ChildDeviceLost(ctx, curr); err != nil {
 			// Just log the message and let the remaining pipeline proceed.
@@ -1571,9 +1590,9 @@
 }
 
 func (dMgr *Manager) StartOmciTestAction(ctx context.Context, request *voltha.OmciTestRequest) (*voltha.TestResponse, error) {
+	ctx = utils.WithRPCMetadataContext(ctx, "StartOmciTestAction")
 	log.EnrichSpan(ctx, log.Fields{"device-id": request.Id})
-
-	logger.Debugw(ctx, "StartOmciTestAction", log.Fields{"device-id": request.Id, "uuid": request.Uuid})
+	logger.Debugw(ctx, "start-omci-test-action", log.Fields{"device-id": request.Id, "uuid": request.Uuid})
 	agent := dMgr.getDeviceAgent(ctx, request.Id)
 	if agent == nil {
 		return nil, status.Errorf(codes.NotFound, "%s", request.Id)
@@ -1582,9 +1601,9 @@
 }
 
 func (dMgr *Manager) GetExtValue(ctx context.Context, value *voltha.ValueSpecifier) (*voltha.ReturnValues, error) {
+	ctx = utils.WithRPCMetadataContext(ctx, "GetExtValue")
 	log.EnrichSpan(ctx, log.Fields{"device-id": value.Id})
-
-	logger.Debugw(ctx, "getExtValue", log.Fields{"onu-id": value.Id})
+	logger.Debugw(ctx, "get-ext-value", log.Fields{"onu-id": value.Id})
 	cDevice, err := dMgr.getDeviceReadOnly(ctx, value.Id)
 	if err != nil {
 		return nil, status.Errorf(codes.Aborted, "%s", err.Error())
@@ -1598,7 +1617,7 @@
 		if err != nil {
 			return nil, err
 		}
-		logger.Debugw(ctx, "getExtValue-result", log.Fields{"result": resp})
+		logger.Debugw(ctx, "get-ext-value-result", log.Fields{"result": resp})
 		return resp, nil
 	}
 	return nil, status.Errorf(codes.NotFound, "%s", value.Id)
@@ -1607,7 +1626,8 @@
 
 // SetExtValue  set some given configs or value
 func (dMgr *Manager) SetExtValue(ctx context.Context, value *voltha.ValueSet) (*empty.Empty, error) {
-	logger.Debugw(ctx, "setExtValue", log.Fields{"onu-id": value.Id})
+	ctx = utils.WithRPCMetadataContext(ctx, "SetExtValue")
+	logger.Debugw(ctx, "set-ext-value", log.Fields{"onu-id": value.Id})
 	device, err := dMgr.getDeviceReadOnly(ctx, value.Id)
 	if err != nil {
 		return nil, status.Errorf(codes.Aborted, "%s", err.Error())
@@ -1617,9 +1637,16 @@
 		if err != nil {
 			return nil, err
 		}
-		logger.Debugw(ctx, "setExtValue-result", log.Fields{"result": resp})
+		logger.Debugw(ctx, "set-ext-value-result", log.Fields{"result": resp})
 		return resp, nil
 	}
 	return nil, status.Errorf(codes.NotFound, "%s", value.Id)
 
 }
+
+// SendRPCEvent forwards the RPC event and its arguments unchanged to dMgr.RPCEventManager.SendRPCEvent.
+func (dMgr *Manager) SendRPCEvent(ctx context.Context, id string, rpcEvent *voltha.RPCEvent,
+	category voltha.EventCategory_Types, subCategory *voltha.EventSubCategory_Types, raisedTs int64) {
+	//TODO Instead of directly sending to the kafka bus, queue the message and send it asynchronously
+	dMgr.RPCEventManager.SendRPCEvent(ctx, id, rpcEvent, category, subCategory, raisedTs)
+}