VOL-3501 Code changes to support RPC events
Change-Id: I2536c0c03faa5fb026349c906ebef46323398e9a
diff --git a/rw_core/core/device/logical_agent_flow.go b/rw_core/core/device/logical_agent_flow.go
index 6b8abef..2b88abc 100644
--- a/rw_core/core/device/logical_agent_flow.go
+++ b/rw_core/core/device/logical_agent_flow.go
@@ -21,6 +21,7 @@
"errors"
"fmt"
"strconv"
+ "time"
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/rw_core/route"
@@ -48,7 +49,7 @@
//updateFlowTable updates the flow table of that logical device
func (agent *LogicalAgent) updateFlowTable(ctx context.Context, flow *ofp.FlowTableUpdate) error {
- logger.Debug(ctx, "UpdateFlowTable")
+ logger.Debug(ctx, "update-flow-table")
if flow == nil {
return nil
}
@@ -72,19 +73,19 @@
//flowAdd adds a flow to the flow table of that logical device
func (agent *LogicalAgent) flowAdd(ctx context.Context, flowUpdate *ofp.FlowTableUpdate) error {
mod := flowUpdate.FlowMod
- logger.Debugw(ctx, "flowAdd", log.Fields{"flow": mod})
+ logger.Debugw(ctx, "flow-add", log.Fields{"flow": mod})
if mod == nil {
return nil
}
flow, err := fu.FlowStatsEntryFromFlowModMessage(mod)
if err != nil {
- logger.Errorw(ctx, "flowAdd-failed", log.Fields{"flowMod": mod, "err": err})
+ logger.Errorw(ctx, "flow-add-failed", log.Fields{"flow-mod": mod, "err": err})
return err
}
var updated bool
var changed bool
if changed, updated, err = agent.decomposeAndAdd(ctx, flow, flowUpdate); err != nil {
- logger.Errorw(ctx, "flow-decompose-and-add-failed ", log.Fields{"flowMod": mod, "err": err})
+ logger.Errorw(ctx, "flow-decompose-and-add-failed ", log.Fields{"flow-mod": mod, "err": err})
return err
}
if changed && !updated {
@@ -115,7 +116,7 @@
// TODO: this currently does nothing
if overlapped := fu.FindOverlappingFlows(flows, mod); len(overlapped) != 0 {
// TODO: should this error be notified other than being logged?
- logger.Warnw(ctx, "overlapped-flows", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
+ logger.Warnw(ctx, "overlapped-flows", log.Fields{"logical-device-id": agent.logicalDeviceID})
} else {
// Add flow
changed = true
@@ -141,7 +142,7 @@
flowMeterConfig, err := agent.GetMeterConfig(ctx, updatedFlows)
if err != nil {
- logger.Error(ctx, "Meter-referred-in-flow-not-present")
+ logger.Error(ctx, "meter-referred-in-flow-not-present")
return changed, updated, err
}
@@ -178,8 +179,10 @@
"flow": flow,
"groups": groups,
})
+ subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
+
// Revert added flows
- if err := agent.revertAddedFlows(log.WithSpanFromContext(context.Background(), ctx), mod, flow, flowToReplace, deviceRules, toMetadata(flowMeterConfig)); err != nil {
+ if err := agent.revertAddedFlows(subCtx, mod, flow, flowToReplace, deviceRules, toMetadata(flowMeterConfig)); err != nil {
logger.Errorw(ctx, "failure-to-delete-flow-after-failed-addition", log.Fields{
"error": err,
"logical-device-id": agent.logicalDeviceID,
@@ -189,6 +192,13 @@
}
// send event
agent.ldeviceMgr.SendFlowChangeEvent(ctx, agent.logicalDeviceID, res, flowUpdate.Xid, flowUpdate.FlowMod.Cookie)
+ context := make(map[string]string)
+ context["rpc"] = coreutils.GetRPCMetadataFromContext(ctx)
+ context["flow-id"] = string(flow.Id)
+ context["device-rules"] = deviceRules.String()
+ go agent.ldeviceMgr.SendRPCEvent(ctx,
+ agent.logicalDeviceID, "failed-to-add-flow", context, "RPC_ERROR_RAISE_EVENT",
+ voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
}
}()
}
@@ -198,7 +208,7 @@
// revertAddedFlows reverts flows after the flowAdd request has failed. All flows corresponding to that flowAdd request
// will be reverted, both from the logical devices and the devices.
func (agent *LogicalAgent) revertAddedFlows(ctx context.Context, mod *ofp.OfpFlowMod, addedFlow *ofp.OfpFlowStats, replacedFlow *ofp.OfpFlowStats, deviceRules *fu.DeviceRules, metadata *voltha.FlowMetadata) error {
- logger.Debugw(ctx, "revertFlowAdd", log.Fields{"added-flow": addedFlow, "replaced-flow": replacedFlow, "device-rules": deviceRules, "metadata": metadata})
+ logger.Debugw(ctx, "revert-flow-add", log.Fields{"added-flow": addedFlow, "replaced-flow": replacedFlow, "device-rules": deviceRules, "metadata": metadata})
flowHandle, have := agent.flowLoader.Lock(addedFlow.Id)
if !have {
@@ -243,7 +253,7 @@
//flowDelete deletes a flow from the flow table of that logical device
func (agent *LogicalAgent) flowDelete(ctx context.Context, flowUpdate *ofp.FlowTableUpdate) error {
- logger.Debug(ctx, "flowDelete")
+ logger.Debug(ctx, "flow-delete")
mod := flowUpdate.FlowMod
if mod == nil {
return nil
@@ -274,7 +284,7 @@
//Delete the matched flows
if len(toDelete) > 0 {
- logger.Debugw(ctx, "flowDelete", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "toDelete": len(toDelete)})
+ logger.Debugw(ctx, "flow-delete", log.Fields{"logical-device-id": agent.logicalDeviceID, "to-delete": len(toDelete)})
for _, flow := range toDelete {
if flowHandle, have := agent.flowLoader.Lock(flow.Id); have {
@@ -299,7 +309,7 @@
metersConfig, err := agent.GetMeterConfig(ctx, toDelete)
if err != nil { // This should never happen
- logger.Error(ctx, "Meter-referred-in-flows-not-present")
+ logger.Error(ctx, "meter-referred-in-flows-not-present")
return err
}
@@ -336,7 +346,13 @@
go func() {
// Wait for completion
if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
- logger.Errorw(ctx, "failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
+ logger.Errorw(ctx, "failure-updating-device-flows", log.Fields{"logical-device-id": agent.logicalDeviceID, "errors": res})
+ context := make(map[string]string)
+ context["rpc"] = coreutils.GetRPCMetadataFromContext(ctx)
+ context["device-rules"] = deviceRules.String()
+ go agent.ldeviceMgr.SendRPCEvent(ctx,
+ agent.logicalDeviceID, "failed-to-update-device-flows", context, "RPC_ERROR_RAISE_EVENT",
+ voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
// TODO: Revert the flow deletion
// send event, and allow any queued events to be sent as well
agent.ldeviceMgr.SendFlowChangeEvent(ctx, agent.logicalDeviceID, res, flowUpdate.Xid, flowUpdate.FlowMod.Cookie)
@@ -350,7 +366,7 @@
//flowDeleteStrict deletes a flow from the flow table of that logical device
func (agent *LogicalAgent) flowDeleteStrict(ctx context.Context, flowUpdate *ofp.FlowTableUpdate) error {
mod := flowUpdate.FlowMod
- logger.Debugw(ctx, "flowDeleteStrict", log.Fields{"mod": mod})
+ logger.Debugw(ctx, "flow-delete-strict", log.Fields{"mod": mod})
if mod == nil {
return nil
}
@@ -359,10 +375,10 @@
if err != nil {
return err
}
- logger.Debugw(ctx, "flow-id-in-flow-delete-strict", log.Fields{"flowID": flow.Id})
+ logger.Debugw(ctx, "flow-id-in-flow-delete-strict", log.Fields{"flow-id": flow.Id})
flowHandle, have := agent.flowLoader.Lock(flow.Id)
if !have {
- logger.Debugw(ctx, "Skipping-flow-delete-strict-request. No-flow-found", log.Fields{"flowMod": mod})
+ logger.Debugw(ctx, "skipping-flow-delete-strict-request-no-flow-found", log.Fields{"flow-mod": mod})
return nil
}
defer flowHandle.Unlock()
@@ -421,6 +437,14 @@
// TODO: Revert flow changes
// send event, and allow any queued events to be sent as well
agent.ldeviceMgr.SendFlowChangeEvent(ctx, agent.logicalDeviceID, res, flowUpdate.Xid, flowUpdate.FlowMod.Cookie)
+ context := make(map[string]string)
+ context["rpc"] = coreutils.GetRPCMetadataFromContext(ctx)
+ context["flow-id"] = string(flow.Id)
+ context["device-rules"] = deviceRules.String()
+ // Send an RPC event carrying the extra information collected in the context map above.
+ go agent.ldeviceMgr.SendRPCEvent(ctx,
+ agent.logicalDeviceID, "failed-to-delete-device-flows", context, "RPC_ERROR_RAISE_EVENT",
+ voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
}
}()
@@ -448,7 +472,7 @@
}
func (agent *LogicalAgent) deleteFlowsHavingMeter(ctx context.Context, meterID uint32) error {
- logger.Infow(ctx, "Delete-flows-matching-meter", log.Fields{"meter": meterID})
+ logger.Infow(ctx, "delete-flows-matching-meter", log.Fields{"meter": meterID})
for flowID := range agent.flowLoader.ListIDs() {
if flowHandle, have := agent.flowLoader.Lock(flowID); have {
if flowMeterID := fu.GetMeterIdFromFlow(flowHandle.GetReadOnly()); flowMeterID != 0 && flowMeterID == meterID {
@@ -466,7 +490,7 @@
}
func (agent *LogicalAgent) deleteFlowsHavingGroup(ctx context.Context, groupID uint32) (map[uint64]*ofp.OfpFlowStats, error) {
- logger.Infow(ctx, "Delete-flows-matching-group", log.Fields{"groupID": groupID})
+ logger.Infow(ctx, "delete-flows-matching-group", log.Fields{"group-id": groupID})
flowsRemoved := make(map[uint64]*ofp.OfpFlowStats)
for flowID := range agent.flowLoader.ListIDs() {
if flowHandle, have := agent.flowLoader.Lock(flowID); have {