VOL-3431: The following enhancements/changes are made in this patch
- Process ONUs in bulk rather than serially, which significantly improves run
  time (see the sketch below)
- Use a separate API to get the flow-id. This flow-id is not freed on failure,
  as freeing it adds unnecessary complexity that is unwarranted for a test tool.
- Print the total execution time at the end of the test
- Fix the Dockerfile so that the vendor module is not rebuilt on every docker build
- Introduce new functions to retry scheduler, queue and flow adds on failure,
  but these are not currently used
- Add vendor modules to the repo, just like all other ONF VOLTHA golang projects do
- Tested all three workflows - ATT, DT and TT
- Bump version to 1.1.0
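
In essence, the bulk provisioning fans one batch of ONUs out to goroutines
across all PON ports and waits on a sync.WaitGroup before starting the next
batch. A minimal, self-contained sketch of the pattern (hypothetical
provisionOnu and toy sizes, not the real code):

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    // provisionOnu stands in for ONU activation plus subscriber provisioning.
    func provisionOnu(ponPort, onuID uint32, wg *sync.WaitGroup) {
        defer wg.Done()
        time.Sleep(10 * time.Millisecond) // simulate gRPC round trips
        fmt.Printf("provisioned onu %d on pon %d\n", onuID, ponPort)
    }

    func main() {
        const numPonPorts, onusPerPon, batchSize = 4, 8, 4
        start := time.Now()
        for batch := uint32(0); batch < onusPerPon/batchSize; batch++ {
            var wg sync.WaitGroup
            // Fan out one batch of ONUs across all PON ports ...
            for pon := uint32(0); pon < numPonPorts; pon++ {
                for k := uint32(0); k < batchSize; k++ {
                    wg.Add(1)
                    go provisionOnu(pon, batch*batchSize+k, &wg)
                }
            }
            // ... and wait for the whole batch before starting the next one.
            wg.Wait()
        }
        fmt.Println("total time:", time.Since(start))
    }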

Change-Id: I6102cb206e78ea04b49b7125b101946ca6f36bfb
diff --git a/core/att_workflow.go b/core/att_workflow.go
index cf5cbae..f673ee0 100644
--- a/core/att_workflow.go
+++ b/core/att_workflow.go
@@ -22,7 +22,6 @@
 
 	"github.com/opencord/openolt-scale-tester/config"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
-	"github.com/opencord/voltha-lib-go/v3/pkg/ponresourcemanager"
 	oop "github.com/opencord/voltha-protos/v3/go/openolt"
 	tp_pb "github.com/opencord/voltha-protos/v3/go/tech_profile"
 	"golang.org/x/net/context"
@@ -39,11 +38,10 @@
 }
 
 func AddDhcpIPV4Flow(oo oop.OpenoltClient, config *config.OpenOltScaleTesterConfig, rsrMgr *OpenOltResourceMgr) error {
-	var flowID []uint32
+	var flowID uint32
 	var err error
 
-	if flowID, err = rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].GetResourceID(context.Background(), uint32(config.NniIntfID),
-		ponresourcemanager.FLOW_ID, 1); err != nil {
+	if flowID, err = rsrMgr.GetFlowID(context.Background(), uint32(config.NniIntfID)); err != nil {
 		return err
 	}
 
@@ -52,7 +50,7 @@
 	actionCmd := &oop.ActionCmd{TrapToHost: true}
 	actionInfo := &oop.Action{Cmd: actionCmd}
 
-	flow := oop.Flow{AccessIntfId: -1, OnuId: -1, UniId: -1, FlowId: flowID[0],
+	flow := oop.Flow{AccessIntfId: -1, OnuId: -1, UniId: -1, FlowId: flowID,
 		FlowType: "downstream", AllocId: -1, GemportId: -1,
 		Classifier: flowClassifier, Action: actionInfo,
 		Priority: 1000, PortNo: uint32(config.NniIntfID)}
@@ -67,8 +65,6 @@
 
 	if err != nil {
 		log.Errorw("Failed to Add DHCP IPv4 to device", log.Fields{"err": err, "deviceFlow": flow})
-		rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].FreeResourceID(context.Background(), uint32(config.NniIntfID),
-			ponresourcemanager.FLOW_ID, flowID)
 		return err
 	}
 	log.Debugw("DHCP IPV4 added to device successfully ", log.Fields{"flow": flow})
@@ -77,11 +73,10 @@
 }
 
 func AddDhcpIPV6Flow(oo oop.OpenoltClient, config *config.OpenOltScaleTesterConfig, rsrMgr *OpenOltResourceMgr) error {
-	var flowID []uint32
+	var flowID uint32
 	var err error
 
-	if flowID, err = rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].GetResourceID(context.Background(), uint32(config.NniIntfID),
-		ponresourcemanager.FLOW_ID, 1); err != nil {
+	if flowID, err = rsrMgr.GetFlowID(context.Background(), uint32(config.NniIntfID)); err != nil {
 		return err
 	}
 
@@ -90,7 +85,7 @@
 	actionCmd := &oop.ActionCmd{TrapToHost: true}
 	actionInfo := &oop.Action{Cmd: actionCmd}
 
-	flow := oop.Flow{AccessIntfId: -1, OnuId: -1, UniId: -1, FlowId: flowID[0],
+	flow := oop.Flow{AccessIntfId: -1, OnuId: -1, UniId: -1, FlowId: flowID,
 		FlowType: "downstream", AllocId: -1, GemportId: -1,
 		Classifier: flowClassifier, Action: actionInfo,
 		Priority: 1000, PortNo: uint32(config.NniIntfID)}
@@ -105,8 +100,6 @@
 
 	if err != nil {
 		log.Errorw("Failed to Add DHCP IPV6 to device", log.Fields{"err": err, "deviceFlow": flow})
-		rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].FreeResourceID(context.Background(), uint32(config.NniIntfID),
-			ponresourcemanager.FLOW_ID, flowID)
 		return err
 	}
 	log.Debugw("DHCP IPV6 added to device successfully ", log.Fields{"flow": flow})
@@ -156,7 +149,6 @@
 		log.Errorw("Failed to create traffic schedulers", log.Fields{"error": err})
 		return errors.New(ReasonCodeToReasonString(SCHED_CREATION_FAILED))
 	}
-
 	return nil
 }
 
@@ -203,7 +195,7 @@
 
 func (att AttWorkFlow) ProvisionEapFlow(subs *Subscriber) error {
 	var err error
-	var flowID []uint32
+	var flowID uint32
 	var gemPortIDs []uint32
 
 	var allocID = subs.TpInstance[subs.TestConfig.TpIDList[0]].UsScheduler.AllocID
@@ -216,13 +208,10 @@
 		for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
 			if pbitSet == '1' {
 				pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
-				if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(context.Background(), uint32(subs.PonIntf),
-					ponresourcemanager.FLOW_ID, 1); err != nil {
+				if flowID, err = subs.RsrMgr.GetFlowID(context.Background(), uint32(subs.PonIntf)); err != nil {
 					return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
 				} else {
-					if err := AddFlow(subs, EapolFlow, Upstream, flowID[0], allocID, gemID, pcp); err != nil {
-						subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(context.Background(), uint32(subs.PonIntf),
-							ponresourcemanager.FLOW_ID, flowID)
+					if err := AddFlow(subs, EapolFlow, Upstream, flowID, allocID, gemID, pcp); err != nil {
 						return err
 					}
 				}
@@ -234,7 +223,7 @@
 
 func (att AttWorkFlow) ProvisionDhcpIPV4Flow(subs *Subscriber) error {
 	var err error
-	var flowID []uint32
+	var flowID uint32
 	var gemPortIDs []uint32
 
 	var allocID = subs.TpInstance[subs.TestConfig.TpIDList[0]].UsScheduler.AllocID
@@ -247,13 +236,10 @@
 		for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
 			if pbitSet == '1' {
 				pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
-				if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(context.Background(), uint32(subs.PonIntf),
-					ponresourcemanager.FLOW_ID, 1); err != nil {
+				if flowID, err = subs.RsrMgr.GetFlowID(context.Background(), uint32(subs.PonIntf)); err != nil {
 					return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
 				} else {
-					if err := AddFlow(subs, DhcpFlowIPV4, Upstream, flowID[0], allocID, gemID, pcp); err != nil {
-						subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(context.Background(), uint32(subs.PonIntf),
-							ponresourcemanager.FLOW_ID, flowID)
+					if err := AddFlow(subs, DhcpFlowIPV4, Upstream, flowID, allocID, gemID, pcp); err != nil {
 						return err
 					}
 				}
@@ -265,7 +251,7 @@
 
 func (att AttWorkFlow) ProvisionDhcpIPV6Flow(subs *Subscriber) error {
 	var err error
-	var flowID []uint32
+	var flowID uint32
 	var gemPortIDs []uint32
 
 	var allocID = subs.TpInstance[subs.TestConfig.TpIDList[0]].UsScheduler.AllocID
@@ -278,13 +264,10 @@
 		for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
 			if pbitSet == '1' {
 				pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
-				if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(context.Background(), uint32(subs.PonIntf),
-					ponresourcemanager.FLOW_ID, 1); err != nil {
+				if flowID, err = subs.RsrMgr.GetFlowID(context.Background(), uint32(subs.PonIntf)); err != nil {
 					return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
 				} else {
-					if err := AddFlow(subs, DhcpFlowIPV6, Upstream, flowID[0], allocID, gemID, pcp); err != nil {
-						subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(context.Background(), uint32(subs.PonIntf),
-							ponresourcemanager.FLOW_ID, flowID)
+					if err := AddFlow(subs, DhcpFlowIPV6, Upstream, flowID, allocID, gemID, pcp); err != nil {
 						return err
 					}
 				}
@@ -301,7 +284,7 @@
 
 func (att AttWorkFlow) ProvisionHsiaFlow(subs *Subscriber) error {
 	var err error
-	var flowID []uint32
+	var flowID uint32
 	var gemPortIDs []uint32
 
 	var allocID = subs.TpInstance[subs.TestConfig.TpIDList[0]].UsScheduler.AllocID
@@ -314,22 +297,19 @@
 		for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
 			if pbitSet == '1' {
 				pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
-				if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(context.Background(), uint32(subs.PonIntf),
-					ponresourcemanager.FLOW_ID, 1); err != nil {
+				if flowID, err = subs.RsrMgr.GetFlowID(context.Background(), uint32(subs.PonIntf)); err != nil {
 					return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
 				} else {
 					var errUs, errDs error
-					if errUs = AddFlow(subs, HsiaFlow, Upstream, flowID[0], allocID, gemID, pcp); errUs != nil {
+					if errUs = AddFlow(subs, HsiaFlow, Upstream, flowID, allocID, gemID, pcp); errUs != nil {
 						log.Errorw("failed to install US HSIA flow",
 							log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
 					}
-					if errDs = AddFlow(subs, HsiaFlow, Downstream, flowID[0], allocID, gemID, pcp); errDs != nil {
+					if errDs = AddFlow(subs, HsiaFlow, Downstream, flowID, allocID, gemID, pcp); errDs != nil {
 						log.Errorw("failed to install US HSIA flow",
 							log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
 					}
-					if errUs != nil && errDs != nil {
-						subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(context.Background(), uint32(subs.PonIntf),
-							ponresourcemanager.FLOW_ID, flowID)
-					}
 					if errUs != nil || errDs != nil {
 						if errUs != nil {
diff --git a/core/dt_workflow.go b/core/dt_workflow.go
index a06b29b..5c8b345 100644
--- a/core/dt_workflow.go
+++ b/core/dt_workflow.go
@@ -22,7 +22,6 @@
 
 	"github.com/opencord/openolt-scale-tester/config"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
-	"github.com/opencord/voltha-lib-go/v3/pkg/ponresourcemanager"
 	oop "github.com/opencord/voltha-protos/v3/go/openolt"
 	tp_pb "github.com/opencord/voltha-protos/v3/go/tech_profile"
 	"golang.org/x/net/context"
@@ -143,7 +142,7 @@
 
 func (dt DtWorkFlow) ProvisionHsiaFlow(subs *Subscriber) error {
 	var err error
-	var flowID []uint32
+	var flowID uint32
 	var gemPortIDs []uint32
 
 	var allocID = subs.TpInstance[subs.TestConfig.TpIDList[0]].UsScheduler.AllocID
@@ -156,16 +155,13 @@
 		for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
 			if pbitSet == '1' {
 				pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
-				if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(context.Background(), uint32(subs.PonIntf),
-					ponresourcemanager.FLOW_ID, 1); err != nil {
+				if flowID, err = subs.RsrMgr.GetFlowID(context.Background(), uint32(subs.PonIntf)); err != nil {
 					return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
 				} else {
-					if err := AddFlow(subs, HsiaFlow, Upstream, flowID[0], allocID, gemID, pcp); err != nil {
-						subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(context.Background(), uint32(subs.PonIntf),
-							ponresourcemanager.FLOW_ID, flowID)
+					if err := AddFlow(subs, HsiaFlow, Upstream, flowID, allocID, gemID, pcp); err != nil {
 						return err
 					}
-					if err := AddFlow(subs, HsiaFlow, Downstream, flowID[0], allocID, gemID, pcp); err != nil {
+					if err := AddFlow(subs, HsiaFlow, Downstream, flowID, allocID, gemID, pcp); err != nil {
 						return err
 					}
 				}
diff --git a/core/olt_manager.go b/core/olt_manager.go
index 78dd97e..f4e3143 100644
--- a/core/olt_manager.go
+++ b/core/olt_manager.go
@@ -44,7 +44,6 @@
 const (
 	ReasonOk          = "OK"
 	TechProfileKVPath = "service/voltha/technology_profiles/%s/%d" // service/voltha/technology_profiles/xgspon/<tech_profile_tableID>
-	DTWorkFlow        = "DT"
 )
 
 type OnuDeviceKey struct {
@@ -62,6 +61,7 @@
 	testConfig    *config.OpenOltScaleTesterConfig
 	rsrMgr        *OpenOltResourceMgr
 	lockRsrAlloc  sync.RWMutex
+	lockOpenOltManager sync.RWMutex
 }
 
 func init() {
@@ -74,6 +74,7 @@
 		ipPort:       ipPort,
 		OnuDeviceMap: make(map[OnuDeviceKey]*OnuDevice),
 		lockRsrAlloc: sync.RWMutex{},
+		lockOpenOltManager: sync.RWMutex{},
 	}
 }
 
@@ -208,43 +209,76 @@
 
 func (om *OpenOltManager) provisionONUs() {
 	var numOfONUsPerPon uint
-	var i, j, onuID uint32
+	var i, j, k, onuID uint32
 	var err error
-	oltChan := make(chan bool)
-	numOfONUsPerPon = om.testConfig.NumOfOnu / uint(om.deviceInfo.PonPorts)
-	if oddONUs := om.testConfig.NumOfOnu % uint(om.deviceInfo.PonPorts); oddONUs > 0 {
-		log.Warnw("Odd number ONUs left out of provisioning", log.Fields{"oddONUs": oddONUs})
+	var onuWg sync.WaitGroup
+
+	defer func() {
+		// Stop the process once the job is done
+		_ = syscall.Kill(syscall.Getpid(), syscall.SIGINT)
+	}()
+
+	// If the number of ONUs to provision is not a power of 2, stop execution
+	// This is needed to ensure an even distribution of ONUs across all PONs
+	if !isPowerOfTwo(om.testConfig.NumOfOnu) {
+		log.Errorw("num-of-onus-to-provision-is-not-a-power-of-2", log.Fields{"numOfOnus": om.testConfig.NumOfOnu})
+		return
 	}
+
+	// Number of ONUs to provision should not be less than the number of PON ports.
+	// We need at least one ONU per PON
+	if om.testConfig.NumOfOnu < uint(om.deviceInfo.PonPorts) {
+		log.Errorw("num-of-onu-is-less-than-num-of-pon-port", log.Fields{"numOfOnus": om.testConfig.NumOfOnu, "numOfPon": om.deviceInfo.PonPorts})
+		return
+	}
+
+	numOfONUsPerPon = om.testConfig.NumOfOnu / uint(om.deviceInfo.PonPorts)
 	totalOnusToProvision := numOfONUsPerPon * uint(om.deviceInfo.PonPorts)
 	log.Infow("***** all-onu-provision-started ******",
 		log.Fields{"totalNumOnus": totalOnusToProvision,
 			"numOfOnusPerPon": numOfONUsPerPon,
 			"numOfPons":       om.deviceInfo.PonPorts})
-	for i = 0; i < om.deviceInfo.PonPorts; i++ {
-		for j = 0; j < uint32(numOfONUsPerPon); j++ {
-			// TODO: More work with ONU provisioning
-			om.lockRsrAlloc.Lock()
-			sn := GenerateNextONUSerialNumber()
-			om.lockRsrAlloc.Unlock()
-			log.Debugw("provisioning onu", log.Fields{"onuID": j, "ponPort": i, "serialNum": sn})
-			if onuID, err = om.rsrMgr.GetONUID(i); err != nil {
-				log.Errorw("error getting onu id", log.Fields{"err": err})
-				continue
-			}
-			log.Infow("onu-provision-started-from-olt-manager", log.Fields{"onuId": onuID, "ponIntf": i})
-			go om.activateONU(i, onuID, sn, om.stringifySerialNumber(sn), oltChan)
-			// Wait for complete ONU provision to succeed, including provisioning the subscriber
-			<-oltChan
-			log.Infow("onu-provision-completed-from-olt-manager", log.Fields{"onuId": onuID, "ponIntf": i})
 
-			// Sleep for configured time before provisioning next ONU
-			time.Sleep(time.Duration(om.testConfig.TimeIntervalBetweenSubs))
-		}
+	// These are the number of ONUs that will be provisioned per PON port per batch.
+	// Such number of ONUs will be chosen across all PON ports per batch
+	var onusPerIterationPerPonPort uint32 = 4
+
+	// If the number of ONUs per PON is less than the default number of ONUs
+	// to provision per PON port per batch, keep halving the batch size until
+	// it fits. Note: the least possible value for onusPerIterationPerPonPort is 1
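+	// For example, 2 ONUs per PON halves the batch size 4 -> 2, and
+	// 1 ONU per PON halves it 4 -> 2 -> 1.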
+	for uint32(numOfONUsPerPon) < onusPerIterationPerPonPort {
+		onusPerIterationPerPonPort /= 2
 	}
-	log.Info("******** all-onu-provisioning-completed *******")
 
-	// TODO: We need to dump the results at the end. But below json marshall does not work
-	// We will need custom Marshal function.
+	startTime := time.Now()
+	// Start provisioning the ONUs
+	for i = 0; i < uint32(numOfONUsPerPon)/onusPerIterationPerPonPort; i++ {
+		for j = 0; j < om.deviceInfo.PonPorts; j++ {
+			for k = 0; k < onusPerIterationPerPonPort; k++ {
+				om.lockRsrAlloc.Lock()
+				sn := GenerateNextONUSerialNumber()
+				om.lockRsrAlloc.Unlock()
+				log.Debugw("provisioning onu", log.Fields{"ponPort": j, "iteration": i, "serialNum": sn})
+				if onuID, err = om.rsrMgr.GetONUID(j); err != nil {
+					log.Errorw("error getting onu id", log.Fields{"err": err})
+					continue
+				}
+				log.Infow("onu-provision-started-from-olt-manager", log.Fields{"onuId": onuID, "ponIntf": j})
+
+				onuWg.Add(1)
+				go om.activateONU(j, onuID, sn, om.stringifySerialNumber(sn), &onuWg)
+			}
+		}
+		// Wait for the group of ONUs to complete processing before going to next batch of ONUs
+		onuWg.Wait()
+	}
+	endTime := time.Now()
+	log.Info("******** all-onu-provisioning-completed *******")
+	totalTime := endTime.Sub(startTime)
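+	// Render the elapsed time as HH:MM:SS by adding the duration to the zero
+	// time; this is valid only for runs shorter than 24 hours.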
+	out := time.Time{}.Add(totalTime)
+	log.Infof("****** Total Time to provision all the ONUs is => %s", out.Format("15:04:05"))
+
+	// TODO: We need to dump the results at the end. But below json marshall does not work. We will need custom Marshal function.
 	/*
 		e, err := json.Marshal(om)
 		if err != nil {
@@ -253,12 +287,9 @@
 		}
 		fmt.Println(string(e))
 	*/
-
-	// Stop the process once the job is done
-	_ = syscall.Kill(syscall.Getpid(), syscall.SIGINT)
 }
 
-func (om *OpenOltManager) activateONU(intfID uint32, onuID uint32, serialNum *oop.SerialNumber, serialNumber string, oltCh chan bool) {
+func (om *OpenOltManager) activateONU(intfID uint32, onuID uint32, serialNum *oop.SerialNumber, serialNumber string, onuWg *sync.WaitGroup) {
 	log.Debugw("activate-onu", log.Fields{"intfID": intfID, "onuID": onuID, "serialNum": serialNum, "serialNumber": serialNumber})
 	// TODO: need resource manager
 	var pir uint32 = 1000000
@@ -269,6 +300,7 @@
 		openOltClient: om.openOltClient,
 		testConfig:    om.testConfig,
 		rsrMgr:        om.rsrMgr,
+		onuWg:         onuWg,
 	}
 	var err error
 	onuDeviceKey := OnuDeviceKey{onuID: onuID, ponInfID: intfID}
@@ -281,7 +313,6 @@
 		st, _ := status.FromError(err)
 		if st.Code() == codes.AlreadyExists {
 			log.Debug("ONU activation is in progress", log.Fields{"SerialNumber": serialNumber})
-			oltCh <- false
 		} else {
 			nanos = now.UnixNano()
 			milliEnd := nanos / 1000000
@@ -289,7 +320,6 @@
 			onuDevice.OnuProvisionDurationInMs = milliEnd - milliStart
 			log.Errorw("activate-onu-failed", log.Fields{"Onu": Onu, "err ": err})
 			onuDevice.Reason = err.Error()
-			oltCh <- false
 		}
 	} else {
 		nanos = now.UnixNano()
@@ -300,12 +330,16 @@
 		log.Infow("activated-onu", log.Fields{"SerialNumber": serialNumber})
 	}
 
+	om.lockOpenOltManager.Lock()
 	om.OnuDeviceMap[onuDeviceKey] = &onuDevice
+	om.lockOpenOltManager.Unlock()
 
 	// If ONU activation was success provision the ONU
 	if err == nil {
-		// start provisioning the ONU
-		go om.OnuDeviceMap[onuDeviceKey].Start(oltCh)
+		om.lockOpenOltManager.RLock()
+		go om.OnuDeviceMap[onuDeviceKey].Start()
+		om.lockOpenOltManager.RUnlock()
+
 	}
 }
 
@@ -441,3 +475,7 @@
 		log.Fields{"numofTech": tpCount, "numPonPorts": om.deviceInfo.GetPonPorts()})
 	return nil
 }
+
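+// isPowerOfTwo uses the bit trick n & (n-1) == 0, which holds only when at
+// most one bit is set in n. Note that it also reports true for n == 0.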
+func isPowerOfTwo(numOfOnus uint) bool {
+	return (numOfOnus & (numOfOnus - 1)) == 0
+}
\ No newline at end of file
diff --git a/core/onu_manager.go b/core/onu_manager.go
index 6fa9201..4fd665b 100644
--- a/core/onu_manager.go
+++ b/core/onu_manager.go
@@ -18,6 +18,7 @@
 
 import (
 	"strconv"
+	"sync"
 	"time"
 
 	"github.com/opencord/openolt-scale-tester/config"
@@ -45,13 +46,13 @@
 	openOltClient            oop.OpenoltClient
 	testConfig               *config.OpenOltScaleTesterConfig
 	rsrMgr                   *OpenOltResourceMgr
+	onuWg                    *sync.WaitGroup
 }
 
-func (onu *OnuDevice) Start(oltCh chan bool) {
+func (onu *OnuDevice) Start() {
 	onu.SubscriberMap = make(map[SubscriberKey]*Subscriber)
-	onuCh := make(chan bool)
 	var subs uint
-
+	var subWg sync.WaitGroup
 	log.Infow("onu-provision-started-from-onu-manager", log.Fields{"onuID": onu.OnuID, "ponIntf": onu.PonIntf})
 
 	for subs = 0; subs < onu.testConfig.SubscribersPerOnu; subs++ {
@@ -67,24 +68,22 @@
 			OpenOltClient:  onu.openOltClient,
 			TestConfig:     onu.testConfig,
 			RsrMgr:         onu.rsrMgr,
+			subWg:          &subWg,
 		}
 		subsKey := SubscriberKey{subsName}
 		onu.SubscriberMap[subsKey] = &subs
 
+		subWg.Add(1)
 		log.Infow("subscriber-provision-started-from-onu-manager", log.Fields{"subsName": subsName})
 		// Start provisioning the subscriber
-		go subs.Start(onuCh, onu.testConfig.IsGroupTest)
+		go subs.Start(onu.testConfig.IsGroupTest)
 
-		// Wait for subscriber provision to complete
-		<-onuCh
-
-		log.Infow("subscriber-provision-completed-from-onu-manager", log.Fields{"subsName": subsName})
-
-		//Sleep for configured interval before provisioning another subscriber
-		time.Sleep(time.Duration(onu.testConfig.TimeIntervalBetweenSubs))
 	}
-	// Indicate that the ONU provisioning is complete
-	oltCh <- true
+
+	// Wait for all the subscribers on the ONU to complete provisioning
+	subWg.Wait()
+	// Signal that ONU provisioning is complete
+	onu.onuWg.Done()
 
 	log.Infow("onu-provision-completed-from-onu-manager", log.Fields{"onuID": onu.OnuID, "ponIntf": onu.PonIntf})
 }
diff --git a/core/resource_manager.go b/core/resource_manager.go
index 3be01bc..d8674c9 100644
--- a/core/resource_manager.go
+++ b/core/resource_manager.go
@@ -18,10 +18,9 @@
 package core
 
 import (
-	"errors"
-	"fmt"
 	"strconv"
 	"strings"
+	"sync"
 
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	ponrmgr "github.com/opencord/voltha-lib-go/v3/pkg/ponresourcemanager"
@@ -36,6 +35,16 @@
 // OpenOltResourceMgr holds resource related information as provided below for each field
 type OpenOltResourceMgr struct {
 	deviceInfo *openolt.DeviceInfo
+
+	// This protects concurrent onu_id allocate/delete calls on a per PON port basis
+	OnuIDMgmtLock []sync.RWMutex
+	// This protects concurrent flow_id allocate/delete calls. We do not need this on a
+	// per PON port basis as flow IDs are unique across the OLT.
+	FlowIDMgmtLock sync.RWMutex
+
+	// This protects concurrent GemID and AllocID allocate/delete calls on a per PON port basis
+	GemIDAllocIDLock []sync.RWMutex
+
 	// array of pon resource managers per interface technology
 	ResourceMgrs map[uint32]*ponrmgr.PONResourceManager
 }
@@ -48,6 +57,11 @@
 	log.Debugf("Init new resource manager")
 
 	ResourceMgr.deviceInfo = devInfo
+	NumPONPorts := devInfo.GetPonPorts()
+
+	ResourceMgr.OnuIDMgmtLock = make([]sync.RWMutex, NumPONPorts)
+	ResourceMgr.GemIDAllocIDLock = make([]sync.RWMutex, NumPONPorts)
+	ResourceMgr.FlowIDMgmtLock = sync.RWMutex{}
 
 	Ranges := make(map[string]*openolt.DeviceInfo_DeviceResourceRanges)
 	RsrcMgrsByTech := make(map[string]*ponrmgr.PONResourceManager)
@@ -64,7 +78,6 @@
 		var ranges openolt.DeviceInfo_DeviceResourceRanges
 		ranges.Technology = devInfo.GetTechnology()
 
-		NumPONPorts := devInfo.GetPonPorts()
 		var index uint32
 		for index = 0; index < NumPONPorts; index++ {
 			ranges.IntfIds = append(ranges.IntfIds, index)
@@ -277,27 +290,6 @@
 
 // Delete clears used resources for the particular olt device being deleted
 func (RsrcMgr *OpenOltResourceMgr) Delete() error {
-	/* TODO
-	   def __del__(self):
-	           self.log.info("clearing-device-resource-pool")
-	           for key, resource_mgr in self.resource_mgrs.iteritems():
-	               resource_mgr.clear_device_resource_pool()
-
-	       def assert_pon_id_limit(self, pon_intf_id):
-	           assert pon_intf_id in self.resource_mgrs
-
-	       def assert_onu_id_limit(self, pon_intf_id, onu_id):
-	           self.assert_pon_id_limit(pon_intf_id)
-	           self.resource_mgrs[pon_intf_id].assert_resource_limits(onu_id, PONResourceManager.ONU_ID)
-
-	       @property
-	       def max_uni_id_per_onu(self):
-	           return 0 #OpenOltPlatform.MAX_UNIS_PER_ONU-1, zero-based indexing Uncomment or override to make default multi-uni
-
-	       def assert_uni_id_limit(self, pon_intf_id, onu_id, uni_id):
-	           self.assert_onu_id_limit(pon_intf_id, onu_id)
-	           self.resource_mgrs[pon_intf_id].assert_resource_limits(uni_id, PONResourceManager.UNI_ID)
-	*/
 	for _, rsrcMgr := range RsrcMgr.ResourceMgrs {
 		if err := rsrcMgr.ClearDeviceResourcePool(context.Background()); err != nil {
 			log.Debug("Failed to clear device resource pool")
@@ -310,143 +302,29 @@
 
 // GetONUID returns the available OnuID for the given pon-port
 func (RsrcMgr *OpenOltResourceMgr) GetONUID(ponIntfID uint32) (uint32, error) {
+	RsrcMgr.OnuIDMgmtLock[ponIntfID].Lock()
+	defer RsrcMgr.OnuIDMgmtLock[ponIntfID].Unlock()
 	// Check if Pon Interface ID is present in Resource-manager-map
-	if _, ok := RsrcMgr.ResourceMgrs[ponIntfID]; !ok {
-		err := errors.New("invalid-pon-interface-" + strconv.Itoa(int(ponIntfID)))
-		return 0, err
-	}
-	// Get ONU id for a provided pon interface ID.
-	ONUID, err := RsrcMgr.ResourceMgrs[ponIntfID].GetResourceID(context.Background(), ponIntfID,
+	ONUIDs, err := RsrcMgr.ResourceMgrs[ponIntfID].GetResourceID(context.Background(), ponIntfID,
 		ponrmgr.ONU_ID, 1)
 	if err != nil {
 		log.Errorf("Failed to get resource for interface %d for type %s",
 			ponIntfID, ponrmgr.ONU_ID)
-		return 0, err
+		return 0, err
 	}
-	if ONUID != nil {
-		RsrcMgr.ResourceMgrs[ponIntfID].InitResourceMap(context.Background(), fmt.Sprintf("%d,%d", ponIntfID, ONUID[0]))
-		return ONUID[0], err
-	}
-
-	return 0, err // return OnuID 0 on error
+	return ONUIDs[0], err
 }
 
-// GetAllocID return the first Alloc ID for a given pon interface id and onu id and then update the resource map on
-// the KV store with the list of alloc_ids allocated for the pon_intf_onu_id tuple
-// Currently of all the alloc_ids available, it returns the first alloc_id in the list for tha given ONU
-func (RsrcMgr *OpenOltResourceMgr) GetAllocID(intfID uint32, onuID uint32, uniID uint32) uint32 {
-
-	var err error
-	IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", intfID, onuID, uniID)
-	AllocID := RsrcMgr.ResourceMgrs[intfID].GetCurrentAllocIDForOnu(context.Background(), IntfOnuIDUniID)
-	if AllocID != nil {
-		// Since we support only one alloc_id for the ONU at the moment,
-		// return the first alloc_id in the list, if available, for that
-		// ONU.
-		log.Debugw("Retrieved alloc ID from pon resource mgr", log.Fields{"AllocID": AllocID})
-		return AllocID[0]
-	}
-	AllocID, err = RsrcMgr.ResourceMgrs[intfID].GetResourceID(context.Background(), intfID,
-		ponrmgr.ALLOC_ID, 1)
-
-	if AllocID == nil || err != nil {
-		log.Error("Failed to allocate alloc id")
-		return 0
-	}
-	// update the resource map on KV store with the list of alloc_id
-	// allocated for the pon_intf_onu_id tuple
-	err = RsrcMgr.ResourceMgrs[intfID].UpdateAllocIdsForOnu(context.Background(), IntfOnuIDUniID, AllocID)
+// GetFlowID returns a flow ID for the given pon interface id
+func (RsrcMgr *OpenOltResourceMgr) GetFlowID(ctx context.Context, ponIntfID uint32) (uint32, error) {
+	RsrcMgr.FlowIDMgmtLock.Lock()
+	defer RsrcMgr.FlowIDMgmtLock.Unlock()
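+	// Flow IDs are unique across the OLT, so a single OLT-wide lock (rather
+	// than a per-PON lock) serializes concurrent allocations.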
+	FlowIDs, err := RsrcMgr.ResourceMgrs[ponIntfID].GetResourceID(ctx, ponIntfID,
+		ponrmgr.FLOW_ID, 1)
 	if err != nil {
-		log.Error("Failed to update Alloc ID")
-		return 0
+		log.Errorf("Failed to get resource for interface %d for type %s",
+			ponIntfID, ponrmgr.FLOW_ID)
+		return 0, err
 	}
-	log.Debugw("Allocated new Tcont from pon resource mgr", log.Fields{"AllocID": AllocID})
-	return AllocID[0]
-}
-
-// GetGEMPortID gets gem port id for a particular pon port, onu id and uni id and then update the resource map on
-// the KV store with the list of gemport_id allocated for the pon_intf_onu_id tuple
-func (RsrcMgr *OpenOltResourceMgr) GetGEMPortID(ponPort uint32, onuID uint32,
-	uniID uint32, NumOfPorts uint32) ([]uint32, error) {
-
-	/* Get gem port id for a particular pon port, onu id
-	   and uni id.
-	*/
-
-	var err error
-	IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", ponPort, onuID, uniID)
-
-	GEMPortList := RsrcMgr.ResourceMgrs[ponPort].GetCurrentGEMPortIDsForOnu(context.Background(), IntfOnuIDUniID)
-	if GEMPortList != nil {
-		return GEMPortList, nil
-	}
-
-	GEMPortList, err = RsrcMgr.ResourceMgrs[ponPort].GetResourceID(context.Background(), ponPort,
-		ponrmgr.GEMPORT_ID, NumOfPorts)
-	if err != nil && GEMPortList == nil {
-		log.Errorf("Failed to get gem port id for %s", IntfOnuIDUniID)
-		return nil, err
-	}
-
-	// update the resource map on KV store with the list of gemport_id
-	// allocated for the pon_intf_onu_id tuple
-	err = RsrcMgr.ResourceMgrs[ponPort].UpdateGEMPortIDsForOnu(context.Background(), IntfOnuIDUniID,
-		GEMPortList)
-	if err != nil {
-		log.Errorf("Failed to update GEM ports to kv store for %s", IntfOnuIDUniID)
-		return nil, err
-	}
-
-	return GEMPortList, err
-}
-
-// FreeFlowID returns the free flow id for a given interface, onu id and uni id
-func (RsrcMgr *OpenOltResourceMgr) FreeFlowID(IntfID uint32, onuID int32,
-	uniID int32, FlowID uint32) {
-	var IntfONUID string
-	var err error
-	FlowIds := make([]uint32, 0)
-
-	FlowIds = append(FlowIds, FlowID)
-	IntfONUID = fmt.Sprintf("%d,%d,%d", IntfID, onuID, uniID)
-	err = RsrcMgr.ResourceMgrs[IntfID].UpdateFlowIDForOnu(context.Background(), IntfONUID, FlowID, false)
-	if err != nil {
-		log.Errorw("Failed to Update flow id  for", log.Fields{"intf": IntfONUID})
-	}
-	RsrcMgr.ResourceMgrs[IntfID].RemoveFlowIDInfo(context.Background(), IntfONUID, FlowID)
-	RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(context.Background(), IntfID, ponrmgr.FLOW_ID, FlowIds)
-}
-
-// FreeFlowIDs releases the flow Ids
-func (RsrcMgr *OpenOltResourceMgr) FreeFlowIDs(IntfID uint32, onuID uint32,
-	uniID uint32, FlowID []uint32) {
-
-	RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(context.Background(), IntfID, ponrmgr.FLOW_ID, FlowID)
-
-	var IntfOnuIDUniID string
-	var err error
-	for _, flow := range FlowID {
-		IntfOnuIDUniID = fmt.Sprintf("%d,%d,%d", IntfID, onuID, uniID)
-		err = RsrcMgr.ResourceMgrs[IntfID].UpdateFlowIDForOnu(context.Background(), IntfOnuIDUniID, flow, false)
-		if err != nil {
-			log.Errorw("Failed to Update flow id for", log.Fields{"intf": IntfOnuIDUniID})
-		}
-		RsrcMgr.ResourceMgrs[IntfID].RemoveFlowIDInfo(context.Background(), IntfOnuIDUniID, flow)
-	}
-}
-
-// FreeAllocID frees AllocID on the PON resource pool and also frees the allocID association
-// for the given OLT device.
-func (RsrcMgr *OpenOltResourceMgr) FreeAllocID(IntfID uint32, allocID uint32) {
-	allocIDs := make([]uint32, 0)
-	allocIDs = append(allocIDs, allocID)
-	RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(context.Background(), IntfID, ponrmgr.ALLOC_ID, allocIDs)
-}
-
-// FreeGemPortID frees GemPortID on the PON resource pool and also frees the gemPortID association
-// for the given OLT device.
-func (RsrcMgr *OpenOltResourceMgr) FreeGemPortID(IntfID uint32, gemPortID uint32) {
-	gemPortIDs := make([]uint32, 0)
-	gemPortIDs = append(gemPortIDs, gemPortID)
-	RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(context.Background(), IntfID, ponrmgr.GEMPORT_ID, gemPortIDs)
+	return FlowIDs[0], err
 }
diff --git a/core/subscriber_manager.go b/core/subscriber_manager.go
index d84fad9..3cf65ad 100644
--- a/core/subscriber_manager.go
+++ b/core/subscriber_manager.go
@@ -18,6 +18,7 @@
 
 import (
 	"fmt"
+	"sync"
 
 	"github.com/opencord/openolt-scale-tester/config"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
@@ -86,9 +87,11 @@
 	OpenOltClient oop.OpenoltClient
 	TestConfig    *config.OpenOltScaleTesterConfig
 	RsrMgr        *OpenOltResourceMgr
+	subWg         *sync.WaitGroup
 }
 
-func (subs *Subscriber) Start(onuCh chan bool, isGroup bool) {
+func (subs *Subscriber) Start(isGroup bool) {
+
 	var err error
 
 	log.Infow("workflow-deploy-started-for-subscriber", log.Fields{"subsName": subs.SubscriberName})
@@ -97,22 +100,20 @@
 
 	for _, tpID := range subs.TestConfig.TpIDList {
 		uniPortName := fmt.Sprintf(UniPortName, subs.PonIntf, subs.OnuID, subs.UniID)
-		if subs.TpInstance[tpID], err =
-			subs.RsrMgr.ResourceMgrs[subs.PonIntf].TechProfileMgr.CreateTechProfInstance(context.Background(),
-				uint32(tpID), uniPortName, subs.PonIntf); err != nil {
+		subs.RsrMgr.GemIDAllocIDLock[subs.PonIntf].Lock()
+		subs.TpInstance[tpID], err = subs.RsrMgr.ResourceMgrs[subs.PonIntf].TechProfileMgr.CreateTechProfInstance(context.Background(),
+			uint32(tpID), uniPortName, subs.PonIntf)
+		subs.RsrMgr.GemIDAllocIDLock[subs.PonIntf].Unlock()
+		if err != nil {
 			log.Errorw("error-creating-tp-instance-for-subs",
 				log.Fields{"subsName": subs.SubscriberName, "onuID": subs.OnuID, "tpID": tpID})
 
 			subs.Reason = ReasonCodeToReasonString(TP_INSTANCE_CREATION_FAILED)
-			onuCh <- true
-
 			return
 		}
 	}
 
-	DeployWorkflow(subs, isGroup)
+	go DeployWorkflow(subs, isGroup)
 
-	log.Infow("workflow-deploy-completed-for-subscriber", log.Fields{"subsName": subs.SubscriberName})
-
-	onuCh <- true
+	log.Infow("workflow-deploy-dispatched-for-subscriber", log.Fields{"subsName": subs.SubscriberName})
 }
diff --git a/core/tt_workflow.go b/core/tt_workflow.go
index 862da20..7b14007 100644
--- a/core/tt_workflow.go
+++ b/core/tt_workflow.go
@@ -23,7 +23,6 @@
 
 	"github.com/opencord/openolt-scale-tester/config"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
-	"github.com/opencord/voltha-lib-go/v3/pkg/ponresourcemanager"
 	oop "github.com/opencord/voltha-protos/v3/go/openolt"
 	tp_pb "github.com/opencord/voltha-protos/v3/go/tech_profile"
 	"golang.org/x/net/context"
@@ -42,11 +41,11 @@
 }
 
 func AddTtDhcpIPV4Flow(oo oop.OpenoltClient, config *config.OpenOltScaleTesterConfig, rsrMgr *OpenOltResourceMgr) error {
-	var flowID []uint32
+	var flowID uint32
 	var err error
 
-	if flowID, err = rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].GetResourceID(context.Background(), uint32(config.NniIntfID),
-		ponresourcemanager.FLOW_ID, 1); err != nil {
+	// Allocate a flowID from the PON0 pool for a trap-from-nni flow
+	if flowID, err = rsrMgr.GetFlowID(context.Background(), uint32(0)); err != nil {
 		return err
 	}
 
@@ -55,7 +54,7 @@
 	actionCmd := &oop.ActionCmd{TrapToHost: true}
 	actionInfo := &oop.Action{Cmd: actionCmd}
 
-	flow := oop.Flow{AccessIntfId: -1, OnuId: -1, UniId: -1, FlowId: flowID[0],
+	flow := oop.Flow{AccessIntfId: -1, OnuId: -1, UniId: -1, FlowId: flowID,
 		FlowType: "downstream", AllocId: -1, GemportId: -1,
 		Classifier: flowClassifier, Action: actionInfo,
 		Priority: 1000, PortNo: uint32(config.NniIntfID)}
@@ -70,8 +69,6 @@
 
 	if err != nil {
 		log.Errorw("Failed to Add DHCP IPv4 to device", log.Fields{"err": err, "deviceFlow": flow})
-		rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].FreeResourceID(context.Background(), uint32(config.NniIntfID),
-			ponresourcemanager.FLOW_ID, flowID)
 		return err
 	}
 	log.Debugw("DHCP IPV4 added to device successfully ", log.Fields{"flow": flow})
@@ -334,7 +331,7 @@
 
 func (tt TtWorkFlow) ProvisionHsiaFlow(subs *Subscriber) error {
 	var err error
-	var flowID []uint32
+	var flowID uint32
 	var gemPortIDs []uint32
 
 	var allocID = subs.TpInstance[subs.TestConfig.TpIDList[0]].UsScheduler.AllocID
@@ -347,23 +344,19 @@
 		for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
 			if pbitSet == '1' {
 				pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
-				if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(context.Background(), uint32(subs.PonIntf),
-					ponresourcemanager.FLOW_ID, 1); err != nil {
+				if flowID, err = subs.RsrMgr.GetFlowID(context.Background(), uint32(subs.PonIntf)); err != nil {
 					return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
 				} else {
 					var errUs, errDs error
-					if errUs = AddTtFlow(subs, HsiaFlow, Upstream, flowID[0], allocID, gemID, pcp); errUs != nil {
+					if errUs = AddTtFlow(subs, HsiaFlow, Upstream, flowID, allocID, gemID, pcp); errUs != nil {
 						log.Errorw("failed to install US HSIA flow",
 							log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
 					}
-					if errDs = AddTtFlow(subs, HsiaFlow, Downstream, flowID[0], allocID, gemID, pcp); errDs != nil {
+					if errDs = AddTtFlow(subs, HsiaFlow, Downstream, flowID, allocID, gemID, pcp); errDs != nil {
 						log.Errorw("failed to install DS HSIA flow",
 							log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
 					}
-					if errUs != nil && errDs != nil {
-						subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(context.Background(), uint32(subs.PonIntf),
-							ponresourcemanager.FLOW_ID, flowID)
-					}
+
 					if errUs != nil || errDs != nil {
 						if errUs != nil {
 							return errUs
@@ -379,7 +372,7 @@
 
 func (tt TtWorkFlow) ProvisionVoipFlow(subs *Subscriber) error {
 	var err error
-	var flowID []uint32
+	var flowID uint32
 	var gemPortIDs []uint32
 
 	var allocID = subs.TpInstance[subs.TestConfig.TpIDList[0]].UsScheduler.AllocID
@@ -392,27 +385,23 @@
 		for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
 			if pbitSet == '1' {
 				pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
-				if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(context.Background(), uint32(subs.PonIntf),
-					ponresourcemanager.FLOW_ID, 1); err != nil {
+				if flowID, err = subs.RsrMgr.GetFlowID(context.Background(), uint32(subs.PonIntf)); err != nil {
 					return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
 				} else {
 					var errUs, errDs, errDhcp error
-					if errUs = AddTtFlow(subs, VoipFlow, Upstream, flowID[0], allocID, gemID, pcp); errUs != nil {
+					if errUs = AddTtFlow(subs, VoipFlow, Upstream, flowID, allocID, gemID, pcp); errUs != nil {
 						log.Errorw("failed to install US VOIP flow",
 							log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
 					}
-					if errDs = AddTtFlow(subs, VoipFlow, Downstream, flowID[0], allocID, gemID, pcp); errDs != nil {
+					if errDs = AddTtFlow(subs, VoipFlow, Downstream, flowID, allocID, gemID, pcp); errDs != nil {
 						log.Errorw("failed to install DS VOIP flow",
 							log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
 					}
-					if errDhcp = AddFlow(subs, DhcpFlowIPV4, Upstream, flowID[0], allocID, gemID, pcp); errDhcp != nil {
+					if errDhcp = AddFlow(subs, DhcpFlowIPV4, Upstream, flowID, allocID, gemID, pcp); errDhcp != nil {
 						log.Errorw("failed to install US VOIP-DHCP flow",
 							log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
 					}
-					if errUs != nil && errDs != nil && errDhcp != nil {
-						subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(context.Background(), uint32(subs.PonIntf),
-							ponresourcemanager.FLOW_ID, flowID)
-					}
+
 					if errUs != nil || errDs != nil || errDhcp != nil {
 						if errUs != nil {
 							return errUs
@@ -431,7 +420,7 @@
 
 func (tt TtWorkFlow) ProvisionVodFlow(subs *Subscriber) error {
 	var err error
-	var flowID []uint32
+	var flowID uint32
 	var gemPortIDs []uint32
 
 	var allocID = subs.TpInstance[subs.TestConfig.TpIDList[0]].UsScheduler.AllocID
@@ -444,31 +433,27 @@
 		for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
 			if pbitSet == '1' {
 				pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
-				if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(context.Background(), uint32(subs.PonIntf),
-					ponresourcemanager.FLOW_ID, 1); err != nil {
+				if flowID, err = subs.RsrMgr.GetFlowID(context.Background(), uint32(subs.PonIntf)); err != nil {
 					return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
 				} else {
 					var errUs, errDs, errDhcp, errIgmp error
-					if errUs = AddTtFlow(subs, VodFlow, Upstream, flowID[0], allocID, gemID, pcp); errUs != nil {
+					if errUs = AddTtFlow(subs, VodFlow, Upstream, flowID, allocID, gemID, pcp); errUs != nil {
 						log.Errorw("failed to install US VOIP flow",
 							log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
 					}
-					if errDs = AddTtFlow(subs, VodFlow, Downstream, flowID[0], allocID, gemID, pcp); errDs != nil {
+					if errDs = AddTtFlow(subs, VodFlow, Downstream, flowID, allocID, gemID, pcp); errDs != nil {
 						log.Errorw("failed to install DS VOIP flow",
 							log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
 					}
-					if errDhcp = AddFlow(subs, DhcpFlowIPV4, Upstream, flowID[0], allocID, gemID, pcp); errDhcp != nil {
+					if errDhcp = AddFlow(subs, DhcpFlowIPV4, Upstream, flowID, allocID, gemID, pcp); errDhcp != nil {
 						log.Errorw("failed to install US VOIP-DHCP flow",
 							log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
 					}
-					if errIgmp = AddTtFlow(subs, IgmpFlow, Upstream, flowID[0], allocID, gemID, pcp); errIgmp != nil {
+					if errIgmp = AddTtFlow(subs, IgmpFlow, Upstream, flowID, allocID, gemID, pcp); errIgmp != nil {
 						log.Errorw("failed to install US VOIP-IGMP flow",
 							log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
 					}
-					if errUs != nil && errDs != nil && errDhcp != nil && errIgmp != nil {
-						subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(context.Background(), uint32(subs.PonIntf),
-							ponresourcemanager.FLOW_ID, flowID)
-					}
+
 					if errUs != nil || errDs != nil || errDhcp != nil || errIgmp != nil {
 						if errUs != nil {
 							return errUs
@@ -490,7 +475,7 @@
 
 func (tt TtWorkFlow) ProvisionMgmtFlow(subs *Subscriber) error {
 	var err error
-	var flowID []uint32
+	var flowID uint32
 	var gemPortIDs []uint32
 
 	var allocID = subs.TpInstance[subs.TestConfig.TpIDList[0]].UsScheduler.AllocID
@@ -503,27 +488,23 @@
 		for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
 			if pbitSet == '1' {
 				pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
-				if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(context.Background(), uint32(subs.PonIntf),
-					ponresourcemanager.FLOW_ID, 1); err != nil {
+				if flowID, err = subs.RsrMgr.GetFlowID(context.Background(), uint32(subs.PonIntf)); err != nil {
 					return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
 				} else {
 					var errUs, errDs, errDhcp error
-					if errUs = AddTtFlow(subs, MgmtFlow, Upstream, flowID[0], allocID, gemID, pcp); errUs != nil {
+					if errUs = AddTtFlow(subs, MgmtFlow, Upstream, flowID, allocID, gemID, pcp); errUs != nil {
 						log.Errorw("failed to install US MGMT flow",
 							log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
 					}
-					if errDs = AddTtFlow(subs, MgmtFlow, Downstream, flowID[0], allocID, gemID, pcp); errDs != nil {
+					if errDs = AddTtFlow(subs, MgmtFlow, Downstream, flowID, allocID, gemID, pcp); errDs != nil {
 						log.Errorw("failed to install DS MGMT flow",
 							log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
 					}
-					if errDhcp = AddFlow(subs, DhcpFlowIPV4, Upstream, flowID[0], allocID, gemID, pcp); errDhcp != nil {
+					if errDhcp = AddFlow(subs, DhcpFlowIPV4, Upstream, flowID, allocID, gemID, pcp); errDhcp != nil {
 						log.Errorw("failed to install US MGMT-DHCP flow",
 							log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
 					}
-					if errUs != nil && errDs != nil && errDhcp != nil {
-						subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(context.Background(), uint32(subs.PonIntf),
-							ponresourcemanager.FLOW_ID, flowID)
-					}
+
 					if errUs != nil || errDs != nil || errDhcp != nil {
 						if errUs != nil {
 							return errUs
diff --git a/core/workflow_manager.go b/core/workflow_manager.go
index 1112a7e..e9bb24e 100644
--- a/core/workflow_manager.go
+++ b/core/workflow_manager.go
@@ -44,6 +44,9 @@
 }
 
 func DeployWorkflow(subs *Subscriber, isGroup bool) {
+
+	defer subs.subWg.Done()
+
 	var wf = getWorkFlow(subs)
 
 	if isGroup {
@@ -104,6 +107,7 @@
 		}
 	}
 
+	log.Infow("subscriber-provision-completed-from-onu-manager", log.Fields{"subsName": subs.SubscriberName})
 	subs.Reason = ReasonCodeToReasonString(SUBSCRIBER_PROVISION_SUCCESS)
 }
 
diff --git a/core/workflow_utils.go b/core/workflow_utils.go
index 4ab7356..a223c02 100644
--- a/core/workflow_utils.go
+++ b/core/workflow_utils.go
@@ -19,6 +19,7 @@
 import (
 	"errors"
 	"math/rand"
+	"time"
 
 	"github.com/opencord/openolt-scale-tester/config"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
@@ -537,3 +538,76 @@
 
 	return nil
 }
+
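+// CreateTrafficSchedWithRetry retries traffic-scheduler creation until it
+// succeeds or the retry budget is exhausted. It is not wired into the
+// workflows yet; a caller could use it in place of a direct
+// CreateTrafficSchedulers call, e.g. (hypothetical wiring):
+//   if err := CreateTrafficSchedWithRetry(subs.OpenOltClient, trafficSched); err != nil { ... }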
+func CreateTrafficSchedWithRetry(OpenOltClient oop.OpenoltClient, sched *oop.TrafficSchedulers) error {
+	maxRetry := 20
+	if _, err := OpenOltClient.CreateTrafficSchedulers(context.Background(), sched); err == nil {
+		log.Info("succeeded in first attempt")
+		return nil
+	} else {
+		log.Info("going for a retry")
+	}
+	for i := 0; i < maxRetry; i++ {
+		if _, err := OpenOltClient.CreateTrafficSchedulers(context.Background(), sched); err != nil {
+			log.Error("retrying after delay")
+			time.Sleep(50 * time.Millisecond)
+			continue
+		} else {
+			log.Infow("succeeded in retry", log.Fields{"iteration": i})
+			return nil
+		}
+	}
+
+	return errors.New("failed-to-create-traffic-sched-after-all-retries")
+}
+
+func CreateTrafficQueuesWithRetry(OpenOltClient oop.OpenoltClient, queue *oop.TrafficQueues) error {
+	maxRetry := 20
+	if _, err := OpenOltClient.CreateTrafficQueues(context.Background(), queue); err == nil {
+		log.Info("succeeded in first attempt")
+		return nil
+	}
+	for i := 0; i < maxRetry; i++ {
+		if _, err := OpenOltClient.CreateTrafficQueues(context.Background(), queue); err != nil {
+			time.Sleep(50 * time.Millisecond)
+			continue
+		} else {
+			log.Infow("succeeded in retry", log.Fields{"iteration": i})
+			return nil
+		}
+	}
+
+	return errors.New("failed-to-create-traffic-queue-after-all-retries")
+}
+
+func AddFlowWithRetry(OpenOltClient oop.OpenoltClient, flow *oop.Flow) error {
+
+	var err error
+	maxRetry := 20
+
+	_, err = OpenOltClient.FlowAdd(context.Background(), flow)
+
+	st, _ := status.FromError(err)
+	if st.Code() == codes.AlreadyExists {
+		log.Debugw("Flow already exists", log.Fields{"err": err, "deviceFlow": flow})
+		return nil
+	}
+	// Return early on success; without this the function would fall through
+	// and log a spurious failure message below.
+	if st.Code() == codes.OK {
+		return nil
+	}
+	if st.Code() == codes.ResourceExhausted {
+		for i := 0; i < maxRetry; i++ {
+			_, err = OpenOltClient.FlowAdd(context.Background(), flow)
+			st, _ := status.FromError(err)
+			if st.Code() == codes.ResourceExhausted {
+				log.Error("flow-install-failed--retrying")
+				continue
+			} else if st.Code() == codes.OK {
+				log.Infow("flow-install-succeeded-on-retry", log.Fields{"i": i, "flow": flow})
+				return nil
+			}
+		}
+
+	}
+
+	log.Debugw("Flow install failed on all retries", log.Fields{"err": err, "flow": flow})
+
+	return err
+}