VOL-1943 Adding multi tcont support to openolt adapter
Change-Id: Ibdc18b9c2f1cac3abbc43ba77512484d4574347b
diff --git a/adaptercore/openolt_flowmgr.go b/adaptercore/openolt_flowmgr.go
index 9766484..f0e81f6 100644
--- a/adaptercore/openolt_flowmgr.go
+++ b/adaptercore/openolt_flowmgr.go
@@ -136,6 +136,16 @@
VlanvIDMask = 0xFFF
//MaxPonPorts constant
MaxPonPorts = 16
+ //IntfID constant
+ IntfID = "intfId"
+ //OnuID constant
+ OnuID = "onuId"
+ //UniID constant
+ UniID = "uniId"
+ //PortNo constant
+ PortNo = "portNo"
+ //AllocID constant
+ AllocID = "allocId"
)
type onuInfo struct {
@@ -160,17 +170,30 @@
logicalPort uint32
}
+type schedQueue struct {
+ direction tp_pb.Direction
+ intfID uint32
+ onuID uint32
+ uniID uint32
+ tpID uint32
+ uniPort uint32
+ tpInst *tp.TechProfile
+ meterID uint32
+ flowMetadata *voltha.FlowMetadata
+}
+
//OpenOltFlowMgr creates the Structure of OpenOltFlowMgr obj
type OpenOltFlowMgr struct {
- techprofile []tp.TechProfileIf
- deviceHandler *DeviceHandler
- resourceMgr *rsrcMgr.OpenOltResourceMgr
- onuIds map[onuIDKey]onuInfo //OnuId -> OnuInfo
- onuSerialNumbers map[string]onuInfo //onu serial_number (string) -> OnuInfo
- onuGemPortIds map[gemPortKey]onuInfo //GemPortId -> OnuInfo
- packetInGemPort map[packetInInfoKey]uint32 //packet in gem port
- storedDeviceFlows []ofp.OfpFlowStats /* Required during deletion to obtain device flows from logical flows */
- onuIdsLock sync.RWMutex
+ techprofile []tp.TechProfileIf
+ deviceHandler *DeviceHandler
+ resourceMgr *rsrcMgr.OpenOltResourceMgr
+ onuIds map[onuIDKey]onuInfo //OnuId -> OnuInfo
+ onuSerialNumbers map[string]onuInfo //onu serial_number (string) -> OnuInfo
+ onuGemPortIds map[gemPortKey]onuInfo //GemPortId -> OnuInfo
+ packetInGemPort map[packetInInfoKey]uint32 //packet in gem port
+ storedDeviceFlows []ofp.OfpFlowStats /* Required during deletion to obtain device flows from logical flows */
+ onuIdsLock sync.RWMutex
+ flowsUsedByGemPort map[gemPortKey][]uint32 //gem port id to flow ids
}
//NewFlowManager creates OpenOltFlowMgr object and initializes the parameters
@@ -188,6 +211,7 @@
flowMgr.onuSerialNumbers = make(map[string]onuInfo)
flowMgr.onuGemPortIds = make(map[gemPortKey]onuInfo)
flowMgr.packetInGemPort = make(map[packetInInfoKey]uint32)
+ flowMgr.flowsUsedByGemPort = make(map[gemPortKey][]uint32)
flowMgr.onuIdsLock = sync.RWMutex{}
log.Info("Initialization of flow manager success!!")
return &flowMgr
@@ -216,15 +240,21 @@
deviceFlow.FlowId, deviceFlow.FlowType))
storedFlow.Cookie = flowFromCore.Id
f.storedDeviceFlows = append(f.storedDeviceFlows, storedFlow)
+ gemPK := gemPortKey{uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.GemportId)}
+ flowIDList, ok := f.flowsUsedByGemPort[gemPK]
+ if !ok {
+ flowIDList = []uint32{deviceFlow.FlowId}
+ }
+ flowIDList = appendUnique(flowIDList, deviceFlow.FlowId)
+ f.flowsUsedByGemPort[gemPK] = flowIDList
log.Debugw("updated Stored flow info", log.Fields{"storedDeviceFlows": f.storedDeviceFlows})
}
func (f *OpenOltFlowMgr) divideAndAddFlow(intfID uint32, onuID uint32, uniID uint32, portNo uint32,
classifierInfo map[string]interface{}, actionInfo map[string]interface{}, flow *ofp.OfpFlowStats, TpID uint32,
UsMeterID uint32, DsMeterID uint32, flowMetadata *voltha.FlowMetadata) {
- var allocID []uint32
+ var allocID uint32
var gemPorts []uint32
- var gemPort uint32
var TpInst *tp.TechProfile
log.Infow("Dividing flow", log.Fields{"intfId": intfID, "onuId": onuID, "uniId": uniID, "portNo": portNo,
@@ -240,7 +270,7 @@
uni := getUniPortPath(intfID, onuID, uniID)
log.Debugw("Uni port name", log.Fields{"uni": uni})
allocID, gemPorts, TpInst = f.createTcontGemports(intfID, onuID, uniID, uni, portNo, TpID, UsMeterID, DsMeterID, flowMetadata)
- if allocID == nil || gemPorts == nil || TpInst == nil {
+ if allocID == 0 || gemPorts == nil || TpInst == nil {
log.Error("alloc-id-gem-ports-tp-unavailable")
return
}
@@ -250,22 +280,23 @@
*/
args := make(map[string]uint32)
- args["intfId"] = intfID
- args["onuId"] = onuID
- args["uniId"] = uniID
- args["portNo"] = portNo
- args["allocId"] = allocID[0]
+ args[IntfID] = intfID
+ args[OnuID] = onuID
+ args[UniID] = uniID
+ args[PortNo] = portNo
+ args[AllocID] = allocID
- f.checkAndAddFlow(args, classifierInfo, actionInfo, flow, gemPort, intfID, onuID, uniID, portNo, TpInst, allocID, gemPorts, TpID, uni)
+ f.checkAndAddFlow(args, classifierInfo, actionInfo, flow, TpInst, gemPorts, TpID, uni)
}
// CreateSchedulerQueues creates traffic schedulers on the device with the given scheduler configuration and traffic shaping info
-func (f *OpenOltFlowMgr) CreateSchedulerQueues(Dir tp_pb.Direction, IntfID uint32, OnuID uint32, UniID uint32, UniPort uint32, TpInst *tp.TechProfile, MeterID uint32, flowMetadata *voltha.FlowMetadata) error {
+func (f *OpenOltFlowMgr) CreateSchedulerQueues(sq schedQueue) error {
- log.Debugw("CreateSchedulerQueues", log.Fields{"Dir": Dir, "IntfID": IntfID, "OnuID": OnuID,
- "UniID": UniID, "MeterID": MeterID, "TpInst": *TpInst, "flowMetadata": flowMetadata})
+ log.Debugw("CreateSchedulerQueues", log.Fields{"Dir": sq.direction, "IntfID": sq.intfID,
+ "OnuID": sq.onuID, "UniID": sq.uniID, "TpID": sq.tpID, "MeterID": sq.meterID,
+ "TpInst": sq.tpInst, "flowMetadata": sq.flowMetadata})
- Direction, err := verifyMeterIDAndGetDirection(MeterID, Dir)
+ Direction, err := verifyMeterIDAndGetDirection(sq.meterID, sq.direction)
if err != nil {
return err
}
@@ -276,29 +307,29 @@
*/
var SchedCfg *tp_pb.SchedulerConfig
- KvStoreMeter, err := f.resourceMgr.GetMeterIDForOnu(Direction, IntfID, OnuID, UniID)
+ KvStoreMeter, err := f.resourceMgr.GetMeterIDForOnu(Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID)
if err != nil {
- log.Error("Failed to get meter for intf %d, onuid %d, uniid %d", IntfID, OnuID, UniID)
+ log.Error("Failed to get meter for intf %d, onuid %d, uniid %d", sq.intfID, sq.onuID, sq.uniID)
return err
}
if KvStoreMeter != nil {
- if KvStoreMeter.MeterId == MeterID {
+ if KvStoreMeter.MeterId == sq.meterID {
log.Debug("Scheduler already created for upstream")
return nil
}
- log.Errorw("Dynamic meter update not supported", log.Fields{"KvStoreMeterId": KvStoreMeter.MeterId, "MeterID-in-flow": MeterID})
+ log.Errorw("Dynamic meter update not supported", log.Fields{"KvStoreMeterId": KvStoreMeter.MeterId, "MeterID-in-flow": sq.meterID})
return errors.New("invalid-meter-id-in-flow")
}
- log.Debugw("Meter-does-not-exist-Creating-new", log.Fields{"MeterID": MeterID, "Direction": Direction})
- if Dir == tp_pb.Direction_UPSTREAM {
- SchedCfg = f.techprofile[IntfID].GetUsScheduler(TpInst)
- } else if Dir == tp_pb.Direction_DOWNSTREAM {
- SchedCfg = f.techprofile[IntfID].GetDsScheduler(TpInst)
+ log.Debugw("Meter-does-not-exist-Creating-new", log.Fields{"MeterID": sq.meterID, "Direction": Direction})
+ if sq.direction == tp_pb.Direction_UPSTREAM {
+ SchedCfg = f.techprofile[sq.intfID].GetUsScheduler(sq.tpInst)
+ } else if sq.direction == tp_pb.Direction_DOWNSTREAM {
+ SchedCfg = f.techprofile[sq.intfID].GetDsScheduler(sq.tpInst)
}
var meterConfig *ofp.OfpMeterConfig
- if flowMetadata != nil {
- for _, meter := range flowMetadata.Meters {
- if MeterID == meter.MeterId {
+ if sq.flowMetadata != nil {
+ for _, meter := range sq.flowMetadata.Meters {
+ if sq.meterID == meter.MeterId {
meterConfig = meter
log.Debugw("Found-meter-config-from-flowmetadata", log.Fields{"meterConfig": meterConfig})
break
@@ -308,10 +339,11 @@
log.Error("Flow-metadata-is-not-present-in-flow")
}
if meterConfig == nil {
- log.Errorw("Could-not-get-meterbands-from-flowMetadata", log.Fields{"flowMetadata": flowMetadata, "MeterID": MeterID})
+ log.Errorw("Could-not-get-meterbands-from-flowMetadata", log.Fields{"flowMetadata": sq.flowMetadata,
+ "MeterID": sq.meterID})
return errors.New("failed-to-get-meter-from-flowMetadata")
} else if len(meterConfig.Bands) < MaxMeterBand {
- log.Errorw("Invalid-number-of-bands-in-meter", log.Fields{"Bands": meterConfig.Bands, "MeterID": MeterID})
+ log.Errorw("Invalid-number-of-bands-in-meter", log.Fields{"Bands": meterConfig.Bands, "MeterID": sq.meterID})
return errors.New("invalid-number-of-bands-in-meter")
}
cir := meterConfig.Bands[0].Rate
@@ -322,23 +354,23 @@
pbs := cbs + ebs
TrafficShaping := &tp_pb.TrafficShapingInfo{Cir: cir, Cbs: cbs, Pir: pir, Pbs: pbs}
- TrafficSched := []*tp_pb.TrafficScheduler{f.techprofile[IntfID].GetTrafficScheduler(TpInst, SchedCfg, TrafficShaping)}
+ TrafficSched := []*tp_pb.TrafficScheduler{f.techprofile[sq.intfID].GetTrafficScheduler(sq.tpInst, SchedCfg, TrafficShaping)}
log.Debugw("Sending Traffic scheduler create to device", log.Fields{"Direction": Direction, "TrafficScheds": TrafficSched})
if _, err := f.deviceHandler.Client.CreateTrafficSchedulers(context.Background(), &tp_pb.TrafficSchedulers{
- IntfId: IntfID, OnuId: OnuID,
- UniId: UniID, PortNo: UniPort,
+ IntfId: sq.intfID, OnuId: sq.onuID,
+ UniId: sq.uniID, PortNo: sq.uniPort,
TrafficScheds: TrafficSched}); err != nil {
log.Errorw("Failed to create traffic schedulers", log.Fields{"error": err})
return err
}
// On receiving the CreateTrafficQueues request, the driver should create corresponding
// downstream queues.
- trafficQueues := f.techprofile[IntfID].GetTrafficQueues(TpInst, Dir)
+ trafficQueues := f.techprofile[sq.intfID].GetTrafficQueues(sq.tpInst, sq.direction)
log.Debugw("Sending Traffic Queues create to device", log.Fields{"Direction": Direction, "TrafficQueues": trafficQueues})
if _, err := f.deviceHandler.Client.CreateTrafficQueues(context.Background(),
- &tp_pb.TrafficQueues{IntfId: IntfID, OnuId: OnuID,
- UniId: UniID, PortNo: UniPort,
+ &tp_pb.TrafficQueues{IntfId: sq.intfID, OnuId: sq.onuID,
+ UniId: sq.uniID, PortNo: sq.uniPort,
TrafficQueues: trafficQueues}); err != nil {
log.Errorw("Failed to create traffic queues in device", log.Fields{"error": err})
return err
@@ -347,8 +379,8 @@
/* After we successfully applied the scheduler configuration on the OLT device,
* store the meter id on the KV store, for further reference.
*/
- if err := f.resourceMgr.UpdateMeterIDForOnu(Direction, IntfID, OnuID, UniID, meterConfig); err != nil {
- log.Error("Failed to update meter id for onu %d, meterid %d", OnuID, MeterID)
+ if err := f.resourceMgr.UpdateMeterIDForOnu(Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID, meterConfig); err != nil {
+ log.Error("Failed to update meter id for onu %d, meterid %d", sq.onuID, sq.meterID)
return err
}
log.Debugw("updated-meter-info into KV store successfully", log.Fields{"Direction": Direction,
@@ -357,27 +389,28 @@
}
// RemoveSchedulerQueues removes the traffic schedulers from the device based on the given scheduler configuration and traffic shaping info
-func (f *OpenOltFlowMgr) RemoveSchedulerQueues(Dir tp_pb.Direction, IntfID uint32, OnuID uint32, UniID uint32, UniPort uint32, TpInst *tp.TechProfile) error {
+func (f *OpenOltFlowMgr) RemoveSchedulerQueues(sq schedQueue) error {
var Direction string
var SchedCfg *tp_pb.SchedulerConfig
var err error
- log.Debugw("Removing schedulers and Queues in OLT", log.Fields{"Direction": Dir, "IntfID": IntfID, "OnuID": OnuID, "UniID": UniID, "UniPort": UniPort})
- if Dir == tp_pb.Direction_UPSTREAM {
- SchedCfg = f.techprofile[IntfID].GetUsScheduler(TpInst)
+ log.Debugw("Removing schedulers and Queues in OLT", log.Fields{"Direction": sq.direction, "IntfID": sq.intfID,
+ "OnuID": sq.onuID, "UniID": sq.uniID, "UniPort": sq.uniPort})
+ if sq.direction == tp_pb.Direction_UPSTREAM {
+ SchedCfg = f.techprofile[sq.intfID].GetUsScheduler(sq.tpInst)
Direction = "upstream"
- } else if Dir == tp_pb.Direction_DOWNSTREAM {
- SchedCfg = f.techprofile[IntfID].GetDsScheduler(TpInst)
+ } else if sq.direction == tp_pb.Direction_DOWNSTREAM {
+ SchedCfg = f.techprofile[sq.intfID].GetDsScheduler(sq.tpInst)
Direction = "downstream"
}
- KVStoreMeter, err := f.resourceMgr.GetMeterIDForOnu(Direction, IntfID, OnuID, UniID)
+ KVStoreMeter, err := f.resourceMgr.GetMeterIDForOnu(Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID)
if err != nil {
- log.Errorf("Failed to get Meter for Onu %d", OnuID)
+ log.Errorf("Failed to get Meter for Onu %d", sq.onuID)
return err
}
if KVStoreMeter == nil {
- log.Debugw("No-meter-has-been-installed-yet", log.Fields{"direction": Direction, "IntfID": IntfID, "OnuID": OnuID, "UniID": UniID})
+ log.Debugw("No-meter-has-been-installed-yet", log.Fields{"direction": Direction, "IntfID": sq.intfID, "OnuID": sq.onuID, "UniID": sq.uniID})
return nil
}
cir := KVStoreMeter.Bands[0].Rate
@@ -389,20 +422,20 @@
TrafficShaping := &tp_pb.TrafficShapingInfo{Cir: cir, Cbs: cbs, Pir: pir, Pbs: pbs}
- TrafficSched := []*tp_pb.TrafficScheduler{f.techprofile[IntfID].GetTrafficScheduler(TpInst, SchedCfg, TrafficShaping)}
- TrafficQueues := f.techprofile[IntfID].GetTrafficQueues(TpInst, Dir)
+ TrafficSched := []*tp_pb.TrafficScheduler{f.techprofile[sq.intfID].GetTrafficScheduler(sq.tpInst, SchedCfg, TrafficShaping)}
+ TrafficQueues := f.techprofile[sq.intfID].GetTrafficQueues(sq.tpInst, sq.direction)
if _, err = f.deviceHandler.Client.RemoveTrafficQueues(context.Background(),
- &tp_pb.TrafficQueues{IntfId: IntfID, OnuId: OnuID,
- UniId: UniID, PortNo: UniPort,
+ &tp_pb.TrafficQueues{IntfId: sq.intfID, OnuId: sq.onuID,
+ UniId: sq.uniID, PortNo: sq.uniPort,
TrafficQueues: TrafficQueues}); err != nil {
log.Errorw("Failed to remove traffic queues", log.Fields{"error": err})
return err
}
log.Debug("Removed traffic queues successfully")
if _, err = f.deviceHandler.Client.RemoveTrafficSchedulers(context.Background(), &tp_pb.TrafficSchedulers{
- IntfId: IntfID, OnuId: OnuID,
- UniId: UniID, PortNo: UniPort,
+ IntfId: sq.intfID, OnuId: sq.onuID,
+ UniId: sq.uniID, PortNo: sq.uniPort,
TrafficScheds: TrafficSched}); err != nil {
log.Errorw("failed to remove traffic schedulers", log.Fields{"error": err})
return err
@@ -413,31 +446,30 @@
/* After we successfully remove the scheduler configuration on the OLT device,
* delete the meter id on the KV store.
*/
- err = f.resourceMgr.RemoveMeterIDForOnu(Direction, IntfID, OnuID, UniID)
+ err = f.resourceMgr.RemoveMeterIDForOnu(Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID)
if err != nil {
- log.Errorf("Failed to remove meter for onu %d, meter id %d", OnuID, KVStoreMeter.MeterId)
+ log.Errorf("Failed to remove meter for onu %d, meter id %d", sq.onuID, KVStoreMeter.MeterId)
return err
}
log.Debugw("Removed-meter-from-KV-store successfully", log.Fields{"MeterId": KVStoreMeter.MeterId, "dir": Direction})
return err
}
-// This function allocates tconts and GEM ports for an ONU, currently one TCONT is supported per ONU
-func (f *OpenOltFlowMgr) createTcontGemports(intfID uint32, onuID uint32, uniID uint32, uni string, uniPort uint32, TpID uint32, UsMeterID uint32, DsMeterID uint32, flowMetadata *voltha.FlowMetadata) ([]uint32, []uint32, *tp.TechProfile) {
- var allocID []uint32
+// This function allocates tconts and GEM ports for an ONU
+func (f *OpenOltFlowMgr) createTcontGemports(intfID uint32, onuID uint32, uniID uint32, uni string, uniPort uint32, TpID uint32, UsMeterID uint32, DsMeterID uint32, flowMetadata *voltha.FlowMetadata) (uint32, []uint32, *tp.TechProfile) {
+ var allocIDs []uint32
+ var allgemPortIDs []uint32
var gemPortIDs []uint32
- //If we already have allocated earlier for this onu, render them
- if tcontID := f.resourceMgr.GetCurrentAllocIDForOnu(intfID, onuID, uniID); tcontID != 0 {
- allocID = append(allocID, tcontID)
- }
- gemPortIDs = f.resourceMgr.GetCurrentGEMPortIDsForOnu(intfID, onuID, uniID)
+
+ allocIDs = f.resourceMgr.GetCurrentAllocIDsForOnu(intfID, onuID, uniID)
+ allgemPortIDs = f.resourceMgr.GetCurrentGEMPortIDsForOnu(intfID, onuID, uniID)
tpPath := f.getTPpath(intfID, uni, TpID)
// Check tech profile instance already exists for derived port name
techProfileInstance, err := f.techprofile[intfID].GetTPInstanceFromKVStore(TpID, tpPath)
if err != nil { // This should not happen, something wrong in KV backend transaction
log.Errorw("Error in fetching tech profile instance from KV store", log.Fields{"tpID": TpID, "path": tpPath})
- return nil, nil, nil
+ return 0, nil, nil
}
log.Debug("Creating New TConts and Gem ports", log.Fields{"pon": intfID, "onu": onuID, "uni": uniID})
@@ -447,35 +479,40 @@
techProfileInstance = f.techprofile[intfID].CreateTechProfInstance(TpID, uni, intfID)
if techProfileInstance == nil {
log.Error("Tech-profile-instance-creation-failed")
- return nil, nil, nil
+ return 0, nil, nil
}
f.resourceMgr.UpdateTechProfileIDForOnu(intfID, onuID, uniID, TpID)
} else {
log.Debugw("Tech-profile-instance-already-exist-for-given port-name", log.Fields{"uni": uni})
}
if UsMeterID != 0 {
- if err := f.CreateSchedulerQueues(tp_pb.Direction_UPSTREAM, intfID, onuID, uniID, uniPort, techProfileInstance, UsMeterID, flowMetadata); err != nil {
+ sq := schedQueue{direction: tp_pb.Direction_UPSTREAM, intfID: intfID, onuID: onuID, uniID: uniID, tpID: TpID,
+ uniPort: uniPort, tpInst: techProfileInstance, meterID: UsMeterID, flowMetadata: flowMetadata}
+ if err := f.CreateSchedulerQueues(sq); err != nil {
log.Errorw("CreateSchedulerQueues Failed-upstream", log.Fields{"error": err, "meterID": UsMeterID})
- return nil, nil, nil
+ return 0, nil, nil
}
}
if DsMeterID != 0 {
- if err := f.CreateSchedulerQueues(tp_pb.Direction_DOWNSTREAM, intfID, onuID, uniID, uniPort, techProfileInstance, DsMeterID, flowMetadata); err != nil {
+ sq := schedQueue{direction: tp_pb.Direction_DOWNSTREAM, intfID: intfID, onuID: onuID, uniID: uniID, tpID: TpID,
+ uniPort: uniPort, tpInst: techProfileInstance, meterID: DsMeterID, flowMetadata: flowMetadata}
+ if err := f.CreateSchedulerQueues(sq); err != nil {
log.Errorw("CreateSchedulerQueues Failed-downstream", log.Fields{"error": err, "meterID": DsMeterID})
- return nil, nil, nil
+ return 0, nil, nil
}
}
- if len(allocID) == 0 { // Created TCONT first time
- allocID = append(allocID, techProfileInstance.UsScheduler.AllocID)
+
+ allocID := techProfileInstance.UsScheduler.AllocID
+ allocIDs = appendUnique(allocIDs, allocID)
+
+ for _, gem := range techProfileInstance.UpstreamGemPortAttributeList {
+ allgemPortIDs = appendUnique(allgemPortIDs, gem.GemportID)
+ gemPortIDs = append(gemPortIDs, gem.GemportID)
}
- if len(gemPortIDs) == 0 { // Create GEM ports first time
- for _, gem := range techProfileInstance.UpstreamGemPortAttributeList {
- gemPortIDs = append(gemPortIDs, gem.GemportID)
- }
- }
- log.Debugw("Allocated Tcont and GEM ports", log.Fields{"allocID": allocID, "gemports": gemPortIDs})
+
+ log.Debugw("Allocated Tcont and GEM ports", log.Fields{"allocIDs": allocIDs, "gemports": allgemPortIDs})
// Send Tconts and GEM ports to KV store
- f.storeTcontsGEMPortsIntoKVStore(intfID, onuID, uniID, allocID, gemPortIDs)
+ f.storeTcontsGEMPortsIntoKVStore(intfID, onuID, uniID, allocIDs, allgemPortIDs)
return allocID, gemPortIDs, techProfileInstance
}
@@ -577,13 +614,13 @@
log.Debugw("Adding HSIA flow", log.Fields{"intfId": intfID, "onuId": onuID, "uniId": uniID, "classifier": classifier,
"action": action, "direction": direction, "allocId": allocID, "gemPortId": gemPortID,
"logicalFlow": *logicalFlow})
- var vlanPit uint32
+ var vlanPbit uint32
if _, ok := classifier[VlanPcp]; ok {
- vlanPit = classifier[VlanPcp].(uint32)
- log.Debugw("Found pbit in the flow", log.Fields{"vlan_pit": vlanPit})
+ vlanPbit = classifier[VlanPcp].(uint32)
+ log.Debugw("Found pbit in the flow", log.Fields{"VlanPbit": vlanPbit})
}
flowStoreCookie := getFlowStoreCookie(classifier, gemPortID)
- flowID, err := f.resourceMgr.GetFlowID(intfID, onuID, uniID, gemPortID, flowStoreCookie, HsiaFlow, vlanPit)
+ flowID, err := f.resourceMgr.GetFlowID(intfID, onuID, uniID, gemPortID, flowStoreCookie, HsiaFlow, vlanPbit)
if err != nil {
log.Errorw("Flow id unavailable for HSIA flow", log.Fields{"direction": direction})
return
@@ -901,10 +938,24 @@
return f.techprofile[intfID].GetTechProfileInstanceKVPath(TpID, uni)
}
-// DeleteTechProfileInstance removes the tech profile instance from persistent storage
-func (f *OpenOltFlowMgr) DeleteTechProfileInstance(intfID uint32, onuID uint32, uniID uint32, sn string) error {
- tpID := f.resourceMgr.GetTechProfileIDForOnu(intfID, onuID, uniID)
+// DeleteTechProfileInstances removes the tech profile instances from persistent storage
+func (f *OpenOltFlowMgr) DeleteTechProfileInstances(intfID uint32, onuID uint32, uniID uint32, sn string) error {
+ tpIDList := f.resourceMgr.GetTechProfileIDForOnu(intfID, onuID, uniID)
uniPortName := fmt.Sprintf("pon-{%d}/onu-{%d}/uni-{%d}", intfID, onuID, uniID)
+ for _, tpID := range tpIDList {
+ if err := f.DeleteTechProfileInstance(intfID, onuID, uniID, uniPortName, tpID); err != nil {
+ log.Debugw("Failed-to-delete-tp-instance-from-kv-store", log.Fields{"tp-id": tpID, "uni-port-name": uniPortName})
+ return err
+ }
+ }
+ return nil
+}
+
+// DeleteTechProfileInstance removes the tech profile instance from persistent storage
+func (f *OpenOltFlowMgr) DeleteTechProfileInstance(intfID uint32, onuID uint32, uniID uint32, uniPortName string, tpID uint32) error {
+ if uniPortName == "" {
+ uniPortName = fmt.Sprintf("pon-{%d}/onu-{%d}/uni-{%d}", intfID, onuID, uniID)
+ }
if err := f.techprofile[intfID].DeleteTechProfileInstance(tpID, uniPortName); err != nil {
log.Debugw("Failed-to-delete-tp-instance-from-kv-store", log.Fields{"tp-id": tpID, "uni-port-name": uniPortName})
return err
@@ -1011,7 +1062,7 @@
st, _ := status.FromError(err)
if st.Code() == codes.AlreadyExists {
- log.Debug("Flow already exixts", log.Fields{"err": err, "deviceFlow": deviceFlow})
+ log.Debug("Flow already exists", log.Fields{"err": err, "deviceFlow": deviceFlow})
return false
}
@@ -1192,11 +1243,12 @@
flowsInfo := f.resourceMgr.GetFlowIDInfo(ponIntf, onuID, uniID, flowID)
if flowsInfo == nil {
- log.Debugw("No FlowInfo found found in KV store",
+ log.Debugw("No FlowInfo found in KV store",
log.Fields{"ponIntf": ponIntf, "onuID": onuID, "uniID": uniID, "flowID": flowID})
return
}
var updatedFlows []rsrcMgr.FlowInfo
+ var gemPortID int32
for _, flow := range *flowsInfo {
updatedFlows = append(updatedFlows, flow)
@@ -1206,11 +1258,14 @@
if flowDirection == storedFlow.Flow.FlowType {
//Remove the Flow from FlowInfo
log.Debugw("Removing flow to be deleted", log.Fields{"flow": storedFlow})
+ gemPortID = storedFlow.Flow.GemportId
updatedFlows = append(updatedFlows[:i], updatedFlows[i+1:]...)
break
}
}
+ tpID := getTpIDFromFlow(flow)
+
if len(updatedFlows) >= 0 {
// There are still flows referencing the same flow_id.
// So the flow should not be freed yet.
@@ -1220,33 +1275,55 @@
if len(updatedFlows) == 0 {
log.Debugw("Releasing flow Id to resource manager", log.Fields{"ponIntf": ponIntf, "onuId": onuID, "uniId": uniID, "flowId": flowID})
f.resourceMgr.FreeFlowID(ponIntf, int32(onuID), int32(uniID), flowID)
- }
- }
- flowIds := f.resourceMgr.GetCurrentFlowIDsForOnu(ponIntf, onuID, uniID)
- if len(flowIds) == 0 {
- log.Debugf("Flow count for subscriber %d is zero", onuID)
- kvstoreTpID := f.resourceMgr.GetTechProfileIDForOnu(ponIntf, onuID, uniID)
- if kvstoreTpID == 0 {
- log.Warnw("Could-not-find-techprofile-tableid-for-uni", log.Fields{"ponIntf": ponIntf, "onuId": onuID, "uniId": uniID})
- return
- }
- uni := getUniPortPath(ponIntf, onuID, uniID)
- tpPath := f.getTPpath(ponIntf, uni, kvstoreTpID)
- log.Debugw("Getting-techprofile-instance-for-subscriber", log.Fields{"TP-PATH": tpPath})
- techprofileInst, err := f.techprofile[ponIntf].GetTPInstanceFromKVStore(kvstoreTpID, tpPath)
- if err != nil { // This should not happen, something wrong in KV backend transaction
- log.Errorw("Error in fetching tech profile instance from KV store", log.Fields{"tpID": 20, "path": tpPath})
- return
- }
- if techprofileInst == nil {
- log.Errorw("Tech-profile-instance-does-not-exist-in-KV Store", log.Fields{"tpPath": tpPath})
- return
- }
- f.RemoveSchedulerQueues(tp_pb.Direction_UPSTREAM, ponIntf, onuID, uniID, portNum, techprofileInst)
- f.RemoveSchedulerQueues(tp_pb.Direction_DOWNSTREAM, ponIntf, onuID, uniID, portNum, techprofileInst)
- } else {
- log.Debugf("Flow ids for subscriber", log.Fields{"onu": onuID, "flows": flowIds})
+ uni := getUniPortPath(ponIntf, onuID, uniID)
+ tpPath := f.getTPpath(ponIntf, uni, tpID)
+ log.Debugw("Getting-techprofile-instance-for-subscriber", log.Fields{"TP-PATH": tpPath})
+ techprofileInst, err := f.techprofile[ponIntf].GetTPInstanceFromKVStore(tpID, tpPath)
+ if err != nil { // This should not happen, something wrong in KV backend transaction
+ log.Errorw("Error in fetching tech profile instance from KV store", log.Fields{"tpID": 20, "path": tpPath})
+ return
+ }
+ if techprofileInst == nil {
+ log.Errorw("Tech-profile-instance-does-not-exist-in-KV Store", log.Fields{"tpPath": tpPath})
+ return
+ }
+
+ gemPK := gemPortKey{ponIntf, uint32(gemPortID)}
+ if f.isGemPortUsedByAnotherFlow(gemPK) {
+ flowIDs := f.flowsUsedByGemPort[gemPK]
+ for i, flowIDinMap := range flowIDs {
+ if flowIDinMap == flowID {
+ flowIDs = append(flowIDs[:i], flowIDs[i+1:]...)
+ f.flowsUsedByGemPort[gemPK] = flowIDs
+ break
+ }
+ }
+ log.Debugw("Gem port id is still used by other flows", log.Fields{"gemPortID": gemPortID, "usedByFlows": flowIDs})
+ return
+ }
+
+ log.Debugf("Gem port id %d is not used by another flow - releasing the gem port", gemPortID)
+ f.resourceMgr.RemoveGemPortIDForOnu(ponIntf, onuID, uniID, uint32(gemPortID))
+ // TODO: The TrafficQueue corresponding to this gem-port also should be removed immediately.
+ // But it is anyway eventually removed later when the TechProfile is freed, so not a big issue for now.
+ f.resourceMgr.RemoveGEMportPonportToOnuMapOnKVStore(uint32(gemPortID), ponIntf)
+ f.onuIdsLock.Lock()
+ delete(f.flowsUsedByGemPort, gemPK)
+ delete(f.onuGemPortIds, gemPK)
+ f.resourceMgr.FreeGemPortID(ponIntf, onuID, uniID, uint32(gemPortID))
+ f.onuIdsLock.Unlock()
+
+ ok, _ := f.isTechProfileUsedByAnotherGem(ponIntf, onuID, uniID, techprofileInst, uint32(gemPortID))
+ if !ok {
+ f.resourceMgr.RemoveTechProfileIDForOnu(ponIntf, onuID, uniID, tpID)
+ f.RemoveSchedulerQueues(schedQueue{direction: tp_pb.Direction_UPSTREAM, intfID: ponIntf, onuID: onuID, uniID: uniID, tpID: tpID, uniPort: portNum, tpInst: techprofileInst})
+ f.RemoveSchedulerQueues(schedQueue{direction: tp_pb.Direction_DOWNSTREAM, intfID: ponIntf, onuID: onuID, uniID: uniID, tpID: tpID, uniPort: portNum, tpInst: techprofileInst})
+ f.DeleteTechProfileInstance(ponIntf, onuID, uniID, "", tpID)
+ f.resourceMgr.FreeAllocID(ponIntf, onuID, uniID, techprofileInst.UsScheduler.AllocID)
+ // TODO: Send a "Delete TechProfile" message to ONU to do its own clean up on ONU OMCI stack
+ }
+ }
}
}
@@ -1335,26 +1412,8 @@
f.deviceHandler.AddUniPortToOnu(intfID, onuID, portNo)
- /* Metadata 8 bytes:
- Most Significant 2 Bytes = Inner VLAN
- Next 2 Bytes = Tech Profile ID(TPID)
- Least Significant 4 Bytes = Port ID
- Flow Metadata carries Tech-Profile (TP) ID and is mandatory in all
- subscriber related flows.
- */
- metadata := flows.GetMetadataFromWriteMetadataAction(flow)
- if metadata == 0 {
- log.Error("Metadata is not present in flow which is mandatory")
- return
- }
- TpID := flows.GetTechProfileIDFromWriteMetaData(metadata)
- kvstoreTpID := f.resourceMgr.GetTechProfileIDForOnu(intfID, onuID, uniID)
- if kvstoreTpID == 0 {
- log.Debugf("tpid-not-present-in-kvstore, using tp id %d from flow metadata", TpID)
- } else if kvstoreTpID != uint32(TpID) {
- log.Error(" Tech-profile-updates-not-supported", log.Fields{"Tpid-in-flow": TpID, "kvstore-TpId": kvstoreTpID})
- return
- }
+ TpID := getTpIDFromFlow(flow)
+
log.Debugw("TPID for this subcriber", log.Fields{"TpId": TpID, "pon": intfID, "onuID": onuID, "uniID": uniID})
if IsUpstream(actionInfo[Output].(uint32)) {
UsMeterID = flows.GetMeterIdFromFlow(flow)
@@ -1592,8 +1651,14 @@
}
func (f *OpenOltFlowMgr) checkAndAddFlow(args map[string]uint32, classifierInfo map[string]interface{},
- actionInfo map[string]interface{}, flow *ofp.OfpFlowStats, gemPort, intfID, onuID, uniID, portNo uint32,
- TpInst *tp.TechProfile, allocID []uint32, gemPorts []uint32, TpID uint32, uni string) {
+ actionInfo map[string]interface{}, flow *ofp.OfpFlowStats, TpInst *tp.TechProfile, gemPorts []uint32,
+ TpID uint32, uni string) {
+ var gemPort uint32
+ intfID := args[IntfID]
+ onuID := args[OnuID]
+ uniID := args[UniID]
+ portNo := args[PortNo]
+ allocID := TpInst.UsScheduler.AllocID
if ipProto, ok := classifierInfo[IPProto]; ok {
if ipProto.(uint32) == IPProtoDhcp {
log.Info("Adding DHCP flow")
@@ -1602,7 +1667,7 @@
tp_pb.Direction_UPSTREAM,
pcp.(uint32))
//Adding DHCP upstream flow
- f.addDHCPTrapFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID[0], gemPort)
+ f.addDHCPTrapFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID, gemPort)
} else {
//Adding DHCP upstream flow to all gemports
installFlowOnAllGemports(f.addDHCPTrapFlow, nil, args, classifierInfo, actionInfo, flow, gemPorts, DhcpFlow)
@@ -1629,7 +1694,7 @@
tp_pb.Direction_UPSTREAM,
pcp.(uint32))
- f.addEAPOLFlow(intfID, onuID, uniID, portNo, flow, allocID[0], gemPort, vlanID)
+ f.addEAPOLFlow(intfID, onuID, uniID, portNo, flow, allocID, gemPort, vlanID)
} else {
installFlowOnAllGemports(nil, f.addEAPOLFlow, args, classifierInfo, actionInfo, flow, gemPorts, EapolFlow, vlanID)
}
@@ -1641,7 +1706,7 @@
tp_pb.Direction_UPSTREAM,
pcp.(uint32))
//Adding HSIA upstream flow
- f.addUpstreamDataFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID[0], gemPort)
+ f.addUpstreamDataFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID, gemPort)
} else {
//Adding HSIA upstream flow to all gemports
installFlowOnAllGemports(f.addUpstreamDataFlow, nil, args, classifierInfo, actionInfo, flow, gemPorts, HsiaFlow)
@@ -1653,7 +1718,7 @@
tp_pb.Direction_DOWNSTREAM,
pcp.(uint32))
//Adding HSIA downstream flow
- f.addDownstreamDataFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID[0], gemPort)
+ f.addDownstreamDataFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID, gemPort)
} else {
//Adding HSIA downstream flow to all gemports
installFlowOnAllGemports(f.addDownstreamDataFlow, nil, args, classifierInfo, actionInfo, flow, gemPorts, HsiaFlow)
@@ -1666,6 +1731,27 @@
go f.sendTPDownloadMsgToChild(intfID, onuID, uniID, uni, TpID)
}
+func (f *OpenOltFlowMgr) isGemPortUsedByAnotherFlow(gemPK gemPortKey) bool {
+ flowIDList := f.flowsUsedByGemPort[gemPK]
+ if len(flowIDList) > 1 {
+ return true
+ }
+ return false
+}
+
+func (f *OpenOltFlowMgr) isTechProfileUsedByAnotherGem(ponIntf uint32, onuID uint32, uniID uint32, tpInst *tp.TechProfile, gemPortID uint32) (bool, uint32) {
+ currentGemPorts := f.resourceMgr.GetCurrentGEMPortIDsForOnu(ponIntf, onuID, uniID)
+ tpGemPorts := tpInst.UpstreamGemPortAttributeList
+ for _, currentGemPort := range currentGemPorts {
+ for _, tpGemPort := range tpGemPorts {
+ if (currentGemPort == tpGemPort.GemportID) && (currentGemPort != gemPortID) {
+ return true, currentGemPort
+ }
+ }
+ }
+ return false, 0
+}
+
func formulateClassifierInfoFromFlow(classifierInfo map[string]interface{}, flow *ofp.OfpFlowStats) {
for _, field := range flows.GetOfbFields(flow) {
if field.Type == flows.ETH_TYPE {
@@ -1801,3 +1887,29 @@
}
return nil
}
+
+func getTpIDFromFlow(flow *ofp.OfpFlowStats) uint32 {
+ /* Metadata 8 bytes:
+ Most Significant 2 Bytes = Inner VLAN
+ Next 2 Bytes = Tech Profile ID(TPID)
+ Least Significant 4 Bytes = Port ID
+ Flow Metadata carries Tech-Profile (TP) ID and is mandatory in all
+ subscriber related flows.
+ */
+ metadata := flows.GetMetadataFromWriteMetadataAction(flow)
+ if metadata == 0 {
+ log.Error("Metadata is not present in flow which is mandatory")
+ return 0
+ }
+ TpID := flows.GetTechProfileIDFromWriteMetaData(metadata)
+ return uint32(TpID)
+}
+
+func appendUnique(slice []uint32, item uint32) []uint32 {
+ for _, sliceElement := range slice {
+ if sliceElement == item {
+ return slice
+ }
+ }
+ return append(slice, item)
+}