VOL-2339: Fix DHCP issue on second ONU when two ONUs exist on same PON
- Process to_remove items before to_add items in the
  UpdateFlowsIncrementally method
- Sequentially add flows for a given pon, onu and uni in order to
  avoid PON resource collisions. A lock is held per (pon, onu, uni)
  key so that different subscribers do not have to wait for each other
  to add flows.
- Wait for any flow removes for the subscriber to complete before
adding flows.
Change-Id: I095291b9a53fd0f19dc79f2b44923ec786a26d6e
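
For reference, a minimal sketch of the per-subscriber keyed locking adopted
below (illustrative names only; the real lock lives on OpenOltFlowMgr and uses
the same github.com/EagleChen/mapmutex API shown in the openolt_flowmgr.go
hunks, where an in-code comment also notes that the behaviour can be tuned via
mapmutex.NewCustomizedMapMutex):

    package main

    import (
        "fmt"

        "github.com/EagleChen/mapmutex"
    )

    // subscriberKey identifies one (pon, onu, uni) tuple; flow handling for
    // the same key is serialized, while different keys proceed in parallel.
    type subscriberKey struct {
        intfID, onuID, uniID uint32
    }

    func main() {
        lock := mapmutex.NewMapMutex()
        key := subscriberKey{intfID: 0, onuID: 1, uniID: 0}

        // TryLock returns false if the lock for this key cannot be acquired;
        // Unlock releases it so the next caller with the same key can proceed.
        if lock.TryLock(key) {
            fmt.Println("allocating TCONT/GEM ports and adding flows for", key)
            lock.Unlock(key)
        } else {
            fmt.Println("could not acquire per-subscriber lock for", key)
        }
    }
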
diff --git a/adaptercore/device_handler.go b/adaptercore/device_handler.go
index 7e09fc8..9c65cee 100644
--- a/adaptercore/device_handler.go
+++ b/adaptercore/device_handler.go
@@ -1081,14 +1081,15 @@
func (dh *DeviceHandler) UpdateFlowsIncrementally(device *voltha.Device, flows *of.FlowChanges, groups *of.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error {
log.Debugw("Received-incremental-flowupdate-in-device-handler", log.Fields{"deviceID": device.Id, "flows": flows, "groups": groups, "flowMetadata": flowMetadata})
if flows != nil {
- for _, flow := range flows.ToAdd.Items {
- log.Debug("Adding flow", log.Fields{"deviceId": device.Id, "flowToAdd": flow})
- dh.flowMgr.AddFlow(flow, flowMetadata)
- }
for _, flow := range flows.ToRemove.Items {
log.Debug("Removing flow", log.Fields{"deviceId": device.Id, "flowToRemove": flow})
dh.flowMgr.RemoveFlow(flow)
}
+
+ for _, flow := range flows.ToAdd.Items {
+ log.Debug("Adding flow", log.Fields{"deviceId": device.Id, "flowToAdd": flow})
+ dh.flowMgr.AddFlow(flow, flowMetadata)
+ }
}
if groups != nil && flows != nil {
for _, flow := range flows.ToRemove.Items {
diff --git a/adaptercore/olt_platform.go b/adaptercore/olt_platform.go
index 0186712..8c7cf8d 100644
--- a/adaptercore/olt_platform.go
+++ b/adaptercore/olt_platform.go
@@ -259,5 +259,9 @@
onuID = OnuIDFromUniPortNum(uniPortNo)
uniID = UniIDFromPortNum(uniPortNo)
+ log.Debugw("flow extract info result",
+ log.Fields{"uniPortNo": uniPortNo, "ponIntf": ponIntf,
+ "onuID": onuID, "uniID": uniID, "inPort": inPort, "ethType": ethType})
+
return uniPortNo, ponIntf, onuID, uniID, inPort, ethType, nil
}
diff --git a/adaptercore/openolt_flowmgr.go b/adaptercore/openolt_flowmgr.go
index 565c8c8..71a0850 100644
--- a/adaptercore/openolt_flowmgr.go
+++ b/adaptercore/openolt_flowmgr.go
@@ -26,6 +26,7 @@
"fmt"
"math/big"
"sync"
+ "time"
"github.com/opencord/voltha-lib-go/v2/pkg/flows"
"github.com/opencord/voltha-lib-go/v2/pkg/log"
@@ -39,6 +40,7 @@
"github.com/opencord/voltha-protos/v2/go/voltha"
//deepcopy "github.com/getlantern/deepcopy"
+ "github.com/EagleChen/mapmutex"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
@@ -152,6 +154,18 @@
gemPort uint32
}
+type pendingFlowDeleteKey struct {
+ intfID uint32
+ onuID uint32
+ uniID uint32
+}
+
+type tpLockKey struct {
+ intfID uint32
+ onuID uint32
+ uniID uint32
+}
+
type schedQueue struct {
direction tp_pb.Direction
intfID uint32
@@ -174,6 +188,9 @@
packetInGemPort map[rsrcMgr.PacketInInfoKey]uint32 //packet in gem port local cache
onuGemInfo map[uint32][]rsrcMgr.OnuGemInfo //onu, gem and uni info local cache
lockCache sync.RWMutex
+ pendingFlowDelete sync.Map
+ // The mapmutex.Mutex can be fine tuned to use mapmutex.NewCustomizedMapMutex
+ perUserFlowHandleLock *mapmutex.Mutex
}
//NewFlowManager creates OpenOltFlowMgr object and initializes the parameters
@@ -204,6 +221,8 @@
flowMgr.loadFlowIDlistForGem(idx)
}
flowMgr.lockCache = sync.RWMutex{}
+ flowMgr.pendingFlowDelete = sync.Map{}
+ flowMgr.perUserFlowHandleLock = mapmutex.NewMapMutex()
log.Info("Initialization of flow manager success!!")
return &flowMgr
}
@@ -254,24 +273,32 @@
uni := getUniPortPath(intfID, int32(onuID), int32(uniID))
log.Debugw("Uni port name", log.Fields{"uni": uni})
- allocID, gemPorts, TpInst = f.createTcontGemports(intfID, onuID, uniID, uni, portNo, TpID, UsMeterID, DsMeterID, flowMetadata)
- if allocID == 0 || gemPorts == nil || TpInst == nil {
- log.Error("alloc-id-gem-ports-tp-unavailable")
+
+ tpLockMapKey := tpLockKey{intfID, onuID, uniID}
+ if f.perUserFlowHandleLock.TryLock(tpLockMapKey) {
+ allocID, gemPorts, TpInst = f.createTcontGemports(intfID, onuID, uniID, uni, portNo, TpID, UsMeterID, DsMeterID, flowMetadata)
+ if allocID == 0 || gemPorts == nil || TpInst == nil {
+ log.Error("alloc-id-gem-ports-tp-unavailable")
+ f.perUserFlowHandleLock.Unlock(tpLockMapKey)
+ return
+ }
+ args := make(map[string]uint32)
+ args[IntfID] = intfID
+ args[OnuID] = onuID
+ args[UniID] = uniID
+ args[PortNo] = portNo
+ args[AllocID] = allocID
+
+ /* Flows can be added specific to gemport if p-bits are received.
+ * If no pbit mentioned then adding flows for all gemports
+ */
+ f.checkAndAddFlow(args, classifierInfo, actionInfo, flow, TpInst, gemPorts, TpID, uni)
+ f.perUserFlowHandleLock.Unlock(tpLockMapKey)
+ } else {
+ log.Errorw("failed to acquire per user flow handle lock",
+ log.Fields{"intfId": intfID, "onuId": onuID, "uniId": uniID})
return
}
-
- /* Flows can be added specific to gemport if p-bits are received.
- * If no pbit mentioned then adding flows for all gemports
- */
-
- args := make(map[string]uint32)
- args[IntfID] = intfID
- args[OnuID] = onuID
- args[UniID] = uniID
- args[PortNo] = portNo
- args[AllocID] = allocID
-
- f.checkAndAddFlow(args, classifierInfo, actionInfo, flow, TpInst, gemPorts, TpID, uni)
}
// CreateSchedulerQueues creates traffic schedulers on the device with the given scheduler configuration and traffic shaping info
@@ -445,6 +472,7 @@
var allocIDs []uint32
var allgemPortIDs []uint32
var gemPortIDs []uint32
+ tpInstanceExists := false
allocIDs = f.resourceMgr.GetCurrentAllocIDsForOnu(intfID, onuID, uniID)
allgemPortIDs = f.resourceMgr.GetCurrentGEMPortIDsForOnu(intfID, onuID, uniID)
@@ -466,6 +494,7 @@
f.resourceMgr.UpdateTechProfileIDForOnu(intfID, onuID, uniID, TpID)
} else {
log.Debugw("Tech-profile-instance-already-exist-for-given port-name", log.Fields{"uni": uni})
+ tpInstanceExists = true
}
if UsMeterID != 0 {
sq := schedQueue{direction: tp_pb.Direction_UPSTREAM, intfID: intfID, onuID: onuID, uniID: uniID, tpID: TpID,
@@ -485,13 +514,19 @@
}
allocID := techProfileInstance.UsScheduler.AllocID
- allocIDs = appendUnique(allocIDs, allocID)
-
for _, gem := range techProfileInstance.UpstreamGemPortAttributeList {
- allgemPortIDs = appendUnique(allgemPortIDs, gem.GemportID)
gemPortIDs = append(gemPortIDs, gem.GemportID)
}
+ if tpInstanceExists {
+ return allocID, gemPortIDs, techProfileInstance
+ }
+
+ allocIDs = appendUnique(allocIDs, allocID)
+ for _, gemPortID := range gemPortIDs {
+ allgemPortIDs = appendUnique(allgemPortIDs, gemPortID)
+ }
+
log.Debugw("Allocated Tcont and GEM ports", log.Fields{"allocIDs": allocIDs, "gemports": allgemPortIDs})
// Send Tconts and GEM ports to KV store
f.storeTcontsGEMPortsIntoKVStore(intfID, onuID, uniID, allocIDs, allgemPortIDs)
@@ -603,6 +638,10 @@
log.Debugw("Found pbit in the flow", log.Fields{"VlanPbit": vlanPbit})
}
flowStoreCookie := getFlowStoreCookie(classifier, gemPortID)
+ if present := f.resourceMgr.IsFlowCookieOnKVStore(uint32(intfID), int32(onuID), int32(uniID), flowStoreCookie); present {
+ log.Debug("Flow-exists--not-re-adding")
+ return
+ }
flowID, err := f.resourceMgr.GetFlowID(intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, HsiaFlow, vlanPbit)
if err != nil {
log.Errorw("Flow id unavailable for HSIA flow", log.Fields{"direction": direction})
@@ -674,11 +713,15 @@
delete(classifier, VlanVid)
flowStoreCookie := getFlowStoreCookie(classifier, gemPortID)
+ if present := f.resourceMgr.IsFlowCookieOnKVStore(uint32(intfID), int32(onuID), int32(uniID), flowStoreCookie); present {
+ log.Debug("Flow-exists--not-re-adding")
+ return
+ }
- flowID, err = f.resourceMgr.GetFlowID(intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, "", 0 /*classifier[VLAN_PCP].(uint32)*/)
+ flowID, err = f.resourceMgr.GetFlowID(intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, DhcpFlow, 0 /*classifier[VLAN_PCP].(uint32)*/)
if err != nil {
- log.Errorw("flowId unavailable for UL EAPOL", log.Fields{"intfId": intfID, "onuId": onuID, "flowStoreCookie": flowStoreCookie})
+ log.Errorw("flowId unavailable for UL DHCP", log.Fields{"intfId": intfID, "onuId": onuID, "flowStoreCookie": flowStoreCookie})
return
}
@@ -729,10 +772,8 @@
uplinkClassifier := make(map[string]interface{})
uplinkAction := make(map[string]interface{})
- downlinkClassifier := make(map[string]interface{})
- downlinkAction := make(map[string]interface{})
+
var upstreamFlow openoltpb2.Flow
- var downstreamFlow openoltpb2.Flow
var networkIntfID uint32
// Fill Classfier
@@ -742,6 +783,10 @@
// Fill action
uplinkAction[TrapToHost] = true
flowStoreCookie := getFlowStoreCookie(uplinkClassifier, gemPortID)
+ if present := f.resourceMgr.IsFlowCookieOnKVStore(uint32(intfID), int32(onuID), int32(uniID), flowStoreCookie); present {
+ log.Debug("Flow-exists--not-re-adding")
+ return
+ }
//Add Uplink EAPOL Flow
uplinkFlowID, err := f.resourceMgr.GetFlowID(intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, "", 0)
if err != nil {
@@ -795,78 +840,7 @@
return
}
}
- // Dummy Downstream flow due to BAL 2.6 limitation
- {
- /* Add Downstream EAPOL Flow, Only for first EAP flow (BAL
- # requirement)
- # On one of the platforms (Broadcom BAL), when same DL classifier
- # vlan was used across multiple ONUs, eapol flow re-adds after
- # flow delete (cases of onu reboot/disable) fails.
- # In order to generate unique vlan, a combination of intf_id
- # onu_id and uniId is used.
- # uniId defaults to 0, so add 1 to it.
- */
- log.Debugw("Creating DL EAPOL flow with default vlan", log.Fields{"vlan": vlanID})
- specialVlanDlFlow := 4090 - intfID*onuID*(uniID+1)
- // Assert that we do not generate invalid vlans under no condition
- if specialVlanDlFlow <= 2 {
- log.Fatalw("invalid-vlan-generated", log.Fields{"vlan": specialVlanDlFlow})
- return
- }
- log.Debugw("specialVlanEAPOLDlFlow:", log.Fields{"dl_vlan": specialVlanDlFlow})
- // Fill Classfier
- downlinkClassifier[PacketTagType] = SingleTag
- downlinkClassifier[EthType] = uint32(EapEthType)
- downlinkClassifier[VlanVid] = uint32(specialVlanDlFlow)
- // Fill action
- downlinkAction[PushVlan] = true
- downlinkAction[VlanVid] = vlanID
- flowStoreCookie := getFlowStoreCookie(downlinkClassifier, gemPortID)
- downlinkFlowID, err := f.resourceMgr.GetFlowID(intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, "", 0)
- if err != nil {
- log.Errorw("flowId unavailable for DL EAPOL",
- log.Fields{"intfId": intfID, "onuId": onuID, "flowStoreCookie": flowStoreCookie})
- return
- }
- log.Debugw("Creating DL EAPOL flow",
- log.Fields{"dl_classifier": downlinkClassifier, "dl_action": downlinkAction, "downlinkFlowID": downlinkFlowID})
- if classifierProto = makeOpenOltClassifierField(downlinkClassifier); classifierProto == nil {
- log.Error("Error in making classifier protobuf for downlink flow")
- return
- }
- if actionProto = makeOpenOltActionField(downlinkAction); actionProto == nil {
- log.Error("Error in making action protobuf for dl flow")
- return
- }
- // Downstream flow in grpc protobuf
- downstreamFlow = openoltpb2.Flow{AccessIntfId: int32(intfID),
- OnuId: int32(onuID),
- UniId: int32(uniID),
- FlowId: downlinkFlowID,
- FlowType: Downstream,
- AllocId: int32(allocID),
- NetworkIntfId: int32(networkIntfID),
- GemportId: int32(gemPortID),
- Classifier: classifierProto,
- Action: actionProto,
- Priority: int32(logicalFlow.Priority),
- Cookie: logicalFlow.Cookie,
- PortNo: portNo}
- if ok := f.addFlowToDevice(logicalFlow, &downstreamFlow); ok {
- log.Debug("EAPOL DL flow added to device successfully")
- flowCategory := ""
- flowsToKVStore := f.getUpdatedFlowInfo(&downstreamFlow, flowStoreCookie, flowCategory, downlinkFlowID, logicalFlow.Id)
- if err := f.updateFlowInfoToKVStore(downstreamFlow.AccessIntfId,
- downstreamFlow.OnuId,
- downstreamFlow.UniId,
- downstreamFlow.FlowId,
- /* flowCategory, */
- flowsToKVStore); err != nil {
- log.Errorw("Error uploading EAPOL DL flow into KV store", log.Fields{"flow": upstreamFlow, "error": err})
- return
- }
- }
- }
+
log.Debugw("Added EAPOL flows to device successfully", log.Fields{"flow": logicalFlow})
}
@@ -967,6 +941,7 @@
log.Error("Invalid classfier object")
return 0
}
+ log.Debugw("generating flow store cookie", log.Fields{"classifier": classifier, "gemPortID": gemPortID})
var jsonData []byte
var flowString string
var err error
@@ -983,7 +958,9 @@
_, _ = h.Write([]byte(flowString))
hash := big.NewInt(0)
hash.SetBytes(h.Sum(nil))
- return hash.Uint64()
+ generatedHash := hash.Uint64()
+ log.Debugw("hash generated", log.Fields{"hash": generatedHash})
+ return generatedHash
}
func (f *OpenOltFlowMgr) getUpdatedFlowInfo(flow *openoltpb2.Flow, flowStoreCookie uint64, flowCategory string, deviceFlowID uint32, logicalFlowID uint64) *[]rsrcMgr.FlowInfo {
@@ -1062,7 +1039,7 @@
st, _ := status.FromError(err)
if st.Code() == codes.AlreadyExists {
log.Debug("Flow already exists", log.Fields{"err": err, "deviceFlow": deviceFlow})
- return false
+ return true
}
if err != nil {
@@ -1270,6 +1247,29 @@
return nil
}
+func (f *OpenOltFlowMgr) deletePendingFlows(Intf uint32, onuID int32, uniID int32) {
+ pnFlDelKey := pendingFlowDeleteKey{Intf, uint32(onuID), uint32(uniID)}
+ if val, ok := f.pendingFlowDelete.Load(pnFlDelKey); ok {
+ if val.(int) > 0 {
+ pnFlDels := val.(int) - 1
+ if pnFlDels > 0 {
+ log.Debugw("flow delete succeeded, more pending",
+ log.Fields{"intf": Intf, "onuID": onuID, "uniID": uniID, "currPendingFlowCnt": pnFlDels})
+ f.pendingFlowDelete.Store(pnFlDelKey, pnFlDels)
+ } else {
+ log.Debugw("all pending flow deletes handled, removing entry from map",
+ log.Fields{"intf": Intf, "onuID": onuID, "uniID": uniID})
+ f.pendingFlowDelete.Delete(pnFlDelKey)
+ }
+ }
+ } else {
+ log.Debugw("no pending delete flows found",
+ log.Fields{"intf": Intf, "onuID": onuID, "uniID": uniID})
+
+ }
+
+}
+
//clearResources clears pon resources in kv store and the device
func (f *OpenOltFlowMgr) clearResources(flow *ofp.OfpFlowStats, Intf uint32, onuID int32, uniID int32,
gemPortID int32, flowID uint32, flowDirection string,
@@ -1288,6 +1288,23 @@
// between DS and US.
f.updateFlowInfoToKVStore(int32(Intf), int32(onuID), int32(uniID), flowID, &updatedFlows)
if len(updatedFlows) == 0 {
+ // Do this for subscriber flows only (not trap from NNI flows)
+ if onuID != -1 && uniID != -1 {
+ pnFlDelKey := pendingFlowDeleteKey{Intf, uint32(onuID), uint32(uniID)}
+ if val, ok := f.pendingFlowDelete.Load(pnFlDelKey); !ok {
+ log.Debugw("creating entry for pending flow delete",
+ log.Fields{"intf": Intf, "onuID": onuID, "uniID": uniID})
+ f.pendingFlowDelete.Store(pnFlDelKey, 1)
+ } else {
+ pnFlDels := val.(int) + 1
+ log.Debugw("updating flow delete entry",
+ log.Fields{"intf": Intf, "onuID": onuID, "uniID": uniID, "currPendingFlowCnt": pnFlDels})
+ f.pendingFlowDelete.Store(pnFlDelKey, pnFlDels)
+ }
+
+ defer f.deletePendingFlows(Intf, onuID, uniID)
+ }
+
log.Debugw("Releasing flow Id to resource manager", log.Fields{"Intf": Intf, "onuId": onuID, "uniId": uniID, "flowId": flowID})
f.resourceMgr.FreeFlowID(Intf, int32(onuID), int32(uniID), flowID)
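
For reference, a standalone sketch of the pending-flow-delete bookkeeping used
above: a sync.Map keyed by (pon, onu, uni) holds a count of in-flight deletes,
incremented when a delete starts and decremented (with the key dropped at zero)
when it completes. Names here are illustrative, not the adapter's own helpers,
and the Load followed by Store is not atomic on its own, so it only works if
updates for a given key are otherwise serialized.

    package main

    import (
        "fmt"
        "sync"
    )

    type subKey struct{ intfID, onuID, uniID uint32 }

    var pending sync.Map // subKey -> int (number of in-flight flow deletes)

    // markDeleteStarted records one more pending delete for the subscriber.
    func markDeleteStarted(k subKey) {
        if val, ok := pending.Load(k); ok {
            pending.Store(k, val.(int)+1)
        } else {
            pending.Store(k, 1)
        }
    }

    // markDeleteDone decrements the count and removes the key at zero.
    func markDeleteDone(k subKey) {
        if val, ok := pending.Load(k); ok {
            if cnt := val.(int) - 1; cnt > 0 {
                pending.Store(k, cnt)
            } else {
                pending.Delete(k)
            }
        }
    }

    func main() {
        k := subKey{intfID: 0, onuID: 1, uniID: 0}
        markDeleteStarted(k)
        markDeleteStarted(k)
        markDeleteDone(k)
        if v, ok := pending.Load(k); ok {
            fmt.Println("pending deletes remaining:", v)
        }
    }
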
@@ -1369,6 +1386,7 @@
log.Error(err)
return
}
+
onuID = int32(onu)
uniID = int32(uni)
@@ -1451,6 +1469,24 @@
return
}
+func (f *OpenOltFlowMgr) waitForFlowDeletesToCompleteForOnu(ctx context.Context, intfID uint32, onuID uint32,
+ uniID uint32, ch chan bool) {
+ pnFlDelKey := pendingFlowDeleteKey{intfID, onuID, uniID}
+ for {
+ select {
+ case <-time.After(20 * time.Millisecond):
+ if flowDelRefCnt, ok := f.pendingFlowDelete.Load(pnFlDelKey); !ok || flowDelRefCnt == 0 {
+ log.Debug("pending flow deletes completed")
+ ch <- true
+ return
+ }
+ case <-ctx.Done():
+ log.Error("flow delete wait handler routine canceled")
+ return
+ }
+ }
+}
+
// AddFlow add flow to device
func (f *OpenOltFlowMgr) AddFlow(flow *ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) {
classifierInfo := make(map[string]interface{})
@@ -1514,7 +1550,26 @@
log.Debugw("Downstream-flow-meter-id", log.Fields{"DsMeterID": DsMeterID})
}
- f.divideAndAddFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, uint32(TpID), UsMeterID, DsMeterID, flowMetadata)
+
+ pnFlDelKey := pendingFlowDeleteKey{intfID, onuID, uniID}
+ if _, ok := f.pendingFlowDelete.Load(pnFlDelKey); !ok {
+ log.Debugw("no pending flows found, going ahead with flow install", log.Fields{"pon": intfID, "onuID": onuID, "uniID": uniID})
+ f.divideAndAddFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, uint32(TpID), UsMeterID, DsMeterID, flowMetadata)
+ } else {
+ ctx := context.Background()
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+ pendingFlowDelComplete := make(chan bool)
+ go f.waitForFlowDeletesToCompleteForOnu(ctx, intfID, onuID, uniID, pendingFlowDelComplete)
+ select {
+ case <-pendingFlowDelComplete:
+ log.Debugw("all pending flow deletes completed", log.Fields{"pon": intfID, "onuID": onuID, "uniID": uniID})
+ f.divideAndAddFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, uint32(TpID), UsMeterID, DsMeterID, flowMetadata)
+
+ case <-time.After(10 * time.Second):
+ log.Errorw("pending flow deletes not completed after timeout", log.Fields{"pon": intfID, "onuID": onuID, "uniID": uniID})
+ }
+ }
}
//sendTPDownloadMsgToChild send payload
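
For reference, a minimal sketch of the wait-then-add pattern used in AddFlow
above: a helper goroutine polls the pending-delete state every few milliseconds
and signals on a channel, while the caller either proceeds when the signal
arrives or gives up after a timeout. Names and the stand-in done() check are
illustrative, not the adapter's actual helpers.

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // waitForPendingDeletes polls done() every 20ms and signals ch once it
    // returns true; a cancelled context stops the polling loop.
    func waitForPendingDeletes(ctx context.Context, done func() bool, ch chan<- bool) {
        for {
            select {
            case <-time.After(20 * time.Millisecond):
                if done() {
                    ch <- true
                    return
                }
            case <-ctx.Done():
                return
            }
        }
    }

    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()

        // Stand-in for "no pending flow deletes remain for this subscriber".
        deadline := time.Now().Add(100 * time.Millisecond)
        done := func() bool { return time.Now().After(deadline) }

        complete := make(chan bool)
        go waitForPendingDeletes(ctx, done, complete)

        select {
        case <-complete:
            fmt.Println("pending deletes finished, installing flow")
        case <-time.After(10 * time.Second):
            fmt.Println("timed out waiting for pending deletes, flow not installed")
        }
    }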