[VOL-5402]-VGC all fixes till date from jan 2024
Change-Id: I2857e0ef9b1829a28c6e3ad04da96b826cb900b6
Signed-off-by: Akash Soni <akash.soni@radisys.com>
diff --git a/internal/pkg/controller/device.go b/internal/pkg/controller/device.go
index bdac105..abf3108 100644
--- a/internal/pkg/controller/device.go
+++ b/internal/pkg/controller/device.go
@@ -199,8 +199,8 @@
func (d *Device) GetFlow(cookie uint64) (*of.VoltSubFlow, bool) {
d.flowLock.RLock()
defer d.flowLock.RUnlock()
- logger.Debugw(ctx, "Get Flow", log.Fields{"Cookie": cookie})
flow, ok := d.flows[cookie]
+ logger.Debugw(ctx, "Get Flow", log.Fields{"Cookie": cookie})
return flow, ok
}
@@ -235,8 +235,11 @@
d.flowLock.Lock()
defer d.flowLock.Unlock()
logger.Debugw(ctx, "AddFlow to device", log.Fields{"Cookie": flow.Cookie})
- if _, ok := d.flows[flow.Cookie]; ok {
- return errors.New(ErrDuplicateFlow)
+ if dbFlow, ok := d.flows[flow.Cookie]; ok {
+ // In case of ONU reboot after flow delete failure, try to install flow in the device by checking for previous flow state
+ if dbFlow.State != of.FlowDelFailure {
+ return errors.New(ErrDuplicateFlow)
+ }
}
d.flows[flow.Cookie] = flow
d.AddFlowToDb(cntx, flow)
@@ -277,7 +280,7 @@
return false
} else if flow.OldCookie != 0 && flow.Cookie != flow.OldCookie {
if _, ok := d.flows[flow.OldCookie]; ok {
- logger.Debugw(ctx, "Flow present with old cookie", log.Fields{"OldCookie": flow.OldCookie})
+ logger.Warnw(ctx, "Flow present with old cookie", log.Fields{"OldCookie": flow.OldCookie})
return true
}
}
@@ -487,13 +490,14 @@
// Inform the application if the port is successfully added
func (d *Device) AddPort(cntx context.Context, mp *ofp.OfpPort) error {
d.portLock.Lock()
- defer d.portLock.Unlock()
id := mp.PortNo
name := mp.Name
if _, ok := d.PortsByID[id]; ok {
+ d.portLock.Unlock()
return errors.New(Duplicate_Port)
}
if _, ok := d.PortsByName[name]; ok {
+ d.portLock.Unlock()
return errors.New(Duplicate_Port)
}
@@ -501,6 +505,7 @@
d.PortsByID[id] = p
d.PortsByName[name] = p
d.WritePortToDb(cntx, p)
+ d.portLock.Unlock()
GetController().PortAddInd(cntx, d.ID, p.ID, p.Name)
logger.Infow(ctx, "Added Port", log.Fields{"Device": d.ID, "Port": id})
return nil
@@ -533,6 +538,19 @@
return nil
}
+// CheckAndDeletePort deletes the port if the port name matches with VGC and one sent from voltha in OFPPR_DELETE
+func (d *Device) CheckAndDeletePort(cntx context.Context, portNo uint32, portName string) {
+ if p := d.GetPortByID(portNo); p != nil {
+ if p.Name != portName {
+ logger.Warnw(ctx, "Dropping Del Port event: Port name mismatch", log.Fields{"vgcPortName": p.Name, "ofpPortName": portName, "ID": p.ID})
+ return
+ }
+ if err := d.DelPort(cntx, portNo, portName); err != nil {
+ logger.Warnw(ctx, "DelPort Failed", log.Fields{"Port No": portNo, "Error": err})
+ }
+ }
+}
+
// UpdatePortByName is utility to update the port by Name
func (d *Device) UpdatePortByName(cntx context.Context, name string, port uint32) {
d.portLock.Lock()
@@ -796,15 +814,11 @@
func (d *Device) ProcessPortUpdate(cntx context.Context, portName string, port uint32, state uint32) {
if p := d.GetPortByName(portName); p != nil {
if p.ID != port {
- logger.Infow(ctx, "Port ID update indication", log.Fields{"Port": p.Name, "Old PortID": p.ID, "New Port ID": port})
- if p.State != PortStateDown {
- logger.Errorw(ctx, "Port ID update failed. Port State UP", log.Fields{"Port": p})
- return
- }
- d.UpdatePortByName(cntx, portName, port)
- logger.Errorw(ctx, "Port ID Updated", log.Fields{"Port": p})
+ logger.Warnw(ctx, "Port update indication received with mismatching ID", log.Fields{"Port": p.Name, "Old PortID": p.ID, "New Port ID": port})
+ return
+ //Do not process port update received from change event, as we will only handle port updates during polling
}
- d.ProcessPortState(cntx, port, state)
+ d.ProcessPortState(cntx, port, state, portName)
}
}
@@ -824,7 +838,7 @@
// ProcessPortState deals with the change in port status and taking action
// based on the new state and the old state
-func (d *Device) ProcessPortState(cntx context.Context, port uint32, state uint32) {
+func (d *Device) ProcessPortState(cntx context.Context, port uint32, state uint32, portName string) {
if d.State != DeviceStateUP && !util.IsNniPort(port) {
logger.Warnw(ctx, "Ignore Port State Processing - Device not UP", log.Fields{"Device": d.ID, "Port": port, "DeviceState": d.State})
return
@@ -832,6 +846,10 @@
if p := d.GetPortByID(port); p != nil {
logger.Infow(ctx, "Port State Processing", log.Fields{"Received": state, "Current": p.State})
+ if p.Name != portName {
+ logger.Warnw(ctx, "Dropping Port State processing: Port name does not match", log.Fields{"vgcPort": p.Name, "ofpPort": portName, "ID": port})
+ return
+ }
// Avoid blind initialization as the current tasks in the queue will be lost
// Eg: Service Del followed by Port Down - The flows will be dangling
// Eg: NNI Down followed by NNI UP - Mcast data flows will be dangling
@@ -1059,19 +1077,43 @@
return false
}
-func (d *Device) triggerFlowNotification(cntx context.Context, cookie uint64, oper of.Command, bwDetails of.BwAvailDetails, err error, sendFlowNotif bool) {
- flow, _ := d.GetFlow(cookie)
- d.triggerFlowResultNotification(cntx, cookie, flow, oper, bwDetails, err, sendFlowNotif)
+// IsFlowDelThresholdReached - check if the attempts for flow delete has reached threshold or not
+func (d *Device) IsFlowDelThresholdReached(flowCount uint32, cookie uint64) bool {
+ logger.Debugw(ctx, "Check flow delete threshold", log.Fields{"Cookie": cookie, "FlowCount": flowCount})
+ return flowCount >= GetController().GetMaxFlowRetryAttempt()
}
-func (d *Device) triggerFlowResultNotification(cntx context.Context, cookie uint64, flow *of.VoltSubFlow, oper of.Command, bwDetails of.BwAvailDetails, err error, sendFlowNotif bool) {
+// IsFlowAddThresholdReached - check if the attempts for flow add has reached threshold or not
+func (d *Device) IsFlowAddThresholdReached(flowCount uint32, cookie uint64) bool {
+ logger.Debugw(ctx, "Check flow add threshold", log.Fields{"Cookie": cookie, "FlowCount": flowCount})
+ return flowCount >= GetController().GetMaxFlowRetryAttempt()
+}
+
+func (d *Device) UpdateFlowCount(cntx context.Context, cookie uint64) {
+ if dbFlow, ok := d.flows[cookie]; ok {
+ dbFlow.FlowCount++
+ d.AddFlowToDb(cntx, dbFlow)
+ }
+}
+
+func (d *Device) triggerFlowNotification(cntx context.Context, cookie uint64, oper of.Command, bwDetails of.BwAvailDetails, err error) {
+ flow, _ := d.GetFlow(cookie)
+ d.triggerFlowResultNotification(cntx, cookie, flow, oper, bwDetails, err)
+}
+
+func (d *Device) triggerFlowResultNotification(cntx context.Context, cookie uint64, flow *of.VoltSubFlow, oper of.Command, bwDetails of.BwAvailDetails, err error) {
statusCode, statusMsg := infraerror.GetErrorInfo(err)
success := isFlowOperSuccess(statusCode, oper)
- updateFlow := func(cookie uint64, state int, reason string) {
- if dbFlow, ok := d.GetFlow(cookie); ok {
+ updateFlowStatus := func(cookie uint64, state int, reason string) {
+ d.flowLock.Lock()
+ defer d.flowLock.Unlock()
+ if dbFlow, ok := d.flows[cookie]; ok {
dbFlow.State = uint8(state)
dbFlow.ErrorReason = reason
+ if state == of.FlowAddSuccess {
+ dbFlow.FlowCount = 0
+ }
d.AddFlowToDb(cntx, dbFlow)
}
}
@@ -1086,15 +1128,24 @@
state = of.FlowAddFailure
reason = statusMsg
}
- updateFlow(cookie, state, reason)
- logger.Debugw(ctx, "Updated Flow to DB", log.Fields{"Cookie": cookie, "State": state})
+ updateFlowStatus(cookie, state, reason)
+ logger.Debugw(ctx, "Add flow updated to DB", log.Fields{"Cookie": cookie, "State": state})
} else {
if success && flow != nil {
+ logger.Debugw(ctx, "Deleted flow from device and DB", log.Fields{"Cookie": cookie})
if err := d.DelFlow(cntx, flow); err != nil {
logger.Warnw(ctx, "Delete Flow Error", log.Fields{"Cookie": flow.Cookie, "Reason": err.Error()})
}
} else if !success {
- updateFlow(cookie, of.FlowDelFailure, statusMsg)
+ if d.IsFlowDelThresholdReached(flow.FlowCount, flow.Cookie) {
+ logger.Debugw(ctx, "Deleted flow from device and DB after delete threshold reached", log.Fields{"Cookie": cookie})
+ if err := d.DelFlow(cntx, flow); err != nil {
+ logger.Warnw(ctx, "Delete Flow Error", log.Fields{"Cookie": flow.Cookie, "Reason": err.Error()})
+ }
+ } else {
+ updateFlowStatus(cookie, of.FlowDelFailure, statusMsg)
+ logger.Debugw(ctx, "Delete flow updated to DB", log.Fields{"Cookie": cookie})
+ }
}
}
@@ -1108,8 +1159,6 @@
AdditionalData: bwDetails,
}
- if sendFlowNotif {
- logger.Debugw(ctx, "Sending Flow Notification", log.Fields{"Cookie": cookie, "Error Code": statusCode, "FlowOp": oper})
- GetController().ProcessFlowModResultIndication(cntx, flowResult)
- }
+ logger.Debugw(ctx, "Sending Flow Notification", log.Fields{"Cookie": cookie, "Error Code": statusCode, "FlowOp": oper})
+ GetController().ProcessFlowModResultIndication(cntx, flowResult)
}