VOL-3501 Code changes to support RPC events

Change-Id: I2536c0c03faa5fb026349c906ebef46323398e9a
diff --git a/rw_core/core/device/logical_manager.go b/rw_core/core/device/logical_manager.go
index 2fca7c5..9ea12fa 100644
--- a/rw_core/core/device/logical_manager.go
+++ b/rw_core/core/device/logical_manager.go
@@ -67,7 +67,7 @@
 			// This can happen when an agent for the logical device has been created but the logical device
 			// itself is not ready for action as it is waiting for switch and port capabilities from the
 			// relevant adapter.  In such a case prevent any request aimed at that logical device.
-			logger.Debugf(ctx, "Logical device %s is not ready to serve requests", logicalDeviceID)
+			logger.Debugf(ctx, "logical-device-%s-is-not-ready-to-serve-requests", logicalDeviceID)
 			return nil
 		}
 		return lda
@@ -88,7 +88,8 @@
 // GetLogicalDevice provides a cloned most up to date logical device.  If device is not in memory
 // it will be fetched from the dB
 func (ldMgr *LogicalManager) GetLogicalDevice(ctx context.Context, id *voltha.ID) (*voltha.LogicalDevice, error) {
-	logger.Debugw(ctx, "getlogicalDevice", log.Fields{"logical-device-id": id})
+	ctx = utils.WithRPCMetadataContext(ctx, "GetLogicalDevice")
+	logger.Debugw(ctx, "get-logical-device", log.Fields{"logical-device-id": id})
 	if agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id); agent != nil {
 		return agent.GetLogicalDeviceReadOnly(ctx)
 	}
@@ -97,7 +98,8 @@
 
 //ListLogicalDevices returns the list of all logical devices
 func (ldMgr *LogicalManager) ListLogicalDevices(ctx context.Context, _ *empty.Empty) (*voltha.LogicalDevices, error) {
-	logger.Debug(ctx, "ListAllLogicalDevices")
+	ctx = utils.WithRPCMetadataContext(ctx, "ListLogicalDevices")
+	logger.Debug(ctx, "list-all-logical-devices")
 
 	var logicalDevices []*voltha.LogicalDevice
 	if err := ldMgr.ldProxy.List(ctx, &logicalDevices); err != nil {
@@ -139,7 +141,8 @@
 	go func() {
 		//TODO: either wait for the agent to be started before returning, or
 		//      implement locks in the agent to ensure request are not processed before start() is complete
-		err := agent.start(log.WithSpanFromContext(context.Background(), ctx), false)
+		ldCtx := utils.WithSpanAndRPCMetadataFromContext(ctx)
+		err := agent.start(ldCtx, false)
 		if err != nil {
 			logger.Errorw(ctx, "unable-to-create-the-logical-device", log.Fields{"error": err})
 			ldMgr.deleteLogicalDeviceAgent(id)
@@ -205,7 +208,7 @@
 				}
 				ldMgr.logicalDeviceAgents.Store(agent.logicalDeviceID, agent)
 			} else {
-				logger.Debugw(ctx, "logicalDevice not in model", log.Fields{"lDeviceId": lDeviceID})
+				logger.Debugw(ctx, "logical-device-not-in-model", log.Fields{"logical-device-id": lDeviceID})
 			}
 			// announce completion of task to any number of waiting channels
 			ldMgr.logicalDevicesLoadingLock.Lock()
@@ -281,7 +284,8 @@
 
 // ListLogicalDeviceFlows returns the flows of logical device
 func (ldMgr *LogicalManager) ListLogicalDeviceFlows(ctx context.Context, id *voltha.ID) (*openflow_13.Flows, error) {
-	logger.Debugw(ctx, "ListLogicalDeviceFlows", log.Fields{"logical-device-id": id.Id})
+	ctx = utils.WithRPCMetadataContext(ctx, "ListLogicalDeviceFlows")
+	logger.Debugw(ctx, "list-logical-device-flows", log.Fields{"logical-device-id": id.Id})
 	agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id)
 	if agent == nil {
 		return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -298,7 +302,8 @@
 
 // ListLogicalDeviceFlowGroups returns logical device flow groups
 func (ldMgr *LogicalManager) ListLogicalDeviceFlowGroups(ctx context.Context, id *voltha.ID) (*openflow_13.FlowGroups, error) {
-	logger.Debugw(ctx, "ListLogicalDeviceFlowGroups", log.Fields{"logical-device-id": id.Id})
+	ctx = utils.WithRPCMetadataContext(ctx, "ListLogicalDeviceFlowGroups")
+	logger.Debugw(ctx, "list-logical-device-flow-groups", log.Fields{"logical-device-id": id.Id})
 	agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id)
 	if agent == nil {
 		return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -315,7 +320,8 @@
 
 // ListLogicalDevicePorts returns logical device ports
 func (ldMgr *LogicalManager) ListLogicalDevicePorts(ctx context.Context, id *voltha.ID) (*voltha.LogicalPorts, error) {
-	logger.Debugw(ctx, "ListLogicalDevicePorts", log.Fields{"logical-device-id": id.Id})
+	ctx = utils.WithRPCMetadataContext(ctx, "ListLogicalDevicePorts")
+	logger.Debugw(ctx, "list-logical-device-ports", log.Fields{"logical-device-id": id.Id})
 	agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id)
 	if agent == nil {
 		return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -332,6 +338,7 @@
 
 // GetLogicalDevicePort returns logical device port details
 func (ldMgr *LogicalManager) GetLogicalDevicePort(ctx context.Context, lPortID *voltha.LogicalPortId) (*voltha.LogicalPort, error) {
+	ctx = utils.WithRPCMetadataContext(ctx, "GetLogicalDevicePort")
 	// Get the logical device where this port is attached
 	agent := ldMgr.getLogicalDeviceAgent(ctx, lPortID.Id)
 	if agent == nil {
@@ -382,7 +389,7 @@
 }
 
 func (ldMgr *LogicalManager) setupUNILogicalPorts(ctx context.Context, childDevice *voltha.Device, childDevicePorts map[uint32]*voltha.Port) error {
-	logger.Debugw(ctx, "setupUNILogicalPorts", log.Fields{"childDeviceId": childDevice.Id, "parentDeviceId": childDevice.ParentId, "current-data": childDevice})
+	logger.Debugw(ctx, "setup-uni-logical-ports", log.Fields{"child-device-id": childDevice.Id, "parent-device-id": childDevice.ParentId, "current-data": childDevice})
 	// Sanity check
 	if childDevice.Root {
 		return errors.New("Device-root")
@@ -392,7 +399,7 @@
 	parentID := childDevice.ParentId
 	logDeviceID := ldMgr.deviceMgr.GetParentDeviceID(ctx, parentID)
 
-	logger.Debugw(ctx, "setupUNILogicalPorts", log.Fields{"logDeviceId": logDeviceID, "parentId": parentID})
+	logger.Debugw(ctx, "setup-uni-logical-ports", log.Fields{"logical-device-id": logDeviceID, "parent-device-id": parentID})
 
 	if parentID == "" || logDeviceID == "" {
 		return errors.New("device-in-invalid-state")
@@ -407,7 +414,7 @@
 }
 
 func (ldMgr *LogicalManager) deleteAllLogicalPorts(ctx context.Context, device *voltha.Device) error {
-	logger.Debugw(ctx, "deleteAllLogicalPorts", log.Fields{"device-id": device.Id})
+	logger.Debugw(ctx, "delete-all-logical-ports", log.Fields{"device-id": device.Id})
 
 	var ldID *string
 	var err error
@@ -425,7 +432,7 @@
 }
 
 func (ldMgr *LogicalManager) updatePortState(ctx context.Context, deviceID string, portNo uint32, state voltha.OperStatus_Types) error {
-	logger.Debugw(ctx, "updatePortState", log.Fields{"device-id": deviceID, "state": state, "portNo": portNo})
+	logger.Debugw(ctx, "update-port-state", log.Fields{"device-id": deviceID, "state": state, "port-no": portNo})
 
 	var ldID *string
 	var err error
@@ -444,7 +451,8 @@
 
 // UpdateLogicalDeviceFlowTable updates logical device flow table
 func (ldMgr *LogicalManager) UpdateLogicalDeviceFlowTable(ctx context.Context, flow *openflow_13.FlowTableUpdate) (*empty.Empty, error) {
-	logger.Debugw(ctx, "UpdateLogicalDeviceFlowTable", log.Fields{"logical-device-id": flow.Id})
+	ctx = utils.WithRPCMetadataContext(ctx, "UpdateLogicalDeviceFlowTable")
+	logger.Debugw(ctx, "update-logical-device-flow-table", log.Fields{"logical-device-id": flow.Id})
 	agent := ldMgr.getLogicalDeviceAgent(ctx, flow.Id)
 	if agent == nil {
 		return nil, status.Errorf(codes.NotFound, "%s", flow.Id)
@@ -454,7 +462,8 @@
 
 // UpdateLogicalDeviceMeterTable - This function sends meter mod request to logical device manager and waits for response
 func (ldMgr *LogicalManager) UpdateLogicalDeviceMeterTable(ctx context.Context, meter *openflow_13.MeterModUpdate) (*empty.Empty, error) {
-	logger.Debugw(ctx, "UpdateLogicalDeviceMeterTable", log.Fields{"logical-device-id": meter.Id})
+	ctx = utils.WithRPCMetadataContext(ctx, "UpdateLogicalDeviceMeterTable")
+	logger.Debugw(ctx, "update-logical-device-meter-table", log.Fields{"logical-device-id": meter.Id})
 	agent := ldMgr.getLogicalDeviceAgent(ctx, meter.Id)
 	if agent == nil {
 		return nil, status.Errorf(codes.NotFound, "%s", meter.Id)
@@ -464,7 +473,8 @@
 
 // ListLogicalDeviceMeters returns logical device meters
 func (ldMgr *LogicalManager) ListLogicalDeviceMeters(ctx context.Context, id *voltha.ID) (*openflow_13.Meters, error) {
-	logger.Debugw(ctx, "ListLogicalDeviceMeters", log.Fields{"logical-device-id": id.Id})
+	ctx = utils.WithRPCMetadataContext(ctx, "ListLogicalDeviceMeters")
+	logger.Debugw(ctx, "list-logical-device-meters", log.Fields{"logical-device-id": id.Id})
 	agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id)
 	if agent == nil {
 		return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -480,7 +490,8 @@
 
 // UpdateLogicalDeviceFlowGroupTable updates logical device flow group table
 func (ldMgr *LogicalManager) UpdateLogicalDeviceFlowGroupTable(ctx context.Context, flow *openflow_13.FlowGroupTableUpdate) (*empty.Empty, error) {
-	logger.Debugw(ctx, "UpdateGroupTable", log.Fields{"logical-device-id": flow.Id})
+	ctx = utils.WithRPCMetadataContext(ctx, "UpdateLogicalDeviceFlowGroupTable")
+	logger.Debugw(ctx, "update-group-table", log.Fields{"logical-device-id": flow.Id})
 	agent := ldMgr.getLogicalDeviceAgent(ctx, flow.Id)
 	if agent == nil {
 		return nil, status.Errorf(codes.NotFound, "%s", flow.Id)
@@ -490,7 +501,8 @@
 
 // EnableLogicalDevicePort enables logical device port
 func (ldMgr *LogicalManager) EnableLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*empty.Empty, error) {
-	logger.Debugw(ctx, "EnableLogicalDevicePort", log.Fields{"logical-device-id": id})
+	ctx = utils.WithRPCMetadataContext(ctx, "EnableLogicalDevicePort")
+	logger.Debugw(ctx, "enable-logical-device-port", log.Fields{"logical-device-id": id})
 	agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id)
 	if agent == nil {
 		return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -504,7 +516,8 @@
 
 // DisableLogicalDevicePort disables logical device port
 func (ldMgr *LogicalManager) DisableLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*empty.Empty, error) {
-	logger.Debugw(ctx, "DisableLogicalDevicePort", log.Fields{"logical-device-id": id})
+	ctx = utils.WithRPCMetadataContext(ctx, "DisableLogicalDevicePort")
+	logger.Debugw(ctx, "disable-logical-device-port", log.Fields{"logical-device-id": id})
 	agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id)
 	if agent == nil {
 		return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -517,7 +530,7 @@
 }
 
 func (ldMgr *LogicalManager) packetIn(ctx context.Context, logicalDeviceID string, port uint32, transactionID string, packet []byte) error {
-	logger.Debugw(ctx, "packetIn", log.Fields{"logical-device-id": logicalDeviceID, "port": port})
+	logger.Debugw(ctx, "packet-in", log.Fields{"logical-device-id": logicalDeviceID, "port": port})
 	if agent := ldMgr.getLogicalDeviceAgent(ctx, logicalDeviceID); agent != nil {
 		agent.packetIn(ctx, port, transactionID, packet)
 	} else {
@@ -529,35 +542,62 @@
 // StreamPacketsOut sends packets to adapter
+//
+// Packets are read from the stream until its context is done or Recv returns
+// io.EOF. Each received packet is passed to the logical device agent selected
+// by packet.Id. A receive error is logged and reported as an RPC event, after
+// which the loop continues. The function always returns nil.
 func (ldMgr *LogicalManager) StreamPacketsOut(packets voltha.VolthaService_StreamPacketsOutServer) error {
 	ctx := context.Background()
-	logger.Debugw(ctx, "StreamPacketsOut-request", log.Fields{"packets": packets})
+	logger.Debugw(ctx, "stream-packets-out-request", log.Fields{"packets": packets})
+
 loop:
 	for {
 		select {
 		case <-packets.Context().Done():
-			logger.Infow(ctx, "StreamPacketsOut-context-done", log.Fields{"packets": packets, "error": packets.Context().Err()})
+			logger.Infow(ctx, "stream-packets-out-context-done", log.Fields{"packets": packets, "error": packets.Context().Err()})
 			break loop
 		default:
 		}
 
 		packet, err := packets.Recv()
 
+		pktCtx := utils.WithRPCMetadataContext(packets.Context(), "StreamPacketsOut")
+
 		if err == io.EOF {
-			logger.Debugw(ctx, "Received-EOF", log.Fields{"packets": packets})
+			logger.Debugw(ctx, "received-eof", log.Fields{"packets": packets})
 			break loop
 		}
-
 		if err != nil {
-			logger.Errorw(ctx, "Failed to receive packet out", log.Fields{"error": err})
+			logger.Errorw(ctx, "failed-to-receive-packet-out", log.Fields{"error": err})
+			// packet may be nil when Recv fails, so only read its ID after a nil check.
+			var resourceID string
+			if packet != nil {
+				resourceID = packet.Id
+			}
+			go ldMgr.SendRPCEvent(pktCtx, resourceID, err.Error(), nil,
+				"RPC_ERROR_RAISE_EVENT", voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
+			// NOTE(review): gRPC stream receive errors are typically terminal; confirm that
+			// continuing (rather than returning) cannot busy-loop until the context is done.
 			continue
 		}
 
-		if agent := ldMgr.getLogicalDeviceAgent(packets.Context(), packet.Id); agent != nil {
-			agent.packetOut(packets.Context(), packet.PacketOut)
+		if agent := ldMgr.getLogicalDeviceAgent(pktCtx, packet.Id); agent != nil {
+			agent.packetOut(pktCtx, packet.PacketOut)
 		} else {
-			logger.Errorf(ctx, "No logical device agent present", log.Fields{"logical-device-id": packet.Id})
+			logger.Errorw(ctx, "no-logical-device-agent-present", log.Fields{"logical-device-id": packet.Id})
 		}
 	}
 
-	logger.Debugw(ctx, "StreamPacketsOut-request-done", log.Fields{"packets": packets})
+	logger.Debugw(ctx, "stream-packets-out-request-done", log.Fields{"packets": packets})
 	return nil
 }
+
+// SendRPCEvent forwards the supplied event details to the manager's RPCEventManager
+// by calling GetAndSendRPCEvent with the same arguments, in the same order.
+//
+// Note that the context parameter (the event context map) shadows the context
+// package inside this function body.
+func (ldMgr *LogicalManager) SendRPCEvent(ctx context.Context, resourceID, desc string, context map[string]string,
+	id string, category voltha.EventCategory_Types, subCategory *voltha.EventSubCategory_Types, raisedTs int64) {
+	ldMgr.Manager.RPCEventManager.GetAndSendRPCEvent(ctx, resourceID, desc, context, id,
+		category, subCategory, raisedTs)
+}