[VOL-5243] Flow audit control

Change-Id: Ia70283da583ea870af078bf78538c1416f5b795c
diff --git a/internal/pkg/application/application.go b/internal/pkg/application/application.go
index 51c13ea..371cd84 100644
--- a/internal/pkg/application/application.go
+++ b/internal/pkg/application/application.go
@@ -1767,6 +1767,35 @@
 	}
 }
 
+// IsFlowDelThresholdReached - check if the attempts for flow delete has reached threshold or not
+func (va *VoltApplication) IsFlowDelThresholdReached(cntx context.Context, cookie string, device string) bool {
+	logger.Debugw(ctx, "Check flow delete threshold", log.Fields{"Cookie": cookie, "Device": device})
+	d := va.GetDevice(device)
+	if d == nil {
+		logger.Warnw(ctx, "Failed to get device during flow delete threshold check", log.Fields{"Cookie": cookie, "Device": device})
+		return false
+	}
+
+	flowEventMap, err := d.GetFlowEventRegister(of.CommandDel)
+	if err != nil {
+		logger.Warnw(ctx, "Flow event map does not exists", log.Fields{"flowMod": of.CommandDel, "Error": err})
+		return false
+	}
+	flowEventMap.MapLock.Lock()
+	var event interface{}
+	if event, _ = flowEventMap.Get(cookie); event == nil {
+		logger.Warnw(ctx, "Event does not exist during flow delete threshold check", log.Fields{"Cookie": cookie})
+		flowEventMap.MapLock.Unlock()
+		return false
+	}
+	flowEventMap.MapLock.Unlock()
+	flowEvent := event.(*FlowEvent)
+	vs := flowEvent.eventData.(*VoltService)
+	vs.ServiceLock.RLock()
+	defer vs.ServiceLock.RUnlock()
+	return vs.FlowPushCount[cookie] == controller.GetController().GetMaxFlowRetryAttempt()
+}
+
 func pushFlowFailureNotif(flowStatus intf.FlowStatus) {
 	subFlow := flowStatus.Flow
 	cookie := subFlow.Cookie
diff --git a/internal/pkg/application/flowevent.go b/internal/pkg/application/flowevent.go
index 577cdca..63123da 100644
--- a/internal/pkg/application/flowevent.go
+++ b/internal/pkg/application/flowevent.go
@@ -19,6 +19,7 @@
 	"context"
 
 	infraerrorcode "voltha-go-controller/internal/pkg/errorcodes/service"
+	"voltha-go-controller/internal/pkg/util"
 
 	"voltha-go-controller/internal/pkg/intf"
 	"voltha-go-controller/log"
@@ -31,7 +32,7 @@
 type FlowEventType string
 
 // FlowEventHandler - Func prototype for flow event handling funcs
-type FlowEventHandler func(context.Context, *FlowEvent, intf.FlowStatus)
+type FlowEventHandler func(context.Context, *FlowEvent, intf.FlowStatus, *util.ConcurrentMap)
 
 var eventMapper map[FlowEventType]FlowEventHandler
 
@@ -92,15 +93,19 @@
 		flowEventMap.MapLock.Unlock()
 		return false
 	}
-	flowEventMap.Remove(cookie)
 	flowEventMap.MapLock.Unlock()
 	flowEvent := event.(*FlowEvent)
-	eventMapper[flowEvent.eType](cntx, flowEvent, flowStatus)
+	if flowEvent.eType != EventTypeServiceFlowAdded && flowEvent.eType != EventTypeServiceFlowRemoved {
+		flowEventMap.MapLock.Lock()
+		flowEventMap.Remove(cookie)
+		flowEventMap.MapLock.Unlock()
+	}
+	eventMapper[flowEvent.eType](cntx, flowEvent, flowStatus, flowEventMap)
 	return true
 }
 
 // ProcessUsIgmpFlowAddEvent - Process Us Igmp Flow event trigger
-func ProcessUsIgmpFlowAddEvent(cntx context.Context, event *FlowEvent, flowStatus intf.FlowStatus) {
+func ProcessUsIgmpFlowAddEvent(cntx context.Context, event *FlowEvent, flowStatus intf.FlowStatus, flowEventMap *util.ConcurrentMap) {
 	logger.Infow(ctx, "Processing Post Flow Add Event for US Igmp", log.Fields{"Cookie": event.cookie, "event": event})
 	vpv := event.eventData.(*VoltPortVnet)
 	if isFlowStatusSuccess(flowStatus.Status, true) {
@@ -111,18 +116,18 @@
 }
 
 // ProcessServiceFlowAddEvent - Process Service Flow event trigger
-func ProcessServiceFlowAddEvent(cntx context.Context, event *FlowEvent, flowStatus intf.FlowStatus) {
+func ProcessServiceFlowAddEvent(cntx context.Context, event *FlowEvent, flowStatus intf.FlowStatus, flowEventMap *util.ConcurrentMap) {
 	logger.Infow(ctx, "Processing Post Flow Add Event for Service", log.Fields{"Cookie": event.cookie, "event": event})
 	vs := event.eventData.(*VoltService)
 	if isFlowStatusSuccess(flowStatus.Status, true) {
-		vs.FlowInstallSuccess(cntx, event.cookie, flowStatus.AdditionalData)
+		vs.FlowInstallSuccess(cntx, event.cookie, flowStatus.AdditionalData, flowEventMap)
 	} else {
-		vs.FlowInstallFailure(event.cookie, flowStatus.Status, flowStatus.Reason)
+		vs.FlowInstallFailure(cntx, event.cookie, flowStatus.Status, flowStatus.Reason, flowEventMap)
 	}
 }
 
 // ProcessControlFlowAddEvent - Process Control Flow event trigger
-func ProcessControlFlowAddEvent(cntx context.Context, event *FlowEvent, flowStatus intf.FlowStatus) {
+func ProcessControlFlowAddEvent(cntx context.Context, event *FlowEvent, flowStatus intf.FlowStatus, flowEventMap *util.ConcurrentMap) {
 	logger.Infow(ctx, "Processing Post Flow Add Event for VPV", log.Fields{"Cookie": event.cookie, "event": event})
 	vpv := event.eventData.(*VoltPortVnet)
 	if !isFlowStatusSuccess(flowStatus.Status, true) {
@@ -131,18 +136,18 @@
 }
 
 // ProcessServiceFlowDelEvent - Process Service Flow event trigger
-func ProcessServiceFlowDelEvent(cntx context.Context, event *FlowEvent, flowStatus intf.FlowStatus) {
+func ProcessServiceFlowDelEvent(cntx context.Context, event *FlowEvent, flowStatus intf.FlowStatus, flowEventMap *util.ConcurrentMap) {
 	logger.Infow(ctx, "Processing Post Flow Remove Event for Service", log.Fields{"Cookie": event.cookie, "event": event})
 	vs := event.eventData.(*VoltService)
 	if isFlowStatusSuccess(flowStatus.Status, false) {
-		vs.FlowRemoveSuccess(cntx, event.cookie)
+		vs.FlowRemoveSuccess(cntx, event.cookie, flowEventMap)
 	} else {
-		vs.FlowRemoveFailure(cntx, event.cookie, flowStatus.Status, flowStatus.Reason)
+		vs.FlowRemoveFailure(cntx, event.cookie, flowStatus.Status, flowStatus.Reason, flowEventMap)
 	}
 }
 
 // ProcessControlFlowDelEvent - Process Control Flow event trigger
-func ProcessControlFlowDelEvent(cntx context.Context, event *FlowEvent, flowStatus intf.FlowStatus) {
+func ProcessControlFlowDelEvent(cntx context.Context, event *FlowEvent, flowStatus intf.FlowStatus, flowEventMap *util.ConcurrentMap) {
 	logger.Infow(ctx, "Processing Post Flow Remove Event for VPV", log.Fields{"Cookie": event.cookie, "event": event})
 	vpv := event.eventData.(*VoltPortVnet)
 	if isFlowStatusSuccess(flowStatus.Status, false) {
@@ -153,7 +158,7 @@
 }
 
 // ProcessMcastFlowDelEvent - Process Control Flow event trigger
-func ProcessMcastFlowDelEvent(cntx context.Context, event *FlowEvent, flowStatus intf.FlowStatus) {
+func ProcessMcastFlowDelEvent(cntx context.Context, event *FlowEvent, flowStatus intf.FlowStatus, flowEventMap *util.ConcurrentMap) {
 	logger.Infow(ctx, "Processing Post Flow Remove Event for Mcast/Igmp", log.Fields{"Cookie": event.cookie, "event": event})
 	mvp := event.eventData.(*MvlanProfile)
 	if isFlowStatusSuccess(flowStatus.Status, false) {
@@ -164,7 +169,7 @@
 }
 
 // ProcessDeviceFlowDelEvent - Process Control Flow event trigger
-func ProcessDeviceFlowDelEvent(cntx context.Context, event *FlowEvent, flowStatus intf.FlowStatus) {
+func ProcessDeviceFlowDelEvent(cntx context.Context, event *FlowEvent, flowStatus intf.FlowStatus, flowEventMap *util.ConcurrentMap) {
 	logger.Debugw(ctx, "Processing Post Flow Remove Event for VNET", log.Fields{"Cookie": event.cookie, "event": event})
 	vnet := event.eventData.(*VoltVnet)
 	if isFlowStatusSuccess(flowStatus.Status, false) {
diff --git a/internal/pkg/application/flowevent_test.go b/internal/pkg/application/flowevent_test.go
index b6b8738..6bd5da2 100644
--- a/internal/pkg/application/flowevent_test.go
+++ b/internal/pkg/application/flowevent_test.go
@@ -150,16 +150,24 @@
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			ProcessUsIgmpFlowAddEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus)
+			ProcessUsIgmpFlowAddEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus, nil)
 		})
 	}
 }
 
 func TestProcessServiceFlowAddEvent(t *testing.T) {
 	type args struct {
-		cntx       context.Context
-		event      *FlowEvent
-		flowStatus intf.FlowStatus
+		cntx         context.Context
+		event        *FlowEvent
+		flowStatus   intf.FlowStatus
+		flowEventMap *util.ConcurrentMap
+	}
+
+	flowPushCountMap := make(map[string]uint32)
+	vs := &VoltService{
+		VoltServiceCfg: VoltServiceCfg{
+			FlowPushCount: flowPushCountMap,
+		},
 	}
 
 	tests := []struct {
@@ -172,8 +180,9 @@
 				cntx: context.Background(),
 				event: &FlowEvent{
 					device:    "test_device",
-					eventData: voltService,
+					eventData: vs,
 				},
+				flowEventMap: util.NewConcurrentMap(),
 			},
 		},
 		{
@@ -182,17 +191,18 @@
 				cntx: context.Background(),
 				event: &FlowEvent{
 					device:    "test_device",
-					eventData: voltService,
+					eventData: vs,
 				},
 				flowStatus: intf.FlowStatus{
 					Status: uint32(1001),
 				},
+				flowEventMap: util.NewConcurrentMap(),
 			},
 		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			ProcessServiceFlowAddEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus)
+			ProcessServiceFlowAddEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus, tt.args.flowEventMap)
 		})
 	}
 }
@@ -231,17 +241,26 @@
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			ProcessControlFlowAddEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus)
+			ProcessControlFlowAddEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus, nil)
 		})
 	}
 }
 
 func TestProcessServiceFlowDelEvent(t *testing.T) {
 	type args struct {
-		cntx       context.Context
-		event      *FlowEvent
-		flowStatus intf.FlowStatus
+		cntx         context.Context
+		event        *FlowEvent
+		flowStatus   intf.FlowStatus
+		flowEventMap *util.ConcurrentMap
 	}
+
+	flowPushCountMap := make(map[string]uint32)
+	vs := &VoltService{
+		VoltServiceCfg: VoltServiceCfg{
+			FlowPushCount: flowPushCountMap,
+		},
+	}
+
 	tests := []struct {
 		name string
 		args args
@@ -251,8 +270,9 @@
 			args: args{
 				cntx: context.Background(),
 				event: &FlowEvent{
-					eventData: voltService,
+					eventData: vs,
 				},
+				flowEventMap: util.NewConcurrentMap(),
 			},
 		},
 		{
@@ -260,11 +280,12 @@
 			args: args{
 				cntx: context.Background(),
 				event: &FlowEvent{
-					eventData: voltService,
+					eventData: vs,
 				},
 				flowStatus: intf.FlowStatus{
 					Status: uint32(1001),
 				},
+				flowEventMap: util.NewConcurrentMap(),
 			},
 		},
 	}
@@ -273,7 +294,7 @@
 			dbintf := mocks.NewMockDBIntf(gomock.NewController(t))
 			db = dbintf
 			dbintf.EXPECT().PutService(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1)
-			ProcessServiceFlowDelEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus)
+			ProcessServiceFlowDelEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus, tt.args.flowEventMap)
 		})
 	}
 }
@@ -315,7 +336,7 @@
 			dbintf := mocks.NewMockDBIntf(gomock.NewController(t))
 			db = dbintf
 			dbintf.EXPECT().PutVpv(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1)
-			ProcessControlFlowDelEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus)
+			ProcessControlFlowDelEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus, nil)
 		})
 	}
 }
@@ -360,7 +381,7 @@
 			dbintf := mocks.NewMockDBIntf(gomock.NewController(t))
 			db = dbintf
 			dbintf.EXPECT().PutMvlan(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1)
-			ProcessMcastFlowDelEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus)
+			ProcessMcastFlowDelEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus, nil)
 		})
 	}
 }
@@ -410,9 +431,9 @@
 				dbintf := mocks.NewMockDBIntf(gomock.NewController(t))
 				db = dbintf
 				dbintf.EXPECT().PutVnet(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(nil).AnyTimes()
-				ProcessDeviceFlowDelEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus)
+				ProcessDeviceFlowDelEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus, nil)
 			case "ProcessDeviceFlowDelEvent_else_condition":
-				ProcessDeviceFlowDelEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus)
+				ProcessDeviceFlowDelEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus, nil)
 			}
 		})
 	}
diff --git a/internal/pkg/application/service.go b/internal/pkg/application/service.go
index d93c8f4..38d41c2 100644
--- a/internal/pkg/application/service.go
+++ b/internal/pkg/application/service.go
@@ -117,6 +117,8 @@
 	AllowTransparent           bool
 	EnableMulticastKPI         bool
 	IsActivated                bool
+	FlowPushCount              map[string]uint32   // Tracks the number of flow install/delete failure attempts per cookie in order to throttle flow auditing
+	ServiceDeactivateReason    SvcDeactivateReason // Mentions why the service was deactivated
 }
 
 // VoltServiceOper structure
@@ -164,6 +166,18 @@
 	ServiceVlanUpdate ServiceTrigger = 1
 )
 
+// SvcDeactivateReason - Reason for service deactivation
+type SvcDeactivateReason uint8
+
+const (
+	// Service deactivated reason - none
+	SvcDeacRsn_None SvcDeactivateReason = 0
+	// Service deactivate reason - NB
+	SvcDeacRsn_NB SvcDeactivateReason = 1
+	// Service deactivate reason - Controller
+	SvcDeacRsn_Controller SvcDeactivateReason = 2
+)
+
 // AppMutexes structure
 type AppMutexes struct {
 	ServiceDataMutex sync.Mutex `json:"-"`
@@ -187,13 +201,16 @@
 	vs.DsHSIAFlowsApplied = false
 	vs.DeleteInProgress = false
 	vs.DeactivateInProgress = false
+	vs.ServiceDeactivateReason = SvcDeacRsn_None
 	//vs.MacAddr, _ = net.ParseMAC("00:00:00:00:00:00")
+
 	vs.IsOption82Enabled = cfg.IsOption82Enabled
 	vs.MacAddr = cfg.MacAddr
 	vs.Ipv4Addr = net.ParseIP("0.0.0.0")
 	vs.Ipv6Addr = net.ParseIP("::")
 	vs.PendingFlows = make(map[string]bool)
 	vs.AssociatedFlows = make(map[string]bool)
+	vs.FlowPushCount = make(map[string]uint32)
 	return &vs
 }
 
@@ -1037,6 +1054,8 @@
 		vs.DeactivateInProgress = oper.DeactivateInProgress
 		vs.BwAvailInfo = oper.BwAvailInfo
 		vs.Device = oper.Device
+		vs.FlowPushCount = cfg.FlowPushCount
+		vs.ServiceDeactivateReason = cfg.ServiceDeactivateReason
 	} else {
 		// Sorting Pbit from highest
 		sort.Slice(vs.Pbits, func(i, j int) bool {
@@ -1274,8 +1293,12 @@
 
 // FlowInstallSuccess - Called when corresponding service flow installation is success
 // If no more pending flows, HSIA indication wil be triggered
-func (vs *VoltService) FlowInstallSuccess(cntx context.Context, cookie string, bwAvailInfo of.BwAvailDetails) {
+func (vs *VoltService) FlowInstallSuccess(cntx context.Context, cookie string, bwAvailInfo of.BwAvailDetails, flowEventMap *util.ConcurrentMap) {
 	logger.Debugw(ctx, "Flow Add Success Notification", log.Fields{"Cookie": cookie, "bwAvailInfo": bwAvailInfo, "Service": vs.Name})
+	flowEventMap.MapLock.Lock()
+	flowEventMap.Remove(cookie)
+	flowEventMap.MapLock.Unlock()
+
 	if vs.DeleteInProgress {
 		logger.Warnw(ctx, "Skipping Flow Add Success Notification. Service deletion in-progress", log.Fields{"Cookie": cookie, "Service": vs.Name})
 		return
@@ -1290,6 +1313,7 @@
 
 	delete(vs.PendingFlows, cookie)
 	vs.AssociatedFlows[cookie] = true
+	vs.FlowPushCount[cookie] = 0
 	vs.ServiceLock.Unlock()
 	var prevBwAvail, presentBwAvail string
 	if bwAvailInfo.PrevBw != "" && bwAvailInfo.PresentBw != "" {
@@ -1322,10 +1346,19 @@
 
 // FlowInstallFailure - Called when corresponding service flow installation is failed
 // Trigger service failure indication to NB
-func (vs *VoltService) FlowInstallFailure(cookie string, errorCode uint32, errReason string) {
-	vs.ServiceLock.RLock()
-
+func (vs *VoltService) FlowInstallFailure(cntx context.Context, cookie string, errorCode uint32, errReason string, flowEventMap *util.ConcurrentMap) {
 	logger.Debugw(ctx, "Service flow installation failure", log.Fields{"Service": vs.Name, "Cookie": cookie, "errorCode": errorCode, "errReason": errReason})
+
+	isServiceDeactivated := vs.CheckAndDeactivateService(cntx, cookie)
+	if isServiceDeactivated {
+		flowEventMap.MapLock.Lock()
+		for ck := range vs.PendingFlows {
+			flowEventMap.Remove(ck)
+		}
+		flowEventMap.MapLock.Unlock()
+	}
+
+	vs.ServiceLock.RLock()
 	if _, ok := vs.PendingFlows[cookie]; !ok {
 		logger.Errorw(ctx, "Flow Add Failure for unknown Cookie", log.Fields{"Service": vs.Name, "Cookie": cookie})
 		vs.ServiceLock.RUnlock()
@@ -1369,13 +1402,54 @@
 	}
 }
 
+// CheckAndDeactivateService - deactivate service and remove flows from DB, if the max flows retry attempt has reached
+func (vs *VoltService) CheckAndDeactivateService(cntx context.Context, cookie string) bool {
+	vs.ServiceLock.Lock()
+	logger.Debugw(ctx, "Check and Deactivate service if flow install threshold is reached and remove flows from DB/Device", log.Fields{"serviceName": vs.Name, "FlowPushCount": vs.FlowPushCount[cookie]})
+	vs.FlowPushCount[cookie]++
+	if vs.FlowPushCount[cookie] == controller.GetController().GetMaxFlowRetryAttempt() {
+		if vs.IsActivated {
+			vs.ServiceLock.Unlock()
+			device, err := GetApplication().GetDeviceFromPort(vs.Port)
+			if err != nil {
+				// Even if the port/device does not exists at this point in time, the deactivate request is succss.
+				// So no error is returned
+				logger.Warnw(ctx, "Error Getting Device", log.Fields{"Reason": err.Error(), "Port": vs.Port})
+				return false
+			}
+			vs.SetSvcDeactivationFlags(SvcDeacRsn_Controller)
+			GetApplication().ServiceByName.Store(vs.Name, vs)
+			p := device.GetPort(vs.Port)
+			if p != nil && (p.State == PortStateUp) {
+				if vpv := GetApplication().GetVnetByPort(vs.Port, vs.SVlan, vs.CVlan, vs.UniVlan); vpv != nil {
+					// Port down call internally deletes all the flows
+					vpv.PortDownInd(cntx, vs.Device, vs.Port, true, true)
+				} else {
+					logger.Warnw(ctx, "VPV does not exists!!!", log.Fields{"Device": vs.Device, "port": vs.Port, "SvcName": vs.Name})
+				}
+			}
+			vs.DeactivateInProgress = false
+			GetApplication().ServiceByName.Store(vs.Name, vs)
+			vs.WriteToDb(cntx)
+			logger.Infow(ctx, "Service deactivated after max flow install attempts", log.Fields{"SvcName": vs.Name, "Cookie": cookie})
+			return true
+		}
+	}
+	vs.ServiceLock.Unlock()
+	return false
+}
+
 // FlowRemoveSuccess - Called when corresponding service flow removal is success
 // If no more associated flows, DelHSIA indication wil be triggered
-func (vs *VoltService) FlowRemoveSuccess(cntx context.Context, cookie string) {
+func (vs *VoltService) FlowRemoveSuccess(cntx context.Context, cookie string, flowEventMap *util.ConcurrentMap) {
 	// if vs.DeleteInProgress {
 	// 	logger.Warnw(ctx, "Skipping Flow Remove Success Notification. Service deletion in-progress", log.Fields{"Cookie": cookie, "Service": vs.Name})
 	// 	return
 	// }
+	flowEventMap.MapLock.Lock()
+	flowEventMap.Remove(cookie)
+	flowEventMap.MapLock.Unlock()
+
 	vs.ServiceLock.Lock()
 	logger.Infow(ctx, "Processing Service Flow Remove Success Indication", log.Fields{"Cookie": cookie, "Service": vs.Name, "Associated Flows": vs.AssociatedFlows, "DsFlowsApplied": vs.DsHSIAFlowsApplied})
 
@@ -1387,6 +1461,7 @@
 		logger.Debugw(ctx, "Service Flow Remove Success for unknown Cookie", log.Fields{"Service": vs.Name, "Cookie": cookie, "AssociatedFlows": vs.AssociatedFlows, "PendingFlows": vs.PendingFlows})
 	}
 
+	vs.FlowPushCount[cookie] = 0
 	vs.ServiceLock.Unlock()
 
 	vs.WriteToDb(cntx)
@@ -1416,19 +1491,20 @@
 
 // FlowRemoveFailure - Called when corresponding service flow installation is failed
 // Trigger service failure indication to NB
-func (vs *VoltService) FlowRemoveFailure(cntx context.Context, cookie string, errorCode uint32, errReason string) {
-	vs.ServiceLock.RLock()
-	logger.Debugw(ctx, "Processing Service Flow Remove Failure Indication", log.Fields{"Cookie": cookie, "Service": vs.Name, "Associated Flows": vs.AssociatedFlows, "DsFlowsApplied": vs.DsHSIAFlowsApplied})
+func (vs *VoltService) FlowRemoveFailure(cntx context.Context, cookie string, errorCode uint32, errReason string, flowEventMap *util.ConcurrentMap) {
+	vs.ServiceLock.Lock()
+	logger.Debugw(ctx, "Processing Service Flow Remove Failure Indication", log.Fields{"Cookie": cookie, "Service": vs.Name, "Associated Flows": vs.AssociatedFlows, "DsFlowsApplied": vs.DsHSIAFlowsApplied, "FlowPushCount": vs.FlowPushCount[cookie]})
 
+	vs.FlowPushCount[cookie]++
 	if _, ok := vs.AssociatedFlows[cookie]; !ok {
 		logger.Warnw(ctx, "Flow Failure for unknown Cookie", log.Fields{"Service": vs.Name, "Cookie": cookie})
-		vs.ServiceLock.RUnlock()
+		vs.ServiceLock.Unlock()
 		return
 	}
 	if vs.DeleteInProgress {
 		delete(vs.AssociatedFlows, cookie)
 	}
-	vs.ServiceLock.RUnlock()
+	vs.ServiceLock.Unlock()
 	logger.Debugw(ctx, "Service Flow Remove Failure Notification", log.Fields{"uniPort": vs.Port, "Cookie": cookie, "Service": vs.Name, "ErrorCode": errorCode, "ErrorReason": errReason})
 
 	vs.triggerServiceFailureInd(errorCode, errReason)
@@ -2115,20 +2191,20 @@
 		if deviceID == DeviceAny {
 			deviceID = device.Name
 		} else if deviceID != device.Name {
-			logger.Errorw(ctx, "Wrong Device ID", log.Fields{"Device": deviceID, "Port": portNo})
-			return errorCodes.ErrDeviceNotFound
+			err := errorCodes.ErrDeviceNotFound
+			return fmt.Errorf("wrong device id %s : %w", deviceID, err)
 		}
 	}
 	va.ServiceByName.Range(func(key, value interface{}) bool {
 		vs := value.(*VoltService)
 		// If svlan if provided, then the tags and tpID of service has to be matching
 		if sVlan != of.VlanNone && (sVlan != vs.SVlan || cVlan != vs.CVlan || tpID != vs.TechProfileID) {
-			logger.Infow(ctx, "Service Activate Request Does not match", log.Fields{"Device": deviceID, "voltService": vs})
+			logger.Warnw(ctx, "Service Activate Request Does not match", log.Fields{"Device": deviceID, "voltService": vs})
 			return true
 		}
 		if portNo == vs.Port && !vs.IsActivated {
 			// Mark the service as activated, so that we can push the flows later when the port is added by voltha
-			logger.Infow(ctx, "Service Activate", log.Fields{"Name": vs.Name})
+			logger.Debugw(ctx, "Service Activate", log.Fields{"Name": vs.Name})
 			vs.IsActivated = true
 			va.ServiceByName.Store(vs.Name, vs)
 			vs.WriteToDb(cntx)
@@ -2156,6 +2232,12 @@
 	return nil
 }
 
+func (vs *VoltService) SetSvcDeactivationFlags(deactivateRsn SvcDeactivateReason) {
+	vs.DeactivateInProgress = true
+	vs.IsActivated = false
+	vs.ServiceDeactivateReason = deactivateRsn
+}
+
 // DeactivateService to activate pre-provisioned service
 func (va *VoltApplication) DeactivateService(cntx context.Context, deviceID, portNo string, sVlan, cVlan of.VlanType, tpID uint16) error {
 	logger.Infow(ctx, "Service Deactivate Request ", log.Fields{"Device": deviceID, "Port": portNo, "Svaln": sVlan, "Cvlan": cVlan, "TpID": tpID})
@@ -2169,8 +2251,7 @@
 			return true
 		}
 		if portNo == vs.Port && vs.IsActivated {
-			vs.IsActivated = false
-			vs.DeactivateInProgress = true
+			vs.SetSvcDeactivationFlags(SvcDeacRsn_NB)
 			va.ServiceByName.Store(vs.Name, vs)
 			vs.WriteToDb(cntx)
 			device, err := va.GetDeviceFromPort(portNo)
diff --git a/internal/pkg/application/service_test.go b/internal/pkg/application/service_test.go
index 83e39d7..13201ff 100644
--- a/internal/pkg/application/service_test.go
+++ b/internal/pkg/application/service_test.go
@@ -224,22 +224,30 @@
 			switch tt.name {
 			case "VoltService_FlowRemoveFailure":
 				associatedFlows := map[string]bool{}
+				flowPushCountMap := map[string]uint32{}
 				associatedFlows["test_cookie"] = true
 				vs := &VoltService{
 					VoltServiceOper: VoltServiceOper{
 						AssociatedFlows: associatedFlows,
 					},
+					VoltServiceCfg: VoltServiceCfg{
+						FlowPushCount: flowPushCountMap,
+					},
 				}
-				vs.FlowRemoveFailure(tt.args.cntx, tt.args.cookie, tt.args.errorCode, tt.args.errReason)
+				vs.FlowRemoveFailure(tt.args.cntx, tt.args.cookie, tt.args.errorCode, tt.args.errReason, nil)
 			case "cookie_not_found":
 				associatedFlows := map[string]bool{}
+				flowPushCountMap := map[string]uint32{}
 				associatedFlows["cookie"] = true
 				vs := &VoltService{
 					VoltServiceOper: VoltServiceOper{
 						AssociatedFlows: associatedFlows,
 					},
+					VoltServiceCfg: VoltServiceCfg{
+						FlowPushCount: flowPushCountMap,
+					},
 				}
-				vs.FlowRemoveFailure(tt.args.cntx, tt.args.cookie, tt.args.errorCode, tt.args.errReason)
+				vs.FlowRemoveFailure(tt.args.cntx, tt.args.cookie, tt.args.errorCode, tt.args.errReason, nil)
 			}
 		})
 	}
@@ -525,9 +533,10 @@
 
 func TestVoltService_FlowInstallSuccess(t *testing.T) {
 	type args struct {
-		cntx        context.Context
-		cookie      string
-		bwAvailInfo of.BwAvailDetails
+		cntx         context.Context
+		cookie       string
+		bwAvailInfo  of.BwAvailDetails
+		flowEventMap *util.ConcurrentMap
 	}
 	tests := []struct {
 		name string
@@ -542,6 +551,7 @@
 					PrevBw:    "test_prev_BW",
 					PresentBw: "test_present_BW",
 				},
+				flowEventMap: util.NewConcurrentMap(),
 			},
 		},
 	}
@@ -550,6 +560,7 @@
 			pendingFlows := map[string]bool{}
 			pendingFlows["test_cookie"] = true
 			associatedFlows := map[string]bool{}
+			flowPushCountMap := map[string]uint32{}
 			associatedFlows["test_cookie"] = true
 			vs := &VoltService{
 				VoltServiceOper: VoltServiceOper{
@@ -558,13 +569,14 @@
 					DsHSIAFlowsApplied: true,
 				},
 				VoltServiceCfg: VoltServiceCfg{
-					Port: "test_port",
+					Port:          "test_port",
+					FlowPushCount: flowPushCountMap,
 				},
 			}
 			ga := GetApplication()
 			ga.PortsDisc.Store("test_port", voltPort)
 			ga.DevicesDisc.Store(test_device, voltDevice)
-			vs.FlowInstallSuccess(tt.args.cntx, tt.args.cookie, tt.args.bwAvailInfo)
+			vs.FlowInstallSuccess(tt.args.cntx, tt.args.cookie, tt.args.bwAvailInfo, tt.args.flowEventMap)
 		})
 	}
 }
@@ -1065,9 +1077,6 @@
 				if err := va.ActivateService(tt.args.cntx, tt.args.deviceID, tt.args.portNo, tt.args.sVlan, tt.args.cVlan, tt.args.tpID); (err != nil) != tt.wantErr {
 					t.Errorf("VoltApplication.ActivateService() error = %v, wantErr %v", err, tt.wantErr)
 				}
-			case GetDeviceFromPort_error:
-				err := va.ActivateService(tt.args.cntx, tt.args.deviceID, tt.args.portNo, tt.args.sVlan, tt.args.cVlan, tt.args.tpID)
-				assert.Nil(t, err)
 			case "deviceID != device.Name":
 				var voltPortTest1 = &VoltPort{
 					Name:   "test_name",
@@ -2316,6 +2325,7 @@
 
 func TestVoltService_FlowInstallFailure(t *testing.T) {
 	type args struct {
+		cntx      context.Context
 		cookie    string
 		errorCode uint32
 		errReason string
@@ -2327,6 +2337,7 @@
 		{
 			name: "VoltService_FlowInstallFailure",
 			args: args{
+				cntx:      context.Background(),
 				cookie:    "test_cookie",
 				errorCode: uint32(1),
 				errReason: "err_reason",
@@ -2335,6 +2346,7 @@
 		{
 			name: "PendingFlows[cookie]_false",
 			args: args{
+				cntx:      context.Background(),
 				cookie:    "test_cookie",
 				errorCode: uint32(1),
 				errReason: "err_reason",
@@ -2345,15 +2357,19 @@
 	pendingFlows["test_cookie"] = true
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
+			flowPushCountMap := map[string]uint32{}
 			vs := &VoltService{
 				VoltServiceOper: VoltServiceOper{},
+				VoltServiceCfg: VoltServiceCfg{
+					FlowPushCount: flowPushCountMap,
+				},
 			}
 			switch tt.name {
 			case "VoltService_FlowInstallFailure":
 				vs.PendingFlows = pendingFlows
-				vs.FlowInstallFailure(tt.args.cookie, tt.args.errorCode, tt.args.errReason)
+				vs.FlowInstallFailure(tt.args.cntx, tt.args.cookie, tt.args.errorCode, tt.args.errReason, nil)
 			case "PendingFlows[cookie]_false":
-				vs.FlowInstallFailure(tt.args.cookie, tt.args.errorCode, tt.args.errReason)
+				vs.FlowInstallFailure(tt.args.cntx, tt.args.cookie, tt.args.errorCode, tt.args.errReason, nil)
 			}
 		})
 	}
@@ -2361,8 +2377,9 @@
 
 func TestVoltService_FlowRemoveSuccess(t *testing.T) {
 	type args struct {
-		cntx   context.Context
-		cookie string
+		cntx         context.Context
+		cookie       string
+		flowEventMap *util.ConcurrentMap
 	}
 	tests := []struct {
 		name string
@@ -2371,17 +2388,22 @@
 		{
 			name: "GetDevice != nil",
 			args: args{
-				cntx:   context.Background(),
-				cookie: "test_cookie",
+				cntx:         context.Background(),
+				cookie:       "test_cookie",
+				flowEventMap: util.NewConcurrentMap(),
 			},
 		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
+			flowPushCountMap := map[string]uint32{}
 			vs := &VoltService{
 				VoltServiceOper: VoltServiceOper{
 					Device: test_device,
 				},
+				VoltServiceCfg: VoltServiceCfg{
+					FlowPushCount: flowPushCountMap,
+				},
 			}
 			ga := GetApplication()
 			ga.DevicesDisc.Store(test_device, voltDevice2)
@@ -2389,7 +2411,7 @@
 			dbintf := mocks.NewMockDBIntf(gomock.NewController(t))
 			db = dbintf
 			dbintf.EXPECT().PutService(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
-			vs.FlowRemoveSuccess(tt.args.cntx, tt.args.cookie)
+			vs.FlowRemoveSuccess(tt.args.cntx, tt.args.cookie, tt.args.flowEventMap)
 		})
 	}
 }
diff --git a/internal/pkg/controller/addflows.go b/internal/pkg/controller/addflows.go
index 894127b..2f13dbd 100644
--- a/internal/pkg/controller/addflows.go
+++ b/internal/pkg/controller/addflows.go
@@ -93,7 +93,7 @@
 				if err.Error() == ErrDuplicateFlow {
 					dbFlow, _ := aft.device.GetFlow(flow.Cookie)
 					if dbFlow.State == of.FlowAddSuccess {
-						aft.device.triggerFlowNotification(ctx, flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil)
+						aft.device.triggerFlowNotification(ctx, flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil, true)
 						flowsPresent++
 					}
 				}
@@ -108,7 +108,7 @@
 				// aft.device.AddFlowToDb(dbFlow)
 				flowsToProcess[flow.Cookie] = dbFlow
 			}
-			aft.device.triggerFlowNotification(ctx, flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil)
+			aft.device.triggerFlowNotification(ctx, flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil, false)
 		}
 	}
 
@@ -124,7 +124,7 @@
 			for _, flow := range aft.flow.SubFlows {
 				logger.Warnw(ctx, "Skip Flow Update", log.Fields{"Reason": "Port Deleted", "PortName": aft.flow.PortName, "PortNo": aft.flow.PortID, "Cookie": flow.Cookie, "Operation": aft.flow.Command})
 				if aft.flow.Command == of.CommandDel {
-					aft.device.triggerFlowNotification(ctx, flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil)
+					aft.device.triggerFlowNotification(ctx, flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil, true)
 				}
 			}
 			return nil
@@ -160,7 +160,7 @@
 				}
 				break
 			}
-			aft.device.triggerFlowNotification(ctx, flow.FlowMod.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil)
+			aft.device.triggerFlowNotification(ctx, flow.FlowMod.Cookie, aft.flow.Command, of.BwAvailDetails{}, err, true)
 		} else {
 			logger.Errorw(ctx, "Update Flow Table Failed: Voltha Client Unavailable", log.Fields{"Flow": flow})
 		}
diff --git a/internal/pkg/controller/audittables.go b/internal/pkg/controller/audittables.go
index f422d52..d5ce858 100644
--- a/internal/pkg/controller/audittables.go
+++ b/internal/pkg/controller/audittables.go
@@ -267,8 +267,6 @@
 				delete(rcvdFlows, flow.Cookie)
 			}
 			defaultSuccessFlowStatus.Cookie = strconv.FormatUint(flow.Cookie, 10)
-
-			GetController().ProcessFlowModResultIndication(cntx, defaultSuccessFlowStatus)
 		} else {
 			// The flow exists at the controller but not at the device
 			// Push the flow to the device
@@ -315,7 +313,7 @@
 		if _, err = vc.UpdateLogicalDeviceFlowTable(att.ctx, flow); err != nil {
 			logger.Errorw(ctx, "Update Flow Table Failed", log.Fields{"Reason": err.Error()})
 		}
-		att.device.triggerFlowResultNotification(cntx, flow.FlowMod.Cookie, dbFlow, of.CommandAdd, bwConsumedInfo, err)
+		att.device.triggerFlowResultNotification(cntx, flow.FlowMod.Cookie, dbFlow, of.CommandAdd, bwConsumedInfo, err, true)
 	}
 }
 
@@ -336,6 +334,11 @@
 			continue
 		}
 
+		if flag := GetController().IsFlowDelThresholdReached(cntx, strconv.FormatUint(flow.Cookie, 10), att.device.ID); flag {
+			logger.Warnw(ctx, "Flow delete threshold reached, skipping flow delete", log.Fields{"Device": att.device.ID, "Cookie": flow.Cookie})
+			continue
+		}
+
 		logger.Debugw(ctx, "Deleting Flow", log.Fields{"Cookie": flow.Cookie})
 		// Create the flowMod structure and fill it out
 		flowMod := &ofp.OfpFlowMod{}
@@ -362,7 +365,7 @@
 		if _, err = vc.UpdateLogicalDeviceFlowTable(att.ctx, flowUpdate); err != nil {
 			logger.Errorw(ctx, "Flow Audit Delete Failed", log.Fields{"Reason": err.Error()})
 		}
-		att.device.triggerFlowResultNotification(cntx, flow.Cookie, nil, of.CommandDel, of.BwAvailDetails{}, err)
+		att.device.triggerFlowResultNotification(cntx, flow.Cookie, nil, of.CommandDel, of.BwAvailDetails{}, err, true)
 	}
 }
 
@@ -601,13 +604,11 @@
 
 // AddMissingPorts to add the missing ports
 func (att *AuditTablesTask) AddMissingPorts(cntx context.Context, mps map[uint32]*ofp.OfpPort) {
-	logger.Infow(ctx, "Device Audit - Add Missing Ports", log.Fields{"NumPorts": len(mps)})
+	logger.Debugw(ctx, "Device Audit - Add Missing Ports", log.Fields{"NumPorts": len(mps)})
 
 	addMissingPort := func(mp *ofp.OfpPort) {
 		logger.Debugw(ctx, "Process Port Add Ind", log.Fields{"Port No": mp.PortNo, "Port Name": mp.Name})
 
-		// Error is ignored as it only drops duplicate ports
-		logger.Debugw(ctx, "Calling AddPort", log.Fields{"No": mp.PortNo, "Name": mp.Name})
 		if err := att.device.AddPort(cntx, mp); err != nil {
 			logger.Warnw(ctx, "AddPort Failed", log.Fields{"No": mp.PortNo, "Name": mp.Name, "Reason": err})
 		}
diff --git a/internal/pkg/controller/controller.go b/internal/pkg/controller/controller.go
index c31d892..76e4558 100644
--- a/internal/pkg/controller/controller.go
+++ b/internal/pkg/controller/controller.go
@@ -72,7 +72,9 @@
 	rebootInProgressDevices map[string]string
 	deviceLock              sync.RWMutex
 	rebootLock              sync.Mutex
-	deviceTableSyncDuration time.Duration
+	deviceTableSyncDuration time.Duration // Time interval between each cycle of audit task
+	maxFlowRetryDuration    time.Duration // Maximum duration for which flows will be retried upon failures
+	maxFlowRetryAttempts    uint32        // maxFlowRetryAttempt = maxFlowRetryDuration / deviceTableSyncDuration
 	RebootFlow              bool
 }
 
@@ -100,11 +102,26 @@
 	v.deviceTableSyncDuration = time.Duration(duration) * time.Second
 }
 
+// SetMaxFlowRetryDuration - sets max flow retry interval
+func (v *VoltController) SetMaxFlowRetryDuration(duration int) {
+	v.maxFlowRetryDuration = time.Duration(duration) * time.Second
+}
+
+// SetMaxFlowRetryAttempts - sets max flow retry attempts
+func (v *VoltController) SetMaxFlowRetryAttempts() {
+	v.maxFlowRetryAttempts = uint32((v.maxFlowRetryDuration / v.deviceTableSyncDuration))
+}
+
 // GetDeviceTableSyncDuration - returns configured device table sync duration
 func (v *VoltController) GetDeviceTableSyncDuration() time.Duration {
 	return v.deviceTableSyncDuration
 }
 
+// GetMaxFlowRetryAttempt - returns max flow retry attempst
+func (v *VoltController) GetMaxFlowRetryAttempt() uint32 {
+	return v.maxFlowRetryAttempts
+}
+
 // AddDevice to add device
 func (v *VoltController) AddDevice(cntx context.Context, config *intf.VPClientCfg) intf.IVPClient {
 	d := NewDevice(cntx, config.DeviceID, config.SerialNum, config.VolthaClient, config.SouthBoundID, config.MfrDesc, config.HwDesc, config.SwDesc)
@@ -249,6 +266,11 @@
 	v.app.ProcessFlowModResultIndication(cntx, flowStatus)
 }
 
+// IsFlowDelThresholdReached - check if the attempts for flow delete has reached threshold or not
+func (v *VoltController) IsFlowDelThresholdReached(cntx context.Context, cookie string, device string) bool {
+	return v.app.IsFlowDelThresholdReached(cntx, cookie, device)
+}
+
 // AddVPAgent to add the vpagent
 func (v *VoltController) AddVPAgent(vep string, vpa *vpagent.VPAgent) {
 	v.vagent[vep] = vpa
diff --git a/internal/pkg/controller/device.go b/internal/pkg/controller/device.go
index 618afae..bdac105 100644
--- a/internal/pkg/controller/device.go
+++ b/internal/pkg/controller/device.go
@@ -1059,12 +1059,12 @@
 	return false
 }
 
-func (d *Device) triggerFlowNotification(cntx context.Context, cookie uint64, oper of.Command, bwDetails of.BwAvailDetails, err error) {
+func (d *Device) triggerFlowNotification(cntx context.Context, cookie uint64, oper of.Command, bwDetails of.BwAvailDetails, err error, sendFlowNotif bool) {
 	flow, _ := d.GetFlow(cookie)
-	d.triggerFlowResultNotification(cntx, cookie, flow, oper, bwDetails, err)
+	d.triggerFlowResultNotification(cntx, cookie, flow, oper, bwDetails, err, sendFlowNotif)
 }
 
-func (d *Device) triggerFlowResultNotification(cntx context.Context, cookie uint64, flow *of.VoltSubFlow, oper of.Command, bwDetails of.BwAvailDetails, err error) {
+func (d *Device) triggerFlowResultNotification(cntx context.Context, cookie uint64, flow *of.VoltSubFlow, oper of.Command, bwDetails of.BwAvailDetails, err error, sendFlowNotif bool) {
 	statusCode, statusMsg := infraerror.GetErrorInfo(err)
 	success := isFlowOperSuccess(statusCode, oper)
 
@@ -1108,6 +1108,8 @@
 		AdditionalData: bwDetails,
 	}
 
-	logger.Debugw(ctx, "Sending Flow Notification", log.Fields{"Cookie": cookie, "Error Code": statusCode, "FlowOp": oper})
-	GetController().ProcessFlowModResultIndication(cntx, flowResult)
+	if sendFlowNotif {
+		logger.Debugw(ctx, "Sending Flow Notification", log.Fields{"Cookie": cookie, "Error Code": statusCode, "FlowOp": oper})
+		GetController().ProcessFlowModResultIndication(cntx, flowResult)
+	}
 }
diff --git a/internal/pkg/controller/device_test.go b/internal/pkg/controller/device_test.go
index 8f35aef..7089c64 100644
--- a/internal/pkg/controller/device_test.go
+++ b/internal/pkg/controller/device_test.go
@@ -170,7 +170,7 @@
 			db = dbintf
 			dbintf.EXPECT().PutFlow(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1)
 			appMock.EXPECT().ProcessFlowModResultIndication(gomock.Any(), gomock.Any()).Times(1)
-			d.triggerFlowResultNotification(tt.args.cntx, tt.args.cookie, tt.args.flow, tt.args.oper, tt.args.bwDetails, tt.args.err)
+			d.triggerFlowResultNotification(tt.args.cntx, tt.args.cookie, tt.args.flow, tt.args.oper, tt.args.bwDetails, tt.args.err, false)
 		})
 	}
 }
diff --git a/internal/pkg/intf/appif.go b/internal/pkg/intf/appif.go
index afc8e4e..46fb9c9 100644
--- a/internal/pkg/intf/appif.go
+++ b/internal/pkg/intf/appif.go
@@ -31,6 +31,7 @@
 	DelDevice(context.Context, string)
 	SetRebootFlag(bool)
 	ProcessFlowModResultIndication(context.Context, FlowStatus)
+	IsFlowDelThresholdReached(context.Context, string, string) bool
 	DeviceRebootInd(context.Context, string, string, string)
 	DeviceDisableInd(context.Context, string)
 	UpdateMvlanProfilesForDevice(context.Context, string)
diff --git a/internal/pkg/util/envutils/envutils.go b/internal/pkg/util/envutils/envutils.go
index 392e9b8..ba70d5e 100644
--- a/internal/pkg/util/envutils/envutils.go
+++ b/internal/pkg/util/envutils/envutils.go
@@ -81,6 +81,7 @@
 	MemProfile                = "MEM_PROFILE"
 	VendorID                  = "VENDOR_ID"
 	DeviceSyncDuration        = "DEVICE_SYNC_DURATION"
+	MaxFlowRetryDuration      = "MAX_FLOW_RETRY_DURATION"
 	// openonu environment variables
 
 	OmciPacketCapture = "SAVE_OMCI_PACKET_CAPTURE"
diff --git a/internal/test/mocks/mock_appif.go b/internal/test/mocks/mock_appif.go
index 42bd490..8b986ec 100644
--- a/internal/test/mocks/mock_appif.go
+++ b/internal/test/mocks/mock_appif.go
@@ -206,6 +206,19 @@
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessFlowModResultIndication", reflect.TypeOf((*MockApp)(nil).ProcessFlowModResultIndication), arg0, arg1)
 }
 
+// IsFlowDelThresholdReached mocks base method.
+func (m *MockApp) IsFlowDelThresholdReached(arg0 context.Context, arg1 string, arg2 string) bool {
+	m.ctrl.T.Helper()
+	m.ctrl.Call(m, "IsFlowDelThresholdReached", arg0, arg1, arg2)
+	return false
+}
+
+// IsFlowDelThresholdReached indicates an expected call of IsFlowDelThresholdReached.
+func (mr *MockAppMockRecorder) IsFlowDelThresholdReached(arg0, arg1, arg2 interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsFlowDelThresholdReached", reflect.TypeOf((*MockApp)(nil).IsFlowDelThresholdReached), arg0, arg1, arg2)
+}
+
 // SetRebootFlag mocks base method.
 func (m *MockApp) SetRebootFlag(arg0 bool) {
 	m.ctrl.T.Helper()
diff --git a/voltha-go-controller/config.go b/voltha-go-controller/config.go
index 5bef7dc..d147aed 100644
--- a/voltha-go-controller/config.go
+++ b/voltha-go-controller/config.go
@@ -35,6 +35,7 @@
 	defaultMemProfile                = ""
 	defaultDeviceListRefreshInterval = 10
 	defaultDeviceSyncDuration        = 5
+	defaultMaxFlowRetryDuration      = 60
 	/*
 		FIXME(At RWCORE) Problem: VGC comes up fast by that time RWCORE may not be up and will retry after 10 sec
 		but rwcore could come up before the 10 second expiry and post indications to VGC which can't be consumed by
@@ -107,7 +108,8 @@
 	DeviceListRefreshInterval int // in seconds
 	ConnectionRetryDelay      int // in seconds
 	ConnectionMaxRetries      int
-	DeviceSyncDuration        int
+	DeviceSyncDuration        int // Time interval between each cycle of audit task
+	MaxFlowRetryDuration      int // Maximum duration for which flows will be retried upon failures
 	Banner                    bool
 	DisplayVersion            bool
 }
@@ -141,6 +143,7 @@
 	cf.VolthaAPIEndPoint = cf.VolthaHost + ":" + strconv.Itoa(cf.VolthaPort)
 
 	cf.DeviceSyncDuration = int(envutils.ParseIntEnvVariable(envutils.DeviceSyncDuration, defaultDeviceSyncDuration))
+	cf.MaxFlowRetryDuration = int(envutils.ParseIntEnvVariable(envutils.MaxFlowRetryDuration, defaultMaxFlowRetryDuration))
 }
 
 type multiFlag []string
diff --git a/voltha-go-controller/main.go b/voltha-go-controller/main.go
index 268bbc6..1b8d3e3 100644
--- a/voltha-go-controller/main.go
+++ b/voltha-go-controller/main.go
@@ -197,6 +197,8 @@
 	app.GetApplication().SetVendorID(config.VendorID)
 	ofca := controller.NewController(ctx, app.GetApplication())
 	controller.GetController().SetDeviceTableSyncDuration(config.DeviceSyncDuration)
+	controller.GetController().SetMaxFlowRetryDuration(config.MaxFlowRetryDuration)
+	controller.GetController().SetMaxFlowRetryAttempts()
 	vpa, err1 := vpagent.NewVPAgent(&vpagent.VPAgent{
 		VolthaAPIEndPoint:         config.VolthaAPIEndPoint,
 		DeviceListRefreshInterval: time.Duration(config.DeviceListRefreshInterval) * time.Second,