VOL-3431: Following enhancement/changes are done in this patch
- Process ONUs in bulk rather than serial, significantly improves run time
- Used a separate API to get flow-id. This flow-id is not freed on failure,
as this adds to unnecessary complexity and unwarranted for a test tool .
- Print the total execution time at the end of the test
- Fixed the Dockerfile to not build vendor module at each docker build time
- Introduced new functions to retry scheduler, queue and flow adds on failure,
but these are not currently used
- Add vendor modules to repo just like all other ONF VOLTHA golang projects do
- Tested all three workflows - ATT, DT and TT
- Bump version to 1.1.0
Change-Id: I6102cb206e78ea04b49b7125b101946ca6f36bfb
diff --git a/core/att_workflow.go b/core/att_workflow.go
index cf5cbae..f673ee0 100644
--- a/core/att_workflow.go
+++ b/core/att_workflow.go
@@ -22,7 +22,6 @@
"github.com/opencord/openolt-scale-tester/config"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
- "github.com/opencord/voltha-lib-go/v3/pkg/ponresourcemanager"
oop "github.com/opencord/voltha-protos/v3/go/openolt"
tp_pb "github.com/opencord/voltha-protos/v3/go/tech_profile"
"golang.org/x/net/context"
@@ -39,11 +38,10 @@
}
func AddDhcpIPV4Flow(oo oop.OpenoltClient, config *config.OpenOltScaleTesterConfig, rsrMgr *OpenOltResourceMgr) error {
- var flowID []uint32
+ var flowID uint32
var err error
- if flowID, err = rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].GetResourceID(context.Background(), uint32(config.NniIntfID),
- ponresourcemanager.FLOW_ID, 1); err != nil {
+ if flowID, err = rsrMgr.GetFlowID(context.Background(), uint32(config.NniIntfID)); err != nil {
return err
}
@@ -52,7 +50,7 @@
actionCmd := &oop.ActionCmd{TrapToHost: true}
actionInfo := &oop.Action{Cmd: actionCmd}
- flow := oop.Flow{AccessIntfId: -1, OnuId: -1, UniId: -1, FlowId: flowID[0],
+ flow := oop.Flow{AccessIntfId: -1, OnuId: -1, UniId: -1, FlowId: flowID,
FlowType: "downstream", AllocId: -1, GemportId: -1,
Classifier: flowClassifier, Action: actionInfo,
Priority: 1000, PortNo: uint32(config.NniIntfID)}
@@ -67,8 +65,6 @@
if err != nil {
log.Errorw("Failed to Add DHCP IPv4 to device", log.Fields{"err": err, "deviceFlow": flow})
- rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].FreeResourceID(context.Background(), uint32(config.NniIntfID),
- ponresourcemanager.FLOW_ID, flowID)
return err
}
log.Debugw("DHCP IPV4 added to device successfully ", log.Fields{"flow": flow})
@@ -77,11 +73,10 @@
}
func AddDhcpIPV6Flow(oo oop.OpenoltClient, config *config.OpenOltScaleTesterConfig, rsrMgr *OpenOltResourceMgr) error {
- var flowID []uint32
+ var flowID uint32
var err error
- if flowID, err = rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].GetResourceID(context.Background(), uint32(config.NniIntfID),
- ponresourcemanager.FLOW_ID, 1); err != nil {
+ if flowID, err = rsrMgr.GetFlowID(context.Background(), uint32(config.NniIntfID)); err != nil {
return err
}
@@ -90,7 +85,7 @@
actionCmd := &oop.ActionCmd{TrapToHost: true}
actionInfo := &oop.Action{Cmd: actionCmd}
- flow := oop.Flow{AccessIntfId: -1, OnuId: -1, UniId: -1, FlowId: flowID[0],
+ flow := oop.Flow{AccessIntfId: -1, OnuId: -1, UniId: -1, FlowId: flowID,
FlowType: "downstream", AllocId: -1, GemportId: -1,
Classifier: flowClassifier, Action: actionInfo,
Priority: 1000, PortNo: uint32(config.NniIntfID)}
@@ -105,8 +100,6 @@
if err != nil {
log.Errorw("Failed to Add DHCP IPV6 to device", log.Fields{"err": err, "deviceFlow": flow})
- rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].FreeResourceID(context.Background(), uint32(config.NniIntfID),
- ponresourcemanager.FLOW_ID, flowID)
return err
}
log.Debugw("DHCP IPV6 added to device successfully ", log.Fields{"flow": flow})
@@ -156,7 +149,6 @@
log.Errorw("Failed to create traffic schedulers", log.Fields{"error": err})
return errors.New(ReasonCodeToReasonString(SCHED_CREATION_FAILED))
}
-
return nil
}
@@ -203,7 +195,7 @@
func (att AttWorkFlow) ProvisionEapFlow(subs *Subscriber) error {
var err error
- var flowID []uint32
+ var flowID uint32
var gemPortIDs []uint32
var allocID = subs.TpInstance[subs.TestConfig.TpIDList[0]].UsScheduler.AllocID
@@ -216,13 +208,10 @@
for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
if pbitSet == '1' {
pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
- if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(context.Background(), uint32(subs.PonIntf),
- ponresourcemanager.FLOW_ID, 1); err != nil {
+ if flowID, err = subs.RsrMgr.GetFlowID(context.Background(), uint32(subs.PonIntf)); err != nil {
return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
} else {
- if err := AddFlow(subs, EapolFlow, Upstream, flowID[0], allocID, gemID, pcp); err != nil {
- subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(context.Background(), uint32(subs.PonIntf),
- ponresourcemanager.FLOW_ID, flowID)
+ if err := AddFlow(subs, EapolFlow, Upstream, flowID, allocID, gemID, pcp); err != nil {
return err
}
}
@@ -234,7 +223,7 @@
func (att AttWorkFlow) ProvisionDhcpIPV4Flow(subs *Subscriber) error {
var err error
- var flowID []uint32
+ var flowID uint32
var gemPortIDs []uint32
var allocID = subs.TpInstance[subs.TestConfig.TpIDList[0]].UsScheduler.AllocID
@@ -247,13 +236,10 @@
for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
if pbitSet == '1' {
pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
- if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(context.Background(), uint32(subs.PonIntf),
- ponresourcemanager.FLOW_ID, 1); err != nil {
+ if flowID, err = subs.RsrMgr.GetFlowID(context.Background(), uint32(subs.PonIntf)); err != nil {
return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
} else {
- if err := AddFlow(subs, DhcpFlowIPV4, Upstream, flowID[0], allocID, gemID, pcp); err != nil {
- subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(context.Background(), uint32(subs.PonIntf),
- ponresourcemanager.FLOW_ID, flowID)
+ if err := AddFlow(subs, DhcpFlowIPV4, Upstream, flowID, allocID, gemID, pcp); err != nil {
return err
}
}
@@ -265,7 +251,7 @@
func (att AttWorkFlow) ProvisionDhcpIPV6Flow(subs *Subscriber) error {
var err error
- var flowID []uint32
+ var flowID uint32
var gemPortIDs []uint32
var allocID = subs.TpInstance[subs.TestConfig.TpIDList[0]].UsScheduler.AllocID
@@ -278,13 +264,10 @@
for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
if pbitSet == '1' {
pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
- if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(context.Background(), uint32(subs.PonIntf),
- ponresourcemanager.FLOW_ID, 1); err != nil {
+ if flowID, err = subs.RsrMgr.GetFlowID(context.Background(), uint32(subs.PonIntf)); err != nil {
return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
} else {
- if err := AddFlow(subs, DhcpFlowIPV6, Upstream, flowID[0], allocID, gemID, pcp); err != nil {
- subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(context.Background(), uint32(subs.PonIntf),
- ponresourcemanager.FLOW_ID, flowID)
+ if err := AddFlow(subs, DhcpFlowIPV6, Upstream, flowID, allocID, gemID, pcp); err != nil {
return err
}
}
@@ -301,7 +284,7 @@
func (att AttWorkFlow) ProvisionHsiaFlow(subs *Subscriber) error {
var err error
- var flowID []uint32
+ var flowID uint32
var gemPortIDs []uint32
var allocID = subs.TpInstance[subs.TestConfig.TpIDList[0]].UsScheduler.AllocID
@@ -314,22 +297,19 @@
for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
if pbitSet == '1' {
pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
- if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(context.Background(), uint32(subs.PonIntf),
- ponresourcemanager.FLOW_ID, 1); err != nil {
+ if flowID, err = subs.RsrMgr.GetFlowID(context.Background(), uint32(subs.PonIntf)); err != nil {
return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
} else {
var errUs, errDs error
- if errUs = AddFlow(subs, HsiaFlow, Upstream, flowID[0], allocID, gemID, pcp); errUs != nil {
+ if errUs = AddFlow(subs, HsiaFlow, Upstream, flowID, allocID, gemID, pcp); errUs != nil {
log.Errorw("failed to install US HSIA flow",
log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
}
- if errDs = AddFlow(subs, HsiaFlow, Downstream, flowID[0], allocID, gemID, pcp); errDs != nil {
+ if errDs = AddFlow(subs, HsiaFlow, Downstream, flowID, allocID, gemID, pcp); errDs != nil {
log.Errorw("failed to install US HSIA flow",
log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
}
if errUs != nil && errDs != nil {
- subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(context.Background(), uint32(subs.PonIntf),
- ponresourcemanager.FLOW_ID, flowID)
}
if errUs != nil || errDs != nil {
if errUs != nil {
diff --git a/core/dt_workflow.go b/core/dt_workflow.go
index a06b29b..5c8b345 100644
--- a/core/dt_workflow.go
+++ b/core/dt_workflow.go
@@ -22,7 +22,6 @@
"github.com/opencord/openolt-scale-tester/config"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
- "github.com/opencord/voltha-lib-go/v3/pkg/ponresourcemanager"
oop "github.com/opencord/voltha-protos/v3/go/openolt"
tp_pb "github.com/opencord/voltha-protos/v3/go/tech_profile"
"golang.org/x/net/context"
@@ -143,7 +142,7 @@
func (dt DtWorkFlow) ProvisionHsiaFlow(subs *Subscriber) error {
var err error
- var flowID []uint32
+ var flowID uint32
var gemPortIDs []uint32
var allocID = subs.TpInstance[subs.TestConfig.TpIDList[0]].UsScheduler.AllocID
@@ -156,16 +155,13 @@
for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
if pbitSet == '1' {
pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
- if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(context.Background(), uint32(subs.PonIntf),
- ponresourcemanager.FLOW_ID, 1); err != nil {
+ if flowID, err = subs.RsrMgr.GetFlowID(context.Background(), uint32(subs.PonIntf)); err != nil {
return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
} else {
- if err := AddFlow(subs, HsiaFlow, Upstream, flowID[0], allocID, gemID, pcp); err != nil {
- subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(context.Background(), uint32(subs.PonIntf),
- ponresourcemanager.FLOW_ID, flowID)
+ if err := AddFlow(subs, HsiaFlow, Upstream, flowID, allocID, gemID, pcp); err != nil {
return err
}
- if err := AddFlow(subs, HsiaFlow, Downstream, flowID[0], allocID, gemID, pcp); err != nil {
+ if err := AddFlow(subs, HsiaFlow, Downstream, flowID, allocID, gemID, pcp); err != nil {
return err
}
}
diff --git a/core/olt_manager.go b/core/olt_manager.go
index 78dd97e..f4e3143 100644
--- a/core/olt_manager.go
+++ b/core/olt_manager.go
@@ -44,7 +44,6 @@
const (
ReasonOk = "OK"
TechProfileKVPath = "service/voltha/technology_profiles/%s/%d" // service/voltha/technology_profiles/xgspon/<tech_profile_tableID>
- DTWorkFlow = "DT"
)
type OnuDeviceKey struct {
@@ -62,6 +61,7 @@
testConfig *config.OpenOltScaleTesterConfig
rsrMgr *OpenOltResourceMgr
lockRsrAlloc sync.RWMutex
+ lockOpenOltManager sync.RWMutex
}
func init() {
@@ -74,6 +74,7 @@
ipPort: ipPort,
OnuDeviceMap: make(map[OnuDeviceKey]*OnuDevice),
lockRsrAlloc: sync.RWMutex{},
+ lockOpenOltManager: sync.RWMutex{},
}
}
@@ -208,43 +209,76 @@
func (om *OpenOltManager) provisionONUs() {
var numOfONUsPerPon uint
- var i, j, onuID uint32
+ var i, j, k, onuID uint32
var err error
- oltChan := make(chan bool)
- numOfONUsPerPon = om.testConfig.NumOfOnu / uint(om.deviceInfo.PonPorts)
- if oddONUs := om.testConfig.NumOfOnu % uint(om.deviceInfo.PonPorts); oddONUs > 0 {
- log.Warnw("Odd number ONUs left out of provisioning", log.Fields{"oddONUs": oddONUs})
+ var onuWg sync.WaitGroup
+
+ defer func() {
+ // Stop the process once the job is done
+ _ = syscall.Kill(syscall.Getpid(), syscall.SIGINT)
+ }()
+
+ // If the number of ONUs to provision is not a power of 2, stop execution
+ // This is needed for ensure even distribution of ONUs across all PONs
+ if !isPowerOfTwo(om.testConfig.NumOfOnu) {
+ log.Errorw("num-of-onus-to-provision-is-not-a-power-of-2", log.Fields{"numOfOnus": om.testConfig.NumOfOnu})
+ return
}
+
+ // Number of ONUs to provision should not be less than the number of PON ports.
+ // We need at least one ONU per PON
+ if om.testConfig.NumOfOnu < uint(om.deviceInfo.PonPorts) {
+ log.Errorw("num-of-onu-is-less-than-num-of-pon-port", log.Fields{"numOfOnus":om.testConfig.NumOfOnu, "numOfPon": om.deviceInfo.PonPorts})
+ return
+ }
+
+ numOfONUsPerPon = om.testConfig.NumOfOnu / uint(om.deviceInfo.PonPorts)
totalOnusToProvision := numOfONUsPerPon * uint(om.deviceInfo.PonPorts)
log.Infow("***** all-onu-provision-started ******",
log.Fields{"totalNumOnus": totalOnusToProvision,
"numOfOnusPerPon": numOfONUsPerPon,
"numOfPons": om.deviceInfo.PonPorts})
- for i = 0; i < om.deviceInfo.PonPorts; i++ {
- for j = 0; j < uint32(numOfONUsPerPon); j++ {
- // TODO: More work with ONU provisioning
- om.lockRsrAlloc.Lock()
- sn := GenerateNextONUSerialNumber()
- om.lockRsrAlloc.Unlock()
- log.Debugw("provisioning onu", log.Fields{"onuID": j, "ponPort": i, "serialNum": sn})
- if onuID, err = om.rsrMgr.GetONUID(i); err != nil {
- log.Errorw("error getting onu id", log.Fields{"err": err})
- continue
- }
- log.Infow("onu-provision-started-from-olt-manager", log.Fields{"onuId": onuID, "ponIntf": i})
- go om.activateONU(i, onuID, sn, om.stringifySerialNumber(sn), oltChan)
- // Wait for complete ONU provision to succeed, including provisioning the subscriber
- <-oltChan
- log.Infow("onu-provision-completed-from-olt-manager", log.Fields{"onuId": onuID, "ponIntf": i})
- // Sleep for configured time before provisioning next ONU
- time.Sleep(time.Duration(om.testConfig.TimeIntervalBetweenSubs))
- }
+ // These are the number of ONUs that will be provisioned per PON port per batch.
+ // Such number of ONUs will be chosen across all PON ports per batch
+ var onusPerIterationPerPonPort uint32 = 4
+
+ // If the total number of ONUs per PON is lesser than the default ONU to provision per pon port per batch
+ // then keep halving the ONU to provision per pon port per batch until we reach an acceptable number
+ // Note: the least possible value for onusPerIterationPerPonPort is 1
+ for uint32(numOfONUsPerPon) < onusPerIterationPerPonPort {
+ onusPerIterationPerPonPort /= 2
}
- log.Info("******** all-onu-provisioning-completed *******")
- // TODO: We need to dump the results at the end. But below json marshall does not work
- // We will need custom Marshal function.
+ startTime := time.Now()
+ // Start provisioning the ONUs
+ for i = 0; i < uint32(numOfONUsPerPon)/onusPerIterationPerPonPort; i++ {
+ for j = 0; j < om.deviceInfo.PonPorts; j++ {
+ for k = 0; k < onusPerIterationPerPonPort; k++ {
+ om.lockRsrAlloc.Lock()
+ sn := GenerateNextONUSerialNumber()
+ om.lockRsrAlloc.Unlock()
+ log.Debugw("provisioning onu", log.Fields{"onuID": j, "ponPort": i, "serialNum": sn})
+ if onuID, err = om.rsrMgr.GetONUID(j); err != nil {
+ log.Errorw("error getting onu id", log.Fields{"err": err})
+ continue
+ }
+ log.Infow("onu-provision-started-from-olt-manager", log.Fields{"onuId": onuID, "ponIntf": i})
+
+ onuWg.Add(1)
+ go om.activateONU(j, onuID, sn, om.stringifySerialNumber(sn), &onuWg)
+ }
+ }
+ // Wait for the group of ONUs to complete processing before going to next batch of ONUs
+ onuWg.Wait()
+ }
+ endTime := time.Now()
+ log.Info("******** all-onu-provisioning-completed *******")
+ totalTime := endTime.Sub(startTime)
+ out := time.Time{}.Add(totalTime)
+ log.Infof("****** Total Time to provision all the ONUs is => %s", out.Format("15:04:05"))
+
+ // TODO: We need to dump the results at the end. But below json marshall does not work. We will need custom Marshal function.
/*
e, err := json.Marshal(om)
if err != nil {
@@ -253,12 +287,9 @@
}
fmt.Println(string(e))
*/
-
- // Stop the process once the job is done
- _ = syscall.Kill(syscall.Getpid(), syscall.SIGINT)
}
-func (om *OpenOltManager) activateONU(intfID uint32, onuID uint32, serialNum *oop.SerialNumber, serialNumber string, oltCh chan bool) {
+func (om *OpenOltManager) activateONU(intfID uint32, onuID uint32, serialNum *oop.SerialNumber, serialNumber string, onuWg *sync.WaitGroup) {
log.Debugw("activate-onu", log.Fields{"intfID": intfID, "onuID": onuID, "serialNum": serialNum, "serialNumber": serialNumber})
// TODO: need resource manager
var pir uint32 = 1000000
@@ -269,6 +300,7 @@
openOltClient: om.openOltClient,
testConfig: om.testConfig,
rsrMgr: om.rsrMgr,
+ onuWg: onuWg,
}
var err error
onuDeviceKey := OnuDeviceKey{onuID: onuID, ponInfID: intfID}
@@ -281,7 +313,6 @@
st, _ := status.FromError(err)
if st.Code() == codes.AlreadyExists {
log.Debug("ONU activation is in progress", log.Fields{"SerialNumber": serialNumber})
- oltCh <- false
} else {
nanos = now.UnixNano()
milliEnd := nanos / 1000000
@@ -289,7 +320,6 @@
onuDevice.OnuProvisionDurationInMs = milliEnd - milliStart
log.Errorw("activate-onu-failed", log.Fields{"Onu": Onu, "err ": err})
onuDevice.Reason = err.Error()
- oltCh <- false
}
} else {
nanos = now.UnixNano()
@@ -300,12 +330,16 @@
log.Infow("activated-onu", log.Fields{"SerialNumber": serialNumber})
}
+ om.lockOpenOltManager.Lock()
om.OnuDeviceMap[onuDeviceKey] = &onuDevice
+ om.lockOpenOltManager.Unlock()
// If ONU activation was success provision the ONU
if err == nil {
- // start provisioning the ONU
- go om.OnuDeviceMap[onuDeviceKey].Start(oltCh)
+ om.lockOpenOltManager.RLock()
+ go om.OnuDeviceMap[onuDeviceKey].Start()
+ om.lockOpenOltManager.RUnlock()
+
}
}
@@ -441,3 +475,7 @@
log.Fields{"numofTech": tpCount, "numPonPorts": om.deviceInfo.GetPonPorts()})
return nil
}
+
+func isPowerOfTwo(numOfOnus uint) bool {
+ return (numOfOnus & (numOfOnus - 1)) == 0
+}
\ No newline at end of file
diff --git a/core/onu_manager.go b/core/onu_manager.go
index 6fa9201..4fd665b 100644
--- a/core/onu_manager.go
+++ b/core/onu_manager.go
@@ -18,6 +18,7 @@
import (
"strconv"
+ "sync"
"time"
"github.com/opencord/openolt-scale-tester/config"
@@ -45,13 +46,13 @@
openOltClient oop.OpenoltClient
testConfig *config.OpenOltScaleTesterConfig
rsrMgr *OpenOltResourceMgr
+ onuWg *sync.WaitGroup
}
-func (onu *OnuDevice) Start(oltCh chan bool) {
+func (onu *OnuDevice) Start() {
onu.SubscriberMap = make(map[SubscriberKey]*Subscriber)
- onuCh := make(chan bool)
var subs uint
-
+ var subWg sync.WaitGroup
log.Infow("onu-provision-started-from-onu-manager", log.Fields{"onuID": onu.OnuID, "ponIntf": onu.PonIntf})
for subs = 0; subs < onu.testConfig.SubscribersPerOnu; subs++ {
@@ -67,24 +68,22 @@
OpenOltClient: onu.openOltClient,
TestConfig: onu.testConfig,
RsrMgr: onu.rsrMgr,
+ subWg: &subWg,
}
subsKey := SubscriberKey{subsName}
onu.SubscriberMap[subsKey] = &subs
+ subWg.Add(1)
log.Infow("subscriber-provision-started-from-onu-manager", log.Fields{"subsName": subsName})
// Start provisioning the subscriber
- go subs.Start(onuCh, onu.testConfig.IsGroupTest)
+ go subs.Start(onu.testConfig.IsGroupTest)
- // Wait for subscriber provision to complete
- <-onuCh
-
- log.Infow("subscriber-provision-completed-from-onu-manager", log.Fields{"subsName": subsName})
-
- //Sleep for configured interval before provisioning another subscriber
- time.Sleep(time.Duration(onu.testConfig.TimeIntervalBetweenSubs))
}
- // Indicate that the ONU provisioning is complete
- oltCh <- true
+
+ // Wait for all the subscribers on the ONU to complete provisioning
+ subWg.Wait()
+ // Signal that ONU provisioning is complete
+ onu.onuWg.Done()
log.Infow("onu-provision-completed-from-onu-manager", log.Fields{"onuID": onu.OnuID, "ponIntf": onu.PonIntf})
}
diff --git a/core/resource_manager.go b/core/resource_manager.go
index 3be01bc..d8674c9 100644
--- a/core/resource_manager.go
+++ b/core/resource_manager.go
@@ -18,10 +18,9 @@
package core
import (
- "errors"
- "fmt"
"strconv"
"strings"
+ "sync"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
ponrmgr "github.com/opencord/voltha-lib-go/v3/pkg/ponresourcemanager"
@@ -36,6 +35,16 @@
// OpenOltResourceMgr holds resource related information as provided below for each field
type OpenOltResourceMgr struct {
deviceInfo *openolt.DeviceInfo
+
+ // This protects concurrent onu_id allocate/delete calls on a per PON port basis
+ OnuIDMgmtLock []sync.RWMutex
+ // This protects concurrent flow_id allocate/delete calls. We do not need this on a
+ // per PON port basis as flow IDs are unique across the OLT.
+ FlowIDMgmtLock sync.RWMutex
+
+ // This protects concurrent GemID and AllocID allocate/delete calls on a per PON port basis
+ GemIDAllocIDLock []sync.RWMutex
+
// array of pon resource managers per interface technology
ResourceMgrs map[uint32]*ponrmgr.PONResourceManager
}
@@ -48,6 +57,11 @@
log.Debugf("Init new resource manager")
ResourceMgr.deviceInfo = devInfo
+ NumPONPorts := devInfo.GetPonPorts()
+
+ ResourceMgr.OnuIDMgmtLock = make([]sync.RWMutex, NumPONPorts)
+ ResourceMgr.GemIDAllocIDLock = make([]sync.RWMutex, NumPONPorts)
+ ResourceMgr.FlowIDMgmtLock = sync.RWMutex{}
Ranges := make(map[string]*openolt.DeviceInfo_DeviceResourceRanges)
RsrcMgrsByTech := make(map[string]*ponrmgr.PONResourceManager)
@@ -64,7 +78,6 @@
var ranges openolt.DeviceInfo_DeviceResourceRanges
ranges.Technology = devInfo.GetTechnology()
- NumPONPorts := devInfo.GetPonPorts()
var index uint32
for index = 0; index < NumPONPorts; index++ {
ranges.IntfIds = append(ranges.IntfIds, index)
@@ -277,27 +290,6 @@
// Delete clears used resources for the particular olt device being deleted
func (RsrcMgr *OpenOltResourceMgr) Delete() error {
- /* TODO
- def __del__(self):
- self.log.info("clearing-device-resource-pool")
- for key, resource_mgr in self.resource_mgrs.iteritems():
- resource_mgr.clear_device_resource_pool()
-
- def assert_pon_id_limit(self, pon_intf_id):
- assert pon_intf_id in self.resource_mgrs
-
- def assert_onu_id_limit(self, pon_intf_id, onu_id):
- self.assert_pon_id_limit(pon_intf_id)
- self.resource_mgrs[pon_intf_id].assert_resource_limits(onu_id, PONResourceManager.ONU_ID)
-
- @property
- def max_uni_id_per_onu(self):
- return 0 #OpenOltPlatform.MAX_UNIS_PER_ONU-1, zero-based indexing Uncomment or override to make default multi-uni
-
- def assert_uni_id_limit(self, pon_intf_id, onu_id, uni_id):
- self.assert_onu_id_limit(pon_intf_id, onu_id)
- self.resource_mgrs[pon_intf_id].assert_resource_limits(uni_id, PONResourceManager.UNI_ID)
- */
for _, rsrcMgr := range RsrcMgr.ResourceMgrs {
if err := rsrcMgr.ClearDeviceResourcePool(context.Background()); err != nil {
log.Debug("Failed to clear device resource pool")
@@ -310,143 +302,29 @@
// GetONUID returns the available OnuID for the given pon-port
func (RsrcMgr *OpenOltResourceMgr) GetONUID(ponIntfID uint32) (uint32, error) {
+ RsrcMgr.OnuIDMgmtLock[ponIntfID].Lock()
+ defer RsrcMgr.OnuIDMgmtLock[ponIntfID].Unlock()
// Check if Pon Interface ID is present in Resource-manager-map
- if _, ok := RsrcMgr.ResourceMgrs[ponIntfID]; !ok {
- err := errors.New("invalid-pon-interface-" + strconv.Itoa(int(ponIntfID)))
- return 0, err
- }
- // Get ONU id for a provided pon interface ID.
- ONUID, err := RsrcMgr.ResourceMgrs[ponIntfID].GetResourceID(context.Background(), ponIntfID,
+ ONUIDs, err := RsrcMgr.ResourceMgrs[ponIntfID].GetResourceID(context.Background(), ponIntfID,
ponrmgr.ONU_ID, 1)
if err != nil {
log.Errorf("Failed to get resource for interface %d for type %s",
ponIntfID, ponrmgr.ONU_ID)
- return 0, err
+ return uint32(0), err
}
- if ONUID != nil {
- RsrcMgr.ResourceMgrs[ponIntfID].InitResourceMap(context.Background(), fmt.Sprintf("%d,%d", ponIntfID, ONUID[0]))
- return ONUID[0], err
- }
-
- return 0, err // return OnuID 0 on error
+ return ONUIDs[0], err
}
-// GetAllocID return the first Alloc ID for a given pon interface id and onu id and then update the resource map on
-// the KV store with the list of alloc_ids allocated for the pon_intf_onu_id tuple
-// Currently of all the alloc_ids available, it returns the first alloc_id in the list for tha given ONU
-func (RsrcMgr *OpenOltResourceMgr) GetAllocID(intfID uint32, onuID uint32, uniID uint32) uint32 {
-
- var err error
- IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", intfID, onuID, uniID)
- AllocID := RsrcMgr.ResourceMgrs[intfID].GetCurrentAllocIDForOnu(context.Background(), IntfOnuIDUniID)
- if AllocID != nil {
- // Since we support only one alloc_id for the ONU at the moment,
- // return the first alloc_id in the list, if available, for that
- // ONU.
- log.Debugw("Retrieved alloc ID from pon resource mgr", log.Fields{"AllocID": AllocID})
- return AllocID[0]
- }
- AllocID, err = RsrcMgr.ResourceMgrs[intfID].GetResourceID(context.Background(), intfID,
- ponrmgr.ALLOC_ID, 1)
-
- if AllocID == nil || err != nil {
- log.Error("Failed to allocate alloc id")
- return 0
- }
- // update the resource map on KV store with the list of alloc_id
- // allocated for the pon_intf_onu_id tuple
- err = RsrcMgr.ResourceMgrs[intfID].UpdateAllocIdsForOnu(context.Background(), IntfOnuIDUniID, AllocID)
+// GetFlowID return flow ID for a given pon interface id, onu id and uni id
+func (RsrcMgr *OpenOltResourceMgr) GetFlowID(ctx context.Context, ponIntfID uint32) (uint32, error) {
+ RsrcMgr.FlowIDMgmtLock.Lock()
+ defer RsrcMgr.FlowIDMgmtLock.Unlock()
+ FlowIDs, err := RsrcMgr.ResourceMgrs[ponIntfID].GetResourceID(context.Background(), ponIntfID,
+ ponrmgr.FLOW_ID, 1)
if err != nil {
- log.Error("Failed to update Alloc ID")
- return 0
+ log.Errorf("Failed to get resource for interface %d for type %s",
+ ponIntfID, ponrmgr.FLOW_ID)
+ return uint32(0), err
}
- log.Debugw("Allocated new Tcont from pon resource mgr", log.Fields{"AllocID": AllocID})
- return AllocID[0]
-}
-
-// GetGEMPortID gets gem port id for a particular pon port, onu id and uni id and then update the resource map on
-// the KV store with the list of gemport_id allocated for the pon_intf_onu_id tuple
-func (RsrcMgr *OpenOltResourceMgr) GetGEMPortID(ponPort uint32, onuID uint32,
- uniID uint32, NumOfPorts uint32) ([]uint32, error) {
-
- /* Get gem port id for a particular pon port, onu id
- and uni id.
- */
-
- var err error
- IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", ponPort, onuID, uniID)
-
- GEMPortList := RsrcMgr.ResourceMgrs[ponPort].GetCurrentGEMPortIDsForOnu(context.Background(), IntfOnuIDUniID)
- if GEMPortList != nil {
- return GEMPortList, nil
- }
-
- GEMPortList, err = RsrcMgr.ResourceMgrs[ponPort].GetResourceID(context.Background(), ponPort,
- ponrmgr.GEMPORT_ID, NumOfPorts)
- if err != nil && GEMPortList == nil {
- log.Errorf("Failed to get gem port id for %s", IntfOnuIDUniID)
- return nil, err
- }
-
- // update the resource map on KV store with the list of gemport_id
- // allocated for the pon_intf_onu_id tuple
- err = RsrcMgr.ResourceMgrs[ponPort].UpdateGEMPortIDsForOnu(context.Background(), IntfOnuIDUniID,
- GEMPortList)
- if err != nil {
- log.Errorf("Failed to update GEM ports to kv store for %s", IntfOnuIDUniID)
- return nil, err
- }
-
- return GEMPortList, err
-}
-
-// FreeFlowID returns the free flow id for a given interface, onu id and uni id
-func (RsrcMgr *OpenOltResourceMgr) FreeFlowID(IntfID uint32, onuID int32,
- uniID int32, FlowID uint32) {
- var IntfONUID string
- var err error
- FlowIds := make([]uint32, 0)
-
- FlowIds = append(FlowIds, FlowID)
- IntfONUID = fmt.Sprintf("%d,%d,%d", IntfID, onuID, uniID)
- err = RsrcMgr.ResourceMgrs[IntfID].UpdateFlowIDForOnu(context.Background(), IntfONUID, FlowID, false)
- if err != nil {
- log.Errorw("Failed to Update flow id for", log.Fields{"intf": IntfONUID})
- }
- RsrcMgr.ResourceMgrs[IntfID].RemoveFlowIDInfo(context.Background(), IntfONUID, FlowID)
- RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(context.Background(), IntfID, ponrmgr.FLOW_ID, FlowIds)
-}
-
-// FreeFlowIDs releases the flow Ids
-func (RsrcMgr *OpenOltResourceMgr) FreeFlowIDs(IntfID uint32, onuID uint32,
- uniID uint32, FlowID []uint32) {
-
- RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(context.Background(), IntfID, ponrmgr.FLOW_ID, FlowID)
-
- var IntfOnuIDUniID string
- var err error
- for _, flow := range FlowID {
- IntfOnuIDUniID = fmt.Sprintf("%d,%d,%d", IntfID, onuID, uniID)
- err = RsrcMgr.ResourceMgrs[IntfID].UpdateFlowIDForOnu(context.Background(), IntfOnuIDUniID, flow, false)
- if err != nil {
- log.Errorw("Failed to Update flow id for", log.Fields{"intf": IntfOnuIDUniID})
- }
- RsrcMgr.ResourceMgrs[IntfID].RemoveFlowIDInfo(context.Background(), IntfOnuIDUniID, flow)
- }
-}
-
-// FreeAllocID frees AllocID on the PON resource pool and also frees the allocID association
-// for the given OLT device.
-func (RsrcMgr *OpenOltResourceMgr) FreeAllocID(IntfID uint32, allocID uint32) {
- allocIDs := make([]uint32, 0)
- allocIDs = append(allocIDs, allocID)
- RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(context.Background(), IntfID, ponrmgr.ALLOC_ID, allocIDs)
-}
-
-// FreeGemPortID frees GemPortID on the PON resource pool and also frees the gemPortID association
-// for the given OLT device.
-func (RsrcMgr *OpenOltResourceMgr) FreeGemPortID(IntfID uint32, gemPortID uint32) {
- gemPortIDs := make([]uint32, 0)
- gemPortIDs = append(gemPortIDs, gemPortID)
- RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(context.Background(), IntfID, ponrmgr.GEMPORT_ID, gemPortIDs)
+ return FlowIDs[0], err
}
diff --git a/core/subscriber_manager.go b/core/subscriber_manager.go
index d84fad9..3cf65ad 100644
--- a/core/subscriber_manager.go
+++ b/core/subscriber_manager.go
@@ -18,6 +18,7 @@
import (
"fmt"
+ "sync"
"github.com/opencord/openolt-scale-tester/config"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
@@ -86,9 +87,11 @@
OpenOltClient oop.OpenoltClient
TestConfig *config.OpenOltScaleTesterConfig
RsrMgr *OpenOltResourceMgr
+ subWg *sync.WaitGroup
}
-func (subs *Subscriber) Start(onuCh chan bool, isGroup bool) {
+func (subs *Subscriber) Start(isGroup bool) {
+
var err error
log.Infow("workflow-deploy-started-for-subscriber", log.Fields{"subsName": subs.SubscriberName})
@@ -97,22 +100,20 @@
for _, tpID := range subs.TestConfig.TpIDList {
uniPortName := fmt.Sprintf(UniPortName, subs.PonIntf, subs.OnuID, subs.UniID)
- if subs.TpInstance[tpID], err =
- subs.RsrMgr.ResourceMgrs[subs.PonIntf].TechProfileMgr.CreateTechProfInstance(context.Background(),
- uint32(tpID), uniPortName, subs.PonIntf); err != nil {
+ subs.RsrMgr.GemIDAllocIDLock[subs.PonIntf].Lock()
+ subs.TpInstance[tpID], err = subs.RsrMgr.ResourceMgrs[subs.PonIntf].TechProfileMgr.CreateTechProfInstance(context.Background(),
+ uint32(tpID), uniPortName, subs.PonIntf)
+ subs.RsrMgr.GemIDAllocIDLock[subs.PonIntf].Unlock()
+ if err != nil {
log.Errorw("error-creating-tp-instance-for-subs",
log.Fields{"subsName": subs.SubscriberName, "onuID": subs.OnuID, "tpID": tpID})
subs.Reason = ReasonCodeToReasonString(TP_INSTANCE_CREATION_FAILED)
- onuCh <- true
-
return
}
}
- DeployWorkflow(subs, isGroup)
+ go DeployWorkflow(subs, isGroup)
- log.Infow("workflow-deploy-completed-for-subscriber", log.Fields{"subsName": subs.SubscriberName})
-
- onuCh <- true
+ log.Infow("workflow-deploy-started-for-subscriber", log.Fields{"subsName": subs.SubscriberName})
}
diff --git a/core/tt_workflow.go b/core/tt_workflow.go
index 862da20..7b14007 100644
--- a/core/tt_workflow.go
+++ b/core/tt_workflow.go
@@ -23,7 +23,6 @@
"github.com/opencord/openolt-scale-tester/config"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
- "github.com/opencord/voltha-lib-go/v3/pkg/ponresourcemanager"
oop "github.com/opencord/voltha-protos/v3/go/openolt"
tp_pb "github.com/opencord/voltha-protos/v3/go/tech_profile"
"golang.org/x/net/context"
@@ -42,11 +41,11 @@
}
func AddTtDhcpIPV4Flow(oo oop.OpenoltClient, config *config.OpenOltScaleTesterConfig, rsrMgr *OpenOltResourceMgr) error {
- var flowID []uint32
+ var flowID uint32
var err error
- if flowID, err = rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].GetResourceID(context.Background(), uint32(config.NniIntfID),
- ponresourcemanager.FLOW_ID, 1); err != nil {
+ // Allocating flowID from PON0 pool for an trap-from-nni flow
+ if flowID, err = rsrMgr.GetFlowID(context.Background(), uint32(0)); err != nil {
return err
}
@@ -55,7 +54,7 @@
actionCmd := &oop.ActionCmd{TrapToHost: true}
actionInfo := &oop.Action{Cmd: actionCmd}
- flow := oop.Flow{AccessIntfId: -1, OnuId: -1, UniId: -1, FlowId: flowID[0],
+ flow := oop.Flow{AccessIntfId: -1, OnuId: -1, UniId: -1, FlowId: flowID,
FlowType: "downstream", AllocId: -1, GemportId: -1,
Classifier: flowClassifier, Action: actionInfo,
Priority: 1000, PortNo: uint32(config.NniIntfID)}
@@ -70,8 +69,6 @@
if err != nil {
log.Errorw("Failed to Add DHCP IPv4 to device", log.Fields{"err": err, "deviceFlow": flow})
- rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].FreeResourceID(context.Background(), uint32(config.NniIntfID),
- ponresourcemanager.FLOW_ID, flowID)
return err
}
log.Debugw("DHCP IPV4 added to device successfully ", log.Fields{"flow": flow})
@@ -334,7 +331,7 @@
func (tt TtWorkFlow) ProvisionHsiaFlow(subs *Subscriber) error {
var err error
- var flowID []uint32
+ var flowID uint32
var gemPortIDs []uint32
var allocID = subs.TpInstance[subs.TestConfig.TpIDList[0]].UsScheduler.AllocID
@@ -347,23 +344,19 @@
for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
if pbitSet == '1' {
pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
- if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(context.Background(), uint32(subs.PonIntf),
- ponresourcemanager.FLOW_ID, 1); err != nil {
+ if flowID, err = subs.RsrMgr.GetFlowID(context.Background(), uint32(subs.PonIntf)); err != nil {
return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
} else {
var errUs, errDs error
- if errUs = AddTtFlow(subs, HsiaFlow, Upstream, flowID[0], allocID, gemID, pcp); errUs != nil {
+ if errUs = AddTtFlow(subs, HsiaFlow, Upstream, flowID, allocID, gemID, pcp); errUs != nil {
log.Errorw("failed to install US HSIA flow",
log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
}
- if errDs = AddTtFlow(subs, HsiaFlow, Downstream, flowID[0], allocID, gemID, pcp); errDs != nil {
+ if errDs = AddTtFlow(subs, HsiaFlow, Downstream, flowID, allocID, gemID, pcp); errDs != nil {
log.Errorw("failed to install DS HSIA flow",
log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
}
- if errUs != nil && errDs != nil {
- subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(context.Background(), uint32(subs.PonIntf),
- ponresourcemanager.FLOW_ID, flowID)
- }
+
if errUs != nil || errDs != nil {
if errUs != nil {
return errUs
@@ -379,7 +372,7 @@
func (tt TtWorkFlow) ProvisionVoipFlow(subs *Subscriber) error {
var err error
- var flowID []uint32
+ var flowID uint32
var gemPortIDs []uint32
var allocID = subs.TpInstance[subs.TestConfig.TpIDList[0]].UsScheduler.AllocID
@@ -392,27 +385,23 @@
for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
if pbitSet == '1' {
pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
- if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(context.Background(), uint32(subs.PonIntf),
- ponresourcemanager.FLOW_ID, 1); err != nil {
+ if flowID, err = subs.RsrMgr.GetFlowID(context.Background(), uint32(subs.PonIntf)); err != nil {
return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
} else {
var errUs, errDs, errDhcp error
- if errUs = AddTtFlow(subs, VoipFlow, Upstream, flowID[0], allocID, gemID, pcp); errUs != nil {
+ if errUs = AddTtFlow(subs, VoipFlow, Upstream, flowID, allocID, gemID, pcp); errUs != nil {
log.Errorw("failed to install US VOIP flow",
log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
}
- if errDs = AddTtFlow(subs, VoipFlow, Downstream, flowID[0], allocID, gemID, pcp); errDs != nil {
+ if errDs = AddTtFlow(subs, VoipFlow, Downstream, flowID, allocID, gemID, pcp); errDs != nil {
log.Errorw("failed to install DS VOIP flow",
log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
}
- if errDhcp = AddFlow(subs, DhcpFlowIPV4, Upstream, flowID[0], allocID, gemID, pcp); errDhcp != nil {
+ if errDhcp = AddFlow(subs, DhcpFlowIPV4, Upstream, flowID, allocID, gemID, pcp); errDhcp != nil {
log.Errorw("failed to install US VOIP-DHCP flow",
log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
}
- if errUs != nil && errDs != nil && errDhcp != nil {
- subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(context.Background(), uint32(subs.PonIntf),
- ponresourcemanager.FLOW_ID, flowID)
- }
+
if errUs != nil || errDs != nil || errDhcp != nil {
if errUs != nil {
return errUs
@@ -431,7 +420,7 @@
func (tt TtWorkFlow) ProvisionVodFlow(subs *Subscriber) error {
var err error
- var flowID []uint32
+ var flowID uint32
var gemPortIDs []uint32
var allocID = subs.TpInstance[subs.TestConfig.TpIDList[0]].UsScheduler.AllocID
@@ -444,31 +433,27 @@
for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
if pbitSet == '1' {
pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
- if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(context.Background(), uint32(subs.PonIntf),
- ponresourcemanager.FLOW_ID, 1); err != nil {
+ if flowID, err = subs.RsrMgr.GetFlowID(context.Background(), uint32(subs.PonIntf)); err != nil {
return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
} else {
var errUs, errDs, errDhcp, errIgmp error
- if errUs = AddTtFlow(subs, VodFlow, Upstream, flowID[0], allocID, gemID, pcp); errUs != nil {
+ if errUs = AddTtFlow(subs, VodFlow, Upstream, flowID, allocID, gemID, pcp); errUs != nil {
log.Errorw("failed to install US VOIP flow",
log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
}
- if errDs = AddTtFlow(subs, VodFlow, Downstream, flowID[0], allocID, gemID, pcp); errDs != nil {
+ if errDs = AddTtFlow(subs, VodFlow, Downstream, flowID, allocID, gemID, pcp); errDs != nil {
log.Errorw("failed to install DS VOIP flow",
log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
}
- if errDhcp = AddFlow(subs, DhcpFlowIPV4, Upstream, flowID[0], allocID, gemID, pcp); errDhcp != nil {
+ if errDhcp = AddFlow(subs, DhcpFlowIPV4, Upstream, flowID, allocID, gemID, pcp); errDhcp != nil {
log.Errorw("failed to install US VOIP-DHCP flow",
log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
}
- if errIgmp = AddTtFlow(subs, IgmpFlow, Upstream, flowID[0], allocID, gemID, pcp); errIgmp != nil {
+ if errIgmp = AddTtFlow(subs, IgmpFlow, Upstream, flowID, allocID, gemID, pcp); errIgmp != nil {
log.Errorw("failed to install US VOIP-IGMP flow",
log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
}
- if errUs != nil && errDs != nil && errDhcp != nil && errIgmp != nil {
- subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(context.Background(), uint32(subs.PonIntf),
- ponresourcemanager.FLOW_ID, flowID)
- }
+
if errUs != nil || errDs != nil || errDhcp != nil || errIgmp != nil {
if errUs != nil {
return errUs
@@ -490,7 +475,7 @@
func (tt TtWorkFlow) ProvisionMgmtFlow(subs *Subscriber) error {
var err error
- var flowID []uint32
+ var flowID uint32
var gemPortIDs []uint32
var allocID = subs.TpInstance[subs.TestConfig.TpIDList[0]].UsScheduler.AllocID
@@ -503,27 +488,23 @@
for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
if pbitSet == '1' {
pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
- if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(context.Background(), uint32(subs.PonIntf),
- ponresourcemanager.FLOW_ID, 1); err != nil {
+ if flowID, err = subs.RsrMgr.GetFlowID(context.Background(), uint32(subs.PonIntf)); err != nil {
return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
} else {
var errUs, errDs, errDhcp error
- if errUs = AddTtFlow(subs, MgmtFlow, Upstream, flowID[0], allocID, gemID, pcp); errUs != nil {
+ if errUs = AddTtFlow(subs, MgmtFlow, Upstream, flowID, allocID, gemID, pcp); errUs != nil {
log.Errorw("failed to install US MGMT flow",
log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
}
- if errDs = AddTtFlow(subs, MgmtFlow, Downstream, flowID[0], allocID, gemID, pcp); errDs != nil {
+ if errDs = AddTtFlow(subs, MgmtFlow, Downstream, flowID, allocID, gemID, pcp); errDs != nil {
log.Errorw("failed to install DS MGMT flow",
log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
}
- if errDhcp = AddFlow(subs, DhcpFlowIPV4, Upstream, flowID[0], allocID, gemID, pcp); errDhcp != nil {
+ if errDhcp = AddFlow(subs, DhcpFlowIPV4, Upstream, flowID, allocID, gemID, pcp); errDhcp != nil {
log.Errorw("failed to install US MGMT-DHCP flow",
log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
}
- if errUs != nil && errDs != nil && errDhcp != nil {
- subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(context.Background(), uint32(subs.PonIntf),
- ponresourcemanager.FLOW_ID, flowID)
- }
+
if errUs != nil || errDs != nil || errDhcp != nil {
if errUs != nil {
return errUs
diff --git a/core/workflow_manager.go b/core/workflow_manager.go
index 1112a7e..e9bb24e 100644
--- a/core/workflow_manager.go
+++ b/core/workflow_manager.go
@@ -44,6 +44,9 @@
}
func DeployWorkflow(subs *Subscriber, isGroup bool) {
+
+ defer subs.subWg.Done()
+
var wf = getWorkFlow(subs)
if isGroup {
@@ -104,6 +107,7 @@
}
}
+ log.Infow("subscriber-provision-completed-from-onu-manager", log.Fields{"subsName": subs.SubscriberName})
subs.Reason = ReasonCodeToReasonString(SUBSCRIBER_PROVISION_SUCCESS)
}
diff --git a/core/workflow_utils.go b/core/workflow_utils.go
index 4ab7356..a223c02 100644
--- a/core/workflow_utils.go
+++ b/core/workflow_utils.go
@@ -19,6 +19,7 @@
import (
"errors"
"math/rand"
+ "time"
"github.com/opencord/openolt-scale-tester/config"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
@@ -537,3 +538,76 @@
return nil
}
+
+func CreateTrafficSchedWithRetry(OpenOltClient oop.OpenoltClient, sched *oop.TrafficSchedulers) error {
+ maxRetry := 20
+ if _, err := OpenOltClient.CreateTrafficSchedulers(context.Background(), sched); err == nil {
+ log.Info("succeeded in first attempt")
+ return nil
+ } else {
+ log.Info("going for a retry")
+ }
+ for i := 0; i < maxRetry; i++ {
+ if _, err := OpenOltClient.CreateTrafficSchedulers(context.Background(), sched); err != nil {
+ log.Error("retying after delay")
+ time.Sleep(50 * time.Millisecond)
+ continue
+ } else {
+ log.Infow("succeeded in retry iteration=%d!!", log.Fields{"i": i})
+ return nil
+ }
+ }
+
+ return errors.New("failed-to-create-traffic-sched-after-all-retries")
+}
+
+func CreateTrafficQueuesWithRetry(OpenOltClient oop.OpenoltClient, queue *oop.TrafficQueues) error {
+ maxRetry := 20
+ if _, err := OpenOltClient.CreateTrafficQueues(context.Background(), queue); err == nil {
+ log.Info("succeeded in first attempt")
+ return nil
+ }
+ for i := 0; i < maxRetry; i++ {
+ if _, err := OpenOltClient.CreateTrafficQueues(context.Background(), queue); err != nil {
+ time.Sleep(50 * time.Millisecond)
+ continue
+ } else {
+ log.Infow("succeeded in retry iteration=%d!!", log.Fields{"i": i})
+ return nil
+ }
+ }
+
+ return errors.New("failed-to-create-traffic-queue-after-all-retries")
+}
+
+func AddFlowWithRetry(OpenOltClient oop.OpenoltClient, flow *oop.Flow) error {
+
+ var err error
+ maxRetry := 20
+
+ _, err = OpenOltClient.FlowAdd(context.Background(), flow)
+
+ st, _ := status.FromError(err)
+ if st.Code() == codes.AlreadyExists {
+ log.Debugw("Flow already exists", log.Fields{"err": err, "deviceFlow": flow})
+ return nil
+ }
+ if st.Code() == codes.ResourceExhausted {
+ for i := 0; i < maxRetry; i++ {
+ _, err = OpenOltClient.FlowAdd(context.Background(), flow)
+ st, _ := status.FromError(err)
+ if st.Code() == codes.ResourceExhausted {
+ log.Error("flow-install-failed--retrying")
+ continue
+ } else if st.Code() == codes.OK {
+ log.Infow("flow-install-succeeded-on-retry", log.Fields{"i": i, "flow": flow})
+ return nil
+ }
+ }
+
+ }
+
+ log.Debugw("Flow install failed on all retries ", log.Fields{"flow": flow})
+
+ return err
+}