VOL-3501 Code changes to support rpc event

Change-Id: I2536c0c03faa5fb026349c906ebef46323398e9a
diff --git a/rw_core/core/device/logical_agent_flow.go b/rw_core/core/device/logical_agent_flow.go
index 6b8abef..2b88abc 100644
--- a/rw_core/core/device/logical_agent_flow.go
+++ b/rw_core/core/device/logical_agent_flow.go
@@ -21,6 +21,7 @@
 	"errors"
 	"fmt"
 	"strconv"
+	"time"
 
 	"github.com/gogo/protobuf/proto"
 	"github.com/opencord/voltha-go/rw_core/route"
@@ -48,7 +49,7 @@
 
 //updateFlowTable updates the flow table of that logical device
 func (agent *LogicalAgent) updateFlowTable(ctx context.Context, flow *ofp.FlowTableUpdate) error {
-	logger.Debug(ctx, "UpdateFlowTable")
+	logger.Debug(ctx, "update-flow-table")
 	if flow == nil {
 		return nil
 	}
@@ -72,19 +73,19 @@
 //flowAdd adds a flow to the flow table of that logical device
 func (agent *LogicalAgent) flowAdd(ctx context.Context, flowUpdate *ofp.FlowTableUpdate) error {
 	mod := flowUpdate.FlowMod
-	logger.Debugw(ctx, "flowAdd", log.Fields{"flow": mod})
+	logger.Debugw(ctx, "flow-add", log.Fields{"flow": mod})
 	if mod == nil {
 		return nil
 	}
 	flow, err := fu.FlowStatsEntryFromFlowModMessage(mod)
 	if err != nil {
-		logger.Errorw(ctx, "flowAdd-failed", log.Fields{"flowMod": mod, "err": err})
+		logger.Errorw(ctx, "flow-add-failed", log.Fields{"flow-mod": mod, "err": err})
 		return err
 	}
 	var updated bool
 	var changed bool
 	if changed, updated, err = agent.decomposeAndAdd(ctx, flow, flowUpdate); err != nil {
-		logger.Errorw(ctx, "flow-decompose-and-add-failed ", log.Fields{"flowMod": mod, "err": err})
+		logger.Errorw(ctx, "flow-decompose-and-add-failed ", log.Fields{"flow-mod": mod, "err": err})
 		return err
 	}
 	if changed && !updated {
@@ -115,7 +116,7 @@
 		// TODO: this currently does nothing
 		if overlapped := fu.FindOverlappingFlows(flows, mod); len(overlapped) != 0 {
 			// TODO: should this error be notified other than being logged?
-			logger.Warnw(ctx, "overlapped-flows", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
+			logger.Warnw(ctx, "overlapped-flows", log.Fields{"logical-device-id": agent.logicalDeviceID})
 		} else {
 			//	Add flow
 			changed = true
@@ -141,7 +142,7 @@
 
 		flowMeterConfig, err := agent.GetMeterConfig(ctx, updatedFlows)
 		if err != nil {
-			logger.Error(ctx, "Meter-referred-in-flow-not-present")
+			logger.Error(ctx, "meter-referred-in-flow-not-present")
 			return changed, updated, err
 		}
 
@@ -178,8 +179,10 @@
 					"flow":              flow,
 					"groups":            groups,
 				})
+				subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
+
 				// Revert added flows
-				if err := agent.revertAddedFlows(log.WithSpanFromContext(context.Background(), ctx), mod, flow, flowToReplace, deviceRules, toMetadata(flowMeterConfig)); err != nil {
+				if err := agent.revertAddedFlows(subCtx, mod, flow, flowToReplace, deviceRules, toMetadata(flowMeterConfig)); err != nil {
 					logger.Errorw(ctx, "failure-to-delete-flow-after-failed-addition", log.Fields{
 						"error":             err,
 						"logical-device-id": agent.logicalDeviceID,
@@ -189,6 +192,13 @@
 				}
 				// send event
 				agent.ldeviceMgr.SendFlowChangeEvent(ctx, agent.logicalDeviceID, res, flowUpdate.Xid, flowUpdate.FlowMod.Cookie)
+				context := make(map[string]string)
+				context["rpc"] = coreutils.GetRPCMetadataFromContext(ctx)
+				context["flow-id"] = string(flow.Id)
+				context["device-rules"] = deviceRules.String()
+				go agent.ldeviceMgr.SendRPCEvent(ctx,
+					agent.logicalDeviceID, "failed-to-add-flow", context, "RPC_ERROR_RAISE_EVENT",
+					voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
 			}
 		}()
 	}
@@ -198,7 +208,7 @@
 // revertAddedFlows reverts flows after the flowAdd request has failed.  All flows corresponding to that flowAdd request
 // will be reverted, both from the logical devices and the devices.
 func (agent *LogicalAgent) revertAddedFlows(ctx context.Context, mod *ofp.OfpFlowMod, addedFlow *ofp.OfpFlowStats, replacedFlow *ofp.OfpFlowStats, deviceRules *fu.DeviceRules, metadata *voltha.FlowMetadata) error {
-	logger.Debugw(ctx, "revertFlowAdd", log.Fields{"added-flow": addedFlow, "replaced-flow": replacedFlow, "device-rules": deviceRules, "metadata": metadata})
+	logger.Debugw(ctx, "revert-flow-add", log.Fields{"added-flow": addedFlow, "replaced-flow": replacedFlow, "device-rules": deviceRules, "metadata": metadata})
 
 	flowHandle, have := agent.flowLoader.Lock(addedFlow.Id)
 	if !have {
@@ -243,7 +253,7 @@
 
 //flowDelete deletes a flow from the flow table of that logical device
 func (agent *LogicalAgent) flowDelete(ctx context.Context, flowUpdate *ofp.FlowTableUpdate) error {
-	logger.Debug(ctx, "flowDelete")
+	logger.Debug(ctx, "flow-delete")
 	mod := flowUpdate.FlowMod
 	if mod == nil {
 		return nil
@@ -274,7 +284,7 @@
 
 	//Delete the matched flows
 	if len(toDelete) > 0 {
-		logger.Debugw(ctx, "flowDelete", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "toDelete": len(toDelete)})
+		logger.Debugw(ctx, "flow-delete", log.Fields{"logical-device-id": agent.logicalDeviceID, "to-delete": len(toDelete)})
 
 		for _, flow := range toDelete {
 			if flowHandle, have := agent.flowLoader.Lock(flow.Id); have {
@@ -299,7 +309,7 @@
 
 		metersConfig, err := agent.GetMeterConfig(ctx, toDelete)
 		if err != nil { // This should never happen
-			logger.Error(ctx, "Meter-referred-in-flows-not-present")
+			logger.Error(ctx, "meter-referred-in-flows-not-present")
 			return err
 		}
 
@@ -336,7 +346,13 @@
 		go func() {
 			// Wait for completion
 			if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
-				logger.Errorw(ctx, "failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
+				logger.Errorw(ctx, "failure-updating-device-flows", log.Fields{"logical-device-id": agent.logicalDeviceID, "errors": res})
+				context := make(map[string]string)
+				context["rpc"] = coreutils.GetRPCMetadataFromContext(ctx)
+				context["device-rules"] = deviceRules.String()
+				go agent.ldeviceMgr.SendRPCEvent(ctx,
+					agent.logicalDeviceID, "failed-to-update-device-flows", context, "RPC_ERROR_RAISE_EVENT",
+					voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
 				// TODO: Revert the flow deletion
 				// send event, and allow any queued events to be sent as well
 				agent.ldeviceMgr.SendFlowChangeEvent(ctx, agent.logicalDeviceID, res, flowUpdate.Xid, flowUpdate.FlowMod.Cookie)
@@ -350,7 +366,7 @@
 //flowDeleteStrict deletes a flow from the flow table of that logical device
 func (agent *LogicalAgent) flowDeleteStrict(ctx context.Context, flowUpdate *ofp.FlowTableUpdate) error {
 	mod := flowUpdate.FlowMod
-	logger.Debugw(ctx, "flowDeleteStrict", log.Fields{"mod": mod})
+	logger.Debugw(ctx, "flow-delete-strict", log.Fields{"mod": mod})
 	if mod == nil {
 		return nil
 	}
@@ -359,10 +375,10 @@
 	if err != nil {
 		return err
 	}
-	logger.Debugw(ctx, "flow-id-in-flow-delete-strict", log.Fields{"flowID": flow.Id})
+	logger.Debugw(ctx, "flow-id-in-flow-delete-strict", log.Fields{"flow-id": flow.Id})
 	flowHandle, have := agent.flowLoader.Lock(flow.Id)
 	if !have {
-		logger.Debugw(ctx, "Skipping-flow-delete-strict-request. No-flow-found", log.Fields{"flowMod": mod})
+		logger.Debugw(ctx, "skipping-flow-delete-strict-request-no-flow-found", log.Fields{"flow-mod": mod})
 		return nil
 	}
 	defer flowHandle.Unlock()
@@ -421,6 +437,14 @@
 			// TODO: Revert flow changes
 			// send event, and allow any queued events to be sent as well
 			agent.ldeviceMgr.SendFlowChangeEvent(ctx, agent.logicalDeviceID, res, flowUpdate.Xid, flowUpdate.FlowMod.Cookie)
+			context := make(map[string]string)
+			context["rpc"] = coreutils.GetRPCMetadataFromContext(ctx)
+			context["flow-id"] = string(flow.Id)
+			context["device-rules"] = deviceRules.String()
+			// Create context and send extra information as part of it.
+			go agent.ldeviceMgr.SendRPCEvent(ctx,
+				agent.logicalDeviceID, "failed-to-delete-device-flows", context, "RPC_ERROR_RAISE_EVENT",
+				voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
 		}
 	}()
 
@@ -448,7 +472,7 @@
 }
 
 func (agent *LogicalAgent) deleteFlowsHavingMeter(ctx context.Context, meterID uint32) error {
-	logger.Infow(ctx, "Delete-flows-matching-meter", log.Fields{"meter": meterID})
+	logger.Infow(ctx, "delete-flows-matching-meter", log.Fields{"meter": meterID})
 	for flowID := range agent.flowLoader.ListIDs() {
 		if flowHandle, have := agent.flowLoader.Lock(flowID); have {
 			if flowMeterID := fu.GetMeterIdFromFlow(flowHandle.GetReadOnly()); flowMeterID != 0 && flowMeterID == meterID {
@@ -466,7 +490,7 @@
 }
 
 func (agent *LogicalAgent) deleteFlowsHavingGroup(ctx context.Context, groupID uint32) (map[uint64]*ofp.OfpFlowStats, error) {
-	logger.Infow(ctx, "Delete-flows-matching-group", log.Fields{"groupID": groupID})
+	logger.Infow(ctx, "delete-flows-matching-group", log.Fields{"group-id": groupID})
 	flowsRemoved := make(map[uint64]*ofp.OfpFlowStats)
 	for flowID := range agent.flowLoader.ListIDs() {
 		if flowHandle, have := agent.flowLoader.Lock(flowID); have {