[VOL-5243] Flow audit control

Change-Id: Ia70283da583ea870af078bf78538c1416f5b795c
diff --git a/internal/pkg/application/service.go b/internal/pkg/application/service.go
index d93c8f4..38d41c2 100644
--- a/internal/pkg/application/service.go
+++ b/internal/pkg/application/service.go
@@ -117,6 +117,8 @@
 	AllowTransparent           bool
 	EnableMulticastKPI         bool
 	IsActivated                bool
+	FlowPushCount              map[string]uint32   // Tracks the number of flow install/delete failure attempts per cookie in order to throttle flow auditing
+	ServiceDeactivateReason    SvcDeactivateReason // Mentions why the service was deactivated
 }
 
 // VoltServiceOper structure
@@ -164,6 +166,18 @@
 	ServiceVlanUpdate ServiceTrigger = 1
 )
 
+// SvcDeactivateReason - Reason for service deactivation
+type SvcDeactivateReason uint8
+
+const (
+	// Service deactivated reason - none
+	SvcDeacRsn_None SvcDeactivateReason = 0
+	// Service deactivate reason - NB
+	SvcDeacRsn_NB SvcDeactivateReason = 1
+	// Service deactivate reason - Controller
+	SvcDeacRsn_Controller SvcDeactivateReason = 2
+)
+
 // AppMutexes structure
 type AppMutexes struct {
 	ServiceDataMutex sync.Mutex `json:"-"`
@@ -187,13 +201,16 @@
 	vs.DsHSIAFlowsApplied = false
 	vs.DeleteInProgress = false
 	vs.DeactivateInProgress = false
+	vs.ServiceDeactivateReason = SvcDeacRsn_None
 	//vs.MacAddr, _ = net.ParseMAC("00:00:00:00:00:00")
+
 	vs.IsOption82Enabled = cfg.IsOption82Enabled
 	vs.MacAddr = cfg.MacAddr
 	vs.Ipv4Addr = net.ParseIP("0.0.0.0")
 	vs.Ipv6Addr = net.ParseIP("::")
 	vs.PendingFlows = make(map[string]bool)
 	vs.AssociatedFlows = make(map[string]bool)
+	vs.FlowPushCount = make(map[string]uint32)
 	return &vs
 }
 
@@ -1037,6 +1054,8 @@
 		vs.DeactivateInProgress = oper.DeactivateInProgress
 		vs.BwAvailInfo = oper.BwAvailInfo
 		vs.Device = oper.Device
+		vs.FlowPushCount = cfg.FlowPushCount
+		vs.ServiceDeactivateReason = cfg.ServiceDeactivateReason
 	} else {
 		// Sorting Pbit from highest
 		sort.Slice(vs.Pbits, func(i, j int) bool {
@@ -1274,8 +1293,12 @@
 
 // FlowInstallSuccess - Called when corresponding service flow installation is success
 // If no more pending flows, HSIA indication wil be triggered
-func (vs *VoltService) FlowInstallSuccess(cntx context.Context, cookie string, bwAvailInfo of.BwAvailDetails) {
+func (vs *VoltService) FlowInstallSuccess(cntx context.Context, cookie string, bwAvailInfo of.BwAvailDetails, flowEventMap *util.ConcurrentMap) {
 	logger.Debugw(ctx, "Flow Add Success Notification", log.Fields{"Cookie": cookie, "bwAvailInfo": bwAvailInfo, "Service": vs.Name})
+	flowEventMap.MapLock.Lock()
+	flowEventMap.Remove(cookie)
+	flowEventMap.MapLock.Unlock()
+
 	if vs.DeleteInProgress {
 		logger.Warnw(ctx, "Skipping Flow Add Success Notification. Service deletion in-progress", log.Fields{"Cookie": cookie, "Service": vs.Name})
 		return
@@ -1290,6 +1313,7 @@
 
 	delete(vs.PendingFlows, cookie)
 	vs.AssociatedFlows[cookie] = true
+	vs.FlowPushCount[cookie] = 0
 	vs.ServiceLock.Unlock()
 	var prevBwAvail, presentBwAvail string
 	if bwAvailInfo.PrevBw != "" && bwAvailInfo.PresentBw != "" {
@@ -1322,10 +1346,19 @@
 
 // FlowInstallFailure - Called when corresponding service flow installation is failed
 // Trigger service failure indication to NB
-func (vs *VoltService) FlowInstallFailure(cookie string, errorCode uint32, errReason string) {
-	vs.ServiceLock.RLock()
-
+func (vs *VoltService) FlowInstallFailure(cntx context.Context, cookie string, errorCode uint32, errReason string, flowEventMap *util.ConcurrentMap) {
 	logger.Debugw(ctx, "Service flow installation failure", log.Fields{"Service": vs.Name, "Cookie": cookie, "errorCode": errorCode, "errReason": errReason})
+
+	isServiceDeactivated := vs.CheckAndDeactivateService(cntx, cookie)
+	if isServiceDeactivated {
+		flowEventMap.MapLock.Lock()
+		for ck := range vs.PendingFlows {
+			flowEventMap.Remove(ck)
+		}
+		flowEventMap.MapLock.Unlock()
+	}
+
+	vs.ServiceLock.RLock()
 	if _, ok := vs.PendingFlows[cookie]; !ok {
 		logger.Errorw(ctx, "Flow Add Failure for unknown Cookie", log.Fields{"Service": vs.Name, "Cookie": cookie})
 		vs.ServiceLock.RUnlock()
@@ -1369,13 +1402,54 @@
 	}
 }
 
+// CheckAndDeactivateService - deactivate service and remove flows from DB, if the max flows retry attempt has reached
+func (vs *VoltService) CheckAndDeactivateService(cntx context.Context, cookie string) bool {
+	vs.ServiceLock.Lock()
+	logger.Debugw(ctx, "Check and Deactivate service if flow install threshold is reached and remove flows from DB/Device", log.Fields{"serviceName": vs.Name, "FlowPushCount": vs.FlowPushCount[cookie]})
+	vs.FlowPushCount[cookie]++
+	if vs.FlowPushCount[cookie] == controller.GetController().GetMaxFlowRetryAttempt() {
+		if vs.IsActivated {
+			vs.ServiceLock.Unlock()
+			device, err := GetApplication().GetDeviceFromPort(vs.Port)
+			if err != nil {
+				// Even if the port/device does not exists at this point in time, the deactivate request is succss.
+				// So no error is returned
+				logger.Warnw(ctx, "Error Getting Device", log.Fields{"Reason": err.Error(), "Port": vs.Port})
+				return false
+			}
+			vs.SetSvcDeactivationFlags(SvcDeacRsn_Controller)
+			GetApplication().ServiceByName.Store(vs.Name, vs)
+			p := device.GetPort(vs.Port)
+			if p != nil && (p.State == PortStateUp) {
+				if vpv := GetApplication().GetVnetByPort(vs.Port, vs.SVlan, vs.CVlan, vs.UniVlan); vpv != nil {
+					// Port down call internally deletes all the flows
+					vpv.PortDownInd(cntx, vs.Device, vs.Port, true, true)
+				} else {
+					logger.Warnw(ctx, "VPV does not exists!!!", log.Fields{"Device": vs.Device, "port": vs.Port, "SvcName": vs.Name})
+				}
+			}
+			vs.DeactivateInProgress = false
+			GetApplication().ServiceByName.Store(vs.Name, vs)
+			vs.WriteToDb(cntx)
+			logger.Infow(ctx, "Service deactivated after max flow install attempts", log.Fields{"SvcName": vs.Name, "Cookie": cookie})
+			return true
+		}
+	}
+	vs.ServiceLock.Unlock()
+	return false
+}
+
 // FlowRemoveSuccess - Called when corresponding service flow removal is success
 // If no more associated flows, DelHSIA indication wil be triggered
-func (vs *VoltService) FlowRemoveSuccess(cntx context.Context, cookie string) {
+func (vs *VoltService) FlowRemoveSuccess(cntx context.Context, cookie string, flowEventMap *util.ConcurrentMap) {
 	// if vs.DeleteInProgress {
 	// 	logger.Warnw(ctx, "Skipping Flow Remove Success Notification. Service deletion in-progress", log.Fields{"Cookie": cookie, "Service": vs.Name})
 	// 	return
 	// }
+	flowEventMap.MapLock.Lock()
+	flowEventMap.Remove(cookie)
+	flowEventMap.MapLock.Unlock()
+
 	vs.ServiceLock.Lock()
 	logger.Infow(ctx, "Processing Service Flow Remove Success Indication", log.Fields{"Cookie": cookie, "Service": vs.Name, "Associated Flows": vs.AssociatedFlows, "DsFlowsApplied": vs.DsHSIAFlowsApplied})
 
@@ -1387,6 +1461,7 @@
 		logger.Debugw(ctx, "Service Flow Remove Success for unknown Cookie", log.Fields{"Service": vs.Name, "Cookie": cookie, "AssociatedFlows": vs.AssociatedFlows, "PendingFlows": vs.PendingFlows})
 	}
 
+	vs.FlowPushCount[cookie] = 0
 	vs.ServiceLock.Unlock()
 
 	vs.WriteToDb(cntx)
@@ -1416,19 +1491,20 @@
 
 // FlowRemoveFailure - Called when corresponding service flow installation is failed
 // Trigger service failure indication to NB
-func (vs *VoltService) FlowRemoveFailure(cntx context.Context, cookie string, errorCode uint32, errReason string) {
-	vs.ServiceLock.RLock()
-	logger.Debugw(ctx, "Processing Service Flow Remove Failure Indication", log.Fields{"Cookie": cookie, "Service": vs.Name, "Associated Flows": vs.AssociatedFlows, "DsFlowsApplied": vs.DsHSIAFlowsApplied})
+func (vs *VoltService) FlowRemoveFailure(cntx context.Context, cookie string, errorCode uint32, errReason string, flowEventMap *util.ConcurrentMap) {
+	vs.ServiceLock.Lock()
+	logger.Debugw(ctx, "Processing Service Flow Remove Failure Indication", log.Fields{"Cookie": cookie, "Service": vs.Name, "Associated Flows": vs.AssociatedFlows, "DsFlowsApplied": vs.DsHSIAFlowsApplied, "FlowPushCount": vs.FlowPushCount[cookie]})
 
+	vs.FlowPushCount[cookie]++
 	if _, ok := vs.AssociatedFlows[cookie]; !ok {
 		logger.Warnw(ctx, "Flow Failure for unknown Cookie", log.Fields{"Service": vs.Name, "Cookie": cookie})
-		vs.ServiceLock.RUnlock()
+		vs.ServiceLock.Unlock()
 		return
 	}
 	if vs.DeleteInProgress {
 		delete(vs.AssociatedFlows, cookie)
 	}
-	vs.ServiceLock.RUnlock()
+	vs.ServiceLock.Unlock()
 	logger.Debugw(ctx, "Service Flow Remove Failure Notification", log.Fields{"uniPort": vs.Port, "Cookie": cookie, "Service": vs.Name, "ErrorCode": errorCode, "ErrorReason": errReason})
 
 	vs.triggerServiceFailureInd(errorCode, errReason)
@@ -2115,20 +2191,20 @@
 		if deviceID == DeviceAny {
 			deviceID = device.Name
 		} else if deviceID != device.Name {
-			logger.Errorw(ctx, "Wrong Device ID", log.Fields{"Device": deviceID, "Port": portNo})
-			return errorCodes.ErrDeviceNotFound
+			err := errorCodes.ErrDeviceNotFound
+			return fmt.Errorf("wrong device id %s : %w", deviceID, err)
 		}
 	}
 	va.ServiceByName.Range(func(key, value interface{}) bool {
 		vs := value.(*VoltService)
 		// If svlan if provided, then the tags and tpID of service has to be matching
 		if sVlan != of.VlanNone && (sVlan != vs.SVlan || cVlan != vs.CVlan || tpID != vs.TechProfileID) {
-			logger.Infow(ctx, "Service Activate Request Does not match", log.Fields{"Device": deviceID, "voltService": vs})
+			logger.Warnw(ctx, "Service Activate Request Does not match", log.Fields{"Device": deviceID, "voltService": vs})
 			return true
 		}
 		if portNo == vs.Port && !vs.IsActivated {
 			// Mark the service as activated, so that we can push the flows later when the port is added by voltha
-			logger.Infow(ctx, "Service Activate", log.Fields{"Name": vs.Name})
+			logger.Debugw(ctx, "Service Activate", log.Fields{"Name": vs.Name})
 			vs.IsActivated = true
 			va.ServiceByName.Store(vs.Name, vs)
 			vs.WriteToDb(cntx)
@@ -2156,6 +2232,12 @@
 	return nil
 }
 
+func (vs *VoltService) SetSvcDeactivationFlags(deactivateRsn SvcDeactivateReason) {
+	vs.DeactivateInProgress = true
+	vs.IsActivated = false
+	vs.ServiceDeactivateReason = deactivateRsn
+}
+
 // DeactivateService to activate pre-provisioned service
 func (va *VoltApplication) DeactivateService(cntx context.Context, deviceID, portNo string, sVlan, cVlan of.VlanType, tpID uint16) error {
 	logger.Infow(ctx, "Service Deactivate Request ", log.Fields{"Device": deviceID, "Port": portNo, "Svaln": sVlan, "Cvlan": cVlan, "TpID": tpID})
@@ -2169,8 +2251,7 @@
 			return true
 		}
 		if portNo == vs.Port && vs.IsActivated {
-			vs.IsActivated = false
-			vs.DeactivateInProgress = true
+			vs.SetSvcDeactivationFlags(SvcDeacRsn_NB)
 			va.ServiceByName.Store(vs.Name, vs)
 			vs.WriteToDb(cntx)
 			device, err := va.GetDeviceFromPort(portNo)