[VOL-5243] Flow audit control

Change-Id: Ia70283da583ea870af078bf78538c1416f5b795c
diff --git a/internal/pkg/controller/addflows.go b/internal/pkg/controller/addflows.go
index 894127b..2f13dbd 100644
--- a/internal/pkg/controller/addflows.go
+++ b/internal/pkg/controller/addflows.go
@@ -93,7 +93,7 @@
 				if err.Error() == ErrDuplicateFlow {
 					dbFlow, _ := aft.device.GetFlow(flow.Cookie)
 					if dbFlow.State == of.FlowAddSuccess {
-						aft.device.triggerFlowNotification(ctx, flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil)
+						aft.device.triggerFlowNotification(ctx, flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil, true)
 						flowsPresent++
 					}
 				}
@@ -108,7 +108,7 @@
 				// aft.device.AddFlowToDb(dbFlow)
 				flowsToProcess[flow.Cookie] = dbFlow
 			}
-			aft.device.triggerFlowNotification(ctx, flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil)
+			aft.device.triggerFlowNotification(ctx, flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil, false)
 		}
 	}
 
@@ -124,7 +124,7 @@
 			for _, flow := range aft.flow.SubFlows {
 				logger.Warnw(ctx, "Skip Flow Update", log.Fields{"Reason": "Port Deleted", "PortName": aft.flow.PortName, "PortNo": aft.flow.PortID, "Cookie": flow.Cookie, "Operation": aft.flow.Command})
 				if aft.flow.Command == of.CommandDel {
-					aft.device.triggerFlowNotification(ctx, flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil)
+					aft.device.triggerFlowNotification(ctx, flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil, true)
 				}
 			}
 			return nil
@@ -160,7 +160,7 @@
 				}
 				break
 			}
-			aft.device.triggerFlowNotification(ctx, flow.FlowMod.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil)
+			aft.device.triggerFlowNotification(ctx, flow.FlowMod.Cookie, aft.flow.Command, of.BwAvailDetails{}, err, true)
 		} else {
 			logger.Errorw(ctx, "Update Flow Table Failed: Voltha Client Unavailable", log.Fields{"Flow": flow})
 		}
diff --git a/internal/pkg/controller/audittables.go b/internal/pkg/controller/audittables.go
index f422d52..d5ce858 100644
--- a/internal/pkg/controller/audittables.go
+++ b/internal/pkg/controller/audittables.go
@@ -267,8 +267,6 @@
 				delete(rcvdFlows, flow.Cookie)
 			}
 			defaultSuccessFlowStatus.Cookie = strconv.FormatUint(flow.Cookie, 10)
-
-			GetController().ProcessFlowModResultIndication(cntx, defaultSuccessFlowStatus)
 		} else {
 			// The flow exists at the controller but not at the device
 			// Push the flow to the device
@@ -315,7 +313,7 @@
 		if _, err = vc.UpdateLogicalDeviceFlowTable(att.ctx, flow); err != nil {
 			logger.Errorw(ctx, "Update Flow Table Failed", log.Fields{"Reason": err.Error()})
 		}
-		att.device.triggerFlowResultNotification(cntx, flow.FlowMod.Cookie, dbFlow, of.CommandAdd, bwConsumedInfo, err)
+		att.device.triggerFlowResultNotification(cntx, flow.FlowMod.Cookie, dbFlow, of.CommandAdd, bwConsumedInfo, err, true)
 	}
 }
 
@@ -336,6 +334,11 @@
 			continue
 		}
 
+		if flag := GetController().IsFlowDelThresholdReached(cntx, strconv.FormatUint(flow.Cookie, 10), att.device.ID); flag {
+			logger.Warnw(ctx, "Flow delete threshold reached, skipping flow delete", log.Fields{"Device": att.device.ID, "Cookie": flow.Cookie})
+			continue
+		}
+
 		logger.Debugw(ctx, "Deleting Flow", log.Fields{"Cookie": flow.Cookie})
 		// Create the flowMod structure and fill it out
 		flowMod := &ofp.OfpFlowMod{}
@@ -362,7 +365,7 @@
 		if _, err = vc.UpdateLogicalDeviceFlowTable(att.ctx, flowUpdate); err != nil {
 			logger.Errorw(ctx, "Flow Audit Delete Failed", log.Fields{"Reason": err.Error()})
 		}
-		att.device.triggerFlowResultNotification(cntx, flow.Cookie, nil, of.CommandDel, of.BwAvailDetails{}, err)
+		att.device.triggerFlowResultNotification(cntx, flow.Cookie, nil, of.CommandDel, of.BwAvailDetails{}, err, true)
 	}
 }
 
@@ -601,13 +604,11 @@
 
 // AddMissingPorts to add the missing ports
 func (att *AuditTablesTask) AddMissingPorts(cntx context.Context, mps map[uint32]*ofp.OfpPort) {
-	logger.Infow(ctx, "Device Audit - Add Missing Ports", log.Fields{"NumPorts": len(mps)})
+	logger.Debugw(ctx, "Device Audit - Add Missing Ports", log.Fields{"NumPorts": len(mps)})
 
 	addMissingPort := func(mp *ofp.OfpPort) {
 		logger.Debugw(ctx, "Process Port Add Ind", log.Fields{"Port No": mp.PortNo, "Port Name": mp.Name})
 
-		// Error is ignored as it only drops duplicate ports
-		logger.Debugw(ctx, "Calling AddPort", log.Fields{"No": mp.PortNo, "Name": mp.Name})
 		if err := att.device.AddPort(cntx, mp); err != nil {
 			logger.Warnw(ctx, "AddPort Failed", log.Fields{"No": mp.PortNo, "Name": mp.Name, "Reason": err})
 		}
diff --git a/internal/pkg/controller/controller.go b/internal/pkg/controller/controller.go
index c31d892..76e4558 100644
--- a/internal/pkg/controller/controller.go
+++ b/internal/pkg/controller/controller.go
@@ -72,7 +72,9 @@
 	rebootInProgressDevices map[string]string
 	deviceLock              sync.RWMutex
 	rebootLock              sync.Mutex
-	deviceTableSyncDuration time.Duration
+	deviceTableSyncDuration time.Duration // Time interval between each cycle of audit task
+	maxFlowRetryDuration    time.Duration // Maximum duration for which flows will be retried upon failures
+	maxFlowRetryAttempts    uint32        // maxFlowRetryAttempts = maxFlowRetryDuration / deviceTableSyncDuration
 	RebootFlow              bool
 }
 
@@ -100,11 +102,33 @@
 	v.deviceTableSyncDuration = time.Duration(duration) * time.Second
 }
 
+// SetMaxFlowRetryDuration - sets the max duration, in seconds, for which flows are retried upon failures
+func (v *VoltController) SetMaxFlowRetryDuration(duration int) {
+	v.maxFlowRetryDuration = time.Duration(duration) * time.Second
+}
+
+// SetMaxFlowRetryAttempts - derives max flow retry attempts as maxFlowRetryDuration / deviceTableSyncDuration.
+// Both durations must be set beforehand; a non-positive value for either yields zero attempts.
+func (v *VoltController) SetMaxFlowRetryAttempts() {
+	// An integer division by zero panics at run time, and a negative quotient
+	// would wrap around when converted to uint32, so reject both up front.
+	if v.deviceTableSyncDuration <= 0 || v.maxFlowRetryDuration <= 0 {
+		v.maxFlowRetryAttempts = 0
+		return
+	}
+	v.maxFlowRetryAttempts = uint32(v.maxFlowRetryDuration / v.deviceTableSyncDuration)
+}
+
 // GetDeviceTableSyncDuration - returns configured device table sync duration
 func (v *VoltController) GetDeviceTableSyncDuration() time.Duration {
 	return v.deviceTableSyncDuration
 }
 
+// GetMaxFlowRetryAttempt - returns max flow retry attempts
+func (v *VoltController) GetMaxFlowRetryAttempt() uint32 {
+	return v.maxFlowRetryAttempts
+}
+
 // AddDevice to add device
 func (v *VoltController) AddDevice(cntx context.Context, config *intf.VPClientCfg) intf.IVPClient {
 	d := NewDevice(cntx, config.DeviceID, config.SerialNum, config.VolthaClient, config.SouthBoundID, config.MfrDesc, config.HwDesc, config.SwDesc)
@@ -249,6 +273,11 @@
 	v.app.ProcessFlowModResultIndication(cntx, flowStatus)
 }
 
+// IsFlowDelThresholdReached - asks the app whether the delete attempts for the given flow cookie on the device have reached the threshold
+func (v *VoltController) IsFlowDelThresholdReached(cntx context.Context, cookie string, device string) bool {
+	return v.app.IsFlowDelThresholdReached(cntx, cookie, device)
+}
+
 // AddVPAgent to add the vpagent
 func (v *VoltController) AddVPAgent(vep string, vpa *vpagent.VPAgent) {
 	v.vagent[vep] = vpa
diff --git a/internal/pkg/controller/device.go b/internal/pkg/controller/device.go
index 618afae..bdac105 100644
--- a/internal/pkg/controller/device.go
+++ b/internal/pkg/controller/device.go
@@ -1059,12 +1059,12 @@
 	return false
 }
 
-func (d *Device) triggerFlowNotification(cntx context.Context, cookie uint64, oper of.Command, bwDetails of.BwAvailDetails, err error) {
+func (d *Device) triggerFlowNotification(cntx context.Context, cookie uint64, oper of.Command, bwDetails of.BwAvailDetails, err error, sendFlowNotif bool) {
 	flow, _ := d.GetFlow(cookie)
-	d.triggerFlowResultNotification(cntx, cookie, flow, oper, bwDetails, err)
+	d.triggerFlowResultNotification(cntx, cookie, flow, oper, bwDetails, err, sendFlowNotif) // sendFlowNotif is passed through; the callee only calls ProcessFlowModResultIndication when it is true
 }
 
-func (d *Device) triggerFlowResultNotification(cntx context.Context, cookie uint64, flow *of.VoltSubFlow, oper of.Command, bwDetails of.BwAvailDetails, err error) {
+func (d *Device) triggerFlowResultNotification(cntx context.Context, cookie uint64, flow *of.VoltSubFlow, oper of.Command, bwDetails of.BwAvailDetails, err error, sendFlowNotif bool) {
 	statusCode, statusMsg := infraerror.GetErrorInfo(err)
 	success := isFlowOperSuccess(statusCode, oper)
 
@@ -1108,6 +1108,8 @@
 		AdditionalData: bwDetails,
 	}
 
-	logger.Debugw(ctx, "Sending Flow Notification", log.Fields{"Cookie": cookie, "Error Code": statusCode, "FlowOp": oper})
-	GetController().ProcessFlowModResultIndication(cntx, flowResult)
+	if sendFlowNotif {
+		logger.Debugw(ctx, "Sending Flow Notification", log.Fields{"Cookie": cookie, "Error Code": statusCode, "FlowOp": oper})
+		GetController().ProcessFlowModResultIndication(cntx, flowResult)
+	}
 }
diff --git a/internal/pkg/controller/device_test.go b/internal/pkg/controller/device_test.go
index 8f35aef..7089c64 100644
--- a/internal/pkg/controller/device_test.go
+++ b/internal/pkg/controller/device_test.go
@@ -170,7 +170,7 @@
 			db = dbintf
 			dbintf.EXPECT().PutFlow(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1)
 			appMock.EXPECT().ProcessFlowModResultIndication(gomock.Any(), gomock.Any()).Times(1)
-			d.triggerFlowResultNotification(tt.args.cntx, tt.args.cookie, tt.args.flow, tt.args.oper, tt.args.bwDetails, tt.args.err)
+			d.triggerFlowResultNotification(tt.args.cntx, tt.args.cookie, tt.args.flow, tt.args.oper, tt.args.bwDetails, tt.args.err, true) // true: the Times(1) expectation above needs the notification to be sent
 		})
 	}
 }