[VOL-5243] Flow audit control
Change-Id: Ia70283da583ea870af078bf78538c1416f5b795c
diff --git a/internal/pkg/controller/addflows.go b/internal/pkg/controller/addflows.go
index 894127b..2f13dbd 100644
--- a/internal/pkg/controller/addflows.go
+++ b/internal/pkg/controller/addflows.go
@@ -93,7 +93,7 @@
if err.Error() == ErrDuplicateFlow {
dbFlow, _ := aft.device.GetFlow(flow.Cookie)
if dbFlow.State == of.FlowAddSuccess {
- aft.device.triggerFlowNotification(ctx, flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil)
+ aft.device.triggerFlowNotification(ctx, flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil, true)
flowsPresent++
}
}
@@ -108,7 +108,7 @@
// aft.device.AddFlowToDb(dbFlow)
flowsToProcess[flow.Cookie] = dbFlow
}
- aft.device.triggerFlowNotification(ctx, flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil)
+ aft.device.triggerFlowNotification(ctx, flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil, false)
}
}
@@ -124,7 +124,7 @@
for _, flow := range aft.flow.SubFlows {
logger.Warnw(ctx, "Skip Flow Update", log.Fields{"Reason": "Port Deleted", "PortName": aft.flow.PortName, "PortNo": aft.flow.PortID, "Cookie": flow.Cookie, "Operation": aft.flow.Command})
if aft.flow.Command == of.CommandDel {
- aft.device.triggerFlowNotification(ctx, flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil)
+ aft.device.triggerFlowNotification(ctx, flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil, true)
}
}
return nil
@@ -160,7 +160,7 @@
}
break
}
- aft.device.triggerFlowNotification(ctx, flow.FlowMod.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil)
+ aft.device.triggerFlowNotification(ctx, flow.FlowMod.Cookie, aft.flow.Command, of.BwAvailDetails{}, err, true)
} else {
logger.Errorw(ctx, "Update Flow Table Failed: Voltha Client Unavailable", log.Fields{"Flow": flow})
}
diff --git a/internal/pkg/controller/audittables.go b/internal/pkg/controller/audittables.go
index f422d52..d5ce858 100644
--- a/internal/pkg/controller/audittables.go
+++ b/internal/pkg/controller/audittables.go
@@ -267,8 +267,6 @@
delete(rcvdFlows, flow.Cookie)
}
defaultSuccessFlowStatus.Cookie = strconv.FormatUint(flow.Cookie, 10)
-
- GetController().ProcessFlowModResultIndication(cntx, defaultSuccessFlowStatus)
} else {
// The flow exists at the controller but not at the device
// Push the flow to the device
@@ -315,7 +313,7 @@
if _, err = vc.UpdateLogicalDeviceFlowTable(att.ctx, flow); err != nil {
logger.Errorw(ctx, "Update Flow Table Failed", log.Fields{"Reason": err.Error()})
}
- att.device.triggerFlowResultNotification(cntx, flow.FlowMod.Cookie, dbFlow, of.CommandAdd, bwConsumedInfo, err)
+ att.device.triggerFlowResultNotification(cntx, flow.FlowMod.Cookie, dbFlow, of.CommandAdd, bwConsumedInfo, err, true)
}
}
@@ -336,6 +334,11 @@
continue
}
+ if flag := GetController().IsFlowDelThresholdReached(cntx, strconv.FormatUint(flow.Cookie, 10), att.device.ID); flag {
+ logger.Warnw(ctx, "Flow delete threshold reached, skipping flow delete", log.Fields{"Device": att.device.ID, "Cookie": flow.Cookie})
+ continue
+ }
+
logger.Debugw(ctx, "Deleting Flow", log.Fields{"Cookie": flow.Cookie})
// Create the flowMod structure and fill it out
flowMod := &ofp.OfpFlowMod{}
@@ -362,7 +365,7 @@
if _, err = vc.UpdateLogicalDeviceFlowTable(att.ctx, flowUpdate); err != nil {
logger.Errorw(ctx, "Flow Audit Delete Failed", log.Fields{"Reason": err.Error()})
}
- att.device.triggerFlowResultNotification(cntx, flow.Cookie, nil, of.CommandDel, of.BwAvailDetails{}, err)
+ att.device.triggerFlowResultNotification(cntx, flow.Cookie, nil, of.CommandDel, of.BwAvailDetails{}, err, true)
}
}
@@ -601,13 +604,11 @@
// AddMissingPorts to add the missing ports
func (att *AuditTablesTask) AddMissingPorts(cntx context.Context, mps map[uint32]*ofp.OfpPort) {
- logger.Infow(ctx, "Device Audit - Add Missing Ports", log.Fields{"NumPorts": len(mps)})
+ logger.Debugw(ctx, "Device Audit - Add Missing Ports", log.Fields{"NumPorts": len(mps)})
addMissingPort := func(mp *ofp.OfpPort) {
logger.Debugw(ctx, "Process Port Add Ind", log.Fields{"Port No": mp.PortNo, "Port Name": mp.Name})
- // Error is ignored as it only drops duplicate ports
- logger.Debugw(ctx, "Calling AddPort", log.Fields{"No": mp.PortNo, "Name": mp.Name})
if err := att.device.AddPort(cntx, mp); err != nil {
logger.Warnw(ctx, "AddPort Failed", log.Fields{"No": mp.PortNo, "Name": mp.Name, "Reason": err})
}
diff --git a/internal/pkg/controller/controller.go b/internal/pkg/controller/controller.go
index c31d892..76e4558 100644
--- a/internal/pkg/controller/controller.go
+++ b/internal/pkg/controller/controller.go
@@ -72,7 +72,9 @@
rebootInProgressDevices map[string]string
deviceLock sync.RWMutex
rebootLock sync.Mutex
- deviceTableSyncDuration time.Duration
+ deviceTableSyncDuration time.Duration // Time interval between each cycle of audit task
+ maxFlowRetryDuration time.Duration // Maximum duration for which flows will be retried upon failures
+ maxFlowRetryAttempts uint32 // maxFlowRetryAttempts = maxFlowRetryDuration / deviceTableSyncDuration
RebootFlow bool
}
@@ -100,11 +102,26 @@
v.deviceTableSyncDuration = time.Duration(duration) * time.Second
}
+// SetMaxFlowRetryDuration - sets the max flow retry duration; the duration argument is given in seconds
+func (v *VoltController) SetMaxFlowRetryDuration(duration int) {
+ v.maxFlowRetryDuration = time.Duration(duration) * time.Second
+}
+
+// SetMaxFlowRetryAttempts - derives max flow retry attempts as maxFlowRetryDuration / deviceTableSyncDuration (integer division); deviceTableSyncDuration must be non-zero when this is called, otherwise the division panics
+func (v *VoltController) SetMaxFlowRetryAttempts() {
+ v.maxFlowRetryAttempts = uint32((v.maxFlowRetryDuration / v.deviceTableSyncDuration))
+}
+
// GetDeviceTableSyncDuration - returns configured device table sync duration
func (v *VoltController) GetDeviceTableSyncDuration() time.Duration {
return v.deviceTableSyncDuration
}
+// GetMaxFlowRetryAttempt - returns max flow retry attempts
+func (v *VoltController) GetMaxFlowRetryAttempt() uint32 {
+ return v.maxFlowRetryAttempts
+}
+
// AddDevice to add device
func (v *VoltController) AddDevice(cntx context.Context, config *intf.VPClientCfg) intf.IVPClient {
d := NewDevice(cntx, config.DeviceID, config.SerialNum, config.VolthaClient, config.SouthBoundID, config.MfrDesc, config.HwDesc, config.SwDesc)
@@ -249,6 +266,11 @@
v.app.ProcessFlowModResultIndication(cntx, flowStatus)
}
+// IsFlowDelThresholdReached - reports whether the flow delete attempts for the given cookie and device have reached the threshold; the check is delegated to the application
+func (v *VoltController) IsFlowDelThresholdReached(cntx context.Context, cookie string, device string) bool {
+ return v.app.IsFlowDelThresholdReached(cntx, cookie, device)
+}
+
// AddVPAgent to add the vpagent
func (v *VoltController) AddVPAgent(vep string, vpa *vpagent.VPAgent) {
v.vagent[vep] = vpa
diff --git a/internal/pkg/controller/device.go b/internal/pkg/controller/device.go
index 618afae..bdac105 100644
--- a/internal/pkg/controller/device.go
+++ b/internal/pkg/controller/device.go
@@ -1059,12 +1059,12 @@
return false
}
-func (d *Device) triggerFlowNotification(cntx context.Context, cookie uint64, oper of.Command, bwDetails of.BwAvailDetails, err error) {
+func (d *Device) triggerFlowNotification(cntx context.Context, cookie uint64, oper of.Command, bwDetails of.BwAvailDetails, err error, sendFlowNotif bool) {
flow, _ := d.GetFlow(cookie)
- d.triggerFlowResultNotification(cntx, cookie, flow, oper, bwDetails, err)
+ d.triggerFlowResultNotification(cntx, cookie, flow, oper, bwDetails, err, sendFlowNotif)
}
-func (d *Device) triggerFlowResultNotification(cntx context.Context, cookie uint64, flow *of.VoltSubFlow, oper of.Command, bwDetails of.BwAvailDetails, err error) {
+func (d *Device) triggerFlowResultNotification(cntx context.Context, cookie uint64, flow *of.VoltSubFlow, oper of.Command, bwDetails of.BwAvailDetails, err error, sendFlowNotif bool) {
statusCode, statusMsg := infraerror.GetErrorInfo(err)
success := isFlowOperSuccess(statusCode, oper)
@@ -1108,6 +1108,8 @@
AdditionalData: bwDetails,
}
- logger.Debugw(ctx, "Sending Flow Notification", log.Fields{"Cookie": cookie, "Error Code": statusCode, "FlowOp": oper})
- GetController().ProcessFlowModResultIndication(cntx, flowResult)
+ if sendFlowNotif {
+ logger.Debugw(ctx, "Sending Flow Notification", log.Fields{"Cookie": cookie, "Error Code": statusCode, "FlowOp": oper})
+ GetController().ProcessFlowModResultIndication(cntx, flowResult)
+ }
}
diff --git a/internal/pkg/controller/device_test.go b/internal/pkg/controller/device_test.go
index 8f35aef..7089c64 100644
--- a/internal/pkg/controller/device_test.go
+++ b/internal/pkg/controller/device_test.go
@@ -170,7 +170,7 @@
db = dbintf
dbintf.EXPECT().PutFlow(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1)
appMock.EXPECT().ProcessFlowModResultIndication(gomock.Any(), gomock.Any()).Times(1)
- d.triggerFlowResultNotification(tt.args.cntx, tt.args.cookie, tt.args.flow, tt.args.oper, tt.args.bwDetails, tt.args.err)
+ d.triggerFlowResultNotification(tt.args.cntx, tt.args.cookie, tt.args.flow, tt.args.oper, tt.args.bwDetails, tt.args.err, false)
})
}
}