[VOL-5243] Flow audit control
Change-Id: Ia70283da583ea870af078bf78538c1416f5b795c
diff --git a/internal/pkg/application/application.go b/internal/pkg/application/application.go
index 51c13ea..371cd84 100644
--- a/internal/pkg/application/application.go
+++ b/internal/pkg/application/application.go
@@ -1767,6 +1767,35 @@
}
}
+// IsFlowDelThresholdReached - reports whether the failure count recorded for
+// the given cookie equals the controller's maximum flow retry attempts.
+//
+// The count is read from the VoltService attached to the flow event that is
+// registered for the cookie in the device's delete-flow event register.
+// It returns false when:
+//   - the device cannot be found,
+//   - the device has no event register for of.CommandDel,
+//   - no event is registered for the cookie, or
+//   - the registered event is not a *FlowEvent carrying a *VoltService
+//     (other delete events carry VPV, MVLAN or VNET data and have no
+//     per-cookie push count).
+func (va *VoltApplication) IsFlowDelThresholdReached(cntx context.Context, cookie string, device string) bool {
+	logger.Debugw(ctx, "Check flow delete threshold", log.Fields{"Cookie": cookie, "Device": device})
+	d := va.GetDevice(device)
+	if d == nil {
+		logger.Warnw(ctx, "Failed to get device during flow delete threshold check", log.Fields{"Cookie": cookie, "Device": device})
+		return false
+	}
+
+	flowEventMap, err := d.GetFlowEventRegister(of.CommandDel)
+	if err != nil {
+		logger.Warnw(ctx, "Flow event map does not exists", log.Fields{"flowMod": of.CommandDel, "Error": err})
+		return false
+	}
+
+	// Hold the map lock only for the lookup itself; logging happens after release.
+	flowEventMap.MapLock.Lock()
+	event, _ := flowEventMap.Get(cookie)
+	flowEventMap.MapLock.Unlock()
+	if event == nil {
+		logger.Warnw(ctx, "Event does not exist during flow delete threshold check", log.Fields{"Cookie": cookie})
+		return false
+	}
+
+	// Use checked assertions: an unchecked assertion panics when the event
+	// registered for this cookie does not belong to a service.
+	flowEvent, ok := event.(*FlowEvent)
+	if !ok || flowEvent == nil {
+		logger.Warnw(ctx, "Unexpected event type during flow delete threshold check", log.Fields{"Cookie": cookie})
+		return false
+	}
+	vs, ok := flowEvent.eventData.(*VoltService)
+	if !ok || vs == nil {
+		logger.Debugw(ctx, "Flow delete event is not a service event, skipping threshold check", log.Fields{"Cookie": cookie})
+		return false
+	}
+
+	vs.ServiceLock.RLock()
+	defer vs.ServiceLock.RUnlock()
+	return vs.FlowPushCount[cookie] == controller.GetController().GetMaxFlowRetryAttempt()
+}
+
func pushFlowFailureNotif(flowStatus intf.FlowStatus) {
subFlow := flowStatus.Flow
cookie := subFlow.Cookie
diff --git a/internal/pkg/application/flowevent.go b/internal/pkg/application/flowevent.go
index 577cdca..63123da 100644
--- a/internal/pkg/application/flowevent.go
+++ b/internal/pkg/application/flowevent.go
@@ -19,6 +19,7 @@
"context"
infraerrorcode "voltha-go-controller/internal/pkg/errorcodes/service"
+ "voltha-go-controller/internal/pkg/util"
"voltha-go-controller/internal/pkg/intf"
"voltha-go-controller/log"
@@ -31,7 +32,7 @@
type FlowEventType string
// FlowEventHandler - Func prototype for flow event handling funcs
-type FlowEventHandler func(context.Context, *FlowEvent, intf.FlowStatus)
+type FlowEventHandler func(context.Context, *FlowEvent, intf.FlowStatus, *util.ConcurrentMap)
var eventMapper map[FlowEventType]FlowEventHandler
@@ -92,15 +93,19 @@
flowEventMap.MapLock.Unlock()
return false
}
- flowEventMap.Remove(cookie)
flowEventMap.MapLock.Unlock()
flowEvent := event.(*FlowEvent)
- eventMapper[flowEvent.eType](cntx, flowEvent, flowStatus)
+ if flowEvent.eType != EventTypeServiceFlowAdded && flowEvent.eType != EventTypeServiceFlowRemoved {
+ flowEventMap.MapLock.Lock()
+ flowEventMap.Remove(cookie)
+ flowEventMap.MapLock.Unlock()
+ }
+ eventMapper[flowEvent.eType](cntx, flowEvent, flowStatus, flowEventMap)
return true
}
// ProcessUsIgmpFlowAddEvent - Process Us Igmp Flow event trigger
-func ProcessUsIgmpFlowAddEvent(cntx context.Context, event *FlowEvent, flowStatus intf.FlowStatus) {
+func ProcessUsIgmpFlowAddEvent(cntx context.Context, event *FlowEvent, flowStatus intf.FlowStatus, flowEventMap *util.ConcurrentMap) {
logger.Infow(ctx, "Processing Post Flow Add Event for US Igmp", log.Fields{"Cookie": event.cookie, "event": event})
vpv := event.eventData.(*VoltPortVnet)
if isFlowStatusSuccess(flowStatus.Status, true) {
@@ -111,18 +116,18 @@
}
// ProcessServiceFlowAddEvent - Process Service Flow event trigger
-func ProcessServiceFlowAddEvent(cntx context.Context, event *FlowEvent, flowStatus intf.FlowStatus) {
+func ProcessServiceFlowAddEvent(cntx context.Context, event *FlowEvent, flowStatus intf.FlowStatus, flowEventMap *util.ConcurrentMap) {
logger.Infow(ctx, "Processing Post Flow Add Event for Service", log.Fields{"Cookie": event.cookie, "event": event})
vs := event.eventData.(*VoltService)
if isFlowStatusSuccess(flowStatus.Status, true) {
- vs.FlowInstallSuccess(cntx, event.cookie, flowStatus.AdditionalData)
+ vs.FlowInstallSuccess(cntx, event.cookie, flowStatus.AdditionalData, flowEventMap)
} else {
- vs.FlowInstallFailure(event.cookie, flowStatus.Status, flowStatus.Reason)
+ vs.FlowInstallFailure(cntx, event.cookie, flowStatus.Status, flowStatus.Reason, flowEventMap)
}
}
// ProcessControlFlowAddEvent - Process Control Flow event trigger
-func ProcessControlFlowAddEvent(cntx context.Context, event *FlowEvent, flowStatus intf.FlowStatus) {
+func ProcessControlFlowAddEvent(cntx context.Context, event *FlowEvent, flowStatus intf.FlowStatus, flowEventMap *util.ConcurrentMap) {
logger.Infow(ctx, "Processing Post Flow Add Event for VPV", log.Fields{"Cookie": event.cookie, "event": event})
vpv := event.eventData.(*VoltPortVnet)
if !isFlowStatusSuccess(flowStatus.Status, true) {
@@ -131,18 +136,18 @@
}
// ProcessServiceFlowDelEvent - Process Service Flow event trigger
-func ProcessServiceFlowDelEvent(cntx context.Context, event *FlowEvent, flowStatus intf.FlowStatus) {
+func ProcessServiceFlowDelEvent(cntx context.Context, event *FlowEvent, flowStatus intf.FlowStatus, flowEventMap *util.ConcurrentMap) {
logger.Infow(ctx, "Processing Post Flow Remove Event for Service", log.Fields{"Cookie": event.cookie, "event": event})
vs := event.eventData.(*VoltService)
if isFlowStatusSuccess(flowStatus.Status, false) {
- vs.FlowRemoveSuccess(cntx, event.cookie)
+ vs.FlowRemoveSuccess(cntx, event.cookie, flowEventMap)
} else {
- vs.FlowRemoveFailure(cntx, event.cookie, flowStatus.Status, flowStatus.Reason)
+ vs.FlowRemoveFailure(cntx, event.cookie, flowStatus.Status, flowStatus.Reason, flowEventMap)
}
}
// ProcessControlFlowDelEvent - Process Control Flow event trigger
-func ProcessControlFlowDelEvent(cntx context.Context, event *FlowEvent, flowStatus intf.FlowStatus) {
+func ProcessControlFlowDelEvent(cntx context.Context, event *FlowEvent, flowStatus intf.FlowStatus, flowEventMap *util.ConcurrentMap) {
logger.Infow(ctx, "Processing Post Flow Remove Event for VPV", log.Fields{"Cookie": event.cookie, "event": event})
vpv := event.eventData.(*VoltPortVnet)
if isFlowStatusSuccess(flowStatus.Status, false) {
@@ -153,7 +158,7 @@
}
// ProcessMcastFlowDelEvent - Process Control Flow event trigger
-func ProcessMcastFlowDelEvent(cntx context.Context, event *FlowEvent, flowStatus intf.FlowStatus) {
+func ProcessMcastFlowDelEvent(cntx context.Context, event *FlowEvent, flowStatus intf.FlowStatus, flowEventMap *util.ConcurrentMap) {
logger.Infow(ctx, "Processing Post Flow Remove Event for Mcast/Igmp", log.Fields{"Cookie": event.cookie, "event": event})
mvp := event.eventData.(*MvlanProfile)
if isFlowStatusSuccess(flowStatus.Status, false) {
@@ -164,7 +169,7 @@
}
// ProcessDeviceFlowDelEvent - Process Control Flow event trigger
-func ProcessDeviceFlowDelEvent(cntx context.Context, event *FlowEvent, flowStatus intf.FlowStatus) {
+func ProcessDeviceFlowDelEvent(cntx context.Context, event *FlowEvent, flowStatus intf.FlowStatus, flowEventMap *util.ConcurrentMap) {
logger.Debugw(ctx, "Processing Post Flow Remove Event for VNET", log.Fields{"Cookie": event.cookie, "event": event})
vnet := event.eventData.(*VoltVnet)
if isFlowStatusSuccess(flowStatus.Status, false) {
diff --git a/internal/pkg/application/flowevent_test.go b/internal/pkg/application/flowevent_test.go
index b6b8738..6bd5da2 100644
--- a/internal/pkg/application/flowevent_test.go
+++ b/internal/pkg/application/flowevent_test.go
@@ -150,16 +150,24 @@
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- ProcessUsIgmpFlowAddEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus)
+ ProcessUsIgmpFlowAddEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus, nil)
})
}
}
func TestProcessServiceFlowAddEvent(t *testing.T) {
type args struct {
- cntx context.Context
- event *FlowEvent
- flowStatus intf.FlowStatus
+ cntx context.Context
+ event *FlowEvent
+ flowStatus intf.FlowStatus
+ flowEventMap *util.ConcurrentMap
+ }
+
+ flowPushCountMap := make(map[string]uint32)
+ vs := &VoltService{
+ VoltServiceCfg: VoltServiceCfg{
+ FlowPushCount: flowPushCountMap,
+ },
}
tests := []struct {
@@ -172,8 +180,9 @@
cntx: context.Background(),
event: &FlowEvent{
device: "test_device",
- eventData: voltService,
+ eventData: vs,
},
+ flowEventMap: util.NewConcurrentMap(),
},
},
{
@@ -182,17 +191,18 @@
cntx: context.Background(),
event: &FlowEvent{
device: "test_device",
- eventData: voltService,
+ eventData: vs,
},
flowStatus: intf.FlowStatus{
Status: uint32(1001),
},
+ flowEventMap: util.NewConcurrentMap(),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- ProcessServiceFlowAddEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus)
+ ProcessServiceFlowAddEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus, tt.args.flowEventMap)
})
}
}
@@ -231,17 +241,26 @@
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- ProcessControlFlowAddEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus)
+ ProcessControlFlowAddEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus, nil)
})
}
}
func TestProcessServiceFlowDelEvent(t *testing.T) {
type args struct {
- cntx context.Context
- event *FlowEvent
- flowStatus intf.FlowStatus
+ cntx context.Context
+ event *FlowEvent
+ flowStatus intf.FlowStatus
+ flowEventMap *util.ConcurrentMap
}
+
+ flowPushCountMap := make(map[string]uint32)
+ vs := &VoltService{
+ VoltServiceCfg: VoltServiceCfg{
+ FlowPushCount: flowPushCountMap,
+ },
+ }
+
tests := []struct {
name string
args args
@@ -251,8 +270,9 @@
args: args{
cntx: context.Background(),
event: &FlowEvent{
- eventData: voltService,
+ eventData: vs,
},
+ flowEventMap: util.NewConcurrentMap(),
},
},
{
@@ -260,11 +280,12 @@
args: args{
cntx: context.Background(),
event: &FlowEvent{
- eventData: voltService,
+ eventData: vs,
},
flowStatus: intf.FlowStatus{
Status: uint32(1001),
},
+ flowEventMap: util.NewConcurrentMap(),
},
},
}
@@ -273,7 +294,7 @@
dbintf := mocks.NewMockDBIntf(gomock.NewController(t))
db = dbintf
dbintf.EXPECT().PutService(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1)
- ProcessServiceFlowDelEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus)
+ ProcessServiceFlowDelEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus, tt.args.flowEventMap)
})
}
}
@@ -315,7 +336,7 @@
dbintf := mocks.NewMockDBIntf(gomock.NewController(t))
db = dbintf
dbintf.EXPECT().PutVpv(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1)
- ProcessControlFlowDelEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus)
+ ProcessControlFlowDelEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus, nil)
})
}
}
@@ -360,7 +381,7 @@
dbintf := mocks.NewMockDBIntf(gomock.NewController(t))
db = dbintf
dbintf.EXPECT().PutMvlan(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1)
- ProcessMcastFlowDelEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus)
+ ProcessMcastFlowDelEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus, nil)
})
}
}
@@ -410,9 +431,9 @@
dbintf := mocks.NewMockDBIntf(gomock.NewController(t))
db = dbintf
dbintf.EXPECT().PutVnet(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(nil).AnyTimes()
- ProcessDeviceFlowDelEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus)
+ ProcessDeviceFlowDelEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus, nil)
case "ProcessDeviceFlowDelEvent_else_condition":
- ProcessDeviceFlowDelEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus)
+ ProcessDeviceFlowDelEvent(tt.args.cntx, tt.args.event, tt.args.flowStatus, nil)
}
})
}
diff --git a/internal/pkg/application/service.go b/internal/pkg/application/service.go
index d93c8f4..38d41c2 100644
--- a/internal/pkg/application/service.go
+++ b/internal/pkg/application/service.go
@@ -117,6 +117,8 @@
AllowTransparent bool
EnableMulticastKPI bool
IsActivated bool
+ FlowPushCount map[string]uint32 // Tracks the number of flow install/delete failure attempts per cookie in order to throttle flow auditing
+ ServiceDeactivateReason SvcDeactivateReason // Mentions why the service was deactivated
}
// VoltServiceOper structure
@@ -164,6 +166,18 @@
ServiceVlanUpdate ServiceTrigger = 1
)
+// SvcDeactivateReason - Reason for service deactivation.
+// Stored on the service as ServiceDeactivateReason.
+type SvcDeactivateReason uint8
+
+const (
+ // SvcDeacRsn_None - no deactivation reason is recorded for the service.
+ SvcDeacRsn_None SvcDeactivateReason = 0
+ // SvcDeacRsn_NB - deactivation requested through VoltApplication.DeactivateService
+ // (presumably a northbound request; confirm against the API layer).
+ SvcDeacRsn_NB SvcDeactivateReason = 1
+ // SvcDeacRsn_Controller - deactivation triggered by the controller itself,
+ // i.e. by CheckAndDeactivateService once the flow retry threshold is hit.
+ SvcDeacRsn_Controller SvcDeactivateReason = 2
+)
+
// AppMutexes structure
type AppMutexes struct {
ServiceDataMutex sync.Mutex `json:"-"`
@@ -187,13 +201,16 @@
vs.DsHSIAFlowsApplied = false
vs.DeleteInProgress = false
vs.DeactivateInProgress = false
+ vs.ServiceDeactivateReason = SvcDeacRsn_None
//vs.MacAddr, _ = net.ParseMAC("00:00:00:00:00:00")
+
vs.IsOption82Enabled = cfg.IsOption82Enabled
vs.MacAddr = cfg.MacAddr
vs.Ipv4Addr = net.ParseIP("0.0.0.0")
vs.Ipv6Addr = net.ParseIP("::")
vs.PendingFlows = make(map[string]bool)
vs.AssociatedFlows = make(map[string]bool)
+ vs.FlowPushCount = make(map[string]uint32)
return &vs
}
@@ -1037,6 +1054,8 @@
vs.DeactivateInProgress = oper.DeactivateInProgress
vs.BwAvailInfo = oper.BwAvailInfo
vs.Device = oper.Device
+ vs.FlowPushCount = cfg.FlowPushCount
+ vs.ServiceDeactivateReason = cfg.ServiceDeactivateReason
} else {
// Sorting Pbit from highest
sort.Slice(vs.Pbits, func(i, j int) bool {
@@ -1274,8 +1293,12 @@
// FlowInstallSuccess - Called when corresponding service flow installation is success
// If no more pending flows, HSIA indication wil be triggered
-func (vs *VoltService) FlowInstallSuccess(cntx context.Context, cookie string, bwAvailInfo of.BwAvailDetails) {
+func (vs *VoltService) FlowInstallSuccess(cntx context.Context, cookie string, bwAvailInfo of.BwAvailDetails, flowEventMap *util.ConcurrentMap) {
logger.Debugw(ctx, "Flow Add Success Notification", log.Fields{"Cookie": cookie, "bwAvailInfo": bwAvailInfo, "Service": vs.Name})
+ flowEventMap.MapLock.Lock()
+ flowEventMap.Remove(cookie)
+ flowEventMap.MapLock.Unlock()
+
if vs.DeleteInProgress {
logger.Warnw(ctx, "Skipping Flow Add Success Notification. Service deletion in-progress", log.Fields{"Cookie": cookie, "Service": vs.Name})
return
@@ -1290,6 +1313,7 @@
delete(vs.PendingFlows, cookie)
vs.AssociatedFlows[cookie] = true
+ vs.FlowPushCount[cookie] = 0
vs.ServiceLock.Unlock()
var prevBwAvail, presentBwAvail string
if bwAvailInfo.PrevBw != "" && bwAvailInfo.PresentBw != "" {
@@ -1322,10 +1346,19 @@
// FlowInstallFailure - Called when corresponding service flow installation is failed
// Trigger service failure indication to NB
-func (vs *VoltService) FlowInstallFailure(cookie string, errorCode uint32, errReason string) {
- vs.ServiceLock.RLock()
-
+func (vs *VoltService) FlowInstallFailure(cntx context.Context, cookie string, errorCode uint32, errReason string, flowEventMap *util.ConcurrentMap) {
logger.Debugw(ctx, "Service flow installation failure", log.Fields{"Service": vs.Name, "Cookie": cookie, "errorCode": errorCode, "errReason": errReason})
+
+ isServiceDeactivated := vs.CheckAndDeactivateService(cntx, cookie)
+ if isServiceDeactivated {
+ flowEventMap.MapLock.Lock()
+ for ck := range vs.PendingFlows {
+ flowEventMap.Remove(ck)
+ }
+ flowEventMap.MapLock.Unlock()
+ }
+
+ vs.ServiceLock.RLock()
if _, ok := vs.PendingFlows[cookie]; !ok {
logger.Errorw(ctx, "Flow Add Failure for unknown Cookie", log.Fields{"Service": vs.Name, "Cookie": cookie})
vs.ServiceLock.RUnlock()
@@ -1369,13 +1402,54 @@
}
}
+// CheckAndDeactivateService - bumps the failure counter of the given cookie and,
+// when the counter equals the controller's max flow retry attempts while the
+// service is still activated, deactivates the service and persists it.
+//
+// Returns true only when the service was deactivated by this call. It returns
+// false when the threshold is not hit, the service is not activated, or the
+// device owning the service port cannot be resolved.
+func (vs *VoltService) CheckAndDeactivateService(cntx context.Context, cookie string) bool {
+	// The counter update and the threshold decision are made under the service
+	// lock; the lock is released before any further work is done.
+	vs.ServiceLock.Lock()
+	logger.Debugw(ctx, "Check and Deactivate service if flow install threshold is reached and remove flows from DB/Device", log.Fields{"serviceName": vs.Name, "FlowPushCount": vs.FlowPushCount[cookie]})
+	vs.FlowPushCount[cookie]++
+	thresholdHit := vs.FlowPushCount[cookie] == controller.GetController().GetMaxFlowRetryAttempt() && vs.IsActivated
+	vs.ServiceLock.Unlock()
+	if !thresholdHit {
+		return false
+	}
+
+	device, err := GetApplication().GetDeviceFromPort(vs.Port)
+	if err != nil {
+		// Without a device there is nothing to tear down; the service is left
+		// untouched and the caller is told no deactivation happened.
+		logger.Warnw(ctx, "Error Getting Device", log.Fields{"Reason": err.Error(), "Port": vs.Port})
+		return false
+	}
+
+	vs.SetSvcDeactivationFlags(SvcDeacRsn_Controller)
+	GetApplication().ServiceByName.Store(vs.Name, vs)
+
+	if p := device.GetPort(vs.Port); p != nil && p.State == PortStateUp {
+		vpv := GetApplication().GetVnetByPort(vs.Port, vs.SVlan, vs.CVlan, vs.UniVlan)
+		if vpv == nil {
+			logger.Warnw(ctx, "VPV does not exists!!!", log.Fields{"Device": vs.Device, "port": vs.Port, "SvcName": vs.Name})
+		} else {
+			// Port down call internally deletes all the flows
+			vpv.PortDownInd(cntx, vs.Device, vs.Port, true, true)
+		}
+	}
+
+	// Deactivation is complete: clear the in-progress marker and persist.
+	vs.DeactivateInProgress = false
+	GetApplication().ServiceByName.Store(vs.Name, vs)
+	vs.WriteToDb(cntx)
+	logger.Infow(ctx, "Service deactivated after max flow install attempts", log.Fields{"SvcName": vs.Name, "Cookie": cookie})
+	return true
+}
+
// FlowRemoveSuccess - Called when corresponding service flow removal is success
// If no more associated flows, DelHSIA indication wil be triggered
-func (vs *VoltService) FlowRemoveSuccess(cntx context.Context, cookie string) {
+func (vs *VoltService) FlowRemoveSuccess(cntx context.Context, cookie string, flowEventMap *util.ConcurrentMap) {
// if vs.DeleteInProgress {
// logger.Warnw(ctx, "Skipping Flow Remove Success Notification. Service deletion in-progress", log.Fields{"Cookie": cookie, "Service": vs.Name})
// return
// }
+ flowEventMap.MapLock.Lock()
+ flowEventMap.Remove(cookie)
+ flowEventMap.MapLock.Unlock()
+
vs.ServiceLock.Lock()
logger.Infow(ctx, "Processing Service Flow Remove Success Indication", log.Fields{"Cookie": cookie, "Service": vs.Name, "Associated Flows": vs.AssociatedFlows, "DsFlowsApplied": vs.DsHSIAFlowsApplied})
@@ -1387,6 +1461,7 @@
logger.Debugw(ctx, "Service Flow Remove Success for unknown Cookie", log.Fields{"Service": vs.Name, "Cookie": cookie, "AssociatedFlows": vs.AssociatedFlows, "PendingFlows": vs.PendingFlows})
}
+ vs.FlowPushCount[cookie] = 0
vs.ServiceLock.Unlock()
vs.WriteToDb(cntx)
@@ -1416,19 +1491,20 @@
// FlowRemoveFailure - Called when corresponding service flow installation is failed
// Trigger service failure indication to NB
-func (vs *VoltService) FlowRemoveFailure(cntx context.Context, cookie string, errorCode uint32, errReason string) {
- vs.ServiceLock.RLock()
- logger.Debugw(ctx, "Processing Service Flow Remove Failure Indication", log.Fields{"Cookie": cookie, "Service": vs.Name, "Associated Flows": vs.AssociatedFlows, "DsFlowsApplied": vs.DsHSIAFlowsApplied})
+func (vs *VoltService) FlowRemoveFailure(cntx context.Context, cookie string, errorCode uint32, errReason string, flowEventMap *util.ConcurrentMap) {
+ vs.ServiceLock.Lock()
+ logger.Debugw(ctx, "Processing Service Flow Remove Failure Indication", log.Fields{"Cookie": cookie, "Service": vs.Name, "Associated Flows": vs.AssociatedFlows, "DsFlowsApplied": vs.DsHSIAFlowsApplied, "FlowPushCount": vs.FlowPushCount[cookie]})
+ vs.FlowPushCount[cookie]++
if _, ok := vs.AssociatedFlows[cookie]; !ok {
logger.Warnw(ctx, "Flow Failure for unknown Cookie", log.Fields{"Service": vs.Name, "Cookie": cookie})
- vs.ServiceLock.RUnlock()
+ vs.ServiceLock.Unlock()
return
}
if vs.DeleteInProgress {
delete(vs.AssociatedFlows, cookie)
}
- vs.ServiceLock.RUnlock()
+ vs.ServiceLock.Unlock()
logger.Debugw(ctx, "Service Flow Remove Failure Notification", log.Fields{"uniPort": vs.Port, "Cookie": cookie, "Service": vs.Name, "ErrorCode": errorCode, "ErrorReason": errReason})
vs.triggerServiceFailureInd(errorCode, errReason)
@@ -2115,20 +2191,20 @@
if deviceID == DeviceAny {
deviceID = device.Name
} else if deviceID != device.Name {
- logger.Errorw(ctx, "Wrong Device ID", log.Fields{"Device": deviceID, "Port": portNo})
- return errorCodes.ErrDeviceNotFound
+ err := errorCodes.ErrDeviceNotFound
+ return fmt.Errorf("wrong device id %s : %w", deviceID, err)
}
}
va.ServiceByName.Range(func(key, value interface{}) bool {
vs := value.(*VoltService)
// If svlan if provided, then the tags and tpID of service has to be matching
if sVlan != of.VlanNone && (sVlan != vs.SVlan || cVlan != vs.CVlan || tpID != vs.TechProfileID) {
- logger.Infow(ctx, "Service Activate Request Does not match", log.Fields{"Device": deviceID, "voltService": vs})
+ logger.Warnw(ctx, "Service Activate Request Does not match", log.Fields{"Device": deviceID, "voltService": vs})
return true
}
if portNo == vs.Port && !vs.IsActivated {
// Mark the service as activated, so that we can push the flows later when the port is added by voltha
- logger.Infow(ctx, "Service Activate", log.Fields{"Name": vs.Name})
+ logger.Debugw(ctx, "Service Activate", log.Fields{"Name": vs.Name})
vs.IsActivated = true
va.ServiceByName.Store(vs.Name, vs)
vs.WriteToDb(cntx)
@@ -2156,6 +2232,12 @@
return nil
}
+// SetSvcDeactivationFlags - marks the service as not activated with a
+// deactivation in progress, and records why it is being deactivated.
+// It only updates the in-memory fields; it takes no lock and does not persist.
+func (vs *VoltService) SetSvcDeactivationFlags(deactivateRsn SvcDeactivateReason) {
+	vs.ServiceDeactivateReason = deactivateRsn
+	vs.IsActivated = false
+	vs.DeactivateInProgress = true
+}
+
// DeactivateService to activate pre-provisioned service
func (va *VoltApplication) DeactivateService(cntx context.Context, deviceID, portNo string, sVlan, cVlan of.VlanType, tpID uint16) error {
logger.Infow(ctx, "Service Deactivate Request ", log.Fields{"Device": deviceID, "Port": portNo, "Svaln": sVlan, "Cvlan": cVlan, "TpID": tpID})
@@ -2169,8 +2251,7 @@
return true
}
if portNo == vs.Port && vs.IsActivated {
- vs.IsActivated = false
- vs.DeactivateInProgress = true
+ vs.SetSvcDeactivationFlags(SvcDeacRsn_NB)
va.ServiceByName.Store(vs.Name, vs)
vs.WriteToDb(cntx)
device, err := va.GetDeviceFromPort(portNo)
diff --git a/internal/pkg/application/service_test.go b/internal/pkg/application/service_test.go
index 83e39d7..13201ff 100644
--- a/internal/pkg/application/service_test.go
+++ b/internal/pkg/application/service_test.go
@@ -224,22 +224,30 @@
switch tt.name {
case "VoltService_FlowRemoveFailure":
associatedFlows := map[string]bool{}
+ flowPushCountMap := map[string]uint32{}
associatedFlows["test_cookie"] = true
vs := &VoltService{
VoltServiceOper: VoltServiceOper{
AssociatedFlows: associatedFlows,
},
+ VoltServiceCfg: VoltServiceCfg{
+ FlowPushCount: flowPushCountMap,
+ },
}
- vs.FlowRemoveFailure(tt.args.cntx, tt.args.cookie, tt.args.errorCode, tt.args.errReason)
+ vs.FlowRemoveFailure(tt.args.cntx, tt.args.cookie, tt.args.errorCode, tt.args.errReason, nil)
case "cookie_not_found":
associatedFlows := map[string]bool{}
+ flowPushCountMap := map[string]uint32{}
associatedFlows["cookie"] = true
vs := &VoltService{
VoltServiceOper: VoltServiceOper{
AssociatedFlows: associatedFlows,
},
+ VoltServiceCfg: VoltServiceCfg{
+ FlowPushCount: flowPushCountMap,
+ },
}
- vs.FlowRemoveFailure(tt.args.cntx, tt.args.cookie, tt.args.errorCode, tt.args.errReason)
+ vs.FlowRemoveFailure(tt.args.cntx, tt.args.cookie, tt.args.errorCode, tt.args.errReason, nil)
}
})
}
@@ -525,9 +533,10 @@
func TestVoltService_FlowInstallSuccess(t *testing.T) {
type args struct {
- cntx context.Context
- cookie string
- bwAvailInfo of.BwAvailDetails
+ cntx context.Context
+ cookie string
+ bwAvailInfo of.BwAvailDetails
+ flowEventMap *util.ConcurrentMap
}
tests := []struct {
name string
@@ -542,6 +551,7 @@
PrevBw: "test_prev_BW",
PresentBw: "test_present_BW",
},
+ flowEventMap: util.NewConcurrentMap(),
},
},
}
@@ -550,6 +560,7 @@
pendingFlows := map[string]bool{}
pendingFlows["test_cookie"] = true
associatedFlows := map[string]bool{}
+ flowPushCountMap := map[string]uint32{}
associatedFlows["test_cookie"] = true
vs := &VoltService{
VoltServiceOper: VoltServiceOper{
@@ -558,13 +569,14 @@
DsHSIAFlowsApplied: true,
},
VoltServiceCfg: VoltServiceCfg{
- Port: "test_port",
+ Port: "test_port",
+ FlowPushCount: flowPushCountMap,
},
}
ga := GetApplication()
ga.PortsDisc.Store("test_port", voltPort)
ga.DevicesDisc.Store(test_device, voltDevice)
- vs.FlowInstallSuccess(tt.args.cntx, tt.args.cookie, tt.args.bwAvailInfo)
+ vs.FlowInstallSuccess(tt.args.cntx, tt.args.cookie, tt.args.bwAvailInfo, tt.args.flowEventMap)
})
}
}
@@ -1065,9 +1077,6 @@
if err := va.ActivateService(tt.args.cntx, tt.args.deviceID, tt.args.portNo, tt.args.sVlan, tt.args.cVlan, tt.args.tpID); (err != nil) != tt.wantErr {
t.Errorf("VoltApplication.ActivateService() error = %v, wantErr %v", err, tt.wantErr)
}
- case GetDeviceFromPort_error:
- err := va.ActivateService(tt.args.cntx, tt.args.deviceID, tt.args.portNo, tt.args.sVlan, tt.args.cVlan, tt.args.tpID)
- assert.Nil(t, err)
case "deviceID != device.Name":
var voltPortTest1 = &VoltPort{
Name: "test_name",
@@ -2316,6 +2325,7 @@
func TestVoltService_FlowInstallFailure(t *testing.T) {
type args struct {
+ cntx context.Context
cookie string
errorCode uint32
errReason string
@@ -2327,6 +2337,7 @@
{
name: "VoltService_FlowInstallFailure",
args: args{
+ cntx: context.Background(),
cookie: "test_cookie",
errorCode: uint32(1),
errReason: "err_reason",
@@ -2335,6 +2346,7 @@
{
name: "PendingFlows[cookie]_false",
args: args{
+ cntx: context.Background(),
cookie: "test_cookie",
errorCode: uint32(1),
errReason: "err_reason",
@@ -2345,15 +2357,19 @@
pendingFlows["test_cookie"] = true
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
+ flowPushCountMap := map[string]uint32{}
vs := &VoltService{
VoltServiceOper: VoltServiceOper{},
+ VoltServiceCfg: VoltServiceCfg{
+ FlowPushCount: flowPushCountMap,
+ },
}
switch tt.name {
case "VoltService_FlowInstallFailure":
vs.PendingFlows = pendingFlows
- vs.FlowInstallFailure(tt.args.cookie, tt.args.errorCode, tt.args.errReason)
+ vs.FlowInstallFailure(tt.args.cntx, tt.args.cookie, tt.args.errorCode, tt.args.errReason, nil)
case "PendingFlows[cookie]_false":
- vs.FlowInstallFailure(tt.args.cookie, tt.args.errorCode, tt.args.errReason)
+ vs.FlowInstallFailure(tt.args.cntx, tt.args.cookie, tt.args.errorCode, tt.args.errReason, nil)
}
})
}
@@ -2361,8 +2377,9 @@
func TestVoltService_FlowRemoveSuccess(t *testing.T) {
type args struct {
- cntx context.Context
- cookie string
+ cntx context.Context
+ cookie string
+ flowEventMap *util.ConcurrentMap
}
tests := []struct {
name string
@@ -2371,17 +2388,22 @@
{
name: "GetDevice != nil",
args: args{
- cntx: context.Background(),
- cookie: "test_cookie",
+ cntx: context.Background(),
+ cookie: "test_cookie",
+ flowEventMap: util.NewConcurrentMap(),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
+ flowPushCountMap := map[string]uint32{}
vs := &VoltService{
VoltServiceOper: VoltServiceOper{
Device: test_device,
},
+ VoltServiceCfg: VoltServiceCfg{
+ FlowPushCount: flowPushCountMap,
+ },
}
ga := GetApplication()
ga.DevicesDisc.Store(test_device, voltDevice2)
@@ -2389,7 +2411,7 @@
dbintf := mocks.NewMockDBIntf(gomock.NewController(t))
db = dbintf
dbintf.EXPECT().PutService(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
- vs.FlowRemoveSuccess(tt.args.cntx, tt.args.cookie)
+ vs.FlowRemoveSuccess(tt.args.cntx, tt.args.cookie, tt.args.flowEventMap)
})
}
}
diff --git a/internal/pkg/controller/addflows.go b/internal/pkg/controller/addflows.go
index 894127b..2f13dbd 100644
--- a/internal/pkg/controller/addflows.go
+++ b/internal/pkg/controller/addflows.go
@@ -93,7 +93,7 @@
if err.Error() == ErrDuplicateFlow {
dbFlow, _ := aft.device.GetFlow(flow.Cookie)
if dbFlow.State == of.FlowAddSuccess {
- aft.device.triggerFlowNotification(ctx, flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil)
+ aft.device.triggerFlowNotification(ctx, flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil, true)
flowsPresent++
}
}
@@ -108,7 +108,7 @@
// aft.device.AddFlowToDb(dbFlow)
flowsToProcess[flow.Cookie] = dbFlow
}
- aft.device.triggerFlowNotification(ctx, flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil)
+ aft.device.triggerFlowNotification(ctx, flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil, false)
}
}
@@ -124,7 +124,7 @@
for _, flow := range aft.flow.SubFlows {
logger.Warnw(ctx, "Skip Flow Update", log.Fields{"Reason": "Port Deleted", "PortName": aft.flow.PortName, "PortNo": aft.flow.PortID, "Cookie": flow.Cookie, "Operation": aft.flow.Command})
if aft.flow.Command == of.CommandDel {
- aft.device.triggerFlowNotification(ctx, flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil)
+ aft.device.triggerFlowNotification(ctx, flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil, true)
}
}
return nil
@@ -160,7 +160,7 @@
}
break
}
- aft.device.triggerFlowNotification(ctx, flow.FlowMod.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil)
+ aft.device.triggerFlowNotification(ctx, flow.FlowMod.Cookie, aft.flow.Command, of.BwAvailDetails{}, err, true)
} else {
logger.Errorw(ctx, "Update Flow Table Failed: Voltha Client Unavailable", log.Fields{"Flow": flow})
}
diff --git a/internal/pkg/controller/audittables.go b/internal/pkg/controller/audittables.go
index f422d52..d5ce858 100644
--- a/internal/pkg/controller/audittables.go
+++ b/internal/pkg/controller/audittables.go
@@ -267,8 +267,6 @@
delete(rcvdFlows, flow.Cookie)
}
defaultSuccessFlowStatus.Cookie = strconv.FormatUint(flow.Cookie, 10)
-
- GetController().ProcessFlowModResultIndication(cntx, defaultSuccessFlowStatus)
} else {
// The flow exists at the controller but not at the device
// Push the flow to the device
@@ -315,7 +313,7 @@
if _, err = vc.UpdateLogicalDeviceFlowTable(att.ctx, flow); err != nil {
logger.Errorw(ctx, "Update Flow Table Failed", log.Fields{"Reason": err.Error()})
}
- att.device.triggerFlowResultNotification(cntx, flow.FlowMod.Cookie, dbFlow, of.CommandAdd, bwConsumedInfo, err)
+ att.device.triggerFlowResultNotification(cntx, flow.FlowMod.Cookie, dbFlow, of.CommandAdd, bwConsumedInfo, err, true)
}
}
@@ -336,6 +334,11 @@
continue
}
+ if flag := GetController().IsFlowDelThresholdReached(cntx, strconv.FormatUint(flow.Cookie, 10), att.device.ID); flag {
+ logger.Warnw(ctx, "Flow delete threshold reached, skipping flow delete", log.Fields{"Device": att.device.ID, "Cookie": flow.Cookie})
+ continue
+ }
+
logger.Debugw(ctx, "Deleting Flow", log.Fields{"Cookie": flow.Cookie})
// Create the flowMod structure and fill it out
flowMod := &ofp.OfpFlowMod{}
@@ -362,7 +365,7 @@
if _, err = vc.UpdateLogicalDeviceFlowTable(att.ctx, flowUpdate); err != nil {
logger.Errorw(ctx, "Flow Audit Delete Failed", log.Fields{"Reason": err.Error()})
}
- att.device.triggerFlowResultNotification(cntx, flow.Cookie, nil, of.CommandDel, of.BwAvailDetails{}, err)
+ att.device.triggerFlowResultNotification(cntx, flow.Cookie, nil, of.CommandDel, of.BwAvailDetails{}, err, true)
}
}
@@ -601,13 +604,11 @@
// AddMissingPorts to add the missing ports
func (att *AuditTablesTask) AddMissingPorts(cntx context.Context, mps map[uint32]*ofp.OfpPort) {
- logger.Infow(ctx, "Device Audit - Add Missing Ports", log.Fields{"NumPorts": len(mps)})
+ logger.Debugw(ctx, "Device Audit - Add Missing Ports", log.Fields{"NumPorts": len(mps)})
addMissingPort := func(mp *ofp.OfpPort) {
logger.Debugw(ctx, "Process Port Add Ind", log.Fields{"Port No": mp.PortNo, "Port Name": mp.Name})
- // Error is ignored as it only drops duplicate ports
- logger.Debugw(ctx, "Calling AddPort", log.Fields{"No": mp.PortNo, "Name": mp.Name})
if err := att.device.AddPort(cntx, mp); err != nil {
logger.Warnw(ctx, "AddPort Failed", log.Fields{"No": mp.PortNo, "Name": mp.Name, "Reason": err})
}
diff --git a/internal/pkg/controller/controller.go b/internal/pkg/controller/controller.go
index c31d892..76e4558 100644
--- a/internal/pkg/controller/controller.go
+++ b/internal/pkg/controller/controller.go
@@ -72,7 +72,9 @@
rebootInProgressDevices map[string]string
deviceLock sync.RWMutex
rebootLock sync.Mutex
- deviceTableSyncDuration time.Duration
+ deviceTableSyncDuration time.Duration // Time interval between each cycle of audit task
+ maxFlowRetryDuration time.Duration // Maximum duration for which flows will be retried upon failures
+ maxFlowRetryAttempts uint32 // maxFlowRetryAttempt = maxFlowRetryDuration / deviceTableSyncDuration
RebootFlow bool
}
@@ -100,11 +102,32 @@
v.deviceTableSyncDuration = time.Duration(duration) * time.Second
}
+// SetMaxFlowRetryDuration - sets max flow retry interval
+func (v *VoltController) SetMaxFlowRetryDuration(duration int) {
+ v.maxFlowRetryDuration = time.Duration(duration) * time.Second
+}
+
+// SetMaxFlowRetryAttempts - sets max flow retry attempts to maxFlowRetryDuration / deviceTableSyncDuration.
+// The count is set to 0 when either duration is not positive.
+func (v *VoltController) SetMaxFlowRetryAttempts() {
+	// Guard the division: a zero sync duration would panic, a negative quotient would wrap in uint32.
+	if v.deviceTableSyncDuration <= 0 || v.maxFlowRetryDuration <= 0 {
+		v.maxFlowRetryAttempts = 0
+		return
+	}
+	v.maxFlowRetryAttempts = uint32(v.maxFlowRetryDuration / v.deviceTableSyncDuration)
+}
+
// GetDeviceTableSyncDuration - returns configured device table sync duration
func (v *VoltController) GetDeviceTableSyncDuration() time.Duration {
return v.deviceTableSyncDuration
}
+// GetMaxFlowRetryAttempt - returns the maximum number of flow retry attempts
+func (v *VoltController) GetMaxFlowRetryAttempt() uint32 {
+ return v.maxFlowRetryAttempts
+}
+
// AddDevice to add device
func (v *VoltController) AddDevice(cntx context.Context, config *intf.VPClientCfg) intf.IVPClient {
d := NewDevice(cntx, config.DeviceID, config.SerialNum, config.VolthaClient, config.SouthBoundID, config.MfrDesc, config.HwDesc, config.SwDesc)
@@ -249,6 +266,11 @@
v.app.ProcessFlowModResultIndication(cntx, flowStatus)
}
+// IsFlowDelThresholdReached - check if the attempts for flow delete has reached threshold or not
+func (v *VoltController) IsFlowDelThresholdReached(cntx context.Context, cookie string, device string) bool {
+ return v.app.IsFlowDelThresholdReached(cntx, cookie, device)
+}
+
// AddVPAgent to add the vpagent
func (v *VoltController) AddVPAgent(vep string, vpa *vpagent.VPAgent) {
v.vagent[vep] = vpa
diff --git a/internal/pkg/controller/device.go b/internal/pkg/controller/device.go
index 618afae..bdac105 100644
--- a/internal/pkg/controller/device.go
+++ b/internal/pkg/controller/device.go
@@ -1059,12 +1059,12 @@
return false
}
-func (d *Device) triggerFlowNotification(cntx context.Context, cookie uint64, oper of.Command, bwDetails of.BwAvailDetails, err error) {
+func (d *Device) triggerFlowNotification(cntx context.Context, cookie uint64, oper of.Command, bwDetails of.BwAvailDetails, err error, sendFlowNotif bool) {
flow, _ := d.GetFlow(cookie)
- d.triggerFlowResultNotification(cntx, cookie, flow, oper, bwDetails, err)
+ d.triggerFlowResultNotification(cntx, cookie, flow, oper, bwDetails, err, sendFlowNotif)
}
-func (d *Device) triggerFlowResultNotification(cntx context.Context, cookie uint64, flow *of.VoltSubFlow, oper of.Command, bwDetails of.BwAvailDetails, err error) {
+func (d *Device) triggerFlowResultNotification(cntx context.Context, cookie uint64, flow *of.VoltSubFlow, oper of.Command, bwDetails of.BwAvailDetails, err error, sendFlowNotif bool) {
statusCode, statusMsg := infraerror.GetErrorInfo(err)
success := isFlowOperSuccess(statusCode, oper)
@@ -1108,6 +1108,8 @@
AdditionalData: bwDetails,
}
- logger.Debugw(ctx, "Sending Flow Notification", log.Fields{"Cookie": cookie, "Error Code": statusCode, "FlowOp": oper})
- GetController().ProcessFlowModResultIndication(cntx, flowResult)
+ if sendFlowNotif {
+ logger.Debugw(ctx, "Sending Flow Notification", log.Fields{"Cookie": cookie, "Error Code": statusCode, "FlowOp": oper})
+ GetController().ProcessFlowModResultIndication(cntx, flowResult)
+ }
}
diff --git a/internal/pkg/controller/device_test.go b/internal/pkg/controller/device_test.go
index 8f35aef..7089c64 100644
--- a/internal/pkg/controller/device_test.go
+++ b/internal/pkg/controller/device_test.go
@@ -170,7 +170,7 @@
db = dbintf
dbintf.EXPECT().PutFlow(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1)
appMock.EXPECT().ProcessFlowModResultIndication(gomock.Any(), gomock.Any()).Times(1)
- d.triggerFlowResultNotification(tt.args.cntx, tt.args.cookie, tt.args.flow, tt.args.oper, tt.args.bwDetails, tt.args.err)
+ d.triggerFlowResultNotification(tt.args.cntx, tt.args.cookie, tt.args.flow, tt.args.oper, tt.args.bwDetails, tt.args.err, true) // true: the Times(1) expectation above requires the notification to be sent
})
}
}
diff --git a/internal/pkg/intf/appif.go b/internal/pkg/intf/appif.go
index afc8e4e..46fb9c9 100644
--- a/internal/pkg/intf/appif.go
+++ b/internal/pkg/intf/appif.go
@@ -31,6 +31,7 @@
DelDevice(context.Context, string)
SetRebootFlag(bool)
ProcessFlowModResultIndication(context.Context, FlowStatus)
+ IsFlowDelThresholdReached(context.Context, string, string) bool
DeviceRebootInd(context.Context, string, string, string)
DeviceDisableInd(context.Context, string)
UpdateMvlanProfilesForDevice(context.Context, string)
diff --git a/internal/pkg/util/envutils/envutils.go b/internal/pkg/util/envutils/envutils.go
index 392e9b8..ba70d5e 100644
--- a/internal/pkg/util/envutils/envutils.go
+++ b/internal/pkg/util/envutils/envutils.go
@@ -81,6 +81,7 @@
MemProfile = "MEM_PROFILE"
VendorID = "VENDOR_ID"
DeviceSyncDuration = "DEVICE_SYNC_DURATION"
+ MaxFlowRetryDuration = "MAX_FLOW_RETRY_DURATION"
// openonu environment variables
OmciPacketCapture = "SAVE_OMCI_PACKET_CAPTURE"
diff --git a/internal/test/mocks/mock_appif.go b/internal/test/mocks/mock_appif.go
index 42bd490..8b986ec 100644
--- a/internal/test/mocks/mock_appif.go
+++ b/internal/test/mocks/mock_appif.go
@@ -206,6 +206,20 @@
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessFlowModResultIndication", reflect.TypeOf((*MockApp)(nil).ProcessFlowModResultIndication), arg0, arg1)
}
+// IsFlowDelThresholdReached mocks base method and returns the value configured through the recorder's Return.
+func (m *MockApp) IsFlowDelThresholdReached(arg0 context.Context, arg1 string, arg2 string) bool {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "IsFlowDelThresholdReached", arg0, arg1, arg2)
+	ret0, _ := ret[0].(bool)
+	return ret0
+}
+
+// IsFlowDelThresholdReached indicates an expected call of IsFlowDelThresholdReached.
+func (mr *MockAppMockRecorder) IsFlowDelThresholdReached(arg0, arg1, arg2 interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsFlowDelThresholdReached", reflect.TypeOf((*MockApp)(nil).IsFlowDelThresholdReached), arg0, arg1, arg2)
+}
+
// SetRebootFlag mocks base method.
func (m *MockApp) SetRebootFlag(arg0 bool) {
m.ctrl.T.Helper()